1 #![cfg(feature = "full")]
2 
3 use tokio::stream::{self, StreamExt};
4 use tokio::time::{self, delay_for, Duration};
5 use tokio_test::*;
6 
7 use futures::StreamExt as _;
8 
maybe_delay(idx: i32) -> i329 async fn maybe_delay(idx: i32) -> i32 {
10     if idx % 2 == 0 {
11         delay_for(ms(200)).await;
12     }
13     idx
14 }
15 
ms(n: u64) -> Duration16 fn ms(n: u64) -> Duration {
17     Duration::from_millis(n)
18 }
19 
20 #[tokio::test]
basic_usage()21 async fn basic_usage() {
22     time::pause();
23 
24     // Items 2 and 4 time out. If we run the stream until it completes,
25     // we end up with the following items:
26     //
27     // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)]
28 
29     let stream = stream::iter(1..=4).then(maybe_delay).timeout(ms(100));
30     let mut stream = task::spawn(stream);
31 
32     // First item completes immediately
33     assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
34 
35     // Second item is delayed 200ms, times out after 100ms
36     assert_pending!(stream.poll_next());
37 
38     time::advance(ms(150)).await;
39     let v = assert_ready!(stream.poll_next());
40     assert!(v.unwrap().is_err());
41 
42     assert_pending!(stream.poll_next());
43 
44     time::advance(ms(100)).await;
45     assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
46 
47     // Third item is ready immediately
48     assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
49 
50     // Fourth item is delayed 200ms, times out after 100ms
51     assert_pending!(stream.poll_next());
52 
53     time::advance(ms(60)).await;
54     assert_pending!(stream.poll_next()); // nothing ready yet
55 
56     time::advance(ms(60)).await;
57     let v = assert_ready!(stream.poll_next());
58     assert!(v.unwrap().is_err()); // timeout!
59 
60     time::advance(ms(120)).await;
61     assert_ready_eq!(stream.poll_next(), Some(Ok(4)));
62 
63     // Done.
64     assert_ready_eq!(stream.poll_next(), None);
65 }
66 
67 #[tokio::test]
return_elapsed_errors_only_once()68 async fn return_elapsed_errors_only_once() {
69     time::pause();
70 
71     let stream = stream::iter(1..=3).then(maybe_delay).timeout(ms(50));
72     let mut stream = task::spawn(stream);
73 
74     // First item completes immediately
75     assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
76 
77     // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed`
78     // error is returned.
79     assert_pending!(stream.poll_next());
80     //
81     time::advance(ms(50)).await;
82     let v = assert_ready!(stream.poll_next());
83     assert!(v.unwrap().is_err()); // timeout!
84 
85     // deadline elapses again, but no error is returned
86     time::advance(ms(50)).await;
87     assert_pending!(stream.poll_next());
88 
89     time::advance(ms(100)).await;
90     assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
91     assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
92 
93     // Done
94     assert_ready_eq!(stream.poll_next(), None);
95 }
96 
97 #[tokio::test]
no_timeouts()98 async fn no_timeouts() {
99     let stream = stream::iter(vec![1, 3, 5])
100         .then(maybe_delay)
101         .timeout(ms(100));
102 
103     let mut stream = task::spawn(stream);
104 
105     assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
106     assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
107     assert_ready_eq!(stream.poll_next(), Some(Ok(5)));
108     assert_ready_eq!(stream.poll_next(), None);
109 }
110