1 //! An SPSC broadcast channel.
2 //!
3 //! - The value can only be a `usize`.
4 //! - The consumer is only notified if the value is different.
5 //! - The value `0` is reserved for closed.
6 
7 use futures_util::task::AtomicWaker;
8 use std::sync::{
9     atomic::{AtomicUsize, Ordering},
10     Arc,
11 };
12 use std::task;
13 
14 type Value = usize;
15 
16 pub(crate) const CLOSED: usize = 0;
17 
channel(initial: Value) -> (Sender, Receiver)18 pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
19     debug_assert!(
20         initial != CLOSED,
21         "watch::channel initial state of 0 is reserved"
22     );
23 
24     let shared = Arc::new(Shared {
25         value: AtomicUsize::new(initial),
26         waker: AtomicWaker::new(),
27     });
28 
29     (
30         Sender {
31             shared: shared.clone(),
32         },
33         Receiver { shared },
34     )
35 }
36 
37 pub(crate) struct Sender {
38     shared: Arc<Shared>,
39 }
40 
41 pub(crate) struct Receiver {
42     shared: Arc<Shared>,
43 }
44 
45 struct Shared {
46     value: AtomicUsize,
47     waker: AtomicWaker,
48 }
49 
50 impl Sender {
send(&mut self, value: Value)51     pub(crate) fn send(&mut self, value: Value) {
52         if self.shared.value.swap(value, Ordering::SeqCst) != value {
53             self.shared.waker.wake();
54         }
55     }
56 }
57 
58 impl Drop for Sender {
drop(&mut self)59     fn drop(&mut self) {
60         self.send(CLOSED);
61     }
62 }
63 
64 impl Receiver {
load(&mut self, cx: &mut task::Context<'_>) -> Value65     pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
66         self.shared.waker.register(cx.waker());
67         self.shared.value.load(Ordering::SeqCst)
68     }
69 
peek(&self) -> Value70     pub(crate) fn peek(&self) -> Value {
71         self.shared.value.load(Ordering::Relaxed)
72     }
73 }
74