1 use bytes::Buf; 2 use std::io; 3 use std::ops::DerefMut; 4 use std::pin::Pin; 5 use std::task::{Context, Poll}; 6 7 /// Writes bytes asynchronously. 8 /// 9 /// The trait inherits from [`std::io::Write`] and indicates that an I/O object is 10 /// **nonblocking**. All non-blocking I/O objects must return an error when 11 /// bytes cannot be written instead of blocking the current thread. 12 /// 13 /// Specifically, this means that the [`poll_write`] function will return one of 14 /// the following: 15 /// 16 /// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately 17 /// written. 18 /// 19 /// * `Poll::Pending` means that no data was written from the buffer 20 /// provided. The I/O object is not currently writable but may become writable 21 /// in the future. Most importantly, **the current future's task is scheduled 22 /// to get unparked when the object is writable**. This means that like 23 /// `Future::poll` you'll receive a notification when the I/O object is 24 /// writable again. 25 /// 26 /// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the 27 /// underlying object. 28 /// 29 /// This trait importantly means that the [`write`][stdwrite] method only works in 30 /// the context of a future's task. The object may panic if used outside of a task. 31 /// 32 /// Note that this trait also represents that the [`Write::flush`][stdflush] method 33 /// works very similarly to the `write` method, notably that `Ok(())` means that the 34 /// writer has successfully been flushed, a "would block" error means that the 35 /// current task is ready to receive a notification when flushing can make more 36 /// progress, and otherwise normal errors can happen as well. 37 /// 38 /// Utilities for working with `AsyncWrite` values are provided by 39 /// [`AsyncWriteExt`]. 40 /// 41 /// [`std::io::Write`]: std::io::Write 42 /// [`poll_write`]: AsyncWrite::poll_write() 43 /// [stdwrite]: std::io::Write::write() 44 /// [stdflush]: std::io::Write::flush() 45 /// [`AsyncWriteExt`]: crate::io::AsyncWriteExt 46 pub trait AsyncWrite { 47 /// Attempt to write bytes from `buf` into the object. 48 /// 49 /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. 50 /// 51 /// If the object is not ready for writing, the method returns 52 /// `Poll::Pending` and arranges for the current task (via 53 /// `cx.waker()`) to receive a notification when the object becomes 54 /// writable or is closed. poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>>55 fn poll_write( 56 self: Pin<&mut Self>, 57 cx: &mut Context<'_>, 58 buf: &[u8], 59 ) -> Poll<Result<usize, io::Error>>; 60 61 /// Attempts to flush the object, ensuring that any buffered data reach 62 /// their destination. 63 /// 64 /// On success, returns `Poll::Ready(Ok(()))`. 65 /// 66 /// If flushing cannot immediately complete, this method returns 67 /// `Poll::Pending` and arranges for the current task (via 68 /// `cx.waker()`) to receive a notification when the object can make 69 /// progress towards flushing. poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>70 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>; 71 72 /// Initiates or attempts to shut down this writer, returning success when 73 /// the I/O connection has completely shut down. 74 /// 75 /// This method is intended to be used for asynchronous shutdown of I/O 76 /// connections. For example this is suitable for implementing shutdown of a 77 /// TLS connection or calling `TcpStream::shutdown` on a proxied connection. 78 /// Protocols sometimes need to flush out final pieces of data or otherwise 79 /// perform a graceful shutdown handshake, reading/writing more data as 80 /// appropriate. This method is the hook for such protocols to implement the 81 /// graceful shutdown logic. 82 /// 83 /// This `shutdown` method is required by implementers of the 84 /// `AsyncWrite` trait. Wrappers typically just want to proxy this call 85 /// through to the wrapped type, and base types will typically implement 86 /// shutdown logic here or just return `Ok(().into())`. Note that if you're 87 /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that 88 /// transitively the entire stream has been shut down. After your wrapper's 89 /// shutdown logic has been executed you should shut down the underlying 90 /// stream. 91 /// 92 /// Invocation of a `shutdown` implies an invocation of `flush`. Once this 93 /// method returns `Ready` it implies that a flush successfully happened 94 /// before the shutdown happened. That is, callers don't need to call 95 /// `flush` before calling `shutdown`. They can rely that by calling 96 /// `shutdown` any pending buffered data will be written out. 97 /// 98 /// # Return value 99 /// 100 /// This function returns a `Poll<io::Result<()>>` classified as such: 101 /// 102 /// * `Poll::Ready(Ok(()))` - indicates that the connection was 103 /// successfully shut down and is now safe to deallocate/drop/close 104 /// resources associated with it. This method means that the current task 105 /// will no longer receive any notifications due to this method and the 106 /// I/O object itself is likely no longer usable. 107 /// 108 /// * `Poll::Pending` - indicates that shutdown is initiated but could 109 /// not complete just yet. This may mean that more I/O needs to happen to 110 /// continue this shutdown operation. The current task is scheduled to 111 /// receive a notification when it's otherwise ready to continue the 112 /// shutdown operation. When woken up this method should be called again. 113 /// 114 /// * `Poll::Ready(Err(e))` - indicates a fatal error has happened with shutdown, 115 /// indicating that the shutdown operation did not complete successfully. 116 /// This typically means that the I/O object is no longer usable. 117 /// 118 /// # Errors 119 /// 120 /// This function can return normal I/O errors through `Err`, described 121 /// above. Additionally this method may also render the underlying 122 /// `Write::write` method no longer usable (e.g. will return errors in the 123 /// future). It's recommended that once `shutdown` is called the 124 /// `write` method is no longer called. 125 /// 126 /// # Panics 127 /// 128 /// This function will panic if not called within the context of a future's 129 /// task. poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>130 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>; 131 132 /// Writes a `Buf` into this value, returning how many bytes were written. 133 /// 134 /// Note that this method will advance the `buf` provided automatically by 135 /// the number of bytes written. poll_write_buf<B: Buf>( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<Result<usize, io::Error>> where Self: Sized,136 fn poll_write_buf<B: Buf>( 137 self: Pin<&mut Self>, 138 cx: &mut Context<'_>, 139 buf: &mut B, 140 ) -> Poll<Result<usize, io::Error>> 141 where 142 Self: Sized, 143 { 144 if !buf.has_remaining() { 145 return Poll::Ready(Ok(0)); 146 } 147 148 let n = ready!(self.poll_write(cx, buf.bytes()))?; 149 buf.advance(n); 150 Poll::Ready(Ok(n)) 151 } 152 } 153 154 macro_rules! deref_async_write { 155 () => { 156 fn poll_write( 157 mut self: Pin<&mut Self>, 158 cx: &mut Context<'_>, 159 buf: &[u8], 160 ) -> Poll<io::Result<usize>> { 161 Pin::new(&mut **self).poll_write(cx, buf) 162 } 163 164 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 165 Pin::new(&mut **self).poll_flush(cx) 166 } 167 168 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 169 Pin::new(&mut **self).poll_shutdown(cx) 170 } 171 }; 172 } 173 174 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> { 175 deref_async_write!(); 176 } 177 178 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T { 179 deref_async_write!(); 180 } 181 182 impl<P> AsyncWrite for Pin<P> 183 where 184 P: DerefMut + Unpin, 185 P::Target: AsyncWrite, 186 { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>187 fn poll_write( 188 self: Pin<&mut Self>, 189 cx: &mut Context<'_>, 190 buf: &[u8], 191 ) -> Poll<io::Result<usize>> { 192 self.get_mut().as_mut().poll_write(cx, buf) 193 } 194 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>195 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 196 self.get_mut().as_mut().poll_flush(cx) 197 } 198 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>199 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 200 self.get_mut().as_mut().poll_shutdown(cx) 201 } 202 } 203 204 impl AsyncWrite for Vec<u8> { poll_write( self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>205 fn poll_write( 206 self: Pin<&mut Self>, 207 _cx: &mut Context<'_>, 208 buf: &[u8], 209 ) -> Poll<io::Result<usize>> { 210 self.get_mut().extend_from_slice(buf); 211 Poll::Ready(Ok(buf.len())) 212 } 213 poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>214 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 215 Poll::Ready(Ok(())) 216 } 217 poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>218 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 219 Poll::Ready(Ok(())) 220 } 221 } 222 223 impl AsyncWrite for io::Cursor<&mut [u8]> { poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>224 fn poll_write( 225 mut self: Pin<&mut Self>, 226 _: &mut Context<'_>, 227 buf: &[u8], 228 ) -> Poll<io::Result<usize>> { 229 Poll::Ready(io::Write::write(&mut *self, buf)) 230 } 231 poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>232 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 233 Poll::Ready(io::Write::flush(&mut *self)) 234 } 235 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>236 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 237 self.poll_flush(cx) 238 } 239 } 240 241 impl AsyncWrite for io::Cursor<&mut Vec<u8>> { poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>242 fn poll_write( 243 mut self: Pin<&mut Self>, 244 _: &mut Context<'_>, 245 buf: &[u8], 246 ) -> Poll<io::Result<usize>> { 247 Poll::Ready(io::Write::write(&mut *self, buf)) 248 } 249 poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>250 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 251 Poll::Ready(io::Write::flush(&mut *self)) 252 } 253 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>254 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 255 self.poll_flush(cx) 256 } 257 } 258 259 impl AsyncWrite for io::Cursor<Vec<u8>> { poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>260 fn poll_write( 261 mut self: Pin<&mut Self>, 262 _: &mut Context<'_>, 263 buf: &[u8], 264 ) -> Poll<io::Result<usize>> { 265 Poll::Ready(io::Write::write(&mut *self, buf)) 266 } 267 poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>268 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 269 Poll::Ready(io::Write::flush(&mut *self)) 270 } 271 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>272 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 273 self.poll_flush(cx) 274 } 275 } 276 277 impl AsyncWrite for io::Cursor<Box<[u8]>> { poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>278 fn poll_write( 279 mut self: Pin<&mut Self>, 280 _: &mut Context<'_>, 281 buf: &[u8], 282 ) -> Poll<io::Result<usize>> { 283 Poll::Ready(io::Write::write(&mut *self, buf)) 284 } 285 poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>286 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 287 Poll::Ready(io::Write::flush(&mut *self)) 288 } 289 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>290 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 291 self.poll_flush(cx) 292 } 293 } 294