1 //! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2 //! all consumers.
3 //!
4 //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5 //! values. [`Sender`] handles are clone-able, allowing concurrent send and
//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
//! long as `T` is `Send`.
8 //!
9 //! When a value is sent, **all** [`Receiver`] handles are notified and will
10 //! receive the value. The value is stored once inside the channel and cloned on
11 //! demand for each receiver. Once all receivers have received a clone of the
12 //! value, the value is released from the channel.
13 //!
14 //! A channel is created by calling [`channel`], specifying the maximum number
15 //! of messages the channel can retain at any given time.
16 //!
17 //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18 //! returned [`Receiver`] will receive values sent **after** the call to
19 //! `subscribe`.
20 //!
21 //! ## Lagging
22 //!
23 //! As sent messages must be retained until **all** [`Receiver`] handles receive
//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
25 //! In this case, all but one receiver are able to receive values at the rate
26 //! they are sent. Because one receiver is stalled, the channel starts to fill
27 //! up.
28 //!
29 //! This broadcast channel implementation handles this case by setting a hard
30 //! upper bound on the number of values the channel may retain at any given
31 //! time. This upper bound is passed to the [`channel`] function as an argument.
32 //!
33 //! If a value is sent when the channel is at capacity, the oldest value
34 //! currently held by the channel is released. This frees up space for the new
35 //! value. Any receiver that has not yet seen the released value will return
36 //! [`RecvError::Lagged`] the next time [`recv`] is called.
37 //!
38 //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
39 //! updated to the oldest value contained by the channel. The next call to
40 //! [`recv`] will return this value.
41 //!
42 //! This behavior enables a receiver to detect when it has lagged so far behind
43 //! that data has been dropped. The caller may decide how to respond to this:
44 //! either by aborting its task or by tolerating lost messages and resuming
45 //! consumption of the channel.
46 //!
47 //! ## Closing
48 //!
49 //! When **all** [`Sender`] handles have been dropped, no new values may be
50 //! sent. At this point, the channel is "closed". Once a receiver has received
51 //! all values retained by the channel, the next call to [`recv`] will return
52 //! with [`RecvError::Closed`].
53 //!
54 //! [`Sender`]: crate::sync::broadcast::Sender
55 //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
56 //! [`Receiver`]: crate::sync::broadcast::Receiver
57 //! [`channel`]: crate::sync::broadcast::channel
58 //! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged
59 //! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed
60 //! [`recv`]: crate::sync::broadcast::Receiver::recv
61 //!
62 //! # Examples
63 //!
64 //! Basic usage
65 //!
66 //! ```
67 //! use tokio::sync::broadcast;
68 //!
69 //! #[tokio::main]
70 //! async fn main() {
71 //! let (tx, mut rx1) = broadcast::channel(16);
72 //! let mut rx2 = tx.subscribe();
73 //!
74 //! tokio::spawn(async move {
75 //! assert_eq!(rx1.recv().await.unwrap(), 10);
76 //! assert_eq!(rx1.recv().await.unwrap(), 20);
77 //! });
78 //!
79 //! tokio::spawn(async move {
80 //! assert_eq!(rx2.recv().await.unwrap(), 10);
81 //! assert_eq!(rx2.recv().await.unwrap(), 20);
82 //! });
83 //!
84 //! tx.send(10).unwrap();
85 //! tx.send(20).unwrap();
86 //! }
87 //! ```
88 //!
89 //! Handling lag
90 //!
91 //! ```
92 //! use tokio::sync::broadcast;
93 //!
94 //! #[tokio::main]
95 //! async fn main() {
96 //! let (tx, mut rx) = broadcast::channel(2);
97 //!
98 //! tx.send(10).unwrap();
99 //! tx.send(20).unwrap();
100 //! tx.send(30).unwrap();
101 //!
102 //! // The receiver lagged behind
103 //! assert!(rx.recv().await.is_err());
104 //!
105 //! // At this point, we can abort or continue with lost messages
106 //!
107 //! assert_eq!(20, rx.recv().await.unwrap());
108 //! assert_eq!(30, rx.recv().await.unwrap());
//! }
//! ```
110
111 use crate::loom::cell::UnsafeCell;
112 use crate::loom::future::AtomicWaker;
113 use crate::loom::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, AtomicUsize};
114 use crate::loom::sync::{Arc, Condvar, Mutex};
115
116 use std::fmt;
117 use std::mem;
118 use std::ptr;
119 use std::sync::atomic::Ordering::SeqCst;
120 use std::task::{Context, Poll, Waker};
121 use std::usize;
122
123 /// Sending-half of the [`broadcast`] channel.
124 ///
125 /// May be used from many threads. Messages can be sent with
126 /// [`send`][Sender::send].
127 ///
128 /// # Examples
129 ///
130 /// ```
131 /// use tokio::sync::broadcast;
132 ///
133 /// #[tokio::main]
134 /// async fn main() {
135 /// let (tx, mut rx1) = broadcast::channel(16);
136 /// let mut rx2 = tx.subscribe();
137 ///
138 /// tokio::spawn(async move {
139 /// assert_eq!(rx1.recv().await.unwrap(), 10);
140 /// assert_eq!(rx1.recv().await.unwrap(), 20);
141 /// });
142 ///
143 /// tokio::spawn(async move {
144 /// assert_eq!(rx2.recv().await.unwrap(), 10);
145 /// assert_eq!(rx2.recv().await.unwrap(), 20);
146 /// });
147 ///
148 /// tx.send(10).unwrap();
149 /// tx.send(20).unwrap();
150 /// }
151 /// ```
152 ///
153 /// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// Channel state shared with every other `Sender` and `Receiver` handle.
    shared: Arc<Shared<T>>,
}
157
158 /// Receiving-half of the [`broadcast`] channel.
159 ///
160 /// Must not be used concurrently. Messages may be retrieved using
161 /// [`recv`][Receiver::recv].
162 ///
163 /// # Examples
164 ///
165 /// ```
166 /// use tokio::sync::broadcast;
167 ///
168 /// #[tokio::main]
169 /// async fn main() {
170 /// let (tx, mut rx1) = broadcast::channel(16);
171 /// let mut rx2 = tx.subscribe();
172 ///
173 /// tokio::spawn(async move {
174 /// assert_eq!(rx1.recv().await.unwrap(), 10);
175 /// assert_eq!(rx1.recv().await.unwrap(), 20);
176 /// });
177 ///
178 /// tokio::spawn(async move {
179 /// assert_eq!(rx2.recv().await.unwrap(), 10);
180 /// assert_eq!(rx2.recv().await.unwrap(), 20);
181 /// });
182 ///
183 /// tx.send(10).unwrap();
184 /// tx.send(20).unwrap();
185 /// }
186 /// ```
187 ///
188 /// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Position of the next value this receiver reads. It is masked with
    /// `Shared::mask` to obtain the index of the slot to look at.
    next: u64,

    /// Waiter state. This node is pushed onto `Shared::wait_stack` so that a
    /// sender can wake this receiver after writing a value.
    wait: Arc<WaitNode>,
}
199
/// Error returned by [`Sender::send`][Sender::send].
///
/// A **send** operation can only fail if there are no active receivers,
/// implying that the message could never be received. The error contains the
/// message being sent as a payload so it can be recovered.
///
/// Like the receive errors of this module, `SendError` implements
/// [`Display`](std::fmt::Display) and, when the payload is `Debug`,
/// [`Error`](std::error::Error), so it can be propagated with `?` into boxed
/// or wrapped error types.
#[derive(Debug)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The payload is intentionally not printed: `T` is not required to
        // implement `Display`.
        write!(f, "channel closed")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}
