1 use {Poll, Async};
2 use stream::{Stream, Fuse};
3
/// An adapter for merging the output of two streams.
///
/// The merged stream produces items from either of the underlying streams as
/// they become available, and the streams are polled in a round-robin fashion.
/// Errors, however, are not merged: you get at most one error at a time.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Select<S1, S2> {
    // First underlying stream, stored wrapped in `Fuse`.
    // NOTE(review): presumably fused so it can keep being polled after it has
    // reported the end of the stream -- confirm against `Fuse`'s contract.
    stream1: Fuse<S1>,
    // Second underlying stream, stored wrapped in `Fuse` (same caveat as above).
    stream2: Fuse<S2>,
    // Chooses which stream `poll` tries first on its next call: `stream2` when
    // `true`, `stream1` when `false`.
    flag: bool,
}
16
new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2> where S1: Stream, S2: Stream<Item = S1::Item, Error = S1::Error>17 pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2>
18 where S1: Stream,
19 S2: Stream<Item = S1::Item, Error = S1::Error>
20 {
21 Select {
22 stream1: stream1.fuse(),
23 stream2: stream2.fuse(),
24 flag: false,
25 }
26 }
27
28 impl<S1, S2> Stream for Select<S1, S2>
29 where S1: Stream,
30 S2: Stream<Item = S1::Item, Error = S1::Error>
31 {
32 type Item = S1::Item;
33 type Error = S1::Error;
34
poll(&mut self) -> Poll<Option<S1::Item>, S1::Error>35 fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
36 let (a, b) = if self.flag {
37 (&mut self.stream2 as &mut Stream<Item=_, Error=_>,
38 &mut self.stream1 as &mut Stream<Item=_, Error=_>)
39 } else {
40 (&mut self.stream1 as &mut Stream<Item=_, Error=_>,
41 &mut self.stream2 as &mut Stream<Item=_, Error=_>)
42 };
43 self.flag = !self.flag;
44
45 let a_done = match a.poll()? {
46 Async::Ready(Some(item)) => return Ok(Some(item).into()),
47 Async::Ready(None) => true,
48 Async::NotReady => false,
49 };
50
51 match b.poll()? {
52 Async::Ready(Some(item)) => {
53 // If the other stream isn't finished yet, give them a chance to
54 // go first next time as we pulled something off `b`.
55 if !a_done {
56 self.flag = !self.flag;
57 }
58 Ok(Some(item).into())
59 }
60 Async::Ready(None) if a_done => Ok(None.into()),
61 Async::Ready(None) | Async::NotReady => Ok(Async::NotReady),
62 }
63 }
64 }
65