991 Barrier Sync
Tutorial
The Problem
Demonstrate std::sync::Barrier — a synchronization primitive that makes N threads wait at a point until all N have arrived, then releases all simultaneously. This enables phased computation: Phase 1 runs independently across all threads, then all synchronize at the barrier, then Phase 2 starts simultaneously in all threads.
🎯 Learning Outcomes
- Barrier::new(n), shared across threads via Arc::clone
- barrier.wait() — blocks until all n threads have called wait, then all are released
- BarrierWaitResult::is_leader() returns true for exactly one thread — useful for single-threaded post-phase work
- Equivalents elsewhere: Java's CountDownLatch, and OCaml's manual Mutex + Condvar + counter
Code Example
#![allow(clippy::all)]
// 991: Barrier Synchronization
// Rust: std::sync::Barrier — wait until N threads all arrive
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
// --- Approach 1: Simple barrier — all threads synchronize at one point ---
fn barrier_demo() -> (Vec<String>, Vec<String>) {
let n = 5;
let barrier = Arc::new(Barrier::new(n));
let phase1_log = Arc::new(std::sync::Mutex::new(Vec::new()));
let phase2_log = Arc::new(std::sync::Mutex::new(Vec::new()));
let handles: Vec<_> = (0..n)
.map(|i| {
let barrier = Arc::clone(&barrier);
let p1 = Arc::clone(&phase1_log);
let p2 = Arc::clone(&phase2_log);
thread::spawn(move || {
// Phase 1: independent work
thread::sleep(Duration::from_millis(i as u64 * 2));
p1.lock().unwrap().push(format!("p1:{}", i));
// BARRIER — blocks until all N threads arrive
barrier.wait();
// Phase 2: all start together after barrier
p2.lock().unwrap().push(format!("p2:{}", i));
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let p1 = phase1_log.lock().unwrap().clone();
let p2 = phase2_log.lock().unwrap().clone();
(p1, p2)
}
// --- Approach 2: Detect the "leader" (the last thread to arrive) ---
fn barrier_with_leader() -> Vec<bool> {
let n = 4;
let barrier = Arc::new(Barrier::new(n));
let is_leader = Arc::new(std::sync::Mutex::new(Vec::new()));
let handles: Vec<_> = (0..n)
.map(|_| {
let barrier = Arc::clone(&barrier);
let leaders = Arc::clone(&is_leader);
thread::spawn(move || {
let result = barrier.wait();
// BarrierWaitResult::is_leader() is true for exactly one thread
leaders.lock().unwrap().push(result.is_leader());
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let x = is_leader.lock().unwrap().clone();
x
}
// --- Approach 3: Reusable barrier across multiple rounds ---
fn multi_round_barrier() -> Vec<usize> {
let n = 3;
let barrier = Arc::new(Barrier::new(n));
let counts = Arc::new(std::sync::Mutex::new(vec![0usize; 2]));
let handles: Vec<_> = (0..n)
.map(|_| {
let barrier = Arc::clone(&barrier);
let counts = Arc::clone(&counts);
thread::spawn(move || {
for round in 0..2 {
counts.lock().unwrap()[round] += 1;
barrier.wait(); // resets automatically after all arrive
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let x = counts.lock().unwrap().clone();
x
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_barrier_both_phases_complete() {
let (p1, p2) = barrier_demo();
assert_eq!(p1.len(), 5);
assert_eq!(p2.len(), 5);
}
#[test]
fn test_exactly_one_leader() {
let leaders = barrier_with_leader();
assert_eq!(leaders.len(), 4);
assert_eq!(leaders.iter().filter(|&&b| b).count(), 1);
}
#[test]
fn test_reusable_barrier() {
let rounds = multi_round_barrier();
assert_eq!(rounds, vec![3, 3]); // all 3 threads counted in each round
}
#[test]
fn test_barrier_new() {
// Barrier of 1 passes immediately
let b = Barrier::new(1);
let result = b.wait();
assert!(result.is_leader());
}
#[test]
fn test_barrier_synchronizes_ordering() {
// Ensure no thread reaches phase2 before all finish phase1
let n = 4;
let barrier = Arc::new(Barrier::new(n));
let phase1_done = Arc::new(std::sync::Mutex::new(0usize));
let error = Arc::new(std::sync::Mutex::new(false));
let handles: Vec<_> = (0..n)
.map(|_| {
let b = Arc::clone(&barrier);
let done = Arc::clone(&phase1_done);
let err = Arc::clone(&error);
thread::spawn(move || {
*done.lock().unwrap() += 1;
b.wait();
// After barrier, all must have finished phase1
if *done.lock().unwrap() != n {
*err.lock().unwrap() = true;
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert!(!*error.lock().unwrap());
}
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Built-in barrier | std::sync::Barrier | No stdlib; manual with Mutex + Condition |
| Leader election | is_leader() on BarrierWaitResult | Manual with first/last thread logic |
| Broadcast wakeup | Internally uses notify_all | Condition.broadcast explicitly |
| Generation counter | Internal (handles reuse) | Manual generation field |
Barriers enable clean phased parallel algorithms. They are used in parallel BFS (synchronize after each level), parallel matrix operations (synchronize after each row/column pass), and multi-stage simulation.
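In the BFS case, the leader result pairs naturally with the barrier: between levels, exactly one thread can do the single-threaded bookkeeping (e.g. swapping the current/next frontier) while the others wait. The sketch below shows that shape; `leader_does_bookkeeping` and its counter are hypothetical names, not part of the tutorial's code.

```rust
// Hypothetical sketch: between rounds, exactly one thread (the barrier
// "leader") runs a single-threaded section, as in level-synchronized BFS
// where one thread swaps the frontier buffers.
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

fn leader_does_bookkeeping(threads: usize, levels: usize) -> usize {
    let barrier = Arc::new(Barrier::new(threads));
    let swaps = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let barrier = Arc::clone(&barrier);
            let swaps = Arc::clone(&swaps);
            thread::spawn(move || {
                for _level in 0..levels {
                    // ... each thread would expand its slice of the frontier here ...
                    if barrier.wait().is_leader() {
                        // Exactly one thread per round enters this section.
                        *swaps.lock().unwrap() += 1;
                    }
                    // Second barrier: nobody starts the next level until the
                    // leader has finished its bookkeeping.
                    barrier.wait();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let x = *swaps.lock().unwrap();
    x
}
```

The two-barrier pattern (arrive, leader works, depart) is the usual way to get a serial section between parallel phases without a separate lock-step protocol.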
OCaml Approach
type barrier = {
target: int;
mutable count: int;
mutex: Mutex.t;
cond: Condition.t;
mutable generation: int;
}
let create n = { target = n; count = 0; mutex = Mutex.create ();
cond = Condition.create (); generation = 0 }
let wait b =
Mutex.lock b.mutex;
let gen = b.generation in
b.count <- b.count + 1;
if b.count = b.target then begin
b.count <- 0;
b.generation <- gen + 1;
Condition.broadcast b.cond
end else begin
while b.generation = gen do
Condition.wait b.cond b.mutex
done
end;
Mutex.unlock b.mutex
OCaml's barrier is implemented manually with Mutex + Condition + a generation counter. The generation counter prevents spurious wakeups from releasing threads before all arrive, and lets the barrier be reused across rounds. Condition.broadcast wakes all waiting threads at once — unlike Condition.signal, which wakes only one.
Full Source
#![allow(clippy::all)]
// 991: Barrier Synchronization
// Rust: std::sync::Barrier — wait until N threads all arrive
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
// --- Approach 1: Simple barrier — all threads synchronize at one point ---
fn barrier_demo() -> (Vec<String>, Vec<String>) {
let n = 5;
let barrier = Arc::new(Barrier::new(n));
let phase1_log = Arc::new(std::sync::Mutex::new(Vec::new()));
let phase2_log = Arc::new(std::sync::Mutex::new(Vec::new()));
let handles: Vec<_> = (0..n)
.map(|i| {
let barrier = Arc::clone(&barrier);
let p1 = Arc::clone(&phase1_log);
let p2 = Arc::clone(&phase2_log);
thread::spawn(move || {
// Phase 1: independent work
thread::sleep(Duration::from_millis(i as u64 * 2));
p1.lock().unwrap().push(format!("p1:{}", i));
// BARRIER — blocks until all N threads arrive
barrier.wait();
// Phase 2: all start together after barrier
p2.lock().unwrap().push(format!("p2:{}", i));
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let p1 = phase1_log.lock().unwrap().clone();
let p2 = phase2_log.lock().unwrap().clone();
(p1, p2)
}
// --- Approach 2: Detect the "leader" (the last thread to arrive) ---
fn barrier_with_leader() -> Vec<bool> {
let n = 4;
let barrier = Arc::new(Barrier::new(n));
let is_leader = Arc::new(std::sync::Mutex::new(Vec::new()));
let handles: Vec<_> = (0..n)
.map(|_| {
let barrier = Arc::clone(&barrier);
let leaders = Arc::clone(&is_leader);
thread::spawn(move || {
let result = barrier.wait();
// BarrierWaitResult::is_leader() is true for exactly one thread
leaders.lock().unwrap().push(result.is_leader());
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let x = is_leader.lock().unwrap().clone();
x
}
// --- Approach 3: Reusable barrier across multiple rounds ---
fn multi_round_barrier() -> Vec<usize> {
let n = 3;
let barrier = Arc::new(Barrier::new(n));
let counts = Arc::new(std::sync::Mutex::new(vec![0usize; 2]));
let handles: Vec<_> = (0..n)
.map(|_| {
let barrier = Arc::clone(&barrier);
let counts = Arc::clone(&counts);
thread::spawn(move || {
for round in 0..2 {
counts.lock().unwrap()[round] += 1;
barrier.wait(); // resets automatically after all arrive
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let x = counts.lock().unwrap().clone();
x
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_barrier_both_phases_complete() {
let (p1, p2) = barrier_demo();
assert_eq!(p1.len(), 5);
assert_eq!(p2.len(), 5);
}
#[test]
fn test_exactly_one_leader() {
let leaders = barrier_with_leader();
assert_eq!(leaders.len(), 4);
assert_eq!(leaders.iter().filter(|&&b| b).count(), 1);
}
#[test]
fn test_reusable_barrier() {
let rounds = multi_round_barrier();
assert_eq!(rounds, vec![3, 3]); // all 3 threads counted in each round
}
#[test]
fn test_barrier_new() {
// Barrier of 1 passes immediately
let b = Barrier::new(1);
let result = b.wait();
assert!(result.is_leader());
}
#[test]
fn test_barrier_synchronizes_ordering() {
// Ensure no thread reaches phase2 before all finish phase1
let n = 4;
let barrier = Arc::new(Barrier::new(n));
let phase1_done = Arc::new(std::sync::Mutex::new(0usize));
let error = Arc::new(std::sync::Mutex::new(false));
let handles: Vec<_> = (0..n)
.map(|_| {
let b = Arc::clone(&barrier);
let done = Arc::clone(&phase1_done);
let err = Arc::clone(&error);
thread::spawn(move || {
*done.lock().unwrap() += 1;
b.wait();
// After barrier, all must have finished phase1
if *done.lock().unwrap() != n {
*err.lock().unwrap() = true;
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert!(!*error.lock().unwrap());
}
}
Deep Comparison
Barrier Synchronization — Comparison
Core Insight
A barrier is a collective synchronization point — like a countdown latch where N threads each decrement the count and all are released when it hits zero, except that a barrier resets itself for reuse. It is used in parallel algorithms where every thread must finish one phase before any thread begins the next.
OCaml Approach
- Mutex + Condition + generation counter
- The generation counter prevents spurious-wakeup confusion across rounds
- Condition.broadcast wakes all waiting threads simultaneously
- The generation bump and count reset happen atomically under the mutex
Rust Approach
- std::sync::Barrier::new(n) — built-in, no boilerplate
- barrier.wait() blocks until n threads have called it
- BarrierWaitResult — .is_leader() is true for exactly one thread
Comparison Table
| Concept | OCaml (simulated) | Rust |
|---|---|---|
| Create | Manual struct with mutex+condvar | Barrier::new(n) |
| Wait at barrier | barrier_wait b | barrier.wait() |
| Leader detection | Not built-in | result.is_leader() |
| Reuse after trigger | Manual generation counter | Automatic |
| Prevent spurious wake | while gen = b.generation | Handled internally |
| Wake mechanism | Condition.broadcast | Internal (implementation-defined) |
| Stdlib | No | Yes (std::sync::Barrier) |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (channel unbounded, sync_channel bounded) | tokio::sync::mpsc (bounded, async; unbounded variant available) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
1. Use the is_leader() result to have exactly one thread print "phase barrier reached" between phases.
2. Extend the demo to additional phases (wait multiple times for multiple phases) and verify phase isolation.
3. Implement a timeout_barrier that releases all threads if one hasn't arrived within a Duration.