339: Semaphore — Controlling Concurrency
Tutorial Video
Text description (accessibility)
This video demonstrates the "339: Semaphore — Controlling Concurrency" functional Rust example. Difficulty level: Advanced. Key concepts covered: functional programming and concurrency. Connection pools, rate limiters, and resource-bounded operations need to limit concurrent access to at most N simultaneous operations. Shared ground with OCaml: both Rust and OCaml use a mutex plus a condition variable for efficient blocking — the same POSIX pattern.
Tutorial
The Problem
Connection pools, rate limiters, and resource-bounded operations need to limit concurrent access to at most N simultaneous operations. A mutex limits to 1; a semaphore limits to N. Common uses: limit concurrent HTTP requests to 10 to avoid overwhelming a third-party API, limit concurrent database connections to the pool size, or limit parallel file reads to avoid exhausting file descriptors.
🎯 Learning Outcomes
- Mutex<usize> + Condvar for signaling
- acquire() to block when the permit count reaches zero
- release() to increment the count and signal waiting tasks
- tokio::sync::Semaphore as the async-aware production alternative

Code Example
fn acquire(&self) {
let mut c = self.count.lock().unwrap();
while *c == 0 {
c = self.cond.wait(c).unwrap();
}
*c -= 1;
}

Key Differences
- SemaphorePermit (from tokio::sync::Semaphore) uses RAII — release happens automatically on drop; manual semaphores require explicit release.
- acquire().await yields instead of blocking the thread.
- acquire(), perform the operation, then release() after a minimum time — implements rate limiting.

OCaml Approach
OCaml implements semaphores with Mutex + Condition similarly:
type semaphore = { mutable count: int; mutex: Mutex.t; cond: Condition.t }
let acquire s =
Mutex.lock s.mutex;
while s.count = 0 do Condition.wait s.cond s.mutex done;
s.count <- s.count - 1;
Mutex.unlock s.mutex
Full Source
#![allow(clippy::all)]
//! # Semaphore Async
//!
//! Limit how many tasks run concurrently — rate limiting, connection pools, throttling.
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
/// A counting semaphore that limits concurrent access.
pub struct Semaphore {
count: Mutex<usize>,
cond: Condvar,
}
impl Semaphore {
pub fn new(permits: usize) -> Arc<Self> {
Arc::new(Self {
count: Mutex::new(permits),
cond: Condvar::new(),
})
}
/// Acquire a permit, blocking if none available.
pub fn acquire(&self) {
let mut count = self.count.lock().unwrap();
while *count == 0 {
count = self.cond.wait(count).unwrap();
}
*count -= 1;
}
/// Release a permit, potentially waking a waiting thread.
pub fn release(&self) {
*self.count.lock().unwrap() += 1;
self.cond.notify_one();
}
/// Get current available permits.
pub fn available(&self) -> usize {
*self.count.lock().unwrap()
}
}
/// RAII permit that auto-releases when dropped.
pub struct Permit<'a> {
semaphore: &'a Semaphore,
}
impl<'a> Permit<'a> {
pub fn new(semaphore: &'a Semaphore) -> Self {
semaphore.acquire();
Self { semaphore }
}
}
impl<'a> Drop for Permit<'a> {
fn drop(&mut self) {
self.semaphore.release();
}
}
impl Semaphore {
/// Acquire a permit that auto-releases on drop.
pub fn permit(&self) -> Permit<'_> {
Permit::new(self)
}
}
/// Demonstrates semaphore limiting concurrency.
pub fn limited_concurrency_demo(max_concurrent: usize, num_workers: usize) -> usize {
use std::sync::atomic::{AtomicUsize, Ordering};
let sem = Semaphore::new(max_concurrent);
let active = Arc::new(AtomicUsize::new(0));
let peak = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..num_workers)
.map(|_| {
let sem = Arc::clone(&sem);
let active = Arc::clone(&active);
let peak = Arc::clone(&peak);
thread::spawn(move || {
let _permit = sem.permit();
let current = active.fetch_add(1, Ordering::SeqCst) + 1;
peak.fetch_max(current, Ordering::SeqCst);
thread::sleep(Duration::from_millis(5));
active.fetch_sub(1, Ordering::SeqCst);
})
})
.collect();
for h in handles {
h.join().unwrap();
}
peak.load(Ordering::SeqCst)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn test_semaphore_limits_concurrency() {
let peak = limited_concurrency_demo(2, 6);
assert!(peak <= 2, "Peak {} exceeded limit 2", peak);
}
#[test]
fn test_permit_auto_release() {
let sem = Semaphore::new(1);
{
let _permit = sem.permit();
assert_eq!(sem.available(), 0);
}
assert_eq!(sem.available(), 1);
}
#[test]
fn test_multiple_permits() {
let sem = Semaphore::new(3);
let _p1 = sem.permit();
let _p2 = sem.permit();
assert_eq!(sem.available(), 1);
}
#[test]
fn test_acquire_release() {
let sem = Semaphore::new(2);
sem.acquire();
assert_eq!(sem.available(), 1);
sem.release();
assert_eq!(sem.available(), 2);
}
#[test]
fn test_concurrent_workers() {
let sem = Semaphore::new(3);
let active = Arc::new(AtomicUsize::new(0));
let peak = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..8)
.map(|_| {
let sem = Arc::clone(&sem);
let active = Arc::clone(&active);
let peak = Arc::clone(&peak);
thread::spawn(move || {
let _permit = sem.permit();
let cur = active.fetch_add(1, Ordering::SeqCst) + 1;
peak.fetch_max(cur, Ordering::SeqCst);
thread::sleep(Duration::from_millis(2));
active.fetch_sub(1, Ordering::SeqCst);
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert!(peak.load(Ordering::SeqCst) <= 3);
}
}
Deep Comparison
OCaml vs Rust: Semaphore Async
Semaphore Implementation
OCaml:
let acquire s =
Mutex.lock s.mutex;
while s.count = 0 do
Condition.wait s.cond s.mutex
done;
s.count <- s.count - 1;
Mutex.unlock s.mutex
Rust:
fn acquire(&self) {
let mut c = self.count.lock().unwrap();
while *c == 0 {
c = self.cond.wait(c).unwrap();
}
*c -= 1;
}
Key Differences
| Aspect | OCaml | Rust |
|---|---|---|
| Stdlib support | Semaphore.Counting (OCaml 4.12+) | None in std; tokio::sync::Semaphore for async |
| RAII release | No | Permit with Drop |
| Condition wait | Condition.wait s.cond s.mutex | cond.wait(guard) |
| Atomic ops | Not needed | AtomicUsize for stats |