1 extern crate futures; 2 3 use std::sync::mpsc; 4 use std::thread; 5 6 use futures::prelude::*; 7 use futures::future::{lazy, ok}; 8 use futures::sync::oneshot::*; 9 10 mod support; 11 use support::*; 12 13 #[test] 14 fn smoke_poll() { 15 let (mut tx, rx) = channel::<u32>(); 16 let mut task = futures::executor::spawn(lazy(|| { 17 assert!(tx.poll_cancel().unwrap().is_not_ready()); 18 assert!(tx.poll_cancel().unwrap().is_not_ready()); 19 drop(rx); 20 assert!(tx.poll_cancel().unwrap().is_ready()); 21 assert!(tx.poll_cancel().unwrap().is_ready()); 22 ok::<(), ()>(()) 23 })); 24 assert!(task.poll_future_notify(¬ify_noop(), 0).unwrap().is_ready()); 25 } 26 27 #[test] 28 fn cancel_notifies() { 29 let (tx, rx) = channel::<u32>(); 30 let (tx2, rx2) = mpsc::channel(); 31 32 WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget(); 33 drop(rx); 34 rx2.recv().unwrap().unwrap(); 35 } 36 37 struct WaitForCancel { 38 tx: Sender<u32>, 39 } 40 41 impl Future for WaitForCancel { 42 type Item = (); 43 type Error = (); 44 45 fn poll(&mut self) -> Poll<(), ()> { 46 self.tx.poll_cancel() 47 } 48 } 49 50 #[test] 51 fn cancel_lots() { 52 let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); 53 let t = thread::spawn(move || { 54 for (tx, tx2) in rx { 55 WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget(); 56 } 57 58 }); 59 60 for _ in 0..20000 { 61 let (otx, orx) = channel::<u32>(); 62 let (tx2, rx2) = mpsc::channel(); 63 tx.send((otx, tx2)).unwrap(); 64 drop(orx); 65 rx2.recv().unwrap().unwrap(); 66 } 67 drop(tx); 68 69 t.join().unwrap(); 70 } 71 72 #[test] 73 fn close() { 74 let (mut tx, mut rx) = channel::<u32>(); 75 rx.close(); 76 assert!(rx.poll().is_err()); 77 assert!(tx.poll_cancel().unwrap().is_ready()); 78 } 79 80 #[test] 81 fn close_wakes() { 82 let (tx, mut rx) = channel::<u32>(); 83 let (tx2, rx2) = mpsc::channel(); 84 let t = thread::spawn(move || { 85 rx.close(); 86 rx2.recv().unwrap(); 87 }); 88 WaitForCancel { tx: tx }.wait().unwrap(); 89 tx2.send(()).unwrap(); 90 t.join().unwrap(); 91 } 92 93 #[test] 94 fn is_canceled() { 95 let (tx, rx) = channel::<u32>(); 96 assert!(!tx.is_canceled()); 97 drop(rx); 98 assert!(tx.is_canceled()); 99 } 100 101 #[test] 102 fn cancel_sends() { 103 let (tx, rx) = mpsc::channel::<Sender<_>>(); 104 let t = thread::spawn(move || { 105 for otx in rx { 106 let _ = otx.send(42); 107 } 108 }); 109 110 for _ in 0..20000 { 111 let (otx, mut orx) = channel::<u32>(); 112 tx.send(otx).unwrap(); 113 114 orx.close(); 115 // Not necessary to wrap in a task because the implementation of oneshot 116 // never calls `task::current()` if the channel has been closed already. 117 let _ = orx.poll(); 118 } 119 120 drop(tx); 121 t.join().unwrap(); 122 } 123 124 #[test] 125 fn spawn_sends_items() { 126 let core = local_executor::Core::new(); 127 let future = ok::<_, ()>(1); 128 let rx = spawn(future, &core); 129 assert_eq!(core.run(rx).unwrap(), 1); 130 } 131 132 #[test] 133 fn spawn_kill_dead_stream() { 134 use std::thread; 135 use std::time::Duration; 136 use futures::future::Either; 137 use futures::sync::oneshot; 138 139 // a future which never returns anything (forever accepting incoming 140 // connections), but dropping it leads to observable side effects 141 // (like closing listening sockets, releasing limited resources, 142 // ...) 143 #[derive(Debug)] 144 struct Dead { 145 // when dropped you should get Err(oneshot::Canceled) on the 146 // receiving end 147 done: oneshot::Sender<()>, 148 } 149 impl Future for Dead { 150 type Item = (); 151 type Error = (); 152 153 fn poll(&mut self) -> Poll<Self::Item, Self::Error> { 154 Ok(Async::NotReady) 155 } 156 } 157 158 // need to implement a timeout for the test, as it would hang 159 // forever right now 160 let (timeout_tx, timeout_rx) = oneshot::channel(); 161 thread::spawn(move || { 162 thread::sleep(Duration::from_millis(1000)); 163 let _ = timeout_tx.send(()); 164 }); 165 166 let core = local_executor::Core::new(); 167 let (done_tx, done_rx) = oneshot::channel(); 168 let future = Dead{done: done_tx}; 169 let rx = spawn(future, &core); 170 let res = core.run( 171 Ok::<_, ()>(()) 172 .into_future() 173 .then(move |_| { 174 // now drop the spawned future: maybe some timeout exceeded, 175 // or some connection on this end was closed by the remote 176 // end. 177 drop(rx); 178 // and wait for the spawned future to release its resources 179 done_rx 180 }) 181 .select2(timeout_rx) 182 ); 183 match res { 184 Err(Either::A((oneshot::Canceled, _))) => (), 185 Ok(Either::B(((), _))) => { 186 panic!("dead future wasn't canceled (timeout)"); 187 }, 188 _ => { 189 panic!("dead future wasn't canceled (unexpected result)"); 190 }, 191 } 192 } 193 194 #[test] 195 fn spawn_dont_kill_forgot_dead_stream() { 196 use std::thread; 197 use std::time::Duration; 198 use futures::future::Either; 199 use futures::sync::oneshot; 200 201 // a future which never returns anything (forever accepting incoming 202 // connections), but dropping it leads to observable side effects 203 // (like closing listening sockets, releasing limited resources, 204 // ...) 205 #[derive(Debug)] 206 struct Dead { 207 // when dropped you should get Err(oneshot::Canceled) on the 208 // receiving end 209 done: oneshot::Sender<()>, 210 } 211 impl Future for Dead { 212 type Item = (); 213 type Error = (); 214 215 fn poll(&mut self) -> Poll<Self::Item, Self::Error> { 216 Ok(Async::NotReady) 217 } 218 } 219 220 // need to implement a timeout for the test, as it would hang 221 // forever right now 222 let (timeout_tx, timeout_rx) = oneshot::channel(); 223 thread::spawn(move || { 224 thread::sleep(Duration::from_millis(1000)); 225 let _ = timeout_tx.send(()); 226 }); 227 228 let core = local_executor::Core::new(); 229 let (done_tx, done_rx) = oneshot::channel(); 230 let future = Dead{done: done_tx}; 231 let rx = spawn(future, &core); 232 let res = core.run( 233 Ok::<_, ()>(()) 234 .into_future() 235 .then(move |_| { 236 // forget the spawned future: should keep running, i.e. hit 237 // the timeout below. 238 rx.forget(); 239 // and wait for the spawned future to release its resources 240 done_rx 241 }) 242 .select2(timeout_rx) 243 ); 244 match res { 245 Err(Either::A((oneshot::Canceled, _))) => { 246 panic!("forgotten dead future was canceled"); 247 }, 248 Ok(Either::B(((), _))) => (), // reached timeout 249 _ => { 250 panic!("forgotten dead future was canceled (unexpected result)"); 251 }, 252 } 253 } 254