1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 use tokio::sync::mpsc;
6 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
7 use tokio_test::task;
8 use tokio_test::{
9     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
10 };
11 
12 use std::sync::Arc;
13 
14 trait AssertSend: Send {}
15 impl AssertSend for mpsc::Sender<i32> {}
16 impl AssertSend for mpsc::Receiver<i32> {}
17 
18 #[test]
send_recv_with_buffer()19 fn send_recv_with_buffer() {
20     let (tx, rx) = mpsc::channel::<i32>(16);
21     let mut tx = task::spawn(tx);
22     let mut rx = task::spawn(rx);
23 
24     // Using poll_ready / try_send
25     assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx)));
26     tx.try_send(1).unwrap();
27 
28     // Without poll_ready
29     tx.try_send(2).unwrap();
30 
31     drop(tx);
32 
33     let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
34     assert_eq!(val, Some(1));
35 
36     let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
37     assert_eq!(val, Some(2));
38 
39     let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
40     assert!(val.is_none());
41 }
42 
43 #[test]
disarm()44 fn disarm() {
45     let (tx, rx) = mpsc::channel::<i32>(2);
46     let mut tx1 = task::spawn(tx.clone());
47     let mut tx2 = task::spawn(tx.clone());
48     let mut tx3 = task::spawn(tx.clone());
49     let mut tx4 = task::spawn(tx);
50     let mut rx = task::spawn(rx);
51 
52     // We should be able to `poll_ready` two handles without problem
53     assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
54     assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx)));
55 
56     // But a third should not be ready
57     assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
58 
59     // Using one of the reserved slots should allow a new handle to become ready
60     tx1.try_send(1).unwrap();
61     // We also need to receive for the slot to be free
62     let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap();
63     // Now there's a free slot!
64     assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
65     assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
66 
67     // Dropping a ready handle should also open up a slot
68     drop(tx2);
69     assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
70     assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
71 
72     // Explicitly disarming a handle should also open a slot
73     assert!(tx3.disarm());
74     assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
75 
76     // Disarming a non-armed sender does not free up a slot
77     assert!(!tx3.disarm());
78     assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
79 }
80 
81 #[tokio::test]
send_recv_stream_with_buffer()82 async fn send_recv_stream_with_buffer() {
83     use tokio::stream::StreamExt;
84 
85     let (mut tx, mut rx) = mpsc::channel::<i32>(16);
86 
87     tokio::spawn(async move {
88         assert_ok!(tx.send(1).await);
89         assert_ok!(tx.send(2).await);
90     });
91 
92     assert_eq!(Some(1), rx.next().await);
93     assert_eq!(Some(2), rx.next().await);
94     assert_eq!(None, rx.next().await);
95 }
96 
97 #[tokio::test]
async_send_recv_with_buffer()98 async fn async_send_recv_with_buffer() {
99     let (mut tx, mut rx) = mpsc::channel(16);
100 
101     tokio::spawn(async move {
102         assert_ok!(tx.send(1).await);
103         assert_ok!(tx.send(2).await);
104     });
105 
106     assert_eq!(Some(1), rx.recv().await);
107     assert_eq!(Some(2), rx.recv().await);
108     assert_eq!(None, rx.recv().await);
109 }
110 
111 #[test]
start_send_past_cap()112 fn start_send_past_cap() {
113     let mut t1 = task::spawn(());
114     let mut t2 = task::spawn(());
115     let mut t3 = task::spawn(());
116 
117     let (mut tx1, mut rx) = mpsc::channel(1);
118     let mut tx2 = tx1.clone();
119 
120     assert_ok!(tx1.try_send(()));
121 
122     t1.enter(|cx, _| {
123         assert_pending!(tx1.poll_ready(cx));
124     });
125 
126     t2.enter(|cx, _| {
127         assert_pending!(tx2.poll_ready(cx));
128     });
129 
130     drop(tx1);
131 
132     let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
133     assert!(val.is_some());
134 
135     assert!(t2.is_woken());
136     assert!(!t1.is_woken());
137 
138     drop(tx2);
139 
140     let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
141     assert!(val.is_none());
142 }
143 
144 #[test]
145 #[should_panic]
buffer_gteq_one()146 fn buffer_gteq_one() {
147     mpsc::channel::<i32>(0);
148 }
149 
150 #[test]
send_recv_unbounded()151 fn send_recv_unbounded() {
152     let mut t1 = task::spawn(());
153 
154     let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
155 
156     // Using `try_send`
157     assert_ok!(tx.send(1));
158     assert_ok!(tx.send(2));
159 
160     let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
161     assert_eq!(val, Some(1));
162 
163     let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
164     assert_eq!(val, Some(2));
165 
166     drop(tx);
167 
168     let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
169     assert!(val.is_none());
170 }
171 
172 #[tokio::test]
async_send_recv_unbounded()173 async fn async_send_recv_unbounded() {
174     let (tx, mut rx) = mpsc::unbounded_channel();
175 
176     tokio::spawn(async move {
177         assert_ok!(tx.send(1));
178         assert_ok!(tx.send(2));
179     });
180 
181     assert_eq!(Some(1), rx.recv().await);
182     assert_eq!(Some(2), rx.recv().await);
183     assert_eq!(None, rx.recv().await);
184 }
185 
186 #[tokio::test]
send_recv_stream_unbounded()187 async fn send_recv_stream_unbounded() {
188     use tokio::stream::StreamExt;
189 
190     let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
191 
192     tokio::spawn(async move {
193         assert_ok!(tx.send(1));
194         assert_ok!(tx.send(2));
195     });
196 
197     assert_eq!(Some(1), rx.next().await);
198     assert_eq!(Some(2), rx.next().await);
199     assert_eq!(None, rx.next().await);
200 }
201 
202 #[test]
no_t_bounds_buffer()203 fn no_t_bounds_buffer() {
204     struct NoImpls;
205 
206     let mut t1 = task::spawn(());
207     let (tx, mut rx) = mpsc::channel(100);
208 
209     // sender should be Debug even though T isn't Debug
210     println!("{:?}", tx);
211     // same with Receiver
212     println!("{:?}", rx);
213     // and sender should be Clone even though T isn't Clone
214     assert!(tx.clone().try_send(NoImpls).is_ok());
215 
216     let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
217     assert!(val.is_some());
218 }
219 
220 #[test]
no_t_bounds_unbounded()221 fn no_t_bounds_unbounded() {
222     struct NoImpls;
223 
224     let mut t1 = task::spawn(());
225     let (tx, mut rx) = mpsc::unbounded_channel();
226 
227     // sender should be Debug even though T isn't Debug
228     println!("{:?}", tx);
229     // same with Receiver
230     println!("{:?}", rx);
231     // and sender should be Clone even though T isn't Clone
232     assert!(tx.clone().send(NoImpls).is_ok());
233 
234     let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
235     assert!(val.is_some());
236 }
237 
238 #[test]
send_recv_buffer_limited()239 fn send_recv_buffer_limited() {
240     let mut t1 = task::spawn(());
241     let mut t2 = task::spawn(());
242 
243     let (mut tx, mut rx) = mpsc::channel::<i32>(1);
244 
245     // Run on a task context
246     t1.enter(|cx, _| {
247         assert_ready_ok!(tx.poll_ready(cx));
248 
249         // Send first message
250         assert_ok!(tx.try_send(1));
251 
252         // Not ready
253         assert_pending!(tx.poll_ready(cx));
254 
255         // Send second message
256         assert_err!(tx.try_send(1337));
257     });
258 
259     t2.enter(|cx, _| {
260         // Take the value
261         let val = assert_ready!(rx.poll_recv(cx));
262         assert_eq!(Some(1), val);
263     });
264 
265     assert!(t1.is_woken());
266 
267     t1.enter(|cx, _| {
268         assert_ready_ok!(tx.poll_ready(cx));
269 
270         assert_ok!(tx.try_send(2));
271 
272         // Not ready
273         assert_pending!(tx.poll_ready(cx));
274     });
275 
276     t2.enter(|cx, _| {
277         // Take the value
278         let val = assert_ready!(rx.poll_recv(cx));
279         assert_eq!(Some(2), val);
280     });
281 
282     t1.enter(|cx, _| {
283         assert_ready_ok!(tx.poll_ready(cx));
284     });
285 }
286 
287 #[test]
recv_close_gets_none_idle()288 fn recv_close_gets_none_idle() {
289     let mut t1 = task::spawn(());
290 
291     let (mut tx, mut rx) = mpsc::channel::<i32>(10);
292 
293     rx.close();
294 
295     t1.enter(|cx, _| {
296         let val = assert_ready!(rx.poll_recv(cx));
297         assert!(val.is_none());
298         assert_ready_err!(tx.poll_ready(cx));
299     });
300 }
301 
302 #[test]
recv_close_gets_none_reserved()303 fn recv_close_gets_none_reserved() {
304     let mut t1 = task::spawn(());
305     let mut t2 = task::spawn(());
306     let mut t3 = task::spawn(());
307 
308     let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
309     let mut tx2 = tx1.clone();
310 
311     assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
312 
313     t2.enter(|cx, _| {
314         assert_pending!(tx2.poll_ready(cx));
315     });
316 
317     rx.close();
318 
319     assert!(t2.is_woken());
320 
321     t2.enter(|cx, _| {
322         assert_ready_err!(tx2.poll_ready(cx));
323     });
324 
325     t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx)));
326 
327     assert!(!t1.is_woken());
328     assert!(!t2.is_woken());
329 
330     assert_ok!(tx1.try_send(123));
331 
332     assert!(t3.is_woken());
333 
334     t3.enter(|cx, _| {
335         let v = assert_ready!(rx.poll_recv(cx));
336         assert_eq!(v, Some(123));
337 
338         let v = assert_ready!(rx.poll_recv(cx));
339         assert!(v.is_none());
340     });
341 }
342 
343 #[test]
tx_close_gets_none()344 fn tx_close_gets_none() {
345     let mut t1 = task::spawn(());
346 
347     let (_, mut rx) = mpsc::channel::<i32>(10);
348 
349     // Run on a task context
350     t1.enter(|cx, _| {
351         let v = assert_ready!(rx.poll_recv(cx));
352         assert!(v.is_none());
353     });
354 }
355 
356 #[test]
try_send_fail()357 fn try_send_fail() {
358     let mut t1 = task::spawn(());
359 
360     let (mut tx, mut rx) = mpsc::channel(1);
361 
362     tx.try_send("hello").unwrap();
363 
364     // This should fail
365     match assert_err!(tx.try_send("fail")) {
366         TrySendError::Full(..) => {}
367         _ => panic!(),
368     }
369 
370     let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
371     assert_eq!(val, Some("hello"));
372 
373     assert_ok!(tx.try_send("goodbye"));
374     drop(tx);
375 
376     let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
377     assert_eq!(val, Some("goodbye"));
378 
379     let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
380     assert!(val.is_none());
381 }
382 
383 #[test]
drop_tx_with_permit_releases_permit()384 fn drop_tx_with_permit_releases_permit() {
385     let mut t1 = task::spawn(());
386     let mut t2 = task::spawn(());
387 
388     // poll_ready reserves capacity, ensure that the capacity is released if tx
389     // is dropped w/o sending a value.
390     let (mut tx1, _rx) = mpsc::channel::<i32>(1);
391     let mut tx2 = tx1.clone();
392 
393     assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
394 
395     t2.enter(|cx, _| {
396         assert_pending!(tx2.poll_ready(cx));
397     });
398 
399     drop(tx1);
400 
401     assert!(t2.is_woken());
402 
403     assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx)));
404 }
405 
406 #[test]
dropping_rx_closes_channel()407 fn dropping_rx_closes_channel() {
408     let mut t1 = task::spawn(());
409 
410     let (mut tx, rx) = mpsc::channel(100);
411 
412     let msg = Arc::new(());
413     assert_ok!(tx.try_send(msg.clone()));
414 
415     drop(rx);
416     assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx)));
417 
418     assert_eq!(1, Arc::strong_count(&msg));
419 }
420 
421 #[test]
dropping_rx_closes_channel_for_try()422 fn dropping_rx_closes_channel_for_try() {
423     let (mut tx, rx) = mpsc::channel(100);
424 
425     let msg = Arc::new(());
426     tx.try_send(msg.clone()).unwrap();
427 
428     drop(rx);
429 
430     {
431         let err = assert_err!(tx.try_send(msg.clone()));
432         match err {
433             TrySendError::Closed(..) => {}
434             _ => panic!(),
435         }
436     }
437 
438     assert_eq!(1, Arc::strong_count(&msg));
439 }
440 
441 #[test]
unconsumed_messages_are_dropped()442 fn unconsumed_messages_are_dropped() {
443     let msg = Arc::new(());
444 
445     let (mut tx, rx) = mpsc::channel(100);
446 
447     tx.try_send(msg.clone()).unwrap();
448 
449     assert_eq!(2, Arc::strong_count(&msg));
450 
451     drop((tx, rx));
452 
453     assert_eq!(1, Arc::strong_count(&msg));
454 }
455 
456 #[test]
try_recv()457 fn try_recv() {
458     let (mut tx, mut rx) = mpsc::channel(1);
459     match rx.try_recv() {
460         Err(TryRecvError::Empty) => {}
461         _ => panic!(),
462     }
463     tx.try_send(42).unwrap();
464     match rx.try_recv() {
465         Ok(42) => {}
466         _ => panic!(),
467     }
468     drop(tx);
469     match rx.try_recv() {
470         Err(TryRecvError::Closed) => {}
471         _ => panic!(),
472     }
473 }
474 
475 #[test]
try_recv_unbounded()476 fn try_recv_unbounded() {
477     let (tx, mut rx) = mpsc::unbounded_channel();
478     match rx.try_recv() {
479         Err(TryRecvError::Empty) => {}
480         _ => panic!(),
481     }
482     tx.send(42).unwrap();
483     match rx.try_recv() {
484         Ok(42) => {}
485         _ => panic!(),
486     }
487     drop(tx);
488     match rx.try_recv() {
489         Err(TryRecvError::Closed) => {}
490         _ => panic!(),
491     }
492 }
493