920 Intermediate

920-buffered-stream — Buffered Stream

Functional Programming

Tutorial

The Problem

When processing a stream of items where each item requires significant I/O or computation, processing them one at a time creates a bottleneck. Buffered or concurrent processing allows N items to be in-flight simultaneously, keeping the processing pipeline saturated. This pattern appears in web crawlers (N concurrent HTTP requests), image processing pipelines (N images being resized in parallel), and database batch operations (N rows being processed simultaneously). The key challenge is bounding concurrency — allowing N concurrent operations without spawning unbounded threads. This example uses a semaphore to implement bounded concurrency.

🎯 Learning Outcomes

  • Implement bounded concurrent processing using a counting semaphore
  • Use Arc<Mutex<Vec<(usize, U)>>> for thread-safe result collection with ordering
  • Understand how semaphore acquire/release controls the concurrency level
  • Use thread::spawn + join for a parallel map with bounded parallelism
  • Compare with OCaml's Lwt_pool for bounded concurrent processing

Code Example

    #![allow(clippy::all)]
    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;
    
    /// A counting semaphore built from Mutex + Condvar, since std has no
    /// Semaphore type.
    struct Semaphore {
        count: Mutex<usize>,
        cond: Condvar,
    }
    
    impl Semaphore {
        fn new(n: usize) -> Arc<Self> {
            Arc::new(Self {
                count: Mutex::new(n),
                cond: Condvar::new(),
            })
        }
        /// Block until a permit is available, then take it.
        fn acquire(&self) {
            let mut c = self.count.lock().unwrap();
            while *c == 0 {
                c = self.cond.wait(c).unwrap();
            }
            *c -= 1;
        }
        /// Return a permit and wake one waiting thread.
        fn release(&self) {
            *self.count.lock().unwrap() += 1;
            self.cond.notify_one();
        }
    }
    
    /// Parallel map that lets at most `concurrency` calls to `f` run at once.
    /// One thread is spawned per item; the semaphore bounds execution, not
    /// thread creation. Results carry their original index so output order
    /// matches input order.
    fn buffered_map<T, U, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<U>
    where
        T: Send + 'static,
        U: Send + 'static,
        F: Fn(T) -> U + Send + Sync + 'static,
    {
        let sem = Semaphore::new(concurrency);
        let f = Arc::new(f);
        let results: Arc<Mutex<Vec<(usize, U)>>> = Arc::new(Mutex::new(Vec::new()));
    
        let handles: Vec<_> = items
            .into_iter()
            .enumerate()
            .map(|(i, item)| {
                let sem = Arc::clone(&sem);
                let f = Arc::clone(&f);
                let results = Arc::clone(&results);
                thread::spawn(move || {
                    sem.acquire(); // wait for a free slot
                    let result = f(item);
                    sem.release(); // free the slot before storing the result
                    results.lock().unwrap().push((i, result));
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
    
        // Restore input order, then strip the indices.
        let mut res = results.lock().unwrap().drain(..).collect::<Vec<_>>();
        res.sort_by_key(|(i, _)| *i);
        res.into_iter().map(|(_, v)| v).collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        #[test]
        fn buffered_map_all_results() {
            let r = buffered_map(vec![1u64, 2, 3, 4, 5], 2, |x| x * 2);
            assert_eq!(r, vec![2, 4, 6, 8, 10]);
        }
        #[test]
        fn concurrency_1_sequential() {
            let r = buffered_map(vec![1, 2, 3], 1, |x: i32| x + 10);
            assert_eq!(r, vec![11, 12, 13]);
        }
    }
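To convince yourself that the semaphore really caps concurrency, you can count in-flight tasks with atomics. This is an illustrative sketch, not part of the example above: max_in_flight and the 10 ms sleep are invented for the demonstration, and the Semaphore is repeated so the snippet compiles on its own.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Same Mutex + Condvar semaphore as in the example above, repeated so
// this snippet is self-contained.
struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self { count: Mutex::new(n), cond: Condvar::new() })
    }
    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c == 0 {
            c = self.cond.wait(c).unwrap();
        }
        *c -= 1;
    }
    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

/// Runs `tasks` threads gated by a semaphore of size `limit` and reports
/// the highest number of tasks observed running at the same time.
/// (`max_in_flight` is a hypothetical helper for this demonstration.)
fn max_in_flight(tasks: usize, limit: usize) -> usize {
    let sem = Semaphore::new(limit);
    let current = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let sem = Arc::clone(&sem);
            let current = Arc::clone(&current);
            let peak = Arc::clone(&peak);
            thread::spawn(move || {
                sem.acquire();
                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10)); // simulate work
                current.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

With 8 tasks and a limit of 3, the observed peak never exceeds 3; it may be lower on a loaded machine, which is why the check should be an upper bound rather than an equality.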

    Key Differences

  • Semaphore in std: Rust std has no built-in Semaphore — must be built from Mutex + Condvar; OCaml's Lwt_pool is a higher-level abstraction.
  • Result ordering: Both need to track original indices for ordered output from parallel processing.
  • Thread vs lightweight: Rust uses OS threads (expensive per thread); OCaml's Lwt uses cooperative green threads (lighter per task).
  • Bounded vs unbounded: The semaphore bounds how many tasks execute at once, but one OS thread is still spawned per item; rayon and tokio provide more ergonomic bounded parallelism.
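The last point can be made concrete: the semaphore version still spawns one OS thread per item. A fixed worker pool bounds the thread count as well, by having exactly n threads pull indexed jobs from a shared channel. Below is a sketch using only std; pooled_map and its channel layout are illustrative, not part of the example above.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Fixed worker pool: exactly `n` OS threads pull (index, item) jobs from
/// a shared channel, so thread count, not just execution, is bounded.
/// (`pooled_map` is an illustrative name for this sketch.)
fn pooled_map<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let (job_tx, job_rx) = mpsc::channel::<(usize, T)>();
    // Receiver is not Sync, so the workers share it behind a Mutex.
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (res_tx, res_rx) = mpsc::channel::<(usize, U)>();
    let f = Arc::new(f);

    // Queue every job up front, then close the channel so workers exit
    // once it drains.
    for job in items.into_iter().enumerate() {
        job_tx.send(job).unwrap();
    }
    drop(job_tx);

    let handles: Vec<_> = (0..n)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let res_tx = res_tx.clone();
            let f = Arc::clone(&f);
            thread::spawn(move || loop {
                // The lock guard is dropped at the end of this statement.
                let job = job_rx.lock().unwrap().recv();
                match job {
                    Ok((i, item)) => res_tx.send((i, f(item))).unwrap(),
                    Err(_) => break, // channel closed and empty
                }
            })
        })
        .collect();
    drop(res_tx); // keep only the workers' clones alive

    // `iter()` ends when the last worker drops its sender clone.
    let mut out: Vec<(usize, U)> = res_rx.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    out.sort_by_key(|(i, _)| *i);
    out.into_iter().map(|(_, v)| v).collect()
}
```

The trade-off: the pool never creates more than n threads, but jobs are taken in arrival order rather than all being eagerly scheduled, so the index tagging and final sort are still needed for ordered output.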
OCaml Approach

    OCaml's Lwt library provides Lwt_pool.create n f for bounded resource pools: Lwt_pool.use pool (fun resource -> ...) acquires a slot, runs the task, and releases the slot when the task finishes. The Eio library offers a similar pool abstraction for effects-based concurrency, and OCaml's Thread module (and Domain in OCaml 5) can implement the same pattern with the Mutex and Condition primitives. In both languages the richer bounded-concurrency tools live outside the standard library, which is intentionally minimal for concurrency.


    Deep Comparison

    920-buffered-stream — Language Comparison

    std vs tokio

    | Aspect          | std version                                               | tokio version                        |
    |-----------------|-----------------------------------------------------------|--------------------------------------|
    | Runtime         | OS threads via std::thread                                | Async tasks on the tokio runtime     |
    | Synchronization | std::sync::Mutex, Condvar                                 | tokio::sync::Mutex, channels         |
    | Channels        | std::sync::mpsc (channel unbounded, sync_channel bounded) | tokio::sync::mpsc (bounded, async)   |
    | Blocking        | Thread blocks on lock/recv                                | Task yields; runtime switches tasks  |
    | Overhead        | One OS thread per task                                    | Many tasks per thread (M:N)          |
    | Best for        | CPU-bound, simple concurrency                             | I/O-bound, high-concurrency servers  |
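One nuance for the Channels row: std is not limited to unbounded channels. mpsc::sync_channel(n) blocks the sender once n items are queued, giving backpressure much like tokio's bounded mpsc. A minimal producer/consumer sketch (bounded_pipeline is an illustrative name, and the buffer size of 2 is arbitrary):

```rust
use std::sync::mpsc;
use std::thread;

/// Producer/consumer stage connected by a bounded std channel.
/// (`bounded_pipeline` is a hypothetical helper for this sketch.)
fn bounded_pipeline(items: Vec<u32>) -> Vec<u32> {
    // sync_channel(2): send() blocks once 2 items are waiting, so a slow
    // consumer applies backpressure to the producer.
    let (tx, rx) = mpsc::sync_channel::<u32>(2);
    let producer = thread::spawn(move || {
        for x in items {
            tx.send(x).unwrap(); // may block while the buffer is full
        }
        // tx is dropped here, which terminates rx.iter() below
    });
    let out: Vec<u32> = rx.iter().map(|x| x * 2).collect();
    producer.join().unwrap();
    out
}
```

This is the blocking counterpart of tokio's buffered streams: the bound lives in the channel between stages rather than in a semaphore around each task.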

    Exercises

  • Refactor buffered_map to use std::sync::mpsc channels instead of Arc<Mutex<Vec>> for result collection.
  • Implement buffered_filter_map<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U> that discards None results.
  • Add a timeout to each task: if a task takes longer than a specified duration, return a default value instead.