999 Advanced

999 Rate Limiter

Functional Programming

Tutorial

The Problem

Implement a token bucket rate limiter. The bucket holds up to capacity tokens and refills at refill_rate tokens per second. Each request consumes cost tokens. try_acquire returns immediately (non-blocking); acquire sleeps until tokens are available. The bucket is thread-safe via Mutex<BucketState>.

🎯 Learning Outcomes

  • Model a token bucket with { tokens: f64, last_refill: Instant } protected by a Mutex
  • Implement lazy refill: on every access, compute the elapsed time and add elapsed * refill_rate tokens (capped at capacity)
  • Implement try_acquire(cost) -> bool — a non-blocking check and decrement
  • Implement acquire(cost) — a blocking version that sleeps until tokens are available
  • Understand the token bucket vs leaky bucket distinction: a token bucket allows short bursts up to capacity

Code Example

    #![allow(clippy::all)]
    // 999: Rate Limiter — Token Bucket
    // Tokens refill over time; consume one per request. Uses std::time::Instant.
    
    use std::sync::Mutex;
    use std::thread;
    use std::time::{Duration, Instant};
    
    struct TokenBucket {
        state: Mutex<BucketState>,
        capacity: f64,
        refill_rate: f64, // tokens per second
    }
    
    struct BucketState {
        tokens: f64,
        last_refill: Instant,
    }
    
    impl TokenBucket {
        fn new(capacity: f64, refill_rate: f64) -> Self {
            TokenBucket {
                state: Mutex::new(BucketState {
                    tokens: capacity,
                    last_refill: Instant::now(),
                }),
                capacity,
                refill_rate,
            }
        }
    
        fn refill(state: &mut BucketState, capacity: f64, refill_rate: f64) {
            let elapsed = state.last_refill.elapsed().as_secs_f64();
            let new_tokens = elapsed * refill_rate;
            state.tokens = (state.tokens + new_tokens).min(capacity);
            state.last_refill = Instant::now();
        }
    
        fn try_acquire(&self, cost: f64) -> bool {
            let mut state = self.state.lock().unwrap();
            Self::refill(&mut state, self.capacity, self.refill_rate);
            if state.tokens >= cost {
                state.tokens -= cost;
                true
            } else {
                false
            }
        }
    
        fn acquire(&self, cost: f64) {
            while !self.try_acquire(cost) {
                thread::sleep(Duration::from_millis(1));
            }
        }
    
        fn available_tokens(&self) -> f64 {
            let mut state = self.state.lock().unwrap();
            Self::refill(&mut state, self.capacity, self.refill_rate);
            state.tokens
        }
    }
    
    // --- Approach 1: Burst then deny ---
    fn burst_then_deny() -> (usize, usize) {
        let bucket = TokenBucket::new(5.0, 1.0); // 5 capacity, 1 token/sec
        let mut allowed = 0;
        let mut denied = 0;
    
        for _ in 0..10 {
            if bucket.try_acquire(1.0) {
                allowed += 1;
            } else {
                denied += 1;
            }
        }
        (allowed, denied)
    }
    
    // --- Approach 2: Refill over time ---
    fn refill_over_time() -> usize {
        let bucket = TokenBucket::new(3.0, 1000.0); // 1000 tokens/sec
    
        // Drain all 3 tokens
        for _ in 0..3 {
            assert!(bucket.try_acquire(1.0));
        }
        assert!(!bucket.try_acquire(1.0)); // empty
    
        // Wait 15ms → at 1000 tokens/sec that refills ~15 tokens, capped at capacity (3)
        thread::sleep(Duration::from_millis(15));
    
        let mut refilled = 0;
        for _ in 0..5 {
            if bucket.try_acquire(1.0) {
                refilled += 1;
            }
        }
        refilled
    }
    
    // --- Approach 3: Rate-limited batch processing ---
    fn rate_limited_processing(items: Vec<i32>, rps: f64) -> Vec<i32> {
        let bucket = TokenBucket::new(rps, rps); // capacity = rps: bursts up to one second's worth
        items
            .into_iter()
            .map(|item| {
                bucket.acquire(1.0); // wait for token
                item * 2
            })
            .collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_burst_allows_up_to_capacity() {
            let (allowed, denied) = burst_then_deny();
            assert_eq!(allowed, 5);
            assert_eq!(denied, 5);
        }
    
        #[test]
        fn test_tokens_refill() {
            let refilled = refill_over_time();
            assert!(
                refilled >= 3,
                "expected at least 3 tokens refilled, got {}",
                refilled
            );
        }
    
        #[test]
        fn test_try_acquire_returns_false_when_empty() {
            let bucket = TokenBucket::new(2.0, 0.001); // very slow refill
            bucket.try_acquire(1.0);
            bucket.try_acquire(1.0);
            assert!(!bucket.try_acquire(1.0));
        }
    
        #[test]
        fn test_available_tokens_starts_at_capacity() {
            let bucket = TokenBucket::new(10.0, 1.0);
            let tokens = bucket.available_tokens();
            assert!((tokens - 10.0).abs() < 0.1, "expected ~10, got {}", tokens);
        }
    
        #[test]
        fn test_cost_greater_than_one() {
            let bucket = TokenBucket::new(10.0, 1.0);
            // Acquire 5 tokens at once (one heavy request)
            assert!(bucket.try_acquire(5.0));
            // Now only 5 left — can't take 6
            assert!(!bucket.try_acquire(6.0));
            // Can take 5
            assert!(bucket.try_acquire(5.0));
        }
    
        #[test]
        fn test_rate_limited_processing() {
            let results = rate_limited_processing(vec![1, 2, 3], 1000.0);
            assert_eq!(results, vec![2, 4, 6]);
        }
    }

    Key Differences

    | Aspect | Rust | OCaml |
    |---|---|---|
    | Monotonic time | Instant::now() — guaranteed monotonic | Unix.gettimeofday () — wall clock, may go backward |
    | Elapsed time | .elapsed().as_secs_f64() | now -. last_refill |
    | Token cap | .min(capacity) | Float.min capacity ... |
    | Lock scope | Mutex::lock guard (RAII) | Mutex.protect closure |

    The token bucket allows bursts: if no requests arrive for 10 seconds and capacity = 100, the next burst of 100 requests all succeed immediately. Use a leaky bucket (fixed rate, no burst) when strict rate smoothing is required.

    OCaml Approach

    type state = {
      mutable tokens: float;
      mutable last_refill: float;
    }
    
    type t = {
      state: state;
      capacity: float;
      refill_rate: float;
      mutex: Mutex.t;
    }
    
    let create capacity refill_rate =
      { state = { tokens = capacity; last_refill = Unix.gettimeofday () };
        capacity; refill_rate; mutex = Mutex.create () }
    
    let try_acquire tb cost =
      Mutex.protect tb.mutex (fun () ->
        let now = Unix.gettimeofday () in
        let elapsed = now -. tb.state.last_refill in
        tb.state.tokens <- Float.min tb.capacity (tb.state.tokens +. elapsed *. tb.refill_rate);
        tb.state.last_refill <- now;
        if tb.state.tokens >= cost then begin
          tb.state.tokens <- tb.state.tokens -. cost;
          true
        end else false
      )
    

    OCaml's Unix.gettimeofday () returns float seconds since the epoch — less ergonomic than Rust's Instant (monotonic, no epoch arithmetic), and as wall-clock time it can jump backward under NTP adjustments. Mutex.protect (available since OCaml 5.1) wraps the critical section cleanly.


    Deep Comparison

    Rate Limiter — Token Bucket — Comparison

    Core Insight

    The token bucket algorithm is continuous-time rate limiting: tokens accumulate at a fixed rate up to capacity, and each request costs tokens. This allows bursts (up to capacity) while enforcing average rate.

    OCaml Approach

  • Unix.gettimeofday () returns a float (seconds since epoch)
  • elapsed = now -. last_refill gives the time delta
  • new_tokens = elapsed *. refill_rate — continuous refill
  • Float.min capacity (tokens +. new_tokens) — caps at capacity
  • Mutex for thread safety; try_acquire is non-blocking

    Rust Approach

  • Instant::now() — monotonic clock, immune to system time changes
  • .elapsed().as_secs_f64() for fractional seconds
  • Mutex<BucketState> wraps mutable state (tokens + last_refill)
  • acquire spins with thread::sleep(1ms) when empty
  • try_acquire(cost) for variable-cost requests (e.g., large queries cost more)
    Comparison Table

    | Concept | OCaml | Rust |
    |---|---|---|
    | Time primitive | Unix.gettimeofday () (wall clock) | Instant::now() (monotonic) |
    | Elapsed time | now -. last_refill (float secs) | .elapsed().as_secs_f64() |
    | Refill formula | min capacity (tokens + dt * rate) | (tokens + dt * rate).min(capacity) |
    | Non-blocking check | try_acquire | try_acquire(cost) -> bool |
    | Blocking acquire | Spin with sleepf 0.001 | Spin with thread::sleep(1ms) |
    | Variable cost | ~cost parameter | cost: f64 parameter |
    | Thread safety | Mutex.t + explicit lock/unlock | Mutex<BucketState> RAII |

    std vs tokio

    | Aspect | std version | tokio version |
    |---|---|---|
    | Runtime | OS threads via std::thread | Async tasks on the tokio runtime |
    | Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
    | Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
    | Blocking | Thread blocks on lock/recv | Task yields; the runtime switches tasks |
    | Overhead | One OS thread per task | Many tasks per thread (M:N) |
    | Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |

    Exercises

  • Implement a sliding window rate limiter: track timestamps of the last N requests, reject if N requests have occurred in the last window_duration.
  • Add metrics: track requests_allowed, requests_denied, and total_wait_ms.
  • Implement acquire_many(cost: f64, timeout: Duration) -> bool — acquire tokens but give up after timeout.
  • Build a rate-limited HTTP client: wrap reqwest::Client with TokenBucket to limit to N requests/second.
  • Implement a hierarchical rate limiter: global bucket + per-user bucket; both must have tokens for a request to proceed.