1 //! A multi-producer, single-consumer queue for sending values across
2 //! asynchronous tasks.
3 //!
4 //! Similarly to the `std`, channel creation provides [`Receiver`] and
5 //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6 //! read values out of the channel. If there is no message to read from the
7 //! channel, the current task will be notified when a new value is sent.
8 //! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9 //! the channel. If the channel is at capacity, the send will be rejected and
10 //! the task will be notified when additional capacity is available. In other
11 //! words, the channel provides backpressure.
12 //!
13 //! Unbounded channels are also available using the `unbounded` constructor.
14 //!
15 //! # Disconnection
16 //!
17 //! When all [`Sender`] handles have been dropped, it is no longer
18 //! possible to send values into the channel. This is considered the termination
19 //! event of the stream. As such, [`Receiver::poll_next`]
20 //! will return `Ok(Ready(None))`.
21 //!
22 //! If the [`Receiver`] handle is dropped, then messages can no longer
23 //! be read out of the channel. In this case, all further attempts to send will
24 //! result in an error.
25 //!
26 //! # Clean Shutdown
27 //!
28 //! If the [`Receiver`] is simply dropped, then it is possible for
29 //! there to be messages still in the channel that will not be processed. As
30 //! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31 //! receiver will first call `close`, which will prevent any further messages to
32 //! be sent into the channel. Then, the receiver consumes the channel to
33 //! completion, at which point the receiver can be dropped.
34 //!
35 //! [`Sender`]: struct.Sender.html
36 //! [`Receiver`]: struct.Receiver.html
37 //! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38 //! [`Receiver::poll_next`]:
39 //!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40 
41 // At the core, the channel uses an atomic FIFO queue for message passing. This
42 // queue is used as the primary coordination primitive. In order to enforce
43 // capacity limits and handle back pressure, a secondary FIFO queue is used to
44 // send parked task handles.
45 //
46 // The general idea is that the channel is created with a `buffer` size of `n`.
47 // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48 // slot to hold a message. This allows `Sender` to know for a fact that a send
49 // will succeed *before* starting to do the actual work of sending the value.
50 // Since most of this work is lock-free, once the work starts, it is impossible
51 // to safely revert.
52 //
53 // If the sender is unable to process a send operation, then the current
54 // task is parked and the handle is sent on the parked task queue.
55 //
56 // Note that the implementation guarantees that the channel capacity will never
57 // exceed the configured limit, however there is no *strict* guarantee that the
58 // receiver will wake up a parked task *immediately* when a slot becomes
59 // available. However, it will almost always unpark a task when a slot becomes
60 // available and it is *guaranteed* that a sender will be unparked when the
61 // message that caused the sender to become parked is read out of the channel.
62 //
63 // The steps for sending a message are roughly:
64 //
65 // 1) Increment the channel message count
66 // 2) If the channel is at capacity, push the task handle onto the wait queue
67 // 3) Push the message onto the message queue.
68 //
69 // The steps for receiving a message are roughly:
70 //
71 // 1) Pop a message from the message queue
72 // 2) Pop a task handle from the wait queue
73 // 3) Decrement the channel message count.
74 //
75 // It's important for the order of operations on lock-free structures to happen
76 // in reverse order between the sender and receiver. This makes the message
77 // queue the primary coordination structure and establishes the necessary
78 // happens-before semantics required for the acquire / release semantics used
79 // by the queue structure.
80 
81 use futures_core::stream::{FusedStream, Stream};
82 use futures_core::task::__internal::AtomicWaker;
83 use futures_core::task::{Context, Poll, Waker};
84 use std::fmt;
85 use std::pin::Pin;
86 use std::sync::atomic::AtomicUsize;
87 use std::sync::atomic::Ordering::SeqCst;
88 use std::sync::{Arc, Mutex};
89 use std::thread;
90 
91 use crate::mpsc::queue::Queue;
92 
93 mod queue;
94 #[cfg(feature = "sink")]
95 mod sink_impl;
96 
97 #[derive(Debug)]
98 struct UnboundedSenderInner<T> {
99     // Channel state shared between the sender and receiver.
100     inner: Arc<UnboundedInner<T>>,
101 }
102 
103 #[derive(Debug)]
104 struct BoundedSenderInner<T> {
105     // Channel state shared between the sender and receiver.
106     inner: Arc<BoundedInner<T>>,
107 
108     // Handle to the task that is blocked on this sender. This handle is sent
109     // to the receiver half in order to be notified when the sender becomes
110     // unblocked.
111     sender_task: Arc<Mutex<SenderTask>>,
112 
113     // `true` if the sender might be blocked. This is an optimization to avoid
114     // having to lock the mutex most of the time.
115     maybe_parked: bool,
116 }
117 
118 // We never project Pin<&mut SenderInner> to `Pin<&mut T>`
119 impl<T> Unpin for UnboundedSenderInner<T> {}
120 impl<T> Unpin for BoundedSenderInner<T> {}
121 
122 /// The transmission end of a bounded mpsc channel.
123 ///
124 /// This value is created by the [`channel`](channel) function.
125 #[derive(Debug)]
126 pub struct Sender<T>(Option<BoundedSenderInner<T>>);
127 
128 /// The transmission end of an unbounded mpsc channel.
129 ///
130 /// This value is created by the [`unbounded`](unbounded) function.
131 #[derive(Debug)]
132 pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
133 
134 trait AssertKinds: Send + Sync + Clone {}
135 impl AssertKinds for UnboundedSender<u32> {}
136 
137 /// The receiving end of a bounded mpsc channel.
138 ///
139 /// This value is created by the [`channel`](channel) function.
140 #[derive(Debug)]
141 pub struct Receiver<T> {
142     inner: Option<Arc<BoundedInner<T>>>,
143 }
144 
145 /// The receiving end of an unbounded mpsc channel.
146 ///
147 /// This value is created by the [`unbounded`](unbounded) function.
148 #[derive(Debug)]
149 pub struct UnboundedReceiver<T> {
150     inner: Option<Arc<UnboundedInner<T>>>,
151 }
152 
153 // `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
154 impl<T> Unpin for UnboundedReceiver<T> {}
155 
156 /// The error type for [`Sender`s](Sender) used as `Sink`s.
157 #[derive(Clone, Debug, PartialEq, Eq)]
158 pub struct SendError {
159     kind: SendErrorKind,
160 }
161 
162 /// The error type returned from [`try_send`](Sender::try_send).
163 #[derive(Clone, PartialEq, Eq)]
164 pub struct TrySendError<T> {
165     err: SendError,
166     val: T,
167 }
168 
169 #[derive(Clone, Debug, PartialEq, Eq)]
170 enum SendErrorKind {
171     Full,
172     Disconnected,
173 }
174 
175 /// The error type returned from [`try_next`](Receiver::try_next).
176 pub struct TryRecvError {
177     _priv: (),
178 }
179 
180 impl fmt::Display for SendError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result181     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182         if self.is_full() {
183             write!(f, "send failed because channel is full")
184         } else {
185             write!(f, "send failed because receiver is gone")
186         }
187     }
188 }
189 
190 impl std::error::Error for SendError {}
191 
192 impl SendError {
193     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool194     pub fn is_full(&self) -> bool {
195         match self.kind {
196             SendErrorKind::Full => true,
197             _ => false,
198         }
199     }
200 
201     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool202     pub fn is_disconnected(&self) -> bool {
203         match self.kind {
204             SendErrorKind::Disconnected => true,
205             _ => false,
206         }
207     }
208 }
209 
210 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result211     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212         f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
213     }
214 }
215 
216 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result217     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218         if self.is_full() {
219             write!(f, "send failed because channel is full")
220         } else {
221             write!(f, "send failed because receiver is gone")
222         }
223     }
224 }
225 
226 impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
227 
228 impl<T> TrySendError<T> {
229     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool230     pub fn is_full(&self) -> bool {
231         self.err.is_full()
232     }
233 
234     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool235     pub fn is_disconnected(&self) -> bool {
236         self.err.is_disconnected()
237     }
238 
239     /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T240     pub fn into_inner(self) -> T {
241         self.val
242     }
243 
244     /// Drops the message and converts into a `SendError`.
into_send_error(self) -> SendError245     pub fn into_send_error(self) -> SendError {
246         self.err
247     }
248 }
249 
250 impl fmt::Debug for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result251     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252         f.debug_tuple("TryRecvError").finish()
253     }
254 }
255 
256 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result257     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258         write!(f, "receiver channel is empty")
259     }
260 }
261 
262 impl std::error::Error for TryRecvError {}
263 
264 #[derive(Debug)]
265 struct UnboundedInner<T> {
266     // Internal channel state. Consists of the number of messages stored in the
267     // channel as well as a flag signalling that the channel is closed.
268     state: AtomicUsize,
269 
270     // Atomic, FIFO queue used to send messages to the receiver
271     message_queue: Queue<T>,
272 
273     // Number of senders in existence
274     num_senders: AtomicUsize,
275 
276     // Handle to the receiver's task.
277     recv_task: AtomicWaker,
278 }
279 
280 #[derive(Debug)]
281 struct BoundedInner<T> {
282     // Max buffer size of the channel. If `None` then the channel is unbounded.
283     buffer: usize,
284 
285     // Internal channel state. Consists of the number of messages stored in the
286     // channel as well as a flag signalling that the channel is closed.
287     state: AtomicUsize,
288 
289     // Atomic, FIFO queue used to send messages to the receiver
290     message_queue: Queue<T>,
291 
292     // Atomic, FIFO queue used to send parked task handles to the receiver.
293     parked_queue: Queue<Arc<Mutex<SenderTask>>>,
294 
295     // Number of senders in existence
296     num_senders: AtomicUsize,
297 
298     // Handle to the receiver's task.
299     recv_task: AtomicWaker,
300 }
301 
302 // Struct representation of `Inner::state`.
303 #[derive(Debug, Clone, Copy)]
304 struct State {
305     // `true` when the channel is open
306     is_open: bool,
307 
308     // Number of messages in the channel
309     num_messages: usize,
310 }
311 
312 // The `is_open` flag is stored in the left-most bit of `Inner::state`
313 const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);
314 
315 // When a new channel is created, it is created in the open state with no
316 // pending messages.
317 const INIT_STATE: usize = OPEN_MASK;
318 
319 // The maximum number of messages that a channel can track is `usize::max_value() >> 1`
320 const MAX_CAPACITY: usize = !(OPEN_MASK);
321 
322 // The maximum requested buffer size must be less than the maximum capacity of
323 // a channel. This is because each sender gets a guaranteed slot.
324 const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
325 
326 // Sent to the consumer to wake up blocked producers
327 #[derive(Debug)]
328 struct SenderTask {
329     task: Option<Waker>,
330     is_parked: bool,
331 }
332 
333 impl SenderTask {
new() -> Self334     fn new() -> Self {
335         Self { task: None, is_parked: false }
336     }
337 
notify(&mut self)338     fn notify(&mut self) {
339         self.is_parked = false;
340 
341         if let Some(task) = self.task.take() {
342             task.wake();
343         }
344     }
345 }
346 
347 /// Creates a bounded mpsc channel for communicating between asynchronous tasks.
348 ///
349 /// Being bounded, this channel provides backpressure to ensure that the sender
350 /// outpaces the receiver by only a limited amount. The channel's capacity is
351 /// equal to `buffer + num-senders`. In other words, each sender gets a
352 /// guaranteed slot in the channel capacity, and on top of that there are
353 /// `buffer` "first come, first serve" slots available to all senders.
354 ///
355 /// The [`Receiver`](Receiver) returned implements the
356 /// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
357 /// `Sink`.
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)358 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
359     // Check that the requested buffer size does not exceed the maximum buffer
360     // size permitted by the system.
361     assert!(buffer < MAX_BUFFER, "requested buffer size too large");
362 
363     let inner = Arc::new(BoundedInner {
364         buffer,
365         state: AtomicUsize::new(INIT_STATE),
366         message_queue: Queue::new(),
367         parked_queue: Queue::new(),
368         num_senders: AtomicUsize::new(1),
369         recv_task: AtomicWaker::new(),
370     });
371 
372     let tx = BoundedSenderInner {
373         inner: inner.clone(),
374         sender_task: Arc::new(Mutex::new(SenderTask::new())),
375         maybe_parked: false,
376     };
377 
378     let rx = Receiver { inner: Some(inner) };
379 
380     (Sender(Some(tx)), rx)
381 }
382 
383 /// Creates an unbounded mpsc channel for communicating between asynchronous
384 /// tasks.
385 ///
386 /// A `send` on this channel will always succeed as long as the receive half has
387 /// not been closed. If the receiver falls behind, messages will be arbitrarily
388 /// buffered.
389 ///
390 /// **Note** that the amount of available system memory is an implicit bound to
391 /// the channel. Using an `unbounded` channel has the ability of causing the
392 /// process to run out of memory. In this case, the process will be aborted.
unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)393 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
394     let inner = Arc::new(UnboundedInner {
395         state: AtomicUsize::new(INIT_STATE),
396         message_queue: Queue::new(),
397         num_senders: AtomicUsize::new(1),
398         recv_task: AtomicWaker::new(),
399     });
400 
401     let tx = UnboundedSenderInner { inner: inner.clone() };
402 
403     let rx = UnboundedReceiver { inner: Some(inner) };
404 
405     (UnboundedSender(Some(tx)), rx)
406 }
407 
408 /*
409  *
410  * ===== impl Sender =====
411  *
412  */
413 
414 impl<T> UnboundedSenderInner<T> {
poll_ready_nb(&self) -> Poll<Result<(), SendError>>415     fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
416         let state = decode_state(self.inner.state.load(SeqCst));
417         if state.is_open {
418             Poll::Ready(Ok(()))
419         } else {
420             Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
421         }
422     }
423 
424     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)425     fn queue_push_and_signal(&self, msg: T) {
426         // Push the message onto the message queue
427         self.inner.message_queue.push(msg);
428 
429         // Signal to the receiver that a message has been enqueued. If the
430         // receiver is parked, this will unpark the task.
431         self.inner.recv_task.wake();
432     }
433 
434     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>435     fn inc_num_messages(&self) -> Option<usize> {
436         let mut curr = self.inner.state.load(SeqCst);
437 
438         loop {
439             let mut state = decode_state(curr);
440 
441             // The receiver end closed the channel.
442             if !state.is_open {
443                 return None;
444             }
445 
446             // This probably is never hit? Odds are the process will run out of
447             // memory first. It may be worth to return something else in this
448             // case?
449             assert!(
450                 state.num_messages < MAX_CAPACITY,
451                 "buffer space \
452                     exhausted; sending this messages would overflow the state"
453             );
454 
455             state.num_messages += 1;
456 
457             let next = encode_state(&state);
458             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
459                 Ok(_) => return Some(state.num_messages),
460                 Err(actual) => curr = actual,
461             }
462         }
463     }
464 
465     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool466     fn same_receiver(&self, other: &Self) -> bool {
467         Arc::ptr_eq(&self.inner, &other.inner)
468     }
469 
470     /// Returns whether the sender send to this receiver.
is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool471     fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
472         Arc::ptr_eq(&self.inner, inner)
473     }
474 
475     /// Returns pointer to the Arc containing sender
476     ///
477     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const UnboundedInner<T>478     fn ptr(&self) -> *const UnboundedInner<T> {
479         &*self.inner
480     }
481 
482     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool483     fn is_closed(&self) -> bool {
484         !decode_state(self.inner.state.load(SeqCst)).is_open
485     }
486 
487     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)488     fn close_channel(&self) {
489         // There's no need to park this sender, its dropping,
490         // and we don't want to check for capacity, so skip
491         // that stuff from `do_send`.
492 
493         self.inner.set_closed();
494         self.inner.recv_task.wake();
495     }
496 }
497 
498 impl<T> BoundedSenderInner<T> {
499     /// Attempts to send a message on this `Sender`, returning the message
500     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>501     fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
502         // If the sender is currently blocked, reject the message
503         if !self.poll_unparked(None).is_ready() {
504             return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
505         }
506 
507         // The channel has capacity to accept the message, so send it
508         self.do_send_b(msg)
509     }
510 
511     // Do the send without failing.
512     // Can be called only by bounded sender.
do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>>513     fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
514         // Anyone calling do_send *should* make sure there is room first,
515         // but assert here for tests as a sanity check.
516         debug_assert!(self.poll_unparked(None).is_ready());
517 
518         // First, increment the number of messages contained by the channel.
519         // This operation will also atomically determine if the sender task
520         // should be parked.
521         //
522         // `None` is returned in the case that the channel has been closed by the
523         // receiver. This happens when `Receiver::close` is called or the
524         // receiver is dropped.
525         let park_self = match self.inc_num_messages() {
526             Some(num_messages) => {
527                 // Block if the current number of pending messages has exceeded
528                 // the configured buffer size
529                 num_messages > self.inner.buffer
530             }
531             None => {
532                 return Err(TrySendError {
533                     err: SendError { kind: SendErrorKind::Disconnected },
534                     val: msg,
535                 })
536             }
537         };
538 
539         // If the channel has reached capacity, then the sender task needs to
540         // be parked. This will send the task handle on the parked task queue.
541         //
542         // However, when `do_send` is called while dropping the `Sender`,
543         // `task::current()` can't be called safely. In this case, in order to
544         // maintain internal consistency, a blank message is pushed onto the
545         // parked task queue.
546         if park_self {
547             self.park();
548         }
549 
550         self.queue_push_and_signal(msg);
551 
552         Ok(())
553     }
554 
555     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)556     fn queue_push_and_signal(&self, msg: T) {
557         // Push the message onto the message queue
558         self.inner.message_queue.push(msg);
559 
560         // Signal to the receiver that a message has been enqueued. If the
561         // receiver is parked, this will unpark the task.
562         self.inner.recv_task.wake();
563     }
564 
565     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>566     fn inc_num_messages(&self) -> Option<usize> {
567         let mut curr = self.inner.state.load(SeqCst);
568 
569         loop {
570             let mut state = decode_state(curr);
571 
572             // The receiver end closed the channel.
573             if !state.is_open {
574                 return None;
575             }
576 
577             // This probably is never hit? Odds are the process will run out of
578             // memory first. It may be worth to return something else in this
579             // case?
580             assert!(
581                 state.num_messages < MAX_CAPACITY,
582                 "buffer space \
583                     exhausted; sending this messages would overflow the state"
584             );
585 
586             state.num_messages += 1;
587 
588             let next = encode_state(&state);
589             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
590                 Ok(_) => return Some(state.num_messages),
591                 Err(actual) => curr = actual,
592             }
593         }
594     }
595 
park(&mut self)596     fn park(&mut self) {
597         {
598             let mut sender = self.sender_task.lock().unwrap();
599             sender.task = None;
600             sender.is_parked = true;
601         }
602 
603         // Send handle over queue
604         let t = self.sender_task.clone();
605         self.inner.parked_queue.push(t);
606 
607         // Check to make sure we weren't closed after we sent our task on the
608         // queue
609         let state = decode_state(self.inner.state.load(SeqCst));
610         self.maybe_parked = state.is_open;
611     }
612 
613     /// Polls the channel to determine if there is guaranteed capacity to send
614     /// at least one item without waiting.
615     ///
616     /// # Return value
617     ///
618     /// This method returns:
619     ///
620     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
621     /// - `Poll::Pending` if the channel may not have
622     ///   capacity, in which case the current task is queued to be notified once
623     ///   capacity is available;
624     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>625     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
626         let state = decode_state(self.inner.state.load(SeqCst));
627         if !state.is_open {
628             return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
629         }
630 
631         self.poll_unparked(Some(cx)).map(Ok)
632     }
633 
634     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool635     fn same_receiver(&self, other: &Self) -> bool {
636         Arc::ptr_eq(&self.inner, &other.inner)
637     }
638 
639     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool640     fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
641         Arc::ptr_eq(&self.inner, receiver)
642     }
643 
644     /// Returns pointer to the Arc containing sender
645     ///
646     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const BoundedInner<T>647     fn ptr(&self) -> *const BoundedInner<T> {
648         &*self.inner
649     }
650 
651     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool652     fn is_closed(&self) -> bool {
653         !decode_state(self.inner.state.load(SeqCst)).is_open
654     }
655 
656     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)657     fn close_channel(&self) {
658         // There's no need to park this sender, its dropping,
659         // and we don't want to check for capacity, so skip
660         // that stuff from `do_send`.
661 
662         self.inner.set_closed();
663         self.inner.recv_task.wake();
664     }
665 
poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()>666     fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
667         // First check the `maybe_parked` variable. This avoids acquiring the
668         // lock in most cases
669         if self.maybe_parked {
670             // Get a lock on the task handle
671             let mut task = self.sender_task.lock().unwrap();
672 
673             if !task.is_parked {
674                 self.maybe_parked = false;
675                 return Poll::Ready(());
676             }
677 
678             // At this point, an unpark request is pending, so there will be an
679             // unpark sometime in the future. We just need to make sure that
680             // the correct task will be notified.
681             //
682             // Update the task in case the `Sender` has been moved to another
683             // task
684             task.task = cx.map(|cx| cx.waker().clone());
685 
686             Poll::Pending
687         } else {
688             Poll::Ready(())
689         }
690     }
691 }
692 
693 impl<T> Sender<T> {
694     /// Attempts to send a message on this `Sender`, returning the message
695     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>696     pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
697         if let Some(inner) = &mut self.0 {
698             inner.try_send(msg)
699         } else {
700             Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
701         }
702     }
703 
704     /// Send a message on the channel.
705     ///
706     /// This function should only be called after
707     /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
708     /// ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>709     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
710         self.try_send(msg).map_err(|e| e.err)
711     }
712 
713     /// Polls the channel to determine if there is guaranteed capacity to send
714     /// at least one item without waiting.
715     ///
716     /// # Return value
717     ///
718     /// This method returns:
719     ///
720     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
721     /// - `Poll::Pending` if the channel may not have
722     ///   capacity, in which case the current task is queued to be notified once
723     ///   capacity is available;
724     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>725     pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
726         let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
727         inner.poll_ready(cx)
728     }
729 
730     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool731     pub fn is_closed(&self) -> bool {
732         self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
733     }
734 
735     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&mut self)736     pub fn close_channel(&mut self) {
737         if let Some(inner) = &mut self.0 {
738             inner.close_channel();
739         }
740     }
741 
742     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)743     pub fn disconnect(&mut self) {
744         self.0 = None;
745     }
746 
747     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool748     pub fn same_receiver(&self, other: &Self) -> bool {
749         match (&self.0, &other.0) {
750             (Some(inner), Some(other)) => inner.same_receiver(other),
751             _ => false,
752         }
753     }
754 
755     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Receiver<T>) -> bool756     pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
757         match (&self.0, &receiver.inner) {
758             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
759             _ => false,
760         }
761     }
762 
763     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher,764     pub fn hash_receiver<H>(&self, hasher: &mut H)
765     where
766         H: std::hash::Hasher,
767     {
768         use std::hash::Hash;
769 
770         let ptr = self.0.as_ref().map(|inner| inner.ptr());
771         ptr.hash(hasher);
772     }
773 }
774 
775 impl<T> UnboundedSender<T> {
776     /// Check if the channel is ready to receive a message.
poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>>777     pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
778         let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
779         inner.poll_ready_nb()
780     }
781 
782     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool783     pub fn is_closed(&self) -> bool {
784         self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
785     }
786 
787     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)788     pub fn close_channel(&self) {
789         if let Some(inner) = &self.0 {
790             inner.close_channel();
791         }
792     }
793 
794     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)795     pub fn disconnect(&mut self) {
796         self.0 = None;
797     }
798 
799     // Do the send without parking current task.
do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>>800     fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
801         if let Some(inner) = &self.0 {
802             if inner.inc_num_messages().is_some() {
803                 inner.queue_push_and_signal(msg);
804                 return Ok(());
805             }
806         }
807 
808         Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
809     }
810 
811     /// Send a message on the channel.
812     ///
813     /// This method should only be called after `poll_ready` has been used to
814     /// verify that the channel is ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>815     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
816         self.do_send_nb(msg).map_err(|e| e.err)
817     }
818 
819     /// Sends a message along this channel.
820     ///
821     /// This is an unbounded sender, so this function differs from `Sink::send`
822     /// by ensuring the return type reflects that the channel is always ready to
823     /// receive messages.
unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>>824     pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
825         self.do_send_nb(msg)
826     }
827 
828     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool829     pub fn same_receiver(&self, other: &Self) -> bool {
830         match (&self.0, &other.0) {
831             (Some(inner), Some(other)) => inner.same_receiver(other),
832             _ => false,
833         }
834     }
835 
836     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool837     pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
838         match (&self.0, &receiver.inner) {
839             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
840             _ => false,
841         }
842     }
843 
844     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher,845     pub fn hash_receiver<H>(&self, hasher: &mut H)
846     where
847         H: std::hash::Hasher,
848     {
849         use std::hash::Hash;
850 
851         let ptr = self.0.as_ref().map(|inner| inner.ptr());
852         ptr.hash(hasher);
853     }
854 }
855 
856 impl<T> Clone for Sender<T> {
clone(&self) -> Self857     fn clone(&self) -> Self {
858         Self(self.0.clone())
859     }
860 }
861 
862 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self863     fn clone(&self) -> Self {
864         Self(self.0.clone())
865     }
866 }
867 
868 impl<T> Clone for UnboundedSenderInner<T> {
clone(&self) -> Self869     fn clone(&self) -> Self {
870         // Since this atomic op isn't actually guarding any memory and we don't
871         // care about any orderings besides the ordering on the single atomic
872         // variable, a relaxed ordering is acceptable.
873         let mut curr = self.inner.num_senders.load(SeqCst);
874 
875         loop {
876             // If the maximum number of senders has been reached, then fail
877             if curr == MAX_BUFFER {
878                 panic!("cannot clone `Sender` -- too many outstanding senders");
879             }
880 
881             debug_assert!(curr < MAX_BUFFER);
882 
883             let next = curr + 1;
884             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
885                 Ok(_) => {
886                     // The ABA problem doesn't matter here. We only care that the
887                     // number of senders never exceeds the maximum.
888                     return Self { inner: self.inner.clone() };
889                 }
890                 Err(actual) => curr = actual,
891             }
892         }
893     }
894 }
895 
896 impl<T> Clone for BoundedSenderInner<T> {
clone(&self) -> Self897     fn clone(&self) -> Self {
898         // Since this atomic op isn't actually guarding any memory and we don't
899         // care about any orderings besides the ordering on the single atomic
900         // variable, a relaxed ordering is acceptable.
901         let mut curr = self.inner.num_senders.load(SeqCst);
902 
903         loop {
904             // If the maximum number of senders has been reached, then fail
905             if curr == self.inner.max_senders() {
906                 panic!("cannot clone `Sender` -- too many outstanding senders");
907             }
908 
909             debug_assert!(curr < self.inner.max_senders());
910 
911             let next = curr + 1;
912             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
913                 Ok(_) => {
914                     // The ABA problem doesn't matter here. We only care that the
915                     // number of senders never exceeds the maximum.
916                     return Self {
917                         inner: self.inner.clone(),
918                         sender_task: Arc::new(Mutex::new(SenderTask::new())),
919                         maybe_parked: false,
920                     };
921                 }
922                 Err(actual) => curr = actual,
923             }
924         }
925     }
926 }
927 
928 impl<T> Drop for UnboundedSenderInner<T> {
drop(&mut self)929     fn drop(&mut self) {
930         // Ordering between variables don't matter here
931         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
932 
933         if prev == 1 {
934             self.close_channel();
935         }
936     }
937 }
938 
939 impl<T> Drop for BoundedSenderInner<T> {
drop(&mut self)940     fn drop(&mut self) {
941         // Ordering between variables don't matter here
942         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
943 
944         if prev == 1 {
945             self.close_channel();
946         }
947     }
948 }
949 
950 /*
951  *
952  * ===== impl Receiver =====
953  *
954  */
955 
956 impl<T> Receiver<T> {
957     /// Closes the receiving half of a channel, without dropping it.
958     ///
959     /// This prevents any further messages from being sent on the channel while
960     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)961     pub fn close(&mut self) {
962         if let Some(inner) = &mut self.inner {
963             inner.set_closed();
964 
965             // Wake up any threads waiting as they'll see that we've closed the
966             // channel and will continue on their merry way.
967             while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
968                 task.lock().unwrap().notify();
969             }
970         }
971     }
972 
973     /// Tries to receive the next message without notifying a context if empty.
974     ///
975     /// It is not recommended to call this function from inside of a future,
976     /// only when you've otherwise arranged to be notified when the channel is
977     /// no longer empty.
978     ///
979     /// This function returns:
980     /// * `Ok(Some(t))` when message is fetched
981     /// * `Ok(None)` when channel is closed and no messages left in the queue
982     /// * `Err(e)` when there are no messages available, but channel is not yet closed
try_next(&mut self) -> Result<Option<T>, TryRecvError>983     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
984         match self.next_message() {
985             Poll::Ready(msg) => Ok(msg),
986             Poll::Pending => Err(TryRecvError { _priv: () }),
987         }
988     }
989 
next_message(&mut self) -> Poll<Option<T>>990     fn next_message(&mut self) -> Poll<Option<T>> {
991         let inner = match self.inner.as_mut() {
992             None => return Poll::Ready(None),
993             Some(inner) => inner,
994         };
995         // Pop off a message
996         match unsafe { inner.message_queue.pop_spin() } {
997             Some(msg) => {
998                 // If there are any parked task handles in the parked queue,
999                 // pop one and unpark it.
1000                 self.unpark_one();
1001 
1002                 // Decrement number of messages
1003                 self.dec_num_messages();
1004 
1005                 Poll::Ready(Some(msg))
1006             }
1007             None => {
1008                 let state = decode_state(inner.state.load(SeqCst));
1009                 if state.is_closed() {
1010                     // If closed flag is set AND there are no pending messages
1011                     // it means end of stream
1012                     self.inner = None;
1013                     Poll::Ready(None)
1014                 } else {
1015                     // If queue is open, we need to return Pending
1016                     // to be woken up when new messages arrive.
1017                     // If queue is closed but num_messages is non-zero,
1018                     // it means that senders updated the state,
1019                     // but didn't put message to queue yet,
1020                     // so we need to park until sender unparks the task
1021                     // after queueing the message.
1022                     Poll::Pending
1023                 }
1024             }
1025         }
1026     }
1027 
1028     // Unpark a single task handle if there is one pending in the parked queue
unpark_one(&mut self)1029     fn unpark_one(&mut self) {
1030         if let Some(inner) = &mut self.inner {
1031             if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1032                 task.lock().unwrap().notify();
1033             }
1034         }
1035     }
1036 
dec_num_messages(&self)1037     fn dec_num_messages(&self) {
1038         if let Some(inner) = &self.inner {
1039             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1040             // unless there's underflow, and we know there's no underflow
1041             // because number of messages at this point is always > 0.
1042             inner.state.fetch_sub(1, SeqCst);
1043         }
1044     }
1045 }
1046 
1047 // The receiver does not ever take a Pin to the inner T
1048 impl<T> Unpin for Receiver<T> {}
1049 
1050 impl<T> FusedStream for Receiver<T> {
is_terminated(&self) -> bool1051     fn is_terminated(&self) -> bool {
1052         self.inner.is_none()
1053     }
1054 }
1055 
1056 impl<T> Stream for Receiver<T> {
1057     type Item = T;
1058 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>1059     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1060         // Try to read a message off of the message queue.
1061         match self.next_message() {
1062             Poll::Ready(msg) => {
1063                 if msg.is_none() {
1064                     self.inner = None;
1065                 }
1066                 Poll::Ready(msg)
1067             }
1068             Poll::Pending => {
1069                 // There are no messages to read, in this case, park.
1070                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1071                 // Check queue again after parking to prevent race condition:
1072                 // a message could be added to the queue after previous `next_message`
1073                 // before `register` call.
1074                 self.next_message()
1075             }
1076         }
1077     }
1078 }
1079 
1080 impl<T> Drop for Receiver<T> {
drop(&mut self)1081     fn drop(&mut self) {
1082         // Drain the channel of all pending messages
1083         self.close();
1084         if self.inner.is_some() {
1085             loop {
1086                 match self.next_message() {
1087                     Poll::Ready(Some(_)) => {}
1088                     Poll::Ready(None) => break,
1089                     Poll::Pending => {
1090                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1091 
1092                         // If the channel is closed, then there is no need to park.
1093                         if state.is_closed() {
1094                             break;
1095                         }
1096 
1097                         // TODO: Spinning isn't ideal, it might be worth
1098                         // investigating using a condvar or some other strategy
1099                         // here. That said, if this case is hit, then another thread
1100                         // is about to push the value into the queue and this isn't
1101                         // the only spinlock in the impl right now.
1102                         thread::yield_now();
1103                     }
1104                 }
1105             }
1106         }
1107     }
1108 }
1109 
1110 impl<T> UnboundedReceiver<T> {
1111     /// Closes the receiving half of a channel, without dropping it.
1112     ///
1113     /// This prevents any further messages from being sent on the channel while
1114     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)1115     pub fn close(&mut self) {
1116         if let Some(inner) = &mut self.inner {
1117             inner.set_closed();
1118         }
1119     }
1120 
1121     /// Tries to receive the next message without notifying a context if empty.
1122     ///
1123     /// It is not recommended to call this function from inside of a future,
1124     /// only when you've otherwise arranged to be notified when the channel is
1125     /// no longer empty.
1126     ///
1127     /// This function returns:
1128     /// * `Ok(Some(t))` when message is fetched
1129     /// * `Ok(None)` when channel is closed and no messages left in the queue
1130     /// * `Err(e)` when there are no messages available, but channel is not yet closed
try_next(&mut self) -> Result<Option<T>, TryRecvError>1131     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1132         match self.next_message() {
1133             Poll::Ready(msg) => Ok(msg),
1134             Poll::Pending => Err(TryRecvError { _priv: () }),
1135         }
1136     }
1137 
next_message(&mut self) -> Poll<Option<T>>1138     fn next_message(&mut self) -> Poll<Option<T>> {
1139         let inner = match self.inner.as_mut() {
1140             None => return Poll::Ready(None),
1141             Some(inner) => inner,
1142         };
1143         // Pop off a message
1144         match unsafe { inner.message_queue.pop_spin() } {
1145             Some(msg) => {
1146                 // Decrement number of messages
1147                 self.dec_num_messages();
1148 
1149                 Poll::Ready(Some(msg))
1150             }
1151             None => {
1152                 let state = decode_state(inner.state.load(SeqCst));
1153                 if state.is_closed() {
1154                     // If closed flag is set AND there are no pending messages
1155                     // it means end of stream
1156                     self.inner = None;
1157                     Poll::Ready(None)
1158                 } else {
1159                     // If queue is open, we need to return Pending
1160                     // to be woken up when new messages arrive.
1161                     // If queue is closed but num_messages is non-zero,
1162                     // it means that senders updated the state,
1163                     // but didn't put message to queue yet,
1164                     // so we need to park until sender unparks the task
1165                     // after queueing the message.
1166                     Poll::Pending
1167                 }
1168             }
1169         }
1170     }
1171 
dec_num_messages(&self)1172     fn dec_num_messages(&self) {
1173         if let Some(inner) = &self.inner {
1174             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1175             // unless there's underflow, and we know there's no underflow
1176             // because number of messages at this point is always > 0.
1177             inner.state.fetch_sub(1, SeqCst);
1178         }
1179     }
1180 }
1181 
1182 impl<T> FusedStream for UnboundedReceiver<T> {
is_terminated(&self) -> bool1183     fn is_terminated(&self) -> bool {
1184         self.inner.is_none()
1185     }
1186 }
1187 
1188 impl<T> Stream for UnboundedReceiver<T> {
1189     type Item = T;
1190 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>1191     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1192         // Try to read a message off of the message queue.
1193         match self.next_message() {
1194             Poll::Ready(msg) => {
1195                 if msg.is_none() {
1196                     self.inner = None;
1197                 }
1198                 Poll::Ready(msg)
1199             }
1200             Poll::Pending => {
1201                 // There are no messages to read, in this case, park.
1202                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1203                 // Check queue again after parking to prevent race condition:
1204                 // a message could be added to the queue after previous `next_message`
1205                 // before `register` call.
1206                 self.next_message()
1207             }
1208         }
1209     }
1210 }
1211 
1212 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)1213     fn drop(&mut self) {
1214         // Drain the channel of all pending messages
1215         self.close();
1216         if self.inner.is_some() {
1217             loop {
1218                 match self.next_message() {
1219                     Poll::Ready(Some(_)) => {}
1220                     Poll::Ready(None) => break,
1221                     Poll::Pending => {
1222                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1223 
1224                         // If the channel is closed, then there is no need to park.
1225                         if state.is_closed() {
1226                             break;
1227                         }
1228 
1229                         // TODO: Spinning isn't ideal, it might be worth
1230                         // investigating using a condvar or some other strategy
1231                         // here. That said, if this case is hit, then another thread
1232                         // is about to push the value into the queue and this isn't
1233                         // the only spinlock in the impl right now.
1234                         thread::yield_now();
1235                     }
1236                 }
1237             }
1238         }
1239     }
1240 }
1241 
1242 /*
1243  *
1244  * ===== impl Inner =====
1245  *
1246  */
1247 
1248 impl<T> UnboundedInner<T> {
1249     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1250     fn set_closed(&self) {
1251         let curr = self.state.load(SeqCst);
1252         if !decode_state(curr).is_open {
1253             return;
1254         }
1255 
1256         self.state.fetch_and(!OPEN_MASK, SeqCst);
1257     }
1258 }
1259 
1260 impl<T> BoundedInner<T> {
1261     // The return value is such that the total number of messages that can be
1262     // enqueued into the channel will never exceed MAX_CAPACITY
max_senders(&self) -> usize1263     fn max_senders(&self) -> usize {
1264         MAX_CAPACITY - self.buffer
1265     }
1266 
1267     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1268     fn set_closed(&self) {
1269         let curr = self.state.load(SeqCst);
1270         if !decode_state(curr).is_open {
1271             return;
1272         }
1273 
1274         self.state.fetch_and(!OPEN_MASK, SeqCst);
1275     }
1276 }
1277 
1278 unsafe impl<T: Send> Send for UnboundedInner<T> {}
1279 unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1280 
1281 unsafe impl<T: Send> Send for BoundedInner<T> {}
1282 unsafe impl<T: Send> Sync for BoundedInner<T> {}
1283 
1284 impl State {
is_closed(&self) -> bool1285     fn is_closed(&self) -> bool {
1286         !self.is_open && self.num_messages == 0
1287     }
1288 }
1289 
1290 /*
1291  *
1292  * ===== Helpers =====
1293  *
1294  */
1295 
decode_state(num: usize) -> State1296 fn decode_state(num: usize) -> State {
1297     State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1298 }
1299 
encode_state(state: &State) -> usize1300 fn encode_state(state: &State) -> usize {
1301     let mut num = state.num_messages;
1302 
1303     if state.is_open {
1304         num |= OPEN_MASK;
1305     }
1306 
1307     num
1308 }
1309