1 //! An SPSC broadcast channel.
2 //!
3 //! - The value can only be a `usize`.
4 //! - The consumer is only notified if the value is different.
5 //! - The value `0` is reserved for closed.
6
7 use futures_util::task::AtomicWaker;
8 use std::sync::{
9 atomic::{AtomicUsize, Ordering},
10 Arc,
11 };
12 use std::task;
13
14 type Value = usize;
15
16 pub(crate) const CLOSED: usize = 0;
17
channel(initial: Value) -> (Sender, Receiver)18 pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
19 debug_assert!(
20 initial != CLOSED,
21 "watch::channel initial state of 0 is reserved"
22 );
23
24 let shared = Arc::new(Shared {
25 value: AtomicUsize::new(initial),
26 waker: AtomicWaker::new(),
27 });
28
29 (
30 Sender {
31 shared: shared.clone(),
32 },
33 Receiver { shared },
34 )
35 }
36
37 pub(crate) struct Sender {
38 shared: Arc<Shared>,
39 }
40
41 pub(crate) struct Receiver {
42 shared: Arc<Shared>,
43 }
44
45 struct Shared {
46 value: AtomicUsize,
47 waker: AtomicWaker,
48 }
49
50 impl Sender {
send(&mut self, value: Value)51 pub(crate) fn send(&mut self, value: Value) {
52 if self.shared.value.swap(value, Ordering::SeqCst) != value {
53 self.shared.waker.wake();
54 }
55 }
56 }
57
58 impl Drop for Sender {
drop(&mut self)59 fn drop(&mut self) {
60 self.send(CLOSED);
61 }
62 }
63
64 impl Receiver {
load(&mut self, cx: &mut task::Context<'_>) -> Value65 pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
66 self.shared.waker.register(cx.waker());
67 self.shared.value.load(Ordering::SeqCst)
68 }
69
peek(&self) -> Value70 pub(crate) fn peek(&self) -> Value {
71 self.shared.value.load(Ordering::Relaxed)
72 }
73 }
74