1 use std::{collections::vec_deque::VecDeque, io, time::Duration};
2 
3 use crate::ErrorKind;
4 
5 #[cfg(unix)]
6 use super::source::unix::UnixInternalEventSource;
7 #[cfg(windows)]
8 use super::source::windows::WindowsEventSource;
9 #[cfg(feature = "event-stream")]
10 use super::sys::Waker;
11 use super::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent, Result};
12 
13 /// Can be used to read `InternalEvent`s.
14 pub(crate) struct InternalEventReader {
15     events: VecDeque<InternalEvent>,
16     source: Option<Box<dyn EventSource>>,
17     skipped_events: Vec<InternalEvent>,
18 }
19 
20 impl Default for InternalEventReader {
default() -> Self21     fn default() -> Self {
22         #[cfg(windows)]
23         let source = WindowsEventSource::new();
24         #[cfg(unix)]
25         let source = UnixInternalEventSource::new();
26 
27         let source = source.ok().map(|x| Box::new(x) as Box<dyn EventSource>);
28 
29         InternalEventReader {
30             source,
31             events: VecDeque::with_capacity(32),
32             skipped_events: Vec::with_capacity(32),
33         }
34     }
35 }
36 
37 impl InternalEventReader {
38     /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
39     #[cfg(feature = "event-stream")]
waker(&self) -> Waker40     pub(crate) fn waker(&self) -> Waker {
41         self.source.as_ref().expect("reader source not set").waker()
42     }
43 
poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> Result<bool> where F: Filter,44     pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> Result<bool>
45     where
46         F: Filter,
47     {
48         for event in &self.events {
49             if filter.eval(&event) {
50                 return Ok(true);
51             }
52         }
53 
54         let event_source = match self.source.as_mut() {
55             Some(source) => source,
56             None => {
57                 return Err(std::io::Error::new(
58                     std::io::ErrorKind::Other,
59                     "Failed to initialize input reader",
60                 )
61                 .into())
62             }
63         };
64 
65         let poll_timeout = PollTimeout::new(timeout);
66 
67         loop {
68             let maybe_event = match event_source.try_read(timeout) {
69                 Ok(None) => None,
70                 Ok(Some(event)) => {
71                     if filter.eval(&event) {
72                         Some(event)
73                     } else {
74                         self.skipped_events.push(event);
75                         None
76                     }
77                 }
78                 Err(ErrorKind::IoError(e)) => {
79                     if e.kind() == io::ErrorKind::Interrupted {
80                         return Ok(false);
81                     }
82 
83                     return Err(ErrorKind::IoError(e));
84                 }
85                 Err(e) => return Err(e),
86             };
87 
88             if poll_timeout.elapsed() || maybe_event.is_some() {
89                 self.events.extend(self.skipped_events.drain(..));
90 
91                 if let Some(event) = maybe_event {
92                     self.events.push_front(event);
93                     return Ok(true);
94                 }
95 
96                 return Ok(false);
97             }
98         }
99     }
100 
read<F>(&mut self, filter: &F) -> Result<InternalEvent> where F: Filter,101     pub(crate) fn read<F>(&mut self, filter: &F) -> Result<InternalEvent>
102     where
103         F: Filter,
104     {
105         let mut skipped_events = VecDeque::new();
106 
107         loop {
108             while let Some(event) = self.events.pop_front() {
109                 if filter.eval(&event) {
110                     while let Some(event) = skipped_events.pop_front() {
111                         self.events.push_back(event);
112                     }
113 
114                     return Ok(event);
115                 } else {
116                     // We can not directly write events back to `self.events`.
117                     // If we did, we would put our self's into an endless loop
118                     // that would enqueue -> dequeue -> enqueue etc.
119                     // This happens because `poll` in this function will always return true if there are events in it's.
120                     // And because we just put the non-fulfilling event there this is going to be the case.
121                     // Instead we can store them into the temporary buffer,
122                     // and then when the filter is fulfilled write all events back in order.
123                     skipped_events.push_back(event);
124                 }
125             }
126 
127             let _ = self.poll(None, filter)?;
128         }
129     }
130 }
131 
132 #[cfg(test)]
133 mod tests {
134     use std::{collections::VecDeque, time::Duration};
135 
136     use crate::ErrorKind;
137 
138     #[cfg(unix)]
139     use super::super::filter::CursorPositionFilter;
140     use super::{
141         super::{filter::InternalEventFilter, Event},
142         EventSource, InternalEvent, InternalEventReader,
143     };
144 
145     #[test]
test_poll_fails_without_event_source()146     fn test_poll_fails_without_event_source() {
147         let mut reader = InternalEventReader {
148             events: VecDeque::new(),
149             source: None,
150             skipped_events: Vec::with_capacity(32),
151         };
152 
153         assert!(reader.poll(None, &InternalEventFilter).is_err());
154         assert!(reader
155             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
156             .is_err());
157         assert!(reader
158             .poll(Some(Duration::from_secs(10)), &InternalEventFilter)
159             .is_err());
160     }
161 
162     #[test]
test_poll_returns_true_for_matching_event_in_queue_at_front()163     fn test_poll_returns_true_for_matching_event_in_queue_at_front() {
164         let mut reader = InternalEventReader {
165             events: vec![InternalEvent::Event(Event::Resize(10, 10))].into(),
166             source: None,
167             skipped_events: Vec::with_capacity(32),
168         };
169 
170         assert!(reader.poll(None, &InternalEventFilter).unwrap());
171     }
172 
173     #[test]
174     #[cfg(unix)]
test_poll_returns_true_for_matching_event_in_queue_at_back()175     fn test_poll_returns_true_for_matching_event_in_queue_at_back() {
176         let mut reader = InternalEventReader {
177             events: vec![
178                 InternalEvent::Event(Event::Resize(10, 10)),
179                 InternalEvent::CursorPosition(10, 20),
180             ]
181             .into(),
182             source: None,
183             skipped_events: Vec::with_capacity(32),
184         };
185 
186         assert!(reader.poll(None, &CursorPositionFilter).unwrap());
187     }
188 
189     #[test]
test_read_returns_matching_event_in_queue_at_front()190     fn test_read_returns_matching_event_in_queue_at_front() {
191         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
192 
193         let mut reader = InternalEventReader {
194             events: vec![EVENT].into(),
195             source: None,
196             skipped_events: Vec::with_capacity(32),
197         };
198 
199         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
200     }
201 
202     #[test]
203     #[cfg(unix)]
test_read_returns_matching_event_in_queue_at_back()204     fn test_read_returns_matching_event_in_queue_at_back() {
205         const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
206 
207         let mut reader = InternalEventReader {
208             events: vec![InternalEvent::Event(Event::Resize(10, 10)), CURSOR_EVENT].into(),
209             source: None,
210             skipped_events: Vec::with_capacity(32),
211         };
212 
213         assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
214     }
215 
216     #[test]
217     #[cfg(unix)]
test_read_does_not_consume_skipped_event()218     fn test_read_does_not_consume_skipped_event() {
219         const SKIPPED_EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
220         const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
221 
222         let mut reader = InternalEventReader {
223             events: vec![SKIPPED_EVENT, CURSOR_EVENT].into(),
224             source: None,
225             skipped_events: Vec::with_capacity(32),
226         };
227 
228         assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
229         assert_eq!(reader.read(&InternalEventFilter).unwrap(), SKIPPED_EVENT);
230     }
231 
232     #[test]
test_poll_timeouts_if_source_has_no_events()233     fn test_poll_timeouts_if_source_has_no_events() {
234         let source = FakeSource::default();
235 
236         let mut reader = InternalEventReader {
237             events: VecDeque::new(),
238             source: Some(Box::new(source)),
239             skipped_events: Vec::with_capacity(32),
240         };
241 
242         assert!(!reader
243             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
244             .unwrap());
245     }
246 
247     #[test]
test_poll_returns_true_if_source_has_at_least_one_event()248     fn test_poll_returns_true_if_source_has_at_least_one_event() {
249         let source = FakeSource::with_events(&[InternalEvent::Event(Event::Resize(10, 10))]);
250 
251         let mut reader = InternalEventReader {
252             events: VecDeque::new(),
253             source: Some(Box::new(source)),
254             skipped_events: Vec::with_capacity(32),
255         };
256 
257         assert!(reader.poll(None, &InternalEventFilter).unwrap());
258         assert!(reader
259             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
260             .unwrap());
261     }
262 
263     #[test]
test_reads_returns_event_if_source_has_at_least_one_event()264     fn test_reads_returns_event_if_source_has_at_least_one_event() {
265         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
266 
267         let source = FakeSource::with_events(&[EVENT]);
268 
269         let mut reader = InternalEventReader {
270             events: VecDeque::new(),
271             source: Some(Box::new(source)),
272             skipped_events: Vec::with_capacity(32),
273         };
274 
275         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
276     }
277 
278     #[test]
test_read_returns_events_if_source_has_events()279     fn test_read_returns_events_if_source_has_events() {
280         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
281 
282         let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
283 
284         let mut reader = InternalEventReader {
285             events: VecDeque::new(),
286             source: Some(Box::new(source)),
287             skipped_events: Vec::with_capacity(32),
288         };
289 
290         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
291         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
292         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
293     }
294 
295     #[test]
test_poll_returns_false_after_all_source_events_are_consumed()296     fn test_poll_returns_false_after_all_source_events_are_consumed() {
297         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
298 
299         let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
300 
301         let mut reader = InternalEventReader {
302             events: VecDeque::new(),
303             source: Some(Box::new(source)),
304             skipped_events: Vec::with_capacity(32),
305         };
306 
307         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
308         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
309         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
310         assert!(!reader
311             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
312             .unwrap());
313     }
314 
315     #[test]
test_poll_propagates_error()316     fn test_poll_propagates_error() {
317         let source = FakeSource::with_error(ErrorKind::ResizingTerminalFailure("Foo".to_string()));
318 
319         let mut reader = InternalEventReader {
320             events: VecDeque::new(),
321             source: Some(Box::new(source)),
322             skipped_events: Vec::with_capacity(32),
323         };
324 
325         assert_eq!(
326             reader
327                 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
328                 .err()
329                 .map(|e| format!("{:?}", &e)),
330             Some(format!(
331                 "{:?}",
332                 ErrorKind::ResizingTerminalFailure("Foo".to_string())
333             ))
334         );
335     }
336 
337     #[test]
test_read_propagates_error()338     fn test_read_propagates_error() {
339         let source = FakeSource::with_error(ErrorKind::ResizingTerminalFailure("Foo".to_string()));
340 
341         let mut reader = InternalEventReader {
342             events: VecDeque::new(),
343             source: Some(Box::new(source)),
344             skipped_events: Vec::with_capacity(32),
345         };
346 
347         assert_eq!(
348             reader
349                 .read(&InternalEventFilter)
350                 .err()
351                 .map(|e| format!("{:?}", &e)),
352             Some(format!(
353                 "{:?}",
354                 ErrorKind::ResizingTerminalFailure("Foo".to_string())
355             ))
356         );
357     }
358 
359     #[test]
test_poll_continues_after_error()360     fn test_poll_continues_after_error() {
361         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
362 
363         let source = FakeSource::new(
364             &[EVENT, EVENT],
365             ErrorKind::ResizingTerminalFailure("Foo".to_string()),
366         );
367 
368         let mut reader = InternalEventReader {
369             events: VecDeque::new(),
370             source: Some(Box::new(source)),
371             skipped_events: Vec::with_capacity(32),
372         };
373 
374         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
375         assert!(reader.read(&InternalEventFilter).is_err());
376         assert!(reader
377             .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
378             .unwrap());
379     }
380 
381     #[test]
test_read_continues_after_error()382     fn test_read_continues_after_error() {
383         const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
384 
385         let source = FakeSource::new(
386             &[EVENT, EVENT],
387             ErrorKind::ResizingTerminalFailure("Foo".to_string()),
388         );
389 
390         let mut reader = InternalEventReader {
391             events: VecDeque::new(),
392             source: Some(Box::new(source)),
393             skipped_events: Vec::with_capacity(32),
394         };
395 
396         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
397         assert!(reader.read(&InternalEventFilter).is_err());
398         assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
399     }
400 
401     #[derive(Default)]
402     struct FakeSource {
403         events: VecDeque<InternalEvent>,
404         error: Option<ErrorKind>,
405     }
406 
407     impl FakeSource {
new(events: &[InternalEvent], error: ErrorKind) -> FakeSource408         fn new(events: &[InternalEvent], error: ErrorKind) -> FakeSource {
409             FakeSource {
410                 events: events.to_vec().into(),
411                 error: Some(error),
412             }
413         }
414 
with_events(events: &[InternalEvent]) -> FakeSource415         fn with_events(events: &[InternalEvent]) -> FakeSource {
416             FakeSource {
417                 events: events.to_vec().into(),
418                 error: None,
419             }
420         }
421 
with_error(error: ErrorKind) -> FakeSource422         fn with_error(error: ErrorKind) -> FakeSource {
423             FakeSource {
424                 events: VecDeque::new(),
425                 error: Some(error),
426             }
427         }
428     }
429 
430     impl EventSource for FakeSource {
try_read( &mut self, _timeout: Option<Duration>, ) -> Result<Option<InternalEvent>, ErrorKind>431         fn try_read(
432             &mut self,
433             _timeout: Option<Duration>,
434         ) -> Result<Option<InternalEvent>, ErrorKind> {
435             // Return error if set in case there's just one remaining event
436             if self.events.len() == 1 {
437                 if let Some(error) = self.error.take() {
438                     return Err(error);
439                 }
440             }
441 
442             // Return all events from the queue
443             if let Some(event) = self.events.pop_front() {
444                 return Ok(Some(event));
445             }
446 
447             // Return error if there're no more events
448             if let Some(error) = self.error.take() {
449                 return Err(error);
450             }
451 
452             // Timeout
453             Ok(None)
454         }
455 
456         #[cfg(feature = "event-stream")]
waker(&self) -> super::super::sys::Waker457         fn waker(&self) -> super::super::sys::Waker {
458             unimplemented!();
459         }
460     }
461 }
462