450: Crossbeam Channels — Advanced Channel Patterns
Tutorial
The Problem
std::sync::mpsc provides MPSC (multiple producer, single consumer) channels, including bounded ones via mpsc::sync_channel(n), but it supports neither multiple consumers nor select across channels. crossbeam::channel provides both bounded (crossbeam::channel::bounded(n)) and unbounded channels with MPMC (multiple producer, multiple consumer) semantics, plus a select! macro. Bounded channels implement backpressure: the producer blocks when the buffer is full, which prevents fast producers from overwhelming slow consumers with unbounded memory growth.
Crossbeam channels are used in high-performance task queues, trading systems, event brokers, and any pipeline where multiple workers process from the same channel.
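To make the backpressure claim concrete, here is a minimal std-only sketch. The function name `backpressure_probe` and its `stall` parameter are illustrative assumptions, not part of the original example. It counts how many sends complete while the consumer is deliberately idle; that count can never exceed the channel capacity.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Returns (sends completed while the consumer was idle, total items received).
/// `stall` is how long the consumer waits before it starts receiving.
pub fn backpressure_probe(capacity: usize, total: usize, stall: Duration) -> (usize, usize) {
    let (tx, rx) = mpsc::sync_channel::<usize>(capacity);
    let completed = Arc::new(AtomicUsize::new(0));

    let producer = {
        let completed = Arc::clone(&completed);
        thread::spawn(move || {
            for i in 0..total {
                // Blocks once `capacity` items are sitting in the buffer.
                tx.send(i).unwrap();
                completed.fetch_add(1, Ordering::SeqCst);
            }
            // `tx` is dropped here, which closes the channel.
        })
    };

    // Simulate a slow consumer: nothing is received during the stall.
    thread::sleep(stall);
    let sent_during_stall = completed.load(Ordering::SeqCst);

    // Drain the channel; iteration ends once the producer has dropped `tx`.
    let received = rx.iter().count();
    producer.join().unwrap();
    (sent_during_stall, received)
}
```

Calling `backpressure_probe(3, 10, Duration::from_millis(50))` should report at most 3 sends during the stall and 10 items received in total. The exact first number depends on thread scheduling, but the upper bound holds regardless.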
🎯 Learning Outcomes
sync_channel(n) blocks the sender when the buffer is full
mpsc::sync_channel simulates crossbeam bounded channels
Code Example
let (tx, rx) = mpsc::sync_channel::<u32>(capacity);
// Send blocks when full
tx.send(value).unwrap();
// Receive blocks when empty
let value = rx.recv().unwrap();
Key Differences
crossbeam::channel supports multiple consumers; std::sync::mpsc does not (MPSC only).
crossbeam::select! enables waiting on multiple channels; std::sync::mpsc has no select.
Both mpsc::sync_channel and crossbeam::bounded block the sender when full; they differ in MPMC support.
crossbeam::channel uses lock-free algorithms; since Rust 1.67 std::sync::mpsc is built on a port of the same implementation, so the remaining gap is mostly in API surface rather than internals.
OCaml Approach
OCaml's Event module provides synchronous (zero-buffer) channels — the send always blocks until a receiver is ready. Domainslib.Chan.make_bounded n creates bounded channels for OCaml 5.x domains. Async.Pipe and Lwt_stream provide bounded buffered streams for async OCaml. The backpressure semantics are similar but integrated with their respective runtimes.
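For comparison, the closest std analogue to a synchronous (zero-buffer) channel is `mpsc::sync_channel(0)`, which the standard library documents as a rendezvous channel. The sketch below is illustrative only; the function name `rendezvous_demo` is an assumption made for this example.

```rust
use std::sync::mpsc::{self, TrySendError};
use std::thread;

/// Returns (whether a non-blocking send was rejected while no receiver
/// was waiting, the value handed over by the blocking send).
pub fn rendezvous_demo() -> (bool, i32) {
    // Capacity 0: there is no buffer, so every send must meet a receive.
    let (tx, rx) = mpsc::sync_channel::<i32>(0);

    // No receiver is waiting yet, so a non-blocking send cannot complete.
    let rejected = matches!(tx.try_send(1), Err(TrySendError::Full(1)));

    let receiver = thread::spawn(move || rx.recv().unwrap());

    // The blocking send returns only once it has been paired with a recv.
    tx.send(42).unwrap();
    (rejected, receiver.join().unwrap())
}
```

The rejected `try_send` shows the zero-buffer property directly: with nowhere to park the value, the send either hands it to a waiting receiver or fails immediately.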
Full Source
#![allow(clippy::all)]
//! # Crossbeam Channels — Advanced Channel Patterns
//!
//! Demonstrates bounded channels and multi-consumer patterns
//! that crossbeam-channel provides, implemented with std.
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
/// Approach 1: Bounded (sync) channel
///
/// Sender blocks when buffer is full, providing backpressure.
pub fn bounded_channel_demo(capacity: usize, num_messages: usize) -> Vec<u32> {
    let (tx, rx) = mpsc::sync_channel::<u32>(capacity);
    let producer = thread::spawn(move || {
        for i in 0..num_messages as u32 {
            tx.send(i).unwrap();
        }
    });
    let consumer = thread::spawn(move || {
        let mut received = Vec::new();
        for msg in rx {
            received.push(msg);
        }
        received
    });
    producer.join().unwrap();
    consumer.join().unwrap()
}
/// Approach 2: Multi-consumer pattern with Arc<Mutex<Receiver>>
pub fn multi_consumer_demo(num_consumers: usize, num_messages: usize) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();
    let rx = Arc::new(Mutex::new(rx));
    let results = Arc::new(Mutex::new(Vec::new()));
    let consumers: Vec<_> = (0..num_consumers)
        .map(|_id| {
            let rx = Arc::clone(&rx);
            let results = Arc::clone(&results);
            thread::spawn(move || loop {
                let msg = rx.lock().unwrap().recv();
                match msg {
                    Ok(v) => results.lock().unwrap().push(v),
                    Err(_) => break,
                }
            })
        })
        .collect();
    // Send messages
    for i in 0..num_messages as u32 {
        tx.send(i).unwrap();
    }
    drop(tx); // Close channel
    for c in consumers {
        c.join().unwrap();
    }
    let mut result = Arc::try_unwrap(results).unwrap().into_inner().unwrap();
    result.sort();
    result
}
/// Approach 3: try_send for non-blocking send
pub fn try_send_demo(capacity: usize) -> (usize, usize) {
    let (tx, rx) = mpsc::sync_channel::<i32>(capacity);
    let mut sent = 0;
    let mut failed = 0;
    for i in 0..capacity as i32 + 5 {
        match tx.try_send(i) {
            Ok(()) => sent += 1,
            Err(_) => failed += 1,
        }
    }
    // Close the channel, then drain the buffered items
    drop(tx);
    for _ in rx {}
    (sent, failed)
}
/// Producer-consumer with timeout
pub fn recv_timeout_demo(timeout_ms: u64) -> Option<i32> {
    let (tx, rx) = mpsc::sync_channel::<i32>(1);
    // Delayed sender
    let _ = thread::spawn(move || {
        thread::sleep(Duration::from_millis(timeout_ms * 2));
        let _ = tx.send(42);
    });
    rx.recv_timeout(Duration::from_millis(timeout_ms)).ok()
}
/// Work distribution pattern
pub fn work_distribution(num_workers: usize, jobs: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = mpsc::sync_channel::<i32>(num_workers * 2);
    let rx = Arc::new(Mutex::new(rx));
    let results = Arc::new(Mutex::new(Vec::new()));
    // Spawn workers
    let workers: Vec<_> = (0..num_workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let results = Arc::clone(&results);
            thread::spawn(move || loop {
                // Bind the result first so the receiver lock is released
                // before the job is processed; matching on the call directly
                // would hold the lock for the whole match body.
                let job = rx.lock().unwrap().recv();
                match job {
                    Ok(job) => {
                        let result = job * job; // Process job
                        results.lock().unwrap().push(result);
                    }
                    Err(_) => break,
                }
            })
        })
        .collect();
    // Send jobs
    for job in jobs {
        tx.send(job).unwrap();
    }
    drop(tx);
    // Wait for workers
    for w in workers {
        w.join().unwrap();
    }
    let mut r = Arc::try_unwrap(results).unwrap().into_inner().unwrap();
    r.sort();
    r
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_bounded_channel() {
        let results = bounded_channel_demo(3, 10);
        assert_eq!(results, (0..10).collect::<Vec<_>>());
    }
    #[test]
    fn test_sync_channel_blocks_when_full() {
        let (tx, rx) = mpsc::sync_channel::<u32>(2);
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        // Buffer full, try_send should fail
        assert!(tx.try_send(3).is_err());
        // After receiving, we can send again
        assert_eq!(rx.recv().unwrap(), 1);
        assert!(tx.try_send(3).is_ok());
    }
    #[test]
    fn test_multi_consumer() {
        let results = multi_consumer_demo(3, 9);
        assert_eq!(results, (0..9).collect::<Vec<_>>());
    }
    #[test]
    fn test_try_send() {
        let (sent, failed) = try_send_demo(3);
        assert_eq!(sent, 3);
        assert_eq!(failed, 5);
    }
    #[test]
    fn test_recv_timeout() {
        let result = recv_timeout_demo(10);
        assert!(result.is_none()); // Timeout before message
    }
    #[test]
    fn test_work_distribution() {
        let jobs: Vec<i32> = (1..=5).collect();
        let results = work_distribution(2, jobs);
        assert_eq!(results, vec![1, 4, 9, 16, 25]);
    }
    #[test]
    fn test_empty_work_distribution() {
        let results = work_distribution(2, vec![]);
        assert!(results.is_empty());
    }
}
Deep Comparison
OCaml vs Rust: Bounded Channels
Bounded Channel Creation
OCaml (Manual implementation)
let make_bounded capacity =
  let queue = Queue.create () in
  let mutex = Mutex.create () in
  let not_full = Condition.create () in
  let not_empty = Condition.create () in
  let send v =
    Mutex.lock mutex;
    while Queue.length queue >= capacity do
      Condition.wait not_full mutex
    done;
    Queue.push v queue;
    Condition.signal not_empty;
    Mutex.unlock mutex
  in
  let recv () =
    Mutex.lock mutex;
    while Queue.is_empty queue do
      Condition.wait not_empty mutex
    done;
    let v = Queue.pop queue in
    Condition.signal not_full;
    Mutex.unlock mutex;
    v
  in
  (send, recv)
Rust
let (tx, rx) = mpsc::sync_channel::<u32>(capacity);
// Send blocks when full
tx.send(value).unwrap();
// Receive blocks when empty
let value = rx.recv().unwrap();
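To see what `sync_channel` saves you from writing, the manual OCaml version can be mirrored in Rust with a `Mutex` and two `Condvar`s. This is a hedged sketch for comparison only: `BoundedQueue` and `roundtrip` are hypothetical names, and unlike the std channel this queue has no notion of disconnection.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical hand-rolled bounded queue, mirroring the OCaml version.
pub struct BoundedQueue<T> {
    queue: Mutex<VecDeque<T>>,
    not_full: Condvar,
    not_empty: Condvar,
    capacity: usize,
}

impl<T> BoundedQueue<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "this sketch needs room for at least one item");
        BoundedQueue {
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
            not_full: Condvar::new(),
            not_empty: Condvar::new(),
            capacity,
        }
    }

    /// Blocks while the queue is full, then enqueues `v`.
    pub fn send(&self, v: T) {
        let mut q = self.queue.lock().unwrap();
        while q.len() >= self.capacity {
            q = self.not_full.wait(q).unwrap();
        }
        q.push_back(v);
        self.not_empty.notify_one();
    }

    /// Blocks while the queue is empty, then dequeues the oldest item.
    pub fn recv(&self) -> T {
        let mut q = self.queue.lock().unwrap();
        while q.is_empty() {
            q = self.not_empty.wait(q).unwrap();
        }
        let v = q.pop_front().unwrap();
        self.not_full.notify_one();
        v
    }
}

/// Sends 0..n through a queue of the given capacity and returns what arrived.
pub fn roundtrip(capacity: usize, n: u32) -> Vec<u32> {
    let q = Arc::new(BoundedQueue::new(capacity));
    let producer = {
        let q = Arc::clone(&q);
        thread::spawn(move || (0..n).for_each(|i| q.send(i)))
    };
    // The receiver knows how many items to expect, since the queue cannot close.
    let received: Vec<u32> = (0..n).map(|_| q.recv()).collect();
    producer.join().unwrap();
    received
}
```

With one producer and one consumer the queue is FIFO, so `roundtrip(2, 10)` returns the values 0 through 9 in order. The mutex guard is released automatically at the end of each method, which replaces the explicit `Mutex.unlock` calls in the OCaml code.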
Key Differences
| Feature | OCaml | Rust |
|---|---|---|
| Bounded channel | Manual | sync_channel(cap) |
| Backpressure | Manual Condvar | Built-in blocking |
| Try operations | Manual | try_send, try_recv |
| Multi-consumer | Shared queue | Arc<Mutex<Receiver>> |
Multi-Consumer Pattern
OCaml
(* Same queue shared by multiple threads *)
let consumers = Array.init num_consumers (fun _id ->
  Thread.create (fun () ->
    (* Each thread calls recv on the shared channel;
       process is a placeholder for the per-item work *)
    let v = recv () in
    process v
  ) ()
)
Rust
let (tx, rx) = mpsc::channel::<u32>();
let rx = Arc::new(Mutex::new(rx)); // Wrap for sharing
let consumers: Vec<_> = (0..num_consumers).map(|_| {
    let rx = Arc::clone(&rx);
    thread::spawn(move || loop {
        // Release the lock before processing so other consumers can receive
        let msg = rx.lock().unwrap().recv();
        match msg {
            Ok(v) => process(v),
            Err(_) => break, // Channel closed
        }
    })
}).collect();
Non-blocking Operations
Rust
use std::sync::mpsc::{TryRecvError, TrySendError};
// Try to send without blocking
match tx.try_send(value) {
    Ok(()) => println!("sent"),
    Err(TrySendError::Full(_)) => println!("channel full"),
    Err(TrySendError::Disconnected(_)) => println!("closed"),
}
// Try to receive without blocking
match rx.try_recv() {
    Ok(v) => println!("got {}", v),
    Err(TryRecvError::Empty) => println!("nothing yet"),
    Err(TryRecvError::Disconnected) => println!("closed"),
}
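A common use of `try_recv` is draining whatever is already queued without waiting for more. The following sketch assumes two hypothetical helpers, `drain_available` and `drain_demo`, and shows how the `Empty` and `Disconnected` cases differ in practice.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};

/// Collects everything currently queued without blocking.
/// The bool is true while at least one sender is still alive.
pub fn drain_available<T>(rx: &Receiver<T>) -> (Vec<T>, bool) {
    let mut items = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(v) => items.push(v),
            // Nothing queued right now, but more may arrive later.
            Err(TryRecvError::Empty) => return (items, true),
            // Nothing queued and every sender has been dropped.
            Err(TryRecvError::Disconnected) => return (items, false),
        }
    }
}

/// Drains once with the sender alive, then again after dropping it.
pub fn drain_demo() -> ((Vec<i32>, bool), (Vec<i32>, bool)) {
    let (tx, rx) = mpsc::sync_channel::<i32>(4);
    for i in 1..=3 {
        tx.try_send(i).unwrap(); // fits: capacity is 4
    }
    let first = drain_available(&rx);
    drop(tx);
    let second = drain_available(&rx);
    (first, second)
}
```

The first drain yields the three buffered values and reports the channel as open; the second yields nothing and reports it closed. For the simple case where the open/closed distinction does not matter, `rx.try_iter()` in the standard library does the same collection.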
With Crossbeam (Real library)
use crossbeam_channel::{bounded, select};
let (_tx1, rx1) = bounded::<i32>(10);
let (_tx2, rx2) = bounded::<i32>(10);
// Select from multiple channels
select! {
    recv(rx1) -> msg => println!("from rx1: {:?}", msg),
    recv(rx2) -> msg => println!("from rx2: {:?}", msg),
    default => println!("nothing ready"),
}
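Without crossbeam, one std-only workaround for the missing select is fan-in: every producer sends into a single channel and tags its messages with an enum variant. This is a substitute technique, not an equivalent of `select!`, since it only works when the producers can share one channel. The `Event` type and `fan_in_demo` function below are assumptions made for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Tag identifying which logical source produced a message (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Event {
    Number(i32),
    Text(String),
}

/// Two producers feed one channel; the receiver dispatches on the variant.
pub fn fan_in_demo() -> Vec<Event> {
    let (tx, rx) = mpsc::channel::<Event>();

    let numbers = {
        let tx = tx.clone();
        thread::spawn(move || {
            for n in 1..=2 {
                tx.send(Event::Number(n)).unwrap();
            }
        })
    };
    // The original sender moves into the second producer.
    let texts = thread::spawn(move || {
        tx.send(Event::Text("hello".to_string())).unwrap();
    });

    numbers.join().unwrap();
    texts.join().unwrap();
    // Both senders are gone, so iteration ends once the queue is drained.
    rx.into_iter().collect()
}
```

The relative order of `Number` and `Text` events depends on scheduling, so callers should match on the variant and not on position. Waiting on several pre-existing receivers at once still requires forwarder threads or crossbeam's `select!`.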
Exercises
Create a bounded channel (bounded(100)) and spawn 4 worker threads, all consuming from the same channel. Verify that all work items are processed exactly once.
Using crossbeam::channel or a custom wrapper, implement try_send_timeout(val, timeout) that attempts to send but returns Err(val) if the channel doesn't have capacity within the timeout.