1 //! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2 //! all consumers.
3 //!
4 //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5 //! values. [`Sender`] handles are clone-able, allowing concurrent send and
6 //! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7 //! long as `T` is also `Send` or `Sync` respectively.
8 //!
9 //! When a value is sent, **all** [`Receiver`] handles are notified and will
10 //! receive the value. The value is stored once inside the channel and cloned on
11 //! demand for each receiver. Once all receivers have received a clone of the
12 //! value, the value is released from the channel.
13 //!
14 //! A channel is created by calling [`channel`], specifying the maximum number
15 //! of messages the channel can retain at any given time.
16 //!
17 //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18 //! returned [`Receiver`] will receive values sent **after** the call to
19 //! `subscribe`.
20 //!
21 //! ## Lagging
22 //!
23 //! As sent messages must be retained until **all** [`Receiver`] handles receive
24 //! a clone, broadcast channels are susceptible to the "slow receiver" problem.
25 //! In this case, all but one receiver are able to receive values at the rate
26 //! they are sent. Because one receiver is stalled, the channel starts to fill
27 //! up.
28 //!
29 //! This broadcast channel implementation handles this case by setting a hard
30 //! upper bound on the number of values the channel may retain at any given
31 //! time. This upper bound is passed to the [`channel`] function as an argument.
32 //!
33 //! If a value is sent when the channel is at capacity, the oldest value
34 //! currently held by the channel is released. This frees up space for the new
35 //! value. Any receiver that has not yet seen the released value will return
36 //! [`RecvError::Lagged`] the next time [`recv`] is called.
37 //!
38 //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
39 //! updated to the oldest value contained by the channel. The next call to
40 //! [`recv`] will return this value.
41 //!
42 //! This behavior enables a receiver to detect when it has lagged so far behind
43 //! that data has been dropped. The caller may decide how to respond to this:
44 //! either by aborting its task or by tolerating lost messages and resuming
45 //! consumption of the channel.
46 //!
47 //! ## Closing
48 //!
49 //! When **all** [`Sender`] handles have been dropped, no new values may be
50 //! sent. At this point, the channel is "closed". Once a receiver has received
51 //! all values retained by the channel, the next call to [`recv`] will return
52 //! with [`RecvError::Closed`].
53 //!
54 //! [`Sender`]: crate::sync::broadcast::Sender
55 //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
56 //! [`Receiver`]: crate::sync::broadcast::Receiver
57 //! [`channel`]: crate::sync::broadcast::channel
58 //! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
59 //! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
60 //! [`recv`]: crate::sync::broadcast::Receiver::recv
61 //!
62 //! # Examples
63 //!
64 //! Basic usage
65 //!
66 //! ```
67 //! use tokio::sync::broadcast;
68 //!
69 //! #[tokio::main]
70 //! async fn main() {
71 //!     let (tx, mut rx1) = broadcast::channel(16);
72 //!     let mut rx2 = tx.subscribe();
73 //!
74 //!     tokio::spawn(async move {
75 //!         assert_eq!(rx1.recv().await.unwrap(), 10);
76 //!         assert_eq!(rx1.recv().await.unwrap(), 20);
77 //!     });
78 //!
79 //!     tokio::spawn(async move {
80 //!         assert_eq!(rx2.recv().await.unwrap(), 10);
81 //!         assert_eq!(rx2.recv().await.unwrap(), 20);
82 //!     });
83 //!
84 //!     tx.send(10).unwrap();
85 //!     tx.send(20).unwrap();
86 //! }
87 //! ```
88 //!
89 //! Handling lag
90 //!
91 //! ```
92 //! use tokio::sync::broadcast;
93 //!
94 //! #[tokio::main]
95 //! async fn main() {
96 //!     let (tx, mut rx) = broadcast::channel(2);
97 //!
98 //!     tx.send(10).unwrap();
99 //!     tx.send(20).unwrap();
100 //!     tx.send(30).unwrap();
101 //!
102 //!     // The receiver lagged behind
103 //!     assert!(rx.recv().await.is_err());
104 //!
105 //!     // At this point, we can abort or continue with lost messages
106 //!
107 //!     assert_eq!(20, rx.recv().await.unwrap());
108 //!     assert_eq!(30, rx.recv().await.unwrap());
109 //! }
110 //! ```
111 
112 use crate::loom::cell::UnsafeCell;
113 use crate::loom::sync::atomic::AtomicUsize;
114 use crate::loom::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
115 use crate::util::linked_list::{self, LinkedList};
116 
117 use std::fmt;
118 use std::future::Future;
119 use std::marker::PhantomPinned;
120 use std::pin::Pin;
121 use std::ptr::NonNull;
122 use std::sync::atomic::Ordering::SeqCst;
123 use std::task::{Context, Poll, Waker};
124 use std::usize;
125 
126 /// Sending-half of the [`broadcast`] channel.
127 ///
128 /// May be used from many threads. Messages can be sent with
129 /// [`send`][Sender::send].
130 ///
131 /// # Examples
132 ///
133 /// ```
134 /// use tokio::sync::broadcast;
135 ///
136 /// #[tokio::main]
137 /// async fn main() {
138 ///     let (tx, mut rx1) = broadcast::channel(16);
139 ///     let mut rx2 = tx.subscribe();
140 ///
141 ///     tokio::spawn(async move {
142 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
143 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
144 ///     });
145 ///
146 ///     tokio::spawn(async move {
147 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
148 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
149 ///     });
150 ///
151 ///     tx.send(10).unwrap();
152 ///     tx.send(20).unwrap();
153 /// }
154 /// ```
155 ///
156 /// [`broadcast`]: crate::sync::broadcast
157 pub struct Sender<T> {
158     shared: Arc<Shared<T>>,
159 }
160 
161 /// Receiving-half of the [`broadcast`] channel.
162 ///
163 /// Must not be used concurrently. Messages may be retrieved using
164 /// [`recv`][Receiver::recv].
165 ///
166 /// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
167 /// wrapper.
168 ///
169 /// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
170 ///
171 /// # Examples
172 ///
173 /// ```
174 /// use tokio::sync::broadcast;
175 ///
176 /// #[tokio::main]
177 /// async fn main() {
178 ///     let (tx, mut rx1) = broadcast::channel(16);
179 ///     let mut rx2 = tx.subscribe();
180 ///
181 ///     tokio::spawn(async move {
182 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
183 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
184 ///     });
185 ///
186 ///     tokio::spawn(async move {
187 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
188 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
189 ///     });
190 ///
191 ///     tx.send(10).unwrap();
192 ///     tx.send(20).unwrap();
193 /// }
194 /// ```
195 ///
196 /// [`broadcast`]: crate::sync::broadcast
197 pub struct Receiver<T> {
198     /// State shared with all receivers and senders.
199     shared: Arc<Shared<T>>,
200 
201     /// Next position to read from
202     next: u64,
203 }
204 
205 pub mod error {
206     //! Broadcast error types
207 
208     use std::fmt;
209 
210     /// Error returned by from the [`send`] function on a [`Sender`].
211     ///
212     /// A **send** operation can only fail if there are no active receivers,
213     /// implying that the message could never be received. The error contains the
214     /// message being sent as a payload so it can be recovered.
215     ///
216     /// [`send`]: crate::sync::broadcast::Sender::send
217     /// [`Sender`]: crate::sync::broadcast::Sender
218     #[derive(Debug)]
219     pub struct SendError<T>(pub T);
220 
221     impl<T> fmt::Display for SendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result222         fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223             write!(f, "channel closed")
224         }
225     }
226 
227     impl<T: fmt::Debug> std::error::Error for SendError<T> {}
228 
229     /// An error returned from the [`recv`] function on a [`Receiver`].
230     ///
231     /// [`recv`]: crate::sync::broadcast::Receiver::recv
232     /// [`Receiver`]: crate::sync::broadcast::Receiver
233     #[derive(Debug, PartialEq)]
234     pub enum RecvError {
235         /// There are no more active senders implying no further messages will ever
236         /// be sent.
237         Closed,
238 
239         /// The receiver lagged too far behind. Attempting to receive again will
240         /// return the oldest message still retained by the channel.
241         ///
242         /// Includes the number of skipped messages.
243         Lagged(u64),
244     }
245 
246     impl fmt::Display for RecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result247         fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248             match self {
249                 RecvError::Closed => write!(f, "channel closed"),
250                 RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
251             }
252         }
253     }
254 
255     impl std::error::Error for RecvError {}
256 
257     /// An error returned from the [`try_recv`] function on a [`Receiver`].
258     ///
259     /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
260     /// [`Receiver`]: crate::sync::broadcast::Receiver
261     #[derive(Debug, PartialEq)]
262     pub enum TryRecvError {
263         /// The channel is currently empty. There are still active
264         /// [`Sender`] handles, so data may yet become available.
265         ///
266         /// [`Sender`]: crate::sync::broadcast::Sender
267         Empty,
268 
269         /// There are no more active senders implying no further messages will ever
270         /// be sent.
271         Closed,
272 
273         /// The receiver lagged too far behind and has been forcibly disconnected.
274         /// Attempting to receive again will return the oldest message still
275         /// retained by the channel.
276         ///
277         /// Includes the number of skipped messages.
278         Lagged(u64),
279     }
280 
281     impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result282         fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283             match self {
284                 TryRecvError::Empty => write!(f, "channel empty"),
285                 TryRecvError::Closed => write!(f, "channel closed"),
286                 TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
287             }
288         }
289     }
290 
291     impl std::error::Error for TryRecvError {}
292 }
293 
294 use self::error::*;
295 
296 /// Data shared between senders and receivers.
297 struct Shared<T> {
298     /// slots in the channel.
299     buffer: Box<[RwLock<Slot<T>>]>,
300 
301     /// Mask a position -> index.
302     mask: usize,
303 
304     /// Tail of the queue. Includes the rx wait list.
305     tail: Mutex<Tail>,
306 
307     /// Number of outstanding Sender handles.
308     num_tx: AtomicUsize,
309 }
310 
311 /// Next position to write a value.
312 struct Tail {
313     /// Next position to write to.
314     pos: u64,
315 
316     /// Number of active receivers.
317     rx_cnt: usize,
318 
319     /// True if the channel is closed.
320     closed: bool,
321 
322     /// Receivers waiting for a value.
323     waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
324 }
325 
326 /// Slot in the buffer.
327 struct Slot<T> {
328     /// Remaining number of receivers that are expected to see this value.
329     ///
330     /// When this goes to zero, the value is released.
331     ///
332     /// An atomic is used as it is mutated concurrently with the slot read lock
333     /// acquired.
334     rem: AtomicUsize,
335 
336     /// Uniquely identifies the `send` stored in the slot.
337     pos: u64,
338 
339     /// True signals the channel is closed.
340     closed: bool,
341 
342     /// The value being broadcast.
343     ///
344     /// The value is set by `send` when the write lock is held. When a reader
345     /// drops, `rem` is decremented. When it hits zero, the value is dropped.
346     val: UnsafeCell<Option<T>>,
347 }
348 
349 /// An entry in the wait queue.
350 struct Waiter {
351     /// True if queued.
352     queued: bool,
353 
354     /// Task waiting on the broadcast channel.
355     waker: Option<Waker>,
356 
357     /// Intrusive linked-list pointers.
358     pointers: linked_list::Pointers<Waiter>,
359 
360     /// Should not be `Unpin`.
361     _p: PhantomPinned,
362 }
363 
364 struct RecvGuard<'a, T> {
365     slot: RwLockReadGuard<'a, Slot<T>>,
366 }
367 
368 /// Receive a value future.
369 struct Recv<'a, T> {
370     /// Receiver being waited on.
371     receiver: &'a mut Receiver<T>,
372 
373     /// Entry in the waiter `LinkedList`.
374     waiter: UnsafeCell<Waiter>,
375 }
376 
377 unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
378 unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
379 
380 /// Max number of receivers. Reserve space to lock.
381 const MAX_RECEIVERS: usize = usize::MAX >> 2;
382 
383 /// Create a bounded, multi-producer, multi-consumer channel where each sent
384 /// value is broadcasted to all active receivers.
385 ///
386 /// All data sent on [`Sender`] will become available on every active
387 /// [`Receiver`] in the same order as it was sent.
388 ///
389 /// The `Sender` can be cloned to `send` to the same channel from multiple
390 /// points in the process or it can be used concurrently from an `Arc`. New
391 /// `Receiver` handles are created by calling [`Sender::subscribe`].
392 ///
393 /// If all [`Receiver`] handles are dropped, the `send` method will return a
394 /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
395 /// method will return a [`RecvError`].
396 ///
397 /// [`Sender`]: crate::sync::broadcast::Sender
398 /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
399 /// [`Receiver`]: crate::sync::broadcast::Receiver
400 /// [`recv`]: crate::sync::broadcast::Receiver::recv
401 /// [`SendError`]: crate::sync::broadcast::error::SendError
402 /// [`RecvError`]: crate::sync::broadcast::error::RecvError
403 ///
404 /// # Examples
405 ///
406 /// ```
407 /// use tokio::sync::broadcast;
408 ///
409 /// #[tokio::main]
410 /// async fn main() {
411 ///     let (tx, mut rx1) = broadcast::channel(16);
412 ///     let mut rx2 = tx.subscribe();
413 ///
414 ///     tokio::spawn(async move {
415 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
416 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
417 ///     });
418 ///
419 ///     tokio::spawn(async move {
420 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
421 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
422 ///     });
423 ///
424 ///     tx.send(10).unwrap();
425 ///     tx.send(20).unwrap();
426 /// }
427 /// ```
channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>)428 pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
429     assert!(capacity > 0, "capacity is empty");
430     assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
431 
432     // Round to a power of two
433     capacity = capacity.next_power_of_two();
434 
435     let mut buffer = Vec::with_capacity(capacity);
436 
437     for i in 0..capacity {
438         buffer.push(RwLock::new(Slot {
439             rem: AtomicUsize::new(0),
440             pos: (i as u64).wrapping_sub(capacity as u64),
441             closed: false,
442             val: UnsafeCell::new(None),
443         }));
444     }
445 
446     let shared = Arc::new(Shared {
447         buffer: buffer.into_boxed_slice(),
448         mask: capacity - 1,
449         tail: Mutex::new(Tail {
450             pos: 0,
451             rx_cnt: 1,
452             closed: false,
453             waiters: LinkedList::new(),
454         }),
455         num_tx: AtomicUsize::new(1),
456     });
457 
458     let rx = Receiver {
459         shared: shared.clone(),
460         next: 0,
461     };
462 
463     let tx = Sender { shared };
464 
465     (tx, rx)
466 }
467 
468 unsafe impl<T: Send> Send for Sender<T> {}
469 unsafe impl<T: Send> Sync for Sender<T> {}
470 
471 unsafe impl<T: Send> Send for Receiver<T> {}
472 unsafe impl<T: Send> Sync for Receiver<T> {}
473 
474 impl<T> Sender<T> {
475     /// Attempts to send a value to all active [`Receiver`] handles, returning
476     /// it back if it could not be sent.
477     ///
478     /// A successful send occurs when there is at least one active [`Receiver`]
479     /// handle. An unsuccessful send would be one where all associated
480     /// [`Receiver`] handles have already been dropped.
481     ///
482     /// # Return
483     ///
484     /// On success, the number of subscribed [`Receiver`] handles is returned.
485     /// This does not mean that this number of receivers will see the message as
486     /// a receiver may drop before receiving the message.
487     ///
488     /// # Note
489     ///
490     /// A return value of `Ok` **does not** mean that the sent value will be
491     /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
492     /// handles may be dropped before receiving the sent message.
493     ///
494     /// A return value of `Err` **does not** mean that future calls to `send`
495     /// will fail. New [`Receiver`] handles may be created by calling
496     /// [`subscribe`].
497     ///
498     /// [`Receiver`]: crate::sync::broadcast::Receiver
499     /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
500     ///
501     /// # Examples
502     ///
503     /// ```
504     /// use tokio::sync::broadcast;
505     ///
506     /// #[tokio::main]
507     /// async fn main() {
508     ///     let (tx, mut rx1) = broadcast::channel(16);
509     ///     let mut rx2 = tx.subscribe();
510     ///
511     ///     tokio::spawn(async move {
512     ///         assert_eq!(rx1.recv().await.unwrap(), 10);
513     ///         assert_eq!(rx1.recv().await.unwrap(), 20);
514     ///     });
515     ///
516     ///     tokio::spawn(async move {
517     ///         assert_eq!(rx2.recv().await.unwrap(), 10);
518     ///         assert_eq!(rx2.recv().await.unwrap(), 20);
519     ///     });
520     ///
521     ///     tx.send(10).unwrap();
522     ///     tx.send(20).unwrap();
523     /// }
524     /// ```
send(&self, value: T) -> Result<usize, SendError<T>>525     pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
526         self.send2(Some(value))
527             .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap()))
528     }
529 
530     /// Creates a new [`Receiver`] handle that will receive values sent **after**
531     /// this call to `subscribe`.
532     ///
533     /// # Examples
534     ///
535     /// ```
536     /// use tokio::sync::broadcast;
537     ///
538     /// #[tokio::main]
539     /// async fn main() {
540     ///     let (tx, _rx) = broadcast::channel(16);
541     ///
542     ///     // Will not be seen
543     ///     tx.send(10).unwrap();
544     ///
545     ///     let mut rx = tx.subscribe();
546     ///
547     ///     tx.send(20).unwrap();
548     ///
549     ///     let value = rx.recv().await.unwrap();
550     ///     assert_eq!(20, value);
551     /// }
552     /// ```
subscribe(&self) -> Receiver<T>553     pub fn subscribe(&self) -> Receiver<T> {
554         let shared = self.shared.clone();
555         new_receiver(shared)
556     }
557 
558     /// Returns the number of active receivers
559     ///
560     /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
561     /// [`subscribe`]. These are the handles that will receive values sent on
562     /// this [`Sender`].
563     ///
564     /// # Note
565     ///
566     /// It is not guaranteed that a sent message will reach this number of
567     /// receivers. Active receivers may never call [`recv`] again before
568     /// dropping.
569     ///
570     /// [`recv`]: crate::sync::broadcast::Receiver::recv
571     /// [`Receiver`]: crate::sync::broadcast::Receiver
572     /// [`Sender`]: crate::sync::broadcast::Sender
573     /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
574     /// [`channel`]: crate::sync::broadcast::channel
575     ///
576     /// # Examples
577     ///
578     /// ```
579     /// use tokio::sync::broadcast;
580     ///
581     /// #[tokio::main]
582     /// async fn main() {
583     ///     let (tx, _rx1) = broadcast::channel(16);
584     ///
585     ///     assert_eq!(1, tx.receiver_count());
586     ///
587     ///     let mut _rx2 = tx.subscribe();
588     ///
589     ///     assert_eq!(2, tx.receiver_count());
590     ///
591     ///     tx.send(10).unwrap();
592     /// }
593     /// ```
receiver_count(&self) -> usize594     pub fn receiver_count(&self) -> usize {
595         let tail = self.shared.tail.lock();
596         tail.rx_cnt
597     }
598 
send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>>599     fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
600         let mut tail = self.shared.tail.lock();
601 
602         if tail.rx_cnt == 0 {
603             return Err(SendError(value));
604         }
605 
606         // Position to write into
607         let pos = tail.pos;
608         let rem = tail.rx_cnt;
609         let idx = (pos & self.shared.mask as u64) as usize;
610 
611         // Update the tail position
612         tail.pos = tail.pos.wrapping_add(1);
613 
614         // Get the slot
615         let mut slot = self.shared.buffer[idx].write().unwrap();
616 
617         // Track the position
618         slot.pos = pos;
619 
620         // Set remaining receivers
621         slot.rem.with_mut(|v| *v = rem);
622 
623         // Set the closed bit if the value is `None`; otherwise write the value
624         if value.is_none() {
625             tail.closed = true;
626             slot.closed = true;
627         } else {
628             slot.val.with_mut(|ptr| unsafe { *ptr = value });
629         }
630 
631         // Release the slot lock before notifying the receivers.
632         drop(slot);
633 
634         tail.notify_rx();
635 
636         // Release the mutex. This must happen after the slot lock is released,
637         // otherwise the writer lock bit could be cleared while another thread
638         // is in the critical section.
639         drop(tail);
640 
641         Ok(rem)
642     }
643 }
644 
new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T>645 fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
646     let mut tail = shared.tail.lock();
647 
648     if tail.rx_cnt == MAX_RECEIVERS {
649         panic!("max receivers");
650     }
651 
652     tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
653 
654     let next = tail.pos;
655 
656     drop(tail);
657 
658     Receiver { shared, next }
659 }
660 
661 impl Tail {
notify_rx(&mut self)662     fn notify_rx(&mut self) {
663         while let Some(mut waiter) = self.waiters.pop_back() {
664             // Safety: `waiters` lock is still held.
665             let waiter = unsafe { waiter.as_mut() };
666 
667             assert!(waiter.queued);
668             waiter.queued = false;
669 
670             let waker = waiter.waker.take().unwrap();
671             waker.wake();
672         }
673     }
674 }
675 
676 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>677     fn clone(&self) -> Sender<T> {
678         let shared = self.shared.clone();
679         shared.num_tx.fetch_add(1, SeqCst);
680 
681         Sender { shared }
682     }
683 }
684 
685 impl<T> Drop for Sender<T> {
drop(&mut self)686     fn drop(&mut self) {
687         if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
688             let _ = self.send2(None);
689         }
690     }
691 }
692 
693 impl<T> Receiver<T> {
694     /// Locks the next value if there is one.
recv_ref( &mut self, waiter: Option<(&UnsafeCell<Waiter>, &Waker)>, ) -> Result<RecvGuard<'_, T>, TryRecvError>695     fn recv_ref(
696         &mut self,
697         waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
698     ) -> Result<RecvGuard<'_, T>, TryRecvError> {
699         let idx = (self.next & self.shared.mask as u64) as usize;
700 
701         // The slot holding the next value to read
702         let mut slot = self.shared.buffer[idx].read().unwrap();
703 
704         if slot.pos != self.next {
705             let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
706 
707             // The receiver has read all current values in the channel and there
708             // is no waiter to register
709             if waiter.is_none() && next_pos == self.next {
710                 return Err(TryRecvError::Empty);
711             }
712 
713             // Release the `slot` lock before attempting to acquire the `tail`
714             // lock. This is required because `send2` acquires the tail lock
715             // first followed by the slot lock. Acquiring the locks in reverse
716             // order here would result in a potential deadlock: `recv_ref`
717             // acquires the `slot` lock and attempts to acquire the `tail` lock
718             // while `send2` acquired the `tail` lock and attempts to acquire
719             // the slot lock.
720             drop(slot);
721 
722             let mut tail = self.shared.tail.lock();
723 
724             // Acquire slot lock again
725             slot = self.shared.buffer[idx].read().unwrap();
726 
727             // Make sure the position did not change. This could happen in the
728             // unlikely event that the buffer is wrapped between dropping the
729             // read lock and acquiring the tail lock.
730             if slot.pos != self.next {
731                 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
732 
733                 if next_pos == self.next {
734                     // Store the waker
735                     if let Some((waiter, waker)) = waiter {
736                         // Safety: called while locked.
737                         unsafe {
738                             // Only queue if not already queued
739                             waiter.with_mut(|ptr| {
740                                 // If there is no waker **or** if the currently
741                                 // stored waker references a **different** task,
742                                 // track the tasks' waker to be notified on
743                                 // receipt of a new value.
744                                 match (*ptr).waker {
745                                     Some(ref w) if w.will_wake(waker) => {}
746                                     _ => {
747                                         (*ptr).waker = Some(waker.clone());
748                                     }
749                                 }
750 
751                                 if !(*ptr).queued {
752                                     (*ptr).queued = true;
753                                     tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
754                                 }
755                             });
756                         }
757                     }
758 
759                     return Err(TryRecvError::Empty);
760                 }
761 
762                 // At this point, the receiver has lagged behind the sender by
763                 // more than the channel capacity. The receiver will attempt to
764                 // catch up by skipping dropped messages and setting the
765                 // internal cursor to the **oldest** message stored by the
766                 // channel.
767                 //
768                 // However, finding the oldest position is a bit more
769                 // complicated than `tail-position - buffer-size`. When
770                 // the channel is closed, the tail position is incremented to
771                 // signal a new `None` message, but `None` is not stored in the
772                 // channel itself (see issue #2425 for why).
773                 //
774                 // To account for this, if the channel is closed, the tail
775                 // position is decremented by `buffer-size + 1`.
776                 let mut adjust = 0;
777                 if tail.closed {
778                     adjust = 1
779                 }
780                 let next = tail
781                     .pos
782                     .wrapping_sub(self.shared.buffer.len() as u64 + adjust);
783 
784                 let missed = next.wrapping_sub(self.next);
785 
786                 drop(tail);
787 
788                 // The receiver is slow but no values have been missed
789                 if missed == 0 {
790                     self.next = self.next.wrapping_add(1);
791 
792                     return Ok(RecvGuard { slot });
793                 }
794 
795                 self.next = next;
796 
797                 return Err(TryRecvError::Lagged(missed));
798             }
799         }
800 
801         self.next = self.next.wrapping_add(1);
802 
803         if slot.closed {
804             return Err(TryRecvError::Closed);
805         }
806 
807         Ok(RecvGuard { slot })
808     }
809 }
810 
811 impl<T: Clone> Receiver<T> {
812     /// Receives the next value for this receiver.
813     ///
814     /// Each [`Receiver`] handle will receive a clone of all values sent
815     /// **after** it has subscribed.
816     ///
817     /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
818     /// dropped, indicating that no further values can be sent on the channel.
819     ///
820     /// If the [`Receiver`] handle falls behind, once the channel is full, newly
821     /// sent values will overwrite old values. At this point, a call to [`recv`]
822     /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
823     /// internal cursor is updated to point to the oldest value still held by
824     /// the channel. A subsequent call to [`recv`] will return this value
825     /// **unless** it has been since overwritten.
826     ///
827     /// # Cancel safety
828     ///
829     /// This method is cancel safe. If `recv` is used as the event in a
830     /// [`tokio::select!`](crate::select) statement and some other branch
831     /// completes first, it is guaranteed that no messages were received on this
832     /// channel.
833     ///
834     /// [`Receiver`]: crate::sync::broadcast::Receiver
835     /// [`recv`]: crate::sync::broadcast::Receiver::recv
836     ///
837     /// # Examples
838     ///
839     /// ```
840     /// use tokio::sync::broadcast;
841     ///
842     /// #[tokio::main]
843     /// async fn main() {
844     ///     let (tx, mut rx1) = broadcast::channel(16);
845     ///     let mut rx2 = tx.subscribe();
846     ///
847     ///     tokio::spawn(async move {
848     ///         assert_eq!(rx1.recv().await.unwrap(), 10);
849     ///         assert_eq!(rx1.recv().await.unwrap(), 20);
850     ///     });
851     ///
852     ///     tokio::spawn(async move {
853     ///         assert_eq!(rx2.recv().await.unwrap(), 10);
854     ///         assert_eq!(rx2.recv().await.unwrap(), 20);
855     ///     });
856     ///
857     ///     tx.send(10).unwrap();
858     ///     tx.send(20).unwrap();
859     /// }
860     /// ```
861     ///
862     /// Handling lag
863     ///
864     /// ```
865     /// use tokio::sync::broadcast;
866     ///
867     /// #[tokio::main]
868     /// async fn main() {
869     ///     let (tx, mut rx) = broadcast::channel(2);
870     ///
871     ///     tx.send(10).unwrap();
872     ///     tx.send(20).unwrap();
873     ///     tx.send(30).unwrap();
874     ///
875     ///     // The receiver lagged behind
876     ///     assert!(rx.recv().await.is_err());
877     ///
878     ///     // At this point, we can abort or continue with lost messages
879     ///
880     ///     assert_eq!(20, rx.recv().await.unwrap());
881     ///     assert_eq!(30, rx.recv().await.unwrap());
882     /// }
883     /// ```
recv(&mut self) -> Result<T, RecvError>884     pub async fn recv(&mut self) -> Result<T, RecvError> {
885         let fut = Recv::new(self);
886         fut.await
887     }
888 
889     /// Attempts to return a pending value on this receiver without awaiting.
890     ///
891     /// This is useful for a flavor of "optimistic check" before deciding to
892     /// await on a receiver.
893     ///
894     /// Compared with [`recv`], this function has three failure cases instead of two
895     /// (one for closed, one for an empty buffer, one for a lagging receiver).
896     ///
897     /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
898     /// dropped, indicating that no further values can be sent on the channel.
899     ///
900     /// If the [`Receiver`] handle falls behind, once the channel is full, newly
901     /// sent values will overwrite old values. At this point, a call to [`recv`]
902     /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
903     /// internal cursor is updated to point to the oldest value still held by
904     /// the channel. A subsequent call to [`try_recv`] will return this value
905     /// **unless** it has been since overwritten. If there are no values to
906     /// receive, `Err(TryRecvError::Empty)` is returned.
907     ///
908     /// [`recv`]: crate::sync::broadcast::Receiver::recv
909     /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
910     /// [`Receiver`]: crate::sync::broadcast::Receiver
911     ///
912     /// # Examples
913     ///
914     /// ```
915     /// use tokio::sync::broadcast;
916     ///
917     /// #[tokio::main]
918     /// async fn main() {
919     ///     let (tx, mut rx) = broadcast::channel(16);
920     ///
921     ///     assert!(rx.try_recv().is_err());
922     ///
923     ///     tx.send(10).unwrap();
924     ///
925     ///     let value = rx.try_recv().unwrap();
926     ///     assert_eq!(10, value);
927     /// }
928     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>929     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
930         let guard = self.recv_ref(None)?;
931         guard.clone_value().ok_or(TryRecvError::Closed)
932     }
933 }
934 
935 impl<T> Drop for Receiver<T> {
drop(&mut self)936     fn drop(&mut self) {
937         let mut tail = self.shared.tail.lock();
938 
939         tail.rx_cnt -= 1;
940         let until = tail.pos;
941 
942         drop(tail);
943 
944         while self.next < until {
945             match self.recv_ref(None) {
946                 Ok(_) => {}
947                 // The channel is closed
948                 Err(TryRecvError::Closed) => break,
949                 // Ignore lagging, we will catch up
950                 Err(TryRecvError::Lagged(..)) => {}
951                 // Can't be empty
952                 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
953             }
954         }
955     }
956 }
957 
958 impl<'a, T> Recv<'a, T> {
new(receiver: &'a mut Receiver<T>) -> Recv<'a, T>959     fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
960         Recv {
961             receiver,
962             waiter: UnsafeCell::new(Waiter {
963                 queued: false,
964                 waker: None,
965                 pointers: linked_list::Pointers::new(),
966                 _p: PhantomPinned,
967             }),
968         }
969     }
970 
971     /// A custom `project` implementation is used in place of `pin-project-lite`
972     /// as a custom drop implementation is needed.
project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>)973     fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
974         unsafe {
975             // Safety: Receiver is Unpin
976             is_unpin::<&mut Receiver<T>>();
977 
978             let me = self.get_unchecked_mut();
979             (me.receiver, &me.waiter)
980         }
981     }
982 }
983 
984 impl<'a, T> Future for Recv<'a, T>
985 where
986     T: Clone,
987 {
988     type Output = Result<T, RecvError>;
989 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>990     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
991         let (receiver, waiter) = self.project();
992 
993         let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
994             Ok(value) => value,
995             Err(TryRecvError::Empty) => return Poll::Pending,
996             Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
997             Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
998         };
999 
1000         Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1001     }
1002 }
1003 
1004 impl<'a, T> Drop for Recv<'a, T> {
drop(&mut self)1005     fn drop(&mut self) {
1006         // Acquire the tail lock. This is required for safety before accessing
1007         // the waiter node.
1008         let mut tail = self.receiver.shared.tail.lock();
1009 
1010         // safety: tail lock is held
1011         let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });
1012 
1013         if queued {
1014             // Remove the node
1015             //
1016             // safety: tail lock is held and the wait node is verified to be in
1017             // the list.
1018             unsafe {
1019                 self.waiter.with_mut(|ptr| {
1020                     tail.waiters.remove((&mut *ptr).into());
1021                 });
1022             }
1023         }
1024     }
1025 }
1026 
1027 /// # Safety
1028 ///
1029 /// `Waiter` is forced to be !Unpin.
1030 unsafe impl linked_list::Link for Waiter {
1031     type Handle = NonNull<Waiter>;
1032     type Target = Waiter;
1033 
as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter>1034     fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1035         *handle
1036     }
1037 
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>1038     unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1039         ptr
1040     }
1041 
pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>1042     unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1043         NonNull::from(&mut target.as_mut().pointers)
1044     }
1045 }
1046 
1047 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1048     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1049         write!(fmt, "broadcast::Sender")
1050     }
1051 }
1052 
1053 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1054     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1055         write!(fmt, "broadcast::Receiver")
1056     }
1057 }
1058 
1059 impl<'a, T> RecvGuard<'a, T> {
clone_value(&self) -> Option<T> where T: Clone,1060     fn clone_value(&self) -> Option<T>
1061     where
1062         T: Clone,
1063     {
1064         self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1065     }
1066 }
1067 
1068 impl<'a, T> Drop for RecvGuard<'a, T> {
drop(&mut self)1069     fn drop(&mut self) {
1070         // Decrement the remaining counter
1071         if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1072             // Safety: Last receiver, drop the value
1073             self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1074         }
1075     }
1076 }
1077 
is_unpin<T: Unpin>()1078 fn is_unpin<T: Unpin>() {}
1079