1 use crate::io::AsyncBufRead;
2
3 use pin_project_lite::pin_project;
4 use std::future::Future;
5 use std::io;
6 use std::marker::PhantomPinned;
7 use std::mem;
8 use std::pin::Pin;
9 use std::task::{Context, Poll};
10
11 pin_project! {
12 /// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
13 /// The delimeter is included in the resulting vector.
14 #[derive(Debug)]
15 #[must_use = "futures do nothing unless you `.await` or poll them"]
16 pub struct ReadUntil<'a, R: ?Sized> {
17 reader: &'a mut R,
18 delimeter: u8,
19 buf: &'a mut Vec<u8>,
20 // The number of bytes appended to buf. This can be less than buf.len() if
21 // the buffer was not empty when the operation was started.
22 read: usize,
23 // Make this future `!Unpin` for compatibility with async trait methods.
24 #[pin]
25 _pin: PhantomPinned,
26 }
27 }
28
read_until<'a, R>( reader: &'a mut R, delimeter: u8, buf: &'a mut Vec<u8>, ) -> ReadUntil<'a, R> where R: AsyncBufRead + ?Sized + Unpin,29 pub(crate) fn read_until<'a, R>(
30 reader: &'a mut R,
31 delimeter: u8,
32 buf: &'a mut Vec<u8>,
33 ) -> ReadUntil<'a, R>
34 where
35 R: AsyncBufRead + ?Sized + Unpin,
36 {
37 ReadUntil {
38 reader,
39 delimeter,
40 buf,
41 read: 0,
42 _pin: PhantomPinned,
43 }
44 }
45
read_until_internal<R: AsyncBufRead + ?Sized>( mut reader: Pin<&mut R>, cx: &mut Context<'_>, delimeter: u8, buf: &mut Vec<u8>, read: &mut usize, ) -> Poll<io::Result<usize>>46 pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
47 mut reader: Pin<&mut R>,
48 cx: &mut Context<'_>,
49 delimeter: u8,
50 buf: &mut Vec<u8>,
51 read: &mut usize,
52 ) -> Poll<io::Result<usize>> {
53 loop {
54 let (done, used) = {
55 let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
56 if let Some(i) = memchr::memchr(delimeter, available) {
57 buf.extend_from_slice(&available[..=i]);
58 (true, i + 1)
59 } else {
60 buf.extend_from_slice(available);
61 (false, available.len())
62 }
63 };
64 reader.as_mut().consume(used);
65 *read += used;
66 if done || used == 0 {
67 return Poll::Ready(Ok(mem::replace(read, 0)));
68 }
69 }
70 }
71
72 impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
73 type Output = io::Result<usize>;
74
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>75 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
76 let me = self.project();
77 read_until_internal(Pin::new(*me.reader), cx, *me.delimeter, me.buf, me.read)
78 }
79 }
80