1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::future::Future;
4 use futures_core::stream::{FusedStream, Stream};
5 use futures_core::task::{Context, Poll};
6 #[cfg(feature = "sink")]
7 use futures_sink::Sink;
8 use pin_project::{pin_project, project};
9 
10 /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
11 #[pin_project]
12 #[must_use = "streams do nothing unless polled"]
13 pub struct SkipWhile<St, Fut, F> where St: Stream {
14     #[pin]
15     stream: St,
16     f: F,
17     #[pin]
18     pending_fut: Option<Fut>,
19     pending_item: Option<St::Item>,
20     done_skipping: bool,
21 }
22 
23 impl<St, Fut, F> fmt::Debug for SkipWhile<St, Fut, F>
24 where
25     St: Stream + fmt::Debug,
26     St::Item: fmt::Debug,
27     Fut: fmt::Debug,
28 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30         f.debug_struct("SkipWhile")
31             .field("stream", &self.stream)
32             .field("pending_fut", &self.pending_fut)
33             .field("pending_item", &self.pending_item)
34             .field("done_skipping", &self.done_skipping)
35             .finish()
36     }
37 }
38 
39 impl<St, Fut, F> SkipWhile<St, Fut, F>
40     where St: Stream,
41           F: FnMut(&St::Item) -> Fut,
42           Fut: Future<Output = bool>,
43 {
new(stream: St, f: F) -> SkipWhile<St, Fut, F>44     pub(super) fn new(stream: St, f: F) -> SkipWhile<St, Fut, F> {
45         SkipWhile {
46             stream,
47             f,
48             pending_fut: None,
49             pending_item: None,
50             done_skipping: false,
51         }
52     }
53 
54     delegate_access_inner!(stream, St, ());
55 }
56 
57 impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F>
58     where St: FusedStream,
59           F: FnMut(&St::Item) -> Fut,
60           Fut: Future<Output = bool>,
61 {
is_terminated(&self) -> bool62     fn is_terminated(&self) -> bool {
63         self.pending_item.is_none() && self.stream.is_terminated()
64     }
65 }
66 
67 impl<St, Fut, F> Stream for SkipWhile<St, Fut, F>
68     where St: Stream,
69           F: FnMut(&St::Item) -> Fut,
70           Fut: Future<Output = bool>,
71 {
72     type Item = St::Item;
73 
74     #[project]
poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>75     fn poll_next(
76         self: Pin<&mut Self>,
77         cx: &mut Context<'_>,
78     ) -> Poll<Option<St::Item>> {
79         #[project]
80         let SkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project();
81 
82         if *done_skipping {
83             return stream.poll_next(cx);
84         }
85 
86         Poll::Ready(loop {
87             if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
88                 let skipped = ready!(fut.poll(cx));
89                 let item = pending_item.take();
90                 pending_fut.set(None);
91                 if !skipped {
92                     *done_skipping = true;
93                     break item;
94                 }
95             } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
96                 pending_fut.set(Some(f(&item)));
97                 *pending_item = Some(item);
98             } else {
99                 break None;
100             }
101         })
102     }
103 
size_hint(&self) -> (usize, Option<usize>)104     fn size_hint(&self) -> (usize, Option<usize>) {
105         if self.done_skipping {
106             self.stream.size_hint()
107         } else {
108             let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
109             let (_, upper) = self.stream.size_hint();
110             let upper = match upper {
111                 Some(x) => x.checked_add(pending_len),
112                 None => None,
113             };
114             (0, upper) // can't know a lower bound, due to the predicate
115         }
116     }
117 }
118 
119 // Forwarding impl of Sink from the underlying stream
120 #[cfg(feature = "sink")]
121 impl<S, Fut, F, Item> Sink<Item> for SkipWhile<S, Fut, F>
122     where S: Stream + Sink<Item>,
123           F: FnMut(&S::Item) -> Fut,
124           Fut: Future<Output = bool>,
125 {
126     type Error = S::Error;
127 
128     delegate_sink!(stream, Item);
129 }
130