//! A queue of delayed elements.
//!
//! See [`DelayQueue`] for more details.
//!
//! [`DelayQueue`]: struct@DelayQueue

use crate::time::wheel::{self, Wheel};
use crate::time::{delay_until, Delay, Duration, Error, Instant};

use slab::Slab;
use std::cmp;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{self, Poll};

/// A queue of delayed elements.
///
/// Once an element is inserted into the `DelayQueue`, it is yielded once the
/// specified deadline has been reached.
///
/// # Usage
///
/// Elements are inserted into `DelayQueue` using the [`insert`] or
/// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is
/// returned. The key is used to remove the entry or to change the deadline at
/// which it should be yielded back.
///
/// Once delays have been configured, the `DelayQueue` is driven by calling
/// [`poll_expired`] (or, when the `stream` feature is enabled, through its
/// [`Stream`] implementation). If an entry has reached its deadline, it is
/// returned. If not, `Poll::Pending` is returned, indicating that the current
/// task will be notified once the deadline has been reached.
///
/// # `Stream` implementation
///
/// Items are retrieved from the queue via [`poll_expired`]. If no delays have
/// expired, no items are returned. In this case, `Poll::Pending` is returned
/// and the current task is registered to be notified once the next item's
/// delay has expired.
///
/// If no items are in the queue, i.e. `is_empty()` returns `true`, then
/// polling returns `Poll::Ready(None)`. This indicates that the stream has
/// reached an end. However, if a new item is inserted *after*, polling will
/// once again start returning items or `Poll::Pending`.
///
/// Items are returned ordered by their expirations. Items that are configured
/// to expire first will be returned first. There are no ordering guarantees
/// for items configured to expire at the same instant. Also note that delays
/// are rounded to the closest millisecond.
///
/// # Implementation
///
/// The [`DelayQueue`] is backed by a separate instance of the same timer wheel used internally by
/// Tokio's standalone timer utilities such as [`delay_for`]. Because of this, it offers the same
/// performance and scalability benefits.
///
/// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation,
/// and allows reuse of the memory allocated for expired entries.
///
/// Capacity can be checked using [`capacity`] and allocated preemptively by using
/// the [`reserve`] method.
///
/// # Usage
///
/// Using `DelayQueue` to manage cache entries.
///
/// ```rust,no_run
/// use tokio::time::{delay_queue, DelayQueue, Error};
///
/// use futures::ready;
/// use std::collections::HashMap;
/// use std::task::{Context, Poll};
/// use std::time::Duration;
/// # type CacheKey = String;
/// # type Value = String;
///
/// struct Cache {
///     entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
///     expirations: DelayQueue<CacheKey>,
/// }
///
/// const TTL_SECS: u64 = 30;
///
/// impl Cache {
///     fn insert(&mut self, key: CacheKey, value: Value) {
///         let delay = self.expirations
///             .insert(key.clone(), Duration::from_secs(TTL_SECS));
///
///         self.entries.insert(key, (value, delay));
///     }
///
///     fn get(&self, key: &CacheKey) -> Option<&Value> {
///         self.entries.get(key)
///             .map(|&(ref v, _)| v)
///     }
///
///     fn remove(&mut self, key: &CacheKey) {
///         if let Some((_, cache_key)) = self.entries.remove(key) {
///             self.expirations.remove(&cache_key);
///         }
///     }
///
///     fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
///         while let Some(res) = ready!(self.expirations.poll_expired(cx)) {
///             let entry = res?;
///             self.entries.remove(entry.get_ref());
///         }
///
///         Poll::Ready(Ok(()))
///     }
/// }
/// ```
///
/// [`insert`]: #method.insert
/// [`insert_at`]: #method.insert_at
/// [`Key`]: struct@Key
/// [`Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
/// [`poll_expired`]: #method.poll_expired
/// [`DelayQueue`]: struct@DelayQueue
/// [`delay_for`]: fn@super::delay_for
/// [`slab`]: https://docs.rs/slab
/// [`capacity`]: #method.capacity
/// [`reserve`]: #method.reserve
#[derive(Debug)]
pub struct DelayQueue<T> {
    /// Stores data associated with entries
    slab: Slab<Data<T>>,

