447: Work Stealing — Load Balancing Across Threads
Tutorial Video
Text description (accessibility)
This video demonstrates the "447: Work Stealing — Load Balancing Across Threads" functional Rust example. Difficulty level: Advanced. Key concepts covered: functional programming and work-stealing schedulers. It shows how a simple thread pool with a single shared queue becomes a contention bottleneck, and how per-worker deques with stealing solve it.
Tutorial
The Problem
A simple thread pool with a single shared queue becomes a contention bottleneck: all threads compete for the same lock to get the next job. Work stealing solves this: each thread has its own local deque. Workers process their own tasks from the front; idle workers steal tasks from the back of busy workers' deques. This locality-aware load balancing was introduced in Cilk (MIT, 1990s) and is the algorithm behind Java's ForkJoinPool, rayon, Go's goroutine scheduler, and .NET's Task Parallel Library.
Work stealing powers rayon's parallel iterators, Tokio's multi-threaded runtime, and scientific computing frameworks needing efficient dynamic load balancing.
🎯 Learning Outcomes
- Arc<Mutex<VecDeque<T>>> simulates a work-stealing deque
- rayon uses deque-based work stealing internally
Code Example
type Queue = Arc<Mutex<VecDeque<u32>>>;

fn worker(own: Queue, others: Vec<Queue>) {
    loop {
        // Own queue: pop from front
        if let Some(job) = own.lock().unwrap().pop_front() {
            process(job);
            continue;
        }
        // Steal: pop from back of others
        let mut stole = false;
        for q in &others {
            if let Ok(mut g) = q.try_lock() {
                if let Some(job) = g.pop_back() {
                    process(job);
                    stole = true;
                    break;
                }
            }
        }
        // No local work and nothing to steal: exit
        if !stole {
            break;
        }
    }
}
Key Differences
- Uses Arc<Mutex<VecDeque>> for clarity.
- rayon provides production work stealing; the manual implementation here is for educational purposes.
- A lock-free deque would require careful atomic memory ordering (SeqCst/Acquire/Release); the lock-based version handles this automatically.
OCaml Approach
OCaml 5.x's Domainslib uses work stealing internally for its task pool. The Task.pool creates a domain pool with per-domain queues and stealing. OCaml 4.x's thread-based work doesn't benefit from work stealing due to the GIL. For custom work stealing in OCaml 5.x, the deque package provides lock-free Chase-Lev deques — the standard work-stealing data structure.
Full Source
#![allow(clippy::all)]
//! # Work Stealing — Load Balancing Across Threads
//!
//! A pattern where idle workers "steal" tasks from busy workers'
//! queues to balance load dynamically.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// A work-stealing deque for a single worker
pub type WorkQueue<T> = Arc<Mutex<VecDeque<T>>>;

/// Create a new work queue
pub fn new_queue<T>() -> WorkQueue<T> {
    Arc::new(Mutex::new(VecDeque::new()))
}

