1002 — Backpressure
Tutorial
The Problem
Demonstrate backpressure in a concurrent pipeline using mpsc::sync_channel — a bounded channel where the sender blocks when the buffer is full. Show try_send for non-blocking drop-on-full semantics, and a multi-stage bounded pipeline. Compare with OCaml's Mutex/Condition-based bounded channel implementation.
🎯 Learning Outcomes
- mpsc::sync_channel(N) to create a channel that blocks senders when N items are buffered
- try_send for non-blocking send that returns TrySendError::Full when the buffer is full
- Comparing sync_channel to OCaml's manual Mutex + Condition bounded queue
Code Example
#![allow(clippy::all)]
// 1002: Backpressure — Bounded sync_channel blocks producer
// When consumer is slow, bounded buffer fills and producer is forced to wait
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
// --- Approach 1: sync_channel with slow consumer ---
fn bounded_backpressure() -> (usize, Duration) {
const BUFFER_SIZE: usize = 3;
// sync_channel(N): sender blocks when N items are buffered
let (tx, rx) = mpsc::sync_channel::<i32>(BUFFER_SIZE);
let start = Instant::now();
let producer = thread::spawn(move || {
for i in 1..=9 {
tx.send(i).unwrap(); // blocks when buffer is full
}
// tx drops here — signals consumer to stop
});
let consumer = thread::spawn(move || {
for item in rx.iter() {
thread::sleep(Duration::from_millis(5)); // slow consumer
let _ = item;
}
});
producer.join().unwrap();
consumer.join().unwrap();
(9, start.elapsed())
}
// --- Approach 2: try_send for non-blocking backpressure (drop or error) ---
fn try_send_demo() -> (usize, usize) {
let (tx, rx) = mpsc::sync_channel::<i32>(2);
let mut accepted = 0;
let mut dropped = 0;
for i in 1..=10 {
match tx.try_send(i) {
Ok(_) => accepted += 1,
Err(mpsc::TrySendError::Full(_)) => dropped += 1,
Err(mpsc::TrySendError::Disconnected(_)) => break,
}
}
drop(tx);
let drained: Vec<_> = rx.iter().collect();
assert_eq!(drained.len(), accepted);
(accepted, dropped)
}
// --- Approach 3: Bounded pipeline with backpressure between stages ---
fn bounded_pipeline(items: Vec<i32>) -> Vec<i32> {
// Stage channels — each bounded to 2 items
let (tx1, rx1) = mpsc::sync_channel::<i32>(2);
let (tx2, rx2) = mpsc::sync_channel::<i32>(2);
let (tx3, rx3) = mpsc::sync_channel::<i32>(2);
// Stage 1: double
thread::spawn(move || {
for item in rx1.iter() {
tx2.send(item * 2).unwrap();
}
});
// Stage 2: add 1 (slow)
thread::spawn(move || {
for item in rx2.iter() {
thread::sleep(Duration::from_millis(1)); // simulate slow processing
tx3.send(item + 1).unwrap();
}
});
// Producer
let producer = thread::spawn(move || {
for item in items {
tx1.send(item).unwrap();
} // blocks when stage 1 full
});
// Collect
let results: Vec<i32> = rx3.iter().collect();
producer.join().unwrap();
results
}
// --- Approach 4: Measure backpressure effect ---
fn measure_backpressure_effect() -> bool {
// With buffer=1: producer is slowed to consumer's pace
let (tx_fast, rx_fast) = mpsc::channel::<i32>(); // unbounded
let (tx_bounded, rx_bounded) = mpsc::sync_channel::<i32>(1); // bounded=1
let fast_start = Instant::now();
let h = thread::spawn(move || {
for i in 0..20 {
tx_fast.send(i).unwrap();
}
});
h.join().unwrap();
let fast_time = fast_start.elapsed();
drop(rx_fast);
let bounded_start = Instant::now();
let h2 = thread::spawn(move || {
for i in 0..20 {
tx_bounded.send(i).unwrap();
}
});
// Slow consumer
thread::spawn(move || {
for _ in rx_bounded.iter() {
thread::sleep(Duration::from_millis(1));
}
});
h2.join().unwrap();
let bounded_time = bounded_start.elapsed();
// Bounded (backpressure) should be slower than unbounded
bounded_time > fast_time
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bounded_backpressure_processes_all() {
let (count, _) = bounded_backpressure();
assert_eq!(count, 9);
}
#[test]
fn test_try_send_drops_when_full() {
let (accepted, dropped) = try_send_demo();
assert_eq!(accepted, 2); // buffer size = 2
assert_eq!(dropped, 8); // remaining 8 are dropped
assert_eq!(accepted + dropped, 10);
}
#[test]
fn test_bounded_pipeline_correctness() {
// 1*2+1=3, 2*2+1=5, 3*2+1=7
let mut results = bounded_pipeline(vec![1, 2, 3]);
results.sort();
assert_eq!(results, vec![3, 5, 7]);
}
#[test]
fn test_sync_channel_zero_buffer_rendezvous() {
// sync_channel(0) = rendezvous — sender blocks until receiver takes
let (tx, rx) = mpsc::sync_channel::<i32>(0);
let h = thread::spawn(move || {
tx.send(42).unwrap(); // blocks until receiver calls recv()
});
assert_eq!(rx.recv().unwrap(), 42);
h.join().unwrap();
}
#[test]
fn test_backpressure_is_slower() {
assert!(measure_backpressure_effect());
}
#[test]
fn test_try_send_error_type() {
let (tx, _rx) = mpsc::sync_channel::<i32>(1);
tx.try_send(1).unwrap(); // fills the buffer
let err = tx.try_send(2);
assert!(matches!(err, Err(mpsc::TrySendError::Full(_))));
}
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Bounded channel | mpsc::sync_channel(N) | Manual Mutex + Condition |
| Blocking send | tx.send(v) blocks when full | Condition.wait not_full m |
| Non-blocking | tx.try_send(v) → TrySendError | Conditional if Queue.length < capacity |
| Consumer | rx.iter() | recv_bounded with Condition.wait not_empty |
| Pipeline | Chain sync_channels | Chain make_bounded_chans |
| Code length | Short (stdlib) | Verbose (manual synchronisation) |
Backpressure prevents fast producers from overwhelming slow consumers. Without it, unbounded buffers grow until memory is exhausted. sync_channel is Rust's built-in solution; the manual OCaml implementation shows the underlying mechanism.
OCaml Approach
OCaml's bounded channel uses Queue.t with Mutex and two Condition variables: not_full (producer waits) and not_empty (consumer waits). send_bounded locks the mutex, waits on not_full while the queue is at capacity, pushes the value, and signals not_empty. This is the standard Condition-based producer-consumer pattern, equivalent to Rust's sync_channel semantics but implemented manually.
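The same mechanism can be sketched in Rust with std::sync::{Mutex, Condvar}, which makes the OCaml pattern concrete without leaving the document's language. This is a minimal illustration, not the OCaml source: the names BoundedQueue, send, and recv are made up here, and unlike sync_channel(0) this sketch requires capacity >= 1 (it has no rendezvous mode).

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

// Minimal bounded queue: one Mutex guards the VecDeque, and two Condvars
// mirror OCaml's not_full (producers wait) and not_empty (consumers wait).
struct BoundedQueue<T> {
    inner: Mutex<VecDeque<T>>,
    not_full: Condvar,
    not_empty: Condvar,
    capacity: usize, // must be >= 1; this sketch has no rendezvous mode
}

impl<T> BoundedQueue<T> {
    fn new(capacity: usize) -> Arc<Self> {
        assert!(capacity >= 1);
        Arc::new(Self {
            inner: Mutex::new(VecDeque::new()),
            not_full: Condvar::new(),
            not_empty: Condvar::new(),
            capacity,
        })
    }

    // Blocks while the queue is at capacity — this wait *is* the backpressure.
    fn send(&self, value: T) {
        let mut q = self.inner.lock().unwrap();
        while q.len() >= self.capacity {
            q = self.not_full.wait(q).unwrap(); // loop guards spurious wakeups
        }
        q.push_back(value);
        self.not_empty.notify_one();
    }

    // Blocks while the queue is empty; frees one producer slot on success.
    fn recv(&self) -> T {
        let mut q = self.inner.lock().unwrap();
        while q.is_empty() {
            q = self.not_empty.wait(q).unwrap();
        }
        let v = q.pop_front().unwrap();
        self.not_full.notify_one();
        v
    }
}
```

The while-loop around each Condvar::wait is the standard guard against spurious wakeups — the same shape as OCaml's `while Queue.length q >= capacity do Condition.wait not_full m done`.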
Deep Comparison
Backpressure — Comparison
Core Insight
Backpressure prevents unbounded buffering: instead of letting producers flood a buffer until it runs out of memory, the producer is forced to wait when the buffer is full. This propagates slowness upstream — the natural rate-limiting of processing pipelines.
OCaml Approach
- Queue + Mutex + two Condition variables
- send_bounded: wait while Queue.length >= capacity (not_full condition)
- recv_bounded: signal not_full after each receive
- try_send: non-blocking check — returns bool indicating acceptance
Rust Approach
- mpsc::sync_channel(N) creates a bounded channel with buffer of N
- tx.send(v) blocks when buffer is full — zero-cost backpressure
- tx.try_send(v) returns Err(TrySendError::Full(_)) immediately
- sync_channel(0) is a CSP rendezvous — synchronous handoff
- rx.iter() — pipeline stages auto-throttle
Comparison Table
| Concept | OCaml (simulated) | Rust |
|---|---|---|
| Bounded channel | Manual Queue + Mutex + 2 Condvar | mpsc::sync_channel(N) |
| Blocking send | Condition.wait not_full in send | tx.send(v) blocks automatically |
| Non-blocking send | try_send (custom) | tx.try_send(v) built-in |
| Buffer full error | return false from try_send | Err(TrySendError::Full(v)) |
| Rendezvous (N=0) | capacity=0 edge case | sync_channel(0) first-class |
| Pipeline backpressure | Manual per-stage | Each stage's sync_channel auto-throttles |
| Async backpressure | N/A | tokio::sync::mpsc with send().await |
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (channel unbounded, sync_channel bounded) | tokio::sync::mpsc (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
1. Try tx.send_timeout(v, Duration::from_millis(100)) (if available) or implement it via a try_send + sleep loop.
2. Extend try_send_demo to return (accepted, dropped, drained) and verify accepted == drained.
3. Compare sync_channel(1) vs sync_channel(100) for throughput on a CPU-bound transform stage.
4. Add a try_send variant to the OCaml bounded channel that returns false instead of blocking when full.
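For the first exercise: std::sync::mpsc has no send_timeout, so one possible sketch polls try_send until a deadline. The helper name send_timeout and the 1 ms poll interval are illustrative choices, not a standard API; returning the value inside Err lets the caller keep ownership on failure, mirroring TrySendError.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};
use std::time::{Duration, Instant};

// Poll try_send until the value is accepted or the deadline passes.
// On timeout or disconnect, hand the value back to the caller via Err.
fn send_timeout<T>(tx: &SyncSender<T>, value: T, timeout: Duration) -> Result<(), T> {
    let deadline = Instant::now() + timeout;
    let mut value = value;
    loop {
        match tx.try_send(value) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Disconnected(v)) => return Err(v), // no point retrying
            Err(TrySendError::Full(v)) => {
                if Instant::now() >= deadline {
                    return Err(v);
                }
                value = v;
                std::thread::sleep(Duration::from_millis(1)); // illustrative poll interval
            }
        }
    }
}
```

Polling trades latency for simplicity; a Condvar-based channel (as in the OCaml sketch) could instead use a timed wait to avoid the sleep loop entirely.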