1 //! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2 //! all consumers.
3 //!
4 //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5 //! values. [`Sender`] handles are clone-able, allowing concurrent send and
6 //! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7 //! long as `T` is also `Send` or `Sync` respectively.
8 //!
9 //! When a value is sent, **all** [`Receiver`] handles are notified and will
10 //! receive the value. The value is stored once inside the channel and cloned on
11 //! demand for each receiver. Once all receivers have received a clone of the
12 //! value, the value is released from the channel.
13 //!
14 //! A channel is created by calling [`channel`], specifying the maximum number
15 //! of messages the channel can retain at any given time.
16 //!
17 //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18 //! returned [`Receiver`] will receive values sent **after** the call to
19 //! `subscribe`.
20 //!
21 //! ## Lagging
22 //!
23 //! As sent messages must be retained until **all** [`Receiver`] handles receive
24 //! a clone, broadcast channels are susceptible to the "slow receiver" problem.
25 //! In this case, all but one receiver are able to receive values at the rate
26 //! they are sent. Because one receiver is stalled, the channel starts to fill
27 //! up.
28 //!
29 //! This broadcast channel implementation handles this case by setting a hard
30 //! upper bound on the number of values the channel may retain at any given
31 //! time. This upper bound is passed to the [`channel`] function as an argument.
32 //!
33 //! If a value is sent when the channel is at capacity, the oldest value
34 //! currently held by the channel is released. This frees up space for the new
35 //! value. Any receiver that has not yet seen the released value will return
36 //! [`RecvError::Lagged`] the next time [`recv`] is called.
37 //!
38 //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
39 //! updated to the oldest value contained by the channel. The next call to
40 //! [`recv`] will return this value.
41 //!
42 //! This behavior enables a receiver to detect when it has lagged so far behind
43 //! that data has been dropped. The caller may decide how to respond to this:
44 //! either by aborting its task or by tolerating lost messages and resuming
45 //! consumption of the channel.
46 //!
47 //! ## Closing
48 //!
49 //! When **all** [`Sender`] handles have been dropped, no new values may be
50 //! sent. At this point, the channel is "closed". Once a receiver has received
51 //! all values retained by the channel, the next call to [`recv`] will return
52 //! with [`RecvError::Closed`].
53 //!
54 //! [`Sender`]: crate::sync::broadcast::Sender
55 //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
56 //! [`Receiver`]: crate::sync::broadcast::Receiver
57 //! [`channel`]: crate::sync::broadcast::channel
58 //! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged
59 //! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed
60 //! [`recv`]: crate::sync::broadcast::Receiver::recv
61 //!
62 //! # Examples
63 //!
64 //! Basic usage
65 //!
66 //! ```
67 //! use tokio::sync::broadcast;
68 //!
69 //! #[tokio::main]
70 //! async fn main() {
71 //! let (tx, mut rx1) = broadcast::channel(16);
72 //! let mut rx2 = tx.subscribe();
73 //!
74 //! tokio::spawn(async move {
75 //! assert_eq!(rx1.recv().await.unwrap(), 10);
76 //! assert_eq!(rx1.recv().await.unwrap(), 20);
77 //! });
78 //!
79 //! tokio::spawn(async move {
80 //! assert_eq!(rx2.recv().await.unwrap(), 10);
81 //! assert_eq!(rx2.recv().await.unwrap(), 20);
82 //! });
83 //!
84 //! tx.send(10).unwrap();
85 //! tx.send(20).unwrap();
86 //! }
87 //! ```
88 //!
89 //! Handling lag
90 //!
91 //! ```
92 //! use tokio::sync::broadcast;
93 //!
94 //! #[tokio::main]
95 //! async fn main() {
96 //! let (tx, mut rx) = broadcast::channel(2);
97 //!
98 //! tx.send(10).unwrap();
99 //! tx.send(20).unwrap();
100 //! tx.send(30).unwrap();
101 //!
102 //! // The receiver lagged behind
103 //! assert!(rx.recv().await.is_err());
104 //!
105 //! // At this point, we can abort or continue with lost messages
106 //!
107 //! assert_eq!(20, rx.recv().await.unwrap());
108 //! assert_eq!(30, rx.recv().await.unwrap());
109 //! }
110
111 use crate::loom::cell::UnsafeCell;
112 use crate::loom::sync::atomic::AtomicUsize;
113 use crate::loom::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
114 use crate::util::linked_list::{self, LinkedList};
115
116 use std::fmt;
117 use std::future::Future;
118 use std::marker::PhantomPinned;
119 use std::pin::Pin;
120 use std::ptr::NonNull;
121 use std::sync::atomic::Ordering::SeqCst;
122 use std::task::{Context, Poll, Waker};
123 use std::usize;
124
125 /// Sending-half of the [`broadcast`] channel.
126 ///
127 /// May be used from many threads. Messages can be sent with
128 /// [`send`][Sender::send].
129 ///
130 /// # Examples
131 ///
132 /// ```
133 /// use tokio::sync::broadcast;
134 ///
135 /// #[tokio::main]
136 /// async fn main() {
137 /// let (tx, mut rx1) = broadcast::channel(16);
138 /// let mut rx2 = tx.subscribe();
139 ///
140 /// tokio::spawn(async move {
141 /// assert_eq!(rx1.recv().await.unwrap(), 10);
142 /// assert_eq!(rx1.recv().await.unwrap(), 20);
143 /// });
144 ///
145 /// tokio::spawn(async move {
146 /// assert_eq!(rx2.recv().await.unwrap(), 10);
147 /// assert_eq!(rx2.recv().await.unwrap(), 20);
148 /// });
149 ///
150 /// tx.send(10).unwrap();
151 /// tx.send(20).unwrap();
152 /// }
153 /// ```
154 ///
155 /// [`broadcast`]: crate::sync::broadcast
156 pub struct Sender<T> {
157 shared: Arc<Shared<T>>,
158 }
159
160 /// Receiving-half of the [`broadcast`] channel.
161 ///
162 /// Must not be used concurrently. Messages may be retrieved using
163 /// [`recv`][Receiver::recv].
164 ///
165 /// # Examples
166 ///
167 /// ```
168 /// use tokio::sync::broadcast;
169 ///
170 /// #[tokio::main]
171 /// async fn main() {
172 /// let (tx, mut rx1) = broadcast::channel(16);
173 /// let mut rx2 = tx.subscribe();
174 ///
175 /// tokio::spawn(async move {
176 /// assert_eq!(rx1.recv().await.unwrap(), 10);
177 /// assert_eq!(rx1.recv().await.unwrap(), 20);
178 /// });
179 ///
180 /// tokio::spawn(async move {
181 /// assert_eq!(rx2.recv().await.unwrap(), 10);
182 /// assert_eq!(rx2.recv().await.unwrap(), 20);
183 /// });
184 ///
185 /// tx.send(10).unwrap();
186 /// tx.send(20).unwrap();
187 /// }
188 /// ```
189 ///
190 /// [`broadcast`]: crate::sync::broadcast
191 pub struct Receiver<T> {
192 /// State shared with all receivers and senders.
193 shared: Arc<Shared<T>>,
194
195 /// Next position to read from
196 next: u64,
197
198 /// Used to support the deprecated `poll_recv` fn
199 waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>,
200 }
201
202 /// Error returned by [`Sender::send`][Sender::send].
203 ///
204 /// A **send** operation can only fail if there are no active receivers,
205 /// implying that the message could never be received. The error contains the
206 /// message being sent as a payload so it can be recovered.
207 #[derive(Debug)]
208 pub struct SendError<T>(pub T);
209
210 /// An error returned from the [`recv`] function on a [`Receiver`].
211 ///
212 /// [`recv`]: crate::sync::broadcast::Receiver::recv
213 /// [`Receiver`]: crate::sync::broadcast::Receiver
214 #[derive(Debug, PartialEq)]
215 pub enum RecvError {
216 /// There are no more active senders implying no further messages will ever
217 /// be sent.
218 Closed,
219
220 /// The receiver lagged too far behind. Attempting to receive again will
221 /// return the oldest message still retained by the channel.
222 ///
223 /// Includes the number of skipped messages.
224 Lagged(u64),
225 }
226
227 /// An error returned from the [`try_recv`] function on a [`Receiver`].
228 ///
229 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
230 /// [`Receiver`]: crate::sync::broadcast::Receiver
231 #[derive(Debug, PartialEq)]
232 pub enum TryRecvError {
233 /// The channel is currently empty. There are still active
234 /// [`Sender`][Sender] handles, so data may yet become available.
235 Empty,
236
237 /// There are no more active senders implying no further messages will ever
238 /// be sent.
239 Closed,
240
241 /// The receiver lagged too far behind and has been forcibly disconnected.
242 /// Attempting to receive again will return the oldest message still
243 /// retained by the channel.
244 ///
245 /// Includes the number of skipped messages.
246 Lagged(u64),
247 }
248
249 /// Data shared between senders and receivers
250 struct Shared<T> {
251 /// slots in the channel
252 buffer: Box<[RwLock<Slot<T>>]>,
253
254 /// Mask a position -> index
255 mask: usize,
256
257 /// Tail of the queue. Includes the rx wait list.
258 tail: Mutex<Tail>,
259
260 /// Number of outstanding Sender handles
261 num_tx: AtomicUsize,
262 }
263
264 /// Next position to write a value
265 struct Tail {
266 /// Next position to write to
267 pos: u64,
268
269 /// Number of active receivers
270 rx_cnt: usize,
271
272 /// True if the channel is closed
273 closed: bool,
274
275 /// Receivers waiting for a value
276 waiters: LinkedList<Waiter>,
277 }
278
279 /// Slot in the buffer
280 struct Slot<T> {
281 /// Remaining number of receivers that are expected to see this value.
282 ///
283 /// When this goes to zero, the value is released.
284 ///
285 /// An atomic is used as it is mutated concurrently with the slot read lock
286 /// acquired.
287 rem: AtomicUsize,
288
289 /// Uniquely identifies the `send` stored in the slot
290 pos: u64,
291
292 /// True signals the channel is closed.
293 closed: bool,
294
295 /// The value being broadcast.
296 ///
297 /// The value is set by `send` when the write lock is held. When a reader
298 /// drops, `rem` is decremented. When it hits zero, the value is dropped.
299 val: UnsafeCell<Option<T>>,
300 }
301
302 /// An entry in the wait queue
303 struct Waiter {
304 /// True if queued
305 queued: bool,
306
307 /// Task waiting on the broadcast channel.
308 waker: Option<Waker>,
309
310 /// Intrusive linked-list pointers.
311 pointers: linked_list::Pointers<Waiter>,
312
313 /// Should not be `Unpin`.
314 _p: PhantomPinned,
315 }
316
317 struct RecvGuard<'a, T> {
318 slot: RwLockReadGuard<'a, Slot<T>>,
319 }
320
321 /// Receive a value future
322 struct Recv<R, T>
323 where
324 R: AsMut<Receiver<T>>,
325 {
326 /// Receiver being waited on
327 receiver: R,
328
329 /// Entry in the waiter `LinkedList`
330 waiter: UnsafeCell<Waiter>,
331
332 _p: std::marker::PhantomData<T>,
333 }
334
335 /// `AsMut<T>` is not implemented for `T` (coherence). Explicitly implementing
336 /// `AsMut` for `Receiver` would be included in the public API of the receiver
337 /// type. Instead, `Borrow` is used internally to bridge the gap.
338 struct Borrow<T>(T);
339
340 impl<T> AsMut<Receiver<T>> for Borrow<Receiver<T>> {
as_mut(&mut self) -> &mut Receiver<T>341 fn as_mut(&mut self) -> &mut Receiver<T> {
342 &mut self.0
343 }
344 }
345
346 impl<'a, T> AsMut<Receiver<T>> for Borrow<&'a mut Receiver<T>> {
as_mut(&mut self) -> &mut Receiver<T>347 fn as_mut(&mut self) -> &mut Receiver<T> {
348 &mut *self.0
349 }
350 }
351
352 unsafe impl<R: AsMut<Receiver<T>> + Send, T: Send> Send for Recv<R, T> {}
353 unsafe impl<R: AsMut<Receiver<T>> + Sync, T: Send> Sync for Recv<R, T> {}
354
355 /// Max number of receivers. Reserve space to lock.
356 const MAX_RECEIVERS: usize = usize::MAX >> 2;
357
358 /// Create a bounded, multi-producer, multi-consumer channel where each sent
359 /// value is broadcasted to all active receivers.
360 ///
361 /// All data sent on [`Sender`] will become available on every active
362 /// [`Receiver`] in the same order as it was sent.
363 ///
364 /// The `Sender` can be cloned to `send` to the same channel from multiple
365 /// points in the process or it can be used concurrently from an `Arc`. New
366 /// `Receiver` handles are created by calling [`Sender::subscribe`].
367 ///
368 /// If all [`Receiver`] handles are dropped, the `send` method will return a
369 /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
370 /// method will return a [`RecvError`].
371 ///
372 /// [`Sender`]: crate::sync::broadcast::Sender
373 /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
374 /// [`Receiver`]: crate::sync::broadcast::Receiver
375 /// [`recv`]: crate::sync::broadcast::Receiver::recv
376 /// [`SendError`]: crate::sync::broadcast::SendError
377 /// [`RecvError`]: crate::sync::broadcast::RecvError
378 ///
379 /// # Examples
380 ///
381 /// ```
382 /// use tokio::sync::broadcast;
383 ///
384 /// #[tokio::main]
385 /// async fn main() {
386 /// let (tx, mut rx1) = broadcast::channel(16);
387 /// let mut rx2 = tx.subscribe();
388 ///
389 /// tokio::spawn(async move {
390 /// assert_eq!(rx1.recv().await.unwrap(), 10);
391 /// assert_eq!(rx1.recv().await.unwrap(), 20);
392 /// });
393 ///
394 /// tokio::spawn(async move {
395 /// assert_eq!(rx2.recv().await.unwrap(), 10);
396 /// assert_eq!(rx2.recv().await.unwrap(), 20);
397 /// });
398 ///
399 /// tx.send(10).unwrap();
400 /// tx.send(20).unwrap();
401 /// }
402 /// ```
channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>)403 pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
404 assert!(capacity > 0, "capacity is empty");
405 assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
406
407 // Round to a power of two
408 capacity = capacity.next_power_of_two();
409
410 let mut buffer = Vec::with_capacity(capacity);
411
412 for i in 0..capacity {
413 buffer.push(RwLock::new(Slot {
414 rem: AtomicUsize::new(0),
415 pos: (i as u64).wrapping_sub(capacity as u64),
416 closed: false,
417 val: UnsafeCell::new(None),
418 }));
419 }
420
421 let shared = Arc::new(Shared {
422 buffer: buffer.into_boxed_slice(),
423 mask: capacity - 1,
424 tail: Mutex::new(Tail {
425 pos: 0,
426 rx_cnt: 1,
427 closed: false,
428 waiters: LinkedList::new(),
429 }),
430 num_tx: AtomicUsize::new(1),
431 });
432
433 let rx = Receiver {
434 shared: shared.clone(),
435 next: 0,
436 waiter: None,
437 };
438
439 let tx = Sender { shared };
440
441 (tx, rx)
442 }
443
444 unsafe impl<T: Send> Send for Sender<T> {}
445 unsafe impl<T: Send> Sync for Sender<T> {}
446
447 unsafe impl<T: Send> Send for Receiver<T> {}
448 unsafe impl<T: Send> Sync for Receiver<T> {}
449
450 impl<T> Sender<T> {
451 /// Attempts to send a value to all active [`Receiver`] handles, returning
452 /// it back if it could not be sent.
453 ///
454 /// A successful send occurs when there is at least one active [`Receiver`]
455 /// handle. An unsuccessful send would be one where all associated
456 /// [`Receiver`] handles have already been dropped.
457 ///
458 /// # Return
459 ///
460 /// On success, the number of subscribed [`Receiver`] handles is returned.
461 /// This does not mean that this number of receivers will see the message as
462 /// a receiver may drop before receiving the message.
463 ///
464 /// # Note
465 ///
466 /// A return value of `Ok` **does not** mean that the sent value will be
467 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
468 /// handles may be dropped before receiving the sent message.
469 ///
470 /// A return value of `Err` **does not** mean that future calls to `send`
471 /// will fail. New [`Receiver`] handles may be created by calling
472 /// [`subscribe`].
473 ///
474 /// [`Receiver`]: crate::sync::broadcast::Receiver
475 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
476 ///
477 /// # Examples
478 ///
479 /// ```
480 /// use tokio::sync::broadcast;
481 ///
482 /// #[tokio::main]
483 /// async fn main() {
484 /// let (tx, mut rx1) = broadcast::channel(16);
485 /// let mut rx2 = tx.subscribe();
486 ///
487 /// tokio::spawn(async move {
488 /// assert_eq!(rx1.recv().await.unwrap(), 10);
489 /// assert_eq!(rx1.recv().await.unwrap(), 20);
490 /// });
491 ///
492 /// tokio::spawn(async move {
493 /// assert_eq!(rx2.recv().await.unwrap(), 10);
494 /// assert_eq!(rx2.recv().await.unwrap(), 20);
495 /// });
496 ///
497 /// tx.send(10).unwrap();
498 /// tx.send(20).unwrap();
499 /// }
500 /// ```
send(&self, value: T) -> Result<usize, SendError<T>>501 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
502 self.send2(Some(value))
503 .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap()))
504 }
505
506 /// Creates a new [`Receiver`] handle that will receive values sent **after**
507 /// this call to `subscribe`.
508 ///
509 /// # Examples
510 ///
511 /// ```
512 /// use tokio::sync::broadcast;
513 ///
514 /// #[tokio::main]
515 /// async fn main() {
516 /// let (tx, _rx) = broadcast::channel(16);
517 ///
518 /// // Will not be seen
519 /// tx.send(10).unwrap();
520 ///
521 /// let mut rx = tx.subscribe();
522 ///
523 /// tx.send(20).unwrap();
524 ///
525 /// let value = rx.recv().await.unwrap();
526 /// assert_eq!(20, value);
527 /// }
528 /// ```
subscribe(&self) -> Receiver<T>529 pub fn subscribe(&self) -> Receiver<T> {
530 let shared = self.shared.clone();
531
532 let mut tail = shared.tail.lock().unwrap();
533
534 if tail.rx_cnt == MAX_RECEIVERS {
535 panic!("max receivers");
536 }
537
538 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
539 let next = tail.pos;
540
541 drop(tail);
542
543 Receiver {
544 shared,
545 next,
546 waiter: None,
547 }
548 }
549
550 /// Returns the number of active receivers
551 ///
552 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
553 /// [`subscribe`]. These are the handles that will receive values sent on
554 /// this [`Sender`].
555 ///
556 /// # Note
557 ///
558 /// It is not guaranteed that a sent message will reach this number of
559 /// receivers. Active receivers may never call [`recv`] again before
560 /// dropping.
561 ///
562 /// [`recv`]: crate::sync::broadcast::Receiver::recv
563 /// [`Receiver`]: crate::sync::broadcast::Receiver
564 /// [`Sender`]: crate::sync::broadcast::Sender
565 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
566 /// [`channel`]: crate::sync::broadcast::channel
567 ///
568 /// # Examples
569 ///
570 /// ```
571 /// use tokio::sync::broadcast;
572 ///
573 /// #[tokio::main]
574 /// async fn main() {
575 /// let (tx, _rx1) = broadcast::channel(16);
576 ///
577 /// assert_eq!(1, tx.receiver_count());
578 ///
579 /// let mut _rx2 = tx.subscribe();
580 ///
581 /// assert_eq!(2, tx.receiver_count());
582 ///
583 /// tx.send(10).unwrap();
584 /// }
585 /// ```
receiver_count(&self) -> usize586 pub fn receiver_count(&self) -> usize {
587 let tail = self.shared.tail.lock().unwrap();
588 tail.rx_cnt
589 }
590
send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>>591 fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
592 let mut tail = self.shared.tail.lock().unwrap();
593
594 if tail.rx_cnt == 0 {
595 return Err(SendError(value));
596 }
597
598 // Position to write into
599 let pos = tail.pos;
600 let rem = tail.rx_cnt;
601 let idx = (pos & self.shared.mask as u64) as usize;
602
603 // Update the tail position
604 tail.pos = tail.pos.wrapping_add(1);
605
606 // Get the slot
607 let mut slot = self.shared.buffer[idx].write().unwrap();
608
609 // Track the position
610 slot.pos = pos;
611
612 // Set remaining receivers
613 slot.rem.with_mut(|v| *v = rem);
614
615 // Set the closed bit if the value is `None`; otherwise write the value
616 if value.is_none() {
617 tail.closed = true;
618 slot.closed = true;
619 } else {
620 slot.val.with_mut(|ptr| unsafe { *ptr = value });
621 }
622
623 // Release the slot lock before notifying the receivers.
624 drop(slot);
625
626 tail.notify_rx();
627
628 // Release the mutex. This must happen after the slot lock is released,
629 // otherwise the writer lock bit could be cleared while another thread
630 // is in the critical section.
631 drop(tail);
632
633 Ok(rem)
634 }
635 }
636
637 impl Tail {
notify_rx(&mut self)638 fn notify_rx(&mut self) {
639 while let Some(mut waiter) = self.waiters.pop_back() {
640 // Safety: `waiters` lock is still held.
641 let waiter = unsafe { waiter.as_mut() };
642
643 assert!(waiter.queued);
644 waiter.queued = false;
645
646 let waker = waiter.waker.take().unwrap();
647 waker.wake();
648 }
649 }
650 }
651
652 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>653 fn clone(&self) -> Sender<T> {
654 let shared = self.shared.clone();
655 shared.num_tx.fetch_add(1, SeqCst);
656
657 Sender { shared }
658 }
659 }
660
661 impl<T> Drop for Sender<T> {
drop(&mut self)662 fn drop(&mut self) {
663 if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
664 let _ = self.send2(None);
665 }
666 }
667 }
668
669 impl<T> Receiver<T> {
670 /// Locks the next value if there is one.
recv_ref( &mut self, waiter: Option<(&UnsafeCell<Waiter>, &Waker)>, ) -> Result<RecvGuard<'_, T>, TryRecvError>671 fn recv_ref(
672 &mut self,
673 waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
674 ) -> Result<RecvGuard<'_, T>, TryRecvError> {
675 let idx = (self.next & self.shared.mask as u64) as usize;
676
677 // The slot holding the next value to read
678 let mut slot = self.shared.buffer[idx].read().unwrap();
679
680 if slot.pos != self.next {
681 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
682
683 // The receiver has read all current values in the channel and there
684 // is no waiter to register
685 if waiter.is_none() && next_pos == self.next {
686 return Err(TryRecvError::Empty);
687 }
688
689 // Release the `slot` lock before attempting to acquire the `tail`
690 // lock. This is required because `send2` acquires the tail lock
691 // first followed by the slot lock. Acquiring the locks in reverse
692 // order here would result in a potential deadlock: `recv_ref`
693 // acquires the `slot` lock and attempts to acquire the `tail` lock
694 // while `send2` acquired the `tail` lock and attempts to acquire
695 // the slot lock.
696 drop(slot);
697
698 let mut tail = self.shared.tail.lock().unwrap();
699
700 // Acquire slot lock again
701 slot = self.shared.buffer[idx].read().unwrap();
702
703 // Make sure the position did not change. This could happen in the
704 // unlikely event that the buffer is wrapped between dropping the
705 // read lock and acquiring the tail lock.
706 if slot.pos != self.next {
707 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
708
709 if next_pos == self.next {
710 // Store the waker
711 if let Some((waiter, waker)) = waiter {
712 // Safety: called while locked.
713 unsafe {
714 // Only queue if not already queued
715 waiter.with_mut(|ptr| {
716 // If there is no waker **or** if the currently
717 // stored waker references a **different** task,
718 // track the tasks' waker to be notified on
719 // receipt of a new value.
720 match (*ptr).waker {
721 Some(ref w) if w.will_wake(waker) => {}
722 _ => {
723 (*ptr).waker = Some(waker.clone());
724 }
725 }
726
727 if !(*ptr).queued {
728 (*ptr).queued = true;
729 tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
730 }
731 });
732 }
733 }
734
735 return Err(TryRecvError::Empty);
736 }
737
738 // At this point, the receiver has lagged behind the sender by
739 // more than the channel capacity. The receiver will attempt to
740 // catch up by skipping dropped messages and setting the
741 // internal cursor to the **oldest** message stored by the
742 // channel.
743 //
744 // However, finding the oldest position is a bit more
745 // complicated than `tail-position - buffer-size`. When
746 // the channel is closed, the tail position is incremented to
747 // signal a new `None` message, but `None` is not stored in the
748 // channel itself (see issue #2425 for why).
749 //
750 // To account for this, if the channel is closed, the tail
751 // position is decremented by `buffer-size + 1`.
752 let mut adjust = 0;
753 if tail.closed {
754 adjust = 1
755 }
756 let next = tail
757 .pos
758 .wrapping_sub(self.shared.buffer.len() as u64 + adjust);
759
760 let missed = next.wrapping_sub(self.next);
761
762 drop(tail);
763
764 // The receiver is slow but no values have been missed
765 if missed == 0 {
766 self.next = self.next.wrapping_add(1);
767
768 return Ok(RecvGuard { slot });
769 }
770
771 self.next = next;
772
773 return Err(TryRecvError::Lagged(missed));
774 }
775 }
776
777 self.next = self.next.wrapping_add(1);
778
779 if slot.closed {
780 return Err(TryRecvError::Closed);
781 }
782
783 Ok(RecvGuard { slot })
784 }
785 }
786
787 impl<T> Receiver<T>
788 where
789 T: Clone,
790 {
791 /// Attempts to return a pending value on this receiver without awaiting.
792 ///
793 /// This is useful for a flavor of "optimistic check" before deciding to
794 /// await on a receiver.
795 ///
796 /// Compared with [`recv`], this function has three failure cases instead of one
797 /// (one for closed, one for an empty buffer, one for a lagging receiver).
798 ///
799 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
800 /// dropped, indicating that no further values can be sent on the channel.
801 ///
802 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
803 /// sent values will overwrite old values. At this point, a call to [`recv`]
804 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
805 /// internal cursor is updated to point to the oldest value still held by
806 /// the channel. A subsequent call to [`try_recv`] will return this value
807 /// **unless** it has been since overwritten. If there are no values to
808 /// receive, `Err(TryRecvError::Empty)` is returned.
809 ///
810 /// [`recv`]: crate::sync::broadcast::Receiver::recv
811 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
812 /// [`Receiver`]: crate::sync::broadcast::Receiver
813 ///
814 /// # Examples
815 ///
816 /// ```
817 /// use tokio::sync::broadcast;
818 ///
819 /// #[tokio::main]
820 /// async fn main() {
821 /// let (tx, mut rx) = broadcast::channel(16);
822 ///
823 /// assert!(rx.try_recv().is_err());
824 ///
825 /// tx.send(10).unwrap();
826 ///
827 /// let value = rx.try_recv().unwrap();
828 /// assert_eq!(10, value);
829 /// }
830 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>831 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
832 let guard = self.recv_ref(None)?;
833 guard.clone_value().ok_or(TryRecvError::Closed)
834 }
835
836 #[doc(hidden)]
837 #[deprecated(since = "0.2.21", note = "use async fn recv()")]
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>838 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
839 use Poll::{Pending, Ready};
840
841 // The borrow checker prohibits calling `self.poll_ref` while passing in
842 // a mutable ref to a field (as it should). To work around this,
843 // `waiter` is first *removed* from `self` then `poll_recv` is called.
844 //
845 // However, for safety, we must ensure that `waiter` is **not** dropped.
846 // It could be contained in the intrusive linked list. The `Receiver`
847 // drop implementation handles cleanup.
848 //
849 // The guard pattern is used to ensure that, on return, even due to
850 // panic, the waiter node is replaced on `self`.
851
852 struct Guard<'a, T> {
853 waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>,
854 receiver: &'a mut Receiver<T>,
855 }
856
857 impl<'a, T> Drop for Guard<'a, T> {
858 fn drop(&mut self) {
859 self.receiver.waiter = self.waiter.take();
860 }
861 }
862
863 let waiter = self.waiter.take().or_else(|| {
864 Some(Box::pin(UnsafeCell::new(Waiter {
865 queued: false,
866 waker: None,
867 pointers: linked_list::Pointers::new(),
868 _p: PhantomPinned,
869 })))
870 });
871
872 let guard = Guard {
873 waiter,
874 receiver: self,
875 };
876 let res = guard
877 .receiver
878 .recv_ref(Some((&guard.waiter.as_ref().unwrap(), cx.waker())));
879
880 match res {
881 Ok(guard) => Ready(guard.clone_value().ok_or(RecvError::Closed)),
882 Err(TryRecvError::Closed) => Ready(Err(RecvError::Closed)),
883 Err(TryRecvError::Lagged(n)) => Ready(Err(RecvError::Lagged(n))),
884 Err(TryRecvError::Empty) => Pending,
885 }
886 }
887
888 /// Receives the next value for this receiver.
889 ///
890 /// Each [`Receiver`] handle will receive a clone of all values sent
891 /// **after** it has subscribed.
892 ///
893 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
894 /// dropped, indicating that no further values can be sent on the channel.
895 ///
896 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
897 /// sent values will overwrite old values. At this point, a call to [`recv`]
898 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
899 /// internal cursor is updated to point to the oldest value still held by
900 /// the channel. A subsequent call to [`recv`] will return this value
901 /// **unless** it has been since overwritten.
902 ///
903 /// [`Receiver`]: crate::sync::broadcast::Receiver
904 /// [`recv`]: crate::sync::broadcast::Receiver::recv
905 ///
906 /// # Examples
907 ///
908 /// ```
909 /// use tokio::sync::broadcast;
910 ///
911 /// #[tokio::main]
912 /// async fn main() {
913 /// let (tx, mut rx1) = broadcast::channel(16);
914 /// let mut rx2 = tx.subscribe();
915 ///
916 /// tokio::spawn(async move {
917 /// assert_eq!(rx1.recv().await.unwrap(), 10);
918 /// assert_eq!(rx1.recv().await.unwrap(), 20);
919 /// });
920 ///
921 /// tokio::spawn(async move {
922 /// assert_eq!(rx2.recv().await.unwrap(), 10);
923 /// assert_eq!(rx2.recv().await.unwrap(), 20);
924 /// });
925 ///
926 /// tx.send(10).unwrap();
927 /// tx.send(20).unwrap();
928 /// }
929 /// ```
930 ///
931 /// Handling lag
932 ///
933 /// ```
934 /// use tokio::sync::broadcast;
935 ///
936 /// #[tokio::main]
937 /// async fn main() {
938 /// let (tx, mut rx) = broadcast::channel(2);
939 ///
940 /// tx.send(10).unwrap();
941 /// tx.send(20).unwrap();
942 /// tx.send(30).unwrap();
943 ///
944 /// // The receiver lagged behind
945 /// assert!(rx.recv().await.is_err());
946 ///
947 /// // At this point, we can abort or continue with lost messages
948 ///
949 /// assert_eq!(20, rx.recv().await.unwrap());
950 /// assert_eq!(30, rx.recv().await.unwrap());
951 /// }
recv(&mut self) -> Result<T, RecvError>952 pub async fn recv(&mut self) -> Result<T, RecvError> {
953 let fut = Recv::<_, T>::new(Borrow(self));
954 fut.await
955 }
956 }
957
958 #[cfg(feature = "stream")]
959 #[doc(hidden)]
960 impl<T> crate::stream::Stream for Receiver<T>
961 where
962 T: Clone,
963 {
964 type Item = Result<T, RecvError>;
965
poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<T, RecvError>>>966 fn poll_next(
967 mut self: std::pin::Pin<&mut Self>,
968 cx: &mut Context<'_>,
969 ) -> Poll<Option<Result<T, RecvError>>> {
970 #[allow(deprecated)]
971 self.poll_recv(cx).map(|v| match v {
972 Ok(v) => Some(Ok(v)),
973 lag @ Err(RecvError::Lagged(_)) => Some(lag),
974 Err(RecvError::Closed) => None,
975 })
976 }
977 }
978
979 impl<T> Drop for Receiver<T> {
drop(&mut self)980 fn drop(&mut self) {
981 let mut tail = self.shared.tail.lock().unwrap();
982
983 if let Some(waiter) = &self.waiter {
984 // safety: tail lock is held
985 let queued = waiter.with(|ptr| unsafe { (*ptr).queued });
986
987 if queued {
988 // Remove the node
989 //
990 // safety: tail lock is held and the wait node is verified to be in
991 // the list.
992 unsafe {
993 waiter.with_mut(|ptr| {
994 tail.waiters.remove((&mut *ptr).into());
995 });
996 }
997 }
998 }
999
1000 tail.rx_cnt -= 1;
1001 let until = tail.pos;
1002
1003 drop(tail);
1004
1005 while self.next != until {
1006 match self.recv_ref(None) {
1007 Ok(_) => {}
1008 // The channel is closed
1009 Err(TryRecvError::Closed) => break,
1010 // Ignore lagging, we will catch up
1011 Err(TryRecvError::Lagged(..)) => {}
1012 // Can't be empty
1013 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1014 }
1015 }
1016 }
1017 }
1018
1019 impl<R, T> Recv<R, T>
1020 where
1021 R: AsMut<Receiver<T>>,
1022 {
new(receiver: R) -> Recv<R, T>1023 fn new(receiver: R) -> Recv<R, T> {
1024 Recv {
1025 receiver,
1026 waiter: UnsafeCell::new(Waiter {
1027 queued: false,
1028 waker: None,
1029 pointers: linked_list::Pointers::new(),
1030 _p: PhantomPinned,
1031 }),
1032 _p: std::marker::PhantomData,
1033 }
1034 }
1035
1036 /// A custom `project` implementation is used in place of `pin-project-lite`
1037 /// as a custom drop implementation is needed.
project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>)1038 fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1039 unsafe {
1040 // Safety: Receiver is Unpin
1041 is_unpin::<&mut Receiver<T>>();
1042
1043 let me = self.get_unchecked_mut();
1044 (me.receiver.as_mut(), &me.waiter)
1045 }
1046 }
1047 }
1048
1049 impl<R, T> Future for Recv<R, T>
1050 where
1051 R: AsMut<Receiver<T>>,
1052 T: Clone,
1053 {
1054 type Output = Result<T, RecvError>;
1055
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>1056 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1057 let (receiver, waiter) = self.project();
1058
1059 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1060 Ok(value) => value,
1061 Err(TryRecvError::Empty) => return Poll::Pending,
1062 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1063 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1064 };
1065
1066 Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1067 }
1068 }
1069
1070 cfg_stream! {
1071 use futures_core::Stream;
1072
1073 impl<T: Clone> Receiver<T> {
1074 /// Convert the receiver into a `Stream`.
1075 ///
1076 /// The conversion allows using `Receiver` with APIs that require stream
1077 /// values.
1078 ///
1079 /// # Examples
1080 ///
1081 /// ```
1082 /// use tokio::stream::StreamExt;
1083 /// use tokio::sync::broadcast;
1084 ///
1085 /// #[tokio::main]
1086 /// async fn main() {
1087 /// let (tx, rx) = broadcast::channel(128);
1088 ///
1089 /// tokio::spawn(async move {
1090 /// for i in 0..10_i32 {
1091 /// tx.send(i).unwrap();
1092 /// }
1093 /// });
1094 ///
1095 /// // Streams must be pinned to iterate.
1096 /// tokio::pin! {
1097 /// let stream = rx
1098 /// .into_stream()
1099 /// .filter(Result::is_ok)
1100 /// .map(Result::unwrap)
1101 /// .filter(|v| v % 2 == 0)
1102 /// .map(|v| v + 1);
1103 /// }
1104 ///
1105 /// while let Some(i) = stream.next().await {
1106 /// println!("{}", i);
1107 /// }
1108 /// }
1109 /// ```
1110 pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> {
1111 Recv::new(Borrow(self))
1112 }
1113 }
1114
1115 impl<R, T: Clone> Stream for Recv<R, T>
1116 where
1117 R: AsMut<Receiver<T>>,
1118 T: Clone,
1119 {
1120 type Item = Result<T, RecvError>;
1121
1122 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1123 let (receiver, waiter) = self.project();
1124
1125 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1126 Ok(value) => value,
1127 Err(TryRecvError::Empty) => return Poll::Pending,
1128 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Some(Err(RecvError::Lagged(n)))),
1129 Err(TryRecvError::Closed) => return Poll::Ready(None),
1130 };
1131
1132 Poll::Ready(guard.clone_value().map(Ok))
1133 }
1134 }
1135 }
1136
1137 impl<R, T> Drop for Recv<R, T>
1138 where
1139 R: AsMut<Receiver<T>>,
1140 {
drop(&mut self)1141 fn drop(&mut self) {
1142 // Acquire the tail lock. This is required for safety before accessing
1143 // the waiter node.
1144 let mut tail = self.receiver.as_mut().shared.tail.lock().unwrap();
1145
1146 // safety: tail lock is held
1147 let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });
1148
1149 if queued {
1150 // Remove the node
1151 //
1152 // safety: tail lock is held and the wait node is verified to be in
1153 // the list.
1154 unsafe {
1155 self.waiter.with_mut(|ptr| {
1156 tail.waiters.remove((&mut *ptr).into());
1157 });
1158 }
1159 }
1160 }
1161 }
1162
1163 /// # Safety
1164 ///
1165 /// `Waiter` is forced to be !Unpin.
1166 unsafe impl linked_list::Link for Waiter {
1167 type Handle = NonNull<Waiter>;
1168 type Target = Waiter;
1169
as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter>1170 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1171 *handle
1172 }
1173
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>1174 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1175 ptr
1176 }
1177
pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>1178 unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1179 NonNull::from(&mut target.as_mut().pointers)
1180 }
1181 }
1182
1183 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1184 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1185 write!(fmt, "broadcast::Sender")
1186 }
1187 }
1188
1189 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1190 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1191 write!(fmt, "broadcast::Receiver")
1192 }
1193 }
1194
1195 impl<'a, T> RecvGuard<'a, T> {
clone_value(&self) -> Option<T> where T: Clone,1196 fn clone_value(&self) -> Option<T>
1197 where
1198 T: Clone,
1199 {
1200 self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1201 }
1202 }
1203
1204 impl<'a, T> Drop for RecvGuard<'a, T> {
drop(&mut self)1205 fn drop(&mut self) {
1206 // Decrement the remaining counter
1207 if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1208 // Safety: Last receiver, drop the value
1209 self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1210 }
1211 }
1212 }
1213
1214 impl fmt::Display for RecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1216 match self {
1217 RecvError::Closed => write!(f, "channel closed"),
1218 RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1219 }
1220 }
1221 }
1222
1223 impl std::error::Error for RecvError {}
1224
1225 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1227 match self {
1228 TryRecvError::Empty => write!(f, "channel empty"),
1229 TryRecvError::Closed => write!(f, "channel closed"),
1230 TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1231 }
1232 }
1233 }
1234
1235 impl std::error::Error for TryRecvError {}
1236
is_unpin<T: Unpin>()1237 fn is_unpin<T: Unpin>() {}
1238