1 use bytes::Buf;
2 use futures::{Async, Poll};
3 use std::io as std_io;
4 
5 use AsyncRead;
6 
7 /// Writes bytes asynchronously.
8 ///
9 /// The trait inherits from `std::io::Write` and indicates that an I/O object is
10 /// **nonblocking**. All non-blocking I/O objects must return an error when
11 /// bytes cannot be written instead of blocking the current thread.
12 ///
13 /// Specifically, this means that the `poll_write` function will return one of
14 /// the following:
15 ///
16 /// * `Ok(Async::Ready(n))` means that `n` bytes of data was immediately
17 ///   written.
18 ///
19 /// * `Ok(Async::NotReady)` means that no data was written from the buffer
20 ///   provided. The I/O object is not currently writable but may become writable
21 ///   in the future. Most importantly, **the current future's task is scheduled
22 ///   to get unparked when the object is writable**. This means that like
23 ///   `Future::poll` you'll receive a notification when the I/O object is
24 ///   writable again.
25 ///
26 /// * `Err(e)` for other errors are standard I/O errors coming from the
27 ///   underlying object.
28 ///
29 /// This trait importantly means that the `write` method only works in the
30 /// context of a future's task. The object may panic if used outside of a task.
31 ///
32 /// Note that this trait also represents that the  `Write::flush` method works
33 /// very similarly to the `write` method, notably that `Ok(())` means that the
34 /// writer has successfully been flushed, a "would block" error means that the
35 /// current task is ready to receive a notification when flushing can make more
36 /// progress, and otherwise normal errors can happen as well.
37 pub trait AsyncWrite: std_io::Write {
38     /// Attempt to write bytes from `buf` into the object.
39     ///
40     /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
41     ///
42     /// If the object is not ready for writing, the method returns
43     /// `Ok(Async::NotReady)` and arranges for the current task (via
44     /// `cx.waker()`) to receive a notification when the object becomes
45     /// readable or is closed.
poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error>46     fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> {
47         match self.write(buf) {
48             Ok(t) => Ok(Async::Ready(t)),
49             Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
50             Err(e) => return Err(e.into()),
51         }
52     }
53 
54     /// Attempt to flush the object, ensuring that any buffered data reach
55     /// their destination.
56     ///
57     /// On success, returns `Ok(Async::Ready(()))`.
58     ///
59     /// If flushing cannot immediately complete, this method returns
60     /// `Ok(Async::NotReady)` and arranges for the current task (via
61     /// `cx.waker()`) to receive a notification when the object can make
62     /// progress towards flushing.
poll_flush(&mut self) -> Poll<(), std_io::Error>63     fn poll_flush(&mut self) -> Poll<(), std_io::Error> {
64         match self.flush() {
65             Ok(t) => Ok(Async::Ready(t)),
66             Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
67             Err(e) => return Err(e.into()),
68         }
69     }
70 
71     /// Initiates or attempts to shut down this writer, returning success when
72     /// the I/O connection has completely shut down.
73     ///
74     /// This method is intended to be used for asynchronous shutdown of I/O
75     /// connections. For example this is suitable for implementing shutdown of a
76     /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
77     /// Protocols sometimes need to flush out final pieces of data or otherwise
78     /// perform a graceful shutdown handshake, reading/writing more data as
79     /// appropriate. This method is the hook for such protocols to implement the
80     /// graceful shutdown logic.
81     ///
82     /// This `shutdown` method is required by implementers of the
83     /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
84     /// through to the wrapped type, and base types will typically implement
85     /// shutdown logic here or just return `Ok(().into())`. Note that if you're
86     /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
87     /// transitively the entire stream has been shut down. After your wrapper's
88     /// shutdown logic has been executed you should shut down the underlying
89     /// stream.
90     ///
91     /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
92     /// method returns `Ready` it implies that a flush successfully happened
93     /// before the shutdown happened. That is, callers don't need to call
94     /// `flush` before calling `shutdown`. They can rely that by calling
95     /// `shutdown` any pending buffered data will be written out.
96     ///
97     /// # Return value
98     ///
99     /// This function returns a `Poll<(), io::Error>` classified as such:
100     ///
101     /// * `Ok(Async::Ready(()))` - indicates that the connection was
102     ///   successfully shut down and is now safe to deallocate/drop/close
103     ///   resources associated with it. This method means that the current task
104     ///   will no longer receive any notifications due to this method and the
105     ///   I/O object itself is likely no longer usable.
106     ///
107     /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could
108     ///   not complete just yet. This may mean that more I/O needs to happen to
109     ///   continue this shutdown operation. The current task is scheduled to
110     ///   receive a notification when it's otherwise ready to continue the
111     ///   shutdown operation. When woken up this method should be called again.
112     ///
113     /// * `Err(e)` - indicates a fatal error has happened with shutdown,
114     ///   indicating that the shutdown operation did not complete successfully.
115     ///   This typically means that the I/O object is no longer usable.
116     ///
117     /// # Errors
118     ///
119     /// This function can return normal I/O errors through `Err`, described
120     /// above. Additionally this method may also render the underlying
121     /// `Write::write` method no longer usable (e.g. will return errors in the
122     /// future). It's recommended that once `shutdown` is called the
123     /// `write` method is no longer called.
124     ///
125     /// # Panics
126     ///
127     /// This function will panic if not called within the context of a future's
128     /// task.
shutdown(&mut self) -> Poll<(), std_io::Error>129     fn shutdown(&mut self) -> Poll<(), std_io::Error>;
130 
131     /// Write a `Buf` into this value, returning how many bytes were written.
132     ///
133     /// Note that this method will advance the `buf` provided automatically by
134     /// the number of bytes written.
write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> where Self: Sized,135     fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error>
136     where
137         Self: Sized,
138     {
139         if !buf.has_remaining() {
140             return Ok(Async::Ready(0));
141         }
142 
143         let n = try_ready!(self.poll_write(buf.bytes()));
144         buf.advance(n);
145         Ok(Async::Ready(n))
146     }
147 }
148 
149 impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
shutdown(&mut self) -> Poll<(), std_io::Error>150     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
151         (**self).shutdown()
152     }
153 }
154 impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
shutdown(&mut self) -> Poll<(), std_io::Error>155     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
156         (**self).shutdown()
157     }
158 }
159 
160 impl AsyncRead for std_io::Repeat {
prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool161     unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
162         false
163     }
164 }
165 
166 impl AsyncWrite for std_io::Sink {
shutdown(&mut self) -> Poll<(), std_io::Error>167     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
168         Ok(().into())
169     }
170 }
171 
172 impl<T: AsyncRead> AsyncRead for std_io::Take<T> {
prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool173     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
174         self.get_ref().prepare_uninitialized_buffer(buf)
175     }
176 }
177 
178 impl<T, U> AsyncRead for std_io::Chain<T, U>
179 where
180     T: AsyncRead,
181     U: AsyncRead,
182 {
prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool183     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
184         let (t, u) = self.get_ref();
185         // We don't need to execute the second initializer if the first one
186         // already zeroed the buffer out.
187         t.prepare_uninitialized_buffer(buf) || u.prepare_uninitialized_buffer(buf)
188     }
189 }
190 
191 impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> {
shutdown(&mut self) -> Poll<(), std_io::Error>192     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
193         try_ready!(self.poll_flush());
194         self.get_mut().shutdown()
195     }
196 }
197 
198 impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> {
prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool199     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
200         self.get_ref().prepare_uninitialized_buffer(buf)
201     }
202 }
203 
204 impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> {}
205 
206 impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> {
shutdown(&mut self) -> Poll<(), std_io::Error>207     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
208         Ok(().into())
209     }
210 }
211 
212 impl AsyncWrite for std_io::Cursor<Vec<u8>> {
shutdown(&mut self) -> Poll<(), std_io::Error>213     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
214         Ok(().into())
215     }
216 }
217 
218 impl AsyncWrite for std_io::Cursor<Box<[u8]>> {
shutdown(&mut self) -> Poll<(), std_io::Error>219     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
220         Ok(().into())
221     }
222 }
223