1 use crate::future::Either;
2 use crate::stream::{Fuse, StreamExt};
3 use core::fmt;
4 use core::pin::Pin;
5 use futures_core::future::{FusedFuture, Future};
6 use futures_core::stream::{FusedStream, Stream};
7 use futures_core::task::{Context, Poll};
8 #[cfg(feature = "sink")]
9 use futures_sink::Sink;
10 use pin_utils::{unsafe_pinned, unsafe_unpinned};
11 
12 /// A `Stream` that implements a `peek` method.
13 ///
14 /// The `peek` method can be used to retrieve a reference
15 /// to the next `Stream::Item` if available. A subsequent
16 /// call to `poll` will return the owned item.
17 #[derive(Debug)]
18 #[must_use = "streams do nothing unless polled"]
19 pub struct Peekable<St: Stream> {
20     stream: Fuse<St>,
21     peeked: Option<St::Item>,
22 }
23 
24 impl<St: Stream + Unpin> Unpin for Peekable<St> {}
25 
26 impl<St: Stream> Peekable<St> {
27     unsafe_pinned!(stream: Fuse<St>);
28     unsafe_unpinned!(peeked: Option<St::Item>);
29 
new(stream: St) -> Peekable<St>30     pub(super) fn new(stream: St) -> Peekable<St> {
31         Peekable {
32             stream: stream.fuse(),
33             peeked: None,
34         }
35     }
36 
37     /// Acquires a reference to the underlying stream that this combinator is
38     /// pulling from.
get_ref(&self) -> &St39     pub fn get_ref(&self) -> &St {
40         self.stream.get_ref()
41     }
42 
43     /// Acquires a mutable reference to the underlying stream that this
44     /// combinator is pulling from.
45     ///
46     /// Note that care must be taken to avoid tampering with the state of the
47     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut St48     pub fn get_mut(&mut self) -> &mut St {
49         self.stream.get_mut()
50     }
51 
52     /// Acquires a pinned mutable reference to the underlying stream that this
53     /// combinator is pulling from.
54     ///
55     /// Note that care must be taken to avoid tampering with the state of the
56     /// stream which may otherwise confuse this combinator.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>57     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
58         self.stream().get_pin_mut()
59     }
60 
61     /// Consumes this combinator, returning the underlying stream.
62     ///
63     /// Note that this may discard intermediate state of this combinator, so
64     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> St65     pub fn into_inner(self) -> St {
66         self.stream.into_inner()
67     }
68 
69     /// Produces a `Peek` future which retrieves a reference to the next item
70     /// in the stream, or `None` if the underlying stream terminates.
peek(self: Pin<&mut Self>) -> Peek<'_, St>71     pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
72         Peek { inner: Some(self) }
73     }
74 
75     /// Attempt to poll the underlying stream, and return the mutable borrow
76     /// in case that is desirable to try for another time.
77     /// In case a peeking poll is successful, the reference to the next item
78     /// will be in the `Either::Right` variant; otherwise, the mutable borrow
79     /// will be in the `Either::Left` variant.
do_poll_peek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Either<Pin<&mut Self>, Option<&St::Item>>80     fn do_poll_peek(
81         mut self: Pin<&mut Self>,
82         cx: &mut Context<'_>,
83     ) -> Either<Pin<&mut Self>, Option<&St::Item>> {
84         if self.peeked.is_some() {
85             let this: &Self = self.into_ref().get_ref();
86             return Either::Right(this.peeked.as_ref());
87         }
88         match self.as_mut().stream().poll_next(cx) {
89             Poll::Ready(None) => Either::Right(None),
90             Poll::Ready(Some(item)) => {
91                 *self.as_mut().peeked() = Some(item);
92                 let this: &Self = self.into_ref().get_ref();
93                 Either::Right(this.peeked.as_ref())
94             }
95             _ => Either::Left(self),
96         }
97     }
98 
99     /// Peek retrieves a reference to the next item in the stream.
100     ///
101     /// This method polls the underlying stream and return either a reference
102     /// to the next item if the stream is ready or passes through any errors.
poll_peek( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&St::Item>>103     pub fn poll_peek(
104         self: Pin<&mut Self>,
105         cx: &mut Context<'_>,
106     ) -> Poll<Option<&St::Item>> {
107         match self.do_poll_peek(cx) {
108             Either::Left(_) => Poll::Pending,
109             Either::Right(poll) => Poll::Ready(poll),
110         }
111     }
112 }
113 
114 impl<St: Stream> FusedStream for Peekable<St> {
is_terminated(&self) -> bool115     fn is_terminated(&self) -> bool {
116         self.peeked.is_none() && self.stream.is_terminated()
117     }
118 }
119 
120 impl<S: Stream> Stream for Peekable<S> {
121     type Item = S::Item;
122 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>123     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124         if let Some(item) = self.as_mut().peeked().take() {
125             return Poll::Ready(Some(item));
126         }
127         self.as_mut().stream().poll_next(cx)
128     }
129 
size_hint(&self) -> (usize, Option<usize>)130     fn size_hint(&self) -> (usize, Option<usize>) {
131         let peek_len = if self.peeked.is_some() { 1 } else { 0 };
132         let (lower, upper) = self.stream.size_hint();
133         let lower = lower.saturating_add(peek_len);
134         let upper = match upper {
135             Some(x) => x.checked_add(peek_len),
136             None => None,
137         };
138         (lower, upper)
139     }
140 }
141 
// `Peekable` is also a `Sink` whenever the wrapped stream is one; the `Sink`
// implementation is forwarded to the `stream` field.
#[cfg(feature = "sink")]
impl<S: Stream + Sink<Item>, Item> Sink<Item> for Peekable<S> {
    type Error = S::Error;

    delegate_sink!(stream, Item);
}
152 
153 /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
154 #[must_use = "futures do nothing unless polled"]
155 pub struct Peek<'a, St: Stream> {
156     inner: Option<Pin<&'a mut Peekable<St>>>,
157 }
158 
159 impl<St: Stream> Unpin for Peek<'_, St> {}
160 
161 impl<St> fmt::Debug for Peek<'_, St>
162 where
163     St: Stream + fmt::Debug,
164     St::Item: fmt::Debug,
165 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result166     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167         f.debug_struct("Peek")
168             .field("inner", &self.inner)
169             .finish()
170     }
171 }
172 
173 impl<St: Stream> FusedFuture for Peek<'_, St> {
is_terminated(&self) -> bool174     fn is_terminated(&self) -> bool {
175         self.inner.is_none()
176     }
177 }
178 
179 impl<'a, St> Future for Peek<'a, St>
180 where
181     St: Stream,
182 {
183     type Output = Option<&'a St::Item>;
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>184     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
185         if let Some(peekable) = self.inner.take() {
186             match peekable.do_poll_peek(cx) {
187                 Either::Left(peekable) => {
188                     self.inner = Some(peekable);
189                     Poll::Pending
190                 }
191                 Either::Right(peek) => Poll::Ready(peek),
192             }
193         } else {
194             panic!("Peek polled after completion")
195         }
196     }
197 }
198