1 //! Michael-Scott lock-free queue.
2 //!
3 //! Usable with any number of producers and consumers.
4 //!
5 //! Michael and Scott.  Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
6 //! Algorithms.  PODC 1996.  <http://dl.acm.org/citation.cfm?id=248106>
7 //!
8 //! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
9 //! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
10 
11 use core::mem::MaybeUninit;
12 use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
13 
14 use crossbeam_utils::CachePadded;
15 
16 use crate::{unprotected, Atomic, Guard, Owned, Shared};
17 
18 // The representation here is a singly-linked list, with a sentinel node at the front. In general
19 // the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or
20 // all `Blocked` (requests for data from blocked threads).
21 #[derive(Debug)]
22 pub(crate) struct Queue<T> {
23     head: CachePadded<Atomic<Node<T>>>,
24     tail: CachePadded<Atomic<Node<T>>>,
25 }
26 
27 struct Node<T> {
28     /// The slot in which a value of type `T` can be stored.
29     ///
30     /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
31     /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
32     /// Other nodes start their life with a push operation and contain a value until it gets popped
33     /// out. After that such empty nodes get added to the collector for destruction.
34     data: MaybeUninit<T>,
35 
36     next: Atomic<Node<T>>,
37 }
38 
39 // Any particular `T` should never be accessed concurrently, so no need for `Sync`.
40 unsafe impl<T: Send> Sync for Queue<T> {}
41 unsafe impl<T: Send> Send for Queue<T> {}
42 
43 impl<T> Queue<T> {
44     /// Create a new, empty queue.
new() -> Queue<T>45     pub(crate) fn new() -> Queue<T> {
46         let q = Queue {
47             head: CachePadded::new(Atomic::null()),
48             tail: CachePadded::new(Atomic::null()),
49         };
50         let sentinel = Owned::new(Node {
51             data: MaybeUninit::uninit(),
52             next: Atomic::null(),
53         });
54         unsafe {
55             let guard = unprotected();
56             let sentinel = sentinel.into_shared(guard);
57             q.head.store(sentinel, Relaxed);
58             q.tail.store(sentinel, Relaxed);
59             q
60         }
61     }
62 
63     /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
64     /// success. The queue's `tail` pointer may be updated.
65     #[inline(always)]
push_internal( &self, onto: Shared<'_, Node<T>>, new: Shared<'_, Node<T>>, guard: &Guard, ) -> bool66     fn push_internal(
67         &self,
68         onto: Shared<'_, Node<T>>,
69         new: Shared<'_, Node<T>>,
70         guard: &Guard,
71     ) -> bool {
72         // is `onto` the actual tail?
73         let o = unsafe { onto.deref() };
74         let next = o.next.load(Acquire, guard);
75         if unsafe { next.as_ref().is_some() } {
76             // if not, try to "help" by moving the tail pointer forward
77             let _ = self
78                 .tail
79                 .compare_exchange(onto, next, Release, Relaxed, guard);
80             false
81         } else {
82             // looks like the actual tail; attempt to link in `n`
83             let result = o
84                 .next
85                 .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86                 .is_ok();
87             if result {
88                 // try to move the tail pointer forward
89                 let _ = self
90                     .tail
91                     .compare_exchange(onto, new, Release, Relaxed, guard);
92             }
93             result
94         }
95     }
96 
97     /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`.
push(&self, t: T, guard: &Guard)98     pub(crate) fn push(&self, t: T, guard: &Guard) {
99         let new = Owned::new(Node {
100             data: MaybeUninit::new(t),
101             next: Atomic::null(),
102         });
103         let new = Owned::into_shared(new, guard);
104 
105         loop {
106             // We push onto the tail, so we'll start optimistically by looking there first.
107             let tail = self.tail.load(Acquire, guard);
108 
109             // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110             if self.push_internal(tail, new, guard) {
111                 break;
112             }
113         }
114     }
115 
116     /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
117     #[inline(always)]
pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()>118     fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119         let head = self.head.load(Acquire, guard);
120         let h = unsafe { head.deref() };
121         let next = h.next.load(Acquire, guard);
122         match unsafe { next.as_ref() } {
123             Some(n) => unsafe {
124                 self.head
125                     .compare_exchange(head, next, Release, Relaxed, guard)
126                     .map(|_| {
127                         let tail = self.tail.load(Relaxed, guard);
128                         // Advance the tail so that we don't retire a pointer to a reachable node.
129                         if head == tail {
130                             let _ = self
131                                 .tail
132                                 .compare_exchange(tail, next, Release, Relaxed, guard);
133                         }
134                         guard.defer_destroy(head);
135                         // TODO: Replace with MaybeUninit::read when api is stable
136                         Some(n.data.as_ptr().read())
137                     })
138                     .map_err(|_| ())
139             },
140             None => Ok(None),
141         }
142     }
143 
144     /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
145     /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
146     #[inline(always)]
pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> where T: Sync, F: Fn(&T) -> bool,147     fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
148     where
149         T: Sync,
150         F: Fn(&T) -> bool,
151     {
152         let head = self.head.load(Acquire, guard);
153         let h = unsafe { head.deref() };
154         let next = h.next.load(Acquire, guard);
155         match unsafe { next.as_ref() } {
156             Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
157                 self.head
158                     .compare_exchange(head, next, Release, Relaxed, guard)
159                     .map(|_| {
160                         let tail = self.tail.load(Relaxed, guard);
161                         // Advance the tail so that we don't retire a pointer to a reachable node.
162                         if head == tail {
163                             let _ = self
164                                 .tail
165                                 .compare_exchange(tail, next, Release, Relaxed, guard);
166                         }
167                         guard.defer_destroy(head);
168                         Some(n.data.as_ptr().read())
169                     })
170                     .map_err(|_| ())
171             },
172             None | Some(_) => Ok(None),
173         }
174     }
175 
176     /// Attempts to dequeue from the front.
177     ///
178     /// Returns `None` if the queue is observed to be empty.
try_pop(&self, guard: &Guard) -> Option<T>179     pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
180         loop {
181             if let Ok(head) = self.pop_internal(guard) {
182                 return head;
183             }
184         }
185     }
186 
187     /// Attempts to dequeue from the front, if the item satisfies the given condition.
188     ///
189     /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
190     /// condition.
try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> where T: Sync, F: Fn(&T) -> bool,191     pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
192     where
193         T: Sync,
194         F: Fn(&T) -> bool,
195     {
196         loop {
197             if let Ok(head) = self.pop_if_internal(&condition, guard) {
198                 return head;
199             }
200         }
201     }
202 }
203 
204 impl<T> Drop for Queue<T> {
drop(&mut self)205     fn drop(&mut self) {
206         unsafe {
207             let guard = unprotected();
208 
209             while self.try_pop(guard).is_some() {}
210 
211             // Destroy the remaining sentinel node.
212             let sentinel = self.head.load(Relaxed, guard);
213             drop(sentinel.into_owned());
214         }
215     }
216 }
217 
218 #[cfg(all(test, not(crossbeam_loom)))]
219 mod test {
220     use super::*;
221     use crate::pin;
222     use crossbeam_utils::thread;
223 
224     struct Queue<T> {
225         queue: super::Queue<T>,
226     }
227 
228     impl<T> Queue<T> {
new() -> Queue<T>229         pub(crate) fn new() -> Queue<T> {
230             Queue {
231                 queue: super::Queue::new(),
232             }
233         }
234 
push(&self, t: T)235         pub(crate) fn push(&self, t: T) {
236             let guard = &pin();
237             self.queue.push(t, guard);
238         }
239 
is_empty(&self) -> bool240         pub(crate) fn is_empty(&self) -> bool {
241             let guard = &pin();
242             let head = self.queue.head.load(Acquire, guard);
243             let h = unsafe { head.deref() };
244             h.next.load(Acquire, guard).is_null()
245         }
246 
try_pop(&self) -> Option<T>247         pub(crate) fn try_pop(&self) -> Option<T> {
248             let guard = &pin();
249             self.queue.try_pop(guard)
250         }
251 
pop(&self) -> T252         pub(crate) fn pop(&self) -> T {
253             loop {
254                 match self.try_pop() {
255                     None => continue,
256                     Some(t) => return t,
257                 }
258             }
259         }
260     }
261 
262     const CONC_COUNT: i64 = 1000000;
263 
264     #[test]
push_try_pop_1()265     fn push_try_pop_1() {
266         let q: Queue<i64> = Queue::new();
267         assert!(q.is_empty());
268         q.push(37);
269         assert!(!q.is_empty());
270         assert_eq!(q.try_pop(), Some(37));
271         assert!(q.is_empty());
272     }
273 
274     #[test]
push_try_pop_2()275     fn push_try_pop_2() {
276         let q: Queue<i64> = Queue::new();
277         assert!(q.is_empty());
278         q.push(37);
279         q.push(48);
280         assert_eq!(q.try_pop(), Some(37));
281         assert!(!q.is_empty());
282         assert_eq!(q.try_pop(), Some(48));
283         assert!(q.is_empty());
284     }
285 
286     #[test]
push_try_pop_many_seq()287     fn push_try_pop_many_seq() {
288         let q: Queue<i64> = Queue::new();
289         assert!(q.is_empty());
290         for i in 0..200 {
291             q.push(i)
292         }
293         assert!(!q.is_empty());
294         for i in 0..200 {
295             assert_eq!(q.try_pop(), Some(i));
296         }
297         assert!(q.is_empty());
298     }
299 
300     #[test]
push_pop_1()301     fn push_pop_1() {
302         let q: Queue<i64> = Queue::new();
303         assert!(q.is_empty());
304         q.push(37);
305         assert!(!q.is_empty());
306         assert_eq!(q.pop(), 37);
307         assert!(q.is_empty());
308     }
309 
310     #[test]
push_pop_2()311     fn push_pop_2() {
312         let q: Queue<i64> = Queue::new();
313         q.push(37);
314         q.push(48);
315         assert_eq!(q.pop(), 37);
316         assert_eq!(q.pop(), 48);
317     }
318 
319     #[test]
push_pop_many_seq()320     fn push_pop_many_seq() {
321         let q: Queue<i64> = Queue::new();
322         assert!(q.is_empty());
323         for i in 0..200 {
324             q.push(i)
325         }
326         assert!(!q.is_empty());
327         for i in 0..200 {
328             assert_eq!(q.pop(), i);
329         }
330         assert!(q.is_empty());
331     }
332 
333     #[test]
push_try_pop_many_spsc()334     fn push_try_pop_many_spsc() {
335         let q: Queue<i64> = Queue::new();
336         assert!(q.is_empty());
337 
338         thread::scope(|scope| {
339             scope.spawn(|_| {
340                 let mut next = 0;
341 
342                 while next < CONC_COUNT {
343                     if let Some(elem) = q.try_pop() {
344                         assert_eq!(elem, next);
345                         next += 1;
346                     }
347                 }
348             });
349 
350             for i in 0..CONC_COUNT {
351                 q.push(i)
352             }
353         })
354         .unwrap();
355     }
356 
357     #[test]
push_try_pop_many_spmc()358     fn push_try_pop_many_spmc() {
359         fn recv(_t: i32, q: &Queue<i64>) {
360             let mut cur = -1;
361             for _i in 0..CONC_COUNT {
362                 if let Some(elem) = q.try_pop() {
363                     assert!(elem > cur);
364                     cur = elem;
365 
366                     if cur == CONC_COUNT - 1 {
367                         break;
368                     }
369                 }
370             }
371         }
372 
373         let q: Queue<i64> = Queue::new();
374         assert!(q.is_empty());
375         thread::scope(|scope| {
376             for i in 0..3 {
377                 let q = &q;
378                 scope.spawn(move |_| recv(i, q));
379             }
380 
381             scope.spawn(|_| {
382                 for i in 0..CONC_COUNT {
383                     q.push(i);
384                 }
385             });
386         })
387         .unwrap();
388     }
389 
390     #[test]
push_try_pop_many_mpmc()391     fn push_try_pop_many_mpmc() {
392         enum LR {
393             Left(i64),
394             Right(i64),
395         }
396 
397         let q: Queue<LR> = Queue::new();
398         assert!(q.is_empty());
399 
400         thread::scope(|scope| {
401             for _t in 0..2 {
402                 scope.spawn(|_| {
403                     for i in CONC_COUNT - 1..CONC_COUNT {
404                         q.push(LR::Left(i))
405                     }
406                 });
407                 scope.spawn(|_| {
408                     for i in CONC_COUNT - 1..CONC_COUNT {
409                         q.push(LR::Right(i))
410                     }
411                 });
412                 scope.spawn(|_| {
413                     let mut vl = vec![];
414                     let mut vr = vec![];
415                     for _i in 0..CONC_COUNT {
416                         match q.try_pop() {
417                             Some(LR::Left(x)) => vl.push(x),
418                             Some(LR::Right(x)) => vr.push(x),
419                             _ => {}
420                         }
421                     }
422 
423                     let mut vl2 = vl.clone();
424                     let mut vr2 = vr.clone();
425                     vl2.sort();
426                     vr2.sort();
427 
428                     assert_eq!(vl, vl2);
429                     assert_eq!(vr, vr2);
430                 });
431             }
432         })
433         .unwrap();
434     }
435 
436     #[test]
push_pop_many_spsc()437     fn push_pop_many_spsc() {
438         let q: Queue<i64> = Queue::new();
439 
440         thread::scope(|scope| {
441             scope.spawn(|_| {
442                 let mut next = 0;
443                 while next < CONC_COUNT {
444                     assert_eq!(q.pop(), next);
445                     next += 1;
446                 }
447             });
448 
449             for i in 0..CONC_COUNT {
450                 q.push(i)
451             }
452         })
453         .unwrap();
454         assert!(q.is_empty());
455     }
456 
457     #[test]
is_empty_dont_pop()458     fn is_empty_dont_pop() {
459         let q: Queue<i64> = Queue::new();
460         q.push(20);
461         q.push(20);
462         assert!(!q.is_empty());
463         assert!(!q.is_empty());
464         assert!(q.try_pop().is_some());
465     }
466 }
467