1 #![allow(clippy::blacklisted_name)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 use tokio::time::{self, delay_for, DelayQueue, Duration, Instant};
6 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
7
// Polls the spawned queue bound to `$spawned` for its next expired entry,
// forwarding the task context that `enter` hands to the closure.
macro_rules! poll {
    ($spawned:ident) => {
        $spawned.enter(|cx, mut inner| inner.poll_expired(cx))
    };
}
13
// Asserts that `$polled` is ready, that it carries `Some(..)` (panicking with
// "None" otherwise), and that the contained result is `Ok`; evaluates to the
// unwrapped `Ok` value.
macro_rules! assert_ready_ok {
    ($polled:expr) => {{
        let item = assert_ready!($polled).unwrap_or_else(|| panic!("None"));
        assert_ok!(item)
    }};
}
22
23 #[tokio::test]
single_immediate_delay()24 async fn single_immediate_delay() {
25 time::pause();
26
27 let mut queue = task::spawn(DelayQueue::new());
28 let _key = queue.insert_at("foo", Instant::now());
29
30 // Advance time by 1ms to handle thee rounding
31 delay_for(ms(1)).await;
32
33 assert_ready_ok!(poll!(queue));
34
35 let entry = assert_ready!(poll!(queue));
36 assert!(entry.is_none())
37 }
38
39 #[tokio::test]
multi_immediate_delays()40 async fn multi_immediate_delays() {
41 time::pause();
42
43 let mut queue = task::spawn(DelayQueue::new());
44
45 let _k = queue.insert_at("1", Instant::now());
46 let _k = queue.insert_at("2", Instant::now());
47 let _k = queue.insert_at("3", Instant::now());
48
49 delay_for(ms(1)).await;
50
51 let mut res = vec![];
52
53 while res.len() < 3 {
54 let entry = assert_ready_ok!(poll!(queue));
55 res.push(entry.into_inner());
56 }
57
58 let entry = assert_ready!(poll!(queue));
59 assert!(entry.is_none());
60
61 res.sort();
62
63 assert_eq!("1", res[0]);
64 assert_eq!("2", res[1]);
65 assert_eq!("3", res[2]);
66 }
67
68 #[tokio::test]
single_short_delay()69 async fn single_short_delay() {
70 time::pause();
71
72 let mut queue = task::spawn(DelayQueue::new());
73 let _key = queue.insert_at("foo", Instant::now() + ms(5));
74
75 assert_pending!(poll!(queue));
76
77 delay_for(ms(1)).await;
78
79 assert!(!queue.is_woken());
80
81 delay_for(ms(5)).await;
82
83 assert!(queue.is_woken());
84
85 let entry = assert_ready_ok!(poll!(queue));
86 assert_eq!(*entry.get_ref(), "foo");
87
88 let entry = assert_ready!(poll!(queue));
89 assert!(entry.is_none());
90 }
91
92 #[tokio::test]
multi_delay_at_start()93 async fn multi_delay_at_start() {
94 time::pause();
95
96 let long = 262_144 + 9 * 4096;
97 let delays = &[1000, 2, 234, long, 60, 10];
98
99 let mut queue = task::spawn(DelayQueue::new());
100
101 // Setup the delays
102 for &i in delays {
103 let _key = queue.insert_at(i, Instant::now() + ms(i));
104 }
105
106 assert_pending!(poll!(queue));
107 assert!(!queue.is_woken());
108
109 for elapsed in 0..1200 {
110 delay_for(ms(1)).await;
111 let elapsed = elapsed + 1;
112
113 if delays.contains(&elapsed) {
114 assert!(queue.is_woken());
115 assert_ready!(poll!(queue));
116 assert_pending!(poll!(queue));
117 } else if queue.is_woken() {
118 let cascade = &[192, 960];
119 assert!(cascade.contains(&elapsed), "elapsed={}", elapsed);
120
121 assert_pending!(poll!(queue));
122 }
123 }
124 }
125
126 #[tokio::test]
insert_in_past_fires_immediately()127 async fn insert_in_past_fires_immediately() {
128 time::pause();
129
130 let mut queue = task::spawn(DelayQueue::new());
131 let now = Instant::now();
132
133 delay_for(ms(10)).await;
134
135 queue.insert_at("foo", now);
136
137 assert_ready!(poll!(queue));
138 }
139
140 #[tokio::test]
remove_entry()141 async fn remove_entry() {
142 time::pause();
143
144 let mut queue = task::spawn(DelayQueue::new());
145
146 let key = queue.insert_at("foo", Instant::now() + ms(5));
147
148 assert_pending!(poll!(queue));
149
150 let entry = queue.remove(&key);
151 assert_eq!(entry.into_inner(), "foo");
152
153 delay_for(ms(10)).await;
154
155 let entry = assert_ready!(poll!(queue));
156 assert!(entry.is_none());
157 }
158
159 #[tokio::test]
reset_entry()160 async fn reset_entry() {
161 time::pause();
162
163 let mut queue = task::spawn(DelayQueue::new());
164
165 let now = Instant::now();
166 let key = queue.insert_at("foo", now + ms(5));
167
168 assert_pending!(poll!(queue));
169 delay_for(ms(1)).await;
170
171 queue.reset_at(&key, now + ms(10));
172
173 assert_pending!(poll!(queue));
174
175 delay_for(ms(7)).await;
176
177 assert!(!queue.is_woken());
178
179 assert_pending!(poll!(queue));
180
181 delay_for(ms(3)).await;
182
183 assert!(queue.is_woken());
184
185 let entry = assert_ready_ok!(poll!(queue));
186 assert_eq!(*entry.get_ref(), "foo");
187
188 let entry = assert_ready!(poll!(queue));
189 assert!(entry.is_none())
190 }
191
192 // Reproduces tokio-rs/tokio#849.
193 #[tokio::test]
reset_much_later()194 async fn reset_much_later() {
195 time::pause();
196
197 let mut queue = task::spawn(DelayQueue::new());
198
199 let now = Instant::now();
200 delay_for(ms(1)).await;
201
202 let key = queue.insert_at("foo", now + ms(200));
203 assert_pending!(poll!(queue));
204
205 delay_for(ms(3)).await;
206
207 queue.reset_at(&key, now + ms(5));
208
209 delay_for(ms(20)).await;
210
211 assert!(queue.is_woken());
212 }
213
214 // Reproduces tokio-rs/tokio#849.
215 #[tokio::test]
reset_twice()216 async fn reset_twice() {
217 time::pause();
218
219 let mut queue = task::spawn(DelayQueue::new());
220 let now = Instant::now();
221
222 delay_for(ms(1)).await;
223
224 let key = queue.insert_at("foo", now + ms(200));
225
226 assert_pending!(poll!(queue));
227
228 delay_for(ms(3)).await;
229
230 queue.reset_at(&key, now + ms(50));
231
232 delay_for(ms(20)).await;
233
234 queue.reset_at(&key, now + ms(40));
235
236 delay_for(ms(20)).await;
237
238 assert!(queue.is_woken());
239 }
240
241 #[tokio::test]
remove_expired_item()242 async fn remove_expired_item() {
243 time::pause();
244
245 let mut queue = DelayQueue::new();
246
247 let now = Instant::now();
248
249 delay_for(ms(10)).await;
250
251 let key = queue.insert_at("foo", now);
252
253 let entry = queue.remove(&key);
254 assert_eq!(entry.into_inner(), "foo");
255 }
256
257 #[tokio::test]
expires_before_last_insert()258 async fn expires_before_last_insert() {
259 time::pause();
260
261 let mut queue = task::spawn(DelayQueue::new());
262
263 let now = Instant::now();
264
265 queue.insert_at("foo", now + ms(10_000));
266
267 // Delay should be set to 8.192s here.
268 assert_pending!(poll!(queue));
269
270 // Delay should be set to the delay of the new item here
271 queue.insert_at("bar", now + ms(600));
272
273 assert_pending!(poll!(queue));
274
275 delay_for(ms(600)).await;
276
277 assert!(queue.is_woken());
278
279 let entry = assert_ready_ok!(poll!(queue)).into_inner();
280 assert_eq!(entry, "bar");
281 }
282
283 #[tokio::test]
multi_reset()284 async fn multi_reset() {
285 time::pause();
286
287 let mut queue = task::spawn(DelayQueue::new());
288
289 let now = Instant::now();
290
291 let one = queue.insert_at("one", now + ms(200));
292 let two = queue.insert_at("two", now + ms(250));
293
294 assert_pending!(poll!(queue));
295
296 queue.reset_at(&one, now + ms(300));
297 queue.reset_at(&two, now + ms(350));
298 queue.reset_at(&one, now + ms(400));
299
300 delay_for(ms(310)).await;
301
302 assert_pending!(poll!(queue));
303
304 delay_for(ms(50)).await;
305
306 let entry = assert_ready_ok!(poll!(queue));
307 assert_eq!(*entry.get_ref(), "two");
308
309 assert_pending!(poll!(queue));
310
311 delay_for(ms(50)).await;
312
313 let entry = assert_ready_ok!(poll!(queue));
314 assert_eq!(*entry.get_ref(), "one");
315
316 let entry = assert_ready!(poll!(queue));
317 assert!(entry.is_none())
318 }
319
320 #[tokio::test]
expire_first_key_when_reset_to_expire_earlier()321 async fn expire_first_key_when_reset_to_expire_earlier() {
322 time::pause();
323
324 let mut queue = task::spawn(DelayQueue::new());
325
326 let now = Instant::now();
327
328 let one = queue.insert_at("one", now + ms(200));
329 queue.insert_at("two", now + ms(250));
330
331 assert_pending!(poll!(queue));
332
333 queue.reset_at(&one, now + ms(100));
334
335 delay_for(ms(100)).await;
336
337 assert!(queue.is_woken());
338
339 let entry = assert_ready_ok!(poll!(queue)).into_inner();
340 assert_eq!(entry, "one");
341 }
342
343 #[tokio::test]
expire_second_key_when_reset_to_expire_earlier()344 async fn expire_second_key_when_reset_to_expire_earlier() {
345 time::pause();
346
347 let mut queue = task::spawn(DelayQueue::new());
348
349 let now = Instant::now();
350
351 queue.insert_at("one", now + ms(200));
352 let two = queue.insert_at("two", now + ms(250));
353
354 assert_pending!(poll!(queue));
355
356 queue.reset_at(&two, now + ms(100));
357
358 delay_for(ms(100)).await;
359
360 assert!(queue.is_woken());
361
362 let entry = assert_ready_ok!(poll!(queue)).into_inner();
363 assert_eq!(entry, "two");
364 }
365
366 #[tokio::test]
reset_first_expiring_item_to_expire_later()367 async fn reset_first_expiring_item_to_expire_later() {
368 time::pause();
369
370 let mut queue = task::spawn(DelayQueue::new());
371
372 let now = Instant::now();
373
374 let one = queue.insert_at("one", now + ms(200));
375 let _two = queue.insert_at("two", now + ms(250));
376
377 assert_pending!(poll!(queue));
378
379 queue.reset_at(&one, now + ms(300));
380 delay_for(ms(250)).await;
381
382 assert!(queue.is_woken());
383
384 let entry = assert_ready_ok!(poll!(queue)).into_inner();
385 assert_eq!(entry, "two");
386 }
387
388 #[tokio::test]
insert_before_first_after_poll()389 async fn insert_before_first_after_poll() {
390 time::pause();
391
392 let mut queue = task::spawn(DelayQueue::new());
393
394 let now = Instant::now();
395
396 let _one = queue.insert_at("one", now + ms(200));
397
398 assert_pending!(poll!(queue));
399
400 let _two = queue.insert_at("two", now + ms(100));
401
402 delay_for(ms(99)).await;
403
404 assert!(!queue.is_woken());
405
406 delay_for(ms(1)).await;
407
408 assert!(queue.is_woken());
409
410 let entry = assert_ready_ok!(poll!(queue)).into_inner();
411 assert_eq!(entry, "two");
412 }
413
414 #[tokio::test]
insert_after_ready_poll()415 async fn insert_after_ready_poll() {
416 time::pause();
417
418 let mut queue = task::spawn(DelayQueue::new());
419
420 let now = Instant::now();
421
422 queue.insert_at("1", now + ms(100));
423 queue.insert_at("2", now + ms(100));
424 queue.insert_at("3", now + ms(100));
425
426 assert_pending!(poll!(queue));
427
428 delay_for(ms(100)).await;
429
430 assert!(queue.is_woken());
431
432 let mut res = vec![];
433
434 while res.len() < 3 {
435 let entry = assert_ready_ok!(poll!(queue));
436 res.push(entry.into_inner());
437 queue.insert_at("foo", now + ms(500));
438 }
439
440 res.sort();
441
442 assert_eq!("1", res[0]);
443 assert_eq!("2", res[1]);
444 assert_eq!("3", res[2]);
445 }
446
447 #[tokio::test]
reset_later_after_slot_starts()448 async fn reset_later_after_slot_starts() {
449 time::pause();
450
451 let mut queue = task::spawn(DelayQueue::new());
452
453 let now = Instant::now();
454
455 let foo = queue.insert_at("foo", now + ms(100));
456
457 assert_pending!(poll!(queue));
458
459 delay_for(ms(80)).await;
460
461 assert!(!queue.is_woken());
462
463 // At this point the queue hasn't been polled, so `elapsed` on the wheel
464 // for the queue is still at 0 and hence the 1ms resolution slots cover
465 // [0-64). Resetting the time on the entry to 120 causes it to get put in
466 // the [64-128) slot. As the queue knows that the first entry is within
467 // that slot, but doesn't know when, it must wake immediately to advance
468 // the wheel.
469 queue.reset_at(&foo, now + ms(120));
470 assert!(queue.is_woken());
471
472 assert_pending!(poll!(queue));
473
474 delay_for(ms(39)).await;
475 assert!(!queue.is_woken());
476
477 delay_for(ms(1)).await;
478 assert!(queue.is_woken());
479
480 let entry = assert_ready_ok!(poll!(queue)).into_inner();
481 assert_eq!(entry, "foo");
482 }
483
484 #[tokio::test]
reset_earlier_after_slot_starts()485 async fn reset_earlier_after_slot_starts() {
486 time::pause();
487
488 let mut queue = task::spawn(DelayQueue::new());
489
490 let now = Instant::now();
491
492 let foo = queue.insert_at("foo", now + ms(200));
493
494 assert_pending!(poll!(queue));
495
496 delay_for(ms(80)).await;
497
498 assert!(!queue.is_woken());
499
500 // At this point the queue hasn't been polled, so `elapsed` on the wheel
501 // for the queue is still at 0 and hence the 1ms resolution slots cover
502 // [0-64). Resetting the time on the entry to 120 causes it to get put in
503 // the [64-128) slot. As the queue knows that the first entry is within
504 // that slot, but doesn't know when, it must wake immediately to advance
505 // the wheel.
506 queue.reset_at(&foo, now + ms(120));
507 assert!(queue.is_woken());
508
509 assert_pending!(poll!(queue));
510
511 delay_for(ms(39)).await;
512 assert!(!queue.is_woken());
513
514 delay_for(ms(1)).await;
515 assert!(queue.is_woken());
516
517 let entry = assert_ready_ok!(poll!(queue)).into_inner();
518 assert_eq!(entry, "foo");
519 }
520
521 #[tokio::test]
insert_in_past_after_poll_fires_immediately()522 async fn insert_in_past_after_poll_fires_immediately() {
523 time::pause();
524
525 let mut queue = task::spawn(DelayQueue::new());
526
527 let now = Instant::now();
528
529 queue.insert_at("foo", now + ms(200));
530
531 assert_pending!(poll!(queue));
532
533 delay_for(ms(80)).await;
534
535 assert!(!queue.is_woken());
536 queue.insert_at("bar", now + ms(40));
537
538 assert!(queue.is_woken());
539
540 let entry = assert_ready_ok!(poll!(queue)).into_inner();
541 assert_eq!(entry, "bar");
542 }
543
/// Shorthand used throughout these tests: a [`Duration`] of `n` milliseconds.
fn ms(n: u64) -> Duration {
    let millis = n;
    Duration::from_millis(millis)
}
547