1 use crate::stream::{StreamExt, Fuse}; 2 use core::cmp; 3 use core::pin::Pin; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 use pin_project::{pin_project, project}; 7 8 /// Stream for the [`zip`](super::StreamExt::zip) method. 9 #[pin_project] 10 #[derive(Debug)] 11 #[must_use = "streams do nothing unless polled"] 12 pub struct Zip<St1: Stream, St2: Stream> { 13 #[pin] 14 stream1: Fuse<St1>, 15 #[pin] 16 stream2: Fuse<St2>, 17 queued1: Option<St1::Item>, 18 queued2: Option<St2::Item>, 19 } 20 21 impl<St1: Stream, St2: Stream> Zip<St1, St2> { new(stream1: St1, stream2: St2) -> Zip<St1, St2>22 pub(super) fn new(stream1: St1, stream2: St2) -> Zip<St1, St2> { 23 Zip { 24 stream1: stream1.fuse(), 25 stream2: stream2.fuse(), 26 queued1: None, 27 queued2: None, 28 } 29 } 30 31 /// Acquires a reference to the underlying streams that this combinator is 32 /// pulling from. get_ref(&self) -> (&St1, &St2)33 pub fn get_ref(&self) -> (&St1, &St2) { 34 (self.stream1.get_ref(), self.stream2.get_ref()) 35 } 36 37 /// Acquires a mutable reference to the underlying streams that this 38 /// combinator is pulling from. 39 /// 40 /// Note that care must be taken to avoid tampering with the state of the 41 /// stream which may otherwise confuse this combinator. get_mut(&mut self) -> (&mut St1, &mut St2)42 pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { 43 (self.stream1.get_mut(), self.stream2.get_mut()) 44 } 45 46 /// Acquires a pinned mutable reference to the underlying streams that this 47 /// combinator is pulling from. 48 /// 49 /// Note that care must be taken to avoid tampering with the state of the 50 /// stream which may otherwise confuse this combinator. get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>)51 pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { 52 unsafe { 53 let Self { stream1, stream2, .. } = self.get_unchecked_mut(); 54 (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut()) 55 } 56 } 57 58 /// Consumes this combinator, returning the underlying streams. 59 /// 60 /// Note that this may discard intermediate state of this combinator, so 61 /// care should be taken to avoid losing resources when this is called. into_inner(self) -> (St1, St2)62 pub fn into_inner(self) -> (St1, St2) { 63 (self.stream1.into_inner(), self.stream2.into_inner()) 64 } 65 } 66 67 impl<St1, St2> FusedStream for Zip<St1, St2> 68 where St1: Stream, St2: Stream, 69 { is_terminated(&self) -> bool70 fn is_terminated(&self) -> bool { 71 self.stream1.is_terminated() && self.stream2.is_terminated() 72 } 73 } 74 75 impl<St1, St2> Stream for Zip<St1, St2> 76 where St1: Stream, St2: Stream 77 { 78 type Item = (St1::Item, St2::Item); 79 80 #[project] poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>81 fn poll_next( 82 self: Pin<&mut Self>, 83 cx: &mut Context<'_>, 84 ) -> Poll<Option<Self::Item>> { 85 #[project] 86 let Zip { mut stream1, mut stream2, queued1, queued2 } = self.project(); 87 88 if queued1.is_none() { 89 match stream1.as_mut().poll_next(cx) { 90 Poll::Ready(Some(item1)) => *queued1 = Some(item1), 91 Poll::Ready(None) | Poll::Pending => {} 92 } 93 } 94 if queued2.is_none() { 95 match stream2.as_mut().poll_next(cx) { 96 Poll::Ready(Some(item2)) => *queued2 = Some(item2), 97 Poll::Ready(None) | Poll::Pending => {} 98 } 99 } 100 101 if queued1.is_some() && queued2.is_some() { 102 let pair = (queued1.take().unwrap(), queued2.take().unwrap()); 103 Poll::Ready(Some(pair)) 104 } else if stream1.is_done() || stream2.is_done() { 105 Poll::Ready(None) 106 } else { 107 Poll::Pending 108 } 109 } 110 size_hint(&self) -> (usize, Option<usize>)111 fn size_hint(&self) -> (usize, Option<usize>) { 112 let queued1_len = if self.queued1.is_some() { 1 } else { 0 }; 113 let queued2_len = if self.queued2.is_some() { 1 } else { 0 }; 114 let (stream1_lower, stream1_upper) = self.stream1.size_hint(); 115 let (stream2_lower, stream2_upper) = self.stream2.size_hint(); 116 117 let stream1_lower = stream1_lower.saturating_add(queued1_len); 118 let stream2_lower = stream2_lower.saturating_add(queued2_len); 119 120 let lower = cmp::min(stream1_lower, stream2_lower); 121 122 let upper = match (stream1_upper, stream2_upper) { 123 (Some(x), Some(y)) => { 124 let x = x.saturating_add(queued1_len); 125 let y = y.saturating_add(queued2_len); 126 Some(cmp::min(x, y)) 127 } 128 (Some(x), None) => x.checked_add(queued1_len), 129 (None, Some(y)) => y.checked_add(queued2_len), 130 (None, None) => None 131 }; 132 133 (lower, upper) 134 } 135 } 136