1 use crate::stream::{Fuse, Stream};
2
3 use core::pin::Pin;
4 use core::task::{Context, Poll};
5 use pin_project_lite::pin_project;
6
pin_project! {
    /// Stream returned by the [`merge`](super::StreamExt::merge) method.
    ///
    /// Holds the two source streams, each wrapped in `Fuse`, plus a flag
    /// recording which of them is polled first on the next `poll_next` call.
    pub struct Merge<T, U> {
        // First source stream, stored behind `Fuse`.
        // NOTE(review): presumably fused so it can still be polled after it
        // has finished while the other stream is live; confirm against `Fuse`.
        #[pin]
        a: Fuse<T>,
        // Second source stream, stored behind `Fuse` for the same reason.
        #[pin]
        b: Fuse<U>,
        // When `true`, poll `a` first; otherwise, poll `b` first.
        // Starts as `true` and is flipped on every `poll_next` call.
        a_first: bool,
    }
}
18
19 impl<T, U> Merge<T, U> {
new(a: T, b: U) -> Merge<T, U> where T: Stream, U: Stream,20 pub(super) fn new(a: T, b: U) -> Merge<T, U>
21 where
22 T: Stream,
23 U: Stream,
24 {
25 Merge {
26 a: Fuse::new(a),
27 b: Fuse::new(b),
28 a_first: true,
29 }
30 }
31 }
32
33 impl<T, U> Stream for Merge<T, U>
34 where
35 T: Stream,
36 U: Stream<Item = T::Item>,
37 {
38 type Item = T::Item;
39
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>>40 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
41 let me = self.project();
42 let a_first = *me.a_first;
43
44 // Toggle the flag
45 *me.a_first = !a_first;
46
47 if a_first {
48 poll_next(me.a, me.b, cx)
49 } else {
50 poll_next(me.b, me.a, cx)
51 }
52 }
53
size_hint(&self) -> (usize, Option<usize>)54 fn size_hint(&self) -> (usize, Option<usize>) {
55 super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
56 }
57 }
58
poll_next<T, U>( first: Pin<&mut T>, second: Pin<&mut U>, cx: &mut Context<'_>, ) -> Poll<Option<T::Item>> where T: Stream, U: Stream<Item = T::Item>,59 fn poll_next<T, U>(
60 first: Pin<&mut T>,
61 second: Pin<&mut U>,
62 cx: &mut Context<'_>,
63 ) -> Poll<Option<T::Item>>
64 where
65 T: Stream,
66 U: Stream<Item = T::Item>,
67 {
68 use Poll::*;
69
70 let mut done = true;
71
72 match first.poll_next(cx) {
73 Ready(Some(val)) => return Ready(Some(val)),
74 Ready(None) => {}
75 Pending => done = false,
76 }
77
78 match second.poll_next(cx) {
79 Ready(Some(val)) => return Ready(Some(val)),
80 Ready(None) => {}
81 Pending => done = false,
82 }
83
84 if done {
85 Ready(None)
86 } else {
87 Pending
88 }
89 }
90