1 //! Stream utilities for Tokio.
2 //!
3 //! A `Stream` is an asynchronous sequence of values. It can be thought of as an asynchronous version of the standard library's `Iterator` trait.
4 //!
5 //! This module provides helpers to work with them.
6
7 mod all;
8 use all::AllFuture;
9
10 mod any;
11 use any::AnyFuture;
12
13 mod chain;
14 use chain::Chain;
15
16 mod collect;
17 use collect::Collect;
18 pub use collect::FromStream;
19
20 mod empty;
21 pub use empty::{empty, Empty};
22
23 mod filter;
24 use filter::Filter;
25
26 mod filter_map;
27 use filter_map::FilterMap;
28
29 mod fold;
30 use fold::FoldFuture;
31
32 mod fuse;
33 use fuse::Fuse;
34
35 mod iter;
36 pub use iter::{iter, Iter};
37
38 mod map;
39 use map::Map;
40
41 mod merge;
42 use merge::Merge;
43
44 mod next;
45 use next::Next;
46
47 mod once;
48 pub use once::{once, Once};
49
50 mod pending;
51 pub use pending::{pending, Pending};
52
53 mod stream_map;
54 pub use stream_map::StreamMap;
55
56 mod skip;
57 use skip::Skip;
58
59 mod skip_while;
60 use skip_while::SkipWhile;
61
62 mod try_next;
63 use try_next::TryNext;
64
65 mod take;
66 use take::Take;
67
68 mod take_while;
69 use take_while::TakeWhile;
70
71 cfg_time! {
72 mod timeout;
73 use timeout::Timeout;
74 use std::time::Duration;
75 }
76
77 pub use futures_core::Stream;
78
79 /// An extension trait for `Stream`s that provides a variety of convenient
80 /// combinator functions.
81 pub trait StreamExt: Stream {
82 /// Consumes and returns the next value in the stream or `None` if the
83 /// stream is finished.
84 ///
85 /// Equivalent to:
86 ///
87 /// ```ignore
88 /// async fn next(&mut self) -> Option<Self::Item>;
89 /// ```
90 ///
91 /// Note that because `next` doesn't take ownership over the stream,
92 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
93 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
94 /// be done by boxing the stream using [`Box::pin`] or
95 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
96 /// crate.
97 ///
98 /// # Examples
99 ///
100 /// ```
101 /// # #[tokio::main]
102 /// # async fn main() {
103 /// use tokio::stream::{self, StreamExt};
104 ///
105 /// let mut stream = stream::iter(1..=3);
106 ///
107 /// assert_eq!(stream.next().await, Some(1));
108 /// assert_eq!(stream.next().await, Some(2));
109 /// assert_eq!(stream.next().await, Some(3));
110 /// assert_eq!(stream.next().await, None);
111 /// # }
112 /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,113 fn next(&mut self) -> Next<'_, Self>
114 where
115 Self: Unpin,
116 {
117 Next::new(self)
118 }
119
120 /// Consumes and returns the next item in the stream. If an error is
121 /// encountered before the next item, the error is returned instead.
122 ///
123 /// Equivalent to:
124 ///
125 /// ```ignore
126 /// async fn try_next(&mut self) -> Result<Option<T>, E>;
127 /// ```
128 ///
129 /// This is similar to the [`next`](StreamExt::next) combinator,
130 /// but returns a [`Result<Option<T>, E>`](Result) rather than
131 /// an [`Option<Result<T, E>>`](Option), making for easy use
132 /// with the [`?`](std::ops::Try) operator.
133 ///
134 /// # Examples
135 ///
136 /// ```
137 /// # #[tokio::main]
138 /// # async fn main() {
139 /// use tokio::stream::{self, StreamExt};
140 ///
141 /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
142 ///
143 /// assert_eq!(stream.try_next().await, Ok(Some(1)));
144 /// assert_eq!(stream.try_next().await, Ok(Some(2)));
145 /// assert_eq!(stream.try_next().await, Err("nope"));
146 /// # }
147 /// ```
try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,148 fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
149 where
150 Self: Stream<Item = Result<T, E>> + Unpin,
151 {
152 TryNext::new(self)
153 }
154
155 /// Maps this stream's items to a different type, returning a new stream of
156 /// the resulting type.
157 ///
158 /// The provided closure is executed over all elements of this stream as
159 /// they are made available. It is executed inline with calls to
160 /// [`poll_next`](Stream::poll_next).
161 ///
162 /// Note that this function consumes the stream passed into it and returns a
163 /// wrapped version of it, similar to the existing `map` methods in the
164 /// standard library.
165 ///
166 /// # Examples
167 ///
168 /// ```
169 /// # #[tokio::main]
170 /// # async fn main() {
171 /// use tokio::stream::{self, StreamExt};
172 ///
173 /// let stream = stream::iter(1..=3);
174 /// let mut stream = stream.map(|x| x + 3);
175 ///
176 /// assert_eq!(stream.next().await, Some(4));
177 /// assert_eq!(stream.next().await, Some(5));
178 /// assert_eq!(stream.next().await, Some(6));
179 /// # }
180 /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,181 fn map<T, F>(self, f: F) -> Map<Self, F>
182 where
183 F: FnMut(Self::Item) -> T,
184 Self: Sized,
185 {
186 Map::new(self, f)
187 }
188
189 /// Combine two streams into one by interleaving the output of both as it
190 /// is produced.
191 ///
192 /// Values are produced from the merged stream in the order they arrive from
193 /// the two source streams. If both source streams provide values
194 /// simultaneously, the merge stream alternates between them. This provides
195 /// some level of fairness. You should not chain calls to `merge`, as this
196 /// will break the fairness of the merging.
197 ///
198 /// The merged stream completes once **both** source streams complete. When
199 /// one source stream completes before the other, the merge stream
200 /// exclusively polls the remaining stream.
201 ///
202 /// For merging multiple streams, consider using [`StreamMap`] instead.
203 ///
204 /// [`StreamMap`]: crate::stream::StreamMap
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// use tokio::stream::StreamExt;
210 /// use tokio::sync::mpsc;
211 /// use tokio::time;
212 ///
213 /// use std::time::Duration;
214 ///
215 /// # /*
216 /// #[tokio::main]
217 /// # */
218 /// # #[tokio::main(basic_scheduler)]
219 /// async fn main() {
220 /// # time::pause();
221 /// let (mut tx1, rx1) = mpsc::channel(10);
222 /// let (mut tx2, rx2) = mpsc::channel(10);
223 ///
224 /// let mut rx = rx1.merge(rx2);
225 ///
226 /// tokio::spawn(async move {
227 /// // Send some values immediately
228 /// tx1.send(1).await.unwrap();
229 /// tx1.send(2).await.unwrap();
230 ///
231 /// // Let the other task send values
232 /// time::delay_for(Duration::from_millis(20)).await;
233 ///
234 /// tx1.send(4).await.unwrap();
235 /// });
236 ///
237 /// tokio::spawn(async move {
238 /// // Wait for the first task to send values
239 /// time::delay_for(Duration::from_millis(5)).await;
240 ///
241 /// tx2.send(3).await.unwrap();
242 ///
243 /// time::delay_for(Duration::from_millis(25)).await;
244 ///
245 /// // Send the final value
246 /// tx2.send(5).await.unwrap();
247 /// });
248 ///
249 /// assert_eq!(1, rx.next().await.unwrap());
250 /// assert_eq!(2, rx.next().await.unwrap());
251 /// assert_eq!(3, rx.next().await.unwrap());
252 /// assert_eq!(4, rx.next().await.unwrap());
253 /// assert_eq!(5, rx.next().await.unwrap());
254 ///
255 /// // The merged stream is consumed
256 /// assert!(rx.next().await.is_none());
257 /// }
258 /// ```
merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,259 fn merge<U>(self, other: U) -> Merge<Self, U>
260 where
261 U: Stream<Item = Self::Item>,
262 Self: Sized,
263 {
264 Merge::new(self, other)
265 }
266
267 /// Filters the values produced by this stream according to the provided
268 /// predicate.
269 ///
270 /// As values of this stream are made available, the provided predicate `f`
271 /// will be run against them. If the predicate
272 /// resolves to `true`, then the stream will yield the value, but if the
273 /// predicate resolves to `false`, then the value
274 /// will be discarded and the next value will be produced.
275 ///
276 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter`] method in the
278 /// standard library.
279 ///
280 /// # Examples
281 ///
282 /// ```
283 /// # #[tokio::main]
284 /// # async fn main() {
285 /// use tokio::stream::{self, StreamExt};
286 ///
287 /// let stream = stream::iter(1..=8);
288 /// let mut evens = stream.filter(|x| x % 2 == 0);
289 ///
290 /// assert_eq!(Some(2), evens.next().await);
291 /// assert_eq!(Some(4), evens.next().await);
292 /// assert_eq!(Some(6), evens.next().await);
293 /// assert_eq!(Some(8), evens.next().await);
294 /// assert_eq!(None, evens.next().await);
295 /// # }
296 /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,297 fn filter<F>(self, f: F) -> Filter<Self, F>
298 where
299 F: FnMut(&Self::Item) -> bool,
300 Self: Sized,
301 {
302 Filter::new(self, f)
303 }
304
305 /// Filters the values produced by this stream while simultaneously mapping
306 /// them to a different type according to the provided closure.
307 ///
308 /// As values of this stream are made available, the provided function will
309 /// be run on them. If the predicate `f` resolves to
310 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
311 /// it resolves to [`None`], then the value will be skipped.
312 ///
313 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter_map`] method in the
315 /// standard library.
316 ///
317 /// # Examples
318 /// ```
319 /// # #[tokio::main]
320 /// # async fn main() {
321 /// use tokio::stream::{self, StreamExt};
322 ///
323 /// let stream = stream::iter(1..=8);
324 /// let mut evens = stream.filter_map(|x| {
325 /// if x % 2 == 0 { Some(x + 1) } else { None }
326 /// });
327 ///
328 /// assert_eq!(Some(3), evens.next().await);
329 /// assert_eq!(Some(5), evens.next().await);
330 /// assert_eq!(Some(7), evens.next().await);
331 /// assert_eq!(Some(9), evens.next().await);
332 /// assert_eq!(None, evens.next().await);
333 /// # }
334 /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,335 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
336 where
337 F: FnMut(Self::Item) -> Option<T>,
338 Self: Sized,
339 {
340 FilterMap::new(self, f)
341 }
342
343 /// Creates a stream which ends after the first `None`.
344 ///
345 /// After a stream returns `None`, behavior is undefined. Future calls to
346 /// `poll_next` may or may not return `Some(T)` again or they may panic.
347 /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
348 /// return `None` forever.
349 ///
350 /// # Examples
351 ///
352 /// ```
353 /// use tokio::stream::{Stream, StreamExt};
354 ///
355 /// use std::pin::Pin;
356 /// use std::task::{Context, Poll};
357 ///
358 /// // a stream which alternates between Some and None
359 /// struct Alternate {
360 /// state: i32,
361 /// }
362 ///
363 /// impl Stream for Alternate {
364 /// type Item = i32;
365 ///
366 /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
367 /// let val = self.state;
368 /// self.state = self.state + 1;
369 ///
370 /// // if it's even, Some(i32), else None
371 /// if val % 2 == 0 {
372 /// Poll::Ready(Some(val))
373 /// } else {
374 /// Poll::Ready(None)
375 /// }
376 /// }
377 /// }
378 ///
379 /// #[tokio::main]
380 /// async fn main() {
381 /// let mut stream = Alternate { state: 0 };
382 ///
383 /// // the stream goes back and forth
384 /// assert_eq!(stream.next().await, Some(0));
385 /// assert_eq!(stream.next().await, None);
386 /// assert_eq!(stream.next().await, Some(2));
387 /// assert_eq!(stream.next().await, None);
388 ///
389 /// // however, once it is fused
390 /// let mut stream = stream.fuse();
391 ///
392 /// assert_eq!(stream.next().await, Some(4));
393 /// assert_eq!(stream.next().await, None);
394 ///
395 /// // it will always return `None` after the first time.
396 /// assert_eq!(stream.next().await, None);
397 /// assert_eq!(stream.next().await, None);
398 /// assert_eq!(stream.next().await, None);
399 /// }
400 /// ```
fuse(self) -> Fuse<Self> where Self: Sized,401 fn fuse(self) -> Fuse<Self>
402 where
403 Self: Sized,
404 {
405 Fuse::new(self)
406 }
407
408 /// Creates a new stream of at most `n` items of the underlying stream.
409 ///
410 /// Once `n` items have been yielded from this stream then it will always
411 /// return that the stream is done.
412 ///
413 /// # Examples
414 ///
415 /// ```
416 /// # #[tokio::main]
417 /// # async fn main() {
418 /// use tokio::stream::{self, StreamExt};
419 ///
420 /// let mut stream = stream::iter(1..=10).take(3);
421 ///
422 /// assert_eq!(Some(1), stream.next().await);
423 /// assert_eq!(Some(2), stream.next().await);
424 /// assert_eq!(Some(3), stream.next().await);
425 /// assert_eq!(None, stream.next().await);
426 /// # }
427 /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,428 fn take(self, n: usize) -> Take<Self>
429 where
430 Self: Sized,
431 {
432 Take::new(self, n)
433 }
434
435 /// Take elements from this stream while the provided predicate
436 /// resolves to `true`.
437 ///
438 /// This function, like `Iterator::take_while`, will take elements from the
439 /// stream until the predicate `f` resolves to `false`. Once one element
440 /// returns false it will always return that the stream is done.
441 ///
442 /// # Examples
443 ///
444 /// ```
445 /// # #[tokio::main]
446 /// # async fn main() {
447 /// use tokio::stream::{self, StreamExt};
448 ///
449 /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
450 ///
451 /// assert_eq!(Some(1), stream.next().await);
452 /// assert_eq!(Some(2), stream.next().await);
453 /// assert_eq!(Some(3), stream.next().await);
454 /// assert_eq!(None, stream.next().await);
455 /// # }
456 /// ```
take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,457 fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
458 where
459 F: FnMut(&Self::Item) -> bool,
460 Self: Sized,
461 {
462 TakeWhile::new(self, f)
463 }
464
465 /// Creates a new stream that will skip the `n` first items of the
466 /// underlying stream.
467 ///
468 /// # Examples
469 ///
470 /// ```
471 /// # #[tokio::main]
472 /// # async fn main() {
473 /// use tokio::stream::{self, StreamExt};
474 ///
475 /// let mut stream = stream::iter(1..=10).skip(7);
476 ///
477 /// assert_eq!(Some(8), stream.next().await);
478 /// assert_eq!(Some(9), stream.next().await);
479 /// assert_eq!(Some(10), stream.next().await);
480 /// assert_eq!(None, stream.next().await);
481 /// # }
482 /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,483 fn skip(self, n: usize) -> Skip<Self>
484 where
485 Self: Sized,
486 {
487 Skip::new(self, n)
488 }
489
490 /// Skip elements from the underlying stream while the provided predicate
491 /// resolves to `true`.
492 ///
/// This function, like [`Iterator::skip_while`], will ignore elements from the
494 /// stream until the predicate `f` resolves to `false`. Once one element
495 /// returns false, the rest of the elements will be yielded.
496 ///
497 /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
498 ///
499 /// # Examples
500 ///
501 /// ```
502 /// # #[tokio::main]
503 /// # async fn main() {
504 /// use tokio::stream::{self, StreamExt};
505 /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
506 ///
507 /// assert_eq!(Some(3), stream.next().await);
508 /// assert_eq!(Some(4), stream.next().await);
509 /// assert_eq!(Some(1), stream.next().await);
510 /// assert_eq!(None, stream.next().await);
511 /// # }
512 /// ```
skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,513 fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
514 where
515 F: FnMut(&Self::Item) -> bool,
516 Self: Sized,
517 {
518 SkipWhile::new(self, f)
519 }
520
521 /// Tests if every element of the stream matches a predicate.
522 ///
523 /// `all()` takes a closure that returns `true` or `false`. It applies
524 /// this closure to each element of the stream, and if they all return
525 /// `true`, then so does `all`. If any of them return `false`, it
526 /// returns `false`. An empty stream returns `true`.
527 ///
528 /// `all()` is short-circuiting; in other words, it will stop processing
529 /// as soon as it finds a `false`, given that no matter what else happens,
530 /// the result will also be `false`.
531 ///
532 /// An empty stream returns `true`.
533 ///
534 /// # Examples
535 ///
536 /// Basic usage:
537 ///
538 /// ```
539 /// # #[tokio::main]
540 /// # async fn main() {
541 /// use tokio::stream::{self, StreamExt};
542 ///
543 /// let a = [1, 2, 3];
544 ///
545 /// assert!(stream::iter(&a).all(|&x| x > 0).await);
546 ///
547 /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
548 /// # }
549 /// ```
550 ///
551 /// Stopping at the first `false`:
552 ///
553 /// ```
554 /// # #[tokio::main]
555 /// # async fn main() {
556 /// use tokio::stream::{self, StreamExt};
557 ///
558 /// let a = [1, 2, 3];
559 ///
560 /// let mut iter = stream::iter(&a);
561 ///
562 /// assert!(!iter.all(|&x| x != 2).await);
563 ///
564 /// // we can still use `iter`, as there are more elements.
565 /// assert_eq!(iter.next().await, Some(&3));
566 /// # }
567 /// ```
all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,568 fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
569 where
570 Self: Unpin,
571 F: FnMut(Self::Item) -> bool,
572 {
573 AllFuture::new(self, f)
574 }
575
576 /// Tests if any element of the stream matches a predicate.
577 ///
578 /// `any()` takes a closure that returns `true` or `false`. It applies
579 /// this closure to each element of the stream, and if any of them return
580 /// `true`, then so does `any()`. If they all return `false`, it
581 /// returns `false`.
582 ///
583 /// `any()` is short-circuiting; in other words, it will stop processing
584 /// as soon as it finds a `true`, given that no matter what else happens,
585 /// the result will also be `true`.
586 ///
587 /// An empty stream returns `false`.
588 ///
/// # Examples
///
/// Basic usage:
590 ///
591 /// ```
592 /// # #[tokio::main]
593 /// # async fn main() {
594 /// use tokio::stream::{self, StreamExt};
595 ///
596 /// let a = [1, 2, 3];
597 ///
598 /// assert!(stream::iter(&a).any(|&x| x > 0).await);
599 ///
600 /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
601 /// # }
602 /// ```
603 ///
604 /// Stopping at the first `true`:
605 ///
606 /// ```
607 /// # #[tokio::main]
608 /// # async fn main() {
609 /// use tokio::stream::{self, StreamExt};
610 ///
611 /// let a = [1, 2, 3];
612 ///
613 /// let mut iter = stream::iter(&a);
614 ///
615 /// assert!(iter.any(|&x| x != 2).await);
616 ///
617 /// // we can still use `iter`, as there are more elements.
618 /// assert_eq!(iter.next().await, Some(&2));
619 /// # }
620 /// ```
any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,621 fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
622 where
623 Self: Unpin,
624 F: FnMut(Self::Item) -> bool,
625 {
626 AnyFuture::new(self, f)
627 }
628
629 /// Combine two streams into one by first returning all values from the
630 /// first stream then all values from the second stream.
631 ///
632 /// As long as `self` still has values to emit, no values from `other` are
633 /// emitted, even if some are ready.
634 ///
635 /// # Examples
636 ///
637 /// ```
638 /// use tokio::stream::{self, StreamExt};
639 ///
640 /// #[tokio::main]
641 /// async fn main() {
642 /// let one = stream::iter(vec![1, 2, 3]);
643 /// let two = stream::iter(vec![4, 5, 6]);
644 ///
645 /// let mut stream = one.chain(two);
646 ///
647 /// assert_eq!(stream.next().await, Some(1));
648 /// assert_eq!(stream.next().await, Some(2));
649 /// assert_eq!(stream.next().await, Some(3));
650 /// assert_eq!(stream.next().await, Some(4));
651 /// assert_eq!(stream.next().await, Some(5));
652 /// assert_eq!(stream.next().await, Some(6));
653 /// assert_eq!(stream.next().await, None);
654 /// }
655 /// ```
chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,656 fn chain<U>(self, other: U) -> Chain<Self, U>
657 where
658 U: Stream<Item = Self::Item>,
659 Self: Sized,
660 {
661 Chain::new(self, other)
662 }
663
664 /// A combinator that applies a function to every element in a stream
665 /// producing a single, final value.
666 ///
667 /// # Examples
668 /// Basic usage:
669 /// ```
670 /// # #[tokio::main]
671 /// # async fn main() {
672 /// use tokio::stream::{self, *};
673 ///
674 /// let s = stream::iter(vec![1u8, 2, 3]);
675 /// let sum = s.fold(0, |acc, x| acc + x).await;
676 ///
677 /// assert_eq!(sum, 6);
678 /// # }
679 /// ```
fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B,680 fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
681 where
682 Self: Sized,
683 F: FnMut(B, Self::Item) -> B,
684 {
685 FoldFuture::new(self, init, f)
686 }
687
688 /// Drain stream pushing all emitted values into a collection.
689 ///
690 /// `collect` streams all values, awaiting as needed. Values are pushed into
691 /// a collection. A number of different target collection types are
692 /// supported, including [`Vec`](std::vec::Vec),
693 /// [`String`](std::string::String), and [`Bytes`](bytes::Bytes).
694 ///
695 /// # `Result`
696 ///
697 /// `collect()` can also be used with streams of type `Result<T, E>` where
698 /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
699 /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
700 /// streaming is terminated and `collect()` returns the `Err`.
701 ///
702 /// # Notes
703 ///
704 /// `FromStream` is currently a sealed trait. Stabilization is pending
705 /// enhancements to the Rust language.
706 ///
707 /// # Examples
708 ///
709 /// Basic usage:
710 ///
711 /// ```
712 /// use tokio::stream::{self, StreamExt};
713 ///
714 /// #[tokio::main]
715 /// async fn main() {
716 /// let doubled: Vec<i32> =
717 /// stream::iter(vec![1, 2, 3])
718 /// .map(|x| x * 2)
719 /// .collect()
720 /// .await;
721 ///
722 /// assert_eq!(vec![2, 4, 6], doubled);
723 /// }
724 /// ```
725 ///
726 /// Collecting a stream of `Result` values
727 ///
728 /// ```
729 /// use tokio::stream::{self, StreamExt};
730 ///
731 /// #[tokio::main]
732 /// async fn main() {
733 /// // A stream containing only `Ok` values will be collected
734 /// let values: Result<Vec<i32>, &str> =
735 /// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
736 /// .collect()
737 /// .await;
738 ///
739 /// assert_eq!(Ok(vec![1, 2, 3]), values);
740 ///
741 /// // A stream containing `Err` values will return the first error.
742 /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
743 ///
744 /// let values: Result<Vec<i32>, &str> =
745 /// stream::iter(results)
746 /// .collect()
747 /// .await;
748 ///
749 /// assert_eq!(Err("no"), values);
750 /// }
751 /// ```
collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized,752 fn collect<T>(self) -> Collect<Self, T>
753 where
754 T: FromStream<Self::Item>,
755 Self: Sized,
756 {
757 Collect::new(self)
758 }
759
760 /// Applies a per-item timeout to the passed stream.
761 ///
762 /// `timeout()` takes a `Duration` that represents the maximum amount of
763 /// time each element of the stream has to complete before timing out.
764 ///
765 /// If the wrapped stream yields a value before the deadline is reached, the
766 /// value is returned. Otherwise, an error is returned. The caller may decide
767 /// to continue consuming the stream and will eventually get the next source
768 /// stream value once it becomes available.
769 ///
770 /// # Notes
771 ///
772 /// This function consumes the stream passed into it and returns a
773 /// wrapped version of it.
774 ///
775 /// Polling the returned stream will continue to poll the inner stream even
776 /// if one or more items time out.
777 ///
778 /// # Examples
779 ///
780 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
781 ///
782 /// ```
783 /// # #[tokio::main]
784 /// # async fn main() {
785 /// use tokio::stream::{self, StreamExt};
786 /// use std::time::Duration;
787 /// # let int_stream = stream::iter(1..=3);
788 ///
789 /// let mut int_stream = int_stream.timeout(Duration::from_secs(1));
790 ///
791 /// // When no items time out, we get the 3 elements in succession:
792 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
793 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
794 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
795 /// assert_eq!(int_stream.try_next().await, Ok(None));
796 ///
797 /// // If the second item times out, we get an error and continue polling the stream:
798 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
799 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
800 /// assert!(int_stream.try_next().await.is_err());
801 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
802 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
803 /// assert_eq!(int_stream.try_next().await, Ok(None));
804 ///
805 /// // If we want to stop consuming the source stream the first time an
806 /// // element times out, we can use the `take_while` operator:
807 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
808 /// let mut int_stream = int_stream.take_while(Result::is_ok);
809 ///
810 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
811 /// assert_eq!(int_stream.try_next().await, Ok(None));
812 /// # }
813 /// ```
// Gated on the `time` feature named in both attributes below; the `timeout`
// module and the `Duration` import are brought in under `cfg_time!`.
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
    Self: Sized,
{
    // Consumes the stream; `duration` is forwarded unchanged to the adapter.
    Timeout::new(self, duration)
}
822 }
823
824 impl<St: ?Sized> StreamExt for St where St: Stream {}
825
/// Merge the size hints from two streams.
///
/// The lower bounds are added with saturation, so the combined lower bound is
/// clamped to `usize::MAX` rather than overflowing.
///
/// The combined upper bound is `Some` only when both inputs have an upper
/// bound and their sum fits in a `usize`; in every other case it is `None`.
fn merge_size_hints(
    (left_low, left_high): (usize, Option<usize>),
    (right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
    let low = left_low.saturating_add(right_low);
    let high = match (left_high, right_high) {
        // `checked_add` yields `None` on overflow, which here means
        // "no known upper bound".
        (Some(left), Some(right)) => left.checked_add(right),
        _ => None,
    };
    (low, high)
}
838