1 //! Streams
2 //!
3 //! This module contains a number of functions for working with `Stream`s,
4 //! including the `StreamExt` trait which adds methods to `Stream` types.
5 
6 use crate::future::{assert_future, Either};
7 #[cfg(feature = "alloc")]
8 use alloc::boxed::Box;
9 use core::pin::Pin;
10 #[cfg(feature = "sink")]
11 use futures_core::stream::TryStream;
12 #[cfg(feature = "alloc")]
13 use futures_core::stream::{BoxStream, LocalBoxStream};
14 use futures_core::{
15     future::Future,
16     stream::{FusedStream, Stream},
17     task::{Context, Poll},
18 };
19 #[cfg(feature = "sink")]
20 use futures_sink::Sink;
21 
22 use crate::fns::{InspectFn, inspect_fn};
23 
24 mod chain;
25 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
26 pub use self::chain::Chain;
27 
28 mod collect;
29 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
30 pub use self::collect::Collect;
31 
32 mod concat;
33 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
34 pub use self::concat::Concat;
35 
36 mod cycle;
37 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
38 pub use self::cycle::Cycle;
39 
40 mod enumerate;
41 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
42 pub use self::enumerate::Enumerate;
43 
44 mod filter;
45 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
46 pub use self::filter::Filter;
47 
48 mod filter_map;
49 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
50 pub use self::filter_map::FilterMap;
51 
52 mod flatten;
53 
54 delegate_all!(
55     /// Stream for the [`flatten`](StreamExt::flatten) method.
56     Flatten<St>(
57         flatten::Flatten<St, St::Item>
58     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
59     where St: Stream
60 );
61 
62 mod fold;
63 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
64 pub use self::fold::Fold;
65 
66 #[cfg(feature = "sink")]
67 mod forward;
68 
// Only compiled with the `sink` feature, like the `forward` module it wraps.
// NOTE(review): `delegate_all!` is defined outside this file; it appears to
// declare the public `Forward<St, Si>` type on top of `forward::Forward` and
// to forward the listed traits — confirm against the macro.
#[cfg(feature = "sink")]
delegate_all!(
    /// Future for the [`forward`](super::StreamExt::forward) method.
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    Forward<St, Si>(forward::Forward<St, Si, St::Ok>):
        Debug
        + Future
        + FusedFuture
        + New[|x: St, y: Si| forward::Forward::new(x, y)]
    where St: TryStream
);
78 
79 mod for_each;
80 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
81 pub use self::for_each::ForEach;
82 
83 mod fuse;
84 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
85 pub use self::fuse::Fuse;
86 
87 mod into_future;
88 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
89 pub use self::into_future::StreamFuture;
90 
91 delegate_all!(
92     /// Stream for the [`inspect`](StreamExt::inspect) method.
93     Inspect<St, F>(
94         map::Map<St, InspectFn<F>>
95     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
96 );
97 
98 mod map;
99 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
100 pub use self::map::Map;
101 
102 delegate_all!(
103     /// Stream for the [`flat_map`](StreamExt::flat_map) method.
104     FlatMap<St, U, F>(
105         flatten::Flatten<Map<St, F>, U>
106     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
107 );
108 
109 mod next;
110 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
111 pub use self::next::Next;
112 
113 mod select_next_some;
114 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
115 pub use self::select_next_some::SelectNextSome;
116 
117 mod peek;
118 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
119 pub use self::peek::{Peek, Peekable};
120 
121 mod skip;
122 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
123 pub use self::skip::Skip;
124 
125 mod skip_while;
126 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
127 pub use self::skip_while::SkipWhile;
128 
129 mod take;
130 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
131 pub use self::take::Take;
132 
133 mod take_while;
134 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
135 pub use self::take_while::TakeWhile;
136 
137 mod take_until;
138 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
139 pub use self::take_until::TakeUntil;
140 
141 mod then;
142 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
143 pub use self::then::Then;
144 
145 mod zip;
146 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
147 pub use self::zip::Zip;
148 
149 #[cfg(feature = "alloc")]
150 mod chunks;
151 #[cfg(feature = "alloc")]
152 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
153 pub use self::chunks::Chunks;
154 
155 #[cfg(feature = "alloc")]
156 mod ready_chunks;
157 #[cfg(feature = "alloc")]
158 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
159 pub use self::ready_chunks::ReadyChunks;
160 
161 mod scan;
162 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
163 pub use self::scan::Scan;
164 
165 cfg_target_has_atomic! {
166     #[cfg(feature = "alloc")]
167     mod buffer_unordered;
168     #[cfg(feature = "alloc")]
169     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
170     pub use self::buffer_unordered::BufferUnordered;
171 
172     #[cfg(feature = "alloc")]
173     mod buffered;
174     #[cfg(feature = "alloc")]
175     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
176     pub use self::buffered::Buffered;
177 
178     #[cfg(feature = "alloc")]
179     mod for_each_concurrent;
180     #[cfg(feature = "alloc")]
181     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
182     pub use self::for_each_concurrent::ForEachConcurrent;
183 
184     #[cfg(feature = "sink")]
185     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
186     #[cfg(feature = "alloc")]
187     mod split;
188     #[cfg(feature = "sink")]
189     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
190     #[cfg(feature = "alloc")]
191     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
192     pub use self::split::{SplitStream, SplitSink, ReuniteError};
193 }
194 
195 #[cfg(feature = "std")]
196 mod catch_unwind;
197 #[cfg(feature = "std")]
198 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
199 pub use self::catch_unwind::CatchUnwind;
200 use crate::stream::assert_stream;
201 
202 impl<T: ?Sized> StreamExt for T where T: Stream {}
203 
204 /// An extension trait for `Stream`s that provides a variety of convenient
205 /// combinator functions.
206 pub trait StreamExt: Stream {
207     /// Creates a future that resolves to the next item in the stream.
208     ///
209     /// Note that because `next` doesn't take ownership over the stream,
210     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
211     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
212     /// be done by boxing the stream using [`Box::pin`] or
213     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
214     /// crate.
215     ///
216     /// # Examples
217     ///
218     /// ```
219     /// # futures::executor::block_on(async {
220     /// use futures::stream::{self, StreamExt};
221     ///
222     /// let mut stream = stream::iter(1..=3);
223     ///
224     /// assert_eq!(stream.next().await, Some(1));
225     /// assert_eq!(stream.next().await, Some(2));
226     /// assert_eq!(stream.next().await, Some(3));
227     /// assert_eq!(stream.next().await, None);
228     /// # });
229     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,230     fn next(&mut self) -> Next<'_, Self>
231     where
232         Self: Unpin,
233     {
234         assert_future::<Option<Self::Item>, _>(Next::new(self))
235     }
236 
237     /// Converts this stream into a future of `(next_item, tail_of_stream)`.
238     /// If the stream terminates, then the next item is [`None`].
239     ///
240     /// The returned future can be used to compose streams and futures together
241     /// by placing everything into the "world of futures".
242     ///
243     /// Note that because `into_future` moves the stream, the [`Stream`] type
244     /// must be [`Unpin`]. If you want to use `into_future` with a
245     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
246     /// be done by boxing the stream using [`Box::pin`] or
247     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
248     /// crate.
249     ///
250     /// # Examples
251     ///
252     /// ```
253     /// # futures::executor::block_on(async {
254     /// use futures::stream::{self, StreamExt};
255     ///
256     /// let stream = stream::iter(1..=3);
257     ///
258     /// let (item, stream) = stream.into_future().await;
259     /// assert_eq!(Some(1), item);
260     ///
261     /// let (item, stream) = stream.into_future().await;
262     /// assert_eq!(Some(2), item);
263     /// # });
264     /// ```
into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,265     fn into_future(self) -> StreamFuture<Self>
266     where
267         Self: Sized + Unpin,
268     {
269         assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self))
270     }
271 
272     /// Maps this stream's items to a different type, returning a new stream of
273     /// the resulting type.
274     ///
275     /// The provided closure is executed over all elements of this stream as
276     /// they are made available. It is executed inline with calls to
277     /// [`poll_next`](Stream::poll_next).
278     ///
279     /// Note that this function consumes the stream passed into it and returns a
280     /// wrapped version of it, similar to the existing `map` methods in the
281     /// standard library.
282     ///
283     /// # Examples
284     ///
285     /// ```
286     /// # futures::executor::block_on(async {
287     /// use futures::stream::{self, StreamExt};
288     ///
289     /// let stream = stream::iter(1..=3);
290     /// let stream = stream.map(|x| x + 3);
291     ///
292     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
293     /// # });
294     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,295     fn map<T, F>(self, f: F) -> Map<Self, F>
296     where
297         F: FnMut(Self::Item) -> T,
298         Self: Sized,
299     {
300         assert_stream::<T, _>(Map::new(self, f))
301     }
302 
303     /// Creates a stream which gives the current iteration count as well as
304     /// the next value.
305     ///
306     /// The stream returned yields pairs `(i, val)`, where `i` is the
307     /// current index of iteration and `val` is the value returned by the
308     /// stream.
309     ///
310     /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a
311     /// different sized integer, the [`zip`](StreamExt::zip) function provides similar
312     /// functionality.
313     ///
314     /// # Overflow Behavior
315     ///
316     /// The method does no guarding against overflows, so enumerating more than
317     /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If
318     /// debug assertions are enabled, a panic is guaranteed.
319     ///
320     /// # Panics
321     ///
322     /// The returned stream might panic if the to-be-returned index would
323     /// overflow a [`usize`].
324     ///
325     /// # Examples
326     ///
327     /// ```
328     /// # futures::executor::block_on(async {
329     /// use futures::stream::{self, StreamExt};
330     ///
331     /// let stream = stream::iter(vec!['a', 'b', 'c']);
332     ///
333     /// let mut stream = stream.enumerate();
334     ///
335     /// assert_eq!(stream.next().await, Some((0, 'a')));
336     /// assert_eq!(stream.next().await, Some((1, 'b')));
337     /// assert_eq!(stream.next().await, Some((2, 'c')));
338     /// assert_eq!(stream.next().await, None);
339     /// # });
340     /// ```
enumerate(self) -> Enumerate<Self> where Self: Sized,341     fn enumerate(self) -> Enumerate<Self>
342     where
343         Self: Sized,
344     {
345         assert_stream::<(usize, Self::Item), _>(Enumerate::new(self))
346     }
347 
348     /// Filters the values produced by this stream according to the provided
349     /// asynchronous predicate.
350     ///
351     /// As values of this stream are made available, the provided predicate `f`
352     /// will be run against them. If the predicate returns a `Future` which
353     /// resolves to `true`, then the stream will yield the value, but if the
354     /// predicate returns a `Future` which resolves to `false`, then the value
355     /// will be discarded and the next value will be produced.
356     ///
357     /// Note that this function consumes the stream passed into it and returns a
358     /// wrapped version of it, similar to the existing `filter` methods in the
359     /// standard library.
360     ///
361     /// # Examples
362     ///
363     /// ```
364     /// # futures::executor::block_on(async {
365     /// use futures::future;
366     /// use futures::stream::{self, StreamExt};
367     ///
368     /// let stream = stream::iter(1..=10);
369     /// let evens = stream.filter(|x| future::ready(x % 2 == 0));
370     ///
371     /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await);
372     /// # });
373     /// ```
filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,374     fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
375     where
376         F: FnMut(&Self::Item) -> Fut,
377         Fut: Future<Output = bool>,
378         Self: Sized,
379     {
380         assert_stream::<Self::Item, _>(Filter::new(self, f))
381     }
382 
383     /// Filters the values produced by this stream while simultaneously mapping
384     /// them to a different type according to the provided asynchronous closure.
385     ///
386     /// As values of this stream are made available, the provided function will
387     /// be run on them. If the future returned by the predicate `f` resolves to
388     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
389     /// it resolves to [`None`] then the next value will be produced.
390     ///
391     /// Note that this function consumes the stream passed into it and returns a
392     /// wrapped version of it, similar to the existing `filter_map` methods in
393     /// the standard library.
394     ///
395     /// # Examples
396     /// ```
397     /// # futures::executor::block_on(async {
398     /// use futures::stream::{self, StreamExt};
399     ///
400     /// let stream = stream::iter(1..=10);
401     /// let evens = stream.filter_map(|x| async move {
402     ///     if x % 2 == 0 { Some(x + 1) } else { None }
403     /// });
404     ///
405     /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await);
406     /// # });
407     /// ```
filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,408     fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
409     where
410         F: FnMut(Self::Item) -> Fut,
411         Fut: Future<Output = Option<T>>,
412         Self: Sized,
413     {
414         assert_stream::<T, _>(FilterMap::new(self, f))
415     }
416 
417     /// Computes from this stream's items new items of a different type using
418     /// an asynchronous closure.
419     ///
420     /// The provided closure `f` will be called with an `Item` once a value is
421     /// ready, it returns a future which will then be run to completion
422     /// to produce the next value on this stream.
423     ///
424     /// Note that this function consumes the stream passed into it and returns a
425     /// wrapped version of it.
426     ///
427     /// # Examples
428     ///
429     /// ```
430     /// # futures::executor::block_on(async {
431     /// use futures::stream::{self, StreamExt};
432     ///
433     /// let stream = stream::iter(1..=3);
434     /// let stream = stream.then(|x| async move { x + 3 });
435     ///
436     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
437     /// # });
438     /// ```
then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,439     fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
440     where
441         F: FnMut(Self::Item) -> Fut,
442         Fut: Future,
443         Self: Sized,
444     {
445         assert_stream::<Fut::Output, _>(Then::new(self, f))
446     }
447 
448     /// Transforms a stream into a collection, returning a
449     /// future representing the result of that computation.
450     ///
451     /// The returned future will be resolved when the stream terminates.
452     ///
453     /// # Examples
454     ///
455     /// ```
456     /// # futures::executor::block_on(async {
457     /// use futures::channel::mpsc;
458     /// use futures::stream::StreamExt;
459     /// use std::thread;
460     ///
461     /// let (tx, rx) = mpsc::unbounded();
462     ///
463     /// thread::spawn(move || {
464     ///     for i in 1..=5 {
465     ///         tx.unbounded_send(i).unwrap();
466     ///     }
467     /// });
468     ///
469     /// let output = rx.collect::<Vec<i32>>().await;
470     /// assert_eq!(output, vec![1, 2, 3, 4, 5]);
471     /// # });
472     /// ```
collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,473     fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
474     where
475         Self: Sized,
476     {
477         assert_future::<C, _>(Collect::new(self))
478     }
479 
480     /// Concatenate all items of a stream into a single extendable
481     /// destination, returning a future representing the end result.
482     ///
483     /// This combinator will extend the first item with the contents
484     /// of all the subsequent results of the stream. If the stream is
485     /// empty, the default value will be returned.
486     ///
487     /// Works with all collections that implement the
488     /// [`Extend`](std::iter::Extend) trait.
489     ///
490     /// # Examples
491     ///
492     /// ```
493     /// # futures::executor::block_on(async {
494     /// use futures::channel::mpsc;
495     /// use futures::stream::StreamExt;
496     /// use std::thread;
497     ///
498     /// let (tx, rx) = mpsc::unbounded();
499     ///
500     /// thread::spawn(move || {
501     ///     for i in (0..3).rev() {
502     ///         let n = i * 3;
503     ///         tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap();
504     ///     }
505     /// });
506     ///
507     /// let result = rx.concat().await;
508     ///
509     /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
510     /// # });
511     /// ```
concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,512     fn concat(self) -> Concat<Self>
513     where
514         Self: Sized,
515         Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
516     {
517         assert_future::<Self::Item, _>(Concat::new(self))
518     }
519 
520     /// Repeats a stream endlessly.
521     ///
522     /// The stream never terminates. Note that you likely want to avoid
523     /// usage of `collect` or such on the returned stream as it will exhaust
524     /// available memory as it tries to just fill up all RAM.
525     ///
526     /// # Examples
527     ///
528     /// ```
529     /// # futures::executor::block_on(async {
530     /// use futures::stream::{self, StreamExt};
531     /// let a = [1, 2, 3];
532     /// let mut s = stream::iter(a.iter()).cycle();
533     ///
534     /// assert_eq!(s.next().await, Some(&1));
535     /// assert_eq!(s.next().await, Some(&2));
536     /// assert_eq!(s.next().await, Some(&3));
537     /// assert_eq!(s.next().await, Some(&1));
538     /// assert_eq!(s.next().await, Some(&2));
539     /// assert_eq!(s.next().await, Some(&3));
540     /// assert_eq!(s.next().await, Some(&1));
541     /// # });
542     /// ```
cycle(self) -> Cycle<Self> where Self: Sized + Clone,543     fn cycle(self) -> Cycle<Self>
544     where
545         Self: Sized + Clone,
546     {
547         assert_stream::<Self::Item, _>(Cycle::new(self))
548     }
549 
550     /// Execute an accumulating asynchronous computation over a stream,
551     /// collecting all the values into one final result.
552     ///
553     /// This combinator will accumulate all values returned by this stream
554     /// according to the closure provided. The initial state is also provided to
555     /// this method and then is returned again by each execution of the closure.
556     /// Once the entire stream has been exhausted the returned future will
557     /// resolve to this value.
558     ///
559     /// # Examples
560     ///
561     /// ```
562     /// # futures::executor::block_on(async {
563     /// use futures::stream::{self, StreamExt};
564     ///
565     /// let number_stream = stream::iter(0..6);
566     /// let sum = number_stream.fold(0, |acc, x| async move { acc + x });
567     /// assert_eq!(sum.await, 15);
568     /// # });
569     /// ```
fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,570     fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
571     where
572         F: FnMut(T, Self::Item) -> Fut,
573         Fut: Future<Output = T>,
574         Self: Sized,
575     {
576         assert_future::<T, _>(Fold::new(self, f, init))
577     }
578 
579     /// Flattens a stream of streams into just one continuous stream.
580     ///
581     /// # Examples
582     ///
583     /// ```
584     /// # futures::executor::block_on(async {
585     /// use futures::channel::mpsc;
586     /// use futures::stream::StreamExt;
587     /// use std::thread;
588     ///
589     /// let (tx1, rx1) = mpsc::unbounded();
590     /// let (tx2, rx2) = mpsc::unbounded();
591     /// let (tx3, rx3) = mpsc::unbounded();
592     ///
593     /// thread::spawn(move || {
594     ///     tx1.unbounded_send(1).unwrap();
595     ///     tx1.unbounded_send(2).unwrap();
596     /// });
597     /// thread::spawn(move || {
598     ///     tx2.unbounded_send(3).unwrap();
599     ///     tx2.unbounded_send(4).unwrap();
600     /// });
601     /// thread::spawn(move || {
602     ///     tx3.unbounded_send(rx1).unwrap();
603     ///     tx3.unbounded_send(rx2).unwrap();
604     /// });
605     ///
606     /// let output = rx3.flatten().collect::<Vec<i32>>().await;
607     /// assert_eq!(output, vec![1, 2, 3, 4]);
608     /// # });
609     /// ```
flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,610     fn flatten(self) -> Flatten<Self>
611     where
612         Self::Item: Stream,
613         Self: Sized,
614     {
615         assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
616     }
617 
618     /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
619     ///
620     /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
621     /// you would have to chain combinators like `.map(f).flatten()` while this
622     /// combinator provides ability to write `.flat_map(f)` instead of chaining.
623     ///
624     /// The provided closure, which produces the inner streams, is called on each
625     /// element of the stream once the previous inner stream has terminated and the next item is available.
626     ///
627     /// Note that this function consumes the stream passed into it and returns a
628     /// wrapped version of it, similar to the existing `flat_map` methods in the
629     /// standard library.
630     ///
631     /// # Examples
632     ///
633     /// ```
634     /// # futures::executor::block_on(async {
635     /// use futures::stream::{self, StreamExt};
636     ///
637     /// let stream = stream::iter(1..=3);
638     /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x]));
639     ///
640     /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await);
641     /// # });
642     /// ```
flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,643     fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
644     where
645         F: FnMut(Self::Item) -> U,
646         U: Stream,
647         Self: Sized,
648     {
649         FlatMap::new(self, f)
650     }
651 
652     /// Combinator similar to [`StreamExt::fold`] that holds internal state
653     /// and produces a new stream.
654     ///
655     /// Accepts initial state and closure which will be applied to each element
656     /// of the stream until provided closure returns `None`. Once `None` is
657     /// returned, stream will be terminated.
658     ///
659     /// # Examples
660     ///
661     /// ```
662     /// # futures::executor::block_on(async {
663     /// use futures::future;
664     /// use futures::stream::{self, StreamExt};
665     ///
666     /// let stream = stream::iter(1..=10);
667     ///
668     /// let stream = stream.scan(0, |state, x| {
669     ///     *state += x;
670     ///     future::ready(if *state < 10 { Some(x) } else { None })
671     /// });
672     ///
673     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
674     /// # });
675     /// ```
scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,676     fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
677     where
678         F: FnMut(&mut S, Self::Item) -> Fut,
679         Fut: Future<Output = Option<B>>,
680         Self: Sized,
681     {
682         Scan::new(self, initial_state, f)
683     }
684 
685     /// Skip elements on this stream while the provided asynchronous predicate
686     /// resolves to `true`.
687     ///
688     /// This function, like `Iterator::skip_while`, will skip elements on the
689     /// stream until the predicate `f` resolves to `false`. Once one element
690     /// returns `false`, all future elements will be returned from the underlying
691     /// stream.
692     ///
693     /// # Examples
694     ///
695     /// ```
696     /// # futures::executor::block_on(async {
697     /// use futures::future;
698     /// use futures::stream::{self, StreamExt};
699     ///
700     /// let stream = stream::iter(1..=10);
701     ///
702     /// let stream = stream.skip_while(|x| future::ready(*x <= 5));
703     ///
704     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
705     /// # });
706     /// ```
skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,707     fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
708     where
709         F: FnMut(&Self::Item) -> Fut,
710         Fut: Future<Output = bool>,
711         Self: Sized,
712     {
713         assert_stream::<Self::Item, _>(SkipWhile::new(self, f))
714     }
715 
716     /// Take elements from this stream while the provided asynchronous predicate
717     /// resolves to `true`.
718     ///
719     /// This function, like `Iterator::take_while`, will take elements from the
720     /// stream until the predicate `f` resolves to `false`. Once one element
721     /// returns `false`, it will always return that the stream is done.
722     ///
723     /// # Examples
724     ///
725     /// ```
726     /// # futures::executor::block_on(async {
727     /// use futures::future;
728     /// use futures::stream::{self, StreamExt};
729     ///
730     /// let stream = stream::iter(1..=10);
731     ///
732     /// let stream = stream.take_while(|x| future::ready(*x <= 5));
733     ///
734     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
735     /// # });
736     /// ```
take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,737     fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
738     where
739         F: FnMut(&Self::Item) -> Fut,
740         Fut: Future<Output = bool>,
741         Self: Sized,
742     {
743         assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
744     }
745 
746     /// Take elements from this stream until the provided future resolves.
747     ///
748     /// This function will take elements from the stream until the provided
749     /// stopping future `fut` resolves. Once the `fut` future becomes ready,
750     /// this stream combinator will always return that the stream is done.
751     ///
752     /// The stopping future may return any type. Once the stream is stopped
753     /// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
754     /// The stream may also be resumed with `TakeUntil::take_future()`.
755     /// See the documentation of [`TakeUntil`] for more information.
756     ///
757     /// # Examples
758     ///
759     /// ```
760     /// # futures::executor::block_on(async {
761     /// use futures::future;
762     /// use futures::stream::{self, StreamExt};
763     /// use futures::task::Poll;
764     ///
765     /// let stream = stream::iter(1..=10);
766     ///
767     /// let mut i = 0;
768     /// let stop_fut = future::poll_fn(|_cx| {
769     ///     i += 1;
770     ///     if i <= 5 {
771     ///         Poll::Pending
772     ///     } else {
773     ///         Poll::Ready(())
774     ///     }
775     /// });
776     ///
777     /// let stream = stream.take_until(stop_fut);
778     ///
779     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
780     /// # });
781     /// ```
take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,782     fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
783     where
784         Fut: Future,
785         Self: Sized,
786     {
787         TakeUntil::new(self, fut)
788     }
789 
790     /// Runs this stream to completion, executing the provided asynchronous
791     /// closure for each element on the stream.
792     ///
793     /// The closure provided will be called for each item this stream produces,
794     /// yielding a future. That future will then be executed to completion
795     /// before moving on to the next item.
796     ///
797     /// The returned value is a `Future` where the `Output` type is `()`; it is
798     /// executed entirely for its side effects.
799     ///
800     /// To process each item in the stream and produce another stream instead
801     /// of a single future, use `then` instead.
802     ///
803     /// # Examples
804     ///
805     /// ```
806     /// # futures::executor::block_on(async {
807     /// use futures::future;
808     /// use futures::stream::{self, StreamExt};
809     ///
810     /// let mut x = 0;
811     ///
812     /// {
813     ///     let fut = stream::repeat(1).take(3).for_each(|item| {
814     ///         x += item;
815     ///         future::ready(())
816     ///     });
817     ///     fut.await;
818     /// }
819     ///
820     /// assert_eq!(x, 3);
821     /// # });
822     /// ```
for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,823     fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
824     where
825         F: FnMut(Self::Item) -> Fut,
826         Fut: Future<Output = ()>,
827         Self: Sized,
828     {
829         assert_future::<(), _>(ForEach::new(self, f))
830     }
831 
832     /// Runs this stream to completion, executing the provided asynchronous
833     /// closure for each element on the stream concurrently as elements become
834     /// available.
835     ///
836     /// This is similar to [`StreamExt::for_each`], but the futures
837     /// produced by the closure are run concurrently (but not in parallel--
838     /// this combinator does not introduce any threads).
839     ///
840     /// The closure provided will be called for each item this stream produces,
841     /// yielding a future. That future will then be executed to completion
842     /// concurrently with the other futures produced by the closure.
843     ///
844     /// The first argument is an optional limit on the number of concurrent
845     /// futures. If this limit is not `None`, no more than `limit` futures
846     /// will be run concurrently. The `limit` argument is of type
847     /// `Into<Option<usize>>`, and so can be provided as either `None`,
848     /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
849     /// no limit at all, and will have the same result as passing in `None`.
850     ///
851     /// This method is only available when the `std` or `alloc` feature of this
852     /// library is activated, and it is activated by default.
853     ///
854     /// # Examples
855     ///
856     /// ```
857     /// # futures::executor::block_on(async {
858     /// use futures::channel::oneshot;
859     /// use futures::stream::{self, StreamExt};
860     ///
861     /// let (tx1, rx1) = oneshot::channel();
862     /// let (tx2, rx2) = oneshot::channel();
863     /// let (tx3, rx3) = oneshot::channel();
864     ///
865     /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
866     ///     /* limit */ 2,
867     ///     |rx| async move {
868     ///         rx.await.unwrap();
869     ///     }
870     /// );
871     /// tx1.send(()).unwrap();
872     /// tx2.send(()).unwrap();
873     /// tx3.send(()).unwrap();
874     /// fut.await;
875     /// # })
876     /// ```
877     #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
878     #[cfg(feature = "alloc")]
for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,879     fn for_each_concurrent<Fut, F>(
880         self,
881         limit: impl Into<Option<usize>>,
882         f: F,
883     ) -> ForEachConcurrent<Self, Fut, F>
884     where
885         F: FnMut(Self::Item) -> Fut,
886         Fut: Future<Output = ()>,
887         Self: Sized,
888     {
889         assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f))
890     }
891 
892     /// Creates a new stream of at most `n` items of the underlying stream.
893     ///
894     /// Once `n` items have been yielded from this stream then it will always
895     /// return that the stream is done.
896     ///
897     /// # Examples
898     ///
899     /// ```
900     /// # futures::executor::block_on(async {
901     /// use futures::stream::{self, StreamExt};
902     ///
903     /// let stream = stream::iter(1..=10).take(3);
904     ///
905     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
906     /// # });
907     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,908     fn take(self, n: usize) -> Take<Self>
909     where
910         Self: Sized,
911     {
912         assert_stream::<Self::Item, _>(Take::new(self, n))
913     }
914 
915     /// Creates a new stream which skips `n` items of the underlying stream.
916     ///
917     /// Once `n` items have been skipped from this stream then it will always
918     /// return the remaining items on this stream.
919     ///
920     /// # Examples
921     ///
922     /// ```
923     /// # futures::executor::block_on(async {
924     /// use futures::stream::{self, StreamExt};
925     ///
926     /// let stream = stream::iter(1..=10).skip(5);
927     ///
928     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
929     /// # });
930     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,931     fn skip(self, n: usize) -> Skip<Self>
932     where
933         Self: Sized,
934     {
935         assert_stream::<Self::Item, _>(Skip::new(self, n))
936     }
937 
938     /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never
939     /// again be called once it has finished. This method can be used to turn
940     /// any `Stream` into a `FusedStream`.
941     ///
942     /// Normally, once a stream has returned [`None`] from
943     /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad
944     /// behavior such as block forever, panic, never return, etc. If it is known
945     /// that [`poll_next`](Stream::poll_next) may be called after stream
946     /// has already finished, then this method can be used to ensure that it has
947     /// defined semantics.
948     ///
949     /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream
950     /// is guaranteed to return [`None`] after the underlying stream has
951     /// finished.
952     ///
953     /// # Examples
954     ///
955     /// ```
956     /// use futures::executor::block_on_stream;
957     /// use futures::stream::{self, StreamExt};
958     /// use futures::task::Poll;
959     ///
960     /// let mut x = 0;
961     /// let stream = stream::poll_fn(|_| {
962     ///     x += 1;
963     ///     match x {
964     ///         0..=2 => Poll::Ready(Some(x)),
965     ///         3 => Poll::Ready(None),
966     ///         _ => panic!("should not happen")
967     ///     }
968     /// }).fuse();
969     ///
970     /// let mut iter = block_on_stream(stream);
971     /// assert_eq!(Some(1), iter.next());
972     /// assert_eq!(Some(2), iter.next());
973     /// assert_eq!(None, iter.next());
974     /// assert_eq!(None, iter.next());
975     /// // ...
976     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,977     fn fuse(self) -> Fuse<Self>
978     where
979         Self: Sized,
980     {
981         assert_stream::<Self::Item, _>(Fuse::new(self))
982     }
983 
984     /// Borrows a stream, rather than consuming it.
985     ///
986     /// This is useful to allow applying stream adaptors while still retaining
987     /// ownership of the original stream.
988     ///
989     /// # Examples
990     ///
991     /// ```
992     /// # futures::executor::block_on(async {
993     /// use futures::stream::{self, StreamExt};
994     ///
995     /// let mut stream = stream::iter(1..5);
996     ///
997     /// let sum = stream.by_ref()
998     ///                 .take(2)
999     ///                 .fold(0, |a, b| async move { a + b })
1000     ///                 .await;
1001     /// assert_eq!(sum, 3);
1002     ///
1003     /// // You can use the stream again
1004     /// let sum = stream.take(2)
1005     ///                 .fold(0, |a, b| async move { a + b })
1006     ///                 .await;
1007     /// assert_eq!(sum, 7);
1008     /// # });
1009     /// ```
    fn by_ref(&mut self) -> &mut Self {
        // Hand back the very same mutable reference: callers can then chain
        // by-value adaptors on the `&mut Self` and keep the original stream.
        self
    }
