1 use core::mem; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 use pin_project::{pin_project, project}; 7 8 /// Future for the [`collect`](super::StreamExt::collect) method. 9 #[pin_project] 10 #[derive(Debug)] 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct Collect<St, C> { 13 #[pin] 14 stream: St, 15 collection: C, 16 } 17 18 impl<St: Stream, C: Default> Collect<St, C> { finish(self: Pin<&mut Self>) -> C19 fn finish(self: Pin<&mut Self>) -> C { 20 mem::replace(self.project().collection, Default::default()) 21 } 22 new(stream: St) -> Collect<St, C>23 pub(super) fn new(stream: St) -> Collect<St, C> { 24 Collect { 25 stream, 26 collection: Default::default(), 27 } 28 } 29 } 30 31 impl<St, C> FusedFuture for Collect<St, C> 32 where St: FusedStream, 33 C: Default + Extend<St:: Item> 34 { is_terminated(&self) -> bool35 fn is_terminated(&self) -> bool { 36 self.stream.is_terminated() 37 } 38 } 39 40 impl<St, C> Future for Collect<St, C> 41 where St: Stream, 42 C: Default + Extend<St:: Item> 43 { 44 type Output = C; 45 46 #[project] poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C>47 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> { 48 #[project] 49 let Collect { mut stream, collection } = self.as_mut().project(); 50 loop { 51 match ready!(stream.as_mut().poll_next(cx)) { 52 Some(e) => collection.extend(Some(e)), 53 None => return Poll::Ready(self.finish()), 54 } 55 } 56 } 57 } 58