984 Fundamental

984 Channel Pipeline

Functional Programming

Tutorial

The Problem

Build a multi-stage processing pipeline where each stage reads from an input channel, applies a transformation, and writes to an output channel. Each stage runs in its own thread. Implement pipeline_stage<T, U, F> as a reusable building block that returns the output Receiver, enabling declarative pipeline construction.

🎯 Learning Outcomes

  • Implement pipeline_stage<T, U, F>(rx: Receiver<T>, f: F) -> Receiver<U> that spawns a worker thread
  • Chain stages: rx1 = pipeline_stage(rx0, double); rx2 = pipeline_stage(rx1, add1)
  • Use rx.iter() inside each stage; the loop ends naturally when the upstream channel closes
  • Recognize that dropping tx_out when the thread exits closes the downstream channel automatically
  • Relate the pattern to Unix pipes and OCaml's Lwt_stream.map

Code Example

    #![allow(clippy::all)]
    // 984: Channel Pipeline
    // Chain of processing stages via mpsc channels
    
    use std::sync::mpsc;
    use std::thread;
    
    // --- Build a pipeline stage: read from rx, apply f, send to tx ---
    fn pipeline_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
    where
        T: Send + 'static,
        U: Send + 'static,
        F: Fn(T) -> U + Send + 'static,
    {
        let (tx_out, rx_out) = mpsc::channel();
        thread::spawn(move || {
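            // iter() ends once every upstream Sender has been dropped
            for item in rx.iter() {
                // Stop quietly if the downstream Receiver is gone instead of panicking
                if tx_out.send(f(item)).is_err() {
                    break;
                }
            }
            // tx_out drops here → closes next stage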
        });
        rx_out
    }
    
    // --- Build a fixed three-stage pipeline: double, add 1, to string ---
    fn run_pipeline(inputs: Vec<i32>) -> Vec<String> {
        let (tx_source, rx0) = mpsc::channel::<i32>();
    
        // Stage 1: double
        let rx1 = pipeline_stage(rx0, |x| x * 2);
        // Stage 2: add 1
        let rx2 = pipeline_stage(rx1, |x| x + 1);
        // Stage 3: to string
        let rx3 = pipeline_stage(rx2, |x: i32| x.to_string());
    
        // Producer
        let producer = thread::spawn(move || {
            for v in inputs {
                tx_source.send(v).unwrap();
            }
            // tx_source drops → closes pipeline
        });
    
        // Collect results
        let results: Vec<String> = rx3.iter().collect();
        producer.join().unwrap();
        results
    }
    
    // --- Parameterised N-stage pipeline ---
    fn run_n_stages(
        inputs: Vec<i32>,
        stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>>,
    ) -> Vec<i32> {
        let (tx_source, mut current_rx) = mpsc::channel::<i32>();
    
        for f in stages {
            current_rx = pipeline_stage(current_rx, f);
        }
    
        let producer = thread::spawn(move || {
            for v in inputs {
                tx_source.send(v).unwrap();
            }
        });
    
        let results: Vec<i32> = current_rx.iter().collect();
        producer.join().unwrap();
        results
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_pipeline_3_stages() {
            let results = run_pipeline(vec![1, 2, 3, 4, 5]);
            // 1->2->3, 2->4->5, 3->6->7, 4->8->9, 5->10->11
            assert_eq!(results, vec!["3", "5", "7", "9", "11"]);
        }
    
        #[test]
        fn test_pipeline_empty() {
            let results = run_pipeline(vec![]);
            assert!(results.is_empty());
        }
    
        #[test]
        fn test_pipeline_single_item() {
            let results = run_pipeline(vec![5]);
            assert_eq!(results, vec!["11"]); // 5*2=10, 10+1=11
        }
    
        #[test]
        fn test_n_stage_pipeline() {
            // +10, *3, -1: 1->11->33->32
            let stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>> = vec![
                Box::new(|x| x + 10),
                Box::new(|x| x * 3),
                Box::new(|x| x - 1),
            ];
            let results = run_n_stages(vec![1], stages);
            assert_eq!(results, vec![32]);
        }
    
        #[test]
        fn test_stage_closure() {
            let (tx, rx) = mpsc::channel::<i32>();
            let rx_out = pipeline_stage(rx, |x| x * x);
    
            let h = thread::spawn(move || {
                for v in [2, 3, 4] {
                    tx.send(v).unwrap();
                }
            });
            h.join().unwrap();
    
            let results: Vec<i32> = rx_out.iter().collect();
            assert_eq!(results, vec![4, 9, 16]);
        }
    }

    Key Differences

    Aspect        | Rust                                     | OCaml
    Stage pattern | pipeline_stage(rx, f) -> Receiver<U>     | Lwt_stream.map f stream (lazy)
    Parallelism   | True OS threads; stages run in parallel  | Lwt_stream: cooperative scheduling
    Shutdown      | Channel close propagates automatically   | None sentinel or stream close
    Composition   | Thread the returned Receiver             | Wrap stream in next map

    Each stage in the Rust pipeline runs in its own OS thread — stages truly overlap in execution. Stage 2 can process an item while stage 1 is processing the next item and stage 3 is processing the previous item.
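To make the "one OS thread per stage" claim observable, the sketch below tags every item with the ID of the thread that ran each stage. The helper name trace_stage_threads is hypothetical and not part of the original example; pipeline_stage is repeated only so the block stands alone.

```rust
use std::sync::mpsc;
use std::thread::{self, ThreadId};

// Same shape as the pipeline_stage used throughout this example.
fn pipeline_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx_out, rx_out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            if tx_out.send(f(item)).is_err() {
                break;
            }
        }
    });
    rx_out
}

// Hypothetical helper: runs double -> add 1 -> identity and records
// which thread executed each of the three stages for every item.
fn trace_stage_threads(inputs: Vec<i32>) -> Vec<(i32, Vec<ThreadId>)> {
    let (tx, rx0) = mpsc::channel::<i32>();
    let rx1 = pipeline_stage(rx0, |x| (x * 2, vec![thread::current().id()]));
    let rx2 = pipeline_stage(rx1, |(x, mut ids): (i32, Vec<ThreadId>)| {
        ids.push(thread::current().id());
        (x + 1, ids)
    });
    let rx3 = pipeline_stage(rx2, |(x, mut ids): (i32, Vec<ThreadId>)| {
        ids.push(thread::current().id());
        (x, ids)
    });
    for v in inputs {
        tx.send(v).unwrap();
    }
    drop(tx); // close the source so the stages shut down in order
    rx3.iter().collect()
}
```

Each returned item carries three thread IDs, all different from one another and from the calling thread. Whether the stages actually overlap in time depends on how long each transformation takes and on the scheduler.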

    OCaml Approach

    open Lwt_stream
    
    (* Lwt_stream.map is the closest equivalent *)
    let run_pipeline inputs =
      let source = of_list inputs in
      let stage1 = map (fun x -> x * 2) source in
      let stage2 = map (fun x -> x + 1) stage1 in
      let stage3 = map (fun x -> string_of_int x) stage2 in
      to_list stage3  (* string list Lwt.t; lazy evaluation starts here *)
    
    (* Domain-based pipeline with Domainslib (OCaml 5).
       Chan has no close operation, so channels carry options
       and None serves as the end-of-stream sentinel. *)
    let pipeline_stage rx f =
      (* make_unbounded returns one channel used for both send and recv *)
      let out = Domainslib.Chan.make_unbounded () in
      Domain.spawn (fun () ->
        let rec loop () = match Domainslib.Chan.recv rx with
          | None -> Domainslib.Chan.send out None  (* forward the sentinel *)
          | Some v -> Domainslib.Chan.send out (Some (f v)); loop ()
        in loop ()
      ) |> ignore;
      out
    
    

    OCaml's Lwt_stream.map is lazy: each transformation runs on demand as elements are consumed, not eagerly in parallel threads. For truly parallel pipeline stages, OCaml 5 domains connected by a channel such as Domainslib.Chan are needed.
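For comparison on the Rust side, the nearest counterpart to a lazy map over a stream is a plain iterator chain: same transformations, one thread, nothing computed until an element is requested. The names run_lazy and lazy_call_counts below are hypothetical and used for illustration only.

```rust
use std::cell::Cell;

// Hypothetical sequential counterpart of run_pipeline: the same three
// transformations as an iterator chain, with no threads or channels.
fn run_lazy(inputs: Vec<i32>) -> Vec<String> {
    inputs
        .into_iter()
        .map(|x| x * 2)
        .map(|x| x + 1)
        .map(|x| x.to_string())
        .collect() // items are pulled through the chain only here
}

// Counts how often the mapping closure has run before and after the
// first element is requested, to show on-demand evaluation.
fn lazy_call_counts() -> (usize, usize) {
    let calls = Cell::new(0);
    let mut it = vec![1, 2, 3].into_iter().map(|x| {
        calls.set(calls.get() + 1);
        x * 2
    });
    let before = calls.get(); // building the chain ran nothing
    let _first = it.next(); // pulling one element runs the closure once
    (before, calls.get())
}
```

lazy_call_counts returns (0, 1): constructing the chain does no work, and each next() call drives exactly one element through it. The channel pipeline differs in that every stage starts working as soon as input arrives, regardless of whether the final consumer has asked for anything.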

    Deep Comparison

    Channel Pipeline — Comparison

    Core Insight

    A channel pipeline is function composition in concurrent space: instead of f ∘ g ∘ h, you have stage(f) | stage(g) | stage(h) where | is a channel. This is Unix pipes, CSP, and the actor model all rolled into one pattern.
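The correspondence with function composition can be checked directly. The sketch below runs the same two functions once as stage(f) | stage(g) and once as the ordinary composition g(f(x)); the helper names via_pipeline and via_composition are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Same shape as the pipeline_stage used throughout this example.
fn pipeline_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx_out, rx_out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            if tx_out.send(f(item)).is_err() {
                break;
            }
        }
    });
    rx_out
}

// stage(f) | stage(g): two threads connected by channels.
fn via_pipeline(inputs: Vec<i32>) -> Vec<i32> {
    let (tx, rx0) = mpsc::channel::<i32>();
    let rx1 = pipeline_stage(rx0, |x| x * 2); // f
    let rx2 = pipeline_stage(rx1, |x| x + 1); // g
    for v in inputs {
        tx.send(v).unwrap();
    }
    drop(tx);
    rx2.iter().collect()
}

// g ∘ f applied element by element on the calling thread.
fn via_composition(inputs: Vec<i32>) -> Vec<i32> {
    let f = |x: i32| x * 2;
    let g = |x: i32| x + 1;
    inputs.into_iter().map(|x| g(f(x))).collect()
}
```

With a single producer and one thread per stage, each channel is first-in, first-out, so both functions return the same values in the same order. Only the execution strategy differs.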

    OCaml Approach

  • No built-in pipeline abstraction; build one with Thread + Queue + Mutex + Condition
  • Each stage is a thread looping over recv calls
  • Close downstream by setting closed = true and broadcasting on the condition variable
  • More boilerplate, but the same idea: transform + forward

    Rust Approach

  • pipeline_stage(rx, f) creates a thread internally and returns a new Receiver
  • Composable: let rx2 = pipeline_stage(pipeline_stage(rx0, f), g)
  • The channel closes automatically when the Sender drops, and the close propagates through the pipeline
  • rx.iter() is the idiomatic "read until closed" loop
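The points above on nested composition and automatic close can be seen in a few lines. This is a minimal sketch; the helper name close_propagates is hypothetical, and pipeline_stage is repeated so the block is self-contained.

```rust
use std::sync::mpsc;
use std::thread;

// Same shape as the pipeline_stage used throughout this example.
fn pipeline_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx_out, rx_out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            if tx_out.send(f(item)).is_err() {
                break;
            }
        }
    });
    rx_out
}

// Returns the drained output and whether the final channel reports
// itself closed afterwards.
fn close_propagates() -> (Vec<i32>, bool) {
    let (tx, rx0) = mpsc::channel::<i32>();
    // Nested form: the inner stage's Receiver feeds the outer stage.
    let rx2 = pipeline_stage(pipeline_stage(rx0, |x| x + 1), |x| x * 10);
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // closing the source is the only shutdown signal needed
    let drained: Vec<i32> = rx2.iter().collect(); // ends once every stage has exited
    let closed = rx2.recv().is_err(); // an empty, closed channel yields RecvError
    (drained, closed)
}
```

Calling close_propagates() returns ([20, 30], true). Without the drop(tx) line the collect() call would block forever, because the first stage would keep waiting for more input.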

    Comparison Table

    Concept           | OCaml                                 | Rust
    Stage abstraction | Manual thread + queue + mutex         | fn pipeline_stage(rx, f) -> Receiver
    Close propagation | Explicit closed flag + broadcast      | Drop Sender → next stage's rx.iter() stops
    Back-pressure     | Queue fills up (manual limit needed)  | sync_channel(n) blocks producer
    Compose N stages  | Create N channel/thread pairs         | Chain pipeline_stage calls
    Collect output    | Loop over recv until None             | rx.iter().collect()
    Parallelism       | One thread per stage                  | One thread per stage (same)
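The back-pressure row refers to std::sync::mpsc::sync_channel, which the example code above does not use. Below is a sketch of what a bounded stage might look like; bounded_stage, full_buffer_rejects, and run_bounded are hypothetical names, and the capacity of 2 is arbitrary.

```rust
use std::sync::mpsc::{self, TrySendError};
use std::thread;

// Hypothetical bounded variant of pipeline_stage: the output channel
// buffers at most `cap` items, so a slow consumer stalls this stage.
fn bounded_stage<T, U, F>(rx: mpsc::Receiver<T>, cap: usize, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx_out, rx_out) = mpsc::sync_channel(cap);
    thread::spawn(move || {
        for item in rx.iter() {
            // send() blocks while the buffer is full: that is the back-pressure
            if tx_out.send(f(item)).is_err() {
                break;
            }
        }
    });
    rx_out
}

// Shows the limit without blocking: try_send fails once the buffer is full.
fn full_buffer_rejects() -> bool {
    let (tx, _rx) = mpsc::sync_channel::<i32>(1);
    tx.try_send(1).unwrap(); // fills the single slot
    matches!(tx.try_send(2), Err(TrySendError::Full(2)))
}

// End-to-end run with a bounded source and a bounded stage.
fn run_bounded(inputs: Vec<i32>) -> Vec<i32> {
    let (tx, rx0) = mpsc::sync_channel::<i32>(2);
    let rx1 = bounded_stage(rx0, 2, |x| x * 2);
    let producer = thread::spawn(move || {
        for v in inputs {
            tx.send(v).unwrap(); // blocks whenever two items are already queued
        }
    });
    let out: Vec<i32> = rx1.iter().collect();
    producer.join().unwrap();
    out
}
```

The results match the unbounded version; what changes is memory behaviour, since a fast producer can no longer queue an arbitrary number of items ahead of a slow stage.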

    std vs tokio

    Aspect          | std version                                                   | tokio version
    Runtime         | OS threads via std::thread                                    | Async tasks on tokio runtime
    Synchronization | std::sync::Mutex, Condvar                                     | tokio::sync::Mutex, channels
    Channels        | std::sync::mpsc (channel is unbounded, sync_channel bounded)  | tokio::sync::mpsc (channel is bounded and async, unbounded_channel also available)
    Blocking        | Thread blocks on lock/recv                                    | Task yields, runtime switches tasks
    Overhead        | One OS thread per task                                        | Many tasks per thread (M:N)
    Best for        | CPU-bound, simple concurrency                                 | I/O-bound, high-concurrency servers

    Exercises

  • Add a filter stage: filter_stage(rx, pred) -> Receiver<T> that drops items where pred(item) is false.
  • Add a buffer stage: buffer_stage(rx, n) -> Receiver<Vec<T>> that batches n items before passing downstream.
  • Add error handling: change stage functions to return Result<U, E> and propagate errors through the pipeline.
  • Implement a fan-out stage: broadcast_stage(rx, n) -> Vec<Receiver<T>> that sends each item to n downstream stages.
  • Benchmark a 5-stage pipeline processing 100,000 items against a sequential fold over the same transformations.
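As a possible starting point for the first exercise, here is one way a filter stage could be shaped. It is a sketch rather than a reference solution; the signature (a predicate taking &T) is an assumption, since the exercise only names filter_stage(rx, pred).

```rust
use std::sync::mpsc;
use std::thread;

// Sketch of a filter stage: forwards only the items for which `pred`
// returns true. Taking the predicate by reference is an assumption
// that lets the item be moved downstream afterwards.
fn filter_stage<T, P>(rx: mpsc::Receiver<T>, pred: P) -> mpsc::Receiver<T>
where
    T: Send + 'static,
    P: Fn(&T) -> bool + Send + 'static,
{
    let (tx_out, rx_out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            if pred(&item) {
                // Stop if the downstream Receiver has been dropped
                if tx_out.send(item).is_err() {
                    break;
                }
            }
        }
        // tx_out drops here, closing the downstream channel
    });
    rx_out
}
```

Because it returns a Receiver<T>, it chains with pipeline_stage in either order. The remaining exercises follow the same skeleton with a different loop body.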