1 //! This crate provides multiple mechanisms for interrupting a `Stream`. 2 //! 3 //! # Stream combinator 4 //! 5 //! The extension trait [`StreamExt`] provides a single new `Stream` combinator: `take_until`. 6 //! [`StreamExt::take_until`] continues yielding elements from the underlying `Stream` until a 7 //! `Future` resolves, and at that moment immediately yields `None` and stops producing further 8 //! elements. 9 //! 10 //! For convenience, the crate also includes the [`Tripwire`] type, which produces a cloneable 11 //! `Future` that can then be passed to `take_until`. When a new `Tripwire` is created, an 12 //! associated [`Trigger`] is also returned, which interrupts the `Stream` when it is dropped. 13 //! 14 //! 15 //! ``` 16 //! # extern crate stream_cancel; 17 //! extern crate tokio; 18 //! 19 //! use stream_cancel::{StreamExt, Tripwire}; 20 //! use tokio::prelude::*; 21 //! 22 //! let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 23 //! let (trigger, tripwire) = Tripwire::new(); 24 //! 25 //! let mut rt = tokio::runtime::Runtime::new().unwrap(); 26 //! rt.spawn( 27 //! listener 28 //! .incoming() 29 //! .take_until(tripwire) 30 //! .map_err(|e| eprintln!("accept failed = {:?}", e)) 31 //! .for_each(|sock| { 32 //! let (reader, writer) = sock.split(); 33 //! tokio::spawn( 34 //! tokio::io::copy(reader, writer) 35 //! .map(|amt| println!("wrote {:?} bytes", amt)) 36 //! .map_err(|err| eprintln!("IO error {:?}", err)), 37 //! ) 38 //! }), 39 //! ); 40 //! 41 //! // tell the listener to stop accepting new connections 42 //! drop(trigger); 43 //! rt.shutdown_on_idle().wait().unwrap(); 44 //! ``` 45 //! 46 //! # Stream wrapper 47 //! 48 //! Any stream can be wrapped in a [`Valved`], which enables it to be remotely terminated through 49 //! an associated [`Trigger`]. This can be useful to implement graceful shutdown on "infinite" 50 //! streams like a `TcpListener`. Once [`Trigger::close`] is called on the handle for a given 51 //! stream's [`Valved`], the stream will yield `None` to indicate that it has terminated. 52 //! 53 //! ``` 54 //! # extern crate stream_cancel; 55 //! extern crate tokio; 56 //! 57 //! use stream_cancel::Valved; 58 //! use tokio::prelude::*; 59 //! use std::thread; 60 //! 61 //! let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 62 //! let (exit, incoming) = Valved::new(listener.incoming()); 63 //! 64 //! let server = thread::spawn(move || { 65 //! // start a tokio echo server 66 //! tokio::run( 67 //! incoming 68 //! .map_err(|e| eprintln!("accept failed = {:?}", e)) 69 //! .for_each(|sock| { 70 //! let (reader, writer) = sock.split(); 71 //! tokio::spawn( 72 //! tokio::io::copy(reader, writer) 73 //! .map(|amt| println!("wrote {:?} bytes", amt)) 74 //! .map_err(|err| eprintln!("IO error {:?}", err)), 75 //! ) 76 //! }), 77 //! ) 78 //! }); 79 //! 80 //! // the server thread will normally never exit, since more connections 81 //! // can always arrive. however, with a Valved, we can turn off the 82 //! // stream of incoming connections to initiate a graceful shutdown 83 //! drop(exit); 84 //! server.join().unwrap(); 85 //! ``` 86 //! 87 //! You can share the same [`Trigger`] between multiple streams by first creating a [`Valve`], 88 //! and then wrapping multiple streams using [`Valve::Wrap`]: 89 //! 90 //! ``` 91 //! # extern crate stream_cancel; 92 //! extern crate tokio; 93 //! 94 //! use stream_cancel::Valve; 95 //! use tokio::prelude::*; 96 //! 97 //! let (exit, valve) = Valve::new(); 98 //! let listener1 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 99 //! let listener2 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 100 //! let incoming1 = valve.wrap(listener1.incoming()); 101 //! let incoming2 = valve.wrap(listener2.incoming()); 102 //! 103 //! let mut rt = tokio::runtime::Runtime::new().unwrap(); 104 //! rt.spawn( 105 //! incoming1 106 //! .select(incoming2) 107 //! .map_err(|e| eprintln!("accept failed = {:?}", e)) 108 //! .for_each(|sock| { 109 //! let (reader, writer) = sock.split(); 110 //! tokio::spawn( 111 //! tokio::io::copy(reader, writer) 112 //! .map(|amt| println!("wrote {:?} bytes", amt)) 113 //! .map_err(|err| eprintln!("IO error {:?}", err)), 114 //! ) 115 //! }), 116 //! ); 117 //! 118 //! // the runtime will not become idle until both incoming1 and incoming2 have stopped 119 //! // (due to the select). this checks that they are indeed both interrupted when the 120 //! // valve is closed. 121 //! drop(exit); 122 //! rt.shutdown_on_idle().wait().unwrap(); 123 //! ``` 124 125 #![deny(missing_docs)] 126 127 extern crate futures; 128 129 #[cfg(test)] 130 extern crate tokio; 131 132 use futures::sync::oneshot; 133 134 mod combinator; 135 mod wrapper; 136 137 pub use combinator::{StreamExt, TakeUntil, Tripwire}; 138 pub use wrapper::{Valve, Valved}; 139 140 /// A handle to a set of cancellable streams. 141 /// 142 /// If the `Trigger` is dropped, any streams associated with it are interrupted (this is equivalent 143 /// to calling [`Trigger::close`]. To override this behavior, call [`Trigger::disable`]. 144 #[derive(Debug)] 145 pub struct Trigger(Option<oneshot::Sender<()>>); 146 147 impl Trigger { 148 /// Cancel all associated streams, and make them immediately yield `None`. cancel(self)149 pub fn cancel(self) { 150 drop(self); 151 } 152 153 /// Disable the `Trigger`, and leave all associated streams running to completion. disable(mut self)154 pub fn disable(mut self) { 155 let _ = self.0.take(); 156 drop(self); 157 } 158 } 159 160 impl Drop for Trigger { drop(&mut self)161 fn drop(&mut self) { 162 if let Some(tx) = self.0.take() { 163 // Send may fail when all associated rx'es are dropped already 164 // so code here cannot panic on error 165 let _ = tx.send(()); 166 } 167 } 168 } 169 170 #[cfg(test)] 171 mod tests { 172 use super::*; 173 use tokio::prelude::*; 174 175 #[test] tokio_run()176 fn tokio_run() { 177 use std::thread; 178 179 let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 180 let (exit, incoming) = Valved::new(listener.incoming()); 181 182 let server = thread::spawn(move || { 183 // start a tokio echo server 184 tokio::run( 185 incoming 186 .map_err(|e| eprintln!("accept failed = {:?}", e)) 187 .for_each(|sock| { 188 let (reader, writer) = sock.split(); 189 tokio::spawn( 190 tokio::io::copy(reader, writer) 191 .map(|amt| println!("wrote {:?} bytes", amt)) 192 .map_err(|err| eprintln!("IO error {:?}", err)), 193 ) 194 }), 195 ) 196 }); 197 198 // the server thread will normally never exit, since more connections 199 // can always arrive. however, with a Valved, we can turn off the 200 // stream of incoming connections to initiate a graceful shutdown 201 drop(exit); 202 server.join().unwrap(); 203 } 204 205 #[test] tokio_rt_on_idle()206 fn tokio_rt_on_idle() { 207 let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 208 let (exit, incoming) = Valved::new(listener.incoming()); 209 210 let mut rt = tokio::runtime::Runtime::new().unwrap(); 211 rt.spawn( 212 incoming 213 .map_err(|e| eprintln!("accept failed = {:?}", e)) 214 .for_each(|sock| { 215 let (reader, writer) = sock.split(); 216 tokio::spawn( 217 tokio::io::copy(reader, writer) 218 .map(|amt| println!("wrote {:?} bytes", amt)) 219 .map_err(|err| eprintln!("IO error {:?}", err)), 220 ) 221 }), 222 ); 223 224 drop(exit); 225 rt.shutdown_on_idle().wait().unwrap(); 226 } 227 228 #[test] multi_interrupt()229 fn multi_interrupt() { 230 let (exit, valve) = Valve::new(); 231 let listener1 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 232 let listener2 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 233 let incoming1 = valve.wrap(listener1.incoming()); 234 let incoming2 = valve.wrap(listener2.incoming()); 235 236 let mut rt = tokio::runtime::Runtime::new().unwrap(); 237 rt.spawn( 238 incoming1 239 .select(incoming2) 240 .map_err(|e| eprintln!("accept failed = {:?}", e)) 241 .for_each(|sock| { 242 let (reader, writer) = sock.split(); 243 tokio::spawn( 244 tokio::io::copy(reader, writer) 245 .map(|amt| println!("wrote {:?} bytes", amt)) 246 .map_err(|err| eprintln!("IO error {:?}", err)), 247 ) 248 }), 249 ); 250 251 // the runtime will not become idle until both incoming1 and incoming2 have stopped (due to 252 // the select). this checks that they are indeed both interrupted when the valve is closed. 253 drop(exit); 254 rt.shutdown_on_idle().wait().unwrap(); 255 } 256 257 #[test] yields_many()258 fn yields_many() { 259 use std::sync::{ 260 atomic::{AtomicUsize, Ordering}, 261 Arc, 262 }; 263 264 let (exit, valve) = Valve::new(); 265 let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 266 let addr = listener.local_addr().unwrap(); 267 let incoming = valve.wrap(listener.incoming()); 268 269 let reqs = Arc::new(AtomicUsize::new(0)); 270 let got = reqs.clone(); 271 let mut rt = tokio::runtime::Runtime::new().unwrap(); 272 rt.spawn( 273 incoming 274 .map_err(|e| panic!("accept failed = {:?}", e)) 275 .for_each(move |sock| { 276 reqs.fetch_add(1, Ordering::SeqCst); 277 let (reader, writer) = sock.split(); 278 tokio::spawn( 279 tokio::io::copy(reader, writer) 280 .map(|_| ()) 281 .map_err(|err| panic!("IO error {:?}", err)), 282 ) 283 }), 284 ); 285 286 { 287 tokio::net::TcpStream::connect(&addr) 288 .and_then(|s| tokio::io::write_all(s, b"hello")) 289 .and_then(|(s, _)| tokio::io::read_exact(s, vec![0; 5])) 290 .and_then(|(_, buf)| { 291 assert_eq!(buf, b"hello"); 292 tokio::net::TcpStream::connect(&addr) 293 }) 294 .and_then(|s| tokio::io::write_all(s, b"world")) 295 .and_then(|(s, _)| tokio::io::read_exact(s, vec![0; 5])) 296 .inspect(|&(_, ref buf)| { 297 assert_eq!(buf, b"world"); 298 }) 299 .wait() 300 .unwrap(); 301 } 302 assert_eq!(got.load(Ordering::SeqCst), 2); 303 304 drop(exit); 305 rt.shutdown_on_idle().wait().unwrap(); 306 } 307 308 #[test] yields_some()309 fn yields_some() { 310 use std::sync::{ 311 atomic::{AtomicUsize, Ordering}, 312 Arc, 313 }; 314 315 let (exit, valve) = Valve::new(); 316 let listener1 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 317 let listener2 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(); 318 let addr1 = listener1.local_addr().unwrap(); 319 let addr2 = listener2.local_addr().unwrap(); 320 let incoming1 = valve.wrap(listener1.incoming()); 321 let incoming2 = valve.wrap(listener2.incoming()); 322 323 let reqs = Arc::new(AtomicUsize::new(0)); 324 let got = reqs.clone(); 325 let mut rt = tokio::runtime::Runtime::new().unwrap(); 326 rt.spawn( 327 incoming1 328 .select(incoming2) 329 .map_err(|e| panic!("accept failed = {:?}", e)) 330 .for_each(move |sock| { 331 reqs.fetch_add(1, Ordering::SeqCst); 332 let (reader, writer) = sock.split(); 333 tokio::spawn( 334 tokio::io::copy(reader, writer) 335 .map(|_| ()) 336 .map_err(|err| panic!("IO error {:?}", err)), 337 ) 338 }), 339 ); 340 341 { 342 tokio::net::TcpStream::connect(&addr1) 343 .and_then(|s| tokio::io::write_all(s, b"hello")) 344 .and_then(|(s, _)| tokio::io::read_exact(s, vec![0; 5])) 345 .and_then(|(_, buf)| { 346 assert_eq!(buf, b"hello"); 347 tokio::net::TcpStream::connect(&addr2) 348 }) 349 .and_then(|s| tokio::io::write_all(s, b"world")) 350 .and_then(|(s, _)| tokio::io::read_exact(s, vec![0; 5])) 351 .inspect(|&(_, ref buf)| { 352 assert_eq!(buf, b"world"); 353 }) 354 .wait() 355 .unwrap(); 356 } 357 assert_eq!(got.load(Ordering::SeqCst), 2); 358 359 drop(exit); 360 rt.shutdown_on_idle().wait().unwrap(); 361 } 362 } 363