1 //! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2 //! all consumers.
3 //!
4 //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5 //! values. [`Sender`] handles are clone-able, allowing concurrent send and
//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
//! long as `T` is `Send`.
8 //!
9 //! When a value is sent, **all** [`Receiver`] handles are notified and will
10 //! receive the value. The value is stored once inside the channel and cloned on
11 //! demand for each receiver. Once all receivers have received a clone of the
12 //! value, the value is released from the channel.
13 //!
14 //! A channel is created by calling [`channel`], specifying the maximum number
15 //! of messages the channel can retain at any given time.
16 //!
17 //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18 //! returned [`Receiver`] will receive values sent **after** the call to
19 //! `subscribe`.
20 //!
21 //! ## Lagging
22 //!
23 //! As sent messages must be retained until **all** [`Receiver`] handles receive
24 //! a clone, broadcast channels are susceptible to the "slow receiver" problem.
25 //! In this case, all but one receiver are able to receive values at the rate
26 //! they are sent. Because one receiver is stalled, the channel starts to fill
27 //! up.
28 //!
29 //! This broadcast channel implementation handles this case by setting a hard
30 //! upper bound on the number of values the channel may retain at any given
31 //! time. This upper bound is passed to the [`channel`] function as an argument.
32 //!
33 //! If a value is sent when the channel is at capacity, the oldest value
34 //! currently held by the channel is released. This frees up space for the new
35 //! value. Any receiver that has not yet seen the released value will return
36 //! [`RecvError::Lagged`] the next time [`recv`] is called.
37 //!
38 //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
39 //! updated to the oldest value contained by the channel. The next call to
40 //! [`recv`] will return this value.
41 //!
42 //! This behavior enables a receiver to detect when it has lagged so far behind
43 //! that data has been dropped. The caller may decide how to respond to this:
44 //! either by aborting its task or by tolerating lost messages and resuming
45 //! consumption of the channel.
46 //!
47 //! ## Closing
48 //!
49 //! When **all** [`Sender`] handles have been dropped, no new values may be
50 //! sent. At this point, the channel is "closed". Once a receiver has received
51 //! all values retained by the channel, the next call to [`recv`] will return
52 //! with [`RecvError::Closed`].
53 //!
54 //! [`Sender`]: crate::sync::broadcast::Sender
55 //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
56 //! [`Receiver`]: crate::sync::broadcast::Receiver
57 //! [`channel`]: crate::sync::broadcast::channel
58 //! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
59 //! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
60 //! [`recv`]: crate::sync::broadcast::Receiver::recv
61 //!
62 //! # Examples
63 //!
64 //! Basic usage
65 //!
66 //! ```
67 //! use tokio::sync::broadcast;
68 //!
69 //! #[tokio::main]
70 //! async fn main() {
71 //! let (tx, mut rx1) = broadcast::channel(16);
72 //! let mut rx2 = tx.subscribe();
73 //!
74 //! tokio::spawn(async move {
75 //! assert_eq!(rx1.recv().await.unwrap(), 10);
76 //! assert_eq!(rx1.recv().await.unwrap(), 20);
77 //! });
78 //!
79 //! tokio::spawn(async move {
80 //! assert_eq!(rx2.recv().await.unwrap(), 10);
81 //! assert_eq!(rx2.recv().await.unwrap(), 20);
82 //! });
83 //!
84 //! tx.send(10).unwrap();
85 //! tx.send(20).unwrap();
86 //! }
87 //! ```
88 //!
89 //! Handling lag
90 //!
91 //! ```
92 //! use tokio::sync::broadcast;
93 //!
94 //! #[tokio::main]
95 //! async fn main() {
96 //! let (tx, mut rx) = broadcast::channel(2);
97 //!
98 //! tx.send(10).unwrap();
99 //! tx.send(20).unwrap();
100 //! tx.send(30).unwrap();
101 //!
102 //! // The receiver lagged behind
103 //! assert!(rx.recv().await.is_err());
104 //!
105 //! // At this point, we can abort or continue with lost messages
106 //!
107 //! assert_eq!(20, rx.recv().await.unwrap());
108 //! assert_eq!(30, rx.recv().await.unwrap());
109 //! }
110 //! ```
111
112 use crate::loom::cell::UnsafeCell;
113 use crate::loom::sync::atomic::AtomicUsize;
114 use crate::loom::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
115 use crate::util::linked_list::{self, LinkedList};
116
117 use std::fmt;
118 use std::future::Future;
119 use std::marker::PhantomPinned;
120 use std::pin::Pin;
121 use std::ptr::NonNull;
122 use std::sync::atomic::Ordering::SeqCst;
123 use std::task::{Context, Poll, Waker};
124 use std::usize;
125
126 /// Sending-half of the [`broadcast`] channel.
127 ///
128 /// May be used from many threads. Messages can be sent with
129 /// [`send`][Sender::send].
130 ///
131 /// # Examples
132 ///
133 /// ```
134 /// use tokio::sync::broadcast;
135 ///
136 /// #[tokio::main]
137 /// async fn main() {
138 /// let (tx, mut rx1) = broadcast::channel(16);
139 /// let mut rx2 = tx.subscribe();
140 ///
141 /// tokio::spawn(async move {
142 /// assert_eq!(rx1.recv().await.unwrap(), 10);
143 /// assert_eq!(rx1.recv().await.unwrap(), 20);
144 /// });
145 ///
146 /// tokio::spawn(async move {
147 /// assert_eq!(rx2.recv().await.unwrap(), 10);
148 /// assert_eq!(rx2.recv().await.unwrap(), 20);
149 /// });
150 ///
151 /// tx.send(10).unwrap();
152 /// tx.send(20).unwrap();
153 /// }
154 /// ```
155 ///
156 /// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// Channel state shared with every other `Sender` and `Receiver` handle
    /// of the same channel.
    shared: Arc<Shared<T>>,
}
160
161 /// Receiving-half of the [`broadcast`] channel.
162 ///
163 /// Must not be used concurrently. Messages may be retrieved using
164 /// [`recv`][Receiver::recv].
165 ///
166 /// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
167 /// wrapper.
168 ///
169 /// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
170 ///
171 /// # Examples
172 ///
173 /// ```
174 /// use tokio::sync::broadcast;
175 ///
176 /// #[tokio::main]
177 /// async fn main() {
178 /// let (tx, mut rx1) = broadcast::channel(16);
179 /// let mut rx2 = tx.subscribe();
180 ///
181 /// tokio::spawn(async move {
182 /// assert_eq!(rx1.recv().await.unwrap(), 10);
183 /// assert_eq!(rx1.recv().await.unwrap(), 20);
184 /// });
185 ///
186 /// tokio::spawn(async move {
187 /// assert_eq!(rx2.recv().await.unwrap(), 10);
188 /// assert_eq!(rx2.recv().await.unwrap(), 20);
189 /// });
190 ///
191 /// tx.send(10).unwrap();
192 /// tx.send(20).unwrap();
193 /// }
194 /// ```
195 ///
196 /// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Next position to read from.
    ///
    /// Initialized to the tail position observed when the receiver is created
    /// and updated by `recv_ref` (using wrapping arithmetic).
    next: u64,
}
204
pub mod error {
    //! Broadcast error types

