1 use crate::stream::{Fuse, Stream};
2 use crate::time::{Delay, Elapsed, Instant};
3 
4 use core::future::Future;
5 use core::pin::Pin;
6 use core::task::{Context, Poll};
7 use pin_project_lite::pin_project;
8 use std::time::Duration;
9 
10 pin_project! {
11     /// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
12     #[must_use = "streams do nothing unless polled"]
13     #[derive(Debug)]
14     pub struct Timeout<S> {
15         #[pin]
16         stream: Fuse<S>,
17         deadline: Delay,
18         duration: Duration,
19         poll_deadline: bool,
20     }
21 }
22 
23 impl<S: Stream> Timeout<S> {
new(stream: S, duration: Duration) -> Self24     pub(super) fn new(stream: S, duration: Duration) -> Self {
25         let next = Instant::now() + duration;
26         let deadline = Delay::new_timeout(next, duration);
27 
28         Timeout {
29             stream: Fuse::new(stream),
30             deadline,
31             duration,
32             poll_deadline: true,
33         }
34     }
35 }
36 
37 impl<S: Stream> Stream for Timeout<S> {
38     type Item = Result<S::Item, Elapsed>;
39 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>40     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41         match self.as_mut().project().stream.poll_next(cx) {
42             Poll::Ready(v) => {
43                 if v.is_some() {
44                     let next = Instant::now() + self.duration;
45                     self.as_mut().project().deadline.reset(next);
46                     *self.as_mut().project().poll_deadline = true;
47                 }
48                 return Poll::Ready(v.map(Ok));
49             }
50             Poll::Pending => {}
51         };
52 
53         if self.poll_deadline {
54             ready!(Pin::new(self.as_mut().project().deadline).poll(cx));
55             *self.as_mut().project().poll_deadline = false;
56             return Poll::Ready(Some(Err(Elapsed::new())));
57         }
58 
59         Poll::Pending
60     }
61 
size_hint(&self) -> (usize, Option<usize>)62     fn size_hint(&self) -> (usize, Option<usize>) {
63         self.stream.size_hint()
64     }
65 }
66