
444: `Arc<RwLock<T>>` — Multiple Readers, One Writer

Functional Programming


Tutorial

The Problem

Arc<Mutex<T>> allows only one thread at a time — even for reads. For read-heavy workloads where many threads query shared state but writes are rare (configuration, routing tables, cached data), this is unnecessary serialization. RwLock<T> differentiates: any number of threads can hold a read() lock simultaneously, but a write() lock requires exclusive access. Combined with Arc, this enables high-concurrency reads with safe infrequent writes — the standard pattern for shared caches and configuration.

Arc<RwLock<T>> patterns appear in database connection pools, HTTP router tables, in-memory caches, feature-flag systems, and any other workload where reads dominate (often 95%+ read traffic).

🎯 Learning Outcomes

  • Understand the RwLock contract: N concurrent readers OR 1 exclusive writer
  • Learn how data.read().unwrap() acquires a shared read guard
  • See how data.write().unwrap() acquires an exclusive write guard
  • Understand when RwLock outperforms Mutex (read-heavy) and when it doesn't (write-heavy)
  • Learn the writer starvation risk: frequent readers can indefinitely delay writers on some platforms

    Code Example

    let cfg: Arc<RwLock<HashMap<&str, &str>>> = 
        Arc::new(RwLock::new(HashMap::new()));
    
    // Multiple readers — simultaneous, no blocking
    let guard = cfg.read().unwrap();
    let value = guard.get("host");
    
    // Exclusive write
    let mut guard = cfg.write().unwrap();
    guard.insert("host", "example.com");

    Key Differences

  • Type enforcement: Rust's RwLock::read() returns RwLockReadGuard which provides only immutable access; OCaml's rwlock doesn't prevent mutation through read locks.
  • Poisoning: Like Mutex, Rust's RwLock poisons on writer panic; OCaml has no poisoning.
  • Write starvation: Rust's std::sync::RwLock is platform-dependent and may starve writers; parking_lot::RwLock from the parking_lot crate provides fairer scheduling.
  • Performance: RwLock has higher overhead than Mutex per operation; gains only appear when concurrent reads significantly outnumber writes.
    OCaml Approach

    OCaml's standard library has no readers-writer lock, even in OCaml 5.x — the Thread, Mutex, and Condition modules provide only mutual exclusion, so a readers-writer lock must be built from Mutex and Condition or taken from a third-party library. Under OCaml 4.x's runtime lock, threads cannot execute OCaml code in parallel anyway, so concurrent reads bring no speedup. Either way, OCaml doesn't enforce the read/write discipline through types — you can mutate data through a read-side lock if the underlying reference is mutable.

    Full Source

    #![allow(clippy::all)]
    //! # Arc<RwLock<T>> — Multiple Readers, One Writer
    //!
    //! Allow many threads to read shared data simultaneously, while guaranteeing
    //! exclusive access for writes.
    
    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};
    use std::thread;
    use std::time::Duration;
    
    /// Approach 1: Shared configuration map
    pub struct SharedConfig {
        data: Arc<RwLock<HashMap<String, String>>>,
    }
    
    impl SharedConfig {
        pub fn new() -> Self {
            Self {
                data: Arc::new(RwLock::new(HashMap::new())),
            }
        }
    
        pub fn get(&self, key: &str) -> Option<String> {
            self.data.read().unwrap().get(key).cloned()
        }
    
        pub fn set(&self, key: String, value: String) {
            self.data.write().unwrap().insert(key, value);
        }
    
        pub fn clone_handle(&self) -> Arc<RwLock<HashMap<String, String>>> {
            Arc::clone(&self.data)
        }
    }
    
    impl Default for SharedConfig {
        fn default() -> Self {
            Self::new()
        }
    }
    
    /// Approach 2: Read-heavy workload simulation
    pub fn simulate_read_heavy(
        num_readers: usize,
        reads_per_thread: usize,
        write_delay_ms: u64,
    ) -> (usize, String) {
        let data = Arc::new(RwLock::new(String::from("initial")));
        let read_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    
        let readers: Vec<_> = (0..num_readers)
            .map(|_| {
                let d = Arc::clone(&data);
                let count = Arc::clone(&read_count);
                thread::spawn(move || {
                    for _ in 0..reads_per_thread {
                        let guard = d.read().unwrap();
                        let _ = guard.len();
                        count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        drop(guard);
                        thread::sleep(Duration::from_micros(10));
                    }
                })
            })
            .collect();
    
        let writer = {
            let d = Arc::clone(&data);
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(write_delay_ms));
                *d.write().unwrap() = String::from("updated");
            })
        };
    
        for r in readers {
            r.join().unwrap();
        }
        writer.join().unwrap();
    
        let final_reads = read_count.load(std::sync::atomic::Ordering::Relaxed);
        let final_value = data.read().unwrap().clone();
        (final_reads, final_value)
    }
    
    /// Approach 3: Cache with concurrent reads
    pub struct ReadCache<K, V> {
        cache: Arc<RwLock<HashMap<K, V>>>,
    }
    
    impl<K, V> ReadCache<K, V>
    where
        K: std::hash::Hash + Eq + Clone,
        V: Clone,
    {
        pub fn new() -> Self {
            Self {
                cache: Arc::new(RwLock::new(HashMap::new())),
            }
        }
    
        pub fn get(&self, key: &K) -> Option<V> {
            self.cache.read().unwrap().get(key).cloned()
        }
    
        pub fn insert(&self, key: K, value: V) {
            self.cache.write().unwrap().insert(key, value);
        }
    
        pub fn len(&self) -> usize {
            self.cache.read().unwrap().len()
        }
    
        pub fn is_empty(&self) -> bool {
            self.len() == 0
        }
    
        pub fn clone_inner(&self) -> Arc<RwLock<HashMap<K, V>>> {
            Arc::clone(&self.cache)
        }
    }
    
    impl<K, V> Default for ReadCache<K, V>
    where
        K: std::hash::Hash + Eq + Clone,
        V: Clone,
    {
        fn default() -> Self {
            Self::new()
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_concurrent_reads() {
            let d = Arc::new(RwLock::new(vec![1, 2, 3]));
            let handles: Vec<_> = (0..4)
                .map(|_| {
                    let d = Arc::clone(&d);
                    thread::spawn(move || d.read().unwrap().iter().sum::<i32>())
                })
                .collect();
    
            for h in handles {
                assert_eq!(h.join().unwrap(), 6);
            }
        }
    
        #[test]
        fn test_write_then_read() {
            let d = RwLock::new(0u32);
            *d.write().unwrap() = 42;
            assert_eq!(*d.read().unwrap(), 42);
        }
    
        #[test]
        fn test_shared_config() {
            let config = SharedConfig::new();
            config.set("host".into(), "localhost".into());
            config.set("port".into(), "8080".into());
    
            assert_eq!(config.get("host"), Some("localhost".into()));
            assert_eq!(config.get("port"), Some("8080".into()));
            assert_eq!(config.get("missing"), None);
        }
    
        #[test]
        fn test_shared_config_concurrent() {
            let config = SharedConfig::new();
            config.set("key".into(), "value".into());
            let handle = config.clone_handle();
    
            thread::scope(|s| {
                for _ in 0..4 {
                    let h = Arc::clone(&handle);
                    s.spawn(move || {
                        let guard = h.read().unwrap();
                        assert_eq!(guard.get("key"), Some(&String::from("value")));
                    });
                }
            });
        }
    
        #[test]
        fn test_read_heavy_simulation() {
            let (reads, final_value) = simulate_read_heavy(4, 10, 5);
            assert_eq!(reads, 40);
            assert_eq!(final_value, "updated");
        }
    
        #[test]
        fn test_read_cache() {
            let cache: ReadCache<String, i32> = ReadCache::new();
            cache.insert("one".into(), 1);
            cache.insert("two".into(), 2);
    
            assert_eq!(cache.get(&"one".into()), Some(1));
            assert_eq!(cache.get(&"three".into()), None);
            assert_eq!(cache.len(), 2);
        }
    
        #[test]
        fn test_try_read_write() {
            let lock = RwLock::new(0);
    
            // Can get multiple read guards
            let r1 = lock.read().unwrap();
            let r2 = lock.try_read();
            assert!(r2.is_ok());
            drop(r1);
            drop(r2);
    
            // Write blocks reads
            let _w = lock.write().unwrap();
            assert!(lock.try_read().is_err());
        }
    }

    Deep Comparison

    OCaml vs Rust: RwLock Pattern

    Read-Write Lock Semantics

    OCaml (no native RwLock — uses Mutex)

    let config = ref [("host","localhost")]
    let mutex = Mutex.create ()
    
    let read_config k =
      Mutex.lock mutex;
      let v = List.assoc_opt k !config in
      Mutex.unlock mutex; v
    
    let write_config k v =
      Mutex.lock mutex;
      config := (k,v) :: List.filter (fun (a,_) -> a<>k) !config;
      Mutex.unlock mutex
    

    Rust

    let cfg: Arc<RwLock<HashMap<&str, &str>>> = 
        Arc::new(RwLock::new(HashMap::new()));
    
    // Multiple readers — simultaneous, no blocking
    let guard = cfg.read().unwrap();
    let value = guard.get("host");
    
    // Exclusive write
    let mut guard = cfg.write().unwrap();
    guard.insert("host", "example.com");
    

    Key Differences

    Feature              | OCaml                | Rust
    ---------------------|----------------------|------------------------------------
    RwLock available     | No (stdlib)          | Yes (std::sync::RwLock)
    Multiple readers     | Blocked (Mutex only) | Concurrent (shared guard)
    Guard types          | Single type          | RwLockReadGuard / RwLockWriteGuard
    Unlock               | Manual               | Automatic (RAII)

    Concurrent Readers

    OCaml

    (* All readers serialize on the single mutex *)
    let readers = List.init 4 (fun _ ->
      Thread.create (fun () ->
        Mutex.lock mutex;  (* blocks even for read-only *)
        let _ = List.assoc "host" !config in
        Mutex.unlock mutex
      ) ()
    )
    

    Rust

    // All readers run concurrently — no blocking
    let readers: Vec<_> = (0..4).map(|_| {
        let c = Arc::clone(&cfg);
        thread::spawn(move || {
            let guard = c.read().unwrap();  // shared — many OK
            let _ = guard.get("host");
        })
    }).collect();
    

    Writer Priority

    Rust

    // Writer waits for all current readers to release
    let writer = thread::spawn(move || {
        let mut guard = cfg.write().unwrap();  // blocks until readers done
        guard.insert("host", "newhost");
    });
    // Once writer has lock, new readers block
    

    When to Use RwLock vs Mutex

    Use Case                | Recommendation
    ------------------------|--------------------------------------
    Reads >> Writes         | RwLock — parallel reads
    Balanced read/write     | Mutex — simpler, less overhead
    Short critical sections | Mutex — RwLock overhead not worth it
    Long reads, rare writes | RwLock — maximizes read throughput

    Exercises

  • Metrics registry: Build a MetricsRegistry using Arc<RwLock<HashMap<String, f64>>> where record(name, value) updates a metric and snapshot() -> HashMap<String, f64> returns a clone of all metrics. Spawn 16 reader threads and 2 writer threads, verifying no data races.
  • Cache with invalidation: Implement a Cache<K, V> wrapping Arc<RwLock<HashMap<K, V>>> with get, set, and invalidate_all methods. Show that invalidate_all temporarily blocks readers while it clears, and readers can proceed in parallel after.
  • Read-through cache: Build a cache that on miss acquires a write lock, checks again (to handle thundering herd), then computes and stores the value. Test with many concurrent misses to the same key, verifying the computation runs exactly once.