921-async-io — Async I/O
Tutorial
The Problem
Blocking I/O operations — reading files, making network requests, querying databases — pause the calling thread for potentially seconds. In a single-threaded server, this means all other requests wait. The solution is asynchronous I/O: initiate the operation, yield control while waiting, resume when the result is ready. OCaml's Lwt library implements this with promises; Python uses asyncio; JavaScript uses Promise. Rust's async/await syntax desugars to state machines (Futures) with no runtime overhead. This example shows thread-based async I/O as a foundation before introducing the async keyword.
🎯 Learning Outcomes
std::sync::mpsc channelsspawn_io_task as a manual implementation of async I/OBufRead and Write traits for buffered I/OLwt_unix and Lwt_io for non-blocking I/OCode Example
#![allow(clippy::all)]
use std::io::{self, BufRead, Write};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// Offload blocking I/O to a thread (async-style)
fn read_string_async(content: String) -> impl FnOnce() -> String {
move || {
thread::sleep(Duration::from_millis(1)); // simulate I/O latency
content
}
}
fn spawn_io_task<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> mpsc::Receiver<T> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let _ = tx.send(f());
});
rx
}
fn process_text(text: &str) -> (usize, usize, usize) {
let lines = text.lines().count();
let words = text.split_whitespace().count();
let chars = text.chars().count();
(lines, words, chars)
}
fn write_to_buf(buf: &mut Vec<u8>, data: &[u8]) -> io::Result<usize> {
buf.write(data)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn process_text_counts() {
let (l, w, c) = process_text("hello world\nfoo");
assert_eq!(l, 2);
assert_eq!(w, 3);
assert_eq!(c, 15);
}
#[test]
fn spawn_io_returns_value() {
let rx = spawn_io_task(|| 42i32);
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn nonblocking_listener() {
let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
l.set_nonblocking(true).unwrap();
assert!(l.accept().is_err()); // WouldBlock or similar
}
}Key Differences
async/await requires a runtime (tokio, async-std); OCaml's Lwt uses cooperative green threads.mpsc::Receiver<T> serves as a manual "future" — .recv() blocks until ready; OCaml Lwt.t is a typed promise.tokio and Lwt both support backpressure via buffer bounds.Result/option; Rust ? operator is more ergonomic than OCaml's explicit error threading.OCaml Approach
OCaml's Lwt_unix.read and Lwt_io.read_line are the async I/O primitives. Lwt.bind (Lwt_io.read_file path) (fun content -> Lwt.return (process content)) is the promise-chaining equivalent. OCaml 5's Eio library uses structured concurrency with fibers. For synchronous I/O in OCaml, In_channel.input_all reads a whole file. The big difference: OCaml's async model is cooperative (green threads), Rust's standard library uses OS threads for async, with tokio/async-std for true async I/O.
Full Source
#![allow(clippy::all)]
use std::io::{self, BufRead, Write};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// Offload blocking I/O to a thread (async-style)
fn read_string_async(content: String) -> impl FnOnce() -> String {
move || {
thread::sleep(Duration::from_millis(1)); // simulate I/O latency
content
}
}
fn spawn_io_task<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> mpsc::Receiver<T> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let _ = tx.send(f());
});
rx
}
fn process_text(text: &str) -> (usize, usize, usize) {
let lines = text.lines().count();
let words = text.split_whitespace().count();
let chars = text.chars().count();
(lines, words, chars)
}
fn write_to_buf(buf: &mut Vec<u8>, data: &[u8]) -> io::Result<usize> {
buf.write(data)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn process_text_counts() {
let (l, w, c) = process_text("hello world\nfoo");
assert_eq!(l, 2);
assert_eq!(w, 3);
assert_eq!(c, 15);
}
#[test]
fn spawn_io_returns_value() {
let rx = spawn_io_task(|| 42i32);
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn nonblocking_listener() {
let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
l.set_nonblocking(true).unwrap();
assert!(l.accept().is_err()); // WouldBlock or similar
}
}#[cfg(test)]
mod tests {
use super::*;
#[test]
fn process_text_counts() {
let (l, w, c) = process_text("hello world\nfoo");
assert_eq!(l, 2);
assert_eq!(w, 3);
assert_eq!(c, 15);
}
#[test]
fn spawn_io_returns_value() {
let rx = spawn_io_task(|| 42i32);
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn nonblocking_listener() {
let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
l.set_nonblocking(true).unwrap();
assert!(l.accept().is_err()); // WouldBlock or similar
}
}
Deep Comparison
921-async-io — Language Comparison
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
read_multiple_files(paths: &[&str]) -> Vec<String> using parallel threads and mpsc to read all files concurrently.