1 //! Channel that delivers messages periodically. 2 //! 3 //! Messages cannot be sent into this kind of channel; they are materialized on demand. 4 5 use std::thread; 6 use std::time::{Duration, Instant}; 7 8 use crossbeam_utils::atomic::AtomicCell; 9 10 use crate::context::Context; 11 use crate::err::{RecvTimeoutError, TryRecvError}; 12 use crate::select::{Operation, SelectHandle, Token}; 13 14 /// Result of a receive operation. 15 pub(crate) type TickToken = Option<Instant>; 16 17 /// Channel that delivers messages periodically. 18 pub(crate) struct Channel { 19 /// The instant at which the next message will be delivered. 20 delivery_time: AtomicCell<Instant>, 21 22 /// The time interval in which messages get delivered. 23 duration: Duration, 24 } 25 26 impl Channel { 27 /// Creates a channel that delivers messages periodically. 28 #[inline] new(dur: Duration) -> Self29 pub(crate) fn new(dur: Duration) -> Self { 30 Channel { 31 delivery_time: AtomicCell::new(Instant::now() + dur), 32 duration: dur, 33 } 34 } 35 36 /// Attempts to receive a message without blocking. 37 #[inline] try_recv(&self) -> Result<Instant, TryRecvError>38 pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { 39 loop { 40 let now = Instant::now(); 41 let delivery_time = self.delivery_time.load(); 42 43 if now < delivery_time { 44 return Err(TryRecvError::Empty); 45 } 46 47 if self 48 .delivery_time 49 .compare_exchange(delivery_time, now + self.duration) 50 .is_ok() 51 { 52 return Ok(delivery_time); 53 } 54 } 55 } 56 57 /// Receives a message from the channel. 58 #[inline] recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError>59 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { 60 loop { 61 let delivery_time = self.delivery_time.load(); 62 let now = Instant::now(); 63 64 if let Some(d) = deadline { 65 if d < delivery_time { 66 if now < d { 67 thread::sleep(d - now); 68 } 69 return Err(RecvTimeoutError::Timeout); 70 } 71 } 72 73 if self 74 .delivery_time 75 .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) 76 .is_ok() 77 { 78 if now < delivery_time { 79 thread::sleep(delivery_time - now); 80 } 81 return Ok(delivery_time); 82 } 83 } 84 } 85 86 /// Reads a message from the channel. 87 #[inline] read(&self, token: &mut Token) -> Result<Instant, ()>88 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { 89 token.tick.ok_or(()) 90 } 91 92 /// Returns `true` if the channel is empty. 93 #[inline] is_empty(&self) -> bool94 pub(crate) fn is_empty(&self) -> bool { 95 Instant::now() < self.delivery_time.load() 96 } 97 98 /// Returns `true` if the channel is full. 99 #[inline] is_full(&self) -> bool100 pub(crate) fn is_full(&self) -> bool { 101 !self.is_empty() 102 } 103 104 /// Returns the number of messages in the channel. 105 #[inline] len(&self) -> usize106 pub(crate) fn len(&self) -> usize { 107 if self.is_empty() { 108 0 109 } else { 110 1 111 } 112 } 113 114 /// Returns the capacity of the channel. 115 #[allow(clippy::unnecessary_wraps)] // This is intentional. 116 #[inline] capacity(&self) -> Option<usize>117 pub(crate) fn capacity(&self) -> Option<usize> { 118 Some(1) 119 } 120 } 121 122 impl SelectHandle for Channel { 123 #[inline] try_select(&self, token: &mut Token) -> bool124 fn try_select(&self, token: &mut Token) -> bool { 125 match self.try_recv() { 126 Ok(msg) => { 127 token.tick = Some(msg); 128 true 129 } 130 Err(TryRecvError::Disconnected) => { 131 token.tick = None; 132 true 133 } 134 Err(TryRecvError::Empty) => false, 135 } 136 } 137 138 #[inline] deadline(&self) -> Option<Instant>139 fn deadline(&self) -> Option<Instant> { 140 Some(self.delivery_time.load()) 141 } 142 143 #[inline] register(&self, _oper: Operation, _cx: &Context) -> bool144 fn register(&self, _oper: Operation, _cx: &Context) -> bool { 145 self.is_ready() 146 } 147 148 #[inline] unregister(&self, _oper: Operation)149 fn unregister(&self, _oper: Operation) {} 150 151 #[inline] accept(&self, token: &mut Token, _cx: &Context) -> bool152 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 153 self.try_select(token) 154 } 155 156 #[inline] is_ready(&self) -> bool157 fn is_ready(&self) -> bool { 158 !self.is_empty() 159 } 160 161 #[inline] watch(&self, _oper: Operation, _cx: &Context) -> bool162 fn watch(&self, _oper: Operation, _cx: &Context) -> bool { 163 self.is_ready() 164 } 165 166 #[inline] unwatch(&self, _oper: Operation)167 fn unwatch(&self, _oper: Operation) {} 168 } 169