452: Atomic Types — Lock-Free Concurrent Primitives
Tutorial Video
Text description (accessibility)
This video demonstrates the "452: Atomic Types — Lock-Free Concurrent Primitives" functional Rust example. Difficulty level: Advanced. Key concepts covered: Functional Programming. Mutex-based synchronization has overhead: kernel entry, thread scheduling, potential contention. Key difference from OCaml: 1. **Ordering control**: Rust exposes `Ordering` explicitly enabling optimization; OCaml 5.x's atomics use sequential consistency without user control.
Tutorial
The Problem
Mutex-based synchronization has overhead: kernel entry, thread scheduling, potential contention. For simple operations on single values — incrementing a counter, setting a flag, tracking a maximum — atomic hardware instructions provide the same guarantee without a lock. Modern CPUs support atomic read-modify-write operations (fetch_add, compare_and_swap) that execute atomically from all other cores' perspectives. Rust's std::sync::atomic module exposes these directly.
Atomics power reference counting (Arc's inner counter), lock-free data structures, metrics counters, progress tracking, and any concurrent primitive where one CPU instruction is sufficient.
🎯 Learning Outcomes
Ordering parameter: SeqCst, Acquire, Release, Relaxed and when each is appropriateAtomicCounter, AtomicBool, and AtomicUsize enable lock-free concurrent primitivesArc's reference counting uses AtomicUsize not Mutex<usize>Code Example
let counter = AtomicUsize::new(0);
counter.fetch_add(1, Ordering::SeqCst);Key Differences
Ordering explicitly enabling optimization; OCaml 5.x's atomics use sequential consistency without user control.AtomicBool, AtomicI8/U8 through AtomicI64/U64, AtomicIsize/Usize, AtomicPtr; OCaml has a single 'a Atomic.t.Relaxed ordering is the cheapest; OCaml's fixed SeqCst has consistent but potentially higher cost.Arc uses AtomicUsize for reference counting directly; OCaml's GC handles reference counting transparently.OCaml Approach
OCaml 5.x provides Atomic.make, Atomic.get, Atomic.set, Atomic.compare_and_set, and Atomic.fetch_and_add for atomic operations. OCaml 4.x doesn't need atomics for reference counting since the GIL handles it. The Atomic.t type in OCaml 5.x is similar to Rust's AtomicT types but without explicit ordering specification — OCaml uses sequentially consistent semantics by default.
Full Source
#![allow(clippy::all)]
//! # Atomic Types — Lock-Free Concurrent Primitives
//!
//! Use atomic types for lock-free operations on shared data.
//! Atomics provide thread-safe operations without locks.
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
/// Approach 1: Atomic counter
pub struct AtomicCounter {
value: AtomicUsize,
}
impl AtomicCounter {
pub fn new(initial: usize) -> Self {
Self {
value: AtomicUsize::new(initial),
}
}
pub fn increment(&self) -> usize {
self.value.fetch_add(1, Ordering::SeqCst)
}
pub fn decrement(&self) -> usize {
self.value.fetch_sub(1, Ordering::SeqCst)
}
pub fn get(&self) -> usize {
self.value.load(Ordering::SeqCst)
}
pub fn set(&self, value: usize) {
self.value.store(value, Ordering::SeqCst);
}
}
/// Approach 2: Atomic flag for signaling
pub struct AtomicFlag {
flag: AtomicBool,
}
impl AtomicFlag {
pub fn new() -> Self {
Self {
flag: AtomicBool::new(false),
}
}
pub fn set(&self) {
self.flag.store(true, Ordering::Release);
}
pub fn clear(&self) {
self.flag.store(false, Ordering::Release);
}
pub fn is_set(&self) -> bool {
self.flag.load(Ordering::Acquire)
}
/// Test and set: returns previous value
pub fn test_and_set(&self) -> bool {
self.flag.swap(true, Ordering::SeqCst)
}
}
impl Default for AtomicFlag {
fn default() -> Self {
Self::new()
}
}
/// Approach 3: Atomic max tracker
pub struct AtomicMax {
value: AtomicI64,
}
impl AtomicMax {
pub fn new(initial: i64) -> Self {
Self {
value: AtomicI64::new(initial),
}
}
pub fn update(&self, new: i64) -> i64 {
let mut current = self.value.load(Ordering::Relaxed);
loop {
if new <= current {
return current;
}
match self.value.compare_exchange_weak(
current,
new,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => return new,
Err(actual) => current = actual,
}
}
}
pub fn get(&self) -> i64 {
self.value.load(Ordering::SeqCst)
}
}
/// Parallel increment test
pub fn parallel_increment(threads: usize, increments: usize) -> usize {
let counter = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..threads)
.map(|_| {
let c = Arc::clone(&counter);
thread::spawn(move || {
for _ in 0..increments {
c.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
counter.load(Ordering::SeqCst)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_atomic_counter_basic() {
let counter = AtomicCounter::new(0);
assert_eq!(counter.increment(), 0);
assert_eq!(counter.increment(), 1);
assert_eq!(counter.get(), 2);
}
#[test]
fn test_atomic_counter_concurrent() {
let counter = Arc::new(AtomicCounter::new(0));
let handles: Vec<_> = (0..4)
.map(|_| {
let c = Arc::clone(&counter);
thread::spawn(move || {
for _ in 0..100 {
c.increment();
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(counter.get(), 400);
}
#[test]
fn test_atomic_flag() {
let flag = AtomicFlag::new();
assert!(!flag.is_set());
flag.set();
assert!(flag.is_set());
flag.clear();
assert!(!flag.is_set());
}
#[test]
fn test_atomic_flag_test_and_set() {
let flag = AtomicFlag::new();
let prev = flag.test_and_set();
assert!(!prev); // was false
assert!(flag.is_set());
let prev = flag.test_and_set();
assert!(prev); // was true
}
#[test]
fn test_atomic_max() {
let max = AtomicMax::new(0);
max.update(5);
assert_eq!(max.get(), 5);
max.update(3); // Less than current
assert_eq!(max.get(), 5);
max.update(10);
assert_eq!(max.get(), 10);
}
#[test]
fn test_parallel_increment() {
let result = parallel_increment(4, 1000);
assert_eq!(result, 4000);
}
#[test]
fn test_fetch_operations() {
let a = AtomicUsize::new(10);
let old = a.fetch_add(5, Ordering::SeqCst);
assert_eq!(old, 10);
assert_eq!(a.load(Ordering::SeqCst), 15);
let old = a.fetch_sub(3, Ordering::SeqCst);
assert_eq!(old, 15);
assert_eq!(a.load(Ordering::SeqCst), 12);
}
#[test]
fn test_swap() {
let flag = AtomicBool::new(false);
let prev = flag.swap(true, Ordering::SeqCst);
assert!(!prev);
assert!(flag.load(Ordering::SeqCst));
}
}#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_atomic_counter_basic() {
let counter = AtomicCounter::new(0);
assert_eq!(counter.increment(), 0);
assert_eq!(counter.increment(), 1);
assert_eq!(counter.get(), 2);
}
#[test]
fn test_atomic_counter_concurrent() {
let counter = Arc::new(AtomicCounter::new(0));
let handles: Vec<_> = (0..4)
.map(|_| {
let c = Arc::clone(&counter);
thread::spawn(move || {
for _ in 0..100 {
c.increment();
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(counter.get(), 400);
}
#[test]
fn test_atomic_flag() {
let flag = AtomicFlag::new();
assert!(!flag.is_set());
flag.set();
assert!(flag.is_set());
flag.clear();
assert!(!flag.is_set());
}
#[test]
fn test_atomic_flag_test_and_set() {
let flag = AtomicFlag::new();
let prev = flag.test_and_set();
assert!(!prev); // was false
assert!(flag.is_set());
let prev = flag.test_and_set();
assert!(prev); // was true
}
#[test]
fn test_atomic_max() {
let max = AtomicMax::new(0);
max.update(5);
assert_eq!(max.get(), 5);
max.update(3); // Less than current
assert_eq!(max.get(), 5);
max.update(10);
assert_eq!(max.get(), 10);
}
#[test]
fn test_parallel_increment() {
let result = parallel_increment(4, 1000);
assert_eq!(result, 4000);
}
#[test]
fn test_fetch_operations() {
let a = AtomicUsize::new(10);
let old = a.fetch_add(5, Ordering::SeqCst);
assert_eq!(old, 10);
assert_eq!(a.load(Ordering::SeqCst), 15);
let old = a.fetch_sub(3, Ordering::SeqCst);
assert_eq!(old, 15);
assert_eq!(a.load(Ordering::SeqCst), 12);
}
#[test]
fn test_swap() {
let flag = AtomicBool::new(false);
let prev = flag.swap(true, Ordering::SeqCst);
assert!(!prev);
assert!(flag.load(Ordering::SeqCst));
}
}
Deep Comparison
OCaml vs Rust: Atomic Types
Atomic Counter
OCaml 5
let counter = Atomic.make 0
let () = Atomic.fetch_and_add counter 1
Rust
let counter = AtomicUsize::new(0);
counter.fetch_add(1, Ordering::SeqCst);
Key Differences
| Feature | OCaml | Rust |
|---|---|---|
| Type | 'a Atomic.t | Atomic{Bool,Usize,Ptr,...} |
| Ordering | Implicit SeqCst | Explicit parameter |
| Operations | Limited set | Full RMW suite |
Exercises
fn update_max(current: &AtomicUsize, value: usize) that atomically updates the stored maximum. Use compare_exchange in a loop. Test with 16 threads each trying to set the max.Relaxed ordering on two independent counters can produce results that would be impossible with sequential consistency. (Hint: requires specific CPU architectures — document why x86 may not show the effect.)MyArc<T> using a raw pointer to (T, AtomicUsize). Implement Clone (increment count with fetch_add) and Drop (decrement with fetch_sub, free if zero). Verify correctness with 4 threads sharing the same value.