1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use tokio::sync::oneshot;
5 use tokio_test::*;
6 
7 use std::future::Future;
8 use std::pin::Pin;
9 
10 trait AssertSend: Send {}
11 impl AssertSend for oneshot::Sender<i32> {}
12 impl AssertSend for oneshot::Receiver<i32> {}
13 
// Sending a value after the receiver has been polled wakes the receiver's
// task, and the next poll yields that value.
#[test]
fn send_recv() {
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    // Nothing has been sent yet, so the first poll must be pending.
    assert_pending!(receiver.poll());

    // The send has to succeed and notify the receiving task.
    assert_ok!(sender.send(1));
    assert!(receiver.is_woken());

    // The value is now ready to be taken.
    assert_eq!(assert_ready_ok!(receiver.poll()), 1);
}
28 
29 #[tokio::test]
async_send_recv()30 async fn async_send_recv() {
31     let (tx, rx) = oneshot::channel();
32 
33     assert_ok!(tx.send(1));
34     assert_eq!(1, assert_ok!(rx.await));
35 }
36 
// Dropping the sender without sending wakes a pending receiver, which then
// resolves to an error.
#[test]
fn close_tx() {
    let (sender, receiver): (oneshot::Sender<i32>, oneshot::Receiver<i32>) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    // Register interest before the sender goes away.
    assert_pending!(receiver.poll());

    drop(sender);

    assert!(receiver.is_woken());
    assert_ready_err!(receiver.poll());
}
49 
// Dropping the receiver makes sends fail, both when the sender never looked
// at the closed state and when it was waiting on `poll_closed`.
#[test]
fn close_rx() {
    // Case 1: the receiver is dropped right away and `poll_closed` is never
    // consulted; the send must be rejected.
    let (sender, receiver) = oneshot::channel();
    drop(receiver);

    assert_err!(sender.send(1));

    // Case 2: the sender is parked on `poll_closed` when the receiver is
    // dropped.
    let (sender, receiver) = oneshot::channel();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    drop(receiver);

    // The drop must notify the sender's task and be visible through both
    // `is_closed` and `poll_closed`.
    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    // Sending into the closed channel fails.
    assert_err!(sender.into_inner().send(1));
}
73 
74 #[tokio::test]
async_rx_closed()75 async fn async_rx_closed() {
76     let (mut tx, rx) = oneshot::channel::<()>();
77 
78     tokio::spawn(async move {
79         drop(rx);
80     });
81 
82     tx.closed().await;
83 }
84 
// Exercises `Receiver::close` followed by polling the receiver, in three
// scenarios.
#[test]
fn explicit_close_poll() {
    // Scenario 1: the value is sent before the receiver is closed; polling
    // after the close still yields it.
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    assert_ok!(sender.send(1));

    receiver.close();

    let received = assert_ready_ok!(receiver.poll());
    assert_eq!(received, 1);

    // Scenario 2: nothing is sent before the close; a send attempted
    // afterwards is rejected and the receiver resolves to an error.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    receiver.close();

    // Closing must notify the sender's task and be observable on the sender.
    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
    assert_ready_err!(receiver.poll());

    // Scenario 3: same as scenario 2, except that no send is attempted at
    // all and the sender stays alive while the receiver is polled.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    assert_ready_err!(receiver.poll());
}
129 
// Exercises `Receiver::close` followed by `try_recv`, with and without a
// value having been sent beforehand.
#[test]
fn explicit_close_try_recv() {
    // Scenario 1: the value is sent before the close; `try_recv` still
    // returns it.
    let (sender, mut receiver) = oneshot::channel();

    assert_ok!(sender.send(1));

    receiver.close();

    assert_eq!(1, assert_ok!(receiver.try_recv()));

    // Scenario 2: nothing is sent; after the close the sender observes the
    // closed state and `try_recv` reports an error.
    let (sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    assert_err!(receiver.try_recv());
}
156 
// The test is expected to panic. The final `poll`, issued after `try_recv`
// has already reported an error on the closed receiver, is presumably the
// step meant to trigger it.
#[test]
#[should_panic]
fn close_try_recv_poll() {
    // The sender is kept alive (but unused) for the whole test.
    let (_sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    receiver.close();

    assert_err!(receiver.try_recv());

    let _ = receiver.poll();
}
169 
// After both halves have been polled from separate tasks and then dropped,
// each task's waker reference count is asserted to be 1. Presumably this
// means the channel released the wakers it had stored.
#[test]
fn drops_tasks() {
    let (mut sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender_task = task::spawn(());
    let mut receiver_task = task::spawn(());

    // Poll each half from its own task context; both must be pending.
    assert_pending!(sender_task.enter(|cx, _| sender.poll_closed(cx)));
    assert_pending!(receiver_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    drop(sender);
    drop(receiver);

    assert_eq!(1, sender_task.waker_ref_count());
    assert_eq!(1, receiver_task.waker_ref_count());
}
185 
// Polling the receiver from a second task moves the wake-up registration to
// that task: only the most recent poller is woken by the send.
#[test]
fn receiver_changes_task() {
    let (sender, mut receiver) = oneshot::channel();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // First poll happens in the context of `first_task`.
    assert_pending!(first_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    // `first_task`'s waker gained a reference; `second_task`'s is untouched.
    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Polling again from `second_task` swaps the counts around.
    assert_pending!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    assert_ok!(sender.send(1));

    // Only the task that polled last is notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready_ok!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));
}
210 
// Calling `poll_closed` from a second task moves the wake-up registration to
// that task: only the most recent poller is woken when the receiver drops.
#[test]
fn sender_changes_task() {
    let (mut sender, receiver) = oneshot::channel::<i32>();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // First `poll_closed` happens in the context of `first_task`.
    assert_pending!(first_task.enter(|cx, _| sender.poll_closed(cx)));

    // `first_task`'s waker gained a reference; `second_task`'s is untouched.
    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Polling again from `second_task` swaps the counts around.
    assert_pending!(second_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    drop(receiver);

    // Only the task that polled last is notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready!(second_task.enter(|cx, _| sender.poll_closed(cx)));
}
235