325: Racing Futures with select!
Difficulty level: Advanced. Key concepts covered: Functional Programming.
Tutorial
The Problem
Sometimes you want the first result from multiple concurrent operations: a timeout competing with an operation, querying multiple replicas and using the fastest response, or cancelling work when a stop signal arrives. The select! macro (in tokio or futures) polls multiple futures and returns as soon as the first one completes; the remaining futures are dropped, which cancels them. This is the fundamental tool for implementing timeouts, fallbacks, and cancellation in async code. The code in this example reproduces the same race with standard-library threads and channels rather than an async runtime.
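To make "poll several futures, return on the first completion" concrete, here is a minimal std-only sketch. The names `select2`, `Either`, `Countdown`, and `NoopWake` are hypothetical helpers invented for this illustration; they are not part of tokio or futures, and a real executor would park the thread until woken instead of busy-polling.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a loop that re-polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Which of the two futures finished first.
#[derive(Debug, PartialEq)]
pub enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Hypothetical helper: poll `a`, then `b`, until one of them is ready.
/// Both futures are dropped when the function returns, which is how the
/// loser ends up cancelled.
pub fn select2<A, B>(a: A, b: B) -> Either<A::Output, B::Output>
where
    A: Future,
    B: Future,
{
    let mut a = Box::pin(a);
    let mut b = Box::pin(b);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = a.as_mut().poll(&mut cx) {
            return Either::Left(v);
        }
        if let Poll::Ready(v) = b.as_mut().poll(&mut cx) {
            return Either::Right(v);
        }
        // Busy-wait politely; a real executor would sleep until woken.
        std::thread::yield_now();
    }
}

/// Toy future that stays pending for the given number of polls.
pub struct Countdown(pub u32);

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 == 0 {
            Poll::Ready("done")
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}
```

Calling `select2(Countdown(5), Countdown(2))` yields `Either::Right("done")` because the second future needs fewer polls. Since `a` is always polled before `b`, ties go to the first argument, which resembles the `biased;` mode of `tokio::select!`.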
🎯 Learning Outcomes
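- Understand select! as polling multiple futures and returning on first completion
- Distinguish select! (first wins) from join! (all must complete)
- Implement timeouts with the select! pattern
- Recognize that futures not chosen by select! are dropped (cancelled)
Code Example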
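use std::sync::mpsc;
use std::thread;
// Race labeled tasks on separate threads; the first result sent wins.
fn race<T: Send + 'static>(
    tasks: Vec<(Box<dyn FnOnce() -> T + Send>, &'static str)>,
) -> (&'static str, T) {
    let (tx, rx) = mpsc::channel();
    for (f, label) in tasks {
        let tx = tx.clone();
        thread::spawn(move || {
            // The receiver may already be gone; a failed send is ignored.
            let _ = tx.send((label, f()));
        });
    }
    // Drop the original sender; otherwise recv() blocks forever when no task sends.
    drop(tx);
    // Panics if `tasks` is empty or every task panicked.
    rx.recv().unwrap()
}
Key Differences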
1. Cancellation: Rust's select! drops (cancels) unfinished futures when one completes; Lwt's pick actively cancels losers.
2. Syntax: tokio::select! uses Rust macro syntax with pattern = future => body arms; Lwt's pick is a regular function.
3. Fairness: when several arms are ready at once, tokio::select! picks one pseudo-randomly by default; adding biased; as the first line of the macro polls the arms in declaration order instead.
4. Timeout: tokio::time::timeout(dur, future) behaves like a specialized select! that adds a deadline to any future.
OCaml Approach
OCaml's Lwt.pick takes a list of promises and returns the first to resolve, cancelling the others:
(* Lwt.pick: first to resolve wins, others are cancelled *)
let* result = Lwt.pick [
  Lwt.map (fun v -> `Result v) (fetch ());
  Lwt.map (fun () -> `Timeout) (Lwt_unix.sleep timeout_secs);
]
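The tagged-outcome shape of the `Lwt.pick` snippet can be approximated in Rust with two threads feeding one channel. This is only a sketch: `Outcome` and `fetch_or_timeout` are hypothetical names chosen for the illustration, and unlike `Lwt.pick` nothing here cancels the loser; its thread simply keeps running and its send is ignored.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Mirrors the `Result and `Timeout variants of the OCaml snippet.
#[derive(Debug, PartialEq)]
pub enum Outcome<T> {
    Result(T),
    Timeout,
}

/// Hypothetical helper: race `fetch` against a timer thread and report
/// whichever sends first.
pub fn fetch_or_timeout<T, F>(fetch: F, timeout: Duration) -> Outcome<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let timer_tx = tx.clone();

    // Contender 1: the actual work.
    thread::spawn(move || {
        let _ = tx.send(Outcome::Result(fetch()));
    });

    // Contender 2: a timer that reports Timeout once it has slept.
    thread::spawn(move || {
        thread::sleep(timeout);
        let _ = timer_tx.send(Outcome::Timeout);
    });

    // The timer always sends, so recv() cannot fail in practice; the
    // fallback only keeps the function total.
    rx.recv().unwrap_or(Outcome::Timeout)
}
```

For example, `fetch_or_timeout(|| 42, Duration::from_millis(500))` returns `Outcome::Result(42)`, while a closure that sleeps longer than the timeout produces `Outcome::Timeout`.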
Full Source
#![allow(clippy::all)]
//! # Racing Futures with select!
//!
//! Demonstrates racing multiple tasks where the first one to complete wins
//! and others are discarded. Includes timeout patterns.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
/// Race multiple labeled tasks. Returns the label and result of the first to complete.
pub fn race<T>(tasks: Vec<(&'static str, Box<dyn FnOnce() -> T + Send>)>) -> (&'static str, T)
where
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    for (label, f) in tasks {
        let tx = tx.clone();
        thread::spawn(move || {
            let _ = tx.send((label, f()));
        });
    }
    // Drop the original sender; otherwise recv() blocks forever when no task sends.
    drop(tx);
    rx.recv().expect("all senders dropped")
}
/// Race tasks without labels.
pub fn race_anonymous<T>(tasks: Vec<Box<dyn FnOnce() -> T + Send>>) -> T
where
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    for f in tasks {
        let tx = tx.clone();
        thread::spawn(move || {
            let _ = tx.send(f());
        });
    }
    // Drop the original sender; otherwise recv() blocks forever when no task sends.
    drop(tx);
    rx.recv().expect("all senders dropped")
}
/// Run a task with a timeout. Returns None if the timeout fires first.
pub fn with_timeout<T>(f: Box<dyn FnOnce() -> T + Send>, timeout_ms: u64) -> Option<T>
where
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(f());
    });
    rx.recv_timeout(Duration::from_millis(timeout_ms)).ok()
}
/// Run a task with a timeout, returning a Result with a descriptive error.
pub fn with_timeout_result<T>(
    f: Box<dyn FnOnce() -> T + Send>,
    timeout_ms: u64,
) -> Result<T, TimeoutError>
where
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(f());
    });
    rx.recv_timeout(Duration::from_millis(timeout_ms))
        .map_err(|_| TimeoutError { timeout_ms })
}
/// Error type for timeout operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeoutError {
    pub timeout_ms: u64,
}
impl std::fmt::Display for TimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "operation timed out after {}ms", self.timeout_ms)
    }
}
impl std::error::Error for TimeoutError {}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_race_fastest_wins() {
        let tasks: Vec<(&'static str, Box<dyn FnOnce() -> i32 + Send>)> = vec![
            (
                "slow",
                Box::new(|| {
                    thread::sleep(Duration::from_millis(100));
                    1
                }),
            ),
            (
                "fast",
                Box::new(|| {
                    thread::sleep(Duration::from_millis(10));
                    2
                }),
            ),
        ];
        let (label, value) = race(tasks);
        assert_eq!(label, "fast");
        assert_eq!(value, 2);
    }
    #[test]
    fn test_race_anonymous_returns_first() {
        let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> = vec![
            Box::new(|| {
                thread::sleep(Duration::from_millis(50));
                100
            }),
            Box::new(|| {
                thread::sleep(Duration::from_millis(5));
                42
            }),
        ];
        let result = race_anonymous(tasks);
        assert_eq!(result, 42);
    }
    #[test]
    fn test_with_timeout_succeeds() {
        let result = with_timeout(
            Box::new(|| {
                thread::sleep(Duration::from_millis(5));
                99
            }),
            200,
        );
        assert_eq!(result, Some(99));
    }
    #[test]
    fn test_with_timeout_fails() {
        let result = with_timeout(
            Box::new(|| {
                thread::sleep(Duration::from_millis(200));
                0
            }),
            50,
        );
        assert_eq!(result, None);
    }
    #[test]
    fn test_with_timeout_result_error() {
        let result = with_timeout_result(
            Box::new(|| {
                thread::sleep(Duration::from_millis(200));
                0
            }),
            50,
        );
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().timeout_ms, 50);
    }
    #[test]
    fn test_timeout_error_display() {
        let err = TimeoutError { timeout_ms: 100 };
        assert_eq!(err.to_string(), "operation timed out after 100ms");
    }
}
Deep Comparison
OCaml vs Rust: Select/Race Futures
Racing Tasks
OCaml:
let race tasks =
  let ch = Event.new_channel () in
  List.iter (fun f ->
    ignore (Thread.create (fun () -> Event.sync (Event.send ch (f ()))) ())
  ) tasks;
  Event.sync (Event.receive ch)
Rust:
fn race<T: Send + 'static>(
    tasks: Vec<(Box<dyn FnOnce() -> T + Send>, &'static str)>,
) -> (&'static str, T) {
    let (tx, rx) = mpsc::channel();
    for (f, label) in tasks {
        let tx = tx.clone();
        thread::spawn(move || {
            let _ = tx.send((label, f()));
        });
    }
    // Drop the original sender; otherwise recv() blocks forever when no task sends.
    drop(tx);
    rx.recv().unwrap()
}
Timeout Pattern
OCaml (with Lwt):
Lwt_unix.with_timeout 5.0 (fun () -> slow_operation ())
Rust:
fn with_timeout<T: Send + 'static>(f: Box<dyn FnOnce() -> T + Send>, ms: u64) -> Option<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(f());
    });
    rx.recv_timeout(Duration::from_millis(ms)).ok()
}
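One detail the `.ok()` call hides: `recv_timeout` can fail either because the deadline passed or because the worker thread ended without sending (for instance, it panicked). The sketch below separates the two cases. `WaitError` and `with_timeout_checked` are hypothetical names for this illustration, not part of the example's source.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical error type distinguishing the two failure modes.
#[derive(Debug, PartialEq)]
pub enum WaitError {
    /// The deadline passed before the task produced a value.
    TimedOut,
    /// The task's sender was dropped without sending (e.g. the task panicked).
    TaskFailed,
}

/// Hypothetical variant of `with_timeout` that reports why no value arrived.
pub fn with_timeout_checked<T, F>(f: F, timeout: Duration) -> Result<T, WaitError>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If `f` panics, `tx` is dropped during unwinding without a send.
        let _ = tx.send(f());
    });
    rx.recv_timeout(timeout).map_err(|e| match e {
        RecvTimeoutError::Timeout => WaitError::TimedOut,
        RecvTimeoutError::Disconnected => WaitError::TaskFailed,
    })
}
```

Whether callers need this distinction depends on the application; collapsing both cases into `None`, as the version above does, is reasonable when a failed task and a slow task are handled the same way.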
Key Differences
| Aspect | OCaml | Rust |
|---|---|---|
| Channel type | Event.channel | mpsc::channel |
| Receive first | Event.sync (Event.receive ch) | rx.recv() |
| Timeout | Lwt_unix.with_timeout | recv_timeout |
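| Loser cleanup | No cancellation; losing threads stay blocked on the synchronous send | No cancellation; threads run to completion and their failed send is ignored |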
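As the table notes, losing threads keep running in the thread-based Rust version. If that wasted work matters, one common workaround is cooperative cancellation through a shared flag. The sketch below assumes tasks are written to check the flag periodically; `CancellableTask` and `race_cancellable` are hypothetical names, and this is not what `select!` does internally.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

/// Hypothetical task type: receives a stop flag it should poll regularly.
pub type CancellableTask<T> = Box<dyn FnOnce(&AtomicBool) -> T + Send>;

/// Hypothetical helper: race the tasks, then ask the losers to stop and
/// wait until they have exited. Returns None if no task produced a value.
pub fn race_cancellable<T>(tasks: Vec<CancellableTask<T>>) -> Option<T>
where
    T: Send + 'static,
{
    let stop = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = tasks
        .into_iter()
        .map(|task| {
            let tx = tx.clone();
            let stop = Arc::clone(&stop);
            thread::spawn(move || {
                let _ = tx.send(task(&*stop));
            })
        })
        .collect();

    // Drop the original sender so recv() errors instead of blocking
    // forever when the task list is empty or every task panics.
    drop(tx);

    let winner = rx.recv().ok();

    // Signal the losers, then join them so no thread outlives the call.
    stop.store(true, Ordering::Relaxed);
    for handle in handles {
        let _ = handle.join();
    }
    winner
}
```

A task that ignores the flag still delays the return, because the function joins every thread. Dropping the join loop would restore the fire-and-forget behaviour of `race` while still signalling the losers.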
Exercises
1. Implement with_timeout<T>(f: impl FnOnce() -> T, timeout: Duration) -> Option<T> that returns None if the operation takes too long.