769 Fundamental

769-streaming-parser-pattern — Streaming Parser Pattern

Functional Programming

Tutorial Video

Text description (accessibility)

This video demonstrates the "769-streaming-parser-pattern — Streaming Parser Pattern" functional Rust example. Difficulty level: Fundamental. Key concepts covered: Functional Programming. Network protocols deliver data in chunks, not as complete messages. Key difference from OCaml: Rust represents parser state with an explicit ParseState enum, while Angstrom keeps that state implicitly in its continuations.

Tutorial

The Problem

Network protocols deliver data in chunks, not as complete messages. A TCP stream may deliver the header in one packet and the body in several others. Streaming parsers process data incrementally, keeping state between feed() calls and yielding a complete message only once enough data has arrived. Most network servers rely on this pattern: HTTP, WebSocket, gRPC, and custom binary protocols are all commonly parsed this way. Without it, a server must buffer each entire message in memory before it can start parsing.
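To see why incremental parsing is needed, consider a naive, non-streaming parser for the same `length:body` framing used throughout this example. This is an illustrative sketch only: `parse_whole` is a hypothetical helper that assumes its input already holds one complete frame.

```rust
/// Hypothetical non-streaming parser for one "length:body" frame.
/// Returns the body only if `input` already contains the whole frame.
pub fn parse_whole(input: &[u8]) -> Option<&[u8]> {
    // Find the ':' that ends the decimal length header.
    let colon = input.iter().position(|&b| b == b':')?;
    let len: usize = std::str::from_utf8(&input[..colon]).ok()?.parse().ok()?;
    // The body spans `len` bytes after the colon; `get` yields None
    // if they have not all arrived yet.
    let start = colon + 1;
    let end = start.checked_add(len)?;
    input.get(start..end)
}
```

Split `5:hello` anywhere, for instance into `5:he` and `llo`, and `parse_whole` returns `None` for the first packet. It succeeds only after the caller has concatenated every packet, which is exactly the whole-message buffering a streaming parser avoids.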

🎯 Learning Outcomes

  • Implement a state machine parser with a ParseState enum: Ready, InHeader, InBody, Complete, Error
  • Write a feed(data: &[u8]) -> usize method that returns the number of bytes consumed
  • Handle partial messages: pause and resume when the buffer is exhausted mid-message
  • Detect framing errors and transition to an Error state
  • Test with fragmented input delivered in chunks of 1, 2, or 3 bytes at a time
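The outcomes above can be exercised together with a small harness. This is a hedged sketch, not the example's own test suite: `FrameParser` and `State` are a condensed, hypothetical re-statement of the `length:body` state machine (with `Header` standing in for `Ready`/`InHeader`), and `feed_in_chunks` is a hypothetical helper that slices input into fixed-size pieces the way a network might.

```rust
/// States for the "length:body" framing (condensed, hypothetical re-statement).
#[derive(Debug, PartialEq)]
pub enum State {
    Header,
    Body { remaining: usize },
    Complete,
    Error(String),
}

/// Minimal streaming parser used only to exercise fragmentation.
pub struct FrameParser {
    pub state: State,
    pub body: Vec<u8>,
    header: Vec<u8>,
}

impl FrameParser {
    pub fn new() -> Self {
        FrameParser { state: State::Header, body: Vec::new(), header: Vec::new() }
    }

    /// Consumes bytes until the chunk ends or a terminal state is reached.
    /// Returns the number of bytes consumed from `data`.
    pub fn feed(&mut self, data: &[u8]) -> usize {
        let mut consumed = 0;
        for &byte in data {
            match self.state {
                // Terminal states accept nothing more; the byte stays with the caller.
                State::Complete | State::Error(_) => break,
                State::Header if byte == b':' => {
                    let len = std::str::from_utf8(&self.header)
                        .ok()
                        .and_then(|s| s.parse::<usize>().ok());
                    self.state = match len {
                        Some(0) => State::Complete,
                        Some(n) => State::Body { remaining: n },
                        None => State::Error("invalid length".to_string()),
                    };
                }
                State::Header => self.header.push(byte),
                State::Body { remaining } => {
                    self.body.push(byte);
                    self.state = if remaining == 1 {
                        State::Complete
                    } else {
                        State::Body { remaining: remaining - 1 }
                    };
                }
            }
            consumed += 1;
        }
        consumed
    }
}

/// Feeds `input` in `chunk`-byte pieces (`chunk` must be non-zero), stopping
/// once the parser leaves bytes unconsumed. Returns total bytes consumed.
pub fn feed_in_chunks(parser: &mut FrameParser, input: &[u8], chunk: usize) -> usize {
    let mut total = 0;
    for piece in input.chunks(chunk) {
        let used = parser.feed(piece);
        total += used;
        if used < piece.len() {
            break;
        }
    }
    total
}
```

Because `feed` stops at a terminal state, feeding `5:helloEXTRA` in 1-, 2-, or 3-byte pieces always consumes exactly 7 bytes, leaving `EXTRA` for whoever parses the next message.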
Code Example

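    #[derive(Debug)]
    pub enum ParseState {
        Ready,
        InBody { remaining: usize },
        Complete,
        Error(String),
    }
    
    pub struct StreamingParser {
        state: ParseState,
        buffer: Vec<u8>, // length digits while Ready, body bytes while InBody
    }
    
    impl StreamingParser {
        /// Feeds one chunk and returns how many of its bytes were consumed.
        pub fn feed(&mut self, data: &[u8]) -> usize {
            let mut consumed = 0;
            for &byte in data {
                match self.state {
                    ParseState::Ready if byte == b':' => {
                        let len = std::str::from_utf8(&self.buffer).ok().and_then(|s| s.parse::<usize>().ok());
                        self.buffer.clear();
                        self.state = match len {
                            Some(0) => ParseState::Complete,
                            Some(n) => ParseState::InBody { remaining: n },
                            None => ParseState::Error("invalid length".into()),
                        };
                    }
                    ParseState::Ready => self.buffer.push(byte),
                    ParseState::InBody { remaining } => {
                        self.buffer.push(byte);
                        self.state = if remaining == 1 {
                            ParseState::Complete
                        } else {
                            ParseState::InBody { remaining: remaining - 1 }
                        };
                    }
                    // Terminal states: stop without consuming this byte.
                    ParseState::Complete | ParseState::Error(_) => break,
                }
                consumed += 1;
            }
            consumed
        }
    }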

Key Differences

  • State representation: Rust uses an explicit ParseState enum; Angstrom uses implicit continuation-based state via its parser type.
  • Buffer management: Rust's StreamingParser owns its buffers; Angstrom manages a circular buffer internally, minimizing copies.
  • Backpressure: Rust's feed() -> usize reports consumed bytes for flow control; Angstrom integrates with Lwt/Eio for automatic backpressure.
  • Combinators: Angstrom's combinator-based parsers (let* n = take_int 4 in ...) are more composable than Rust's explicit state machine, but the Rust version is more transparent.
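The consumed-byte count in the Backpressure point only helps if the caller acts on it. A minimal sketch, assuming a hypothetical `Inbox` type (not part of the example) that owns the unparsed bytes: it refuses input past a fixed limit, and after each `feed`-style call it drops exactly the bytes reported as consumed, keeping the rest for the next message.

```rust
/// Hypothetical caller-side buffer: holds bytes the parser has not consumed yet.
pub struct Inbox {
    pending: Vec<u8>,
    /// Maximum bytes held before the caller should stop reading (backpressure).
    limit: usize,
}

impl Inbox {
    pub fn new(limit: usize) -> Self {
        Inbox { pending: Vec::new(), limit }
    }

    /// Buffers a network chunk; returns false, storing nothing, if that would
    /// exceed `limit`. A false return is the signal to stop reading for now.
    pub fn push(&mut self, chunk: &[u8]) -> bool {
        if self.pending.len() + chunk.len() > self.limit {
            return false;
        }
        self.pending.extend_from_slice(chunk);
        true
    }

    /// Offers all pending bytes to `feed` (e.g. a closure around a parser's
    /// feed method) and discards exactly as many bytes as it reports consumed.
    /// Panics if `feed` reports more bytes than it was given.
    pub fn pump<F: FnMut(&[u8]) -> usize>(&mut self, mut feed: F) -> usize {
        let used = feed(&self.pending[..]);
        self.pending.drain(..used);
        used
    }

    pub fn pending(&self) -> &[u8] {
        &self.pending
    }
}
```

With a limit of 8 bytes, pushing `abcde` succeeds and a further `fghij` is refused; after a `pump` whose closure consumes 3 bytes, `de` remains pending and the same `fghij` now fits.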
OCaml Approach

OCaml's Angstrom library is designed for exactly this use case. Its Angstrom.Buffered interface parses incrementally: Buffered.parse starts a parser, and Buffered.feed supplies each further chunk or signals end of input. State is maintained implicitly in the parser combinator's continuation. Lwt-based servers can use Angstrom_lwt_unix.parse to drive a parser directly from an Lwt_io input channel, such as one attached to a network socket.
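To make "implicit continuation-based state" concrete in Rust, here is a rough analogy. It is not Angstrom's API and not part of the original example; `Step`, `Cont`, `read_header`, `read_body`, and `run` are hypothetical names. Rather than storing an explicit `ParseState`, each partial result returns a boxed `FnOnce` closure that captures the bytes read so far and knows what to parse next.

```rust
/// Outcome of feeding one chunk to a continuation-style parser.
pub enum Step {
    /// Finished: the body and any bytes left over after it.
    Done(Vec<u8>, Vec<u8>),
    /// Needs more input: the boxed closure *is* the saved parser state.
    Partial(Cont),
    /// The header was not a valid length.
    Fail(String),
}

/// "The rest of the parse", waiting for the next chunk.
pub type Cont = Box<dyn FnOnce(&[u8]) -> Step>;

/// Reads the decimal length up to ':'; `digits` holds what earlier chunks supplied.
pub fn read_header(mut digits: Vec<u8>) -> Cont {
    Box::new(move |input: &[u8]| match input.iter().position(|&b| b == b':') {
        None => {
            digits.extend_from_slice(input);
            Step::Partial(read_header(digits))
        }
        Some(i) => {
            digits.extend_from_slice(&input[..i]);
            let len = std::str::from_utf8(&digits).ok().and_then(|s| s.parse::<usize>().ok());
            match len {
                Some(len) => read_body(len, Vec::new())(&input[i + 1..]),
                None => Step::Fail("invalid length".to_string()),
            }
        }
    })
}

/// Collects `len` body bytes; `body` holds what earlier chunks supplied.
pub fn read_body(len: usize, mut body: Vec<u8>) -> Cont {
    Box::new(move |input: &[u8]| {
        let need = len - body.len(); // bytes still missing
        if input.len() >= need {
            body.extend_from_slice(&input[..need]);
            Step::Done(body, input[need..].to_vec())
        } else {
            body.extend_from_slice(input);
            Step::Partial(read_body(len, body))
        }
    })
}

/// Threads the continuation through a sequence of chunks.
/// Chunks after the one that finishes (or fails) the parse are ignored here.
pub fn run(chunks: &[&[u8]]) -> Step {
    let mut step = Step::Partial(read_header(Vec::new()));
    for &chunk in chunks {
        step = match step {
            Step::Partial(k) => k(chunk),
            finished => return finished,
        };
    }
    step
}
```

Feeding `5:he`, `l`, and `lo!` yields `Done` with body `hello` and leftover `!`. The trade-off mirrors the comparison above: the closures compose naturally, but the state is hidden inside them, whereas an enum can be inspected, logged, or matched on directly.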

Full Source

    #![allow(clippy::all)]
    //! # Streaming Parser Pattern
    //!
    //! Process data incrementally without loading everything into memory.
    
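    /// Parser state
    #[derive(Debug)]
    pub enum ParseState {
        /// Waiting for the first byte of a new message.
        Ready,
        /// Reading the decimal length that precedes ':'.
        InHeader,
        /// Reading the body; `remaining` bytes are still expected.
        InBody { remaining: usize },
        /// A whole message has been buffered.
        Complete,
        /// The framing was invalid; no further input is consumed.
        Error(String),
    }
    
    /// Streaming message parser
    pub struct StreamingParser {
        state: ParseState,
        header: Vec<u8>,
        body: Vec<u8>,
        body_length: usize,
    }
    
    impl StreamingParser {
        pub fn new() -> Self {
            StreamingParser {
                state: ParseState::Ready,
                header: Vec::new(),
                body: Vec::new(),
                body_length: 0,
            }
        }
    
        /// Feed data to the parser; returns the number of bytes consumed.
        ///
        /// Once the state is `Complete` or `Error`, no further bytes are consumed,
        /// so the return value tells the caller where unparsed input starts.
        pub fn feed(&mut self, data: &[u8]) -> usize {
            let mut consumed = 0;
    
            for &byte in data {
                match &self.state {
                    ParseState::Ready | ParseState::InHeader => {
                        if byte == b':' {
                            // Header format: "length:body"
                            let len_str = String::from_utf8_lossy(&self.header);
                            match len_str.parse::<usize>() {
                                // An empty body completes the message immediately
                                // (avoids underflowing `remaining` below).
                                Ok(0) => self.state = ParseState::Complete,
                                Ok(len) => {
                                    self.body_length = len;
                                    self.state = ParseState::InBody { remaining: len };
                                }
                                Err(_) => {
                                    self.state = ParseState::Error("Invalid length".to_string());
                                }
                            }
                            self.header.clear();
                        } else {
                            // Still reading the length digits.
                            self.header.push(byte);
                            self.state = ParseState::InHeader;
                        }
                    }
                    ParseState::InBody { remaining } => {
                        self.body.push(byte);
                        let new_remaining = remaining - 1;
                        if new_remaining == 0 {
                            self.state = ParseState::Complete;
                        } else {
                            self.state = ParseState::InBody {
                                remaining: new_remaining,
                            };
                        }
                    }
                    // Terminal states: stop without counting this byte as consumed.
                    ParseState::Complete | ParseState::Error(_) => break,
                }
                consumed += 1;
            }
    
            consumed
        }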
    
        pub fn is_complete(&self) -> bool {
            matches!(self.state, ParseState::Complete)
        }
    
        pub fn is_error(&self) -> bool {
            matches!(self.state, ParseState::Error(_))
        }
    
        pub fn take_body(&mut self) -> Vec<u8> {
            std::mem::take(&mut self.body)
        }
    
        pub fn reset(&mut self) {
            self.state = ParseState::Ready;
            self.header.clear();
            self.body.clear();
            self.body_length = 0;
        }
    }
    
    impl Default for StreamingParser {
        fn default() -> Self {
            Self::new()
        }
    }
    
    /// Line-based streaming reader
    pub struct LineReader {
        buffer: Vec<u8>,
        lines: Vec<String>,
    }
    
    impl LineReader {
        pub fn new() -> Self {
            LineReader {
                buffer: Vec::new(),
                lines: Vec::new(),
            }
        }
    
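        /// Feed data, extracting complete lines.
        ///
        /// Every `\r` byte is dropped (so CRLF line endings work), and a line
        /// that is not valid UTF-8 is silently discarded.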
        pub fn feed(&mut self, data: &[u8]) {
            for &byte in data {
                if byte == b'\n' {
                    if let Ok(line) = String::from_utf8(std::mem::take(&mut self.buffer)) {
                        self.lines.push(line);
                    }
                } else if byte != b'\r' {
                    self.buffer.push(byte);
                }
            }
        }
    
        /// Take all complete lines
        pub fn take_lines(&mut self) -> Vec<String> {
            std::mem::take(&mut self.lines)
        }
    
        /// Check for remaining buffered data
        pub fn has_partial(&self) -> bool {
            !self.buffer.is_empty()
        }
    }
    
    impl Default for LineReader {
        fn default() -> Self {
            Self::new()
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_streaming_parser_complete() {
            let mut parser = StreamingParser::new();
    
            parser.feed(b"5:hel");
            assert!(!parser.is_complete());
    
            parser.feed(b"lo");
            assert!(parser.is_complete());
            assert_eq!(parser.take_body(), b"hello");
        }
    
        #[test]
        fn test_streaming_parser_single_feed() {
            let mut parser = StreamingParser::new();
            parser.feed(b"3:abc");
            assert!(parser.is_complete());
            assert_eq!(parser.take_body(), b"abc");
        }
    
        #[test]
        fn test_streaming_parser_reset() {
            let mut parser = StreamingParser::new();
            parser.feed(b"2:ok");
            assert!(parser.is_complete());
    
            parser.reset();
            parser.feed(b"3:yes");
            assert!(parser.is_complete());
            assert_eq!(parser.take_body(), b"yes");
        }
    
        #[test]
        fn test_line_reader() {
            let mut reader = LineReader::new();
    
            reader.feed(b"hello\nwor");
            let lines = reader.take_lines();
            assert_eq!(lines, vec!["hello"]);
            assert!(reader.has_partial());
    
            reader.feed(b"ld\n");
            let lines = reader.take_lines();
            assert_eq!(lines, vec!["world"]);
        }
    
        #[test]
        fn test_line_reader_multiple() {
            let mut reader = LineReader::new();
            reader.feed(b"a\nb\nc\n");
            let lines = reader.take_lines();
            assert_eq!(lines, vec!["a", "b", "c"]);
        }
    }

Deep Comparison

OCaml vs Rust: Streaming Parser Pattern

State Machine Parser

Rust

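    #[derive(Debug)]
    pub enum ParseState {
        Ready,
        InBody { remaining: usize },
        Complete,
        Error(String),
    }
    
    pub struct StreamingParser {
        state: ParseState,
        buffer: Vec<u8>, // length digits while Ready, body bytes while InBody
    }
    
    impl StreamingParser {
        /// Feeds one chunk and returns how many of its bytes were consumed.
        pub fn feed(&mut self, data: &[u8]) -> usize {
            let mut consumed = 0;
            for &byte in data {
                match self.state {
                    ParseState::Ready if byte == b':' => {
                        let len = std::str::from_utf8(&self.buffer).ok().and_then(|s| s.parse::<usize>().ok());
                        self.buffer.clear();
                        self.state = match len {
                            Some(0) => ParseState::Complete,
                            Some(n) => ParseState::InBody { remaining: n },
                            None => ParseState::Error("invalid length".into()),
                        };
                    }
                    ParseState::Ready => self.buffer.push(byte),
                    ParseState::InBody { remaining } => {
                        self.buffer.push(byte);
                        self.state = if remaining == 1 {
                            ParseState::Complete
                        } else {
                            ParseState::InBody { remaining: remaining - 1 }
                        };
                    }
                    // Terminal states: stop without consuming this byte.
                    ParseState::Complete | ParseState::Error(_) => break,
                }
                consumed += 1;
            }
            consumed
        }
    }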
    

OCaml

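    type parse_state = Ready | In_body of { remaining : int } | Complete | Error of string
    
    type parser = {
      mutable state : parse_state;
      mutable buffer : bytes;  (* length digits while Ready, body bytes while In_body *)
    }
    
    (* Feed a chunk byte by byte; returns how many bytes were consumed. *)
    let feed p (data : bytes) =
      let push c = let b = Bytes.extend p.buffer 0 1 in Bytes.set b (Bytes.length p.buffer) c; p.buffer <- b in
      let rec go i =
        if i >= Bytes.length data then i
        else match p.state, Bytes.get data i with
          | (Complete | Error _), _ -> i  (* terminal: stop consuming *)
          | Ready, ':' ->
            p.state <- (match int_of_string_opt (Bytes.to_string p.buffer) with
              | Some 0 -> Complete
              | Some n when n > 0 -> In_body { remaining = n }
              | _ -> Error "invalid length");
            p.buffer <- Bytes.empty; go (i + 1)
          | Ready, c -> push c; go (i + 1)
          | In_body { remaining }, c ->
            push c;
            p.state <- (if remaining = 1 then Complete else In_body { remaining = remaining - 1 });
            go (i + 1)
      in go 0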
    

Key Differences

| Aspect | OCaml | Rust |
|---|---|---|
| Mutability | mutable fields | &mut self |
| State enum | Variants | Enum with data |
| Buffer growth | Bytes.extend | Vec::push |
| Take pattern | Manual clear | std::mem::take |
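The Take pattern row can be illustrated in a few lines. This minimal sketch uses hypothetical helper names and contrasts `std::mem::take`, which moves the existing allocation out and leaves an empty, unallocated `Vec` behind, with copying the bytes out and then clearing the buffer.

```rust
/// Take pattern: move the accumulated bytes out in O(1), leaving `Vec::new()` behind.
pub fn take_buffer(buf: &mut Vec<u8>) -> Vec<u8> {
    std::mem::take(buf)
}

/// Manual alternative: copy the bytes out, then clear the buffer.
/// Costs a copy, but the buffer keeps its capacity for the next message.
pub fn copy_then_clear(buf: &mut Vec<u8>) -> Vec<u8> {
    let out = buf.clone();
    buf.clear();
    out
}
```

StreamingParser::take_body and LineReader::take_lines use the first form, so each call hands the caller its data without copying, and the next message is accumulated in a fresh allocation.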

Exercises

  • Extend the parser to handle multiple messages in a single feed() call, accumulating completed messages in a Vec<(String, Vec<u8>)>.
  • Add a timeout mechanism: if the parser is in InBody but N consecutive feed() calls deliver no bytes, transition to Error("timeout") so the caller knows to reset() the parser.
  • Make take_body() also return the parser to the Ready state and clear its other accumulated buffers, as reset() does, so one parser can be reused for the next message without a separate reset() call.