1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::Future; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 #[cfg(feature = "sink")] 7 use futures_sink::Sink; 8 use pin_project::{pin_project, project}; 9 10 /// Stream for the [`skip_while`](super::StreamExt::skip_while) method. 11 #[pin_project] 12 #[must_use = "streams do nothing unless polled"] 13 pub struct SkipWhile<St, Fut, F> where St: Stream { 14 #[pin] 15 stream: St, 16 f: F, 17 #[pin] 18 pending_fut: Option<Fut>, 19 pending_item: Option<St::Item>, 20 done_skipping: bool, 21 } 22 23 impl<St, Fut, F> fmt::Debug for SkipWhile<St, Fut, F> 24 where 25 St: Stream + fmt::Debug, 26 St::Item: fmt::Debug, 27 Fut: fmt::Debug, 28 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 f.debug_struct("SkipWhile") 31 .field("stream", &self.stream) 32 .field("pending_fut", &self.pending_fut) 33 .field("pending_item", &self.pending_item) 34 .field("done_skipping", &self.done_skipping) 35 .finish() 36 } 37 } 38 39 impl<St, Fut, F> SkipWhile<St, Fut, F> 40 where St: Stream, 41 F: FnMut(&St::Item) -> Fut, 42 Fut: Future<Output = bool>, 43 { new(stream: St, f: F) -> SkipWhile<St, Fut, F>44 pub(super) fn new(stream: St, f: F) -> SkipWhile<St, Fut, F> { 45 SkipWhile { 46 stream, 47 f, 48 pending_fut: None, 49 pending_item: None, 50 done_skipping: false, 51 } 52 } 53 54 delegate_access_inner!(stream, St, ()); 55 } 56 57 impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F> 58 where St: FusedStream, 59 F: FnMut(&St::Item) -> Fut, 60 Fut: Future<Output = bool>, 61 { is_terminated(&self) -> bool62 fn is_terminated(&self) -> bool { 63 self.pending_item.is_none() && self.stream.is_terminated() 64 } 65 } 66 67 impl<St, Fut, F> Stream for SkipWhile<St, Fut, F> 68 where St: Stream, 69 F: FnMut(&St::Item) -> Fut, 70 Fut: Future<Output = bool>, 71 { 72 type Item = St::Item; 73 74 #[project] poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>75 fn poll_next( 76 self: Pin<&mut Self>, 77 cx: &mut Context<'_>, 78 ) -> Poll<Option<St::Item>> { 79 #[project] 80 let SkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project(); 81 82 if *done_skipping { 83 return stream.poll_next(cx); 84 } 85 86 Poll::Ready(loop { 87 if let Some(fut) = pending_fut.as_mut().as_pin_mut() { 88 let skipped = ready!(fut.poll(cx)); 89 let item = pending_item.take(); 90 pending_fut.set(None); 91 if !skipped { 92 *done_skipping = true; 93 break item; 94 } 95 } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { 96 pending_fut.set(Some(f(&item))); 97 *pending_item = Some(item); 98 } else { 99 break None; 100 } 101 }) 102 } 103 size_hint(&self) -> (usize, Option<usize>)104 fn size_hint(&self) -> (usize, Option<usize>) { 105 if self.done_skipping { 106 self.stream.size_hint() 107 } else { 108 let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; 109 let (_, upper) = self.stream.size_hint(); 110 let upper = match upper { 111 Some(x) => x.checked_add(pending_len), 112 None => None, 113 }; 114 (0, upper) // can't know a lower bound, due to the predicate 115 } 116 } 117 } 118 119 // Forwarding impl of Sink from the underlying stream 120 #[cfg(feature = "sink")] 121 impl<S, Fut, F, Item> Sink<Item> for SkipWhile<S, Fut, F> 122 where S: Stream + Sink<Item>, 123 F: FnMut(&S::Item) -> Fut, 124 Fut: Future<Output = bool>, 125 { 126 type Error = S::Error; 127 128 delegate_sink!(stream, Item); 129 } 130