1 //! Combinators for the [`Stream`] trait.
2 //!
3 //! # Examples
4 //!
5 //! ```
6 //! use futures_lite::stream::{self, StreamExt};
7 //!
8 //! # spin_on::spin_on(async {
9 //! let mut s = stream::iter(vec![1, 2, 3]);
10 //!
11 //! assert_eq!(s.next().await, Some(1));
12 //! assert_eq!(s.next().await, Some(2));
13 //! assert_eq!(s.next().await, Some(3));
14 //! assert_eq!(s.next().await, None);
15 //! # });
16 //! ```
17 
18 #[cfg(feature = "alloc")]
19 extern crate alloc;
20 
21 #[doc(no_inline)]
22 pub use futures_core::stream::Stream;
23 
24 #[cfg(feature = "alloc")]
25 use alloc::boxed::Box;
26 
27 use core::fmt;
28 use core::future::Future;
29 use core::marker::PhantomData;
30 use core::mem;
31 use core::pin::Pin;
32 use core::task::{Context, Poll};
33 
34 use pin_project_lite::pin_project;
35 
36 use crate::ready;
37 
38 /// Converts a stream into a blocking iterator.
39 ///
40 /// # Examples
41 ///
42 /// ```
43 /// use futures_lite::{pin, stream};
44 ///
45 /// let stream = stream::once(7);
46 /// pin!(stream);
47 ///
48 /// let mut iter = stream::block_on(stream);
49 /// assert_eq!(iter.next(), Some(7));
50 /// assert_eq!(iter.next(), None);
51 /// ```
#[cfg(feature = "std")]
pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
    // Nothing is polled here; the stream is only driven once the returned
    // iterator's `next()` is called.
    BlockOn::<S>(stream)
}
56 
/// Iterator returned by the [`block_on()`] function.
///
/// Wraps a stream; with the `std` feature enabled, each call to `next()` hands
/// the stream's `next()` future to `crate::future::block_on`.
#[derive(Debug)]
pub struct BlockOn<S>(S);

