1 //! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2 //! all consumers.
3 //!
4 //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5 //! values. [`Sender`] handles are clone-able, allowing concurrent send and
//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
//! long as `T` is `Send`.
8 //!
9 //! When a value is sent, **all** [`Receiver`] handles are notified and will
10 //! receive the value. The value is stored once inside the channel and cloned on
11 //! demand for each receiver. Once all receivers have received a clone of the
12 //! value, the value is released from the channel.
13 //!
14 //! A channel is created by calling [`channel`], specifying the maximum number
15 //! of messages the channel can retain at any given time.
16 //!
17 //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18 //! returned [`Receiver`] will receive values sent **after** the call to
19 //! `subscribe`.
20 //!
21 //! ## Lagging
22 //!
23 //! As sent messages must be retained until **all** [`Receiver`] handles receive
24 //! a clone, broadcast channels are susceptible to the "slow receiver" problem.
25 //! In this case, all but one receiver are able to receive values at the rate
26 //! they are sent. Because one receiver is stalled, the channel starts to fill
27 //! up.
28 //!
29 //! This broadcast channel implementation handles this case by setting a hard
30 //! upper bound on the number of values the channel may retain at any given
31 //! time. This upper bound is passed to the [`channel`] function as an argument.
32 //!
33 //! If a value is sent when the channel is at capacity, the oldest value
34 //! currently held by the channel is released. This frees up space for the new
35 //! value. Any receiver that has not yet seen the released value will return
36 //! [`RecvError::Lagged`] the next time [`recv`] is called.
37 //!
38 //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
39 //! updated to the oldest value contained by the channel. The next call to
40 //! [`recv`] will return this value.
41 //!
42 //! This behavior enables a receiver to detect when it has lagged so far behind
43 //! that data has been dropped. The caller may decide how to respond to this:
44 //! either by aborting its task or by tolerating lost messages and resuming
45 //! consumption of the channel.
46 //!
47 //! ## Closing
48 //!
49 //! When **all** [`Sender`] handles have been dropped, no new values may be
50 //! sent. At this point, the channel is "closed". Once a receiver has received
51 //! all values retained by the channel, the next call to [`recv`] will return
52 //! with [`RecvError::Closed`].
53 //!
54 //! [`Sender`]: crate::sync::broadcast::Sender
55 //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
56 //! [`Receiver`]: crate::sync::broadcast::Receiver
57 //! [`channel`]: crate::sync::broadcast::channel
58 //! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged
59 //! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed
60 //! [`recv`]: crate::sync::broadcast::Receiver::recv
61 //!
62 //! # Examples
63 //!
64 //! Basic usage
65 //!
66 //! ```
67 //! use tokio::sync::broadcast;
68 //!
69 //! #[tokio::main]
70 //! async fn main() {
71 //!     let (tx, mut rx1) = broadcast::channel(16);
72 //!     let mut rx2 = tx.subscribe();
73 //!
74 //!     tokio::spawn(async move {
75 //!         assert_eq!(rx1.recv().await.unwrap(), 10);
76 //!         assert_eq!(rx1.recv().await.unwrap(), 20);
77 //!     });
78 //!
79 //!     tokio::spawn(async move {
80 //!         assert_eq!(rx2.recv().await.unwrap(), 10);
81 //!         assert_eq!(rx2.recv().await.unwrap(), 20);
82 //!     });
83 //!
84 //!     tx.send(10).unwrap();
85 //!     tx.send(20).unwrap();
86 //! }
87 //! ```
88 //!
89 //! Handling lag
90 //!
91 //! ```
92 //! use tokio::sync::broadcast;
93 //!
94 //! #[tokio::main]
95 //! async fn main() {
96 //!     let (tx, mut rx) = broadcast::channel(2);
97 //!
98 //!     tx.send(10).unwrap();
99 //!     tx.send(20).unwrap();
100 //!     tx.send(30).unwrap();
101 //!
102 //!     // The receiver lagged behind
103 //!     assert!(rx.recv().await.is_err());
104 //!
105 //!     // At this point, we can abort or continue with lost messages
106 //!
107 //!     assert_eq!(20, rx.recv().await.unwrap());
108 //!     assert_eq!(30, rx.recv().await.unwrap());
//! }
//! ```
110 
111 use crate::loom::cell::UnsafeCell;
112 use crate::loom::sync::atomic::AtomicUsize;
113 use crate::loom::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
114 use crate::util::linked_list::{self, LinkedList};
115 
116 use std::fmt;
117 use std::future::Future;
118 use std::marker::PhantomPinned;
119 use std::pin::Pin;
120 use std::ptr::NonNull;
121 use std::sync::atomic::Ordering::SeqCst;
122 use std::task::{Context, Poll, Waker};
123 use std::usize;
124 
125 /// Sending-half of the [`broadcast`] channel.
126 ///
127 /// May be used from many threads. Messages can be sent with
128 /// [`send`][Sender::send].
129 ///
130 /// # Examples
131 ///
132 /// ```
133 /// use tokio::sync::broadcast;
134 ///
135 /// #[tokio::main]
136 /// async fn main() {
137 ///     let (tx, mut rx1) = broadcast::channel(16);
138 ///     let mut rx2 = tx.subscribe();
139 ///
140 ///     tokio::spawn(async move {
141 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
142 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
143 ///     });
144 ///
145 ///     tokio::spawn(async move {
146 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
147 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
148 ///     });
149 ///
150 ///     tx.send(10).unwrap();
151 ///     tx.send(20).unwrap();
152 /// }
153 /// ```
154 ///
155 /// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// State shared with all senders and receivers.
    shared: Arc<Shared<T>>,
}
159 
160 /// Receiving-half of the [`broadcast`] channel.
161 ///
162 /// Must not be used concurrently. Messages may be retrieved using
163 /// [`recv`][Receiver::recv].
164 ///
165 /// # Examples
166 ///
167 /// ```
168 /// use tokio::sync::broadcast;
169 ///
170 /// #[tokio::main]
171 /// async fn main() {
172 ///     let (tx, mut rx1) = broadcast::channel(16);
173 ///     let mut rx2 = tx.subscribe();
174 ///
175 ///     tokio::spawn(async move {
176 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
177 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
178 ///     });
179 ///
180 ///     tokio::spawn(async move {
181 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
182 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
183 ///     });
184 ///
185 ///     tx.send(10).unwrap();
186 ///     tx.send(20).unwrap();
187 /// }
188 /// ```
189 ///
190 /// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Position of the next value this receiver will read. It is compared
    /// against the `pos` stored in the corresponding slot to tell whether
    /// that value has been written yet or has already been overwritten.
    next: u64,

    /// Used to support the deprecated `poll_recv` fn. Lazily allocated on the
    /// first call to `poll_recv` and kept pinned on the heap.
    waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>,
}
201 
/// Error returned by [`Sender::send`][Sender::send].
///
/// A **send** operation can only fail if there are no active receivers at the
/// time of the call, implying that the message could never be received. The
/// error contains the message being sent as a payload so it can be recovered.
///
/// A failed send does not close the channel: later sends succeed again once a
/// new receiver has been created with [`Sender::subscribe`][Sender::subscribe].
#[derive(Debug)]
pub struct SendError<T>(pub T);
209 
/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum RecvError {
    /// There are no more active senders implying no further messages will ever
    /// be sent.
    Closed,

    /// The receiver lagged too far behind. Attempting to receive again will
    /// return the oldest message still retained by the channel.
    ///
    /// Includes the number of skipped messages, i.e. how far the receiver's
    /// cursor was moved forward to reach the oldest retained message.
    Lagged(u64),
}
226 
/// An error returned from the [`try_recv`] function on a [`Receiver`].
///
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum TryRecvError {
    /// The channel is currently empty. There are still active
    /// [`Sender`][Sender] handles, so data may yet become available.
    Empty,

    /// There are no more active senders implying no further messages will ever
    /// be sent.
    Closed,

    /// The receiver lagged too far behind. The receiver stays usable:
    /// attempting to receive again will return the oldest message still
    /// retained by the channel.
    ///
    /// Includes the number of skipped messages.
    Lagged(u64),
}
248 
/// Data shared between senders and receivers
struct Shared<T> {
    /// Slots in the channel. Each slot sits behind its own `RwLock`: senders
    /// take the write lock to store a value, receivers take the read lock.
    buffer: Box<[RwLock<Slot<T>>]>,

