1 //! A queue of delayed elements. 2 //! 3 //! See [`DelayQueue`] for more details. 4 //! 5 //! [`DelayQueue`]: struct@DelayQueue 6 7 use crate::time::wheel::{self, Wheel}; 8 9 use futures_core::ready; 10 use tokio::time::{error::Error, sleep_until, Duration, Instant, Sleep}; 11 12 use slab::Slab; 13 use std::cmp; 14 use std::future::Future; 15 use std::marker::PhantomData; 16 use std::pin::Pin; 17 use std::task::{self, Poll, Waker}; 18 19 /// A queue of delayed elements. 20 /// 21 /// Once an element is inserted into the `DelayQueue`, it is yielded once the 22 /// specified deadline has been reached. 23 /// 24 /// # Usage 25 /// 26 /// Elements are inserted into `DelayQueue` using the [`insert`] or 27 /// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is 28 /// returned. The key is used to remove the entry or to change the deadline at 29 /// which it should be yielded back. 30 /// 31 /// Once delays have been configured, the `DelayQueue` is used via its 32 /// [`Stream`] implementation. [`poll_expired`] is called. If an entry has reached its 33 /// deadline, it is returned. If not, `Poll::Pending` is returned indicating that the 34 /// current task will be notified once the deadline has been reached. 35 /// 36 /// # `Stream` implementation 37 /// 38 /// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have 39 /// expired, no items are returned. In this case, `Poll::Pending` is returned and the 40 /// current task is registered to be notified once the next item's delay has 41 /// expired. 42 /// 43 /// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll` 44 /// returns `Poll::Ready(None)`. This indicates that the stream has reached an end. 45 /// However, if a new item is inserted *after*, `poll` will once again start 46 /// returning items or `Poll::Pending`. 47 /// 48 /// Items are returned ordered by their expirations. Items that are configured 49 /// to expire first will be returned first. There are no ordering guarantees 50 /// for items configured to expire at the same instant. Also note that delays are 51 /// rounded to the closest millisecond. 52 /// 53 /// # Implementation 54 /// 55 /// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally 56 /// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same 57 /// performance and scalability benefits. 58 /// 59 /// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation, 60 /// and allows reuse of the memory allocated for expired entires. 61 /// 62 /// Capacity can be checked using [`capacity`] and allocated preemptively by using 63 /// the [`reserve`] method. 64 /// 65 /// # Usage 66 /// 67 /// Using `DelayQueue` to manage cache entries. 68 /// 69 /// ```rust,no_run 70 /// use tokio::time::error::Error; 71 /// use tokio_util::time::{DelayQueue, delay_queue}; 72 /// 73 /// use futures::ready; 74 /// use std::collections::HashMap; 75 /// use std::task::{Context, Poll}; 76 /// use std::time::Duration; 77 /// # type CacheKey = String; 78 /// # type Value = String; 79 /// 80 /// struct Cache { 81 /// entries: HashMap<CacheKey, (Value, delay_queue::Key)>, 82 /// expirations: DelayQueue<CacheKey>, 83 /// } 84 /// 85 /// const TTL_SECS: u64 = 30; 86 /// 87 /// impl Cache { 88 /// fn insert(&mut self, key: CacheKey, value: Value) { 89 /// let delay = self.expirations 90 /// .insert(key.clone(), Duration::from_secs(TTL_SECS)); 91 /// 92 /// self.entries.insert(key, (value, delay)); 93 /// } 94 /// 95 /// fn get(&self, key: &CacheKey) -> Option<&Value> { 96 /// self.entries.get(key) 97 /// .map(|&(ref v, _)| v) 98 /// } 99 /// 100 /// fn remove(&mut self, key: &CacheKey) { 101 /// if let Some((_, cache_key)) = self.entries.remove(key) { 102 /// self.expirations.remove(&cache_key); 103 /// } 104 /// } 105 /// 106 /// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { 107 /// while let Some(res) = ready!(self.expirations.poll_expired(cx)) { 108 /// let entry = res?; 109 /// self.entries.remove(entry.get_ref()); 110 /// } 111 /// 112 /// Poll::Ready(Ok(())) 113 /// } 114 /// } 115 /// ``` 116 /// 117 /// [`insert`]: method@Self::insert 118 /// [`insert_at`]: method@Self::insert_at 119 /// [`Key`]: struct@Key 120 /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html 121 /// [`poll_expired`]: method@Self::poll_expired 122 /// [`Stream::poll_expired`]: method@Self::poll_expired 123 /// [`DelayQueue`]: struct@DelayQueue 124 /// [`sleep`]: fn@tokio::time::sleep 125 /// [`slab`]: slab 126 /// [`capacity`]: method@Self::capacity 127 /// [`reserve`]: method@Self::reserve 128 #[derive(Debug)] 129 pub struct DelayQueue<T> { 130 /// Stores data associated with entries 131 slab: Slab<Data<T>>, 132 133 /// Lookup structure tracking all delays in the queue 134 wheel: Wheel<Stack<T>>, 135 136 /// Delays that were inserted when already expired. These cannot be stored 137 /// in the wheel 138 expired: Stack<T>, 139 140 /// Delay expiring when the *first* item in the queue expires 141 delay: Option<Pin<Box<Sleep>>>, 142 143 /// Wheel polling state 144 wheel_now: u64, 145 146 /// Instant at which the timer starts 147 start: Instant, 148 149 /// Waker that is invoked when we potentially need to reset the timer. 150 /// Because we lazily create the timer when the first entry is created, we 151 /// need to awaken any poller that polled us before that point. 152 waker: Option<Waker>, 153 } 154 155 /// An entry in `DelayQueue` that has expired and been removed. 156 /// 157 /// Values are returned by [`DelayQueue::poll_expired`]. 158 /// 159 /// [`DelayQueue::poll_expired`]: method@DelayQueue::poll_expired 160 #[derive(Debug)] 161 pub struct Expired<T> { 162 /// The data stored in the queue 163 data: T, 164 165 /// The expiration time 166 deadline: Instant, 167 168 /// The key associated with the entry 169 key: Key, 170 } 171 172 /// Token to a value stored in a `DelayQueue`. 173 /// 174 /// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`] 175 /// documentation for more details. 176 /// 177 /// [`DelayQueue`]: struct@DelayQueue 178 /// [`DelayQueue::insert`]: method@DelayQueue::insert 179 #[derive(Debug, Clone)] 180 pub struct Key { 181 index: usize, 182 } 183 184 #[derive(Debug)] 185 struct Stack<T> { 186 /// Head of the stack 187 head: Option<usize>, 188 _p: PhantomData<fn() -> T>, 189 } 190 191 #[derive(Debug)] 192 struct Data<T> { 193 /// The data being stored in the queue and will be returned at the requested 194 /// instant. 195 inner: T, 196 197 /// The instant at which the item is returned. 198 when: u64, 199 200 /// Set to true when stored in the `expired` queue 201 expired: bool, 202 203 /// Next entry in the stack 204 next: Option<usize>, 205 206 /// Previous entry in the stack 207 prev: Option<usize>, 208 } 209 210 /// Maximum number of entries the queue can handle 211 const MAX_ENTRIES: usize = (1 << 30) - 1; 212 213 impl<T> DelayQueue<T> { 214 /// Creates a new, empty, `DelayQueue`. 215 /// 216 /// The queue will not allocate storage until items are inserted into it. 217 /// 218 /// # Examples 219 /// 220 /// ```rust 221 /// # use tokio_util::time::DelayQueue; 222 /// let delay_queue: DelayQueue<u32> = DelayQueue::new(); 223 /// ``` new() -> DelayQueue<T>224 pub fn new() -> DelayQueue<T> { 225 DelayQueue::with_capacity(0) 226 } 227 228 /// Creates a new, empty, `DelayQueue` with the specified capacity. 229 /// 230 /// The queue will be able to hold at least `capacity` elements without 231 /// reallocating. If `capacity` is 0, the queue will not allocate for 232 /// storage. 233 /// 234 /// # Examples 235 /// 236 /// ```rust 237 /// # use tokio_util::time::DelayQueue; 238 /// # use std::time::Duration; 239 /// 240 /// # #[tokio::main] 241 /// # async fn main() { 242 /// let mut delay_queue = DelayQueue::with_capacity(10); 243 /// 244 /// // These insertions are done without further allocation 245 /// for i in 0..10 { 246 /// delay_queue.insert(i, Duration::from_secs(i)); 247 /// } 248 /// 249 /// // This will make the queue allocate additional storage 250 /// delay_queue.insert(11, Duration::from_secs(11)); 251 /// # } 252 /// ``` with_capacity(capacity: usize) -> DelayQueue<T>253 pub fn with_capacity(capacity: usize) -> DelayQueue<T> { 254 DelayQueue { 255 wheel: Wheel::new(), 256 slab: Slab::with_capacity(capacity), 257 expired: Stack::default(), 258 delay: None, 259 wheel_now: 0, 260 start: Instant::now(), 261 waker: None, 262 } 263 } 264 265 /// Inserts `value` into the queue set to expire at a specific instant in 266 /// time. 267 /// 268 /// This function is identical to `insert`, but takes an `Instant` instead 269 /// of a `Duration`. 270 /// 271 /// `value` is stored in the queue until `when` is reached. At which point, 272 /// `value` will be returned from [`poll_expired`]. If `when` has already been 273 /// reached, then `value` is immediately made available to poll. 274 /// 275 /// The return value represents the insertion and is used as an argument to 276 /// [`remove`] and [`reset`]. Note that [`Key`] is a token and is reused once 277 /// `value` is removed from the queue either by calling [`poll_expired`] after 278 /// `when` is reached or by calling [`remove`]. At this point, the caller 279 /// must take care to not use the returned [`Key`] again as it may reference 280 /// a different item in the queue. 281 /// 282 /// See [type] level documentation for more details. 283 /// 284 /// # Panics 285 /// 286 /// This function panics if `when` is too far in the future. 287 /// 288 /// # Examples 289 /// 290 /// Basic usage 291 /// 292 /// ```rust 293 /// use tokio::time::{Duration, Instant}; 294 /// use tokio_util::time::DelayQueue; 295 /// 296 /// # #[tokio::main] 297 /// # async fn main() { 298 /// let mut delay_queue = DelayQueue::new(); 299 /// let key = delay_queue.insert_at( 300 /// "foo", Instant::now() + Duration::from_secs(5)); 301 /// 302 /// // Remove the entry 303 /// let item = delay_queue.remove(&key); 304 /// assert_eq!(*item.get_ref(), "foo"); 305 /// # } 306 /// ``` 307 /// 308 /// [`poll_expired`]: method@Self::poll_expired 309 /// [`remove`]: method@Self::remove 310 /// [`reset`]: method@Self::reset 311 /// [`Key`]: struct@Key 312 /// [type]: # insert_at(&mut self, value: T, when: Instant) -> Key313 pub fn insert_at(&mut self, value: T, when: Instant) -> Key { 314 assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded"); 315 316 // Normalize the deadline. Values cannot be set to expire in the past. 317 let when = self.normalize_deadline(when); 318 319 // Insert the value in the store 320 let key = self.slab.insert(Data { 321 inner: value, 322 when, 323 expired: false, 324 next: None, 325 prev: None, 326 }); 327 328 self.insert_idx(when, key); 329 330 // Set a new delay if the current's deadline is later than the one of the new item 331 let should_set_delay = if let Some(ref delay) = self.delay { 332 let current_exp = self.normalize_deadline(delay.deadline()); 333 current_exp > when 334 } else { 335 true 336 }; 337 338 if should_set_delay { 339 if let Some(waker) = self.waker.take() { 340 waker.wake(); 341 } 342 343 let delay_time = self.start + Duration::from_millis(when); 344 if let Some(ref mut delay) = &mut self.delay { 345 delay.as_mut().reset(delay_time); 346 } else { 347 self.delay = Some(Box::pin(sleep_until(delay_time))); 348 } 349 } 350 351 Key::new(key) 352 } 353 354 /// Attempts to pull out the next value of the delay queue, registering the 355 /// current task for wakeup if the value is not yet available, and returning 356 /// `None` if the queue is exhausted. poll_expired( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Expired<T>, Error>>>357 pub fn poll_expired( 358 &mut self, 359 cx: &mut task::Context<'_>, 360 ) -> Poll<Option<Result<Expired<T>, Error>>> { 361 if !self 362 .waker 363 .as_ref() 364 .map(|w| w.will_wake(cx.waker())) 365 .unwrap_or(false) 366 { 367 self.waker = Some(cx.waker().clone()); 368 } 369 370 let item = ready!(self.poll_idx(cx)); 371 Poll::Ready(item.map(|result| { 372 result.map(|idx| { 373 let data = self.slab.remove(idx); 374 debug_assert!(data.next.is_none()); 375 debug_assert!(data.prev.is_none()); 376 377 Expired { 378 key: Key::new(idx), 379 data: data.inner, 380 deadline: self.start + Duration::from_millis(data.when), 381 } 382 }) 383 })) 384 } 385 386 /// Inserts `value` into the queue set to expire after the requested duration 387 /// elapses. 388 /// 389 /// This function is identical to `insert_at`, but takes a `Duration` 390 /// instead of an `Instant`. 391 /// 392 /// `value` is stored in the queue until `timeout` duration has 393 /// elapsed after `insert` was called. At that point, `value` will 394 /// be returned from [`poll_expired`]. If `timeout` is a `Duration` of 395 /// zero, then `value` is immediately made available to poll. 396 /// 397 /// The return value represents the insertion and is used as an 398 /// argument to [`remove`] and [`reset`]. Note that [`Key`] is a 399 /// token and is reused once `value` is removed from the queue 400 /// either by calling [`poll_expired`] after `timeout` has elapsed 401 /// or by calling [`remove`]. At this point, the caller must not 402 /// use the returned [`Key`] again as it may reference a different 403 /// item in the queue. 404 /// 405 /// See [type] level documentation for more details. 406 /// 407 /// # Panics 408 /// 409 /// This function panics if `timeout` is greater than the maximum 410 /// duration supported by the timer in the current `Runtime`. 411 /// 412 /// # Examples 413 /// 414 /// Basic usage 415 /// 416 /// ```rust 417 /// use tokio_util::time::DelayQueue; 418 /// use std::time::Duration; 419 /// 420 /// # #[tokio::main] 421 /// # async fn main() { 422 /// let mut delay_queue = DelayQueue::new(); 423 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 424 /// 425 /// // Remove the entry 426 /// let item = delay_queue.remove(&key); 427 /// assert_eq!(*item.get_ref(), "foo"); 428 /// # } 429 /// ``` 430 /// 431 /// [`poll_expired`]: method@Self::poll_expired 432 /// [`remove`]: method@Self::remove 433 /// [`reset`]: method@Self::reset 434 /// [`Key`]: struct@Key 435 /// [type]: # insert(&mut self, value: T, timeout: Duration) -> Key436 pub fn insert(&mut self, value: T, timeout: Duration) -> Key { 437 self.insert_at(value, Instant::now() + timeout) 438 } 439 insert_idx(&mut self, when: u64, key: usize)440 fn insert_idx(&mut self, when: u64, key: usize) { 441 use self::wheel::{InsertError, Stack}; 442 443 // Register the deadline with the timer wheel 444 match self.wheel.insert(when, key, &mut self.slab) { 445 Ok(_) => {} 446 Err((_, InsertError::Elapsed)) => { 447 self.slab[key].expired = true; 448 // The delay is already expired, store it in the expired queue 449 self.expired.push(key, &mut self.slab); 450 } 451 Err((_, err)) => panic!("invalid deadline; err={:?}", err), 452 } 453 } 454 455 /// Removes the key from the expired queue or the timer wheel 456 /// depending on its expiration status. 457 /// 458 /// # Panics 459 /// 460 /// Panics if the key is not contained in the expired queue or the wheel. remove_key(&mut self, key: &Key)461 fn remove_key(&mut self, key: &Key) { 462 use crate::time::wheel::Stack; 463 464 // Special case the `expired` queue 465 if self.slab[key.index].expired { 466 self.expired.remove(&key.index, &mut self.slab); 467 } else { 468 self.wheel.remove(&key.index, &mut self.slab); 469 } 470 } 471 472 /// Removes the item associated with `key` from the queue. 473 /// 474 /// There must be an item associated with `key`. The function returns the 475 /// removed item as well as the `Instant` at which it will the delay will 476 /// have expired. 477 /// 478 /// # Panics 479 /// 480 /// The function panics if `key` is not contained by the queue. 481 /// 482 /// # Examples 483 /// 484 /// Basic usage 485 /// 486 /// ```rust 487 /// use tokio_util::time::DelayQueue; 488 /// use std::time::Duration; 489 /// 490 /// # #[tokio::main] 491 /// # async fn main() { 492 /// let mut delay_queue = DelayQueue::new(); 493 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 494 /// 495 /// // Remove the entry 496 /// let item = delay_queue.remove(&key); 497 /// assert_eq!(*item.get_ref(), "foo"); 498 /// # } 499 /// ``` remove(&mut self, key: &Key) -> Expired<T>500 pub fn remove(&mut self, key: &Key) -> Expired<T> { 501 self.remove_key(key); 502 let data = self.slab.remove(key.index); 503 504 Expired { 505 key: Key::new(key.index), 506 data: data.inner, 507 deadline: self.start + Duration::from_millis(data.when), 508 } 509 } 510 511 /// Sets the delay of the item associated with `key` to expire at `when`. 512 /// 513 /// This function is identical to `reset` but takes an `Instant` instead of 514 /// a `Duration`. 515 /// 516 /// The item remains in the queue but the delay is set to expire at `when`. 517 /// If `when` is in the past, then the item is immediately made available to 518 /// the caller. 519 /// 520 /// # Panics 521 /// 522 /// This function panics if `when` is too far in the future or if `key` is 523 /// not contained by the queue. 524 /// 525 /// # Examples 526 /// 527 /// Basic usage 528 /// 529 /// ```rust 530 /// use tokio::time::{Duration, Instant}; 531 /// use tokio_util::time::DelayQueue; 532 /// 533 /// # #[tokio::main] 534 /// # async fn main() { 535 /// let mut delay_queue = DelayQueue::new(); 536 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 537 /// 538 /// // "foo" is scheduled to be returned in 5 seconds 539 /// 540 /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); 541 /// 542 /// // "foo" is now scheduled to be returned in 10 seconds 543 /// # } 544 /// ``` reset_at(&mut self, key: &Key, when: Instant)545 pub fn reset_at(&mut self, key: &Key, when: Instant) { 546 self.remove_key(key); 547 548 // Normalize the deadline. Values cannot be set to expire in the past. 549 let when = self.normalize_deadline(when); 550 551 self.slab[key.index].when = when; 552 self.slab[key.index].expired = false; 553 554 self.insert_idx(when, key.index); 555 556 let next_deadline = self.next_deadline(); 557 if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { 558 // This should awaken us if necessary (ie, if already expired) 559 delay.as_mut().reset(deadline); 560 } 561 } 562 563 /// Returns the next time to poll as determined by the wheel next_deadline(&mut self) -> Option<Instant>564 fn next_deadline(&mut self) -> Option<Instant> { 565 self.wheel 566 .poll_at() 567 .map(|poll_at| self.start + Duration::from_millis(poll_at)) 568 } 569 570 /// Sets the delay of the item associated with `key` to expire after 571 /// `timeout`. 572 /// 573 /// This function is identical to `reset_at` but takes a `Duration` instead 574 /// of an `Instant`. 575 /// 576 /// The item remains in the queue but the delay is set to expire after 577 /// `timeout`. If `timeout` is zero, then the item is immediately made 578 /// available to the caller. 579 /// 580 /// # Panics 581 /// 582 /// This function panics if `timeout` is greater than the maximum supported 583 /// duration or if `key` is not contained by the queue. 584 /// 585 /// # Examples 586 /// 587 /// Basic usage 588 /// 589 /// ```rust 590 /// use tokio_util::time::DelayQueue; 591 /// use std::time::Duration; 592 /// 593 /// # #[tokio::main] 594 /// # async fn main() { 595 /// let mut delay_queue = DelayQueue::new(); 596 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 597 /// 598 /// // "foo" is scheduled to be returned in 5 seconds 599 /// 600 /// delay_queue.reset(&key, Duration::from_secs(10)); 601 /// 602 /// // "foo"is now scheduled to be returned in 10 seconds 603 /// # } 604 /// ``` reset(&mut self, key: &Key, timeout: Duration)605 pub fn reset(&mut self, key: &Key, timeout: Duration) { 606 self.reset_at(key, Instant::now() + timeout); 607 } 608 609 /// Clears the queue, removing all items. 610 /// 611 /// After calling `clear`, [`poll_expired`] will return `Ok(Ready(None))`. 612 /// 613 /// Note that this method has no effect on the allocated capacity. 614 /// 615 /// [`poll_expired`]: method@Self::poll_expired 616 /// 617 /// # Examples 618 /// 619 /// ```rust 620 /// use tokio_util::time::DelayQueue; 621 /// use std::time::Duration; 622 /// 623 /// # #[tokio::main] 624 /// # async fn main() { 625 /// let mut delay_queue = DelayQueue::new(); 626 /// 627 /// delay_queue.insert("foo", Duration::from_secs(5)); 628 /// 629 /// assert!(!delay_queue.is_empty()); 630 /// 631 /// delay_queue.clear(); 632 /// 633 /// assert!(delay_queue.is_empty()); 634 /// # } 635 /// ``` clear(&mut self)636 pub fn clear(&mut self) { 637 self.slab.clear(); 638 self.expired = Stack::default(); 639 self.wheel = Wheel::new(); 640 self.delay = None; 641 } 642 643 /// Returns the number of elements the queue can hold without reallocating. 644 /// 645 /// # Examples 646 /// 647 /// ```rust 648 /// use tokio_util::time::DelayQueue; 649 /// 650 /// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10); 651 /// assert_eq!(delay_queue.capacity(), 10); 652 /// ``` capacity(&self) -> usize653 pub fn capacity(&self) -> usize { 654 self.slab.capacity() 655 } 656 657 /// Returns the number of elements currently in the queue. 658 /// 659 /// # Examples 660 /// 661 /// ```rust 662 /// use tokio_util::time::DelayQueue; 663 /// use std::time::Duration; 664 /// 665 /// # #[tokio::main] 666 /// # async fn main() { 667 /// let mut delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10); 668 /// assert_eq!(delay_queue.len(), 0); 669 /// delay_queue.insert(3, Duration::from_secs(5)); 670 /// assert_eq!(delay_queue.len(), 1); 671 /// # } 672 /// ``` len(&self) -> usize673 pub fn len(&self) -> usize { 674 self.slab.len() 675 } 676 677 /// Reserves capacity for at least `additional` more items to be queued 678 /// without allocating. 679 /// 680 /// `reserve` does nothing if the queue already has sufficient capacity for 681 /// `additional` more values. If more capacity is required, a new segment of 682 /// memory will be allocated and all existing values will be copied into it. 683 /// As such, if the queue is already very large, a call to `reserve` can end 684 /// up being expensive. 685 /// 686 /// The queue may reserve more than `additional` extra space in order to 687 /// avoid frequent reallocations. 688 /// 689 /// # Panics 690 /// 691 /// Panics if the new capacity exceeds the maximum number of entries the 692 /// queue can contain. 693 /// 694 /// # Examples 695 /// 696 /// ``` 697 /// use tokio_util::time::DelayQueue; 698 /// use std::time::Duration; 699 /// 700 /// # #[tokio::main] 701 /// # async fn main() { 702 /// let mut delay_queue = DelayQueue::new(); 703 /// 704 /// delay_queue.insert("hello", Duration::from_secs(10)); 705 /// delay_queue.reserve(10); 706 /// 707 /// assert!(delay_queue.capacity() >= 11); 708 /// # } 709 /// ``` reserve(&mut self, additional: usize)710 pub fn reserve(&mut self, additional: usize) { 711 self.slab.reserve(additional); 712 } 713 714 /// Returns `true` if there are no items in the queue. 715 /// 716 /// Note that this function returns `false` even if all items have not yet 717 /// expired and a call to `poll` will return `Poll::Pending`. 718 /// 719 /// # Examples 720 /// 721 /// ``` 722 /// use tokio_util::time::DelayQueue; 723 /// use std::time::Duration; 724 /// 725 /// # #[tokio::main] 726 /// # async fn main() { 727 /// let mut delay_queue = DelayQueue::new(); 728 /// assert!(delay_queue.is_empty()); 729 /// 730 /// delay_queue.insert("hello", Duration::from_secs(5)); 731 /// assert!(!delay_queue.is_empty()); 732 /// # } 733 /// ``` is_empty(&self) -> bool734 pub fn is_empty(&self) -> bool { 735 self.slab.is_empty() 736 } 737 738 /// Polls the queue, returning the index of the next slot in the slab that 739 /// should be returned. 740 /// 741 /// A slot should be returned when the associated deadline has been reached. poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>>742 fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>> { 743 use self::wheel::Stack; 744 745 let expired = self.expired.pop(&mut self.slab); 746 747 if expired.is_some() { 748 return Poll::Ready(expired.map(Ok)); 749 } 750 751 loop { 752 if let Some(ref mut delay) = self.delay { 753 if !delay.is_elapsed() { 754 ready!(Pin::new(&mut *delay).poll(cx)); 755 } 756 757 let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down); 758 759 self.wheel_now = now; 760 } 761 762 // We poll the wheel to get the next value out before finding the next deadline. 763 let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab); 764 765 self.delay = self.next_deadline().map(|when| Box::pin(sleep_until(when))); 766 767 if let Some(idx) = wheel_idx { 768 return Poll::Ready(Some(Ok(idx))); 769 } 770 771 if self.delay.is_none() { 772 return Poll::Ready(None); 773 } 774 } 775 } 776 normalize_deadline(&self, when: Instant) -> u64777 fn normalize_deadline(&self, when: Instant) -> u64 { 778 let when = if when < self.start { 779 0 780 } else { 781 crate::time::ms(when - self.start, crate::time::Round::Up) 782 }; 783 784 cmp::max(when, self.wheel.elapsed()) 785 } 786 } 787 788 // We never put `T` in a `Pin`... 789 impl<T> Unpin for DelayQueue<T> {} 790 791 impl<T> Default for DelayQueue<T> { default() -> DelayQueue<T>792 fn default() -> DelayQueue<T> { 793 DelayQueue::new() 794 } 795 } 796 797 impl<T> futures_core::Stream for DelayQueue<T> { 798 // DelayQueue seems much more specific, where a user may care that it 799 // has reached capacity, so return those errors instead of panicking. 800 type Item = Result<Expired<T>, Error>; 801 poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>>802 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { 803 DelayQueue::poll_expired(self.get_mut(), cx) 804 } 805 } 806 807 impl<T> wheel::Stack for Stack<T> { 808 type Owned = usize; 809 type Borrowed = usize; 810 type Store = Slab<Data<T>>; 811 is_empty(&self) -> bool812 fn is_empty(&self) -> bool { 813 self.head.is_none() 814 } 815 push(&mut self, item: Self::Owned, store: &mut Self::Store)816 fn push(&mut self, item: Self::Owned, store: &mut Self::Store) { 817 // Ensure the entry is not already in a stack. 818 debug_assert!(store[item].next.is_none()); 819 debug_assert!(store[item].prev.is_none()); 820 821 // Remove the old head entry 822 let old = self.head.take(); 823 824 if let Some(idx) = old { 825 store[idx].prev = Some(item); 826 } 827 828 store[item].next = old; 829 self.head = Some(item) 830 } 831 pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned>832 fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> { 833 if let Some(idx) = self.head { 834 self.head = store[idx].next; 835 836 if let Some(idx) = self.head { 837 store[idx].prev = None; 838 } 839 840 store[idx].next = None; 841 debug_assert!(store[idx].prev.is_none()); 842 843 Some(idx) 844 } else { 845 None 846 } 847 } 848 remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store)849 fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) { 850 assert!(store.contains(*item)); 851 852 // Ensure that the entry is in fact contained by the stack 853 debug_assert!({ 854 // This walks the full linked list even if an entry is found. 855 let mut next = self.head; 856 let mut contains = false; 857 858 while let Some(idx) = next { 859 if idx == *item { 860 debug_assert!(!contains); 861 contains = true; 862 } 863 864 next = store[idx].next; 865 } 866 867 contains 868 }); 869 870 if let Some(next) = store[*item].next { 871 store[next].prev = store[*item].prev; 872 } 873 874 if let Some(prev) = store[*item].prev { 875 store[prev].next = store[*item].next; 876 } else { 877 self.head = store[*item].next; 878 } 879 880 store[*item].next = None; 881 store[*item].prev = None; 882 } 883 when(item: &Self::Borrowed, store: &Self::Store) -> u64884 fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 { 885 store[*item].when 886 } 887 } 888 889 impl<T> Default for Stack<T> { default() -> Stack<T>890 fn default() -> Stack<T> { 891 Stack { 892 head: None, 893 _p: PhantomData, 894 } 895 } 896 } 897 898 impl Key { new(index: usize) -> Key899 pub(crate) fn new(index: usize) -> Key { 900 Key { index } 901 } 902 } 903 904 impl<T> Expired<T> { 905 /// Returns a reference to the inner value. get_ref(&self) -> &T906 pub fn get_ref(&self) -> &T { 907 &self.data 908 } 909 910 /// Returns a mutable reference to the inner value. get_mut(&mut self) -> &mut T911 pub fn get_mut(&mut self) -> &mut T { 912 &mut self.data 913 } 914 915 /// Consumes `self` and returns the inner value. into_inner(self) -> T916 pub fn into_inner(self) -> T { 917 self.data 918 } 919 920 /// Returns the deadline that the expiration was set to. deadline(&self) -> Instant921 pub fn deadline(&self) -> Instant { 922 self.deadline 923 } 924 } 925