1 //! A multi-producer, single-consumer queue for sending values across
2 //! asynchronous tasks.
3 //!
//! Similar to the channels in `std`, channel creation provides [`Receiver`] and
5 //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6 //! read values out of the channel. If there is no message to read from the
7 //! channel, the current task will be notified when a new value is sent.
8 //! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9 //! the channel. If the channel is at capacity, the send will be rejected and
10 //! the task will be notified when additional capacity is available. In other
11 //! words, the channel provides backpressure.
12 //!
13 //! Unbounded channels are also available using the `unbounded` constructor.
14 //!
15 //! # Disconnection
16 //!
17 //! When all [`Sender`] handles have been dropped, it is no longer
18 //! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Poll::Ready(None)`.
21 //!
22 //! If the [`Receiver`] handle is dropped, then messages can no longer
23 //! be read out of the channel. In this case, all further attempts to send will
24 //! result in an error.
25 //!
26 //! # Clean Shutdown
27 //!
28 //! If the [`Receiver`] is simply dropped, then it is possible for
29 //! there to be messages still in the channel that will not be processed. As
30 //! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver will first call `close`, which will prevent any further messages from
//! being sent into the channel. Then, the receiver consumes the channel to
33 //! completion, at which point the receiver can be dropped.
34 //!
35 //! [`Sender`]: struct.Sender.html
36 //! [`Receiver`]: struct.Receiver.html
37 //! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38 //! [`Receiver::poll_next`]:
39 //!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40 
41 // At the core, the channel uses an atomic FIFO queue for message passing. This
42 // queue is used as the primary coordination primitive. In order to enforce
43 // capacity limits and handle back pressure, a secondary FIFO queue is used to
44 // send parked task handles.
45 //
46 // The general idea is that the channel is created with a `buffer` size of `n`.
47 // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48 // slot to hold a message. This allows `Sender` to know for a fact that a send
49 // will succeed *before* starting to do the actual work of sending the value.
50 // Since most of this work is lock-free, once the work starts, it is impossible
51 // to safely revert.
52 //
53 // If the sender is unable to process a send operation, then the current
54 // task is parked and the handle is sent on the parked task queue.
55 //
56 // Note that the implementation guarantees that the channel capacity will never
57 // exceed the configured limit, however there is no *strict* guarantee that the
58 // receiver will wake up a parked task *immediately* when a slot becomes
59 // available. However, it will almost always unpark a task when a slot becomes
60 // available and it is *guaranteed* that a sender will be unparked when the
61 // message that caused the sender to become parked is read out of the channel.
62 //
63 // The steps for sending a message are roughly:
64 //
65 // 1) Increment the channel message count
66 // 2) If the channel is at capacity, push the task handle onto the wait queue
67 // 3) Push the message onto the message queue.
68 //
69 // The steps for receiving a message are roughly:
70 //
71 // 1) Pop a message from the message queue
72 // 2) Pop a task handle from the wait queue
73 // 3) Decrement the channel message count.
74 //
75 // It's important for the order of operations on lock-free structures to happen
76 // in reverse order between the sender and receiver. This makes the message
77 // queue the primary coordination structure and establishes the necessary
78 // happens-before semantics required for the acquire / release semantics used
79 // by the queue structure.
80 
81 use futures_core::stream::{FusedStream, Stream};
82 use futures_core::task::{Context, Poll, Waker};
83 use futures_core::task::__internal::AtomicWaker;
84 use std::fmt;
85 use std::pin::Pin;
86 use std::sync::{Arc, Mutex};
87 use std::sync::atomic::AtomicUsize;
88 use std::sync::atomic::Ordering::SeqCst;
89 
90 use crate::mpsc::queue::Queue;
91 
92 mod queue;
93 #[cfg(feature = "sink")]
94 mod sink_impl;
95 
// Sender-side state of an unbounded channel. It holds nothing but a handle
// to the shared channel state.
#[derive(Debug)]
struct UnboundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<UnboundedInner<T>>,
}
101 
// Sender-side state of a bounded channel: the shared channel state plus the
// bookkeeping needed to park this sender when the channel is over capacity.
#[derive(Debug)]
struct BoundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<BoundedInner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_task: Arc<Mutex<SenderTask>>,

    // `true` if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time.
    maybe_parked: bool,
}
116 
// We never project `Pin<&mut UnboundedSenderInner<T>>` or
// `Pin<&mut BoundedSenderInner<T>>` to `Pin<&mut T>`, so both are declared
// `Unpin` without any bound on `T`.
impl<T> Unpin for UnboundedSenderInner<T> {}
impl<T> Unpin for BoundedSenderInner<T> {}
120 
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
// The inner state becomes `None` once `disconnect` has been called; sends on
// such a handle report a disconnected channel.
#[derive(Debug)]
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
126 
/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
// The inner state becomes `None` once `disconnect` has been called; sends on
// such a handle report a disconnected channel.
#[derive(Debug)]
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
132 
// Compile-time check only: this impl is accepted by the compiler only if
// `UnboundedSender<u32>` is `Send + Sync + Clone`.
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
135 
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Receiver<T> {
    // Shared channel state; created as `Some` by `channel`.
    inner: Option<Arc<BoundedInner<T>>>,
}
143 
/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
    // Shared channel state; created as `Some` by `unbounded`.
    inner: Option<Arc<UnboundedInner<T>>>,
}
151 
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`, so
// the receiver is declared `Unpin` without any bound on `T`.
impl<T> Unpin for UnboundedReceiver<T> {}
154 
/// The error type for [`Sender`s](Sender) used as `Sink`s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    // Why the send failed: channel full or receiver gone.
    kind: SendErrorKind,
}
160 
/// The error type returned from [`try_send`](Sender::try_send).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    // The reason the send failed.
    err: SendError,
    // The message that could not be sent; handed back by `into_inner`.
    val: T,
}
167 
// The reason carried by a `SendError`.
#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    // The sender is parked, so the message was rejected.
    Full,
    // The channel is closed or the sender handle was disconnected.
    Disconnected,
}
173 
/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    // Private unit field: keeps the struct from being constructed outside
    // this module.
    _priv: (),
}
178 
179 impl fmt::Display for SendError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result180     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181         if self.is_full() {
182             write!(f, "send failed because channel is full")
183         } else {
184             write!(f, "send failed because receiver is gone")
185         }
186     }
187 }
188 
// Empty impl: no `Error` methods are overridden.
impl std::error::Error for SendError {}
190 
191 impl SendError {
192     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool193     pub fn is_full(&self) -> bool {
194         match self.kind {
195             SendErrorKind::Full => true,
196             _ => false,
197         }
198     }
199 
200     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool201     pub fn is_disconnected(&self) -> bool {
202         match self.kind {
203             SendErrorKind::Disconnected => true,
204             _ => false,
205         }
206     }
207 }
208 
209 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result210     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211         f.debug_struct("TrySendError")
212             .field("kind", &self.err.kind)
213             .finish()
214     }
215 }
216 
217 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result218     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219         if self.is_full() {
220             write!(f, "send failed because channel is full")
221         } else {
222             write!(f, "send failed because receiver is gone")
223         }
224     }
225 }
226 
// Empty impl: no `Error` methods are overridden.
// NOTE(review): the `Any` bound limits this impl to `'static` payload types;
// confirm that is the intent.
impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
228 
229 impl<T> TrySendError<T> {
230     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool231     pub fn is_full(&self) -> bool {
232         self.err.is_full()
233     }
234 
235     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool236     pub fn is_disconnected(&self) -> bool {
237         self.err.is_disconnected()
238     }
239 
240     /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T241     pub fn into_inner(self) -> T {
242         self.val
243     }
244 
245     /// Drops the message and converts into a `SendError`.
into_send_error(self) -> SendError246     pub fn into_send_error(self) -> SendError {
247         self.err
248     }
249 }
250 
251 impl fmt::Debug for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result252     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253         f.debug_tuple("TryRecvError")
254             .finish()
255     }
256 }
257 
258 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result259     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260         write!(f, "receiver channel is empty")
261     }
262 }
263 
// Empty impl: no `Error` methods are overridden.
impl std::error::Error for TryRecvError {}
265 
// State shared between every sender and the receiver of an unbounded channel.
#[derive(Debug)]
struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Number of senders in existence. Starts at 1 for the sender returned by
    // `unbounded`.
    num_senders: AtomicUsize,

    // Handle to the receiver's task. Woken after a message is pushed and
    // when the channel is closed from the sender side.
    recv_task: AtomicWaker,
}
281 
// State shared between every sender and the receiver of a bounded channel.
#[derive(Debug)]
struct BoundedInner<T> {
    // Buffer size of the channel, as passed to `channel`. A sender parks
    // itself once the message count exceeds this value.
    buffer: usize,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence. Starts at 1 for the sender returned by
    // `channel`.
    num_senders: AtomicUsize,

