//! Michael-Scott lock-free queue.
//!
//! Usable with any number of producers and consumers.
//!
//! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
//! Algorithms. PODC 1996. http://dl.acm.org/citation.cfm?id=248106
//!
//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
//! Practical Lock-Free Queue Algorithm. https://doi.org/10.1007/978-3-540-30232-2_7

use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};

use crossbeam_utils::CachePadded;

use maybe_uninit::MaybeUninit;

use {unprotected, Atomic, Guard, Owned, Shared};

// The representation here is a singly-linked list, with a sentinel node at the front. In general
// the `tail` pointer may lag behind the actual tail: `push` first links a new node into the old
// tail's `next` pointer and only then tries to swing `tail` forward, so other threads may observe
// (and "help" fix up) a stale `tail`.
#[derive(Debug)]
pub struct Queue<T> {
    // `head` and `tail` are cache-padded to avoid false sharing between the
    // producer-side (`tail`) and consumer-side (`head`) hot words.
    head: CachePadded<Atomic<Node<T>>>,
    tail: CachePadded<Atomic<Node<T>>>,
}

struct Node<T> {
    /// The slot in which a value of type `T` can be stored.
    ///
    /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
    /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
    /// Other nodes start their life with a push operation and contain a value until it gets popped
    /// out. After that such empty nodes get added to the collector for destruction.
    data: MaybeUninit<T>,

    /// Link to the next node; null at the tail of the queue.
    next: Atomic<Node<T>>,
}

// Any particular `T` should never be accessed concurrently, so no need for `Sync`.
unsafe impl<T: Send> Sync for Queue<T> {}
unsafe impl<T: Send> Send for Queue<T> {}

impl<T> Queue<T> {
    /// Create a new, empty queue.
    ///
    /// An empty queue consists of `head` and `tail` both pointing at a single sentinel node
    /// whose `data` slot is never initialized.
    pub fn new() -> Queue<T> {
        let q = Queue {
            head: CachePadded::new(Atomic::null()),
            tail: CachePadded::new(Atomic::null()),
        };
        let sentinel = Owned::new(Node {
            data: MaybeUninit::uninit(),
            next: Atomic::null(),
        });
        // SAFETY: the queue is not yet shared with any other thread, so using an
        // `unprotected()` guard (no pinning) here cannot race with concurrent access
        // or with deferred destruction.
        unsafe {
            let guard = &unprotected();
            let sentinel = sentinel.into_shared(guard);
            // Relaxed is sufficient: publication of the whole `Queue` to other threads
            // must happen through some external synchronization anyway.
            q.head.store(sentinel, Relaxed);
            q.tail.store(sentinel, Relaxed);
            q
        }
    }

    /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
    /// success. The queue's `tail` pointer may be updated.
    #[inline(always)]
    fn push_internal(&self, onto: Shared<Node<T>>, new: Shared<Node<T>>, guard: &Guard) -> bool {
        // is `onto` the actual tail?
        // SAFETY: `onto` was loaded from `self.tail` while pinned by `guard` (or under an
        // unprotected guard with exclusive access), so the node is not yet destroyed, and
        // `tail` is never null after construction.
        let o = unsafe { onto.deref() };
        let next = o.next.load(Acquire, guard);
        if unsafe { next.as_ref().is_some() } {
            // if not, try to "help" by moving the tail pointer forward
            let _ = self.tail.compare_and_set(onto, next, Release, guard);
            false
        } else {
            // looks like the actual tail; attempt to link in `n`
            let result = o
                .next
                .compare_and_set(Shared::null(), new, Release, guard)
                .is_ok();
            if result {
                // try to move the tail pointer forward
                // (failure is fine: some other thread already helped advance it)
                let _ = self.tail.compare_and_set(onto, new, Release, guard);
            }
            result
        }
    }

    /// Adds `t` to the back of the queue.
    ///
    /// Loops until the new node has been successfully linked after the current tail;
    /// each failed attempt either helps advance a lagging `tail` or retries after a
    /// lost CAS race.
    pub fn push(&self, t: T, guard: &Guard) {
        let new = Owned::new(Node {
            data: MaybeUninit::new(t),
            next: Atomic::null(),
        });
        let new = Owned::into_shared(new, guard);

        loop {
            // We push onto the tail, so we'll start optimistically by looking there first.
            let tail = self.tail.load(Acquire, guard);

            // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
            if self.push_internal(tail, new, guard) {
                break;
            }
        }
    }

