1 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. 2 //! 3 //! Source: 4 //! - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue 5 //! 6 //! Copyright & License: 7 //! - Copyright (c) 2010-2011 Dmitry Vyukov 8 //! - Simplified BSD License and Apache License, Version 2.0 9 //! - http://www.1024cores.net/home/code-license 10 11 use alloc::vec::Vec; 12 use core::cell::UnsafeCell; 13 use core::fmt; 14 use core::marker::PhantomData; 15 use core::mem; 16 use core::ptr; 17 use core::sync::atomic::{self, AtomicUsize, Ordering}; 18 19 use crossbeam_utils::{Backoff, CachePadded}; 20 21 use maybe_uninit::MaybeUninit; 22 23 use err::{PopError, PushError}; 24 25 /// A slot in a queue. 26 struct Slot<T> { 27 /// The current stamp. 28 /// 29 /// If the stamp equals the tail, this node will be next written to. If it equals head + 1, 30 /// this node will be next read from. 31 stamp: AtomicUsize, 32 33 /// The value in this slot. 34 value: UnsafeCell<MaybeUninit<T>>, 35 } 36 37 /// A bounded multi-producer multi-consumer queue. 38 /// 39 /// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed 40 /// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an 41 /// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit 42 /// faster than [`SegQueue`]. 43 /// 44 /// [`SegQueue`]: struct.SegQueue.html 45 /// 46 /// # Examples 47 /// 48 /// ``` 49 /// use crossbeam_queue::{ArrayQueue, PushError}; 50 /// 51 /// let q = ArrayQueue::new(2); 52 /// 53 /// assert_eq!(q.push('a'), Ok(())); 54 /// assert_eq!(q.push('b'), Ok(())); 55 /// assert_eq!(q.push('c'), Err(PushError('c'))); 56 /// assert_eq!(q.pop(), Ok('a')); 57 /// ``` 58 pub struct ArrayQueue<T> { 59 /// The head of the queue. 60 /// 61 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a 62 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. 63 /// 64 /// Elements are popped from the head of the queue. 65 head: CachePadded<AtomicUsize>, 66 67 /// The tail of the queue. 68 /// 69 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a 70 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. 71 /// 72 /// Elements are pushed into the tail of the queue. 73 tail: CachePadded<AtomicUsize>, 74 75 /// The buffer holding slots. 76 buffer: *mut Slot<T>, 77 78 /// The queue capacity. 79 cap: usize, 80 81 /// A stamp with the value of `{ lap: 1, index: 0 }`. 82 one_lap: usize, 83 84 /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`. 85 _marker: PhantomData<T>, 86 } 87 88 unsafe impl<T: Send> Sync for ArrayQueue<T> {} 89 unsafe impl<T: Send> Send for ArrayQueue<T> {} 90 91 impl<T> ArrayQueue<T> { 92 /// Creates a new bounded queue with the given capacity. 93 /// 94 /// # Panics 95 /// 96 /// Panics if the capacity is zero. 97 /// 98 /// # Examples 99 /// 100 /// ``` 101 /// use crossbeam_queue::ArrayQueue; 102 /// 103 /// let q = ArrayQueue::<i32>::new(100); 104 /// ``` new(cap: usize) -> ArrayQueue<T>105 pub fn new(cap: usize) -> ArrayQueue<T> { 106 assert!(cap > 0, "capacity must be non-zero"); 107 108 // Head is initialized to `{ lap: 0, index: 0 }`. 109 // Tail is initialized to `{ lap: 0, index: 0 }`. 110 let head = 0; 111 let tail = 0; 112 113 // Allocate a buffer of `cap` slots. 114 let buffer = { 115 let mut v = Vec::<Slot<T>>::with_capacity(cap); 116 let ptr = v.as_mut_ptr(); 117 mem::forget(v); 118 ptr 119 }; 120 121 // Initialize stamps in the slots. 122 for i in 0..cap { 123 unsafe { 124 // Set the stamp to `{ lap: 0, index: i }`. 125 let slot = buffer.add(i); 126 ptr::write(&mut (*slot).stamp, AtomicUsize::new(i)); 127 } 128 } 129 130 // One lap is the smallest power of two greater than `cap`. 131 let one_lap = (cap + 1).next_power_of_two(); 132 133 ArrayQueue { 134 buffer, 135 cap, 136 one_lap, 137 head: CachePadded::new(AtomicUsize::new(head)), 138 tail: CachePadded::new(AtomicUsize::new(tail)), 139 _marker: PhantomData, 140 } 141 } 142 143 /// Attempts to push an element into the queue. 144 /// 145 /// If the queue is full, the element is returned back as an error. 146 /// 147 /// # Examples 148 /// 149 /// ``` 150 /// use crossbeam_queue::{ArrayQueue, PushError}; 151 /// 152 /// let q = ArrayQueue::new(1); 153 /// 154 /// assert_eq!(q.push(10), Ok(())); 155 /// assert_eq!(q.push(20), Err(PushError(20))); 156 /// ``` push(&self, value: T) -> Result<(), PushError<T>>157 pub fn push(&self, value: T) -> Result<(), PushError<T>> { 158 let backoff = Backoff::new(); 159 let mut tail = self.tail.load(Ordering::Relaxed); 160 161 loop { 162 // Deconstruct the tail. 163 let index = tail & (self.one_lap - 1); 164 let lap = tail & !(self.one_lap - 1); 165 166 // Inspect the corresponding slot. 167 let slot = unsafe { &*self.buffer.add(index) }; 168 let stamp = slot.stamp.load(Ordering::Acquire); 169 170 // If the tail and the stamp match, we may attempt to push. 171 if tail == stamp { 172 let new_tail = if index + 1 < self.cap { 173 // Same lap, incremented index. 174 // Set to `{ lap: lap, index: index + 1 }`. 175 tail + 1 176 } else { 177 // One lap forward, index wraps around to zero. 178 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 179 lap.wrapping_add(self.one_lap) 180 }; 181 182 // Try moving the tail. 183 match self.tail.compare_exchange_weak( 184 tail, 185 new_tail, 186 Ordering::SeqCst, 187 Ordering::Relaxed, 188 ) { 189 Ok(_) => { 190 // Write the value into the slot and update the stamp. 191 unsafe { 192 slot.value.get().write(MaybeUninit::new(value)); 193 } 194 slot.stamp.store(tail + 1, Ordering::Release); 195 return Ok(()); 196 } 197 Err(t) => { 198 tail = t; 199 backoff.spin(); 200 } 201 } 202 } else if stamp.wrapping_add(self.one_lap) == tail + 1 { 203 atomic::fence(Ordering::SeqCst); 204 let head = self.head.load(Ordering::Relaxed); 205 206 // If the head lags one lap behind the tail as well... 207 if head.wrapping_add(self.one_lap) == tail { 208 // ...then the queue is full. 209 return Err(PushError(value)); 210 } 211 212 backoff.spin(); 213 tail = self.tail.load(Ordering::Relaxed); 214 } else { 215 // Snooze because we need to wait for the stamp to get updated. 216 backoff.snooze(); 217 tail = self.tail.load(Ordering::Relaxed); 218 } 219 } 220 } 221 222 /// Attempts to pop an element from the queue. 223 /// 224 /// If the queue is empty, an error is returned. 225 /// 226 /// # Examples 227 /// 228 /// ``` 229 /// use crossbeam_queue::{ArrayQueue, PopError}; 230 /// 231 /// let q = ArrayQueue::new(1); 232 /// assert_eq!(q.push(10), Ok(())); 233 /// 234 /// assert_eq!(q.pop(), Ok(10)); 235 /// assert_eq!(q.pop(), Err(PopError)); 236 /// ``` pop(&self) -> Result<T, PopError>237 pub fn pop(&self) -> Result<T, PopError> { 238 let backoff = Backoff::new(); 239 let mut head = self.head.load(Ordering::Relaxed); 240 241 loop { 242 // Deconstruct the head. 243 let index = head & (self.one_lap - 1); 244 let lap = head & !(self.one_lap - 1); 245 246 // Inspect the corresponding slot. 247 let slot = unsafe { &*self.buffer.add(index) }; 248 let stamp = slot.stamp.load(Ordering::Acquire); 249 250 // If the the stamp is ahead of the head by 1, we may attempt to pop. 251 if head + 1 == stamp { 252 let new = if index + 1 < self.cap { 253 // Same lap, incremented index. 254 // Set to `{ lap: lap, index: index + 1 }`. 255 head + 1 256 } else { 257 // One lap forward, index wraps around to zero. 258 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 259 lap.wrapping_add(self.one_lap) 260 }; 261 262 // Try moving the head. 263 match self.head.compare_exchange_weak( 264 head, 265 new, 266 Ordering::SeqCst, 267 Ordering::Relaxed, 268 ) { 269 Ok(_) => { 270 // Read the value from the slot and update the stamp. 271 let msg = unsafe { slot.value.get().read().assume_init() }; 272 slot.stamp 273 .store(head.wrapping_add(self.one_lap), Ordering::Release); 274 return Ok(msg); 275 } 276 Err(h) => { 277 head = h; 278 backoff.spin(); 279 } 280 } 281 } else if stamp == head { 282 atomic::fence(Ordering::SeqCst); 283 let tail = self.tail.load(Ordering::Relaxed); 284 285 // If the tail equals the head, that means the channel is empty. 286 if tail == head { 287 return Err(PopError); 288 } 289 290 backoff.spin(); 291 head = self.head.load(Ordering::Relaxed); 292 } else { 293 // Snooze because we need to wait for the stamp to get updated. 294 backoff.snooze(); 295 head = self.head.load(Ordering::Relaxed); 296 } 297 } 298 } 299 300 /// Returns the capacity of the queue. 301 /// 302 /// # Examples 303 /// 304 /// ``` 305 /// use crossbeam_queue::{ArrayQueue, PopError}; 306 /// 307 /// let q = ArrayQueue::<i32>::new(100); 308 /// 309 /// assert_eq!(q.capacity(), 100); 310 /// ``` capacity(&self) -> usize311 pub fn capacity(&self) -> usize { 312 self.cap 313 } 314 315 /// Returns `true` if the queue is empty. 316 /// 317 /// # Examples 318 /// 319 /// ``` 320 /// use crossbeam_queue::{ArrayQueue, PopError}; 321 /// 322 /// let q = ArrayQueue::new(100); 323 /// 324 /// assert!(q.is_empty()); 325 /// q.push(1).unwrap(); 326 /// assert!(!q.is_empty()); 327 /// ``` is_empty(&self) -> bool328 pub fn is_empty(&self) -> bool { 329 let head = self.head.load(Ordering::SeqCst); 330 let tail = self.tail.load(Ordering::SeqCst); 331 332 // Is the tail lagging one lap behind head? 333 // Is the tail equal to the head? 334 // 335 // Note: If the head changes just before we load the tail, that means there was a moment 336 // when the channel was not empty, so it is safe to just return `false`. 337 tail == head 338 } 339 340 /// Returns `true` if the queue is full. 341 /// 342 /// # Examples 343 /// 344 /// ``` 345 /// use crossbeam_queue::{ArrayQueue, PopError}; 346 /// 347 /// let q = ArrayQueue::new(1); 348 /// 349 /// assert!(!q.is_full()); 350 /// q.push(1).unwrap(); 351 /// assert!(q.is_full()); 352 /// ``` is_full(&self) -> bool353 pub fn is_full(&self) -> bool { 354 let tail = self.tail.load(Ordering::SeqCst); 355 let head = self.head.load(Ordering::SeqCst); 356 357 // Is the head lagging one lap behind tail? 358 // 359 // Note: If the tail changes just before we load the head, that means there was a moment 360 // when the queue was not full, so it is safe to just return `false`. 361 head.wrapping_add(self.one_lap) == tail 362 } 363 364 /// Returns the number of elements in the queue. 365 /// 366 /// # Examples 367 /// 368 /// ``` 369 /// use crossbeam_queue::{ArrayQueue, PopError}; 370 /// 371 /// let q = ArrayQueue::new(100); 372 /// assert_eq!(q.len(), 0); 373 /// 374 /// q.push(10).unwrap(); 375 /// assert_eq!(q.len(), 1); 376 /// 377 /// q.push(20).unwrap(); 378 /// assert_eq!(q.len(), 2); 379 /// ``` len(&self) -> usize380 pub fn len(&self) -> usize { 381 loop { 382 // Load the tail, then load the head. 383 let tail = self.tail.load(Ordering::SeqCst); 384 let head = self.head.load(Ordering::SeqCst); 385 386 // If the tail didn't change, we've got consistent values to work with. 387 if self.tail.load(Ordering::SeqCst) == tail { 388 let hix = head & (self.one_lap - 1); 389 let tix = tail & (self.one_lap - 1); 390 391 return if hix < tix { 392 tix - hix 393 } else if hix > tix { 394 self.cap - hix + tix 395 } else if tail == head { 396 0 397 } else { 398 self.cap 399 }; 400 } 401 } 402 } 403 } 404 405 impl<T> Drop for ArrayQueue<T> { drop(&mut self)406 fn drop(&mut self) { 407 // Get the index of the head. 408 let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); 409 410 // Loop over all slots that hold a message and drop them. 411 for i in 0..self.len() { 412 // Compute the index of the next slot holding a message. 413 let index = if hix + i < self.cap { 414 hix + i 415 } else { 416 hix + i - self.cap 417 }; 418 419 unsafe { 420 let p = { 421 let slot = &mut *self.buffer.add(index); 422 let value = &mut *slot.value.get(); 423 value.as_mut_ptr() 424 }; 425 p.drop_in_place(); 426 } 427 } 428 429 // Finally, deallocate the buffer, but don't run any destructors. 430 unsafe { 431 Vec::from_raw_parts(self.buffer, 0, self.cap); 432 } 433 } 434 } 435 436 impl<T> fmt::Debug for ArrayQueue<T> { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result437 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 438 f.pad("ArrayQueue { .. }") 439 } 440 } 441