1 use crate::stream::{StreamExt, Fuse};
2 use core::cmp;
3 use core::pin::Pin;
4 use futures_core::stream::{FusedStream, Stream};
5 use futures_core::task::{Context, Poll};
6 use pin_utils::{unsafe_pinned, unsafe_unpinned};
7 
8 /// Stream for the [`zip`](super::StreamExt::zip) method.
9 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Zip<St1: Stream, St2: Stream> {
12     stream1: Fuse<St1>,
13     stream2: Fuse<St2>,
14     queued1: Option<St1::Item>,
15     queued2: Option<St2::Item>,
16 }
17 
18 #[allow(clippy::type_repetition_in_bounds)] // https://github.com/rust-lang/rust-clippy/issues/4323
19 impl<St1, St2> Unpin for Zip<St1, St2>
20 where
21     St1: Stream,
22     Fuse<St1>: Unpin,
23     St2: Stream,
24     Fuse<St2>: Unpin,
25 {}
26 
27 impl<St1: Stream, St2: Stream> Zip<St1, St2> {
28     unsafe_pinned!(stream1: Fuse<St1>);
29     unsafe_pinned!(stream2: Fuse<St2>);
30     unsafe_unpinned!(queued1: Option<St1::Item>);
31     unsafe_unpinned!(queued2: Option<St2::Item>);
32 
new(stream1: St1, stream2: St2) -> Zip<St1, St2>33     pub(super) fn new(stream1: St1, stream2: St2) -> Zip<St1, St2> {
34         Zip {
35             stream1: stream1.fuse(),
36             stream2: stream2.fuse(),
37             queued1: None,
38             queued2: None,
39         }
40     }
41 
42     /// Acquires a reference to the underlying streams that this combinator is
43     /// pulling from.
get_ref(&self) -> (&St1, &St2)44     pub fn get_ref(&self) -> (&St1, &St2) {
45         (self.stream1.get_ref(), self.stream2.get_ref())
46     }
47 
48     /// Acquires a mutable reference to the underlying streams that this
49     /// combinator is pulling from.
50     ///
51     /// Note that care must be taken to avoid tampering with the state of the
52     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> (&mut St1, &mut St2)53     pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
54         (self.stream1.get_mut(), self.stream2.get_mut())
55     }
56 
57     /// Acquires a pinned mutable reference to the underlying streams that this
58     /// combinator is pulling from.
59     ///
60     /// Note that care must be taken to avoid tampering with the state of the
61     /// stream which may otherwise confuse this combinator.
get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>)62     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
63         unsafe {
64             let Self { stream1, stream2, .. } = self.get_unchecked_mut();
65             (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut())
66         }
67     }
68 
69     /// Consumes this combinator, returning the underlying streams.
70     ///
71     /// Note that this may discard intermediate state of this combinator, so
72     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> (St1, St2)73     pub fn into_inner(self) -> (St1, St2) {
74         (self.stream1.into_inner(), self.stream2.into_inner())
75     }
76 }
77 
78 impl<St1, St2> FusedStream for Zip<St1, St2>
79     where St1: Stream, St2: Stream,
80 {
is_terminated(&self) -> bool81     fn is_terminated(&self) -> bool {
82         self.stream1.is_terminated() && self.stream2.is_terminated()
83     }
84 }
85 
86 impl<St1, St2> Stream for Zip<St1, St2>
87     where St1: Stream, St2: Stream
88 {
89     type Item = (St1::Item, St2::Item);
90 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>91     fn poll_next(
92         mut self: Pin<&mut Self>,
93         cx: &mut Context<'_>,
94     ) -> Poll<Option<Self::Item>> {
95         if self.queued1.is_none() {
96             match self.as_mut().stream1().poll_next(cx) {
97                 Poll::Ready(Some(item1)) => *self.as_mut().queued1() = Some(item1),
98                 Poll::Ready(None) | Poll::Pending => {}
99             }
100         }
101         if self.queued2.is_none() {
102             match self.as_mut().stream2().poll_next(cx) {
103                 Poll::Ready(Some(item2)) => *self.as_mut().queued2() = Some(item2),
104                 Poll::Ready(None) | Poll::Pending => {}
105             }
106         }
107 
108         if self.queued1.is_some() && self.queued2.is_some() {
109             let pair = (self.as_mut().queued1().take().unwrap(),
110                         self.as_mut().queued2().take().unwrap());
111             Poll::Ready(Some(pair))
112         } else if self.stream1.is_done() || self.stream2.is_done() {
113             Poll::Ready(None)
114         } else {
115             Poll::Pending
116         }
117     }
118 
size_hint(&self) -> (usize, Option<usize>)119     fn size_hint(&self) -> (usize, Option<usize>) {
120         let queued1_len = if self.queued1.is_some() { 1 } else { 0 };
121         let queued2_len = if self.queued2.is_some() { 1 } else { 0 };
122         let (stream1_lower, stream1_upper) = self.stream1.size_hint();
123         let (stream2_lower, stream2_upper) = self.stream2.size_hint();
124 
125         let stream1_lower = stream1_lower.saturating_add(queued1_len);
126         let stream2_lower = stream2_lower.saturating_add(queued2_len);
127 
128         let lower = cmp::min(stream1_lower, stream2_lower);
129 
130         let upper = match (stream1_upper, stream2_upper) {
131             (Some(x), Some(y)) => {
132                 let x = x.saturating_add(queued1_len);
133                 let y = y.saturating_add(queued2_len);
134                 Some(cmp::min(x, y))
135             }
136             (Some(x), None) => x.checked_add(queued1_len),
137             (None, Some(y)) => y.checked_add(queued2_len),
138             (None, None) => None
139         };
140 
141         (lower, upper)
142     }
143 }
144