1 use core::{
2     pin::Pin,
3     task::{Context, Poll},
4 };
5 use std::io::Result;
6 
7 use crate::{
8     codec::Encode,
9     futures::write::{AsyncBufWrite, BufWriter},
10     util::PartialBuffer,
11 };
12 use futures_core::ready;
13 use futures_io::AsyncWrite;
14 use pin_project_lite::pin_project;
15 
/// Lifecycle of an `Encoder`, tracking whether the stream is still accepting
/// data or is being (or has been) finalized.
#[derive(Debug)]
enum State {
    /// Normal operation: input may still be written and flushed.
    Encoding,
    /// Closing has begun but the encoder has not yet reported that its final
    /// output is complete.
    Finishing,
    /// The encoder reported that finishing is complete.
    Done,
}
22 
23 pin_project! {
24     #[derive(Debug)]
25     pub struct Encoder<W, E: Encode> {
26         #[pin]
27         writer: BufWriter<W>,
28         encoder: E,
29         state: State,
30     }
31 }
32 
33 impl<W: AsyncWrite, E: Encode> Encoder<W, E> {
new(writer: W, encoder: E) -> Self34     pub fn new(writer: W, encoder: E) -> Self {
35         Self {
36             writer: BufWriter::new(writer),
37             encoder,
38             state: State::Encoding,
39         }
40     }
41 
get_ref(&self) -> &W42     pub fn get_ref(&self) -> &W {
43         self.writer.get_ref()
44     }
45 
get_mut(&mut self) -> &mut W46     pub fn get_mut(&mut self) -> &mut W {
47         self.writer.get_mut()
48     }
49 
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W>50     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
51         self.project().writer.get_pin_mut()
52     }
53 
into_inner(self) -> W54     pub fn into_inner(self) -> W {
55         self.writer.into_inner()
56     }
57 
do_poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, input: &mut PartialBuffer<&[u8]>, ) -> Poll<Result<()>>58     fn do_poll_write(
59         self: Pin<&mut Self>,
60         cx: &mut Context<'_>,
61         input: &mut PartialBuffer<&[u8]>,
62     ) -> Poll<Result<()>> {
63         let mut this = self.project();
64 
65         loop {
66             let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?;
67             let mut output = PartialBuffer::new(output);
68 
69             *this.state = match this.state {
70                 State::Encoding => {
71                     this.encoder.encode(input, &mut output)?;
72                     State::Encoding
73                 }
74 
75                 State::Finishing | State::Done => panic!("Write after close"),
76             };
77 
78             let produced = output.written().len();
79             this.writer.as_mut().produce(produced);
80 
81             if input.unwritten().is_empty() {
82                 return Poll::Ready(Ok(()));
83             }
84         }
85     }
86 
do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>87     fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
88         let mut this = self.project();
89 
90         loop {
91             let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?;
92             let mut output = PartialBuffer::new(output);
93 
94             let done = match this.state {
95                 State::Encoding => this.encoder.flush(&mut output)?,
96 
97                 State::Finishing | State::Done => panic!("Flush after close"),
98             };
99 
100             let produced = output.written().len();
101             this.writer.as_mut().produce(produced);
102 
103             if done {
104                 return Poll::Ready(Ok(()));
105             }
106         }
107     }
108 
do_poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>109     fn do_poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
110         let mut this = self.project();
111 
112         loop {
113             let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?;
114             let mut output = PartialBuffer::new(output);
115 
116             *this.state = match this.state {
117                 State::Encoding | State::Finishing => {
118                     if this.encoder.finish(&mut output)? {
119                         State::Done
120                     } else {
121                         State::Finishing
122                     }
123                 }
124 
125                 State::Done => State::Done,
126             };
127 
128             let produced = output.written().len();
129             this.writer.as_mut().produce(produced);
130 
131             if let State::Done = this.state {
132                 return Poll::Ready(Ok(()));
133             }
134         }
135     }
136 }
137 
138 impl<W: AsyncWrite, E: Encode> AsyncWrite for Encoder<W, E> {
poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>139     fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
140         if buf.is_empty() {
141             return Poll::Ready(Ok(0));
142         }
143 
144         let mut input = PartialBuffer::new(buf);
145 
146         match self.do_poll_write(cx, &mut input)? {
147             Poll::Pending if input.written().is_empty() => Poll::Pending,
148             _ => Poll::Ready(Ok(input.written().len())),
149         }
150     }
151 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>152     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
153         ready!(self.as_mut().do_poll_flush(cx))?;
154         ready!(self.project().writer.as_mut().poll_flush(cx))?;
155         Poll::Ready(Ok(()))
156     }
157 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>158     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
159         ready!(self.as_mut().do_poll_close(cx))?;
160         ready!(self.project().writer.as_mut().poll_close(cx))?;
161         Poll::Ready(Ok(()))
162     }
163 }
164