1 use std::fmt;
2 
3 use {Async, IntoFuture, Poll};
4 use stream::{Stream, Fuse, FuturesOrdered};
5 
6 /// An adaptor for a stream of futures to execute the futures concurrently, if
7 /// possible.
8 ///
9 /// This adaptor will buffer up a list of pending futures, and then return their
10 /// results in the order that they were pulled out of the original stream. This
11 /// is created by the `Stream::buffered` method.
12 #[must_use = "streams do nothing unless polled"]
13 pub struct Buffered<S>
14     where S: Stream,
15           S::Item: IntoFuture,
16 {
17     stream: Fuse<S>,
18     queue: FuturesOrdered<<S::Item as IntoFuture>::Future>,
19     max: usize,
20 }
21 
22 impl<S> fmt::Debug for Buffered<S>
23     where S: Stream + fmt::Debug,
24           S::Item: IntoFuture,
25           <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug,
26           <<S as Stream>::Item as IntoFuture>::Item: fmt::Debug,
27           <<S as Stream>::Item as IntoFuture>::Error: fmt::Debug,
28 {
29     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
30         fmt.debug_struct("Buffered")
31             .field("stream", &self.stream)
32             .field("queue", &self.queue)
33             .field("max", &self.max)
34             .finish()
35     }
36 }
37 
38 pub fn new<S>(s: S, amt: usize) -> Buffered<S>
39     where S: Stream,
40           S::Item: IntoFuture<Error=<S as Stream>::Error>,
41 {
42     Buffered {
43         stream: super::fuse::new(s),
44         queue: FuturesOrdered::new(),
45         max: amt,
46     }
47 }
48 
49 impl<S> Buffered<S>
50     where S: Stream,
51           S::Item: IntoFuture<Error=<S as Stream>::Error>,
52 {
53     /// Acquires a reference to the underlying stream that this combinator is
54     /// pulling from.
55     pub fn get_ref(&self) -> &S {
56         self.stream.get_ref()
57     }
58 
59     /// Acquires a mutable reference to the underlying stream that this
60     /// combinator is pulling from.
61     ///
62     /// Note that care must be taken to avoid tampering with the state of the
63     /// stream which may otherwise confuse this combinator.
64     pub fn get_mut(&mut self) -> &mut S {
65         self.stream.get_mut()
66     }
67 
68     /// Consumes this combinator, returning the underlying stream.
69     ///
70     /// Note that this may discard intermediate state of this combinator, so
71     /// care should be taken to avoid losing resources when this is called.
72     pub fn into_inner(self) -> S {
73         self.stream.into_inner()
74     }
75 }
76 
77 // Forwarding impl of Sink from the underlying stream
78 impl<S> ::sink::Sink for Buffered<S>
79     where S: ::sink::Sink + Stream,
80           S::Item: IntoFuture,
81 {
82     type SinkItem = S::SinkItem;
83     type SinkError = S::SinkError;
84 
85     fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
86         self.stream.start_send(item)
87     }
88 
89     fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
90         self.stream.poll_complete()
91     }
92 
93     fn close(&mut self) -> Poll<(), S::SinkError> {
94         self.stream.close()
95     }
96 }
97 
98 impl<S> Stream for Buffered<S>
99     where S: Stream,
100           S::Item: IntoFuture<Error=<S as Stream>::Error>,
101 {
102     type Item = <S::Item as IntoFuture>::Item;
103     type Error = <S as Stream>::Error;
104 
105     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
106         // First up, try to spawn off as many futures as possible by filling up
107         // our slab of futures.
108         while self.queue.len() < self.max {
109             let future = match self.stream.poll()? {
110                 Async::Ready(Some(s)) => s.into_future(),
111                 Async::Ready(None) |
112                 Async::NotReady => break,
113             };
114 
115             self.queue.push(future);
116         }
117 
118         // Try polling a new future
119         if let Some(val) = try_ready!(self.queue.poll()) {
120             return Ok(Async::Ready(Some(val)));
121         }
122 
123         // If we've gotten this far, then there are no events for us to process
124         // and nothing was ready, so figure out if we're not done yet  or if
125         // we've reached the end.
126         if self.stream.is_done() {
127             Ok(Async::Ready(None))
128         } else {
129             Ok(Async::NotReady)
130         }
131     }
132 }
133