ExamplesBy LevelBy TopicLearning Paths
1000 Advanced

1000 — Reactive Stream

Functional Programming

Tutorial

The Problem

Implement a push-based reactive stream (Observable<T>) with an Observer trait providing on_next, on_error, and on_complete callbacks. Support map and filter operators that create derived observables. Compare with OCaml's record-based observer and functional observable composition.

🎯 Learning Outcomes

  • • Model an Observer<T> as a trait with three callbacks
  • • Represent an Observable<T> as a struct wrapping a subscribe_fn: Box<dyn Fn(&mut dyn Observer<T>)>
  • • Build map and filter as operators that create new Observables from existing ones
  • • Use Rc<RefCell<T>> for shared mutable state inside observers
  • • Map Rust's trait-based observer to OCaml's record { on_next; on_error; on_complete }
  • • Recognise that functional reactive programming is deferred composition of push pipelines
  • Code Example

    #![allow(clippy::all)]
    // 1000: Reactive Stream
    // Push-based Observable<T> with map/filter/take/subscribe
    
    use std::cell::RefCell;
    use std::rc::Rc;
    
    // --- Observer trait ---
    trait Observer<T> {
        fn on_next(&mut self, value: T);
        fn on_error(&mut self, err: &str);
        fn on_complete(&mut self);
    }
    
    // --- Simple functional observer ---
    struct FnObserver<T> {
        on_next_fn: Box<dyn FnMut(T)>,
        on_complete_fn: Box<dyn FnMut()>,
    }
    
    impl<T> FnObserver<T> {
        fn new(on_next: impl FnMut(T) + 'static) -> Self {
            FnObserver {
                on_next_fn: Box::new(on_next),
                on_complete_fn: Box::new(|| {}),
            }
        }
    
        fn with_complete(mut self, f: impl FnMut() + 'static) -> Self {
            self.on_complete_fn = Box::new(f);
            self
        }
    }
    
    impl<T> Observer<T> for FnObserver<T> {
        fn on_next(&mut self, value: T) {
            (self.on_next_fn)(value);
        }
        fn on_error(&mut self, _err: &str) {}
        fn on_complete(&mut self) {
            (self.on_complete_fn)();
        }
    }
    
    // --- Observable: a lazy push source ---
    struct Observable<T> {
        subscribe_fn: Box<dyn Fn(&mut dyn Observer<T>)>,
    }
    
    impl<T: Clone + 'static> Observable<T> {
        fn new(f: impl Fn(&mut dyn Observer<T>) + 'static) -> Self {
            Observable {
                subscribe_fn: Box::new(f),
            }
        }
    
        fn subscribe(&self, observer: &mut dyn Observer<T>) {
            (self.subscribe_fn)(observer);
        }
    
        fn from_iter(items: Vec<T>) -> Self {
            Observable::new(move |obs| {
                for item in &items {
                    obs.on_next(item.clone());
                }
                obs.on_complete();
            })
        }
    }
    
    // --- Operators as free functions (return new Observable) ---
    
    // Adapter structs allow borrowing `observer` without 'static
    struct MapAdapter<'a, U, F> {
        inner: &'a mut dyn Observer<U>,
        f: &'a F,
    }
    
    impl<'a, T, U, F: Fn(T) -> U> Observer<T> for MapAdapter<'a, U, F> {
        fn on_next(&mut self, value: T) {
            self.inner.on_next((self.f)(value));
        }
        fn on_error(&mut self, err: &str) {
            self.inner.on_error(err);
        }
        fn on_complete(&mut self) {
            self.inner.on_complete();
        }
    }
    
    fn obs_map<T: Clone + 'static, U: Clone + 'static>(
        source: Observable<T>,
        f: impl Fn(T) -> U + 'static,
    ) -> Observable<U> {
        Observable::new(move |observer| {
            let mut adapter = MapAdapter {
                inner: observer,
                f: &f,
            };
            source.subscribe(&mut adapter);
        })
    }
    
    struct FilterAdapter<'a, T, P> {
        inner: &'a mut dyn Observer<T>,
        pred: &'a P,
    }
    
    impl<'a, T, P: Fn(&T) -> bool> Observer<T> for FilterAdapter<'a, T, P> {
        fn on_next(&mut self, value: T) {
            if (self.pred)(&value) {
                self.inner.on_next(value);
            }
        }
        fn on_error(&mut self, err: &str) {
            self.inner.on_error(err);
        }
        fn on_complete(&mut self) {
            self.inner.on_complete();
        }
    }
    
    fn obs_filter<T: Clone + 'static>(
        source: Observable<T>,
        pred: impl Fn(&T) -> bool + 'static,
    ) -> Observable<T> {
        Observable::new(move |observer| {
            let mut adapter = FilterAdapter {
                inner: observer,
                pred: &pred,
            };
            source.subscribe(&mut adapter);
        })
    }
    
    struct TakeAdapter<'a, T> {
        inner: &'a mut dyn Observer<T>,
        remaining: usize,
    }
    
    impl<'a, T> Observer<T> for TakeAdapter<'a, T> {
        fn on_next(&mut self, value: T) {
            if self.remaining > 0 {
                self.remaining -= 1;
                self.inner.on_next(value);
            }
        }
        fn on_error(&mut self, err: &str) {
            self.inner.on_error(err);
        }
        fn on_complete(&mut self) {
            self.inner.on_complete();
        }
    }
    
    fn obs_take<T: Clone + 'static>(source: Observable<T>, n: usize) -> Observable<T> {
        Observable::new(move |observer| {
            let mut adapter = TakeAdapter {
                inner: observer,
                remaining: n,
            };
            source.subscribe(&mut adapter);
        })
    }
    
    // --- Collect all emitted values ---
    fn collect<T: Clone + 'static>(source: Observable<T>) -> Vec<T> {
        let results = Rc::new(RefCell::new(Vec::new()));
        let results2 = Rc::clone(&results);
        let mut observer = FnObserver::new(move |v: T| {
            results2.borrow_mut().push(v);
        });
        source.subscribe(&mut observer);
        let x = results.borrow().clone();
        x
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_from_iter() {
            let obs = Observable::from_iter(vec![1, 2, 3]);
            assert_eq!(collect(obs), vec![1, 2, 3]);
        }
    
        #[test]
        fn test_map() {
            let obs = Observable::from_iter(vec![1, 2, 3]);
            let mapped = obs_map(obs, |x: i32| x * 2);
            assert_eq!(collect(mapped), vec![2, 4, 6]);
        }
    
        #[test]
        fn test_filter() {
            let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
            let filtered = obs_filter(obs, |x| x % 2 == 0);
            assert_eq!(collect(filtered), vec![2, 4]);
        }
    
        #[test]
        fn test_take() {
            let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
            let taken = obs_take(obs, 3);
            assert_eq!(collect(taken), vec![1, 2, 3]);
        }
    
        #[test]
        fn test_chain() {
            let source = Observable::from_iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
            let filtered = obs_filter(source, |x| x % 2 == 0);
            let mapped = obs_map(filtered, |x: i32| x * x);
            let taken = obs_take(mapped, 3);
            assert_eq!(collect(taken), vec![4, 16, 36]);
        }
    
        #[test]
        fn test_empty_observable() {
            let obs: Observable<i32> = Observable::from_iter(vec![]);
            assert_eq!(collect(obs), vec![]);
        }
    }

    Key Differences

    AspectRustOCaml
    Observertrait Observer<T>Record 'a observer
    ObservableBox<dyn Fn(&mut dyn Observer<T>)>{ subscribe: 'a observer -> unit -> unit }
    Map operatorCreates new ObservableCreates new observable
    Shared stateRc<RefCell<…>>ref or mutable binding
    UnsubscribeNot shown (complex)Returns thunk unit -> unit
    VerbosityHigh (trait + Box + dyn)Low (records + closures)

    Reactive streams implement the observer pattern functionally: pipelines are composed at subscription time, not execution time. The same observable can be subscribed to multiple times, each producing an independent stream execution.

    OCaml Approach

    OCaml's observer is a record: { on_next: 'a -> unit; on_error: exn -> unit; on_complete: unit -> unit }. The observable is { subscribe: 'a observer -> unit -> unit } — subscribe returns an unsubscribe thunk. map f obs wraps the observer's on_next with fun v -> observer.on_next (f v). The OCaml version is more concise because records are lighter than trait objects, and there is no need for Box or dyn.

    Full Source

    #![allow(clippy::all)]
    // 1000: Reactive Stream
    // Push-based Observable<T> with map/filter/take/subscribe
    
    use std::cell::RefCell;
    use std::rc::Rc;
    
    // --- Observer trait ---
    trait Observer<T> {
        fn on_next(&mut self, value: T);
        fn on_error(&mut self, err: &str);
        fn on_complete(&mut self);
    }
    
    // --- Simple functional observer ---
    struct FnObserver<T> {
        on_next_fn: Box<dyn FnMut(T)>,
        on_complete_fn: Box<dyn FnMut()>,
    }
    
    impl<T> FnObserver<T> {
        fn new(on_next: impl FnMut(T) + 'static) -> Self {
            FnObserver {
                on_next_fn: Box::new(on_next),
                on_complete_fn: Box::new(|| {}),
            }
        }
    
        fn with_complete(mut self, f: impl FnMut() + 'static) -> Self {
            self.on_complete_fn = Box::new(f);
            self
        }
    }
    
    impl<T> Observer<T> for FnObserver<T> {
        fn on_next(&mut self, value: T) {
            (self.on_next_fn)(value);
        }
        fn on_error(&mut self, _err: &str) {}
        fn on_complete(&mut self) {
            (self.on_complete_fn)();
        }
    }
    
    // --- Observable: a lazy push source ---
    struct Observable<T> {
        subscribe_fn: Box<dyn Fn(&mut dyn Observer<T>)>,
    }
    
    impl<T: Clone + 'static> Observable<T> {
        fn new(f: impl Fn(&mut dyn Observer<T>) + 'static) -> Self {
            Observable {
                subscribe_fn: Box::new(f),
            }
        }
    
        fn subscribe(&self, observer: &mut dyn Observer<T>) {
            (self.subscribe_fn)(observer);
        }
    
        fn from_iter(items: Vec<T>) -> Self {
            Observable::new(move |obs| {
                for item in &items {
                    obs.on_next(item.clone());
                }
                obs.on_complete();
            })
        }
    }
    
    // --- Operators as free functions (return new Observable) ---
    
    // Adapter structs allow borrowing `observer` without 'static
    struct MapAdapter<'a, U, F> {
        inner: &'a mut dyn Observer<U>,
        f: &'a F,
    }
    
    impl<'a, T, U, F: Fn(T) -> U> Observer<T> for MapAdapter<'a, U, F> {
        fn on_next(&mut self, value: T) {
            self.inner.on_next((self.f)(value));
        }
        fn on_error(&mut self, err: &str) {
            self.inner.on_error(err);
        }
        fn on_complete(&mut self) {
            self.inner.on_complete();
        }
    }
    
    fn obs_map<T: Clone + 'static, U: Clone + 'static>(
        source: Observable<T>,
        f: impl Fn(T) -> U + 'static,
    ) -> Observable<U> {
        Observable::new(move |observer| {
            let mut adapter = MapAdapter {
                inner: observer,
                f: &f,
            };
            source.subscribe(&mut adapter);
        })
    }
    
    struct FilterAdapter<'a, T, P> {
        inner: &'a mut dyn Observer<T>,
        pred: &'a P,
    }
    
    impl<'a, T, P: Fn(&T) -> bool> Observer<T> for FilterAdapter<'a, T, P> {
        fn on_next(&mut self, value: T) {
            if (self.pred)(&value) {
                self.inner.on_next(value);
            }
        }
        fn on_error(&mut self, err: &str) {
            self.inner.on_error(err);
        }
        fn on_complete(&mut self) {
            self.inner.on_complete();
        }
    }
    
    fn obs_filter<T: Clone + 'static>(
        source: Observable<T>,
        pred: impl Fn(&T) -> bool + 'static,
    ) -> Observable<T> {
        Observable::new(move |observer| {
            let mut adapter = FilterAdapter {
                inner: observer,
                pred: &pred,
            };
            source.subscribe(&mut adapter);
        })
    }
    
    struct TakeAdapter<'a, T> {
        inner: &'a mut dyn Observer<T>,
        remaining: usize,
    }
    
    impl<'a, T> Observer<T> for TakeAdapter<'a, T> {
        fn on_next(&mut self, value: T) {
            if self.remaining > 0 {
                self.remaining -= 1;
                self.inner.on_next(value);
            }
        }
        fn on_error(&mut self, err: &str) {
            self.inner.on_error(err);
        }
        fn on_complete(&mut self) {
            self.inner.on_complete();
        }
    }
    
    fn obs_take<T: Clone + 'static>(source: Observable<T>, n: usize) -> Observable<T> {
        Observable::new(move |observer| {
            let mut adapter = TakeAdapter {
                inner: observer,
                remaining: n,
            };
            source.subscribe(&mut adapter);
        })
    }
    
    // --- Collect all emitted values ---
    fn collect<T: Clone + 'static>(source: Observable<T>) -> Vec<T> {
        let results = Rc::new(RefCell::new(Vec::new()));
        let results2 = Rc::clone(&results);
        let mut observer = FnObserver::new(move |v: T| {
            results2.borrow_mut().push(v);
        });
        source.subscribe(&mut observer);
        let x = results.borrow().clone();
        x
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_from_iter() {
            let obs = Observable::from_iter(vec![1, 2, 3]);
            assert_eq!(collect(obs), vec![1, 2, 3]);
        }
    
        #[test]
        fn test_map() {
            let obs = Observable::from_iter(vec![1, 2, 3]);
            let mapped = obs_map(obs, |x: i32| x * 2);
            assert_eq!(collect(mapped), vec![2, 4, 6]);
        }
    
        #[test]
        fn test_filter() {
            let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
            let filtered = obs_filter(obs, |x| x % 2 == 0);
            assert_eq!(collect(filtered), vec![2, 4]);
        }
    
        #[test]
        fn test_take() {
            let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
            let taken = obs_take(obs, 3);
            assert_eq!(collect(taken), vec![1, 2, 3]);
        }
    
        #[test]
        fn test_chain() {
            let source = Observable::from_iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
            let filtered = obs_filter(source, |x| x % 2 == 0);
            let mapped = obs_map(filtered, |x: i32| x * x);
            let taken = obs_take(mapped, 3);
            assert_eq!(collect(taken), vec![4, 16, 36]);
        }
    
        #[test]
        fn test_empty_observable() {
            let obs: Observable<i32> = Observable::from_iter(vec![]);
            assert_eq!(collect(obs), vec![]);
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_from_iter() {
            let obs = Observable::from_iter(vec![1, 2, 3]);
            assert_eq!(collect(obs), vec![1, 2, 3]);
        }
    
        #[test]
        fn test_map() {
            let obs = Observable::from_iter(vec![1, 2, 3]);
            let mapped = obs_map(obs, |x: i32| x * 2);
            assert_eq!(collect(mapped), vec![2, 4, 6]);
        }
    
        #[test]
        fn test_filter() {
            let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
            let filtered = obs_filter(obs, |x| x % 2 == 0);
            assert_eq!(collect(filtered), vec![2, 4]);
        }
    
        #[test]
        fn test_take() {
            let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
            let taken = obs_take(obs, 3);
            assert_eq!(collect(taken), vec![1, 2, 3]);
        }
    
        #[test]
        fn test_chain() {
            let source = Observable::from_iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
            let filtered = obs_filter(source, |x| x % 2 == 0);
            let mapped = obs_map(filtered, |x: i32| x * x);
            let taken = obs_take(mapped, 3);
            assert_eq!(collect(taken), vec![4, 16, 36]);
        }
    
        #[test]
        fn test_empty_observable() {
            let obs: Observable<i32> = Observable::from_iter(vec![]);
            assert_eq!(collect(obs), vec![]);
        }
    }

    Deep Comparison

    Reactive Stream — Comparison

    Core Insight

    Reactive streams are push-based lazy sequences: a producer pushes values to a subscriber. Unlike iterators (pull-based), the producer controls timing. Operators like map/filter wrap one observable in another — forming a lazy chain that only runs when subscribed.

    OCaml Approach

  • observable = { subscribe: observer -> unsubscribe } as a record
  • observer = { on_next; on_error; on_complete } callbacks
  • map/filter/take create new observables wrapping the old
  • Subject (hot observable): broadcasts to multiple subscribers
  • • Closely mirrors RxJS architecture
  • Rust Approach

  • Observable<T> wraps Box<dyn Fn(&mut dyn Observer<T>)>
  • Observer<T> trait with on_next, on_error, on_complete
  • FnObserver adapter for closure-based observers
  • Rc<RefCell<_>> for shared state within single-threaded observable
  • obs_map, obs_filter, obs_take as free functions returning new Observable
  • • For production: futures::Stream or the rxrust crate
  • Comparison Table

    ConceptOCamlRust
    Observable type{ subscribe: observer -> unsub }struct Observable<T> { subscribe_fn }
    Observer type{ on_next; on_error; on_complete }trait Observer<T>
    Map operatormap f obs → new observableobs_map(source, f)Observable<U>
    Filter operatorfilter pred obsobs_filter(source, pred)
    Take operatortake n obsobs_take(source, n)
    Hot observableSubject typeManual with Arc<Mutex<Vec<...>>>
    ProductionLwt_react, rxocamlfutures::Stream, rxrust

    std vs tokio

    Aspectstd versiontokio version
    RuntimeOS threads via std::threadAsync tasks on tokio runtime
    Synchronizationstd::sync::Mutex, Condvartokio::sync::Mutex, channels
    Channelsstd::sync::mpsc (unbounded)tokio::sync::mpsc (bounded, async)
    BlockingThread blocks on lock/recvTask yields, runtime switches tasks
    OverheadOne OS thread per taskMany tasks per thread (M:N)
    Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers

    Exercises

  • Implement a take(n: usize) operator that stops the stream after n values.
  • Implement a merge operator that subscribes to two observables and pushes values from both to a single subscriber.
  • Add error propagation: modify map to catch panics from the mapping function and forward them to on_error.
  • Implement a Subject<T> — a combined observable and observer — that fans out emissions to multiple subscribers.
  • In OCaml, implement flat_map : ('a -> 'b observable) -> 'a observable -> 'b observable for observable chaining.
  • Open Source Repos