1 use std::io as std_io;
2 use bytes::Buf;
3 use futures::{Async, Poll};
4 
5 use AsyncRead;
6 
7 /// Writes bytes asynchronously.
8 ///
9 /// The trait inherits from `std::io::Write` and indicates that an I/O object is
10 /// **nonblocking**. All non-blocking I/O objects must return an error when
11 /// bytes cannot be written instead of blocking the current thread.
12 ///
13 /// Specifically, this means that the `write` function will return one of the
14 /// following:
15 ///
16 /// * `Ok(n)` means that `n` bytes of data was immediately written .
17 ///
18 /// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was
19 ///   written from the buffer provided. The I/O object is not currently
20 ///   writable but may become writable in the future. Most importantly, **the
21 ///   current future's task is scheduled to get unparked when the object is
22 ///   readable**. This means that like `Future::poll` you'll receive a
23 ///   notification when the I/O object is writable again.
24 ///
25 /// * `Err(e)` for other errors are standard I/O errors coming from the
26 ///   underlying object.
27 ///
28 /// This trait importantly means that the `write` method only works in the
29 /// context of a future's task. The object may panic if used outside of a task.
30 ///
31 /// Note that this trait also represents that the  `Write::flush` method works
32 /// very similarly to the `write` method, notably that `Ok(())` means that the
33 /// writer has successfully been flushed, a "would block" error means that the
34 /// current task is ready to receive a notification when flushing can make more
35 /// progress, and otherwise normal errors can happen as well.
36 pub trait AsyncWrite: std_io::Write {
37     /// Attempt to write bytes from `buf` into the object.
38     ///
39     /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
40     ///
41     /// If the object is not ready for writing, the method returns
42     /// `Ok(Async::Pending)` and arranges for the current task (via
43     /// `cx.waker()`) to receive a notification when the object becomes
44     /// readable or is closed.
poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error>45     fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> {
46         match self.write(buf) {
47             Ok(t) => Ok(Async::Ready(t)),
48             Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => {
49                 return Ok(Async::NotReady)
50             }
51             Err(e) => return Err(e.into()),
52         }
53     }
54 
55     /// Attempt to flush the object, ensuring that any buffered data reach
56     /// their destination.
57     ///
58     /// On success, returns `Ok(Async::Ready(()))`.
59     ///
60     /// If flushing cannot immediately complete, this method returns
61     /// `Ok(Async::Pending)` and arranges for the current task (via
62     /// `cx.waker()`) to receive a notification when the object can make
63     /// progress towards flushing.
poll_flush(&mut self) -> Poll<(), std_io::Error>64     fn poll_flush(&mut self) -> Poll<(), std_io::Error> {
65         match self.flush() {
66             Ok(t) => Ok(Async::Ready(t)),
67             Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => {
68                 return Ok(Async::NotReady)
69             }
70             Err(e) => return Err(e.into()),
71         }
72     }
73 
74     /// Initiates or attempts to shut down this writer, returning success when
75     /// the I/O connection has completely shut down.
76     ///
77     /// This method is intended to be used for asynchronous shutdown of I/O
78     /// connections. For example this is suitable for implementing shutdown of a
79     /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
80     /// Protocols sometimes need to flush out final pieces of data or otherwise
81     /// perform a graceful shutdown handshake, reading/writing more data as
82     /// appropriate. This method is the hook for such protocols to implement the
83     /// graceful shutdown logic.
84     ///
85     /// This `shutdown` method is required by implementors of the
86     /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
87     /// through to the wrapped type, and base types will typically implement
88     /// shutdown logic here or just return `Ok(().into())`. Note that if you're
89     /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
90     /// transitively the entire stream has been shut down. After your wrapper's
91     /// shutdown logic has been executed you should shut down the underlying
92     /// stream.
93     ///
94     /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
95     /// method returns `Ready` it implies that a flush successfully happened
96     /// before the shutdown happened. That is, callers don't need to call
97     /// `flush` before calling `shutdown`. They can rely that by calling
98     /// `shutdown` any pending buffered data will be written out.
99     ///
100     /// # Return value
101     ///
102     /// This function returns a `Poll<(), io::Error>` classified as such:
103     ///
104     /// * `Ok(Async::Ready(()))` - indicates that the connection was
105     ///   successfully shut down and is now safe to deallocate/drop/close
106     ///   resources associated with it. This method means that the current task
107     ///   will no longer receive any notifications due to this method and the
108     ///   I/O object itself is likely no longer usable.
109     ///
110     /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could
111     ///   not complete just yet. This may mean that more I/O needs to happen to
112     ///   continue this shutdown operation. The current task is scheduled to
113     ///   receive a notification when it's otherwise ready to continue the
114     ///   shutdown operation. When woken up this method should be called again.
115     ///
116     /// * `Err(e)` - indicates a fatal error has happened with shutdown,
117     ///   indicating that the shutdown operation did not complete successfully.
118     ///   This typically means that the I/O object is no longer usable.
119     ///
120     /// # Errors
121     ///
122     /// This function can return normal I/O errors through `Err`, described
123     /// above. Additionally this method may also render the underlying
124     /// `Write::write` method no longer usable (e.g. will return errors in the
125     /// future). It's recommended that once `shutdown` is called the
126     /// `write` method is no longer called.
127     ///
128     /// # Panics
129     ///
130     /// This function will panic if not called within the context of a future's
131     /// task.
shutdown(&mut self) -> Poll<(), std_io::Error>132     fn shutdown(&mut self) -> Poll<(), std_io::Error>;
133 
134     /// Write a `Buf` into this value, returning how many bytes were written.
135     ///
136     /// Note that this method will advance the `buf` provided automatically by
137     /// the number of bytes written.
write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> where Self: Sized,138     fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error>
139         where Self: Sized,
140     {
141         if !buf.has_remaining() {
142             return Ok(Async::Ready(0));
143         }
144 
145         let n = try_ready!(self.poll_write(buf.bytes()));
146         buf.advance(n);
147         Ok(Async::Ready(n))
148     }
149 }
150 
151 impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
shutdown(&mut self) -> Poll<(), std_io::Error>152     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
153         (**self).shutdown()
154     }
155 }
156 impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
shutdown(&mut self) -> Poll<(), std_io::Error>157     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
158         (**self).shutdown()
159     }
160 }
161 
162 impl AsyncRead for std_io::Repeat {
prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool163     unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
164         false
165     }
166 }
167 
168 impl AsyncWrite for std_io::Sink {
shutdown(&mut self) -> Poll<(), std_io::Error>169     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
170         Ok(().into())
171     }
172 }
173 
174 // TODO: Implement `prepare_uninitialized_buffer` for `io::Take`.
175 // This is blocked on rust-lang/rust#27269
176 impl<T: AsyncRead> AsyncRead for std_io::Take<T> {
177 }
178 
179 // TODO: Implement `prepare_uninitialized_buffer` when upstream exposes inner
180 // parts
181 impl<T, U> AsyncRead for std_io::Chain<T, U>
182     where T: AsyncRead,
183           U: AsyncRead,
184 {
185 }
186 
187 impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> {
shutdown(&mut self) -> Poll<(), std_io::Error>188     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
189         try_ready!(self.poll_flush());
190         self.get_mut().shutdown()
191     }
192 }
193 
194 impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> {
prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool195     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
196         self.get_ref().prepare_uninitialized_buffer(buf)
197     }
198 }
199 
200 impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> {
201 }
202 
203 impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> {
shutdown(&mut self) -> Poll<(), std_io::Error>204     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
205         Ok(().into())
206     }
207 }
208 
209 impl AsyncWrite for std_io::Cursor<Vec<u8>> {
shutdown(&mut self) -> Poll<(), std_io::Error>210     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
211         Ok(().into())
212     }
213 }
214 
215 impl AsyncWrite for std_io::Cursor<Box<[u8]>> {
shutdown(&mut self) -> Poll<(), std_io::Error>216     fn shutdown(&mut self) -> Poll<(), std_io::Error> {
217         Ok(().into())
218     }
219 }
220