1 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
2 //!
3 //! Source:
4 //!   - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
5 //!
6 //! Copyright & License:
7 //!   - Copyright (c) 2010-2011 Dmitry Vyukov
8 //!   - Simplified BSD License and Apache License, Version 2.0
9 //!   - http://www.1024cores.net/home/code-license
10 
11 use std::cell::UnsafeCell;
12 use std::fmt;
13 use std::marker::PhantomData;
14 use std::mem;
15 use std::ptr;
16 use std::sync::atomic::{self, AtomicUsize, Ordering};
17 
18 use crossbeam_utils::{Backoff, CachePadded};
19 
20 use err::{PopError, PushError};
21 
/// A slot in a queue.
///
/// Each slot pairs the storage for one element with a stamp. `push` and `pop` compare that stamp
/// against the queue's tail and head to decide whether the slot may be written or read.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this node will be next written to. If it equals the head
    /// plus one, this node will be next read from.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// `ArrayQueue::new` initializes only the stamp, so this cell holds a live `T` only between
    /// the `push` that writes it and the `pop` that reads it back out. It is an `UnsafeCell`
    /// because both operations access it through a shared reference to the slot.
    value: UnsafeCell<T>,
}
33 
/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
///
/// [`SegQueue`]: struct.SegQueue.html
///
/// # Examples
///
/// ```
/// use crossbeam_queue::{ArrayQueue, PushError};
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err(PushError('c')));
/// assert_eq!(q.pop(), Ok('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// Points to the allocation of `cap` slots made in `new`; the `Drop` impl releases it.
    buffer: *mut Slot<T>,

    /// The queue capacity.
    ///
    /// This is both the number of slots in `buffer` and the index at which head and tail wrap
    /// around to the next lap.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    ///
    /// `new` sets this to the smallest power of two greater than `cap`, so `one_lap - 1` is the
    /// bit mask that extracts the index part of a stamp and its complement extracts the lap part.
    one_lap: usize,

    /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}
