ExamplesBy LevelBy TopicLearning Paths
335 Expert

335: Waker and Context

Functional Programming

Tutorial Video

Text description (accessibility)

This video demonstrates the "335: Waker and Context" functional Rust example. Difficulty level: Expert. Key concepts covered: Functional Programming. When a future returns `Poll::Pending`, the executor must know when to re-poll it. Key difference from OCaml: 1. **Pull vs push**: Rust's Waker is pull

Tutorial

The Problem

When a future returns Poll::Pending, the executor must know when to re-poll it. Without this signaling mechanism, the executor would either poll continuously (busy-waiting, wasting CPU) or miss the completion event. The Waker mechanism solves this: the future stores the waker from Context, then calls waker.wake() when it's ready to be re-polled. This is the fundamental scheduling mechanism behind all async runtimes.

🎯 Learning Outcomes

  • • Understand Context<'_> as the channel through which the executor gives the future a Waker
  • • Implement a future that stores the Waker and signals it when external state changes
  • • Recognize that wake() schedules the future for re-polling — not immediate execution
  • • Understand the shared state pattern: Arc<Mutex<SharedState<T>>> between future and resolver
  • Code Example

    let (fut, resolver) = make_future();
    resolver.fulfill(42);

    Key Differences

  • Pull vs push: Rust's Waker is pull-based (executor polls); OCaml's Lwt callbacks are push-based (completion triggers continuation).
  • Clone requirement: Waker implements Clone so multiple contexts can hold it; the executor uses a RawWaker vtable for dynamic dispatch.
  • Wake = schedule: waker.wake() doesn't immediately execute the future; it just queues it for the executor's next scheduling cycle.
  • Thread safety: Waker is Send + Sync — it can be stored in any thread-safe context and woken from any thread.
  • OCaml Approach

    OCaml's Lwt uses a resolver pattern (Lwt.wait() returns a promise + resolver). The internal mechanism uses callbacks rather than poll-based wakers:

    let (promise, resolver) = Lwt.wait ()
    (* Elsewhere: Lwt.wakeup resolver value *)
    (* The promise completes when wakeup is called *)
    

    Full Source

    #![allow(clippy::all)]
    //! # Waker and Context
    //!
    //! How the executor knows to re-poll a future — `cx.waker().wake()` schedules a re-poll.
    
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll, Waker};
    
    /// Shared state between a future and its resolver.
    #[derive(Default)]
    pub struct SharedState<T> {
        pub value: Option<T>,
        pub waker: Option<Waker>,
    }
    
    /// A future that completes when an external source provides a value.
    pub struct ExternalFuture<T> {
        state: Arc<Mutex<SharedState<T>>>,
    }
    
    impl<T: Clone> Future for ExternalFuture<T> {
        type Output = T;
    
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
            let mut state = self.state.lock().unwrap();
    
            if let Some(value) = state.value.clone() {
                Poll::Ready(value)
            } else {
                // Store the waker so the resolver can wake us later
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
    
    /// A resolver that can fulfill the associated future.
    pub struct Resolver<T> {
        state: Arc<Mutex<SharedState<T>>>,
    }
    
    impl<T> Resolver<T> {
        /// Fulfill the future with a value, waking the executor.
        pub fn fulfill(self, value: T) {
            let mut state = self.state.lock().unwrap();
            state.value = Some(value);
    
            // Wake the executor so it knows to re-poll
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        }
    }
    
    /// Create a linked future and resolver pair.
    pub fn make_future<T>() -> (ExternalFuture<T>, Resolver<T>) {
        let state = Arc::new(Mutex::new(SharedState {
            value: None,
            waker: None,
        }));
        (
            ExternalFuture {
                state: Arc::clone(&state),
            },
            Resolver { state },
        )
    }
    
    /// A minimal executor that properly handles waker notifications.
    pub fn block_on<F: Future>(fut: F) -> F::Output {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::task::{RawWaker, RawWakerVTable};
    
        let ready = Arc::new(AtomicBool::new(true));
    
        // Waker vtable that operates on Arc<AtomicBool>
        unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
            let arc = Arc::from_raw(ptr as *const AtomicBool);
            let cloned = Arc::clone(&arc);
            std::mem::forget(arc); // don't drop the original
            RawWaker::new(Arc::into_raw(cloned) as *const (), &VTABLE)
        }
    
        unsafe fn wake(ptr: *const ()) {
            let arc = Arc::from_raw(ptr as *const AtomicBool);
            arc.store(true, Ordering::Release);
            // arc is dropped here (consumed)
        }
    
        unsafe fn wake_by_ref(ptr: *const ()) {
            let arc = Arc::from_raw(ptr as *const AtomicBool);
            arc.store(true, Ordering::Release);
            std::mem::forget(arc); // don't drop
        }
    
        unsafe fn drop_waker(ptr: *const ()) {
            drop(Arc::from_raw(ptr as *const AtomicBool));
        }
    
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
    
        let raw = RawWaker::new(Arc::into_raw(Arc::clone(&ready)) as *const (), &VTABLE);
        let waker = unsafe { Waker::from_raw(raw) };
        let mut cx = Context::from_waker(&waker);
        let mut fut = Box::pin(fut);
    
        loop {
            if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
                return value;
            }
            // Wait until woken
            while !ready.swap(false, Ordering::AcqRel) {
                std::hint::spin_loop();
            }
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        use std::thread;
        use std::time::Duration;
    
        #[test]
        fn test_external_future() {
            let (fut, resolver) = make_future::<i32>();
    
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(5));
                resolver.fulfill(42);
            });
    
            assert_eq!(block_on(fut), 42);
        }
    
        #[test]
        fn test_immediate_fulfill() {
            let (fut, resolver) = make_future::<String>();
            resolver.fulfill("hello".to_string());
            assert_eq!(block_on(fut), "hello");
        }
    
        #[test]
        fn test_multiple_futures() {
            let (fut1, res1) = make_future::<i32>();
            let (fut2, res2) = make_future::<i32>();
    
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(5));
                res1.fulfill(1);
            });
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(3));
                res2.fulfill(2);
            });
    
            assert_eq!(block_on(fut1), 1);
            assert_eq!(block_on(fut2), 2);
        }
    
        #[test]
        fn test_shared_state_default() {
            let state: SharedState<i32> = SharedState::default();
            assert!(state.value.is_none());
            assert!(state.waker.is_none());
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
        use std::thread;
        use std::time::Duration;
    
        #[test]
        fn test_external_future() {
            let (fut, resolver) = make_future::<i32>();
    
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(5));
                resolver.fulfill(42);
            });
    
            assert_eq!(block_on(fut), 42);
        }
    
        #[test]
        fn test_immediate_fulfill() {
            let (fut, resolver) = make_future::<String>();
            resolver.fulfill("hello".to_string());
            assert_eq!(block_on(fut), "hello");
        }
    
        #[test]
        fn test_multiple_futures() {
            let (fut1, res1) = make_future::<i32>();
            let (fut2, res2) = make_future::<i32>();
    
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(5));
                res1.fulfill(1);
            });
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(3));
                res2.fulfill(2);
            });
    
            assert_eq!(block_on(fut1), 1);
            assert_eq!(block_on(fut2), 2);
        }
    
        #[test]
        fn test_shared_state_default() {
            let state: SharedState<i32> = SharedState::default();
            assert!(state.value.is_none());
            assert!(state.waker.is_none());
        }
    }

    Deep Comparison

    OCaml vs Rust: Waker and Context

    Promise/Resolver Pattern

    OCaml (Lwt):

    let (promise, resolver) = Lwt.wait () in
    Lwt.wakeup resolver 42
    

    Rust:

    let (fut, resolver) = make_future();
    resolver.fulfill(42);
    

    Future Implementation

    OCaml: Internal to Lwt library, uses callbacks.

    Rust:

    impl Future for ExternalFuture {
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
            let mut s = self.state.lock().unwrap();
            if let Some(v) = s.value { Poll::Ready(v) }
            else { s.waker = Some(cx.waker().clone()); Poll::Pending }
        }
    }
    

    Key Differences

    AspectOCamlRust
    Wake mechanismLwt.wakeupwaker.wake()
    SuspensionLwt.wait()Poll::Pending
    Executor notifyInternalExplicit waker
    Shared stateMutable refArc<Mutex<T>>

    Exercises

  • Implement a ChannelFuture<T> that wraps an mpsc::Receiver<T> and uses a Waker to be notified when a message arrives.
  • Show the difference between wake_by_ref() (keeps the waker alive) and wake() (consumes the waker).
  • Build a simple timer future that stores the waker and spawns a thread that sleeps, then calls wake() after the delay.
  • Open Source Repos