1 use futures::channel::oneshot::{self, Sender};
2 use futures::executor::block_on;
3 use futures::future::{poll_fn, FutureExt};
4 use futures::task::{Context, Poll};
5 use futures_test::task::panic_waker_ref;
6 use std::sync::mpsc;
7 use std::thread;
8 
/// Exercises `Sender::poll_canceled` from inside a single `poll_fn` call.
///
/// The sender must report "pending" for as long as the receiver exists and
/// "ready" once the receiver has been dropped; asking twice in a row must
/// give the same answer both times.
#[test]
fn smoke_poll() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    // Kept in an `Option` so the closure below can drop it part-way through.
    let mut receiver_slot = Some(receiver);

    block_on(poll_fn(|cx| {
        // Receiver still alive: no cancellation is reported, however often we ask.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_pending());
        }

        drop(receiver_slot.take());

        // Receiver gone: cancellation is reported, and keeps being reported.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_ready());
        }

        Poll::Ready(())
    }));
}
24 
/// A thread blocked on `Sender::cancellation` must finish once the receiver
/// is dropped by the test thread.
///
/// The test passes when `join` returns without the waiting thread panicking.
#[test]
fn cancel_notifies() {
    let (mut sender, receiver) = oneshot::channel::<u32>();

    let waiter = thread::spawn(move || {
        block_on(sender.cancellation());
    });

    drop(receiver);
    waiter.join().unwrap();
}
35 
/// Repeats the "drop the receiver, sender observes cancellation" handshake
/// 20000 times against a single worker thread.
///
/// For every round the test thread hands the worker a oneshot sender plus an
/// `mpsc` sender for the acknowledgement, drops the oneshot receiver, and
/// then waits for the worker to acknowledge that `cancellation()` completed.
#[test]
fn cancel_lots() {
    let (job_tx, job_rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();

    let worker = thread::spawn(move || {
        // `recv` fails once `job_tx` is dropped, which ends the worker.
        while let Ok((mut oneshot_tx, ack_tx)) = job_rx.recv() {
            block_on(oneshot_tx.cancellation());
            ack_tx.send(()).unwrap();
        }
    });

    for _ in 0..20000 {
        let (ack_tx, ack_rx) = mpsc::channel();
        let (oneshot_tx, oneshot_rx) = oneshot::channel::<u32>();

        job_tx.send((oneshot_tx, ack_tx)).unwrap();
        drop(oneshot_rx);
        ack_rx.recv().unwrap();
    }

    // Closing the job channel lets the worker loop terminate.
    drop(job_tx);
    worker.join().unwrap();
}
57 
/// Polls the sender once (expecting `Poll::Pending`) with the waker obtained
/// from `panic_waker_ref`, then drops the sender before the receiver.
///
/// NOTE(review): the waker presumably panics if it is ever woken, so the test
/// passing means dropping the receiver after the sender did not wake it.
#[test]
fn cancel_after_sender_drop_doesnt_notify() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    let mut cx = Context::from_waker(panic_waker_ref());

    let first_poll = sender.poll_canceled(&mut cx);
    assert_eq!(first_poll, Poll::Pending);

    // Order matters here: the sender goes away first, the receiver second.
    drop(sender);
    drop(receiver);
}
66 
/// After `Receiver::close`, polling the receiver must yield
/// `Poll::Ready(Err(_))` and the sender must see the cancellation right away.
#[test]
fn close() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    receiver.close();

    let check = poll_fn(|cx| {
        // Anything other than a ready error from the closed receiver is a failure.
        if !matches!(receiver.poll_unpin(cx), Poll::Ready(Err(_))) {
            panic!();
        }
        assert!(sender.poll_canceled(cx).is_ready());
        Poll::Ready(())
    });

    block_on(check);
}
80 
/// `Receiver::close` called on another thread must complete a
/// `Sender::cancellation` future that the test thread is blocked on.
#[test]
fn close_wakes() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    let (release_tx, release_rx) = mpsc::channel();

    let closer = thread::spawn(move || {
        receiver.close();
        // Park until the test thread signals, so `receiver` is not dropped
        // before the cancellation has been observed.
        release_rx.recv().unwrap();
    });

    block_on(sender.cancellation());
    release_tx.send(()).unwrap();
    closer.join().unwrap();
}
93 
/// `Sender::is_canceled` must be `false` while the receiver exists and `true`
/// after the receiver has been dropped.
#[test]
fn is_canceled() {
    let (sender, receiver) = oneshot::channel::<u32>();

    let canceled_before_drop = sender.is_canceled();
    assert!(!canceled_before_drop);

    drop(receiver);

    let canceled_after_drop = sender.is_canceled();
    assert!(canceled_after_drop);
}
101 
/// Runs `Sender::send` on a worker thread concurrently with `Receiver::close`
/// on the test thread, 20000 times.
///
/// The outcome of each individual send and receive is deliberately ignored;
/// the test only requires that every round finishes and the worker thread
/// exits without panicking.
#[test]
fn cancel_sends() {
    let (handoff_tx, handoff_rx) = mpsc::channel::<Sender<_>>();

    let producer = thread::spawn(move || {
        // Runs until `handoff_tx` is dropped by the test thread.
        handoff_rx.into_iter().for_each(|oneshot_tx| {
            let _ = oneshot_tx.send(42);
        });
    });

    for _ in 0..20000 {
        let (oneshot_tx, mut oneshot_rx) = oneshot::channel::<u32>();
        handoff_tx.send(oneshot_tx).unwrap();

        // Close while the worker may be sending, then drain whatever happened.
        oneshot_rx.close();
        let _ = block_on(oneshot_rx);
    }

    drop(handoff_tx);
    producer.join().unwrap();
}
122 
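// NOTE: the tests below are disabled. They appear to target an older API
// (`futures::sync::oneshot`, `Future::Item` / `Future::Error`,
// `local_executor::Core`) than the one exercised by the tests above.
//
// #[test]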
124 // fn spawn_sends_items() {
125 //     let core = local_executor::Core::new();
126 //     let future = ok::<_, ()>(1);
127 //     let rx = spawn(future, &core);
128 //     assert_eq!(core.run(rx).unwrap(), 1);
129 // }
130 //
131 // #[test]
132 // fn spawn_kill_dead_stream() {
133 //     use std::thread;
134 //     use std::time::Duration;
135 //     use futures::future::Either;
136 //     use futures::sync::oneshot;
137 //
138 //     // a future which never returns anything (forever accepting incoming
139 //     // connections), but dropping it leads to observable side effects
140 //     // (like closing listening sockets, releasing limited resources,
141 //     // ...)
142 //     #[derive(Debug)]
143 //     struct Dead {
144 //         // when dropped you should get Err(oneshot::Canceled) on the
145 //         // receiving end
146 //         done: oneshot::Sender<()>,
147 //     }
148 //     impl Future for Dead {
149 //         type Item = ();
150 //         type Error = ();
151 //
152 //         fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
153 //             Ok(Poll::Pending)
154 //         }
155 //     }
156 //
157 //     // need to implement a timeout for the test, as it would hang
158 //     // forever right now
159 //     let (timeout_tx, timeout_rx) = oneshot::channel();
160 //     thread::spawn(move || {
161 //         thread::sleep(Duration::from_millis(1000));
162 //         let _ = timeout_tx.send(());
163 //     });
164 //
165 //     let core = local_executor::Core::new();
166 //     let (done_tx, done_rx) = oneshot::channel();
167 //     let future = Dead{done: done_tx};
168 //     let rx = spawn(future, &core);
169 //     let res = core.run(
170 //         Ok::<_, ()>(())
171 //         .into_future()
172 //         .then(move |_| {
173 //             // now drop the spawned future: maybe some timeout exceeded,
174 //             // or some connection on this end was closed by the remote
175 //             // end.
176 //             drop(rx);
177 //             // and wait for the spawned future to release its resources
178 //             done_rx
179 //         })
180 //         .select2(timeout_rx)
181 //     );
182 //     match res {
183 //         Err(Either::A((oneshot::Canceled, _))) => (),
184 //         Ok(Either::B(((), _))) => {
185 //             panic!("dead future wasn't canceled (timeout)");
186 //         },
187 //         _ => {
188 //             panic!("dead future wasn't canceled (unexpected result)");
189 //         },
190 //     }
191 // }
192 //
193 // #[test]
194 // fn spawn_dont_kill_forgot_dead_stream() {
195 //     use std::thread;
196 //     use std::time::Duration;
197 //     use futures::future::Either;
198 //     use futures::sync::oneshot;
199 //
200 //     // a future which never returns anything (forever accepting incoming
201 //     // connections), but dropping it leads to observable side effects
202 //     // (like closing listening sockets, releasing limited resources,
203 //     // ...)
204 //     #[derive(Debug)]
205 //     struct Dead {
206 //         // when dropped you should get Err(oneshot::Canceled) on the
207 //         // receiving end
208 //         done: oneshot::Sender<()>,
209 //     }
210 //     impl Future for Dead {
211 //         type Item = ();
212 //         type Error = ();
213 //
214 //         fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
215 //             Ok(Poll::Pending)
216 //         }
217 //     }
218 //
219 //     // need to implement a timeout for the test, as it would hang
220 //     // forever right now
221 //     let (timeout_tx, timeout_rx) = oneshot::channel();
222 //     thread::spawn(move || {
223 //         thread::sleep(Duration::from_millis(1000));
224 //         let _ = timeout_tx.send(());
225 //     });
226 //
227 //     let core = local_executor::Core::new();
228 //     let (done_tx, done_rx) = oneshot::channel();
229 //     let future = Dead{done: done_tx};
230 //     let rx = spawn(future, &core);
231 //     let res = core.run(
232 //         Ok::<_, ()>(())
233 //         .into_future()
234 //         .then(move |_| {
235 //             // forget the spawned future: should keep running, i.e. hit
236 //             // the timeout below.
237 //             rx.forget();
238 //             // and wait for the spawned future to release its resources
239 //             done_rx
240 //         })
241 //         .select2(timeout_rx)
242 //     );
243 //     match res {
244 //         Err(Either::A((oneshot::Canceled, _))) => {
245 //             panic!("forgotten dead future was canceled");
246 //         },
247 //         Ok(Either::B(((), _))) => (), // reached timeout
248 //         _ => {
249 //             panic!("forgotten dead future was canceled (unexpected result)");
250 //         },
251 //     }
252 // }
253