1 #![allow(clippy::blacklisted_name)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 use tokio::time::{self, sleep, sleep_until, Duration, Instant};
6 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
7 use tokio_util::time::DelayQueue;
8 
/// Polls the spawned queue named by `$task` once for an expired entry.
///
/// The identifier must name a value offering `enter(|cx, inner| ..)`; the
/// macro forwards the supplied context to `poll_expired` on the inner value
/// and evaluates to whatever that call returns.
macro_rules! poll {
    ($task:ident) => {
        $task.enter(|cx, mut delay_queue| delay_queue.poll_expired(cx))
    };
}
14 
/// Unwraps a poll result of the shape `Ready(Some(Ok(value)))` down to `value`.
///
/// `assert_ready!` rejects a pending poll, a `None` item panics with the
/// message `"None"`, and `assert_ok!` rejects an `Err` item.
macro_rules! assert_ready_ok {
    ($poll:expr) => {{
        let item = match assert_ready!($poll) {
            Some(item) => item,
            None => panic!("None"),
        };
        assert_ok!(item)
    }};
}
23 
24 #[tokio::test]
single_immediate_delay()25 async fn single_immediate_delay() {
26     time::pause();
27 
28     let mut queue = task::spawn(DelayQueue::new());
29     let _key = queue.insert_at("foo", Instant::now());
30 
31     // Advance time by 1ms to handle thee rounding
32     sleep(ms(1)).await;
33 
34     assert_ready_ok!(poll!(queue));
35 
36     let entry = assert_ready!(poll!(queue));
37     assert!(entry.is_none())
38 }
39 
40 #[tokio::test]
multi_immediate_delays()41 async fn multi_immediate_delays() {
42     time::pause();
43 
44     let mut queue = task::spawn(DelayQueue::new());
45 
46     let _k = queue.insert_at("1", Instant::now());
47     let _k = queue.insert_at("2", Instant::now());
48     let _k = queue.insert_at("3", Instant::now());
49 
50     sleep(ms(1)).await;
51 
52     let mut res = vec![];
53 
54     while res.len() < 3 {
55         let entry = assert_ready_ok!(poll!(queue));
56         res.push(entry.into_inner());
57     }
58 
59     let entry = assert_ready!(poll!(queue));
60     assert!(entry.is_none());
61 
62     res.sort_unstable();
63 
64     assert_eq!("1", res[0]);
65     assert_eq!("2", res[1]);
66     assert_eq!("3", res[2]);
67 }
68 
69 #[tokio::test]
single_short_delay()70 async fn single_short_delay() {
71     time::pause();
72 
73     let mut queue = task::spawn(DelayQueue::new());
74     let _key = queue.insert_at("foo", Instant::now() + ms(5));
75 
76     assert_pending!(poll!(queue));
77 
78     sleep(ms(1)).await;
79 
80     assert!(!queue.is_woken());
81 
82     sleep(ms(5)).await;
83 
84     assert!(queue.is_woken());
85 
86     let entry = assert_ready_ok!(poll!(queue));
87     assert_eq!(*entry.get_ref(), "foo");
88 
89     let entry = assert_ready!(poll!(queue));
90     assert!(entry.is_none());
91 }
92 
93 #[tokio::test]
multi_delay_at_start()94 async fn multi_delay_at_start() {
95     time::pause();
96 
97     let long = 262_144 + 9 * 4096;
98     let delays = &[1000, 2, 234, long, 60, 10];
99 
100     let mut queue = task::spawn(DelayQueue::new());
101 
102     // Setup the delays
103     for &i in delays {
104         let _key = queue.insert_at(i, Instant::now() + ms(i));
105     }
106 
107     assert_pending!(poll!(queue));
108     assert!(!queue.is_woken());
109 
110     let start = Instant::now();
111     for elapsed in 0..1200 {
112         let elapsed = elapsed + 1;
113         tokio::time::sleep_until(start + ms(elapsed)).await;
114 
115         if delays.contains(&elapsed) {
116             assert!(queue.is_woken());
117             assert_ready!(poll!(queue));
118             assert_pending!(poll!(queue));
119         } else if queue.is_woken() {
120             let cascade = &[192, 960];
121             assert!(
122                 cascade.contains(&elapsed),
123                 "elapsed={} dt={:?}",
124                 elapsed,
125                 Instant::now() - start
126             );
127 
128             assert_pending!(poll!(queue));
129         }
130     }
131 }
132 
133 #[tokio::test]
insert_in_past_fires_immediately()134 async fn insert_in_past_fires_immediately() {
135     time::pause();
136 
137     let mut queue = task::spawn(DelayQueue::new());
138     let now = Instant::now();
139 
140     sleep(ms(10)).await;
141 
142     queue.insert_at("foo", now);
143 
144     assert_ready!(poll!(queue));
145 }
146 
147 #[tokio::test]
remove_entry()148 async fn remove_entry() {
149     time::pause();
150 
151     let mut queue = task::spawn(DelayQueue::new());
152 
153     let key = queue.insert_at("foo", Instant::now() + ms(5));
154 
155     assert_pending!(poll!(queue));
156 
157     let entry = queue.remove(&key);
158     assert_eq!(entry.into_inner(), "foo");
159 
160     sleep(ms(10)).await;
161 
162     let entry = assert_ready!(poll!(queue));
163     assert!(entry.is_none());
164 }
165 
166 #[tokio::test]
reset_entry()167 async fn reset_entry() {
168     time::pause();
169 
170     let mut queue = task::spawn(DelayQueue::new());
171 
172     let now = Instant::now();
173     let key = queue.insert_at("foo", now + ms(5));
174 
175     assert_pending!(poll!(queue));
176     sleep(ms(1)).await;
177 
178     queue.reset_at(&key, now + ms(10));
179 
180     assert_pending!(poll!(queue));
181 
182     sleep(ms(7)).await;
183 
184     assert!(!queue.is_woken());
185 
186     assert_pending!(poll!(queue));
187 
188     sleep(ms(3)).await;
189 
190     assert!(queue.is_woken());
191 
192     let entry = assert_ready_ok!(poll!(queue));
193     assert_eq!(*entry.get_ref(), "foo");
194 
195     let entry = assert_ready!(poll!(queue));
196     assert!(entry.is_none())
197 }
198 
199 // Reproduces tokio-rs/tokio#849.
200 #[tokio::test]
reset_much_later()201 async fn reset_much_later() {
202     time::pause();
203 
204     let mut queue = task::spawn(DelayQueue::new());
205 
206     let now = Instant::now();
207     sleep(ms(1)).await;
208 
209     let key = queue.insert_at("foo", now + ms(200));
210     assert_pending!(poll!(queue));
211 
212     sleep(ms(3)).await;
213 
214     queue.reset_at(&key, now + ms(10));
215 
216     sleep(ms(20)).await;
217 
218     assert!(queue.is_woken());
219 }
220 
221 // Reproduces tokio-rs/tokio#849.
222 #[tokio::test]
reset_twice()223 async fn reset_twice() {
224     time::pause();
225 
226     let mut queue = task::spawn(DelayQueue::new());
227     let now = Instant::now();
228 
229     sleep(ms(1)).await;
230 
231     let key = queue.insert_at("foo", now + ms(200));
232 
233     assert_pending!(poll!(queue));
234 
235     sleep(ms(3)).await;
236 
237     queue.reset_at(&key, now + ms(50));
238 
239     sleep(ms(20)).await;
240 
241     queue.reset_at(&key, now + ms(40));
242 
243     sleep(ms(20)).await;
244 
245     assert!(queue.is_woken());
246 }
247 
248 /// Regression test: Given an entry inserted with a deadline in the past, so
249 /// that it is placed directly on the expired queue, reset the entry to a
250 /// deadline in the future. Validate that this leaves the entry and queue in an
251 /// internally consistent state by running an additional reset on the entry
252 /// before polling it to completion.
253 #[tokio::test]
repeatedly_reset_entry_inserted_as_expired()254 async fn repeatedly_reset_entry_inserted_as_expired() {
255     time::pause();
256     let mut queue = task::spawn(DelayQueue::new());
257     let now = Instant::now();
258 
259     let key = queue.insert_at("foo", now - ms(100));
260 
261     queue.reset_at(&key, now + ms(100));
262     queue.reset_at(&key, now + ms(50));
263 
264     assert_pending!(poll!(queue));
265 
266     time::sleep_until(now + ms(60)).await;
267 
268     assert!(queue.is_woken());
269 
270     let entry = assert_ready_ok!(poll!(queue)).into_inner();
271     assert_eq!(entry, "foo");
272 
273     let entry = assert_ready!(poll!(queue));
274     assert!(entry.is_none());
275 }
276 
277 #[tokio::test]
remove_expired_item()278 async fn remove_expired_item() {
279     time::pause();
280 
281     let mut queue = DelayQueue::new();
282 
283     let now = Instant::now();
284 
285     sleep(ms(10)).await;
286 
287     let key = queue.insert_at("foo", now);
288 
289     let entry = queue.remove(&key);
290     assert_eq!(entry.into_inner(), "foo");
291 }
292 
293 /// Regression test: it should be possible to remove entries which fall in the
294 /// 0th slot of the internal timer wheel — that is, entries whose expiration
295 /// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
296 /// is equal to the wheel's current elapsed time.
297 #[tokio::test]
remove_at_timer_wheel_threshold()298 async fn remove_at_timer_wheel_threshold() {
299     time::pause();
300 
301     let mut queue = task::spawn(DelayQueue::new());
302 
303     let now = Instant::now();
304 
305     let key1 = queue.insert_at("foo", now + ms(64));
306     let key2 = queue.insert_at("bar", now + ms(64));
307 
308     sleep(ms(80)).await;
309 
310     let entry = assert_ready_ok!(poll!(queue)).into_inner();
311 
312     match entry {
313         "foo" => {
314             let entry = queue.remove(&key2).into_inner();
315             assert_eq!(entry, "bar");
316         }
317         "bar" => {
318             let entry = queue.remove(&key1).into_inner();
319             assert_eq!(entry, "foo");
320         }
321         other => panic!("other: {:?}", other),
322     }
323 }
324 
325 #[tokio::test]
expires_before_last_insert()326 async fn expires_before_last_insert() {
327     time::pause();
328 
329     let mut queue = task::spawn(DelayQueue::new());
330 
331     let now = Instant::now();
332 
333     queue.insert_at("foo", now + ms(10_000));
334 
335     // Delay should be set to 8.192s here.
336     assert_pending!(poll!(queue));
337 
338     // Delay should be set to the delay of the new item here
339     queue.insert_at("bar", now + ms(600));
340 
341     assert_pending!(poll!(queue));
342 
343     sleep(ms(600)).await;
344 
345     assert!(queue.is_woken());
346 
347     let entry = assert_ready_ok!(poll!(queue)).into_inner();
348     assert_eq!(entry, "bar");
349 }
350 
351 #[tokio::test]
multi_reset()352 async fn multi_reset() {
353     time::pause();
354 
355     let mut queue = task::spawn(DelayQueue::new());
356 
357     let now = Instant::now();
358 
359     let one = queue.insert_at("one", now + ms(200));
360     let two = queue.insert_at("two", now + ms(250));
361 
362     assert_pending!(poll!(queue));
363 
364     queue.reset_at(&one, now + ms(300));
365     queue.reset_at(&two, now + ms(350));
366     queue.reset_at(&one, now + ms(400));
367 
368     sleep(ms(310)).await;
369 
370     assert_pending!(poll!(queue));
371 
372     sleep(ms(50)).await;
373 
374     let entry = assert_ready_ok!(poll!(queue));
375     assert_eq!(*entry.get_ref(), "two");
376 
377     assert_pending!(poll!(queue));
378 
379     sleep(ms(50)).await;
380 
381     let entry = assert_ready_ok!(poll!(queue));
382     assert_eq!(*entry.get_ref(), "one");
383 
384     let entry = assert_ready!(poll!(queue));
385     assert!(entry.is_none())
386 }
387 
388 #[tokio::test]
expire_first_key_when_reset_to_expire_earlier()389 async fn expire_first_key_when_reset_to_expire_earlier() {
390     time::pause();
391 
392     let mut queue = task::spawn(DelayQueue::new());
393 
394     let now = Instant::now();
395 
396     let one = queue.insert_at("one", now + ms(200));
397     queue.insert_at("two", now + ms(250));
398 
399     assert_pending!(poll!(queue));
400 
401     queue.reset_at(&one, now + ms(100));
402 
403     sleep(ms(100)).await;
404 
405     assert!(queue.is_woken());
406 
407     let entry = assert_ready_ok!(poll!(queue)).into_inner();
408     assert_eq!(entry, "one");
409 }
410 
411 #[tokio::test]
expire_second_key_when_reset_to_expire_earlier()412 async fn expire_second_key_when_reset_to_expire_earlier() {
413     time::pause();
414 
415     let mut queue = task::spawn(DelayQueue::new());
416 
417     let now = Instant::now();
418 
419     queue.insert_at("one", now + ms(200));
420     let two = queue.insert_at("two", now + ms(250));
421 
422     assert_pending!(poll!(queue));
423 
424     queue.reset_at(&two, now + ms(100));
425 
426     sleep(ms(100)).await;
427 
428     assert!(queue.is_woken());
429 
430     let entry = assert_ready_ok!(poll!(queue)).into_inner();
431     assert_eq!(entry, "two");
432 }
433 
434 #[tokio::test]
reset_first_expiring_item_to_expire_later()435 async fn reset_first_expiring_item_to_expire_later() {
436     time::pause();
437 
438     let mut queue = task::spawn(DelayQueue::new());
439 
440     let now = Instant::now();
441 
442     let one = queue.insert_at("one", now + ms(200));
443     let _two = queue.insert_at("two", now + ms(250));
444 
445     assert_pending!(poll!(queue));
446 
447     queue.reset_at(&one, now + ms(300));
448     sleep(ms(250)).await;
449 
450     assert!(queue.is_woken());
451 
452     let entry = assert_ready_ok!(poll!(queue)).into_inner();
453     assert_eq!(entry, "two");
454 }
455 
456 #[tokio::test]
insert_before_first_after_poll()457 async fn insert_before_first_after_poll() {
458     time::pause();
459 
460     let mut queue = task::spawn(DelayQueue::new());
461 
462     let now = Instant::now();
463 
464     let _one = queue.insert_at("one", now + ms(200));
465 
466     assert_pending!(poll!(queue));
467 
468     let _two = queue.insert_at("two", now + ms(100));
469 
470     sleep(ms(99)).await;
471 
472     assert_pending!(poll!(queue));
473 
474     sleep(ms(1)).await;
475 
476     assert!(queue.is_woken());
477 
478     let entry = assert_ready_ok!(poll!(queue)).into_inner();
479     assert_eq!(entry, "two");
480 }
481 
482 #[tokio::test]
insert_after_ready_poll()483 async fn insert_after_ready_poll() {
484     time::pause();
485 
486     let mut queue = task::spawn(DelayQueue::new());
487 
488     let now = Instant::now();
489 
490     queue.insert_at("1", now + ms(100));
491     queue.insert_at("2", now + ms(100));
492     queue.insert_at("3", now + ms(100));
493 
494     assert_pending!(poll!(queue));
495 
496     sleep(ms(100)).await;
497 
498     assert!(queue.is_woken());
499 
500     let mut res = vec![];
501 
502     while res.len() < 3 {
503         let entry = assert_ready_ok!(poll!(queue));
504         res.push(entry.into_inner());
505         queue.insert_at("foo", now + ms(500));
506     }
507 
508     res.sort_unstable();
509 
510     assert_eq!("1", res[0]);
511     assert_eq!("2", res[1]);
512     assert_eq!("3", res[2]);
513 }
514 
515 #[tokio::test]
reset_later_after_slot_starts()516 async fn reset_later_after_slot_starts() {
517     time::pause();
518 
519     let mut queue = task::spawn(DelayQueue::new());
520 
521     let now = Instant::now();
522 
523     let foo = queue.insert_at("foo", now + ms(100));
524 
525     assert_pending!(poll!(queue));
526 
527     sleep_until(now + Duration::from_millis(80)).await;
528 
529     assert!(!queue.is_woken());
530 
531     // At this point the queue hasn't been polled, so `elapsed` on the wheel
532     // for the queue is still at 0 and hence the 1ms resolution slots cover
533     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
534     // the [64-128) slot.  As the queue knows that the first entry is within
535     // that slot, but doesn't know when, it must wake immediately to advance
536     // the wheel.
537     queue.reset_at(&foo, now + ms(120));
538     assert!(queue.is_woken());
539 
540     assert_pending!(poll!(queue));
541 
542     sleep_until(now + Duration::from_millis(119)).await;
543     assert!(!queue.is_woken());
544 
545     sleep(ms(1)).await;
546     assert!(queue.is_woken());
547 
548     let entry = assert_ready_ok!(poll!(queue)).into_inner();
549     assert_eq!(entry, "foo");
550 }
551 
552 #[tokio::test]
reset_inserted_expired()553 async fn reset_inserted_expired() {
554     time::pause();
555     let mut queue = task::spawn(DelayQueue::new());
556     let now = Instant::now();
557 
558     let key = queue.insert_at("foo", now - ms(100));
559 
560     // this causes the panic described in #2473
561     queue.reset_at(&key, now + ms(100));
562 
563     assert_eq!(1, queue.len());
564 
565     sleep(ms(200)).await;
566 
567     let entry = assert_ready_ok!(poll!(queue)).into_inner();
568     assert_eq!(entry, "foo");
569 
570     assert_eq!(queue.len(), 0);
571 }
572 
573 #[tokio::test]
reset_earlier_after_slot_starts()574 async fn reset_earlier_after_slot_starts() {
575     time::pause();
576 
577     let mut queue = task::spawn(DelayQueue::new());
578 
579     let now = Instant::now();
580 
581     let foo = queue.insert_at("foo", now + ms(200));
582 
583     assert_pending!(poll!(queue));
584 
585     sleep_until(now + Duration::from_millis(80)).await;
586 
587     assert!(!queue.is_woken());
588 
589     // At this point the queue hasn't been polled, so `elapsed` on the wheel
590     // for the queue is still at 0 and hence the 1ms resolution slots cover
591     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
592     // the [64-128) slot.  As the queue knows that the first entry is within
593     // that slot, but doesn't know when, it must wake immediately to advance
594     // the wheel.
595     queue.reset_at(&foo, now + ms(120));
596     assert!(queue.is_woken());
597 
598     assert_pending!(poll!(queue));
599 
600     sleep_until(now + Duration::from_millis(119)).await;
601     assert!(!queue.is_woken());
602 
603     sleep(ms(1)).await;
604     assert!(queue.is_woken());
605 
606     let entry = assert_ready_ok!(poll!(queue)).into_inner();
607     assert_eq!(entry, "foo");
608 }
609 
610 #[tokio::test]
insert_in_past_after_poll_fires_immediately()611 async fn insert_in_past_after_poll_fires_immediately() {
612     time::pause();
613 
614     let mut queue = task::spawn(DelayQueue::new());
615 
616     let now = Instant::now();
617 
618     queue.insert_at("foo", now + ms(200));
619 
620     assert_pending!(poll!(queue));
621 
622     sleep(ms(80)).await;
623 
624     assert!(!queue.is_woken());
625     queue.insert_at("bar", now + ms(40));
626 
627     assert!(queue.is_woken());
628 
629     let entry = assert_ready_ok!(poll!(queue)).into_inner();
630     assert_eq!(entry, "bar");
631 }
632 
/// Shorthand used throughout these tests: a [`Duration`] of `n` milliseconds.
fn ms(n: u64) -> Duration {
    Duration::from_millis(n)
}
636