    /// Lookup structure tracking all delays in the queue
    wheel: Wheel<Stack<T>>,

    /// Delays that were inserted when already expired. These cannot be stored
    /// in the wheel
    expired: Stack<T>,

    /// Delay expiring when the *first* item in the queue expires
    delay: Option<Delay>,

    /// Wheel polling state
    poll: wheel::Poll,

    /// Instant at which the timer starts. Entry deadlines are stored as
    /// millisecond offsets from this instant.
    start: Instant,
}

/// An entry in `DelayQueue` that has expired and been removed.
///
/// Values are returned by [`DelayQueue::poll_expired`] and
/// [`DelayQueue::remove`].
///
/// [`DelayQueue::poll_expired`]: method@DelayQueue::poll_expired
/// [`DelayQueue::remove`]: method@DelayQueue::remove
#[derive(Debug)]
pub struct Expired<T> {
    /// The data stored in the queue
    data: T,

    /// The expiration time
    deadline: Instant,

    /// The key associated with the entry
    key: Key,
}

/// Token to a value stored in a `DelayQueue`.
///
/// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`]
/// documentation for more details.
///
/// [`DelayQueue`]: struct@DelayQueue
/// [`DelayQueue::insert`]: method@DelayQueue::insert
#[derive(Debug, Clone)]
pub struct Key {
    /// Index of the entry's slot in the queue's slab
    index: usize,
}

/// Intrusive, doubly linked stack of slab indices. The links themselves are
/// stored in the `next`/`prev` fields of each `Data` entry.
#[derive(Debug)]
struct Stack<T> {
    /// Head of the stack
    head: Option<usize>,
    /// Ties the stack to the entry type without storing a `T`
    _p: PhantomData<fn() -> T>,
}

/// Per-entry state kept in the slab.
#[derive(Debug)]
struct Data<T> {
    /// The data being stored in the queue and will be returned at the requested
    /// instant.
    inner: T,

    /// The instant at which the item is returned, expressed in milliseconds
    /// since the queue's `start` instant.
    when: u64,

    /// Set to true when stored in the `expired` queue
    expired: bool,

    /// Next entry in the stack
    next: Option<usize>,

    /// Previous entry in the stack
    prev: Option<usize>,
}

/// Maximum number of entries the queue can handle
const MAX_ENTRIES: usize = (1 << 30) - 1;

impl<T> DelayQueue<T> {
    /// Creates a new, empty, `DelayQueue`
    ///
    /// The queue will not allocate storage until items are inserted into it.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use tokio::time::DelayQueue;
    /// let delay_queue: DelayQueue<u32> = DelayQueue::new();
    /// ```
    pub fn new() -> DelayQueue<T> {
        // A zero-capacity queue defers slab allocation to the first insert.
        DelayQueue::with_capacity(0)
    }

