1 use core::mem; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream}; 6 use futures_core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`unzip`](super::StreamExt::unzip) method. 11 #[derive(Debug)] 12 #[must_use = "futures do nothing unless you `.await` or poll them"] 13 pub struct Unzip<St, FromA, FromB> { 14 #[pin] 15 stream: St, 16 left: FromA, 17 right: FromB, 18 } 19 } 20 21 impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> { finish(self: Pin<&mut Self>) -> (FromA, FromB)22 fn finish(self: Pin<&mut Self>) -> (FromA, FromB) { 23 let this = self.project(); 24 (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default())) 25 } 26 new(stream: St) -> Self27 pub(super) fn new(stream: St) -> Self { 28 Self { stream, left: Default::default(), right: Default::default() } 29 } 30 } 31 32 impl<St, A, B, FromA, FromB> FusedFuture for Unzip<St, FromA, FromB> 33 where 34 St: FusedStream<Item = (A, B)>, 35 FromA: Default + Extend<A>, 36 FromB: Default + Extend<B>, 37 { is_terminated(&self) -> bool38 fn is_terminated(&self) -> bool { 39 self.stream.is_terminated() 40 } 41 } 42 43 impl<St, A, B, FromA, FromB> Future for Unzip<St, FromA, FromB> 44 where 45 St: Stream<Item = (A, B)>, 46 FromA: Default + Extend<A>, 47 FromB: Default + Extend<B>, 48 { 49 type Output = (FromA, FromB); 50 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(FromA, FromB)>51 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(FromA, FromB)> { 52 let mut this = self.as_mut().project(); 53 loop { 54 match ready!(this.stream.as_mut().poll_next(cx)) { 55 Some(e) => { 56 this.left.extend(Some(e.0)); 57 this.right.extend(Some(e.1)); 58 } 59 None => return Poll::Ready(self.finish()), 60 } 61 } 62 } 63 } 64