    /// Mask converting a position into a `buffer` index. Set to
    /// `capacity - 1`, where the capacity has been rounded up to a power of
    /// two.
    mask: usize,

    /// Tail of the queue. Includes the rx wait list.
    tail: Mutex<Tail>,

    /// Number of outstanding Sender handles
    num_tx: AtomicUsize,
}
263 
/// Write-side state of the channel, guarded by the `Shared::tail` mutex: the
/// write cursor, the receiver count, the closed flag and the wait list.
struct Tail {
    /// Next position to write to
    pos: u64,

    /// Number of active receivers
    rx_cnt: usize,

    /// True if the channel is closed
    closed: bool,

    /// Receivers waiting for a value
    waiters: LinkedList<Waiter>,
}
278 
/// Slot in the buffer
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    ///
    /// An atomic is used as it is mutated concurrently with the slot read lock
    /// acquired.
    rem: AtomicUsize,

    /// Uniquely identifies the `send` stored in the slot.
    ///
    /// `channel` initialises slot `i` to `i - capacity` (wrapping), so a slot
    /// that has never been written looks exactly one lap behind its index.
    pos: u64,

    /// True signals the channel is closed.
    closed: bool,

    /// The value being broadcast.
    ///
    /// The value is set by `send` when the write lock is held. When a reader
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
    val: UnsafeCell<Option<T>>,
}
301 
/// An entry in the wait queue
struct Waiter {
    /// True if queued, i.e. the node is currently linked into
    /// `Tail::waiters`. Set when the node is pushed onto the list and cleared
    /// when a send pops it off to wake the task.
    queued: bool,

