1 //! Channel that delivers a message at a certain moment in time. 2 //! 3 //! Messages cannot be sent into this kind of channel; they are materialized on demand. 4 5 use std::sync::atomic::{AtomicBool, Ordering}; 6 use std::thread; 7 use std::time::{Duration, Instant}; 8 9 use crate::context::Context; 10 use crate::err::{RecvTimeoutError, TryRecvError}; 11 use crate::select::{Operation, SelectHandle, Token}; 12 use crate::utils; 13 14 /// Result of a receive operation. 15 pub(crate) type AtToken = Option<Instant>; 16 17 /// Channel that delivers a message at a certain moment in time 18 pub(crate) struct Channel { 19 /// The instant at which the message will be delivered. 20 delivery_time: Instant, 21 22 /// `true` if the message has been received. 23 received: AtomicBool, 24 } 25 26 impl Channel { 27 /// Creates a channel that delivers a message at a certain instant in time. 28 #[inline] new_deadline(when: Instant) -> Self29 pub(crate) fn new_deadline(when: Instant) -> Self { 30 Channel { 31 delivery_time: when, 32 received: AtomicBool::new(false), 33 } 34 } 35 /// Creates a channel that delivers a message after a certain duration of time. 36 #[inline] new_timeout(dur: Duration) -> Self37 pub(crate) fn new_timeout(dur: Duration) -> Self { 38 Self::new_deadline(Instant::now() + dur) 39 } 40 41 /// Attempts to receive a message without blocking. 42 #[inline] try_recv(&self) -> Result<Instant, TryRecvError>43 pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { 44 // We use relaxed ordering because this is just an optional optimistic check. 45 if self.received.load(Ordering::Relaxed) { 46 // The message has already been received. 47 return Err(TryRecvError::Empty); 48 } 49 50 if Instant::now() < self.delivery_time { 51 // The message was not delivered yet. 52 return Err(TryRecvError::Empty); 53 } 54 55 // Try receiving the message if it is still available. 56 if !self.received.swap(true, Ordering::SeqCst) { 57 // Success! Return delivery time as the message. 58 Ok(self.delivery_time) 59 } else { 60 // The message was already received. 61 Err(TryRecvError::Empty) 62 } 63 } 64 65 /// Receives a message from the channel. 66 #[inline] recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError>67 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { 68 // We use relaxed ordering because this is just an optional optimistic check. 69 if self.received.load(Ordering::Relaxed) { 70 // The message has already been received. 71 utils::sleep_until(deadline); 72 return Err(RecvTimeoutError::Timeout); 73 } 74 75 // Wait until the message is received or the deadline is reached. 76 loop { 77 let now = Instant::now(); 78 79 let deadline = match deadline { 80 // Check if we can receive the next message. 81 _ if now >= self.delivery_time => break, 82 // Check if the timeout deadline has been reached. 83 Some(d) if now >= d => return Err(RecvTimeoutError::Timeout), 84 85 // Sleep until one of the above happens 86 Some(d) if d < self.delivery_time => d, 87 _ => self.delivery_time, 88 }; 89 90 thread::sleep(deadline - now); 91 } 92 93 // Try receiving the message if it is still available. 94 if !self.received.swap(true, Ordering::SeqCst) { 95 // Success! Return the message, which is the instant at which it was delivered. 96 Ok(self.delivery_time) 97 } else { 98 // The message was already received. Block forever. 99 utils::sleep_until(None); 100 unreachable!() 101 } 102 } 103 104 /// Reads a message from the channel. 105 #[inline] read(&self, token: &mut Token) -> Result<Instant, ()>106 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { 107 token.at.ok_or(()) 108 } 109 110 /// Returns `true` if the channel is empty. 111 #[inline] is_empty(&self) -> bool112 pub(crate) fn is_empty(&self) -> bool { 113 // We use relaxed ordering because this is just an optional optimistic check. 114 if self.received.load(Ordering::Relaxed) { 115 return true; 116 } 117 118 // If the delivery time hasn't been reached yet, the channel is empty. 119 if Instant::now() < self.delivery_time { 120 return true; 121 } 122 123 // The delivery time has been reached. The channel is empty only if the message has already 124 // been received. 125 self.received.load(Ordering::SeqCst) 126 } 127 128 /// Returns `true` if the channel is full. 129 #[inline] is_full(&self) -> bool130 pub(crate) fn is_full(&self) -> bool { 131 !self.is_empty() 132 } 133 134 /// Returns the number of messages in the channel. 135 #[inline] len(&self) -> usize136 pub(crate) fn len(&self) -> usize { 137 if self.is_empty() { 138 0 139 } else { 140 1 141 } 142 } 143 144 /// Returns the capacity of the channel. 145 #[allow(clippy::unnecessary_wraps)] // This is intentional. 146 #[inline] capacity(&self) -> Option<usize>147 pub(crate) fn capacity(&self) -> Option<usize> { 148 Some(1) 149 } 150 } 151 152 impl SelectHandle for Channel { 153 #[inline] try_select(&self, token: &mut Token) -> bool154 fn try_select(&self, token: &mut Token) -> bool { 155 match self.try_recv() { 156 Ok(msg) => { 157 token.at = Some(msg); 158 true 159 } 160 Err(TryRecvError::Disconnected) => { 161 token.at = None; 162 true 163 } 164 Err(TryRecvError::Empty) => false, 165 } 166 } 167 168 #[inline] deadline(&self) -> Option<Instant>169 fn deadline(&self) -> Option<Instant> { 170 // We use relaxed ordering because this is just an optional optimistic check. 171 if self.received.load(Ordering::Relaxed) { 172 None 173 } else { 174 Some(self.delivery_time) 175 } 176 } 177 178 #[inline] register(&self, _oper: Operation, _cx: &Context) -> bool179 fn register(&self, _oper: Operation, _cx: &Context) -> bool { 180 self.is_ready() 181 } 182 183 #[inline] unregister(&self, _oper: Operation)184 fn unregister(&self, _oper: Operation) {} 185 186 #[inline] accept(&self, token: &mut Token, _cx: &Context) -> bool187 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 188 self.try_select(token) 189 } 190 191 #[inline] is_ready(&self) -> bool192 fn is_ready(&self) -> bool { 193 !self.is_empty() 194 } 195 196 #[inline] watch(&self, _oper: Operation, _cx: &Context) -> bool197 fn watch(&self, _oper: Operation, _cx: &Context) -> bool { 198 self.is_ready() 199 } 200 201 #[inline] unwatch(&self, _oper: Operation)202 fn unwatch(&self, _oper: Operation) {} 203 } 204