341: MPSC Channel
Difficulty level: Intermediate. Key concepts covered: Functional Programming.
Tutorial
The Problem
Threads need to communicate without sharing memory unsafely. The MPSC (multi-producer, single-consumer) channel pattern solves this by providing a typed message queue into which many threads can send and from which exactly one thread receives. Channels trace back to Hoare's Communicating Sequential Processes (1978) and Dijkstra's work on process communication. The guiding philosophy is: don't communicate by sharing memory; share memory by communicating. Channels make data flow explicit. Because ownership of each message moves from sender to receiver, no explicit locks are needed at the communication points, and data races on the message itself are ruled out.
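As a rough illustration of the "many senders, one receiver" idea described above, here is a minimal sketch. The helper name sum_from_workers and the choice of sending squared ids are assumptions made only for this example; they do not appear elsewhere in this tutorial.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: spawns `workers` threads, each sending one number
/// over a shared channel, and returns the sum seen by the single receiver.
fn sum_from_workers(workers: u32) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    for id in 1..=workers {
        let tx = tx.clone(); // each producer thread owns its own Sender
        thread::spawn(move || {
            // The value is moved into the channel; no lock is taken by the caller.
            tx.send(id * id).expect("receiver is still alive");
        });
    }
    // Drop the original Sender so the channel closes once the workers finish.
    drop(tx);
    // `iter()` blocks for each message and stops when every Sender is gone.
    rx.iter().sum()
}

fn main() {
    println!("sum = {}", sum_from_workers(3));
}
```

With three workers the receiver sees 1, 4 and 9 in some order, so the sum is 14 regardless of how the threads are scheduled.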
🎯 Learning Outcomes
- Use std::sync::mpsc::channel() to create unbounded channels
- Use mpsc::sync_channel(capacity) for bounded/backpressure channels
- Clone Sender<T> to create multiple producers for one receiver
- Understand that rx closes when all senders are gone
- Use rx.into_iter() to collect all messages until the channel closes
Code Example
let (tx, rx) = mpsc::channel();
tx.send(value).unwrap();
Key Differences
| Aspect | Rust mpsc | OCaml Event |
|---|---|---|
| Producer count | Many (Sender is Clone) | Many (send to same channel) |
| Consumer count | One (Receiver not Clone) | One sync'd receive at a time |
| Default behavior | Asynchronous (unbounded) | Synchronous (rendezvous) |
| Bounded variant | sync_channel(n) | No built-in; use Domainslib |
| Type safety | T: Send + 'static | Polymorphic channel 'a Event.channel |
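The "Default behavior" and "Bounded variant" rows can be made concrete with a small sketch. It uses try_send so that a full buffer is reported as an error instead of blocking the example. The helper names sends_before_full and sends_unbounded are hypothetical and exist only for illustration.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper: counts how many sends a bounded channel accepts
/// while nothing is draining it.
fn sends_before_full(capacity: usize, attempts: usize) -> usize {
    // `_rx` is kept alive so the channel is not reported as disconnected.
    let (tx, _rx) = mpsc::sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer full: a plain `send` would block at this point.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Hypothetical helper: the unbounded channel accepts every message at once.
fn sends_unbounded(attempts: usize) -> usize {
    let (tx, rx) = mpsc::channel::<usize>();
    for i in 0..attempts {
        tx.send(i).expect("receiver is still alive");
    }
    drop(tx); // close the channel so counting terminates
    rx.iter().count()
}

fn main() {
    println!("bounded(2) accepted {}", sends_before_full(2, 5));
    println!("unbounded accepted {}", sends_unbounded(5));
}
```

The bounded channel stops accepting once its buffer holds capacity messages, which is the backpressure a blocking send would apply to a fast producer.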
OCaml Approach
OCaml's Event module provides synchronous channels in the style of CML (Concurrent ML). Event.new_channel () creates a typed channel; Event.send and Event.receive build events, and Event.sync blocks until the event is committed:
let () =
  let ch = Event.new_channel () in
  let _ = Thread.create (fun () -> Event.sync (Event.send ch 42)) () in
  let v = Event.sync (Event.receive ch) in
  Printf.printf "%d\n" v
For asynchronous message passing, a Thread + Mutex + Queue combination is common. In OCaml 5, the Domainslib library provides Task.async/Task.await, plus a Chan module for channels between domains.
Full Source
#![allow(clippy::all)]
//! # MPSC Channel
//!
//! Multi-producer, single-consumer channel — the standard way to communicate between threads.
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Creates a fan-in pattern: multiple producers, one consumer.
pub fn fan_in<T: Send + 'static>(producers: Vec<Box<dyn FnOnce(Sender<T>) + Send>>) -> Vec<T> {
    let (tx, rx) = mpsc::channel();
    for producer in producers {
        let tx = tx.clone();
        thread::spawn(move || producer(tx));
    }
    drop(tx); // Important: drop original so rx closes when all producers done
    rx.into_iter().collect()
}

/// Creates a bounded channel that applies backpressure.
pub fn bounded_producer_consumer(capacity: usize, items: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = mpsc::sync_channel::<i32>(capacity);
    let producer = thread::spawn(move || {
        for item in items {
            tx.send(item).unwrap(); // Blocks if buffer full
        }
    });
    let results: Vec<_> = rx.into_iter().collect();
    producer.join().unwrap();
    results
}

/// Demonstrates multiple producers sending to one consumer.
pub fn multi_producer(num_producers: usize, messages_per_producer: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    for i in 0..num_producers {
        let tx = tx.clone();
        thread::spawn(move || {
            for j in 0..messages_per_producer {
                tx.send(format!("producer-{}-msg-{}", i, j)).unwrap();
            }
        });
    }
    drop(tx);
    rx.into_iter().collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_fan_in() {
        let producers: Vec<Box<dyn FnOnce(Sender<i32>) + Send>> = vec![
            Box::new(|tx| {
                tx.send(1).unwrap();
                tx.send(2).unwrap();
            }),
            Box::new(|tx| {
                tx.send(3).unwrap();
            }),
        ];
        let mut results = fan_in(producers);
        results.sort();
        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_bounded_channel() {
        let results = bounded_producer_consumer(2, vec![1, 2, 3, 4, 5]);
        assert_eq!(results, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_multi_producer() {
        let results = multi_producer(3, 2);
        assert_eq!(results.len(), 6);
    }

    #[test]
    fn test_channel_closes_when_senders_dropped() {
        let (tx, rx) = mpsc::channel::<i32>();
        drop(tx);
        assert!(rx.recv().is_err());
    }

    #[test]
    fn test_sync_channel_blocks() {
        let (tx, rx) = mpsc::sync_channel::<i32>(1);
        tx.send(1).unwrap();
        // Next send would block if we didn't receive
        assert_eq!(rx.recv().unwrap(), 1);
    }
}
Deep Comparison
OCaml vs Rust: MPSC Channel
Channel Usage
OCaml:
let ch = Event.new_channel () in
Event.sync (Event.send ch value)
Rust:
let (tx, rx) = mpsc::channel();
tx.send(value).unwrap();
Key Differences
| Aspect | OCaml | Rust |
|---|---|---|
| Split sender/receiver | No | Yes (tx, rx) |
| Clone sender | N/A | tx.clone() |
| Iterate receiver | Manual loop | for msg in rx |
| Close detection | Manual | Automatic on sender drop |
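The "Iterate receiver" and "Close detection" rows can be sketched on the Rust side as follows. The helper name drain_until_closed is hypothetical. The point is only that a for loop over the receiver ends by itself once the last Sender has been dropped.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: one producer sends `count` labels and then exits,
/// which drops its Sender and closes the channel.
fn drain_until_closed(count: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..count {
            tx.send(format!("msg-{}", i)).expect("receiver is still alive");
        }
        // `tx` goes out of scope here; no explicit close call is needed.
    });

    let mut seen = Vec::new();
    // The loop blocks for each message and ends when the channel is closed.
    for msg in rx {
        seen.push(msg);
    }
    producer.join().expect("producer thread panicked");
    seen
}

fn main() {
    for msg in drain_until_closed(3) {
        println!("{}", msg);
    }
}
```

With a single producer the messages arrive in the order they were sent, so no sorting is needed before inspecting the result.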
Exercises
1. Use sync_channel(2) with a slow consumer (thread::sleep) and a fast producer, and observe that the producer blocks automatically.
2. Extend fan_in to handle Result<T, E> messages, collecting successes and errors separately, without panicking on any individual failure.
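One possible starting point for the Result<T, E> exercise is sketched below. It is an assumption about how such a variant might look, not a reference solution. The name fan_in_results is hypothetical, and the sketch leaves it to each producer to decide what to do if its own send fails.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Hypothetical variant of `fan_in`: producers send `Result<T, E>` values,
/// and the consumer sorts them into successes and errors.
fn fan_in_results<T, E>(
    producers: Vec<Box<dyn FnOnce(Sender<Result<T, E>>) + Send>>,
) -> (Vec<T>, Vec<E>)
where
    T: Send + 'static,
    E: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    for producer in producers {
        let tx = tx.clone();
        thread::spawn(move || producer(tx));
    }
    drop(tx); // close the channel once every producer has finished

    let mut oks = Vec::new();
    let mut errs = Vec::new();
    for msg in rx {
        // An Err message is just data here; it never triggers a panic.
        match msg {
            Ok(value) => oks.push(value),
            Err(error) => errs.push(error),
        }
    }
    (oks, errs)
}

fn main() {
    let producers: Vec<Box<dyn FnOnce(Sender<Result<i32, String>>) + Send>> = vec![
        Box::new(|tx| {
            let _ = tx.send(Ok(1));
            let _ = tx.send(Err("bad input".to_string()));
        }),
        Box::new(|tx| {
            let _ = tx.send(Ok(2));
        }),
    ];
    let (oks, errs) = fan_in_results(producers);
    println!("{} ok, {} failed", oks.len(), errs.len());
}
```

Matching on each message keeps the two outcomes separate in a single pass. The order of the successes depends on thread scheduling, so sort them before comparing against expected values.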