ExamplesBy LevelBy TopicLearning Paths
343 Advanced

343: Cancellation Token

Functional Programming

Tutorial

The Problem

Long-running background workers need a way to stop cleanly when the application shuts down or a user cancels an operation. Forcibly killing threads (pthread_cancel in C) is unsafe — it can leave mutexes locked or resources open. Cooperative cancellation instead asks workers to check a shared flag and stop themselves at defined safe points. The cancellation token pattern formalizes this: a shared atomic boolean that any thread can check and any caller can flip. This is the same pattern used by Linux kernel's kthread_should_stop(), Java's Thread.interrupted(), and the CancellationToken in .NET and Tokio.

🎯 Learning Outcomes

  • • Use Arc<AtomicBool> to share a cancellation signal across threads safely
  • • Use Ordering::Relaxed for the cancellation check — correctness doesn't require sequential consistency here
  • • Implement worker loops that check is_cancelled() at each iteration
  • • Drop senders / signal tokens to trigger graceful shutdown
  • • Understand why cooperative cancellation is safer than forced termination
  • • Recognize where to place cancellation checkpoints in compute-bound loops
  • Code Example

    #![allow(clippy::all)]
    // 343: Cancellation Token
    // Arc<AtomicBool> for cooperative cancellation across threads
    
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    
    // Approach 1: Simple cancellation token
    fn make_token() -> Arc<AtomicBool> {
        Arc::new(AtomicBool::new(false))
    }
    
    fn cancel(token: &AtomicBool) {
        token.store(true, Ordering::Relaxed);
    }
    
    fn is_cancelled(token: &AtomicBool) -> bool {
        token.load(Ordering::Relaxed)
    }
    
    // Approach 2: Worker that respects cancellation
    fn worker(token: Arc<AtomicBool>, name: String) -> String {
        let mut count = 0u64;
        while !is_cancelled(&token) && count < 1_000_000 {
            count += 1;
        }
        format!("{} did {} iterations", name, count)
    }
    
    // Approach 3: Multi-worker with shared token
    fn run_workers(n: usize) -> Vec<String> {
        let token = make_token();
        let handles: Vec<_> = (0..n)
            .map(|i| {
                let t = token.clone();
                let name = format!("worker-{}", i);
                thread::spawn(move || worker(t, name))
            })
            .collect();
    
        thread::sleep(Duration::from_millis(1));
        cancel(&token);
    
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_token() {
            let token = make_token();
            assert!(!is_cancelled(&token));
            cancel(&token);
            assert!(is_cancelled(&token));
        }
    
        #[test]
        fn test_worker_cancellation() {
            let token = make_token();
            let t = token.clone();
            let handle = thread::spawn(move || worker(t, "test".into()));
            thread::sleep(Duration::from_millis(1));
            cancel(&token);
            let result = handle.join().unwrap();
            assert!(result.starts_with("test"));
        }
    
        #[test]
        fn test_multi_workers() {
            let results = run_workers(3);
            assert_eq!(results.len(), 3);
            for r in &results {
                assert!(r.contains("worker-"));
            }
        }
    }

    Key Differences

    AspectRust Arc<AtomicBool>OCaml Atomic.make bool
    Sharing mechanismExplicit Arc::cloneGC handles sharing
    Memory orderingExplicit (Relaxed/Acquire/Release)Sequential consistency by default
    Compile-time safetyAtomicBool: Sync + Send verifiedNo equivalent check
    Tokio integrationtokio_util::CancellationToken (structured)Lwt.cancel for async tasks

    OCaml Approach

    OCaml 5 domains use Atomic references for shared flags:

    let cancelled = Atomic.make false in
    let worker () =
      while not (Atomic.get cancelled) do
        (* work *)
      done
    in
    let d = Domain.spawn worker in
    Unix.sleepf 0.1;
    Atomic.set cancelled true;
    Domain.join d
    

    In OCaml 4 with threads, a ref plus Mutex achieves the same — the GIL often makes plain ref safe for simple boolean flags, but Atomic is the correct approach for multi-domain OCaml 5 programs.

    Full Source

    #![allow(clippy::all)]
    // 343: Cancellation Token
    // Arc<AtomicBool> for cooperative cancellation across threads
    
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    
    // Approach 1: Simple cancellation token
    fn make_token() -> Arc<AtomicBool> {
        Arc::new(AtomicBool::new(false))
    }
    
    fn cancel(token: &AtomicBool) {
        token.store(true, Ordering::Relaxed);
    }
    
    fn is_cancelled(token: &AtomicBool) -> bool {
        token.load(Ordering::Relaxed)
    }
    
    // Approach 2: Worker that respects cancellation
    fn worker(token: Arc<AtomicBool>, name: String) -> String {
        let mut count = 0u64;
        while !is_cancelled(&token) && count < 1_000_000 {
            count += 1;
        }
        format!("{} did {} iterations", name, count)
    }
    
    // Approach 3: Multi-worker with shared token
    fn run_workers(n: usize) -> Vec<String> {
        let token = make_token();
        let handles: Vec<_> = (0..n)
            .map(|i| {
                let t = token.clone();
                let name = format!("worker-{}", i);
                thread::spawn(move || worker(t, name))
            })
            .collect();
    
        thread::sleep(Duration::from_millis(1));
        cancel(&token);
    
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_token() {
            let token = make_token();
            assert!(!is_cancelled(&token));
            cancel(&token);
            assert!(is_cancelled(&token));
        }
    
        #[test]
        fn test_worker_cancellation() {
            let token = make_token();
            let t = token.clone();
            let handle = thread::spawn(move || worker(t, "test".into()));
            thread::sleep(Duration::from_millis(1));
            cancel(&token);
            let result = handle.join().unwrap();
            assert!(result.starts_with("test"));
        }
    
        #[test]
        fn test_multi_workers() {
            let results = run_workers(3);
            assert_eq!(results.len(), 3);
            for r in &results {
                assert!(r.contains("worker-"));
            }
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_token() {
            let token = make_token();
            assert!(!is_cancelled(&token));
            cancel(&token);
            assert!(is_cancelled(&token));
        }
    
        #[test]
        fn test_worker_cancellation() {
            let token = make_token();
            let t = token.clone();
            let handle = thread::spawn(move || worker(t, "test".into()));
            thread::sleep(Duration::from_millis(1));
            cancel(&token);
            let result = handle.join().unwrap();
            assert!(result.starts_with("test"));
        }
    
        #[test]
        fn test_multi_workers() {
            let results = run_workers(3);
            assert_eq!(results.len(), 3);
            for r in &results {
                assert!(r.contains("worker-"));
            }
        }
    }

    Deep Comparison

    Core Insight

    Shared atomic boolean provides cooperative cancellation — threads check the flag and exit gracefully

    OCaml Approach

  • • See example.ml for implementation
  • Rust Approach

  • • See example.rs for implementation
  • Comparison Table

    FeatureOCamlRust
    Seeexample.mlexample.rs

    Exercises

  • Timeout-based cancellation: Start a worker and cancel it after 100ms using thread::sleep in the main thread; report how many iterations the worker completed.
  • Multi-signal cancellation: Create a token that requires two callers to both signal before workers stop — implement this as a counter (AtomicUsize) that workers check against a threshold.
  • Graceful shutdown with cleanup: Add a second channel alongside the cancellation token so workers can send a "cleanup complete" message before exiting; have the main thread wait for all cleanup confirmations.
  • Open Source Repos