1 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. 2 //! 3 //! Source: 4 //! - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue 5 //! 6 //! Copyright & License: 7 //! - Copyright (c) 2010-2011 Dmitry Vyukov 8 //! - Simplified BSD License and Apache License, Version 2.0 9 //! - http://www.1024cores.net/home/code-license 10 11 use alloc::vec::Vec; 12 use core::cell::UnsafeCell; 13 use core::fmt; 14 use core::marker::PhantomData; 15 use core::mem; 16 use core::ptr; 17 use core::sync::atomic::{self, AtomicUsize, Ordering}; 18 19 use crossbeam_utils::{Backoff, CachePadded}; 20 21 use err::{PopError, PushError}; 22 23 /// A slot in a queue. 24 struct Slot<T> { 25 /// The current stamp. 26 /// 27 /// If the stamp equals the tail, this node will be next written to. If it equals head + 1, 28 /// this node will be next read from. 29 stamp: AtomicUsize, 30 31 /// The value in this slot. 32 value: UnsafeCell<T>, 33 } 34 35 /// A bounded multi-producer multi-consumer queue. 36 /// 37 /// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed 38 /// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an 39 /// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit 40 /// faster than [`SegQueue`]. 41 /// 42 /// [`SegQueue`]: struct.SegQueue.html 43 /// 44 /// # Examples 45 /// 46 /// ``` 47 /// use crossbeam_queue::{ArrayQueue, PushError}; 48 /// 49 /// let q = ArrayQueue::new(2); 50 /// 51 /// assert_eq!(q.push('a'), Ok(())); 52 /// assert_eq!(q.push('b'), Ok(())); 53 /// assert_eq!(q.push('c'), Err(PushError('c'))); 54 /// assert_eq!(q.pop(), Ok('a')); 55 /// ``` 56 pub struct ArrayQueue<T> { 57 /// The head of the queue. 58 /// 59 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a 60 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. 61 /// 62 /// Elements are popped from the head of the queue. 63 head: CachePadded<AtomicUsize>, 64 65 /// The tail of the queue. 66 /// 67 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a 68 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. 69 /// 70 /// Elements are pushed into the tail of the queue. 71 tail: CachePadded<AtomicUsize>, 72 73 /// The buffer holding slots. 74 buffer: *mut Slot<T>, 75 76 /// The queue capacity. 77 cap: usize, 78 79 /// A stamp with the value of `{ lap: 1, index: 0 }`. 80 one_lap: usize, 81 82 /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`. 83 _marker: PhantomData<T>, 84 } 85 86 unsafe impl<T: Send> Sync for ArrayQueue<T> {} 87 unsafe impl<T: Send> Send for ArrayQueue<T> {} 88 89 impl<T> ArrayQueue<T> { 90 /// Creates a new bounded queue with the given capacity. 91 /// 92 /// # Panics 93 /// 94 /// Panics if the capacity is zero. 95 /// 96 /// # Examples 97 /// 98 /// ``` 99 /// use crossbeam_queue::ArrayQueue; 100 /// 101 /// let q = ArrayQueue::<i32>::new(100); 102 /// ``` new(cap: usize) -> ArrayQueue<T>103 pub fn new(cap: usize) -> ArrayQueue<T> { 104 assert!(cap > 0, "capacity must be non-zero"); 105 106 // Head is initialized to `{ lap: 0, index: 0 }`. 107 // Tail is initialized to `{ lap: 0, index: 0 }`. 108 let head = 0; 109 let tail = 0; 110 111 // Allocate a buffer of `cap` slots. 112 let buffer = { 113 let mut v = Vec::<Slot<T>>::with_capacity(cap); 114 let ptr = v.as_mut_ptr(); 115 mem::forget(v); 116 ptr 117 }; 118 119 // Initialize stamps in the slots. 120 for i in 0..cap { 121 unsafe { 122 // Set the stamp to `{ lap: 0, index: i }`. 123 let slot = buffer.add(i); 124 ptr::write(&mut (*slot).stamp, AtomicUsize::new(i)); 125 } 126 } 127 128 // One lap is the smallest power of two greater than `cap`. 129 let one_lap = (cap + 1).next_power_of_two(); 130 131 ArrayQueue { 132 buffer, 133 cap, 134 one_lap, 135 head: CachePadded::new(AtomicUsize::new(head)), 136 tail: CachePadded::new(AtomicUsize::new(tail)), 137 _marker: PhantomData, 138 } 139 } 140 141 /// Attempts to push an element into the queue. 142 /// 143 /// If the queue is full, the element is returned back as an error. 144 /// 145 /// # Examples 146 /// 147 /// ``` 148 /// use crossbeam_queue::{ArrayQueue, PushError}; 149 /// 150 /// let q = ArrayQueue::new(1); 151 /// 152 /// assert_eq!(q.push(10), Ok(())); 153 /// assert_eq!(q.push(20), Err(PushError(20))); 154 /// ``` push(&self, value: T) -> Result<(), PushError<T>>155 pub fn push(&self, value: T) -> Result<(), PushError<T>> { 156 let backoff = Backoff::new(); 157 let mut tail = self.tail.load(Ordering::Relaxed); 158 159 loop { 160 // Deconstruct the tail. 161 let index = tail & (self.one_lap - 1); 162 let lap = tail & !(self.one_lap - 1); 163 164 // Inspect the corresponding slot. 165 let slot = unsafe { &*self.buffer.add(index) }; 166 let stamp = slot.stamp.load(Ordering::Acquire); 167 168 // If the tail and the stamp match, we may attempt to push. 169 if tail == stamp { 170 let new_tail = if index + 1 < self.cap { 171 // Same lap, incremented index. 172 // Set to `{ lap: lap, index: index + 1 }`. 173 tail + 1 174 } else { 175 // One lap forward, index wraps around to zero. 176 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 177 lap.wrapping_add(self.one_lap) 178 }; 179 180 // Try moving the tail. 181 match self.tail.compare_exchange_weak( 182 tail, 183 new_tail, 184 Ordering::SeqCst, 185 Ordering::Relaxed, 186 ) { 187 Ok(_) => { 188 // Write the value into the slot and update the stamp. 189 unsafe { 190 slot.value.get().write(value); 191 } 192 slot.stamp.store(tail + 1, Ordering::Release); 193 return Ok(()); 194 } 195 Err(t) => { 196 tail = t; 197 backoff.spin(); 198 } 199 } 200 } else if stamp.wrapping_add(self.one_lap) == tail + 1 { 201 atomic::fence(Ordering::SeqCst); 202 let head = self.head.load(Ordering::Relaxed); 203 204 // If the head lags one lap behind the tail as well... 205 if head.wrapping_add(self.one_lap) == tail { 206 // ...then the queue is full. 207 return Err(PushError(value)); 208 } 209 210 backoff.spin(); 211 tail = self.tail.load(Ordering::Relaxed); 212 } else { 213 // Snooze because we need to wait for the stamp to get updated. 214 backoff.snooze(); 215 tail = self.tail.load(Ordering::Relaxed); 216 } 217 } 218 } 219 220 /// Attempts to pop an element from the queue. 221 /// 222 /// If the queue is empty, an error is returned. 223 /// 224 /// # Examples 225 /// 226 /// ``` 227 /// use crossbeam_queue::{ArrayQueue, PopError}; 228 /// 229 /// let q = ArrayQueue::new(1); 230 /// assert_eq!(q.push(10), Ok(())); 231 /// 232 /// assert_eq!(q.pop(), Ok(10)); 233 /// assert_eq!(q.pop(), Err(PopError)); 234 /// ``` pop(&self) -> Result<T, PopError>235 pub fn pop(&self) -> Result<T, PopError> { 236 let backoff = Backoff::new(); 237 let mut head = self.head.load(Ordering::Relaxed); 238 239 loop { 240 // Deconstruct the head. 241 let index = head & (self.one_lap - 1); 242 let lap = head & !(self.one_lap - 1); 243 244 // Inspect the corresponding slot. 245 let slot = unsafe { &*self.buffer.add(index) }; 246 let stamp = slot.stamp.load(Ordering::Acquire); 247 248 // If the the stamp is ahead of the head by 1, we may attempt to pop. 249 if head + 1 == stamp { 250 let new = if index + 1 < self.cap { 251 // Same lap, incremented index. 252 // Set to `{ lap: lap, index: index + 1 }`. 253 head + 1 254 } else { 255 // One lap forward, index wraps around to zero. 256 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 257 lap.wrapping_add(self.one_lap) 258 }; 259 260 // Try moving the head. 261 match self.head.compare_exchange_weak( 262 head, 263 new, 264 Ordering::SeqCst, 265 Ordering::Relaxed, 266 ) { 267 Ok(_) => { 268 // Read the value from the slot and update the stamp. 269 let msg = unsafe { slot.value.get().read() }; 270 slot.stamp 271 .store(head.wrapping_add(self.one_lap), Ordering::Release); 272 return Ok(msg); 273 } 274 Err(h) => { 275 head = h; 276 backoff.spin(); 277 } 278 } 279 } else if stamp == head { 280 atomic::fence(Ordering::SeqCst); 281 let tail = self.tail.load(Ordering::Relaxed); 282 283 // If the tail equals the head, that means the channel is empty. 284 if tail == head { 285 return Err(PopError); 286 } 287 288 backoff.spin(); 289 head = self.head.load(Ordering::Relaxed); 290 } else { 291 // Snooze because we need to wait for the stamp to get updated. 292 backoff.snooze(); 293 head = self.head.load(Ordering::Relaxed); 294 } 295 } 296 } 297 298 /// Returns the capacity of the queue. 299 /// 300 /// # Examples 301 /// 302 /// ``` 303 /// use crossbeam_queue::{ArrayQueue, PopError}; 304 /// 305 /// let q = ArrayQueue::<i32>::new(100); 306 /// 307 /// assert_eq!(q.capacity(), 100); 308 /// ``` capacity(&self) -> usize309 pub fn capacity(&self) -> usize { 310 self.cap 311 } 312 313 /// Returns `true` if the queue is empty. 314 /// 315 /// # Examples 316 /// 317 /// ``` 318 /// use crossbeam_queue::{ArrayQueue, PopError}; 319 /// 320 /// let q = ArrayQueue::new(100); 321 /// 322 /// assert!(q.is_empty()); 323 /// q.push(1).unwrap(); 324 /// assert!(!q.is_empty()); 325 /// ``` is_empty(&self) -> bool326 pub fn is_empty(&self) -> bool { 327 let head = self.head.load(Ordering::SeqCst); 328 let tail = self.tail.load(Ordering::SeqCst); 329 330 // Is the tail lagging one lap behind head? 331 // Is the tail equal to the head? 332 // 333 // Note: If the head changes just before we load the tail, that means there was a moment 334 // when the channel was not empty, so it is safe to just return `false`. 335 tail == head 336 } 337 338 /// Returns `true` if the queue is full. 339 /// 340 /// # Examples 341 /// 342 /// ``` 343 /// use crossbeam_queue::{ArrayQueue, PopError}; 344 /// 345 /// let q = ArrayQueue::new(1); 346 /// 347 /// assert!(!q.is_full()); 348 /// q.push(1).unwrap(); 349 /// assert!(q.is_full()); 350 /// ``` is_full(&self) -> bool351 pub fn is_full(&self) -> bool { 352 let tail = self.tail.load(Ordering::SeqCst); 353 let head = self.head.load(Ordering::SeqCst); 354 355 // Is the head lagging one lap behind tail? 356 // 357 // Note: If the tail changes just before we load the head, that means there was a moment 358 // when the queue was not full, so it is safe to just return `false`. 359 head.wrapping_add(self.one_lap) == tail 360 } 361 362 /// Returns the number of elements in the queue. 363 /// 364 /// # Examples 365 /// 366 /// ``` 367 /// use crossbeam_queue::{ArrayQueue, PopError}; 368 /// 369 /// let q = ArrayQueue::new(100); 370 /// assert_eq!(q.len(), 0); 371 /// 372 /// q.push(10).unwrap(); 373 /// assert_eq!(q.len(), 1); 374 /// 375 /// q.push(20).unwrap(); 376 /// assert_eq!(q.len(), 2); 377 /// ``` len(&self) -> usize378 pub fn len(&self) -> usize { 379 loop { 380 // Load the tail, then load the head. 381 let tail = self.tail.load(Ordering::SeqCst); 382 let head = self.head.load(Ordering::SeqCst); 383 384 // If the tail didn't change, we've got consistent values to work with. 385 if self.tail.load(Ordering::SeqCst) == tail { 386 let hix = head & (self.one_lap - 1); 387 let tix = tail & (self.one_lap - 1); 388 389 return if hix < tix { 390 tix - hix 391 } else if hix > tix { 392 self.cap - hix + tix 393 } else if tail == head { 394 0 395 } else { 396 self.cap 397 }; 398 } 399 } 400 } 401 } 402 403 impl<T> Drop for ArrayQueue<T> { drop(&mut self)404 fn drop(&mut self) { 405 // Get the index of the head. 406 let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); 407 408 // Loop over all slots that hold a message and drop them. 409 for i in 0..self.len() { 410 // Compute the index of the next slot holding a message. 411 let index = if hix + i < self.cap { 412 hix + i 413 } else { 414 hix + i - self.cap 415 }; 416 417 unsafe { 418 self.buffer.add(index).drop_in_place(); 419 } 420 } 421 422 // Finally, deallocate the buffer, but don't run any destructors. 423 unsafe { 424 Vec::from_raw_parts(self.buffer, 0, self.cap); 425 } 426 } 427 } 428 429 impl<T> fmt::Debug for ArrayQueue<T> { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result430 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 431 f.pad("ArrayQueue { .. }") 432 } 433 } 434