Parallel Reduce
Tutorial Video
Text description (accessibility)
This video demonstrates the "Parallel Reduce" functional Rust example. Difficulty level: Fundamental. Key concepts covered: functional programming. A sequential `fold` over a million-element slice processes one element at a time. Key difference from OCaml: **scoped threads**: Rust's `thread::scope` borrows the slice directly without cloning, whereas OCaml's `Domain.spawn` requires values to be shared across domain boundaries, usually via `Atomic` refs.
Tutorial
The Problem
A sequential fold over a million-element slice processes one element at a time. If the combining operation is associative (addition, multiplication, min, max, logical AND/OR), the computation can be split into independent subtrees and evaluated in parallel. MapReduce (Dean & Ghemawat, 2004), parallel prefix scans in GPU computing, and Spark's aggregate all rely on this algebraic observation. Wall-clock time drops from O(N) to O(N/P + log P), where P is the number of processors.
🎯 Learning Outcomes
- `thread::scope` to borrow slice halves without `Arc` or `'static` bounds
- `T: Send + Sync + Clone` constraints needed for cross-thread data

Code Example
```rust
#![allow(clippy::all)]
//! # Parallel Reduce — Concurrent Aggregation
//!
//! Reduce collections in parallel for faster aggregation.

use std::thread;

/// Parallel reduce with custom operation
pub fn parallel_reduce<T>(data: &[T], identity: T, op: &(dyn Fn(T, T) -> T + Sync)) -> T
where
    T: Send + Sync + Clone,
{
    const THRESHOLD: usize = 100;
    if data.is_empty() {
        return identity;
    }
    if data.len() <= THRESHOLD {
        return data.iter().cloned().fold(identity, |a, b| op(a, b));
    }
    let mid = data.len() / 2;
    let (left, right) = data.split_at(mid);
    thread::scope(|s| {
        let id_left = identity.clone();
        let left_handle = s.spawn(|| parallel_reduce(left, id_left, op));
        let right_result = parallel_reduce(right, identity, op);
        op(left_handle.join().unwrap(), right_result)
    })
}

/// Parallel sum
pub fn parallel_sum<T>(data: &[T]) -> T
where
    T: Send + Sync + Clone + std::ops::Add<Output = T> + Default,
{
    parallel_reduce(data, T::default(), &|a, b| a + b)
}

/// Parallel product
pub fn parallel_product<T>(data: &[T]) -> T
where
    T: Send + Sync + Clone + std::ops::Mul<Output = T> + From<u8>,
{
    parallel_reduce(data, T::from(1), &|a, b| a * b)
}

/// Parallel min
pub fn parallel_min<T: Send + Sync + Clone + Ord>(data: &[T]) -> Option<T> {
    if data.is_empty() {
        return None;
    }
    Some(parallel_reduce(data, data[0].clone(), &|a, b| {
        if a < b { a } else { b }
    }))
}

/// Parallel max
pub fn parallel_max<T: Send + Sync + Clone + Ord>(data: &[T]) -> Option<T> {
    if data.is_empty() {
        return None;
    }
    Some(parallel_reduce(data, data[0].clone(), &|a, b| {
        if a > b { a } else { b }
    }))
}

/// Parallel all (conjunction)
pub fn parallel_all<T>(data: &[T], predicate: &(dyn Fn(&T) -> bool + Sync)) -> bool
where
    T: Sync,
{
    const THRESHOLD: usize = 100;
    if data.is_empty() {
        return true;
    }
    if data.len() <= THRESHOLD {
        return data.iter().all(predicate);
    }
    let mid = data.len() / 2;
    let (left, right) = data.split_at(mid);
    thread::scope(|s| {
        let left_handle = s.spawn(|| parallel_all(left, predicate));
        let right_result = parallel_all(right, predicate);
        left_handle.join().unwrap() && right_result
    })
}

/// Parallel any (disjunction)
pub fn parallel_any<T>(data: &[T], predicate: &(dyn Fn(&T) -> bool + Sync)) -> bool
where
    T: Sync,
{
    const THRESHOLD: usize = 100;
    if data.is_empty() {
        return false;
    }
    if data.len() <= THRESHOLD {
        return data.iter().any(predicate);
    }
    let mid = data.len() / 2;
    let (left, right) = data.split_at(mid);
    thread::scope(|s| {
        let left_handle = s.spawn(|| parallel_any(left, predicate));
        let right_result = parallel_any(right, predicate);
        left_handle.join().unwrap() || right_result
    })
}

/// Parallel count matching predicate
pub fn parallel_count<T>(data: &[T], predicate: &(dyn Fn(&T) -> bool + Sync)) -> usize
where
    T: Sync,
{
    const THRESHOLD: usize = 100;
    if data.len() <= THRESHOLD {
        return data.iter().filter(|x| predicate(x)).count();
    }
    let mid = data.len() / 2;
    let (left, right) = data.split_at(mid);
    thread::scope(|s| {
        let left_handle = s.spawn(|| parallel_count(left, predicate));
        let right_result = parallel_count(right, predicate);
        left_handle.join().unwrap() + right_result
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_reduce_sum() {
        let data: Vec<i64> = (1..=1000).collect();
        let sum = parallel_reduce(&data, 0, &|a, b| a + b);
        assert_eq!(sum, 500500);
    }

    #[test]
    fn test_parallel_sum() {
        let data: Vec<i32> = (1..=100).collect();
        assert_eq!(parallel_sum(&data), 5050);
    }

    #[test]
    fn test_parallel_product() {
        let data: Vec<i64> = (1..=10).collect();
        assert_eq!(parallel_product(&data), 3628800); // 10!
    }

    #[test]
    fn test_parallel_min() {
        let data = vec![5, 2, 8, 1, 9, 3];
        assert_eq!(parallel_min(&data), Some(1));
    }

    #[test]
    fn test_parallel_max() {
        let data = vec![5, 2, 8, 1, 9, 3];
        assert_eq!(parallel_max(&data), Some(9));
    }

    #[test]
    fn test_parallel_all() {
        let data: Vec<i32> = (1..=100).collect();
        assert!(parallel_all(&data, &|x| *x > 0));
        assert!(!parallel_all(&data, &|x| *x > 50));
    }

    #[test]
    fn test_parallel_any() {
        let data: Vec<i32> = (1..=100).collect();
        assert!(parallel_any(&data, &|x| *x == 50));
        assert!(!parallel_any(&data, &|x| *x > 100));
    }

    #[test]
    fn test_parallel_count() {
        let data: Vec<i32> = (1..=100).collect();
        assert_eq!(parallel_count(&data, &|x| x % 2 == 0), 50);
    }

    #[test]
    fn test_empty_cases() {
        let empty: Vec<i32> = vec![];
        assert_eq!(parallel_sum(&empty), 0);
        assert_eq!(parallel_min(&empty), None);
        assert_eq!(parallel_max(&empty), None);
        assert!(parallel_all(&empty, &|_: &i32| false));
        assert!(!parallel_any(&empty, &|_: &i32| true));
    }
}
```

