1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use tokio::sync::oneshot;
5 use tokio::sync::oneshot::error::TryRecvError;
6 use tokio_test::*;
7
8 use std::future::Future;
9 use std::pin::Pin;
10 use std::task::{Context, Poll};
11
12 trait AssertSend: Send {}
13 impl AssertSend for oneshot::Sender<i32> {}
14 impl AssertSend for oneshot::Receiver<i32> {}
15
16 trait SenderExt {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>17 fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>;
18 }
19 impl<T> SenderExt for oneshot::Sender<T> {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>20 fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
21 tokio::pin! {
22 let fut = self.closed();
23 }
24 fut.poll(cx)
25 }
26 }
27
/// A value sent after the receiver has been polled wakes the receiver's task
/// and is returned by the next poll.
#[test]
fn send_recv() {
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    // Nothing has been sent yet, so the first poll cannot complete.
    assert_pending!(receiver.poll());

    assert_ok!(sender.send(1));

    // The send must have notified the task that polled the receiver.
    assert!(receiver.is_woken());

    let received = assert_ready_ok!(receiver.poll());
    assert_eq!(received, 1);
}
42
43 #[tokio::test]
async_send_recv()44 async fn async_send_recv() {
45 let (tx, rx) = oneshot::channel();
46
47 assert_ok!(tx.send(1));
48 assert_eq!(1, assert_ok!(rx.await));
49 }
50
/// Dropping the sender without sending wakes a pending receiver, whose next
/// poll resolves to an error.
#[test]
fn close_tx() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    // Register the receiver's task before the sender goes away.
    assert_pending!(receiver.poll());

    drop(sender);

    assert!(receiver.is_woken());
    assert_ready_err!(receiver.poll());
}
63
/// Dropping the receiver makes `send` fail, both when the sender never looked
/// at the closed state and when it was waiting on `poll_closed()`.
#[test]
fn close_rx() {
    // Case 1: the receiver is discarded right away (bound to `_`) and the
    // sender calls `send` without ever checking `poll_closed()`.
    let (sender, _) = oneshot::channel();

    assert_err!(sender.send(1));

    // Case 2: the sender is waiting on `poll_closed()` when the receiver is
    // dropped.
    let (sender, receiver) = oneshot::channel();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut half| half.poll_closed(cx)));

    drop(receiver);

    // The drop must wake the sender's task and be observable both through
    // `is_closed()` and through a second `poll_closed()`.
    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut half| half.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
}
87
88 #[tokio::test]
async_rx_closed()89 async fn async_rx_closed() {
90 let (mut tx, rx) = oneshot::channel::<()>();
91
92 tokio::spawn(async move {
93 drop(rx);
94 });
95
96 tx.closed().await;
97 }
98
/// Exercises `Receiver::close()` followed by polling the receiver, with and
/// without a value already in the channel.
#[test]
fn explicit_close_poll() {
    // Scenario 1: the value was sent before the receiver closed, so polling
    // the closed receiver still yields it.
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    assert_ok!(sender.send(1));

    receiver.close();

    let received = assert_ready_ok!(receiver.poll());
    assert_eq!(received, 1);

    // Scenario 2: nothing was sent before the close; a later `send` is
    // rejected and the receiver resolves to an error.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut half| half.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut half| half.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
    assert_ready_err!(receiver.poll());

    // Scenario 3: same as scenario 2, except that the sender never even
    // attempts to send after the close.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut half| half.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut half| half.poll_closed(cx)));

    assert_ready_err!(receiver.poll());
}
143
/// Exercises `Receiver::close()` followed by `try_recv()`, with and without a
/// value already in the channel.
#[test]
fn explicit_close_try_recv() {
    // Scenario 1: the value was sent before the close, so `try_recv` still
    // returns it.
    let (sender, mut receiver) = oneshot::channel();

    assert_ok!(sender.send(1));

    receiver.close();

    let received = assert_ok!(receiver.try_recv());
    assert_eq!(1, received);

    // Scenario 2: nothing was sent; the close wakes the waiting sender and
    // `try_recv` reports an error.
    let (sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut half| half.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut half| half.poll_closed(cx)));

    assert_err!(receiver.try_recv());
}
170
/// Polling a receiver after it was closed and then drained with `try_recv()`
/// must panic.
///
/// The expected panic message is pinned so that the test only passes when the
/// final `poll()` is what panics. With a bare `#[should_panic]`, a failing
/// `assert_err!` earlier in the body would also count as success.
#[test]
#[should_panic(expected = "called after complete")]
fn close_try_recv_poll() {
    // `_tx` is a named binding, so the sender stays alive until the end of
    // the test; the closed state comes from `rx.close()` alone.
    let (_tx, rx) = oneshot::channel::<i32>();
    let mut rx = task::spawn(rx);

    rx.close();

    // No value was ever sent, so `try_recv` on the closed channel fails.
    assert_err!(rx.try_recv());

    // This poll is the operation that is expected to panic.
    let _ = rx.poll();
}
183
/// Calling `close()` on a receiver whose value was already taken with
/// `try_recv()` must not panic.
#[test]
fn close_after_recv() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let received = receiver.try_recv().unwrap();
    assert_eq!(17, received);

    // The test passes as long as this call returns normally.
    receiver.close();
}
193
/// After the value has been taken, a second `try_recv()` reports
/// `TryRecvError::Closed`, and closing the receiver afterwards is harmless.
#[test]
fn try_recv_after_completion() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    // First call takes the value out of the channel.
    let first = receiver.try_recv().unwrap();
    assert_eq!(17, first);

    // Second call finds nothing left to receive.
    let second = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), second);

    receiver.close();
}
204
/// Dropping both channel halves releases the wakers they picked up while
/// being polled.
#[test]
fn drops_tasks() {
    let (mut sender, mut receiver) = oneshot::channel::<i32>();

    // Two mock tasks that only provide a context (and a waker) to poll with.
    let mut sender_task = task::spawn(());
    let mut receiver_task = task::spawn(());

    // Poll each half from its own task while the channel is still open.
    assert_pending!(sender_task.enter(|cx, _| sender.poll_closed(cx)));
    assert_pending!(receiver_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    drop(sender);
    drop(receiver);

    // With the channel gone, each task's waker count is back to 1.
    assert_eq!(1, sender_task.waker_ref_count());
    assert_eq!(1, receiver_task.waker_ref_count());
}
220
/// When the receiver is polled from a second task, only that most recent task
/// is woken by a send.
#[test]
fn receiver_changes_task() {
    let (sender, mut receiver) = oneshot::channel();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker count rises to 2, the second
    // task's count is untouched.
    assert_pending!(first_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    assert_ok!(sender.send(1));

    // Only the task that polled last is notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready_ok!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));
}
245
/// When the sender's `poll_closed` is called from a second task, only that
/// most recent task is woken when the receiver is dropped.
#[test]
fn sender_changes_task() {
    let (mut sender, receiver) = oneshot::channel::<i32>();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker count rises to 2, the second
    // task's count is untouched.
    assert_pending!(first_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    drop(receiver);

    // Only the task that polled last is notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready!(second_task.enter(|cx, _| sender.poll_closed(cx)));
}
270