
342: Async I/O Concepts


The Problem

Servers that handle thousands of simultaneous network connections cannot dedicate one OS thread per connection — thread stacks alone would consume gigabytes of memory. Async I/O solves this by decoupling waiting from threads: while one operation waits for a disk read or network packet, the same thread processes other work. This model traces back to the C10K problem (Dan Kegel, 1999) and the design of event loops in Node.js, nginx, and later Tokio. Rust's async/await brings this efficiency without sacrificing type safety or requiring a garbage collector, achieving C-level throughput with safe, readable code.
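The blocking/non-blocking distinction is visible even without an async runtime: the standard library can switch a socket to non-blocking mode, after which `accept()` returns immediately with `WouldBlock` instead of parking the thread in the kernel. A minimal std-only sketch:

```rust
use std::io::ErrorKind;
use std::net::TcpListener;

fn main() -> std::io::Result<()> {
    // Bind to an ephemeral loopback port, then switch to non-blocking mode.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.set_nonblocking(true)?;

    // With no client connecting, a blocking accept() would park this thread
    // in the kernel; a non-blocking accept() returns WouldBlock immediately,
    // leaving the thread free to do other work.
    match listener.accept() {
        Err(e) if e.kind() == ErrorKind::WouldBlock => {
            println!("no connection ready; doing other work instead");
        }
        Ok(_) => println!("connection accepted"),
        Err(e) => return Err(e),
    }
    Ok(())
}
```

This readiness-based loop (try the call, get `WouldBlock`, do something else, try again) is exactly what event loops like epoll-driven nginx or Tokio automate at scale.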

🎯 Learning Outcomes

  • Understand the difference between blocking and non-blocking I/O at the system call level
  • See how polling-based I/O avoids wasting threads on waiting
  • Use channels to simulate the async fan-out pattern with threads
  • Recognize that an async fn compiles to a state machine that calls poll()
  • Understand that an executor drives futures by repeatedly polling them until Ready
  • Compare async overhead (task scheduling) vs thread overhead (stack allocation)

Code Example

    #![allow(clippy::all)]
    // 342: Async I/O Concepts
    // Polling vs blocking, simulated with threads and channels
    
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    // Approach 1: Blocking I/O
    fn blocking_read() -> String {
        thread::sleep(Duration::from_millis(10));
        "data from blocking read".to_string()
    }
    
    // Approach 2: Threaded I/O with channels
    fn parallel_reads() -> Vec<String> {
        let (tx1, rx) = mpsc::channel();
        let tx2 = tx1.clone();
    
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            tx1.send("result1".to_string()).unwrap();
        });
    
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            tx2.send("result2".to_string()).unwrap();
        });
    
        let mut results = Vec::new();
        for _ in 0..2 {
            results.push(rx.recv().unwrap());
        }
        results
    }
    
    // Approach 3: Polling simulation
    enum PollResult<T> {
        Ready(T),
        Pending,
    }
    
    fn simulate_poll(counter: &mut u32) -> PollResult<&'static str> {
        if *counter >= 3 {
            *counter = 0;
            PollResult::Ready("done")
        } else {
            *counter += 1;
            PollResult::Pending
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_blocking() {
            assert_eq!(blocking_read(), "data from blocking read");
        }
    
        #[test]
        fn test_parallel() {
            let results = parallel_reads();
            assert_eq!(results.len(), 2);
            assert!(results.contains(&"result1".to_string()));
            assert!(results.contains(&"result2".to_string()));
        }
    
        #[test]
        fn test_poll() {
            let mut counter = 0;
            assert!(matches!(simulate_poll(&mut counter), PollResult::Pending));
            assert!(matches!(simulate_poll(&mut counter), PollResult::Pending));
            assert!(matches!(simulate_poll(&mut counter), PollResult::Pending));
            assert!(matches!(
                simulate_poll(&mut counter),
                PollResult::Ready("done")
            ));
        }
    }
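The `simulate_poll` sketch above maps directly onto Rust's real `Future` trait: `poll` returns `Poll::Pending` until the value is available, and an executor loops until it sees `Poll::Ready`. A minimal hand-rolled executor (a no-op waker and a busy poll loop, which a real runtime would never use) makes the mechanics concrete; `CountdownFuture` and `block_on` here are illustrative names, not library APIs:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A future that stays Pending for its first `remaining` polls,
// mirroring simulate_poll above.
struct CountdownFuture {
    remaining: u32,
}

impl Future for CountdownFuture {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            cx.waker().wake_by_ref(); // ask the executor to poll again
            Poll::Pending
        }
    }
}

// A no-op waker: sufficient for a toy executor that polls in a tight loop.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// The essence of an executor: poll the future repeatedly until Ready.
fn block_on<F: Future>(mut fut: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // Safe here because `fut` is never moved after being pinned.
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

fn main() {
    assert_eq!(block_on(CountdownFuture { remaining: 3 }), "done");
    println!("executor drove the future to completion");
}
```

A production executor differs in one crucial way: instead of spinning, it parks the task and relies on the `Waker` (triggered by OS readiness events) to reschedule it, which is what makes the model cheap at scale.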

    Key Differences

    | Aspect | Rust async/await | OCaml Lwt |
    |---|---|---|
    | Concurrency model | Multi-threaded (Tokio) or single-threaded | Single-threaded by default |
    | Syntax | async fn / .await built into language | let%lwt PPX rewriter macro |
    | Error propagation | ? operator in async context | Lwt_result.bind or let* |
    | Cancellation | tokio::select! / CancellationToken | Lwt.cancel |
    | Zero-cost | Yes (state machines, no allocation) | No (heap-allocated continuations) |

    OCaml Approach

    OCaml's Lwt library implements cooperative async I/O via promises:

    (* Lwt_io has no read_file; define a small helper over with_file *)
    let read_file path =
      Lwt_io.with_file ~mode:Lwt_io.Input path Lwt_io.read

    (* sequential - each read waits for the previous one to finish *)
    let%lwt _content1 = read_file "a.txt" in
    let%lwt _content2 = read_file "b.txt" in
    (* concurrent - overlapping requires Lwt.both *)
    let%lwt (c1, c2) = Lwt.both
      (read_file "a.txt")
      (read_file "b.txt") in
    Lwt_io.printf "%s %s\n" c1 c2
    

    Lwt.both runs two I/O operations concurrently on one thread, analogous to tokio::join!. Both Lwt and Tokio use an event loop driven by OS-level readiness notifications.


    Deep Comparison

    Core Insight

    Blocking I/O wastes threads waiting; async uses polling/callbacks to handle many connections on few threads

    OCaml Approach

  • See example.ml for implementation

    Rust Approach

  • See example.rs for implementation

    Comparison Table

    | Feature | OCaml | Rust |
    |---|---|---|
    | See | example.ml | example.rs |

    Exercises

  • Timer overlap: Simulate 5 independent 10ms delays overlapping: spawn 5 threads, each sleeping 10ms and sending to a channel; measure that total elapsed time is ~10ms not ~50ms.
  • Polling loop: Implement a simple polling loop that calls simulate_poll every millisecond until it returns Ready — then refactor to use a Condvar to avoid busy-waiting.
  • Async port: Using tokio, rewrite parallel_reads as an async function using tokio::join! with two tokio::time::sleep futures; verify it completes in ~10ms not ~20ms.