1 use std::{collections::vec_deque::VecDeque, io, time::Duration}; 2 3 #[cfg(unix)] 4 use super::source::unix::UnixInternalEventSource; 5 #[cfg(windows)] 6 use super::source::windows::WindowsEventSource; 7 #[cfg(feature = "event-stream")] 8 use super::sys::Waker; 9 use super::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent, Result}; 10 /// Can be used to read `InternalEvent`s. 11 pub(crate) struct InternalEventReader { 12 events: VecDeque<InternalEvent>, 13 source: Option<Box<dyn EventSource>>, 14 skipped_events: Vec<InternalEvent>, 15 } 16 17 impl Default for InternalEventReader { default() -> Self18 fn default() -> Self { 19 #[cfg(windows)] 20 let source = WindowsEventSource::new(); 21 #[cfg(unix)] 22 let source = UnixInternalEventSource::new(); 23 24 let source = source.ok().map(|x| Box::new(x) as Box<dyn EventSource>); 25 26 InternalEventReader { 27 source, 28 events: VecDeque::with_capacity(32), 29 skipped_events: Vec::with_capacity(32), 30 } 31 } 32 } 33 34 impl InternalEventReader { 35 /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`. 36 #[cfg(feature = "event-stream")] waker(&self) -> Waker37 pub(crate) fn waker(&self) -> Waker { 38 self.source.as_ref().expect("reader source not set").waker() 39 } 40 poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> Result<bool> where F: Filter,41 pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> Result<bool> 42 where 43 F: Filter, 44 { 45 for event in &self.events { 46 if filter.eval(event) { 47 return Ok(true); 48 } 49 } 50 51 let event_source = match self.source.as_mut() { 52 Some(source) => source, 53 None => { 54 return Err(std::io::Error::new( 55 std::io::ErrorKind::Other, 56 "Failed to initialize input reader", 57 )) 58 } 59 }; 60 61 let poll_timeout = PollTimeout::new(timeout); 62 63 loop { 64 let maybe_event = match event_source.try_read(poll_timeout.leftover()) { 65 Ok(None) => None, 66 Ok(Some(event)) => { 67 if filter.eval(&event) { 68 Some(event) 69 } else { 70 self.skipped_events.push(event); 71 None 72 } 73 } 74 Err(e) => { 75 if e.kind() == io::ErrorKind::Interrupted { 76 return Ok(false); 77 } 78 79 return Err(e); 80 } 81 }; 82 83 if poll_timeout.elapsed() || maybe_event.is_some() { 84 self.events.extend(self.skipped_events.drain(..)); 85 86 if let Some(event) = maybe_event { 87 self.events.push_front(event); 88 return Ok(true); 89 } 90 91 return Ok(false); 92 } 93 } 94 } 95 read<F>(&mut self, filter: &F) -> Result<InternalEvent> where F: Filter,96 pub(crate) fn read<F>(&mut self, filter: &F) -> Result<InternalEvent> 97 where 98 F: Filter, 99 { 100 let mut skipped_events = VecDeque::new(); 101 102 loop { 103 while let Some(event) = self.events.pop_front() { 104 if filter.eval(&event) { 105 while let Some(event) = skipped_events.pop_front() { 106 self.events.push_back(event); 107 } 108 109 return Ok(event); 110 } else { 111 // We can not directly write events back to `self.events`. 112 // If we did, we would put our self's into an endless loop 113 // that would enqueue -> dequeue -> enqueue etc. 114 // This happens because `poll` in this function will always return true if there are events in it's. 115 // And because we just put the non-fulfilling event there this is going to be the case. 116 // Instead we can store them into the temporary buffer, 117 // and then when the filter is fulfilled write all events back in order. 118 skipped_events.push_back(event); 119 } 120 } 121 122 let _ = self.poll(None, filter)?; 123 } 124 } 125 } 126 127 #[cfg(test)] 128 mod tests { 129 use std::io; 130 use std::{collections::VecDeque, time::Duration}; 131 132 use crate::ErrorKind; 133 134 #[cfg(unix)] 135 use super::super::filter::CursorPositionFilter; 136 use super::{ 137 super::{filter::InternalEventFilter, Event}, 138 EventSource, InternalEvent, InternalEventReader, 139 }; 140 141 #[test] test_poll_fails_without_event_source()142 fn test_poll_fails_without_event_source() { 143 let mut reader = InternalEventReader { 144 events: VecDeque::new(), 145 source: None, 146 skipped_events: Vec::with_capacity(32), 147 }; 148 149 assert!(reader.poll(None, &InternalEventFilter).is_err()); 150 assert!(reader 151 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 152 .is_err()); 153 assert!(reader 154 .poll(Some(Duration::from_secs(10)), &InternalEventFilter) 155 .is_err()); 156 } 157 158 #[test] test_poll_returns_true_for_matching_event_in_queue_at_front()159 fn test_poll_returns_true_for_matching_event_in_queue_at_front() { 160 let mut reader = InternalEventReader { 161 events: vec![InternalEvent::Event(Event::Resize(10, 10))].into(), 162 source: None, 163 skipped_events: Vec::with_capacity(32), 164 }; 165 166 assert!(reader.poll(None, &InternalEventFilter).unwrap()); 167 } 168 169 #[test] 170 #[cfg(unix)] test_poll_returns_true_for_matching_event_in_queue_at_back()171 fn test_poll_returns_true_for_matching_event_in_queue_at_back() { 172 let mut reader = InternalEventReader { 173 events: vec![ 174 InternalEvent::Event(Event::Resize(10, 10)), 175 InternalEvent::CursorPosition(10, 20), 176 ] 177 .into(), 178 source: None, 179 skipped_events: Vec::with_capacity(32), 180 }; 181 182 assert!(reader.poll(None, &CursorPositionFilter).unwrap()); 183 } 184 185 #[test] test_read_returns_matching_event_in_queue_at_front()186 fn test_read_returns_matching_event_in_queue_at_front() { 187 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 188 189 let mut reader = InternalEventReader { 190 events: vec![EVENT].into(), 191 source: None, 192 skipped_events: Vec::with_capacity(32), 193 }; 194 195 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 196 } 197 198 #[test] 199 #[cfg(unix)] test_read_returns_matching_event_in_queue_at_back()200 fn test_read_returns_matching_event_in_queue_at_back() { 201 const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20); 202 203 let mut reader = InternalEventReader { 204 events: vec![InternalEvent::Event(Event::Resize(10, 10)), CURSOR_EVENT].into(), 205 source: None, 206 skipped_events: Vec::with_capacity(32), 207 }; 208 209 assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT); 210 } 211 212 #[test] 213 #[cfg(unix)] test_read_does_not_consume_skipped_event()214 fn test_read_does_not_consume_skipped_event() { 215 const SKIPPED_EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 216 const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20); 217 218 let mut reader = InternalEventReader { 219 events: vec![SKIPPED_EVENT, CURSOR_EVENT].into(), 220 source: None, 221 skipped_events: Vec::with_capacity(32), 222 }; 223 224 assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT); 225 assert_eq!(reader.read(&InternalEventFilter).unwrap(), SKIPPED_EVENT); 226 } 227 228 #[test] test_poll_timeouts_if_source_has_no_events()229 fn test_poll_timeouts_if_source_has_no_events() { 230 let source = FakeSource::default(); 231 232 let mut reader = InternalEventReader { 233 events: VecDeque::new(), 234 source: Some(Box::new(source)), 235 skipped_events: Vec::with_capacity(32), 236 }; 237 238 assert!(!reader 239 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 240 .unwrap()); 241 } 242 243 #[test] test_poll_returns_true_if_source_has_at_least_one_event()244 fn test_poll_returns_true_if_source_has_at_least_one_event() { 245 let source = FakeSource::with_events(&[InternalEvent::Event(Event::Resize(10, 10))]); 246 247 let mut reader = InternalEventReader { 248 events: VecDeque::new(), 249 source: Some(Box::new(source)), 250 skipped_events: Vec::with_capacity(32), 251 }; 252 253 assert!(reader.poll(None, &InternalEventFilter).unwrap()); 254 assert!(reader 255 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 256 .unwrap()); 257 } 258 259 #[test] test_reads_returns_event_if_source_has_at_least_one_event()260 fn test_reads_returns_event_if_source_has_at_least_one_event() { 261 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 262 263 let source = FakeSource::with_events(&[EVENT]); 264 265 let mut reader = InternalEventReader { 266 events: VecDeque::new(), 267 source: Some(Box::new(source)), 268 skipped_events: Vec::with_capacity(32), 269 }; 270 271 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 272 } 273 274 #[test] test_read_returns_events_if_source_has_events()275 fn test_read_returns_events_if_source_has_events() { 276 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 277 278 let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]); 279 280 let mut reader = InternalEventReader { 281 events: VecDeque::new(), 282 source: Some(Box::new(source)), 283 skipped_events: Vec::with_capacity(32), 284 }; 285 286 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 287 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 288 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 289 } 290 291 #[test] test_poll_returns_false_after_all_source_events_are_consumed()292 fn test_poll_returns_false_after_all_source_events_are_consumed() { 293 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 294 295 let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]); 296 297 let mut reader = InternalEventReader { 298 events: VecDeque::new(), 299 source: Some(Box::new(source)), 300 skipped_events: Vec::with_capacity(32), 301 }; 302 303 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 304 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 305 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 306 assert!(!reader 307 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 308 .unwrap()); 309 } 310 311 #[test] test_poll_propagates_error()312 fn test_poll_propagates_error() { 313 let source = FakeSource::with_error(ErrorKind::from(io::ErrorKind::Other)); 314 315 let mut reader = InternalEventReader { 316 events: VecDeque::new(), 317 source: Some(Box::new(source)), 318 skipped_events: Vec::with_capacity(32), 319 }; 320 321 assert_eq!( 322 reader 323 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 324 .err() 325 .map(|e| format!("{:?}", &e)), 326 Some(format!("{:?}", ErrorKind::from(io::ErrorKind::Other))) 327 ); 328 } 329 330 #[test] test_read_propagates_error()331 fn test_read_propagates_error() { 332 let source = FakeSource::with_error(ErrorKind::from(io::ErrorKind::Other)); 333 334 let mut reader = InternalEventReader { 335 events: VecDeque::new(), 336 source: Some(Box::new(source)), 337 skipped_events: Vec::with_capacity(32), 338 }; 339 340 assert_eq!( 341 reader 342 .read(&InternalEventFilter) 343 .err() 344 .map(|e| format!("{:?}", &e)), 345 Some(format!("{:?}", ErrorKind::from(io::ErrorKind::Other))) 346 ); 347 } 348 349 #[test] test_poll_continues_after_error()350 fn test_poll_continues_after_error() { 351 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 352 353 let source = FakeSource::new(&[EVENT, EVENT], ErrorKind::from(io::ErrorKind::Other)); 354 355 let mut reader = InternalEventReader { 356 events: VecDeque::new(), 357 source: Some(Box::new(source)), 358 skipped_events: Vec::with_capacity(32), 359 }; 360 361 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 362 assert!(reader.read(&InternalEventFilter).is_err()); 363 assert!(reader 364 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 365 .unwrap()); 366 } 367 368 #[test] test_read_continues_after_error()369 fn test_read_continues_after_error() { 370 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 371 372 let source = FakeSource::new(&[EVENT, EVENT], ErrorKind::from(io::ErrorKind::Other)); 373 374 let mut reader = InternalEventReader { 375 events: VecDeque::new(), 376 source: Some(Box::new(source)), 377 skipped_events: Vec::with_capacity(32), 378 }; 379 380 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 381 assert!(reader.read(&InternalEventFilter).is_err()); 382 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 383 } 384 385 #[derive(Default)] 386 struct FakeSource { 387 events: VecDeque<InternalEvent>, 388 error: Option<ErrorKind>, 389 } 390 391 impl FakeSource { new(events: &[InternalEvent], error: ErrorKind) -> FakeSource392 fn new(events: &[InternalEvent], error: ErrorKind) -> FakeSource { 393 FakeSource { 394 events: events.to_vec().into(), 395 error: Some(error), 396 } 397 } 398 with_events(events: &[InternalEvent]) -> FakeSource399 fn with_events(events: &[InternalEvent]) -> FakeSource { 400 FakeSource { 401 events: events.to_vec().into(), 402 error: None, 403 } 404 } 405 with_error(error: ErrorKind) -> FakeSource406 fn with_error(error: ErrorKind) -> FakeSource { 407 FakeSource { 408 events: VecDeque::new(), 409 error: Some(error), 410 } 411 } 412 } 413 414 impl EventSource for FakeSource { try_read( &mut self, _timeout: Option<Duration>, ) -> Result<Option<InternalEvent>, ErrorKind>415 fn try_read( 416 &mut self, 417 _timeout: Option<Duration>, 418 ) -> Result<Option<InternalEvent>, ErrorKind> { 419 // Return error if set in case there's just one remaining event 420 if self.events.len() == 1 { 421 if let Some(error) = self.error.take() { 422 return Err(error); 423 } 424 } 425 426 // Return all events from the queue 427 if let Some(event) = self.events.pop_front() { 428 return Ok(Some(event)); 429 } 430 431 // Return error if there're no more events 432 if let Some(error) = self.error.take() { 433 return Err(error); 434 } 435 436 // Timeout 437 Ok(None) 438 } 439 440 #[cfg(feature = "event-stream")] waker(&self) -> super::super::sys::Waker441 fn waker(&self) -> super::super::sys::Waker { 442 unimplemented!(); 443 } 444 } 445 } 446