983 Channel Basics
Tutorial
The Problem
Introduce Rust's std::sync::mpsc (Multi-Producer Single Consumer) channels for message-passing between threads. Implement a single-producer/consumer pair, a multi-producer/single-consumer pattern using tx.clone(), and a bounded channel using mpsc::sync_channel. Channels enforce ownership transfer — the sender gives up ownership of each sent value.
🎯 Learning Outcomes
- (Sender<T>, Receiver<T>) pairs with mpsc::channel()
- move closures that take ownership of the Sender end
- rx.iter(), which loops until all senders are dropped
- Cloning the Sender for multiple producers: let tx2 = tx.clone()
- mpsc::sync_channel(capacity) for a bounded channel with backpressure
Code Example
#![allow(clippy::all)]
// 983: MPSC Channel Basics
// Rust: std::sync::mpsc — Multiple Producer, Single Consumer
use std::sync::mpsc;
use std::thread;

// --- Approach 1: Single producer, single consumer ---
fn single_producer_consumer() -> Vec<i32> {
    let (tx, rx) = mpsc::channel::<i32>();
    let producer = thread::spawn(move || {
        for i in 1..=5 {
            tx.send(i).unwrap();
        }
        // tx drops here — channel closes
    });
    // Collect until the channel is closed
    let results: Vec<i32> = rx.iter().collect();
    producer.join().unwrap();
    results
}

// --- Approach 2: Multiple producers (clone the sender) ---
fn multi_producer_consumer() -> Vec<i32> {
    let (tx, rx) = mpsc::channel::<i32>();
    let handles: Vec<_> = (0..3)
        .map(|batch| {
            let tx = tx.clone(); // each producer gets its own sender
            thread::spawn(move || {
                let start = batch * 10 + 1;
                for i in start..=start + 2 {
                    tx.send(i).unwrap();
                }
                // tx drops when the thread exits
            })
        })
        .collect();
    drop(tx); // drop the original so the channel closes when all clones drop
    let mut results: Vec<i32> = rx.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    results.sort();
    results
}

// --- Approach 3: Producer sends typed messages ---
#[derive(Debug, PartialEq)]
enum WorkItem {
    Task(String),
    Done,
}

fn typed_channel() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<WorkItem>();
    let producer = thread::spawn(move || {
        for name in ["alpha", "beta", "gamma"] {
            tx.send(WorkItem::Task(name.to_string())).unwrap();
        }
        tx.send(WorkItem::Done).unwrap();
    });
    let mut results = Vec::new();
    loop {
        match rx.recv().unwrap() {
            WorkItem::Task(s) => results.push(s),
            WorkItem::Done => break,
        }
    }
    producer.join().unwrap();
    results
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_single_producer() {
        assert_eq!(single_producer_consumer(), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_multi_producer() {
        let results = multi_producer_consumer();
        assert_eq!(results.len(), 9);
        // Contains items from all 3 batches
        assert!(results.contains(&1));
        assert!(results.contains(&11));
        assert!(results.contains(&21));
    }

    #[test]
    fn test_typed_channel() {
        let results = typed_channel();
        assert_eq!(results, vec!["alpha", "beta", "gamma"]);
    }

    #[test]
    fn test_channel_closes_on_drop() {
        let (tx, rx) = mpsc::channel::<i32>();
        drop(tx); // immediately close
        assert!(rx.recv().is_err()); // disconnected
    }

    #[test]
    fn test_recv_blocks_until_send() {
        let (tx, rx) = mpsc::channel();
        let h = thread::spawn(move || {
            thread::sleep(std::time::Duration::from_millis(1));
            tx.send(42).unwrap();
        });
        assert_eq!(rx.recv().unwrap(), 42);
        h.join().unwrap();
    }
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Channel type | mpsc — multi-producer, single-consumer | Event.channel — synchronous rendezvous |
| Buffer | Unbounded (channel) or bounded (sync_channel) | Zero-buffer (synchronous) |
| Multiple senders | tx.clone() — reference counted Sender | Event.channel is already shared |
| Channel close | All senders dropped | No built-in close; use sentinel value |
| Ownership transfer | Moved into channel | Shared via GC |
mpsc channels are a safe, efficient alternative to shared-memory concurrency. They enforce a clear ownership model: each value has exactly one owner at any time, moving from producer to consumer via the channel.
OCaml Approach
(* OCaml: Event module (stdlib threads library) for synchronous channels *)
let ch = Event.new_channel ()

let producer () =
  List.iter (fun i ->
    Event.sync (Event.send ch i)
  ) [1; 2; 3; 4; 5]

let consumer () =
  (* Event channels are synchronous — no buffer, and no close signal,
     so receive a fixed count (or use a sentinel message). *)
  let rec loop acc n =
    if n = 0 then List.rev acc
    else
      let v = Event.sync (Event.receive ch) in
      loop (v :: acc) (n - 1)
  in
  loop [] 5

(* Practical: use Lwt_stream or Domainslib.Chan for async/parallel *)
let (stream, push) = Lwt_stream.create ()

let push_items () =
  List.iter (fun i -> push (Some i)) [1; 2; 3; 4; 5];
  push None  (* None closes the stream *)
OCaml's standard Event module provides synchronous channels (rendezvous — no buffer). For buffered async channels, Lwt_stream or Domainslib.Chan are the practical choices.
Deep Comparison
MPSC Channel Basics — Comparison
Core Insight
Channels are the message-passing alternative to shared mutable state: transfer owned values between threads instead of sharing pointers behind locks. Both OCaml and Rust use typed channels, but Rust's MPSC is part of std, while OCaml combines Thread with Event, or Thread with Queue + Mutex.
OCaml Approach
- Event.new_channel () creates a synchronous channel (rendezvous semantics)
- Event.sync (Event.send c v) blocks until a receiver is ready
- Thread + Queue + Mutex (plus Condition for blocking waits) builds an asynchronous buffered channel
Rust Approach
- mpsc::channel() creates an unbounded asynchronous channel
- mpsc::sync_channel(n) creates a bounded channel (send blocks on full)
- tx.clone() for multiple producers — all senders share one receiver
- The channel closes once all Senders are dropped
- rx.iter() is idiomatic for "drain until closed"
Comparison Table
| Concept | OCaml | Rust |
|---|---|---|
| Create channel | Event.new_channel () / Queue+Mutex | mpsc::channel() |
| Send message | Event.sync (Event.send c v) | tx.send(v).unwrap() |
| Receive message | Event.sync (Event.receive c) | rx.recv().unwrap() |
| Multiple producers | Multiple threads with shared mutex | tx.clone() per producer |
| Close channel | GC when last ref dropped | Drop all Senders |
| Bounded buffer | Manual ring buffer | mpsc::sync_channel(n) |
| Drain all messages | Loop until done signal | rx.iter().collect() |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
1. Send Option<T> messages and have the consumer stop on None — a sentinel-based close signal.
2. Use mpsc::sync_channel(8) to implement bounded backpressure and observe how the producer blocks when the buffer is full.
3. Broadcast to more than one Receiver (requires cloning messages with Arc<T>).
4. Build a pipeline: Stage1 -> chan1 -> Stage2 -> chan2 -> Stage3.
5. Benchmark mpsc throughput vs Mutex<VecDeque<T>> for 1,000,000 messages.