1 use core::cmp; 2 use core::pin::Pin; 3 use futures_core::stream::{Stream, FusedStream}; 4 use futures_core::task::{Context, Poll}; 5 #[cfg(feature = "sink")] 6 use futures_sink::Sink; 7 use pin_project::{pin_project, project}; 8 9 /// Stream for the [`take`](super::StreamExt::take) method. 10 #[pin_project] 11 #[derive(Debug)] 12 #[must_use = "streams do nothing unless polled"] 13 pub struct Take<St> { 14 #[pin] 15 stream: St, 16 remaining: usize, 17 } 18 19 impl<St: Stream> Take<St> { new(stream: St, n: usize) -> Take<St>20 pub(super) fn new(stream: St, n: usize) -> Take<St> { 21 Take { 22 stream, 23 remaining: n, 24 } 25 } 26 27 delegate_access_inner!(stream, St, ()); 28 } 29 30 impl<St> Stream for Take<St> 31 where St: Stream, 32 { 33 type Item = St::Item; 34 35 #[project] poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>36 fn poll_next( 37 self: Pin<&mut Self>, 38 cx: &mut Context<'_>, 39 ) -> Poll<Option<St::Item>> { 40 if self.remaining == 0 { 41 Poll::Ready(None) 42 } else { 43 #[project] 44 let Take { stream, remaining } = self.project(); 45 let next = ready!(stream.poll_next(cx)); 46 if next.is_some() { 47 *remaining -= 1; 48 } else { 49 *remaining = 0; 50 } 51 Poll::Ready(next) 52 } 53 } 54 size_hint(&self) -> (usize, Option<usize>)55 fn size_hint(&self) -> (usize, Option<usize>) { 56 if self.remaining == 0 { 57 return (0, Some(0)); 58 } 59 60 let (lower, upper) = self.stream.size_hint(); 61 62 let lower = cmp::min(lower, self.remaining as usize); 63 64 let upper = match upper { 65 Some(x) if x < self.remaining as usize => Some(x), 66 _ => Some(self.remaining as usize) 67 }; 68 69 (lower, upper) 70 } 71 } 72 73 impl<St> FusedStream for Take<St> 74 where St: FusedStream, 75 { is_terminated(&self) -> bool76 fn is_terminated(&self) -> bool { 77 self.remaining == 0 || self.stream.is_terminated() 78 } 79 } 80 81 // Forwarding impl of Sink from the underlying stream 82 #[cfg(feature = "sink")] 83 impl<S, Item> Sink<Item> for Take<S> 84 where S: Stream + Sink<Item>, 85 { 86 type Error = S::Error; 87 88 delegate_sink!(stream, Item); 89 } 90