1 use std::{collections::vec_deque::VecDeque, io, time::Duration}; 2 3 use crate::ErrorKind; 4 5 #[cfg(unix)] 6 use super::source::unix::UnixInternalEventSource; 7 #[cfg(windows)] 8 use super::source::windows::WindowsEventSource; 9 #[cfg(feature = "event-stream")] 10 use super::sys::Waker; 11 use super::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent, Result}; 12 13 /// Can be used to read `InternalEvent`s. 14 pub(crate) struct InternalEventReader { 15 events: VecDeque<InternalEvent>, 16 source: Option<Box<dyn EventSource>>, 17 skipped_events: Vec<InternalEvent>, 18 } 19 20 impl Default for InternalEventReader { default() -> Self21 fn default() -> Self { 22 #[cfg(windows)] 23 let source = WindowsEventSource::new(); 24 #[cfg(unix)] 25 let source = UnixInternalEventSource::new(); 26 27 let source = source.ok().map(|x| Box::new(x) as Box<dyn EventSource>); 28 29 InternalEventReader { 30 source, 31 events: VecDeque::with_capacity(32), 32 skipped_events: Vec::with_capacity(32), 33 } 34 } 35 } 36 37 impl InternalEventReader { 38 /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`. 39 #[cfg(feature = "event-stream")] waker(&self) -> Waker40 pub(crate) fn waker(&self) -> Waker { 41 self.source.as_ref().expect("reader source not set").waker() 42 } 43 poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> Result<bool> where F: Filter,44 pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> Result<bool> 45 where 46 F: Filter, 47 { 48 for event in &self.events { 49 if filter.eval(&event) { 50 return Ok(true); 51 } 52 } 53 54 let event_source = match self.source.as_mut() { 55 Some(source) => source, 56 None => { 57 return Err(std::io::Error::new( 58 std::io::ErrorKind::Other, 59 "Failed to initialize input reader", 60 ) 61 .into()) 62 } 63 }; 64 65 let poll_timeout = PollTimeout::new(timeout); 66 67 loop { 68 let maybe_event = match event_source.try_read(timeout) { 69 Ok(None) => None, 70 Ok(Some(event)) => { 71 if filter.eval(&event) { 72 Some(event) 73 } else { 74 self.skipped_events.push(event); 75 None 76 } 77 } 78 Err(ErrorKind::IoError(e)) => { 79 if e.kind() == io::ErrorKind::Interrupted { 80 return Ok(false); 81 } 82 83 return Err(ErrorKind::IoError(e)); 84 } 85 Err(e) => return Err(e), 86 }; 87 88 if poll_timeout.elapsed() || maybe_event.is_some() { 89 self.events.extend(self.skipped_events.drain(..)); 90 91 if let Some(event) = maybe_event { 92 self.events.push_front(event); 93 return Ok(true); 94 } 95 96 return Ok(false); 97 } 98 } 99 } 100 read<F>(&mut self, filter: &F) -> Result<InternalEvent> where F: Filter,101 pub(crate) fn read<F>(&mut self, filter: &F) -> Result<InternalEvent> 102 where 103 F: Filter, 104 { 105 let mut skipped_events = VecDeque::new(); 106 107 loop { 108 while let Some(event) = self.events.pop_front() { 109 if filter.eval(&event) { 110 while let Some(event) = skipped_events.pop_front() { 111 self.events.push_back(event); 112 } 113 114 return Ok(event); 115 } else { 116 // We can not directly write events back to `self.events`. 117 // If we did, we would put our self's into an endless loop 118 // that would enqueue -> dequeue -> enqueue etc. 119 // This happens because `poll` in this function will always return true if there are events in it's. 120 // And because we just put the non-fulfilling event there this is going to be the case. 121 // Instead we can store them into the temporary buffer, 122 // and then when the filter is fulfilled write all events back in order. 123 skipped_events.push_back(event); 124 } 125 } 126 127 let _ = self.poll(None, filter)?; 128 } 129 } 130 } 131 132 #[cfg(test)] 133 mod tests { 134 use std::{collections::VecDeque, time::Duration}; 135 136 use crate::ErrorKind; 137 138 #[cfg(unix)] 139 use super::super::filter::CursorPositionFilter; 140 use super::{ 141 super::{filter::InternalEventFilter, Event}, 142 EventSource, InternalEvent, InternalEventReader, 143 }; 144 145 #[test] test_poll_fails_without_event_source()146 fn test_poll_fails_without_event_source() { 147 let mut reader = InternalEventReader { 148 events: VecDeque::new(), 149 source: None, 150 skipped_events: Vec::with_capacity(32), 151 }; 152 153 assert!(reader.poll(None, &InternalEventFilter).is_err()); 154 assert!(reader 155 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 156 .is_err()); 157 assert!(reader 158 .poll(Some(Duration::from_secs(10)), &InternalEventFilter) 159 .is_err()); 160 } 161 162 #[test] test_poll_returns_true_for_matching_event_in_queue_at_front()163 fn test_poll_returns_true_for_matching_event_in_queue_at_front() { 164 let mut reader = InternalEventReader { 165 events: vec![InternalEvent::Event(Event::Resize(10, 10))].into(), 166 source: None, 167 skipped_events: Vec::with_capacity(32), 168 }; 169 170 assert!(reader.poll(None, &InternalEventFilter).unwrap()); 171 } 172 173 #[test] 174 #[cfg(unix)] test_poll_returns_true_for_matching_event_in_queue_at_back()175 fn test_poll_returns_true_for_matching_event_in_queue_at_back() { 176 let mut reader = InternalEventReader { 177 events: vec![ 178 InternalEvent::Event(Event::Resize(10, 10)), 179 InternalEvent::CursorPosition(10, 20), 180 ] 181 .into(), 182 source: None, 183 skipped_events: Vec::with_capacity(32), 184 }; 185 186 assert!(reader.poll(None, &CursorPositionFilter).unwrap()); 187 } 188 189 #[test] test_read_returns_matching_event_in_queue_at_front()190 fn test_read_returns_matching_event_in_queue_at_front() { 191 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 192 193 let mut reader = InternalEventReader { 194 events: vec![EVENT].into(), 195 source: None, 196 skipped_events: Vec::with_capacity(32), 197 }; 198 199 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 200 } 201 202 #[test] 203 #[cfg(unix)] test_read_returns_matching_event_in_queue_at_back()204 fn test_read_returns_matching_event_in_queue_at_back() { 205 const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20); 206 207 let mut reader = InternalEventReader { 208 events: vec![InternalEvent::Event(Event::Resize(10, 10)), CURSOR_EVENT].into(), 209 source: None, 210 skipped_events: Vec::with_capacity(32), 211 }; 212 213 assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT); 214 } 215 216 #[test] 217 #[cfg(unix)] test_read_does_not_consume_skipped_event()218 fn test_read_does_not_consume_skipped_event() { 219 const SKIPPED_EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 220 const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20); 221 222 let mut reader = InternalEventReader { 223 events: vec![SKIPPED_EVENT, CURSOR_EVENT].into(), 224 source: None, 225 skipped_events: Vec::with_capacity(32), 226 }; 227 228 assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT); 229 assert_eq!(reader.read(&InternalEventFilter).unwrap(), SKIPPED_EVENT); 230 } 231 232 #[test] test_poll_timeouts_if_source_has_no_events()233 fn test_poll_timeouts_if_source_has_no_events() { 234 let source = FakeSource::default(); 235 236 let mut reader = InternalEventReader { 237 events: VecDeque::new(), 238 source: Some(Box::new(source)), 239 skipped_events: Vec::with_capacity(32), 240 }; 241 242 assert!(!reader 243 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 244 .unwrap()); 245 } 246 247 #[test] test_poll_returns_true_if_source_has_at_least_one_event()248 fn test_poll_returns_true_if_source_has_at_least_one_event() { 249 let source = FakeSource::with_events(&[InternalEvent::Event(Event::Resize(10, 10))]); 250 251 let mut reader = InternalEventReader { 252 events: VecDeque::new(), 253 source: Some(Box::new(source)), 254 skipped_events: Vec::with_capacity(32), 255 }; 256 257 assert!(reader.poll(None, &InternalEventFilter).unwrap()); 258 assert!(reader 259 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 260 .unwrap()); 261 } 262 263 #[test] test_reads_returns_event_if_source_has_at_least_one_event()264 fn test_reads_returns_event_if_source_has_at_least_one_event() { 265 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 266 267 let source = FakeSource::with_events(&[EVENT]); 268 269 let mut reader = InternalEventReader { 270 events: VecDeque::new(), 271 source: Some(Box::new(source)), 272 skipped_events: Vec::with_capacity(32), 273 }; 274 275 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 276 } 277 278 #[test] test_read_returns_events_if_source_has_events()279 fn test_read_returns_events_if_source_has_events() { 280 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 281 282 let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]); 283 284 let mut reader = InternalEventReader { 285 events: VecDeque::new(), 286 source: Some(Box::new(source)), 287 skipped_events: Vec::with_capacity(32), 288 }; 289 290 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 291 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 292 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 293 } 294 295 #[test] test_poll_returns_false_after_all_source_events_are_consumed()296 fn test_poll_returns_false_after_all_source_events_are_consumed() { 297 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 298 299 let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]); 300 301 let mut reader = InternalEventReader { 302 events: VecDeque::new(), 303 source: Some(Box::new(source)), 304 skipped_events: Vec::with_capacity(32), 305 }; 306 307 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 308 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 309 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 310 assert!(!reader 311 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 312 .unwrap()); 313 } 314 315 #[test] test_poll_propagates_error()316 fn test_poll_propagates_error() { 317 let source = FakeSource::with_error(ErrorKind::ResizingTerminalFailure("Foo".to_string())); 318 319 let mut reader = InternalEventReader { 320 events: VecDeque::new(), 321 source: Some(Box::new(source)), 322 skipped_events: Vec::with_capacity(32), 323 }; 324 325 assert_eq!( 326 reader 327 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 328 .err() 329 .map(|e| format!("{:?}", &e)), 330 Some(format!( 331 "{:?}", 332 ErrorKind::ResizingTerminalFailure("Foo".to_string()) 333 )) 334 ); 335 } 336 337 #[test] test_read_propagates_error()338 fn test_read_propagates_error() { 339 let source = FakeSource::with_error(ErrorKind::ResizingTerminalFailure("Foo".to_string())); 340 341 let mut reader = InternalEventReader { 342 events: VecDeque::new(), 343 source: Some(Box::new(source)), 344 skipped_events: Vec::with_capacity(32), 345 }; 346 347 assert_eq!( 348 reader 349 .read(&InternalEventFilter) 350 .err() 351 .map(|e| format!("{:?}", &e)), 352 Some(format!( 353 "{:?}", 354 ErrorKind::ResizingTerminalFailure("Foo".to_string()) 355 )) 356 ); 357 } 358 359 #[test] test_poll_continues_after_error()360 fn test_poll_continues_after_error() { 361 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 362 363 let source = FakeSource::new( 364 &[EVENT, EVENT], 365 ErrorKind::ResizingTerminalFailure("Foo".to_string()), 366 ); 367 368 let mut reader = InternalEventReader { 369 events: VecDeque::new(), 370 source: Some(Box::new(source)), 371 skipped_events: Vec::with_capacity(32), 372 }; 373 374 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 375 assert!(reader.read(&InternalEventFilter).is_err()); 376 assert!(reader 377 .poll(Some(Duration::from_secs(0)), &InternalEventFilter) 378 .unwrap()); 379 } 380 381 #[test] test_read_continues_after_error()382 fn test_read_continues_after_error() { 383 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10)); 384 385 let source = FakeSource::new( 386 &[EVENT, EVENT], 387 ErrorKind::ResizingTerminalFailure("Foo".to_string()), 388 ); 389 390 let mut reader = InternalEventReader { 391 events: VecDeque::new(), 392 source: Some(Box::new(source)), 393 skipped_events: Vec::with_capacity(32), 394 }; 395 396 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 397 assert!(reader.read(&InternalEventFilter).is_err()); 398 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT); 399 } 400 401 #[derive(Default)] 402 struct FakeSource { 403 events: VecDeque<InternalEvent>, 404 error: Option<ErrorKind>, 405 } 406 407 impl FakeSource { new(events: &[InternalEvent], error: ErrorKind) -> FakeSource408 fn new(events: &[InternalEvent], error: ErrorKind) -> FakeSource { 409 FakeSource { 410 events: events.to_vec().into(), 411 error: Some(error), 412 } 413 } 414 with_events(events: &[InternalEvent]) -> FakeSource415 fn with_events(events: &[InternalEvent]) -> FakeSource { 416 FakeSource { 417 events: events.to_vec().into(), 418 error: None, 419 } 420 } 421 with_error(error: ErrorKind) -> FakeSource422 fn with_error(error: ErrorKind) -> FakeSource { 423 FakeSource { 424 events: VecDeque::new(), 425 error: Some(error), 426 } 427 } 428 } 429 430 impl EventSource for FakeSource { try_read( &mut self, _timeout: Option<Duration>, ) -> Result<Option<InternalEvent>, ErrorKind>431 fn try_read( 432 &mut self, 433 _timeout: Option<Duration>, 434 ) -> Result<Option<InternalEvent>, ErrorKind> { 435 // Return error if set in case there's just one remaining event 436 if self.events.len() == 1 { 437 if let Some(error) = self.error.take() { 438 return Err(error); 439 } 440 } 441 442 // Return all events from the queue 443 if let Some(event) = self.events.pop_front() { 444 return Ok(Some(event)); 445 } 446 447 // Return error if there're no more events 448 if let Some(error) = self.error.take() { 449 return Err(error); 450 } 451 452 // Timeout 453 Ok(None) 454 } 455 456 #[cfg(feature = "event-stream")] waker(&self) -> super::super::sys::Waker457 fn waker(&self) -> super::super::sys::Waker { 458 unimplemented!(); 459 } 460 } 461 } 462