#[cfg(feature = "std")]
impl<S: Stream + Unpin> Iterator for BlockOn<S> {
    type Item = S::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Build the future for the next item first, then run it to completion.
        let next_item = self.0.next();
        crate::future::block_on(next_item)
    }
}
69 
70 /// Creates an empty stream.
71 ///
72 /// # Examples
73 ///
74 /// ```
75 /// use futures_lite::stream::{self, StreamExt};
76 ///
77 /// # spin_on::spin_on(async {
78 /// let mut s = stream::empty::<i32>();
79 /// assert_eq!(s.next().await, None);
80 /// # })
81 /// ```
empty<T>() -> Empty<T>82 pub fn empty<T>() -> Empty<T> {
83     Empty {
84         _marker: PhantomData,
85     }
86 }
87 
88 /// Stream for the [`empty()`] function.
89 #[derive(Clone, Debug)]
90 #[must_use = "streams do nothing unless polled"]
91 pub struct Empty<T> {
92     _marker: PhantomData<T>,
93 }
94 
95 impl<T> Unpin for Empty<T> {}
96 
97 impl<T> Stream for Empty<T> {
98     type Item = T;
99 
poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>>100     fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101         Poll::Ready(None)
102     }
103 
size_hint(&self) -> (usize, Option<usize>)104     fn size_hint(&self) -> (usize, Option<usize>) {
105         (0, Some(0))
106     }
107 }
108 
109 /// Creates a stream from an iterator.
110 ///
111 /// # Examples
112 ///
113 /// ```
114 /// use futures_lite::stream::{self, StreamExt};
115 ///
116 /// # spin_on::spin_on(async {
117 /// let mut s = stream::iter(vec![1, 2]);
118 ///
119 /// assert_eq!(s.next().await, Some(1));
120 /// assert_eq!(s.next().await, Some(2));
121 /// assert_eq!(s.next().await, None);
122 /// # })
123 /// ```
iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter>124 pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
125     Iter {
126         iter: iter.into_iter(),
127     }
128 }
129 
130 /// Stream for the [`iter()`] function.
131 #[derive(Clone, Debug)]
132 #[must_use = "streams do nothing unless polled"]
133 pub struct Iter<I> {
134     iter: I,
135 }
136 
137 impl<I> Unpin for Iter<I> {}
138 
139 impl<I: Iterator> Stream for Iter<I> {
140     type Item = I::Item;
141 
poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>>142     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143         Poll::Ready(self.iter.next())
144     }
145 
size_hint(&self) -> (usize, Option<usize>)146     fn size_hint(&self) -> (usize, Option<usize>) {
147         self.iter.size_hint()
148     }
149 }
150 
151 /// Creates a stream that yields a single item.
152 ///
153 /// # Examples
154 ///
155 /// ```
156 /// use futures_lite::stream::{self, StreamExt};
157 ///
158 /// # spin_on::spin_on(async {
159 /// let mut s = stream::once(7);
160 ///
161 /// assert_eq!(s.next().await, Some(7));
162 /// assert_eq!(s.next().await, None);
163 /// # })
164 /// ```
once<T>(t: T) -> Once<T>165 pub fn once<T>(t: T) -> Once<T> {
166     Once { value: Some(t) }
167 }
168 
169 pin_project! {
170     /// Stream for the [`once()`] function.
171     #[derive(Clone, Debug)]
172     #[must_use = "streams do nothing unless polled"]
173     pub struct Once<T> {
174         value: Option<T>,
175     }
176 }
177 
178 impl<T> Stream for Once<T> {
179     type Item = T;
180 
poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>>181     fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
182         Poll::Ready(self.project().value.take())
183     }
184 
size_hint(&self) -> (usize, Option<usize>)185     fn size_hint(&self) -> (usize, Option<usize>) {
186         if self.value.is_some() {
187             (1, Some(1))
188         } else {
189             (0, Some(0))
190         }
191     }
192 }
193 
194 /// Creates a stream that is always pending.
195 ///
196 /// # Examples
197 ///
198 /// ```no_run
199 /// use futures_lite::stream::{self, StreamExt};
200 ///
201 /// # spin_on::spin_on(async {
202 /// let mut s = stream::pending::<i32>();
203 /// s.next().await;
204 /// unreachable!();
205 /// # })
206 /// ```
pending<T>() -> Pending<T>207 pub fn pending<T>() -> Pending<T> {
208     Pending {
209         _marker: PhantomData,
210     }
211 }
212 
213 /// Stream for the [`pending()`] function.
214 #[derive(Clone, Debug)]
215 #[must_use = "streams do nothing unless polled"]
216 pub struct Pending<T> {
217     _marker: PhantomData<T>,
218 }
219 
220 impl<T> Unpin for Pending<T> {}
221 
222 impl<T> Stream for Pending<T> {
223     type Item = T;
224 
poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>>225     fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
226         Poll::Pending
227     }
228 
size_hint(&self) -> (usize, Option<usize>)229     fn size_hint(&self) -> (usize, Option<usize>) {
230         (0, Some(0))
231     }
232 }
233 
234 /// Creates a stream from a function returning [`Poll`].
235 ///
236 /// # Examples
237 ///
238 /// ```
239 /// use futures_lite::stream::{self, StreamExt};
240 /// use std::task::{Context, Poll};
241 ///
242 /// # spin_on::spin_on(async {
243 /// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
244 ///     Poll::Ready(Some(7))
245 /// }
246 ///
247 /// assert_eq!(stream::poll_fn(f).next().await, Some(7));
248 /// # })
249 /// ```
poll_fn<T, F>(f: F) -> PollFn<F> where F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,250 pub fn poll_fn<T, F>(f: F) -> PollFn<F>
251 where
252     F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
253 {
254     PollFn { f }
255 }
256 
257 /// Stream for the [`poll_fn()`] function.
258 #[derive(Clone)]
259 #[must_use = "streams do nothing unless polled"]
260 pub struct PollFn<F> {
261     f: F,
262 }
263 
264 impl<F> Unpin for PollFn<F> {}
265 
266 impl<F> fmt::Debug for PollFn<F> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result267     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268         f.debug_struct("PollFn").finish()
269     }
270 }
271 
272 impl<T, F> Stream for PollFn<F>
273 where
274     F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
275 {
276     type Item = T;
277 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>278     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
279         (&mut self.f)(cx)
280     }
281 }
282 
283 /// Creates an infinite stream that yields the same item repeatedly.
284 ///
285 /// # Examples
286 ///
287 /// ```
288 /// use futures_lite::stream::{self, StreamExt};
289 ///
290 /// # spin_on::spin_on(async {
291 /// let mut s = stream::repeat(7);
292 ///
293 /// assert_eq!(s.next().await, Some(7));
294 /// assert_eq!(s.next().await, Some(7));
295 /// # })
296 /// ```
repeat<T: Clone>(item: T) -> Repeat<T>297 pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
298     Repeat { item }
299 }
300 
301 /// Stream for the [`repeat()`] function.
302 #[derive(Clone, Debug)]
303 #[must_use = "streams do nothing unless polled"]
304 pub struct Repeat<T> {
305     item: T,
306 }
307 
308 impl<T> Unpin for Repeat<T> {}
309 
310 impl<T: Clone> Stream for Repeat<T> {
311     type Item = T;
312 
poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>>313     fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
314         Poll::Ready(Some(self.item.clone()))
315     }
316 
size_hint(&self) -> (usize, Option<usize>)317     fn size_hint(&self) -> (usize, Option<usize>) {
318         (usize::max_value(), None)
319     }
320 }
321 
322 /// Creates an infinite stream from a closure that generates items.
323 ///
324 /// # Examples
325 ///
326 /// ```
327 /// use futures_lite::stream::{self, StreamExt};
328 ///
329 /// # spin_on::spin_on(async {
330 /// let mut s = stream::repeat_with(|| 7);
331 ///
332 /// assert_eq!(s.next().await, Some(7));
333 /// assert_eq!(s.next().await, Some(7));
334 /// # })
335 /// ```
repeat_with<T, F>(repeater: F) -> RepeatWith<F> where F: FnMut() -> T,336 pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
337 where
338     F: FnMut() -> T,
339 {
340     RepeatWith { f: repeater }
341 }
342 
343 /// Stream for the [`repeat_with()`] function.
344 #[derive(Clone, Debug)]
345 #[must_use = "streams do nothing unless polled"]
346 pub struct RepeatWith<F> {
347     f: F,
348 }
349 
350 impl<F> Unpin for RepeatWith<F> {}
351 
352 impl<T, F> Stream for RepeatWith<F>
353 where
354     F: FnMut() -> T,
355 {
356     type Item = T;
357 
poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>>358     fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
359         let item = (&mut self.f)();
360         Poll::Ready(Some(item))
361     }
362 
size_hint(&self) -> (usize, Option<usize>)363     fn size_hint(&self) -> (usize, Option<usize>) {
364         (usize::max_value(), None)
365     }
366 }
367 
368 /// Creates a stream from a seed value and an async closure operating on it.
369 ///
370 /// # Examples
371 ///
372 /// ```
373 /// use futures_lite::stream::{self, StreamExt};
374 ///
375 /// # spin_on::spin_on(async {
376 /// let s = stream::unfold(0, |mut n| async move {
377 ///     if n < 2 {
378 ///         let m = n + 1;
379 ///         Some((n, m))
380 ///     } else {
381 ///         None
382 ///     }
383 /// });
384 ///
385 /// let v: Vec<i32> = s.collect().await;
386 /// assert_eq!(v, [0, 1]);
387 /// # })
388 /// ```
unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut> where F: FnMut(T) -> Fut, Fut: Future<Output = Option<(Item, T)>>,389 pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
390 where
391     F: FnMut(T) -> Fut,
392     Fut: Future<Output = Option<(Item, T)>>,
393 {
394     Unfold {
395         f,
396         state: Some(seed),
397         fut: None,
398     }
399 }
400 
401 pin_project! {
402     /// Stream for the [`unfold()`] function.
403     #[derive(Clone)]
404     #[must_use = "streams do nothing unless polled"]
405     pub struct Unfold<T, F, Fut> {
406         f: F,
407         state: Option<T>,
408         #[pin]
409         fut: Option<Fut>,
410     }
411 }
412 
413 impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
414 where
415     T: fmt::Debug,
416     Fut: fmt::Debug,
417 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result418     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419         f.debug_struct("Unfold")
420             .field("state", &self.state)
421             .field("fut", &self.fut)
422             .finish()
423     }
424 }
425 
426 impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
427 where
428     F: FnMut(T) -> Fut,
429     Fut: Future<Output = Option<(Item, T)>>,
430 {
431     type Item = Item;
432 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>433     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
434         let mut this = self.project();
435 
436         if let Some(state) = this.state.take() {
437             this.fut.set(Some((this.f)(state)));
438         }
439 
440         let step = ready!(this
441             .fut
442             .as_mut()
443             .as_pin_mut()
444             .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
445             .poll(cx));
446         this.fut.set(None);
447 
448         if let Some((item, next_state)) = step {
449             *this.state = Some(next_state);
450             Poll::Ready(Some(item))
451         } else {
452             Poll::Ready(None)
453         }
454     }
455 }
456 
457 /// Creates a stream from a seed value and a fallible async closure operating on it.
458 ///
459 /// # Examples
460 ///
461 /// ```
462 /// use futures_lite::stream::{self, StreamExt};
463 ///
464 /// # spin_on::spin_on(async {
465 /// let s = stream::try_unfold(0, |mut n| async move {
466 ///     if n < 2 {
467 ///         let m = n + 1;
468 ///         Ok(Some((n, m)))
469 ///     } else {
470 ///         std::io::Result::Ok(None)
471 ///     }
472 /// });
473 ///
474 /// let v: Vec<i32> = s.try_collect().await?;
475 /// assert_eq!(v, [0, 1]);
476 /// # std::io::Result::Ok(()) });
477 /// ```
try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut> where F: FnMut(T) -> Fut, Fut: Future<Output = Result<Option<(Item, T)>, E>>,478 pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
479 where
480     F: FnMut(T) -> Fut,
481     Fut: Future<Output = Result<Option<(Item, T)>, E>>,
482 {
483     TryUnfold {
484         f,
485         state: Some(init),
486         fut: None,
487     }
488 }
489 
490 pin_project! {
491     /// Stream for the [`try_unfold()`] function.
492     #[derive(Clone)]
493     #[must_use = "streams do nothing unless polled"]
494     pub struct TryUnfold<T, F, Fut> {
495         f: F,
496         state: Option<T>,
497         #[pin]
498         fut: Option<Fut>,
499     }
500 }
501 
502 impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
503 where
504     T: fmt::Debug,
505     Fut: fmt::Debug,
506 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result507     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508         f.debug_struct("TryUnfold")
509             .field("state", &self.state)
510             .field("fut", &self.fut)
511             .finish()
512     }
513 }
514 
515 impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
516 where
517     F: FnMut(T) -> Fut,
518     Fut: Future<Output = Result<Option<(Item, T)>, E>>,
519 {
520     type Item = Result<Item, E>;
521 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>522     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
523         let mut this = self.project();
524 
525         if let Some(state) = this.state.take() {
526             this.fut.set(Some((this.f)(state)));
527         }
528 
529         match this.fut.as_mut().as_pin_mut() {
530             None => {
531                 // The future previously errored
532                 Poll::Ready(None)
533             }
534             Some(future) => {
535                 let step = ready!(future.poll(cx));
536                 this.fut.set(None);
537 
538                 match step {
539                     Ok(Some((item, next_state))) => {
540                         *this.state = Some(next_state);
541                         Poll::Ready(Some(Ok(item)))
542                     }
543                     Ok(None) => Poll::Ready(None),
544                     Err(e) => Poll::Ready(Some(Err(e))),
545                 }
546             }
547         }
548     }
549 }
550 
551 /// Extension trait for [`Stream`].
552 pub trait StreamExt: Stream {
    /// A convenience for calling [`Stream::poll_next()`] on [`Unpin`] types.
poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,554     fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
555     where
556         Self: Unpin,
557     {
558         Stream::poll_next(Pin::new(self), cx)
559     }
560 
561     /// Retrieves the next item in the stream.
562     ///
    /// Returns [`None`] when iteration is finished. A stream implementation may or may not
    /// resume yielding items after that.
565     ///
566     /// # Examples
567     ///
568     /// ```
569     /// use futures_lite::stream::{self, StreamExt};
570     ///
571     /// # spin_on::spin_on(async {
572     /// let mut s = stream::iter(1..=3);
573     ///
574     /// assert_eq!(s.next().await, Some(1));
575     /// assert_eq!(s.next().await, Some(2));
576     /// assert_eq!(s.next().await, Some(3));
577     /// assert_eq!(s.next().await, None);
578     /// # });
579     /// ```
next(&mut self) -> NextFuture<'_, Self> where Self: Unpin,580     fn next(&mut self) -> NextFuture<'_, Self>
581     where
582         Self: Unpin,
583     {
584         NextFuture { stream: self }
585     }
586 
587     /// Retrieves the next item in the stream.
588     ///
589     /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
590     /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
591     ///
592     /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
593     ///
594     /// # Examples
595     ///
596     /// ```
597     /// use futures_lite::stream::{self, StreamExt};
598     ///
599     /// # spin_on::spin_on(async {
600     /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
601     ///
602     /// assert_eq!(s.try_next().await, Ok(Some(1)));
603     /// assert_eq!(s.try_next().await, Ok(Some(2)));
604     /// assert_eq!(s.try_next().await, Err("error"));
605     /// assert_eq!(s.try_next().await, Ok(None));
606     /// # });
607     /// ```
try_next<T, E>(&mut self) -> TryNextFuture<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,608     fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
609     where
610         Self: Stream<Item = Result<T, E>> + Unpin,
611     {
612         TryNextFuture { stream: self }
613     }
614 
615     /// Counts the number of items in the stream.
616     ///
617     /// # Examples
618     ///
619     /// ```
620     /// use futures_lite::stream::{self, StreamExt};
621     ///
622     /// # spin_on::spin_on(async {
623     /// let s1 = stream::iter(vec![0]);
624     /// let s2 = stream::iter(vec![1, 2, 3]);
625     ///
626     /// assert_eq!(s1.count().await, 1);
627     /// assert_eq!(s2.count().await, 3);
628     /// # });
629     /// ```
count(self) -> CountFuture<Self> where Self: Sized,630     fn count(self) -> CountFuture<Self>
631     where
632         Self: Sized,
633     {
634         CountFuture {
635             stream: self,
636             count: 0,
637         }
638     }
639 
640     /// Maps items of the stream to new values using a closure.
641     ///
642     /// # Examples
643     ///
644     /// ```
645     /// use futures_lite::stream::{self, StreamExt};
646     ///
647     /// # spin_on::spin_on(async {
648     /// let s = stream::iter(vec![1, 2, 3]);
649     /// let mut s = s.map(|x| 2 * x);
650     ///
651     /// assert_eq!(s.next().await, Some(2));
652     /// assert_eq!(s.next().await, Some(4));
653     /// assert_eq!(s.next().await, Some(6));
654     /// assert_eq!(s.next().await, None);
655     /// # });
656     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where Self: Sized, F: FnMut(Self::Item) -> T,657     fn map<T, F>(self, f: F) -> Map<Self, F>
658     where
659         Self: Sized,
660         F: FnMut(Self::Item) -> T,
661     {
662         Map { stream: self, f }
663     }
664 
665     /// Maps items to streams and then concatenates them.
666     ///
667     /// # Examples
668     ///
669     /// ```
670     /// use futures_lite::stream::{self, StreamExt};
671     ///
672     /// # spin_on::spin_on(async {
673     /// let words = stream::iter(vec!["one", "two"]);
674     ///
675     /// let s: String = words
676     ///     .flat_map(|s| stream::iter(s.chars()))
677     ///     .collect()
678     ///     .await;
679     ///
680     /// assert_eq!(s, "onetwo");
681     /// # });
682     /// ```
flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where Self: Sized, U: Stream, F: FnMut(Self::Item) -> U,683     fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
684     where
685         Self: Sized,
686         U: Stream,
687         F: FnMut(Self::Item) -> U,
688     {
689         FlatMap {
690             stream: self.map(f),
691             inner_stream: None,
692         }
693     }
694 
695     /// Concatenates inner streams.
696     ///
697     /// # Examples
698     ///
699     /// ```
700     /// use futures_lite::stream::{self, StreamExt};
701     ///
702     /// # spin_on::spin_on(async {
703     /// let s1 = stream::iter(vec![1, 2, 3]);
704     /// let s2 = stream::iter(vec![4, 5]);
705     ///
706     /// let s = stream::iter(vec![s1, s2]);
707     /// let v: Vec<_> = s.flatten().collect().await;
708     /// assert_eq!(v, [1, 2, 3, 4, 5]);
709     /// # });
710     /// ```
flatten(self) -> Flatten<Self> where Self: Sized, Self::Item: Stream,711     fn flatten(self) -> Flatten<Self>
712     where
713         Self: Sized,
714         Self::Item: Stream,
715     {
716         Flatten {
717             stream: self,
718             inner_stream: None,
719         }
720     }
721 
722     /// Maps items of the stream to new values using an async closure.
723     ///
724     /// # Examples
725     ///
726     /// ```
727     /// use futures_lite::pin;
728     /// use futures_lite::stream::{self, StreamExt};
729     ///
730     /// # spin_on::spin_on(async {
731     /// let s = stream::iter(vec![1, 2, 3]);
732     /// let mut s = s.then(|x| async move { 2 * x });
733     ///
734     /// pin!(s);
735     /// assert_eq!(s.next().await, Some(2));
736     /// assert_eq!(s.next().await, Some(4));
737     /// assert_eq!(s.next().await, Some(6));
738     /// assert_eq!(s.next().await, None);
739     /// # });
740     /// ```
then<F, Fut>(self, f: F) -> Then<Self, F, Fut> where Self: Sized, F: FnMut(Self::Item) -> Fut, Fut: Future,741     fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
742     where
743         Self: Sized,
744         F: FnMut(Self::Item) -> Fut,
745         Fut: Future,
746     {
747         Then {
748             stream: self,
749             future: None,
750             f,
751         }
752     }
753 
754     /// Keeps items of the stream for which `predicate` returns `true`.
755     ///
756     /// # Examples
757     ///
758     /// ```
759     /// use futures_lite::stream::{self, StreamExt};
760     ///
761     /// # spin_on::spin_on(async {
762     /// let s = stream::iter(vec![1, 2, 3, 4]);
763     /// let mut s = s.filter(|i| i % 2 == 0);
764     ///
765     /// assert_eq!(s.next().await, Some(2));
766     /// assert_eq!(s.next().await, Some(4));
767     /// assert_eq!(s.next().await, None);
768     /// # });
769     /// ```
filter<P>(self, predicate: P) -> Filter<Self, P> where Self: Sized, P: FnMut(&Self::Item) -> bool,770     fn filter<P>(self, predicate: P) -> Filter<Self, P>
771     where
772         Self: Sized,
773         P: FnMut(&Self::Item) -> bool,
774     {
775         Filter {
776             stream: self,
777             predicate,
778         }
779     }
780 
781     /// Filters and maps items of the stream using a closure.
782     ///
783     /// # Examples
784     ///
785     /// ```
786     /// use futures_lite::stream::{self, StreamExt};
787     ///
788     /// # spin_on::spin_on(async {
789     /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
790     /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
791     ///
792     /// assert_eq!(s.next().await, Some(1));
793     /// assert_eq!(s.next().await, Some(3));
794     /// assert_eq!(s.next().await, Some(5));
795     /// assert_eq!(s.next().await, None);
796     /// # });
797     /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where Self: Sized, F: FnMut(Self::Item) -> Option<T>,798     fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
799     where
800         Self: Sized,
801         F: FnMut(Self::Item) -> Option<T>,
802     {
803         FilterMap { stream: self, f }
804     }
805 
806     /// Takes only the first `n` items of the stream.
807     ///
808     /// # Examples
809     ///
810     /// ```
811     /// use futures_lite::stream::{self, StreamExt};
812     ///
813     /// # spin_on::spin_on(async {
814     /// let mut s = stream::repeat(7).take(2);
815     ///
816     /// assert_eq!(s.next().await, Some(7));
817     /// assert_eq!(s.next().await, Some(7));
818     /// assert_eq!(s.next().await, None);
819     /// # });
820     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,821     fn take(self, n: usize) -> Take<Self>
822     where
823         Self: Sized,
824     {
825         Take { stream: self, n }
826     }
827 
828     /// Takes items while `predicate` returns `true`.
829     ///
830     /// # Examples
831     ///
832     /// ```
833     /// use futures_lite::stream::{self, StreamExt};
834     ///
835     /// # spin_on::spin_on(async {
836     /// let s = stream::iter(vec![1, 2, 3, 4]);
837     /// let mut s = s.take_while(|x| *x < 3);
838     ///
839     /// assert_eq!(s.next().await, Some(1));
840     /// assert_eq!(s.next().await, Some(2));
841     /// assert_eq!(s.next().await, None);
842     /// # });
843     /// ```
take_while<P>(self, predicate: P) -> TakeWhile<Self, P> where Self: Sized, P: FnMut(&Self::Item) -> bool,844     fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
845     where
846         Self: Sized,
847         P: FnMut(&Self::Item) -> bool,
848     {
849         TakeWhile {
850             stream: self,
851             predicate,
852         }
853     }
854 
855     /// Skips the first `n` items of the stream.
856     ///
857     /// # Examples
858     ///
859     /// ```
860     /// use futures_lite::stream::{self, StreamExt};
861     ///
862     /// # spin_on::spin_on(async {
863     /// let s = stream::iter(vec![1, 2, 3]);
864     /// let mut s = s.skip(2);
865     ///
866     /// assert_eq!(s.next().await, Some(3));
867     /// assert_eq!(s.next().await, None);
868     /// # });
869     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,870     fn skip(self, n: usize) -> Skip<Self>
871     where
872         Self: Sized,
873     {
874         Skip { stream: self, n }
875     }
876 
877     /// Skips items while `predicate` returns `true`.
878     ///
879     /// # Examples
880     ///
881     /// ```
882     /// use futures_lite::stream::{self, StreamExt};
883     ///
884     /// # spin_on::spin_on(async {
885     /// let s = stream::iter(vec![-1i32, 0, 1]);
886     /// let mut s = s.skip_while(|x| x.is_negative());
887     ///
888     /// assert_eq!(s.next().await, Some(0));
889     /// assert_eq!(s.next().await, Some(1));
890     /// assert_eq!(s.next().await, None);
891     /// # });
892     /// ```
skip_while<P>(self, predicate: P) -> SkipWhile<Self, P> where Self: Sized, P: FnMut(&Self::Item) -> bool,893     fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
894     where
895         Self: Sized,
896         P: FnMut(&Self::Item) -> bool,
897     {
898         SkipWhile {
899             stream: self,
900             predicate: Some(predicate),
901         }
902     }
903 
904     /// Yields every `step`th item.
905     ///
906     /// # Panics
907     ///
908     /// This method will panic if the `step` is 0.
909     ///
910     /// # Examples
911     ///
912     /// ```
913     /// use futures_lite::stream::{self, StreamExt};
914     ///
915     /// # spin_on::spin_on(async {
916     /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
917     /// let mut s = s.step_by(2);
918     ///
919     /// assert_eq!(s.next().await, Some(0));
920     /// assert_eq!(s.next().await, Some(2));
921     /// assert_eq!(s.next().await, Some(4));
922     /// assert_eq!(s.next().await, None);
923     /// # });
924     /// ```
step_by(self, step: usize) -> StepBy<Self> where Self: Sized,925     fn step_by(self, step: usize) -> StepBy<Self>
926     where
927         Self: Sized,
928     {
929         assert!(step > 0, "`step` must be greater than zero");
930         StepBy {
931             stream: self,
932             step,
933             i: 0,
934         }
935     }
936 
937     /// Appends another stream to the end of this one.
938     ///
939     /// # Examples
940     ///
941     /// ```
942     /// use futures_lite::stream::{self, StreamExt};
943     ///
944     /// # spin_on::spin_on(async {
945     /// let s1 = stream::iter(vec![1, 2]);
946     /// let s2 = stream::iter(vec![7, 8]);
947     /// let mut s = s1.chain(s2);
948     ///
949     /// assert_eq!(s.next().await, Some(1));
950     /// assert_eq!(s.next().await, Some(2));
951     /// assert_eq!(s.next().await, Some(7));
952     /// assert_eq!(s.next().await, Some(8));
953     /// assert_eq!(s.next().await, None);
954     /// # });
955     /// ```
chain<U>(self, other: U) -> Chain<Self, U> where Self: Sized, U: Stream<Item = Self::Item> + Sized,956     fn chain<U>(self, other: U) -> Chain<Self, U>
957     where
958         Self: Sized,
959         U: Stream<Item = Self::Item> + Sized,
960     {
961         Chain {
962             first: self.fuse(),
963             second: other.fuse(),
964         }
965     }
966 
967     /// Clones all items.
968     ///
969     /// # Examples
970     ///
971     /// ```
972     /// use futures_lite::stream::{self, StreamExt};
973     ///
974     /// # spin_on::spin_on(async {
975     /// let s = stream::iter(vec![&1, &2]);
976     /// let mut s = s.cloned();
977     ///
978     /// assert_eq!(s.next().await, Some(1));
979     /// assert_eq!(s.next().await, Some(2));
980     /// assert_eq!(s.next().await, None);
981     /// # });
982     /// ```
cloned<'a, T>(self) -> Cloned<Self> where Self: Stream<Item = &'a T> + Sized, T: Clone + 'a,983     fn cloned<'a, T>(self) -> Cloned<Self>
984     where
985         Self: Stream<Item = &'a T> + Sized,
986         T: Clone + 'a,
987     {
988         Cloned { stream: self }
989     }
990 
991     /// Copies all items.
992     ///
993     /// # Examples
994     ///
995     /// ```
996     /// use futures_lite::stream::{self, StreamExt};
997     ///
998     /// # spin_on::spin_on(async {
999     /// let s = stream::iter(vec![&1, &2]);
1000     /// let mut s = s.copied();
1001     ///
1002     /// assert_eq!(s.next().await, Some(1));
1003     /// assert_eq!(s.next().await, Some(2));
1004     /// assert_eq!(s.next().await, None);
1005     /// # });
1006     /// ```
copied<'a, T>(self) -> Copied<Self> where Self: Stream<Item = &'a T> + Sized, T: Copy + 'a,1007     fn copied<'a, T>(self) -> Copied<Self>
1008     where
1009         Self: Stream<Item = &'a T> + Sized,
1010         T: Copy + 'a,
1011     {
1012         Copied { stream: self }
1013     }
1014 
1015     /// Collects all items in the stream into a collection.
1016     ///
1017     /// # Examples
1018     ///
1019     /// ```
1020     /// use futures_lite::stream::{self, StreamExt};
1021     ///
1022     /// # spin_on::spin_on(async {
1023     /// let mut s = stream::iter(1..=3);
1024     ///
1025     /// let items: Vec<_> = s.collect().await;
1026     /// assert_eq!(items, [1, 2, 3]);
1027     /// # });
1028     /// ```
collect<C>(self) -> CollectFuture<Self, C> where Self: Sized, C: Default + Extend<Self::Item>,1029     fn collect<C>(self) -> CollectFuture<Self, C>
1030     where
1031         Self: Sized,
1032         C: Default + Extend<Self::Item>,
1033     {
1034         CollectFuture {
1035             stream: self,
1036             collection: Default::default(),
1037         }
1038     }
1039 
1040     /// Collects all items in the fallible stream into a collection.
    ///
    /// # Examples
    ///
1042     /// ```
1043     /// use futures_lite::stream::{self, StreamExt};
1044     ///
1045     /// # spin_on::spin_on(async {
1046     /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1047     /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1048     /// assert_eq!(res, Err(2));
1049     ///
1050     /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1051     /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1052     /// assert_eq!(res, Ok(vec![1, 2, 3]));
1053     /// # })
1054     /// ```
try_collect<T, E, C>(self) -> TryCollectFuture<Self, C> where Self: Stream<Item = Result<T, E>> + Sized, C: Default + Extend<T>,1055     fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1056     where
1057         Self: Stream<Item = Result<T, E>> + Sized,
1058         C: Default + Extend<T>,
1059     {
1060         TryCollectFuture {
1061             stream: self,
1062             items: Default::default(),
1063         }
1064     }
1065 
1066     /// Partitions items into those for which `predicate` is `true` and those for which it is
1067     /// `false`, and then collects them into two collections.
1068     ///
1069     /// # Examples
1070     ///
1071     /// ```
1072     /// use futures_lite::stream::{self, StreamExt};
1073     ///
1074     /// # spin_on::spin_on(async {
1075     /// let s = stream::iter(vec![1, 2, 3]);
1076     /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1077     ///
1078     /// assert_eq!(even, &[2]);
1079     /// assert_eq!(odd, &[1, 3]);
1080     /// # })
1081     /// ```
partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B> where Self: Sized, B: Default + Extend<Self::Item>, P: FnMut(&Self::Item) -> bool,1082     fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1083     where
1084         Self: Sized,
1085         B: Default + Extend<Self::Item>,
1086         P: FnMut(&Self::Item) -> bool,
1087     {
1088         PartitionFuture {
1089             stream: self,
1090             predicate,
1091             res: Some(Default::default()),
1092         }
1093     }
1094 
1095     /// Accumulates a computation over the stream.
1096     ///
1097     /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1098     /// the accumulator and each item in the stream. The final accumulator value is returned.
1099     ///
1100     /// # Examples
1101     ///
1102     /// ```
1103     /// use futures_lite::stream::{self, StreamExt};
1104     ///
1105     /// # spin_on::spin_on(async {
1106     /// let s = stream::iter(vec![1, 2, 3]);
1107     /// let sum = s.fold(0, |acc, x| acc + x).await;
1108     ///
1109     /// assert_eq!(sum, 6);
1110     /// # })
1111     /// ```
fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T> where Self: Sized, F: FnMut(T, Self::Item) -> T,1112     fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1113     where
1114         Self: Sized,
1115         F: FnMut(T, Self::Item) -> T,
1116     {
1117         FoldFuture {
1118             stream: self,
1119             f,
1120             acc: Some(init),
1121         }
1122     }
1123 
1124     /// Accumulates a fallible computation over the stream.
1125     ///
1126     /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1127     /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1128     /// error if `f` failed the computation.
1129     ///
1130     /// # Examples
1131     ///
1132     /// ```
1133     /// use futures_lite::stream::{self, StreamExt};
1134     ///
1135     /// # spin_on::spin_on(async {
1136     /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1137     ///
1138     /// let sum = s.try_fold(0, |acc, v| {
1139     ///     if (acc + v) % 2 == 1 {
1140     ///         Ok(acc + v)
1141     ///     } else {
1142     ///         Err("fail")
1143     ///     }
1144     /// })
1145     /// .await;
1146     ///
1147     /// assert_eq!(sum, Err("fail"));
1148     /// # })
1149     /// ```
try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B> where Self: Stream<Item = Result<T, E>> + Unpin + Sized, F: FnMut(B, T) -> Result<B, E>,1150     fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1151     where
1152         Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1153         F: FnMut(B, T) -> Result<B, E>,
1154     {
1155         TryFoldFuture {
1156             stream: self,
1157             f,
1158             acc: Some(init),
1159         }
1160     }
1161 
1162     /// Maps items of the stream to new values using a state value and a closure.
1163     ///
    /// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the
1165     /// state and each item in the stream. The stream stops when `f` returns `None`.
1166     ///
1167     /// # Examples
1168     ///
1169     /// ```
1170     /// use futures_lite::stream::{self, StreamExt};
1171     ///
1172     /// # spin_on::spin_on(async {
1173     /// let s = stream::iter(vec![1, 2, 3]);
1174     /// let mut s = s.scan(1, |state, x| {
1175     ///     *state = *state * x;
1176     ///     Some(-*state)
1177     /// });
1178     ///
1179     /// assert_eq!(s.next().await, Some(-1));
1180     /// assert_eq!(s.next().await, Some(-2));
1181     /// assert_eq!(s.next().await, Some(-6));
1182     /// assert_eq!(s.next().await, None);
1183     /// # })
1184     /// ```
scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F> where Self: Sized, F: FnMut(&mut St, Self::Item) -> Option<B>,1185     fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1186     where
1187         Self: Sized,
1188         F: FnMut(&mut St, Self::Item) -> Option<B>,
1189     {
1190         Scan {
1191             stream: self,
1192             state_f: (initial_state, f),
1193         }
1194     }
1195 
1196     /// Fuses the stream so that it stops yielding items after the first [`None`].
1197     ///
1198     /// # Examples
1199     ///
1200     /// ```
1201     /// use futures_lite::stream::{self, StreamExt};
1202     ///
1203     /// # spin_on::spin_on(async {
1204     /// let mut s = stream::once(1).fuse();
1205     ///
1206     /// assert_eq!(s.next().await, Some(1));
1207     /// assert_eq!(s.next().await, None);
1208     /// assert_eq!(s.next().await, None);
1209     /// # })
1210     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,1211     fn fuse(self) -> Fuse<Self>
1212     where
1213         Self: Sized,
1214     {
1215         Fuse {
1216             stream: self,
1217             done: false,
1218         }
1219     }
1220 
1221     /// Repeats the stream from beginning to end, forever.
1222     ///
1223     /// # Examples
1224     ///
1225     /// ```
1226     /// use futures_lite::stream::{self, StreamExt};
1227     ///
1228     /// # spin_on::spin_on(async {
1229     /// let mut s = stream::iter(vec![1, 2]).cycle();
1230     ///
1231     /// assert_eq!(s.next().await, Some(1));
1232     /// assert_eq!(s.next().await, Some(2));
1233     /// assert_eq!(s.next().await, Some(1));
1234     /// assert_eq!(s.next().await, Some(2));
1235     /// # });
1236     /// ```
cycle(self) -> Cycle<Self> where Self: Clone + Sized,1237     fn cycle(self) -> Cycle<Self>
1238     where
1239         Self: Clone + Sized,
1240     {
1241         Cycle {
1242             orig: self.clone(),
1243             stream: self,
1244         }
1245     }
1246 
1247     /// Enumerates items, mapping them to `(index, item)`.
1248     ///
1249     /// # Examples
1250     ///
1251     /// ```
1252     /// use futures_lite::stream::{self, StreamExt};
1253     ///
1254     /// # spin_on::spin_on(async {
1255     /// let s = stream::iter(vec!['a', 'b', 'c']);
1256     /// let mut s = s.enumerate();
1257     ///
1258     /// assert_eq!(s.next().await, Some((0, 'a')));
1259     /// assert_eq!(s.next().await, Some((1, 'b')));
1260     /// assert_eq!(s.next().await, Some((2, 'c')));
1261     /// assert_eq!(s.next().await, None);
1262     /// # });
1263     /// ```
enumerate(self) -> Enumerate<Self> where Self: Sized,1264     fn enumerate(self) -> Enumerate<Self>
1265     where
1266         Self: Sized,
1267     {
1268         Enumerate { stream: self, i: 0 }
1269     }
1270 
1271     /// Calls a closure on each item and passes it on.
1272     ///
1273     /// # Examples
1274     ///
1275     /// ```
1276     /// use futures_lite::stream::{self, StreamExt};
1277     ///
1278     /// # spin_on::spin_on(async {
1279     /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1280     ///
1281     /// let sum = s
1282     ///    .inspect(|x| println!("about to filter {}", x))
1283     ///    .filter(|x| x % 2 == 0)
1284     ///    .inspect(|x| println!("made it through filter: {}", x))
1285     ///    .fold(0, |sum, i| sum + i)
1286     ///    .await;
1287     /// # });
1288     /// ```
inspect<F>(self, f: F) -> Inspect<Self, F> where Self: Sized, F: FnMut(&Self::Item),1289     fn inspect<F>(self, f: F) -> Inspect<Self, F>
1290     where
1291         Self: Sized,
1292         F: FnMut(&Self::Item),
1293     {
1294         Inspect { stream: self, f }
1295     }
1296 
1297     /// Gets the `n`th item of the stream.
1298     ///
1299     /// In the end, `n+1` items of the stream will be consumed.
1300     ///
1301     /// # Examples
1302     ///
1303     /// ```
1304     /// use futures_lite::stream::{self, StreamExt};
1305     ///
1306     /// # spin_on::spin_on(async {
1307     /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1308     ///
1309     /// assert_eq!(s.nth(2).await, Some(2));
1310     /// assert_eq!(s.nth(2).await, Some(5));
1311     /// assert_eq!(s.nth(2).await, None);
1312     /// # });
1313     /// ```
nth(&mut self, n: usize) -> NthFuture<'_, Self> where Self: Unpin,1314     fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1315     where
1316         Self: Unpin,
1317     {
1318         NthFuture { stream: self, n }
1319     }
1320 
1321     /// Returns the last item in the stream.
1322     ///
1323     /// # Examples
1324     ///
1325     /// ```
1326     /// use futures_lite::stream::{self, StreamExt};
1327     ///
1328     /// # spin_on::spin_on(async {
1329     /// let s = stream::iter(vec![1, 2, 3, 4]);
1330     /// assert_eq!(s.last().await, Some(4));
1331     ///
1332     /// let s = stream::empty::<i32>();
1333     /// assert_eq!(s.last().await, None);
1334     /// # });
1335     /// ```
last(self) -> LastFuture<Self> where Self: Sized,1336     fn last(self) -> LastFuture<Self>
1337     where
1338         Self: Sized,
1339     {
1340         LastFuture {
1341             stream: self,
1342             last: None,
1343         }
1344     }
1345 
1346     /// Finds the first item of the stream for which `predicate` returns `true`.
1347     ///
1348     /// # Examples
1349     ///
1350     /// ```
1351     /// use futures_lite::stream::{self, StreamExt};
1352     ///
1353     /// # spin_on::spin_on(async {
1354     /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1355     ///
1356     /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1357     /// assert_eq!(s.next().await, Some(13));
1358     /// # });
1359     /// ```
find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P> where Self: Unpin, P: FnMut(&Self::Item) -> bool,1360     fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1361     where
1362         Self: Unpin,
1363         P: FnMut(&Self::Item) -> bool,
1364     {
1365         FindFuture {
1366             stream: self,
1367             predicate,
1368         }
1369     }
1370 
1371     /// Applies a closure to items in the stream and returns the first [`Some`] result.
1372     ///
1373     /// # Examples
1374     ///
1375     /// ```
1376     /// use futures_lite::stream::{self, StreamExt};
1377     ///
1378     /// # spin_on::spin_on(async {
1379     /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1380     /// let number = s.find_map(|s| s.parse().ok()).await;
1381     ///
1382     /// assert_eq!(number, Some(2));
1383     /// # });
1384     /// ```
find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> Option<B>,1385     fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1386     where
1387         Self: Unpin,
1388         F: FnMut(Self::Item) -> Option<B>,
1389     {
1390         FindMapFuture { stream: self, f }
1391     }
1392 
1393     /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1394     ///
1395     /// # Examples
1396     ///
1397     /// ```
1398     /// use futures_lite::stream::{self, StreamExt};
1399     ///
1400     /// # spin_on::spin_on(async {
1401     /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1402     ///
1403     /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1404     /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1405     /// assert_eq!(s.position(|x| x == 9).await, None);
1406     /// # });
1407     /// ```
position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P> where Self: Unpin, P: FnMut(Self::Item) -> bool,1408     fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1409     where
1410         Self: Unpin,
1411         P: FnMut(Self::Item) -> bool,
1412     {
1413         PositionFuture {
1414             stream: self,
1415             predicate,
1416             index: 0,
1417         }
1418     }
1419 
1420     /// Tests if `predicate` returns `true` for all items in the stream.
1421     ///
1422     /// The result is `true` for an empty stream.
1423     ///
1424     /// # Examples
1425     ///
1426     /// ```
1427     /// use futures_lite::stream::{self, StreamExt};
1428     ///
1429     /// # spin_on::spin_on(async {
1430     /// let mut s = stream::iter(vec![1, 2, 3]);
1431     /// assert!(!s.all(|x| x % 2 == 0).await);
1432     ///
1433     /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1434     /// assert!(s.all(|x| x % 2 == 0).await);
1435     ///
1436     /// let mut s = stream::empty::<i32>();
1437     /// assert!(s.all(|x| x % 2 == 0).await);
1438     /// # });
1439     /// ```
all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P> where Self: Unpin, P: FnMut(Self::Item) -> bool,1440     fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1441     where
1442         Self: Unpin,
1443         P: FnMut(Self::Item) -> bool,
1444     {
1445         AllFuture {
1446             stream: self,
1447             predicate,
1448         }
1449     }
1450 
1451     /// Tests if `predicate` returns `true` for any item in the stream.
1452     ///
1453     /// The result is `false` for an empty stream.
1454     ///
1455     /// # Examples
1456     ///
1457     /// ```
1458     /// use futures_lite::stream::{self, StreamExt};
1459     ///
1460     /// # spin_on::spin_on(async {
1461     /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1462     /// assert!(!s.any(|x| x % 2 == 0).await);
1463     ///
1464     /// let mut s = stream::iter(vec![1, 2, 3]);
1465     /// assert!(s.any(|x| x % 2 == 0).await);
1466     ///
1467     /// let mut s = stream::empty::<i32>();
1468     /// assert!(!s.any(|x| x % 2 == 0).await);
1469     /// # });
1470     /// ```
any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P> where Self: Unpin, P: FnMut(Self::Item) -> bool,1471     fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1472     where
1473         Self: Unpin,
1474         P: FnMut(Self::Item) -> bool,
1475     {
1476         AnyFuture {
1477             stream: self,
1478             predicate,
1479         }
1480     }
1481 
1482     /// Calls a closure on each item of the stream.
1483     ///
1484     /// # Examples
1485     ///
1486     /// ```
1487     /// use futures_lite::stream::{self, StreamExt};
1488     ///
1489     /// # spin_on::spin_on(async {
1490     /// let mut s = stream::iter(vec![1, 2, 3]);
1491     /// s.for_each(|s| println!("{}", s)).await;
1492     /// # });
1493     /// ```
for_each<F>(self, f: F) -> ForEachFuture<Self, F> where Self: Sized, F: FnMut(Self::Item),1494     fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1495     where
1496         Self: Sized,
1497         F: FnMut(Self::Item),
1498     {
1499         ForEachFuture { stream: self, f }
1500     }
1501 
1502     /// Calls a fallible closure on each item of the stream, stopping on first error.
1503     ///
1504     /// # Examples
1505     ///
1506     /// ```
1507     /// use futures_lite::stream::{self, StreamExt};
1508     ///
1509     /// # spin_on::spin_on(async {
1510     /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1511     ///
1512     /// let mut v = vec![];
1513     /// let res = s
1514     ///     .try_for_each(|n| {
1515     ///         if n < 2 {
1516     ///             v.push(n);
1517     ///             Ok(())
1518     ///         } else {
1519     ///             Err("too big")
1520     ///         }
1521     ///     })
1522     ///     .await;
1523     ///
1524     /// assert_eq!(v, &[0, 1]);
1525     /// assert_eq!(res, Err("too big"));
1526     /// # });
1527     /// ```
try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> Result<(), E>,1528     fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1529     where
1530         Self: Unpin,
1531         F: FnMut(Self::Item) -> Result<(), E>,
1532     {
1533         TryForEachFuture { stream: self, f }
1534     }
1535 
1536     /// Zips up two streams into a single stream of pairs.
1537     ///
1538     /// The stream of pairs stops when either of the original two streams is exhausted.
1539     ///
1540     /// # Examples
1541     ///
1542     /// ```
1543     /// use futures_lite::stream::{self, StreamExt};
1544     ///
1545     /// # spin_on::spin_on(async {
1546     /// let l = stream::iter(vec![1, 2, 3]);
1547     /// let r = stream::iter(vec![4, 5, 6, 7]);
1548     /// let mut s = l.zip(r);
1549     ///
1550     /// assert_eq!(s.next().await, Some((1, 4)));
1551     /// assert_eq!(s.next().await, Some((2, 5)));
1552     /// assert_eq!(s.next().await, Some((3, 6)));
1553     /// assert_eq!(s.next().await, None);
1554     /// # });
1555     /// ```
zip<U>(self, other: U) -> Zip<Self, U> where Self: Sized, U: Stream,1556     fn zip<U>(self, other: U) -> Zip<Self, U>
1557     where
1558         Self: Sized,
1559         U: Stream,
1560     {
1561         Zip {
1562             item_slot: None,
1563             first: self,
1564             second: other,
1565         }
1566     }
1567 
1568     /// Collects a stream of pairs into a pair of collections.
1569     ///
1570     /// # Examples
1571     ///
1572     /// ```
1573     /// use futures_lite::stream::{self, StreamExt};
1574     ///
1575     /// # spin_on::spin_on(async {
1576     /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1577     /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1578     ///
1579     /// assert_eq!(left, [1, 3]);
1580     /// assert_eq!(right, [2, 4]);
1581     /// # });
1582     /// ```
unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Stream<Item = (A, B)> + Sized,1583     fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1584     where
1585         FromA: Default + Extend<A>,
1586         FromB: Default + Extend<B>,
1587         Self: Stream<Item = (A, B)> + Sized,
1588     {
1589         UnzipFuture {
1590             stream: self,
1591             res: Some(Default::default()),
1592         }
1593     }
1594 
1595     /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1596     ///
1597     /// # Examples
1598     ///
1599     /// ```
1600     /// use futures_lite::stream::{self, StreamExt};
1601     /// use futures_lite::stream::{once, pending};
1602     ///
1603     /// # spin_on::spin_on(async {
1604     /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1605     /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1606     ///
    /// // The first stream wins.
1608     /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1609     /// # })
1610     /// ```
or<S>(self, other: S) -> Or<Self, S> where Self: Sized, S: Stream<Item = Self::Item>,1611     fn or<S>(self, other: S) -> Or<Self, S>
1612     where
1613         Self: Sized,
1614         S: Stream<Item = Self::Item>,
1615     {
1616         Or {
1617             stream1: self,
1618             stream2: other,
1619         }
1620     }
1621 
1622     /// Merges with `other` stream, with no preference for either stream when both are ready.
1623     ///
1624     /// # Examples
1625     ///
1626     /// ```
1627     /// use futures_lite::stream::{self, StreamExt};
1628     /// use futures_lite::stream::{once, pending};
1629     ///
1630     /// # spin_on::spin_on(async {
1631     /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1632     /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1633     ///
    /// // One of the two streams is randomly chosen as the winner.
1635     /// let res = once(1).race(once(2)).next().await;
1636     /// # })
1637     /// ```
1638     #[cfg(feature = "std")]
race<S>(self, other: S) -> Race<Self, S> where Self: Sized, S: Stream<Item = Self::Item>,1639     fn race<S>(self, other: S) -> Race<Self, S>
1640     where
1641         Self: Sized,
1642         S: Stream<Item = Self::Item>,
1643     {
1644         Race {
1645             stream1: self,
1646             stream2: other,
1647         }
1648     }
1649 
1650     /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
1651     ///
1652     /// # Examples
1653     ///
1654     /// ```
1655     /// use futures_lite::stream::{self, StreamExt};
1656     ///
1657     /// # spin_on::spin_on(async {
1658     /// let a = stream::once(1);
1659     /// let b = stream::empty();
1660     ///
1661     /// // Streams of different types can be stored in
1662     /// // the same collection when they are boxed:
1663     /// let streams = vec![a.boxed(), b.boxed()];
1664     /// # })
1665     /// ```
1666     #[cfg(feature = "alloc")]
boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>> where Self: Send + Sized + 'a,1667     fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
1668     where
1669         Self: Send + Sized + 'a,
1670     {
1671         Box::pin(self)
1672     }
1673 
1674     /// Boxes the stream and changes its type to `dyn Stream + 'a`.
1675     ///
1676     /// # Examples
1677     ///
1678     /// ```
1679     /// use futures_lite::stream::{self, StreamExt};
1680     ///
1681     /// # spin_on::spin_on(async {
1682     /// let a = stream::once(1);
1683     /// let b = stream::empty();
1684     ///
1685     /// // Streams of different types can be stored in
1686     /// // the same collection when they are boxed:
1687     /// let streams = vec![a.boxed_local(), b.boxed_local()];
1688     /// # })
1689     /// ```
1690     #[cfg(feature = "alloc")]
boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>> where Self: Sized + 'a,1691     fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
1692     where
1693         Self: Sized + 'a,
1694     {
1695         Box::pin(self)
1696     }
1697 }
1698 
// Blanket implementation: every `Stream` type, sized or not, picks up the default `StreamExt`
// methods. Individual methods still carry their own `Sized`/`Unpin` bounds.
impl<S: Stream + ?Sized> StreamExt for S {}
1700 
/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
///
/// This is the type returned by [`StreamExt::boxed()`] when the boxed stream is `'static`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
1714 
/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
///
/// This is the type returned by [`StreamExt::boxed_local()`] when the boxed stream is
/// `'static`. Unlike [`Boxed`], it does not require the stream to be `Send`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1728 
/// Future for the [`StreamExt::next()`] method.
///
/// Borrows the stream mutably for `'a` and resolves to `Option<S::Item>`: the result of a
/// single `poll_next` call on the borrowed stream.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NextFuture<'a, S: ?Sized> {
    // Mutable borrow of the stream being advanced.
    stream: &'a mut S,
}

