1 #![allow(clippy::unit_arg)]
2
3 use crate::signal::os::{OsExtraData, OsStorage};
4
5 use crate::sync::mpsc::Sender;
6
7 use lazy_static::lazy_static;
8 use std::ops;
9 use std::pin::Pin;
10 use std::sync::atomic::{AtomicBool, Ordering};
11 use std::sync::Mutex;
12
13 pub(crate) type EventId = usize;
14
15 /// State for a specific event, whether a notification is pending delivery,
16 /// and what listeners are registered.
17 #[derive(Default, Debug)]
18 pub(crate) struct EventInfo {
19 pending: AtomicBool,
20 recipients: Mutex<Vec<Sender<()>>>,
21 }
22
23 /// An interface for retrieving the `EventInfo` for a particular eventId.
24 pub(crate) trait Storage {
25 /// Gets the `EventInfo` for `id` if it exists.
event_info(&self, id: EventId) -> Option<&EventInfo>26 fn event_info(&self, id: EventId) -> Option<&EventInfo>;
27
28 /// Invokes `f` once for each defined `EventInfo` in this storage.
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo)29 fn for_each<'a, F>(&'a self, f: F)
30 where
31 F: FnMut(&'a EventInfo);
32 }
33
34 impl Storage for Vec<EventInfo> {
event_info(&self, id: EventId) -> Option<&EventInfo>35 fn event_info(&self, id: EventId) -> Option<&EventInfo> {
36 self.get(id)
37 }
38
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo),39 fn for_each<'a, F>(&'a self, f: F)
40 where
41 F: FnMut(&'a EventInfo),
42 {
43 self.iter().for_each(f)
44 }
45 }
46
/// Construction hook for types that have to build themselves.
///
/// Handy when an already-configured instance cannot be handed to the
/// constructor of the type that ends up owning the value.
pub(crate) trait Init {
    /// Produces a fresh instance of the implementing type.
    fn init() -> Self;
}
52
/// Keeps track of recorded events and fans them out to registered listeners.
///
/// The backing storage is a type parameter so that each platform can pick a
/// representation suited to its ids (e.g. eventIds may or may not be
/// contiguous).
#[derive(Debug)]
pub(crate) struct Registry<S> {
    /// Where the per-event `EventInfo` entries live.
    storage: S,
}
61
62 impl<S> Registry<S> {
new(storage: S) -> Self63 fn new(storage: S) -> Self {
64 Self { storage }
65 }
66 }
67
68 impl<S: Storage> Registry<S> {
69 /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId, listener: Sender<()>)70 fn register_listener(&self, event_id: EventId, listener: Sender<()>) {
71 self.storage
72 .event_info(event_id)
73 .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
74 .recipients
75 .lock()
76 .unwrap()
77 .push(listener);
78 }
79
80 /// Marks `event_id` as having been delivered, without broadcasting it to
81 /// any listeners.
record_event(&self, event_id: EventId)82 fn record_event(&self, event_id: EventId) {
83 if let Some(event_info) = self.storage.event_info(event_id) {
84 event_info.pending.store(true, Ordering::SeqCst)
85 }
86 }
87
88 /// Broadcasts all previously recorded events to their respective listeners.
89 ///
90 /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool91 fn broadcast(&self) -> bool {
92 use crate::sync::mpsc::error::TrySendError;
93
94 let mut did_notify = false;
95 self.storage.for_each(|event_info| {
96 // Any signal of this kind arrived since we checked last?
97 if !event_info.pending.swap(false, Ordering::SeqCst) {
98 return;
99 }
100
101 let mut recipients = event_info.recipients.lock().unwrap();
102
103 // Notify all waiters on this signal that the signal has been
104 // received. If we can't push a message into the queue then we don't
105 // worry about it as everything is coalesced anyway. If the channel
106 // has gone away then we can remove that slot.
107 for i in (0..recipients.len()).rev() {
108 match recipients[i].try_send(()) {
109 Ok(()) => did_notify = true,
110 Err(TrySendError::Closed(..)) => {
111 recipients.swap_remove(i);
112 }
113
114 // Channel is full, ignore the error since the
115 // receiver has already been woken up
116 Err(_) => {}
117 }
118 }
119 });
120
121 did_notify
122 }
123 }
124
125 pub(crate) struct Globals {
126 extra: OsExtraData,
127 registry: Registry<OsStorage>,
128 }
129
130 impl ops::Deref for Globals {
131 type Target = OsExtraData;
132
deref(&self) -> &Self::Target133 fn deref(&self) -> &Self::Target {
134 &self.extra
135 }
136 }
137
138 impl Globals {
139 /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId, listener: Sender<()>)140 pub(crate) fn register_listener(&self, event_id: EventId, listener: Sender<()>) {
141 self.registry.register_listener(event_id, listener);
142 }
143
144 /// Marks `event_id` as having been delivered, without broadcasting it to
145 /// any listeners.
record_event(&self, event_id: EventId)146 pub(crate) fn record_event(&self, event_id: EventId) {
147 self.registry.record_event(event_id);
148 }
149
150 /// Broadcasts all previously recorded events to their respective listeners.
151 ///
152 /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool153 pub(crate) fn broadcast(&self) -> bool {
154 self.registry.broadcast()
155 }
156
157 #[cfg(unix)]
storage(&self) -> &OsStorage158 pub(crate) fn storage(&self) -> &OsStorage {
159 &self.registry.storage
160 }
161 }
162
globals() -> Pin<&'static Globals> where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init,163 pub(crate) fn globals() -> Pin<&'static Globals>
164 where
165 OsExtraData: 'static + Send + Sync + Init,
166 OsStorage: 'static + Send + Sync + Init,
167 {
168 lazy_static! {
169 static ref GLOBALS: Pin<Box<Globals>> = Box::pin(Globals {
170 extra: OsExtraData::init(),
171 registry: Registry::new(OsStorage::init()),
172 });
173 }
174
175 GLOBALS.as_ref()
176 }
177
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::{self, Runtime};
    use crate::sync::{mpsc, oneshot};

    use futures::future;

    /// Events recorded several times before a broadcast are coalesced into a
    /// single notification, and listeners of unrecorded events get nothing.
    #[test]
    fn smoke() {
        let mut rt = rt();
        rt.block_on(async move {
            let registry = Registry::new(vec![
                EventInfo::default(),
                EventInfo::default(),
                EventInfo::default(),
            ]);

            let (first_tx, first_rx) = mpsc::channel(3);
            let (second_tx, second_rx) = mpsc::channel(3);
            let (third_tx, third_rx) = mpsc::channel(3);

            registry.register_listener(0, first_tx);
            registry.register_listener(1, second_tx);
            registry.register_listener(2, third_tx);

            let (fire, wait) = oneshot::channel();

            crate::spawn(async {
                wait.await.expect("wait failed");

                // Record some events which should get coalesced
                registry.record_event(0);
                registry.record_event(0);
                registry.record_event(1);
                registry.record_event(1);
                registry.broadcast();

                // Send subsequent signal
                registry.record_event(0);
                registry.broadcast();

                // Dropping the registry drops the senders, which lets the
                // `collect` calls below terminate.
                drop(registry);
            });

            let _ = fire.send(());
            let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx));

            let (first_results, second_results, third_results) = all.await;
            assert_eq!(2, first_results.len());
            assert_eq!(1, second_results.len());
            assert_eq!(0, third_results.len());
        });
    }

    /// Registering a listener for an id the storage does not know panics.
    #[test]
    #[should_panic = "invalid event_id: 1"]
    fn register_panics_on_invalid_input() {
        let registry = Registry::new(vec![EventInfo::default()]);

        let (tx, _) = mpsc::channel(1);
        registry.register_listener(1, tx);
    }

    /// Recording an id the storage does not know is a silent no-op.
    #[test]
    fn record_invalid_event_does_nothing() {
        let registry = Registry::new(vec![EventInfo::default()]);
        registry.record_event(42);
    }

    /// Listeners whose receiving half was dropped are removed by `broadcast`.
    #[test]
    fn broadcast_cleans_up_disconnected_listeners() {
        let mut rt = Runtime::new().unwrap();

        rt.block_on(async {
            let registry = Registry::new(vec![EventInfo::default()]);

            let (first_tx, first_rx) = mpsc::channel(1);
            let (second_tx, second_rx) = mpsc::channel(1);
            let (third_tx, third_rx) = mpsc::channel(1);

            registry.register_listener(0, first_tx);
            registry.register_listener(0, second_tx);
            registry.register_listener(0, third_tx);

            drop(first_rx);
            drop(second_rx);

            let (fire, wait) = oneshot::channel();

            crate::spawn(async {
                wait.await.expect("wait failed");

                registry.record_event(0);
                registry.broadcast();

                // Only the listener with a live receiver should remain.
                assert_eq!(1, registry.storage[0].recipients.lock().unwrap().len());
                drop(registry);
            });

            let _ = fire.send(());
            let results = collect(third_rx).await;

            assert_eq!(1, results.len());
        });
    }

    /// `broadcast` reports `true` only when some listener actually accepted a
    /// notification.
    #[test]
    fn broadcast_returns_if_at_least_one_event_fired() {
        let registry = Registry::new(vec![EventInfo::default()]);

        // No listeners registered yet: nothing can be delivered.
        registry.record_event(0);
        assert!(!registry.broadcast());

        let (first_tx, first_rx) = mpsc::channel(1);
        let (second_tx, second_rx) = mpsc::channel(1);

        registry.register_listener(0, first_tx);
        registry.register_listener(0, second_tx);

        registry.record_event(0);
        assert!(registry.broadcast());

        // The first receiver is gone and the second channel still holds the
        // previous, unread notification, so nothing new is delivered.
        drop(first_rx);
        registry.record_event(0);
        assert!(!registry.broadcast());

        drop(second_rx);
    }

    /// Builds a runtime using the basic scheduler for the tests above.
    fn rt() -> Runtime {
        runtime::Builder::new().basic_scheduler().build().unwrap()
    }

    /// Drains `rx` until every sender is gone and returns what was received.
    async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> {
        let mut ret = vec![];

        while let Some(v) = rx.recv().await {
            ret.push(v);
        }

        ret
    }
}
322