1 use futures::future::poll_fn;
2 use tokio::sync::mpsc::channel;
3 use tokio_test::task::spawn;
4 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
5 use tokio_util::sync::PollSender;
6 
7 #[tokio::test]
test_simple()8 async fn test_simple() {
9     let (send, mut recv) = channel(3);
10     let mut send = PollSender::new(send);
11 
12     for i in 1..=3i32 {
13         send.start_send(i).unwrap();
14         assert_ready_ok!(spawn(poll_fn(|cx| send.poll_send_done(cx))).poll());
15     }
16 
17     send.start_send(4).unwrap();
18     let mut fourth_send = spawn(poll_fn(|cx| send.poll_send_done(cx)));
19     assert_pending!(fourth_send.poll());
20     assert_eq!(recv.recv().await.unwrap(), 1);
21     assert!(fourth_send.is_woken());
22     assert_ready_ok!(fourth_send.poll());
23 
24     drop(recv);
25 
26     // Here, start_send is not guaranteed to fail, but if it doesn't the first
27     // call to poll_send_done should.
28     if send.start_send(5).is_ok() {
29         assert_ready_err!(spawn(poll_fn(|cx| send.poll_send_done(cx))).poll());
30     }
31 }
32 
33 #[tokio::test]
test_abort()34 async fn test_abort() {
35     let (send, mut recv) = channel(3);
36     let mut send = PollSender::new(send);
37     let send2 = send.clone_inner().unwrap();
38 
39     for i in 1..=3i32 {
40         send.start_send(i).unwrap();
41         assert_ready_ok!(spawn(poll_fn(|cx| send.poll_send_done(cx))).poll());
42     }
43 
44     send.start_send(4).unwrap();
45     {
46         let mut fourth_send = spawn(poll_fn(|cx| send.poll_send_done(cx)));
47         assert_pending!(fourth_send.poll());
48         assert_eq!(recv.recv().await.unwrap(), 1);
49         assert!(fourth_send.is_woken());
50     }
51 
52     let mut send2_send = spawn(send2.send(5));
53     assert_pending!(send2_send.poll());
54     send.abort_send();
55     assert!(send2_send.is_woken());
56     assert_ready_ok!(send2_send.poll());
57 
58     assert_eq!(recv.recv().await.unwrap(), 2);
59     assert_eq!(recv.recv().await.unwrap(), 3);
60     assert_eq!(recv.recv().await.unwrap(), 5);
61 }
62 
63 #[tokio::test]
close_sender_last()64 async fn close_sender_last() {
65     let (send, mut recv) = channel::<i32>(3);
66     let mut send = PollSender::new(send);
67 
68     let mut recv_task = spawn(recv.recv());
69     assert_pending!(recv_task.poll());
70 
71     send.close_this_sender();
72 
73     assert!(recv_task.is_woken());
74     assert!(assert_ready!(recv_task.poll()).is_none());
75 }
76 
77 #[tokio::test]
close_sender_not_last()78 async fn close_sender_not_last() {
79     let (send, mut recv) = channel::<i32>(3);
80     let send2 = send.clone();
81     let mut send = PollSender::new(send);
82 
83     let mut recv_task = spawn(recv.recv());
84     assert_pending!(recv_task.poll());
85 
86     send.close_this_sender();
87 
88     assert!(!recv_task.is_woken());
89     assert_pending!(recv_task.poll());
90 
91     drop(send2);
92 
93     assert!(recv_task.is_woken());
94     assert!(assert_ready!(recv_task.poll()).is_none());
95 }
96