998 Circuit Breaker
Tutorial
The Problem
Implement the Circuit Breaker pattern — a fault-tolerance mechanism with three states: Closed (normal operation), Open (refusing calls after too many failures), and Half-Open (testing recovery after a timeout). When failure count exceeds a threshold, the breaker opens and rejects calls. After a recovery timeout, it transitions to Half-Open and allows one test call.
🎯 Learning Outcomes
enum BreakerState { Closed, Open { opened_at: Instant }, HalfOpen }Mutex<BreakerState> and Mutex<u32> for failure countcall<T, E, F> that: checks state, executes f(), updates state based on success/failuremaybe_transition_to_half_open that checks if recovery_timeout has elapsed since openingCallResult::CircuitOpen when the breaker is open, without calling fCode Example
#![allow(clippy::all)]
// 998: Circuit Breaker
// Open/Half-Open/Closed state machine for fault tolerance
use std::sync::Mutex;
use std::time::{Duration, Instant};
#[derive(Debug, PartialEq, Clone)]
enum BreakerState {
Closed,
Open { opened_at: Instant },
HalfOpen,
}
pub struct CircuitBreaker {
state: Mutex<BreakerState>,
failures: Mutex<u32>,
failure_threshold: u32,
recovery_timeout: Duration,
}
#[derive(Debug, PartialEq)]
pub enum CallResult<T, E> {
Success(T),
Failure(E),
CircuitOpen,
}
impl CircuitBreaker {
pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
CircuitBreaker {
state: Mutex::new(BreakerState::Closed),
failures: Mutex::new(0),
failure_threshold,
recovery_timeout,
}
}
fn maybe_transition_to_half_open(&self) {
let mut state = self.state.lock().unwrap();
if let BreakerState::Open { opened_at } = *state {
if opened_at.elapsed() >= self.recovery_timeout {
*state = BreakerState::HalfOpen;
}
}
}
pub fn call<T, E, F>(&self, f: F) -> CallResult<T, E>
where
F: FnOnce() -> Result<T, E>,
{
self.maybe_transition_to_half_open();
let current_state = self.state.lock().unwrap().clone();
match current_state {
BreakerState::Open { .. } => CallResult::CircuitOpen,
BreakerState::Closed | BreakerState::HalfOpen => {
match f() {
Ok(v) => {
// Success: reset failures, close circuit
*self.failures.lock().unwrap() = 0;
*self.state.lock().unwrap() = BreakerState::Closed;
CallResult::Success(v)
}
Err(e) => {
let mut failures = self.failures.lock().unwrap();
*failures += 1;
if *failures >= self.failure_threshold {
*self.state.lock().unwrap() = BreakerState::Open {
opened_at: Instant::now(),
};
}
CallResult::Failure(e)
}
}
}
}
}
pub fn state_name(&self) -> &'static str {
match *self.state.lock().unwrap() {
BreakerState::Closed => "Closed",
BreakerState::Open { .. } => "Open",
BreakerState::HalfOpen => "HalfOpen",
}
}
pub fn reset(&self) {
*self.state.lock().unwrap() = BreakerState::Closed;
*self.failures.lock().unwrap() = 0;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_initial_state_closed() {
let cb = CircuitBreaker::new(3, Duration::from_secs(1));
assert_eq!(cb.state_name(), "Closed");
}
#[test]
fn test_opens_after_threshold() {
let cb = CircuitBreaker::new(3, Duration::from_secs(10));
for _ in 0..3 {
cb.call(|| Err::<i32, &str>("err"));
}
assert_eq!(cb.state_name(), "Open");
}
#[test]
fn test_rejects_when_open() {
let cb = CircuitBreaker::new(2, Duration::from_secs(10));
cb.call(|| Err::<i32, &str>("e"));
cb.call(|| Err::<i32, &str>("e")); // trip breaker
let r = cb.call(|| Ok::<i32, &str>(42));
assert_eq!(r, CallResult::CircuitOpen);
}
#[test]
fn test_recovers_after_timeout() {
let cb = CircuitBreaker::new(2, Duration::from_millis(20));
cb.call(|| Err::<i32, &str>("e"));
cb.call(|| Err::<i32, &str>("e")); // open
assert_eq!(cb.state_name(), "Open");
std::thread::sleep(Duration::from_millis(30));
let r = cb.call(|| Ok::<i32, &str>(99));
assert_eq!(r, CallResult::Success(99));
assert_eq!(cb.state_name(), "Closed");
}
#[test]
fn test_success_resets_failures() {
let cb = CircuitBreaker::new(3, Duration::from_secs(1));
cb.call(|| Err::<i32, &str>("e"));
cb.call(|| Err::<i32, &str>("e")); // 2 failures
cb.call(|| Ok::<i32, &str>(1)); // success — reset
cb.call(|| Err::<i32, &str>("e")); // 1 failure — not open yet
assert_eq!(cb.state_name(), "Closed");
}
}Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| State enum | BreakerState with Instant in Open | Custom type with float timestamp |
| Synchronization | Two Mutex fields (or one Mutex<Inner>) | Single Mutex.protect |
| Elapsed check | opened_at.elapsed() | gettimeofday () -. t |
| Return type | CallResult<T, E> — three variants | Result with CircuitOpen error |
Circuit breakers prevent cascading failures: if a downstream service is down, the breaker opens and fast-fails subsequent calls rather than piling up waiting threads. The Half-Open state enables automatic recovery without manual intervention.
OCaml Approach
type state = Closed | Open of float | HalfOpen
type t = {
mutable state: state;
mutable failures: int;
threshold: int;
recovery_s: float;
mutex: Mutex.t;
}
let call cb f =
Mutex.protect cb.mutex (fun () ->
(match cb.state with
| Open t when Unix.gettimeofday () -. t >= cb.recovery_s ->
cb.state <- HalfOpen
| _ -> ());
match cb.state with
| Open _ -> Error `CircuitOpen
| _ ->
match f () with
| Ok v ->
cb.state <- Closed; cb.failures <- 0; Ok v
| Error e ->
cb.failures <- cb.failures + 1;
if cb.failures >= cb.threshold then
cb.state <- Open (Unix.gettimeofday ());
Error e)
OCaml's Mutex.protect wraps the entire operation — simpler than Rust's two separate Mutex fields but holds the lock longer. The state machine logic is identical; only the synchronization primitive differs.
Full Source
#![allow(clippy::all)]
// 998: Circuit Breaker
// Open/Half-Open/Closed state machine for fault tolerance
use std::sync::Mutex;
use std::time::{Duration, Instant};
#[derive(Debug, PartialEq, Clone)]
enum BreakerState {
Closed,
Open { opened_at: Instant },
HalfOpen,
}
pub struct CircuitBreaker {
state: Mutex<BreakerState>,
failures: Mutex<u32>,
failure_threshold: u32,
recovery_timeout: Duration,
}
#[derive(Debug, PartialEq)]
pub enum CallResult<T, E> {
Success(T),
Failure(E),
CircuitOpen,
}
impl CircuitBreaker {
pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
CircuitBreaker {
state: Mutex::new(BreakerState::Closed),
failures: Mutex::new(0),
failure_threshold,
recovery_timeout,
}
}
fn maybe_transition_to_half_open(&self) {
let mut state = self.state.lock().unwrap();
if let BreakerState::Open { opened_at } = *state {
if opened_at.elapsed() >= self.recovery_timeout {
*state = BreakerState::HalfOpen;
}
}
}
pub fn call<T, E, F>(&self, f: F) -> CallResult<T, E>
where
F: FnOnce() -> Result<T, E>,
{
self.maybe_transition_to_half_open();
let current_state = self.state.lock().unwrap().clone();
match current_state {
BreakerState::Open { .. } => CallResult::CircuitOpen,
BreakerState::Closed | BreakerState::HalfOpen => {
match f() {
Ok(v) => {
// Success: reset failures, close circuit
*self.failures.lock().unwrap() = 0;
*self.state.lock().unwrap() = BreakerState::Closed;
CallResult::Success(v)
}
Err(e) => {
let mut failures = self.failures.lock().unwrap();
*failures += 1;
if *failures >= self.failure_threshold {
*self.state.lock().unwrap() = BreakerState::Open {
opened_at: Instant::now(),
};
}
CallResult::Failure(e)
}
}
}
}
}
pub fn state_name(&self) -> &'static str {
match *self.state.lock().unwrap() {
BreakerState::Closed => "Closed",
BreakerState::Open { .. } => "Open",
BreakerState::HalfOpen => "HalfOpen",
}
}
pub fn reset(&self) {
*self.state.lock().unwrap() = BreakerState::Closed;
*self.failures.lock().unwrap() = 0;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_initial_state_closed() {
let cb = CircuitBreaker::new(3, Duration::from_secs(1));
assert_eq!(cb.state_name(), "Closed");
}
#[test]
fn test_opens_after_threshold() {
let cb = CircuitBreaker::new(3, Duration::from_secs(10));
for _ in 0..3 {
cb.call(|| Err::<i32, &str>("err"));
}
assert_eq!(cb.state_name(), "Open");
}
#[test]
fn test_rejects_when_open() {
let cb = CircuitBreaker::new(2, Duration::from_secs(10));
cb.call(|| Err::<i32, &str>("e"));
cb.call(|| Err::<i32, &str>("e")); // trip breaker
let r = cb.call(|| Ok::<i32, &str>(42));
assert_eq!(r, CallResult::CircuitOpen);
}
#[test]
fn test_recovers_after_timeout() {
let cb = CircuitBreaker::new(2, Duration::from_millis(20));
cb.call(|| Err::<i32, &str>("e"));
cb.call(|| Err::<i32, &str>("e")); // open
assert_eq!(cb.state_name(), "Open");
std::thread::sleep(Duration::from_millis(30));
let r = cb.call(|| Ok::<i32, &str>(99));
assert_eq!(r, CallResult::Success(99));
assert_eq!(cb.state_name(), "Closed");
}
#[test]
fn test_success_resets_failures() {
let cb = CircuitBreaker::new(3, Duration::from_secs(1));
cb.call(|| Err::<i32, &str>("e"));
cb.call(|| Err::<i32, &str>("e")); // 2 failures
cb.call(|| Ok::<i32, &str>(1)); // success — reset
cb.call(|| Err::<i32, &str>("e")); // 1 failure — not open yet
assert_eq!(cb.state_name(), "Closed");
}
}#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_initial_state_closed() {
let cb = CircuitBreaker::new(3, Duration::from_secs(1));
assert_eq!(cb.state_name(), "Closed");
}
#[test]
fn test_opens_after_threshold() {
let cb = CircuitBreaker::new(3, Duration::from_secs(10));
for _ in 0..3 {
cb.call(|| Err::<i32, &str>("err"));
}
assert_eq!(cb.state_name(), "Open");
}
#[test]
fn test_rejects_when_open() {
let cb = CircuitBreaker::new(2, Duration::from_secs(10));
cb.call(|| Err::<i32, &str>("e"));
cb.call(|| Err::<i32, &str>("e")); // trip breaker
let r = cb.call(|| Ok::<i32, &str>(42));
assert_eq!(r, CallResult::CircuitOpen);
}
#[test]
fn test_recovers_after_timeout() {
let cb = CircuitBreaker::new(2, Duration::from_millis(20));
cb.call(|| Err::<i32, &str>("e"));
cb.call(|| Err::<i32, &str>("e")); // open
assert_eq!(cb.state_name(), "Open");
std::thread::sleep(Duration::from_millis(30));
let r = cb.call(|| Ok::<i32, &str>(99));
assert_eq!(r, CallResult::Success(99));
assert_eq!(cb.state_name(), "Closed");
}
#[test]
fn test_success_resets_failures() {
let cb = CircuitBreaker::new(3, Duration::from_secs(1));
cb.call(|| Err::<i32, &str>("e"));
cb.call(|| Err::<i32, &str>("e")); // 2 failures
cb.call(|| Ok::<i32, &str>(1)); // success — reset
cb.call(|| Err::<i32, &str>("e")); // 1 failure — not open yet
assert_eq!(cb.state_name(), "Closed");
}
}
Deep Comparison
Circuit Breaker — Comparison
Core Insight
The circuit breaker is an automatic state machine that protects callers from cascading failures. Like an electrical circuit breaker: too many failures "trip" it to Open, then it tests recovery in HalfOpen, then resets to Closed on success.
OCaml Approach
state, failures, failure_thresholdMutex to protect state transitions (thread-safe)Unix.gettimeofday() float for wall-clock timingOpen of float carries the timestamp when it openedcall functionRust Approach
Mutex<BreakerState> + Mutex<u32> for state and failuresBreakerState::Open { opened_at: Instant } — Instant for elapsed timeInstant::now().elapsed() >= recovery_timeout for timeout checkcall<T, E, F>(&self, f: F) -> CallResult<T, E> — generic over result typemaybe_transition_to_half_open() for clean separationComparison Table
| Concept | OCaml | Rust |
|---|---|---|
| State enum | type state = Closed \| Open \| HalfOpen | enum BreakerState { Closed, Open { at: Instant }, HalfOpen } |
| Thread safety | Mutex.t + explicit lock/unlock | Mutex<BreakerState> RAII |
| Timing | Unix.gettimeofday () (f64 secs) | Instant::now() / .elapsed() |
| Call result | BrResult v \| CircuitOpen \| CallError e | CallResult<T,E> enum |
| Transition logic | Pattern match in call | Separate maybe_transition method |
| Generic over types | 'a circuit_breaker | Generic <T, E, F> on call |
| Production | Manual or library (retrying-oc) | failsafe-rs, tower::limit |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
state and failures into a single Mutex<BreakerInner> to eliminate TOCTOU.timeout, count as failure.AtomicUsize counters.on_state_change: impl Fn(BreakerState, BreakerState) callback fired on every state transition.CircuitBreakerRegistry that manages multiple named breakers and provides aggregate health status.