1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 use std::thread;
6 use tokio::runtime::Runtime;
7 use tokio::sync::mpsc;
8 use tokio::sync::mpsc::error::TrySendError;
9 use tokio_test::task;
10 use tokio_test::{
11     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
12 };
13 
14 use std::sync::Arc;
15 
// Test-support modules for this file. `mpsc_stream` supplies the
// `channel_stream` / `unbounded_channel_stream` constructors that the
// stream-based tests in this file call.
mod support {
    pub(crate) mod mpsc_stream;
}
19 
20 trait AssertSend: Send {}
21 impl AssertSend for mpsc::Sender<i32> {}
22 impl AssertSend for mpsc::Receiver<i32> {}
23 
24 #[tokio::test]
send_recv_with_buffer()25 async fn send_recv_with_buffer() {
26     let (tx, mut rx) = mpsc::channel::<i32>(16);
27 
28     // Using poll_ready / try_send
29     // let permit assert_ready_ok!(tx.reserve());
30     let permit = tx.reserve().await.unwrap();
31     permit.send(1);
32 
33     // Without poll_ready
34     tx.try_send(2).unwrap();
35 
36     drop(tx);
37 
38     let val = rx.recv().await;
39     assert_eq!(val, Some(1));
40 
41     let val = rx.recv().await;
42     assert_eq!(val, Some(2));
43 
44     let val = rx.recv().await;
45     assert!(val.is_none());
46 }
47 
48 #[tokio::test]
reserve_disarm()49 async fn reserve_disarm() {
50     let (tx, mut rx) = mpsc::channel::<i32>(2);
51     let tx1 = tx.clone();
52     let tx2 = tx.clone();
53     let tx3 = tx.clone();
54     let tx4 = tx;
55 
56     // We should be able to `poll_ready` two handles without problem
57     let permit1 = assert_ok!(tx1.reserve().await);
58     let permit2 = assert_ok!(tx2.reserve().await);
59 
60     // But a third should not be ready
61     let mut r3 = task::spawn(tx3.reserve());
62     assert_pending!(r3.poll());
63 
64     let mut r4 = task::spawn(tx4.reserve());
65     assert_pending!(r4.poll());
66 
67     // Using one of the reserved slots should allow a new handle to become ready
68     permit1.send(1);
69 
70     // We also need to receive for the slot to be free
71     assert!(!r3.is_woken());
72     rx.recv().await.unwrap();
73     // Now there's a free slot!
74     assert!(r3.is_woken());
75     assert!(!r4.is_woken());
76 
77     // Dropping a permit should also open up a slot
78     drop(permit2);
79     assert!(r4.is_woken());
80 
81     let mut r1 = task::spawn(tx1.reserve());
82     assert_pending!(r1.poll());
83 }
84 
85 #[tokio::test]
send_recv_stream_with_buffer()86 async fn send_recv_stream_with_buffer() {
87     use tokio_stream::StreamExt;
88 
89     let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16);
90     let mut rx = Box::pin(rx);
91 
92     tokio::spawn(async move {
93         assert_ok!(tx.send(1).await);
94         assert_ok!(tx.send(2).await);
95     });
96 
97     assert_eq!(Some(1), rx.next().await);
98     assert_eq!(Some(2), rx.next().await);
99     assert_eq!(None, rx.next().await);
100 }
101 
102 #[tokio::test]
async_send_recv_with_buffer()103 async fn async_send_recv_with_buffer() {
104     let (tx, mut rx) = mpsc::channel(16);
105 
106     tokio::spawn(async move {
107         assert_ok!(tx.send(1).await);
108         assert_ok!(tx.send(2).await);
109     });
110 
111     assert_eq!(Some(1), rx.recv().await);
112     assert_eq!(Some(2), rx.recv().await);
113     assert_eq!(None, rx.recv().await);
114 }
115 
116 #[tokio::test]
start_send_past_cap()117 async fn start_send_past_cap() {
118     use std::future::Future;
119 
120     let mut t1 = task::spawn(());
121 
122     let (tx1, mut rx) = mpsc::channel(1);
123     let tx2 = tx1.clone();
124 
125     assert_ok!(tx1.try_send(()));
126 
127     let mut r1 = Box::pin(tx1.reserve());
128     t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx)));
129 
130     {
131         let mut r2 = task::spawn(tx2.reserve());
132         assert_pending!(r2.poll());
133 
134         drop(r1);
135 
136         assert!(rx.recv().await.is_some());
137 
138         assert!(r2.is_woken());
139         assert!(!t1.is_woken());
140     }
141 
142     drop(tx1);
143     drop(tx2);
144 
145     assert!(rx.recv().await.is_none());
146 }
147 
/// Creating a bounded channel with a capacity of zero must panic.
#[test]
#[should_panic]
fn buffer_gteq_one() {
    let _ = mpsc::channel::<i32>(0);
}
153 
154 #[tokio::test]
send_recv_unbounded()155 async fn send_recv_unbounded() {
156     let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
157 
158     // Using `try_send`
159     assert_ok!(tx.send(1));
160     assert_ok!(tx.send(2));
161 
162     assert_eq!(rx.recv().await, Some(1));
163     assert_eq!(rx.recv().await, Some(2));
164 
165     drop(tx);
166 
167     assert!(rx.recv().await.is_none());
168 }
169 
170 #[tokio::test]
async_send_recv_unbounded()171 async fn async_send_recv_unbounded() {
172     let (tx, mut rx) = mpsc::unbounded_channel();
173 
174     tokio::spawn(async move {
175         assert_ok!(tx.send(1));
176         assert_ok!(tx.send(2));
177     });
178 
179     assert_eq!(Some(1), rx.recv().await);
180     assert_eq!(Some(2), rx.recv().await);
181     assert_eq!(None, rx.recv().await);
182 }
183 
184 #[tokio::test]
send_recv_stream_unbounded()185 async fn send_recv_stream_unbounded() {
186     use tokio_stream::StreamExt;
187 
188     let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>();
189 
190     let mut rx = Box::pin(rx);
191 
192     tokio::spawn(async move {
193         assert_ok!(tx.send(1));
194         assert_ok!(tx.send(2));
195     });
196 
197     assert_eq!(Some(1), rx.next().await);
198     assert_eq!(Some(2), rx.next().await);
199     assert_eq!(None, rx.next().await);
200 }
201 
202 #[tokio::test]
no_t_bounds_buffer()203 async fn no_t_bounds_buffer() {
204     struct NoImpls;
205 
206     let (tx, mut rx) = mpsc::channel(100);
207 
208     // sender should be Debug even though T isn't Debug
209     println!("{:?}", tx);
210     // same with Receiver
211     println!("{:?}", rx);
212     // and sender should be Clone even though T isn't Clone
213     assert!(tx.clone().try_send(NoImpls).is_ok());
214 
215     assert!(rx.recv().await.is_some());
216 }
217 
218 #[tokio::test]
no_t_bounds_unbounded()219 async fn no_t_bounds_unbounded() {
220     struct NoImpls;
221 
222     let (tx, mut rx) = mpsc::unbounded_channel();
223 
224     // sender should be Debug even though T isn't Debug
225     println!("{:?}", tx);
226     // same with Receiver
227     println!("{:?}", rx);
228     // and sender should be Clone even though T isn't Clone
229     assert!(tx.clone().send(NoImpls).is_ok());
230 
231     assert!(rx.recv().await.is_some());
232 }
233 
234 #[tokio::test]
send_recv_buffer_limited()235 async fn send_recv_buffer_limited() {
236     let (tx, mut rx) = mpsc::channel::<i32>(1);
237 
238     // Reserve capacity
239     let p1 = assert_ok!(tx.reserve().await);
240 
241     // Send first message
242     p1.send(1);
243 
244     // Not ready
245     let mut p2 = task::spawn(tx.reserve());
246     assert_pending!(p2.poll());
247 
248     // Take the value
249     assert!(rx.recv().await.is_some());
250 
251     // Notified
252     assert!(p2.is_woken());
253 
254     // Trying to send fails
255     assert_err!(tx.try_send(1337));
256 
257     // Send second
258     let permit = assert_ready_ok!(p2.poll());
259     permit.send(2);
260 
261     assert!(rx.recv().await.is_some());
262 }
263 
264 #[tokio::test]
recv_close_gets_none_idle()265 async fn recv_close_gets_none_idle() {
266     let (tx, mut rx) = mpsc::channel::<i32>(10);
267 
268     rx.close();
269 
270     assert!(rx.recv().await.is_none());
271 
272     assert_err!(tx.send(1).await);
273 }
274 
275 #[tokio::test]
recv_close_gets_none_reserved()276 async fn recv_close_gets_none_reserved() {
277     let (tx1, mut rx) = mpsc::channel::<i32>(1);
278     let tx2 = tx1.clone();
279 
280     let permit1 = assert_ok!(tx1.reserve().await);
281     let mut permit2 = task::spawn(tx2.reserve());
282     assert_pending!(permit2.poll());
283 
284     rx.close();
285 
286     assert!(permit2.is_woken());
287     assert_ready_err!(permit2.poll());
288 
289     {
290         let mut recv = task::spawn(rx.recv());
291         assert_pending!(recv.poll());
292 
293         permit1.send(123);
294         assert!(recv.is_woken());
295 
296         let v = assert_ready!(recv.poll());
297         assert_eq!(v, Some(123));
298     }
299 
300     assert!(rx.recv().await.is_none());
301 }
302 
303 #[tokio::test]
tx_close_gets_none()304 async fn tx_close_gets_none() {
305     let (_, mut rx) = mpsc::channel::<i32>(10);
306     assert!(rx.recv().await.is_none());
307 }
308 
309 #[tokio::test]
try_send_fail()310 async fn try_send_fail() {
311     let (tx, mut rx) = mpsc::channel(1);
312 
313     tx.try_send("hello").unwrap();
314 
315     // This should fail
316     match assert_err!(tx.try_send("fail")) {
317         TrySendError::Full(..) => {}
318         _ => panic!(),
319     }
320 
321     assert_eq!(rx.recv().await, Some("hello"));
322 
323     assert_ok!(tx.try_send("goodbye"));
324     drop(tx);
325 
326     assert_eq!(rx.recv().await, Some("goodbye"));
327     assert!(rx.recv().await.is_none());
328 }
329 
330 #[tokio::test]
try_reserve_fails()331 async fn try_reserve_fails() {
332     let (tx, mut rx) = mpsc::channel(1);
333 
334     let permit = tx.try_reserve().unwrap();
335 
336     // This should fail
337     match assert_err!(tx.try_reserve()) {
338         TrySendError::Full(()) => {}
339         _ => panic!(),
340     }
341 
342     permit.send("foo");
343 
344     assert_eq!(rx.recv().await, Some("foo"));
345 
346     // Dropping permit releases the slot.
347     let permit = tx.try_reserve().unwrap();
348     drop(permit);
349 
350     let _permit = tx.try_reserve().unwrap();
351 }
352 
353 #[tokio::test]
drop_permit_releases_permit()354 async fn drop_permit_releases_permit() {
355     // poll_ready reserves capacity, ensure that the capacity is released if tx
356     // is dropped w/o sending a value.
357     let (tx1, _rx) = mpsc::channel::<i32>(1);
358     let tx2 = tx1.clone();
359 
360     let permit = assert_ok!(tx1.reserve().await);
361 
362     let mut reserve2 = task::spawn(tx2.reserve());
363     assert_pending!(reserve2.poll());
364 
365     drop(permit);
366 
367     assert!(reserve2.is_woken());
368     assert_ready_ok!(reserve2.poll());
369 }
370 
371 #[tokio::test]
dropping_rx_closes_channel()372 async fn dropping_rx_closes_channel() {
373     let (tx, rx) = mpsc::channel(100);
374 
375     let msg = Arc::new(());
376     assert_ok!(tx.try_send(msg.clone()));
377 
378     drop(rx);
379     assert_err!(tx.reserve().await);
380     assert_eq!(1, Arc::strong_count(&msg));
381 }
382 
/// Dropping the receiver makes `try_send` fail with `TrySendError::Closed`,
/// and no clone of the message stays alive inside the channel.
#[test]
fn dropping_rx_closes_channel_for_try() {
    let (tx, rx) = mpsc::channel(100);

    // The `Arc` strong count shows whether any clone is still alive.
    let msg = Arc::new(());
    tx.try_send(Arc::clone(&msg)).unwrap();

    drop(rx);

    {
        // The error is bound inside this scope so that it is dropped before
        // the strong count is checked below.
        let rejected = assert_err!(tx.try_send(Arc::clone(&msg)));
        if !matches!(rejected, TrySendError::Closed(..)) {
            panic!();
        }
    }

    assert_eq!(1, Arc::strong_count(&msg));
}
402 
/// A message still in the buffer is dropped when both channel halves are
/// dropped.
#[test]
fn unconsumed_messages_are_dropped() {
    let msg = Arc::new(());

    let (tx, rx) = mpsc::channel(100);

    tx.try_send(Arc::clone(&msg)).unwrap();

    // One reference here, one inside the channel buffer.
    assert_eq!(2, Arc::strong_count(&msg));

    drop(tx);
    drop(rx);

    // Only the local reference is left.
    assert_eq!(1, Arc::strong_count(&msg));
}
417 
/// `blocking_recv`, called on a plain thread, receives a value sent from
/// async code running on a runtime.
#[test]
fn blocking_recv() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    // Synchronous side: block a plain thread on the receiver.
    let receiver_thread = thread::spawn(move || {
        let received = rx.blocking_recv();
        assert_eq!(Some(10), received);
    });

    // Asynchronous side: send the value from inside a runtime.
    let send_task = async move {
        let _ = tx.send(10).await;
    };
    Runtime::new().unwrap().block_on(send_task);

    receiver_thread.join().unwrap();
}
431 
432 #[tokio::test]
433 #[should_panic]
blocking_recv_async()434 async fn blocking_recv_async() {
435     let (_tx, mut rx) = mpsc::channel::<()>(1);
436     let _ = rx.blocking_recv();
437 }
438 
/// A value passed to `blocking_send` on a plain thread is received by async
/// code running on a runtime.
#[test]
fn blocking_send() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    // Synchronous side: block a plain thread on the sender.
    let sender_thread = thread::spawn(move || {
        tx.blocking_send(10).unwrap();
    });

    // Asynchronous side: receive the value from inside a runtime.
    let recv_task = async move {
        let received = rx.recv().await;
        assert_eq!(Some(10), received);
    };
    Runtime::new().unwrap().block_on(recv_task);

    sender_thread.join().unwrap();
}
452 
453 #[tokio::test]
454 #[should_panic]
blocking_send_async()455 async fn blocking_send_async() {
456     let (tx, _rx) = mpsc::channel::<()>(1);
457     let _ = tx.blocking_send(());
458 }
459 
460 #[tokio::test]
ready_close_cancel_bounded()461 async fn ready_close_cancel_bounded() {
462     let (tx, mut rx) = mpsc::channel::<()>(100);
463     let _tx2 = tx.clone();
464 
465     let permit = assert_ok!(tx.reserve().await);
466 
467     rx.close();
468 
469     let mut recv = task::spawn(rx.recv());
470     assert_pending!(recv.poll());
471 
472     drop(permit);
473 
474     assert!(recv.is_woken());
475     let val = assert_ready!(recv.poll());
476     assert!(val.is_none());
477 }
478 
479 #[tokio::test]
permit_available_not_acquired_close()480 async fn permit_available_not_acquired_close() {
481     let (tx1, mut rx) = mpsc::channel::<()>(1);
482     let tx2 = tx1.clone();
483 
484     let permit1 = assert_ok!(tx1.reserve().await);
485 
486     let mut permit2 = task::spawn(tx2.reserve());
487     assert_pending!(permit2.poll());
488 
489     rx.close();
490 
491     drop(permit1);
492     assert!(permit2.is_woken());
493 
494     drop(permit2);
495     assert!(rx.recv().await.is_none());
496 }
497