1 //! A multi-producer, single-consumer queue for sending values across
2 //! asynchronous tasks.
3 //!
//! Similarly to `std`'s channels, channel creation provides [`Receiver`] and
5 //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6 //! read values out of the channel. If there is no message to read from the
7 //! channel, the current task will be notified when a new value is sent.
8 //! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9 //! the channel. If the channel is at capacity, the send will be rejected and
10 //! the task will be notified when additional capacity is available. In other
11 //! words, the channel provides backpressure.
12 //!
13 //! Unbounded channels are also available using the `unbounded` constructor.
14 //!
15 //! # Disconnection
16 //!
17 //! When all [`Sender`] handles have been dropped, it is no longer
18 //! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Poll::Ready(None)`.
21 //!
22 //! If the [`Receiver`] handle is dropped, then messages can no longer
23 //! be read out of the channel. In this case, all further attempts to send will
24 //! result in an error.
25 //!
26 //! # Clean Shutdown
27 //!
28 //! If the [`Receiver`] is simply dropped, then it is possible for
29 //! there to be messages still in the channel that will not be processed. As
30 //! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver will first call `close`, which will prevent any further messages from
//! being sent into the channel. Then, the receiver consumes the channel to
33 //! completion, at which point the receiver can be dropped.
34 //!
35 //! [`Sender`]: struct.Sender.html
36 //! [`Receiver`]: struct.Receiver.html
37 //! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38 //! [`Receiver::poll_next`]:
39 //! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
41 // At the core, the channel uses an atomic FIFO queue for message passing. This
42 // queue is used as the primary coordination primitive. In order to enforce
43 // capacity limits and handle back pressure, a secondary FIFO queue is used to
44 // send parked task handles.
45 //
46 // The general idea is that the channel is created with a `buffer` size of `n`.
47 // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48 // slot to hold a message. This allows `Sender` to know for a fact that a send
49 // will succeed *before* starting to do the actual work of sending the value.
50 // Since most of this work is lock-free, once the work starts, it is impossible
51 // to safely revert.
52 //
53 // If the sender is unable to process a send operation, then the current
54 // task is parked and the handle is sent on the parked task queue.
55 //
56 // Note that the implementation guarantees that the channel capacity will never
57 // exceed the configured limit, however there is no *strict* guarantee that the
58 // receiver will wake up a parked task *immediately* when a slot becomes
59 // available. However, it will almost always unpark a task when a slot becomes
60 // available and it is *guaranteed* that a sender will be unparked when the
61 // message that caused the sender to become parked is read out of the channel.
62 //
63 // The steps for sending a message are roughly:
64 //
65 // 1) Increment the channel message count
66 // 2) If the channel is at capacity, push the task handle onto the wait queue
67 // 3) Push the message onto the message queue.
68 //
69 // The steps for receiving a message are roughly:
70 //
71 // 1) Pop a message from the message queue
72 // 2) Pop a task handle from the wait queue
73 // 3) Decrement the channel message count.
74 //
75 // It's important for the order of operations on lock-free structures to happen
76 // in reverse order between the sender and receiver. This makes the message
77 // queue the primary coordination structure and establishes the necessary
78 // happens-before semantics required for the acquire / release semantics used
79 // by the queue structure.
80
81 use futures_core::stream::{FusedStream, Stream};
82 use futures_core::task::{Context, Poll, Waker};
83 use futures_core::task::__internal::AtomicWaker;
84 use std::fmt;
85 use std::pin::Pin;
86 use std::sync::{Arc, Mutex};
87 use std::sync::atomic::AtomicUsize;
88 use std::sync::atomic::Ordering::SeqCst;
89
90 use crate::mpsc::queue::Queue;
91
92 mod queue;
93 #[cfg(feature = "sink")]
94 mod sink_impl;
95
// Sender-side handle to an unbounded channel. `UnboundedSender` wraps this in
// an `Option`.
#[derive(Debug)]
struct UnboundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<UnboundedInner<T>>,
}
101
// Sender-side handle to a bounded channel. `Sender` wraps this in an `Option`.
#[derive(Debug)]
struct BoundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<BoundedInner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_task: Arc<Mutex<SenderTask>>,

    // `true` if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time: `poll_unparked` only locks
    // `sender_task` when this flag is set.
    maybe_parked: bool,
}
116
// We never project `Pin<&mut UnboundedSenderInner<T>>` or
// `Pin<&mut BoundedSenderInner<T>>` to `Pin<&mut T>`, so both are declared
// `Unpin` with no bound on `T`.
impl<T> Unpin for UnboundedSenderInner<T> {}
impl<T> Unpin for BoundedSenderInner<T> {}
120
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
// The wrapped option is `None` once `disconnect` has been called; a sender in
// that state reports itself as closed and rejects sends as disconnected.
#[derive(Debug)]
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
126
/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
// The wrapped option is `None` once `disconnect` has been called; a sender in
// that state reports itself as closed and rejects sends as disconnected.
#[derive(Debug)]
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
132
// Compile-time check only: this impl fails to compile unless
// `UnboundedSender<u32>` is `Send + Sync + Clone`.
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
135
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Receiver<T> {
    // Channel state shared with every `Sender`.
    // NOTE(review): presumably reset to `None` once the stream has
    // terminated -- confirm against the receive path.
    inner: Option<Arc<BoundedInner<T>>>,
}
143
/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
    // Channel state shared with every `UnboundedSender`.
    // NOTE(review): presumably reset to `None` once the stream has
    // terminated -- confirm against the receive path.
    inner: Option<Arc<UnboundedInner<T>>>,
}
151
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`, so the
// receiver is declared `Unpin` with no bound on `T`.
impl<T> Unpin for UnboundedReceiver<T> {}
154
/// The error type for [`Sender`s](Sender) used as `Sink`s.
///
/// Use [`is_full`](SendError::is_full) and
/// [`is_disconnected`](SendError::is_disconnected) to find out why the send
/// failed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    // Reason the send failed; kept private and exposed through the predicates.
    kind: SendErrorKind,
}
160
/// The error type returned from [`try_send`](Sender::try_send).
///
/// Carries the rejected message, which can be recovered with
/// [`into_inner`](TrySendError::into_inner).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    // Reason the send failed.
    err: SendError,
    // The message that could not be sent.
    val: T,
}
167
// Private discriminant behind `SendError`.
#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    // The sender is still parked, so the channel has no room for the message.
    Full,
    // The channel is closed, or this sender has been disconnected.
    Disconnected,
}
173
/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    // Private unit field: keeps the type from being constructed outside this
    // module.
    _priv: (),
}
178
179 impl fmt::Display for SendError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result180 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181 if self.is_full() {
182 write!(f, "send failed because channel is full")
183 } else {
184 write!(f, "send failed because receiver is gone")
185 }
186 }
187 }
188
// Marker impl: relies on the `Debug` and `Display` impls and overrides nothing.
impl std::error::Error for SendError {}
190
191 impl SendError {
192 /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool193 pub fn is_full(&self) -> bool {
194 match self.kind {
195 SendErrorKind::Full => true,
196 _ => false,
197 }
198 }
199
200 /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool201 pub fn is_disconnected(&self) -> bool {
202 match self.kind {
203 SendErrorKind::Disconnected => true,
204 _ => false,
205 }
206 }
207 }
208
209 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 f.debug_struct("TrySendError")
212 .field("kind", &self.err.kind)
213 .finish()
214 }
215 }
216
217 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 if self.is_full() {
220 write!(f, "send failed because channel is full")
221 } else {
222 write!(f, "send failed because receiver is gone")
223 }
224 }
225 }
226
// Marker impl: relies on the `Debug` and `Display` impls and overrides nothing.
impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
228
229 impl<T> TrySendError<T> {
230 /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool231 pub fn is_full(&self) -> bool {
232 self.err.is_full()
233 }
234
235 /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool236 pub fn is_disconnected(&self) -> bool {
237 self.err.is_disconnected()
238 }
239
240 /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T241 pub fn into_inner(self) -> T {
242 self.val
243 }
244
245 /// Drops the message and converts into a `SendError`.
into_send_error(self) -> SendError246 pub fn into_send_error(self) -> SendError {
247 self.err
248 }
249 }
250
251 impl fmt::Debug for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253 f.debug_tuple("TryRecvError")
254 .finish()
255 }
256 }
257
258 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result259 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260 write!(f, "receiver channel is empty")
261 }
262 }
263
// Marker impl: relies on the `Debug` and `Display` impls and overrides nothing.
impl std::error::Error for TryRecvError {}
265
// State of an unbounded channel, shared through an `Arc` by the
// `UnboundedSender`s and the `UnboundedReceiver`.
#[derive(Debug)]
struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
281
// State of a bounded channel, shared through an `Arc` by the `Sender`s and the
// `Receiver`.
#[derive(Debug)]
struct BoundedInner<T> {
    // Max buffer size of the channel. A sender parks itself when the message
    // count after its send exceeds this value.
    buffer: usize,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
303
// Decoded form of the `state` word stored in `BoundedInner` and
// `UnboundedInner`.
#[derive(Debug, Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}
313
// The `is_open` flag lives in the most significant bit of the `state` word;
// this mask has only that bit set.
const OPEN_MASK: usize = !(usize::max_value() >> 1);

// A freshly created channel is open and holds no messages.
const INIT_STATE: usize = OPEN_MASK;

// Every bit below the open flag counts messages, so a channel can track at
// most `usize::max_value() >> 1` of them.
const MAX_CAPACITY: usize = usize::max_value() >> 1;

// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY / 2;
327
// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderTask {
    // Waker of the task currently waiting on the sender, if one was registered.
    task: Option<Waker>,
    // `true` from the moment the sender parks until `notify` is called.
    is_parked: bool,
}
334
335 impl SenderTask {
new() -> Self336 fn new() -> Self {
337 SenderTask {
338 task: None,
339 is_parked: false,
340 }
341 }
342
notify(&mut self)343 fn notify(&mut self) {
344 self.is_parked = false;
345
346 if let Some(task) = self.task.take() {
347 task.wake();
348 }
349 }
350 }
351
352 /// Creates a bounded mpsc channel for communicating between asynchronous tasks.
353 ///
354 /// Being bounded, this channel provides backpressure to ensure that the sender
355 /// outpaces the receiver by only a limited amount. The channel's capacity is
356 /// equal to `buffer + num-senders`. In other words, each sender gets a
357 /// guaranteed slot in the channel capacity, and on top of that there are
358 /// `buffer` "first come, first serve" slots available to all senders.
359 ///
360 /// The [`Receiver`](Receiver) returned implements the
361 /// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
362 /// `Sink`.
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)363 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
364 // Check that the requested buffer size does not exceed the maximum buffer
365 // size permitted by the system.
366 assert!(buffer < MAX_BUFFER, "requested buffer size too large");
367
368 let inner = Arc::new(BoundedInner {
369 buffer,
370 state: AtomicUsize::new(INIT_STATE),
371 message_queue: Queue::new(),
372 parked_queue: Queue::new(),
373 num_senders: AtomicUsize::new(1),
374 recv_task: AtomicWaker::new(),
375 });
376
377 let tx = BoundedSenderInner {
378 inner: inner.clone(),
379 sender_task: Arc::new(Mutex::new(SenderTask::new())),
380 maybe_parked: false,
381 };
382
383 let rx = Receiver {
384 inner: Some(inner),
385 };
386
387 (Sender(Some(tx)), rx)
388 }
389
390 /// Creates an unbounded mpsc channel for communicating between asynchronous
391 /// tasks.
392 ///
393 /// A `send` on this channel will always succeed as long as the receive half has
394 /// not been closed. If the receiver falls behind, messages will be arbitrarily
395 /// buffered.
396 ///
397 /// **Note** that the amount of available system memory is an implicit bound to
398 /// the channel. Using an `unbounded` channel has the ability of causing the
399 /// process to run out of memory. In this case, the process will be aborted.
unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)400 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
401
402 let inner = Arc::new(UnboundedInner {
403 state: AtomicUsize::new(INIT_STATE),
404 message_queue: Queue::new(),
405 num_senders: AtomicUsize::new(1),
406 recv_task: AtomicWaker::new(),
407 });
408
409 let tx = UnboundedSenderInner {
410 inner: inner.clone(),
411 };
412
413 let rx = UnboundedReceiver {
414 inner: Some(inner),
415 };
416
417 (UnboundedSender(Some(tx)), rx)
418 }
419
420 /*
421 *
422 * ===== impl Sender =====
423 *
424 */
425
426 impl<T> UnboundedSenderInner<T> {
poll_ready_nb(&self) -> Poll<Result<(), SendError>>427 fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
428 let state = decode_state(self.inner.state.load(SeqCst));
429 if state.is_open {
430 Poll::Ready(Ok(()))
431 } else {
432 Poll::Ready(Err(SendError {
433 kind: SendErrorKind::Disconnected,
434 }))
435 }
436 }
437
438
439 // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)440 fn queue_push_and_signal(&self, msg: T) {
441 // Push the message onto the message queue
442 self.inner.message_queue.push(msg);
443
444 // Signal to the receiver that a message has been enqueued. If the
445 // receiver is parked, this will unpark the task.
446 self.inner.recv_task.wake();
447 }
448
449 // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>450 fn inc_num_messages(&self) -> Option<usize> {
451 let mut curr = self.inner.state.load(SeqCst);
452
453 loop {
454 let mut state = decode_state(curr);
455
456 // The receiver end closed the channel.
457 if !state.is_open {
458 return None;
459 }
460
461 // This probably is never hit? Odds are the process will run out of
462 // memory first. It may be worth to return something else in this
463 // case?
464 assert!(state.num_messages < MAX_CAPACITY, "buffer space \
465 exhausted; sending this messages would overflow the state");
466
467 state.num_messages += 1;
468
469 let next = encode_state(&state);
470 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
471 Ok(_) => {
472 return Some(state.num_messages)
473 }
474 Err(actual) => curr = actual,
475 }
476 }
477 }
478
479 /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool480 fn same_receiver(&self, other: &Self) -> bool {
481 Arc::ptr_eq(&self.inner, &other.inner)
482 }
483
484 /// Returns pointer to the Arc containing sender
485 ///
486 /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const UnboundedInner<T>487 fn ptr(&self) -> *const UnboundedInner<T> {
488 &*self.inner
489 }
490
491 /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool492 fn is_closed(&self) -> bool {
493 !decode_state(self.inner.state.load(SeqCst)).is_open
494 }
495
496 /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)497 fn close_channel(&self) {
498 // There's no need to park this sender, its dropping,
499 // and we don't want to check for capacity, so skip
500 // that stuff from `do_send`.
501
502 self.inner.set_closed();
503 self.inner.recv_task.wake();
504 }
505 }
506
507 impl<T> BoundedSenderInner<T> {
508 /// Attempts to send a message on this `Sender`, returning the message
509 /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>510 fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
511 // If the sender is currently blocked, reject the message
512 if !self.poll_unparked(None).is_ready() {
513 return Err(TrySendError {
514 err: SendError {
515 kind: SendErrorKind::Full,
516 },
517 val: msg,
518 });
519 }
520
521 // The channel has capacity to accept the message, so send it
522 self.do_send_b(msg)
523 }
524
525 // Do the send without failing.
526 // Can be called only by bounded sender.
527 #[allow(clippy::debug_assert_with_mut_call)]
do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>>528 fn do_send_b(&mut self, msg: T)
529 -> Result<(), TrySendError<T>>
530 {
531 // Anyone callig do_send *should* make sure there is room first,
532 // but assert here for tests as a sanity check.
533 debug_assert!(self.poll_unparked(None).is_ready());
534
535 // First, increment the number of messages contained by the channel.
536 // This operation will also atomically determine if the sender task
537 // should be parked.
538 //
539 // `None` is returned in the case that the channel has been closed by the
540 // receiver. This happens when `Receiver::close` is called or the
541 // receiver is dropped.
542 let park_self = match self.inc_num_messages() {
543 Some(num_messages) => {
544 // Block if the current number of pending messages has exceeded
545 // the configured buffer size
546 num_messages > self.inner.buffer
547 }
548 None => return Err(TrySendError {
549 err: SendError {
550 kind: SendErrorKind::Disconnected,
551 },
552 val: msg,
553 }),
554 };
555
556 // If the channel has reached capacity, then the sender task needs to
557 // be parked. This will send the task handle on the parked task queue.
558 //
559 // However, when `do_send` is called while dropping the `Sender`,
560 // `task::current()` can't be called safely. In this case, in order to
561 // maintain internal consistency, a blank message is pushed onto the
562 // parked task queue.
563 if park_self {
564 self.park();
565 }
566
567 self.queue_push_and_signal(msg);
568
569 Ok(())
570 }
571
572 // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)573 fn queue_push_and_signal(&self, msg: T) {
574 // Push the message onto the message queue
575 self.inner.message_queue.push(msg);
576
577 // Signal to the receiver that a message has been enqueued. If the
578 // receiver is parked, this will unpark the task.
579 self.inner.recv_task.wake();
580 }
581
582 // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>583 fn inc_num_messages(&self) -> Option<usize> {
584 let mut curr = self.inner.state.load(SeqCst);
585
586 loop {
587 let mut state = decode_state(curr);
588
589 // The receiver end closed the channel.
590 if !state.is_open {
591 return None;
592 }
593
594 // This probably is never hit? Odds are the process will run out of
595 // memory first. It may be worth to return something else in this
596 // case?
597 assert!(state.num_messages < MAX_CAPACITY, "buffer space \
598 exhausted; sending this messages would overflow the state");
599
600 state.num_messages += 1;
601
602 let next = encode_state(&state);
603 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
604 Ok(_) => {
605 return Some(state.num_messages)
606 }
607 Err(actual) => curr = actual,
608 }
609 }
610 }
611
park(&mut self)612 fn park(&mut self) {
613 {
614 let mut sender = self.sender_task.lock().unwrap();
615 sender.task = None;
616 sender.is_parked = true;
617 }
618
619 // Send handle over queue
620 let t = self.sender_task.clone();
621 self.inner.parked_queue.push(t);
622
623 // Check to make sure we weren't closed after we sent our task on the
624 // queue
625 let state = decode_state(self.inner.state.load(SeqCst));
626 self.maybe_parked = state.is_open;
627 }
628
629 /// Polls the channel to determine if there is guaranteed capacity to send
630 /// at least one item without waiting.
631 ///
632 /// # Return value
633 ///
634 /// This method returns:
635 ///
636 /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
637 /// - `Poll::Pending` if the channel may not have
638 /// capacity, in which case the current task is queued to be notified once
639 /// capacity is available;
640 /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), SendError>>641 fn poll_ready(
642 &mut self,
643 cx: &mut Context<'_>,
644 ) -> Poll<Result<(), SendError>> {
645 let state = decode_state(self.inner.state.load(SeqCst));
646 if !state.is_open {
647 return Poll::Ready(Err(SendError {
648 kind: SendErrorKind::Disconnected,
649 }));
650 }
651
652 self.poll_unparked(Some(cx)).map(Ok)
653 }
654
655 /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool656 fn same_receiver(&self, other: &Self) -> bool {
657 Arc::ptr_eq(&self.inner, &other.inner)
658 }
659
660 /// Returns pointer to the Arc containing sender
661 ///
662 /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const BoundedInner<T>663 fn ptr(&self) -> *const BoundedInner<T> {
664 &*self.inner
665 }
666
667 /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool668 fn is_closed(&self) -> bool {
669 !decode_state(self.inner.state.load(SeqCst)).is_open
670 }
671
672 /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)673 fn close_channel(&self) {
674 // There's no need to park this sender, its dropping,
675 // and we don't want to check for capacity, so skip
676 // that stuff from `do_send`.
677
678 self.inner.set_closed();
679 self.inner.recv_task.wake();
680 }
681
poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()>682 fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
683 // First check the `maybe_parked` variable. This avoids acquiring the
684 // lock in most cases
685 if self.maybe_parked {
686 // Get a lock on the task handle
687 let mut task = self.sender_task.lock().unwrap();
688
689 if !task.is_parked {
690 self.maybe_parked = false;
691 return Poll::Ready(())
692 }
693
694 // At this point, an unpark request is pending, so there will be an
695 // unpark sometime in the future. We just need to make sure that
696 // the correct task will be notified.
697 //
698 // Update the task in case the `Sender` has been moved to another
699 // task
700 task.task = cx.map(|cx| cx.waker().clone());
701
702 Poll::Pending
703 } else {
704 Poll::Ready(())
705 }
706 }
707 }
708
709 impl<T> Sender<T> {
710 /// Attempts to send a message on this `Sender`, returning the message
711 /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>712 pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
713 if let Some(inner) = &mut self.0 {
714 inner.try_send(msg)
715 } else {
716 Err(TrySendError {
717 err: SendError {
718 kind: SendErrorKind::Disconnected,
719 },
720 val: msg,
721 })
722 }
723 }
724
725 /// Send a message on the channel.
726 ///
727 /// This function should only be called after
728 /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
729 /// ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>730 pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
731 self.try_send(msg)
732 .map_err(|e| e.err)
733 }
734
735 /// Polls the channel to determine if there is guaranteed capacity to send
736 /// at least one item without waiting.
737 ///
738 /// # Return value
739 ///
740 /// This method returns:
741 ///
742 /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
743 /// - `Poll::Pending` if the channel may not have
744 /// capacity, in which case the current task is queued to be notified once
745 /// capacity is available;
746 /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), SendError>>747 pub fn poll_ready(
748 &mut self,
749 cx: &mut Context<'_>,
750 ) -> Poll<Result<(), SendError>> {
751 let inner = self.0.as_mut().ok_or(SendError {
752 kind: SendErrorKind::Disconnected,
753 })?;
754 inner.poll_ready(cx)
755 }
756
757 /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool758 pub fn is_closed(&self) -> bool {
759 self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
760 }
761
762 /// Closes this channel from the sender side, preventing any new messages.
close_channel(&mut self)763 pub fn close_channel(&mut self) {
764 if let Some(inner) = &mut self.0 {
765 inner.close_channel();
766 }
767 }
768
769 /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)770 pub fn disconnect(&mut self) {
771 self.0 = None;
772 }
773
774 /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool775 pub fn same_receiver(&self, other: &Self) -> bool {
776 match (&self.0, &other.0) {
777 (Some(inner), Some(other)) => inner.same_receiver(other),
778 _ => false,
779 }
780 }
781
782 /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher783 pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
784 use std::hash::Hash;
785
786 let ptr = self.0.as_ref().map(|inner| inner.ptr());
787 ptr.hash(hasher);
788 }
789 }
790
791 impl<T> UnboundedSender<T> {
792 /// Check if the channel is ready to receive a message.
poll_ready( &self, _: &mut Context<'_>, ) -> Poll<Result<(), SendError>>793 pub fn poll_ready(
794 &self,
795 _: &mut Context<'_>,
796 ) -> Poll<Result<(), SendError>> {
797 let inner = self.0.as_ref().ok_or(SendError {
798 kind: SendErrorKind::Disconnected,
799 })?;
800 inner.poll_ready_nb()
801 }
802
803 /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool804 pub fn is_closed(&self) -> bool {
805 self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
806 }
807
808 /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)809 pub fn close_channel(&self) {
810 if let Some(inner) = &self.0 {
811 inner.close_channel();
812 }
813 }
814
815 /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)816 pub fn disconnect(&mut self) {
817 self.0 = None;
818 }
819
820 // Do the send without parking current task.
do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>>821 fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
822 if let Some(inner) = &self.0 {
823 if inner.inc_num_messages().is_some() {
824 inner.queue_push_and_signal(msg);
825 return Ok(());
826 }
827 }
828
829 Err(TrySendError {
830 err: SendError {
831 kind: SendErrorKind::Disconnected,
832 },
833 val: msg,
834 })
835 }
836
837 /// Send a message on the channel.
838 ///
839 /// This method should only be called after `poll_ready` has been used to
840 /// verify that the channel is ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>841 pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
842 self.do_send_nb(msg)
843 .map_err(|e| e.err)
844 }
845
846 /// Sends a message along this channel.
847 ///
848 /// This is an unbounded sender, so this function differs from `Sink::send`
849 /// by ensuring the return type reflects that the channel is always ready to
850 /// receive messages.
unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>>851 pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
852 self.do_send_nb(msg)
853 }
854
855 /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool856 pub fn same_receiver(&self, other: &Self) -> bool {
857 match (&self.0, &other.0) {
858 (Some(inner), Some(other)) => inner.same_receiver(other),
859 _ => false,
860 }
861 }
862
863 /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher864 pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
865 use std::hash::Hash;
866
867 let ptr = self.0.as_ref().map(|inner| inner.ptr());
868 ptr.hash(hasher);
869 }
870 }
871
872 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>873 fn clone(&self) -> Sender<T> {
874 Sender(self.0.clone())
875 }
876 }
877
878 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> UnboundedSender<T>879 fn clone(&self) -> UnboundedSender<T> {
880 UnboundedSender(self.0.clone())
881 }
882 }
883
884 impl<T> Clone for UnboundedSenderInner<T> {
clone(&self) -> UnboundedSenderInner<T>885 fn clone(&self) -> UnboundedSenderInner<T> {
886 // Since this atomic op isn't actually guarding any memory and we don't
887 // care about any orderings besides the ordering on the single atomic
888 // variable, a relaxed ordering is acceptable.
889 let mut curr = self.inner.num_senders.load(SeqCst);
890
891 loop {
892 // If the maximum number of senders has been reached, then fail
893 if curr == MAX_BUFFER {
894 panic!("cannot clone `Sender` -- too many outstanding senders");
895 }
896
897 debug_assert!(curr < MAX_BUFFER);
898
899 let next = curr + 1;
900 let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
901
902 // The ABA problem doesn't matter here. We only care that the
903 // number of senders never exceeds the maximum.
904 if actual == curr {
905 return UnboundedSenderInner {
906 inner: self.inner.clone(),
907 };
908 }
909
910 curr = actual;
911 }
912 }
913 }
914
915 impl<T> Clone for BoundedSenderInner<T> {
clone(&self) -> BoundedSenderInner<T>916 fn clone(&self) -> BoundedSenderInner<T> {
917 // Since this atomic op isn't actually guarding any memory and we don't
918 // care about any orderings besides the ordering on the single atomic
919 // variable, a relaxed ordering is acceptable.
920 let mut curr = self.inner.num_senders.load(SeqCst);
921
922 loop {
923 // If the maximum number of senders has been reached, then fail
924 if curr == self.inner.max_senders() {
925 panic!("cannot clone `Sender` -- too many outstanding senders");
926 }
927
928 debug_assert!(curr < self.inner.max_senders());
929
930 let next = curr + 1;
931 let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
932
933 // The ABA problem doesn't matter here. We only care that the
934 // number of senders never exceeds the maximum.
935 if actual == curr {
936 return BoundedSenderInner {
937 inner: self.inner.clone(),
938 sender_task: Arc::new(Mutex::new(SenderTask::new())),
939 maybe_parked: false,
940 };
941 }
942
943 curr = actual;
944 }
945 }
946 }
947
948 impl<T> Drop for UnboundedSenderInner<T> {
drop(&mut self)949 fn drop(&mut self) {
950 // Ordering between variables don't matter here
951 let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
952
953 if prev == 1 {
954 self.close_channel();
955 }
956 }
957 }
958
959 impl<T> Drop for BoundedSenderInner<T> {
drop(&mut self)960 fn drop(&mut self) {
961 // Ordering between variables don't matter here
962 let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
963
964 if prev == 1 {
965 self.close_channel();
966 }
967 }
968 }
969
970 /*
971 *
972 * ===== impl Receiver =====
973 *
974 */
975
976 impl<T> Receiver<T> {
977 /// Closes the receiving half of a channel, without dropping it.
978 ///
979 /// This prevents any further messages from being sent on the channel while
980 /// still enabling the receiver to drain messages that are buffered.
close(&mut self)981 pub fn close(&mut self) {
982 if let Some(inner) = &mut self.inner {
983 inner.set_closed();
984
985 // Wake up any threads waiting as they'll see that we've closed the
986 // channel and will continue on their merry way.
987 while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
988 task.lock().unwrap().notify();
989 }
990 }
991 }
992
993 /// Tries to receive the next message without notifying a context if empty.
994 ///
995 /// It is not recommended to call this function from inside of a future,
996 /// only when you've otherwise arranged to be notified when the channel is
997 /// no longer empty.
998 ///
999 /// This function will panic if called after `try_next` or `poll_next` has
1000 /// returned `None`.
try_next(&mut self) -> Result<Option<T>, TryRecvError>1001 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1002 match self.next_message() {
1003 Poll::Ready(msg) => {
1004 Ok(msg)
1005 },
1006 Poll::Pending => Err(TryRecvError { _priv: () }),
1007 }
1008 }
1009
next_message(&mut self) -> Poll<Option<T>>1010 fn next_message(&mut self) -> Poll<Option<T>> {
1011 let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
1012 // Pop off a message
1013 match unsafe { inner.message_queue.pop_spin() } {
1014 Some(msg) => {
1015 // If there are any parked task handles in the parked queue,
1016 // pop one and unpark it.
1017 self.unpark_one();
1018
1019 // Decrement number of messages
1020 self.dec_num_messages();
1021
1022 Poll::Ready(Some(msg))
1023 }
1024 None => {
1025 let state = decode_state(inner.state.load(SeqCst));
1026 if state.is_open || state.num_messages != 0 {
1027 // If queue is open, we need to return Pending
1028 // to be woken up when new messages arrive.
1029 // If queue is closed but num_messages is non-zero,
1030 // it means that senders updated the state,
1031 // but didn't put message to queue yet,
1032 // so we need to park until sender unparks the task
1033 // after queueing the message.
1034 Poll::Pending
1035 } else {
1036 // If closed flag is set AND there are no pending messages
1037 // it means end of stream
1038 self.inner = None;
1039 Poll::Ready(None)
1040 }
1041 }
1042 }
1043 }
1044
1045 // Unpark a single task handle if there is one pending in the parked queue
unpark_one(&mut self)1046 fn unpark_one(&mut self) {
1047 if let Some(inner) = &mut self.inner {
1048 if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1049 task.lock().unwrap().notify();
1050 }
1051 }
1052 }
1053
dec_num_messages(&self)1054 fn dec_num_messages(&self) {
1055 if let Some(inner) = &self.inner {
1056 // OPEN_MASK is highest bit, so it's unaffected by subtraction
1057 // unless there's underflow, and we know there's no underflow
1058 // because number of messages at this point is always > 0.
1059 inner.state.fetch_sub(1, SeqCst);
1060 }
1061 }
1062 }
1063
1064 // The receiver does not ever take a Pin to the inner T
1065 impl<T> Unpin for Receiver<T> {}
1066
1067 impl<T> FusedStream for Receiver<T> {
is_terminated(&self) -> bool1068 fn is_terminated(&self) -> bool {
1069 self.inner.is_none()
1070 }
1071 }
1072
1073 impl<T> Stream for Receiver<T> {
1074 type Item = T;
1075
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<T>>1076 fn poll_next(
1077 mut self: Pin<&mut Self>,
1078 cx: &mut Context<'_>,
1079 ) -> Poll<Option<T>> {
1080 // Try to read a message off of the message queue.
1081 match self.next_message() {
1082 Poll::Ready(msg) => {
1083 if msg.is_none() {
1084 self.inner = None;
1085 }
1086 Poll::Ready(msg)
1087 },
1088 Poll::Pending => {
1089 // There are no messages to read, in this case, park.
1090 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1091 // Check queue again after parking to prevent race condition:
1092 // a message could be added to the queue after previous `next_message`
1093 // before `register` call.
1094 self.next_message()
1095 }
1096 }
1097 }
1098 }
1099
1100 impl<T> Drop for Receiver<T> {
drop(&mut self)1101 fn drop(&mut self) {
1102 // Drain the channel of all pending messages
1103 self.close();
1104 if self.inner.is_some() {
1105 while let Poll::Ready(Some(..)) = self.next_message() {
1106 // ...
1107 }
1108 }
1109 }
1110 }
1111
1112 impl<T> UnboundedReceiver<T> {
1113 /// Closes the receiving half of a channel, without dropping it.
1114 ///
1115 /// This prevents any further messages from being sent on the channel while
1116 /// still enabling the receiver to drain messages that are buffered.
close(&mut self)1117 pub fn close(&mut self) {
1118 if let Some(inner) = &mut self.inner {
1119 inner.set_closed();
1120 }
1121 }
1122
1123 /// Tries to receive the next message without notifying a context if empty.
1124 ///
1125 /// It is not recommended to call this function from inside of a future,
1126 /// only when you've otherwise arranged to be notified when the channel is
1127 /// no longer empty.
1128 ///
1129 /// This function will panic if called after `try_next` or `poll_next` has
1130 /// returned `None`.
try_next(&mut self) -> Result<Option<T>, TryRecvError>1131 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1132 match self.next_message() {
1133 Poll::Ready(msg) => {
1134 Ok(msg)
1135 },
1136 Poll::Pending => Err(TryRecvError { _priv: () }),
1137 }
1138 }
1139
next_message(&mut self) -> Poll<Option<T>>1140 fn next_message(&mut self) -> Poll<Option<T>> {
1141 let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
1142 // Pop off a message
1143 match unsafe { inner.message_queue.pop_spin() } {
1144 Some(msg) => {
1145 // Decrement number of messages
1146 self.dec_num_messages();
1147
1148 Poll::Ready(Some(msg))
1149 }
1150 None => {
1151 let state = decode_state(inner.state.load(SeqCst));
1152 if state.is_open || state.num_messages != 0 {
1153 // If queue is open, we need to return Pending
1154 // to be woken up when new messages arrive.
1155 // If queue is closed but num_messages is non-zero,
1156 // it means that senders updated the state,
1157 // but didn't put message to queue yet,
1158 // so we need to park until sender unparks the task
1159 // after queueing the message.
1160 Poll::Pending
1161 } else {
1162 // If closed flag is set AND there are no pending messages
1163 // it means end of stream
1164 self.inner = None;
1165 Poll::Ready(None)
1166 }
1167 }
1168 }
1169 }
1170
dec_num_messages(&self)1171 fn dec_num_messages(&self) {
1172 if let Some(inner) = &self.inner {
1173 // OPEN_MASK is highest bit, so it's unaffected by subtraction
1174 // unless there's underflow, and we know there's no underflow
1175 // because number of messages at this point is always > 0.
1176 inner.state.fetch_sub(1, SeqCst);
1177 }
1178 }
1179 }
1180
1181 impl<T> FusedStream for UnboundedReceiver<T> {
is_terminated(&self) -> bool1182 fn is_terminated(&self) -> bool {
1183 self.inner.is_none()
1184 }
1185 }
1186
1187 impl<T> Stream for UnboundedReceiver<T> {
1188 type Item = T;
1189
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<T>>1190 fn poll_next(
1191 mut self: Pin<&mut Self>,
1192 cx: &mut Context<'_>,
1193 ) -> Poll<Option<T>> {
1194 // Try to read a message off of the message queue.
1195 match self.next_message() {
1196 Poll::Ready(msg) => {
1197 if msg.is_none() {
1198 self.inner = None;
1199 }
1200 Poll::Ready(msg)
1201 },
1202 Poll::Pending => {
1203 // There are no messages to read, in this case, park.
1204 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1205 // Check queue again after parking to prevent race condition:
1206 // a message could be added to the queue after previous `next_message`
1207 // before `register` call.
1208 self.next_message()
1209 }
1210 }
1211 }
1212 }
1213
1214 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)1215 fn drop(&mut self) {
1216 // Drain the channel of all pending messages
1217 self.close();
1218 if self.inner.is_some() {
1219 while let Poll::Ready(Some(..)) = self.next_message() {
1220 // ...
1221 }
1222 }
1223 }
1224 }
1225
1226 /*
1227 *
1228 * ===== impl Inner =====
1229 *
1230 */
1231
1232 impl<T> UnboundedInner<T> {
1233 // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1234 fn set_closed(&self) {
1235 let curr = self.state.load(SeqCst);
1236 if !decode_state(curr).is_open {
1237 return;
1238 }
1239
1240 self.state.fetch_and(!OPEN_MASK, SeqCst);
1241 }
1242 }
1243
1244 impl<T> BoundedInner<T> {
1245 // The return value is such that the total number of messages that can be
1246 // enqueued into the channel will never exceed MAX_CAPACITY
max_senders(&self) -> usize1247 fn max_senders(&self) -> usize {
1248 MAX_CAPACITY - self.buffer
1249 }
1250
1251 // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1252 fn set_closed(&self) {
1253 let curr = self.state.load(SeqCst);
1254 if !decode_state(curr).is_open {
1255 return;
1256 }
1257
1258 self.state.fetch_and(!OPEN_MASK, SeqCst);
1259 }
1260 }
1261
1262 unsafe impl<T: Send> Send for UnboundedInner<T> {}
1263 unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1264
1265 unsafe impl<T: Send> Send for BoundedInner<T> {}
1266 unsafe impl<T: Send> Sync for BoundedInner<T> {}
1267
1268 /*
1269 *
1270 * ===== Helpers =====
1271 *
1272 */
1273
decode_state(num: usize) -> State1274 fn decode_state(num: usize) -> State {
1275 State {
1276 is_open: num & OPEN_MASK == OPEN_MASK,
1277 num_messages: num & MAX_CAPACITY,
1278 }
1279 }
1280
encode_state(state: &State) -> usize1281 fn encode_state(state: &State) -> usize {
1282 let mut num = state.num_messages;
1283
1284 if state.is_open {
1285 num |= OPEN_MASK;
1286 }
1287
1288 num
1289 }
1290