1 use core::pin::Pin;
2 use futures_core::ready;
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_core::task::{Context, Poll};
5 #[cfg(feature = "sink")]
6 use futures_sink::Sink;
7 use pin_project_lite::pin_project;
8 
pin_project! {
    /// Stream for the [`skip`](super::StreamExt::skip) method.
    ///
    /// Wraps another stream and discards items from its front: polling this
    /// adapter first consumes and drops items from the wrapped stream until
    /// the `remaining` counter reaches zero, and after that every item is
    /// passed through unchanged. If the wrapped stream ends while items are
    /// still being discarded, this stream ends as well.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Skip<St> {
        // The wrapped stream. Marked `#[pin]` so that the projection hands
        // it out as a pinned reference that can be polled.
        #[pin]
        stream: St,
        // How many items still have to be discarded before items are
        // forwarded to the caller.
        remaining: usize,
    }
}
19 
20 impl<St: Stream> Skip<St> {
new(stream: St, n: usize) -> Self21     pub(super) fn new(stream: St, n: usize) -> Self {
22         Self { stream, remaining: n }
23     }
24 
25     delegate_access_inner!(stream, St, ());
26 }
27 
28 impl<St: FusedStream> FusedStream for Skip<St> {
is_terminated(&self) -> bool29     fn is_terminated(&self) -> bool {
30         self.stream.is_terminated()
31     }
32 }
33 
34 impl<St: Stream> Stream for Skip<St> {
35     type Item = St::Item;
36 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>>37     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
38         let mut this = self.project();
39 
40         while *this.remaining > 0 {
41             if ready!(this.stream.as_mut().poll_next(cx)).is_some() {
42                 *this.remaining -= 1;
43             } else {
44                 return Poll::Ready(None);
45             }
46         }
47 
48         this.stream.poll_next(cx)
49     }
50 
size_hint(&self) -> (usize, Option<usize>)51     fn size_hint(&self) -> (usize, Option<usize>) {
52         let (lower, upper) = self.stream.size_hint();
53 
54         let lower = lower.saturating_sub(self.remaining);
55         let upper = upper.map(|x| x.saturating_sub(self.remaining));
56 
57         (lower, upper)
58     }
59 }
60 
// When the wrapped stream is also a sink, expose that sink through `Skip`.
#[cfg(feature = "sink")]
impl<S: Stream + Sink<Item>, Item> Sink<Item> for Skip<S> {
    type Error = S::Error;

    // Macro defined elsewhere in the crate; presumably expands to the `Sink`
    // methods, each forwarding to the `stream` field — confirm against its
    // definition.
    delegate_sink!(stream, Item);
}
71