1 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
2 //!
3 //! Source:
4 //! - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
5 //!
6 //! Copyright & License:
7 //! - Copyright (c) 2010-2011 Dmitry Vyukov
8 //! - Simplified BSD License and Apache License, Version 2.0
9 //! - http://www.1024cores.net/home/code-license
11 use alloc::vec::Vec;
12 use core::cell::UnsafeCell;
13 use core::fmt;
14 use core::marker::PhantomData;
15 use core::mem;
16 use core::ptr;
17 use core::sync::atomic::{self, AtomicUsize, Ordering};
18
use crossbeam_utils::{Backoff, CachePadded};
20
21 use err::{PopError, PushError};
22
/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this node will be next written to. If it equals head + 1,
    /// this node will be next read from.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// Wrapped in `UnsafeCell` because it is written and read through a shared reference; the
    /// stamp protocol above grants one thread exclusive access during each write and each read.
    /// The value is uninitialized until a push stores into it.
    value: UnsafeCell<T>,
}
34
/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
///
/// [`SegQueue`]: struct.SegQueue.html
///
/// # Examples
///
/// ```
/// use crossbeam_queue::{ArrayQueue, PushError};
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err(PushError('c')));
/// assert_eq!(q.pop(), Ok('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// Kept as a raw pointer rather than a `Vec` because slot values are only initialized when
    /// pushed; allocation and deallocation are managed manually in `new` and `Drop`.
    buffer: *mut Slot<T>,

    /// The queue capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    ///
    /// Always a power of two, so `one_lap - 1` masks out the index bits of any stamp.
    one_lap: usize,

    /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}
85
// SAFETY: The queue may be shared and used from multiple threads because every access to a
// slot's value is serialized by the atomic stamp protocol (see `push`/`pop`), so only `T: Send`
// is required for both `Send` and `Sync`.
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
88
impl<T> ArrayQueue<T> {
    /// Creates a new bounded queue with the given capacity.
    ///
    /// # Panics
    ///
    /// Panics if the capacity is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    /// ```
    pub fn new(cap: usize) -> ArrayQueue<T> {
        assert!(cap > 0, "capacity must be non-zero");

        // Head is initialized to `{ lap: 0, index: 0 }`.
        // Tail is initialized to `{ lap: 0, index: 0 }`.
        let head = 0;
        let tail = 0;

        // Allocate a buffer of `cap` slots. Only the stamps are initialized below; the `value`
        // field of each slot stays uninitialized until something is pushed into it.
        let buffer = {
            let mut v = Vec::<Slot<T>>::with_capacity(cap);
            let ptr = v.as_mut_ptr();
            mem::forget(v);
            ptr
        };

        // Initialize stamps in the slots.
        for i in 0..cap {
            unsafe {
                // SAFETY: `i < cap`, so `buffer.add(i)` stays within the allocation made above,
                // and each stamp is written exactly once before the queue is published.
                // Set the stamp to `{ lap: 0, index: i }`.
                let slot = buffer.add(i);
                ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
            }
        }

        // One lap is the smallest power of two greater than `cap`. Keeping it a power of two
        // lets the index part of a stamp be extracted with the mask `one_lap - 1`.
        let one_lap = (cap + 1).next_power_of_two();

        ArrayQueue {
            buffer,
            cap,
            one_lap,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            _marker: PhantomData,
        }
    }

    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PushError};
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert_eq!(q.push(10), Ok(()));
    /// assert_eq!(q.push(20), Err(PushError(20)));
    /// ```
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Deconstruct the tail into its index and lap parts.
            let index = tail & (self.one_lap - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            // SAFETY: the index bits of a stamp are always smaller than `cap` (see how stamps
            // are advanced below), so the pointer stays within the buffer.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail. The weak CAS may fail spuriously, but that only costs
                // another iteration of the loop.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp.
                        // SAFETY: winning the CAS above grants this thread exclusive access to
                        // the slot until the new stamp is published below.
                        unsafe {
                            slot.value.get().write(value);
                        }
                        // Setting the stamp to `tail + 1` marks the slot as readable.
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The slot still holds a value from one lap back, so the queue might be full.
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the queue is full.
                    return Err(PushError(value));
                }

                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Attempts to pop an element from the queue.
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(1);
    /// assert_eq!(q.push(10), Ok(()));
    ///
    /// assert_eq!(q.pop(), Ok(10));
    /// assert_eq!(q.pop(), Err(PopError));
    /// ```
    pub fn pop(&self) -> Result<T, PopError> {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head into its index and lap parts.
            let index = head & (self.one_lap - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            // SAFETY: the index bits of a stamp are always smaller than `cap`, so the pointer
            // stays within the buffer.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head. The weak CAS may fail spuriously, but that only costs
                // another iteration of the loop.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp.
                        // SAFETY: winning the CAS above grants this thread exclusive access to
                        // the slot, and the stamp check guarantees it holds an initialized value.
                        let msg = unsafe { slot.value.get().read() };
                        // Setting the stamp one lap ahead marks the slot as writable again.
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Ok(msg);
                    }
                    Err(h) => {
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                // The slot has not been written on this lap yet, so the queue might be empty.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if tail == head {
                    return Err(PopError);
                }

                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Returns the capacity of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    ///
    /// assert_eq!(q.capacity(), 100);
    /// ```
    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(100);
    ///
    /// assert!(q.is_empty());
    /// q.push(1).unwrap();
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head?
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the queue was not empty, so it is safe to just return `false`.
        tail == head
    }

    /// Returns `true` if the queue is full.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert!(!q.is_full());
    /// q.push(1).unwrap();
    /// assert!(q.is_full());
    /// ```
    pub fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the queue was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(100);
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10).unwrap();
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20).unwrap();
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we've got consistent values to work with; otherwise
            // retry rather than compute a length from a torn head/tail pair.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.one_lap - 1);
                let tix = tail & (self.one_lap - 1);

                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    // The occupied region wraps around the end of the buffer.
                    self.cap - hix + tix
                } else if tail == head {
                    // Equal indices and equal laps: the queue is empty.
                    0
                } else {
                    // Equal indices but different laps: the queue is full.
                    self.cap
                };
            }
        }
    }
}
402
impl<T> Drop for ArrayQueue<T> {
    fn drop(&mut self) {
        // `&mut self` guarantees no concurrent access, so relaxed loads are sufficient here.
        // Get the index of the head.
        let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);

        // Loop over all slots that hold a message and drop them.
        for i in 0..self.len() {
            // Compute the index of the next slot holding a message, wrapping around the end of
            // the buffer.
            let index = if hix + i < self.cap {
                hix + i
            } else {
                hix + i - self.cap
            };

            unsafe {
                // SAFETY: the `len()` slots starting at the head index are exactly the ones
                // holding initialized values; dropping the whole `Slot<T>` drops the contained
                // `T` (the `AtomicUsize` stamp has no destructor).
                self.buffer.add(index).drop_in_place();
            }
        }

        // Finally, deallocate the buffer, but don't run any destructors.
        // SAFETY: the pointer and capacity came from a `Vec` created in `new`; passing length 0
        // ensures no element destructors run again, and dropping the reconstructed `Vec` frees
        // the allocation.
        unsafe {
            Vec::from_raw_parts(self.buffer, 0, self.cap);
        }
    }
}
428
impl<T> fmt::Debug for ArrayQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The contents are deliberately not printed: traversing the buffer would race with
        // concurrent producers and consumers, so only an opaque placeholder is emitted.
        // `pad` (rather than `write_str`) honors any width/fill flags in the formatter.
        f.pad("ArrayQueue { .. }")
    }
}
434