    use std::fmt;

    /// Error returned by the [`send`] function on a [`Sender`].
    ///
    /// A **send** operation can only fail if there are no active receivers,
    /// implying that the message could never be received. The error contains the
    /// message being sent as a payload so it can be recovered.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// An error returned from the [`recv`] function on a [`Receiver`].
    ///
    /// The type is `Clone` and `Eq` so it can be stored, compared and forwarded
    /// freely by callers.
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// There are no more active senders implying no further messages will ever
        /// be sent.
        Closed,

        /// The receiver lagged too far behind. Attempting to receive again will
        /// return the oldest message still retained by the channel.
        ///
        /// Includes the number of skipped messages.
        Lagged(u64),
    }

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                RecvError::Closed => write!(f, "channel closed"),
                RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// An error returned from the [`try_recv`] function on a [`Receiver`].
    ///
    /// The type is `Clone` and `Eq` so it can be stored, compared and forwarded
    /// freely by callers.
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// The channel is currently empty. There are still active
        /// [`Sender`] handles, so data may yet become available.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// There are no more active senders implying no further messages will ever
        /// be sent.
        Closed,

        /// The receiver lagged too far behind. Attempting to receive again will
        /// return the oldest message still retained by the channel.
        ///
        /// Includes the number of skipped messages.
        Lagged(u64),
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                TryRecvError::Empty => write!(f, "channel empty"),
                TryRecvError::Closed => write!(f, "channel closed"),
                TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
293
294 use self::error::*;
295
/// Data shared between senders and receivers.
struct Shared<T> {
    /// Slots in the channel.
    ///
    /// `channel` rounds the requested capacity up to a power of two, so the
    /// length of this slice is always a power of two.
    buffer: Box<[RwLock<Slot<T>>]>,

