1 use core::mem; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 use pin_project::pin_project; 7 8 /// Future for the [`collect`](super::StreamExt::collect) method. 9 #[pin_project] 10 #[derive(Debug)] 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct Collect<St, C> { 13 #[pin] 14 stream: St, 15 collection: C, 16 } 17 18 impl<St: Stream, C: Default> Collect<St, C> { finish(self: Pin<&mut Self>) -> C19 fn finish(self: Pin<&mut Self>) -> C { 20 mem::replace(self.project().collection, Default::default()) 21 } 22 new(stream: St) -> Collect<St, C>23 pub(super) fn new(stream: St) -> Collect<St, C> { 24 Collect { 25 stream, 26 collection: Default::default(), 27 } 28 } 29 } 30 31 impl<St, C> FusedFuture for Collect<St, C> 32 where St: FusedStream, 33 C: Default + Extend<St:: Item> 34 { is_terminated(&self) -> bool35 fn is_terminated(&self) -> bool { 36 self.stream.is_terminated() 37 } 38 } 39 40 impl<St, C> Future for Collect<St, C> 41 where St: Stream, 42 C: Default + Extend<St:: Item> 43 { 44 type Output = C; 45 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C>46 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> { 47 let mut this = self.as_mut().project(); 48 loop { 49 match ready!(this.stream.as_mut().poll_next(cx)) { 50 Some(e) => this.collection.extend(Some(e)), 51 None => return Poll::Ready(self.finish()), 52 } 53 } 54 } 55 } 56