207
/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// The error is a plain value: it can be cloned and compared for equality.
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecvError {
    /// There are no more active senders implying no further messages will ever
    /// be sent.
    Closed,

    /// The receiver lagged too far behind. Attempting to receive again will
    /// return the oldest message still retained by the channel.
    ///
    /// Includes the number of skipped messages.
    Lagged(u64),
}
224
/// An error returned from the [`try_recv`] function on a [`Receiver`].
///
/// The error is a plain value: it can be cloned and compared for equality.
///
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// The channel is currently empty. There are still active
    /// [`Sender`][Sender] handles, so data may yet become available.
    Empty,

    /// There are no more active senders implying no further messages will ever
    /// be sent.
    Closed,

    /// The receiver lagged too far behind. The receiver stays usable:
    /// attempting to receive again will return the oldest message still
    /// retained by the channel.
    ///
    /// Includes the number of skipped messages.
    Lagged(u64),
}
246
/// Data shared between senders and receivers
struct Shared<T> {
    /// Slots in the channel. `channel` rounds the requested capacity up to a
    /// power of two, so the length is always a power of two.
    buffer: Box<[Slot<T>]>,

    /// Mask a position -> index (`buffer.len() - 1`)
    mask: usize,

    /// Tail of the queue: the next write position and the receiver count.
    tail: Mutex<Tail>,

    /// Notifies a sender that the slot is unlocked. A sender blocks on this
    /// (paired with the `tail` mutex) while receivers still hold the slot it
    /// wants to overwrite.
    condvar: Condvar,

    /// Stack of pending waiters. Each entry is a raw pointer produced by
    /// `Arc::into_raw`, so it owns one strong reference to its `WaitNode`.
    wait_stack: AtomicPtr<WaitNode>,