    /// Creates a new, empty, `DelayQueue` with the specified capacity.
    ///
    /// The queue will be able to hold at least `capacity` elements without
    /// reallocating. If `capacity` is 0, the queue will not allocate for
    /// storage.
225 /// 226 /// # Examples 227 /// 228 /// ```rust 229 /// # use tokio::time::DelayQueue; 230 /// # use std::time::Duration; 231 /// 232 /// # #[tokio::main] 233 /// # async fn main() { 234 /// let mut delay_queue = DelayQueue::with_capacity(10); 235 /// 236 /// // These insertions are done without further allocation 237 /// for i in 0..10 { 238 /// delay_queue.insert(i, Duration::from_secs(i)); 239 /// } 240 /// 241 /// // This will make the queue allocate additional storage 242 /// delay_queue.insert(11, Duration::from_secs(11)); 243 /// # } 244 /// ``` with_capacity(capacity: usize) -> DelayQueue<T>245 pub fn with_capacity(capacity: usize) -> DelayQueue<T> { 246 DelayQueue { 247 wheel: Wheel::new(), 248 slab: Slab::with_capacity(capacity), 249 expired: Stack::default(), 250 delay: None, 251 poll: wheel::Poll::new(0), 252 start: Instant::now(), 253 } 254 } 255 256 /// Inserts `value` into the queue set to expire at a specific instant in 257 /// time. 258 /// 259 /// This function is identical to `insert`, but takes an `Instant` instead 260 /// of a `Duration`. 261 /// 262 /// `value` is stored in the queue until `when` is reached. At which point, 263 /// `value` will be returned from [`poll`]. If `when` has already been 264 /// reached, then `value` is immediately made available to poll. 265 /// 266 /// The return value represents the insertion and is used at an argument to 267 /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once 268 /// `value` is removed from the queue either by calling [`poll`] after 269 /// `when` is reached or by calling [`remove`]. At this point, the caller 270 /// must take care to not use the returned [`Key`] again as it may reference 271 /// a different item in the queue. 272 /// 273 /// See [type] level documentation for more details. 274 /// 275 /// # Panics 276 /// 277 /// This function panics if `when` is too far in the future. 
278 /// 279 /// # Examples 280 /// 281 /// Basic usage 282 /// 283 /// ```rust 284 /// use tokio::time::{DelayQueue, Duration, Instant}; 285 /// 286 /// # #[tokio::main] 287 /// # async fn main() { 288 /// let mut delay_queue = DelayQueue::new(); 289 /// let key = delay_queue.insert_at( 290 /// "foo", Instant::now() + Duration::from_secs(5)); 291 /// 292 /// // Remove the entry 293 /// let item = delay_queue.remove(&key); 294 /// assert_eq!(*item.get_ref(), "foo"); 295 /// # } 296 /// ``` 297 /// 298 /// [`poll`]: #method.poll 299 /// [`remove`]: #method.remove 300 /// [`reset`]: #method.reset 301 /// [`Key`]: struct@Key 302 /// [type]: # insert_at(&mut self, value: T, when: Instant) -> Key303 pub fn insert_at(&mut self, value: T, when: Instant) -> Key { 304 assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded"); 305 306 // Normalize the deadline. Values cannot be set to expire in the past. 307 let when = self.normalize_deadline(when); 308 309 // Insert the value in the store 310 let key = self.slab.insert(Data { 311 inner: value, 312 when, 313 expired: false, 314 next: None, 315 prev: None, 316 }); 317 318 self.insert_idx(when, key); 319 320 // Set a new delay if the current's deadline is later than the one of the new item 321 let should_set_delay = if let Some(ref delay) = self.delay { 322 let current_exp = self.normalize_deadline(delay.deadline()); 323 current_exp > when 324 } else { 325 true 326 }; 327 328 if should_set_delay { 329 let delay_time = self.start + Duration::from_millis(when); 330 if let Some(ref mut delay) = &mut self.delay { 331 delay.reset(delay_time); 332 } else { 333 self.delay = Some(delay_until(delay_time)); 334 } 335 } 336 337 Key::new(key) 338 } 339 340 /// Attempts to pull out the next value of the delay queue, registering the 341 /// current task for wakeup if the value is not yet available, and returning 342 /// None if the queue is exhausted. 
poll_expired( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Expired<T>, Error>>>343 pub fn poll_expired( 344 &mut self, 345 cx: &mut task::Context<'_>, 346 ) -> Poll<Option<Result<Expired<T>, Error>>> { 347 let item = ready!(self.poll_idx(cx)); 348 Poll::Ready(item.map(|result| { 349 result.map(|idx| { 350 let data = self.slab.remove(idx); 351 debug_assert!(data.next.is_none()); 352 debug_assert!(data.prev.is_none()); 353 354 Expired { 355 key: Key::new(idx), 356 data: data.inner, 357 deadline: self.start + Duration::from_millis(data.when), 358 } 359 }) 360 })) 361 } 362 363 /// Inserts `value` into the queue set to expire after the requested duration 364 /// elapses. 365 /// 366 /// This function is identical to `insert_at`, but takes a `Duration` 367 /// instead of an `Instant`. 368 /// 369 /// `value` is stored in the queue until `when` is reached. At which point, 370 /// `value` will be returned from [`poll`]. If `when` has already been 371 /// reached, then `value` is immediately made available to poll. 372 /// 373 /// The return value represents the insertion and is used at an argument to 374 /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once 375 /// `value` is removed from the queue either by calling [`poll`] after 376 /// `when` is reached or by calling [`remove`]. At this point, the caller 377 /// must take care to not use the returned [`Key`] again as it may reference 378 /// a different item in the queue. 379 /// 380 /// See [type] level documentation for more details. 381 /// 382 /// # Panics 383 /// 384 /// This function panics if `timeout` is greater than the maximum supported 385 /// duration. 
386 /// 387 /// # Examples 388 /// 389 /// Basic usage 390 /// 391 /// ```rust 392 /// use tokio::time::DelayQueue; 393 /// use std::time::Duration; 394 /// 395 /// # #[tokio::main] 396 /// # async fn main() { 397 /// let mut delay_queue = DelayQueue::new(); 398 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 399 /// 400 /// // Remove the entry 401 /// let item = delay_queue.remove(&key); 402 /// assert_eq!(*item.get_ref(), "foo"); 403 /// # } 404 /// ``` 405 /// 406 /// [`poll`]: #method.poll 407 /// [`remove`]: #method.remove 408 /// [`reset`]: #method.reset 409 /// [`Key`]: struct@Key 410 /// [type]: # insert(&mut self, value: T, timeout: Duration) -> Key411 pub fn insert(&mut self, value: T, timeout: Duration) -> Key { 412 self.insert_at(value, Instant::now() + timeout) 413 } 414 insert_idx(&mut self, when: u64, key: usize)415 fn insert_idx(&mut self, when: u64, key: usize) { 416 use self::wheel::{InsertError, Stack}; 417 418 // Register the deadline with the timer wheel 419 match self.wheel.insert(when, key, &mut self.slab) { 420 Ok(_) => {} 421 Err((_, InsertError::Elapsed)) => { 422 self.slab[key].expired = true; 423 // The delay is already expired, store it in the expired queue 424 self.expired.push(key, &mut self.slab); 425 } 426 Err((_, err)) => panic!("invalid deadline; err={:?}", err), 427 } 428 } 429 430 /// Removes the item associated with `key` from the queue. 431 /// 432 /// There must be an item associated with `key`. The function returns the 433 /// removed item as well as the `Instant` at which it will the delay will 434 /// have expired. 435 /// 436 /// # Panics 437 /// 438 /// The function panics if `key` is not contained by the queue. 
439 /// 440 /// # Examples 441 /// 442 /// Basic usage 443 /// 444 /// ```rust 445 /// use tokio::time::DelayQueue; 446 /// use std::time::Duration; 447 /// 448 /// # #[tokio::main] 449 /// # async fn main() { 450 /// let mut delay_queue = DelayQueue::new(); 451 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 452 /// 453 /// // Remove the entry 454 /// let item = delay_queue.remove(&key); 455 /// assert_eq!(*item.get_ref(), "foo"); 456 /// # } 457 /// ``` remove(&mut self, key: &Key) -> Expired<T>458 pub fn remove(&mut self, key: &Key) -> Expired<T> { 459 use crate::time::wheel::Stack; 460 461 // Special case the `expired` queue 462 if self.slab[key.index].expired { 463 self.expired.remove(&key.index, &mut self.slab); 464 } else { 465 self.wheel.remove(&key.index, &mut self.slab); 466 } 467 468 let data = self.slab.remove(key.index); 469 470 Expired { 471 key: Key::new(key.index), 472 data: data.inner, 473 deadline: self.start + Duration::from_millis(data.when), 474 } 475 } 476 477 /// Sets the delay of the item associated with `key` to expire at `when`. 478 /// 479 /// This function is identical to `reset` but takes an `Instant` instead of 480 /// a `Duration`. 481 /// 482 /// The item remains in the queue but the delay is set to expire at `when`. 483 /// If `when` is in the past, then the item is immediately made available to 484 /// the caller. 485 /// 486 /// # Panics 487 /// 488 /// This function panics if `when` is too far in the future or if `key` is 489 /// not contained by the queue. 
490 /// 491 /// # Examples 492 /// 493 /// Basic usage 494 /// 495 /// ```rust 496 /// use tokio::time::{DelayQueue, Duration, Instant}; 497 /// 498 /// # #[tokio::main] 499 /// # async fn main() { 500 /// let mut delay_queue = DelayQueue::new(); 501 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 502 /// 503 /// // "foo" is scheduled to be returned in 5 seconds 504 /// 505 /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); 506 /// 507 /// // "foo"is now scheduled to be returned in 10 seconds 508 /// # } 509 /// ``` reset_at(&mut self, key: &Key, when: Instant)510 pub fn reset_at(&mut self, key: &Key, when: Instant) { 511 self.wheel.remove(&key.index, &mut self.slab); 512 513 // Normalize the deadline. Values cannot be set to expire in the past. 514 let when = self.normalize_deadline(when); 515 516 self.slab[key.index].when = when; 517 self.insert_idx(when, key.index); 518 519 let next_deadline = self.next_deadline(); 520 if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { 521 delay.reset(deadline); 522 } 523 } 524 525 /// Returns the next time poll as determined by the wheel next_deadline(&mut self) -> Option<Instant>526 fn next_deadline(&mut self) -> Option<Instant> { 527 self.wheel 528 .poll_at() 529 .map(|poll_at| self.start + Duration::from_millis(poll_at)) 530 } 531 532 /// Sets the delay of the item associated with `key` to expire after 533 /// `timeout`. 534 /// 535 /// This function is identical to `reset_at` but takes a `Duration` instead 536 /// of an `Instant`. 537 /// 538 /// The item remains in the queue but the delay is set to expire after 539 /// `timeout`. If `timeout` is zero, then the item is immediately made 540 /// available to the caller. 541 /// 542 /// # Panics 543 /// 544 /// This function panics if `timeout` is greater than the maximum supported 545 /// duration or if `key` is not contained by the queue. 
546 /// 547 /// # Examples 548 /// 549 /// Basic usage 550 /// 551 /// ```rust 552 /// use tokio::time::DelayQueue; 553 /// use std::time::Duration; 554 /// 555 /// # #[tokio::main] 556 /// # async fn main() { 557 /// let mut delay_queue = DelayQueue::new(); 558 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 559 /// 560 /// // "foo" is scheduled to be returned in 5 seconds 561 /// 562 /// delay_queue.reset(&key, Duration::from_secs(10)); 563 /// 564 /// // "foo"is now scheduled to be returned in 10 seconds 565 /// # } 566 /// ``` reset(&mut self, key: &Key, timeout: Duration)567 pub fn reset(&mut self, key: &Key, timeout: Duration) { 568 self.reset_at(key, Instant::now() + timeout); 569 } 570 571 /// Clears the queue, removing all items. 572 /// 573 /// After calling `clear`, [`poll`] will return `Ok(Ready(None))`. 574 /// 575 /// Note that this method has no effect on the allocated capacity. 576 /// 577 /// [`poll`]: #method.poll 578 /// 579 /// # Examples 580 /// 581 /// ```rust 582 /// use tokio::time::DelayQueue; 583 /// use std::time::Duration; 584 /// 585 /// # #[tokio::main] 586 /// # async fn main() { 587 /// let mut delay_queue = DelayQueue::new(); 588 /// 589 /// delay_queue.insert("foo", Duration::from_secs(5)); 590 /// 591 /// assert!(!delay_queue.is_empty()); 592 /// 593 /// delay_queue.clear(); 594 /// 595 /// assert!(delay_queue.is_empty()); 596 /// # } 597 /// ``` clear(&mut self)598 pub fn clear(&mut self) { 599 self.slab.clear(); 600 self.expired = Stack::default(); 601 self.wheel = Wheel::new(); 602 self.delay = None; 603 } 604 605 /// Returns the number of elements the queue can hold without reallocating. 
///
    /// # Examples
    ///
    /// ```rust
    /// use tokio::time::DelayQueue;
    ///
    /// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
    /// assert_eq!(delay_queue.capacity(), 10);
    /// ```
    pub fn capacity(&self) -> usize {
        // Entries live in the slab, so its capacity is the queue's capacity.
        self.slab.capacity()
    }

    /// Returns the number of elements currently in the queue.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use tokio::time::DelayQueue;
    /// use std::time::Duration;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let mut delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
    /// assert_eq!(delay_queue.len(), 0);
    /// delay_queue.insert(3, Duration::from_secs(5));
    /// assert_eq!(delay_queue.len(), 1);
    /// # }
    /// ```
    pub fn len(&self) -> usize {
        // Every queued entry occupies one slab slot until it is removed or
        // yielded, so the slab length is the entry count.
        self.slab.len()
    }

    /// Reserves capacity for at least `additional` more items to be queued
    /// without allocating.
    ///
    /// `reserve` does nothing if the queue already has sufficient capacity for
    /// `additional` more values. If more capacity is required, a new segment of
    /// memory will be allocated and all existing values will be copied into it.
    /// As such, if the queue is already very large, a call to `reserve` can end
    /// up being expensive.
    ///
    /// The queue may reserve more than `additional` extra space in order to
    /// avoid frequent reallocations.
    ///
    /// # Panics
    ///
    /// Panics if the new capacity exceeds the maximum number of entries the
    /// queue can contain.
///
    /// # Examples
    ///
    /// ```
    /// use tokio::time::DelayQueue;
    /// use std::time::Duration;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let mut delay_queue = DelayQueue::new();
    ///
    /// delay_queue.insert("hello", Duration::from_secs(10));
    /// delay_queue.reserve(10);
    ///
    /// assert!(delay_queue.capacity() >= 11);
    /// # }
    /// ```
    pub fn reserve(&mut self, additional: usize) {
        // NOTE(review): this forwards straight to the slab and performs no
        // `MAX_ENTRIES` check here, although the docs mention a panic on
        // exceeding the maximum number of entries -- confirm which is intended.
        self.slab.reserve(additional);
    }

    /// Returns `true` if there are no items in the queue.
    ///
    /// Note that this function returns `false` even if all items have not yet
    /// expired and a call to `poll_expired` will return `Poll::Pending`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::time::DelayQueue;
    /// use std::time::Duration;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let mut delay_queue = DelayQueue::new();
    /// assert!(delay_queue.is_empty());
    ///
    /// delay_queue.insert("hello", Duration::from_secs(5));
    /// assert!(!delay_queue.is_empty());
    /// # }
    /// ```
    pub fn is_empty(&self) -> bool {
        // Emptiness is decided by the slab alone; whether entries are due yet
        // plays no part.
        self.slab.is_empty()
    }

    /// Polls the queue, returning the index of the next slot in the slab that
    /// should be returned.
    ///
    /// A slot should be returned when the associated deadline has been reached.
fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>> {
        use self::wheel::Stack;

        // Entries that were already due when inserted never entered the wheel;
        // hand those out first.
        let expired = self.expired.pop(&mut self.slab);

        if expired.is_some() {
            return Poll::Ready(expired.map(Ok));
        }

        loop {
            if let Some(ref mut delay) = self.delay {
                // Wait for the shared delay unless it has already fired.
                if !delay.is_elapsed() {
                    ready!(Pin::new(&mut *delay).poll(cx));
                }

                // Advance the wheel to the delay's deadline (as a millisecond
                // offset from `start`, rounded down).
                let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down);

                self.poll = wheel::Poll::new(now);
            }

            // We poll the wheel to get the next value out before finding the next deadline.
            let wheel_idx = self.wheel.poll(&mut self.poll, &mut self.slab);

            // Re-arm (or drop) the delay according to what the wheel still holds.
            self.delay = self.next_deadline().map(delay_until);

            if let Some(idx) = wheel_idx {
                return Poll::Ready(Some(Ok(idx)));
            }

            // Nothing was due and the wheel has no further deadline.
            if self.delay.is_none() {
                return Poll::Ready(None);
            }
        }
    }

    /// Converts `when` into a millisecond offset from `start` (rounded up),
    /// clamped so that it is never earlier than `start` nor earlier than the
    /// time the wheel has already advanced to.
    fn normalize_deadline(&self, when: Instant) -> u64 {
        let when = if when < self.start {
            0
        } else {
            crate::time::ms(when - self.start, crate::time::Round::Up)
        };

        cmp::max(when, self.wheel.elapsed())
    }
}

// We never put `T` in a `Pin`...
impl<T> Unpin for DelayQueue<T> {}

impl<T> Default for DelayQueue<T> {
    /// Equivalent to [`DelayQueue::new`].
    fn default() -> DelayQueue<T> {
        DelayQueue::new()
    }
}

#[cfg(feature = "stream")]
impl<T> futures_core::Stream for DelayQueue<T> {
    // DelayQueue seems much more specific, where a user may care that it
    // has reached capacity, so return those errors instead of panicking.
    type Item = Result<Expired<T>, Error>;

