1 #![deprecated(note = "functionality provided by `select` now")]
2 #![allow(deprecated)]
3 
4 use {Poll, Async};
5 use stream::{Stream, Fuse};
6 
7 /// An adapter for merging the output of two streams.
8 ///
9 /// The merged stream produces items from one or both of the underlying
10 /// streams as they become available. Errors, however, are not merged: you
11 /// get at most one error at a time.
12 #[derive(Debug)]
13 #[must_use = "streams do nothing unless polled"]
14 pub struct Merge<S1, S2: Stream> {
15     stream1: Fuse<S1>,
16     stream2: Fuse<S2>,
17     queued_error: Option<S2::Error>,
18 }
19 
new<S1, S2>(stream1: S1, stream2: S2) -> Merge<S1, S2> where S1: Stream, S2: Stream<Error = S1::Error>20 pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Merge<S1, S2>
21     where S1: Stream, S2: Stream<Error = S1::Error>
22 {
23     Merge {
24         stream1: stream1.fuse(),
25         stream2: stream2.fuse(),
26         queued_error: None,
27     }
28 }
29 
30 /// An item returned from a merge stream, which represents an item from one or
31 /// both of the underlying streams.
32 #[derive(Debug)]
33 pub enum MergedItem<I1, I2> {
34     /// An item from the first stream
35     First(I1),
36     /// An item from the second stream
37     Second(I2),
38     /// Items from both streams
39     Both(I1, I2),
40 }
41 
42 impl<S1, S2> Stream for Merge<S1, S2>
43     where S1: Stream, S2: Stream<Error = S1::Error>
44 {
45     type Item = MergedItem<S1::Item, S2::Item>;
46     type Error = S1::Error;
47 
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>48     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
49         if let Some(e) = self.queued_error.take() {
50             return Err(e)
51         }
52 
53         match self.stream1.poll()? {
54             Async::NotReady => {
55                 match try_ready!(self.stream2.poll()) {
56                     Some(item2) => Ok(Async::Ready(Some(MergedItem::Second(item2)))),
57                     None => Ok(Async::NotReady),
58                 }
59             }
60             Async::Ready(None) => {
61                 match try_ready!(self.stream2.poll()) {
62                     Some(item2) => Ok(Async::Ready(Some(MergedItem::Second(item2)))),
63                     None => Ok(Async::Ready(None)),
64                 }
65             }
66             Async::Ready(Some(item1)) => {
67                 match self.stream2.poll() {
68                     Err(e) => {
69                         self.queued_error = Some(e);
70                         Ok(Async::Ready(Some(MergedItem::First(item1))))
71                     }
72                     Ok(Async::NotReady) | Ok(Async::Ready(None)) => {
73                         Ok(Async::Ready(Some(MergedItem::First(item1))))
74                     }
75                     Ok(Async::Ready(Some(item2))) => {
76                         Ok(Async::Ready(Some(MergedItem::Both(item1, item2))))
77                     }
78                 }
79             }
80         }
81     }
82 }
83