1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 use tokio::sync::mpsc;
6 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
7 use tokio_test::task;
8 use tokio_test::{
9 assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
10 };
11
12 use std::sync::Arc;
13
14 trait AssertSend: Send {}
15 impl AssertSend for mpsc::Sender<i32> {}
16 impl AssertSend for mpsc::Receiver<i32> {}
17
/// Two values pushed into a bounded channel are received in order, followed
/// by `None` once the sender has been dropped.
#[test]
fn send_recv_with_buffer() {
    let (sender, receiver) = mpsc::channel::<i32>(16);
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    // First value: reserve a slot through `poll_ready`, then `try_send`.
    assert_ready_ok!(sender.enter(|cx, mut s| s.poll_ready(cx)));
    assert_ok!(sender.try_send(1));

    // Second value: `try_send` directly, with no prior `poll_ready`.
    assert_ok!(sender.try_send(2));

    drop(sender);

    // Both values arrive in send order; the trailing `None` is the
    // end-of-stream seen after the sender was dropped.
    for expected in &[Some(1), Some(2), None] {
        let got = assert_ready!(receiver.enter(|cx, mut r| r.poll_recv(cx)));
        assert_eq!(got, *expected);
    }
}
42
/// Exercises slot reservation on a capacity-2 channel shared by four senders:
/// a slot is freed by sending and receiving, by dropping a ready sender, and
/// by `disarm`; disarming a sender that is not armed frees nothing.
#[test]
fn disarm() {
    let (tx, rx) = mpsc::channel::<i32>(2);
    let mut first = task::spawn(tx.clone());
    let mut second = task::spawn(tx.clone());
    let mut third = task::spawn(tx.clone());
    let mut fourth = task::spawn(tx);
    let mut rx = task::spawn(rx);

    // Capacity is two, so two senders can become ready right away...
    assert_ready_ok!(first.enter(|cx, mut s| s.poll_ready(cx)));
    assert_ready_ok!(second.enter(|cx, mut s| s.poll_ready(cx)));

    // ...and a third one has to wait.
    assert_pending!(third.enter(|cx, mut s| s.poll_ready(cx)));

    // Use one of the reserved slots by sending, then receive the value so
    // that the slot is actually free again.
    assert_ok!(first.try_send(1));
    let received = assert_ready!(rx.enter(|cx, mut r| r.poll_recv(cx)));
    assert!(received.is_some());

    // The freed slot goes to the third sender; the fourth still has to wait.
    assert_ready_ok!(third.enter(|cx, mut s| s.poll_ready(cx)));
    assert_pending!(fourth.enter(|cx, mut s| s.poll_ready(cx)));

    // Dropping a ready sender opens up a slot as well.
    drop(second);
    assert_ready_ok!(fourth.enter(|cx, mut s| s.poll_ready(cx)));
    assert_pending!(first.enter(|cx, mut s| s.poll_ready(cx)));

    // `disarm` on a ready sender returns `true` and opens up a slot.
    assert!(third.disarm());
    assert_ready_ok!(first.enter(|cx, mut s| s.poll_ready(cx)));

    // A sender that is not armed has nothing to give back: `disarm` returns
    // `false` and no slot becomes available.
    assert!(!third.disarm());
    assert_pending!(third.enter(|cx, mut s| s.poll_ready(cx)));
}
80
81 #[tokio::test]
send_recv_stream_with_buffer()82 async fn send_recv_stream_with_buffer() {
83 use tokio::stream::StreamExt;
84
85 let (mut tx, mut rx) = mpsc::channel::<i32>(16);
86
87 tokio::spawn(async move {
88 assert_ok!(tx.send(1).await);
89 assert_ok!(tx.send(2).await);
90 });
91
92 assert_eq!(Some(1), rx.next().await);
93 assert_eq!(Some(2), rx.next().await);
94 assert_eq!(None, rx.next().await);
95 }
96
97 #[tokio::test]
async_send_recv_with_buffer()98 async fn async_send_recv_with_buffer() {
99 let (mut tx, mut rx) = mpsc::channel(16);
100
101 tokio::spawn(async move {
102 assert_ok!(tx.send(1).await);
103 assert_ok!(tx.send(2).await);
104 });
105
106 assert_eq!(Some(1), rx.recv().await);
107 assert_eq!(Some(2), rx.recv().await);
108 assert_eq!(None, rx.recv().await);
109 }
110
/// With a capacity-1 channel already full, two senders wait for capacity.
/// After the first sender is dropped and the value is received, only the
/// task of the surviving sender is woken.
#[test]
fn start_send_past_cap() {
    let mut dropped_task = task::spawn(());
    let mut survivor_task = task::spawn(());
    let mut recv_task = task::spawn(());

    let (mut dropped_tx, mut rx) = mpsc::channel(1);
    let mut survivor_tx = dropped_tx.clone();

    // Fill the single slot.
    assert_ok!(dropped_tx.try_send(()));

    // Both senders now have to wait for capacity.
    assert_pending!(dropped_task.enter(|cx, _| dropped_tx.poll_ready(cx)));
    assert_pending!(survivor_task.enter(|cx, _| survivor_tx.poll_ready(cx)));

    drop(dropped_tx);

    let first = assert_ready!(recv_task.enter(|cx, _| rx.poll_recv(cx)));
    assert!(first.is_some());

    // The surviving sender is notified; the task whose sender was dropped
    // is not.
    assert!(survivor_task.is_woken());
    assert!(!dropped_task.is_woken());

    drop(survivor_tx);

    // No senders remain, so the receiver reports the end of the stream.
    let end = assert_ready!(recv_task.enter(|cx, _| rx.poll_recv(cx)));
    assert!(end.is_none());
}
143
/// Creating a bounded channel with a capacity of zero is expected to panic.
#[test]
#[should_panic]
fn buffer_gteq_one() {
    let _ = mpsc::channel::<i32>(0);
}
149
/// Two values sent on an unbounded channel are polled back in order; after
/// the sender is dropped the receiver yields `None`.
#[test]
fn send_recv_unbounded() {
    let mut recv_task = task::spawn(());

    let (sender, mut receiver) = mpsc::unbounded_channel::<i32>();

    // Queue both values with the synchronous `send`.
    for value in 1..=2 {
        assert_ok!(sender.send(value));
    }

    for expected in &[Some(1), Some(2)] {
        let got = assert_ready!(recv_task.enter(|cx, _| receiver.poll_recv(cx)));
        assert_eq!(got, *expected);
    }

    drop(sender);

    let end = assert_ready!(recv_task.enter(|cx, _| receiver.poll_recv(cx)));
    assert!(end.is_none());
}
171
172 #[tokio::test]
async_send_recv_unbounded()173 async fn async_send_recv_unbounded() {
174 let (tx, mut rx) = mpsc::unbounded_channel();
175
176 tokio::spawn(async move {
177 assert_ok!(tx.send(1));
178 assert_ok!(tx.send(2));
179 });
180
181 assert_eq!(Some(1), rx.recv().await);
182 assert_eq!(Some(2), rx.recv().await);
183 assert_eq!(None, rx.recv().await);
184 }
185
186 #[tokio::test]
send_recv_stream_unbounded()187 async fn send_recv_stream_unbounded() {
188 use tokio::stream::StreamExt;
189
190 let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
191
192 tokio::spawn(async move {
193 assert_ok!(tx.send(1));
194 assert_ok!(tx.send(2));
195 });
196
197 assert_eq!(Some(1), rx.next().await);
198 assert_eq!(Some(2), rx.next().await);
199 assert_eq!(None, rx.next().await);
200 }
201
/// The bounded channel halves must be usable with a message type that
/// implements no traits at all: both halves are `Debug` and the sender is
/// `Clone` regardless of `T`.
#[test]
fn no_t_bounds_buffer() {
    struct NoImpls;

    let mut recv_task = task::spawn(());
    let (sender, mut receiver) = mpsc::channel(100);

    // `Debug` is available on the sender although `NoImpls` is not `Debug`...
    println!("{:?}", sender);
    // ...and likewise on the receiver.
    println!("{:?}", receiver);

    // `Clone` is available on the sender although `NoImpls` is not `Clone`.
    let sent = sender.clone().try_send(NoImpls);
    assert!(sent.is_ok());

    let got = assert_ready!(recv_task.enter(|cx, _| receiver.poll_recv(cx)));
    assert!(got.is_some());
}
219
/// The unbounded channel halves must be usable with a message type that
/// implements no traits at all: both halves are `Debug` and the sender is
/// `Clone` regardless of `T`.
#[test]
fn no_t_bounds_unbounded() {
    struct NoImpls;

    let mut recv_task = task::spawn(());
    let (sender, mut receiver) = mpsc::unbounded_channel();

    // `Debug` is available on the sender although `NoImpls` is not `Debug`...
    println!("{:?}", sender);
    // ...and likewise on the receiver.
    println!("{:?}", receiver);

    // `Clone` is available on the sender although `NoImpls` is not `Clone`.
    let sent = sender.clone().send(NoImpls);
    assert!(sent.is_ok());

    let got = assert_ready!(recv_task.enter(|cx, _| receiver.poll_recv(cx)));
    assert!(got.is_some());
}
237
/// Drives a capacity-1 channel through two send/receive rounds and checks
/// that the sender is pending while the slot is occupied and ready again
/// after each receive.
#[test]
fn send_recv_buffer_limited() {
    let mut send_task = task::spawn(());
    let mut recv_task = task::spawn(());

    let (mut tx, mut rx) = mpsc::channel::<i32>(1);

    send_task.enter(|cx, _| {
        // The single slot is free, so the first value goes through.
        assert_ready_ok!(tx.poll_ready(cx));
        assert_ok!(tx.try_send(1));

        // The slot is now occupied: reserving is pending and a second
        // `try_send` fails.
        assert_pending!(tx.poll_ready(cx));
        assert_err!(tx.try_send(1337));
    });

    // Take the first value out of the channel.
    let first = assert_ready!(recv_task.enter(|cx, _| rx.poll_recv(cx)));
    assert_eq!(first, Some(1));

    // The pending sender task was notified by the receive.
    assert!(send_task.is_woken());

    send_task.enter(|cx, _| {
        assert_ready_ok!(tx.poll_ready(cx));
        assert_ok!(tx.try_send(2));

        // Occupied again.
        assert_pending!(tx.poll_ready(cx));
    });

    // Take the second value out of the channel.
    let second = assert_ready!(recv_task.enter(|cx, _| rx.poll_recv(cx)));
    assert_eq!(second, Some(2));

    // With the channel drained, the sender is ready once more.
    assert_ready_ok!(send_task.enter(|cx, _| tx.poll_ready(cx)));
}
286
/// Closing an idle receiver: the next `poll_recv` yields `None` and the
/// sender's `poll_ready` resolves to an error.
#[test]
fn recv_close_gets_none_idle() {
    let mut mock = task::spawn(());

    let (mut sender, mut receiver) = mpsc::channel::<i32>(10);

    receiver.close();

    mock.enter(|cx, _| {
        let end = assert_ready!(receiver.poll_recv(cx));
        assert_eq!(None, end);

        assert_ready_err!(sender.poll_ready(cx));
    });
}
301
/// Closing a receiver while one sender is ready and another is waiting:
/// the waiting sender is woken and sees an error, the ready sender can still
/// deliver its value, and only then does the receiver yield `None`.
#[test]
fn recv_close_gets_none_reserved() {
    let mut holder_task = task::spawn(());
    let mut waiter_task = task::spawn(());
    let mut recv_task = task::spawn(());

    let (mut holder, mut rx) = mpsc::channel::<i32>(1);
    let mut waiter = holder.clone();

    // `holder` becomes ready on the only slot; `waiter` is left pending.
    assert_ready_ok!(holder_task.enter(|cx, _| holder.poll_ready(cx)));
    assert_pending!(waiter_task.enter(|cx, _| waiter.poll_ready(cx)));

    rx.close();

    // Closing wakes the pending sender, which then observes the error.
    assert!(waiter_task.is_woken());
    assert_ready_err!(waiter_task.enter(|cx, _| waiter.poll_ready(cx)));

    // While `holder` is still ready to send, the receiver does not report
    // the end of the stream yet.
    assert_pending!(recv_task.enter(|cx, _| rx.poll_recv(cx)));

    assert!(!holder_task.is_woken());
    assert!(!waiter_task.is_woken());

    // The ready sender can still deliver its value after the close...
    assert_ok!(holder.try_send(123));

    // ...which wakes the pending receiver.
    assert!(recv_task.is_woken());

    recv_task.enter(|cx, _| {
        let delivered = assert_ready!(rx.poll_recv(cx));
        assert_eq!(delivered, Some(123));

        let end = assert_ready!(rx.poll_recv(cx));
        assert!(end.is_none());
    });
}
342
/// A receiver whose only sender is already gone yields `None` on the first
/// poll.
#[test]
fn tx_close_gets_none() {
    let mut mock = task::spawn(());

    let (sender, mut receiver) = mpsc::channel::<i32>(10);
    // Get rid of the sender before the receiver is ever polled.
    drop(sender);

    // `poll_recv` needs a task context, which the mock task provides.
    let end = assert_ready!(mock.enter(|cx, _| receiver.poll_recv(cx)));
    assert!(end.is_none());
}
355
/// `try_send` on a full capacity-1 channel fails with `TrySendError::Full`,
/// and succeeds again once the queued value has been received.
#[test]
fn try_send_fail() {
    let mut recv_task = task::spawn(());

    let (mut tx, mut rx) = mpsc::channel(1);

    assert_ok!(tx.try_send("hello"));

    // The only slot is taken, so a second value must be rejected as `Full`.
    let rejected = assert_err!(tx.try_send("fail"));
    match rejected {
        TrySendError::Full(..) => {}
        _ => panic!(),
    }

    let first = assert_ready!(recv_task.enter(|cx, _| rx.poll_recv(cx)));
    assert_eq!(first, Some("hello"));

    // Room again after the receive.
    assert_ok!(tx.try_send("goodbye"));
    drop(tx);

    let second = assert_ready!(recv_task.enter(|cx, _| rx.poll_recv(cx)));
    assert_eq!(second, Some("goodbye"));

    let end = assert_ready!(recv_task.enter(|cx, _| rx.poll_recv(cx)));
    assert!(end.is_none());
}
382
/// `poll_ready` reserves capacity; this checks that the capacity is handed
/// back when the sender is dropped without ever sending a value.
#[test]
fn drop_tx_with_permit_releases_permit() {
    let mut holder_task = task::spawn(());
    let mut waiter_task = task::spawn(());

    // The receiver is kept alive (but unused) for the whole test.
    let (mut holder, _rx) = mpsc::channel::<i32>(1);
    let mut waiter = holder.clone();

    // `holder` takes the only slot, leaving `waiter` pending.
    assert_ready_ok!(holder_task.enter(|cx, _| holder.poll_ready(cx)));
    assert_pending!(waiter_task.enter(|cx, _| waiter.poll_ready(cx)));

    drop(holder);

    // The drop notifies the waiting sender, which can now become ready.
    assert!(waiter_task.is_woken());
    assert_ready_ok!(waiter_task.enter(|cx, _| waiter.poll_ready(cx)));
}
405
/// Dropping the receiver makes `poll_ready` fail and releases the message
/// that was still queued (observed through the `Arc` strong count).
#[test]
fn dropping_rx_closes_channel() {
    let mut mock = task::spawn(());

    let (mut sender, receiver) = mpsc::channel(100);

    // Queue a clone of the `Arc`; the original stays here for counting.
    let payload = Arc::new(());
    assert_ok!(sender.try_send(Arc::clone(&payload)));

    drop(receiver);
    assert_ready_err!(mock.enter(|cx, _| sender.poll_ready(cx)));

    // Only the local handle is left.
    assert_eq!(1, Arc::strong_count(&payload));
}
420
/// After the receiver is dropped, `try_send` fails with
/// `TrySendError::Closed`, and no clone of the message outlives the error.
#[test]
fn dropping_rx_closes_channel_for_try() {
    let (mut sender, receiver) = mpsc::channel(100);

    let payload = Arc::new(());
    assert_ok!(sender.try_send(Arc::clone(&payload)));

    drop(receiver);

    let rejected = assert_err!(sender.try_send(Arc::clone(&payload)));
    match rejected {
        TrySendError::Closed(..) => {}
        _ => panic!(),
    }
    // The error value must be gone before the strong count is checked.
    drop(rejected);

    // Only the local handle is left.
    assert_eq!(1, Arc::strong_count(&payload));
}
440
/// A message that was sent but never received is dropped together with the
/// channel (observed through the `Arc` strong count).
#[test]
fn unconsumed_messages_are_dropped() {
    let payload = Arc::new(());

    let (mut sender, receiver) = mpsc::channel(100);

    assert_ok!(sender.try_send(Arc::clone(&payload)));

    // One handle here, one inside the channel.
    assert_eq!(2, Arc::strong_count(&payload));

    // Tear down both halves, sender first.
    drop(sender);
    drop(receiver);

    assert_eq!(1, Arc::strong_count(&payload));
}
455
/// `try_recv` on a bounded channel: `Empty` before anything is sent, the
/// value after a send, and `Closed` once the sender is dropped.
#[test]
fn try_recv() {
    let (mut sender, mut receiver) = mpsc::channel(1);

    // Nothing has been sent yet.
    let before_send = receiver.try_recv();
    match before_send {
        Err(TryRecvError::Empty) => {}
        _ => panic!(),
    }

    assert_ok!(sender.try_send(42));

    // The queued value is returned.
    let after_send = receiver.try_recv();
    match after_send {
        Ok(42) => {}
        _ => panic!(),
    }

    drop(sender);

    // No value and no sender left.
    let after_drop = receiver.try_recv();
    match after_drop {
        Err(TryRecvError::Closed) => {}
        _ => panic!(),
    }
}
474
/// `try_recv` on an unbounded channel: `Empty` before anything is sent, the
/// value after a send, and `Closed` once the sender is dropped.
#[test]
fn try_recv_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel();

    // Nothing has been sent yet.
    let before_send = receiver.try_recv();
    match before_send {
        Err(TryRecvError::Empty) => {}
        _ => panic!(),
    }

    assert_ok!(sender.send(42));

    // The queued value is returned.
    let after_send = receiver.try_recv();
    match after_send {
        Ok(42) => {}
        _ => panic!(),
    }

    drop(sender);

    // No value and no sender left.
    let after_drop = receiver.try_recv();
    match after_drop {
        Err(TryRecvError::Closed) => {}
        _ => panic!(),
    }
}
493
/// A sender becomes ready, the receiver is closed, and `recv` is pending;
/// dropping that sender without sending must wake the receiver task.
#[test]
fn ready_close_cancel_bounded() {
    use futures::future::poll_fn;

    let (mut sender, mut receiver) = mpsc::channel::<()>(100);
    // A second sender handle stays alive until the end of the test.
    let _other_sender = sender.clone();

    {
        // Scoped so the mutable borrow of `sender` ends before it is dropped.
        let mut reserve = task::spawn(poll_fn(|cx| sender.poll_ready(cx)));
        assert_ready_ok!(reserve.poll());
    }

    receiver.close();

    let mut pending_recv = task::spawn(async { receiver.recv().await });
    assert_pending!(pending_recv.poll());

    drop(sender);

    assert!(pending_recv.is_woken());
}
515
516 #[tokio::test]
permit_available_not_acquired_close()517 async fn permit_available_not_acquired_close() {
518 use futures::future::poll_fn;
519
520 let (mut tx1, mut rx) = mpsc::channel::<()>(1);
521 let mut tx2 = tx1.clone();
522
523 {
524 let mut ready = task::spawn(poll_fn(|cx| tx1.poll_ready(cx)));
525 assert_ready_ok!(ready.poll());
526 }
527
528 let mut ready = task::spawn(poll_fn(|cx| tx2.poll_ready(cx)));
529 assert_pending!(ready.poll());
530
531 rx.close();
532
533 drop(tx1);
534 assert!(ready.is_woken());
535
536 drop(tx2);
537 assert!(rx.recv().await.is_none());
538 }
539