// Explicit `Unpin` impl: the future is `Unpin` exactly when the borrowed stream type is.
impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
1737 
1738 impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
1739     type Output = Option<S::Item>;
1740 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1741     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1742         self.stream.poll_next(cx)
1743     }
1744 }
1745 
/// Future for the [`StreamExt::try_next()`] method.
///
/// Borrows a stream of `Result<T, E>` items mutably for `'a` and resolves to
/// `Result<Option<T>, E>`, i.e. the next item with the `Option` and `Result` layers swapped.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryNextFuture<'a, S: ?Sized> {
    // Mutable borrow of the stream being advanced.
    stream: &'a mut S,
}

// Explicit `Unpin` impl: the future is `Unpin` exactly when the borrowed stream type is.
impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
1754 
1755 impl<T, E, S> Future for TryNextFuture<'_, S>
1756 where
1757     S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
1758 {
1759     type Output = Result<Option<T>, E>;
1760 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1761     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1762         let res = ready!(self.stream.poll_next(cx));
1763         Poll::Ready(res.transpose())
1764     }
1765 }
1766 
pin_project! {
    /// Future for the [`StreamExt::count()`] method.
    ///
    /// Owns the stream and resolves to the number of items it yielded before ending.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CountFuture<S: ?Sized> {
        // Number of items seen so far; incremented once per yielded item.
        count: usize,
        // The stream being drained. Declared last because `S` may be unsized.
        #[pin]
        stream: S,
    }
}
1777 
1778 impl<S: Stream + ?Sized> Future for CountFuture<S> {
1779     type Output = usize;
1780 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1781     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1782         loop {
1783             match ready!(self.as_mut().project().stream.poll_next(cx)) {
1784                 None => return Poll::Ready(self.count),
1785                 Some(_) => *self.as_mut().project().count += 1,
1786             }
1787         }
1788     }
1789 }
1790 
pin_project! {
    /// Future for the [`StreamExt::collect()`] method.
    ///
    /// Owns the stream and the collection being filled; resolves to the collection once the
    /// stream ends.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CollectFuture<S, C> {
        // The stream being drained.
        #[pin]
        stream: S,
        // Items gathered so far; swapped for a default value when the future completes.
        collection: C,
    }
}
1801 
1802 impl<S, C> Future for CollectFuture<S, C>
1803 where
1804     S: Stream,
1805     C: Default + Extend<S::Item>,
1806 {
1807     type Output = C;
1808 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C>1809     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
1810         let mut this = self.as_mut().project();
1811         loop {
1812             match ready!(this.stream.as_mut().poll_next(cx)) {
1813                 Some(e) => this.collection.extend(Some(e)),
1814                 None => {
1815                     return Poll::Ready(mem::replace(self.project().collection, Default::default()))
1816                 }
1817             }
1818         }
1819     }
1820 }
1821 
pin_project! {
    /// Future for the [`StreamExt::try_collect()`] method.
    ///
    /// Owns a stream of `Result` items and the collection being filled with the `Ok` values;
    /// resolves to the collection when the stream ends, or to the first error encountered.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct TryCollectFuture<S, C> {
        // The fallible stream being drained.
        #[pin]
        stream: S,
        // `Ok` values gathered so far; swapped for a default value on successful completion.
        items: C,
    }
}
1832 
1833 impl<T, E, S, C> Future for TryCollectFuture<S, C>
1834 where
1835     S: Stream<Item = Result<T, E>>,
1836     C: Default + Extend<T>,
1837 {
1838     type Output = Result<C, E>;
1839 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1840     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1841         let mut this = self.project();
1842         Poll::Ready(Ok(loop {
1843             match ready!(this.stream.as_mut().poll_next(cx)?) {
1844                 Some(x) => this.items.extend(Some(x)),
1845                 None => break mem::replace(this.items, Default::default()),
1846             }
1847         }))
1848     }
1849 }
1850 
pin_project! {
    /// Future for the [`StreamExt::partition()`] method.
    ///
    /// Owns the stream and the predicate; resolves to a pair of collections once the stream
    /// ends.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct PartitionFuture<S, P, B> {
        // The stream being drained.
        #[pin]
        stream: S,
        // Decides which of the two collections each item goes into.
        predicate: P,
        // `(matching, non_matching)` collections; taken (left as `None`) when the future
        // completes.
        res: Option<(B, B)>,
    }
}
1862 
1863 impl<S, P, B> Future for PartitionFuture<S, P, B>
1864 where
1865     S: Stream + Sized,
1866     P: FnMut(&S::Item) -> bool,
1867     B: Default + Extend<S::Item>,
1868 {
1869     type Output = (B, B);
1870 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1871     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1872         let mut this = self.project();
1873         loop {
1874             match ready!(this.stream.as_mut().poll_next(cx)) {
1875                 Some(v) => {
1876                     let res = this.res.as_mut().unwrap();
1877                     if (this.predicate)(&v) {
1878                         res.0.extend(Some(v))
1879                     } else {
1880                         res.1.extend(Some(v))
1881                     }
1882                 }
1883                 None => return Poll::Ready(this.res.take().unwrap()),
1884             }
1885         }
1886     }
1887 }
1888 
pin_project! {
    /// Future for the [`StreamExt::fold()`] method.
    ///
    /// Owns the stream, the folding closure and the running accumulator; resolves to the final
    /// accumulator once the stream ends.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct FoldFuture<S, F, T> {
        // The stream being drained.
        #[pin]
        stream: S,
        // Combines the current accumulator with the next item.
        f: F,
        // Running accumulator; an `Option` so it can be moved into `f` by value and moved out
        // on completion.
        acc: Option<T>,
    }
}
1900 
1901 impl<S, F, T> Future for FoldFuture<S, F, T>
1902 where
1903     S: Stream,
1904     F: FnMut(T, S::Item) -> T,
1905 {
1906     type Output = T;
1907 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1908     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1909         let mut this = self.project();
1910         loop {
1911             match ready!(this.stream.as_mut().poll_next(cx)) {
1912                 Some(v) => {
1913                     let old = this.acc.take().unwrap();
1914                     let new = (this.f)(old, v);
1915                     *this.acc = Some(new);
1916                 }
1917                 None => return Poll::Ready(this.acc.take().unwrap()),
1918             }
1919         }
1920     }
1921 }
1922 
/// Future for the [`StreamExt::try_fold()`] method.
///
/// Borrows a stream of `Result` items mutably for `'a` and owns the folding closure and the
/// running accumulator. Resolves to the final accumulator, or to the first error produced by
/// either the stream or the closure.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryFoldFuture<'a, S, F, B> {
    // Mutable borrow of the stream being drained.
    stream: &'a mut S,
    // Fallible step function combining the accumulator with the next `Ok` value.
    f: F,
    // Running accumulator; an `Option` so it can be moved into `f` by value.
    acc: Option<B>,
}

