1 use core::pin::Pin; 2 use core::usize; 3 use futures_core::stream::{FusedStream, Stream}; 4 use futures_core::task::{Context, Poll}; 5 use pin_project::pin_project; 6 7 /// Stream for the [`cycle`](super::StreamExt::cycle) method. 8 #[pin_project] 9 #[derive(Debug)] 10 #[must_use = "streams do nothing unless polled"] 11 pub struct Cycle<St> { 12 orig: St, 13 #[pin] 14 stream: St, 15 } 16 17 impl<St> Cycle<St> 18 where 19 St: Clone + Stream, 20 { new(stream: St) -> Self21 pub(super) fn new(stream: St) -> Self { 22 Self { 23 orig: stream.clone(), 24 stream, 25 } 26 } 27 } 28 29 impl<St> Stream for Cycle<St> 30 where 31 St: Clone + Stream, 32 { 33 type Item = St::Item; 34 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>35 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 36 let mut this = self.project(); 37 38 match ready!(this.stream.as_mut().poll_next(cx)) { 39 None => { 40 this.stream.set(this.orig.clone()); 41 this.stream.poll_next(cx) 42 } 43 item => Poll::Ready(item), 44 } 45 } 46 size_hint(&self) -> (usize, Option<usize>)47 fn size_hint(&self) -> (usize, Option<usize>) { 48 // the cycle stream is either empty or infinite 49 match self.orig.size_hint() { 50 size @ (0, Some(0)) => size, 51 (0, _) => (0, None), 52 _ => (usize::max_value(), None), 53 } 54 } 55 } 56 57 impl<St> FusedStream for Cycle<St> 58 where 59 St: Clone + Stream, 60 { is_terminated(&self) -> bool61 fn is_terminated(&self) -> bool { 62 // the cycle stream is either empty or infinite 63 if let (0, Some(0)) = self.size_hint() { 64 true 65 } else { 66 false 67 } 68 } 69 } 70