1 #![deprecated(note = "functionality provided by `select` now")]
2 #![allow(deprecated)]
3
4 use {Poll, Async};
5 use stream::{Stream, Fuse};
6
7 /// An adapter for merging the output of two streams.
8 ///
9 /// The merged stream produces items from one or both of the underlying
10 /// streams as they become available. Errors, however, are not merged: you
11 /// get at most one error at a time.
12 #[derive(Debug)]
13 #[must_use = "streams do nothing unless polled"]
14 pub struct Merge<S1, S2: Stream> {
15 stream1: Fuse<S1>,
16 stream2: Fuse<S2>,
17 queued_error: Option<S2::Error>,
18 }
19
new<S1, S2>(stream1: S1, stream2: S2) -> Merge<S1, S2> where S1: Stream, S2: Stream<Error = S1::Error>20 pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Merge<S1, S2>
21 where S1: Stream, S2: Stream<Error = S1::Error>
22 {
23 Merge {
24 stream1: stream1.fuse(),
25 stream2: stream2.fuse(),
26 queued_error: None,
27 }
28 }
29
30 /// An item returned from a merge stream, which represents an item from one or
31 /// both of the underlying streams.
32 #[derive(Debug)]
33 pub enum MergedItem<I1, I2> {
34 /// An item from the first stream
35 First(I1),
36 /// An item from the second stream
37 Second(I2),
38 /// Items from both streams
39 Both(I1, I2),
40 }
41
42 impl<S1, S2> Stream for Merge<S1, S2>
43 where S1: Stream, S2: Stream<Error = S1::Error>
44 {
45 type Item = MergedItem<S1::Item, S2::Item>;
46 type Error = S1::Error;
47
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>48 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
49 if let Some(e) = self.queued_error.take() {
50 return Err(e)
51 }
52
53 match self.stream1.poll()? {
54 Async::NotReady => {
55 match try_ready!(self.stream2.poll()) {
56 Some(item2) => Ok(Async::Ready(Some(MergedItem::Second(item2)))),
57 None => Ok(Async::NotReady),
58 }
59 }
60 Async::Ready(None) => {
61 match try_ready!(self.stream2.poll()) {
62 Some(item2) => Ok(Async::Ready(Some(MergedItem::Second(item2)))),
63 None => Ok(Async::Ready(None)),
64 }
65 }
66 Async::Ready(Some(item1)) => {
67 match self.stream2.poll() {
68 Err(e) => {
69 self.queued_error = Some(e);
70 Ok(Async::Ready(Some(MergedItem::First(item1))))
71 }
72 Ok(Async::NotReady) | Ok(Async::Ready(None)) => {
73 Ok(Async::Ready(Some(MergedItem::First(item1))))
74 }
75 Ok(Async::Ready(Some(item2))) => {
76 Ok(Async::Ready(Some(MergedItem::Both(item1, item2))))
77 }
78 }
79 }
80 }
81 }
82 }
83