# Data Streams

## Data Streams and Pipes

<img src="/whiteboards/data_stream.png" alt="Data Stream" width="100%">

## Passing Data Streams between Rust binaries

Binaries can pass data directly to one another through pipes. They don't have to
go through a networking layer, such as an HTTP server listening on a port, which
adds unnecessary overhead when it isn't needed.

<img src="/whiteboards/pipe_between_binaries.png" alt="Data Stream" width="100%">
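
As a minimal sketch of the idea above, the standard library can hand one child
process's stdout straight to another child's stdin, with no socket or port
involved. The helper name `pipe_between` and the `echo`/`tr` commands are
illustrative assumptions (a Unix-like system with both on `PATH`), not something
these notes prescribe.

```rust
use std::io;
use std::process::{Command, Stdio};

/// Runs `producer | consumer` without a shell and returns the consumer's stdout.
/// Each slice is a program name followed by its arguments (must be non-empty).
fn pipe_between(producer: &[&str], consumer: &[&str]) -> io::Result<Vec<u8>> {
    let mut first = Command::new(producer[0])
        .args(&producer[1..])
        .stdout(Stdio::piped())
        .spawn()?;

    // Take ownership of the read end of the producer's stdout pipe.
    let first_stdout = first.stdout.take().expect("producer stdout was not piped");

    // Give that pipe to the consumer as its stdin; the bytes never pass
    // through this program's own buffers.
    let output = Command::new(consumer[0])
        .args(&consumer[1..])
        .stdin(Stdio::from(first_stdout))
        .output()?;

    first.wait()?;
    Ok(output.stdout)
}

fn main() -> io::Result<()> {
    // Hypothetical commands, chosen only for illustration.
    let out = pipe_between(&["echo", "hello"], &["tr", "a-z", "A-Z"])?;
    print!("{}", String::from_utf8_lossy(&out));
    Ok(())
}
```

Here the operating system moves the data between the two processes; the Rust
program only wires the pipe ends together and waits.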

## Making your own “filter” binary

If you want your Rust binary to be the thing that “processes stdout from another
binary”, just read from stdin and write to stdout:

```rust
use std::io::{self, BufRead, Write};

fn main() -> io::Result<()> {
    let stdin = io::stdin();
    let stdout = io::stdout();
    let mut out = stdout.lock();

    for line in stdin.lock().lines() {
        let line = line?;
        // transform line
        let processed = line.replace("foo", "bar");
        writeln!(out, "{processed}")?;
    }

    Ok(())
}
```

Then in a shell you can do `producer_cmd | your_rust_filter | consumer_cmd`.
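
One possible refinement, sketched here under the assumption that you want to
exercise the transformation without spawning a process, is to move the loop into
a function that is generic over the reader and writer. The name `run_filter` is
hypothetical.

```rust
use std::io::{self, BufRead, Write};

/// Copies `input` to `output` line by line, replacing "foo" with "bar".
/// Generic over reader and writer, so it accepts stdin/stdout or in-memory buffers.
fn run_filter<R: BufRead, W: Write>(input: R, mut output: W) -> io::Result<()> {
    for line in input.lines() {
        let line = line?;
        writeln!(output, "{}", line.replace("foo", "bar"))?;
    }
    output.flush()
}

fn main() -> io::Result<()> {
    let stdin = io::stdin();
    let stdout = io::stdout();
    // Same behavior as the filter above, now routed through the helper.
    run_filter(stdin.lock(), stdout.lock())
}
```

In a test, a `std::io::Cursor` can stand in for stdin and a `Vec<u8>` for stdout,
so the filter logic can be checked without a shell pipeline.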

## Use processed data as stdin to another binary

To “chain” binaries but keep Rust in the middle, you can run a second command
and write your processed data into its stdin.

```rust
use std::io::{self, BufRead, BufReader, Write};
use std::process::{Command, Stdio};

fn main() -> io::Result<()> {
    // First binary: produces data
    let mut producer = Command::new("producer_cmd")
        .arg("some_arg")
        .stdout(Stdio::piped())
        .spawn()?;

    let producer_stdout = producer
        .stdout
        .take()
        .expect("failed to capture producer stdout");
    let reader = BufReader::new(producer_stdout);

    // Second binary: consumes processed data
    let mut consumer = Command::new("consumer_cmd")
        .arg("another_arg")
        .stdin(Stdio::piped())
        .spawn()?;

    let consumer_stdin = consumer
        .stdin
        .take()
        .expect("failed to open consumer stdin");

    // Process loop: read from producer, transform, send to consumer
    let mut writer = consumer_stdin;
    for line in reader.lines() {
        let line = line?;
        let processed = format!("prefix: {line}\n"); // any transformation
        writer.write_all(processed.as_bytes())?;
    }
    // Close consumer stdin so it can finish
    drop(writer);

    // Wait for both processes to exit
    let prod_status = producer.wait()?;
    let cons_status = consumer.wait()?;

    eprintln!("producer: {prod_status}, consumer: {cons_status}");

    Ok(())
}
```

Conceptually this is equivalent to
`producer_cmd | my_rust_filter | consumer_cmd`, except that the Rust program
spawns both ends and connects the pipes itself, so you never rely on shell
piping.

## Async Pipes with Tokio

Use Tokio's async I/O to pipe binary data. `tokio::process::Command` mirrors the
sync API but uses the `AsyncRead`/`AsyncWrite` traits, letting you `.await`
reads and writes without blocking threads.

```rust
use std::process::Stdio; // Tokio's Command takes the standard library's Stdio
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;

#[tokio::main]
async fn main() {
    let mut child = Command::new("xxd")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .expect("Failed to spawn");

    let binary_data: Vec<u8> = (0u8..=255).collect(); // 256 bytes of raw binary

    // Write asynchronously. Writing everything before reading is fine for a
    // small input like this; large inputs should be written and read concurrently.
    if let Some(mut stdin) = child.stdin.take() {
        stdin.write_all(&binary_data).await.expect("Write failed");
        // `stdin` is dropped at the end of this block, which closes the pipe (EOF)
    }

    // Read asynchronously
    let mut output = Vec::new();
    if let Some(mut stdout) = child.stdout.take() {
        stdout.read_to_end(&mut output).await.expect("Read failed");
    }

    child.wait().await.expect("Child failed");
    println!("Received {} bytes of output", output.len());
}
```

For streaming, long-running binaries, consider `tokio_process_tools`, which wraps
Tokio's process API with real-time line/byte inspection via channels.

## Fan-In: Many Binaries → One Receiver

This is a classic multi-producer, single-consumer (MPSC) pattern. You don't need
special OS-level listeners; Tokio's `mpsc` channel is purpose-built for this.

The architecture:

- Each producer binary is spawned as a child process with `stdout(Stdio::piped())`

- Each gets a clone of the `mpsc::Sender`

- A single async task owns the `Receiver` and processes all incoming data

```rust
use std::process::Stdio;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<(usize, Vec<u8>)>(32);

    let producers = vec!["producer_a", "producer_b", "producer_c"];

    for (id, binary) in producers.iter().enumerate() {
        let tx = tx.clone(); // clone sender for each producer

        let mut child = Command::new(binary)
            .stdout(Stdio::piped())
            .spawn()
            .expect("Failed to spawn producer");

        tokio::spawn(async move {
            let mut stdout = BufReader::new(child.stdout.take().unwrap());

            loop {
                let mut chunk = vec![0u8; 4096];
                match stdout.read(&mut chunk).await {
                    Ok(0) => break, // EOF
                    Ok(n) => {
                        chunk.truncate(n); // keep only the bytes actually read
                        tx.send((id, chunk)).await.unwrap();
                    }
                    Err(e) => {
                        eprintln!("Error from producer {}: {}", id, e);
                        break;
                    }
                }
            }
            child.wait().await.unwrap();
        });
    }

    drop(tx); // drop original sender so rx knows when all producers are done

    // Single receiver handles all producers
    while let Some((producer_id, data)) = rx.recv().await {
        println!("Producer {}: {} bytes", producer_id, data.len());
        // process binary data here...
    }
}
```

Do you need an event listener? Not in the traditional sense. The Tokio runtime
is itself event-driven: when no data is available, tasks are suspended and the
CPU is yielded rather than wasted in a spin loop.
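
The same fan-in shape can be sketched without an async runtime. The example
below swaps in the standard library's `std::sync::mpsc` and OS threads in place
of Tokio, and uses in-memory byte vectors as stand-ins for child processes; the
function name `fan_in` and its parameters are hypothetical. It illustrates the
sender-clone and drop pattern only, not a replacement for the Tokio version.

```rust
use std::sync::mpsc;
use std::thread;

/// Fans in chunks from several in-memory "producers" to a single receiver.
/// Returns the total number of bytes received per producer id.
/// `chunk_size` must be non-zero.
fn fan_in(sources: Vec<Vec<u8>>, chunk_size: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel::<(usize, Vec<u8>)>();
    let mut totals = vec![0usize; sources.len()];

    for (id, data) in sources.into_iter().enumerate() {
        let tx = tx.clone(); // one sender clone per producer
        thread::spawn(move || {
            for chunk in data.chunks(chunk_size) {
                // Stop quietly if the receiver has gone away.
                if tx.send((id, chunk.to_vec())).is_err() {
                    break;
                }
            }
            // This producer's `tx` is dropped when the closure returns.
        });
    }

    drop(tx); // drop the original sender so the loop below can end

    // Iterating the receiver blocks until a message arrives (no spin loop)
    // and finishes once every sender has been dropped.
    for (id, chunk) in rx {
        totals[id] += chunk.len();
    }
    totals
}

fn main() {
    let totals = fan_in(vec![vec![1; 10], vec![2; 3], Vec::new()], 4);
    println!("{totals:?}"); // prints [10, 3, 0]
}
```

As in the Tokio version, forgetting `drop(tx)` would leave one sender alive and
the receive loop would never finish.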

# Delimiter

A delimiter is a character or group of characters that marks where one piece of
data ends and the next one begins.

In computing and data formats, common delimiters include commas, tabs, spaces,
semicolons, and pipes; in CSV files, for example, commas separate the values.
General English dictionaries define a delimiter as a character that marks the
beginning or end of a unit of data. In maths and programming, symbols like
parentheses `()`, quotes `" "` and braces `{}` also act as delimiters because
they enclose or bound expressions or code blocks.
    215 enclose or bound expressions or code blocks.