    // Handle to the receiver's task. Woken after a message is pushed and
    // when the channel is closed from the sender side.
    recv_task: AtomicWaker,
}
303 
// Struct representation of `Inner::state`: the unpacked form of the single
// `usize` stored in the `state` atomic.
#[derive(Debug, Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}
313 
// The `is_open` flag is stored in the most significant bit of `Inner::state`.
// `usize::max_value() >> 1` has every bit set except the top one, so the
// subtraction leaves exactly that top bit.
const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);

// When a new channel is created, it is created in the open state with no
// pending messages.
const INIT_STATE: usize = OPEN_MASK;

// The maximum number of messages that a channel can track is `usize::max_value() >> 1`,
// i.e. every bit below the open flag.
const MAX_CAPACITY: usize = !(OPEN_MASK);

// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
327 
// Handle passed to the consumer so it can wake up a blocked producer.
#[derive(Debug)]
struct SenderTask {
    // Waker of the task currently waiting on the sender, if one is registered.
    task: Option<Waker>,
    // `true` while the sender is parked.
    is_parked: bool,
}

impl SenderTask {
    // Creates a handle that is not parked and has no waker registered.
    fn new() -> Self {
        Self {
            is_parked: false,
            task: None,
        }
    }

    // Clears the parked flag and wakes the registered task, if any. The
    // waker is consumed, so a second call wakes nothing.
    fn notify(&mut self) {
        self.is_parked = false;

        match self.task.take() {
            Some(waker) => waker.wake(),
            None => {}
        }
    }
}
351 
352 /// Creates a bounded mpsc channel for communicating between asynchronous tasks.
353 ///
354 /// Being bounded, this channel provides backpressure to ensure that the sender
355 /// outpaces the receiver by only a limited amount. The channel's capacity is
356 /// equal to `buffer + num-senders`. In other words, each sender gets a
357 /// guaranteed slot in the channel capacity, and on top of that there are
358 /// `buffer` "first come, first serve" slots available to all senders.
359 ///
360 /// The [`Receiver`](Receiver) returned implements the
361 /// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
362 /// `Sink`.
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)363 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
364     // Check that the requested buffer size does not exceed the maximum buffer
365     // size permitted by the system.
366     assert!(buffer < MAX_BUFFER, "requested buffer size too large");
367 
368     let inner = Arc::new(BoundedInner {
369         buffer,
370         state: AtomicUsize::new(INIT_STATE),
371         message_queue: Queue::new(),
372         parked_queue: Queue::new(),
373         num_senders: AtomicUsize::new(1),
374         recv_task: AtomicWaker::new(),
375     });
376 
377     let tx = BoundedSenderInner {
378         inner: inner.clone(),
379         sender_task: Arc::new(Mutex::new(SenderTask::new())),
380         maybe_parked: false,
381     };
382 
383     let rx = Receiver {
384         inner: Some(inner),
385     };
386 
387     (Sender(Some(tx)), rx)
388 }
389 
390 /// Creates an unbounded mpsc channel for communicating between asynchronous
391 /// tasks.
392 ///
393 /// A `send` on this channel will always succeed as long as the receive half has
394 /// not been closed. If the receiver falls behind, messages will be arbitrarily
395 /// buffered.
396 ///
397 /// **Note** that the amount of available system memory is an implicit bound to
398 /// the channel. Using an `unbounded` channel has the ability of causing the
399 /// process to run out of memory. In this case, the process will be aborted.
unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)400 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
401 
402     let inner = Arc::new(UnboundedInner {
403         state: AtomicUsize::new(INIT_STATE),
404         message_queue: Queue::new(),
405         num_senders: AtomicUsize::new(1),
406         recv_task: AtomicWaker::new(),
407     });
408 
409     let tx = UnboundedSenderInner {
410         inner: inner.clone(),
411     };
412 
413     let rx = UnboundedReceiver {
414         inner: Some(inner),
415     };
416 
417     (UnboundedSender(Some(tx)), rx)
418 }
419 
420 /*
421  *
422  * ===== impl Sender =====
423  *
424  */
425 
426 impl<T> UnboundedSenderInner<T> {
poll_ready_nb(&self) -> Poll<Result<(), SendError>>427     fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
428         let state = decode_state(self.inner.state.load(SeqCst));
429         if state.is_open {
430             Poll::Ready(Ok(()))
431         } else {
432             Poll::Ready(Err(SendError {
433                 kind: SendErrorKind::Disconnected,
434             }))
435         }
436     }
437 
438 
439     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)440     fn queue_push_and_signal(&self, msg: T) {
441         // Push the message onto the message queue
442         self.inner.message_queue.push(msg);
443 
444         // Signal to the receiver that a message has been enqueued. If the
445         // receiver is parked, this will unpark the task.
446         self.inner.recv_task.wake();
447     }
448 
449     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>450     fn inc_num_messages(&self) -> Option<usize> {
451         let mut curr = self.inner.state.load(SeqCst);
452 
453         loop {
454             let mut state = decode_state(curr);
455 
456             // The receiver end closed the channel.
457             if !state.is_open {
458                 return None;
459             }
460 
461             // This probably is never hit? Odds are the process will run out of
462             // memory first. It may be worth to return something else in this
463             // case?
464             assert!(state.num_messages < MAX_CAPACITY, "buffer space \
465                     exhausted; sending this messages would overflow the state");
466 
467             state.num_messages += 1;
468 
469             let next = encode_state(&state);
470             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
471                 Ok(_) => {
472                     return Some(state.num_messages)
473                 }
474                 Err(actual) => curr = actual,
475             }
476         }
477     }
478 
479     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool480     fn same_receiver(&self, other: &Self) -> bool {
481         Arc::ptr_eq(&self.inner, &other.inner)
482     }
483 
484     /// Returns pointer to the Arc containing sender
485     ///
486     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const UnboundedInner<T>487     fn ptr(&self) -> *const UnboundedInner<T> {
488         &*self.inner
489     }
490 
491     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool492     fn is_closed(&self) -> bool {
493         !decode_state(self.inner.state.load(SeqCst)).is_open
494     }
495 
    /// Closes this channel from the sender side, preventing any new messages.
    ///
    /// Marks the shared state as closed and then wakes the receiver task.
    fn close_channel(&self) {
        // There's no need to park this sender, it's dropping,
        // and we don't want to check for capacity, so skip
        // that stuff from `do_send`.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }
505 }
506 
507 impl<T> BoundedSenderInner<T> {
508     /// Attempts to send a message on this `Sender`, returning the message
509     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>510     fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
511         // If the sender is currently blocked, reject the message
512         if !self.poll_unparked(None).is_ready() {
513             return Err(TrySendError {
514                 err: SendError {
515                     kind: SendErrorKind::Full,
516                 },
517                 val: msg,
518             });
519         }
520 
521         // The channel has capacity to accept the message, so send it
522         self.do_send_b(msg)
523     }
524 
    // Sends `msg` without checking for capacity first; the caller is
    // expected to have established that this sender is not parked.
    // Returns the message in a `Disconnected` error if the channel is closed.
    // Can be called only by bounded sender.
    #[allow(clippy::debug_assert_with_mut_call)]
    fn do_send_b(&mut self, msg: T)
        -> Result<(), TrySendError<T>>
    {
        // Anyone calling do_send *should* make sure there is room first,
        // but assert here for tests as a sanity check.
        debug_assert!(self.poll_unparked(None).is_ready());

        // First, increment the number of messages contained by the channel.
        // This operation will also atomically determine if the sender task
        // should be parked.
        //
        // `None` is returned in the case that the channel has been closed by the
        // receiver. This happens when `Receiver::close` is called or the
        // receiver is dropped.
        let park_self = match self.inc_num_messages() {
            Some(num_messages) => {
                // Block if the current number of pending messages has exceeded
                // the configured buffer size
                num_messages > self.inner.buffer
            }
            None => return Err(TrySendError {
                err: SendError {
                    kind: SendErrorKind::Disconnected,
                },
                val: msg,
            }),
        };

        // If the channel has reached capacity, then the sender task needs to
        // be parked. This will send the task handle on the parked task queue.
        //
        // No waker is available at this point, so `park` pushes the handle
        // with its waker cleared; `poll_unparked` stores a waker later when
        // it is called with a task context.
        if park_self {
            self.park();
        }

        // The message itself is pushed last, after the count and the parked
        // handle have been published.
        self.queue_push_and_signal(msg);

        Ok(())
    }
