1 use bytes::Buf; 2 use futures::{Async, Poll}; 3 use std::io as std_io; 4 5 use AsyncRead; 6 7 /// Writes bytes asynchronously. 8 /// 9 /// The trait inherits from `std::io::Write` and indicates that an I/O object is 10 /// **nonblocking**. All non-blocking I/O objects must return an error when 11 /// bytes cannot be written instead of blocking the current thread. 12 /// 13 /// Specifically, this means that the `poll_write` function will return one of 14 /// the following: 15 /// 16 /// * `Ok(Async::Ready(n))` means that `n` bytes of data was immediately 17 /// written. 18 /// 19 /// * `Ok(Async::NotReady)` means that no data was written from the buffer 20 /// provided. The I/O object is not currently writable but may become writable 21 /// in the future. Most importantly, **the current future's task is scheduled 22 /// to get unparked when the object is writable**. This means that like 23 /// `Future::poll` you'll receive a notification when the I/O object is 24 /// writable again. 25 /// 26 /// * `Err(e)` for other errors are standard I/O errors coming from the 27 /// underlying object. 28 /// 29 /// This trait importantly means that the `write` method only works in the 30 /// context of a future's task. The object may panic if used outside of a task. 31 /// 32 /// Note that this trait also represents that the `Write::flush` method works 33 /// very similarly to the `write` method, notably that `Ok(())` means that the 34 /// writer has successfully been flushed, a "would block" error means that the 35 /// current task is ready to receive a notification when flushing can make more 36 /// progress, and otherwise normal errors can happen as well. 37 pub trait AsyncWrite: std_io::Write { 38 /// Attempt to write bytes from `buf` into the object. 39 /// 40 /// On success, returns `Ok(Async::Ready(num_bytes_written))`. 41 /// 42 /// If the object is not ready for writing, the method returns 43 /// `Ok(Async::NotReady)` and arranges for the current task (via 44 /// `cx.waker()`) to receive a notification when the object becomes 45 /// readable or is closed. poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error>46 fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> { 47 match self.write(buf) { 48 Ok(t) => Ok(Async::Ready(t)), 49 Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady), 50 Err(e) => return Err(e.into()), 51 } 52 } 53 54 /// Attempt to flush the object, ensuring that any buffered data reach 55 /// their destination. 56 /// 57 /// On success, returns `Ok(Async::Ready(()))`. 58 /// 59 /// If flushing cannot immediately complete, this method returns 60 /// `Ok(Async::NotReady)` and arranges for the current task (via 61 /// `cx.waker()`) to receive a notification when the object can make 62 /// progress towards flushing. poll_flush(&mut self) -> Poll<(), std_io::Error>63 fn poll_flush(&mut self) -> Poll<(), std_io::Error> { 64 match self.flush() { 65 Ok(t) => Ok(Async::Ready(t)), 66 Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady), 67 Err(e) => return Err(e.into()), 68 } 69 } 70 71 /// Initiates or attempts to shut down this writer, returning success when 72 /// the I/O connection has completely shut down. 73 /// 74 /// This method is intended to be used for asynchronous shutdown of I/O 75 /// connections. For example this is suitable for implementing shutdown of a 76 /// TLS connection or calling `TcpStream::shutdown` on a proxied connection. 77 /// Protocols sometimes need to flush out final pieces of data or otherwise 78 /// perform a graceful shutdown handshake, reading/writing more data as 79 /// appropriate. This method is the hook for such protocols to implement the 80 /// graceful shutdown logic. 81 /// 82 /// This `shutdown` method is required by implementers of the 83 /// `AsyncWrite` trait. Wrappers typically just want to proxy this call 84 /// through to the wrapped type, and base types will typically implement 85 /// shutdown logic here or just return `Ok(().into())`. Note that if you're 86 /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that 87 /// transitively the entire stream has been shut down. After your wrapper's 88 /// shutdown logic has been executed you should shut down the underlying 89 /// stream. 90 /// 91 /// Invocation of a `shutdown` implies an invocation of `flush`. Once this 92 /// method returns `Ready` it implies that a flush successfully happened 93 /// before the shutdown happened. That is, callers don't need to call 94 /// `flush` before calling `shutdown`. They can rely that by calling 95 /// `shutdown` any pending buffered data will be written out. 96 /// 97 /// # Return value 98 /// 99 /// This function returns a `Poll<(), io::Error>` classified as such: 100 /// 101 /// * `Ok(Async::Ready(()))` - indicates that the connection was 102 /// successfully shut down and is now safe to deallocate/drop/close 103 /// resources associated with it. This method means that the current task 104 /// will no longer receive any notifications due to this method and the 105 /// I/O object itself is likely no longer usable. 106 /// 107 /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could 108 /// not complete just yet. This may mean that more I/O needs to happen to 109 /// continue this shutdown operation. The current task is scheduled to 110 /// receive a notification when it's otherwise ready to continue the 111 /// shutdown operation. When woken up this method should be called again. 112 /// 113 /// * `Err(e)` - indicates a fatal error has happened with shutdown, 114 /// indicating that the shutdown operation did not complete successfully. 115 /// This typically means that the I/O object is no longer usable. 116 /// 117 /// # Errors 118 /// 119 /// This function can return normal I/O errors through `Err`, described 120 /// above. Additionally this method may also render the underlying 121 /// `Write::write` method no longer usable (e.g. will return errors in the 122 /// future). It's recommended that once `shutdown` is called the 123 /// `write` method is no longer called. 124 /// 125 /// # Panics 126 /// 127 /// This function will panic if not called within the context of a future's 128 /// task. shutdown(&mut self) -> Poll<(), std_io::Error>129 fn shutdown(&mut self) -> Poll<(), std_io::Error>; 130 131 /// Write a `Buf` into this value, returning how many bytes were written. 132 /// 133 /// Note that this method will advance the `buf` provided automatically by 134 /// the number of bytes written. write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> where Self: Sized,135 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> 136 where 137 Self: Sized, 138 { 139 if !buf.has_remaining() { 140 return Ok(Async::Ready(0)); 141 } 142 143 let n = try_ready!(self.poll_write(buf.bytes())); 144 buf.advance(n); 145 Ok(Async::Ready(n)) 146 } 147 } 148 149 impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> { shutdown(&mut self) -> Poll<(), std_io::Error>150 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 151 (**self).shutdown() 152 } 153 } 154 impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T { shutdown(&mut self) -> Poll<(), std_io::Error>155 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 156 (**self).shutdown() 157 } 158 } 159 160 impl AsyncRead for std_io::Repeat { prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool161 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { 162 false 163 } 164 } 165 166 impl AsyncWrite for std_io::Sink { shutdown(&mut self) -> Poll<(), std_io::Error>167 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 168 Ok(().into()) 169 } 170 } 171 172 impl<T: AsyncRead> AsyncRead for std_io::Take<T> { prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool173 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { 174 self.get_ref().prepare_uninitialized_buffer(buf) 175 } 176 } 177 178 impl<T, U> AsyncRead for std_io::Chain<T, U> 179 where 180 T: AsyncRead, 181 U: AsyncRead, 182 { prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool183 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { 184 let (t, u) = self.get_ref(); 185 // We don't need to execute the second initializer if the first one 186 // already zeroed the buffer out. 187 t.prepare_uninitialized_buffer(buf) || u.prepare_uninitialized_buffer(buf) 188 } 189 } 190 191 impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> { shutdown(&mut self) -> Poll<(), std_io::Error>192 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 193 try_ready!(self.poll_flush()); 194 self.get_mut().shutdown() 195 } 196 } 197 198 impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> { prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool199 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { 200 self.get_ref().prepare_uninitialized_buffer(buf) 201 } 202 } 203 204 impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> {} 205 206 impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> { shutdown(&mut self) -> Poll<(), std_io::Error>207 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 208 Ok(().into()) 209 } 210 } 211 212 impl AsyncWrite for std_io::Cursor<Vec<u8>> { shutdown(&mut self) -> Poll<(), std_io::Error>213 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 214 Ok(().into()) 215 } 216 } 217 218 impl AsyncWrite for std_io::Cursor<Box<[u8]>> { shutdown(&mut self) -> Poll<(), std_io::Error>219 fn shutdown(&mut self) -> Poll<(), std_io::Error> { 220 Ok(().into()) 221 } 222 } 223