1 use crate::stream::{Fuse, Stream}; 2 use crate::time::{Delay, Elapsed, Instant}; 3 4 use core::future::Future; 5 use core::pin::Pin; 6 use core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 use std::time::Duration; 9 10 pin_project! { 11 /// Stream returned by the [`timeout`](super::StreamExt::timeout) method. 12 #[must_use = "streams do nothing unless polled"] 13 #[derive(Debug)] 14 pub struct Timeout<S> { 15 #[pin] 16 stream: Fuse<S>, 17 deadline: Delay, 18 duration: Duration, 19 poll_deadline: bool, 20 } 21 } 22 23 impl<S: Stream> Timeout<S> { new(stream: S, duration: Duration) -> Self24 pub(super) fn new(stream: S, duration: Duration) -> Self { 25 let next = Instant::now() + duration; 26 let deadline = Delay::new_timeout(next, duration); 27 28 Timeout { 29 stream: Fuse::new(stream), 30 deadline, 31 duration, 32 poll_deadline: true, 33 } 34 } 35 } 36 37 impl<S: Stream> Stream for Timeout<S> { 38 type Item = Result<S::Item, Elapsed>; 39 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>40 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 41 match self.as_mut().project().stream.poll_next(cx) { 42 Poll::Ready(v) => { 43 if v.is_some() { 44 let next = Instant::now() + self.duration; 45 self.as_mut().project().deadline.reset(next); 46 *self.as_mut().project().poll_deadline = true; 47 } 48 return Poll::Ready(v.map(Ok)); 49 } 50 Poll::Pending => {} 51 }; 52 53 if self.poll_deadline { 54 ready!(Pin::new(self.as_mut().project().deadline).poll(cx)); 55 *self.as_mut().project().poll_deadline = false; 56 return Poll::Ready(Some(Err(Elapsed::new()))); 57 } 58 59 Poll::Pending 60 } 61 size_hint(&self) -> (usize, Option<usize>)62 fn size_hint(&self) -> (usize, Option<usize>) { 63 self.stream.size_hint() 64 } 65 } 66