1 //! Michael-Scott lock-free queue.
2 //!
3 //! Usable with any number of producers and consumers.
4 //!
5 //! Michael and Scott.  Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
6 //! Algorithms.  PODC 1996.  http://dl.acm.org/citation.cfm?id=248106
7 //!
8 //! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
9 //! Practical Lock-Free Queue Algorithm. https://doi.org/10.1007/978-3-540-30232-2_7
10 
11 use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
12 
13 use crossbeam_utils::CachePadded;
14 
15 use maybe_uninit::MaybeUninit;
16 
17 use {unprotected, Atomic, Guard, Owned, Shared};
18 
// The representation here is a singly-linked list, with a sentinel node at the front. In general
// the `tail` pointer may lag behind the actual tail. Every node after the sentinel holds a value
// that has been pushed but not yet popped.
/// A Michael-Scott lock-free FIFO queue.
///
/// All operations take a `Guard`, which scopes the node pointers they load.
#[derive(Debug)]
pub struct Queue<T> {
    /// Points at the current sentinel node; the front element, if any, lives in the node that
    /// follows it.
    // NOTE(review): `CachePadded` presumably keeps `head` and `tail` on separate cache lines so
    // that producers and consumers do not contend on the same line — confirm in crossbeam-utils.
    head: CachePadded<Atomic<Node<T>>>,
    /// Points at the last node of the list, or at an earlier node when it is lagging behind.
    tail: CachePadded<Atomic<Node<T>>>,
}
27 
/// A single link of the queue's list: a possibly-empty value slot plus the pointer to the node
/// that follows it.
struct Node<T> {
    /// The slot in which a value of type `T` can be stored.
    ///
    /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
    /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
    /// Other nodes start their life with a push operation and contain a value until it gets popped
    /// out. A popped node then takes over as the sentinel, and is handed to the collector for
    /// destruction once a later pop moves `head` past it.
    data: MaybeUninit<T>,

