324: Running Futures Concurrently with join!
Tutorial Video
Text description (accessibility)
This video demonstrates the "324: Running Futures Concurrently with join!" functional Rust example. Difficulty level: Advanced. Key concepts covered: Functional Programming. Sequential async execution wastes time when multiple operations are independent: fetching user data and fetching their posts can happen simultaneously. Key difference from OCaml: 1. **Macro vs function**: Rust's `tokio::join!` is a macro enabling heterogeneous future types; `futures::join_all()` is a function for homogeneous types.
Tutorial
The Problem
Sequential async execution wastes time when multiple operations are independent: fetching user data and fetching their posts can happen simultaneously. join! (or tokio::join!) starts all futures concurrently and waits for all to complete. The total time equals the slowest task, not the sum of all tasks. This is the fundamental tool for parallelizing independent I/O operations in async Rust.
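The async `join!` macro itself needs an executor (for example the tokio runtime), but the timing property can be illustrated on its own with plain OS threads, the same mechanism the implementations below use. This is a minimal sketch: two independent "I/O" tasks of 30 ms and 50 ms run concurrently, so the total tracks the slower one (~50 ms), not the sum (~80 ms).

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Run two independent sleeping tasks concurrently and report the
/// wall-clock time in milliseconds. Concurrent total ~= max(30, 50),
/// not 30 + 50.
fn concurrent_total_ms() -> u128 {
    let start = Instant::now();
    let a = thread::spawn(|| thread::sleep(Duration::from_millis(30)));
    let b = thread::spawn(|| thread::sleep(Duration::from_millis(50)));
    a.join().unwrap();
    b.join().unwrap();
    start.elapsed().as_millis()
}

fn main() {
    let elapsed = concurrent_total_ms();
    println!("elapsed: {elapsed} ms");
    // Well under the 80 ms a sequential run would need.
    assert!(elapsed < 80);
}
```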
🎯 Learning Outcomes
- Understand how join! starts all futures simultaneously and waits for all to complete
- Distinguish join! (wait for all) from select! (wait for first)
- Predict total time as max(task_times), not sum(task_times), with join!
- Apply join! to fetch independent data sources concurrently

Code Example
fn join_all<T: Send + 'static>(tasks: Vec<Box<dyn FnOnce() -> T + Send>>) -> Vec<T> {
    tasks.into_iter()
        .map(|f| thread::spawn(f))
        .collect::<Vec<_>>()
        .into_iter()
        .map(|h| h.join().unwrap())
        .collect()
}

Key Differences
- tokio::join! is a macro enabling heterogeneous future types; futures::join_all() is a function for homogeneous types.
- try_join! fails fast if any future returns Err; join! waits for every future and returns a tuple that may include errors.
- join! enforces a structured scope: all spawned work completes before proceeding; spawn() allows detached tasks.
- join! is concurrent (single thread, cooperative), not necessarily parallel; rayon::join is parallel (multi-thread).

OCaml Approach
OCaml's Lwt.both and Lwt.all provide equivalent concurrent execution:
(* Wait for both: total time = max(a, b) *)
let* (a, b) = Lwt.both (fetch_a ()) (fetch_b ())
(* Wait for all: Lwt.all returns list of results *)
let* results = Lwt.all [fetch_a (); fetch_b (); fetch_c ()]
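For comparison, a thread-based Rust analogue of Lwt.both can be sketched as follows (the `both` name and signature are illustrative; the join_pair function in the Full Source generalizes the same idea). It runs two independent tasks concurrently and returns both results as a tuple, so total time = max(a, b):

```rust
use std::thread;

/// Thread-based analogue of OCaml's Lwt.both: spawn both tasks,
/// then wait for both; results come back in a tuple.
fn both<A, B>(
    fa: impl FnOnce() -> A + Send + 'static,
    fb: impl FnOnce() -> B + Send + 'static,
) -> (A, B)
where
    A: Send + 'static,
    B: Send + 'static,
{
    let ha = thread::spawn(fa);
    let hb = thread::spawn(fb);
    (ha.join().unwrap(), hb.join().unwrap())
}

fn main() {
    // Heterogeneous result types, like tokio::join! allows.
    let (n, s) = both(|| 1 + 1, || "posts".to_string());
    println!("{n} {s}");
}
```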
Full Source
#![allow(clippy::all)]
//! # Running Futures Concurrently with join!
//!
//! Demonstrates concurrent execution where all tasks run simultaneously
//! and we wait for ALL of them to complete. Total time is max(individual), not sum.
use std::thread;
use std::time::Duration;
/// A slow addition that simulates I/O latency.
pub fn slow_add(a: i32, b: i32, delay_ms: u64) -> i32 {
    thread::sleep(Duration::from_millis(delay_ms));
    a + b
}
/// Approach 1: Join all tasks using threads.
/// Spawns all tasks first, then waits for all to complete.
/// Time is max(tasks), not sum(tasks).
pub fn join_all<T, F>(tasks: Vec<F>) -> Vec<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    // Phase 1: spawn everything (all start running now)
    let handles: Vec<_> = tasks.into_iter().map(|f| thread::spawn(f)).collect();
    // Phase 2: collect results (wait for each to finish)
    handles
        .into_iter()
        .map(|h| h.join().expect("task panicked"))
        .collect()
}
/// Approach 2: Join with labels for debugging.
pub fn join_all_labeled<T, F>(tasks: Vec<(&'static str, F)>) -> Vec<(&'static str, T)>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let handles: Vec<_> = tasks
        .into_iter()
        .map(|(label, f)| {
            let handle = thread::spawn(f);
            (label, handle)
        })
        .collect();
    handles
        .into_iter()
        .map(|(label, h)| (label, h.join().expect("task panicked")))
        .collect()
}
/// Approach 3: Join exactly two tasks and return a tuple.
/// More ergonomic for common two-task patterns.
pub fn join_pair<A, B, FA, FB>(task_a: FA, task_b: FB) -> (A, B)
where
    A: Send + 'static,
    B: Send + 'static,
    FA: FnOnce() -> A + Send + 'static,
    FB: FnOnce() -> B + Send + 'static,
{
    let handle_a = thread::spawn(task_a);
    let handle_b = thread::spawn(task_b);
    (
        handle_a.join().expect("task A panicked"),
        handle_b.join().expect("task B panicked"),
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    #[test]
    fn test_join_all_returns_all_results() {
        let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> =
            vec![Box::new(|| 1 + 1), Box::new(|| 2 + 2), Box::new(|| 3 + 3)];
        let results = join_all(tasks);
        assert_eq!(results, vec![2, 4, 6]);
    }

    #[test]
    fn test_join_all_concurrent_faster_than_sequential() {
        let start = Instant::now();
        let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> = vec![
            Box::new(|| {
                thread::sleep(Duration::from_millis(30));
                1
            }),
            Box::new(|| {
                thread::sleep(Duration::from_millis(30));
                2
            }),
        ];
        let _ = join_all(tasks);
        // If sequential, would take ~60ms. Concurrent should be ~30ms.
        assert!(
            start.elapsed() < Duration::from_millis(55),
            "Should be concurrent, not sequential"
        );
    }

    #[test]
    fn test_join_all_preserves_order() {
        let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> = vec![
            Box::new(|| {
                thread::sleep(Duration::from_millis(30));
                1
            }),
            Box::new(|| {
                thread::sleep(Duration::from_millis(10));
                2
            }),
            Box::new(|| {
                thread::sleep(Duration::from_millis(20));
                3
            }),
        ];
        let results = join_all(tasks);
        // Order should match input order, not completion order
        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_join_pair_different_types() {
        let (s, n) = join_pair(|| "hello".to_string(), || 42);
        assert_eq!(s, "hello");
        assert_eq!(n, 42);
    }

    #[test]
    fn test_join_all_labeled() {
        let tasks: Vec<(&'static str, Box<dyn FnOnce() -> i32 + Send>)> =
            vec![("first", Box::new(|| 10)), ("second", Box::new(|| 20))];
        let results = join_all_labeled(tasks);
        assert_eq!(results, vec![("first", 10), ("second", 20)]);
    }

    #[test]
    fn test_join_all_empty() {
        let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> = vec![];
        let results = join_all(tasks);
        assert!(results.is_empty());
    }
}
Deep Comparison
OCaml vs Rust: Join Futures
Parallel Execution
OCaml:
let parallel tasks =
  let threads = List.map (fun f -> Thread.create f ()) tasks in
  List.iter Thread.join threads
Rust:
fn join_all<T: Send + 'static>(tasks: Vec<Box<dyn FnOnce() -> T + Send>>) -> Vec<T> {
    tasks.into_iter()
        .map(|f| thread::spawn(f))
        .collect::<Vec<_>>()
        .into_iter()
        .map(|h| h.join().unwrap())
        .collect()
}
Key Differences
| Aspect | OCaml | Rust |
|---|---|---|
| Return values | None (unit) | Vec<T> collected |
| Thread creation | Thread.create f () | thread::spawn(f) |
| Waiting | Thread.join | handle.join() |
| Type constraints | None | Send + 'static |
| Error handling | Exceptions | Result from join |
| Result order | N/A | Preserved |
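The "Error handling" row can be demonstrated directly: a panic inside a spawned task does not unwind into the caller, it comes back as an Err from handle.join(). A minimal sketch (the run_task helper is illustrative, not part of the example above):

```rust
use std::thread;

/// Run one task on its own thread; a panic in the task surfaces
/// as Err to the caller instead of crashing the program.
fn run_task(f: impl FnOnce() -> i32 + Send + 'static) -> Result<i32, String> {
    thread::spawn(f)
        .join()
        .map_err(|_| "task panicked".to_string())
}

fn main() {
    assert_eq!(run_task(|| 21 * 2), Ok(42));
    // Unlike an uncaught OCaml exception, this failure is a value
    // the caller can inspect and recover from.
    assert_eq!(run_task(|| panic!("boom")), Err("task panicked".to_string()));
    println!("caller survived the panicked task");
}
```

This is the contract join_all relies on: each expect("task panicked") in the Full Source is a deliberate choice to re-raise the failure, but a caller could equally propagate the Result.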
Exercises
1. Use join_all to run 5 tasks with varying delays concurrently, and measure the wall-clock time.
2. Write fetch_all(urls: Vec<Url>) -> Vec<Result<Response, Error>> that fetches all URLs concurrently using join_all.
3. Verify that join! with 3 tasks of 100ms, 200ms, and 300ms takes ~300ms total, not the ~600ms sequential execution would take.