    /// Mask a position -> index.
    ///
    /// Set to `capacity - 1` by `channel`; a slot index is computed as
    /// `pos & mask`.
    mask: usize,

    /// Tail of the queue. Includes the rx wait list.
    tail: Mutex<Tail>,

    /// Number of outstanding Sender handles.
    ///
    /// Incremented when a `Sender` is cloned and decremented when one is
    /// dropped; the drop that observes a previous count of one sends the
    /// closing `None` message.
    num_tx: AtomicUsize,
}
310
/// Next position to write a value.
///
/// All fields are accessed through the `Shared::tail` mutex.
struct Tail {
    /// Next position to write to. Advanced by one (wrapping) on every `send2`
    /// that finds at least one receiver.
    pos: u64,

    /// Number of active receivers. Incremented by `new_receiver` and
    /// decremented when a `Receiver` is dropped.
    rx_cnt: usize,

    /// True if the channel is closed. Set by `send2` when it is handed `None`.
    closed: bool,

    /// Receivers waiting for a value.
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}
325
/// Slot in the buffer.
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    ///
    /// An atomic is used as it is mutated concurrently with the slot read lock
    /// acquired.
    rem: AtomicUsize,

    /// Uniquely identifies the `send` stored in the slot.
    ///
    /// `channel` seeds slot `i` with `i - capacity` (wrapping), i.e. the
    /// position the slot would have held one lap before the first send.
    pos: u64,

    /// True signals the channel is closed.
    ///
    /// Set by `send2` on the slot that receives the closing `None` message.
    closed: bool,

    /// The value being broadcast.
    ///
    /// The value is set by `send` when the write lock is held. When a reader
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
    val: UnsafeCell<Option<T>>,
}
348
/// An entry in the wait queue.
///
/// A `Waiter` lives inside a `Recv` future and is linked into
/// `Tail::waiters` by pointer while `queued` is true.
struct Waiter {
    /// True if queued.
    ///
    /// Set by `recv_ref` when the node is pushed onto the wait list and
    /// cleared by `Tail::notify_rx` when it is popped.
    queued: bool,

