343: Producer-Consumer Pattern
Tutorial
The Problem
When data is produced at a variable rate and consumed at a different variable rate, a bounded buffer between them smooths out the mismatch — producers slow down when the buffer is full (backpressure), consumers wait when it's empty. This pattern, formalized by Dijkstra (1965) as the "bounded buffer problem," underlies logging pipelines, work queues, streaming data processing, and I/O scheduling. Without a bounded buffer, fast producers can exhaust memory; without the blocking discipline, consumers busy-wait and waste CPU. The Rust implementation uses Mutex + Condvar to achieve efficient blocking on both conditions.
🎯 Learning Outcomes
- Building a bounded buffer from Mutex<VecDeque<T>> plus two Condvar variables
- Condvar::wait() to block a thread until a condition becomes true
- Condvar::notify_one() to wake exactly one waiting thread after a state change
- Arc to share the buffer between producer and consumer threads
- Why two Condvars are needed: one for "not empty", one for "not full"
- When the simpler mpsc::sync_channel is a better fit
Code Example
#![allow(clippy::all)]
//! # Producer-Consumer Pattern
//! Classic concurrent pattern with bounded buffer.
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
pub struct BoundedBuffer<T> {
data: Mutex<VecDeque<T>>,
capacity: usize,
not_empty: Condvar,
not_full: Condvar,
}
impl<T> BoundedBuffer<T> {
pub fn new(capacity: usize) -> Arc<Self> {
Arc::new(Self {
data: Mutex::new(VecDeque::new()),
capacity,
not_empty: Condvar::new(),
not_full: Condvar::new(),
})
}
pub fn put(&self, item: T) {
let mut data = self.data.lock().unwrap();
while data.len() >= self.capacity {
data = self.not_full.wait(data).unwrap();
}
data.push_back(item);
self.not_empty.notify_one();
}
pub fn take(&self) -> T {
let mut data = self.data.lock().unwrap();
while data.is_empty() {
data = self.not_empty.wait(data).unwrap();
}
let item = data.pop_front().unwrap();
self.not_full.notify_one();
item
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn producer_consumer() {
let buffer = BoundedBuffer::new(2);
let b1 = Arc::clone(&buffer);
let producer = thread::spawn(move || {
for i in 0..5 {
b1.put(i);
}
});
let mut results = Vec::new();
for _ in 0..5 {
results.push(buffer.take());
}
producer.join().unwrap();
assert_eq!(results, vec![0, 1, 2, 3, 4]);
}
}
Key Differences
| Aspect | Rust Condvar | OCaml Condition |
|---|---|---|
| API style | Method on Condvar, takes guard | Free function, takes mutex |
| Guard integration | wait takes and returns MutexGuard | Separate lock/unlock calls |
| Spurious wakeup | Must use while loop | Must use while loop |
| Simpler alternative | mpsc::sync_channel(capacity) | Event.channel (synchronous) |
| Type safety | T: Send required for cross-thread | Polymorphic, GC handles it |
OCaml Approach
OCaml's Mutex + Condition maps directly:
type 'a buffer = {
data: 'a Queue.t;
capacity: int;
not_empty: Condition.t;
not_full: Condition.t;
mutex: Mutex.t;
}
let put buf item =
Mutex.lock buf.mutex;
while Queue.length buf.data >= buf.capacity do
Condition.wait buf.not_full buf.mutex
done;
Queue.push item buf.data;
Condition.signal buf.not_empty;
Mutex.unlock buf.mutex
The structure is identical: lock, check condition in a loop, wait (releases lock), modify, signal, unlock. OCaml's Condition.signal is equivalent to Rust's notify_one.
Full Source
#![allow(clippy::all)]
//! # Producer-Consumer Pattern
//! Classic concurrent pattern with bounded buffer.
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
pub struct BoundedBuffer<T> {
data: Mutex<VecDeque<T>>,
capacity: usize,
not_empty: Condvar,
not_full: Condvar,
}
impl<T> BoundedBuffer<T> {
pub fn new(capacity: usize) -> Arc<Self> {
Arc::new(Self {
data: Mutex::new(VecDeque::new()),
capacity,
not_empty: Condvar::new(),
not_full: Condvar::new(),
})
}
pub fn put(&self, item: T) {
let mut data = self.data.lock().unwrap();
while data.len() >= self.capacity {
data = self.not_full.wait(data).unwrap();
}
data.push_back(item);
self.not_empty.notify_one();
}
pub fn take(&self) -> T {
let mut data = self.data.lock().unwrap();
while data.is_empty() {
data = self.not_empty.wait(data).unwrap();
}
let item = data.pop_front().unwrap();
self.not_full.notify_one();
item
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn producer_consumer() {
let buffer = BoundedBuffer::new(2);
let b1 = Arc::clone(&buffer);
let producer = thread::spawn(move || {
for i in 0..5 {
b1.put(i);
}
});
let mut results = Vec::new();
for _ in 0..5 {
results.push(buffer.take());
}
producer.join().unwrap();
assert_eq!(results, vec![0, 1, 2, 3, 4]);
}
}
Deep Comparison
OCaml vs Rust: Producer Consumer
Overview
See the example.rs and example.ml files for detailed implementations.
Key Differences
| Aspect | OCaml | Rust |
|---|---|---|
| Type system | Hindley-Milner | Ownership + traits |
| Memory | GC | Ownership/RAII, no GC |
| Mutability | Explicit ref | mut keyword |
| Error handling | Option/Result | Result<T, E> |
See README.md for detailed comparison.
Exercises
- Run multiple producers and consumers concurrently; verify that every item is consumed exactly once, using a HashSet to track them.
- Reimplement the buffer with mpsc::sync_channel(capacity) instead of Condvar; compare code complexity and performance.
- Add a shutdown signal (e.g. items of type Option<T> where None signals shutdown) so producers can signal consumers to stop cleanly; implement and test with a finite work queue.