ExamplesBy LevelBy TopicLearning Paths
992 Advanced

992 Actor Pattern

Functional Programming

Tutorial

The Problem

Implement the actor model in Rust: an actor owns mutable state and processes messages sequentially from an mpsc channel mailbox. External code sends messages but never accesses state directly. The actor runs in its own thread. Implement a CounterActor with increment/decrement/get/shutdown messages, demonstrating request-reply via a one-shot Sender<T> in the message.

🎯 Learning Outcomes

  • • Define a enum CounterMsg { Increment(i64), Decrement(i64), GetValue(mpsc::Sender<i64>), Shutdown } for typed messages
  • • Spawn an actor thread that owns state: i64 and processes messages via rx.iter()
  • • Implement request-reply: GetValue(mpsc::Sender<i64>) carries a reply channel inside the message
  • • Implement Shutdown: break from the processing loop, closing the mailbox channel
  • • Recognize the actor model as a safe alternative to shared mutable state
  • Code Example

    #![allow(clippy::all)]
    // 992: Actor Pattern
    // Rust: enum Message + thread + mpsc channel mailbox
    
    use std::sync::mpsc;
    use std::thread;
    
    // --- Approach 1: Counter actor ---
    #[derive(Debug)]
    enum CounterMsg {
        Increment(i64),
        Decrement(i64),
        GetValue(mpsc::Sender<i64>),
        Shutdown,
    }
    
    struct CounterActor {
        tx: mpsc::Sender<CounterMsg>,
    }
    
    impl CounterActor {
        fn spawn() -> Self {
            let (tx, rx) = mpsc::channel::<CounterMsg>();
            thread::spawn(move || {
                let mut state: i64 = 0;
                for msg in rx.iter() {
                    match msg {
                        CounterMsg::Increment(n) => state += n,
                        CounterMsg::Decrement(n) => state -= n,
                        CounterMsg::GetValue(reply) => {
                            reply.send(state).ok();
                        }
                        CounterMsg::Shutdown => break,
                    }
                }
            });
            CounterActor { tx }
        }
    
        fn increment(&self, n: i64) {
            self.tx.send(CounterMsg::Increment(n)).unwrap();
        }
        fn decrement(&self, n: i64) {
            self.tx.send(CounterMsg::Decrement(n)).unwrap();
        }
    
        fn get_value(&self) -> i64 {
            let (reply_tx, reply_rx) = mpsc::channel();
            self.tx.send(CounterMsg::GetValue(reply_tx)).unwrap();
            reply_rx.recv().unwrap()
        }
    
        fn shutdown(self) {
            self.tx.send(CounterMsg::Shutdown).ok();
        }
    }
    
    // --- Approach 2: Generic actor with request-response ---
    #[derive(Debug)]
    enum AdderMsg {
        Add {
            a: i32,
            b: i32,
            reply: mpsc::Sender<i32>,
        },
        Stop,
    }
    
    struct AdderActor {
        tx: mpsc::Sender<AdderMsg>,
    }
    
    impl AdderActor {
        fn spawn() -> Self {
            let (tx, rx) = mpsc::channel::<AdderMsg>();
            thread::spawn(move || {
                for msg in rx.iter() {
                    match msg {
                        AdderMsg::Add { a, b, reply } => {
                            reply.send(a + b).ok();
                        }
                        AdderMsg::Stop => break,
                    }
                }
            });
            AdderActor { tx }
        }
    
        fn add(&self, a: i32, b: i32) -> i32 {
            let (reply_tx, reply_rx) = mpsc::channel();
            self.tx
                .send(AdderMsg::Add {
                    a,
                    b,
                    reply: reply_tx,
                })
                .unwrap();
            reply_rx.recv().unwrap()
        }
    
        fn stop(self) {
            self.tx.send(AdderMsg::Stop).ok();
        }
    }
    
    // --- Approach 3: State machine actor ---
    #[derive(Debug, PartialEq, Clone)]
    enum TrafficLight {
        Red,
        Yellow,
        Green,
    }
    
    #[derive(Debug)]
    enum TrafficMsg {
        Next,
        GetState(mpsc::Sender<TrafficLight>),
        Stop,
    }
    
    struct TrafficActor {
        tx: mpsc::Sender<TrafficMsg>,
    }
    
    impl TrafficActor {
        fn spawn() -> Self {
            let (tx, rx) = mpsc::channel::<TrafficMsg>();
            thread::spawn(move || {
                let mut state = TrafficLight::Red;
                for msg in rx.iter() {
                    match msg {
                        TrafficMsg::Next => {
                            state = match state {
                                TrafficLight::Red => TrafficLight::Green,
                                TrafficLight::Green => TrafficLight::Yellow,
                                TrafficLight::Yellow => TrafficLight::Red,
                            };
                        }
                        TrafficMsg::GetState(reply) => {
                            reply.send(state.clone()).ok();
                        }
                        TrafficMsg::Stop => break,
                    }
                }
            });
            TrafficActor { tx }
        }
    
        fn next(&self) {
            self.tx.send(TrafficMsg::Next).unwrap();
        }
    
        fn state(&self) -> TrafficLight {
            let (r, rx) = mpsc::channel();
            self.tx.send(TrafficMsg::GetState(r)).unwrap();
            rx.recv().unwrap()
        }
    
        fn stop(self) {
            self.tx.send(TrafficMsg::Stop).ok();
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_counter_actor() {
            let actor = CounterActor::spawn();
            actor.increment(10);
            actor.increment(5);
            actor.decrement(3);
            assert_eq!(actor.get_value(), 12);
            actor.shutdown();
        }
    
        #[test]
        fn test_adder_actor() {
            let adder = AdderActor::spawn();
            assert_eq!(adder.add(17, 25), 42);
            assert_eq!(adder.add(1, 1), 2);
            adder.stop();
        }
    
        #[test]
        fn test_traffic_light_actor() {
            let t = TrafficActor::spawn();
            assert_eq!(t.state(), TrafficLight::Red);
            t.next();
            assert_eq!(t.state(), TrafficLight::Green);
            t.next();
            assert_eq!(t.state(), TrafficLight::Yellow);
            t.next();
            assert_eq!(t.state(), TrafficLight::Red);
            t.stop();
        }
    
        #[test]
        fn test_counter_negative() {
            let actor = CounterActor::spawn();
            actor.decrement(5);
            assert_eq!(actor.get_value(), -5);
            actor.shutdown();
        }
    }

    Key Differences

    AspectRustOCaml
    Mailboxmpsc::Receiver<Msg> — buffered, asyncEvent.channel — synchronous rendezvous
    Reply channelmpsc::Sender<T> embedded in messageEvent.channel embedded in message
    State ownershipActor thread exclusively owns itActor thread exclusively owns it
    Message passingSend + 'static boundsGC handles ownership

    The actor model avoids lock contention by design: no two threads ever access state simultaneously because messages are processed sequentially. Actors compose naturally — one actor can send messages to another.

    OCaml Approach

    type counter_msg =
      | Increment of int
      | Decrement of int
      | GetValue of int Event.channel
      | Shutdown
    
    let spawn_counter () =
      let ch = Event.new_channel () in
      let thread = Thread.create (fun () ->
        let state = ref 0 in
        let rec loop () = match Event.sync (Event.receive ch) with
          | Increment n -> state := !state + n; loop ()
          | Decrement n -> state := !state - n; loop ()
          | GetValue reply ->
            Event.sync (Event.send reply !state); loop ()
          | Shutdown -> ()
        in loop ()
      ) () in
      (ch, thread)
    

    OCaml's Event.channel is synchronous (rendezvous). For true async actors with buffered mailboxes, Domainslib.Chan or Lwt_mvar is used. The pattern is structurally identical: state owned by the thread, messages via a channel, reply via a one-shot channel in the message.

    Full Source

    #![allow(clippy::all)]
    // 992: Actor Pattern
    // Rust: enum Message + thread + mpsc channel mailbox
    
    use std::sync::mpsc;
    use std::thread;
    
    // --- Approach 1: Counter actor ---
    #[derive(Debug)]
    enum CounterMsg {
        Increment(i64),
        Decrement(i64),
        GetValue(mpsc::Sender<i64>),
        Shutdown,
    }
    
    struct CounterActor {
        tx: mpsc::Sender<CounterMsg>,
    }
    
    impl CounterActor {
        fn spawn() -> Self {
            let (tx, rx) = mpsc::channel::<CounterMsg>();
            thread::spawn(move || {
                let mut state: i64 = 0;
                for msg in rx.iter() {
                    match msg {
                        CounterMsg::Increment(n) => state += n,
                        CounterMsg::Decrement(n) => state -= n,
                        CounterMsg::GetValue(reply) => {
                            reply.send(state).ok();
                        }
                        CounterMsg::Shutdown => break,
                    }
                }
            });
            CounterActor { tx }
        }
    
        fn increment(&self, n: i64) {
            self.tx.send(CounterMsg::Increment(n)).unwrap();
        }
        fn decrement(&self, n: i64) {
            self.tx.send(CounterMsg::Decrement(n)).unwrap();
        }
    
        fn get_value(&self) -> i64 {
            let (reply_tx, reply_rx) = mpsc::channel();
            self.tx.send(CounterMsg::GetValue(reply_tx)).unwrap();
            reply_rx.recv().unwrap()
        }
    
        fn shutdown(self) {
            self.tx.send(CounterMsg::Shutdown).ok();
        }
    }
    
    // --- Approach 2: Generic actor with request-response ---
    #[derive(Debug)]
    enum AdderMsg {
        Add {
            a: i32,
            b: i32,
            reply: mpsc::Sender<i32>,
        },
        Stop,
    }
    
    struct AdderActor {
        tx: mpsc::Sender<AdderMsg>,
    }
    
    impl AdderActor {
        fn spawn() -> Self {
            let (tx, rx) = mpsc::channel::<AdderMsg>();
            thread::spawn(move || {
                for msg in rx.iter() {
                    match msg {
                        AdderMsg::Add { a, b, reply } => {
                            reply.send(a + b).ok();
                        }
                        AdderMsg::Stop => break,
                    }
                }
            });
            AdderActor { tx }
        }
    
        fn add(&self, a: i32, b: i32) -> i32 {
            let (reply_tx, reply_rx) = mpsc::channel();
            self.tx
                .send(AdderMsg::Add {
                    a,
                    b,
                    reply: reply_tx,
                })
                .unwrap();
            reply_rx.recv().unwrap()
        }
    
        fn stop(self) {
            self.tx.send(AdderMsg::Stop).ok();
        }
    }
    
    // --- Approach 3: State machine actor ---
    #[derive(Debug, PartialEq, Clone)]
    enum TrafficLight {
        Red,
        Yellow,
        Green,
    }
    
    #[derive(Debug)]
    enum TrafficMsg {
        Next,
        GetState(mpsc::Sender<TrafficLight>),
        Stop,
    }
    
    struct TrafficActor {
        tx: mpsc::Sender<TrafficMsg>,
    }
    
    impl TrafficActor {
        fn spawn() -> Self {
            let (tx, rx) = mpsc::channel::<TrafficMsg>();
            thread::spawn(move || {
                let mut state = TrafficLight::Red;
                for msg in rx.iter() {
                    match msg {
                        TrafficMsg::Next => {
                            state = match state {
                                TrafficLight::Red => TrafficLight::Green,
                                TrafficLight::Green => TrafficLight::Yellow,
                                TrafficLight::Yellow => TrafficLight::Red,
                            };
                        }
                        TrafficMsg::GetState(reply) => {
                            reply.send(state.clone()).ok();
                        }
                        TrafficMsg::Stop => break,
                    }
                }
            });
            TrafficActor { tx }
        }
    
        fn next(&self) {
            self.tx.send(TrafficMsg::Next).unwrap();
        }
    
        fn state(&self) -> TrafficLight {
            let (r, rx) = mpsc::channel();
            self.tx.send(TrafficMsg::GetState(r)).unwrap();
            rx.recv().unwrap()
        }
    
        fn stop(self) {
            self.tx.send(TrafficMsg::Stop).ok();
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_counter_actor() {
            let actor = CounterActor::spawn();
            actor.increment(10);
            actor.increment(5);
            actor.decrement(3);
            assert_eq!(actor.get_value(), 12);
            actor.shutdown();
        }
    
        #[test]
        fn test_adder_actor() {
            let adder = AdderActor::spawn();
            assert_eq!(adder.add(17, 25), 42);
            assert_eq!(adder.add(1, 1), 2);
            adder.stop();
        }
    
        #[test]
        fn test_traffic_light_actor() {
            let t = TrafficActor::spawn();
            assert_eq!(t.state(), TrafficLight::Red);
            t.next();
            assert_eq!(t.state(), TrafficLight::Green);
            t.next();
            assert_eq!(t.state(), TrafficLight::Yellow);
            t.next();
            assert_eq!(t.state(), TrafficLight::Red);
            t.stop();
        }
    
        #[test]
        fn test_counter_negative() {
            let actor = CounterActor::spawn();
            actor.decrement(5);
            assert_eq!(actor.get_value(), -5);
            actor.shutdown();
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_counter_actor() {
            let actor = CounterActor::spawn();
            actor.increment(10);
            actor.increment(5);
            actor.decrement(3);
            assert_eq!(actor.get_value(), 12);
            actor.shutdown();
        }
    
        #[test]
        fn test_adder_actor() {
            let adder = AdderActor::spawn();
            assert_eq!(adder.add(17, 25), 42);
            assert_eq!(adder.add(1, 1), 2);
            adder.stop();
        }
    
        #[test]
        fn test_traffic_light_actor() {
            let t = TrafficActor::spawn();
            assert_eq!(t.state(), TrafficLight::Red);
            t.next();
            assert_eq!(t.state(), TrafficLight::Green);
            t.next();
            assert_eq!(t.state(), TrafficLight::Yellow);
            t.next();
            assert_eq!(t.state(), TrafficLight::Red);
            t.stop();
        }
    
        #[test]
        fn test_counter_negative() {
            let actor = CounterActor::spawn();
            actor.decrement(5);
            assert_eq!(actor.get_value(), -5);
            actor.shutdown();
        }
    }

    Deep Comparison

    Actor Pattern — Comparison

    Core Insight

    Actors replace shared mutable state with message passing to a single owner. The actor owns its state exclusively — no locks needed because only one thread touches it. The channel IS the thread-safe boundary.

    OCaml Approach

  • • Thread + Queue + Mutex + Condition = manual mailbox
  • • Message type is a variant; actor loops over recv calls
  • • Request-response: embed result ref + Condition in message
  • • More boilerplate, same concept as Rust
  • • Akka/Erlang heritage — functional languages pioneered this pattern
  • Rust Approach

  • mpsc::channel::<Message>() is the mailbox
  • thread::spawn runs the actor loop: for msg in rx.iter() { match msg }
  • • Request-response: embed mpsc::Sender<Reply> in the message variant
  • • Struct wraps the Sender<Message> — provides typed API methods
  • • No Arc<Mutex<...>> needed — the actor owns all state
  • Comparison Table

    ConceptOCamlRust
    MailboxQueue + Mutex + Conditionmpsc::channel::<Msg>()
    Message typeVariant type / ADTenum Message { ... }
    Actor loopwhile running { match recv }for msg in rx.iter() { match msg }
    Request-responseEmbed result ref + ConditionEmbed Sender<Reply> in variant
    State ownershipref inside actor closureLocal variable in thread closure
    ShutdownShutdown variantShutdown variant (or drop tx)
    No lock neededYes — one ownerYes — one owner

    std vs tokio

    Aspectstd versiontokio version
    RuntimeOS threads via std::threadAsync tasks on tokio runtime
    Synchronizationstd::sync::Mutex, Condvartokio::sync::Mutex, channels
    Channelsstd::sync::mpsc (unbounded)tokio::sync::mpsc (bounded, async)
    BlockingThread blocks on lock/recvTask yields, runtime switches tasks
    OverheadOne OS thread per taskMany tasks per thread (M:N)
    Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers

    Exercises

  • Implement a BankActor with Deposit, Withdraw, Balance, and Shutdown messages. Withdraw returns Err via the reply channel if balance would go negative.
  • Implement a supervisor: if a worker actor panics, the supervisor detects the dead thread and spawns a replacement.
  • Chain two actors: LoggingActor forwards messages to CounterActor and logs each operation.
  • Implement an ActorRef<Msg> wrapper around Sender<Msg> for ergonomic message sending.
  • Rewrite using tokio::sync::mpsc and tokio::task::spawn for async actor execution.
  • Open Source Repos