Lock-Free Queue

Example 468 · Expert · Functional Programming · Tutorial

The Problem

A mutex-protected queue serialises every enqueue and dequeue. In high-throughput systems — network packet processing, work-stealing schedulers, disruptor-pattern event buses — the lock becomes the bottleneck. Michael and Scott (1996) published a non-blocking queue algorithm that uses only CAS operations on the head and tail pointers. It is the basis for Java's ConcurrentLinkedQueue and Rust's crossbeam-queue crate; ring-buffer designs such as the LMAX Disruptor attack the same bottleneck with a different, preallocated structure.

🎯 Learning Outcomes

  • Understand the Michael-Scott two-pointer queue invariant (sentinel node; the tail may lag)
  • Use AtomicPtr with Acquire/Release ordering for safe cross-thread pointer sharing
  • Implement a CAS-based enqueue with tail advancement
  • Handle the dequeue loop: empty check, lagging-tail fix, head advance
  • Write a safe Drop that drains remaining nodes without leaking memory

Code Example

    #![allow(clippy::all)]
    // 468. Lock-free queue basics (Michael-Scott, simplified)
    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering};
    use std::sync::Arc;
    use std::thread;
    
    struct Node<T> {
        value: Option<T>,
        next: AtomicPtr<Node<T>>,
    }
    impl<T> Node<T> {
        fn new(v: Option<T>) -> *mut Self {
            Box::into_raw(Box::new(Node {
                value: v,
                next: AtomicPtr::new(ptr::null_mut()),
            }))
        }
    }
    
    pub struct Queue<T> {
        head: AtomicPtr<Node<T>>,
        tail: AtomicPtr<Node<T>>,
    }
    unsafe impl<T: Send> Send for Queue<T> {}
    unsafe impl<T: Send> Sync for Queue<T> {}
    
    impl<T> Queue<T> {
        pub fn new() -> Self {
            let d = Node::new(None);
            Queue {
                head: AtomicPtr::new(d),
                tail: AtomicPtr::new(d),
            }
        }
        pub fn enqueue(&self, v: T) {
            let n = Node::new(Some(v));
            loop {
                let t = self.tail.load(Ordering::Acquire);
                let next = unsafe { (*t).next.load(Ordering::Acquire) };
                if next.is_null() {
                    match unsafe {
                        (*t).next.compare_exchange_weak(
                            ptr::null_mut(),
                            n,
                            Ordering::Release,
                            Ordering::Relaxed,
                        )
                    } {
                        Ok(_) => {
                            let _ =
                                self.tail
                                    .compare_exchange(t, n, Ordering::Release, Ordering::Relaxed);
                            return;
                        }
                        Err(_) => {}
                    }
                } else {
                    let _ = self
                        .tail
                        .compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
                }
            }
        }
        pub fn dequeue(&self) -> Option<T> {
            loop {
                let h = self.head.load(Ordering::Acquire);
                let t = self.tail.load(Ordering::Acquire);
                let next = unsafe { (*h).next.load(Ordering::Acquire) };
                if h == t {
                    if next.is_null() {
                        return None;
                    }
                    let _ = self
                        .tail
                        .compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
                } else {
                    match self
                        .head
                        .compare_exchange_weak(h, next, Ordering::AcqRel, Ordering::Relaxed)
                    {
                        Ok(_) => {
                            // Take the value out of the node so that the
                            // node's eventual deallocation does not drop
                            // it a second time (ptr::read would leave a
                            // duplicate behind and double-drop non-Copy T).
                            let v = unsafe { (*next).value.take() };
                            unsafe { drop(Box::from_raw(h)) };
                            return v;
                        }
                        Err(_) => {}
                    }
                }
            }
        }
    }
    impl<T> Drop for Queue<T> {
        fn drop(&mut self) {
            while self.dequeue().is_some() {}
            unsafe {
                drop(Box::from_raw(self.head.load(Ordering::Relaxed)));
            }
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        #[test]
        fn test_fifo() {
            let q = Queue::new();
            for i in 1..=5u32 {
                q.enqueue(i);
            }
            for i in 1..=5 {
                assert_eq!(q.dequeue(), Some(i));
            }
            assert_eq!(q.dequeue(), None);
        }
        #[test]
        fn test_concurrent() {
            let q = Arc::new(Queue::<u32>::new());
            thread::scope(|s| {
                for i in 0..4u32 {
                    let q = Arc::clone(&q);
                    s.spawn(move || {
                        for j in 0..25 {
                            q.enqueue(i * 25 + j);
                        }
                    });
                }
            });
            let mut c = 0;
            while q.dequeue().is_some() {
                c += 1;
            }
            assert_eq!(c, 100);
        }
    }

    Key Differences

  • **Explicit unsafe**: Rust requires unsafe blocks around raw-pointer dereferences and Box::from_raw; OCaml's GC tracks all pointers and prohibits raw pointer arithmetic entirely.
  • **Memory reclamation**: the example frees dequeued nodes immediately with Box::from_raw(h); in production, epoch-based reclamation (example 467) is needed to prevent use-after-free under concurrent dequeue.
  • **ABA risk**: a simplified CAS on a raw pointer is susceptible to the ABA problem, since a freed node's address can be reused; OCaml's GC guarantees unique addresses for live objects, mitigating ABA for GC-managed nodes.
  • **Send/Sync proofs**: Rust demands explicit unsafe impl Send/Sync; OCaml's type system does not distinguish shared from owned data at the type level.

OCaml Approach

    Multicore OCaml's Saturn library provides Saturn.Queue (Michael-Scott) and Saturn.Single_prod_single_cons_queue. With Saturn the idiom is:

    (* Using Saturn *)
    let q = Saturn.Queue.create ()
    let () = Saturn.Queue.push q 42
    let v  = Saturn.Queue.pop_opt q  (* None | Some x *)
    

    Without Saturn, functional OCaml uses immutable persistent queues (two-list Okasaki queue) which are naturally thread-safe for reads but require atomic references for multi-producer use.


    Exercises

  • Epoch integration: Replace immediate node deallocation with EpochMgr::retire from example 467 to eliminate the use-after-free hazard.
  • Bounded queue: Add an AtomicUsize length counter; block enqueue when capacity is reached using a Condvar, turning the queue into a bounded channel.
  • Benchmark: Compare throughput of this lock-free queue against Mutex<VecDeque> under 4 producer / 4 consumer threads using criterion.