462: Pipeline Concurrency
Tutorial Video
Text description (accessibility)
This video demonstrates the "462: Pipeline Concurrency" functional Rust example. Difficulty level: Fundamental. Key concepts covered: Functional Programming. Data processing often has sequential stages: read → parse → validate → transform → write. Key difference from OCaml: Rust pipelines use explicit channel pairs between stages; OCaml's `Lwt_stream.map` chains lazily.
Tutorial
The Problem
Data processing often has sequential stages: read → parse → validate → transform → write. Running the stages one after another wastes time: the parser sits idle while the reader fetches the next batch. Pipelining runs the stages concurrently, so stage 1 processes item 1 while stage 2 processes item 0. With N stages each taking time T, a sequential run completes one item every N·T, whereas a full pipeline completes one item every T (after a startup latency of N·T for the first item), because all N stages work simultaneously. This is the assembly-line model applied to software.
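The arithmetic behind the throughput claim can be sketched with two small helper functions. Both names (`sequential_time` and `pipelined_time`) are hypothetical and not part of the example source, and the model assumes every stage takes exactly the same time T with no channel or scheduling overhead, which real pipelines only approximate.

```rust
/// Total time to push `items` through `stages` stages one item at a time,
/// assuming each stage takes `t` time units (idealized model).
pub fn sequential_time(items: u64, stages: u64, t: u64) -> u64 {
    items * stages * t
}

/// Total time when all stages run concurrently: the first item needs
/// `stages * t` to travel through the pipeline, and after that one more
/// item completes every `t` (idealized model, equal stage times).
pub fn pipelined_time(items: u64, stages: u64, t: u64) -> u64 {
    if items == 0 {
        return 0;
    }
    (stages + items - 1) * t
}
```

Under these assumptions, 100 items through 5 stages with T = 1 take 500 time units sequentially and 104 when pipelined, so the per-item cost approaches T as the number of items grows. If stage times differ, the slowest stage sets the rate instead.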
Pipeline concurrency appears in video encoding (decode→filter→encode stages), ETL pipelines, network packet processing, compiler stages (lex→parse→typecheck→codegen), and build systems.
🎯 Learning Outcomes
- Sender<O> + Receiver<I> connect stages via channels
- Pipeline builder pattern for constructing multi-stage pipelines
Code Example
#![allow(clippy::all)]
//! # Pipeline Concurrency: Staged Processing
//!
//! Process data through multiple stages, each running in its own thread.

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// A pipeline stage
pub struct Stage<I, O> {
    handle: JoinHandle<()>,
    _phantom: std::marker::PhantomData<(I, O)>,
}

/// Build a processing pipeline
pub struct Pipeline<T> {
    sender: Sender<T>,
    handles: Vec<JoinHandle<()>>,
}

impl<T: Send + 'static> Pipeline<T> {
    /// Create the first stage of the pipeline
    pub fn new<F, O>(processor: F) -> (Self, Receiver<O>)
    where
        F: Fn(T) -> O + Send + 'static,
        O: Send + 'static,
    {
        let (input_tx, input_rx) = mpsc::channel::<T>();
        let (output_tx, output_rx) = mpsc::channel::<O>();

        let handle = thread::spawn(move || {
            for item in input_rx {
                let result = processor(item);
                if output_tx.send(result).is_err() {
                    break;
                }
            }
        });

        (
            Pipeline {
                sender: input_tx,
                handles: vec![handle],
            },
            output_rx,
        )
    }

    /// Send an item into the pipeline
    pub fn send(&self, item: T) -> Result<(), mpsc::SendError<T>> {
        self.sender.send(item)
    }

    /// Close input and wait for completion
    pub fn finish(self) {
        drop(self.sender);
        for h in self.handles {
            let _ = h.join();
        }
    }
}

/// Add a stage to a receiver
pub fn add_stage<I, O, F>(input: Receiver<I>, processor: F) -> Receiver<O>
where
    I: Send + 'static,
    O: Send + 'static,
    F: Fn(I) -> O + Send + 'static,
{
    let (output_tx, output_rx) = mpsc::channel();

    thread::spawn(move || {
        for item in input {
            let result = processor(item);
            if output_tx.send(result).is_err() {
                break;
            }
        }
    });

    output_rx
}

/// Simple three-stage pipeline example
pub fn three_stage_pipeline(input: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = mpsc::channel();

    // Stage 1: double
    let rx = add_stage(rx, |x| x * 2);
    // Stage 2: add 1
    let rx = add_stage(rx, |x| x + 1);
    // Stage 3: square
    let rx = add_stage(rx, |x| x * x);

    // Send input
    for item in input {
        tx.send(item).unwrap();
    }
    drop(tx);

    // Collect output
    rx.iter().collect()
}

/// Filter-map pipeline
pub fn filter_map_pipeline<T, U, F, P>(input: Vec<T>, predicate: P, mapper: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    P: Fn(&T) -> bool + Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx, rx) = mpsc::channel();

    // Filter stage
    let (filter_tx, filter_rx) = mpsc::channel();
    thread::spawn(move || {
        for item in rx {
            if predicate(&item) {
                if filter_tx.send(item).is_err() {
                    break;
                }
            }
        }
    });

    // Map stage
    let output_rx = add_stage(filter_rx, mapper);

    // Send input
    for item in input {
        tx.send(item).unwrap();
    }
    drop(tx);

    output_rx.iter().collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_simple_pipeline() {
        let (pipeline, output) = Pipeline::new(|x: i32| x * 2);
        pipeline.send(1).unwrap();
        pipeline.send(2).unwrap();
        pipeline.send(3).unwrap();
        pipeline.finish();
        let results: Vec<_> = output.iter().collect();
        assert_eq!(results, vec![2, 4, 6]);
    }

    #[test]
    fn test_three_stage() {
        // For input [1, 2]: double -> +1 -> square
        // 1 -> 2 -> 3 -> 9
        // 2 -> 4 -> 5 -> 25
        let results = three_stage_pipeline(vec![1, 2]);
        assert_eq!(results, vec![9, 25]);
    }

    #[test]
    fn test_add_stage() {
        let (tx, rx) = mpsc::channel();
        let rx = add_stage(rx, |x: i32| x.to_string());
        tx.send(42).unwrap();
        tx.send(100).unwrap();
        drop(tx);
        let results: Vec<_> = rx.iter().collect();
        assert_eq!(results, vec!["42", "100"]);
    }

    #[test]
    fn test_filter_map_pipeline() {
        let input = vec![1, 2, 3, 4, 5, 6];
        // Keep evens, then square
        let results = filter_map_pipeline(input, |x| x % 2 == 0, |x| x * x);
        assert_eq!(results, vec![4, 16, 36]);
    }

    #[test]
    fn test_empty_input() {
        let results = three_stage_pipeline(vec![]);
        assert!(results.is_empty());
    }
}
Key Differences
- Rust pipelines use explicit channel pairs between stages; OCaml's Lwt_stream.map chains lazily.
- Rust carries Result in channel messages; OCaml's Lwt uses promise rejection.
- The add_stage builder function enables composing stages; OCaml's function composition |> is more natural for pure transformations.
OCaml Approach
OCaml implements pipelines with sequences of Thread.create connected by channels or queues. Lwt and Async have stream combinators (Lwt_stream.map, Pipe.map) for async pipeline stages. OCaml 5.x's Domainslib enables parallel pipeline stages across domains. The functional style naturally expresses pipelines as function composition: data |> stage1 |> stage2 |> stage3.
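The error-handling difference listed under Key Differences can be illustrated with a minimal Rust sketch. The helper `add_try_stage` and the driver `parse_and_halve` are hypothetical names, not part of the example source. The sketch assumes every message is a `Result` and that a stage should pass earlier errors through unchanged instead of stopping the pipeline.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical fallible stage: forwards earlier errors untouched and
/// applies `f` only to items that are still `Ok`.
pub fn add_try_stage<I, O, E, F>(input: Receiver<Result<I, E>>, f: F) -> Receiver<Result<O, E>>
where
    I: Send + 'static,
    O: Send + 'static,
    E: Send + 'static,
    F: Fn(I) -> Result<O, E> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for item in input {
            // `and_then` skips `f` when `item` is already an `Err`.
            if tx.send(item.and_then(&f)).is_err() {
                break; // downstream receiver was dropped
            }
        }
    });
    rx
}

/// Two fallible stages: parse each string as an i32, then halve even
/// numbers. Odd numbers and unparsable strings become `Err` messages.
pub fn parse_and_halve(input: Vec<String>) -> Vec<Result<i32, String>> {
    let (tx, rx) = mpsc::channel::<Result<String, String>>();

    let rx = add_try_stage(rx, |s: String| {
        s.trim().parse::<i32>().map_err(|e| e.to_string())
    });
    let rx = add_try_stage(rx, |n: i32| {
        if n % 2 == 0 {
            Ok(n / 2)
        } else {
            Err(format!("{} is odd", n))
        }
    });

    for s in input {
        tx.send(Ok(s)).unwrap();
    }
    drop(tx); // close the input so the stages terminate

    rx.iter().collect()
}
```

Each stage here is a single thread reading from a FIFO channel, so results come out in input order and every `Err` can be matched back to the item that caused it. For the input "4", "x", "3" the sketch yields `Ok(2)`, a parse error, and `Err("3 is odd")`.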
Deep Comparison
Comparison
See src/lib.rs for Rust implementation.
Exercises
- ... mpsc::channel between stages. Verify with "the quick brown fox jumps over the lazy dog".
- ... metrics() -> Vec<StageMetrics> method on the pipeline to identify bottleneck stages.