1 use core::pin::Pin;
2 use core::usize;
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_core::task::{Context, Poll};
5 use pin_project::pin_project;
6 
7 /// Stream for the [`cycle`](super::StreamExt::cycle) method.
8 #[pin_project]
9 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Cycle<St> {
12     orig: St,
13     #[pin]
14     stream: St,
15 }
16 
17 impl<St> Cycle<St>
18 where
19     St: Clone + Stream,
20 {
new(stream: St) -> Self21     pub(super) fn new(stream: St) -> Self {
22         Self {
23             orig: stream.clone(),
24             stream,
25         }
26     }
27 }
28 
29 impl<St> Stream for Cycle<St>
30 where
31     St: Clone + Stream,
32 {
33     type Item = St::Item;
34 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>35     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36         let mut this = self.project();
37 
38         match ready!(this.stream.as_mut().poll_next(cx)) {
39             None => {
40                 this.stream.set(this.orig.clone());
41                 this.stream.poll_next(cx)
42             }
43             item => Poll::Ready(item),
44         }
45     }
46 
size_hint(&self) -> (usize, Option<usize>)47     fn size_hint(&self) -> (usize, Option<usize>) {
48         // the cycle stream is either empty or infinite
49         match self.orig.size_hint() {
50             size @ (0, Some(0)) => size,
51             (0, _) => (0, None),
52             _ => (usize::max_value(), None),
53         }
54     }
55 }
56 
57 impl<St> FusedStream for Cycle<St>
58 where
59     St: Clone + Stream,
60 {
is_terminated(&self) -> bool61     fn is_terminated(&self) -> bool {
62         // the cycle stream is either empty or infinite
63         if let (0, Some(0)) = self.size_hint() {
64             true
65         } else {
66             false
67         }
68     }
69 }
70