1 //! Channel that delivers messages periodically.
2 //!
3 //! Messages cannot be sent into this kind of channel; they are materialized on demand.
4 
5 use std::thread;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_utils::atomic::AtomicCell;
9 
10 use crate::context::Context;
11 use crate::err::{RecvTimeoutError, TryRecvError};
12 use crate::select::{Operation, SelectHandle, Token};
13 
14 /// Result of a receive operation.
15 pub(crate) type TickToken = Option<Instant>;
16 
17 /// Channel that delivers messages periodically.
18 pub(crate) struct Channel {
19     /// The instant at which the next message will be delivered.
20     delivery_time: AtomicCell<Instant>,
21 
22     /// The time interval in which messages get delivered.
23     duration: Duration,
24 }
25 
26 impl Channel {
27     /// Creates a channel that delivers messages periodically.
28     #[inline]
new(dur: Duration) -> Self29     pub(crate) fn new(dur: Duration) -> Self {
30         Channel {
31             delivery_time: AtomicCell::new(Instant::now() + dur),
32             duration: dur,
33         }
34     }
35 
36     /// Attempts to receive a message without blocking.
37     #[inline]
try_recv(&self) -> Result<Instant, TryRecvError>38     pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39         loop {
40             let now = Instant::now();
41             let delivery_time = self.delivery_time.load();
42 
43             if now < delivery_time {
44                 return Err(TryRecvError::Empty);
45             }
46 
47             if self
48                 .delivery_time
49                 .compare_exchange(delivery_time, now + self.duration)
50                 .is_ok()
51             {
52                 return Ok(delivery_time);
53             }
54         }
55     }
56 
57     /// Receives a message from the channel.
58     #[inline]
recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError>59     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60         loop {
61             let delivery_time = self.delivery_time.load();
62             let now = Instant::now();
63 
64             if let Some(d) = deadline {
65                 if d < delivery_time {
66                     if now < d {
67                         thread::sleep(d - now);
68                     }
69                     return Err(RecvTimeoutError::Timeout);
70                 }
71             }
72 
73             if self
74                 .delivery_time
75                 .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76                 .is_ok()
77             {
78                 if now < delivery_time {
79                     thread::sleep(delivery_time - now);
80                 }
81                 return Ok(delivery_time);
82             }
83         }
84     }
85 
86     /// Reads a message from the channel.
87     #[inline]
read(&self, token: &mut Token) -> Result<Instant, ()>88     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89         token.tick.ok_or(())
90     }
91 
92     /// Returns `true` if the channel is empty.
93     #[inline]
is_empty(&self) -> bool94     pub(crate) fn is_empty(&self) -> bool {
95         Instant::now() < self.delivery_time.load()
96     }
97 
98     /// Returns `true` if the channel is full.
99     #[inline]
is_full(&self) -> bool100     pub(crate) fn is_full(&self) -> bool {
101         !self.is_empty()
102     }
103 
104     /// Returns the number of messages in the channel.
105     #[inline]
len(&self) -> usize106     pub(crate) fn len(&self) -> usize {
107         if self.is_empty() {
108             0
109         } else {
110             1
111         }
112     }
113 
114     /// Returns the capacity of the channel.
115     #[allow(clippy::unnecessary_wraps)] // This is intentional.
116     #[inline]
capacity(&self) -> Option<usize>117     pub(crate) fn capacity(&self) -> Option<usize> {
118         Some(1)
119     }
120 }
121 
122 impl SelectHandle for Channel {
123     #[inline]
try_select(&self, token: &mut Token) -> bool124     fn try_select(&self, token: &mut Token) -> bool {
125         match self.try_recv() {
126             Ok(msg) => {
127                 token.tick = Some(msg);
128                 true
129             }
130             Err(TryRecvError::Disconnected) => {
131                 token.tick = None;
132                 true
133             }
134             Err(TryRecvError::Empty) => false,
135         }
136     }
137 
138     #[inline]
deadline(&self) -> Option<Instant>139     fn deadline(&self) -> Option<Instant> {
140         Some(self.delivery_time.load())
141     }
142 
143     #[inline]
register(&self, _oper: Operation, _cx: &Context) -> bool144     fn register(&self, _oper: Operation, _cx: &Context) -> bool {
145         self.is_ready()
146     }
147 
148     #[inline]
unregister(&self, _oper: Operation)149     fn unregister(&self, _oper: Operation) {}
150 
151     #[inline]
accept(&self, token: &mut Token, _cx: &Context) -> bool152     fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
153         self.try_select(token)
154     }
155 
156     #[inline]
is_ready(&self) -> bool157     fn is_ready(&self) -> bool {
158         !self.is_empty()
159     }
160 
161     #[inline]
watch(&self, _oper: Operation, _cx: &Context) -> bool162     fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
163         self.is_ready()
164     }
165 
166     #[inline]
unwatch(&self, _oper: Operation)167     fn unwatch(&self, _oper: Operation) {}
168 }
169