1 use core::pin::Pin; 2 use futures_core::stream::{FusedStream, Stream}; 3 use futures_core::task::{Context, Poll}; 4 #[cfg(feature = "sink")] 5 use futures_sink::Sink; 6 use pin_utils::{unsafe_pinned, unsafe_unpinned}; 7 8 /// Stream for the [`enumerate`](super::StreamExt::enumerate) method. 9 #[derive(Debug)] 10 #[must_use = "streams do nothing unless polled"] 11 pub struct Enumerate<St> { 12 stream: St, 13 count: usize, 14 } 15 16 impl<St: Unpin> Unpin for Enumerate<St> {} 17 18 impl<St: Stream> Enumerate<St> { 19 unsafe_pinned!(stream: St); 20 unsafe_unpinned!(count: usize); 21 new(stream: St) -> Enumerate<St>22 pub(super) fn new(stream: St) -> Enumerate<St> { 23 Enumerate { 24 stream, 25 count: 0, 26 } 27 } 28 29 /// Acquires a reference to the underlying stream that this combinator is 30 /// pulling from. get_ref(&self) -> &St31 pub fn get_ref(&self) -> &St { 32 &self.stream 33 } 34 35 /// Acquires a mutable reference to the underlying stream that this 36 /// combinator is pulling from. 37 /// 38 /// Note that care must be taken to avoid tampering with the state of the 39 /// stream which may otherwise confuse this combinator. get_mut(&mut self) -> &mut St40 pub fn get_mut(&mut self) -> &mut St { 41 &mut self.stream 42 } 43 44 /// Acquires a pinned mutable reference to the underlying stream that this 45 /// combinator is pulling from. 46 /// 47 /// Note that care must be taken to avoid tampering with the state of the 48 /// stream which may otherwise confuse this combinator. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>49 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { 50 self.stream() 51 } 52 53 /// Consumes this combinator, returning the underlying stream. 54 /// 55 /// Note that this may discard intermediate state of this combinator, so 56 /// care should be taken to avoid losing resources when this is called. into_inner(self) -> St57 pub fn into_inner(self) -> St { 58 self.stream 59 } 60 } 61 62 impl<St: Stream + FusedStream> FusedStream for Enumerate<St> { is_terminated(&self) -> bool63 fn is_terminated(&self) -> bool { 64 self.stream.is_terminated() 65 } 66 } 67 68 impl<St: Stream> Stream for Enumerate<St> { 69 type Item = (usize, St::Item); 70 poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>71 fn poll_next( 72 mut self: Pin<&mut Self>, 73 cx: &mut Context<'_>, 74 ) -> Poll<Option<Self::Item>> { 75 match ready!(self.as_mut().stream().poll_next(cx)) { 76 Some(item) => { 77 let count = self.count; 78 *self.as_mut().count() += 1; 79 Poll::Ready(Some((count, item))) 80 } 81 None => Poll::Ready(None), 82 } 83 } 84 size_hint(&self) -> (usize, Option<usize>)85 fn size_hint(&self) -> (usize, Option<usize>) { 86 self.stream.size_hint() 87 } 88 } 89 90 // Forwarding impl of Sink from the underlying stream 91 #[cfg(feature = "sink")] 92 impl<S, Item> Sink<Item> for Enumerate<S> 93 where 94 S: Stream + Sink<Item>, 95 { 96 type Error = S::Error; 97 98 delegate_sink!(stream, Item); 99 } 100