    /// Delegates to [`DelayQueue::poll_expired`].
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        DelayQueue::poll_expired(self.get_mut(), cx)
    }
}

/// Intrusive doubly linked list over slab indices: the `next`/`prev` links
/// are stored in the slab entries themselves.
impl<T> wheel::Stack for Stack<T> {
    type Owned = usize;
    type Borrowed = usize;
    type Store = Slab<Data<T>>;

    fn is_empty(&self) -> bool {
        self.head.is_none()
    }

    fn push(&mut self, item: Self::Owned, store: &mut Self::Store) {
        // Ensure the entry is not already in a stack.
        debug_assert!(store[item].next.is_none());
        debug_assert!(store[item].prev.is_none());

        // Remove the old head entry
        let old = self.head.take();

        // Link the previous head (if any) back to the new one.
        if let Some(idx) = old {
            store[idx].prev = Some(item);
        }

        store[item].next = old;
        self.head = Some(item)
    }

    fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> {
        if let Some(idx) = self.head {
            // Promote the second entry to head and detach it from `idx`.
            self.head = store[idx].next;

            if let Some(idx) = self.head {
                store[idx].prev = None;
            }

            // Leave the popped entry fully unlinked.
            store[idx].next = None;
            debug_assert!(store[idx].prev.is_none());

            Some(idx)
        } else {
            None
        }
    }

    fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
        assert!(store.contains(*item));

        // Ensure that the entry is in fact contained by the stack
        debug_assert!({
            // This walks the full linked list even if an entry is found.
            let mut next = self.head;
            let mut contains = false;

            while let Some(idx) = next {
                if idx == *item {
                    debug_assert!(!contains);
                    contains = true;
                }

                next = store[idx].next;
            }

            contains
        });

        // Splice the neighbours together around `item`.
        if let Some(next) = store[*item].next {
            store[next].prev = store[*item].prev;
        }

        if let Some(prev) = store[*item].prev {
            store[prev].next = store[*item].next;
        } else {
            // `item` was the head, so its successor becomes the new head.
            self.head = store[*item].next;
        }

        // Leave the removed entry fully unlinked.
        store[*item].next = None;
        store[*item].prev = None;
    }

    fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 {
        store[*item].when
    }
}

impl<T> Default for Stack<T> {
    /// Creates an empty stack.
    fn default() -> Stack<T> {
        Stack {
            head: None,
            _p: PhantomData,
        }
    }
}

impl Key {
    /// Wraps a slab index in a `Key`.
    pub(crate) fn new(index: usize) -> Key {
        Key { index }
    }
}

impl<T> Expired<T> {
    /// Returns a reference to the inner value.
    pub fn get_ref(&self) -> &T {
        &self.data
    }

    /// Returns a mutable reference to the inner value.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.data
    }

    /// Consumes `self` and returns the inner value.
    pub fn into_inner(self) -> T {
        self.data
    }

    /// Returns the deadline that the expiration was set to.
    pub fn deadline(&self) -> Instant {
        self.deadline
    }
}