    /// Task waiting on the broadcast channel.
    waker: Option<Waker>,

    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
363
/// Read access to the slot holding a received value.
///
/// Dropping the guard decrements the slot's `rem` counter and clears the
/// stored value when the previous count was one.
struct RecvGuard<'a, T> {
    /// Read lock on the slot being received from.
    slot: RwLockReadGuard<'a, Slot<T>>,
}
367
/// Receive a value future.
///
/// Created by `Receiver::recv`. On drop, the waiter is unlinked from the
/// channel's wait list if it is still queued.
struct Recv<'a, T> {
    /// Receiver being waited on.
    receiver: &'a mut Receiver<T>,

    /// Entry in the waiter `LinkedList`.
    ///
    /// Wrapped in an `UnsafeCell` because the node is reached through raw
    /// pointers stored in the wait list.
    waiter: UnsafeCell<Waiter>,
}
376
// `Recv` contains an `UnsafeCell<Waiter>`, so these impls are written by hand.
// Within this file the waiter is only touched while the tail mutex is held
// (see `recv_ref` and `Drop for Recv`).
// NOTE(review): confirm no other code path reaches the waiter without the lock.
unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
379
/// Max number of receivers. Reserve space to lock.
///
/// `new_receiver` panics with "max receivers" once the receiver count reaches
/// this value.
// NOTE(review): nothing in this file uses the reserved high bits for locking;
// the "reserve space to lock" rationale presumably predates the `RwLock`-based
// slots — confirm before relying on it.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
382
383 /// Create a bounded, multi-producer, multi-consumer channel where each sent
384 /// value is broadcasted to all active receivers.
385 ///
386 /// All data sent on [`Sender`] will become available on every active
387 /// [`Receiver`] in the same order as it was sent.
388 ///
389 /// The `Sender` can be cloned to `send` to the same channel from multiple
390 /// points in the process or it can be used concurrently from an `Arc`. New
391 /// `Receiver` handles are created by calling [`Sender::subscribe`].
392 ///
393 /// If all [`Receiver`] handles are dropped, the `send` method will return a
394 /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
395 /// method will return a [`RecvError`].
396 ///
397 /// [`Sender`]: crate::sync::broadcast::Sender
398 /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
399 /// [`Receiver`]: crate::sync::broadcast::Receiver
400 /// [`recv`]: crate::sync::broadcast::Receiver::recv
401 /// [`SendError`]: crate::sync::broadcast::error::SendError
402 /// [`RecvError`]: crate::sync::broadcast::error::RecvError
///
/// # Panics
///
/// Panics if `capacity` is `0` or greater than `usize::MAX / 2`.
///
404 /// # Examples
405 ///
406 /// ```
407 /// use tokio::sync::broadcast;
408 ///
409 /// #[tokio::main]
410 /// async fn main() {
411 /// let (tx, mut rx1) = broadcast::channel(16);
412 /// let mut rx2 = tx.subscribe();
413 ///
414 /// tokio::spawn(async move {
415 /// assert_eq!(rx1.recv().await.unwrap(), 10);
416 /// assert_eq!(rx1.recv().await.unwrap(), 20);
417 /// });
418 ///
419 /// tokio::spawn(async move {
420 /// assert_eq!(rx2.recv().await.unwrap(), 10);
421 /// assert_eq!(rx2.recv().await.unwrap(), 20);
422 /// });
423 ///
424 /// tx.send(10).unwrap();
425 /// tx.send(20).unwrap();
426 /// }
427 /// ```
channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>)428 pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
429 assert!(capacity > 0, "capacity is empty");
430 assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
431
432 // Round to a power of two
433 capacity = capacity.next_power_of_two();
434
435 let mut buffer = Vec::with_capacity(capacity);
436
437 for i in 0..capacity {
438 buffer.push(RwLock::new(Slot {
439 rem: AtomicUsize::new(0),
440 pos: (i as u64).wrapping_sub(capacity as u64),
441 closed: false,
442 val: UnsafeCell::new(None),
443 }));
444 }
445
446 let shared = Arc::new(Shared {
447 buffer: buffer.into_boxed_slice(),
448 mask: capacity - 1,
449 tail: Mutex::new(Tail {
450 pos: 0,
451 rx_cnt: 1,
452 closed: false,
453 waiters: LinkedList::new(),
454 }),
455 num_tx: AtomicUsize::new(1),
456 });
457
458 let rx = Receiver {
459 shared: shared.clone(),
460 next: 0,
461 };
462
463 let tx = Sender { shared };
464
465 (tx, rx)
466 }
467
// Both handles are `Send` and `Sync` whenever `T: Send`.
// NOTE(review): receivers clone the stored value through a shared reference
// while holding only a slot read lock (see `RecvGuard::clone_value`), so several
// threads may call `T::clone` on the same value at once — confirm that
// `T: Send` alone (without `T: Sync`) is the intended bound here.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
473
474 impl<T> Sender<T> {
475 /// Attempts to send a value to all active [`Receiver`] handles, returning
476 /// it back if it could not be sent.
477 ///
478 /// A successful send occurs when there is at least one active [`Receiver`]
479 /// handle. An unsuccessful send would be one where all associated
480 /// [`Receiver`] handles have already been dropped.
481 ///
482 /// # Return
483 ///
484 /// On success, the number of subscribed [`Receiver`] handles is returned.
485 /// This does not mean that this number of receivers will see the message as
486 /// a receiver may drop before receiving the message.
487 ///
488 /// # Note
489 ///
490 /// A return value of `Ok` **does not** mean that the sent value will be
491 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
492 /// handles may be dropped before receiving the sent message.
493 ///
494 /// A return value of `Err` **does not** mean that future calls to `send`
495 /// will fail. New [`Receiver`] handles may be created by calling
496 /// [`subscribe`].
497 ///
498 /// [`Receiver`]: crate::sync::broadcast::Receiver
499 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
500 ///
501 /// # Examples
502 ///
503 /// ```
504 /// use tokio::sync::broadcast;
505 ///
506 /// #[tokio::main]
507 /// async fn main() {
508 /// let (tx, mut rx1) = broadcast::channel(16);
509 /// let mut rx2 = tx.subscribe();
510 ///
511 /// tokio::spawn(async move {
512 /// assert_eq!(rx1.recv().await.unwrap(), 10);
513 /// assert_eq!(rx1.recv().await.unwrap(), 20);
514 /// });
515 ///
516 /// tokio::spawn(async move {
517 /// assert_eq!(rx2.recv().await.unwrap(), 10);
518 /// assert_eq!(rx2.recv().await.unwrap(), 20);
519 /// });
520 ///
521 /// tx.send(10).unwrap();
522 /// tx.send(20).unwrap();
523 /// }
524 /// ```
send(&self, value: T) -> Result<usize, SendError<T>>525 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
526 self.send2(Some(value))
527 .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap()))
528 }
529
530 /// Creates a new [`Receiver`] handle that will receive values sent **after**
531 /// this call to `subscribe`.
532 ///
533 /// # Examples
534 ///
535 /// ```
536 /// use tokio::sync::broadcast;
537 ///
538 /// #[tokio::main]
539 /// async fn main() {
540 /// let (tx, _rx) = broadcast::channel(16);
541 ///
542 /// // Will not be seen
543 /// tx.send(10).unwrap();
544 ///
545 /// let mut rx = tx.subscribe();
546 ///
547 /// tx.send(20).unwrap();
548 ///
549 /// let value = rx.recv().await.unwrap();
550 /// assert_eq!(20, value);
551 /// }
552 /// ```
subscribe(&self) -> Receiver<T>553 pub fn subscribe(&self) -> Receiver<T> {
554 let shared = self.shared.clone();
555 new_receiver(shared)
556 }
557
558 /// Returns the number of active receivers
559 ///
560 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
561 /// [`subscribe`]. These are the handles that will receive values sent on
562 /// this [`Sender`].
563 ///
564 /// # Note
565 ///
566 /// It is not guaranteed that a sent message will reach this number of
567 /// receivers. Active receivers may never call [`recv`] again before
568 /// dropping.
569 ///
570 /// [`recv`]: crate::sync::broadcast::Receiver::recv
571 /// [`Receiver`]: crate::sync::broadcast::Receiver
572 /// [`Sender`]: crate::sync::broadcast::Sender
573 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
574 /// [`channel`]: crate::sync::broadcast::channel
575 ///
576 /// # Examples
577 ///
578 /// ```
579 /// use tokio::sync::broadcast;
580 ///
581 /// #[tokio::main]
582 /// async fn main() {
583 /// let (tx, _rx1) = broadcast::channel(16);
584 ///
585 /// assert_eq!(1, tx.receiver_count());
586 ///
587 /// let mut _rx2 = tx.subscribe();
588 ///
589 /// assert_eq!(2, tx.receiver_count());
590 ///
591 /// tx.send(10).unwrap();
592 /// }
593 /// ```
receiver_count(&self) -> usize594 pub fn receiver_count(&self) -> usize {
595 let tail = self.shared.tail.lock();
596 tail.rx_cnt
597 }
598
/// Writes `value` into the next slot and wakes all waiting receivers.
///
/// `Some(v)` broadcasts `v`; `None` is the closing message sent when the last
/// `Sender` is dropped and marks both the tail and the slot as closed.
///
/// Returns the number of receivers registered at the time of the send, or
/// hands `value` back inside `SendError` when there are no receivers.
///
/// Lock order: the tail mutex is taken first, then the slot write lock.
fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
    let mut tail = self.shared.tail.lock();

    // Nobody to deliver to: give the value back untouched.
    if tail.rx_cnt == 0 {
        return Err(SendError(value));
    }

    // Position to write into
    let pos = tail.pos;
    let rem = tail.rx_cnt;
    let idx = (pos & self.shared.mask as u64) as usize;

    // Update the tail position
    tail.pos = tail.pos.wrapping_add(1);

    // Get the slot
    let mut slot = self.shared.buffer[idx].write().unwrap();

    // Track the position
    slot.pos = pos;

    // Set remaining receivers
    slot.rem.with_mut(|v| *v = rem);

    // Set the closed bit if the value is `None`; otherwise write the value.
    // Note that in the `None` case whatever the slot previously stored is
    // left in place.
    if value.is_none() {
        tail.closed = true;
        slot.closed = true;
    } else {
        slot.val.with_mut(|ptr| unsafe { *ptr = value });
    }

    // Release the slot lock before notifying the receivers.
    drop(slot);

    tail.notify_rx();

    // Release the mutex. This must happen after the slot lock is released,
    // otherwise the writer lock bit could be cleared while another thread
    // is in the critical section.
    // NOTE(review): "writer lock bit" does not correspond to anything in the
    // `RwLock`-based slots visible here; the wording may predate them.
    drop(tail);

    Ok(rem)
}
643 }
644
new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T>645 fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
646 let mut tail = shared.tail.lock();
647
648 if tail.rx_cnt == MAX_RECEIVERS {
649 panic!("max receivers");
650 }
651
652 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
653
654 let next = tail.pos;
655
656 drop(tail);
657
658 Receiver { shared, next }
659 }
660
661 impl Tail {
notify_rx(&mut self)662 fn notify_rx(&mut self) {
663 while let Some(mut waiter) = self.waiters.pop_back() {
664 // Safety: `waiters` lock is still held.
665 let waiter = unsafe { waiter.as_mut() };
666
667 assert!(waiter.queued);
668 waiter.queued = false;
669
670 let waker = waiter.waker.take().unwrap();
671 waker.wake();
672 }
673 }
674 }
675
676 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>677 fn clone(&self) -> Sender<T> {
678 let shared = self.shared.clone();
679 shared.num_tx.fetch_add(1, SeqCst);
680
681 Sender { shared }
682 }
683 }
684
685 impl<T> Drop for Sender<T> {
drop(&mut self)686 fn drop(&mut self) {
687 if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
688 let _ = self.send2(None);
689 }
690 }
691 }
692
impl<T> Receiver<T> {
    /// Locks the next value if there is one.
    ///
    /// `waiter` controls what happens when no value is available:
    ///
    /// * `None` — return `TryRecvError::Empty` right away, without taking the
    ///   tail lock when the first slot check already shows the channel is
    ///   empty.
    /// * `Some((waiter, waker))` — store `waker` in the waiter node and link
    ///   the node into the tail's wait list (under the tail lock) before
    ///   returning `TryRecvError::Empty`.
    ///
    /// Results:
    ///
    /// * `Ok(guard)` — the slot read lock for the value at the cursor; the
    ///   cursor has been advanced by one.
    /// * `Err(Empty)` — nothing to read yet.
    /// * `Err(Lagged(n))` — `n` values were overwritten before being read;
    ///   the cursor now points at the oldest retained position.
    /// * `Err(Closed)` — the slot at the cursor is the closing marker. The
    ///   cursor is still advanced past it.
    fn recv_ref(
        &mut self,
        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
        let idx = (self.next & self.shared.mask as u64) as usize;

        // The slot holding the next value to read
        let mut slot = self.shared.buffer[idx].read().unwrap();

        if slot.pos != self.next {
            // Position this slot will hold after its next write.
            let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

            // The receiver has read all current values in the channel and there
            // is no waiter to register
            if waiter.is_none() && next_pos == self.next {
                return Err(TryRecvError::Empty);
            }

            // Release the `slot` lock before attempting to acquire the `tail`
            // lock. This is required because `send2` acquires the tail lock
            // first followed by the slot lock. Acquiring the locks in reverse
            // order here would result in a potential deadlock: `recv_ref`
            // acquires the `slot` lock and attempts to acquire the `tail` lock
            // while `send2` acquired the `tail` lock and attempts to acquire
            // the slot lock.
            drop(slot);

            let mut tail = self.shared.tail.lock();

            // Acquire slot lock again
            slot = self.shared.buffer[idx].read().unwrap();

            // Make sure the position did not change. This could happen in the
            // unlikely event that the buffer is wrapped between dropping the
            // read lock and acquiring the tail lock.
            if slot.pos != self.next {
                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

                if next_pos == self.next {
                    // Store the waker
                    if let Some((waiter, waker)) = waiter {
                        // Safety: called while locked.
                        unsafe {
                            // Only queue if not already queued
                            waiter.with_mut(|ptr| {
                                // If there is no waker **or** if the currently
                                // stored waker references a **different** task,
                                // track the tasks' waker to be notified on
                                // receipt of a new value.
                                match (*ptr).waker {
                                    Some(ref w) if w.will_wake(waker) => {}
                                    _ => {
                                        (*ptr).waker = Some(waker.clone());
                                    }
                                }

                                if !(*ptr).queued {
                                    (*ptr).queued = true;
                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
                                }
                            });
                        }
                    }

                    return Err(TryRecvError::Empty);
                }

                // At this point, the receiver has lagged behind the sender by
                // more than the channel capacity. The receiver will attempt to
                // catch up by skipping dropped messages and setting the
                // internal cursor to the **oldest** message stored by the
                // channel.
                //
                // However, finding the oldest position is a bit more
                // complicated than `tail-position - buffer-size`. When
                // the channel is closed, the tail position is incremented to
                // signal a new `None` message, but `None` is not stored in the
                // channel itself (see issue #2425 for why).
                //
                // To account for this, if the channel is closed, the tail
                // position is decremented by `buffer-size + 1`.
                let mut adjust = 0;
                if tail.closed {
                    adjust = 1
                }
                let next = tail
                    .pos
                    .wrapping_sub(self.shared.buffer.len() as u64 + adjust);

                // Number of positions between the cursor and the oldest
                // retained value.
                let missed = next.wrapping_sub(self.next);

                drop(tail);

                // The receiver is slow but no values have been missed
                if missed == 0 {
                    self.next = self.next.wrapping_add(1);

                    return Ok(RecvGuard { slot });
                }

                // Jump the cursor forward; the caller is told how much was lost.
                self.next = next;

                return Err(TryRecvError::Lagged(missed));
            }
        }

        // The slot holds exactly the position the cursor points at.
        self.next = self.next.wrapping_add(1);

        if slot.closed {
            return Err(TryRecvError::Closed);
        }

        Ok(RecvGuard { slot })
    }
}
810
811 impl<T: Clone> Receiver<T> {
812 /// Receives the next value for this receiver.
813 ///
814 /// Each [`Receiver`] handle will receive a clone of all values sent
815 /// **after** it has subscribed.
816 ///
817 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
818 /// dropped, indicating that no further values can be sent on the channel.
819 ///
820 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
821 /// sent values will overwrite old values. At this point, a call to [`recv`]
822 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
823 /// internal cursor is updated to point to the oldest value still held by
824 /// the channel. A subsequent call to [`recv`] will return this value
825 /// **unless** it has been since overwritten.
826 ///
827 /// # Cancel safety
828 ///
829 /// This method is cancel safe. If `recv` is used as the event in a
830 /// [`tokio::select!`](crate::select) statement and some other branch
831 /// completes first, it is guaranteed that no messages were received on this
832 /// channel.
833 ///
834 /// [`Receiver`]: crate::sync::broadcast::Receiver
835 /// [`recv`]: crate::sync::broadcast::Receiver::recv
836 ///
837 /// # Examples
838 ///
839 /// ```
840 /// use tokio::sync::broadcast;
841 ///
842 /// #[tokio::main]
843 /// async fn main() {
844 /// let (tx, mut rx1) = broadcast::channel(16);
845 /// let mut rx2 = tx.subscribe();
846 ///
847 /// tokio::spawn(async move {
848 /// assert_eq!(rx1.recv().await.unwrap(), 10);
849 /// assert_eq!(rx1.recv().await.unwrap(), 20);
850 /// });
851 ///
852 /// tokio::spawn(async move {
853 /// assert_eq!(rx2.recv().await.unwrap(), 10);
854 /// assert_eq!(rx2.recv().await.unwrap(), 20);
855 /// });
856 ///
857 /// tx.send(10).unwrap();
858 /// tx.send(20).unwrap();
859 /// }
860 /// ```
861 ///
862 /// Handling lag
863 ///
864 /// ```
865 /// use tokio::sync::broadcast;
866 ///
867 /// #[tokio::main]
868 /// async fn main() {
869 /// let (tx, mut rx) = broadcast::channel(2);
870 ///
871 /// tx.send(10).unwrap();
872 /// tx.send(20).unwrap();
873 /// tx.send(30).unwrap();
874 ///
875 /// // The receiver lagged behind
876 /// assert!(rx.recv().await.is_err());
877 ///
878 /// // At this point, we can abort or continue with lost messages
879 ///
880 /// assert_eq!(20, rx.recv().await.unwrap());
881 /// assert_eq!(30, rx.recv().await.unwrap());
882 /// }
883 /// ```
recv(&mut self) -> Result<T, RecvError>884 pub async fn recv(&mut self) -> Result<T, RecvError> {
885 let fut = Recv::new(self);
886 fut.await
887 }
888
889 /// Attempts to return a pending value on this receiver without awaiting.
890 ///
891 /// This is useful for a flavor of "optimistic check" before deciding to
892 /// await on a receiver.
893 ///
894 /// Compared with [`recv`], this function has three failure cases instead of two
895 /// (one for closed, one for an empty buffer, one for a lagging receiver).
896 ///
897 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
898 /// dropped, indicating that no further values can be sent on the channel.
899 ///
900 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
/// sent values will overwrite old values. At this point, a call to
/// [`try_recv`] returns `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
903 /// internal cursor is updated to point to the oldest value still held by
904 /// the channel. A subsequent call to [`try_recv`] will return this value
905 /// **unless** it has been since overwritten. If there are no values to
906 /// receive, `Err(TryRecvError::Empty)` is returned.
907 ///
908 /// [`recv`]: crate::sync::broadcast::Receiver::recv
909 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
910 /// [`Receiver`]: crate::sync::broadcast::Receiver
911 ///
912 /// # Examples
913 ///
914 /// ```
915 /// use tokio::sync::broadcast;
916 ///
917 /// #[tokio::main]
918 /// async fn main() {
919 /// let (tx, mut rx) = broadcast::channel(16);
920 ///
921 /// assert!(rx.try_recv().is_err());
922 ///
923 /// tx.send(10).unwrap();
924 ///
925 /// let value = rx.try_recv().unwrap();
926 /// assert_eq!(10, value);
927 /// }
928 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>929 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
930 let guard = self.recv_ref(None)?;
931 guard.clone_value().ok_or(TryRecvError::Closed)
932 }
933 }
934
935 impl<T> Drop for Receiver<T> {
drop(&mut self)936 fn drop(&mut self) {
937 let mut tail = self.shared.tail.lock();
938
939 tail.rx_cnt -= 1;
940 let until = tail.pos;
941
942 drop(tail);
943
944 while self.next < until {
945 match self.recv_ref(None) {
946 Ok(_) => {}
947 // The channel is closed
948 Err(TryRecvError::Closed) => break,
949 // Ignore lagging, we will catch up
950 Err(TryRecvError::Lagged(..)) => {}
951 // Can't be empty
952 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
953 }
954 }
955 }
956 }
957
impl<'a, T> Recv<'a, T> {
    /// Creates a future that receives the next value from `receiver`.
    ///
    /// The waiter node starts out unqueued and without a waker; it is only
    /// linked into the wait list by `recv_ref` during `poll`.
    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
        Recv {
            receiver,
            waiter: UnsafeCell::new(Waiter {
                queued: false,
                waker: None,
                pointers: linked_list::Pointers::new(),
                _p: PhantomPinned,
            }),
        }
    }

    /// A custom `project` implementation is used in place of `pin-project-lite`
    /// as a custom drop implementation is needed.
    ///
    /// Returns the receiver together with a shared reference to the waiter
    /// cell; the waiter itself is never moved out of the pinned future.
    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
        unsafe {
            // Safety: Receiver is Unpin
            // (compile-time check only; the call does nothing at runtime).
            is_unpin::<&mut Receiver<T>>();

            let me = self.get_unchecked_mut();
            (me.receiver, &me.waiter)
        }
    }
}
983
984 impl<'a, T> Future for Recv<'a, T>
985 where
986 T: Clone,
987 {
988 type Output = Result<T, RecvError>;
989
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>990 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
991 let (receiver, waiter) = self.project();
992
993 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
994 Ok(value) => value,
995 Err(TryRecvError::Empty) => return Poll::Pending,
996 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
997 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
998 };
999
1000 Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1001 }
1002 }
1003
impl<'a, T> Drop for Recv<'a, T> {
    /// Unlinks the waiter from the channel's wait list if it is still queued,
    /// so the list never holds a pointer into a dropped future.
    fn drop(&mut self) {
        // Acquire the tail lock. This is required for safety before accessing
        // the waiter node.
        let mut tail = self.receiver.shared.tail.lock();

        // safety: tail lock is held
        let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });

        if queued {
            // Remove the node
            //
            // safety: tail lock is held and the wait node is verified to be in
            // the list.
            unsafe {
                self.waiter.with_mut(|ptr| {
                    tail.waiters.remove((&mut *ptr).into());
                });
            }
        }
    }
}
1026
/// # Safety
///
/// `Waiter` is forced to be !Unpin.
///
/// The list stores raw `NonNull<Waiter>` pointers, so handle and raw pointer
/// are the same type and the conversions below are identity functions.
unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    /// Returns the pointer stored in the handle.
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    /// Returns the raw pointer unchanged as a handle.
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    /// Returns a pointer to the node's embedded list pointers.
    unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        NonNull::from(&mut target.as_mut().pointers)
    }
}
1046
1047 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1048 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1049 write!(fmt, "broadcast::Sender")
1050 }
1051 }
1052
1053 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1054 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1055 write!(fmt, "broadcast::Receiver")
1056 }
1057 }
1058
1059 impl<'a, T> RecvGuard<'a, T> {
clone_value(&self) -> Option<T> where T: Clone,1060 fn clone_value(&self) -> Option<T>
1061 where
1062 T: Clone,
1063 {
1064 self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1065 }
1066 }
1067
1068 impl<'a, T> Drop for RecvGuard<'a, T> {
drop(&mut self)1069 fn drop(&mut self) {
1070 // Decrement the remaining counter
1071 if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1072 // Safety: Last receiver, drop the value
1073 self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1074 }
1075 }
1076 }
1077
/// Compile-time assertion that `T` is `Unpin`; does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1079