462 Fundamental

462: Pipeline Concurrency

Functional Programming


Tutorial

The Problem

Data processing often has sequential stages: read → parse → validate → transform → write. Running the stages one after another wastes time: the parser sits idle while the reader fetches the next batch. Pipelining runs the stages concurrently, so stage 1 processes item 1 while stage 2 processes item 0. With N stages each taking time T, a sequential run spends N·T on every item, whereas a full pipeline completes one item every T (after an initial N·T startup delay), because all N stages work simultaneously. This is the assembly line model applied to software.
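
The timing argument above can be checked with a small model. This is only a sketch under idealized assumptions: every stage takes exactly the same time `t`, and channel hand-offs cost nothing. The function names `sequential_time` and `pipelined_time` are made up for this illustration and do not appear in the example source.

```rust
/// Total time to push `items` items through `stages` stages one item at a
/// time, assuming every stage takes exactly `t` time units.
fn sequential_time(stages: u64, items: u64, t: u64) -> u64 {
    stages * items * t
}

/// Total time when the stages overlap: the first item needs `stages * t`
/// to travel through the whole pipeline, then one more item completes
/// every `t` time units.
fn pipelined_time(stages: u64, items: u64, t: u64) -> u64 {
    if items == 0 {
        return 0; // nothing to process, nothing to wait for
    }
    (stages + items - 1) * t
}
```

For 5 stages, 100 items and t = 10, `sequential_time` returns 5000 and `pipelined_time` returns 1040. A single item takes the same 50 time units in both models, which is why pipelining helps throughput but not per-item latency.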

Pipeline concurrency appears in video encoding (decode→filter→encode stages), ETL pipelines, network packet processing, compiler stages (lex→parse→typecheck→codegen), and build systems.

🎯 Learning Outcomes

  • Understand how channel-connected stages enable pipeline concurrency
  • Learn how each stage runs in its own thread, consuming from one channel and producing to the next
  • See how a Receiver<I> on the input side and a Sender<O> on the output side connect a stage to its neighbors
  • Understand pipeline throughput vs. latency: throughput improves, but per-item latency does not (each channel hand-off adds a little)
  • Learn the Pipeline builder pattern for constructing multi-stage pipelines

Code Example

    #![allow(clippy::all)]
    //! # Pipeline Concurrency — Staged Processing
    //!
    //! Process data through multiple stages, each running in its own thread.
    
    use std::sync::mpsc::{self, Receiver, Sender};
    use std::thread::{self, JoinHandle};
    
    /// A pipeline stage
    pub struct Stage<I, O> {
        handle: JoinHandle<()>,
        _phantom: std::marker::PhantomData<(I, O)>,
    }
    
    /// Build a processing pipeline
    pub struct Pipeline<T> {
        sender: Sender<T>,
        handles: Vec<JoinHandle<()>>,
    }
    
    impl<T: Send + 'static> Pipeline<T> {
        /// Create the first stage of the pipeline
        pub fn new<F, O>(processor: F) -> (Self, Receiver<O>)
        where
            F: Fn(T) -> O + Send + 'static,
            O: Send + 'static,
        {
            let (input_tx, input_rx) = mpsc::channel::<T>();
            let (output_tx, output_rx) = mpsc::channel::<O>();
    
            let handle = thread::spawn(move || {
                for item in input_rx {
                    let result = processor(item);
                    if output_tx.send(result).is_err() {
                        break;
                    }
                }
            });
    
            (
                Pipeline {
                    sender: input_tx,
                    handles: vec![handle],
                },
                output_rx,
            )
        }
    
        /// Send an item into the pipeline
        pub fn send(&self, item: T) -> Result<(), mpsc::SendError<T>> {
            self.sender.send(item)
        }
    
        /// Close input and wait for completion
        pub fn finish(self) {
            drop(self.sender);
            for h in self.handles {
                let _ = h.join();
            }
        }
    }
    
    /// Add a stage to a receiver
    pub fn add_stage<I, O, F>(input: Receiver<I>, processor: F) -> Receiver<O>
    where
        I: Send + 'static,
        O: Send + 'static,
        F: Fn(I) -> O + Send + 'static,
    {
        let (output_tx, output_rx) = mpsc::channel();
    
        thread::spawn(move || {
            for item in input {
                let result = processor(item);
                if output_tx.send(result).is_err() {
                    break;
                }
            }
        });
    
        output_rx
    }
    
    /// Simple three-stage pipeline example
    pub fn three_stage_pipeline(input: Vec<i32>) -> Vec<i32> {
        let (tx, rx) = mpsc::channel();
    
        // Stage 1: double
        let rx = add_stage(rx, |x| x * 2);
    
        // Stage 2: add 1
        let rx = add_stage(rx, |x| x + 1);
    
        // Stage 3: square
        let rx = add_stage(rx, |x| x * x);
    
        // Send input
        for item in input {
            tx.send(item).unwrap();
        }
        drop(tx);
    
        // Collect output
        rx.iter().collect()
    }
    
    /// Filter-map pipeline
    pub fn filter_map_pipeline<T, U, F, P>(input: Vec<T>, predicate: P, mapper: F) -> Vec<U>
    where
        T: Send + 'static,
        U: Send + 'static,
        P: Fn(&T) -> bool + Send + 'static,
        F: Fn(T) -> U + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
    
        // Filter stage
        let (filter_tx, filter_rx) = mpsc::channel();
        thread::spawn(move || {
            for item in rx {
                if predicate(&item) {
                    if filter_tx.send(item).is_err() {
                        break;
                    }
                }
            }
        });
    
        // Map stage
        let output_rx = add_stage(filter_rx, mapper);
    
        // Send input
        for item in input {
            tx.send(item).unwrap();
        }
        drop(tx);
    
        output_rx.iter().collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_simple_pipeline() {
            let (pipeline, output) = Pipeline::new(|x: i32| x * 2);
    
            pipeline.send(1).unwrap();
            pipeline.send(2).unwrap();
            pipeline.send(3).unwrap();
            pipeline.finish();
    
            let results: Vec<_> = output.iter().collect();
            assert_eq!(results, vec![2, 4, 6]);
        }
    
        #[test]
        fn test_three_stage() {
            // For input [1, 2]: double -> +1 -> square
            // 1 -> 2 -> 3 -> 9
            // 2 -> 4 -> 5 -> 25
            let results = three_stage_pipeline(vec![1, 2]);
            assert_eq!(results, vec![9, 25]);
        }
    
        #[test]
        fn test_add_stage() {
            let (tx, rx) = mpsc::channel();
    
            let rx = add_stage(rx, |x: i32| x.to_string());
    
            tx.send(42).unwrap();
            tx.send(100).unwrap();
            drop(tx);
    
            let results: Vec<_> = rx.iter().collect();
            assert_eq!(results, vec!["42", "100"]);
        }
    
        #[test]
        fn test_filter_map_pipeline() {
            let input = vec![1, 2, 3, 4, 5, 6];
            // Keep evens, then square
            let results = filter_map_pipeline(input, |x| x % 2 == 0, |x| x * x);
            assert_eq!(results, vec![4, 16, 36]);
        }
    
        #[test]
        fn test_empty_input() {
            let results = three_stage_pipeline(vec![]);
            assert!(results.is_empty());
        }
    }
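
The stages in the listing above all map `i32` to `i32`, but nothing in `add_stage` requires that. The sketch below chains stages whose item type changes along the way (`String` to `Option<i32>` to `String`). `add_stage` is repeated in the same shape as the listing so the block compiles on its own; `parse_and_label` is a hypothetical helper invented for this illustration.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

// Same shape as `add_stage` in the listing above, repeated so this sketch
// is self-contained.
fn add_stage<I, O, F>(input: Receiver<I>, processor: F) -> Receiver<O>
where
    I: Send + 'static,
    O: Send + 'static,
    F: Fn(I) -> O + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for item in input {
            // Stop early if the downstream receiver has been dropped.
            if tx.send(processor(item)).is_err() {
                break;
            }
        }
    });
    rx
}

/// Hypothetical helper: parse each line, double the parsed number, then
/// render a label. Lines that fail to parse travel through as `None`.
fn parse_and_label(lines: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    // Stage 1: String -> Option<i32>
    let parsed = add_stage(rx, |s: String| s.trim().parse::<i32>().ok());
    // Stage 2: Option<i32> -> Option<i32> (overflow also becomes None)
    let doubled = add_stage(parsed, |n: Option<i32>| n.and_then(|v| v.checked_mul(2)));
    // Stage 3: Option<i32> -> String
    let labeled = add_stage(doubled, |n: Option<i32>| match n {
        Some(v) => format!("ok:{v}"),
        None => "invalid".to_string(),
    });

    for line in lines {
        tx.send(line).unwrap();
    }
    drop(tx); // closing the input lets every stage drain and exit

    labeled.iter().collect()
}
```

Calling `parse_and_label` with the lines `" 1"`, `"x"` and `"21 "` yields `ok:2`, `invalid`, `ok:42`, in input order, because each stage is a single thread reading a FIFO channel.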

Key Differences

  • Channel chaining: Rust pipelines use explicit channel pairs between stages; OCaml's Lwt_stream.map chains lazily.
  • Backpressure: Rust's bounded channels (mpsc::sync_channel) provide backpressure between stages, but the mpsc::channel used in this example is unbounded and applies none; OCaml's async streams propagate backpressure through demand.
  • Error handling: Rust pipeline errors can propagate as Result values sent through the channels; OCaml's Lwt uses promise rejection.
  • Stage composition: Rust's add_stage function composes stages by consuming one Receiver and returning the next; OCaml's pipe operator |> is more natural for pure transformations.
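
To make the backpressure point concrete, here is a minimal sketch of a bounded stage built on `std::sync::mpsc::sync_channel`. The names `add_bounded_stage` and `run_bounded`, and the capacity of 2, are assumptions made for this illustration; the example source itself only uses the unbounded `mpsc::channel`.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical bounded variant of `add_stage`: at most `capacity` results
/// may wait in the output channel, so a fast stage blocks on `send` until
/// the slower stage downstream catches up.
fn add_bounded_stage<I, O, F>(input: Receiver<I>, capacity: usize, processor: F) -> Receiver<O>
where
    I: Send + 'static,
    O: Send + 'static,
    F: Fn(I) -> O + Send + 'static,
{
    let (tx, rx) = mpsc::sync_channel(capacity);
    thread::spawn(move || {
        for item in input {
            // `SyncSender::send` blocks while the buffer is full.
            if tx.send(processor(item)).is_err() {
                break;
            }
        }
    });
    rx
}

/// Hypothetical driver: two bounded stages computing (x + 1) * 10.
fn run_bounded(input: Vec<u32>) -> Vec<u32> {
    let (tx, rx) = mpsc::sync_channel::<u32>(2);
    let rx = add_bounded_stage(rx, 2, |x| x + 1);
    let rx = add_bounded_stage(rx, 2, |x| x * 10);

    // Feed the pipeline from its own thread. With bounded channels, sending
    // every item before reading any output could block forever once all
    // the buffers are full.
    let producer = thread::spawn(move || {
        for item in input {
            if tx.send(item).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the pipeline input.
    });

    let out: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    out
}
```

The design trade-off is memory versus stalls: an unbounded channel never blocks the producer but can grow without limit when a downstream stage is slow, while a bounded channel caps memory at the cost of pausing the faster stage.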
OCaml Approach

OCaml implements pipelines as threads created with Thread.create and connected by channels or queues. Lwt and Async provide stream combinators (Lwt_stream.map, Pipe.map) for asynchronous pipeline stages, and OCaml 5.x's Domainslib enables parallel pipeline stages across domains. The functional style naturally expresses a pipeline as function composition: data |> stage1 |> stage2 |> stage3.
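
For comparison, the left-to-right reading of `data |> stage1 |> stage2 |> stage3` can be approximated in Rust with method chaining. The sketch below uses a hypothetical extension trait, `StageExt`, which is not part of the standard library or of the example source; unlike OCaml's `|>` on pure functions, every `.stage(...)` call here spawns a thread and a channel.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical extension trait that lets stages be chained as methods.
trait StageExt<I> {
    fn stage<O, F>(self, f: F) -> Receiver<O>
    where
        O: Send + 'static,
        F: Fn(I) -> O + Send + 'static;
}

impl<I: Send + 'static> StageExt<I> for Receiver<I> {
    fn stage<O, F>(self, f: F) -> Receiver<O>
    where
        O: Send + 'static,
        F: Fn(I) -> O + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            for item in self {
                // Stop early if the downstream receiver has been dropped.
                if tx.send(f(item)).is_err() {
                    break;
                }
            }
        });
        rx
    }
}

/// Hypothetical helper: uppercase each word, pair it with its length,
/// then format the pair as "WORD:len".
fn shout_lengths(words: Vec<&'static str>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<&'static str>();

    let out = rx
        .stage(|w: &'static str| w.to_uppercase())
        .stage(|w: String| (w.len(), w))
        .stage(|(n, w): (usize, String)| format!("{w}:{n}"));

    for w in words {
        tx.send(w).unwrap();
    }
    drop(tx); // close the input so the stages can finish

    out.iter().collect()
}
```

Calling `shout_lengths(vec!["ab", "cde"])` yields `AB:2` and `CDE:3`.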


Deep Comparison

See src/lib.rs for the Rust implementation.

    Exercises

  • Three-stage pipeline: Build a text processing pipeline: stage 1 splits text into words, stage 2 filters words longer than 5 characters, stage 3 converts to uppercase. Use mpsc::channel between stages. Verify with "the quick brown fox jumps over the lazy dog".
  • Parallel stage: Implement a stage that fans out to N worker threads (like fan-out/fan-in) then collects results. This enables a single slow stage to have parallelism without affecting the pipeline structure.
  • Pipeline metrics: Add per-stage counters tracking items processed and processing time. Expose a metrics() -> Vec<StageMetrics> method on the pipeline to identify bottleneck stages.
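
As a possible starting point for the parallel-stage exercise, the sketch below shares one input receiver among several worker threads and merges their results into a single output channel. It is one approach among several, not a reference solution: `add_parallel_stage` and `squares_parallel` are hypothetical names, and the output order is not preserved.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical fan-out/fan-in stage: `workers` threads pull from one
/// shared receiver and push into one shared output channel.
fn add_parallel_stage<I, O, F>(input: Receiver<I>, workers: usize, processor: F) -> Receiver<O>
where
    I: Send + 'static,
    O: Send + 'static,
    F: Fn(I) -> O + Send + Sync + 'static,
{
    // `Receiver` cannot be cloned, so the workers share it behind a mutex.
    let input = Arc::new(Mutex::new(input));
    let processor = Arc::new(processor);
    let (tx, rx) = mpsc::channel();

    // Always start at least one worker so items are never silently dropped.
    for _ in 0..workers.max(1) {
        let input = Arc::clone(&input);
        let processor = Arc::clone(&processor);
        let tx = tx.clone();
        thread::spawn(move || loop {
            // The lock guard is released at the end of this statement, so
            // the lock is held while receiving but not while processing.
            let next = input.lock().unwrap().recv();
            let item = match next {
                Ok(item) => item,
                Err(_) => break, // input closed and drained
            };
            if tx.send(processor(item)).is_err() {
                break; // downstream receiver dropped
            }
        });
    }

    // The original `tx` is dropped when this function returns, so the
    // output closes once every worker has exited.
    rx
}

/// Hypothetical driver: square the inputs on 4 workers, then sort because
/// the workers may finish in any order.
fn squares_parallel(input: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    let out_rx = add_parallel_stage(rx, 4, |x: u64| x * x);
    for item in input {
        tx.send(item).unwrap();
    }
    drop(tx);
    let mut out: Vec<u64> = out_rx.iter().collect();
    out.sort_unstable();
    out
}
```

If downstream stages depend on input order, one option is to tag each item with a sequence number before the fan-out and reorder after the fan-in.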