330 · Advanced

330: Async Sink — Buffered Writing

Functional Programming


Tutorial

The Problem

Writing individual items to disk, network, or databases one at a time is inefficient. Batching writes — accumulating items in a buffer and flushing when the buffer is full or a flush is explicitly requested — is the standard optimization. The Sink trait (in the futures crate) is the write-side complement to Stream: it accepts items and provides backpressure when the buffer is full. Understanding the buffering and flushing lifecycle is essential for high-throughput I/O.

🎯 Learning Outcomes

  • Understand a Sink as a destination that accepts items and controls backpressure
  • Implement a BatchSink that buffers items and flushes in configurable-size batches
  • Recognize the lifecycle: send() (add item), auto-flush when full, flush() for explicit drain
  • Apply batching to reduce I/O overhead in database writes and network sends

Code Example

    struct BatchSink<T> {
        buffer: VecDeque<T>,
        capacity: usize,
        flushed_batches: Vec<Vec<T>>,
    }
    
    impl<T: Clone> BatchSink<T> {
        fn send(&mut self, item: T) -> Result<(), String> {
            self.buffer.push_back(item);
            if self.buffer.len() >= self.capacity { self.flush()?; }
            Ok(())
        }
    }

    Key Differences

  • Futures Sink trait: futures::Sink provides poll_ready, start_send, poll_flush, poll_close — a four-phase protocol for async backpressure.
  • Backpressure: A sync BatchSink blocks the producer inline; an async sink returns Poll::Pending from poll_ready to signal "not ready" without blocking.
  • Production use: Kafka producers, Elasticsearch bulk indexers, and PostgreSQL batch inserters all use this buffering pattern.
  • Flush on drop: Sinks should flush remaining items when dropped — implement Drop to flush the residual buffer.