// Unconditionally `Unpin`: the stream is held by reference and `poll` accesses the other fields
// through plain `&mut` access rather than pin projection.
impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
1933 
1934 impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
1935 where
1936     S: Stream<Item = Result<T, E>> + Unpin,
1937     F: FnMut(B, T) -> Result<B, E>,
1938 {
1939     type Output = Result<B, E>;
1940 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1941     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1942         loop {
1943             match ready!(self.stream.poll_next(cx)) {
1944                 Some(Err(e)) => return Poll::Ready(Err(e)),
1945                 Some(Ok(t)) => {
1946                     let old = self.acc.take().unwrap();
1947                     let new = (&mut self.f)(old, t);
1948 
1949                     match new {
1950                         Ok(t) => self.acc = Some(t),
1951                         Err(e) => return Poll::Ready(Err(e)),
1952                     }
1953                 }
1954                 None => return Poll::Ready(Ok(self.acc.take().unwrap())),
1955             }
1956         }
1957     }
1958 }
1959 
pin_project! {
    /// Stream for the [`StreamExt::scan()`] method.
    ///
    /// Each item of the inner stream is passed, together with a mutable reference to the state,
    /// to the closure; the closure's `Option` result is what this stream yields.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Scan<S, St, F> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // The running state and the closure, called as `f(&mut state, item)`.
        state_f: (St, F),
    }
}
1970 
1971 impl<S, St, F, B> Stream for Scan<S, St, F>
1972 where
1973     S: Stream,
1974     F: FnMut(&mut St, S::Item) -> Option<B>,
1975 {
1976     type Item = B;
1977 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>>1978     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
1979         let mut this = self.project();
1980         this.stream.as_mut().poll_next(cx).map(|item| {
1981             item.and_then(|item| {
1982                 let (state, f) = this.state_f;
1983                 f(state, item)
1984             })
1985         })
1986     }
1987 }
1988 
pin_project! {
    /// Stream for the [`StreamExt::fuse()`] method.
    ///
    /// Once the inner stream has returned `None`, this stream keeps returning `None` without
    /// polling the inner stream again.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Fuse<S> {
        // The inner stream.
        #[pin]
        stream: S,
        // Set to `true` after the inner stream has returned `None`.
        done: bool,
    }
}
1999 
2000 impl<S: Stream> Stream for Fuse<S> {
2001     type Item = S::Item;
2002 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>2003     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2004         let this = self.project();
2005 
2006         if *this.done {
2007             Poll::Ready(None)
2008         } else {
2009             let next = ready!(this.stream.poll_next(cx));
2010             if next.is_none() {
2011                 *this.done = true;
2012             }
2013             Poll::Ready(next)
2014         }
2015     }
2016 }
2017 
pin_project! {
    /// Stream for the [`StreamExt::map()`] method.
    ///
    /// Yields the result of applying the closure to each item of the inner stream.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Map<S, F> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // Mapping closure applied to every item.
        f: F,
    }
}
2028 
2029 impl<S, F, T> Stream for Map<S, F>
2030 where
2031     S: Stream,
2032     F: FnMut(S::Item) -> T,
2033 {
2034     type Item = T;
2035 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2036     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2037         let this = self.project();
2038         let next = ready!(this.stream.poll_next(cx));
2039         Poll::Ready(next.map(this.f))
2040     }
2041 
size_hint(&self) -> (usize, Option<usize>)2042     fn size_hint(&self) -> (usize, Option<usize>) {
2043         self.stream.size_hint()
2044     }
2045 }
2046 
pin_project! {
    /// Stream for the [`StreamExt::flat_map()`] method.
    ///
    /// Maps every item of the outer stream to an inner stream and yields the items of those
    /// inner streams one after another.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct FlatMap<S, U, F> {
        // The outer stream, already mapped so that it yields inner streams.
        #[pin]
        stream: Map<S, F>,
        // The inner stream currently being drained, if any.
        #[pin]
        inner_stream: Option<U>,
    }
}
2058 
2059 impl<S, U, F> Stream for FlatMap<S, U, F>
2060 where
2061     S: Stream,
2062     U: Stream,
2063     F: FnMut(S::Item) -> U,
2064 {
2065     type Item = U::Item;
2066 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2067     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2068         let mut this = self.project();
2069         loop {
2070             if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2071                 match ready!(inner.poll_next(cx)) {
2072                     Some(item) => return Poll::Ready(Some(item)),
2073                     None => this.inner_stream.set(None),
2074                 }
2075             }
2076 
2077             match ready!(this.stream.as_mut().poll_next(cx)) {
2078                 Some(stream) => this.inner_stream.set(Some(stream)),
2079                 None => return Poll::Ready(None),
2080             }
2081         }
2082     }
2083 }
2084 
pin_project! {
    /// Stream for the [`StreamExt::flatten()`] method.
    ///
    /// Yields the items of every stream produced by the outer stream, one inner stream after
    /// another.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Flatten<S: Stream> {
        // The outer stream, whose items are themselves streams.
        #[pin]
        stream: S,
        // The inner stream currently being drained, if any.
        #[pin]
        inner_stream: Option<S::Item>,
    }
}
2096 
2097 impl<S, U> Stream for Flatten<S>
2098 where
2099     S: Stream<Item = U>,
2100     U: Stream,
2101 {
2102     type Item = U::Item;
2103 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2104     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2105         let mut this = self.project();
2106         loop {
2107             if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2108                 match ready!(inner.poll_next(cx)) {
2109                     Some(item) => return Poll::Ready(Some(item)),
2110                     None => this.inner_stream.set(None),
2111                 }
2112             }
2113 
2114             match ready!(this.stream.as_mut().poll_next(cx)) {
2115                 Some(inner) => this.inner_stream.set(Some(inner)),
2116                 None => return Poll::Ready(None),
2117             }
2118         }
2119     }
2120 }
2121 
pin_project! {
    /// Stream for the [`StreamExt::then()`] method.
    ///
    /// Turns each item of the inner stream into a future via the closure and yields that
    /// future's output.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Then<S, F, Fut> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // The future created for the most recent item, while it is still running.
        #[pin]
        future: Option<Fut>,
        // Closure that creates a future from an item.
        f: F,
    }
}
2134 
2135 impl<S, F, Fut> Stream for Then<S, F, Fut>
2136 where
2137     S: Stream,
2138     F: FnMut(S::Item) -> Fut,
2139     Fut: Future,
2140 {
2141     type Item = Fut::Output;
2142 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2143     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2144         let mut this = self.project();
2145 
2146         loop {
2147             if let Some(fut) = this.future.as_mut().as_pin_mut() {
2148                 let item = ready!(fut.poll(cx));
2149                 this.future.set(None);
2150                 return Poll::Ready(Some(item));
2151             } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2152                 this.future.set(Some((this.f)(item)));
2153             } else {
2154                 return Poll::Ready(None);
2155             }
2156         }
2157     }
2158 
size_hint(&self) -> (usize, Option<usize>)2159     fn size_hint(&self) -> (usize, Option<usize>) {
2160         let future_len = if self.future.is_some() { 1 } else { 0 };
2161         let (lower, upper) = self.stream.size_hint();
2162         let lower = lower.saturating_add(future_len);
2163         let upper = upper.and_then(|u| u.checked_add(future_len));
2164         (lower, upper)
2165     }
2166 }
2167 
pin_project! {
    /// Stream for the [`StreamExt::filter()`] method.
    ///
    /// Yields only those items of the inner stream for which the predicate returns `true`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Filter<S, P> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // Predicate deciding whether an item is kept.
        predicate: P,
    }
}
2178 
2179 impl<S, P> Stream for Filter<S, P>
2180 where
2181     S: Stream,
2182     P: FnMut(&S::Item) -> bool,
2183 {
2184     type Item = S::Item;
2185 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2186     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2187         let mut this = self.project();
2188         loop {
2189             match ready!(this.stream.as_mut().poll_next(cx)) {
2190                 None => return Poll::Ready(None),
2191                 Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2192                 Some(_) => {}
2193             }
2194         }
2195     }
2196 }
2197 
/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
///
/// The returned [`Or`] stream polls `stream1` first on every poll; only when `stream1` does not
/// produce an item is `stream2` polled, and its result is returned as-is.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
///
/// // The first stream wins.
/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
/// # })
/// ```
pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Or { stream1, stream2 }
}
2220 
pin_project! {
    /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Or<S1, S2> {
        // The preferred stream; always polled first.
        #[pin]
        stream1: S1,
        // The fallback stream; polled only when `stream1` did not yield an item.
        #[pin]
        stream2: S2,
    }
}
2232 
2233 impl<T, S1, S2> Stream for Or<S1, S2>
2234 where
2235     S1: Stream<Item = T>,
2236     S2: Stream<Item = T>,
2237 {
2238     type Item = T;
2239 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2240     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2241         let mut this = self.project();
2242 
2243         if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2244             return Poll::Ready(Some(t));
2245         }
2246         this.stream2.as_mut().poll_next(cx)
2247     }
2248 }
2249 
/// Merges two streams, with no preference for either stream when both are ready.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
///
/// // One of the two streams is randomly chosen as the winner.
/// let res = stream::race(once(1), once(2)).next().await;
/// # })
/// ```
#[cfg(feature = "std")]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Race { stream1, stream2 }
}
2272 
#[cfg(feature = "std")]
pin_project! {
    /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Race<S1, S2> {
        // First of the two competing streams.
        #[pin]
        stream1: S1,
        // Second of the two competing streams.
        #[pin]
        stream2: S2,
    }
}
2285 
#[cfg(feature = "std")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    /// Polls both streams in a randomly chosen order and yields the first item found.
    ///
    /// Returns `Poll::Ready(None)` when both streams report completion during the same call,
    /// and `Poll::Pending` when neither yields an item but at least one is still pending.
    ///
    /// NOTE(review): a stream that has already returned `None` is polled again on later calls
    /// (as before this change), so both streams are presumably expected to tolerate that.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // `true` = poll `stream1`, `false` = poll `stream2`. The order is drawn anew on every
        // call so that neither stream is systematically preferred.
        let order = if fastrand::bool() {
            [true, false]
        } else {
            [false, true]
        };

        let mut exhausted = 0;
        for &use_stream1 in order.iter() {
            let polled = if use_stream1 {
                this.stream1.as_mut().poll_next(cx)
            } else {
                this.stream2.as_mut().poll_next(cx)
            };

            match polled {
                Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
                Poll::Ready(None) => exhausted += 1,
                Poll::Pending => {}
            }
        }

        if exhausted == 2 {
            // Both streams are finished. Reporting `Pending` here would leave the caller
            // waiting for a wake-up that neither stream has promised to deliver.
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}
2315 
pin_project! {
    /// Stream for the [`StreamExt::filter_map()`] method.
    ///
    /// Applies the closure to every item of the inner stream and yields the values for which
    /// it returns `Some`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct FilterMap<S, F> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // Closure that both filters and maps: `None` drops the item, `Some(t)` yields `t`.
        f: F,
    }
}
2326 
2327 impl<S, F, T> Stream for FilterMap<S, F>
2328 where
2329     S: Stream,
2330     F: FnMut(S::Item) -> Option<T>,
2331 {
2332     type Item = T;
2333 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2334     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2335         let mut this = self.project();
2336         loop {
2337             match ready!(this.stream.as_mut().poll_next(cx)) {
2338                 None => return Poll::Ready(None),
2339                 Some(v) => {
2340                     if let Some(t) = (this.f)(v) {
2341                         return Poll::Ready(Some(t));
2342                     }
2343                 }
2344             }
2345         }
2346     }
2347 }
2348 
pin_project! {
    /// Stream for the [`StreamExt::take()`] method.
    ///
    /// Yields at most `n` items of the inner stream.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Take<S> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // Number of items that may still be yielded; reset to 0 when the inner stream ends.
        n: usize,
    }
}
2359 
2360 impl<S: Stream> Stream for Take<S> {
2361     type Item = S::Item;
2362 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>2363     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2364         let this = self.project();
2365 
2366         if *this.n == 0 {
2367             Poll::Ready(None)
2368         } else {
2369             let next = ready!(this.stream.poll_next(cx));
2370             match next {
2371                 Some(_) => *this.n -= 1,
2372                 None => *this.n = 0,
2373             }
2374             Poll::Ready(next)
2375         }
2376     }
2377 }
2378 
pin_project! {
    /// Stream for the [`StreamExt::take_while()`] method.
    ///
    /// Yields items of the inner stream while the predicate returns `true`; the first rejected
    /// item is discarded and reported as `None`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct TakeWhile<S, P> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // Predicate evaluated on every item the inner stream yields.
        predicate: P,
    }
}
2389 
2390 impl<S, P> Stream for TakeWhile<S, P>
2391 where
2392     S: Stream,
2393     P: FnMut(&S::Item) -> bool,
2394 {
2395     type Item = S::Item;
2396 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2397     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2398         let this = self.project();
2399 
2400         match ready!(this.stream.poll_next(cx)) {
2401             Some(v) => {
2402                 if (this.predicate)(&v) {
2403                     Poll::Ready(Some(v))
2404                 } else {
2405                     Poll::Ready(None)
2406                 }
2407             }
2408             None => Poll::Ready(None),
2409         }
2410     }
2411 }
2412 
pin_project! {
    /// Stream for the [`StreamExt::skip()`] method.
    ///
    /// Discards the first `n` items of the inner stream and yields the rest.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Skip<S> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // Number of items that still have to be discarded.
        n: usize,
    }
}
2423 
2424 impl<S: Stream> Stream for Skip<S> {
2425     type Item = S::Item;
2426 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2427     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2428         let mut this = self.project();
2429         loop {
2430             match ready!(this.stream.as_mut().poll_next(cx)) {
2431                 Some(v) => match *this.n {
2432                     0 => return Poll::Ready(Some(v)),
2433                     _ => *this.n -= 1,
2434                 },
2435                 None => return Poll::Ready(None),
2436             }
2437         }
2438     }
2439 }
2440 
pin_project! {
    /// Stream for the [`StreamExt::skip_while()`] method.
    ///
    /// Discards items of the inner stream while the predicate returns `true`, then yields the
    /// first rejected item and every item after it.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct SkipWhile<S, P> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // The predicate; replaced by `None` once it has returned `false` for the first time.
        predicate: Option<P>,
    }
}
2451 
2452 impl<S, P> Stream for SkipWhile<S, P>
2453 where
2454     S: Stream,
2455     P: FnMut(&S::Item) -> bool,
2456 {
2457     type Item = S::Item;
2458 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2459     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2460         let mut this = self.project();
2461         loop {
2462             match ready!(this.stream.as_mut().poll_next(cx)) {
2463                 Some(v) => match this.predicate {
2464                     Some(p) => {
2465                         if !p(&v) {
2466                             *this.predicate = None;
2467                             return Poll::Ready(Some(v));
2468                         }
2469                     }
2470                     None => return Poll::Ready(Some(v)),
2471                 },
2472                 None => return Poll::Ready(None),
2473             }
2474         }
2475     }
2476 }
2477 
pin_project! {
    /// Stream for the [`StreamExt::step_by()`] method.
    ///
    /// Yields an item whenever the countdown `i` is zero and discards the items in between.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct StepBy<S> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // The step size; `poll_next` computes `step - 1`, so this is presumably guaranteed to
        // be non-zero by whoever constructs the stream (the constructor is not visible here).
        step: usize,
        // Number of items to discard before the next one is yielded.
        i: usize,
    }
}
2489 
2490 impl<S: Stream> Stream for StepBy<S> {
2491     type Item = S::Item;
2492 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2493     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2494         let mut this = self.project();
2495         loop {
2496             match ready!(this.stream.as_mut().poll_next(cx)) {
2497                 Some(v) => {
2498                     if *this.i == 0 {
2499                         *this.i = *this.step - 1;
2500                         return Poll::Ready(Some(v));
2501                     } else {
2502                         *this.i -= 1;
2503                     }
2504                 }
2505                 None => return Poll::Ready(None),
2506             }
2507         }
2508     }
2509 }
2510 
pin_project! {
    /// Stream for the [`StreamExt::chain()`] method.
    ///
    /// Yields all items of the first stream, followed by all items of the second stream.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Chain<S, U> {
        // The stream drained first; fused so that its `done` flag records completion.
        #[pin]
        first: Fuse<S>,
        // The stream drained after `first` has finished; fused for the same reason.
        #[pin]
        second: Fuse<U>,
    }
}
2522 
2523 impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2524     type Item = S::Item;
2525 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2526     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2527         let mut this = self.project();
2528 
2529         if !this.first.done {
2530             let next = ready!(this.first.as_mut().poll_next(cx));
2531             if let Some(next) = next {
2532                 return Poll::Ready(Some(next));
2533             }
2534         }
2535 
2536         if !this.second.done {
2537             let next = ready!(this.second.as_mut().poll_next(cx));
2538             if let Some(next) = next {
2539                 return Poll::Ready(Some(next));
2540             }
2541         }
2542 
2543         if this.first.done && this.second.done {
2544             Poll::Ready(None)
2545         } else {
2546             Poll::Pending
2547         }
2548     }
2549 }
2550 
pin_project! {
    /// Stream for the [`StreamExt::cloned()`] method.
    ///
    /// Turns a stream of references into a stream of owned values by cloning each item.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Cloned<S> {
        // The inner stream yielding references.
        #[pin]
        stream: S,
    }
}
2560 
2561 impl<'a, S, T: 'a> Stream for Cloned<S>
2562 where
2563     S: Stream<Item = &'a T>,
2564     T: Clone,
2565 {
2566     type Item = T;
2567 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2568     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2569         let this = self.project();
2570         let next = ready!(this.stream.poll_next(cx));
2571         Poll::Ready(next.cloned())
2572     }
2573 }
2574 
pin_project! {
    /// Stream for the [`StreamExt::copied()`] method.
    ///
    /// Turns a stream of references into a stream of owned values by copying each item.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Copied<S> {
        // The inner stream yielding references.
        #[pin]
        stream: S,
    }
}
2584 
2585 impl<'a, S, T: 'a> Stream for Copied<S>
2586 where
2587     S: Stream<Item = &'a T>,
2588     T: Copy,
2589 {
2590     type Item = T;
2591 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2592     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2593         let this = self.project();
2594         let next = ready!(this.stream.poll_next(cx));
2595         Poll::Ready(next.copied())
2596     }
2597 }
2598 
pin_project! {
    /// Stream for the [`StreamExt::cycle()`] method.
    ///
    /// Replays the stream from the beginning every time it finishes, by cloning a pristine
    /// copy of it.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Cycle<S> {
        // Pristine copy of the stream; never polled, only cloned to restart the cycle.
        orig: S,
        // The copy that is currently being polled.
        #[pin]
        stream: S,
    }
}
2609 
2610 impl<S> Stream for Cycle<S>
2611 where
2612     S: Stream + Clone,
2613 {
2614     type Item = S::Item;
2615 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2616     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2617         match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2618             Some(item) => Poll::Ready(Some(item)),
2619             None => {
2620                 let new = self.as_mut().orig.clone();
2621                 self.as_mut().project().stream.set(new);
2622                 self.project().stream.poll_next(cx)
2623             }
2624         }
2625     }
2626 }
2627 
pin_project! {
    /// Stream for the [`StreamExt::enumerate()`] method.
    ///
    /// Pairs every item of the inner stream with its zero-based position.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Enumerate<S> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // Index that will be attached to the next item.
        i: usize,
    }
}
2638 
2639 impl<S> Stream for Enumerate<S>
2640 where
2641     S: Stream,
2642 {
2643     type Item = (usize, S::Item);
2644 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2645     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2646         let this = self.project();
2647 
2648         match ready!(this.stream.poll_next(cx)) {
2649             Some(v) => {
2650                 let ret = (*this.i, v);
2651                 *this.i += 1;
2652                 Poll::Ready(Some(ret))
2653             }
2654             None => Poll::Ready(None),
2655         }
2656     }
2657 }
2658 
pin_project! {
    /// Stream for the [`StreamExt::inspect()`] method.
    ///
    /// Passes a reference to every item to the closure before yielding the item unchanged.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Inspect<S, F> {
        // The inner stream supplying the items.
        #[pin]
        stream: S,
        // Closure called with a reference to each item.
        f: F,
    }
}
2669 
2670 impl<S, F> Stream for Inspect<S, F>
2671 where
2672     S: Stream,
2673     F: FnMut(&S::Item),
2674 {
2675     type Item = S::Item;
2676 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2677     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2678         let mut this = self.project();
2679         let next = ready!(this.stream.as_mut().poll_next(cx));
2680         if let Some(x) = &next {
2681             (this.f)(x);
2682         }
2683         Poll::Ready(next)
2684     }
2685 }
2686 
/// Future for the [`StreamExt::nth()`] method.
///
/// Resolves to the item found after discarding `n` items, or to `None` if the stream ends
/// before that.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NthFuture<'a, S: ?Sized> {
    // Borrowed stream being advanced.
    stream: &'a mut S,
    // Number of items that still have to be discarded before one is returned.
    n: usize,
}

