1 //! A concurrent multi-producer multi-consumer queue.
2 //!
3 //! There are two kinds of queues:
4 //!
5 //! 1. [Bounded] queue with limited capacity.
6 //! 2. [Unbounded] queue with unlimited capacity.
7 //!
8 //! Queues also have the capability to get [closed] at any point. When closed, no more items can be
9 //! pushed into the queue, although the remaining items can still be popped.
10 //!
11 //! These features make it easy to build channels similar to [`std::sync::mpsc`] on top of this
12 //! crate.
13 //!
14 //! # Examples
15 //!
16 //! ```
17 //! use concurrent_queue::ConcurrentQueue;
18 //!
19 //! let q = ConcurrentQueue::unbounded();
20 //! q.push(1).unwrap();
21 //! q.push(2).unwrap();
22 //!
23 //! assert_eq!(q.pop(), Ok(1));
24 //! assert_eq!(q.pop(), Ok(2));
25 //! ```
26 //!
27 //! [Bounded]: `ConcurrentQueue::bounded()`
28 //! [Unbounded]: `ConcurrentQueue::unbounded()`
29 //! [closed]: `ConcurrentQueue::close()`
30 
31 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
32 
33 use std::error;
34 use std::fmt;
35 use std::panic::{RefUnwindSafe, UnwindSafe};
36 use std::sync::atomic::{self, AtomicUsize, Ordering};
37 
38 use crate::bounded::Bounded;
39 use crate::single::Single;
40 use crate::unbounded::Unbounded;
41 
42 mod bounded;
43 mod single;
44 mod unbounded;
45 
46 /// A concurrent queue.
47 ///
48 /// # Examples
49 ///
50 /// ```
51 /// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
52 ///
53 /// let q = ConcurrentQueue::bounded(2);
54 ///
55 /// assert_eq!(q.push('a'), Ok(()));
56 /// assert_eq!(q.push('b'), Ok(()));
57 /// assert_eq!(q.push('c'), Err(PushError::Full('c')));
58 ///
59 /// assert_eq!(q.pop(), Ok('a'));
60 /// assert_eq!(q.pop(), Ok('b'));
61 /// assert_eq!(q.pop(), Err(PopError::Empty));
62 /// ```
63 pub struct ConcurrentQueue<T>(Inner<T>);
64 
65 unsafe impl<T: Send> Send for ConcurrentQueue<T> {}
66 unsafe impl<T: Send> Sync for ConcurrentQueue<T> {}
67 
68 impl<T> UnwindSafe for ConcurrentQueue<T> {}
69 impl<T> RefUnwindSafe for ConcurrentQueue<T> {}
70 
71 enum Inner<T> {
72     Single(Single<T>),
73     Bounded(Box<Bounded<T>>),
74     Unbounded(Box<Unbounded<T>>),
75 }
76 
77 impl<T> ConcurrentQueue<T> {
78     /// Creates a new bounded queue.
79     ///
80     /// The queue allocates enough space for `cap` items.
81     ///
82     /// # Panics
83     ///
84     /// If the capacity is zero, this constructor will panic.
85     ///
86     /// # Examples
87     ///
88     /// ```
89     /// use concurrent_queue::ConcurrentQueue;
90     ///
91     /// let q = ConcurrentQueue::<i32>::bounded(100);
92     /// ```
bounded(cap: usize) -> ConcurrentQueue<T>93     pub fn bounded(cap: usize) -> ConcurrentQueue<T> {
94         if cap == 1 {
95             ConcurrentQueue(Inner::Single(Single::new()))
96         } else {
97             ConcurrentQueue(Inner::Bounded(Box::new(Bounded::new(cap))))
98         }
99     }
100 
101     /// Creates a new unbounded queue.
102     ///
103     /// # Examples
104     ///
105     /// ```
106     /// use concurrent_queue::ConcurrentQueue;
107     ///
108     /// let q = ConcurrentQueue::<i32>::unbounded();
109     /// ```
unbounded() -> ConcurrentQueue<T>110     pub fn unbounded() -> ConcurrentQueue<T> {
111         ConcurrentQueue(Inner::Unbounded(Box::new(Unbounded::new())))
112     }
113 
114     /// Attempts to push an item into the queue.
115     ///
116     /// If the queue is full or closed, the item is returned back as an error.
117     ///
118     /// # Examples
119     ///
120     /// ```
121     /// use concurrent_queue::{ConcurrentQueue, PushError};
122     ///
123     /// let q = ConcurrentQueue::bounded(1);
124     ///
125     /// // Push succeeds because there is space in the queue.
126     /// assert_eq!(q.push(10), Ok(()));
127     ///
128     /// // Push errors because the queue is now full.
129     /// assert_eq!(q.push(20), Err(PushError::Full(20)));
130     ///
131     /// // Close the queue, which will prevent further pushes.
132     /// q.close();
133     ///
134     /// // Pushing now errors indicating the queue is closed.
135     /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
136     ///
137     /// // Pop the single item in the queue.
138     /// assert_eq!(q.pop(), Ok(10));
139     ///
140     /// // Even though there is space, no more items can be pushed.
141     /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
142     /// ```
push(&self, value: T) -> Result<(), PushError<T>>143     pub fn push(&self, value: T) -> Result<(), PushError<T>> {
144         match &self.0 {
145             Inner::Single(q) => q.push(value),
146             Inner::Bounded(q) => q.push(value),
147             Inner::Unbounded(q) => q.push(value),
148         }
149     }
150 
151     /// Attempts to pop an item from the queue.
152     ///
153     /// If the queue is empty, an error is returned.
154     ///
155     /// # Examples
156     ///
157     /// ```
158     /// use concurrent_queue::{ConcurrentQueue, PopError};
159     ///
160     /// let q = ConcurrentQueue::bounded(1);
161     ///
162     /// // Pop errors when the queue is empty.
163     /// assert_eq!(q.pop(), Err(PopError::Empty));
164     ///
165     /// // Push one item and close the queue.
166     /// assert_eq!(q.push(10), Ok(()));
167     /// q.close();
168     ///
169     /// // Remaining items can be popped.
170     /// assert_eq!(q.pop(), Ok(10));
171     ///
172     /// // Again, pop errors when the queue is empty,
173     /// // but now also indicates that the queue is closed.
174     /// assert_eq!(q.pop(), Err(PopError::Closed));
175     /// ```
pop(&self) -> Result<T, PopError>176     pub fn pop(&self) -> Result<T, PopError> {
177         match &self.0 {
178             Inner::Single(q) => q.pop(),
179             Inner::Bounded(q) => q.pop(),
180             Inner::Unbounded(q) => q.pop(),
181         }
182     }
183 
184     /// Returns `true` if the queue is empty.
185     ///
186     /// # Examples
187     ///
188     /// ```
189     /// use concurrent_queue::ConcurrentQueue;
190     ///
191     /// let q = ConcurrentQueue::<i32>::unbounded();
192     ///
193     /// assert!(q.is_empty());
194     /// q.push(1).unwrap();
195     /// assert!(!q.is_empty());
196     /// ```
is_empty(&self) -> bool197     pub fn is_empty(&self) -> bool {
198         match &self.0 {
199             Inner::Single(q) => q.is_empty(),
200             Inner::Bounded(q) => q.is_empty(),
201             Inner::Unbounded(q) => q.is_empty(),
202         }
203     }
204 
205     /// Returns `true` if the queue is full.
206     ///
207     /// An unbounded queue is never full.
208     ///
209     /// # Examples
210     ///
211     /// ```
212     /// use concurrent_queue::ConcurrentQueue;
213     ///
214     /// let q = ConcurrentQueue::bounded(1);
215     ///
216     /// assert!(!q.is_full());
217     /// q.push(1).unwrap();
218     /// assert!(q.is_full());
219     /// ```
is_full(&self) -> bool220     pub fn is_full(&self) -> bool {
221         match &self.0 {
222             Inner::Single(q) => q.is_full(),
223             Inner::Bounded(q) => q.is_full(),
224             Inner::Unbounded(q) => q.is_full(),
225         }
226     }
227 
228     /// Returns the number of items in the queue.
229     ///
230     /// # Examples
231     ///
232     /// ```
233     /// use concurrent_queue::ConcurrentQueue;
234     ///
235     /// let q = ConcurrentQueue::unbounded();
236     /// assert_eq!(q.len(), 0);
237     ///
238     /// assert_eq!(q.push(10), Ok(()));
239     /// assert_eq!(q.len(), 1);
240     ///
241     /// assert_eq!(q.push(20), Ok(()));
242     /// assert_eq!(q.len(), 2);
243     /// ```
len(&self) -> usize244     pub fn len(&self) -> usize {
245         match &self.0 {
246             Inner::Single(q) => q.len(),
247             Inner::Bounded(q) => q.len(),
248             Inner::Unbounded(q) => q.len(),
249         }
250     }
251 
252     /// Returns the capacity of the queue.
253     ///
254     /// Unbounded queues have infinite capacity, represented as [`None`].
255     ///
256     /// # Examples
257     ///
258     /// ```
259     /// use concurrent_queue::ConcurrentQueue;
260     ///
261     /// let q = ConcurrentQueue::<i32>::bounded(7);
262     /// assert_eq!(q.capacity(), Some(7));
263     ///
264     /// let q = ConcurrentQueue::<i32>::unbounded();
265     /// assert_eq!(q.capacity(), None);
266     /// ```
capacity(&self) -> Option<usize>267     pub fn capacity(&self) -> Option<usize> {
268         match &self.0 {
269             Inner::Single(_) => Some(1),
270             Inner::Bounded(q) => Some(q.capacity()),
271             Inner::Unbounded(_) => None,
272         }
273     }
274 
275     /// Closes the queue.
276     ///
277     /// Returns `true` if this call closed the queue, or `false` if it was already closed.
278     ///
279     /// When a queue is closed, no more items can be pushed but the remaining items can still be
280     /// popped.
281     ///
282     /// # Examples
283     ///
284     /// ```
285     /// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
286     ///
287     /// let q = ConcurrentQueue::unbounded();
288     /// assert_eq!(q.push(10), Ok(()));
289     ///
290     /// assert!(q.close());  // `true` because this call closes the queue.
291     /// assert!(!q.close()); // `false` because the queue is already closed.
292     ///
293     /// // Cannot push any more items when closed.
294     /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
295     ///
296     /// // Remaining items can still be popped.
297     /// assert_eq!(q.pop(), Ok(10));
298     ///
299     /// // When no more items are present, the error is `Closed`.
300     /// assert_eq!(q.pop(), Err(PopError::Closed));
301     /// ```
close(&self) -> bool302     pub fn close(&self) -> bool {
303         match &self.0 {
304             Inner::Single(q) => q.close(),
305             Inner::Bounded(q) => q.close(),
306             Inner::Unbounded(q) => q.close(),
307         }
308     }
309 
310     /// Returns `true` if the queue is closed.
311     ///
312     /// # Examples
313     ///
314     /// ```
315     /// use concurrent_queue::ConcurrentQueue;
316     ///
317     /// let q = ConcurrentQueue::<i32>::unbounded();
318     ///
319     /// assert!(!q.is_closed());
320     /// q.close();
321     /// assert!(q.is_closed());
322     /// ```
is_closed(&self) -> bool323     pub fn is_closed(&self) -> bool {
324         match &self.0 {
325             Inner::Single(q) => q.is_closed(),
326             Inner::Bounded(q) => q.is_closed(),
327             Inner::Unbounded(q) => q.is_closed(),
328         }
329     }
330 }
331 
332 impl<T> fmt::Debug for ConcurrentQueue<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result333     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334         f.debug_struct("ConcurrentQueue")
335             .field("len", &self.len())
336             .field("capacity", &self.capacity())
337             .field("is_closed", &self.is_closed())
338             .finish()
339     }
340 }
341 
342 /// Error which occurs when popping from an empty queue.
343 #[derive(Clone, Copy, Eq, PartialEq)]
344 pub enum PopError {
345     /// The queue is empty but not closed.
346     Empty,
347 
348     /// The queue is empty and closed.
349     Closed,
350 }
351 
352 impl PopError {
353     /// Returns `true` if the queue is empty but not closed.
is_empty(&self) -> bool354     pub fn is_empty(&self) -> bool {
355         match self {
356             PopError::Empty => true,
357             PopError::Closed => false,
358         }
359     }
360 
361     /// Returns `true` if the queue is empty and closed.
is_closed(&self) -> bool362     pub fn is_closed(&self) -> bool {
363         match self {
364             PopError::Empty => false,
365             PopError::Closed => true,
366         }
367     }
368 }
369 
370 impl error::Error for PopError {}
371 
372 impl fmt::Debug for PopError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result373     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374         match self {
375             PopError::Empty => write!(f, "Empty"),
376             PopError::Closed => write!(f, "Closed"),
377         }
378     }
379 }
380 
381 impl fmt::Display for PopError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result382     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383         match self {
384             PopError::Empty => write!(f, "Empty"),
385             PopError::Closed => write!(f, "Closed"),
386         }
387     }
388 }
389 
390 /// Error which occurs when pushing into a full or closed queue.
391 #[derive(Clone, Copy, Eq, PartialEq)]
392 pub enum PushError<T> {
393     /// The queue is full but not closed.
394     Full(T),
395 
396     /// The queue is closed.
397     Closed(T),
398 }
399 
400 impl<T> PushError<T> {
401     /// Unwraps the item that couldn't be pushed.
into_inner(self) -> T402     pub fn into_inner(self) -> T {
403         match self {
404             PushError::Full(t) => t,
405             PushError::Closed(t) => t,
406         }
407     }
408 
409     /// Returns `true` if the queue is full but not closed.
is_full(&self) -> bool410     pub fn is_full(&self) -> bool {
411         match self {
412             PushError::Full(_) => true,
413             PushError::Closed(_) => false,
414         }
415     }
416 
417     /// Returns `true` if the queue is closed.
is_closed(&self) -> bool418     pub fn is_closed(&self) -> bool {
419         match self {
420             PushError::Full(_) => false,
421             PushError::Closed(_) => true,
422         }
423     }
424 }
425 
426 impl<T: fmt::Debug> error::Error for PushError<T> {}
427 
428 impl<T: fmt::Debug> fmt::Debug for PushError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result429     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430         match self {
431             PushError::Full(t) => f.debug_tuple("Full").field(t).finish(),
432             PushError::Closed(t) => f.debug_tuple("Closed").field(t).finish(),
433         }
434     }
435 }
436 
437 impl<T> fmt::Display for PushError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result438     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
439         match self {
440             PushError::Full(_) => write!(f, "Full"),
441             PushError::Closed(_) => write!(f, "Closed"),
442         }
443     }
444 }
445 
446 /// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
447 #[inline]
full_fence()448 fn full_fence() {
449     if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
450         // HACK(stjepang): On x86 architectures there are two different ways of executing
451         // a `SeqCst` fence.
452         //
453         // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
454         // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
455         //
456         // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
457         // that the second one is sometimes a bit faster.
458         //
459         // The ideal solution here would be to use inline assembly, but we're instead creating a
460         // temporary atomic variable and compare-and-exchanging its value. No sane compiler to
461         // x86 platforms is going to optimize this away.
462         let a = AtomicUsize::new(0);
463         a.compare_and_swap(0, 1, Ordering::SeqCst);
464     } else {
465         atomic::fence(Ordering::SeqCst);
466     }
467 }
468