985 Select Pattern
Tutorial
The Problem
Implement a select pattern that multiplexes over multiple channels, analogous to Unix select(2) or OCaml's Event.choose. Poll each channel with try_recv in a loop, calling thread::yield_now between rounds, and return the first available value. Model the result as an enum Selected<A, B> that records which channel produced the value, with a third variant for the case where both channels have closed.
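Before polling two channels, it may help to see the three outcomes of a single try_recv call in isolation. The sketch below is illustrative only: describe_poll and poll_lifecycle are hypothetical helper names that do not appear in the tutorial's code.

```rust
use std::sync::mpsc::{self, TryRecvError};

/// Reports what one non-blocking poll of `rx` observed.
/// (Hypothetical helper, used only for illustration.)
fn describe_poll(rx: &mpsc::Receiver<i32>) -> String {
    match rx.try_recv() {
        Ok(v) => format!("value {}", v),
        // The channel is open but nothing is queued right now.
        Err(TryRecvError::Empty) => "empty".to_string(),
        // Every sender is gone and the queue has been drained.
        Err(TryRecvError::Disconnected) => "disconnected".to_string(),
    }
}

/// Polls one channel before a send, after a send, and after the sender is dropped.
fn poll_lifecycle() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<i32>();
    let mut seen = Vec::new();
    seen.push(describe_poll(&rx)); // nothing sent yet
    tx.send(7).unwrap();
    seen.push(describe_poll(&rx)); // one queued value
    drop(tx);
    seen.push(describe_poll(&rx)); // sender dropped, queue empty
    seen
}
```

Only Empty means "try again later"; Disconnected is final, which is why the select loop records it in a flag rather than polling that channel again.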
🎯 Learning Outcomes
- Implement select<A, B>(rx1, rx2) -> Selected<A, B> using try_recv on each channel
- Distinguish TryRecvError::Empty (channel has no item yet) from TryRecvError::Disconnected (channel closed)
- Track r1_closed / r2_closed flags and return Selected::BothClosed when all channels close
- Call thread::yield_now() between polling rounds so other threads can run (the loop still consumes CPU while it waits)
- Relate the pattern to select/epoll and understand why crossbeam::channel::select! is preferred for production
Code Example
#![allow(clippy::all)]
// 985: Select Pattern — Poll Multiple Channels
// Rust: try_recv loop for non-blocking select over multiple channels

use std::sync::mpsc::{self, TryRecvError};
use std::thread;

#[derive(Debug, PartialEq)]
enum Selected<A, B> {
    Left(A),
    Right(B),
    BothClosed,
}

// --- Non-blocking select over two channels ---
fn select<A, B>(rx1: &mpsc::Receiver<A>, rx2: &mpsc::Receiver<B>) -> Selected<A, B> {
    let mut r1_closed = false;
    let mut r2_closed = false;
    loop {
        if !r1_closed {
            match rx1.try_recv() {
                Ok(v) => return Selected::Left(v),
                Err(TryRecvError::Disconnected) => r1_closed = true,
                Err(TryRecvError::Empty) => {}
            }
        }
        if !r2_closed {
            match rx2.try_recv() {
                Ok(v) => return Selected::Right(v),
                Err(TryRecvError::Disconnected) => r2_closed = true,
                Err(TryRecvError::Empty) => {}
            }
        }
        if r1_closed && r2_closed {
            return Selected::BothClosed;
        }
        thread::yield_now();
    }
}

// --- Drain both channels via select, categorizing messages ---
fn select_drain(rx1: mpsc::Receiver<i32>, rx2: mpsc::Receiver<String>) -> (Vec<i32>, Vec<String>) {
    let mut lefts = Vec::new();
    let mut rights = Vec::new();
    loop {
        match select(&rx1, &rx2) {
            Selected::Left(v) => lefts.push(v),
            Selected::Right(v) => rights.push(v),
            Selected::BothClosed => break,
        }
    }
    (lefts, rights)
}

// --- Priority select: prefer channel 1 when both have data ---
fn priority_recv<T>(high: &mpsc::Receiver<T>, low: &mpsc::Receiver<T>) -> Option<(T, bool)> {
    // true = came from high priority
    match high.try_recv() {
        Ok(v) => Some((v, true)),
        Err(_) => low.try_recv().ok().map(|v| (v, false)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_select_drain() {
        let (tx1, rx1) = mpsc::channel::<i32>();
        let (tx2, rx2) = mpsc::channel::<String>();
        for i in [1, 2, 3] {
            tx1.send(i).unwrap();
        }
        for s in ["a", "b", "c"] {
            tx2.send(s.to_string()).unwrap();
        }
        drop(tx1);
        drop(tx2);
        let (mut lefts, mut rights) = select_drain(rx1, rx2);
        lefts.sort();
        rights.sort();
        assert_eq!(lefts, vec![1, 2, 3]);
        assert_eq!(rights, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_both_closed() {
        let (tx1, rx1) = mpsc::channel::<i32>();
        let (tx2, rx2) = mpsc::channel::<i32>();
        drop(tx1);
        drop(tx2);
        assert_eq!(select(&rx1, &rx2), Selected::BothClosed);
    }

    #[test]
    fn test_priority_recv() {
        let (htx, hrx) = mpsc::channel::<i32>();
        let (ltx, lrx) = mpsc::channel::<i32>();
        htx.send(10).unwrap();
        ltx.send(20).unwrap();
        // High priority wins
        let result = priority_recv(&hrx, &lrx);
        assert_eq!(result, Some((10, true)));
        // Now only low available
        let result2 = priority_recv(&hrx, &lrx);
        assert_eq!(result2, Some((20, false)));
    }

    #[test]
    fn test_select_empty_left() {
        let (_tx1, rx1) = mpsc::channel::<i32>();
        let (tx2, rx2) = mpsc::channel::<i32>();
        tx2.send(99).unwrap();
        drop(tx2);
        // rx1 never closes so we'll get Right(99) first
        assert_eq!(select(&rx1, &rx2), Selected::Right(99));
    }
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Standard select | None — use crossbeam::select! | Event.choose or Lwt.pick |
| Polling approach | try_recv loop + yield_now | Not needed — blocking select |
| CPU usage | Polling wastes CPU | Blocking is efficient |
| Multiple channels | Manual loop | choose [e1; e2; e3; ...] list |
For production use, crossbeam::channel::select! is the correct tool: it blocks the calling thread until one of the channels is ready instead of polling, so a waiting thread consumes no CPU. The polling approach here illustrates the mechanics.
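If an external crate is not an option, one std-only way to avoid polling is to forward both channels into a single merged channel and block on that. This is a different technique from the try_recv loop and from crossbeam's macro, and it costs one forwarding thread per input channel. The names Merged and merge below are assumptions made for this sketch.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Tags each forwarded value with the channel it came from.
/// (Hypothetical type for this sketch.)
#[derive(Debug, PartialEq)]
enum Merged<A, B> {
    Left(A),
    Right(B),
}

/// Spawns one forwarding thread per input and returns a single receiver.
/// The returned receiver disconnects once both inputs have closed.
fn merge<A, B>(rx1: Receiver<A>, rx2: Receiver<B>) -> Receiver<Merged<A, B>>
where
    A: Send + 'static,
    B: Send + 'static,
{
    let (tx_left, merged) = mpsc::channel();
    let tx_right = tx_left.clone();
    thread::spawn(move || {
        // Iterating a Receiver blocks on recv and ends when rx1 disconnects.
        for v in rx1 {
            if tx_left.send(Merged::Left(v)).is_err() {
                break; // the consumer dropped the merged receiver
            }
        }
    });
    thread::spawn(move || {
        for v in rx2 {
            if tx_right.send(Merged::Right(v)).is_err() {
                break;
            }
        }
    });
    merged
}
```

The consumer then calls recv() on the merged receiver, which blocks without spinning. An Err from recv() plays the role of Selected::BothClosed. The trade-off is that this version takes ownership of both receivers, whereas the polling select only borrows them.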
OCaml Approach
open Event
type ('a, 'b) selected = Left of 'a | Right of 'b | BothClosed
let select ch1 ch2 =
let e1 = wrap (receive ch1) (fun v -> Left v) in
let e2 = wrap (receive ch2) (fun v -> Right v) in
sync (choose [e1; e2])
(* Lwt version *)
let lwt_select p1 p2 =
Lwt.pick [
Lwt.map (fun v -> `Left v) p1;
Lwt.map (fun v -> `Right v) p2;
]
OCaml's Event.choose combines a list of events into a single event, and sync blocks until one of them can fire; if several are ready, one is chosen non-deterministically. Lwt.pick returns a promise that resolves with the first of its promises to complete and cancels the rest. Neither needs a polling loop, so both avoid the CPU cost of the Rust try_recv loop.
Deep Comparison
Select Pattern — Comparison
Core Insight
select returns a value from whichever channel is ready first. Async runtimes build this on OS readiness notification (epoll/kqueue); in pure std Rust we spin with try_recv + yield_now. In OCaml, Lwt.pick or Event.select provides similar semantics.
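The spin loop can be softened in two ways: by sleeping between polls instead of only yielding, and by giving up after a deadline. The following is a minimal sketch under those assumptions. Polled, select_timeout, and the specific pause durations are illustrative choices, not part of the tutorial's code.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::{Duration, Instant};

/// Result of a bounded poll. (Hypothetical type for this sketch.)
#[derive(Debug, PartialEq)]
enum Polled<A, B> {
    Left(A),
    Right(B),
    BothClosed,
    TimedOut,
}

/// Polls both channels until a value arrives, both close, or `timeout` elapses.
fn select_timeout<A, B>(
    rx1: &mpsc::Receiver<A>,
    rx2: &mpsc::Receiver<B>,
    timeout: Duration,
) -> Polled<A, B> {
    let deadline = Instant::now() + timeout;
    // Start with a short pause and double it up to a cap (values chosen arbitrarily).
    let mut pause = Duration::from_micros(50);
    loop {
        let r1_closed = match rx1.try_recv() {
            Ok(v) => return Polled::Left(v),
            Err(TryRecvError::Disconnected) => true,
            Err(TryRecvError::Empty) => false,
        };
        let r2_closed = match rx2.try_recv() {
            Ok(v) => return Polled::Right(v),
            Err(TryRecvError::Disconnected) => true,
            Err(TryRecvError::Empty) => false,
        };
        if r1_closed && r2_closed {
            return Polled::BothClosed;
        }
        if Instant::now() >= deadline {
            return Polled::TimedOut;
        }
        thread::sleep(pause);
        pause = (pause * 2).min(Duration::from_millis(5));
    }
}
```

Sleeping lowers CPU use at the cost of latency: a message that arrives during a pause is not seen until the pause ends, and the function may return slightly after the deadline.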
OCaml Approach
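- Lwt.pick [p1; p2] returns the first promise to resolve and cancels the others
- Event.select [ev1; ev2] does the same for the Thread/Event module
- There is no try_receive function; the non-blocking check is Event.poll (receive ch), which returns None when no sender is ready
- For I/O events, try_recv can be simulated by polling with Unix.select
Rust Approach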
- rx.try_recv() is non-blocking: Ok(v) | Err(Empty) | Err(Disconnected)
- Poll each channel in turn and call yield_now() when all are empty
- Blocking select without spinning: crossbeam::select! macro (external crate)
- Async code: tokio::select! or futures::select!
Comparison Table
| Concept | OCaml | Rust (std) |
|---|---|---|
| Select first ready | Lwt.pick [p1; p2] | try_recv spin loop |
| Non-blocking recv | Event.poll (receive ch) | rx.try_recv() |
| Distinguish sources | Wrap each event or promise in a variant (Left / Right) | Match on Selected::Left / Selected::Right |
| Cancel others | Lwt.pick cancels losers | Just ignore other channels |
| Priority channels | Not built-in | Check high first in loop |
| Efficient (no spin) | Lwt.pick event-driven | crossbeam::select! (external) |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (channel is unbounded, sync_channel is bounded) | tokio::sync::mpsc (async; bounded channel or unbounded_channel) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
1. Extend select to work with three channels using a three-way enum Selected3<A, B, C>.
2. Add a timeout: return Selected::Timeout if no message arrives within a given Duration.
3. Implement select_all<T>(receivers: &[Receiver<T>]) -> Option<T> that returns the first available value from any channel in a list.
4. Rewrite select using crossbeam::select! and compare the implementation complexity.
5. Apply select over N channels and process the first available item.
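As a possible starting point for the select_all exercise, the sketch below generalizes the two-channel loop to a slice. It is one of several reasonable designs and makes two assumptions not stated in the exercise: the function keeps polling while any channel is still open, and it returns None only when every channel has disconnected.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;

/// Returns the first value available on any receiver,
/// or None once every receiver has disconnected.
fn select_all<T>(receivers: &[mpsc::Receiver<T>]) -> Option<T> {
    loop {
        let mut still_open = 0;
        for rx in receivers {
            match rx.try_recv() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => still_open += 1,
                Err(TryRecvError::Disconnected) => {}
            }
        }
        if still_open == 0 {
            // Also covers an empty slice: nothing can ever arrive.
            return None;
        }
        thread::yield_now();
    }
}
```

Because the slice is always scanned from the front, earlier receivers are favored when several have data. Rotating the starting index between calls would be one way to make it fairer.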