# Unix Socket

A Unix domain socket (UDS) is a special file on your filesystem (e.g.
`/tmp/app.sock`) that acts as a communication endpoint between processes on the
same machine. Unlike TCP sockets, the data never passes through the network
stack: the kernel copies it directly between the two processes' socket buffers,
which typically makes a UDS faster than TCP over loopback for local IPC. The
server binds to the socket path, and one or more clients connect to it.

Key properties:

- Lives at a filesystem path, with access controlled by file permissions

- Supports stream mode (like TCP: ordered, reliable) and datagram mode (like
  UDP: message-based)

- Commonly used between e.g. Nginx ↔ PHP-FPM, apps ↔ local databases, or
  microservices on the same host
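
The first two properties can be seen in a few lines. The following is a minimal sketch, not part of the tokio examples in these notes: it uses only the standard library (`std::os::unix::net::UnixDatagram`) so it runs without dependencies, and the function name `datagram_demo` and the socket file name are made up for illustration. It binds a datagram socket, restricts the socket file to its owner with mode `0o600`, and shows that each `recv` returns one whole message.

```rust
use std::fs;
use std::io;
use std::os::unix::fs::PermissionsExt;
use std::os::unix::net::UnixDatagram;
use std::path::Path;

/// Hypothetical helper: binds a datagram socket at `path`, restricts it to its
/// owner, and receives two messages from an unbound sender. Returns the
/// permission bits and the messages in arrival order.
fn datagram_demo(path: &Path) -> io::Result<(u32, Vec<String>)> {
    let _ = fs::remove_file(path); // stale file from an earlier run
    let receiver = UnixDatagram::bind(path)?;

    // The socket is a directory entry like any other file, so chmod applies to it.
    fs::set_permissions(path, fs::Permissions::from_mode(0o600))?;
    let mode = fs::metadata(path)?.permissions().mode() & 0o777;

    let sender = UnixDatagram::unbound()?;
    sender.send_to(b"first", path)?;
    sender.send_to(b"second message", path)?;

    let mut messages = Vec::new();
    let mut buf = [0u8; 64];
    for _ in 0..2 {
        // Each recv returns exactly one datagram, never a mix of two.
        let n = receiver.recv(&mut buf)?;
        messages.push(String::from_utf8_lossy(&buf[..n]).into_owned());
    }

    fs::remove_file(path)?;
    Ok((mode, messages))
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("uds_datagram_demo.sock");
    let (mode, messages) = datagram_demo(&path)?;
    println!("mode = {:o}, messages = {:?}", mode, messages);
    Ok(())
}
```

With a stream socket the same two writes could arrive merged into a single read, which is why stream protocols usually add their own framing.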

## Rust Examples: Multiple Producers → Single Receiver

The pattern here is: **multiple client tasks each connect to the same Unix
socket**, and a single server receives all their messages. We'll use tokio for
async I/O; see the docs for
[UnixListener](https://docs.rs/tokio/latest/tokio/net/struct.UnixListener.html)
and
[UnixStream](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html).

```toml
[dependencies]
tokio = { version = "1", features = ["full"] }
```

### Example 1: Basic Server (Single Receiver)

The server binds to a socket path and handles each incoming connection in its
own task, printing whatever data it receives. See the
[UnixListener docs](https://docs.rs/tokio/latest/tokio/net/struct.UnixListener.html).

```rust
// server.rs
use tokio::io::AsyncReadExt;
use tokio::net::UnixListener;

#[tokio::main]
async fn main() {
    let socket_path = "/tmp/mpsr.sock";
    // Remove a stale socket file from a previous run; bind fails if the path exists
    let _ = std::fs::remove_file(socket_path);

    let listener = UnixListener::bind(socket_path).unwrap();
    println!("Server listening on {}", socket_path);

    loop {
        match listener.accept().await {
            Ok((mut stream, _addr)) => {
                // Spawn a task per connection; all of them funnel into this single server
                tokio::spawn(async move {
                    let mut buf = vec![0u8; 1024];
                    // One read is enough for these short messages, but a stream
                    // socket does not preserve message boundaries in general
                    match stream.read(&mut buf).await {
                        Ok(0) => eprintln!("[Server] Connection closed without data"),
                        Ok(n) => {
                            let msg = String::from_utf8_lossy(&buf[..n]);
                            println!("[Server] Received: {}", msg);
                        }
                        Err(e) => eprintln!("[Server] Read error: {}", e),
                    }
                });
            }
            Err(e) => eprintln!("Accept error: {}", e),
        }
    }
}
```

### Example 2: Multiple Producers (Clients)

Each producer runs in its own tokio task, connects to the socket, and sends a
message.

```rust
// client.rs
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;

#[tokio::main]
async fn main() {
    let socket_path = "/tmp/mpsr.sock";
    let num_producers = 5;

    let mut handles = vec![];

    for i in 0..num_producers {
        let path = socket_path.to_string();
        // Each producer is a tokio task with its own connection to the server
        let handle = tokio::spawn(async move {
            match UnixStream::connect(&path).await {
                Ok(mut stream) => {
                    let msg = format!("Hello from producer {}", i);
                    stream.write_all(msg.as_bytes()).await.unwrap();
                    println!("[Producer {}] Sent: {}", i, msg);
                    // `stream` is dropped here, which closes the connection
                }
                Err(e) => eprintln!("[Producer {}] Failed to connect: {}", i, e),
            }
        });
        handles.push(handle);
    }

    // Wait for every producer to finish before exiting
    for h in handles {
        h.await.unwrap();
    }
}
```

### Example 3: Combined Server + Producers in One Binary

A self-contained example where the server runs as a background task and 5
producers send concurrently.

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let socket_path = "/tmp/mpsr_combined.sock";
    let _ = std::fs::remove_file(socket_path);

    // Bind before spawning anything, so no producer can connect too early
    let listener = UnixListener::bind(socket_path).unwrap();
    println!("[Server] Listening...");

    // --- Spawn the single receiver (server) ---
    tokio::spawn(async move {
        loop {
            if let Ok((mut stream, _)) = listener.accept().await {
                tokio::spawn(async move {
                    let mut buf = vec![0u8; 256];
                    match stream.read(&mut buf).await {
                        Ok(n) if n > 0 => {
                            println!("[Server] Got: {}", String::from_utf8_lossy(&buf[..n]));
                        }
                        _ => {} // closed without data, or read error
                    }
                });
            }
        }
    });

    // --- Spawn multiple producers ---
    let mut handles = vec![];
    for i in 0..5 {
        let path = socket_path.to_string();
        handles.push(tokio::spawn(async move {
            let mut stream = UnixStream::connect(&path).await.unwrap();
            let msg = format!("Message from producer {}", i);
            stream.write_all(msg.as_bytes()).await.unwrap();
            println!("[Producer {}] Sent.", i);
        }));
    }

    for h in handles {
        h.await.unwrap();
    }

    // Crude: give the server tasks time to print before main returns
    sleep(Duration::from_millis(100)).await;
}
```

Expected output (order may vary since tasks run concurrently):

```text
[Server] Listening...
[Producer 0] Sent.
[Producer 2] Sent.
[Server] Got: Message from producer 0
[Server] Got: Message from producer 2
...
```

### How the MPSR Pattern Works Here

Each producer connects and sends independently, and the kernel queues pending
connections on the listening socket until the server accepts them. One accept
loop receives every connection, and the `tokio::spawn` per connection lets the
server handle them concurrently; together that forms the multiple-producer,
single-receiver funnel. If you also want to aggregate the messages into a single
channel, clone a `tokio::sync::mpsc` sender into each connection task and drain
the receiver in one place. `std::sync::mpsc` works too, but its blocking `recv`
should not be called from an async task. For background, see this
[mpsc article](https://blog.softwaremill.com/multithreading-in-rust-with-mpsc-multi-producer-single-consumer-channels-db0fc91ae3fa).
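
The channel aggregation described above might look like the following. This is a minimal sketch under assumptions rather than the tokio version: it swaps in OS threads, the blocking `std::os::unix::net` types, and `std::sync::mpsc` so that it runs without dependencies. The name `run_funnel` and the socket file name are made up for illustration. In the tokio server the same shape would apply, with `tokio::sync::mpsc` and tasks in place of threads.

```rust
use std::io::{self, Read, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: runs `producers` clients against one listener and
/// returns every message the single receiver pulled from the channel.
fn run_funnel(path: &Path, producers: usize) -> io::Result<Vec<String>> {
    let _ = std::fs::remove_file(path);
    let listener = UnixListener::bind(path)?; // bound before any client starts
    let (tx, rx) = mpsc::channel::<String>();

    // Accept loop: one handler thread per connection, each with its own Sender clone.
    let acceptor = thread::spawn(move || {
        for stream in listener.incoming().take(producers) {
            if let Ok(mut stream) = stream {
                let tx = tx.clone();
                thread::spawn(move || {
                    let mut msg = String::new();
                    // Read until the client closes its end of the connection.
                    if stream.read_to_string(&mut msg).is_ok() {
                        let _ = tx.send(msg);
                    }
                });
            }
        }
        // The original `tx` is dropped when this closure returns.
    });

    let clients: Vec<_> = (0..producers)
        .map(|i| {
            let path = path.to_path_buf();
            thread::spawn(move || -> io::Result<()> {
                let mut stream = UnixStream::connect(path)?;
                let msg = format!("Message from producer {}", i);
                stream.write_all(msg.as_bytes())?;
                Ok(()) // dropping `stream` closes the connection
            })
        })
        .collect();
    for c in clients {
        c.join().expect("client thread panicked")?;
    }
    acceptor.join().expect("acceptor thread panicked");

    // Single receiver: iteration ends once every Sender has been dropped.
    let messages: Vec<String> = rx.iter().collect();
    std::fs::remove_file(path)?;
    Ok(messages)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("uds_funnel_demo.sock");
    let mut messages = run_funnel(&path, 5)?;
    messages.sort(); // arrival order is not deterministic
    for m in &messages {
        println!("{}", m);
    }
    Ok(())
}
```

The design choice here is that connection handlers only read and forward, while all processing happens at the single `rx` consumer, so no locking is needed around shared state.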