446: Thread Pool Pattern
Tutorial
The Problem
Spawning a new OS thread for each incoming request is expensive: thread creation takes ~100 µs and each thread consumes ~8MB of stack space by default. A thread pool pre-creates N worker threads that loop waiting for jobs, processing each job from a shared queue. The rayon crate provides a global thread pool, but understanding the implementation reveals the fundamental pattern: a channel as work queue, Arc<Mutex<Receiver>> for shared job pickup, and JoinHandles for graceful shutdown.
Thread pools power web servers (tokio's blocking thread pool), HTTP clients, database connection management, and any system with variable-rate work items.
🎯 Learning Outcomes
- `Arc<Mutex<Receiver<Job>>>` shares a single channel receiver across workers
- `Box<dyn FnOnce() + Send + 'static>` erases job types into a uniform queue entry
- A `Drop` impl on `ThreadPool` ensures clean shutdown
Code Example
type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct ThreadPool {
workers: Vec<JoinHandle<()>>,
sender: Option<Sender<Job>>,
}
impl ThreadPool {
pub fn new(size: usize) -> Self {
let (sender, receiver) = mpsc::channel::<Job>();
let receiver = Arc::new(Mutex::new(receiver));
let workers = (0..size).map(|_| {
let rx = Arc::clone(&receiver);
thread::spawn(move || loop {
// Bind recv() first so the Mutex guard is dropped before the job
// runs; matching on the call directly would hold the lock for the
// job's entire duration and serialize the pool.
let job = rx.lock().unwrap().recv();
match job {
Ok(job) => job(),
Err(_) => break,
}
})
}).collect();
ThreadPool { workers, sender: Some(sender) }
}
}
Key Differences
- `Box<dyn FnOnce() + Send>` gives type-erased closures; OCaml uses plain `unit -> unit` closures.
- `rayon::ThreadPool` provides work-stealing on top of OS threads for better load balancing than this simple shared-queue approach.
OCaml Approach
OCaml's Domainslib provides Task.pool, a pool of domains (OCaml 5.x's unit of parallelism) for distributing work; Task.run pool (fun () -> computation) submits work. For OCaml 4.x threads, thread-pool libraries such as moonpool provide similar functionality. The Lwt and Async libraries have their own thread pool abstractions for offloading blocking work from their event loops.
Full Source
#![allow(clippy::all)]
//! # Thread Pool Pattern: Reusable Worker Threads
//!
//! A pool of worker threads that process jobs from a shared queue,
//! avoiding the overhead of spawning threads per task.
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
/// A job is a boxed closure that runs once
type Job = Box<dyn FnOnce() + Send + 'static>;
/// Approach 1: Basic thread pool with channel-based job queue
pub struct ThreadPool {
workers: Vec<JoinHandle<()>>,
sender: Option<Sender<Job>>,
}
impl ThreadPool {
/// Create a new thread pool with `size` workers
pub fn new(size: usize) -> Self {
assert!(size > 0, "Thread pool must have at least one worker");
let (sender, receiver) = mpsc::channel::<Job>();
let receiver = Arc::new(Mutex::new(receiver));
let workers = (0..size)
.map(|_id| {
let rx = Arc::clone(&receiver);
thread::spawn(move || loop {
let job = rx.lock().unwrap().recv();
match job {
Ok(job) => job(),
Err(_) => break, // Channel closed
}
})
})
.collect();
ThreadPool {
workers,
sender: Some(sender),
}
}
/// Submit a job to be executed by a worker
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
/// Get the number of workers
pub fn size(&self) -> usize {
self.workers.len()
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
// Drop sender to close channel
drop(self.sender.take());
// Wait for all workers to finish
for worker in self.workers.drain(..) {
worker.join().unwrap();
}
}
}
/// Approach 2: Scoped thread pool for borrowed data
pub fn scoped_pool<T, R, F>(data: &[T], num_threads: usize, f: F) -> Vec<R>
where
T: Sync,
R: Send + Default + Clone,
F: Fn(&T) -> R + Sync,
{
// Ceiling division; clamp to at least 1 so chunks() never receives 0.
let chunk_size = ((data.len() + num_threads - 1) / num_threads).max(1);
let mut results = vec![R::default(); data.len()];
thread::scope(|s| {
for (chunk_data, chunk_results) in
data.chunks(chunk_size).zip(results.chunks_mut(chunk_size))
{
s.spawn(|| {
for (input, output) in chunk_data.iter().zip(chunk_results.iter_mut()) {
*output = f(input);
}
});
}
});
results
}
/// Approach 3: Simple parallel map using thread pool
///
/// Takes the pool by value: dropping it closes the channel and joins
/// every worker, which is how we know all jobs have finished before
/// collecting the results.
pub fn parallel_map<T, U, F>(pool: ThreadPool, data: Vec<T>, f: F) -> Vec<U>
where
T: Send + 'static,
U: Send + 'static,
F: Fn(T) -> U + Send + Sync + Clone + 'static,
{
let results: Arc<Mutex<Vec<(usize, U)>>> = Arc::new(Mutex::new(Vec::new()));
for (i, item) in data.into_iter().enumerate() {
let f = f.clone();
let results = Arc::clone(&results);
pool.execute(move || {
let result = f(item);
// Tag each result with its input index so order can be restored.
results.lock().unwrap().push((i, result));
});
}
// Dropping the pool joins all workers, so every job has completed.
drop(pool);
let mut collected: Vec<_> = Arc::try_unwrap(results)
.ok()
.expect("workers joined; no other Arc clones remain")
.into_inner()
.unwrap()
.into_iter()
.collect();
collected.sort_by_key(|(i, _)| *i);
collected.into_iter().map(|(_, v)| v).collect()
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn test_pool_executes_all_jobs() {
let count = Arc::new(AtomicUsize::new(0));
{
let pool = ThreadPool::new(4);
for _ in 0..10 {
let count = Arc::clone(&count);
pool.execute(move || {
count.fetch_add(1, Ordering::SeqCst);
});
}
} // Pool dropped, all jobs complete
assert_eq!(count.load(Ordering::SeqCst), 10);
}
#[test]
fn test_pool_size() {
let pool = ThreadPool::new(4);
assert_eq!(pool.size(), 4);
}
#[test]
fn test_multiple_pools() {
let count = Arc::new(AtomicUsize::new(0));
{
let pool1 = ThreadPool::new(2);
let pool2 = ThreadPool::new(2);
for _ in 0..5 {
let c = Arc::clone(&count);
pool1.execute(move || {
c.fetch_add(1, Ordering::SeqCst);
});
}
for _ in 0..5 {
let c = Arc::clone(&count);
pool2.execute(move || {
c.fetch_add(1, Ordering::SeqCst);
});
}
}
assert_eq!(count.load(Ordering::SeqCst), 10);
}
#[test]
fn test_scoped_pool() {
let data: Vec<i32> = (1..=10).collect();
let results = scoped_pool(&data, 4, |x| x * x);
assert_eq!(results, vec![1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
}
#[test]
fn test_results_collected() {
let results = Arc::new(Mutex::new(Vec::new()));
{
let pool = ThreadPool::new(2);
for i in 0..5 {
let r = Arc::clone(&results);
pool.execute(move || {
r.lock().unwrap().push(i * i);
});
}
}
let mut collected = results.lock().unwrap().clone();
collected.sort();
assert_eq!(collected, vec![0, 1, 4, 9, 16]);
}
#[test]
#[should_panic]
fn test_zero_workers_panics() {
let _ = ThreadPool::new(0);
}
}
Deep Comparison
OCaml vs Rust: Thread Pool Pattern
Thread Pool Creation
OCaml
let make_pool n =
let q = Queue.create () in
let m = Mutex.create () in
let c = Condition.create () in
let stop = ref false in
let workers = Array.init n (fun _ ->
Thread.create (fun () ->
while not !stop do
Mutex.lock m;
while Queue.is_empty q && not !stop do
Condition.wait c m
done;
if not (Queue.is_empty q) then
let f = Queue.pop q in
Mutex.unlock m; f ()
else Mutex.unlock m
done) ()
) in
(* returns submit and shutdown functions *)
Rust
type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct ThreadPool {
workers: Vec<JoinHandle<()>>,
sender: Option<Sender<Job>>,
}
impl ThreadPool {
pub fn new(size: usize) -> Self {
let (sender, receiver) = mpsc::channel::<Job>();
let receiver = Arc::new(Mutex::new(receiver));
let workers = (0..size).map(|_| {
let rx = Arc::clone(&receiver);
thread::spawn(move || loop {
// Bind recv() first so the lock is released before the job runs.
let job = rx.lock().unwrap().recv();
match job {
Ok(job) => job(),
Err(_) => break,
}
})
}).collect();
ThreadPool { workers, sender: Some(sender) }
}
}
Key Differences
| Feature | OCaml | Rust |
|---|---|---|
| Job type | unit -> unit | Box<dyn FnOnce() + Send + 'static> |
| Shutdown | Manual stop flag + broadcast | Drop sender → recv returns Err |
| Thread safety | Mutex + Condition | MPSC channel + Arc |
| Cleanup | Manual join | Drop trait implementation |
Job Submission
OCaml
let submit f =
Mutex.lock m;
Queue.push f q;
Condition.signal c;
Mutex.unlock m
Rust
pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
self.sender.as_ref().unwrap().send(Box::new(f)).unwrap();
}
Graceful Shutdown
OCaml
let shutdown () =
Mutex.lock m;
stop := true;
Condition.broadcast c;
Mutex.unlock m;
Array.iter Thread.join workers
Rust
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take()); // Close channel
for w in self.workers.drain(..) {
w.join().unwrap();
}
}
}
Exercises
1. Replace the mpsc::channel with Arc<Mutex<BinaryHeap<(Priority, Job)>>>. Support execute_with_priority(priority: u8, f: impl FnOnce() + Send) and verify higher-priority jobs execute first.
2. Track per-worker statistics with one Arc<AtomicU64> per worker. Expose fn job_counts(&self) -> Vec<u64> that returns each worker's processed job count. Verify work is reasonably balanced.
3. Extend ThreadPool::execute to accept execute_with_timeout(timeout: Duration, f: impl FnOnce() + Send). Spawn a monitoring thread that kills long-running jobs (simulated by tracking active jobs).