1013 
1014     /// Catches unwinding panics while polling the stream.
1015     ///
1016     /// Caught panic (if any) will be the last element of the resulting stream.
1017     ///
1018     /// In general, panics within a stream can propagate all the way out to the
1019     /// task level. This combinator makes it possible to halt unwinding within
1020     /// the stream itself. It's most commonly used within task executors. This
1021     /// method should not be used for error handling.
1022     ///
1023     /// Note that this method requires the `UnwindSafe` bound from the standard
1024     /// library. This isn't always applied automatically, and the standard
1025     /// library provides an `AssertUnwindSafe` wrapper type to apply it
    /// after the fact. To assist using this method, the [`Stream`] trait is
1027     /// also implemented for `AssertUnwindSafe<St>` where `St` implements
1028     /// [`Stream`].
1029     ///
1030     /// This method is only available when the `std` feature of this
1031     /// library is activated, and it is activated by default.
1032     ///
1033     /// # Examples
1034     ///
1035     /// ```
1036     /// # futures::executor::block_on(async {
1037     /// use futures::stream::{self, StreamExt};
1038     ///
1039     /// let stream = stream::iter(vec![Some(10), None, Some(11)]);
1040     /// // Panic on second element
1041     /// let stream_panicking = stream.map(|o| o.unwrap());
1042     /// // Collect all the results
1043     /// let stream = stream_panicking.catch_unwind();
1044     ///
1045     /// let results: Vec<Result<i32, _>> = stream.collect().await;
1046     /// match results[0] {
1047     ///     Ok(10) => {}
1048     ///     _ => panic!("unexpected result!"),
1049     /// }
1050     /// assert!(results[1].is_err());
1051     /// assert_eq!(results.len(), 2);
1052     /// # });
1053     /// ```
1054     #[cfg(feature = "std")]
catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe,1055     fn catch_unwind(self) -> CatchUnwind<Self>
1056     where
1057         Self: Sized + std::panic::UnwindSafe,
1058     {
1059         assert_stream(CatchUnwind::new(self))
1060     }
1061 
1062     /// Wrap the stream in a Box, pinning it.
1063     ///
1064     /// This method is only available when the `std` or `alloc` feature of this
1065     /// library is activated, and it is activated by default.
1066     #[cfg(feature = "alloc")]
boxed<'a>(self) -> BoxStream<'a, Self::Item> where Self: Sized + Send + 'a,1067     fn boxed<'a>(self) -> BoxStream<'a, Self::Item>
1068     where
1069         Self: Sized + Send + 'a,
1070     {
1071         assert_stream::<Self::Item, _>(Box::pin(self))
1072     }
1073 
1074     /// Wrap the stream in a Box, pinning it.
1075     ///
1076     /// Similar to `boxed`, but without the `Send` requirement.
1077     ///
1078     /// This method is only available when the `std` or `alloc` feature of this
1079     /// library is activated, and it is activated by default.
1080     #[cfg(feature = "alloc")]
boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> where Self: Sized + 'a,1081     fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item>
1082     where
1083         Self: Sized + 'a,
1084     {
1085         assert_stream::<Self::Item, _>(Box::pin(self))
1086     }
1087 
1088     /// An adaptor for creating a buffered list of pending futures.
1089     ///
1090     /// If this stream's item can be converted into a future, then this adaptor
1091     /// will buffer up to at most `n` futures and then return the outputs in the
1092     /// same order as the underlying stream. No more than `n` futures will be
1093     /// buffered at any point in time, and less than `n` may also be buffered
1094     /// depending on the state of each future.
1095     ///
1096     /// The returned stream will be a stream of each future's output.
1097     ///
1098     /// This method is only available when the `std` or `alloc` feature of this
1099     /// library is activated, and it is activated by default.
1100     #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
1101     #[cfg(feature = "alloc")]
buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized,1102     fn buffered(self, n: usize) -> Buffered<Self>
1103     where
1104         Self::Item: Future,
1105         Self: Sized,
1106     {
1107         assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n))
1108     }
1109 
1110     /// An adaptor for creating a buffered list of pending futures (unordered).
1111     ///
1112     /// If this stream's item can be converted into a future, then this adaptor
1113     /// will buffer up to `n` futures and then return the outputs in the order
1114     /// in which they complete. No more than `n` futures will be buffered at
1115     /// any point in time, and less than `n` may also be buffered depending on
1116     /// the state of each future.
1117     ///
1118     /// The returned stream will be a stream of each future's output.
1119     ///
1120     /// This method is only available when the `std` or `alloc` feature of this
1121     /// library is activated, and it is activated by default.
1122     ///
1123     /// # Examples
1124     ///
1125     /// ```
1126     /// # futures::executor::block_on(async {
1127     /// use futures::channel::oneshot;
1128     /// use futures::stream::{self, StreamExt};
1129     ///
1130     /// let (send_one, recv_one) = oneshot::channel();
1131     /// let (send_two, recv_two) = oneshot::channel();
1132     ///
1133     /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
1134     /// let mut buffered = stream_of_futures.buffer_unordered(10);
1135     ///
1136     /// send_two.send(2i32)?;
1137     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1138     ///
1139     /// send_one.send(1i32)?;
1140     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1141     ///
1142     /// assert_eq!(buffered.next().await, None);
1143     /// # Ok::<(), i32>(()) }).unwrap();
1144     /// ```
1145     #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
1146     #[cfg(feature = "alloc")]
buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized,1147     fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
1148     where
1149         Self::Item: Future,
1150         Self: Sized,
1151     {
1152         assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n))
1153     }
1154 
1155     /// An adapter for zipping two streams together.
1156     ///
1157     /// The zipped stream waits for both streams to produce an item, and then
1158     /// returns that pair. If either stream ends then the zipped stream will
1159     /// also end.
1160     ///
1161     /// # Examples
1162     ///
1163     /// ```
1164     /// # futures::executor::block_on(async {
1165     /// use futures::stream::{self, StreamExt};
1166     ///
1167     /// let stream1 = stream::iter(1..=3);
1168     /// let stream2 = stream::iter(5..=10);
1169     ///
1170     /// let vec = stream1.zip(stream2)
1171     ///                  .collect::<Vec<_>>()
1172     ///                  .await;
1173     /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec);
1174     /// # });
1175     /// ```
1176     ///
zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1177     fn zip<St>(self, other: St) -> Zip<Self, St>
1178     where
1179         St: Stream,
1180         Self: Sized,
1181     {
1182         assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other))
1183     }
1184 
1185     /// Adapter for chaining two streams.
1186     ///
1187     /// The resulting stream emits elements from the first stream, and when
    /// the first stream reaches the end, emits the elements from the second stream.
