1 use core::pin::Pin;
2 use futures_core::future::{Future, FusedFuture};
3 use futures_core::stream::{Stream, FusedStream};
4 use futures_core::task::{Context, Poll};
5 use pin_utils::{unsafe_pinned, unsafe_unpinned};
6 
7 /// Future for the [`concat`](super::StreamExt::concat) method.
8 #[derive(Debug)]
9 #[must_use = "futures do nothing unless you `.await` or poll them"]
10 pub struct Concat<St: Stream> {
11     stream: St,
12     accum: Option<St::Item>,
13 }
14 
15 impl<St: Stream + Unpin> Unpin for Concat<St> {}
16 
17 impl<St> Concat<St>
18 where St: Stream,
19       St::Item: Extend<<St::Item as IntoIterator>::Item> +
20                 IntoIterator + Default,
21 {
22     unsafe_pinned!(stream: St);
23     unsafe_unpinned!(accum: Option<St::Item>);
24 
new(stream: St) -> Concat<St>25     pub(super) fn new(stream: St) -> Concat<St> {
26         Concat {
27             stream,
28             accum: None,
29         }
30     }
31 }
32 
33 impl<St> Future for Concat<St>
34 where St: Stream,
35       St::Item: Extend<<St::Item as IntoIterator>::Item> +
36                 IntoIterator + Default,
37 {
38     type Output = St::Item;
39 
poll( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Self::Output>40     fn poll(
41         mut self: Pin<&mut Self>, cx: &mut Context<'_>
42     ) -> Poll<Self::Output> {
43         loop {
44             match ready!(self.as_mut().stream().poll_next(cx)) {
45                 None => {
46                     return Poll::Ready(self.as_mut().accum().take().unwrap_or_default())
47                 }
48                 Some(e) => {
49                     let accum = self.as_mut().accum();
50                     if let Some(a) = accum {
51                         a.extend(e)
52                     } else {
53                         *accum = Some(e)
54                     }
55                 }
56             }
57         }
58     }
59 }
60 
61 impl<St> FusedFuture for Concat<St>
62 where St: FusedStream,
63       St::Item: Extend<<St::Item as IntoIterator>::Item> +
64                 IntoIterator + Default,
65 {
is_terminated(&self) -> bool66     fn is_terminated(&self) -> bool {
67         self.accum.is_none() && self.stream.is_terminated()
68     }
69 }
70