1 //! Streams
2 //!
3 //! This module contains a number of functions for working with `Stream`s,
4 //! including the `StreamExt` trait which adds methods to `Stream` types.
5 
6 use crate::future::{assert_future, Either};
7 use crate::stream::assert_stream;
8 #[cfg(feature = "alloc")]
9 use alloc::boxed::Box;
10 #[cfg(feature = "alloc")]
11 use alloc::vec::Vec;
12 use core::pin::Pin;
13 #[cfg(feature = "sink")]
14 use futures_core::stream::TryStream;
15 #[cfg(feature = "alloc")]
16 use futures_core::stream::{BoxStream, LocalBoxStream};
17 use futures_core::{
18     future::Future,
19     stream::{FusedStream, Stream},
20     task::{Context, Poll},
21 };
22 #[cfg(feature = "sink")]
23 use futures_sink::Sink;
24 
25 use crate::fns::{inspect_fn, InspectFn};
26 
27 mod chain;
28 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
29 pub use self::chain::Chain;
30 
31 mod collect;
32 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
33 pub use self::collect::Collect;
34 
35 mod unzip;
36 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
37 pub use self::unzip::Unzip;
38 
39 mod concat;
40 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
41 pub use self::concat::Concat;
42 
43 mod count;
44 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
45 pub use self::count::Count;
46 
47 mod cycle;
48 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
49 pub use self::cycle::Cycle;
50 
51 mod enumerate;
52 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
53 pub use self::enumerate::Enumerate;
54 
55 mod filter;
56 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
57 pub use self::filter::Filter;
58 
59 mod filter_map;
60 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
61 pub use self::filter_map::FilterMap;
62 
63 mod flatten;
64 
65 delegate_all!(
66     /// Stream for the [`flatten`](StreamExt::flatten) method.
67     Flatten<St>(
68         flatten::Flatten<St, St::Item>
69     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
70     where St: Stream
71 );
72 
73 mod fold;
74 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
75 pub use self::fold::Fold;
76 
77 mod any;
78 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
79 pub use self::any::Any;
80 
81 mod all;
82 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
83 pub use self::all::All;
84 
85 #[cfg(feature = "sink")]
86 mod forward;
87 
88 #[cfg(feature = "sink")]
89 delegate_all!(
90     /// Future for the [`forward`](super::StreamExt::forward) method.
91     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
92     Forward<St, Si>(
93         forward::Forward<St, Si, St::Ok>
94     ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
95     where St: TryStream
96 );
97 
98 mod for_each;
99 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
100 pub use self::for_each::ForEach;
101 
102 mod fuse;
103 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
104 pub use self::fuse::Fuse;
105 
106 mod into_future;
107 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
108 pub use self::into_future::StreamFuture;
109 
110 delegate_all!(
111     /// Stream for the [`inspect`](StreamExt::inspect) method.
112     Inspect<St, F>(
113         map::Map<St, InspectFn<F>>
114     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
115 );
116 
117 mod map;
118 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
119 pub use self::map::Map;
120 
121 delegate_all!(
122     /// Stream for the [`flat_map`](StreamExt::flat_map) method.
123     FlatMap<St, U, F>(
124         flatten::Flatten<Map<St, F>, U>
125     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
126 );
127 
128 mod next;
129 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130 pub use self::next::Next;
131 
132 mod select_next_some;
133 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134 pub use self::select_next_some::SelectNextSome;
135 
136 mod peek;
137 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
138 pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};
139 
140 mod skip;
141 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142 pub use self::skip::Skip;
143 
144 mod skip_while;
145 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
146 pub use self::skip_while::SkipWhile;
147 
148 mod take;
149 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150 pub use self::take::Take;
151 
152 mod take_while;
153 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
154 pub use self::take_while::TakeWhile;
155 
156 mod take_until;
157 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158 pub use self::take_until::TakeUntil;
159 
160 mod then;
161 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
162 pub use self::then::Then;
163 
164 mod zip;
165 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
166 pub use self::zip::Zip;
167 
168 #[cfg(feature = "alloc")]
169 mod chunks;
170 #[cfg(feature = "alloc")]
171 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
172 pub use self::chunks::Chunks;
173 
174 #[cfg(feature = "alloc")]
175 mod ready_chunks;
176 #[cfg(feature = "alloc")]
177 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
178 pub use self::ready_chunks::ReadyChunks;
179 
180 mod scan;
181 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
182 pub use self::scan::Scan;
183 
184 #[cfg(not(futures_no_atomic_cas))]
185 #[cfg(feature = "alloc")]
186 mod buffer_unordered;
187 #[cfg(not(futures_no_atomic_cas))]
188 #[cfg(feature = "alloc")]
189 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
190 pub use self::buffer_unordered::BufferUnordered;
191 
192 #[cfg(not(futures_no_atomic_cas))]
193 #[cfg(feature = "alloc")]
194 mod buffered;
195 #[cfg(not(futures_no_atomic_cas))]
196 #[cfg(feature = "alloc")]
197 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
198 pub use self::buffered::Buffered;
199 
200 #[cfg(not(futures_no_atomic_cas))]
201 #[cfg(feature = "alloc")]
202 mod flatten_unordered;
203 
204 #[cfg(not(futures_no_atomic_cas))]
205 #[cfg(feature = "alloc")]
206 #[allow(unreachable_pub)]
207 pub use self::flatten_unordered::FlattenUnordered;
208 
209 #[cfg(not(futures_no_atomic_cas))]
210 #[cfg(feature = "alloc")]
211 delegate_all!(
212     /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
213     FlatMapUnordered<St, U, F>(
214         FlattenUnordered<Map<St, F>>
215     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
216     where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
217 );
218 
219 #[cfg(not(futures_no_atomic_cas))]
220 #[cfg(feature = "alloc")]
221 mod for_each_concurrent;
222 #[cfg(not(futures_no_atomic_cas))]
223 #[cfg(feature = "alloc")]
224 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
225 pub use self::for_each_concurrent::ForEachConcurrent;
226 
227 #[cfg(not(futures_no_atomic_cas))]
228 #[cfg(feature = "sink")]
229 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
230 #[cfg(feature = "alloc")]
231 mod split;
232 #[cfg(not(futures_no_atomic_cas))]
233 #[cfg(feature = "sink")]
234 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
235 #[cfg(feature = "alloc")]
236 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
237 pub use self::split::{ReuniteError, SplitSink, SplitStream};
238 
239 #[cfg(feature = "std")]
240 mod catch_unwind;
241 #[cfg(feature = "std")]
242 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
243 pub use self::catch_unwind::CatchUnwind;
244 
245 impl<T: ?Sized> StreamExt for T where T: Stream {}
246 
247 /// An extension trait for `Stream`s that provides a variety of convenient
248 /// combinator functions.
249 pub trait StreamExt: Stream {
250     /// Creates a future that resolves to the next item in the stream.
251     ///
252     /// Note that because `next` doesn't take ownership over the stream,
253     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
254     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
255     /// be done by boxing the stream using [`Box::pin`] or
256     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
257     /// crate.
258     ///
259     /// # Examples
260     ///
261     /// ```
262     /// # futures::executor::block_on(async {
263     /// use futures::stream::{self, StreamExt};
264     ///
265     /// let mut stream = stream::iter(1..=3);
266     ///
267     /// assert_eq!(stream.next().await, Some(1));
268     /// assert_eq!(stream.next().await, Some(2));
269     /// assert_eq!(stream.next().await, Some(3));
270     /// assert_eq!(stream.next().await, None);
271     /// # });
272     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,273     fn next(&mut self) -> Next<'_, Self>
274     where
275         Self: Unpin,
276     {
277         assert_future::<Option<Self::Item>, _>(Next::new(self))
278     }
279 
280     /// Converts this stream into a future of `(next_item, tail_of_stream)`.
281     /// If the stream terminates, then the next item is [`None`].
282     ///
283     /// The returned future can be used to compose streams and futures together
284     /// by placing everything into the "world of futures".
285     ///
286     /// Note that because `into_future` moves the stream, the [`Stream`] type
287     /// must be [`Unpin`]. If you want to use `into_future` with a
288     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
289     /// be done by boxing the stream using [`Box::pin`] or
290     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
291     /// crate.
292     ///
293     /// # Examples
294     ///
295     /// ```
296     /// # futures::executor::block_on(async {
297     /// use futures::stream::{self, StreamExt};
298     ///
299     /// let stream = stream::iter(1..=3);
300     ///
301     /// let (item, stream) = stream.into_future().await;
302     /// assert_eq!(Some(1), item);
303     ///
304     /// let (item, stream) = stream.into_future().await;
305     /// assert_eq!(Some(2), item);
306     /// # });
307     /// ```
into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,308     fn into_future(self) -> StreamFuture<Self>
309     where
310         Self: Sized + Unpin,
311     {
312         assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self))
313     }
314 
315     /// Maps this stream's items to a different type, returning a new stream of
316     /// the resulting type.
317     ///
318     /// The provided closure is executed over all elements of this stream as
319     /// they are made available. It is executed inline with calls to
320     /// [`poll_next`](Stream::poll_next).
321     ///
322     /// Note that this function consumes the stream passed into it and returns a
323     /// wrapped version of it, similar to the existing `map` methods in the
324     /// standard library.
325     ///
326     /// # Examples
327     ///
328     /// ```
329     /// # futures::executor::block_on(async {
330     /// use futures::stream::{self, StreamExt};
331     ///
332     /// let stream = stream::iter(1..=3);
333     /// let stream = stream.map(|x| x + 3);
334     ///
335     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
336     /// # });
337     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,338     fn map<T, F>(self, f: F) -> Map<Self, F>
339     where
340         F: FnMut(Self::Item) -> T,
341         Self: Sized,
342     {
343         assert_stream::<T, _>(Map::new(self, f))
344     }
345 
346     /// Creates a stream which gives the current iteration count as well as
347     /// the next value.
348     ///
349     /// The stream returned yields pairs `(i, val)`, where `i` is the
350     /// current index of iteration and `val` is the value returned by the
351     /// stream.
352     ///
353     /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a
354     /// different sized integer, the [`zip`](StreamExt::zip) function provides similar
355     /// functionality.
356     ///
357     /// # Overflow Behavior
358     ///
359     /// The method does no guarding against overflows, so enumerating more than
360     /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If
361     /// debug assertions are enabled, a panic is guaranteed.
362     ///
363     /// # Panics
364     ///
365     /// The returned stream might panic if the to-be-returned index would
366     /// overflow a [`usize`].
367     ///
368     /// # Examples
369     ///
370     /// ```
371     /// # futures::executor::block_on(async {
372     /// use futures::stream::{self, StreamExt};
373     ///
374     /// let stream = stream::iter(vec!['a', 'b', 'c']);
375     ///
376     /// let mut stream = stream.enumerate();
377     ///
378     /// assert_eq!(stream.next().await, Some((0, 'a')));
379     /// assert_eq!(stream.next().await, Some((1, 'b')));
380     /// assert_eq!(stream.next().await, Some((2, 'c')));
381     /// assert_eq!(stream.next().await, None);
382     /// # });
383     /// ```
enumerate(self) -> Enumerate<Self> where Self: Sized,384     fn enumerate(self) -> Enumerate<Self>
385     where
386         Self: Sized,
387     {
388         assert_stream::<(usize, Self::Item), _>(Enumerate::new(self))
389     }
390 
391     /// Filters the values produced by this stream according to the provided
392     /// asynchronous predicate.
393     ///
394     /// As values of this stream are made available, the provided predicate `f`
395     /// will be run against them. If the predicate returns a `Future` which
396     /// resolves to `true`, then the stream will yield the value, but if the
397     /// predicate returns a `Future` which resolves to `false`, then the value
398     /// will be discarded and the next value will be produced.
399     ///
400     /// Note that this function consumes the stream passed into it and returns a
401     /// wrapped version of it, similar to the existing `filter` methods in the
402     /// standard library.
403     ///
404     /// # Examples
405     ///
406     /// ```
407     /// # futures::executor::block_on(async {
408     /// use futures::future;
409     /// use futures::stream::{self, StreamExt};
410     ///
411     /// let stream = stream::iter(1..=10);
412     /// let events = stream.filter(|x| future::ready(x % 2 == 0));
413     ///
414     /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await);
415     /// # });
416     /// ```
filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,417     fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
418     where
419         F: FnMut(&Self::Item) -> Fut,
420         Fut: Future<Output = bool>,
421         Self: Sized,
422     {
423         assert_stream::<Self::Item, _>(Filter::new(self, f))
424     }
425 
426     /// Filters the values produced by this stream while simultaneously mapping
427     /// them to a different type according to the provided asynchronous closure.
428     ///
429     /// As values of this stream are made available, the provided function will
430     /// be run on them. If the future returned by the predicate `f` resolves to
431     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
432     /// it resolves to [`None`] then the next value will be produced.
433     ///
434     /// Note that this function consumes the stream passed into it and returns a
435     /// wrapped version of it, similar to the existing `filter_map` methods in
436     /// the standard library.
437     ///
438     /// # Examples
439     /// ```
440     /// # futures::executor::block_on(async {
441     /// use futures::stream::{self, StreamExt};
442     ///
443     /// let stream = stream::iter(1..=10);
444     /// let events = stream.filter_map(|x| async move {
445     ///     if x % 2 == 0 { Some(x + 1) } else { None }
446     /// });
447     ///
448     /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await);
449     /// # });
450     /// ```
filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,451     fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
452     where
453         F: FnMut(Self::Item) -> Fut,
454         Fut: Future<Output = Option<T>>,
455         Self: Sized,
456     {
457         assert_stream::<T, _>(FilterMap::new(self, f))
458     }
459 
460     /// Computes from this stream's items new items of a different type using
461     /// an asynchronous closure.
462     ///
463     /// The provided closure `f` will be called with an `Item` once a value is
464     /// ready, it returns a future which will then be run to completion
465     /// to produce the next value on this stream.
466     ///
467     /// Note that this function consumes the stream passed into it and returns a
468     /// wrapped version of it.
469     ///
470     /// # Examples
471     ///
472     /// ```
473     /// # futures::executor::block_on(async {
474     /// use futures::stream::{self, StreamExt};
475     ///
476     /// let stream = stream::iter(1..=3);
477     /// let stream = stream.then(|x| async move { x + 3 });
478     ///
479     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
480     /// # });
481     /// ```
then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,482     fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
483     where
484         F: FnMut(Self::Item) -> Fut,
485         Fut: Future,
486         Self: Sized,
487     {
488         assert_stream::<Fut::Output, _>(Then::new(self, f))
489     }
490 
491     /// Transforms a stream into a collection, returning a
492     /// future representing the result of that computation.
493     ///
494     /// The returned future will be resolved when the stream terminates.
495     ///
496     /// # Examples
497     ///
498     /// ```
499     /// # futures::executor::block_on(async {
500     /// use futures::channel::mpsc;
501     /// use futures::stream::StreamExt;
502     /// use std::thread;
503     ///
504     /// let (tx, rx) = mpsc::unbounded();
505     ///
506     /// thread::spawn(move || {
507     ///     for i in 1..=5 {
508     ///         tx.unbounded_send(i).unwrap();
509     ///     }
510     /// });
511     ///
512     /// let output = rx.collect::<Vec<i32>>().await;
513     /// assert_eq!(output, vec![1, 2, 3, 4, 5]);
514     /// # });
515     /// ```
collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,516     fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
517     where
518         Self: Sized,
519     {
520         assert_future::<C, _>(Collect::new(self))
521     }
522 
523     /// Converts a stream of pairs into a future, which
524     /// resolves to pair of containers.
525     ///
526     /// `unzip()` produces a future, which resolves to two
527     /// collections: one from the left elements of the pairs,
528     /// and one from the right elements.
529     ///
530     /// The returned future will be resolved when the stream terminates.
531     ///
532     /// # Examples
533     ///
534     /// ```
535     /// # futures::executor::block_on(async {
536     /// use futures::channel::mpsc;
537     /// use futures::stream::StreamExt;
538     /// use std::thread;
539     ///
540     /// let (tx, rx) = mpsc::unbounded();
541     ///
542     /// thread::spawn(move || {
543     ///     tx.unbounded_send((1, 2)).unwrap();
544     ///     tx.unbounded_send((3, 4)).unwrap();
545     ///     tx.unbounded_send((5, 6)).unwrap();
546     /// });
547     ///
548     /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await;
549     /// assert_eq!(o1, vec![1, 3, 5]);
550     /// assert_eq!(o2, vec![2, 4, 6]);
551     /// # });
552     /// ```
unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>,553     fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
554     where
555         FromA: Default + Extend<A>,
556         FromB: Default + Extend<B>,
557         Self: Sized + Stream<Item = (A, B)>,
558     {
559         assert_future::<(FromA, FromB), _>(Unzip::new(self))
560     }
561 
562     /// Concatenate all items of a stream into a single extendable
563     /// destination, returning a future representing the end result.
564     ///
565     /// This combinator will extend the first item with the contents
566     /// of all the subsequent results of the stream. If the stream is
567     /// empty, the default value will be returned.
568     ///
569     /// Works with all collections that implement the
570     /// [`Extend`](std::iter::Extend) trait.
571     ///
572     /// # Examples
573     ///
574     /// ```
575     /// # futures::executor::block_on(async {
576     /// use futures::channel::mpsc;
577     /// use futures::stream::StreamExt;
578     /// use std::thread;
579     ///
580     /// let (tx, rx) = mpsc::unbounded();
581     ///
582     /// thread::spawn(move || {
583     ///     for i in (0..3).rev() {
584     ///         let n = i * 3;
585     ///         tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap();
586     ///     }
587     /// });
588     ///
589     /// let result = rx.concat().await;
590     ///
591     /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
592     /// # });
593     /// ```
concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,594     fn concat(self) -> Concat<Self>
595     where
596         Self: Sized,
597         Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
598     {
599         assert_future::<Self::Item, _>(Concat::new(self))
600     }
601 
602     /// Drives the stream to completion, counting the number of items.
603     ///
604     /// # Overflow Behavior
605     ///
606     /// The method does no guarding against overflows, so counting elements of a
607     /// stream with more than [`usize::MAX`] elements either produces the wrong
608     /// result or panics. If debug assertions are enabled, a panic is guaranteed.
609     ///
610     /// # Panics
611     ///
612     /// This function might panic if the iterator has more than [`usize::MAX`]
613     /// elements.
614     ///
615     /// # Examples
616     ///
617     /// ```
618     /// # futures::executor::block_on(async {
619     /// use futures::stream::{self, StreamExt};
620     ///
621     /// let stream = stream::iter(1..=10);
622     /// let count = stream.count().await;
623     ///
624     /// assert_eq!(count, 10);
625     /// # });
626     /// ```
count(self) -> Count<Self> where Self: Sized,627     fn count(self) -> Count<Self>
628     where
629         Self: Sized,
630     {
631         assert_future::<usize, _>(Count::new(self))
632     }
633 
634     /// Repeats a stream endlessly.
635     ///
636     /// The stream never terminates. Note that you likely want to avoid
637     /// usage of `collect` or such on the returned stream as it will exhaust
638     /// available memory as it tries to just fill up all RAM.
639     ///
640     /// # Examples
641     ///
642     /// ```
643     /// # futures::executor::block_on(async {
644     /// use futures::stream::{self, StreamExt};
645     /// let a = [1, 2, 3];
646     /// let mut s = stream::iter(a.iter()).cycle();
647     ///
648     /// assert_eq!(s.next().await, Some(&1));
649     /// assert_eq!(s.next().await, Some(&2));
650     /// assert_eq!(s.next().await, Some(&3));
651     /// assert_eq!(s.next().await, Some(&1));
652     /// assert_eq!(s.next().await, Some(&2));
653     /// assert_eq!(s.next().await, Some(&3));
654     /// assert_eq!(s.next().await, Some(&1));
655     /// # });
656     /// ```
cycle(self) -> Cycle<Self> where Self: Sized + Clone,657     fn cycle(self) -> Cycle<Self>
658     where
659         Self: Sized + Clone,
660     {
661         assert_stream::<Self::Item, _>(Cycle::new(self))
662     }
663 
664     /// Execute an accumulating asynchronous computation over a stream,
665     /// collecting all the values into one final result.
666     ///
667     /// This combinator will accumulate all values returned by this stream
668     /// according to the closure provided. The initial state is also provided to
669     /// this method and then is returned again by each execution of the closure.
670     /// Once the entire stream has been exhausted the returned future will
671     /// resolve to this value.
672     ///
673     /// # Examples
674     ///
675     /// ```
676     /// # futures::executor::block_on(async {
677     /// use futures::stream::{self, StreamExt};
678     ///
679     /// let number_stream = stream::iter(0..6);
680     /// let sum = number_stream.fold(0, |acc, x| async move { acc + x });
681     /// assert_eq!(sum.await, 15);
682     /// # });
683     /// ```
fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,684     fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
685     where
686         F: FnMut(T, Self::Item) -> Fut,
687         Fut: Future<Output = T>,
688         Self: Sized,
689     {
690         assert_future::<T, _>(Fold::new(self, f, init))
691     }
692 
693     /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate.
694     ///
695     /// # Examples
696     ///
697     /// ```
698     /// # futures::executor::block_on(async {
699     /// use futures::stream::{self, StreamExt};
700     ///
701     /// let number_stream = stream::iter(0..10);
702     /// let contain_three = number_stream.any(|i| async move { i == 3 });
703     /// assert_eq!(contain_three.await, true);
704     /// # });
705     /// ```
any<Fut, F>(self, f: F) -> Any<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,706     fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
707     where
708         F: FnMut(Self::Item) -> Fut,
709         Fut: Future<Output = bool>,
710         Self: Sized,
711     {
712         assert_future::<bool, _>(Any::new(self, f))
713     }
714 
715     /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate.
716     ///
717     /// # Examples
718     ///
719     /// ```
720     /// # futures::executor::block_on(async {
721     /// use futures::stream::{self, StreamExt};
722     ///
723     /// let number_stream = stream::iter(0..10);
724     /// let less_then_twenty = number_stream.all(|i| async move { i < 20 });
725     /// assert_eq!(less_then_twenty.await, true);
726     /// # });
727     /// ```
all<Fut, F>(self, f: F) -> All<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,728     fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
729     where
730         F: FnMut(Self::Item) -> Fut,
731         Fut: Future<Output = bool>,
732         Self: Sized,
733     {
734         assert_future::<bool, _>(All::new(self, f))
735     }
736 
737     /// Flattens a stream of streams into just one continuous stream.
738     ///
739     /// # Examples
740     ///
741     /// ```
742     /// # futures::executor::block_on(async {
743     /// use futures::channel::mpsc;
744     /// use futures::stream::StreamExt;
745     /// use std::thread;
746     ///
747     /// let (tx1, rx1) = mpsc::unbounded();
748     /// let (tx2, rx2) = mpsc::unbounded();
749     /// let (tx3, rx3) = mpsc::unbounded();
750     ///
751     /// thread::spawn(move || {
752     ///     tx1.unbounded_send(1).unwrap();
753     ///     tx1.unbounded_send(2).unwrap();
754     /// });
755     /// thread::spawn(move || {
756     ///     tx2.unbounded_send(3).unwrap();
757     ///     tx2.unbounded_send(4).unwrap();
758     /// });
759     /// thread::spawn(move || {
760     ///     tx3.unbounded_send(rx1).unwrap();
761     ///     tx3.unbounded_send(rx2).unwrap();
762     /// });
763     ///
764     /// let output = rx3.flatten().collect::<Vec<i32>>().await;
765     /// assert_eq!(output, vec![1, 2, 3, 4]);
766     /// # });
767     /// ```
flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,768     fn flatten(self) -> Flatten<Self>
769     where
770         Self::Item: Stream,
771         Self: Sized,
772     {
773         assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
774     }
775 
776     /// Flattens a stream of streams into just one continuous stream. Polls
777     /// inner streams concurrently.
778     ///
779     /// # Examples
780     ///
781     /// ```
782     /// # futures::executor::block_on(async {
783     /// use futures::channel::mpsc;
784     /// use futures::stream::StreamExt;
785     /// use std::thread;
786     ///
787     /// let (tx1, rx1) = mpsc::unbounded();
788     /// let (tx2, rx2) = mpsc::unbounded();
789     /// let (tx3, rx3) = mpsc::unbounded();
790     ///
791     /// thread::spawn(move || {
792     ///     tx1.unbounded_send(1).unwrap();
793     ///     tx1.unbounded_send(2).unwrap();
794     /// });
795     /// thread::spawn(move || {
796     ///     tx2.unbounded_send(3).unwrap();
797     ///     tx2.unbounded_send(4).unwrap();
798     /// });
799     /// thread::spawn(move || {
800     ///     tx3.unbounded_send(rx1).unwrap();
801     ///     tx3.unbounded_send(rx2).unwrap();
802     /// });
803     ///
804     /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await;
805     /// output.sort();
806     ///
807     /// assert_eq!(output, vec![1, 2, 3, 4]);
808     /// # });
809     /// ```
810     #[cfg(not(futures_no_atomic_cas))]
811     #[cfg(feature = "alloc")]
flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> where Self::Item: Stream + Unpin, Self: Sized,812     fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
813     where
814         Self::Item: Stream + Unpin,
815         Self: Sized,
816     {
817         FlattenUnordered::new(self, limit.into())
818     }
819 
820     /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
821     ///
822     /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
823     /// you would have to chain combinators like `.map(f).flatten()` while this
824     /// combinator provides ability to write `.flat_map(f)` instead of chaining.
825     ///
826     /// The provided closure which produces inner streams is executed over all elements
827     /// of stream as last inner stream is terminated and next stream item is available.
828     ///
829     /// Note that this function consumes the stream passed into it and returns a
830     /// wrapped version of it, similar to the existing `flat_map` methods in the
831     /// standard library.
832     ///
833     /// # Examples
834     ///
835     /// ```
836     /// # futures::executor::block_on(async {
837     /// use futures::stream::{self, StreamExt};
838     ///
839     /// let stream = stream::iter(1..=3);
840     /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x]));
841     ///
842     /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await);
843     /// # });
844     /// ```
flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,845     fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
846     where
847         F: FnMut(Self::Item) -> U,
848         U: Stream,
849         Self: Sized,
850     {
851         assert_stream::<U::Item, _>(FlatMap::new(self, f))
852     }
853 
854     /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s
855     /// and polls them concurrently, yielding items in any order, as they made
856     /// available.
857     ///
858     /// [`StreamExt::map`] is very useful, but if it produces `Stream`s
859     /// instead, and you need to poll all of them concurrently, you would
860     /// have to use something like `for_each_concurrent` and merge values
861     /// by hand. This combinator provides ability to collect all values
862     /// from concurrently polled streams into one stream.
863     ///
864     /// The first argument is an optional limit on the number of concurrently
865     /// polled streams. If this limit is not `None`, no more than `limit` streams
866     /// will be polled concurrently. The `limit` argument is of type
867     /// `Into<Option<usize>>`, and so can be provided as either `None`,
868     /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
869     /// no limit at all, and will have the same result as passing in `None`.
870     ///
871     /// The provided closure which produces inner streams is executed over
872     /// all elements of stream as next stream item is available and limit
873     /// of concurrently processed streams isn't exceeded.
874     ///
875     /// Note that this function consumes the stream passed into it and
876     /// returns a wrapped version of it.
877     ///
878     /// # Examples
879     ///
880     /// ```
881     /// # futures::executor::block_on(async {
882     /// use futures::stream::{self, StreamExt};
883     ///
884     /// let stream = stream::iter(1..5);
885     /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x]));
886     /// let mut values = stream.collect::<Vec<_>>().await;
887     /// values.sort();
888     ///
889     /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
890     /// # });
891     /// ```
892     #[cfg(not(futures_no_atomic_cas))]
893     #[cfg(feature = "alloc")]
flat_map_unordered<U, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> FlatMapUnordered<Self, U, F> where U: Stream + Unpin, F: FnMut(Self::Item) -> U, Self: Sized,894     fn flat_map_unordered<U, F>(
895         self,
896         limit: impl Into<Option<usize>>,
897         f: F,
898     ) -> FlatMapUnordered<Self, U, F>
899     where
900         U: Stream + Unpin,
901         F: FnMut(Self::Item) -> U,
902         Self: Sized,
903     {
904         FlatMapUnordered::new(self, limit.into(), f)
905     }
906 
907     /// Combinator similar to [`StreamExt::fold`] that holds internal state
908     /// and produces a new stream.
909     ///
910     /// Accepts initial state and closure which will be applied to each element
911     /// of the stream until provided closure returns `None`. Once `None` is
912     /// returned, stream will be terminated.
913     ///
914     /// # Examples
915     ///
916     /// ```
917     /// # futures::executor::block_on(async {
918     /// use futures::future;
919     /// use futures::stream::{self, StreamExt};
920     ///
921     /// let stream = stream::iter(1..=10);
922     ///
923     /// let stream = stream.scan(0, |state, x| {
924     ///     *state += x;
925     ///     future::ready(if *state < 10 { Some(x) } else { None })
926     /// });
927     ///
928     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
929     /// # });
930     /// ```
scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,931     fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
932     where
933         F: FnMut(&mut S, Self::Item) -> Fut,
934         Fut: Future<Output = Option<B>>,
935         Self: Sized,
936     {
937         assert_stream::<B, _>(Scan::new(self, initial_state, f))
938     }
939 
940     /// Skip elements on this stream while the provided asynchronous predicate
941     /// resolves to `true`.
942     ///
943     /// This function, like `Iterator::skip_while`, will skip elements on the
944     /// stream until the predicate `f` resolves to `false`. Once one element
945     /// returns `false`, all future elements will be returned from the underlying
946     /// stream.
947     ///
948     /// # Examples
949     ///
950     /// ```
951     /// # futures::executor::block_on(async {
952     /// use futures::future;
953     /// use futures::stream::{self, StreamExt};
954     ///
955     /// let stream = stream::iter(1..=10);
956     ///
957     /// let stream = stream.skip_while(|x| future::ready(*x <= 5));
958     ///
959     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
960     /// # });
961     /// ```
skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,962     fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
963     where
964         F: FnMut(&Self::Item) -> Fut,
965         Fut: Future<Output = bool>,
966         Self: Sized,
967     {
968         assert_stream::<Self::Item, _>(SkipWhile::new(self, f))
969     }
970 
971     /// Take elements from this stream while the provided asynchronous predicate
972     /// resolves to `true`.
973     ///
974     /// This function, like `Iterator::take_while`, will take elements from the
975     /// stream until the predicate `f` resolves to `false`. Once one element
976     /// returns `false`, it will always return that the stream is done.
977     ///
978     /// # Examples
979     ///
980     /// ```
981     /// # futures::executor::block_on(async {
982     /// use futures::future;
983     /// use futures::stream::{self, StreamExt};
984     ///
985     /// let stream = stream::iter(1..=10);
986     ///
987     /// let stream = stream.take_while(|x| future::ready(*x <= 5));
988     ///
989     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
990     /// # });
991     /// ```
take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,992     fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
993     where
994         F: FnMut(&Self::Item) -> Fut,
995         Fut: Future<Output = bool>,
996         Self: Sized,
997     {
998         assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
999     }
1000 
1001     /// Take elements from this stream until the provided future resolves.
1002     ///
1003     /// This function will take elements from the stream until the provided
1004     /// stopping future `fut` resolves. Once the `fut` future becomes ready,
1005     /// this stream combinator will always return that the stream is done.
1006     ///
1007     /// The stopping future may return any type. Once the stream is stopped
1008     /// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
1009     /// The stream may also be resumed with `TakeUntil::take_future()`.
1010     /// See the documentation of [`TakeUntil`] for more information.
1011     ///
1012     /// # Examples
1013     ///
1014     /// ```
1015     /// # futures::executor::block_on(async {
1016     /// use futures::future;
1017     /// use futures::stream::{self, StreamExt};
1018     /// use futures::task::Poll;
1019     ///
1020     /// let stream = stream::iter(1..=10);
1021     ///
1022     /// let mut i = 0;
1023     /// let stop_fut = future::poll_fn(|_cx| {
1024     ///     i += 1;
1025     ///     if i <= 5 {
1026     ///         Poll::Pending
1027     ///     } else {
1028     ///         Poll::Ready(())
1029     ///     }
1030     /// });
1031     ///
1032     /// let stream = stream.take_until(stop_fut);
1033     ///
1034     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
1035     /// # });
1036     /// ```
take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,1037     fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
1038     where
1039         Fut: Future,
1040         Self: Sized,
1041     {
1042         assert_stream::<Self::Item, _>(TakeUntil::new(self, fut))
1043     }
1044 
1045     /// Runs this stream to completion, executing the provided asynchronous
1046     /// closure for each element on the stream.
1047     ///
1048     /// The closure provided will be called for each item this stream produces,
1049     /// yielding a future. That future will then be executed to completion
1050     /// before moving on to the next item.
1051     ///
1052     /// The returned value is a `Future` where the `Output` type is `()`; it is
1053     /// executed entirely for its side effects.
1054     ///
1055     /// To process each item in the stream and produce another stream instead
1056     /// of a single future, use `then` instead.
1057     ///
1058     /// # Examples
1059     ///
1060     /// ```
1061     /// # futures::executor::block_on(async {
1062     /// use futures::future;
1063     /// use futures::stream::{self, StreamExt};
1064     ///
1065     /// let mut x = 0;
1066     ///
1067     /// {
1068     ///     let fut = stream::repeat(1).take(3).for_each(|item| {
1069     ///         x += item;
1070     ///         future::ready(())
1071     ///     });
1072     ///     fut.await;
1073     /// }
1074     ///
1075     /// assert_eq!(x, 3);
1076     /// # });
1077     /// ```
for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,1078     fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
1079     where
1080         F: FnMut(Self::Item) -> Fut,
1081         Fut: Future<Output = ()>,
1082         Self: Sized,
1083     {
1084         assert_future::<(), _>(ForEach::new(self, f))
1085     }
1086 
1087     /// Runs this stream to completion, executing the provided asynchronous
1088     /// closure for each element on the stream concurrently as elements become
1089     /// available.
1090     ///
1091     /// This is similar to [`StreamExt::for_each`], but the futures
1092     /// produced by the closure are run concurrently (but not in parallel--
1093     /// this combinator does not introduce any threads).
1094     ///
1095     /// The closure provided will be called for each item this stream produces,
1096     /// yielding a future. That future will then be executed to completion
1097     /// concurrently with the other futures produced by the closure.
1098     ///
1099     /// The first argument is an optional limit on the number of concurrent
1100     /// futures. If this limit is not `None`, no more than `limit` futures
1101     /// will be run concurrently. The `limit` argument is of type
1102     /// `Into<Option<usize>>`, and so can be provided as either `None`,
1103     /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
1104     /// no limit at all, and will have the same result as passing in `None`.
1105     ///
1106     /// This method is only available when the `std` or `alloc` feature of this
1107     /// library is activated, and it is activated by default.
1108     ///
1109     /// # Examples
1110     ///
1111     /// ```
1112     /// # futures::executor::block_on(async {
1113     /// use futures::channel::oneshot;
1114     /// use futures::stream::{self, StreamExt};
1115     ///
1116     /// let (tx1, rx1) = oneshot::channel();
1117     /// let (tx2, rx2) = oneshot::channel();
1118     /// let (tx3, rx3) = oneshot::channel();
1119     ///
1120     /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
1121     ///     /* limit */ 2,
1122     ///     |rx| async move {
1123     ///         rx.await.unwrap();
1124     ///     }
1125     /// );
1126     /// tx1.send(()).unwrap();
1127     /// tx2.send(()).unwrap();
1128     /// tx3.send(()).unwrap();
1129     /// fut.await;
1130     /// # })
1131     /// ```
1132     #[cfg(not(futures_no_atomic_cas))]
1133     #[cfg(feature = "alloc")]
for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,1134     fn for_each_concurrent<Fut, F>(
1135         self,
1136         limit: impl Into<Option<usize>>,
1137         f: F,
1138     ) -> ForEachConcurrent<Self, Fut, F>
1139     where
1140         F: FnMut(Self::Item) -> Fut,
1141         Fut: Future<Output = ()>,
1142         Self: Sized,
1143     {
1144         assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f))
1145     }
1146 
1147     /// Creates a new stream of at most `n` items of the underlying stream.
1148     ///
1149     /// Once `n` items have been yielded from this stream then it will always
1150     /// return that the stream is done.
1151     ///
1152     /// # Examples
1153     ///
1154     /// ```
1155     /// # futures::executor::block_on(async {
1156     /// use futures::stream::{self, StreamExt};
1157     ///
1158     /// let stream = stream::iter(1..=10).take(3);
1159     ///
1160     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
1161     /// # });
1162     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,1163     fn take(self, n: usize) -> Take<Self>
1164     where
1165         Self: Sized,
1166     {
1167         assert_stream::<Self::Item, _>(Take::new(self, n))
1168     }
1169 
1170     /// Creates a new stream which skips `n` items of the underlying stream.
1171     ///
1172     /// Once `n` items have been skipped from this stream then it will always
1173     /// return the remaining items on this stream.
1174     ///
1175     /// # Examples
1176     ///
1177     /// ```
1178     /// # futures::executor::block_on(async {
1179     /// use futures::stream::{self, StreamExt};
1180     ///
1181     /// let stream = stream::iter(1..=10).skip(5);
1182     ///
1183     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
1184     /// # });
1185     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,1186     fn skip(self, n: usize) -> Skip<Self>
1187     where
1188         Self: Sized,
1189     {
1190         assert_stream::<Self::Item, _>(Skip::new(self, n))
1191     }
1192 
1193     /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never
1194     /// again be called once it has finished. This method can be used to turn
1195     /// any `Stream` into a `FusedStream`.
1196     ///
1197     /// Normally, once a stream has returned [`None`] from
1198     /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad
1199     /// behavior such as block forever, panic, never return, etc. If it is known
1200     /// that [`poll_next`](Stream::poll_next) may be called after stream
1201     /// has already finished, then this method can be used to ensure that it has
1202     /// defined semantics.
1203     ///
1204     /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream
1205     /// is guaranteed to return [`None`] after the underlying stream has
1206     /// finished.
1207     ///
1208     /// # Examples
1209     ///
1210     /// ```
1211     /// use futures::executor::block_on_stream;
1212     /// use futures::stream::{self, StreamExt};
1213     /// use futures::task::Poll;
1214     ///
1215     /// let mut x = 0;
1216     /// let stream = stream::poll_fn(|_| {
1217     ///     x += 1;
1218     ///     match x {
1219     ///         0..=2 => Poll::Ready(Some(x)),
1220     ///         3 => Poll::Ready(None),
1221     ///         _ => panic!("should not happen")
1222     ///     }
1223     /// }).fuse();
1224     ///
1225     /// let mut iter = block_on_stream(stream);
1226     /// assert_eq!(Some(1), iter.next());
1227     /// assert_eq!(Some(2), iter.next());
1228     /// assert_eq!(None, iter.next());
1229     /// assert_eq!(None, iter.next());
1230     /// // ...
1231     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,1232     fn fuse(self) -> Fuse<Self>
1233     where
1234         Self: Sized,
1235     {
1236         assert_stream::<Self::Item, _>(Fuse::new(self))
1237     }
1238 
1239     /// Borrows a stream, rather than consuming it.
1240     ///
1241     /// This is useful to allow applying stream adaptors while still retaining
1242     /// ownership of the original stream.
1243     ///
1244     /// # Examples
1245     ///
1246     /// ```
1247     /// # futures::executor::block_on(async {
1248     /// use futures::stream::{self, StreamExt};
1249     ///
1250     /// let mut stream = stream::iter(1..5);
1251     ///
1252     /// let sum = stream.by_ref()
1253     ///                 .take(2)
1254     ///                 .fold(0, |a, b| async move { a + b })
1255     ///                 .await;
1256     /// assert_eq!(sum, 3);
1257     ///
1258     /// // You can use the stream again
1259     /// let sum = stream.take(2)
1260     ///                 .fold(0, |a, b| async move { a + b })
1261     ///                 .await;
1262     /// assert_eq!(sum, 7);
1263     /// # });
1264     /// ```
by_ref(&mut self) -> &mut Self1265     fn by_ref(&mut self) -> &mut Self {
1266         self
1267     }
1268 
1269     /// Catches unwinding panics while polling the stream.
1270     ///
1271     /// Caught panic (if any) will be the last element of the resulting stream.
1272     ///
1273     /// In general, panics within a stream can propagate all the way out to the
1274     /// task level. This combinator makes it possible to halt unwinding within
1275     /// the stream itself. It's most commonly used within task executors. This
1276     /// method should not be used for error handling.
1277     ///
1278     /// Note that this method requires the `UnwindSafe` bound from the standard
1279     /// library. This isn't always applied automatically, and the standard
1280     /// library provides an `AssertUnwindSafe` wrapper type to apply it
1281     /// after-the fact. To assist using this method, the [`Stream`] trait is
1282     /// also implemented for `AssertUnwindSafe<St>` where `St` implements
1283     /// [`Stream`].
1284     ///
1285     /// This method is only available when the `std` feature of this
1286     /// library is activated, and it is activated by default.
1287     ///
1288     /// # Examples
1289     ///
1290     /// ```
1291     /// # futures::executor::block_on(async {
1292     /// use futures::stream::{self, StreamExt};
1293     ///
1294     /// let stream = stream::iter(vec![Some(10), None, Some(11)]);
1295     /// // Panic on second element
1296     /// let stream_panicking = stream.map(|o| o.unwrap());
1297     /// // Collect all the results
1298     /// let stream = stream_panicking.catch_unwind();
1299     ///
1300     /// let results: Vec<Result<i32, _>> = stream.collect().await;
1301     /// match results[0] {
1302     ///     Ok(10) => {}
1303     ///     _ => panic!("unexpected result!"),
1304     /// }
1305     /// assert!(results[1].is_err());
1306     /// assert_eq!(results.len(), 2);
1307     /// # });
1308     /// ```
1309     #[cfg(feature = "std")]
catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe,1310     fn catch_unwind(self) -> CatchUnwind<Self>
1311     where
1312         Self: Sized + std::panic::UnwindSafe,
1313     {
1314         assert_stream(CatchUnwind::new(self))
1315     }
1316 
1317     /// Wrap the stream in a Box, pinning it.
1318     ///
1319     /// This method is only available when the `std` or `alloc` feature of this
1320     /// library is activated, and it is activated by default.
1321     #[cfg(feature = "alloc")]
boxed<'a>(self) -> BoxStream<'a, Self::Item> where Self: Sized + Send + 'a,1322     fn boxed<'a>(self) -> BoxStream<'a, Self::Item>
1323     where
1324         Self: Sized + Send + 'a,
1325     {
1326         assert_stream::<Self::Item, _>(Box::pin(self))
1327     }
1328 
1329     /// Wrap the stream in a Box, pinning it.
1330     ///
1331     /// Similar to `boxed`, but without the `Send` requirement.
1332     ///
1333     /// This method is only available when the `std` or `alloc` feature of this
1334     /// library is activated, and it is activated by default.
1335     #[cfg(feature = "alloc")]
boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> where Self: Sized + 'a,1336     fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item>
1337     where
1338         Self: Sized + 'a,
1339     {
1340         assert_stream::<Self::Item, _>(Box::pin(self))
1341     }
1342 
1343     /// An adaptor for creating a buffered list of pending futures.
1344     ///
1345     /// If this stream's item can be converted into a future, then this adaptor
1346     /// will buffer up to at most `n` futures and then return the outputs in the
1347     /// same order as the underlying stream. No more than `n` futures will be
1348     /// buffered at any point in time, and less than `n` may also be buffered
1349     /// depending on the state of each future.
1350     ///
1351     /// The returned stream will be a stream of each future's output.
1352     ///
1353     /// This method is only available when the `std` or `alloc` feature of this
1354     /// library is activated, and it is activated by default.
1355     #[cfg(not(futures_no_atomic_cas))]
1356     #[cfg(feature = "alloc")]
buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized,1357     fn buffered(self, n: usize) -> Buffered<Self>
1358     where
1359         Self::Item: Future,
1360         Self: Sized,
1361     {
1362         assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n))
1363     }
1364 
1365     /// An adaptor for creating a buffered list of pending futures (unordered).
1366     ///
1367     /// If this stream's item can be converted into a future, then this adaptor
1368     /// will buffer up to `n` futures and then return the outputs in the order
1369     /// in which they complete. No more than `n` futures will be buffered at
1370     /// any point in time, and less than `n` may also be buffered depending on
1371     /// the state of each future.
1372     ///
1373     /// The returned stream will be a stream of each future's output.
1374     ///
1375     /// This method is only available when the `std` or `alloc` feature of this
1376     /// library is activated, and it is activated by default.
1377     ///
1378     /// # Examples
1379     ///
1380     /// ```
1381     /// # futures::executor::block_on(async {
1382     /// use futures::channel::oneshot;
1383     /// use futures::stream::{self, StreamExt};
1384     ///
1385     /// let (send_one, recv_one) = oneshot::channel();
1386     /// let (send_two, recv_two) = oneshot::channel();
1387     ///
1388     /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
1389     /// let mut buffered = stream_of_futures.buffer_unordered(10);
1390     ///
1391     /// send_two.send(2i32)?;
1392     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1393     ///
1394     /// send_one.send(1i32)?;
1395     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1396     ///
1397     /// assert_eq!(buffered.next().await, None);
1398     /// # Ok::<(), i32>(()) }).unwrap();
1399     /// ```
1400     #[cfg(not(futures_no_atomic_cas))]
1401     #[cfg(feature = "alloc")]
buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized,1402     fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
1403     where
1404         Self::Item: Future,
1405         Self: Sized,
1406     {
1407         assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n))
1408     }
1409 
1410     /// An adapter for zipping two streams together.
1411     ///
1412     /// The zipped stream waits for both streams to produce an item, and then
1413     /// returns that pair. If either stream ends then the zipped stream will
1414     /// also end.
1415     ///
1416     /// # Examples
1417     ///
1418     /// ```
1419     /// # futures::executor::block_on(async {
1420     /// use futures::stream::{self, StreamExt};
1421     ///
1422     /// let stream1 = stream::iter(1..=3);
1423     /// let stream2 = stream::iter(5..=10);
1424     ///
1425     /// let vec = stream1.zip(stream2)
1426     ///                  .collect::<Vec<_>>()
1427     ///                  .await;
1428     /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec);
1429     /// # });
1430     /// ```
1431     ///
zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1432     fn zip<St>(self, other: St) -> Zip<Self, St>
1433     where
1434         St: Stream,
1435         Self: Sized,
1436     {
1437         assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other))
1438     }
1439 
1440     /// Adapter for chaining two streams.
1441     ///
1442     /// The resulting stream emits elements from the first stream, and when
1443     /// first stream reaches the end, emits the elements from the second stream.
1444     ///
1445     /// ```
1446     /// # futures::executor::block_on(async {
1447     /// use futures::stream::{self, StreamExt};
1448     ///
1449     /// let stream1 = stream::iter(vec![Ok(10), Err(false)]);
1450     /// let stream2 = stream::iter(vec![Err(true), Ok(20)]);
1451     ///
1452     /// let stream = stream1.chain(stream2);
1453     ///
1454     /// let result: Vec<_> = stream.collect().await;
1455     /// assert_eq!(result, vec![
1456     ///     Ok(10),
1457     ///     Err(false),
1458     ///     Err(true),
1459     ///     Ok(20),
1460     /// ]);
1461     /// # });
1462     /// ```
chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1463     fn chain<St>(self, other: St) -> Chain<Self, St>
1464     where
1465         St: Stream<Item = Self::Item>,
1466         Self: Sized,
1467     {
1468         assert_stream::<Self::Item, _>(Chain::new(self, other))
1469     }
1470 
1471     /// Creates a new stream which exposes a `peek` method.
1472     ///
1473     /// Calling `peek` returns a reference to the next item in the stream.
peekable(self) -> Peekable<Self> where Self: Sized,1474     fn peekable(self) -> Peekable<Self>
1475     where
1476         Self: Sized,
1477     {
1478         assert_stream::<Self::Item, _>(Peekable::new(self))
1479     }
1480 
1481     /// An adaptor for chunking up items of the stream inside a vector.
1482     ///
1483     /// This combinator will attempt to pull items from this stream and buffer
1484     /// them into a local vector. At most `capacity` items will get buffered
1485     /// before they're yielded from the returned stream.
1486     ///
1487     /// Note that the vectors returned from this iterator may not always have
1488     /// `capacity` elements. If the underlying stream ended and only a partial
1489     /// vector was created, it'll be returned. Additionally if an error happens
1490     /// from the underlying stream then the currently buffered items will be
1491     /// yielded.
1492     ///
1493     /// This method is only available when the `std` or `alloc` feature of this
1494     /// library is activated, and it is activated by default.
1495     ///
1496     /// # Panics
1497     ///
1498     /// This method will panic if `capacity` is zero.
1499     #[cfg(feature = "alloc")]
chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,1500     fn chunks(self, capacity: usize) -> Chunks<Self>
1501     where
1502         Self: Sized,
1503     {
1504         assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity))
1505     }
1506 
1507     /// An adaptor for chunking up ready items of the stream inside a vector.
1508     ///
1509     /// This combinator will attempt to pull ready items from this stream and
1510     /// buffer them into a local vector. At most `capacity` items will get
1511     /// buffered before they're yielded from the returned stream. If underlying
1512     /// stream returns `Poll::Pending`, and collected chunk is not empty, it will
1513     /// be immediately returned.
1514     ///
1515     /// If the underlying stream ended and only a partial vector was created,
1516     /// it'll be returned. Additionally if an error happens from the underlying
1517     /// stream then the currently buffered items will be yielded.
1518     ///
1519     /// This method is only available when the `std` or `alloc` feature of this
1520     /// library is activated, and it is activated by default.
1521     ///
1522     /// # Panics
1523     ///
1524     /// This method will panic if `capacity` is zero.
1525     #[cfg(feature = "alloc")]
ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,1526     fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
1527     where
1528         Self: Sized,
1529     {
1530         assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity))
1531     }
1532 
1533     /// A future that completes after the given stream has been fully processed
1534     /// into the sink and the sink has been flushed and closed.
1535     ///
1536     /// This future will drive the stream to keep producing items until it is
1537     /// exhausted, sending each item to the sink. It will complete once the
1538     /// stream is exhausted, the sink has received and flushed all items, and
1539     /// the sink is closed. Note that neither the original stream nor provided
1540     /// sink will be output by this future. Pass the sink by `Pin<&mut S>`
1541     /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
1542     /// order to preserve access to the `Sink`. If the stream produces an error,
1543     /// that error will be returned by this future without flushing/closing the sink.
1544     #[cfg(feature = "sink")]
1545     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<Self::Ok, Error = Self::Error>, Self: TryStream + Sized,1546     fn forward<S>(self, sink: S) -> Forward<Self, S>
1547     where
1548         S: Sink<Self::Ok, Error = Self::Error>,
1549         Self: TryStream + Sized,
1550         // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>,
1551     {
1552         // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>`
1553         // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink))
1554         Forward::new(self, sink)
1555     }
1556 
1557     /// Splits this `Stream + Sink` object into separate `Sink` and `Stream`
1558     /// objects.
1559     ///
1560     /// This can be useful when you want to split ownership between tasks, or
1561     /// allow direct interaction between the two objects (e.g. via
1562     /// `Sink::send_all`).
1563     ///
1564     /// This method is only available when the `std` or `alloc` feature of this
1565     /// library is activated, and it is activated by default.
1566     #[cfg(feature = "sink")]
1567     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
1568     #[cfg(not(futures_no_atomic_cas))]
1569     #[cfg(feature = "alloc")]
split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized,1570     fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
1571     where
1572         Self: Sink<Item> + Sized,
1573     {
1574         let (sink, stream) = split::split(self);
1575         (
1576             crate::sink::assert_sink::<Item, Self::Error, _>(sink),
1577             assert_stream::<Self::Item, _>(stream),
1578         )
1579     }
1580 
1581     /// Do something with each item of this stream, afterwards passing it on.
1582     ///
1583     /// This is similar to the `Iterator::inspect` method in the standard
1584     /// library where it allows easily inspecting each value as it passes
1585     /// through the stream, for example to debug what's going on.
inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1586     fn inspect<F>(self, f: F) -> Inspect<Self, F>
1587     where
1588         F: FnMut(&Self::Item),
1589         Self: Sized,
1590     {
1591         assert_stream::<Self::Item, _>(Inspect::new(self, f))
1592     }
1593 
1594     /// Wrap this stream in an `Either` stream, making it the left-hand variant
1595     /// of that `Either`.
1596     ///
1597     /// This can be used in combination with the `right_stream` method to write `if`
1598     /// statements that evaluate to different streams in different branches.
left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1599     fn left_stream<B>(self) -> Either<Self, B>
1600     where
1601         B: Stream<Item = Self::Item>,
1602         Self: Sized,
1603     {
1604         assert_stream::<Self::Item, _>(Either::Left(self))
1605     }
1606 
1607     /// Wrap this stream in an `Either` stream, making it the right-hand variant
1608     /// of that `Either`.
1609     ///
1610     /// This can be used in combination with the `left_stream` method to write `if`
1611     /// statements that evaluate to different streams in different branches.
right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1612     fn right_stream<B>(self) -> Either<B, Self>
1613     where
1614         B: Stream<Item = Self::Item>,
1615         Self: Sized,
1616     {
1617         assert_stream::<Self::Item, _>(Either::Right(self))
1618     }
1619 
1620     /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
1621     /// stream types.
poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1622     fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
1623     where
1624         Self: Unpin,
1625     {
1626         Pin::new(self).poll_next(cx)
1627     }
1628 
1629     /// Returns a [`Future`] that resolves when the next item in this stream is
1630     /// ready.
1631     ///
1632     /// This is similar to the [`next`][StreamExt::next] method, but it won't
1633     /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the
1634     /// returned future type will return `true` from
1635     /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing
1636     /// [`select_next_some`][StreamExt::select_next_some] to be easily used with
1637     /// the [`select!`] macro.
1638     ///
1639     /// If the future is polled after this [`Stream`] is empty it will panic.
1640     /// Using the future with a [`FusedFuture`][]-aware primitive like the
1641     /// [`select!`] macro will prevent this.
1642     ///
1643     /// [`FusedFuture`]: futures_core::future::FusedFuture
1644     /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated
1645     ///
1646     /// # Examples
1647     ///
1648     /// ```
1649     /// # futures::executor::block_on(async {
1650     /// use futures::{future, select};
1651     /// use futures::stream::{StreamExt, FuturesUnordered};
1652     ///
1653     /// let mut fut = future::ready(1);
1654     /// let mut async_tasks = FuturesUnordered::new();
1655     /// let mut total = 0;
1656     /// loop {
1657     ///     select! {
1658     ///         num = fut => {
1659     ///             // First, the `ready` future completes.
1660     ///             total += num;
1661     ///             // Then we spawn a new task onto `async_tasks`,
1662     ///             async_tasks.push(async { 5 });
1663     ///         },
1664     ///         // On the next iteration of the loop, the task we spawned
1665     ///         // completes.
1666     ///         num = async_tasks.select_next_some() => {
1667     ///             total += num;
1668     ///         }
1669     ///         // Finally, both the `ready` future and `async_tasks` have
1670     ///         // finished, so we enter the `complete` branch.
1671     ///         complete => break,
1672     ///     }
1673     /// }
1674     /// assert_eq!(total, 6);
1675     /// # });
1676     /// ```
select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1677     fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
1678     where
1679         Self: Unpin + FusedStream,
1680     {
1681         assert_future::<Self::Item, _>(SelectNextSome::new(self))
1682     }
1683 }
1684