349: Broadcast Channel
Tutorial
The Problem
Publish-subscribe systems need one sender to deliver the same message to multiple independent receivers — event buses, real-time dashboards, multi-client notification systems. Standard MPSC channels route each message to exactly one receiver. A broadcast channel delivers every message to every subscriber. This pattern powers WebSocket fan-out (one server event → all connected clients), log streaming (one log source → multiple sinks), and reactive frameworks (one state change → all observers). Tokio provides tokio::sync::broadcast natively; this example shows the mechanics of building one from primitives.
🎯 Learning Outcomes
SyncSender<T>Arc<Mutex<Vec<SyncSender<T>>>>T: Clone)sync_channel per subscriber to apply per-subscriber backpressuretry_send, drop on lag)Code Example
#![allow(clippy::all)]
//! # Broadcast Channel
//! One sender, many receivers — every subscriber gets a copy of every message.
use std::sync::{mpsc, Arc, Mutex};
pub struct BroadcastSender<T: Clone + Send + 'static> {
subscribers: Arc<Mutex<Vec<mpsc::SyncSender<T>>>>,
}
pub struct BroadcastReceiver<T> {
rx: mpsc::Receiver<T>,
}
impl<T: Clone + Send + 'static> BroadcastSender<T> {
pub fn new() -> Self {
Self {
subscribers: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn subscribe(&self, buf: usize) -> BroadcastReceiver<T> {
let (tx, rx) = mpsc::sync_channel(buf);
self.subscribers.lock().unwrap().push(tx);
BroadcastReceiver { rx }
}
pub fn send(&self, msg: T) {
let subs = self.subscribers.lock().unwrap();
for sub in subs.iter() {
let _ = sub.try_send(msg.clone());
}
}
}
impl<T: Clone + Send + 'static> Default for BroadcastSender<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> BroadcastReceiver<T> {
pub fn recv(&self) -> Option<T> {
self.rx.recv().ok()
}
pub fn try_recv(&self) -> Option<T> {
self.rx.try_recv().ok()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn broadcast_to_multiple() {
let sender = BroadcastSender::new();
let r1 = sender.subscribe(10);
let r2 = sender.subscribe(10);
sender.send(42);
assert_eq!(r1.recv(), Some(42));
assert_eq!(r2.recv(), Some(42));
}
#[test]
fn late_subscriber_misses() {
let sender = BroadcastSender::new();
sender.send(1);
let r = sender.subscribe(10);
sender.send(2);
assert_eq!(r.recv(), Some(2));
}
}Key Differences
| Aspect | Rust channel broadcast | OCaml callback broadcast |
|---|---|---|
| Message delivery | Queued in per-subscriber channel | Immediate callback invocation |
| Backpressure | Per-subscriber buffer limit | None (callbacks run synchronously) |
| Lag handling | Drop messages or RecvError::Lagged | No buffering — caller's responsibility |
| Type safety | T: Clone + Send + 'static | Polymorphic callback 'a -> unit |
| Unsubscribe | Remove dead senders from list | Remove callback from list |
OCaml Approach
OCaml's event-driven approach uses a mutable list of callbacks:
type 'a broadcaster = {
mutable subscribers: ('a -> unit) list;
mutex: Mutex.t;
}
let broadcast b msg =
Mutex.lock b.mutex;
let subs = b.subscribers in
Mutex.unlock b.mutex;
List.iter (fun f -> f msg) subs
let subscribe b f =
Mutex.lock b.mutex;
b.subscribers <- f :: b.subscribers;
Mutex.unlock b.mutex
This callback model is common in OCaml GUI frameworks (LablGTK signals) and Lwt (reactive streams). The functional approach avoids channels entirely — each subscriber is a callback closure.
Full Source
#![allow(clippy::all)]
//! # Broadcast Channel
//! One sender, many receivers — every subscriber gets a copy of every message.
use std::sync::{mpsc, Arc, Mutex};
pub struct BroadcastSender<T: Clone + Send + 'static> {
subscribers: Arc<Mutex<Vec<mpsc::SyncSender<T>>>>,
}
pub struct BroadcastReceiver<T> {
rx: mpsc::Receiver<T>,
}
impl<T: Clone + Send + 'static> BroadcastSender<T> {
pub fn new() -> Self {
Self {
subscribers: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn subscribe(&self, buf: usize) -> BroadcastReceiver<T> {
let (tx, rx) = mpsc::sync_channel(buf);
self.subscribers.lock().unwrap().push(tx);
BroadcastReceiver { rx }
}
pub fn send(&self, msg: T) {
let subs = self.subscribers.lock().unwrap();
for sub in subs.iter() {
let _ = sub.try_send(msg.clone());
}
}
}
impl<T: Clone + Send + 'static> Default for BroadcastSender<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> BroadcastReceiver<T> {
pub fn recv(&self) -> Option<T> {
self.rx.recv().ok()
}
pub fn try_recv(&self) -> Option<T> {
self.rx.try_recv().ok()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn broadcast_to_multiple() {
let sender = BroadcastSender::new();
let r1 = sender.subscribe(10);
let r2 = sender.subscribe(10);
sender.send(42);
assert_eq!(r1.recv(), Some(42));
assert_eq!(r2.recv(), Some(42));
}
#[test]
fn late_subscriber_misses() {
let sender = BroadcastSender::new();
sender.send(1);
let r = sender.subscribe(10);
sender.send(2);
assert_eq!(r.recv(), Some(2));
}
}#[cfg(test)]
mod tests {
use super::*;
#[test]
fn broadcast_to_multiple() {
let sender = BroadcastSender::new();
let r1 = sender.subscribe(10);
let r2 = sender.subscribe(10);
sender.send(42);
assert_eq!(r1.recv(), Some(42));
assert_eq!(r2.recv(), Some(42));
}
#[test]
fn late_subscriber_misses() {
let sender = BroadcastSender::new();
sender.send(1);
let r = sender.subscribe(10);
sender.send(2);
assert_eq!(r.recv(), Some(2));
}
}
Deep Comparison
OCaml vs Rust: Broadcast Channel
Overview
See the example.rs and example.ml files for detailed implementations.
Key Differences
| Aspect | OCaml | Rust |
|---|---|---|
| Type system | Hindley-Milner | Ownership + traits |
| Memory | GC | Zero-cost abstractions |
| Mutability | Explicit ref | mut keyword |
| Error handling | Option/Result | Result<T, E> |
See README.md for detailed comparison.
Exercises
send, remove subscribers whose try_send returned Err(Full) or Err(Disconnected) — prune the subscriber list to prevent unbounded growth.tokio::sync::broadcast::channel(16); test that 3 subscribers each receive 5 messages in order, and observe RecvError::Lagged when a slow receiver falls behind.subscribe_filtered(buf, predicate) method that only delivers messages matching a condition to that subscriber; implement by wrapping the sender in a move |msg| if predicate(&msg) { ... } closure.