1 use core::cmp;
2 use core::pin::Pin;
3 use futures_core::stream::{Stream, FusedStream};
4 use futures_core::task::{Context, Poll};
5 #[cfg(feature = "sink")]
6 use futures_sink::Sink;
7 use pin_project::{pin_project, project};
8 
/// Stream for the [`take`](super::StreamExt::take) method.
///
/// Wraps an inner stream and yields at most a fixed number of its items.
/// Once that budget is used up (or the inner stream ends), this stream
/// reports end-of-stream without polling the inner stream again.
#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Take<St> {
    /// The wrapped stream; marked `#[pin]` so it is projected as a pinned reference.
    #[pin]
    stream: St,
    /// Number of items that may still be yielded; zero means this stream is finished.
    remaining: usize,
}
18 
19 impl<St: Stream> Take<St> {
new(stream: St, n: usize) -> Take<St>20     pub(super) fn new(stream: St, n: usize) -> Take<St> {
21         Take {
22             stream,
23             remaining: n,
24         }
25     }
26 
27     delegate_access_inner!(stream, St, ());
28 }
29 
30 impl<St> Stream for Take<St>
31     where St: Stream,
32 {
33     type Item = St::Item;
34 
35     #[project]
poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>36     fn poll_next(
37         self: Pin<&mut Self>,
38         cx: &mut Context<'_>,
39     ) -> Poll<Option<St::Item>> {
40         if self.remaining == 0 {
41             Poll::Ready(None)
42         } else {
43             #[project]
44             let Take { stream, remaining } = self.project();
45             let next = ready!(stream.poll_next(cx));
46             if next.is_some() {
47                 *remaining -= 1;
48             } else {
49                 *remaining = 0;
50             }
51             Poll::Ready(next)
52         }
53     }
54 
size_hint(&self) -> (usize, Option<usize>)55     fn size_hint(&self) -> (usize, Option<usize>) {
56         if self.remaining == 0 {
57             return (0, Some(0));
58         }
59 
60         let (lower, upper) = self.stream.size_hint();
61 
62         let lower = cmp::min(lower, self.remaining as usize);
63 
64         let upper = match upper {
65             Some(x) if x < self.remaining as usize => Some(x),
66             _ => Some(self.remaining as usize)
67         };
68 
69         (lower, upper)
70     }
71 }
72 
73 impl<St> FusedStream for Take<St>
74     where St: FusedStream,
75 {
is_terminated(&self) -> bool76     fn is_terminated(&self) -> bool {
77         self.remaining == 0 || self.stream.is_terminated()
78     }
79 }
80 
// Forwarding impl of `Sink` from the underlying stream: when the wrapped type
// is also a `Sink<Item>`, `Take` implements `Sink<Item>` as well.
// Only compiled when the "sink" feature is enabled.
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Take<S>
    where S: Stream + Sink<Item>,
{
    // The error type is that of the wrapped sink.
    type Error = S::Error;

    // Crate-internal macro (defined outside this file); presumably expands to the
    // `Sink` methods, each forwarding to the `stream` field — confirm against its definition.
    delegate_sink!(stream, Item);
}
90