769-streaming-parser-pattern — Streaming Parser Pattern
Tutorial Video
Text description (accessibility)
This video demonstrates the "769-streaming-parser-pattern — Streaming Parser Pattern" functional Rust example. Difficulty level: Fundamental. Key concepts covered: Functional Programming. Network protocols deliver data in chunks, not as complete messages. Key difference from OCaml: 1. **State representation**: Rust uses an explicit `ParseState` enum; Angstrom uses implicit continuation-based state via its parser type.
Tutorial
The Problem
Network protocols deliver data in chunks, not as complete messages. A TCP stream may deliver the header in one packet and the body in several others. Streaming parsers process data incrementally, maintaining state between `feed()` calls and yielding complete messages only when enough data has arrived. This pattern underlies most network servers: implementations of HTTP, WebSocket, gRPC, and custom binary protocols commonly use streaming parsers. Without one, you must buffer each entire message in memory before parsing it.
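To make "yielding complete messages only when enough data has arrived" concrete, here is a minimal sketch. It assumes the same `length:body` framing that `StreamingParser` parses, but it takes a buffer-and-scan approach (append each chunk, then extract every complete frame) rather than the byte-at-a-time state machine. `FrameBuffer`, `push_chunk`, and `frames_with_split` are hypothetical names introduced only for this illustration.

```rust
/// Hypothetical helper for illustration: buffers raw chunks and extracts
/// complete "<len>:<body>" frames once all of their bytes have arrived.
#[derive(Default)]
pub struct FrameBuffer {
    pending: Vec<u8>,
}

impl FrameBuffer {
    /// Appends `chunk` and returns the bodies of every frame now complete.
    pub fn push_chunk(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        self.pending.extend_from_slice(chunk);
        let mut frames = Vec::new();
        loop {
            // The length prefix ends at the first ':'; if none yet, wait for more data.
            let colon = match self.pending.iter().position(|&b| b == b':') {
                Some(i) => i,
                None => break,
            };
            // Simplification: a malformed prefix just stalls here; a real
            // parser would report an error instead.
            let len = match std::str::from_utf8(&self.pending[..colon])
                .ok()
                .and_then(|s| s.parse::<usize>().ok())
            {
                Some(n) => n,
                None => break,
            };
            let end = colon + 1 + len;
            if self.pending.len() < end {
                break; // body incomplete: keep the bytes until the next chunk
            }
            frames.push(self.pending[colon + 1..end].to_vec());
            self.pending.drain(..end); // discard the frame just emitted
        }
        frames
    }
}

/// Feeds `stream` as two chunks split at byte offset `split` and collects all frames.
pub fn frames_with_split(stream: &[u8], split: usize) -> Vec<Vec<u8>> {
    let mut buffer = FrameBuffer::default();
    let mut frames = buffer.push_chunk(&stream[..split]);
    frames.extend(buffer.push_chunk(&stream[split..]));
    frames
}
```

Splitting `b"5:hello3:abc"` at every possible offset and always getting back `hello` and `abc` is a cheap property to test for any streaming parser: chunk boundaries must never change the result.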
🎯 Learning Outcomes
- `ParseState` enum: `Ready`, `InHeader`, `InBody`, `Complete`, `Error`
- `feed(data: &[u8]) -> usize` method that returns the number of bytes consumed
- `Error` state
Code Example
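```rust
pub enum ParseState {
    Ready,
    InBody { remaining: usize },
    Complete,
    Error(String),
}

pub struct StreamingParser {
    state: ParseState,
    buffer: Vec<u8>, // length digits while Ready, body bytes while InBody
}

impl StreamingParser {
    /// Feeds one chunk and returns how many of its bytes were consumed.
    pub fn feed(&mut self, data: &[u8]) -> usize {
        let mut consumed = 0;
        for &byte in data {
            match &self.state {
                ParseState::Ready if byte == b':' => {
                    // End of the "<len>:" prefix: switch to reading the body.
                    let len = String::from_utf8_lossy(&self.buffer).parse::<usize>();
                    self.buffer.clear();
                    self.state = match len {
                        Ok(0) => ParseState::Complete,
                        Ok(n) => ParseState::InBody { remaining: n },
                        Err(_) => ParseState::Error("Invalid length".to_string()),
                    };
                }
                ParseState::Ready => self.buffer.push(byte),
                ParseState::InBody { remaining } => {
                    let left = remaining - 1;
                    self.buffer.push(byte);
                    self.state = if left == 0 { ParseState::Complete } else { ParseState::InBody { remaining: left } };
                }
                _ => break, // Complete or Error: leave the rest for the caller
            }
            consumed += 1;
        }
        consumed
    }
}
```
Key Differences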
1. **State representation**: Rust uses an explicit `ParseState` enum; Angstrom uses implicit continuation-based state via its parser type.
2. `StreamingParser` owns its buffers; Angstrom manages a circular buffer internally, minimizing copies.
3. `feed() -> usize` reports consumed bytes for flow control; Angstrom integrates with Lwt/Eio for automatic backpressure.
4. … (`let* n = take_int 4 in ...`) are more composable than Rust's explicit state machine, but the Rust version is more transparent.
OCaml Approach
OCaml's Angstrom library is designed for exactly this use case. Its `Angstrom.Buffered` interface supports streaming: `Angstrom.Buffered.parse` starts a parse, and `Angstrom.Buffered.feed` supplies each chunk as it arrives. State is maintained implicitly in the parser combinator's continuation. Lwt-based servers use Angstrom_lwt_unix.parse_reader to drive streaming parsers from network sockets directly.
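The Rust example has no socket driver of its own, but the same idea can be sketched with the standard `std::io::Read` trait, so any reader (a `TcpStream`, a file, or a byte slice in tests) can stand in for the socket. This is a minimal sketch under that assumption; `read_lines_streaming` is a hypothetical helper that applies the `LineReader` logic to fixed-size reads.

```rust
use std::io::{self, Read};

/// Hypothetical helper: reads `src` in chunks of at most `chunk_size` bytes,
/// the way a socket delivers data, and splits the stream into '\n'-terminated
/// lines. Returns the complete lines plus any trailing bytes with no '\n'.
pub fn read_lines_streaming<R: Read>(
    mut src: R,
    chunk_size: usize,
) -> io::Result<(Vec<String>, Vec<u8>)> {
    assert!(chunk_size > 0, "a zero-sized buffer would look like EOF");
    let mut chunk = vec![0u8; chunk_size];
    let mut partial = Vec::new(); // bytes of the line currently being assembled
    let mut lines = Vec::new();
    loop {
        let n = match src.read(&mut chunk) {
            Ok(0) => break, // end of stream
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, // retry
            Err(e) => return Err(e),
        };
        for &byte in &chunk[..n] {
            if byte == b'\n' {
                let line = std::mem::take(&mut partial);
                // Lossy decoding keeps the sketch short: invalid UTF-8 becomes U+FFFD.
                lines.push(String::from_utf8_lossy(&line).into_owned());
            } else {
                partial.push(byte);
            }
        }
    }
    Ok((lines, partial))
}
```

Returning the trailing partial line instead of dropping it leaves the caller to decide whether an unterminated line at end of stream is an error, which is the same question `LineReader::has_partial` answers.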
Full Source
```rust
#![allow(clippy::all)]
//! # Streaming Parser Pattern
//!
//! Process data incrementally without loading everything into memory.

/// Parser state
#[derive(Debug)]
pub enum ParseState {
    /// Nothing of the current message has been read yet
    Ready,
    /// Reading the decimal length prefix, which ends at ':'
    InHeader,
    /// Reading the body; `remaining` bytes are still expected
    InBody { remaining: usize },
    /// A whole message has been read
    Complete,
    /// The input was malformed
    Error(String),
}

/// Streaming parser for messages of the form "<length>:<body>"
pub struct StreamingParser {
    state: ParseState,
    header: Vec<u8>,
    body: Vec<u8>,
    body_length: usize,
}

impl StreamingParser {
    pub fn new() -> Self {
        StreamingParser {
            state: ParseState::Ready,
            header: Vec::new(),
            body: Vec::new(),
            body_length: 0,
        }
    }

    /// Feed data to the parser and return the number of bytes consumed.
    ///
    /// Parsing stops as soon as a message completes or an error occurs; any
    /// bytes of `data` beyond the returned count belong to the next message.
    pub fn feed(&mut self, data: &[u8]) -> usize {
        let mut consumed = 0;
        for &byte in data {
            // A finished or failed parser accepts no input until `reset()`,
            // so stop before counting the byte as consumed.
            if matches!(self.state, ParseState::Complete | ParseState::Error(_)) {
                break;
            }
            consumed += 1;
            match &self.state {
                ParseState::Ready | ParseState::InHeader => {
                    if byte == b':' {
                        // Header format: "length:body"
                        let parsed = String::from_utf8_lossy(&self.header).parse::<usize>();
                        self.header.clear();
                        self.state = match parsed {
                            Ok(len) => {
                                self.body_length = len;
                                // A zero-length body is complete at once; entering
                                // InBody with 0 remaining would underflow below.
                                if len == 0 {
                                    ParseState::Complete
                                } else {
                                    ParseState::InBody { remaining: len }
                                }
                            }
                            Err(_) => ParseState::Error("Invalid length".to_string()),
                        };
                    } else {
                        self.header.push(byte);
                        self.state = ParseState::InHeader;
                    }
                }
                ParseState::InBody { remaining } => {
                    self.body.push(byte);
                    let new_remaining = remaining - 1;
                    self.state = if new_remaining == 0 {
                        ParseState::Complete
                    } else {
                        ParseState::InBody {
                            remaining: new_remaining,
                        }
                    };
                }
                // Unreachable: filtered out by the check at the top of the loop.
                ParseState::Complete | ParseState::Error(_) => break,
            }
        }
        consumed
    }

    pub fn is_complete(&self) -> bool {
        matches!(self.state, ParseState::Complete)
    }

    pub fn is_error(&self) -> bool {
        matches!(self.state, ParseState::Error(_))
    }

    /// Move the parsed body out, leaving an empty buffer behind
    pub fn take_body(&mut self) -> Vec<u8> {
        std::mem::take(&mut self.body)
    }

    /// Return to `Ready` so the parser can read the next message
    pub fn reset(&mut self) {
        self.state = ParseState::Ready;
        self.header.clear();
        self.body.clear();
        self.body_length = 0;
    }
}

impl Default for StreamingParser {
    fn default() -> Self {
        Self::new()
    }
}

/// Line-based streaming reader
pub struct LineReader {
    buffer: Vec<u8>,
    lines: Vec<String>,
}

impl LineReader {
    pub fn new() -> Self {
        LineReader {
            buffer: Vec::new(),
            lines: Vec::new(),
        }
    }

    /// Feed data, extracting complete lines.
    ///
    /// Every '\r' byte is dropped (so CRLF endings work), and lines that are
    /// not valid UTF-8 are silently discarded.
    pub fn feed(&mut self, data: &[u8]) {
        for &byte in data {
            if byte == b'\n' {
                if let Ok(line) = String::from_utf8(std::mem::take(&mut self.buffer)) {
                    self.lines.push(line);
                }
            } else if byte != b'\r' {
                self.buffer.push(byte);
            }
        }
    }

    /// Take all complete lines
    pub fn take_lines(&mut self) -> Vec<String> {
        std::mem::take(&mut self.lines)
    }

    /// Check for remaining buffered data
    pub fn has_partial(&self) -> bool {
        !self.buffer.is_empty()
    }
}

impl Default for LineReader {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_streaming_parser_complete() {
        let mut parser = StreamingParser::new();
        parser.feed(b"5:hel");
        assert!(!parser.is_complete());
        parser.feed(b"lo");
        assert!(parser.is_complete());
        assert_eq!(parser.take_body(), b"hello");
    }

    #[test]
    fn test_streaming_parser_single_feed() {
        let mut parser = StreamingParser::new();
        parser.feed(b"3:abc");
        assert!(parser.is_complete());
        assert_eq!(parser.take_body(), b"abc");
    }

    #[test]
    fn test_streaming_parser_reset() {
        let mut parser = StreamingParser::new();
        parser.feed(b"2:ok");
        assert!(parser.is_complete());
        parser.reset();
        parser.feed(b"3:yes");
        assert!(parser.is_complete());
        assert_eq!(parser.take_body(), b"yes");
    }

    #[test]
    fn test_streaming_parser_zero_length() {
        let mut parser = StreamingParser::new();
        assert_eq!(parser.feed(b"0:"), 2);
        assert!(parser.is_complete());
        assert!(parser.take_body().is_empty());
    }

    #[test]
    fn test_feed_stops_after_complete() {
        let mut parser = StreamingParser::new();
        // Only "2:ok" belongs to the first message; "3:x" is left for the caller.
        assert_eq!(parser.feed(b"2:ok3:x"), 4);
        assert_eq!(parser.feed(b"more"), 0);
    }

    #[test]
    fn test_streaming_parser_invalid_length() {
        let mut parser = StreamingParser::new();
        parser.feed(b"ab:");
        assert!(parser.is_error());
    }

    #[test]
    fn test_line_reader() {
        let mut reader = LineReader::new();
        reader.feed(b"hello\nwor");
        let lines = reader.take_lines();
        assert_eq!(lines, vec!["hello"]);
        assert!(reader.has_partial());
        reader.feed(b"ld\n");
        let lines = reader.take_lines();
        assert_eq!(lines, vec!["world"]);
    }

    #[test]
    fn test_line_reader_multiple() {
        let mut reader = LineReader::new();
        reader.feed(b"a\nb\nc\n");
        let lines = reader.take_lines();
        assert_eq!(lines, vec!["a", "b", "c"]);
    }
}
```
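The `usize` returned by `feed()` is what lets a caller pull several back-to-back messages out of a single chunk: after each completed message, the unconsumed tail is fed to a fresh parser. Because this block has to stand alone, it re-creates a condensed, hypothetical `MiniParser` with the same `feed()` contract instead of reusing `StreamingParser`; `parse_all` is likewise an illustrative name, not part of the example's API.

```rust
/// Condensed, hypothetical stand-in with the same feed() contract as
/// `StreamingParser`: consume bytes until one "<len>:<body>" message is
/// complete (or the prefix is invalid), then stop.
enum State {
    Header,
    Body(usize), // body bytes still expected (always >= 1)
    Complete,
    Error,
}

struct MiniParser {
    state: State,
    header: Vec<u8>,
    body: Vec<u8>,
}

impl MiniParser {
    fn new() -> Self {
        MiniParser { state: State::Header, header: Vec::new(), body: Vec::new() }
    }

    /// Returns how many bytes of `data` were consumed.
    fn feed(&mut self, data: &[u8]) -> usize {
        let mut consumed = 0;
        for &byte in data {
            match self.state {
                State::Complete | State::Error => break,
                State::Header if byte == b':' => {
                    let len = std::str::from_utf8(&self.header)
                        .ok()
                        .and_then(|s| s.parse::<usize>().ok());
                    self.state = match len {
                        Some(0) => State::Complete,
                        Some(n) => State::Body(n),
                        None => State::Error,
                    };
                }
                State::Header => self.header.push(byte),
                State::Body(left) => {
                    self.body.push(byte);
                    self.state = if left == 1 { State::Complete } else { State::Body(left - 1) };
                }
            }
            consumed += 1;
        }
        consumed
    }
}

/// Splits a chunked stream into message bodies. After each completed message
/// the unconsumed tail of the current chunk is fed to a fresh parser.
pub fn parse_all(chunks: &[&[u8]]) -> Result<Vec<Vec<u8>>, String> {
    let mut parser = MiniParser::new();
    let mut messages = Vec::new();
    for &chunk in chunks {
        let mut rest = chunk;
        while !rest.is_empty() {
            let used = parser.feed(rest);
            rest = &rest[used..];
            match parser.state {
                State::Complete => {
                    messages.push(std::mem::take(&mut parser.body));
                    parser = MiniParser::new(); // ready for the next message
                }
                State::Error => return Err("invalid length prefix".to_string()),
                _ => {} // message incomplete: wait for the next chunk
            }
        }
    }
    Ok(messages)
}
```

With chunks `"5:hel"`, `"lo3:a"`, and `"bc0:2:ok"`, message boundaries fall mid-chunk and a length prefix is itself split across chunks, yet the loop recovers `hello`, `abc`, an empty body, and `ok`. The loop cannot spin forever: a parser that is neither complete nor failed always consumes at least one byte of non-empty input.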
Deep Comparison
OCaml vs Rust: Streaming Parser Pattern
State Machine Parser
Rust
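```rust
pub enum ParseState {
    Ready,
    InBody { remaining: usize },
    Complete,
    Error(String),
}

pub struct StreamingParser {
    state: ParseState,
    buffer: Vec<u8>,
}

impl StreamingParser {
    /// Returns how many bytes of `data` were consumed.
    pub fn feed(&mut self, data: &[u8]) -> usize {
        let mut consumed = 0;
        for &byte in data {
            match &self.state {
                ParseState::Ready => { /* accumulate the "<len>:" prefix in `buffer` */ }
                ParseState::InBody { remaining } => { /* push `byte`, count down `remaining` */ }
                _ => break, // Complete or Error: stop consuming
            }
            consumed += 1;
        }
        consumed
    }
}
```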
OCaml
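```ocaml
type parse_state =
  | Ready
  | In_body of { remaining : int }
  | Complete
  | Error of string

type parser = {
  mutable state : parse_state;
  mutable buffer : bytes;
}

(* Process byte by byte, mirroring the Rust feed(): returns bytes consumed *)
let feed parser data =
  let consumed = ref 0 in
  (try
     Bytes.iter
       (fun _byte ->
         match parser.state with
         | Complete | Error _ -> raise Exit (* stop consuming *)
         | Ready | In_body _ -> (* ... advance the state machine ... *) incr consumed)
       data
   with Exit -> ());
  !consumed
```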
Key Differences
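| Aspect | OCaml | Rust |
|---|---|---|
| Mutability | `mutable` record fields | `&mut self` methods |
| State enum | Variant type (constructors can carry data) | `enum` with data-carrying variants |
| Buffer growth | `Bytes.extend` (returns a new, larger copy) | `Vec::push` (amortized, in place) |
| Take pattern | Manual clear | `std::mem::take` |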
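The "Take pattern" row is easiest to see side by side. This small sketch uses two hypothetical helper functions: one moves the buffer out with `std::mem::take`, the other is a Rust copy-then-clear alternative for contrast.

```rust
/// Take pattern: move the buffer out without copying. `std::mem::take`
/// leaves `Vec::default()` (empty, no allocation) in its place.
pub fn take_body(body: &mut Vec<u8>) -> Vec<u8> {
    std::mem::take(body)
}

/// Copy-then-clear alternative: the caller gets a copy, and `clear()` keeps
/// the original allocation so the next message can reuse it.
pub fn copy_then_clear(body: &mut Vec<u8>) -> Vec<u8> {
    let out = body.clone();
    body.clear();
    out
}
```

The trade-off: `take` avoids copying the bytes but gives up the allocation, while `clear` keeps the capacity for reuse at the cost of a copy. Which one is cheaper plausibly depends on message size and message rate.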
Exercises
- … `feed()` call, accumulating completed messages in a `Vec<(String, Vec<u8>)>`.
- … `body_length > 0` but no bytes arrive for N calls, transition to `Error("timeout")` and reset the parser.
- … `reset()` method that returns the parser to the `Ready` state and clears all accumulated buffers, enabling reuse for the next message.
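One possible starting point for the timeout exercise is a small counter kept beside the parser rather than a change to `ParseState` itself. This is a sketch, not a reference solution: `IdleWatchdog` and `observe` are hypothetical names, and it assumes the caller knows whether the parser is currently expecting body bytes and how many body bytes the latest `feed()` call added.

```rust
/// Hypothetical watchdog for the timeout exercise: counts consecutive feed()
/// calls that delivered no body bytes while a body is still expected.
pub struct IdleWatchdog {
    max_idle_calls: usize,
    idle_calls: usize,
}

impl IdleWatchdog {
    pub fn new(max_idle_calls: usize) -> Self {
        IdleWatchdog { max_idle_calls, idle_calls: 0 }
    }

    /// Call once per feed(). `awaiting_body` is true while the parser is in
    /// its body state; `body_bytes` is how many body bytes this call added.
    /// Returns Err("timeout") once the idle limit is reached.
    pub fn observe(&mut self, awaiting_body: bool, body_bytes: usize) -> Result<(), String> {
        if !awaiting_body || body_bytes > 0 {
            self.idle_calls = 0; // progress, or nothing pending, restarts the count
            return Ok(());
        }
        self.idle_calls += 1;
        if self.idle_calls >= self.max_idle_calls {
            self.idle_calls = 0; // the caller is expected to reset the parser too
            Err("timeout".to_string())
        } else {
            Ok(())
        }
    }
}
```

On `Err`, the caller would set the parser's state to `ParseState::Error("timeout".into())` and then reset it, as the exercise describes.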