445 Intermediate

445: MPSC Channels — Message Passing Between Threads

Functional Programming


Tutorial

The Problem

Shared mutable state with locks is error-prone: it invites deadlocks, priority inversion, and fragile lock-ordering rules. The alternative is message passing: threads communicate by sending values through channels rather than sharing state. std::sync::mpsc (Multiple Producer, Single Consumer) provides channels for this pattern. Producers send messages; the consumer receives them. Once every sender has been dropped and the buffered messages have been received, the receiver's iteration ends on its own, which gives a natural shutdown mechanism.

MPSC channels power the actor model, pipeline processing, result aggregation from worker threads, and the "channel as work queue" pattern used in thread pools.
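As a minimal sketch of the result-aggregation pattern mentioned above: each worker sums one chunk of the input and sends its partial sum through a cloned sender, and the calling thread adds up whatever arrives. The name `parallel_sum` and the chunking scheme are illustrative assumptions, not part of the original example.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: sums `data` by splitting it across up to `workers` threads.
pub fn parallel_sum(data: Vec<u64>, workers: usize) -> u64 {
    let (tx, rx) = mpsc::channel::<u64>();

    // Ceiling division so every element lands in a chunk; clamp to 1 because
    // `chunks` panics on a chunk size of 0.
    let w = workers.max(1);
    let chunk_len = ((data.len() + w - 1) / w).max(1);

    let handles: Vec<_> = data
        .chunks(chunk_len)
        .map(|chunk| {
            let chunk = chunk.to_vec(); // each worker owns its slice of the data
            let tx = tx.clone(); // one sender per worker
            thread::spawn(move || {
                tx.send(chunk.iter().sum::<u64>()).unwrap();
            })
        })
        .collect();

    // Drop the original sender; otherwise `rx.iter()` would wait forever.
    drop(tx);

    // Partial sums arrive in any order, which does not matter for addition.
    let total: u64 = rx.iter().sum();

    for h in handles {
        h.join().unwrap();
    }
    total
}

fn main() {
    let total = parallel_sum((1..=100).collect(), 4);
    println!("total = {}", total);
}
```

Because addition is commutative, the nondeterministic arrival order of the partial sums does not affect the result; an order-sensitive aggregation would need to tag each message with its chunk index.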

🎯 Learning Outcomes

  • Understand the MPSC channel contract: multiple senders, one receiver, bounded or unbounded
  • Learn how tx.clone() creates additional senders for multiple producer threads
  • See how drop(tx) signals shutdown: the channel closes when all senders drop
  • Understand rx.iter() for collecting all messages until channel close
  • Learn the relationship between mpsc and Go's channels (Go's are MPMC with select)

    Code Example

    let (tx, rx) = mpsc::channel::<String>();
    
    // Send
    tx.send("message".into()).unwrap();
    
    // Receive (blocking)
    let msg = rx.recv().unwrap();

    Key Differences

  • MPSC vs. MPMC: Rust's std::sync::mpsc is multiple-producer, single-consumer; Go's channels are MPMC. For MPMC in Rust, use crossbeam::channel.
  • Shutdown signal: Rust channels close when all senders drop — automatic shutdown; OCaml requires explicit sentinel values or condition variables.
  • Bounded vs. unbounded: mpsc::channel() is unbounded (back-pressure requires explicit management); mpsc::sync_channel(n) creates bounded channels.
  • Select: Rust's mpsc has no select; crossbeam::channel + crossbeam::select! enable multi-channel receive.
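The bounded case can be sketched with `try_send`, which reports a full buffer instead of blocking. The helper name `fill_until_full` is a hypothetical one chosen for illustration; the receiver is kept alive but never read, so the buffer can only fill up.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper: tries to push `attempts` values into a bounded channel
/// of the given `capacity` while nobody is receiving, and returns how many
/// values the channel accepted before reporting that it was full.
pub fn fill_until_full(capacity: usize, attempts: usize) -> usize {
    // `_rx` stays in scope so the channel is not disconnected.
    let (tx, _rx) = mpsc::sync_channel::<usize>(capacity);
    let mut accepted = 0;

    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer is full: a blocking `send` would wait here instead.
            Err(TrySendError::Full(_)) => break,
            // Receiver dropped: cannot happen here, handled for completeness.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    println!("accepted {} of 5 sends", fill_until_full(2, 5));
}
```

With a blocking `send` in place of `try_send`, the producer would simply pause at the point where this sketch breaks out of the loop, which is the back-pressure behaviour an unbounded `mpsc::channel()` does not provide.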
    OCaml Approach

    OCaml's Event module provides synchronous channels: after let ch = Event.new_channel (), the call Event.sync (Event.send ch v) blocks until a receiver is ready. The Thread_safe_queue from Core provides asynchronous buffered queues. On OCaml 5.x, Domainslib offers Chan channels alongside its Task pool. Compared with Rust's mpsc, OCaml's built-in channel primitives are lower-level and need more hand-written code for complex patterns.
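For comparison, the closest standard-library analogue in Rust to a synchronous (rendezvous) channel is `mpsc::sync_channel(0)`: with a capacity of zero, `send` blocks until a receiver takes the value. The sketch below is an illustration under that assumption of equivalence in spirit, not a port of OCaml's Event API; the function name `rendezvous` is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: hands `value` from a spawned thread to the caller
/// through a zero-capacity (rendezvous) channel.
pub fn rendezvous(value: i32) -> i32 {
    // Capacity 0 means there is no buffer at all.
    let (tx, rx) = mpsc::sync_channel::<i32>(0);

    let sender = thread::spawn(move || {
        // Blocks until the `recv` below is ready to take the value.
        tx.send(value).unwrap();
    });

    let got = rx.recv().unwrap();
    sender.join().unwrap();
    got
}

fn main() {
    println!("received {}", rendezvous(42));
}
```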

    Full Source

    #![allow(clippy::all)]
    //! # MPSC Channels — Message Passing Between Threads
    //!
    //! Send values across threads with `std::sync::mpsc` — multiple producers,
    //! one consumer, with automatic shutdown when all senders drop.
    
    use std::sync::mpsc::{self, Receiver, Sender};
    use std::thread;
    use std::time::Duration;
    
    /// Approach 1: Multiple producers, single consumer
    pub fn multi_producer_single_consumer(
        num_producers: usize,
        msgs_per_producer: usize,
    ) -> Vec<String> {
        let (tx, rx) = mpsc::channel::<String>();
    
        let handles: Vec<_> = (0..num_producers)
            .map(|id| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..msgs_per_producer {
                        tx.send(format!("p{}-msg{}", id, i)).unwrap();
                    }
                })
            })
            .collect();
    
        drop(tx); // Important: drop the original sender, or rx.iter() below never ends
    
        // Collect all messages
        let messages: Vec<String> = rx.iter().collect();
    
        for h in handles {
            h.join().unwrap();
        }
    
        messages
    }
    
    /// Approach 2: Bounded channel using sync_channel
    pub fn bounded_channel_demo(buffer_size: usize, num_msgs: usize) -> Vec<i32> {
        let (tx, rx) = mpsc::sync_channel::<i32>(buffer_size);
    
        let producer = thread::spawn(move || {
            for i in 0..num_msgs as i32 {
                tx.send(i).unwrap();
            }
        });
    
        let consumer = thread::spawn(move || {
            let mut results = Vec::new();
            for msg in rx {
                results.push(msg);
            }
            results
        });
    
        producer.join().unwrap();
        consumer.join().unwrap()
    }
    
    /// Approach 3: Non-blocking drain with try_iter
    pub fn non_blocking_receive(msgs: Vec<i32>) -> Vec<i32> {
        let (tx, rx) = mpsc::channel();
    
        for msg in msgs {
            tx.send(msg).unwrap();
        }
        drop(tx);
    
        // Non-blocking collect
        rx.try_iter().collect()
    }
    
    /// Approach 4: Timeout-based receive
    pub fn receive_with_timeout(timeout_ms: u64) -> Option<i32> {
        let (tx, rx) = mpsc::channel();
    
        let sender = thread::spawn(move || {
            thread::sleep(Duration::from_millis(timeout_ms * 2));
            let _ = tx.send(42);
        });
    
        let result = rx.recv_timeout(Duration::from_millis(timeout_ms)).ok();
    
        sender.join().unwrap();
        result
    }
    
    /// Producer-consumer pattern with work items
    pub struct WorkQueue<T> {
        sender: Sender<T>,
    }
    
    impl<T> WorkQueue<T> {
        pub fn new() -> (Self, Receiver<T>) {
            let (sender, receiver) = mpsc::channel();
            (Self { sender }, receiver)
        }
    
        pub fn send(&self, item: T) -> Result<(), mpsc::SendError<T>> {
            self.sender.send(item)
        }
    
        pub fn clone_sender(&self) -> Sender<T> {
            self.sender.clone()
        }
    }
    
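    impl<T> Default for WorkQueue<T> {
        /// Caution: this discards the receiver, so the channel is already closed
        /// and every `send` on the returned queue fails. Prefer `WorkQueue::new()`.
        fn default() -> Self {
            Self::new().0
        }
    }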
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_send_recv_basic() {
            let (tx, rx) = mpsc::channel();
            tx.send(42u32).unwrap();
            assert_eq!(rx.recv().unwrap(), 42);
        }
    
        #[test]
        fn test_channel_closed() {
            let (tx, rx) = mpsc::channel::<i32>();
            drop(tx);
            assert!(rx.recv().is_err());
        }
    
        #[test]
        fn test_multiple_producers() {
            let (tx, rx) = mpsc::channel::<u32>();
            let handles: Vec<_> = (0..4)
                .map(|i| {
                    let tx = tx.clone();
                    thread::spawn(move || tx.send(i).unwrap())
                })
                .collect();
    
            drop(tx);
    
            let mut results: Vec<u32> = rx.iter().collect();
            results.sort();
            assert_eq!(results, vec![0, 1, 2, 3]);
    
            for h in handles {
                h.join().unwrap();
            }
        }
    
        #[test]
        fn test_multi_producer_consumer() {
            let messages = multi_producer_single_consumer(3, 5);
            assert_eq!(messages.len(), 15);
        }
    
        #[test]
        fn test_bounded_channel() {
            let results = bounded_channel_demo(2, 10);
            assert_eq!(results, (0..10).collect::<Vec<i32>>());
        }
    
        #[test]
        fn test_non_blocking() {
            let input = vec![1, 2, 3, 4, 5];
            let output = non_blocking_receive(input.clone());
            assert_eq!(output, input);
        }
    
        #[test]
        fn test_try_recv_empty() {
            let (_tx, rx) = mpsc::channel::<i32>();
            assert!(rx.try_recv().is_err());
        }
    
        #[test]
        fn test_recv_timeout() {
            let result = receive_with_timeout(10);
            assert!(result.is_none()); // Timeout before message arrives
        }
    
        #[test]
        fn test_work_queue() {
            let (queue, rx) = WorkQueue::<i32>::new();
    
            queue.send(1).unwrap();
            queue.send(2).unwrap();
            queue.send(3).unwrap();
    
            let tx2 = queue.clone_sender();
            tx2.send(4).unwrap();
    
            drop(queue);
            drop(tx2);
    
            let results: Vec<i32> = rx.iter().collect();
            assert_eq!(results, vec![1, 2, 3, 4]);
        }
    }

    Deep Comparison

    OCaml vs Rust: MPSC Channels

    Channel Creation

    OCaml (Manual with Queue + Mutex + Condition)

    let queue = Queue.create ()
    let mutex = Mutex.create ()
    let cond = Condition.create ()
    
    let send v =
      Mutex.lock mutex;
      Queue.push v queue;
      Condition.signal cond;
      Mutex.unlock mutex
    
    let recv () =
      Mutex.lock mutex;
      while Queue.is_empty queue do
        Condition.wait cond mutex
      done;
      let v = Queue.pop queue in
      Mutex.unlock mutex;
      v
    

    Rust

    let (tx, rx) = mpsc::channel::<String>();
    
    // Send
    tx.send("message".into()).unwrap();
    
    // Receive (blocking)
    let msg = rx.recv().unwrap();
    

    Multiple Producers

    OCaml

    (* Same send function works from multiple threads *)
    let producers = List.init 3 (fun id ->
      Thread.create (fun () ->
        for i = 1 to 5 do
          send (Printf.sprintf "p%d-msg%d" id i)
        done
      ) ()
    )
    

    Rust

    let handles: Vec<_> = (0..3).map(|id| {
        let tx = tx.clone();  // Clone the sender
        thread::spawn(move || {
            for i in 0..5 {
                tx.send(format!("p{}-msg{}", id, i)).unwrap();
            }
        })
    }).collect();
    
    drop(tx);  // Drop original to close channel
    

    Key Differences

    Feature            OCaml                Rust
    Built-in channel   No (manual)          Yes (std::sync::mpsc)
    Sender cloning     Same function        tx.clone()
    Channel close      Sentinel value       Drop all senders
    Shutdown signal    Manual               Automatic (recv() returns Err)
    Bounded channel    Manual size check    sync_channel(size)

    Consumer Iteration

    OCaml

    (* Must know message count or use sentinel *)
    let consumer = Thread.create (fun () ->
      for _ = 1 to 15 do
        Printf.printf "got: %s\n%!" (recv ())
      done
    ) ()
    

    Rust

    // Iterate until channel closes
    for msg in rx {
        println!("got: {}", msg);
    }
    // Loop exits when all senders drop
    

    Non-blocking Operations

    OCaml

    (* Manual try with immediate check *)
    let try_recv () =
      Mutex.lock mutex;
      let result =
        if Queue.is_empty queue then None
        else Some (Queue.pop queue)
      in
      Mutex.unlock mutex;
      result
    

    Rust

    use std::sync::mpsc::TryRecvError;

    // try_recv returns immediately
    match rx.try_recv() {
        Ok(msg) => println!("got {}", msg),
        Err(TryRecvError::Empty) => println!("no message"),
        Err(TryRecvError::Disconnected) => println!("closed"),
    }
    
    // try_iter drains all available
    let all: Vec<_> = rx.try_iter().collect();
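A polling loop built on `try_recv` might look like the sketch below. The helper `poll_until_closed` and the 1 ms sleep (a stand-in for other useful work between polls) are assumptions made for illustration. Note that `Disconnected` is only reported after any buffered messages have been delivered.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: polls `rx` without blocking until the channel closes.
/// Returns the received values and the number of polls that found it empty.
pub fn poll_until_closed(rx: mpsc::Receiver<i32>) -> (Vec<i32>, usize) {
    let mut received = Vec::new();
    let mut empty_polls = 0;

    loop {
        match rx.try_recv() {
            Ok(v) => received.push(v),
            Err(TryRecvError::Empty) => {
                // Nothing yet, but senders are still alive.
                empty_polls += 1;
                thread::sleep(Duration::from_millis(1)); // stand-in for other work
            }
            // All senders dropped and the buffer is drained.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    (received, empty_polls)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_millis(5));
        }
    });

    let (received, empty_polls) = poll_until_closed(rx);
    producer.join().unwrap();
    println!("received {:?} after {} empty polls", received, empty_polls);
}
```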
    

    Timeout Receive (Rust-specific)

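    use std::sync::mpsc::RecvTimeoutError;

    // `process` and `handle_timeout` are placeholders for application code.
    loop {
        match rx.recv_timeout(Duration::from_secs(1)) {
            Ok(msg) => process(msg),
            Err(RecvTimeoutError::Timeout) => handle_timeout(),
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }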
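A self-contained version of this timeout loop could be sketched as follows. The helper `drain_with_timeout` is hypothetical; it simply counts timeouts where a real program might log a heartbeat or check a cancellation flag.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: receives until the channel closes, waiting at most
/// `timeout` per attempt. Returns the values and the number of timeouts.
pub fn drain_with_timeout(rx: mpsc::Receiver<i32>, timeout: Duration) -> (Vec<i32>, usize) {
    let mut received = Vec::new();
    let mut timeouts = 0;

    loop {
        match rx.recv_timeout(timeout) {
            Ok(v) => received.push(v),
            // No message within `timeout`; senders may still be alive.
            Err(RecvTimeoutError::Timeout) => timeouts += 1,
            // All senders dropped and nothing left to read.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    (received, timeouts)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            thread::sleep(Duration::from_millis(20)); // slower than the timeout
            tx.send(i).unwrap();
        }
    });

    let (received, timeouts) = drain_with_timeout(rx, Duration::from_millis(5));
    producer.join().unwrap();
    println!("received {:?} with {} timeouts", received, timeouts);
}
```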
    

    Exercises

  • Pipeline with channels: Build a three-stage pipeline: stage 1 produces numbers 1-1000, stage 2 squares them, stage 3 filters evens. Connect stages with mpsc channels. Verify the final output.
  • Fan-out and fan-in: Create a work distributor: one sender distributes work items to N workers via N channels. Each worker sends results back via a single results channel. Verify all items are processed.
  • Backpressure with sync_channel: Replace mpsc::channel() with mpsc::sync_channel(10). Producer threads will block when the buffer is full. Verify the producer is throttled when the consumer is slow (add a thread::sleep in the consumer).
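As a possible starting point for the pipeline exercise, the sketch below wires a producer to a single mapping stage; the filtering stage and the verification are left open. The names `map_stage` and `squares_up_to` are hypothetical, and this is only one of several reasonable ways to structure the stages.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical building block: spawns a thread that applies `f` to every
/// value from `input` and forwards the result on a fresh channel.
pub fn map_stage<A, B, F>(input: Receiver<A>, f: F) -> Receiver<B>
where
    A: Send + 'static,
    B: Send + 'static,
    F: Fn(A) -> B + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The loop ends when the upstream sender is dropped.
        for item in input {
            // Stop early if the downstream receiver has gone away.
            if tx.send(f(item)).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the output channel in turn.
    });
    rx
}

/// Stage 1 produces 1..=n; stage 2 squares each value in its own thread.
pub fn squares_up_to(n: u64) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for i in 1..=n {
            if tx.send(i).is_err() {
                break;
            }
        }
    });

    // With one thread per stage, each channel preserves the order of its input.
    map_stage(rx, |x| x * x).into_iter().collect()
}

fn main() {
    println!("{:?}", squares_up_to(5));
}
```

Shutdown propagates stage by stage: when the producer finishes, its sender drops, the mapping thread's loop ends, and its own sender drops, so the final `collect` terminates without any sentinel value.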
    Open Source Repos