337: Async Mutex
Tutorial Video
Text description (accessibility)
This video demonstrates the "337: Async Mutex" functional Rust example. Difficulty level: Advanced. Key concepts covered: Functional Programming. Shared mutable state across concurrent threads or tasks requires mutual exclusion. Key difference from OCaml: 1. **Lock guard RAII**: Rust's `lock().unwrap()` returns a `MutexGuard` that unlocks on drop; OCaml requires explicit `Mutex.unlock()`.
Tutorial
The Problem
Shared mutable state across concurrent threads or tasks requires mutual exclusion. std::sync::Mutex<T> provides this but blocks the OS thread when locked. In async contexts, blocking a thread blocks all tasks on that thread — a major performance problem. The correct pattern is to use tokio::sync::Mutex for async code (yields instead of blocks) and std::sync::Mutex only for brief critical sections that never span .await points.
🎯 Learning Outcomes
Arc<Mutex<T>> for shared mutable state across synchronous threadsstd::sync::Mutex guard across .await is a deadlock riskArc<Mutex<T>> with brief lock-and-release for async contextsMutexCode Example
let m = Mutex::new(0);
{
let mut guard = m.lock().unwrap();
*guard += 1;
} // guard drops, lock releasedKey Differences
lock().unwrap() returns a MutexGuard that unlocks on drop; OCaml requires explicit Mutex.unlock().lock() returns Err thereafter; OCaml has no poisoning concept.tokio::sync::Mutex is async-aware — lock().await yields instead of blocking; std::sync::Mutex should not span .await points.RwLock<T> allows multiple concurrent readers and one exclusive writer.OCaml Approach
OCaml uses Mutex.t from the standard library for threading, and Lwt_mutex.t for async-aware locking:
let counter = ref 0
let mutex = Mutex.create ()
let increment () =
Mutex.lock mutex;
incr counter;
Mutex.unlock mutex
OCaml 5's multi-core support uses Mutex from Thread + Domain.
Full Source
#![allow(clippy::all)]
//! # Async Mutex
//!
//! Lock shared state safely across async tasks — demonstrates correct patterns
//! for using `std::sync::Mutex` and avoiding deadlocks across await points.
use std::sync::{Arc, Mutex};
use std::thread;
/// Demonstrates concurrent increments with a mutex.
pub fn concurrent_increment(num_threads: usize) -> i32 {
let counter = Arc::new(Mutex::new(0));
let handles: Vec<_> = (0..num_threads)
.map(|_| {
let c = Arc::clone(&counter);
thread::spawn(move || {
*c.lock().unwrap() += 1;
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let result = *counter.lock().unwrap();
result
}
/// Demonstrates the correct pattern: release lock before doing other work.
pub fn correct_lock_pattern(data: Vec<i32>) -> i32 {
let shared = Arc::new(Mutex::new(data));
// CORRECT: compute value inside a scope, guard drops at scope end
let sum = {
let guard = shared.lock().unwrap();
guard.iter().sum::<i32>()
}; // guard drops here, lock released BEFORE any other work
sum
}
/// Demonstrates safe read-modify-write pattern.
pub fn safe_update<F>(mutex: &Mutex<i32>, f: F) -> i32
where
F: FnOnce(i32) -> i32,
{
let mut guard = mutex.lock().unwrap();
*guard = f(*guard);
*guard
}
/// Demonstrates poison recovery after a panic.
pub fn with_poison_recovery(mutex: &Mutex<i32>) -> Result<i32, i32> {
match mutex.lock() {
Ok(guard) => Ok(*guard),
Err(poisoned) => {
// Recover by accessing the data anyway
let recovered = poisoned.into_inner();
Err(*recovered)
}
}
}
/// A thread-safe counter using Mutex.
pub struct Counter {
value: Mutex<i32>,
}
impl Counter {
pub fn new(initial: i32) -> Self {
Self {
value: Mutex::new(initial),
}
}
pub fn increment(&self) -> i32 {
let mut guard = self.value.lock().unwrap();
*guard += 1;
*guard
}
pub fn get(&self) -> i32 {
*self.value.lock().unwrap()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_concurrent_increment() {
assert_eq!(concurrent_increment(10), 10);
}
#[test]
fn test_high_contention() {
let counter = Arc::new(Mutex::new(0));
let handles: Vec<_> = (0..100)
.map(|_| {
let c = Arc::clone(&counter);
thread::spawn(move || {
*c.lock().unwrap() += 1;
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(*counter.lock().unwrap(), 100);
}
#[test]
fn test_correct_lock_pattern() {
let sum = correct_lock_pattern(vec![1, 2, 3, 4, 5]);
assert_eq!(sum, 15);
}
#[test]
fn test_safe_update() {
let m = Mutex::new(10);
let result = safe_update(&m, |x| x * 2);
assert_eq!(result, 20);
}
#[test]
fn test_counter() {
let counter = Counter::new(0);
assert_eq!(counter.increment(), 1);
assert_eq!(counter.increment(), 2);
assert_eq!(counter.get(), 2);
}
}#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_concurrent_increment() {
assert_eq!(concurrent_increment(10), 10);
}
#[test]
fn test_high_contention() {
let counter = Arc::new(Mutex::new(0));
let handles: Vec<_> = (0..100)
.map(|_| {
let c = Arc::clone(&counter);
thread::spawn(move || {
*c.lock().unwrap() += 1;
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(*counter.lock().unwrap(), 100);
}
#[test]
fn test_correct_lock_pattern() {
let sum = correct_lock_pattern(vec![1, 2, 3, 4, 5]);
assert_eq!(sum, 15);
}
#[test]
fn test_safe_update() {
let m = Mutex::new(10);
let result = safe_update(&m, |x| x * 2);
assert_eq!(result, 20);
}
#[test]
fn test_counter() {
let counter = Counter::new(0);
assert_eq!(counter.increment(), 1);
assert_eq!(counter.increment(), 2);
assert_eq!(counter.get(), 2);
}
}
Deep Comparison
OCaml vs Rust: Async Mutex
Basic Mutex
OCaml:
let m = Mutex.create () in
Mutex.lock m;
(* critical section *)
Mutex.unlock m
Rust:
let m = Mutex::new(0);
{
let mut guard = m.lock().unwrap();
*guard += 1;
} // guard drops, lock released
Key Differences
| Aspect | OCaml | Rust |
|---|---|---|
| Lock/unlock | Explicit methods | RAII guard |
| Poison | Not possible | PoisonError on panic |
| Data association | Separate from mutex | Mutex wraps data |
| Async version | Lwt_mutex | tokio::sync::Mutex |
Exercises
Arc<Mutex<LruCache<K, V>>>.MutexGuard across .await — demonstrate the issue and the fix.Arc<Mutex<T>> vs Arc<RwLock<T>> for a read-heavy workload with 10 readers and 1 writer.