Lock-Free Queue
Tutorial
The Problem
A mutex-protected queue serialises every enqueue and dequeue. In high-throughput systems — network packet processing, work-stealing schedulers, disruptor-pattern event buses — the lock becomes the bottleneck. Michael and Scott (1996) published a non-blocking queue algorithm that needs only CAS operations on the head and tail pointers. It is the basis for Java's ConcurrentLinkedQueue and the crossbeam-queue crate's SegQueue; bounded designs such as the LMAX Disruptor attack the same contention problem with a pre-allocated ring buffer instead.
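For contrast, the baseline that the lock-free queue is meant to beat under contention is a queue guarded by a single lock. The sketch below (MutexQueue is a name chosen here for illustration, not part of this tutorial's code) shows why: every enqueue and dequeue acquires the same Mutex, so all threads serialise on it. The final exercise on this page benchmarks against exactly this shape.

use std::collections::VecDeque;
use std::sync::Mutex;

// Baseline: one lock guards the whole queue, so every operation serialises.
pub struct MutexQueue<T> {
    inner: Mutex<VecDeque<T>>,
}

impl<T> MutexQueue<T> {
    pub fn new() -> Self {
        MutexQueue {
            inner: Mutex::new(VecDeque::new()),
        }
    }

    pub fn enqueue(&self, v: T) {
        // Every producer blocks here until the lock is free.
        self.inner.lock().unwrap().push_back(v);
    }

    pub fn dequeue(&self) -> Option<T> {
        // Consumers contend on the same lock as producers.
        self.inner.lock().unwrap().pop_front()
    }
}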
🎯 Learning Outcomes
- AtomicPtr with Acquire/Release ordering for safe cross-thread pointer sharing
- Drop that drains remaining nodes without leaking memory

Code Example
#![allow(clippy::all)]
// 468. Lock-free queue basics (Michael-Scott, simplified)
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread;

struct Node<T> {
    value: Option<T>,
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    fn new(v: Option<T>) -> *mut Self {
        Box::into_raw(Box::new(Node {
            value: v,
            next: AtomicPtr::new(ptr::null_mut()),
        }))
    }
}

pub struct Queue<T> {
    head: AtomicPtr<Node<T>>,
    tail: AtomicPtr<Node<T>>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    pub fn new() -> Self {
        // Head and tail both start at a shared dummy node.
        let d = Node::new(None);
        Queue {
            head: AtomicPtr::new(d),
            tail: AtomicPtr::new(d),
        }
    }

    pub fn enqueue(&self, v: T) {
        let n = Node::new(Some(v));
        loop {
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*t).next.load(Ordering::Acquire) };
            if next.is_null() {
                // Tail is up to date: try to link the new node after it.
                match unsafe {
                    (*t).next.compare_exchange_weak(
                        ptr::null_mut(),
                        n,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                } {
                    Ok(_) => {
                        // Swing tail forward; losing this race is fine, another
                        // thread will advance it for us.
                        let _ = self
                            .tail
                            .compare_exchange(t, n, Ordering::Release, Ordering::Relaxed);
                        return;
                    }
                    Err(_) => {}
                }
            } else {
                // Tail is lagging: help advance it, then retry.
                let _ = self
                    .tail
                    .compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            }
        }
    }

    pub fn dequeue(&self) -> Option<T> {
        loop {
            let h = self.head.load(Ordering::Acquire);
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*h).next.load(Ordering::Acquire) };
            if h == t {
                if next.is_null() {
                    return None;
                }
                // Tail is lagging behind a concurrent enqueue: help advance it.
                let _ = self
                    .tail
                    .compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            } else {
                match self
                    .head
                    .compare_exchange_weak(h, next, Ordering::AcqRel, Ordering::Relaxed)
                {
                    Ok(_) => {
                        // `next` becomes the new dummy; take its value out so the
                        // node can be freed later without dropping the value twice.
                        let v = unsafe { (*next).value.take() };
                        unsafe { drop(Box::from_raw(h)) };
                        return v;
                    }
                    Err(_) => {}
                }
            }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        while self.dequeue().is_some() {}
        unsafe {
            // Free the final dummy node.
            drop(Box::from_raw(self.head.load(Ordering::Relaxed)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_fifo() {
        let q = Queue::new();
        for i in 1..=5u32 {
            q.enqueue(i);
        }
        for i in 1..=5 {
            assert_eq!(q.dequeue(), Some(i));
        }
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn test_concurrent() {
        let q = Arc::new(Queue::<u32>::new());
        thread::scope(|s| {
            for i in 0..4u32 {
                let q = Arc::clone(&q);
                s.spawn(move || {
                    for j in 0..25 {
                        q.enqueue(i * 25 + j);
                    }
                });
            }
        });
        let mut c = 0;
        while q.dequeue().is_some() {
            c += 1;
        }
        assert_eq!(c, 100);
    }
}
Key Differences
- **unsafe**: Rust requires unsafe blocks around raw pointer dereferences and Box::from_raw; OCaml's GC tracks all pointers and prohibits raw pointer arithmetic entirely.
- **Memory reclamation**: this simplified version calls Box::from_raw(h) to free dequeued nodes immediately; in production, epoch-based reclamation (example 467) is needed to prevent use-after-free under concurrent dequeue (a hedged sketch using the crossbeam-epoch crate follows this list).
- **Send/Sync proofs**: Rust demands explicit unsafe impl Send/Sync; OCaml's type system does not distinguish shared vs. owned data at the type level.
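To make the reclamation point concrete, here is a minimal sketch of the same queue with the immediate Box::from_raw replaced by deferred frees. It assumes a recent crossbeam-epoch (0.9) as a stand-in for example 467's EpochMgr, which is not reproduced on this page; the EpochQueue name and its exact shape are illustrative only. A dequeuing thread pins itself, retires the old dummy with defer_destroy, and the node is freed only after every pinned thread has unpinned.

use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::Ordering;

use crossbeam_epoch::{self as epoch, Atomic, Owned, Shared};

struct Node<T> {
    // ManuallyDrop: dequeue moves the value out with ptr::read, so the node's
    // deferred destructor must not drop it a second time.
    value: ManuallyDrop<Option<T>>,
    next: Atomic<Node<T>>,
}

fn node<T>(v: Option<T>) -> Owned<Node<T>> {
    Owned::new(Node { value: ManuallyDrop::new(v), next: Atomic::null() })
}

pub struct EpochQueue<T> {
    head: Atomic<Node<T>>,
    tail: Atomic<Node<T>>,
}

impl<T> EpochQueue<T> {
    pub fn new() -> Self {
        let q = EpochQueue { head: Atomic::null(), tail: Atomic::null() };
        // No other thread can see the queue yet, so an unprotected guard is fine.
        let dummy = node(None).into_shared(unsafe { epoch::unprotected() });
        q.head.store(dummy, Ordering::Relaxed);
        q.tail.store(dummy, Ordering::Relaxed);
        q
    }

    pub fn enqueue(&self, v: T) {
        let guard = &epoch::pin();
        let n = node(Some(v)).into_shared(guard);
        loop {
            let t = self.tail.load(Ordering::Acquire, guard);
            let t_ref = unsafe { t.deref() };
            let next = t_ref.next.load(Ordering::Acquire, guard);
            if next.is_null() {
                if t_ref
                    .next
                    .compare_exchange(Shared::null(), n, Ordering::Release, Ordering::Relaxed, guard)
                    .is_ok()
                {
                    let _ = self.tail.compare_exchange(t, n, Ordering::Release, Ordering::Relaxed, guard);
                    return;
                }
            } else {
                // Tail is lagging: help advance it.
                let _ = self.tail.compare_exchange(t, next, Ordering::Release, Ordering::Relaxed, guard);
            }
        }
    }

    pub fn dequeue(&self) -> Option<T> {
        // Pinning announces that this thread may hold queue pointers; nodes
        // retired below are freed only once every pinned thread has unpinned.
        let guard = &epoch::pin();
        loop {
            let h = self.head.load(Ordering::Acquire, guard);
            let t = self.tail.load(Ordering::Acquire, guard);
            let next = unsafe { h.deref() }.next.load(Ordering::Acquire, guard);
            if h == t {
                if next.is_null() {
                    return None;
                }
                // Never retire a node the tail may still point to: advance it first.
                let _ = self.tail.compare_exchange(t, next, Ordering::Release, Ordering::Relaxed, guard);
            } else if self
                .head
                .compare_exchange(h, next, Ordering::AcqRel, Ordering::Relaxed, guard)
                .is_ok()
            {
                // We won the CAS, so the value stored in the new dummy is ours.
                let v = unsafe { ManuallyDrop::into_inner(ptr::read(&next.deref().value)) };
                // Retire the old dummy instead of freeing it immediately.
                unsafe { guard.defer_destroy(h) };
                return v;
            }
        }
    }
}
// Drop is elided for brevity: a complete version would drain the queue and
// free the remaining nodes with an unprotected guard.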
OCaml Approach
Multicore OCaml's Saturn library provides Saturn.Queue (Michael-Scott) and Saturn.Single_prod_single_cons_queue. In pure OCaml the idiom is:
(* Using Saturn *)
let q = Saturn.Queue.create ()
let () = Saturn.Queue.push q 42
let v = Saturn.Queue.pop_opt q (* None | Some x *)
Without Saturn, functional OCaml uses immutable persistent queues (two-list Okasaki queue) which are naturally thread-safe for reads but require atomic references for multi-producer use.
Full Source
#![allow(clippy::all)]
// 468. Lock-free queue basics (Michael-Scott, simplified)
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread;

struct Node<T> {
    value: Option<T>,
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    fn new(v: Option<T>) -> *mut Self {
        Box::into_raw(Box::new(Node {
            value: v,
            next: AtomicPtr::new(ptr::null_mut()),
        }))
    }
}

pub struct Queue<T> {
    head: AtomicPtr<Node<T>>,
    tail: AtomicPtr<Node<T>>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    pub fn new() -> Self {
        // Head and tail both start at a shared dummy node.
        let d = Node::new(None);
        Queue {
            head: AtomicPtr::new(d),
            tail: AtomicPtr::new(d),
        }
    }

    pub fn enqueue(&self, v: T) {
        let n = Node::new(Some(v));
        loop {
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*t).next.load(Ordering::Acquire) };
            if next.is_null() {
                // Tail is up to date: try to link the new node after it.
                match unsafe {
                    (*t).next.compare_exchange_weak(
                        ptr::null_mut(),
                        n,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                } {
                    Ok(_) => {
                        // Swing tail forward; losing this race is fine, another
                        // thread will advance it for us.
                        let _ = self
                            .tail
                            .compare_exchange(t, n, Ordering::Release, Ordering::Relaxed);
                        return;
                    }
                    Err(_) => {}
                }
            } else {
                // Tail is lagging: help advance it, then retry.
                let _ = self
                    .tail
                    .compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            }
        }
    }

    pub fn dequeue(&self) -> Option<T> {
        loop {
            let h = self.head.load(Ordering::Acquire);
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*h).next.load(Ordering::Acquire) };
            if h == t {
                if next.is_null() {
                    return None;
                }
                // Tail is lagging behind a concurrent enqueue: help advance it.
                let _ = self
                    .tail
                    .compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            } else {
                match self
                    .head
                    .compare_exchange_weak(h, next, Ordering::AcqRel, Ordering::Relaxed)
                {
                    Ok(_) => {
                        // `next` becomes the new dummy; take its value out so the
                        // node can be freed later without dropping the value twice.
                        let v = unsafe { (*next).value.take() };
                        unsafe { drop(Box::from_raw(h)) };
                        return v;
                    }
                    Err(_) => {}
                }
            }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        while self.dequeue().is_some() {}
        unsafe {
            // Free the final dummy node.
            drop(Box::from_raw(self.head.load(Ordering::Relaxed)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_fifo() {
        let q = Queue::new();
        for i in 1..=5u32 {
            q.enqueue(i);
        }
        for i in 1..=5 {
            assert_eq!(q.dequeue(), Some(i));
        }
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn test_concurrent() {
        let q = Arc::new(Queue::<u32>::new());
        thread::scope(|s| {
            for i in 0..4u32 {
                let q = Arc::clone(&q);
                s.spawn(move || {
                    for j in 0..25 {
                        q.enqueue(i * 25 + j);
                    }
                });
            }
        });
        let mut c = 0;
        while q.dequeue().is_some() {
            c += 1;
        }
        assert_eq!(c, 100);
    }
}
Exercises
1. Retire dequeued nodes with EpochMgr::retire from example 467 to eliminate the use-after-free hazard.
2. Add an AtomicUsize length counter; block enqueue when capacity is reached using a Condvar, turning the queue into a bounded channel.
3. Benchmark the queue against Mutex<VecDeque> under 4 producer / 4 consumer threads using criterion (a starting skeleton is sketched below).
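For the benchmarking exercise, the skeleton below is one possible starting point. It assumes criterion 0.5, a bench file registered with harness = false in Cargo.toml, and that the Queue above is exported from a crate here called lockfree_queue; the file path, crate name, and benchmark names are placeholders, not part of the tutorial. Each iteration spawns 4 producers and 4 consumers on scoped threads, so thread start-up cost is included in the measurement; that is acceptable for a first comparison.

// benches/queue_bench.rs (placeholder path; needs `harness = false` in Cargo.toml)
use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread;

use criterion::{criterion_group, criterion_main, Criterion};
use lockfree_queue::Queue; // placeholder crate name for the Queue above

const PER_THREAD: u32 = 1_000;

// 4 producers and 4 consumers hammering the lock-free queue.
fn bench_lockfree(c: &mut Criterion) {
    c.bench_function("lockfree_4p4c", |b| {
        b.iter(|| {
            let q = Queue::new();
            thread::scope(|s| {
                for _ in 0..4 {
                    s.spawn(|| {
                        for i in 0..PER_THREAD {
                            q.enqueue(i);
                        }
                    });
                }
                for _ in 0..4 {
                    s.spawn(|| {
                        let mut taken = 0;
                        while taken < PER_THREAD {
                            if q.dequeue().is_some() {
                                taken += 1;
                            }
                        }
                    });
                }
            });
        })
    });
}

// The same workload against the Mutex<VecDeque> baseline.
fn bench_mutex(c: &mut Criterion) {
    c.bench_function("mutex_vecdeque_4p4c", |b| {
        b.iter(|| {
            let q = Mutex::new(VecDeque::new());
            thread::scope(|s| {
                for _ in 0..4 {
                    s.spawn(|| {
                        for i in 0..PER_THREAD {
                            q.lock().unwrap().push_back(i);
                        }
                    });
                }
                for _ in 0..4 {
                    s.spawn(|| {
                        let mut taken = 0;
                        while taken < PER_THREAD {
                            if q.lock().unwrap().pop_front().is_some() {
                                taken += 1;
                            }
                        }
                    });
                }
            });
        })
    });
}

criterion_group!(benches, bench_lockfree, bench_mutex);
criterion_main!(benches);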