1 //! Michael-Scott lock-free queue. 2 //! 3 //! Usable with any number of producers and consumers. 4 //! 5 //! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue 6 //! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106> 7 //! 8 //! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a 9 //! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7> 10 11 use core::mem::MaybeUninit; 12 use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; 13 14 use crossbeam_utils::CachePadded; 15 16 use crate::{unprotected, Atomic, Guard, Owned, Shared}; 17 18 // The representation here is a singly-linked list, with a sentinel node at the front. In general 19 // the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or 20 // all `Blocked` (requests for data from blocked threads). 21 #[derive(Debug)] 22 pub(crate) struct Queue<T> { 23 head: CachePadded<Atomic<Node<T>>>, 24 tail: CachePadded<Atomic<Node<T>>>, 25 } 26 27 struct Node<T> { 28 /// The slot in which a value of type `T` can be stored. 29 /// 30 /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`. 31 /// For example, the sentinel node in a queue never contains a value: its slot is always empty. 32 /// Other nodes start their life with a push operation and contain a value until it gets popped 33 /// out. After that such empty nodes get added to the collector for destruction. 34 data: MaybeUninit<T>, 35 36 next: Atomic<Node<T>>, 37 } 38 39 // Any particular `T` should never be accessed concurrently, so no need for `Sync`. 40 unsafe impl<T: Send> Sync for Queue<T> {} 41 unsafe impl<T: Send> Send for Queue<T> {} 42 43 impl<T> Queue<T> { 44 /// Create a new, empty queue. new() -> Queue<T>45 pub(crate) fn new() -> Queue<T> { 46 let q = Queue { 47 head: CachePadded::new(Atomic::null()), 48 tail: CachePadded::new(Atomic::null()), 49 }; 50 let sentinel = Owned::new(Node { 51 data: MaybeUninit::uninit(), 52 next: Atomic::null(), 53 }); 54 unsafe { 55 let guard = unprotected(); 56 let sentinel = sentinel.into_shared(guard); 57 q.head.store(sentinel, Relaxed); 58 q.tail.store(sentinel, Relaxed); 59 q 60 } 61 } 62 63 /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on 64 /// success. The queue's `tail` pointer may be updated. 65 #[inline(always)] push_internal( &self, onto: Shared<'_, Node<T>>, new: Shared<'_, Node<T>>, guard: &Guard, ) -> bool66 fn push_internal( 67 &self, 68 onto: Shared<'_, Node<T>>, 69 new: Shared<'_, Node<T>>, 70 guard: &Guard, 71 ) -> bool { 72 // is `onto` the actual tail? 73 let o = unsafe { onto.deref() }; 74 let next = o.next.load(Acquire, guard); 75 if unsafe { next.as_ref().is_some() } { 76 // if not, try to "help" by moving the tail pointer forward 77 let _ = self 78 .tail 79 .compare_exchange(onto, next, Release, Relaxed, guard); 80 false 81 } else { 82 // looks like the actual tail; attempt to link in `n` 83 let result = o 84 .next 85 .compare_exchange(Shared::null(), new, Release, Relaxed, guard) 86 .is_ok(); 87 if result { 88 // try to move the tail pointer forward 89 let _ = self 90 .tail 91 .compare_exchange(onto, new, Release, Relaxed, guard); 92 } 93 result 94 } 95 } 96 97 /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`. push(&self, t: T, guard: &Guard)98 pub(crate) fn push(&self, t: T, guard: &Guard) { 99 let new = Owned::new(Node { 100 data: MaybeUninit::new(t), 101 next: Atomic::null(), 102 }); 103 let new = Owned::into_shared(new, guard); 104 105 loop { 106 // We push onto the tail, so we'll start optimistically by looking there first. 107 let tail = self.tail.load(Acquire, guard); 108 109 // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. 110 if self.push_internal(tail, new, guard) { 111 break; 112 } 113 } 114 } 115 116 /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop. 117 #[inline(always)] pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()>118 fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> { 119 let head = self.head.load(Acquire, guard); 120 let h = unsafe { head.deref() }; 121 let next = h.next.load(Acquire, guard); 122 match unsafe { next.as_ref() } { 123 Some(n) => unsafe { 124 self.head 125 .compare_exchange(head, next, Release, Relaxed, guard) 126 .map(|_| { 127 let tail = self.tail.load(Relaxed, guard); 128 // Advance the tail so that we don't retire a pointer to a reachable node. 129 if head == tail { 130 let _ = self 131 .tail 132 .compare_exchange(tail, next, Release, Relaxed, guard); 133 } 134 guard.defer_destroy(head); 135 // TODO: Replace with MaybeUninit::read when api is stable 136 Some(n.data.as_ptr().read()) 137 }) 138 .map_err(|_| ()) 139 }, 140 None => Ok(None), 141 } 142 } 143 144 /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue 145 /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop. 146 #[inline(always)] pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> where T: Sync, F: Fn(&T) -> bool,147 fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> 148 where 149 T: Sync, 150 F: Fn(&T) -> bool, 151 { 152 let head = self.head.load(Acquire, guard); 153 let h = unsafe { head.deref() }; 154 let next = h.next.load(Acquire, guard); 155 match unsafe { next.as_ref() } { 156 Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { 157 self.head 158 .compare_exchange(head, next, Release, Relaxed, guard) 159 .map(|_| { 160 let tail = self.tail.load(Relaxed, guard); 161 // Advance the tail so that we don't retire a pointer to a reachable node. 162 if head == tail { 163 let _ = self 164 .tail 165 .compare_exchange(tail, next, Release, Relaxed, guard); 166 } 167 guard.defer_destroy(head); 168 Some(n.data.as_ptr().read()) 169 }) 170 .map_err(|_| ()) 171 }, 172 None | Some(_) => Ok(None), 173 } 174 } 175 176 /// Attempts to dequeue from the front. 177 /// 178 /// Returns `None` if the queue is observed to be empty. try_pop(&self, guard: &Guard) -> Option<T>179 pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> { 180 loop { 181 if let Ok(head) = self.pop_internal(guard) { 182 return head; 183 } 184 } 185 } 186 187 /// Attempts to dequeue from the front, if the item satisfies the given condition. 188 /// 189 /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given 190 /// condition. try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> where T: Sync, F: Fn(&T) -> bool,191 pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> 192 where 193 T: Sync, 194 F: Fn(&T) -> bool, 195 { 196 loop { 197 if let Ok(head) = self.pop_if_internal(&condition, guard) { 198 return head; 199 } 200 } 201 } 202 } 203 204 impl<T> Drop for Queue<T> { drop(&mut self)205 fn drop(&mut self) { 206 unsafe { 207 let guard = unprotected(); 208 209 while self.try_pop(guard).is_some() {} 210 211 // Destroy the remaining sentinel node. 212 let sentinel = self.head.load(Relaxed, guard); 213 drop(sentinel.into_owned()); 214 } 215 } 216 } 217 218 #[cfg(all(test, not(crossbeam_loom)))] 219 mod test { 220 use super::*; 221 use crate::pin; 222 use crossbeam_utils::thread; 223 224 struct Queue<T> { 225 queue: super::Queue<T>, 226 } 227 228 impl<T> Queue<T> { new() -> Queue<T>229 pub(crate) fn new() -> Queue<T> { 230 Queue { 231 queue: super::Queue::new(), 232 } 233 } 234 push(&self, t: T)235 pub(crate) fn push(&self, t: T) { 236 let guard = &pin(); 237 self.queue.push(t, guard); 238 } 239 is_empty(&self) -> bool240 pub(crate) fn is_empty(&self) -> bool { 241 let guard = &pin(); 242 let head = self.queue.head.load(Acquire, guard); 243 let h = unsafe { head.deref() }; 244 h.next.load(Acquire, guard).is_null() 245 } 246 try_pop(&self) -> Option<T>247 pub(crate) fn try_pop(&self) -> Option<T> { 248 let guard = &pin(); 249 self.queue.try_pop(guard) 250 } 251 pop(&self) -> T252 pub(crate) fn pop(&self) -> T { 253 loop { 254 match self.try_pop() { 255 None => continue, 256 Some(t) => return t, 257 } 258 } 259 } 260 } 261 262 const CONC_COUNT: i64 = 1000000; 263 264 #[test] push_try_pop_1()265 fn push_try_pop_1() { 266 let q: Queue<i64> = Queue::new(); 267 assert!(q.is_empty()); 268 q.push(37); 269 assert!(!q.is_empty()); 270 assert_eq!(q.try_pop(), Some(37)); 271 assert!(q.is_empty()); 272 } 273 274 #[test] push_try_pop_2()275 fn push_try_pop_2() { 276 let q: Queue<i64> = Queue::new(); 277 assert!(q.is_empty()); 278 q.push(37); 279 q.push(48); 280 assert_eq!(q.try_pop(), Some(37)); 281 assert!(!q.is_empty()); 282 assert_eq!(q.try_pop(), Some(48)); 283 assert!(q.is_empty()); 284 } 285 286 #[test] push_try_pop_many_seq()287 fn push_try_pop_many_seq() { 288 let q: Queue<i64> = Queue::new(); 289 assert!(q.is_empty()); 290 for i in 0..200 { 291 q.push(i) 292 } 293 assert!(!q.is_empty()); 294 for i in 0..200 { 295 assert_eq!(q.try_pop(), Some(i)); 296 } 297 assert!(q.is_empty()); 298 } 299 300 #[test] push_pop_1()301 fn push_pop_1() { 302 let q: Queue<i64> = Queue::new(); 303 assert!(q.is_empty()); 304 q.push(37); 305 assert!(!q.is_empty()); 306 assert_eq!(q.pop(), 37); 307 assert!(q.is_empty()); 308 } 309 310 #[test] push_pop_2()311 fn push_pop_2() { 312 let q: Queue<i64> = Queue::new(); 313 q.push(37); 314 q.push(48); 315 assert_eq!(q.pop(), 37); 316 assert_eq!(q.pop(), 48); 317 } 318 319 #[test] push_pop_many_seq()320 fn push_pop_many_seq() { 321 let q: Queue<i64> = Queue::new(); 322 assert!(q.is_empty()); 323 for i in 0..200 { 324 q.push(i) 325 } 326 assert!(!q.is_empty()); 327 for i in 0..200 { 328 assert_eq!(q.pop(), i); 329 } 330 assert!(q.is_empty()); 331 } 332 333 #[test] push_try_pop_many_spsc()334 fn push_try_pop_many_spsc() { 335 let q: Queue<i64> = Queue::new(); 336 assert!(q.is_empty()); 337 338 thread::scope(|scope| { 339 scope.spawn(|_| { 340 let mut next = 0; 341 342 while next < CONC_COUNT { 343 if let Some(elem) = q.try_pop() { 344 assert_eq!(elem, next); 345 next += 1; 346 } 347 } 348 }); 349 350 for i in 0..CONC_COUNT { 351 q.push(i) 352 } 353 }) 354 .unwrap(); 355 } 356 357 #[test] push_try_pop_many_spmc()358 fn push_try_pop_many_spmc() { 359 fn recv(_t: i32, q: &Queue<i64>) { 360 let mut cur = -1; 361 for _i in 0..CONC_COUNT { 362 if let Some(elem) = q.try_pop() { 363 assert!(elem > cur); 364 cur = elem; 365 366 if cur == CONC_COUNT - 1 { 367 break; 368 } 369 } 370 } 371 } 372 373 let q: Queue<i64> = Queue::new(); 374 assert!(q.is_empty()); 375 thread::scope(|scope| { 376 for i in 0..3 { 377 let q = &q; 378 scope.spawn(move |_| recv(i, q)); 379 } 380 381 scope.spawn(|_| { 382 for i in 0..CONC_COUNT { 383 q.push(i); 384 } 385 }); 386 }) 387 .unwrap(); 388 } 389 390 #[test] push_try_pop_many_mpmc()391 fn push_try_pop_many_mpmc() { 392 enum LR { 393 Left(i64), 394 Right(i64), 395 } 396 397 let q: Queue<LR> = Queue::new(); 398 assert!(q.is_empty()); 399 400 thread::scope(|scope| { 401 for _t in 0..2 { 402 scope.spawn(|_| { 403 for i in CONC_COUNT - 1..CONC_COUNT { 404 q.push(LR::Left(i)) 405 } 406 }); 407 scope.spawn(|_| { 408 for i in CONC_COUNT - 1..CONC_COUNT { 409 q.push(LR::Right(i)) 410 } 411 }); 412 scope.spawn(|_| { 413 let mut vl = vec![]; 414 let mut vr = vec![]; 415 for _i in 0..CONC_COUNT { 416 match q.try_pop() { 417 Some(LR::Left(x)) => vl.push(x), 418 Some(LR::Right(x)) => vr.push(x), 419 _ => {} 420 } 421 } 422 423 let mut vl2 = vl.clone(); 424 let mut vr2 = vr.clone(); 425 vl2.sort(); 426 vr2.sort(); 427 428 assert_eq!(vl, vl2); 429 assert_eq!(vr, vr2); 430 }); 431 } 432 }) 433 .unwrap(); 434 } 435 436 #[test] push_pop_many_spsc()437 fn push_pop_many_spsc() { 438 let q: Queue<i64> = Queue::new(); 439 440 thread::scope(|scope| { 441 scope.spawn(|_| { 442 let mut next = 0; 443 while next < CONC_COUNT { 444 assert_eq!(q.pop(), next); 445 next += 1; 446 } 447 }); 448 449 for i in 0..CONC_COUNT { 450 q.push(i) 451 } 452 }) 453 .unwrap(); 454 assert!(q.is_empty()); 455 } 456 457 #[test] is_empty_dont_pop()458 fn is_empty_dont_pop() { 459 let q: Queue<i64> = Queue::new(); 460 q.push(20); 461 q.push(20); 462 assert!(!q.is_empty()); 463 assert!(!q.is_empty()); 464 assert!(q.try_pop().is_some()); 465 } 466 } 467