    /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
    #[inline(always)]
    fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
        let head = self.head.load(Acquire, guard);
        // SAFETY: `head` always points at the (live) sentinel node; it is never null and,
        // while pinned, never destroyed out from under us.
        let h = unsafe { head.deref() };
        let next = h.next.load(Acquire, guard);
        match unsafe { next.as_ref() } {
            Some(n) => unsafe {
                // Winning this CAS makes `next` the new sentinel and gives us exclusive
                // ownership of the old sentinel `head` and of `n`'s data slot.
                self.head
                    .compare_and_set(head, next, Release, guard)
                    .map(|_| {
                        let tail = self.tail.load(Relaxed, guard);
                        // Advance the tail so that we don't retire a pointer to a reachable node.
                        if head == tail {
                            let _ = self.tail.compare_and_set(tail, next, Release, guard);
                        }
                        guard.defer_destroy(head);
                        // TODO: Replace with MaybeUninit::read when api is stable
                        // SAFETY: `n` was pushed as a data node, so its slot is initialized,
                        // and the CAS win above guarantees no other thread reads it.
                        Some(n.data.as_ptr().read())
                    })
                    .map_err(|_| ())
            },
            None => Ok(None),
        }
    }

    /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
    /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
    #[inline(always)]
    fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
    where
        T: Sync,
        F: Fn(&T) -> bool,
    {
        let head = self.head.load(Acquire, guard);
        // SAFETY: same as in `pop_internal` — the sentinel is live and non-null while pinned.
        let h = unsafe { head.deref() };
        let next = h.next.load(Acquire, guard);
        match unsafe { next.as_ref() } {
            // NOTE: `condition` peeks at the data by shared reference before we own it,
            // which is why this method requires `T: Sync`.
            Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
                self.head
                    .compare_and_set(head, next, Release, guard)
                    .map(|_| {
                        let tail = self.tail.load(Relaxed, guard);
                        // Advance the tail so that we don't retire a pointer to a reachable node.
                        if head == tail {
                            let _ = self.tail.compare_and_set(tail, next, Release, guard);
                        }
                        guard.defer_destroy(head);
                        // SAFETY: CAS win gives exclusive ownership of `n`'s initialized slot.
                        Some(n.data.as_ptr().read())
                    })
                    .map_err(|_| ())
            },
            None | Some(_) => Ok(None),
        }
    }

    /// Attempts to dequeue from the front.
    ///
    /// Returns `None` if the queue is observed to be empty.
    pub fn try_pop(&self, guard: &Guard) -> Option<T> {
        loop {
            // Retry only on `Err(())`, i.e. when we lost a pop race to another consumer.
            if let Ok(head) = self.pop_internal(guard) {
                return head;
            }
        }
    }

    /// Attempts to dequeue from the front, if the item satisfies the given condition.
    ///
    /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
    /// condition.
    pub fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
    where
        T: Sync,
        F: Fn(&T) -> bool,
    {
        loop {
            if let Ok(head) = self.pop_if_internal(&condition, guard) {
                return head;
            }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // SAFETY: `&mut self` guarantees exclusive access, so an `unprotected()` guard is
        // sound: no other thread can be pinned or touching these nodes concurrently.
        unsafe {
            let guard = &unprotected();

            // Drain all remaining data nodes, dropping their payloads.
            while let Some(_) = self.try_pop(guard) {}

            // Destroy the remaining sentinel node.
            let sentinel = self.head.load(Relaxed, guard);
            drop(sentinel.into_owned());
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crossbeam_utils::thread;
    use pin;

    // Test wrapper that pins a guard internally so tests don't have to thread one through.
    struct Queue<T> {
        queue: super::Queue<T>,
    }

    impl<T> Queue<T> {
        pub fn new() -> Queue<T> {
            Queue {
                queue: super::Queue::new(),
            }
        }

        pub fn push(&self, t: T) {
            let guard = &pin();
            self.queue.push(t, guard);
        }

        pub fn is_empty(&self) -> bool {
            let guard = &pin();
            let head = self.queue.head.load(Acquire, guard);
            let h = unsafe { head.deref() };
            h.next.load(Acquire, guard).is_null()
        }

        pub fn try_pop(&self) -> Option<T> {
            let guard = &pin();
            self.queue.try_pop(guard)
        }

        // Busy-waits until an element becomes available.
        pub fn pop(&self) -> T {
            loop {
                match self.try_pop() {
                    None => continue,
                    Some(t) => return t,
                }
            }
        }
    }

    const CONC_COUNT: i64 = 1000000;

    #[test]
    fn push_try_pop_1() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        q.push(37);
        assert!(!q.is_empty());
        assert_eq!(q.try_pop(), Some(37));
        assert!(q.is_empty());
    }

    #[test]
    fn push_try_pop_2() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        q.push(37);
        q.push(48);
        assert_eq!(q.try_pop(), Some(37));
        assert!(!q.is_empty());
        assert_eq!(q.try_pop(), Some(48));
        assert!(q.is_empty());
    }

    #[test]
    fn push_try_pop_many_seq() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        for i in 0..200 {
            q.push(i)
        }
        assert!(!q.is_empty());
        for i in 0..200 {
            assert_eq!(q.try_pop(), Some(i));
        }
        assert!(q.is_empty());
    }

    #[test]
    fn push_pop_1() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        q.push(37);
        assert!(!q.is_empty());
        assert_eq!(q.pop(), 37);
        assert!(q.is_empty());
    }

    #[test]
    fn push_pop_2() {
        let q: Queue<i64> = Queue::new();
        q.push(37);
        q.push(48);
        assert_eq!(q.pop(), 37);
        assert_eq!(q.pop(), 48);
    }

    #[test]
    fn push_pop_many_seq() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        for i in 0..200 {
            q.push(i)
        }
        assert!(!q.is_empty());
        for i in 0..200 {
            assert_eq!(q.pop(), i);
        }
        assert!(q.is_empty());
    }

    #[test]
    fn push_try_pop_many_spsc() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());

        thread::scope(|scope| {
            scope.spawn(|_| {
                let mut next = 0;

                while next < CONC_COUNT {
                    if let Some(elem) = q.try_pop() {
                        // Single consumer: elements must arrive in push order.
                        assert_eq!(elem, next);
                        next += 1;
                    }
                }
            });

            for i in 0..CONC_COUNT {
                q.push(i)
            }
        })
        .unwrap();
    }

    #[test]
    fn push_try_pop_many_spmc() {
        fn recv(_t: i32, q: &Queue<i64>) {
            let mut cur = -1;
            for _i in 0..CONC_COUNT {
                if let Some(elem) = q.try_pop() {
                    // Each consumer must see strictly increasing values.
                    assert!(elem > cur);
                    cur = elem;

                    if cur == CONC_COUNT - 1 {
                        break;
                    }
                }
            }
        }

        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        thread::scope(|scope| {
            for i in 0..3 {
                let q = &q;
                scope.spawn(move |_| recv(i, q));
            }

            scope.spawn(|_| {
                for i in 0..CONC_COUNT {
                    q.push(i);
                }
            });
        })
        .unwrap();
    }

    #[test]
    fn push_try_pop_many_mpmc() {
        enum LR {
            Left(i64),
            Right(i64),
        }

        let q: Queue<LR> = Queue::new();
        assert!(q.is_empty());

        thread::scope(|scope| {
            for _t in 0..2 {
                scope.spawn(|_| {
                    for i in CONC_COUNT - 1..CONC_COUNT {
                        q.push(LR::Left(i))
                    }
                });
                scope.spawn(|_| {
                    for i in CONC_COUNT - 1..CONC_COUNT {
                        q.push(LR::Right(i))
                    }
                });
                scope.spawn(|_| {
                    let mut vl = vec![];
                    let mut vr = vec![];
                    for _i in 0..CONC_COUNT {
                        match q.try_pop() {
                            Some(LR::Left(x)) => vl.push(x),
                            Some(LR::Right(x)) => vr.push(x),
                            _ => {}
                        }
                    }

                    // Per-producer FIFO: each stream must come out already sorted.
                    let mut vl2 = vl.clone();
                    let mut vr2 = vr.clone();
                    vl2.sort();
                    vr2.sort();

                    assert_eq!(vl, vl2);
                    assert_eq!(vr, vr2);
                });
            }
        })
        .unwrap();
    }

    #[test]
    fn push_pop_many_spsc() {
        let q: Queue<i64> = Queue::new();

        thread::scope(|scope| {
            scope.spawn(|_| {
                let mut next = 0;
                while next < CONC_COUNT {
                    assert_eq!(q.pop(), next);
                    next += 1;
                }
            });

            for i in 0..CONC_COUNT {
                q.push(i)
            }
        })
        .unwrap();
        assert!(q.is_empty());
    }

    #[test]
    fn is_empty_dont_pop() {
        let q: Queue<i64> = Queue::new();
        q.push(20);
        q.push(20);
        assert!(!q.is_empty());
        assert!(!q.is_empty());
        assert!(q.try_pop().is_some());
    }
}