1 extern crate futures;
2 
3 use std::sync::mpsc;
4 use std::thread;
5 
6 use futures::prelude::*;
7 use futures::future::{lazy, ok};
8 use futures::sync::oneshot::*;
9 
10 mod support;
11 use support::*;
12 
/// `poll_cancel` reports not-ready while the receiver is alive and ready
/// once the receiver has been dropped; polling repeatedly in either state
/// yields the same answer.
#[test]
fn smoke_poll() {
    let (mut sender, receiver) = channel::<u32>();

    // `poll_cancel` is called from inside a spawned task, so the body is
    // wrapped in a lazy future and driven by the executor below.
    let body = lazy(|| {
        // Receiver still alive: cancellation has not happened yet.
        for _ in 0..2 {
            assert!(sender.poll_cancel().unwrap().is_not_ready());
        }

        drop(receiver);

        // Receiver gone: cancellation is observed, and stays observed.
        for _ in 0..2 {
            assert!(sender.poll_cancel().unwrap().is_ready());
        }

        ok::<(), ()>(())
    });

    let mut task = futures::executor::spawn(body);
    let outcome = task.poll_future_notify(&notify_noop(), 0);
    assert!(outcome.unwrap().is_ready());
}
26 
/// Dropping the receiver must wake a task that is parked in `poll_cancel`.
#[test]
fn cancel_notifies() {
    let (cancel_tx, cancel_rx) = channel::<u32>();
    let (report_tx, report_rx) = mpsc::channel();

    // Run the watcher in the background; whatever it resolves to is
    // forwarded over the std channel so this thread can observe it.
    let watcher = WaitForCancel { tx: cancel_tx };
    watcher
        .then(move |outcome| report_tx.send(outcome))
        .forget();

    drop(cancel_rx);

    // First unwrap: the report arrived. Second unwrap: the watcher finished
    // with `Ok`.
    let reported = report_rx.recv().unwrap();
    reported.unwrap();
}
36 
/// Test helper wrapping the sending half of a oneshot channel.
///
/// Its `Future` implementation in this file forwards `poll` to
/// `Sender::poll_cancel` on the wrapped sender.
struct WaitForCancel {
    /// Sending half whose cancellation state is polled.
    tx: Sender<u32>,
}
40 
41 impl Future for WaitForCancel {
42     type Item = ();
43     type Error = ();
44 
45     fn poll(&mut self) -> Poll<(), ()> {
46         self.tx.poll_cancel()
47     }
48 }
49 
/// Repeats the cancel-notification scenario 20000 times, with the watcher
/// futures created on a second thread.
#[test]
fn cancel_lots() {
    let (job_tx, job_rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();

    // Worker: for every (oneshot sender, report channel) pair received,
    // start a watcher that reports its result once cancellation is seen.
    // The loop ends when `job_tx` is dropped and `recv` starts failing.
    let worker = thread::spawn(move || {
        while let Ok((oneshot_tx, report_tx)) = job_rx.recv() {
            let watcher = WaitForCancel { tx: oneshot_tx };
            watcher
                .then(move |outcome| report_tx.send(outcome))
                .forget();
        }
    });

    for _ in 0..20000 {
        let (oneshot_tx, oneshot_rx) = channel::<u32>();
        let (report_tx, report_rx) = mpsc::channel();

        job_tx.send((oneshot_tx, report_tx)).unwrap();
        drop(oneshot_rx);

        // Blocks until the watcher for this round has reported `Ok`.
        let reported = report_rx.recv().unwrap();
        reported.unwrap();
    }

    // Closing the job channel lets the worker loop terminate.
    drop(job_tx);
    worker.join().unwrap();
}
71 
/// After `Receiver::close`, polling the receiver errors and the sender
/// sees the channel as canceled.
#[test]
fn close() {
    let (mut sender, mut receiver) = channel::<u32>();

    receiver.close();

    let recv_result = receiver.poll();
    assert!(recv_result.is_err());

    let cancel_state = sender.poll_cancel().unwrap();
    assert!(cancel_state.is_ready());
}
79 
/// Closing the receiver on another thread (without dropping it) must wake
/// a sender that is blocked waiting for cancellation.
#[test]
fn close_wakes() {
    let (sender, mut receiver) = channel::<u32>();
    let (release_tx, release_rx) = mpsc::channel();

    let closer = thread::spawn(move || {
        receiver.close();
        // Keep `receiver` alive until the main thread has seen the
        // cancellation, so the wake-up comes from `close`, not from drop.
        release_rx.recv().unwrap();
    });

    let watcher = WaitForCancel { tx: sender };
    watcher.wait().unwrap();

    release_tx.send(()).unwrap();
    closer.join().unwrap();
}
92 
/// `Sender::is_canceled` is false while the receiver exists and true once
/// the receiver has been dropped.
#[test]
fn is_canceled() {
    let (sender, receiver) = channel::<u32>();

    assert_eq!(sender.is_canceled(), false);
    drop(receiver);
    assert_eq!(sender.is_canceled(), true);
}
100 
/// Races `Sender::send` on one thread against `Receiver::close` + `poll`
/// on another, 20000 times. Either side may win; neither result is
/// checked.
#[test]
fn cancel_sends() {
    let (job_tx, job_rx) = mpsc::channel::<Sender<_>>();

    // Worker: send a value on every oneshot sender handed over, ignoring
    // whether the send succeeded. Stops once `job_tx` is dropped.
    let worker = thread::spawn(move || {
        while let Ok(oneshot_tx) = job_rx.recv() {
            let _ = oneshot_tx.send(42);
        }
    });

    for _ in 0..20000 {
        let (oneshot_tx, mut oneshot_rx) = channel::<u32>();
        job_tx.send(oneshot_tx).unwrap();

        oneshot_rx.close();
        // Not necessary to wrap in a task because the implementation of oneshot
        // never calls `task::current()` if the channel has been closed already.
        let _ = oneshot_rx.poll();
    }

    drop(job_tx);
    worker.join().unwrap();
}
123 
/// A future handed to `spawn` delivers its item through the returned
/// handle.
#[test]
fn spawn_sends_items() {
    let executor = local_executor::Core::new();

    let handle = spawn(ok::<_, ()>(1), &executor);
    let produced = executor.run(handle).unwrap();

    assert_eq!(produced, 1);
}
131 
/// Dropping the handle returned by `spawn` must cancel (drop) the spawned
/// future, even if that future would never complete on its own.
#[test]
fn spawn_kill_dead_stream() {
    use futures::future::Either;
    use futures::sync::oneshot;
    use std::thread;
    use std::time::Duration;

    // Stand-in for a long-running future (think: an accept loop) that
    // never resolves, but whose destruction is observable from outside
    // (like closing listening sockets or releasing limited resources).
    #[derive(Debug)]
    struct Dead {
        // Dropping this sender makes the paired receiver resolve to
        // Err(oneshot::Canceled).
        done: oneshot::Sender<()>,
    }

    impl Future for Dead {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            Ok(Async::NotReady)
        }
    }

    // Safety net: a helper thread fires `timeout_tx` after one second so
    // the test fails with a message instead of hanging.
    let (timeout_tx, timeout_rx) = oneshot::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        let _ = timeout_tx.send(());
    });

    let executor = local_executor::Core::new();
    let (done_tx, done_rx) = oneshot::channel();
    let handle = spawn(Dead { done: done_tx }, &executor);

    let cancel_then_wait = Ok::<_, ()>(()).into_future().then(move |_| {
        // Drop the handle to the spawned future: maybe some timeout was
        // exceeded, or the remote end closed a connection.
        drop(handle);
        // Then wait for the spawned future to release its resources.
        done_rx
    });

    let outcome = executor.run(cancel_then_wait.select2(timeout_rx));
    match outcome {
        // `done_rx` was canceled, i.e. `Dead` was dropped: expected.
        Err(Either::A((oneshot::Canceled, _))) => {}
        Ok(Either::B(((), _))) => panic!("dead future wasn't canceled (timeout)"),
        _ => panic!("dead future wasn't canceled (unexpected result)"),
    }
}
193 
/// Calling `forget` on the handle returned by `spawn` must NOT cancel the
/// spawned future: it keeps existing, so only the timeout can end the test.
#[test]
fn spawn_dont_kill_forgot_dead_stream() {
    use futures::future::Either;
    use futures::sync::oneshot;
    use std::thread;
    use std::time::Duration;

    // Stand-in for a long-running future (think: an accept loop) that
    // never resolves, but whose destruction is observable from outside
    // (like closing listening sockets or releasing limited resources).
    #[derive(Debug)]
    struct Dead {
        // Dropping this sender makes the paired receiver resolve to
        // Err(oneshot::Canceled).
        done: oneshot::Sender<()>,
    }

    impl Future for Dead {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            Ok(Async::NotReady)
        }
    }

    // A helper thread fires `timeout_tx` after one second; in this test
    // hitting the timeout is the expected way to finish.
    let (timeout_tx, timeout_rx) = oneshot::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        let _ = timeout_tx.send(());
    });

    let executor = local_executor::Core::new();
    let (done_tx, done_rx) = oneshot::channel();
    let handle = spawn(Dead { done: done_tx }, &executor);

    let forget_then_wait = Ok::<_, ()>(()).into_future().then(move |_| {
        // Forget the spawned future: it should keep running, i.e. the
        // timeout below is what ends the wait.
        handle.forget();
        // Then wait for the spawned future to release its resources.
        done_rx
    });

    let outcome = executor.run(forget_then_wait.select2(timeout_rx));
    match outcome {
        Err(Either::A((oneshot::Canceled, _))) => panic!("forgotten dead future was canceled"),
        // Reached the timeout: `Dead` was still alive, as required.
        Ok(Either::B(((), _))) => {}
        _ => panic!("forgotten dead future was canceled (unexpected result)"),
    }
}
254