1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future::{poll_fn, FutureExt};
4 use futures::pin_mut;
5 use futures::sink::{Sink, SinkExt};
6 use futures::stream::{Stream, StreamExt};
7 use futures::task::{Context, Poll};
8 use futures_test::task::{new_count_waker, noop_context};
9 use std::sync::atomic::{AtomicUsize, Ordering};
10 use std::sync::{Arc, Mutex};
11 use std::thread;
12
// Compile-time assertion that both channel endpoints are `Send`: these
// impls only type-check if `Sender<i32>`/`Receiver<i32>` satisfy the
// `Send` supertrait bound. Nothing here runs at test time.
trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}
16
/// Basic round trip: one value sent, sender dropped, value received.
#[test]
fn send_recv() {
    let (mut tx, rx) = mpsc::channel::<i32>(16);

    block_on(tx.send(1)).unwrap();
    // Dropping the only sender terminates the stream, so `collect` finishes.
    drop(tx);
    let received: Vec<_> = block_on(rx.collect());
    assert_eq!(received, [1]);
}
26
/// Exercises the exact poll_ready/start_send/poll_next protocol on a
/// zero-capacity channel, where each send must rendezvous with a receive.
#[test]
fn send_recv_no_buffer() {
    // Run on a task context
    block_on(poll_fn(move |cx| {
        let (tx, rx) = mpsc::channel::<i32>(0);
        pin_mut!(tx, rx);

        // A fresh channel is already flushed and has room for one send.
        assert!(tx.as_mut().poll_flush(cx).is_ready());
        assert!(tx.as_mut().poll_ready(cx).is_ready());

        // Send first message
        assert!(tx.as_mut().start_send(1).is_ok());
        assert!(tx.as_mut().poll_ready(cx).is_pending());

        // poll_ready said Pending, so no room in buffer, therefore new sends
        // should get rejected with is_full.
        assert!(tx.as_mut().start_send(0).unwrap_err().is_full());
        assert!(tx.as_mut().poll_ready(cx).is_pending());

        // Take the value
        assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
        // Receiving frees the slot, so the sender becomes ready again.
        assert!(tx.as_mut().poll_ready(cx).is_ready());

        // Send second message
        assert!(tx.as_mut().poll_ready(cx).is_ready());
        assert!(tx.as_mut().start_send(2).is_ok());
        assert!(tx.as_mut().poll_ready(cx).is_pending());

        // Take the value
        assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
        assert!(tx.as_mut().poll_ready(cx).is_ready());

        Poll::Ready(())
    }));
}
62
/// Two cloned senders can both deliver into the same receiver, in order.
#[test]
fn send_shared_recv() {
    let (mut tx1, rx) = mpsc::channel::<i32>(16);
    let mut tx2 = tx1.clone();
    let mut rx = block_on_stream(rx);

    block_on(tx1.send(1)).unwrap();
    assert_eq!(rx.next(), Some(1));

    block_on(tx2.send(2)).unwrap();
    assert_eq!(rx.next(), Some(2));
}
75
/// A value sent from a spawned thread is received on the main thread.
#[test]
fn send_recv_threads() {
    let (mut tx, rx) = mpsc::channel::<i32>(16);

    let sender = thread::spawn(move || block_on(tx.send(1)).unwrap());

    // Take exactly one item so `collect` terminates without the stream ending.
    let received: Vec<_> = block_on(rx.take(1).collect());
    assert_eq!(received, [1]);

    sender.join().unwrap();
}
89
/// With a zero-capacity channel each cross-thread send must wait for the
/// receiver to make room; both values still arrive in order.
#[test]
fn send_recv_threads_no_capacity() {
    let (mut tx, rx) = mpsc::channel::<i32>(0);

    let sender = thread::spawn(move || {
        for i in 1..=2 {
            block_on(tx.send(i)).unwrap();
        }
    });

    // The sender is dropped when the thread finishes, ending the stream.
    let received: Vec<_> = block_on(rx.collect());
    assert_eq!(received, [1, 2]);

    sender.join().unwrap();
}
104
/// Closing the receiver terminates the stream and makes the sender's
/// `poll_ready` report a disconnect error.
#[test]
fn recv_close_gets_none() {
    let (mut tx, mut rx) = mpsc::channel::<i32>(10);

    // Run on a task context
    block_on(poll_fn(move |cx| {
        rx.close();

        // Nothing was buffered, so the closed stream is immediately done.
        assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
        // The sender must observe the closure as a disconnect, never as
        // Pending or as available capacity.
        match tx.poll_ready(cx) {
            Poll::Pending | Poll::Ready(Ok(_)) => panic!(),
            Poll::Ready(Err(e)) => assert!(e.is_disconnected()),
        };

        Poll::Ready(())
    }));
}
122
/// A channel whose only sender is discarded at creation yields `None`.
#[test]
fn tx_close_gets_none() {
    // The sender binding is `_`, so it is dropped immediately.
    let (_, mut rx) = mpsc::channel::<i32>(10);

    // Run on a task context: the stream must already be terminated.
    block_on(poll_fn(move |cx| {
        let polled = rx.poll_next_unpin(cx);
        assert_eq!(polled, Poll::Ready(None));
        Poll::Ready(())
    }));
}
133
134 // #[test]
135 // fn spawn_sends_items() {
136 // let core = local_executor::Core::new();
137 // let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1))));
138 // let rx = mpsc::spawn(stream, &core, 1);
139 // assert_eq!(core.run(rx.take(4).collect()).unwrap(),
140 // [0, 1, 2, 3]);
141 // }
142
143 // #[test]
144 // fn spawn_kill_dead_stream() {
145 // use std::thread;
146 // use std::time::Duration;
147 // use futures::future::Either;
148 // use futures::sync::oneshot;
149 //
150 // // a stream which never returns anything (maybe a remote end isn't
151 // // responding), but dropping it leads to observable side effects
152 // // (like closing connections, releasing limited resources, ...)
153 // #[derive(Debug)]
154 // struct Dead {
155 // // when dropped you should get Err(oneshot::Canceled) on the
156 // // receiving end
157 // done: oneshot::Sender<()>,
158 // }
159 // impl Stream for Dead {
160 // type Item = ();
161 // type Error = ();
162 //
163 // fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
164 // Ok(Poll::Pending)
165 // }
166 // }
167 //
168 // // need to implement a timeout for the test, as it would hang
169 // // forever right now
170 // let (timeout_tx, timeout_rx) = oneshot::channel();
171 // thread::spawn(move || {
172 // thread::sleep(Duration::from_millis(1000));
173 // let _ = timeout_tx.send(());
174 // });
175 //
176 // let core = local_executor::Core::new();
177 // let (done_tx, done_rx) = oneshot::channel();
178 // let stream = Dead{done: done_tx};
179 // let rx = mpsc::spawn(stream, &core, 1);
180 // let res = core.run(
181 // Ok::<_, ()>(())
182 // .into_future()
183 // .then(move |_| {
184 // // now drop the spawned stream: maybe some timeout exceeded,
185 // // or some connection on this end was closed by the remote
186 // // end.
187 // drop(rx);
188 // // and wait for the spawned stream to release its resources
189 // done_rx
190 // })
191 // .select2(timeout_rx)
192 // );
193 // match res {
194 // Err(Either::A((oneshot::Canceled, _))) => (),
195 // _ => {
196 // panic!("dead stream wasn't canceled");
197 // },
198 // }
199 // }
200
/// Stress test: several threads push through clones of one unbounded
/// sender while a dedicated thread drains and verifies the total.
#[test]
fn stress_shared_unbounded() {
    #[cfg(miri)]
    const AMT: u32 = 100;
    #[cfg(not(miri))]
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;
    let (tx, rx) = mpsc::unbounded::<i32>();

    // Receiver thread: collect everything, check count and contents.
    let t = thread::spawn(move || {
        let result: Vec<_> = block_on(rx.collect());
        assert_eq!(result.len(), (AMT * NTHREADS) as usize);
        for item in result {
            assert_eq!(item, 1);
        }
    });

    for _ in 0..NTHREADS {
        let tx = tx.clone();

        thread::spawn(move || {
            for _ in 0..AMT {
                tx.unbounded_send(1).unwrap();
            }
        });
    }

    // Drop the original sender; the stream ends once every clone held by
    // the sender threads is dropped too.
    drop(tx);

    // Fix: plain `.unwrap()` (as the sibling tests use) propagates the
    // receiver thread's panic payload; the old `.ok().unwrap()` discarded it.
    t.join().unwrap();
}
232
/// Stress test like `stress_shared_unbounded`, but over a zero-capacity
/// bounded channel so every send must rendezvous with the receiver.
#[test]
fn stress_shared_bounded_hard() {
    #[cfg(miri)]
    const AMT: u32 = 100;
    #[cfg(not(miri))]
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;
    let (tx, rx) = mpsc::channel::<i32>(0);

    // Drain on a separate thread and validate count and contents.
    let receiver = thread::spawn(move || {
        let items: Vec<_> = block_on(rx.collect());
        assert_eq!(items.len(), (AMT * NTHREADS) as usize);
        assert!(items.iter().all(|&v| v == 1));
    });

    for _ in 0..NTHREADS {
        let mut tx = tx.clone();

        thread::spawn(move || {
            for _ in 0..AMT {
                block_on(tx.send(1)).unwrap();
            }
        });
    }

    // The clones owned by the sender threads keep the channel alive.
    drop(tx);

    receiver.join().unwrap();
}
264
/// Stress test: two threads share one receiver behind a mutex, mixing
/// blocking `next()` calls with bare polls, and together must count
/// every item exactly once.
#[allow(clippy::same_item_push)]
#[test]
fn stress_receiver_multi_task_bounded_hard() {
    #[cfg(miri)]
    const AMT: usize = 100;
    #[cfg(not(miri))]
    const AMT: usize = 10_000;
    const NTHREADS: u32 = 2;

    let (mut tx, rx) = mpsc::channel::<usize>(0);
    // `None` in the mutex marks that the stream has ended and remaining
    // threads should stop.
    let rx = Arc::new(Mutex::new(Some(rx)));
    let n = Arc::new(AtomicUsize::new(0));

    let mut th = vec![];

    for _ in 0..NTHREADS {
        let rx = rx.clone();
        let n = n.clone();

        let t = thread::spawn(move || {
            let mut i = 0;

            loop {
                i += 1;
                let mut rx_opt = rx.lock().unwrap();
                if let Some(rx) = &mut *rx_opt {
                    if i % 5 == 0 {
                        // Every fifth iteration: block until an item (or
                        // end-of-stream) arrives.
                        let item = block_on(rx.next());

                        if item.is_none() {
                            *rx_opt = None;
                            break;
                        }

                        n.fetch_add(1, Ordering::Relaxed);
                    } else {
                        // Otherwise poll once with a no-op waker.
                        match rx.poll_next_unpin(&mut noop_context()) {
                            Poll::Ready(Some(_)) => {
                                // Fix: use the captured Arc directly; the
                                // previous shadowing `n.clone()` was redundant.
                                n.fetch_add(1, Ordering::Relaxed);
                            }
                            Poll::Ready(None) => {
                                *rx_opt = None;
                                break;
                            }
                            Poll::Pending => {}
                        }
                    }
                } else {
                    break;
                }
            }
        });

        th.push(t);
    }

    // Push AMT items through the zero-capacity channel, then drop the
    // sender so the consumers observe end-of-stream.
    for i in 0..AMT {
        block_on(tx.send(i)).unwrap();
    }
    drop(tx);

    for t in th {
        t.join().unwrap();
    }

    // Each item must have been counted exactly once across both threads.
    assert_eq!(AMT, n.load(Ordering::Relaxed));
}
334
/// Stress test that the receiver still yields every buffered message
/// after the sender (running on another thread) has been dropped.
#[test]
fn stress_drop_sender() {
    #[cfg(miri)]
    const ITER: usize = 100;
    #[cfg(not(miri))]
    const ITER: usize = 10000;

    // Builds a stream fed by a short-lived sender thread.
    fn list() -> impl Stream<Item = i32> {
        let (tx, rx) = mpsc::channel(1);
        thread::spawn(move || block_on(send_one_two_three(tx)));
        rx
    }

    for _ in 0..ITER {
        let collected: Vec<_> = block_on(list().collect());
        assert_eq!(collected, [1, 2, 3]);
    }
}
357
send_one_two_three(mut tx: mpsc::Sender<i32>)358 async fn send_one_two_three(mut tx: mpsc::Sender<i32>) {
359 for i in 1..=3 {
360 tx.send(i).await.unwrap();
361 }
362 }
363
364 /// Stress test that after receiver dropped,
365 /// no messages are lost.
stress_close_receiver_iter()366 fn stress_close_receiver_iter() {
367 let (tx, rx) = mpsc::unbounded();
368 let mut rx = block_on_stream(rx);
369 let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel();
370 let th = thread::spawn(move || {
371 for i in 1.. {
372 if tx.unbounded_send(i).is_err() {
373 unwritten_tx.send(i).expect("unwritten_tx");
374 return;
375 }
376 }
377 });
378
379 // Read one message to make sure thread effectively started
380 assert_eq!(Some(1), rx.next());
381
382 rx.close();
383
384 for i in 2.. {
385 match rx.next() {
386 Some(r) => assert!(i == r),
387 None => {
388 let unwritten = unwritten_rx.recv().expect("unwritten_rx");
389 assert_eq!(unwritten, i);
390 th.join().unwrap();
391 return;
392 }
393 }
394 }
395 }
396
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_close_receiver() {
    // Repeat the close-receiver scenario many times to shake out races.
    const ITER: usize = 10000;

    (0..ITER).for_each(|_| stress_close_receiver_iter());
}
406
stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32)407 async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
408 for i in (1..=count).rev() {
409 sender.send(i).await.unwrap();
410 }
411 }
412
/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting.
#[allow(clippy::same_item_push)]
#[test]
fn stress_poll_ready() {
    #[cfg(miri)]
    const AMT: u32 = 100;
    #[cfg(not(miri))]
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;

    /// Run a stress test using the specified channel capacity.
    fn stress(capacity: usize) {
        let (tx, rx) = mpsc::channel(capacity);
        let workers: Vec<_> = (0..NTHREADS)
            .map(|_| {
                let sender = tx.clone();
                thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT)))
            })
            .collect();
        // Drop the original so only the workers keep the channel open.
        drop(tx);

        let received: Vec<_> = block_on(rx.collect());
        assert_eq!(received.len() as u32, AMT * NTHREADS);

        for worker in workers {
            worker.join().unwrap();
        }
    }

    // Exercise rendezvous, tiny, and comfortable capacities.
    for &capacity in &[0usize, 1, 8, 16] {
        stress(capacity);
    }
}
446
/// Spins on `try_send` against a zero-capacity channel from another
/// thread; the receiver must observe every value, in order.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn try_send_1() {
    const N: usize = 3000;
    let (mut tx, rx) = mpsc::channel(0);

    let producer = thread::spawn(move || {
        for i in 0..N {
            // Busy-retry until the rendezvous slot is free.
            while tx.try_send(i).is_err() {}
        }
    });

    let received: Vec<_> = block_on(rx.collect());
    for (expected, actual) in received.into_iter().enumerate() {
        assert_eq!(expected, actual);
    }

    producer.join().unwrap();
}
470
/// A sender that saw `poll_ready` return Pending is woken and can
/// complete its send once the receiver drains the channel.
#[test]
fn try_send_2() {
    let (mut tx, rx) = mpsc::channel(0);
    let mut rx = block_on_stream(rx);

    // Occupy the single rendezvous slot.
    tx.try_send("hello").unwrap();

    let (readytx, readyrx) = oneshot::channel::<()>();

    let th = thread::spawn(move || {
        // With the slot full, poll_ready must report Pending (registering
        // this task's waker).
        block_on(poll_fn(|cx| {
            assert!(tx.poll_ready(cx).is_pending());
            Poll::Ready(())
        }));

        // Dropping the oneshot sender signals the main thread that the
        // Pending state has been observed; then block until room appears.
        drop(readytx);
        block_on(tx.send("goodbye")).unwrap();
    });

    // Wait for the spawned thread to have seen Pending before receiving,
    // so both messages flow in a deterministic order.
    let _ = block_on(readyrx);
    assert_eq!(rx.next(), Some("hello"));
    assert_eq!(rx.next(), Some("goodbye"));
    assert_eq!(rx.next(), None);

    th.join().unwrap();
}
497
/// `try_send` on a full zero-capacity channel fails without losing the
/// value already buffered.
#[test]
fn try_send_fail() {
    let (mut sender, receiver) = mpsc::channel(0);
    let mut receiver = block_on_stream(receiver);

    sender.try_send("hello").unwrap();

    // The single rendezvous slot is occupied, so this must be rejected.
    assert!(sender.try_send("fail").is_err());

    // Draining the slot makes room again.
    assert_eq!(receiver.next(), Some("hello"));

    sender.try_send("goodbye").unwrap();
    drop(sender);

    assert_eq!(receiver.next(), Some("goodbye"));
    assert_eq!(receiver.next(), None);
}
516
/// Interleaves `try_send` / `try_next` on a capacity-1 channel, checking
/// the full and empty conditions at each step.
#[test]
fn try_send_recv() {
    let (mut sender, mut receiver) = mpsc::channel(1);
    sender.try_send("hello").unwrap();
    sender.try_send("hello").unwrap();
    sender.try_send("hello").unwrap_err(); // should be full
    receiver.try_next().unwrap();
    receiver.try_next().unwrap();
    receiver.try_next().unwrap_err(); // should be empty
    sender.try_send("hello").unwrap();
    receiver.try_next().unwrap();
    receiver.try_next().unwrap_err(); // should be empty
}
530
/// `same_receiver` is true exactly for clones feeding the same receiver;
/// `disconnect` detaches a sender while `close_channel` leaves it attached.
#[test]
fn same_receiver() {
    let (mut sender_a, _) = mpsc::channel::<i32>(1);
    let sender_a2 = sender_a.clone();

    let (mut sender_b, _) = mpsc::channel::<i32>(1);
    let sender_b2 = sender_b.clone();

    // Clones share a receiver; senders of distinct channels never do.
    assert!(sender_a.same_receiver(&sender_a2));
    assert!(sender_b.same_receiver(&sender_b2));
    assert!(!sender_a.same_receiver(&sender_b));

    sender_a.disconnect();
    sender_b.close_channel();

    // disconnect() severs the association; close_channel() keeps it.
    assert!(!sender_a.same_receiver(&sender_a2));
    assert!(sender_b.same_receiver(&sender_b2));
}
549
/// `is_connected_to` pairs each sender with exactly its own receiver.
#[test]
fn is_connected_to() {
    let (sender_a, receiver_a) = mpsc::channel::<i32>(1);
    let (sender_b, receiver_b) = mpsc::channel::<i32>(1);

    // Each sender recognizes its own receiver...
    assert!(sender_a.is_connected_to(&receiver_a));
    assert!(sender_b.is_connected_to(&receiver_b));
    // ...and rejects the other channel's receiver.
    assert!(!sender_a.is_connected_to(&receiver_b));
    assert!(!sender_b.is_connected_to(&receiver_a));
}
560
/// `hash_receiver` hashes equal for clones of the same channel and
/// distinct across channels; `disconnect` changes a sender's hash while
/// `close_channel` does not.
#[test]
fn hash_receiver() {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    // Helper extracting the hash of the receiver a sender is attached to;
    // replaces eight copy-pasted hasher setup/finish stanzas.
    fn receiver_hash(tx: &mpsc::Sender<i32>) -> u64 {
        let mut hasher = DefaultHasher::new();
        tx.hash_receiver(&mut hasher);
        hasher.finish()
    }

    let (mut txa1, _) = mpsc::channel::<i32>(1);
    let txa2 = txa1.clone();

    let (mut txb1, _) = mpsc::channel::<i32>(1);
    let txb2 = txb1.clone();

    // Clones hash alike; different channels hash differently.
    assert_eq!(receiver_hash(&txa1), receiver_hash(&txa2));
    assert_eq!(receiver_hash(&txb1), receiver_hash(&txb2));
    assert!(receiver_hash(&txa1) != receiver_hash(&txb1));

    txa1.disconnect();
    txb1.close_channel();

    // Disconnecting changes the hash; closing the channel does not.
    assert!(receiver_hash(&txa1) != receiver_hash(&txa2));
    assert_eq!(receiver_hash(&txb1), receiver_hash(&txb2));
}
609
/// A send blocked on a full channel registers its waker and is woken
/// exactly once when the receiver frees capacity.
#[test]
fn send_backpressure() {
    let (waker, counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let (mut tx, mut rx) = mpsc::channel(1);
    block_on(tx.send(1)).unwrap();

    // The channel is at capacity, so this send parks and registers our
    // counting waker; nothing has woken it yet.
    let mut task = tx.send(2);
    assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
    assert_eq!(counter, 0);

    // Receiving frees capacity; the parked send must be woken exactly once
    // and then complete on the next poll.
    let item = block_on(rx.next()).unwrap();
    assert_eq!(item, 1);
    assert_eq!(counter, 1);
    assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));

    let item = block_on(rx.next()).unwrap();
    assert_eq!(item, 2);
}
630
/// Backpressure wake-up also works across cloned senders: a send parked
/// on one clone is woken when the receiver drains capacity used by another.
#[test]
fn send_backpressure_multi_senders() {
    let (waker, counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let (mut tx1, mut rx) = mpsc::channel(1);
    let mut tx2 = tx1.clone();
    block_on(tx1.send(1)).unwrap();

    // The first send consumed the capacity, so a send from the *other*
    // sender parks and registers our counting waker.
    let mut task = tx2.send(2);
    assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
    assert_eq!(counter, 0);

    // Receiving frees capacity; the parked send (on the clone) is woken
    // exactly once and can then complete.
    let item = block_on(rx.next()).unwrap();
    assert_eq!(item, 1);
    assert_eq!(counter, 1);
    assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));

    let item = block_on(rx.next()).unwrap();
    assert_eq!(item, 2);
}
652