1 use std::fmt; 2 3 use {Async, IntoFuture, Poll}; 4 use stream::{Stream, Fuse, FuturesOrdered}; 5 6 /// An adaptor for a stream of futures to execute the futures concurrently, if 7 /// possible. 8 /// 9 /// This adaptor will buffer up a list of pending futures, and then return their 10 /// results in the order that they were pulled out of the original stream. This 11 /// is created by the `Stream::buffered` method. 12 #[must_use = "streams do nothing unless polled"] 13 pub struct Buffered<S> 14 where S: Stream, 15 S::Item: IntoFuture, 16 { 17 stream: Fuse<S>, 18 queue: FuturesOrdered<<S::Item as IntoFuture>::Future>, 19 max: usize, 20 } 21 22 impl<S> fmt::Debug for Buffered<S> 23 where S: Stream + fmt::Debug, 24 S::Item: IntoFuture, 25 <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug, 26 <<S as Stream>::Item as IntoFuture>::Item: fmt::Debug, 27 <<S as Stream>::Item as IntoFuture>::Error: fmt::Debug, 28 { 29 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 30 fmt.debug_struct("Buffered") 31 .field("stream", &self.stream) 32 .field("queue", &self.queue) 33 .field("max", &self.max) 34 .finish() 35 } 36 } 37 38 pub fn new<S>(s: S, amt: usize) -> Buffered<S> 39 where S: Stream, 40 S::Item: IntoFuture<Error=<S as Stream>::Error>, 41 { 42 Buffered { 43 stream: super::fuse::new(s), 44 queue: FuturesOrdered::new(), 45 max: amt, 46 } 47 } 48 49 impl<S> Buffered<S> 50 where S: Stream, 51 S::Item: IntoFuture<Error=<S as Stream>::Error>, 52 { 53 /// Acquires a reference to the underlying stream that this combinator is 54 /// pulling from. 55 pub fn get_ref(&self) -> &S { 56 self.stream.get_ref() 57 } 58 59 /// Acquires a mutable reference to the underlying stream that this 60 /// combinator is pulling from. 61 /// 62 /// Note that care must be taken to avoid tampering with the state of the 63 /// stream which may otherwise confuse this combinator. 64 pub fn get_mut(&mut self) -> &mut S { 65 self.stream.get_mut() 66 } 67 68 /// Consumes this combinator, returning the underlying stream. 69 /// 70 /// Note that this may discard intermediate state of this combinator, so 71 /// care should be taken to avoid losing resources when this is called. 72 pub fn into_inner(self) -> S { 73 self.stream.into_inner() 74 } 75 } 76 77 // Forwarding impl of Sink from the underlying stream 78 impl<S> ::sink::Sink for Buffered<S> 79 where S: ::sink::Sink + Stream, 80 S::Item: IntoFuture, 81 { 82 type SinkItem = S::SinkItem; 83 type SinkError = S::SinkError; 84 85 fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> { 86 self.stream.start_send(item) 87 } 88 89 fn poll_complete(&mut self) -> Poll<(), S::SinkError> { 90 self.stream.poll_complete() 91 } 92 93 fn close(&mut self) -> Poll<(), S::SinkError> { 94 self.stream.close() 95 } 96 } 97 98 impl<S> Stream for Buffered<S> 99 where S: Stream, 100 S::Item: IntoFuture<Error=<S as Stream>::Error>, 101 { 102 type Item = <S::Item as IntoFuture>::Item; 103 type Error = <S as Stream>::Error; 104 105 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { 106 // First up, try to spawn off as many futures as possible by filling up 107 // our slab of futures. 108 while self.queue.len() < self.max { 109 let future = match self.stream.poll()? { 110 Async::Ready(Some(s)) => s.into_future(), 111 Async::Ready(None) | 112 Async::NotReady => break, 113 }; 114 115 self.queue.push(future); 116 } 117 118 // Try polling a new future 119 if let Some(val) = try_ready!(self.queue.poll()) { 120 return Ok(Async::Ready(Some(val))); 121 } 122 123 // If we've gotten this far, then there are no events for us to process 124 // and nothing was ready, so figure out if we're not done yet or if 125 // we've reached the end. 126 if self.stream.is_done() { 127 Ok(Async::Ready(None)) 128 } else { 129 Ok(Async::NotReady) 130 } 131 } 132 } 133