1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::block_on;
3 use futures::future::{self, poll_fn, FutureExt};
4 use futures::sink::SinkExt;
5 use futures::stream::StreamExt;
6 use futures::task::{Context, Poll};
7 use futures::{
8 join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join,
9 };
10 use std::mem;
11
12 #[test]
poll_and_pending()13 fn poll_and_pending() {
14 let pending_once = async { pending!() };
15 block_on(async {
16 pin_mut!(pending_once);
17 assert_eq!(Poll::Pending, poll!(&mut pending_once));
18 assert_eq!(Poll::Ready(()), poll!(&mut pending_once));
19 });
20 }
21
22 #[test]
join()23 fn join() {
24 let (tx1, rx1) = oneshot::channel::<i32>();
25 let (tx2, rx2) = oneshot::channel::<i32>();
26
27 let fut = async {
28 let res = join!(rx1, rx2);
29 assert_eq!((Ok(1), Ok(2)), res);
30 };
31
32 block_on(async {
33 pin_mut!(fut);
34 assert_eq!(Poll::Pending, poll!(&mut fut));
35 tx1.send(1).unwrap();
36 assert_eq!(Poll::Pending, poll!(&mut fut));
37 tx2.send(2).unwrap();
38 assert_eq!(Poll::Ready(()), poll!(&mut fut));
39 });
40 }
41
42 #[test]
select()43 fn select() {
44 let (tx1, rx1) = oneshot::channel::<i32>();
45 let (_tx2, rx2) = oneshot::channel::<i32>();
46 tx1.send(1).unwrap();
47 let mut ran = false;
48 block_on(async {
49 select! {
50 res = rx1.fuse() => {
51 assert_eq!(Ok(1), res);
52 ran = true;
53 },
54 _ = rx2.fuse() => unreachable!(),
55 }
56 });
57 assert!(ran);
58 }
59
60 #[test]
select_biased()61 fn select_biased() {
62 let (tx1, rx1) = oneshot::channel::<i32>();
63 let (_tx2, rx2) = oneshot::channel::<i32>();
64 tx1.send(1).unwrap();
65 let mut ran = false;
66 block_on(async {
67 select_biased! {
68 res = rx1.fuse() => {
69 assert_eq!(Ok(1), res);
70 ran = true;
71 },
72 _ = rx2.fuse() => unreachable!(),
73 }
74 });
75 assert!(ran);
76 }
77
78 #[test]
select_streams()79 fn select_streams() {
80 let (mut tx1, rx1) = mpsc::channel::<i32>(1);
81 let (mut tx2, rx2) = mpsc::channel::<i32>(1);
82 let mut rx1 = rx1.fuse();
83 let mut rx2 = rx2.fuse();
84 let mut ran = false;
85 let mut total = 0;
86 block_on(async {
87 let mut tx1_opt;
88 let mut tx2_opt;
89 select! {
90 _ = rx1.next() => panic!(),
91 _ = rx2.next() => panic!(),
92 default => {
93 tx1.send(2).await.unwrap();
94 tx2.send(3).await.unwrap();
95 tx1_opt = Some(tx1);
96 tx2_opt = Some(tx2);
97 }
98 complete => panic!(),
99 }
100 loop {
101 select! {
102 // runs first and again after default
103 x = rx1.next() => if let Some(x) = x { total += x; },
104 // runs second and again after default
105 x = rx2.next() => if let Some(x) = x { total += x; },
106 // runs third
107 default => {
108 assert_eq!(total, 5);
109 ran = true;
110 drop(tx1_opt.take().unwrap());
111 drop(tx2_opt.take().unwrap());
112 },
113 // runs last
114 complete => break,
115 };
116 }
117 });
118 assert!(ran);
119 }
120
121 #[test]
select_can_move_uncompleted_futures()122 fn select_can_move_uncompleted_futures() {
123 let (tx1, rx1) = oneshot::channel::<i32>();
124 let (tx2, rx2) = oneshot::channel::<i32>();
125 tx1.send(1).unwrap();
126 tx2.send(2).unwrap();
127 let mut ran = false;
128 let mut rx1 = rx1.fuse();
129 let mut rx2 = rx2.fuse();
130 block_on(async {
131 select! {
132 res = rx1 => {
133 assert_eq!(Ok(1), res);
134 assert_eq!(Ok(2), rx2.await);
135 ran = true;
136 },
137 res = rx2 => {
138 assert_eq!(Ok(2), res);
139 assert_eq!(Ok(1), rx1.await);
140 ran = true;
141 },
142 }
143 });
144 assert!(ran);
145 }
146
147 #[test]
select_nested()148 fn select_nested() {
149 let mut outer_fut = future::ready(1);
150 let mut inner_fut = future::ready(2);
151 let res = block_on(async {
152 select! {
153 x = outer_fut => {
154 select! {
155 y = inner_fut => x + y,
156 }
157 }
158 }
159 });
160 assert_eq!(res, 3);
161 }
162
163 #[cfg_attr(not(target_pointer_width = "64"), ignore)]
164 #[test]
select_size()165 fn select_size() {
166 let fut = async {
167 let mut ready = future::ready(0i32);
168 select! {
169 _ = ready => {},
170 }
171 };
172 assert_eq!(mem::size_of_val(&fut), 24);
173
174 let fut = async {
175 let mut ready1 = future::ready(0i32);
176 let mut ready2 = future::ready(0i32);
177 select! {
178 _ = ready1 => {},
179 _ = ready2 => {},
180 }
181 };
182 assert_eq!(mem::size_of_val(&fut), 40);
183 }
184
185 #[test]
select_on_non_unpin_expressions()186 fn select_on_non_unpin_expressions() {
187 // The returned Future is !Unpin
188 let make_non_unpin_fut = || async { 5 };
189
190 let res = block_on(async {
191 let select_res;
192 select! {
193 value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
194 value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
195 };
196 select_res
197 });
198 assert_eq!(res, 5);
199 }
200
201 #[test]
select_on_non_unpin_expressions_with_default()202 fn select_on_non_unpin_expressions_with_default() {
203 // The returned Future is !Unpin
204 let make_non_unpin_fut = || async { 5 };
205
206 let res = block_on(async {
207 let select_res;
208 select! {
209 value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
210 value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
211 default => select_res = 7,
212 };
213 select_res
214 });
215 assert_eq!(res, 5);
216 }
217
218 #[cfg_attr(not(target_pointer_width = "64"), ignore)]
219 #[test]
select_on_non_unpin_size()220 fn select_on_non_unpin_size() {
221 // The returned Future is !Unpin
222 let make_non_unpin_fut = || async { 5 };
223
224 let fut = async {
225 let select_res;
226 select! {
227 value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
228 value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
229 };
230 select_res
231 };
232
233 assert_eq!(32, mem::size_of_val(&fut));
234 }
235
236 #[test]
select_can_be_used_as_expression()237 fn select_can_be_used_as_expression() {
238 block_on(async {
239 let res = select! {
240 x = future::ready(7) => x,
241 y = future::ready(3) => y + 1,
242 };
243 assert!(res == 7 || res == 4);
244 });
245 }
246
247 #[test]
select_with_default_can_be_used_as_expression()248 fn select_with_default_can_be_used_as_expression() {
249 fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> {
250 Poll::Pending
251 }
252
253 block_on(async {
254 let res = select! {
255 x = poll_fn(poll_always_pending::<i32>).fuse() => x,
256 y = poll_fn(poll_always_pending::<i32>).fuse() => y + 1,
257 default => 99,
258 };
259 assert_eq!(res, 99);
260 });
261 }
262
263 #[test]
select_with_complete_can_be_used_as_expression()264 fn select_with_complete_can_be_used_as_expression() {
265 block_on(async {
266 let res = select! {
267 x = future::pending::<i32>() => x,
268 y = future::pending::<i32>() => y + 1,
269 default => 99,
270 complete => 237,
271 };
272 assert_eq!(res, 237);
273 });
274 }
275
276 #[test]
277 #[allow(unused_assignments)]
select_on_mutable_borrowing_future_with_same_borrow_in_block()278 fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
279 async fn require_mutable(_: &mut i32) {}
280 async fn async_noop() {}
281
282 block_on(async {
283 let mut value = 234;
284 select! {
285 _ = require_mutable(&mut value).fuse() => { },
286 _ = async_noop().fuse() => {
287 value += 5;
288 },
289 }
290 });
291 }
292
293 #[test]
294 #[allow(unused_assignments)]
select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default()295 fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
296 async fn require_mutable(_: &mut i32) {}
297 async fn async_noop() {}
298
299 block_on(async {
300 let mut value = 234;
301 select! {
302 _ = require_mutable(&mut value).fuse() => { },
303 _ = async_noop().fuse() => {
304 value += 5;
305 },
306 default => {
307 value += 27;
308 },
309 }
310 });
311 }
312
313 #[test]
314 #[allow(unused_assignments)]
stream_select()315 fn stream_select() {
316 // stream_select! macro
317 block_on(async {
318 let endless_ints = |i| stream::iter(vec![i].into_iter().cycle());
319
320 let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
321 assert_eq!(endless_ones.next().await, Some(1));
322 assert_eq!(endless_ones.next().await, Some(1));
323
324 let mut finite_list =
325 stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
326 assert_eq!(finite_list.next().await, Some(1));
327 assert_eq!(finite_list.next().await, Some(1));
328 assert_eq!(finite_list.next().await, None);
329
330 let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
331 // Take 1000, and assert a somewhat even distribution of values.
332 // The fairness is randomized, but over 1000 samples we should be pretty close to even.
333 // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
334 let mut count = 0;
335 let results = endless_mixed
336 .take_while(move |_| {
337 count += 1;
338 let ret = count < 1000;
339 async move { ret }
340 })
341 .collect::<Vec<_>>()
342 .await;
343 assert!(results.iter().filter(|x| **x == 1).count() >= 299);
344 assert!(results.iter().filter(|x| **x == 2).count() >= 299);
345 assert!(results.iter().filter(|x| **x == 3).count() >= 299);
346 });
347 }
348
349 #[test]
join_size()350 fn join_size() {
351 let fut = async {
352 let ready = future::ready(0i32);
353 join!(ready)
354 };
355 assert_eq!(mem::size_of_val(&fut), 16);
356
357 let fut = async {
358 let ready1 = future::ready(0i32);
359 let ready2 = future::ready(0i32);
360 join!(ready1, ready2)
361 };
362 assert_eq!(mem::size_of_val(&fut), 28);
363 }
364
365 #[test]
try_join_size()366 fn try_join_size() {
367 let fut = async {
368 let ready = future::ready(Ok::<i32, i32>(0));
369 try_join!(ready)
370 };
371 assert_eq!(mem::size_of_val(&fut), 16);
372
373 let fut = async {
374 let ready1 = future::ready(Ok::<i32, i32>(0));
375 let ready2 = future::ready(Ok::<i32, i32>(0));
376 try_join!(ready1, ready2)
377 };
378 assert_eq!(mem::size_of_val(&fut), 28);
379 }
380
381 #[test]
join_doesnt_require_unpin()382 fn join_doesnt_require_unpin() {
383 let _ = async { join!(async {}, async {}) };
384 }
385
386 #[test]
try_join_doesnt_require_unpin()387 fn try_join_doesnt_require_unpin() {
388 let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) };
389 }
390