1 //! A concurrent multi-producer multi-consumer queue.
2 //!
3 //! There are two kinds of queues:
4 //!
5 //! 1. [Bounded] queue with limited capacity.
6 //! 2. [Unbounded] queue with unlimited capacity.
7 //!
8 //! Queues also have the capability to get [closed] at any point. When closed, no more items can be
9 //! pushed into the queue, although the remaining items can still be popped.
10 //!
11 //! These features make it easy to build channels similar to [`std::sync::mpsc`] on top of this
12 //! crate.
13 //!
14 //! # Examples
15 //!
16 //! ```
17 //! use concurrent_queue::ConcurrentQueue;
18 //!
19 //! let q = ConcurrentQueue::unbounded();
20 //! q.push(1).unwrap();
21 //! q.push(2).unwrap();
22 //!
23 //! assert_eq!(q.pop(), Ok(1));
24 //! assert_eq!(q.pop(), Ok(2));
25 //! ```
26 //!
27 //! [Bounded]: `ConcurrentQueue::bounded()`
28 //! [Unbounded]: `ConcurrentQueue::unbounded()`
29 //! [closed]: `ConcurrentQueue::close()`
30
31 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
32
33 use std::error;
34 use std::fmt;
35 use std::panic::{RefUnwindSafe, UnwindSafe};
36 use std::sync::atomic::{self, AtomicUsize, Ordering};
37
38 use crate::bounded::Bounded;
39 use crate::single::Single;
40 use crate::unbounded::Unbounded;
41
42 mod bounded;
43 mod single;
44 mod unbounded;
45
46 /// A concurrent queue.
47 ///
48 /// # Examples
49 ///
50 /// ```
51 /// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
52 ///
53 /// let q = ConcurrentQueue::bounded(2);
54 ///
55 /// assert_eq!(q.push('a'), Ok(()));
56 /// assert_eq!(q.push('b'), Ok(()));
57 /// assert_eq!(q.push('c'), Err(PushError::Full('c')));
58 ///
59 /// assert_eq!(q.pop(), Ok('a'));
60 /// assert_eq!(q.pop(), Ok('b'));
61 /// assert_eq!(q.pop(), Err(PopError::Empty));
62 /// ```
63 pub struct ConcurrentQueue<T>(Inner<T>);
64
65 unsafe impl<T: Send> Send for ConcurrentQueue<T> {}
66 unsafe impl<T: Send> Sync for ConcurrentQueue<T> {}
67
68 impl<T> UnwindSafe for ConcurrentQueue<T> {}
69 impl<T> RefUnwindSafe for ConcurrentQueue<T> {}
70
71 enum Inner<T> {
72 Single(Single<T>),
73 Bounded(Box<Bounded<T>>),
74 Unbounded(Box<Unbounded<T>>),
75 }
76
77 impl<T> ConcurrentQueue<T> {
78 /// Creates a new bounded queue.
79 ///
80 /// The queue allocates enough space for `cap` items.
81 ///
82 /// # Panics
83 ///
84 /// If the capacity is zero, this constructor will panic.
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// use concurrent_queue::ConcurrentQueue;
90 ///
91 /// let q = ConcurrentQueue::<i32>::bounded(100);
92 /// ```
bounded(cap: usize) -> ConcurrentQueue<T>93 pub fn bounded(cap: usize) -> ConcurrentQueue<T> {
94 if cap == 1 {
95 ConcurrentQueue(Inner::Single(Single::new()))
96 } else {
97 ConcurrentQueue(Inner::Bounded(Box::new(Bounded::new(cap))))
98 }
99 }
100
101 /// Creates a new unbounded queue.
102 ///
103 /// # Examples
104 ///
105 /// ```
106 /// use concurrent_queue::ConcurrentQueue;
107 ///
108 /// let q = ConcurrentQueue::<i32>::unbounded();
109 /// ```
unbounded() -> ConcurrentQueue<T>110 pub fn unbounded() -> ConcurrentQueue<T> {
111 ConcurrentQueue(Inner::Unbounded(Box::new(Unbounded::new())))
112 }
113
114 /// Attempts to push an item into the queue.
115 ///
116 /// If the queue is full or closed, the item is returned back as an error.
117 ///
118 /// # Examples
119 ///
120 /// ```
121 /// use concurrent_queue::{ConcurrentQueue, PushError};
122 ///
123 /// let q = ConcurrentQueue::bounded(1);
124 ///
125 /// // Push succeeds because there is space in the queue.
126 /// assert_eq!(q.push(10), Ok(()));
127 ///
128 /// // Push errors because the queue is now full.
129 /// assert_eq!(q.push(20), Err(PushError::Full(20)));
130 ///
131 /// // Close the queue, which will prevent further pushes.
132 /// q.close();
133 ///
134 /// // Pushing now errors indicating the queue is closed.
135 /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
136 ///
137 /// // Pop the single item in the queue.
138 /// assert_eq!(q.pop(), Ok(10));
139 ///
140 /// // Even though there is space, no more items can be pushed.
141 /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
142 /// ```
push(&self, value: T) -> Result<(), PushError<T>>143 pub fn push(&self, value: T) -> Result<(), PushError<T>> {
144 match &self.0 {
145 Inner::Single(q) => q.push(value),
146 Inner::Bounded(q) => q.push(value),
147 Inner::Unbounded(q) => q.push(value),
148 }
149 }
150
151 /// Attempts to pop an item from the queue.
152 ///
153 /// If the queue is empty, an error is returned.
154 ///
155 /// # Examples
156 ///
157 /// ```
158 /// use concurrent_queue::{ConcurrentQueue, PopError};
159 ///
160 /// let q = ConcurrentQueue::bounded(1);
161 ///
162 /// // Pop errors when the queue is empty.
163 /// assert_eq!(q.pop(), Err(PopError::Empty));
164 ///
165 /// // Push one item and close the queue.
166 /// assert_eq!(q.push(10), Ok(()));
167 /// q.close();
168 ///
169 /// // Remaining items can be popped.
170 /// assert_eq!(q.pop(), Ok(10));
171 ///
172 /// // Again, pop errors when the queue is empty,
173 /// // but now also indicates that the queue is closed.
174 /// assert_eq!(q.pop(), Err(PopError::Closed));
175 /// ```
pop(&self) -> Result<T, PopError>176 pub fn pop(&self) -> Result<T, PopError> {
177 match &self.0 {
178 Inner::Single(q) => q.pop(),
179 Inner::Bounded(q) => q.pop(),
180 Inner::Unbounded(q) => q.pop(),
181 }
182 }
183
184 /// Returns `true` if the queue is empty.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// use concurrent_queue::ConcurrentQueue;
190 ///
191 /// let q = ConcurrentQueue::<i32>::unbounded();
192 ///
193 /// assert!(q.is_empty());
194 /// q.push(1).unwrap();
195 /// assert!(!q.is_empty());
196 /// ```
is_empty(&self) -> bool197 pub fn is_empty(&self) -> bool {
198 match &self.0 {
199 Inner::Single(q) => q.is_empty(),
200 Inner::Bounded(q) => q.is_empty(),
201 Inner::Unbounded(q) => q.is_empty(),
202 }
203 }
204
205 /// Returns `true` if the queue is full.
206 ///
207 /// An unbounded queue is never full.
208 ///
209 /// # Examples
210 ///
211 /// ```
212 /// use concurrent_queue::ConcurrentQueue;
213 ///
214 /// let q = ConcurrentQueue::bounded(1);
215 ///
216 /// assert!(!q.is_full());
217 /// q.push(1).unwrap();
218 /// assert!(q.is_full());
219 /// ```
is_full(&self) -> bool220 pub fn is_full(&self) -> bool {
221 match &self.0 {
222 Inner::Single(q) => q.is_full(),
223 Inner::Bounded(q) => q.is_full(),
224 Inner::Unbounded(q) => q.is_full(),
225 }
226 }
227
228 /// Returns the number of items in the queue.
229 ///
230 /// # Examples
231 ///
232 /// ```
233 /// use concurrent_queue::ConcurrentQueue;
234 ///
235 /// let q = ConcurrentQueue::unbounded();
236 /// assert_eq!(q.len(), 0);
237 ///
238 /// assert_eq!(q.push(10), Ok(()));
239 /// assert_eq!(q.len(), 1);
240 ///
241 /// assert_eq!(q.push(20), Ok(()));
242 /// assert_eq!(q.len(), 2);
243 /// ```
len(&self) -> usize244 pub fn len(&self) -> usize {
245 match &self.0 {
246 Inner::Single(q) => q.len(),
247 Inner::Bounded(q) => q.len(),
248 Inner::Unbounded(q) => q.len(),
249 }
250 }
251
252 /// Returns the capacity of the queue.
253 ///
254 /// Unbounded queues have infinite capacity, represented as [`None`].
255 ///
256 /// # Examples
257 ///
258 /// ```
259 /// use concurrent_queue::ConcurrentQueue;
260 ///
261 /// let q = ConcurrentQueue::<i32>::bounded(7);
262 /// assert_eq!(q.capacity(), Some(7));
263 ///
264 /// let q = ConcurrentQueue::<i32>::unbounded();
265 /// assert_eq!(q.capacity(), None);
266 /// ```
capacity(&self) -> Option<usize>267 pub fn capacity(&self) -> Option<usize> {
268 match &self.0 {
269 Inner::Single(_) => Some(1),
270 Inner::Bounded(q) => Some(q.capacity()),
271 Inner::Unbounded(_) => None,
272 }
273 }
274
275 /// Closes the queue.
276 ///
277 /// Returns `true` if this call closed the queue, or `false` if it was already closed.
278 ///
279 /// When a queue is closed, no more items can be pushed but the remaining items can still be
280 /// popped.
281 ///
282 /// # Examples
283 ///
284 /// ```
285 /// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
286 ///
287 /// let q = ConcurrentQueue::unbounded();
288 /// assert_eq!(q.push(10), Ok(()));
289 ///
290 /// assert!(q.close()); // `true` because this call closes the queue.
291 /// assert!(!q.close()); // `false` because the queue is already closed.
292 ///
293 /// // Cannot push any more items when closed.
294 /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
295 ///
296 /// // Remaining items can still be popped.
297 /// assert_eq!(q.pop(), Ok(10));
298 ///
299 /// // When no more items are present, the error is `Closed`.
300 /// assert_eq!(q.pop(), Err(PopError::Closed));
301 /// ```
close(&self) -> bool302 pub fn close(&self) -> bool {
303 match &self.0 {
304 Inner::Single(q) => q.close(),
305 Inner::Bounded(q) => q.close(),
306 Inner::Unbounded(q) => q.close(),
307 }
308 }
309
310 /// Returns `true` if the queue is closed.
311 ///
312 /// # Examples
313 ///
314 /// ```
315 /// use concurrent_queue::ConcurrentQueue;
316 ///
317 /// let q = ConcurrentQueue::<i32>::unbounded();
318 ///
319 /// assert!(!q.is_closed());
320 /// q.close();
321 /// assert!(q.is_closed());
322 /// ```
is_closed(&self) -> bool323 pub fn is_closed(&self) -> bool {
324 match &self.0 {
325 Inner::Single(q) => q.is_closed(),
326 Inner::Bounded(q) => q.is_closed(),
327 Inner::Unbounded(q) => q.is_closed(),
328 }
329 }
330 }
331
332 impl<T> fmt::Debug for ConcurrentQueue<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334 f.debug_struct("ConcurrentQueue")
335 .field("len", &self.len())
336 .field("capacity", &self.capacity())
337 .field("is_closed", &self.is_closed())
338 .finish()
339 }
340 }
341
342 /// Error which occurs when popping from an empty queue.
343 #[derive(Clone, Copy, Eq, PartialEq)]
344 pub enum PopError {
345 /// The queue is empty but not closed.
346 Empty,
347
348 /// The queue is empty and closed.
349 Closed,
350 }
351
352 impl PopError {
353 /// Returns `true` if the queue is empty but not closed.
is_empty(&self) -> bool354 pub fn is_empty(&self) -> bool {
355 match self {
356 PopError::Empty => true,
357 PopError::Closed => false,
358 }
359 }
360
361 /// Returns `true` if the queue is empty and closed.
is_closed(&self) -> bool362 pub fn is_closed(&self) -> bool {
363 match self {
364 PopError::Empty => false,
365 PopError::Closed => true,
366 }
367 }
368 }
369
370 impl error::Error for PopError {}
371
372 impl fmt::Debug for PopError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result373 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374 match self {
375 PopError::Empty => write!(f, "Empty"),
376 PopError::Closed => write!(f, "Closed"),
377 }
378 }
379 }
380
381 impl fmt::Display for PopError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result382 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383 match self {
384 PopError::Empty => write!(f, "Empty"),
385 PopError::Closed => write!(f, "Closed"),
386 }
387 }
388 }
389
390 /// Error which occurs when pushing into a full or closed queue.
391 #[derive(Clone, Copy, Eq, PartialEq)]
392 pub enum PushError<T> {
393 /// The queue is full but not closed.
394 Full(T),
395
396 /// The queue is closed.
397 Closed(T),
398 }
399
400 impl<T> PushError<T> {
401 /// Unwraps the item that couldn't be pushed.
into_inner(self) -> T402 pub fn into_inner(self) -> T {
403 match self {
404 PushError::Full(t) => t,
405 PushError::Closed(t) => t,
406 }
407 }
408
409 /// Returns `true` if the queue is full but not closed.
is_full(&self) -> bool410 pub fn is_full(&self) -> bool {
411 match self {
412 PushError::Full(_) => true,
413 PushError::Closed(_) => false,
414 }
415 }
416
417 /// Returns `true` if the queue is closed.
is_closed(&self) -> bool418 pub fn is_closed(&self) -> bool {
419 match self {
420 PushError::Full(_) => false,
421 PushError::Closed(_) => true,
422 }
423 }
424 }
425
426 impl<T: fmt::Debug> error::Error for PushError<T> {}
427
428 impl<T: fmt::Debug> fmt::Debug for PushError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result429 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430 match self {
431 PushError::Full(t) => f.debug_tuple("Full").field(t).finish(),
432 PushError::Closed(t) => f.debug_tuple("Closed").field(t).finish(),
433 }
434 }
435 }
436
437 impl<T> fmt::Display for PushError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result438 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
439 match self {
440 PushError::Full(_) => write!(f, "Full"),
441 PushError::Closed(_) => write!(f, "Closed"),
442 }
443 }
444 }
445
446 /// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
447 #[inline]
full_fence()448 fn full_fence() {
449 if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
450 // HACK(stjepang): On x86 architectures there are two different ways of executing
451 // a `SeqCst` fence.
452 //
453 // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
454 // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
455 //
456 // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
457 // that the second one is sometimes a bit faster.
458 //
459 // The ideal solution here would be to use inline assembly, but we're instead creating a
460 // temporary atomic variable and compare-and-exchanging its value. No sane compiler to
461 // x86 platforms is going to optimize this away.
462 let a = AtomicUsize::new(0);
463 a.compare_and_swap(0, 1, Ordering::SeqCst);
464 } else {
465 atomic::fence(Ordering::SeqCst);
466 }
467 }
468