993: Thread Pool / Work Queue

Intermediate · Functional Programming · Tutorial

The Problem

Implement a thread pool / work queue with N worker threads sharing a single mpsc::Receiver<Task> wrapped in Arc<Mutex<Receiver>>. Worker threads loop: lock the receiver, dequeue one task, unlock, execute. When the pool shuts down, it drops its Sender; the channel closes and the workers exit cleanly.

🎯 Learning Outcomes

  • Define type Task = Box<dyn FnOnce() + Send + 'static> for type-erased, owned closures
  • Share a single Receiver<Task> across all workers using Arc<Mutex<Receiver<Task>>>
  • Workers loop: lock → recv() (blocks for next task) → unlock → execute task
  • Implement shutdown for ThreadPool — dropping the Sender and joining all workers ensures tasks complete before the pool is destroyed
  • Understand why Mutex is needed: Receiver<T> is not Sync (only one thread can receive at a time)

Code Example

    #![allow(clippy::all)]
    // 993: Thread Pool / Work Queue
    // Fixed N workers consuming tasks from a shared mpsc channel
    
    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;
    
    type Task = Box<dyn FnOnce() + Send + 'static>;
    
    struct ThreadPool {
        sender: mpsc::Sender<Task>,
        workers: Vec<thread::JoinHandle<()>>,
    }
    
    impl ThreadPool {
        fn new(size: usize) -> Self {
            assert!(size > 0);
            let (sender, receiver) = mpsc::channel::<Task>();
            // Wrap receiver in Arc<Mutex> so all workers can share it
            let receiver = Arc::new(Mutex::new(receiver));
    
            let workers = (0..size)
                .map(|_| {
                    let rx = Arc::clone(&receiver);
                    thread::spawn(move || {
                        // Each worker loops: lock, get task, unlock, run task
                        loop {
                            let task = {
                                let lock = rx.lock().unwrap();
                                lock.recv() // blocks until task arrives or channel closes
                            };
                            match task {
                                Ok(f) => f(),
                                Err(_) => break, // channel closed → exit
                            }
                        }
                    })
                })
                .collect();
    
            ThreadPool { sender, workers }
        }
    
        fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
            self.sender.send(Box::new(f)).unwrap();
        }
    
        fn shutdown(self) {
            drop(self.sender); // close channel → workers see Err and break
            for w in self.workers {
                w.join().unwrap();
            }
        }
    }
    
    // --- Approach 1: Submit tasks that collect results ---
    fn pool_squares() -> Vec<i64> {
        let pool = ThreadPool::new(4);
        let results = Arc::new(Mutex::new(Vec::new()));
    
        for i in 1i64..=20 {
            let results = Arc::clone(&results);
            pool.execute(move || {
                results.lock().unwrap().push(i * i);
            });
        }
    
        pool.shutdown();
        let mut v = results.lock().unwrap().clone();
        v.sort();
        v
    }
    
    // --- Approach 2: Work queue with return values via channel ---
    fn pool_with_results(inputs: Vec<i32>) -> Vec<i32> {
        let pool = ThreadPool::new(3);
        let (tx, rx) = mpsc::channel::<i32>();
    
        let n = inputs.len();
        for x in inputs {
            let tx = tx.clone();
            pool.execute(move || {
                tx.send(x * x).unwrap();
            });
        }
        drop(tx); // close sender side
        pool.shutdown();
    
        let mut results: Vec<i32> = rx.iter().collect();
        // Ensure we got all results (pool shutdown closed the channel)
        assert_eq!(results.len(), n);
        results.sort();
        results
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_pool_squares_all_computed() {
            let squares = pool_squares();
            assert_eq!(squares.len(), 20);
            // Sum of i^2 for i=1..20 = 2870
            let sum: i64 = squares.iter().sum();
            assert_eq!(sum, 2870);
        }
    
        #[test]
        fn test_pool_with_results() {
            let results = pool_with_results(vec![1, 2, 3, 4, 5]);
            assert_eq!(results, vec![1, 4, 9, 16, 25]);
        }
    
        #[test]
        fn test_pool_empty_tasks() {
            let pool = ThreadPool::new(2);
            pool.shutdown(); // should not hang
        }
    
        #[test]
        fn test_pool_single_worker() {
            let pool = ThreadPool::new(1);
            let results = Arc::new(Mutex::new(Vec::new()));
            for i in 0..5 {
                let r = Arc::clone(&results);
                pool.execute(move || r.lock().unwrap().push(i));
            }
            pool.shutdown();
            let mut v = results.lock().unwrap().clone();
            v.sort();
            assert_eq!(v, vec![0, 1, 2, 3, 4]);
        }
    
        #[test]
        fn test_pool_more_tasks_than_workers() {
            let pool = ThreadPool::new(2);
            let counter = Arc::new(Mutex::new(0u32));
            for _ in 0..100 {
                let c = Arc::clone(&counter);
                pool.execute(move || *c.lock().unwrap() += 1);
            }
            pool.shutdown();
            assert_eq!(*counter.lock().unwrap(), 100);
        }
    }

    Key Differences

