ExamplesBy LevelBy TopicLearning Paths
331 Advanced

331: Timeouts with time::timeout

Functional Programming

Tutorial Video

Text description (accessibility)

This video demonstrates the "331: Timeouts with time::timeout" functional Rust example. Difficulty level: Advanced. Key concepts covered: Functional Programming. Network operations that hang indefinitely freeze applications. Key difference from OCaml: 1. **Structured timeout error**: `TimeoutError<E>` preserves the operation's error type; raw timeout functions often just use string errors.

Tutorial

The Problem

Network operations that hang indefinitely freeze applications. A DNS lookup, database query, or HTTP request that never responds must be bounded by a deadline. Timeouts are the fundamental resilience mechanism for distributed systems — every external call should have one. The TimeoutError<E> pattern distinguishes operation failures (the operation ran but failed) from timeout failures (the deadline expired), enabling different recovery strategies for each.

🎯 Learning Outcomes

  • • Implement a TimeoutError<E> type with Elapsed and TaskFailed(E) variants
  • • Use mpsc::channel with a recv deadline to implement synchronous timeouts
  • • Distinguish timeout (deadline expired) from task failure (operation failed with error)
  • • Recognize the tokio::time::timeout(dur, future) pattern for async timeouts
  • Code Example

    fn with_timeout<T>(timeout: Duration, f: impl FnOnce() -> Result<T, E>) -> Result<T, TimeoutError<E>> {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || { let _ = tx.send(f()); });
        rx.recv_timeout(timeout)
    }

    Key Differences

  • Structured timeout error: TimeoutError<E> preserves the operation's error type; raw timeout functions often just use string errors.
  • Cancellation: Rust's thread-based timeout doesn't cancel the spawned thread (it continues); tokio::time::timeout genuinely cancels the future.
  • Cascading timeouts: In distributed systems, outer timeouts should be smaller than the sum of inner ones — a common design error.
  • Production: Every reqwest, sqlx, and tokio operation should have a timeout — ungated external calls are a reliability risk.
  • OCaml Approach

    OCaml's Lwt_unix.with_timeout provides timeout functionality. In Async, Clock.with_timeout serves the same purpose:

    let* result =
      Lwt_unix.with_timeout 5.0 (fun () -> perform_operation ())
    (* Returns Error `Timeout on expiry, propagates other errors *)
    

    Full Source

    #![allow(clippy::all)]
    //! # Timeouts with time::timeout
    //!
    //! Wrap any async operation with a deadline — if it doesn't complete in time,
    //! get a structured error instead of waiting forever.
    
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    /// Error type distinguishing timeout from operation failure.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub enum TimeoutError<E> {
        /// Operation took too long
        Elapsed,
        /// Operation ran but returned an error
        TaskFailed(E),
    }
    
    impl<E: std::fmt::Display> std::fmt::Display for TimeoutError<E> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                Self::Elapsed => write!(f, "operation timed out"),
                Self::TaskFailed(e) => write!(f, "task failed: {}", e),
            }
        }
    }
    
    impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for TimeoutError<E> {}
    
    /// Run a function with a timeout. Returns Err(TimeoutError::Elapsed) if it exceeds the deadline.
    pub fn with_timeout<T, E>(
        timeout: Duration,
        f: impl FnOnce() -> Result<T, E> + Send + 'static,
    ) -> Result<T, TimeoutError<E>>
    where
        T: Send + 'static,
        E: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let _ = tx.send(f());
        });
    
        match rx.recv_timeout(timeout) {
            Ok(Ok(value)) => Ok(value),
            Ok(Err(e)) => Err(TimeoutError::TaskFailed(e)),
            Err(mpsc::RecvTimeoutError::Timeout) => Err(TimeoutError::Elapsed),
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                Err(TimeoutError::TaskFailed(panic!("thread disconnected")))
            }
        }
    }
    
    /// Simplified timeout for operations that can't fail (except by timeout).
    pub fn with_timeout_simple<T>(
        timeout: Duration,
        f: impl FnOnce() -> T + Send + 'static,
    ) -> Option<T>
    where
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let _ = tx.send(f());
        });
        rx.recv_timeout(timeout).ok()
    }
    
    /// Run multiple operations with individual timeouts, returning the first success.
    pub fn first_success_with_timeout<T, E>(
        timeout_each: Duration,
        operations: Vec<Box<dyn FnOnce() -> Result<T, E> + Send>>,
    ) -> Result<T, TimeoutError<E>>
    where
        T: Send + 'static,
        E: Send + 'static + Clone,
    {
        let mut last_error = None;
    
        for op in operations {
            match with_timeout(timeout_each, op) {
                Ok(v) => return Ok(v),
                Err(e) => last_error = Some(e),
            }
        }
    
        Err(last_error.unwrap_or(TimeoutError::Elapsed))
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        fn slow_operation(delay_ms: u64, value: i32) -> Result<i32, String> {
            thread::sleep(Duration::from_millis(delay_ms));
            Ok(value)
        }
    
        #[test]
        fn test_completes_before_timeout() {
            let result = with_timeout(Duration::from_millis(200), || slow_operation(10, 42));
            assert_eq!(result.unwrap(), 42);
        }
    
        #[test]
        fn test_times_out() {
            let result = with_timeout(Duration::from_millis(10), || slow_operation(500, 0));
            assert!(matches!(result, Err(TimeoutError::Elapsed)));
        }
    
        #[test]
        fn test_task_error_propagates() {
            let result: Result<i32, TimeoutError<String>> =
                with_timeout(Duration::from_millis(100), || Err("failed".to_string()));
            assert!(matches!(result, Err(TimeoutError::TaskFailed(_))));
        }
    
        #[test]
        fn test_timeout_simple_success() {
            let result = with_timeout_simple(Duration::from_millis(100), || {
                thread::sleep(Duration::from_millis(5));
                42
            });
            assert_eq!(result, Some(42));
        }
    
        #[test]
        fn test_timeout_simple_failure() {
            let result = with_timeout_simple(Duration::from_millis(10), || {
                thread::sleep(Duration::from_millis(200));
                42
            });
            assert_eq!(result, None);
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        fn slow_operation(delay_ms: u64, value: i32) -> Result<i32, String> {
            thread::sleep(Duration::from_millis(delay_ms));
            Ok(value)
        }
    
        #[test]
        fn test_completes_before_timeout() {
            let result = with_timeout(Duration::from_millis(200), || slow_operation(10, 42));
            assert_eq!(result.unwrap(), 42);
        }
    
        #[test]
        fn test_times_out() {
            let result = with_timeout(Duration::from_millis(10), || slow_operation(500, 0));
            assert!(matches!(result, Err(TimeoutError::Elapsed)));
        }
    
        #[test]
        fn test_task_error_propagates() {
            let result: Result<i32, TimeoutError<String>> =
                with_timeout(Duration::from_millis(100), || Err("failed".to_string()));
            assert!(matches!(result, Err(TimeoutError::TaskFailed(_))));
        }
    
        #[test]
        fn test_timeout_simple_success() {
            let result = with_timeout_simple(Duration::from_millis(100), || {
                thread::sleep(Duration::from_millis(5));
                42
            });
            assert_eq!(result, Some(42));
        }
    
        #[test]
        fn test_timeout_simple_failure() {
            let result = with_timeout_simple(Duration::from_millis(10), || {
                thread::sleep(Duration::from_millis(200));
                42
            });
            assert_eq!(result, None);
        }
    }

    Deep Comparison

    OCaml vs Rust: Timeout Async

    Timeout Pattern

    OCaml (Lwt):

    let with_timeout timeout f =
      Lwt.pick [
        f ();
        Lwt_unix.sleep timeout >>= fun () -> Lwt.fail Timeout
      ]
    

    Rust:

    fn with_timeout<T>(timeout: Duration, f: impl FnOnce() -> Result<T, E>) -> Result<T, TimeoutError<E>> {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || { let _ = tx.send(f()); });
        rx.recv_timeout(timeout)
    }
    

    Key Differences

    AspectOCamlRust
    Async timeoutLwt.pick with sleeptokio::time::timeout
    Sync timeoutBusy-wait looprecv_timeout
    Error typeExceptionEnum variant
    CancellationLwt.cancelFuture dropped

    Exercises

  • Add a retry parameter: if the operation times out, retry up to N times before giving up.
  • Implement a deadline_from_now(secs) function that creates a timeout computed from the current moment.
  • Distinguish between "timed out waiting for response" and "operation failed with error" in a client function, using TimeoutError<AppError>.
  • Open Source Repos