992 Actor Pattern
Tutorial
The Problem
Implement the actor model in Rust: an actor owns mutable state and processes messages sequentially from an mpsc channel mailbox. External code sends messages but never accesses state directly. The actor runs in its own thread. Implement a CounterActor with increment/decrement/get/shutdown messages, demonstrating request-reply via a one-shot Sender<T> in the message.
🎯 Learning Outcomes
- Define enum CounterMsg { Increment(i64), Decrement(i64), GetValue(mpsc::Sender<i64>), Shutdown } for typed messages
- Spawn an actor thread that owns state: i64 and processes messages via rx.iter()
- Request-reply: GetValue(mpsc::Sender<i64>) carries a reply channel inside the message
- Handle Shutdown: break from the processing loop, which drops the receiver and closes the mailbox channel
Code Example
#![allow(clippy::all)]
// 992: Actor Pattern
// Rust: enum Message + thread + mpsc channel mailbox
use std::sync::mpsc;
use std::thread;

// --- Approach 1: Counter actor ---
#[derive(Debug)]
enum CounterMsg {
    Increment(i64),
    Decrement(i64),
    GetValue(mpsc::Sender<i64>),
    Shutdown,
}

struct CounterActor {
    tx: mpsc::Sender<CounterMsg>,
}

impl CounterActor {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<CounterMsg>();
        thread::spawn(move || {
            let mut state: i64 = 0;
            for msg in rx.iter() {
                match msg {
                    CounterMsg::Increment(n) => state += n,
                    CounterMsg::Decrement(n) => state -= n,
                    CounterMsg::GetValue(reply) => {
                        reply.send(state).ok();
                    }
                    CounterMsg::Shutdown => break,
                }
            }
        });
        CounterActor { tx }
    }

    fn increment(&self, n: i64) {
        self.tx.send(CounterMsg::Increment(n)).unwrap();
    }

    fn decrement(&self, n: i64) {
        self.tx.send(CounterMsg::Decrement(n)).unwrap();
    }

    fn get_value(&self) -> i64 {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx.send(CounterMsg::GetValue(reply_tx)).unwrap();
        reply_rx.recv().unwrap()
    }

    fn shutdown(self) {
        self.tx.send(CounterMsg::Shutdown).ok();
    }
}
// --- Approach 2: Generic actor with request-response ---
#[derive(Debug)]
enum AdderMsg {
    Add {
        a: i32,
        b: i32,
        reply: mpsc::Sender<i32>,
    },
    Stop,
}

struct AdderActor {
    tx: mpsc::Sender<AdderMsg>,
}

impl AdderActor {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<AdderMsg>();
        thread::spawn(move || {
            for msg in rx.iter() {
                match msg {
                    AdderMsg::Add { a, b, reply } => {
                        reply.send(a + b).ok();
                    }
                    AdderMsg::Stop => break,
                }
            }
        });
        AdderActor { tx }
    }

    fn add(&self, a: i32, b: i32) -> i32 {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx
            .send(AdderMsg::Add {
                a,
                b,
                reply: reply_tx,
            })
            .unwrap();
        reply_rx.recv().unwrap()
    }

    fn stop(self) {
        self.tx.send(AdderMsg::Stop).ok();
    }
}
// --- Approach 3: State machine actor ---
#[derive(Debug, PartialEq, Clone)]
enum TrafficLight {
    Red,
    Yellow,
    Green,
}

#[derive(Debug)]
enum TrafficMsg {
    Next,
    GetState(mpsc::Sender<TrafficLight>),
    Stop,
}

struct TrafficActor {
    tx: mpsc::Sender<TrafficMsg>,
}

impl TrafficActor {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<TrafficMsg>();
        thread::spawn(move || {
            let mut state = TrafficLight::Red;
            for msg in rx.iter() {
                match msg {
                    TrafficMsg::Next => {
                        state = match state {
                            TrafficLight::Red => TrafficLight::Green,
                            TrafficLight::Green => TrafficLight::Yellow,
                            TrafficLight::Yellow => TrafficLight::Red,
                        };
                    }
                    TrafficMsg::GetState(reply) => {
                        reply.send(state.clone()).ok();
                    }
                    TrafficMsg::Stop => break,
                }
            }
        });
        TrafficActor { tx }
    }

    fn next(&self) {
        self.tx.send(TrafficMsg::Next).unwrap();
    }

    fn state(&self) -> TrafficLight {
        let (r, rx) = mpsc::channel();
        self.tx.send(TrafficMsg::GetState(r)).unwrap();
        rx.recv().unwrap()
    }

    fn stop(self) {
        self.tx.send(TrafficMsg::Stop).ok();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter_actor() {
        let actor = CounterActor::spawn();
        actor.increment(10);
        actor.increment(5);
        actor.decrement(3);
        assert_eq!(actor.get_value(), 12);
        actor.shutdown();
    }

    #[test]
    fn test_adder_actor() {
        let adder = AdderActor::spawn();
        assert_eq!(adder.add(17, 25), 42);
        assert_eq!(adder.add(1, 1), 2);
        adder.stop();
    }

    #[test]
    fn test_traffic_light_actor() {
        let t = TrafficActor::spawn();
        assert_eq!(t.state(), TrafficLight::Red);
        t.next();
        assert_eq!(t.state(), TrafficLight::Green);
        t.next();
        assert_eq!(t.state(), TrafficLight::Yellow);
        t.next();
        assert_eq!(t.state(), TrafficLight::Red);
        t.stop();
    }

    #[test]
    fn test_counter_negative() {
        let actor = CounterActor::spawn();
        actor.decrement(5);
        assert_eq!(actor.get_value(), -5);
        actor.shutdown();
    }
}
Key Differences
| Aspect | Rust | OCaml |
|---|---|---|
| Mailbox | mpsc::Receiver<Msg>: unbounded buffer, send never blocks | Event.channel: synchronous rendezvous |
| Reply channel | mpsc::Sender<T> embedded in message | Event.channel embedded in message |
| State ownership | Actor thread exclusively owns it | Actor thread exclusively owns it |
| Message passing | Send + 'static bounds | GC handles ownership |
The actor model avoids lock contention by design: no two threads ever access state simultaneously because messages are processed sequentially. Actors compose naturally — one actor can send messages to another.
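As a rough illustration of that composition claim, the sketch below chains two actors. The doubler actor and the helper names spawn_counter, spawn_doubler, and run_pipeline are hypothetical and not part of the tutorial's code; the counter is a trimmed-down version of CounterActor. The doubler holds a Sender for the counter's mailbox and forwards each input multiplied by two.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

// Messages understood by the downstream counter actor.
enum CounterMsg {
    Increment(i64),
    GetValue(Sender<i64>),
}

// Spawns a counter actor and returns the sending half of its mailbox.
fn spawn_counter() -> Sender<CounterMsg> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut state: i64 = 0;
        // The loop ends once every Sender for this mailbox has been dropped.
        for msg in rx {
            match msg {
                CounterMsg::Increment(n) => state += n,
                CounterMsg::GetValue(reply) => {
                    reply.send(state).ok();
                }
            }
        }
    });
    tx
}

// Hypothetical upstream actor: doubles each input and forwards it to the counter.
fn spawn_doubler(counter: Sender<CounterMsg>) -> (Sender<i64>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<i64>();
    let handle = thread::spawn(move || {
        for n in rx {
            // Stop forwarding if the counter actor has gone away.
            if counter.send(CounterMsg::Increment(n * 2)).is_err() {
                break;
            }
        }
    });
    (tx, handle)
}

// Sends 1, 2, 3 through the pipeline and reads the counter afterwards.
fn run_pipeline() -> i64 {
    let counter = spawn_counter();
    let (doubler, doubler_thread) = spawn_doubler(counter.clone());
    for n in 1..=3 {
        doubler.send(n).unwrap();
    }
    // Close the doubler's mailbox and wait until it has forwarded everything,
    // so the query below is queued behind all three increments.
    drop(doubler);
    doubler_thread.join().unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    counter.send(CounterMsg::GetValue(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}
```

Calling run_pipeline() should yield 12 (2 + 4 + 6). Joining the doubler before querying is one simple way to make the result deterministic; without it, the GetValue message could overtake increments that are still in flight.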
OCaml Approach
type counter_msg =
  | Increment of int
  | Decrement of int
  | GetValue of int Event.channel
  | Shutdown

let spawn_counter () =
  let ch = Event.new_channel () in
  let thread = Thread.create (fun () ->
    let state = ref 0 in
    let rec loop () = match Event.sync (Event.receive ch) with
      | Increment n -> state := !state + n; loop ()
      | Decrement n -> state := !state - n; loop ()
      | GetValue reply ->
        Event.sync (Event.send reply !state); loop ()
      | Shutdown -> ()
    in loop ()
  ) () in
  (ch, thread)
OCaml's Event.channel is synchronous (rendezvous). For true async actors with buffered mailboxes, Domainslib.Chan or Lwt_mvar is used. The pattern is structurally identical: state owned by the thread, messages via a channel, reply via a one-shot channel in the message.
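For comparison on the Rust side, the rendezvous behaviour can be approximated with mpsc::sync_channel(0), where a send completes only once the receiver has taken the message. The sketch below is an illustration under that assumption, not a translation of the OCaml code; the Msg enum and the function names are made up for the example.

```rust
use std::sync::mpsc::{self, SyncSender};
use std::thread::{self, JoinHandle};

// Hypothetical message type for this sketch.
enum Msg {
    Add(i64),
    Get(SyncSender<i64>),
}

// Spawns a counter actor whose mailbox has capacity 0 (a rendezvous channel).
fn spawn_rendezvous_counter() -> (SyncSender<Msg>, JoinHandle<()>) {
    let (tx, rx) = mpsc::sync_channel::<Msg>(0);
    let handle = thread::spawn(move || {
        let mut state: i64 = 0;
        for msg in rx {
            match msg {
                Msg::Add(n) => state += n,
                Msg::Get(reply) => {
                    // Blocks until the requester is receiving, like Event.sync.
                    reply.send(state).ok();
                }
            }
        }
    });
    (tx, handle)
}

fn rendezvous_demo() -> i64 {
    let (tx, handle) = spawn_rendezvous_counter();
    // Each send returns only after the actor has taken the message.
    tx.send(Msg::Add(2)).unwrap();
    tx.send(Msg::Add(3)).unwrap();
    let (reply_tx, reply_rx) = mpsc::sync_channel(0);
    tx.send(Msg::Get(reply_tx)).unwrap();
    let value = reply_rx.recv().unwrap();
    // Dropping the last sender ends the actor loop; join waits for it.
    drop(tx);
    handle.join().unwrap();
    value
}
```

With a capacity of 0 the sender and the actor proceed in lockstep, which trades throughput for the guarantee that a message has been picked up by the time send returns.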
Deep Comparison
Actor Pattern — Comparison
Core Insight
Actors replace shared mutable state with message passing to a single owner. The actor owns its state exclusively — no locks needed because only one thread touches it. The channel IS the thread-safe boundary.
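A small sketch of that boundary, assuming a simplified counter actor (the names Msg, spawn_counter, and concurrent_total are illustrative): four producer threads clone the Sender and push increments concurrently, yet the state itself is an ordinary local variable with no Mutex or Arc around it.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

// Hypothetical message type for this sketch.
enum Msg {
    Increment(i64),
    GetValue(Sender<i64>),
}

// The actor thread is the only code that ever touches `state`.
fn spawn_counter() -> Sender<Msg> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut state: i64 = 0;
        for msg in rx {
            match msg {
                Msg::Increment(n) => state += n,
                Msg::GetValue(reply) => {
                    reply.send(state).ok();
                }
            }
        }
    });
    tx
}

// Four producers each send 100 increments of 1 through cloned senders.
fn concurrent_total() -> i64 {
    let tx = spawn_counter();
    let producers: Vec<_> = (0..4)
        .map(|_| {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..100 {
                    tx.send(Msg::Increment(1)).unwrap();
                }
            })
        })
        .collect();
    // Wait until every producer has finished sending.
    for p in producers {
        p.join().unwrap();
    }
    // This request is queued behind all 400 increments.
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Msg::GetValue(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}
```

Here concurrent_total() should return 400 on every run, because the mailbox serializes the updates regardless of how the producer threads interleave.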
OCaml Approach
- The actor loop processes messages through recv calls
- Request-response: embed a result ref + Condition in the message
Rust Approach
- mpsc::channel::<Message>() is the mailbox
- thread::spawn runs the actor loop: for msg in rx.iter() { match msg }
- Request-response: embed mpsc::Sender<Reply> in the message variant
- The actor handle wraps Sender<Message> and provides typed API methods
- No Arc<Mutex<...>> needed: the actor owns all state
Comparison Table
| Concept | OCaml | Rust |
|---|---|---|
| Mailbox | Queue + Mutex + Condition | mpsc::channel::<Msg>() |
| Message type | Variant type / ADT | enum Message { ... } |
| Actor loop | while running { match recv } | for msg in rx.iter() { match msg } |
| Request-response | Embed result ref + Condition | Embed Sender<Reply> in variant |
| State ownership | ref inside actor closure | Local variable in thread closure |
| Shutdown | Shutdown variant | Shutdown variant (or drop tx) |
| No lock needed | Yes — one owner | Yes — one owner |
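The "or drop tx" alternative in the Shutdown row can be sketched as follows. This is a minimal illustration with hypothetical names (Counter, finish, drop_shutdown_demo), and it assumes the handle keeps the JoinHandle, which the tutorial's actors do not: once the last Sender is dropped, rx.iter() ends and the thread can hand back its final state.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical handle that keeps the JoinHandle alongside the sender.
struct Counter {
    tx: Sender<i64>,
    handle: JoinHandle<i64>,
}

impl Counter {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<i64>();
        let handle = thread::spawn(move || {
            let mut state: i64 = 0;
            // The iterator ends when every Sender has been dropped,
            // so no explicit Shutdown variant is required.
            for delta in rx.iter() {
                state += delta;
            }
            state
        });
        Counter { tx, handle }
    }

    fn add(&self, delta: i64) {
        self.tx.send(delta).unwrap();
    }

    // Drops the only sender, then waits for the actor and returns its final state.
    fn finish(self) -> i64 {
        let Counter { tx, handle } = self;
        drop(tx);
        handle.join().unwrap()
    }
}

fn drop_shutdown_demo() -> i64 {
    let counter = Counter::spawn();
    counter.add(10);
    counter.add(-3);
    counter.finish()
}
```

drop_shutdown_demo() should return 7. Joining also guarantees that all queued messages were processed before the caller continues, which a fire-and-forget Shutdown message does not.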
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (channel is unbounded; sync_channel is bounded) | tokio::sync::mpsc (channel is bounded and async; unbounded_channel also exists) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
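Bounded mailboxes do not require tokio. As a sketch using only the standard library, mpsc::sync_channel(n) gives the actor a mailbox with room for n queued messages, and try_send lets a producer detect a full mailbox instead of blocking. The function name bounded_mailbox_demo and the capacity of 2 are arbitrary choices for the example.

```rust
use std::sync::mpsc::{self, TrySendError};
use std::thread;

// Returns whether the third message was rejected, plus the actor's final sum.
fn bounded_mailbox_demo() -> (bool, i64) {
    // Mailbox with room for two queued messages.
    let (tx, rx) = mpsc::sync_channel::<i64>(2);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    // Nobody is draining the mailbox yet, so a third message does not fit.
    let rejected = matches!(tx.try_send(3), Err(TrySendError::Full(3)));

    // Start the actor: it sums every message until all senders are dropped.
    let actor = thread::spawn(move || rx.iter().sum::<i64>());

    // A blocking send waits until the actor has made room.
    tx.send(4).unwrap();
    drop(tx);
    let total = actor.join().unwrap();
    (rejected, total)
}
```

bounded_mailbox_demo() should return (true, 7): the third message is rejected while the mailbox is full, and the actor later sums 1, 2, and 4. A bounded mailbox applies backpressure to fast producers, at the cost of senders blocking or having to handle the Full case.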
Exercises
1. Implement a BankActor with Deposit, Withdraw, Balance, and Shutdown messages. Withdraw returns Err via the reply channel if the balance would go negative.
2. Chain two actors: a LoggingActor forwards messages to CounterActor and logs each operation.
3. Write a generic ActorRef<Msg> wrapper around Sender<Msg> for ergonomic message sending.
4. Port the actor to tokio::sync::mpsc and tokio::task::spawn for async actor execution.
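One possible starting point for the BankActor exercise is sketched below. The message layout, the i64 balance, and the String error type are assumptions chosen for brevity, not a reference solution; the point is that the reply channel can carry a Result just as easily as a plain value.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

// Assumed message layout; Withdraw replies with the new balance or an error.
enum BankMsg {
    Deposit(i64),
    Withdraw(i64, Sender<Result<i64, String>>),
    Balance(Sender<i64>),
    Shutdown,
}

struct BankActor {
    tx: Sender<BankMsg>,
}

impl BankActor {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let mut balance: i64 = 0;
            for msg in rx {
                match msg {
                    BankMsg::Deposit(n) => balance += n,
                    BankMsg::Withdraw(n, reply) => {
                        // Reject withdrawals that would make the balance negative.
                        let result = if n <= balance {
                            balance -= n;
                            Ok(balance)
                        } else {
                            Err(format!(
                                "insufficient funds: balance {}, requested {}",
                                balance, n
                            ))
                        };
                        reply.send(result).ok();
                    }
                    BankMsg::Balance(reply) => {
                        reply.send(balance).ok();
                    }
                    BankMsg::Shutdown => break,
                }
            }
        });
        BankActor { tx }
    }

    fn deposit(&self, n: i64) {
        self.tx.send(BankMsg::Deposit(n)).unwrap();
    }

    fn withdraw(&self, n: i64) -> Result<i64, String> {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx.send(BankMsg::Withdraw(n, reply_tx)).unwrap();
        reply_rx.recv().unwrap()
    }

    fn balance(&self) -> i64 {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx.send(BankMsg::Balance(reply_tx)).unwrap();
        reply_rx.recv().unwrap()
    }

    fn shutdown(self) {
        self.tx.send(BankMsg::Shutdown).ok();
    }
}
```

After a deposit of 100, withdraw(30) should return Ok(70), while withdraw(500) returns an Err and leaves the balance at 70. Input validation (for example negative amounts) is deliberately left out of this sketch.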