    /// Number of outstanding Sender handles
    num_tx: AtomicUsize,
}
267
/// Write-side state of the channel, guarded by the `Shared::tail` mutex.
struct Tail {
    /// Next position to write to. Incremented (wrapping) by every send that
    /// finds at least one active receiver.
    pos: u64,

    /// Number of active receivers. Incremented by `Sender::subscribe` and
    /// decremented when a `Receiver` is dropped.
    rx_cnt: usize,
}
276
/// Slot in the buffer
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    rem: AtomicUsize,

    /// Used to lock the `write` field.
    ///
    /// Bit 0 is the writer bit, set by a sender before it overwrites the
    /// slot. Every receiver currently holding the slot adds 2, so the
    /// remaining bits count active readers.
    lock: AtomicUsize,

    /// The value being broadcast
    ///
    /// Synchronized by `lock`
    write: Write<T>,
}
292
/// A write in the buffer
struct Write<T> {
    /// Uniquely identifies this write: the tail position the value was sent
    /// at. Receivers compare it with their own `next` position.
    pos: UnsafeCell<u64>,

    /// The written value. `None` after the last expected receiver released
    /// it, and also when the slot carries the marker written by the last
    /// `Sender` being dropped (receivers report that as `Closed`).
    val: UnsafeCell<Option<T>>,
}
301
/// Tracks a waiting receiver
#[derive(Debug)]
struct WaitNode {
    /// `true` if queued, i.e. the node is currently on `Shared::wait_stack`.
    queued: AtomicBool,

    /// Task to wake when a sender has written a new value.
    waker: AtomicWaker,

