1 //! Core I/O traits and combinators when working with Tokio.
2 //!
3 //! A description of the high-level I/O combinators can be [found online] in
4 //! addition to a description of the [low level details].
5 //!
6 //! [found online]: https://tokio.rs/docs/getting-started/core/
7 //! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/
8 
9 #![deny(missing_docs, missing_debug_implementations)]
10 #![doc(html_root_url = "https://docs.rs/tokio-io/0.1")]
11 
12 #[macro_use]
13 extern crate log;
14 
15 #[macro_use]
16 extern crate futures;
17 extern crate bytes;
18 
19 use std::io as std_io;
20 use std::io::Write;
21 
22 use futures::{Async, Poll};
23 use futures::future::BoxFuture;
24 use futures::stream::BoxStream;
25 
26 use bytes::{Buf, BufMut};
27 
28 /// A convenience typedef around a `Future` whose error component is `io::Error`
29 pub type IoFuture<T> = BoxFuture<T, std_io::Error>;
30 
31 /// A convenience typedef around a `Stream` whose error component is `io::Error`
32 pub type IoStream<T> = BoxStream<T, std_io::Error>;
33 
34 /// A convenience macro for working with `io::Result<T>` from the `Read` and
35 /// `Write` traits.
36 ///
37 /// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
38 /// the input type is of the `Err` variant, then `Poll::NotReady` is returned if
39 /// it indicates `WouldBlock` or otherwise `Err` is returned.
40 #[macro_export]
41 macro_rules! try_nb {
42     ($e:expr) => (match $e {
43         Ok(t) => t,
44         Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
45             return Ok(::futures::Async::NotReady)
46         }
47         Err(e) => return Err(e.into()),
48     })
49 }
50 
51 pub mod io;
52 pub mod codec;
53 
54 mod copy;
55 mod flush;
56 mod framed;
57 mod framed_read;
58 mod framed_write;
59 mod length_delimited;
60 mod lines;
61 mod read;
62 mod read_exact;
63 mod read_to_end;
64 mod read_until;
65 mod shutdown;
66 mod split;
67 mod window;
68 mod write_all;
69 
70 use codec::{Decoder, Encoder, Framed};
71 use split::{ReadHalf, WriteHalf};
72 
73 /// A trait for readable objects which operated in an asynchronous and
74 /// futures-aware fashion.
75 ///
76 /// This trait inherits from `io::Read` and indicates as a marker that an I/O
77 /// object is **nonblocking**, meaning that it will return an error instead of
78 /// blocking when bytes are unavailable, but the stream hasn't reached EOF.
79 /// Specifically this means that the `read` function for types that implement
80 /// this trait can have a few return values:
81 ///
82 /// * `Ok(n)` means that `n` bytes of data was immediately read and placed into
83 ///   the output buffer, where `n` == 0 implies that EOF has been reached.
84 /// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was read
85 ///   into the buffer provided. The I/O object is not currently readable but may
86 ///   become readable in the future. Most importantly, **the current future's
87 ///   task is scheduled to get unparked when the object is readable**. This
88 ///   means that like `Future::poll` you'll receive a notification when the I/O
89 ///   object is readable again.
90 /// * `Err(e)` for other errors are standard I/O errors coming from the
91 ///   underlying object.
92 ///
93 /// This trait importantly means that the `read` method only works in the
94 /// context of a future's task. The object may panic if used outside of a task.
95 pub trait AsyncRead: std_io::Read {
96     /// Prepares an uninitialized buffer to be safe to pass to `read`. Returns
97     /// `true` if the supplied buffer was zeroed out.
98     ///
99     /// While it would be highly unusual, implementations of [`io::Read`] are
100     /// able to read data from the buffer passed as an argument. Because of
101     /// this, the buffer passed to [`io::Read`] must be initialized memory. In
102     /// situations where large numbers of buffers are used, constantly having to
103     /// zero out buffers can be expensive.
104     ///
105     /// This function does any necessary work to prepare an uninitialized buffer
106     /// to be safe to pass to `read`. If `read` guarantees to never attempt read
107     /// data out of the supplied buffer, then `prepare_uninitialized_buffer`
108     /// doesn't need to do any work.
109     ///
110     /// If this function returns `true`, then the memory has been zeroed out.
111     /// This allows implementations of `AsyncRead` which are composed of
112     /// multiple sub implementations to efficiently implement
113     /// `prepare_uninitialized_buffer`.
114     ///
115     /// This function isn't actually `unsafe` to call but `unsafe` to implement.
116     /// The implementor must ensure that either the whole `buf` has been zeroed
117     /// or `read_buf()` overwrites the buffer without reading it and returns
118     /// correct value.
119     ///
120     /// This function is called from [`read_buf`].
121     ///
122     /// [`io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
123     /// [`read_buf`]: #method.read_buf
prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool124     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
125         for i in 0..buf.len() {
126             buf[i] = 0;
127         }
128 
129         true
130     }
131 
132     /// Pull some bytes from this source into the specified `Buf`, returning
133     /// how many bytes were read.
134     ///
135     /// The `buf` provided will have bytes read into it and the internal cursor
136     /// will be advanced if any bytes were read. Note that this method typically
137     /// will not reallocate the buffer provided.
read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> where Self: Sized,138     fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error>
139         where Self: Sized,
140     {
141         if !buf.has_remaining_mut() {
142             return Ok(Async::Ready(0));
143         }
144 
145         unsafe {
146             let n = {
147                 let b = buf.bytes_mut();
148 
149                 self.prepare_uninitialized_buffer(b);
150 
151                 try_nb!(self.read(b))
152             };
153 
154             buf.advance_mut(n);
155             Ok(Async::Ready(n))
156         }
157     }
158 
159     /// Provides a `Stream` and `Sink` interface for reading and writing to this
160     /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
161     ///
162     /// Raw I/O objects work with byte sequences, but higher-level code usually
163     /// wants to batch these into meaningful chunks, called "frames". This
164     /// method layers framing on top of an I/O object, by using the `Codec`
165     /// traits to handle encoding and decoding of messages frames. Note that
166     /// the incoming and outgoing frame types may be distinct.
167     ///
168     /// This function returns a *single* object that is both `Stream` and
169     /// `Sink`; grouping this into a single object is often useful for layering
170     /// things like gzip or TLS, which require both read and write access to the
171     /// underlying object.
172     ///
173     /// If you want to work more directly with the streams and sink, consider
174     /// calling `split` on the `Framed` returned by this method, which will
175     /// break them into separate objects, allowing them to interact more easily.
framed<T: Encoder + Decoder>(self, codec: T) -> Framed<Self, T> where Self: AsyncWrite + Sized,176     fn framed<T: Encoder + Decoder>(self, codec: T) -> Framed<Self, T>
177         where Self: AsyncWrite + Sized,
178     {
179         framed::framed(self, codec)
180     }
181 
182     /// Helper method for splitting this read/write object into two halves.
183     ///
184     /// The two halves returned implement the `Read` and `Write` traits,
185     /// respectively.
split(self) -> (ReadHalf<Self>, WriteHalf<Self>) where Self: AsyncWrite + Sized,186     fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
187         where Self: AsyncWrite + Sized,
188     {
189         split::split(self)
190     }
191 }
192 
193 impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool194     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
195         (**self).prepare_uninitialized_buffer(buf)
196     }
197 }
198 
199 impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool200     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
201         (**self).prepare_uninitialized_buffer(buf)
202     }
203 }
204 
205 impl<'a> AsyncRead for &'a [u8] {
prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool206     unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool {
207         false
208     }
209 }
210 
211 /// A trait for writable objects which operated in an asynchronous and
212 /// futures-aware fashion.
213 ///
214 /// This trait inherits from `io::Write` and indicates that an I/O object is
215 /// **nonblocking**, meaning that it will return an error instead of blocking
216 /// when bytes cannot currently be written, but hasn't closed. Specifically
217 /// this means that the `write` function for types that implement this trait
218 /// can have a few return values:
219 ///
220 /// * `Ok(n)` means that `n` bytes of data was immediately written .
221 /// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was
222 ///   written from the buffer provided. The I/O object is not currently
223 ///   writable but may become writable in the future. Most importantly, **the
224 ///   current future's task is scheduled to get unparked when the object is
225 ///   readable**. This means that like `Future::poll` you'll receive a
226 ///   notification when the I/O object is writable again.
227 /// * `Err(e)` for other errors are standard I/O errors coming from the
228 ///   underlying object.
229 ///
230 /// This trait importantly means that the `write` method only works in the
231 /// context of a future's task. The object may panic if used outside of a task.
232 pub trait AsyncWrite: std_io::Write {
233     /// Initiates or attempts to shut down this writer, returning success when
234     /// the I/O connection has completely shut down.
235     ///
236     /// This method is intended to be used for asynchronous shutdown of I/O
237     /// connections. For example this is suitable for implementing shutdown of a
238     /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
239     /// Protocols sometimes need to flush out final pieces of data or otherwise
240     /// perform a graceful shutdown handshake, reading/writing more data as
241     /// appropriate. This method is the hook for such protocols to implement the
242     /// graceful shutdown logic.
243     ///
244     /// This `shutdown` method is required by implementors of the
245     /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
246     /// through to the wrapped type, and base types will typically implement
247     /// shutdown logic here or just return `Ok(().into())`. Note that if you're
248     /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
249     /// transitively the entire stream has been shut down. After your wrapper's
250     /// shutdown logic has been executed you should shut down the underlying
251     /// stream.
252     ///
253     /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
254     /// method returns `Ready` it implies that a flush successfully happened
255     /// before the shutdown happened. That is, callers don't need to call
256     /// `flush` before calling `shutdown`. They can rely that by calling
257     /// `shutdown` any pending buffered data will be written out.
258     ///
259     /// # Return value
260     ///
261     /// This function returns a `Poll<(), io::Error>` classified as such:
262     ///
263     /// * `Ok(Async::Ready(()))` - indicates that the connection was
264     ///   successfully shut down and is now safe to deallocate/drop/close
265     ///   resources associated with it. This method means that the current task
266     ///   will no longer receive any notifications due to this method and the
267     ///   I/O object itself is likely no longer usable.
268     ///
269     /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could
270     ///   not complete just yet. This may mean that more I/O needs to happen to
271     ///   continue this shutdown operation. The current task is scheduled to
272     ///   receive a notification when it's otherwise ready to continue the
273     ///   shutdown operation. When woken up this method should be called again.
274     ///
275     /// * `Err(e)` - indicates a fatal error has happened with shutdown,
276     ///   indicating that the shutdown operation did not complete successfully.
277     ///   This typically means that the I/O object is no longer usable.
278     ///
279     /// # Errors
280     ///
281     /// This function can return normal I/O errors through `Err`, described
282     /// above. Additionally this method may also render the underlying
283     /// `Write::write` method no longer usable (e.g. will return errors in the
284     /// future). It's recommended that once `shutdown` is called the
285     /// `write` method is no longer called.
286     ///
287     /// # Panics
288     ///
289     /// This function will panic if not called within the context of a future's
290     /// task.
shutdown(&mut self) -> Poll<(), std_io::Error>291     fn shutdown(&mut self) -> Poll<(), std_io::Error>;
292 
293     /// Write a `Buf` into this value, returning how many bytes were written.
294     ///
295     /// Note that this method will advance the `buf` provided automatically by
296     /// the number of bytes written.
write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> where Self: Sized,297     fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error>
298         where Self: Sized,
299     {
300         if !buf.has_remaining() {
301             return Ok(Async::Ready(0));
302         }
303 
304         let n = try_nb!(self.write(buf.bytes()));
305         buf.advance(n);
306         Ok(Async::Ready(n))
307     }
308 }
309 
310 impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
shutdown(&mut self) -> Poll<(), std_io::Error>311     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
312         (**self).shutdown()
313     }
314 }
315 impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
shutdown(&mut self) -> Poll<(), std_io::Error>316     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
317         (**self).shutdown()
318     }
319 }
320 
321 impl AsyncRead for std_io::Repeat {
prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool322     unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
323         false
324     }
325 }
326 
327 impl AsyncWrite for std_io::Sink {
shutdown(&mut self) -> Poll<(), std_io::Error>328     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
329         Ok(().into())
330     }
331 }
332 
333 // TODO: Implement `prepare_uninitialized_buffer` for `io::Take`.
334 // This is blocked on rust-lang/rust#27269
335 impl<T: AsyncRead> AsyncRead for std_io::Take<T> {
336 }
337 
338 // TODO: Implement `prepare_uninitialized_buffer` when upstream exposes inner
339 // parts
340 impl<T, U> AsyncRead for std_io::Chain<T, U>
341     where T: AsyncRead,
342           U: AsyncRead,
343 {
344 }
345 
346 impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> {
shutdown(&mut self) -> Poll<(), std_io::Error>347     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
348         try_nb!(self.flush());
349         self.get_mut().shutdown()
350     }
351 }
352 
353 impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> {
prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool354     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
355         self.get_ref().prepare_uninitialized_buffer(buf)
356     }
357 }
358 
359 impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> {
360 }
361 
362 impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> {
shutdown(&mut self) -> Poll<(), std_io::Error>363     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
364         Ok(().into())
365     }
366 }
367 
368 impl AsyncWrite for std_io::Cursor<Vec<u8>> {
shutdown(&mut self) -> Poll<(), std_io::Error>369     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
370         Ok(().into())
371     }
372 }
373 
374 impl AsyncWrite for std_io::Cursor<Box<[u8]>> {
shutdown(&mut self) -> Poll<(), std_io::Error>375     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
376         Ok(().into())
377     }
378 }
379 
_assert_objects()380 fn _assert_objects() {
381     fn _assert<T>() {}
382     _assert::<Box<AsyncRead>>();
383     _assert::<Box<AsyncWrite>>();
384 }
385