
983 Channel Basics


The Problem

Introduce Rust's std::sync::mpsc (multi-producer, single-consumer) channels for message passing between threads. Implement a single-producer/single-consumer pair, a multi-producer/single-consumer pattern using tx.clone(), and a bounded channel using mpsc::sync_channel. Channels enforce ownership transfer — the sender gives up ownership of each value it sends.

🎯 Learning Outcomes

  • Create (Sender<T>, Receiver<T>) pairs with mpsc::channel()
  • Spawn a producer thread with a move closure that owns the Sender end
  • Consume messages with rx.iter(), which loops until all senders are dropped
  • Clone the Sender for multiple producers: let tx2 = tx.clone()
  • Use mpsc::sync_channel(capacity) for a bounded channel with backpressure

    Code Example

    #![allow(clippy::all)]
    // 983: MPSC Channel Basics
    // Rust: std::sync::mpsc — Multiple Producer, Single Consumer
    
    use std::sync::mpsc;
    use std::thread;
    
    // --- Approach 1: Single producer, single consumer ---
    fn single_producer_consumer() -> Vec<i32> {
        let (tx, rx) = mpsc::channel::<i32>();
    
        let producer = thread::spawn(move || {
            for i in 1..=5 {
                tx.send(i).unwrap();
            }
            // tx drops here — channel closes
        });
    
        // Collect until channel is closed
        let results: Vec<i32> = rx.iter().collect();
        producer.join().unwrap();
        results
    }
    
    // --- Approach 2: Multiple producers (clone the sender) ---
    fn multi_producer_consumer() -> Vec<i32> {
        let (tx, rx) = mpsc::channel::<i32>();
    
        let handles: Vec<_> = (0..3)
            .map(|batch| {
                let tx = tx.clone(); // each producer gets its own sender
                thread::spawn(move || {
                    let start = batch * 10 + 1;
                    for i in start..=start + 2 {
                        tx.send(i).unwrap();
                    }
                    // tx drops when thread exits
                })
            })
            .collect();
    
        drop(tx); // drop original so channel closes when all clones drop
    
        let mut results: Vec<i32> = rx.iter().collect();
        for h in handles {
            h.join().unwrap();
        }
        results.sort();
        results
    }
    
    // --- Approach 3: Producer sends typed messages ---
    #[derive(Debug, PartialEq)]
    enum WorkItem {
        Task(String),
        Done,
    }
    
    fn typed_channel() -> Vec<String> {
        let (tx, rx) = mpsc::channel::<WorkItem>();
    
        let producer = thread::spawn(move || {
            for name in ["alpha", "beta", "gamma"] {
                tx.send(WorkItem::Task(name.to_string())).unwrap();
            }
            tx.send(WorkItem::Done).unwrap();
        });
    
        let mut results = Vec::new();
        loop {
            match rx.recv().unwrap() {
                WorkItem::Task(s) => results.push(s),
                WorkItem::Done => break,
            }
        }
        producer.join().unwrap();
        results
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_single_producer() {
            assert_eq!(single_producer_consumer(), vec![1, 2, 3, 4, 5]);
        }
    
        #[test]
        fn test_multi_producer() {
            let results = multi_producer_consumer();
            assert_eq!(results.len(), 9);
            // Contains items from all 3 batches
            assert!(results.contains(&1));
            assert!(results.contains(&11));
            assert!(results.contains(&21));
        }
    
        #[test]
        fn test_typed_channel() {
            let results = typed_channel();
            assert_eq!(results, vec!["alpha", "beta", "gamma"]);
        }
    
        #[test]
        fn test_channel_closes_on_drop() {
            let (tx, rx) = mpsc::channel::<i32>();
            drop(tx); // immediately close
            assert!(rx.recv().is_err()); // disconnected
        }
    
        #[test]
        fn test_recv_blocks_until_send() {
            let (tx, rx) = mpsc::channel();
            let h = thread::spawn(move || {
                thread::sleep(std::time::Duration::from_millis(1));
                tx.send(42).unwrap();
            });
            assert_eq!(rx.recv().unwrap(), 42);
            h.join().unwrap();
        }
    }
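The problem statement also calls for a bounded channel, which the code above does not show. A minimal sketch using std's mpsc::sync_channel, where send blocks once the buffer holds capacity unread messages (function name and timings are illustrative):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Bounded channel with capacity 2: send() blocks whenever the buffer
// already holds 2 unread messages, giving the consumer backpressure.
fn bounded_producer_consumer() -> Vec<i32> {
    let (tx, rx) = mpsc::sync_channel::<i32>(2);

    let producer = thread::spawn(move || {
        for i in 1..=5 {
            tx.send(i).unwrap(); // blocks here while the buffer is full
        }
        // tx drops here — channel closes
    });

    // Deliberately slow consumer: the producer keeps filling the buffer
    // and waiting, so throughput is paced by the receiver.
    let mut results = Vec::new();
    for v in rx.iter() {
        thread::sleep(Duration::from_millis(1));
        results.push(v);
    }
    producer.join().unwrap();
    results
}

fn main() {
    assert_eq!(bounded_producer_consumer(), vec![1, 2, 3, 4, 5]);
}
```

With capacity 0, sync_channel behaves as a rendezvous channel, the same semantics as OCaml's Event module discussed below.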

    Key Differences

    | Aspect             | Rust                                          | OCaml                                  |
    |--------------------|-----------------------------------------------|----------------------------------------|
    | Channel type       | mpsc — multi-producer, single-consumer        | Event.channel — synchronous rendezvous |
    | Buffer             | Unbounded (channel) or bounded (sync_channel) | Zero-buffer (synchronous)              |
    | Multiple senders   | tx.clone() — reference-counted Sender         | Event.channel is already shared        |
    | Channel close      | All senders dropped                           | No built-in close; use sentinel value  |
    | Ownership transfer | Moved into channel                            | Shared via GC                          |

    mpsc channels are a safe, efficient alternative to shared-memory concurrency. They enforce a clear ownership model: each value has exactly one owner at any time, moving from producer to consumer via the channel.
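That ownership transfer is visible with a non-Copy type: once a value is sent, the sending side can no longer touch it, and the receiver gets full ownership. A small sketch (the commented-out line shows the compile error you would get):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    let msg = String::from("hello");
    tx.send(msg).unwrap(); // `msg` is moved into the channel here

    // Using `msg` after the send does not compile:
    // println!("{}", msg); // error[E0382]: borrow of moved value: `msg`

    // The receiver takes ownership of the String.
    let received = rx.recv().unwrap();
    assert_eq!(received, "hello");
}
```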

    OCaml Approach

    (* OCaml: Event module (stdlib threads library) for synchronous channels *)
    let ch = Event.new_channel ()
    
    let producer () =
      List.iter (fun i ->
        Event.sync (Event.send ch i)
      ) [1;2;3;4;5]
    
    (* OCaml Event is synchronous (rendezvous) — no buffer, and no built-in
       "closed" state, so the consumer receives a fixed count rather than
       looping forever. *)
    let consumer () =
      let rec loop n acc =
        if n = 0 then List.rev acc
        else loop (n - 1) (Event.sync (Event.receive ch) :: acc)
      in
      loop 5 []
    
    (* Practical: use Lwt_stream or Domainslib.Chan for async/parallel *)
    let (stream, push) = Lwt_stream.create ()
    let push_items () =
      List.iter (fun i -> push (Some i)) [1;2;3;4;5];
      push None
    

    OCaml's standard Event module provides synchronous channels (rendezvous — no buffer). For buffered async channels, Lwt_stream or Domainslib.Chan are the practical choices.


    Deep Comparison

    MPSC Channel Basics — Comparison

    Core Insight

    Channels are the functional alternative to shared mutable state: send immutable values between threads instead of sharing pointers. Both OCaml and Rust use typed channels, but Rust's MPSC is part of std while OCaml needs the Thread + Event or Thread + Queue + Mutex pattern.
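What mpsc packages up can be hand-rolled, mirroring the OCaml Thread + Queue + Mutex pattern: a sketch of a sentinel-closed queue built from std's Mutex, Condvar, and VecDeque (names and the sentinel scheme are illustrative, not the real mpsc internals):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hand-rolled "channel": a Mutex-guarded queue plus a Condvar to wake
// the consumer. None serves as the close sentinel, since a raw queue
// has no notion of "all senders dropped".
fn hand_rolled_channel() -> Vec<i32> {
    let chan = Arc::new((Mutex::new(VecDeque::<Option<i32>>::new()), Condvar::new()));

    let producer = {
        let chan = Arc::clone(&chan);
        thread::spawn(move || {
            let (lock, cvar) = &*chan;
            for i in 1..=5 {
                lock.lock().unwrap().push_back(Some(i));
                cvar.notify_one();
            }
            lock.lock().unwrap().push_back(None); // sentinel: "closed"
            cvar.notify_one();
        })
    };

    let (lock, cvar) = &*chan;
    let mut results = Vec::new();
    let mut guard = lock.lock().unwrap();
    loop {
        match guard.pop_front() {
            Some(Some(v)) => results.push(v),
            Some(None) => break,                          // sentinel reached
            None => guard = cvar.wait(guard).unwrap(),    // empty: block
        }
    }
    drop(guard);
    producer.join().unwrap();
    results
}

fn main() {
    assert_eq!(hand_rolled_channel(), vec![1, 2, 3, 4, 5]);
}
```

Everything this sketch does by hand — wakeups, the close signal, single-consumer hand-off — mpsc::channel() provides for free, which is the point of the comparison.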

    OCaml Approach

  • Event.new_channel () creates a synchronous channel (rendezvous semantics)
  • Event.sync (Event.send c v) blocks until a receiver is ready
  • Thread + Queue + Mutex for asynchronous buffered channels
  • No built-in MPSC — must implement with Mutex + Queue + Condition
  • Type-safe: channels are parameterized by message type

    Rust Approach

  • mpsc::channel() creates an unbounded asynchronous channel
  • mpsc::sync_channel(n) creates a bounded channel (send blocks when full)
  • Multiple producers via tx.clone() — all senders share one receiver
  • Channel closes automatically when all Senders are dropped
  • rx.iter() is idiomatic for "drain until closed"

    Comparison Table

    | Concept            | OCaml                                | Rust                    |
    |--------------------|--------------------------------------|-------------------------|
    | Create channel     | Event.new_channel () / Queue+Mutex   | mpsc::channel()         |
    | Send message       | Event.sync (Event.send c v)          | tx.send(v).unwrap()     |
    | Receive message    | Event.sync (Event.receive c)         | rx.recv().unwrap()      |
    | Multiple producers | Multiple threads with shared mutex   | tx.clone() per producer |
    | Close channel      | GC when last ref dropped             | Drop all Senders        |
    | Bounded buffer     | Manual ring buffer                   | mpsc::sync_channel(n)   |
    | Drain all messages | Loop until done signal               | rx.iter().collect()     |

    std vs tokio

    Aspectstd versiontokio version
    RuntimeOS threads via std::threadAsync tasks on tokio runtime
    Synchronizationstd::sync::Mutex, Condvartokio::sync::Mutex, channels
    Channelsstd::sync::mpsc (unbounded)tokio::sync::mpsc (bounded, async)
    BlockingThread blocks on lock/recvTask yields, runtime switches tasks
    OverheadOne OS thread per taskMany tasks per thread (M:N)
    Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers
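Staying within std, Receiver::recv_timeout offers a middle ground between a blocking recv and busy polling: the thread waits only up to a deadline. A small sketch (durations are illustrative):

```rust
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    // Nothing has been sent yet: recv_timeout returns Err(Timeout)
    // instead of blocking the thread indefinitely.
    assert!(rx.recv_timeout(Duration::from_millis(1)).is_err());

    // Once a message is pending, recv_timeout returns it immediately.
    tx.send(7).unwrap();
    assert_eq!(rx.recv_timeout(Duration::from_millis(10)), Ok(7));
}
```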

    Exercises

  • Implement a producer that sends Option<T> and a consumer that stops on None — a sentinel-based close signal.
  • Use mpsc::sync_channel(8) to implement bounded backpressure and observe how the producer blocks when the buffer is full.
  • Implement a fan-out: one producer, multiple consumers each with their own Receiver (requires cloning messages with Arc<T>).
  • Implement a pipeline with three stages connected by two channels: Stage1 -> chan1 -> Stage2 -> chan2 -> Stage3.
  • Benchmark mpsc throughput vs Mutex<VecDeque<T>> for 1,000,000 messages.