458: Barrier Synchronization
Tutorial
The Problem
Parallel algorithms often run in phases: every thread must finish phase 1 before any thread begins phase 2. A barrier is a synchronization point that guarantees all N threads have reached it before any of them proceeds. Without a barrier, fast threads would start phase 2 while slow threads are still in phase 1, reading partially computed data. Barriers separate the phases of scientific codes (iterative solvers, particle simulations) and data pipeline stages, and they appear in distributed-systems consensus.
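A minimal sketch of the phase problem, assuming a hypothetical neighbour_values function. In phase 1 each thread writes its own slot; in phase 2 it reads the slot written by its right-hand neighbour. The wait() call is what guarantees the neighbour's write has already happened; without it, a thread could read the initial 0.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Barrier;
use std::thread;

/// Hypothetical two-phase computation on `n` threads.
/// Phase 1: thread `i` writes `(i + 1) * 10` into its own slot.
/// Phase 2: thread `i` reads the slot of its right-hand neighbour (wrapping).
fn neighbour_values(n: usize) -> Vec<u64> {
    let slots: Vec<AtomicU64> = (0..n).map(|_| AtomicU64::new(0)).collect();
    let results: Vec<AtomicU64> = (0..n).map(|_| AtomicU64::new(0)).collect();
    let barrier = Barrier::new(n);

    thread::scope(|s| {
        for i in 0..n {
            // Borrow the shared data; scoped threads make Arc unnecessary here.
            let (slots, results, barrier) = (&slots, &results, &barrier);
            s.spawn(move || {
                // Phase 1: produce this thread's value.
                slots[i].store((i as u64 + 1) * 10, Ordering::SeqCst);
                // No thread gets past this line until all n have finished phase 1.
                barrier.wait();
                // Phase 2: consume a value produced by a different thread.
                let right = slots[(i + 1) % n].load(Ordering::SeqCst);
                results[i].store(right, Ordering::SeqCst);
            });
        }
    });

    results.into_iter().map(AtomicU64::into_inner).collect()
}

fn main() {
    // Each thread reports its neighbour's phase-1 value.
    println!("{:?}", neighbour_values(4)); // [20, 30, 40, 10]
}
```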
std::sync::Barrier is used in parallel numerical solvers, image processing pipelines, distributed consensus protocols (simulated with threads), and test synchronization.
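Iterative solvers usually repeat the same phase many times. The sketch below assumes a hypothetical phased_totals function: one Barrier is reused for every round, and the thread for which is_leader() returns true does the per-round bookkeeping (here, recording and resetting a shared sum).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Barrier, Mutex};
use std::thread;

/// Hypothetical iterative computation: `rounds` phases on `n` threads,
/// all synchronized by ONE reusable Barrier.
/// Each phase, thread `i` adds `i + 1` to a shared accumulator; the leader of the
/// crossing then records the phase total and resets the accumulator.
/// Returns (per-phase totals, number of times the leader branch ran).
fn phased_totals(n: usize, rounds: usize) -> (Vec<usize>, usize) {
    let barrier = Barrier::new(n);
    let acc = AtomicUsize::new(0);
    let history = Mutex::new(Vec::new());
    let leader_runs = AtomicUsize::new(0);

    thread::scope(|s| {
        for i in 0..n {
            let (barrier, acc, history, leader_runs) =
                (&barrier, &acc, &history, &leader_runs);
            s.spawn(move || {
                for _ in 0..rounds {
                    // Phase work: contribute to this round's total.
                    acc.fetch_add(i + 1, Ordering::SeqCst);
                    // Crossing 1: all contributions are in; one thread is the leader.
                    if barrier.wait().is_leader() {
                        leader_runs.fetch_add(1, Ordering::SeqCst);
                        history.lock().unwrap().push(acc.swap(0, Ordering::SeqCst));
                    }
                    // Crossing 2: nobody starts the next round before the reset.
                    barrier.wait();
                }
            });
        }
    });

    (history.into_inner().unwrap(), leader_runs.into_inner())
}

fn main() {
    println!("{:?}", phased_totals(4, 3)); // ([10, 10, 10], 3)
}
```

The second wait() is deliberate: without it, a fast thread could add its next-round contribution before the leader swaps the accumulator back to 0, and that contribution would be counted in the wrong round.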
🎯 Learning Outcomes
- Barrier::new(n) creates a synchronization point for n threads
- barrier.wait() blocks until all n threads have called it
- BarrierWaitResult::is_leader() identifies one thread per barrier crossing
- How a barrier (all-to-all) differs from join (all-to-one)
Code Example
#![allow(clippy::all)]
// 458. Barrier for thread synchronization
use std::sync::{Arc, Barrier};
use std::thread;

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn test_all_arrive() {
        let n = 4;
        let b = Arc::new(Barrier::new(n));
        let c = Arc::new(AtomicUsize::new(0));
        thread::scope(|s| {
            for _ in 0..n {
                let b = Arc::clone(&b);
                let c = Arc::clone(&c);
                s.spawn(move || {
                    c.fetch_add(1, Ordering::SeqCst);
                    b.wait();
                    // Every thread incremented the counter before any got past wait().
                    assert_eq!(c.load(Ordering::SeqCst), n);
                });
            }
        });
    }

    #[test]
    fn test_one_leader() {
        let n = 5;
        let b = Arc::new(Barrier::new(n));
        let leaders = Arc::new(AtomicUsize::new(0));
        thread::scope(|s| {
            for _ in 0..n {
                let b = Arc::clone(&b);
                let l = Arc::clone(&leaders);
                s.spawn(move || {
                    if b.wait().is_leader() {
                        l.fetch_add(1, Ordering::SeqCst);
                    }
                });
            }
        });
        // Exactly one of the n threads is reported as the leader for this crossing.
        assert_eq!(leaders.load(Ordering::SeqCst), 1);
    }
}
Key Differences
- std::sync::Barrier is in the standard library; OCaml requires a manual implementation with Mutex + Condition.
- Barrier can be reused across multiple phases; OCaml's manual implementation needs explicit reset logic.
- BarrierWaitResult::is_leader() identifies one thread per crossing; OCaml's implementation would need explicit tracking.
- OCaml 5.x code can instead use Domainslib Task.await chains for structured coordination.
OCaml Approach
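OCaml doesn't have a built-in Barrier type. A barrier is implemented with a Mutex, a Condition variable, and a counter: each thread increments the counter under the mutex; the thread that brings the count to N calls Condition.broadcast, while the others wait on the condition variable until the count reaches N. OCaml 5.x's Domainslib.Task.async/await provides structured synchronization without manual barriers. Thread.join only covers one-shot, all-to-one synchronization: the joining thread waits for the workers to finish, but the workers cannot wait for each other and then continue.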
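For comparison, here is the same Mutex + condition-variable recipe sketched in Rust (in OCaml, the Mutex and Condition modules play these roles). ManualBarrier and count_leaders are hypothetical names, not part of std. The sketch assumes a generation counter to make the barrier reusable and reports the last arrival as the leader; std::sync::Barrier already provides all of this, so the code is only meant to show what the OCaml version has to build by hand.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical hand-rolled barrier: a mutex-protected counter plus a condition variable.
struct ManualBarrier {
    state: Mutex<State>,
    cvar: Condvar,
    n: usize,
}

struct State {
    count: usize,    // threads that have arrived in the current generation
    generation: u64, // bumped each time the barrier opens, which makes it reusable
}

impl ManualBarrier {
    fn new(n: usize) -> Self {
        ManualBarrier {
            state: Mutex::new(State { count: 0, generation: 0 }),
            cvar: Condvar::new(),
            n,
        }
    }

    /// Blocks until `n` threads have called `wait`; returns true for the last arrival only.
    fn wait(&self) -> bool {
        let mut st = self.state.lock().unwrap();
        let arrival_gen = st.generation;
        st.count += 1;
        if st.count == self.n {
            // Last thread in: reset for the next phase, then wake everyone (broadcast).
            st.count = 0;
            st.generation += 1;
            self.cvar.notify_all();
            true
        } else {
            // wait_while re-checks the predicate, so spurious wakeups are harmless.
            let _st = self
                .cvar
                .wait_while(st, |s| s.generation == arrival_gen)
                .unwrap();
            false
        }
    }
}

/// Runs `rounds` crossings on `n` threads and counts how many leaders were reported.
fn count_leaders(n: usize, rounds: usize) -> usize {
    let barrier = Arc::new(ManualBarrier::new(n));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let b = Arc::clone(&barrier);
            thread::spawn(move || (0..rounds).filter(|_| b.wait()).count())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    println!("{}", count_leaders(4, 3)); // 3: exactly one leader per crossing
}
```

Waiting on the generation rather than on the count is the design choice that allows reuse: a fast thread may already be incrementing the count for the next phase while slower threads are still waking up from the previous one.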
Exercises
- Use is_leader() to have exactly one thread write intermediate results to a file between phases. All other threads continue with phase 2 while the leader writes.
- Implement a TimedBarrier that acts like Barrier but returns an error if not all threads arrive within a timeout. Use Condvar::wait_timeout_while internally.
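One possible starting point for the TimedBarrier exercise, sketched under explicit assumptions: TimedBarrier, BarrierTimeout, and wait_timeout are hypothetical names, and a thread that times out withdraws its own arrival instead of breaking the barrier for everyone. std's Barrier has no timeout, so this policy is a design choice, not std behaviour.

```rust
use std::sync::{Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical error returned when the barrier does not fill up in time.
#[derive(Debug, PartialEq)]
struct BarrierTimeout;

/// Hypothetical barrier whose wait gives up after a timeout.
struct TimedBarrier {
    state: Mutex<(usize, u64)>, // (arrivals in current generation, generation)
    cvar: Condvar,
    n: usize,
}

impl TimedBarrier {
    fn new(n: usize) -> Self {
        TimedBarrier { state: Mutex::new((0, 0)), cvar: Condvar::new(), n }
    }

    /// Ok(true) for the last arrival, Ok(false) for the others, Err on timeout.
    fn wait_timeout(&self, dur: Duration) -> Result<bool, BarrierTimeout> {
        let mut st = self.state.lock().unwrap();
        let arrival_gen = st.1;
        st.0 += 1;
        if st.0 == self.n {
            // Last arrival: open the barrier for this generation.
            st.0 = 0;
            st.1 += 1;
            self.cvar.notify_all();
            return Ok(true);
        }
        // Sleep until the generation changes or `dur` elapses.
        let (mut st, res) = self
            .cvar
            .wait_timeout_while(st, dur, |s| s.1 == arrival_gen)
            .unwrap();
        if res.timed_out() {
            // Assumed policy: withdraw this arrival so the barrier stays usable.
            st.0 -= 1;
            Err(BarrierTimeout)
        } else {
            Ok(false)
        }
    }
}

fn main() {
    // A lone thread on a 2-party barrier times out.
    let lonely = TimedBarrier::new(2);
    println!("{:?}", lonely.wait_timeout(Duration::from_millis(20))); // Err(BarrierTimeout)

    // Two threads meet well within the timeout; exactly one is the leader.
    let pair = TimedBarrier::new(2);
    let results = thread::scope(|s| {
        let a = s.spawn(|| pair.wait_timeout(Duration::from_secs(5)));
        let b = s.spawn(|| pair.wait_timeout(Duration::from_secs(5)));
        [a.join().unwrap(), b.join().unwrap()]
    });
    println!("{:?}", results);
}
```

With the withdraw policy, threads that are still waiting keep waiting for a full set of arrivals; an alternative is to mark the barrier as broken and wake every waiter with an error.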