984 Channel Pipeline
Tutorial
The Problem
Build a multi-stage processing pipeline where each stage reads from an input channel, applies a transformation, and writes to an output channel. Each stage runs in its own thread. Implement pipeline_stage<T, U, F> as a reusable building block that returns the output Receiver, enabling declarative pipeline construction.
🎯 Learning Outcomes
- Write pipeline_stage<T, U, F>(rx: Receiver<T>, f: F) -> Receiver<U> that spawns a worker thread
- Compose stages declaratively: rx1 = pipeline_stage(rx0, double); rx2 = pipeline_stage(rx1, add1)
- Use rx.iter() inside each stage — it naturally stops when the upstream channel closes
- Rely on dropping tx_out when the thread exits to close the downstream channel automatically
- Compare with OCaml's Lwt_stream.map
Code Example
#![allow(clippy::all)]
// 984: Channel Pipeline
// Chain of processing stages via mpsc channels
use std::sync::mpsc;
use std::thread;

// --- Build a pipeline stage: read from rx, apply f, send to tx ---
fn pipeline_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx_out, rx_out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            // iter() stops when channel closes
            tx_out.send(f(item)).unwrap();
        }
        // tx_out drops here → closes next stage
    });
    rx_out
}

// --- Build a fixed three-stage pipeline ---
fn run_pipeline(inputs: Vec<i32>) -> Vec<String> {
    let (tx_source, rx0) = mpsc::channel::<i32>();
    // Stage 1: double
    let rx1 = pipeline_stage(rx0, |x| x * 2);
    // Stage 2: add 1
    let rx2 = pipeline_stage(rx1, |x| x + 1);
    // Stage 3: to string
    let rx3 = pipeline_stage(rx2, |x: i32| x.to_string());
    // Producer
    let producer = thread::spawn(move || {
        for v in inputs {
            tx_source.send(v).unwrap();
        }
        // tx_source drops → closes pipeline
    });
    // Collect results
    let results: Vec<String> = rx3.iter().collect();
    producer.join().unwrap();
    results
}

// --- Parameterised N-stage pipeline built from a Vec of boxed functions ---
fn run_n_stages(
    inputs: Vec<i32>,
    stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>>,
) -> Vec<i32> {
    let (tx_source, mut current_rx) = mpsc::channel::<i32>();
    for f in stages {
        current_rx = pipeline_stage(current_rx, f);
    }
    let producer = thread::spawn(move || {
        for v in inputs {
            tx_source.send(v).unwrap();
        }
    });
    let results: Vec<i32> = current_rx.iter().collect();
    producer.join().unwrap();
    results
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_3_stages() {
        let results = run_pipeline(vec![1, 2, 3, 4, 5]);
        // 1->2->3, 2->4->5, 3->6->7, 4->8->9, 5->10->11
        assert_eq!(results, vec!["3", "5", "7", "9", "11"]);
    }

    #[test]
    fn test_pipeline_empty() {
        let results = run_pipeline(vec![]);
        assert!(results.is_empty());
    }

    #[test]
    fn test_pipeline_single_item() {
        let results = run_pipeline(vec![5]);
        assert_eq!(results, vec!["11"]); // 5*2=10, 10+1=11
    }

    #[test]
    fn test_n_stage_pipeline() {
        // +10, *3, -1: 1->11->33->32
        let stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>> = vec![
            Box::new(|x| x + 10),
            Box::new(|x| x * 3),
            Box::new(|x| x - 1),
        ];
        let results = run_n_stages(vec![1], stages);
        assert_eq!(results, vec![32]);
    }

    #[test]
    fn test_stage_closure() {
        let (tx, rx) = mpsc::channel::<i32>();
        let rx_out = pipeline_stage(rx, |x| x * x);
        let h = thread::spawn(move || {
            for v in [2, 3, 4] {
                tx.send(v).unwrap();
            }
        });
        h.join().unwrap();
        let results: Vec<i32> = rx_out.iter().collect();
        assert_eq!(results, vec![4, 9, 16]);
    }
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Stage pattern | pipeline_stage(rx, f) -> Receiver<U> | Lwt_stream.map f stream (lazy) |
| Parallelism | True OS threads — stages run in parallel | Lwt_stream — cooperative scheduling |
| Shutdown | Channel close propagates automatically | None sentinel or stream close |
| Composition | Thread the returned Receiver | Wrap stream in next map |
Each stage in the Rust pipeline runs in its own OS thread — stages truly overlap in execution. Stage 2 can process an item while stage 1 is processing the next item and stage 3 is processing the previous item.
OCaml Approach
open Lwt_stream

(* Lwt_stream.map is the direct equivalent *)
let run_pipeline inputs =
  let source = of_list inputs in
  let stage1 = map (fun x -> x * 2) source in
  let stage2 = map (fun x -> x + 1) stage1 in
  let stage3 = map (fun x -> string_of_int x) stage2 in
  to_list stage3 (* lazy evaluation starts here *)

(* Thread-based pipeline with Domainslib: channels carry 'a option, with
   None as the end-of-stream sentinel, since Domainslib.Chan has no close *)
let pipeline_stage rx f =
  let tx_out = Domainslib.Chan.make_unbounded () in
  Domain.spawn (fun () ->
    let rec loop () =
      match Domainslib.Chan.recv rx with
      | None -> Domainslib.Chan.send tx_out None (* propagate the sentinel *)
      | Some v -> Domainslib.Chan.send tx_out (Some (f v)); loop ()
    in
    loop ())
  |> ignore;
  tx_out
OCaml's Lwt_stream.map is lazy — transformation happens on demand as elements are consumed, not eagerly in parallel threads. For true parallel pipeline stages, Domainslib.Chan is needed.
Full Source
#![allow(clippy::all)]
// 984: Channel Pipeline
// Chain of processing stages via mpsc channels
use std::sync::mpsc;
use std::thread;

// --- Build a pipeline stage: read from rx, apply f, send to tx ---
fn pipeline_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx_out, rx_out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            // iter() stops when channel closes
            tx_out.send(f(item)).unwrap();
        }
        // tx_out drops here → closes next stage
    });
    rx_out
}

