349: Broadcast Channel

Advanced · Functional Programming · Tutorial

The Problem

Publish-subscribe systems need one sender to deliver the same message to multiple independent receivers — event buses, real-time dashboards, multi-client notification systems. Standard MPSC channels route each message to exactly one receiver. A broadcast channel delivers every message to every subscriber. This pattern powers WebSocket fan-out (one server event → all connected clients), log streaming (one log source → multiple sinks), and reactive frameworks (one state change → all observers). Tokio provides tokio::sync::broadcast natively; this example shows the mechanics of building one from primitives.
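To see why a plain MPSC channel cannot broadcast, note that cloning the `Sender` multiplies producers, not consumers; there is still exactly one `Receiver`, and each message is delivered exactly once. A minimal sketch (the function name `mpsc_fan_in` is ours):

```rust
use std::sync::mpsc;
use std::thread;

// With std's mpsc, cloning the Sender gives many producers, but there is
// still exactly one Receiver: every message arrives exactly once.
fn mpsc_fan_in() -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for i in 0..3 {
        let tx = tx.clone(); // many senders are fine...
        handles.push(thread::spawn(move || tx.send(i).unwrap()));
    }
    drop(tx); // drop the original Sender so rx.iter() terminates
    for h in handles {
        h.join().unwrap();
    }
    // ...but this single Receiver is the only consumer.
    let mut got: Vec<i32> = rx.iter().collect();
    got.sort();
    got
}
```

Each of the three messages shows up once in the single receiver's stream, which is fan-in aggregation rather than fan-out.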

🎯 Learning Outcomes

  • Implement a broadcast sender that maintains a list of per-subscriber SyncSender<T>
  • Protect the subscriber list with Arc<Mutex<Vec<SyncSender<T>>>>
  • Clone the message to each subscriber (requires T: Clone)
  • Use a bounded sync_channel per subscriber to apply per-subscriber backpressure
  • Handle slow subscribers without blocking the sender (use try_send, drop on lag)
  • Recognize the difference between fan-out (broadcast) and fan-in (mpsc aggregation)

Code Example

    #![allow(clippy::all)]
    //! # Broadcast Channel
    //! One sender, many receivers — every subscriber gets a copy of every message.
    
    use std::sync::{mpsc, Arc, Mutex};
    
    pub struct BroadcastSender<T: Clone + Send + 'static> {
        subscribers: Arc<Mutex<Vec<mpsc::SyncSender<T>>>>,
    }
    
    pub struct BroadcastReceiver<T> {
        rx: mpsc::Receiver<T>,
    }
    
    impl<T: Clone + Send + 'static> BroadcastSender<T> {
        pub fn new() -> Self {
            Self {
                subscribers: Arc::new(Mutex::new(Vec::new())),
            }
        }
    
        pub fn subscribe(&self, buf: usize) -> BroadcastReceiver<T> {
            let (tx, rx) = mpsc::sync_channel(buf);
            self.subscribers.lock().unwrap().push(tx);
            BroadcastReceiver { rx }
        }
    
        pub fn send(&self, msg: T) {
            let subs = self.subscribers.lock().unwrap();
            for sub in subs.iter() {
                let _ = sub.try_send(msg.clone());
            }
        }
    }
    
    impl<T: Clone + Send + 'static> Default for BroadcastSender<T> {
        fn default() -> Self {
            Self::new()
        }
    }
    
    impl<T> BroadcastReceiver<T> {
        pub fn recv(&self) -> Option<T> {
            self.rx.recv().ok()
        }
        pub fn try_recv(&self) -> Option<T> {
            self.rx.try_recv().ok()
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        #[test]
        fn broadcast_to_multiple() {
            let sender = BroadcastSender::new();
            let r1 = sender.subscribe(10);
            let r2 = sender.subscribe(10);
            sender.send(42);
            assert_eq!(r1.recv(), Some(42));
            assert_eq!(r2.recv(), Some(42));
        }
        #[test]
        fn late_subscriber_misses() {
            let sender = BroadcastSender::new();
            sender.send(1);
            let r = sender.subscribe(10);
            sender.send(2);
            assert_eq!(r.recv(), Some(2));
        }
    }
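The wrapper types above boil down to a simple pattern: one bounded channel per subscriber, with the message cloned into each. A standalone sketch of the same mechanics without the structs (names are ours):

```rust
use std::sync::mpsc;
use std::thread;

// Hand-rolled fan-out: one bounded SyncSender per subscriber, each running
// on its own thread. T: Send + 'static is what lets a receiver move into
// the spawned closure.
fn fan_out() -> Vec<i32> {
    let mut txs = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..3 {
        let (tx, rx) = mpsc::sync_channel::<i32>(10);
        txs.push(tx);
        handles.push(thread::spawn(move || rx.recv().unwrap()));
    }
    // Deliver the same message to every subscriber (i32 is Copy, so the
    // clone is implicit; a non-Copy T would need msg.clone()).
    for tx in &txs {
        let _ = tx.try_send(7);
    }
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

All three threads observe the same value, which is exactly what `BroadcastSender::send` does under its mutex.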

    Key Differences

    | Aspect           | Rust channel broadcast             | OCaml callback broadcast               |
    |------------------|------------------------------------|----------------------------------------|
    | Message delivery | Queued in per-subscriber channel   | Immediate callback invocation          |
    | Backpressure     | Per-subscriber buffer limit        | None (callbacks run synchronously)     |
    | Lag handling     | Drop messages or RecvError::Lagged | No buffering — caller's responsibility |
    | Type safety      | T: Clone + Send + 'static          | Polymorphic callback 'a -> unit        |
    | Unsubscribe      | Remove dead senders from list      | Remove callback from list              |

    OCaml Approach

    OCaml's event-driven approach uses a mutable list of callbacks:

    type 'a broadcaster = {
      mutable subscribers: ('a -> unit) list;
      mutex: Mutex.t;
    }
    
    let create () = { subscribers = []; mutex = Mutex.create () }
    
    let subscribe b f =
      Mutex.lock b.mutex;
      b.subscribers <- f :: b.subscribers;
      Mutex.unlock b.mutex
    
    let broadcast b msg =
      (* Snapshot the list under the lock, then run the callbacks outside it,
         so a slow callback never blocks [subscribe]. *)
      Mutex.lock b.mutex;
      let subs = b.subscribers in
      Mutex.unlock b.mutex;
      List.iter (fun f -> f msg) subs
    

    This callback model is common in OCaml GUI frameworks (LablGTK signals) and Lwt (reactive streams). The functional approach avoids channels entirely — each subscriber is a callback closure.


    Deep Comparison

    OCaml vs Rust: Broadcast Channel

    Overview

    See the example.rs and example.ml files for detailed implementations.

    Key Differences

    | Aspect         | OCaml           | Rust                   |
    |----------------|-----------------|------------------------|
    | Type system    | Hindley-Milner  | Ownership + traits     |
    | Memory         | GC              | Zero-cost abstractions |
    | Mutability     | Explicit ref    | mut keyword            |
    | Error handling | Option / Result | Result<T, E>           |

    See README.md for detailed comparison.

    Exercises

  • Dead subscriber cleanup: After each send, remove subscribers whose try_send returned Err(Full) or Err(Disconnected) — prune the subscriber list to prevent unbounded growth.
  • Tokio broadcast: Replace the manual implementation with tokio::sync::broadcast::channel(16); test that 3 subscribers each receive 5 messages in order, and observe RecvError::Lagged when a slow receiver falls behind.
  • Filtered subscription: Add a subscribe_filtered(buf, predicate) method that only delivers messages matching a condition to that subscriber; implement by wrapping the sender in a move |msg| if predicate(&msg) { ... } closure.
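For the first exercise, `Vec::retain` is a natural fit: keep a sender only if its `try_send` outcome says the receiver is still alive. A possible shape (this sketch prunes only on `Disconnected`; the exercise's stricter policy also drops on `Full`):

```rust
use std::sync::mpsc::{self, TrySendError};

// Sketch: a send that prunes dead subscribers in place. retain keeps an
// entry only when the closure returns true.
fn send_and_prune<T: Clone>(subs: &mut Vec<mpsc::SyncSender<T>>, msg: T) {
    subs.retain(|tx| match tx.try_send(msg.clone()) {
        Ok(()) => true,
        // Buffer full: the subscriber is slow but alive, so keep it
        // (this message is dropped for that subscriber).
        Err(TrySendError::Full(_)) => true,
        // Receiver was dropped: no one will ever drain this channel.
        Err(TrySendError::Disconnected(_)) => false,
    });
}
```

Wired into `BroadcastSender::send`, this bounds the subscriber list by the number of live receivers instead of the number of `subscribe` calls ever made.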