1 use super::buf_writer::BufWriter;
2 use futures_core::ready;
3 use futures_core::task::{Context, Poll};
4 use futures_io::AsyncWrite;
5 use futures_io::IoSlice;
6 use pin_project_lite::pin_project;
7 use std::io;
8 use std::pin::Pin;
9 
10 pin_project! {
11 /// Wrap a writer, like [`BufWriter`] does, but prioritizes buffering lines
12 ///
13 /// This was written based on `std::io::LineWriter` which goes into further details
14 /// explaining the code.
15 ///
16 /// Buffering is actually done using `BufWriter`. This class will leverage `BufWriter`
17 /// to write on-each-line.
18 #[derive(Debug)]
19 pub struct LineWriter<W: AsyncWrite> {
20     #[pin]
21     buf_writer: BufWriter<W>,
22 }
23 }
24 
25 impl<W: AsyncWrite> LineWriter<W> {
26     /// Create a new `LineWriter` with default buffer capacity. The default is currently 1KB
27     /// which was taken from `std::io::LineWriter`
new(inner: W) -> LineWriter<W>28     pub fn new(inner: W) -> LineWriter<W> {
29         LineWriter::with_capacity(1024, inner)
30     }
31 
32     /// Creates a new `LineWriter` with the specified buffer capacity.
with_capacity(capacity: usize, inner: W) -> LineWriter<W>33     pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> {
34         LineWriter { buf_writer: BufWriter::with_capacity(capacity, inner) }
35     }
36 
37     /// Flush `buf_writer` if last char is "new line"
flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>38     fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
39         let this = self.project();
40         match this.buf_writer.buffer().last().copied() {
41             Some(b'\n') => this.buf_writer.flush_buf(cx),
42             _ => Poll::Ready(Ok(())),
43         }
44     }
45 
46     /// Returns a reference to `buf_writer`'s internally buffered data.
buffer(&self) -> &[u8]47     pub fn buffer(&self) -> &[u8] {
48         self.buf_writer.buffer()
49     }
50 
51     /// Acquires a reference to the underlying sink or stream that this combinator is
52     /// pulling from.
get_ref(&self) -> &W53     pub fn get_ref(&self) -> &W {
54         self.buf_writer.get_ref()
55     }
56 }
57 
58 impl<W: AsyncWrite> AsyncWrite for LineWriter<W> {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>59     fn poll_write(
60         mut self: Pin<&mut Self>,
61         cx: &mut Context<'_>,
62         buf: &[u8],
63     ) -> Poll<io::Result<usize>> {
64         let mut this = self.as_mut().project();
65         let newline_index = match memchr::memrchr(b'\n', buf) {
66             None => {
67                 ready!(self.as_mut().flush_if_completed_line(cx)?);
68                 return self.project().buf_writer.poll_write(cx, buf);
69             }
70             Some(newline_index) => newline_index + 1,
71         };
72 
73         ready!(this.buf_writer.as_mut().poll_flush(cx)?);
74 
75         let lines = &buf[..newline_index];
76 
77         let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write(cx, lines))? };
78 
79         if flushed == 0 {
80             return Poll::Ready(Ok(0));
81         }
82 
83         let tail = if flushed >= newline_index {
84             &buf[flushed..]
85         } else if newline_index - flushed <= this.buf_writer.capacity() {
86             &buf[flushed..newline_index]
87         } else {
88             let scan_area = &buf[flushed..];
89             let scan_area = &scan_area[..this.buf_writer.capacity()];
90             match memchr::memrchr(b'\n', scan_area) {
91                 Some(newline_index) => &scan_area[..newline_index + 1],
92                 None => scan_area,
93             }
94         };
95 
96         let buffered = this.buf_writer.as_mut().write_to_buf(tail);
97         Poll::Ready(Ok(flushed + buffered))
98     }
99 
poll_write_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>100     fn poll_write_vectored(
101         mut self: Pin<&mut Self>,
102         cx: &mut Context<'_>,
103         bufs: &[IoSlice<'_>],
104     ) -> Poll<io::Result<usize>> {
105         let mut this = self.as_mut().project();
106         // `is_write_vectored()` is handled in original code, but not in this crate
107         // see https://github.com/rust-lang/rust/issues/70436
108 
109         let last_newline_buf_idx = bufs
110             .iter()
111             .enumerate()
112             .rev()
113             .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
114         let last_newline_buf_idx = match last_newline_buf_idx {
115             None => {
116                 ready!(self.as_mut().flush_if_completed_line(cx)?);
117                 return self.project().buf_writer.poll_write_vectored(cx, bufs);
118             }
119             Some(i) => i,
120         };
121 
122         ready!(this.buf_writer.as_mut().poll_flush(cx)?);
123 
124         let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
125 
126         let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write_vectored(cx, lines))? };
127         if flushed == 0 {
128             return Poll::Ready(Ok(0));
129         }
130 
131         let lines_len = lines.iter().map(|buf| buf.len()).sum();
132         if flushed < lines_len {
133             return Poll::Ready(Ok(flushed));
134         }
135 
136         let buffered: usize = tail
137             .iter()
138             .filter(|buf| !buf.is_empty())
139             .map(|buf| this.buf_writer.as_mut().write_to_buf(buf))
140             .take_while(|&n| n > 0)
141             .sum();
142 
143         Poll::Ready(Ok(flushed + buffered))
144     }
145 
146     /// Forward to `buf_writer` 's `BufWriter::poll_flush()`
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>147     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
148         self.as_mut().project().buf_writer.poll_flush(cx)
149     }
150 
151     /// Forward to `buf_writer` 's `BufWriter::poll_close()`
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>152     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
153         self.as_mut().project().buf_writer.poll_close(cx)
154     }
155 }
156