1 use {Poll, Async};
2 use stream::{Stream, Fuse};
3 
4 /// An adapter for merging the output of two streams.
5 ///
6 /// The merged stream produces items from either of the underlying streams as
7 /// they become available, and the streams are polled in a round-robin fashion.
8 /// Errors, however, are not merged: you get at most one error at a time.
9 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Select<S1, S2> {
12     stream1: Fuse<S1>,
13     stream2: Fuse<S2>,
14     flag: bool,
15 }
16 
new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2> where S1: Stream, S2: Stream<Item = S1::Item, Error = S1::Error>17 pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2>
18     where S1: Stream,
19           S2: Stream<Item = S1::Item, Error = S1::Error>
20 {
21     Select {
22         stream1: stream1.fuse(),
23         stream2: stream2.fuse(),
24         flag: false,
25     }
26 }
27 
28 impl<S1, S2> Stream for Select<S1, S2>
29     where S1: Stream,
30           S2: Stream<Item = S1::Item, Error = S1::Error>
31 {
32     type Item = S1::Item;
33     type Error = S1::Error;
34 
poll(&mut self) -> Poll<Option<S1::Item>, S1::Error>35     fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
36         let (a, b) = if self.flag {
37             (&mut self.stream2 as &mut Stream<Item=_, Error=_>,
38              &mut self.stream1 as &mut Stream<Item=_, Error=_>)
39         } else {
40             (&mut self.stream1 as &mut Stream<Item=_, Error=_>,
41              &mut self.stream2 as &mut Stream<Item=_, Error=_>)
42         };
43         self.flag = !self.flag;
44 
45         let a_done = match a.poll()? {
46             Async::Ready(Some(item)) => return Ok(Some(item).into()),
47             Async::Ready(None) => true,
48             Async::NotReady => false,
49         };
50 
51         match b.poll()? {
52             Async::Ready(Some(item)) => {
53                 // If the other stream isn't finished yet, give them a chance to
54                 // go first next time as we pulled something off `b`.
55                 if !a_done {
56                     self.flag = !self.flag;
57                 }
58                 Ok(Some(item).into())
59             }
60             Async::Ready(None) if a_done => Ok(None.into()),
61             Async::Ready(None) | Async::NotReady => Ok(Async::NotReady),
62         }
63     }
64 }
65