324 Advanced

324: Running Futures Concurrently with join!

Functional Programming

Tutorial

The Problem

Sequential async execution wastes time when operations are independent: fetching a user's data and fetching their posts can proceed at the same time. futures::join! (or tokio::join!) polls all of its futures concurrently within the current task and completes once every one of them has finished. Total time is roughly that of the slowest task, not the sum of all tasks. This makes it the basic tool for overlapping independent I/O operations in async Rust. The code in this example simulates the same behaviour with OS threads rather than an async runtime.
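The max-versus-sum claim can be checked with a small timing sketch. It follows the thread-based style of the Full Source rather than a real async runtime, and the names slow_value, run_sequential, and run_concurrent, as well as the 40 ms and 60 ms delays, are assumptions made up for illustration.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for an I/O call: sleeps, then returns `value`.
fn slow_value(value: i32, delay_ms: u64) -> i32 {
    thread::sleep(Duration::from_millis(delay_ms));
    value
}

/// Runs both calls one after the other: elapsed time is at least 40 + 60 ms.
fn run_sequential() -> ((i32, i32), Duration) {
    let start = Instant::now();
    let user = slow_value(1, 40);
    let posts = slow_value(2, 60);
    ((user, posts), start.elapsed())
}

/// Starts both calls before waiting on either: elapsed time is at least
/// max(40, 60) ms, plus whatever thread start-up costs on this machine.
fn run_concurrent() -> ((i32, i32), Duration) {
    let start = Instant::now();
    let user_handle = thread::spawn(|| slow_value(1, 40));
    let posts_handle = thread::spawn(|| slow_value(2, 60));
    let user = user_handle.join().expect("user task panicked");
    let posts = posts_handle.join().expect("posts task panicked");
    ((user, posts), start.elapsed())
}
```

Both functions return the same pair of values; only the elapsed time differs. On a lightly loaded machine the concurrent run should land near 60 ms and the sequential run near 100 ms, though exact figures depend on the scheduler.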

🎯 Learning Outcomes

  • Understand that join! starts all futures simultaneously and waits for all to complete
  • Distinguish join! (wait for all) from select! (wait for first)
  • Recognize that total time is max(task_times) not sum(task_times) with join!
  • Apply join! to fetch independent data sources concurrently
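The difference between waiting for all results and waiting for the first one can be sketched with threads and a channel. This is only an analogy for join! and select!, not their implementation: wait_for_all and wait_for_first are hypothetical helper names, and unlike select!, the thread version leaves the slower tasks running in the background after the first result arrives.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// "join" flavour: every task must finish; results keep input order.
fn wait_for_all(tasks: Vec<(&'static str, u64)>) -> Vec<&'static str> {
    let handles: Vec<_> = tasks
        .into_iter()
        .map(|(label, delay_ms)| {
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(delay_ms));
                label
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("task panicked"))
        .collect()
}

/// "select" flavour: return as soon as any task reports a result.
/// Returns None when no tasks were given.
fn wait_for_first(tasks: Vec<(&'static str, u64)>) -> Option<&'static str> {
    let (tx, rx) = mpsc::channel();
    for (label, delay_ms) in tasks {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay_ms));
            // The receiver may already be gone; ignore the send error.
            let _ = tx.send(label);
        });
    }
    // Drop the original sender so recv() fails instead of blocking
    // forever when the task list is empty.
    drop(tx);
    rx.recv().ok()
}
```

With one task delayed by 200 ms and another by 10 ms, wait_for_all returns both labels in input order after roughly 200 ms, while wait_for_first is expected to return the 10 ms task's label almost immediately.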
    Code Example

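    use std::thread;

    // Spawn every task first, then join the handles in input order.
    fn join_all<T: Send + 'static>(tasks: Vec<Box<dyn FnOnce() -> T + Send>>) -> Vec<T> {
        tasks
            .into_iter()
            .map(|f| thread::spawn(f))
            .collect::<Vec<_>>() // forces all threads to start before any join
            .into_iter()
            .map(|h| h.join().expect("task panicked"))
            .collect()
    }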

    Key Differences

  • Macro vs function: Rust's tokio::join! is a macro enabling heterogeneous future types; futures::future::join_all() is a function that takes a collection of futures of a single type.
  • Error propagation: try_join! returns early as soon as any future returns Err; join! always waits for every future and returns a tuple whose elements may themselves be Results.
  • Structured concurrency: join! gives a structured scope, since every future passed to it completes before execution proceeds; spawn() allows detached tasks.
  • vs parallel: join! is concurrent (its futures are polled cooperatively within one task), not necessarily parallel; rayon::join (a function, not a macro) may run its two closures in parallel on a thread pool.
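The "concurrent, not necessarily parallel" point can be made concrete with a hand-written, single-threaded join built only on the standard library. This is a teaching sketch under stated assumptions, not how futures::join! or tokio::join! are implemented: Worker, Join2, block_on, and run_demo are hypothetical names, the executor simply busy-polls, and Join2 requires Unpin futures to avoid pin projection.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future that logs one entry per poll and needs `steps` polls of work.
struct Worker<'a> {
    name: &'static str,
    step: u32,
    steps: u32,
    log: &'a RefCell<Vec<String>>,
}

impl<'a> Future for Worker<'a> {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.step == self.steps {
            return Poll::Ready(self.steps);
        }
        self.log.borrow_mut().push(format!("{}{}", self.name, self.step));
        self.step += 1;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending // yield so the sibling future gets a turn
    }
}

/// Polls both futures on every wake-up; ready once both have finished.
struct Join2<A: Future, B: Future> {
    a: A,
    b: B,
    out_a: Option<A::Output>,
    out_b: Option<B::Output>,
}

impl<A, B> Future for Join2<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
    A::Output: Unpin,
    B::Output: Unpin,
{
    type Output = (A::Output, B::Output);
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        if this.out_a.is_none() {
            if let Poll::Ready(v) = Pin::new(&mut this.a).poll(cx) {
                this.out_a = Some(v);
            }
        }
        if this.out_b.is_none() {
            if let Poll::Ready(v) = Pin::new(&mut this.b).poll(cx) {
                this.out_b = Some(v);
            }
        }
        if this.out_a.is_some() && this.out_b.is_some() {
            Poll::Ready((this.out_a.take().unwrap(), this.out_b.take().unwrap()))
        } else {
            Poll::Pending
        }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor; enough for this demo only.
fn block_on<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = Pin::new(&mut fut).poll(&mut cx) {
            return v;
        }
    }
}

/// Joins a 2-step worker "a" with a 3-step worker "b" on one thread.
fn run_demo() -> (Vec<String>, (u32, u32)) {
    let log = RefCell::new(Vec::new());
    let joined = Join2 {
        a: Worker { name: "a", step: 0, steps: 2, log: &log },
        b: Worker { name: "b", step: 0, steps: 3, log: &log },
        out_a: None,
        out_b: None,
    };
    let totals = block_on(joined);
    (log.into_inner(), totals)
}
```

Calling run_demo() yields the log a0, b0, a1, b1, b2 and the tuple (2, 3): the two workers interleave on a single thread, and the join completes only after the slower worker has finished. No second thread is involved, which is the sense in which join! is concurrent rather than parallel.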
    OCaml Approach

    OCaml's Lwt.both and Lwt.all provide equivalent concurrent execution:

    (* Wait for both: total time = max(a, b) *)
    let* (a, b) = Lwt.both (fetch_a ()) (fetch_b ())
    
    (* Wait for all: Lwt.all returns list of results *)
    let* results = Lwt.all [fetch_a (); fetch_b (); fetch_c ()]
    

    Full Source

    #![allow(clippy::all)]
    //! # Running Futures Concurrently with join!
    //!
    //! Demonstrates concurrent execution where all tasks run simultaneously
    //! and we wait for ALL of them to complete. Total time is max(individual), not sum.
    
    use std::thread;
    use std::time::Duration;
    
    /// A slow addition that simulates I/O latency.
    pub fn slow_add(a: i32, b: i32, delay_ms: u64) -> i32 {
        thread::sleep(Duration::from_millis(delay_ms));
        a + b
    }
    
    /// Approach 1: Join all tasks using threads.
    /// Spawns all tasks first, then waits for all to complete.
    /// Time is max(tasks), not sum(tasks).
    pub fn join_all<T, F>(tasks: Vec<F>) -> Vec<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        // Phase 1: spawn everything (all start running now)
        let handles: Vec<_> = tasks.into_iter().map(|f| thread::spawn(f)).collect();
    
        // Phase 2: collect results (wait for each to finish)
        handles
            .into_iter()
            .map(|h| h.join().expect("task panicked"))
            .collect()
    }
    
    /// Approach 2: Join with labels for debugging.
    pub fn join_all_labeled<T, F>(tasks: Vec<(&'static str, F)>) -> Vec<(&'static str, T)>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let handles: Vec<_> = tasks
            .into_iter()
            .map(|(label, f)| {
                let handle = thread::spawn(f);
                (label, handle)
            })
            .collect();
    
        handles
            .into_iter()
            .map(|(label, h)| (label, h.join().expect("task panicked")))
            .collect()
    }
    
    /// Approach 3: Join exactly two tasks and return a tuple.
    /// More ergonomic for common two-task patterns.
    pub fn join_pair<A, B, FA, FB>(task_a: FA, task_b: FB) -> (A, B)
    where
        A: Send + 'static,
        B: Send + 'static,
        FA: FnOnce() -> A + Send + 'static,
        FB: FnOnce() -> B + Send + 'static,
    {
        let handle_a = thread::spawn(task_a);
        let handle_b = thread::spawn(task_b);
        (
            handle_a.join().expect("task A panicked"),
            handle_b.join().expect("task B panicked"),
        )
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        use std::time::Instant;
    
        #[test]
        fn test_join_all_returns_all_results() {
            let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> =
                vec![Box::new(|| 1 + 1), Box::new(|| 2 + 2), Box::new(|| 3 + 3)];
            let results = join_all(tasks);
            assert_eq!(results, vec![2, 4, 6]);
        }
    
        #[test]
        fn test_join_all_concurrent_faster_than_sequential() {
            let start = Instant::now();
    
            let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> = vec![
                Box::new(|| {
                    thread::sleep(Duration::from_millis(30));
                    1
                }),
                Box::new(|| {
                    thread::sleep(Duration::from_millis(30));
                    2
                }),
            ];
            let _ = join_all(tasks);
    
            // If sequential, would take ~60ms. Concurrent should be ~30ms.
            assert!(
                start.elapsed() < Duration::from_millis(55),
                "Should be concurrent, not sequential"
            );
        }
    
        #[test]
        fn test_join_all_preserves_order() {
            let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> = vec![
                Box::new(|| {
                    thread::sleep(Duration::from_millis(30));
                    1
                }),
                Box::new(|| {
                    thread::sleep(Duration::from_millis(10));
                    2
                }),
                Box::new(|| {
                    thread::sleep(Duration::from_millis(20));
                    3
                }),
            ];
            let results = join_all(tasks);
            // Order should match input order, not completion order
            assert_eq!(results, vec![1, 2, 3]);
        }
    
        #[test]
        fn test_join_pair_different_types() {
            let (s, n) = join_pair(|| "hello".to_string(), || 42);
            assert_eq!(s, "hello");
            assert_eq!(n, 42);
        }
    
        #[test]
        fn test_join_all_labeled() {
            let tasks: Vec<(&'static str, Box<dyn FnOnce() -> i32 + Send>)> =
                vec![("first", Box::new(|| 10)), ("second", Box::new(|| 20))];
            let results = join_all_labeled(tasks);
            assert_eq!(results, vec![("first", 10), ("second", 20)]);
        }
    
        #[test]
        fn test_join_all_empty() {
            let tasks: Vec<Box<dyn FnOnce() -> i32 + Send>> = vec![];
            let results = join_all(tasks);
            assert!(results.is_empty());
        }
    }

    Deep Comparison

    OCaml vs Rust: Join Futures

    Parallel Execution

    OCaml:

    let parallel tasks =
      let threads = List.map (fun f -> Thread.create f ()) tasks in
      List.iter Thread.join threads
    

    Rust:

    fn join_all<T: Send + 'static>(tasks: Vec<Box<dyn FnOnce()->T+Send>>) -> Vec<T> {
        tasks.into_iter()
            .map(|f| thread::spawn(f))
            .collect::<Vec<_>>()
            .into_iter()
            .map(|h| h.join().unwrap())
            .collect()
    }
    

    Key Differences

    Aspect            OCaml               Rust
    Return values     None (unit)         Vec<T> collected
    Thread creation   Thread.create f ()  thread::spawn(f)
    Waiting           Thread.join         handle.join()
    Type constraints  None                Send + 'static
    Error handling    Exceptions          Result from join
    Result order      N/A                 Preserved
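The "Result from join" row refers to JoinHandle::join returning Err when the spawned thread panicked. The snippets above unwrap that Result; a variant that surfaces it to the caller might look like the following sketch. The name join_all_checked and the choice to turn the panic payload into a String are assumptions for illustration, not part of the original source.

```rust
use std::thread;

/// Hypothetical variant of join_all: reports a panicking task as Err
/// instead of propagating the panic to the caller.
pub fn join_all_checked<T, F>(tasks: Vec<F>) -> Vec<Result<T, String>>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    // Phase 1: spawn everything so all tasks run concurrently.
    let handles: Vec<_> = tasks.into_iter().map(|f| thread::spawn(f)).collect();

    // Phase 2: join in input order, converting panic payloads to text.
    handles
        .into_iter()
        .map(|h| {
            h.join().map_err(|payload| {
                // A panic payload is usually a &str or a String.
                if let Some(s) = payload.downcast_ref::<&str>() {
                    s.to_string()
                } else if let Some(s) = payload.downcast_ref::<String>() {
                    s.clone()
                } else {
                    "unknown panic payload".to_string()
                }
            })
        })
        .collect()
}
```

This mirrors join! rather than try_join!: every task is waited on, and failures appear as individual Err entries in the result vector. The default panic hook still prints the panic message to stderr.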

    Exercises

  • Time the difference between sequential and join_all concurrent execution of 5 tasks with varying delays — measure wall-clock time.
  • Implement a fetch_all(urls: Vec<Url>) -> Vec<Result<Response, Error>> that fetches all URLs concurrently using join_all.
  • Show that join! with 3 tasks of 100ms, 200ms, and 300ms takes ~300ms total, not ~600ms as sequential would.