1 use core::pin::Pin;
2 use futures_core::stream::{FusedStream, Stream};
3 use futures_core::task::{Context, Poll};
4 #[cfg(feature = "sink")]
5 use futures_sink::Sink;
6 use pin_utils::{unsafe_pinned, unsafe_unpinned};
7 
/// Stream for the [`enumerate`](super::StreamExt::enumerate) method.
///
/// Wraps an inner stream and pairs every item it yields with a running
/// `usize` index that starts at zero.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Enumerate<St> {
    /// The wrapped stream that items are pulled from.
    stream: St,
    /// Index attached to the next yielded item; bumped after each item.
    count: usize,
}
15 
16 impl<St: Unpin> Unpin for Enumerate<St> {}
17 
18 impl<St: Stream> Enumerate<St> {
19     unsafe_pinned!(stream: St);
20     unsafe_unpinned!(count: usize);
21 
new(stream: St) -> Enumerate<St>22     pub(super) fn new(stream: St) -> Enumerate<St> {
23         Enumerate {
24             stream,
25             count: 0,
26         }
27     }
28 
29     /// Acquires a reference to the underlying stream that this combinator is
30     /// pulling from.
get_ref(&self) -> &St31     pub fn get_ref(&self) -> &St {
32         &self.stream
33     }
34 
35     /// Acquires a mutable reference to the underlying stream that this
36     /// combinator is pulling from.
37     ///
38     /// Note that care must be taken to avoid tampering with the state of the
39     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut St40     pub fn get_mut(&mut self) -> &mut St {
41         &mut self.stream
42     }
43 
44     /// Acquires a pinned mutable reference to the underlying stream that this
45     /// combinator is pulling from.
46     ///
47     /// Note that care must be taken to avoid tampering with the state of the
48     /// stream which may otherwise confuse this combinator.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>49     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
50         self.stream()
51     }
52 
53     /// Consumes this combinator, returning the underlying stream.
54     ///
55     /// Note that this may discard intermediate state of this combinator, so
56     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> St57     pub fn into_inner(self) -> St {
58         self.stream
59     }
60 }
61 
62 impl<St: Stream + FusedStream> FusedStream for Enumerate<St> {
is_terminated(&self) -> bool63     fn is_terminated(&self) -> bool {
64         self.stream.is_terminated()
65     }
66 }
67 
68 impl<St: Stream> Stream for Enumerate<St> {
69     type Item = (usize, St::Item);
70 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>71     fn poll_next(
72         mut self: Pin<&mut Self>,
73         cx: &mut Context<'_>,
74     ) -> Poll<Option<Self::Item>> {
75         match ready!(self.as_mut().stream().poll_next(cx)) {
76             Some(item) => {
77                 let count = self.count;
78                 *self.as_mut().count() += 1;
79                 Poll::Ready(Some((count, item)))
80             }
81             None => Poll::Ready(None),
82         }
83     }
84 
size_hint(&self) -> (usize, Option<usize>)85     fn size_hint(&self) -> (usize, Option<usize>) {
86         self.stream.size_hint()
87     }
88 }
89 
// When the wrapped type is also a sink, expose that sink through `Enumerate`
// by forwarding to the `stream` field.
// NOTE(review): `delegate_sink!` is defined elsewhere in the crate; it is
// presumed to generate the `Sink` methods that delegate to the named field.
#[cfg(feature = "sink")]
impl<S: Stream + Sink<Item>, Item> Sink<Item> for Enumerate<S> {
    type Error = S::Error;

    delegate_sink!(stream, Item);
}
100