920-buffered-stream — Buffered Stream
Tutorial
The Problem
When processing a stream of items where each item requires significant I/O or computation, handling them one at a time creates a bottleneck. Buffered or concurrent processing allows N items to be in flight simultaneously, keeping the pipeline saturated. This pattern appears in web crawlers (N concurrent HTTP requests), image-processing pipelines (N images being resized in parallel), and database batch operations (N rows being processed simultaneously). The key challenge is bounding concurrency: allowing at most N operations to run at once. This example uses a semaphore to enforce that bound. Note that it still spawns one thread per item and only limits how many run at a time; a production version would also cap the thread count with a worker pool.
🎯 Learning Outcomes
- Arc<Mutex<Vec<(usize, U)>>> for thread-safe result collection with ordering
- thread::spawn + join for parallel map with bounded parallelism
- Lwt_pool for bounded concurrent processing

Code Example
#![allow(clippy::all)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A counting semaphore built from a mutex-guarded counter and a Condvar;
// std provides no semaphore type of its own.
struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self {
            count: Mutex::new(n),
            cond: Condvar::new(),
        })
    }

    // Block until a permit is available, then take it.
    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c == 0 {
            c = self.cond.wait(c).unwrap();
        }
        *c -= 1;
    }

    // Return a permit and wake one waiting thread.
    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

// Map `f` over `items` with at most `concurrency` calls in flight,
// preserving input order in the output.
fn buffered_map<T, U, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let sem = Semaphore::new(concurrency);
    let f = Arc::new(f);
    let results: Arc<Mutex<Vec<(usize, U)>>> = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = items
        .into_iter()
        .enumerate()
        .map(|(i, item)| {
            let sem = Arc::clone(&sem);
            let f = Arc::clone(&f);
            let results = Arc::clone(&results);
            thread::spawn(move || {
                sem.acquire(); // wait for a free slot
                let result = f(item);
                sem.release();
                // Tag each result with its input index so order can be restored.
                results.lock().unwrap().push((i, result));
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let mut res = results.lock().unwrap().drain(..).collect::<Vec<_>>();
    res.sort_by_key(|(i, _)| *i);
    res.into_iter().map(|(_, v)| v).collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn buffered_map_all_results() {
        let r = buffered_map(vec![1u64, 2, 3, 4, 5], 2, |x| x * 2);
        assert_eq!(r, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn concurrency_1_sequential() {
        let r = buffered_map(vec![1, 2, 3], 1, |x: i32| x + 10);
        assert_eq!(r, vec![11, 12, 13]);
    }
}

Key Differences
- Semaphore — must be built from Mutex + Condvar; OCaml's Lwt_pool is a higher-level abstraction.
- Lwt uses cooperative green threads (lighter per task).
- rayon or tokio provide more ergonomic bounded parallelism.

OCaml Approach
OCaml's Lwt library provides Lwt_pool.create n f for bounded resource pools. Lwt_pool.use pool (fun resource -> ...) acquires a slot, runs the task, and releases the slot when it completes. For async/await: Eio.Pool.run pool task. OCaml's Thread module (and later Domain in OCaml 5) can implement the same pattern with the Mutex and Condition primitives. Most of OCaml's concurrency support lives in libraries such as Lwt and Eio, while Rust's standard library is intentionally minimal for concurrency.
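Back on the Rust side, the Arc cloning in buffered_map exists only because std::thread::spawn requires 'static captures. A minimal sketch of the same semaphore pattern using scoped threads (std::thread::scope, Rust 1.63+), which can borrow from the enclosing stack frame instead — the name buffered_map_scoped is invented here for illustration:

```rust
use std::sync::{Condvar, Mutex};
use std::thread;

// Same Mutex + Condvar semaphore as in the tutorial, but without the Arc:
// scoped threads let us hand out plain references.
struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { count: Mutex::new(n), cond: Condvar::new() }
    }
    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c == 0 {
            c = self.cond.wait(c).unwrap();
        }
        *c -= 1;
    }
    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

// Scoped variant: no Arc, no 'static bounds, because the scope guarantees
// every spawned thread finishes before the borrows expire.
fn buffered_map_scoped<T, U, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<U>
where
    T: Send,
    U: Send,
    F: Fn(T) -> U + Sync,
{
    let sem = Semaphore::new(concurrency);
    let results = Mutex::new(Vec::new());
    thread::scope(|s| {
        for (i, item) in items.into_iter().enumerate() {
            // Shadow with references so the `move` closure captures borrows.
            let (sem, f, results) = (&sem, &f, &results);
            s.spawn(move || {
                sem.acquire();
                let r = f(item);
                sem.release();
                results.lock().unwrap().push((i, r));
            });
        }
    });
    // All threads have joined; reclaim the Vec and restore input order.
    let mut res = results.into_inner().unwrap();
    res.sort_by_key(|(i, _)| *i);
    res.into_iter().map(|(_, v)| v).collect()
}
```

Because the scope joins every thread before returning, the 'static bounds and the Arc wrappers simply disappear; the semantics are otherwise identical to buffered_map.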
Full Source
#![allow(clippy::all)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A counting semaphore built from a mutex-guarded counter and a Condvar;
// std provides no semaphore type of its own.
struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self {
            count: Mutex::new(n),
            cond: Condvar::new(),
        })
    }

    // Block until a permit is available, then take it.
    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c == 0 {
            c = self.cond.wait(c).unwrap();
        }
        *c -= 1;
    }

    // Return a permit and wake one waiting thread.
    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

// Map `f` over `items` with at most `concurrency` calls in flight,
// preserving input order in the output.
fn buffered_map<T, U, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let sem = Semaphore::new(concurrency);
    let f = Arc::new(f);
    let results: Arc<Mutex<Vec<(usize, U)>>> = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = items
        .into_iter()
        .enumerate()
        .map(|(i, item)| {
            let sem = Arc::clone(&sem);
            let f = Arc::clone(&f);
            let results = Arc::clone(&results);
            thread::spawn(move || {
                sem.acquire(); // wait for a free slot
                let result = f(item);
                sem.release();
                // Tag each result with its input index so order can be restored.
                results.lock().unwrap().push((i, result));
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let mut res = results.lock().unwrap().drain(..).collect::<Vec<_>>();
    res.sort_by_key(|(i, _)| *i);
    res.into_iter().map(|(_, v)| v).collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn buffered_map_all_results() {
        let r = buffered_map(vec![1u64, 2, 3, 4, 5], 2, |x| x * 2);
        assert_eq!(r, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn concurrency_1_sequential() {
        let r = buffered_map(vec![1, 2, 3], 1, |x: i32| x + 10);
        assert_eq!(r, vec![11, 12, 13]);
    }
}
Deep Comparison
920-buffered-stream — Language Comparison
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc::channel (unbounded; sync_channel is bounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
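The "one OS thread per task" row is the main scalability cost of the std version above. A hedged sketch of the alternative design: a fixed-size worker pool over std::sync::mpsc, where exactly `concurrency` threads are spawned once and pull work items from a shared channel, so the thread count no longer grows with the input (the name pooled_map is invented here):

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Worker-pool variant: `concurrency` long-lived threads share one work
// queue; results come back tagged with their input index over a second
// channel so order can be restored at the end.
fn pooled_map<T, U, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let (work_tx, work_rx) = mpsc::channel::<(usize, T)>();
    // mpsc receivers are single-consumer, so workers share it via a Mutex.
    let work_rx = Arc::new(Mutex::new(work_rx));
    let (res_tx, res_rx) = mpsc::channel::<(usize, U)>();
    let f = Arc::new(f);

    let mut workers = Vec::new();
    for _ in 0..concurrency {
        let work_rx = Arc::clone(&work_rx);
        let res_tx = res_tx.clone();
        let f = Arc::clone(&f);
        workers.push(thread::spawn(move || loop {
            // The lock is released at the end of this statement, so workers
            // only serialize the dequeue, not the call to `f`.
            let msg = work_rx.lock().unwrap().recv();
            match msg {
                Ok((i, item)) => res_tx.send((i, f(item))).unwrap(),
                Err(_) => break, // queue closed: no more work
            }
        }));
    }
    drop(res_tx); // only worker-held senders remain

    for pair in items.into_iter().enumerate() {
        work_tx.send(pair).unwrap();
    }
    drop(work_tx); // close the queue so idle workers exit

    // Iteration ends once every worker has dropped its result sender.
    let mut res: Vec<(usize, U)> = res_rx.iter().collect();
    for w in workers {
        w.join().unwrap();
    }
    res.sort_by_key(|(i, _)| *i);
    res.into_iter().map(|(_, v)| v).collect()
}
```

This trades the semaphore for channel back-pressure: the pool size alone bounds concurrency, which is roughly the shape tokio's task-plus-bounded-channel designs take, but with OS threads.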
Exercises
- Rewrite buffered_map to use std::sync::mpsc channels instead of Arc<Mutex<Vec>> for result collection.
- Implement buffered_filter_map<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U> that discards None results.