981 Async Sequence

Intermediate · Functional Programming · Tutorial

The Problem

Demonstrate sequential async composition in Rust using chained .await calls, mirroring OCaml's sequential Lwt monadic binding (let* x = ... in). Implement a data-lookup pipeline: fetch a user ID, use it to fetch the name, then use the name to fetch the email — each step depends on the result of the previous one.

🎯 Learning Outcomes

  • Implement sequential async fn chains where each step let x = f().await uses the previous result
  • Recognize that let x = f().await; let y = g(x).await; y is OCaml's let* x = f () in let* y = g x in y
  • Understand that sequential .await is monadic bind for Future — each step binds the value produced by the prior computation
  • Implement helper async fns that take ownership of their parameters by value
  • Compare with parallel join (example 982) — sequential is ordered, parallel is unordered

Code Example

    #![allow(clippy::all)]
    // 981: Sequential Async Chain
    // Rust: sequential .await calls — like OCaml's let* x = ... in let* y = ...
    
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
    
    fn block_on<F: Future>(mut fut: F) -> F::Output {
        let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
        fn noop(_: *const ()) {}
        fn clone(p: *const ()) -> RawWaker {
            RawWaker::new(p, &VT)
        }
        static VT: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VT)) };
        let mut cx = Context::from_waker(&waker);
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => v,
            Poll::Pending => panic!("not ready"),
        }
    }
    
    // --- Simulated async data-fetch functions ---
    async fn fetch_user_id() -> u32 {
        42
    }
    async fn fetch_user_name(_id: u32) -> String {
        "Alice".to_string()
    }
    async fn fetch_user_email(_name: &str) -> String {
        "alice@example.com".to_string()
    }
    
    // --- Approach 1: Sequential let-binding with await ---
    // Each .await = one let* step in OCaml
    async fn full_lookup() -> (u32, String, String) {
        let id = fetch_user_id().await;
        let name = fetch_user_name(id).await;
        let email = fetch_user_email(&name).await;
        (id, name, email)
    }
    
    // --- Approach 2: Accumulating through a pipeline ---
    async fn step1(x: i32) -> i32 {
        x + 10
    }
    async fn step2(x: i32) -> i32 {
        x * 2
    }
    async fn step3(x: i32) -> i32 {
        x - 5
    }
    
    async fn pipeline_seq(input: i32) -> (i32, i32, i32, i32) {
        let a = step1(input).await;
        let b = step2(a).await;
        let c = step3(b).await;
        (input, a, b, c)
    }
    
    // --- Approach 3: Error-aware sequence with ? operator ---
    async fn guarded_div(a: i32, b: i32) -> Result<i32, &'static str> {
        if b == 0 {
            Err("division by zero")
        } else {
            Ok(a / b)
        }
    }
    
    async fn safe_pipeline() -> Result<i32, &'static str> {
        let x = 100;
        let y = guarded_div(x, 4).await?; // let*? — short-circuits on Err
        let z = guarded_div(y, 5).await?;
        Ok(z)
    }
    
    async fn bad_pipeline() -> Result<i32, &'static str> {
        let x = 100;
        let _y = guarded_div(x, 0).await?; // short-circuits here
        Ok(999)
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_full_lookup() {
            let (id, name, email) = block_on(full_lookup());
            assert_eq!(id, 42);
            assert_eq!(name, "Alice");
            assert_eq!(email, "alice@example.com");
        }
    
        #[test]
        fn test_pipeline_seq() {
            let (orig, a, b, c) = block_on(pipeline_seq(5));
            assert_eq!(orig, 5);
            assert_eq!(a, 15); // 5+10
            assert_eq!(b, 30); // 15*2
            assert_eq!(c, 25); // 30-5
        }
    
        #[test]
        fn test_safe_pipeline() {
            assert_eq!(block_on(safe_pipeline()), Ok(5)); // 100/4=25, 25/5=5
        }
    
        #[test]
        fn test_bad_pipeline_short_circuits() {
            assert_eq!(block_on(bad_pipeline()), Err("division by zero"));
        }
    
        #[test]
        fn test_sequential_order() {
            // Values from earlier awaits are available in later ones
            let result = block_on(async {
                let a = step1(10).await; // 20
                let b = step2(a).await; // 40 — uses a
                let c = step3(b).await; // 35 — uses b
                c
            });
            assert_eq!(result, 35);
        }
    }

    Key Differences

    | Aspect          | Rust                                       | OCaml                            |
    | --------------- | ------------------------------------------ | -------------------------------- |
    | Sequential bind | let x = f().await                          | let* x = f () in                 |
    | Explicit bind   | f().then(|x| g(x)) (futures crate)         | Lwt.bind (f ()) (fun x -> g x)   |
    | State machine   | Compiler-generated from async fn           | Lwt heap-allocated continuations |
    | Ownership       | id moved into fetch_user_name(id)          | GC manages lifetime              |

    Sequential .await is the building block of any async workflow. Use it when each step depends on the previous result. Use parallel join (example 982) when steps are independent.
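To make the sequential-vs-parallel contrast concrete without pulling in a runtime: the hand-rolled block_on above drives a single future, while a minimal hand-rolled join2 (a stand-in for tokio's or the futures crate's join!; the name and helper are illustrative, not a library API) polls two independent futures until both complete. Note that neither future can see the other's result — that is exactly what distinguishes join from a sequential chain.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Same noop-waker trick as the example's block_on.
fn noop_waker() -> Waker {
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker {
        RawWaker::new(p, &VT)
    }
    static VT: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VT)) }
}

// Poll two independent futures to completion. Busy-polls, so this only
// suits futures that become ready on their own (as in this example).
fn join2<A: Future, B: Future>(mut a: A, mut b: B) -> (A::Output, B::Output) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // Safety: the futures live in this stack frame and are never moved again.
    let mut a = unsafe { Pin::new_unchecked(&mut a) };
    let mut b = unsafe { Pin::new_unchecked(&mut b) };
    let (mut ra, mut rb) = (None, None);
    while ra.is_none() || rb.is_none() {
        if ra.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(&mut cx) {
                ra = Some(v);
            }
        }
        if rb.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(&mut cx) {
                rb = Some(v);
            }
        }
    }
    (ra.unwrap(), rb.unwrap())
}

async fn fetch_a() -> i32 { 1 }
async fn fetch_b() -> i32 { 2 }

fn main() {
    // Independent steps: join is fine. Dependent steps need sequential .await.
    let (a, b) = join2(fetch_a(), fetch_b());
    assert_eq!((a, b), (1, 2));
    println!("{}", a + b); // prints 3
}
```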

    OCaml Approach

    open Lwt.Syntax
    
    let fetch_user_id () = Lwt.return 42
    let fetch_user_name _id = Lwt.return "Alice"
    let fetch_user_email _name = Lwt.return "alice@example.com"
    
    (* Sequential with let* *)
    let full_lookup () =
      let* id    = fetch_user_id () in
      let* name  = fetch_user_name id in
      let* email = fetch_user_email name in
      Lwt.return (id, name, email)
    
    (* Equivalent with explicit bind *)
    let full_lookup_bind () =
      Lwt.bind (fetch_user_id ()) (fun id ->
      Lwt.bind (fetch_user_name id) (fun name ->
      Lwt.bind (fetch_user_email name) (fun email ->
      Lwt.return (id, name, email))))
    

    let* is syntactic sugar for Lwt.bind. The two forms are identical in semantics. Rust's let x = f().await is the third form of the same pattern — sequential monadic binding.
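The same three-way equivalence can be spelled out on the Rust side by writing the bind operation by hand. The bind and lookup_bind helpers below are illustrative (not a std API), mirroring full_lookup_bind above; the minimal block_on only works for immediately-ready futures, as in the main example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal executor for immediately-ready futures (noop-waker trick).
fn block_on<F: Future>(mut fut: F) -> F::Output {
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker {
        RawWaker::new(p, &VT)
    }
    static VT: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VT)) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("not ready"),
    }
}

// Monadic bind, written out: run `fut`, pass its value to `f`, run the result.
// `bind(fut, g).await` behaves like `let x = fut.await; g(x).await`.
async fn bind<A, B, F, Fut>(fut: impl Future<Output = A>, f: F) -> B
where
    F: FnOnce(A) -> Fut,
    Fut: Future<Output = B>,
{
    f(fut.await).await
}

async fn fetch_user_id() -> u32 { 42 }
async fn fetch_user_name(_id: u32) -> String { "Alice".to_string() }

// Explicit-bind form of the lookup, like Lwt.bind in full_lookup_bind.
async fn lookup_bind() -> String {
    bind(fetch_user_id(), fetch_user_name).await
}

fn main() {
    assert_eq!(block_on(lookup_bind()), "Alice");
    println!("ok"); // prints ok
}
```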


    Deep Comparison

    Sequential Async Chain — Comparison

    Core Insight

    Sequential async chains are monadic do-notation: each step depends on the previous. Both languages provide sugar for this — OCaml's let* (ppx_let / Lwt.Syntax) and Rust's .await on sequential lines. Values computed in earlier steps are in scope for later steps.

    OCaml Approach

  • let* x = fut in ... desugars to Lwt.bind fut (fun x -> ...)
  • Requires open Lwt.Syntax or ppx_let
  • Short-circuit via Lwt_result and let*?
  • Each step is truly sequential — Lwt schedules them one after another

    Rust Approach

  • Sequential .await calls read like normal imperative code
  • Variables from earlier awaits are in scope for later ones (captures)
  • The ? operator provides short-circuit error propagation (like let*?)
  • The compiler generates a state machine — no runtime overhead per step

    Comparison Table

    | Concept                 | OCaml (Lwt)                        | Rust                                  |
    | ----------------------- | ---------------------------------- | ------------------------------------- |
    | Sequential bind         | let* x = f () in let* y = g x in … | let x = f().await; let y = g(x).await |
    | Error short-circuit     | let*? x = f () in …                | let x = f().await?;                   |
    | Later steps see earlier | Yes — closure captures x           | Yes — same async fn scope             |
    | Sugar requires          | open Lwt.Syntax                    | Just async fn + .await                |
    | Execution order         | Strict left-to-right               | Strict left-to-right                  |
    | Parallelism             | No (use Lwt.both / Lwt.join)       | No (use join! or threads)             |
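The error short-circuit row deserves one concrete expansion. Inside an async fn that returns Result, .await? is just ordinary ? applied to the awaited value; a rough sketch of the expansion is shown below (the real desugaring goes through the Try trait and an error conversion, and safe_pipeline_desugared is an illustrative name, not from the example above). The minimal block_on only handles immediately-ready futures.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal executor for immediately-ready futures (noop-waker trick).
fn block_on<F: Future>(mut fut: F) -> F::Output {
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker {
        RawWaker::new(p, &VT)
    }
    static VT: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VT)) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("not ready"),
    }
}

async fn guarded_div(a: i32, b: i32) -> Result<i32, &'static str> {
    if b == 0 { Err("division by zero") } else { Ok(a / b) }
}

// What `let y = guarded_div(100, 4).await?;` expands to, roughly:
async fn safe_pipeline_desugared() -> Result<i32, &'static str> {
    let y = match guarded_div(100, 4).await {
        Ok(v) => v,
        Err(e) => return Err(e), // short-circuit: the rest never runs
    };
    Ok(y)
}

fn main() {
    assert_eq!(block_on(safe_pipeline_desugared()), Ok(25));
    println!("ok"); // prints ok
}
```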

    std vs tokio

    | Aspect          | std version                    | tokio version                       |
    | --------------- | ------------------------------ | ----------------------------------- |
    | Runtime         | OS threads via std::thread     | Async tasks on the tokio runtime    |
    | Synchronization | std::sync::Mutex, Condvar      | tokio::sync::Mutex, channels        |
    | Channels        | std::sync::mpsc (unbounded)    | tokio::sync::mpsc (bounded, async)  |
    | Blocking        | Thread blocks on lock/recv     | Task yields, runtime switches tasks |
    | Overhead        | One OS thread per task         | Many tasks per thread (M:N)         |
    | Best for        | CPU-bound, simple concurrency  | I/O-bound, high-concurrency servers |
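A minimal std-only sketch of the left column: one OS thread per task and a blocking channel receive. All names are illustrative.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // "std version" row: one OS thread per task, std::sync::mpsc channel.
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        tx.send(42u32).expect("receiver still alive");
    });
    // recv() blocks this OS thread until a value arrives — no task switching,
    // unlike an awaited tokio::sync::mpsc receive, which yields to the runtime.
    let v = rx.recv().expect("sender still alive");
    producer.join().unwrap();
    println!("{v}"); // prints 42
}
```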

    Exercises

  • Add error handling: change the functions to return Result<T, String> and use ? inside async fn.
  • Implement retry_async<F, T>(n: usize, f: F) -> Result<T, String> that retries a failing async fn up to n times.
  • Implement a timeout_async(duration, fut) that returns Err if the future does not complete within duration.
  • Chain 10 dependent async steps and verify that the output is correct.
  • Convert full_lookup to use tokio and measure the actual concurrency behavior under a real async runtime.
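The retry exercise admits a short sketch. retry_async below is one hypothetical implementation, under the assumption that the fallible step is a closure that builds a fresh future for each attempt; the minimal block_on only drives immediately-ready futures, as in the main example.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal executor for immediately-ready futures (noop-waker trick).
fn block_on<F: Future>(mut fut: F) -> F::Output {
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker {
        RawWaker::new(p, &VT)
    }
    static VT: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VT)) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("not ready"),
    }
}

// Retry a fallible async step up to `n` times; return the first Ok or the
// last Err. `f` must produce a fresh future for every attempt.
async fn retry_async<F, Fut, T>(n: usize, mut f: F) -> Result<T, String>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, String>>,
{
    let mut last_err = String::from("retry_async called with n = 0");
    for _ in 0..n {
        match f().await {
            Ok(v) => return Ok(v),
            Err(e) => last_err = e,
        }
    }
    Err(last_err)
}

fn main() {
    // Fails on attempts 1 and 2, succeeds on attempt 3.
    let calls = Cell::new(0);
    let result = block_on(retry_async(5, || {
        calls.set(calls.get() + 1);
        let attempt = calls.get();
        async move {
            if attempt < 3 {
                Err(format!("attempt {attempt} failed"))
            } else {
                Ok(attempt)
            }
        }
    }));
    assert_eq!(result, Ok(3));
    println!("{result:?}"); // prints Ok(3)
}
```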