1 use core::mem;
2 use core::pin::Pin;
3 use futures_core::future::{FusedFuture, Future};
4 use futures_core::stream::{FusedStream, Stream};
5 use futures_core::task::{Context, Poll};
6 use pin_project::{pin_project, project};
7 
/// Future for the [`collect`](super::StreamExt::collect) method.
///
/// When polled, it repeatedly polls the wrapped stream, extends the
/// collection with each item the stream yields, and resolves with the
/// accumulated collection once the stream returns `None`.
#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Collect<St, C> {
    /// Source stream; structurally pinned so `poll_next` can be called on it
    /// through the projection.
    #[pin]
    stream: St,
    /// Accumulator for the yielded items. It starts as `C::default()` and is
    /// swapped back to a fresh default when the result is handed out.
    collection: C,
}
17 
18 impl<St: Stream, C: Default> Collect<St, C> {
finish(self: Pin<&mut Self>) -> C19     fn finish(self: Pin<&mut Self>) -> C {
20         mem::replace(self.project().collection, Default::default())
21     }
22 
new(stream: St) -> Collect<St, C>23     pub(super) fn new(stream: St) -> Collect<St, C> {
24         Collect {
25             stream,
26             collection: Default::default(),
27         }
28     }
29 }
30 
31 impl<St, C> FusedFuture for Collect<St, C>
32 where St: FusedStream,
33       C: Default + Extend<St:: Item>
34 {
is_terminated(&self) -> bool35     fn is_terminated(&self) -> bool {
36         self.stream.is_terminated()
37     }
38 }
39 
40 impl<St, C> Future for Collect<St, C>
41 where St: Stream,
42       C: Default + Extend<St:: Item>
43 {
44     type Output = C;
45 
46     #[project]
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C>47     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
48         #[project]
49         let Collect { mut stream, collection } = self.as_mut().project();
50         loop {
51             match ready!(stream.as_mut().poll_next(cx)) {
52                 Some(e) => collection.extend(Some(e)),
53                 None => return Poll::Ready(self.finish()),
54             }
55         }
56     }
57 }
58