1 //! Streams
2 //!
3 //! This module contains a number of functions for working with `Stream`s,
4 //! including the `StreamExt` trait which adds methods to `Stream` types.
5 
6 use crate::future::{assert_future, Either};
7 use crate::stream::assert_stream;
8 #[cfg(feature = "alloc")]
9 use alloc::boxed::Box;
10 #[cfg(feature = "alloc")]
11 use alloc::vec::Vec;
12 use core::pin::Pin;
13 #[cfg(feature = "sink")]
14 use futures_core::stream::TryStream;
15 #[cfg(feature = "alloc")]
16 use futures_core::stream::{BoxStream, LocalBoxStream};
17 use futures_core::{
18     future::Future,
19     stream::{FusedStream, Stream},
20     task::{Context, Poll},
21 };
22 #[cfg(feature = "sink")]
23 use futures_sink::Sink;
24 
25 use crate::fns::{inspect_fn, InspectFn};
26 
27 mod chain;
28 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
29 pub use self::chain::Chain;
30 
31 mod collect;
32 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
33 pub use self::collect::Collect;
34 
35 mod unzip;
36 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
37 pub use self::unzip::Unzip;
38 
39 mod concat;
40 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
41 pub use self::concat::Concat;
42 
43 mod cycle;
44 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
45 pub use self::cycle::Cycle;
46 
47 mod enumerate;
48 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
49 pub use self::enumerate::Enumerate;
50 
51 mod filter;
52 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
53 pub use self::filter::Filter;
54 
55 mod filter_map;
56 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
57 pub use self::filter_map::FilterMap;
58 
59 mod flatten;
60 
61 delegate_all!(
62     /// Stream for the [`flatten`](StreamExt::flatten) method.
63     Flatten<St>(
64         flatten::Flatten<St, St::Item>
65     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
66     where St: Stream
67 );
68 
69 mod fold;
70 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
71 pub use self::fold::Fold;
72 
73 mod any;
74 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
75 pub use self::any::Any;
76 
77 mod all;
78 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
79 pub use self::all::All;
80 
81 #[cfg(feature = "sink")]
82 mod forward;
83 
84 #[cfg(feature = "sink")]
85 delegate_all!(
86     /// Future for the [`forward`](super::StreamExt::forward) method.
87     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
88     Forward<St, Si>(
89         forward::Forward<St, Si, St::Ok>
90     ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
91     where St: TryStream
92 );
93 
94 mod for_each;
95 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
96 pub use self::for_each::ForEach;
97 
98 mod fuse;
99 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
100 pub use self::fuse::Fuse;
101 
102 mod into_future;
103 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
104 pub use self::into_future::StreamFuture;
105 
106 delegate_all!(
107     /// Stream for the [`inspect`](StreamExt::inspect) method.
108     Inspect<St, F>(
109         map::Map<St, InspectFn<F>>
110     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
111 );
112 
113 mod map;
114 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
115 pub use self::map::Map;
116 
117 delegate_all!(
118     /// Stream for the [`flat_map`](StreamExt::flat_map) method.
119     FlatMap<St, U, F>(
120         flatten::Flatten<Map<St, F>, U>
121     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
122 );
123 
124 mod next;
125 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
126 pub use self::next::Next;
127 
128 mod select_next_some;
129 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130 pub use self::select_next_some::SelectNextSome;
131 
132 mod peek;
133 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134 pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};
135 
136 mod skip;
137 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
138 pub use self::skip::Skip;
139 
140 mod skip_while;
141 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142 pub use self::skip_while::SkipWhile;
143 
144 mod take;
145 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
146 pub use self::take::Take;
147 
148 mod take_while;
149 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150 pub use self::take_while::TakeWhile;
151 
152 mod take_until;
153 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
154 pub use self::take_until::TakeUntil;
155 
156 mod then;
157 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158 pub use self::then::Then;
159 
160 mod zip;
161 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
162 pub use self::zip::Zip;
163 
164 #[cfg(feature = "alloc")]
165 mod chunks;
166 #[cfg(feature = "alloc")]
167 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
168 pub use self::chunks::Chunks;
169 
170 #[cfg(feature = "alloc")]
171 mod ready_chunks;
172 #[cfg(feature = "alloc")]
173 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
174 pub use self::ready_chunks::ReadyChunks;
175 
176 mod scan;
177 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
178 pub use self::scan::Scan;
179 
180 #[cfg(not(futures_no_atomic_cas))]
181 #[cfg(feature = "alloc")]
182 mod buffer_unordered;
183 #[cfg(not(futures_no_atomic_cas))]
184 #[cfg(feature = "alloc")]
185 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
186 pub use self::buffer_unordered::BufferUnordered;
187 
188 #[cfg(not(futures_no_atomic_cas))]
189 #[cfg(feature = "alloc")]
190 mod buffered;
191 #[cfg(not(futures_no_atomic_cas))]
192 #[cfg(feature = "alloc")]
193 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
194 pub use self::buffered::Buffered;
195 
196 #[cfg(not(futures_no_atomic_cas))]
197 #[cfg(feature = "alloc")]
198 mod for_each_concurrent;
199 #[cfg(not(futures_no_atomic_cas))]
200 #[cfg(feature = "alloc")]
201 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
202 pub use self::for_each_concurrent::ForEachConcurrent;
203 
204 #[cfg(not(futures_no_atomic_cas))]
205 #[cfg(feature = "sink")]
206 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
207 #[cfg(feature = "alloc")]
208 mod split;
209 #[cfg(not(futures_no_atomic_cas))]
210 #[cfg(feature = "sink")]
211 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
212 #[cfg(feature = "alloc")]
213 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
214 pub use self::split::{ReuniteError, SplitSink, SplitStream};
215 
216 #[cfg(feature = "std")]
217 mod catch_unwind;
218 #[cfg(feature = "std")]
219 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
220 pub use self::catch_unwind::CatchUnwind;
221 
222 impl<T: ?Sized> StreamExt for T where T: Stream {}
223 
224 /// An extension trait for `Stream`s that provides a variety of convenient
225 /// combinator functions.
226 pub trait StreamExt: Stream {
227     /// Creates a future that resolves to the next item in the stream.
228     ///
229     /// Note that because `next` doesn't take ownership over the stream,
230     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
231     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
232     /// be done by boxing the stream using [`Box::pin`] or
233     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
234     /// crate.
235     ///
236     /// # Examples
237     ///
238     /// ```
239     /// # futures::executor::block_on(async {
240     /// use futures::stream::{self, StreamExt};
241     ///
242     /// let mut stream = stream::iter(1..=3);
243     ///
244     /// assert_eq!(stream.next().await, Some(1));
245     /// assert_eq!(stream.next().await, Some(2));
246     /// assert_eq!(stream.next().await, Some(3));
247     /// assert_eq!(stream.next().await, None);
248     /// # });
249     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,250     fn next(&mut self) -> Next<'_, Self>
251     where
252         Self: Unpin,
253     {
254         assert_future::<Option<Self::Item>, _>(Next::new(self))
255     }
256 
257     /// Converts this stream into a future of `(next_item, tail_of_stream)`.
258     /// If the stream terminates, then the next item is [`None`].
259     ///
260     /// The returned future can be used to compose streams and futures together
261     /// by placing everything into the "world of futures".
262     ///
263     /// Note that because `into_future` moves the stream, the [`Stream`] type
264     /// must be [`Unpin`]. If you want to use `into_future` with a
265     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
266     /// be done by boxing the stream using [`Box::pin`] or
267     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
268     /// crate.
269     ///
270     /// # Examples
271     ///
272     /// ```
273     /// # futures::executor::block_on(async {
274     /// use futures::stream::{self, StreamExt};
275     ///
276     /// let stream = stream::iter(1..=3);
277     ///
278     /// let (item, stream) = stream.into_future().await;
279     /// assert_eq!(Some(1), item);
280     ///
281     /// let (item, stream) = stream.into_future().await;
282     /// assert_eq!(Some(2), item);
283     /// # });
284     /// ```
into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,285     fn into_future(self) -> StreamFuture<Self>
286     where
287         Self: Sized + Unpin,
288     {
289         assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self))
290     }
291 
292     /// Maps this stream's items to a different type, returning a new stream of
293     /// the resulting type.
294     ///
295     /// The provided closure is executed over all elements of this stream as
296     /// they are made available. It is executed inline with calls to
297     /// [`poll_next`](Stream::poll_next).
298     ///
299     /// Note that this function consumes the stream passed into it and returns a
300     /// wrapped version of it, similar to the existing `map` methods in the
301     /// standard library.
302     ///
303     /// # Examples
304     ///
305     /// ```
306     /// # futures::executor::block_on(async {
307     /// use futures::stream::{self, StreamExt};
308     ///
309     /// let stream = stream::iter(1..=3);
310     /// let stream = stream.map(|x| x + 3);
311     ///
312     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
313     /// # });
314     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,315     fn map<T, F>(self, f: F) -> Map<Self, F>
316     where
317         F: FnMut(Self::Item) -> T,
318         Self: Sized,
319     {
320         assert_stream::<T, _>(Map::new(self, f))
321     }
322 
323     /// Creates a stream which gives the current iteration count as well as
324     /// the next value.
325     ///
326     /// The stream returned yields pairs `(i, val)`, where `i` is the
327     /// current index of iteration and `val` is the value returned by the
328     /// stream.
329     ///
330     /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a
331     /// different sized integer, the [`zip`](StreamExt::zip) function provides similar
332     /// functionality.
333     ///
334     /// # Overflow Behavior
335     ///
336     /// The method does no guarding against overflows, so enumerating more than
337     /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If
338     /// debug assertions are enabled, a panic is guaranteed.
339     ///
340     /// # Panics
341     ///
342     /// The returned stream might panic if the to-be-returned index would
343     /// overflow a [`usize`].
344     ///
345     /// # Examples
346     ///
347     /// ```
348     /// # futures::executor::block_on(async {
349     /// use futures::stream::{self, StreamExt};
350     ///
351     /// let stream = stream::iter(vec!['a', 'b', 'c']);
352     ///
353     /// let mut stream = stream.enumerate();
354     ///
355     /// assert_eq!(stream.next().await, Some((0, 'a')));
356     /// assert_eq!(stream.next().await, Some((1, 'b')));
357     /// assert_eq!(stream.next().await, Some((2, 'c')));
358     /// assert_eq!(stream.next().await, None);
359     /// # });
360     /// ```
enumerate(self) -> Enumerate<Self> where Self: Sized,361     fn enumerate(self) -> Enumerate<Self>
362     where
363         Self: Sized,
364     {
365         assert_stream::<(usize, Self::Item), _>(Enumerate::new(self))
366     }
367 
368     /// Filters the values produced by this stream according to the provided
369     /// asynchronous predicate.
370     ///
371     /// As values of this stream are made available, the provided predicate `f`
372     /// will be run against them. If the predicate returns a `Future` which
373     /// resolves to `true`, then the stream will yield the value, but if the
374     /// predicate returns a `Future` which resolves to `false`, then the value
375     /// will be discarded and the next value will be produced.
376     ///
377     /// Note that this function consumes the stream passed into it and returns a
378     /// wrapped version of it, similar to the existing `filter` methods in the
379     /// standard library.
380     ///
381     /// # Examples
382     ///
383     /// ```
384     /// # futures::executor::block_on(async {
385     /// use futures::future;
386     /// use futures::stream::{self, StreamExt};
387     ///
388     /// let stream = stream::iter(1..=10);
389     /// let evens = stream.filter(|x| future::ready(x % 2 == 0));
390     ///
391     /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await);
392     /// # });
393     /// ```
filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,394     fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
395     where
396         F: FnMut(&Self::Item) -> Fut,
397         Fut: Future<Output = bool>,
398         Self: Sized,
399     {
400         assert_stream::<Self::Item, _>(Filter::new(self, f))
401     }
402 
403     /// Filters the values produced by this stream while simultaneously mapping
404     /// them to a different type according to the provided asynchronous closure.
405     ///
406     /// As values of this stream are made available, the provided function will
407     /// be run on them. If the future returned by the predicate `f` resolves to
408     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
409     /// it resolves to [`None`] then the next value will be produced.
410     ///
411     /// Note that this function consumes the stream passed into it and returns a
412     /// wrapped version of it, similar to the existing `filter_map` methods in
413     /// the standard library.
414     ///
415     /// # Examples
416     /// ```
417     /// # futures::executor::block_on(async {
418     /// use futures::stream::{self, StreamExt};
419     ///
420     /// let stream = stream::iter(1..=10);
421     /// let evens = stream.filter_map(|x| async move {
422     ///     if x % 2 == 0 { Some(x + 1) } else { None }
423     /// });
424     ///
425     /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await);
426     /// # });
427     /// ```
filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,428     fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
429     where
430         F: FnMut(Self::Item) -> Fut,
431         Fut: Future<Output = Option<T>>,
432         Self: Sized,
433     {
434         assert_stream::<T, _>(FilterMap::new(self, f))
435     }
436 
437     /// Computes from this stream's items new items of a different type using
438     /// an asynchronous closure.
439     ///
440     /// The provided closure `f` will be called with an `Item` once a value is
441     /// ready, it returns a future which will then be run to completion
442     /// to produce the next value on this stream.
443     ///
444     /// Note that this function consumes the stream passed into it and returns a
445     /// wrapped version of it.
446     ///
447     /// # Examples
448     ///
449     /// ```
450     /// # futures::executor::block_on(async {
451     /// use futures::stream::{self, StreamExt};
452     ///
453     /// let stream = stream::iter(1..=3);
454     /// let stream = stream.then(|x| async move { x + 3 });
455     ///
456     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
457     /// # });
458     /// ```
then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,459     fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
460     where
461         F: FnMut(Self::Item) -> Fut,
462         Fut: Future,
463         Self: Sized,
464     {
465         assert_stream::<Fut::Output, _>(Then::new(self, f))
466     }
467 
468     /// Transforms a stream into a collection, returning a
469     /// future representing the result of that computation.
470     ///
471     /// The returned future will be resolved when the stream terminates.
472     ///
473     /// # Examples
474     ///
475     /// ```
476     /// # futures::executor::block_on(async {
477     /// use futures::channel::mpsc;
478     /// use futures::stream::StreamExt;
479     /// use std::thread;
480     ///
481     /// let (tx, rx) = mpsc::unbounded();
482     ///
483     /// thread::spawn(move || {
484     ///     for i in 1..=5 {
485     ///         tx.unbounded_send(i).unwrap();
486     ///     }
487     /// });
488     ///
489     /// let output = rx.collect::<Vec<i32>>().await;
490     /// assert_eq!(output, vec![1, 2, 3, 4, 5]);
491     /// # });
492     /// ```
collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,493     fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
494     where
495         Self: Sized,
496     {
497         assert_future::<C, _>(Collect::new(self))
498     }
499 
500     /// Converts a stream of pairs into a future, which
501     /// resolves to pair of containers.
502     ///
503     /// `unzip()` produces a future, which resolves to two
504     /// collections: one from the left elements of the pairs,
505     /// and one from the right elements.
506     ///
507     /// The returned future will be resolved when the stream terminates.
508     ///
509     /// # Examples
510     ///
511     /// ```
512     /// # futures::executor::block_on(async {
513     /// use futures::channel::mpsc;
514     /// use futures::stream::StreamExt;
515     /// use std::thread;
516     ///
517     /// let (tx, rx) = mpsc::unbounded();
518     ///
519     /// thread::spawn(move || {
520     ///     tx.unbounded_send((1, 2)).unwrap();
521     ///     tx.unbounded_send((3, 4)).unwrap();
522     ///     tx.unbounded_send((5, 6)).unwrap();
523     /// });
524     ///
525     /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await;
526     /// assert_eq!(o1, vec![1, 3, 5]);
527     /// assert_eq!(o2, vec![2, 4, 6]);
528     /// # });
529     /// ```
unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>,530     fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
531     where
532         FromA: Default + Extend<A>,
533         FromB: Default + Extend<B>,
534         Self: Sized + Stream<Item = (A, B)>,
535     {
536         assert_future::<(FromA, FromB), _>(Unzip::new(self))
537     }
538 
539     /// Concatenate all items of a stream into a single extendable
540     /// destination, returning a future representing the end result.
541     ///
542     /// This combinator will extend the first item with the contents
543     /// of all the subsequent results of the stream. If the stream is
544     /// empty, the default value will be returned.
545     ///
546     /// Works with all collections that implement the
547     /// [`Extend`](std::iter::Extend) trait.
548     ///
549     /// # Examples
550     ///
551     /// ```
552     /// # futures::executor::block_on(async {
553     /// use futures::channel::mpsc;
554     /// use futures::stream::StreamExt;
555     /// use std::thread;
556     ///
557     /// let (tx, rx) = mpsc::unbounded();
558     ///
559     /// thread::spawn(move || {
560     ///     for i in (0..3).rev() {
561     ///         let n = i * 3;
562     ///         tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap();
563     ///     }
564     /// });
565     ///
566     /// let result = rx.concat().await;
567     ///
568     /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
569     /// # });
570     /// ```
concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,571     fn concat(self) -> Concat<Self>
572     where
573         Self: Sized,
574         Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
575     {
576         assert_future::<Self::Item, _>(Concat::new(self))
577     }
578 
579     /// Repeats a stream endlessly.
580     ///
581     /// The stream never terminates. Note that you likely want to avoid
582     /// usage of `collect` or such on the returned stream as it will exhaust
583     /// available memory as it tries to just fill up all RAM.
584     ///
585     /// # Examples
586     ///
587     /// ```
588     /// # futures::executor::block_on(async {
589     /// use futures::stream::{self, StreamExt};
590     /// let a = [1, 2, 3];
591     /// let mut s = stream::iter(a.iter()).cycle();
592     ///
593     /// assert_eq!(s.next().await, Some(&1));
594     /// assert_eq!(s.next().await, Some(&2));
595     /// assert_eq!(s.next().await, Some(&3));
596     /// assert_eq!(s.next().await, Some(&1));
597     /// assert_eq!(s.next().await, Some(&2));
598     /// assert_eq!(s.next().await, Some(&3));
599     /// assert_eq!(s.next().await, Some(&1));
600     /// # });
601     /// ```
cycle(self) -> Cycle<Self> where Self: Sized + Clone,602     fn cycle(self) -> Cycle<Self>
603     where
604         Self: Sized + Clone,
605     {
606         assert_stream::<Self::Item, _>(Cycle::new(self))
607     }
608 
609     /// Execute an accumulating asynchronous computation over a stream,
610     /// collecting all the values into one final result.
611     ///
612     /// This combinator will accumulate all values returned by this stream
613     /// according to the closure provided. The initial state is also provided to
614     /// this method and then is returned again by each execution of the closure.
615     /// Once the entire stream has been exhausted the returned future will
616     /// resolve to this value.
617     ///
618     /// # Examples
619     ///
620     /// ```
621     /// # futures::executor::block_on(async {
622     /// use futures::stream::{self, StreamExt};
623     ///
624     /// let number_stream = stream::iter(0..6);
625     /// let sum = number_stream.fold(0, |acc, x| async move { acc + x });
626     /// assert_eq!(sum.await, 15);
627     /// # });
628     /// ```
fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,629     fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
630     where
631         F: FnMut(T, Self::Item) -> Fut,
632         Fut: Future<Output = T>,
633         Self: Sized,
634     {
635         assert_future::<T, _>(Fold::new(self, f, init))
636     }
637 
638     /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate.
639     ///
640     /// # Examples
641     ///
642     /// ```
643     /// # futures::executor::block_on(async {
644     /// use futures::stream::{self, StreamExt};
645     ///
646     /// let number_stream = stream::iter(0..10);
647     /// let contain_three = number_stream.any(|i| async move { i == 3 });
648     /// assert_eq!(contain_three.await, true);
649     /// # });
650     /// ```
any<Fut, F>(self, f: F) -> Any<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,651     fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
652     where
653         F: FnMut(Self::Item) -> Fut,
654         Fut: Future<Output = bool>,
655         Self: Sized,
656     {
657         assert_future::<bool, _>(Any::new(self, f))
658     }
659 
660     /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate.
661     ///
662     /// # Examples
663     ///
664     /// ```
665     /// # futures::executor::block_on(async {
666     /// use futures::stream::{self, StreamExt};
667     ///
668     /// let number_stream = stream::iter(0..10);
669     /// let less_then_twenty = number_stream.all(|i| async move { i < 20 });
670     /// assert_eq!(less_then_twenty.await, true);
671     /// # });
672     /// ```
all<Fut, F>(self, f: F) -> All<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,673     fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
674     where
675         F: FnMut(Self::Item) -> Fut,
676         Fut: Future<Output = bool>,
677         Self: Sized,
678     {
679         assert_future::<bool, _>(All::new(self, f))
680     }
681 
682     /// Flattens a stream of streams into just one continuous stream.
683     ///
684     /// # Examples
685     ///
686     /// ```
687     /// # futures::executor::block_on(async {
688     /// use futures::channel::mpsc;
689     /// use futures::stream::StreamExt;
690     /// use std::thread;
691     ///
692     /// let (tx1, rx1) = mpsc::unbounded();
693     /// let (tx2, rx2) = mpsc::unbounded();
694     /// let (tx3, rx3) = mpsc::unbounded();
695     ///
696     /// thread::spawn(move || {
697     ///     tx1.unbounded_send(1).unwrap();
698     ///     tx1.unbounded_send(2).unwrap();
699     /// });
700     /// thread::spawn(move || {
701     ///     tx2.unbounded_send(3).unwrap();
702     ///     tx2.unbounded_send(4).unwrap();
703     /// });
704     /// thread::spawn(move || {
705     ///     tx3.unbounded_send(rx1).unwrap();
706     ///     tx3.unbounded_send(rx2).unwrap();
707     /// });
708     ///
709     /// let output = rx3.flatten().collect::<Vec<i32>>().await;
710     /// assert_eq!(output, vec![1, 2, 3, 4]);
711     /// # });
712     /// ```
flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,713     fn flatten(self) -> Flatten<Self>
714     where
715         Self::Item: Stream,
716         Self: Sized,
717     {
718         assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
719     }
720 
721     /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
722     ///
723     /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
724     /// you would have to chain combinators like `.map(f).flatten()` while this
725     /// combinator provides ability to write `.flat_map(f)` instead of chaining.
726     ///
727     /// The provided closure which produce inner streams is executed over all elements
728     /// of stream as last inner stream is terminated and next stream item is available.
729     ///
730     /// Note that this function consumes the stream passed into it and returns a
731     /// wrapped version of it, similar to the existing `flat_map` methods in the
732     /// standard library.
733     ///
734     /// # Examples
735     ///
736     /// ```
737     /// # futures::executor::block_on(async {
738     /// use futures::stream::{self, StreamExt};
739     ///
740     /// let stream = stream::iter(1..=3);
741     /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x]));
742     ///
743     /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await);
744     /// # });
745     /// ```
flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,746     fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
747     where
748         F: FnMut(Self::Item) -> U,
749         U: Stream,
750         Self: Sized,
751     {
752         assert_stream::<U::Item, _>(FlatMap::new(self, f))
753     }
754 
755     /// Combinator similar to [`StreamExt::fold`] that holds internal state
756     /// and produces a new stream.
757     ///
758     /// Accepts initial state and closure which will be applied to each element
759     /// of the stream until provided closure returns `None`. Once `None` is
760     /// returned, stream will be terminated.
761     ///
762     /// # Examples
763     ///
764     /// ```
765     /// # futures::executor::block_on(async {
766     /// use futures::future;
767     /// use futures::stream::{self, StreamExt};
768     ///
769     /// let stream = stream::iter(1..=10);
770     ///
771     /// let stream = stream.scan(0, |state, x| {
772     ///     *state += x;
773     ///     future::ready(if *state < 10 { Some(x) } else { None })
774     /// });
775     ///
776     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
777     /// # });
778     /// ```
scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,779     fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
780     where
781         F: FnMut(&mut S, Self::Item) -> Fut,
782         Fut: Future<Output = Option<B>>,
783         Self: Sized,
784     {
785         assert_stream::<B, _>(Scan::new(self, initial_state, f))
786     }
787 
788     /// Skip elements on this stream while the provided asynchronous predicate
789     /// resolves to `true`.
790     ///
791     /// This function, like `Iterator::skip_while`, will skip elements on the
792     /// stream until the predicate `f` resolves to `false`. Once one element
793     /// returns `false`, all future elements will be returned from the underlying
794     /// stream.
795     ///
796     /// # Examples
797     ///
798     /// ```
799     /// # futures::executor::block_on(async {
800     /// use futures::future;
801     /// use futures::stream::{self, StreamExt};
802     ///
803     /// let stream = stream::iter(1..=10);
804     ///
805     /// let stream = stream.skip_while(|x| future::ready(*x <= 5));
806     ///
807     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
808     /// # });
809     /// ```
skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,810     fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
811     where
812         F: FnMut(&Self::Item) -> Fut,
813         Fut: Future<Output = bool>,
814         Self: Sized,
815     {
816         assert_stream::<Self::Item, _>(SkipWhile::new(self, f))
817     }
818 
819     /// Take elements from this stream while the provided asynchronous predicate
820     /// resolves to `true`.
821     ///
822     /// This function, like `Iterator::take_while`, will take elements from the
823     /// stream until the predicate `f` resolves to `false`. Once one element
824     /// returns `false`, it will always return that the stream is done.
825     ///
826     /// # Examples
827     ///
828     /// ```
829     /// # futures::executor::block_on(async {
830     /// use futures::future;
831     /// use futures::stream::{self, StreamExt};
832     ///
833     /// let stream = stream::iter(1..=10);
834     ///
835     /// let stream = stream.take_while(|x| future::ready(*x <= 5));
836     ///
837     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
838     /// # });
839     /// ```
take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,840     fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
841     where
842         F: FnMut(&Self::Item) -> Fut,
843         Fut: Future<Output = bool>,
844         Self: Sized,
845     {
846         assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
847     }
848 
849     /// Take elements from this stream until the provided future resolves.
850     ///
851     /// This function will take elements from the stream until the provided
852     /// stopping future `fut` resolves. Once the `fut` future becomes ready,
853     /// this stream combinator will always return that the stream is done.
854     ///
855     /// The stopping future may return any type. Once the stream is stopped
856     /// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
857     /// The stream may also be resumed with `TakeUntil::take_future()`.
858     /// See the documentation of [`TakeUntil`] for more information.
859     ///
860     /// # Examples
861     ///
862     /// ```
863     /// # futures::executor::block_on(async {
864     /// use futures::future;
865     /// use futures::stream::{self, StreamExt};
866     /// use futures::task::Poll;
867     ///
868     /// let stream = stream::iter(1..=10);
869     ///
870     /// let mut i = 0;
871     /// let stop_fut = future::poll_fn(|_cx| {
872     ///     i += 1;
873     ///     if i <= 5 {
874     ///         Poll::Pending
875     ///     } else {
876     ///         Poll::Ready(())
877     ///     }
878     /// });
879     ///
880     /// let stream = stream.take_until(stop_fut);
881     ///
882     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
883     /// # });
884     /// ```
take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,885     fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
886     where
887         Fut: Future,
888         Self: Sized,
889     {
890         assert_stream::<Self::Item, _>(TakeUntil::new(self, fut))
891     }
892 
893     /// Runs this stream to completion, executing the provided asynchronous
894     /// closure for each element on the stream.
895     ///
896     /// The closure provided will be called for each item this stream produces,
897     /// yielding a future. That future will then be executed to completion
898     /// before moving on to the next item.
899     ///
900     /// The returned value is a `Future` where the `Output` type is `()`; it is
901     /// executed entirely for its side effects.
902     ///
903     /// To process each item in the stream and produce another stream instead
904     /// of a single future, use `then` instead.
905     ///
906     /// # Examples
907     ///
908     /// ```
909     /// # futures::executor::block_on(async {
910     /// use futures::future;
911     /// use futures::stream::{self, StreamExt};
912     ///
913     /// let mut x = 0;
914     ///
915     /// {
916     ///     let fut = stream::repeat(1).take(3).for_each(|item| {
917     ///         x += item;
918     ///         future::ready(())
919     ///     });
920     ///     fut.await;
921     /// }
922     ///
923     /// assert_eq!(x, 3);
924     /// # });
925     /// ```
for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,926     fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
927     where
928         F: FnMut(Self::Item) -> Fut,
929         Fut: Future<Output = ()>,
930         Self: Sized,
931     {
932         assert_future::<(), _>(ForEach::new(self, f))
933     }
934 
935     /// Runs this stream to completion, executing the provided asynchronous
936     /// closure for each element on the stream concurrently as elements become
937     /// available.
938     ///
939     /// This is similar to [`StreamExt::for_each`], but the futures
940     /// produced by the closure are run concurrently (but not in parallel--
941     /// this combinator does not introduce any threads).
942     ///
943     /// The closure provided will be called for each item this stream produces,
944     /// yielding a future. That future will then be executed to completion
945     /// concurrently with the other futures produced by the closure.
946     ///
947     /// The first argument is an optional limit on the number of concurrent
948     /// futures. If this limit is not `None`, no more than `limit` futures
949     /// will be run concurrently. The `limit` argument is of type
950     /// `Into<Option<usize>>`, and so can be provided as either `None`,
951     /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
952     /// no limit at all, and will have the same result as passing in `None`.
953     ///
954     /// This method is only available when the `std` or `alloc` feature of this
955     /// library is activated, and it is activated by default.
956     ///
957     /// # Examples
958     ///
959     /// ```
960     /// # futures::executor::block_on(async {
961     /// use futures::channel::oneshot;
962     /// use futures::stream::{self, StreamExt};
963     ///
964     /// let (tx1, rx1) = oneshot::channel();
965     /// let (tx2, rx2) = oneshot::channel();
966     /// let (tx3, rx3) = oneshot::channel();
967     ///
968     /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
969     ///     /* limit */ 2,
970     ///     |rx| async move {
971     ///         rx.await.unwrap();
972     ///     }
973     /// );
974     /// tx1.send(()).unwrap();
975     /// tx2.send(()).unwrap();
976     /// tx3.send(()).unwrap();
977     /// fut.await;
978     /// # })
979     /// ```
980     #[cfg(not(futures_no_atomic_cas))]
981     #[cfg(feature = "alloc")]
for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,982     fn for_each_concurrent<Fut, F>(
983         self,
984         limit: impl Into<Option<usize>>,
985         f: F,
986     ) -> ForEachConcurrent<Self, Fut, F>
987     where
988         F: FnMut(Self::Item) -> Fut,
989         Fut: Future<Output = ()>,
990         Self: Sized,
991     {
992         assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f))
993     }
994 
995     /// Creates a new stream of at most `n` items of the underlying stream.
996     ///
997     /// Once `n` items have been yielded from this stream then it will always
998     /// return that the stream is done.
999     ///
1000     /// # Examples
1001     ///
1002     /// ```
1003     /// # futures::executor::block_on(async {
1004     /// use futures::stream::{self, StreamExt};
1005     ///
1006     /// let stream = stream::iter(1..=10).take(3);
1007     ///
1008     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
1009     /// # });
1010     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,1011     fn take(self, n: usize) -> Take<Self>
1012     where
1013         Self: Sized,
1014     {
1015         assert_stream::<Self::Item, _>(Take::new(self, n))
1016     }
1017 
1018     /// Creates a new stream which skips `n` items of the underlying stream.
1019     ///
1020     /// Once `n` items have been skipped from this stream then it will always
1021     /// return the remaining items on this stream.
1022     ///
1023     /// # Examples
1024     ///
1025     /// ```
1026     /// # futures::executor::block_on(async {
1027     /// use futures::stream::{self, StreamExt};
1028     ///
1029     /// let stream = stream::iter(1..=10).skip(5);
1030     ///
1031     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
1032     /// # });
1033     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,1034     fn skip(self, n: usize) -> Skip<Self>
1035     where
1036         Self: Sized,
1037     {
1038         assert_stream::<Self::Item, _>(Skip::new(self, n))
1039     }
1040 
1041     /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never
1042     /// again be called once it has finished. This method can be used to turn
1043     /// any `Stream` into a `FusedStream`.
1044     ///
1045     /// Normally, once a stream has returned [`None`] from
1046     /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad
1047     /// behavior such as block forever, panic, never return, etc. If it is known
1048     /// that [`poll_next`](Stream::poll_next) may be called after stream
1049     /// has already finished, then this method can be used to ensure that it has
1050     /// defined semantics.
1051     ///
1052     /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream
1053     /// is guaranteed to return [`None`] after the underlying stream has
1054     /// finished.
1055     ///
1056     /// # Examples
1057     ///
1058     /// ```
1059     /// use futures::executor::block_on_stream;
1060     /// use futures::stream::{self, StreamExt};
1061     /// use futures::task::Poll;
1062     ///
1063     /// let mut x = 0;
1064     /// let stream = stream::poll_fn(|_| {
1065     ///     x += 1;
1066     ///     match x {
1067     ///         0..=2 => Poll::Ready(Some(x)),
1068     ///         3 => Poll::Ready(None),
1069     ///         _ => panic!("should not happen")
1070     ///     }
1071     /// }).fuse();
1072     ///
1073     /// let mut iter = block_on_stream(stream);
1074     /// assert_eq!(Some(1), iter.next());
1075     /// assert_eq!(Some(2), iter.next());
1076     /// assert_eq!(None, iter.next());
1077     /// assert_eq!(None, iter.next());
1078     /// // ...
1079     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,1080     fn fuse(self) -> Fuse<Self>
1081     where
1082         Self: Sized,
1083     {
1084         assert_stream::<Self::Item, _>(Fuse::new(self))
1085     }
1086 
1087     /// Borrows a stream, rather than consuming it.
1088     ///
1089     /// This is useful to allow applying stream adaptors while still retaining
1090     /// ownership of the original stream.
1091     ///
1092     /// # Examples
1093     ///
1094     /// ```
1095     /// # futures::executor::block_on(async {
1096     /// use futures::stream::{self, StreamExt};
1097     ///
1098     /// let mut stream = stream::iter(1..5);
1099     ///
1100     /// let sum = stream.by_ref()
1101     ///                 .take(2)
1102     ///                 .fold(0, |a, b| async move { a + b })
1103     ///                 .await;
1104     /// assert_eq!(sum, 3);
1105     ///
1106     /// // You can use the stream again
1107     /// let sum = stream.take(2)
1108     ///                 .fold(0, |a, b| async move { a + b })
1109     ///                 .await;
1110     /// assert_eq!(sum, 7);
1111     /// # });
1112     /// ```
by_ref(&mut self) -> &mut Self1113     fn by_ref(&mut self) -> &mut Self {
1114         self
1115     }
1116 
1117     /// Catches unwinding panics while polling the stream.
1118     ///
1119     /// Caught panic (if any) will be the last element of the resulting stream.
1120     ///
1121     /// In general, panics within a stream can propagate all the way out to the
1122     /// task level. This combinator makes it possible to halt unwinding within
1123     /// the stream itself. It's most commonly used within task executors. This
1124     /// method should not be used for error handling.
1125     ///
1126     /// Note that this method requires the `UnwindSafe` bound from the standard
1127     /// library. This isn't always applied automatically, and the standard
1128     /// library provides an `AssertUnwindSafe` wrapper type to apply it
1129     /// after-the fact. To assist using this method, the [`Stream`] trait is
1130     /// also implemented for `AssertUnwindSafe<St>` where `St` implements
1131     /// [`Stream`].
1132     ///
1133     /// This method is only available when the `std` feature of this
1134     /// library is activated, and it is activated by default.
1135     ///
1136     /// # Examples
1137     ///
1138     /// ```
1139     /// # futures::executor::block_on(async {
1140     /// use futures::stream::{self, StreamExt};
1141     ///
1142     /// let stream = stream::iter(vec![Some(10), None, Some(11)]);
1143     /// // Panic on second element
1144     /// let stream_panicking = stream.map(|o| o.unwrap());
1145     /// // Collect all the results
1146     /// let stream = stream_panicking.catch_unwind();
1147     ///
1148     /// let results: Vec<Result<i32, _>> = stream.collect().await;
1149     /// match results[0] {
1150     ///     Ok(10) => {}
1151     ///     _ => panic!("unexpected result!"),
1152     /// }
1153     /// assert!(results[1].is_err());
1154     /// assert_eq!(results.len(), 2);
1155     /// # });
1156     /// ```
1157     #[cfg(feature = "std")]
catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe,1158     fn catch_unwind(self) -> CatchUnwind<Self>
1159     where
1160         Self: Sized + std::panic::UnwindSafe,
1161     {
1162         assert_stream(CatchUnwind::new(self))
1163     }
1164 
1165     /// Wrap the stream in a Box, pinning it.
1166     ///
1167     /// This method is only available when the `std` or `alloc` feature of this
1168     /// library is activated, and it is activated by default.
1169     #[cfg(feature = "alloc")]
boxed<'a>(self) -> BoxStream<'a, Self::Item> where Self: Sized + Send + 'a,1170     fn boxed<'a>(self) -> BoxStream<'a, Self::Item>
1171     where
1172         Self: Sized + Send + 'a,
1173     {
1174         assert_stream::<Self::Item, _>(Box::pin(self))
1175     }
1176 
1177     /// Wrap the stream in a Box, pinning it.
1178     ///
1179     /// Similar to `boxed`, but without the `Send` requirement.
1180     ///
1181     /// This method is only available when the `std` or `alloc` feature of this
1182     /// library is activated, and it is activated by default.
1183     #[cfg(feature = "alloc")]
boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> where Self: Sized + 'a,1184     fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item>
1185     where
1186         Self: Sized + 'a,
1187     {
1188         assert_stream::<Self::Item, _>(Box::pin(self))
1189     }
1190 
1191     /// An adaptor for creating a buffered list of pending futures.
1192     ///
1193     /// If this stream's item can be converted into a future, then this adaptor
1194     /// will buffer up to at most `n` futures and then return the outputs in the
1195     /// same order as the underlying stream. No more than `n` futures will be
1196     /// buffered at any point in time, and less than `n` may also be buffered
1197     /// depending on the state of each future.
1198     ///
1199     /// The returned stream will be a stream of each future's output.
1200     ///
1201     /// This method is only available when the `std` or `alloc` feature of this
1202     /// library is activated, and it is activated by default.
1203     #[cfg(not(futures_no_atomic_cas))]
1204     #[cfg(feature = "alloc")]
buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized,1205     fn buffered(self, n: usize) -> Buffered<Self>
1206     where
1207         Self::Item: Future,
1208         Self: Sized,
1209     {
1210         assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n))
1211     }
1212 
1213     /// An adaptor for creating a buffered list of pending futures (unordered).
1214     ///
1215     /// If this stream's item can be converted into a future, then this adaptor
1216     /// will buffer up to `n` futures and then return the outputs in the order
1217     /// in which they complete. No more than `n` futures will be buffered at
1218     /// any point in time, and less than `n` may also be buffered depending on
1219     /// the state of each future.
1220     ///
1221     /// The returned stream will be a stream of each future's output.
1222     ///
1223     /// This method is only available when the `std` or `alloc` feature of this
1224     /// library is activated, and it is activated by default.
1225     ///
1226     /// # Examples
1227     ///
1228     /// ```
1229     /// # futures::executor::block_on(async {
1230     /// use futures::channel::oneshot;
1231     /// use futures::stream::{self, StreamExt};
1232     ///
1233     /// let (send_one, recv_one) = oneshot::channel();
1234     /// let (send_two, recv_two) = oneshot::channel();
1235     ///
1236     /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
1237     /// let mut buffered = stream_of_futures.buffer_unordered(10);
1238     ///
1239     /// send_two.send(2i32)?;
1240     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1241     ///
1242     /// send_one.send(1i32)?;
1243     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1244     ///
1245     /// assert_eq!(buffered.next().await, None);
1246     /// # Ok::<(), i32>(()) }).unwrap();
1247     /// ```
1248     #[cfg(not(futures_no_atomic_cas))]
1249     #[cfg(feature = "alloc")]
buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized,1250     fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
1251     where
1252         Self::Item: Future,
1253         Self: Sized,
1254     {
1255         assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n))
1256     }
1257 
1258     /// An adapter for zipping two streams together.
1259     ///
1260     /// The zipped stream waits for both streams to produce an item, and then
1261     /// returns that pair. If either stream ends then the zipped stream will
1262     /// also end.
1263     ///
1264     /// # Examples
1265     ///
1266     /// ```
1267     /// # futures::executor::block_on(async {
1268     /// use futures::stream::{self, StreamExt};
1269     ///
1270     /// let stream1 = stream::iter(1..=3);
1271     /// let stream2 = stream::iter(5..=10);
1272     ///
1273     /// let vec = stream1.zip(stream2)
1274     ///                  .collect::<Vec<_>>()
1275     ///                  .await;
1276     /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec);
1277     /// # });
1278     /// ```
1279     ///
zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1280     fn zip<St>(self, other: St) -> Zip<Self, St>
1281     where
1282         St: Stream,
1283         Self: Sized,
1284     {
1285         assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other))
1286     }
1287 
1288     /// Adapter for chaining two streams.
1289     ///
1290     /// The resulting stream emits elements from the first stream, and when
1291     /// first stream reaches the end, emits the elements from the second stream.
1292     ///
1293     /// ```
1294     /// # futures::executor::block_on(async {
1295     /// use futures::stream::{self, StreamExt};
1296     ///
1297     /// let stream1 = stream::iter(vec![Ok(10), Err(false)]);
1298     /// let stream2 = stream::iter(vec![Err(true), Ok(20)]);
1299     ///
1300     /// let stream = stream1.chain(stream2);
1301     ///
1302     /// let result: Vec<_> = stream.collect().await;
1303     /// assert_eq!(result, vec![
1304     ///     Ok(10),
1305     ///     Err(false),
1306     ///     Err(true),
1307     ///     Ok(20),
1308     /// ]);
1309     /// # });
1310     /// ```
chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1311     fn chain<St>(self, other: St) -> Chain<Self, St>
1312     where
1313         St: Stream<Item = Self::Item>,
1314         Self: Sized,
1315     {
1316         assert_stream::<Self::Item, _>(Chain::new(self, other))
1317     }
1318 
1319     /// Creates a new stream which exposes a `peek` method.
1320     ///
1321     /// Calling `peek` returns a reference to the next item in the stream.
peekable(self) -> Peekable<Self> where Self: Sized,1322     fn peekable(self) -> Peekable<Self>
1323     where
1324         Self: Sized,
1325     {
1326         assert_stream::<Self::Item, _>(Peekable::new(self))
1327     }
1328 
1329     /// An adaptor for chunking up items of the stream inside a vector.
1330     ///
1331     /// This combinator will attempt to pull items from this stream and buffer
1332     /// them into a local vector. At most `capacity` items will get buffered
1333     /// before they're yielded from the returned stream.
1334     ///
1335     /// Note that the vectors returned from this iterator may not always have
1336     /// `capacity` elements. If the underlying stream ended and only a partial
1337     /// vector was created, it'll be returned. Additionally if an error happens
1338     /// from the underlying stream then the currently buffered items will be
1339     /// yielded.
1340     ///
1341     /// This method is only available when the `std` or `alloc` feature of this
1342     /// library is activated, and it is activated by default.
1343     ///
1344     /// # Panics
1345     ///
1346     /// This method will panic if `capacity` is zero.
1347     #[cfg(feature = "alloc")]
chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,1348     fn chunks(self, capacity: usize) -> Chunks<Self>
1349     where
1350         Self: Sized,
1351     {
1352         assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity))
1353     }
1354 
1355     /// An adaptor for chunking up ready items of the stream inside a vector.
1356     ///
1357     /// This combinator will attempt to pull ready items from this stream and
1358     /// buffer them into a local vector. At most `capacity` items will get
1359     /// buffered before they're yielded from the returned stream. If underlying
1360     /// stream returns `Poll::Pending`, and collected chunk is not empty, it will
1361     /// be immediately returned.
1362     ///
1363     /// If the underlying stream ended and only a partial vector was created,
1364     /// it'll be returned. Additionally if an error happens from the underlying
1365     /// stream then the currently buffered items will be yielded.
1366     ///
1367     /// This method is only available when the `std` or `alloc` feature of this
1368     /// library is activated, and it is activated by default.
1369     ///
1370     /// # Panics
1371     ///
1372     /// This method will panic if `capacity` is zero.
1373     #[cfg(feature = "alloc")]
ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,1374     fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
1375     where
1376         Self: Sized,
1377     {
1378         assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity))
1379     }
1380 
1381     /// A future that completes after the given stream has been fully processed
1382     /// into the sink and the sink has been flushed and closed.
1383     ///
1384     /// This future will drive the stream to keep producing items until it is
1385     /// exhausted, sending each item to the sink. It will complete once the
1386     /// stream is exhausted, the sink has received and flushed all items, and
1387     /// the sink is closed. Note that neither the original stream nor provided
1388     /// sink will be output by this future. Pass the sink by `Pin<&mut S>`
1389     /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
1390     /// order to preserve access to the `Sink`. If the stream produces an error,
1391     /// that error will be returned by this future without flushing/closing the sink.
1392     #[cfg(feature = "sink")]
1393     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<Self::Ok, Error = Self::Error>, Self: TryStream + Sized,1394     fn forward<S>(self, sink: S) -> Forward<Self, S>
1395     where
1396         S: Sink<Self::Ok, Error = Self::Error>,
1397         Self: TryStream + Sized,
1398         // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>,
1399     {
1400         // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>`
1401         // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink))
1402         Forward::new(self, sink)
1403     }
1404 
1405     /// Splits this `Stream + Sink` object into separate `Sink` and `Stream`
1406     /// objects.
1407     ///
1408     /// This can be useful when you want to split ownership between tasks, or
1409     /// allow direct interaction between the two objects (e.g. via
1410     /// `Sink::send_all`).
1411     ///
1412     /// This method is only available when the `std` or `alloc` feature of this
1413     /// library is activated, and it is activated by default.
1414     #[cfg(feature = "sink")]
1415     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
1416     #[cfg(not(futures_no_atomic_cas))]
1417     #[cfg(feature = "alloc")]
split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized,1418     fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
1419     where
1420         Self: Sink<Item> + Sized,
1421     {
1422         let (sink, stream) = split::split(self);
1423         (
1424             crate::sink::assert_sink::<Item, Self::Error, _>(sink),
1425             assert_stream::<Self::Item, _>(stream),
1426         )
1427     }
1428 
1429     /// Do something with each item of this stream, afterwards passing it on.
1430     ///
1431     /// This is similar to the `Iterator::inspect` method in the standard
1432     /// library where it allows easily inspecting each value as it passes
1433     /// through the stream, for example to debug what's going on.
inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1434     fn inspect<F>(self, f: F) -> Inspect<Self, F>
1435     where
1436         F: FnMut(&Self::Item),
1437         Self: Sized,
1438     {
1439         assert_stream::<Self::Item, _>(Inspect::new(self, f))
1440     }
1441 
1442     /// Wrap this stream in an `Either` stream, making it the left-hand variant
1443     /// of that `Either`.
1444     ///
1445     /// This can be used in combination with the `right_stream` method to write `if`
1446     /// statements that evaluate to different streams in different branches.
left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1447     fn left_stream<B>(self) -> Either<Self, B>
1448     where
1449         B: Stream<Item = Self::Item>,
1450         Self: Sized,
1451     {
1452         assert_stream::<Self::Item, _>(Either::Left(self))
1453     }
1454 
1455     /// Wrap this stream in an `Either` stream, making it the right-hand variant
1456     /// of that `Either`.
1457     ///
1458     /// This can be used in combination with the `left_stream` method to write `if`
1459     /// statements that evaluate to different streams in different branches.
right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1460     fn right_stream<B>(self) -> Either<B, Self>
1461     where
1462         B: Stream<Item = Self::Item>,
1463         Self: Sized,
1464     {
1465         assert_stream::<Self::Item, _>(Either::Right(self))
1466     }
1467 
1468     /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
1469     /// stream types.
poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1470     fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
1471     where
1472         Self: Unpin,
1473     {
1474         Pin::new(self).poll_next(cx)
1475     }
1476 
1477     /// Returns a [`Future`] that resolves when the next item in this stream is
1478     /// ready.
1479     ///
1480     /// This is similar to the [`next`][StreamExt::next] method, but it won't
1481     /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the
1482     /// returned future type will return `true` from
1483     /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing
1484     /// [`select_next_some`][StreamExt::select_next_some] to be easily used with
1485     /// the [`select!`] macro.
1486     ///
1487     /// If the future is polled after this [`Stream`] is empty it will panic.
1488     /// Using the future with a [`FusedFuture`][]-aware primitive like the
1489     /// [`select!`] macro will prevent this.
1490     ///
1491     /// [`FusedFuture`]: futures_core::future::FusedFuture
1492     /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated
1493     ///
1494     /// # Examples
1495     ///
1496     /// ```
1497     /// # futures::executor::block_on(async {
1498     /// use futures::{future, select};
1499     /// use futures::stream::{StreamExt, FuturesUnordered};
1500     ///
1501     /// let mut fut = future::ready(1);
1502     /// let mut async_tasks = FuturesUnordered::new();
1503     /// let mut total = 0;
1504     /// loop {
1505     ///     select! {
1506     ///         num = fut => {
1507     ///             // First, the `ready` future completes.
1508     ///             total += num;
1509     ///             // Then we spawn a new task onto `async_tasks`,
1510     ///             async_tasks.push(async { 5 });
1511     ///         },
1512     ///         // On the next iteration of the loop, the task we spawned
1513     ///         // completes.
1514     ///         num = async_tasks.select_next_some() => {
1515     ///             total += num;
1516     ///         }
1517     ///         // Finally, both the `ready` future and `async_tasks` have
1518     ///         // finished, so we enter the `complete` branch.
1519     ///         complete => break,
1520     ///     }
1521     /// }
1522     /// assert_eq!(total, 6);
1523     /// # });
1524     /// ```
select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1525     fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
1526     where
1527         Self: Unpin + FusedStream,
1528     {
1529         assert_future::<Self::Item, _>(SelectNextSome::new(self))
1530     }
1531 }
1532