332 Advanced

332: Retry Async with Exponential Backoff

Functional Programming

Tutorial

The Problem

Transient failures (network blips, temporary service overloads, rate limits) are common in distributed systems. Retrying immediately is counterproductive because it can worsen an overload. Retrying with exponential backoff gives the service time to recover, and a maximum delay cap keeps any single wait bounded. Distinguishing transient from permanent errors is essential: bad input or an authentication failure fails the same way on every attempt, so retrying it only wastes time. This is a fundamental resilience pattern for any service that calls external APIs.
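
To make the capped growth concrete, here is a minimal sketch that only computes the waits and does not sleep. The helper name backoff_schedule and its parameters are hypothetical and are not part of the tutorial's source.

```rust
use std::time::Duration;

/// Delays to sleep before each of `retries` retries (hypothetical helper).
/// Each delay is the previous one times `multiplier`, never above `max_delay`.
fn backoff_schedule(
    base: Duration,
    multiplier: f64,
    max_delay: Duration,
    retries: usize,
) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(retries);
    // Assumption: a base delay above the cap is clamped to the cap.
    let mut delay = base.min(max_delay);
    for _ in 0..retries {
        delays.push(delay);
        // Grow geometrically, then clamp to the cap.
        delay = delay.mul_f64(multiplier).min(max_delay);
    }
    delays
}
```

With a 100 ms base, a multiplier of 2.0, and a 500 ms cap, five retries wait 100, 200, 400, 500, and 500 ms: the delay doubles until it reaches the cap and then stays there.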

🎯 Learning Outcomes

  • Distinguish transient errors (worth retrying) from permanent errors (don't retry)
  • Implement exponential backoff: delay doubles each attempt, capped at a maximum
  • Add jitter to prevent thundering herd: randomize delays slightly
  • Implement a configurable retry loop with max attempts, base delay, multiplier, and max delay

Code Example

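    // Excerpt: RetryConfig and RetryError are defined in the full source below.
    fn retry<T, E: Clone>(cfg: &RetryConfig, mut f: impl FnMut() -> Result<T, RetryError<E>>) -> Result<T, E> {
        let mut delay = cfg.base_delay;
        let mut last_error = None;
        for attempt in 1..=cfg.max_attempts {
            match f() {
                Ok(v) => return Ok(v),
                // Permanent failure: give up immediately.
                Err(RetryError::Permanent(e)) => return Err(e),
                Err(RetryError::Transient(e)) => {
                    last_error = Some(e);
                    // Sleep only if another attempt follows.
                    if attempt < cfg.max_attempts {
                        thread::sleep(delay);
                        delay = delay.mul_f64(cfg.multiplier).min(cfg.max_delay);
                    }
                }
            }
        }
        // Every attempt failed with a transient error; report the last one.
        Err(last_error.expect("max_attempts must be at least 1"))
    }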

    Key Differences

  • Transient vs permanent: Rust's RetryError<E> embeds the retry decision in the error type; the operation being retried signals "try again" (Transient) or "give up" (Permanent), and the retry loop acts on that signal.
  • Jitter: Adding randomness (delay * (1 + 0.1 * random), where random is, for example, a uniform value between 0 and 1) prevents synchronized retries from all clients hitting the server at the same moment.
  • Production libraries: The backoff crate and the tower crate's retry module provide production-ready retry policies and middleware.
  • Circuit breaker: Complements retry: after too many failures, open the circuit so that calls fail fast for a cooling period instead of being retried.
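
The jitter formula above can be sketched as a small function. The name with_jitter is hypothetical, and the random sample is passed in as an argument (assumed to be uniform between 0 and 1) so that the sketch does not depend on any particular random number crate.

```rust
use std::time::Duration;

/// Scale `delay` by a factor in [1.0, 1.1], following delay * (1 + 0.1 * random).
/// `random` is assumed to be a uniform sample between 0 and 1 supplied by the caller.
fn with_jitter(delay: Duration, random: f64) -> Duration {
    // Clamp defensively so an out-of-range sample cannot shrink or inflate the delay.
    let r = random.clamp(0.0, 1.0);
    delay.mul_f64(1.0 + 0.1 * r)
}
```

In a retry loop, one plausible use is to apply this to the computed delay just before sleeping, so the un-jittered value is still the one that gets multiplied for the next attempt.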

    OCaml Approach

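    Some Lwt-based OCaml libraries provide a retry helper (for example Lwt_retry); a custom recursive retry is also short to write. Note that this version retries on any exception and does not distinguish transient from permanent failures:

    open Lwt.Infix (* brings >>= into scope *)

    let rec retry ?(attempts=3) ?(delay=0.1) f () =
      Lwt.catch (fun () -> f () >>= fun v -> Lwt.return (Ok v))
        (fun exn ->
           if attempts <= 1 then Lwt.return (Error exn)
           else Lwt_unix.sleep delay >>= retry ~attempts:(attempts-1) ~delay:(delay *. 2.0) f)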
    

    Full Source

    #![allow(clippy::all)]
    //! # Retry Async
    //!
    //! Retry failed operations with exponential backoff —
    //! the foundation of resilient async services.
    
    use std::thread;
    use std::time::Duration;
    
    /// Error type distinguishing transient from permanent failures.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub enum RetryError<E> {
        /// Worth retrying — network blip, timeout, overload
        Transient(E),
        /// Don't retry — bad input, auth failure, 404
        Permanent(E),
    }
    
    /// Configuration for retry behavior.
    #[derive(Debug, Clone)]
    pub struct RetryConfig {
        pub max_attempts: usize,
        pub base_delay: Duration,
        pub multiplier: f64,
        pub max_delay: Duration,
    }
    
    impl Default for RetryConfig {
        fn default() -> Self {
            Self {
                max_attempts: 3,
                base_delay: Duration::from_millis(100),
                multiplier: 2.0,
                max_delay: Duration::from_secs(30),
            }
        }
    }
    
    impl RetryConfig {
        pub fn with_attempts(mut self, n: usize) -> Self {
            self.max_attempts = n;
            self
        }
    
        pub fn with_base_delay(mut self, d: Duration) -> Self {
            self.base_delay = d;
            self
        }
    }
    
    /// Retry an operation with exponential backoff.
    pub fn retry<T, E: Clone>(
        config: &RetryConfig,
        mut operation: impl FnMut() -> Result<T, RetryError<E>>,
    ) -> Result<T, E> {
        let mut delay = config.base_delay;
        let mut last_error = None;
    
        for attempt in 1..=config.max_attempts {
            match operation() {
                Ok(value) => return Ok(value),
                Err(RetryError::Permanent(e)) => return Err(e),
                Err(RetryError::Transient(e)) => {
                    last_error = Some(e);
                    if attempt < config.max_attempts {
                        thread::sleep(delay);
                        delay = delay.mul_f64(config.multiplier).min(config.max_delay);
                    }
                }
            }
        }
    
        // Only reachable with no stored error when max_attempts is 0.
        Err(last_error.expect("max_attempts must be at least 1"))
    }
    
    /// Retry with a simple predicate to determine if error is transient.
    pub fn retry_if<T, E: Clone>(
        config: &RetryConfig,
        is_transient: impl Fn(&E) -> bool,
        mut operation: impl FnMut() -> Result<T, E>,
    ) -> Result<T, E> {
        retry(config, || match operation() {
            Ok(v) => Ok(v),
            Err(e) => {
                if is_transient(&e) {
                    Err(RetryError::Transient(e))
                } else {
                    Err(RetryError::Permanent(e))
                }
            }
        })
    }
    
    /// Simple retry with fixed delay (no exponential backoff).
    pub fn retry_fixed<T, E: Clone>(
        max_attempts: usize,
        delay: Duration,
        mut operation: impl FnMut() -> Result<T, RetryError<E>>,
    ) -> Result<T, E> {
        let mut last_error = None;
    
        for attempt in 1..=max_attempts {
            match operation() {
                Ok(value) => return Ok(value),
                Err(RetryError::Permanent(e)) => return Err(e),
                Err(RetryError::Transient(e)) => {
                    last_error = Some(e);
                    if attempt < max_attempts {
                        thread::sleep(delay);
                    }
                }
            }
        }
    
        // Only reachable with no stored error when max_attempts is 0.
        Err(last_error.expect("max_attempts must be at least 1"))
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;
    
        #[test]
        fn test_succeeds_after_retries() {
            let counter = Arc::new(AtomicUsize::new(0));
            let c = Arc::clone(&counter);
    
            let config = RetryConfig::default()
                .with_attempts(5)
                .with_base_delay(Duration::from_millis(1));
    
            let result: Result<i32, String> = retry(&config, move || {
                let n = c.fetch_add(1, Ordering::SeqCst);
                if n < 2 {
                    Err(RetryError::Transient(format!("attempt {}", n + 1)))
                } else {
                    Ok(42)
                }
            });
    
            assert_eq!(result.unwrap(), 42);
            assert_eq!(counter.load(Ordering::SeqCst), 3);
        }
    
        #[test]
        fn test_permanent_error_no_retry() {
            let counter = Arc::new(AtomicUsize::new(0));
            let c = Arc::clone(&counter);
    
            let config = RetryConfig::default();
    
            let result: Result<i32, String> = retry(&config, move || {
                c.fetch_add(1, Ordering::SeqCst);
                Err(RetryError::Permanent("fatal".to_string()))
            });
    
            assert!(result.is_err());
            assert_eq!(counter.load(Ordering::SeqCst), 1);
        }
    
        #[test]
        fn test_exhausts_all_attempts() {
            let counter = Arc::new(AtomicUsize::new(0));
            let c = Arc::clone(&counter);
    
            let config = RetryConfig::default()
                .with_attempts(3)
                .with_base_delay(Duration::from_millis(1));
    
            let result: Result<i32, String> = retry(&config, move || {
                c.fetch_add(1, Ordering::SeqCst);
                Err(RetryError::Transient("still failing".to_string()))
            });
    
            assert!(result.is_err());
            assert_eq!(counter.load(Ordering::SeqCst), 3);
        }
    
        #[test]
        fn test_retry_if() {
            let counter = Arc::new(AtomicUsize::new(0));
            let c = Arc::clone(&counter);
    
            let config = RetryConfig::default()
                .with_attempts(5)
                .with_base_delay(Duration::from_millis(1));
    
            let result: Result<i32, i32> = retry_if(
                &config,
                |e| *e == 500, // only retry 500 errors
                move || {
                    let n = c.fetch_add(1, Ordering::SeqCst);
                    if n < 2 {
                        Err(500)
                    } else {
                        Ok(42)
                    }
                },
            );
    
            assert_eq!(result.unwrap(), 42);
        }
    
        #[test]
        fn test_retry_fixed() {
            let counter = Arc::new(AtomicUsize::new(0));
            let c = Arc::clone(&counter);
    
            let result: Result<i32, String> = retry_fixed(3, Duration::from_millis(1), move || {
                let n = c.fetch_add(1, Ordering::SeqCst);
                if n < 1 {
                    Err(RetryError::Transient("not yet".to_string()))
                } else {
                    Ok(99)
                }
            });
    
            assert_eq!(result.unwrap(), 99);
        }
    }

    Deep Comparison

    OCaml vs Rust: Retry Async

    Retry Loop

    OCaml:

    let rec loop attempt delay =
      match f () with
      | Ok v -> Ok v
      | Error (Permanent e) -> Error e
      | Error (Transient e) ->
        if attempt >= max_attempts then Error e
        else (Thread.delay delay; loop (attempt + 1) (delay *. multiplier))
    

    Rust:

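    fn retry<T, E: Clone>(cfg: &RetryConfig, mut f: impl FnMut() -> Result<T, RetryError<E>>) -> Result<T, E> {
        let mut delay = cfg.base_delay;
        let mut last_error = None;
        for attempt in 1..=cfg.max_attempts {
            match f() {
                Ok(v) => return Ok(v),
                Err(RetryError::Permanent(e)) => return Err(e),
                Err(RetryError::Transient(e)) => {
                    last_error = Some(e);
                    // Sleep only if another attempt follows.
                    if attempt < cfg.max_attempts {
                        thread::sleep(delay);
                        delay = delay.mul_f64(cfg.multiplier).min(cfg.max_delay);
                    }
                }
            }
        }
        // Every attempt failed with a transient error; report the last one.
        Err(last_error.expect("max_attempts must be at least 1"))
    }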
    

    Key Differences

    Aspect          OCaml             Rust
    Loop style      Recursive         Iterative for loop
    Delay type      Float seconds     Duration
    Delay scaling   *. 2.0            .mul_f64(2.0)
    State           Counter as ref    FnMut with internal state
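
The "FnMut with internal state" row can be illustrated with a closure that owns its counter through a mutable borrow, with no Arc or atomics. This is a sketch under assumptions: retry_no_sleep and demo are hypothetical names, the error enum is repeated so the block compiles on its own, and delays are omitted for brevity.

```rust
/// Same shape as the tutorial's error type, repeated so the sketch compiles alone.
#[derive(Debug)]
enum RetryError<E> {
    Transient(E),
    Permanent(E),
}

/// Minimal retry loop without sleeping (hypothetical; delays omitted for brevity).
fn retry_no_sleep<T, E>(
    max_attempts: usize,
    mut operation: impl FnMut() -> Result<T, RetryError<E>>,
) -> Result<T, E> {
    let mut last_error = None;
    for _ in 0..max_attempts {
        match operation() {
            Ok(v) => return Ok(v),
            Err(RetryError::Permanent(e)) => return Err(e),
            Err(RetryError::Transient(e)) => last_error = Some(e),
        }
    }
    Err(last_error.expect("max_attempts must be at least 1"))
}

/// Runs an operation that fails twice, then succeeds; returns (result, call count).
fn demo() -> (Result<u32, String>, u32) {
    let mut calls = 0; // captured by mutable reference, which makes the closure FnMut
    let result = retry_no_sleep(5, || {
        calls += 1;
        if calls < 3 {
            Err(RetryError::Transient(format!("attempt {} failed", calls)))
        } else {
            Ok(42)
        }
    });
    // The closure has been dropped, so `calls` can be read again here.
    (result, calls)
}
```

Because the retry function takes the closure by value and calls it on the same thread, a plain local counter is enough here; shared atomic counters are only needed when the state must outlive the closure or cross threads.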

    Exercises

  • Add jitter to the delay calculation: multiply the computed delay by a random factor between 0.9 and 1.1.
  • Implement a retry with a deadline: stop retrying after a total wall-clock time budget regardless of attempt count.
  • Build a circuit breaker on top of retry: after 5 consecutive failures, open the circuit and fail fast for 30 seconds.