1 #![allow(clippy::blacklisted_name)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 use tokio::time::{self, delay_for, DelayQueue, Duration, Instant};
6 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
7 
8 macro_rules! poll {
9     ($queue:ident) => {
10         $queue.enter(|cx, mut queue| queue.poll_expired(cx))
11     };
12 }
13 
14 macro_rules! assert_ready_ok {
15     ($e:expr) => {{
16         assert_ok!(match assert_ready!($e) {
17             Some(v) => v,
18             None => panic!("None"),
19         })
20     }};
21 }
22 
23 #[tokio::test]
single_immediate_delay()24 async fn single_immediate_delay() {
25     time::pause();
26 
27     let mut queue = task::spawn(DelayQueue::new());
28     let _key = queue.insert_at("foo", Instant::now());
29 
30     // Advance time by 1ms to handle thee rounding
31     delay_for(ms(1)).await;
32 
33     assert_ready_ok!(poll!(queue));
34 
35     let entry = assert_ready!(poll!(queue));
36     assert!(entry.is_none())
37 }
38 
39 #[tokio::test]
multi_immediate_delays()40 async fn multi_immediate_delays() {
41     time::pause();
42 
43     let mut queue = task::spawn(DelayQueue::new());
44 
45     let _k = queue.insert_at("1", Instant::now());
46     let _k = queue.insert_at("2", Instant::now());
47     let _k = queue.insert_at("3", Instant::now());
48 
49     delay_for(ms(1)).await;
50 
51     let mut res = vec![];
52 
53     while res.len() < 3 {
54         let entry = assert_ready_ok!(poll!(queue));
55         res.push(entry.into_inner());
56     }
57 
58     let entry = assert_ready!(poll!(queue));
59     assert!(entry.is_none());
60 
61     res.sort();
62 
63     assert_eq!("1", res[0]);
64     assert_eq!("2", res[1]);
65     assert_eq!("3", res[2]);
66 }
67 
68 #[tokio::test]
single_short_delay()69 async fn single_short_delay() {
70     time::pause();
71 
72     let mut queue = task::spawn(DelayQueue::new());
73     let _key = queue.insert_at("foo", Instant::now() + ms(5));
74 
75     assert_pending!(poll!(queue));
76 
77     delay_for(ms(1)).await;
78 
79     assert!(!queue.is_woken());
80 
81     delay_for(ms(5)).await;
82 
83     assert!(queue.is_woken());
84 
85     let entry = assert_ready_ok!(poll!(queue));
86     assert_eq!(*entry.get_ref(), "foo");
87 
88     let entry = assert_ready!(poll!(queue));
89     assert!(entry.is_none());
90 }
91 
92 #[tokio::test]
multi_delay_at_start()93 async fn multi_delay_at_start() {
94     time::pause();
95 
96     let long = 262_144 + 9 * 4096;
97     let delays = &[1000, 2, 234, long, 60, 10];
98 
99     let mut queue = task::spawn(DelayQueue::new());
100 
101     // Setup the delays
102     for &i in delays {
103         let _key = queue.insert_at(i, Instant::now() + ms(i));
104     }
105 
106     assert_pending!(poll!(queue));
107     assert!(!queue.is_woken());
108 
109     for elapsed in 0..1200 {
110         delay_for(ms(1)).await;
111         let elapsed = elapsed + 1;
112 
113         if delays.contains(&elapsed) {
114             assert!(queue.is_woken());
115             assert_ready!(poll!(queue));
116             assert_pending!(poll!(queue));
117         } else if queue.is_woken() {
118             let cascade = &[192, 960];
119             assert!(cascade.contains(&elapsed), "elapsed={}", elapsed);
120 
121             assert_pending!(poll!(queue));
122         }
123     }
124 }
125 
126 #[tokio::test]
insert_in_past_fires_immediately()127 async fn insert_in_past_fires_immediately() {
128     time::pause();
129 
130     let mut queue = task::spawn(DelayQueue::new());
131     let now = Instant::now();
132 
133     delay_for(ms(10)).await;
134 
135     queue.insert_at("foo", now);
136 
137     assert_ready!(poll!(queue));
138 }
139 
140 #[tokio::test]
remove_entry()141 async fn remove_entry() {
142     time::pause();
143 
144     let mut queue = task::spawn(DelayQueue::new());
145 
146     let key = queue.insert_at("foo", Instant::now() + ms(5));
147 
148     assert_pending!(poll!(queue));
149 
150     let entry = queue.remove(&key);
151     assert_eq!(entry.into_inner(), "foo");
152 
153     delay_for(ms(10)).await;
154 
155     let entry = assert_ready!(poll!(queue));
156     assert!(entry.is_none());
157 }
158 
159 #[tokio::test]
reset_entry()160 async fn reset_entry() {
161     time::pause();
162 
163     let mut queue = task::spawn(DelayQueue::new());
164 
165     let now = Instant::now();
166     let key = queue.insert_at("foo", now + ms(5));
167 
168     assert_pending!(poll!(queue));
169     delay_for(ms(1)).await;
170 
171     queue.reset_at(&key, now + ms(10));
172 
173     assert_pending!(poll!(queue));
174 
175     delay_for(ms(7)).await;
176 
177     assert!(!queue.is_woken());
178 
179     assert_pending!(poll!(queue));
180 
181     delay_for(ms(3)).await;
182 
183     assert!(queue.is_woken());
184 
185     let entry = assert_ready_ok!(poll!(queue));
186     assert_eq!(*entry.get_ref(), "foo");
187 
188     let entry = assert_ready!(poll!(queue));
189     assert!(entry.is_none())
190 }
191 
192 // Reproduces tokio-rs/tokio#849.
193 #[tokio::test]
reset_much_later()194 async fn reset_much_later() {
195     time::pause();
196 
197     let mut queue = task::spawn(DelayQueue::new());
198 
199     let now = Instant::now();
200     delay_for(ms(1)).await;
201 
202     let key = queue.insert_at("foo", now + ms(200));
203     assert_pending!(poll!(queue));
204 
205     delay_for(ms(3)).await;
206 
207     queue.reset_at(&key, now + ms(5));
208 
209     delay_for(ms(20)).await;
210 
211     assert!(queue.is_woken());
212 }
213 
214 // Reproduces tokio-rs/tokio#849.
215 #[tokio::test]
reset_twice()216 async fn reset_twice() {
217     time::pause();
218 
219     let mut queue = task::spawn(DelayQueue::new());
220     let now = Instant::now();
221 
222     delay_for(ms(1)).await;
223 
224     let key = queue.insert_at("foo", now + ms(200));
225 
226     assert_pending!(poll!(queue));
227 
228     delay_for(ms(3)).await;
229 
230     queue.reset_at(&key, now + ms(50));
231 
232     delay_for(ms(20)).await;
233 
234     queue.reset_at(&key, now + ms(40));
235 
236     delay_for(ms(20)).await;
237 
238     assert!(queue.is_woken());
239 }
240 
241 #[tokio::test]
remove_expired_item()242 async fn remove_expired_item() {
243     time::pause();
244 
245     let mut queue = DelayQueue::new();
246 
247     let now = Instant::now();
248 
249     delay_for(ms(10)).await;
250 
251     let key = queue.insert_at("foo", now);
252 
253     let entry = queue.remove(&key);
254     assert_eq!(entry.into_inner(), "foo");
255 }
256 
257 #[tokio::test]
expires_before_last_insert()258 async fn expires_before_last_insert() {
259     time::pause();
260 
261     let mut queue = task::spawn(DelayQueue::new());
262 
263     let now = Instant::now();
264 
265     queue.insert_at("foo", now + ms(10_000));
266 
267     // Delay should be set to 8.192s here.
268     assert_pending!(poll!(queue));
269 
270     // Delay should be set to the delay of the new item here
271     queue.insert_at("bar", now + ms(600));
272 
273     assert_pending!(poll!(queue));
274 
275     delay_for(ms(600)).await;
276 
277     assert!(queue.is_woken());
278 
279     let entry = assert_ready_ok!(poll!(queue)).into_inner();
280     assert_eq!(entry, "bar");
281 }
282 
283 #[tokio::test]
multi_reset()284 async fn multi_reset() {
285     time::pause();
286 
287     let mut queue = task::spawn(DelayQueue::new());
288 
289     let now = Instant::now();
290 
291     let one = queue.insert_at("one", now + ms(200));
292     let two = queue.insert_at("two", now + ms(250));
293 
294     assert_pending!(poll!(queue));
295 
296     queue.reset_at(&one, now + ms(300));
297     queue.reset_at(&two, now + ms(350));
298     queue.reset_at(&one, now + ms(400));
299 
300     delay_for(ms(310)).await;
301 
302     assert_pending!(poll!(queue));
303 
304     delay_for(ms(50)).await;
305 
306     let entry = assert_ready_ok!(poll!(queue));
307     assert_eq!(*entry.get_ref(), "two");
308 
309     assert_pending!(poll!(queue));
310 
311     delay_for(ms(50)).await;
312 
313     let entry = assert_ready_ok!(poll!(queue));
314     assert_eq!(*entry.get_ref(), "one");
315 
316     let entry = assert_ready!(poll!(queue));
317     assert!(entry.is_none())
318 }
319 
320 #[tokio::test]
expire_first_key_when_reset_to_expire_earlier()321 async fn expire_first_key_when_reset_to_expire_earlier() {
322     time::pause();
323 
324     let mut queue = task::spawn(DelayQueue::new());
325 
326     let now = Instant::now();
327 
328     let one = queue.insert_at("one", now + ms(200));
329     queue.insert_at("two", now + ms(250));
330 
331     assert_pending!(poll!(queue));
332 
333     queue.reset_at(&one, now + ms(100));
334 
335     delay_for(ms(100)).await;
336 
337     assert!(queue.is_woken());
338 
339     let entry = assert_ready_ok!(poll!(queue)).into_inner();
340     assert_eq!(entry, "one");
341 }
342 
343 #[tokio::test]
expire_second_key_when_reset_to_expire_earlier()344 async fn expire_second_key_when_reset_to_expire_earlier() {
345     time::pause();
346 
347     let mut queue = task::spawn(DelayQueue::new());
348 
349     let now = Instant::now();
350 
351     queue.insert_at("one", now + ms(200));
352     let two = queue.insert_at("two", now + ms(250));
353 
354     assert_pending!(poll!(queue));
355 
356     queue.reset_at(&two, now + ms(100));
357 
358     delay_for(ms(100)).await;
359 
360     assert!(queue.is_woken());
361 
362     let entry = assert_ready_ok!(poll!(queue)).into_inner();
363     assert_eq!(entry, "two");
364 }
365 
366 #[tokio::test]
reset_first_expiring_item_to_expire_later()367 async fn reset_first_expiring_item_to_expire_later() {
368     time::pause();
369 
370     let mut queue = task::spawn(DelayQueue::new());
371 
372     let now = Instant::now();
373 
374     let one = queue.insert_at("one", now + ms(200));
375     let _two = queue.insert_at("two", now + ms(250));
376 
377     assert_pending!(poll!(queue));
378 
379     queue.reset_at(&one, now + ms(300));
380     delay_for(ms(250)).await;
381 
382     assert!(queue.is_woken());
383 
384     let entry = assert_ready_ok!(poll!(queue)).into_inner();
385     assert_eq!(entry, "two");
386 }
387 
388 #[tokio::test]
insert_before_first_after_poll()389 async fn insert_before_first_after_poll() {
390     time::pause();
391 
392     let mut queue = task::spawn(DelayQueue::new());
393 
394     let now = Instant::now();
395 
396     let _one = queue.insert_at("one", now + ms(200));
397 
398     assert_pending!(poll!(queue));
399 
400     let _two = queue.insert_at("two", now + ms(100));
401 
402     delay_for(ms(99)).await;
403 
404     assert!(!queue.is_woken());
405 
406     delay_for(ms(1)).await;
407 
408     assert!(queue.is_woken());
409 
410     let entry = assert_ready_ok!(poll!(queue)).into_inner();
411     assert_eq!(entry, "two");
412 }
413 
414 #[tokio::test]
insert_after_ready_poll()415 async fn insert_after_ready_poll() {
416     time::pause();
417 
418     let mut queue = task::spawn(DelayQueue::new());
419 
420     let now = Instant::now();
421 
422     queue.insert_at("1", now + ms(100));
423     queue.insert_at("2", now + ms(100));
424     queue.insert_at("3", now + ms(100));
425 
426     assert_pending!(poll!(queue));
427 
428     delay_for(ms(100)).await;
429 
430     assert!(queue.is_woken());
431 
432     let mut res = vec![];
433 
434     while res.len() < 3 {
435         let entry = assert_ready_ok!(poll!(queue));
436         res.push(entry.into_inner());
437         queue.insert_at("foo", now + ms(500));
438     }
439 
440     res.sort();
441 
442     assert_eq!("1", res[0]);
443     assert_eq!("2", res[1]);
444     assert_eq!("3", res[2]);
445 }
446 
447 #[tokio::test]
reset_later_after_slot_starts()448 async fn reset_later_after_slot_starts() {
449     time::pause();
450 
451     let mut queue = task::spawn(DelayQueue::new());
452 
453     let now = Instant::now();
454 
455     let foo = queue.insert_at("foo", now + ms(100));
456 
457     assert_pending!(poll!(queue));
458 
459     delay_for(ms(80)).await;
460 
461     assert!(!queue.is_woken());
462 
463     // At this point the queue hasn't been polled, so `elapsed` on the wheel
464     // for the queue is still at 0 and hence the 1ms resolution slots cover
465     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
466     // the [64-128) slot.  As the queue knows that the first entry is within
467     // that slot, but doesn't know when, it must wake immediately to advance
468     // the wheel.
469     queue.reset_at(&foo, now + ms(120));
470     assert!(queue.is_woken());
471 
472     assert_pending!(poll!(queue));
473 
474     delay_for(ms(39)).await;
475     assert!(!queue.is_woken());
476 
477     delay_for(ms(1)).await;
478     assert!(queue.is_woken());
479 
480     let entry = assert_ready_ok!(poll!(queue)).into_inner();
481     assert_eq!(entry, "foo");
482 }
483 
484 #[tokio::test]
reset_earlier_after_slot_starts()485 async fn reset_earlier_after_slot_starts() {
486     time::pause();
487 
488     let mut queue = task::spawn(DelayQueue::new());
489 
490     let now = Instant::now();
491 
492     let foo = queue.insert_at("foo", now + ms(200));
493 
494     assert_pending!(poll!(queue));
495 
496     delay_for(ms(80)).await;
497 
498     assert!(!queue.is_woken());
499 
500     // At this point the queue hasn't been polled, so `elapsed` on the wheel
501     // for the queue is still at 0 and hence the 1ms resolution slots cover
502     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
503     // the [64-128) slot.  As the queue knows that the first entry is within
504     // that slot, but doesn't know when, it must wake immediately to advance
505     // the wheel.
506     queue.reset_at(&foo, now + ms(120));
507     assert!(queue.is_woken());
508 
509     assert_pending!(poll!(queue));
510 
511     delay_for(ms(39)).await;
512     assert!(!queue.is_woken());
513 
514     delay_for(ms(1)).await;
515     assert!(queue.is_woken());
516 
517     let entry = assert_ready_ok!(poll!(queue)).into_inner();
518     assert_eq!(entry, "foo");
519 }
520 
521 #[tokio::test]
insert_in_past_after_poll_fires_immediately()522 async fn insert_in_past_after_poll_fires_immediately() {
523     time::pause();
524 
525     let mut queue = task::spawn(DelayQueue::new());
526 
527     let now = Instant::now();
528 
529     queue.insert_at("foo", now + ms(200));
530 
531     assert_pending!(poll!(queue));
532 
533     delay_for(ms(80)).await;
534 
535     assert!(!queue.is_woken());
536     queue.insert_at("bar", now + ms(40));
537 
538     assert!(queue.is_woken());
539 
540     let entry = assert_ready_ok!(poll!(queue)).into_inner();
541     assert_eq!(entry, "bar");
542 }
543 
ms(n: u64) -> Duration544 fn ms(n: u64) -> Duration {
545     Duration::from_millis(n)
546 }
547