    /// Next pointer in the stack of waiting receivers.
    next: UnsafeCell<*const WaitNode>,
}
314
/// Reader lock on a single slot, created by `Receiver::recv_ref`.
///
/// Dropping the guard releases the reader lock and decrements the slot's
/// `rem` counter; `drop_no_rem_dec` releases the lock without touching `rem`.
struct RecvGuard<'a, T> {
    /// The locked slot.
    slot: &'a Slot<T>,
    /// Tail mutex, taken briefly on unlock before notifying `condvar`.
    tail: &'a Mutex<Tail>,
    /// Condition variable a sender waits on for the slot to become free.
    condvar: &'a Condvar,
}
320
/// Max number of receivers. Reserve space to lock.
///
/// `Sender::subscribe` panics when the receiver count has reached this value.
// NOTE(review): "space to lock" presumably refers to `Slot::lock`, where bit 0
// is the writer bit and readers are counted in steps of two — confirm.
const MAX_RECEIVERS: usize = usize::MAX >> 1;
323
324 /// Create a bounded, multi-producer, multi-consumer channel where each sent
325 /// value is broadcasted to all active receivers.
326 ///
327 /// All data sent on [`Sender`] will become available on every active
328 /// [`Receiver`] in the same order as it was sent.
329 ///
330 /// The `Sender` can be cloned to `send` to the same channel from multiple
331 /// points in the process or it can be used concurrently from an `Arc`. New
332 /// `Receiver` handles are created by calling [`Sender::subscribe`].
333 ///
334 /// If all [`Receiver`] handles are dropped, the `send` method will return a
335 /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
336 /// method will return a [`RecvError`].
337 ///
338 /// [`Sender`]: crate::sync::broadcast::Sender
339 /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
340 /// [`Receiver`]: crate::sync::broadcast::Receiver
341 /// [`recv`]: crate::sync::broadcast::Receiver::recv
342 /// [`SendError`]: crate::sync::broadcast::SendError
343 /// [`RecvError`]: crate::sync::broadcast::RecvError
344 ///
345 /// # Examples
346 ///
347 /// ```
348 /// use tokio::sync::broadcast;
349 ///
350 /// #[tokio::main]
351 /// async fn main() {
352 /// let (tx, mut rx1) = broadcast::channel(16);
353 /// let mut rx2 = tx.subscribe();
354 ///
355 /// tokio::spawn(async move {
356 /// assert_eq!(rx1.recv().await.unwrap(), 10);
357 /// assert_eq!(rx1.recv().await.unwrap(), 20);
358 /// });
359 ///
360 /// tokio::spawn(async move {
361 /// assert_eq!(rx2.recv().await.unwrap(), 10);
362 /// assert_eq!(rx2.recv().await.unwrap(), 20);
363 /// });
364 ///
365 /// tx.send(10).unwrap();
366 /// tx.send(20).unwrap();
367 /// }
368 /// ```
channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>)369 pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
370 assert!(capacity > 0, "capacity is empty");
371 assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
372
373 // Round to a power of two
374 capacity = capacity.next_power_of_two();
375
376 let mut buffer = Vec::with_capacity(capacity);
377
378 for i in 0..capacity {
379 buffer.push(Slot {
380 rem: AtomicUsize::new(0),
381 lock: AtomicUsize::new(0),
382 write: Write {
383 pos: UnsafeCell::new((i as u64).wrapping_sub(capacity as u64)),
384 val: UnsafeCell::new(None),
385 },
386 });
387 }
388
389 let shared = Arc::new(Shared {
390 buffer: buffer.into_boxed_slice(),
391 mask: capacity - 1,
392 tail: Mutex::new(Tail { pos: 0, rx_cnt: 1 }),
393 condvar: Condvar::new(),
394 wait_stack: AtomicPtr::new(ptr::null_mut()),
395 num_tx: AtomicUsize::new(1),
396 });
397
398 let rx = Receiver {
399 shared: shared.clone(),
400 next: 0,
401 wait: Arc::new(WaitNode {
402 queued: AtomicBool::new(false),
403 waker: AtomicWaker::new(),
404 next: UnsafeCell::new(ptr::null()),
405 }),
406 };
407
408 let tx = Sender { shared };
409
410 (tx, rx)
411 }
412
// Both handles are declared `Send` and `Sync` whenever `T: Send`; `T: Sync`
// is not required by any of these impls.
//
// NOTE(review): concurrent receivers call `T::clone` through a shared
// reference to the same stored value (see `RecvGuard::clone_value`) — confirm
// that requiring only `T: Send` is sufficient for the `Sync` impls.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
418
419 impl<T> Sender<T> {
420 /// Attempts to send a value to all active [`Receiver`] handles, returning
421 /// it back if it could not be sent.
422 ///
423 /// A successful send occurs when there is at least one active [`Receiver`]
424 /// handle. An unsuccessful send would be one where all associated
425 /// [`Receiver`] handles have already been dropped.
426 ///
427 /// # Return
428 ///
429 /// On success, the number of subscribed [`Receiver`] handles is returned.
430 /// This does not mean that this number of receivers will see the message as
431 /// a receiver may drop before receiving the message.
432 ///
433 /// # Note
434 ///
435 /// A return value of `Ok` **does not** mean that the sent value will be
436 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
437 /// handles may be dropped before receiving the sent message.
438 ///
439 /// A return value of `Err` **does not** mean that future calls to `send`
440 /// will fail. New [`Receiver`] handles may be created by calling
441 /// [`subscribe`].
442 ///
443 /// [`Receiver`]: crate::sync::broadcast::Receiver
444 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
445 ///
446 /// # Examples
447 ///
448 /// ```
449 /// use tokio::sync::broadcast;
450 ///
451 /// #[tokio::main]
452 /// async fn main() {
453 /// let (tx, mut rx1) = broadcast::channel(16);
454 /// let mut rx2 = tx.subscribe();
455 ///
456 /// tokio::spawn(async move {
457 /// assert_eq!(rx1.recv().await.unwrap(), 10);
458 /// assert_eq!(rx1.recv().await.unwrap(), 20);
459 /// });
460 ///
461 /// tokio::spawn(async move {
462 /// assert_eq!(rx2.recv().await.unwrap(), 10);
463 /// assert_eq!(rx2.recv().await.unwrap(), 20);
464 /// });
465 ///
466 /// tx.send(10).unwrap();
467 /// tx.send(20).unwrap();
468 /// }
469 /// ```
send(&self, value: T) -> Result<usize, SendError<T>>470 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
471 self.send2(Some(value))
472 .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap()))
473 }
474
475 /// Creates a new [`Receiver`] handle that will receive values sent **after**
476 /// this call to `subscribe`.
477 ///
478 /// # Examples
479 ///
480 /// ```
481 /// use tokio::sync::broadcast;
482 ///
483 /// #[tokio::main]
484 /// async fn main() {
485 /// let (tx, _rx) = broadcast::channel(16);
486 ///
487 /// // Will not be seen
488 /// tx.send(10).unwrap();
489 ///
490 /// let mut rx = tx.subscribe();
491 ///
492 /// tx.send(20).unwrap();
493 ///
494 /// let value = rx.recv().await.unwrap();
495 /// assert_eq!(20, value);
496 /// }
497 /// ```
subscribe(&self) -> Receiver<T>498 pub fn subscribe(&self) -> Receiver<T> {
499 let shared = self.shared.clone();
500
501 let mut tail = shared.tail.lock().unwrap();
502
503 if tail.rx_cnt == MAX_RECEIVERS {
504 panic!("max receivers");
505 }
506
507 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
508 let next = tail.pos;
509
510 drop(tail);
511
512 Receiver {
513 shared,
514 next,
515 wait: Arc::new(WaitNode {
516 queued: AtomicBool::new(false),
517 waker: AtomicWaker::new(),
518 next: UnsafeCell::new(ptr::null()),
519 }),
520 }
521 }
522
523 /// Returns the number of active receivers
524 ///
525 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
526 /// [`subscribe`]. These are the handles that will receive values sent on
527 /// this [`Sender`].
528 ///
529 /// # Note
530 ///
531 /// It is not guaranteed that a sent message will reach this number of
532 /// receivers. Active receivers may never call [`recv`] again before
533 /// dropping.
534 ///
535 /// [`recv`]: crate::sync::broadcast::Receiver::recv
536 /// [`Receiver`]: crate::sync::broadcast::Receiver
537 /// [`Sender`]: crate::sync::broadcast::Sender
538 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
539 /// [`channel`]: crate::sync::broadcast::channel
540 ///
541 /// # Examples
542 ///
543 /// ```
544 /// use tokio::sync::broadcast;
545 ///
546 /// #[tokio::main]
547 /// async fn main() {
548 /// let (tx, _rx1) = broadcast::channel(16);
549 ///
550 /// assert_eq!(1, tx.receiver_count());
551 ///
552 /// let mut _rx2 = tx.subscribe();
553 ///
554 /// assert_eq!(2, tx.receiver_count());
555 ///
556 /// tx.send(10).unwrap();
557 /// }
558 /// ```
receiver_count(&self) -> usize559 pub fn receiver_count(&self) -> usize {
560 let tail = self.shared.tail.lock().unwrap();
561 tail.rx_cnt
562 }
563
/// Writes `value` into the slot at the current tail position and wakes
/// waiting receivers.
///
/// `Some(v)` is a regular message (see `send`). `None` is what the last
/// `Sender` writes when it is dropped; receivers report it as `Closed`.
///
/// Returns `Err`, handing `value` back, when there are no active receivers.
/// Otherwise returns the receiver count observed when the write position
/// was claimed.
fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
    let mut tail = self.shared.tail.lock().unwrap();

    if tail.rx_cnt == 0 {
        return Err(SendError(value));
    }

    // Position to write into
    let pos = tail.pos;
    let rem = tail.rx_cnt;
    let idx = (pos & self.shared.mask as u64) as usize;

    // Update the tail position
    tail.pos = tail.pos.wrapping_add(1);

    // Get the slot
    let slot = &self.shared.buffer[idx];

    // Acquire the write lock. Setting bit 0 makes `try_rx_lock` fail for new
    // readers; the remaining bits count readers still holding the slot.
    let mut prev = slot.lock.fetch_or(1, SeqCst);

    while prev & !1 != 0 {
        // Concurrent readers, we must go to sleep. `wait` releases the tail
        // mutex while blocked; `rx_unlock` notifies once the last reader
        // has left the slot.
        tail = self.shared.condvar.wait(tail).unwrap();

        prev = slot.lock.load(SeqCst);

        if prev & 1 == 0 {
            // The writer lock bit was cleared while this thread was
            // sleeping. This can only happen if a newer write happened on
            // this slot by another thread. Bail early as an optimization,
            // there is nothing left to do.
            //
            // `value` is dropped here without being stored.
            return Ok(rem);
        }
    }

    if tail.pos.wrapping_sub(pos) > self.shared.buffer.len() as u64 {
        // There is a newer pending write to the same slot.
        //
        // `value` is dropped here without being stored.
        return Ok(rem);
    }

    // Slot lock acquired
    slot.write.pos.with_mut(|ptr| unsafe { *ptr = pos });
    slot.write.val.with_mut(|ptr| unsafe { *ptr = value });

    // Set remaining receivers
    slot.rem.store(rem, SeqCst);

    // Release the slot lock
    slot.lock.store(0, SeqCst);

    // Release the mutex. This must happen after the slot lock is released,
    // otherwise the writer lock bit could be cleared while another thread
    // is in the critical section.
    drop(tail);

    // Notify waiting receivers
    self.notify_rx();

    Ok(rem)
}
625
/// Takes the whole stack of waiting receivers and wakes each of them.
///
/// The stack head is swapped with null, so receivers that register after
/// this point start a fresh stack.
fn notify_rx(&self) {
    let mut curr = self.shared.wait_stack.swap(ptr::null_mut(), SeqCst) as *const WaitNode;

    while !curr.is_null() {
        // Reclaims the strong reference that `register_waker` leaked with
        // `Arc::into_raw`; it is released when `waiter` goes out of scope.
        let waiter = unsafe { Arc::from_raw(curr) };

        // Update `curr` before toggling `queued` and waking
        curr = waiter.next.with(|ptr| unsafe { *ptr });

        // Unset queued
        waiter.queued.store(false, SeqCst);

        // Wake
        waiter.waker.wake();
    }
}
642 }
643
644 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>645 fn clone(&self) -> Sender<T> {
646 let shared = self.shared.clone();
647 shared.num_tx.fetch_add(1, SeqCst);
648
649 Sender { shared }
650 }
651 }
652
653 impl<T> Drop for Sender<T> {
drop(&mut self)654 fn drop(&mut self) {
655 if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
656 let _ = self.send2(None);
657 }
658 }
659 }
660
impl<T> Receiver<T> {
    /// Locks the next value if there is one.
    ///
    /// On success the returned `RecvGuard` holds a reader lock on the slot
    /// that was at `self.next`, and `self.next` has been advanced by one.
    /// Dropping the guard releases the lock.
    ///
    /// When `spin` is `true` the call busy-waits while a sender holds the
    /// slot's writer bit; otherwise a write-locked slot is reported as
    /// `TryRecvError::Empty`.
    ///
    /// Returns `TryRecvError::Empty` when the slot still holds the write from
    /// exactly one lap earlier, and `TryRecvError::Lagged(n)` when it holds
    /// any other position; in the latter case `self.next` is moved to
    /// `tail.pos - capacity` first.
    fn recv_ref(&mut self, spin: bool) -> Result<RecvGuard<'_, T>, TryRecvError> {
        let idx = (self.next & self.shared.mask as u64) as usize;

