Examples | By Level | By Topic | Learning Paths
1002 Advanced

1002 — Backpressure

Functional Programming

Tutorial

The Problem

Demonstrate backpressure in a concurrent pipeline using mpsc::sync_channel — a bounded channel where the sender blocks when the buffer is full. Show try_send for non-blocking drop-on-full semantics, and a multi-stage bounded pipeline. Compare with OCaml's Mutex/Condition-based bounded channel implementation.

🎯 Learning Outcomes

  • Use mpsc::sync_channel(N) to create a channel that blocks senders when N items are buffered
  • Understand backpressure: the producer is implicitly rate-limited by the consumer's speed
  • Use try_send for non-blocking send that returns TrySendError::Full when the buffer is full
  • Build a two-stage pipeline with bounded channels between stages
  • Map Rust's sync_channel to OCaml's manual Mutex + Condition bounded queue
  • Recognise backpressure as a fundamental flow-control pattern in streaming systems

Code Example

    #![allow(clippy::all)]
    // 1002: Backpressure — Bounded sync_channel blocks producer
    // When consumer is slow, bounded buffer fills and producer is forced to wait
    
    use std::sync::mpsc;
    use std::thread;
    use std::time::{Duration, Instant};
    
    // --- Approach 1: sync_channel with slow consumer ---
    /// Demonstrates producer-side backpressure: with a bounded buffer of 3,
    /// the producer's `send` blocks whenever the consumer falls behind.
    ///
    /// Returns the number of items processed and the total wall-clock time.
    fn bounded_backpressure() -> (usize, Duration) {
        const BUFFER_SIZE: usize = 3;
        const TOTAL_ITEMS: usize = 9;

        // A sync_channel sender blocks once BUFFER_SIZE items are in flight.
        let (tx, rx) = mpsc::sync_channel::<i32>(BUFFER_SIZE);
        let started = Instant::now();

        let send_handle = thread::spawn(move || {
            for n in 1..=TOTAL_ITEMS as i32 {
                // Blocks here whenever the buffer already holds BUFFER_SIZE items.
                tx.send(n).unwrap();
            }
            // Dropping tx closes the channel, which ends the consumer's loop.
        });

        let recv_handle = thread::spawn(move || {
            for _value in rx.iter() {
                // Deliberately slow consumer so the buffer fills up.
                thread::sleep(Duration::from_millis(5));
            }
        });

        send_handle.join().unwrap();
        recv_handle.join().unwrap();
        (TOTAL_ITEMS, started.elapsed())
    }
    
    // --- Approach 2: try_send for non-blocking backpressure (drop or error) ---
    /// Fills a 2-slot bounded channel with non-blocking `try_send` calls.
    /// Sends that hit a full buffer are counted as dropped instead of blocking.
    /// Returns `(accepted, dropped)`.
    fn try_send_demo() -> (usize, usize) {
        let (tx, rx) = mpsc::sync_channel::<i32>(2);
        let (mut accepted, mut dropped) = (0usize, 0usize);

        for value in 1..=10 {
            match tx.try_send(value) {
                Ok(()) => accepted += 1,
                // Buffer full: drop the item rather than wait.
                Err(mpsc::TrySendError::Full(_)) => dropped += 1,
                // Receiver gone: nothing more can be delivered.
                Err(mpsc::TrySendError::Disconnected(_)) => break,
            }
        }

        // Close the sender so draining the receiver terminates.
        drop(tx);
        let drained = rx.iter().count();
        assert_eq!(drained, accepted);
        (accepted, dropped)
    }
    
    // --- Approach 3: Bounded pipeline with backpressure between stages ---
    /// Runs `items` through a two-stage pipeline (double, then add one) whose
    /// stages are connected by bounded channels, so the slowest stage
    /// throttles everything upstream of it.
    fn bounded_pipeline(items: Vec<i32>) -> Vec<i32> {
        const STAGE_CAP: usize = 2;
        let (input_tx, input_rx) = mpsc::sync_channel::<i32>(STAGE_CAP);
        let (mid_tx, mid_rx) = mpsc::sync_channel::<i32>(STAGE_CAP);
        let (out_tx, out_rx) = mpsc::sync_channel::<i32>(STAGE_CAP);

        // Stage 1: double each value.
        thread::spawn(move || {
            for v in input_rx.iter() {
                mid_tx.send(v * 2).unwrap();
            }
        });

        // Stage 2: add one, slowly — this stage sets the pipeline's pace.
        thread::spawn(move || {
            for v in mid_rx.iter() {
                thread::sleep(Duration::from_millis(1)); // simulate slow processing
                out_tx.send(v + 1).unwrap();
            }
        });

        // Feed the pipeline; send blocks whenever stage 1's buffer is full.
        let feeder = thread::spawn(move || {
            for v in items {
                input_tx.send(v).unwrap();
            }
        });

        // Draining ends once all senders upstream have been dropped.
        let collected: Vec<i32> = out_rx.iter().collect();
        feeder.join().unwrap();
        collected
    }
    
    // --- Approach 4: Measure backpressure effect ---
    /// Compares producer-side send time on an unbounded channel against a
    /// `sync_channel(1)` paired with a slow consumer. Returns true when the
    /// bounded (backpressured) producer took longer, as expected.
    fn measure_backpressure_effect() -> bool {
        let (tx_fast, rx_fast) = mpsc::channel::<i32>(); // unbounded: never blocks
        let (tx_bounded, rx_bounded) = mpsc::sync_channel::<i32>(1); // capacity 1

        // Unbounded producer: all 20 sends complete without waiting.
        let fast_start = Instant::now();
        let fast_handle = thread::spawn(move || {
            for n in 0..20 {
                tx_fast.send(n).unwrap();
            }
        });
        fast_handle.join().unwrap();
        let fast_time = fast_start.elapsed();
        drop(rx_fast);

        // Bounded producer: each send must wait for the slow consumer below.
        let bounded_start = Instant::now();
        let bounded_handle = thread::spawn(move || {
            for n in 0..20 {
                tx_bounded.send(n).unwrap();
            }
        });
        // Slow consumer — paces the bounded producer via backpressure.
        thread::spawn(move || {
            for _ in rx_bounded.iter() {
                thread::sleep(Duration::from_millis(1));
            }
        });
        bounded_handle.join().unwrap();
        let bounded_time = bounded_start.elapsed();

        // Backpressure should make the bounded producer measurably slower.
        bounded_time > fast_time
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn test_bounded_backpressure_processes_all() {
            // All 9 items make it through despite the slow consumer.
            let (processed, _elapsed) = bounded_backpressure();
            assert_eq!(processed, 9);
        }

        #[test]
        fn test_try_send_drops_when_full() {
            // With no consumer draining, only the 2 buffered slots are accepted.
            let (accepted, dropped) = try_send_demo();
            assert_eq!(accepted, 2);
            assert_eq!(dropped, 8);
            assert_eq!(accepted + dropped, 10);
        }

        #[test]
        fn test_bounded_pipeline_correctness() {
            // Each input x becomes x * 2 + 1.
            let mut out = bounded_pipeline(vec![1, 2, 3]);
            out.sort();
            assert_eq!(out, vec![3, 5, 7]);
        }

        #[test]
        fn test_sync_channel_zero_buffer_rendezvous() {
            // Capacity 0 = rendezvous: the send completes only when recv runs.
            let (tx, rx) = mpsc::sync_channel::<i32>(0);
            let sender = thread::spawn(move || {
                tx.send(42).unwrap();
            });
            assert_eq!(rx.recv().unwrap(), 42);
            sender.join().unwrap();
        }

        #[test]
        fn test_backpressure_is_slower() {
            assert!(measure_backpressure_effect());
        }

        #[test]
        fn test_try_send_error_type() {
            let (tx, _rx) = mpsc::sync_channel::<i32>(1);
            tx.try_send(1).unwrap(); // occupies the single slot
            assert!(matches!(tx.try_send(2), Err(mpsc::TrySendError::Full(_))));
        }
    }

    Key Differences

    | Aspect            | Rust                                   | OCaml                                       |
    |-------------------|----------------------------------------|---------------------------------------------|
    | Bounded channel   | mpsc::sync_channel(N)                  | Manual Mutex + Condition                    |
    | Blocking send     | tx.send(v) blocks when full            | Condition.wait not_full m                   |
    | Non-blocking send | tx.try_send(v) returns TrySendError    | Conditional: if Queue.length < capacity     |
    | Consumer          | rx.iter()                              | recv_bounded with Condition.wait not_empty  |
    | Pipeline          | Chain sync_channels                    | Chain make_bounded_chans                    |
    | Code length       | Short (stdlib)                         | Verbose (manual synchronisation)            |

    Backpressure prevents fast producers from overwhelming slow consumers. Without it, unbounded buffers grow until memory is exhausted. sync_channel is Rust's built-in solution; the manual OCaml implementation shows the underlying mechanism.

    OCaml Approach

    OCaml's bounded channel uses Queue.t with Mutex and two Condition variables: not_full (producer waits) and not_empty (consumer waits). send_bounded locks the mutex, waits on not_full while the queue is at capacity, pushes the value, and signals not_empty. This is the standard Condition-based producer-consumer pattern, equivalent to Rust's sync_channel semantics but implemented manually.

    Full Source

    #![allow(clippy::all)]
    // 1002: Backpressure — Bounded sync_channel blocks producer
    // When consumer is slow, bounded buffer fills and producer is forced to wait
    
    use std::sync::mpsc;
    use std::thread;
    use std::time::{Duration, Instant};
    
    // --- Approach 1: sync_channel with slow consumer ---
    /// Demonstrates producer-side backpressure: with a bounded buffer of 3,
    /// the producer's `send` blocks whenever the consumer falls behind.
    ///
    /// Returns the number of items processed and the total wall-clock time.
    fn bounded_backpressure() -> (usize, Duration) {
        const BUFFER_SIZE: usize = 3;
        const TOTAL_ITEMS: usize = 9;

        // A sync_channel sender blocks once BUFFER_SIZE items are in flight.
        let (tx, rx) = mpsc::sync_channel::<i32>(BUFFER_SIZE);
        let started = Instant::now();

        let send_handle = thread::spawn(move || {
            for n in 1..=TOTAL_ITEMS as i32 {
                // Blocks here whenever the buffer already holds BUFFER_SIZE items.
                tx.send(n).unwrap();
            }
            // Dropping tx closes the channel, which ends the consumer's loop.
        });

        let recv_handle = thread::spawn(move || {
            for _value in rx.iter() {
                // Deliberately slow consumer so the buffer fills up.
                thread::sleep(Duration::from_millis(5));
            }
        });

        send_handle.join().unwrap();
        recv_handle.join().unwrap();
        (TOTAL_ITEMS, started.elapsed())
    }
    
    // --- Approach 2: try_send for non-blocking backpressure (drop or error) ---
    /// Fills a 2-slot bounded channel with non-blocking `try_send` calls.
    /// Sends that hit a full buffer are counted as dropped instead of blocking.
    /// Returns `(accepted, dropped)`.
    fn try_send_demo() -> (usize, usize) {
        let (tx, rx) = mpsc::sync_channel::<i32>(2);
        let (mut accepted, mut dropped) = (0usize, 0usize);

        for value in 1..=10 {
            match tx.try_send(value) {
                Ok(()) => accepted += 1,
                // Buffer full: drop the item rather than wait.
                Err(mpsc::TrySendError::Full(_)) => dropped += 1,
                // Receiver gone: nothing more can be delivered.
                Err(mpsc::TrySendError::Disconnected(_)) => break,
            }
        }

        // Close the sender so draining the receiver terminates.
        drop(tx);
        let drained = rx.iter().count();
        assert_eq!(drained, accepted);
        (accepted, dropped)
    }
    
    // --- Approach 3: Bounded pipeline with backpressure between stages ---
    /// Runs `items` through a two-stage pipeline (double, then add one) whose
    /// stages are connected by bounded channels, so the slowest stage
    /// throttles everything upstream of it.
    fn bounded_pipeline(items: Vec<i32>) -> Vec<i32> {
        const STAGE_CAP: usize = 2;
        let (input_tx, input_rx) = mpsc::sync_channel::<i32>(STAGE_CAP);
        let (mid_tx, mid_rx) = mpsc::sync_channel::<i32>(STAGE_CAP);
        let (out_tx, out_rx) = mpsc::sync_channel::<i32>(STAGE_CAP);

        // Stage 1: double each value.
        thread::spawn(move || {
            for v in input_rx.iter() {
                mid_tx.send(v * 2).unwrap();
            }
        });

        // Stage 2: add one, slowly — this stage sets the pipeline's pace.
        thread::spawn(move || {
            for v in mid_rx.iter() {
                thread::sleep(Duration::from_millis(1)); // simulate slow processing
                out_tx.send(v + 1).unwrap();
            }
        });

        // Feed the pipeline; send blocks whenever stage 1's buffer is full.
        let feeder = thread::spawn(move || {
            for v in items {
                input_tx.send(v).unwrap();
            }
        });

        // Draining ends once all senders upstream have been dropped.
        let collected: Vec<i32> = out_rx.iter().collect();
        feeder.join().unwrap();
        collected
    }
    
    // --- Approach 4: Measure backpressure effect ---
    /// Compares producer-side send time on an unbounded channel against a
    /// `sync_channel(1)` paired with a slow consumer. Returns true when the
    /// bounded (backpressured) producer took longer, as expected.
    fn measure_backpressure_effect() -> bool {
        let (tx_fast, rx_fast) = mpsc::channel::<i32>(); // unbounded: never blocks
        let (tx_bounded, rx_bounded) = mpsc::sync_channel::<i32>(1); // capacity 1

        // Unbounded producer: all 20 sends complete without waiting.
        let fast_start = Instant::now();
        let fast_handle = thread::spawn(move || {
            for n in 0..20 {
                tx_fast.send(n).unwrap();
            }
        });
        fast_handle.join().unwrap();
        let fast_time = fast_start.elapsed();
        drop(rx_fast);

        // Bounded producer: each send must wait for the slow consumer below.
        let bounded_start = Instant::now();
        let bounded_handle = thread::spawn(move || {
            for n in 0..20 {
                tx_bounded.send(n).unwrap();
            }
        });
        // Slow consumer — paces the bounded producer via backpressure.
        thread::spawn(move || {
            for _ in rx_bounded.iter() {
                thread::sleep(Duration::from_millis(1));
            }
        });
        bounded_handle.join().unwrap();
        let bounded_time = bounded_start.elapsed();

        // Backpressure should make the bounded producer measurably slower.
        bounded_time > fast_time
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn test_bounded_backpressure_processes_all() {
            // All 9 items make it through despite the slow consumer.
            let (processed, _elapsed) = bounded_backpressure();
            assert_eq!(processed, 9);
        }

        #[test]
        fn test_try_send_drops_when_full() {
            // With no consumer draining, only the 2 buffered slots are accepted.
            let (accepted, dropped) = try_send_demo();
            assert_eq!(accepted, 2);
            assert_eq!(dropped, 8);
            assert_eq!(accepted + dropped, 10);
        }

        #[test]
        fn test_bounded_pipeline_correctness() {
            // Each input x becomes x * 2 + 1.
            let mut out = bounded_pipeline(vec![1, 2, 3]);
            out.sort();
            assert_eq!(out, vec![3, 5, 7]);
        }

        #[test]
        fn test_sync_channel_zero_buffer_rendezvous() {
            // Capacity 0 = rendezvous: the send completes only when recv runs.
            let (tx, rx) = mpsc::sync_channel::<i32>(0);
            let sender = thread::spawn(move || {
                tx.send(42).unwrap();
            });
            assert_eq!(rx.recv().unwrap(), 42);
            sender.join().unwrap();
        }

        #[test]
        fn test_backpressure_is_slower() {
            assert!(measure_backpressure_effect());
        }

        #[test]
        fn test_try_send_error_type() {
            let (tx, _rx) = mpsc::sync_channel::<i32>(1);
            tx.try_send(1).unwrap(); // occupies the single slot
            assert!(matches!(tx.try_send(2), Err(mpsc::TrySendError::Full(_))));
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn test_bounded_backpressure_processes_all() {
            // All 9 items make it through despite the slow consumer.
            let (processed, _elapsed) = bounded_backpressure();
            assert_eq!(processed, 9);
        }

        #[test]
        fn test_try_send_drops_when_full() {
            // With no consumer draining, only the 2 buffered slots are accepted.
            let (accepted, dropped) = try_send_demo();
            assert_eq!(accepted, 2);
            assert_eq!(dropped, 8);
            assert_eq!(accepted + dropped, 10);
        }

        #[test]
        fn test_bounded_pipeline_correctness() {
            // Each input x becomes x * 2 + 1.
            let mut out = bounded_pipeline(vec![1, 2, 3]);
            out.sort();
            assert_eq!(out, vec![3, 5, 7]);
        }

        #[test]
        fn test_sync_channel_zero_buffer_rendezvous() {
            // Capacity 0 = rendezvous: the send completes only when recv runs.
            let (tx, rx) = mpsc::sync_channel::<i32>(0);
            let sender = thread::spawn(move || {
                tx.send(42).unwrap();
            });
            assert_eq!(rx.recv().unwrap(), 42);
            sender.join().unwrap();
        }

        #[test]
        fn test_backpressure_is_slower() {
            assert!(measure_backpressure_effect());
        }

        #[test]
        fn test_try_send_error_type() {
            let (tx, _rx) = mpsc::sync_channel::<i32>(1);
            tx.try_send(1).unwrap(); // occupies the single slot
            assert!(matches!(tx.try_send(2), Err(mpsc::TrySendError::Full(_))));
        }
    }

    Deep Comparison

    Backpressure — Comparison

    Core Insight

    Backpressure prevents unbounded buffering: instead of letting producers flood a buffer until it runs out of memory, the producer is forced to wait when the buffer is full. This propagates slowness upstream — the natural rate-limiting of processing pipelines.

    OCaml Approach

  • Simulate a bounded channel with Queue + Mutex + two Condition variables
  • send_bounded: wait while Queue.length >= capacity (not_full condition)
  • recv_bounded: signal not_full after each receive
  • try_send: non-blocking check — returns bool indicating acceptance
  • More boilerplate than Rust — no built-in bounded channel

    Rust Approach

  • mpsc::sync_channel(N) creates a bounded channel with a buffer of N
  • tx.send(v) blocks when the buffer is full — zero-cost backpressure
  • tx.try_send(v) returns Err(TrySendError::Full(_)) immediately
  • sync_channel(0) is a CSP rendezvous — synchronous handoff
  • Works transparently with rx.iter() — pipeline stages auto-throttle

    Comparison Table

    | Concept               | OCaml (simulated)                 | Rust                                      |
    |-----------------------|-----------------------------------|-------------------------------------------|
    | Bounded channel       | Manual Queue + Mutex + 2 Condvar  | mpsc::sync_channel(N)                     |
    | Blocking send         | Condition.wait not_full in send   | tx.send(v) blocks automatically           |
    | Non-blocking send     | try_send (custom)                 | tx.try_send(v) built-in                   |
    | Buffer full error     | return false from try_send        | Err(TrySendError::Full(v))                |
    | Rendezvous (N=0)      | capacity=0 edge case              | sync_channel(0) first-class               |
    | Pipeline backpressure | Manual per-stage                  | Each stage's sync_channel auto-throttles  |
    | Async backpressure    | N/A                               | tokio::sync::mpsc with send().await       |

    std vs tokio

    | Aspect          | std version                    | tokio version                        |
    |-----------------|--------------------------------|--------------------------------------|
    | Runtime         | OS threads via std::thread     | Async tasks on tokio runtime         |
    | Synchronization | std::sync::Mutex, Condvar      | tokio::sync::Mutex, channels         |
    | Channels        | std::sync::mpsc (unbounded)    | tokio::sync::mpsc (bounded, async)   |
    | Blocking        | Thread blocks on lock/recv     | Task yields, runtime switches tasks  |
    | Overhead        | One OS thread per task         | Many tasks per thread (M:N)          |
    | Best for        | CPU-bound, simple concurrency  | I/O-bound, high-concurrency servers  |

    Exercises

  • Add a timeout to the blocking send: std's SyncSender has no send_timeout method, so implement one with a try_send + sleep retry loop (or use the crossbeam-channel crate, whose Sender provides send_timeout(v, Duration)).
  • Add a drop counter: modify try_send_demo to return (accepted, dropped, drained) and verify accepted == drained.
  • Implement a three-stage pipeline: producer → transform → consumer with bounded channels between each stage.
  • Benchmark sync_channel(1) vs sync_channel(100) for throughput on a CPU-bound transform stage.
  • In OCaml, add a try_send variant to the bounded channel that returns false instead of blocking when full.
  • Open Source Repos