995 Pipeline Stages
Tutorial
The Problem
Build a composable N-stage streaming pipeline where each stage is independently reusable: map_stage, filter_stage, flat_map_stage, and batch_stage. Each stage consumes a Receiver and returns a new Receiver (Receiver<U>, or Receiver<Vec<T>> for batch_stage) that can be threaded into the next stage. This is a richer version of example 984, adding filtering, flat-mapping, and batching primitives.
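The code listing in this example implements map_stage, filter_stage, and flat_map_stage but not batch_stage. Here is a minimal sketch of batch_stage with the signature given in the learning outcomes. It assumes that a final partial batch should still be forwarded when the upstream channel closes, which the original does not specify. It uses std::mem::replace to swap in a fresh buffer, matching the approach named in the Key Differences table.

```rust
use std::sync::mpsc;
use std::thread;

// Sketch of batch_stage: group upstream items into Vecs of length `n`.
// Assumption: when the upstream channel closes, a final partial batch is still forwarded.
fn batch_stage<T>(rx: mpsc::Receiver<T>, n: usize) -> mpsc::Receiver<Vec<T>>
where
    T: Send + 'static,
{
    assert!(n > 0, "batch size must be at least 1");
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        let mut buf = Vec::with_capacity(n);
        for item in rx.iter() {
            buf.push(item);
            if buf.len() == n {
                // Swap in an empty buffer and ship the full one downstream.
                let full = std::mem::replace(&mut buf, Vec::with_capacity(n));
                if tx.send(full).is_err() {
                    return; // downstream Receiver was dropped
                }
            }
        }
        // Upstream closed: flush whatever is left (skipped if the buffer is empty).
        if !buf.is_empty() {
            let _ = tx.send(buf);
        }
    });
    out
}
```

For example, sending 1..=7 into batch_stage(rx, 3) and then dropping the sender yields [1, 2, 3], [4, 5, 6], [7]. When the item count is an exact multiple of n, no empty trailing batch is sent.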
🎯 Learning Outcomes
- map_stage<T, U, F>(rx, f) -> Receiver<U>: already seen in 984; revisit it as a building block
- filter_stage<T, F: Fn(&T) -> bool>(rx, pred) -> Receiver<T>: forward only matching items
- flat_map_stage<T, U, F: Fn(T) -> Vec<U>>(rx, f) -> Receiver<U>: expand one item into many
- batch_stage<T>(rx, n) -> Receiver<Vec<T>>: collect n items, then forward them as one batch
Code Example
#![allow(clippy::all)]
// 995: N-Stage Streaming Pipeline
// Each stage is a thread + channel — filter/map/transform stages
use std::sync::mpsc;
use std::thread;
// --- Map stage: applies f to each item ---
fn map_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
T: Send + 'static,
U: Send + 'static,
F: Fn(T) -> U + Send + 'static,
{
let (tx, out) = mpsc::channel();
thread::spawn(move || {
for item in rx.iter() {
// send fails only if the downstream Receiver was dropped; stop instead of panicking
if tx.send(f(item)).is_err() {
break;
}
}
});
out
}
// --- Filter stage: only forward items where pred is true ---
fn filter_stage<T, F>(rx: mpsc::Receiver<T>, pred: F) -> mpsc::Receiver<T>
where
T: Send + 'static,
F: Fn(&T) -> bool + Send + 'static,
{
let (tx, out) = mpsc::channel();
thread::spawn(move || {
for item in rx.iter() {
if pred(&item) && tx.send(item).is_err() {
break; // downstream Receiver was dropped
}
}
});
out
}
// --- Flat-map stage: one item → multiple outputs ---
fn flat_map_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
T: Send + 'static,
U: Send + 'static,
F: Fn(T) -> Vec<U> + Send + 'static,
{
let (tx, out) = mpsc::channel();
thread::spawn(move || {
for item in rx.iter() {
for v in f(item) {
if tx.send(v).is_err() {
return; // downstream Receiver was dropped
}
}
}
});
out
}
// --- Build a multi-stage pipeline ---
fn pipeline_even_squares() -> Vec<String> {
let (tx, rx) = mpsc::channel::<i32>();
// Stage 1: square
let rx1 = map_stage(rx, |x| x * x);
// Stage 2: keep even
let rx2 = filter_stage(rx1, |x| x % 2 == 0);
// Stage 3: to string
let rx3 = map_stage(rx2, |x: i32| x.to_string());
// Producer
let h = thread::spawn(move || {
for i in 1..=10 {
tx.send(i).unwrap();
}
});
let results: Vec<String> = rx3.iter().collect();
h.join().unwrap();
results
}
// --- More complex: tokenize → filter stop words → count ---
fn word_count_pipeline(text: &str) -> usize {
let stop_words = ["the", "a", "an", "is", "in", "of", "to"];
let words: Vec<String> = text.split_whitespace().map(|w| w.to_lowercase()).collect();
let (tx, rx) = mpsc::channel::<String>();
// Stage 1: drop stop words
let rx1 = filter_stage(rx, move |w: &String| !stop_words.contains(&w.as_str()));
// Stage 2: drop empty strings (defensive; split_whitespace never yields them)
let rx2 = filter_stage(rx1, |w: &String| !w.is_empty());
// Stage 3: map each surviving word to 1 so the sink can sum them
let rx3 = map_stage(rx2, |_: String| 1usize);
let h = thread::spawn(move || {
for w in words {
tx.send(w).unwrap();
}
});
let count: usize = rx3.iter().sum();
h.join().unwrap();
count
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pipeline_even_squares() {
let results = pipeline_even_squares();
// Squares of 1..10: 1,4,9,16,25,36,49,64,81,100
// Even: 4,16,36,64,100
assert_eq!(results, vec!["4", "16", "36", "64", "100"]);
}
#[test]
fn test_map_stage() {
let (tx, rx) = mpsc::channel::<i32>();
let out = map_stage(rx, |x| x * 2);
for i in [1, 2, 3] {
tx.send(i).unwrap();
}
drop(tx);
let results: Vec<i32> = out.iter().collect();
assert_eq!(results, vec![2, 4, 6]);
}
#[test]
fn test_filter_stage() {
let (tx, rx) = mpsc::channel::<i32>();
let out = filter_stage(rx, |x| x % 2 == 0);
for i in 1..=6 {
tx.send(i).unwrap();
}
drop(tx);
let results: Vec<i32> = out.iter().collect();
assert_eq!(results, vec![2, 4, 6]);
}
#[test]
fn test_flat_map_stage() {
let (tx, rx) = mpsc::channel::<i32>();
let out = flat_map_stage(rx, |x| vec![x, x * 10]);
for i in [1, 2, 3] {
tx.send(i).unwrap();
}
drop(tx);
let results: Vec<i32> = out.iter().collect();
assert_eq!(results, vec![1, 10, 2, 20, 3, 30]);
}
#[test]
fn test_word_count_pipeline() {
let count = word_count_pipeline("the quick brown fox jumps over the lazy dog");
// 9 words; the stop word "the" appears twice -> 7 content words
assert_eq!(count, 7);
}
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Channel close | Automatic on Sender drop | Manual None sentinel |
| Filter predicate | Fn(&T) -> bool (borrows) | 'a -> bool (same concept) |
| Flat-map expansion | Vec<U> return, forward each | list return, forward each |
| Batch | std::mem::replace to swap | Buffer or Queue based |
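Each stage in this pipeline runs on its own OS thread, so stages execute concurrently and, on a multi-core machine, in parallel. The channel between two stages is a FIFO queue. mpsc::channel() is unbounded, so a fast producer can run arbitrarily far ahead of a slow consumer; use mpsc::sync_channel(n) to bound the queue at n items and get backpressure.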
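Here is a minimal sketch of a bounded stage, assuming you want backpressure on a stage's output. map_stage_bounded and bounded_queue_fills_up are hypothetical names that are not part of the original listing. The only change from map_stage is mpsc::sync_channel(cap) in place of mpsc::channel(), so send blocks while cap results are already waiting. The try_send check makes the bound directly visible.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
use std::thread;

// Hypothetical bounded variant of map_stage: identical except that the output
// channel is mpsc::sync_channel(cap), so at most `cap` results wait in the queue.
fn map_stage_bounded<T, U, F>(rx: Receiver<T>, cap: usize, f: F) -> Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx, out): (SyncSender<U>, Receiver<U>) = mpsc::sync_channel(cap);
    thread::spawn(move || {
        for item in rx.iter() {
            // send() blocks while the queue is full: this is the backpressure.
            if tx.send(f(item)).is_err() {
                break; // downstream Receiver was dropped
            }
        }
    });
    out
}

// With no one receiving, a sync_channel(2) accepts two items and then reports Full.
fn bounded_queue_fills_up() -> bool {
    let (tx, _rx) = mpsc::sync_channel::<i32>(2); // `_rx` keeps the channel open
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    matches!(tx.try_send(3), Err(TrySendError::Full(3)))
}
```

A capacity of 0 turns sync_channel into a rendezvous channel, where every send waits for a matching recv. Small capacities keep memory bounded at the cost of more blocking between stages.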
OCaml Approach
let map_stage rx f =
  let out = Domainslib.Chan.make_unbounded () in
  Domain.spawn (fun () ->
    let rec loop () =
      match Domainslib.Chan.recv rx with
      | None -> Domainslib.Chan.send out None (* forward end-of-stream *)
      | Some v -> Domainslib.Chan.send out (Some (f v)); loop ()
    in
    loop ())
  |> ignore;
  out

let filter_stage rx pred =
  let out = Domainslib.Chan.make_unbounded () in
  Domain.spawn (fun () ->
    let rec loop () =
      match Domainslib.Chan.recv rx with
      | None -> Domainslib.Chan.send out None (* forward end-of-stream *)
      | Some v ->
        if pred v then Domainslib.Chan.send out (Some v);
        loop ()
    in
    loop ())
  |> ignore;
  out
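OCaml sends None as an end-of-stream sentinel because Domainslib.Chan has no close operation, so a channel never closes when its owner goes away. Rust's channel closes once every Sender (including clones) has been dropped. Ownership therefore provides the sentinel automatically, and rx.iter() simply ends.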
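Here is a small sketch of the drop-based close in isolation. It assumes two producers share one channel through Sender::clone, and the function names are illustrative only. No sentinel value is ever sent: the consumer's loop ends only after the last Sender is gone, and recv reports disconnection once the buffered items are drained.

```rust
use std::sync::mpsc;
use std::thread;

// Two producers share one channel through Sender::clone. No sentinel is ever sent:
// rx.iter() ends once the last Sender has been dropped.
fn collect_from_two_producers() -> Vec<i32> {
    let (tx, rx) = mpsc::channel::<i32>();
    let tx2 = tx.clone();
    let a = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
        }
        // `tx` is dropped when this closure returns
    });
    let b = thread::spawn(move || {
        for i in 10..13 {
            tx2.send(i).unwrap();
        }
    });
    let mut got: Vec<i32> = rx.iter().collect(); // blocks until both senders are gone
    a.join().unwrap();
    b.join().unwrap();
    got.sort(); // the two producers interleave nondeterministically
    got
}

// Once every Sender is dropped, recv() drains buffered items and then returns Err.
fn recv_after_close() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    tx.send(7).unwrap();
    drop(tx);
    rx.recv() == Ok(7) && rx.recv().is_err()
}
```

The same property explains why the stage functions need no explicit close. When a stage's input loop ends, its thread returns and drops its own Sender, which closes the next channel in turn.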
Deep Comparison
N-Stage Streaming Pipeline — Comparison
Core Insight
Channel pipelines behave like lazy iterators whose stages run concurrently: while stage N processes item K, stage N+1 can process item K-1. This is the concurrent counterpart of s |> Seq.map f |> Seq.filter p |> Seq.map g.
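The equivalence can be checked directly. The sketch below assumes a hypothetical generic helper, stage, whose closure receives each item plus the output Sender and may emit zero, one, or many values, so map, filter, and flat-map are all special cases of it. It builds the square / keep-even / to-string pipeline from the code example on threads and compares the result with the equivalent sequential iterator chain. The names stage, channel_version, and iterator_version are illustrative, not from the original listing.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

// Hypothetical generic stage: `step` receives each item plus the output Sender
// and may emit zero, one, or many values. map/filter/flat_map are special cases.
fn stage<T, U, F>(rx: Receiver<T>, mut step: F) -> Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: FnMut(T, &Sender<U>) + Send + 'static,
{
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            step(item, &tx);
        }
    });
    out
}

// Channel pipeline: square -> keep even -> to string, each step on its own thread.
fn channel_version() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<i32>();
    // Each `let _ =` ignores a send error, which only happens if downstream hung up.
    let squared = stage(rx, |x: i32, out: &Sender<i32>| {
        let _ = out.send(x * x);
    });
    let even = stage(squared, |x: i32, out: &Sender<i32>| {
        if x % 2 == 0 {
            let _ = out.send(x);
        }
    });
    let strings = stage(even, |x: i32, out: &Sender<String>| {
        let _ = out.send(x.to_string());
    });
    for i in 1..=10 {
        tx.send(i).unwrap();
    }
    drop(tx); // closing the source lets every stage drain and exit in turn
    strings.iter().collect()
}

// Sequential iterator chain with the same shape as s |> Seq.map f |> Seq.filter p |> Seq.map g.
fn iterator_version() -> Vec<String> {
    (1..=10)
        .map(|x: i32| x * x)
        .filter(|x| x % 2 == 0)
        .map(|x| x.to_string())
        .collect()
}
```

Both versions produce 4, 16, 36, 64, 100 in order. Each single-threaded stage preserves FIFO order, so the threaded pipeline matches the sequential chain item for item. It simply overlaps the work of different stages.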
OCaml Approach
- make_stage in_c f creates an output channel and spawns a thread
- make_filter in_c pred filters items, passing only the matching ones
- Stages compose by nesting: let c2 = make_stage (make_stage c0 f) g
Rust Approach
- map_stage(rx, f) and filter_stage(rx, pred) each take a Receiver and return a new Receiver
- Stages compose by nesting: map_stage(filter_stage(rx, pred), f)
- flat_map_stage handles one-to-many expansion
- for item in rx.iter() ends the stage loop when the upstream channel closes
Comparison Table
| Stage type | OCaml | Rust |
|---|---|---|
| Map | make_stage in_c f | map_stage(rx, f) |
| Filter | make_filter in_c pred | filter_stage(rx, pred) |
| Flat-map | Custom stage with multiple sends | flat_map_stage(rx, f) |
| Compose 2 stages | make_stage (make_stage c0 f) g | map_stage(map_stage(rx, f), g) |
| Backpressure | Bounded queue (manual) | sync_channel(n) for stage tx |
| Stage count | Any N | Any N |
| Channel close | Explicit close_chan out_c | Drop tx (RAII) |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (channel unbounded, sync_channel bounded) | tokio::sync::mpsc (channel bounded, unbounded_channel; async send/recv) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
- Implement reduce_stage<T, R, F>(rx, init, f) -> R as the terminal sink stage.
- Implement tee_stage<T: Clone>(rx) -> (Receiver<T>, Receiver<T>), which broadcasts each item to two downstream stages.
- Switch the stage channels to sync_channel(64) and observe the producer slowing down.