    /// Task waiting on the broadcast channel.
    waker: Option<Waker>,

    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
316 
/// Handle to a received slot, produced by `Receiver::recv_ref`.
///
/// Keeps the slot's read lock held for as long as the guard is alive.
struct RecvGuard<'a, T> {
    /// Read lock on the slot holding the value.
    slot: RwLockReadGuard<'a, Slot<T>>,
}
320 
/// Receive a value future
struct Recv<R, T>
where
    R: AsMut<Receiver<T>>,
{
    /// Receiver being waited on
    receiver: R,

    /// Entry in the waiter `LinkedList`
    waiter: UnsafeCell<Waiter>,

    /// Ties the future to the value type `T`, which only appears in the bound
    /// on `R` and is not stored directly.
    _p: std::marker::PhantomData<T>,
}
334 
/// `AsMut<T>` is not implemented for `T` (coherence). Explicitly implementing
/// `AsMut` for `Receiver` would be included in the public API of the receiver
/// type. Instead, `Borrow` is used internally to bridge the gap: it wraps
/// either an owned `Receiver<T>` or a `&mut Receiver<T>` and implements
/// `AsMut<Receiver<T>>` for both.
struct Borrow<T>(T);
339 
/// Owned flavor: borrows the wrapped `Receiver` mutably.
impl<T> AsMut<Receiver<T>> for Borrow<Receiver<T>> {
    fn as_mut(&mut self) -> &mut Receiver<T> {
        &mut self.0
    }
}
345 
/// Borrowed flavor: reborrows the wrapped `&mut Receiver` for the duration of
/// the call.
impl<'a, T> AsMut<Receiver<T>> for Borrow<&'a mut Receiver<T>> {
    fn as_mut(&mut self) -> &mut Receiver<T> {
        &mut *self.0
    }
}
351 
// `Recv` stores its `Waiter` inside an `UnsafeCell`, so `Send` and `Sync` are
// opted into by hand.
// NOTE(review): the soundness argument is not written down here — presumably
// the waiter is only accessed while the `tail` mutex is held; confirm.
unsafe impl<R: AsMut<Receiver<T>> + Send, T: Send> Send for Recv<R, T> {}
unsafe impl<R: AsMut<Receiver<T>> + Sync, T: Send> Sync for Recv<R, T> {}
354 
/// Max number of receivers. Reserve space to lock.
///
/// `Sender::subscribe` panics once the receiver count has reached this value.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
357 
358 /// Create a bounded, multi-producer, multi-consumer channel where each sent
359 /// value is broadcasted to all active receivers.
360 ///
361 /// All data sent on [`Sender`] will become available on every active
362 /// [`Receiver`] in the same order as it was sent.
363 ///
364 /// The `Sender` can be cloned to `send` to the same channel from multiple
365 /// points in the process or it can be used concurrently from an `Arc`. New
366 /// `Receiver` handles are created by calling [`Sender::subscribe`].
367 ///
368 /// If all [`Receiver`] handles are dropped, the `send` method will return a
369 /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
370 /// method will return a [`RecvError`].
371 ///
372 /// [`Sender`]: crate::sync::broadcast::Sender
373 /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
374 /// [`Receiver`]: crate::sync::broadcast::Receiver
375 /// [`recv`]: crate::sync::broadcast::Receiver::recv
376 /// [`SendError`]: crate::sync::broadcast::SendError
377 /// [`RecvError`]: crate::sync::broadcast::RecvError
378 ///
379 /// # Examples
380 ///
381 /// ```
382 /// use tokio::sync::broadcast;
383 ///
384 /// #[tokio::main]
385 /// async fn main() {
386 ///     let (tx, mut rx1) = broadcast::channel(16);
387 ///     let mut rx2 = tx.subscribe();
388 ///
389 ///     tokio::spawn(async move {
390 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
391 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
392 ///     });
393 ///
394 ///     tokio::spawn(async move {
395 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
396 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
397 ///     });
398 ///
399 ///     tx.send(10).unwrap();
400 ///     tx.send(20).unwrap();
401 /// }
402 /// ```
channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>)403 pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
404     assert!(capacity > 0, "capacity is empty");
405     assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
406 
407     // Round to a power of two
408     capacity = capacity.next_power_of_two();
409 
410     let mut buffer = Vec::with_capacity(capacity);
411 
412     for i in 0..capacity {
413         buffer.push(RwLock::new(Slot {
414             rem: AtomicUsize::new(0),
415             pos: (i as u64).wrapping_sub(capacity as u64),
416             closed: false,
417             val: UnsafeCell::new(None),
418         }));
419     }
420 
421     let shared = Arc::new(Shared {
422         buffer: buffer.into_boxed_slice(),
423         mask: capacity - 1,
424         tail: Mutex::new(Tail {
425             pos: 0,
426             rx_cnt: 1,
427             closed: false,
428             waiters: LinkedList::new(),
429         }),
430         num_tx: AtomicUsize::new(1),
431     });
432 
433     let rx = Receiver {
434         shared: shared.clone(),
435         next: 0,
436         waiter: None,
437     };
438 
439     let tx = Sender { shared };
440 
441     (tx, rx)
442 }
443 
// Both halves are `Send` and `Sync` whenever `T: Send`; `Sync` does not
// additionally require `T: Sync`.
// NOTE(review): the justification is not written down here — presumably it
// rests on receivers only ever obtaining clones of the stored value; confirm.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
449 
450 impl<T> Sender<T> {
451     /// Attempts to send a value to all active [`Receiver`] handles, returning
452     /// it back if it could not be sent.
453     ///
454     /// A successful send occurs when there is at least one active [`Receiver`]
455     /// handle. An unsuccessful send would be one where all associated
456     /// [`Receiver`] handles have already been dropped.
457     ///
458     /// # Return
459     ///
460     /// On success, the number of subscribed [`Receiver`] handles is returned.
461     /// This does not mean that this number of receivers will see the message as
462     /// a receiver may drop before receiving the message.
463     ///
464     /// # Note
465     ///
466     /// A return value of `Ok` **does not** mean that the sent value will be
467     /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
468     /// handles may be dropped before receiving the sent message.
469     ///
470     /// A return value of `Err` **does not** mean that future calls to `send`
471     /// will fail. New [`Receiver`] handles may be created by calling
472     /// [`subscribe`].
473     ///
474     /// [`Receiver`]: crate::sync::broadcast::Receiver
475     /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
476     ///
477     /// # Examples
478     ///
479     /// ```
480     /// use tokio::sync::broadcast;
481     ///
482     /// #[tokio::main]
483     /// async fn main() {
484     ///     let (tx, mut rx1) = broadcast::channel(16);
485     ///     let mut rx2 = tx.subscribe();
486     ///
487     ///     tokio::spawn(async move {
488     ///         assert_eq!(rx1.recv().await.unwrap(), 10);
489     ///         assert_eq!(rx1.recv().await.unwrap(), 20);
490     ///     });
491     ///
492     ///     tokio::spawn(async move {
493     ///         assert_eq!(rx2.recv().await.unwrap(), 10);
494     ///         assert_eq!(rx2.recv().await.unwrap(), 20);
495     ///     });
496     ///
497     ///     tx.send(10).unwrap();
498     ///     tx.send(20).unwrap();
499     /// }
500     /// ```
send(&self, value: T) -> Result<usize, SendError<T>>501     pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
502         self.send2(Some(value))
503             .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap()))
504     }
505 
506     /// Creates a new [`Receiver`] handle that will receive values sent **after**
507     /// this call to `subscribe`.
508     ///
509     /// # Examples
510     ///
511     /// ```
512     /// use tokio::sync::broadcast;
513     ///
514     /// #[tokio::main]
515     /// async fn main() {
516     ///     let (tx, _rx) = broadcast::channel(16);
517     ///
518     ///     // Will not be seen
519     ///     tx.send(10).unwrap();
520     ///
521     ///     let mut rx = tx.subscribe();
522     ///
523     ///     tx.send(20).unwrap();
524     ///
525     ///     let value = rx.recv().await.unwrap();
526     ///     assert_eq!(20, value);
527     /// }
528     /// ```
subscribe(&self) -> Receiver<T>529     pub fn subscribe(&self) -> Receiver<T> {
530         let shared = self.shared.clone();
531 
532         let mut tail = shared.tail.lock().unwrap();
533 
534         if tail.rx_cnt == MAX_RECEIVERS {
535             panic!("max receivers");
536         }
537 
538         tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
539         let next = tail.pos;
540 
541         drop(tail);
542 
543         Receiver {
544             shared,
545             next,
546             waiter: None,
547         }
548     }
549 
550     /// Returns the number of active receivers
551     ///
552     /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
553     /// [`subscribe`]. These are the handles that will receive values sent on
554     /// this [`Sender`].
555     ///
556     /// # Note
557     ///
558     /// It is not guaranteed that a sent message will reach this number of
559     /// receivers. Active receivers may never call [`recv`] again before
560     /// dropping.
561     ///
562     /// [`recv`]: crate::sync::broadcast::Receiver::recv
563     /// [`Receiver`]: crate::sync::broadcast::Receiver
564     /// [`Sender`]: crate::sync::broadcast::Sender
565     /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
566     /// [`channel`]: crate::sync::broadcast::channel
567     ///
568     /// # Examples
569     ///
570     /// ```
571     /// use tokio::sync::broadcast;
572     ///
573     /// #[tokio::main]
574     /// async fn main() {
575     ///     let (tx, _rx1) = broadcast::channel(16);
576     ///
577     ///     assert_eq!(1, tx.receiver_count());
578     ///
579     ///     let mut _rx2 = tx.subscribe();
580     ///
581     ///     assert_eq!(2, tx.receiver_count());
582     ///
583     ///     tx.send(10).unwrap();
584     /// }
585     /// ```
receiver_count(&self) -> usize586     pub fn receiver_count(&self) -> usize {
587         let tail = self.shared.tail.lock().unwrap();
588         tail.rx_cnt
589     }
590 
    /// Writes an entry at the tail of the channel and wakes waiting receivers.
    ///
    /// `Some(value)` stores `value` in the slot; `None` marks both the tail
    /// and the slot as closed.
    ///
    /// Returns the number of active receivers at the time of the call, or
    /// gives `value` back in a `SendError` when there are no receivers.
    fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
        let mut tail = self.shared.tail.lock().unwrap();

        // Nothing is written when nobody is subscribed.
        if tail.rx_cnt == 0 {
            return Err(SendError(value));
        }

        // Position to write into
        let pos = tail.pos;
        let rem = tail.rx_cnt;
        let idx = (pos & self.shared.mask as u64) as usize;

        // Update the tail position
        tail.pos = tail.pos.wrapping_add(1);

        // Get the slot. The slot lock is taken while the tail lock is held;
        // `recv_ref` follows the same tail-then-slot order.
        let mut slot = self.shared.buffer[idx].write().unwrap();

        // Track the position
        slot.pos = pos;

        // Set remaining receivers
        slot.rem.with_mut(|v| *v = rem);

        // Set the closed bit if the value is `None`; otherwise write the value
        if value.is_none() {
            tail.closed = true;
            slot.closed = true;
        } else {
            slot.val.with_mut(|ptr| unsafe { *ptr = value });
        }

        // Release the slot lock before notifying the receivers.
        drop(slot);

        tail.notify_rx();

        // Release the mutex. This must happen after the slot lock is released,
        // otherwise the writer lock bit could be cleared while another thread
        // is in the critical section.
        drop(tail);

        Ok(rem)
    }
