1 #![allow(clippy::unit_arg)]
2 
3 use crate::signal::os::{OsExtraData, OsStorage};
4 
5 use crate::sync::watch;
6 
7 use once_cell::sync::Lazy;
8 use std::ops;
9 use std::pin::Pin;
10 use std::sync::atomic::{AtomicBool, Ordering};
11 
12 pub(crate) type EventId = usize;
13 
14 /// State for a specific event, whether a notification is pending delivery,
15 /// and what listeners are registered.
16 #[derive(Debug)]
17 pub(crate) struct EventInfo {
18     pending: AtomicBool,
19     tx: watch::Sender<()>,
20 }
21 
22 impl Default for EventInfo {
default() -> Self23     fn default() -> Self {
24         let (tx, _rx) = watch::channel(());
25 
26         Self {
27             pending: AtomicBool::new(false),
28             tx,
29         }
30     }
31 }
32 
33 /// An interface for retrieving the `EventInfo` for a particular eventId.
34 pub(crate) trait Storage {
35     /// Gets the `EventInfo` for `id` if it exists.
event_info(&self, id: EventId) -> Option<&EventInfo>36     fn event_info(&self, id: EventId) -> Option<&EventInfo>;
37 
38     /// Invokes `f` once for each defined `EventInfo` in this storage.
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo)39     fn for_each<'a, F>(&'a self, f: F)
40     where
41         F: FnMut(&'a EventInfo);
42 }
43 
44 impl Storage for Vec<EventInfo> {
event_info(&self, id: EventId) -> Option<&EventInfo>45     fn event_info(&self, id: EventId) -> Option<&EventInfo> {
46         self.get(id)
47     }
48 
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo),49     fn for_each<'a, F>(&'a self, f: F)
50     where
51         F: FnMut(&'a EventInfo),
52     {
53         self.iter().for_each(f)
54     }
55 }
56 
57 /// An interface for initializing a type. Useful for situations where we cannot
58 /// inject a configured instance in the constructor of another type.
59 pub(crate) trait Init {
init() -> Self60     fn init() -> Self;
61 }
62 
63 /// Manages and distributes event notifications to any registered listeners.
64 ///
65 /// Generic over the underlying storage to allow for domain specific
66 /// optimizations (e.g. eventIds may or may not be contiguous).
67 #[derive(Debug)]
68 pub(crate) struct Registry<S> {
69     storage: S,
70 }
71 
72 impl<S> Registry<S> {
new(storage: S) -> Self73     fn new(storage: S) -> Self {
74         Self { storage }
75     }
76 }
77 
78 impl<S: Storage> Registry<S> {
79     /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId) -> watch::Receiver<()>80     fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
81         self.storage
82             .event_info(event_id)
83             .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
84             .tx
85             .subscribe()
86     }
87 
88     /// Marks `event_id` as having been delivered, without broadcasting it to
89     /// any listeners.
record_event(&self, event_id: EventId)90     fn record_event(&self, event_id: EventId) {
91         if let Some(event_info) = self.storage.event_info(event_id) {
92             event_info.pending.store(true, Ordering::SeqCst)
93         }
94     }
95 
96     /// Broadcasts all previously recorded events to their respective listeners.
97     ///
98     /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool99     fn broadcast(&self) -> bool {
100         let mut did_notify = false;
101         self.storage.for_each(|event_info| {
102             // Any signal of this kind arrived since we checked last?
103             if !event_info.pending.swap(false, Ordering::SeqCst) {
104                 return;
105             }
106 
107             // Ignore errors if there are no listeners
108             if event_info.tx.send(()).is_ok() {
109                 did_notify = true;
110             }
111         });
112 
113         did_notify
114     }
115 }
116 
117 pub(crate) struct Globals {
118     extra: OsExtraData,
119     registry: Registry<OsStorage>,
120 }
121 
122 impl ops::Deref for Globals {
123     type Target = OsExtraData;
124 
deref(&self) -> &Self::Target125     fn deref(&self) -> &Self::Target {
126         &self.extra
127     }
128 }
129 
130 impl Globals {
131     /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId) -> watch::Receiver<()>132     pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
133         self.registry.register_listener(event_id)
134     }
135 
136     /// Marks `event_id` as having been delivered, without broadcasting it to
137     /// any listeners.
record_event(&self, event_id: EventId)138     pub(crate) fn record_event(&self, event_id: EventId) {
139         self.registry.record_event(event_id);
140     }
141 
142     /// Broadcasts all previously recorded events to their respective listeners.
143     ///
144     /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool145     pub(crate) fn broadcast(&self) -> bool {
146         self.registry.broadcast()
147     }
148 
149     #[cfg(unix)]
storage(&self) -> &OsStorage150     pub(crate) fn storage(&self) -> &OsStorage {
151         &self.registry.storage
152     }
153 }
154 
globals() -> Pin<&'static Globals> where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init,155 pub(crate) fn globals() -> Pin<&'static Globals>
156 where
157     OsExtraData: 'static + Send + Sync + Init,
158     OsStorage: 'static + Send + Sync + Init,
159 {
160     static GLOBALS: Lazy<Pin<Box<Globals>>> = Lazy::new(|| {
161         Box::pin(Globals {
162             extra: OsExtraData::init(),
163             registry: Registry::new(OsStorage::init()),
164         })
165     });
166 
167     GLOBALS.as_ref()
168 }
169 
170 #[cfg(all(test, not(loom)))]
171 mod tests {
172     use super::*;
173     use crate::runtime::{self, Runtime};
174     use crate::sync::{oneshot, watch};
175 
176     use futures::future;
177 
178     #[test]
smoke()179     fn smoke() {
180         let rt = rt();
181         rt.block_on(async move {
182             let registry = Registry::new(vec![
183                 EventInfo::default(),
184                 EventInfo::default(),
185                 EventInfo::default(),
186             ]);
187 
188             let first = registry.register_listener(0);
189             let second = registry.register_listener(1);
190             let third = registry.register_listener(2);
191 
192             let (fire, wait) = oneshot::channel();
193 
194             crate::spawn(async {
195                 wait.await.expect("wait failed");
196 
197                 // Record some events which should get coalesced
198                 registry.record_event(0);
199                 registry.record_event(0);
200                 registry.record_event(1);
201                 registry.record_event(1);
202                 registry.broadcast();
203 
204                 // Yield so the previous broadcast can get received
205                 crate::time::sleep(std::time::Duration::from_millis(10)).await;
206 
207                 // Send subsequent signal
208                 registry.record_event(0);
209                 registry.broadcast();
210 
211                 drop(registry);
212             });
213 
214             let _ = fire.send(());
215             let all = future::join3(collect(first), collect(second), collect(third));
216 
217             let (first_results, second_results, third_results) = all.await;
218             assert_eq!(2, first_results.len());
219             assert_eq!(1, second_results.len());
220             assert_eq!(0, third_results.len());
221         });
222     }
223 
224     #[test]
225     #[should_panic = "invalid event_id: 1"]
register_panics_on_invalid_input()226     fn register_panics_on_invalid_input() {
227         let registry = Registry::new(vec![EventInfo::default()]);
228 
229         registry.register_listener(1);
230     }
231 
232     #[test]
record_invalid_event_does_nothing()233     fn record_invalid_event_does_nothing() {
234         let registry = Registry::new(vec![EventInfo::default()]);
235         registry.record_event(42);
236     }
237 
238     #[test]
broadcast_returns_if_at_least_one_event_fired()239     fn broadcast_returns_if_at_least_one_event_fired() {
240         let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]);
241 
242         registry.record_event(0);
243         assert!(!registry.broadcast());
244 
245         let first = registry.register_listener(0);
246         let second = registry.register_listener(1);
247 
248         registry.record_event(0);
249         assert!(registry.broadcast());
250 
251         drop(first);
252         registry.record_event(0);
253         assert!(!registry.broadcast());
254 
255         drop(second);
256     }
257 
rt() -> Runtime258     fn rt() -> Runtime {
259         runtime::Builder::new_current_thread()
260             .enable_time()
261             .build()
262             .unwrap()
263     }
264 
collect(mut rx: watch::Receiver<()>) -> Vec<()>265     async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> {
266         let mut ret = vec![];
267 
268         while let Ok(v) = rx.changed().await {
269             ret.push(v);
270         }
271 
272         ret
273     }
274 }
275