991 Barrier Sync

Advanced

Tutorial

The Problem

Demonstrate std::sync::Barrier — a synchronization primitive that makes N threads wait at a point until all N have arrived, then releases all simultaneously. This enables phased computation: Phase 1 runs independently across all threads, then all synchronize at the barrier, then Phase 2 starts simultaneously in all threads.

🎯 Learning Outcomes

  • Create a barrier with Barrier::new(n) and share it across threads via Arc::clone
  • Call barrier.wait() — it blocks until all n threads have called wait, then all are released
  • Recognize that BarrierWaitResult::is_leader() returns true for exactly one thread — useful for single-threaded post-phase work
  • Understand the use case: parallel algorithms with synchronization points (e.g., level synchronization in parallel BFS)
  • Compare with Java's CountDownLatch and OCaml's manual Mutex + Condvar + counter equivalent

Code Example

    #![allow(clippy::all)]
    // 991: Barrier Synchronization
    // Rust: std::sync::Barrier — wait until N threads all arrive
    
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::Duration;
    
    // --- Approach 1: Simple barrier — all threads synchronize at one point ---
    fn barrier_demo() -> (Vec<String>, Vec<String>) {
        let n = 5;
        let barrier = Arc::new(Barrier::new(n));
        let phase1_log = Arc::new(std::sync::Mutex::new(Vec::new()));
        let phase2_log = Arc::new(std::sync::Mutex::new(Vec::new()));
    
        let handles: Vec<_> = (0..n)
            .map(|i| {
                let barrier = Arc::clone(&barrier);
                let p1 = Arc::clone(&phase1_log);
                let p2 = Arc::clone(&phase2_log);
                thread::spawn(move || {
                    // Phase 1: independent work
                    thread::sleep(Duration::from_millis(i as u64 * 2));
                    p1.lock().unwrap().push(format!("p1:{}", i));
    
                    // BARRIER — blocks until all N threads arrive
                    barrier.wait();
    
                    // Phase 2: all start together after barrier
                    p2.lock().unwrap().push(format!("p2:{}", i));
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
    
        let p1 = phase1_log.lock().unwrap().clone();
        let p2 = phase2_log.lock().unwrap().clone();
        (p1, p2)
    }
    
    // --- Approach 2: Detect the "leader" (one arbitrary thread per barrier trip) ---
    fn barrier_with_leader() -> Vec<bool> {
        let n = 4;
        let barrier = Arc::new(Barrier::new(n));
        let is_leader = Arc::new(std::sync::Mutex::new(Vec::new()));
    
        let handles: Vec<_> = (0..n)
            .map(|_| {
                let barrier = Arc::clone(&barrier);
                let leaders = Arc::clone(&is_leader);
                thread::spawn(move || {
                    let result = barrier.wait();
                    // BarrierWaitResult::is_leader() is true for exactly one thread
                    leaders.lock().unwrap().push(result.is_leader());
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
    is_leader.lock().unwrap().clone()
    }
    
    // --- Approach 3: Reusable barrier across multiple rounds ---
    fn multi_round_barrier() -> Vec<usize> {
        let n = 3;
        let barrier = Arc::new(Barrier::new(n));
        let counts = Arc::new(std::sync::Mutex::new(vec![0usize; 2]));
    
        let handles: Vec<_> = (0..n)
            .map(|_| {
                let barrier = Arc::clone(&barrier);
                let counts = Arc::clone(&counts);
                thread::spawn(move || {
                    for round in 0..2 {
                        counts.lock().unwrap()[round] += 1;
                        barrier.wait(); // resets automatically after all arrive
                    }
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
    counts.lock().unwrap().clone()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_barrier_both_phases_complete() {
            let (p1, p2) = barrier_demo();
            assert_eq!(p1.len(), 5);
            assert_eq!(p2.len(), 5);
        }
    
        #[test]
        fn test_exactly_one_leader() {
            let leaders = barrier_with_leader();
            assert_eq!(leaders.len(), 4);
            assert_eq!(leaders.iter().filter(|&&b| b).count(), 1);
        }
    
        #[test]
        fn test_reusable_barrier() {
            let rounds = multi_round_barrier();
            assert_eq!(rounds, vec![3, 3]); // all 3 threads counted in each round
        }
    
        #[test]
        fn test_barrier_new() {
            // Barrier of 1 passes immediately
            let b = Barrier::new(1);
            let result = b.wait();
            assert!(result.is_leader());
        }
    
        #[test]
        fn test_barrier_synchronizes_ordering() {
            // Ensure no thread reaches phase2 before all finish phase1
            let n = 4;
            let barrier = Arc::new(Barrier::new(n));
            let phase1_done = Arc::new(std::sync::Mutex::new(0usize));
            let error = Arc::new(std::sync::Mutex::new(false));
    
            let handles: Vec<_> = (0..n)
                .map(|_| {
                    let b = Arc::clone(&barrier);
                    let done = Arc::clone(&phase1_done);
                    let err = Arc::clone(&error);
                    thread::spawn(move || {
                        *done.lock().unwrap() += 1;
                        b.wait();
                        // After barrier, all must have finished phase1
                        if *done.lock().unwrap() != n {
                            *err.lock().unwrap() = true;
                        }
                    })
                })
                .collect();
    
            for h in handles {
                h.join().unwrap();
            }
            assert!(!*error.lock().unwrap());
        }
    }

    Key Differences

    | Aspect             | Rust                             | OCaml                                    |
    |--------------------|----------------------------------|------------------------------------------|
    | Built-in barrier   | std::sync::Barrier               | No stdlib; manual with Mutex + Condition |
    | Leader election    | is_leader() on BarrierWaitResult | Manual with first/last-thread logic      |
    | Broadcast wakeup   | Internally uses notify_all       | Condition.broadcast explicitly           |
    | Generation counter | Internal (handles reuse)         | Manual generation field                  |

    Barriers enable clean phased parallel algorithms. They are used in parallel BFS (synchronize after each level), parallel matrix operations (synchronize after each row/column pass), and multi-stage simulation.
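As a concrete sketch of this pattern, the hypothetical phased_sum below splits a vector across threads, has each thread write its partial sum to a shared slot (Phase 1), and uses the barrier to guarantee every slot is filled before any thread computes the total (Phase 2). The function name and chunking scheme are illustrative, not part of the example above:

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

// Phase 1: each thread sums its chunk into a shared slot.
// Barrier: guarantees every slot is written before anyone reads.
// Phase 2: any thread may now total the slots safely.
fn phased_sum(data: Vec<i64>, n_threads: usize) -> i64 {
    let chunk = (data.len() + n_threads - 1) / n_threads;
    let data = Arc::new(data);
    let partials = Arc::new(Mutex::new(vec![0i64; n_threads]));
    let barrier = Arc::new(Barrier::new(n_threads));

    let handles: Vec<_> = (0..n_threads)
        .map(|i| {
            let data = Arc::clone(&data);
            let partials = Arc::clone(&partials);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                let start = (i * chunk).min(data.len());
                let end = ((i + 1) * chunk).min(data.len());
                partials.lock().unwrap()[i] = data[start..end].iter().sum();

                barrier.wait(); // no thread totals until all partials exist

                let total: i64 = partials.lock().unwrap().iter().sum();
                total
            })
        })
        .collect();

    // Every thread computed the same total; return the first.
    let totals: Vec<i64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    totals[0]
}

fn main() {
    let total = phased_sum((1..=100).collect(), 4);
    assert_eq!(total, 5050);
    println!("{}", total);
}
```

Without the barrier, a fast thread could total the vector while slower threads were still writing their slots; the wait() makes Phase 1's writes visible to every Phase 2 reader.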

    OCaml Approach

    type barrier = {
      target: int;
      mutable count: int;
      mutex: Mutex.t;
      cond: Condition.t;
      mutable generation: int;
    }
    
    let create n = { target = n; count = 0; mutex = Mutex.create ();
                      cond = Condition.create (); generation = 0 }
    
    let wait b =
      Mutex.lock b.mutex;
      let gen = b.generation in
      b.count <- b.count + 1;
      if b.count = b.target then begin
        b.count <- 0;
        b.generation <- gen + 1;
        Condition.broadcast b.cond
      end else begin
        while b.generation = gen do
          Condition.wait b.cond b.mutex
        done
      end;
      Mutex.unlock b.mutex
    

    OCaml's barrier is implemented manually with a Mutex, a Condition, and a generation counter. The generation counter ensures that a spurious wakeup (or the start of a new round) cannot release threads before all have arrived in the current round. Condition.broadcast wakes all waiting threads at once — unlike Condition.signal (OCaml's counterpart to notify_one), which wakes only one.
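To make the comparison concrete, the same generation-counter technique can be transliterated into Rust with Mutex + Condvar. This is a sketch only — std::sync::Barrier already encapsulates all of this, and ManualBarrier is a made-up name:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A hand-rolled reusable barrier mirroring the OCaml version:
// a Mutex-protected (count, generation) pair plus a Condvar.
struct ManualBarrier {
    target: usize,
    state: Mutex<(usize, u64)>, // (count, generation)
    cond: Condvar,
}

impl ManualBarrier {
    fn new(target: usize) -> Self {
        ManualBarrier {
            target,
            state: Mutex::new((0, 0)),
            cond: Condvar::new(),
        }
    }

    /// Returns true for the single thread that trips the barrier.
    fn wait(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        let gen = state.1;
        state.0 += 1;
        if state.0 == self.target {
            state.0 = 0; // reset the count so the barrier is reusable
            state.1 = gen + 1; // advance the generation
            self.cond.notify_all(); // like Condition.broadcast
            true
        } else {
            // The generation check guards against spurious wakeups,
            // exactly like `while b.generation = gen` in the OCaml code.
            while state.1 == gen {
                state = self.cond.wait(state).unwrap();
            }
            false
        }
    }
}

fn main() {
    let b = Arc::new(ManualBarrier::new(3));
    let handles: Vec<_> = (0..3)
        .map(|_| {
            let b = Arc::clone(&b);
            thread::spawn(move || b.wait())
        })
        .collect();
    let leaders = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&l| l)
        .count();
    assert_eq!(leaders, 1);
    println!("leaders: {}", leaders);
}
```

Note how closely the branches match the OCaml wait: the tripping thread resets and broadcasts; every other thread loops on the generation check inside Condvar::wait.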


    Deep Comparison

    Barrier Synchronization — Comparison

    Core Insight

    A barrier is a collective synchronization point — like a countdown latch where N threads each decrement, and all are released when it hits zero. Used in parallel algorithms where phases must complete before the next begins.
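A minimal sketch of this latch analogy, using is_leader() so that exactly one thread performs the post-phase merge (phased_merge is a hypothetical helper, not part of the example above):

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

// n threads each contribute in phase 1; after the barrier releases
// everyone, only the leader runs the merge — single-threaded post-phase work.
fn phased_merge(n: usize) -> usize {
    let barrier = Arc::new(Barrier::new(n));
    let partials = Arc::new(Mutex::new(Vec::new()));
    let merged = Arc::new(Mutex::new(0usize));

    let handles: Vec<_> = (0..n)
        .map(|i| {
            let barrier = Arc::clone(&barrier);
            let partials = Arc::clone(&partials);
            let merged = Arc::clone(&merged);
            thread::spawn(move || {
                partials.lock().unwrap().push(i + 1); // phase 1: contribute

                if barrier.wait().is_leader() {
                    // exactly one thread performs the merge
                    let sum: usize = partials.lock().unwrap().iter().sum();
                    *merged.lock().unwrap() = sum;
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    let result = *merged.lock().unwrap();
    result
}

fn main() {
    assert_eq!(phased_merge(4), 10); // 1 + 2 + 3 + 4
    println!("merged: {}", phased_merge(4));
}
```

The barrier plays the latch role: it is safe for the leader to read partials because wait() cannot return until every thread's phase-1 push has completed.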

    OCaml Approach

  • No built-in barrier — simulate one with Mutex + Condition + a generation counter
  • The generation counter prevents spurious-wakeup confusion across rounds
  • Condition.broadcast wakes all waiting threads simultaneously
  • Reusable: increment the generation and reset the count atomically

    Rust Approach

  • std::sync::Barrier::new(n) — built-in, no boilerplate
  • barrier.wait() blocks until n threads have called it
  • wait() returns a BarrierWaitResult; is_leader() is true for exactly one thread
  • Automatically resets — reusable for multiple rounds
  • Thread-safe by design

    Comparison Table

    | Concept               | OCaml (simulated)                | Rust                              |
    |-----------------------|----------------------------------|-----------------------------------|
    | Create                | Manual struct with mutex+condvar | Barrier::new(n)                   |
    | Wait at barrier       | barrier_wait b                   | barrier.wait()                    |
    | Leader detection      | Not built-in                     | result.is_leader()                |
    | Reuse after trigger   | Manual generation counter        | Automatic                         |
    | Prevent spurious wake | while b.generation = gen loop    | Handled internally                |
    | Wake mechanism        | Condition.broadcast              | Internal (implementation-defined) |
    | Stdlib                | No                               | Yes (std::sync::Barrier)          |

    std vs tokio

    | Aspect          | std version                   | tokio version                        |
    |-----------------|-------------------------------|--------------------------------------|
    | Runtime         | OS threads via std::thread    | Async tasks on the tokio runtime     |
    | Synchronization | std::sync::Mutex, Condvar     | tokio::sync::Mutex, channels         |
    | Channels        | std::sync::mpsc (unbounded)   | tokio::sync::mpsc (bounded, async)   |
    | Blocking        | Thread blocks on lock/recv    | Task yields; runtime switches tasks  |
    | Overhead        | One OS thread per task        | Many tasks per thread (M:N)          |
    | Best for        | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers  |

    Exercises

  • Use the is_leader() result to have exactly one thread print "phase barrier reached" between phases.
  • Implement a reusable barrier (can wait multiple times for multiple phases) and verify phase isolation.
  • Build a parallel word count: Phase 1 = each thread counts its chunk, barrier, Phase 2 = one thread merges.
  • Implement a timeout_barrier that releases all threads if one hasn't arrived within a Duration.
  • Demonstrate that removing the barrier causes Phase 2 to start before Phase 1 completes on some threads.