1 //! Channel that delivers a message at a certain moment in time.
2 //!
3 //! Messages cannot be sent into this kind of channel; they are materialized on demand.
4 
5 use std::sync::atomic::{AtomicBool, Ordering};
6 use std::thread;
7 use std::time::{Duration, Instant};
8 
9 use crate::context::Context;
10 use crate::err::{RecvTimeoutError, TryRecvError};
11 use crate::select::{Operation, SelectHandle, Token};
12 use crate::utils;
13 
/// Outcome slot for a receive on this channel flavor.
///
/// `Some(instant)` carries the delivered message (the delivery instant); `None` means there is
/// no message to hand over.
pub(crate) type AtToken = Option<Instant>;
16 
/// One-shot channel whose only message becomes available at a fixed instant.
///
/// The message itself is the delivery instant; at most one receiver ever obtains it.
pub(crate) struct Channel {
    /// Point in time from which the message may be received.
    delivery_time: Instant,

    /// Set to `true` once some receiver has taken the message.
    received: AtomicBool,
}
25 
26 impl Channel {
27     /// Creates a channel that delivers a message at a certain instant in time.
28     #[inline]
new_deadline(when: Instant) -> Self29     pub(crate) fn new_deadline(when: Instant) -> Self {
30         Channel {
31             delivery_time: when,
32             received: AtomicBool::new(false),
33         }
34     }
35     /// Creates a channel that delivers a message after a certain duration of time.
36     #[inline]
new_timeout(dur: Duration) -> Self37     pub(crate) fn new_timeout(dur: Duration) -> Self {
38         Self::new_deadline(Instant::now() + dur)
39     }
40 
41     /// Attempts to receive a message without blocking.
42     #[inline]
try_recv(&self) -> Result<Instant, TryRecvError>43     pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
44         // We use relaxed ordering because this is just an optional optimistic check.
45         if self.received.load(Ordering::Relaxed) {
46             // The message has already been received.
47             return Err(TryRecvError::Empty);
48         }
49 
50         if Instant::now() < self.delivery_time {
51             // The message was not delivered yet.
52             return Err(TryRecvError::Empty);
53         }
54 
55         // Try receiving the message if it is still available.
56         if !self.received.swap(true, Ordering::SeqCst) {
57             // Success! Return delivery time as the message.
58             Ok(self.delivery_time)
59         } else {
60             // The message was already received.
61             Err(TryRecvError::Empty)
62         }
63     }
64 
65     /// Receives a message from the channel.
66     #[inline]
recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError>67     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
68         // We use relaxed ordering because this is just an optional optimistic check.
69         if self.received.load(Ordering::Relaxed) {
70             // The message has already been received.
71             utils::sleep_until(deadline);
72             return Err(RecvTimeoutError::Timeout);
73         }
74 
75         // Wait until the message is received or the deadline is reached.
76         loop {
77             let now = Instant::now();
78 
79             let deadline = match deadline {
80                 // Check if we can receive the next message.
81                 _ if now >= self.delivery_time => break,
82                 // Check if the timeout deadline has been reached.
83                 Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
84 
85                 // Sleep until one of the above happens
86                 Some(d) if d < self.delivery_time => d,
87                 _ => self.delivery_time,
88             };
89 
90             thread::sleep(deadline - now);
91         }
92 
93         // Try receiving the message if it is still available.
94         if !self.received.swap(true, Ordering::SeqCst) {
95             // Success! Return the message, which is the instant at which it was delivered.
96             Ok(self.delivery_time)
97         } else {
98             // The message was already received. Block forever.
99             utils::sleep_until(None);
100             unreachable!()
101         }
102     }
103 
104     /// Reads a message from the channel.
105     #[inline]
read(&self, token: &mut Token) -> Result<Instant, ()>106     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
107         token.at.ok_or(())
108     }
109 
110     /// Returns `true` if the channel is empty.
111     #[inline]
is_empty(&self) -> bool112     pub(crate) fn is_empty(&self) -> bool {
113         // We use relaxed ordering because this is just an optional optimistic check.
114         if self.received.load(Ordering::Relaxed) {
115             return true;
116         }
117 
118         // If the delivery time hasn't been reached yet, the channel is empty.
119         if Instant::now() < self.delivery_time {
120             return true;
121         }
122 
123         // The delivery time has been reached. The channel is empty only if the message has already
124         // been received.
125         self.received.load(Ordering::SeqCst)
126     }
127 
128     /// Returns `true` if the channel is full.
129     #[inline]
is_full(&self) -> bool130     pub(crate) fn is_full(&self) -> bool {
131         !self.is_empty()
132     }
133 
134     /// Returns the number of messages in the channel.
135     #[inline]
len(&self) -> usize136     pub(crate) fn len(&self) -> usize {
137         if self.is_empty() {
138             0
139         } else {
140             1
141         }
142     }
143 
144     /// Returns the capacity of the channel.
145     #[allow(clippy::unnecessary_wraps)] // This is intentional.
146     #[inline]
capacity(&self) -> Option<usize>147     pub(crate) fn capacity(&self) -> Option<usize> {
148         Some(1)
149     }
150 }
151 
152 impl SelectHandle for Channel {
153     #[inline]
try_select(&self, token: &mut Token) -> bool154     fn try_select(&self, token: &mut Token) -> bool {
155         match self.try_recv() {
156             Ok(msg) => {
157                 token.at = Some(msg);
158                 true
159             }
160             Err(TryRecvError::Disconnected) => {
161                 token.at = None;
162                 true
163             }
164             Err(TryRecvError::Empty) => false,
165         }
166     }
167 
168     #[inline]
deadline(&self) -> Option<Instant>169     fn deadline(&self) -> Option<Instant> {
170         // We use relaxed ordering because this is just an optional optimistic check.
171         if self.received.load(Ordering::Relaxed) {
172             None
173         } else {
174             Some(self.delivery_time)
175         }
176     }
177 
178     #[inline]
register(&self, _oper: Operation, _cx: &Context) -> bool179     fn register(&self, _oper: Operation, _cx: &Context) -> bool {
180         self.is_ready()
181     }
182 
183     #[inline]
unregister(&self, _oper: Operation)184     fn unregister(&self, _oper: Operation) {}
185 
186     #[inline]
accept(&self, token: &mut Token, _cx: &Context) -> bool187     fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
188         self.try_select(token)
189     }
190 
191     #[inline]
is_ready(&self) -> bool192     fn is_ready(&self) -> bool {
193         !self.is_empty()
194     }
195 
196     #[inline]
watch(&self, _oper: Operation, _cx: &Context) -> bool197     fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
198         self.is_ready()
199     }
200 
201     #[inline]
unwatch(&self, _oper: Operation)202     fn unwatch(&self, _oper: Operation) {}
203 }
204