463: Fan-Out / Fan-In Pattern
Tutorial
The Problem
One slow processing step can bottleneck an entire pipeline. Fan-out distributes work items from one source to N parallel workers; fan-in collects the results from all N workers back into one consumer. Together they parallelize the bottleneck stage without changing the serial stages around it. If one step takes 100 ms per item and you have 8 cores, 8 parallel workers cut that stage's effective cost to about 12.5 ms per item (100 ms / 8), an ideal 8x throughput improvement. Each individual item still takes 100 ms; the gain comes from processing 8 items at once.
Fan-out/fan-in appears in MapReduce frameworks, parallel database aggregations, web crawler link processing, batch ML inference, and any stage requiring horizontal scaling.
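The 100 ms / 8-core arithmetic can be checked with a small throughput model. This is a minimal sketch under stated assumptions: the stage costs (parse 10 ms, process 100 ms, write 5 ms) are hypothetical, `effective_cost_ms` and `bottleneck_ms` are illustrative names, and scaling is assumed to be ideal (one free core per worker, no lock or channel overhead). A pipeline's steady-state throughput is set by its slowest stage, so the model reports the largest effective per-item cost.

```rust
/// Effective per-item cost of one stage served by `workers` parallel workers.
/// Assumes ideal linear scaling: enough cores and no coordination overhead.
fn effective_cost_ms(cost_ms: f64, workers: u32) -> f64 {
    cost_ms / workers as f64
}

/// Steady-state cost per item of a whole pipeline: the slowest stage wins.
/// Each stage is (cost in ms per item, number of workers).
fn bottleneck_ms(stages: &[(f64, u32)]) -> f64 {
    stages
        .iter()
        .map(|&(cost, workers)| effective_cost_ms(cost, workers))
        .fold(0.0, f64::max)
}

fn main() {
    // Hypothetical pipeline: parse (10 ms), process (100 ms), write (5 ms).
    let serial: [(f64, u32); 3] = [(10.0, 1), (100.0, 1), (5.0, 1)];
    // Same pipeline with only the 100 ms stage fanned out to 8 workers.
    let fanned: [(f64, u32); 3] = [(10.0, 1), (100.0, 8), (5.0, 1)];
    println!("serial:  {} ms/item", bottleneck_ms(&serial));
    println!("fan-out: {} ms/item", bottleneck_ms(&fanned));
}
```

With these numbers, fanning out the 100 ms stage to 8 workers drops the bottleneck to 12.5 ms per item. Past 10 workers (100 / 10 = 10 ms) the serial 10 ms parse stage becomes the limit, so extra workers on the fanned-out stage buy no further throughput.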
🎯 Learning Outcomes
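Arc<Mutex<Iterator>> gives all workers one shared queue, so each idle worker pulls the next item (dynamic load balancing, often loosely called work stealing)
tx clones give each worker its own sender into a single result channel; dropping the original tx lets rx.iter() finish once every worker exits
Code Example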
#![allow(clippy::all)]
// 463. Fan-out / fan-in
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
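fn fan_map<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    // At least one worker; with n == 0 every item would be silently dropped.
    let n = n.max(1);
    // Fan-out source: one shared iterator that every worker pulls from.
    let work = Arc::new(Mutex::new(items.into_iter()));
    let f = Arc::new(f);
    // Fan-in sink: every worker sends its results into one channel.
    let (tx, rx) = mpsc::channel::<U>();
    let ws: Vec<_> = (0..n)
        .map(|_| {
            let (w, f, t) = (Arc::clone(&work), Arc::clone(&f), tx.clone());
            thread::spawn(move || loop {
                // The lock guard is a temporary dropped at the end of this
                // statement, so `f` runs without holding the lock.
                let item = w.lock().unwrap().next();
                match item {
                    Some(x) => {
                        let _ = t.send(f(x));
                    }
                    // Queue drained: exit, dropping this worker's sender.
                    None => break,
                }
            })
        })
        .collect();
    // Drop the original sender so `rx.iter()` ends once all worker clones are gone.
    drop(tx);
    for w in ws {
        w.join().unwrap();
    }
    rx.iter().collect()
}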
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_fan_map() {
        let mut r = fan_map((1..=8u32).collect(), 4, |x| x * 2);
        r.sort();
        assert_eq!(r, vec![2, 4, 6, 8, 10, 12, 14, 16]);
    }
    #[test]
    fn test_all() {
        assert_eq!(fan_map((0..100u32).collect(), 8, |x| x).len(), 100);
    }
}
Key Differences
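Rust pulls items from a shared Arc<Mutex<Iterator>> queue (dynamic load balancing); OCaml typically pre-distributes work (static partitioning).
mpsc::channel collects results in completion order (non-deterministic); OCaml's List.map Domain.join collects them in spawn order.
Dynamic distribution (Arc<Mutex<Iterator>>) handles variable-cost items better than static partitioning: a worker that finishes early simply takes the next item instead of idling while a slower peer works through its pre-assigned share.
With the rayon crate, items.into_par_iter().map(f).collect() performs fan-out/fan-in in one expression (and preserves input order); this manual implementation shows the underlying mechanism.
OCaml Approach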
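In OCaml 5.x, fan-out can be written as List.map (fun item -> Domain.spawn (fun () -> process item)) items, and fan-in as List.map Domain.join over the resulting handles. This spawns one domain per item, so it only suits a handful of items: domains are heavyweight and should not greatly outnumber cores. Domainslib.Task.parallel_for, which runs on a fixed pool of domains, is the idiomatic OCaml 5.x approach for larger inputs. In OCaml 4.x, Thread.create with channels expresses the same pattern, but the runtime lock prevents threads from running OCaml code in parallel, so CPU-bound work gains no speedup. OCaml's List.map naturally expresses the fan-out structure.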
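For contrast, the static-partitioning style attributed to OCaml can be sketched in Rust. This is a hedged illustration, not the tutorial's method: the hypothetical `fan_map_static` splits the input into at most `n` contiguous chunks up front, runs one thread per chunk, and fans in by joining the handles in spawn order, which preserves input order much as `List.map Domain.join` does. It uses `std::thread::scope` (stable since Rust 1.63), so threads can borrow `items` and `f` directly and the `Arc` and `'static` bounds of `fan_map` are unnecessary.

```rust
use std::thread;

/// Hypothetical static-partitioning fan-out/fan-in: each thread gets one
/// pre-assigned contiguous chunk; results come back in input order.
fn fan_map_static<T, U, F>(items: &[T], n: usize, f: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync,
{
    if items.is_empty() {
        return Vec::new();
    }
    let n = n.max(1);
    // Ceiling division: every item lands in a chunk, and there are at most n chunks.
    let chunk_size = (items.len() + n - 1) / n;
    let f = &f;
    // Scoped threads may borrow `items` and `f`; the scope joins them all before returning.
    thread::scope(|s| {
        // Fan-out: one thread per chunk, each mapping its chunk sequentially.
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().map(f).collect::<Vec<U>>()))
            .collect();
        // Fan-in: joining in spawn order concatenates the chunks in input order.
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect()
    })
}

fn main() {
    let items: Vec<u64> = (1..=10).collect();
    let squares = fan_map_static(&items, 4, |x| x * x);
    // Input order is preserved: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    println!("{:?}", squares);
}
```

The trade-off is the one listed under Key Differences: if the expensive items cluster in one chunk, that thread finishes last while the others sit idle, whereas fan_map's shared queue keeps every worker busy until the queue drains.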
Exercises
Modify fan_map to return results in the same order as the input. Hint: include an index with each work item and sort the results by index after fan-in.