1 use crate::stream::{StreamExt, Fuse}; 2 use core::pin::Pin; 3 use futures_core::stream::{FusedStream, Stream}; 4 use futures_core::task::{Context, Poll}; 5 #[cfg(feature = "sink")] 6 use futures_sink::Sink; 7 use pin_utils::{unsafe_pinned, unsafe_unpinned}; 8 9 /// A `Stream` that implements a `peek` method. 10 /// 11 /// The `peek` method can be used to retrieve a reference 12 /// to the next `Stream::Item` if available. A subsequent 13 /// call to `poll` will return the owned item. 14 #[derive(Debug)] 15 #[must_use = "streams do nothing unless polled"] 16 pub struct Peekable<St: Stream> { 17 stream: Fuse<St>, 18 peeked: Option<St::Item>, 19 } 20 21 impl<St: Stream + Unpin> Unpin for Peekable<St> {} 22 23 impl<St: Stream> Peekable<St> { 24 unsafe_pinned!(stream: Fuse<St>); 25 unsafe_unpinned!(peeked: Option<St::Item>); 26 new(stream: St) -> Peekable<St>27 pub(super) fn new(stream: St) -> Peekable<St> { 28 Peekable { 29 stream: stream.fuse(), 30 peeked: None 31 } 32 } 33 34 /// Acquires a reference to the underlying stream that this combinator is 35 /// pulling from. get_ref(&self) -> &St36 pub fn get_ref(&self) -> &St { 37 self.stream.get_ref() 38 } 39 40 /// Acquires a mutable reference to the underlying stream that this 41 /// combinator is pulling from. 42 /// 43 /// Note that care must be taken to avoid tampering with the state of the 44 /// stream which may otherwise confuse this combinator. get_mut(&mut self) -> &mut St45 pub fn get_mut(&mut self) -> &mut St { 46 self.stream.get_mut() 47 } 48 49 /// Acquires a pinned mutable reference to the underlying stream that this 50 /// combinator is pulling from. 51 /// 52 /// Note that care must be taken to avoid tampering with the state of the 53 /// stream which may otherwise confuse this combinator. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>54 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { 55 self.stream().get_pin_mut() 56 } 57 58 /// Consumes this combinator, returning the underlying stream. 59 /// 60 /// Note that this may discard intermediate state of this combinator, so 61 /// care should be taken to avoid losing resources when this is called. into_inner(self) -> St62 pub fn into_inner(self) -> St { 63 self.stream.into_inner() 64 } 65 66 /// Peek retrieves a reference to the next item in the stream. 67 /// 68 /// This method polls the underlying stream and return either a reference 69 /// to the next item if the stream is ready or passes through any errors. poll_peek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&St::Item>>70 pub fn poll_peek( 71 mut self: Pin<&mut Self>, 72 cx: &mut Context<'_>, 73 ) -> Poll<Option<&St::Item>> { 74 if self.peeked.is_some() { 75 let this: &Self = self.into_ref().get_ref(); 76 return Poll::Ready(this.peeked.as_ref()) 77 } 78 match ready!(self.as_mut().stream().poll_next(cx)) { 79 None => Poll::Ready(None), 80 Some(item) => { 81 *self.as_mut().peeked() = Some(item); 82 let this: &Self = self.into_ref().get_ref(); 83 Poll::Ready(this.peeked.as_ref()) 84 } 85 } 86 } 87 } 88 89 impl<St: Stream> FusedStream for Peekable<St> { is_terminated(&self) -> bool90 fn is_terminated(&self) -> bool { 91 self.peeked.is_none() && self.stream.is_terminated() 92 } 93 } 94 95 impl<S: Stream> Stream for Peekable<S> { 96 type Item = S::Item; 97 poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>98 fn poll_next( 99 mut self: Pin<&mut Self>, 100 cx: &mut Context<'_>, 101 ) -> Poll<Option<Self::Item>> { 102 if let Some(item) = self.as_mut().peeked().take() { 103 return Poll::Ready(Some(item)) 104 } 105 self.as_mut().stream().poll_next(cx) 106 } 107 size_hint(&self) -> (usize, Option<usize>)108 fn size_hint(&self) -> (usize, Option<usize>) { 109 let peek_len = if self.peeked.is_some() { 1 } else { 0 }; 110 let (lower, upper) = self.stream.size_hint(); 111 let lower = lower.saturating_add(peek_len); 112 let upper = match upper { 113 Some(x) => x.checked_add(peek_len), 114 None => None, 115 }; 116 (lower, upper) 117 } 118 } 119 120 // Forwarding impl of Sink from the underlying stream 121 #[cfg(feature = "sink")] 122 impl<S, Item> Sink<Item> for Peekable<S> 123 where S: Sink<Item> + Stream 124 { 125 type Error = S::Error; 126 127 delegate_sink!(stream, Item); 128 } 129