1 use futures_core::ready; 2 use futures_core::task::{Context, Poll}; 3 use futures_io::AsyncWrite; 4 use futures_sink::Sink; 5 use std::io; 6 use std::pin::Pin; 7 use pin_project_lite::pin_project; 8 9 #[derive(Debug)] 10 struct Block<Item> { 11 offset: usize, 12 bytes: Item, 13 } 14 15 pin_project! { 16 /// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method. 17 #[must_use = "sinks do nothing unless polled"] 18 #[derive(Debug)] 19 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 20 pub struct IntoSink<W, Item> { 21 #[pin] 22 writer: W, 23 // An outstanding block for us to push into the underlying writer, along with an offset of how 24 // far into this block we have written already. 25 buffer: Option<Block<Item>>, 26 } 27 } 28 29 impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> { new(writer: W) -> Self30 pub(super) fn new(writer: W) -> Self { 31 Self { writer, buffer: None } 32 } 33 34 /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_ 35 /// flush the writer after it succeeds in pushing the block into it. poll_flush_buffer( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), io::Error>>36 fn poll_flush_buffer( 37 self: Pin<&mut Self>, 38 cx: &mut Context<'_>, 39 ) -> Poll<Result<(), io::Error>> 40 { 41 let mut this = self.project(); 42 43 if let Some(buffer) = this.buffer { 44 loop { 45 let bytes = buffer.bytes.as_ref(); 46 let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?; 47 buffer.offset += written; 48 if buffer.offset == bytes.len() { 49 break; 50 } 51 } 52 } 53 *this.buffer = None; 54 Poll::Ready(Ok(())) 55 } 56 57 } 58 59 impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> { 60 type Error = io::Error; 61 poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>62 fn poll_ready( 63 self: Pin<&mut Self>, 64 cx: &mut Context<'_>, 65 ) -> Poll<Result<(), Self::Error>> 66 { 67 ready!(self.poll_flush_buffer(cx))?; 68 Poll::Ready(Ok(())) 69 } 70 71 #[allow(clippy::debug_assert_with_mut_call)] start_send( self: Pin<&mut Self>, item: Item, ) -> Result<(), Self::Error>72 fn start_send( 73 self: Pin<&mut Self>, 74 item: Item, 75 ) -> Result<(), Self::Error> 76 { 77 debug_assert!(self.buffer.is_none()); 78 *self.project().buffer = Some(Block { offset: 0, bytes: item }); 79 Ok(()) 80 } 81 poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>82 fn poll_flush( 83 mut self: Pin<&mut Self>, 84 cx: &mut Context<'_>, 85 ) -> Poll<Result<(), Self::Error>> 86 { 87 ready!(self.as_mut().poll_flush_buffer(cx))?; 88 ready!(self.project().writer.poll_flush(cx))?; 89 Poll::Ready(Ok(())) 90 } 91 poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>92 fn poll_close( 93 mut self: Pin<&mut Self>, 94 cx: &mut Context<'_>, 95 ) -> Poll<Result<(), Self::Error>> 96 { 97 ready!(self.as_mut().poll_flush_buffer(cx))?; 98 ready!(self.project().writer.poll_close(cx))?; 99 Poll::Ready(Ok(())) 100 } 101 } 102