996 Timeout Pattern
Tutorial
The Problem
Implement timeout patterns in Rust using mpsc::recv_timeout. Run an operation in a background thread and wait for its result with a deadline; return an error if the deadline expires. Also implement a "race" pattern where N tasks run concurrently and the first result wins — the async equivalent of Lwt.pick.
🎯 Learning Outcomes
rx.recv_timeout(Duration) to wait for a channel message with a deadlinewith_timeout<T, F>(timeout, f) -> Option<T> — run f in a thread, return None on timeoutrace<T>(tasks, timeout) -> Option<T> — spawn N tasks sharing one sender, return first resultRecvTimeoutError::Timeout vs RecvTimeoutError::Disconnected in match armsAtomicBoolCode Example
#![allow(clippy::all)]
// 996: Timeout Pattern
// Rust: mpsc::recv_timeout — like OCaml's Lwt.pick with sleep
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// --- Approach 1: recv_timeout on a channel ---
fn channel_with_timeout(delay_ms: u64, timeout_ms: u64) -> Result<i32, &'static str> {
let (tx, rx) = mpsc::channel::<i32>();
thread::spawn(move || {
thread::sleep(Duration::from_millis(delay_ms));
tx.send(42).ok(); // may fail if receiver timed out and was dropped
});
match rx.recv_timeout(Duration::from_millis(timeout_ms)) {
Ok(v) => Ok(v),
Err(mpsc::RecvTimeoutError::Timeout) => Err("timeout"),
Err(mpsc::RecvTimeoutError::Disconnected) => Err("disconnected"),
}
}
// --- Approach 2: Run any function with a timeout via thread ---
fn with_timeout<T, F>(timeout: Duration, f: F) -> Option<T>
where
T: Send + 'static,
F: FnOnce() -> T + Send + 'static,
{
let (tx, rx) = mpsc::channel::<T>();
thread::spawn(move || {
let result = f();
tx.send(result).ok();
});
rx.recv_timeout(timeout).ok()
}
// --- Approach 3: First-of-N wins (Lwt.pick analogue) ---
fn race<T: Send + 'static>(
tasks: Vec<Box<dyn FnOnce() -> T + Send + 'static>>,
timeout: Duration,
) -> Option<T> {
let (tx, rx) = mpsc::channel::<T>();
for task in tasks {
let tx = tx.clone();
thread::spawn(move || {
let result = task();
tx.send(result).ok(); // first to arrive wins
});
}
drop(tx); // close original sender
rx.recv_timeout(timeout).ok()
}
// --- Approach 4: Retry with overall deadline ---
fn retry_with_deadline<T, E, F>(
max_attempts: usize,
timeout_per_attempt: Duration,
f: F,
) -> Result<T, &'static str>
where
T: Send + 'static,
E: Send + 'static,
F: Fn() -> Result<T, E> + Send + Sync + Clone + 'static,
{
for attempt in 0..max_attempts {
let f = f.clone();
let result = with_timeout(timeout_per_attempt, f);
match result {
Some(Ok(v)) => return Ok(v),
Some(Err(_)) | None => {
if attempt + 1 < max_attempts {
thread::sleep(Duration::from_millis(1 << attempt));
}
}
}
}
Err("max attempts exceeded")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_recv_before_timeout() {
let result = channel_with_timeout(10, 500);
assert_eq!(result, Ok(42));
}
#[test]
fn test_recv_after_timeout() {
let result = channel_with_timeout(200, 20);
assert_eq!(result, Err("timeout"));
}
#[test]
fn test_with_timeout_succeeds() {
let result = with_timeout(Duration::from_millis(500), || {
thread::sleep(Duration::from_millis(5));
99i32
});
assert_eq!(result, Some(99));
}
#[test]
fn test_with_timeout_expires() {
let result = with_timeout(Duration::from_millis(5), || {
thread::sleep(Duration::from_millis(100));
99i32
});
assert_eq!(result, None);
}
#[test]
fn test_race_fastest_wins() {
let tasks: Vec<Box<dyn FnOnce() -> u32 + Send + 'static>> = vec![
Box::new(|| {
thread::sleep(Duration::from_millis(50));
1
}),
Box::new(|| {
thread::sleep(Duration::from_millis(5));
2
}),
Box::new(|| {
thread::sleep(Duration::from_millis(30));
3
}),
];
let winner = race(tasks, Duration::from_millis(200));
assert_eq!(winner, Some(2)); // fastest thread wins
}
#[test]
fn test_recv_timeout_error_types() {
let (_, rx) = mpsc::channel::<i32>();
// Disconnected immediately (no sender)
let err = rx.recv_timeout(Duration::from_millis(1));
assert!(err.is_err());
}
}Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Timeout primitive | recv_timeout(Duration) | Lwt.pick + Lwt_unix.sleep |
| Cancellation | Thread leaks (manual AtomicBool needed) | Lwt.pick cancels losers automatically |
| Non-blocking sleep | thread::sleep blocks the OS thread | Lwt_unix.sleep is a non-blocking yield |
| Race result | First to send wins; rest silently fail | First resolved; rest are cancelled |
thread::sleep blocks the OS thread — spawning many concurrent timeouts wastes one thread each. Use tokio::time::timeout for non-blocking async timeouts in production.
OCaml Approach
open Lwt
(* Lwt.pick cancels all losers; Lwt.choose does not *)
let with_timeout duration f =
Lwt.pick [
(let* () = Lwt_unix.sleep duration in Lwt.return None);
(let* v = f () in Lwt.return (Some v));
]
let race tasks timeout =
Lwt.pick (
(let* () = Lwt_unix.sleep timeout in Lwt.return None) ::
List.map (fun f ->
let* v = f () in Lwt.return (Some v)
) tasks
)
Lwt.pick runs all promises concurrently and returns the first result, cancelling all others. Lwt_unix.sleep integrates with the Lwt scheduler for non-blocking sleep — unlike Thread.sleep which blocks the OS thread.
Full Source
#![allow(clippy::all)]
// 996: Timeout Pattern
// Rust: mpsc::recv_timeout — like OCaml's Lwt.pick with sleep
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// --- Approach 1: recv_timeout on a channel ---
fn channel_with_timeout(delay_ms: u64, timeout_ms: u64) -> Result<i32, &'static str> {
let (tx, rx) = mpsc::channel::<i32>();
thread::spawn(move || {
thread::sleep(Duration::from_millis(delay_ms));
tx.send(42).ok(); // may fail if receiver timed out and was dropped
});
match rx.recv_timeout(Duration::from_millis(timeout_ms)) {
Ok(v) => Ok(v),
Err(mpsc::RecvTimeoutError::Timeout) => Err("timeout"),
Err(mpsc::RecvTimeoutError::Disconnected) => Err("disconnected"),
}
}
// --- Approach 2: Run any function with a timeout via thread ---
fn with_timeout<T, F>(timeout: Duration, f: F) -> Option<T>
where
T: Send + 'static,
F: FnOnce() -> T + Send + 'static,
{
let (tx, rx) = mpsc::channel::<T>();
thread::spawn(move || {
let result = f();
tx.send(result).ok();
});
rx.recv_timeout(timeout).ok()
}
// --- Approach 3: First-of-N wins (Lwt.pick analogue) ---
fn race<T: Send + 'static>(
tasks: Vec<Box<dyn FnOnce() -> T + Send + 'static>>,
timeout: Duration,
) -> Option<T> {
let (tx, rx) = mpsc::channel::<T>();
for task in tasks {
let tx = tx.clone();
thread::spawn(move || {
let result = task();
tx.send(result).ok(); // first to arrive wins
});
}
drop(tx); // close original sender
rx.recv_timeout(timeout).ok()
}
// --- Approach 4: Retry with overall deadline ---
fn retry_with_deadline<T, E, F>(
max_attempts: usize,
timeout_per_attempt: Duration,
f: F,
) -> Result<T, &'static str>
where
T: Send + 'static,
E: Send + 'static,
F: Fn() -> Result<T, E> + Send + Sync + Clone + 'static,
{
for attempt in 0..max_attempts {
let f = f.clone();
let result = with_timeout(timeout_per_attempt, f);
match result {
Some(Ok(v)) => return Ok(v),
Some(Err(_)) | None => {
if attempt + 1 < max_attempts {
thread::sleep(Duration::from_millis(1 << attempt));
}
}
}
}
Err("max attempts exceeded")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_recv_before_timeout() {
let result = channel_with_timeout(10, 500);
assert_eq!(result, Ok(42));
}
#[test]
fn test_recv_after_timeout() {
let result = channel_with_timeout(200, 20);
assert_eq!(result, Err("timeout"));
}
#[test]
fn test_with_timeout_succeeds() {
let result = with_timeout(Duration::from_millis(500), || {
thread::sleep(Duration::from_millis(5));
99i32
});
assert_eq!(result, Some(99));
}
#[test]
fn test_with_timeout_expires() {
let result = with_timeout(Duration::from_millis(5), || {
thread::sleep(Duration::from_millis(100));
99i32
});
assert_eq!(result, None);
}
#[test]
fn test_race_fastest_wins() {
let tasks: Vec<Box<dyn FnOnce() -> u32 + Send + 'static>> = vec![
Box::new(|| {
thread::sleep(Duration::from_millis(50));
1
}),
Box::new(|| {
thread::sleep(Duration::from_millis(5));
2
}),
Box::new(|| {
thread::sleep(Duration::from_millis(30));
3
}),
];
let winner = race(tasks, Duration::from_millis(200));
assert_eq!(winner, Some(2)); // fastest thread wins
}
#[test]
fn test_recv_timeout_error_types() {
let (_, rx) = mpsc::channel::<i32>();
// Disconnected immediately (no sender)
let err = rx.recv_timeout(Duration::from_millis(1));
assert!(err.is_err());
}
}#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_recv_before_timeout() {
let result = channel_with_timeout(10, 500);
assert_eq!(result, Ok(42));
}
#[test]
fn test_recv_after_timeout() {
let result = channel_with_timeout(200, 20);
assert_eq!(result, Err("timeout"));
}
#[test]
fn test_with_timeout_succeeds() {
let result = with_timeout(Duration::from_millis(500), || {
thread::sleep(Duration::from_millis(5));
99i32
});
assert_eq!(result, Some(99));
}
#[test]
fn test_with_timeout_expires() {
let result = with_timeout(Duration::from_millis(5), || {
thread::sleep(Duration::from_millis(100));
99i32
});
assert_eq!(result, None);
}
#[test]
fn test_race_fastest_wins() {
let tasks: Vec<Box<dyn FnOnce() -> u32 + Send + 'static>> = vec![
Box::new(|| {
thread::sleep(Duration::from_millis(50));
1
}),
Box::new(|| {
thread::sleep(Duration::from_millis(5));
2
}),
Box::new(|| {
thread::sleep(Duration::from_millis(30));
3
}),
];
let winner = race(tasks, Duration::from_millis(200));
assert_eq!(winner, Some(2)); // fastest thread wins
}
#[test]
fn test_recv_timeout_error_types() {
let (_, rx) = mpsc::channel::<i32>();
// Disconnected immediately (no sender)
let err = rx.recv_timeout(Duration::from_millis(1));
assert!(err.is_err());
}
}
Deep Comparison
Timeout Pattern — Comparison
Core Insight
Timeouts express "I'd rather fail fast than wait forever." In OCaml's Lwt: Lwt.pick [op; sleep] races two promises and takes the first. In Rust std: recv_timeout is the primitive, or wrap in a thread for arbitrary operations.
OCaml Approach
Lwt.pick [operation; Lwt_unix.sleep timeout] — cancels the loserCondition.wait with deadlineUnix.gettimeofday for wall-clock deadline trackingRust Approach
rx.recv_timeout(Duration) → Result<T, RecvTimeoutError>RecvTimeoutError::Timeout vs RecvTimeoutError::Disconnectedwith_timeout(dur, f) pattern: spawn thread, recv_timeout, discard handlerace(tasks, timeout) for "first-of-N" / Lwt.pick over multiple computationsComparison Table
| Concept | OCaml (Lwt) | Rust |
|---|---|---|
| Timeout primitive | Lwt_unix.sleep t | rx.recv_timeout(Duration::from_millis(t)) |
| Race two futures | Lwt.pick [f; sleep t] | race([task], timeout) |
| Timeout result | exception or None | Err(RecvTimeoutError::Timeout) |
| Cancellation | Lwt cancels the losing promise | Thread keeps running (can't kill) |
| Timed channel recv | Manual Condition.wait with deadline | rx.recv_timeout(dur) |
| Wrap arbitrary work | Lwt.wrap (fun () -> ...) | with_timeout(dur, || f()) |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
with_cancel<T, F>(cancel: Arc<AtomicBool>, f: F) -> Option<T> — f checks cancel periodically and stops early.retry_with_timeout(n, timeout, f) — retry up to n times, each attempt capped at timeout.first_k_of_n<T>(tasks, k, timeout) -> Vec<T> — collect the first k results.with_timeout using tokio::time::timeout and compare implementation complexity.with_timeout calls using threads vs using tokio::spawn + tokio::time::timeout.