
337: Async Mutex

Functional Programming


Tutorial

The Problem

Shared mutable state across concurrent threads or tasks requires mutual exclusion. std::sync::Mutex<T> provides this but blocks the OS thread when locked. In async contexts, blocking a thread blocks all tasks on that thread — a major performance problem. The correct pattern is to use tokio::sync::Mutex for async code (yields instead of blocks) and std::sync::Mutex only for brief critical sections that never span .await points.

🎯 Learning Outcomes

  • Use Arc<Mutex<T>> for shared mutable state across synchronous threads
  • Understand why holding a std::sync::Mutex guard across .await is a deadlock risk
  • Use Arc<Mutex<T>> with brief lock-and-release in async contexts
  • Implement thread-safe caches and counters using Mutex

Code Example

    let m = Mutex::new(0);
    {
        let mut guard = m.lock().unwrap();
        *guard += 1;
    } // guard drops, lock released

    Key Differences

  • Lock guard RAII: Rust's lock().unwrap() returns a MutexGuard that unlocks on drop; OCaml requires explicit Mutex.unlock().
  • Poisoning: Rust mutexes are "poisoned" if a thread panics while holding the lock — lock() returns Err thereafter; OCaml has no poisoning concept.
  • Async mutex: tokio::sync::Mutex is async-aware — lock().await yields instead of blocking; std::sync::Mutex should not span .await points.
  • RwLock alternative: For read-heavy workloads, RwLock<T> allows multiple concurrent readers and one exclusive writer.
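
The RwLock alternative mentioned above can be sketched with the standard library alone. This is an illustrative example; the helper name `concurrent_read_sum` is ours:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Sum the shared vector from several concurrent reader threads.
/// `read()` grants shared access, so all these threads may hold it at once.
fn concurrent_read_sum(data: &Arc<RwLock<Vec<i32>>>, readers: usize) -> Vec<i32> {
    let handles: Vec<_> = (0..readers)
        .map(|_| {
            let d = Arc::clone(data);
            thread::spawn(move || d.read().unwrap().iter().sum::<i32>())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));
    let sums = concurrent_read_sum(&data, 4);
    assert!(sums.iter().all(|&s| s == 6));

    // write() takes exclusive access; it waits until all readers have
    // dropped their guards before proceeding.
    data.write().unwrap().push(4);
    assert_eq!(concurrent_read_sum(&data, 2), vec![10, 10]);
}
```

Prefer `RwLock` only when reads clearly dominate; under heavy write traffic a plain `Mutex` is often simpler and no slower.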

OCaml Approach

    OCaml uses Mutex.t from the standard library for threading, and Lwt_mutex.t for async-aware locking:

    let counter = ref 0
    let mutex = Mutex.create ()
    
    let increment () =
      Mutex.lock mutex;
      incr counter;
      Mutex.unlock mutex
    

    In OCaml 5, true parallelism comes from Domain; the same standard-library Mutex synchronizes code running across threads and domains alike.

    Full Source

    #![allow(clippy::all)]
    //! # Async Mutex
    //!
    //! Lock shared state safely across async tasks — demonstrates correct patterns
    //! for using `std::sync::Mutex` and avoiding deadlocks across await points.
    
    use std::sync::{Arc, Mutex};
    use std::thread;
    
    /// Demonstrates concurrent increments with a mutex.
    pub fn concurrent_increment(num_threads: usize) -> i32 {
        let counter = Arc::new(Mutex::new(0));
    
        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let c = Arc::clone(&counter);
                thread::spawn(move || {
                    *c.lock().unwrap() += 1;
                })
            })
            .collect();
    
        for h in handles {
            h.join().unwrap();
        }
    
        *counter.lock().unwrap()
    }
    
    /// Demonstrates the correct pattern: release lock before doing other work.
    pub fn correct_lock_pattern(data: Vec<i32>) -> i32 {
        let shared = Arc::new(Mutex::new(data));
    
        // CORRECT: compute value inside a scope, guard drops at scope end
        let sum = {
            let guard = shared.lock().unwrap();
            guard.iter().sum::<i32>()
        }; // guard drops here, lock released BEFORE any other work
    
        sum
    }
    
    /// Demonstrates safe read-modify-write pattern.
    pub fn safe_update<F>(mutex: &Mutex<i32>, f: F) -> i32
    where
        F: FnOnce(i32) -> i32,
    {
        let mut guard = mutex.lock().unwrap();
        *guard = f(*guard);
        *guard
    }
    
    /// Demonstrates poison recovery after a panic.
    pub fn with_poison_recovery(mutex: &Mutex<i32>) -> Result<i32, i32> {
        match mutex.lock() {
            Ok(guard) => Ok(*guard),
            Err(poisoned) => {
                // Recover by accessing the data anyway
                let recovered = poisoned.into_inner();
                Err(*recovered)
            }
        }
    }
    
    /// A thread-safe counter using Mutex.
    pub struct Counter {
        value: Mutex<i32>,
    }
    
    impl Counter {
        pub fn new(initial: i32) -> Self {
            Self {
                value: Mutex::new(initial),
            }
        }
    
        pub fn increment(&self) -> i32 {
            let mut guard = self.value.lock().unwrap();
            *guard += 1;
            *guard
        }
    
        pub fn get(&self) -> i32 {
            *self.value.lock().unwrap()
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_concurrent_increment() {
            assert_eq!(concurrent_increment(10), 10);
        }
    
        #[test]
        fn test_high_contention() {
            let counter = Arc::new(Mutex::new(0));
            let handles: Vec<_> = (0..100)
                .map(|_| {
                    let c = Arc::clone(&counter);
                    thread::spawn(move || {
                        *c.lock().unwrap() += 1;
                    })
                })
                .collect();
    
            for h in handles {
                h.join().unwrap();
            }
    
            assert_eq!(*counter.lock().unwrap(), 100);
        }
    
        #[test]
        fn test_correct_lock_pattern() {
            let sum = correct_lock_pattern(vec![1, 2, 3, 4, 5]);
            assert_eq!(sum, 15);
        }
    
        #[test]
        fn test_safe_update() {
            let m = Mutex::new(10);
            let result = safe_update(&m, |x| x * 2);
            assert_eq!(result, 20);
        }
    
        #[test]
        fn test_counter() {
            let counter = Counter::new(0);
            assert_eq!(counter.increment(), 1);
            assert_eq!(counter.increment(), 2);
            assert_eq!(counter.get(), 2);
        }
    }

    Deep Comparison

    OCaml vs Rust: Async Mutex

    Basic Mutex

    OCaml:

    let m = Mutex.create () in
    Mutex.lock m;
    (* critical section *)
    Mutex.unlock m
    

    Rust:

    let m = Mutex::new(0);
    {
        let mut guard = m.lock().unwrap();
        *guard += 1;
    } // guard drops, lock released
    

    Key Differences

    | Aspect           | OCaml               | Rust                 |
    |------------------|---------------------|----------------------|
    | Lock/unlock      | Explicit methods    | RAII guard           |
    | Poison           | Not possible        | PoisonError on panic |
    | Data association | Separate from mutex | Mutex wraps data     |
    | Async version    | Lwt_mutex           | tokio::sync::Mutex   |
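
The poisoning behavior compared above can be demonstrated end to end with the standard library: a thread that panics while holding the guard poisons the mutex, and `PoisonError::into_inner` still yields the guarded data. The helper name `poison_and_recover` is ours, for illustration:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Poison the mutex by panicking while its guard is held, then recover
/// the inner value from the PoisonError.
fn poison_and_recover(m: Arc<Mutex<i32>>) -> i32 {
    let m2 = Arc::clone(&m);
    let handle = thread::spawn(move || {
        let _guard = m2.lock().unwrap();
        panic!("boom"); // unwinds with the guard held -> mutex is poisoned
    });
    assert!(handle.join().is_err()); // the panic surfaces as a join error

    match m.lock() {
        Ok(_) => unreachable!("lock should be poisoned"),
        // The PoisonError still carries the guard; the data is accessible.
        Err(poisoned) => *poisoned.into_inner(),
    }
}

fn main() {
    let m = Arc::new(Mutex::new(41));
    assert_eq!(poison_and_recover(Arc::clone(&m)), 41);
}
```

Poisoning is a conservative signal that an invariant may have been broken mid-update; recovering with `into_inner` is a deliberate choice to accept possibly inconsistent data.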

    Exercises

  • Implement a thread-safe LRU cache using Arc<Mutex<LruCache<K, V>>>.
  • Show the deadlock risk of holding a MutexGuard across .await — demonstrate the issue and the fix.
  • Benchmark Arc<Mutex<T>> vs Arc<RwLock<T>> for a read-heavy workload with 10 readers and 1 writer.