1 use crate::stream::{StreamExt, Fuse}; 2 use core::cmp; 3 use core::pin::Pin; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 use pin_utils::{unsafe_pinned, unsafe_unpinned}; 7 8 /// Stream for the [`zip`](super::StreamExt::zip) method. 9 #[derive(Debug)] 10 #[must_use = "streams do nothing unless polled"] 11 pub struct Zip<St1: Stream, St2: Stream> { 12 stream1: Fuse<St1>, 13 stream2: Fuse<St2>, 14 queued1: Option<St1::Item>, 15 queued2: Option<St2::Item>, 16 } 17 18 #[allow(clippy::type_repetition_in_bounds)] // https://github.com/rust-lang/rust-clippy/issues/4323 19 impl<St1, St2> Unpin for Zip<St1, St2> 20 where 21 St1: Stream, 22 Fuse<St1>: Unpin, 23 St2: Stream, 24 Fuse<St2>: Unpin, 25 {} 26 27 impl<St1: Stream, St2: Stream> Zip<St1, St2> { 28 unsafe_pinned!(stream1: Fuse<St1>); 29 unsafe_pinned!(stream2: Fuse<St2>); 30 unsafe_unpinned!(queued1: Option<St1::Item>); 31 unsafe_unpinned!(queued2: Option<St2::Item>); 32 new(stream1: St1, stream2: St2) -> Zip<St1, St2>33 pub(super) fn new(stream1: St1, stream2: St2) -> Zip<St1, St2> { 34 Zip { 35 stream1: stream1.fuse(), 36 stream2: stream2.fuse(), 37 queued1: None, 38 queued2: None, 39 } 40 } 41 42 /// Acquires a reference to the underlying streams that this combinator is 43 /// pulling from. get_ref(&self) -> (&St1, &St2)44 pub fn get_ref(&self) -> (&St1, &St2) { 45 (self.stream1.get_ref(), self.stream2.get_ref()) 46 } 47 48 /// Acquires a mutable reference to the underlying streams that this 49 /// combinator is pulling from. 50 /// 51 /// Note that care must be taken to avoid tampering with the state of the 52 /// stream which may otherwise confuse this combinator. get_mut(&mut self) -> (&mut St1, &mut St2)53 pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { 54 (self.stream1.get_mut(), self.stream2.get_mut()) 55 } 56 57 /// Acquires a pinned mutable reference to the underlying streams that this 58 /// combinator is pulling from. 59 /// 60 /// Note that care must be taken to avoid tampering with the state of the 61 /// stream which may otherwise confuse this combinator. get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>)62 pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { 63 unsafe { 64 let Self { stream1, stream2, .. } = self.get_unchecked_mut(); 65 (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut()) 66 } 67 } 68 69 /// Consumes this combinator, returning the underlying streams. 70 /// 71 /// Note that this may discard intermediate state of this combinator, so 72 /// care should be taken to avoid losing resources when this is called. into_inner(self) -> (St1, St2)73 pub fn into_inner(self) -> (St1, St2) { 74 (self.stream1.into_inner(), self.stream2.into_inner()) 75 } 76 } 77 78 impl<St1, St2> FusedStream for Zip<St1, St2> 79 where St1: Stream, St2: Stream, 80 { is_terminated(&self) -> bool81 fn is_terminated(&self) -> bool { 82 self.stream1.is_terminated() && self.stream2.is_terminated() 83 } 84 } 85 86 impl<St1, St2> Stream for Zip<St1, St2> 87 where St1: Stream, St2: Stream 88 { 89 type Item = (St1::Item, St2::Item); 90 poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>91 fn poll_next( 92 mut self: Pin<&mut Self>, 93 cx: &mut Context<'_>, 94 ) -> Poll<Option<Self::Item>> { 95 if self.queued1.is_none() { 96 match self.as_mut().stream1().poll_next(cx) { 97 Poll::Ready(Some(item1)) => *self.as_mut().queued1() = Some(item1), 98 Poll::Ready(None) | Poll::Pending => {} 99 } 100 } 101 if self.queued2.is_none() { 102 match self.as_mut().stream2().poll_next(cx) { 103 Poll::Ready(Some(item2)) => *self.as_mut().queued2() = Some(item2), 104 Poll::Ready(None) | Poll::Pending => {} 105 } 106 } 107 108 if self.queued1.is_some() && self.queued2.is_some() { 109 let pair = (self.as_mut().queued1().take().unwrap(), 110 self.as_mut().queued2().take().unwrap()); 111 Poll::Ready(Some(pair)) 112 } else if self.stream1.is_done() || self.stream2.is_done() { 113 Poll::Ready(None) 114 } else { 115 Poll::Pending 116 } 117 } 118 size_hint(&self) -> (usize, Option<usize>)119 fn size_hint(&self) -> (usize, Option<usize>) { 120 let queued1_len = if self.queued1.is_some() { 1 } else { 0 }; 121 let queued2_len = if self.queued2.is_some() { 1 } else { 0 }; 122 let (stream1_lower, stream1_upper) = self.stream1.size_hint(); 123 let (stream2_lower, stream2_upper) = self.stream2.size_hint(); 124 125 let stream1_lower = stream1_lower.saturating_add(queued1_len); 126 let stream2_lower = stream2_lower.saturating_add(queued2_len); 127 128 let lower = cmp::min(stream1_lower, stream2_lower); 129 130 let upper = match (stream1_upper, stream2_upper) { 131 (Some(x), Some(y)) => { 132 let x = x.saturating_add(queued1_len); 133 let y = y.saturating_add(queued2_len); 134 Some(cmp::min(x, y)) 135 } 136 (Some(x), None) => x.checked_add(queued1_len), 137 (None, Some(y)) => y.checked_add(queued2_len), 138 (None, None) => None 139 }; 140 141 (lower, upper) 142 } 143 } 144