995 Fundamental

995 Pipeline Stages

Functional Programming

Tutorial

The Problem

Build a composable N-stage streaming pipeline where each stage is independently reusable: map_stage, filter_stage, flat_map_stage, and batch_stage. Each stage returns a Receiver<T> that can be threaded into the next stage. This is a richer version of example 984, adding filtering and batching primitives.

🎯 Learning Outcomes

  • Implement map_stage<T, U, F>(rx, f) -> Receiver<U>: already seen in 984; revisit as a building block
  • Implement filter_stage<T, F: Fn(&T) -> bool>(rx, pred) -> Receiver<T>: forward only matching items
  • Implement flat_map_stage<T, U, F: Fn(T) -> Vec<U>>(rx, f) -> Receiver<U>: expand one item into many
  • Implement batch_stage<T>(rx, n) -> Receiver<Vec<T>>: collect n items, then forward them as a batch
  • Compose all four stages into a concrete pipeline and verify output
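
The batch stage named in the outcomes can be sketched along the same lines as the other stages. The version below is only one possible shape: the signature follows the outcome above, while flushing a final short batch when the input closes, rejecting n = 0, and stopping quietly when the downstream receiver is gone are assumptions of this sketch. batch_demo is a hypothetical helper added purely for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical batch stage: groups incoming items into Vec<T> chunks of size n.
// Assumption: the last batch may be shorter than n if the input ends mid-batch.
fn batch_stage<T>(rx: mpsc::Receiver<T>, n: usize) -> mpsc::Receiver<Vec<T>>
where
    T: Send + 'static,
{
    assert!(n > 0, "batch size must be positive");
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        let mut buf = Vec::with_capacity(n);
        for item in rx.iter() {
            buf.push(item);
            if buf.len() == n {
                // Swap in a fresh buffer and forward the full one.
                let full = std::mem::replace(&mut buf, Vec::with_capacity(n));
                if tx.send(full).is_err() {
                    return; // downstream hung up; stop quietly
                }
            }
        }
        // Input closed: flush any partial batch.
        if !buf.is_empty() {
            let _ = tx.send(buf);
        }
    });
    out
}

// Illustration only: feeds 1..=7 through the stage with n = 3.
fn batch_demo() -> Vec<Vec<i32>> {
    let (tx, rx) = mpsc::channel::<i32>();
    let out = batch_stage(rx, 3);
    for i in 1..=7 {
        tx.send(i).unwrap();
    }
    drop(tx); // closing the input lets the stage flush and exit
    out.iter().collect()
}
```

With these assumptions, batch_demo() yields [[1, 2, 3], [4, 5, 6], [7]]. Because the stage returns a Receiver<Vec<T>>, it can be threaded into a further stage like any other.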

Code Example

    #![allow(clippy::all)]
    // 995: N-Stage Streaming Pipeline
    // Each stage is a thread + channel — filter/map/transform stages
    
    use std::sync::mpsc;
    use std::thread;
    
    // --- Map stage: applies f to each item ---
    fn map_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
    where
        T: Send + 'static,
        U: Send + 'static,
        F: Fn(T) -> U + Send + 'static,
    {
        let (tx, out) = mpsc::channel();
        thread::spawn(move || {
            for item in rx.iter() {
                tx.send(f(item)).unwrap();
            }
        });
        out
    }
    
    // --- Filter stage: only forward items where pred is true ---
    fn filter_stage<T, F>(rx: mpsc::Receiver<T>, pred: F) -> mpsc::Receiver<T>
    where
        T: Send + 'static,
        F: Fn(&T) -> bool + Send + 'static,
    {
        let (tx, out) = mpsc::channel();
        thread::spawn(move || {
            for item in rx.iter() {
                if pred(&item) {
                    tx.send(item).unwrap();
                }
            }
        });
        out
    }
    
    // --- Flat-map stage: one item → multiple outputs ---
    fn flat_map_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
    where
        T: Send + 'static,
        U: Send + 'static,
        F: Fn(T) -> Vec<U> + Send + 'static,
    {
        let (tx, out) = mpsc::channel();
        thread::spawn(move || {
            for item in rx.iter() {
                for v in f(item) {
                    tx.send(v).unwrap();
                }
            }
        });
        out
    }
    
    // --- Build a multi-stage pipeline ---
    fn pipeline_even_squares() -> Vec<String> {
        let (tx, rx) = mpsc::channel::<i32>();
    
        // Stage 1: square
        let rx1 = map_stage(rx, |x| x * x);
        // Stage 2: keep even
        let rx2 = filter_stage(rx1, |x| x % 2 == 0);
        // Stage 3: to string
        let rx3 = map_stage(rx2, |x: i32| x.to_string());
    
        // Producer
        let h = thread::spawn(move || {
            for i in 1..=10 {
                tx.send(i).unwrap();
            }
        });
    
        let results: Vec<String> = rx3.iter().collect();
        h.join().unwrap();
        results
    }
    
    // --- More complex: tokenize → filter stop words → count ---
    fn word_count_pipeline(text: &str) -> usize {
        let stop_words = ["the", "a", "an", "is", "in", "of", "to"];
        let words: Vec<String> = text.split_whitespace().map(|w| w.to_lowercase()).collect();
    
        let (tx, rx) = mpsc::channel::<String>();
    
        // Stage 1: drop stop words
        let rx1 = filter_stage(rx, move |w: &String| !stop_words.contains(&w.as_str()));
        // Stage 2: remove empty
        let rx2 = filter_stage(rx1, |w: &String| !w.is_empty());
        // Stage 3: map each remaining word to 1 so the sink can sum them
        let rx3 = map_stage(rx2, |_: String| 1usize);
    
        let h = thread::spawn(move || {
            for w in words {
                tx.send(w).unwrap();
            }
        });
    
        let count: usize = rx3.iter().sum();
        h.join().unwrap();
        count
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_pipeline_even_squares() {
            let results = pipeline_even_squares();
            // Squares of 1..10: 1,4,9,16,25,36,49,64,81,100
            // Even: 4,16,36,64,100
            assert_eq!(results, vec!["4", "16", "36", "64", "100"]);
        }
    
        #[test]
        fn test_map_stage() {
            let (tx, rx) = mpsc::channel::<i32>();
            let out = map_stage(rx, |x| x * 2);
            for i in [1, 2, 3] {
                tx.send(i).unwrap();
            }
            drop(tx);
            let results: Vec<i32> = out.iter().collect();
            assert_eq!(results, vec![2, 4, 6]);
        }
    
        #[test]
        fn test_filter_stage() {
            let (tx, rx) = mpsc::channel::<i32>();
            let out = filter_stage(rx, |x| x % 2 == 0);
            for i in 1..=6 {
                tx.send(i).unwrap();
            }
            drop(tx);
            let results: Vec<i32> = out.iter().collect();
            assert_eq!(results, vec![2, 4, 6]);
        }
    
        #[test]
        fn test_flat_map_stage() {
            let (tx, rx) = mpsc::channel::<i32>();
            let out = flat_map_stage(rx, |x| vec![x, x * 10]);
            for i in [1, 2, 3] {
                tx.send(i).unwrap();
            }
            drop(tx);
            let results: Vec<i32> = out.iter().collect();
            assert_eq!(results, vec![1, 10, 2, 20, 3, 30]);
        }
    
        #[test]
        fn test_word_count_pipeline() {
            let count = word_count_pipeline("the quick brown fox jumps over the lazy dog");
            // 9 words, minus the stop word "the" (x2) -> 7 content words
            // ("over" is not in the stop-word list)
            assert_eq!(count, 7);
        }
    }

    Key Differences

    | Aspect | Rust | OCaml |
    |---|---|---|
    | Channel close | Automatic on Sender drop | Manual None sentinel |
    | filter predicate | Fn(&T) -> bool (borrows the item) | 'a -> bool (same concept) |
    | flat_map expansion | Vec<U> return, forward each | list return, forward each |
    | Batch | std::mem::replace to swap | Buffer or Queue based |

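    Each stage in this pipeline runs in its own OS thread, so stages can execute in parallel on separate cores. The channel between two stages is a FIFO queue. mpsc::channel() is unbounded, so a fast producer can queue items without limit; use mpsc::sync_channel(n) for a bounded channel that applies backpressure.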
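
To make the backpressure remark concrete, here is a minimal sketch of a map stage built on mpsc::sync_channel. The names bounded_map_stage and bounded_demo and the cap parameter are hypothetical and not part of the example above; the sketch also breaks out of the loop instead of unwrapping when the receiver has gone away, which is a design choice of this sketch.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical variant of map_stage whose output channel buffers at most `cap` items.
fn bounded_map_stage<T, U, F>(rx: mpsc::Receiver<T>, cap: usize, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    // sync_channel returns a SyncSender; send() blocks while the buffer is full.
    let (tx, out) = mpsc::sync_channel(cap);
    thread::spawn(move || {
        for item in rx.iter() {
            if tx.send(f(item)).is_err() {
                break; // receiver dropped
            }
        }
    });
    out
}

// Illustration only: the source channel is bounded too, so the producer is throttled.
fn bounded_demo() -> Vec<i32> {
    let (tx, rx) = mpsc::sync_channel::<i32>(2);
    let out = bounded_map_stage(rx, 2, |x| x * 2);
    let producer = thread::spawn(move || {
        for i in 1..=5 {
            tx.send(i).unwrap(); // blocks once two items are waiting
        }
    });
    let results: Vec<i32> = out.iter().collect();
    producer.join().unwrap();
    results
}
```

The producer runs on its own thread here because a bounded send can block; sending from the consuming thread could deadlock once the buffers fill. The stage still returns a plain Receiver, so it composes with the unbounded stages unchanged.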

    OCaml Approach

    (* Channels carry 'a option values; None marks end of stream. *)
    let map_stage rx f =
      (* make_unbounded returns a single channel used for both send and recv *)
      let out = Domainslib.Chan.make_unbounded () in
      Domain.spawn (fun () ->
        let rec loop () = match Domainslib.Chan.recv rx with
          | None -> Domainslib.Chan.send out None  (* forward the sentinel *)
          | Some v -> Domainslib.Chan.send out (Some (f v)); loop ()
        in loop ()
      ) |> ignore;
      out
    
    let filter_stage rx pred =
      let out = Domainslib.Chan.make_unbounded () in
      Domain.spawn (fun () ->
        let rec loop () = match Domainslib.Chan.recv rx with
          | None -> Domainslib.Chan.send out None  (* forward the sentinel *)
          | Some v ->
            (* only matching items are passed downstream *)
            if pred v then Domainslib.Chan.send out (Some v);
            loop ()
        in loop ()
      ) |> ignore;
      out
    
    

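    The OCaml version uses None as an end-of-stream sentinel, since a Domainslib.Chan channel does not close automatically when its owner goes away. A Rust mpsc channel closes once every Sender for it has been dropped, so ownership supplies the end-of-stream signal: rx.iter() ends after the buffered items are drained.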
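
The close-on-drop behaviour can be checked in isolation. The following is a small sketch using only std::sync::mpsc; close_on_drop_demo is a hypothetical name chosen for illustration.

```rust
use std::sync::mpsc;

// Illustration only: shows that the channel stays open while any Sender clone is alive.
fn close_on_drop_demo() -> (Vec<i32>, bool) {
    let (tx, rx) = mpsc::channel::<i32>();
    let tx2 = tx.clone();
    tx.send(1).unwrap();
    tx2.send(2).unwrap();
    drop(tx);
    // One sender is still alive, so sending still succeeds.
    tx2.send(3).unwrap();
    drop(tx2);
    // All senders are gone: iter() yields the buffered items, then ends.
    let items: Vec<i32> = rx.iter().collect();
    // A further recv() reports the disconnect as an error.
    let closed = rx.recv().is_err();
    (items, closed)
}
```

This is why each stage needs no explicit close call: when its thread's loop ends, the closure's tx is dropped, and the downstream stage sees its own input end in turn.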


    Deep Comparison

    N-Stage Streaming Pipeline — Comparison

    Core Insight

    Channel pipelines behave like iterator chains whose stages run concurrently: while stage N processes item K, stage N+1 can already process item K-1. This is the concurrent equivalent of s |> Seq.map f |> Seq.filter p |> Seq.map g.
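
As a rough check of that equivalence on the Rust side, the sketch below runs the same square, keep-even, to-string steps once as an ordinary iterator chain and once as a channel pipeline. map_stage and filter_stage are repeated from the example so the block stands alone; sequential and concurrent are hypothetical names for this comparison.

```rust
use std::sync::mpsc;
use std::thread;

fn map_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            tx.send(f(item)).unwrap();
        }
    });
    out
}

fn filter_stage<T, F>(rx: mpsc::Receiver<T>, pred: F) -> mpsc::Receiver<T>
where
    T: Send + 'static,
    F: Fn(&T) -> bool + Send + 'static,
{
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            if pred(&item) {
                tx.send(item).unwrap();
            }
        }
    });
    out
}

// Single-threaded iterator chain.
fn sequential(input: Vec<i32>) -> Vec<String> {
    input
        .into_iter()
        .map(|x| x * x)
        .filter(|x| x % 2 == 0)
        .map(|x| x.to_string())
        .collect()
}

// The same three steps, one thread per stage.
fn concurrent(input: Vec<i32>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<i32>();
    let rx1 = map_stage(rx, |x| x * x);
    let rx2 = filter_stage(rx1, |x| x % 2 == 0);
    let rx3 = map_stage(rx2, |x: i32| x.to_string());
    let producer = thread::spawn(move || {
        for x in input {
            tx.send(x).unwrap();
        }
    });
    let out = rx3.iter().collect();
    producer.join().unwrap();
    out
}
```

Both functions return the items in the same order, because each stage is a single thread reading from and writing to a FIFO channel. That ordering guarantee would not hold if a stage fanned out to several worker threads.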

    OCaml Approach

  • make_stage in_c f creates an output channel and spawns a thread
  • make_filter in_c pred filters items, passing only matching ones
  • Chained: let c2 = make_stage (make_stage c0 f) g
  • Backpressure: natural if using bounded channels
  • Each stage closes its output when input is exhausted

    Rust Approach

  • map_stage(rx, f) returns a Receiver<U>; filter_stage(rx, pred) returns a Receiver<T>
  • Composable by chaining: map_stage(filter_stage(rx, pred), f)
  • flat_map_stage for one-to-many expansion
  • Each stage thread runs for item in rx.iter(), which stops when the channel closes
  • Stages run truly in parallel, so all cores can be utilized
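
The nested-call style of composition can be sketched with a one-to-many stage feeding a filter. The stage definitions are repeated from the example so the block stands alone; count_long_words and its min_len parameter are hypothetical and exist only for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

fn flat_map_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> Vec<U> + Send + 'static,
{
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            for v in f(item) {
                tx.send(v).unwrap();
            }
        }
    });
    out
}

fn filter_stage<T, F>(rx: mpsc::Receiver<T>, pred: F) -> mpsc::Receiver<T>
where
    T: Send + 'static,
    F: Fn(&T) -> bool + Send + 'static,
{
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            if pred(&item) {
                tx.send(item).unwrap();
            }
        }
    });
    out
}

// Illustration only: lines -> words -> words of at least min_len bytes -> count.
fn count_long_words(lines: Vec<String>, min_len: usize) -> usize {
    let (tx, rx) = mpsc::channel::<String>();
    // Nested calls: the innermost stage comes first in the data flow.
    let words = filter_stage(
        flat_map_stage(rx, |line: String| {
            line.split_whitespace()
                .map(str::to_owned)
                .collect::<Vec<String>>()
        }),
        move |w: &String| w.len() >= min_len,
    );
    let producer = thread::spawn(move || {
        for line in lines {
            tx.send(line).unwrap();
        }
    });
    let n = words.iter().count();
    producer.join().unwrap();
    n
}
```

For the two lines "the quick brown" and "fox jumps over" with min_len = 4, the count is 4 (quick, brown, jumps, over). Binding each intermediate Receiver to a name, as pipeline_even_squares does, reads top to bottom in data-flow order and is usually easier to follow once there are more than two stages.
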
    Comparison Table

    | Stage type | OCaml | Rust |
    |---|---|---|
    | Map | make_stage in_c f | map_stage(rx, f) |
    | Filter | make_filter in_c pred | filter_stage(rx, pred) |
    | Flat-map | Custom stage with multiple sends | flat_map_stage(rx, f) |
    | Compose 2 stages | make_stage (make_stage c0 f) g | map_stage(map_stage(rx, f), g) |
    | Backpressure | Bounded queue (manual) | sync_channel(n) for stage tx |
    | Stage count | Any N | Any N |
    | Channel close | Explicit close_chan out_c | Drop tx (RAII) |

    std vs tokio

    | Aspect | std version | tokio version |
    |---|---|---|
    | Runtime | OS threads via std::thread | Async tasks on tokio runtime |
    | Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
    | Channels | std::sync::mpsc (channel is unbounded; sync_channel is bounded) | tokio::sync::mpsc (channel is bounded and async) |
    | Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
    | Overhead | One OS thread per task | Many tasks per thread (M:N) |
    | Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |

    Exercises

  • Add a reduce_stage<T, R, F>(rx, init, f) -> R that is the terminal sink stage.
  • Implement tee_stage<T: Clone>(rx) -> (Receiver<T>, Receiver<T>) — broadcast each item to two downstream stages.
  • Add backpressure: change all stage channels to sync_channel(64) and observe producer slowing.
  • Build a complete text-processing pipeline: source → split_words → filter_stopwords → count_frequencies → top10.
  • Benchmark the N-stage pipeline processing 1,000,000 integers against a sequential iterator chain with the same operations.