1189     ///
1190     /// ```
1191     /// # futures::executor::block_on(async {
1192     /// use futures::stream::{self, StreamExt};
1193     ///
1194     /// let stream1 = stream::iter(vec![Ok(10), Err(false)]);
1195     /// let stream2 = stream::iter(vec![Err(true), Ok(20)]);
1196     ///
1197     /// let stream = stream1.chain(stream2);
1198     ///
1199     /// let result: Vec<_> = stream.collect().await;
1200     /// assert_eq!(result, vec![
1201     ///     Ok(10),
1202     ///     Err(false),
1203     ///     Err(true),
1204     ///     Ok(20),
1205     /// ]);
1206     /// # });
1207     /// ```
chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1208     fn chain<St>(self, other: St) -> Chain<Self, St>
1209     where
1210         St: Stream<Item = Self::Item>,
1211         Self: Sized,
1212     {
1213         assert_stream::<Self::Item, _>(Chain::new(self, other))
1214     }
1215 
1216     /// Creates a new stream which exposes a `peek` method.
1217     ///
1218     /// Calling `peek` returns a reference to the next item in the stream.
peekable(self) -> Peekable<Self> where Self: Sized,1219     fn peekable(self) -> Peekable<Self>
1220     where
1221         Self: Sized,
1222     {
1223         assert_stream::<Self::Item, _>(Peekable::new(self))
1224     }
1225 
1226     /// An adaptor for chunking up items of the stream inside a vector.
1227     ///
1228     /// This combinator will attempt to pull items from this stream and buffer
1229     /// them into a local vector. At most `capacity` items will get buffered
1230     /// before they're yielded from the returned stream.
1231     ///
    /// Note that the vectors returned from this stream may not always have
