1 use core::pin::Pin;
2 use futures_core::stream::{FusedStream, Stream};
3 use futures_core::task::{Context, Poll};
4 #[cfg(feature = "sink")]
5 use futures_sink::Sink;
6 use pin_utils::{unsafe_pinned, unsafe_unpinned};
7 
/// Stream for the [`skip`](super::StreamExt::skip) method.
///
/// Pairs a wrapped stream with a countdown of the leading items that still
/// have to be thrown away before anything is yielded to the caller.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Skip<St> {
    /// The wrapped stream that every item is pulled from.
    stream: St,
    /// Number of items that must still be discarded before yielding starts.
    remaining: u64,
}
15 
16 impl<St: Unpin> Unpin for Skip<St> {}
17 
18 impl<St: Stream> Skip<St> {
19     unsafe_pinned!(stream: St);
20     unsafe_unpinned!(remaining: u64);
21 
new(stream: St, n: u64) -> Skip<St>22     pub(super) fn new(stream: St, n: u64) -> Skip<St> {
23         Skip {
24             stream,
25             remaining: n,
26         }
27     }
28 
29     /// Acquires a reference to the underlying stream that this combinator is
30     /// pulling from.
get_ref(&self) -> &St31     pub fn get_ref(&self) -> &St {
32         &self.stream
33     }
34 
35     /// Acquires a mutable reference to the underlying stream that this
36     /// combinator is pulling from.
37     ///
38     /// Note that care must be taken to avoid tampering with the state of the
39     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut St40     pub fn get_mut(&mut self) -> &mut St {
41         &mut self.stream
42     }
43 
44     /// Acquires a pinned mutable reference to the underlying stream that this
45     /// combinator is pulling from.
46     ///
47     /// Note that care must be taken to avoid tampering with the state of the
48     /// stream which may otherwise confuse this combinator.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>49     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
50         self.stream()
51     }
52 
53     /// Consumes this combinator, returning the underlying stream.
54     ///
55     /// Note that this may discard intermediate state of this combinator, so
56     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> St57     pub fn into_inner(self) -> St {
58         self.stream
59     }
60 }
61 
62 impl<St: FusedStream> FusedStream for Skip<St> {
is_terminated(&self) -> bool63     fn is_terminated(&self) -> bool {
64         self.stream.is_terminated()
65     }
66 }
67 
68 impl<St: Stream> Stream for Skip<St> {
69     type Item = St::Item;
70 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>71     fn poll_next(
72         mut self: Pin<&mut Self>,
73         cx: &mut Context<'_>,
74     ) -> Poll<Option<St::Item>> {
75         while self.remaining > 0 {
76             match ready!(self.as_mut().stream().poll_next(cx)) {
77                 Some(_) => *self.as_mut().remaining() -= 1,
78                 None => return Poll::Ready(None),
79             }
80         }
81 
82         self.as_mut().stream().poll_next(cx)
83     }
84 
size_hint(&self) -> (usize, Option<usize>)85     fn size_hint(&self) -> (usize, Option<usize>) {
86         let (lower, upper) = self.stream.size_hint();
87 
88         let lower = lower.saturating_sub(self.remaining as usize);
89         let upper = match upper {
90             Some(x) => Some(x.saturating_sub(self.remaining as usize)),
91             None => None,
92         };
93 
94         (lower, upper)
95     }
96 }
97 
// When the wrapped stream is also a sink, `Skip` exposes that sink by
// forwarding to its `stream` field.
#[cfg(feature = "sink")]
impl<S: Stream + Sink<Item>, Item> Sink<Item> for Skip<S> {
    type Error = <S as Sink<Item>>::Error;

    delegate_sink!(stream, Item);
}
108