// `Unpin` whenever the stream type itself is `Unpin`.
impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2696 
2697 impl<'a, S> Future for NthFuture<'a, S>
2698 where
2699     S: Stream + Unpin + ?Sized,
2700 {
2701     type Output = Option<S::Item>;
2702 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2703     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2704         loop {
2705             match ready!(self.stream.poll_next(cx)) {
2706                 Some(v) => match self.n {
2707                     0 => return Poll::Ready(Some(v)),
2708                     _ => self.n -= 1,
2709                 },
2710                 None => return Poll::Ready(None),
2711             }
2712         }
2713     }
2714 }
2715 
pin_project! {
    /// Future for the [`StreamExt::last()`] method.
    ///
    /// Resolves to the final item of the stream, or to `None` if the stream yields nothing.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct LastFuture<S: Stream> {
        // The stream being drained.
        #[pin]
        stream: S,
        // The most recent item seen so far.
        last: Option<S::Item>,
    }
}
2726 
2727 impl<S: Stream> Future for LastFuture<S> {
2728     type Output = Option<S::Item>;
2729 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2730     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2731         let mut this = self.project();
2732         loop {
2733             match ready!(this.stream.as_mut().poll_next(cx)) {
2734                 Some(new) => *this.last = Some(new),
2735                 None => return Poll::Ready(this.last.take()),
2736             }
2737         }
2738     }
2739 }
2740 
/// Future for the [`StreamExt::find()`] method.
///
/// Resolves to the first item for which the predicate returns `true`, or to `None` if the
/// stream ends without such an item.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FindFuture<'a, S: ?Sized, P> {
    // Borrowed stream being searched.
    stream: &'a mut S,
    // Predicate tested against a reference to each item.
    predicate: P,
}

