1 use bytes::Buf;
2 use std::io;
3 use std::ops::DerefMut;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 
7 /// Writes bytes asynchronously.
8 ///
9 /// The trait inherits from [`std::io::Write`] and indicates that an I/O object is
10 /// **nonblocking**. All non-blocking I/O objects must return an error when
11 /// bytes cannot be written instead of blocking the current thread.
12 ///
13 /// Specifically, this means that the [`poll_write`] function will return one of
14 /// the following:
15 ///
16 /// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately
17 ///   written.
18 ///
19 /// * `Poll::Pending` means that no data was written from the buffer
20 ///   provided. The I/O object is not currently writable but may become writable
21 ///   in the future. Most importantly, **the current future's task is scheduled
22 ///   to get unparked when the object is writable**. This means that like
23 ///   `Future::poll` you'll receive a notification when the I/O object is
24 ///   writable again.
25 ///
26 /// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the
27 ///   underlying object.
28 ///
29 /// This trait importantly means that the [`write`][stdwrite] method only works in
30 /// the context of a future's task. The object may panic if used outside of a task.
31 ///
32 /// Note that this trait also represents that the  [`Write::flush`][stdflush] method
33 /// works very similarly to the `write` method, notably that `Ok(())` means that the
34 /// writer has successfully been flushed, a "would block" error means that the
35 /// current task is ready to receive a notification when flushing can make more
36 /// progress, and otherwise normal errors can happen as well.
37 ///
38 /// Utilities for working with `AsyncWrite` values are provided by
39 /// [`AsyncWriteExt`].
40 ///
41 /// [`std::io::Write`]: std::io::Write
42 /// [`poll_write`]: AsyncWrite::poll_write()
43 /// [stdwrite]: std::io::Write::write()
44 /// [stdflush]: std::io::Write::flush()
45 /// [`AsyncWriteExt`]: crate::io::AsyncWriteExt
46 pub trait AsyncWrite {
47     /// Attempt to write bytes from `buf` into the object.
48     ///
49     /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
50     ///
51     /// If the object is not ready for writing, the method returns
52     /// `Poll::Pending` and arranges for the current task (via
53     /// `cx.waker()`) to receive a notification when the object becomes
54     /// writable or is closed.
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>>55     fn poll_write(
56         self: Pin<&mut Self>,
57         cx: &mut Context<'_>,
58         buf: &[u8],
59     ) -> Poll<Result<usize, io::Error>>;
60 
61     /// Attempts to flush the object, ensuring that any buffered data reach
62     /// their destination.
63     ///
64     /// On success, returns `Poll::Ready(Ok(()))`.
65     ///
66     /// If flushing cannot immediately complete, this method returns
67     /// `Poll::Pending` and arranges for the current task (via
68     /// `cx.waker()`) to receive a notification when the object can make
69     /// progress towards flushing.
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>70     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
71 
72     /// Initiates or attempts to shut down this writer, returning success when
73     /// the I/O connection has completely shut down.
74     ///
75     /// This method is intended to be used for asynchronous shutdown of I/O
76     /// connections. For example this is suitable for implementing shutdown of a
77     /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
78     /// Protocols sometimes need to flush out final pieces of data or otherwise
79     /// perform a graceful shutdown handshake, reading/writing more data as
80     /// appropriate. This method is the hook for such protocols to implement the
81     /// graceful shutdown logic.
82     ///
83     /// This `shutdown` method is required by implementers of the
84     /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
85     /// through to the wrapped type, and base types will typically implement
86     /// shutdown logic here or just return `Ok(().into())`. Note that if you're
87     /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
88     /// transitively the entire stream has been shut down. After your wrapper's
89     /// shutdown logic has been executed you should shut down the underlying
90     /// stream.
91     ///
92     /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
93     /// method returns `Ready` it implies that a flush successfully happened
94     /// before the shutdown happened. That is, callers don't need to call
95     /// `flush` before calling `shutdown`. They can rely that by calling
96     /// `shutdown` any pending buffered data will be written out.
97     ///
98     /// # Return value
99     ///
100     /// This function returns a `Poll<io::Result<()>>` classified as such:
101     ///
102     /// * `Poll::Ready(Ok(()))` - indicates that the connection was
103     ///   successfully shut down and is now safe to deallocate/drop/close
104     ///   resources associated with it. This method means that the current task
105     ///   will no longer receive any notifications due to this method and the
106     ///   I/O object itself is likely no longer usable.
107     ///
108     /// * `Poll::Pending` - indicates that shutdown is initiated but could
109     ///   not complete just yet. This may mean that more I/O needs to happen to
110     ///   continue this shutdown operation. The current task is scheduled to
111     ///   receive a notification when it's otherwise ready to continue the
112     ///   shutdown operation. When woken up this method should be called again.
113     ///
114     /// * `Poll::Ready(Err(e))` - indicates a fatal error has happened with shutdown,
115     ///   indicating that the shutdown operation did not complete successfully.
116     ///   This typically means that the I/O object is no longer usable.
117     ///
118     /// # Errors
119     ///
120     /// This function can return normal I/O errors through `Err`, described
121     /// above. Additionally this method may also render the underlying
122     /// `Write::write` method no longer usable (e.g. will return errors in the
123     /// future). It's recommended that once `shutdown` is called the
124     /// `write` method is no longer called.
125     ///
126     /// # Panics
127     ///
128     /// This function will panic if not called within the context of a future's
129     /// task.
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>130     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
131 
132     /// Writes a `Buf` into this value, returning how many bytes were written.
133     ///
134     /// Note that this method will advance the `buf` provided automatically by
135     /// the number of bytes written.
poll_write_buf<B: Buf>( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<Result<usize, io::Error>> where Self: Sized,136     fn poll_write_buf<B: Buf>(
137         self: Pin<&mut Self>,
138         cx: &mut Context<'_>,
139         buf: &mut B,
140     ) -> Poll<Result<usize, io::Error>>
141     where
142         Self: Sized,
143     {
144         if !buf.has_remaining() {
145             return Poll::Ready(Ok(0));
146         }
147 
148         let n = ready!(self.poll_write(cx, buf.bytes()))?;
149         buf.advance(n);
150         Poll::Ready(Ok(n))
151     }
152 }
153 
154 macro_rules! deref_async_write {
155     () => {
156         fn poll_write(
157             mut self: Pin<&mut Self>,
158             cx: &mut Context<'_>,
159             buf: &[u8],
160         ) -> Poll<io::Result<usize>> {
161             Pin::new(&mut **self).poll_write(cx, buf)
162         }
163 
164         fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
165             Pin::new(&mut **self).poll_flush(cx)
166         }
167 
168         fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
169             Pin::new(&mut **self).poll_shutdown(cx)
170         }
171     };
172 }
173 
174 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
175     deref_async_write!();
176 }
177 
178 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
179     deref_async_write!();
180 }
181 
182 impl<P> AsyncWrite for Pin<P>
183 where
184     P: DerefMut + Unpin,
185     P::Target: AsyncWrite,
186 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>187     fn poll_write(
188         self: Pin<&mut Self>,
189         cx: &mut Context<'_>,
190         buf: &[u8],
191     ) -> Poll<io::Result<usize>> {
192         self.get_mut().as_mut().poll_write(cx, buf)
193     }
194 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>195     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
196         self.get_mut().as_mut().poll_flush(cx)
197     }
198 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>199     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
200         self.get_mut().as_mut().poll_shutdown(cx)
201     }
202 }
203 
204 impl AsyncWrite for Vec<u8> {
poll_write( self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>205     fn poll_write(
206         self: Pin<&mut Self>,
207         _cx: &mut Context<'_>,
208         buf: &[u8],
209     ) -> Poll<io::Result<usize>> {
210         self.get_mut().extend_from_slice(buf);
211         Poll::Ready(Ok(buf.len()))
212     }
213 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>214     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
215         Poll::Ready(Ok(()))
216     }
217 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>218     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
219         Poll::Ready(Ok(()))
220     }
221 }
222 
223 impl AsyncWrite for io::Cursor<&mut [u8]> {
poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>224     fn poll_write(
225         mut self: Pin<&mut Self>,
226         _: &mut Context<'_>,
227         buf: &[u8],
228     ) -> Poll<io::Result<usize>> {
229         Poll::Ready(io::Write::write(&mut *self, buf))
230     }
231 
poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>232     fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
233         Poll::Ready(io::Write::flush(&mut *self))
234     }
235 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>236     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
237         self.poll_flush(cx)
238     }
239 }
240 
241 impl AsyncWrite for io::Cursor<&mut Vec<u8>> {
poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>242     fn poll_write(
243         mut self: Pin<&mut Self>,
244         _: &mut Context<'_>,
245         buf: &[u8],
246     ) -> Poll<io::Result<usize>> {
247         Poll::Ready(io::Write::write(&mut *self, buf))
248     }
249 
poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>250     fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
251         Poll::Ready(io::Write::flush(&mut *self))
252     }
253 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>254     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
255         self.poll_flush(cx)
256     }
257 }
258 
259 impl AsyncWrite for io::Cursor<Vec<u8>> {
poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>260     fn poll_write(
261         mut self: Pin<&mut Self>,
262         _: &mut Context<'_>,
263         buf: &[u8],
264     ) -> Poll<io::Result<usize>> {
265         Poll::Ready(io::Write::write(&mut *self, buf))
266     }
267 
poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>268     fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
269         Poll::Ready(io::Write::flush(&mut *self))
270     }
271 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>272     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
273         self.poll_flush(cx)
274     }
275 }
276 
277 impl AsyncWrite for io::Cursor<Box<[u8]>> {
poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>278     fn poll_write(
279         mut self: Pin<&mut Self>,
280         _: &mut Context<'_>,
281         buf: &[u8],
282     ) -> Poll<io::Result<usize>> {
283         Poll::Ready(io::Write::write(&mut *self, buf))
284     }
285 
poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>286     fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
287         Poll::Ready(io::Write::flush(&mut *self))
288     }
289 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>290     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
291         self.poll_flush(cx)
292     }
293 }
294