635 }
636 
637 impl Tail {
notify_rx(&mut self)638     fn notify_rx(&mut self) {
639         while let Some(mut waiter) = self.waiters.pop_back() {
640             // Safety: `waiters` lock is still held.
641             let waiter = unsafe { waiter.as_mut() };
642 
643             assert!(waiter.queued);
644             waiter.queued = false;
645 
646             let waker = waiter.waker.take().unwrap();
647             waker.wake();
648         }
649     }
650 }
651 
652 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>653     fn clone(&self) -> Sender<T> {
654         let shared = self.shared.clone();
655         shared.num_tx.fetch_add(1, SeqCst);
656 
657         Sender { shared }
658     }
659 }
660 
661 impl<T> Drop for Sender<T> {
drop(&mut self)662     fn drop(&mut self) {
663         if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
664             let _ = self.send2(None);
665         }
666     }
667 }
668 
669 impl<T> Receiver<T> {
    /// Locks the next value if there is one.
    ///
    /// `waiter`, when provided, is a wait-list node together with the waker to
    /// store in it. If there is no value for this receiver yet, the node is
    /// linked into `tail.waiters` (unless it is already queued).
    ///
    /// Returns:
    /// * `Ok(guard)` holding the read lock on the slot for the next value;
    ///   the cursor has been advanced by one.
    /// * `Err(TryRecvError::Empty)` when the next value has not been sent yet.
    /// * `Err(TryRecvError::Lagged(n))` when the cursor had to be moved
    ///   forward by `n` positions to the oldest retained value.
    /// * `Err(TryRecvError::Closed)` when the slot at the cursor carries the
    ///   closed flag; the cursor is still advanced by one in this case.
    fn recv_ref(
        &mut self,
        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
        // Map the cursor onto its slot index.
        let idx = (self.next & self.shared.mask as u64) as usize;

        // The slot holding the next value to read
        let mut slot = self.shared.buffer[idx].read().unwrap();

        // A mismatch means the slot holds either an older lap (nothing new
        // yet) or a newer lap (the receiver has lagged).
        if slot.pos != self.next {
            let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

            // The receiver has read all current values in the channel and there
            // is no waiter to register
            if waiter.is_none() && next_pos == self.next {
                return Err(TryRecvError::Empty);
            }

            // Release the `slot` lock before attempting to acquire the `tail`
            // lock. This is required because `send2` acquires the tail lock
            // first followed by the slot lock. Acquiring the locks in reverse
            // order here would result in a potential deadlock: `recv_ref`
            // acquires the `slot` lock and attempts to acquire the `tail` lock
            // while `send2` acquired the `tail` lock and attempts to acquire
            // the slot lock.
            drop(slot);

            let mut tail = self.shared.tail.lock().unwrap();

            // Acquire slot lock again
            slot = self.shared.buffer[idx].read().unwrap();

            // Make sure the position did not change. This could happen in the
            // unlikely event that the buffer is wrapped between dropping the
            // read lock and acquiring the tail lock.
            if slot.pos != self.next {
                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

                if next_pos == self.next {
                    // Store the waker
                    if let Some((waiter, waker)) = waiter {
                        // Safety: called while locked.
                        unsafe {
                            // Only queue if not already queued
                            waiter.with_mut(|ptr| {
                                // If there is no waker **or** if the currently
                                // stored waker references a **different** task,
                                // track the tasks' waker to be notified on
                                // receipt of a new value.
                                match (*ptr).waker {
                                    Some(ref w) if w.will_wake(waker) => {}
                                    _ => {
                                        (*ptr).waker = Some(waker.clone());
                                    }
                                }

                                if !(*ptr).queued {
                                    (*ptr).queued = true;
                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
                                }
                            });
                        }
                    }

                    return Err(TryRecvError::Empty);
                }

                // At this point, the receiver has lagged behind the sender by
                // more than the channel capacity. The receiver will attempt to
                // catch up by skipping dropped messages and setting the
                // internal cursor to the **oldest** message stored by the
                // channel.
                //
                // However, finding the oldest position is a bit more
                // complicated than `tail-position - buffer-size`. When
                // the channel is closed, the tail position is incremented to
                // signal a new `None` message, but `None` is not stored in the
                // channel itself (see issue #2425 for why).
                //
                // To account for this, if the channel is closed, the tail
                // position is decremented by `buffer-size + 1`.
                let mut adjust = 0;
                if tail.closed {
                    adjust = 1
                }
                let next = tail
                    .pos
                    .wrapping_sub(self.shared.buffer.len() as u64 + adjust);

                // Number of positions the cursor has to skip to reach `next`.
                let missed = next.wrapping_sub(self.next);

                drop(tail);

                // The receiver is slow but no values have been missed
                //
                // NOTE(review): this path returns the slot stored at `idx`
                // although its `pos` differs from the cursor and without
                // looking at `slot.closed` — presumably the closed-channel
                // case described above; confirm.
                if missed == 0 {
                    self.next = self.next.wrapping_add(1);

                    return Ok(RecvGuard { slot });
                }

                // Jump to the oldest retained position; the next call reads
                // from there.
                self.next = next;

                return Err(TryRecvError::Lagged(missed));
            }
        }

        // The slot holds the entry for the cursor: consume it.
        self.next = self.next.wrapping_add(1);

        if slot.closed {
            return Err(TryRecvError::Closed);
        }

        Ok(RecvGuard { slot })
    }
785 }
786 
787 impl<T> Receiver<T>
788 where
789     T: Clone,
790 {
791     /// Attempts to return a pending value on this receiver without awaiting.
792     ///
793     /// This is useful for a flavor of "optimistic check" before deciding to
794     /// await on a receiver.
795     ///
796     /// Compared with [`recv`], this function has three failure cases instead of one
797     /// (one for closed, one for an empty buffer, one for a lagging receiver).
798     ///
799     /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
800     /// dropped, indicating that no further values can be sent on the channel.
801     ///
802     /// If the [`Receiver`] handle falls behind, once the channel is full, newly
803     /// sent values will overwrite old values. At this point, a call to [`recv`]
804     /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
805     /// internal cursor is updated to point to the oldest value still held by
806     /// the channel. A subsequent call to [`try_recv`] will return this value
807     /// **unless** it has been since overwritten. If there are no values to
808     /// receive, `Err(TryRecvError::Empty)` is returned.
809     ///
810     /// [`recv`]: crate::sync::broadcast::Receiver::recv
811     /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
812     /// [`Receiver`]: crate::sync::broadcast::Receiver
813     ///
814     /// # Examples
815     ///
816     /// ```
817     /// use tokio::sync::broadcast;
818     ///
819     /// #[tokio::main]
820     /// async fn main() {
821     ///     let (tx, mut rx) = broadcast::channel(16);
822     ///
823     ///     assert!(rx.try_recv().is_err());
824     ///
825     ///     tx.send(10).unwrap();
826     ///
827     ///     let value = rx.try_recv().unwrap();
828     ///     assert_eq!(10, value);
829     /// }
830     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>831     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
832         let guard = self.recv_ref(None)?;
833         guard.clone_value().ok_or(TryRecvError::Closed)
834     }
835 
    #[doc(hidden)]
    #[deprecated(since = "0.2.21", note = "use async fn recv()")]
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        use Poll::{Pending, Ready};

        // The borrow checker prohibits calling `self.recv_ref` while passing
        // in a reference to one of `self`'s fields (as it should). To work
        // around this, `waiter` is first *removed* from `self` then
        // `recv_ref` is called.
        //
        // However, for safety, we must ensure that `waiter` is **not** dropped.
        // It could be contained in the intrusive linked list. The `Receiver`
        // drop implementation handles cleanup.
        //
        // The guard pattern is used to ensure that, on return, even due to
        // panic, the waiter node is replaced on `self`.

        struct Guard<'a, T> {
            waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>,
            receiver: &'a mut Receiver<T>,
        }

        impl<'a, T> Drop for Guard<'a, T> {
            fn drop(&mut self) {
                // Hand the waiter node back to the receiver.
                self.receiver.waiter = self.waiter.take();
            }
        }

        // Reuse the waiter node from a previous call, or allocate a fresh,
        // unqueued one on first use.
        let waiter = self.waiter.take().or_else(|| {
            Some(Box::pin(UnsafeCell::new(Waiter {
                queued: false,
                waker: None,
                pointers: linked_list::Pointers::new(),
                _p: PhantomPinned,
            })))
        });

        let guard = Guard {
            waiter,
            receiver: self,
        };
        // `guard.waiter` is always `Some` here (see above), so the `unwrap`
        // cannot fail.
        let res = guard
            .receiver
            .recv_ref(Some((&guard.waiter.as_ref().unwrap(), cx.waker())));

        match res {
            Ok(guard) => Ready(guard.clone_value().ok_or(RecvError::Closed)),
            Err(TryRecvError::Closed) => Ready(Err(RecvError::Closed)),
            Err(TryRecvError::Lagged(n)) => Ready(Err(RecvError::Lagged(n))),
            // The waker was handed to `recv_ref` together with the waiter
            // node, so the task is woken by a later send.
            Err(TryRecvError::Empty) => Pending,
        }
    }