// `Unpin` whenever the stream type is `Unpin`, regardless of the predicate type.
impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2750 
2751 impl<'a, S, P> Future for FindFuture<'a, S, P>
2752 where
2753     S: Stream + Unpin + ?Sized,
2754     P: FnMut(&S::Item) -> bool,
2755 {
2756     type Output = Option<S::Item>;
2757 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2758     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2759         loop {
2760             match ready!(self.stream.poll_next(cx)) {
2761                 Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
2762                 Some(_) => {}
2763                 None => return Poll::Ready(None),
2764             }
2765         }
2766     }
2767 }
2768 
/// Future for the [`StreamExt::find_map()`] method.
///
/// Resolves to the first `Some` value the closure returns for an item, or to `None` if the
/// stream ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FindMapFuture<'a, S: ?Sized, F> {
    // Borrowed stream being searched.
    stream: &'a mut S,
    // Closure applied to each item; a `Some` result ends the search.
    f: F,
}

// `Unpin` whenever the stream type is `Unpin`, regardless of the closure type.
impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2778 
2779 impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
2780 where
2781     S: Stream + Unpin + ?Sized,
2782     F: FnMut(S::Item) -> Option<B>,
2783 {
2784     type Output = Option<B>;
2785 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2786     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2787         loop {
2788             match ready!(self.stream.poll_next(cx)) {
2789                 Some(v) => {
2790                     if let Some(v) = (&mut self.f)(v) {
2791                         return Poll::Ready(Some(v));
2792                     }
2793                 }
2794                 None => return Poll::Ready(None),
2795             }
2796         }
2797     }
2798 }
2799 
/// Future for the [`StreamExt::position()`] method.
///
/// Resolves to the index of the first item for which the predicate returns `true`, or to
/// `None` if the stream ends without such an item.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PositionFuture<'a, S: ?Sized, P> {
    // Borrowed stream being searched.
    stream: &'a mut S,
    // Predicate that consumes each item.
    predicate: P,
    // Number of items rejected so far, i.e. the index of the next item.
    index: usize,
}

