1 use core::pin::Pin; 2 use futures_core::stream::{FusedStream, Stream}; 3 use futures_core::task::{Context, Poll}; 4 #[cfg(feature = "sink")] 5 use futures_sink::Sink; 6 use pin_project::pin_project; 7 8 /// Stream for the [`skip`](super::StreamExt::skip) method. 9 #[pin_project] 10 #[derive(Debug)] 11 #[must_use = "streams do nothing unless polled"] 12 pub struct Skip<St> { 13 #[pin] 14 stream: St, 15 remaining: usize, 16 } 17 18 impl<St: Stream> Skip<St> { new(stream: St, n: usize) -> Skip<St>19 pub(super) fn new(stream: St, n: usize) -> Skip<St> { 20 Skip { 21 stream, 22 remaining: n, 23 } 24 } 25 26 delegate_access_inner!(stream, St, ()); 27 } 28 29 impl<St: FusedStream> FusedStream for Skip<St> { is_terminated(&self) -> bool30 fn is_terminated(&self) -> bool { 31 self.stream.is_terminated() 32 } 33 } 34 35 impl<St: Stream> Stream for Skip<St> { 36 type Item = St::Item; 37 poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>38 fn poll_next( 39 self: Pin<&mut Self>, 40 cx: &mut Context<'_>, 41 ) -> Poll<Option<St::Item>> { 42 let mut this = self.project(); 43 44 while *this.remaining > 0 { 45 if ready!(this.stream.as_mut().poll_next(cx)).is_some() { 46 *this.remaining -= 1; 47 } else { 48 return Poll::Ready(None); 49 } 50 } 51 52 this.stream.poll_next(cx) 53 } 54 size_hint(&self) -> (usize, Option<usize>)55 fn size_hint(&self) -> (usize, Option<usize>) { 56 let (lower, upper) = self.stream.size_hint(); 57 58 let lower = lower.saturating_sub(self.remaining as usize); 59 let upper = match upper { 60 Some(x) => Some(x.saturating_sub(self.remaining as usize)), 61 None => None, 62 }; 63 64 (lower, upper) 65 } 66 } 67 68 // Forwarding impl of Sink from the underlying stream 69 #[cfg(feature = "sink")] 70 impl<S, Item> Sink<Item> for Skip<S> 71 where 72 S: Stream + Sink<Item>, 73 { 74 type Error = S::Error; 75 76 delegate_sink!(stream, Item); 77 } 78