1 //! A queue of delayed elements.
2 //!
3 //! See [`DelayQueue`] for more details.
4 //!
5 //! [`DelayQueue`]: struct.DelayQueue.html
6 
7 use clock::now;
8 use timer::Handle;
9 use wheel::{self, Wheel};
10 use {Delay, Error};
11 
12 use futures::{Future, Poll, Stream};
13 use slab::Slab;
14 
15 use std::cmp;
16 use std::marker::PhantomData;
17 use std::time::{Duration, Instant};
18 
19 /// A queue of delayed elements.
20 ///
21 /// Once an element is inserted into the `DelayQueue`, it is yielded once the
22 /// specified deadline has been reached.
23 ///
24 /// # Usage
25 ///
26 /// Elements are inserted into `DelayQueue` using the [`insert`] or
27 /// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is
28 /// returned. The key is used to remove the entry or to change the deadline at
29 /// which it should be yielded back.
30 ///
31 /// Once delays have been configured, the `DelayQueue` is used via its
32 /// [`Stream`] implementation. [`poll`] is called. If an entry has reached its
33 /// deadline, it is returned. If not, `Async::NotReady` indicating that the
34 /// current task will be notified once the deadline has been reached.
35 ///
36 /// # `Stream` implementation
37 ///
38 /// Items are retrieved from the queue via [`Stream::poll`]. If no delays have
39 /// expired, no items are returned. In this case, `NotReady` is returned and the
40 /// current task is registered to be notified once the next item's delay has
41 /// expired.
42 ///
43 /// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll`
44 /// returns `Ready(None)`. This indicates that the stream has reached an end.
45 /// However, if a new item is inserted *after*, `poll` will once again start
46 /// returning items or `NotReady.
47 ///
48 /// Items are returned ordered by their expirations. Items that are configured
49 /// to expire first will be returned first. There are no ordering guarantees
50 /// for items configured to expire the same instant. Also note that delays are
51 /// rounded to the closest millisecond.
52 ///
53 /// # Implementation
54 ///
55 /// The `DelayQueue` is backed by the same hashed timing wheel implementation as
56 /// [`Timer`] as such, it offers the same performance benefits. See [`Timer`]
57 /// for further implementation notes.
58 ///
59 /// State associated with each entry is stored in a [`slab`]. This allows
60 /// amortizing the cost of allocation. Space created for expired entries is
61 /// reused when inserting new entries.
62 ///
63 /// Capacity can be checked using [`capacity`] and allocated preemptively by using
64 /// the [`reserve`] method.
65 ///
66 /// # Usage
67 ///
68 /// Using `DelayQueue` to manage cache entries.
69 ///
70 /// ```rust
71 /// #[macro_use]
72 /// extern crate futures;
73 /// extern crate tokio;
74 /// # type CacheKey = String;
75 /// # type Value = String;
76 /// use tokio::timer::{delay_queue, DelayQueue, Error};
77 /// use futures::{Async, Poll, Stream};
78 /// use std::collections::HashMap;
79 /// use std::time::Duration;
80 ///
81 /// struct Cache {
82 ///     entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
83 ///     expirations: DelayQueue<CacheKey>,
84 /// }
85 ///
86 /// const TTL_SECS: u64 = 30;
87 ///
88 /// impl Cache {
89 ///     fn insert(&mut self, key: CacheKey, value: Value) {
90 ///         let delay = self.expirations
91 ///             .insert(key.clone(), Duration::from_secs(TTL_SECS));
92 ///
93 ///         self.entries.insert(key, (value, delay));
94 ///     }
95 ///
96 ///     fn get(&self, key: &CacheKey) -> Option<&Value> {
97 ///         self.entries.get(key)
98 ///             .map(|&(ref v, _)| v)
99 ///     }
100 ///
101 ///     fn remove(&mut self, key: &CacheKey) {
102 ///         if let Some((_, cache_key)) = self.entries.remove(key) {
103 ///             self.expirations.remove(&cache_key);
104 ///         }
105 ///     }
106 ///
107 ///     fn poll_purge(&mut self) -> Poll<(), Error> {
108 ///         while let Some(entry) = try_ready!(self.expirations.poll()) {
109 ///             self.entries.remove(entry.get_ref());
110 ///         }
111 ///
112 ///         Ok(Async::Ready(()))
113 ///     }
114 /// }
115 /// # fn main() {}
116 /// ```
117 ///
118 /// [`insert`]: #method.insert
119 /// [`insert_at`]: #method.insert_at
120 /// [`Key`]: struct.Key.html
121 /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
122 /// [`poll`]: #method.poll
123 /// [`Stream::poll`]: #method.poll
124 /// [`Timer`]: ../struct.Timer.html
125 /// [`slab`]: https://docs.rs/slab
126 /// [`capacity`]: #method.capacity
127 /// [`reserve`]: #method.reserve
128 #[derive(Debug)]
129 pub struct DelayQueue<T> {
130     /// Handle to the timer driving the `DelayQueue`
131     handle: Handle,
132 
133     /// Stores data associated with entries
134     slab: Slab<Data<T>>,
135 
136     /// Lookup structure tracking all delays in the queue
137     wheel: Wheel<Stack<T>>,
138 
139     /// Delays that were inserted when already expired. These cannot be stored
140     /// in the wheel
141     expired: Stack<T>,
142 
143     /// Delay expiring when the *first* item in the queue expires
144     delay: Option<Delay>,
145 
146     /// Wheel polling state
147     poll: wheel::Poll,
148 
149     /// Instant at which the timer starts
150     start: Instant,
151 }
152 
153 /// An entry in `DelayQueue` that has expired and removed.
154 ///
155 /// Values are returned by [`DelayQueue::poll`].
156 ///
157 /// [`DelayQueue::poll`]: struct.DelayQueue.html#method.poll
158 #[derive(Debug)]
159 pub struct Expired<T> {
160     /// The data stored in the queue
161     data: T,
162 
163     /// The expiration time
164     deadline: Instant,
165 
166     /// The key associated with the entry
167     key: Key,
168 }
169 
170 /// Token to a value stored in a `DelayQueue`.
171 ///
172 /// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`]
173 /// documentation for more details.
174 ///
175 /// [`DelayQueue`]: struct.DelayQueue.html
176 /// [`DelayQueue::insert`]: struct.DelayQueue.html#method.insert
177 #[derive(Debug, Clone)]
178 pub struct Key {
179     index: usize,
180 }
181 
182 #[derive(Debug)]
183 struct Stack<T> {
184     /// Head of the stack
185     head: Option<usize>,
186     _p: PhantomData<T>,
187 }
188 
189 #[derive(Debug)]
190 struct Data<T> {
191     /// The data being stored in the queue and will be returned at the requested
192     /// instant.
193     inner: T,
194 
195     /// The instant at which the item is returned.
196     when: u64,
197 
198     /// Set to true when stored in the `expired` queue
199     expired: bool,
200 
201     /// Next entry in the stack
202     next: Option<usize>,
203 
204     /// Previous entry in the stack
205     prev: Option<usize>,
206 }
207 
208 /// Maximum number of entries the queue can handle
209 const MAX_ENTRIES: usize = (1 << 30) - 1;
210 
211 impl<T> DelayQueue<T> {
212     /// Create a new, empty, `DelayQueue`
213     ///
214     /// The queue will not allocate storage until items are inserted into it.
215     ///
216     /// # Examples
217     ///
218     /// ```rust
219     /// # use tokio_timer::DelayQueue;
220     /// let delay_queue: DelayQueue<u32> = DelayQueue::new();
221     /// ```
new() -> DelayQueue<T>222     pub fn new() -> DelayQueue<T> {
223         DelayQueue::with_capacity(0)
224     }
225 
226     /// Create a new, empty, `DelayQueue` backed by the specified timer.
227     ///
228     /// The queue will not allocate storage until items are inserted into it.
229     ///
230     /// # Examples
231     ///
232     /// ```rust,no_run
233     /// # use tokio_timer::DelayQueue;
234     /// use tokio_timer::timer::Handle;
235     ///
236     /// let handle = Handle::default();
237     /// let delay_queue: DelayQueue<u32> = DelayQueue::with_capacity_and_handle(0, &handle);
238     /// ```
with_capacity_and_handle(capacity: usize, handle: &Handle) -> DelayQueue<T>239     pub fn with_capacity_and_handle(capacity: usize, handle: &Handle) -> DelayQueue<T> {
240         DelayQueue {
241             handle: handle.clone(),
242             wheel: Wheel::new(),
243             slab: Slab::with_capacity(capacity),
244             expired: Stack::default(),
245             delay: None,
246             poll: wheel::Poll::new(0),
247             start: now(),
248         }
249     }
250 
251     /// Create a new, empty, `DelayQueue` with the specified capacity.
252     ///
253     /// The queue will be able to hold at least `capacity` elements without
254     /// reallocating. If `capacity` is 0, the queue will not allocate for
255     /// storage.
256     ///
257     /// # Examples
258     ///
259     /// ```rust
260     /// # use tokio_timer::DelayQueue;
261     /// # use std::time::Duration;
262     /// let mut delay_queue = DelayQueue::with_capacity(10);
263     ///
264     /// // These insertions are done without further allocation
265     /// for i in 0..10 {
266     ///     delay_queue.insert(i, Duration::from_secs(i));
267     /// }
268     ///
269     /// // This will make the queue allocate additional storage
270     /// delay_queue.insert(11, Duration::from_secs(11));
271     /// ```
with_capacity(capacity: usize) -> DelayQueue<T>272     pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
273         DelayQueue::with_capacity_and_handle(capacity, &Handle::default())
274     }
275 
276     /// Insert `value` into the queue set to expire at a specific instant in
277     /// time.
278     ///
279     /// This function is identical to `insert`, but takes an `Instant` instead
280     /// of a `Duration`.
281     ///
282     /// `value` is stored in the queue until `when` is reached. At which point,
283     /// `value` will be returned from [`poll`]. If `when` has already been
284     /// reached, then `value` is immediately made available to poll.
285     ///
286     /// The return value represents the insertion and is used at an argument to
287     /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once
288     /// `value` is removed from the queue either by calling [`poll`] after
289     /// `when` is reached or by calling [`remove`]. At this point, the caller
290     /// must take care to not use the returned [`Key`] again as it may reference
291     /// a different item in the queue.
292     ///
293     /// See [type] level documentation for more details.
294     ///
295     /// # Panics
296     ///
297     /// This function panics if `when` is too far in the future.
298     ///
299     /// # Examples
300     ///
301     /// Basic usage
302     ///
303     /// ```rust
304     /// # extern crate tokio;
305     /// use tokio::timer::DelayQueue;
306     /// use std::time::{Instant, Duration};
307     ///
308     /// # fn main() {
309     /// let mut delay_queue = DelayQueue::new();
310     /// let key = delay_queue.insert_at(
311     ///     "foo", Instant::now() + Duration::from_secs(5));
312     ///
313     /// // Remove the entry
314     /// let item = delay_queue.remove(&key);
315     /// assert_eq!(*item.get_ref(), "foo");
316     /// # }
317     /// ```
318     ///
319     /// [`poll`]: #method.poll
320     /// [`remove`]: #method.remove
321     /// [`reset`]: #method.reset
322     /// [`Key`]: struct.Key.html
323     /// [type]: #
insert_at(&mut self, value: T, when: Instant) -> Key324     pub fn insert_at(&mut self, value: T, when: Instant) -> Key {
325         assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded");
326 
327         // Normalize the deadline. Values cannot be set to expire in the past.
328         let when = self.normalize_deadline(when);
329 
330         // Insert the value in the store
331         let key = self.slab.insert(Data {
332             inner: value,
333             when,
334             expired: false,
335             next: None,
336             prev: None,
337         });
338 
339         self.insert_idx(when, key);
340 
341         // Set a new delay if the current's deadline is later than the one of the new item
342         let should_set_delay = if let Some(ref delay) = self.delay {
343             let current_exp = self.normalize_deadline(delay.deadline());
344             current_exp > when
345         } else {
346             true
347         };
348 
349         if should_set_delay {
350             self.delay = Some(self.handle.delay(self.start + Duration::from_millis(when)));
351         }
352 
353         Key::new(key)
354     }
355 
356     /// Insert `value` into the queue set to expire after the requested duration
357     /// elapses.
358     ///
359     /// This function is identical to `insert_at`, but takes a `Duration`
360     /// instead of an `Instant`.
361     ///
362     /// `value` is stored in the queue until `when` is reached. At which point,
363     /// `value` will be returned from [`poll`]. If `when` has already been
364     /// reached, then `value` is immediately made available to poll.
365     ///
366     /// The return value represents the insertion and is used at an argument to
367     /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once
368     /// `value` is removed from the queue either by calling [`poll`] after
369     /// `when` is reached or by calling [`remove`]. At this point, the caller
370     /// must take care to not use the returned [`Key`] again as it may reference
371     /// a different item in the queue.
372     ///
373     /// See [type] level documentation for more details.
374     ///
375     /// # Panics
376     ///
377     /// This function panics if `timeout` is greater than the maximum supported
378     /// duration.
379     ///
380     /// # Examples
381     ///
382     /// Basic usage
383     ///
384     /// ```rust
385     /// # extern crate tokio;
386     /// use tokio::timer::DelayQueue;
387     /// use std::time::Duration;
388     ///
389     /// # fn main() {
390     /// let mut delay_queue = DelayQueue::new();
391     /// let key = delay_queue.insert("foo", Duration::from_secs(5));
392     ///
393     /// // Remove the entry
394     /// let item = delay_queue.remove(&key);
395     /// assert_eq!(*item.get_ref(), "foo");
396     /// # }
397     /// ```
398     ///
399     /// [`poll`]: #method.poll
400     /// [`remove`]: #method.remove
401     /// [`reset`]: #method.reset
402     /// [`Key`]: struct.Key.html
403     /// [type]: #
insert(&mut self, value: T, timeout: Duration) -> Key404     pub fn insert(&mut self, value: T, timeout: Duration) -> Key {
405         self.insert_at(value, now() + timeout)
406     }
407 
insert_idx(&mut self, when: u64, key: usize)408     fn insert_idx(&mut self, when: u64, key: usize) {
409         use self::wheel::{InsertError, Stack};
410 
411         // Register the deadline with the timer wheel
412         match self.wheel.insert(when, key, &mut self.slab) {
413             Ok(_) => {}
414             Err((_, InsertError::Elapsed)) => {
415                 self.slab[key].expired = true;
416                 // The delay is already expired, store it in the expired queue
417                 self.expired.push(key, &mut self.slab);
418             }
419             Err((_, err)) => panic!("invalid deadline; err={:?}", err),
420         }
421     }
422 
423     /// Remove the item associated with `key` from the queue.
424     ///
425     /// There must be an item associated with `key`. The function returns the
426     /// removed item as well as the `Instant` at which it will the delay will
427     /// have expired.
428     ///
429     /// # Panics
430     ///
431     /// The function panics if `key` is not contained by the queue.
432     ///
433     /// # Examples
434     ///
435     /// Basic usage
436     ///
437     /// ```rust
438     /// # extern crate tokio;
439     /// use tokio::timer::DelayQueue;
440     /// use std::time::Duration;
441     ///
442     /// # fn main() {
443     /// let mut delay_queue = DelayQueue::new();
444     /// let key = delay_queue.insert("foo", Duration::from_secs(5));
445     ///
446     /// // Remove the entry
447     /// let item = delay_queue.remove(&key);
448     /// assert_eq!(*item.get_ref(), "foo");
449     /// # }
450     /// ```
remove(&mut self, key: &Key) -> Expired<T>451     pub fn remove(&mut self, key: &Key) -> Expired<T> {
452         use wheel::Stack;
453 
454         // Special case the `expired` queue
455         if self.slab[key.index].expired {
456             self.expired.remove(&key.index, &mut self.slab);
457         } else {
458             self.wheel.remove(&key.index, &mut self.slab);
459         }
460 
461         let data = self.slab.remove(key.index);
462 
463         Expired {
464             key: Key::new(key.index),
465             data: data.inner,
466             deadline: self.start + Duration::from_millis(data.when),
467         }
468     }
469 
470     /// Sets the delay of the item associated with `key` to expire at `when`.
471     ///
472     /// This function is identical to `reset` but takes an `Instant` instead of
473     /// a `Duration`.
474     ///
475     /// The item remains in the queue but the delay is set to expire at `when`.
476     /// If `when` is in the past, then the item is immediately made available to
477     /// the caller.
478     ///
479     /// # Panics
480     ///
481     /// This function panics if `when` is too far in the future or if `key` is
482     /// not contained by the queue.
483     ///
484     /// # Examples
485     ///
486     /// Basic usage
487     ///
488     /// ```rust
489     /// # extern crate tokio;
490     /// use tokio::timer::DelayQueue;
491     /// use std::time::{Duration, Instant};
492     ///
493     /// # fn main() {
494     /// let mut delay_queue = DelayQueue::new();
495     /// let key = delay_queue.insert("foo", Duration::from_secs(5));
496     ///
497     /// // "foo" is scheduled to be returned in 5 seconds
498     ///
499     /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
500     ///
501     /// // "foo"is now scheduled to be returned in 10 seconds
502     /// # }
503     /// ```
reset_at(&mut self, key: &Key, when: Instant)504     pub fn reset_at(&mut self, key: &Key, when: Instant) {
505         self.wheel.remove(&key.index, &mut self.slab);
506 
507         // Normalize the deadline. Values cannot be set to expire in the past.
508         let when = self.normalize_deadline(when);
509 
510         self.slab[key.index].when = when;
511         self.insert_idx(when, key.index);
512 
513         let next_deadline = self.next_deadline();
514         if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) {
515             delay.reset(deadline);
516         }
517     }
518 
519     /// Returns the next time poll as determined by the wheel
next_deadline(&mut self) -> Option<Instant>520     fn next_deadline(&mut self) -> Option<Instant> {
521         self.wheel
522             .poll_at()
523             .map(|poll_at| self.start + Duration::from_millis(poll_at))
524     }
525 
526     /// Sets the delay of the item associated with `key` to expire after
527     /// `timeout`.
528     ///
529     /// This function is identical to `reset_at` but takes a `Duration` instead
530     /// of an `Instant`.
531     ///
532     /// The item remains in the queue but the delay is set to expire after
533     /// `timeout`.  If `timeout` is zero, then the item is immediately made
534     /// available to the caller.
535     ///
536     /// # Panics
537     ///
538     /// This function panics if `timeout` is greater than the maximum supported
539     /// duration or if `key` is not contained by the queue.
540     ///
541     /// # Examples
542     ///
543     /// Basic usage
544     ///
545     /// ```rust
546     /// # extern crate tokio;
547     /// use tokio::timer::DelayQueue;
548     /// use std::time::Duration;
549     ///
550     /// # fn main() {
551     /// let mut delay_queue = DelayQueue::new();
552     /// let key = delay_queue.insert("foo", Duration::from_secs(5));
553     ///
554     /// // "foo" is scheduled to be returned in 5 seconds
555     ///
556     /// delay_queue.reset(&key, Duration::from_secs(10));
557     ///
558     /// // "foo"is now scheduled to be returned in 10 seconds
559     /// # }
560     /// ```
reset(&mut self, key: &Key, timeout: Duration)561     pub fn reset(&mut self, key: &Key, timeout: Duration) {
562         self.reset_at(key, now() + timeout);
563     }
564 
565     /// Clears the queue, removing all items.
566     ///
567     /// After calling `clear`, [`poll`] will return `Ok(Ready(None))`.
568     ///
569     /// Note that this method has no effect on the allocated capacity.
570     ///
571     /// [`poll`]: #method.poll
572     ///
573     /// # Examples
574     ///
575     /// ```rust
576     /// # extern crate tokio;
577     /// use tokio::timer::DelayQueue;
578     /// use std::time::Duration;
579     ///
580     /// # fn main() {
581     /// let mut delay_queue = DelayQueue::new();
582     ///
583     /// delay_queue.insert("foo", Duration::from_secs(5));
584     ///
585     /// assert!(!delay_queue.is_empty());
586     ///
587     /// delay_queue.clear();
588     ///
589     /// assert!(delay_queue.is_empty());
590     /// # }
591     /// ```
clear(&mut self)592     pub fn clear(&mut self) {
593         self.slab.clear();
594         self.expired = Stack::default();
595         self.wheel = Wheel::new();
596         self.delay = None;
597     }
598 
599     /// Returns the number of elements the queue can hold without reallocating.
600     ///
601     /// # Examples
602     ///
603     /// ```rust
604     /// # use tokio_timer::DelayQueue;
605     /// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
606     /// assert_eq!(delay_queue.capacity(), 10);
607     /// ```
capacity(&self) -> usize608     pub fn capacity(&self) -> usize {
609         self.slab.capacity()
610     }
611 
612     /// Reserve capacity for at least `additional` more items to be queued
613     /// without allocating.
614     ///
615     /// `reserve` does nothing if the queue already has sufficient capacity for
616     /// `additional` more values. If more capacity is required, a new segment of
617     /// memory will be allocated and all existing values will be copied into it.
618     /// As such, if the queue is already very large, a call to `reserve` can end
619     /// up being expensive.
620     ///
621     /// The queue may reserve more than `additional` extra space in order to
622     /// avoid frequent reallocations.
623     ///
624     /// # Panics
625     ///
626     /// Panics if the new capacity exceeds the maximum number of entries the
627     /// queue can contain.
628     ///
629     /// # Examples
630     ///
631     /// ```
632     /// # use tokio_timer::DelayQueue;
633     /// # use std::time::Duration;
634     /// let mut delay_queue = DelayQueue::new();
635     /// delay_queue.insert("hello", Duration::from_secs(10));
636     /// delay_queue.reserve(10);
637     /// assert!(delay_queue.capacity() >= 11);
638     /// ```
reserve(&mut self, additional: usize)639     pub fn reserve(&mut self, additional: usize) {
640         self.slab.reserve(additional);
641     }
642 
643     /// Returns `true` if there are no items in the queue.
644     ///
645     /// Note that this function returns `false` even if all items have not yet
646     /// expired and a call to `poll` will return `NotReady`.
647     ///
648     /// # Examples
649     ///
650     /// ```
651     /// # use tokio_timer::DelayQueue;
652     /// use std::time::Duration;
653     /// let mut delay_queue = DelayQueue::new();
654     /// assert!(delay_queue.is_empty());
655     ///
656     /// delay_queue.insert("hello", Duration::from_secs(5));
657     /// assert!(!delay_queue.is_empty());
658     /// ```
is_empty(&self) -> bool659     pub fn is_empty(&self) -> bool {
660         self.slab.is_empty()
661     }
662 
663     /// Polls the queue, returning the index of the next slot in the slab that
664     /// should be returned.
665     ///
666     /// A slot should be returned when the associated deadline has been reached.
poll_idx(&mut self) -> Poll<Option<usize>, Error>667     fn poll_idx(&mut self) -> Poll<Option<usize>, Error> {
668         use self::wheel::Stack;
669 
670         let expired = self.expired.pop(&mut self.slab);
671 
672         if expired.is_some() {
673             return Ok(expired.into());
674         }
675 
676         loop {
677             if let Some(ref mut delay) = self.delay {
678                 if !delay.is_elapsed() {
679                     try_ready!(delay.poll());
680                 }
681 
682                 let now = ::ms(delay.deadline() - self.start, ::Round::Down);
683 
684                 self.poll = wheel::Poll::new(now);
685             }
686 
687             self.delay = None;
688 
689             if let Some(idx) = self.wheel.poll(&mut self.poll, &mut self.slab) {
690                 return Ok(Some(idx).into());
691             }
692 
693             if let Some(deadline) = self.next_deadline() {
694                 self.delay = Some(self.handle.delay(deadline));
695             } else {
696                 return Ok(None.into());
697             }
698         }
699     }
700 
normalize_deadline(&self, when: Instant) -> u64701     fn normalize_deadline(&self, when: Instant) -> u64 {
702         let when = if when < self.start {
703             0
704         } else {
705             ::ms(when - self.start, ::Round::Up)
706         };
707 
708         cmp::max(when, self.wheel.elapsed())
709     }
710 }
711 
712 impl<T> Stream for DelayQueue<T> {
713     type Item = Expired<T>;
714     type Error = Error;
715 
poll(&mut self) -> Poll<Option<Self::Item>, Error>716     fn poll(&mut self) -> Poll<Option<Self::Item>, Error> {
717         let item = try_ready!(self.poll_idx()).map(|idx| {
718             let data = self.slab.remove(idx);
719             debug_assert!(data.next.is_none());
720             debug_assert!(data.prev.is_none());
721 
722             Expired {
723                 key: Key::new(idx),
724                 data: data.inner,
725                 deadline: self.start + Duration::from_millis(data.when),
726             }
727         });
728 
729         Ok(item.into())
730     }
731 }
732 
733 impl<T> wheel::Stack for Stack<T> {
734     type Owned = usize;
735     type Borrowed = usize;
736     type Store = Slab<Data<T>>;
737 
is_empty(&self) -> bool738     fn is_empty(&self) -> bool {
739         self.head.is_none()
740     }
741 
push(&mut self, item: Self::Owned, store: &mut Self::Store)742     fn push(&mut self, item: Self::Owned, store: &mut Self::Store) {
743         // Ensure the entry is not already in a stack.
744         debug_assert!(store[item].next.is_none());
745         debug_assert!(store[item].prev.is_none());
746 
747         // Remove the old head entry
748         let old = self.head.take();
749 
750         if let Some(idx) = old {
751             store[idx].prev = Some(item);
752         }
753 
754         store[item].next = old;
755         self.head = Some(item)
756     }
757 
pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned>758     fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> {
759         if let Some(idx) = self.head {
760             self.head = store[idx].next;
761 
762             if let Some(idx) = self.head {
763                 store[idx].prev = None;
764             }
765 
766             store[idx].next = None;
767             debug_assert!(store[idx].prev.is_none());
768 
769             Some(idx)
770         } else {
771             None
772         }
773     }
774 
remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store)775     fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
776         assert!(store.contains(*item));
777 
778         // Ensure that the entry is in fact contained by the stack
779         debug_assert!({
780             // This walks the full linked list even if an entry is found.
781             let mut next = self.head;
782             let mut contains = false;
783 
784             while let Some(idx) = next {
785                 if idx == *item {
786                     debug_assert!(!contains);
787                     contains = true;
788                 }
789 
790                 next = store[idx].next;
791             }
792 
793             contains
794         });
795 
796         if let Some(next) = store[*item].next {
797             store[next].prev = store[*item].prev;
798         }
799 
800         if let Some(prev) = store[*item].prev {
801             store[prev].next = store[*item].next;
802         } else {
803             self.head = store[*item].next;
804         }
805 
806         store[*item].next = None;
807         store[*item].prev = None;
808     }
809 
when(item: &Self::Borrowed, store: &Self::Store) -> u64810     fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 {
811         store[*item].when
812     }
813 }
814 
815 impl<T> Default for Stack<T> {
default() -> Stack<T>816     fn default() -> Stack<T> {
817         Stack {
818             head: None,
819             _p: PhantomData,
820         }
821     }
822 }
823 
824 impl Key {
new(index: usize) -> Key825     pub(crate) fn new(index: usize) -> Key {
826         Key { index }
827     }
828 }
829 
830 impl<T> Expired<T> {
831     /// Returns a reference to the inner value.
get_ref(&self) -> &T832     pub fn get_ref(&self) -> &T {
833         &self.data
834     }
835 
836     /// Returns a mutable reference to the inner value.
get_mut(&mut self) -> &mut T837     pub fn get_mut(&mut self) -> &mut T {
838         &mut self.data
839     }
840 
841     /// Consumes `self` and returns the inner value.
into_inner(self) -> T842     pub fn into_inner(self) -> T {
843         self.data
844     }
845 }
846