1 use crate::future::Either; 2 use crate::stream::{Fuse, StreamExt}; 3 use core::fmt; 4 use core::pin::Pin; 5 use futures_core::future::{FusedFuture, Future}; 6 use futures_core::stream::{FusedStream, Stream}; 7 use futures_core::task::{Context, Poll}; 8 #[cfg(feature = "sink")] 9 use futures_sink::Sink; 10 use pin_utils::{unsafe_pinned, unsafe_unpinned}; 11 12 /// A `Stream` that implements a `peek` method. 13 /// 14 /// The `peek` method can be used to retrieve a reference 15 /// to the next `Stream::Item` if available. A subsequent 16 /// call to `poll` will return the owned item. 17 #[derive(Debug)] 18 #[must_use = "streams do nothing unless polled"] 19 pub struct Peekable<St: Stream> { 20 stream: Fuse<St>, 21 peeked: Option<St::Item>, 22 } 23 24 impl<St: Stream + Unpin> Unpin for Peekable<St> {} 25 26 impl<St: Stream> Peekable<St> { 27 unsafe_pinned!(stream: Fuse<St>); 28 unsafe_unpinned!(peeked: Option<St::Item>); 29 new(stream: St) -> Peekable<St>30 pub(super) fn new(stream: St) -> Peekable<St> { 31 Peekable { 32 stream: stream.fuse(), 33 peeked: None, 34 } 35 } 36 37 /// Acquires a reference to the underlying stream that this combinator is 38 /// pulling from. get_ref(&self) -> &St39 pub fn get_ref(&self) -> &St { 40 self.stream.get_ref() 41 } 42 43 /// Acquires a mutable reference to the underlying stream that this 44 /// combinator is pulling from. 45 /// 46 /// Note that care must be taken to avoid tampering with the state of the 47 /// stream which may otherwise confuse this combinator. get_mut(&mut self) -> &mut St48 pub fn get_mut(&mut self) -> &mut St { 49 self.stream.get_mut() 50 } 51 52 /// Acquires a pinned mutable reference to the underlying stream that this 53 /// combinator is pulling from. 54 /// 55 /// Note that care must be taken to avoid tampering with the state of the 56 /// stream which may otherwise confuse this combinator. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>57 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { 58 self.stream().get_pin_mut() 59 } 60 61 /// Consumes this combinator, returning the underlying stream. 62 /// 63 /// Note that this may discard intermediate state of this combinator, so 64 /// care should be taken to avoid losing resources when this is called. into_inner(self) -> St65 pub fn into_inner(self) -> St { 66 self.stream.into_inner() 67 } 68 69 /// Produces a `Peek` future which retrieves a reference to the next item 70 /// in the stream, or `None` if the underlying stream terminates. peek(self: Pin<&mut Self>) -> Peek<'_, St>71 pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { 72 Peek { inner: Some(self) } 73 } 74 75 /// Attempt to poll the underlying stream, and return the mutable borrow 76 /// in case that is desirable to try for another time. 77 /// In case a peeking poll is successful, the reference to the next item 78 /// will be in the `Either::Right` variant; otherwise, the mutable borrow 79 /// will be in the `Either::Left` variant. do_poll_peek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Either<Pin<&mut Self>, Option<&St::Item>>80 fn do_poll_peek( 81 mut self: Pin<&mut Self>, 82 cx: &mut Context<'_>, 83 ) -> Either<Pin<&mut Self>, Option<&St::Item>> { 84 if self.peeked.is_some() { 85 let this: &Self = self.into_ref().get_ref(); 86 return Either::Right(this.peeked.as_ref()); 87 } 88 match self.as_mut().stream().poll_next(cx) { 89 Poll::Ready(None) => Either::Right(None), 90 Poll::Ready(Some(item)) => { 91 *self.as_mut().peeked() = Some(item); 92 let this: &Self = self.into_ref().get_ref(); 93 Either::Right(this.peeked.as_ref()) 94 } 95 _ => Either::Left(self), 96 } 97 } 98 99 /// Peek retrieves a reference to the next item in the stream. 100 /// 101 /// This method polls the underlying stream and return either a reference 102 /// to the next item if the stream is ready or passes through any errors. poll_peek( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&St::Item>>103 pub fn poll_peek( 104 self: Pin<&mut Self>, 105 cx: &mut Context<'_>, 106 ) -> Poll<Option<&St::Item>> { 107 match self.do_poll_peek(cx) { 108 Either::Left(_) => Poll::Pending, 109 Either::Right(poll) => Poll::Ready(poll), 110 } 111 } 112 } 113 114 impl<St: Stream> FusedStream for Peekable<St> { is_terminated(&self) -> bool115 fn is_terminated(&self) -> bool { 116 self.peeked.is_none() && self.stream.is_terminated() 117 } 118 } 119 120 impl<S: Stream> Stream for Peekable<S> { 121 type Item = S::Item; 122 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>123 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 124 if let Some(item) = self.as_mut().peeked().take() { 125 return Poll::Ready(Some(item)); 126 } 127 self.as_mut().stream().poll_next(cx) 128 } 129 size_hint(&self) -> (usize, Option<usize>)130 fn size_hint(&self) -> (usize, Option<usize>) { 131 let peek_len = if self.peeked.is_some() { 1 } else { 0 }; 132 let (lower, upper) = self.stream.size_hint(); 133 let lower = lower.saturating_add(peek_len); 134 let upper = match upper { 135 Some(x) => x.checked_add(peek_len), 136 None => None, 137 }; 138 (lower, upper) 139 } 140 } 141 142 // Forwarding impl of Sink from the underlying stream 143 #[cfg(feature = "sink")] 144 impl<S, Item> Sink<Item> for Peekable<S> 145 where 146 S: Sink<Item> + Stream, 147 { 148 type Error = S::Error; 149 150 delegate_sink!(stream, Item); 151 } 152 153 /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] 154 #[must_use = "futures do nothing unless polled"] 155 pub struct Peek<'a, St: Stream> { 156 inner: Option<Pin<&'a mut Peekable<St>>>, 157 } 158 159 impl<St: Stream> Unpin for Peek<'_, St> {} 160 161 impl<St> fmt::Debug for Peek<'_, St> 162 where 163 St: Stream + fmt::Debug, 164 St::Item: fmt::Debug, 165 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result166 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 167 f.debug_struct("Peek") 168 .field("inner", &self.inner) 169 .finish() 170 } 171 } 172 173 impl<St: Stream> FusedFuture for Peek<'_, St> { is_terminated(&self) -> bool174 fn is_terminated(&self) -> bool { 175 self.inner.is_none() 176 } 177 } 178 179 impl<'a, St> Future for Peek<'a, St> 180 where 181 St: Stream, 182 { 183 type Output = Option<&'a St::Item>; poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>184 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 185 if let Some(peekable) = self.inner.take() { 186 match peekable.do_poll_peek(cx) { 187 Either::Left(peekable) => { 188 self.inner = Some(peekable); 189 Poll::Pending 190 } 191 Either::Right(peek) => Poll::Ready(peek), 192 } 193 } else { 194 panic!("Peek polled after completion") 195 } 196 } 197 } 198