1 use crate::io::AsyncBufRead;
2 
3 use pin_project_lite::pin_project;
4 use std::future::Future;
5 use std::io;
6 use std::marker::PhantomPinned;
7 use std::mem;
8 use std::pin::Pin;
9 use std::task::{Context, Poll};
10 
pin_project! {
    /// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
    ///
    /// Resolves to the number of bytes appended to the destination vector.
    /// When the delimiter is found, it is included in the appended bytes.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadUntil<'a, R: ?Sized> {
        // Buffered reader that bytes are pulled from.
        reader: &'a mut R,
        // Byte that ends the read once it shows up in the reader's buffer.
        delimeter: u8,
        // Destination vector. Bytes are only ever appended; whatever it held
        // before the operation started is left in place.
        buf: &'a mut Vec<u8>,
        // The number of bytes appended to buf. This can be less than buf.len() if
        // the buffer was not empty when the operation was started.
        read: usize,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
28 
read_until<'a, R>( reader: &'a mut R, delimeter: u8, buf: &'a mut Vec<u8>, ) -> ReadUntil<'a, R> where R: AsyncBufRead + ?Sized + Unpin,29 pub(crate) fn read_until<'a, R>(
30     reader: &'a mut R,
31     delimeter: u8,
32     buf: &'a mut Vec<u8>,
33 ) -> ReadUntil<'a, R>
34 where
35     R: AsyncBufRead + ?Sized + Unpin,
36 {
37     ReadUntil {
38         reader,
39         delimeter,
40         buf,
41         read: 0,
42         _pin: PhantomPinned,
43     }
44 }
45 
read_until_internal<R: AsyncBufRead + ?Sized>( mut reader: Pin<&mut R>, cx: &mut Context<'_>, delimeter: u8, buf: &mut Vec<u8>, read: &mut usize, ) -> Poll<io::Result<usize>>46 pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
47     mut reader: Pin<&mut R>,
48     cx: &mut Context<'_>,
49     delimeter: u8,
50     buf: &mut Vec<u8>,
51     read: &mut usize,
52 ) -> Poll<io::Result<usize>> {
53     loop {
54         let (done, used) = {
55             let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
56             if let Some(i) = memchr::memchr(delimeter, available) {
57                 buf.extend_from_slice(&available[..=i]);
58                 (true, i + 1)
59             } else {
60                 buf.extend_from_slice(available);
61                 (false, available.len())
62             }
63         };
64         reader.as_mut().consume(used);
65         *read += used;
66         if done || used == 0 {
67             return Poll::Ready(Ok(mem::replace(read, 0)));
68         }
69     }
70 }
71 
72 impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
73     type Output = io::Result<usize>;
74 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>75     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
76         let me = self.project();
77         read_until_internal(Pin::new(*me.reader), cx, *me.delimeter, me.buf, me.read)
78     }
79 }
80