// `Unpin` whenever the stream type is `Unpin`, regardless of the predicate type.
impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
2810 
2811 impl<'a, S, P> Future for PositionFuture<'a, S, P>
2812 where
2813     S: Stream + Unpin + ?Sized,
2814     P: FnMut(S::Item) -> bool,
2815 {
2816     type Output = Option<usize>;
2817 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2818     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2819         loop {
2820             match ready!(self.stream.poll_next(cx)) {
2821                 Some(v) => {
2822                     if (&mut self.predicate)(v) {
2823                         return Poll::Ready(Some(self.index));
2824                     } else {
2825                         self.index += 1;
2826                     }
2827                 }
2828                 None => return Poll::Ready(None),
2829             }
2830         }
2831     }
2832 }
2833 
/// Future for the [`StreamExt::all()`] method.
///
/// Resolves to `false` as soon as the predicate rejects an item, and to `true` if the stream
/// ends without a rejection.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AllFuture<'a, S: ?Sized, P> {
    // Borrowed stream being tested.
    stream: &'a mut S,
    // Predicate that consumes each item.
    predicate: P,
}

// `Unpin` whenever the stream type is `Unpin`, regardless of the predicate type.
impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
2843 
2844 impl<S, P> Future for AllFuture<'_, S, P>
2845 where
2846     S: Stream + Unpin + ?Sized,
2847     P: FnMut(S::Item) -> bool,
2848 {
2849     type Output = bool;
2850 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2851     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2852         loop {
2853             match ready!(self.stream.poll_next(cx)) {
2854                 Some(v) => {
2855                     if !(&mut self.predicate)(v) {
2856                         return Poll::Ready(false);
2857                     }
2858                 }
2859                 None => return Poll::Ready(true),
2860             }
2861         }
2862     }
2863 }
2864 
/// Future for the [`StreamExt::any()`] method.
///
/// Resolves to `true` as soon as the predicate accepts an item, and to `false` if the stream
/// ends without an accepted item.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AnyFuture<'a, S: ?Sized, P> {
    // Borrowed stream being tested.
    stream: &'a mut S,
    // Predicate that consumes each item.
    predicate: P,
}

