use crate::stream::{StreamExt, Fuse}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_utils::{unsafe_pinned, unsafe_unpinned}; /// A `Stream` that implements a `peek` method. /// /// The `peek` method can be used to retrieve a reference /// to the next `Stream::Item` if available. A subsequent /// call to `poll` will return the owned item. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Peekable { stream: Fuse, peeked: Option, } impl Unpin for Peekable {} impl Peekable { unsafe_pinned!(stream: Fuse); unsafe_unpinned!(peeked: Option); pub(super) fn new(stream: St) -> Peekable { Peekable { stream: stream.fuse(), peeked: None } } /// Acquires a reference to the underlying stream that this combinator is /// pulling from. pub fn get_ref(&self) -> &St { self.stream.get_ref() } /// Acquires a mutable reference to the underlying stream that this /// combinator is pulling from. /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> &mut St { self.stream.get_mut() } /// Acquires a pinned mutable reference to the underlying stream that this /// combinator is pulling from. /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { self.stream().get_pin_mut() } /// Consumes this combinator, returning the underlying stream. /// /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> St { self.stream.into_inner() } /// Peek retrieves a reference to the next item in the stream. /// /// This method polls the underlying stream and return either a reference /// to the next item if the stream is ready or passes through any errors. pub fn poll_peek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { if self.peeked.is_some() { let this: &Self = self.into_ref().get_ref(); return Poll::Ready(this.peeked.as_ref()) } match ready!(self.as_mut().stream().poll_next(cx)) { None => Poll::Ready(None), Some(item) => { *self.as_mut().peeked() = Some(item); let this: &Self = self.into_ref().get_ref(); Poll::Ready(this.peeked.as_ref()) } } } } impl FusedStream for Peekable { fn is_terminated(&self) -> bool { self.peeked.is_none() && self.stream.is_terminated() } } impl Stream for Peekable { type Item = S::Item; fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { if let Some(item) = self.as_mut().peeked().take() { return Poll::Ready(Some(item)) } self.as_mut().stream().poll_next(cx) } fn size_hint(&self) -> (usize, Option) { let peek_len = if self.peeked.is_some() { 1 } else { 0 }; let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(peek_len); let upper = match upper { Some(x) => x.checked_add(peek_len), None => None, }; (lower, upper) } } // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for Peekable where S: Sink + Stream { type Error = S::Error; delegate_sink!(stream, Item); }