1 use std::io as std_io; 2 use bytes::Buf; 3 use futures::{Async, Poll}; 4 5 use AsyncRead; 6 7 /// Writes bytes asynchronously. 8 /// 9 /// The trait inherits from `std::io::Write` and indicates that an I/O object is 10 /// **nonblocking**. All non-blocking I/O objects must return an error when 11 /// bytes cannot be written instead of blocking the current thread. 12 /// 13 /// Specifically, this means that the `write` function will return one of the 14 /// following: 15 /// 16 /// * `Ok(n)` means that `n` bytes of data was immediately written . 17 /// 18 /// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was 19 /// written from the buffer provided. The I/O object is not currently 20 /// writable but may become writable in the future. Most importantly, **the 21 /// current future's task is scheduled to get unparked when the object is 22 /// readable**. This means that like `Future::poll` you'll receive a 23 /// notification when the I/O object is writable again. 24 /// 25 /// * `Err(e)` for other errors are standard I/O errors coming from the 26 /// underlying object. 27 /// 28 /// This trait importantly means that the `write` method only works in the 29 /// context of a future's task. The object may panic if used outside of a task. 30 /// 31 /// Note that this trait also represents that the `Write::flush` method works 32 /// very similarly to the `write` method, notably that `Ok(())` means that the 33 /// writer has successfully been flushed, a "would block" error means that the 34 /// current task is ready to receive a notification when flushing can make more 35 /// progress, and otherwise normal errors can happen as well. 36 pub trait AsyncWrite: std_io::Write { 37 /// Attempt to write bytes from `buf` into the object. 38 /// 39 /// On success, returns `Ok(Async::Ready(num_bytes_written))`. 40 /// 41 /// If the object is not ready for writing, the method returns 42 /// `Ok(Async::Pending)` and arranges for the current task (via 43 /// `cx.waker()`) to receive a notification when the object becomes 44 /// readable or is closed. poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error>45 fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> { 46 match self.write(buf) { 47 Ok(t) => Ok(Async::Ready(t)), 48 Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => { 49 return Ok(Async::NotReady) 50 } 51 Err(e) => return Err(e.into()), 52 } 53 } 54 55 /// Attempt to flush the object, ensuring that any buffered data reach 56 /// their destination. 57 /// 58 /// On success, returns `Ok(Async::Ready(()))`. 59 /// 60 /// If flushing cannot immediately complete, this method returns 61 /// `Ok(Async::Pending)` and arranges for the current task (via 62 /// `cx.waker()`) to receive a notification when the object can make 63 /// progress towards flushing. poll_flush(&mut self) -> Poll<(), std_io::Error>64 fn poll_flush(&mut self) -> Poll<(), std_io::Error> { 65 match self.flush() { 66 Ok(t) => Ok(Async::Ready(t)), 67 Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => { 68 return Ok(Async::NotReady) 69 } 70 Err(e) => return Err(e.into()), 71 } 72 } 73 74 /// Initiates or attempts to shut down this writer, returning success when 75 /// the I/O connection has completely shut down. 76 /// 77 /// This method is intended to be used for asynchronous shutdown of I/O 78 /// connections. For example this is suitable for implementing shutdown of a 79 /// TLS connection or calling `TcpStream::shutdown` on a proxied connection. 80 /// Protocols sometimes need to flush out final pieces of data or otherwise 81 /// perform a graceful shutdown handshake, reading/writing more data as 82 /// appropriate. This method is the hook for such protocols to implement the 83 /// graceful shutdown logic. 84 /// 85 /// This `shutdown` method is required by implementors of the 86 /// `AsyncWrite` trait. Wrappers typically just want to proxy this call 87 /// through to the wrapped type, and base types will typically implement 88 /// shutdown logic here or just return `Ok(().into())`. Note that if you're 89 /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that 90 /// transitively the entire stream has been shut down. After your wrapper's 91 /// shutdown logic has been executed you should shut down the underlying 92 /// stream. 93 /// 94 /// Invocation of a `shutdown` implies an invocation of `flush`. Once this 95 /// method returns `Ready` it implies that a flush successfully happened 96 /// before the shutdown happened. That is, callers don't need to call 97 /// `flush` before calling `shutdown`. They can rely that by calling 98 /// `shutdown` any pending buffered data will be written out. 99 /// 100 /// # Return value 101 /// 102 /// This function returns a `Poll<(), io::Error>` classified as such: 103 /// 104 /// * `Ok(Async::Ready(()))` - indicates that the connection was 105 /// successfully shut down and is now safe to deallocate/drop/close 106 /// resources associated with it. This method means that the current task 107 /// will no longer receive any notifications due to this method and the 108 /// I/O object itself is likely no longer usable. 109 /// 110 /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could 111 /// not complete just yet. This may mean that more I/O needs to happen to 112 /// continue this shutdown operation. The current task is scheduled to 113 /// receive a notification when it's otherwise ready to continue the 114 /// shutdown operation. When woken up this method should be called again. 115 /// 116 /// * `Err(e)` - indicates a fatal error has happened with shutdown, 117 /// indicating that the shutdown operation did not complete successfully. 118 /// This typically means that the I/O object is no longer usable. 119 /// 120 /// # Errors 121 /// 122 /// This function can return normal I/O errors through `Err`, described 123 /// above. Additionally this method may also render the underlying 124 /// `Write::write` method no longer usable (e.g. will return errors in the 125 /// future). It's recommended that once `shutdown` is called the 126 /// `write` method is no longer called. 127 /// 128 /// # Panics 129 /// 130 /// This function will panic if not called within the context of a future's 131 /// task. shutdown(&mut self) -> Poll<(), std_io::Error>132 fn shutdown(&mut self) -> Poll<(), std_io::Error>; 133 134 /// Write a `Buf` into this value, returning how many bytes were written. 135 /// 136 /// Note that this method will advance the `buf` provided automatically by 137 /// the number of bytes written. write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> where Self: Sized,138 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> 139 where Self: Sized, 140 { 141 if !buf.has_remaining() { 142 return Ok(Async::Ready(0)); 143 } 144 145 let n = try_ready!(self.poll_write(buf.bytes())); 146 buf.advance(n); 147 Ok(Async::Ready(n)) 148 } 149 } 150 151 impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> { shutdown(&mut self) -> Poll<(), std_io::Error>152 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 153 (**self).shutdown() 154 } 155 } 156 impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T { shutdown(&mut self) -> Poll<(), std_io::Error>157 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 158 (**self).shutdown() 159 } 160 } 161 162 impl AsyncRead for std_io::Repeat { prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool163 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { 164 false 165 } 166 } 167 168 impl AsyncWrite for std_io::Sink { shutdown(&mut self) -> Poll<(), std_io::Error>169 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 170 Ok(().into()) 171 } 172 } 173 174 // TODO: Implement `prepare_uninitialized_buffer` for `io::Take`. 175 // This is blocked on rust-lang/rust#27269 176 impl<T: AsyncRead> AsyncRead for std_io::Take<T> { 177 } 178 179 // TODO: Implement `prepare_uninitialized_buffer` when upstream exposes inner 180 // parts 181 impl<T, U> AsyncRead for std_io::Chain<T, U> 182 where T: AsyncRead, 183 U: AsyncRead, 184 { 185 } 186 187 impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> { shutdown(&mut self) -> Poll<(), std_io::Error>188 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 189 try_ready!(self.poll_flush()); 190 self.get_mut().shutdown() 191 } 192 } 193 194 impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> { prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool195 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { 196 self.get_ref().prepare_uninitialized_buffer(buf) 197 } 198 } 199 200 impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> { 201 } 202 203 impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> { shutdown(&mut self) -> Poll<(), std_io::Error>204 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 205 Ok(().into()) 206 } 207 } 208 209 impl AsyncWrite for std_io::Cursor<Vec<u8>> { shutdown(&mut self) -> Poll<(), std_io::Error>210 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 211 Ok(().into()) 212 } 213 } 214 215 impl AsyncWrite for std_io::Cursor<Box<[u8]>> { shutdown(&mut self) -> Poll<(), std_io::Error>216 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 217 Ok(().into()) 218 } 219 } 220