1 use core::pin::Pin;
2 use futures_core::stream::{FusedStream, Stream};
3 use futures_core::task::{Context, Poll};
4 #[cfg(feature = "sink")]
5 use futures_sink::Sink;
6 use pin_project::pin_project;
7 
8 /// Stream for the [`skip`](super::StreamExt::skip) method.
9 #[pin_project]
10 #[derive(Debug)]
11 #[must_use = "streams do nothing unless polled"]
12 pub struct Skip<St> {
13     #[pin]
14     stream: St,
15     remaining: usize,
16 }
17 
18 impl<St: Stream> Skip<St> {
new(stream: St, n: usize) -> Skip<St>19     pub(super) fn new(stream: St, n: usize) -> Skip<St> {
20         Skip {
21             stream,
22             remaining: n,
23         }
24     }
25 
26     delegate_access_inner!(stream, St, ());
27 }
28 
29 impl<St: FusedStream> FusedStream for Skip<St> {
is_terminated(&self) -> bool30     fn is_terminated(&self) -> bool {
31         self.stream.is_terminated()
32     }
33 }
34 
35 impl<St: Stream> Stream for Skip<St> {
36     type Item = St::Item;
37 
poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>38     fn poll_next(
39         self: Pin<&mut Self>,
40         cx: &mut Context<'_>,
41     ) -> Poll<Option<St::Item>> {
42         let mut this = self.project();
43 
44         while *this.remaining > 0 {
45             if ready!(this.stream.as_mut().poll_next(cx)).is_some() {
46                 *this.remaining -= 1;
47             } else {
48                 return Poll::Ready(None);
49             }
50         }
51 
52         this.stream.poll_next(cx)
53     }
54 
size_hint(&self) -> (usize, Option<usize>)55     fn size_hint(&self) -> (usize, Option<usize>) {
56         let (lower, upper) = self.stream.size_hint();
57 
58         let lower = lower.saturating_sub(self.remaining as usize);
59         let upper = match upper {
60             Some(x) => Some(x.saturating_sub(self.remaining as usize)),
61             None => None,
62         };
63 
64         (lower, upper)
65     }
66 }
67 
68 // Forwarding impl of Sink from the underlying stream
69 #[cfg(feature = "sink")]
70 impl<S, Item> Sink<Item> for Skip<S>
71 where
72     S: Stream + Sink<Item>,
73 {
74     type Error = S::Error;
75 
76     delegate_sink!(stream, Item);
77 }
78