996 Advanced

996 Timeout Pattern

Functional Programming

Tutorial

The Problem

Implement timeout patterns in Rust using mpsc::Receiver::recv_timeout. Run an operation in a background thread and wait for its result with a deadline; return an error if the deadline expires. Also implement a "race" pattern where N tasks run concurrently and the first result wins, a thread-based analogue of OCaml's Lwt.pick.

🎯 Learning Outcomes

  • Use rx.recv_timeout(Duration) to wait for a channel message with a deadline
  • Implement with_timeout<T, F>(timeout, f) -> Option<T>: run f in a thread, return None on timeout
  • Implement race<T>(tasks, timeout) -> Option<T>: spawn N tasks sharing one channel, return the first result
  • Handle RecvTimeoutError::Timeout vs RecvTimeoutError::Disconnected in match arms
  • Understand why leftover threads keep running after a timeout: std threads cannot be killed, so graceful cancellation needs cooperation, for example a shared AtomicBool flag

Code Example

    #![allow(clippy::all)]
    // 996: Timeout Pattern
    // Rust: mpsc::recv_timeout — like OCaml's Lwt.pick with sleep
    
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    // --- Approach 1: recv_timeout on a channel ---
    fn channel_with_timeout(delay_ms: u64, timeout_ms: u64) -> Result<i32, &'static str> {
        let (tx, rx) = mpsc::channel::<i32>();
    
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay_ms));
            tx.send(42).ok(); // may fail if receiver timed out and was dropped
        });
    
        match rx.recv_timeout(Duration::from_millis(timeout_ms)) {
            Ok(v) => Ok(v),
            Err(mpsc::RecvTimeoutError::Timeout) => Err("timeout"),
            Err(mpsc::RecvTimeoutError::Disconnected) => Err("disconnected"),
        }
    }
    
    // --- Approach 2: Run any function with a timeout via thread ---
    fn with_timeout<T, F>(timeout: Duration, f: F) -> Option<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = mpsc::channel::<T>();
        thread::spawn(move || {
            let result = f();
            tx.send(result).ok();
        });
        rx.recv_timeout(timeout).ok()
    }
    
    // --- Approach 3: First-of-N wins (Lwt.pick analogue) ---
    fn race<T: Send + 'static>(
        tasks: Vec<Box<dyn FnOnce() -> T + Send + 'static>>,
        timeout: Duration,
    ) -> Option<T> {
        let (tx, rx) = mpsc::channel::<T>();
    
        for task in tasks {
            let tx = tx.clone();
            thread::spawn(move || {
                let result = task();
                tx.send(result).ok(); // first to arrive wins
            });
        }
        drop(tx); // close original sender
    
        rx.recv_timeout(timeout).ok()
    }
    
    // --- Approach 4: Retry with a per-attempt timeout and exponential backoff ---
    fn retry_with_deadline<T, E, F>(
        max_attempts: usize,
        timeout_per_attempt: Duration,
        f: F,
    ) -> Result<T, &'static str>
    where
        T: Send + 'static,
        E: Send + 'static,
        F: Fn() -> Result<T, E> + Send + Sync + Clone + 'static,
    {
        for attempt in 0..max_attempts {
            let f = f.clone();
            let result = with_timeout(timeout_per_attempt, f);
            match result {
                Some(Ok(v)) => return Ok(v),
                Some(Err(_)) | None => {
                    if attempt + 1 < max_attempts {
                        thread::sleep(Duration::from_millis(1 << attempt));
                    }
                }
            }
        }
        Err("max attempts exceeded")
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_recv_before_timeout() {
            let result = channel_with_timeout(10, 500);
            assert_eq!(result, Ok(42));
        }
    
        #[test]
        fn test_recv_after_timeout() {
            let result = channel_with_timeout(200, 20);
            assert_eq!(result, Err("timeout"));
        }
    
        #[test]
        fn test_with_timeout_succeeds() {
            let result = with_timeout(Duration::from_millis(500), || {
                thread::sleep(Duration::from_millis(5));
                99i32
            });
            assert_eq!(result, Some(99));
        }
    
        #[test]
        fn test_with_timeout_expires() {
            let result = with_timeout(Duration::from_millis(5), || {
                thread::sleep(Duration::from_millis(100));
                99i32
            });
            assert_eq!(result, None);
        }
    
        #[test]
        fn test_race_fastest_wins() {
            let tasks: Vec<Box<dyn FnOnce() -> u32 + Send + 'static>> = vec![
                Box::new(|| {
                    thread::sleep(Duration::from_millis(50));
                    1
                }),
                Box::new(|| {
                    thread::sleep(Duration::from_millis(5));
                    2
                }),
                Box::new(|| {
                    thread::sleep(Duration::from_millis(30));
                    3
                }),
            ];
            let winner = race(tasks, Duration::from_millis(200));
            assert_eq!(winner, Some(2)); // fastest thread wins
        }
    
        #[test]
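        fn test_recv_timeout_error_types() {
            // `_` does not bind the sender, so it is dropped right away.
            let (_, rx) = mpsc::channel::<i32>();
            // With no sender left, recv_timeout reports Disconnected, not Timeout.
            let err = rx.recv_timeout(Duration::from_millis(1));
            assert_eq!(err, Err(mpsc::RecvTimeoutError::Disconnected));
        }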
    }
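
Approach 4 above caps each attempt separately, so the total wait can grow to roughly max_attempts times the per-attempt timeout, plus backoff. The following is a minimal sketch of a retry loop bounded by one overall deadline instead. The name retry_until_deadline and its signature are assumptions made for illustration and are not part of the original source; it also omits backoff for brevity.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: retry `f` until it succeeds, the attempts run out,
/// or the overall `deadline` has passed. Each attempt only gets the time
/// that is still left, so the total wait stays close to `deadline`.
fn retry_until_deadline<T, E, F>(max_attempts: usize, deadline: Duration, f: F) -> Option<T>
where
    T: Send + 'static,
    E: Send + 'static,
    F: Fn() -> Result<T, E> + Send + Clone + 'static,
{
    let start = Instant::now();
    for _ in 0..max_attempts {
        // Time left before the overall deadline; give up once it is used up.
        let remaining = deadline.checked_sub(start.elapsed())?;
        let (tx, rx) = mpsc::channel();
        let attempt = f.clone();
        thread::spawn(move || {
            // The send fails harmlessly if the receiver is already gone.
            tx.send(attempt()).ok();
        });
        if let Ok(Ok(value)) = rx.recv_timeout(remaining) {
            return Some(value);
        }
        // Otherwise the attempt failed or timed out: try again.
    }
    None
}
```

As with with_timeout, an attempt that times out keeps running in its own thread; only the caller stops waiting for it.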

    Key Differences

    | Aspect | Rust | OCaml |
    |---|---|---|
    | Timeout primitive | recv_timeout(Duration) | Lwt.pick + Lwt_unix.sleep |
    | Cancellation | Thread leaks (manual AtomicBool needed) | Lwt.pick cancels losers automatically |
    | Non-blocking sleep | thread::sleep blocks the OS thread | Lwt_unix.sleep is a non-blocking yield |
    | Race result | First to send wins; rest silently fail | First resolved; rest are cancelled |

    thread::sleep blocks the OS thread — spawning many concurrent timeouts wastes one thread each. Use tokio::time::timeout for non-blocking async timeouts in production.
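
The "manual AtomicBool" row in the table can be sketched as follows. This is one possible shape under stated assumptions, not the original author's code: with_cancellable_timeout and count_slowly are hypothetical names, and the worker is assumed to poll the flag between small units of work. Nothing forces a worker to stop; a closure that never checks the flag runs to completion just as before.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical variant of `with_timeout`: the closure receives a shared
/// cancel flag and is expected to poll it between units of work.
fn with_cancellable_timeout<T, F>(timeout: Duration, f: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce(Arc<AtomicBool>) -> Option<T> + Send + 'static,
{
    let cancel = Arc::new(AtomicBool::new(false));
    let worker_flag = Arc::clone(&cancel);
    let (tx, rx) = mpsc::channel::<Option<T>>();

    thread::spawn(move || {
        // Ignore the send error: the caller may have stopped listening.
        tx.send(f(worker_flag)).ok();
    });

    let result = rx.recv_timeout(timeout).ok().flatten();
    if result.is_none() {
        // Ask the worker to stop. Setting the flag is harmless if the
        // worker has already finished on its own.
        cancel.store(true, Ordering::Relaxed);
    }
    result
}

/// Example worker: sleeps in small steps and gives up once cancelled.
fn count_slowly(steps: u32, cancel: Arc<AtomicBool>) -> Option<u32> {
    for _ in 0..steps {
        if cancel.load(Ordering::Relaxed) {
            return None; // stopped early at the caller's request
        }
        thread::sleep(Duration::from_millis(5));
    }
    Some(steps)
}
```

A call such as with_cancellable_timeout(Duration::from_millis(20), |c| count_slowly(1000, c)) returns None after about 20 ms, and the worker exits at its next flag check instead of sleeping through all 1000 steps.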

    OCaml Approach

    open Lwt.Syntax (* brings the let* binding operator into scope *)
    
    (* Lwt.pick cancels all losers; Lwt.choose does not *)
    let with_timeout duration f =
      Lwt.pick [
        (let* () = Lwt_unix.sleep duration in Lwt.return None);
        (let* v = f () in Lwt.return (Some v));
      ]
    
    let race tasks timeout =
      Lwt.pick (
        (let* () = Lwt_unix.sleep timeout in Lwt.return None) ::
        List.map (fun f ->
          let* v = f () in Lwt.return (Some v)
        ) tasks
      )
    

    Lwt.pick runs all promises concurrently and returns the first result, cancelling all others. Lwt_unix.sleep integrates with the Lwt scheduler for non-blocking sleep, unlike Unix.sleepf or Thread.delay, which block the calling OS thread.


    Deep Comparison

    Timeout Pattern — Comparison

    Core Insight

    Timeouts express "I'd rather fail fast than wait forever." In OCaml's Lwt, Lwt.pick [op; sleep] races two promises and takes the first. In Rust's standard library, Receiver::recv_timeout is the primitive; to time out arbitrary work, run it in a thread that sends its result over a channel.
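
The with_timeout helper in the code example collapses both recv_timeout errors into None, so a caller cannot tell a slow worker from one that died. A minimal sketch that keeps the two cases apart is shown here; TimedError and try_with_timeout are hypothetical names introduced for illustration only.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Why a timed call produced no value (hypothetical error type).
#[derive(Debug, PartialEq)]
enum TimedError {
    /// The deadline passed while the worker was still running.
    TimedOut,
    /// The worker dropped its sender without sending, e.g. because it panicked.
    WorkerDied,
}

/// Like `with_timeout`, but reports which of the two failures happened.
fn try_with_timeout<T, F>(timeout: Duration, f: F) -> Result<T, TimedError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<T>();
    thread::spawn(move || {
        // If `f` panics, `tx` is dropped during unwinding and the
        // receiver observes a disconnect instead of a value.
        tx.send(f()).ok();
    });
    rx.recv_timeout(timeout).map_err(|e| match e {
        RecvTimeoutError::Timeout => TimedError::TimedOut,
        RecvTimeoutError::Disconnected => TimedError::WorkerDied,
    })
}
```

The WorkerDied case relies on the default unwinding panic strategy; with panic = "abort" a panicking worker would terminate the whole process instead.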

    OCaml Approach

  • Lwt.pick [operation; Lwt_unix.sleep timeout]: cancels the loser
  • Thread-based: spawn worker, timed Condition.wait with deadline
  • OCaml cannot kill threads, so the worker keeps running after "timeout"
  • Unix.gettimeofday for wall-clock deadline tracking

    Rust Approach

  • rx.recv_timeout(Duration) -> Result<T, RecvTimeoutError>
  • RecvTimeoutError::Timeout vs RecvTimeoutError::Disconnected
  • with_timeout(dur, f) pattern: spawn thread, recv_timeout, discard handle
  • The "lost" thread keeps running after a timeout; once the receiver is dropped its send returns an error that the code ignores, so the channel needs no cleanup, although the thread still consumes resources until it finishes
  • race(tasks, timeout) for "first-of-N" / Lwt.pick over multiple computations
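
To make the "lost thread" point concrete, the sketch below observes what happens to the worker's send after the caller has given up. The function late_send_outcome and the second reporting channel are illustrative assumptions and not part of the original example.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs a worker against a timeout and returns
/// (what the caller received, whether the worker's send succeeded).
/// Illustrative helper, not part of the original example.
fn late_send_outcome(work_ms: u64, timeout_ms: u64) -> (Option<i32>, bool) {
    let (tx, rx) = mpsc::channel::<i32>();
    // Second channel, used only to observe the worker's send result.
    let (report_tx, report_rx) = mpsc::channel::<bool>();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(work_ms));
        // `send` returns Err once the receiver has been dropped.
        let delivered = tx.send(42).is_ok();
        report_tx.send(delivered).ok();
    });

    let seen = rx.recv_timeout(Duration::from_millis(timeout_ms)).ok();
    drop(rx); // the caller gives up; the receiver is gone from here on

    // Wait for the worker to finish so its send result can be inspected.
    let delivered = report_rx.recv().unwrap_or(false);
    (seen, delivered)
}
```

With a 200 ms worker and a 10 ms timeout the caller sees None and the worker's send fails; with a 10 ms worker and a 500 ms timeout the caller sees Some(42) and the send succeeds. In both cases the worker runs to completion, which is why the original code calls .ok() on the send result.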

    Comparison Table

    | Concept | OCaml (Lwt) | Rust |
    |---|---|---|
    | Timeout primitive | Lwt_unix.sleep t | rx.recv_timeout(Duration::from_millis(t)) |
    | Race two futures | Lwt.pick [f; sleep t] | race([task], timeout) |
    | Timeout result | exception or None | Err(RecvTimeoutError::Timeout) |
    | Cancellation | Lwt cancels the losing promise | Thread keeps running (can't kill) |
    | Timed channel recv | Manual Condition.wait with deadline | rx.recv_timeout(dur) |
    | Wrap arbitrary work | Lwt.wrap (fun () -> ...) | with_timeout(dur, \|\| f()) |

    std vs tokio

    | Aspect | std version | tokio version |
    |---|---|---|
    | Runtime | OS threads via std::thread | Async tasks on tokio runtime |
    | Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
    | Channels | std::sync::mpsc (channel is unbounded; sync_channel is bounded) | tokio::sync::mpsc (channel is bounded and async; unbounded_channel also exists) |
    | Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
    | Overhead | One OS thread per task | Many tasks per thread (M:N) |
    | Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |

    Exercises

  • Implement with_cancel<T, F>(cancel: Arc<AtomicBool>, f: F) -> Option<T>, where f checks cancel periodically and stops early.
  • Implement retry_with_timeout(n, timeout, f) — retry up to n times, each attempt capped at timeout.
  • Implement first_k_of_n<T>(tasks, k, timeout) -> Vec<T> — collect the first k results.
  • Rewrite with_timeout using tokio::time::timeout and compare implementation complexity.
  • Benchmark: 100 concurrent with_timeout calls using threads vs using tokio::spawn + tokio::time::timeout.