1 //! Michael-Scott lock-free queue.
2 //!
3 //! Usable with any number of producers and consumers.
4 //!
5 //! Michael and Scott.  Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
6 //! Algorithms.  PODC 1996.  <http://dl.acm.org/citation.cfm?id=248106>
7 //!
8 //! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
9 //! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
10 
11 use core::mem::MaybeUninit;
12 use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
13 
14 use crossbeam_utils::CachePadded;
15 
16 use crate::{unprotected, Atomic, Guard, Owned, Shared};
17 
18 // The representation here is a singly-linked list, with a sentinel node at the front. In general
19 // the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or
20 // all `Blocked` (requests for data from blocked threads).
21 #[derive(Debug)]
22 pub(crate) struct Queue<T> {
23     head: CachePadded<Atomic<Node<T>>>,
24     tail: CachePadded<Atomic<Node<T>>>,
25 }
26 
27 struct Node<T> {
28     /// The slot in which a value of type `T` can be stored.
29     ///
30     /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
31     /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
32     /// Other nodes start their life with a push operation and contain a value until it gets popped
33     /// out. After that such empty nodes get added to the collector for destruction.
34     data: MaybeUninit<T>,
35 
36     next: Atomic<Node<T>>,
37 }
38 
39 // Any particular `T` should never be accessed concurrently, so no need for `Sync`.
40 unsafe impl<T: Send> Sync for Queue<T> {}
41 unsafe impl<T: Send> Send for Queue<T> {}
42 
43 impl<T> Queue<T> {
44     /// Create a new, empty queue.
new() -> Queue<T>45     pub(crate) fn new() -> Queue<T> {
46         let q = Queue {
47             head: CachePadded::new(Atomic::null()),
48             tail: CachePadded::new(Atomic::null()),
49         };
50         let sentinel = Owned::new(Node {
51             data: MaybeUninit::uninit(),
52             next: Atomic::null(),
53         });
54         unsafe {
55             let guard = unprotected();
56             let sentinel = sentinel.into_shared(guard);
57             q.head.store(sentinel, Relaxed);
58             q.tail.store(sentinel, Relaxed);
59             q
60         }
61     }
62 
63     /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
64     /// success. The queue's `tail` pointer may be updated.
65     #[inline(always)]
push_internal( &self, onto: Shared<'_, Node<T>>, new: Shared<'_, Node<T>>, guard: &Guard, ) -> bool66     fn push_internal(
67         &self,
68         onto: Shared<'_, Node<T>>,
69         new: Shared<'_, Node<T>>,
70         guard: &Guard,
71     ) -> bool {
72         // is `onto` the actual tail?
73         let o = unsafe { onto.deref() };
74         let next = o.next.load(Acquire, guard);
75         if unsafe { next.as_ref().is_some() } {
76             // if not, try to "help" by moving the tail pointer forward
77             let _ = self
78                 .tail
79                 .compare_exchange(onto, next, Release, Relaxed, guard);
80             false
81         } else {
82             // looks like the actual tail; attempt to link in `n`
83             let result = o
84                 .next
85                 .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86                 .is_ok();
87             if result {
88                 // try to move the tail pointer forward
89                 let _ = self
90                     .tail
91                     .compare_exchange(onto, new, Release, Relaxed, guard);
92             }
93             result
94         }
95     }
96 
97     /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`.
push(&self, t: T, guard: &Guard)98     pub(crate) fn push(&self, t: T, guard: &Guard) {
99         let new = Owned::new(Node {
100             data: MaybeUninit::new(t),
101             next: Atomic::null(),
102         });
103         let new = Owned::into_shared(new, guard);
104 
105         loop {
106             // We push onto the tail, so we'll start optimistically by looking there first.
107             let tail = self.tail.load(Acquire, guard);
108 
109             // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110             if self.push_internal(tail, new, guard) {
111                 break;
112             }
113         }
114     }
115 
116     /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
117     #[inline(always)]
pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()>118     fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119         let head = self.head.load(Acquire, guard);
120         let h = unsafe { head.deref() };
121         let next = h.next.load(Acquire, guard);
122         match unsafe { next.as_ref() } {
123             Some(n) => unsafe {
124                 self.head
125                     .compare_exchange(head, next, Release, Relaxed, guard)
126                     .map(|_| {
127                         let tail = self.tail.load(Relaxed, guard);
128                         // Advance the tail so that we don't retire a pointer to a reachable node.
129                         if head == tail {
130                             let _ = self
131                                 .tail
132                                 .compare_exchange(tail, next, Release, Relaxed, guard);
133                         }
134                         guard.defer_destroy(head);
135                         // TODO: Replace with MaybeUninit::read when api is stable
136                         Some(n.data.as_ptr().read())
137                     })
138                     .map_err(|_| ())
139             },
140             None => Ok(None),
141         }
142     }
143 
144     /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
145     /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
146     #[inline(always)]
pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> where T: Sync, F: Fn(&T) -> bool,147     fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
148     where
149         T: Sync,
150         F: Fn(&T) -> bool,
151     {
152         let head = self.head.load(Acquire, guard);
153         let h = unsafe { head.deref() };
154         let next = h.next.load(Acquire, guard);
155         match unsafe { next.as_ref() } {
156             Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
157                 self.head
158                     .compare_exchange(head, next, Release, Relaxed, guard)
159                     .map(|_| {
160                         let tail = self.tail.load(Relaxed, guard);
161                         // Advance the tail so that we don't retire a pointer to a reachable node.
162                         if head == tail {
163                             let _ = self
164                                 .tail
165                                 .compare_exchange(tail, next, Release, Relaxed, guard);
166                         }
167                         guard.defer_destroy(head);
168                         Some(n.data.as_ptr().read())
169                     })
170                     .map_err(|_| ())
171             },
172             None | Some(_) => Ok(None),
173         }
174     }
175 
176     /// Attempts to dequeue from the front.
177     ///
178     /// Returns `None` if the queue is observed to be empty.
try_pop(&self, guard: &Guard) -> Option<T>179     pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
180         loop {
181             if let Ok(head) = self.pop_internal(guard) {
182                 return head;
183             }
184         }
185     }
186 
187     /// Attempts to dequeue from the front, if the item satisfies the given condition.
188     ///
189     /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
190     /// condition.
try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> where T: Sync, F: Fn(&T) -> bool,191     pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
192     where
193         T: Sync,
194         F: Fn(&T) -> bool,
195     {
196         loop {
197             if let Ok(head) = self.pop_if_internal(&condition, guard) {
198                 return head;
199             }
200         }
201     }
202 }
203 
204 impl<T> Drop for Queue<T> {
drop(&mut self)205     fn drop(&mut self) {
206         unsafe {
207             let guard = unprotected();
208 
209             while self.try_pop(guard).is_some() {}
210 
211             // Destroy the remaining sentinel node.
212             let sentinel = self.head.load(Relaxed, guard);
213             drop(sentinel.into_owned());
214         }
215     }
216 }
217 
218 #[cfg(all(test, not(crossbeam_loom)))]
219 mod test {
220     use super::*;
221     use crate::pin;
222     use crossbeam_utils::thread;
223 
224     struct Queue<T> {
225         queue: super::Queue<T>,
226     }
227 
228     impl<T> Queue<T> {
new() -> Queue<T>229         pub(crate) fn new() -> Queue<T> {
230             Queue {
231                 queue: super::Queue::new(),
232             }
233         }
234 
push(&self, t: T)235         pub(crate) fn push(&self, t: T) {
236             let guard = &pin();
237             self.queue.push(t, guard);
238         }
239 
is_empty(&self) -> bool240         pub(crate) fn is_empty(&self) -> bool {
241             let guard = &pin();
242             let head = self.queue.head.load(Acquire, guard);
243             let h = unsafe { head.deref() };
244             h.next.load(Acquire, guard).is_null()
245         }
246 
try_pop(&self) -> Option<T>247         pub(crate) fn try_pop(&self) -> Option<T> {
248             let guard = &pin();
249             self.queue.try_pop(guard)
250         }
251 
pop(&self) -> T252         pub(crate) fn pop(&self) -> T {
253             loop {
254                 match self.try_pop() {
255                     None => continue,
256                     Some(t) => return t,
257                 }
258             }
259         }
260     }
261 
262     #[cfg(miri)]
263     const CONC_COUNT: i64 = 1000;
264     #[cfg(not(miri))]
265     const CONC_COUNT: i64 = 1000000;
266 
267     #[test]
push_try_pop_1()268     fn push_try_pop_1() {
269         let q: Queue<i64> = Queue::new();
270         assert!(q.is_empty());
271         q.push(37);
272         assert!(!q.is_empty());
273         assert_eq!(q.try_pop(), Some(37));
274         assert!(q.is_empty());
275     }
276 
277     #[test]
push_try_pop_2()278     fn push_try_pop_2() {
279         let q: Queue<i64> = Queue::new();
280         assert!(q.is_empty());
281         q.push(37);
282         q.push(48);
283         assert_eq!(q.try_pop(), Some(37));
284         assert!(!q.is_empty());
285         assert_eq!(q.try_pop(), Some(48));
286         assert!(q.is_empty());
287     }
288 
289     #[test]
push_try_pop_many_seq()290     fn push_try_pop_many_seq() {
291         let q: Queue<i64> = Queue::new();
292         assert!(q.is_empty());
293         for i in 0..200 {
294             q.push(i)
295         }
296         assert!(!q.is_empty());
297         for i in 0..200 {
298             assert_eq!(q.try_pop(), Some(i));
299         }
300         assert!(q.is_empty());
301     }
302 
303     #[test]
push_pop_1()304     fn push_pop_1() {
305         let q: Queue<i64> = Queue::new();
306         assert!(q.is_empty());
307         q.push(37);
308         assert!(!q.is_empty());
309         assert_eq!(q.pop(), 37);
310         assert!(q.is_empty());
311     }
312 
313     #[test]
push_pop_2()314     fn push_pop_2() {
315         let q: Queue<i64> = Queue::new();
316         q.push(37);
317         q.push(48);
318         assert_eq!(q.pop(), 37);
319         assert_eq!(q.pop(), 48);
320     }
321 
322     #[test]
push_pop_many_seq()323     fn push_pop_many_seq() {
324         let q: Queue<i64> = Queue::new();
325         assert!(q.is_empty());
326         for i in 0..200 {
327             q.push(i)
328         }
329         assert!(!q.is_empty());
330         for i in 0..200 {
331             assert_eq!(q.pop(), i);
332         }
333         assert!(q.is_empty());
334     }
335 
336     #[test]
push_try_pop_many_spsc()337     fn push_try_pop_many_spsc() {
338         let q: Queue<i64> = Queue::new();
339         assert!(q.is_empty());
340 
341         thread::scope(|scope| {
342             scope.spawn(|_| {
343                 let mut next = 0;
344 
345                 while next < CONC_COUNT {
346                     if let Some(elem) = q.try_pop() {
347                         assert_eq!(elem, next);
348                         next += 1;
349                     }
350                 }
351             });
352 
353             for i in 0..CONC_COUNT {
354                 q.push(i)
355             }
356         })
357         .unwrap();
358     }
359 
360     #[test]
push_try_pop_many_spmc()361     fn push_try_pop_many_spmc() {
362         fn recv(_t: i32, q: &Queue<i64>) {
363             let mut cur = -1;
364             for _i in 0..CONC_COUNT {
365                 if let Some(elem) = q.try_pop() {
366                     assert!(elem > cur);
367                     cur = elem;
368 
369                     if cur == CONC_COUNT - 1 {
370                         break;
371                     }
372                 }
373             }
374         }
375 
376         let q: Queue<i64> = Queue::new();
377         assert!(q.is_empty());
378         thread::scope(|scope| {
379             for i in 0..3 {
380                 let q = &q;
381                 scope.spawn(move |_| recv(i, q));
382             }
383 
384             scope.spawn(|_| {
385                 for i in 0..CONC_COUNT {
386                     q.push(i);
387                 }
388             });
389         })
390         .unwrap();
391     }
392 
393     #[test]
push_try_pop_many_mpmc()394     fn push_try_pop_many_mpmc() {
395         enum LR {
396             Left(i64),
397             Right(i64),
398         }
399 
400         let q: Queue<LR> = Queue::new();
401         assert!(q.is_empty());
402 
403         thread::scope(|scope| {
404             for _t in 0..2 {
405                 scope.spawn(|_| {
406                     for i in CONC_COUNT - 1..CONC_COUNT {
407                         q.push(LR::Left(i))
408                     }
409                 });
410                 scope.spawn(|_| {
411                     for i in CONC_COUNT - 1..CONC_COUNT {
412                         q.push(LR::Right(i))
413                     }
414                 });
415                 scope.spawn(|_| {
416                     let mut vl = vec![];
417                     let mut vr = vec![];
418                     for _i in 0..CONC_COUNT {
419                         match q.try_pop() {
420                             Some(LR::Left(x)) => vl.push(x),
421                             Some(LR::Right(x)) => vr.push(x),
422                             _ => {}
423                         }
424                     }
425 
426                     let mut vl2 = vl.clone();
427                     let mut vr2 = vr.clone();
428                     vl2.sort_unstable();
429                     vr2.sort_unstable();
430 
431                     assert_eq!(vl, vl2);
432                     assert_eq!(vr, vr2);
433                 });
434             }
435         })
436         .unwrap();
437     }
438 
439     #[test]
push_pop_many_spsc()440     fn push_pop_many_spsc() {
441         let q: Queue<i64> = Queue::new();
442 
443         thread::scope(|scope| {
444             scope.spawn(|_| {
445                 let mut next = 0;
446                 while next < CONC_COUNT {
447                     assert_eq!(q.pop(), next);
448                     next += 1;
449                 }
450             });
451 
452             for i in 0..CONC_COUNT {
453                 q.push(i)
454             }
455         })
456         .unwrap();
457         assert!(q.is_empty());
458     }
459 
460     #[test]
is_empty_dont_pop()461     fn is_empty_dont_pop() {
462         let q: Queue<i64> = Queue::new();
463         q.push(20);
464         q.push(20);
465         assert!(!q.is_empty());
466         assert!(!q.is_empty());
467         assert!(q.try_pop().is_some());
468     }
469 }
470