993 Work Queue
Tutorial
The Problem
Implement a thread pool / work queue with N worker threads sharing a single mpsc::Receiver<Task> wrapped in Arc<Mutex<Receiver>>. Worker threads loop: lock the receiver, dequeue one task, unlock, execute. When the ThreadPool is shut down (its Sender is dropped), the channel closes and workers exit cleanly.
🎯 Learning Outcomes
- type Task = Box<dyn FnOnce() + Send + 'static> for type-erased, owned closures
- Sharing one Receiver<Task> across all workers using Arc<Mutex<Receiver<Task>>>
- Worker loop: lock → recv() (blocks for next task) → unlock → execute task
- Shutdown: dropping the Sender and joining all workers ensures tasks complete before the pool is destroyed
- Why the Mutex is needed: Receiver<T> is not Sync (only one thread can receive at a time)

Code Example
#![allow(clippy::all)]
// 993: Thread Pool / Work Queue
// Fixed N workers consuming tasks from a shared mpsc channel
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
type Task = Box<dyn FnOnce() + Send + 'static>;
// A fixed-size pool: one Sender feeding N worker threads over a shared channel.
struct ThreadPool {
// Producer half of the task channel; dropping it is the shutdown signal.
sender: mpsc::Sender<Task>,
// Join handle for every spawned worker; joined during shutdown.
workers: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
    /// Create a pool with `size` worker threads, all draining one shared channel.
    ///
    /// # Panics
    /// Panics if `size` is zero.
    fn new(size: usize) -> Self {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel::<Task>();
        // Receiver is not Sync, so share it behind Arc<Mutex<..>>;
        // workers take turns locking it to pull the next task.
        let shared_rx = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for _ in 0..size {
            let queue = Arc::clone(&shared_rx);
            workers.push(thread::spawn(move || loop {
                // The temporary MutexGuard lives only for this statement,
                // so the lock is released before the task runs.
                let next = queue.lock().unwrap().recv();
                match next {
                    Ok(job) => job(),
                    // Err(_) means every Sender is gone: exit this worker.
                    Err(_) => return,
                }
            }));
        }
        ThreadPool { sender, workers }
    }
    /// Queue a closure for execution by some worker thread.
    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.send(Box::new(f)).unwrap();
    }
    /// Consume the pool: close the channel, then wait for every worker to finish.
    fn shutdown(self) {
        // Dropping the sole Sender makes recv() return Err in each worker.
        drop(self.sender);
        for handle in self.workers {
            handle.join().unwrap();
        }
    }
}
// --- Approach 1: Submit tasks that collect results ---
/// Square 1..=20 on a 4-worker pool, gathering results into a shared Vec.
/// Returns the squares in ascending order.
fn pool_squares() -> Vec<i64> {
    let pool = ThreadPool::new(4);
    let shared = Arc::new(Mutex::new(Vec::new()));
    for n in 1i64..=20 {
        let sink = Arc::clone(&shared);
        pool.execute(move || sink.lock().unwrap().push(n * n));
    }
    // Blocks until every task has run; afterwards no worker touches `shared`.
    pool.shutdown();
    let mut out = shared.lock().unwrap().clone();
    out.sort();
    out
}
// --- Approach 2: Work queue with return values via channel ---
/// Square each input on a 3-worker pool; every task sends its result
/// back over a dedicated result channel. Returns squares in ascending order.
fn pool_with_results(inputs: Vec<i32>) -> Vec<i32> {
    let pool = ThreadPool::new(3);
    let (out_tx, out_rx) = mpsc::channel::<i32>();
    let expected = inputs.len();
    for value in inputs {
        let out_tx = out_tx.clone();
        pool.execute(move || out_tx.send(value * value).unwrap());
    }
    // Drop our Sender; the clones captured by tasks die as tasks finish,
    // so out_rx.iter() terminates once all results have arrived.
    drop(out_tx);
    pool.shutdown();
    let mut results: Vec<i32> = out_rx.iter().collect();
    // Ensure we got all results (pool shutdown closed the channel)
    assert_eq!(results.len(), expected);
    results.sort();
    results
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_squares_all_computed() {
        let squares = pool_squares();
        assert_eq!(squares.len(), 20);
        // 1^2 + 2^2 + ... + 20^2 = 2870
        assert_eq!(squares.iter().sum::<i64>(), 2870);
    }

    #[test]
    fn test_pool_with_results() {
        assert_eq!(pool_with_results(vec![1, 2, 3, 4, 5]), vec![1, 4, 9, 16, 25]);
    }

    #[test]
    fn test_pool_empty_tasks() {
        // Shutdown with no queued work must return promptly, not hang.
        ThreadPool::new(2).shutdown();
    }

    #[test]
    fn test_pool_single_worker() {
        let pool = ThreadPool::new(1);
        let collected = Arc::new(Mutex::new(Vec::new()));
        for i in 0..5 {
            let sink = Arc::clone(&collected);
            pool.execute(move || sink.lock().unwrap().push(i));
        }
        pool.shutdown();
        let mut seen = collected.lock().unwrap().clone();
        seen.sort();
        assert_eq!(seen, vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_pool_more_tasks_than_workers() {
        let pool = ThreadPool::new(2);
        let count = Arc::new(Mutex::new(0u32));
        for _ in 0..100 {
            let c = Arc::clone(&count);
            pool.execute(move || *c.lock().unwrap() += 1);
        }
        pool.shutdown();
        assert_eq!(*count.lock().unwrap(), 100);
    }
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Task type | Box<dyn FnOnce() + Send + 'static> | unit -> unit function |
| Shared queue | Arc<Mutex<Receiver<Task>>> | Queue + Mutex + Condition |
| Channel close | Sender drop propagates to recv() | Manual running = false + broadcast |
| Shutdown / join | shutdown(self) drops the Sender, then joins workers | Explicit shutdown + join |
The Arc<Mutex<Receiver>> trick is idiomatic Rust for fan-out from a single channel to multiple consumers. The lock is held only during the recv() call (microseconds), so contention is minimal.
OCaml Approach
(* OCaml 5.0+: Domainslib.Task.pool *)
let pool = Domainslib.Task.setup_pool ~num_domains:4 ()
let submit_task pool f =
Domainslib.Task.run pool (fun () -> f ())
(* Manual thread pool with Queue + Mutex + Condition *)
(* Shared state for a hand-rolled work queue: tasks wait in [queue],
   [mutex]/[cond] coordinate the workers, and [running] signals shutdown. *)
type 'a pool = {
queue: 'a Queue.t;
mutex: Mutex.t;
cond: Condition.t;
mutable running: bool;
}
(* Block until a task is available or the pool stops running.
   Returns [Some task], or [None] once the pool is stopped and drained. *)
let dequeue p =
Mutex.lock p.mutex;
(* Sleep on the condition variable while there is nothing to do; the
   predicate is re-checked on every wakeup (guards against spurious wakeups). *)
while Queue.is_empty p.queue && p.running do
Condition.wait p.cond p.mutex
done;
let task = if Queue.is_empty p.queue then None else Some (Queue.pop p.queue) in
Mutex.unlock p.mutex;
task
OCaml's Domainslib.Task provides a production-ready parallel task pool for OCaml 5.0+. The manual implementation mirrors Rust's Arc<Mutex<Receiver>> pattern using Queue + Mutex + Condition.
Full Source
#![allow(clippy::all)]
// 993: Thread Pool / Work Queue
// Fixed N workers consuming tasks from a shared mpsc channel
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
type Task = Box<dyn FnOnce() + Send + 'static>;
// A fixed-size pool: one Sender feeding N worker threads over a shared channel.
struct ThreadPool {
// Producer half of the task channel; dropping it is the shutdown signal.
sender: mpsc::Sender<Task>,
// Join handle for every spawned worker; joined during shutdown.
workers: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
    /// Create a pool with `size` worker threads, all draining one shared channel.
    ///
    /// # Panics
    /// Panics if `size` is zero.
    fn new(size: usize) -> Self {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel::<Task>();
        // Receiver is not Sync, so share it behind Arc<Mutex<..>>;
        // workers take turns locking it to pull the next task.
        let shared_rx = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for _ in 0..size {
            let queue = Arc::clone(&shared_rx);
            workers.push(thread::spawn(move || loop {
                // The temporary MutexGuard lives only for this statement,
                // so the lock is released before the task runs.
                let next = queue.lock().unwrap().recv();
                match next {
                    Ok(job) => job(),
                    // Err(_) means every Sender is gone: exit this worker.
                    Err(_) => return,
                }
            }));
        }
        ThreadPool { sender, workers }
    }
    /// Queue a closure for execution by some worker thread.
    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.send(Box::new(f)).unwrap();
    }
    /// Consume the pool: close the channel, then wait for every worker to finish.
    fn shutdown(self) {
        // Dropping the sole Sender makes recv() return Err in each worker.
        drop(self.sender);
        for handle in self.workers {
            handle.join().unwrap();
        }
    }
}
// --- Approach 1: Submit tasks that collect results ---
/// Square 1..=20 on a 4-worker pool, gathering results into a shared Vec.
/// Returns the squares in ascending order.
fn pool_squares() -> Vec<i64> {
    let pool = ThreadPool::new(4);
    let shared = Arc::new(Mutex::new(Vec::new()));
    for n in 1i64..=20 {
        let sink = Arc::clone(&shared);
        pool.execute(move || sink.lock().unwrap().push(n * n));
    }
    // Blocks until every task has run; afterwards no worker touches `shared`.
    pool.shutdown();
    let mut out = shared.lock().unwrap().clone();
    out.sort();
    out
}
// --- Approach 2: Work queue with return values via channel ---
/// Square each input on a 3-worker pool; every task sends its result
/// back over a dedicated result channel. Returns squares in ascending order.
fn pool_with_results(inputs: Vec<i32>) -> Vec<i32> {
    let pool = ThreadPool::new(3);
    let (out_tx, out_rx) = mpsc::channel::<i32>();
    let expected = inputs.len();
    for value in inputs {
        let out_tx = out_tx.clone();
        pool.execute(move || out_tx.send(value * value).unwrap());
    }
    // Drop our Sender; the clones captured by tasks die as tasks finish,
    // so out_rx.iter() terminates once all results have arrived.
    drop(out_tx);
    pool.shutdown();
    let mut results: Vec<i32> = out_rx.iter().collect();
    // Ensure we got all results (pool shutdown closed the channel)
    assert_eq!(results.len(), expected);
    results.sort();
    results
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_squares_all_computed() {
        let squares = pool_squares();
        assert_eq!(squares.len(), 20);
        // Sum of i^2 for i=1..20 = 2870
        let sum: i64 = squares.iter().sum();
        assert_eq!(sum, 2870);
    }

    #[test]
    fn test_pool_with_results() {
        let results = pool_with_results(vec![1, 2, 3, 4, 5]);
        assert_eq!(results, vec![1, 4, 9, 16, 25]);
    }

    #[test]
    fn test_pool_empty_tasks() {
        // Shutdown with no queued work must return promptly, not hang.
        let pool = ThreadPool::new(2);
        pool.shutdown();
    }

    #[test]
    fn test_pool_single_worker() {
        let pool = ThreadPool::new(1);
        let results = Arc::new(Mutex::new(Vec::new()));
        for i in 0..5 {
            let r = Arc::clone(&results);
            pool.execute(move || r.lock().unwrap().push(i));
        }
        pool.shutdown();
        let mut v = results.lock().unwrap().clone();
        v.sort();
        assert_eq!(v, vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_pool_more_tasks_than_workers() {
        let pool = ThreadPool::new(2);
        let counter = Arc::new(Mutex::new(0u32));
        for _ in 0..100 {
            let c = Arc::clone(&counter);
            pool.execute(move || *c.lock().unwrap() += 1);
        }
        pool.shutdown();
        assert_eq!(*counter.lock().unwrap(), 100);
    }
}
Deep Comparison
Thread Pool / Work Queue — Comparison
Core Insight
A thread pool reuses a fixed number of threads for many tasks, avoiding thread-creation overhead. The shared queue distributes work; each worker races to grab the next task. Shutdown = close the channel.
OCaml Approach
- Queue + Mutex + Condition for the work channel
- recv_work (blocks on a condition variable)
- close_chan sets closed = true + broadcasts to wake all workers
- Workers receive None on a closed, empty channel and exit
- Tasks are unit -> unit closures

Rust Approach
- mpsc::channel::<Task>() where Task = Box<dyn FnOnce() + Send>
- Arc<Mutex<Receiver<Task>>> — workers compete to lock and receive
- Dropping the Sender closes the channel — workers get Err from recv() and break
- JoinHandle collected for each worker; shutdown() joins all workers

Comparison Table
| Concept | OCaml | Rust |
|---|---|---|
| Task type | unit -> unit | Box<dyn FnOnce() + Send + 'static> |
| Shared queue | Queue + Mutex + Condition | mpsc::channel + Arc<Mutex<Rx>> |
| Worker loop | while recv_work ... do task () | loop { match rx.lock().recv() { Ok(t) => t(), Err(_) => break } } |
| Shutdown signal | close_chan + condition broadcast | Drop Sender — channel closes |
| Worker count | List.init n Thread.create | (0..n).map(spawn).collect() |
| Result collection | Mutex-protected list | Separate mpsc::channel or Mutex<Vec> |
| Production version | Domain pool (OCaml 5) | Rayon / tokio |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
submit_with_result<T: Send + 'static>(f: FnOnce() -> T) -> impl Future<Output=T> using oneshot channels.ThreadPool::shutdown_graceful() that waits for all queued tasks to complete before joining workers.BinaryHeap<(Priority, Task)> instead of the FIFO channel.Arc<AtomicUsize> and expose pending_tasks() -> usize.rayon::ThreadPool for 10,000 CPU-bound tasks.