327 Advanced

327: Spawning Concurrent Tasks

Functional Programming


Tutorial

The Problem

Fire-and-forget background tasks, parallel work distribution, and pipeline stages all require spawning independent units of work. thread::spawn (synchronous) and tokio::spawn (async) start tasks that run independently and optionally return results via JoinHandle. Understanding how to spawn tasks, collect results, and handle panics is foundational for concurrent Rust programming.
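The two spawning styles named above can be sketched with the standard library alone. The function names `joined_sum` and `fire_and_forget` are hypothetical, introduced only for illustration, and the channel in the second function exists solely so a caller can observe that the detached thread ran.

```rust
use std::sync::mpsc;
use std::thread;

/// Joined task: spawn, then wait for the result through the JoinHandle.
fn joined_sum(values: Vec<u64>) -> u64 {
    let handle = thread::spawn(move || values.iter().sum::<u64>());
    // join() blocks until the thread finishes; Err means the thread panicked.
    handle.join().expect("worker thread panicked")
}

/// Fire-and-forget task: the JoinHandle is dropped, so the thread is detached.
/// The returned receiver is an illustrative way to observe completion.
fn fire_and_forget(label: &'static str) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    // Dropping the handle detaches the thread; it keeps running on its own.
    drop(thread::spawn(move || {
        // The receiver may already be gone, so a send error is ignored.
        let _ = tx.send(format!("{label} finished"));
    }));
    rx
}
```

Calling `joined_sum(vec![1, 2, 3, 4])` waits for the worker and returns its sum, while `fire_and_forget("cleanup")` returns immediately and the caller may or may not read the completion message later.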

🎯 Learning Outcomes

  • Use thread::spawn() to start independent background tasks
  • Collect results from multiple threads using JoinHandle::join()
  • Handle panics in spawned threads via join() returning Result
  • Understand the difference between detached tasks (fire-and-forget) and joined tasks (collect results)

    Code Example

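    use std::thread;
    use std::time::Duration;

    // Starts a thread that sleeps for `delay_ms`, then returns a status message.
    fn spawn_worker(id: usize, delay_ms: u64) -> thread::JoinHandle<String> {
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay_ms));
            format!("worker-{id} done after {delay_ms}ms")
        })
    }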
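As a usage sketch, the handles can be collected first and joined afterwards. `run_workers` is a hypothetical helper that is not part of the original source, and the worker is repeated here only so the block compiles on its own.

```rust
use std::thread;
use std::time::Duration;

// Same worker as above, repeated so the sketch is self-contained.
fn spawn_worker(id: usize, delay_ms: u64) -> thread::JoinHandle<String> {
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(delay_ms));
        format!("worker-{id} done after {delay_ms}ms")
    })
}

/// Hypothetical helper: start `count` workers, then join them in spawn order.
fn run_workers(count: usize, delay_ms: u64) -> Vec<String> {
    // Spawn everything first so the workers sleep concurrently.
    let handles: Vec<_> = (0..count).map(|id| spawn_worker(id, delay_ms)).collect();
    // Joining in order yields results in spawn order, whatever the finish order.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect()
}
```

Collecting the handles into a `Vec` before joining matters: joining inside the same iterator chain that spawns would run the workers one after another.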

    Key Differences

  • Panic isolation: Rust's join() returns Result<T, Box<dyn Any + Send + 'static>>, so a panicking thread produces Err instead of taking down the caller; OCaml's Domain.join re-raises the domain's uncaught exception in the joining domain.
  • Return value: JoinHandle<T>::join() blocks and returns the thread's return value wrapped in Ok; awaiting a tokio::task::JoinHandle<T> yields the task's value asynchronously, wrapped in Result<T, JoinError>.
  • Detach: Dropping a JoinHandle without calling join() detaches the thread. It keeps running independently and its result is discarded, but it is terminated when the main thread exits.
  • Stack size: thread::Builder::new().stack_size(N) sets a spawned thread's stack size in bytes, which helps with deeply recursive work.
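The panic-isolation and stack-size points can be sketched as follows. Both helpers, `run_isolated` and `depth_with_big_stack`, are hypothetical names chosen for illustration, and the stack size is whatever the caller passes in rather than a recommended value.

```rust
use std::thread;

/// Hypothetical helper: runs `f` on a new thread and turns a panic
/// into an error message instead of propagating it.
fn run_isolated<T, F>(f: F) -> Result<T, String>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    thread::spawn(f).join().map_err(|payload| {
        // The payload is a Box<dyn Any + Send>; panic!("...") usually carries
        // a &str or a String, so try both before giving up.
        if let Some(s) = payload.downcast_ref::<&str>() {
            (*s).to_string()
        } else if let Some(s) = payload.downcast_ref::<String>() {
            s.clone()
        } else {
            "unknown panic payload".to_string()
        }
    })
}

/// Hypothetical helper: runs a non-tail-recursive function on a thread
/// whose stack size is set through thread::Builder.
fn depth_with_big_stack(stack_bytes: usize, depth: u64) -> u64 {
    fn recurse(n: u64) -> u64 {
        if n == 0 { 0 } else { 1 + recurse(n - 1) }
    }
    thread::Builder::new()
        .stack_size(stack_bytes)
        .spawn(move || recurse(depth))
        .expect("failed to spawn thread") // Builder::spawn returns io::Result
        .join()
        .expect("thread panicked")
}
```

The panicking thread still prints its panic message to stderr; `run_isolated` only keeps the panic from unwinding into the caller. Unlike `thread::spawn`, `Builder::spawn` reports a failure to create the thread as an `io::Error` rather than panicking.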

    OCaml Approach

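    OCaml 5's Domain module provides multi-core parallelism: Domain.spawn starts a domain, and Domain.join waits for it and returns its result. With Lwt, Lwt.async fires a background task and Lwt.join waits for a list of promises. The example below uses domains: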

    let spawn_worker id =
      Domain.spawn (fun () ->
        Unix.sleepf 0.01;
        Printf.sprintf "worker-%d done" id)
    
    let collect_results count =
      let domains = List.init count spawn_worker in
      List.map Domain.join domains
    

    Full Source

    #![allow(clippy::all)]
    //! # Spawning Concurrent Tasks
    //!
    //! Demonstrates spawning tasks to run in the background independently.
    //! Fire and forget, or collect results later via handles.
    
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    /// Spawns a worker that completes after a delay and returns a message.
    pub fn spawn_worker(id: usize, delay_ms: u64) -> thread::JoinHandle<String> {
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay_ms));
            format!("worker-{} done after {}ms", id, delay_ms)
        })
    }
    
    /// Spawns multiple workers and returns their handles.
    pub fn spawn_workers(count: usize) -> Vec<thread::JoinHandle<String>> {
        (0..count)
            .map(|i| spawn_worker(i, ((count - i) * 10) as u64))
            .collect()
    }
    
    /// Spawns workers that send results through a channel.
    /// Returns results in completion order (not spawn order).
    pub fn spawn_with_channel(n: usize) -> Vec<String> {
        let (tx, rx) = mpsc::channel();
    
        for i in 0..n {
            let tx = tx.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis((n - i) as u64 * 5));
                tx.send(format!("task-{}", i)).unwrap();
            });
        }
    
        drop(tx); // Important: drop original sender so receiver knows when to stop
        rx.into_iter().collect()
    }
    
    /// Spawns a task that returns a computed value.
    pub fn spawn_compute<T, F>(f: F) -> thread::JoinHandle<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        thread::spawn(f)
    }
    
    /// Spawns N parallel computations and collects results.
    pub fn parallel_map<T, U, F>(items: Vec<T>, f: F) -> Vec<U>
    where
        T: Send + 'static,
        U: Send + 'static,
        F: Fn(T) -> U + Send + Sync + 'static + Clone,
    {
        let handles: Vec<_> = items
            .into_iter()
            .map(|item| {
                let f = f.clone();
                thread::spawn(move || f(item))
            })
            .collect();
    
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_spawn_worker_returns_message() {
            let handle = spawn_worker(42, 1);
            let result = handle.join().unwrap();
            assert!(result.contains("worker-42"));
        }
    
        #[test]
        fn test_spawn_workers_all_complete() {
            let handles = spawn_workers(3);
            let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
            assert_eq!(results.len(), 3);
        }
    
        #[test]
        fn test_spawn_with_channel_collects_all() {
            let results = spawn_with_channel(5);
            assert_eq!(results.len(), 5);
            // Results may be in any order due to concurrency
        }
    
        #[test]
        fn test_spawn_compute() {
            let handle = spawn_compute(|| 2 + 2);
            assert_eq!(handle.join().unwrap(), 4);
        }
    
        #[test]
        fn test_parallel_map() {
            let items = vec![1, 2, 3, 4, 5];
            let results = parallel_map(items, |x| x * x);
            assert_eq!(results, vec![1, 4, 9, 16, 25]);
        }
    
        #[test]
        fn test_parallel_map_preserves_order() {
            let items = vec!["a", "bb", "ccc"];
            let results = parallel_map(items, |s| s.len());
            assert_eq!(results, vec![1, 2, 3]);
        }
    }

    Deep Comparison

    OCaml vs Rust: Spawning Tasks

    Spawn Worker

    OCaml:

    let spawn_worker id delay =
      Thread.create (fun () ->
        Thread.delay delay;
        Printf.sprintf "worker-%d done" id
      ) ()
    

    Rust:

    fn spawn_worker(id: usize, delay_ms: u64) -> thread::JoinHandle<String> {
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay_ms));
            format!("worker-{id} done after {delay_ms}ms")
        })
    }
    

    Key Differences

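    | Aspect          | OCaml                                                                | Rust                   |
    |-----------------|----------------------------------------------------------------------|------------------------|
    | Return type     | Thread.t                                                             | JoinHandle<T>          |
    | Thread creation | Thread.create f ()                                                   | thread::spawn(closure) |
    | Getting result  | N/A (Thread.create discards the result; Thread.join returns unit)    | handle.join().unwrap() |
    | Move semantics  | Implicit                                                             | Explicit move          |
    | Static lifetime | Not required                                                         | 'static required       |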
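The move-semantics and 'static rows can be illustrated with a small sketch. Because thread::spawn requires a 'static closure, borrowed data cannot be captured directly; one common option, assumed here, is to share it through an Arc and move a clone into each thread. `parallel_word_count` is a hypothetical function, not part of the original source.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: counts words using `workers` threads that all
/// read the same lines through a shared Arc.
fn parallel_word_count(text: String, workers: usize) -> usize {
    let lines: Arc<Vec<String>> = Arc::new(text.lines().map(String::from).collect());
    let handles: Vec<_> = (0..workers)
        .map(|w| {
            let lines = Arc::clone(&lines);
            // `move` transfers ownership of this Arc clone into the thread,
            // which satisfies the 'static bound on the closure.
            thread::spawn(move || {
                lines
                    .iter()
                    .skip(w) // worker w starts at line w ...
                    .step_by(workers) // ... and takes every `workers`-th line
                    .map(|line| line.split_whitespace().count())
                    .sum::<usize>()
            })
        })
        .collect();
    // Join every worker and add up the partial counts.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .sum()
}
```

Removing the `move` keyword makes the closure borrow its Arc clone, and the compiler rejects it because that borrow does not outlive the enclosing function.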

    Exercises

  • Spawn N threads each computing the prime numbers in a range, then collect and merge all results.
  • Handle panicking threads: spawn threads that may panic, use join() to detect panics, and recover gracefully.
  • Implement a parallel map_reduce that maps a function over chunks in parallel and reduces the chunk results to a final answer.