1 use crate::stream::{Fuse, StreamExt}; 2 use core::fmt; 3 use core::pin::Pin; 4 use futures_core::future::{FusedFuture, Future}; 5 use futures_core::stream::{FusedStream, Stream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project::pin_project; 10 11 /// A `Stream` that implements a `peek` method. 12 /// 13 /// The `peek` method can be used to retrieve a reference 14 /// to the next `Stream::Item` if available. A subsequent 15 /// call to `poll` will return the owned item. 16 #[pin_project] 17 #[derive(Debug)] 18 #[must_use = "streams do nothing unless polled"] 19 pub struct Peekable<St: Stream> { 20 #[pin] 21 stream: Fuse<St>, 22 peeked: Option<St::Item>, 23 } 24 25 impl<St: Stream> Peekable<St> { new(stream: St) -> Peekable<St>26 pub(super) fn new(stream: St) -> Peekable<St> { 27 Peekable { 28 stream: stream.fuse(), 29 peeked: None, 30 } 31 } 32 33 delegate_access_inner!(stream, St, (.)); 34 35 /// Produces a `Peek` future which retrieves a reference to the next item 36 /// in the stream, or `None` if the underlying stream terminates. peek(self: Pin<&mut Self>) -> Peek<'_, St>37 pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { 38 Peek { inner: Some(self) } 39 } 40 41 /// Peek retrieves a reference to the next item in the stream. 42 /// 43 /// This method polls the underlying stream and return either a reference 44 /// to the next item if the stream is ready or passes through any errors. poll_peek( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&St::Item>>45 pub fn poll_peek( 46 self: Pin<&mut Self>, 47 cx: &mut Context<'_>, 48 ) -> Poll<Option<&St::Item>> { 49 let mut this = self.project(); 50 51 Poll::Ready(loop { 52 if this.peeked.is_some() { 53 break this.peeked.as_ref(); 54 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { 55 *this.peeked = Some(item); 56 } else { 57 break None; 58 } 59 }) 60 } 61 } 62 63 impl<St: Stream> FusedStream for Peekable<St> { is_terminated(&self) -> bool64 fn is_terminated(&self) -> bool { 65 self.peeked.is_none() && self.stream.is_terminated() 66 } 67 } 68 69 impl<S: Stream> Stream for Peekable<S> { 70 type Item = S::Item; 71 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>72 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 73 let this = self.project(); 74 if let Some(item) = this.peeked.take() { 75 return Poll::Ready(Some(item)); 76 } 77 this.stream.poll_next(cx) 78 } 79 size_hint(&self) -> (usize, Option<usize>)80 fn size_hint(&self) -> (usize, Option<usize>) { 81 let peek_len = if self.peeked.is_some() { 1 } else { 0 }; 82 let (lower, upper) = self.stream.size_hint(); 83 let lower = lower.saturating_add(peek_len); 84 let upper = match upper { 85 Some(x) => x.checked_add(peek_len), 86 None => None, 87 }; 88 (lower, upper) 89 } 90 } 91 92 // Forwarding impl of Sink from the underlying stream 93 #[cfg(feature = "sink")] 94 impl<S, Item> Sink<Item> for Peekable<S> 95 where 96 S: Sink<Item> + Stream, 97 { 98 type Error = S::Error; 99 100 delegate_sink!(stream, Item); 101 } 102 103 /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] 104 #[pin_project] 105 #[must_use = "futures do nothing unless polled"] 106 pub struct Peek<'a, St: Stream> { 107 inner: Option<Pin<&'a mut Peekable<St>>>, 108 } 109 110 impl<St> fmt::Debug for Peek<'_, St> 111 where 112 St: Stream + fmt::Debug, 113 St::Item: fmt::Debug, 114 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 116 f.debug_struct("Peek") 117 .field("inner", &self.inner) 118 .finish() 119 } 120 } 121 122 impl<St: Stream> FusedFuture for Peek<'_, St> { is_terminated(&self) -> bool123 fn is_terminated(&self) -> bool { 124 self.inner.is_none() 125 } 126 } 127 128 impl<'a, St> Future for Peek<'a, St> 129 where 130 St: Stream, 131 { 132 type Output = Option<&'a St::Item>; poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>133 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 134 let inner = self.project().inner; 135 if let Some(peekable) = inner { 136 ready!(peekable.as_mut().poll_peek(cx)); 137 138 inner.take().unwrap().poll_peek(cx) 139 } else { 140 panic!("Peek polled after completion") 141 } 142 } 143 } 144