// `Unpin` whenever the stream type is `Unpin`, regardless of the predicate type.
impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
2874 
2875 impl<S, P> Future for AnyFuture<'_, S, P>
2876 where
2877     S: Stream + Unpin + ?Sized,
2878     P: FnMut(S::Item) -> bool,
2879 {
2880     type Output = bool;
2881 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2882     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2883         loop {
2884             match ready!(self.stream.poll_next(cx)) {
2885                 Some(v) => {
2886                     if (&mut self.predicate)(v) {
2887                         return Poll::Ready(true);
2888                     }
2889                 }
2890                 None => return Poll::Ready(false),
2891             }
2892         }
2893     }
2894 }
2895 
2896 pin_project! {
2897     /// Future for the [`StreamExt::for_each()`] method.
2898     #[derive(Debug)]
2899     #[must_use = "futures do nothing unless you `.await` or poll them"]
2900     pub struct ForEachFuture<S, F> {
2901         #[pin]
2902         stream: S,
2903         f: F,
2904     }
2905 }
2906 
2907 impl<S, F> Future for ForEachFuture<S, F>
2908 where
2909     S: Stream,
2910     F: FnMut(S::Item),
2911 {
2912     type Output = ();
2913 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2914     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2915         let mut this = self.project();
2916         loop {
2917             match ready!(this.stream.as_mut().poll_next(cx)) {
2918                 Some(v) => (this.f)(v),
2919                 None => return Poll::Ready(()),
2920             }
2921         }
2922     }
2923 }
2924 
2925 /// Future for the [`StreamExt::try_for_each()`] method.
2926 #[derive(Debug)]
2927 #[must_use = "futures do nothing unless you `.await` or poll them"]
2928 pub struct TryForEachFuture<'a, S: ?Sized, F> {
2929     stream: &'a mut S,
2930     f: F,
2931 }
2932 
2933 impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
2934 
2935 impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
2936 where
2937     S: Stream + Unpin + ?Sized,
2938     F: FnMut(S::Item) -> Result<(), E>,
2939 {
2940     type Output = Result<(), E>;
2941 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2942     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2943         loop {
2944             match ready!(self.stream.poll_next(cx)) {
2945                 None => return Poll::Ready(Ok(())),
2946                 Some(v) => (&mut self.f)(v)?,
2947             }
2948         }
2949     }
2950 }
2951 
2952 pin_project! {
2953     /// Stream for the [`StreamExt::zip()`] method.
2954     #[derive(Clone, Debug)]
2955     #[must_use = "streams do nothing unless polled"]
2956     pub struct Zip<A: Stream, B> {
2957         item_slot: Option<A::Item>,
2958         #[pin]
2959         first: A,
2960         #[pin]
2961         second: B,
2962     }
2963 }
2964 
2965 impl<A: Stream, B: Stream> Stream for Zip<A, B> {
2966     type Item = (A::Item, B::Item);
2967 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2968     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2969         let this = self.project();
2970 
2971         if this.item_slot.is_none() {
2972             match this.first.poll_next(cx) {
2973                 Poll::Pending => return Poll::Pending,
2974                 Poll::Ready(None) => return Poll::Ready(None),
2975                 Poll::Ready(Some(item)) => *this.item_slot = Some(item),
2976             }
2977         }
2978 
2979         let second_item = ready!(this.second.poll_next(cx));
2980         let first_item = this.item_slot.take().unwrap();
2981         Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
2982     }
2983 }
2984 
2985 pin_project! {
2986     /// Future for the [`StreamExt::unzip()`] method.
2987     #[derive(Debug)]
2988     #[must_use = "futures do nothing unless you `.await` or poll them"]
2989     pub struct UnzipFuture<S, FromA, FromB> {
2990         #[pin]
2991         stream: S,
2992         res: Option<(FromA, FromB)>,
2993     }
2994 }
2995 
2996 impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
2997 where
2998     S: Stream<Item = (A, B)>,
2999     FromA: Default + Extend<A>,
3000     FromB: Default + Extend<B>,
3001 {
3002     type Output = (FromA, FromB);
3003 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>3004     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3005         let mut this = self.project();
3006 
3007         loop {
3008             match ready!(this.stream.as_mut().poll_next(cx)) {
3009                 Some((a, b)) => {
3010                     let res = this.res.as_mut().unwrap();
3011                     res.0.extend(Some(a));
3012                     res.1.extend(Some(b));
3013                 }
3014                 None => return Poll::Ready(this.res.take().unwrap()),
3015             }
3016         }
3017     }
3018 }
3019