# Data Streams

## Data Streams and Pipe

<img src="/whiteboards/data_stream.png" alt="Data Stream" width="100%">

## Passing Data Streams between Rust binaries

We can pass data directly between binaries through pipes. The data doesn't have
to go through a networking layer such as an HTTP server listening on a port,
which adds unnecessary overhead when it isn't needed.

<img src="/whiteboards/pipe_between_binaries.png" alt="Pipe Between Binaries" width="100%">

## Making your own “filter” binary

If you want your Rust binary to be the thing that “processes stdout from another
binary”, just read from stdin and write to stdout:

```rust
use std::io::{self, BufRead, Write};

fn main() -> io::Result<()> {
    let stdin = io::stdin();
    let stdout = io::stdout();
    let mut out = stdout.lock();

    for line in stdin.lock().lines() {
        let line = line?;
        // transform line
        let processed = line.replace("foo", "bar");
        writeln!(out, "{processed}")?;
    }

    Ok(())
}
```

Then in a shell you can do `producer_cmd | your_rust_filter | consumer_cmd`.
Note that `lines()` expects UTF-8 text and returns an error on invalid input,
so for arbitrary binary data read raw bytes instead, as the Tokio examples below
do.

## Use processed data as stdin to another binary

To “chain” binaries but keep Rust in the middle, you can run a second command
and write your processed data into its stdin.
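Whichever end the data comes from, the middle step is the same read, transform,
write loop. A minimal sketch of that loop as a generic helper over `BufRead`
and `Write` (`filter_lines` is a hypothetical name, not a standard library
API), driven here from an in-memory buffer so it can be tested without any
child processes:

```rust
use std::io::{self, BufRead, Write};

/// Hypothetical helper: read lines from any `BufRead`, apply `transform`,
/// and write each result followed by '\n' to any `Write`.
fn filter_lines<R, W, F>(reader: R, mut writer: W, mut transform: F) -> io::Result<()>
where
    R: BufRead,
    W: Write,
    F: FnMut(&str) -> String,
{
    for line in reader.lines() {
        let line = line?;
        writeln!(writer, "{}", transform(&line))?;
    }
    // Push out anything the writer may have buffered
    writer.flush()
}

fn main() -> io::Result<()> {
    // `&[u8]` implements `BufRead`, and `&mut Vec<u8>` implements `Write`
    let input = "foo one\nfoo two\n";
    let mut output = Vec::new();
    filter_lines(input.as_bytes(), &mut output, |l| l.replace("foo", "bar"))?;
    print!("{}", String::from_utf8_lossy(&output));
    Ok(())
}
```

The same function accepts real pipes: a `BufReader` around a child's
`ChildStdout` as the reader and another child's `ChildStdin` as the writer, or
`io::stdin().lock()` and `io::stdout().lock()` for the plain filter binary.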
```rust
use std::io::{self, BufRead, BufReader, Write};
use std::process::{Command, Stdio};

fn main() -> io::Result<()> {
    // First binary: produces data
    let mut producer = Command::new("producer_cmd")
        .arg("some_arg")
        .stdout(Stdio::piped())
        .spawn()?;

    let producer_stdout = producer
        .stdout
        .take()
        .expect("failed to capture producer stdout");
    let reader = BufReader::new(producer_stdout);

    // Second binary: consumes processed data
    let mut consumer = Command::new("consumer_cmd")
        .arg("another_arg")
        .stdin(Stdio::piped())
        .spawn()?;

    let mut writer = consumer
        .stdin
        .take()
        .expect("failed to open consumer stdin");

    // Process loop: read from producer, transform, send to consumer
    for line in reader.lines() {
        let line = line?;
        let processed = format!("prefix: {line}\n"); // any transformation
        writer.write_all(processed.as_bytes())?;
    }
    // Close consumer stdin (sends EOF) so it can finish
    drop(writer);

    // Wait for both processes to exit
    let prod_status = producer.wait()?;
    let cons_status = consumer.wait()?;

    eprintln!("producer: {prod_status}, consumer: {cons_status}");

    Ok(())
}
```

Conceptually this is equivalent to
`producer_cmd | your_rust_filter | consumer_cmd`, but here a single Rust program
spawns both commands and does the filtering itself, so no shell pipeline is
involved.

## Async Pipes with Tokio

Use Tokio's async I/O to pipe binary data. `tokio::process::Command` mirrors the
synchronous `std::process::Command` API, but the child's pipes implement
`AsyncRead`/`AsyncWrite`, so you can `.await` reads and writes without blocking
a thread.
```rust
use std::process::Stdio; // tokio::process does not re-export Stdio
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut child = Command::new("xxd")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    let binary_data: Vec<u8> = (0u8..=255).collect(); // 256 bytes of raw binary

    // Write from a separate task so writing and reading run concurrently
    let mut stdin = child.stdin.take().expect("stdin was piped");
    let writer = tokio::spawn(async move {
        stdin.write_all(&binary_data).await?;
        // `stdin` is dropped when this task ends, closing the pipe (EOF)
        Ok::<(), std::io::Error>(())
    });

    // Read asynchronously while the writer task runs
    let mut output = Vec::new();
    let mut stdout = child.stdout.take().expect("stdout was piped");
    stdout.read_to_end(&mut output).await?;

    writer.await.expect("writer task panicked")?;
    let status = child.wait().await?;
    println!("xxd exited with {status}; received {} bytes of output", output.len());
    Ok(())
}
```

Writing from its own task matters once the data outgrows the OS pipe buffer: if
the parent blocks writing to stdin while the child blocks writing to a full
stdout pipe, neither side makes progress.

For streaming, long-running binaries, use `tokio_process_tools`, which wraps
Tokio's process API with real-time line/byte inspection via channels.

## Fan-In: Many Binaries → One Receiver

This is a classic multi-producer, single-consumer (MPSC) pattern. You don't need
special OS-level listeners: Tokio's `mpsc` channel is built for exactly this.
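The channel pattern itself does not depend on Tokio or on child processes. As a
dependency-free sketch, here is the same fan-in shape with the standard
library's `std::sync::mpsc`, where threads stand in for producer binaries and
the chunk sizes are made up for illustration (`fan_in` is a hypothetical name):

```rust
use std::sync::mpsc;
use std::thread;

/// Several producers send (id, chunk) pairs into one channel and a single
/// consumer drains it. Returns the total bytes received from each producer.
fn fan_in(producer_count: usize) -> Vec<usize> {
    // Bounded, like Tokio's `channel(32)`: senders block while it is full
    let (tx, rx) = mpsc::sync_channel::<(usize, Vec<u8>)>(32);

    let mut handles = Vec::new();
    for id in 0..producer_count {
        let tx = tx.clone(); // each producer gets its own sender
        handles.push(thread::spawn(move || {
            // Stand-in for reading a child's stdout: 3 chunks of id + 1 bytes
            for _ in 0..3 {
                tx.send((id, vec![id as u8; id + 1])).expect("receiver alive");
            }
            // `tx` is dropped when the thread ends
        }));
    }
    drop(tx); // otherwise the loop below never sees the channel close

    let mut totals = vec![0; producer_count];
    // Ends once every sender (all clones) has been dropped
    for (id, chunk) in rx {
        totals[id] += chunk.len();
    }
    for h in handles {
        h.join().expect("producer thread panicked");
    }
    totals
}

fn main() {
    println!("{:?}", fan_in(3)); // prints [3, 6, 9]
}
```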
The architecture:

- Each producer binary is spawned as a child process with `stdout(Stdio::piped())`.

- Each producer's reader task gets a clone of the `mpsc::Sender`.

- A single async task owns the `Receiver` and processes all incoming data.

```rust
use std::process::Stdio;
use tokio::io::AsyncReadExt;
use tokio::process::Command;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // (producer id, raw bytes); bounded, so fast producers wait for the receiver
    let (tx, mut rx) = mpsc::channel::<(usize, Vec<u8>)>(32);

    let producers = ["producer_a", "producer_b", "producer_c"];

    for (id, binary) in producers.iter().enumerate() {
        let tx = tx.clone(); // clone sender for each producer

        let mut child = Command::new(binary)
            .stdout(Stdio::piped())
            .spawn()
            .expect("Failed to spawn producer");

        tokio::spawn(async move {
            let mut stdout = child.stdout.take().expect("stdout was piped");

            loop {
                let mut chunk = vec![0u8; 4096];
                match stdout.read(&mut chunk).await {
                    Ok(0) => break, // EOF
                    Ok(n) => {
                        chunk.truncate(n);
                        // Stop early if the receiver has been dropped
                        if tx.send((id, chunk)).await.is_err() {
                            break;
                        }
                    }
                    Err(e) => {
                        eprintln!("Error from producer {id}: {e}");
                        break;
                    }
                }
            }
            // Wait for the child process to exit
            if let Err(e) = child.wait().await {
                eprintln!("Failed to wait for producer {id}: {e}");
            }
        });
    }

    drop(tx); // drop the original sender so rx knows when all producers are done

    // Single receiver handles all producers
    while let Some((producer_id, data)) = rx.recv().await {
        println!("Producer {producer_id}: {} bytes", data.len());
        // process binary data here...
    }
}
```

Do you need an event listener? Not in the traditional sense. Tokio's runtime is
itself an event-driven loop: when a pipe has no data, the task reading it is
suspended and woken only when more data arrives, so the CPU is yielded rather
than burned in a spin loop.

# Delimiter

A delimiter is a character or group of characters that marks where one piece of
data ends and the next one begins.
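A pipe carries an unstructured stream of bytes, so a delimiter is how the reader
finds where each record ends. A minimal sketch using the standard library's
`BufRead::split`, which yields each record with the delimiter byte removed
(`read_records` is a hypothetical helper name):

```rust
use std::io::{self, BufRead, BufReader, Read};

/// Split a byte stream into records terminated by `delim`.
/// The delimiter is stripped; a final record without one is still returned.
fn read_records<R: Read>(source: R, delim: u8) -> io::Result<Vec<Vec<u8>>> {
    BufReader::new(source).split(delim).collect()
}

fn main() -> io::Result<()> {
    // Newline-delimited records, as produced by most line-oriented tools
    let lines = read_records(&b"alpha\nbeta\ngamma"[..], b'\n')?;
    // NUL-delimited records, useful when a record may itself contain '\n'
    let names = read_records(&b"a file\0with\nnewline\0"[..], 0)?;

    for rec in lines.iter().chain(&names) {
        println!("{:?}", String::from_utf8_lossy(rec));
    }
    Ok(())
}
```

NUL is a common choice when records may contain newlines, which is why
`find -print0` pairs with `xargs -0`.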
In computing and data formats, common delimiters include commas, tabs, spaces,
semicolons, and the pipe character (`|`); in a CSV file, for example, commas
separate the values in each row. General English dictionaries define a delimiter
as a character that marks the beginning or end of a unit of data. In mathematics
and programming, paired symbols such as parentheses `()`, quotes `" "`, and
braces `{}` also act as delimiters, because they enclose, or bound, an
expression or a block of code. The line-based filters earlier in this document
rely on a delimiter too: `lines()` splits the incoming stream at each newline
(`\n`).
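Quotes show that a delimiter's meaning can depend on context: a comma inside a
quoted CSV field is data, not a separator. A simplified, quote-aware splitter
sketch (`split_fields` is a hypothetical name; it ignores escaped quotes and
multi-line fields, both of which real CSV allows, so a dedicated parser such as
the `csv` crate is the safer choice in practice):

```rust
/// Split one line on `delim`, treating text inside double quotes as part of
/// a single field. Simplified: no escaped quotes, no multi-line fields.
fn split_fields(line: &str, delim: char) -> Vec<String> {
    let mut fields = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;

    for c in line.chars() {
        match c {
            // Quotes bound a field; toggle state and drop the quote itself
            '"' => in_quotes = !in_quotes,
            // A delimiter outside quotes ends the current field
            c if c == delim && !in_quotes => {
                fields.push(std::mem::take(&mut current));
            }
            c => current.push(c),
        }
    }
    fields.push(current); // the last field has no trailing delimiter
    fields
}

fn main() {
    let line = r#"42,"Smith, Jane",London"#;
    // A naive split also cuts inside the quoted name
    let naive: Vec<&str> = line.split(',').collect();
    println!("naive:       {naive:?}");
    println!("quote-aware: {:?}", split_fields(line, ','));
    println!("tab:         {:?}", split_fields("a\tb\tc", '\t'));
}
```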