1000 — Reactive Stream
Tutorial
The Problem
Implement a push-based reactive stream (Observable<T>) with an Observer trait providing on_next, on_error, and on_complete callbacks. Support map and filter operators that create derived observables. Compare with OCaml's record-based observer and functional observable composition.
🎯 Learning Outcomes
- Observer<T> as a trait with three callbacks
- Observable<T> as a struct wrapping a subscribe_fn: Box<dyn Fn(&mut dyn Observer<T>)>
- map and filter as operators that create new Observables from existing ones
- Rc<RefCell<T>> for shared mutable state inside observers
- OCaml's record-based observer: { on_next; on_error; on_complete }
Code Example
#![allow(clippy::all)]
// 1000: Reactive Stream
// Push-based Observable<T> with map/filter/take/subscribe

use std::cell::RefCell;
use std::rc::Rc;

// --- Observer trait ---

trait Observer<T> {
    fn on_next(&mut self, value: T);
    fn on_error(&mut self, err: &str);
    fn on_complete(&mut self);
}

// --- Simple functional observer ---

struct FnObserver<T> {
    on_next_fn: Box<dyn FnMut(T)>,
    on_complete_fn: Box<dyn FnMut()>,
}

impl<T> FnObserver<T> {
    fn new(on_next: impl FnMut(T) + 'static) -> Self {
        FnObserver {
            on_next_fn: Box::new(on_next),
            on_complete_fn: Box::new(|| {}),
        }
    }

    fn with_complete(mut self, f: impl FnMut() + 'static) -> Self {
        self.on_complete_fn = Box::new(f);
        self
    }
}

impl<T> Observer<T> for FnObserver<T> {
    fn on_next(&mut self, value: T) {
        (self.on_next_fn)(value);
    }

    // Errors are silently ignored by this observer.
    fn on_error(&mut self, _err: &str) {}

    fn on_complete(&mut self) {
        (self.on_complete_fn)();
    }
}

// --- Observable: a lazy push source ---

struct Observable<T> {
    subscribe_fn: Box<dyn Fn(&mut dyn Observer<T>)>,
}

impl<T: Clone + 'static> Observable<T> {
    fn new(f: impl Fn(&mut dyn Observer<T>) + 'static) -> Self {
        Observable {
            subscribe_fn: Box::new(f),
        }
    }

    // Nothing is emitted until this is called.
    fn subscribe(&self, observer: &mut dyn Observer<T>) {
        (self.subscribe_fn)(observer);
    }

    // Emits every item in order, then completes. Items are cloned so the
    // observable can be subscribed to more than once.
    fn from_iter(items: Vec<T>) -> Self {
        Observable::new(move |obs| {
            for item in &items {
                obs.on_next(item.clone());
            }
            obs.on_complete();
        })
    }
}

// --- Operators as free functions (return new Observable) ---
// Adapter structs allow borrowing `observer` without 'static

struct MapAdapter<'a, U, F> {
    inner: &'a mut dyn Observer<U>,
    f: &'a F,
}

impl<'a, T, U, F: Fn(T) -> U> Observer<T> for MapAdapter<'a, U, F> {
    fn on_next(&mut self, value: T) {
        self.inner.on_next((self.f)(value));
    }

    fn on_error(&mut self, err: &str) {
        self.inner.on_error(err);
    }

    fn on_complete(&mut self) {
        self.inner.on_complete();
    }
}

fn obs_map<T: Clone + 'static, U: Clone + 'static>(
    source: Observable<T>,
    f: impl Fn(T) -> U + 'static,
) -> Observable<U> {
    Observable::new(move |observer| {
        let mut adapter = MapAdapter {
            inner: observer,
            f: &f,
        };
        source.subscribe(&mut adapter);
    })
}

struct FilterAdapter<'a, T, P> {
    inner: &'a mut dyn Observer<T>,
    pred: &'a P,
}

impl<'a, T, P: Fn(&T) -> bool> Observer<T> for FilterAdapter<'a, T, P> {
    fn on_next(&mut self, value: T) {
        if (self.pred)(&value) {
            self.inner.on_next(value);
        }
    }

    fn on_error(&mut self, err: &str) {
        self.inner.on_error(err);
    }

    fn on_complete(&mut self) {
        self.inner.on_complete();
    }
}

fn obs_filter<T: Clone + 'static>(
    source: Observable<T>,
    pred: impl Fn(&T) -> bool + 'static,
) -> Observable<T> {
    Observable::new(move |observer| {
        let mut adapter = FilterAdapter {
            inner: observer,
            pred: &pred,
        };
        source.subscribe(&mut adapter);
    })
}

struct TakeAdapter<'a, T> {
    inner: &'a mut dyn Observer<T>,
    remaining: usize,
}

impl<'a, T> Observer<T> for TakeAdapter<'a, T> {
    // Forwards the first `remaining` values and drops the rest. The source
    // is not stopped early: it keeps pushing until it completes on its own.
    fn on_next(&mut self, value: T) {
        if self.remaining > 0 {
            self.remaining -= 1;
            self.inner.on_next(value);
        }
    }

    fn on_error(&mut self, err: &str) {
        self.inner.on_error(err);
    }

    fn on_complete(&mut self) {
        self.inner.on_complete();
    }
}

fn obs_take<T: Clone + 'static>(source: Observable<T>, n: usize) -> Observable<T> {
    Observable::new(move |observer| {
        // A fresh adapter per subscription resets the counter each time.
        let mut adapter = TakeAdapter {
            inner: observer,
            remaining: n,
        };
        source.subscribe(&mut adapter);
    })
}

// --- Collect all emitted values ---

fn collect<T: Clone + 'static>(source: Observable<T>) -> Vec<T> {
    let results = Rc::new(RefCell::new(Vec::new()));
    let results2 = Rc::clone(&results);
    let mut observer = FnObserver::new(move |v: T| {
        results2.borrow_mut().push(v);
    });
    source.subscribe(&mut observer);
    // Bind the clone first so the `Ref` guard is released before returning.
    let collected = results.borrow().clone();
    collected
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_from_iter() {
        let obs = Observable::from_iter(vec![1, 2, 3]);
        assert_eq!(collect(obs), vec![1, 2, 3]);
    }

    #[test]
    fn test_map() {
        let obs = Observable::from_iter(vec![1, 2, 3]);
        let mapped = obs_map(obs, |x: i32| x * 2);
        assert_eq!(collect(mapped), vec![2, 4, 6]);
    }

    #[test]
    fn test_filter() {
        let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
        let filtered = obs_filter(obs, |x| x % 2 == 0);
        assert_eq!(collect(filtered), vec![2, 4]);
    }

    #[test]
    fn test_take() {
        let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
        let taken = obs_take(obs, 3);
        assert_eq!(collect(taken), vec![1, 2, 3]);
    }

    #[test]
    fn test_chain() {
        let source = Observable::from_iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let filtered = obs_filter(source, |x| x % 2 == 0);
        let mapped = obs_map(filtered, |x: i32| x * x);
        let taken = obs_take(mapped, 3);
        assert_eq!(collect(taken), vec![4, 16, 36]);
    }

    #[test]
    fn test_empty_observable() {
        let obs: Observable<i32> = Observable::from_iter(vec![]);
        assert_eq!(collect(obs), vec![]);
    }
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Observer | trait Observer<T> | Record 'a observer |
| Observable | Box<dyn Fn(&mut dyn Observer<T>)> | { subscribe: 'a observer -> unit -> unit } |
| Map operator | Creates new Observable | Creates new observable |
| Shared state | Rc<RefCell<…>> | ref or mutable binding |
| Unsubscribe | Not shown (complex) | Returns thunk unit -> unit |
| Verbosity | High (trait + Box + dyn) | Low (records + closures) |
Reactive streams implement the observer pattern functionally: applying operators only describes a pipeline, and nothing runs until an observer subscribes. Because subscribe takes &self, the same observable can be subscribed to multiple times, and each subscription runs the whole pipeline independently.
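The laziness and the independence of subscriptions can be checked with a small experiment. The sketch below is not the tutorial's code: it assumes a simplified Observable whose observer is reduced to a single on_next closure, and the names from_vec and run_twice are hypothetical. A counter records how often the mapping function has run.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Simplified stand-in for the tutorial's Observable<T>: the observer is
// reduced to a single on_next closure (an assumption made for brevity).
struct Observable<T> {
    subscribe_fn: Box<dyn Fn(&mut dyn FnMut(T))>,
}

impl<T: 'static> Observable<T> {
    fn new(f: impl Fn(&mut dyn FnMut(T)) + 'static) -> Self {
        Observable {
            subscribe_fn: Box::new(f),
        }
    }

    // Hypothetical constructor: replays `items` on every subscription.
    fn from_vec(items: Vec<T>) -> Self
    where
        T: Clone,
    {
        Observable::new(move |sink| {
            for item in &items {
                (*sink)(item.clone());
            }
        })
    }

    // Building the operator only stores closures; `f` is not called here.
    fn map<U: 'static>(self, f: impl Fn(T) -> U + 'static) -> Observable<U> {
        Observable::new(move |sink| {
            (self.subscribe_fn)(&mut |v: T| (*sink)(f(v)));
        })
    }

    fn subscribe(&self, mut sink: impl FnMut(T)) {
        (self.subscribe_fn)(&mut sink);
    }
}

// Returns (map calls before subscribing, first run, second run, total map calls).
fn run_twice() -> (usize, Vec<i32>, Vec<i32>, usize) {
    let calls = Rc::new(Cell::new(0));
    let calls_in_map = Rc::clone(&calls);
    let doubled = Observable::from_vec(vec![1, 2, 3]).map(move |x| {
        calls_in_map.set(calls_in_map.get() + 1);
        x * 2
    });
    let calls_before_subscribe = calls.get();

    let mut first = Vec::new();
    let mut second = Vec::new();
    doubled.subscribe(|v| first.push(v));
    doubled.subscribe(|v| second.push(v));
    (calls_before_subscribe, first, second, calls.get())
}
```

Calling run_twice() should report zero map calls before the first subscription and six afterwards, since each of the two subscriptions replays all three items through the mapping function.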
OCaml Approach
OCaml's observer is a record: { on_next: 'a -> unit; on_error: exn -> unit; on_complete: unit -> unit }. The observable is { subscribe: 'a observer -> unit -> unit } — subscribe returns an unsubscribe thunk. map f obs wraps the observer's on_next with fun v -> observer.on_next (f v). The OCaml version is more concise because records are lighter than trait objects, and there is no need for Box or dyn.
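To make the comparison concrete, here is a rough Rust transliteration of the record style described above. It is a sketch, not the OCaml source: ObserverRec, ObservableRec, of_vec, map_rec and collect_squares are hypothetical names, and the unsubscribe thunk is a no-op because the source is synchronous. The extra Box and Rc are what the OCaml version avoids.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

// Record of closures, mirroring { on_next; on_error; on_complete }.
struct ObserverRec<T> {
    on_next: Box<dyn FnMut(T)>,
    on_error: Box<dyn FnMut(&str)>,
    on_complete: Box<dyn FnMut()>,
}

// `subscribe` takes an observer record and returns an unsubscribe thunk.
struct ObservableRec<T> {
    subscribe: Box<dyn Fn(ObserverRec<T>) -> Box<dyn FnOnce()>>,
}

fn of_vec<T: Clone + 'static>(items: Vec<T>) -> ObservableRec<T> {
    ObservableRec {
        subscribe: Box::new(move |mut obs: ObserverRec<T>| {
            for item in &items {
                (obs.on_next)(item.clone());
            }
            (obs.on_complete)();
            // Synchronous source: by now there is nothing left to cancel.
            Box::new(|| {}) as Box<dyn FnOnce()>
        }),
    }
}

// map wraps the downstream on_next with `fun v -> on_next (f v)`.
fn map_rec<T: 'static, U: 'static>(
    f: impl Fn(T) -> U + 'static,
    source: ObservableRec<T>,
) -> ObservableRec<U> {
    // Rc lets every subscription hand its own handle to `f` to a new closure.
    let f = Rc::new(f);
    ObservableRec {
        subscribe: Box::new(move |obs: ObserverRec<U>| {
            let ObserverRec { mut on_next, on_error, on_complete } = obs;
            let f = Rc::clone(&f);
            (source.subscribe)(ObserverRec {
                on_next: Box::new(move |v: T| on_next((*f)(v))),
                on_error,
                on_complete,
            })
        }),
    }
}

// Returns the values seen and whether on_complete fired.
fn collect_squares() -> (Vec<i32>, bool) {
    let seen = Rc::new(RefCell::new(Vec::new()));
    let done = Rc::new(Cell::new(false));
    let (seen_in, done_in) = (Rc::clone(&seen), Rc::clone(&done));

    let squares = map_rec(|x: i32| x * x, of_vec(vec![1, 2, 3]));
    let unsubscribe = (squares.subscribe)(ObserverRec {
        on_next: Box::new(move |v| seen_in.borrow_mut().push(v)),
        on_error: Box::new(|_msg: &str| {}),
        on_complete: Box::new(move || done_in.set(true)),
    });
    unsubscribe();

    let values = seen.borrow().clone();
    (values, done.get())
}
```

The record style needs no trait, but each callback must be an owned 'static closure, which is why the mapping function is shared through Rc rather than borrowed as in the adapter structs.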
Deep Comparison
Reactive Stream — Comparison
Core Insight
Reactive streams are push-based lazy sequences: a producer pushes values to a subscriber. Unlike iterators (pull-based), the producer controls timing. Operators like map/filter wrap one observable in another — forming a lazy chain that only runs when subscribed.
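The pull versus push distinction can be seen without any observable machinery. The following is a minimal sketch with hypothetical function names: the first function pulls from an iterator and stops when it has enough, while the second hands a callback to a producer that decides when, and how often, to call it.

```rust
// Pull: the consumer drives the loop by calling `next`.
fn pull_two_evens(items: Vec<i32>) -> Vec<i32> {
    let mut evens = items.into_iter().filter(|x| x % 2 == 0);
    let mut out = Vec::new();
    // The consumer decides when to ask and when to stop asking.
    while out.len() < 2 {
        match evens.next() {
            Some(v) => out.push(v),
            None => break,
        }
    }
    out
}

// Push: the producer drives the loop and calls the consumer's callback.
// Returns how many values it pushed.
fn push_evens(items: Vec<i32>, mut on_next: impl FnMut(i32)) -> usize {
    let mut pushed = 0;
    for v in items {
        if v % 2 == 0 {
            on_next(v);
            pushed += 1;
        }
    }
    pushed
}

// Returns the values kept by the consumer and the number the producer pushed.
fn push_two_evens(items: Vec<i32>) -> (Vec<i32>, usize) {
    let mut out = Vec::new();
    let pushed = push_evens(items, |v| {
        // The consumer cannot stop the producer; it can only ignore values.
        if out.len() < 2 {
            out.push(v);
        }
    });
    (out, pushed)
}
```

With the input 1 through 6, both versions keep 2 and 4, but the push producer still emits all three even numbers. This is the same behaviour as the tutorial's TakeAdapter, which drops surplus values rather than stopping the source.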
OCaml Approach
- observable = { subscribe: observer -> unsubscribe } as a record
- observer = { on_next; on_error; on_complete } callbacks
- map/filter/take create new observables wrapping the old
- Subject (hot observable): broadcasts to multiple subscribers
Rust Approach
- Observable<T> wraps Box<dyn Fn(&mut dyn Observer<T>)>
- Observer<T> trait with on_next, on_error, on_complete
- FnObserver adapter for closure-based observers
- Rc<RefCell<_>> for shared state within a single-threaded observable
- obs_map, obs_filter, obs_take as free functions returning a new Observable
- For production use: futures::Stream or the rxrust crate
Comparison Table
| Concept | OCaml | Rust |
|---|---|---|
| Observable type | { subscribe: observer -> unsub } | struct Observable<T> { subscribe_fn } |
| Observer type | { on_next; on_error; on_complete } | trait Observer<T> |
| Map operator | map f obs → new observable | obs_map(source, f) → Observable<U> |
| Filter operator | filter pred obs | obs_filter(source, pred) |
| Take operator | take n obs | obs_take(source, n) |
| Hot observable | Subject type | Manual with Arc<Mutex<Vec<...>>> |
| Production | Lwt_react, rxocaml | futures::Stream, rxrust |
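The "Hot observable" row is the only one the tutorial code does not cover. As a rough illustration of the "manual" Rust option, here is a minimal sketch of a subject that keeps its subscribers in an Arc<Mutex<Vec<_>>>. The Subject type, its methods and broadcast_demo are assumptions for this example, only on_next is modelled, and callbacks run while the lock is held, so a callback that subscribes to the same subject would deadlock.

```rust
use std::sync::{Arc, Mutex};

type Callback<T> = Box<dyn FnMut(T) + Send>;

// Hypothetical hot source: values are broadcast to whoever is subscribed
// at the moment `next` is called; nothing is replayed to late subscribers.
struct Subject<T> {
    subscribers: Arc<Mutex<Vec<Callback<T>>>>,
}

impl<T: Clone> Subject<T> {
    fn new() -> Self {
        Subject {
            subscribers: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn subscribe(&self, f: impl FnMut(T) + Send + 'static) {
        self.subscribers.lock().unwrap().push(Box::new(f));
    }

    // Pushes a clone of `value` to every current subscriber.
    fn next(&self, value: T) {
        for sub in self.subscribers.lock().unwrap().iter_mut() {
            (*sub)(value.clone());
        }
    }
}

// Returns what an early and a late subscriber each received.
fn broadcast_demo() -> (Vec<i32>, Vec<i32>) {
    let subject = Subject::new();
    let early = Arc::new(Mutex::new(Vec::new()));
    let late = Arc::new(Mutex::new(Vec::new()));

    let early_sink = Arc::clone(&early);
    subject.subscribe(move |v| early_sink.lock().unwrap().push(v));
    subject.next(1);

    let late_sink = Arc::clone(&late);
    subject.subscribe(move |v| late_sink.lock().unwrap().push(v));
    subject.next(2);

    let early_values = early.lock().unwrap().clone();
    let late_values = late.lock().unwrap().clone();
    (early_values, late_values)
}
```

The early subscriber receives both values while the late one receives only the second. A cold Observable behaves differently: it replays the full sequence for every subscription.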
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
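For the std column, a push source can be moved onto its own OS thread with std::sync::mpsc. This is only a sketch of that column under the assumption that squaring stands in for real work; squares_via_channel is a hypothetical name, and no tokio equivalent is shown.

```rust
use std::sync::mpsc;
use std::thread;

// The producer thread pushes squares into an unbounded channel; the caller's
// thread blocks on the receiver until the channel is closed.
fn squares_via_channel(items: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        for x in items {
            // Sending on an unbounded channel does not block the producer.
            tx.send(x * x).expect("receiver dropped");
        }
        // `tx` is dropped here, which closes the channel. This plays the
        // role of on_complete for the consumer.
    });

    let mut out = Vec::new();
    // Iterating the receiver blocks this thread until each value arrives
    // and ends once the sender has been dropped.
    for v in rx {
        out.push(v);
    }

    producer.join().expect("producer thread panicked");
    out
}
```

The consumer thread is parked while it waits, which matches the "Thread blocks on lock/recv" row. An async runtime would instead suspend a task and reuse the thread.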
Exercises
- Implement a take(n: usize) operator that stops the stream after n values.
- Implement a merge operator that subscribes to two observables and pushes values from both to a single subscriber.
- Extend map to catch panics from the mapping function and forward them to on_error.
- Implement Subject<T>, a combined observable and observer that fans out emissions to multiple subscribers.
- Implement flat_map : ('a -> 'b observable) -> 'a observable -> 'b observable for observable chaining.