982 Async Join
Tutorial
The Problem
Demonstrate parallel async execution in Rust using thread::spawn + join() as a synchronous analog of Lwt.both and Lwt.all. Spawn multiple threads for independent computations, then join all threads to collect results. Implement both a two-thread join and a parallel map over a vector of tasks.
🎯 Learning Outcomes
- parallel_both<A, B> spawning two threads and joining both — analog of Lwt.both
- parallel_map<T, F> spawning one thread per task and collecting results — analog of Lwt.all
- Send + 'static bounds: types crossing thread boundaries must be Send; closures must own their data
- Async join: both computations run concurrently and are resolved together
- thread::spawn + join is the sync version; tokio::join! is the async equivalent
Code Example
#![allow(clippy::all)]
// 982: Join Parallel Async
// Rust: std::thread::spawn + join() — like OCaml's Lwt.both
use std::thread;
// --- Approach 1: Join two threads (Lwt.both analogue) ---
fn parallel_both<A, B, F1, F2>(f1: F1, f2: F2) -> (A, B)
where
A: Send + 'static,
B: Send + 'static,
F1: FnOnce() -> A + Send + 'static,
F2: FnOnce() -> B + Send + 'static,
{
let h1 = thread::spawn(f1);
let h2 = thread::spawn(f2);
// Both run concurrently; join waits for both
let a = h1.join().expect("thread 1 panicked");
let b = h2.join().expect("thread 2 panicked");
(a, b)
}
// --- Approach 2: Join N tasks and collect results ---
fn parallel_map<T, F>(tasks: Vec<F>) -> Vec<T>
where
T: Send + 'static,
F: FnOnce() -> T + Send + 'static,
{
let handles: Vec<_> = tasks.into_iter().map(thread::spawn).collect();
handles
.into_iter()
.map(|h| h.join().expect("task panicked"))
.collect()
}
// --- Approach 3: Parallel sum ---
fn parallel_sum(ns: Vec<i32>) -> i32 {
let handles: Vec<_> = ns
.into_iter()
.map(|n| thread::spawn(move || n * n))
.collect();
handles.into_iter().map(|h| h.join().unwrap()).sum()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parallel_both() {
let (a, b) = parallel_both(|| 6 * 7, || 10 + 20);
assert_eq!(a, 42);
assert_eq!(b, 30);
}
#[test]
fn test_parallel_map() {
let mut results = parallel_map(vec![
Box::new(|| 2 + 2) as Box<dyn FnOnce() -> i32 + Send>,
Box::new(|| 3 * 3),
Box::new(|| 10 - 1),
]);
results.sort(); // join preserves input order; the sort is just defensive
assert_eq!(results, vec![4, 9, 9]);
}
#[test]
fn test_parallel_sum() {
// 1+4+9+16 = 30
assert_eq!(parallel_sum(vec![1, 2, 3, 4]), 30);
}
#[test]
fn test_both_independent() {
// Results don't depend on order
let (x, y) = parallel_both(|| "hello", || 42u32);
assert_eq!(x, "hello");
assert_eq!(y, 42);
}
#[test]
fn test_empty_parallel_map() {
let results: Vec<i32> = parallel_map::<i32, fn() -> i32>(vec![]);
assert!(results.is_empty());
}
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| OS threads | thread::spawn — always a real OS thread | Thread.create — OS thread, but only one runs OCaml code at a time |
| Async parallel | tokio::join!(f1, f2) | Lwt.both f1 f2 |
| Parallel domains | std::thread | Domain.spawn (OCaml 5+) |
| Send bound | Required for cross-thread values | No equivalent (GC manages) |
| Join result | join() returns Result<T, Box<dyn Any + Send>> | Domain.join re-raises the exception if the domain panicked |
thread::spawn creates a real OS thread. For high-concurrency workloads with many short tasks, prefer a thread pool (example 923) or async runtime over spawning one thread per task.
OCaml Approach
open Lwt
(* Lwt.both: run two promises concurrently, wait for both *)
let parallel_both f1 f2 =
Lwt.both (f1 ()) (f2 ())
(* Lwt.all: run a list of promises concurrently *)
let parallel_map tasks =
Lwt.all (List.map (fun f -> f ()) tasks)
(* Thread-based parallel in OCaml (5.0+ domains) *)
let parallel_both_domain f1 f2 =
let d1 = Domain.spawn f1 in
let d2 = Domain.spawn f2 in
let a = Domain.join d1 in
let b = Domain.join d2 in
(a, b)
OCaml's Lwt.both cooperatively runs two promises on a single thread (via the Lwt scheduler). For true OS-level parallelism, OCaml 5.0+ Domain.spawn is the equivalent of thread::spawn.
Full Source
#![allow(clippy::all)]
// 982: Join Parallel Async
// Rust: std::thread::spawn + join() — like OCaml's Lwt.both
use std::thread;
// --- Approach 1: Join two threads (Lwt.both analogue) ---
fn parallel_both<A, B, F1, F2>(f1: F1, f2: F2) -> (A, B)
where
A: Send + 'static,
B: Send + 'static,
F1: FnOnce() -> A + Send + 'static,
F2: FnOnce() -> B + Send + 'static,
{
let h1 = thread::spawn(f1);
let h2 = thread::spawn(f2);
// Both run concurrently; join waits for both
let a = h1.join().expect("thread 1 panicked");
let b = h2.join().expect("thread 2 panicked");
(a, b)
}
// --- Approach 2: Join N tasks and collect results ---
fn parallel_map<T, F>(tasks: Vec<F>) -> Vec<T>
where
T: Send + 'static,
F: FnOnce() -> T + Send + 'static,
{
let handles: Vec<_> = tasks.into_iter().map(thread::spawn).collect();
handles
.into_iter()
.map(|h| h.join().expect("task panicked"))
.collect()
}
// --- Approach 3: Parallel sum ---
fn parallel_sum(ns: Vec<i32>) -> i32 {
let handles: Vec<_> = ns
.into_iter()
.map(|n| thread::spawn(move || n * n))
.collect();
handles.into_iter().map(|h| h.join().unwrap()).sum()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parallel_both() {
let (a, b) = parallel_both(|| 6 * 7, || 10 + 20);
assert_eq!(a, 42);
assert_eq!(b, 30);
}
#[test]
fn test_parallel_map() {
let mut results = parallel_map(vec![
Box::new(|| 2 + 2) as Box<dyn FnOnce() -> i32 + Send>,
Box::new(|| 3 * 3),
Box::new(|| 10 - 1),
]);
results.sort(); // join preserves input order; the sort is just defensive
assert_eq!(results, vec![4, 9, 9]);
}
#[test]
fn test_parallel_sum() {
// 1+4+9+16 = 30
assert_eq!(parallel_sum(vec![1, 2, 3, 4]), 30);
}
#[test]
fn test_both_independent() {
// Results don't depend on order
let (x, y) = parallel_both(|| "hello", || 42u32);
assert_eq!(x, "hello");
assert_eq!(y, 42);
}
#[test]
fn test_empty_parallel_map() {
let results: Vec<i32> = parallel_map::<i32, fn() -> i32>(vec![]);
assert!(results.is_empty());
}
}
Deep Comparison
Join Parallel Async — Comparison
Core Insight
Lwt.both and thread::spawn + join both express "run two things concurrently, wait for both". The key difference: Lwt uses cooperative concurrency on one thread; Rust std::thread uses OS threads with true parallelism.
OCaml Approach
- Lwt.both p1 p2 runs both promises on the event loop concurrently
- Resolves to (v1, v2) when both resolve
- Lwt.all [p1; p2; p3] for N promises
- Thread module + mutexes for OS-level threads
Rust Approach
- thread::spawn(f) starts a real OS thread and returns JoinHandle<T>
- handle.join() blocks until the thread completes and returns a Result
- Vec<JoinHandle> pattern for N parallel tasks
- Send + 'static bounds ensure data is safe to transfer
Comparison Table
| Concept | OCaml (Lwt) | Rust |
|---|---|---|
| Run two in parallel | Lwt.both p1 p2 | spawn(f1); spawn(f2); join both |
| Run N in parallel | Lwt.all [p1; p2; ...] | tasks.map(spawn).map(join) |
| Wait for result | let* (a,b) = Lwt.both ... | h.join().unwrap() |
| Concurrency model | Cooperative / event loop | True parallelism (OS threads) |
| Error propagation | Lwt_result.both | h.join() returns Result |
| Data sharing | Shared heap (GC) | Send + 'static + Arc |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
1. Rewrite parallel_map to use a fixed-size thread pool (from example 923) instead of spawning per task.
2. Change parallel_map to return Result<T, String> and handle panics gracefully.
3. Implement parallel_filter<T, F>(items: Vec<T>, pred: F) -> Vec<T> that tests items in parallel.
4. Benchmark parallel_map vs sequential map for 8 CPU-bound tasks on an 8-core machine.
5. Port the examples to tokio::join! and tokio::task::spawn to compare async vs thread-based parallelism.