Blocking in Async
Tutorial
The Problem
Async runtimes like Tokio use a fixed pool of threads to drive many concurrent tasks. If any task calls a blocking operation (CPU-intensive computation, synchronous I/O, thread::sleep), it stalls the entire thread, preventing all other tasks on that thread from making progress. This is the "blocking in async" problem — it can silently starve the runtime of threads, causing latency spikes and timeouts. The solution is to offload blocking work to a dedicated thread pool (tokio::task::spawn_blocking) so the async thread pool remains responsive. Understanding this boundary is critical for mixing synchronous libraries (database drivers, compression codecs) with async code.
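The offloading pattern can be sketched without Tokio at all: run the blocking work on a dedicated thread and hand the result back over a channel, leaving the calling thread free. The `offload` helper below is a hypothetical name for illustration; it mirrors the shape of `tokio::task::spawn_blocking` using only the standard library.

```rust
use std::sync::mpsc;
use std::thread;

// Run a blocking closure on a dedicated thread and receive its result
// over a channel, so the caller's thread stays responsive. This mirrors
// what tokio::task::spawn_blocking does with Tokio's blocking pool.
fn offload<F, R>(f: F) -> mpsc::Receiver<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the send error: it only occurs if the receiver was dropped.
        let _ = tx.send(f());
    });
    rx
}

fn main() {
    // 5! computed off-thread; recv() blocks only the caller, not a runtime.
    let rx = offload(|| (1..=5u64).product::<u64>());
    assert_eq!(rx.recv().unwrap(), 120);
    println!("ok");
}
```

In real async code you would `.await` the handle instead of calling `recv()`, but the division of labor is the same: blocking work on its own thread, results delivered through a channel.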
🎯 Learning Outcomes
- How to use thread::spawn (or tokio::task::spawn_blocking) to offload blocking work
- How spawn_blocking communicates results back via a oneshot channel
- Why holding a Mutex lock across an .await point is dangerous

Code Example
#![allow(clippy::all)]
//! # Blocking in Async
//! How to safely run blocking operations in async contexts.

use std::thread;
use std::time::Duration;

/// Simulates a blocking call: sleeps briefly, then returns n! (factorial).
pub fn blocking_computation(n: u64) -> u64 {
    thread::sleep(Duration::from_millis(10));
    (1..=n).product()
}

/// Offloads a blocking closure to a dedicated OS thread, mirroring the
/// shape of tokio::task::spawn_blocking.
pub fn spawn_blocking<F, R>(f: F) -> thread::JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    thread::spawn(f)
}

/// Runs `f` over every item on its own thread and collects the results
/// in input order.
pub fn run_blocking_batch<T, R, F>(items: Vec<T>, f: F) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> R + Send + Sync + Clone + 'static,
{
    let handles: Vec<_> = items
        .into_iter()
        .map(|item| {
            let f = f.clone();
            thread::spawn(move || f(item))
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn blocking_works() {
        assert_eq!(blocking_computation(5), 120);
    }

    #[test]
    fn spawn_blocking_works() {
        let h = spawn_blocking(|| 2 + 2);
        assert_eq!(h.join().unwrap(), 4);
    }

    #[test]
    fn batch_blocking() {
        let results = run_blocking_batch(vec![1, 2, 3], |x| x * 2);
        assert_eq!(results, vec![2, 4, 6]);
    }
}

Key Differences
| Aspect | Rust spawn_blocking | OCaml Lwt_preemptive.detach |
|---|---|---|
| Thread pool | Separate from async workers | Lwt_preemptive thread pool |
| Return type | JoinHandle<R> / async result | Lwt promise |
| Backpressure | Tokio limits pool size | Configurable thread pool size |
| Composability | .await in async context | let%lwt in Lwt context |
| Detection of mistakes | None at compile time | None at compile time |
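The backpressure row deserves a concrete illustration: run_blocking_batch above spawns one thread per item, with no cap. A crude std-only way to bound concurrency is to process the input in fixed-size chunks, so at most `limit` threads exist at a time. The `run_chunked_batch` name is a hypothetical variant for illustration, not part of the example code.

```rust
use std::thread;

// Run `f` over `items` with at most `limit` worker threads alive at once,
// by splitting the input into chunks and joining each chunk before
// starting the next. A crude stand-in for Tokio's capped blocking pool.
fn run_chunked_batch<T, R, F>(items: Vec<T>, limit: usize, f: F) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> R + Send + Sync + Clone + 'static,
{
    let mut results = Vec::new();
    let mut iter = items.into_iter();
    loop {
        // Take the next `limit` items; stop when the input is exhausted.
        let chunk: Vec<T> = iter.by_ref().take(limit).collect();
        if chunk.is_empty() {
            break;
        }
        let handles: Vec<_> = chunk
            .into_iter()
            .map(|item| {
                let f = f.clone();
                thread::spawn(move || f(item))
            })
            .collect();
        // Join the whole chunk before spawning the next one.
        results.extend(handles.into_iter().map(|h| h.join().unwrap()));
    }
    results
}

fn main() {
    let out = run_chunked_batch((1..=10).collect(), 3, |x: i32| x * x);
    assert_eq!(out, (1..=10).map(|x| x * x).collect::<Vec<_>>());
    println!("ok");
}
```

Chunking is simpler than a semaphore but wastes capacity when one slow item holds up its whole chunk; Tokio's pool (and the semaphore exercise below) avoids that by admitting new work as soon as any slot frees up.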
OCaml Approach
Lwt uses Lwt_preemptive.detach to run blocking code in a thread pool:
let blocking_work n =
  Unix.sleepf 0.01;
  List.fold_left ( * ) 1 (List.init n (fun i -> i + 1))

let async_wrapper n =
  Lwt_preemptive.detach (fun () -> blocking_work n) ()
detach runs the function in a preemptive thread, returning an Lwt promise. The Lwt event loop is not blocked — it continues handling other promises while the thread runs. This is the direct equivalent of spawn_blocking.
Exercises
1. Write an async program in which one task calls thread::sleep(1s) directly (blocking!) and another prints a message every 100ms; observe that the print task starves, then fix it with spawn_blocking.
2. Extend run_blocking_batch to return Vec<Result<R, String>>, catching each thread's panic with std::panic::catch_unwind and converting it to Err.
3. Modify run_blocking_batch to run at most N threads simultaneously using a semaphore (Arc<(Mutex<usize>, Condvar)>); test with 20 items and a limit of 4.
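For exercise 3, the Mutex + Condvar pair can be assembled into a minimal counting semaphore. This sketch is not a full solution: it still spawns every thread up front and only caps how many run the guarded section concurrently; the `Semaphore` type and its method names are choices made here for illustration.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A minimal counting semaphore: the Mutex guards the permit count and the
// Condvar parks acquirers while the count is zero.
struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cvar: Condvar::new() }
    }

    /// Blocks until a permit is available, then takes it.
    fn acquire(&self) {
        let mut count = self.permits.lock().unwrap();
        while *count == 0 {
            // wait() releases the lock while parked and reacquires on wake.
            count = self.cvar.wait(count).unwrap();
        }
        *count -= 1;
    }

    /// Returns a permit and wakes one waiting thread.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cvar.notify_one();
    }
}

fn main() {
    // 20 items, at most 4 inside the guarded section at once.
    let sem = Arc::new(Semaphore::new(4));
    let handles: Vec<_> = (0..20)
        .map(|i| {
            let sem = Arc::clone(&sem);
            thread::spawn(move || {
                sem.acquire();
                let r = i * 2; // stand-in for blocking work
                sem.release();
                r
            })
        })
        .collect();
    let mut results: Vec<i32> =
        handles.into_iter().map(|h| h.join().unwrap()).collect();
    results.sort();
    assert_eq!(results, (0..20).map(|i| i * 2).collect::<Vec<_>>());
    println!("ok");
}
```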