982 Async Join

Advanced · Functional Programming · Tutorial

The Problem

Demonstrate parallel execution in Rust using thread::spawn + join() as a synchronous analog of OCaml's Lwt.both and Lwt.all. Spawn multiple threads for independent computations, then join them all to collect results. Implement both a two-thread join and a parallel map over a vector of tasks.

🎯 Learning Outcomes

  • Implement parallel_both<A, B> spawning two threads and joining both — analog of Lwt.both
  • Implement parallel_map<T, F> spawning one thread per task and collecting results — analog of Lwt.all
  • Understand the Send + 'static bounds: types crossing thread boundaries must be Send; closures must own their data
  • Distinguish sequential join (wait then proceed) from async join (both run concurrently, resolved together)
  • Recognize that thread::spawn + join is the sync version; tokio::join! is the async equivalent

Code Example

    #![allow(clippy::all)]
    // 982: Join Parallel Async
    // Rust: std::thread::spawn + join() — like OCaml's Lwt.both
    
    use std::thread;
    
    // --- Approach 1: Join two threads (Lwt.both analogue) ---
    fn parallel_both<A, B, F1, F2>(f1: F1, f2: F2) -> (A, B)
    where
        A: Send + 'static,
        B: Send + 'static,
        F1: FnOnce() -> A + Send + 'static,
        F2: FnOnce() -> B + Send + 'static,
    {
        let h1 = thread::spawn(f1);
        let h2 = thread::spawn(f2);
        // Both run concurrently; join waits for both
        let a = h1.join().expect("thread 1 panicked");
        let b = h2.join().expect("thread 2 panicked");
        (a, b)
    }
    
    // --- Approach 2: Join N tasks and collect results ---
    fn parallel_map<T, F>(tasks: Vec<F>) -> Vec<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let handles: Vec<_> = tasks.into_iter().map(thread::spawn).collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("task panicked"))
            .collect()
    }
    
    // --- Approach 3: Parallel sum ---
    fn parallel_sum(ns: Vec<i32>) -> i32 {
        let handles: Vec<_> = ns
            .into_iter()
            .map(|n| thread::spawn(move || n * n))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_parallel_both() {
            let (a, b) = parallel_both(|| 6 * 7, || 10 + 20);
            assert_eq!(a, 42);
            assert_eq!(b, 30);
        }
    
        #[test]
        fn test_parallel_map() {
            let mut results = parallel_map(vec![
                Box::new(|| 2 + 2) as Box<dyn FnOnce() -> i32 + Send>,
                Box::new(|| 3 * 3),
                Box::new(|| 10 - 1),
            ]);
            results.sort(); // join() collects in spawn order; sorting just keeps the test order-agnostic
            assert_eq!(results, vec![4, 9, 9]);
        }
    
        #[test]
        fn test_parallel_sum() {
            // 1+4+9+16 = 30
            assert_eq!(parallel_sum(vec![1, 2, 3, 4]), 30);
        }
    
        #[test]
        fn test_both_independent() {
            // Results don't depend on order
            let (x, y) = parallel_both(|| "hello", || 42u32);
            assert_eq!(x, "hello");
            assert_eq!(y, 42);
        }
    
        #[test]
        fn test_empty_parallel_map() {
            let results: Vec<i32> = parallel_map::<i32, fn() -> i32>(vec![]);
            assert!(results.is_empty());
        }
    }

Key Differences

| Aspect | Rust | OCaml |
| --- | --- | --- |
| OS threads | thread::spawn — always a real OS thread | Thread.create — real OS thread |
| Async parallel | tokio::join!(f1, f2) | Lwt.both f1 f2 |
| Parallel domains | std::thread | Domain.spawn (OCaml 5+) |
| Send bound | Required for cross-thread values | No equivalent (GC manages) |
| Join result | Result<T, Box<dyn Any + Send>> | Raises exception on join if domain panicked |

    thread::spawn creates a real OS thread. For high-concurrency workloads with many short tasks, prefer a thread pool (example 923) or async runtime over spawning one thread per task.

    OCaml Approach

    open Lwt
    
    (* Lwt.both: run two promises concurrently, wait for both *)
    let parallel_both f1 f2 =
      Lwt.both (f1 ()) (f2 ())
    
    (* Lwt.all: run a list of promises concurrently *)
    let parallel_map tasks =
      Lwt.all (List.map (fun f -> f ()) tasks)
    
    (* Thread-based parallel in OCaml (5.0+ domains) *)
    let parallel_both_domain f1 f2 =
      let d1 = Domain.spawn f1 in
      let d2 = Domain.spawn f2 in
      let a = Domain.join d1 in
      let b = Domain.join d2 in
      (a, b)
    

    OCaml's Lwt.both cooperatively runs two promises on a single thread (via the Lwt scheduler). For true OS-level parallelism, OCaml 5.0+ Domain.spawn is the equivalent of thread::spawn.


    Deep Comparison

    Join Parallel Async — Comparison

    Core Insight

    Lwt.both and thread::spawn + join both express "run two things concurrently, wait for both". The key difference: Lwt uses cooperative concurrency on one thread; Rust std::thread uses OS threads with true parallelism.

    OCaml Approach

  • Lwt.both p1 p2 runs both promises on the event loop concurrently
  • Returns (v1, v2) when both resolve
  • Lwt.all [p1; p2; p3] for N promises
  • Cooperative — yields at I/O points, single-threaded
  • For true parallelism: OCaml 5 Domains or Thread + mutexes

Rust Approach

  • thread::spawn(f) starts a real OS thread, returns JoinHandle<T>
  • handle.join() blocks until the thread completes, returns Result<T>
  • True parallelism — all cores can be used simultaneously
  • Vec<JoinHandle> pattern for N parallel tasks
  • Send + 'static bounds ensure data is safe to transfer

Comparison Table

| Concept | OCaml (Lwt) | Rust |
| --- | --- | --- |
| Run two in parallel | Lwt.both p1 p2 | spawn(f1); spawn(f2); join both |
| Run N in parallel | Lwt.all [p1; p2; ...] | tasks.map(spawn).map(join) |
| Wait for result | let* (a, b) = Lwt.both ... | h.join().unwrap() |
| Concurrency model | Cooperative / event loop | True parallelism (OS threads) |
| Error propagation | Lwt_result.both | h.join() returns Result |
| Data sharing | Shared heap (GC) | Send + 'static + Arc |

    std vs tokio

| Aspect | std version | tokio version |
| --- | --- | --- |
| Runtime | OS threads via std::thread | Async tasks on the tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |

    Exercises

  • Rewrite parallel_map to use a fixed-size thread pool (from example 923) instead of spawning per task.
  • Add error handling: change task return type to Result<T, String> and handle panics gracefully.
  • Implement parallel_filter<T, F>(items: Vec<T>, pred: F) -> Vec<T> that tests items in parallel.
  • Measure the speedup of parallel_map vs sequential map for 8 CPU-bound tasks on an 8-core machine.
  • Rewrite using tokio::join! and tokio::task::spawn to compare async vs thread-based parallelism.