        // The slot holding the next value to read
        let slot = &self.shared.buffer[idx];

        // Lock the slot
        if !slot.try_rx_lock() {
            if spin {
                while !slot.try_rx_lock() {
                    spin_loop_hint();
                }
            } else {
                return Err(TryRecvError::Empty);
            }
        }

        let guard = RecvGuard {
            slot,
            tail: &self.shared.tail,
            condvar: &self.shared.condvar,
        };

        if guard.pos() != self.next {
            let pos = guard.pos();

            // This receiver never consumed the slot's current value, so the
            // lock is released without decrementing `rem`.
            guard.drop_no_rem_dec();

            if pos.wrapping_add(self.shared.buffer.len() as u64) == self.next {
                // The slot is one lap behind: the value at `self.next` has
                // not been written yet.
                return Err(TryRecvError::Empty);
            } else {
                let tail = self.shared.tail.lock().unwrap();

                // `tail.pos` points to the slot the **next** send writes to.
                // Because a receiver is lagging, this slot also holds the
                // oldest value. To make the positions match, we subtract the
                // capacity.
                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
                let missed = next.wrapping_sub(self.next);

                self.next = next;

                return Err(TryRecvError::Lagged(missed));
            }
        }

        self.next = self.next.wrapping_add(1);

        Ok(guard)
    }
}
716
717 impl<T> Receiver<T>
718 where
719 T: Clone,
720 {
721 /// Attempts to return a pending value on this receiver without awaiting.
722 ///
723 /// This is useful for a flavor of "optimistic check" before deciding to
724 /// await on a receiver.
725 ///
/// Compared with [`recv`], this function has three failure cases instead of two
/// (one for closed, one for an empty buffer, one for a lagging receiver).
728 ///
729 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
730 /// dropped, indicating that no further values can be sent on the channel.
731 ///
/// If the [`Receiver`] handle falls behind, once the channel is full, newly
/// sent values will overwrite old values. At this point, a call to
/// [`try_recv`] will return with `Err(TryRecvError::Lagged)` and the
/// [`Receiver`]'s internal cursor is updated to point to the oldest value
/// still held by the channel. A subsequent call to [`try_recv`] will return
/// this value **unless** it has been since overwritten. If there are no
/// values to receive, `Err(TryRecvError::Empty)` is returned.
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
742 ///
743 /// # Examples
744 ///
745 /// ```
746 /// use tokio::sync::broadcast;
747 ///
748 /// #[tokio::main]
749 /// async fn main() {
750 /// let (tx, mut rx) = broadcast::channel(16);
751 ///
752 /// assert!(rx.try_recv().is_err());
753 ///
754 /// tx.send(10).unwrap();
755 ///
756 /// let value = rx.try_recv().unwrap();
757 /// assert_eq!(10, value);
758 /// }
759 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>760 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
761 let guard = self.recv_ref(false)?;
762 guard.clone_value().ok_or(TryRecvError::Closed)
763 }
764
765 #[doc(hidden)] // TODO: document
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>766 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
767 if let Some(value) = ok_empty(self.try_recv())? {
768 return Poll::Ready(Ok(value));
769 }
770
771 self.register_waker(cx.waker());
772
773 if let Some(value) = ok_empty(self.try_recv())? {
774 Poll::Ready(Ok(value))
775 } else {
776 Poll::Pending
777 }
778 }
779
780 /// Receives the next value for this receiver.
781 ///
782 /// Each [`Receiver`] handle will receive a clone of all values sent
783 /// **after** it has subscribed.
784 ///
785 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
786 /// dropped, indicating that no further values can be sent on the channel.
787 ///
788 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
789 /// sent values will overwrite old values. At this point, a call to [`recv`]
790 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
791 /// internal cursor is updated to point to the oldest value still held by
792 /// the channel. A subsequent call to [`recv`] will return this value
793 /// **unless** it has been since overwritten.
794 ///
795 /// [`Receiver`]: crate::sync::broadcast::Receiver
796 /// [`recv`]: crate::sync::broadcast::Receiver::recv
797 ///
798 /// # Examples
799 ///
800 /// ```
801 /// use tokio::sync::broadcast;
802 ///
803 /// #[tokio::main]
804 /// async fn main() {
805 /// let (tx, mut rx1) = broadcast::channel(16);
806 /// let mut rx2 = tx.subscribe();
807 ///
808 /// tokio::spawn(async move {
809 /// assert_eq!(rx1.recv().await.unwrap(), 10);
810 /// assert_eq!(rx1.recv().await.unwrap(), 20);
811 /// });
812 ///
813 /// tokio::spawn(async move {
814 /// assert_eq!(rx2.recv().await.unwrap(), 10);
815 /// assert_eq!(rx2.recv().await.unwrap(), 20);
816 /// });
817 ///
818 /// tx.send(10).unwrap();
819 /// tx.send(20).unwrap();
820 /// }
821 /// ```
822 ///
823 /// Handling lag
824 ///
825 /// ```
826 /// use tokio::sync::broadcast;
827 ///
828 /// #[tokio::main]
829 /// async fn main() {
830 /// let (tx, mut rx) = broadcast::channel(2);
831 ///
832 /// tx.send(10).unwrap();
833 /// tx.send(20).unwrap();
834 /// tx.send(30).unwrap();
835 ///
836 /// // The receiver lagged behind
837 /// assert!(rx.recv().await.is_err());
838 ///
839 /// // At this point, we can abort or continue with lost messages
840 ///
841 /// assert_eq!(20, rx.recv().await.unwrap());
842 /// assert_eq!(30, rx.recv().await.unwrap());
/// }
/// ```
recv(&mut self) -> Result<T, RecvError>844 pub async fn recv(&mut self) -> Result<T, RecvError> {
845 use crate::future::poll_fn;
846
847 poll_fn(|cx| self.poll_recv(cx)).await
848 }
849
/// Stores `cx` as the waker to notify and, unless this receiver's node is
/// already queued, pushes the node onto the shared wait stack.
fn register_waker(&self, cx: &Waker) {
    self.wait.waker.register_by_ref(cx);

    if !self.wait.queued.load(SeqCst) {
        // Set `queued` before queuing.
        self.wait.queued.store(true, SeqCst);

        let mut curr = self.shared.wait_stack.load(SeqCst);

        // The ref count is decremented in `notify_rx` when all nodes are
        // removed from the waiter stack.
        let node = Arc::into_raw(self.wait.clone()) as *mut _;

        // Retry the push until the compare-exchange succeeds; on failure
        // `curr` is refreshed with the head another thread installed.
        loop {
            // Safety: `queued == false` means the caller has exclusive
            // access to `self.wait.next`.
            self.wait.next.with_mut(|ptr| unsafe { *ptr = curr });

            let res = self
                .shared
                .wait_stack
                .compare_exchange(curr, node, SeqCst, SeqCst);

            match res {
                Ok(_) => return,
                Err(actual) => curr = actual,
            }
        }
    }
}
880 }
881
#[cfg(feature = "stream")]
impl<T> crate::stream::Stream for Receiver<T>
where
    T: Clone,
{
    type Item = Result<T, RecvError>;

    /// Adapts `poll_recv` to the stream protocol: a closed channel ends the
    /// stream, while values and lag errors are yielded as items.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<T, RecvError>>> {
        match self.poll_recv(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(RecvError::Closed)) => Poll::Ready(None),
            // Either `Ok(value)` or `Err(RecvError::Lagged(_))`.
            Poll::Ready(item) => Poll::Ready(Some(item)),
        }
    }
}
900
impl<T> Drop for Receiver<T> {
    /// Unregisters the receiver and then consumes every value sent up to
    /// that point, so the `rem` counters of those slots are decremented on
    /// this receiver's behalf.
    fn drop(&mut self) {
        let mut tail = self.shared.tail.lock().unwrap();

        // Sends that start after this point no longer count this receiver.
        tail.rx_cnt -= 1;
        let until = tail.pos;

        drop(tail);

        while self.next != until {
            // `spin = true`: wait for a concurrent writer instead of
            // reporting the slot as empty.
            match self.recv_ref(true) {
                // Ignore the value. The guard is dropped right away, which
                // decrements the slot's `rem` counter.
                Ok(_) => {}
                // The channel is closed
                Err(TryRecvError::Closed) => break,
                // Ignore lagging, we will catch up
                Err(TryRecvError::Lagged(..)) => {}
                // Can't be empty
                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
            }
        }
    }
}
924
impl<T> Drop for Shared<T> {
    /// Releases the `Arc<WaitNode>` references still held by the wait stack.
    fn drop(&mut self) {
        // Clear the wait stack
        let mut curr = self.wait_stack.with_mut(|ptr| *ptr as *const WaitNode);

        while !curr.is_null() {
            // Each stack entry owns one strong reference, created with
            // `Arc::into_raw` in `register_waker`; rebuilding the `Arc` and
            // letting it fall out of scope releases that reference.
            let waiter = unsafe { Arc::from_raw(curr) };
            curr = waiter.next.with(|ptr| unsafe { *ptr });
        }
    }
}
936
937 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result938 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
939 write!(fmt, "broadcast::Sender")
940 }
941 }
942
943 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result944 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
945 write!(fmt, "broadcast::Receiver")
946 }
947 }
948
impl<T> Slot<T> {
    /// Tries to lock the slot for a receiver by adding 2 to `lock`.
    ///
    /// Returns `false`, without modifying `lock`, when the writer bit (bit 0)
    /// is set, i.e. a sender holds or is acquiring the slot.
    fn try_rx_lock(&self) -> bool {
        let mut curr = self.lock.load(SeqCst);

        loop {
            if curr & 1 == 1 {
                // Locked by sender
                return false;
            }

            // Only increment (by 2) if the LSB "lock" bit is not set.
            let res = self.lock.compare_exchange(curr, curr + 2, SeqCst, SeqCst);

            match res {
                Ok(_) => return true,
                // Lost a race; retry against the value actually observed.
                Err(actual) => curr = actual,
            }
        }
    }