OCaml Approach

    OCaml's Buffer module provides in-memory buffering, and Lwt_io.flush drains IO buffers. For custom batch logic, a mutable Queue.t serves as the accumulator:

    let batch_sink capacity =
      let buffer = Queue.create () in
      let flush () = (* send batch *) Queue.clear buffer in
      let send item =
        Queue.add item buffer;
        if Queue.length buffer >= capacity then flush ()
      in (send, flush)
    

    Full Source

    #![allow(clippy::all)]
    //! # Async Sink
    //!
    //! A destination that accepts values and flushes them in batches —
    //! the write side of a stream for efficient I/O.
    
    use std::collections::VecDeque;
    
    /// A sink that buffers items and flushes them in batches.
    pub struct BatchSink<T> {
        buffer: VecDeque<T>,
        capacity: usize,
        flushed_batches: Vec<Vec<T>>,
    }
    
    impl<T: Clone> BatchSink<T> {
        pub fn new(capacity: usize) -> Self {
            Self {
                buffer: VecDeque::new(),
                capacity,
                flushed_batches: Vec::new(),
            }
        }
    
        /// Send an item to the sink. Auto-flushes when buffer reaches capacity.
        pub fn send(&mut self, item: T) -> Result<(), String> {
            self.buffer.push_back(item);
            if self.buffer.len() >= self.capacity {
                self.flush()?;
            }
            Ok(())
        }
    
        /// Manually flush the buffer.
        pub fn flush(&mut self) -> Result<(), String> {
            if !self.buffer.is_empty() {
                let batch: Vec<T> = self.buffer.drain(..).collect();
                self.flushed_batches.push(batch);
            }
            Ok(())
        }
    
        /// Get the number of items currently buffered.
        pub fn buffered_count(&self) -> usize {
            self.buffer.len()
        }
    
        /// Get the number of batches that have been flushed.
        pub fn flushed_count(&self) -> usize {
            self.flushed_batches.len()
        }
    
        /// Consume the sink and return all flushed batches.
        pub fn into_batches(self) -> Vec<Vec<T>> {
            self.flushed_batches
        }
    }
    
    /// A sink with a custom flush function.
    pub struct CallbackSink<T, F>
    where
        F: FnMut(Vec<T>) -> Result<(), String>,
    {
        buffer: VecDeque<T>,
        capacity: usize,
        on_flush: F,
        flush_count: usize,
    }
    
    impl<T, F> CallbackSink<T, F>
    where
        F: FnMut(Vec<T>) -> Result<(), String>,
    {
        pub fn new(capacity: usize, on_flush: F) -> Self {
            Self {
                buffer: VecDeque::new(),
                capacity,
                on_flush,
                flush_count: 0,
            }
        }
    
        pub fn send(&mut self, item: T) -> Result<(), String> {
            self.buffer.push_back(item);
            if self.buffer.len() >= self.capacity {
                self.flush()?;
            }
            Ok(())
        }
    
        pub fn flush(&mut self) -> Result<(), String> {
            if !self.buffer.is_empty() {
                let batch: Vec<T> = self.buffer.drain(..).collect();
                (self.on_flush)(batch)?;
                self.flush_count += 1;
            }
            Ok(())
        }
    
        pub fn flush_count(&self) -> usize {
            self.flush_count
        }
    }
    
    /// A counting sink that tracks statistics.
    pub struct StatsSink {
        total_items: usize,
        total_batches: usize,
        max_batch_size: usize,
    }
    
    impl StatsSink {
        pub fn new() -> Self {
            Self {
                total_items: 0,
                total_batches: 0,
                max_batch_size: 0,
            }
        }
    
        pub fn record_batch(&mut self, size: usize) {
            self.total_items += size;
            self.total_batches += 1;
            self.max_batch_size = self.max_batch_size.max(size);
        }
    
        pub fn stats(&self) -> (usize, usize, usize) {
            (self.total_items, self.total_batches, self.max_batch_size)
        }
    }
    
    impl Default for StatsSink {
        fn default() -> Self {
            Self::new()
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_batch_sink_auto_flush() {
            let mut sink: BatchSink<i32> = BatchSink::new(3);
            for i in 1..=9 {
                sink.send(i).unwrap();
            }
            let batches = sink.into_batches();
            assert_eq!(batches.len(), 3);
            assert_eq!(batches[0], vec![1, 2, 3]);
            assert_eq!(batches[1], vec![4, 5, 6]);
            assert_eq!(batches[2], vec![7, 8, 9]);
        }
    
        #[test]
        fn test_batch_sink_manual_flush() {
            let mut sink: BatchSink<i32> = BatchSink::new(5);
            for i in 1..=3 {
                sink.send(i).unwrap();
            }
            assert_eq!(sink.buffered_count(), 3);
            sink.flush().unwrap();
            assert_eq!(sink.buffered_count(), 0);
            let batches = sink.into_batches();
            assert_eq!(batches[0], vec![1, 2, 3]);
        }
    
        #[test]
        fn test_batch_sink_partial_batch() {
            let mut sink: BatchSink<i32> = BatchSink::new(3);
            for i in 1..=5 {
                sink.send(i).unwrap();
            }
            sink.flush().unwrap();
            let batches = sink.into_batches();
            assert_eq!(batches.len(), 2);
            assert_eq!(batches[0], vec![1, 2, 3]);
            assert_eq!(batches[1], vec![4, 5]);
        }
    
        #[test]
        fn test_callback_sink() {
            let mut collected = Vec::new();
            let mut sink = CallbackSink::new(2, |batch: Vec<i32>| {
                collected.extend(batch);
                Ok(())
            });
    
            for i in 1..=4 {
                sink.send(i).unwrap();
            }
    
            assert_eq!(sink.flush_count(), 2);
        }
    
        #[test]
        fn test_stats_sink() {
            let mut stats = StatsSink::new();
            stats.record_batch(3);
            stats.record_batch(5);
            stats.record_batch(2);
    
            let (items, batches, max) = stats.stats();
            assert_eq!(items, 10);
            assert_eq!(batches, 3);
            assert_eq!(max, 5);
        }
    }

    Deep Comparison

    OCaml vs Rust: Async Sink

    Sink Structure

    OCaml:

    type 'a sink = { mutable buf: 'a list; cap: int; flush_fn: 'a list -> unit }
    
    let send s x =
      s.buf <- x :: s.buf;
      if List.length s.buf >= s.cap then (s.flush_fn (List.rev s.buf); s.buf <- [])
    

    Rust:

    struct BatchSink<T> {
        buffer: VecDeque<T>,
        capacity: usize,
        flushed_batches: Vec<Vec<T>>,
    }
    
    impl<T: Clone> BatchSink<T> {
        fn send(&mut self, item: T) -> Result<(), String> {
            self.buffer.push_back(item);
            if self.buffer.len() >= self.capacity { self.flush()?; }
            Ok(())
        }
    }
    

    Key Differences

    Aspect           OCaml                      Rust
    Buffer type      List (prepend, reverse)    VecDeque (O(1) both ends)
    Flush callback   First-class function       Stored or called directly
    Error handling   Unit/exceptions            Result type
    Ownership        GC handles                 Explicit drain/move

    Exercises

  • Add Drop to BatchSink<T> that flushes any remaining buffered items when the sink is dropped.
  • Implement a LogSink that batches log messages and writes them to a writer in configurable-size chunks.
  • Add a high-watermark threshold: when the buffer reaches 80% capacity, start emitting backpressure signals.