350: Oneshot Channel
Tutorial
The Problem
Many concurrent operations need exactly one result delivered back: spawning a task and waiting for its answer, implementing a request-response protocol, or delivering a computation result from a worker thread. MPSC channels are overkill here — they're designed for streams of messages. A oneshot channel is optimized for single-value delivery: the sender can only send once (consuming itself), the receiver can only receive once, and both are guaranteed by the type system. Tokio's tokio::sync::oneshot implements this; the stdlib provides no equivalent, making it an instructive primitive to build from scratch.
🎯 Learning Outcomes
- Build a oneshot channel on top of Arc<(Mutex<Option<T>>, Condvar)>
- Consume OneshotSender on send() to enforce single-send at the type level
- Use Condvar to block the receiver until the value is ready
- Provide a non-blocking try_recv alongside blocking recv
- Understand why Sender being consumed (not &mut self) prevents double-send
Code Example
#![allow(clippy::all)]
//! # Oneshot Channel
//! Send exactly one value from one task to another.

use std::sync::{Arc, Condvar, Mutex};

/// Sending half. `send` takes `self`, so it can be called at most once.
pub struct OneshotSender<T> {
    state: Arc<(Mutex<Option<T>>, Condvar)>,
}

/// Receiving half. Shares the same slot and condition variable as the sender.
pub struct OneshotReceiver<T> {
    state: Arc<(Mutex<Option<T>>, Condvar)>,
}

/// Creates a connected sender/receiver pair around an empty slot.
pub fn oneshot<T>() -> (OneshotSender<T>, OneshotReceiver<T>) {
    let state = Arc::new((Mutex::new(None), Condvar::new()));
    (
        OneshotSender {
            state: Arc::clone(&state),
        },
        OneshotReceiver { state },
    )
}

impl<T> OneshotSender<T> {
    /// Stores the value and wakes the receiver if it is waiting.
    pub fn send(self, value: T) {
        let (lock, cvar) = &*self.state;
        *lock.lock().unwrap() = Some(value);
        cvar.notify_one();
    }
}

impl<T> OneshotReceiver<T> {
    /// Blocks until a value has been sent, then returns it.
    /// Note: blocks forever if the sender is dropped without sending.
    pub fn recv(self) -> T {
        let (lock, cvar) = &*self.state;
        let mut guard = lock.lock().unwrap();
        // Loop to guard against spurious wakeups.
        while guard.is_none() {
            guard = cvar.wait(guard).unwrap();
        }
        guard.take().unwrap()
    }

    /// Non-blocking: takes the value if it has already been sent.
    pub fn try_recv(&self) -> Option<T> {
        self.state.0.lock().unwrap().take()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn oneshot_sends_value() {
        let (tx, rx) = oneshot();
        tx.send(42);
        assert_eq!(rx.recv(), 42);
    }

    #[test]
    fn oneshot_across_threads() {
        let (tx, rx) = oneshot();
        thread::spawn(move || tx.send("hello"));
        assert_eq!(rx.recv(), "hello");
    }

    #[test]
    fn try_recv_before() {
        let (_tx, rx) = oneshot::<i32>();
        assert!(rx.try_recv().is_none());
    }
}
Key Differences
| Aspect | Rust oneshot channel | OCaml Lwt.task |
|---|---|---|
| Single-send enforcement | send(self) consumes sender (compile time) | Second wakeup_later raises Invalid_argument (run time) |
| Blocking wait | Condvar::wait blocks the thread | let%lwt waits on the promise without blocking the thread |
| Non-blocking check | try_recv() | Lwt.state promise |
| Dropped sender | Receiver blocks forever (deadlock) | Lwt.cancel rejects the promise with Lwt.Canceled |
| Async version | tokio::sync::oneshot | Built into Lwt |
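The "Dropped sender" row describes a real hazard in the implementation shown here: if the sender is dropped without sending, recv never returns. A minimal sketch of one way to close that gap follows, assuming a `Drop` impl on the sender that sets a flag and wakes the receiver. The names `Slot`, `Sender`, `Receiver`, `RecvError`, and `channel` are illustrative and not part of the original code.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Shared slot: the value (if sent) plus a flag set when the sender goes away.
struct Slot<T> {
    value: Option<T>,
    sender_gone: bool,
}

type Shared<T> = Arc<(Mutex<Slot<T>>, Condvar)>;

pub struct Sender<T> {
    state: Shared<T>,
}

pub struct Receiver<T> {
    state: Shared<T>,
}

/// Returned by `recv` when the sender was dropped without sending.
#[derive(Debug, PartialEq)]
pub struct RecvError;

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let state = Arc::new((
        Mutex::new(Slot { value: None, sender_gone: false }),
        Condvar::new(),
    ));
    (Sender { state: Arc::clone(&state) }, Receiver { state })
}

impl<T> Sender<T> {
    pub fn send(self, value: T) {
        self.state.0.lock().unwrap().value = Some(value);
        // `self` is dropped here; the `Drop` impl sets the flag and notifies.
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let (lock, cvar) = &*self.state;
        // Avoid panicking inside `drop` if the mutex is poisoned.
        if let Ok(mut slot) = lock.lock() {
            slot.sender_gone = true;
        }
        cvar.notify_one();
    }
}

impl<T> Receiver<T> {
    pub fn recv(self) -> Result<T, RecvError> {
        let (lock, cvar) = &*self.state;
        let mut slot = lock.lock().unwrap();
        loop {
            // Check the value first: a completed send also sets `sender_gone`.
            if let Some(value) = slot.value.take() {
                return Ok(value);
            }
            if slot.sender_gone {
                return Err(RecvError);
            }
            slot = cvar.wait(slot).unwrap();
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    thread::spawn(move || tx.send(7));
    assert_eq!(rx.recv(), Ok(7));

    let (tx, rx) = channel::<i32>();
    drop(tx); // sender goes away without sending
    assert_eq!(rx.recv(), Err(RecvError));
}
```

Because every send ends by dropping the sender, the notification lives in `Drop` alone, and the receiver distinguishes the two cases by checking the value before the flag.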
OCaml Approach
OCaml's Lwt provides Lwt.task () for oneshot semantics:
let (promise, resolver) = Lwt.task () in
(* elsewhere, once the value is ready: *)
Lwt.wakeup_later resolver 42;
(* back in the caller: *)
let%lwt value = promise in
Lwt_io.printf "%d\n" value
Lwt.task () returns a (promise, resolver) pair. The resolver may be used only once: a second wakeup_later on an already-resolved promise raises Invalid_argument, so single use is checked at run time rather than by the type system. This is the direct Lwt equivalent of a oneshot channel.
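By contrast, the Rust signature send(self) rules out a second send at compile time. The sketch below isolates just that property, using a stripped-down sender over a bare Arc<Mutex<Option<T>>> with no Condvar. The field name `slot` is an assumption made for illustration.

```rust
use std::sync::{Arc, Mutex};

/// Stripped-down sender: only the ownership behaviour of `send` matters here.
pub struct OneshotSender<T> {
    slot: Arc<Mutex<Option<T>>>,
}

impl<T> OneshotSender<T> {
    /// Takes `self` by value, so the sender is moved into the call.
    pub fn send(self, value: T) {
        *self.slot.lock().unwrap() = Some(value);
    }
}

fn main() {
    let slot = Arc::new(Mutex::new(None));
    let tx = OneshotSender { slot: Arc::clone(&slot) };

    tx.send(42);
    // Uncommenting the next line fails to compile:
    // tx.send(43); // error[E0382]: use of moved value: `tx`

    assert_eq!(*slot.lock().unwrap(), Some(42));
}
```

Had send taken &mut self instead, the second call would compile and silently overwrite the first value.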
Full Source
#![allow(clippy::all)]
//! # Oneshot Channel
//! Send exactly one value from one task to another.

use std::sync::{Arc, Condvar, Mutex};

/// Sending half. `send` takes `self`, so it can be called at most once.
pub struct OneshotSender<T> {
    state: Arc<(Mutex<Option<T>>, Condvar)>,
}

/// Receiving half. Shares the same slot and condition variable as the sender.
pub struct OneshotReceiver<T> {
    state: Arc<(Mutex<Option<T>>, Condvar)>,
}

/// Creates a connected sender/receiver pair around an empty slot.
pub fn oneshot<T>() -> (OneshotSender<T>, OneshotReceiver<T>) {
    let state = Arc::new((Mutex::new(None), Condvar::new()));
    (
        OneshotSender {
            state: Arc::clone(&state),
        },
        OneshotReceiver { state },
    )
}

impl<T> OneshotSender<T> {
    /// Stores the value and wakes the receiver if it is waiting.
    pub fn send(self, value: T) {
        let (lock, cvar) = &*self.state;
        *lock.lock().unwrap() = Some(value);
        cvar.notify_one();
    }
}

impl<T> OneshotReceiver<T> {
    /// Blocks until a value has been sent, then returns it.
    /// Note: blocks forever if the sender is dropped without sending.
    pub fn recv(self) -> T {
        let (lock, cvar) = &*self.state;
        let mut guard = lock.lock().unwrap();
        // Loop to guard against spurious wakeups.
        while guard.is_none() {
            guard = cvar.wait(guard).unwrap();
        }
        guard.take().unwrap()
    }

    /// Non-blocking: takes the value if it has already been sent.
    pub fn try_recv(&self) -> Option<T> {
        self.state.0.lock().unwrap().take()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn oneshot_sends_value() {
        let (tx, rx) = oneshot();
        tx.send(42);
        assert_eq!(rx.recv(), 42);
    }

    #[test]
    fn oneshot_across_threads() {
        let (tx, rx) = oneshot();
        thread::spawn(move || tx.send("hello"));
        assert_eq!(rx.recv(), "hello");
    }

    #[test]
    fn try_recv_before() {
        let (_tx, rx) = oneshot::<i32>();
        assert!(rx.try_recv().is_none());
    }
}
Deep Comparison
OCaml vs Rust: Oneshot Channel
Overview
See the example.rs and example.ml files for detailed implementations.
Key Differences
| Aspect | OCaml | Rust |
|---|---|---|
| Type system | Hindley-Milner | Ownership + traits |
| Memory | Garbage collected | Ownership and borrowing (no GC) |
| Mutability | Explicit ref and mutable fields | mut bindings and &mut references |
| Error handling | option / result | Option<T> / Result<T, E> |
See README.md for detailed comparison.
Exercises
1. Implement recv() with a timeout that returns Result<T, Timeout>: spawn a thread that sleeps and then drops the sender, and poll try_recv in a loop to detect the timeout.
2. Build a request-response pattern: send (payload, OneshotSender<Response>) on an mpsc channel; the worker processes the payload and sends the result back on the oneshot.
3. Rewrite the channel using tokio::sync::oneshot; demonstrate cancellation by dropping the receiver before the sender sends, and observe that send returns Err carrying the unsent value.
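As a starting point for the request-response exercise, here is one possible shape, offered as a sketch and not as a reference solution. It repeats a compact form of the oneshot from this page so the block is self-contained. The `Request` alias, `spawn_worker`, `call`, and the squaring workload are all hypothetical choices made for illustration.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

type Shared<T> = Arc<(Mutex<Option<T>>, Condvar)>;

pub struct OneshotSender<T>(Shared<T>);
pub struct OneshotReceiver<T>(Shared<T>);

pub fn oneshot<T>() -> (OneshotSender<T>, OneshotReceiver<T>) {
    let state: Shared<T> = Arc::new((Mutex::new(None), Condvar::new()));
    (OneshotSender(Arc::clone(&state)), OneshotReceiver(state))
}

impl<T> OneshotSender<T> {
    pub fn send(self, value: T) {
        let (lock, cvar) = &*self.0;
        *lock.lock().unwrap() = Some(value);
        cvar.notify_one();
    }
}

impl<T> OneshotReceiver<T> {
    pub fn recv(self) -> T {
        let (lock, cvar) = &*self.0;
        let mut guard = lock.lock().unwrap();
        while guard.is_none() {
            guard = cvar.wait(guard).unwrap();
        }
        guard.take().unwrap()
    }
}

/// Hypothetical request type: a payload plus the handle to reply on.
type Request = (u32, OneshotSender<u32>);

/// Spawns a worker that squares each payload and replies on its oneshot.
pub fn spawn_worker() -> mpsc::Sender<Request> {
    let (req_tx, req_rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        // The loop ends once every request sender has been dropped.
        for (payload, reply) in req_rx {
            reply.send(payload * payload);
        }
    });
    req_tx
}

/// Sends one request and blocks until the worker replies.
pub fn call(worker: &mpsc::Sender<Request>, payload: u32) -> u32 {
    let (reply_tx, reply_rx) = oneshot();
    worker
        .send((payload, reply_tx))
        .expect("worker thread has exited");
    reply_rx.recv()
}

fn main() {
    let worker = spawn_worker();
    assert_eq!(call(&worker, 3), 9);
    assert_eq!(call(&worker, 12), 144);
}
```

The mpsc channel carries the stream of requests, while each request brings its own oneshot for the single reply, so replies cannot be delivered to the wrong caller.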