1 #![allow(clippy::blacklisted_name)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 use tokio::time::{self, sleep, sleep_until, Duration, Instant};
6 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
7 use tokio_util::time::DelayQueue;
8
9 macro_rules! poll {
10 ($queue:ident) => {
11 $queue.enter(|cx, mut queue| queue.poll_expired(cx))
12 };
13 }
14
15 macro_rules! assert_ready_ok {
16 ($e:expr) => {{
17 assert_ok!(match assert_ready!($e) {
18 Some(v) => v,
19 None => panic!("None"),
20 })
21 }};
22 }
23
24 #[tokio::test]
single_immediate_delay()25 async fn single_immediate_delay() {
26 time::pause();
27
28 let mut queue = task::spawn(DelayQueue::new());
29 let _key = queue.insert_at("foo", Instant::now());
30
31 // Advance time by 1ms to handle thee rounding
32 sleep(ms(1)).await;
33
34 assert_ready_ok!(poll!(queue));
35
36 let entry = assert_ready!(poll!(queue));
37 assert!(entry.is_none())
38 }
39
40 #[tokio::test]
multi_immediate_delays()41 async fn multi_immediate_delays() {
42 time::pause();
43
44 let mut queue = task::spawn(DelayQueue::new());
45
46 let _k = queue.insert_at("1", Instant::now());
47 let _k = queue.insert_at("2", Instant::now());
48 let _k = queue.insert_at("3", Instant::now());
49
50 sleep(ms(1)).await;
51
52 let mut res = vec![];
53
54 while res.len() < 3 {
55 let entry = assert_ready_ok!(poll!(queue));
56 res.push(entry.into_inner());
57 }
58
59 let entry = assert_ready!(poll!(queue));
60 assert!(entry.is_none());
61
62 res.sort_unstable();
63
64 assert_eq!("1", res[0]);
65 assert_eq!("2", res[1]);
66 assert_eq!("3", res[2]);
67 }
68
69 #[tokio::test]
single_short_delay()70 async fn single_short_delay() {
71 time::pause();
72
73 let mut queue = task::spawn(DelayQueue::new());
74 let _key = queue.insert_at("foo", Instant::now() + ms(5));
75
76 assert_pending!(poll!(queue));
77
78 sleep(ms(1)).await;
79
80 assert!(!queue.is_woken());
81
82 sleep(ms(5)).await;
83
84 assert!(queue.is_woken());
85
86 let entry = assert_ready_ok!(poll!(queue));
87 assert_eq!(*entry.get_ref(), "foo");
88
89 let entry = assert_ready!(poll!(queue));
90 assert!(entry.is_none());
91 }
92
93 #[tokio::test]
multi_delay_at_start()94 async fn multi_delay_at_start() {
95 time::pause();
96
97 let long = 262_144 + 9 * 4096;
98 let delays = &[1000, 2, 234, long, 60, 10];
99
100 let mut queue = task::spawn(DelayQueue::new());
101
102 // Setup the delays
103 for &i in delays {
104 let _key = queue.insert_at(i, Instant::now() + ms(i));
105 }
106
107 assert_pending!(poll!(queue));
108 assert!(!queue.is_woken());
109
110 let start = Instant::now();
111 for elapsed in 0..1200 {
112 let elapsed = elapsed + 1;
113 tokio::time::sleep_until(start + ms(elapsed)).await;
114
115 if delays.contains(&elapsed) {
116 assert!(queue.is_woken());
117 assert_ready!(poll!(queue));
118 assert_pending!(poll!(queue));
119 } else if queue.is_woken() {
120 let cascade = &[192, 960];
121 assert!(
122 cascade.contains(&elapsed),
123 "elapsed={} dt={:?}",
124 elapsed,
125 Instant::now() - start
126 );
127
128 assert_pending!(poll!(queue));
129 }
130 }
131 }
132
133 #[tokio::test]
insert_in_past_fires_immediately()134 async fn insert_in_past_fires_immediately() {
135 time::pause();
136
137 let mut queue = task::spawn(DelayQueue::new());
138 let now = Instant::now();
139
140 sleep(ms(10)).await;
141
142 queue.insert_at("foo", now);
143
144 assert_ready!(poll!(queue));
145 }
146
147 #[tokio::test]
remove_entry()148 async fn remove_entry() {
149 time::pause();
150
151 let mut queue = task::spawn(DelayQueue::new());
152
153 let key = queue.insert_at("foo", Instant::now() + ms(5));
154
155 assert_pending!(poll!(queue));
156
157 let entry = queue.remove(&key);
158 assert_eq!(entry.into_inner(), "foo");
159
160 sleep(ms(10)).await;
161
162 let entry = assert_ready!(poll!(queue));
163 assert!(entry.is_none());
164 }
165
166 #[tokio::test]
reset_entry()167 async fn reset_entry() {
168 time::pause();
169
170 let mut queue = task::spawn(DelayQueue::new());
171
172 let now = Instant::now();
173 let key = queue.insert_at("foo", now + ms(5));
174
175 assert_pending!(poll!(queue));
176 sleep(ms(1)).await;
177
178 queue.reset_at(&key, now + ms(10));
179
180 assert_pending!(poll!(queue));
181
182 sleep(ms(7)).await;
183
184 assert!(!queue.is_woken());
185
186 assert_pending!(poll!(queue));
187
188 sleep(ms(3)).await;
189
190 assert!(queue.is_woken());
191
192 let entry = assert_ready_ok!(poll!(queue));
193 assert_eq!(*entry.get_ref(), "foo");
194
195 let entry = assert_ready!(poll!(queue));
196 assert!(entry.is_none())
197 }
198
199 // Reproduces tokio-rs/tokio#849.
200 #[tokio::test]
reset_much_later()201 async fn reset_much_later() {
202 time::pause();
203
204 let mut queue = task::spawn(DelayQueue::new());
205
206 let now = Instant::now();
207 sleep(ms(1)).await;
208
209 let key = queue.insert_at("foo", now + ms(200));
210 assert_pending!(poll!(queue));
211
212 sleep(ms(3)).await;
213
214 queue.reset_at(&key, now + ms(10));
215
216 sleep(ms(20)).await;
217
218 assert!(queue.is_woken());
219 }
220
221 // Reproduces tokio-rs/tokio#849.
222 #[tokio::test]
reset_twice()223 async fn reset_twice() {
224 time::pause();
225
226 let mut queue = task::spawn(DelayQueue::new());
227 let now = Instant::now();
228
229 sleep(ms(1)).await;
230
231 let key = queue.insert_at("foo", now + ms(200));
232
233 assert_pending!(poll!(queue));
234
235 sleep(ms(3)).await;
236
237 queue.reset_at(&key, now + ms(50));
238
239 sleep(ms(20)).await;
240
241 queue.reset_at(&key, now + ms(40));
242
243 sleep(ms(20)).await;
244
245 assert!(queue.is_woken());
246 }
247
248 /// Regression test: Given an entry inserted with a deadline in the past, so
249 /// that it is placed directly on the expired queue, reset the entry to a
250 /// deadline in the future. Validate that this leaves the entry and queue in an
251 /// internally consistent state by running an additional reset on the entry
252 /// before polling it to completion.
253 #[tokio::test]
repeatedly_reset_entry_inserted_as_expired()254 async fn repeatedly_reset_entry_inserted_as_expired() {
255 time::pause();
256 let mut queue = task::spawn(DelayQueue::new());
257 let now = Instant::now();
258
259 let key = queue.insert_at("foo", now - ms(100));
260
261 queue.reset_at(&key, now + ms(100));
262 queue.reset_at(&key, now + ms(50));
263
264 assert_pending!(poll!(queue));
265
266 time::sleep_until(now + ms(60)).await;
267
268 assert!(queue.is_woken());
269
270 let entry = assert_ready_ok!(poll!(queue)).into_inner();
271 assert_eq!(entry, "foo");
272
273 let entry = assert_ready!(poll!(queue));
274 assert!(entry.is_none());
275 }
276
277 #[tokio::test]
remove_expired_item()278 async fn remove_expired_item() {
279 time::pause();
280
281 let mut queue = DelayQueue::new();
282
283 let now = Instant::now();
284
285 sleep(ms(10)).await;
286
287 let key = queue.insert_at("foo", now);
288
289 let entry = queue.remove(&key);
290 assert_eq!(entry.into_inner(), "foo");
291 }
292
293 /// Regression test: it should be possible to remove entries which fall in the
294 /// 0th slot of the internal timer wheel — that is, entries whose expiration
295 /// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
296 /// is equal to the wheel's current elapsed time.
297 #[tokio::test]
remove_at_timer_wheel_threshold()298 async fn remove_at_timer_wheel_threshold() {
299 time::pause();
300
301 let mut queue = task::spawn(DelayQueue::new());
302
303 let now = Instant::now();
304
305 let key1 = queue.insert_at("foo", now + ms(64));
306 let key2 = queue.insert_at("bar", now + ms(64));
307
308 sleep(ms(80)).await;
309
310 let entry = assert_ready_ok!(poll!(queue)).into_inner();
311
312 match entry {
313 "foo" => {
314 let entry = queue.remove(&key2).into_inner();
315 assert_eq!(entry, "bar");
316 }
317 "bar" => {
318 let entry = queue.remove(&key1).into_inner();
319 assert_eq!(entry, "foo");
320 }
321 other => panic!("other: {:?}", other),
322 }
323 }
324
325 #[tokio::test]
expires_before_last_insert()326 async fn expires_before_last_insert() {
327 time::pause();
328
329 let mut queue = task::spawn(DelayQueue::new());
330
331 let now = Instant::now();
332
333 queue.insert_at("foo", now + ms(10_000));
334
335 // Delay should be set to 8.192s here.
336 assert_pending!(poll!(queue));
337
338 // Delay should be set to the delay of the new item here
339 queue.insert_at("bar", now + ms(600));
340
341 assert_pending!(poll!(queue));
342
343 sleep(ms(600)).await;
344
345 assert!(queue.is_woken());
346
347 let entry = assert_ready_ok!(poll!(queue)).into_inner();
348 assert_eq!(entry, "bar");
349 }
350
351 #[tokio::test]
multi_reset()352 async fn multi_reset() {
353 time::pause();
354
355 let mut queue = task::spawn(DelayQueue::new());
356
357 let now = Instant::now();
358
359 let one = queue.insert_at("one", now + ms(200));
360 let two = queue.insert_at("two", now + ms(250));
361
362 assert_pending!(poll!(queue));
363
364 queue.reset_at(&one, now + ms(300));
365 queue.reset_at(&two, now + ms(350));
366 queue.reset_at(&one, now + ms(400));
367
368 sleep(ms(310)).await;
369
370 assert_pending!(poll!(queue));
371
372 sleep(ms(50)).await;
373
374 let entry = assert_ready_ok!(poll!(queue));
375 assert_eq!(*entry.get_ref(), "two");
376
377 assert_pending!(poll!(queue));
378
379 sleep(ms(50)).await;
380
381 let entry = assert_ready_ok!(poll!(queue));
382 assert_eq!(*entry.get_ref(), "one");
383
384 let entry = assert_ready!(poll!(queue));
385 assert!(entry.is_none())
386 }
387
388 #[tokio::test]
expire_first_key_when_reset_to_expire_earlier()389 async fn expire_first_key_when_reset_to_expire_earlier() {
390 time::pause();
391
392 let mut queue = task::spawn(DelayQueue::new());
393
394 let now = Instant::now();
395
396 let one = queue.insert_at("one", now + ms(200));
397 queue.insert_at("two", now + ms(250));
398
399 assert_pending!(poll!(queue));
400
401 queue.reset_at(&one, now + ms(100));
402
403 sleep(ms(100)).await;
404
405 assert!(queue.is_woken());
406
407 let entry = assert_ready_ok!(poll!(queue)).into_inner();
408 assert_eq!(entry, "one");
409 }
410
411 #[tokio::test]
expire_second_key_when_reset_to_expire_earlier()412 async fn expire_second_key_when_reset_to_expire_earlier() {
413 time::pause();
414
415 let mut queue = task::spawn(DelayQueue::new());
416
417 let now = Instant::now();
418
419 queue.insert_at("one", now + ms(200));
420 let two = queue.insert_at("two", now + ms(250));
421
422 assert_pending!(poll!(queue));
423
424 queue.reset_at(&two, now + ms(100));
425
426 sleep(ms(100)).await;
427
428 assert!(queue.is_woken());
429
430 let entry = assert_ready_ok!(poll!(queue)).into_inner();
431 assert_eq!(entry, "two");
432 }
433
434 #[tokio::test]
reset_first_expiring_item_to_expire_later()435 async fn reset_first_expiring_item_to_expire_later() {
436 time::pause();
437
438 let mut queue = task::spawn(DelayQueue::new());
439
440 let now = Instant::now();
441
442 let one = queue.insert_at("one", now + ms(200));
443 let _two = queue.insert_at("two", now + ms(250));
444
445 assert_pending!(poll!(queue));
446
447 queue.reset_at(&one, now + ms(300));
448 sleep(ms(250)).await;
449
450 assert!(queue.is_woken());
451
452 let entry = assert_ready_ok!(poll!(queue)).into_inner();
453 assert_eq!(entry, "two");
454 }
455
456 #[tokio::test]
insert_before_first_after_poll()457 async fn insert_before_first_after_poll() {
458 time::pause();
459
460 let mut queue = task::spawn(DelayQueue::new());
461
462 let now = Instant::now();
463
464 let _one = queue.insert_at("one", now + ms(200));
465
466 assert_pending!(poll!(queue));
467
468 let _two = queue.insert_at("two", now + ms(100));
469
470 sleep(ms(99)).await;
471
472 assert_pending!(poll!(queue));
473
474 sleep(ms(1)).await;
475
476 assert!(queue.is_woken());
477
478 let entry = assert_ready_ok!(poll!(queue)).into_inner();
479 assert_eq!(entry, "two");
480 }
481
482 #[tokio::test]
insert_after_ready_poll()483 async fn insert_after_ready_poll() {
484 time::pause();
485
486 let mut queue = task::spawn(DelayQueue::new());
487
488 let now = Instant::now();
489
490 queue.insert_at("1", now + ms(100));
491 queue.insert_at("2", now + ms(100));
492 queue.insert_at("3", now + ms(100));
493
494 assert_pending!(poll!(queue));
495
496 sleep(ms(100)).await;
497
498 assert!(queue.is_woken());
499
500 let mut res = vec![];
501
502 while res.len() < 3 {
503 let entry = assert_ready_ok!(poll!(queue));
504 res.push(entry.into_inner());
505 queue.insert_at("foo", now + ms(500));
506 }
507
508 res.sort_unstable();
509
510 assert_eq!("1", res[0]);
511 assert_eq!("2", res[1]);
512 assert_eq!("3", res[2]);
513 }
514
515 #[tokio::test]
reset_later_after_slot_starts()516 async fn reset_later_after_slot_starts() {
517 time::pause();
518
519 let mut queue = task::spawn(DelayQueue::new());
520
521 let now = Instant::now();
522
523 let foo = queue.insert_at("foo", now + ms(100));
524
525 assert_pending!(poll!(queue));
526
527 sleep_until(now + Duration::from_millis(80)).await;
528
529 assert!(!queue.is_woken());
530
531 // At this point the queue hasn't been polled, so `elapsed` on the wheel
532 // for the queue is still at 0 and hence the 1ms resolution slots cover
533 // [0-64). Resetting the time on the entry to 120 causes it to get put in
534 // the [64-128) slot. As the queue knows that the first entry is within
535 // that slot, but doesn't know when, it must wake immediately to advance
536 // the wheel.
537 queue.reset_at(&foo, now + ms(120));
538 assert!(queue.is_woken());
539
540 assert_pending!(poll!(queue));
541
542 sleep_until(now + Duration::from_millis(119)).await;
543 assert!(!queue.is_woken());
544
545 sleep(ms(1)).await;
546 assert!(queue.is_woken());
547
548 let entry = assert_ready_ok!(poll!(queue)).into_inner();
549 assert_eq!(entry, "foo");
550 }
551
552 #[tokio::test]
reset_inserted_expired()553 async fn reset_inserted_expired() {
554 time::pause();
555 let mut queue = task::spawn(DelayQueue::new());
556 let now = Instant::now();
557
558 let key = queue.insert_at("foo", now - ms(100));
559
560 // this causes the panic described in #2473
561 queue.reset_at(&key, now + ms(100));
562
563 assert_eq!(1, queue.len());
564
565 sleep(ms(200)).await;
566
567 let entry = assert_ready_ok!(poll!(queue)).into_inner();
568 assert_eq!(entry, "foo");
569
570 assert_eq!(queue.len(), 0);
571 }
572
573 #[tokio::test]
reset_earlier_after_slot_starts()574 async fn reset_earlier_after_slot_starts() {
575 time::pause();
576
577 let mut queue = task::spawn(DelayQueue::new());
578
579 let now = Instant::now();
580
581 let foo = queue.insert_at("foo", now + ms(200));
582
583 assert_pending!(poll!(queue));
584
585 sleep_until(now + Duration::from_millis(80)).await;
586
587 assert!(!queue.is_woken());
588
589 // At this point the queue hasn't been polled, so `elapsed` on the wheel
590 // for the queue is still at 0 and hence the 1ms resolution slots cover
591 // [0-64). Resetting the time on the entry to 120 causes it to get put in
592 // the [64-128) slot. As the queue knows that the first entry is within
593 // that slot, but doesn't know when, it must wake immediately to advance
594 // the wheel.
595 queue.reset_at(&foo, now + ms(120));
596 assert!(queue.is_woken());
597
598 assert_pending!(poll!(queue));
599
600 sleep_until(now + Duration::from_millis(119)).await;
601 assert!(!queue.is_woken());
602
603 sleep(ms(1)).await;
604 assert!(queue.is_woken());
605
606 let entry = assert_ready_ok!(poll!(queue)).into_inner();
607 assert_eq!(entry, "foo");
608 }
609
610 #[tokio::test]
insert_in_past_after_poll_fires_immediately()611 async fn insert_in_past_after_poll_fires_immediately() {
612 time::pause();
613
614 let mut queue = task::spawn(DelayQueue::new());
615
616 let now = Instant::now();
617
618 queue.insert_at("foo", now + ms(200));
619
620 assert_pending!(poll!(queue));
621
622 sleep(ms(80)).await;
623
624 assert!(!queue.is_woken());
625 queue.insert_at("bar", now + ms(40));
626
627 assert!(queue.is_woken());
628
629 let entry = assert_ready_ok!(poll!(queue)).into_inner();
630 assert_eq!(entry, "bar");
631 }
632
ms(n: u64) -> Duration633 fn ms(n: u64) -> Duration {
634 Duration::from_millis(n)
635 }
636