1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::block_on;
3 use futures::future::{self, poll_fn, FutureExt};
4 use futures::sink::SinkExt;
5 use futures::stream::StreamExt;
6 use futures::task::{Context, Poll};
7 use futures::{
8     join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join,
9 };
10 use std::mem;
11 
12 #[test]
poll_and_pending()13 fn poll_and_pending() {
14     let pending_once = async { pending!() };
15     block_on(async {
16         pin_mut!(pending_once);
17         assert_eq!(Poll::Pending, poll!(&mut pending_once));
18         assert_eq!(Poll::Ready(()), poll!(&mut pending_once));
19     });
20 }
21 
22 #[test]
join()23 fn join() {
24     let (tx1, rx1) = oneshot::channel::<i32>();
25     let (tx2, rx2) = oneshot::channel::<i32>();
26 
27     let fut = async {
28         let res = join!(rx1, rx2);
29         assert_eq!((Ok(1), Ok(2)), res);
30     };
31 
32     block_on(async {
33         pin_mut!(fut);
34         assert_eq!(Poll::Pending, poll!(&mut fut));
35         tx1.send(1).unwrap();
36         assert_eq!(Poll::Pending, poll!(&mut fut));
37         tx2.send(2).unwrap();
38         assert_eq!(Poll::Ready(()), poll!(&mut fut));
39     });
40 }
41 
42 #[test]
select()43 fn select() {
44     let (tx1, rx1) = oneshot::channel::<i32>();
45     let (_tx2, rx2) = oneshot::channel::<i32>();
46     tx1.send(1).unwrap();
47     let mut ran = false;
48     block_on(async {
49         select! {
50             res = rx1.fuse() => {
51                 assert_eq!(Ok(1), res);
52                 ran = true;
53             },
54             _ = rx2.fuse() => unreachable!(),
55         }
56     });
57     assert!(ran);
58 }
59 
60 #[test]
select_biased()61 fn select_biased() {
62     let (tx1, rx1) = oneshot::channel::<i32>();
63     let (_tx2, rx2) = oneshot::channel::<i32>();
64     tx1.send(1).unwrap();
65     let mut ran = false;
66     block_on(async {
67         select_biased! {
68             res = rx1.fuse() => {
69                 assert_eq!(Ok(1), res);
70                 ran = true;
71             },
72             _ = rx2.fuse() => unreachable!(),
73         }
74     });
75     assert!(ran);
76 }
77 
78 #[test]
select_streams()79 fn select_streams() {
80     let (mut tx1, rx1) = mpsc::channel::<i32>(1);
81     let (mut tx2, rx2) = mpsc::channel::<i32>(1);
82     let mut rx1 = rx1.fuse();
83     let mut rx2 = rx2.fuse();
84     let mut ran = false;
85     let mut total = 0;
86     block_on(async {
87         let mut tx1_opt;
88         let mut tx2_opt;
89         select! {
90             _ = rx1.next() => panic!(),
91             _ = rx2.next() => panic!(),
92             default => {
93                 tx1.send(2).await.unwrap();
94                 tx2.send(3).await.unwrap();
95                 tx1_opt = Some(tx1);
96                 tx2_opt = Some(tx2);
97             }
98             complete => panic!(),
99         }
100         loop {
101             select! {
102                 // runs first and again after default
103                 x = rx1.next() => if let Some(x) = x { total += x; },
104                 // runs second and again after default
105                 x = rx2.next()  => if let Some(x) = x { total += x; },
106                 // runs third
107                 default => {
108                     assert_eq!(total, 5);
109                     ran = true;
110                     drop(tx1_opt.take().unwrap());
111                     drop(tx2_opt.take().unwrap());
112                 },
113                 // runs last
114                 complete => break,
115             };
116         }
117     });
118     assert!(ran);
119 }
120 
121 #[test]
select_can_move_uncompleted_futures()122 fn select_can_move_uncompleted_futures() {
123     let (tx1, rx1) = oneshot::channel::<i32>();
124     let (tx2, rx2) = oneshot::channel::<i32>();
125     tx1.send(1).unwrap();
126     tx2.send(2).unwrap();
127     let mut ran = false;
128     let mut rx1 = rx1.fuse();
129     let mut rx2 = rx2.fuse();
130     block_on(async {
131         select! {
132             res = rx1 => {
133                 assert_eq!(Ok(1), res);
134                 assert_eq!(Ok(2), rx2.await);
135                 ran = true;
136             },
137             res = rx2 => {
138                 assert_eq!(Ok(2), res);
139                 assert_eq!(Ok(1), rx1.await);
140                 ran = true;
141             },
142         }
143     });
144     assert!(ran);
145 }
146 
147 #[test]
select_nested()148 fn select_nested() {
149     let mut outer_fut = future::ready(1);
150     let mut inner_fut = future::ready(2);
151     let res = block_on(async {
152         select! {
153             x = outer_fut => {
154                 select! {
155                     y = inner_fut => x + y,
156                 }
157             }
158         }
159     });
160     assert_eq!(res, 3);
161 }
162 
163 #[cfg_attr(not(target_pointer_width = "64"), ignore)]
164 #[test]
select_size()165 fn select_size() {
166     let fut = async {
167         let mut ready = future::ready(0i32);
168         select! {
169             _ = ready => {},
170         }
171     };
172     assert_eq!(mem::size_of_val(&fut), 24);
173 
174     let fut = async {
175         let mut ready1 = future::ready(0i32);
176         let mut ready2 = future::ready(0i32);
177         select! {
178             _ = ready1 => {},
179             _ = ready2 => {},
180         }
181     };
182     assert_eq!(mem::size_of_val(&fut), 40);
183 }
184 
185 #[test]
select_on_non_unpin_expressions()186 fn select_on_non_unpin_expressions() {
187     // The returned Future is !Unpin
188     let make_non_unpin_fut = || async { 5 };
189 
190     let res = block_on(async {
191         let select_res;
192         select! {
193             value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
194             value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
195         };
196         select_res
197     });
198     assert_eq!(res, 5);
199 }
200 
201 #[test]
select_on_non_unpin_expressions_with_default()202 fn select_on_non_unpin_expressions_with_default() {
203     // The returned Future is !Unpin
204     let make_non_unpin_fut = || async { 5 };
205 
206     let res = block_on(async {
207         let select_res;
208         select! {
209             value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
210             value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
211             default => select_res = 7,
212         };
213         select_res
214     });
215     assert_eq!(res, 5);
216 }
217 
218 #[cfg_attr(not(target_pointer_width = "64"), ignore)]
219 #[test]
select_on_non_unpin_size()220 fn select_on_non_unpin_size() {
221     // The returned Future is !Unpin
222     let make_non_unpin_fut = || async { 5 };
223 
224     let fut = async {
225         let select_res;
226         select! {
227             value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
228             value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
229         };
230         select_res
231     };
232 
233     assert_eq!(32, mem::size_of_val(&fut));
234 }
235 
236 #[test]
select_can_be_used_as_expression()237 fn select_can_be_used_as_expression() {
238     block_on(async {
239         let res = select! {
240             x = future::ready(7) => x,
241             y = future::ready(3) => y + 1,
242         };
243         assert!(res == 7 || res == 4);
244     });
245 }
246 
247 #[test]
select_with_default_can_be_used_as_expression()248 fn select_with_default_can_be_used_as_expression() {
249     fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> {
250         Poll::Pending
251     }
252 
253     block_on(async {
254         let res = select! {
255             x = poll_fn(poll_always_pending::<i32>).fuse() => x,
256             y = poll_fn(poll_always_pending::<i32>).fuse() => y + 1,
257             default => 99,
258         };
259         assert_eq!(res, 99);
260     });
261 }
262 
263 #[test]
select_with_complete_can_be_used_as_expression()264 fn select_with_complete_can_be_used_as_expression() {
265     block_on(async {
266         let res = select! {
267             x = future::pending::<i32>() => x,
268             y = future::pending::<i32>() => y + 1,
269             default => 99,
270             complete => 237,
271         };
272         assert_eq!(res, 237);
273     });
274 }
275 
276 #[test]
277 #[allow(unused_assignments)]
select_on_mutable_borrowing_future_with_same_borrow_in_block()278 fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
279     async fn require_mutable(_: &mut i32) {}
280     async fn async_noop() {}
281 
282     block_on(async {
283         let mut value = 234;
284         select! {
285             _ = require_mutable(&mut value).fuse() => { },
286             _ = async_noop().fuse() => {
287                 value += 5;
288             },
289         }
290     });
291 }
292 
293 #[test]
294 #[allow(unused_assignments)]
select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default()295 fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
296     async fn require_mutable(_: &mut i32) {}
297     async fn async_noop() {}
298 
299     block_on(async {
300         let mut value = 234;
301         select! {
302             _ = require_mutable(&mut value).fuse() => { },
303             _ = async_noop().fuse() => {
304                 value += 5;
305             },
306             default => {
307                 value += 27;
308             },
309         }
310     });
311 }
312 
313 #[test]
314 #[allow(unused_assignments)]
stream_select()315 fn stream_select() {
316     // stream_select! macro
317     block_on(async {
318         let endless_ints = |i| stream::iter(vec![i].into_iter().cycle());
319 
320         let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
321         assert_eq!(endless_ones.next().await, Some(1));
322         assert_eq!(endless_ones.next().await, Some(1));
323 
324         let mut finite_list =
325             stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
326         assert_eq!(finite_list.next().await, Some(1));
327         assert_eq!(finite_list.next().await, Some(1));
328         assert_eq!(finite_list.next().await, None);
329 
330         let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
331         // Take 1000, and assert a somewhat even distribution of values.
332         // The fairness is randomized, but over 1000 samples we should be pretty close to even.
333         // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
334         let mut count = 0;
335         let results = endless_mixed
336             .take_while(move |_| {
337                 count += 1;
338                 let ret = count < 1000;
339                 async move { ret }
340             })
341             .collect::<Vec<_>>()
342             .await;
343         assert!(results.iter().filter(|x| **x == 1).count() >= 299);
344         assert!(results.iter().filter(|x| **x == 2).count() >= 299);
345         assert!(results.iter().filter(|x| **x == 3).count() >= 299);
346     });
347 }
348 
349 #[test]
join_size()350 fn join_size() {
351     let fut = async {
352         let ready = future::ready(0i32);
353         join!(ready)
354     };
355     assert_eq!(mem::size_of_val(&fut), 16);
356 
357     let fut = async {
358         let ready1 = future::ready(0i32);
359         let ready2 = future::ready(0i32);
360         join!(ready1, ready2)
361     };
362     assert_eq!(mem::size_of_val(&fut), 28);
363 }
364 
365 #[test]
try_join_size()366 fn try_join_size() {
367     let fut = async {
368         let ready = future::ready(Ok::<i32, i32>(0));
369         try_join!(ready)
370     };
371     assert_eq!(mem::size_of_val(&fut), 16);
372 
373     let fut = async {
374         let ready1 = future::ready(Ok::<i32, i32>(0));
375         let ready2 = future::ready(Ok::<i32, i32>(0));
376         try_join!(ready1, ready2)
377     };
378     assert_eq!(mem::size_of_val(&fut), 28);
379 }
380 
381 #[test]
join_doesnt_require_unpin()382 fn join_doesnt_require_unpin() {
383     let _ = async { join!(async {}, async {}) };
384 }
385 
386 #[test]
try_join_doesnt_require_unpin()387 fn try_join_doesnt_require_unpin() {
388     let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) };
389 }
390