1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::TryFuture; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream, TryStream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while) 13 /// method. 14 #[must_use = "streams do nothing unless polled"] 15 pub struct TrySkipWhile<St, Fut, F> where St: TryStream { 16 #[pin] 17 stream: St, 18 f: F, 19 #[pin] 20 pending_fut: Option<Fut>, 21 pending_item: Option<St::Ok>, 22 done_skipping: bool, 23 } 24 } 25 26 impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F> 27 where 28 St: TryStream + fmt::Debug, 29 St::Ok: fmt::Debug, 30 Fut: fmt::Debug, 31 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 33 f.debug_struct("TrySkipWhile") 34 .field("stream", &self.stream) 35 .field("pending_fut", &self.pending_fut) 36 .field("pending_item", &self.pending_item) 37 .field("done_skipping", &self.done_skipping) 38 .finish() 39 } 40 } 41 42 impl<St, Fut, F> TrySkipWhile<St, Fut, F> 43 where 44 St: TryStream, 45 F: FnMut(&St::Ok) -> Fut, 46 Fut: TryFuture<Ok = bool, Error = St::Error>, 47 { new(stream: St, f: F) -> Self48 pub(super) fn new(stream: St, f: F) -> Self { 49 Self { stream, f, pending_fut: None, pending_item: None, done_skipping: false } 50 } 51 52 delegate_access_inner!(stream, St, ()); 53 } 54 55 impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> 56 where 57 St: TryStream, 58 F: FnMut(&St::Ok) -> Fut, 59 Fut: TryFuture<Ok = bool, Error = St::Error>, 60 { 61 type Item = Result<St::Ok, St::Error>; 62 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>63 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 64 let mut this = self.project(); 65 66 if *this.done_skipping { 67 return this.stream.try_poll_next(cx); 68 } 69 70 Poll::Ready(loop { 71 if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { 72 let res = ready!(fut.try_poll(cx)); 73 this.pending_fut.set(None); 74 let skipped = res?; 75 let item = this.pending_item.take(); 76 if !skipped { 77 *this.done_skipping = true; 78 break item.map(Ok); 79 } 80 } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { 81 this.pending_fut.set(Some((this.f)(&item))); 82 *this.pending_item = Some(item); 83 } else { 84 break None; 85 } 86 }) 87 } 88 size_hint(&self) -> (usize, Option<usize>)89 fn size_hint(&self) -> (usize, Option<usize>) { 90 let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; 91 let (_, upper) = self.stream.size_hint(); 92 let upper = match upper { 93 Some(x) => x.checked_add(pending_len), 94 None => None, 95 }; 96 (0, upper) // can't know a lower bound, due to the predicate 97 } 98 } 99 100 impl<St, Fut, F> FusedStream for TrySkipWhile<St, Fut, F> 101 where 102 St: TryStream + FusedStream, 103 F: FnMut(&St::Ok) -> Fut, 104 Fut: TryFuture<Ok = bool, Error = St::Error>, 105 { is_terminated(&self) -> bool106 fn is_terminated(&self) -> bool { 107 self.pending_item.is_none() && self.stream.is_terminated() 108 } 109 } 110 111 // Forwarding impl of Sink from the underlying stream 112 #[cfg(feature = "sink")] 113 impl<S, Fut, F, Item, E> Sink<Item> for TrySkipWhile<S, Fut, F> 114 where 115 S: TryStream + Sink<Item, Error = E>, 116 { 117 type Error = E; 118 119 delegate_sink!(stream, Item); 120 } 121