571 
572     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)573     fn queue_push_and_signal(&self, msg: T) {
574         // Push the message onto the message queue
575         self.inner.message_queue.push(msg);
576 
577         // Signal to the receiver that a message has been enqueued. If the
578         // receiver is parked, this will unpark the task.
579         self.inner.recv_task.wake();
580     }
581 
582     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>583     fn inc_num_messages(&self) -> Option<usize> {
584         let mut curr = self.inner.state.load(SeqCst);
585 
586         loop {
587             let mut state = decode_state(curr);
588 
589             // The receiver end closed the channel.
590             if !state.is_open {
591                 return None;
592             }
593 
594             // This probably is never hit? Odds are the process will run out of
595             // memory first. It may be worth to return something else in this
596             // case?
597             assert!(state.num_messages < MAX_CAPACITY, "buffer space \
598                     exhausted; sending this messages would overflow the state");
599 
600             state.num_messages += 1;
601 
602             let next = encode_state(&state);
603             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
604                 Ok(_) => {
605                     return Some(state.num_messages)
606                 }
607                 Err(actual) => curr = actual,
608             }
609         }
610     }
611 
    // Marks this sender as parked and hands its task handle to the receiver
    // through the parked queue.
    fn park(&mut self) {
        {
            // Scoped so the lock is released before the handle is pushed.
            let mut sender = self.sender_task.lock().unwrap();
            // Any previously stored waker is discarded; `poll_unparked`
            // stores a new one when it is next called with a context.
            sender.task = None;
            sender.is_parked = true;
        }

        // Send handle over queue
        let t = self.sender_task.clone();
        self.inner.parked_queue.push(t);

        // Check to make sure we weren't closed after we sent our task on the
        // queue
        let state = decode_state(self.inner.state.load(SeqCst));
        // Only remember "maybe parked" while the channel is still open.
        self.maybe_parked = state.is_open;
    }
