1 //! Asynchronous streams
2 //!
3 //! This module contains the `Stream` trait and a number of adaptors for this
4 //! trait. This trait is very similar to the `Iterator` trait in the standard
5 //! library except that it expresses the concept of blocking as well. A stream
6 //! here is a sequential sequence of values which may take some amount of time
7 //! in between to produce.
8 //!
9 //! A stream may request that it is blocked between values while the next value
10 //! is calculated, and provides a way to get notified once the next value is
11 //! ready as well.
12 //!
13 //! You can find more information/tutorials about streams [online at
14 //! https://tokio.rs][online]
15 //!
16 //! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
17 
18 use {IntoFuture, Poll};
19 
20 mod iter;
21 #[allow(deprecated)]
22 pub use self::iter::{iter, Iter};
23 #[cfg(feature = "with-deprecated")]
24 #[allow(deprecated)]
25 pub use self::Iter as IterStream;
26 mod iter_ok;
27 pub use self::iter_ok::{iter_ok, IterOk};
28 mod iter_result;
29 pub use self::iter_result::{iter_result, IterResult};
30 
31 mod repeat;
32 pub use self::repeat::{repeat, Repeat};
33 
34 mod and_then;
35 mod chain;
36 mod concat;
37 mod empty;
38 mod filter;
39 mod filter_map;
40 mod flatten;
41 mod fold;
42 mod for_each;
43 mod from_err;
44 mod fuse;
45 mod future;
46 mod inspect;
47 mod inspect_err;
48 mod map;
49 mod map_err;
50 mod merge;
51 mod once;
52 mod or_else;
53 mod peek;
54 mod poll_fn;
55 mod select;
56 mod skip;
57 mod skip_while;
58 mod take;
59 mod take_while;
60 mod then;
61 mod unfold;
62 mod zip;
63 mod forward;
64 pub use self::and_then::AndThen;
65 pub use self::chain::Chain;
66 #[allow(deprecated)]
67 pub use self::concat::Concat;
68 pub use self::concat::Concat2;
69 pub use self::empty::{Empty, empty};
70 pub use self::filter::Filter;
71 pub use self::filter_map::FilterMap;
72 pub use self::flatten::Flatten;
73 pub use self::fold::Fold;
74 pub use self::for_each::ForEach;
75 pub use self::from_err::FromErr;
76 pub use self::fuse::Fuse;
77 pub use self::future::StreamFuture;
78 pub use self::inspect::Inspect;
79 pub use self::inspect_err::InspectErr;
80 pub use self::map::Map;
81 pub use self::map_err::MapErr;
82 #[allow(deprecated)]
83 pub use self::merge::{Merge, MergedItem};
84 pub use self::once::{Once, once};
85 pub use self::or_else::OrElse;
86 pub use self::peek::Peekable;
87 pub use self::poll_fn::{poll_fn, PollFn};
88 pub use self::select::Select;
89 pub use self::skip::Skip;
90 pub use self::skip_while::SkipWhile;
91 pub use self::take::Take;
92 pub use self::take_while::TakeWhile;
93 pub use self::then::Then;
94 pub use self::unfold::{Unfold, unfold};
95 pub use self::zip::Zip;
96 pub use self::forward::Forward;
97 use sink::{Sink};
98 
99 if_std! {
100     use std;
101 
102     mod buffered;
103     mod buffer_unordered;
104     mod catch_unwind;
105     mod chunks;
106     mod collect;
107     mod wait;
108     mod channel;
109     mod split;
110     pub mod futures_unordered;
111     mod futures_ordered;
112     pub use self::buffered::Buffered;
113     pub use self::buffer_unordered::BufferUnordered;
114     pub use self::catch_unwind::CatchUnwind;
115     pub use self::chunks::Chunks;
116     pub use self::collect::Collect;
117     pub use self::wait::Wait;
118     pub use self::split::{SplitStream, SplitSink, ReuniteError};
119     pub use self::futures_unordered::FuturesUnordered;
120     pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
121 
122     #[doc(hidden)]
123     #[cfg(feature = "with-deprecated")]
124     #[allow(deprecated)]
125     pub use self::channel::{channel, Sender, Receiver, FutureSender, SendError};
126 
127     /// A type alias for `Box<Stream + Send>`
128     #[doc(hidden)]
129     #[deprecated(note = "removed without replacement, recommended to use a \
130                          local extension trait or function if needed, more \
131                          details in https://github.com/rust-lang-nursery/futures-rs/issues/228")]
132     pub type BoxStream<T, E> = ::std::boxed::Box<Stream<Item = T, Error = E> + Send>;
133 
134     impl<S: ?Sized + Stream> Stream for ::std::boxed::Box<S> {
135         type Item = S::Item;
136         type Error = S::Error;
137 
138         fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
139             (**self).poll()
140         }
141     }
142 }
143 
144 /// A stream of values, not all of which may have been produced yet.
145 ///
146 /// `Stream` is a trait to represent any source of sequential events or items
147 /// which acts like an iterator but long periods of time may pass between
148 /// items. Like `Future` the methods of `Stream` never block and it is thus
149 /// suitable for programming in an asynchronous fashion. This trait is very
150 /// similar to the `Iterator` trait in the standard library where `Some` is
151 /// used to signal elements of the stream and `None` is used to indicate that
152 /// the stream is finished.
153 ///
154 /// Like futures a stream has basic combinators to transform the stream, perform
155 /// more work on each item, etc.
156 ///
157 /// You can find more information/tutorials about streams [online at
158 /// https://tokio.rs][online]
159 ///
160 /// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
161 ///
162 /// # Streams as Futures
163 ///
164 /// Any instance of `Stream` can also be viewed as a `Future` where the resolved
165 /// value is the next item in the stream along with the rest of the stream. The
166 /// `into_future` adaptor can be used here to convert any stream into a future
167 /// for use with other future methods like `join` and `select`.
168 ///
169 /// # Errors
170 ///
171 /// Streams, like futures, can also model errors in their computation. All
172 /// streams have an associated `Error` type like with futures. Currently as of
173 /// the 0.1 release of this library an error on a stream **does not terminate
174 /// the stream**. That is, after one error is received, another error may be
175 /// received from the same stream (it's valid to keep polling).
176 ///
177 /// This property of streams, however, is [being considered] for change in 0.2
178 /// where an error on a stream is similar to `None`, it terminates the stream
179 /// entirely. If one of these use cases suits you perfectly and not the other,
180 /// please feel welcome to comment on [the issue][being considered]!
181 ///
182 /// [being considered]: https://github.com/rust-lang-nursery/futures-rs/issues/206
183 #[must_use = "streams do nothing unless polled"]
184 pub trait Stream {
185     /// The type of item this stream will yield on success.
186     type Item;
187 
188     /// The type of error this stream may generate.
189     type Error;
190 
191     /// Attempt to pull out the next value of this stream, returning `None` if
192     /// the stream is finished.
193     ///
194     /// This method, like `Future::poll`, is the sole method of pulling out a
195     /// value from a stream. This method must also be run within the context of
196     /// a task typically and implementors of this trait must ensure that
197     /// implementations of this method do not block, as it may cause consumers
198     /// to behave badly.
199     ///
200     /// # Return value
201     ///
202     /// If `NotReady` is returned then this stream's next value is not ready
203     /// yet and implementations will ensure that the current task will be
204     /// notified when the next value may be ready. If `Some` is returned then
205     /// the returned value represents the next value on the stream. `Err`
206     /// indicates an error happened, while `Ok` indicates whether there was a
207     /// new item on the stream or whether the stream has terminated.
208     ///
209     /// # Panics
210     ///
211     /// Once a stream is finished, that is `Ready(None)` has been returned,
212     /// further calls to `poll` may result in a panic or other "bad behavior".
213     /// If this is difficult to guard against then the `fuse` adapter can be
214     /// used to ensure that `poll` always has well-defined semantics.
215     // TODO: more here
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>216     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
217 
218     // TODO: should there also be a method like `poll` but doesn't return an
219     //       item? basically just says "please make more progress internally"
220     //       seems crucial for buffering to actually make any sense.
221 
222     /// Creates an iterator which blocks the current thread until each item of
223     /// this stream is resolved.
224     ///
225     /// This method will consume ownership of this stream, returning an
226     /// implementation of a standard iterator. This iterator will *block the
227     /// current thread* on each call to `next` if the item in the stream isn't
228     /// ready yet.
229     ///
230     /// > **Note:** This method is not appropriate to call on event loops or
231     /// >           similar I/O situations because it will prevent the event
232     /// >           loop from making progress (this blocks the thread). This
233     /// >           method should only be called when it's guaranteed that the
234     /// >           blocking work associated with this stream will be completed
235     /// >           by another thread.
236     ///
237     /// This method is only available when the `use_std` feature of this
238     /// library is activated, and it is activated by default.
239     ///
240     /// # Panics
241     ///
242     /// The returned iterator does not attempt to catch panics. If the `poll`
243     /// function panics, panics will be propagated to the caller of `next`.
244     #[cfg(feature = "use_std")]
wait(self) -> Wait<Self> where Self: Sized245     fn wait(self) -> Wait<Self>
246         where Self: Sized
247     {
248         wait::new(self)
249     }
250 
251     /// Convenience function for turning this stream into a trait object.
252     ///
253     /// This simply avoids the need to write `Box::new` and can often help with
254     /// type inference as well by always returning a trait object. Note that
255     /// this method requires the `Send` bound and returns a `BoxStream`, which
256     /// also encodes this. If you'd like to create a `Box<Stream>` without the
257     /// `Send` bound, then the `Box::new` function can be used instead.
258     ///
259     /// This method is only available when the `use_std` feature of this
260     /// library is activated, and it is activated by default.
261     ///
262     /// # Examples
263     ///
264     /// ```
265     /// use futures::stream::*;
266     /// use futures::sync::mpsc;
267     ///
268     /// let (_tx, rx) = mpsc::channel(1);
269     /// let a: BoxStream<i32, ()> = rx.boxed();
270     /// ```
271     #[cfg(feature = "use_std")]
272     #[doc(hidden)]
273     #[deprecated(note = "removed without replacement, recommended to use a \
274                          local extension trait or function if needed, more \
275                          details in https://github.com/rust-lang-nursery/futures-rs/issues/228")]
276     #[allow(deprecated)]
boxed(self) -> BoxStream<Self::Item, Self::Error> where Self: Sized + Send + 'static,277     fn boxed(self) -> BoxStream<Self::Item, Self::Error>
278         where Self: Sized + Send + 'static,
279     {
280         ::std::boxed::Box::new(self)
281     }
282 
283     /// Converts this stream into a `Future`.
284     ///
285     /// A stream can be viewed as a future which will resolve to a pair containing
286     /// the next element of the stream plus the remaining stream. If the stream
287     /// terminates, then the next element is `None` and the remaining stream is
288     /// still passed back, to allow reclamation of its resources.
289     ///
290     /// The returned future can be used to compose streams and futures together by
291     /// placing everything into the "world of futures".
into_future(self) -> StreamFuture<Self> where Self: Sized292     fn into_future(self) -> StreamFuture<Self>
293         where Self: Sized
294     {
295         future::new(self)
296     }
297 
298     /// Converts a stream of type `T` to a stream of type `U`.
299     ///
300     /// The provided closure is executed over all elements of this stream as
301     /// they are made available, and the callback will be executed inline with
302     /// calls to `poll`.
303     ///
304     /// Note that this function consumes the receiving stream and returns a
305     /// wrapped version of it, similar to the existing `map` methods in the
306     /// standard library.
307     ///
308     /// # Examples
309     ///
310     /// ```
311     /// use futures::prelude::*;
312     /// use futures::sync::mpsc;
313     ///
314     /// let (_tx, rx) = mpsc::channel::<i32>(1);
315     /// let rx = rx.map(|x| x + 3);
316     /// ```
map<U, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> U, Self: Sized317     fn map<U, F>(self, f: F) -> Map<Self, F>
318         where F: FnMut(Self::Item) -> U,
319               Self: Sized
320     {
321         map::new(self, f)
322     }
323 
324     /// Converts a stream of error type `T` to a stream of error type `U`.
325     ///
326     /// The provided closure is executed over all errors of this stream as
327     /// they are made available, and the callback will be executed inline with
328     /// calls to `poll`.
329     ///
330     /// Note that this function consumes the receiving stream and returns a
331     /// wrapped version of it, similar to the existing `map_err` methods in the
332     /// standard library.
333     ///
334     /// # Examples
335     ///
336     /// ```
337     /// use futures::prelude::*;
338     /// use futures::sync::mpsc;
339     ///
340     /// let (_tx, rx) = mpsc::channel::<i32>(1);
341     /// let rx = rx.map_err(|()| 3);
342     /// ```
map_err<U, F>(self, f: F) -> MapErr<Self, F> where F: FnMut(Self::Error) -> U, Self: Sized343     fn map_err<U, F>(self, f: F) -> MapErr<Self, F>
344         where F: FnMut(Self::Error) -> U,
345               Self: Sized
346     {
347         map_err::new(self, f)
348     }
349 
350     /// Filters the values produced by this stream according to the provided
351     /// predicate.
352     ///
353     /// As values of this stream are made available, the provided predicate will
354     /// be run against them. If the predicate returns `true` then the stream
355     /// will yield the value, but if the predicate returns `false` then the
356     /// value will be discarded and the next value will be produced.
357     ///
358     /// All errors are passed through without filtering in this combinator.
359     ///
360     /// Note that this function consumes the receiving stream and returns a
361     /// wrapped version of it, similar to the existing `filter` methods in the
362     /// standard library.
363     ///
364     /// # Examples
365     ///
366     /// ```
367     /// use futures::prelude::*;
368     /// use futures::sync::mpsc;
369     ///
370     /// let (_tx, rx) = mpsc::channel::<i32>(1);
371     /// let evens = rx.filter(|x| x % 2 == 0);
372     /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized373     fn filter<F>(self, f: F) -> Filter<Self, F>
374         where F: FnMut(&Self::Item) -> bool,
375               Self: Sized
376     {
377         filter::new(self, f)
378     }
379 
380     /// Filters the values produced by this stream while simultaneously mapping
381     /// them to a different type.
382     ///
383     /// As values of this stream are made available, the provided function will
384     /// be run on them. If the predicate returns `Some(e)` then the stream will
385     /// yield the value `e`, but if the predicate returns `None` then the next
386     /// value will be produced.
387     ///
388     /// All errors are passed through without filtering in this combinator.
389     ///
390     /// Note that this function consumes the receiving stream and returns a
391     /// wrapped version of it, similar to the existing `filter_map` methods in the
392     /// standard library.
393     ///
394     /// # Examples
395     ///
396     /// ```
397     /// use futures::prelude::*;
398     /// use futures::sync::mpsc;
399     ///
400     /// let (_tx, rx) = mpsc::channel::<i32>(1);
401     /// let evens_plus_one = rx.filter_map(|x| {
402     ///     if x % 0 == 2 {
403     ///         Some(x + 1)
404     ///     } else {
405     ///         None
406     ///     }
407     /// });
408     /// ```
filter_map<F, B>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<B>, Self: Sized409     fn filter_map<F, B>(self, f: F) -> FilterMap<Self, F>
410         where F: FnMut(Self::Item) -> Option<B>,
411               Self: Sized
412     {
413         filter_map::new(self, f)
414     }
415 
416     /// Chain on a computation for when a value is ready, passing the resulting
417     /// item to the provided closure `f`.
418     ///
419     /// This function can be used to ensure a computation runs regardless of
420     /// the next value on the stream. The closure provided will be yielded a
421     /// `Result` once a value is ready, and the returned future will then be run
422     /// to completion to produce the next value on this stream.
423     ///
424     /// The returned value of the closure must implement the `IntoFuture` trait
425     /// and can represent some more work to be done before the composed stream
426     /// is finished. Note that the `Result` type implements the `IntoFuture`
427     /// trait so it is possible to simply alter the `Result` yielded to the
428     /// closure and return it.
429     ///
430     /// Note that this function consumes the receiving stream and returns a
431     /// wrapped version of it.
432     ///
433     /// # Examples
434     ///
435     /// ```
436     /// use futures::prelude::*;
437     /// use futures::sync::mpsc;
438     ///
439     /// let (_tx, rx) = mpsc::channel::<i32>(1);
440     ///
441     /// let rx = rx.then(|result| {
442     ///     match result {
443     ///         Ok(e) => Ok(e + 3),
444     ///         Err(()) => Err(4),
445     ///     }
446     /// });
447     /// ```
then<F, U>(self, f: F) -> Then<Self, F, U> where F: FnMut(Result<Self::Item, Self::Error>) -> U, U: IntoFuture, Self: Sized448     fn then<F, U>(self, f: F) -> Then<Self, F, U>
449         where F: FnMut(Result<Self::Item, Self::Error>) -> U,
450               U: IntoFuture,
451               Self: Sized
452     {
453         then::new(self, f)
454     }
455 
456     /// Chain on a computation for when a value is ready, passing the successful
457     /// results to the provided closure `f`.
458     ///
459     /// This function can be used to run a unit of work when the next successful
460     /// value on a stream is ready. The closure provided will be yielded a value
461     /// when ready, and the returned future will then be run to completion to
462     /// produce the next value on this stream.
463     ///
464     /// Any errors produced by this stream will not be passed to the closure,
465     /// and will be passed through.
466     ///
467     /// The returned value of the closure must implement the `IntoFuture` trait
468     /// and can represent some more work to be done before the composed stream
469     /// is finished. Note that the `Result` type implements the `IntoFuture`
470     /// trait so it is possible to simply alter the `Result` yielded to the
471     /// closure and return it.
472     ///
473     /// Note that this function consumes the receiving stream and returns a
474     /// wrapped version of it.
475     ///
476     /// To process the entire stream and return a single future representing
477     /// success or error, use `for_each` instead.
478     ///
479     /// # Examples
480     ///
481     /// ```
482     /// use futures::prelude::*;
483     /// use futures::sync::mpsc;
484     ///
485     /// let (_tx, rx) = mpsc::channel::<i32>(1);
486     ///
487     /// let rx = rx.and_then(|result| {
488     ///     if result % 2 == 0 {
489     ///         Ok(result)
490     ///     } else {
491     ///         Err(())
492     ///     }
493     /// });
494     /// ```
and_then<F, U>(self, f: F) -> AndThen<Self, F, U> where F: FnMut(Self::Item) -> U, U: IntoFuture<Error = Self::Error>, Self: Sized495     fn and_then<F, U>(self, f: F) -> AndThen<Self, F, U>
496         where F: FnMut(Self::Item) -> U,
497               U: IntoFuture<Error = Self::Error>,
498               Self: Sized
499     {
500         and_then::new(self, f)
501     }
502 
503     /// Chain on a computation for when an error happens, passing the
504     /// erroneous result to the provided closure `f`.
505     ///
506     /// This function can be used to run a unit of work and attempt to recover from
507     /// an error if one happens. The closure provided will be yielded an error
508     /// when one appears, and the returned future will then be run to completion
509     /// to produce the next value on this stream.
510     ///
511     /// Any successful values produced by this stream will not be passed to the
512     /// closure, and will be passed through.
513     ///
514     /// The returned value of the closure must implement the `IntoFuture` trait
515     /// and can represent some more work to be done before the composed stream
516     /// is finished. Note that the `Result` type implements the `IntoFuture`
517     /// trait so it is possible to simply alter the `Result` yielded to the
518     /// closure and return it.
519     ///
520     /// Note that this function consumes the receiving stream and returns a
521     /// wrapped version of it.
or_else<F, U>(self, f: F) -> OrElse<Self, F, U> where F: FnMut(Self::Error) -> U, U: IntoFuture<Item = Self::Item>, Self: Sized522     fn or_else<F, U>(self, f: F) -> OrElse<Self, F, U>
523         where F: FnMut(Self::Error) -> U,
524               U: IntoFuture<Item = Self::Item>,
525               Self: Sized
526     {
527         or_else::new(self, f)
528     }
529 
530     /// Collect all of the values of this stream into a vector, returning a
531     /// future representing the result of that computation.
532     ///
533     /// This combinator will collect all successful results of this stream and
534     /// collect them into a `Vec<Self::Item>`. If an error happens then all
535     /// collected elements will be dropped and the error will be returned.
536     ///
537     /// The returned future will be resolved whenever an error happens or when
538     /// the stream returns `Ok(None)`.
539     ///
540     /// This method is only available when the `use_std` feature of this
541     /// library is activated, and it is activated by default.
542     ///
543     /// # Examples
544     ///
545     /// ```
546     /// use std::thread;
547     ///
548     /// use futures::prelude::*;
549     /// use futures::sync::mpsc;
550     ///
551     /// let (mut tx, rx) = mpsc::channel(1);
552     ///
553     /// thread::spawn(|| {
554     ///     for i in (0..5).rev() {
555     ///         tx = tx.send(i + 1).wait().unwrap();
556     ///     }
557     /// });
558     ///
559     /// let mut result = rx.collect();
560     /// assert_eq!(result.wait(), Ok(vec![5, 4, 3, 2, 1]));
561     /// ```
562     #[cfg(feature = "use_std")]
collect(self) -> Collect<Self> where Self: Sized563     fn collect(self) -> Collect<Self>
564         where Self: Sized
565     {
566         collect::new(self)
567     }
568 
569     /// Concatenate all results of a stream into a single extendable
570     /// destination, returning a future representing the end result.
571     ///
572     /// This combinator will extend the first item with the contents
573     /// of all the successful results of the stream. If the stream is
574     /// empty, the default value will be returned. If an error occurs,
575     /// all the results will be dropped and the error will be returned.
576     ///
577     /// The name `concat2` is an intermediate measure until the release of
578     /// futures 0.2, at which point it will be renamed back to `concat`.
579     ///
580     /// # Examples
581     ///
582     /// ```
583     /// use std::thread;
584     ///
585     /// use futures::prelude::*;
586     /// use futures::sync::mpsc;
587     ///
588     /// let (mut tx, rx) = mpsc::channel(1);
589     ///
590     /// thread::spawn(move || {
591     ///     for i in (0..3).rev() {
592     ///         let n = i * 3;
593     ///         tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap();
594     ///     }
595     /// });
596     /// let result = rx.concat2();
597     /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
598     /// ```
concat2(self) -> Concat2<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,599     fn concat2(self) -> Concat2<Self>
600         where Self: Sized,
601               Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
602     {
603         concat::new2(self)
604     }
605 
606     /// Concatenate all results of a stream into a single extendable
607     /// destination, returning a future representing the end result.
608     ///
609     /// This combinator will extend the first item with the contents
610     /// of all the successful results of the stream. If an error occurs,
611     /// all the results will be dropped and the error will be returned.
612     ///
613     /// # Examples
614     ///
615     /// ```
616     /// use std::thread;
617     ///
618     /// use futures::prelude::*;
619     /// use futures::sync::mpsc;
620     ///
621     /// let (mut tx, rx) = mpsc::channel(1);
622     ///
623     /// thread::spawn(move || {
624     ///     for i in (0..3).rev() {
625     ///         let n = i * 3;
626     ///         tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap();
627     ///     }
628     /// });
629     /// let result = rx.concat();
630     /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
631     /// ```
632     ///
633     /// # Panics
634     ///
635     /// It's important to note that this function will panic if the stream
636     /// is empty, which is the reason for its deprecation.
637     #[deprecated(since="0.1.14", note="please use `Stream::concat2` instead")]
638     #[allow(deprecated)]
concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator,639     fn concat(self) -> Concat<Self>
640         where Self: Sized,
641               Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator,
642     {
643         concat::new(self)
644     }
645 
646     /// Execute an accumulating computation over a stream, collecting all the
647     /// values into one final result.
648     ///
649     /// This combinator will collect all successful results of this stream
650     /// according to the closure provided. The initial state is also provided to
651     /// this method and then is returned again by each execution of the closure.
652     /// Once the entire stream has been exhausted the returned future will
653     /// resolve to this value.
654     ///
655     /// If an error happens then collected state will be dropped and the error
656     /// will be returned.
657     ///
658     /// # Examples
659     ///
660     /// ```
661     /// use futures::prelude::*;
662     /// use futures::stream;
663     /// use futures::future;
664     ///
665     /// let number_stream = stream::iter_ok::<_, ()>(0..6);
666     /// let sum = number_stream.fold(0, |acc, x| future::ok(acc + x));
667     /// assert_eq!(sum.wait(), Ok(15));
668     /// ```
fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T> where F: FnMut(T, Self::Item) -> Fut, Fut: IntoFuture<Item = T>, Self::Error: From<Fut::Error>, Self: Sized669     fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
670         where F: FnMut(T, Self::Item) -> Fut,
671               Fut: IntoFuture<Item = T>,
672               Self::Error: From<Fut::Error>,
673               Self: Sized
674     {
675         fold::new(self, f, init)
676     }
677 
678     /// Flattens a stream of streams into just one continuous stream.
679     ///
680     /// If this stream's elements are themselves streams then this combinator
681     /// will flatten out the entire stream to one long chain of elements. Any
682     /// errors are passed through without looking at them, but otherwise each
683     /// individual stream will get exhausted before moving on to the next.
684     ///
685     /// ```
686     /// use std::thread;
687     ///
688     /// use futures::prelude::*;
689     /// use futures::sync::mpsc;
690     ///
691     /// let (tx1, rx1) = mpsc::channel::<i32>(1);
692     /// let (tx2, rx2) = mpsc::channel::<i32>(1);
693     /// let (tx3, rx3) = mpsc::channel(1);
694     ///
695     /// thread::spawn(|| {
696     ///     tx1.send(1).wait().unwrap()
697     ///        .send(2).wait().unwrap();
698     /// });
699     /// thread::spawn(|| {
700     ///     tx2.send(3).wait().unwrap()
701     ///        .send(4).wait().unwrap();
702     /// });
703     /// thread::spawn(|| {
704     ///     tx3.send(rx1).wait().unwrap()
705     ///        .send(rx2).wait().unwrap();
706     /// });
707     ///
708     /// let mut result = rx3.flatten().collect();
709     /// assert_eq!(result.wait(), Ok(vec![1, 2, 3, 4]));
710     /// ```
flatten(self) -> Flatten<Self> where Self::Item: Stream, <Self::Item as Stream>::Error: From<Self::Error>, Self: Sized711     fn flatten(self) -> Flatten<Self>
712         where Self::Item: Stream,
713               <Self::Item as Stream>::Error: From<Self::Error>,
714               Self: Sized
715     {
716         flatten::new(self)
717     }
718 
719     /// Skip elements on this stream while the predicate provided resolves to
720     /// `true`.
721     ///
722     /// This function, like `Iterator::skip_while`, will skip elements on the
723     /// stream until the `predicate` resolves to `false`. Once one element
724     /// returns false all future elements will be returned from the underlying
725     /// stream.
skip_while<P, R>(self, pred: P) -> SkipWhile<Self, P, R> where P: FnMut(&Self::Item) -> R, R: IntoFuture<Item=bool, Error=Self::Error>, Self: Sized726     fn skip_while<P, R>(self, pred: P) -> SkipWhile<Self, P, R>
727         where P: FnMut(&Self::Item) -> R,
728               R: IntoFuture<Item=bool, Error=Self::Error>,
729               Self: Sized
730     {
731         skip_while::new(self, pred)
732     }
733 
734     /// Take elements from this stream while the predicate provided resolves to
735     /// `true`.
736     ///
737     /// This function, like `Iterator::take_while`, will take elements from the
738     /// stream until the `predicate` resolves to `false`. Once one element
739     /// returns false it will always return that the stream is done.
take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R> where P: FnMut(&Self::Item) -> R, R: IntoFuture<Item=bool, Error=Self::Error>, Self: Sized740     fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R>
741         where P: FnMut(&Self::Item) -> R,
742               R: IntoFuture<Item=bool, Error=Self::Error>,
743               Self: Sized
744     {
745         take_while::new(self, pred)
746     }
747 
748     /// Runs this stream to completion, executing the provided closure for each
749     /// element on the stream.
750     ///
751     /// The closure provided will be called for each item this stream resolves
752     /// to successfully, producing a future. That future will then be executed
753     /// to completion before moving on to the next item.
754     ///
755     /// The returned value is a `Future` where the `Item` type is `()` and
756     /// errors are otherwise threaded through. Any error on the stream or in the
757     /// closure will cause iteration to be halted immediately and the future
758     /// will resolve to that error.
759     ///
760     /// To process each item in the stream and produce another stream instead
761     /// of a single future, use `and_then` instead.
for_each<F, U>(self, f: F) -> ForEach<Self, F, U> where F: FnMut(Self::Item) -> U, U: IntoFuture<Item=(), Error = Self::Error>, Self: Sized762     fn for_each<F, U>(self, f: F) -> ForEach<Self, F, U>
763         where F: FnMut(Self::Item) -> U,
764               U: IntoFuture<Item=(), Error = Self::Error>,
765               Self: Sized
766     {
767         for_each::new(self, f)
768     }
769 
770     /// Map this stream's error to any error implementing `From` for
771     /// this stream's `Error`, returning a new stream.
772     ///
773     /// This function does for streams what `try!` does for `Result`,
774     /// by letting the compiler infer the type of the resulting error.
775     /// Just as `map_err` above, this is useful for example to ensure
776     /// that streams have the same error type when used with
777     /// combinators.
778     ///
779     /// Note that this function consumes the receiving stream and returns a
780     /// wrapped version of it.
from_err<E: From<Self::Error>>(self) -> FromErr<Self, E> where Self: Sized,781     fn from_err<E: From<Self::Error>>(self) -> FromErr<Self, E>
782         where Self: Sized,
783     {
784         from_err::new(self)
785     }
786 
787     /// Creates a new stream of at most `amt` items of the underlying stream.
788     ///
789     /// Once `amt` items have been yielded from this stream then it will always
790     /// return that the stream is done.
791     ///
792     /// # Errors
793     ///
794     /// Any errors yielded from underlying stream, before the desired amount of
795     /// items is reached, are passed through and do not affect the total number
796     /// of items taken.
take(self, amt: u64) -> Take<Self> where Self: Sized797     fn take(self, amt: u64) -> Take<Self>
798         where Self: Sized
799     {
800         take::new(self, amt)
801     }
802 
803     /// Creates a new stream which skips `amt` items of the underlying stream.
804     ///
805     /// Once `amt` items have been skipped from this stream then it will always
806     /// return the remaining items on this stream.
807     ///
808     /// # Errors
809     ///
810     /// All errors yielded from underlying stream are passed through and do not
811     /// affect the total number of items skipped.
skip(self, amt: u64) -> Skip<Self> where Self: Sized812     fn skip(self, amt: u64) -> Skip<Self>
813         where Self: Sized
814     {
815         skip::new(self, amt)
816     }
817 
818     /// Fuse a stream such that `poll` will never again be called once it has
819     /// finished.
820     ///
821     /// Currently once a stream has returned `None` from `poll` any further
822     /// calls could exhibit bad behavior such as block forever, panic, never
823     /// return, etc. If it is known that `poll` may be called after stream has
824     /// already finished, then this method can be used to ensure that it has
825     /// defined semantics.
826     ///
827     /// Once a stream has been `fuse`d and it finishes, then it will forever
828     /// return `None` from `poll`. This, unlike for the traits `poll` method,
829     /// is guaranteed.
830     ///
831     /// Also note that as soon as this stream returns `None` it will be dropped
832     /// to reclaim resources associated with it.
fuse(self) -> Fuse<Self> where Self: Sized833     fn fuse(self) -> Fuse<Self>
834         where Self: Sized
835     {
836         fuse::new(self)
837     }
838 
839     /// Borrows a stream, rather than consuming it.
840     ///
841     /// This is useful to allow applying stream adaptors while still retaining
842     /// ownership of the original stream.
843     ///
844     /// ```
845     /// use futures::prelude::*;
846     /// use futures::stream;
847     /// use futures::future;
848     ///
849     /// let mut stream = stream::iter_ok::<_, ()>(1..5);
850     ///
851     /// let sum = stream.by_ref().take(2).fold(0, |a, b| future::ok(a + b)).wait();
852     /// assert_eq!(sum, Ok(3));
853     ///
854     /// // You can use the stream again
855     /// let sum = stream.take(2).fold(0, |a, b| future::ok(a + b)).wait();
856     /// assert_eq!(sum, Ok(7));
857     /// ```
by_ref(&mut self) -> &mut Self where Self: Sized858     fn by_ref(&mut self) -> &mut Self
859         where Self: Sized
860     {
861         self
862     }
863 
864     /// Catches unwinding panics while polling the stream.
865     ///
866     /// Caught panic (if any) will be the last element of the resulting stream.
867     ///
868     /// In general, panics within a stream can propagate all the way out to the
869     /// task level. This combinator makes it possible to halt unwinding within
870     /// the stream itself. It's most commonly used within task executors. This
871     /// method should not be used for error handling.
872     ///
873     /// Note that this method requires the `UnwindSafe` bound from the standard
874     /// library. This isn't always applied automatically, and the standard
875     /// library provides an `AssertUnwindSafe` wrapper type to apply it
876     /// after-the fact. To assist using this method, the `Stream` trait is also
877     /// implemented for `AssertUnwindSafe<S>` where `S` implements `Stream`.
878     ///
879     /// This method is only available when the `use_std` feature of this
880     /// library is activated, and it is activated by default.
881     ///
882     /// # Examples
883     ///
884     /// ```rust
885     /// use futures::prelude::*;
886     /// use futures::stream;
887     ///
888     /// let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]);
889     /// // panic on second element
890     /// let stream_panicking = stream.map(|o| o.unwrap());
891     /// let mut iter = stream_panicking.catch_unwind().wait();
892     ///
893     /// assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
894     /// assert!(iter.next().unwrap().is_err());
895     /// assert!(iter.next().is_none());
896     /// ```
897     #[cfg(feature = "use_std")]
catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe898     fn catch_unwind(self) -> CatchUnwind<Self>
899         where Self: Sized + std::panic::UnwindSafe
900     {
901         catch_unwind::new(self)
902     }
903 
904     /// An adaptor for creating a buffered list of pending futures.
905     ///
906     /// If this stream's item can be converted into a future, then this adaptor
907     /// will buffer up to at most `amt` futures and then return results in the
908     /// same order as the underlying stream. No more than `amt` futures will be
909     /// buffered at any point in time, and less than `amt` may also be buffered
910     /// depending on the state of each future.
911     ///
912     /// The returned stream will be a stream of each future's result, with
913     /// errors passed through whenever they occur.
914     ///
915     /// This method is only available when the `use_std` feature of this
916     /// library is activated, and it is activated by default.
917     #[cfg(feature = "use_std")]
buffered(self, amt: usize) -> Buffered<Self> where Self::Item: IntoFuture<Error = <Self as Stream>::Error>, Self: Sized918     fn buffered(self, amt: usize) -> Buffered<Self>
919         where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
920               Self: Sized
921     {
922         buffered::new(self, amt)
923     }
924 
925     /// An adaptor for creating a buffered list of pending futures (unordered).
926     ///
927     /// If this stream's item can be converted into a future, then this adaptor
928     /// will buffer up to `amt` futures and then return results in the order
929     /// in which they complete. No more than `amt` futures will be buffered at
930     /// any point in time, and less than `amt` may also be buffered depending on
931     /// the state of each future.
932     ///
933     /// The returned stream will be a stream of each future's result, with
934     /// errors passed through whenever they occur.
935     ///
936     /// This method is only available when the `use_std` feature of this
937     /// library is activated, and it is activated by default.
938     #[cfg(feature = "use_std")]
buffer_unordered(self, amt: usize) -> BufferUnordered<Self> where Self::Item: IntoFuture<Error = <Self as Stream>::Error>, Self: Sized939     fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self>
940         where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
941               Self: Sized
942     {
943         buffer_unordered::new(self, amt)
944     }
945 
946     /// An adapter for merging the output of two streams.
947     ///
948     /// The merged stream produces items from one or both of the underlying
949     /// streams as they become available. Errors, however, are not merged: you
950     /// get at most one error at a time.
951     #[deprecated(note = "functionality provided by `select` now")]
952     #[allow(deprecated)]
merge<S>(self, other: S) -> Merge<Self, S> where S: Stream<Error = Self::Error>, Self: Sized,953     fn merge<S>(self, other: S) -> Merge<Self, S>
954         where S: Stream<Error = Self::Error>,
955               Self: Sized,
956     {
957         merge::new(self, other)
958     }
959 
960     /// An adapter for zipping two streams together.
961     ///
962     /// The zipped stream waits for both streams to produce an item, and then
963     /// returns that pair. If an error happens, then that error will be returned
964     /// immediately. If either stream ends then the zipped stream will also end.
zip<S>(self, other: S) -> Zip<Self, S> where S: Stream<Error = Self::Error>, Self: Sized,965     fn zip<S>(self, other: S) -> Zip<Self, S>
966         where S: Stream<Error = Self::Error>,
967               Self: Sized,
968     {
969         zip::new(self, other)
970     }
971 
972     /// Adapter for chaining two stream.
973     ///
974     /// The resulting stream emits elements from the first stream, and when
975     /// first stream reaches the end, emits the elements from the second stream.
976     ///
977     /// ```rust
978     /// use futures::prelude::*;
979     /// use futures::stream;
980     ///
981     /// let stream1 = stream::iter_result(vec![Ok(10), Err(false)]);
982     /// let stream2 = stream::iter_result(vec![Err(true), Ok(20)]);
983     /// let mut chain = stream1.chain(stream2).wait();
984     ///
985     /// assert_eq!(Some(Ok(10)), chain.next());
986     /// assert_eq!(Some(Err(false)), chain.next());
987     /// assert_eq!(Some(Err(true)), chain.next());
988     /// assert_eq!(Some(Ok(20)), chain.next());
989     /// assert_eq!(None, chain.next());
990     /// ```
chain<S>(self, other: S) -> Chain<Self, S> where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized991     fn chain<S>(self, other: S) -> Chain<Self, S>
992         where S: Stream<Item = Self::Item, Error = Self::Error>,
993               Self: Sized
994     {
995         chain::new(self, other)
996     }
997 
998     /// Creates a new stream which exposes a `peek` method.
999     ///
1000     /// Calling `peek` returns a reference to the next item in the stream.
peekable(self) -> Peekable<Self> where Self: Sized1001     fn peekable(self) -> Peekable<Self>
1002         where Self: Sized
1003     {
1004         peek::new(self)
1005     }
1006 
1007     /// An adaptor for chunking up items of the stream inside a vector.
1008     ///
1009     /// This combinator will attempt to pull items from this stream and buffer
1010     /// them into a local vector. At most `capacity` items will get buffered
1011     /// before they're yielded from the returned stream.
1012     ///
1013     /// Note that the vectors returned from this iterator may not always have
1014     /// `capacity` elements. If the underlying stream ended and only a partial
1015     /// vector was created, it'll be returned. Additionally if an error happens
1016     /// from the underlying stream then the currently buffered items will be
1017     /// yielded.
1018     ///
1019     /// Errors are passed through the stream unbuffered.
1020     ///
1021     /// This method is only available when the `use_std` feature of this
1022     /// library is activated, and it is activated by default.
1023     ///
1024     /// # Panics
1025     ///
1026     /// This method will panic of `capacity` is zero.
1027     #[cfg(feature = "use_std")]
chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized1028     fn chunks(self, capacity: usize) -> Chunks<Self>
1029         where Self: Sized
1030     {
1031         chunks::new(self, capacity)
1032     }
1033 
1034     /// Creates a stream that selects the next element from either this stream
1035     /// or the provided one, whichever is ready first.
1036     ///
1037     /// This combinator will attempt to pull items from both streams. Each
1038     /// stream will be polled in a round-robin fashion, and whenever a stream is
1039     /// ready to yield an item that item is yielded.
1040     ///
1041     /// The `select` function is similar to `merge` except that it requires both
1042     /// streams to have the same item and error types.
1043     ///
1044     /// Error are passed through from either stream.
select<S>(self, other: S) -> Select<Self, S> where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized,1045     fn select<S>(self, other: S) -> Select<Self, S>
1046         where S: Stream<Item = Self::Item, Error = Self::Error>,
1047               Self: Sized,
1048     {
1049         select::new(self, other)
1050     }
1051 
1052     /// A future that completes after the given stream has been fully processed
1053     /// into the sink, including flushing.
1054     ///
1055     /// This future will drive the stream to keep producing items until it is
1056     /// exhausted, sending each item to the sink. It will complete once both the
1057     /// stream is exhausted, and the sink has fully processed received item,
1058     /// flushed successfully, and closed successfully.
1059     ///
1060     /// Doing `stream.forward(sink)` is roughly equivalent to
1061     /// `sink.send_all(stream)`. The returned future will exhaust all items from
1062     /// `self`, sending them all to `sink`. Furthermore the `sink` will be
1063     /// closed and flushed.
1064     ///
1065     /// On completion, the pair `(stream, sink)` is returned.
forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<SinkItem = Self::Item>, Self::Error: From<S::SinkError>, Self: Sized1066     fn forward<S>(self, sink: S) -> Forward<Self, S>
1067         where S: Sink<SinkItem = Self::Item>,
1068               Self::Error: From<S::SinkError>,
1069               Self: Sized
1070     {
1071         forward::new(self, sink)
1072     }
1073 
1074     /// Splits this `Stream + Sink` object into separate `Stream` and `Sink`
1075     /// objects.
1076     ///
1077     /// This can be useful when you want to split ownership between tasks, or
1078     /// allow direct interaction between the two objects (e.g. via
1079     /// `Sink::send_all`).
1080     ///
1081     /// This method is only available when the `use_std` feature of this
1082     /// library is activated, and it is activated by default.
1083     #[cfg(feature = "use_std")]
split(self) -> (SplitSink<Self>, SplitStream<Self>) where Self: super::sink::Sink + Sized1084     fn split(self) -> (SplitSink<Self>, SplitStream<Self>)
1085         where Self: super::sink::Sink + Sized
1086     {
1087         split::split(self)
1088     }
1089 
1090     /// Do something with each item of this stream, afterwards passing it on.
1091     ///
1092     /// This is similar to the `Iterator::inspect` method in the standard
1093     /// library where it allows easily inspecting each value as it passes
1094     /// through the stream, for example to debug what's going on.
inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1095     fn inspect<F>(self, f: F) -> Inspect<Self, F>
1096         where F: FnMut(&Self::Item),
1097               Self: Sized,
1098     {
1099         inspect::new(self, f)
1100     }
1101 
1102     /// Do something with the error of this stream, afterwards passing it on.
1103     ///
1104     /// This is similar to the `Stream::inspect` method where it allows
1105     /// easily inspecting the error as it passes through the stream, for
1106     /// example to debug what's going on.
inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnMut(&Self::Error), Self: Sized,1107     fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
1108         where F: FnMut(&Self::Error),
1109               Self: Sized,
1110     {
1111         inspect_err::new(self, f)
1112     }
1113 }
1114 
1115 impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
1116     type Item = S::Item;
1117     type Error = S::Error;
1118 
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>1119     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
1120         (**self).poll()
1121     }
1122 }
1123 
1124 /// Converts a list of futures into a `Stream` of results from the futures.
1125 ///
1126 /// This function will take an list of futures (e.g. a vector, an iterator,
1127 /// etc), and return a stream. The stream will yield items as they become
1128 /// available on the futures internally, in the order that they become
1129 /// available. This function is similar to `buffer_unordered` in that it may
1130 /// return items in a different order than in the list specified.
1131 ///
1132 /// Note that the returned set can also be used to dynamically push more
1133 /// futures into the set as they become available.
1134 #[cfg(feature = "use_std")]
futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future> where I: IntoIterator, I::Item: IntoFuture1135 pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
1136     where I: IntoIterator,
1137         I::Item: IntoFuture
1138 {
1139     let mut set = FuturesUnordered::new();
1140 
1141     for future in futures {
1142         set.push(future.into_future());
1143     }
1144 
1145     return set
1146 }
1147