458 Advanced

458: Barrier Synchronization

Functional Programming

The Problem

Parallel algorithms often run in phases: every thread must complete phase 1 before any thread begins phase 2. A barrier is a synchronization point that blocks each arriving thread until all N threads have reached it, then releases them together. Without a barrier, fast threads would start phase 2 while slow threads are still in phase 1, and would read partially computed data. Barriers separate the phases of parallel algorithms in scientific computing (iterative solvers, particle simulations), the stages of data pipelines, and the rounds of thread-based simulations of distributed protocols.

std::sync::Barrier is used in parallel numerical solvers, image processing pipelines, distributed consensus protocols (simulated with threads), and test synchronization.
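
To make the phase problem concrete, here is a minimal sketch. The function name two_phase and the slot layout are illustrative assumptions, not part of the original example. Each thread writes its own slot in phase 1 and reads its right neighbor's slot in phase 2; the barrier between the two phases is what guarantees the neighbor's value has already been written.

```rust
use std::sync::{Barrier, Mutex};
use std::thread;

/// Hypothetical two-phase computation on `n` threads.
/// Phase 1: thread `i` stores `(i + 1) * 10` in its own slot.
/// Phase 2: thread `i` adds its own slot to its right neighbor's slot.
fn two_phase(n: usize) -> Vec<u64> {
    let barrier = Barrier::new(n);
    let slots: Vec<Mutex<u64>> = (0..n).map(|_| Mutex::new(0)).collect();
    let mut out = vec![0u64; n];

    thread::scope(|s| {
        for (i, result) in out.iter_mut().enumerate() {
            let barrier = &barrier;
            let slots = &slots;
            s.spawn(move || {
                // Phase 1: write only this thread's slot.
                *slots[i].lock().unwrap() = (i as u64 + 1) * 10;

                // No thread passes this point until all n have finished phase 1.
                barrier.wait();

                // Phase 2: reading a neighbor's slot is now safe.
                let own = *slots[i].lock().unwrap();
                let right = *slots[(i + 1) % n].lock().unwrap();
                *result = own + right;
            });
        }
    });
    out
}
```

With four threads the slots hold 10, 20, 30, 40 after phase 1, so two_phase(4) returns [30, 50, 70, 50]. If the barrier.wait() call were removed, a fast thread could read a neighbor's slot while it still held 0.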

🎯 Learning Outcomes

  • Understand how Barrier::new(n) creates a synchronization point for n threads
  • Learn how barrier.wait() blocks until all n threads have called it
  • See how BarrierWaitResult::is_leader() identifies exactly one thread per barrier crossing
  • Understand barrier phases: a barrier can be reused to synchronize multiple phases
  • Learn when a barrier (all-to-all: every thread waits for every other thread) is appropriate vs. join (all-to-one: one thread waits for the others to finish)
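
The reuse and leader-election points above can be sketched together. This is a minimal illustration, assuming a hypothetical helper named count_leaders: the same Barrier is waited on once per phase, and each crossing should elect exactly one leader.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Barrier;
use std::thread;

/// Hypothetical helper: runs `phases` rounds on `n` threads and returns
/// the total number of times some thread was reported as leader.
fn count_leaders(n: usize, phases: usize) -> usize {
    let barrier = Barrier::new(n);
    let leaders = AtomicUsize::new(0);

    thread::scope(|s| {
        for _ in 0..n {
            s.spawn(|| {
                for _ in 0..phases {
                    // Per-phase work would go here.

                    // The same barrier is reused for every phase; it resets
                    // itself once all n threads have been released.
                    if barrier.wait().is_leader() {
                        // Exactly one thread per crossing takes this branch.
                        leaders.fetch_add(1, Ordering::SeqCst);
                    }
                }
            });
        }
    });
    leaders.load(Ordering::SeqCst)
}
```

For n >= 1 the result equals the number of phases, for example count_leaders(4, 3) returns 3. The scoped threads are still joined when thread::scope returns, which is the all-to-one step; the barrier handles the all-to-all steps in between.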

Code Example

    #![allow(clippy::all)]
    // 458. Barrier for thread synchronization
    
    use std::sync::{Arc, Barrier};
    use std::thread;
    
    #[cfg(test)]
    mod tests {
        use super::*;
        use std::sync::atomic::{AtomicUsize, Ordering};
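        // Every thread increments the counter before waiting, so once the
        // barrier releases, each thread must observe all n increments.
        #[test]
        fn test_all_arrive() {
            let n = 4;
            let b = Arc::new(Barrier::new(n));
            let c = Arc::new(AtomicUsize::new(0));
            thread::scope(|s| {
                for _ in 0..n {
                    let b = Arc::clone(&b);
                    let c = Arc::clone(&c);
                    s.spawn(move || {
                        // Phase 1: record this thread's arrival.
                        c.fetch_add(1, Ordering::SeqCst);
                        // Block until all n threads have called wait().
                        b.wait();
                        // Phase 2: every arrival is now visible.
                        assert_eq!(c.load(Ordering::SeqCst), n);
                    });
                }
            });
        }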
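        // Exactly one of the n waiting threads is reported as the leader.
        #[test]
        fn test_one_leader() {
            let n = 5;
            let b = Arc::new(Barrier::new(n));
            let leaders = Arc::new(AtomicUsize::new(0));
            thread::scope(|s| {
                for _ in 0..n {
                    let b = Arc::clone(&b);
                    let l = Arc::clone(&leaders);
                    s.spawn(move || {
                        // wait() returns a BarrierWaitResult; only the leader counts itself.
                        if b.wait().is_leader() {
                            l.fetch_add(1, Ordering::SeqCst);
                        }
                    });
                }
            });
            // All scoped threads have been joined here, so the count is final.
            assert_eq!(leaders.load(Ordering::SeqCst), 1);
        }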
    }

Key Differences

  • Built-in: Rust ships std::sync::Barrier in the standard library; OCaml requires a manual implementation built from Mutex and Condition.
  • Reusability: Rust's Barrier resets automatically and can be reused across multiple phases; a manual OCaml implementation must reset its counter explicitly.
  • Leader selection: Rust's BarrierWaitResult::is_leader() identifies exactly one thread per crossing; an OCaml implementation would need to track this explicitly.
  • Phase alignment: Rust's barrier aligns parallel phases directly; OCaml code built on Domainslib typically coordinates phases with Task.async/Task.await chains instead.

OCaml Approach

OCaml's standard library has no built-in barrier type. A barrier can be built from a Mutex, a Condition variable, and a counter: each arriving thread increments the counter under the mutex and waits on the condition variable until the count reaches N, and the last thread to arrive calls Condition.broadcast to release the others. On OCaml 5, the Domainslib library's Task.async/Task.await provides structured synchronization without manual barriers. Thread.join is sufficient for one-shot synchronization.
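
The counter scheme described above can be sketched in Rust, to keep one language throughout, using Mutex and Condvar where OCaml would use Mutex and Condition. ManualBarrier, its fields, and manual_leader_count are hypothetical names for illustration, and the generation counter is an added assumption that makes the barrier reusable without an explicit reset call. In real Rust code std::sync::Barrier is the better choice.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;

/// State protected by the mutex.
struct State {
    arrived: usize,    // threads that have reached the barrier this round
    generation: usize, // bumped each time the barrier releases
}

/// Hypothetical hand-rolled barrier (illustration only).
struct ManualBarrier {
    state: Mutex<State>,
    cvar: Condvar,
    n: usize,
}

impl ManualBarrier {
    fn new(n: usize) -> Self {
        ManualBarrier {
            state: Mutex::new(State { arrived: 0, generation: 0 }),
            cvar: Condvar::new(),
            n,
        }
    }

    /// Blocks until `n` threads have called `wait`.
    /// Returns true for the last thread to arrive.
    fn wait(&self) -> bool {
        let mut st = self.state.lock().unwrap();
        let my_generation = st.generation;
        st.arrived += 1;
        if st.arrived < self.n {
            // Sleep until the last arrival bumps the generation. wait_while
            // re-checks the predicate, so spurious wakeups are harmless.
            let _guard = self
                .cvar
                .wait_while(st, |s| s.generation == my_generation)
                .unwrap();
            false
        } else {
            // Last arrival: reset the counter, start a new round, wake everyone.
            st.arrived = 0;
            st.generation = st.generation.wrapping_add(1);
            self.cvar.notify_all();
            true
        }
    }
}

/// Hypothetical driver: counts how many times `wait` returned true.
fn manual_leader_count(n: usize, phases: usize) -> usize {
    let barrier = ManualBarrier::new(n);
    let leaders = AtomicUsize::new(0);
    thread::scope(|s| {
        for _ in 0..n {
            s.spawn(|| {
                for _ in 0..phases {
                    if barrier.wait() {
                        leaders.fetch_add(1, Ordering::SeqCst);
                    }
                }
            });
        }
    });
    leaders.load(Ordering::SeqCst)
}
```

The generation check is what lets a fast thread re-enter wait for the next phase while slower threads from the previous phase are still waking up: late wakers compare against the generation they arrived in, not the current count.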

Exercises

  • Two-phase computation: Use a barrier to implement Gaussian elimination in two phases. In phase 1, all threads compute row reductions; after the barrier, in phase 2, all threads back-substitute. Verify that the result matches sequential elimination.
  • Leader action: Use is_leader() to have exactly one thread write intermediate results to a file between phases. All other threads continue with phase 2 while the leader writes.
  • Barrier timeout: Implement a TimedBarrier that acts like Barrier but returns an error if not all threads arrive within a timeout. Use Condvar::wait_timeout_while internally.