1 use tokio_stream::{self as stream, StreamExt};
2 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
3
4 mod support {
5 pub(crate) mod mpsc;
6 }
7
8 use support::mpsc;
9
10 #[allow(clippy::let_unit_value)]
11 #[tokio::test]
empty_unit()12 async fn empty_unit() {
13 // Drains the stream.
14 let mut iter = vec![(), (), ()].into_iter();
15 let _: () = stream::iter(&mut iter).collect().await;
16 assert!(iter.next().is_none());
17 }
18
19 #[tokio::test]
empty_vec()20 async fn empty_vec() {
21 let coll: Vec<u32> = stream::empty().collect().await;
22 assert!(coll.is_empty());
23 }
24
25 #[tokio::test]
empty_box_slice()26 async fn empty_box_slice() {
27 let coll: Box<[u32]> = stream::empty().collect().await;
28 assert!(coll.is_empty());
29 }
30
31 #[tokio::test]
empty_string()32 async fn empty_string() {
33 let coll: String = stream::empty::<&str>().collect().await;
34 assert!(coll.is_empty());
35 }
36
37 #[tokio::test]
empty_result()38 async fn empty_result() {
39 let coll: Result<Vec<u32>, &str> = stream::empty().collect().await;
40 assert_eq!(Ok(vec![]), coll);
41 }
42
43 #[tokio::test]
collect_vec_items()44 async fn collect_vec_items() {
45 let (tx, rx) = mpsc::unbounded_channel_stream();
46 let mut fut = task::spawn(rx.collect::<Vec<i32>>());
47
48 assert_pending!(fut.poll());
49
50 tx.send(1).unwrap();
51 assert!(fut.is_woken());
52 assert_pending!(fut.poll());
53
54 tx.send(2).unwrap();
55 assert!(fut.is_woken());
56 assert_pending!(fut.poll());
57
58 drop(tx);
59 assert!(fut.is_woken());
60 let coll = assert_ready!(fut.poll());
61 assert_eq!(vec![1, 2], coll);
62 }
63
64 #[tokio::test]
collect_string_items()65 async fn collect_string_items() {
66 let (tx, rx) = mpsc::unbounded_channel_stream();
67
68 let mut fut = task::spawn(rx.collect::<String>());
69
70 assert_pending!(fut.poll());
71
72 tx.send("hello ".to_string()).unwrap();
73 assert!(fut.is_woken());
74 assert_pending!(fut.poll());
75
76 tx.send("world".to_string()).unwrap();
77 assert!(fut.is_woken());
78 assert_pending!(fut.poll());
79
80 drop(tx);
81 assert!(fut.is_woken());
82 let coll = assert_ready!(fut.poll());
83 assert_eq!("hello world", coll);
84 }
85
86 #[tokio::test]
collect_str_items()87 async fn collect_str_items() {
88 let (tx, rx) = mpsc::unbounded_channel_stream();
89
90 let mut fut = task::spawn(rx.collect::<String>());
91
92 assert_pending!(fut.poll());
93
94 tx.send("hello ").unwrap();
95 assert!(fut.is_woken());
96 assert_pending!(fut.poll());
97
98 tx.send("world").unwrap();
99 assert!(fut.is_woken());
100 assert_pending!(fut.poll());
101
102 drop(tx);
103 assert!(fut.is_woken());
104 let coll = assert_ready!(fut.poll());
105 assert_eq!("hello world", coll);
106 }
107
108 #[tokio::test]
collect_results_ok()109 async fn collect_results_ok() {
110 let (tx, rx) = mpsc::unbounded_channel_stream();
111
112 let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
113
114 assert_pending!(fut.poll());
115
116 tx.send(Ok("hello ")).unwrap();
117 assert!(fut.is_woken());
118 assert_pending!(fut.poll());
119
120 tx.send(Ok("world")).unwrap();
121 assert!(fut.is_woken());
122 assert_pending!(fut.poll());
123
124 drop(tx);
125 assert!(fut.is_woken());
126 let coll = assert_ready_ok!(fut.poll());
127 assert_eq!("hello world", coll);
128 }
129
130 #[tokio::test]
collect_results_err()131 async fn collect_results_err() {
132 let (tx, rx) = mpsc::unbounded_channel_stream();
133
134 let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
135
136 assert_pending!(fut.poll());
137
138 tx.send(Ok("hello ")).unwrap();
139 assert!(fut.is_woken());
140 assert_pending!(fut.poll());
141
142 tx.send(Err("oh no")).unwrap();
143 assert!(fut.is_woken());
144 let err = assert_ready_err!(fut.poll());
145 assert_eq!("oh no", err);
146 }
147