Key Differences
- `thread::scope` borrows the slice directly without cloning; OCaml's `Domain.spawn` requires values to be shared across domain boundaries, usually via `Atomic` refs.
- `T: Send + Sync + Clone` is checked at compile time for every call site; OCaml has no equivalent, so unsynchronised domain access is a runtime data race.
- The algorithm is correct only when `op` is associative; this is assumed, not checked. OCaml has the same gap.

OCaml Approach
Multicore OCaml uses Domainslib.Task.parallel_for_reduce:

```ocaml
let pool = Domainslib.Task.setup_pool ~num_additional_domains:3 ()
let sum = Domainslib.Task.run pool (fun () ->
  Domainslib.Task.parallel_for_reduce pool
    ~start:0 ~finish:(Array.length arr - 1)
    ~body:(fun i -> arr.(i))
    (+) 0)
```

Functional OCaml without Domainslib uses immutable arrays and List.fold_left sequentially; parallel reduction requires explicit domain creation via Domain.spawn.
Comparison
See src/lib.rs for the Rust implementation.
Exercises
1. Cap the recursion depth using `std::thread::available_parallelism()` so the tree spawns at most one thread per core. Measure whether this reduces thread-spawn overhead.
2. Define a `Monoid<T>` trait with `identity() -> T` and `combine(T, T) -> T`, then rewrite `parallel_reduce` to accept `M: Monoid<T>` instead of separate `identity` and `op` arguments.
3. Implement `parallel_prefix_sum(data: &[i64]) -> Vec<i64>` that returns the inclusive prefix sum array using a parallel up-sweep and down-sweep (Blelloch scan algorithm).