998 Circuit Breaker

Functional Programming

The Problem

Implement the Circuit Breaker pattern, a fault-tolerance mechanism with three states: Closed (normal operation), Open (refusing calls after too many failures), and Half-Open (testing recovery after a timeout). When the failure count reaches a threshold, the breaker opens and rejects calls without running them. After a recovery timeout, it transitions to Half-Open and lets a test call through: success closes the breaker, failure reopens it.

🎯 Learning Outcomes

  • Model the circuit breaker as enum BreakerState { Closed, Open { opened_at: Instant }, HalfOpen }
  • Protect state transitions with Mutex<BreakerState> and Mutex<u32> for failure count
  • Implement call<T, E, F> that: checks state, executes f(), updates state based on success/failure
  • Implement maybe_transition_to_half_open that checks if recovery_timeout has elapsed since opening
  • Return CallResult::CircuitOpen when the breaker is open, without calling f

Code Example

    #![allow(clippy::all)]
    // 998: Circuit Breaker
    // Open/Half-Open/Closed state machine for fault tolerance
    
    use std::sync::Mutex;
    use std::time::{Duration, Instant};
    
    #[derive(Debug, PartialEq, Clone)]
    enum BreakerState {
        Closed,
        Open { opened_at: Instant },
        HalfOpen,
    }
    
    pub struct CircuitBreaker {
        state: Mutex<BreakerState>,
        failures: Mutex<u32>,
        failure_threshold: u32,
        recovery_timeout: Duration,
    }
    
    #[derive(Debug, PartialEq)]
    pub enum CallResult<T, E> {
        Success(T),
        Failure(E),
        CircuitOpen,
    }
    
    impl CircuitBreaker {
        pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
            CircuitBreaker {
                state: Mutex::new(BreakerState::Closed),
                failures: Mutex::new(0),
                failure_threshold,
                recovery_timeout,
            }
        }
    
        fn maybe_transition_to_half_open(&self) {
            let mut state = self.state.lock().unwrap();
            if let BreakerState::Open { opened_at } = *state {
                if opened_at.elapsed() >= self.recovery_timeout {
                    *state = BreakerState::HalfOpen;
                }
            }
        }
    
        pub fn call<T, E, F>(&self, f: F) -> CallResult<T, E>
        where
            F: FnOnce() -> Result<T, E>,
        {
            self.maybe_transition_to_half_open();
    
            let current_state = self.state.lock().unwrap().clone();
            match current_state {
                BreakerState::Open { .. } => CallResult::CircuitOpen,
                BreakerState::Closed | BreakerState::HalfOpen => {
                    match f() {
                        Ok(v) => {
                            // Success: reset failures, close circuit
                            *self.failures.lock().unwrap() = 0;
                            *self.state.lock().unwrap() = BreakerState::Closed;
                            CallResult::Success(v)
                        }
                        Err(e) => {
                            let mut failures = self.failures.lock().unwrap();
                            *failures += 1;
                            if *failures >= self.failure_threshold {
                                *self.state.lock().unwrap() = BreakerState::Open {
                                    opened_at: Instant::now(),
                                };
                            }
                            CallResult::Failure(e)
                        }
                    }
                }
            }
        }
    
        pub fn state_name(&self) -> &'static str {
            match *self.state.lock().unwrap() {
                BreakerState::Closed => "Closed",
                BreakerState::Open { .. } => "Open",
                BreakerState::HalfOpen => "HalfOpen",
            }
        }
    
        pub fn reset(&self) {
            *self.state.lock().unwrap() = BreakerState::Closed;
            *self.failures.lock().unwrap() = 0;
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_initial_state_closed() {
            let cb = CircuitBreaker::new(3, Duration::from_secs(1));
            assert_eq!(cb.state_name(), "Closed");
        }
    
        #[test]
        fn test_opens_after_threshold() {
            let cb = CircuitBreaker::new(3, Duration::from_secs(10));
            for _ in 0..3 {
                cb.call(|| Err::<i32, &str>("err"));
            }
            assert_eq!(cb.state_name(), "Open");
        }
    
        #[test]
        fn test_rejects_when_open() {
            let cb = CircuitBreaker::new(2, Duration::from_secs(10));
            cb.call(|| Err::<i32, &str>("e"));
            cb.call(|| Err::<i32, &str>("e")); // trip breaker
            let r = cb.call(|| Ok::<i32, &str>(42));
            assert_eq!(r, CallResult::CircuitOpen);
        }
    
        #[test]
        fn test_recovers_after_timeout() {
            let cb = CircuitBreaker::new(2, Duration::from_millis(20));
            cb.call(|| Err::<i32, &str>("e"));
            cb.call(|| Err::<i32, &str>("e")); // open
            assert_eq!(cb.state_name(), "Open");
    
            std::thread::sleep(Duration::from_millis(30));
    
            let r = cb.call(|| Ok::<i32, &str>(99));
            assert_eq!(r, CallResult::Success(99));
            assert_eq!(cb.state_name(), "Closed");
        }
    
        #[test]
        fn test_success_resets_failures() {
            let cb = CircuitBreaker::new(3, Duration::from_secs(1));
            cb.call(|| Err::<i32, &str>("e"));
            cb.call(|| Err::<i32, &str>("e")); // 2 failures
            cb.call(|| Ok::<i32, &str>(1)); // success — reset
            cb.call(|| Err::<i32, &str>("e")); // 1 failure — not open yet
            assert_eq!(cb.state_name(), "Closed");
        }
    }

    Key Differences

    | Aspect | Rust | OCaml |
    |---|---|---|
    | State enum | BreakerState with Instant in Open | Custom type with float timestamp |
    | Synchronization | Two Mutex fields (or one Mutex<Inner>) | Single Mutex.protect |
    | Elapsed check | opened_at.elapsed() | gettimeofday () -. t |
    | Return type | CallResult<T, E> (three variants) | result with a `CircuitOpen error |

    Circuit breakers prevent cascading failures: if a downstream service is down, the breaker opens and fast-fails subsequent calls rather than piling up waiting threads. The Half-Open state enables automatic recovery without manual intervention.
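
To make the fast-fail behaviour concrete, here is a minimal sketch that counts how many calls actually reach a failing downstream service. TinyBreaker and simulate_outage are hypothetical names introduced for this illustration. The breaker is deliberately single-threaded and has no recovery timeout or Half-Open state, unlike the CircuitBreaker above.

```rust
use std::cell::Cell;

/// Illustrative only: a single-threaded breaker with no recovery path.
struct TinyBreaker {
    failures: u32,
    threshold: u32,
}

impl TinyBreaker {
    fn is_open(&self) -> bool {
        self.failures >= self.threshold
    }

    /// Returns None when the call is rejected without running `f`.
    fn call<T, E>(&mut self, f: impl FnOnce() -> Result<T, E>) -> Option<Result<T, E>> {
        if self.is_open() {
            return None; // fast-fail: the downstream service is never touched
        }
        let result = f();
        match &result {
            Ok(_) => self.failures = 0,
            Err(_) => self.failures += 1,
        }
        Some(result)
    }
}

/// Sends `attempts` calls to a service that always fails and returns
/// (calls that reached the service, calls rejected by the breaker).
fn simulate_outage(threshold: u32, attempts: u32) -> (u32, u32) {
    let mut breaker = TinyBreaker { failures: 0, threshold };
    let reached = Cell::new(0u32);
    let mut rejected = 0;
    for _ in 0..attempts {
        let outcome = breaker.call(|| {
            reached.set(reached.get() + 1);
            Err::<(), &str>("downstream unavailable")
        });
        if outcome.is_none() {
            rejected += 1;
        }
    }
    (reached.get(), rejected)
}
```

With a threshold of 3, simulate_outage(3, 10) returns (3, 7): three calls reach the service and the remaining seven are rejected immediately.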

    OCaml Approach

    type state = Closed | Open of float | HalfOpen
    
    type t = {
      mutable state: state;
      mutable failures: int;
      threshold: int;
      recovery_s: float;
      mutex: Mutex.t;
    }
    
    let call cb f =
      Mutex.protect cb.mutex (fun () ->
        (match cb.state with
         | Open t when Unix.gettimeofday () -. t >= cb.recovery_s ->
           cb.state <- HalfOpen
         | _ -> ());
        match cb.state with
        | Open _ -> Error `CircuitOpen
        | _ ->
          match f () with
          | Ok v ->
            cb.state <- Closed; cb.failures <- 0; Ok v
          | Error e ->
            cb.failures <- cb.failures + 1;
            if cb.failures >= cb.threshold then
              cb.state <- Open (Unix.gettimeofday ());
            Error e)
    

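    OCaml's Mutex.protect wraps the entire operation, including the call to f (). That is simpler than Rust's two separate Mutex fields, but the lock is held for as long as f runs, so concurrent calls through the same breaker are serialized. The Rust version takes each lock only briefly and never holds one while f executes. The state machine logic is identical; what differs is the locking granularity.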
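
A middle ground between the two approaches is to keep state and failure count behind one lock but release it while f runs. The following is a sketch under that assumption, not the tutorial's implementation: SingleLockBreaker, Inner, and the Option-based return type are names chosen for this example. It also reopens explicitly on a failed Half-Open call rather than relying on the counter.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Closed,
    Open { opened_at: Instant },
    HalfOpen,
}

// State and failure count share one lock, so they always change together.
struct Inner {
    state: State,
    failures: u32,
}

pub struct SingleLockBreaker {
    inner: Mutex<Inner>,
    failure_threshold: u32,
    recovery_timeout: Duration,
}

impl SingleLockBreaker {
    pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
        Self {
            inner: Mutex::new(Inner { state: State::Closed, failures: 0 }),
            failure_threshold,
            recovery_timeout,
        }
    }

    /// Returns None when the breaker rejects the call without running `f`.
    pub fn call<T, E>(&self, f: impl FnOnce() -> Result<T, E>) -> Option<Result<T, E>> {
        // Phase 1: decide under the lock whether the call is admitted.
        {
            let mut inner = self.inner.lock().unwrap();
            if let State::Open { opened_at } = inner.state {
                if opened_at.elapsed() < self.recovery_timeout {
                    return None;
                }
                inner.state = State::HalfOpen;
            }
        } // guard dropped here, so `f` never runs while the mutex is held

        let result = f();

        // Phase 2: record the outcome under the lock.
        let mut inner = self.inner.lock().unwrap();
        match &result {
            Ok(_) => {
                inner.failures = 0;
                inner.state = State::Closed;
            }
            Err(_) => {
                inner.failures = inner.failures.saturating_add(1);
                if inner.state == State::HalfOpen || inner.failures >= self.failure_threshold {
                    inner.state = State::Open { opened_at: Instant::now() };
                }
            }
        }
        Some(result)
    }

    pub fn is_open(&self) -> bool {
        matches!(self.inner.lock().unwrap().state, State::Open { .. })
    }
}
```

Each phase now sees a consistent pair of state and counter, but several threads can still be admitted at once while the breaker is Half-Open. Limiting recovery to a single probe would need an extra flag in Inner.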

    Deep Comparison

    Circuit Breaker — Comparison

    Core Insight

    The circuit breaker is an automatic state machine that protects callers from cascading failures. Like an electrical circuit breaker: too many failures "trip" it to Open, then it tests recovery in HalfOpen, then resets to Closed on success.
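
The state machine can also be written as a pure transition function, separate from locking and clocks. This is a sketch for illustration: the Event type, step, and run are hypothetical names, and the recovery timeout is modelled as an explicit TimeoutElapsed event instead of an Instant comparison.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Closed,
    Open,
    HalfOpen,
}

#[derive(Debug, Clone, Copy)]
enum Event {
    Success,
    Failure,
    TimeoutElapsed,
}

/// Pure transition: (state, failure count) plus an event gives the next pair.
fn step(state: State, failures: u32, threshold: u32, event: Event) -> (State, u32) {
    match (state, event) {
        // Open reacts only to the recovery timeout; calls are rejected while
        // Open, so Success/Failure events are not expected and change nothing.
        (State::Open, Event::TimeoutElapsed) => (State::HalfOpen, failures),
        (State::Open, _) => (State::Open, failures),
        // A success in Closed or HalfOpen closes the breaker and resets the count.
        (_, Event::Success) => (State::Closed, 0),
        // A failed probe in HalfOpen trips the breaker again.
        (State::HalfOpen, Event::Failure) => (State::Open, failures.saturating_add(1)),
        // In Closed, failures accumulate until the threshold is reached.
        (State::Closed, Event::Failure) => {
            let failures = failures.saturating_add(1);
            if failures >= threshold {
                (State::Open, failures)
            } else {
                (State::Closed, failures)
            }
        }
        // The timeout is meaningless outside Open.
        (s, Event::TimeoutElapsed) => (s, failures),
    }
}

/// Folds a sequence of events over a breaker that starts Closed with no failures.
fn run(threshold: u32, events: &[Event]) -> (State, u32) {
    events.iter().fold((State::Closed, 0), |(state, failures), &event| {
        step(state, failures, threshold, event)
    })
}
```

With a threshold of 2, the sequence Failure, Failure, TimeoutElapsed, Success ends in (State::Closed, 0), which is the trip, probe, and reset cycle described above.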

    OCaml Approach

  • Mutable record fields for state and failures; threshold and recovery_s stay immutable
  • Mutex to protect state transitions (thread-safe)
  • Unix.gettimeofday () returns wall-clock seconds as a float, used for timing
  • Open of float carries the timestamp when it opened
  • State check and transition both happen inside the call function

    Rust Approach

  • Mutex<BreakerState> + Mutex<u32> for state and failures
  • BreakerState::Open { opened_at: Instant } stores an Instant for measuring elapsed time
  • opened_at.elapsed() >= recovery_timeout for the timeout check
  • call<T, E, F>(&self, f: F) -> CallResult<T, E> is generic over the success and error types
  • maybe_transition_to_half_open() for clean separation

    Comparison Table

    | Concept | OCaml | Rust |
    |---|---|---|
    | State enum | type state = Closed \| Open of float \| HalfOpen | enum BreakerState { Closed, Open { opened_at: Instant }, HalfOpen } |
    | Thread safety | Mutex.t with Mutex.protect | Mutex<BreakerState> with RAII guards |
    | Timing | Unix.gettimeofday () (float seconds, wall clock) | Instant::now() / .elapsed() (monotonic) |
    | Call result | Ok v \| Error `CircuitOpen \| Error e | CallResult<T, E> enum |
    | Transition logic | Pattern match in call | Separate maybe_transition_to_half_open method |
    | Generic over types | call is polymorphic in the result of f | Generic <T, E, F> on call |
    | Production | Manual or library (retrying-oc) | failsafe-rs, tower::limit |

    std vs tokio

    | Aspect | std version | tokio version |
    |---|---|---|
    | Runtime | OS threads via std::thread | Async tasks on tokio runtime |
    | Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
    | Channels | std::sync::mpsc (unbounded channel, bounded sync_channel) | tokio::sync::mpsc (bounded channel, unbounded_channel; async) |
    | Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
    | Overhead | One OS thread per task | Many tasks per thread (M:N) |
    | Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |

    Exercises

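  • Combine state and failures into a single Mutex<BreakerInner> to eliminate the TOCTOU (time-of-check to time-of-use) race between reading the state and updating it.
  • Add a Half-Open timeout: if a Half-Open call takes longer than a configured timeout, count it as a failure.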
  • Implement metrics: track total calls, successes, failures, and rejections as AtomicUsize counters.
  • Add an on_state_change: impl Fn(BreakerState, BreakerState) callback fired on every state transition.
  • Implement a CircuitBreakerRegistry that manages multiple named breakers and provides aggregate health status.