1 //! A queue of delayed elements. 2 //! 3 //! See [`DelayQueue`] for more details. 4 //! 5 //! [`DelayQueue`]: struct.DelayQueue.html 6 7 use clock::now; 8 use timer::Handle; 9 use wheel::{self, Wheel}; 10 use {Delay, Error}; 11 12 use futures::{Future, Poll, Stream}; 13 use slab::Slab; 14 15 use std::cmp; 16 use std::marker::PhantomData; 17 use std::time::{Duration, Instant}; 18 19 /// A queue of delayed elements. 20 /// 21 /// Once an element is inserted into the `DelayQueue`, it is yielded once the 22 /// specified deadline has been reached. 23 /// 24 /// # Usage 25 /// 26 /// Elements are inserted into `DelayQueue` using the [`insert`] or 27 /// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is 28 /// returned. The key is used to remove the entry or to change the deadline at 29 /// which it should be yielded back. 30 /// 31 /// Once delays have been configured, the `DelayQueue` is used via its 32 /// [`Stream`] implementation. [`poll`] is called. If an entry has reached its 33 /// deadline, it is returned. If not, `Async::NotReady` indicating that the 34 /// current task will be notified once the deadline has been reached. 35 /// 36 /// # `Stream` implementation 37 /// 38 /// Items are retrieved from the queue via [`Stream::poll`]. If no delays have 39 /// expired, no items are returned. In this case, `NotReady` is returned and the 40 /// current task is registered to be notified once the next item's delay has 41 /// expired. 42 /// 43 /// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll` 44 /// returns `Ready(None)`. This indicates that the stream has reached an end. 45 /// However, if a new item is inserted *after*, `poll` will once again start 46 /// returning items or `NotReady. 47 /// 48 /// Items are returned ordered by their expirations. Items that are configured 49 /// to expire first will be returned first. There are no ordering guarantees 50 /// for items configured to expire the same instant. Also note that delays are 51 /// rounded to the closest millisecond. 52 /// 53 /// # Implementation 54 /// 55 /// The `DelayQueue` is backed by the same hashed timing wheel implementation as 56 /// [`Timer`] as such, it offers the same performance benefits. See [`Timer`] 57 /// for further implementation notes. 58 /// 59 /// State associated with each entry is stored in a [`slab`]. This allows 60 /// amortizing the cost of allocation. Space created for expired entries is 61 /// reused when inserting new entries. 62 /// 63 /// Capacity can be checked using [`capacity`] and allocated preemptively by using 64 /// the [`reserve`] method. 65 /// 66 /// # Usage 67 /// 68 /// Using `DelayQueue` to manage cache entries. 69 /// 70 /// ```rust 71 /// #[macro_use] 72 /// extern crate futures; 73 /// extern crate tokio; 74 /// # type CacheKey = String; 75 /// # type Value = String; 76 /// use tokio::timer::{delay_queue, DelayQueue, Error}; 77 /// use futures::{Async, Poll, Stream}; 78 /// use std::collections::HashMap; 79 /// use std::time::Duration; 80 /// 81 /// struct Cache { 82 /// entries: HashMap<CacheKey, (Value, delay_queue::Key)>, 83 /// expirations: DelayQueue<CacheKey>, 84 /// } 85 /// 86 /// const TTL_SECS: u64 = 30; 87 /// 88 /// impl Cache { 89 /// fn insert(&mut self, key: CacheKey, value: Value) { 90 /// let delay = self.expirations 91 /// .insert(key.clone(), Duration::from_secs(TTL_SECS)); 92 /// 93 /// self.entries.insert(key, (value, delay)); 94 /// } 95 /// 96 /// fn get(&self, key: &CacheKey) -> Option<&Value> { 97 /// self.entries.get(key) 98 /// .map(|&(ref v, _)| v) 99 /// } 100 /// 101 /// fn remove(&mut self, key: &CacheKey) { 102 /// if let Some((_, cache_key)) = self.entries.remove(key) { 103 /// self.expirations.remove(&cache_key); 104 /// } 105 /// } 106 /// 107 /// fn poll_purge(&mut self) -> Poll<(), Error> { 108 /// while let Some(entry) = try_ready!(self.expirations.poll()) { 109 /// self.entries.remove(entry.get_ref()); 110 /// } 111 /// 112 /// Ok(Async::Ready(())) 113 /// } 114 /// } 115 /// # fn main() {} 116 /// ``` 117 /// 118 /// [`insert`]: #method.insert 119 /// [`insert_at`]: #method.insert_at 120 /// [`Key`]: struct.Key.html 121 /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html 122 /// [`poll`]: #method.poll 123 /// [`Stream::poll`]: #method.poll 124 /// [`Timer`]: ../struct.Timer.html 125 /// [`slab`]: https://docs.rs/slab 126 /// [`capacity`]: #method.capacity 127 /// [`reserve`]: #method.reserve 128 #[derive(Debug)] 129 pub struct DelayQueue<T> { 130 /// Handle to the timer driving the `DelayQueue` 131 handle: Handle, 132 133 /// Stores data associated with entries 134 slab: Slab<Data<T>>, 135 136 /// Lookup structure tracking all delays in the queue 137 wheel: Wheel<Stack<T>>, 138 139 /// Delays that were inserted when already expired. These cannot be stored 140 /// in the wheel 141 expired: Stack<T>, 142 143 /// Delay expiring when the *first* item in the queue expires 144 delay: Option<Delay>, 145 146 /// Wheel polling state 147 poll: wheel::Poll, 148 149 /// Instant at which the timer starts 150 start: Instant, 151 } 152 153 /// An entry in `DelayQueue` that has expired and removed. 154 /// 155 /// Values are returned by [`DelayQueue::poll`]. 156 /// 157 /// [`DelayQueue::poll`]: struct.DelayQueue.html#method.poll 158 #[derive(Debug)] 159 pub struct Expired<T> { 160 /// The data stored in the queue 161 data: T, 162 163 /// The expiration time 164 deadline: Instant, 165 166 /// The key associated with the entry 167 key: Key, 168 } 169 170 /// Token to a value stored in a `DelayQueue`. 171 /// 172 /// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`] 173 /// documentation for more details. 174 /// 175 /// [`DelayQueue`]: struct.DelayQueue.html 176 /// [`DelayQueue::insert`]: struct.DelayQueue.html#method.insert 177 #[derive(Debug, Clone)] 178 pub struct Key { 179 index: usize, 180 } 181 182 #[derive(Debug)] 183 struct Stack<T> { 184 /// Head of the stack 185 head: Option<usize>, 186 _p: PhantomData<T>, 187 } 188 189 #[derive(Debug)] 190 struct Data<T> { 191 /// The data being stored in the queue and will be returned at the requested 192 /// instant. 193 inner: T, 194 195 /// The instant at which the item is returned. 196 when: u64, 197 198 /// Set to true when stored in the `expired` queue 199 expired: bool, 200 201 /// Next entry in the stack 202 next: Option<usize>, 203 204 /// Previous entry in the stack 205 prev: Option<usize>, 206 } 207 208 /// Maximum number of entries the queue can handle 209 const MAX_ENTRIES: usize = (1 << 30) - 1; 210 211 impl<T> DelayQueue<T> { 212 /// Create a new, empty, `DelayQueue` 213 /// 214 /// The queue will not allocate storage until items are inserted into it. 215 /// 216 /// # Examples 217 /// 218 /// ```rust 219 /// # use tokio_timer::DelayQueue; 220 /// let delay_queue: DelayQueue<u32> = DelayQueue::new(); 221 /// ``` new() -> DelayQueue<T>222 pub fn new() -> DelayQueue<T> { 223 DelayQueue::with_capacity(0) 224 } 225 226 /// Create a new, empty, `DelayQueue` backed by the specified timer. 227 /// 228 /// The queue will not allocate storage until items are inserted into it. 229 /// 230 /// # Examples 231 /// 232 /// ```rust,no_run 233 /// # use tokio_timer::DelayQueue; 234 /// use tokio_timer::timer::Handle; 235 /// 236 /// let handle = Handle::default(); 237 /// let delay_queue: DelayQueue<u32> = DelayQueue::with_capacity_and_handle(0, &handle); 238 /// ``` with_capacity_and_handle(capacity: usize, handle: &Handle) -> DelayQueue<T>239 pub fn with_capacity_and_handle(capacity: usize, handle: &Handle) -> DelayQueue<T> { 240 DelayQueue { 241 handle: handle.clone(), 242 wheel: Wheel::new(), 243 slab: Slab::with_capacity(capacity), 244 expired: Stack::default(), 245 delay: None, 246 poll: wheel::Poll::new(0), 247 start: now(), 248 } 249 } 250 251 /// Create a new, empty, `DelayQueue` with the specified capacity. 252 /// 253 /// The queue will be able to hold at least `capacity` elements without 254 /// reallocating. If `capacity` is 0, the queue will not allocate for 255 /// storage. 256 /// 257 /// # Examples 258 /// 259 /// ```rust 260 /// # use tokio_timer::DelayQueue; 261 /// # use std::time::Duration; 262 /// let mut delay_queue = DelayQueue::with_capacity(10); 263 /// 264 /// // These insertions are done without further allocation 265 /// for i in 0..10 { 266 /// delay_queue.insert(i, Duration::from_secs(i)); 267 /// } 268 /// 269 /// // This will make the queue allocate additional storage 270 /// delay_queue.insert(11, Duration::from_secs(11)); 271 /// ``` with_capacity(capacity: usize) -> DelayQueue<T>272 pub fn with_capacity(capacity: usize) -> DelayQueue<T> { 273 DelayQueue::with_capacity_and_handle(capacity, &Handle::default()) 274 } 275 276 /// Insert `value` into the queue set to expire at a specific instant in 277 /// time. 278 /// 279 /// This function is identical to `insert`, but takes an `Instant` instead 280 /// of a `Duration`. 281 /// 282 /// `value` is stored in the queue until `when` is reached. At which point, 283 /// `value` will be returned from [`poll`]. If `when` has already been 284 /// reached, then `value` is immediately made available to poll. 285 /// 286 /// The return value represents the insertion and is used at an argument to 287 /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once 288 /// `value` is removed from the queue either by calling [`poll`] after 289 /// `when` is reached or by calling [`remove`]. At this point, the caller 290 /// must take care to not use the returned [`Key`] again as it may reference 291 /// a different item in the queue. 292 /// 293 /// See [type] level documentation for more details. 294 /// 295 /// # Panics 296 /// 297 /// This function panics if `when` is too far in the future. 298 /// 299 /// # Examples 300 /// 301 /// Basic usage 302 /// 303 /// ```rust 304 /// # extern crate tokio; 305 /// use tokio::timer::DelayQueue; 306 /// use std::time::{Instant, Duration}; 307 /// 308 /// # fn main() { 309 /// let mut delay_queue = DelayQueue::new(); 310 /// let key = delay_queue.insert_at( 311 /// "foo", Instant::now() + Duration::from_secs(5)); 312 /// 313 /// // Remove the entry 314 /// let item = delay_queue.remove(&key); 315 /// assert_eq!(*item.get_ref(), "foo"); 316 /// # } 317 /// ``` 318 /// 319 /// [`poll`]: #method.poll 320 /// [`remove`]: #method.remove 321 /// [`reset`]: #method.reset 322 /// [`Key`]: struct.Key.html 323 /// [type]: # insert_at(&mut self, value: T, when: Instant) -> Key324 pub fn insert_at(&mut self, value: T, when: Instant) -> Key { 325 assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded"); 326 327 // Normalize the deadline. Values cannot be set to expire in the past. 328 let when = self.normalize_deadline(when); 329 330 // Insert the value in the store 331 let key = self.slab.insert(Data { 332 inner: value, 333 when, 334 expired: false, 335 next: None, 336 prev: None, 337 }); 338 339 self.insert_idx(when, key); 340 341 // Set a new delay if the current's deadline is later than the one of the new item 342 let should_set_delay = if let Some(ref delay) = self.delay { 343 let current_exp = self.normalize_deadline(delay.deadline()); 344 current_exp > when 345 } else { 346 true 347 }; 348 349 if should_set_delay { 350 self.delay = Some(self.handle.delay(self.start + Duration::from_millis(when))); 351 } 352 353 Key::new(key) 354 } 355 356 /// Insert `value` into the queue set to expire after the requested duration 357 /// elapses. 358 /// 359 /// This function is identical to `insert_at`, but takes a `Duration` 360 /// instead of an `Instant`. 361 /// 362 /// `value` is stored in the queue until `when` is reached. At which point, 363 /// `value` will be returned from [`poll`]. If `when` has already been 364 /// reached, then `value` is immediately made available to poll. 365 /// 366 /// The return value represents the insertion and is used at an argument to 367 /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once 368 /// `value` is removed from the queue either by calling [`poll`] after 369 /// `when` is reached or by calling [`remove`]. At this point, the caller 370 /// must take care to not use the returned [`Key`] again as it may reference 371 /// a different item in the queue. 372 /// 373 /// See [type] level documentation for more details. 374 /// 375 /// # Panics 376 /// 377 /// This function panics if `timeout` is greater than the maximum supported 378 /// duration. 379 /// 380 /// # Examples 381 /// 382 /// Basic usage 383 /// 384 /// ```rust 385 /// # extern crate tokio; 386 /// use tokio::timer::DelayQueue; 387 /// use std::time::Duration; 388 /// 389 /// # fn main() { 390 /// let mut delay_queue = DelayQueue::new(); 391 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 392 /// 393 /// // Remove the entry 394 /// let item = delay_queue.remove(&key); 395 /// assert_eq!(*item.get_ref(), "foo"); 396 /// # } 397 /// ``` 398 /// 399 /// [`poll`]: #method.poll 400 /// [`remove`]: #method.remove 401 /// [`reset`]: #method.reset 402 /// [`Key`]: struct.Key.html 403 /// [type]: # insert(&mut self, value: T, timeout: Duration) -> Key404 pub fn insert(&mut self, value: T, timeout: Duration) -> Key { 405 self.insert_at(value, now() + timeout) 406 } 407 insert_idx(&mut self, when: u64, key: usize)408 fn insert_idx(&mut self, when: u64, key: usize) { 409 use self::wheel::{InsertError, Stack}; 410 411 // Register the deadline with the timer wheel 412 match self.wheel.insert(when, key, &mut self.slab) { 413 Ok(_) => {} 414 Err((_, InsertError::Elapsed)) => { 415 self.slab[key].expired = true; 416 // The delay is already expired, store it in the expired queue 417 self.expired.push(key, &mut self.slab); 418 } 419 Err((_, err)) => panic!("invalid deadline; err={:?}", err), 420 } 421 } 422 423 /// Remove the item associated with `key` from the queue. 424 /// 425 /// There must be an item associated with `key`. The function returns the 426 /// removed item as well as the `Instant` at which it will the delay will 427 /// have expired. 428 /// 429 /// # Panics 430 /// 431 /// The function panics if `key` is not contained by the queue. 432 /// 433 /// # Examples 434 /// 435 /// Basic usage 436 /// 437 /// ```rust 438 /// # extern crate tokio; 439 /// use tokio::timer::DelayQueue; 440 /// use std::time::Duration; 441 /// 442 /// # fn main() { 443 /// let mut delay_queue = DelayQueue::new(); 444 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 445 /// 446 /// // Remove the entry 447 /// let item = delay_queue.remove(&key); 448 /// assert_eq!(*item.get_ref(), "foo"); 449 /// # } 450 /// ``` remove(&mut self, key: &Key) -> Expired<T>451 pub fn remove(&mut self, key: &Key) -> Expired<T> { 452 use wheel::Stack; 453 454 // Special case the `expired` queue 455 if self.slab[key.index].expired { 456 self.expired.remove(&key.index, &mut self.slab); 457 } else { 458 self.wheel.remove(&key.index, &mut self.slab); 459 } 460 461 let data = self.slab.remove(key.index); 462 463 Expired { 464 key: Key::new(key.index), 465 data: data.inner, 466 deadline: self.start + Duration::from_millis(data.when), 467 } 468 } 469 470 /// Sets the delay of the item associated with `key` to expire at `when`. 471 /// 472 /// This function is identical to `reset` but takes an `Instant` instead of 473 /// a `Duration`. 474 /// 475 /// The item remains in the queue but the delay is set to expire at `when`. 476 /// If `when` is in the past, then the item is immediately made available to 477 /// the caller. 478 /// 479 /// # Panics 480 /// 481 /// This function panics if `when` is too far in the future or if `key` is 482 /// not contained by the queue. 483 /// 484 /// # Examples 485 /// 486 /// Basic usage 487 /// 488 /// ```rust 489 /// # extern crate tokio; 490 /// use tokio::timer::DelayQueue; 491 /// use std::time::{Duration, Instant}; 492 /// 493 /// # fn main() { 494 /// let mut delay_queue = DelayQueue::new(); 495 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 496 /// 497 /// // "foo" is scheduled to be returned in 5 seconds 498 /// 499 /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); 500 /// 501 /// // "foo"is now scheduled to be returned in 10 seconds 502 /// # } 503 /// ``` reset_at(&mut self, key: &Key, when: Instant)504 pub fn reset_at(&mut self, key: &Key, when: Instant) { 505 self.wheel.remove(&key.index, &mut self.slab); 506 507 // Normalize the deadline. Values cannot be set to expire in the past. 508 let when = self.normalize_deadline(when); 509 510 self.slab[key.index].when = when; 511 self.insert_idx(when, key.index); 512 513 let next_deadline = self.next_deadline(); 514 if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { 515 delay.reset(deadline); 516 } 517 } 518 519 /// Returns the next time poll as determined by the wheel next_deadline(&mut self) -> Option<Instant>520 fn next_deadline(&mut self) -> Option<Instant> { 521 self.wheel 522 .poll_at() 523 .map(|poll_at| self.start + Duration::from_millis(poll_at)) 524 } 525 526 /// Sets the delay of the item associated with `key` to expire after 527 /// `timeout`. 528 /// 529 /// This function is identical to `reset_at` but takes a `Duration` instead 530 /// of an `Instant`. 531 /// 532 /// The item remains in the queue but the delay is set to expire after 533 /// `timeout`. If `timeout` is zero, then the item is immediately made 534 /// available to the caller. 535 /// 536 /// # Panics 537 /// 538 /// This function panics if `timeout` is greater than the maximum supported 539 /// duration or if `key` is not contained by the queue. 540 /// 541 /// # Examples 542 /// 543 /// Basic usage 544 /// 545 /// ```rust 546 /// # extern crate tokio; 547 /// use tokio::timer::DelayQueue; 548 /// use std::time::Duration; 549 /// 550 /// # fn main() { 551 /// let mut delay_queue = DelayQueue::new(); 552 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 553 /// 554 /// // "foo" is scheduled to be returned in 5 seconds 555 /// 556 /// delay_queue.reset(&key, Duration::from_secs(10)); 557 /// 558 /// // "foo"is now scheduled to be returned in 10 seconds 559 /// # } 560 /// ``` reset(&mut self, key: &Key, timeout: Duration)561 pub fn reset(&mut self, key: &Key, timeout: Duration) { 562 self.reset_at(key, now() + timeout); 563 } 564 565 /// Clears the queue, removing all items. 566 /// 567 /// After calling `clear`, [`poll`] will return `Ok(Ready(None))`. 568 /// 569 /// Note that this method has no effect on the allocated capacity. 570 /// 571 /// [`poll`]: #method.poll 572 /// 573 /// # Examples 574 /// 575 /// ```rust 576 /// # extern crate tokio; 577 /// use tokio::timer::DelayQueue; 578 /// use std::time::Duration; 579 /// 580 /// # fn main() { 581 /// let mut delay_queue = DelayQueue::new(); 582 /// 583 /// delay_queue.insert("foo", Duration::from_secs(5)); 584 /// 585 /// assert!(!delay_queue.is_empty()); 586 /// 587 /// delay_queue.clear(); 588 /// 589 /// assert!(delay_queue.is_empty()); 590 /// # } 591 /// ``` clear(&mut self)592 pub fn clear(&mut self) { 593 self.slab.clear(); 594 self.expired = Stack::default(); 595 self.wheel = Wheel::new(); 596 self.delay = None; 597 } 598 599 /// Returns the number of elements the queue can hold without reallocating. 600 /// 601 /// # Examples 602 /// 603 /// ```rust 604 /// # use tokio_timer::DelayQueue; 605 /// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10); 606 /// assert_eq!(delay_queue.capacity(), 10); 607 /// ``` capacity(&self) -> usize608 pub fn capacity(&self) -> usize { 609 self.slab.capacity() 610 } 611 612 /// Reserve capacity for at least `additional` more items to be queued 613 /// without allocating. 614 /// 615 /// `reserve` does nothing if the queue already has sufficient capacity for 616 /// `additional` more values. If more capacity is required, a new segment of 617 /// memory will be allocated and all existing values will be copied into it. 618 /// As such, if the queue is already very large, a call to `reserve` can end 619 /// up being expensive. 620 /// 621 /// The queue may reserve more than `additional` extra space in order to 622 /// avoid frequent reallocations. 623 /// 624 /// # Panics 625 /// 626 /// Panics if the new capacity exceeds the maximum number of entries the 627 /// queue can contain. 628 /// 629 /// # Examples 630 /// 631 /// ``` 632 /// # use tokio_timer::DelayQueue; 633 /// # use std::time::Duration; 634 /// let mut delay_queue = DelayQueue::new(); 635 /// delay_queue.insert("hello", Duration::from_secs(10)); 636 /// delay_queue.reserve(10); 637 /// assert!(delay_queue.capacity() >= 11); 638 /// ``` reserve(&mut self, additional: usize)639 pub fn reserve(&mut self, additional: usize) { 640 self.slab.reserve(additional); 641 } 642 643 /// Returns `true` if there are no items in the queue. 644 /// 645 /// Note that this function returns `false` even if all items have not yet 646 /// expired and a call to `poll` will return `NotReady`. 647 /// 648 /// # Examples 649 /// 650 /// ``` 651 /// # use tokio_timer::DelayQueue; 652 /// use std::time::Duration; 653 /// let mut delay_queue = DelayQueue::new(); 654 /// assert!(delay_queue.is_empty()); 655 /// 656 /// delay_queue.insert("hello", Duration::from_secs(5)); 657 /// assert!(!delay_queue.is_empty()); 658 /// ``` is_empty(&self) -> bool659 pub fn is_empty(&self) -> bool { 660 self.slab.is_empty() 661 } 662 663 /// Polls the queue, returning the index of the next slot in the slab that 664 /// should be returned. 665 /// 666 /// A slot should be returned when the associated deadline has been reached. poll_idx(&mut self) -> Poll<Option<usize>, Error>667 fn poll_idx(&mut self) -> Poll<Option<usize>, Error> { 668 use self::wheel::Stack; 669 670 let expired = self.expired.pop(&mut self.slab); 671 672 if expired.is_some() { 673 return Ok(expired.into()); 674 } 675 676 loop { 677 if let Some(ref mut delay) = self.delay { 678 if !delay.is_elapsed() { 679 try_ready!(delay.poll()); 680 } 681 682 let now = ::ms(delay.deadline() - self.start, ::Round::Down); 683 684 self.poll = wheel::Poll::new(now); 685 } 686 687 self.delay = None; 688 689 if let Some(idx) = self.wheel.poll(&mut self.poll, &mut self.slab) { 690 return Ok(Some(idx).into()); 691 } 692 693 if let Some(deadline) = self.next_deadline() { 694 self.delay = Some(self.handle.delay(deadline)); 695 } else { 696 return Ok(None.into()); 697 } 698 } 699 } 700 normalize_deadline(&self, when: Instant) -> u64701 fn normalize_deadline(&self, when: Instant) -> u64 { 702 let when = if when < self.start { 703 0 704 } else { 705 ::ms(when - self.start, ::Round::Up) 706 }; 707 708 cmp::max(when, self.wheel.elapsed()) 709 } 710 } 711 712 impl<T> Stream for DelayQueue<T> { 713 type Item = Expired<T>; 714 type Error = Error; 715 poll(&mut self) -> Poll<Option<Self::Item>, Error>716 fn poll(&mut self) -> Poll<Option<Self::Item>, Error> { 717 let item = try_ready!(self.poll_idx()).map(|idx| { 718 let data = self.slab.remove(idx); 719 debug_assert!(data.next.is_none()); 720 debug_assert!(data.prev.is_none()); 721 722 Expired { 723 key: Key::new(idx), 724 data: data.inner, 725 deadline: self.start + Duration::from_millis(data.when), 726 } 727 }); 728 729 Ok(item.into()) 730 } 731 } 732 733 impl<T> wheel::Stack for Stack<T> { 734 type Owned = usize; 735 type Borrowed = usize; 736 type Store = Slab<Data<T>>; 737 is_empty(&self) -> bool738 fn is_empty(&self) -> bool { 739 self.head.is_none() 740 } 741 push(&mut self, item: Self::Owned, store: &mut Self::Store)742 fn push(&mut self, item: Self::Owned, store: &mut Self::Store) { 743 // Ensure the entry is not already in a stack. 744 debug_assert!(store[item].next.is_none()); 745 debug_assert!(store[item].prev.is_none()); 746 747 // Remove the old head entry 748 let old = self.head.take(); 749 750 if let Some(idx) = old { 751 store[idx].prev = Some(item); 752 } 753 754 store[item].next = old; 755 self.head = Some(item) 756 } 757 pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned>758 fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> { 759 if let Some(idx) = self.head { 760 self.head = store[idx].next; 761 762 if let Some(idx) = self.head { 763 store[idx].prev = None; 764 } 765 766 store[idx].next = None; 767 debug_assert!(store[idx].prev.is_none()); 768 769 Some(idx) 770 } else { 771 None 772 } 773 } 774 remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store)775 fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) { 776 assert!(store.contains(*item)); 777 778 // Ensure that the entry is in fact contained by the stack 779 debug_assert!({ 780 // This walks the full linked list even if an entry is found. 781 let mut next = self.head; 782 let mut contains = false; 783 784 while let Some(idx) = next { 785 if idx == *item { 786 debug_assert!(!contains); 787 contains = true; 788 } 789 790 next = store[idx].next; 791 } 792 793 contains 794 }); 795 796 if let Some(next) = store[*item].next { 797 store[next].prev = store[*item].prev; 798 } 799 800 if let Some(prev) = store[*item].prev { 801 store[prev].next = store[*item].next; 802 } else { 803 self.head = store[*item].next; 804 } 805 806 store[*item].next = None; 807 store[*item].prev = None; 808 } 809 when(item: &Self::Borrowed, store: &Self::Store) -> u64810 fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 { 811 store[*item].when 812 } 813 } 814 815 impl<T> Default for Stack<T> { default() -> Stack<T>816 fn default() -> Stack<T> { 817 Stack { 818 head: None, 819 _p: PhantomData, 820 } 821 } 822 } 823 824 impl Key { new(index: usize) -> Key825 pub(crate) fn new(index: usize) -> Key { 826 Key { index } 827 } 828 } 829 830 impl<T> Expired<T> { 831 /// Returns a reference to the inner value. get_ref(&self) -> &T832 pub fn get_ref(&self) -> &T { 833 &self.data 834 } 835 836 /// Returns a mutable reference to the inner value. get_mut(&mut self) -> &mut T837 pub fn get_mut(&mut self) -> &mut T { 838 &mut self.data 839 } 840 841 /// Consumes `self` and returns the inner value. into_inner(self) -> T842 pub fn into_inner(self) -> T { 843 self.data 844 } 845 } 846