1 use core::pin::Pin;
2 use futures_core::stream::{FusedStream, Stream};
3 use futures_core::task::{Context, Poll};
4 #[cfg(feature = "sink")]
5 use futures_sink::Sink;
6 use pin_project::{pin_project, project};
7 
8 /// Stream for the [`skip`](super::StreamExt::skip) method.
9 #[pin_project]
10 #[derive(Debug)]
11 #[must_use = "streams do nothing unless polled"]
12 pub struct Skip<St> {
13     #[pin]
14     stream: St,
15     remaining: usize,
16 }
17 
18 impl<St: Stream> Skip<St> {
new(stream: St, n: usize) -> Skip<St>19     pub(super) fn new(stream: St, n: usize) -> Skip<St> {
20         Skip {
21             stream,
22             remaining: n,
23         }
24     }
25 
26     delegate_access_inner!(stream, St, ());
27 }
28 
29 impl<St: FusedStream> FusedStream for Skip<St> {
is_terminated(&self) -> bool30     fn is_terminated(&self) -> bool {
31         self.stream.is_terminated()
32     }
33 }
34 
35 impl<St: Stream> Stream for Skip<St> {
36     type Item = St::Item;
37 
38     #[project]
poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>39     fn poll_next(
40         self: Pin<&mut Self>,
41         cx: &mut Context<'_>,
42     ) -> Poll<Option<St::Item>> {
43         #[project]
44         let Skip { mut stream, remaining } = self.project();
45         while *remaining > 0 {
46             if ready!(stream.as_mut().poll_next(cx)).is_some() {
47                 *remaining -= 1;
48             } else {
49                 return Poll::Ready(None);
50             }
51         }
52 
53         stream.poll_next(cx)
54     }
55 
size_hint(&self) -> (usize, Option<usize>)56     fn size_hint(&self) -> (usize, Option<usize>) {
57         let (lower, upper) = self.stream.size_hint();
58 
59         let lower = lower.saturating_sub(self.remaining as usize);
60         let upper = match upper {
61             Some(x) => Some(x.saturating_sub(self.remaining as usize)),
62             None => None,
63         };
64 
65         (lower, upper)
66     }
67 }
68 
69 // Forwarding impl of Sink from the underlying stream
70 #[cfg(feature = "sink")]
71 impl<S, Item> Sink<Item> for Skip<S>
72 where
73     S: Stream + Sink<Item>,
74 {
75     type Error = S::Error;
76 
77     delegate_sink!(stream, Item);
78 }
79