887 
888     /// Receives the next value for this receiver.
889     ///
890     /// Each [`Receiver`] handle will receive a clone of all values sent
891     /// **after** it has subscribed.
892     ///
893     /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
894     /// dropped, indicating that no further values can be sent on the channel.
895     ///
896     /// If the [`Receiver`] handle falls behind, once the channel is full, newly
897     /// sent values will overwrite old values. At this point, a call to [`recv`]
898     /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
899     /// internal cursor is updated to point to the oldest value still held by
900     /// the channel. A subsequent call to [`recv`] will return this value
901     /// **unless** it has been since overwritten.
902     ///
903     /// [`Receiver`]: crate::sync::broadcast::Receiver
904     /// [`recv`]: crate::sync::broadcast::Receiver::recv
905     ///
906     /// # Examples
907     ///
908     /// ```
909     /// use tokio::sync::broadcast;
910     ///
911     /// #[tokio::main]
912     /// async fn main() {
913     ///     let (tx, mut rx1) = broadcast::channel(16);
914     ///     let mut rx2 = tx.subscribe();
915     ///
916     ///     tokio::spawn(async move {
917     ///         assert_eq!(rx1.recv().await.unwrap(), 10);
918     ///         assert_eq!(rx1.recv().await.unwrap(), 20);
919     ///     });
920     ///
921     ///     tokio::spawn(async move {
922     ///         assert_eq!(rx2.recv().await.unwrap(), 10);
923     ///         assert_eq!(rx2.recv().await.unwrap(), 20);
924     ///     });
925     ///
926     ///     tx.send(10).unwrap();
927     ///     tx.send(20).unwrap();
928     /// }
929     /// ```
930     ///
931     /// Handling lag
932     ///
933     /// ```
934     /// use tokio::sync::broadcast;
935     ///
936     /// #[tokio::main]
937     /// async fn main() {
938     ///     let (tx, mut rx) = broadcast::channel(2);
939     ///
940     ///     tx.send(10).unwrap();
941     ///     tx.send(20).unwrap();
942     ///     tx.send(30).unwrap();
943     ///
944     ///     // The receiver lagged behind
945     ///     assert!(rx.recv().await.is_err());
946     ///
947     ///     // At this point, we can abort or continue with lost messages
948     ///
949     ///     assert_eq!(20, rx.recv().await.unwrap());
950     ///     assert_eq!(30, rx.recv().await.unwrap());
951     /// }
recv(&mut self) -> Result<T, RecvError>952     pub async fn recv(&mut self) -> Result<T, RecvError> {
953         let fut = Recv::<_, T>::new(Borrow(self));
954         fut.await
955     }
956 }
957 
#[cfg(feature = "stream")]
#[doc(hidden)]
impl<T> crate::stream::Stream for Receiver<T>
where
    T: Clone,
{
    type Item = Result<T, RecvError>;

    /// Adapts `poll_recv` to the stream contract: values and lag errors are
    /// yielded as items, and a closed channel ends the stream.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<T, RecvError>>> {
        // `poll_recv` is deprecated as public API but still backs this impl.
        #[allow(deprecated)]
        let polled = self.poll_recv(cx);

        match polled {
            Poll::Ready(Ok(value)) => Poll::Ready(Some(Ok(value))),
            Poll::Ready(Err(RecvError::Lagged(skipped))) => {
                Poll::Ready(Some(Err(RecvError::Lagged(skipped))))
            }
            Poll::Ready(Err(RecvError::Closed)) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
978 
impl<T> Drop for Receiver<T> {
    /// Detaches this receiver from the channel.
    ///
    /// Under the tail lock, any queued waiter node is unlinked from the
    /// waiter list and the receiver count is decremented. After the lock is
    /// released, the receiver reads forward until its cursor reaches the tail
    /// position recorded under the lock.
    fn drop(&mut self) {
        // Panics if the tail mutex is poisoned (`unwrap` on the lock result).
        let mut tail = self.shared.tail.lock().unwrap();

        // A waiter node only exists if one was stored on the receiver (see
        // `poll_recv`).
        if let Some(waiter) = &self.waiter {
            // safety: tail lock is held
            let queued = waiter.with(|ptr| unsafe { (*ptr).queued });

            if queued {
                // Remove the node
                //
                // safety: tail lock is held and the wait node is verified to be in
                // the list.
                unsafe {
                    waiter.with_mut(|ptr| {
                        tail.waiters.remove((&mut *ptr).into());
                    });
                }
            }
        }

        tail.rx_cnt -= 1;
        // Snapshot the tail position; values up to here still need to be
        // consumed by this receiver.
        let until = tail.pos;

        // Release the lock before draining.
        // NOTE(review): presumably `recv_ref` takes locks of its own and
        // advances `self.next` -- confirm against its definition.
        drop(tail);

        while self.next != until {
            match self.recv_ref(None) {
                // The returned guard is dropped immediately; `RecvGuard`'s
                // `Drop` decrements the slot's remaining-receiver counter.
                Ok(_) => {}
                // The channel is closed
                Err(TryRecvError::Closed) => break,
                // Ignore lagging, we will catch up
                Err(TryRecvError::Lagged(..)) => {}
                // Can't be empty
                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
            }
        }
    }
}
1018 
1019 impl<R, T> Recv<R, T>
1020 where
1021     R: AsMut<Receiver<T>>,
1022 {
new(receiver: R) -> Recv<R, T>1023     fn new(receiver: R) -> Recv<R, T> {
1024         Recv {
1025             receiver,
1026             waiter: UnsafeCell::new(Waiter {
1027                 queued: false,
1028                 waker: None,
1029                 pointers: linked_list::Pointers::new(),
1030                 _p: PhantomPinned,
1031             }),
1032             _p: std::marker::PhantomData,
1033         }
1034     }
1035 
1036     /// A custom `project` implementation is used in place of `pin-project-lite`
1037     /// as a custom drop implementation is needed.
project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>)1038     fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1039         unsafe {
1040             // Safety: Receiver is Unpin
1041             is_unpin::<&mut Receiver<T>>();
1042 
1043             let me = self.get_unchecked_mut();
1044             (me.receiver.as_mut(), &me.waiter)
1045         }
1046     }
1047 }
1048 
1049 impl<R, T> Future for Recv<R, T>
1050 where
1051     R: AsMut<Receiver<T>>,
1052     T: Clone,
1053 {
1054     type Output = Result<T, RecvError>;
1055 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>1056     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1057         let (receiver, waiter) = self.project();
1058 
1059         let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1060             Ok(value) => value,
1061             Err(TryRecvError::Empty) => return Poll::Pending,
1062             Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1063             Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1064         };
1065 
1066         Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1067     }
1068 }
1069 
1070 cfg_stream! {
1071     use futures_core::Stream;
1072 
    impl<T: Clone> Receiver<T> {
        /// Converts the receiver into a `Stream`.
        ///
        /// The conversion allows using `Receiver` with APIs that require stream
        /// values.
        ///
        /// The receiver is consumed and wrapped in the same `Recv` type that
        /// backs `recv()`. Through that type's `Stream` implementation, each
        /// received value is yielded as `Ok(value)`, a lagging receiver
        /// yields an `Err(RecvError::Lagged(_))` item, and the stream ends
        /// once the channel reports that it is closed.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::stream::StreamExt;
        /// use tokio::sync::broadcast;
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let (tx, rx) = broadcast::channel(128);
        ///
        ///     tokio::spawn(async move {
        ///         for i in 0..10_i32 {
        ///             tx.send(i).unwrap();
        ///         }
        ///     });
        ///
        ///     // Streams must be pinned to iterate.
        ///     tokio::pin! {
        ///         let stream = rx
        ///             .into_stream()
        ///             .filter(Result::is_ok)
        ///             .map(Result::unwrap)
        ///             .filter(|v| v % 2 == 0)
        ///             .map(|v| v + 1);
        ///     }
        ///
        ///     while let Some(i) = stream.next().await {
        ///         println!("{}", i);
        ///     }
        /// }
        /// ```
        pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> {
            // `Borrow` wraps the owned receiver so it can be stored in `Recv`.
            Recv::new(Borrow(self))
        }
    }
1114 
1115     impl<R, T: Clone> Stream for Recv<R, T>
1116     where
1117         R: AsMut<Receiver<T>>,
1118         T: Clone,
1119     {
1120         type Item = Result<T, RecvError>;
1121 
1122         fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1123             let (receiver, waiter) = self.project();
1124 
1125             let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1126                 Ok(value) => value,
1127                 Err(TryRecvError::Empty) => return Poll::Pending,
1128                 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Some(Err(RecvError::Lagged(n)))),
1129                 Err(TryRecvError::Closed) => return Poll::Ready(None),
1130             };
1131 
1132             Poll::Ready(guard.clone_value().map(Ok))
1133         }
1134     }
1135 }
1136 
impl<R, T> Drop for Recv<R, T>
where
    R: AsMut<Receiver<T>>,
{
    /// Unlinks this future's waiter node from the channel's waiter list if
    /// it is still queued there.
    fn drop(&mut self) {
        // Acquire the tail lock. This is required for safety before accessing
        // the waiter node.
        //
        // Panics if the mutex is poisoned (`unwrap` on the lock result).
        let mut tail = self.receiver.as_mut().shared.tail.lock().unwrap();

        // safety: tail lock is held
        let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });

        if queued {
            // Remove the node
            //
            // safety: tail lock is held and the wait node is verified to be in
            // the list.
            unsafe {
                self.waiter.with_mut(|ptr| {
                    tail.waiters.remove((&mut *ptr).into());
                });
            }
        }
    }
}
1162 
/// Lets `Waiter` nodes be stored in the intrusive `linked_list`, using a raw
/// `NonNull<Waiter>` as the list handle.
///
/// # Safety
///
/// `Waiter` is forced to be !Unpin (it carries a `PhantomPinned` field).
unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    /// Returns the pointer held by the handle; the handle *is* the raw
    /// pointer, so this is a plain copy.
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    /// Turns a raw pointer back into a handle; returns `ptr` unchanged.
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    /// Returns a pointer to the `pointers` field of the node behind
    /// `target`.
    unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        NonNull::from(&mut target.as_mut().pointers)
    }
}
1182 
1183 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1184     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1185         write!(fmt, "broadcast::Sender")
1186     }
1187 }
1188 
1189 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1190     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1191         write!(fmt, "broadcast::Receiver")
1192     }
1193 }
1194 
impl<'a, T> RecvGuard<'a, T> {
    /// Returns a clone of whatever the guarded slot currently holds.
    ///
    /// The slot stores an `Option`, so `None` is returned when the slot
    /// holds no value.
    fn clone_value(&self) -> Option<T>
    where
        T: Clone,
    {
        // NOTE(review): the unsafe read presumably relies on the guard
        // keeping the slot's value alive until it is dropped -- confirm
        // against the invariants documented where the slot is defined.
        self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
    }
}
1203 
impl<'a, T> Drop for RecvGuard<'a, T> {
    /// Releases this guard's claim on the slot, clearing the stored value
    /// when this was the last outstanding claim.
    fn drop(&mut self) {
        // Decrement the remaining counter
        //
        // `fetch_sub` returns the value *before* the decrement, so `1` means
        // the counter has just reached zero.
        if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
            // Safety: Last receiver, drop the value
            self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
        }
    }
}
1213 
1214 impl fmt::Display for RecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1215     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1216         match self {
1217             RecvError::Closed => write!(f, "channel closed"),
1218             RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1219         }
1220     }
1221 }
1222 
1223 impl std::error::Error for RecvError {}
1224 
1225 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1226     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1227         match self {
1228             TryRecvError::Empty => write!(f, "channel empty"),
1229             TryRecvError::Closed => write!(f, "channel closed"),
1230             TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1231         }
1232     }
1233 }
1234 
1235 impl std::error::Error for TryRecvError {}
1236 
/// Compile-time assertion that `T` implements `Unpin`.
///
/// Calling this does nothing at runtime; the call only type-checks when the
/// bound holds.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1238