
446: Thread Pool Pattern

Functional Programming


Tutorial

The Problem

Spawning a new OS thread for each incoming request is expensive: thread creation takes roughly 100 μs, and each thread reserves about 8 MB of stack space by default. A thread pool pre-creates N worker threads that loop waiting for jobs, processing each job from a shared queue. The rayon crate provides a global thread pool, but understanding the implementation reveals the fundamental pattern: a channel as the work queue, Arc<Mutex<Receiver>> for shared job pickup, and JoinHandles for graceful shutdown.

Thread pools power web servers (tokio's blocking thread pool), HTTP clients, database connection management, and any system with variable-rate work items.

🎯 Learning Outcomes

  • Understand why thread pools outperform per-request thread spawning
  • Learn how Arc<Mutex<Receiver<Job>>> shares a single channel receiver across workers
  • See how Box<dyn FnOnce() + Send + 'static> erases job types into a uniform queue entry
  • Understand graceful shutdown: dropping the sender closes the channel, workers exit their loops
  • Learn how the Drop impl on ThreadPool ensures clean shutdown
Code Example

    type Job = Box<dyn FnOnce() + Send + 'static>;
    
    pub struct ThreadPool {
        workers: Vec<JoinHandle<()>>,
        sender: Option<Sender<Job>>,
    }
    
    impl ThreadPool {
        pub fn new(size: usize) -> Self {
            let (sender, receiver) = mpsc::channel::<Job>();
            let receiver = Arc::new(Mutex::new(receiver));
            
            let workers = (0..size).map(|_| {
                let rx = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    match rx.lock().unwrap().recv() {
                        Ok(job) => job(),
                        Err(_) => break,
                    }
                })
            }).collect();
            
            ThreadPool { workers, sender: Some(sender) }
        }
    }

    Key Differences

  • Job type: Rust uses Box<dyn FnOnce() + Send> for type-erased closures; OCaml uses unit -> unit closures.
  • Worker count: Rust thread pools use OS threads; OCaml 5.x's domain pools use parallelism domains.
  • Graceful shutdown: Rust uses channel close for shutdown signal; OCaml typically uses a sentinel value or explicit stop flag.
  • Rayon alternative: rayon::ThreadPool provides work-stealing on top of OS threads for better load balancing than the simple queue approach.
