451: Crossbeam Select — Multiplexing Channels
Tutorial Video
Text description (accessibility)
This video demonstrates the "451: Crossbeam Select — Multiplexing Channels" functional Rust example. Difficulty level: Fundamental. Key concepts covered: Functional Programming. A thread watching multiple channels needs to respond to whichever has data first, without blocking on one while the other has messages. Key difference (blocking vs. polling): crossbeam::select! blocks without wasting CPU, whereas the std polling approach wakes up repeatedly in its try_recv loop even when no message is available.
Tutorial
The Problem
A thread watching multiple channels needs to respond to whichever has data first, without blocking on one while the other has messages. Go's select statement solves this natively. Rust's std::sync::mpsc has no select mechanism — you'd need to poll with try_recv in a loop, wasting CPU. crossbeam::select! provides efficient blocking select across multiple channels: the calling thread blocks until any channel has a message, then executes the matching arm.
Channel select appears in event-driven systems, timeouts combined with work channels, message routing, and any pattern where a thread must respond to multiple event sources.
🎯 Learning Outcomes
crossbeam::select! blocks efficiently until any channel is ready
Code Example
loop {
    if let Ok(v) = rx1.try_recv() { return First(v); }
    if let Ok(v) = rx2.try_recv() { return Second(v); }
    thread::sleep(Duration::from_millis(1));
}
Key Differences
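1. Blocking vs. polling: crossbeam::select! parks the calling thread until a channel is ready, so it uses no CPU while waiting; the std polling approach wakes up on every poll interval even when nothing has arrived, and adds up to one interval of latency.
2. Fairness: when several channels are ready at once, crossbeam::select! picks one of them at random, which avoids systematically favoring one channel; the polling approach always picks the first channel it checks.
3. Timeout: crossbeam::select! with crossbeam_channel::after(duration) adds a timeout cleanly; polling needs explicit Instant::now() tracking.
4. Go comparison: Go's select is built into the language; Rust requires the crossbeam crate, or tokio::select! for async code.
OCaml Approach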
OCaml's Event module has Event.select [Event.receive ch1; Event.receive ch2] for channel-level select — blocking until any event is ready. Async.choose and Lwt.pick provide async-style select. Domainslib.Chan.recv_poll enables non-blocking attempts in OCaml 5.x. Event-driven OCaml programming uses these primitives to multiplex across I/O, timers, and inter-domain communication.
Full Source
#![allow(clippy::all)]
//! # Crossbeam Select — Multiplexing Multiple Channels
//!
//! Wait on multiple channels simultaneously, receiving from whichever
//! is ready first. Implemented here with std polling to show the concept.
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Approach 1: Poll-based select with try_recv
pub fn poll_select<T, U>(
    rx1: &Receiver<T>,
    rx2: &Receiver<U>,
    timeout: Duration,
) -> SelectResult<T, U> {
    let start = std::time::Instant::now();
    let poll_interval = Duration::from_millis(1);

    loop {
        // Try first channel
        match rx1.try_recv() {
            Ok(v) => return SelectResult::First(v),
            Err(TryRecvError::Disconnected) => return SelectResult::Closed,
            Err(TryRecvError::Empty) => {}
        }

        // Try second channel
        match rx2.try_recv() {
            Ok(v) => return SelectResult::Second(v),
            Err(TryRecvError::Disconnected) => return SelectResult::Closed,
            Err(TryRecvError::Empty) => {}
        }

        // Check timeout
        if start.elapsed() >= timeout {
            return SelectResult::Timeout;
        }

        thread::sleep(poll_interval);
    }
}

#[derive(Debug, PartialEq)]
pub enum SelectResult<T, U> {
    First(T),
    Second(U),
    Timeout,
    Closed,
}

/// Approach 2: Priority select (prefer first channel)
pub fn priority_select<T: Clone>(
    primary: &Receiver<T>,
    secondary: &Receiver<T>,
) -> Option<(T, bool)> {
    // Always try primary first
    if let Ok(v) = primary.try_recv() {
        return Some((v, true));
    }
    if let Ok(v) = secondary.try_recv() {
        return Some((v, false));
    }
    None
}

/// Approach 3: Drain all available messages
pub fn drain_all<T>(receivers: &[Receiver<T>]) -> Vec<T> {
    let mut results = Vec::new();
    for rx in receivers {
        while let Ok(v) = rx.try_recv() {
            results.push(v);
        }
    }
    results
}

/// Approach 4: Select with stop signal
pub fn select_with_stop<T>(data_rx: Receiver<T>, stop_rx: Receiver<()>, timeout: Duration) -> Vec<T>
where
    T: Send + 'static,
{
    let mut results = Vec::new();
    let start = std::time::Instant::now();

    loop {
        // Check stop signal
        if stop_rx.try_recv().is_ok() {
            break;
        }

        // Check data
        if let Ok(v) = data_rx.try_recv() {
            results.push(v);
            continue;
        }

        // Check timeout
        if start.elapsed() >= timeout {
            break;
        }

        thread::sleep(Duration::from_millis(1));
    }

    results
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_poll_select_first() {
        let (tx1, rx1) = mpsc::channel::<i32>();
        let (_tx2, rx2) = mpsc::channel::<String>();
        tx1.send(42).unwrap();
        let result = poll_select(&rx1, &rx2, Duration::from_millis(100));
        assert_eq!(result, SelectResult::First(42));
    }

    #[test]
    fn test_poll_select_second() {
        let (_tx1, rx1) = mpsc::channel::<i32>();
        let (tx2, rx2) = mpsc::channel::<String>();
        tx2.send("hello".into()).unwrap();
        let result = poll_select(&rx1, &rx2, Duration::from_millis(100));
        assert_eq!(result, SelectResult::Second("hello".into()));
    }

    #[test]
    fn test_poll_select_timeout() {
        let (_tx1, rx1) = mpsc::channel::<i32>();
        let (_tx2, rx2) = mpsc::channel::<String>();
        let result = poll_select(&rx1, &rx2, Duration::from_millis(10));
        assert_eq!(result, SelectResult::Timeout);
    }

    #[test]
    fn test_try_recv_empty() {
        let (_tx, rx) = mpsc::channel::<u32>();
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn test_try_recv_available() {
        let (tx, rx) = mpsc::channel::<u32>();
        tx.send(1).unwrap();
        assert_eq!(rx.try_recv().unwrap(), 1);
    }

    #[test]
    fn test_priority_select() {
        let (tx1, rx1) = mpsc::channel();
        let (tx2, rx2) = mpsc::channel();
        tx1.send(1).unwrap();
        tx2.send(2).unwrap();
        // Primary has priority
        let result = priority_select(&rx1, &rx2);
        assert_eq!(result, Some((1, true)));
        // Now secondary
        let result = priority_select(&rx1, &rx2);
        assert_eq!(result, Some((2, false)));
    }

    #[test]
    fn test_drain_all() {
        let (tx1, rx1) = mpsc::channel();
        let (tx2, rx2) = mpsc::channel();
        tx1.send(1).unwrap();
        tx1.send(2).unwrap();
        tx2.send(3).unwrap();
        let results = drain_all(&[rx1, rx2]);
        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_select_with_stop() {
        let (data_tx, data_rx) = mpsc::channel();
        let (stop_tx, stop_rx) = mpsc::channel();
        data_tx.send(1).unwrap();
        data_tx.send(2).unwrap();
        stop_tx.send(()).unwrap();
        let results = select_with_stop(data_rx, stop_rx, Duration::from_secs(1));
        assert!(results.len() <= 2);
    }
}
Deep Comparison
OCaml vs Rust: Channel Select
Select from Multiple Channels
OCaml (Event module)
let msg = Event.sync (Event.choose [
  Event.receive ch1;
  Event.receive ch2
])
Rust (Polling)
loop {
    if let Ok(v) = rx1.try_recv() { return First(v); }
    if let Ok(v) = rx2.try_recv() { return Second(v); }
    thread::sleep(Duration::from_millis(1));
}
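The polling loop above always tries rx1 first, so a busy rx1 can starve rx2. One possible way to soften that with std alone is to rotate the starting receiver between calls. The sketch below assumes all receivers carry the same message type. The name poll_round_robin and its next cursor are illustrative, not part of std or crossbeam, and the strategy is round-robin rather than the random choice crossbeam::select! makes.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical helper: polls `receivers` starting at index `*next`, so
/// successive calls do not always favor the first receiver.
/// Returns the index of the receiver that produced the value.
pub fn poll_round_robin<T>(
    receivers: &[Receiver<T>],
    next: &mut usize,
) -> Option<(usize, T)> {
    let n = receivers.len();
    for offset in 0..n {
        let i = (*next + offset) % n;
        match receivers[i].try_recv() {
            Ok(v) => {
                // The next call starts just after the receiver served now.
                *next = (i + 1) % n;
                return Some((i, v));
            }
            // Empty or disconnected: move on to the next receiver.
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => {}
        }
    }
    // Nothing was available on any receiver during this pass.
    None
}
```

This is still non-blocking, so a caller would wrap it in the same sleep-and-retry loop as before; it only changes which receiver is checked first.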
Key Differences
| Feature | OCaml | Rust (std) |
|---|---|---|
| Select primitive | Event.choose | Poll loop or crossbeam-channel |
| Blocking select | Yes | No (use crossbeam) |
| Timeout | Manual | recv_timeout |
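The std column has no blocking select, but when the producers are under your control, a std-only workaround is to have every source send into one shared channel and tag its messages with an enum. The receiver can then block in recv_timeout instead of polling. The following is a minimal sketch under that assumption; Event, fan_in, and collect_until_stop are hypothetical names introduced for illustration.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::time::Duration;

/// Hypothetical message type: each source tags what it sends.
#[derive(Debug, PartialEq)]
pub enum Event {
    Work(u32),
    Shutdown,
}

/// Creates one channel and two sender handles, one per logical source.
pub fn fan_in() -> (Sender<Event>, Sender<Event>, Receiver<Event>) {
    let (tx, rx) = mpsc::channel();
    (tx.clone(), tx, rx)
}

/// Collects work items until `Shutdown` arrives, every sender is
/// dropped, or no message shows up within `idle_timeout`.
pub fn collect_until_stop(rx: &Receiver<Event>, idle_timeout: Duration) -> Vec<u32> {
    let mut items = Vec::new();
    loop {
        // `recv_timeout` blocks the thread; no sleep-and-retry loop.
        match rx.recv_timeout(idle_timeout) {
            Ok(Event::Work(n)) => items.push(n),
            Ok(Event::Shutdown) => break,
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    items
}
```

The trade-off is that messages are handled strictly in arrival order, so a shutdown signal queued behind pending work is only seen after that work has been received.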
Exercises
1. Use crossbeam::select! to receive either work items or shutdown signals, stopping when shutdown arrives.
2. Implement receive_with_timeout<T>(rx: &Receiver<T>, timeout: Duration) -> Option<T> using crossbeam::select! with a crossbeam_channel::after(timeout) timeout channel.
3. ... select!. First always check high_priority; only check low_priority when high_priority is empty. Implement this fairly (low priority eventually served even with continuous high-priority traffic).