999 Rate Limiter
Tutorial
The Problem
Implement a token bucket rate limiter. The bucket holds up to `capacity` tokens and refills at `refill_rate` tokens per second. Each request consumes `cost` tokens. `try_acquire` returns immediately (non-blocking); `acquire` sleeps until enough tokens are available. The bucket is made thread-safe with a `Mutex<BucketState>`.
🎯 Learning Outcomes
- Bucket state: `{ tokens: f64, last_refill: Instant }` protected by a `Mutex`
- Refill on each access: add `elapsed * refill_rate` tokens (capped at `capacity`)
- `try_acquire(cost) -> bool` — non-blocking check and decrement
- `acquire(cost)` — blocking version that sleeps until tokens are available
- The bucket starts full, at `capacity`

Code Example
```rust
#![allow(clippy::all)]
// 999: Rate Limiter — Token Bucket
// Tokens refill over time; consume one per request. Uses std::time::Instant.
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant};

struct TokenBucket {
    state: Mutex<BucketState>,
    capacity: f64,
    refill_rate: f64, // tokens per second
}

struct BucketState {
    tokens: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: f64, refill_rate: f64) -> Self {
        TokenBucket {
            state: Mutex::new(BucketState {
                tokens: capacity,
                last_refill: Instant::now(),
            }),
            capacity,
            refill_rate,
        }
    }

    fn refill(state: &mut BucketState, capacity: f64, refill_rate: f64) {
        let elapsed = state.last_refill.elapsed().as_secs_f64();
        let new_tokens = elapsed * refill_rate;
        state.tokens = (state.tokens + new_tokens).min(capacity);
        state.last_refill = Instant::now();
    }

    fn try_acquire(&self, cost: f64) -> bool {
        let mut state = self.state.lock().unwrap();
        Self::refill(&mut state, self.capacity, self.refill_rate);
        if state.tokens >= cost {
            state.tokens -= cost;
            true
        } else {
            false
        }
    }

    fn acquire(&self, cost: f64) {
        while !self.try_acquire(cost) {
            thread::sleep(Duration::from_millis(1));
        }
    }

    fn available_tokens(&self) -> f64 {
        let mut state = self.state.lock().unwrap();
        Self::refill(&mut state, self.capacity, self.refill_rate);
        state.tokens
    }
}

// --- Approach 1: Burst then deny ---
fn burst_then_deny() -> (usize, usize) {
    let bucket = TokenBucket::new(5.0, 1.0); // 5 capacity, 1 token/sec
    let mut allowed = 0;
    let mut denied = 0;
    for _ in 0..10 {
        if bucket.try_acquire(1.0) {
            allowed += 1;
        } else {
            denied += 1;
        }
    }
    (allowed, denied)
}

// --- Approach 2: Refill over time ---
fn refill_over_time() -> usize {
    let bucket = TokenBucket::new(3.0, 1000.0); // 1000 tokens/sec
    // Drain all 3 tokens
    for _ in 0..3 {
        assert!(bucket.try_acquire(1.0));
    }
    assert!(!bucket.try_acquire(1.0)); // empty
    // Wait 15ms → should refill ~15 tokens, capped at capacity 3
    thread::sleep(Duration::from_millis(15));
    let mut refilled = 0;
    for _ in 0..5 {
        if bucket.try_acquire(1.0) {
            refilled += 1;
        }
    }
    refilled
}

// --- Approach 3: Rate-limited batch processing ---
fn rate_limited_processing(items: Vec<i32>, rps: f64) -> Vec<i32> {
    let bucket = TokenBucket::new(rps, rps); // allow rps/sec burst
    items
        .into_iter()
        .map(|item| {
            bucket.acquire(1.0); // wait for token
            item * 2
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_burst_allows_up_to_capacity() {
        let (allowed, denied) = burst_then_deny();
        assert_eq!(allowed, 5);
        assert_eq!(denied, 5);
    }

    #[test]
    fn test_tokens_refill() {
        let refilled = refill_over_time();
        assert!(
            refilled >= 3,
            "expected at least 3 tokens refilled, got {}",
            refilled
        );
    }

    #[test]
    fn test_try_acquire_returns_false_when_empty() {
        let bucket = TokenBucket::new(2.0, 0.001); // very slow refill
        bucket.try_acquire(1.0);
        bucket.try_acquire(1.0);
        assert!(!bucket.try_acquire(1.0));
    }

    #[test]
    fn test_available_tokens_starts_at_capacity() {
        let bucket = TokenBucket::new(10.0, 1.0);
        let tokens = bucket.available_tokens();
        assert!((tokens - 10.0).abs() < 0.1, "expected ~10, got {}", tokens);
    }

    #[test]
    fn test_cost_greater_than_one() {
        let bucket = TokenBucket::new(10.0, 1.0);
        // Acquire 5 tokens at once (one heavy request)
        assert!(bucket.try_acquire(5.0));
        // Now only 5 left — can't take 6
        assert!(!bucket.try_acquire(6.0));
        // Can take 5
        assert!(bucket.try_acquire(5.0));
    }

    #[test]
    fn test_rate_limited_processing() {
        let results = rate_limited_processing(vec![1, 2, 3], 1000.0);
        assert_eq!(results, vec![2, 4, 6]);
    }
}
```

Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Monotonic time | Instant::now() — guaranteed monotonic | Unix.gettimeofday() — wall clock, may go backward |
| Elapsed time | .elapsed().as_secs_f64() | now -. last_refill |
| Token cap | .min(capacity) | Float.min capacity ... |
| Lock scope | Mutex::lock guard (RAII) | Mutex.protect closure |
The token bucket allows bursts: if no requests arrive for 10 seconds and capacity = 100, the next burst of 100 requests all succeed immediately. Use a leaky bucket (fixed rate, no burst) when strict rate smoothing is required.
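For contrast, a leaky-bucket-style limiter can be sketched by enforcing a fixed minimum interval between admitted requests, so bursts are smoothed instead of allowed. This `LeakyLimiter` is a hypothetical single-threaded sketch, not part of the tutorial's `TokenBucket`:

```rust
use std::time::{Duration, Instant};

/// Leaky-bucket-style limiter: admits at most one request per fixed
/// interval. No token accumulation, so idle time earns no burst credit.
struct LeakyLimiter {
    interval: Duration,    // minimum gap between admitted requests
    next_allowed: Instant, // earliest time the next request may pass
}

impl LeakyLimiter {
    fn new(rate_per_sec: f64) -> Self {
        LeakyLimiter {
            interval: Duration::from_secs_f64(1.0 / rate_per_sec),
            next_allowed: Instant::now(),
        }
    }

    /// Non-blocking: admit only if the fixed interval has elapsed.
    fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        if now >= self.next_allowed {
            self.next_allowed = now + self.interval;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut limiter = LeakyLimiter::new(10.0); // at most 10 requests/sec
    let first = limiter.try_acquire();  // admitted
    let second = limiter.try_acquire(); // denied: inside the 100ms gap
    println!("{} {}", first, second);
}
```

Unlike the token bucket, two back-to-back calls never both succeed here, even after a long idle period.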
OCaml Approach
```ocaml
type state = {
  mutable tokens : float;
  mutable last_refill : float;
}

type t = {
  state : state;
  capacity : float;
  refill_rate : float;
  mutex : Mutex.t;
}

let create capacity refill_rate =
  { state = { tokens = capacity; last_refill = Unix.gettimeofday () };
    capacity; refill_rate; mutex = Mutex.create () }

let try_acquire tb cost =
  Mutex.protect tb.mutex (fun () ->
    let now = Unix.gettimeofday () in
    let elapsed = now -. tb.state.last_refill in
    tb.state.tokens <-
      Float.min tb.capacity (tb.state.tokens +. elapsed *. tb.refill_rate);
    tb.state.last_refill <- now;
    if tb.state.tokens >= cost then begin
      tb.state.tokens <- tb.state.tokens -. cost;
      true
    end else false)
```
OCaml's `Unix.gettimeofday ()` returns float seconds since the epoch — less ergonomic than Rust's `Instant` (monotonic, no epoch arithmetic). `Mutex.protect` wraps the critical section cleanly.
Full Source
```rust
#![allow(clippy::all)]
// 999: Rate Limiter — Token Bucket
// Tokens refill over time; consume one per request. Uses std::time::Instant.
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant};

struct TokenBucket {
    state: Mutex<BucketState>,
    capacity: f64,
    refill_rate: f64, // tokens per second
}

struct BucketState {
    tokens: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: f64, refill_rate: f64) -> Self {
        TokenBucket {
            state: Mutex::new(BucketState {
                tokens: capacity,
                last_refill: Instant::now(),
            }),
            capacity,
            refill_rate,
        }
    }

    fn refill(state: &mut BucketState, capacity: f64, refill_rate: f64) {
        let elapsed = state.last_refill.elapsed().as_secs_f64();
        let new_tokens = elapsed * refill_rate;
        state.tokens = (state.tokens + new_tokens).min(capacity);
        state.last_refill = Instant::now();
    }

    fn try_acquire(&self, cost: f64) -> bool {
        let mut state = self.state.lock().unwrap();
        Self::refill(&mut state, self.capacity, self.refill_rate);
        if state.tokens >= cost {
            state.tokens -= cost;
            true
        } else {
            false
        }
    }

    fn acquire(&self, cost: f64) {
        while !self.try_acquire(cost) {
            thread::sleep(Duration::from_millis(1));
        }
    }

    fn available_tokens(&self) -> f64 {
        let mut state = self.state.lock().unwrap();
        Self::refill(&mut state, self.capacity, self.refill_rate);
        state.tokens
    }
}

// --- Approach 1: Burst then deny ---
fn burst_then_deny() -> (usize, usize) {
    let bucket = TokenBucket::new(5.0, 1.0); // 5 capacity, 1 token/sec
    let mut allowed = 0;
    let mut denied = 0;
    for _ in 0..10 {
        if bucket.try_acquire(1.0) {
            allowed += 1;
        } else {
            denied += 1;
        }
    }
    (allowed, denied)
}

// --- Approach 2: Refill over time ---
fn refill_over_time() -> usize {
    let bucket = TokenBucket::new(3.0, 1000.0); // 1000 tokens/sec
    // Drain all 3 tokens
    for _ in 0..3 {
        assert!(bucket.try_acquire(1.0));
    }
    assert!(!bucket.try_acquire(1.0)); // empty
    // Wait 15ms → should refill ~15 tokens, capped at capacity 3
    thread::sleep(Duration::from_millis(15));
    let mut refilled = 0;
    for _ in 0..5 {
        if bucket.try_acquire(1.0) {
            refilled += 1;
        }
    }
    refilled
}

// --- Approach 3: Rate-limited batch processing ---
fn rate_limited_processing(items: Vec<i32>, rps: f64) -> Vec<i32> {
    let bucket = TokenBucket::new(rps, rps); // allow rps/sec burst
    items
        .into_iter()
        .map(|item| {
            bucket.acquire(1.0); // wait for token
            item * 2
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_burst_allows_up_to_capacity() {
        let (allowed, denied) = burst_then_deny();
        assert_eq!(allowed, 5);
        assert_eq!(denied, 5);
    }

    #[test]
    fn test_tokens_refill() {
        let refilled = refill_over_time();
        assert!(
            refilled >= 3,
            "expected at least 3 tokens refilled, got {}",
            refilled
        );
    }

    #[test]
    fn test_try_acquire_returns_false_when_empty() {
        let bucket = TokenBucket::new(2.0, 0.001); // very slow refill
        bucket.try_acquire(1.0);
        bucket.try_acquire(1.0);
        assert!(!bucket.try_acquire(1.0));
    }

    #[test]
    fn test_available_tokens_starts_at_capacity() {
        let bucket = TokenBucket::new(10.0, 1.0);
        let tokens = bucket.available_tokens();
        assert!((tokens - 10.0).abs() < 0.1, "expected ~10, got {}", tokens);
    }

    #[test]
    fn test_cost_greater_than_one() {
        let bucket = TokenBucket::new(10.0, 1.0);
        // Acquire 5 tokens at once (one heavy request)
        assert!(bucket.try_acquire(5.0));
        // Now only 5 left — can't take 6
        assert!(!bucket.try_acquire(6.0));
        // Can take 5
        assert!(bucket.try_acquire(5.0));
    }

    #[test]
    fn test_rate_limited_processing() {
        let results = rate_limited_processing(vec![1, 2, 3], 1000.0);
        assert_eq!(results, vec![2, 4, 6]);
    }
}
```
Deep Comparison
Rate Limiter — Token Bucket — Comparison
Core Insight
The token bucket algorithm is continuous-time rate limiting: tokens accumulate at a fixed rate up to capacity, and each request costs tokens. This allows bursts (up to capacity) while enforcing average rate.
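The refill step behind this insight is just a capped linear function of elapsed time. A standalone sketch of that arithmetic (the `tokens_after` helper is hypothetical, introduced here for illustration):

```rust
/// Tokens available after `elapsed_s` seconds of refill at `rate`
/// tokens/sec, never exceeding `capacity`.
fn tokens_after(tokens: f64, elapsed_s: f64, rate: f64, capacity: f64) -> f64 {
    (tokens + elapsed_s * rate).min(capacity)
}

fn main() {
    // Empty bucket, capacity 100, rate 10 tokens/sec, idle for 30s:
    // 0 + 30 * 10 = 300, capped at capacity → 100.0
    println!("{}", tokens_after(0.0, 30.0, 10.0, 100.0));
    // Partially full bucket, short idle period: 50 + 2 * 10 → 70.0
    println!("{}", tokens_after(50.0, 2.0, 10.0, 100.0));
}
```

The cap is what bounds the burst: no matter how long the bucket sits idle, at most `capacity` requests can fire back-to-back.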
OCaml Approach
- `Unix.gettimeofday ()` returns a float (seconds since epoch)
- `elapsed = now - last_refill` gives the time delta
- `new_tokens = elapsed * refill_rate` — continuous refill
- `Float.min capacity (tokens + new_tokens)` — cap at capacity
- `Mutex` for thread safety; `try_acquire` is non-blocking

Rust Approach
- `Instant::now()` — monotonic clock, immune to system time changes
- `.elapsed().as_secs_f64()` for fractional seconds
- `Mutex<BucketState>` wraps the mutable state (tokens + last_refill)
- `acquire` spins with `thread::sleep(1ms)` when empty
- `try_acquire(cost)` for variable-cost requests (e.g., large queries cost more)

Comparison Table
| Concept | OCaml | Rust |
|---|---|---|
| Time primitive | Unix.gettimeofday () (wall clock) | Instant::now() (monotonic) |
| Elapsed time | now -. last_refill (float secs) | .elapsed().as_secs_f64() |
| Refill formula | min capacity (tokens + dt * rate) | (tokens + dt * rate).min(capacity) |
| Non-blocking check | try_acquire | try_acquire(cost) -> bool |
| Blocking acquire | Spin with sleepf 0.001 | Spin with thread::sleep(1ms) |
| Variable cost | ~cost parameter | cost: f64 parameter |
| Thread safety | Mutex.t + explicit lock/unlock | Mutex<BucketState> RAII |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
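One std-side refinement implied by the table: instead of polling every 1ms in `acquire`, a thread can compute how long the current token deficit takes to refill and sleep exactly that long. A sketch of the arithmetic (the `sleep_needed` helper is hypothetical; under contention the caller must still re-check after waking, since another thread may have consumed the refilled tokens):

```rust
use std::time::Duration;

/// How long until `deficit_tokens` more tokens will have refilled
/// at `refill_rate` tokens per second.
fn sleep_needed(deficit_tokens: f64, refill_rate: f64) -> Duration {
    Duration::from_secs_f64(deficit_tokens / refill_rate)
}

fn main() {
    // 3 tokens short, refilling at 2 tokens/sec → sleep 1.5s before retrying.
    println!("{}", sleep_needed(3.0, 2.0).as_secs_f64());
}
```

An async version would use `tokio::time::sleep` with the same computed duration, letting the runtime schedule other tasks while waiting.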
Exercises
- Sliding window: track recent request timestamps; reject if `N` requests have already occurred in the last `window_duration`.
- Add counters to the limiter: `requests_allowed`, `requests_denied`, and `total_wait_ms`.
- Implement `acquire_many(cost: f64, timeout: Duration) -> bool` — acquire tokens but give up after `timeout`.
- Wrap a `reqwest::Client` with `TokenBucket` to limit it to `N` requests/second.
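As a starting point for the sliding-window exercise, one possible shape (the `SlidingWindow` type and its API are assumptions, not from the tutorial):

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Sliding-window limiter sketch: remember when recent requests
/// happened; admit a new one only if fewer than `max_requests`
/// fall inside the trailing window.
struct SlidingWindow {
    window: Duration,
    max_requests: usize,
    timestamps: VecDeque<Instant>,
}

impl SlidingWindow {
    fn new(max_requests: usize, window: Duration) -> Self {
        SlidingWindow { window, max_requests, timestamps: VecDeque::new() }
    }

    fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        // Evict timestamps that have aged out of the window.
        while self
            .timestamps
            .front()
            .map_or(false, |&t| now.duration_since(t) > self.window)
        {
            self.timestamps.pop_front();
        }
        if self.timestamps.len() < self.max_requests {
            self.timestamps.push_back(now);
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut sw = SlidingWindow::new(2, Duration::from_secs(1));
    // Two requests fit in the window; the third is rejected.
    println!("{} {} {}", sw.try_acquire(), sw.try_acquire(), sw.try_acquire());
}
```

Unlike the token bucket, this bounds the count in any trailing window exactly, at the cost of storing one timestamp per admitted request.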