329 Advanced

329: Async Streams

Functional Programming

Tutorial

The Problem

Database cursor results, paginated API responses, file lines, and network byte streams all produce data incrementally. Loading everything into memory before processing is impractical for large datasets. The Stream trait (async equivalent of Iterator) yields values one at a time, allowing the consumer to process each before the next is produced. Async streams are the foundation of streaming ETL, real-time data processing, and lazy I/O pipelines.
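The incremental consumption described above can be sketched with a synchronous iterator before any async machinery is involved. In this minimal sketch, `fetch_page`, `paged_items`, `PAGE_SIZE`, and `TOTAL_ITEMS` are hypothetical names that simulate a paginated API; the counter only exists to show how many pages were actually requested.

```rust
use std::cell::Cell;

// Hypothetical parameters for the simulated paginated source.
const PAGE_SIZE: u32 = 3;
const TOTAL_ITEMS: u32 = 10;

/// Hypothetical stand-in for one API round trip: returns the ids on `page`,
/// or an empty page once the data is exhausted.
fn fetch_page(page: u32, fetches: &Cell<u32>) -> Vec<u32> {
    fetches.set(fetches.get() + 1); // count every simulated request
    let start = page * PAGE_SIZE;
    let end = (start + PAGE_SIZE).min(TOTAL_ITEMS);
    (start..end).collect()
}

/// Lazily yields items page by page. A page is requested only after the
/// previous one has been fully consumed.
fn paged_items(fetches: &Cell<u32>) -> impl Iterator<Item = u32> + '_ {
    let mut page = 0;
    std::iter::from_fn(move || {
        let items = fetch_page(page, fetches);
        page += 1;
        if items.is_empty() { None } else { Some(items) }
    })
    .flatten()
}

fn main() {
    let fetches = Cell::new(0);
    // The consumer stops after four items, so later pages are never requested.
    let first_four: Vec<u32> = paged_items(&fetches).take(4).collect();
    println!("{:?} after {} page requests", first_four, fetches.get());
}
```

Because the consumer pulls items one at a time, only the pages needed for the first four items are requested; the rest of the data set is never materialized.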

🎯 Learning Outcomes

  • Understand Stream as the async equivalent of Iterator: lazy, sequential, potentially infinite
  • Implement a RangeStream using Rust's Iterator trait as a synchronous analogy
  • Use map(), filter(), and collect() on synchronous streams as preparation for async streams
  • Recognize that tokio_stream and futures::Stream provide the full async streaming API

    Code Example

    struct RangeStream { current: i64, end: i64 }
    
    impl Iterator for RangeStream {
        type Item = i64;
        fn next(&mut self) -> Option<i64> {
            if self.current >= self.end { None }
            else { let v = self.current; self.current += 1; Some(v) }
        }
    }

    Key Differences

  • Async streams: Rust's futures::Stream (re-exported as tokio_stream::Stream) is the async counterpart of Iterator, driven by poll_next() instead of next(); OCaml's Lwt_stream is the equivalent.
  • Backpressure: Async streams naturally implement backpressure — the producer only generates the next item when the consumer polls for it.
  • Real-world use: tokio_stream::wrappers::ReceiverStream wraps a channel as a stream; tokio_stream::StreamExt::timeout_repeating applies a per-item timeout, yielding an error each time the interval elapses without a new item.
  • Generator syntax: The async-stream crate provides stream! { yield value; } syntax for ergonomic async stream creation.
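The poll_next() and backpressure points above can be sketched with the standard library alone. This is only an illustration under stated assumptions: the `Stream` trait below is a local stand-in with the same method shape as `futures::Stream`, and `SlowRange`, `NoopWake`, and `take_blocking` are hypothetical names. A real program would use an executor such as Tokio instead of the busy-polling driver shown here.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in mirroring the shape of `futures::Stream`
/// (assumption: defined here so the sketch needs no external crates).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Counts from `current` up to `end`, reporting `Pending` on every other poll
/// to imitate a source whose next item is not ready yet.
struct SlowRange {
    current: i64,
    end: i64,
    ready: bool,
    produced: usize, // how many items the producer has actually generated
}

impl Stream for SlowRange {
    type Item = i64;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<i64>> {
        if self.current >= self.end {
            return Poll::Ready(None);
        }
        if !self.ready {
            self.ready = true;
            cx.waker().wake_by_ref(); // request another poll
            return Poll::Pending;
        }
        self.ready = false;
        let value = self.current;
        self.current += 1;
        self.produced += 1;
        Poll::Ready(Some(value))
    }
}

/// Waker that does nothing; sufficient for a driver that polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal driver: polls until `limit` items arrive or the stream ends.
fn take_blocking<S: Stream + Unpin>(stream: &mut S, limit: usize) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while out.len() < limit {
        match Pin::new(&mut *stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => out.push(item),
            Poll::Ready(None) => break,
            Poll::Pending => continue, // a real executor would park until woken
        }
    }
    out
}

fn main() {
    let mut stream = SlowRange { current: 0, end: 1_000, ready: false, produced: 0 };
    let first = take_blocking(&mut stream, 3);
    // Only three items were generated even though the range holds 1,000.
    println!("{:?}, produced = {}", first, stream.produced);
}
```

The producer advances only when it is polled, which is the pull-based backpressure the list describes: once the consumer stops polling, no further items are generated.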

    OCaml Approach

    OCaml's Seq module is the synchronous lazy stream type, directly analogous to this iterator-based stream:

    let range start end_ =
      Seq.unfold (fun n -> if n >= end_ then None else Some (n, n+1)) start
    
    (* Lazy processing: *)
    let sum = Seq.fold_left (+) 0
      (Seq.take 100 (Seq.filter_map
        (fun x -> if x mod 2 = 0 then Some (x*x) else None)
        (range 0 1_000_000)))
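For comparison, the same pipeline can be written with Rust's standard iterators. This is a rough transcription rather than the tutorial's own code: `range` and `sum_of_first_even_squares` are hypothetical names, and `std::iter::from_fn` plays the role of `Seq.unfold`.

```rust
/// Mirrors the OCaml `range`: yields `start`, `start + 1`, ... and stops before `end`.
fn range(start: i64, end: i64) -> impl Iterator<Item = i64> {
    let mut n = start;
    std::iter::from_fn(move || {
        if n >= end {
            None
        } else {
            let value = n;
            n += 1;
            Some(value)
        }
    })
}

/// Squares the even numbers and sums the first `count` of those squares.
/// Nothing past the last needed element is ever generated.
fn sum_of_first_even_squares(count: usize) -> i64 {
    range(0, 1_000_000)
        .filter_map(|x| if x % 2 == 0 { Some(x * x) } else { None })
        .take(count)
        .sum()
}

fn main() {
    println!("{}", sum_of_first_even_squares(100)); // prints 1313400
}
```

As in the OCaml version, the pipeline is lazy: `take(100)` stops pulling after the hundredth even square, so the range is never walked to its end.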
    

    Full Source

    #![allow(clippy::all)]
    //! # Async Streams
    //!
    //! An iterator that yields values one by one — with the ability to pause
    //! and resume between each item. Foundation for streaming APIs and pipelines.
    
    /// A lazy range stream that generates values on demand.
    pub struct RangeStream {
        current: i64,
        end: i64,
    }
    
    impl RangeStream {
        pub fn new(start: i64, end: i64) -> Self {
            Self {
                current: start,
                end,
            }
        }
    }
    
    impl Iterator for RangeStream {
        type Item = i64;
    
        fn next(&mut self) -> Option<Self::Item> {
            if self.current >= self.end {
                None
            } else {
                let value = self.current;
                self.current += 1;
                Some(value)
            }
        }
    }
    
    /// A stateful stream that yields data in fixed-size chunks.
    pub enum ChunkedStream<T> {
        Active {
            data: Vec<T>,
            position: usize,
            chunk_size: usize,
        },
        Done,
    }
    
    impl<T: Clone> ChunkedStream<T> {
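        /// Creates a stream over `data` that yields chunks of `chunk_size` items.
        ///
        /// # Panics
        /// Panics if `chunk_size` is zero, because `next_chunk` would then keep
        /// returning empty chunks for non-empty data without ever advancing.
        pub fn new(data: Vec<T>, chunk_size: usize) -> Self {
            assert!(chunk_size > 0, "chunk_size must be greater than zero");
            Self::Active {
                data,
                position: 0,
                chunk_size,
            }
        }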
    
        pub fn next_chunk(&mut self) -> Option<Vec<T>> {
            match self {
                Self::Done => None,
                Self::Active {
                    data,
                    position,
                    chunk_size,
                } => {
                    if *position >= data.len() {
                        *self = Self::Done;
                        return None;
                    }
                    let end = (*position + *chunk_size).min(data.len());
                    let chunk = data[*position..end].to_vec();
                    *position = end;
                    Some(chunk)
                }
            }
        }
    }
    
    /// A stream that applies a transformation to each element.
    pub struct MapStream<I, F> {
        inner: I,
        f: F,
    }
    
    impl<I, F, T, U> MapStream<I, F>
    where
        I: Iterator<Item = T>,
        F: FnMut(T) -> U,
    {
        pub fn new(inner: I, f: F) -> Self {
            Self { inner, f }
        }
    }
    
    impl<I, F, T, U> Iterator for MapStream<I, F>
    where
        I: Iterator<Item = T>,
        F: FnMut(T) -> U,
    {
        type Item = U;
    
        fn next(&mut self) -> Option<Self::Item> {
            self.inner.next().map(&mut self.f)
        }
    }
    
    /// A stream that filters elements based on a predicate.
    pub struct FilterStream<I, P> {
        inner: I,
        predicate: P,
    }
    
    impl<I, P, T> FilterStream<I, P>
    where
        I: Iterator<Item = T>,
        P: FnMut(&T) -> bool,
    {
        pub fn new(inner: I, predicate: P) -> Self {
            Self { inner, predicate }
        }
    }
    
    impl<I, P, T> Iterator for FilterStream<I, P>
    where
        I: Iterator<Item = T>,
        P: FnMut(&T) -> bool,
    {
        type Item = T;
    
        fn next(&mut self) -> Option<Self::Item> {
            loop {
                match self.inner.next() {
                    None => return None,
                    Some(item) => {
                        if (self.predicate)(&item) {
                            return Some(item);
                        }
                    }
                }
            }
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_range_stream_basic() {
            let stream = RangeStream::new(0, 5);
            let values: Vec<_> = stream.collect();
            assert_eq!(values, vec![0, 1, 2, 3, 4]);
        }
    
        #[test]
        fn test_range_stream_empty() {
            let stream = RangeStream::new(5, 5);
            let values: Vec<_> = stream.collect();
            assert!(values.is_empty());
        }
    
        #[test]
        fn test_range_stream_with_filter_map() {
            let result: Vec<i64> = RangeStream::new(0, 10)
                .filter(|x| x % 2 == 0)
                .map(|x| x * x)
                .collect();
            assert_eq!(result, vec![0, 4, 16, 36, 64]);
        }
    
        #[test]
        fn test_chunked_stream_exact() {
            let mut stream = ChunkedStream::new(vec![1, 2, 3, 4, 5, 6], 2);
            assert_eq!(stream.next_chunk(), Some(vec![1, 2]));
            assert_eq!(stream.next_chunk(), Some(vec![3, 4]));
            assert_eq!(stream.next_chunk(), Some(vec![5, 6]));
            assert_eq!(stream.next_chunk(), None);
        }
    
        #[test]
        fn test_chunked_stream_partial() {
            let mut stream = ChunkedStream::new(vec![1, 2, 3, 4, 5], 2);
            let mut all = Vec::new();
            while let Some(chunk) = stream.next_chunk() {
                all.extend(chunk);
            }
            assert_eq!(all, vec![1, 2, 3, 4, 5]);
        }
    
        #[test]
        fn test_map_stream() {
            let stream = MapStream::new(vec![1, 2, 3].into_iter(), |x| x * 2);
            let values: Vec<_> = stream.collect();
            assert_eq!(values, vec![2, 4, 6]);
        }
    
        #[test]
        fn test_filter_stream() {
            let stream = FilterStream::new(vec![1, 2, 3, 4, 5].into_iter(), |x| x % 2 == 1);
            let values: Vec<_> = stream.collect();
            assert_eq!(values, vec![1, 3, 5]);
        }
    }
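ChunkedStream in the source above exposes next_chunk() rather than implementing Iterator, so it cannot be combined with adapters directly. One possible variation is sketched below, under the assumption that a plain struct is acceptable in place of the enum state machine; `Chunks` is a hypothetical name, not part of the tutorial's source.

```rust
/// Compact chunked stream written as an `Iterator`
/// (assumption: a plain struct instead of the Active/Done enum).
struct Chunks<T> {
    data: Vec<T>,
    position: usize,
    chunk_size: usize,
}

impl<T: Clone> Iterator for Chunks<T> {
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Vec<T>> {
        // A zero chunk size could never make progress, so treat it as exhausted.
        if self.chunk_size == 0 || self.position >= self.data.len() {
            return None;
        }
        let end = (self.position + self.chunk_size).min(self.data.len());
        let chunk = self.data[self.position..end].to_vec();
        self.position = end;
        Some(chunk)
    }
}

fn main() {
    let chunks = Chunks { data: vec![1, 2, 3, 4, 5], position: 0, chunk_size: 2 };
    // Because `Chunks` is an Iterator, the standard adapters apply directly.
    let sums: Vec<i32> = chunks.map(|c| c.iter().sum::<i32>()).collect();
    println!("{:?}", sums); // prints [3, 7, 5]
}
```

Implementing Iterator trades the explicit Done state for composability: map(), filter(), and for loops work without a manual while-let loop.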

    Deep Comparison

    OCaml vs Rust: Async Streams

    Lazy Range

    OCaml:

    type 'a stream = Empty | Cons of 'a * (unit -> 'a stream)
    
    let range_stream start stop =
      let rec loop i () = if i>=stop then Empty else Cons(i, loop (i+1))
      in loop start ()
    

    Rust:

    struct RangeStream { current: i64, end: i64 }
    
    impl Iterator for RangeStream {
        type Item = i64;
        fn next(&mut self) -> Option<i64> {
            if self.current >= self.end { None }
            else { let v = self.current; self.current += 1; Some(v) }
        }
    }
    

    Key Differences

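    | Aspect      | OCaml             | Rust                       |
    |-------------|-------------------|----------------------------|
    | Stream type | ADT with thunks   | Iterator trait             |
    | Laziness    | Explicit closures | Implicit in next()         |
    | State       | In closures       | In struct fields           |
    | Combinators | Manual recursion  | Built-in .filter(), .map() |
    | Memory      | GC handles thunks | No allocation (stack)      |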
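To make the State and Memory rows concrete, the OCaml thunk-based ADT can be transcribed into Rust and set beside the iterator version. This is an illustrative sketch only: `LazyList`, `range_list`, and `to_vec` are hypothetical names, and boxing each tail closure is one of several possible encodings.

```rust
use std::mem::size_of;

/// Rust transcription of the OCaml `'a stream` ADT (hypothetical name).
/// Each tail is a boxed closure, so every step costs a heap allocation.
enum LazyList {
    Empty,
    Cons(i64, Box<dyn FnOnce() -> LazyList>),
}

/// Builds the list lazily: the tail closure runs only when it is forced.
fn range_list(start: i64, stop: i64) -> LazyList {
    if start >= stop {
        LazyList::Empty
    } else {
        LazyList::Cons(start, Box::new(move || range_list(start + 1, stop)))
    }
}

/// Forces the list one cell at a time, the manual traversal that
/// iterator combinators make unnecessary.
fn to_vec(mut list: LazyList) -> Vec<i64> {
    let mut out = Vec::new();
    loop {
        match list {
            LazyList::Empty => return out,
            LazyList::Cons(value, tail) => {
                out.push(value);
                list = tail(); // force the next cell
            }
        }
    }
}

/// Iterator-based counterpart: all state lives in two struct fields.
struct RangeStream {
    current: i64,
    end: i64,
}

impl Iterator for RangeStream {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        if self.current >= self.end {
            None
        } else {
            let value = self.current;
            self.current += 1;
            Some(value)
        }
    }
}

fn main() {
    let from_thunks = to_vec(range_list(0, 5));
    let from_iter: Vec<i64> = RangeStream { current: 0, end: 5 }.collect();
    assert_eq!(from_thunks, from_iter);
    // Two i64 fields and nothing else: the iterator state is 16 bytes.
    println!("RangeStream occupies {} bytes", size_of::<RangeStream>());
}
```

Both versions produce the same sequence, but the thunk encoding allocates a boxed closure per element, while the iterator keeps its entire state in the struct itself.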

    Exercises

  • Implement a FibonacciStream that yields Fibonacci numbers indefinitely.
  • Build a pipeline using stream adapters: take a RangeStream, filter even numbers, square them, and collect only the first 10 results.
  • Simulate an async data source by implementing a stream that yields values with configurable delays between items.