OCaml Approach

    OCaml's Domainslib provides Task.pool — a domain pool (OCaml 5.x's parallel unit) for distributing work. Task.run pool (fun () -> computation) submits work. For OCaml 4.x threads, Thread_pool libraries (like moonpool) provide similar functionality. The Lwt and Async libraries have their own thread pool abstractions for offloading blocking work from their event loops.

    Full Source

    #![allow(clippy::all)]
    //! # Thread Pool Pattern — Reusable Worker Threads
    //!
    //! A pool of worker threads that process jobs from a shared queue,
    //! avoiding the overhead of spawning threads per task.
    
    use std::sync::mpsc::{self, Sender};
    use std::sync::{Arc, Mutex};
    use std::thread::{self, JoinHandle};
    
    /// A job is a boxed closure that runs once
    type Job = Box<dyn FnOnce() + Send + 'static>;
    
    /// Approach 1: Basic thread pool with channel-based job queue
    pub struct ThreadPool {
        workers: Vec<JoinHandle<()>>,
        sender: Option<Sender<Job>>,
    }
    
    impl ThreadPool {
        /// Create a new thread pool with `size` workers
        pub fn new(size: usize) -> Self {
            assert!(size > 0, "Thread pool must have at least one worker");
    
            let (sender, receiver) = mpsc::channel::<Job>();
            let receiver = Arc::new(Mutex::new(receiver));
    
            let workers = (0..size)
                .map(|_id| {
                    let rx = Arc::clone(&receiver);
                    thread::spawn(move || loop {
                        let job = rx.lock().unwrap().recv();
                        match job {
                            Ok(job) => job(),
                            Err(_) => break, // Channel closed
                        }
                    })
                })
                .collect();
    
            ThreadPool {
                workers,
                sender: Some(sender),
            }
        }
    
        /// Submit a job to be executed by a worker
        pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static,
        {
            let job = Box::new(f);
            self.sender.as_ref().unwrap().send(job).unwrap();
        }
    
        /// Get the number of workers
        pub fn size(&self) -> usize {
            self.workers.len()
        }
    }
    
    impl Drop for ThreadPool {
        fn drop(&mut self) {
            // Drop sender to close channel
            drop(self.sender.take());
    
            // Wait for all workers to finish
            for worker in self.workers.drain(..) {
                worker.join().unwrap();
            }
        }
    }
    
    /// Approach 2: Scoped thread pool for borrowed data
    pub fn scoped_pool<T, R, F>(data: &[T], num_threads: usize, f: F) -> Vec<R>
    where
        T: Sync,
        R: Send + Default + Clone,
        F: Fn(&T) -> R + Sync,
    {
        // Ceiling division; .max(1) keeps chunks() from panicking on empty input
        let chunk_size = ((data.len() + num_threads - 1) / num_threads).max(1);
        let mut results = vec![R::default(); data.len()];
    
        thread::scope(|s| {
            for (chunk_data, chunk_results) in
                data.chunks(chunk_size).zip(results.chunks_mut(chunk_size))
            {
                s.spawn(|| {
                    for (input, output) in chunk_data.iter().zip(chunk_results.iter_mut()) {
                        *output = f(input);
                    }
                });
            }
        });
    
        results
    }
    
    /// Approach 3: Simple parallel map that consumes a thread pool
    ///
    /// Takes the pool by value: dropping it closes the channel and joins
    /// every worker, so all jobs are guaranteed to have finished before
    /// the results are collected. (Dropping a `&ThreadPool` would be a
    /// no-op and the unwrap below could race with running jobs.)
    pub fn parallel_map<T, U, F>(pool: ThreadPool, data: Vec<T>, f: F) -> Vec<U>
    where
        T: Send + 'static,
        U: Send + 'static,
        F: Fn(T) -> U + Send + Sync + Clone + 'static,
    {
        let results: Arc<Mutex<Vec<(usize, U)>>> = Arc::new(Mutex::new(Vec::new()));

        for (i, item) in data.into_iter().enumerate() {
            let f = f.clone();
            let results = Arc::clone(&results);
            pool.execute(move || {
                let result = f(item);
                results.lock().unwrap().push((i, result));
            });
        }

        // Dropping the pool runs its Drop impl: the sender closes and the
        // workers are joined, so every job has completed and no clones of
        // `results` remain.
        drop(pool);

        let mut collected: Vec<(usize, U)> = Arc::try_unwrap(results)
            .unwrap_or_else(|_| unreachable!("workers were joined; no clones remain"))
            .into_inner()
            .unwrap();

        collected.sort_by_key(|(i, _)| *i);
        collected.into_iter().map(|(_, v)| v).collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        use std::sync::atomic::{AtomicUsize, Ordering};
    
        #[test]
        fn test_pool_executes_all_jobs() {
            let count = Arc::new(AtomicUsize::new(0));
    
            {
                let pool = ThreadPool::new(4);
    
                for _ in 0..10 {
                    let count = Arc::clone(&count);
                    pool.execute(move || {
                        count.fetch_add(1, Ordering::SeqCst);
                    });
                }
            } // Pool dropped, all jobs complete
    
            assert_eq!(count.load(Ordering::SeqCst), 10);
        }
    
        #[test]
        fn test_pool_size() {
            let pool = ThreadPool::new(4);
            assert_eq!(pool.size(), 4);
        }
    
        #[test]
        fn test_multiple_pools() {
            let count = Arc::new(AtomicUsize::new(0));
    
            {
                let pool1 = ThreadPool::new(2);
                let pool2 = ThreadPool::new(2);
    
                for _ in 0..5 {
                    let c = Arc::clone(&count);
                    pool1.execute(move || {
                        c.fetch_add(1, Ordering::SeqCst);
                    });
                }
                for _ in 0..5 {
                    let c = Arc::clone(&count);
                    pool2.execute(move || {
                        c.fetch_add(1, Ordering::SeqCst);
                    });
                }
            }
    
            assert_eq!(count.load(Ordering::SeqCst), 10);
        }
    
        #[test]
        fn test_scoped_pool() {
            let data: Vec<i32> = (1..=10).collect();
            let results = scoped_pool(&data, 4, |x| x * x);
            assert_eq!(results, vec![1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
        }
    
        #[test]
        fn test_results_collected() {
            let results = Arc::new(Mutex::new(Vec::new()));
    
            {
                let pool = ThreadPool::new(2);
                for i in 0..5 {
                    let r = Arc::clone(&results);
                    pool.execute(move || {
                        r.lock().unwrap().push(i * i);
                    });
                }
            }
    
            let mut collected = results.lock().unwrap().clone();
            collected.sort();
            assert_eq!(collected, vec![0, 1, 4, 9, 16]);
        }
    
        #[test]
        #[should_panic]
        fn test_zero_workers_panics() {
            let _ = ThreadPool::new(0);
        }
    }

    Deep Comparison

    OCaml vs Rust: Thread Pool Pattern

    Thread Pool Creation

    OCaml

    let make_pool n =
      let q = Queue.create () in
      let m = Mutex.create () in
      let c = Condition.create () in
      let stop = ref false in
      let workers = Array.init n (fun _ ->
        Thread.create (fun () ->
          while not !stop do
            Mutex.lock m;
            while Queue.is_empty q && not !stop do
              Condition.wait c m
            done;
            if not (Queue.is_empty q) then
              let f = Queue.pop q in
              Mutex.unlock m; f ()
            else Mutex.unlock m
          done) ()
      ) in
      (* returns submit and shutdown functions *)
    

    Rust

    type Job = Box<dyn FnOnce() + Send + 'static>;
    
    pub struct ThreadPool {
        workers: Vec<JoinHandle<()>>,
        sender: Option<Sender<Job>>,
    }
    
    impl ThreadPool {
        pub fn new(size: usize) -> Self {
            let (sender, receiver) = mpsc::channel::<Job>();
            let receiver = Arc::new(Mutex::new(receiver));
            
            let workers = (0..size).map(|_| {
                let rx = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    match rx.lock().unwrap().recv() {
                        Ok(job) => job(),
                        Err(_) => break,
                    }
                })
            }).collect();
            
            ThreadPool { workers, sender: Some(sender) }
        }
    }
    

    Key Differences

    Feature          OCaml                          Rust
    Job type         unit -> unit                   Box<dyn FnOnce() + Send + 'static>
    Shutdown         Manual stop flag + broadcast   Drop sender → recv returns Err
    Thread safety    Mutex + Condition              MPSC channel + Arc
    Cleanup          Manual join                    Drop trait implementation

    Job Submission

    OCaml

    let submit f =
      Mutex.lock m;
      Queue.push f q;
      Condition.signal c;
      Mutex.unlock m
    

    Rust

    pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.as_ref().unwrap().send(Box::new(f)).unwrap();
    }
    

    Graceful Shutdown

    OCaml

    let shutdown () =
      Mutex.lock m;
      stop := true;
      Condition.broadcast c;
      Mutex.unlock m;
      Array.iter Thread.join workers
    

    Rust

    impl Drop for ThreadPool {
        fn drop(&mut self) {
            drop(self.sender.take());  // Close channel
            for w in self.workers.drain(..) {
                w.join().unwrap();
            }
        }
    }
    

    Exercises

  • Priority queue: Replace the mpsc::channel with Arc<Mutex<BinaryHeap<(Priority, Job)>>>. Support execute_with_priority(priority: u8, f: impl FnOnce() + Send) and verify higher-priority jobs execute first.
  • Worker metrics: Add per-worker job counters using Arc<AtomicU64>. Expose fn job_counts(&self) -> Vec<u64> that returns each worker's processed job count. Verify work is reasonably balanced.
  • Timeout jobs: Extend ThreadPool with execute_with_timeout(timeout: Duration, f: impl FnOnce() + Send). Spawn a monitoring thread that kills long-running jobs (simulated by tracking active jobs).