    /// Link to the node pushed after this one; null while this node is the last in the list.
    next: Atomic<Node<T>>,
}
39 
// Any particular `T` should never be accessed concurrently, so no need for `Sync`: `push` and
// `try_pop` only move values in and out of the queue. The one operation that lends out a `&T`
// while the value is still in the queue, `try_pop_if`, asks for `T: Sync` on its own.
unsafe impl<T: Send> Sync for Queue<T> {}
unsafe impl<T: Send> Send for Queue<T> {}
43 
44 impl<T> Queue<T> {
45     /// Create a new, empty queue.
new() -> Queue<T>46     pub fn new() -> Queue<T> {
47         let q = Queue {
48             head: CachePadded::new(Atomic::null()),
49             tail: CachePadded::new(Atomic::null()),
50         };
51         let sentinel = Owned::new(Node {
52             data: MaybeUninit::uninit(),
53             next: Atomic::null(),
54         });
55         unsafe {
56             let guard = &unprotected();
57             let sentinel = sentinel.into_shared(guard);
58             q.head.store(sentinel, Relaxed);
59             q.tail.store(sentinel, Relaxed);
60             q
61         }
62     }
63 
64     /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
65     /// success. The queue's `tail` pointer may be updated.
66     #[inline(always)]
push_internal(&self, onto: Shared<Node<T>>, new: Shared<Node<T>>, guard: &Guard) -> bool67     fn push_internal(&self, onto: Shared<Node<T>>, new: Shared<Node<T>>, guard: &Guard) -> bool {
68         // is `onto` the actual tail?
69         let o = unsafe { onto.deref() };
70         let next = o.next.load(Acquire, guard);
71         if unsafe { next.as_ref().is_some() } {
72             // if not, try to "help" by moving the tail pointer forward
73             let _ = self.tail.compare_and_set(onto, next, Release, guard);
74             false
75         } else {
76             // looks like the actual tail; attempt to link in `n`
77             let result = o
78                 .next
79                 .compare_and_set(Shared::null(), new, Release, guard)
80                 .is_ok();
81             if result {
82                 // try to move the tail pointer forward
83                 let _ = self.tail.compare_and_set(onto, new, Release, guard);
84             }
85             result
86         }
87     }
88 
89     /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`.
push(&self, t: T, guard: &Guard)90     pub fn push(&self, t: T, guard: &Guard) {
91         let new = Owned::new(Node {
92             data: MaybeUninit::new(t),
93             next: Atomic::null(),
94         });
95         let new = Owned::into_shared(new, guard);
96 
97         loop {
98             // We push onto the tail, so we'll start optimistically by looking there first.
99             let tail = self.tail.load(Acquire, guard);
100 
101             // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
102             if self.push_internal(tail, new, guard) {
103                 break;
104             }
105         }
106     }
107 
108     /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
109     #[inline(always)]
pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()>110     fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
111         let head = self.head.load(Acquire, guard);
112         let h = unsafe { head.deref() };
113         let next = h.next.load(Acquire, guard);
114         match unsafe { next.as_ref() } {
115             Some(n) => unsafe {
116                 self.head
117                     .compare_and_set(head, next, Release, guard)
118                     .map(|_| {
119                         let tail = self.tail.load(Relaxed, guard);
120                         // Advance the tail so that we don't retire a pointer to a reachable node.
121                         if head == tail {
122                             let _ = self.tail.compare_and_set(tail, next, Release, guard);
123                         }
124                         guard.defer_destroy(head);
125                         // TODO: Replace with MaybeUninit::read when api is stable
126                         Some(n.data.as_ptr().read())
127                     })
128                     .map_err(|_| ())
129             },
130             None => Ok(None),
131         }
132     }
133 
134     /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
135     /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
136     #[inline(always)]
pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> where T: Sync, F: Fn(&T) -> bool,137     fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
138     where
139         T: Sync,
140         F: Fn(&T) -> bool,
141     {
142         let head = self.head.load(Acquire, guard);
143         let h = unsafe { head.deref() };
144         let next = h.next.load(Acquire, guard);
145         match unsafe { next.as_ref() } {
146             Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
147                 self.head
148                     .compare_and_set(head, next, Release, guard)
149                     .map(|_| {
150                         let tail = self.tail.load(Relaxed, guard);
151                         // Advance the tail so that we don't retire a pointer to a reachable node.
152                         if head == tail {
153                             let _ = self.tail.compare_and_set(tail, next, Release, guard);
154                         }
155                         guard.defer_destroy(head);
156                         Some(n.data.as_ptr().read())
157                     })
158                     .map_err(|_| ())
159             },
160             None | Some(_) => Ok(None),
161         }
162     }
163 
164     /// Attempts to dequeue from the front.
165     ///
166     /// Returns `None` if the queue is observed to be empty.
try_pop(&self, guard: &Guard) -> Option<T>167     pub fn try_pop(&self, guard: &Guard) -> Option<T> {
168         loop {
169             if let Ok(head) = self.pop_internal(guard) {
170                 return head;
171             }
172         }
173     }
174 
175     /// Attempts to dequeue from the front, if the item satisfies the given condition.
176     ///
177     /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
178     /// condition.
try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> where T: Sync, F: Fn(&T) -> bool,179     pub fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
180     where
181         T: Sync,
182         F: Fn(&T) -> bool,
183     {
184         loop {
185             if let Ok(head) = self.pop_if_internal(&condition, guard) {
186                 return head;
187             }
188         }
189     }
190 }
191 
192 impl<T> Drop for Queue<T> {
drop(&mut self)193     fn drop(&mut self) {
194         unsafe {
195             let guard = &unprotected();
196 
197             while let Some(_) = self.try_pop(guard) {}
198 
199             // Destroy the remaining sentinel node.
200             let sentinel = self.head.load(Relaxed, guard);
201             drop(sentinel.into_owned());
202         }
203     }
204 }
205 
206 #[cfg(test)]
207 mod test {
208     use super::*;
209     use crossbeam_utils::thread;
210     use pin;
211 
    // Test-only wrapper around `super::Queue` whose methods pin the current thread themselves,
    // so the tests below do not have to pass a guard around.
    struct Queue<T> {
        queue: super::Queue<T>,
    }
215 
216     impl<T> Queue<T> {
new() -> Queue<T>217         pub fn new() -> Queue<T> {
218             Queue {
219                 queue: super::Queue::new(),
220             }
221         }
222 
push(&self, t: T)223         pub fn push(&self, t: T) {
224             let guard = &pin();
225             self.queue.push(t, guard);
226         }
227 
is_empty(&self) -> bool228         pub fn is_empty(&self) -> bool {
229             let guard = &pin();
230             let head = self.queue.head.load(Acquire, guard);
231             let h = unsafe { head.deref() };
232             h.next.load(Acquire, guard).is_null()
233         }
234 
try_pop(&self) -> Option<T>235         pub fn try_pop(&self) -> Option<T> {
236             let guard = &pin();
237             self.queue.try_pop(guard)
238         }
239 
pop(&self) -> T240         pub fn pop(&self) -> T {
241             loop {
242                 match self.try_pop() {
243                     None => continue,
244                     Some(t) => return t,
245                 }
246             }
247         }
248     }
249 
    // Element count / iteration bound shared by the multi-threaded tests below.
    const CONC_COUNT: i64 = 1000000;
251 
252     #[test]
push_try_pop_1()253     fn push_try_pop_1() {
254         let q: Queue<i64> = Queue::new();
255         assert!(q.is_empty());
256         q.push(37);
257         assert!(!q.is_empty());
258         assert_eq!(q.try_pop(), Some(37));
259         assert!(q.is_empty());
260     }
261 
262     #[test]
push_try_pop_2()263     fn push_try_pop_2() {
264         let q: Queue<i64> = Queue::new();
265         assert!(q.is_empty());
266         q.push(37);
267         q.push(48);
268         assert_eq!(q.try_pop(), Some(37));
269         assert!(!q.is_empty());
270         assert_eq!(q.try_pop(), Some(48));
271         assert!(q.is_empty());
272     }
273 
274     #[test]
push_try_pop_many_seq()275     fn push_try_pop_many_seq() {
276         let q: Queue<i64> = Queue::new();
277         assert!(q.is_empty());
278         for i in 0..200 {
279             q.push(i)
280         }
281         assert!(!q.is_empty());
282         for i in 0..200 {
283             assert_eq!(q.try_pop(), Some(i));
284         }
285         assert!(q.is_empty());
286     }
287 
288     #[test]
push_pop_1()289     fn push_pop_1() {
290         let q: Queue<i64> = Queue::new();
291         assert!(q.is_empty());
292         q.push(37);
293         assert!(!q.is_empty());
294         assert_eq!(q.pop(), 37);
295         assert!(q.is_empty());
296     }
297 
298     #[test]
push_pop_2()299     fn push_pop_2() {
300         let q: Queue<i64> = Queue::new();
301         q.push(37);
302         q.push(48);
303         assert_eq!(q.pop(), 37);
304         assert_eq!(q.pop(), 48);
305     }
306 
307     #[test]
push_pop_many_seq()308     fn push_pop_many_seq() {
309         let q: Queue<i64> = Queue::new();
310         assert!(q.is_empty());
311         for i in 0..200 {
312             q.push(i)
313         }
314         assert!(!q.is_empty());
315         for i in 0..200 {
316             assert_eq!(q.pop(), i);
317         }
318         assert!(q.is_empty());
319     }
320 
321     #[test]
push_try_pop_many_spsc()322     fn push_try_pop_many_spsc() {
323         let q: Queue<i64> = Queue::new();
324         assert!(q.is_empty());
325 
326         thread::scope(|scope| {
327             scope.spawn(|_| {
328                 let mut next = 0;
329 
330                 while next < CONC_COUNT {
331                     if let Some(elem) = q.try_pop() {
332                         assert_eq!(elem, next);
333                         next += 1;
334                     }
335                 }
336             });
337 
338             for i in 0..CONC_COUNT {
339                 q.push(i)
340             }
341         })
342         .unwrap();
343     }
344 
345     #[test]
push_try_pop_many_spmc()346     fn push_try_pop_many_spmc() {
347         fn recv(_t: i32, q: &Queue<i64>) {
348             let mut cur = -1;
349             for _i in 0..CONC_COUNT {
350                 if let Some(elem) = q.try_pop() {
351                     assert!(elem > cur);
352                     cur = elem;
353 
354                     if cur == CONC_COUNT - 1 {
355                         break;
356                     }
357                 }
358             }
359         }
360 
361         let q: Queue<i64> = Queue::new();
362         assert!(q.is_empty());
363         thread::scope(|scope| {
364             for i in 0..3 {
365                 let q = &q;
366                 scope.spawn(move |_| recv(i, q));
367             }
368 
369             scope.spawn(|_| {
370                 for i in 0..CONC_COUNT {
371                     q.push(i);
372                 }
373             });
374         })
375         .unwrap();
376     }
377 
378     #[test]
push_try_pop_many_mpmc()379     fn push_try_pop_many_mpmc() {
380         enum LR {
381             Left(i64),
382             Right(i64),
383         }
384 
385         let q: Queue<LR> = Queue::new();
386         assert!(q.is_empty());
387 
388         thread::scope(|scope| {
389             for _t in 0..2 {
390                 scope.spawn(|_| {
391                     for i in CONC_COUNT - 1..CONC_COUNT {
392                         q.push(LR::Left(i))
393                     }
394                 });
395                 scope.spawn(|_| {
396                     for i in CONC_COUNT - 1..CONC_COUNT {
397                         q.push(LR::Right(i))
398                     }
399                 });
400                 scope.spawn(|_| {
401                     let mut vl = vec![];
402                     let mut vr = vec![];
403                     for _i in 0..CONC_COUNT {
404                         match q.try_pop() {
405                             Some(LR::Left(x)) => vl.push(x),
406                             Some(LR::Right(x)) => vr.push(x),
407                             _ => {}
408                         }
409                     }
410 
411                     let mut vl2 = vl.clone();
412                     let mut vr2 = vr.clone();
413                     vl2.sort();
414                     vr2.sort();
415 
416                     assert_eq!(vl, vl2);
417                     assert_eq!(vr, vr2);
418                 });
419             }
420         })
421         .unwrap();
422     }
423 
424     #[test]
push_pop_many_spsc()425     fn push_pop_many_spsc() {
426         let q: Queue<i64> = Queue::new();
427 
428         thread::scope(|scope| {
429             scope.spawn(|_| {
430                 let mut next = 0;
431                 while next < CONC_COUNT {
432                     assert_eq!(q.pop(), next);
433                     next += 1;
434                 }
435             });
436 
437             for i in 0..CONC_COUNT {
438                 q.push(i)
439             }
440         })
441         .unwrap();
442         assert!(q.is_empty());
443     }
444 
445     #[test]
is_empty_dont_pop()446     fn is_empty_dont_pop() {
447         let q: Queue<i64> = Queue::new();
448         q.push(20);
449         q.push(20);
450         assert!(!q.is_empty());
451         assert!(!q.is_empty());
452         assert!(q.try_pop().is_some());
453     }
454 }
455