1 use futures::future::poll_fn;
2 use tokio::sync::mpsc::channel;
3 use tokio_test::task::spawn;
4 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
5 use tokio_util::sync::PollSender;
6
7 #[tokio::test]
test_simple()8 async fn test_simple() {
9 let (send, mut recv) = channel(3);
10 let mut send = PollSender::new(send);
11
12 for i in 1..=3i32 {
13 send.start_send(i).unwrap();
14 assert_ready_ok!(spawn(poll_fn(|cx| send.poll_send_done(cx))).poll());
15 }
16
17 send.start_send(4).unwrap();
18 let mut fourth_send = spawn(poll_fn(|cx| send.poll_send_done(cx)));
19 assert_pending!(fourth_send.poll());
20 assert_eq!(recv.recv().await.unwrap(), 1);
21 assert!(fourth_send.is_woken());
22 assert_ready_ok!(fourth_send.poll());
23
24 drop(recv);
25
26 // Here, start_send is not guaranteed to fail, but if it doesn't the first
27 // call to poll_send_done should.
28 if send.start_send(5).is_ok() {
29 assert_ready_err!(spawn(poll_fn(|cx| send.poll_send_done(cx))).poll());
30 }
31 }
32
33 #[tokio::test]
test_abort()34 async fn test_abort() {
35 let (send, mut recv) = channel(3);
36 let mut send = PollSender::new(send);
37 let send2 = send.clone_inner().unwrap();
38
39 for i in 1..=3i32 {
40 send.start_send(i).unwrap();
41 assert_ready_ok!(spawn(poll_fn(|cx| send.poll_send_done(cx))).poll());
42 }
43
44 send.start_send(4).unwrap();
45 {
46 let mut fourth_send = spawn(poll_fn(|cx| send.poll_send_done(cx)));
47 assert_pending!(fourth_send.poll());
48 assert_eq!(recv.recv().await.unwrap(), 1);
49 assert!(fourth_send.is_woken());
50 }
51
52 let mut send2_send = spawn(send2.send(5));
53 assert_pending!(send2_send.poll());
54 send.abort_send();
55 assert!(send2_send.is_woken());
56 assert_ready_ok!(send2_send.poll());
57
58 assert_eq!(recv.recv().await.unwrap(), 2);
59 assert_eq!(recv.recv().await.unwrap(), 3);
60 assert_eq!(recv.recv().await.unwrap(), 5);
61 }
62
63 #[tokio::test]
close_sender_last()64 async fn close_sender_last() {
65 let (send, mut recv) = channel::<i32>(3);
66 let mut send = PollSender::new(send);
67
68 let mut recv_task = spawn(recv.recv());
69 assert_pending!(recv_task.poll());
70
71 send.close_this_sender();
72
73 assert!(recv_task.is_woken());
74 assert!(assert_ready!(recv_task.poll()).is_none());
75 }
76
77 #[tokio::test]
close_sender_not_last()78 async fn close_sender_not_last() {
79 let (send, mut recv) = channel::<i32>(3);
80 let send2 = send.clone();
81 let mut send = PollSender::new(send);
82
83 let mut recv_task = spawn(recv.recv());
84 assert_pending!(recv_task.poll());
85
86 send.close_this_sender();
87
88 assert!(!recv_task.is_woken());
89 assert_pending!(recv_task.poll());
90
91 drop(send2);
92
93 assert!(recv_task.is_woken());
94 assert!(assert_ready!(recv_task.poll()).is_none());
95 }
96