1 use core::mem;
2 use core::pin::Pin;
3 use futures_core::future::{FusedFuture, Future};
4 use futures_core::ready;
5 use futures_core::stream::{FusedStream, Stream};
6 use futures_core::task::{Context, Poll};
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Future for the [`unzip`](super::StreamExt::unzip) method.
11     #[derive(Debug)]
12     #[must_use = "futures do nothing unless you `.await` or poll them"]
13     pub struct Unzip<St, FromA, FromB> {
14         #[pin]
15         stream: St,
16         left: FromA,
17         right: FromB,
18     }
19 }
20 
21 impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> {
finish(self: Pin<&mut Self>) -> (FromA, FromB)22     fn finish(self: Pin<&mut Self>) -> (FromA, FromB) {
23         let this = self.project();
24         (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default()))
25     }
26 
new(stream: St) -> Self27     pub(super) fn new(stream: St) -> Self {
28         Self { stream, left: Default::default(), right: Default::default() }
29     }
30 }
31 
32 impl<St, A, B, FromA, FromB> FusedFuture for Unzip<St, FromA, FromB>
33 where
34     St: FusedStream<Item = (A, B)>,
35     FromA: Default + Extend<A>,
36     FromB: Default + Extend<B>,
37 {
is_terminated(&self) -> bool38     fn is_terminated(&self) -> bool {
39         self.stream.is_terminated()
40     }
41 }
42 
43 impl<St, A, B, FromA, FromB> Future for Unzip<St, FromA, FromB>
44 where
45     St: Stream<Item = (A, B)>,
46     FromA: Default + Extend<A>,
47     FromB: Default + Extend<B>,
48 {
49     type Output = (FromA, FromB);
50 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(FromA, FromB)>51     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(FromA, FromB)> {
52         let mut this = self.as_mut().project();
53         loop {
54             match ready!(this.stream.as_mut().poll_next(cx)) {
55                 Some(e) => {
56                     this.left.extend(Some(e.0));
57                     this.right.extend(Some(e.1));
58                 }
59                 None => return Poll::Ready(self.finish()),
60             }
61         }
62     }
63 }
64