1 use crate::stream::{Fuse, Stream};
2 
3 use core::pin::Pin;
4 use core::task::{Context, Poll};
5 use pin_project_lite::pin_project;
6 
pin_project! {
    /// Stream returned by the [`merge`](super::StreamExt::merge) method.
    ///
    /// Holds the two source streams, each wrapped in `Fuse`, together with a
    /// flag recording which of them is polled first on the next call to
    /// `poll_next`.
    pub struct Merge<T, U> {
        // First source stream, wrapped in `Fuse`.
        #[pin]
        a: Fuse<T>,
        // Second source stream, wrapped in `Fuse`.
        #[pin]
        b: Fuse<U>,
        // When `true`, poll `a` first; otherwise, poll `b` first.
        a_first: bool,
    }
}
18 
19 impl<T, U> Merge<T, U> {
new(a: T, b: U) -> Merge<T, U> where T: Stream, U: Stream,20     pub(super) fn new(a: T, b: U) -> Merge<T, U>
21     where
22         T: Stream,
23         U: Stream,
24     {
25         Merge {
26             a: Fuse::new(a),
27             b: Fuse::new(b),
28             a_first: true,
29         }
30     }
31 }
32 
33 impl<T, U> Stream for Merge<T, U>
34 where
35     T: Stream,
36     U: Stream<Item = T::Item>,
37 {
38     type Item = T::Item;
39 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>>40     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
41         let me = self.project();
42         let a_first = *me.a_first;
43 
44         // Toggle the flag
45         *me.a_first = !a_first;
46 
47         if a_first {
48             poll_next(me.a, me.b, cx)
49         } else {
50             poll_next(me.b, me.a, cx)
51         }
52     }
53 
size_hint(&self) -> (usize, Option<usize>)54     fn size_hint(&self) -> (usize, Option<usize>) {
55         super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
56     }
57 }
58 
poll_next<T, U>( first: Pin<&mut T>, second: Pin<&mut U>, cx: &mut Context<'_>, ) -> Poll<Option<T::Item>> where T: Stream, U: Stream<Item = T::Item>,59 fn poll_next<T, U>(
60     first: Pin<&mut T>,
61     second: Pin<&mut U>,
62     cx: &mut Context<'_>,
63 ) -> Poll<Option<T::Item>>
64 where
65     T: Stream,
66     U: Stream<Item = T::Item>,
67 {
68     use Poll::*;
69 
70     let mut done = true;
71 
72     match first.poll_next(cx) {
73         Ready(Some(val)) => return Ready(Some(val)),
74         Ready(None) => {}
75         Pending => done = false,
76     }
77 
78     match second.poll_next(cx) {
79         Ready(Some(val)) => return Ready(Some(val)),
80         Ready(None) => {}
81         Pending => done = false,
82     }
83 
84     if done {
85         Ready(None)
86     } else {
87         Pending
88     }
89 }
90