461: Producer-Consumer Pattern
Tutorial
The Problem
The producer-consumer pattern separates work generation from work processing: one or more producer threads generate items at their own rate, and one or more consumer threads process them at theirs. A bounded buffer between the two sides decouples them and applies backpressure: when the buffer is full, producers block until a consumer frees a slot. It is one of the most common inter-thread communication patterns, used in web servers (request producers, handler consumers), data pipelines (readers produce, transformers consume), and I/O systems (network receivers produce, parsers consume).
The pattern appears throughout concurrent systems: OS kernel I/O buffers, database connection pools, message queue brokers (Kafka, RabbitMQ), and async runtime task queues.
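To make the backpressure idea concrete, here is a minimal sketch that fills a bounded channel while no consumer is draining it. The helper name fill_until_full is hypothetical; it uses try_send so the full buffer shows up as an error instead of a blocked thread.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: tries to push `attempts` items into a channel of
/// size `capacity` while nothing is receiving, and returns how many items
/// were accepted before the buffer filled up.
fn fill_until_full(capacity: usize, attempts: u32) -> usize {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking `send` would wait at this point.
            Err(TrySendError::Full(_)) => break,
            // The receiver is gone: nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(rx); // the receiver stays alive until the loop has finished
    accepted
}

fn main() {
    println!("accepted {} of 10 items", fill_until_full(4, 10));
}
```

With a capacity of 4, only the first four items are accepted. In the blocking version the fifth send would simply wait until a consumer received an item.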
🎯 Learning Outcomes
- mpsc::sync_channel(n) creates a bounded buffer with backpressure
- tx.clone() lets multiple producers send to the same channel
- drop(tx) after spawning producers signals consumer shutdown once every producer clone has finished
- Arc<Mutex<Receiver>> pattern for sharing a receiver across multiple consumers
Code Example
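#![allow(clippy::all)]
// 461. Producer-consumer pattern
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_all_consumed() {
        // Bounded channel: `send` blocks once 4 items are waiting.
        let (tx, rx) = mpsc::sync_channel::<u32>(4);
        // Wrapped so the receiver could be shared by several consumers.
        let rx = Arc::new(Mutex::new(rx));
        // Two producers, each sending five items through its own clone of `tx`.
        let ps: Vec<_> = (0..2)
            .map(|id| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..5u32 {
                        tx.send(id * 10 + i).unwrap();
                    }
                })
            })
            .collect();
        // Drop the original sender so the channel closes once both producers finish.
        drop(tx);
        // The consumer counts items until the channel is closed.
        let c = thread::spawn(move || rx.lock().unwrap().iter().count());
        for p in ps {
            p.join().unwrap();
        }
        assert_eq!(c.join().unwrap(), 10);
    }
}
Key Differences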
- std::sync::mpsc is multi-producer, single-consumer: multiple consumers must share the receiver via Arc<Mutex<Receiver>>; crossbeam::channel supports MPMC natively.
- sync_channel(n) blocks producers when the buffer is full; OCaml's unbounded Queue has no built-in backpressure.
- Rust channels are typed by their element (mpsc::channel::<T>()); OCaml's Queue.t is polymorphic ('a Queue.t).
OCaml Approach
OCaml implements producer-consumer with Queue.t + Mutex.t + Condition.t: producers enqueue under lock and signal the "not empty" condition; consumers wait on the condition, dequeue under lock, and signal "not full". Domainslib.Chan.make_bounded n provides a ready-made bounded channel for OCaml 5.x. The pattern is the same; the implementation details differ.
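For comparison, the lock-and-condition mechanism described above can be sketched in Rust with Mutex and Condvar. The names BoundedQueue and sum_through_queue are hypothetical and exist only for illustration; this is a minimal sketch of the technique, not how std::sync::mpsc is implemented internally.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical bounded queue for illustration; not part of std.
struct BoundedQueue<T> {
    items: Mutex<VecDeque<T>>,
    not_empty: Condvar,
    not_full: Condvar,
    capacity: usize,
}

impl<T> BoundedQueue<T> {
    fn new(capacity: usize) -> Self {
        BoundedQueue {
            items: Mutex::new(VecDeque::with_capacity(capacity)),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
            capacity,
        }
    }

    /// Blocks while the queue is full, then enqueues and signals "not empty".
    fn push(&self, item: T) {
        let mut items = self.items.lock().unwrap();
        while items.len() == self.capacity {
            items = self.not_full.wait(items).unwrap();
        }
        items.push_back(item);
        self.not_empty.notify_one();
    }

    /// Blocks while the queue is empty, then dequeues and signals "not full".
    fn pop(&self) -> T {
        let mut items = self.items.lock().unwrap();
        loop {
            if let Some(item) = items.pop_front() {
                self.not_full.notify_one();
                return item;
            }
            items = self.not_empty.wait(items).unwrap();
        }
    }
}

/// One producer pushes 1..=n through a queue of capacity 2 while the
/// calling thread pops exactly n items and sums them.
fn sum_through_queue(n: u32) -> u32 {
    let queue = Arc::new(BoundedQueue::<u32>::new(2));
    let producer = {
        let queue = Arc::clone(&queue);
        thread::spawn(move || (1..=n).for_each(|i| queue.push(i)))
    };
    let total: u32 = (0..n).map(|_| queue.pop()).sum();
    producer.join().unwrap();
    total
}

fn main() {
    println!("sum = {}", sum_through_queue(10));
}
```

Both wait calls sit inside a loop that re-checks the queue state, because a condition variable can wake spuriously. Unlike a channel, this sketch has no notion of closing, so the consumer must know how many items to expect.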
Full Source
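#![allow(clippy::all)]
// 461. Producer-consumer pattern
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_all_consumed() {
        // Bounded channel: `send` blocks once 4 items are waiting.
        let (tx, rx) = mpsc::sync_channel::<u32>(4);
        // Wrapped so the receiver could be shared by several consumers.
        let rx = Arc::new(Mutex::new(rx));
        // Two producers, each sending five items through its own clone of `tx`.
        let ps: Vec<_> = (0..2)
            .map(|id| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..5u32 {
                        tx.send(id * 10 + i).unwrap();
                    }
                })
            })
            .collect();
        // Drop the original sender so the channel closes once both producers finish.
        drop(tx);
        // The consumer counts items until the channel is closed.
        let c = thread::spawn(move || rx.lock().unwrap().iter().count());
        for p in ps {
            p.join().unwrap();
        }
        assert_eq!(c.join().unwrap(), 10);
    }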
}
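The test above runs a single consumer, so the Arc<Mutex<Receiver>> wrapper is never actually shared. As a hedged sketch of the multi-consumer case, the following spawns several workers that take turns receiving from the same channel. The function name consume_with_workers is hypothetical, and the sketch assumes at least one worker; with none, the producer loop would block once the buffer fills.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Hypothetical helper: sends `items` values through a bounded channel and
/// returns how many each of the `consumers` worker threads handled.
/// Assumes `consumers >= 1`.
fn consume_with_workers(items: u32, consumers: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::sync_channel::<u32>(4);
    let rx = Arc::new(Mutex::new(rx));

    let workers: Vec<_> = (0..consumers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut handled = 0usize;
                loop {
                    // The lock guard is a temporary, so it is released as
                    // soon as `recv` returns and other workers can proceed.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(_item) => handled += 1,
                        Err(_) => break, // every sender dropped: shut down
                    }
                }
                handled
            })
        })
        .collect();

    for i in 0..items {
        tx.send(i).unwrap();
    }
    drop(tx); // closing the channel ends every worker's loop

    workers.into_iter().map(|w| w.join().unwrap()).collect()
}

fn main() {
    let counts = consume_with_workers(20, 3);
    println!("items handled per worker: {:?}", counts);
}
```

How the items split between workers depends on scheduling, but the counts always add up to the number of items sent. Calling iter() on the locked receiver, as the single-consumer test does, would hold the lock for the whole loop and starve the other workers.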
Exercises
- thread::sleep and Instant::now(). Verify the consumer processes items at the limited rate.
- Arc<Mutex<Receiver>>. Plot throughput vs. consumer count to find the optimal number for your CPU.
- None item when done; consumers exit on receiving None and propagate it.