
990 Semaphore

Tutorial

The Problem

Implement a counting semaphore using Mutex<usize> + Condvar. The semaphore limits the number of concurrent operations: acquire blocks when the count reaches zero; release increments the count and wakes a waiting thread. Provide a RAII with_permit helper that acquires and releases automatically.

🎯 Learning Outcomes

  • Implement Semaphore { count: Mutex<usize>, cond: Condvar, max: usize }
  • Implement acquire using Condvar::wait(guard) in a while *count == 0 loop (spurious wakeup protection)
  • Implement release that increments the count and calls Condvar::notify_one()
  • Implement with_permit<T, F: FnOnce() -> T>(&self, f: F) -> T for RAII acquire/release
  • Understand the concurrency pattern: semaphore(1) = mutex, semaphore(N) = N concurrent slots

Code Example

    #![allow(clippy::all)]
    // 990: Semaphore via Mutex<usize> + Condvar
    // Counting semaphore: limit N concurrent operations
    
    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;
    use std::time::Duration;
    
    struct Semaphore {
        count: Mutex<usize>,
        cond: Condvar,
        max: usize,
    }
    
    impl Semaphore {
        fn new(n: usize) -> Self {
            Semaphore {
                count: Mutex::new(n),
                cond: Condvar::new(),
                max: n,
            }
        }
    
        fn acquire(&self) {
            let mut count = self.count.lock().unwrap();
            while *count == 0 {
                count = self.cond.wait(count).unwrap();
            }
            *count -= 1;
        }
    
        fn release(&self) {
            let mut count = self.count.lock().unwrap();
            if *count < self.max {
                *count += 1;
                self.cond.notify_one();
            }
        }
    
        fn with_permit<T, F: FnOnce() -> T>(&self, f: F) -> T {
            self.acquire();
            let result = f();
            // Note: if f panics, release is never called and the permit leaks;
            // a Drop-based guard would make this panic-safe.
            self.release();
            result
        }
    }
    
    // --- Approach 1: Limit concurrent workers ---
    fn limited_concurrency() -> usize {
        let sem = Arc::new(Semaphore::new(3));
        let active = Arc::new(Mutex::new(0usize));
        let max_active = Arc::new(Mutex::new(0usize));
    
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let sem = Arc::clone(&sem);
                let active = Arc::clone(&active);
                let max_active = Arc::clone(&max_active);
                thread::spawn(move || {
                    sem.with_permit(|| {
                        {
                            let mut a = active.lock().unwrap();
                            *a += 1;
                            let mut m = max_active.lock().unwrap();
                            if *a > *m {
                                *m = *a;
                            }
                        }
                        thread::sleep(Duration::from_millis(5));
                        *active.lock().unwrap() -= 1;
                    });
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
        let x = *max_active.lock().unwrap();
        x
    }
    
    // --- Approach 2: Binary semaphore as mutex ---
    fn binary_semaphore_counter() -> u32 {
        let sem = Arc::new(Semaphore::new(1));
        let counter = Arc::new(Mutex::new(0u32));
    
        let handles: Vec<_> = (0..5)
            .map(|_| {
                let sem = Arc::clone(&sem);
                let counter = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..100 {
                        sem.with_permit(|| {
                            *counter.lock().unwrap() += 1;
                        });
                    }
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
        let x = *counter.lock().unwrap();
        x
    }
    
    // --- Approach 3: Drain a resource pool ---
    fn resource_pool_demo() -> Vec<usize> {
        const POOL_SIZE: usize = 2;
        let sem = Arc::new(Semaphore::new(POOL_SIZE));
        let usage_log = Arc::new(Mutex::new(Vec::new()));
    
        let handles: Vec<_> = (0..6)
            .map(|i| {
                let sem = Arc::clone(&sem);
                let log = Arc::clone(&usage_log);
                thread::spawn(move || {
                    sem.with_permit(|| {
                        log.lock().unwrap().push(i);
                        thread::sleep(Duration::from_millis(2));
                    });
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
        let mut log = usage_log.lock().unwrap().clone();
        log.sort();
        log
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_limited_concurrency() {
            let max = limited_concurrency();
            assert!(max <= 3, "max concurrent was {}, expected ≤ 3", max);
            assert!(max >= 1);
        }
    
        #[test]
        fn test_binary_semaphore_correctness() {
            assert_eq!(binary_semaphore_counter(), 500);
        }
    
        #[test]
        fn test_resource_pool() {
            let log = resource_pool_demo();
            assert_eq!(log.len(), 6);
            assert_eq!(log, vec![0, 1, 2, 3, 4, 5]);
        }
    
        #[test]
        fn test_semaphore_acquire_release() {
            let sem = Semaphore::new(2);
            sem.acquire();
            sem.acquire();
            // Can't acquire a third — release one
            sem.release();
            sem.acquire(); // should succeed
            sem.release();
            sem.release();
        }
    
        #[test]
        fn test_semaphore_permits_count() {
            let sem = Semaphore::new(3);
            assert_eq!(*sem.count.lock().unwrap(), 3);
            sem.acquire();
            assert_eq!(*sem.count.lock().unwrap(), 2);
            sem.release();
            assert_eq!(*sem.count.lock().unwrap(), 3);
        }
    }

    Key Differences

    Aspect             | Rust                                           | OCaml
    Condition variable | Condvar, paired with Mutex                     | Condition.t, paired with Mutex.t
    Spurious wakeup    | while *count == 0 loop                         | while count = 0 do ... done loop
    RAII acquire       | with_permit method                             | Fun.protect ~finally
    Atomic unlock+wait | cond.wait(guard) releases the mutex atomically | Condition.wait cond mutex

    Counting semaphores model "N concurrent resources" — N database connections, N parallel downloads, N worker slots. tokio::sync::Semaphore provides the async equivalent for non-blocking workloads.

    OCaml Approach

    type t = {
      mutable count: int;
      max: int;
      mutex: Mutex.t;
      cond: Condition.t;
    }
    
    let create n = { count = n; max = n; mutex = Mutex.create (); cond = Condition.create () }
    
    let acquire s =
      Mutex.lock s.mutex;
      while s.count = 0 do
        Condition.wait s.cond s.mutex
      done;
      s.count <- s.count - 1;
      Mutex.unlock s.mutex
    
    let release s =
      Mutex.lock s.mutex;
      if s.count < s.max then begin
        s.count <- s.count + 1;
        Condition.signal s.cond
      end;
      Mutex.unlock s.mutex
    
    let with_permit s f =
      acquire s;
      Fun.protect ~finally:(fun () -> release s) f
    

    OCaml's Condition.wait is the direct analog of Condvar::wait. Fun.protect ~finally ensures release runs even if f raises an exception — the OCaml equivalent of Rust's drop-on-unwind.
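The drop-on-unwind analogy can be made concrete with a guard type. Below is a minimal sketch under the same Mutex<usize> + Condvar design; the SemaphorePermit type and permit method are illustrative names, not part of the tutorial code, and the max overflow cap from release is omitted for brevity:

```rust
use std::sync::{Condvar, Mutex};

struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { count: Mutex::new(n), cond: Condvar::new() }
    }

    // Acquire a permit and wrap it in a guard; the permit is
    // returned automatically when the guard goes out of scope.
    fn permit(&self) -> SemaphorePermit<'_> {
        let mut count = self.count.lock().unwrap();
        while *count == 0 {
            count = self.cond.wait(count).unwrap();
        }
        *count -= 1;
        SemaphorePermit { sem: self }
    }
}

struct SemaphorePermit<'a> {
    sem: &'a Semaphore,
}

impl Drop for SemaphorePermit<'_> {
    // Runs on normal scope exit *and* during unwinding after a panic,
    // mirroring OCaml's Fun.protect ~finally.
    fn drop(&mut self) {
        let mut count = self.sem.count.lock().unwrap();
        *count += 1;
        self.sem.cond.notify_one();
    }
}

fn main() {
    let sem = Semaphore::new(1);
    {
        let _p = sem.permit(); // count: 1 -> 0
        assert_eq!(*sem.count.lock().unwrap(), 0);
    } // _p dropped here; permit returned
    assert_eq!(*sem.count.lock().unwrap(), 1);
}
```

Because Drop runs during unwinding, the permit is returned even if the code holding the guard panics, which is exactly the guarantee Fun.protect ~finally provides in OCaml.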


    Deep Comparison

    Core Insight

    A semaphore is a generalized mutex: where a mutex allows 1 concurrent holder, a semaphore allows N. Both OCaml and Rust implement it the same way — an integer protected by a mutex, with threads waiting on a condition variable when the count hits zero.
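The integer-behind-a-mutex core is easiest to see in a non-blocking variant. Here is a sketch of a hypothetical try_acquire (not part of the tutorial code above), assuming the same design:

```rust
use std::sync::{Condvar, Mutex};

struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { count: Mutex::new(n), cond: Condvar::new() }
    }

    // Non-blocking acquire: take a permit if one is free, else fail fast.
    // No Condvar wait is needed because we never block.
    fn try_acquire(&self) -> bool {
        let mut count = self.count.lock().unwrap();
        if *count > 0 {
            *count -= 1;
            true
        } else {
            false
        }
    }

    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

fn main() {
    let sem = Semaphore::new(1);
    assert!(sem.try_acquire());  // permit taken: 1 -> 0
    assert!(!sem.try_acquire()); // none left: fails without blocking
    sem.release();
    assert!(sem.try_acquire()); // available again
}
```

With n = 1 this behaves like Mutex::try_lock: the count is only ever 0 or 1, so at most one caller holds the "lock" at a time.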

    OCaml Approach

  • count: int protected by Mutex.t + Condition.t
  • acquire: lock, wait while count = 0, decrement, unlock
  • release: lock, increment, signal, unlock
  • with_semaphore bracket pattern for exception safety
  • No built-in semaphore in OCaml's stdlib — always custom

    Rust Approach

  • Mutex<usize> for count, Condvar for waiting
  • acquire: lock guard, while *count == 0 { count = cond.wait(count) }, decrement
  • release: lock, increment, notify_one()
  • with_permit(f) RAII-style wrapper
  • External crates (tokio, parking_lot) provide optimized semaphores

    Comparison Table

    Concept           | OCaml                  | Rust
    Count storage     | mutable count: int     | Mutex<usize>
    Wait mechanism    | Condition.wait cond m  | cond.wait(guard).unwrap()
    Signal waiter     | Condition.signal cond  | cond.notify_one()
    Bracket acquire   | with_semaphore sem f   | sem.with_permit(f)
    Binary mode       | make_semaphore 1       | Semaphore::new(1)
    Built into stdlib | No                     | No (use parking_lot or tokio)
    Overflow guard    | if count < max_count   | if *count < self.max

    std vs tokio

    Aspect          | std version                   | tokio version
    Runtime         | OS threads via std::thread    | Async tasks on tokio runtime
    Synchronization | std::sync::Mutex, Condvar     | tokio::sync::Mutex, channels
    Channels        | std::sync::mpsc (unbounded)   | tokio::sync::mpsc (bounded, async)
    Blocking        | Thread blocks on lock/recv    | Task yields, runtime switches tasks
    Overhead        | One OS thread per task        | Many tasks per thread (M:N)
    Best for        | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers
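std's Condvar also supports bounded waits via Condvar::wait_timeout, the building block for a timeout-aware acquire. A sketch (acquire_timeout is not part of the tutorial code; the deadline-based loop below is one way to handle spurious wakeups):

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { count: Mutex::new(n), cond: Condvar::new() }
    }

    // Bounded-wait acquire: returns true if a permit was taken
    // before the deadline, false on timeout.
    fn acquire_timeout(&self, dur: Duration) -> bool {
        let deadline = Instant::now() + dur;
        let mut count = self.count.lock().unwrap();
        while *count == 0 {
            let now = Instant::now();
            if now >= deadline {
                return false; // deadline passed while still at zero
            }
            // Re-check count and deadline after every wakeup,
            // whether it was a notify, a timeout, or spurious.
            let (guard, _timed_out) =
                self.cond.wait_timeout(count, deadline - now).unwrap();
            count = guard;
        }
        *count -= 1;
        true
    }

    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

fn main() {
    let sem = Semaphore::new(1);
    assert!(sem.acquire_timeout(Duration::from_millis(10))); // permit free
    assert!(!sem.acquire_timeout(Duration::from_millis(10))); // times out at zero
    sem.release();
    assert!(sem.acquire_timeout(Duration::from_millis(10)));
}
```

Recomputing the remaining time from a fixed deadline (rather than passing dur on every iteration) keeps spurious wakeups from extending the total wait.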

    Exercises

  • Add a try_acquire() -> bool that returns immediately without blocking.
  • Add acquire_timeout(duration: Duration) -> bool using Condvar::wait_timeout.
  • Implement a RAII guard SemaphoreGuard with Drop that calls release — ensuring release even on panic.
  • Use the semaphore to limit concurrent HTTP requests: spawn 20 threads, but only allow 5 to run simultaneously.
  • Verify that semaphore(1) behaves identically to Mutex for exclusive access.