1 use core::pin::Pin; 2 use futures_core::stream::{FusedStream, Stream}; 3 use futures_core::task::{Context, Poll}; 4 #[cfg(feature = "sink")] 5 use futures_sink::Sink; 6 use pin_project::{pin_project, project}; 7 8 /// Stream for the [`skip`](super::StreamExt::skip) method. 9 #[pin_project] 10 #[derive(Debug)] 11 #[must_use = "streams do nothing unless polled"] 12 pub struct Skip<St> { 13 #[pin] 14 stream: St, 15 remaining: usize, 16 } 17 18 impl<St: Stream> Skip<St> { new(stream: St, n: usize) -> Skip<St>19 pub(super) fn new(stream: St, n: usize) -> Skip<St> { 20 Skip { 21 stream, 22 remaining: n, 23 } 24 } 25 26 delegate_access_inner!(stream, St, ()); 27 } 28 29 impl<St: FusedStream> FusedStream for Skip<St> { is_terminated(&self) -> bool30 fn is_terminated(&self) -> bool { 31 self.stream.is_terminated() 32 } 33 } 34 35 impl<St: Stream> Stream for Skip<St> { 36 type Item = St::Item; 37 38 #[project] poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>39 fn poll_next( 40 self: Pin<&mut Self>, 41 cx: &mut Context<'_>, 42 ) -> Poll<Option<St::Item>> { 43 #[project] 44 let Skip { mut stream, remaining } = self.project(); 45 while *remaining > 0 { 46 if ready!(stream.as_mut().poll_next(cx)).is_some() { 47 *remaining -= 1; 48 } else { 49 return Poll::Ready(None); 50 } 51 } 52 53 stream.poll_next(cx) 54 } 55 size_hint(&self) -> (usize, Option<usize>)56 fn size_hint(&self) -> (usize, Option<usize>) { 57 let (lower, upper) = self.stream.size_hint(); 58 59 let lower = lower.saturating_sub(self.remaining as usize); 60 let upper = match upper { 61 Some(x) => Some(x.saturating_sub(self.remaining as usize)), 62 None => None, 63 }; 64 65 (lower, upper) 66 } 67 } 68 69 // Forwarding impl of Sink from the underlying stream 70 #[cfg(feature = "sink")] 71 impl<S, Item> Sink<Item> for Skip<S> 72 where 73 S: Stream + Sink<Item>, 74 { 75 type Error = S::Error; 76 77 delegate_sink!(stream, Item); 78 } 79