1 use std::{collections::vec_deque::VecDeque, io, time::Duration};
2 
3 #[cfg(unix)]
4 use super::source::unix::UnixInternalEventSource;
5 #[cfg(windows)]
6 use super::source::windows::WindowsEventSource;
7 #[cfg(feature = "event-stream")]
8 use super::sys::Waker;
9 use super::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent, Result};
/// Can be used to read `InternalEvent`s.
///
/// Events are taken from an optional `EventSource` and buffered so that callers can
/// poll for, or read, only the events accepted by a `Filter`.
pub(crate) struct InternalEventReader {
    /// Queue of buffered events; `poll` scans it for a match and `read` pops from its front.
    events: VecDeque<InternalEvent>,
    /// Source the events are read from; when it is `None`, `poll` fails as soon as it
    /// needs to consult the source.
    source: Option<Box<dyn EventSource>>,
    /// Events that `poll` read from the source but the filter rejected; they are drained
    /// into `events` once `poll` finds a matching event or its timeout elapses.
    skipped_events: Vec<InternalEvent>,
}
16 
17 impl Default for InternalEventReader {
default() -> Self18     fn default() -> Self {
19         #[cfg(windows)]
20         let source = WindowsEventSource::new();
21         #[cfg(unix)]
22         let source = UnixInternalEventSource::new();
23 
24         let source = source.ok().map(|x| Box::new(x) as Box<dyn EventSource>);
25 
26         InternalEventReader {
27             source,
28             events: VecDeque::with_capacity(32),
29             skipped_events: Vec::with_capacity(32),
30         }
31     }
32 }
33 
34 impl InternalEventReader {
35     /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
36     #[cfg(feature = "event-stream")]
waker(&self) -> Waker37     pub(crate) fn waker(&self) -> Waker {
38         self.source.as_ref().expect("reader source not set").waker()
39     }
40 
poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> Result<bool> where F: Filter,41     pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> Result<bool>
42     where
43         F: Filter,
44     {
45         for event in &self.events {
46             if filter.eval(event) {
47                 return Ok(true);
48             }
49         }
50 
51         let event_source = match self.source.as_mut() {
52             Some(source) => source,
53             None => {
54                 return Err(std::io::Error::new(
55                     std::io::ErrorKind::Other,
56                     "Failed to initialize input reader",
57                 ))
58             }
59         };
60 
61         let poll_timeout = PollTimeout::new(timeout);
62 
63         loop {
64             let maybe_event = match event_source.try_read(poll_timeout.leftover()) {
65                 Ok(None) => None,
66                 Ok(Some(event)) => {
67                     if filter.eval(&event) {
68                         Some(event)
69                     } else {
70                         self.skipped_events.push(event);
71                         None
72                     }
73                 }
74                 Err(e) => {
75                     if e.kind() == io::ErrorKind::Interrupted {
76                         return Ok(false);
77                     }
78 
79                     return Err(e);
80                 }
81             };
82 
83             if poll_timeout.elapsed() || maybe_event.is_some() {
84                 self.events.extend(self.skipped_events.drain(..));
85 
86                 if let Some(event) = maybe_event {
87                     self.events.push_front(event);
88                     return Ok(true);
89                 }
90 
91                 return Ok(false);
92             }
93         }
94     }
95 
read<F>(&mut self, filter: &F) -> Result<InternalEvent> where F: Filter,96     pub(crate) fn read<F>(&mut self, filter: &F) -> Result<InternalEvent>
97     where
98         F: Filter,
99     {
100         let mut skipped_events = VecDeque::new();
101 
102         loop {
103             while let Some(event) = self.events.pop_front() {
104                 if filter.eval(&event) {
105                     while let Some(event) = skipped_events.pop_front() {
106                         self.events.push_back(event);
107                     }
108 
109                     return Ok(event);
110                 } else {
111                     // We can not directly write events back to `self.events`.
112                     // If we did, we would put our self's into an endless loop
113                     // that would enqueue -> dequeue -> enqueue etc.
114                     // This happens because `poll` in this function will always return true if there are events in it's.
115                     // And because we just put the non-fulfilling event there this is going to be the case.
116                     // Instead we can store them into the temporary buffer,
117                     // and then when the filter is fulfilled write all events back in order.
118                     skipped_events.push_back(event);
119                 }
120             }
121 
122             let _ = self.poll(None, filter)?;
123         }
124     }
125 }
126 
127 #[cfg(test)]
128 mod tests {
129     use std::io;
130     use std::{collections::VecDeque, time::Duration};
131 
132     use crate::ErrorKind;
133 
134     #[cfg(unix)]
135     use super::super::filter::CursorPositionFilter;
136     use super::{
137         super::{filter::InternalEventFilter, Event},
138         EventSource, InternalEvent, InternalEventReader,
139     };
140 
141     #[test]
test_poll_fails_without_event_source()142     fn test_poll_fails_without_event_source() {
143         let mut reader = InternalEventReader {
144             events: VecDeque::new(),
145             source: None,
146             skipped_events: Vec::with_capacity(32),
147         };
148 
149         assert!(reader.poll(None, &InternalEventFilter).is_err());
150         assert!(reader
151             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
152             .is_err());
153         assert!(reader
154             .poll(Some(Duration::from_secs(10)), &InternalEventFilter)
155             .is_err());
156     }
157 
158     #[test]
test_poll_returns_true_for_matching_event_in_queue_at_front()159     fn test_poll_returns_true_for_matching_event_in_queue_at_front() {
160         let mut reader = InternalEventReader {
161             events: vec![InternalEvent::Event(Event::Resize(10, 10))].into(),
162             source: None,
163             skipped_events: Vec::with_capacity(32),
164         };
165 
166         assert!(reader.poll(None, &InternalEventFilter).unwrap());
167     }
168 
169     #[test]
170     #[cfg(unix)]
test_poll_returns_true_for_matching_event_in_queue_at_back()171     fn test_poll_returns_true_for_matching_event_in_queue_at_back() {
172         let mut reader = InternalEventReader {
173             events: vec![
174                 InternalEvent::Event(Event::Resize(10, 10)),
175                 InternalEvent::CursorPosition(10, 20),
176             ]
177             .into(),
178             source: None,
179             skipped_events: Vec::with_capacity(32),
180         };
181 
182         assert!(reader.poll(None, &CursorPositionFilter).unwrap());
183     }
184 
185     #[test]
test_read_returns_matching_event_in_queue_at_front()186     fn test_read_returns_matching_event_in_queue_at_front() {
187         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
188 
189         let mut reader = InternalEventReader {
190             events: vec![EVENT].into(),
191             source: None,
192             skipped_events: Vec::with_capacity(32),
193         };
194 
195         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
196     }
197 
198     #[test]
199     #[cfg(unix)]
test_read_returns_matching_event_in_queue_at_back()200     fn test_read_returns_matching_event_in_queue_at_back() {
201         const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
202 
203         let mut reader = InternalEventReader {
204             events: vec![InternalEvent::Event(Event::Resize(10, 10)), CURSOR_EVENT].into(),
205             source: None,
206             skipped_events: Vec::with_capacity(32),
207         };
208 
209         assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
210     }
211 
212     #[test]
213     #[cfg(unix)]
test_read_does_not_consume_skipped_event()214     fn test_read_does_not_consume_skipped_event() {
215         const SKIPPED_EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
216         const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
217 
218         let mut reader = InternalEventReader {
219             events: vec![SKIPPED_EVENT, CURSOR_EVENT].into(),
220             source: None,
221             skipped_events: Vec::with_capacity(32),
222         };
223 
224         assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
225         assert_eq!(reader.read(&InternalEventFilter).unwrap(), SKIPPED_EVENT);
226     }
227 
228     #[test]
test_poll_timeouts_if_source_has_no_events()229     fn test_poll_timeouts_if_source_has_no_events() {
230         let source = FakeSource::default();
231 
232         let mut reader = InternalEventReader {
233             events: VecDeque::new(),
234             source: Some(Box::new(source)),
235             skipped_events: Vec::with_capacity(32),
236         };
237 
238         assert!(!reader
239             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
240             .unwrap());
241     }
242 
243     #[test]
test_poll_returns_true_if_source_has_at_least_one_event()244     fn test_poll_returns_true_if_source_has_at_least_one_event() {
245         let source = FakeSource::with_events(&[InternalEvent::Event(Event::Resize(10, 10))]);
246 
247         let mut reader = InternalEventReader {
248             events: VecDeque::new(),
249             source: Some(Box::new(source)),
250             skipped_events: Vec::with_capacity(32),
251         };
252 
253         assert!(reader.poll(None, &InternalEventFilter).unwrap());
254         assert!(reader
255             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
256             .unwrap());
257     }
258 
259     #[test]
test_reads_returns_event_if_source_has_at_least_one_event()260     fn test_reads_returns_event_if_source_has_at_least_one_event() {
261         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
262 
263         let source = FakeSource::with_events(&[EVENT]);
264 
265         let mut reader = InternalEventReader {
266             events: VecDeque::new(),
267             source: Some(Box::new(source)),
268             skipped_events: Vec::with_capacity(32),
269         };
270 
271         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
272     }
273 
274     #[test]
test_read_returns_events_if_source_has_events()275     fn test_read_returns_events_if_source_has_events() {
276         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
277 
278         let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
279 
280         let mut reader = InternalEventReader {
281             events: VecDeque::new(),
282             source: Some(Box::new(source)),
283             skipped_events: Vec::with_capacity(32),
284         };
285 
286         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
287         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
288         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
289     }
290 
291     #[test]
test_poll_returns_false_after_all_source_events_are_consumed()292     fn test_poll_returns_false_after_all_source_events_are_consumed() {
293         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
294 
295         let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
296 
297         let mut reader = InternalEventReader {
298             events: VecDeque::new(),
299             source: Some(Box::new(source)),
300             skipped_events: Vec::with_capacity(32),
301         };
302 
303         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
304         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
305         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
306         assert!(!reader
307             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
308             .unwrap());
309     }
310 
311     #[test]
test_poll_propagates_error()312     fn test_poll_propagates_error() {
313         let source = FakeSource::with_error(ErrorKind::from(io::ErrorKind::Other));
314 
315         let mut reader = InternalEventReader {
316             events: VecDeque::new(),
317             source: Some(Box::new(source)),
318             skipped_events: Vec::with_capacity(32),
319         };
320 
321         assert_eq!(
322             reader
323                 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
324                 .err()
325                 .map(|e| format!("{:?}", &e)),
326             Some(format!("{:?}", ErrorKind::from(io::ErrorKind::Other)))
327         );
328     }
329 
330     #[test]
test_read_propagates_error()331     fn test_read_propagates_error() {
332         let source = FakeSource::with_error(ErrorKind::from(io::ErrorKind::Other));
333 
334         let mut reader = InternalEventReader {
335             events: VecDeque::new(),
336             source: Some(Box::new(source)),
337             skipped_events: Vec::with_capacity(32),
338         };
339 
340         assert_eq!(
341             reader
342                 .read(&InternalEventFilter)
343                 .err()
344                 .map(|e| format!("{:?}", &e)),
345             Some(format!("{:?}", ErrorKind::from(io::ErrorKind::Other)))
346         );
347     }
348 
349     #[test]
test_poll_continues_after_error()350     fn test_poll_continues_after_error() {
351         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
352 
353         let source = FakeSource::new(&[EVENT, EVENT], ErrorKind::from(io::ErrorKind::Other));
354 
355         let mut reader = InternalEventReader {
356             events: VecDeque::new(),
357             source: Some(Box::new(source)),
358             skipped_events: Vec::with_capacity(32),
359         };
360 
361         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
362         assert!(reader.read(&InternalEventFilter).is_err());
363         assert!(reader
364             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
365             .unwrap());
366     }
367 
368     #[test]
test_read_continues_after_error()369     fn test_read_continues_after_error() {
370         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
371 
372         let source = FakeSource::new(&[EVENT, EVENT], ErrorKind::from(io::ErrorKind::Other));
373 
374         let mut reader = InternalEventReader {
375             events: VecDeque::new(),
376             source: Some(Box::new(source)),
377             skipped_events: Vec::with_capacity(32),
378         };
379 
380         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
381         assert!(reader.read(&InternalEventFilter).is_err());
382         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
383     }
384 
    /// Scripted `EventSource` used by the tests: it hands out a fixed list of events and
    /// can report one error along the way.
    #[derive(Default)]
    struct FakeSource {
        /// Events returned by `try_read`, front first.
        events: VecDeque<InternalEvent>,
        /// Error returned by `try_read` at most once; it is taken out when reported.
        error: Option<ErrorKind>,
    }
390 
391     impl FakeSource {
new(events: &[InternalEvent], error: ErrorKind) -> FakeSource392         fn new(events: &[InternalEvent], error: ErrorKind) -> FakeSource {
393             FakeSource {
394                 events: events.to_vec().into(),
395                 error: Some(error),
396             }
397         }
398 
with_events(events: &[InternalEvent]) -> FakeSource399         fn with_events(events: &[InternalEvent]) -> FakeSource {
400             FakeSource {
401                 events: events.to_vec().into(),
402                 error: None,
403             }
404         }
405 
with_error(error: ErrorKind) -> FakeSource406         fn with_error(error: ErrorKind) -> FakeSource {
407             FakeSource {
408                 events: VecDeque::new(),
409                 error: Some(error),
410             }
411         }
412     }
413 
414     impl EventSource for FakeSource {
try_read( &mut self, _timeout: Option<Duration>, ) -> Result<Option<InternalEvent>, ErrorKind>415         fn try_read(
416             &mut self,
417             _timeout: Option<Duration>,
418         ) -> Result<Option<InternalEvent>, ErrorKind> {
419             // Return error if set in case there's just one remaining event
420             if self.events.len() == 1 {
421                 if let Some(error) = self.error.take() {
422                     return Err(error);
423                 }
424             }
425 
426             // Return all events from the queue
427             if let Some(event) = self.events.pop_front() {
428                 return Ok(Some(event));
429             }
430 
431             // Return error if there're no more events
432             if let Some(error) = self.error.take() {
433                 return Err(error);
434             }
435 
436             // Timeout
437             Ok(None)
438         }
439 
440         #[cfg(feature = "event-stream")]
waker(&self) -> super::super::sys::Waker441         fn waker(&self) -> super::super::sys::Waker {
442             unimplemented!();
443         }
444     }
445 }
446