ExamplesBy LevelBy TopicLearning Paths
325 Advanced

325: Racing Futures with select!

Functional Programming

Tutorial Video

Text description (accessibility)

This video demonstrates the "325: Racing Futures with select!" functional Rust example. Difficulty level: Advanced. Key concepts covered: Functional Programming. Sometimes you want the first result from multiple concurrent operations — a timeout competing with an operation, querying multiple replicas and using the fastest response, or cancelling work when a stop signal arrives. Key difference from OCaml: 1. **Cancellation**: Rust's `select!` drops (cancels) unfinished futures when one completes; Lwt's `pick` actively cancels losers.

Tutorial

The Problem

Sometimes you want the first result from multiple concurrent operations — a timeout competing with an operation, querying multiple replicas and using the fastest response, or cancelling work when a stop signal arrives. The select! macro (in tokio or futures) polls multiple futures and returns when the first one completes, cancelling the others. This is the fundamental tool for implementing timeouts, fallbacks, and cancellation in async code.

🎯 Learning Outcomes

  • • Understand select! as polling multiple futures and returning on first completion
  • • Distinguish select! (first wins) from join! (all must complete)
  • • Implement racing with timeouts as a common select! pattern
  • • Recognize that unfinished futures in select! are dropped (cancelled)
  • Code Example

    fn race<T: Send + 'static>(
        tasks: Vec<(Box<dyn FnOnce()->T+Send>, &'static str)>
    ) -> (&'static str, T) {
        let (tx, rx) = mpsc::channel();
        for (f, label) in tasks {
            let tx = tx.clone();
            thread::spawn(move || { let _ = tx.send((label, f())); });
        }
        rx.recv().unwrap()
    }

    Key Differences

  • Cancellation: Rust's select! drops (cancels) unfinished futures when one completes; Lwt's pick actively cancels losers.
  • Macro syntax: tokio::select! uses Rust macro syntax with pattern = future => body arms; Lwt's pick is a regular function.
  • Non-determinism: When multiple futures complete simultaneously, select! chooses one biased toward the first arm by default; tokio::select! { biased; } makes this explicit.
  • Timeout pattern: tokio::time::timeout(dur, future) is a specialized select! for adding a deadline to any future.
  • OCaml Approach

    OCaml's Lwt.pick takes a list of promises and returns the first to resolve, cancelling the others:

    (* Lwt.pick: first to resolve wins, others are cancelled *)
    let* result = Lwt.pick [
      Lwt.map (fun v -> `Result v) (fetch ());
      Lwt.map (fun () -> `Timeout) (Lwt_unix.sleep timeout_secs);
    ]
    

    Full Source

    #![allow(clippy::all)]
    //! # Racing Futures with select!
    //!
    //! Demonstrates racing multiple tasks where the first one to complete wins
    //! and others are discarded. Includes timeout patterns.
    
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    /// Race multiple labeled tasks. Returns the label and result of the first to complete.
    pub fn race<T>(tasks: Vec<(&'static str, Box<dyn FnOnce() -> T + Send>)>) -> (&'static str, T)
    where
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
    
        for (label, f) in tasks {
            let tx = tx.clone();
            thread::spawn(move || {
                let _ = tx.send((label, f()));
            });
        }
    
        rx.recv().expect("all senders dropped")
    }
    
    /// Race tasks without labels.
    pub fn race_anonymous<T>(tasks: Vec<Box<dyn FnOnce() -> T + Send>>) -> T
    where
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
    
        for f in tasks {
            let tx = tx.clone();
            thread::spawn(move || {
                let _ = tx.send(f());
            });
        }
    
        rx.recv().expect("all senders dropped")
    }
    
    /// Run a task with a timeout. Returns None if the timeout fires first.
    pub fn with_timeout<T>(f: Box<dyn FnOnce() -> T + Send>, timeout_ms: u64) -> Option<T>
    where
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let _ = tx.send(f());
        });
    
        rx.recv_timeout(Duration::from_millis(timeout_ms)).ok()
    }
    
    /// Run a task with a timeout, returning a Result with a descriptive error.
    pub fn with_timeout_result<T>(
        f: Box<dyn FnOnce() -> T + Send>,
        timeout_ms: u64,
    ) -> Result<T, TimeoutError>
    where
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let _ = tx.send(f());
        });
    
        rx.recv_timeout(Duration::from_millis(timeout_ms))
            .map_err(|_| TimeoutError { timeout_ms })
    }
    
    /// Error type for timeout operations.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct TimeoutError {
        pub timeout_ms: u64,
    }
    
    impl std::fmt::Display for TimeoutError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "operation timed out after {}ms", self.timeout_ms)
        }
    }
    
    impl std::error::Error for TimeoutError {}
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_race_fastest_wins() {
            let tasks: Vec<(&'static str, Box<dyn FnOnce() -> i32 + Send>)> = vec![
                (
                    "slow",
                    Box::new(|| {
                        thread::sleep(Duration::from_millis(100));
                        1
                    }),
                ),
                (
                    "fast",
                    Box::new(|| {
                        thread::sleep(Duration::from_millis(10));
                        2
                    }),
                ),
            ];
            let (label, value) = race(tasks);
            assert_eq!(label, "fast");
            assert_eq!(value, 2);
        }
    
        #[test]
        fn test_race_anonymous_returns_first() {
            let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> = vec![
                Box::new(|| {
                    thread::sleep(Duration::from_millis(50));
                    100
                }),
                Box::new(|| {
                    thread::sleep(Duration::from_millis(5));
                    42
                }),
            ];
            let result = race_anonymous(tasks);
            assert_eq!(result, 42);
        }
    
        #[test]
        fn test_with_timeout_succeeds() {
            let result = with_timeout(
                Box::new(|| {
                    thread::sleep(Duration::from_millis(5));
                    99
                }),
                200,
            );
            assert_eq!(result, Some(99));
        }
    
        #[test]
        fn test_with_timeout_fails() {
            let result = with_timeout(
                Box::new(|| {
                    thread::sleep(Duration::from_millis(200));
                    0
                }),
                50,
            );
            assert_eq!(result, None);
        }
    
        #[test]
        fn test_with_timeout_result_error() {
            let result = with_timeout_result(
                Box::new(|| {
                    thread::sleep(Duration::from_millis(200));
                    0
                }),
                50,
            );
            assert!(result.is_err());
            assert_eq!(result.unwrap_err().timeout_ms, 50);
        }
    
        #[test]
        fn test_timeout_error_display() {
            let err = TimeoutError { timeout_ms: 100 };
            assert_eq!(err.to_string(), "operation timed out after 100ms");
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_race_fastest_wins() {
            let tasks: Vec<(&'static str, Box<dyn FnOnce() -> i32 + Send>)> = vec![
                (
                    "slow",
                    Box::new(|| {
                        thread::sleep(Duration::from_millis(100));
                        1
                    }),
                ),
                (
                    "fast",
                    Box::new(|| {
                        thread::sleep(Duration::from_millis(10));
                        2
                    }),
                ),
            ];
            let (label, value) = race(tasks);
            assert_eq!(label, "fast");
            assert_eq!(value, 2);
        }
    
        #[test]
        fn test_race_anonymous_returns_first() {
            let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> = vec![
                Box::new(|| {
                    thread::sleep(Duration::from_millis(50));
                    100
                }),
                Box::new(|| {
                    thread::sleep(Duration::from_millis(5));
                    42
                }),
            ];
            let result = race_anonymous(tasks);
            assert_eq!(result, 42);
        }
    
        #[test]
        fn test_with_timeout_succeeds() {
            let result = with_timeout(
                Box::new(|| {
                    thread::sleep(Duration::from_millis(5));
                    99
                }),
                200,
            );
            assert_eq!(result, Some(99));
        }
    
        #[test]
        fn test_with_timeout_fails() {
            let result = with_timeout(
                Box::new(|| {
                    thread::sleep(Duration::from_millis(200));
                    0
                }),
                50,
            );
            assert_eq!(result, None);
        }
    
        #[test]
        fn test_with_timeout_result_error() {
            let result = with_timeout_result(
                Box::new(|| {
                    thread::sleep(Duration::from_millis(200));
                    0
                }),
                50,
            );
            assert!(result.is_err());
            assert_eq!(result.unwrap_err().timeout_ms, 50);
        }
    
        #[test]
        fn test_timeout_error_display() {
            let err = TimeoutError { timeout_ms: 100 };
            assert_eq!(err.to_string(), "operation timed out after 100ms");
        }
    }

    Deep Comparison

    OCaml vs Rust: Select/Race Futures

    Racing Tasks

    OCaml:

    let race tasks =
      let ch = Event.new_channel () in
      List.iter (fun f ->
        ignore (Thread.create (fun () -> Event.sync (Event.send ch (f ()))) ())
      ) tasks;
      Event.sync (Event.receive ch)
    

    Rust:

    fn race<T: Send + 'static>(
        tasks: Vec<(Box<dyn FnOnce()->T+Send>, &'static str)>
    ) -> (&'static str, T) {
        let (tx, rx) = mpsc::channel();
        for (f, label) in tasks {
            let tx = tx.clone();
            thread::spawn(move || { let _ = tx.send((label, f())); });
        }
        rx.recv().unwrap()
    }
    

    Timeout Pattern

    OCaml (with Lwt):

    Lwt_unix.with_timeout 5.0 (fun () -> slow_operation ())
    

    Rust:

    fn with_timeout<T: Send + 'static>(f: Box<dyn FnOnce()->T+Send>, ms: u64) -> Option<T> {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || { let _ = tx.send(f()); });
        rx.recv_timeout(Duration::from_millis(ms)).ok()
    }
    

    Key Differences

    AspectOCamlRust
    Channel typeEvent.channelmpsc::channel
    Receive firstEvent.sync (Event.receive ch)rx.recv()
    TimeoutLwt_unix.with_timeoutrecv_timeout
    Loser cleanupGCThreads continue (can be ignored)

    Exercises

  • Implement a with_timeout<T>(f: impl FnOnce() -> T, timeout: Duration) -> Option<T> that returns None if the operation takes too long.
  • Race two "replicas" returning the same type and use the first result, ensuring both tasks are started before waiting.
  • Implement a cancellation-aware worker: the worker computes a value, but can be interrupted by a cancellation signal arriving first.
  • Open Source Repos