1 #![allow(clippy::unit_arg)]
2 
3 use crate::signal::os::{OsExtraData, OsStorage};
4 
5 use crate::sync::mpsc::Sender;
6 
7 use lazy_static::lazy_static;
8 use std::ops;
9 use std::pin::Pin;
10 use std::sync::atomic::{AtomicBool, Ordering};
11 use std::sync::Mutex;
12 
13 pub(crate) type EventId = usize;
14 
15 /// State for a specific event, whether a notification is pending delivery,
16 /// and what listeners are registered.
17 #[derive(Default, Debug)]
18 pub(crate) struct EventInfo {
19     pending: AtomicBool,
20     recipients: Mutex<Vec<Sender<()>>>,
21 }
22 
23 /// An interface for retrieving the `EventInfo` for a particular eventId.
24 pub(crate) trait Storage {
25     /// Gets the `EventInfo` for `id` if it exists.
event_info(&self, id: EventId) -> Option<&EventInfo>26     fn event_info(&self, id: EventId) -> Option<&EventInfo>;
27 
28     /// Invokes `f` once for each defined `EventInfo` in this storage.
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo)29     fn for_each<'a, F>(&'a self, f: F)
30     where
31         F: FnMut(&'a EventInfo);
32 }
33 
34 impl Storage for Vec<EventInfo> {
event_info(&self, id: EventId) -> Option<&EventInfo>35     fn event_info(&self, id: EventId) -> Option<&EventInfo> {
36         self.get(id)
37     }
38 
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo),39     fn for_each<'a, F>(&'a self, f: F)
40     where
41         F: FnMut(&'a EventInfo),
42     {
43         self.iter().for_each(f)
44     }
45 }
46 
47 /// An interface for initializing a type. Useful for situations where we cannot
48 /// inject a configured instance in the constructor of another type.
49 pub(crate) trait Init {
init() -> Self50     fn init() -> Self;
51 }
52 
53 /// Manages and distributes event notifications to any registered listeners.
54 ///
55 /// Generic over the underlying storage to allow for domain specific
56 /// optimizations (e.g. eventIds may or may not be contiguous).
57 #[derive(Debug)]
58 pub(crate) struct Registry<S> {
59     storage: S,
60 }
61 
62 impl<S> Registry<S> {
new(storage: S) -> Self63     fn new(storage: S) -> Self {
64         Self { storage }
65     }
66 }
67 
68 impl<S: Storage> Registry<S> {
69     /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId, listener: Sender<()>)70     fn register_listener(&self, event_id: EventId, listener: Sender<()>) {
71         self.storage
72             .event_info(event_id)
73             .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
74             .recipients
75             .lock()
76             .unwrap()
77             .push(listener);
78     }
79 
80     /// Marks `event_id` as having been delivered, without broadcasting it to
81     /// any listeners.
record_event(&self, event_id: EventId)82     fn record_event(&self, event_id: EventId) {
83         if let Some(event_info) = self.storage.event_info(event_id) {
84             event_info.pending.store(true, Ordering::SeqCst)
85         }
86     }
87 
88     /// Broadcasts all previously recorded events to their respective listeners.
89     ///
90     /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool91     fn broadcast(&self) -> bool {
92         use crate::sync::mpsc::error::TrySendError;
93 
94         let mut did_notify = false;
95         self.storage.for_each(|event_info| {
96             // Any signal of this kind arrived since we checked last?
97             if !event_info.pending.swap(false, Ordering::SeqCst) {
98                 return;
99             }
100 
101             let mut recipients = event_info.recipients.lock().unwrap();
102 
103             // Notify all waiters on this signal that the signal has been
104             // received. If we can't push a message into the queue then we don't
105             // worry about it as everything is coalesced anyway. If the channel
106             // has gone away then we can remove that slot.
107             for i in (0..recipients.len()).rev() {
108                 match recipients[i].try_send(()) {
109                     Ok(()) => did_notify = true,
110                     Err(TrySendError::Closed(..)) => {
111                         recipients.swap_remove(i);
112                     }
113 
114                     // Channel is full, ignore the error since the
115                     // receiver has already been woken up
116                     Err(_) => {}
117                 }
118             }
119         });
120 
121         did_notify
122     }
123 }
124 
125 pub(crate) struct Globals {
126     extra: OsExtraData,
127     registry: Registry<OsStorage>,
128 }
129 
130 impl ops::Deref for Globals {
131     type Target = OsExtraData;
132 
deref(&self) -> &Self::Target133     fn deref(&self) -> &Self::Target {
134         &self.extra
135     }
136 }
137 
138 impl Globals {
139     /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId, listener: Sender<()>)140     pub(crate) fn register_listener(&self, event_id: EventId, listener: Sender<()>) {
141         self.registry.register_listener(event_id, listener);
142     }
143 
144     /// Marks `event_id` as having been delivered, without broadcasting it to
145     /// any listeners.
record_event(&self, event_id: EventId)146     pub(crate) fn record_event(&self, event_id: EventId) {
147         self.registry.record_event(event_id);
148     }
149 
150     /// Broadcasts all previously recorded events to their respective listeners.
151     ///
152     /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool153     pub(crate) fn broadcast(&self) -> bool {
154         self.registry.broadcast()
155     }
156 
157     #[cfg(unix)]
storage(&self) -> &OsStorage158     pub(crate) fn storage(&self) -> &OsStorage {
159         &self.registry.storage
160     }
161 }
162 
globals() -> Pin<&'static Globals> where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init,163 pub(crate) fn globals() -> Pin<&'static Globals>
164 where
165     OsExtraData: 'static + Send + Sync + Init,
166     OsStorage: 'static + Send + Sync + Init,
167 {
168     lazy_static! {
169         static ref GLOBALS: Pin<Box<Globals>> = Box::pin(Globals {
170             extra: OsExtraData::init(),
171             registry: Registry::new(OsStorage::init()),
172         });
173     }
174 
175     GLOBALS.as_ref()
176 }
177 
178 #[cfg(all(test, not(loom)))]
179 mod tests {
180     use super::*;
181     use crate::runtime::{self, Runtime};
182     use crate::sync::{mpsc, oneshot};
183 
184     use futures::future;
185 
186     #[test]
smoke()187     fn smoke() {
188         let mut rt = rt();
189         rt.block_on(async move {
190             let registry = Registry::new(vec![
191                 EventInfo::default(),
192                 EventInfo::default(),
193                 EventInfo::default(),
194             ]);
195 
196             let (first_tx, first_rx) = mpsc::channel(3);
197             let (second_tx, second_rx) = mpsc::channel(3);
198             let (third_tx, third_rx) = mpsc::channel(3);
199 
200             registry.register_listener(0, first_tx);
201             registry.register_listener(1, second_tx);
202             registry.register_listener(2, third_tx);
203 
204             let (fire, wait) = oneshot::channel();
205 
206             crate::spawn(async {
207                 wait.await.expect("wait failed");
208 
209                 // Record some events which should get coalesced
210                 registry.record_event(0);
211                 registry.record_event(0);
212                 registry.record_event(1);
213                 registry.record_event(1);
214                 registry.broadcast();
215 
216                 // Send subsequent signal
217                 registry.record_event(0);
218                 registry.broadcast();
219 
220                 drop(registry);
221             });
222 
223             let _ = fire.send(());
224             let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx));
225 
226             let (first_results, second_results, third_results) = all.await;
227             assert_eq!(2, first_results.len());
228             assert_eq!(1, second_results.len());
229             assert_eq!(0, third_results.len());
230         });
231     }
232 
233     #[test]
234     #[should_panic = "invalid event_id: 1"]
register_panics_on_invalid_input()235     fn register_panics_on_invalid_input() {
236         let registry = Registry::new(vec![EventInfo::default()]);
237 
238         let (tx, _) = mpsc::channel(1);
239         registry.register_listener(1, tx);
240     }
241 
242     #[test]
record_invalid_event_does_nothing()243     fn record_invalid_event_does_nothing() {
244         let registry = Registry::new(vec![EventInfo::default()]);
245         registry.record_event(42);
246     }
247 
248     #[test]
broadcast_cleans_up_disconnected_listeners()249     fn broadcast_cleans_up_disconnected_listeners() {
250         let mut rt = Runtime::new().unwrap();
251 
252         rt.block_on(async {
253             let registry = Registry::new(vec![EventInfo::default()]);
254 
255             let (first_tx, first_rx) = mpsc::channel(1);
256             let (second_tx, second_rx) = mpsc::channel(1);
257             let (third_tx, third_rx) = mpsc::channel(1);
258 
259             registry.register_listener(0, first_tx);
260             registry.register_listener(0, second_tx);
261             registry.register_listener(0, third_tx);
262 
263             drop(first_rx);
264             drop(second_rx);
265 
266             let (fire, wait) = oneshot::channel();
267 
268             crate::spawn(async {
269                 wait.await.expect("wait failed");
270 
271                 registry.record_event(0);
272                 registry.broadcast();
273 
274                 assert_eq!(1, registry.storage[0].recipients.lock().unwrap().len());
275                 drop(registry);
276             });
277 
278             let _ = fire.send(());
279             let results = collect(third_rx).await;
280 
281             assert_eq!(1, results.len());
282         });
283     }
284 
285     #[test]
broadcast_returns_if_at_least_one_event_fired()286     fn broadcast_returns_if_at_least_one_event_fired() {
287         let registry = Registry::new(vec![EventInfo::default()]);
288 
289         registry.record_event(0);
290         assert_eq!(false, registry.broadcast());
291 
292         let (first_tx, first_rx) = mpsc::channel(1);
293         let (second_tx, second_rx) = mpsc::channel(1);
294 
295         registry.register_listener(0, first_tx);
296         registry.register_listener(0, second_tx);
297 
298         registry.record_event(0);
299         assert_eq!(true, registry.broadcast());
300 
301         drop(first_rx);
302         registry.record_event(0);
303         assert_eq!(false, registry.broadcast());
304 
305         drop(second_rx);
306     }
307 
rt() -> Runtime308     fn rt() -> Runtime {
309         runtime::Builder::new().basic_scheduler().build().unwrap()
310     }
311 
collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()>312     async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> {
313         let mut ret = vec![];
314 
315         while let Some(v) = rx.recv().await {
316             ret.push(v);
317         }
318 
319         ret
320     }
321 }
322