//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//!   - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
//!
//! Copyright & License:
//!   - Copyright (c) 2010-2011 Dmitry Vyukov
//!   - Simplified BSD License and Apache License, Version 2.0
//!   - http://www.1024cores.net/home/code-license
10 
11 use alloc::vec::Vec;
12 use core::cell::UnsafeCell;
13 use core::fmt;
14 use core::marker::PhantomData;
15 use core::mem;
16 use core::ptr;
17 use core::sync::atomic::{self, AtomicUsize, Ordering};
18 
19 use crossbeam_utils::{Backoff, CachePadded};
20 
21 use maybe_uninit::MaybeUninit;
22 
23 use err::{PopError, PushError};
24 
/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this node will be next written to. If it equals head + 1,
    /// this node will be next read from.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// Holds a valid `T` only while the stamp marks the slot as occupied; at all other
    /// times the contents are uninitialized and must not be read.
    value: UnsafeCell<MaybeUninit<T>>,
}
36 
/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
///
/// [`SegQueue`]: struct.SegQueue.html
///
/// # Examples
///
/// ```
/// use crossbeam_queue::{ArrayQueue, PushError};
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err(PushError('c')));
/// assert_eq!(q.pop(), Ok('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// Allocated in `new` (via a forgotten `Vec`) and deallocated in `Drop` with
    /// `Vec::from_raw_parts`.
    buffer: *mut Slot<T>,

    /// The queue capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    ///
    /// This is a power of two, so `stamp & (one_lap - 1)` extracts the index and
    /// `stamp & !(one_lap - 1)` extracts the lap.
    one_lap: usize,

    /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}
87 
// SAFETY: all access to the shared buffer is coordinated through the atomic `head`, `tail`,
// and per-slot `stamp` values, so the queue may be shared between (and sent across) threads
// as long as the element type itself is `Send`.
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
90 
91 impl<T> ArrayQueue<T> {
92     /// Creates a new bounded queue with the given capacity.
93     ///
94     /// # Panics
95     ///
96     /// Panics if the capacity is zero.
97     ///
98     /// # Examples
99     ///
100     /// ```
101     /// use crossbeam_queue::ArrayQueue;
102     ///
103     /// let q = ArrayQueue::<i32>::new(100);
104     /// ```
new(cap: usize) -> ArrayQueue<T>105     pub fn new(cap: usize) -> ArrayQueue<T> {
106         assert!(cap > 0, "capacity must be non-zero");
107 
108         // Head is initialized to `{ lap: 0, index: 0 }`.
109         // Tail is initialized to `{ lap: 0, index: 0 }`.
110         let head = 0;
111         let tail = 0;
112 
113         // Allocate a buffer of `cap` slots.
114         let buffer = {
115             let mut v = Vec::<Slot<T>>::with_capacity(cap);
116             let ptr = v.as_mut_ptr();
117             mem::forget(v);
118             ptr
119         };
120 
121         // Initialize stamps in the slots.
122         for i in 0..cap {
123             unsafe {
124                 // Set the stamp to `{ lap: 0, index: i }`.
125                 let slot = buffer.add(i);
126                 ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
127             }
128         }
129 
130         // One lap is the smallest power of two greater than `cap`.
131         let one_lap = (cap + 1).next_power_of_two();
132 
133         ArrayQueue {
134             buffer,
135             cap,
136             one_lap,
137             head: CachePadded::new(AtomicUsize::new(head)),
138             tail: CachePadded::new(AtomicUsize::new(tail)),
139             _marker: PhantomData,
140         }
141     }
142 
    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PushError};
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert_eq!(q.push(10), Ok(()));
    /// assert_eq!(q.push(20), Err(PushError(20)));
    /// ```
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Deconstruct the tail into its index and lap. `one_lap` is a power of two, so
            // the low bits hold the index and the high bits hold the lap.
            let index = tail & (self.one_lap - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, the slot is vacant and we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail. A weak CAS may fail spuriously, which is fine since
                // we retry in the loop anyway.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // We now own the slot: write the value, then publish it by bumping
                        // the stamp. The `Release` store pairs with the `Acquire` load of
                        // the stamp in `pop`, making the written value visible.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        // Another thread moved the tail; retry with the fresh value.
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The slot still holds a value from the previous lap, so the queue might be
                // full. The fence orders the stamp load above before the head load below.
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the queue is full.
                    return Err(PushError(value));
                }

                // A pop is in progress; give it a moment and reload the tail.
                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }
221 
    /// Attempts to pop an element from the queue.
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(1);
    /// assert_eq!(q.push(10), Ok(()));
    ///
    /// assert_eq!(q.pop(), Ok(10));
    /// assert_eq!(q.pop(), Err(PopError));
    /// ```
    pub fn pop(&self) -> Result<T, PopError> {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head into its index and lap. `one_lap` is a power of two, so
            // the low bits hold the index and the high bits hold the lap.
            let index = head & (self.one_lap - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, the slot holds a value and we may
            // attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head. A weak CAS may fail spuriously, which is fine since
                // we retry in the loop anyway.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // We now own the slot: move the value out, then mark the slot vacant
                        // for the next lap by advancing its stamp. The `Release` store pairs
                        // with the `Acquire` load of the stamp in `push`.
                        let msg = unsafe { slot.value.get().read().assume_init() };
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Ok(msg);
                    }
                    Err(h) => {
                        // Another thread moved the head; retry with the fresh value.
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                // The slot is vacant, so the queue might be empty. The fence orders the
                // stamp load above before the tail load below.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if tail == head {
                    return Err(PopError);
                }

                // A push is in progress; give it a moment and reload the head.
                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }
