ExamplesBy LevelBy TopicLearning Paths
451 Fundamental

451: Crossbeam Select — Multiplexing Channels

Functional Programming

Tutorial Video

Text description (accessibility)

This video demonstrates the "451: Crossbeam Select — Multiplexing Channels" functional Rust example. Difficulty level: Fundamental. Key concepts covered: Functional Programming. A thread watching multiple channels needs to respond to whichever has data first, without blocking on one while the other has messages. Key difference from OCaml: 1. **Blocking vs. polling**: `crossbeam::select!` blocks without CPU waste; the std polling approach wastes CPU on the spin loop.

Tutorial

The Problem

A thread watching multiple channels needs to respond to whichever has data first, without blocking on one while the other has messages. Go's select statement solves this natively. Rust's std::sync::mpsc has no select mechanism — you'd need to poll with try_recv in a loop, wasting CPU. crossbeam::select! provides efficient blocking select across multiple channels: the calling thread blocks until any channel has a message, then executes the matching arm.

Channel select appears in event-driven systems, timeouts combined with work channels, message routing, and any pattern where a thread must respond to multiple event sources.

🎯 Learning Outcomes

  • • Understand why channel select is needed (avoid blocked on wrong channel)
  • • Learn how crossbeam::select! blocks efficiently until any channel is ready
  • • See the polling approach as the naive alternative with busy-wait overhead
  • • Understand how to combine select with timeouts using a timeout channel
  • • Learn the use cases: event loops, control channels, multi-source aggregation
  • Code Example

    loop {
        if let Ok(v) = rx1.try_recv() { return First(v); }
        if let Ok(v) = rx2.try_recv() { return Second(v); }
        thread::sleep(Duration::from_millis(1));
    }

    Key Differences

  • Blocking vs. polling: crossbeam::select! blocks without CPU waste; the std polling approach wastes CPU on the spin loop.
  • Fairness: crossbeam::select! handles fairness for multiple ready channels; the polling approach picks the first channel it checks.
  • Timeout integration: crossbeam::select! with crossbeam_channel::after(duration) adds timeout cleanly; polling needs explicit Instant::now() tracking.
  • Go comparison: Go's select is built into the language; Rust requires the crossbeam crate or tokio::select! for async code.
  • OCaml Approach

    OCaml's Event module has Event.select [Event.receive ch1; Event.receive ch2] for channel-level select — blocking until any event is ready. Async.choose and Lwt.pick provide async-style select. Domainslib.Chan.recv_poll enables non-blocking attempts in OCaml 5.x. Event-driven OCaml programming uses these primitives to multiplex across I/O, timers, and inter-domain communication.

    Full Source

    #![allow(clippy::all)]
    //! # Crossbeam Select — Multiplexing Multiple Channels
    //!
    //! Wait on multiple channels simultaneously, receiving from whichever
    //! is ready first. Implemented here with std polling to show the concept.
    
    use std::sync::mpsc::{self, Receiver, TryRecvError};
    use std::thread;
    use std::time::Duration;
    
    /// Approach 1: Poll-based select with try_recv
    pub fn poll_select<T, U>(
        rx1: &Receiver<T>,
        rx2: &Receiver<U>,
        timeout: Duration,
    ) -> SelectResult<T, U> {
        let start = std::time::Instant::now();
        let poll_interval = Duration::from_millis(1);
    
        loop {
            // Try first channel
            match rx1.try_recv() {
                Ok(v) => return SelectResult::First(v),
                Err(TryRecvError::Disconnected) => return SelectResult::Closed,
                Err(TryRecvError::Empty) => {}
            }
    
            // Try second channel
            match rx2.try_recv() {
                Ok(v) => return SelectResult::Second(v),
                Err(TryRecvError::Disconnected) => return SelectResult::Closed,
                Err(TryRecvError::Empty) => {}
            }
    
            // Check timeout
            if start.elapsed() >= timeout {
                return SelectResult::Timeout;
            }
    
            thread::sleep(poll_interval);
        }
    }
    
    #[derive(Debug, PartialEq)]
    pub enum SelectResult<T, U> {
        First(T),
        Second(U),
        Timeout,
        Closed,
    }
    
    /// Approach 2: Priority select (prefer first channel)
    pub fn priority_select<T: Clone>(
        primary: &Receiver<T>,
        secondary: &Receiver<T>,
    ) -> Option<(T, bool)> {
        // Always try primary first
        if let Ok(v) = primary.try_recv() {
            return Some((v, true));
        }
    
        if let Ok(v) = secondary.try_recv() {
            return Some((v, false));
        }
    
        None
    }
    
    /// Approach 3: Drain all available messages
    pub fn drain_all<T>(receivers: &[Receiver<T>]) -> Vec<T> {
        let mut results = Vec::new();
    
        for rx in receivers {
            while let Ok(v) = rx.try_recv() {
                results.push(v);
            }
        }
    
        results
    }
    
    /// Approach 4: Select with stop signal
    pub fn select_with_stop<T>(data_rx: Receiver<T>, stop_rx: Receiver<()>, timeout: Duration) -> Vec<T>
    where
        T: Send + 'static,
    {
        let mut results = Vec::new();
        let start = std::time::Instant::now();
    
        loop {
            // Check stop signal
            if stop_rx.try_recv().is_ok() {
                break;
            }
    
            // Check data
            if let Ok(v) = data_rx.try_recv() {
                results.push(v);
                continue;
            }
    
            // Check timeout
            if start.elapsed() >= timeout {
                break;
            }
    
            thread::sleep(Duration::from_millis(1));
        }
    
        results
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_poll_select_first() {
            let (tx1, rx1) = mpsc::channel::<i32>();
            let (_tx2, rx2) = mpsc::channel::<String>();
    
            tx1.send(42).unwrap();
    
            let result = poll_select(&rx1, &rx2, Duration::from_millis(100));
            assert_eq!(result, SelectResult::First(42));
        }
    
        #[test]
        fn test_poll_select_second() {
            let (_tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<String>();
    
            tx2.send("hello".into()).unwrap();
    
            let result = poll_select(&rx1, &rx2, Duration::from_millis(100));
            assert_eq!(result, SelectResult::Second("hello".into()));
        }
    
        #[test]
        fn test_poll_select_timeout() {
            let (_tx1, rx1) = mpsc::channel::<i32>();
            let (_tx2, rx2) = mpsc::channel::<String>();
    
            let result = poll_select(&rx1, &rx2, Duration::from_millis(10));
            assert_eq!(result, SelectResult::Timeout);
        }
    
        #[test]
        fn test_try_recv_empty() {
            let (_tx, rx) = mpsc::channel::<u32>();
            assert!(rx.try_recv().is_err());
        }
    
        #[test]
        fn test_try_recv_available() {
            let (tx, rx) = mpsc::channel::<u32>();
            tx.send(1).unwrap();
            assert_eq!(rx.try_recv().unwrap(), 1);
        }
    
        #[test]
        fn test_priority_select() {
            let (tx1, rx1) = mpsc::channel();
            let (tx2, rx2) = mpsc::channel();
    
            tx1.send(1).unwrap();
            tx2.send(2).unwrap();
    
            // Primary has priority
            let result = priority_select(&rx1, &rx2);
            assert_eq!(result, Some((1, true)));
    
            // Now secondary
            let result = priority_select(&rx1, &rx2);
            assert_eq!(result, Some((2, false)));
        }
    
        #[test]
        fn test_drain_all() {
            let (tx1, rx1) = mpsc::channel();
            let (tx2, rx2) = mpsc::channel();
    
            tx1.send(1).unwrap();
            tx1.send(2).unwrap();
            tx2.send(3).unwrap();
    
            let results = drain_all(&[rx1, rx2]);
            assert_eq!(results, vec![1, 2, 3]);
        }
    
        #[test]
        fn test_select_with_stop() {
            let (data_tx, data_rx) = mpsc::channel();
            let (stop_tx, stop_rx) = mpsc::channel();
    
            data_tx.send(1).unwrap();
            data_tx.send(2).unwrap();
            stop_tx.send(()).unwrap();
    
            let results = select_with_stop(data_rx, stop_rx, Duration::from_secs(1));
            assert!(results.len() <= 2);
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_poll_select_first() {
            let (tx1, rx1) = mpsc::channel::<i32>();
            let (_tx2, rx2) = mpsc::channel::<String>();
    
            tx1.send(42).unwrap();
    
            let result = poll_select(&rx1, &rx2, Duration::from_millis(100));
            assert_eq!(result, SelectResult::First(42));
        }
    
        #[test]
        fn test_poll_select_second() {
            let (_tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<String>();
    
            tx2.send("hello".into()).unwrap();
    
            let result = poll_select(&rx1, &rx2, Duration::from_millis(100));
            assert_eq!(result, SelectResult::Second("hello".into()));
        }
    
        #[test]
        fn test_poll_select_timeout() {
            let (_tx1, rx1) = mpsc::channel::<i32>();
            let (_tx2, rx2) = mpsc::channel::<String>();
    
            let result = poll_select(&rx1, &rx2, Duration::from_millis(10));
            assert_eq!(result, SelectResult::Timeout);
        }
    
        #[test]
        fn test_try_recv_empty() {
            let (_tx, rx) = mpsc::channel::<u32>();
            assert!(rx.try_recv().is_err());
        }
    
        #[test]
        fn test_try_recv_available() {
            let (tx, rx) = mpsc::channel::<u32>();
            tx.send(1).unwrap();
            assert_eq!(rx.try_recv().unwrap(), 1);
        }
    
        #[test]
        fn test_priority_select() {
            let (tx1, rx1) = mpsc::channel();
            let (tx2, rx2) = mpsc::channel();
    
            tx1.send(1).unwrap();
            tx2.send(2).unwrap();
    
            // Primary has priority
            let result = priority_select(&rx1, &rx2);
            assert_eq!(result, Some((1, true)));
    
            // Now secondary
            let result = priority_select(&rx1, &rx2);
            assert_eq!(result, Some((2, false)));
        }
    
        #[test]
        fn test_drain_all() {
            let (tx1, rx1) = mpsc::channel();
            let (tx2, rx2) = mpsc::channel();
    
            tx1.send(1).unwrap();
            tx1.send(2).unwrap();
            tx2.send(3).unwrap();
    
            let results = drain_all(&[rx1, rx2]);
            assert_eq!(results, vec![1, 2, 3]);
        }
    
        #[test]
        fn test_select_with_stop() {
            let (data_tx, data_rx) = mpsc::channel();
            let (stop_tx, stop_rx) = mpsc::channel();
    
            data_tx.send(1).unwrap();
            data_tx.send(2).unwrap();
            stop_tx.send(()).unwrap();
    
            let results = select_with_stop(data_rx, stop_rx, Duration::from_secs(1));
            assert!(results.len() <= 2);
        }
    }

    Deep Comparison

    OCaml vs Rust: Channel Select

    Select from Multiple Channels

    OCaml (Event module)

    let msg = Event.sync (Event.choose [
      Event.receive ch1;
      Event.receive ch2
    ])
    

    Rust (Polling)

    loop {
        if let Ok(v) = rx1.try_recv() { return First(v); }
        if let Ok(v) = rx2.try_recv() { return Second(v); }
        thread::sleep(Duration::from_millis(1));
    }
    

    Key Differences

    FeatureOCamlRust (std)
    Select primitiveEvent.choosePoll loop or crossbeam-channel
    Blocking selectYesNo (use crossbeam)
    TimeoutManualrecv_timeout

    Exercises

  • Control channel: Add a "shutdown" channel to a long-running worker. Use crossbeam::select! to receive either work items or shutdown signals, stopping when shutdown arrives.
  • Timeout with select: Implement receive_with_timeout<T>(rx: &Receiver<T>, timeout: Duration) -> Option<T> using crossbeam::select! with a crossbeam_channel::after(timeout) timeout channel.
  • Priority channels: Use two channels (high_priority, low_priority) and select!. First always check high_priority; only check low_priority when high_priority is empty. Implement this fairly (low priority eventually served even with continuous high-priority traffic).
  • Open Source Repos