
448: Rayon Parallel Iterators — Data Parallelism

Functional Programming

Tutorial Video

Text description (accessibility)

This video demonstrates the "448: Rayon Parallel Iterators — Data Parallelism" functional Rust example. Difficulty level: Fundamental. Key concepts covered: Functional Programming. Converting sequential code to parallel is usually complex: thread management, load balancing, result collection. Key difference from OCaml — API ergonomics: Rayon's `.par_iter()` requires zero code change beyond the method name, while this example requires manual chunking. OCaml's `Domainslib.parallel_for` is also low-level.

Tutorial

The Problem

Converting sequential code to parallel is usually complex: thread management, load balancing, result collection. Rayon's parallel iterators solve this: replace .iter() with .par_iter() and Rayon handles thread spawning, work distribution, and result aggregation. The library implements parallel map, filter, fold, sum, and 40+ other operations. This example demonstrates the underlying pattern using thread::scope to show what Rayon does internally.

Rayon is used in data processing pipelines, image rendering, scientific simulations, build systems (Cargo uses Rayon for compilation), and any CPU-bound iteration over large datasets.

🎯 Learning Outcomes

  • Understand the parallel iterator pattern: chunk data, process in parallel, collect results
  • Learn how thread::available_parallelism() determines the optimal thread count
  • See how chunk-based parallel map avoids the overhead of one-thread-per-element
  • Understand how parallel reduce/fold works: local reductions joined in a tree
  • Learn the data parallelism model vs. task parallelism (different granularity)

Code Example

    // With rayon crate:
    use rayon::prelude::*;
    let squares: Vec<_> = data.par_iter().map(|x| x * x).collect();
    
    // Manual implementation:
    use std::thread;

    fn parallel_map<T, U, F>(data: &[T], f: F) -> Vec<U>
    where T: Sync, U: Send + Default + Clone, F: Fn(&T) -> U + Sync
    {
        let num_threads = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);
        // max(1) guards against a zero chunk size for small inputs
        let chunk_size = (data.len() / num_threads).max(1);
        let mut results = vec![U::default(); data.len()];
        
        thread::scope(|s| {
            for (chunk_in, chunk_out) in 
                data.chunks(chunk_size).zip(results.chunks_mut(chunk_size)) 
            {
                s.spawn(|| {
                    for (input, output) in chunk_in.iter().zip(chunk_out.iter_mut()) {
                        *output = f(input);
                    }
                });
            }
        });
        results
    }
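The flat chunking above is what this tutorial implements; internally, Rayon instead splits the input recursively and joins results in a tree, which is what keeps its reductions balanced under work stealing. A minimal std-only sketch of that tree shape (the `tree_sum` name and threshold are illustrative, not Rayon's API):

```rust
use std::thread;

// Illustrative sketch: recursive split + join, the tree shape that
// Rayon's scheduler produces internally. Below a threshold the
// reduction runs sequentially to avoid per-thread overhead.
fn tree_sum(data: &[i64]) -> i64 {
    const SEQ_THRESHOLD: usize = 8;
    if data.len() <= SEQ_THRESHOLD {
        return data.iter().sum();
    }
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        let lh = s.spawn(|| tree_sum(left));
        let rh = s.spawn(|| tree_sum(right));
        lh.join().unwrap() + rh.join().unwrap()
    })
}

fn main() {
    let data: Vec<i64> = (1..=100).collect();
    println!("{}", tree_sum(&data)); // 1 + 2 + ... + 100 = 5050
}
```

Real Rayon caps the tree depth and steals idle subtrees rather than spawning an OS thread per split; this sketch only shows the join structure.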

    Key Differences

  • API ergonomics: Rayon's .par_iter() requires zero code change beyond the method name; this example requires manual chunking. OCaml's Domainslib.parallel_for is also low-level.
  • Work stealing: Rayon uses work stealing for load balancing; this example's fixed-chunk approach can be imbalanced.
  • Composability: Rayon's parallel iterators compose: .par_iter().filter(pred).map(f).sum() is all parallel; manual chunking requires re-chunking at each stage.
  • Overhead: Rayon's global thread pool amortizes startup costs; thread::scope creates threads per scope (but with scoped thread caching in some implementations).

    OCaml Approach

    OCaml 5.x's Domainslib.Task.parallel_for divides a range among domains: Task.parallel_for pool ~start:0 ~finish:n ~body:(fun i -> process arr.(i)). Domainslib.Task.async + Task.await provide future-style composition. OCaml 4.x has no true parallel iteration because its runtime lock lets only one thread execute OCaml code at a time. The functional style makes parallel operations natural — pure functions with no shared state are trivially parallelizable.
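The Domainslib `parallel_for` shape — a body applied over an index range, split among workers — can be mirrored in std-only Rust with scoped threads. A sketch under stated assumptions: the `parallel_for` name and fixed chunking here are illustrative, not a library API.

```rust
use std::thread;

// Sketch of a Domainslib-style parallel_for over an index range,
// using static chunking (Rayon users would instead write
// (0..n).into_par_iter().for_each(...) and get work stealing).
fn parallel_for<F: Fn(usize) + Sync>(n: usize, num_threads: usize, body: F) {
    let nt = num_threads.max(1);
    let chunk = (n + nt - 1) / nt; // ceiling division
    thread::scope(|s| {
        for t in 0..nt {
            let body = &body;
            s.spawn(move || {
                let lo = t * chunk;
                let hi = ((t + 1) * chunk).min(n);
                for i in lo..hi {
                    body(i);
                }
            });
        }
    });
}

fn main() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    let counter = AtomicUsize::new(0);
    parallel_for(10, 4, |i| {
        counter.fetch_add(i, Ordering::Relaxed);
    });
    println!("{}", counter.load(Ordering::Relaxed)); // 0 + 1 + ... + 9 = 45
}
```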

    Full Source

    #![allow(clippy::all)]
    //! # Rayon Parallel Iterators — Data Parallelism Made Easy
    //!
    //! Demonstrates the parallel iterator pattern that Rayon provides,
    //! implemented here with scoped threads to show the concept.
    
    use std::thread;
    
    /// Approach 1: Parallel map over slices
    ///
    /// Maps a function over slice elements in parallel.
    pub fn parallel_map<T, U, F>(data: &[T], f: F) -> Vec<U>
    where
        T: Sync,
        U: Send + Default + Clone,
        F: Fn(&T) -> U + Sync,
    {
        let num_threads = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);
        let chunk_size = (data.len() / num_threads).max(1);
    
        let mut results = vec![U::default(); data.len()];
    
        thread::scope(|s| {
            for (chunk_in, chunk_out) in data.chunks(chunk_size).zip(results.chunks_mut(chunk_size)) {
                s.spawn(|| {
                    for (input, output) in chunk_in.iter().zip(chunk_out.iter_mut()) {
                        *output = f(input);
                    }
                });
            }
        });
    
        results
    }
    
    /// Approach 2: Parallel sum/reduce
    ///
    /// Sums elements in parallel by dividing into chunks.
    pub fn parallel_sum(data: &[f64]) -> f64 {
        let num_threads = 4;
        let chunk_size = (data.len() / num_threads).max(1);
    
        let partial_sums: Vec<f64> = thread::scope(|s| {
            data.chunks(chunk_size)
                .map(|chunk| s.spawn(move || chunk.iter().sum::<f64>()))
                .collect::<Vec<_>>()
                .into_iter()
                .map(|h| h.join().unwrap())
                .collect()
        });
    
        partial_sums.iter().sum()
    }
    
    /// Approach 3: Parallel filter-map
    ///
    /// Filters and maps in parallel.
    pub fn parallel_filter_map<T, U, F, P>(data: &[T], predicate: P, mapper: F) -> Vec<U>
    where
        T: Sync,
        U: Send,
        F: Fn(&T) -> U + Sync,
        P: Fn(&T) -> bool + Sync,
    {
        let num_threads = 4;
        let chunk_size = (data.len() / num_threads).max(1);
    
        let partial_results: Vec<Vec<U>> = thread::scope(|s| {
            data.chunks(chunk_size)
                .map(|chunk| {
                    s.spawn(|| {
                        chunk
                            .iter()
                            .filter(|x| predicate(x))
                            .map(|x| mapper(x))
                            .collect::<Vec<_>>()
                    })
                })
                .collect::<Vec<_>>()
                .into_iter()
                .map(|h| h.join().unwrap())
                .collect()
        });
    
        partial_results.into_iter().flatten().collect()
    }
    
    /// Approach 4: Parallel reduce with custom operation
    pub fn parallel_reduce<T, F>(data: &[T], identity: T, op: F) -> T
    where
        T: Send + Sync + Clone,
        F: Fn(T, T) -> T + Sync,
    {
        if data.is_empty() {
            return identity;
        }
    
        let num_threads = 4;
        let chunk_size = (data.len() / num_threads).max(1);
    
        let partial: Vec<T> = thread::scope(|s| {
            data.chunks(chunk_size)
                .map(|chunk| {
                    let op = &op;
                    let id = identity.clone();
                    s.spawn(move || chunk.iter().cloned().fold(id, |acc, x| op(acc, x)))
                })
                .collect::<Vec<_>>()
                .into_iter()
                .map(|h| h.join().unwrap())
                .collect()
        });
    
        partial.into_iter().fold(identity, |acc, x| op(acc, x))
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_parallel_map_squares() {
            let data: Vec<f64> = (1..=10).map(|x| x as f64).collect();
            let squares = parallel_map(&data, |x| x * x);
            let expected: Vec<f64> = (1..=10).map(|x| (x * x) as f64).collect();
            assert_eq!(squares, expected);
        }
    
        #[test]
        fn test_parallel_map_strings() {
            let data = vec!["hello", "world", "rust"];
            let lengths = parallel_map(&data, |s| s.len());
            assert_eq!(lengths, vec![5, 5, 4]);
        }
    
        #[test]
        fn test_parallel_sum() {
            let data: Vec<f64> = (1..=100).map(|x| x as f64).collect();
            let sum = parallel_sum(&data);
            assert!((sum - 5050.0).abs() < 1e-9);
        }
    
        #[test]
        fn test_parallel_sum_empty() {
            let data: Vec<f64> = vec![];
            let sum = parallel_sum(&data);
            assert!((sum - 0.0).abs() < 1e-9);
        }
    
        #[test]
        fn test_parallel_filter_map() {
            let data: Vec<i32> = (1..=20).collect();
            // Get squares of even numbers
            let result = parallel_filter_map(&data, |x| x % 2 == 0, |x| x * x);
            let expected: Vec<i32> = vec![4, 16, 36, 64, 100, 144, 196, 256, 324, 400];
            assert_eq!(result, expected);
        }
    
        #[test]
        fn test_parallel_reduce_sum() {
            let data: Vec<i32> = (1..=10).collect();
            let sum = parallel_reduce(&data, 0, |a, b| a + b);
            assert_eq!(sum, 55);
        }
    
        #[test]
        fn test_parallel_reduce_max() {
            let data = vec![3, 1, 4, 1, 5, 9, 2, 6];
            let max = parallel_reduce(&data, i32::MIN, |a, b| a.max(b));
            assert_eq!(max, 9);
        }
    
        #[test]
        fn test_parallel_reduce_product() {
            let data: Vec<i64> = (1..=5).collect();
            let product = parallel_reduce(&data, 1, |a, b| a * b);
            assert_eq!(product, 120);
        }
    }

    Deep Comparison

    OCaml vs Rust: Parallel Iterators

    Parallel Map

    OCaml

    (* OCaml 5: Domain.spawn gives true parallelism; Thread.create threads
       share the runtime lock and interleave rather than run in parallel. *)
    let parallel_map f arr =
      let n = Array.length arr in
      if n = 0 then [||]
      else begin
        let res = Array.make n (f arr.(0)) in
        let num_domains = 4 in
        let chunk = (n + num_domains - 1) / num_domains in
        let domains = Array.init num_domains (fun t ->
          let lo = t * chunk in
          let hi = min n ((t + 1) * chunk) in
          Domain.spawn (fun () ->
            for i = lo to hi - 1 do
              res.(i) <- f arr.(i)
            done))
        in
        Array.iter Domain.join domains;
        res
      end
    

    Rust (with rayon, this becomes trivial)

    // With rayon crate:
    use rayon::prelude::*;
    let squares: Vec<_> = data.par_iter().map(|x| x * x).collect();
    
    // Manual implementation:
    use std::thread;

    fn parallel_map<T, U, F>(data: &[T], f: F) -> Vec<U>
    where T: Sync, U: Send + Default + Clone, F: Fn(&T) -> U + Sync
    {
        let num_threads = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);
        // max(1) guards against a zero chunk size for small inputs
        let chunk_size = (data.len() / num_threads).max(1);
        let mut results = vec![U::default(); data.len()];
        
        thread::scope(|s| {
            for (chunk_in, chunk_out) in 
                data.chunks(chunk_size).zip(results.chunks_mut(chunk_size)) 
            {
                s.spawn(|| {
                    for (input, output) in chunk_in.iter().zip(chunk_out.iter_mut()) {
                        *output = f(input);
                    }
                });
            }
        });
        results
    }
    

    Key Differences

    | Feature           | OCaml           | Rust (Rayon)             |
    |-------------------|-----------------|--------------------------|
    | Syntax            | Manual chunking | `.par_iter().map()`      |
    | Work distribution | Static chunks   | Work-stealing            |
    | Thread management | Manual          | Automatic thread pool    |
    | Composability     | Low             | High (chain operations)  |

    Parallel Sum

    OCaml

    (* Domain.join returns the domain's result, so partial sums can be
       collected; Thread.join returns unit and cannot. *)
    let parallel_sum arr =
      let n = Array.length arr in
      let chunk = max 1 (n / 4) in
      let domains = Array.init 4 (fun t ->
        Domain.spawn (fun () ->
          let lo = min n (t * chunk) in
          let hi = if t = 3 then n else min n ((t + 1) * chunk) in
          let sum = ref 0.0 in
          for i = lo to hi - 1 do sum := !sum +. arr.(i) done;
          !sum))
      in
      Array.fold_left (fun acc d -> acc +. Domain.join d) 0.0 domains
    

    Rust

    // With rayon:
    let sum: f64 = data.par_iter().sum();
    
    // Manual:
    let partial_sums: Vec<f64> = thread::scope(|s| {
        data.chunks(chunk_size)
            .map(|c| s.spawn(move || c.iter().sum::<f64>()))
            .collect::<Vec<_>>()
            .into_iter()
            .map(|h| h.join().unwrap())
            .collect()
    });
    partial_sums.iter().sum()
    

    Rayon's Power

    // Complex pipeline — all parallel, work-stealing balanced
    let result: Vec<_> = data
        .par_iter()
        .filter(|x| x.is_valid())
        .map(|x| expensive_transform(x))
        .filter_map(|x| x.optional_step())
        .collect();
    
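By contrast, getting that pipeline without Rayon means fusing every stage into a single pass per chunk, since separate parallel stages would require re-chunking between them. A self-contained std-only sketch (the `even_square_sum` name and the even/square stages are illustrative stand-ins for `is_valid`/`expensive_transform`):

```rust
use std::thread;

// Manually fused pipeline: filter -> map -> sum runs as one pass
// inside each chunk's thread, instead of composing parallel stages.
fn even_square_sum(data: &[i32]) -> i64 {
    let chunk_size = (data.len() / 4).max(1);
    thread::scope(|s| {
        data.chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || {
                    chunk
                        .iter()
                        .filter(|x| **x % 2 == 0)            // "is_valid" stage
                        .map(|&x| (x as i64) * (x as i64))   // "transform" stage
                        .sum::<i64>()                        // local reduction
                })
            })
            .collect::<Vec<_>>() // spawn all before joining any
            .into_iter()
            .map(|h| h.join().unwrap())
            .sum()
    })
}

fn main() {
    let data: Vec<i32> = (1..=20).collect();
    println!("{}", even_square_sum(&data)); // 4 + 16 + ... + 400 = 1540
}
```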

    Exercises

  • Image processing: Load a grayscale image as Vec<u8>. Apply a blur filter to each pixel in parallel using the chunked parallel_map pattern. Verify the result matches sequential processing.
  • Word count: Given Vec<String> of sentences, count total words in parallel using parallel_map (count per sentence) followed by a parallel sum. Compare performance vs. sequential for 1M sentences.
  • Matrix transpose: Implement parallel matrix transpose using thread::scope. Divide rows among threads; each thread writes its assigned rows to the transposed positions. Verify correctness and benchmark vs. sequential.
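As a starting point for the word-count exercise, the map-then-sum shape can reuse the chunked pattern from the tutorial. A sketch, with `parallel_word_count` as an illustrative name; the exercise's 1M-sentence benchmark is left to the reader:

```rust
use std::thread;

// Exercise starter: count words per sentence in parallel, then sum
// the per-chunk partial counts (a fused parallel map + sum).
fn parallel_word_count(sentences: &[String]) -> usize {
    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    let chunk_size = (sentences.len() / num_threads).max(1);
    thread::scope(|scope| {
        sentences
            .chunks(chunk_size)
            .map(|chunk| {
                scope.spawn(move || {
                    chunk
                        .iter()
                        .map(|sent| sent.split_whitespace().count())
                        .sum::<usize>()
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .map(|h| h.join().unwrap())
            .sum()
    })
}

fn main() {
    let sentences = vec![
        "hello world".to_string(),
        "rayon makes data parallelism easy".to_string(),
    ];
    println!("{}", parallel_word_count(&sentences)); // 2 + 5 = 7
}
```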