990 Semaphore
Tutorial
The Problem
Implement a counting semaphore using Mutex<usize> + Condvar. The semaphore limits the number of concurrent operations: acquire blocks when the count reaches zero; release increments the count and wakes a waiting thread. Provide a RAII with_permit helper that acquires and releases automatically.
🎯 Learning Outcomes
- Semaphore { count: Mutex<usize>, cond: Condvar, max: usize }
- acquire using Condvar::wait(guard) in a while *count == 0 loop (spurious wakeup protection)
- release that increments the count and calls Condvar::notify_one()
- with_permit<T, F: FnOnce() -> T>(&self, f: F) -> T for RAII-style acquire/release

Code Example
```rust
#![allow(clippy::all)]
// 990: Semaphore via Mutex<usize> + Condvar
// Counting semaphore: limit N concurrent operations
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
    max: usize,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore {
            count: Mutex::new(n),
            cond: Condvar::new(),
            max: n,
        }
    }
    fn acquire(&self) {
        let mut count = self.count.lock().unwrap();
        while *count == 0 {
            count = self.cond.wait(count).unwrap();
        }
        *count -= 1;
    }
    fn release(&self) {
        let mut count = self.count.lock().unwrap();
        if *count < self.max {
            *count += 1;
            self.cond.notify_one();
        }
    }
    // Note: if f panics, the permit is never released. See the
    // SemaphoreGuard exercise for a Drop-based, panic-safe variant.
    fn with_permit<T, F: FnOnce() -> T>(&self, f: F) -> T {
        self.acquire();
        let result = f();
        self.release();
        result
    }
}

// --- Approach 1: Limit concurrent workers ---
fn limited_concurrency() -> usize {
    let sem = Arc::new(Semaphore::new(3));
    let active = Arc::new(Mutex::new(0usize));
    let max_active = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let sem = Arc::clone(&sem);
            let active = Arc::clone(&active);
            let max_active = Arc::clone(&max_active);
            thread::spawn(move || {
                sem.with_permit(|| {
                    {
                        let mut a = active.lock().unwrap();
                        *a += 1;
                        let mut m = max_active.lock().unwrap();
                        if *a > *m {
                            *m = *a;
                        }
                    }
                    thread::sleep(Duration::from_millis(5));
                    *active.lock().unwrap() -= 1;
                });
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    *max_active.lock().unwrap()
}

// --- Approach 2: Binary semaphore as mutex ---
fn binary_semaphore_counter() -> u32 {
    let sem = Arc::new(Semaphore::new(1));
    let counter = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (0..5)
        .map(|_| {
            let sem = Arc::clone(&sem);
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..100 {
                    sem.with_permit(|| {
                        *counter.lock().unwrap() += 1;
                    });
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    *counter.lock().unwrap()
}

// --- Approach 3: Drain a resource pool ---
fn resource_pool_demo() -> Vec<usize> {
    const POOL_SIZE: usize = 2;
    let sem = Arc::new(Semaphore::new(POOL_SIZE));
    let usage_log = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..6)
        .map(|i| {
            let sem = Arc::clone(&sem);
            let log = Arc::clone(&usage_log);
            thread::spawn(move || {
                sem.with_permit(|| {
                    log.lock().unwrap().push(i);
                    thread::sleep(Duration::from_millis(2));
                });
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let mut log = usage_log.lock().unwrap().clone();
    log.sort();
    log
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_limited_concurrency() {
        let max = limited_concurrency();
        assert!(max <= 3, "max concurrent was {}, expected ≤ 3", max);
        assert!(max >= 1);
    }
    #[test]
    fn test_binary_semaphore_correctness() {
        assert_eq!(binary_semaphore_counter(), 500);
    }
    #[test]
    fn test_resource_pool() {
        let log = resource_pool_demo();
        assert_eq!(log.len(), 6);
        assert_eq!(log, vec![0, 1, 2, 3, 4, 5]);
    }
    #[test]
    fn test_semaphore_acquire_release() {
        let sem = Semaphore::new(2);
        sem.acquire();
        sem.acquire();
        // Can't acquire a third — release one
        sem.release();
        sem.acquire(); // should succeed
        sem.release();
        sem.release();
    }
    #[test]
    fn test_semaphore_permits_count() {
        let sem = Semaphore::new(3);
        assert_eq!(*sem.count.lock().unwrap(), 3);
        sem.acquire();
        assert_eq!(*sem.count.lock().unwrap(), 2);
        sem.release();
        assert_eq!(*sem.count.lock().unwrap(), 3);
    }
}
```

Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Condition variable | Condvar — paired with Mutex | Condition.t — paired with Mutex.t |
| Spurious wakeup | while *count == 0 loop | while count = 0 do ... done loop |
| RAII acquire | with_permit method | Fun.protect ~finally |
| Atomic wait+sleep | cond.wait(guard) — releases mutex atomically | Condition.wait cond mutex |
Counting semaphores model "N concurrent resources" — N database connections, N parallel downloads, N worker slots. tokio::sync::Semaphore provides the async equivalent for non-blocking workloads.
OCaml Approach
```ocaml
type t = {
  mutable count: int;
  max: int;
  mutex: Mutex.t;
  cond: Condition.t;
}

let create n =
  { count = n; max = n; mutex = Mutex.create (); cond = Condition.create () }

let acquire s =
  Mutex.lock s.mutex;
  while s.count = 0 do
    Condition.wait s.cond s.mutex
  done;
  s.count <- s.count - 1;
  Mutex.unlock s.mutex

let release s =
  Mutex.lock s.mutex;
  if s.count < s.max then begin
    s.count <- s.count + 1;
    Condition.signal s.cond
  end;
  Mutex.unlock s.mutex

let with_permit s f =
  acquire s;
  Fun.protect ~finally:(fun () -> release s) f
```
OCaml's Condition.wait is the direct analog of Condvar::wait. Fun.protect ~finally ensures release runs even if f raises an exception — the OCaml equivalent of Rust's drop-on-unwind.
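The Rust tutorial code has the mirror-image weakness: with_permit calls release only after f() returns normally, so a panic inside the closure leaks the permit. The idiomatic Rust fix is a guard type whose Drop impl performs the release, which runs even during panic unwinding. A minimal sketch, using a condensed Semaphore without the max guard; SemaphoreGuard and permit are illustrative names, not part of the tutorial code above:

```rust
use std::sync::{Condvar, Mutex};

// Condensed copy of the tutorial's Semaphore (max guard omitted for brevity).
struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { count: Mutex::new(n), cond: Condvar::new() }
    }
    fn acquire(&self) {
        let mut count = self.count.lock().unwrap();
        while *count == 0 {
            count = self.cond.wait(count).unwrap();
        }
        *count -= 1;
    }
    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
    // Acquire a permit tied to the returned guard's lifetime.
    fn permit(&self) -> SemaphoreGuard<'_> {
        self.acquire();
        SemaphoreGuard { sem: self }
    }
}

// Releases its permit when dropped -- including during panic unwinding.
struct SemaphoreGuard<'a> {
    sem: &'a Semaphore,
}

impl Drop for SemaphoreGuard<'_> {
    fn drop(&mut self) {
        self.sem.release();
    }
}

fn main() {
    std::panic::set_hook(Box::new(|_| {})); // silence the demo panic message
    let sem = Semaphore::new(1);
    // A panic inside the scope still returns the permit via Drop.
    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let _g = sem.permit();
        panic!("boom");
    }));
    assert_eq!(*sem.count.lock().unwrap(), 1); // permit was returned
    println!("permit restored after panic");
}
```

tokio::sync::Semaphore follows the same shape: acquiring yields a permit object whose Drop returns the permit to the pool.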
Full Source
```rust
#![allow(clippy::all)]
// 990: Semaphore via Mutex<usize> + Condvar
// Counting semaphore: limit N concurrent operations
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
    max: usize,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore {
            count: Mutex::new(n),
            cond: Condvar::new(),
            max: n,
        }
    }
    fn acquire(&self) {
        let mut count = self.count.lock().unwrap();
        while *count == 0 {
            count = self.cond.wait(count).unwrap();
        }
        *count -= 1;
    }
    fn release(&self) {
        let mut count = self.count.lock().unwrap();
        if *count < self.max {
            *count += 1;
            self.cond.notify_one();
        }
    }
    fn with_permit<T, F: FnOnce() -> T>(&self, f: F) -> T {
        self.acquire();
        let result = f();
        self.release();
        result
    }
}

// --- Approach 1: Limit concurrent workers ---
fn limited_concurrency() -> usize {
    let sem = Arc::new(Semaphore::new(3));
    let active = Arc::new(Mutex::new(0usize));
    let max_active = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let sem = Arc::clone(&sem);
            let active = Arc::clone(&active);
            let max_active = Arc::clone(&max_active);
            thread::spawn(move || {
                sem.with_permit(|| {
                    {
                        let mut a = active.lock().unwrap();
                        *a += 1;
                        let mut m = max_active.lock().unwrap();
                        if *a > *m {
                            *m = *a;
                        }
                    }
                    thread::sleep(Duration::from_millis(5));
                    *active.lock().unwrap() -= 1;
                });
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    *max_active.lock().unwrap()
}

// --- Approach 2: Binary semaphore as mutex ---
fn binary_semaphore_counter() -> u32 {
    let sem = Arc::new(Semaphore::new(1));
    let counter = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (0..5)
        .map(|_| {
            let sem = Arc::clone(&sem);
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..100 {
                    sem.with_permit(|| {
                        *counter.lock().unwrap() += 1;
                    });
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    *counter.lock().unwrap()
}

// --- Approach 3: Drain a resource pool ---
fn resource_pool_demo() -> Vec<usize> {
    const POOL_SIZE: usize = 2;
    let sem = Arc::new(Semaphore::new(POOL_SIZE));
    let usage_log = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..6)
        .map(|i| {
            let sem = Arc::clone(&sem);
            let log = Arc::clone(&usage_log);
            thread::spawn(move || {
                sem.with_permit(|| {
                    log.lock().unwrap().push(i);
                    thread::sleep(Duration::from_millis(2));
                });
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let mut log = usage_log.lock().unwrap().clone();
    log.sort();
    log
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_limited_concurrency() {
        let max = limited_concurrency();
        assert!(max <= 3, "max concurrent was {}, expected ≤ 3", max);
        assert!(max >= 1);
    }
    #[test]
    fn test_binary_semaphore_correctness() {
        assert_eq!(binary_semaphore_counter(), 500);
    }
    #[test]
    fn test_resource_pool() {
        let log = resource_pool_demo();
        assert_eq!(log.len(), 6);
        assert_eq!(log, vec![0, 1, 2, 3, 4, 5]);
    }
    #[test]
    fn test_semaphore_acquire_release() {
        let sem = Semaphore::new(2);
        sem.acquire();
        sem.acquire();
        // Can't acquire a third — release one
        sem.release();
        sem.acquire(); // should succeed
        sem.release();
        sem.release();
    }
    #[test]
    fn test_semaphore_permits_count() {
        let sem = Semaphore::new(3);
        assert_eq!(*sem.count.lock().unwrap(), 3);
        sem.acquire();
        assert_eq!(*sem.count.lock().unwrap(), 2);
        sem.release();
        assert_eq!(*sem.count.lock().unwrap(), 3);
    }
}
```
Deep Comparison
Semaphore — Comparison
Core Insight
A semaphore is a generalized mutex: where a mutex allows 1 concurrent holder, a semaphore allows N. Both OCaml and Rust implement it the same way — an integer protected by a mutex, with threads waiting on a condition variable when the count hits zero.
OCaml Approach
- count: int protected by Mutex.t + Condition.t
- acquire: lock, wait while count = 0, decrement, unlock
- release: lock, increment, signal, unlock
- with_semaphore bracket pattern for exception safety

Rust Approach

- Mutex<usize> for the count, Condvar for waiting
- acquire: lock guard, while *count == 0 { count = cond.wait(count) }, decrement
- release: lock, increment, notify_one()
- with_permit(f) RAII-style wrapper

Comparison Table
| Concept | OCaml | Rust |
|---|---|---|
| Count storage | mutable count: int | Mutex<usize> |
| Wait mechanism | Condition.wait cond m | cond.wait(guard).unwrap() |
| Signal waiter | Condition.signal cond | cond.notify_one() |
| Bracket acquire | with_semaphore sem f | sem.with_permit(f) |
| Binary mode | make_semaphore 1 | Semaphore::new(1) |
| Built into stdlib | No | No (use parking_lot or tokio) |
| Overflow guard | if count < max_count | if *count < self.max |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (channel unbounded; sync_channel bounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
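The Channels row hints at a second std-only route to semaphore behavior: a bounded sync_channel pre-loaded with N tokens, where recv acts as acquire (it blocks when no tokens are buffered) and send acts as release. A sketch of that idea, reusing Approach 1's peak-concurrency measurement; this is an aside, not part of the tutorial's implementation:

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

const PERMITS: usize = 3;

// Run 10 workers through a 3-token channel pool; return the peak concurrency.
fn channel_semaphore_demo() -> usize {
    let (tx, rx) = sync_channel::<()>(PERMITS);
    for _ in 0..PERMITS {
        tx.send(()).unwrap(); // pre-load one token per permit
    }
    // Receiver is not Sync, so share it behind a Mutex.
    let rx: Arc<Mutex<Receiver<()>>> = Arc::new(Mutex::new(rx));
    let active = Arc::new(Mutex::new(0usize));
    let max_active = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let tx = tx.clone();
            let rx = Arc::clone(&rx);
            let active = Arc::clone(&active);
            let max_active = Arc::clone(&max_active);
            thread::spawn(move || {
                rx.lock().unwrap().recv().unwrap(); // acquire: take a token
                {
                    let mut a = active.lock().unwrap();
                    *a += 1;
                    let mut m = max_active.lock().unwrap();
                    *m = (*m).max(*a);
                }
                thread::sleep(Duration::from_millis(5));
                *active.lock().unwrap() -= 1;
                tx.send(()).unwrap(); // release: return the token
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let max = *max_active.lock().unwrap();
    max
}

fn main() {
    let max = channel_semaphore_demo();
    assert!(max >= 1 && max <= PERMITS);
    println!("peak concurrency: {}", max);
}
```

The Condvar version is still the better teaching vehicle here; the channel trick trades the explicit count for buffer slots and serializes acquisition through the shared Receiver lock.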
Exercises
1. Add try_acquire() -> bool that returns immediately without blocking.
2. Add acquire_timeout(duration: Duration) -> bool using Condvar::wait_timeout.
3. Add a SemaphoreGuard with Drop that calls release — ensuring release even on panic.
4. Verify that Semaphore::new(1) behaves identically to a Mutex for exclusive access.
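For the first two exercises, one possible shape as a starting point (a sketch under the tutorial's design; the max guard is omitted for brevity, and acquire_timeout recomputes the remaining budget after every wakeup because Condvar::wait_timeout can also wake spuriously):

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { count: Mutex::new(n), cond: Condvar::new() }
    }

    // Exercise 1: take a permit only if one is free; never block.
    fn try_acquire(&self) -> bool {
        let mut count = self.count.lock().unwrap();
        if *count > 0 {
            *count -= 1;
            true
        } else {
            false
        }
    }

    // Exercise 2: bounded wait. The deadline is fixed up front so that
    // spurious wakeups shrink the remaining budget instead of resetting it.
    fn acquire_timeout(&self, dur: Duration) -> bool {
        let deadline = Instant::now() + dur;
        let mut count = self.count.lock().unwrap();
        while *count == 0 {
            let now = Instant::now();
            if now >= deadline {
                return false; // budget exhausted, no permit taken
            }
            let (guard, _timed_out) =
                self.cond.wait_timeout(count, deadline - now).unwrap();
            count = guard;
        }
        *count -= 1;
        true
    }

    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

fn main() {
    let sem = Semaphore::new(1);
    assert!(sem.try_acquire());  // permit taken
    assert!(!sem.try_acquire()); // none left, returns immediately
    assert!(!sem.acquire_timeout(Duration::from_millis(10))); // times out
    sem.release();
    assert!(sem.acquire_timeout(Duration::from_millis(10))); // succeeds
    println!("ok");
}
```

Exercise 3 is sketched in the OCaml Approach section's guard example; exercise 4 falls out of Approach 2, which already drives a binary semaphore as a mutex.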