628 
629     /// Polls the channel to determine if there is guaranteed capacity to send
630     /// at least one item without waiting.
631     ///
632     /// # Return value
633     ///
634     /// This method returns:
635     ///
636     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
637     /// - `Poll::Pending` if the channel may not have
638     ///   capacity, in which case the current task is queued to be notified once
639     ///   capacity is available;
640     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), SendError>>641     fn poll_ready(
642         &mut self,
643         cx: &mut Context<'_>,
644     ) -> Poll<Result<(), SendError>> {
645         let state = decode_state(self.inner.state.load(SeqCst));
646         if !state.is_open {
647             return Poll::Ready(Err(SendError {
648                 kind: SendErrorKind::Disconnected,
649             }));
650         }
651 
652         self.poll_unparked(Some(cx)).map(Ok)
653     }
654 
655     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool656     fn same_receiver(&self, other: &Self) -> bool {
657         Arc::ptr_eq(&self.inner, &other.inner)
658     }
659 
660     /// Returns pointer to the Arc containing sender
661     ///
662     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const BoundedInner<T>663     fn ptr(&self) -> *const BoundedInner<T> {
664         &*self.inner
665     }
666 
667     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool668     fn is_closed(&self) -> bool {
669         !decode_state(self.inner.state.load(SeqCst)).is_open
670     }
671 
    /// Closes this channel from the sender side, preventing any new messages.
    ///
    /// Marks the shared state as closed and then wakes the receiver task.
    fn close_channel(&self) {
        // There's no need to park this sender, it's dropping,
        // and we don't want to check for capacity, so skip
        // that stuff from `do_send`.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }
681 
poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()>682     fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
683         // First check the `maybe_parked` variable. This avoids acquiring the
684         // lock in most cases
685         if self.maybe_parked {
686             // Get a lock on the task handle
687             let mut task = self.sender_task.lock().unwrap();
688 
689             if !task.is_parked {
690                 self.maybe_parked = false;
691                 return Poll::Ready(())
692             }
693 
694             // At this point, an unpark request is pending, so there will be an
695             // unpark sometime in the future. We just need to make sure that
696             // the correct task will be notified.
697             //
698             // Update the task in case the `Sender` has been moved to another
699             // task
700             task.task = cx.map(|cx| cx.waker().clone());
701 
702             Poll::Pending
703         } else {
704             Poll::Ready(())
705         }
706     }
707 }
708 
709 impl<T> Sender<T> {
710     /// Attempts to send a message on this `Sender`, returning the message
711     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>712     pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
713         if let Some(inner) = &mut self.0 {
714             inner.try_send(msg)
715         } else {
716             Err(TrySendError {
717                 err: SendError {
718                     kind: SendErrorKind::Disconnected,
719                 },
720                 val: msg,
721             })
722         }
723     }
724 
725     /// Send a message on the channel.
726     ///
727     /// This function should only be called after
728     /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
729     /// ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>730     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
731         self.try_send(msg)
732             .map_err(|e| e.err)
733     }
734 
735     /// Polls the channel to determine if there is guaranteed capacity to send
736     /// at least one item without waiting.
737     ///
738     /// # Return value
739     ///
740     /// This method returns:
741     ///
742     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
743     /// - `Poll::Pending` if the channel may not have
744     ///   capacity, in which case the current task is queued to be notified once
745     ///   capacity is available;
746     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), SendError>>747     pub fn poll_ready(
748         &mut self,
749         cx: &mut Context<'_>,
750     ) -> Poll<Result<(), SendError>> {
751         let inner = self.0.as_mut().ok_or(SendError {
752             kind: SendErrorKind::Disconnected,
753         })?;
754         inner.poll_ready(cx)
755     }
756 
757     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool758     pub fn is_closed(&self) -> bool {
759         self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
760     }
761 
762     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&mut self)763     pub fn close_channel(&mut self) {
764         if let Some(inner) = &mut self.0 {
765             inner.close_channel();
766         }
767     }
768 
769     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)770     pub fn disconnect(&mut self) {
771         self.0 = None;
772     }
773 
774     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool775     pub fn same_receiver(&self, other: &Self) -> bool {
776         match (&self.0, &other.0) {
777             (Some(inner), Some(other)) => inner.same_receiver(other),
778             _ => false,
779         }
780     }
781 
782     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher783     pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
784         use std::hash::Hash;
785 
786         let ptr = self.0.as_ref().map(|inner| inner.ptr());
787         ptr.hash(hasher);
788     }
789 }
790 
791 impl<T> UnboundedSender<T> {
792     /// Check if the channel is ready to receive a message.
poll_ready( &self, _: &mut Context<'_>, ) -> Poll<Result<(), SendError>>793     pub fn poll_ready(
794         &self,
795         _: &mut Context<'_>,
796     ) -> Poll<Result<(), SendError>> {
797         let inner = self.0.as_ref().ok_or(SendError {
798             kind: SendErrorKind::Disconnected,
799         })?;
800         inner.poll_ready_nb()
801     }
802 
803     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool804     pub fn is_closed(&self) -> bool {
805         self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
806     }
807 
808     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)809     pub fn close_channel(&self) {
810         if let Some(inner) = &self.0 {
811             inner.close_channel();
812         }
813     }
814 
815     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)816     pub fn disconnect(&mut self) {
817         self.0 = None;
818     }
819 
820     // Do the send without parking current task.
do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>>821     fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
822         if let Some(inner) = &self.0 {
823             if inner.inc_num_messages().is_some() {
824                 inner.queue_push_and_signal(msg);
825                 return Ok(());
826             }
827         }
828 
829         Err(TrySendError {
830             err: SendError {
831                 kind: SendErrorKind::Disconnected,
832             },
833             val: msg,
834         })
835     }
836 
837     /// Send a message on the channel.
838     ///
839     /// This method should only be called after `poll_ready` has been used to
840     /// verify that the channel is ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>841     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
842         self.do_send_nb(msg)
843             .map_err(|e| e.err)
844     }
845 
846     /// Sends a message along this channel.
847     ///
848     /// This is an unbounded sender, so this function differs from `Sink::send`
849     /// by ensuring the return type reflects that the channel is always ready to
850     /// receive messages.
unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>>851     pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
852         self.do_send_nb(msg)
853     }
854 
855     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool856     pub fn same_receiver(&self, other: &Self) -> bool {
857         match (&self.0, &other.0) {
858             (Some(inner), Some(other)) => inner.same_receiver(other),
859             _ => false,
860         }
861     }
862 
863     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher864     pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
865         use std::hash::Hash;
866 
867         let ptr = self.0.as_ref().map(|inner| inner.ptr());
868         ptr.hash(hasher);
869     }
870 }
871 
872 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>873     fn clone(&self) -> Sender<T> {
874         Sender(self.0.clone())
875     }
876 }
877 
878 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> UnboundedSender<T>879     fn clone(&self) -> UnboundedSender<T> {
880         UnboundedSender(self.0.clone())
881     }
882 }
883 
884 impl<T> Clone for UnboundedSenderInner<T> {
clone(&self) -> UnboundedSenderInner<T>885     fn clone(&self) -> UnboundedSenderInner<T> {
886         // Since this atomic op isn't actually guarding any memory and we don't
887         // care about any orderings besides the ordering on the single atomic
888         // variable, a relaxed ordering is acceptable.
889         let mut curr = self.inner.num_senders.load(SeqCst);
890 
891         loop {
892             // If the maximum number of senders has been reached, then fail
893             if curr == MAX_BUFFER {
894                 panic!("cannot clone `Sender` -- too many outstanding senders");
895             }
896 
897             debug_assert!(curr < MAX_BUFFER);
898 
899             let next = curr + 1;
900             let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
901 
902             // The ABA problem doesn't matter here. We only care that the
903             // number of senders never exceeds the maximum.
904             if actual == curr {
905                 return UnboundedSenderInner {
906                     inner: self.inner.clone(),
907                 };
908             }
909 
910             curr = actual;
911         }
912     }
913 }
914 
915 impl<T> Clone for BoundedSenderInner<T> {
clone(&self) -> BoundedSenderInner<T>916     fn clone(&self) -> BoundedSenderInner<T> {
917         // Since this atomic op isn't actually guarding any memory and we don't
918         // care about any orderings besides the ordering on the single atomic
919         // variable, a relaxed ordering is acceptable.
920         let mut curr = self.inner.num_senders.load(SeqCst);
921 
922         loop {
923             // If the maximum number of senders has been reached, then fail
924             if curr == self.inner.max_senders() {
925                 panic!("cannot clone `Sender` -- too many outstanding senders");
926             }
927 
928             debug_assert!(curr < self.inner.max_senders());
929 
930             let next = curr + 1;
931             let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
932 
933             // The ABA problem doesn't matter here. We only care that the
934             // number of senders never exceeds the maximum.
935             if actual == curr {
936                 return BoundedSenderInner {
937                     inner: self.inner.clone(),
938                     sender_task: Arc::new(Mutex::new(SenderTask::new())),
939                     maybe_parked: false,
940                 };
941             }
942 
943             curr = actual;
944         }
945     }
946 }
947 
948 impl<T> Drop for UnboundedSenderInner<T> {
drop(&mut self)949     fn drop(&mut self) {
950         // Ordering between variables don't matter here
951         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
952 
953         if prev == 1 {
954             self.close_channel();
955         }
956     }
957 }
958 
959 impl<T> Drop for BoundedSenderInner<T> {
drop(&mut self)960     fn drop(&mut self) {
961         // Ordering between variables don't matter here
962         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
963 
964         if prev == 1 {
965             self.close_channel();
966         }
967     }
968 }
969 
970 /*
971  *
972  * ===== impl Receiver =====
973  *
974  */
975 
976 impl<T> Receiver<T> {
977     /// Closes the receiving half of a channel, without dropping it.
978     ///
979     /// This prevents any further messages from being sent on the channel while
980     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)981     pub fn close(&mut self) {
982         if let Some(inner) = &mut self.inner {
983             inner.set_closed();
984 
985             // Wake up any threads waiting as they'll see that we've closed the
986             // channel and will continue on their merry way.
987             while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
988                 task.lock().unwrap().notify();
989             }
990         }
991     }
992 
993     /// Tries to receive the next message without notifying a context if empty.
994     ///
995     /// It is not recommended to call this function from inside of a future,
996     /// only when you've otherwise arranged to be notified when the channel is
997     /// no longer empty.
998     ///
999     /// This function will panic if called after `try_next` or `poll_next` has
1000     /// returned `None`.
try_next(&mut self) -> Result<Option<T>, TryRecvError>1001     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1002         match self.next_message() {
1003             Poll::Ready(msg) => {
1004                 Ok(msg)
1005             },
1006             Poll::Pending => Err(TryRecvError { _priv: () }),
1007         }
1008     }
1009 
next_message(&mut self) -> Poll<Option<T>>1010     fn next_message(&mut self) -> Poll<Option<T>> {
1011         let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
1012         // Pop off a message
1013         match unsafe { inner.message_queue.pop_spin() } {
1014             Some(msg) => {
1015                 // If there are any parked task handles in the parked queue,
1016                 // pop one and unpark it.
1017                 self.unpark_one();
1018 
1019                 // Decrement number of messages
1020                 self.dec_num_messages();
1021 
1022                 Poll::Ready(Some(msg))
1023             }
1024             None => {
1025                 let state = decode_state(inner.state.load(SeqCst));
1026                 if state.is_open || state.num_messages != 0 {
1027                     // If queue is open, we need to return Pending
1028                     // to be woken up when new messages arrive.
1029                     // If queue is closed but num_messages is non-zero,
1030                     // it means that senders updated the state,
1031                     // but didn't put message to queue yet,
1032                     // so we need to park until sender unparks the task
1033                     // after queueing the message.
1034                     Poll::Pending
1035                 } else {
1036                     // If closed flag is set AND there are no pending messages
1037                     // it means end of stream
1038                     self.inner = None;
1039                     Poll::Ready(None)
1040                 }
1041             }
1042         }
1043     }
1044 
1045     // Unpark a single task handle if there is one pending in the parked queue
unpark_one(&mut self)1046     fn unpark_one(&mut self) {
1047         if let Some(inner) = &mut self.inner {
1048             if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1049                 task.lock().unwrap().notify();
1050             }
1051         }
1052     }
1053 
dec_num_messages(&self)1054     fn dec_num_messages(&self) {
1055         if let Some(inner) = &self.inner {
1056             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1057             // unless there's underflow, and we know there's no underflow
1058             // because number of messages at this point is always > 0.
1059             inner.state.fetch_sub(1, SeqCst);
1060         }
1061     }
1062 }
1063 
// The receiver does not ever take a Pin to the inner T, so `Receiver<T>` is
// declared `Unpin` no matter what `T` is. `Stream::poll_next` for `Receiver`
// depends on this when it assigns to `self.inner` through its
// `Pin<&mut Self>` receiver.
impl<T> Unpin for Receiver<T> {}
1066 
1067 impl<T> FusedStream for Receiver<T> {
is_terminated(&self) -> bool1068     fn is_terminated(&self) -> bool {
1069         self.inner.is_none()
1070     }
1071 }
1072 
1073 impl<T> Stream for Receiver<T> {
1074     type Item = T;
1075 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<T>>1076     fn poll_next(
1077         mut self: Pin<&mut Self>,
1078         cx: &mut Context<'_>,
1079     ) -> Poll<Option<T>> {
1080             // Try to read a message off of the message queue.
1081         match self.next_message() {
1082             Poll::Ready(msg) => {
1083                 if msg.is_none() {
1084                     self.inner = None;
1085                 }
1086                 Poll::Ready(msg)
1087             },
1088             Poll::Pending => {
1089                 // There are no messages to read, in this case, park.
1090                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1091                 // Check queue again after parking to prevent race condition:
1092                 // a message could be added to the queue after previous `next_message`
1093                 // before `register` call.
1094                 self.next_message()
1095             }
1096         }
1097     }
1098 }
1099 
1100 impl<T> Drop for Receiver<T> {
drop(&mut self)1101     fn drop(&mut self) {
1102         // Drain the channel of all pending messages
1103         self.close();
1104         if self.inner.is_some() {
1105             while let Poll::Ready(Some(..)) = self.next_message() {
1106                 // ...
1107             }
1108         }
1109     }
1110 }
1111 
1112 impl<T> UnboundedReceiver<T> {
1113     /// Closes the receiving half of a channel, without dropping it.
1114     ///
1115     /// This prevents any further messages from being sent on the channel while
1116     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)1117     pub fn close(&mut self) {
1118         if let Some(inner) = &mut self.inner {
1119             inner.set_closed();
1120         }
1121     }
1122 
1123     /// Tries to receive the next message without notifying a context if empty.
1124     ///
1125     /// It is not recommended to call this function from inside of a future,
1126     /// only when you've otherwise arranged to be notified when the channel is
1127     /// no longer empty.
1128     ///
1129     /// This function will panic if called after `try_next` or `poll_next` has
1130     /// returned `None`.
try_next(&mut self) -> Result<Option<T>, TryRecvError>1131     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1132         match self.next_message() {
1133             Poll::Ready(msg) => {
1134                 Ok(msg)
1135             },
1136             Poll::Pending => Err(TryRecvError { _priv: () }),
1137         }
1138     }
1139 
next_message(&mut self) -> Poll<Option<T>>1140     fn next_message(&mut self) -> Poll<Option<T>> {
1141         let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
1142         // Pop off a message
1143         match unsafe { inner.message_queue.pop_spin() } {
1144             Some(msg) => {
1145                 // Decrement number of messages
1146                 self.dec_num_messages();
1147 
1148                 Poll::Ready(Some(msg))
1149             }
1150             None => {
1151                 let state = decode_state(inner.state.load(SeqCst));
1152                 if state.is_open || state.num_messages != 0 {
1153                     // If queue is open, we need to return Pending
1154                     // to be woken up when new messages arrive.
1155                     // If queue is closed but num_messages is non-zero,
1156                     // it means that senders updated the state,
1157                     // but didn't put message to queue yet,
1158                     // so we need to park until sender unparks the task
1159                     // after queueing the message.
1160                     Poll::Pending
1161                 } else {
1162                     // If closed flag is set AND there are no pending messages
1163                     // it means end of stream
1164                     self.inner = None;
1165                     Poll::Ready(None)
1166                 }
1167             }
1168         }
1169     }
1170 
dec_num_messages(&self)1171     fn dec_num_messages(&self) {
1172         if let Some(inner) = &self.inner {
1173             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1174             // unless there's underflow, and we know there's no underflow
1175             // because number of messages at this point is always > 0.
1176             inner.state.fetch_sub(1, SeqCst);
1177         }
1178     }
1179 }
1180 
1181 impl<T> FusedStream for UnboundedReceiver<T> {
is_terminated(&self) -> bool1182     fn is_terminated(&self) -> bool {
1183         self.inner.is_none()
1184     }
1185 }
1186 
1187 impl<T> Stream for UnboundedReceiver<T> {
1188     type Item = T;
1189 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<T>>1190     fn poll_next(
1191         mut self: Pin<&mut Self>,
1192         cx: &mut Context<'_>,
1193     ) -> Poll<Option<T>> {
1194         // Try to read a message off of the message queue.
1195         match self.next_message() {
1196             Poll::Ready(msg) => {
1197                 if msg.is_none() {
1198                     self.inner = None;
1199                 }
1200                 Poll::Ready(msg)
1201             },
1202             Poll::Pending => {
1203                 // There are no messages to read, in this case, park.
1204                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1205                 // Check queue again after parking to prevent race condition:
1206                 // a message could be added to the queue after previous `next_message`
1207                 // before `register` call.
1208                 self.next_message()
1209             }
1210         }
1211     }
1212 }
1213 
1214 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)1215     fn drop(&mut self) {
1216         // Drain the channel of all pending messages
1217         self.close();
1218         if self.inner.is_some() {
1219             while let Poll::Ready(Some(..)) = self.next_message() {
1220                 // ...
1221             }
1222         }
1223     }
1224 }
1225 
1226 /*
1227  *
1228  * ===== impl Inner =====
1229  *
1230  */
1231 
1232 impl<T> UnboundedInner<T> {
1233     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1234     fn set_closed(&self) {
1235         let curr = self.state.load(SeqCst);
1236         if !decode_state(curr).is_open {
1237             return;
1238         }
1239 
1240         self.state.fetch_and(!OPEN_MASK, SeqCst);
1241     }
1242 }
1243 
1244 impl<T> BoundedInner<T> {
1245     // The return value is such that the total number of messages that can be
1246     // enqueued into the channel will never exceed MAX_CAPACITY
max_senders(&self) -> usize1247     fn max_senders(&self) -> usize {
1248         MAX_CAPACITY - self.buffer
1249     }
1250 
1251     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1252     fn set_closed(&self) {
1253         let curr = self.state.load(SeqCst);
1254         if !decode_state(curr).is_open {
1255             return;
1256         }
1257 
1258         self.state.fetch_and(!OPEN_MASK, SeqCst);
1259     }
1260 }
1261 
// Both inner state types are marked `Send` and `Sync` by hand; the only
// requirement placed on the payload type is `T: Send`.
// NOTE(review): soundness presumably rests on the internal queues being safe
// to use from several sender threads and one receiver at once -- confirm
// against the queue implementation, which is outside this section.
unsafe impl<T: Send> Send for UnboundedInner<T> {}
unsafe impl<T: Send> Sync for UnboundedInner<T> {}

unsafe impl<T: Send> Send for BoundedInner<T> {}
unsafe impl<T: Send> Sync for BoundedInner<T> {}
1267 
1268 /*
1269  *
1270  * ===== Helpers =====
1271  *
1272  */
1273 
decode_state(num: usize) -> State1274 fn decode_state(num: usize) -> State {
1275     State {
1276         is_open: num & OPEN_MASK == OPEN_MASK,
1277         num_messages: num & MAX_CAPACITY,
1278     }
1279 }
1280 
encode_state(state: &State) -> usize1281 fn encode_state(state: &State) -> usize {
1282     let mut num = state.num_messages;
1283 
1284     if state.is_open {
1285         num |= OPEN_MASK;
1286     }
1287 
1288     num
1289 }
1290