    /// Releases a receiver lock taken with `try_rx_lock`.
    ///
    /// When `rem_dec` is `true` the `rem` counter is decremented as well, and
    /// the stored value is cleared if this was the last expected receiver.
    /// If the release leaves only the writer bit set, senders blocked on
    /// `condvar` are notified.
    fn rx_unlock(&self, tail: &Mutex<Tail>, condvar: &Condvar, rem_dec: bool) {
        if rem_dec {
            // Decrement the remaining counter
            if 1 == self.rem.fetch_sub(1, SeqCst) {
                // Last receiver, drop the value
                self.write.val.with_mut(|ptr| unsafe { *ptr = None });
            }
        }

        // `fetch_sub` returns the previous value; subtracting 2 again gives
        // the new one. A new value of exactly 1 means: no readers left and
        // the writer bit is set.
        if 1 == self.lock.fetch_sub(2, SeqCst) - 2 {
            // First acquire the lock to make sure our sender is waiting on the
            // condition variable, otherwise the notification could be lost.
            mem::drop(tail.lock().unwrap());
            // Wake up senders
            condvar.notify_all();
        }
    }
}
990
impl<'a, T> RecvGuard<'a, T> {
    /// Returns the position recorded by the write currently stored in the
    /// locked slot.
    fn pos(&self) -> u64 {
        self.slot.write.pos.with(|ptr| unsafe { *ptr })
    }