1233     /// `capacity` elements. If the underlying stream ended and only a partial
1234     /// vector was created, it'll be returned. Additionally if an error happens
1235     /// from the underlying stream then the currently buffered items will be
1236     /// yielded.
1237     ///
1238     /// This method is only available when the `std` or `alloc` feature of this
1239     /// library is activated, and it is activated by default.
1240     ///
1241     /// # Panics
1242     ///
1243     /// This method will panic if `capacity` is zero.
1244     #[cfg(feature = "alloc")]
chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,1245     fn chunks(self, capacity: usize) -> Chunks<Self>
1246     where
1247         Self: Sized,
1248     {
1249         assert_stream::<alloc::vec::Vec<Self::Item>, _>(Chunks::new(self, capacity))
1250     }
1251 
1252     /// An adaptor for chunking up ready items of the stream inside a vector.
1253     ///
1254     /// This combinator will attempt to pull ready items from this stream and
1255     /// buffer them into a local vector. At most `capacity` items will get
1256     /// buffered before they're yielded from the returned stream. If underlying
1257     /// stream returns `Poll::Pending`, and collected chunk is not empty, it will
1258     /// be immediately returned.
1259     ///
1260     /// If the underlying stream ended and only a partial vector was created,
1261     /// it'll be returned. Additionally if an error happens from the underlying
1262     /// stream then the currently buffered items will be yielded.
1263     ///
1264     /// This method is only available when the `std` or `alloc` feature of this
1265     /// library is activated, and it is activated by default.
1266     ///
1267     /// # Panics
1268     ///
1269     /// This method will panic if `capacity` is zero.
1270     #[cfg(feature = "alloc")]
ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,1271     fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
1272         where
1273             Self: Sized,
1274     {
1275         ReadyChunks::new(self, capacity)
1276     }
1277 
1278     /// A future that completes after the given stream has been fully processed
1279     /// into the sink and the sink has been flushed and closed.
1280     ///
1281     /// This future will drive the stream to keep producing items until it is
1282     /// exhausted, sending each item to the sink. It will complete once the
1283     /// stream is exhausted, the sink has received and flushed all items, and
1284     /// the sink is closed. Note that neither the original stream nor provided
1285     /// sink will be output by this future. Pass the sink by `Pin<&mut S>`
1286     /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
1287     /// order to preserve access to the `Sink`.
1288     #[cfg(feature = "sink")]
1289     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    fn forward<S>(self, sink: S) -> Forward<Self, S>
    where
        S: Sink<Self::Ok, Error = Self::Error>,
        Self: TryStream + Sized,
    {
        // Build the `Forward` future from this stream and the given sink.
        // The sink must accept the stream's `Ok` values and share its error
        // type, as required by the bounds above.
        Forward::new(self, sink)
    }
