1 //! A queue of delayed elements. 2 //! 3 //! See [`DelayQueue`] for more details. 4 //! 5 //! [`DelayQueue`]: struct@DelayQueue 6 7 use crate::time::wheel::{self, Wheel}; 8 9 use futures_core::ready; 10 use tokio::time::{error::Error, sleep_until, Duration, Instant, Sleep}; 11 12 use slab::Slab; 13 use std::cmp; 14 use std::future::Future; 15 use std::marker::PhantomData; 16 use std::pin::Pin; 17 use std::task::{self, Poll, Waker}; 18 19 /// A queue of delayed elements. 20 /// 21 /// Once an element is inserted into the `DelayQueue`, it is yielded once the 22 /// specified deadline has been reached. 23 /// 24 /// # Usage 25 /// 26 /// Elements are inserted into `DelayQueue` using the [`insert`] or 27 /// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is 28 /// returned. The key is used to remove the entry or to change the deadline at 29 /// which it should be yielded back. 30 /// 31 /// Once delays have been configured, the `DelayQueue` is used via its 32 /// [`Stream`] implementation. [`poll_expired`] is called. If an entry has reached its 33 /// deadline, it is returned. If not, `Poll::Pending` is returned indicating that the 34 /// current task will be notified once the deadline has been reached. 35 /// 36 /// # `Stream` implementation 37 /// 38 /// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have 39 /// expired, no items are returned. In this case, `Poll::Pending` is returned and the 40 /// current task is registered to be notified once the next item's delay has 41 /// expired. 42 /// 43 /// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll` 44 /// returns `Poll::Ready(None)`. This indicates that the stream has reached an end. 45 /// However, if a new item is inserted *after*, `poll` will once again start 46 /// returning items or `Poll::Pending`. 47 /// 48 /// Items are returned ordered by their expirations. Items that are configured 49 /// to expire first will be returned first. There are no ordering guarantees 50 /// for items configured to expire at the same instant. Also note that delays are 51 /// rounded to the closest millisecond. 52 /// 53 /// # Implementation 54 /// 55 /// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally 56 /// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same 57 /// performance and scalability benefits. 58 /// 59 /// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation, 60 /// and allows reuse of the memory allocated for expired entires. 61 /// 62 /// Capacity can be checked using [`capacity`] and allocated preemptively by using 63 /// the [`reserve`] method. 64 /// 65 /// # Usage 66 /// 67 /// Using `DelayQueue` to manage cache entries. 68 /// 69 /// ```rust,no_run 70 /// use tokio::time::error::Error; 71 /// use tokio_util::time::{DelayQueue, delay_queue}; 72 /// 73 /// use futures::ready; 74 /// use std::collections::HashMap; 75 /// use std::task::{Context, Poll}; 76 /// use std::time::Duration; 77 /// # type CacheKey = String; 78 /// # type Value = String; 79 /// 80 /// struct Cache { 81 /// entries: HashMap<CacheKey, (Value, delay_queue::Key)>, 82 /// expirations: DelayQueue<CacheKey>, 83 /// } 84 /// 85 /// const TTL_SECS: u64 = 30; 86 /// 87 /// impl Cache { 88 /// fn insert(&mut self, key: CacheKey, value: Value) { 89 /// let delay = self.expirations 90 /// .insert(key.clone(), Duration::from_secs(TTL_SECS)); 91 /// 92 /// self.entries.insert(key, (value, delay)); 93 /// } 94 /// 95 /// fn get(&self, key: &CacheKey) -> Option<&Value> { 96 /// self.entries.get(key) 97 /// .map(|&(ref v, _)| v) 98 /// } 99 /// 100 /// fn remove(&mut self, key: &CacheKey) { 101 /// if let Some((_, cache_key)) = self.entries.remove(key) { 102 /// self.expirations.remove(&cache_key); 103 /// } 104 /// } 105 /// 106 /// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { 107 /// while let Some(res) = ready!(self.expirations.poll_expired(cx)) { 108 /// let entry = res?; 109 /// self.entries.remove(entry.get_ref()); 110 /// } 111 /// 112 /// Poll::Ready(Ok(())) 113 /// } 114 /// } 115 /// ``` 116 /// 117 /// [`insert`]: method@Self::insert 118 /// [`insert_at`]: method@Self::insert_at 119 /// [`Key`]: struct@Key 120 /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html 121 /// [`poll_expired`]: method@Self::poll_expired 122 /// [`Stream::poll_expired`]: method@Self::poll_expired 123 /// [`DelayQueue`]: struct@DelayQueue 124 /// [`sleep`]: fn@tokio::time::sleep 125 /// [`slab`]: slab 126 /// [`capacity`]: method@Self::capacity 127 /// [`reserve`]: method@Self::reserve 128 #[derive(Debug)] 129 pub struct DelayQueue<T> { 130 /// Stores data associated with entries 131 slab: Slab<Data<T>>, 132 133 /// Lookup structure tracking all delays in the queue 134 wheel: Wheel<Stack<T>>, 135 136 /// Delays that were inserted when already expired. These cannot be stored 137 /// in the wheel 138 expired: Stack<T>, 139 140 /// Delay expiring when the *first* item in the queue expires 141 delay: Option<Pin<Box<Sleep>>>, 142 143 /// Wheel polling state 144 wheel_now: u64, 145 146 /// Instant at which the timer starts 147 start: Instant, 148 149 /// Waker that is invoked when we potentially need to reset the timer. 150 /// Because we lazily create the timer when the first entry is created, we 151 /// need to awaken any poller that polled us before that point. 152 waker: Option<Waker>, 153 } 154 155 /// An entry in `DelayQueue` that has expired and been removed. 156 /// 157 /// Values are returned by [`DelayQueue::poll_expired`]. 158 /// 159 /// [`DelayQueue::poll_expired`]: method@DelayQueue::poll_expired 160 #[derive(Debug)] 161 pub struct Expired<T> { 162 /// The data stored in the queue 163 data: T, 164 165 /// The expiration time 166 deadline: Instant, 167 168 /// The key associated with the entry 169 key: Key, 170 } 171 172 /// Token to a value stored in a `DelayQueue`. 173 /// 174 /// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`] 175 /// documentation for more details. 176 /// 177 /// [`DelayQueue`]: struct@DelayQueue 178 /// [`DelayQueue::insert`]: method@DelayQueue::insert 179 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] 180 pub struct Key { 181 index: usize, 182 } 183 184 #[derive(Debug)] 185 struct Stack<T> { 186 /// Head of the stack 187 head: Option<usize>, 188 _p: PhantomData<fn() -> T>, 189 } 190 191 #[derive(Debug)] 192 struct Data<T> { 193 /// The data being stored in the queue and will be returned at the requested 194 /// instant. 195 inner: T, 196 197 /// The instant at which the item is returned. 198 when: u64, 199 200 /// Set to true when stored in the `expired` queue 201 expired: bool, 202 203 /// Next entry in the stack 204 next: Option<usize>, 205 206 /// Previous entry in the stack 207 prev: Option<usize>, 208 } 209 210 /// Maximum number of entries the queue can handle 211 const MAX_ENTRIES: usize = (1 << 30) - 1; 212 213 impl<T> DelayQueue<T> { 214 /// Creates a new, empty, `DelayQueue`. 215 /// 216 /// The queue will not allocate storage until items are inserted into it. 217 /// 218 /// # Examples 219 /// 220 /// ```rust 221 /// # use tokio_util::time::DelayQueue; 222 /// let delay_queue: DelayQueue<u32> = DelayQueue::new(); 223 /// ``` new() -> DelayQueue<T>224 pub fn new() -> DelayQueue<T> { 225 DelayQueue::with_capacity(0) 226 } 227 228 /// Creates a new, empty, `DelayQueue` with the specified capacity. 229 /// 230 /// The queue will be able to hold at least `capacity` elements without 231 /// reallocating. If `capacity` is 0, the queue will not allocate for 232 /// storage. 233 /// 234 /// # Examples 235 /// 236 /// ```rust 237 /// # use tokio_util::time::DelayQueue; 238 /// # use std::time::Duration; 239 /// 240 /// # #[tokio::main] 241 /// # async fn main() { 242 /// let mut delay_queue = DelayQueue::with_capacity(10); 243 /// 244 /// // These insertions are done without further allocation 245 /// for i in 0..10 { 246 /// delay_queue.insert(i, Duration::from_secs(i)); 247 /// } 248 /// 249 /// // This will make the queue allocate additional storage 250 /// delay_queue.insert(11, Duration::from_secs(11)); 251 /// # } 252 /// ``` with_capacity(capacity: usize) -> DelayQueue<T>253 pub fn with_capacity(capacity: usize) -> DelayQueue<T> { 254 DelayQueue { 255 wheel: Wheel::new(), 256 slab: Slab::with_capacity(capacity), 257 expired: Stack::default(), 258 delay: None, 259 wheel_now: 0, 260 start: Instant::now(), 261 waker: None, 262 } 263 } 264 265 /// Inserts `value` into the queue set to expire at a specific instant in 266 /// time. 267 /// 268 /// This function is identical to `insert`, but takes an `Instant` instead 269 /// of a `Duration`. 270 /// 271 /// `value` is stored in the queue until `when` is reached. At which point, 272 /// `value` will be returned from [`poll_expired`]. If `when` has already been 273 /// reached, then `value` is immediately made available to poll. 274 /// 275 /// The return value represents the insertion and is used as an argument to 276 /// [`remove`] and [`reset`]. Note that [`Key`] is a token and is reused once 277 /// `value` is removed from the queue either by calling [`poll_expired`] after 278 /// `when` is reached or by calling [`remove`]. At this point, the caller 279 /// must take care to not use the returned [`Key`] again as it may reference 280 /// a different item in the queue. 281 /// 282 /// See [type] level documentation for more details. 283 /// 284 /// # Panics 285 /// 286 /// This function panics if `when` is too far in the future. 287 /// 288 /// # Examples 289 /// 290 /// Basic usage 291 /// 292 /// ```rust 293 /// use tokio::time::{Duration, Instant}; 294 /// use tokio_util::time::DelayQueue; 295 /// 296 /// # #[tokio::main] 297 /// # async fn main() { 298 /// let mut delay_queue = DelayQueue::new(); 299 /// let key = delay_queue.insert_at( 300 /// "foo", Instant::now() + Duration::from_secs(5)); 301 /// 302 /// // Remove the entry 303 /// let item = delay_queue.remove(&key); 304 /// assert_eq!(*item.get_ref(), "foo"); 305 /// # } 306 /// ``` 307 /// 308 /// [`poll_expired`]: method@Self::poll_expired 309 /// [`remove`]: method@Self::remove 310 /// [`reset`]: method@Self::reset 311 /// [`Key`]: struct@Key 312 /// [type]: # insert_at(&mut self, value: T, when: Instant) -> Key313 pub fn insert_at(&mut self, value: T, when: Instant) -> Key { 314 assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded"); 315 316 // Normalize the deadline. Values cannot be set to expire in the past. 317 let when = self.normalize_deadline(when); 318 319 // Insert the value in the store 320 let key = self.slab.insert(Data { 321 inner: value, 322 when, 323 expired: false, 324 next: None, 325 prev: None, 326 }); 327 328 self.insert_idx(when, key); 329 330 // Set a new delay if the current's deadline is later than the one of the new item 331 let should_set_delay = if let Some(ref delay) = self.delay { 332 let current_exp = self.normalize_deadline(delay.deadline()); 333 current_exp > when 334 } else { 335 true 336 }; 337 338 if should_set_delay { 339 if let Some(waker) = self.waker.take() { 340 waker.wake(); 341 } 342 343 let delay_time = self.start + Duration::from_millis(when); 344 if let Some(ref mut delay) = &mut self.delay { 345 delay.as_mut().reset(delay_time); 346 } else { 347 self.delay = Some(Box::pin(sleep_until(delay_time))); 348 } 349 } 350 351 Key::new(key) 352 } 353 354 /// Attempts to pull out the next value of the delay queue, registering the 355 /// current task for wakeup if the value is not yet available, and returning 356 /// `None` if the queue is exhausted. poll_expired( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Expired<T>, Error>>>357 pub fn poll_expired( 358 &mut self, 359 cx: &mut task::Context<'_>, 360 ) -> Poll<Option<Result<Expired<T>, Error>>> { 361 if !self 362 .waker 363 .as_ref() 364 .map(|w| w.will_wake(cx.waker())) 365 .unwrap_or(false) 366 { 367 self.waker = Some(cx.waker().clone()); 368 } 369 370 let item = ready!(self.poll_idx(cx)); 371 Poll::Ready(item.map(|result| { 372 result.map(|idx| { 373 let data = self.slab.remove(idx); 374 debug_assert!(data.next.is_none()); 375 debug_assert!(data.prev.is_none()); 376 377 Expired { 378 key: Key::new(idx), 379 data: data.inner, 380 deadline: self.start + Duration::from_millis(data.when), 381 } 382 }) 383 })) 384 } 385 386 /// Inserts `value` into the queue set to expire after the requested duration 387 /// elapses. 388 /// 389 /// This function is identical to `insert_at`, but takes a `Duration` 390 /// instead of an `Instant`. 391 /// 392 /// `value` is stored in the queue until `timeout` duration has 393 /// elapsed after `insert` was called. At that point, `value` will 394 /// be returned from [`poll_expired`]. If `timeout` is a `Duration` of 395 /// zero, then `value` is immediately made available to poll. 396 /// 397 /// The return value represents the insertion and is used as an 398 /// argument to [`remove`] and [`reset`]. Note that [`Key`] is a 399 /// token and is reused once `value` is removed from the queue 400 /// either by calling [`poll_expired`] after `timeout` has elapsed 401 /// or by calling [`remove`]. At this point, the caller must not 402 /// use the returned [`Key`] again as it may reference a different 403 /// item in the queue. 404 /// 405 /// See [type] level documentation for more details. 406 /// 407 /// # Panics 408 /// 409 /// This function panics if `timeout` is greater than the maximum 410 /// duration supported by the timer in the current `Runtime`. 411 /// 412 /// # Examples 413 /// 414 /// Basic usage 415 /// 416 /// ```rust 417 /// use tokio_util::time::DelayQueue; 418 /// use std::time::Duration; 419 /// 420 /// # #[tokio::main] 421 /// # async fn main() { 422 /// let mut delay_queue = DelayQueue::new(); 423 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 424 /// 425 /// // Remove the entry 426 /// let item = delay_queue.remove(&key); 427 /// assert_eq!(*item.get_ref(), "foo"); 428 /// # } 429 /// ``` 430 /// 431 /// [`poll_expired`]: method@Self::poll_expired 432 /// [`remove`]: method@Self::remove 433 /// [`reset`]: method@Self::reset 434 /// [`Key`]: struct@Key 435 /// [type]: # insert(&mut self, value: T, timeout: Duration) -> Key436 pub fn insert(&mut self, value: T, timeout: Duration) -> Key { 437 self.insert_at(value, Instant::now() + timeout) 438 } 439 insert_idx(&mut self, when: u64, key: usize)440 fn insert_idx(&mut self, when: u64, key: usize) { 441 use self::wheel::{InsertError, Stack}; 442 443 // Register the deadline with the timer wheel 444 match self.wheel.insert(when, key, &mut self.slab) { 445 Ok(_) => {} 446 Err((_, InsertError::Elapsed)) => { 447 self.slab[key].expired = true; 448 // The delay is already expired, store it in the expired queue 449 self.expired.push(key, &mut self.slab); 450 } 451 Err((_, err)) => panic!("invalid deadline; err={:?}", err), 452 } 453 } 454 455 /// Removes the key from the expired queue or the timer wheel 456 /// depending on its expiration status. 457 /// 458 /// # Panics 459 /// 460 /// Panics if the key is not contained in the expired queue or the wheel. remove_key(&mut self, key: &Key)461 fn remove_key(&mut self, key: &Key) { 462 use crate::time::wheel::Stack; 463 464 // Special case the `expired` queue 465 if self.slab[key.index].expired { 466 self.expired.remove(&key.index, &mut self.slab); 467 } else { 468 self.wheel.remove(&key.index, &mut self.slab); 469 } 470 } 471 472 /// Removes the item associated with `key` from the queue. 473 /// 474 /// There must be an item associated with `key`. The function returns the 475 /// removed item as well as the `Instant` at which it will the delay will 476 /// have expired. 477 /// 478 /// # Panics 479 /// 480 /// The function panics if `key` is not contained by the queue. 481 /// 482 /// # Examples 483 /// 484 /// Basic usage 485 /// 486 /// ```rust 487 /// use tokio_util::time::DelayQueue; 488 /// use std::time::Duration; 489 /// 490 /// # #[tokio::main] 491 /// # async fn main() { 492 /// let mut delay_queue = DelayQueue::new(); 493 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 494 /// 495 /// // Remove the entry 496 /// let item = delay_queue.remove(&key); 497 /// assert_eq!(*item.get_ref(), "foo"); 498 /// # } 499 /// ``` remove(&mut self, key: &Key) -> Expired<T>500 pub fn remove(&mut self, key: &Key) -> Expired<T> { 501 let prev_deadline = self.next_deadline(); 502 503 self.remove_key(key); 504 let data = self.slab.remove(key.index); 505 506 let next_deadline = self.next_deadline(); 507 if prev_deadline != next_deadline { 508 match (next_deadline, &mut self.delay) { 509 (None, _) => self.delay = None, 510 (Some(deadline), Some(delay)) => delay.as_mut().reset(deadline), 511 (Some(deadline), None) => self.delay = Some(Box::pin(sleep_until(deadline))), 512 } 513 } 514 515 Expired { 516 key: Key::new(key.index), 517 data: data.inner, 518 deadline: self.start + Duration::from_millis(data.when), 519 } 520 } 521 522 /// Sets the delay of the item associated with `key` to expire at `when`. 523 /// 524 /// This function is identical to `reset` but takes an `Instant` instead of 525 /// a `Duration`. 526 /// 527 /// The item remains in the queue but the delay is set to expire at `when`. 528 /// If `when` is in the past, then the item is immediately made available to 529 /// the caller. 530 /// 531 /// # Panics 532 /// 533 /// This function panics if `when` is too far in the future or if `key` is 534 /// not contained by the queue. 535 /// 536 /// # Examples 537 /// 538 /// Basic usage 539 /// 540 /// ```rust 541 /// use tokio::time::{Duration, Instant}; 542 /// use tokio_util::time::DelayQueue; 543 /// 544 /// # #[tokio::main] 545 /// # async fn main() { 546 /// let mut delay_queue = DelayQueue::new(); 547 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 548 /// 549 /// // "foo" is scheduled to be returned in 5 seconds 550 /// 551 /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); 552 /// 553 /// // "foo" is now scheduled to be returned in 10 seconds 554 /// # } 555 /// ``` reset_at(&mut self, key: &Key, when: Instant)556 pub fn reset_at(&mut self, key: &Key, when: Instant) { 557 self.remove_key(key); 558 559 // Normalize the deadline. Values cannot be set to expire in the past. 560 let when = self.normalize_deadline(when); 561 562 self.slab[key.index].when = when; 563 self.slab[key.index].expired = false; 564 565 self.insert_idx(when, key.index); 566 567 let next_deadline = self.next_deadline(); 568 if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { 569 // This should awaken us if necessary (ie, if already expired) 570 delay.as_mut().reset(deadline); 571 } 572 } 573 574 /// Returns the next time to poll as determined by the wheel next_deadline(&mut self) -> Option<Instant>575 fn next_deadline(&mut self) -> Option<Instant> { 576 self.wheel 577 .poll_at() 578 .map(|poll_at| self.start + Duration::from_millis(poll_at)) 579 } 580 581 /// Sets the delay of the item associated with `key` to expire after 582 /// `timeout`. 583 /// 584 /// This function is identical to `reset_at` but takes a `Duration` instead 585 /// of an `Instant`. 586 /// 587 /// The item remains in the queue but the delay is set to expire after 588 /// `timeout`. If `timeout` is zero, then the item is immediately made 589 /// available to the caller. 590 /// 591 /// # Panics 592 /// 593 /// This function panics if `timeout` is greater than the maximum supported 594 /// duration or if `key` is not contained by the queue. 595 /// 596 /// # Examples 597 /// 598 /// Basic usage 599 /// 600 /// ```rust 601 /// use tokio_util::time::DelayQueue; 602 /// use std::time::Duration; 603 /// 604 /// # #[tokio::main] 605 /// # async fn main() { 606 /// let mut delay_queue = DelayQueue::new(); 607 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 608 /// 609 /// // "foo" is scheduled to be returned in 5 seconds 610 /// 611 /// delay_queue.reset(&key, Duration::from_secs(10)); 612 /// 613 /// // "foo"is now scheduled to be returned in 10 seconds 614 /// # } 615 /// ``` reset(&mut self, key: &Key, timeout: Duration)616 pub fn reset(&mut self, key: &Key, timeout: Duration) { 617 self.reset_at(key, Instant::now() + timeout); 618 } 619 620 /// Clears the queue, removing all items. 621 /// 622 /// After calling `clear`, [`poll_expired`] will return `Ok(Ready(None))`. 623 /// 624 /// Note that this method has no effect on the allocated capacity. 625 /// 626 /// [`poll_expired`]: method@Self::poll_expired 627 /// 628 /// # Examples 629 /// 630 /// ```rust 631 /// use tokio_util::time::DelayQueue; 632 /// use std::time::Duration; 633 /// 634 /// # #[tokio::main] 635 /// # async fn main() { 636 /// let mut delay_queue = DelayQueue::new(); 637 /// 638 /// delay_queue.insert("foo", Duration::from_secs(5)); 639 /// 640 /// assert!(!delay_queue.is_empty()); 641 /// 642 /// delay_queue.clear(); 643 /// 644 /// assert!(delay_queue.is_empty()); 645 /// # } 646 /// ``` clear(&mut self)647 pub fn clear(&mut self) { 648 self.slab.clear(); 649 self.expired = Stack::default(); 650 self.wheel = Wheel::new(); 651 self.delay = None; 652 } 653 654 /// Returns the number of elements the queue can hold without reallocating. 655 /// 656 /// # Examples 657 /// 658 /// ```rust 659 /// use tokio_util::time::DelayQueue; 660 /// 661 /// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10); 662 /// assert_eq!(delay_queue.capacity(), 10); 663 /// ``` capacity(&self) -> usize664 pub fn capacity(&self) -> usize { 665 self.slab.capacity() 666 } 667 668 /// Returns the number of elements currently in the queue. 669 /// 670 /// # Examples 671 /// 672 /// ```rust 673 /// use tokio_util::time::DelayQueue; 674 /// use std::time::Duration; 675 /// 676 /// # #[tokio::main] 677 /// # async fn main() { 678 /// let mut delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10); 679 /// assert_eq!(delay_queue.len(), 0); 680 /// delay_queue.insert(3, Duration::from_secs(5)); 681 /// assert_eq!(delay_queue.len(), 1); 682 /// # } 683 /// ``` len(&self) -> usize684 pub fn len(&self) -> usize { 685 self.slab.len() 686 } 687 688 /// Reserves capacity for at least `additional` more items to be queued 689 /// without allocating. 690 /// 691 /// `reserve` does nothing if the queue already has sufficient capacity for 692 /// `additional` more values. If more capacity is required, a new segment of 693 /// memory will be allocated and all existing values will be copied into it. 694 /// As such, if the queue is already very large, a call to `reserve` can end 695 /// up being expensive. 696 /// 697 /// The queue may reserve more than `additional` extra space in order to 698 /// avoid frequent reallocations. 699 /// 700 /// # Panics 701 /// 702 /// Panics if the new capacity exceeds the maximum number of entries the 703 /// queue can contain. 704 /// 705 /// # Examples 706 /// 707 /// ``` 708 /// use tokio_util::time::DelayQueue; 709 /// use std::time::Duration; 710 /// 711 /// # #[tokio::main] 712 /// # async fn main() { 713 /// let mut delay_queue = DelayQueue::new(); 714 /// 715 /// delay_queue.insert("hello", Duration::from_secs(10)); 716 /// delay_queue.reserve(10); 717 /// 718 /// assert!(delay_queue.capacity() >= 11); 719 /// # } 720 /// ``` reserve(&mut self, additional: usize)721 pub fn reserve(&mut self, additional: usize) { 722 self.slab.reserve(additional); 723 } 724 725 /// Returns `true` if there are no items in the queue. 726 /// 727 /// Note that this function returns `false` even if all items have not yet 728 /// expired and a call to `poll` will return `Poll::Pending`. 729 /// 730 /// # Examples 731 /// 732 /// ``` 733 /// use tokio_util::time::DelayQueue; 734 /// use std::time::Duration; 735 /// 736 /// # #[tokio::main] 737 /// # async fn main() { 738 /// let mut delay_queue = DelayQueue::new(); 739 /// assert!(delay_queue.is_empty()); 740 /// 741 /// delay_queue.insert("hello", Duration::from_secs(5)); 742 /// assert!(!delay_queue.is_empty()); 743 /// # } 744 /// ``` is_empty(&self) -> bool745 pub fn is_empty(&self) -> bool { 746 self.slab.is_empty() 747 } 748 749 /// Polls the queue, returning the index of the next slot in the slab that 750 /// should be returned. 751 /// 752 /// A slot should be returned when the associated deadline has been reached. poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>>753 fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>> { 754 use self::wheel::Stack; 755 756 let expired = self.expired.pop(&mut self.slab); 757 758 if expired.is_some() { 759 return Poll::Ready(expired.map(Ok)); 760 } 761 762 loop { 763 if let Some(ref mut delay) = self.delay { 764 if !delay.is_elapsed() { 765 ready!(Pin::new(&mut *delay).poll(cx)); 766 } 767 768 let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down); 769 770 self.wheel_now = now; 771 } 772 773 // We poll the wheel to get the next value out before finding the next deadline. 774 let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab); 775 776 self.delay = self.next_deadline().map(|when| Box::pin(sleep_until(when))); 777 778 if let Some(idx) = wheel_idx { 779 return Poll::Ready(Some(Ok(idx))); 780 } 781 782 if self.delay.is_none() { 783 return Poll::Ready(None); 784 } 785 } 786 } 787 normalize_deadline(&self, when: Instant) -> u64788 fn normalize_deadline(&self, when: Instant) -> u64 { 789 let when = if when < self.start { 790 0 791 } else { 792 crate::time::ms(when - self.start, crate::time::Round::Up) 793 }; 794 795 cmp::max(when, self.wheel.elapsed()) 796 } 797 } 798 799 // We never put `T` in a `Pin`... 800 impl<T> Unpin for DelayQueue<T> {} 801 802 impl<T> Default for DelayQueue<T> { default() -> DelayQueue<T>803 fn default() -> DelayQueue<T> { 804 DelayQueue::new() 805 } 806 } 807 808 impl<T> futures_core::Stream for DelayQueue<T> { 809 // DelayQueue seems much more specific, where a user may care that it 810 // has reached capacity, so return those errors instead of panicking. 811 type Item = Result<Expired<T>, Error>; 812 poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>>813 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { 814 DelayQueue::poll_expired(self.get_mut(), cx) 815 } 816 } 817 818 impl<T> wheel::Stack for Stack<T> { 819 type Owned = usize; 820 type Borrowed = usize; 821 type Store = Slab<Data<T>>; 822 is_empty(&self) -> bool823 fn is_empty(&self) -> bool { 824 self.head.is_none() 825 } 826 push(&mut self, item: Self::Owned, store: &mut Self::Store)827 fn push(&mut self, item: Self::Owned, store: &mut Self::Store) { 828 // Ensure the entry is not already in a stack. 829 debug_assert!(store[item].next.is_none()); 830 debug_assert!(store[item].prev.is_none()); 831 832 // Remove the old head entry 833 let old = self.head.take(); 834 835 if let Some(idx) = old { 836 store[idx].prev = Some(item); 837 } 838 839 store[item].next = old; 840 self.head = Some(item) 841 } 842 pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned>843 fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> { 844 if let Some(idx) = self.head { 845 self.head = store[idx].next; 846 847 if let Some(idx) = self.head { 848 store[idx].prev = None; 849 } 850 851 store[idx].next = None; 852 debug_assert!(store[idx].prev.is_none()); 853 854 Some(idx) 855 } else { 856 None 857 } 858 } 859 remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store)860 fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) { 861 assert!(store.contains(*item)); 862 863 // Ensure that the entry is in fact contained by the stack 864 debug_assert!({ 865 // This walks the full linked list even if an entry is found. 866 let mut next = self.head; 867 let mut contains = false; 868 869 while let Some(idx) = next { 870 if idx == *item { 871 debug_assert!(!contains); 872 contains = true; 873 } 874 875 next = store[idx].next; 876 } 877 878 contains 879 }); 880 881 if let Some(next) = store[*item].next { 882 store[next].prev = store[*item].prev; 883 } 884 885 if let Some(prev) = store[*item].prev { 886 store[prev].next = store[*item].next; 887 } else { 888 self.head = store[*item].next; 889 } 890 891 store[*item].next = None; 892 store[*item].prev = None; 893 } 894 when(item: &Self::Borrowed, store: &Self::Store) -> u64895 fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 { 896 store[*item].when 897 } 898 } 899 900 impl<T> Default for Stack<T> { default() -> Stack<T>901 fn default() -> Stack<T> { 902 Stack { 903 head: None, 904 _p: PhantomData, 905 } 906 } 907 } 908 909 impl Key { new(index: usize) -> Key910 pub(crate) fn new(index: usize) -> Key { 911 Key { index } 912 } 913 } 914 915 impl<T> Expired<T> { 916 /// Returns a reference to the inner value. get_ref(&self) -> &T917 pub fn get_ref(&self) -> &T { 918 &self.data 919 } 920 921 /// Returns a mutable reference to the inner value. get_mut(&mut self) -> &mut T922 pub fn get_mut(&mut self) -> &mut T { 923 &mut self.data 924 } 925 926 /// Consumes `self` and returns the inner value. into_inner(self) -> T927 pub fn into_inner(self) -> T { 928 self.data 929 } 930 931 /// Returns the deadline that the expiration was set to. deadline(&self) -> Instant932 pub fn deadline(&self) -> Instant { 933 self.deadline 934 } 935 936 /// Returns the key that the expiration is indexed by. key(&self) -> Key937 pub fn key(&self) -> Key { 938 self.key 939 } 940 } 941