1 //! This crate provides multiple mechanisms for interrupting a `Stream`.
2 //!
3 //! # Stream combinator
4 //!
5 //! The extension trait [`StreamExt`] provides a single new `Stream` combinator: `take_until`.
6 //! [`StreamExt::take_until`] continues yielding elements from the underlying `Stream` until a
7 //! `Future` resolves, and at that moment immediately yields `None` and stops producing further
8 //! elements.
9 //!
10 //! For convenience, the crate also includes the [`Tripwire`] type, which produces a cloneable
11 //! `Future` that can then be passed to `take_until`. When a new `Tripwire` is created, an
12 //! associated [`Trigger`] is also returned, which interrupts the `Stream` when it is dropped.
13 //!
14 //!
15 //! ```
16 //! # extern crate stream_cancel;
17 //! extern crate tokio;
18 //!
19 //! use stream_cancel::{StreamExt, Tripwire};
20 //! use tokio::prelude::*;
21 //!
22 //! let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
23 //! let (trigger, tripwire) = Tripwire::new();
24 //!
25 //! let mut rt = tokio::runtime::Runtime::new().unwrap();
26 //! rt.spawn(
27 //!     listener
28 //!         .incoming()
29 //!         .take_until(tripwire)
30 //!         .map_err(|e| eprintln!("accept failed = {:?}", e))
31 //!         .for_each(|sock| {
32 //!             let (reader, writer) = sock.split();
33 //!             tokio::spawn(
34 //!                 tokio::io::copy(reader, writer)
35 //!                     .map(|amt| println!("wrote {:?} bytes", amt))
36 //!                     .map_err(|err| eprintln!("IO error {:?}", err)),
37 //!             )
38 //!         }),
39 //! );
40 //!
41 //! // tell the listener to stop accepting new connections
42 //! drop(trigger);
43 //! rt.shutdown_on_idle().wait().unwrap();
44 //! ```
45 //!
46 //! # Stream wrapper
47 //!
48 //! Any stream can be wrapped in a [`Valved`], which enables it to be remotely terminated through
49 //! an associated [`Trigger`]. This can be useful to implement graceful shutdown on "infinite"
50 //! streams like a `TcpListener`. Once [`Trigger::close`] is called on the handle for a given
51 //! stream's [`Valved`], the stream will yield `None` to indicate that it has terminated.
52 //!
53 //! ```
54 //! # extern crate stream_cancel;
55 //! extern crate tokio;
56 //!
57 //! use stream_cancel::Valved;
58 //! use tokio::prelude::*;
59 //! use std::thread;
60 //!
61 //! let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
62 //! let (exit, incoming) = Valved::new(listener.incoming());
63 //!
64 //! let server = thread::spawn(move || {
65 //!     // start a tokio echo server
66 //!     tokio::run(
67 //!         incoming
68 //!             .map_err(|e| eprintln!("accept failed = {:?}", e))
69 //!             .for_each(|sock| {
70 //!                 let (reader, writer) = sock.split();
71 //!                 tokio::spawn(
72 //!                     tokio::io::copy(reader, writer)
73 //!                         .map(|amt| println!("wrote {:?} bytes", amt))
74 //!                         .map_err(|err| eprintln!("IO error {:?}", err)),
75 //!                 )
76 //!             }),
77 //!     )
78 //! });
79 //!
80 //! // the server thread will normally never exit, since more connections
81 //! // can always arrive. however, with a Valved, we can turn off the
82 //! // stream of incoming connections to initiate a graceful shutdown
83 //! drop(exit);
84 //! server.join().unwrap();
85 //! ```
86 //!
87 //! You can share the same [`Trigger`] between multiple streams by first creating a [`Valve`],
88 //! and then wrapping multiple streams using [`Valve::Wrap`]:
89 //!
90 //! ```
91 //! # extern crate stream_cancel;
92 //! extern crate tokio;
93 //!
94 //! use stream_cancel::Valve;
95 //! use tokio::prelude::*;
96 //!
97 //! let (exit, valve) = Valve::new();
98 //! let listener1 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
99 //! let listener2 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
100 //! let incoming1 = valve.wrap(listener1.incoming());
101 //! let incoming2 = valve.wrap(listener2.incoming());
102 //!
103 //! let mut rt = tokio::runtime::Runtime::new().unwrap();
104 //! rt.spawn(
105 //!     incoming1
106 //!         .select(incoming2)
107 //!         .map_err(|e| eprintln!("accept failed = {:?}", e))
108 //!         .for_each(|sock| {
109 //!             let (reader, writer) = sock.split();
110 //!             tokio::spawn(
111 //!                 tokio::io::copy(reader, writer)
112 //!                     .map(|amt| println!("wrote {:?} bytes", amt))
113 //!                     .map_err(|err| eprintln!("IO error {:?}", err)),
114 //!             )
115 //!         }),
116 //! );
117 //!
118 //! // the runtime will not become idle until both incoming1 and incoming2 have stopped
119 //! // (due to the select). this checks that they are indeed both interrupted when the
120 //! // valve is closed.
121 //! drop(exit);
122 //! rt.shutdown_on_idle().wait().unwrap();
123 //! ```
124 
125 #![deny(missing_docs)]
126 
127 extern crate futures;
128 
129 #[cfg(test)]
130 extern crate tokio;
131 
132 use futures::sync::oneshot;
133 
134 mod combinator;
135 mod wrapper;
136 
137 pub use combinator::{StreamExt, TakeUntil, Tripwire};
138 pub use wrapper::{Valve, Valved};
139 
140 /// A handle to a set of cancellable streams.
141 ///
142 /// If the `Trigger` is dropped, any streams associated with it are interrupted (this is equivalent
143 /// to calling [`Trigger::close`]. To override this behavior, call [`Trigger::disable`].
144 #[derive(Debug)]
145 pub struct Trigger(Option<oneshot::Sender<()>>);
146 
147 impl Trigger {
148     /// Cancel all associated streams, and make them immediately yield `None`.
cancel(self)149     pub fn cancel(self) {
150         drop(self);
151     }
152 
153     /// Disable the `Trigger`, and leave all associated streams running to completion.
disable(mut self)154     pub fn disable(mut self) {
155         let _ = self.0.take();
156         drop(self);
157     }
158 }
159 
160 impl Drop for Trigger {
drop(&mut self)161     fn drop(&mut self) {
162         if let Some(tx) = self.0.take() {
163             // Send may fail when all associated rx'es are dropped already
164             // so code here cannot panic on error
165             let _ = tx.send(());
166         }
167     }
168 }
169 
170 #[cfg(test)]
171 mod tests {
172     use super::*;
173     use tokio::prelude::*;
174 
175     #[test]
tokio_run()176     fn tokio_run() {
177         use std::thread;
178 
179         let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
180         let (exit, incoming) = Valved::new(listener.incoming());
181 
182         let server = thread::spawn(move || {
183             // start a tokio echo server
184             tokio::run(
185                 incoming
186                     .map_err(|e| eprintln!("accept failed = {:?}", e))
187                     .for_each(|sock| {
188                         let (reader, writer) = sock.split();
189                         tokio::spawn(
190                             tokio::io::copy(reader, writer)
191                                 .map(|amt| println!("wrote {:?} bytes", amt))
192                                 .map_err(|err| eprintln!("IO error {:?}", err)),
193                         )
194                     }),
195             )
196         });
197 
198         // the server thread will normally never exit, since more connections
199         // can always arrive. however, with a Valved, we can turn off the
200         // stream of incoming connections to initiate a graceful shutdown
201         drop(exit);
202         server.join().unwrap();
203     }
204 
205     #[test]
tokio_rt_on_idle()206     fn tokio_rt_on_idle() {
207         let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
208         let (exit, incoming) = Valved::new(listener.incoming());
209 
210         let mut rt = tokio::runtime::Runtime::new().unwrap();
211         rt.spawn(
212             incoming
213                 .map_err(|e| eprintln!("accept failed = {:?}", e))
214                 .for_each(|sock| {
215                     let (reader, writer) = sock.split();
216                     tokio::spawn(
217                         tokio::io::copy(reader, writer)
218                             .map(|amt| println!("wrote {:?} bytes", amt))
219                             .map_err(|err| eprintln!("IO error {:?}", err)),
220                     )
221                 }),
222         );
223 
224         drop(exit);
225         rt.shutdown_on_idle().wait().unwrap();
226     }
227 
228     #[test]
multi_interrupt()229     fn multi_interrupt() {
230         let (exit, valve) = Valve::new();
231         let listener1 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
232         let listener2 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
233         let incoming1 = valve.wrap(listener1.incoming());
234         let incoming2 = valve.wrap(listener2.incoming());
235 
236         let mut rt = tokio::runtime::Runtime::new().unwrap();
237         rt.spawn(
238             incoming1
239                 .select(incoming2)
240                 .map_err(|e| eprintln!("accept failed = {:?}", e))
241                 .for_each(|sock| {
242                     let (reader, writer) = sock.split();
243                     tokio::spawn(
244                         tokio::io::copy(reader, writer)
245                             .map(|amt| println!("wrote {:?} bytes", amt))
246                             .map_err(|err| eprintln!("IO error {:?}", err)),
247                     )
248                 }),
249         );
250 
251         // the runtime will not become idle until both incoming1 and incoming2 have stopped (due to
252         // the select). this checks that they are indeed both interrupted when the valve is closed.
253         drop(exit);
254         rt.shutdown_on_idle().wait().unwrap();
255     }
256 
257     #[test]
yields_many()258     fn yields_many() {
259         use std::sync::{
260             atomic::{AtomicUsize, Ordering},
261             Arc,
262         };
263 
264         let (exit, valve) = Valve::new();
265         let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
266         let addr = listener.local_addr().unwrap();
267         let incoming = valve.wrap(listener.incoming());
268 
269         let reqs = Arc::new(AtomicUsize::new(0));
270         let got = reqs.clone();
271         let mut rt = tokio::runtime::Runtime::new().unwrap();
272         rt.spawn(
273             incoming
274                 .map_err(|e| panic!("accept failed = {:?}", e))
275                 .for_each(move |sock| {
276                     reqs.fetch_add(1, Ordering::SeqCst);
277                     let (reader, writer) = sock.split();
278                     tokio::spawn(
279                         tokio::io::copy(reader, writer)
280                             .map(|_| ())
281                             .map_err(|err| panic!("IO error {:?}", err)),
282                     )
283                 }),
284         );
285 
286         {
287             tokio::net::TcpStream::connect(&addr)
288                 .and_then(|s| tokio::io::write_all(s, b"hello"))
289                 .and_then(|(s, _)| tokio::io::read_exact(s, vec![0; 5]))
290                 .and_then(|(_, buf)| {
291                     assert_eq!(buf, b"hello");
292                     tokio::net::TcpStream::connect(&addr)
293                 })
294                 .and_then(|s| tokio::io::write_all(s, b"world"))
295                 .and_then(|(s, _)| tokio::io::read_exact(s, vec![0; 5]))
296                 .inspect(|&(_, ref buf)| {
297                     assert_eq!(buf, b"world");
298                 })
299                 .wait()
300                 .unwrap();
301         }
302         assert_eq!(got.load(Ordering::SeqCst), 2);
303 
304         drop(exit);
305         rt.shutdown_on_idle().wait().unwrap();
306     }
307 
308     #[test]
yields_some()309     fn yields_some() {
310         use std::sync::{
311             atomic::{AtomicUsize, Ordering},
312             Arc,
313         };
314 
315         let (exit, valve) = Valve::new();
316         let listener1 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
317         let listener2 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
318         let addr1 = listener1.local_addr().unwrap();
319         let addr2 = listener2.local_addr().unwrap();
320         let incoming1 = valve.wrap(listener1.incoming());
321         let incoming2 = valve.wrap(listener2.incoming());
322 
323         let reqs = Arc::new(AtomicUsize::new(0));
324         let got = reqs.clone();
325         let mut rt = tokio::runtime::Runtime::new().unwrap();
326         rt.spawn(
327             incoming1
328                 .select(incoming2)
329                 .map_err(|e| panic!("accept failed = {:?}", e))
330                 .for_each(move |sock| {
331                     reqs.fetch_add(1, Ordering::SeqCst);
332                     let (reader, writer) = sock.split();
333                     tokio::spawn(
334                         tokio::io::copy(reader, writer)
335                             .map(|_| ())
336                             .map_err(|err| panic!("IO error {:?}", err)),
337                     )
338                 }),
339         );
340 
341         {
342             tokio::net::TcpStream::connect(&addr1)
343                 .and_then(|s| tokio::io::write_all(s, b"hello"))
344                 .and_then(|(s, _)| tokio::io::read_exact(s, vec![0; 5]))
345                 .and_then(|(_, buf)| {
346                     assert_eq!(buf, b"hello");
347                     tokio::net::TcpStream::connect(&addr2)
348                 })
349                 .and_then(|s| tokio::io::write_all(s, b"world"))
350                 .and_then(|(s, _)| tokio::io::read_exact(s, vec![0; 5]))
351                 .inspect(|&(_, ref buf)| {
352                     assert_eq!(buf, b"world");
353                 })
354                 .wait()
355                 .unwrap();
356         }
357         assert_eq!(got.load(Ordering::SeqCst), 2);
358 
359         drop(exit);
360         rt.shutdown_on_idle().wait().unwrap();
361     }
362 }
363