
463: Fan-Out / Fan-In Pattern


The Problem

One slow processing step can bottleneck an entire pipeline. Fan-out distributes work items from one source to N parallel workers; fan-in collects the results from all N workers back into a single consumer. Together they parallelize the bottleneck stage without changing the serial stages around it. If one processing step takes 100 ms and you have 8 cores, 8 parallel workers cut that step's contribution to roughly 12.5 ms per item, an 8x throughput improvement.

Fan-out/fan-in appears in MapReduce frameworks, parallel database aggregations, web crawler link processing, batch ML inference, and any stage requiring horizontal scaling.

🎯 Learning Outcomes

  • Understand fan-out: distributing work items to multiple concurrent workers
  • Learn fan-in: collecting results from multiple workers into a single channel
  • See how Arc<Mutex<Iterator>> enables work stealing among workers
  • Understand channel close as the shutdown signal: all workers drop their tx clones
  • Learn the pattern's relationship to MapReduce (fan-out = map, fan-in = reduce)

Code Example

    #![allow(clippy::all)]
    // 463. Fan-out / fan-in
    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;
    
    // Fan `items` out to `n` worker threads, apply `f`, and fan the
    // results back in over one channel. Results arrive in completion
    // order, not input order.
    fn fan_map<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U>
    where
        T: Send + 'static,
        U: Send + 'static,
        F: Fn(T) -> U + Send + Sync + 'static,
    {
        // Shared iterator: each worker pulls the next item under the
        // lock, giving dynamic (work-stealing) load balancing.
        let work = Arc::new(Mutex::new(items.into_iter()));
        let f = Arc::new(f);
        let (tx, rx) = mpsc::channel::<U>();
        let ws: Vec<_> = (0..n)
            .map(|_| {
                let (w, f, t) = (Arc::clone(&work), Arc::clone(&f), tx.clone());
                thread::spawn(move || loop {
                    // Hold the lock only long enough to take one item.
                    let item = w.lock().unwrap().next();
                    match item {
                        Some(x) => {
                            let _ = t.send(f(x));
                        }
                        // Queue drained: the worker exits, dropping its tx clone.
                        None => break,
                    }
                })
            })
            .collect();
        // Drop the original sender so the channel closes once every
        // worker's clone is gone.
        drop(tx);
        for w in ws {
            w.join().unwrap();
        }
        // All senders are dropped by now, so this iterator terminates.
        rx.iter().collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        #[test]
        fn test_fan_map() {
            let mut r = fan_map((1..=8u32).collect(), 4, |x| x * 2);
            r.sort();
            assert_eq!(r, vec![2, 4, 6, 8, 10, 12, 14, 16]);
        }
        #[test]
        fn test_all() {
            assert_eq!(fan_map((0..100u32).collect(), 8, |x| x).len(), 100);
        }
    }

    Key Differences

  • Work distribution: Rust's fan-out uses Arc<Mutex<Iterator>> (work stealing); OCaml typically pre-distributes work (static partitioning).
  • Result order: fan-in over mpsc::channel collects results in completion order (non-deterministic); OCaml's List.map Domain.join over a list of spawned domains collects in spawn order, preserving input order.
  • Dynamic load: Work stealing (Arc<Mutex<Iterator>>) handles variable-cost items better than static partitioning.
  • Rayon analogy: rayon::par_iter().map(f).collect() is fan-out/fan-in in one operation; this manual implementation shows the underlying mechanism.
OCaml Approach

    OCaml's fan-out uses List.map (fun item -> Domain.spawn (fun () -> process item)) items in OCaml 5.x, then List.map Domain.join handles for fan-in. Domainslib.Task.parallel_for is the idiomatic OCaml 5.x approach. In OCaml 4.x, Thread.create with channels provides the same pattern. OCaml's list map naturally expresses the fan-out structure.


    Exercises

  • Order-preserving fan-in: Modify fan_map to return results in the same order as the input. Hint: include an index with each work item and sort results by index after fan-in.
  • Dynamic fan-out: Instead of fixed N workers, implement adaptive fan-out that spawns new workers when the queue is more than 50% full and lets workers exit when the queue is empty. Track the peak worker count.
  • Pipeline fan-out: Integrate fan-out into the pipeline from example 462. Replace a slow single stage with a fan-out stage that has N parallel workers, while the surrounding serial stages remain unchanged.