ExamplesBy LevelBy TopicLearning Paths
328 Intermediate

328: Async Channels (mpsc)

Functional Programming

Tutorial Video

Text description (accessibility)

This video demonstrates the "328: Async Channels (mpsc)" functional Rust example. Difficulty level: Intermediate. Key concepts covered: Functional Programming. Shared mutable state across threads requires locks — error-prone and contended. Key difference from OCaml: 1. **MPSC vs MPMC**: Rust's standard `mpsc` is multi

Tutorial

The Problem

Shared mutable state across threads requires locks — error-prone and contended. Channels provide an alternative: communicate via message passing rather than sharing state. Go popularized "don't communicate by sharing memory; share memory by communicating." Rust's std::sync::mpsc (multi-producer, single-consumer) channels implement this pattern with type-safe, backpressure-supporting message queues. This is the foundation for actor-based and pipeline architectures.

🎯 Learning Outcomes

  • • Use mpsc::channel() for unbounded and mpsc::sync_channel() for bounded channels
  • • Implement fan-in: multiple producers sending to one consumer
  • • Handle channel closure: Sender drop signals end of stream to Receiver
  • • Understand backpressure: bounded channels block producers when the buffer is full
  • Code Example

    fn producer(tx: mpsc::Sender<String>, label: &'static str, n: usize) {
        thread::spawn(move || {
            for i in 1..=n {
                tx.send(format!("{label}-{i}")).unwrap();
            }
        });
    }

    Key Differences

  • MPSC vs MPMC: Rust's standard mpsc is multi-producer single-consumer; Tokio's broadcast and watch channels provide multi-consumer.
  • Backpressure: sync_channel(capacity) blocks the sender when full; unbounded channel() never blocks but may allocate unboundedly.
  • Closure detection: The receiver detects channel closure when all senders are dropped — recv() returns Err or into_iter() terminates.
  • async channels: tokio::sync::mpsc is the async-aware version — send().await yields instead of blocking the thread.
  • OCaml Approach

    OCaml uses Event channels or the Domainslib library for channel-based concurrency. Lwt provides Lwt_stream for async streams:

    (* Lwt_stream: lazy push-based stream *)
    let (stream, push) = Lwt_stream.create ()
    let () = push (Some "message1"); push (Some "message2"); push None
    let* messages = Lwt_stream.to_list stream
    

    Full Source

    #![allow(clippy::all)]
    //! # Async Channels (mpsc)
    //!
    //! Multi-producer, single-consumer channels let multiple tasks send messages
    //! to one receiver — the safe, idiomatic way to communicate between concurrent workers.
    
    use std::sync::mpsc::{self, Receiver, Sender};
    use std::thread;
    use std::time::Duration;
    
    /// Creates a producer that sends labeled messages through a channel.
    pub fn create_producer(
        tx: Sender<String>,
        label: &'static str,
        count: usize,
        delay_ms: u64,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            for i in 1..=count {
                thread::sleep(Duration::from_millis(delay_ms));
                tx.send(format!("{}-{}", label, i)).unwrap();
            }
        })
    }
    
    /// Collects all messages from multiple producers.
    pub fn collect_messages(rx: Receiver<String>) -> Vec<String> {
        rx.into_iter().collect()
    }
    
    /// Creates a bounded channel simulation with a buffer.
    pub struct BoundedChannel<T> {
        tx: Sender<T>,
        rx: Option<Receiver<T>>,
    }
    
    impl<T> BoundedChannel<T> {
        pub fn new() -> Self {
            let (tx, rx) = mpsc::channel();
            Self { tx, rx: Some(rx) }
        }
    
        pub fn sender(&self) -> Sender<T> {
            self.tx.clone()
        }
    
        pub fn take_receiver(&mut self) -> Option<Receiver<T>> {
            self.rx.take()
        }
    }
    
    impl<T> Default for BoundedChannel<T> {
        fn default() -> Self {
            Self::new()
        }
    }
    
    /// Fan-in pattern: multiple sources, one collector.
    pub fn fan_in<T: Send + 'static>(sources: Vec<Box<dyn FnOnce(Sender<T>) + Send>>) -> Vec<T> {
        let (tx, rx) = mpsc::channel();
    
        for source in sources {
            let tx = tx.clone();
            thread::spawn(move || source(tx));
        }
    
        drop(tx); // Drop original sender
        rx.into_iter().collect()
    }
    
    /// Worker pool pattern: distribute work, collect results.
    pub fn worker_pool<T, R, F>(items: Vec<T>, workers: usize, f: F) -> Vec<R>
    where
        T: Send + 'static,
        R: Send + 'static,
        F: Fn(T) -> R + Send + Sync + Clone + 'static,
    {
        let (work_tx, work_rx) = mpsc::channel::<T>();
        let (result_tx, result_rx) = mpsc::channel::<R>();
        let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx));
    
        // Spawn workers
        let handles: Vec<_> = (0..workers)
            .map(|_| {
                let work_rx = work_rx.clone();
                let result_tx = result_tx.clone();
                let f = f.clone();
                thread::spawn(move || {
                    while let Ok(item) = work_rx.lock().unwrap().recv() {
                        let _ = result_tx.send(f(item));
                    }
                })
            })
            .collect();
    
        // Send work
        for item in items {
            work_tx.send(item).unwrap();
        }
        drop(work_tx);
    
        // Wait for workers to finish
        drop(result_tx);
        for h in handles {
            h.join().unwrap();
        }
    
        result_rx.into_iter().collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_producer_sends_all_messages() {
            let (tx, rx) = mpsc::channel();
            let handle = create_producer(tx, "test", 3, 1);
            handle.join().unwrap();
            let msgs: Vec<_> = rx.into_iter().collect();
            assert_eq!(msgs.len(), 3);
        }
    
        #[test]
        fn test_multiple_producers() {
            let (tx, rx) = mpsc::channel();
            let h1 = create_producer(tx.clone(), "A", 2, 1);
            let h2 = create_producer(tx, "B", 2, 1);
            h1.join().unwrap();
            h2.join().unwrap();
            let msgs: Vec<_> = rx.into_iter().collect();
            assert_eq!(msgs.len(), 4);
        }
    
        #[test]
        fn test_channel_closes_on_sender_drop() {
            let (tx, rx) = mpsc::channel::<i32>();
            drop(tx);
            assert!(rx.recv().is_err());
        }
    
        #[test]
        fn test_fan_in() {
            let sources: Vec<Box<dyn FnOnce(Sender<i32>) + Send>> = vec![
                Box::new(|tx| {
                    tx.send(1).unwrap();
                    tx.send(2).unwrap();
                }),
                Box::new(|tx| {
                    tx.send(3).unwrap();
                }),
            ];
            let mut results = fan_in(sources);
            results.sort();
            assert_eq!(results, vec![1, 2, 3]);
        }
    
        #[test]
        fn test_worker_pool() {
            let items = vec![1, 2, 3, 4, 5];
            let mut results = worker_pool(items, 2, |x| x * 2);
            results.sort();
            assert_eq!(results, vec![2, 4, 6, 8, 10]);
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_producer_sends_all_messages() {
            let (tx, rx) = mpsc::channel();
            let handle = create_producer(tx, "test", 3, 1);
            handle.join().unwrap();
            let msgs: Vec<_> = rx.into_iter().collect();
            assert_eq!(msgs.len(), 3);
        }
    
        #[test]
        fn test_multiple_producers() {
            let (tx, rx) = mpsc::channel();
            let h1 = create_producer(tx.clone(), "A", 2, 1);
            let h2 = create_producer(tx, "B", 2, 1);
            h1.join().unwrap();
            h2.join().unwrap();
            let msgs: Vec<_> = rx.into_iter().collect();
            assert_eq!(msgs.len(), 4);
        }
    
        #[test]
        fn test_channel_closes_on_sender_drop() {
            let (tx, rx) = mpsc::channel::<i32>();
            drop(tx);
            assert!(rx.recv().is_err());
        }
    
        #[test]
        fn test_fan_in() {
            let sources: Vec<Box<dyn FnOnce(Sender<i32>) + Send>> = vec![
                Box::new(|tx| {
                    tx.send(1).unwrap();
                    tx.send(2).unwrap();
                }),
                Box::new(|tx| {
                    tx.send(3).unwrap();
                }),
            ];
            let mut results = fan_in(sources);
            results.sort();
            assert_eq!(results, vec![1, 2, 3]);
        }
    
        #[test]
        fn test_worker_pool() {
            let items = vec![1, 2, 3, 4, 5];
            let mut results = worker_pool(items, 2, |x| x * 2);
            results.sort();
            assert_eq!(results, vec![2, 4, 6, 8, 10]);
        }
    }

    Deep Comparison

    OCaml vs Rust: Async Channels

    Producer Pattern

    OCaml:

    let prod label n =
      Thread.create (fun () ->
        for i = 1 to n do
          Event.sync (Event.send ch (Printf.sprintf "%s-%d" label i))
        done
      ) ()
    

    Rust:

    fn producer(tx: mpsc::Sender<String>, label: &'static str, n: usize) {
        thread::spawn(move || {
            for i in 1..=n {
                tx.send(format!("{label}-{i}")).unwrap();
            }
        });
    }
    

    Key Differences

    AspectOCamlRust
    Channel creationEvent.new_channel ()mpsc::channel()
    Send operationEvent.sync (Event.send ch x)tx.send(x)
    Clone senderN/Atx.clone()
    Close signalManualAll senders dropped
    Error handlingExceptionsResult type

    Exercises

  • Implement a pipeline with three stages connected by channels: generator → transformer → aggregator.
  • Use a bounded sync_channel to implement backpressure: the producer should slow down when the consumer can't keep up.
  • Implement a work-stealing queue where multiple workers receive tasks from a shared channel and report results back to a collector.
  • Open Source Repos