1 //! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2 //! all consumers.
3 //!
4 //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5 //! values. [`Sender`] handles are clone-able, allowing concurrent send and
6 //! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7 //! long as `T` is also `Send` or `Sync` respectively.
8 //!
9 //! When a value is sent, **all** [`Receiver`] handles are notified and will
10 //! receive the value. The value is stored once inside the channel and cloned on
11 //! demand for each receiver. Once all receivers have received a clone of the
12 //! value, the value is released from the channel.
13 //!
14 //! A channel is created by calling [`channel`], specifying the maximum number
15 //! of messages the channel can retain at any given time.
16 //!
17 //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18 //! returned [`Receiver`] will receive values sent **after** the call to
19 //! `subscribe`.
20 //!
21 //! ## Lagging
22 //!
23 //! As sent messages must be retained until **all** [`Receiver`] handles receive
24 //! a clone, broadcast channels are suspectible to the "slow receiver" problem.
25 //! In this case, all but one receiver are able to receive values at the rate
26 //! they are sent. Because one receiver is stalled, the channel starts to fill
27 //! up.
28 //!
29 //! This broadcast channel implementation handles this case by setting a hard
30 //! upper bound on the number of values the channel may retain at any given
31 //! time. This upper bound is passed to the [`channel`] function as an argument.
32 //!
33 //! If a value is sent when the channel is at capacity, the oldest value
34 //! currently held by the channel is released. This frees up space for the new
35 //! value. Any receiver that has not yet seen the released value will return
36 //! [`RecvError::Lagged`] the next time [`recv`] is called.
37 //!
38 //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
39 //! updated to the oldest value contained by the channel. The next call to
40 //! [`recv`] will return this value.
41 //!
42 //! This behavior enables a receiver to detect when it has lagged so far behind
43 //! that data has been dropped. The caller may decide how to respond to this:
44 //! either by aborting its task or by tolerating lost messages and resuming
45 //! consumption of the channel.
46 //!
47 //! ## Closing
48 //!
49 //! When **all** [`Sender`] handles have been dropped, no new values may be
50 //! sent. At this point, the channel is "closed". Once a receiver has received
51 //! all values retained by the channel, the next call to [`recv`] will return
52 //! with [`RecvError::Closed`].
53 //!
54 //! [`Sender`]: crate::sync::broadcast::Sender
55 //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
56 //! [`Receiver`]: crate::sync::broadcast::Receiver
57 //! [`channel`]: crate::sync::broadcast::channel
58 //! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged
59 //! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed
60 //! [`recv`]: crate::sync::broadcast::Receiver::recv
61 //!
62 //! # Examples
63 //!
64 //! Basic usage
65 //!
66 //! ```
67 //! use tokio::sync::broadcast;
68 //!
69 //! #[tokio::main]
70 //! async fn main() {
71 //!     let (tx, mut rx1) = broadcast::channel(16);
72 //!     let mut rx2 = tx.subscribe();
73 //!
74 //!     tokio::spawn(async move {
75 //!         assert_eq!(rx1.recv().await.unwrap(), 10);
76 //!         assert_eq!(rx1.recv().await.unwrap(), 20);
77 //!     });
78 //!
79 //!     tokio::spawn(async move {
80 //!         assert_eq!(rx2.recv().await.unwrap(), 10);
81 //!         assert_eq!(rx2.recv().await.unwrap(), 20);
82 //!     });
83 //!
84 //!     tx.send(10).unwrap();
85 //!     tx.send(20).unwrap();
86 //! }
87 //! ```
88 //!
89 //! Handling lag
90 //!
91 //! ```
92 //! use tokio::sync::broadcast;
93 //!
94 //! #[tokio::main]
95 //! async fn main() {
96 //!     let (tx, mut rx) = broadcast::channel(2);
97 //!
98 //!     tx.send(10).unwrap();
99 //!     tx.send(20).unwrap();
100 //!     tx.send(30).unwrap();
101 //!
102 //!     // The receiver lagged behind
103 //!     assert!(rx.recv().await.is_err());
104 //!
105 //!     // At this point, we can abort or continue with lost messages
106 //!
107 //!     assert_eq!(20, rx.recv().await.unwrap());
108 //!     assert_eq!(30, rx.recv().await.unwrap());
109 //! }
110 
111 use crate::loom::cell::UnsafeCell;
112 use crate::loom::future::AtomicWaker;
113 use crate::loom::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, AtomicUsize};
114 use crate::loom::sync::{Arc, Condvar, Mutex};
115 
116 use std::fmt;
117 use std::mem;
118 use std::ptr;
119 use std::sync::atomic::Ordering::SeqCst;
120 use std::task::{Context, Poll, Waker};
121 use std::usize;
122 
123 /// Sending-half of the [`broadcast`] channel.
124 ///
125 /// May be used from many threads. Messages can be sent with
126 /// [`send`][Sender::send].
127 ///
128 /// # Examples
129 ///
130 /// ```
131 /// use tokio::sync::broadcast;
132 ///
133 /// #[tokio::main]
134 /// async fn main() {
135 ///     let (tx, mut rx1) = broadcast::channel(16);
136 ///     let mut rx2 = tx.subscribe();
137 ///
138 ///     tokio::spawn(async move {
139 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
140 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
141 ///     });
142 ///
143 ///     tokio::spawn(async move {
144 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
145 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
146 ///     });
147 ///
148 ///     tx.send(10).unwrap();
149 ///     tx.send(20).unwrap();
150 /// }
151 /// ```
152 ///
153 /// [`broadcast`]: crate::sync::broadcast
154 pub struct Sender<T> {
155     shared: Arc<Shared<T>>,
156 }
157 
158 /// Receiving-half of the [`broadcast`] channel.
159 ///
160 /// Must not be used concurrently. Messages may be retrieved using
161 /// [`recv`][Receiver::recv].
162 ///
163 /// # Examples
164 ///
165 /// ```
166 /// use tokio::sync::broadcast;
167 ///
168 /// #[tokio::main]
169 /// async fn main() {
170 ///     let (tx, mut rx1) = broadcast::channel(16);
171 ///     let mut rx2 = tx.subscribe();
172 ///
173 ///     tokio::spawn(async move {
174 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
175 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
176 ///     });
177 ///
178 ///     tokio::spawn(async move {
179 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
180 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
181 ///     });
182 ///
183 ///     tx.send(10).unwrap();
184 ///     tx.send(20).unwrap();
185 /// }
186 /// ```
187 ///
188 /// [`broadcast`]: crate::sync::broadcast
189 pub struct Receiver<T> {
190     /// State shared with all receivers and senders.
191     shared: Arc<Shared<T>>,
192 
193     /// Next position to read from
194     next: u64,
195 
196     /// Waiter state
197     wait: Arc<WaitNode>,
198 }
199 
200 /// Error returned by [`Sender::send`][Sender::send].
201 ///
202 /// A **send** operation can only fail if there are no active receivers,
203 /// implying that the message could never be received. The error contains the
204 /// message being sent as a payload so it can be recovered.
205 #[derive(Debug)]
206 pub struct SendError<T>(pub T);
207 
208 /// An error returned from the [`recv`] function on a [`Receiver`].
209 ///
210 /// [`recv`]: crate::sync::broadcast::Receiver::recv
211 /// [`Receiver`]: crate::sync::broadcast::Receiver
212 #[derive(Debug, PartialEq)]
213 pub enum RecvError {
214     /// There are no more active senders implying no further messages will ever
215     /// be sent.
216     Closed,
217 
218     /// The receiver lagged too far behind. Attempting to receive again will
219     /// return the oldest message still retained by the channel.
220     ///
221     /// Includes the number of skipped messages.
222     Lagged(u64),
223 }
224 
225 /// An error returned from the [`try_recv`] function on a [`Receiver`].
226 ///
227 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
228 /// [`Receiver`]: crate::sync::broadcast::Receiver
229 #[derive(Debug, PartialEq)]
230 pub enum TryRecvError {
231     /// The channel is currently empty. There are still active
232     /// [`Sender`][Sender] handles, so data may yet become available.
233     Empty,
234 
235     /// There are no more active senders implying no further messages will ever
236     /// be sent.
237     Closed,
238 
239     /// The receiver lagged too far behind and has been forcibly disconnected.
240     /// Attempting to receive again will return the oldest message still
241     /// retained by the channel.
242     ///
243     /// Includes the number of skipped messages.
244     Lagged(u64),
245 }
246 
247 /// Data shared between senders and receivers
248 struct Shared<T> {
249     /// slots in the channel
250     buffer: Box<[Slot<T>]>,
251 
252     /// Mask a position -> index
253     mask: usize,
254 
255     /// Tail of the queue
256     tail: Mutex<Tail>,
257 
258     /// Notifies a sender that the slot is unlocked
259     condvar: Condvar,
260 
261     /// Stack of pending waiters
262     wait_stack: AtomicPtr<WaitNode>,
263 
264     /// Number of outstanding Sender handles
265     num_tx: AtomicUsize,
266 }
267 
268 /// Next position to write a value
269 struct Tail {
270     /// Next position to write to
271     pos: u64,
272 
273     /// Number of active receivers
274     rx_cnt: usize,
275 }
276 
277 /// Slot in the buffer
278 struct Slot<T> {
279     /// Remaining number of receivers that are expected to see this value.
280     ///
281     /// When this goes to zero, the value is released.
282     rem: AtomicUsize,
283 
284     /// Used to lock the `write` field.
285     lock: AtomicUsize,
286 
287     /// The value being broadcast
288     ///
289     /// Synchronized by `state`
290     write: Write<T>,
291 }
292 
293 /// A write in the buffer
294 struct Write<T> {
295     /// Uniquely identifies this write
296     pos: UnsafeCell<u64>,
297 
298     /// The written value
299     val: UnsafeCell<Option<T>>,
300 }
301 
302 /// Tracks a waiting receiver
303 #[derive(Debug)]
304 struct WaitNode {
305     /// `true` if queued
306     queued: AtomicBool,
307 
308     /// Task to wake when a permit is made available.
309     waker: AtomicWaker,
310 
311     /// Next pointer in the stack of waiting senders.
312     next: UnsafeCell<*const WaitNode>,
313 }
314 
315 struct RecvGuard<'a, T> {
316     slot: &'a Slot<T>,
317     tail: &'a Mutex<Tail>,
318     condvar: &'a Condvar,
319 }
320 
321 /// Max number of receivers. Reserve space to lock.
322 const MAX_RECEIVERS: usize = usize::MAX >> 1;
323 
324 /// Create a bounded, multi-producer, multi-consumer channel where each sent
325 /// value is broadcasted to all active receivers.
326 ///
327 /// All data sent on [`Sender`] will become available on every active
328 /// [`Receiver`] in the same order as it was sent.
329 ///
330 /// The `Sender` can be cloned to `send` to the same channel from multiple
331 /// points in the process or it can be used concurrently from an `Arc`. New
332 /// `Receiver` handles are created by calling [`Sender::subscribe`].
333 ///
334 /// If all [`Receiver`] handles are dropped, the `send` method will return a
335 /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
336 /// method will return a [`RecvError`].
337 ///
338 /// [`Sender`]: crate::sync::broadcast::Sender
339 /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
340 /// [`Receiver`]: crate::sync::broadcast::Receiver
341 /// [`recv`]: crate::sync::broadcast::Receiver::recv
342 /// [`SendError`]: crate::sync::broadcast::SendError
343 /// [`RecvError`]: crate::sync::broadcast::RecvError
344 ///
345 /// # Examples
346 ///
347 /// ```
348 /// use tokio::sync::broadcast;
349 ///
350 /// #[tokio::main]
351 /// async fn main() {
352 ///     let (tx, mut rx1) = broadcast::channel(16);
353 ///     let mut rx2 = tx.subscribe();
354 ///
355 ///     tokio::spawn(async move {
356 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
357 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
358 ///     });
359 ///
360 ///     tokio::spawn(async move {
361 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
362 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
363 ///     });
364 ///
365 ///     tx.send(10).unwrap();
366 ///     tx.send(20).unwrap();
367 /// }
368 /// ```
channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>)369 pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
370     assert!(capacity > 0, "capacity is empty");
371     assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
372 
373     // Round to a power of two
374     capacity = capacity.next_power_of_two();
375 
376     let mut buffer = Vec::with_capacity(capacity);
377 
378     for i in 0..capacity {
379         buffer.push(Slot {
380             rem: AtomicUsize::new(0),
381             lock: AtomicUsize::new(0),
382             write: Write {
383                 pos: UnsafeCell::new((i as u64).wrapping_sub(capacity as u64)),
384                 val: UnsafeCell::new(None),
385             },
386         });
387     }
388 
389     let shared = Arc::new(Shared {
390         buffer: buffer.into_boxed_slice(),
391         mask: capacity - 1,
392         tail: Mutex::new(Tail { pos: 0, rx_cnt: 1 }),
393         condvar: Condvar::new(),
394         wait_stack: AtomicPtr::new(ptr::null_mut()),
395         num_tx: AtomicUsize::new(1),
396     });
397 
398     let rx = Receiver {
399         shared: shared.clone(),
400         next: 0,
401         wait: Arc::new(WaitNode {
402             queued: AtomicBool::new(false),
403             waker: AtomicWaker::new(),
404             next: UnsafeCell::new(ptr::null()),
405         }),
406     };
407 
408     let tx = Sender { shared };
409 
410     (tx, rx)
411 }
412 
413 unsafe impl<T: Send> Send for Sender<T> {}
414 unsafe impl<T: Send> Sync for Sender<T> {}
415 
416 unsafe impl<T: Send> Send for Receiver<T> {}
417 unsafe impl<T: Send> Sync for Receiver<T> {}
418 
419 impl<T> Sender<T> {
420     /// Attempts to send a value to all active [`Receiver`] handles, returning
421     /// it back if it could not be sent.
422     ///
423     /// A successful send occurs when there is at least one active [`Receiver`]
424     /// handle. An unsuccessful send would be one where all associated
425     /// [`Receiver`] handles have already been dropped.
426     ///
427     /// # Return
428     ///
429     /// On success, the number of subscribed [`Receiver`] handles is returned.
430     /// This does not mean that this number of receivers will see the message as
431     /// a receiver may drop before receiving the message.
432     ///
433     /// # Note
434     ///
435     /// A return value of `Ok` **does not** mean that the sent value will be
436     /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
437     /// handles may be dropped before receiving the sent message.
438     ///
439     /// A return value of `Err` **does not** mean that future calls to `send`
440     /// will fail. New [`Receiver`] handles may be created by calling
441     /// [`subscribe`].
442     ///
443     /// [`Receiver`]: crate::sync::broadcast::Receiver
444     /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
445     ///
446     /// # Examples
447     ///
448     /// ```
449     /// use tokio::sync::broadcast;
450     ///
451     /// #[tokio::main]
452     /// async fn main() {
453     ///     let (tx, mut rx1) = broadcast::channel(16);
454     ///     let mut rx2 = tx.subscribe();
455     ///
456     ///     tokio::spawn(async move {
457     ///         assert_eq!(rx1.recv().await.unwrap(), 10);
458     ///         assert_eq!(rx1.recv().await.unwrap(), 20);
459     ///     });
460     ///
461     ///     tokio::spawn(async move {
462     ///         assert_eq!(rx2.recv().await.unwrap(), 10);
463     ///         assert_eq!(rx2.recv().await.unwrap(), 20);
464     ///     });
465     ///
466     ///     tx.send(10).unwrap();
467     ///     tx.send(20).unwrap();
468     /// }
469     /// ```
send(&self, value: T) -> Result<usize, SendError<T>>470     pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
471         self.send2(Some(value))
472             .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap()))
473     }
474 
475     /// Creates a new [`Receiver`] handle that will receive values sent **after**
476     /// this call to `subscribe`.
477     ///
478     /// # Examples
479     ///
480     /// ```
481     /// use tokio::sync::broadcast;
482     ///
483     /// #[tokio::main]
484     /// async fn main() {
485     ///     let (tx, _rx) = broadcast::channel(16);
486     ///
487     ///     // Will not be seen
488     ///     tx.send(10).unwrap();
489     ///
490     ///     let mut rx = tx.subscribe();
491     ///
492     ///     tx.send(20).unwrap();
493     ///
494     ///     let value = rx.recv().await.unwrap();
495     ///     assert_eq!(20, value);
496     /// }
497     /// ```
subscribe(&self) -> Receiver<T>498     pub fn subscribe(&self) -> Receiver<T> {
499         let shared = self.shared.clone();
500 
501         let mut tail = shared.tail.lock().unwrap();
502 
503         if tail.rx_cnt == MAX_RECEIVERS {
504             panic!("max receivers");
505         }
506 
507         tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
508         let next = tail.pos;
509 
510         drop(tail);
511 
512         Receiver {
513             shared,
514             next,
515             wait: Arc::new(WaitNode {
516                 queued: AtomicBool::new(false),
517                 waker: AtomicWaker::new(),
518                 next: UnsafeCell::new(ptr::null()),
519             }),
520         }
521     }
522 
523     /// Returns the number of active receivers
524     ///
525     /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
526     /// [`subscribe`]. These are the handles that will receive values sent on
527     /// this [`Sender`].
528     ///
529     /// # Note
530     ///
531     /// It is not guaranteed that a sent message will reach this number of
532     /// receivers. Active receivers may never call [`recv`] again before
533     /// dropping.
534     ///
535     /// [`recv`]: crate::sync::broadcast::Receiver::recv
536     /// [`Receiver`]: crate::sync::broadcast::Receiver
537     /// [`Sender`]: crate::sync::broadcast::Sender
538     /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
539     /// [`channel`]: crate::sync::broadcast::channel
540     ///
541     /// # Examples
542     ///
543     /// ```
544     /// use tokio::sync::broadcast;
545     ///
546     /// #[tokio::main]
547     /// async fn main() {
548     ///     let (tx, _rx1) = broadcast::channel(16);
549     ///
550     ///     assert_eq!(1, tx.receiver_count());
551     ///
552     ///     let mut _rx2 = tx.subscribe();
553     ///
554     ///     assert_eq!(2, tx.receiver_count());
555     ///
556     ///     tx.send(10).unwrap();
557     /// }
558     /// ```
receiver_count(&self) -> usize559     pub fn receiver_count(&self) -> usize {
560         let tail = self.shared.tail.lock().unwrap();
561         tail.rx_cnt
562     }
563 
send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>>564     fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
565         let mut tail = self.shared.tail.lock().unwrap();
566 
567         if tail.rx_cnt == 0 {
568             return Err(SendError(value));
569         }
570 
571         // Position to write into
572         let pos = tail.pos;
573         let rem = tail.rx_cnt;
574         let idx = (pos & self.shared.mask as u64) as usize;
575 
576         // Update the tail position
577         tail.pos = tail.pos.wrapping_add(1);
578 
579         // Get the slot
580         let slot = &self.shared.buffer[idx];
581 
582         // Acquire the write lock
583         let mut prev = slot.lock.fetch_or(1, SeqCst);
584 
585         while prev & !1 != 0 {
586             // Concurrent readers, we must go to sleep
587             tail = self.shared.condvar.wait(tail).unwrap();
588 
589             prev = slot.lock.load(SeqCst);
590 
591             if prev & 1 == 0 {
592                 // The writer lock bit was cleared while this thread was
593                 // sleeping. This can only happen if a newer write happened on
594                 // this slot by another thread. Bail early as an optimization,
595                 // there is nothing left to do.
596                 return Ok(rem);
597             }
598         }
599 
600         if tail.pos.wrapping_sub(pos) > self.shared.buffer.len() as u64 {
601             // There is a newer pending write to the same slot.
602             return Ok(rem);
603         }
604 
605         // Slot lock acquired
606         slot.write.pos.with_mut(|ptr| unsafe { *ptr = pos });
607         slot.write.val.with_mut(|ptr| unsafe { *ptr = value });
608 
609         // Set remaining receivers
610         slot.rem.store(rem, SeqCst);
611 
612         // Release the slot lock
613         slot.lock.store(0, SeqCst);
614 
615         // Release the mutex. This must happen after the slot lock is released,
616         // otherwise the writer lock bit could be cleared while another thread
617         // is in the critical section.
618         drop(tail);
619 
620         // Notify waiting receivers
621         self.notify_rx();
622 
623         Ok(rem)
624     }
625 
notify_rx(&self)626     fn notify_rx(&self) {
627         let mut curr = self.shared.wait_stack.swap(ptr::null_mut(), SeqCst) as *const WaitNode;
628 
629         while !curr.is_null() {
630             let waiter = unsafe { Arc::from_raw(curr) };
631 
632             // Update `curr` before toggling `queued` and waking
633             curr = waiter.next.with(|ptr| unsafe { *ptr });
634 
635             // Unset queued
636             waiter.queued.store(false, SeqCst);
637 
638             // Wake
639             waiter.waker.wake();
640         }
641     }
642 }
643 
644 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>645     fn clone(&self) -> Sender<T> {
646         let shared = self.shared.clone();
647         shared.num_tx.fetch_add(1, SeqCst);
648 
649         Sender { shared }
650     }
651 }
652 
653 impl<T> Drop for Sender<T> {
drop(&mut self)654     fn drop(&mut self) {
655         if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
656             let _ = self.send2(None);
657         }
658     }
659 }
660 
661 impl<T> Receiver<T> {
662     /// Locks the next value if there is one.
663     ///
664     /// The caller is responsible for unlocking
recv_ref(&mut self, spin: bool) -> Result<RecvGuard<'_, T>, TryRecvError>665     fn recv_ref(&mut self, spin: bool) -> Result<RecvGuard<'_, T>, TryRecvError> {
666         let idx = (self.next & self.shared.mask as u64) as usize;
667 
668         // The slot holding the next value to read
669         let slot = &self.shared.buffer[idx];
670 
671         // Lock the slot
672         if !slot.try_rx_lock() {
673             if spin {
674                 while !slot.try_rx_lock() {
675                     spin_loop_hint();
676                 }
677             } else {
678                 return Err(TryRecvError::Empty);
679             }
680         }
681 
682         let guard = RecvGuard {
683             slot,
684             tail: &self.shared.tail,
685             condvar: &self.shared.condvar,
686         };
687 
688         if guard.pos() != self.next {
689             let pos = guard.pos();
690 
691             guard.drop_no_rem_dec();
692 
693             if pos.wrapping_add(self.shared.buffer.len() as u64) == self.next {
694                 return Err(TryRecvError::Empty);
695             } else {
696                 let tail = self.shared.tail.lock().unwrap();
697 
698                 // `tail.pos` points to the slot the **next** send writes to.
699                 // Because a receiver is lagging, this slot also holds the
700                 // oldest value. To make the positions match, we subtract the
701                 // capacity.
702                 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
703                 let missed = next.wrapping_sub(self.next);
704 
705                 self.next = next;
706 
707                 return Err(TryRecvError::Lagged(missed));
708             }
709         }
710 
711         self.next = self.next.wrapping_add(1);
712 
713         Ok(guard)
714     }
715 }
716 
717 impl<T> Receiver<T>
718 where
719     T: Clone,
720 {
721     /// Attempts to return a pending value on this receiver without awaiting.
722     ///
723     /// This is useful for a flavor of "optimistic check" before deciding to
724     /// await on a receiver.
725     ///
726     /// Compared with [`recv`], this function has three failure cases instead of one
727     /// (one for closed, one for an empty buffer, one for a lagging receiver).
728     ///
729     /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
730     /// dropped, indicating that no further values can be sent on the channel.
731     ///
732     /// If the [`Receiver`] handle falls behind, once the channel is full, newly
733     /// sent values will overwrite old values. At this point, a call to [`recv`]
734     /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
735     /// internal cursor is updated to point to the oldest value still held by
736     /// the channel. A subsequent call to [`try_recv`] will return this value
737     /// **unless** it has been since overwritten. If there are no values to
738     /// receive, `Err(TryRecvError::Empty)` is returned.
739     ///
740     /// [`recv`]: crate::sync::broadcast::Receiver::recv
741     /// [`Receiver`]: crate::sync::broadcast::Receiver
742     ///
743     /// # Examples
744     ///
745     /// ```
746     /// use tokio::sync::broadcast;
747     ///
748     /// #[tokio::main]
749     /// async fn main() {
750     ///     let (tx, mut rx) = broadcast::channel(16);
751     ///
752     ///     assert!(rx.try_recv().is_err());
753     ///
754     ///     tx.send(10).unwrap();
755     ///
756     ///     let value = rx.try_recv().unwrap();
757     ///     assert_eq!(10, value);
758     /// }
759     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>760     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
761         let guard = self.recv_ref(false)?;
762         guard.clone_value().ok_or(TryRecvError::Closed)
763     }
764 
765     #[doc(hidden)] // TODO: document
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>766     pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
767         if let Some(value) = ok_empty(self.try_recv())? {
768             return Poll::Ready(Ok(value));
769         }
770 
771         self.register_waker(cx.waker());
772 
773         if let Some(value) = ok_empty(self.try_recv())? {
774             Poll::Ready(Ok(value))
775         } else {
776             Poll::Pending
777         }
778     }
779 
780     /// Receives the next value for this receiver.
781     ///
782     /// Each [`Receiver`] handle will receive a clone of all values sent
783     /// **after** it has subscribed.
784     ///
785     /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
786     /// dropped, indicating that no further values can be sent on the channel.
787     ///
788     /// If the [`Receiver`] handle falls behind, once the channel is full, newly
789     /// sent values will overwrite old values. At this point, a call to [`recv`]
790     /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
791     /// internal cursor is updated to point to the oldest value still held by
792     /// the channel. A subsequent call to [`recv`] will return this value
793     /// **unless** it has been since overwritten.
794     ///
795     /// [`Receiver`]: crate::sync::broadcast::Receiver
796     /// [`recv`]: crate::sync::broadcast::Receiver::recv
797     ///
798     /// # Examples
799     ///
800     /// ```
801     /// use tokio::sync::broadcast;
802     ///
803     /// #[tokio::main]
804     /// async fn main() {
805     ///     let (tx, mut rx1) = broadcast::channel(16);
806     ///     let mut rx2 = tx.subscribe();
807     ///
808     ///     tokio::spawn(async move {
809     ///         assert_eq!(rx1.recv().await.unwrap(), 10);
810     ///         assert_eq!(rx1.recv().await.unwrap(), 20);
811     ///     });
812     ///
813     ///     tokio::spawn(async move {
814     ///         assert_eq!(rx2.recv().await.unwrap(), 10);
815     ///         assert_eq!(rx2.recv().await.unwrap(), 20);
816     ///     });
817     ///
818     ///     tx.send(10).unwrap();
819     ///     tx.send(20).unwrap();
820     /// }
821     /// ```
822     ///
823     /// Handling lag
824     ///
825     /// ```
826     /// use tokio::sync::broadcast;
827     ///
828     /// #[tokio::main]
829     /// async fn main() {
830     ///     let (tx, mut rx) = broadcast::channel(2);
831     ///
832     ///     tx.send(10).unwrap();
833     ///     tx.send(20).unwrap();
834     ///     tx.send(30).unwrap();
835     ///
836     ///     // The receiver lagged behind
837     ///     assert!(rx.recv().await.is_err());
838     ///
839     ///     // At this point, we can abort or continue with lost messages
840     ///
841     ///     assert_eq!(20, rx.recv().await.unwrap());
842     ///     assert_eq!(30, rx.recv().await.unwrap());
843     /// }
recv(&mut self) -> Result<T, RecvError>844     pub async fn recv(&mut self) -> Result<T, RecvError> {
845         use crate::future::poll_fn;
846 
847         poll_fn(|cx| self.poll_recv(cx)).await
848     }
849 
register_waker(&self, cx: &Waker)850     fn register_waker(&self, cx: &Waker) {
851         self.wait.waker.register_by_ref(cx);
852 
853         if !self.wait.queued.load(SeqCst) {
854             // Set `queued` before queuing.
855             self.wait.queued.store(true, SeqCst);
856 
857             let mut curr = self.shared.wait_stack.load(SeqCst);
858 
859             // The ref count is decremented in `notify_rx` when all nodes are
860             // removed from the waiter stack.
861             let node = Arc::into_raw(self.wait.clone()) as *mut _;
862 
863             loop {
864                 // Safety: `queued == false` means the caller has exclusive
865                 // access to `self.wait.next`.
866                 self.wait.next.with_mut(|ptr| unsafe { *ptr = curr });
867 
868                 let res = self
869                     .shared
870                     .wait_stack
871                     .compare_exchange(curr, node, SeqCst, SeqCst);
872 
873                 match res {
874                     Ok(_) => return,
875                     Err(actual) => curr = actual,
876                 }
877             }
878         }
879     }
880 }
881 
882 #[cfg(feature = "stream")]
883 impl<T> crate::stream::Stream for Receiver<T>
884 where
885     T: Clone,
886 {
887     type Item = Result<T, RecvError>;
888 
poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<T, RecvError>>>889     fn poll_next(
890         mut self: std::pin::Pin<&mut Self>,
891         cx: &mut Context<'_>,
892     ) -> Poll<Option<Result<T, RecvError>>> {
893         self.poll_recv(cx).map(|v| match v {
894             Ok(v) => Some(Ok(v)),
895             lag @ Err(RecvError::Lagged(_)) => Some(lag),
896             Err(RecvError::Closed) => None,
897         })
898     }
899 }
900 
901 impl<T> Drop for Receiver<T> {
drop(&mut self)902     fn drop(&mut self) {
903         let mut tail = self.shared.tail.lock().unwrap();
904 
905         tail.rx_cnt -= 1;
906         let until = tail.pos;
907 
908         drop(tail);
909 
910         while self.next != until {
911             match self.recv_ref(true) {
912                 // Ignore the value
913                 Ok(_) => {}
914                 // The channel is closed
915                 Err(TryRecvError::Closed) => break,
916                 // Ignore lagging, we will catch up
917                 Err(TryRecvError::Lagged(..)) => {}
918                 // Can't be empty
919                 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
920             }
921         }
922     }
923 }
924 
925 impl<T> Drop for Shared<T> {
drop(&mut self)926     fn drop(&mut self) {
927         // Clear the wait stack
928         let mut curr = self.wait_stack.with_mut(|ptr| *ptr as *const WaitNode);
929 
930         while !curr.is_null() {
931             let waiter = unsafe { Arc::from_raw(curr) };
932             curr = waiter.next.with(|ptr| unsafe { *ptr });
933         }
934     }
935 }
936 
937 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result938     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
939         write!(fmt, "broadcast::Sender")
940     }
941 }
942 
943 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result944     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
945         write!(fmt, "broadcast::Receiver")
946     }
947 }
948 
949 impl<T> Slot<T> {
950     /// Tries to lock the slot for a receiver. If `false`, then a sender holds the
951     /// lock and the calling task will be notified once the sender has released
952     /// the lock.
try_rx_lock(&self) -> bool953     fn try_rx_lock(&self) -> bool {
954         let mut curr = self.lock.load(SeqCst);
955 
956         loop {
957             if curr & 1 == 1 {
958                 // Locked by sender
959                 return false;
960             }
961 
962             // Only increment (by 2) if the LSB "lock" bit is not set.
963             let res = self.lock.compare_exchange(curr, curr + 2, SeqCst, SeqCst);
964 
965             match res {
966                 Ok(_) => return true,
967                 Err(actual) => curr = actual,
968             }
969         }
970     }
971 
rx_unlock(&self, tail: &Mutex<Tail>, condvar: &Condvar, rem_dec: bool)972     fn rx_unlock(&self, tail: &Mutex<Tail>, condvar: &Condvar, rem_dec: bool) {
973         if rem_dec {
974             // Decrement the remaining counter
975             if 1 == self.rem.fetch_sub(1, SeqCst) {
976                 // Last receiver, drop the value
977                 self.write.val.with_mut(|ptr| unsafe { *ptr = None });
978             }
979         }
980 
981         if 1 == self.lock.fetch_sub(2, SeqCst) - 2 {
982             // First acquire the lock to make sure our sender is waiting on the
983             // condition variable, otherwise the notification could be lost.
984             mem::drop(tail.lock().unwrap());
985             // Wake up senders
986             condvar.notify_all();
987         }
988     }
989 }
990 
991 impl<'a, T> RecvGuard<'a, T> {
pos(&self) -> u64992     fn pos(&self) -> u64 {
993         self.slot.write.pos.with(|ptr| unsafe { *ptr })
994     }
995 
clone_value(&self) -> Option<T> where T: Clone,996     fn clone_value(&self) -> Option<T>
997     where
998         T: Clone,
999     {
1000         self.slot.write.val.with(|ptr| unsafe { (*ptr).clone() })
1001     }
1002 
drop_no_rem_dec(self)1003     fn drop_no_rem_dec(self) {
1004         self.slot.rx_unlock(self.tail, self.condvar, false);
1005 
1006         mem::forget(self);
1007     }
1008 }
1009 
1010 impl<'a, T> Drop for RecvGuard<'a, T> {
drop(&mut self)1011     fn drop(&mut self) {
1012         self.slot.rx_unlock(self.tail, self.condvar, true)
1013     }
1014 }
1015 
ok_empty<T>(res: Result<T, TryRecvError>) -> Result<Option<T>, RecvError>1016 fn ok_empty<T>(res: Result<T, TryRecvError>) -> Result<Option<T>, RecvError> {
1017     match res {
1018         Ok(value) => Ok(Some(value)),
1019         Err(TryRecvError::Empty) => Ok(None),
1020         Err(TryRecvError::Lagged(n)) => Err(RecvError::Lagged(n)),
1021         Err(TryRecvError::Closed) => Err(RecvError::Closed),
1022     }
1023 }
1024 
1025 impl fmt::Display for RecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1026     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1027         match self {
1028             RecvError::Closed => write!(f, "channel closed"),
1029             RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1030         }
1031     }
1032 }
1033 
1034 impl std::error::Error for RecvError {}
1035 
1036 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1037     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1038         match self {
1039             TryRecvError::Empty => write!(f, "channel empty"),
1040             TryRecvError::Closed => write!(f, "channel closed"),
1041             TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1042         }
1043     }
1044 }
1045 
1046 impl std::error::Error for TryRecvError {}
1047