84 
// `ArrayQueue<T>` contains a raw `*mut Slot<T>`, so the compiler does not derive `Send`/`Sync`
// for it automatically. `push` and `pop` take `&self` and move `T` values in and out by value,
// so sharing a queue between threads transfers elements across threads, which is why `T: Send`
// is required for both impls. None of the methods in this file hand out a `&T`, so `T: Sync` is
// not demanded.
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
87 
88 impl<T> ArrayQueue<T> {
89     /// Creates a new bounded queue with the given capacity.
90     ///
91     /// # Panics
92     ///
93     /// Panics if the capacity is zero.
94     ///
95     /// # Examples
96     ///
97     /// ```
98     /// use crossbeam_queue::ArrayQueue;
99     ///
100     /// let q = ArrayQueue::<i32>::new(100);
101     /// ```
new(cap: usize) -> ArrayQueue<T>102     pub fn new(cap: usize) -> ArrayQueue<T> {
103         assert!(cap > 0, "capacity must be non-zero");
104 
105         // Head is initialized to `{ lap: 0, index: 0 }`.
106         // Tail is initialized to `{ lap: 0, index: 0 }`.
107         let head = 0;
108         let tail = 0;
109 
110         // Allocate a buffer of `cap` slots.
111         let buffer = {
112             let mut v = Vec::<Slot<T>>::with_capacity(cap);
113             let ptr = v.as_mut_ptr();
114             mem::forget(v);
115             ptr
116         };
117 
118         // Initialize stamps in the slots.
119         for i in 0..cap {
120             unsafe {
121                 // Set the stamp to `{ lap: 0, index: i }`.
122                 let slot = buffer.add(i);
123                 ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
124             }
125         }
126 
127         // One lap is the smallest power of two greater than `cap`.
128         let one_lap = (cap + 1).next_power_of_two();
129 
130         ArrayQueue {
131             buffer,
132             cap,
133             one_lap,
134             head: CachePadded::new(AtomicUsize::new(head)),
135             tail: CachePadded::new(AtomicUsize::new(tail)),
136             _marker: PhantomData,
137         }
138     }
139 
    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// The call does not block on a full queue; it only retries (with backoff) while it races
    /// with other threads that are in the middle of a push or pop.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PushError};
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert_eq!(q.push(10), Ok(()));
    /// assert_eq!(q.push(20), Err(PushError(20)));
    /// ```
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Deconstruct the tail.
            // `one_lap` is a power of two, so `one_lap - 1` masks the index bits and its
            // complement masks the lap bits.
            let index = tail & (self.one_lap - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            // The index part of the tail starts at zero and is reset to zero instead of reaching
            // `cap` (see `new_tail` below), so `index` addresses one of the `cap` slots.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail.
                // Winning this exchange is what claims the slot for this thread.
                match self
                    .tail
                    .compare_exchange_weak(tail, new_tail, Ordering::SeqCst, Ordering::Relaxed)
                {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp.
                        // The stamp becomes `tail + 1`, which is the value `pop` looks for
                        // (`head + 1 == stamp`). The `Release` store pairs with the `Acquire`
                        // stamp load in `pop`, so the written value is visible to the reader.
                        unsafe { slot.value.get().write(value); }
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        // The exchange failed (the weak variant may also fail spuriously);
                        // `t` is the tail value that was actually observed, so retry with it.
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The stamp is the `tail + 1` written by the push from exactly one lap ago, so
                // the slot still holds that element and has not been released by a pop yet.
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the queue is full.
                    return Err(PushError(value));
                }

                // Otherwise this was not a stable "full" state (for example, a pop has already
                // advanced the head but not yet restamped the slot), so reload the tail and
                // try again.
                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }
214 
    /// Attempts to pop an element from the queue.
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// The call does not block on an empty queue; it only retries (with backoff) while it races
    /// with other threads that are in the middle of a push or pop.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(1);
    /// assert_eq!(q.push(10), Ok(()));
    ///
    /// assert_eq!(q.pop(), Ok(10));
    /// assert_eq!(q.pop(), Err(PopError));
    /// ```
    pub fn pop(&self) -> Result<T, PopError> {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            // `one_lap` is a power of two, so `one_lap - 1` masks the index bits and its
            // complement masks the lap bits.
            let index = head & (self.one_lap - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            // The index part of the head starts at zero and is reset to zero instead of reaching
            // `cap` (see `new` below), so `index` addresses one of the `cap` slots.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, we may attempt to pop.
            // That is the stamp `push` stores after writing a value at this position.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                // Winning this exchange is what claims the slot's value for this thread.
                match self
                    .head
                    .compare_exchange_weak(head, new, Ordering::SeqCst, Ordering::Relaxed)
                {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp.
                        // The new stamp, `head + one_lap`, is the tail value that will reach
                        // this slot on the next lap, so it marks the slot as writable again.
                        // The `Release` store pairs with the `Acquire` stamp load in `push`.
                        let msg = unsafe { slot.value.get().read() };
                        slot.stamp.store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Ok(msg);
                    }
                    Err(h) => {
                        // The exchange failed (the weak variant may also fail spuriously);
                        // `h` is the head value that was actually observed, so retry with it.
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                // The slot still carries its "writable" stamp for this lap, so nothing has been
                // stored at this position yet.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if tail == head {
                    return Err(PopError);
                }

                // Otherwise this was not a stable "empty" state (for example, a push has already
                // advanced the tail but not yet stamped the slot), so reload the head and
                // try again.
                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }
289 
    /// Returns the capacity of the queue.
    ///
    /// This is the value that was passed to `new`; it does not change afterwards in any of the
    /// methods shown here.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    ///
    /// assert_eq!(q.capacity(), 100);
    /// ```
    pub fn capacity(&self) -> usize {
        self.cap
    }
304 
305     /// Returns `true` if the queue is empty.
306     ///
307     /// # Examples
308     ///
309     /// ```
310     /// use crossbeam_queue::{ArrayQueue, PopError};
311     ///
312     /// let q = ArrayQueue::new(100);
313     ///
314     /// assert!(q.is_empty());
315     /// q.push(1).unwrap();
316     /// assert!(!q.is_empty());
317     /// ```
is_empty(&self) -> bool318     pub fn is_empty(&self) -> bool {
319         let head = self.head.load(Ordering::SeqCst);
320         let tail = self.tail.load(Ordering::SeqCst);
321 
322         // Is the tail lagging one lap behind head?
323         // Is the tail equal to the head?
324         //
325         // Note: If the head changes just before we load the tail, that means there was a moment
326         // when the channel was not empty, so it is safe to just return `false`.
327         tail == head
328     }
329 
330     /// Returns `true` if the queue is full.
331     ///
332     /// # Examples
333     ///
334     /// ```
335     /// use crossbeam_queue::{ArrayQueue, PopError};
336     ///
337     /// let q = ArrayQueue::new(1);
338     ///
339     /// assert!(!q.is_full());
340     /// q.push(1).unwrap();
341     /// assert!(q.is_full());
342     /// ```
is_full(&self) -> bool343     pub fn is_full(&self) -> bool {
344         let tail = self.tail.load(Ordering::SeqCst);
345         let head = self.head.load(Ordering::SeqCst);
346 
347         // Is the head lagging one lap behind tail?
348         //
349         // Note: If the tail changes just before we load the head, that means there was a moment
350         // when the queue was not full, so it is safe to just return `false`.
351         head.wrapping_add(self.one_lap) == tail
352     }
353 
354     /// Returns the number of elements in the queue.
355     ///
356     /// # Examples
357     ///
358     /// ```
359     /// use crossbeam_queue::{ArrayQueue, PopError};
360     ///
361     /// let q = ArrayQueue::new(100);
362     /// assert_eq!(q.len(), 0);
363     ///
364     /// q.push(10).unwrap();
365     /// assert_eq!(q.len(), 1);
366     ///
367     /// q.push(20).unwrap();
368     /// assert_eq!(q.len(), 2);
369     /// ```
len(&self) -> usize370     pub fn len(&self) -> usize {
371         loop {
372             // Load the tail, then load the head.
373             let tail = self.tail.load(Ordering::SeqCst);
374             let head = self.head.load(Ordering::SeqCst);
375 
376             // If the tail didn't change, we've got consistent values to work with.
377             if self.tail.load(Ordering::SeqCst) == tail {
378                 let hix = head & (self.one_lap - 1);
379                 let tix = tail & (self.one_lap - 1);
380 
381                 return if hix < tix {
382                     tix - hix
383                 } else if hix > tix {
384                     self.cap - hix + tix
385                 } else if tail == head {
386                     0
387                 } else {
388                     self.cap
389                 };
390             }
391         }
392     }
393 }
394 
395 impl<T> Drop for ArrayQueue<T> {
drop(&mut self)396     fn drop(&mut self) {
397         // Get the index of the head.
398         let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);
399 
400         // Loop over all slots that hold a message and drop them.
401         for i in 0..self.len() {
402             // Compute the index of the next slot holding a message.
403             let index = if hix + i < self.cap {
404                 hix + i
405             } else {
406                 hix + i - self.cap
407             };
408 
409             unsafe {
410                 self.buffer.add(index).drop_in_place();
411             }
412         }
413 
414         // Finally, deallocate the buffer, but don't run any destructors.
415         unsafe {
416             Vec::from_raw_parts(self.buffer, 0, self.cap);
417         }
418     }
419 }
420 
impl<T> fmt::Debug for ArrayQueue<T> {
    /// Formats the queue as the fixed placeholder `ArrayQueue { .. }`.
    ///
    /// The stored elements are not inspected, which is why no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // `pad` emits the string while honoring the formatter's width/fill/alignment flags.
        f.pad("ArrayQueue { .. }")
    }
}
426