1 use crate::stream::Stream; 2 3 use core::future::Future; 4 use core::pin::Pin; 5 use core::task::{Context, Poll}; 6 7 /// Future for the [`any`](super::StreamExt::any) method. 8 #[derive(Debug)] 9 #[must_use = "futures do nothing unless you `.await` or poll them"] 10 pub struct AnyFuture<'a, St: ?Sized, F> { 11 stream: &'a mut St, 12 f: F, 13 } 14 15 impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> { new(stream: &'a mut St, f: F) -> Self16 pub(super) fn new(stream: &'a mut St, f: F) -> Self { 17 Self { stream, f } 18 } 19 } 20 21 impl<St: ?Sized + Unpin, F> Unpin for AnyFuture<'_, St, F> {} 22 23 impl<St, F> Future for AnyFuture<'_, St, F> 24 where 25 St: ?Sized + Stream + Unpin, 26 F: FnMut(St::Item) -> bool, 27 { 28 type Output = bool; 29 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>30 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 31 let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); 32 33 match next { 34 Some(v) => { 35 if (&mut self.f)(v) { 36 Poll::Ready(true) 37 } else { 38 cx.waker().wake_by_ref(); 39 Poll::Pending 40 } 41 } 42 None => Poll::Ready(false), 43 } 44 } 45 } 46