341 Intermediate

341: MPSC Channel

Functional Programming

Tutorial

The Problem

Threads need to communicate without sharing memory unsafely. An MPSC (multi-producer, single-consumer) channel solves this by providing a typed message queue: many threads can send, and exactly one thread receives. Channels trace back to Hoare's Communicating Sequential Processes (1978) and Dijkstra's work on process communication. The guiding idea, later popularized as a Go proverb, is: don't communicate by sharing memory; share memory by communicating. Channels make data flow explicit, so the communication points need no hand-written locks. Because a sent value is moved to the receiver, the sender can no longer touch it, which rules out data races on the message itself.

🎯 Learning Outcomes

  • Use std::sync::mpsc::channel() to create unbounded channels
  • Use mpsc::sync_channel(capacity) for bounded/backpressure channels
  • Clone Sender<T> to create multiple producers for one receiver
  • Drop the original sender so rx closes when all senders are gone
  • Iterate rx.into_iter() to collect all messages until the channel closes
  • Implement fan-in patterns where many threads feed one aggregator

    Code Example

    let (tx, rx) = mpsc::channel();
    tx.send(value).unwrap();
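
The two lines above omit the imports, the producers, and the receiving side. A minimal complete sketch of the same idea follows; the function name `sum_worker_ids` and the choice of sending thread ids are illustrative assumptions, not part of the original example.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `workers` threads that each send their own id,
/// then sums whatever arrives on the single receiver.
/// (Hypothetical helper, named here only for illustration.)
fn sum_worker_ids(workers: u32) -> u32 {
    let (tx, rx) = mpsc::channel();

    for id in 0..workers {
        let tx = tx.clone(); // one Sender clone per producer thread
        thread::spawn(move || {
            // send fails only if the receiver has already been dropped
            tx.send(id).unwrap();
        });
    }

    // Drop the original sender; otherwise the channel never closes
    // and the iteration below would wait forever.
    drop(tx);

    // The single consumer drains the channel until every sender is gone.
    rx.iter().sum()
}

fn main() {
    println!("sum = {}", sum_worker_ids(4)); // 0 + 1 + 2 + 3
}
```

The order in which ids arrive is not fixed, which is why the sketch reduces them with a sum rather than comparing against a particular sequence.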

    Key Differences

    | Aspect | Rust `mpsc` | OCaml `Event` |
    |---|---|---|
    | Producer count | Many (`Sender` is `Clone`) | Many (send to same channel) |
    | Consumer count | One (`Receiver` not `Clone`) | One sync'd receive at a time |
    | Default behavior | Asynchronous (unbounded) | Synchronous (rendezvous) |
    | Bounded variant | `sync_channel(n)` | No built-in; use Domainslib |
    | Type safety | `T: Send + 'static` | Polymorphic channel `'a Event.channel` |
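
The "Default behavior" and "Bounded variant" rows can be observed without any threads by using the non-blocking `try_send`. The sketch below is only an illustration; the helper names `fill_until_full` and `unbounded_accepts` are assumptions introduced here.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Returns how many items a bounded channel accepts before reporting Full.
/// (Hypothetical helper for illustration.)
fn fill_until_full(capacity: usize) -> usize {
    // `_rx` keeps the receiver alive so the channel stays connected.
    let (tx, _rx) = mpsc::sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(0) {
            Ok(()) => accepted += 1,
            // A blocking `send` would wait at this point instead of returning.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

/// An unbounded channel accepts every send immediately,
/// even though nothing has been received yet.
fn unbounded_accepts(n: usize) -> usize {
    let (tx, rx) = mpsc::channel::<u32>();
    for _ in 0..n {
        tx.send(0).unwrap();
    }
    drop(tx);
    rx.iter().count()
}

fn main() {
    println!("bounded(3) accepted {}", fill_until_full(3));
    println!("unbounded accepted {}", unbounded_accepts(1000));
}
```

With a capacity of 0 nothing is ever buffered, so `try_send` reports `Full` unless a receiver is already waiting; that is the rendezvous case.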

    OCaml Approach

    OCaml's Event module provides synchronous channels in the style of CML (Concurrent ML). Event.new_channel () creates a typed channel; Event.send and Event.receive build events, and Event.sync blocks until the event is committed:

    (* requires the threads library *)
    let ch = Event.new_channel ()
    let _ = Thread.create (fun () -> Event.sync (Event.send ch 42)) ()
    let v = Event.sync (Event.receive ch)
    

    For asynchronous message passing, Thread+Mutex+Queue combinations are common. In OCaml 5, the Domainslib library provides Task.async/Task.await for parallel tasks and a Chan module for channels between domains.
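
For comparison with the rendezvous behavior of Event described above, the closest Rust counterpart is a zero-capacity `sync_channel`. The following is a rough sketch of that correspondence, not a claim that the two APIs are equivalent; the function name `rendezvous` is an assumption made for this example.

```rust
use std::sync::mpsc;
use std::thread;

/// Hands one value across a zero-capacity channel. `send` returns only
/// once a `recv` has been paired with it, similar in spirit to Event.sync.
/// (Hypothetical helper for illustration.)
fn rendezvous(value: i32) -> i32 {
    let (tx, rx) = mpsc::sync_channel::<i32>(0);

    // The sender thread blocks inside `send` until the main thread receives.
    let sender = thread::spawn(move || tx.send(value).unwrap());

    let received = rx.recv().unwrap();
    sender.join().unwrap();
    received
}

fn main() {
    println!("received {}", rendezvous(42));
}
```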

    Full Source

    #![allow(clippy::all)]
    //! # MPSC Channel
    //!
    //! Multi-producer, single-consumer channel — the standard way to communicate between threads.
    
    use std::sync::mpsc::{self, Sender};
    use std::thread;
    
    /// Creates a fan-in pattern: multiple producers, one consumer.
    pub fn fan_in<T: Send + 'static>(producers: Vec<Box<dyn FnOnce(Sender<T>) + Send>>) -> Vec<T> {
        let (tx, rx) = mpsc::channel();
    
        for producer in producers {
            let tx = tx.clone();
            thread::spawn(move || producer(tx));
        }
    
        drop(tx); // Important: drop original so rx closes when all producers done
        rx.into_iter().collect()
    }
    
    /// Creates a bounded channel that applies backpressure.
    pub fn bounded_producer_consumer(capacity: usize, items: Vec<i32>) -> Vec<i32> {
        let (tx, rx) = mpsc::sync_channel::<i32>(capacity);
    
        let producer = thread::spawn(move || {
            for item in items {
                tx.send(item).unwrap(); // Blocks if buffer full
            }
        });
    
        let results: Vec<_> = rx.into_iter().collect();
        producer.join().unwrap();
        results
    }
    
    /// Demonstrates multiple producers sending to one consumer.
    pub fn multi_producer(num_producers: usize, messages_per_producer: usize) -> Vec<String> {
        let (tx, rx) = mpsc::channel();
    
        for i in 0..num_producers {
            let tx = tx.clone();
            thread::spawn(move || {
                for j in 0..messages_per_producer {
                    tx.send(format!("producer-{}-msg-{}", i, j)).unwrap();
                }
            });
        }
    
        drop(tx);
        rx.into_iter().collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_fan_in() {
            let producers: Vec<Box<dyn FnOnce(Sender<i32>) + Send>> = vec![
                Box::new(|tx| {
                    tx.send(1).unwrap();
                    tx.send(2).unwrap();
                }),
                Box::new(|tx| {
                    tx.send(3).unwrap();
                }),
            ];
            let mut results = fan_in(producers);
            results.sort();
            assert_eq!(results, vec![1, 2, 3]);
        }
    
        #[test]
        fn test_bounded_channel() {
            let results = bounded_producer_consumer(2, vec![1, 2, 3, 4, 5]);
            assert_eq!(results, vec![1, 2, 3, 4, 5]);
        }
    
        #[test]
        fn test_multi_producer() {
            let results = multi_producer(3, 2);
            assert_eq!(results.len(), 6);
        }
    
        #[test]
        fn test_channel_closes_when_senders_dropped() {
            let (tx, rx) = mpsc::channel::<i32>();
            drop(tx);
            assert!(rx.recv().is_err());
        }
    
        #[test]
        fn test_sync_channel_blocks() {
            let (tx, rx) = mpsc::sync_channel::<i32>(1);
            tx.send(1).unwrap();
            // Next send would block if we didn't receive
            assert_eq!(rx.recv().unwrap(), 1);
        }
    }
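
`test_multi_producer` checks only the message count, because messages from different producers interleave unpredictably. Since the channel is a FIFO queue, messages from any single producer should still arrive in the order that producer sent them. The sketch below checks that property; the helper `per_producer_order_holds` and the `(id, seq)` message shape are assumptions introduced for this illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Runs `producers` threads that each send (id, 0), (id, 1), ... and
/// returns true if every producer's messages arrived in send order
/// and none were lost. (Hypothetical helper for illustration.)
fn per_producer_order_holds(producers: usize, per_producer: usize) -> bool {
    let (tx, rx) = mpsc::channel::<(usize, usize)>();

    for id in 0..producers {
        let tx = tx.clone();
        thread::spawn(move || {
            for seq in 0..per_producer {
                tx.send((id, seq)).unwrap();
            }
        });
    }
    drop(tx); // let the receiver see the channel close

    // Next sequence number expected from each producer id.
    let mut next_expected: HashMap<usize, usize> = HashMap::new();
    let mut total = 0;
    for (id, seq) in rx {
        let expected = next_expected.entry(id).or_insert(0);
        if seq != *expected {
            return false; // a producer's messages arrived out of order
        }
        *expected += 1;
        total += 1;
    }
    total == producers * per_producer
}

fn main() {
    println!("order holds: {}", per_producer_order_holds(4, 100));
}
```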
    Deep Comparison

    OCaml vs Rust: MPSC Channel

    Channel Usage

    OCaml:

    let ch = Event.new_channel () in
    Event.sync (Event.send ch value)
    

    Rust:

    let (tx, rx) = mpsc::channel();
    tx.send(value).unwrap();
    

    Key Differences

    | Aspect | OCaml | Rust |
    |---|---|---|
    | Split sender/receiver | No | Yes (`tx`, `rx`) |
    | Clone sender | N/A | `tx.clone()` |
    | Iterate receiver | Manual loop | `for msg in rx` |
    | Close detection | Manual | Automatic on sender drop |
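
Automatic close detection on the Rust side can be made visible with the non-blocking `try_recv`, which distinguishes "nothing queued yet" from "no senders left". The sketch below is illustrative only; the function name `observe_states` is an assumption.

```rust
use std::sync::mpsc::{self, TryRecvError};

type Observed = Result<i32, TryRecvError>;

/// Returns what a receiver observes in three situations, without blocking.
/// (Hypothetical helper for illustration.)
fn observe_states() -> (Observed, Observed, Observed) {
    let (tx, rx) = mpsc::channel::<i32>();

    let empty = rx.try_recv(); // sender alive, nothing queued

    tx.send(7).unwrap();
    drop(tx); // last sender gone, but 7 is still buffered

    let buffered = rx.try_recv(); // buffered messages are still delivered
    let closed = rx.try_recv(); // buffer drained and no senders left

    (empty, buffered, closed)
}

fn main() {
    println!("{:?}", observe_states());
}
```

Dropping the last sender does not discard messages already in the queue; the receiver sees the disconnect only after draining them. This is the same condition that ends a `for msg in rx` loop.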

    Exercises

  • Aggregator pipeline: Build a pipeline where 4 worker threads each transform a slice of data and send results to an aggregator thread that merges and sorts them.
  • Backpressure demo: Use sync_channel(2) with a slow consumer (thread::sleep) and a fast producer — observe that the producer blocks automatically.
  • Result fan-in: Modify fan_in to handle Result<T, E> messages, collecting successes and errors separately, without panicking on any individual failure.