981 Async Sequence
Tutorial
The Problem
Demonstrate sequential async composition in Rust using chained .await calls. Each step uses the result of the previous step, mirroring OCaml's sequential Lwt monadic binding with let* x = ... in. Implement a data-lookup pipeline: fetch a user ID, then use it to fetch the name, then use the name to fetch the email — each step depends on the previous.
🎯 Learning Outcomes
- async fn chains where each step let x = f().await uses the previous result
- let x = f().await; let y = g(x).await; y is OCaml's let* x = f () in let* y = g x in y
- .await is monadic bind for Future — each binds the value from the prior computation
- async fn that take ownership of parameters via move closures
Code Example
#![allow(clippy::all)]
// 981: Sequential Async Chain
// Rust: sequential .await calls — like OCaml's let* x = ... in let* y = ...
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
fn block_on<F: Future>(mut fut: F) -> F::Output {
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker {
        RawWaker::new(p, &VT)
    }
    static VT: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VT)) };
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("not ready"),
    }
}
// --- Simulated async data-fetch functions ---
async fn fetch_user_id() -> u32 {
    42
}
async fn fetch_user_name(_id: u32) -> String {
    "Alice".to_string()
}
async fn fetch_user_email(_name: &str) -> String {
    "alice@example.com".to_string()
}
// --- Approach 1: Sequential let-binding with await ---
// Each .await = one let* step in OCaml
async fn full_lookup() -> (u32, String, String) {
    let id = fetch_user_id().await;
    let name = fetch_user_name(id).await;
    let email = fetch_user_email(&name).await;
    (id, name, email)
}
// --- Approach 2: Accumulating through a pipeline ---
async fn step1(x: i32) -> i32 {
    x + 10
}
async fn step2(x: i32) -> i32 {
    x * 2
}
async fn step3(x: i32) -> i32 {
    x - 5
}
async fn pipeline_seq(input: i32) -> (i32, i32, i32, i32) {
    let a = step1(input).await;
    let b = step2(a).await;
    let c = step3(b).await;
    (input, a, b, c)
}
// --- Approach 3: Error-aware sequence with ? operator ---
async fn guarded_div(a: i32, b: i32) -> Result<i32, &'static str> {
    if b == 0 {
        Err("division by zero")
    } else {
        Ok(a / b)
    }
}
async fn safe_pipeline() -> Result<i32, &'static str> {
    let x = 100;
    let y = guarded_div(x, 4).await?; // let*? — short-circuits on Err
    let z = guarded_div(y, 5).await?;
    Ok(z)
}
async fn bad_pipeline() -> Result<i32, &'static str> {
    let x = 100;
    let _y = guarded_div(x, 0).await?; // short-circuits here
    Ok(999)
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_full_lookup() {
        let (id, name, email) = block_on(full_lookup());
        assert_eq!(id, 42);
        assert_eq!(name, "Alice");
        assert_eq!(email, "alice@example.com");
    }
    #[test]
    fn test_pipeline_seq() {
        let (orig, a, b, c) = block_on(pipeline_seq(5));
        assert_eq!(orig, 5);
        assert_eq!(a, 15); // 5+10
        assert_eq!(b, 30); // 15*2
        assert_eq!(c, 25); // 30-5
    }
    #[test]
    fn test_safe_pipeline() {
        assert_eq!(block_on(safe_pipeline()), Ok(5)); // 100/4=25, 25/5=5
    }
    #[test]
    fn test_bad_pipeline_short_circuits() {
        assert_eq!(block_on(bad_pipeline()), Err("division by zero"));
    }
    #[test]
    fn test_sequential_order() {
        // Values from earlier awaits are available in later ones
        let result = block_on(async {
            let a = step1(10).await; // 20
            let b = step2(a).await; // 40 — uses a
            let c = step3(b).await; // 35 — uses b
            c
        });
        assert_eq!(result, 35);
    }
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Sequential bind | let x = f().await | let* x = f () in |
| Explicit bind | f().then(\|x\| g(x)) | Lwt.bind (f ()) (fun x -> g x) |
| State machine | Compiler-generated from async fn | Lwt heap-allocated continuations |
| Ownership | id moved into fetch_user_name(id) | GC manages lifetime |
Sequential .await is the building block of any async workflow. Use it when each step depends on the previous result. Use parallel join (example 982) when steps are independent.
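A small, hedged sketch of that distinction, reusing the same null-waker block_on trick as the example above (the tagged function is invented for this illustration): even when two steps are independent, sequential .await still runs them strictly in order. Overlapping them would require a join combinator such as futures::join!, which is not shown here.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal executor for already-ready futures (same approach as the
// example's block_on; panics if the future returns Pending).
fn block_on<F: Future>(mut fut: F) -> F::Output {
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker {
        RawWaker::new(p, &VT)
    }
    static VT: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VT)) };
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("not ready"),
    }
}

// Hypothetical step that records when it runs.
async fn tagged(order: &mut Vec<&'static str>, tag: &'static str, x: i32) -> i32 {
    order.push(tag);
    x * 2
}

fn main() {
    let mut order = Vec::new();
    let (a, b) = block_on(async {
        // These two steps are independent, yet sequential .await
        // still executes them one after the other.
        let a = tagged(&mut order, "first", 1).await;
        let b = tagged(&mut order, "second", 2).await;
        (a, b)
    });
    assert_eq!((a, b), (2, 4));
    assert_eq!(order, ["first", "second"]);
    println!("{:?}", order);
}
```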
OCaml Approach
open Lwt.Syntax
let fetch_user_id () = Lwt.return 42
let fetch_user_name _id = Lwt.return "Alice"
let fetch_user_email _name = Lwt.return "alice@example.com"
(* Sequential with let* *)
let full_lookup () =
  let* id = fetch_user_id () in
  let* name = fetch_user_name id in
  let* email = fetch_user_email name in
  Lwt.return (id, name, email)
(* Equivalent with explicit bind *)
let full_lookup_bind () =
  Lwt.bind (fetch_user_id ()) (fun id ->
    Lwt.bind (fetch_user_name id) (fun name ->
      Lwt.bind (fetch_user_email name) (fun email ->
        Lwt.return (id, name, email))))
let* is syntactic sugar for Lwt.bind. The two forms are identical in semantics. Rust's let x = f().await is the third form of the same pattern — sequential monadic binding.
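The same bind shape also shows up in synchronous Rust. As an illustrative aside (not part of the Lwt example; parse_num is a made-up helper), Option::and_then is monadic bind for Option, just as .await plays that role for Future:

```rust
// Option::and_then is monadic bind for Option: each closure receives
// the value produced by the previous step, mirroring let* / .await.
fn parse_num(s: &str) -> Option<i32> {
    s.parse().ok()
}

fn main() {
    let chained = parse_num("20")
        .and_then(|n| n.checked_mul(2))  // Some(40)
        .and_then(|n| n.checked_add(2)); // Some(42)
    assert_eq!(chained, Some(42));

    // A None anywhere short-circuits the rest, like Err with ?.
    let short = parse_num("oops").and_then(|n| n.checked_mul(2));
    assert_eq!(short, None);
    println!("{:?} {:?}", chained, short);
}
```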
Full Source
#![allow(clippy::all)]
// 981: Sequential Async Chain
// Rust: sequential .await calls — like OCaml's let* x = ... in let* y = ...
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
fn block_on<F: Future>(mut fut: F) -> F::Output {
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker {
        RawWaker::new(p, &VT)
    }
    static VT: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VT)) };
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("not ready"),
    }
}
// --- Simulated async data-fetch functions ---
async fn fetch_user_id() -> u32 {
    42
}
async fn fetch_user_name(_id: u32) -> String {
    "Alice".to_string()
}
async fn fetch_user_email(_name: &str) -> String {
    "alice@example.com".to_string()
}
// --- Approach 1: Sequential let-binding with await ---
// Each .await = one let* step in OCaml
async fn full_lookup() -> (u32, String, String) {
    let id = fetch_user_id().await;
    let name = fetch_user_name(id).await;
    let email = fetch_user_email(&name).await;
    (id, name, email)
}
// --- Approach 2: Accumulating through a pipeline ---
async fn step1(x: i32) -> i32 {
    x + 10
}
async fn step2(x: i32) -> i32 {
    x * 2
}
async fn step3(x: i32) -> i32 {
    x - 5
}
async fn pipeline_seq(input: i32) -> (i32, i32, i32, i32) {
    let a = step1(input).await;
    let b = step2(a).await;
    let c = step3(b).await;
    (input, a, b, c)
}
// --- Approach 3: Error-aware sequence with ? operator ---
async fn guarded_div(a: i32, b: i32) -> Result<i32, &'static str> {
    if b == 0 {
        Err("division by zero")
    } else {
        Ok(a / b)
    }
}
async fn safe_pipeline() -> Result<i32, &'static str> {
    let x = 100;
    let y = guarded_div(x, 4).await?; // let*? — short-circuits on Err
    let z = guarded_div(y, 5).await?;
    Ok(z)
}
async fn bad_pipeline() -> Result<i32, &'static str> {
    let x = 100;
    let _y = guarded_div(x, 0).await?; // short-circuits here
    Ok(999)
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_full_lookup() {
        let (id, name, email) = block_on(full_lookup());
        assert_eq!(id, 42);
        assert_eq!(name, "Alice");
        assert_eq!(email, "alice@example.com");
    }
    #[test]
    fn test_pipeline_seq() {
        let (orig, a, b, c) = block_on(pipeline_seq(5));
        assert_eq!(orig, 5);
        assert_eq!(a, 15); // 5+10
        assert_eq!(b, 30); // 15*2
        assert_eq!(c, 25); // 30-5
    }
    #[test]
    fn test_safe_pipeline() {
        assert_eq!(block_on(safe_pipeline()), Ok(5)); // 100/4=25, 25/5=5
    }
    #[test]
    fn test_bad_pipeline_short_circuits() {
        assert_eq!(block_on(bad_pipeline()), Err("division by zero"));
    }
    #[test]
    fn test_sequential_order() {
        // Values from earlier awaits are available in later ones
        let result = block_on(async {
            let a = step1(10).await; // 20
            let b = step2(a).await; // 40 — uses a
            let c = step3(b).await; // 35 — uses b
            c
        });
        assert_eq!(result, 35);
    }
}
Deep Comparison
Sequential Async Chain — Comparison
Core Insight
Sequential async chains are monadic do-notation: each step depends on the previous. Both languages provide sugar for this — OCaml's let* (ppx_let / Lwt.Syntax) and Rust's .await on sequential lines. Values computed in earlier steps are in scope for later steps.
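One hedged way to observe the compiler-generated state machine in Rust: any value that is live across an .await must be stored inside the future itself, so the future's size grows with it. Exact sizes are compiler-dependent; the sketch below (one and the buffer are invented for illustration) only relies on the 256-byte buffer having to survive the suspension point.

```rust
async fn one() -> u64 {
    1
}

fn main() {
    // No locals live across the await: the state machine stays small.
    let small = async { one().await };

    // buf is used after the await, so the compiler-generated state
    // machine must store it across the suspension point. black_box
    // keeps the optimizer from proving the contents away.
    let large = async {
        let buf = std::hint::black_box([1u8; 256]);
        let n = one().await;
        buf[0] as u64 + n
    };

    // Sizes are compiler-dependent, but `large` must hold the buffer.
    assert!(std::mem::size_of_val(&large) >= 256);
    println!(
        "small: {} bytes, large: {} bytes",
        std::mem::size_of_val(&small),
        std::mem::size_of_val(&large)
    );
}
```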
OCaml Approach
- let* x = fut in ... desugars to Lwt.bind fut (fun x -> ...)
- open Lwt.Syntax or ppx_let provides the let* sugar
- Lwt_result and let*? handle error-aware sequences
Rust Approach
- .await calls read like normal imperative code
- ? operator provides short-circuit error propagation (like let*?)
Comparison Table
| Concept | OCaml (Lwt) | Rust |
|---|---|---|
| Sequential bind | let* x = f () in let* y = g x in … | let x = f().await; let y = g(x).await |
| Error short-circuit | let*? x = f () in … | let x = f().await?; |
| Later steps see earlier | Yes — closure captures x | Yes — in same async fn scope |
| Sugar requires | open Lwt.Syntax | Just async fn + .await |
| Execution order | Strict left-to-right | Strict left-to-right |
| Parallelism | No (use Lwt.both / Lwt.join) | No (use join! or threads) |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
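To make the std column of the table concrete, here is a hedged sketch of the same three-step pipeline built from OS threads and std::sync::mpsc channels instead of futures; the name threaded_pipeline is invented for this illustration, and each stage pays the cost of a full OS thread.

```rust
use std::sync::mpsc;
use std::thread;

// Thread-per-stage version of pipeline_seq: every stage runs on its
// own OS thread and receives its input over a channel. recv() blocks
// the whole thread, where .await would merely suspend a task.
fn threaded_pipeline(input: i32) -> i32 {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    let (tx3, rx3) = mpsc::channel();

    thread::spawn(move || tx1.send(input + 10).unwrap()); // step1
    thread::spawn(move || tx2.send(rx1.recv().unwrap() * 2).unwrap()); // step2
    thread::spawn(move || tx3.send(rx2.recv().unwrap() - 5).unwrap()); // step3

    rx3.recv().unwrap()
}

fn main() {
    // Same arithmetic as pipeline_seq(5): (5 + 10) * 2 - 5 = 25.
    assert_eq!(threaded_pipeline(5), 25);
    println!("{}", threaded_pipeline(5));
}
```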
Exercises
1. Change the fetch functions to return Result<T, String> and use ? inside async fn.
2. Write retry_async<F, T>(n: usize, f: F) -> Result<T, String> that retries a failing async fn up to n times.
3. Implement timeout_async(duration, fut) that returns Err if the future does not complete within duration.
4. Port full_lookup to use tokio and measure the actual concurrency behavior under a real async runtime.