    /// Returns a clone of the value stored in the locked slot, or `None` if
    /// the slot holds no value.
    fn clone_value(&self) -> Option<T>
    where
        T: Clone,
    {
        self.slot.write.val.with(|ptr| unsafe { (*ptr).clone() })
    }

    /// Releases the slot lock without decrementing the slot's `rem` counter.
    fn drop_no_rem_dec(self) {
        self.slot.rx_unlock(self.tail, self.condvar, false);

        // Skip the `Drop` impl, which would unlock a second time and also
        // decrement `rem`.
        mem::forget(self);
    }
}
1009
1010 impl<'a, T> Drop for RecvGuard<'a, T> {
drop(&mut self)1011 fn drop(&mut self) {
1012 self.slot.rx_unlock(self.tail, self.condvar, true)
1013 }
1014 }
1015
ok_empty<T>(res: Result<T, TryRecvError>) -> Result<Option<T>, RecvError>1016 fn ok_empty<T>(res: Result<T, TryRecvError>) -> Result<Option<T>, RecvError> {
1017 match res {
1018 Ok(value) => Ok(Some(value)),
1019 Err(TryRecvError::Empty) => Ok(None),
1020 Err(TryRecvError::Lagged(n)) => Err(RecvError::Lagged(n)),
1021 Err(TryRecvError::Closed) => Err(RecvError::Closed),
1022 }
1023 }
1024
1025 impl fmt::Display for RecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1026 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1027 match self {
1028 RecvError::Closed => write!(f, "channel closed"),
1029 RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1030 }
1031 }
1032 }
1033
1034 impl std::error::Error for RecvError {}
1035
1036 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1037 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1038 match self {
1039 TryRecvError::Empty => write!(f, "channel empty"),
1040 TryRecvError::Closed => write!(f, "channel closed"),
1041 TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1042 }
1043 }
1044 }
1045
1046 impl std::error::Error for TryRecvError {}
1047