1 use futures_core::ready;
2 use futures_core::task::{Context, Poll};
3 use futures_io::AsyncWrite;
4 use futures_sink::Sink;
5 use std::io;
6 use std::pin::Pin;
7 use pin_project_lite::pin_project;
8 
/// An item handed to the sink, paired with a record of how much of it has been written so far.
#[derive(Debug)]
struct Block<Item> {
    /// Number of leading bytes of `bytes` that have already been pushed into the writer.
    offset: usize,
    /// The item itself; the code in this file views it as a byte slice through `AsRef<[u8]>`.
    bytes: Item,
}
14 
pin_project! {
    /// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
    ///
    /// Holds the wrapped writer together with at most one item that has been accepted but not
    /// yet completely handed over to that writer.
    #[must_use = "sinks do nothing unless polled"]
    #[derive(Debug)]
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    pub struct IntoSink<W, Item> {
        // The wrapped writer. It is marked `#[pin]` so that projecting `Pin<&mut IntoSink>`
        // yields a pinned reference to it, which is what the writer's poll methods are called on.
        #[pin]
        writer: W,
        // An outstanding block for us to push into the underlying writer, along with an offset of
        // how far into this block we have written already. `None` means nothing is pending.
        buffer: Option<Block<Item>>,
    }
}
28 
29 impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> {
new(writer: W) -> Self30     pub(super) fn new(writer: W) -> Self {
31         Self { writer, buffer: None }
32     }
33 
34     /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
35     /// flush the writer after it succeeds in pushing the block into it.
poll_flush_buffer( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), io::Error>>36     fn poll_flush_buffer(
37         self: Pin<&mut Self>,
38         cx: &mut Context<'_>,
39     ) -> Poll<Result<(), io::Error>>
40     {
41         let mut this = self.project();
42 
43         if let Some(buffer) = this.buffer {
44             loop {
45                 let bytes = buffer.bytes.as_ref();
46                 let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
47                 buffer.offset += written;
48                 if buffer.offset == bytes.len() {
49                     break;
50                 }
51             }
52         }
53         *this.buffer = None;
54         Poll::Ready(Ok(()))
55     }
56 
57 }
58 
59 impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> {
60     type Error = io::Error;
61 
poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>62     fn poll_ready(
63         self: Pin<&mut Self>,
64         cx: &mut Context<'_>,
65     ) -> Poll<Result<(), Self::Error>>
66     {
67         ready!(self.poll_flush_buffer(cx))?;
68         Poll::Ready(Ok(()))
69     }
70 
71     #[allow(clippy::debug_assert_with_mut_call)]
start_send( self: Pin<&mut Self>, item: Item, ) -> Result<(), Self::Error>72     fn start_send(
73         self: Pin<&mut Self>,
74         item: Item,
75     ) -> Result<(), Self::Error>
76     {
77         debug_assert!(self.buffer.is_none());
78         *self.project().buffer = Some(Block { offset: 0, bytes: item });
79         Ok(())
80     }
81 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>82     fn poll_flush(
83         mut self: Pin<&mut Self>,
84         cx: &mut Context<'_>,
85     ) -> Poll<Result<(), Self::Error>>
86     {
87         ready!(self.as_mut().poll_flush_buffer(cx))?;
88         ready!(self.project().writer.poll_flush(cx))?;
89         Poll::Ready(Ok(()))
90     }
91 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>92     fn poll_close(
93         mut self: Pin<&mut Self>,
94         cx: &mut Context<'_>,
95     ) -> Poll<Result<(), Self::Error>>
96     {
97         ready!(self.as_mut().poll_flush_buffer(cx))?;
98         ready!(self.project().writer.poll_close(cx))?;
99         Poll::Ready(Ok(()))
100     }
101 }
102