1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use tokio::sync::oneshot;
5 use tokio_test::*;
6
7 use std::future::Future;
8 use std::pin::Pin;
9
10 trait AssertSend: Send {}
11 impl AssertSend for oneshot::Sender<i32> {}
12 impl AssertSend for oneshot::Receiver<i32> {}
13
14 #[test]
send_recv()15 fn send_recv() {
16 let (tx, rx) = oneshot::channel();
17 let mut rx = task::spawn(rx);
18
19 assert_pending!(rx.poll());
20
21 assert_ok!(tx.send(1));
22
23 assert!(rx.is_woken());
24
25 let val = assert_ready_ok!(rx.poll());
26 assert_eq!(val, 1);
27 }
28
29 #[tokio::test]
async_send_recv()30 async fn async_send_recv() {
31 let (tx, rx) = oneshot::channel();
32
33 assert_ok!(tx.send(1));
34 assert_eq!(1, assert_ok!(rx.await));
35 }
36
37 #[test]
close_tx()38 fn close_tx() {
39 let (tx, rx) = oneshot::channel::<i32>();
40 let mut rx = task::spawn(rx);
41
42 assert_pending!(rx.poll());
43
44 drop(tx);
45
46 assert!(rx.is_woken());
47 assert_ready_err!(rx.poll());
48 }
49
50 #[test]
close_rx()51 fn close_rx() {
52 // First, without checking poll_closed()
53 //
54 let (tx, _) = oneshot::channel();
55
56 assert_err!(tx.send(1));
57
58 // Second, via poll_closed();
59
60 let (tx, rx) = oneshot::channel();
61 let mut tx = task::spawn(tx);
62
63 assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
64
65 drop(rx);
66
67 assert!(tx.is_woken());
68 assert!(tx.is_closed());
69 assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
70
71 assert_err!(tx.into_inner().send(1));
72 }
73
74 #[tokio::test]
async_rx_closed()75 async fn async_rx_closed() {
76 let (mut tx, rx) = oneshot::channel::<()>();
77
78 tokio::spawn(async move {
79 drop(rx);
80 });
81
82 tx.closed().await;
83 }
84
85 #[test]
explicit_close_poll()86 fn explicit_close_poll() {
87 // First, with message sent
88 let (tx, rx) = oneshot::channel();
89 let mut rx = task::spawn(rx);
90
91 assert_ok!(tx.send(1));
92
93 rx.close();
94
95 let value = assert_ready_ok!(rx.poll());
96 assert_eq!(value, 1);
97
98 // Second, without the message sent
99 let (tx, rx) = oneshot::channel::<i32>();
100 let mut tx = task::spawn(tx);
101 let mut rx = task::spawn(rx);
102
103 assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
104
105 rx.close();
106
107 assert!(tx.is_woken());
108 assert!(tx.is_closed());
109 assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
110
111 assert_err!(tx.into_inner().send(1));
112 assert_ready_err!(rx.poll());
113
114 // Again, but without sending the value this time
115 let (tx, rx) = oneshot::channel::<i32>();
116 let mut tx = task::spawn(tx);
117 let mut rx = task::spawn(rx);
118
119 assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
120
121 rx.close();
122
123 assert!(tx.is_woken());
124 assert!(tx.is_closed());
125 assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
126
127 assert_ready_err!(rx.poll());
128 }
129
130 #[test]
explicit_close_try_recv()131 fn explicit_close_try_recv() {
132 // First, with message sent
133 let (tx, mut rx) = oneshot::channel();
134
135 assert_ok!(tx.send(1));
136
137 rx.close();
138
139 let val = assert_ok!(rx.try_recv());
140 assert_eq!(1, val);
141
142 // Second, without the message sent
143 let (tx, mut rx) = oneshot::channel::<i32>();
144 let mut tx = task::spawn(tx);
145
146 assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
147
148 rx.close();
149
150 assert!(tx.is_woken());
151 assert!(tx.is_closed());
152 assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
153
154 assert_err!(rx.try_recv());
155 }
156
157 #[test]
158 #[should_panic]
close_try_recv_poll()159 fn close_try_recv_poll() {
160 let (_tx, rx) = oneshot::channel::<i32>();
161 let mut rx = task::spawn(rx);
162
163 rx.close();
164
165 assert_err!(rx.try_recv());
166
167 let _ = rx.poll();
168 }
169
170 #[test]
drops_tasks()171 fn drops_tasks() {
172 let (mut tx, mut rx) = oneshot::channel::<i32>();
173 let mut tx_task = task::spawn(());
174 let mut rx_task = task::spawn(());
175
176 assert_pending!(tx_task.enter(|cx, _| tx.poll_closed(cx)));
177 assert_pending!(rx_task.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
178
179 drop(tx);
180 drop(rx);
181
182 assert_eq!(1, tx_task.waker_ref_count());
183 assert_eq!(1, rx_task.waker_ref_count());
184 }
185
186 #[test]
receiver_changes_task()187 fn receiver_changes_task() {
188 let (tx, mut rx) = oneshot::channel();
189
190 let mut task1 = task::spawn(());
191 let mut task2 = task::spawn(());
192
193 assert_pending!(task1.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
194
195 assert_eq!(2, task1.waker_ref_count());
196 assert_eq!(1, task2.waker_ref_count());
197
198 assert_pending!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
199
200 assert_eq!(1, task1.waker_ref_count());
201 assert_eq!(2, task2.waker_ref_count());
202
203 assert_ok!(tx.send(1));
204
205 assert!(!task1.is_woken());
206 assert!(task2.is_woken());
207
208 assert_ready_ok!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
209 }
210
211 #[test]
sender_changes_task()212 fn sender_changes_task() {
213 let (mut tx, rx) = oneshot::channel::<i32>();
214
215 let mut task1 = task::spawn(());
216 let mut task2 = task::spawn(());
217
218 assert_pending!(task1.enter(|cx, _| tx.poll_closed(cx)));
219
220 assert_eq!(2, task1.waker_ref_count());
221 assert_eq!(1, task2.waker_ref_count());
222
223 assert_pending!(task2.enter(|cx, _| tx.poll_closed(cx)));
224
225 assert_eq!(1, task1.waker_ref_count());
226 assert_eq!(2, task2.waker_ref_count());
227
228 drop(rx);
229
230 assert!(!task1.is_woken());
231 assert!(task2.is_woken());
232
233 assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
234 }
235