1 //! Asynchronous streams
2 //!
3 //! This module contains the `Stream` trait and a number of adaptors for this
4 //! trait. This trait is very similar to the `Iterator` trait in the standard
5 //! library except that it expresses the concept of blocking as well. A stream
6 //! here is a sequential sequence of values which may take some amount of time
7 //! in between to produce.
8 //!
9 //! A stream may request that it is blocked between values while the next value
10 //! is calculated, and provides a way to get notified once the next value is
11 //! ready as well.
12 //!
13 //! You can find more information/tutorials about streams [online at
14 //! https://tokio.rs][online]
15 //!
16 //! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
17
18 use {IntoFuture, Poll};
19
20 mod iter;
21 #[allow(deprecated)]
22 pub use self::iter::{iter, Iter};
23 #[cfg(feature = "with-deprecated")]
24 #[allow(deprecated)]
25 pub use self::Iter as IterStream;
26 mod iter_ok;
27 pub use self::iter_ok::{iter_ok, IterOk};
28 mod iter_result;
29 pub use self::iter_result::{iter_result, IterResult};
30
31 mod repeat;
32 pub use self::repeat::{repeat, Repeat};
33
34 mod and_then;
35 mod chain;
36 mod concat;
37 mod empty;
38 mod filter;
39 mod filter_map;
40 mod flatten;
41 mod fold;
42 mod for_each;
43 mod from_err;
44 mod fuse;
45 mod future;
46 mod inspect;
47 mod inspect_err;
48 mod map;
49 mod map_err;
50 mod merge;
51 mod once;
52 mod or_else;
53 mod peek;
54 mod poll_fn;
55 mod select;
56 mod skip;
57 mod skip_while;
58 mod take;
59 mod take_while;
60 mod then;
61 mod unfold;
62 mod zip;
63 mod forward;
64 pub use self::and_then::AndThen;
65 pub use self::chain::Chain;
66 #[allow(deprecated)]
67 pub use self::concat::Concat;
68 pub use self::concat::Concat2;
69 pub use self::empty::{Empty, empty};
70 pub use self::filter::Filter;
71 pub use self::filter_map::FilterMap;
72 pub use self::flatten::Flatten;
73 pub use self::fold::Fold;
74 pub use self::for_each::ForEach;
75 pub use self::from_err::FromErr;
76 pub use self::fuse::Fuse;
77 pub use self::future::StreamFuture;
78 pub use self::inspect::Inspect;
79 pub use self::inspect_err::InspectErr;
80 pub use self::map::Map;
81 pub use self::map_err::MapErr;
82 #[allow(deprecated)]
83 pub use self::merge::{Merge, MergedItem};
84 pub use self::once::{Once, once};
85 pub use self::or_else::OrElse;
86 pub use self::peek::Peekable;
87 pub use self::poll_fn::{poll_fn, PollFn};
88 pub use self::select::Select;
89 pub use self::skip::Skip;
90 pub use self::skip_while::SkipWhile;
91 pub use self::take::Take;
92 pub use self::take_while::TakeWhile;
93 pub use self::then::Then;
94 pub use self::unfold::{Unfold, unfold};
95 pub use self::zip::Zip;
96 pub use self::forward::Forward;
97 use sink::{Sink};
98
99 if_std! {
100 use std;
101
102 mod buffered;
103 mod buffer_unordered;
104 mod catch_unwind;
105 mod chunks;
106 mod collect;
107 mod wait;
108 mod channel;
109 mod split;
110 pub mod futures_unordered;
111 mod futures_ordered;
112 pub use self::buffered::Buffered;
113 pub use self::buffer_unordered::BufferUnordered;
114 pub use self::catch_unwind::CatchUnwind;
115 pub use self::chunks::Chunks;
116 pub use self::collect::Collect;
117 pub use self::wait::Wait;
118 pub use self::split::{SplitStream, SplitSink, ReuniteError};
119 pub use self::futures_unordered::FuturesUnordered;
120 pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
121
122 #[doc(hidden)]
123 #[cfg(feature = "with-deprecated")]
124 #[allow(deprecated)]
125 pub use self::channel::{channel, Sender, Receiver, FutureSender, SendError};
126
127 /// A type alias for `Box<Stream + Send>`
128 #[doc(hidden)]
129 #[deprecated(note = "removed without replacement, recommended to use a \
130 local extension trait or function if needed, more \
131 details in https://github.com/rust-lang-nursery/futures-rs/issues/228")]
132 pub type BoxStream<T, E> = ::std::boxed::Box<Stream<Item = T, Error = E> + Send>;
133
134 impl<S: ?Sized + Stream> Stream for ::std::boxed::Box<S> {
135 type Item = S::Item;
136 type Error = S::Error;
137
138 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
139 (**self).poll()
140 }
141 }
142 }
143
144 /// A stream of values, not all of which may have been produced yet.
145 ///
146 /// `Stream` is a trait to represent any source of sequential events or items
147 /// which acts like an iterator but long periods of time may pass between
148 /// items. Like `Future` the methods of `Stream` never block and it is thus
149 /// suitable for programming in an asynchronous fashion. This trait is very
150 /// similar to the `Iterator` trait in the standard library where `Some` is
151 /// used to signal elements of the stream and `None` is used to indicate that
152 /// the stream is finished.
153 ///
154 /// Like futures a stream has basic combinators to transform the stream, perform
155 /// more work on each item, etc.
156 ///
157 /// You can find more information/tutorials about streams [online at
158 /// https://tokio.rs][online]
159 ///
160 /// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
161 ///
162 /// # Streams as Futures
163 ///
164 /// Any instance of `Stream` can also be viewed as a `Future` where the resolved
165 /// value is the next item in the stream along with the rest of the stream. The
166 /// `into_future` adaptor can be used here to convert any stream into a future
167 /// for use with other future methods like `join` and `select`.
168 ///
169 /// # Errors
170 ///
171 /// Streams, like futures, can also model errors in their computation. All
172 /// streams have an associated `Error` type like with futures. Currently as of
173 /// the 0.1 release of this library an error on a stream **does not terminate
174 /// the stream**. That is, after one error is received, another error may be
175 /// received from the same stream (it's valid to keep polling).
176 ///
177 /// This property of streams, however, is [being considered] for change in 0.2
178 /// where an error on a stream is similar to `None`, it terminates the stream
179 /// entirely. If one of these use cases suits you perfectly and not the other,
180 /// please feel welcome to comment on [the issue][being considered]!
181 ///
182 /// [being considered]: https://github.com/rust-lang-nursery/futures-rs/issues/206
183 #[must_use = "streams do nothing unless polled"]
184 pub trait Stream {
185 /// The type of item this stream will yield on success.
186 type Item;
187
188 /// The type of error this stream may generate.
189 type Error;
190
191 /// Attempt to pull out the next value of this stream, returning `None` if
192 /// the stream is finished.
193 ///
194 /// This method, like `Future::poll`, is the sole method of pulling out a
195 /// value from a stream. This method must also be run within the context of
196 /// a task typically and implementors of this trait must ensure that
197 /// implementations of this method do not block, as it may cause consumers
198 /// to behave badly.
199 ///
200 /// # Return value
201 ///
202 /// If `NotReady` is returned then this stream's next value is not ready
203 /// yet and implementations will ensure that the current task will be
204 /// notified when the next value may be ready. If `Some` is returned then
205 /// the returned value represents the next value on the stream. `Err`
206 /// indicates an error happened, while `Ok` indicates whether there was a
207 /// new item on the stream or whether the stream has terminated.
208 ///
209 /// # Panics
210 ///
211 /// Once a stream is finished, that is `Ready(None)` has been returned,
212 /// further calls to `poll` may result in a panic or other "bad behavior".
213 /// If this is difficult to guard against then the `fuse` adapter can be
214 /// used to ensure that `poll` always has well-defined semantics.
215 // TODO: more here
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>216 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
217
218 // TODO: should there also be a method like `poll` but doesn't return an
219 // item? basically just says "please make more progress internally"
220 // seems crucial for buffering to actually make any sense.
221
222 /// Creates an iterator which blocks the current thread until each item of
223 /// this stream is resolved.
224 ///
225 /// This method will consume ownership of this stream, returning an
226 /// implementation of a standard iterator. This iterator will *block the
227 /// current thread* on each call to `next` if the item in the stream isn't
228 /// ready yet.
229 ///
230 /// > **Note:** This method is not appropriate to call on event loops or
231 /// > similar I/O situations because it will prevent the event
232 /// > loop from making progress (this blocks the thread). This
233 /// > method should only be called when it's guaranteed that the
234 /// > blocking work associated with this stream will be completed
235 /// > by another thread.
236 ///
237 /// This method is only available when the `use_std` feature of this
238 /// library is activated, and it is activated by default.
239 ///
240 /// # Panics
241 ///
242 /// The returned iterator does not attempt to catch panics. If the `poll`
243 /// function panics, panics will be propagated to the caller of `next`.
244 #[cfg(feature = "use_std")]
wait(self) -> Wait<Self> where Self: Sized245 fn wait(self) -> Wait<Self>
246 where Self: Sized
247 {
248 wait::new(self)
249 }
250
251 /// Convenience function for turning this stream into a trait object.
252 ///
253 /// This simply avoids the need to write `Box::new` and can often help with
254 /// type inference as well by always returning a trait object. Note that
255 /// this method requires the `Send` bound and returns a `BoxStream`, which
256 /// also encodes this. If you'd like to create a `Box<Stream>` without the
257 /// `Send` bound, then the `Box::new` function can be used instead.
258 ///
259 /// This method is only available when the `use_std` feature of this
260 /// library is activated, and it is activated by default.
261 ///
262 /// # Examples
263 ///
264 /// ```
265 /// use futures::stream::*;
266 /// use futures::sync::mpsc;
267 ///
268 /// let (_tx, rx) = mpsc::channel(1);
269 /// let a: BoxStream<i32, ()> = rx.boxed();
270 /// ```
271 #[cfg(feature = "use_std")]
272 #[doc(hidden)]
273 #[deprecated(note = "removed without replacement, recommended to use a \
274 local extension trait or function if needed, more \
275 details in https://github.com/rust-lang-nursery/futures-rs/issues/228")]
276 #[allow(deprecated)]
boxed(self) -> BoxStream<Self::Item, Self::Error> where Self: Sized + Send + 'static,277 fn boxed(self) -> BoxStream<Self::Item, Self::Error>
278 where Self: Sized + Send + 'static,
279 {
280 ::std::boxed::Box::new(self)
281 }
282
283 /// Converts this stream into a `Future`.
284 ///
285 /// A stream can be viewed as a future which will resolve to a pair containing
286 /// the next element of the stream plus the remaining stream. If the stream
287 /// terminates, then the next element is `None` and the remaining stream is
288 /// still passed back, to allow reclamation of its resources.
289 ///
290 /// The returned future can be used to compose streams and futures together by
291 /// placing everything into the "world of futures".
into_future(self) -> StreamFuture<Self> where Self: Sized292 fn into_future(self) -> StreamFuture<Self>
293 where Self: Sized
294 {
295 future::new(self)
296 }
297
298 /// Converts a stream of type `T` to a stream of type `U`.
299 ///
300 /// The provided closure is executed over all elements of this stream as
301 /// they are made available, and the callback will be executed inline with
302 /// calls to `poll`.
303 ///
304 /// Note that this function consumes the receiving stream and returns a
305 /// wrapped version of it, similar to the existing `map` methods in the
306 /// standard library.
307 ///
308 /// # Examples
309 ///
310 /// ```
311 /// use futures::prelude::*;
312 /// use futures::sync::mpsc;
313 ///
314 /// let (_tx, rx) = mpsc::channel::<i32>(1);
315 /// let rx = rx.map(|x| x + 3);
316 /// ```
map<U, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> U, Self: Sized317 fn map<U, F>(self, f: F) -> Map<Self, F>
318 where F: FnMut(Self::Item) -> U,
319 Self: Sized
320 {
321 map::new(self, f)
322 }
323
324 /// Converts a stream of error type `T` to a stream of error type `U`.
325 ///
326 /// The provided closure is executed over all errors of this stream as
327 /// they are made available, and the callback will be executed inline with
328 /// calls to `poll`.
329 ///
330 /// Note that this function consumes the receiving stream and returns a
331 /// wrapped version of it, similar to the existing `map_err` methods in the
332 /// standard library.
333 ///
334 /// # Examples
335 ///
336 /// ```
337 /// use futures::prelude::*;
338 /// use futures::sync::mpsc;
339 ///
340 /// let (_tx, rx) = mpsc::channel::<i32>(1);
341 /// let rx = rx.map_err(|()| 3);
342 /// ```
map_err<U, F>(self, f: F) -> MapErr<Self, F> where F: FnMut(Self::Error) -> U, Self: Sized343 fn map_err<U, F>(self, f: F) -> MapErr<Self, F>
344 where F: FnMut(Self::Error) -> U,
345 Self: Sized
346 {
347 map_err::new(self, f)
348 }
349
350 /// Filters the values produced by this stream according to the provided
351 /// predicate.
352 ///
353 /// As values of this stream are made available, the provided predicate will
354 /// be run against them. If the predicate returns `true` then the stream
355 /// will yield the value, but if the predicate returns `false` then the
356 /// value will be discarded and the next value will be produced.
357 ///
358 /// All errors are passed through without filtering in this combinator.
359 ///
360 /// Note that this function consumes the receiving stream and returns a
361 /// wrapped version of it, similar to the existing `filter` methods in the
362 /// standard library.
363 ///
364 /// # Examples
365 ///
366 /// ```
367 /// use futures::prelude::*;
368 /// use futures::sync::mpsc;
369 ///
370 /// let (_tx, rx) = mpsc::channel::<i32>(1);
371 /// let evens = rx.filter(|x| x % 2 == 0);
372 /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized373 fn filter<F>(self, f: F) -> Filter<Self, F>
374 where F: FnMut(&Self::Item) -> bool,
375 Self: Sized
376 {
377 filter::new(self, f)
378 }
379
380 /// Filters the values produced by this stream while simultaneously mapping
381 /// them to a different type.
382 ///
383 /// As values of this stream are made available, the provided function will
384 /// be run on them. If the predicate returns `Some(e)` then the stream will
385 /// yield the value `e`, but if the predicate returns `None` then the next
386 /// value will be produced.
387 ///
388 /// All errors are passed through without filtering in this combinator.
389 ///
390 /// Note that this function consumes the receiving stream and returns a
391 /// wrapped version of it, similar to the existing `filter_map` methods in the
392 /// standard library.
393 ///
394 /// # Examples
395 ///
396 /// ```
397 /// use futures::prelude::*;
398 /// use futures::sync::mpsc;
399 ///
400 /// let (_tx, rx) = mpsc::channel::<i32>(1);
401 /// let evens_plus_one = rx.filter_map(|x| {
402 /// if x % 0 == 2 {
403 /// Some(x + 1)
404 /// } else {
405 /// None
406 /// }
407 /// });
408 /// ```
filter_map<F, B>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<B>, Self: Sized409 fn filter_map<F, B>(self, f: F) -> FilterMap<Self, F>
410 where F: FnMut(Self::Item) -> Option<B>,
411 Self: Sized
412 {
413 filter_map::new(self, f)
414 }
415
416 /// Chain on a computation for when a value is ready, passing the resulting
417 /// item to the provided closure `f`.
418 ///
419 /// This function can be used to ensure a computation runs regardless of
420 /// the next value on the stream. The closure provided will be yielded a
421 /// `Result` once a value is ready, and the returned future will then be run
422 /// to completion to produce the next value on this stream.
423 ///
424 /// The returned value of the closure must implement the `IntoFuture` trait
425 /// and can represent some more work to be done before the composed stream
426 /// is finished. Note that the `Result` type implements the `IntoFuture`
427 /// trait so it is possible to simply alter the `Result` yielded to the
428 /// closure and return it.
429 ///
430 /// Note that this function consumes the receiving stream and returns a
431 /// wrapped version of it.
432 ///
433 /// # Examples
434 ///
435 /// ```
436 /// use futures::prelude::*;
437 /// use futures::sync::mpsc;
438 ///
439 /// let (_tx, rx) = mpsc::channel::<i32>(1);
440 ///
441 /// let rx = rx.then(|result| {
442 /// match result {
443 /// Ok(e) => Ok(e + 3),
444 /// Err(()) => Err(4),
445 /// }
446 /// });
447 /// ```
then<F, U>(self, f: F) -> Then<Self, F, U> where F: FnMut(Result<Self::Item, Self::Error>) -> U, U: IntoFuture, Self: Sized448 fn then<F, U>(self, f: F) -> Then<Self, F, U>
449 where F: FnMut(Result<Self::Item, Self::Error>) -> U,
450 U: IntoFuture,
451 Self: Sized
452 {
453 then::new(self, f)
454 }
455
456 /// Chain on a computation for when a value is ready, passing the successful
457 /// results to the provided closure `f`.
458 ///
459 /// This function can be used to run a unit of work when the next successful
460 /// value on a stream is ready. The closure provided will be yielded a value
461 /// when ready, and the returned future will then be run to completion to
462 /// produce the next value on this stream.
463 ///
464 /// Any errors produced by this stream will not be passed to the closure,
465 /// and will be passed through.
466 ///
467 /// The returned value of the closure must implement the `IntoFuture` trait
468 /// and can represent some more work to be done before the composed stream
469 /// is finished. Note that the `Result` type implements the `IntoFuture`
470 /// trait so it is possible to simply alter the `Result` yielded to the
471 /// closure and return it.
472 ///
473 /// Note that this function consumes the receiving stream and returns a
474 /// wrapped version of it.
475 ///
476 /// To process the entire stream and return a single future representing
477 /// success or error, use `for_each` instead.
478 ///
479 /// # Examples
480 ///
481 /// ```
482 /// use futures::prelude::*;
483 /// use futures::sync::mpsc;
484 ///
485 /// let (_tx, rx) = mpsc::channel::<i32>(1);
486 ///
487 /// let rx = rx.and_then(|result| {
488 /// if result % 2 == 0 {
489 /// Ok(result)
490 /// } else {
491 /// Err(())
492 /// }
493 /// });
494 /// ```
and_then<F, U>(self, f: F) -> AndThen<Self, F, U> where F: FnMut(Self::Item) -> U, U: IntoFuture<Error = Self::Error>, Self: Sized495 fn and_then<F, U>(self, f: F) -> AndThen<Self, F, U>
496 where F: FnMut(Self::Item) -> U,
497 U: IntoFuture<Error = Self::Error>,
498 Self: Sized
499 {
500 and_then::new(self, f)
501 }
502
503 /// Chain on a computation for when an error happens, passing the
504 /// erroneous result to the provided closure `f`.
505 ///
506 /// This function can be used to run a unit of work and attempt to recover from
507 /// an error if one happens. The closure provided will be yielded an error
508 /// when one appears, and the returned future will then be run to completion
509 /// to produce the next value on this stream.
510 ///
511 /// Any successful values produced by this stream will not be passed to the
512 /// closure, and will be passed through.
513 ///
514 /// The returned value of the closure must implement the `IntoFuture` trait
515 /// and can represent some more work to be done before the composed stream
516 /// is finished. Note that the `Result` type implements the `IntoFuture`
517 /// trait so it is possible to simply alter the `Result` yielded to the
518 /// closure and return it.
519 ///
520 /// Note that this function consumes the receiving stream and returns a
521 /// wrapped version of it.
or_else<F, U>(self, f: F) -> OrElse<Self, F, U> where F: FnMut(Self::Error) -> U, U: IntoFuture<Item = Self::Item>, Self: Sized522 fn or_else<F, U>(self, f: F) -> OrElse<Self, F, U>
523 where F: FnMut(Self::Error) -> U,
524 U: IntoFuture<Item = Self::Item>,
525 Self: Sized
526 {
527 or_else::new(self, f)
528 }
529
530 /// Collect all of the values of this stream into a vector, returning a
531 /// future representing the result of that computation.
532 ///
533 /// This combinator will collect all successful results of this stream and
534 /// collect them into a `Vec<Self::Item>`. If an error happens then all
535 /// collected elements will be dropped and the error will be returned.
536 ///
537 /// The returned future will be resolved whenever an error happens or when
538 /// the stream returns `Ok(None)`.
539 ///
540 /// This method is only available when the `use_std` feature of this
541 /// library is activated, and it is activated by default.
542 ///
543 /// # Examples
544 ///
545 /// ```
546 /// use std::thread;
547 ///
548 /// use futures::prelude::*;
549 /// use futures::sync::mpsc;
550 ///
551 /// let (mut tx, rx) = mpsc::channel(1);
552 ///
553 /// thread::spawn(|| {
554 /// for i in (0..5).rev() {
555 /// tx = tx.send(i + 1).wait().unwrap();
556 /// }
557 /// });
558 ///
559 /// let mut result = rx.collect();
560 /// assert_eq!(result.wait(), Ok(vec![5, 4, 3, 2, 1]));
561 /// ```
562 #[cfg(feature = "use_std")]
collect(self) -> Collect<Self> where Self: Sized563 fn collect(self) -> Collect<Self>
564 where Self: Sized
565 {
566 collect::new(self)
567 }
568
569 /// Concatenate all results of a stream into a single extendable
570 /// destination, returning a future representing the end result.
571 ///
572 /// This combinator will extend the first item with the contents
573 /// of all the successful results of the stream. If the stream is
574 /// empty, the default value will be returned. If an error occurs,
575 /// all the results will be dropped and the error will be returned.
576 ///
577 /// The name `concat2` is an intermediate measure until the release of
578 /// futures 0.2, at which point it will be renamed back to `concat`.
579 ///
580 /// # Examples
581 ///
582 /// ```
583 /// use std::thread;
584 ///
585 /// use futures::prelude::*;
586 /// use futures::sync::mpsc;
587 ///
588 /// let (mut tx, rx) = mpsc::channel(1);
589 ///
590 /// thread::spawn(move || {
591 /// for i in (0..3).rev() {
592 /// let n = i * 3;
593 /// tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap();
594 /// }
595 /// });
596 /// let result = rx.concat2();
597 /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
598 /// ```
concat2(self) -> Concat2<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,599 fn concat2(self) -> Concat2<Self>
600 where Self: Sized,
601 Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
602 {
603 concat::new2(self)
604 }
605
606 /// Concatenate all results of a stream into a single extendable
607 /// destination, returning a future representing the end result.
608 ///
609 /// This combinator will extend the first item with the contents
610 /// of all the successful results of the stream. If an error occurs,
611 /// all the results will be dropped and the error will be returned.
612 ///
613 /// # Examples
614 ///
615 /// ```
616 /// use std::thread;
617 ///
618 /// use futures::prelude::*;
619 /// use futures::sync::mpsc;
620 ///
621 /// let (mut tx, rx) = mpsc::channel(1);
622 ///
623 /// thread::spawn(move || {
624 /// for i in (0..3).rev() {
625 /// let n = i * 3;
626 /// tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap();
627 /// }
628 /// });
629 /// let result = rx.concat();
630 /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
631 /// ```
632 ///
633 /// # Panics
634 ///
635 /// It's important to note that this function will panic if the stream
636 /// is empty, which is the reason for its deprecation.
637 #[deprecated(since="0.1.14", note="please use `Stream::concat2` instead")]
638 #[allow(deprecated)]
concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator,639 fn concat(self) -> Concat<Self>
640 where Self: Sized,
641 Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator,
642 {
643 concat::new(self)
644 }
645
646 /// Execute an accumulating computation over a stream, collecting all the
647 /// values into one final result.
648 ///
649 /// This combinator will collect all successful results of this stream
650 /// according to the closure provided. The initial state is also provided to
651 /// this method and then is returned again by each execution of the closure.
652 /// Once the entire stream has been exhausted the returned future will
653 /// resolve to this value.
654 ///
655 /// If an error happens then collected state will be dropped and the error
656 /// will be returned.
657 ///
658 /// # Examples
659 ///
660 /// ```
661 /// use futures::prelude::*;
662 /// use futures::stream;
663 /// use futures::future;
664 ///
665 /// let number_stream = stream::iter_ok::<_, ()>(0..6);
666 /// let sum = number_stream.fold(0, |acc, x| future::ok(acc + x));
667 /// assert_eq!(sum.wait(), Ok(15));
668 /// ```
fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T> where F: FnMut(T, Self::Item) -> Fut, Fut: IntoFuture<Item = T>, Self::Error: From<Fut::Error>, Self: Sized669 fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
670 where F: FnMut(T, Self::Item) -> Fut,
671 Fut: IntoFuture<Item = T>,
672 Self::Error: From<Fut::Error>,
673 Self: Sized
674 {
675 fold::new(self, f, init)
676 }
677
678 /// Flattens a stream of streams into just one continuous stream.
679 ///
680 /// If this stream's elements are themselves streams then this combinator
681 /// will flatten out the entire stream to one long chain of elements. Any
682 /// errors are passed through without looking at them, but otherwise each
683 /// individual stream will get exhausted before moving on to the next.
684 ///
685 /// ```
686 /// use std::thread;
687 ///
688 /// use futures::prelude::*;
689 /// use futures::sync::mpsc;
690 ///
691 /// let (tx1, rx1) = mpsc::channel::<i32>(1);
692 /// let (tx2, rx2) = mpsc::channel::<i32>(1);
693 /// let (tx3, rx3) = mpsc::channel(1);
694 ///
695 /// thread::spawn(|| {
696 /// tx1.send(1).wait().unwrap()
697 /// .send(2).wait().unwrap();
698 /// });
699 /// thread::spawn(|| {
700 /// tx2.send(3).wait().unwrap()
701 /// .send(4).wait().unwrap();
702 /// });
703 /// thread::spawn(|| {
704 /// tx3.send(rx1).wait().unwrap()
705 /// .send(rx2).wait().unwrap();
706 /// });
707 ///
708 /// let mut result = rx3.flatten().collect();
709 /// assert_eq!(result.wait(), Ok(vec![1, 2, 3, 4]));
710 /// ```
flatten(self) -> Flatten<Self> where Self::Item: Stream, <Self::Item as Stream>::Error: From<Self::Error>, Self: Sized711 fn flatten(self) -> Flatten<Self>
712 where Self::Item: Stream,
713 <Self::Item as Stream>::Error: From<Self::Error>,
714 Self: Sized
715 {
716 flatten::new(self)
717 }
718
719 /// Skip elements on this stream while the predicate provided resolves to
720 /// `true`.
721 ///
722 /// This function, like `Iterator::skip_while`, will skip elements on the
723 /// stream until the `predicate` resolves to `false`. Once one element
724 /// returns false all future elements will be returned from the underlying
725 /// stream.
skip_while<P, R>(self, pred: P) -> SkipWhile<Self, P, R> where P: FnMut(&Self::Item) -> R, R: IntoFuture<Item=bool, Error=Self::Error>, Self: Sized726 fn skip_while<P, R>(self, pred: P) -> SkipWhile<Self, P, R>
727 where P: FnMut(&Self::Item) -> R,
728 R: IntoFuture<Item=bool, Error=Self::Error>,
729 Self: Sized
730 {
731 skip_while::new(self, pred)
732 }
733
734 /// Take elements from this stream while the predicate provided resolves to
735 /// `true`.
736 ///
737 /// This function, like `Iterator::take_while`, will take elements from the
738 /// stream until the `predicate` resolves to `false`. Once one element
739 /// returns false it will always return that the stream is done.
take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R> where P: FnMut(&Self::Item) -> R, R: IntoFuture<Item=bool, Error=Self::Error>, Self: Sized740 fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R>
741 where P: FnMut(&Self::Item) -> R,
742 R: IntoFuture<Item=bool, Error=Self::Error>,
743 Self: Sized
744 {
745 take_while::new(self, pred)
746 }
747
748 /// Runs this stream to completion, executing the provided closure for each
749 /// element on the stream.
750 ///
751 /// The closure provided will be called for each item this stream resolves
752 /// to successfully, producing a future. That future will then be executed
753 /// to completion before moving on to the next item.
754 ///
755 /// The returned value is a `Future` where the `Item` type is `()` and
756 /// errors are otherwise threaded through. Any error on the stream or in the
757 /// closure will cause iteration to be halted immediately and the future
758 /// will resolve to that error.
759 ///
760 /// To process each item in the stream and produce another stream instead
761 /// of a single future, use `and_then` instead.
for_each<F, U>(self, f: F) -> ForEach<Self, F, U> where F: FnMut(Self::Item) -> U, U: IntoFuture<Item=(), Error = Self::Error>, Self: Sized762 fn for_each<F, U>(self, f: F) -> ForEach<Self, F, U>
763 where F: FnMut(Self::Item) -> U,
764 U: IntoFuture<Item=(), Error = Self::Error>,
765 Self: Sized
766 {
767 for_each::new(self, f)
768 }
769
770 /// Map this stream's error to any error implementing `From` for
771 /// this stream's `Error`, returning a new stream.
772 ///
773 /// This function does for streams what `try!` does for `Result`,
774 /// by letting the compiler infer the type of the resulting error.
775 /// Just as `map_err` above, this is useful for example to ensure
776 /// that streams have the same error type when used with
777 /// combinators.
778 ///
779 /// Note that this function consumes the receiving stream and returns a
780 /// wrapped version of it.
from_err<E: From<Self::Error>>(self) -> FromErr<Self, E> where Self: Sized,781 fn from_err<E: From<Self::Error>>(self) -> FromErr<Self, E>
782 where Self: Sized,
783 {
784 from_err::new(self)
785 }
786
787 /// Creates a new stream of at most `amt` items of the underlying stream.
788 ///
789 /// Once `amt` items have been yielded from this stream then it will always
790 /// return that the stream is done.
791 ///
792 /// # Errors
793 ///
794 /// Any errors yielded from underlying stream, before the desired amount of
795 /// items is reached, are passed through and do not affect the total number
796 /// of items taken.
take(self, amt: u64) -> Take<Self> where Self: Sized797 fn take(self, amt: u64) -> Take<Self>
798 where Self: Sized
799 {
800 take::new(self, amt)
801 }
802
803 /// Creates a new stream which skips `amt` items of the underlying stream.
804 ///
805 /// Once `amt` items have been skipped from this stream then it will always
806 /// return the remaining items on this stream.
807 ///
808 /// # Errors
809 ///
810 /// All errors yielded from underlying stream are passed through and do not
811 /// affect the total number of items skipped.
skip(self, amt: u64) -> Skip<Self> where Self: Sized812 fn skip(self, amt: u64) -> Skip<Self>
813 where Self: Sized
814 {
815 skip::new(self, amt)
816 }
817
818 /// Fuse a stream such that `poll` will never again be called once it has
819 /// finished.
820 ///
821 /// Currently once a stream has returned `None` from `poll` any further
822 /// calls could exhibit bad behavior such as block forever, panic, never
823 /// return, etc. If it is known that `poll` may be called after stream has
824 /// already finished, then this method can be used to ensure that it has
825 /// defined semantics.
826 ///
/// Once a stream has been `fuse`d and it finishes, then it will forever
/// return `None` from `poll`. This, unlike for the trait's `poll` method,
/// is guaranteed.
830 ///
831 /// Also note that as soon as this stream returns `None` it will be dropped
832 /// to reclaim resources associated with it.
fuse(self) -> Fuse<Self> where Self: Sized833 fn fuse(self) -> Fuse<Self>
834 where Self: Sized
835 {
836 fuse::new(self)
837 }
838
839 /// Borrows a stream, rather than consuming it.
840 ///
841 /// This is useful to allow applying stream adaptors while still retaining
842 /// ownership of the original stream.
843 ///
844 /// ```
845 /// use futures::prelude::*;
846 /// use futures::stream;
847 /// use futures::future;
848 ///
849 /// let mut stream = stream::iter_ok::<_, ()>(1..5);
850 ///
851 /// let sum = stream.by_ref().take(2).fold(0, |a, b| future::ok(a + b)).wait();
852 /// assert_eq!(sum, Ok(3));
853 ///
854 /// // You can use the stream again
855 /// let sum = stream.take(2).fold(0, |a, b| future::ok(a + b)).wait();
856 /// assert_eq!(sum, Ok(7));
857 /// ```
by_ref(&mut self) -> &mut Self where Self: Sized858 fn by_ref(&mut self) -> &mut Self
859 where Self: Sized
860 {
861 self
862 }
863
864 /// Catches unwinding panics while polling the stream.
865 ///
/// A caught panic (if any) will be the last element of the resulting stream.
867 ///
868 /// In general, panics within a stream can propagate all the way out to the
869 /// task level. This combinator makes it possible to halt unwinding within
870 /// the stream itself. It's most commonly used within task executors. This
871 /// method should not be used for error handling.
872 ///
/// Note that this method requires the `UnwindSafe` bound from the standard
/// library. This isn't always applied automatically, and the standard
/// library provides an `AssertUnwindSafe` wrapper type to apply it
/// after the fact. To assist using this method, the `Stream` trait is also
/// implemented for `AssertUnwindSafe<S>` where `S` implements `Stream`.
878 ///
879 /// This method is only available when the `use_std` feature of this
880 /// library is activated, and it is activated by default.
881 ///
882 /// # Examples
883 ///
884 /// ```rust
885 /// use futures::prelude::*;
886 /// use futures::stream;
887 ///
888 /// let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]);
889 /// // panic on second element
890 /// let stream_panicking = stream.map(|o| o.unwrap());
891 /// let mut iter = stream_panicking.catch_unwind().wait();
892 ///
893 /// assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
894 /// assert!(iter.next().unwrap().is_err());
895 /// assert!(iter.next().is_none());
896 /// ```
897 #[cfg(feature = "use_std")]
catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe898 fn catch_unwind(self) -> CatchUnwind<Self>
899 where Self: Sized + std::panic::UnwindSafe
900 {
901 catch_unwind::new(self)
902 }
903
904 /// An adaptor for creating a buffered list of pending futures.
905 ///
906 /// If this stream's item can be converted into a future, then this adaptor
907 /// will buffer up to at most `amt` futures and then return results in the
908 /// same order as the underlying stream. No more than `amt` futures will be
909 /// buffered at any point in time, and less than `amt` may also be buffered
910 /// depending on the state of each future.
911 ///
912 /// The returned stream will be a stream of each future's result, with
913 /// errors passed through whenever they occur.
914 ///
915 /// This method is only available when the `use_std` feature of this
916 /// library is activated, and it is activated by default.
917 #[cfg(feature = "use_std")]
buffered(self, amt: usize) -> Buffered<Self> where Self::Item: IntoFuture<Error = <Self as Stream>::Error>, Self: Sized918 fn buffered(self, amt: usize) -> Buffered<Self>
919 where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
920 Self: Sized
921 {
922 buffered::new(self, amt)
923 }
924
925 /// An adaptor for creating a buffered list of pending futures (unordered).
926 ///
927 /// If this stream's item can be converted into a future, then this adaptor
928 /// will buffer up to `amt` futures and then return results in the order
929 /// in which they complete. No more than `amt` futures will be buffered at
930 /// any point in time, and less than `amt` may also be buffered depending on
931 /// the state of each future.
932 ///
933 /// The returned stream will be a stream of each future's result, with
934 /// errors passed through whenever they occur.
935 ///
936 /// This method is only available when the `use_std` feature of this
937 /// library is activated, and it is activated by default.
938 #[cfg(feature = "use_std")]
buffer_unordered(self, amt: usize) -> BufferUnordered<Self> where Self::Item: IntoFuture<Error = <Self as Stream>::Error>, Self: Sized939 fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self>
940 where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
941 Self: Sized
942 {
943 buffer_unordered::new(self, amt)
944 }
945
946 /// An adapter for merging the output of two streams.
947 ///
948 /// The merged stream produces items from one or both of the underlying
949 /// streams as they become available. Errors, however, are not merged: you
950 /// get at most one error at a time.
951 #[deprecated(note = "functionality provided by `select` now")]
952 #[allow(deprecated)]
merge<S>(self, other: S) -> Merge<Self, S> where S: Stream<Error = Self::Error>, Self: Sized,953 fn merge<S>(self, other: S) -> Merge<Self, S>
954 where S: Stream<Error = Self::Error>,
955 Self: Sized,
956 {
957 merge::new(self, other)
958 }
959
960 /// An adapter for zipping two streams together.
961 ///
962 /// The zipped stream waits for both streams to produce an item, and then
963 /// returns that pair. If an error happens, then that error will be returned
964 /// immediately. If either stream ends then the zipped stream will also end.
zip<S>(self, other: S) -> Zip<Self, S> where S: Stream<Error = Self::Error>, Self: Sized,965 fn zip<S>(self, other: S) -> Zip<Self, S>
966 where S: Stream<Error = Self::Error>,
967 Self: Sized,
968 {
969 zip::new(self, other)
970 }
971
/// Adapter for chaining two streams.
///
/// The resulting stream emits elements from the first stream, and when
/// the first stream reaches the end, emits the elements from the second
/// stream.
976 ///
977 /// ```rust
978 /// use futures::prelude::*;
979 /// use futures::stream;
980 ///
981 /// let stream1 = stream::iter_result(vec![Ok(10), Err(false)]);
982 /// let stream2 = stream::iter_result(vec![Err(true), Ok(20)]);
983 /// let mut chain = stream1.chain(stream2).wait();
984 ///
985 /// assert_eq!(Some(Ok(10)), chain.next());
986 /// assert_eq!(Some(Err(false)), chain.next());
987 /// assert_eq!(Some(Err(true)), chain.next());
988 /// assert_eq!(Some(Ok(20)), chain.next());
989 /// assert_eq!(None, chain.next());
990 /// ```
chain<S>(self, other: S) -> Chain<Self, S> where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized991 fn chain<S>(self, other: S) -> Chain<Self, S>
992 where S: Stream<Item = Self::Item, Error = Self::Error>,
993 Self: Sized
994 {
995 chain::new(self, other)
996 }
997
998 /// Creates a new stream which exposes a `peek` method.
999 ///
1000 /// Calling `peek` returns a reference to the next item in the stream.
peekable(self) -> Peekable<Self> where Self: Sized1001 fn peekable(self) -> Peekable<Self>
1002 where Self: Sized
1003 {
1004 peek::new(self)
1005 }
1006
1007 /// An adaptor for chunking up items of the stream inside a vector.
1008 ///
1009 /// This combinator will attempt to pull items from this stream and buffer
1010 /// them into a local vector. At most `capacity` items will get buffered
1011 /// before they're yielded from the returned stream.
1012 ///
/// Note that the vectors returned from this stream may not always have
1014 /// `capacity` elements. If the underlying stream ended and only a partial
1015 /// vector was created, it'll be returned. Additionally if an error happens
1016 /// from the underlying stream then the currently buffered items will be
1017 /// yielded.
1018 ///
1019 /// Errors are passed through the stream unbuffered.
1020 ///
1021 /// This method is only available when the `use_std` feature of this
1022 /// library is activated, and it is activated by default.
1023 ///
1024 /// # Panics
1025 ///
/// This method will panic if `capacity` is zero.
1027 #[cfg(feature = "use_std")]
chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized1028 fn chunks(self, capacity: usize) -> Chunks<Self>
1029 where Self: Sized
1030 {
1031 chunks::new(self, capacity)
1032 }
1033
1034 /// Creates a stream that selects the next element from either this stream
1035 /// or the provided one, whichever is ready first.
1036 ///
1037 /// This combinator will attempt to pull items from both streams. Each
1038 /// stream will be polled in a round-robin fashion, and whenever a stream is
1039 /// ready to yield an item that item is yielded.
1040 ///
1041 /// The `select` function is similar to `merge` except that it requires both
1042 /// streams to have the same item and error types.
1043 ///
/// Errors are passed through from either stream.
select<S>(self, other: S) -> Select<Self, S> where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized,1045 fn select<S>(self, other: S) -> Select<Self, S>
1046 where S: Stream<Item = Self::Item, Error = Self::Error>,
1047 Self: Sized,
1048 {
1049 select::new(self, other)
1050 }
1051
1052 /// A future that completes after the given stream has been fully processed
1053 /// into the sink, including flushing.
1054 ///
/// This future will drive the stream to keep producing items until it is
/// exhausted, sending each item to the sink. It will complete once the
/// stream is exhausted and the sink has fully processed every received
/// item, flushed successfully, and closed successfully.
1059 ///
1060 /// Doing `stream.forward(sink)` is roughly equivalent to
1061 /// `sink.send_all(stream)`. The returned future will exhaust all items from
1062 /// `self`, sending them all to `sink`. Furthermore the `sink` will be
1063 /// closed and flushed.
1064 ///
1065 /// On completion, the pair `(stream, sink)` is returned.
forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<SinkItem = Self::Item>, Self::Error: From<S::SinkError>, Self: Sized1066 fn forward<S>(self, sink: S) -> Forward<Self, S>
1067 where S: Sink<SinkItem = Self::Item>,
1068 Self::Error: From<S::SinkError>,
1069 Self: Sized
1070 {
1071 forward::new(self, sink)
1072 }
1073
1074 /// Splits this `Stream + Sink` object into separate `Stream` and `Sink`
1075 /// objects.
1076 ///
1077 /// This can be useful when you want to split ownership between tasks, or
1078 /// allow direct interaction between the two objects (e.g. via
1079 /// `Sink::send_all`).
1080 ///
1081 /// This method is only available when the `use_std` feature of this
1082 /// library is activated, and it is activated by default.
1083 #[cfg(feature = "use_std")]
split(self) -> (SplitSink<Self>, SplitStream<Self>) where Self: super::sink::Sink + Sized1084 fn split(self) -> (SplitSink<Self>, SplitStream<Self>)
1085 where Self: super::sink::Sink + Sized
1086 {
1087 split::split(self)
1088 }
1089
1090 /// Do something with each item of this stream, afterwards passing it on.
1091 ///
1092 /// This is similar to the `Iterator::inspect` method in the standard
1093 /// library where it allows easily inspecting each value as it passes
1094 /// through the stream, for example to debug what's going on.
inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1095 fn inspect<F>(self, f: F) -> Inspect<Self, F>
1096 where F: FnMut(&Self::Item),
1097 Self: Sized,
1098 {
1099 inspect::new(self, f)
1100 }
1101
1102 /// Do something with the error of this stream, afterwards passing it on.
1103 ///
1104 /// This is similar to the `Stream::inspect` method where it allows
1105 /// easily inspecting the error as it passes through the stream, for
1106 /// example to debug what's going on.
inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnMut(&Self::Error), Self: Sized,1107 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
1108 where F: FnMut(&Self::Error),
1109 Self: Sized,
1110 {
1111 inspect_err::new(self, f)
1112 }
1113 }
1114
1115 impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
1116 type Item = S::Item;
1117 type Error = S::Error;
1118
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>1119 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
1120 (**self).poll()
1121 }
1122 }
1123
1124 /// Converts a list of futures into a `Stream` of results from the futures.
1125 ///
/// This function will take a list of futures (e.g. a vector, an iterator,
1127 /// etc), and return a stream. The stream will yield items as they become
1128 /// available on the futures internally, in the order that they become
1129 /// available. This function is similar to `buffer_unordered` in that it may
1130 /// return items in a different order than in the list specified.
1131 ///
1132 /// Note that the returned set can also be used to dynamically push more
1133 /// futures into the set as they become available.
1134 #[cfg(feature = "use_std")]
futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future> where I: IntoIterator, I::Item: IntoFuture1135 pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
1136 where I: IntoIterator,
1137 I::Item: IntoFuture
1138 {
1139 let mut set = FuturesUnordered::new();
1140
1141 for future in futures {
1142 set.push(future.into_future());
1143 }
1144
1145 return set
1146 }
1147