ExamplesBy LevelBy TopicLearning Paths
443 Advanced

443: `Arc<Mutex<T>>` — Shared Mutable State

Functional Programming

Tutorial Video

Text description (accessibility)

This video demonstrates the "443: `Arc<Mutex<T>>` — Shared Mutable State" functional Rust example. Difficulty level: Advanced. Key concepts covered: Functional Programming. Multiple threads sharing a single mutable value is a fundamental concurrency pattern: a counter, a shared cache, a work queue. Key difference from OCaml: 1. **Type enforcement**: Rust's `Mutex<T>` forces all access through the lock; OCaml's `Mutex.t` is advisory — you can access the value without locking.

Tutorial

The Problem

Multiple threads sharing a single mutable value is a fundamental concurrency pattern: a counter, a shared cache, a work queue. Rust's ownership model normally prevents this — only one owner can have mutable access. Arc<T> enables multiple-ownership across threads (atomic reference counting), and Mutex<T> ensures exclusive access — only one thread holds the lock at a time. Together, Arc<Mutex<T>> is the standard Rust pattern for shared mutable state across threads.

Arc<Mutex<T>> appears in every multi-threaded Rust program: shared caches, event buses, job queues, game state machines, and any pattern requiring coordinated mutation from multiple threads.

🎯 Learning Outcomes

  • • Understand how Arc (atomic reference counting) enables shared ownership across threads
  • • Learn how Mutex provides exclusive access with automatic unlock on drop (RAII guard)
  • • See the Arc::clone(&counter) pattern for sharing ownership across spawned threads
  • • Understand the lock guard pattern: counter.lock().unwrap() returns a MutexGuard
  • • Learn the performance implication: every lock acquisition is a synchronization point
  • Code Example

    let counter = Arc::new(Mutex::new(0u64));
    
    let handles: Vec<_> = (0..10).map(|_| {
        let c = Arc::clone(&counter);
        thread::spawn(move || {
            for _ in 0..100 {
                *c.lock().unwrap() += 1;
            }
        })
    }).collect();
    
    for h in handles { h.join().unwrap(); }
    println!("Counter: {}", *counter.lock().unwrap());

    Key Differences

  • Type enforcement: Rust's Mutex<T> forces all access through the lock; OCaml's Mutex.t is advisory — you can access the value without locking.
  • Poisoning: Rust's mutex becomes "poisoned" if a thread panics while holding it; subsequent lock() calls return Err. OCaml has no poisoning concept.
  • RAII unlock: Rust's MutexGuard unlocks on drop automatically; OCaml requires explicit Mutex.unlock (forgetting it = deadlock).
  • Arc vs. GC: Rust needs Arc for shared ownership; OCaml's GC manages reference counting transparently.
  • OCaml Approach

    OCaml uses Mutex.create() and Mutex.lock/Mutex.unlock for mutual exclusion. Shared mutable state is ref or mutable record fields protected by a mutex. OCaml 5.x's Atomic.t handles simple counter patterns without locks. Unlike Rust, OCaml doesn't enforce that all accesses to a shared value go through a mutex — the type system doesn't track this invariant.

    Full Source

    #![allow(clippy::all)]
    //! # Arc<Mutex<T>> — Shared Mutable State Across Threads
    //!
    //! Share a single mutable value across multiple threads using `Arc` for
    //! ownership and `Mutex` for exclusive access.
    
    use std::sync::{Arc, Mutex};
    use std::thread;
    
    /// Approach 1: Shared counter with multiple threads
    pub fn parallel_increment(num_threads: usize, increments_per_thread: usize) -> u64 {
        let counter = Arc::new(Mutex::new(0u64));
    
        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let c = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..increments_per_thread {
                        *c.lock().unwrap() += 1;
                    }
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
    
        let result = *counter.lock().unwrap();
        result
    }
    
    /// Approach 2: Shared collection (Vec)
    pub fn parallel_collect<T, F>(num_threads: usize, producer: F) -> Vec<T>
    where
        T: Send + std::fmt::Debug + 'static,
        F: Fn(usize) -> T + Send + Sync + 'static + Clone,
    {
        let results: Arc<Mutex<Vec<T>>> = Arc::new(Mutex::new(Vec::new()));
    
        let handles: Vec<_> = (0..num_threads)
            .map(|i| {
                let results = Arc::clone(&results);
                let producer = producer.clone();
                thread::spawn(move || {
                    let value = producer(i);
                    results.lock().unwrap().push(value);
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
    
        Arc::try_unwrap(results)
            .expect("all threads joined")
            .into_inner()
            .unwrap()
    }
    
    /// Approach 3: try_lock for non-blocking access
    pub fn try_lock_demo() -> Option<u64> {
        let data = Arc::new(Mutex::new(42u64));
        let data_clone = Arc::clone(&data);
    
        // Hold the lock in main thread
        let _guard = data.lock().unwrap();
    
        // Another thread tries to get it
        let handle = thread::spawn(move || {
            // try_lock returns Err if lock is held
            match data_clone.try_lock() {
                Ok(mut guard) => {
                    *guard += 1;
                    Some(*guard)
                }
                Err(_) => None, // Lock was held
            }
        });
    
        handle.join().unwrap()
    }
    
    /// Thread-safe accumulator struct
    pub struct SharedAccumulator {
        value: Arc<Mutex<i64>>,
    }
    
    impl SharedAccumulator {
        pub fn new(initial: i64) -> Self {
            Self {
                value: Arc::new(Mutex::new(initial)),
            }
        }
    
        pub fn add(&self, amount: i64) {
            *self.value.lock().unwrap() += amount;
        }
    
        pub fn get(&self) -> i64 {
            *self.value.lock().unwrap()
        }
    
        pub fn clone_handle(&self) -> Arc<Mutex<i64>> {
            Arc::clone(&self.value)
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_parallel_increment_10_threads() {
            let result = parallel_increment(10, 100);
            assert_eq!(result, 1000);
        }
    
        #[test]
        fn test_parallel_increment_single_thread() {
            let result = parallel_increment(1, 500);
            assert_eq!(result, 500);
        }
    
        #[test]
        fn test_parallel_collect() {
            let mut results = parallel_collect(4, |i| format!("thread-{}", i));
            results.sort();
            assert_eq!(results.len(), 4);
            assert!(results.contains(&String::from("thread-0")));
            assert!(results.contains(&String::from("thread-3")));
        }
    
        #[test]
        fn test_try_lock_fails_when_held() {
            let result = try_lock_demo();
            assert_eq!(result, None);
        }
    
        #[test]
        fn test_try_lock_succeeds_when_free() {
            let m = Mutex::new(0);
            {
                let guard = m.try_lock();
                assert!(guard.is_ok());
            }
            // Lock released, try again
            assert!(m.try_lock().is_ok());
        }
    
        #[test]
        fn test_shared_accumulator() {
            let acc = SharedAccumulator::new(0);
            let handle = acc.clone_handle();
    
            thread::scope(|s| {
                s.spawn(|| {
                    for _ in 0..100 {
                        *handle.lock().unwrap() += 1;
                    }
                });
                s.spawn(|| {
                    for _ in 0..100 {
                        acc.add(1);
                    }
                });
            });
    
            assert_eq!(acc.get(), 200);
        }
    
        #[test]
        fn test_mutex_guard_drops_on_scope_exit() {
            let m = Mutex::new(vec![1, 2, 3]);
            {
                let mut guard = m.lock().unwrap();
                guard.push(4);
            } // guard drops here
            assert_eq!(m.lock().unwrap().len(), 4);
        }
    
        #[test]
        fn test_arc_clone_count() {
            let data = Arc::new(Mutex::new(0));
            assert_eq!(Arc::strong_count(&data), 1);
    
            let clone1 = Arc::clone(&data);
            assert_eq!(Arc::strong_count(&data), 2);
    
            drop(clone1);
            assert_eq!(Arc::strong_count(&data), 1);
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_parallel_increment_10_threads() {
            let result = parallel_increment(10, 100);
            assert_eq!(result, 1000);
        }
    
        #[test]
        fn test_parallel_increment_single_thread() {
            let result = parallel_increment(1, 500);
            assert_eq!(result, 500);
        }
    
        #[test]
        fn test_parallel_collect() {
            let mut results = parallel_collect(4, |i| format!("thread-{}", i));
            results.sort();
            assert_eq!(results.len(), 4);
            assert!(results.contains(&String::from("thread-0")));
            assert!(results.contains(&String::from("thread-3")));
        }
    
        #[test]
        fn test_try_lock_fails_when_held() {
            let result = try_lock_demo();
            assert_eq!(result, None);
        }
    
        #[test]
        fn test_try_lock_succeeds_when_free() {
            let m = Mutex::new(0);
            {
                let guard = m.try_lock();
                assert!(guard.is_ok());
            }
            // Lock released, try again
            assert!(m.try_lock().is_ok());
        }
    
        #[test]
        fn test_shared_accumulator() {
            let acc = SharedAccumulator::new(0);
            let handle = acc.clone_handle();
    
            thread::scope(|s| {
                s.spawn(|| {
                    for _ in 0..100 {
                        *handle.lock().unwrap() += 1;
                    }
                });
                s.spawn(|| {
                    for _ in 0..100 {
                        acc.add(1);
                    }
                });
            });
    
            assert_eq!(acc.get(), 200);
        }
    
        #[test]
        fn test_mutex_guard_drops_on_scope_exit() {
            let m = Mutex::new(vec![1, 2, 3]);
            {
                let mut guard = m.lock().unwrap();
                guard.push(4);
            } // guard drops here
            assert_eq!(m.lock().unwrap().len(), 4);
        }
    
        #[test]
        fn test_arc_clone_count() {
            let data = Arc::new(Mutex::new(0));
            assert_eq!(Arc::strong_count(&data), 1);
    
            let clone1 = Arc::clone(&data);
            assert_eq!(Arc::strong_count(&data), 2);
    
            drop(clone1);
            assert_eq!(Arc::strong_count(&data), 1);
        }
    }

    Deep Comparison

    OCaml vs Rust: Arc<Mutex<T>>

    Shared Counter Pattern

    OCaml

    let counter = ref 0
    let mutex   = Mutex.create ()
    
    let () =
      let threads = List.init 10 (fun _ ->
        Thread.create (fun () ->
          for _ = 1 to 100 do
            Mutex.lock mutex;
            incr counter;
            Mutex.unlock mutex
          done) ()
      ) in
      List.iter Thread.join threads;
      Printf.printf "Counter = %d\n" !counter
    

    Rust

    let counter = Arc::new(Mutex::new(0u64));
    
    let handles: Vec<_> = (0..10).map(|_| {
        let c = Arc::clone(&counter);
        thread::spawn(move || {
            for _ in 0..100 {
                *c.lock().unwrap() += 1;
            }
        })
    }).collect();
    
    for h in handles { h.join().unwrap(); }
    println!("Counter: {}", *counter.lock().unwrap());
    

    Key Differences

    FeatureOCamlRust
    Data + LockSeparate (ref + Mutex.t)Unified (Mutex<T> wraps data)
    Forget to lockPossible (data accessible without lock)Impossible (data inside mutex)
    Lock acquisitionMutex.lock mm.lock().unwrap()
    UnlockManual Mutex.unlock mAutomatic (guard drops)
    Shared ownershipGCArc::clone(&arc)
    Error on held lockBlocksBlocks (or try_lockErr)

    Lock Guard RAII

    OCaml

    (* Manual unlock required — easy to forget on error path *)
    Mutex.lock mutex;
    (* do work *)
    Mutex.unlock mutex
    

    Rust

    {
        let mut guard = mutex.lock().unwrap();
        *guard += 1;
    } // guard dropped → lock released automatically
    // Even on panic, Drop runs and releases the lock
    

    Shared Collections

    OCaml

    let log = ref []
    let mutex = Mutex.create ()
    
    let add_log msg =
      Mutex.lock mutex;
      log := msg :: !log;
      Mutex.unlock mutex
    

    Rust

    let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
    
    let log_clone = Arc::clone(&log);
    thread::spawn(move || {
        log_clone.lock().unwrap().push("message".into());
    });
    

    Poisoning (Rust-specific)

    // If a thread panics while holding the lock, mutex becomes "poisoned"
    let mutex = Arc::new(Mutex::new(0));
    let m = Arc::clone(&mutex);
    
    let _ = thread::spawn(move || {
        let _guard = m.lock().unwrap();
        panic!("boom");
    }).join();
    
    // Subsequent locks return Err(PoisonError)
    match mutex.lock() {
        Ok(guard) => println!("got {}", *guard),
        Err(poisoned) => {
            // Can still recover the data
            let guard = poisoned.into_inner();
            println!("recovered: {}", *guard);
        }
    }
    

    OCaml has no equivalent — exceptions in threads propagate differently.

    Exercises

  • Rate limiter: Build a RateLimiter using Arc<Mutex<(u32, Instant)>> tracking (count, window_start). fn check_and_increment(&self) -> bool returns true if the rate limit allows the request, false if exceeded. Test with concurrent threads.
  • Bounded queue: Implement a BoundedQueue<T> wrapping Arc<Mutex<VecDeque<T>>> with a capacity limit. push returns Err(val) when full; pop returns None when empty. Verify correct behavior with producer/consumer threads.
  • Deadlock avoidance: Write a program that could deadlock with Arc<Mutex<T>> (two threads each holding one lock waiting for the other's lock). Then fix it using lock ordering or try_lock with backoff.
  • Open Source Repos