452 Advanced

452: Atomic Types — Lock-Free Concurrent Primitives

Functional Programming

Tutorial

The Problem

Mutex-based synchronization has overhead: a lock and unlock around every access and, under contention, kernel entry and thread scheduling. For simple operations on single values (incrementing a counter, setting a flag, tracking a maximum), atomic hardware instructions provide the same guarantee without a lock. Modern CPUs support atomic read-modify-write operations (fetch_add, compare-and-swap) that execute indivisibly from every other core's perspective. Rust's std::sync::atomic module exposes these directly; compare-and-swap is spelled compare_exchange, because the older compare_and_swap method is deprecated.
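To make the contrast concrete, here is a minimal sketch that counts the same way twice, once behind a Mutex and once with a single atomic instruction per increment. The function names count_with_mutex and count_with_atomic are hypothetical, chosen for illustration; they are not part of the example's source.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Counts with a lock: every increment acquires and releases the mutex.
pub fn count_with_mutex(threads: usize, per_thread: usize) -> usize {
    let total = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let total = Arc::clone(&total);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    *total.lock().unwrap() += 1;
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Bind to a local so the lock guard is dropped before `total` is.
    let result = *total.lock().unwrap();
    result
}

/// Counts without a lock: every increment is one atomic read-modify-write.
pub fn count_with_atomic(threads: usize, per_thread: usize) -> usize {
    let total = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let total = Arc::clone(&total);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    total.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // join() synchronizes with each finished thread, so this load sees every increment.
    total.load(Ordering::Relaxed)
}
```

Both functions return threads * per_thread. Which one is faster depends on the hardware and the amount of contention, so measure before assuming.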

Atomics power reference counting (Arc's inner counter), lock-free data structures, metrics counters, progress tracking, and any concurrent primitive where one CPU instruction is sufficient.
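As one illustration of the progress-tracking use case, the sketch below lets scoped worker threads bump a shared counter through a plain reference, with no Arc involved. The function process_with_progress and its chunking scheme are assumptions made for this example, and the "work" is left as a placeholder.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Splits `items` into roughly `workers` chunks, processes each chunk on its
/// own scoped thread, and returns how many items were handled in total.
pub fn process_with_progress(items: &[u32], workers: usize) -> usize {
    let processed = AtomicUsize::new(0);
    let workers = workers.max(1);
    // Ceiling division; max(1) keeps `chunks` valid when `items` is empty.
    let chunk_size = ((items.len() + workers - 1) / workers).max(1);

    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            let processed = &processed;
            s.spawn(move || {
                for _item in chunk {
                    // (real work on `_item` would go here)
                    processed.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });

    // The scope joins every thread before returning, so the count is complete.
    processed.load(Ordering::Relaxed)
}
```

Relaxed is enough here because the counter is the only shared state and the final read happens after all threads have been joined.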

🎯 Learning Outcomes

  • Understand atomic operations: fetch_add, fetch_sub, load, store, compare_exchange
  • Learn the Ordering parameter: SeqCst, Acquire, Release, Relaxed and when each is appropriate
  • See how AtomicCounter, AtomicBool, and AtomicUsize enable lock-free concurrent primitives
  • Understand why Arc's reference counting uses AtomicUsize not Mutex<usize>
  • Learn the performance advantage: no kernel calls, no blocking on a lock, no thread scheduling
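The Ordering outcome is easiest to see with a Release/Acquire pair. The following is a sketch of the classic publish-then-signal pattern; publish_and_read is a hypothetical helper written for this illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Writes `value` on a producer thread, signals readiness, and returns
/// what the calling thread observes after seeing the signal.
pub fn publish_and_read(value: u64) -> u64 {
    let data = Arc::new(AtomicU64::new(0));
    let ready = Arc::new(AtomicBool::new(false));

    let producer = {
        let (data, ready) = (Arc::clone(&data), Arc::clone(&ready));
        thread::spawn(move || {
            // Relaxed is enough for the payload itself...
            data.store(value, Ordering::Relaxed);
            // ...because the Release store publishes every write made before it.
            ready.store(true, Ordering::Release);
        })
    };

    // The Acquire load pairs with the Release store: once it reads `true`,
    // the earlier write to `data` is guaranteed to be visible.
    while !ready.load(Ordering::Acquire) {
        std::hint::spin_loop();
    }
    let seen = data.load(Ordering::Relaxed);

    producer.join().unwrap();
    seen
}
```

If both flag operations were Relaxed, nothing would order the payload write before the flag, and the reader could in principle observe the flag without the payload.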
Code Example

    use std::sync::atomic::{AtomicUsize, Ordering};

    let counter = AtomicUsize::new(0);
    counter.fetch_add(1, Ordering::SeqCst); // returns the previous value (0)

    Key Differences

  • Ordering control: Rust exposes Ordering explicitly, so you can pick a weaker, cheaper ordering where it suffices; OCaml 5.x's atomics use sequential consistency without user control.
  • Type variety: Rust has AtomicBool, AtomicI8/U8 through AtomicI64/U64, AtomicIsize/Usize, AtomicPtr; OCaml has a single 'a Atomic.t.
  • Performance model: Rust's Relaxed ordering is the cheapest; OCaml's fixed SeqCst has consistent but potentially higher cost.
  • Arc counter: Rust's Arc uses AtomicUsize for reference counting directly; OCaml manages memory with a tracing garbage collector, so it keeps no reference counts at all.
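The Arc counter point can be observed from safe code through Arc::strong_count, which reads the atomic strong count. A small sketch follows; arc_counts is a hypothetical helper name.

```rust
use std::sync::Arc;
use std::thread;

/// Returns the strong count while three clones are alive, and again after
/// the threads that own those clones have finished.
pub fn arc_counts() -> (usize, usize) {
    let shared = Arc::new(vec![1, 2, 3]);

    // Each clone atomically increments the shared strong count.
    let clones: Vec<_> = (0..3).map(|_| Arc::clone(&shared)).collect();
    let while_shared = Arc::strong_count(&shared);

    // Move each clone into its own thread; it is dropped when the closure ends,
    // which atomically decrements the count.
    let handles: Vec<_> = clones
        .into_iter()
        .map(|c| thread::spawn(move || c.len()))
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), 3);
    }

    let after_join = Arc::strong_count(&shared);
    (while_shared, after_join)
}
```

Calling arc_counts() yields (4, 1): the original plus three clones, then only the original once every thread has been joined.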
OCaml Approach

    OCaml 5.x provides Atomic.make, Atomic.get, Atomic.set, Atomic.exchange, Atomic.compare_and_set, and Atomic.fetch_and_add for atomic operations. OCaml needs no atomics for memory management: its tracing garbage collector does not count references, and in OCaml 4.x a global runtime lock lets only one thread run OCaml code at a time. The Atomic.t type in OCaml 5.x is similar to Rust's Atomic* types but without explicit ordering specification; its operations are always sequentially consistent.
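For readers coming from OCaml, the sketch below lines up each Atomic operation named above with its closest Rust counterpart, using SeqCst throughout to mirror OCaml's fixed ordering. The function ocaml_style_ops is hypothetical, and the OCaml calls appear only in comments.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Runs one of each operation and returns
/// (initial value, value before the add, whether the CAS succeeded, final value).
pub fn ocaml_style_ops() -> (i64, i64, bool, i64) {
    let a = AtomicI64::new(0); // Atomic.make 0
    let first = a.load(Ordering::SeqCst); // Atomic.get a
    a.store(10, Ordering::SeqCst); // Atomic.set a 10
    let old = a.fetch_add(5, Ordering::SeqCst); // Atomic.fetch_and_add a 5

    // Atomic.compare_and_set a 15 100
    // Rust returns Result<previous, actual>; OCaml returns a bool.
    let swapped = a
        .compare_exchange(15, 100, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok();

    (first, old, swapped, a.load(Ordering::SeqCst))
}
```

The result is (0, 10, true, 100). The main API difference visible here is that every Rust call carries an Ordering argument, and compare_exchange takes two: one for success and one for failure.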

    Full Source

    #![allow(clippy::all)]
    //! # Atomic Types — Lock-Free Concurrent Primitives
    //!
    //! Use atomic types for lock-free operations on shared data.
    //! Atomics provide thread-safe operations without locks.
    
    use std::sync::atomic::{AtomicBool, AtomicI64, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    
    /// Approach 1: Atomic counter
    pub struct AtomicCounter {
        value: AtomicUsize,
    }
    
    impl AtomicCounter {
        pub fn new(initial: usize) -> Self {
            Self {
                value: AtomicUsize::new(initial),
            }
        }
    
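        /// Adds 1 and returns the value held before the increment.
        pub fn increment(&self) -> usize {
            self.value.fetch_add(1, Ordering::SeqCst)
        }
    
        /// Subtracts 1 (wrapping on underflow) and returns the value held before the decrement.
        pub fn decrement(&self) -> usize {
            self.value.fetch_sub(1, Ordering::SeqCst)
        }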
    
        pub fn get(&self) -> usize {
            self.value.load(Ordering::SeqCst)
        }
    
        pub fn set(&self, value: usize) {
            self.value.store(value, Ordering::SeqCst);
        }
    }
    
    /// Approach 2: Atomic flag for signaling
    pub struct AtomicFlag {
        flag: AtomicBool,
    }
    
    impl AtomicFlag {
        pub fn new() -> Self {
            Self {
                flag: AtomicBool::new(false),
            }
        }
    
        pub fn set(&self) {
            self.flag.store(true, Ordering::Release);
        }
    
        pub fn clear(&self) {
            self.flag.store(false, Ordering::Release);
        }
    
        pub fn is_set(&self) -> bool {
            self.flag.load(Ordering::Acquire)
        }
    
        /// Test and set: returns previous value
        pub fn test_and_set(&self) -> bool {
            self.flag.swap(true, Ordering::SeqCst)
        }
    }
    
    impl Default for AtomicFlag {
        fn default() -> Self {
            Self::new()
        }
    }
    
    /// Approach 3: Atomic max tracker
    pub struct AtomicMax {
        value: AtomicI64,
    }
    
    impl AtomicMax {
        pub fn new(initial: i64) -> Self {
            Self {
                value: AtomicI64::new(initial),
            }
        }
    
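        /// Raises the stored maximum to `new` if it is larger; returns the resulting maximum.
        pub fn update(&self, new: i64) -> i64 {
            let mut current = self.value.load(Ordering::Relaxed);
            loop {
                if new <= current {
                    return current;
                }
                // The weak variant may fail spuriously, which is harmless inside a retry loop.
                match self.value.compare_exchange_weak(
                    current,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return new,
                    // The stored value differed (or the CAS failed spuriously): retry with what was seen.
                    Err(actual) => current = actual,
                }
            }
        }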
    
        pub fn get(&self) -> i64 {
            self.value.load(Ordering::SeqCst)
        }
    }
    
    /// Parallel increment test
    pub fn parallel_increment(threads: usize, increments: usize) -> usize {
        let counter = Arc::new(AtomicUsize::new(0));
    
        let handles: Vec<_> = (0..threads)
            .map(|_| {
                let c = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..increments {
                        c.fetch_add(1, Ordering::Relaxed);
                    }
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
    
        counter.load(Ordering::SeqCst)
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_atomic_counter_basic() {
            let counter = AtomicCounter::new(0);
            assert_eq!(counter.increment(), 0);
            assert_eq!(counter.increment(), 1);
            assert_eq!(counter.get(), 2);
        }
    
        #[test]
        fn test_atomic_counter_concurrent() {
            let counter = Arc::new(AtomicCounter::new(0));
    
            let handles: Vec<_> = (0..4)
                .map(|_| {
                    let c = Arc::clone(&counter);
                    thread::spawn(move || {
                        for _ in 0..100 {
                            c.increment();
                        }
                    })
                })
                .collect();
    
            for h in handles {
                h.join().unwrap();
            }
    
            assert_eq!(counter.get(), 400);
        }
    
        #[test]
        fn test_atomic_flag() {
            let flag = AtomicFlag::new();
            assert!(!flag.is_set());
    
            flag.set();
            assert!(flag.is_set());
    
            flag.clear();
            assert!(!flag.is_set());
        }
    
        #[test]
        fn test_atomic_flag_test_and_set() {
            let flag = AtomicFlag::new();
    
            let prev = flag.test_and_set();
            assert!(!prev); // was false
            assert!(flag.is_set());
    
            let prev = flag.test_and_set();
            assert!(prev); // was true
        }
    
        #[test]
        fn test_atomic_max() {
            let max = AtomicMax::new(0);
    
            max.update(5);
            assert_eq!(max.get(), 5);
    
            max.update(3); // Less than current
            assert_eq!(max.get(), 5);
    
            max.update(10);
            assert_eq!(max.get(), 10);
        }
    
        #[test]
        fn test_parallel_increment() {
            let result = parallel_increment(4, 1000);
            assert_eq!(result, 4000);
        }
    
        #[test]
        fn test_fetch_operations() {
            let a = AtomicUsize::new(10);
    
            let old = a.fetch_add(5, Ordering::SeqCst);
            assert_eq!(old, 10);
            assert_eq!(a.load(Ordering::SeqCst), 15);
    
            let old = a.fetch_sub(3, Ordering::SeqCst);
            assert_eq!(old, 15);
            assert_eq!(a.load(Ordering::SeqCst), 12);
        }
    
        #[test]
        fn test_swap() {
            let flag = AtomicBool::new(false);
            let prev = flag.swap(true, Ordering::SeqCst);
            assert!(!prev);
            assert!(flag.load(Ordering::SeqCst));
        }
    }
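The flag approach from the source is typically used to tell a worker thread to stop. Here is a self-contained sketch of that pattern using AtomicBool directly rather than the AtomicFlag wrapper; run_until_stopped and its min_iterations parameter are hypothetical names for this illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Lets a worker spin until it has done at least `min_iterations` loop turns,
/// then signals it to stop and returns the number of turns it completed.
pub fn run_until_stopped(min_iterations: usize) -> usize {
    let stop = Arc::new(AtomicBool::new(false));
    let iterations = Arc::new(AtomicUsize::new(0));

    let worker = {
        let (stop, iterations) = (Arc::clone(&stop), Arc::clone(&iterations));
        thread::spawn(move || {
            // Same orderings as AtomicFlag::is_set in the source: Acquire on the read side.
            while !stop.load(Ordering::Acquire) {
                iterations.fetch_add(1, Ordering::Relaxed);
                thread::yield_now();
            }
        })
    };

    // Wait until the worker has made enough progress.
    while iterations.load(Ordering::Relaxed) < min_iterations {
        thread::yield_now();
    }

    // Same ordering as AtomicFlag::set in the source: Release on the write side.
    stop.store(true, Ordering::Release);
    worker.join().unwrap();
    iterations.load(Ordering::Relaxed)
}
```

The returned count is at least min_iterations; the exact number varies from run to run because the worker may complete extra turns before it observes the flag.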

    Deep Comparison

    OCaml vs Rust: Atomic Types

    Atomic Counter

    OCaml 5

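    let counter = Atomic.make 0
    (* fetch_and_add returns the previous value (an int), so bind it instead of matching on unit *)
    let _previous : int = Atomic.fetch_and_add counter 1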
    

    Rust

    let counter = AtomicUsize::new(0);
    counter.fetch_add(1, Ordering::SeqCst);
    

    Key Differences

    Feature      OCaml             Rust
    Type         'a Atomic.t       Atomic{Bool,Usize,Ptr,...}
    Ordering     Implicit SeqCst   Explicit parameter
    Operations   Limited set       Full RMW suite
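To give a feel for the "Full RMW suite" row, the sketch below exercises a few read-modify-write methods from std beyond fetch_add: fetch_max, fetch_or, fetch_and, and the closure-based fetch_update. The function rmw_tour is a hypothetical name used only here.

```rust
use std::sync::atomic::{AtomicI64, AtomicU8, Ordering};

/// Returns (tracked maximum, bit flags, capped counter) after a few RMW operations.
pub fn rmw_tour() -> (i64, u8, i64) {
    // fetch_max keeps the larger of the stored value and the argument,
    // doing in one call what a hand-written compare_exchange loop does.
    let max = AtomicI64::new(5);
    max.fetch_max(10, Ordering::SeqCst); // stored value becomes 10
    max.fetch_max(3, Ordering::SeqCst); // stored value stays 10

    // Bitwise RMW operations are handy for flag sets.
    let bits = AtomicU8::new(0b0001);
    bits.fetch_or(0b0100, Ordering::SeqCst); // 0b0101
    bits.fetch_and(0b0110, Ordering::SeqCst); // 0b0100

    // fetch_update retries a closure in a CAS loop; returning None abandons the update.
    let capped = AtomicI64::new(9);
    let bump = |v: i64| if v < 10 { Some(v + 1) } else { None };
    let first = capped.fetch_update(Ordering::SeqCst, Ordering::SeqCst, bump);
    let second = capped.fetch_update(Ordering::SeqCst, Ordering::SeqCst, bump);
    assert_eq!(first, Ok(9)); // incremented from 9 to 10
    assert_eq!(second, Err(10)); // already at the cap, left unchanged

    (
        max.load(Ordering::SeqCst),
        bits.load(Ordering::SeqCst),
        capped.load(Ordering::SeqCst),
    )
}
```

The function returns (10, 4, 10). In OCaml the same effects would generally be written as a compare_and_set retry loop around a pure function.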

    Exercises

  • Lock-free max: Implement fn update_max(current: &AtomicUsize, value: usize) that atomically updates the stored maximum. Use compare_exchange in a loop. Test with 16 threads each trying to set the max.
  • Ordering experiment: Write a test demonstrating that Relaxed ordering on two independent counters can produce results that would be impossible with sequential consistency. (Hint: requires specific CPU architectures — document why x86 may not show the effect.)
  • Arc from scratch: Implement a simplified MyArc<T> using a raw pointer to (T, AtomicUsize). Implement Clone (increment count with fetch_add) and Drop (decrement with fetch_sub, free if zero). Verify correctness with 4 threads sharing the same value.