
336: Executor Basics

Functional Programming

Tutorial Video

A tutorial video accompanies this example, walking through "336: Executor Basics" (difficulty: Advanced; topic: Functional Programming).

Tutorial

The Problem

async fn and Future trait implementations don't run themselves — they need an executor to drive them to completion by calling poll() repeatedly until Poll::Ready. Understanding how an executor works explains the behavior of tokio, async-std, and other runtimes. This example builds a minimal single-threaded executor from scratch, demonstrating task queueing, waker implementation, and the poll loop.
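Before building a full executor, it helps to do one poll by hand. The sketch below is our own illustration (the `poll_once` helper is not part of this example's source, and it assumes Rust 1.85+ for `Waker::noop`): it polls an async block once with a no-op waker, showing that a future is inert until something polls it.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, Waker};

/// Poll an async block once by hand; returns its output if it finished.
fn poll_once() -> Option<i32> {
    // Creating the future does nothing by itself -- the async block is
    // inert until an executor polls it.
    let mut fut = pin!(async { 21 * 2 });

    // Waker::noop() (stable since Rust 1.85) gives a Context whose
    // wake-ups are ignored -- enough for a future with no await points.
    let mut cx = Context::from_waker(Waker::noop());
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}

fn main() {
    // A future with no await points completes on its first poll.
    assert_eq!(poll_once(), Some(42));
    println!("poll_once -> {:?}", poll_once());
}
```

A real executor differs only in what happens on `Poll::Pending`: it parks the task and relies on the waker to re-queue it.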

🎯 Learning Outcomes

  • Understand that an executor drives futures by polling them until Poll::Ready
  • Implement a minimal task queue using mpsc::sync_channel for the ready queue
  • Build a Waker using the RawWaker vtable API for custom wake behavior
  • Recognize the event loop: poll task → if Pending, park; on wake, re-queue and re-poll

    Code Example

    let ex = SimpleExecutor::new();
    ex.spawn(async { task1() });
    ex.spawn(async { task2() });
    ex.run();

    Key Differences

  • Poll vs callback: Rust's executor poll-based model is more explicit about state; OCaml's Lwt uses implicit callback registration.
  • Thread model: A single-threaded executor processes futures sequentially; Tokio's multi-threaded executor uses work-stealing.
  • RawWaker vtable: The RawWaker API requires unsafe code for custom wakers — production code uses the waker_fn or futures::task::noop_waker helpers.
  • Production runtimes: Tokio, async-std, and smol all implement this loop with I/O multiplexing (epoll/kqueue/IOCP) for true async I/O.
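Besides the unsafe RawWaker vtable used in this example, the standard library's `Wake` trait (stable since Rust 1.51) builds the vtable for you from an `Arc`. A minimal sketch — `CountingWaker` is our own illustrative type, not part of this example's source:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// A custom waker with no unsafe code: implementing std::task::Wake lets
// Waker::from(Arc<...>) generate the RawWaker vtable behind the scenes.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let inner = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(Arc::clone(&inner));

    waker.wake_by_ref(); // wake without consuming the Waker
    waker.wake();        // wake and consume the Waker
    assert_eq!(inner.wakes.load(Ordering::SeqCst), 2);
    println!("woken {} times", inner.wakes.load(Ordering::SeqCst));
}
```

An executor built on `Wake` would implement `wake` as "re-queue this task", exactly what `Task::schedule` does in the full source below — minus the unsafe pointer juggling.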
    OCaml Approach

    OCaml's Lwt has a similar event loop internally, but it uses a cooperative scheduling model based on callbacks. The "scheduler" in Lwt processes ready callbacks in a queue:

    (* Lwt's internal loop, simplified: *)
    let () =
      while not (Queue.is_empty ready_callbacks) do
        let callback = Queue.pop ready_callbacks in
        callback ()
      done
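The same ready-queue idea maps directly onto Rust's channels. A hypothetical sketch using `mpsc::sync_channel` as the queue, draining boxed callbacks the way Lwt drains `ready_callbacks` (a real executor queues tasks — future plus waker — rather than bare callbacks):

```rust
use std::sync::mpsc;

fn main() {
    // A bounded channel of boxed closures plays the role of the ready queue.
    let (tx, rx) = mpsc::sync_channel::<Box<dyn FnOnce() + Send>>(16);

    tx.send(Box::new(|| println!("callback 1"))).unwrap();
    tx.send(Box::new(|| println!("callback 2"))).unwrap();
    drop(tx); // close the queue so the drain loop terminates

    // Drain the ready queue, mirroring the OCaml while-loop above.
    while let Ok(callback) = rx.recv() {
        callback();
    }
}
```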
    

    Full Source

    #![allow(clippy::all)]
    //! # Executor Basics
    //!
    //! A minimal async executor — the engine that drives futures to completion by polling them.
    
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{mpsc, Arc, Mutex};
    use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
    
    type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
    
    struct Task {
        future: Mutex<Option<BoxFuture>>,
        sender: mpsc::SyncSender<Arc<Task>>,
    }
    
    impl Task {
        fn schedule(self: &Arc<Self>) {
            let _ = self.sender.send(Arc::clone(self));
        }
    }
    
    fn make_waker(task: Arc<Task>) -> Waker {
        let ptr = Arc::into_raw(task) as *const ();
    
        unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
            let arc = Arc::from_raw(ptr as *const Task);
            std::mem::forget(Arc::clone(&arc));
            std::mem::forget(arc);
            RawWaker::new(ptr, &VTABLE)
        }
    
        unsafe fn wake(ptr: *const ()) {
            Arc::from_raw(ptr as *const Task).schedule();
        }
    
        unsafe fn wake_by_ref(ptr: *const ()) {
            let arc = Arc::from_raw(ptr as *const Task);
            arc.schedule();
            std::mem::forget(arc);
        }
    
        unsafe fn drop_waker(ptr: *const ()) {
            drop(Arc::from_raw(ptr as *const Task));
        }
    
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
    
        unsafe { Waker::from_raw(RawWaker::new(ptr, &VTABLE)) }
    }
    
    /// A simple single-threaded executor.
    pub struct SimpleExecutor {
        rx: mpsc::Receiver<Arc<Task>>,
        tx: mpsc::SyncSender<Arc<Task>>,
    }
    
    impl SimpleExecutor {
        pub fn new() -> Self {
            let (tx, rx) = mpsc::sync_channel(100);
            Self { rx, tx }
        }
    
        pub fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) {
            let task = Arc::new(Task {
                future: Mutex::new(Some(Box::pin(fut))),
                sender: self.tx.clone(),
            });
            task.schedule();
        }
    
        pub fn run(self) {
            drop(self.tx); // Drop sender so rx.recv() ends when all tasks complete
    
            while let Ok(task) = self.rx.recv() {
                let mut slot = task.future.lock().unwrap();
                if let Some(mut fut) = slot.take() {
                    let waker = make_waker(Arc::clone(&task));
                    let mut cx = Context::from_waker(&waker);
                    if fut.as_mut().poll(&mut cx) == Poll::Pending {
                        *slot = Some(fut);
                    }
                }
            }
        }
    }
    
    impl Default for SimpleExecutor {
        fn default() -> Self {
            Self::new()
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        use std::sync::atomic::{AtomicUsize, Ordering};
    
        #[test]
        fn test_runs_single_task() {
            let counter = Arc::new(AtomicUsize::new(0));
            let c = Arc::clone(&counter);
    
            let ex = SimpleExecutor::new();
            ex.spawn(async move {
                c.fetch_add(1, Ordering::SeqCst);
            });
            ex.run();
    
            assert_eq!(counter.load(Ordering::SeqCst), 1);
        }
    
        #[test]
        fn test_runs_multiple_tasks() {
            let counter = Arc::new(AtomicUsize::new(0));
    
            let ex = SimpleExecutor::new();
            for _ in 0..5 {
                let c = Arc::clone(&counter);
                ex.spawn(async move {
                    c.fetch_add(1, Ordering::SeqCst);
                });
            }
            ex.run();
    
            assert_eq!(counter.load(Ordering::SeqCst), 5);
        }
    
        #[test]
        fn test_empty_executor() {
            let ex = SimpleExecutor::new();
            ex.run(); // Should complete immediately
        }
    
        #[test]
        fn test_tasks_run_to_completion() {
            let values = Arc::new(Mutex::new(Vec::new()));
    
            let ex = SimpleExecutor::new();
            for i in 0..3 {
                let v = Arc::clone(&values);
                ex.spawn(async move {
                    v.lock().unwrap().push(i);
                });
            }
            ex.run();
    
            let mut result = values.lock().unwrap().clone();
            result.sort();
            assert_eq!(result, vec![0, 1, 2]);
        }
    }

    Deep Comparison

    OCaml vs Rust: Executor Basics

    Task Queue

    OCaml (Lwt):

    (* Implicit scheduler in Lwt *)
    Lwt_main.run (task1 >>= fun () -> task2)
    

    Rust:

    let ex = SimpleExecutor::new();
    ex.spawn(async { task1() });
    ex.spawn(async { task2() });
    ex.run();
    

    Key Differences

    Aspect          | OCaml                    | Rust
    ----------------|--------------------------|---------------------
    Executor        | Implicit (Lwt scheduler) | Explicit run() call
    Task queue      | Internal to Lwt          | mpsc::sync_channel
    Waker mechanism | Lwt callbacks            | Manual Waker vtable
    Spawn           | Lwt.async                | executor.spawn()

    Exercises

  • Extend the minimal executor to support spawning new tasks from within futures.
  • Add a counter to the executor that tracks how many times each task was polled — useful for performance debugging.
  • Implement a block_on(future) function that runs a single future to completion using the minimal executor.