1 use crate::stream::{Fuse, StreamExt}; 2 use core::fmt; 3 use core::pin::Pin; 4 use futures_core::future::{FusedFuture, Future}; 5 use futures_core::stream::{FusedStream, Stream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project::{pin_project, project}; 10 11 /// A `Stream` that implements a `peek` method. 12 /// 13 /// The `peek` method can be used to retrieve a reference 14 /// to the next `Stream::Item` if available. A subsequent 15 /// call to `poll` will return the owned item. 16 #[pin_project] 17 #[derive(Debug)] 18 #[must_use = "streams do nothing unless polled"] 19 pub struct Peekable<St: Stream> { 20 #[pin] 21 stream: Fuse<St>, 22 peeked: Option<St::Item>, 23 } 24 25 impl<St: Stream> Peekable<St> { new(stream: St) -> Peekable<St>26 pub(super) fn new(stream: St) -> Peekable<St> { 27 Peekable { 28 stream: stream.fuse(), 29 peeked: None, 30 } 31 } 32 33 delegate_access_inner!(stream, St, (.)); 34 35 /// Produces a `Peek` future which retrieves a reference to the next item 36 /// in the stream, or `None` if the underlying stream terminates. peek(self: Pin<&mut Self>) -> Peek<'_, St>37 pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { 38 Peek { inner: Some(self) } 39 } 40 41 /// Peek retrieves a reference to the next item in the stream. 42 /// 43 /// This method polls the underlying stream and return either a reference 44 /// to the next item if the stream is ready or passes through any errors. 45 #[project] poll_peek( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&St::Item>>46 pub fn poll_peek( 47 self: Pin<&mut Self>, 48 cx: &mut Context<'_>, 49 ) -> Poll<Option<&St::Item>> { 50 #[project] 51 let Peekable { mut stream, peeked } = self.project(); 52 53 Poll::Ready(loop { 54 if peeked.is_some() { 55 break peeked.as_ref(); 56 } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { 57 *peeked = Some(item); 58 } else { 59 break None; 60 } 61 }) 62 } 63 } 64 65 impl<St: Stream> FusedStream for Peekable<St> { is_terminated(&self) -> bool66 fn is_terminated(&self) -> bool { 67 self.peeked.is_none() && self.stream.is_terminated() 68 } 69 } 70 71 impl<S: Stream> Stream for Peekable<S> { 72 type Item = S::Item; 73 74 #[project] poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>75 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 76 #[project] 77 let Peekable { stream, peeked } = self.project(); 78 if let Some(item) = peeked.take() { 79 return Poll::Ready(Some(item)); 80 } 81 stream.poll_next(cx) 82 } 83 size_hint(&self) -> (usize, Option<usize>)84 fn size_hint(&self) -> (usize, Option<usize>) { 85 let peek_len = if self.peeked.is_some() { 1 } else { 0 }; 86 let (lower, upper) = self.stream.size_hint(); 87 let lower = lower.saturating_add(peek_len); 88 let upper = match upper { 89 Some(x) => x.checked_add(peek_len), 90 None => None, 91 }; 92 (lower, upper) 93 } 94 } 95 96 // Forwarding impl of Sink from the underlying stream 97 #[cfg(feature = "sink")] 98 impl<S, Item> Sink<Item> for Peekable<S> 99 where 100 S: Sink<Item> + Stream, 101 { 102 type Error = S::Error; 103 104 delegate_sink!(stream, Item); 105 } 106 107 /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] 108 #[pin_project] 109 #[must_use = "futures do nothing unless polled"] 110 pub struct Peek<'a, St: Stream> { 111 inner: Option<Pin<&'a mut Peekable<St>>>, 112 } 113 114 impl<St> fmt::Debug for Peek<'_, St> 115 where 116 St: Stream + fmt::Debug, 117 St::Item: fmt::Debug, 118 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 120 f.debug_struct("Peek") 121 .field("inner", &self.inner) 122 .finish() 123 } 124 } 125 126 impl<St: Stream> FusedFuture for Peek<'_, St> { is_terminated(&self) -> bool127 fn is_terminated(&self) -> bool { 128 self.inner.is_none() 129 } 130 } 131 132 impl<'a, St> Future for Peek<'a, St> 133 where 134 St: Stream, 135 { 136 type Output = Option<&'a St::Item>; poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>137 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 138 let inner = self.project().inner; 139 if let Some(peekable) = inner { 140 ready!(peekable.as_mut().poll_peek(cx)); 141 142 inner.take().unwrap().poll_peek(cx) 143 } else { 144 panic!("Peek polled after completion") 145 } 146 } 147 } 148