Difficulty: Fundamental

450: Crossbeam Channels — Advanced Channel Patterns

Functional Programming


Tutorial

The Problem

std::sync::mpsc provides basic MPSC (multiple producer, single consumer) channels but lacks bounded backpressure, multiple consumers, and select across channels. crossbeam::channel provides both bounded (crossbeam::channel::bounded(n)) and unbounded channels with MPMC (multiple producer, multiple consumer) semantics. Bounded channels implement backpressure — the producer blocks when the buffer is full — preventing fast producers from overwhelming slow consumers with unbounded memory growth.

Crossbeam channels are used in high-performance task queues, trading systems, event brokers, and any pipeline where multiple workers process from the same channel.
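To see backpressure directly, here is a self-contained std-only sketch (the helper name `blocked_send_time` and the 50 ms delay are illustrative, not part of the original example): the second send into a capacity-1 channel cannot complete until the consumer drains a slot, so the producer measurably stalls for roughly the consumer's delay.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Measures how long a send blocks on a full capacity-1 channel
/// while the consumer waits `delay_ms` before draining a slot.
pub fn blocked_send_time(delay_ms: u64) -> Duration {
    let (tx, rx) = mpsc::sync_channel::<i32>(1);
    let (ready_tx, ready_rx) = mpsc::channel::<()>();
    let producer = thread::spawn(move || {
        tx.send(1).unwrap();        // fills the single slot
        ready_tx.send(()).unwrap(); // signal: the next send will block
        let start = Instant::now();
        tx.send(2).unwrap();        // blocks until a slot frees up
        start.elapsed()
    });
    ready_rx.recv().unwrap(); // wait until the buffer is full
    thread::sleep(Duration::from_millis(delay_ms));
    rx.recv().unwrap();       // frees the slot; the producer resumes
    producer.join().unwrap()
}

fn main() {
    let blocked = blocked_send_time(50);
    println!("second send blocked for {:?}", blocked);
}
```

The blocked duration tracks the consumer's delay: this is the throttling mechanism that keeps a fast producer from outrunning a slow consumer.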

🎯 Learning Outcomes

  • Understand bounded channels: sync_channel(n) blocks the sender when the buffer is full
  • Learn the backpressure mechanism: slow consumers throttle fast producers
  • See how mpsc::sync_channel simulates crossbeam bounded channels
  • Understand MPMC vs. MPSC: multiple consumers enable worker-pool patterns
  • Learn how channel capacity affects latency, throughput, and memory usage
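The last outcome can be made concrete with a small std-only sketch (the helper `fill_without_blocking` is illustrative): try_send accepts exactly `capacity` messages before reporting the buffer full, which is why a bounded channel caps in-flight memory no matter how fast the producer runs.

```rust
use std::sync::mpsc;

/// Fills a bounded channel without blocking and reports how many
/// messages fit before the buffer is full.
pub fn fill_without_blocking(capacity: usize) -> usize {
    let (tx, rx) = mpsc::sync_channel::<u64>(capacity);
    let mut accepted = 0;
    while tx.try_send(accepted as u64).is_ok() {
        accepted += 1; // stops exactly when the buffer is full
    }
    drop(rx);
    accepted
}

fn main() {
    // In-flight messages never exceed the configured capacity.
    println!("capacity 8 accepted {}", fill_without_blocking(8));
    // A capacity of 0 is a rendezvous channel: no buffer at all, so
    // try_send only succeeds if a receiver is already waiting.
    println!("capacity 0 accepted {}", fill_without_blocking(0));
}
```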
    Code Example

    let (tx, rx) = mpsc::sync_channel::<u32>(capacity);
    
    // Send blocks when full
    tx.send(value).unwrap();
    
    // Receive blocks when empty
    let value = rx.recv().unwrap();

    Key Differences

  • MPMC: crossbeam::channel supports multiple consumers; std::sync::mpsc does not (MPSC only).
  • Select: crossbeam::select! enables waiting on multiple channels; std::sync::mpsc has no select.
  • Bounded semantics: Both mpsc::sync_channel and crossbeam::bounded block the sender when full; they differ in MPMC support.
  • Performance: crossbeam::channel uses more sophisticated lock-free algorithms vs. mpsc's mutex-based bounded channel.
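A minimal std-only sketch of the MPSC half of that story (the helper `multi_producer_sum` is illustrative): `Sender` implements `Clone`, so multiple producers come for free, while the single `Receiver` cannot be cloned — sharing it across consumers requires the Arc<Mutex<Receiver>> wrapper used in the full source.

```rust
use std::sync::mpsc;
use std::thread;

/// Sums one message from each of `n` producer threads over a single
/// std channel.
pub fn multi_producer_sum(n: u32) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let tx = tx.clone(); // Sender is Clone: the "MP" in MPSC
            thread::spawn(move || tx.send(i).unwrap())
        })
        .collect();
    drop(tx); // drop the original so rx sees the channel close
    for h in handles {
        h.join().unwrap();
    }
    // Receiver is NOT Clone: a second consumer would need Arc<Mutex<_>>.
    rx.iter().sum()
}

fn main() {
    println!("sum from 4 producers: {}", multi_producer_sum(4));
}
```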
    OCaml Approach

    OCaml's Event module provides synchronous (zero-buffer) channels — the send always blocks until a receiver is ready. Domainslib.Chan.make_bounded n creates bounded channels for OCaml 5.x domains. Async.Pipe and Lwt_stream provide bounded buffered streams for async OCaml. The backpressure semantics are similar but integrated with their respective runtimes.

    Full Source

    #![allow(clippy::all)]
    //! # Crossbeam Channels — Advanced Channel Patterns
    //!
    //! Demonstrates bounded channels and multi-consumer patterns
    //! that crossbeam-channel provides, implemented with std.
    
    use std::sync::mpsc::{self, Receiver, SyncSender};
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;
    
    /// Approach 1: Bounded (sync) channel
    ///
    /// Sender blocks when buffer is full, providing backpressure.
    pub fn bounded_channel_demo(capacity: usize, num_messages: usize) -> Vec<u32> {
        let (tx, rx) = mpsc::sync_channel::<u32>(capacity);
    
        let producer = thread::spawn(move || {
            for i in 0..num_messages as u32 {
                tx.send(i).unwrap();
            }
        });
    
        let consumer = thread::spawn(move || {
            let mut received = Vec::new();
            for msg in rx {
                received.push(msg);
            }
            received
        });
    
        producer.join().unwrap();
        consumer.join().unwrap()
    }
    
    /// Approach 2: Multi-consumer pattern with Arc<Mutex<Receiver>>
    pub fn multi_consumer_demo(num_consumers: usize, num_messages: usize) -> Vec<u32> {
        let (tx, rx) = mpsc::channel::<u32>();
        let rx = Arc::new(Mutex::new(rx));
        let results = Arc::new(Mutex::new(Vec::new()));
    
        let consumers: Vec<_> = (0..num_consumers)
            .map(|_id| {
                let rx = Arc::clone(&rx);
                let results = Arc::clone(&results);
                thread::spawn(move || loop {
                    // Note: the MutexGuard temporary lives until the end of
                    // this statement, so the lock is held across recv() —
                    // consumers take turns rather than waiting concurrently.
                    let msg = rx.lock().unwrap().recv();
                    match msg {
                        Ok(v) => results.lock().unwrap().push(v),
                        Err(_) => break,
                    }
                })
            })
            .collect();
    
        // Send messages
        for i in 0..num_messages as u32 {
            tx.send(i).unwrap();
        }
        drop(tx); // Close channel
    
        for c in consumers {
            c.join().unwrap();
        }
    
        let mut result = Arc::try_unwrap(results).unwrap().into_inner().unwrap();
        result.sort();
        result
    }
    
    /// Approach 3: try_send for non-blocking send
    pub fn try_send_demo(capacity: usize) -> (usize, usize) {
        let (tx, rx) = mpsc::sync_channel::<i32>(capacity);
        let mut sent = 0;
        let mut failed = 0;
    
        for i in 0..capacity as i32 + 5 {
            match tx.try_send(i) {
                Ok(()) => sent += 1,
                Err(_) => failed += 1,
            }
        }
    
        // Drain to prevent deadlock
        drop(tx);
        for _ in rx {}
    
        (sent, failed)
    }
    
    /// Producer-consumer with timeout
    pub fn recv_timeout_demo(timeout_ms: u64) -> Option<i32> {
        let (tx, rx) = mpsc::sync_channel::<i32>(1);
    
        // Delayed sender
        let _ = thread::spawn(move || {
            thread::sleep(Duration::from_millis(timeout_ms * 2));
            let _ = tx.send(42);
        });
    
        rx.recv_timeout(Duration::from_millis(timeout_ms)).ok()
    }
    
    /// Work distribution pattern
    pub fn work_distribution(num_workers: usize, jobs: Vec<i32>) -> Vec<i32> {
        let (tx, rx) = mpsc::sync_channel::<i32>(num_workers * 2);
        let rx = Arc::new(Mutex::new(rx));
        let results = Arc::new(Mutex::new(Vec::new()));
    
        // Spawn workers
        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let rx = Arc::clone(&rx);
                let results = Arc::clone(&results);
                thread::spawn(move || loop {
                    match rx.lock().unwrap().recv() {
                        Ok(job) => {
                            let result = job * job; // Process job
                            results.lock().unwrap().push(result);
                        }
                        Err(_) => break,
                    }
                })
            })
            .collect();
    
        // Send jobs
        for job in jobs {
            tx.send(job).unwrap();
        }
        drop(tx);
    
        // Wait for workers
        for w in workers {
            w.join().unwrap();
        }
    
        let mut r = Arc::try_unwrap(results).unwrap().into_inner().unwrap();
        r.sort();
        r
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_bounded_channel() {
            let results = bounded_channel_demo(3, 10);
            assert_eq!(results, (0..10).collect::<Vec<_>>());
        }
    
        #[test]
        fn test_sync_channel_blocks_when_full() {
            let (tx, rx) = mpsc::sync_channel::<u32>(2);
    
            tx.send(1).unwrap();
            tx.send(2).unwrap();
    
            // Buffer full, try_send should fail
            assert!(tx.try_send(3).is_err());
    
            // After receiving, we can send again
            assert_eq!(rx.recv().unwrap(), 1);
            assert!(tx.try_send(3).is_ok());
        }
    
        #[test]
        fn test_multi_consumer() {
            let results = multi_consumer_demo(3, 9);
            assert_eq!(results, (0..9).collect::<Vec<_>>());
        }
    
        #[test]
        fn test_try_send() {
            let (sent, failed) = try_send_demo(3);
            assert_eq!(sent, 3);
            assert_eq!(failed, 5);
        }
    
        #[test]
        fn test_recv_timeout() {
            let result = recv_timeout_demo(10);
            assert!(result.is_none()); // Timeout before message
        }
    
        #[test]
        fn test_work_distribution() {
            let jobs: Vec<i32> = (1..=5).collect();
            let results = work_distribution(2, jobs);
            assert_eq!(results, vec![1, 4, 9, 16, 25]);
        }
    
        #[test]
        fn test_empty_work_distribution() {
            let results = work_distribution(2, vec![]);
            assert!(results.is_empty());
        }
    }

    Deep Comparison

    OCaml vs Rust: Bounded Channels

    Bounded Channel Creation

    OCaml (Manual implementation)

    let make_bounded capacity =
      let queue = Queue.create () in
      let mutex = Mutex.create () in
      let not_full = Condition.create () in
      let not_empty = Condition.create () in
      
      let send v =
        Mutex.lock mutex;
        while Queue.length queue >= capacity do
          Condition.wait not_full mutex
        done;
        Queue.push v queue;
        Condition.signal not_empty;
        Mutex.unlock mutex
      in
      
      let recv () =
        Mutex.lock mutex;
        while Queue.is_empty queue do
          Condition.wait not_empty mutex
        done;
        let v = Queue.pop queue in
        Condition.signal not_full;
        Mutex.unlock mutex;
        v
      in
      (send, recv)
    

    Rust

    let (tx, rx) = mpsc::sync_channel::<u32>(capacity);
    
    // Send blocks when full
    tx.send(value).unwrap();
    
    // Receive blocks when empty
    let value = rx.recv().unwrap();
    

    Key Differences

    Feature            OCaml             Rust
    Bounded channel    Manual            sync_channel(cap)
    Backpressure       Manual Condvar    Built-in blocking
    Try operations     Manual            try_send, try_recv
    Multi-consumer     Shared queue      Arc<Mutex<Receiver>>

    Multi-Consumer Pattern

    OCaml

    (* Same queue shared by multiple threads *)
    let consumers = Array.init num_consumers (fun id ->
      Thread.create (fun () ->
        (* Each thread calls recv on shared channel *)
      ) ()
    )
    

    Rust

    let (tx, rx) = mpsc::channel::<u32>();
    let rx = Arc::new(Mutex::new(rx));  // Wrap for sharing
    
    let consumers: Vec<_> = (0..num_consumers).map(|_| {
        let rx = Arc::clone(&rx);
        thread::spawn(move || loop {
            match rx.lock().unwrap().recv() {
                Ok(v) => process(v),
                Err(_) => break,  // Channel closed
            }
        })
    }).collect();
    

    Non-blocking Operations

    Rust

    use std::sync::mpsc::{TrySendError, TryRecvError};
    
    // Try to send without blocking
    match tx.try_send(value) {
        Ok(()) => println!("sent"),
        Err(TrySendError::Full(_v)) => println!("channel full"),
        Err(TrySendError::Disconnected(_v)) => println!("closed"),
    }
    
    // Try to receive without blocking
    match rx.try_recv() {
        Ok(v) => println!("got {}", v),
        Err(TryRecvError::Empty) => println!("nothing yet"),
        Err(TryRecvError::Disconnected) => println!("closed"),
    }
    

    With Crossbeam (Real library)

    use crossbeam_channel::{bounded, select};
    
    let (_tx1, rx1) = bounded::<i32>(10);
    let (_tx2, rx2) = bounded::<i32>(10);
    
    // Wait on whichever channel is ready first
    select! {
        recv(rx1) -> msg => println!("from rx1: {:?}", msg),
        recv(rx2) -> msg => println!("from rx2: {:?}", msg),
        default => println!("nothing ready"),
    }
    

    Exercises

  • Worker pool with bounded channel: Create a bounded work queue (bounded(100)) and spawn 4 worker threads, all consuming from the same channel. Verify that all work items are processed exactly once.
  • Backpressure test: Measure how bounded vs. unbounded channels affect peak memory usage when a producer sends 1M large messages and a consumer processes one every millisecond.
  • Timeout send: Using crossbeam::channel or a custom wrapper, implement try_send_timeout(val, timeout) that attempts to send but returns Err(val) if the channel doesn't have capacity within the timeout.