1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use tokio::sync::oneshot;
5 use tokio::sync::oneshot::error::TryRecvError;
6 use tokio_test::*;
7 
8 use std::future::Future;
9 use std::pin::Pin;
10 use std::task::{Context, Poll};
11 
12 trait AssertSend: Send {}
13 impl AssertSend for oneshot::Sender<i32> {}
14 impl AssertSend for oneshot::Receiver<i32> {}
15 
16 trait SenderExt {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>17     fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>;
18 }
19 impl<T> SenderExt for oneshot::Sender<T> {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>20     fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
21         tokio::pin! {
22             let fut = self.closed();
23         }
24         fut.poll(cx)
25     }
26 }
27 
/// Sending after the receiver has been polled wakes the receiving task, and the
/// next poll yields the value.
#[test]
fn send_recv() {
    let (sender, receiver) = oneshot::channel();
    let mut recv_task = task::spawn(receiver);

    // Nothing has been sent yet, so the first poll is pending.
    assert_pending!(recv_task.poll());

    assert_ok!(sender.send(1));

    // The send must have notified the pending receiver task.
    assert!(recv_task.is_woken());

    let received = assert_ready_ok!(recv_task.poll());
    assert_eq!(received, 1);
}
42 
43 #[tokio::test]
async_send_recv()44 async fn async_send_recv() {
45     let (tx, rx) = oneshot::channel();
46 
47     assert_ok!(tx.send(1));
48     assert_eq!(1, assert_ok!(rx.await));
49 }
50 
/// Dropping the sender without sending wakes the receiver, which then resolves
/// to an error.
#[test]
fn close_tx() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut recv_task = task::spawn(receiver);

    assert_pending!(recv_task.poll());

    // Drop the sending half without ever sending a value.
    drop(sender);

    assert!(recv_task.is_woken());
    assert_ready_err!(recv_task.poll());
}
63 
/// Once the receiver is gone, `send` fails; a sender waiting on `poll_closed`
/// is woken and sees the channel as closed.
#[test]
fn close_rx() {
    // First, without checking poll_closed(): the receiver is dropped before
    // the send is attempted.
    let (orphan_tx, orphan_rx) = oneshot::channel();
    drop(orphan_rx);

    assert_err!(orphan_tx.send(1));

    // Second, via poll_closed().
    let (sender, receiver) = oneshot::channel();
    let mut send_task = task::spawn(sender);

    assert_pending!(send_task.enter(|cx, mut sender| sender.poll_closed(cx)));

    drop(receiver);

    assert!(send_task.is_woken());
    assert!(send_task.is_closed());
    assert_ready!(send_task.enter(|cx, mut sender| sender.poll_closed(cx)));

    // Sending on the recovered sender still fails.
    let sender = send_task.into_inner();
    assert_err!(sender.send(1));
}
87 
88 #[tokio::test]
async_rx_closed()89 async fn async_rx_closed() {
90     let (mut tx, rx) = oneshot::channel::<()>();
91 
92     tokio::spawn(async move {
93         drop(rx);
94     });
95 
96     tx.closed().await;
97 }
98 
/// Exercises `Receiver::close()` followed by polling the receiver, with and
/// without a value already in the channel.
#[test]
fn explicit_close_poll() {
    // First, with message sent: the value sent before `close()` is still
    // delivered by the poll.
    let (sent_tx, sent_rx) = oneshot::channel();
    let mut sent_rx = task::spawn(sent_rx);

    assert_ok!(sent_tx.send(1));

    sent_rx.close();

    let delivered = assert_ready_ok!(sent_rx.poll());
    assert_eq!(delivered, 1);

    // Second, without the message sent: a send attempted after the close is
    // rejected.
    let (late_tx, late_rx) = oneshot::channel::<i32>();
    let mut late_tx = task::spawn(late_tx);
    let mut late_rx = task::spawn(late_rx);

    assert_pending!(late_tx.enter(|cx, mut sender| sender.poll_closed(cx)));

    late_rx.close();

    assert!(late_tx.is_woken());
    assert!(late_tx.is_closed());
    assert_ready!(late_tx.enter(|cx, mut sender| sender.poll_closed(cx)));

    assert_err!(late_tx.into_inner().send(1));
    assert_ready_err!(late_rx.poll());

    // Again, but without sending the value this time.
    let (idle_tx, idle_rx) = oneshot::channel::<i32>();
    let mut idle_tx = task::spawn(idle_tx);
    let mut idle_rx = task::spawn(idle_rx);

    assert_pending!(idle_tx.enter(|cx, mut sender| sender.poll_closed(cx)));

    idle_rx.close();

    assert!(idle_tx.is_woken());
    assert!(idle_tx.is_closed());
    assert_ready!(idle_tx.enter(|cx, mut sender| sender.poll_closed(cx)));

    assert_ready_err!(idle_rx.poll());
}
143 
/// Exercises `Receiver::close()` followed by `try_recv()`, with and without a
/// value already in the channel.
#[test]
fn explicit_close_try_recv() {
    // First, with message sent: the value is still retrievable after `close()`.
    let (sent_tx, mut sent_rx) = oneshot::channel();

    assert_ok!(sent_tx.send(1));

    sent_rx.close();

    let delivered = assert_ok!(sent_rx.try_recv());
    assert_eq!(1, delivered);

    // Second, without the message sent: the sender observes the close and
    // `try_recv()` reports an error.
    let (idle_tx, mut idle_rx) = oneshot::channel::<i32>();
    let mut idle_tx = task::spawn(idle_tx);

    assert_pending!(idle_tx.enter(|cx, mut sender| sender.poll_closed(cx)));

    idle_rx.close();

    assert!(idle_tx.is_woken());
    assert!(idle_tx.is_closed());
    assert_ready!(idle_tx.enter(|cx, mut sender| sender.poll_closed(cx)));

    assert_err!(idle_rx.try_recv());
}
170 
/// Closes the receiver, observes the error through `try_recv()`, then polls.
/// The test as a whole is expected to panic; presumably the final poll is the
/// panicking step (TODO confirm against the oneshot implementation).
#[test]
#[should_panic]
fn close_try_recv_poll() {
    // The sender is bound (not `_`) so it stays alive for the whole test.
    let (_sender, receiver) = oneshot::channel::<i32>();
    let mut recv_task = task::spawn(receiver);

    recv_task.close();

    assert_err!(recv_task.try_recv());

    let _ = recv_task.poll();
}
183 
/// Calling `close()` after the value has already been taken must not panic.
#[test]
fn close_after_recv() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let received = receiver.try_recv().unwrap();
    assert_eq!(17, received);

    receiver.close();
}
193 
/// After the value has been received, a second `try_recv()` reports
/// `TryRecvError::Closed`, and a later `close()` must not panic.
#[test]
fn try_recv_after_completion() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let first = receiver.try_recv().unwrap();
    assert_eq!(17, first);

    let second = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), second);

    receiver.close();
}
204 
/// Dropping both halves of the channel returns each task's waker reference
/// count to 1 after the wakers were used in a pending poll.
#[test]
fn drops_tasks() {
    let (mut sender, mut receiver) = oneshot::channel::<i32>();
    let mut send_task = task::spawn(());
    let mut recv_task = task::spawn(());

    // Poll each half once from its own mock task; both polls are pending.
    assert_pending!(send_task.enter(|cx, _| sender.poll_closed(cx)));
    assert_pending!(recv_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    drop(sender);
    drop(receiver);

    assert_eq!(1, send_task.waker_ref_count());
    assert_eq!(1, recv_task.waker_ref_count());
}
220 
/// Polling the receiver from a second task moves the extra waker reference
/// from the first task to the second, and only the second task is woken by a
/// subsequent send.
#[test]
fn receiver_changes_task() {
    let (sender, mut receiver) = oneshot::channel();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    assert_pending!(first_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    // After the first poll the extra reference belongs to the first task.
    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    assert_pending!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    // After polling from the second task the counts have swapped.
    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    assert_ok!(sender.send(1));

    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready_ok!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));
}
245 
/// Polling `poll_closed` from a second task moves the extra waker reference
/// from the first task to the second, and only the second task is woken when
/// the receiver is dropped.
#[test]
fn sender_changes_task() {
    let (mut sender, receiver) = oneshot::channel::<i32>();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    assert_pending!(first_task.enter(|cx, _| sender.poll_closed(cx)));

    // After the first poll the extra reference belongs to the first task.
    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    assert_pending!(second_task.enter(|cx, _| sender.poll_closed(cx)));

    // After polling from the second task the counts have swapped.
    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    drop(receiver);

    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready!(second_task.enter(|cx, _| sender.poll_closed(cx)));
}
270