922-cancellation-token — Cancellation Token
Tutorial
The Problem
Long-running operations (large file downloads, iterative computations, background scans) must be stoppable. Forcibly stopping a thread from outside is unsafe: it can leak resources and leave shared state inconsistent, and Rust's standard library offers no API for killing a thread. The safe pattern is cooperative cancellation: the running task periodically checks a cancellation flag and exits cleanly once it is set. This is the same idea as CancellationToken in .NET, context.Context in Go, and AbortController in JavaScript. Rust's std::sync::atomic::AtomicBool provides the thread-safe flag this pattern needs without a lock; wrapping it in an Arc lets several threads share it.
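The cooperative pattern described above can be sketched with a bare Arc<AtomicBool>, before it is wrapped in a dedicated type. The function name run_until_stopped and the 5 ms / 50 ms timings are illustrative assumptions, not part of the original example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical worker: loops until the shared flag is set, then returns
/// how many iterations it completed.
fn run_until_stopped(stop: Arc<AtomicBool>) -> u64 {
    let mut iterations = 0u64;
    // The flag is checked at the top of every iteration.
    while !stop.load(Ordering::Acquire) {
        thread::sleep(Duration::from_millis(5)); // stand-in for real work
        iterations += 1;
    }
    iterations
}

fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let worker = {
        let stop = Arc::clone(&stop);
        thread::spawn(move || run_until_stopped(stop))
    };

    thread::sleep(Duration::from_millis(50));
    stop.store(true, Ordering::Release); // request cancellation
    let iterations = worker.join().expect("worker panicked");
    println!("worker stopped cleanly after {iterations} iterations");
}
```

The worker is never interrupted mid-iteration; it notices the request only at its next check, so the interval between checks bounds how quickly it reacts.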
🎯 Learning Outcomes
- Implement a CancellationToken using Arc<AtomicBool> for shared cancellation state
- Use Ordering::Release and Ordering::Acquire for correct cross-thread visibility
- Compare with OCaml's Lwt_switch and Fiber.with_cancellation
Code Example
#![allow(clippy::all)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Cloneable handle to a shared cancellation flag.
/// Every clone observes the same underlying AtomicBool.
#[derive(Clone)]
struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Request cancellation. Release pairs with the Acquire load below.
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    /// Returns true once any clone of this token has been cancelled.
    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }
}

/// Runs `steps` units of simulated work, checking the token before each one.
fn long_task(token: CancellationToken, steps: usize) -> Result<String, String> {
    for i in 0..steps {
        if token.is_cancelled() {
            return Err(format!("cancelled at step {i}"));
        }
        // Do work
        thread::sleep(Duration::from_millis(10));
        println!("Step {i} complete");
    }
    Ok(format!("completed all {steps} steps"))
}

/// Sums `data`, checking the token only every 1000 elements to keep the
/// check cheap relative to the work.
fn cancellable_sum(token: CancellationToken, data: &[i64]) -> Option<i64> {
    let mut sum = 0i64;
    for (i, &x) in data.iter().enumerate() {
        if i % 1000 == 0 && token.is_cancelled() {
            return None;
        }
        sum = sum.saturating_add(x);
    }
    Some(sum)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn token_starts_not_cancelled() {
        let t = CancellationToken::new();
        assert!(!t.is_cancelled());
    }

    #[test]
    fn cancel_sets_flag() {
        let t = CancellationToken::new();
        t.cancel();
        assert!(t.is_cancelled());
    }

    #[test]
    fn task_completes_without_cancel() {
        let t = CancellationToken::new();
        let result = long_task(t, 2);
        assert!(result.is_ok());
    }

    #[test]
    fn task_cancelled_immediately() {
        let t = CancellationToken::new();
        t.cancel();
        assert!(long_task(t, 100).is_err());
    }
}
Key Differences
- Rust uses AtomicBool for lock-free cancellation checking; in OCaml a plain ref needs a Mutex for equivalent thread safety (the standard library's Atomic module is the lock-free alternative).
- Rust's Release/Acquire semantics are explicit and precise; OCaml's Mutex provides full mutual exclusion (stronger but more costly).
- Rust shares the token through Arc::clone (an atomic reference-count increment); OCaml shares the Mutex-guarded ref directly, since the garbage collector manages its lifetime.
- OCaml's Lwt_switch integrates with the Lwt event loop; Rust's AtomicBool is a low-level primitive, and tokio_util::sync::CancellationToken is the high-level async version.
OCaml Approach
OCaml's Lwt_switch provides cooperative cancellation for Lwt promises. Lwt_switch.create () creates a switch; Lwt_switch.add_hook (Some switch) f registers a cleanup hook; Lwt_switch.turn_off switch runs the hooks and cancels. OCaml 5 Eio uses Fiber.with_cancellation and Cancel.cancel. For plain threads, a shared flag such as let cancelled = ref false, guarded by a Mutex for thread safety, plays the role of Rust's AtomicBool. The OCaml standard library has no direct equivalent of Go's context.Context.
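For comparison with the mutex-guarded flag described above, here is a rough Rust rendering of the same idea using Arc<Mutex<bool>> instead of an atomic. The type name MutexToken is hypothetical and exists only for this sketch; it shows what the lock-based approach costs in code, not a recommended replacement for AtomicBool.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical mutex-guarded counterpart of the atomic token.
#[derive(Clone)]
struct MutexToken {
    cancelled: Arc<Mutex<bool>>,
}

impl MutexToken {
    fn new() -> Self {
        Self {
            cancelled: Arc::new(Mutex::new(false)),
        }
    }

    fn cancel(&self) {
        // Every writer must take the lock before touching the flag.
        *self.cancelled.lock().unwrap() = true;
    }

    fn is_cancelled(&self) -> bool {
        // Every reader takes the same lock, so each check can block.
        *self.cancelled.lock().unwrap()
    }
}

fn main() {
    let token = MutexToken::new();
    let worker_token = token.clone();
    let worker = thread::spawn(move || {
        let mut polls = 0u64;
        while !worker_token.is_cancelled() {
            polls += 1;
            thread::yield_now();
        }
        polls
    });

    token.cancel();
    let polls = worker.join().unwrap();
    println!("worker observed cancellation after {polls} polls");
}
```

Each check here acquires and releases a lock, whereas the AtomicBool version performs a single atomic load.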
Deep Comparison
922-cancellation-token — Language Comparison
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (channel is unbounded, sync_channel is bounded) | tokio::sync::mpsc (async; channel is bounded, unbounded_channel is not) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
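The "Blocking" row matters for cancellation with std threads: a thread parked in recv cannot check a flag. One way around this, sketched below under stated assumptions, is to wait with std::sync::mpsc's recv_timeout and re-check the flag after each timeout. The function name drain_until_cancelled and the 10 ms poll interval are illustrative choices, not part of the original example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical consumer: sums incoming values until the channel closes
/// (returns Some(sum)) or the flag is set (returns None).
fn drain_until_cancelled(rx: Receiver<i64>, cancelled: Arc<AtomicBool>) -> Option<i64> {
    let mut sum = 0i64;
    loop {
        if cancelled.load(Ordering::Acquire) {
            return None;
        }
        // Bounded wait, so the thread wakes up regularly to re-check the flag.
        match rx.recv_timeout(Duration::from_millis(10)) {
            Ok(x) => sum = sum.saturating_add(x),
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => return Some(sum),
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let cancelled = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&cancelled);
    let worker = thread::spawn(move || drain_until_cancelled(rx, flag));

    tx.send(1).unwrap();
    tx.send(2).unwrap();
    // `tx` stays alive, so the channel never closes; only the flag ends the worker.
    thread::sleep(Duration::from_millis(30));
    cancelled.store(true, Ordering::Release);

    let result = worker.join().unwrap();
    println!("{result:?}");
    drop(tx);
}
```

The poll interval trades responsiveness for wake-ups: a shorter timeout reacts to cancellation sooner but wakes the thread more often while idle.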
Exercises
1. Implement a timeout_token that automatically cancels itself after a specified Duration, combining CancellationToken with thread::sleep.
2. Add a cancel_after(n: usize) method to CancellationToken that cancels the token after n calls to is_cancelled().
3. Implement a CancellableIter<I: Iterator> wrapper that checks the token on each .next() call.
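As a possible starting point for the CancellableIter exercise, the sketch below wraps an iterator and stops yielding once the token is cancelled. The field names inner and token are assumptions made for illustration, and this is only one of several reasonable designs.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
        }
    }

    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }
}

/// Wraps any iterator and ends iteration once the token is cancelled.
struct CancellableIter<I: Iterator> {
    inner: I,                 // hypothetical field name
    token: CancellationToken, // hypothetical field name
}

impl<I: Iterator> Iterator for CancellableIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Check the token before pulling the next element.
        if self.token.is_cancelled() {
            return None;
        }
        self.inner.next()
    }
}

fn main() {
    let token = CancellationToken::new();
    let iter = CancellableIter {
        inner: 0..10,
        token: token.clone(),
    };

    let mut seen = Vec::new();
    for x in iter {
        seen.push(x);
        if x == 2 {
            token.cancel(); // iteration ends at the next call to next()
        }
    }
    println!("{seen:?}");
}
```

With this design, a caller cannot tell exhaustion from cancellation by looking at None alone; checking token.is_cancelled() after the loop distinguishes the two.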