| Aspect | Rust | OCaml |
|---|---|---|
| Task type | Box<dyn FnOnce() + Send + 'static> | unit -> unit function |
| Shared queue | Arc<Mutex<Receiver<Task>>> | Queue + Mutex + Condition |
| Channel close | Sender drop propagates to recv() | Manual running = false + broadcast |
| Join on shutdown | Explicit shutdown() (or impl Drop for ThreadPool) | Explicit shutdown + join |

    The Arc<Mutex<Receiver>> trick is idiomatic Rust for fan-out from a single channel to multiple consumers. The lock is held only during the recv() call (microseconds), so contention is minimal.

    OCaml Approach

    (* OCaml 5.0+: Domainslib.Task.pool *)
    let pool = Domainslib.Task.setup_pool ~num_domains:4 ()
    
    let submit_task pool f =
      Domainslib.Task.run pool (fun () -> f ())
    
    (* Manual thread pool with Queue + Mutex + Condition *)
    type 'a pool = {
      queue: 'a Queue.t;
      mutex: Mutex.t;
      cond: Condition.t;
      mutable running: bool;
    }
    
    let dequeue p =
      Mutex.lock p.mutex;
      while Queue.is_empty p.queue && p.running do
        Condition.wait p.cond p.mutex
      done;
      let task = if Queue.is_empty p.queue then None else Some (Queue.pop p.queue) in
      Mutex.unlock p.mutex;
      task
    

    OCaml's Domainslib.Task provides a production-ready parallel task pool for OCaml 5.0+. The manual implementation mirrors Rust's Arc<Mutex<Receiver>> pattern using Queue + Mutex + Condition.


    Deep Comparison

    Thread Pool / Work Queue — Comparison

    Core Insight

    A thread pool reuses a fixed number of threads for many tasks, avoiding thread-creation overhead. The shared queue distributes work; each worker races to grab the next task. Shutdown = close the channel.

    OCaml Approach

  • Queue + Mutex + Condition for the work channel
  • Each worker: loop calling recv_work (blocks on a condition variable)
  • close_chan sets closed = true + broadcasts to wake all workers
  • Workers see None on a closed, empty channel and exit
  • Tasks are unit -> unit closures

Rust Approach

  • mpsc::channel::<Task>() where Task = Box<dyn FnOnce() + Send>
  • Arc<Mutex<Receiver<Task>>> — workers compete to lock and receive
  • Drop the Sender to close the channel — workers get Err from recv() and break
  • JoinHandle collected; shutdown() joins all workers
  • Rayon or tokio for production use; this is the minimal std pattern

Comparison Table

| Concept | OCaml | Rust |
|---|---|---|
| Task type | unit -> unit | Box<dyn FnOnce() + Send + 'static> |
| Shared queue | Queue + Mutex + Condition | mpsc::channel + Arc<Mutex<Receiver>> |
| Worker loop | while recv_work ... do task () | loop { match recv { Ok(f) => f(), Err(_) => break } } |
| Shutdown signal | close_chan + condition broadcast | Drop the Sender — channel closes |
| Worker count | List.init n Thread.create | (0..n).map(spawn).collect() |
| Result collection | Mutex-protected list | Separate mpsc::channel or Mutex<Vec> |
| Production version | Domain pool (OCaml 5) | Rayon / tokio |

    std vs tokio

| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on the tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields; runtime switches tasks |
| Overhead | One OS thread per worker | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |

    Exercises

  • Add a submit_with_result<T: Send + 'static>(f: FnOnce() -> T) -> impl Future<Output=T> using oneshot channels.
  • Implement ThreadPool::shutdown_graceful() that waits for all queued tasks to complete before joining workers.
  • Add a task priority queue: use BinaryHeap<(Priority, Task)> instead of the FIFO channel.
  • Track in-flight task count with Arc<AtomicUsize> and expose pending_tasks() -> usize.
  • Benchmark the thread pool against rayon::ThreadPool for 10,000 CPU-bound tasks.