1 use core::pin::Pin;
2 use futures_core::ready;
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_core::task::{Context, Poll};
5 #[cfg(feature = "sink")]
6 use futures_sink::Sink;
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Stream for the [`enumerate`](super::StreamExt::enumerate) method.
11     #[derive(Debug)]
12     #[must_use = "streams do nothing unless polled"]
13     pub struct Enumerate<St> {
14         #[pin]
15         stream: St,
16         count: usize,
17     }
18 }
19 
20 impl<St: Stream> Enumerate<St> {
new(stream: St) -> Self21     pub(super) fn new(stream: St) -> Self {
22         Self { stream, count: 0 }
23     }
24 
25     delegate_access_inner!(stream, St, ());
26 }
27 
28 impl<St: Stream + FusedStream> FusedStream for Enumerate<St> {
is_terminated(&self) -> bool29     fn is_terminated(&self) -> bool {
30         self.stream.is_terminated()
31     }
32 }
33 
34 impl<St: Stream> Stream for Enumerate<St> {
35     type Item = (usize, St::Item);
36 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>37     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38         let this = self.project();
39 
40         match ready!(this.stream.poll_next(cx)) {
41             Some(item) => {
42                 let prev_count = *this.count;
43                 *this.count += 1;
44                 Poll::Ready(Some((prev_count, item)))
45             }
46             None => Poll::Ready(None),
47         }
48     }
49 
size_hint(&self) -> (usize, Option<usize>)50     fn size_hint(&self) -> (usize, Option<usize>) {
51         self.stream.size_hint()
52     }
53 }
54 
55 // Forwarding impl of Sink from the underlying stream
56 #[cfg(feature = "sink")]
57 impl<S, Item> Sink<Item> for Enumerate<S>
58 where
59     S: Stream + Sink<Item>,
60 {
61     type Error = S::Error;
62 
63     delegate_sink!(stream, Item);
64 }
65