// --- Build a fixed three-stage pipeline ---
fn run_pipeline(inputs: Vec<i32>) -> Vec<String> {
    let (tx_source, rx0) = mpsc::channel::<i32>();
    // Stage 1: double
    let rx1 = pipeline_stage(rx0, |x| x * 2);
    // Stage 2: add 1
    let rx2 = pipeline_stage(rx1, |x| x + 1);
    // Stage 3: to string
    let rx3 = pipeline_stage(rx2, |x: i32| x.to_string());
    // Producer
    let producer = thread::spawn(move || {
        for v in inputs {
            tx_source.send(v).unwrap();
        }
        // tx_source drops → closes pipeline
    });
    // Collect results
    let results: Vec<String> = rx3.iter().collect();
    producer.join().unwrap();
    results
}

// --- Parameterised N-stage pipeline built from a Vec of boxed functions ---
fn run_n_stages(
    inputs: Vec<i32>,
    stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>>,
) -> Vec<i32> {
    let (tx_source, mut current_rx) = mpsc::channel::<i32>();
    for f in stages {
        current_rx = pipeline_stage(current_rx, f);
    }
    let producer = thread::spawn(move || {
        for v in inputs {
            tx_source.send(v).unwrap();
        }
    });
    let results: Vec<i32> = current_rx.iter().collect();
    producer.join().unwrap();
    results
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_3_stages() {
        let results = run_pipeline(vec![1, 2, 3, 4, 5]);
        // 1->2->3, 2->4->5, 3->6->7, 4->8->9, 5->10->11
        assert_eq!(results, vec!["3", "5", "7", "9", "11"]);
    }

    #[test]
    fn test_pipeline_empty() {
        let results = run_pipeline(vec![]);
        assert!(results.is_empty());
    }

    #[test]
    fn test_pipeline_single_item() {
        let results = run_pipeline(vec![5]);
        assert_eq!(results, vec!["11"]); // 5*2=10, 10+1=11
    }

    #[test]
    fn test_n_stage_pipeline() {
        // +10, *3, -1: 1->11->33->32
        let stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>> = vec![
            Box::new(|x| x + 10),
            Box::new(|x| x * 3),
            Box::new(|x| x - 1),
        ];
        let results = run_n_stages(vec![1], stages);
        assert_eq!(results, vec![32]);
    }

    #[test]
    fn test_stage_closure() {
        let (tx, rx) = mpsc::channel::<i32>();
        let rx_out = pipeline_stage(rx, |x| x * x);
        let h = thread::spawn(move || {
            for v in [2, 3, 4] {
                tx.send(v).unwrap();
            }
        });
        h.join().unwrap();
        let results: Vec<i32> = rx_out.iter().collect();
        assert_eq!(results, vec![4, 9, 16]);
    }
}
Deep Comparison
Channel Pipeline — Comparison
Core Insight
A channel pipeline is function composition in concurrent space: instead of f ∘ g ∘ h, you have stage(f) | stage(g) | stage(h) where | is a channel. This is Unix pipes, CSP, and the actor model all rolled into one pattern.
OCaml Approach
- Manual stages: Thread + Queue + Mutex + Condition per stage
- Blocking recv calls
- Shutdown via closed = true + broadcasting
Rust Approach
- pipeline_stage(rx, f) creates a thread internally, returns a new Receiver
- Compose by nesting: let rx2 = pipeline_stage(pipeline_stage(rx0, f), g)
- Shutdown: Sender drops — propagates through the pipeline
- rx.iter() is the idiomatic "read until closed" loop
Comparison Table
| Concept | OCaml | Rust |
|---|---|---|
| Stage abstraction | Manual thread + queue + mutex | fn pipeline_stage(rx, f) -> Receiver |
| Close propagation | Explicit closed flag + broadcast | Drop Sender → next stage's rx.iter() stops |
| Back-pressure | Queue fills up (manual limit needed) | sync_channel(n) blocks producer |
| Compose N stages | Create N channel/thread pairs | Chain pipeline_stage calls |
| Collect output | Loop over recv until None | rx.iter().collect() |
| Parallelism | One thread per stage | One thread per stage (same) |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
1. Implement filter_stage(rx, pred) -> Receiver<T> that drops items where pred(item) is false.
2. Implement buffer_stage(rx, n) -> Receiver<Vec<T>> that batches n items before passing them downstream.
3. Make stages return Result<U, E> and propagate errors through the pipeline.
4. Implement broadcast_stage(rx, n) -> Vec<Receiver<T>> that sends each item to n downstream stages.