1 use core::pin::Pin; 2 use core::task::{Context, Poll}; 3 use core::future::Future; 4 5 use crate::stream::Stream; 6 7 #[doc(hidden)] 8 #[allow(missing_debug_implementations)] 9 pub struct NthFuture<'a, S> { 10 stream: &'a mut S, 11 n: usize, 12 } 13 14 impl<S: Unpin> Unpin for NthFuture<'_, S> {} 15 16 impl<'a, S> NthFuture<'a, S> { new(stream: &'a mut S, n: usize) -> Self17 pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { 18 Self { stream, n } 19 } 20 } 21 22 impl<'a, S> Future for NthFuture<'a, S> 23 where 24 S: Stream + Unpin + Sized, 25 { 26 type Output = Option<S::Item>; 27 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>28 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 29 let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); 30 match next { 31 Some(v) => match self.n { 32 0 => Poll::Ready(Some(v)), 33 _ => { 34 self.n -= 1; 35 cx.waker().wake_by_ref(); 36 Poll::Pending 37 } 38 }, 39 None => Poll::Ready(None), 40 } 41 } 42 } 43