1 use crate::stream::Stream; 2 3 use core::fmt; 4 use core::pin::Pin; 5 use core::task::{Context, Poll}; 6 use pin_project_lite::pin_project; 7 8 pin_project! { 9 /// Stream for the [`take_while`](super::StreamExt::take_while) method. 10 #[must_use = "streams do nothing unless polled"] 11 pub struct TakeWhile<St, F> { 12 #[pin] 13 stream: St, 14 predicate: F, 15 done: bool, 16 } 17 } 18 19 impl<St, F> fmt::Debug for TakeWhile<St, F> 20 where 21 St: fmt::Debug, 22 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 24 f.debug_struct("TakeWhile") 25 .field("stream", &self.stream) 26 .field("done", &self.done) 27 .finish() 28 } 29 } 30 31 impl<St, F> TakeWhile<St, F> { new(stream: St, predicate: F) -> Self32 pub(super) fn new(stream: St, predicate: F) -> Self { 33 Self { 34 stream, 35 predicate, 36 done: false, 37 } 38 } 39 } 40 41 impl<St, F> Stream for TakeWhile<St, F> 42 where 43 St: Stream, 44 F: FnMut(&St::Item) -> bool, 45 { 46 type Item = St::Item; 47 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>48 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 49 if !*self.as_mut().project().done { 50 self.as_mut().project().stream.poll_next(cx).map(|ready| { 51 let ready = ready.and_then(|item| { 52 if !(self.as_mut().project().predicate)(&item) { 53 None 54 } else { 55 Some(item) 56 } 57 }); 58 59 if ready.is_none() { 60 *self.as_mut().project().done = true; 61 } 62 63 ready 64 }) 65 } else { 66 Poll::Ready(None) 67 } 68 } 69 size_hint(&self) -> (usize, Option<usize>)70 fn size_hint(&self) -> (usize, Option<usize>) { 71 if self.done { 72 return (0, Some(0)); 73 } 74 75 let (_, upper) = self.stream.size_hint(); 76 77 (0, upper) 78 } 79 } 80