1297 
1298     /// Splits this `Stream + Sink` object into separate `Sink` and `Stream`
1299     /// objects.
1300     ///
1301     /// This can be useful when you want to split ownership between tasks, or
1302     /// allow direct interaction between the two objects (e.g. via
1303     /// `Sink::send_all`).
1304     ///
1305     /// This method is only available when the `std` or `alloc` feature of this
1306     /// library is activated, and it is activated by default.
1307     #[cfg(feature = "sink")]
1308     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
1309     #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
1310     #[cfg(feature = "alloc")]
split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized,1311     fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
1312     where
1313         Self: Sink<Item> + Sized,
1314     {
1315         let (sink, stream) = split::split(self);
1316         (sink, assert_stream::<Self::Item, _>(stream))
1317     }
1318 
1319     /// Do something with each item of this stream, afterwards passing it on.
1320     ///
1321     /// This is similar to the `Iterator::inspect` method in the standard
1322     /// library where it allows easily inspecting each value as it passes
1323     /// through the stream, for example to debug what's going on.
inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1324     fn inspect<F>(self, f: F) -> Inspect<Self, F>
1325     where
1326         F: FnMut(&Self::Item),
1327         Self: Sized,
1328     {
1329         assert_stream::<Self::Item, _>(Inspect::new(self, f))
1330     }
1331 
1332     /// Wrap this stream in an `Either` stream, making it the left-hand variant
1333     /// of that `Either`.
1334     ///
1335     /// This can be used in combination with the `right_stream` method to write `if`
1336     /// statements that evaluate to different streams in different branches.
left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1337     fn left_stream<B>(self) -> Either<Self, B>
1338     where
1339         B: Stream<Item = Self::Item>,
1340         Self: Sized,
1341     {
1342         assert_stream::<Self::Item, _>(Either::Left(self))
1343     }
1344 
1345     /// Wrap this stream in an `Either` stream, making it the right-hand variant
1346     /// of that `Either`.
1347     ///
1348     /// This can be used in combination with the `left_stream` method to write `if`
1349     /// statements that evaluate to different streams in different branches.
right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1350     fn right_stream<B>(self) -> Either<B, Self>
1351     where
1352         B: Stream<Item = Self::Item>,
1353         Self: Sized,
1354     {
1355         assert_stream::<Self::Item, _>(Either::Right(self))
1356     }
1357 
1358     /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
1359     /// stream types.
poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1360     fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
1361     where
1362         Self: Unpin,
1363     {
1364         Pin::new(self).poll_next(cx)
1365     }
1366 
1367     /// Returns a [`Future`] that resolves when the next item in this stream is
1368     /// ready.
1369     ///
1370     /// This is similar to the [`next`][StreamExt::next] method, but it won't
1371     /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the
1372     /// returned future type will return `true` from
1373     /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing
1374     /// [`select_next_some`][StreamExt::select_next_some] to be easily used with
1375     /// the [`select!`] macro.
1376     ///
1377     /// If the future is polled after this [`Stream`] is empty it will panic.
1378     /// Using the future with a [`FusedFuture`][]-aware primitive like the
1379     /// [`select!`] macro will prevent this.
1380     ///
1381     /// [`FusedFuture`]: futures_core::future::FusedFuture
1382     /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated
1383     ///
1384     /// # Examples
1385     ///
1386     /// ```
1387     /// # futures::executor::block_on(async {
1388     /// use futures::{future, select};
1389     /// use futures::stream::{StreamExt, FuturesUnordered};
1390     ///
1391     /// let mut fut = future::ready(1);
1392     /// let mut async_tasks = FuturesUnordered::new();
1393     /// let mut total = 0;
1394     /// loop {
1395     ///     select! {
1396     ///         num = fut => {
1397     ///             // First, the `ready` future completes.
1398     ///             total += num;
1399     ///             // Then we spawn a new task onto `async_tasks`,
1400     ///             async_tasks.push(async { 5 });
1401     ///         },
1402     ///         // On the next iteration of the loop, the task we spawned
1403     ///         // completes.
1404     ///         num = async_tasks.select_next_some() => {
1405     ///             total += num;
1406     ///         }
1407     ///         // Finally, both the `ready` future and `async_tasks` have
1408     ///         // finished, so we enter the `complete` branch.
1409     ///         complete => break,
1410     ///     }
1411     /// }
1412     /// assert_eq!(total, 6);
1413     /// # });
1414     /// ```
select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1415     fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
1416     where
1417         Self: Unpin + FusedStream,
1418     {
1419         SelectNextSome::new(self)
1420     }
1421 }
1422