/// Approach 1: Simple work stealing with shared deques
pub fn work_stealing_demo(num_jobs: usize, num_workers: usize) -> usize {
    let queues: Vec<WorkQueue<u32>> = (0..num_workers).map(|_| new_queue()).collect();
    // Load all jobs into first worker's queue
    {
        let mut q = queues[0].lock().unwrap();
        for j in 0..num_jobs as u32 {
            q.push_back(j);
        }
    }
    let completed = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let handles: Vec<_> = (0..num_workers)
        .map(|i| {
            let own_queue = Arc::clone(&queues[i]);
            let other_queues: Vec<_> = queues
                .iter()
                .enumerate()
                .filter(|&(j, _)| j != i)
                .map(|(_, q)| Arc::clone(q))
                .collect();
            let completed = Arc::clone(&completed);
            thread::spawn(move || {
                loop {
                    // Try own queue first (pop from front)
                    if let Some(_job) = own_queue.lock().unwrap().pop_front() {
                        completed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        continue;
                    }
                    // Try stealing from others (pop from back)
                    let mut stole = false;
                    for other in &other_queues {
                        if let Ok(mut guard) = other.try_lock() {
                            if let Some(_job) = guard.pop_back() {
                                drop(guard);
                                completed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                stole = true;
                                break;
                            }
                        }
                    }
                    if !stole {
                        break;
                    }
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    completed.load(std::sync::atomic::Ordering::SeqCst)
}

/// Approach 2: Work stealing with local processing
pub struct WorkStealingPool {
    queues: Vec<WorkQueue<Box<dyn FnOnce() + Send>>>,
    num_workers: usize,
}

impl WorkStealingPool {
    pub fn new(num_workers: usize) -> Self {
        let queues = (0..num_workers).map(|_| new_queue()).collect();
        Self { queues, num_workers }
    }

    /// Push work to a specific worker's queue
    pub fn push(&self, worker_id: usize, job: Box<dyn FnOnce() + Send>) {
        let id = worker_id % self.num_workers;
        self.queues[id].lock().unwrap().push_back(job);
    }

    /// Push work round-robin
    pub fn push_round_robin(&self, jobs: Vec<Box<dyn FnOnce() + Send>>) {
        for (i, job) in jobs.into_iter().enumerate() {
            self.push(i, job);
        }
    }

    /// Run all jobs using work stealing
    pub fn run(self) {
        let handles: Vec<_> = (0..self.num_workers)
            .map(|i| {
                let own = Arc::clone(&self.queues[i]);
                let others: Vec<_> = self
                    .queues
                    .iter()
                    .enumerate()
                    .filter(|&(j, _)| j != i)
                    .map(|(_, q)| Arc::clone(q))
                    .collect();
                thread::spawn(move || {
                    loop {
                        // Own queue first
                        if let Some(job) = own.lock().unwrap().pop_front() {
                            job();
                            continue;
                        }
                        // Steal
                        let mut done = true;
                        for other in &others {
                            if let Ok(mut g) = other.try_lock() {
                                if let Some(job) = g.pop_back() {
                                    drop(g);
                                    job();
                                    done = false;
                                    break;
                                }
                            }
                        }
                        if done {
                            break;
                        }
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn test_steal_from_back() {
        let q: WorkQueue<u32> = new_queue();
        {
            let mut g = q.lock().unwrap();
            g.push_back(1);
            g.push_back(2);
            g.push_back(3);
        }
        // Steal from back
        assert_eq!(q.lock().unwrap().pop_back(), Some(3));
        // Own work from front
        assert_eq!(q.lock().unwrap().pop_front(), Some(1));
    }

    #[test]
    fn test_all_jobs_complete() {
        let completed = work_stealing_demo(20, 4);
        assert_eq!(completed, 20);
    }

    #[test]
    fn test_single_worker() {
        let completed = work_stealing_demo(10, 1);
        assert_eq!(completed, 10);
    }

    #[test]
    fn test_work_stealing_pool() {
        let count = Arc::new(AtomicUsize::new(0));
        let pool = WorkStealingPool::new(4);
        for _ in 0..20 {
            let c = Arc::clone(&count);
            pool.push(
                0,
                Box::new(move || {
                    c.fetch_add(1, Ordering::Relaxed);
                }),
            );
        }
        pool.run();
        assert_eq!(count.load(Ordering::SeqCst), 20);
    }

    #[test]
    fn test_round_robin_distribution() {
        let counts: Vec<_> = (0..4).map(|_| Arc::new(AtomicUsize::new(0))).collect();
        let pool = WorkStealingPool::new(4);
        let jobs: Vec<_> = (0..8)
            .map(|i| {
                let c = Arc::clone(&counts[i % 4]);
                let job: Box<dyn FnOnce() + Send> = Box::new(move || {
                    c.fetch_add(1, Ordering::Relaxed);
                });
                job
            })
            .collect();
        pool.push_round_robin(jobs);
        pool.run();
        // Each counter should have been incremented twice
        for c in &counts {
            assert_eq!(c.load(Ordering::SeqCst), 2);
        }
    }
}
Deep Comparison
OCaml vs Rust: Work Stealing
Basic Concept
Work stealing: each worker has its own queue. When a worker's queue is empty, it "steals" work from another worker's queue (from the back).
OCaml (Simplified shared queue)
let deque = ref []
let mutex = Mutex.create ()

let steal () =
  Mutex.lock mutex;
  let r =
    match !deque with
    | [] -> None
    | x :: rest -> deque := rest; Some x
  in
  Mutex.unlock mutex;
  r
Rust
type Queue = Arc<Mutex<VecDeque<u32>>>;

fn worker(own: Queue, others: Vec<Queue>) {
    loop {
        // Own queue: pop from front
        if let Some(job) = own.lock().unwrap().pop_front() {
            process(job);
            continue;
        }
        // Steal: pop from back of others
        let mut stole = false;
        for q in &others {
            if let Ok(mut g) = q.try_lock() {
                if let Some(job) = g.pop_back() {
                    process(job);
                    stole = true;
                    break;
                }
            }
        }
        // No local work and nothing to steal: exit
        if !stole {
            break;
        }
    }
}
Key Differences
| Feature | OCaml | Rust |
|---|---|---|
| Per-worker queue | No (shared) | Yes (VecDeque per worker) |
| Pop strategy | Front only | Front (own) / Back (steal) |
| Lock contention | High (shared) | Low (try_lock + skip) |
| Data structure | List | VecDeque (double-ended) |
Why Pop from Back?
Worker 1's queue: [A, B, C, D, E]
                   ^           ^
                   |           |
               own pop     steal pop
               (front)      (back)
- Worker 1 processes A, B, C...
- Worker 2 steals E, D...
- Less contention: both ends accessed
Non-blocking Steal
OCaml
(* Blocking acquire; note that OCaml's Mutex also provides try_lock *)
Mutex.lock mutex;
(* ... *)
Mutex.unlock mutex
Rust
// try_lock: don't wait if locked
if let Ok(mut guard) = other_queue.try_lock() {
    if let Some(job) = guard.pop_back() {
        // Got work
    }
}
// If locked, skip to next queue