443: `Arc<Mutex<T>>` — Shared Mutable State
Tutorial Video
Text description (accessibility)
This video demonstrates the "443: `Arc<Mutex<T>>` — Shared Mutable State" Rust example. Difficulty level: Advanced.
Tutorial
The Problem
Multiple threads sharing a single mutable value is a fundamental concurrency pattern: a counter, a shared cache, a work queue. Rust's ownership model normally prevents this — only one owner can have mutable access. Arc<T> enables multiple-ownership across threads (atomic reference counting), and Mutex<T> ensures exclusive access — only one thread holds the lock at a time. Together, Arc<Mutex<T>> is the standard Rust pattern for shared mutable state across threads.
Arc<Mutex<T>> appears in every multi-threaded Rust program: shared caches, event buses, job queues, game state machines, and any pattern requiring coordinated mutation from multiple threads.
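One of the uses listed above, a shared cache, can be sketched in a few lines. This is a minimal illustration, not a production cache; the function name `build_cache` and the key format are invented for this example.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical example: `n` worker threads each insert one entry
/// into a HashMap shared through Arc<Mutex<...>>.
pub fn build_cache(n: u64) -> HashMap<String, u64> {
    let cache: Arc<Mutex<HashMap<String, u64>>> = Arc::new(Mutex::new(HashMap::new()));
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                // The lock is held only for the insert; the guard drops immediately.
                cache.lock().unwrap().insert(format!("key-{i}"), i * i);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // All threads have joined, so this Arc is the last owner.
    Arc::try_unwrap(cache).expect("no other owners").into_inner().unwrap()
}

fn main() {
    let cache = build_cache(4);
    println!("{} entries", cache.len()); // prints "4 entries"
}
```

Keeping the critical section as small as possible (here, a single `insert`) is the main design lever with a mutex-protected map: the shorter the lock is held, the less the threads contend.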
🎯 Learning Outcomes
- `Arc` (atomic reference counting) enables shared ownership across threads
- `Mutex` provides exclusive access with automatic unlock on drop (RAII guard)
- `Arc::clone(&counter)` pattern for sharing ownership across spawned threads
- `counter.lock().unwrap()` returns a `MutexGuard`

Code Example
let counter = Arc::new(Mutex::new(0u64));
let handles: Vec<_> = (0..10).map(|_| {
    let c = Arc::clone(&counter);
    thread::spawn(move || {
        for _ in 0..100 {
            *c.lock().unwrap() += 1;
        }
    })
}).collect();
for h in handles { h.join().unwrap(); }
println!("Counter: {}", *counter.lock().unwrap());

Key Differences
- **Type enforcement**: Rust's `Mutex<T>` forces all access through the lock; OCaml's `Mutex.t` is advisory — you can access the value without locking.
- **Poisoning**: if a Rust thread panics while holding the lock, subsequent `lock()` calls return `Err`. OCaml has no poisoning concept.
- **RAII**: `MutexGuard` unlocks on drop automatically; OCaml requires explicit `Mutex.unlock` (forgetting it = deadlock).
- **Shared ownership**: Rust needs `Arc`; OCaml's GC manages reference counting transparently.

OCaml Approach
OCaml uses Mutex.create() and Mutex.lock/Mutex.unlock for mutual exclusion. Shared mutable state is ref or mutable record fields protected by a mutex. OCaml 5.x's Atomic.t handles simple counter patterns without locks. Unlike Rust, OCaml doesn't enforce that all accesses to a shared value go through a mutex — the type system doesn't track this invariant.
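Rust offers the same lock-free option for simple counters: `std::sync::atomic` plays the role of OCaml's `Atomic.t`. A minimal sketch (the function name `atomic_increment` is invented for this example):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Increment a shared counter from several threads without a Mutex:
/// fetch_add is a single atomic read-modify-write operation.
pub fn atomic_increment(num_threads: u64, per_thread: u64) -> u64 {
    let counter = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // Relaxed ordering suffices here: we only need the final
                    // count, not ordering relative to other memory operations.
                    c.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    counter.load(Ordering::Relaxed)
}

fn main() {
    println!("{}", atomic_increment(10, 100)); // prints 1000
}
```

Atomics only cover single-word operations; once the shared state is a struct or collection, you are back to `Arc<Mutex<T>>`.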
Full Source
#![allow(clippy::all)]
//! # Arc<Mutex<T>> — Shared Mutable State Across Threads
//!
//! Share a single mutable value across multiple threads using `Arc` for
//! ownership and `Mutex` for exclusive access.

use std::sync::{Arc, Mutex};
use std::thread;

/// Approach 1: Shared counter with multiple threads
pub fn parallel_increment(num_threads: usize, increments_per_thread: usize) -> u64 {
    let counter = Arc::new(Mutex::new(0u64));
    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments_per_thread {
                    *c.lock().unwrap() += 1;
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    *counter.lock().unwrap()
}
/// Approach 2: Shared collection (Vec)
pub fn parallel_collect<T, F>(num_threads: usize, producer: F) -> Vec<T>
where
    T: Send + std::fmt::Debug + 'static,
    F: Fn(usize) -> T + Send + Sync + 'static + Clone,
{
    let results: Arc<Mutex<Vec<T>>> = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..num_threads)
        .map(|i| {
            let results = Arc::clone(&results);
            let producer = producer.clone();
            thread::spawn(move || {
                let value = producer(i);
                results.lock().unwrap().push(value);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    Arc::try_unwrap(results)
        .expect("all threads joined")
        .into_inner()
        .unwrap()
}
/// Approach 3: try_lock for non-blocking access
pub fn try_lock_demo() -> Option<u64> {
    let data = Arc::new(Mutex::new(42u64));
    let data_clone = Arc::clone(&data);
    // Hold the lock in the main thread
    let _guard = data.lock().unwrap();
    // Another thread tries to get it
    let handle = thread::spawn(move || {
        // try_lock returns Err if the lock is held
        match data_clone.try_lock() {
            Ok(mut guard) => {
                *guard += 1;
                Some(*guard)
            }
            Err(_) => None, // Lock was held
        }
    });
    handle.join().unwrap()
}
/// Thread-safe accumulator struct
pub struct SharedAccumulator {
    value: Arc<Mutex<i64>>,
}

impl SharedAccumulator {
    pub fn new(initial: i64) -> Self {
        Self {
            value: Arc::new(Mutex::new(initial)),
        }
    }

    pub fn add(&self, amount: i64) {
        *self.value.lock().unwrap() += amount;
    }

    pub fn get(&self) -> i64 {
        *self.value.lock().unwrap()
    }

    pub fn clone_handle(&self) -> Arc<Mutex<i64>> {
        Arc::clone(&self.value)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_increment_10_threads() {
        let result = parallel_increment(10, 100);
        assert_eq!(result, 1000);
    }

    #[test]
    fn test_parallel_increment_single_thread() {
        let result = parallel_increment(1, 500);
        assert_eq!(result, 500);
    }

    #[test]
    fn test_parallel_collect() {
        let mut results = parallel_collect(4, |i| format!("thread-{}", i));
        results.sort();
        assert_eq!(results.len(), 4);
        assert!(results.contains(&String::from("thread-0")));
        assert!(results.contains(&String::from("thread-3")));
    }

    #[test]
    fn test_try_lock_fails_when_held() {
        let result = try_lock_demo();
        assert_eq!(result, None);
    }

    #[test]
    fn test_try_lock_succeeds_when_free() {
        let m = Mutex::new(0);
        {
            let guard = m.try_lock();
            assert!(guard.is_ok());
        }
        // Lock released, try again
        assert!(m.try_lock().is_ok());
    }

    #[test]
    fn test_shared_accumulator() {
        let acc = SharedAccumulator::new(0);
        let handle = acc.clone_handle();
        thread::scope(|s| {
            s.spawn(|| {
                for _ in 0..100 {
                    *handle.lock().unwrap() += 1;
                }
            });
            s.spawn(|| {
                for _ in 0..100 {
                    acc.add(1);
                }
            });
        });
        assert_eq!(acc.get(), 200);
    }

    #[test]
    fn test_mutex_guard_drops_on_scope_exit() {
        let m = Mutex::new(vec![1, 2, 3]);
        {
            let mut guard = m.lock().unwrap();
            guard.push(4);
        } // guard drops here
        assert_eq!(m.lock().unwrap().len(), 4);
    }

    #[test]
    fn test_arc_clone_count() {
        let data = Arc::new(Mutex::new(0));
        assert_eq!(Arc::strong_count(&data), 1);
        let clone1 = Arc::clone(&data);
        assert_eq!(Arc::strong_count(&data), 2);
        drop(clone1);
        assert_eq!(Arc::strong_count(&data), 1);
    }
}
Deep Comparison
OCaml vs Rust: Arc<Mutex<T>>
Shared Counter Pattern
OCaml
let counter = ref 0
let mutex = Mutex.create ()

let () =
  let threads = List.init 10 (fun _ ->
    Thread.create (fun () ->
      for _ = 1 to 100 do
        Mutex.lock mutex;
        incr counter;
        Mutex.unlock mutex
      done) ())
  in
  List.iter Thread.join threads;
  Printf.printf "Counter = %d\n" !counter
Rust
let counter = Arc::new(Mutex::new(0u64));
let handles: Vec<_> = (0..10).map(|_| {
    let c = Arc::clone(&counter);
    thread::spawn(move || {
        for _ in 0..100 {
            *c.lock().unwrap() += 1;
        }
    })
}).collect();
for h in handles { h.join().unwrap(); }
println!("Counter: {}", *counter.lock().unwrap());
Key Differences
| Feature | OCaml | Rust |
|---|---|---|
| Data + Lock | Separate (ref + Mutex.t) | Unified (Mutex<T> wraps data) |
| Forget to lock | Possible (data accessible without lock) | Impossible (data inside mutex) |
| Lock acquisition | Mutex.lock m | m.lock().unwrap() |
| Unlock | Manual Mutex.unlock m | Automatic (guard drops) |
| Shared ownership | GC | Arc::clone(&arc) |
| Error on held lock | Blocks | Blocks (or try_lock → Err) |
Lock Guard RAII
OCaml
(* Manual unlock required — easy to forget on error path *)
Mutex.lock mutex;
(* do work *)
Mutex.unlock mutex
Rust
{
    let mut guard = mutex.lock().unwrap();
    *guard += 1;
} // guard dropped → lock released automatically
// Even on panic, Drop runs and releases the lock
Shared Collections
OCaml
let log = ref []
let mutex = Mutex.create ()

let add_log msg =
  Mutex.lock mutex;
  log := msg :: !log;
  Mutex.unlock mutex
Rust
let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
let log_clone = Arc::clone(&log);
thread::spawn(move || {
    log_clone.lock().unwrap().push("message".into());
});
Poisoning (Rust-specific)
// If a thread panics while holding the lock, the mutex becomes "poisoned"
let mutex = Arc::new(Mutex::new(0));
let m = Arc::clone(&mutex);
let _ = thread::spawn(move || {
    let _guard = m.lock().unwrap();
    panic!("boom");
}).join();
// Subsequent locks return Err(PoisonError)
match mutex.lock() {
    Ok(guard) => println!("got {}", *guard),
    Err(poisoned) => {
        // Can still recover the data
        let guard = poisoned.into_inner();
        println!("recovered: {}", *guard);
    }
}
OCaml has no equivalent — exceptions in threads propagate differently.
Exercises
1. **RateLimiter**: build a rate limiter using `Arc<Mutex<(u32, Instant)>>` tracking `(count, window_start)`. `fn check_and_increment(&self) -> bool` returns `true` if the rate limit allows the request, `false` if exceeded. Test with concurrent threads.
2. **BoundedQueue<T>**: wrap `Arc<Mutex<VecDeque<T>>>` with a capacity limit. `push` returns `Err(val)` when full; `pop` returns `None` when empty. Verify correct behavior with producer/consumer threads.
3. **Deadlock**: deliberately create a deadlock with two `Arc<Mutex<T>>` values (two threads each holding one lock waiting for the other's lock). Then fix it using lock ordering or `try_lock` with backoff.
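For the deadlock exercise, one possible shape of the lock-ordering fix is sketched below. The names (`transfer`, `run_transfers`, the account values) are invented for this illustration; the point is only that every thread acquires the two locks in the same global order.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Both callers lock `first` before `second`. Because every thread takes
/// the locks in the same order, no cycle of waiting can form.
fn transfer(first: &Mutex<i64>, second: &Mutex<i64>, amount: i64) {
    let mut from = first.lock().unwrap();
    let mut to = second.lock().unwrap();
    *from -= amount;
    *to += amount;
}

pub fn run_transfers() -> (i64, i64) {
    let acct_a = Arc::new(Mutex::new(100));
    let acct_b = Arc::new(Mutex::new(100));
    let (a1, b1) = (Arc::clone(&acct_a), Arc::clone(&acct_b));
    let (a2, b2) = (Arc::clone(&acct_a), Arc::clone(&acct_b));
    // Both threads lock acct_a first, then acct_b. The deadlocking variant
    // would have the second thread lock acct_b first, then acct_a.
    let t1 = thread::spawn(move || transfer(&a1, &b1, 10));
    let t2 = thread::spawn(move || transfer(&a2, &b2, 5));
    t1.join().unwrap();
    t2.join().unwrap();
    let a = *acct_a.lock().unwrap();
    let b = *acct_b.lock().unwrap();
    (a, b)
}

fn main() {
    println!("{:?}", run_transfers()); // prints (85, 115)
}
```

The `try_lock`-with-backoff alternative trades this global ordering discipline for retry logic: on `Err`, release everything, wait briefly, and try again.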