ExamplesBy LevelBy TopicLearning Paths
985 Fundamental

985 Select Pattern

Functional Programming

Tutorial

The Problem

Implement a select pattern that multiplexes over multiple channels — analogous to Unix select(2) or OCaml's Event.choose. Use try_recv polling with thread::yield_now to non-blockingly check each channel in a loop, returning the first available value. Model the result as an enum Selected<A, B> that indicates which channel produced a value.

🎯 Learning Outcomes

  • • Implement select<A, B>(rx1, rx2) -> Selected<A, B> using try_recv on each channel
  • • Handle TryRecvError::Empty (channel has no item yet) vs TryRecvError::Disconnected (channel closed)
  • • Track r1_closed / r2_closed flags and return Selected::BothClosed when all channels close
  • • Use thread::yield_now() to avoid busy-spinning the CPU
  • • Understand the limitations vs real select/epoll and why crossbeam::channel::select! is preferred for production
  • Code Example

    #![allow(clippy::all)]
    // 985: Select Pattern — Poll Multiple Channels
    // Rust: try_recv loop for non-blocking select over multiple channels
    
    use std::sync::mpsc::{self, TryRecvError};
    use std::thread;
    
    #[derive(Debug, PartialEq)]
    enum Selected<A, B> {
        Left(A),
        Right(B),
        BothClosed,
    }
    
    // --- Non-blocking select over two channels ---
    fn select<A, B>(rx1: &mpsc::Receiver<A>, rx2: &mpsc::Receiver<B>) -> Selected<A, B> {
        let mut r1_closed = false;
        let mut r2_closed = false;
        loop {
            if !r1_closed {
                match rx1.try_recv() {
                    Ok(v) => return Selected::Left(v),
                    Err(TryRecvError::Disconnected) => r1_closed = true,
                    Err(TryRecvError::Empty) => {}
                }
            }
            if !r2_closed {
                match rx2.try_recv() {
                    Ok(v) => return Selected::Right(v),
                    Err(TryRecvError::Disconnected) => r2_closed = true,
                    Err(TryRecvError::Empty) => {}
                }
            }
            if r1_closed && r2_closed {
                return Selected::BothClosed;
            }
            thread::yield_now();
        }
    }
    
    // --- Drain both channels via select, categorizing messages ---
    fn select_drain(rx1: mpsc::Receiver<i32>, rx2: mpsc::Receiver<String>) -> (Vec<i32>, Vec<String>) {
        let mut lefts = Vec::new();
        let mut rights = Vec::new();
    
        loop {
            match select(&rx1, &rx2) {
                Selected::Left(v) => lefts.push(v),
                Selected::Right(v) => rights.push(v),
                Selected::BothClosed => break,
            }
        }
        (lefts, rights)
    }
    
    // --- Priority select: prefer channel 1 when both have data ---
    fn priority_recv<T>(high: &mpsc::Receiver<T>, low: &mpsc::Receiver<T>) -> Option<(T, bool)> {
        // true = came from high priority
        match high.try_recv() {
            Ok(v) => Some((v, true)),
            Err(_) => low.try_recv().ok().map(|v| (v, false)),
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_select_drain() {
            let (tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<String>();
    
            for i in [1, 2, 3] {
                tx1.send(i).unwrap();
            }
            for s in ["a", "b", "c"] {
                tx2.send(s.to_string()).unwrap();
            }
            drop(tx1);
            drop(tx2);
    
            let (mut lefts, mut rights) = select_drain(rx1, rx2);
            lefts.sort();
            rights.sort();
            assert_eq!(lefts, vec![1, 2, 3]);
            assert_eq!(rights, vec!["a", "b", "c"]);
        }
    
        #[test]
        fn test_both_closed() {
            let (tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<i32>();
            drop(tx1);
            drop(tx2);
            assert_eq!(select(&rx1, &rx2), Selected::BothClosed);
        }
    
        #[test]
        fn test_priority_recv() {
            let (htx, hrx) = mpsc::channel::<i32>();
            let (ltx, lrx) = mpsc::channel::<i32>();
    
            htx.send(10).unwrap();
            ltx.send(20).unwrap();
    
            // High priority wins
            let result = priority_recv(&hrx, &lrx);
            assert_eq!(result, Some((10, true)));
    
            // Now only low available
            let result2 = priority_recv(&hrx, &lrx);
            assert_eq!(result2, Some((20, false)));
        }
    
        #[test]
        fn test_select_empty_left() {
            let (_tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<i32>();
            tx2.send(99).unwrap();
            drop(tx2);
            // rx1 never closes so we'll get Right(99) first
            assert_eq!(select(&rx1, &rx2), Selected::Right(99));
        }
    }

    Key Differences

    AspectRustOCaml
    Standard selectNone — use crossbeam::select!Event.choose or Lwt.pick
    Polling approachtry_recv loop + yield_nowNot needed — blocking select
    CPU usagePolling wastes CPUBlocking is efficient
    Multiple channelsManual loopchoose [e1; e2; e3; ...] list

    For production use, crossbeam::channel::select! is the correct tool — it uses OS synchronization primitives for efficient multi-channel blocking. The polling approach here illustrates the mechanics.

    OCaml Approach

    open Event
    
    type ('a, 'b) selected = Left of 'a | Right of 'b | BothClosed
    
    let select ch1 ch2 =
      let e1 = wrap (receive ch1) (fun v -> Left v) in
      let e2 = wrap (receive ch2) (fun v -> Right v) in
      sync (choose [e1; e2])
    
    (* Lwt version *)
    let lwt_select p1 p2 =
      Lwt.pick [
        Lwt.map (fun v -> `Left v)  p1;
        Lwt.map (fun v -> `Right v) p2;
      ]
    

    OCaml's Event.choose selects non-deterministically from a list of events — whichever event fires first is returned. Lwt.pick does the same for promises. Both are blocking (no polling loop), making them more efficient than the Rust try_recv loop.

    Full Source

    #![allow(clippy::all)]
    // 985: Select Pattern — Poll Multiple Channels
    // Rust: try_recv loop for non-blocking select over multiple channels
    
    use std::sync::mpsc::{self, TryRecvError};
    use std::thread;
    
    #[derive(Debug, PartialEq)]
    enum Selected<A, B> {
        Left(A),
        Right(B),
        BothClosed,
    }
    
    // --- Non-blocking select over two channels ---
    fn select<A, B>(rx1: &mpsc::Receiver<A>, rx2: &mpsc::Receiver<B>) -> Selected<A, B> {
        let mut r1_closed = false;
        let mut r2_closed = false;
        loop {
            if !r1_closed {
                match rx1.try_recv() {
                    Ok(v) => return Selected::Left(v),
                    Err(TryRecvError::Disconnected) => r1_closed = true,
                    Err(TryRecvError::Empty) => {}
                }
            }
            if !r2_closed {
                match rx2.try_recv() {
                    Ok(v) => return Selected::Right(v),
                    Err(TryRecvError::Disconnected) => r2_closed = true,
                    Err(TryRecvError::Empty) => {}
                }
            }
            if r1_closed && r2_closed {
                return Selected::BothClosed;
            }
            thread::yield_now();
        }
    }
    
    // --- Drain both channels via select, categorizing messages ---
    fn select_drain(rx1: mpsc::Receiver<i32>, rx2: mpsc::Receiver<String>) -> (Vec<i32>, Vec<String>) {
        let mut lefts = Vec::new();
        let mut rights = Vec::new();
    
        loop {
            match select(&rx1, &rx2) {
                Selected::Left(v) => lefts.push(v),
                Selected::Right(v) => rights.push(v),
                Selected::BothClosed => break,
            }
        }
        (lefts, rights)
    }
    
    // --- Priority select: prefer channel 1 when both have data ---
    fn priority_recv<T>(high: &mpsc::Receiver<T>, low: &mpsc::Receiver<T>) -> Option<(T, bool)> {
        // true = came from high priority
        match high.try_recv() {
            Ok(v) => Some((v, true)),
            Err(_) => low.try_recv().ok().map(|v| (v, false)),
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_select_drain() {
            let (tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<String>();
    
            for i in [1, 2, 3] {
                tx1.send(i).unwrap();
            }
            for s in ["a", "b", "c"] {
                tx2.send(s.to_string()).unwrap();
            }
            drop(tx1);
            drop(tx2);
    
            let (mut lefts, mut rights) = select_drain(rx1, rx2);
            lefts.sort();
            rights.sort();
            assert_eq!(lefts, vec![1, 2, 3]);
            assert_eq!(rights, vec!["a", "b", "c"]);
        }
    
        #[test]
        fn test_both_closed() {
            let (tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<i32>();
            drop(tx1);
            drop(tx2);
            assert_eq!(select(&rx1, &rx2), Selected::BothClosed);
        }
    
        #[test]
        fn test_priority_recv() {
            let (htx, hrx) = mpsc::channel::<i32>();
            let (ltx, lrx) = mpsc::channel::<i32>();
    
            htx.send(10).unwrap();
            ltx.send(20).unwrap();
    
            // High priority wins
            let result = priority_recv(&hrx, &lrx);
            assert_eq!(result, Some((10, true)));
    
            // Now only low available
            let result2 = priority_recv(&hrx, &lrx);
            assert_eq!(result2, Some((20, false)));
        }
    
        #[test]
        fn test_select_empty_left() {
            let (_tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<i32>();
            tx2.send(99).unwrap();
            drop(tx2);
            // rx1 never closes so we'll get Right(99) first
            assert_eq!(select(&rx1, &rx2), Selected::Right(99));
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_select_drain() {
            let (tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<String>();
    
            for i in [1, 2, 3] {
                tx1.send(i).unwrap();
            }
            for s in ["a", "b", "c"] {
                tx2.send(s.to_string()).unwrap();
            }
            drop(tx1);
            drop(tx2);
    
            let (mut lefts, mut rights) = select_drain(rx1, rx2);
            lefts.sort();
            rights.sort();
            assert_eq!(lefts, vec![1, 2, 3]);
            assert_eq!(rights, vec!["a", "b", "c"]);
        }
    
        #[test]
        fn test_both_closed() {
            let (tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<i32>();
            drop(tx1);
            drop(tx2);
            assert_eq!(select(&rx1, &rx2), Selected::BothClosed);
        }
    
        #[test]
        fn test_priority_recv() {
            let (htx, hrx) = mpsc::channel::<i32>();
            let (ltx, lrx) = mpsc::channel::<i32>();
    
            htx.send(10).unwrap();
            ltx.send(20).unwrap();
    
            // High priority wins
            let result = priority_recv(&hrx, &lrx);
            assert_eq!(result, Some((10, true)));
    
            // Now only low available
            let result2 = priority_recv(&hrx, &lrx);
            assert_eq!(result2, Some((20, false)));
        }
    
        #[test]
        fn test_select_empty_left() {
            let (_tx1, rx1) = mpsc::channel::<i32>();
            let (tx2, rx2) = mpsc::channel::<i32>();
            tx2.send(99).unwrap();
            drop(tx2);
            // rx1 never closes so we'll get Right(99) first
            assert_eq!(select(&rx1, &rx2), Selected::Right(99));
        }
    }

    Deep Comparison

    Select Pattern — Comparison

    Core Insight

    select picks from whichever channel is ready first. In async runtimes this is a syscall (epoll/kqueue); in pure std Rust we spin with try_recv + yield_now. In OCaml, Lwt.pick or Event.select provides similar semantics.

    OCaml Approach

  • Lwt.pick [p1; p2] returns the first promise to resolve, cancels others
  • Event.select [ev1; ev2] for Thread/Event module
  • • Non-blocking: no built-in try_receive — must use Thread.create + timeout tricks
  • try_recv simulation: poll with Unix.select for I/O events
  • Rust Approach

  • rx.try_recv() is non-blocking: Ok(v) | Err(Empty) | Err(Disconnected)
  • • Loop over all receivers, yield_now() when all empty
  • • Priority select: check high-priority channel first
  • • For true async select: crossbeam::select! macro (external crate)
  • • For async/await: tokio::select! or futures::select!
  • Comparison Table

    ConceptOCamlRust (std)
    Select first readyLwt.pick [p1; p2]try_recv spin loop
    Non-blocking recvNo built-in (use timeout)rx.try_recv()
    Distinguish sourcesPattern match on promise listMatch on (r1, r2) tuples
    Cancel othersLwt.pick cancels losersJust ignore other channels
    Priority channelsNot built-inCheck high first in loop
    Efficient (no spin)Lwt.pick event-drivencrossbeam::select! (external)

    std vs tokio

    Aspectstd versiontokio version
    RuntimeOS threads via std::threadAsync tasks on tokio runtime
    Synchronizationstd::sync::Mutex, Condvartokio::sync::Mutex, channels
    Channelsstd::sync::mpsc (unbounded)tokio::sync::mpsc (bounded, async)
    BlockingThread blocks on lock/recvTask yields, runtime switches tasks
    OverheadOne OS thread per taskMany tasks per thread (M:N)
    Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers

    Exercises

  • Extend select to work with three channels using a three-way enum Selected3<A, B, C>.
  • Add a timeout: return Selected::Timeout if no message arrives within a given Duration.
  • Implement select_all<T>(receivers: &[Receiver<T>]) -> Option<T> that returns the first available value from any channel in a list.
  • Rewrite select using crossbeam::select! and compare the implementation complexity.
  • Implement a load balancer: N producers send to one channel; M consumers each call select over N channels and process the first available item.
  • Open Source Repos