921 Advanced

921-async-io — Async I/O

Functional Programming

Tutorial

The Problem

Blocking I/O operations (reading files, making network requests, querying databases) pause the calling thread, sometimes for seconds. In a single-threaded server, every other request waits behind the blocked one. The solution is asynchronous I/O: initiate the operation, yield control while waiting, and resume when the result is ready. OCaml's Lwt library implements this with promises, Python uses asyncio, and JavaScript uses Promise. Rust's async/await syntax compiles each async function into a state machine that implements Future; the abstraction itself adds no hidden scheduler or per-task thread, but an executor such as tokio must poll the future to drive it. This example builds thread-based async I/O with the standard library as a foundation before introducing the async keyword.
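The initiate, yield, resume sequence described above can be sketched with nothing but std threads and a channel. This is a minimal illustration, not the tutorial's own listing: slow_fetch and start_fetch are hypothetical names, and the sleep stands in for a real blocking call.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a blocking call such as a file read or HTTP request.
fn slow_fetch(id: u32) -> String {
    thread::sleep(Duration::from_millis(20)); // simulated I/O latency
    format!("response {id}")
}

// Initiate the operation on a worker thread and hand back a receiver
// that will eventually carry the result.
fn start_fetch(id: u32) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // `send` fails only if the receiver was dropped; ignore that case.
        let _ = tx.send(slow_fetch(id));
    });
    rx
}

fn main() {
    let pending = start_fetch(7); // 1. initiate
    let local_sum: u32 = (1..=10).sum(); // 2. do unrelated work while the fetch runs
    let response = pending.recv().expect("worker thread panicked"); // 3. resume
    println!("{local_sum} {response}");
}
```

The calling thread is only blocked at step 3, and only if the worker has not finished by then.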

🎯 Learning Outcomes

  • Offload blocking I/O to threads using std::sync::mpsc channels
  • Understand spawn_io_task as a manual implementation of async I/O
  • Process text statistics (lines, words, chars) from asynchronously fetched data
  • Use BufRead and Write traits for buffered I/O
  • Compare with OCaml's Lwt_unix and Lwt_io for non-blocking I/O

    Code Example

    #![allow(clippy::all)]
    use std::io::{self, BufRead, Write};
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
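    /// Returns a closure that simulates a slow read: when called, it sleeps
    /// briefly and then yields `content`. Nothing runs until the closure is invoked.
    fn read_string_async(content: String) -> impl FnOnce() -> String {
        move || {
            thread::sleep(Duration::from_millis(1)); // simulate I/O latency
            content
        }
    }
    
    /// Runs `f` on a new OS thread and returns a receiver that yields its result.
    /// The receiver acts as a manual future: `recv()` blocks until the value arrives.
    fn spawn_io_task<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> mpsc::Receiver<T> {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            // `send` fails only if the receiver was dropped; the result is then discarded.
            let _ = tx.send(f());
        });
        rx
    }
    
    /// Counts lines, whitespace-separated words, and Unicode scalar values in `text`.
    fn process_text(text: &str) -> (usize, usize, usize) {
        let lines = text.lines().count();
        let words = text.split_whitespace().count();
        let chars = text.chars().count();
        (lines, words, chars)
    }
    
    /// Appends `data` to `buf` and returns the number of bytes written.
    fn write_to_buf(buf: &mut Vec<u8>, data: &[u8]) -> io::Result<usize> {
        buf.write(data)
    }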
    
    #[cfg(test)]
    mod tests {
        use super::*;
        #[test]
        fn process_text_counts() {
            let (l, w, c) = process_text("hello world\nfoo");
            assert_eq!(l, 2);
            assert_eq!(w, 3);
            assert_eq!(c, 15);
        }
        #[test]
        fn spawn_io_returns_value() {
            let rx = spawn_io_task(|| 42i32);
            assert_eq!(rx.recv().unwrap(), 42);
        }
        #[test]
        fn nonblocking_listener() {
            let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
            l.set_nonblocking(true).unwrap();
            assert!(l.accept().is_err()); // WouldBlock or similar
        }
    }
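The listing imports BufRead but never calls it, so here is one possible way the pieces could fit together: fetch text on a worker thread, read it line by line through BufRead, and write a report through Write. The number_lines helper is hypothetical and not part of the original example; spawn_io_task is repeated so the sketch compiles on its own.

```rust
use std::io::{self, BufRead, Cursor, Write};
use std::sync::mpsc;
use std::thread;

// Same shape as the tutorial's spawn_io_task, repeated for self-containment.
fn spawn_io_task<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(f());
    });
    rx
}

// Hypothetical helper: reads lines from any BufRead source, writes each one
// prefixed with its line number to any Write sink, and returns the line count.
fn number_lines(input: impl BufRead, mut out: impl Write) -> io::Result<usize> {
    let mut count = 0;
    for line in input.lines() {
        let line = line?; // each item is an io::Result<String>
        count += 1;
        writeln!(out, "{count}: {line}")?;
    }
    Ok(count)
}

fn main() -> io::Result<()> {
    // Fetch the text on a worker thread, then block until it arrives.
    let rx = spawn_io_task(|| String::from("hello world\nfoo"));
    let text = rx.recv().expect("worker thread panicked");

    // Cursor turns an in-memory string into a BufRead source;
    // &mut Vec<u8> is a Write sink.
    let mut buf: Vec<u8> = Vec::new();
    let n = number_lines(Cursor::new(text), &mut buf)?;

    io::stdout().write_all(&buf)?;
    println!("{n} lines");
    Ok(())
}
```

Because number_lines is generic over BufRead and Write, the same function would accept a BufReader wrapped around a File or a TcpStream without changes.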

    Key Differences

  • Thread vs future: This example uses OS threads from Rust's std for async-like behavior; true async/await needs a runtime (tokio, async-std) to poll futures; OCaml's Lwt uses cooperative green threads.
  • Channel as future: Rust's mpsc::Receiver<T> serves as a manual "future": .recv() blocks until the value is ready. OCaml's Lwt.t is a typed promise.
  • Backpressure: mpsc::channel is unbounded, so a fast producer can queue results without limit; std's mpsc::sync_channel, tokio's bounded mpsc channel, and Lwt's bounded streams add backpressure by capping the buffer.
  • Error handling: Rust surfaces I/O errors as io::Result and propagates them with the ? operator; Lwt typically rejects the promise with an exception (handled with Lwt.catch) or threads a result value through explicitly.
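To make the "requires a runtime" point concrete, the sketch below drives a future with a tiny hand-written executor built only from std. It follows the pattern shown in the standard library's documentation for the Wake trait; block_on, ThreadWaker, and count_words are illustrative names, and a real runtime such as tokio does far more (I/O readiness, timers, scheduling many tasks).

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// A waker that unparks the thread that is blocked inside block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal single-future executor: poll, and park the thread while pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(), // sleep until the waker fires
        }
    }
}

// Calling an async fn only builds the state machine; no code in the body runs yet.
async fn count_words(text: &str) -> usize {
    text.split_whitespace().count()
}

fn main() {
    let fut = count_words("hello world foo"); // nothing has executed so far
    let words = block_on(fut); // polling drives the future to completion
    println!("{words}");
}
```

Without the call to block_on, the future would be dropped without ever running, which is the practical difference from thread::spawn, where work starts immediately.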
    OCaml Approach

    OCaml's Lwt_unix.read and Lwt_io.read_line are the async I/O primitives. Promise chaining goes through Lwt.bind: Lwt.bind (Lwt_io.with_file ~mode:Lwt_io.Input path (fun ic -> Lwt_io.read ic)) (fun content -> Lwt.return (process content)) reads a file and passes its contents to process once they arrive. OCaml 5's Eio library takes a different route, using structured concurrency with fibers. For synchronous I/O in OCaml, In_channel.input_all reads a whole file. The big difference: OCaml's Lwt model is cooperative (green threads), while Rust's standard library offers only OS threads and leaves true async I/O to runtimes such as tokio or async-std.
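For comparison, a rough Rust-side analogue of that promise chaining can be built on the thread-and-channel approach. The map_task combinator below is hypothetical (it is not in the original source), and because its callback returns a plain value rather than another promise, it is closer to Lwt.map than to Lwt.bind.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

// Same shape as the tutorial's spawn_io_task, repeated for self-containment.
fn spawn_io_task<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(f());
    });
    rx
}

// Hypothetical combinator: waits for the upstream result on a new thread,
// applies `f` to it, and exposes the outcome as another receiver.
fn map_task<T, U>(rx: Receiver<T>, f: impl FnOnce(T) -> U + Send + 'static) -> Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
{
    spawn_io_task(move || {
        let value = rx.recv().expect("upstream task ended without sending");
        f(value)
    })
}

fn main() {
    let content = spawn_io_task(|| String::from("hello world\nfoo"));
    let line_count = map_task(content, |text| text.lines().count());
    println!("{}", line_count.recv().expect("mapping task panicked"));
}
```

Each stage here costs a full OS thread that sits blocked in recv(), whereas an Lwt callback is just a closure registered on the promise. That cost is the main reason this pattern does not scale to thousands of concurrent operations.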


    Deep Comparison

    921-async-io — Language Comparison

    std vs tokio

    Aspect          | std version                                 | tokio version
    ----------------+---------------------------------------------+-------------------------------------
    Runtime         | OS threads via std::thread                  | Async tasks on tokio runtime
    Synchronization | std::sync::Mutex, Condvar                   | tokio::sync::Mutex, channels
    Channels        | std::sync::mpsc (channel is unbounded;      | tokio::sync::mpsc (bounded, async)
                    | sync_channel is bounded)                    |
    Blocking        | Thread blocks on lock/recv                  | Task yields, runtime switches tasks
    Overhead        | One OS thread per task                      | Many tasks per thread (M:N)
    Best for        | CPU-bound, simple concurrency               | I/O-bound, high-concurrency servers
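The Channels row matters for backpressure. As a small sketch, std's mpsc::sync_channel gives the thread-based version a bounded buffer: send blocks once the buffer is full, and try_send reports the condition instead of blocking. The produce_bounded helper is a hypothetical name used only for this illustration.

```rust
use std::sync::mpsc::{self, TrySendError};
use std::thread;

// Hypothetical helper: pushes `items` numbers through a channel that holds
// at most `capacity` pending values, and collects what the consumer receives.
fn produce_bounded(capacity: usize, items: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::sync_channel::<u32>(capacity);
    let producer = thread::spawn(move || {
        for i in 0..items {
            // Blocks while the buffer is full: this is the backpressure.
            tx.send(i).expect("receiver hung up");
        }
        // `tx` is dropped here, which ends the receiver's iterator.
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer panicked");
    received
}

fn main() {
    println!("{:?}", produce_bounded(2, 5));

    // try_send never blocks; a full buffer is reported as an error
    // that hands the rejected value back to the caller.
    let (tx, _rx) = mpsc::sync_channel::<u32>(1);
    assert!(tx.try_send(1).is_ok());
    assert!(matches!(tx.try_send(2), Err(TrySendError::Full(2))));
}
```

With the unbounded mpsc::channel used in spawn_io_task, the producer loop would never block, so a slow consumer would let the queue grow without limit.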

    Exercises

  • Implement read_multiple_files(paths: &[&str]) -> Vec<String> using parallel threads and mpsc to read all files concurrently.
  • Write a pipeline that reads text, processes it in a worker thread, and streams formatted output back via a channel.
  • Implement a rate-limited I/O function that spawns tasks but sleeps between spawns to avoid overwhelming the I/O subsystem.