1 //! Stream utilities for Tokio.
2 //!
3 //! A `Stream` is an asynchronous sequence of values. It can be thought of as an asynchronous version of the standard library's `Iterator` trait.
4 //!
5 //! This module provides helpers to work with them.
6 
7 mod all;
8 use all::AllFuture;
9 
10 mod any;
11 use any::AnyFuture;
12 
13 mod chain;
14 use chain::Chain;
15 
16 mod collect;
17 use collect::Collect;
18 pub use collect::FromStream;
19 
20 mod empty;
21 pub use empty::{empty, Empty};
22 
23 mod filter;
24 use filter::Filter;
25 
26 mod filter_map;
27 use filter_map::FilterMap;
28 
29 mod fold;
30 use fold::FoldFuture;
31 
32 mod fuse;
33 use fuse::Fuse;
34 
35 mod iter;
36 pub use iter::{iter, Iter};
37 
38 mod map;
39 use map::Map;
40 
41 mod merge;
42 use merge::Merge;
43 
44 mod next;
45 use next::Next;
46 
47 mod once;
48 pub use once::{once, Once};
49 
50 mod pending;
51 pub use pending::{pending, Pending};
52 
53 mod stream_map;
54 pub use stream_map::StreamMap;
55 
56 mod skip;
57 use skip::Skip;
58 
59 mod skip_while;
60 use skip_while::SkipWhile;
61 
62 mod try_next;
63 use try_next::TryNext;
64 
65 mod take;
66 use take::Take;
67 
68 mod take_while;
69 use take_while::TakeWhile;
70 
71 cfg_time! {
72     mod timeout;
73     use timeout::Timeout;
74     use std::time::Duration;
75 }
76 
77 pub use futures_core::Stream;
78 
79 /// An extension trait for `Stream`s that provides a variety of convenient
80 /// combinator functions.
81 pub trait StreamExt: Stream {
82     /// Consumes and returns the next value in the stream or `None` if the
83     /// stream is finished.
84     ///
85     /// Equivalent to:
86     ///
87     /// ```ignore
88     /// async fn next(&mut self) -> Option<Self::Item>;
89     /// ```
90     ///
91     /// Note that because `next` doesn't take ownership over the stream,
92     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
93     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
94     /// be done by boxing the stream using [`Box::pin`] or
95     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
96     /// crate.
97     ///
98     /// # Examples
99     ///
100     /// ```
101     /// # #[tokio::main]
102     /// # async fn main() {
103     /// use tokio::stream::{self, StreamExt};
104     ///
105     /// let mut stream = stream::iter(1..=3);
106     ///
107     /// assert_eq!(stream.next().await, Some(1));
108     /// assert_eq!(stream.next().await, Some(2));
109     /// assert_eq!(stream.next().await, Some(3));
110     /// assert_eq!(stream.next().await, None);
111     /// # }
112     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,113     fn next(&mut self) -> Next<'_, Self>
114     where
115         Self: Unpin,
116     {
117         Next::new(self)
118     }
119 
120     /// Consumes and returns the next item in the stream. If an error is
121     /// encountered before the next item, the error is returned instead.
122     ///
123     /// Equivalent to:
124     ///
125     /// ```ignore
126     /// async fn try_next(&mut self) -> Result<Option<T>, E>;
127     /// ```
128     ///
129     /// This is similar to the [`next`](StreamExt::next) combinator,
130     /// but returns a [`Result<Option<T>, E>`](Result) rather than
131     /// an [`Option<Result<T, E>>`](Option), making for easy use
132     /// with the [`?`](std::ops::Try) operator.
133     ///
134     /// # Examples
135     ///
136     /// ```
137     /// # #[tokio::main]
138     /// # async fn main() {
139     /// use tokio::stream::{self, StreamExt};
140     ///
141     /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
142     ///
143     /// assert_eq!(stream.try_next().await, Ok(Some(1)));
144     /// assert_eq!(stream.try_next().await, Ok(Some(2)));
145     /// assert_eq!(stream.try_next().await, Err("nope"));
146     /// # }
147     /// ```
try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,148     fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
149     where
150         Self: Stream<Item = Result<T, E>> + Unpin,
151     {
152         TryNext::new(self)
153     }
154 
155     /// Maps this stream's items to a different type, returning a new stream of
156     /// the resulting type.
157     ///
158     /// The provided closure is executed over all elements of this stream as
159     /// they are made available. It is executed inline with calls to
160     /// [`poll_next`](Stream::poll_next).
161     ///
162     /// Note that this function consumes the stream passed into it and returns a
163     /// wrapped version of it, similar to the existing `map` methods in the
164     /// standard library.
165     ///
166     /// # Examples
167     ///
168     /// ```
169     /// # #[tokio::main]
170     /// # async fn main() {
171     /// use tokio::stream::{self, StreamExt};
172     ///
173     /// let stream = stream::iter(1..=3);
174     /// let mut stream = stream.map(|x| x + 3);
175     ///
176     /// assert_eq!(stream.next().await, Some(4));
177     /// assert_eq!(stream.next().await, Some(5));
178     /// assert_eq!(stream.next().await, Some(6));
179     /// # }
180     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,181     fn map<T, F>(self, f: F) -> Map<Self, F>
182     where
183         F: FnMut(Self::Item) -> T,
184         Self: Sized,
185     {
186         Map::new(self, f)
187     }
188 
189     /// Combine two streams into one by interleaving the output of both as it
190     /// is produced.
191     ///
192     /// Values are produced from the merged stream in the order they arrive from
193     /// the two source streams. If both source streams provide values
194     /// simultaneously, the merge stream alternates between them. This provides
195     /// some level of fairness. You should not chain calls to `merge`, as this
196     /// will break the fairness of the merging.
197     ///
198     /// The merged stream completes once **both** source streams complete. When
199     /// one source stream completes before the other, the merge stream
200     /// exclusively polls the remaining stream.
201     ///
202     /// For merging multiple streams, consider using [`StreamMap`] instead.
203     ///
204     /// [`StreamMap`]: crate::stream::StreamMap
205     ///
206     /// # Examples
207     ///
208     /// ```
209     /// use tokio::stream::StreamExt;
210     /// use tokio::sync::mpsc;
211     /// use tokio::time;
212     ///
213     /// use std::time::Duration;
214     ///
215     /// # /*
216     /// #[tokio::main]
217     /// # */
218     /// # #[tokio::main(basic_scheduler)]
219     /// async fn main() {
220     /// # time::pause();
221     ///     let (mut tx1, rx1) = mpsc::channel(10);
222     ///     let (mut tx2, rx2) = mpsc::channel(10);
223     ///
224     ///     let mut rx = rx1.merge(rx2);
225     ///
226     ///     tokio::spawn(async move {
227     ///         // Send some values immediately
228     ///         tx1.send(1).await.unwrap();
229     ///         tx1.send(2).await.unwrap();
230     ///
231     ///         // Let the other task send values
232     ///         time::delay_for(Duration::from_millis(20)).await;
233     ///
234     ///         tx1.send(4).await.unwrap();
235     ///     });
236     ///
237     ///     tokio::spawn(async move {
238     ///         // Wait for the first task to send values
239     ///         time::delay_for(Duration::from_millis(5)).await;
240     ///
241     ///         tx2.send(3).await.unwrap();
242     ///
243     ///         time::delay_for(Duration::from_millis(25)).await;
244     ///
245     ///         // Send the final value
246     ///         tx2.send(5).await.unwrap();
247     ///     });
248     ///
249     ///    assert_eq!(1, rx.next().await.unwrap());
250     ///    assert_eq!(2, rx.next().await.unwrap());
251     ///    assert_eq!(3, rx.next().await.unwrap());
252     ///    assert_eq!(4, rx.next().await.unwrap());
253     ///    assert_eq!(5, rx.next().await.unwrap());
254     ///
255     ///    // The merged stream is consumed
256     ///    assert!(rx.next().await.is_none());
257     /// }
258     /// ```
merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,259     fn merge<U>(self, other: U) -> Merge<Self, U>
260     where
261         U: Stream<Item = Self::Item>,
262         Self: Sized,
263     {
264         Merge::new(self, other)
265     }
266 
267     /// Filters the values produced by this stream according to the provided
268     /// predicate.
269     ///
270     /// As values of this stream are made available, the provided predicate `f`
271     /// will be run against them. If the predicate
272     /// resolves to `true`, then the stream will yield the value, but if the
273     /// predicate resolves to `false`, then the value
274     /// will be discarded and the next value will be produced.
275     ///
276     /// Note that this function consumes the stream passed into it and returns a
277     /// wrapped version of it, similar to [`Iterator::filter`] method in the
278     /// standard library.
279     ///
280     /// # Examples
281     ///
282     /// ```
283     /// # #[tokio::main]
284     /// # async fn main() {
285     /// use tokio::stream::{self, StreamExt};
286     ///
287     /// let stream = stream::iter(1..=8);
288     /// let mut evens = stream.filter(|x| x % 2 == 0);
289     ///
290     /// assert_eq!(Some(2), evens.next().await);
291     /// assert_eq!(Some(4), evens.next().await);
292     /// assert_eq!(Some(6), evens.next().await);
293     /// assert_eq!(Some(8), evens.next().await);
294     /// assert_eq!(None, evens.next().await);
295     /// # }
296     /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,297     fn filter<F>(self, f: F) -> Filter<Self, F>
298     where
299         F: FnMut(&Self::Item) -> bool,
300         Self: Sized,
301     {
302         Filter::new(self, f)
303     }
304 
305     /// Filters the values produced by this stream while simultaneously mapping
306     /// them to a different type according to the provided closure.
307     ///
308     /// As values of this stream are made available, the provided function will
309     /// be run on them. If the predicate `f` resolves to
310     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
311     /// it resolves to [`None`], then the value will be skipped.
312     ///
313     /// Note that this function consumes the stream passed into it and returns a
314     /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
315     /// standard library.
316     ///
317     /// # Examples
318     /// ```
319     /// # #[tokio::main]
320     /// # async fn main() {
321     /// use tokio::stream::{self, StreamExt};
322     ///
323     /// let stream = stream::iter(1..=8);
324     /// let mut evens = stream.filter_map(|x| {
325     ///     if x % 2 == 0 { Some(x + 1) } else { None }
326     /// });
327     ///
328     /// assert_eq!(Some(3), evens.next().await);
329     /// assert_eq!(Some(5), evens.next().await);
330     /// assert_eq!(Some(7), evens.next().await);
331     /// assert_eq!(Some(9), evens.next().await);
332     /// assert_eq!(None, evens.next().await);
333     /// # }
334     /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,335     fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
336     where
337         F: FnMut(Self::Item) -> Option<T>,
338         Self: Sized,
339     {
340         FilterMap::new(self, f)
341     }
342 
343     /// Creates a stream which ends after the first `None`.
344     ///
345     /// After a stream returns `None`, behavior is undefined. Future calls to
346     /// `poll_next` may or may not return `Some(T)` again or they may panic.
347     /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
348     /// return `None` forever.
349     ///
350     /// # Examples
351     ///
352     /// ```
353     /// use tokio::stream::{Stream, StreamExt};
354     ///
355     /// use std::pin::Pin;
356     /// use std::task::{Context, Poll};
357     ///
358     /// // a stream which alternates between Some and None
359     /// struct Alternate {
360     ///     state: i32,
361     /// }
362     ///
363     /// impl Stream for Alternate {
364     ///     type Item = i32;
365     ///
366     ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
367     ///         let val = self.state;
368     ///         self.state = self.state + 1;
369     ///
370     ///         // if it's even, Some(i32), else None
371     ///         if val % 2 == 0 {
372     ///             Poll::Ready(Some(val))
373     ///         } else {
374     ///             Poll::Ready(None)
375     ///         }
376     ///     }
377     /// }
378     ///
379     /// #[tokio::main]
380     /// async fn main() {
381     ///     let mut stream = Alternate { state: 0 };
382     ///
383     ///     // the stream goes back and forth
384     ///     assert_eq!(stream.next().await, Some(0));
385     ///     assert_eq!(stream.next().await, None);
386     ///     assert_eq!(stream.next().await, Some(2));
387     ///     assert_eq!(stream.next().await, None);
388     ///
389     ///     // however, once it is fused
390     ///     let mut stream = stream.fuse();
391     ///
392     ///     assert_eq!(stream.next().await, Some(4));
393     ///     assert_eq!(stream.next().await, None);
394     ///
395     ///     // it will always return `None` after the first time.
396     ///     assert_eq!(stream.next().await, None);
397     ///     assert_eq!(stream.next().await, None);
398     ///     assert_eq!(stream.next().await, None);
399     /// }
400     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,401     fn fuse(self) -> Fuse<Self>
402     where
403         Self: Sized,
404     {
405         Fuse::new(self)
406     }
407 
408     /// Creates a new stream of at most `n` items of the underlying stream.
409     ///
410     /// Once `n` items have been yielded from this stream then it will always
411     /// return that the stream is done.
412     ///
413     /// # Examples
414     ///
415     /// ```
416     /// # #[tokio::main]
417     /// # async fn main() {
418     /// use tokio::stream::{self, StreamExt};
419     ///
420     /// let mut stream = stream::iter(1..=10).take(3);
421     ///
422     /// assert_eq!(Some(1), stream.next().await);
423     /// assert_eq!(Some(2), stream.next().await);
424     /// assert_eq!(Some(3), stream.next().await);
425     /// assert_eq!(None, stream.next().await);
426     /// # }
427     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,428     fn take(self, n: usize) -> Take<Self>
429     where
430         Self: Sized,
431     {
432         Take::new(self, n)
433     }
434 
435     /// Take elements from this stream while the provided predicate
436     /// resolves to `true`.
437     ///
438     /// This function, like `Iterator::take_while`, will take elements from the
439     /// stream until the predicate `f` resolves to `false`. Once one element
440     /// returns false it will always return that the stream is done.
441     ///
442     /// # Examples
443     ///
444     /// ```
445     /// # #[tokio::main]
446     /// # async fn main() {
447     /// use tokio::stream::{self, StreamExt};
448     ///
449     /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
450     ///
451     /// assert_eq!(Some(1), stream.next().await);
452     /// assert_eq!(Some(2), stream.next().await);
453     /// assert_eq!(Some(3), stream.next().await);
454     /// assert_eq!(None, stream.next().await);
455     /// # }
456     /// ```
take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,457     fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
458     where
459         F: FnMut(&Self::Item) -> bool,
460         Self: Sized,
461     {
462         TakeWhile::new(self, f)
463     }
464 
465     /// Creates a new stream that will skip the `n` first items of the
466     /// underlying stream.
467     ///
468     /// # Examples
469     ///
470     /// ```
471     /// # #[tokio::main]
472     /// # async fn main() {
473     /// use tokio::stream::{self, StreamExt};
474     ///
475     /// let mut stream = stream::iter(1..=10).skip(7);
476     ///
477     /// assert_eq!(Some(8), stream.next().await);
478     /// assert_eq!(Some(9), stream.next().await);
479     /// assert_eq!(Some(10), stream.next().await);
480     /// assert_eq!(None, stream.next().await);
481     /// # }
482     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,483     fn skip(self, n: usize) -> Skip<Self>
484     where
485         Self: Sized,
486     {
487         Skip::new(self, n)
488     }
489 
490     /// Skip elements from the underlying stream while the provided predicate
491     /// resolves to `true`.
492     ///
493     /// This function, like [`Iterator::skip_while`], will ignore elemets from the
494     /// stream until the predicate `f` resolves to `false`. Once one element
495     /// returns false, the rest of the elements will be yielded.
496     ///
497     /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
498     ///
499     /// # Examples
500     ///
501     /// ```
502     /// # #[tokio::main]
503     /// # async fn main() {
504     /// use tokio::stream::{self, StreamExt};
505     /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
506     ///
507     /// assert_eq!(Some(3), stream.next().await);
508     /// assert_eq!(Some(4), stream.next().await);
509     /// assert_eq!(Some(1), stream.next().await);
510     /// assert_eq!(None, stream.next().await);
511     /// # }
512     /// ```
skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,513     fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
514     where
515         F: FnMut(&Self::Item) -> bool,
516         Self: Sized,
517     {
518         SkipWhile::new(self, f)
519     }
520 
521     /// Tests if every element of the stream matches a predicate.
522     ///
523     /// `all()` takes a closure that returns `true` or `false`. It applies
524     /// this closure to each element of the stream, and if they all return
525     /// `true`, then so does `all`. If any of them return `false`, it
526     /// returns `false`. An empty stream returns `true`.
527     ///
528     /// `all()` is short-circuiting; in other words, it will stop processing
529     /// as soon as it finds a `false`, given that no matter what else happens,
530     /// the result will also be `false`.
531     ///
532     /// An empty stream returns `true`.
533     ///
534     /// # Examples
535     ///
536     /// Basic usage:
537     ///
538     /// ```
539     /// # #[tokio::main]
540     /// # async fn main() {
541     /// use tokio::stream::{self, StreamExt};
542     ///
543     /// let a = [1, 2, 3];
544     ///
545     /// assert!(stream::iter(&a).all(|&x| x > 0).await);
546     ///
547     /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
548     /// # }
549     /// ```
550     ///
551     /// Stopping at the first `false`:
552     ///
553     /// ```
554     /// # #[tokio::main]
555     /// # async fn main() {
556     /// use tokio::stream::{self, StreamExt};
557     ///
558     /// let a = [1, 2, 3];
559     ///
560     /// let mut iter = stream::iter(&a);
561     ///
562     /// assert!(!iter.all(|&x| x != 2).await);
563     ///
564     /// // we can still use `iter`, as there are more elements.
565     /// assert_eq!(iter.next().await, Some(&3));
566     /// # }
567     /// ```
all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,568     fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
569     where
570         Self: Unpin,
571         F: FnMut(Self::Item) -> bool,
572     {
573         AllFuture::new(self, f)
574     }
575 
576     /// Tests if any element of the stream matches a predicate.
577     ///
578     /// `any()` takes a closure that returns `true` or `false`. It applies
579     /// this closure to each element of the stream, and if any of them return
580     /// `true`, then so does `any()`. If they all return `false`, it
581     /// returns `false`.
582     ///
583     /// `any()` is short-circuiting; in other words, it will stop processing
584     /// as soon as it finds a `true`, given that no matter what else happens,
585     /// the result will also be `true`.
586     ///
587     /// An empty stream returns `false`.
588     ///
589     /// Basic usage:
590     ///
591     /// ```
592     /// # #[tokio::main]
593     /// # async fn main() {
594     /// use tokio::stream::{self, StreamExt};
595     ///
596     /// let a = [1, 2, 3];
597     ///
598     /// assert!(stream::iter(&a).any(|&x| x > 0).await);
599     ///
600     /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
601     /// # }
602     /// ```
603     ///
604     /// Stopping at the first `true`:
605     ///
606     /// ```
607     /// # #[tokio::main]
608     /// # async fn main() {
609     /// use tokio::stream::{self, StreamExt};
610     ///
611     /// let a = [1, 2, 3];
612     ///
613     /// let mut iter = stream::iter(&a);
614     ///
615     /// assert!(iter.any(|&x| x != 2).await);
616     ///
617     /// // we can still use `iter`, as there are more elements.
618     /// assert_eq!(iter.next().await, Some(&2));
619     /// # }
620     /// ```
any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,621     fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
622     where
623         Self: Unpin,
624         F: FnMut(Self::Item) -> bool,
625     {
626         AnyFuture::new(self, f)
627     }
628 
629     /// Combine two streams into one by first returning all values from the
630     /// first stream then all values from the second stream.
631     ///
632     /// As long as `self` still has values to emit, no values from `other` are
633     /// emitted, even if some are ready.
634     ///
635     /// # Examples
636     ///
637     /// ```
638     /// use tokio::stream::{self, StreamExt};
639     ///
640     /// #[tokio::main]
641     /// async fn main() {
642     ///     let one = stream::iter(vec![1, 2, 3]);
643     ///     let two = stream::iter(vec![4, 5, 6]);
644     ///
645     ///     let mut stream = one.chain(two);
646     ///
647     ///     assert_eq!(stream.next().await, Some(1));
648     ///     assert_eq!(stream.next().await, Some(2));
649     ///     assert_eq!(stream.next().await, Some(3));
650     ///     assert_eq!(stream.next().await, Some(4));
651     ///     assert_eq!(stream.next().await, Some(5));
652     ///     assert_eq!(stream.next().await, Some(6));
653     ///     assert_eq!(stream.next().await, None);
654     /// }
655     /// ```
chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,656     fn chain<U>(self, other: U) -> Chain<Self, U>
657     where
658         U: Stream<Item = Self::Item>,
659         Self: Sized,
660     {
661         Chain::new(self, other)
662     }
663 
664     /// A combinator that applies a function to every element in a stream
665     /// producing a single, final value.
666     ///
667     /// # Examples
668     /// Basic usage:
669     /// ```
670     /// # #[tokio::main]
671     /// # async fn main() {
672     /// use tokio::stream::{self, *};
673     ///
674     /// let s = stream::iter(vec![1u8, 2, 3]);
675     /// let sum = s.fold(0, |acc, x| acc + x).await;
676     ///
677     /// assert_eq!(sum, 6);
678     /// # }
679     /// ```
fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B,680     fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
681     where
682         Self: Sized,
683         F: FnMut(B, Self::Item) -> B,
684     {
685         FoldFuture::new(self, init, f)
686     }
687 
688     /// Drain stream pushing all emitted values into a collection.
689     ///
690     /// `collect` streams all values, awaiting as needed. Values are pushed into
691     /// a collection. A number of different target collection types are
692     /// supported, including [`Vec`](std::vec::Vec),
693     /// [`String`](std::string::String), and [`Bytes`](bytes::Bytes).
694     ///
695     /// # `Result`
696     ///
697     /// `collect()` can also be used with streams of type `Result<T, E>` where
698     /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
699     /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
700     /// streaming is terminated and `collect()` returns the `Err`.
701     ///
702     /// # Notes
703     ///
704     /// `FromStream` is currently a sealed trait. Stabilization is pending
705     /// enhancements to the Rust language.
706     ///
707     /// # Examples
708     ///
709     /// Basic usage:
710     ///
711     /// ```
712     /// use tokio::stream::{self, StreamExt};
713     ///
714     /// #[tokio::main]
715     /// async fn main() {
716     ///     let doubled: Vec<i32> =
717     ///         stream::iter(vec![1, 2, 3])
718     ///             .map(|x| x * 2)
719     ///             .collect()
720     ///             .await;
721     ///
722     ///     assert_eq!(vec![2, 4, 6], doubled);
723     /// }
724     /// ```
725     ///
726     /// Collecting a stream of `Result` values
727     ///
728     /// ```
729     /// use tokio::stream::{self, StreamExt};
730     ///
731     /// #[tokio::main]
732     /// async fn main() {
733     ///     // A stream containing only `Ok` values will be collected
734     ///     let values: Result<Vec<i32>, &str> =
735     ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
736     ///             .collect()
737     ///             .await;
738     ///
739     ///     assert_eq!(Ok(vec![1, 2, 3]), values);
740     ///
741     ///     // A stream containing `Err` values will return the first error.
742     ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
743     ///
744     ///     let values: Result<Vec<i32>, &str> =
745     ///         stream::iter(results)
746     ///             .collect()
747     ///             .await;
748     ///
749     ///     assert_eq!(Err("no"), values);
750     /// }
751     /// ```
collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized,752     fn collect<T>(self) -> Collect<Self, T>
753     where
754         T: FromStream<Self::Item>,
755         Self: Sized,
756     {
757         Collect::new(self)
758     }
759 
760     /// Applies a per-item timeout to the passed stream.
761     ///
762     /// `timeout()` takes a `Duration` that represents the maximum amount of
763     /// time each element of the stream has to complete before timing out.
764     ///
765     /// If the wrapped stream yields a value before the deadline is reached, the
766     /// value is returned. Otherwise, an error is returned. The caller may decide
767     /// to continue consuming the stream and will eventually get the next source
768     /// stream value once it becomes available.
769     ///
770     /// # Notes
771     ///
772     /// This function consumes the stream passed into it and returns a
773     /// wrapped version of it.
774     ///
775     /// Polling the returned stream will continue to poll the inner stream even
776     /// if one or more items time out.
777     ///
778     /// # Examples
779     ///
780     /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
781     ///
782     /// ```
783     /// # #[tokio::main]
784     /// # async fn main() {
785     /// use tokio::stream::{self, StreamExt};
786     /// use std::time::Duration;
787     /// # let int_stream = stream::iter(1..=3);
788     ///
789     /// let mut int_stream = int_stream.timeout(Duration::from_secs(1));
790     ///
791     /// // When no items time out, we get the 3 elements in succession:
792     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
793     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
794     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
795     /// assert_eq!(int_stream.try_next().await, Ok(None));
796     ///
797     /// // If the second item times out, we get an error and continue polling the stream:
798     /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
799     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
800     /// assert!(int_stream.try_next().await.is_err());
801     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
802     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
803     /// assert_eq!(int_stream.try_next().await, Ok(None));
804     ///
805     /// // If we want to stop consuming the source stream the first time an
806     /// // element times out, we can use the `take_while` operator:
807     /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
808     /// let mut int_stream = int_stream.take_while(Result::is_ok);
809     ///
810     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
811     /// assert_eq!(int_stream.try_next().await, Ok(None));
812     /// # }
813     /// ```
814     #[cfg(all(feature = "time"))]
815     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
timeout(self, duration: Duration) -> Timeout<Self> where Self: Sized,816     fn timeout(self, duration: Duration) -> Timeout<Self>
817     where
818         Self: Sized,
819     {
820         Timeout::new(self, duration)
821     }
822 }
823 
824 impl<St: ?Sized> StreamExt for St where St: Stream {}
825 
826 /// Merge the size hints from two streams.
merge_size_hints( (left_low, left_high): (usize, Option<usize>), (right_low, right_hign): (usize, Option<usize>), ) -> (usize, Option<usize>)827 fn merge_size_hints(
828     (left_low, left_high): (usize, Option<usize>),
829     (right_low, right_hign): (usize, Option<usize>),
830 ) -> (usize, Option<usize>) {
831     let low = left_low.saturating_add(right_low);
832     let high = match (left_high, right_hign) {
833         (Some(h1), Some(h2)) => h1.checked_add(h2),
834         _ => None,
835     };
836     (low, high)
837 }
838