299 
300     /// Returns the capacity of the queue.
301     ///
302     /// # Examples
303     ///
304     /// ```
305     /// use crossbeam_queue::{ArrayQueue, PopError};
306     ///
307     /// let q = ArrayQueue::<i32>::new(100);
308     ///
309     /// assert_eq!(q.capacity(), 100);
310     /// ```
capacity(&self) -> usize311     pub fn capacity(&self) -> usize {
312         self.cap
313     }
314 
315     /// Returns `true` if the queue is empty.
316     ///
317     /// # Examples
318     ///
319     /// ```
320     /// use crossbeam_queue::{ArrayQueue, PopError};
321     ///
322     /// let q = ArrayQueue::new(100);
323     ///
324     /// assert!(q.is_empty());
325     /// q.push(1).unwrap();
326     /// assert!(!q.is_empty());
327     /// ```
is_empty(&self) -> bool328     pub fn is_empty(&self) -> bool {
329         let head = self.head.load(Ordering::SeqCst);
330         let tail = self.tail.load(Ordering::SeqCst);
331 
332         // Is the tail lagging one lap behind head?
333         // Is the tail equal to the head?
334         //
335         // Note: If the head changes just before we load the tail, that means there was a moment
336         // when the channel was not empty, so it is safe to just return `false`.
337         tail == head
338     }
339 
340     /// Returns `true` if the queue is full.
341     ///
342     /// # Examples
343     ///
344     /// ```
345     /// use crossbeam_queue::{ArrayQueue, PopError};
346     ///
347     /// let q = ArrayQueue::new(1);
348     ///
349     /// assert!(!q.is_full());
350     /// q.push(1).unwrap();
351     /// assert!(q.is_full());
352     /// ```
is_full(&self) -> bool353     pub fn is_full(&self) -> bool {
354         let tail = self.tail.load(Ordering::SeqCst);
355         let head = self.head.load(Ordering::SeqCst);
356 
357         // Is the head lagging one lap behind tail?
358         //
359         // Note: If the tail changes just before we load the head, that means there was a moment
360         // when the queue was not full, so it is safe to just return `false`.
361         head.wrapping_add(self.one_lap) == tail
362     }
363 
364     /// Returns the number of elements in the queue.
365     ///
366     /// # Examples
367     ///
368     /// ```
369     /// use crossbeam_queue::{ArrayQueue, PopError};
370     ///
371     /// let q = ArrayQueue::new(100);
372     /// assert_eq!(q.len(), 0);
373     ///
374     /// q.push(10).unwrap();
375     /// assert_eq!(q.len(), 1);
376     ///
377     /// q.push(20).unwrap();
378     /// assert_eq!(q.len(), 2);
379     /// ```
len(&self) -> usize380     pub fn len(&self) -> usize {
381         loop {
382             // Load the tail, then load the head.
383             let tail = self.tail.load(Ordering::SeqCst);
384             let head = self.head.load(Ordering::SeqCst);
385 
386             // If the tail didn't change, we've got consistent values to work with.
387             if self.tail.load(Ordering::SeqCst) == tail {
388                 let hix = head & (self.one_lap - 1);
389                 let tix = tail & (self.one_lap - 1);
390 
391                 return if hix < tix {
392                     tix - hix
393                 } else if hix > tix {
394                     self.cap - hix + tix
395                 } else if tail == head {
396                     0
397                 } else {
398                     self.cap
399                 };
400             }
401         }
402     }
403 }
404 
impl<T> Drop for ArrayQueue<T> {
    /// Drops all remaining elements and frees the slot buffer.
    fn drop(&mut self) {
        // Get the index of the head. `&mut self` guarantees exclusive access, so relaxed
        // loads are sufficient here.
        let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);

        // Loop over all slots that hold a message and drop them.
        for i in 0..self.len() {
            // Compute the index of the next slot holding a message, wrapping around the
            // end of the buffer.
            let index = if hix + i < self.cap {
                hix + i
            } else {
                hix + i - self.cap
            };

            unsafe {
                // SAFETY: `len()` counts exactly the occupied slots starting at `hix`, so
                // this slot's `value` holds an initialized `T`.
                let p = {
                    let slot = &mut *self.buffer.add(index);
                    let value = &mut *slot.value.get();
                    value.as_mut_ptr()
                };
                p.drop_in_place();
            }
        }

        // Finally, deallocate the buffer, but don't run any destructors.
        unsafe {
            // SAFETY: `buffer` came from a `Vec::<Slot<T>>::with_capacity(self.cap)` in
            // `new`; reconstructing it with length 0 frees the allocation without
            // dropping any (already dropped above) elements.
            Vec::from_raw_parts(self.buffer, 0, self.cap);
        }
    }
}
435 
impl<T> fmt::Debug for ArrayQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Deliberately opaque: the elements are not shown, so no `T: Debug` bound is
        // needed and no slot contents are read while other threads may be mutating them.
        f.pad("ArrayQueue { .. }")
    }
}
441