445: MPSC Channels — Message Passing Between Threads
Tutorial Video
Text description (accessibility)
This video demonstrates the "445: MPSC Channels — Message Passing Between Threads" functional Rust example. Difficulty level: Intermediate. Key concepts covered: Functional Programming. Shared mutable state with locks is error-prone: deadlocks, priority inversion, and complex lock ordering. Key difference from OCaml: 1. **MPSC vs. MPMC**: Rust's `std::sync::mpsc` is multiple
Tutorial
The Problem
Shared mutable state with locks is error-prone: deadlocks, priority inversion, and complex lock ordering. The alternative is message passing: threads communicate by sending values through channels, with no shared state. std::sync::mpsc (Multiple Producer, Single Consumer) provides channels for this pattern. Producers send messages; the consumer receives them. When all senders drop, the receiver's iteration automatically ends — a natural shutdown mechanism.
MPSC channels power the actor model, pipeline processing, result aggregation from worker threads, and the "channel as work queue" pattern used in thread pools.
🎯 Learning Outcomes
tx.clone() creates additional senders for multiple producer threadsdrop(tx) signals shutdown — the channel closes when all senders droprx.iter() for collecting all messages until channel closempsc and Go's channels (Go's are MPMC with select)Code Example
let (tx, rx) = mpsc::channel::<String>();
// Send
tx.send("message".into()).unwrap();
// Receive (blocking)
let msg = rx.recv().unwrap();Key Differences
std::sync::mpsc is multiple-producer, single-consumer; Go's channels are MPMC. For MPMC in Rust, use crossbeam::channel.mpsc::channel() is unbounded (back-pressure requires explicit management); mpsc::sync_channel(n) creates bounded channels.mpsc has no select; crossbeam::channel + crossbeam::select! enable multi-channel receive.OCaml Approach
OCaml's Event module provides synchronous channels: let ch = Event.new_channel(), Event.sync (Event.send ch v) blocks until a receiver is ready. The Thread_safe_queue from Core provides asynchronous buffered queues. OCaml 5.x's Domainslib.Chan provides a task pool with channels. Unlike Rust's mpsc, OCaml's built-in channel primitives are more primitive and require more assembly for complex patterns.
Full Source
#![allow(clippy::all)]
//! # MPSC Channels — Message Passing Between Threads
//!
//! Send values across threads with `std::sync::mpsc` — multiple producers,
//! one consumer, with automatic shutdown when all senders drop.
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
use std::time::Duration;
/// Approach 1: Multiple producers, single consumer
pub fn multi_producer_single_consumer(
num_producers: usize,
msgs_per_producer: usize,
) -> Vec<String> {
let (tx, rx) = mpsc::channel::<String>();
let handles: Vec<_> = (0..num_producers)
.map(|id| {
let tx = tx.clone();
thread::spawn(move || {
for i in 0..msgs_per_producer {
tx.send(format!("p{}-msg{}", id, i)).unwrap();
}
})
})
.collect();
drop(tx); // Important: drop original sender
// Collect all messages
let messages: Vec<String> = rx.iter().collect();
for h in handles {
h.join().unwrap();
}
messages
}
/// Approach 2: Bounded channel using sync_channel
pub fn bounded_channel_demo(buffer_size: usize, num_msgs: usize) -> Vec<i32> {
let (tx, rx) = mpsc::sync_channel::<i32>(buffer_size);
let producer = thread::spawn(move || {
for i in 0..num_msgs as i32 {
tx.send(i).unwrap();
}
});
let consumer = thread::spawn(move || {
let mut results = Vec::new();
for msg in rx {
results.push(msg);
}
results
});
producer.join().unwrap();
consumer.join().unwrap()
}
/// Approach 3: Non-blocking try_recv and try_iter
pub fn non_blocking_receive(msgs: Vec<i32>) -> Vec<i32> {
let (tx, rx) = mpsc::channel();
for msg in msgs {
tx.send(msg).unwrap();
}
drop(tx);
// Non-blocking collect
rx.try_iter().collect()
}
/// Approach 4: Timeout-based receive
pub fn receive_with_timeout(timeout_ms: u64) -> Option<i32> {
let (tx, rx) = mpsc::channel();
let sender = thread::spawn(move || {
thread::sleep(Duration::from_millis(timeout_ms * 2));
let _ = tx.send(42);
});
let result = rx.recv_timeout(Duration::from_millis(timeout_ms)).ok();
sender.join().unwrap();
result
}
/// Producer-consumer pattern with work items
pub struct WorkQueue<T> {
sender: Sender<T>,
}
impl<T> WorkQueue<T> {
pub fn new() -> (Self, Receiver<T>) {
let (sender, receiver) = mpsc::channel();
(Self { sender }, receiver)
}
pub fn send(&self, item: T) -> Result<(), mpsc::SendError<T>> {
self.sender.send(item)
}
pub fn clone_sender(&self) -> Sender<T> {
self.sender.clone()
}
}
impl<T> Default for WorkQueue<T> {
fn default() -> Self {
Self::new().0
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_send_recv_basic() {
let (tx, rx) = mpsc::channel();
tx.send(42u32).unwrap();
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn test_channel_closed() {
let (tx, rx) = mpsc::channel::<i32>();
drop(tx);
assert!(rx.recv().is_err());
}
#[test]
fn test_multiple_producers() {
let (tx, rx) = mpsc::channel::<u32>();
let handles: Vec<_> = (0..4)
.map(|i| {
let tx = tx.clone();
thread::spawn(move || tx.send(i).unwrap())
})
.collect();
drop(tx);
let mut results: Vec<u32> = rx.iter().collect();
results.sort();
assert_eq!(results, vec![0, 1, 2, 3]);
for h in handles {
h.join().unwrap();
}
}
#[test]
fn test_multi_producer_consumer() {
let messages = multi_producer_single_consumer(3, 5);
assert_eq!(messages.len(), 15);
}
#[test]
fn test_bounded_channel() {
let results = bounded_channel_demo(2, 10);
assert_eq!(results, (0..10).collect::<Vec<i32>>());
}
#[test]
fn test_non_blocking() {
let input = vec![1, 2, 3, 4, 5];
let output = non_blocking_receive(input.clone());
assert_eq!(output, input);
}
#[test]
fn test_try_recv_empty() {
let (_tx, rx) = mpsc::channel::<i32>();
assert!(rx.try_recv().is_err());
}
#[test]
fn test_recv_timeout() {
let result = receive_with_timeout(10);
assert!(result.is_none()); // Timeout before message arrives
}
#[test]
fn test_work_queue() {
let (queue, rx) = WorkQueue::<i32>::new();
queue.send(1).unwrap();
queue.send(2).unwrap();
queue.send(3).unwrap();
let tx2 = queue.clone_sender();
tx2.send(4).unwrap();
drop(queue);
drop(tx2);
let results: Vec<i32> = rx.iter().collect();
assert_eq!(results, vec![1, 2, 3, 4]);
}
}#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_send_recv_basic() {
let (tx, rx) = mpsc::channel();
tx.send(42u32).unwrap();
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn test_channel_closed() {
let (tx, rx) = mpsc::channel::<i32>();
drop(tx);
assert!(rx.recv().is_err());
}
#[test]
fn test_multiple_producers() {
let (tx, rx) = mpsc::channel::<u32>();
let handles: Vec<_> = (0..4)
.map(|i| {
let tx = tx.clone();
thread::spawn(move || tx.send(i).unwrap())
})
.collect();
drop(tx);
let mut results: Vec<u32> = rx.iter().collect();
results.sort();
assert_eq!(results, vec![0, 1, 2, 3]);
for h in handles {
h.join().unwrap();
}
}
#[test]
fn test_multi_producer_consumer() {
let messages = multi_producer_single_consumer(3, 5);
assert_eq!(messages.len(), 15);
}
#[test]
fn test_bounded_channel() {
let results = bounded_channel_demo(2, 10);
assert_eq!(results, (0..10).collect::<Vec<i32>>());
}
#[test]
fn test_non_blocking() {
let input = vec![1, 2, 3, 4, 5];
let output = non_blocking_receive(input.clone());
assert_eq!(output, input);
}
#[test]
fn test_try_recv_empty() {
let (_tx, rx) = mpsc::channel::<i32>();
assert!(rx.try_recv().is_err());
}
#[test]
fn test_recv_timeout() {
let result = receive_with_timeout(10);
assert!(result.is_none()); // Timeout before message arrives
}
#[test]
fn test_work_queue() {
let (queue, rx) = WorkQueue::<i32>::new();
queue.send(1).unwrap();
queue.send(2).unwrap();
queue.send(3).unwrap();
let tx2 = queue.clone_sender();
tx2.send(4).unwrap();
drop(queue);
drop(tx2);
let results: Vec<i32> = rx.iter().collect();
assert_eq!(results, vec![1, 2, 3, 4]);
}
}
Deep Comparison
OCaml vs Rust: MPSC Channels
Channel Creation
OCaml (Manual with Queue + Mutex + Condition)
let queue = Queue.create ()
let mutex = Mutex.create ()
let cond = Condition.create ()
let send v =
Mutex.lock mutex;
Queue.push v queue;
Condition.signal cond;
Mutex.unlock mutex
let recv () =
Mutex.lock mutex;
while Queue.is_empty queue do
Condition.wait cond mutex
done;
let v = Queue.pop queue in
Mutex.unlock mutex;
v
Rust
let (tx, rx) = mpsc::channel::<String>();
// Send
tx.send("message".into()).unwrap();
// Receive (blocking)
let msg = rx.recv().unwrap();
Multiple Producers
OCaml
(* Same send function works from multiple threads *)
let producers = List.init 3 (fun id ->
Thread.create (fun () ->
for i = 1 to 5 do
send (Printf.sprintf "p%d-msg%d" id i)
done
) ()
)
Rust
let handles: Vec<_> = (0..3).map(|id| {
let tx = tx.clone(); // Clone the sender
thread::spawn(move || {
for i in 0..5 {
tx.send(format!("p{}-msg{}", id, i)).unwrap();
}
})
}).collect();
drop(tx); // Drop original to close channel
Key Differences
| Feature | OCaml | Rust |
|---|---|---|
| Built-in channel | No (manual) | Yes (std::sync::mpsc) |
| Sender cloning | Same function | tx.clone() |
| Channel close | Sentinel value | Drop all senders |
| Shutdown signal | Manual | Automatic (recv() → Err) |
| Bounded channel | Manual size check | sync_channel(size) |
Consumer Iteration
OCaml
(* Must know message count or use sentinel *)
let consumer = Thread.create (fun () ->
for _ = 1 to 15 do
Printf.printf "got: %s\n%!" (recv ())
done
) ()
Rust
// Iterate until channel closes
for msg in rx {
println!("got: {}", msg);
}
// Loop exits when all senders drop
Non-blocking Operations
OCaml
(* Manual try with immediate check *)
let try_recv () =
Mutex.lock mutex;
let result =
if Queue.is_empty queue then None
else Some (Queue.pop queue)
in
Mutex.unlock mutex;
result
Rust
// try_recv returns immediately
match rx.try_recv() {
Ok(msg) => println!("got {}", msg),
Err(TryRecvError::Empty) => println!("no message"),
Err(TryRecvError::Disconnected) => println!("closed"),
}
// try_iter drains all available
let all: Vec<_> = rx.try_iter().collect();
Timeout Receive (Rust-specific)
match rx.recv_timeout(Duration::from_secs(1)) {
Ok(msg) => process(msg),
Err(RecvTimeoutError::Timeout) => handle_timeout(),
Err(RecvTimeoutError::Disconnected) => break,
}
Exercises
mpsc channels. Verify the final output.mpsc::channel() with mpsc::sync_channel(10). Producer threads will block when the buffer is full. Verify the producer is throttled when the consumer is slow (add a thread::sleep in the consumer).