1 use core::{ 2 pin::Pin, 3 task::{Context, Poll}, 4 }; 5 use std::io::Result; 6 7 use crate::{ 8 codec::Encode, 9 futures::write::{AsyncBufWrite, BufWriter}, 10 util::PartialBuffer, 11 }; 12 use futures_core::ready; 13 use futures_io::AsyncWrite; 14 use pin_project_lite::pin_project; 15 16 #[derive(Debug)] 17 enum State { 18 Encoding, 19 Finishing, 20 Done, 21 } 22 23 pin_project! { 24 #[derive(Debug)] 25 pub struct Encoder<W, E: Encode> { 26 #[pin] 27 writer: BufWriter<W>, 28 encoder: E, 29 state: State, 30 } 31 } 32 33 impl<W: AsyncWrite, E: Encode> Encoder<W, E> { new(writer: W, encoder: E) -> Self34 pub fn new(writer: W, encoder: E) -> Self { 35 Self { 36 writer: BufWriter::new(writer), 37 encoder, 38 state: State::Encoding, 39 } 40 } 41 get_ref(&self) -> &W42 pub fn get_ref(&self) -> &W { 43 self.writer.get_ref() 44 } 45 get_mut(&mut self) -> &mut W46 pub fn get_mut(&mut self) -> &mut W { 47 self.writer.get_mut() 48 } 49 get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W>50 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { 51 self.project().writer.get_pin_mut() 52 } 53 into_inner(self) -> W54 pub fn into_inner(self) -> W { 55 self.writer.into_inner() 56 } 57 do_poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, input: &mut PartialBuffer<&[u8]>, ) -> Poll<Result<()>>58 fn do_poll_write( 59 self: Pin<&mut Self>, 60 cx: &mut Context<'_>, 61 input: &mut PartialBuffer<&[u8]>, 62 ) -> Poll<Result<()>> { 63 let mut this = self.project(); 64 65 loop { 66 let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; 67 let mut output = PartialBuffer::new(output); 68 69 *this.state = match this.state { 70 State::Encoding => { 71 this.encoder.encode(input, &mut output)?; 72 State::Encoding 73 } 74 75 State::Finishing | State::Done => panic!("Write after close"), 76 }; 77 78 let produced = output.written().len(); 79 this.writer.as_mut().produce(produced); 80 81 if input.unwritten().is_empty() { 82 return Poll::Ready(Ok(())); 83 } 84 } 85 } 86 do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>87 fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 88 let mut this = self.project(); 89 90 loop { 91 let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; 92 let mut output = PartialBuffer::new(output); 93 94 let done = match this.state { 95 State::Encoding => this.encoder.flush(&mut output)?, 96 97 State::Finishing | State::Done => panic!("Flush after close"), 98 }; 99 100 let produced = output.written().len(); 101 this.writer.as_mut().produce(produced); 102 103 if done { 104 return Poll::Ready(Ok(())); 105 } 106 } 107 } 108 do_poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>109 fn do_poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 110 let mut this = self.project(); 111 112 loop { 113 let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; 114 let mut output = PartialBuffer::new(output); 115 116 *this.state = match this.state { 117 State::Encoding | State::Finishing => { 118 if this.encoder.finish(&mut output)? { 119 State::Done 120 } else { 121 State::Finishing 122 } 123 } 124 125 State::Done => State::Done, 126 }; 127 128 let produced = output.written().len(); 129 this.writer.as_mut().produce(produced); 130 131 if let State::Done = this.state { 132 return Poll::Ready(Ok(())); 133 } 134 } 135 } 136 } 137 138 impl<W: AsyncWrite, E: Encode> AsyncWrite for Encoder<W, E> { poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>139 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { 140 if buf.is_empty() { 141 return Poll::Ready(Ok(0)); 142 } 143 144 let mut input = PartialBuffer::new(buf); 145 146 match self.do_poll_write(cx, &mut input)? { 147 Poll::Pending if input.written().is_empty() => Poll::Pending, 148 _ => Poll::Ready(Ok(input.written().len())), 149 } 150 } 151 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>152 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 153 ready!(self.as_mut().do_poll_flush(cx))?; 154 ready!(self.project().writer.as_mut().poll_flush(cx))?; 155 Poll::Ready(Ok(())) 156 } 157 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>158 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 159 ready!(self.as_mut().do_poll_close(cx))?; 160 ready!(self.project().writer.as_mut().poll_close(cx))?; 161 Poll::Ready(Ok(())) 162 } 163 } 164