1 //! Combinators for the [`Stream`] trait.
2 //!
3 //! # Examples
4 //!
5 //! ```
6 //! use futures_lite::stream::{self, StreamExt};
7 //!
8 //! # spin_on::spin_on(async {
9 //! let mut s = stream::iter(vec![1, 2, 3]);
10 //!
11 //! assert_eq!(s.next().await, Some(1));
12 //! assert_eq!(s.next().await, Some(2));
13 //! assert_eq!(s.next().await, Some(3));
14 //! assert_eq!(s.next().await, None);
15 //! # });
16 //! ```
17
18 #[cfg(feature = "alloc")]
19 extern crate alloc;
20
21 #[doc(no_inline)]
22 pub use futures_core::stream::Stream;
23
24 #[cfg(feature = "alloc")]
25 use alloc::boxed::Box;
26
27 use core::fmt;
28 use core::future::Future;
29 use core::marker::PhantomData;
30 use core::mem;
31 use core::pin::Pin;
32 use core::task::{Context, Poll};
33
34 use pin_project_lite::pin_project;
35
36 use crate::ready;
37
38 /// Converts a stream into a blocking iterator.
39 ///
40 /// # Examples
41 ///
42 /// ```
43 /// use futures_lite::{pin, stream};
44 ///
45 /// let stream = stream::once(7);
46 /// pin!(stream);
47 ///
48 /// let mut iter = stream::block_on(stream);
49 /// assert_eq!(iter.next(), Some(7));
50 /// assert_eq!(iter.next(), None);
51 /// ```
52 #[cfg(feature = "std")]
block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S>53 pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
54 BlockOn(stream)
55 }
56
57 /// Iterator for the [`block_on()`] function.
58 #[derive(Debug)]
59 pub struct BlockOn<S>(S);
60
61 #[cfg(feature = "std")]
62 impl<S: Stream + Unpin> Iterator for BlockOn<S> {
63 type Item = S::Item;
64
next(&mut self) -> Option<Self::Item>65 fn next(&mut self) -> Option<Self::Item> {
66 crate::future::block_on(self.0.next())
67 }
68 }
69
70 /// Creates an empty stream.
71 ///
72 /// # Examples
73 ///
74 /// ```
75 /// use futures_lite::stream::{self, StreamExt};
76 ///
77 /// # spin_on::spin_on(async {
78 /// let mut s = stream::empty::<i32>();
79 /// assert_eq!(s.next().await, None);
80 /// # })
81 /// ```
empty<T>() -> Empty<T>82 pub fn empty<T>() -> Empty<T> {
83 Empty {
84 _marker: PhantomData,
85 }
86 }
87
88 /// Stream for the [`empty()`] function.
89 #[derive(Clone, Debug)]
90 #[must_use = "streams do nothing unless polled"]
91 pub struct Empty<T> {
92 _marker: PhantomData<T>,
93 }
94
95 impl<T> Unpin for Empty<T> {}
96
97 impl<T> Stream for Empty<T> {
98 type Item = T;
99
poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>>100 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101 Poll::Ready(None)
102 }
103
size_hint(&self) -> (usize, Option<usize>)104 fn size_hint(&self) -> (usize, Option<usize>) {
105 (0, Some(0))
106 }
107 }
108
109 /// Creates a stream from an iterator.
110 ///
111 /// # Examples
112 ///
113 /// ```
114 /// use futures_lite::stream::{self, StreamExt};
115 ///
116 /// # spin_on::spin_on(async {
117 /// let mut s = stream::iter(vec![1, 2]);
118 ///
119 /// assert_eq!(s.next().await, Some(1));
120 /// assert_eq!(s.next().await, Some(2));
121 /// assert_eq!(s.next().await, None);
122 /// # })
123 /// ```
iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter>124 pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
125 Iter {
126 iter: iter.into_iter(),
127 }
128 }
129
130 /// Stream for the [`iter()`] function.
131 #[derive(Clone, Debug)]
132 #[must_use = "streams do nothing unless polled"]
133 pub struct Iter<I> {
134 iter: I,
135 }
136
137 impl<I> Unpin for Iter<I> {}
138
139 impl<I: Iterator> Stream for Iter<I> {
140 type Item = I::Item;
141
poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>>142 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143 Poll::Ready(self.iter.next())
144 }
145
size_hint(&self) -> (usize, Option<usize>)146 fn size_hint(&self) -> (usize, Option<usize>) {
147 self.iter.size_hint()
148 }
149 }
150
151 /// Creates a stream that yields a single item.
152 ///
153 /// # Examples
154 ///
155 /// ```
156 /// use futures_lite::stream::{self, StreamExt};
157 ///
158 /// # spin_on::spin_on(async {
159 /// let mut s = stream::once(7);
160 ///
161 /// assert_eq!(s.next().await, Some(7));
162 /// assert_eq!(s.next().await, None);
163 /// # })
164 /// ```
once<T>(t: T) -> Once<T>165 pub fn once<T>(t: T) -> Once<T> {
166 Once { value: Some(t) }
167 }
168
169 pin_project! {
170 /// Stream for the [`once()`] function.
171 #[derive(Clone, Debug)]
172 #[must_use = "streams do nothing unless polled"]
173 pub struct Once<T> {
174 value: Option<T>,
175 }
176 }
177
178 impl<T> Stream for Once<T> {
179 type Item = T;
180
poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>>181 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
182 Poll::Ready(self.project().value.take())
183 }
184
size_hint(&self) -> (usize, Option<usize>)185 fn size_hint(&self) -> (usize, Option<usize>) {
186 if self.value.is_some() {
187 (1, Some(1))
188 } else {
189 (0, Some(0))
190 }
191 }
192 }
193
194 /// Creates a stream that is always pending.
195 ///
196 /// # Examples
197 ///
198 /// ```no_run
199 /// use futures_lite::stream::{self, StreamExt};
200 ///
201 /// # spin_on::spin_on(async {
202 /// let mut s = stream::pending::<i32>();
203 /// s.next().await;
204 /// unreachable!();
205 /// # })
206 /// ```
pending<T>() -> Pending<T>207 pub fn pending<T>() -> Pending<T> {
208 Pending {
209 _marker: PhantomData,
210 }
211 }
212
213 /// Stream for the [`pending()`] function.
214 #[derive(Clone, Debug)]
215 #[must_use = "streams do nothing unless polled"]
216 pub struct Pending<T> {
217 _marker: PhantomData<T>,
218 }
219
220 impl<T> Unpin for Pending<T> {}
221
222 impl<T> Stream for Pending<T> {
223 type Item = T;
224
poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>>225 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
226 Poll::Pending
227 }
228
size_hint(&self) -> (usize, Option<usize>)229 fn size_hint(&self) -> (usize, Option<usize>) {
230 (0, Some(0))
231 }
232 }
233
234 /// Creates a stream from a function returning [`Poll`].
235 ///
236 /// # Examples
237 ///
238 /// ```
239 /// use futures_lite::stream::{self, StreamExt};
240 /// use std::task::{Context, Poll};
241 ///
242 /// # spin_on::spin_on(async {
243 /// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
244 /// Poll::Ready(Some(7))
245 /// }
246 ///
247 /// assert_eq!(stream::poll_fn(f).next().await, Some(7));
248 /// # })
249 /// ```
poll_fn<T, F>(f: F) -> PollFn<F> where F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,250 pub fn poll_fn<T, F>(f: F) -> PollFn<F>
251 where
252 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
253 {
254 PollFn { f }
255 }
256
257 /// Stream for the [`poll_fn()`] function.
258 #[derive(Clone)]
259 #[must_use = "streams do nothing unless polled"]
260 pub struct PollFn<F> {
261 f: F,
262 }
263
264 impl<F> Unpin for PollFn<F> {}
265
266 impl<F> fmt::Debug for PollFn<F> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268 f.debug_struct("PollFn").finish()
269 }
270 }
271
272 impl<T, F> Stream for PollFn<F>
273 where
274 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
275 {
276 type Item = T;
277
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>278 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
279 (&mut self.f)(cx)
280 }
281 }
282
283 /// Creates an infinite stream that yields the same item repeatedly.
284 ///
285 /// # Examples
286 ///
287 /// ```
288 /// use futures_lite::stream::{self, StreamExt};
289 ///
290 /// # spin_on::spin_on(async {
291 /// let mut s = stream::repeat(7);
292 ///
293 /// assert_eq!(s.next().await, Some(7));
294 /// assert_eq!(s.next().await, Some(7));
295 /// # })
296 /// ```
repeat<T: Clone>(item: T) -> Repeat<T>297 pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
298 Repeat { item }
299 }
300
301 /// Stream for the [`repeat()`] function.
302 #[derive(Clone, Debug)]
303 #[must_use = "streams do nothing unless polled"]
304 pub struct Repeat<T> {
305 item: T,
306 }
307
308 impl<T> Unpin for Repeat<T> {}
309
310 impl<T: Clone> Stream for Repeat<T> {
311 type Item = T;
312
poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>>313 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
314 Poll::Ready(Some(self.item.clone()))
315 }
316
size_hint(&self) -> (usize, Option<usize>)317 fn size_hint(&self) -> (usize, Option<usize>) {
318 (usize::max_value(), None)
319 }
320 }
321
322 /// Creates an infinite stream from a closure that generates items.
323 ///
324 /// # Examples
325 ///
326 /// ```
327 /// use futures_lite::stream::{self, StreamExt};
328 ///
329 /// # spin_on::spin_on(async {
330 /// let mut s = stream::repeat_with(|| 7);
331 ///
332 /// assert_eq!(s.next().await, Some(7));
333 /// assert_eq!(s.next().await, Some(7));
334 /// # })
335 /// ```
repeat_with<T, F>(repeater: F) -> RepeatWith<F> where F: FnMut() -> T,336 pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
337 where
338 F: FnMut() -> T,
339 {
340 RepeatWith { f: repeater }
341 }
342
343 /// Stream for the [`repeat_with()`] function.
344 #[derive(Clone, Debug)]
345 #[must_use = "streams do nothing unless polled"]
346 pub struct RepeatWith<F> {
347 f: F,
348 }
349
350 impl<F> Unpin for RepeatWith<F> {}
351
352 impl<T, F> Stream for RepeatWith<F>
353 where
354 F: FnMut() -> T,
355 {
356 type Item = T;
357
poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>>358 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
359 let item = (&mut self.f)();
360 Poll::Ready(Some(item))
361 }
362
size_hint(&self) -> (usize, Option<usize>)363 fn size_hint(&self) -> (usize, Option<usize>) {
364 (usize::max_value(), None)
365 }
366 }
367
368 /// Creates a stream from a seed value and an async closure operating on it.
369 ///
370 /// # Examples
371 ///
372 /// ```
373 /// use futures_lite::stream::{self, StreamExt};
374 ///
375 /// # spin_on::spin_on(async {
376 /// let s = stream::unfold(0, |mut n| async move {
377 /// if n < 2 {
378 /// let m = n + 1;
379 /// Some((n, m))
380 /// } else {
381 /// None
382 /// }
383 /// });
384 ///
385 /// let v: Vec<i32> = s.collect().await;
386 /// assert_eq!(v, [0, 1]);
387 /// # })
388 /// ```
unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut> where F: FnMut(T) -> Fut, Fut: Future<Output = Option<(Item, T)>>,389 pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
390 where
391 F: FnMut(T) -> Fut,
392 Fut: Future<Output = Option<(Item, T)>>,
393 {
394 Unfold {
395 f,
396 state: Some(seed),
397 fut: None,
398 }
399 }
400
401 pin_project! {
402 /// Stream for the [`unfold()`] function.
403 #[derive(Clone)]
404 #[must_use = "streams do nothing unless polled"]
405 pub struct Unfold<T, F, Fut> {
406 f: F,
407 state: Option<T>,
408 #[pin]
409 fut: Option<Fut>,
410 }
411 }
412
413 impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
414 where
415 T: fmt::Debug,
416 Fut: fmt::Debug,
417 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result418 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419 f.debug_struct("Unfold")
420 .field("state", &self.state)
421 .field("fut", &self.fut)
422 .finish()
423 }
424 }
425
426 impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
427 where
428 F: FnMut(T) -> Fut,
429 Fut: Future<Output = Option<(Item, T)>>,
430 {
431 type Item = Item;
432
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>433 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
434 let mut this = self.project();
435
436 if let Some(state) = this.state.take() {
437 this.fut.set(Some((this.f)(state)));
438 }
439
440 let step = ready!(this
441 .fut
442 .as_mut()
443 .as_pin_mut()
444 .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
445 .poll(cx));
446 this.fut.set(None);
447
448 if let Some((item, next_state)) = step {
449 *this.state = Some(next_state);
450 Poll::Ready(Some(item))
451 } else {
452 Poll::Ready(None)
453 }
454 }
455 }
456
457 /// Creates a stream from a seed value and a fallible async closure operating on it.
458 ///
459 /// # Examples
460 ///
461 /// ```
462 /// use futures_lite::stream::{self, StreamExt};
463 ///
464 /// # spin_on::spin_on(async {
465 /// let s = stream::try_unfold(0, |mut n| async move {
466 /// if n < 2 {
467 /// let m = n + 1;
468 /// Ok(Some((n, m)))
469 /// } else {
470 /// std::io::Result::Ok(None)
471 /// }
472 /// });
473 ///
474 /// let v: Vec<i32> = s.try_collect().await?;
475 /// assert_eq!(v, [0, 1]);
476 /// # std::io::Result::Ok(()) });
477 /// ```
try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut> where F: FnMut(T) -> Fut, Fut: Future<Output = Result<Option<(Item, T)>, E>>,478 pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
479 where
480 F: FnMut(T) -> Fut,
481 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
482 {
483 TryUnfold {
484 f,
485 state: Some(init),
486 fut: None,
487 }
488 }
489
490 pin_project! {
491 /// Stream for the [`try_unfold()`] function.
492 #[derive(Clone)]
493 #[must_use = "streams do nothing unless polled"]
494 pub struct TryUnfold<T, F, Fut> {
495 f: F,
496 state: Option<T>,
497 #[pin]
498 fut: Option<Fut>,
499 }
500 }
501
502 impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
503 where
504 T: fmt::Debug,
505 Fut: fmt::Debug,
506 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result507 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508 f.debug_struct("TryUnfold")
509 .field("state", &self.state)
510 .field("fut", &self.fut)
511 .finish()
512 }
513 }
514
515 impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
516 where
517 F: FnMut(T) -> Fut,
518 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
519 {
520 type Item = Result<Item, E>;
521
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>522 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
523 let mut this = self.project();
524
525 if let Some(state) = this.state.take() {
526 this.fut.set(Some((this.f)(state)));
527 }
528
529 match this.fut.as_mut().as_pin_mut() {
530 None => {
531 // The future previously errored
532 Poll::Ready(None)
533 }
534 Some(future) => {
535 let step = ready!(future.poll(cx));
536 this.fut.set(None);
537
538 match step {
539 Ok(Some((item, next_state))) => {
540 *this.state = Some(next_state);
541 Poll::Ready(Some(Ok(item)))
542 }
543 Ok(None) => Poll::Ready(None),
544 Err(e) => Poll::Ready(Some(Err(e))),
545 }
546 }
547 }
548 }
549 }
550
551 /// Extension trait for [`Stream`].
552 pub trait StreamExt: Stream {
553 /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,554 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
555 where
556 Self: Unpin,
557 {
558 Stream::poll_next(Pin::new(self), cx)
559 }
560
561 /// Retrieves the next item in the stream.
562 ///
563 /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
564 /// resume iteration after that.
565 ///
566 /// # Examples
567 ///
568 /// ```
569 /// use futures_lite::stream::{self, StreamExt};
570 ///
571 /// # spin_on::spin_on(async {
572 /// let mut s = stream::iter(1..=3);
573 ///
574 /// assert_eq!(s.next().await, Some(1));
575 /// assert_eq!(s.next().await, Some(2));
576 /// assert_eq!(s.next().await, Some(3));
577 /// assert_eq!(s.next().await, None);
578 /// # });
579 /// ```
next(&mut self) -> NextFuture<'_, Self> where Self: Unpin,580 fn next(&mut self) -> NextFuture<'_, Self>
581 where
582 Self: Unpin,
583 {
584 NextFuture { stream: self }
585 }
586
587 /// Retrieves the next item in the stream.
588 ///
589 /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
590 /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
591 ///
592 /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
593 ///
594 /// # Examples
595 ///
596 /// ```
597 /// use futures_lite::stream::{self, StreamExt};
598 ///
599 /// # spin_on::spin_on(async {
600 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
601 ///
602 /// assert_eq!(s.try_next().await, Ok(Some(1)));
603 /// assert_eq!(s.try_next().await, Ok(Some(2)));
604 /// assert_eq!(s.try_next().await, Err("error"));
605 /// assert_eq!(s.try_next().await, Ok(None));
606 /// # });
607 /// ```
try_next<T, E>(&mut self) -> TryNextFuture<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,608 fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
609 where
610 Self: Stream<Item = Result<T, E>> + Unpin,
611 {
612 TryNextFuture { stream: self }
613 }
614
615 /// Counts the number of items in the stream.
616 ///
617 /// # Examples
618 ///
619 /// ```
620 /// use futures_lite::stream::{self, StreamExt};
621 ///
622 /// # spin_on::spin_on(async {
623 /// let s1 = stream::iter(vec![0]);
624 /// let s2 = stream::iter(vec![1, 2, 3]);
625 ///
626 /// assert_eq!(s1.count().await, 1);
627 /// assert_eq!(s2.count().await, 3);
628 /// # });
629 /// ```
count(self) -> CountFuture<Self> where Self: Sized,630 fn count(self) -> CountFuture<Self>
631 where
632 Self: Sized,
633 {
634 CountFuture {
635 stream: self,
636 count: 0,
637 }
638 }
639
640 /// Maps items of the stream to new values using a closure.
641 ///
642 /// # Examples
643 ///
644 /// ```
645 /// use futures_lite::stream::{self, StreamExt};
646 ///
647 /// # spin_on::spin_on(async {
648 /// let s = stream::iter(vec![1, 2, 3]);
649 /// let mut s = s.map(|x| 2 * x);
650 ///
651 /// assert_eq!(s.next().await, Some(2));
652 /// assert_eq!(s.next().await, Some(4));
653 /// assert_eq!(s.next().await, Some(6));
654 /// assert_eq!(s.next().await, None);
655 /// # });
656 /// ```
map<T, F>(self, f: F) -> Map<Self, F> where Self: Sized, F: FnMut(Self::Item) -> T,657 fn map<T, F>(self, f: F) -> Map<Self, F>
658 where
659 Self: Sized,
660 F: FnMut(Self::Item) -> T,
661 {
662 Map { stream: self, f }
663 }
664
665 /// Maps items to streams and then concatenates them.
666 ///
667 /// # Examples
668 ///
669 /// ```
670 /// use futures_lite::stream::{self, StreamExt};
671 ///
672 /// # spin_on::spin_on(async {
673 /// let words = stream::iter(vec!["one", "two"]);
674 ///
675 /// let s: String = words
676 /// .flat_map(|s| stream::iter(s.chars()))
677 /// .collect()
678 /// .await;
679 ///
680 /// assert_eq!(s, "onetwo");
681 /// # });
682 /// ```
flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where Self: Sized, U: Stream, F: FnMut(Self::Item) -> U,683 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
684 where
685 Self: Sized,
686 U: Stream,
687 F: FnMut(Self::Item) -> U,
688 {
689 FlatMap {
690 stream: self.map(f),
691 inner_stream: None,
692 }
693 }
694
695 /// Concatenates inner streams.
696 ///
697 /// # Examples
698 ///
699 /// ```
700 /// use futures_lite::stream::{self, StreamExt};
701 ///
702 /// # spin_on::spin_on(async {
703 /// let s1 = stream::iter(vec![1, 2, 3]);
704 /// let s2 = stream::iter(vec![4, 5]);
705 ///
706 /// let s = stream::iter(vec![s1, s2]);
707 /// let v: Vec<_> = s.flatten().collect().await;
708 /// assert_eq!(v, [1, 2, 3, 4, 5]);
709 /// # });
710 /// ```
flatten(self) -> Flatten<Self> where Self: Sized, Self::Item: Stream,711 fn flatten(self) -> Flatten<Self>
712 where
713 Self: Sized,
714 Self::Item: Stream,
715 {
716 Flatten {
717 stream: self,
718 inner_stream: None,
719 }
720 }
721
722 /// Maps items of the stream to new values using an async closure.
723 ///
724 /// # Examples
725 ///
726 /// ```
727 /// use futures_lite::pin;
728 /// use futures_lite::stream::{self, StreamExt};
729 ///
730 /// # spin_on::spin_on(async {
731 /// let s = stream::iter(vec![1, 2, 3]);
732 /// let mut s = s.then(|x| async move { 2 * x });
733 ///
734 /// pin!(s);
735 /// assert_eq!(s.next().await, Some(2));
736 /// assert_eq!(s.next().await, Some(4));
737 /// assert_eq!(s.next().await, Some(6));
738 /// assert_eq!(s.next().await, None);
739 /// # });
740 /// ```
then<F, Fut>(self, f: F) -> Then<Self, F, Fut> where Self: Sized, F: FnMut(Self::Item) -> Fut, Fut: Future,741 fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
742 where
743 Self: Sized,
744 F: FnMut(Self::Item) -> Fut,
745 Fut: Future,
746 {
747 Then {
748 stream: self,
749 future: None,
750 f,
751 }
752 }
753
754 /// Keeps items of the stream for which `predicate` returns `true`.
755 ///
756 /// # Examples
757 ///
758 /// ```
759 /// use futures_lite::stream::{self, StreamExt};
760 ///
761 /// # spin_on::spin_on(async {
762 /// let s = stream::iter(vec![1, 2, 3, 4]);
763 /// let mut s = s.filter(|i| i % 2 == 0);
764 ///
765 /// assert_eq!(s.next().await, Some(2));
766 /// assert_eq!(s.next().await, Some(4));
767 /// assert_eq!(s.next().await, None);
768 /// # });
769 /// ```
filter<P>(self, predicate: P) -> Filter<Self, P> where Self: Sized, P: FnMut(&Self::Item) -> bool,770 fn filter<P>(self, predicate: P) -> Filter<Self, P>
771 where
772 Self: Sized,
773 P: FnMut(&Self::Item) -> bool,
774 {
775 Filter {
776 stream: self,
777 predicate,
778 }
779 }
780
781 /// Filters and maps items of the stream using a closure.
782 ///
783 /// # Examples
784 ///
785 /// ```
786 /// use futures_lite::stream::{self, StreamExt};
787 ///
788 /// # spin_on::spin_on(async {
789 /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
790 /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
791 ///
792 /// assert_eq!(s.next().await, Some(1));
793 /// assert_eq!(s.next().await, Some(3));
794 /// assert_eq!(s.next().await, Some(5));
795 /// assert_eq!(s.next().await, None);
796 /// # });
797 /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where Self: Sized, F: FnMut(Self::Item) -> Option<T>,798 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
799 where
800 Self: Sized,
801 F: FnMut(Self::Item) -> Option<T>,
802 {
803 FilterMap { stream: self, f }
804 }
805
806 /// Takes only the first `n` items of the stream.
807 ///
808 /// # Examples
809 ///
810 /// ```
811 /// use futures_lite::stream::{self, StreamExt};
812 ///
813 /// # spin_on::spin_on(async {
814 /// let mut s = stream::repeat(7).take(2);
815 ///
816 /// assert_eq!(s.next().await, Some(7));
817 /// assert_eq!(s.next().await, Some(7));
818 /// assert_eq!(s.next().await, None);
819 /// # });
820 /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,821 fn take(self, n: usize) -> Take<Self>
822 where
823 Self: Sized,
824 {
825 Take { stream: self, n }
826 }
827
828 /// Takes items while `predicate` returns `true`.
829 ///
830 /// # Examples
831 ///
832 /// ```
833 /// use futures_lite::stream::{self, StreamExt};
834 ///
835 /// # spin_on::spin_on(async {
836 /// let s = stream::iter(vec![1, 2, 3, 4]);
837 /// let mut s = s.take_while(|x| *x < 3);
838 ///
839 /// assert_eq!(s.next().await, Some(1));
840 /// assert_eq!(s.next().await, Some(2));
841 /// assert_eq!(s.next().await, None);
842 /// # });
843 /// ```
take_while<P>(self, predicate: P) -> TakeWhile<Self, P> where Self: Sized, P: FnMut(&Self::Item) -> bool,844 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
845 where
846 Self: Sized,
847 P: FnMut(&Self::Item) -> bool,
848 {
849 TakeWhile {
850 stream: self,
851 predicate,
852 }
853 }
854
855 /// Skips the first `n` items of the stream.
856 ///
857 /// # Examples
858 ///
859 /// ```
860 /// use futures_lite::stream::{self, StreamExt};
861 ///
862 /// # spin_on::spin_on(async {
863 /// let s = stream::iter(vec![1, 2, 3]);
864 /// let mut s = s.skip(2);
865 ///
866 /// assert_eq!(s.next().await, Some(3));
867 /// assert_eq!(s.next().await, None);
868 /// # });
869 /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,870 fn skip(self, n: usize) -> Skip<Self>
871 where
872 Self: Sized,
873 {
874 Skip { stream: self, n }
875 }
876
877 /// Skips items while `predicate` returns `true`.
878 ///
879 /// # Examples
880 ///
881 /// ```
882 /// use futures_lite::stream::{self, StreamExt};
883 ///
884 /// # spin_on::spin_on(async {
885 /// let s = stream::iter(vec![-1i32, 0, 1]);
886 /// let mut s = s.skip_while(|x| x.is_negative());
887 ///
888 /// assert_eq!(s.next().await, Some(0));
889 /// assert_eq!(s.next().await, Some(1));
890 /// assert_eq!(s.next().await, None);
891 /// # });
892 /// ```
skip_while<P>(self, predicate: P) -> SkipWhile<Self, P> where Self: Sized, P: FnMut(&Self::Item) -> bool,893 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
894 where
895 Self: Sized,
896 P: FnMut(&Self::Item) -> bool,
897 {
898 SkipWhile {
899 stream: self,
900 predicate: Some(predicate),
901 }
902 }
903
904 /// Yields every `step`th item.
905 ///
906 /// # Panics
907 ///
908 /// This method will panic if the `step` is 0.
909 ///
910 /// # Examples
911 ///
912 /// ```
913 /// use futures_lite::stream::{self, StreamExt};
914 ///
915 /// # spin_on::spin_on(async {
916 /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
917 /// let mut s = s.step_by(2);
918 ///
919 /// assert_eq!(s.next().await, Some(0));
920 /// assert_eq!(s.next().await, Some(2));
921 /// assert_eq!(s.next().await, Some(4));
922 /// assert_eq!(s.next().await, None);
923 /// # });
924 /// ```
step_by(self, step: usize) -> StepBy<Self> where Self: Sized,925 fn step_by(self, step: usize) -> StepBy<Self>
926 where
927 Self: Sized,
928 {
929 assert!(step > 0, "`step` must be greater than zero");
930 StepBy {
931 stream: self,
932 step,
933 i: 0,
934 }
935 }
936
937 /// Appends another stream to the end of this one.
938 ///
939 /// # Examples
940 ///
941 /// ```
942 /// use futures_lite::stream::{self, StreamExt};
943 ///
944 /// # spin_on::spin_on(async {
945 /// let s1 = stream::iter(vec![1, 2]);
946 /// let s2 = stream::iter(vec![7, 8]);
947 /// let mut s = s1.chain(s2);
948 ///
949 /// assert_eq!(s.next().await, Some(1));
950 /// assert_eq!(s.next().await, Some(2));
951 /// assert_eq!(s.next().await, Some(7));
952 /// assert_eq!(s.next().await, Some(8));
953 /// assert_eq!(s.next().await, None);
954 /// # });
955 /// ```
chain<U>(self, other: U) -> Chain<Self, U> where Self: Sized, U: Stream<Item = Self::Item> + Sized,956 fn chain<U>(self, other: U) -> Chain<Self, U>
957 where
958 Self: Sized,
959 U: Stream<Item = Self::Item> + Sized,
960 {
961 Chain {
962 first: self.fuse(),
963 second: other.fuse(),
964 }
965 }
966
967 /// Clones all items.
968 ///
969 /// # Examples
970 ///
971 /// ```
972 /// use futures_lite::stream::{self, StreamExt};
973 ///
974 /// # spin_on::spin_on(async {
975 /// let s = stream::iter(vec![&1, &2]);
976 /// let mut s = s.cloned();
977 ///
978 /// assert_eq!(s.next().await, Some(1));
979 /// assert_eq!(s.next().await, Some(2));
980 /// assert_eq!(s.next().await, None);
981 /// # });
982 /// ```
cloned<'a, T>(self) -> Cloned<Self> where Self: Stream<Item = &'a T> + Sized, T: Clone + 'a,983 fn cloned<'a, T>(self) -> Cloned<Self>
984 where
985 Self: Stream<Item = &'a T> + Sized,
986 T: Clone + 'a,
987 {
988 Cloned { stream: self }
989 }
990
991 /// Copies all items.
992 ///
993 /// # Examples
994 ///
995 /// ```
996 /// use futures_lite::stream::{self, StreamExt};
997 ///
998 /// # spin_on::spin_on(async {
999 /// let s = stream::iter(vec![&1, &2]);
1000 /// let mut s = s.copied();
1001 ///
1002 /// assert_eq!(s.next().await, Some(1));
1003 /// assert_eq!(s.next().await, Some(2));
1004 /// assert_eq!(s.next().await, None);
1005 /// # });
1006 /// ```
copied<'a, T>(self) -> Copied<Self> where Self: Stream<Item = &'a T> + Sized, T: Copy + 'a,1007 fn copied<'a, T>(self) -> Copied<Self>
1008 where
1009 Self: Stream<Item = &'a T> + Sized,
1010 T: Copy + 'a,
1011 {
1012 Copied { stream: self }
1013 }
1014
1015 /// Collects all items in the stream into a collection.
1016 ///
1017 /// # Examples
1018 ///
1019 /// ```
1020 /// use futures_lite::stream::{self, StreamExt};
1021 ///
1022 /// # spin_on::spin_on(async {
1023 /// let mut s = stream::iter(1..=3);
1024 ///
1025 /// let items: Vec<_> = s.collect().await;
1026 /// assert_eq!(items, [1, 2, 3]);
1027 /// # });
1028 /// ```
collect<C>(self) -> CollectFuture<Self, C> where Self: Sized, C: Default + Extend<Self::Item>,1029 fn collect<C>(self) -> CollectFuture<Self, C>
1030 where
1031 Self: Sized,
1032 C: Default + Extend<Self::Item>,
1033 {
1034 CollectFuture {
1035 stream: self,
1036 collection: Default::default(),
1037 }
1038 }
1039
1040 /// Collects all items in the fallible stream into a collection.
1041 ///
1042 /// ```
1043 /// use futures_lite::stream::{self, StreamExt};
1044 ///
1045 /// # spin_on::spin_on(async {
1046 /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1047 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1048 /// assert_eq!(res, Err(2));
1049 ///
1050 /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1051 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1052 /// assert_eq!(res, Ok(vec![1, 2, 3]));
1053 /// # })
1054 /// ```
try_collect<T, E, C>(self) -> TryCollectFuture<Self, C> where Self: Stream<Item = Result<T, E>> + Sized, C: Default + Extend<T>,1055 fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1056 where
1057 Self: Stream<Item = Result<T, E>> + Sized,
1058 C: Default + Extend<T>,
1059 {
1060 TryCollectFuture {
1061 stream: self,
1062 items: Default::default(),
1063 }
1064 }
1065
1066 /// Partitions items into those for which `predicate` is `true` and those for which it is
1067 /// `false`, and then collects them into two collections.
1068 ///
1069 /// # Examples
1070 ///
1071 /// ```
1072 /// use futures_lite::stream::{self, StreamExt};
1073 ///
1074 /// # spin_on::spin_on(async {
1075 /// let s = stream::iter(vec![1, 2, 3]);
1076 /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1077 ///
1078 /// assert_eq!(even, &[2]);
1079 /// assert_eq!(odd, &[1, 3]);
1080 /// # })
1081 /// ```
partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B> where Self: Sized, B: Default + Extend<Self::Item>, P: FnMut(&Self::Item) -> bool,1082 fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1083 where
1084 Self: Sized,
1085 B: Default + Extend<Self::Item>,
1086 P: FnMut(&Self::Item) -> bool,
1087 {
1088 PartitionFuture {
1089 stream: self,
1090 predicate,
1091 res: Some(Default::default()),
1092 }
1093 }
1094
1095 /// Accumulates a computation over the stream.
1096 ///
1097 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1098 /// the accumulator and each item in the stream. The final accumulator value is returned.
1099 ///
1100 /// # Examples
1101 ///
1102 /// ```
1103 /// use futures_lite::stream::{self, StreamExt};
1104 ///
1105 /// # spin_on::spin_on(async {
1106 /// let s = stream::iter(vec![1, 2, 3]);
1107 /// let sum = s.fold(0, |acc, x| acc + x).await;
1108 ///
1109 /// assert_eq!(sum, 6);
1110 /// # })
1111 /// ```
fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T> where Self: Sized, F: FnMut(T, Self::Item) -> T,1112 fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1113 where
1114 Self: Sized,
1115 F: FnMut(T, Self::Item) -> T,
1116 {
1117 FoldFuture {
1118 stream: self,
1119 f,
1120 acc: Some(init),
1121 }
1122 }
1123
1124 /// Accumulates a fallible computation over the stream.
1125 ///
1126 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1127 /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1128 /// error if `f` failed the computation.
1129 ///
1130 /// # Examples
1131 ///
1132 /// ```
1133 /// use futures_lite::stream::{self, StreamExt};
1134 ///
1135 /// # spin_on::spin_on(async {
1136 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1137 ///
1138 /// let sum = s.try_fold(0, |acc, v| {
1139 /// if (acc + v) % 2 == 1 {
1140 /// Ok(acc + v)
1141 /// } else {
1142 /// Err("fail")
1143 /// }
1144 /// })
1145 /// .await;
1146 ///
1147 /// assert_eq!(sum, Err("fail"));
1148 /// # })
1149 /// ```
try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B> where Self: Stream<Item = Result<T, E>> + Unpin + Sized, F: FnMut(B, T) -> Result<B, E>,1150 fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1151 where
1152 Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1153 F: FnMut(B, T) -> Result<B, E>,
1154 {
1155 TryFoldFuture {
1156 stream: self,
1157 f,
1158 acc: Some(init),
1159 }
1160 }
1161
1162 /// Maps items of the stream to new values using a state value and a closure.
1163 ///
1164 /// Scanning begins with the inital state set to `initial_state`, and then applies `f` to the
1165 /// state and each item in the stream. The stream stops when `f` returns `None`.
1166 ///
1167 /// # Examples
1168 ///
1169 /// ```
1170 /// use futures_lite::stream::{self, StreamExt};
1171 ///
1172 /// # spin_on::spin_on(async {
1173 /// let s = stream::iter(vec![1, 2, 3]);
1174 /// let mut s = s.scan(1, |state, x| {
1175 /// *state = *state * x;
1176 /// Some(-*state)
1177 /// });
1178 ///
1179 /// assert_eq!(s.next().await, Some(-1));
1180 /// assert_eq!(s.next().await, Some(-2));
1181 /// assert_eq!(s.next().await, Some(-6));
1182 /// assert_eq!(s.next().await, None);
1183 /// # })
1184 /// ```
scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F> where Self: Sized, F: FnMut(&mut St, Self::Item) -> Option<B>,1185 fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1186 where
1187 Self: Sized,
1188 F: FnMut(&mut St, Self::Item) -> Option<B>,
1189 {
1190 Scan {
1191 stream: self,
1192 state_f: (initial_state, f),
1193 }
1194 }
1195
1196 /// Fuses the stream so that it stops yielding items after the first [`None`].
1197 ///
1198 /// # Examples
1199 ///
1200 /// ```
1201 /// use futures_lite::stream::{self, StreamExt};
1202 ///
1203 /// # spin_on::spin_on(async {
1204 /// let mut s = stream::once(1).fuse();
1205 ///
1206 /// assert_eq!(s.next().await, Some(1));
1207 /// assert_eq!(s.next().await, None);
1208 /// assert_eq!(s.next().await, None);
1209 /// # })
1210 /// ```
fuse(self) -> Fuse<Self> where Self: Sized,1211 fn fuse(self) -> Fuse<Self>
1212 where
1213 Self: Sized,
1214 {
1215 Fuse {
1216 stream: self,
1217 done: false,
1218 }
1219 }
1220
1221 /// Repeats the stream from beginning to end, forever.
1222 ///
1223 /// # Examples
1224 ///
1225 /// ```
1226 /// use futures_lite::stream::{self, StreamExt};
1227 ///
1228 /// # spin_on::spin_on(async {
1229 /// let mut s = stream::iter(vec![1, 2]).cycle();
1230 ///
1231 /// assert_eq!(s.next().await, Some(1));
1232 /// assert_eq!(s.next().await, Some(2));
1233 /// assert_eq!(s.next().await, Some(1));
1234 /// assert_eq!(s.next().await, Some(2));
1235 /// # });
1236 /// ```
cycle(self) -> Cycle<Self> where Self: Clone + Sized,1237 fn cycle(self) -> Cycle<Self>
1238 where
1239 Self: Clone + Sized,
1240 {
1241 Cycle {
1242 orig: self.clone(),
1243 stream: self,
1244 }
1245 }
1246
1247 /// Enumerates items, mapping them to `(index, item)`.
1248 ///
1249 /// # Examples
1250 ///
1251 /// ```
1252 /// use futures_lite::stream::{self, StreamExt};
1253 ///
1254 /// # spin_on::spin_on(async {
1255 /// let s = stream::iter(vec!['a', 'b', 'c']);
1256 /// let mut s = s.enumerate();
1257 ///
1258 /// assert_eq!(s.next().await, Some((0, 'a')));
1259 /// assert_eq!(s.next().await, Some((1, 'b')));
1260 /// assert_eq!(s.next().await, Some((2, 'c')));
1261 /// assert_eq!(s.next().await, None);
1262 /// # });
1263 /// ```
enumerate(self) -> Enumerate<Self> where Self: Sized,1264 fn enumerate(self) -> Enumerate<Self>
1265 where
1266 Self: Sized,
1267 {
1268 Enumerate { stream: self, i: 0 }
1269 }
1270
1271 /// Calls a closure on each item and passes it on.
1272 ///
1273 /// # Examples
1274 ///
1275 /// ```
1276 /// use futures_lite::stream::{self, StreamExt};
1277 ///
1278 /// # spin_on::spin_on(async {
1279 /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1280 ///
1281 /// let sum = s
1282 /// .inspect(|x| println!("about to filter {}", x))
1283 /// .filter(|x| x % 2 == 0)
1284 /// .inspect(|x| println!("made it through filter: {}", x))
1285 /// .fold(0, |sum, i| sum + i)
1286 /// .await;
1287 /// # });
1288 /// ```
inspect<F>(self, f: F) -> Inspect<Self, F> where Self: Sized, F: FnMut(&Self::Item),1289 fn inspect<F>(self, f: F) -> Inspect<Self, F>
1290 where
1291 Self: Sized,
1292 F: FnMut(&Self::Item),
1293 {
1294 Inspect { stream: self, f }
1295 }
1296
1297 /// Gets the `n`th item of the stream.
1298 ///
1299 /// In the end, `n+1` items of the stream will be consumed.
1300 ///
1301 /// # Examples
1302 ///
1303 /// ```
1304 /// use futures_lite::stream::{self, StreamExt};
1305 ///
1306 /// # spin_on::spin_on(async {
1307 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1308 ///
1309 /// assert_eq!(s.nth(2).await, Some(2));
1310 /// assert_eq!(s.nth(2).await, Some(5));
1311 /// assert_eq!(s.nth(2).await, None);
1312 /// # });
1313 /// ```
nth(&mut self, n: usize) -> NthFuture<'_, Self> where Self: Unpin,1314 fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1315 where
1316 Self: Unpin,
1317 {
1318 NthFuture { stream: self, n }
1319 }
1320
1321 /// Returns the last item in the stream.
1322 ///
1323 /// # Examples
1324 ///
1325 /// ```
1326 /// use futures_lite::stream::{self, StreamExt};
1327 ///
1328 /// # spin_on::spin_on(async {
1329 /// let s = stream::iter(vec![1, 2, 3, 4]);
1330 /// assert_eq!(s.last().await, Some(4));
1331 ///
1332 /// let s = stream::empty::<i32>();
1333 /// assert_eq!(s.last().await, None);
1334 /// # });
1335 /// ```
last(self) -> LastFuture<Self> where Self: Sized,1336 fn last(self) -> LastFuture<Self>
1337 where
1338 Self: Sized,
1339 {
1340 LastFuture {
1341 stream: self,
1342 last: None,
1343 }
1344 }
1345
1346 /// Finds the first item of the stream for which `predicate` returns `true`.
1347 ///
1348 /// # Examples
1349 ///
1350 /// ```
1351 /// use futures_lite::stream::{self, StreamExt};
1352 ///
1353 /// # spin_on::spin_on(async {
1354 /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1355 ///
1356 /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1357 /// assert_eq!(s.next().await, Some(13));
1358 /// # });
1359 /// ```
find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P> where Self: Unpin, P: FnMut(&Self::Item) -> bool,1360 fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1361 where
1362 Self: Unpin,
1363 P: FnMut(&Self::Item) -> bool,
1364 {
1365 FindFuture {
1366 stream: self,
1367 predicate,
1368 }
1369 }
1370
1371 /// Applies a closure to items in the stream and returns the first [`Some`] result.
1372 ///
1373 /// # Examples
1374 ///
1375 /// ```
1376 /// use futures_lite::stream::{self, StreamExt};
1377 ///
1378 /// # spin_on::spin_on(async {
1379 /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1380 /// let number = s.find_map(|s| s.parse().ok()).await;
1381 ///
1382 /// assert_eq!(number, Some(2));
1383 /// # });
1384 /// ```
find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> Option<B>,1385 fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1386 where
1387 Self: Unpin,
1388 F: FnMut(Self::Item) -> Option<B>,
1389 {
1390 FindMapFuture { stream: self, f }
1391 }
1392
1393 /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1394 ///
1395 /// # Examples
1396 ///
1397 /// ```
1398 /// use futures_lite::stream::{self, StreamExt};
1399 ///
1400 /// # spin_on::spin_on(async {
1401 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1402 ///
1403 /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1404 /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1405 /// assert_eq!(s.position(|x| x == 9).await, None);
1406 /// # });
1407 /// ```
position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P> where Self: Unpin, P: FnMut(Self::Item) -> bool,1408 fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1409 where
1410 Self: Unpin,
1411 P: FnMut(Self::Item) -> bool,
1412 {
1413 PositionFuture {
1414 stream: self,
1415 predicate,
1416 index: 0,
1417 }
1418 }
1419
1420 /// Tests if `predicate` returns `true` for all items in the stream.
1421 ///
1422 /// The result is `true` for an empty stream.
1423 ///
1424 /// # Examples
1425 ///
1426 /// ```
1427 /// use futures_lite::stream::{self, StreamExt};
1428 ///
1429 /// # spin_on::spin_on(async {
1430 /// let mut s = stream::iter(vec![1, 2, 3]);
1431 /// assert!(!s.all(|x| x % 2 == 0).await);
1432 ///
1433 /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1434 /// assert!(s.all(|x| x % 2 == 0).await);
1435 ///
1436 /// let mut s = stream::empty::<i32>();
1437 /// assert!(s.all(|x| x % 2 == 0).await);
1438 /// # });
1439 /// ```
all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P> where Self: Unpin, P: FnMut(Self::Item) -> bool,1440 fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1441 where
1442 Self: Unpin,
1443 P: FnMut(Self::Item) -> bool,
1444 {
1445 AllFuture {
1446 stream: self,
1447 predicate,
1448 }
1449 }
1450
1451 /// Tests if `predicate` returns `true` for any item in the stream.
1452 ///
1453 /// The result is `false` for an empty stream.
1454 ///
1455 /// # Examples
1456 ///
1457 /// ```
1458 /// use futures_lite::stream::{self, StreamExt};
1459 ///
1460 /// # spin_on::spin_on(async {
1461 /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1462 /// assert!(!s.any(|x| x % 2 == 0).await);
1463 ///
1464 /// let mut s = stream::iter(vec![1, 2, 3]);
1465 /// assert!(s.any(|x| x % 2 == 0).await);
1466 ///
1467 /// let mut s = stream::empty::<i32>();
1468 /// assert!(!s.any(|x| x % 2 == 0).await);
1469 /// # });
1470 /// ```
any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P> where Self: Unpin, P: FnMut(Self::Item) -> bool,1471 fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1472 where
1473 Self: Unpin,
1474 P: FnMut(Self::Item) -> bool,
1475 {
1476 AnyFuture {
1477 stream: self,
1478 predicate,
1479 }
1480 }
1481
1482 /// Calls a closure on each item of the stream.
1483 ///
1484 /// # Examples
1485 ///
1486 /// ```
1487 /// use futures_lite::stream::{self, StreamExt};
1488 ///
1489 /// # spin_on::spin_on(async {
1490 /// let mut s = stream::iter(vec![1, 2, 3]);
1491 /// s.for_each(|s| println!("{}", s)).await;
1492 /// # });
1493 /// ```
for_each<F>(self, f: F) -> ForEachFuture<Self, F> where Self: Sized, F: FnMut(Self::Item),1494 fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1495 where
1496 Self: Sized,
1497 F: FnMut(Self::Item),
1498 {
1499 ForEachFuture { stream: self, f }
1500 }
1501
1502 /// Calls a fallible closure on each item of the stream, stopping on first error.
1503 ///
1504 /// # Examples
1505 ///
1506 /// ```
1507 /// use futures_lite::stream::{self, StreamExt};
1508 ///
1509 /// # spin_on::spin_on(async {
1510 /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1511 ///
1512 /// let mut v = vec![];
1513 /// let res = s
1514 /// .try_for_each(|n| {
1515 /// if n < 2 {
1516 /// v.push(n);
1517 /// Ok(())
1518 /// } else {
1519 /// Err("too big")
1520 /// }
1521 /// })
1522 /// .await;
1523 ///
1524 /// assert_eq!(v, &[0, 1]);
1525 /// assert_eq!(res, Err("too big"));
1526 /// # });
1527 /// ```
try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> Result<(), E>,1528 fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1529 where
1530 Self: Unpin,
1531 F: FnMut(Self::Item) -> Result<(), E>,
1532 {
1533 TryForEachFuture { stream: self, f }
1534 }
1535
1536 /// Zips up two streams into a single stream of pairs.
1537 ///
1538 /// The stream of pairs stops when either of the original two streams is exhausted.
1539 ///
1540 /// # Examples
1541 ///
1542 /// ```
1543 /// use futures_lite::stream::{self, StreamExt};
1544 ///
1545 /// # spin_on::spin_on(async {
1546 /// let l = stream::iter(vec![1, 2, 3]);
1547 /// let r = stream::iter(vec![4, 5, 6, 7]);
1548 /// let mut s = l.zip(r);
1549 ///
1550 /// assert_eq!(s.next().await, Some((1, 4)));
1551 /// assert_eq!(s.next().await, Some((2, 5)));
1552 /// assert_eq!(s.next().await, Some((3, 6)));
1553 /// assert_eq!(s.next().await, None);
1554 /// # });
1555 /// ```
zip<U>(self, other: U) -> Zip<Self, U> where Self: Sized, U: Stream,1556 fn zip<U>(self, other: U) -> Zip<Self, U>
1557 where
1558 Self: Sized,
1559 U: Stream,
1560 {
1561 Zip {
1562 item_slot: None,
1563 first: self,
1564 second: other,
1565 }
1566 }
1567
1568 /// Collects a stream of pairs into a pair of collections.
1569 ///
1570 /// # Examples
1571 ///
1572 /// ```
1573 /// use futures_lite::stream::{self, StreamExt};
1574 ///
1575 /// # spin_on::spin_on(async {
1576 /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1577 /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1578 ///
1579 /// assert_eq!(left, [1, 3]);
1580 /// assert_eq!(right, [2, 4]);
1581 /// # });
1582 /// ```
unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Stream<Item = (A, B)> + Sized,1583 fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1584 where
1585 FromA: Default + Extend<A>,
1586 FromB: Default + Extend<B>,
1587 Self: Stream<Item = (A, B)> + Sized,
1588 {
1589 UnzipFuture {
1590 stream: self,
1591 res: Some(Default::default()),
1592 }
1593 }
1594
1595 /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1596 ///
1597 /// # Examples
1598 ///
1599 /// ```
1600 /// use futures_lite::stream::{self, StreamExt};
1601 /// use futures_lite::stream::{once, pending};
1602 ///
1603 /// # spin_on::spin_on(async {
1604 /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1605 /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1606 ///
1607 /// // The first future wins.
1608 /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1609 /// # })
1610 /// ```
or<S>(self, other: S) -> Or<Self, S> where Self: Sized, S: Stream<Item = Self::Item>,1611 fn or<S>(self, other: S) -> Or<Self, S>
1612 where
1613 Self: Sized,
1614 S: Stream<Item = Self::Item>,
1615 {
1616 Or {
1617 stream1: self,
1618 stream2: other,
1619 }
1620 }
1621
1622 /// Merges with `other` stream, with no preference for either stream when both are ready.
1623 ///
1624 /// # Examples
1625 ///
1626 /// ```
1627 /// use futures_lite::stream::{self, StreamExt};
1628 /// use futures_lite::stream::{once, pending};
1629 ///
1630 /// # spin_on::spin_on(async {
1631 /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1632 /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1633 ///
1634 /// // One of the two stream is randomly chosen as the winner.
1635 /// let res = once(1).race(once(2)).next().await;
1636 /// # })
1637 /// ```
1638 #[cfg(feature = "std")]
race<S>(self, other: S) -> Race<Self, S> where Self: Sized, S: Stream<Item = Self::Item>,1639 fn race<S>(self, other: S) -> Race<Self, S>
1640 where
1641 Self: Sized,
1642 S: Stream<Item = Self::Item>,
1643 {
1644 Race {
1645 stream1: self,
1646 stream2: other,
1647 }
1648 }
1649
1650 /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
1651 ///
1652 /// # Examples
1653 ///
1654 /// ```
1655 /// use futures_lite::stream::{self, StreamExt};
1656 ///
1657 /// # spin_on::spin_on(async {
1658 /// let a = stream::once(1);
1659 /// let b = stream::empty();
1660 ///
1661 /// // Streams of different types can be stored in
1662 /// // the same collection when they are boxed:
1663 /// let streams = vec![a.boxed(), b.boxed()];
1664 /// # })
1665 /// ```
1666 #[cfg(feature = "alloc")]
boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>> where Self: Send + Sized + 'a,1667 fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
1668 where
1669 Self: Send + Sized + 'a,
1670 {
1671 Box::pin(self)
1672 }
1673
1674 /// Boxes the stream and changes its type to `dyn Stream + 'a`.
1675 ///
1676 /// # Examples
1677 ///
1678 /// ```
1679 /// use futures_lite::stream::{self, StreamExt};
1680 ///
1681 /// # spin_on::spin_on(async {
1682 /// let a = stream::once(1);
1683 /// let b = stream::empty();
1684 ///
1685 /// // Streams of different types can be stored in
1686 /// // the same collection when they are boxed:
1687 /// let streams = vec![a.boxed_local(), b.boxed_local()];
1688 /// # })
1689 /// ```
1690 #[cfg(feature = "alloc")]
boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>> where Self: Sized + 'a,1691 fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
1692 where
1693 Self: Sized + 'a,
1694 {
1695 Box::pin(self)
1696 }
1697 }
1698
1699 impl<S: Stream + ?Sized> StreamExt for S {}
1700
1701 /// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
1702 ///
1703 /// # Examples
1704 ///
1705 /// ```
1706 /// use futures_lite::stream::{self, StreamExt};
1707 ///
1708 /// // These two lines are equivalent:
1709 /// let s1: stream::Boxed<i32> = stream::once(7).boxed();
1710 /// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
1711 /// ```
1712 #[cfg(feature = "alloc")]
1713 pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
1714
1715 /// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
1716 ///
1717 /// # Examples
1718 ///
1719 /// ```
1720 /// use futures_lite::stream::{self, StreamExt};
1721 ///
1722 /// // These two lines are equivalent:
1723 /// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
1724 /// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
1725 /// ```
1726 #[cfg(feature = "alloc")]
1727 pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1728
1729 /// Future for the [`StreamExt::next()`] method.
1730 #[derive(Debug)]
1731 #[must_use = "futures do nothing unless you `.await` or poll them"]
1732 pub struct NextFuture<'a, S: ?Sized> {
1733 stream: &'a mut S,
1734 }
1735
1736 impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
1737
1738 impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
1739 type Output = Option<S::Item>;
1740
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1741 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1742 self.stream.poll_next(cx)
1743 }
1744 }
1745
1746 /// Future for the [`StreamExt::try_next()`] method.
1747 #[derive(Debug)]
1748 #[must_use = "futures do nothing unless you `.await` or poll them"]
1749 pub struct TryNextFuture<'a, S: ?Sized> {
1750 stream: &'a mut S,
1751 }
1752
1753 impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
1754
1755 impl<T, E, S> Future for TryNextFuture<'_, S>
1756 where
1757 S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
1758 {
1759 type Output = Result<Option<T>, E>;
1760
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1761 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1762 let res = ready!(self.stream.poll_next(cx));
1763 Poll::Ready(res.transpose())
1764 }
1765 }
1766
1767 pin_project! {
1768 /// Future for the [`StreamExt::count()`] method.
1769 #[derive(Debug)]
1770 #[must_use = "futures do nothing unless you `.await` or poll them"]
1771 pub struct CountFuture<S: ?Sized> {
1772 count: usize,
1773 #[pin]
1774 stream: S,
1775 }
1776 }
1777
1778 impl<S: Stream + ?Sized> Future for CountFuture<S> {
1779 type Output = usize;
1780
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1781 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1782 loop {
1783 match ready!(self.as_mut().project().stream.poll_next(cx)) {
1784 None => return Poll::Ready(self.count),
1785 Some(_) => *self.as_mut().project().count += 1,
1786 }
1787 }
1788 }
1789 }
1790
1791 pin_project! {
1792 /// Future for the [`StreamExt::collect()`] method.
1793 #[derive(Debug)]
1794 #[must_use = "futures do nothing unless you `.await` or poll them"]
1795 pub struct CollectFuture<S, C> {
1796 #[pin]
1797 stream: S,
1798 collection: C,
1799 }
1800 }
1801
1802 impl<S, C> Future for CollectFuture<S, C>
1803 where
1804 S: Stream,
1805 C: Default + Extend<S::Item>,
1806 {
1807 type Output = C;
1808
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C>1809 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
1810 let mut this = self.as_mut().project();
1811 loop {
1812 match ready!(this.stream.as_mut().poll_next(cx)) {
1813 Some(e) => this.collection.extend(Some(e)),
1814 None => {
1815 return Poll::Ready(mem::replace(self.project().collection, Default::default()))
1816 }
1817 }
1818 }
1819 }
1820 }
1821
1822 pin_project! {
1823 /// Future for the [`StreamExt::try_collect()`] method.
1824 #[derive(Debug)]
1825 #[must_use = "futures do nothing unless you `.await` or poll them"]
1826 pub struct TryCollectFuture<S, C> {
1827 #[pin]
1828 stream: S,
1829 items: C,
1830 }
1831 }
1832
1833 impl<T, E, S, C> Future for TryCollectFuture<S, C>
1834 where
1835 S: Stream<Item = Result<T, E>>,
1836 C: Default + Extend<T>,
1837 {
1838 type Output = Result<C, E>;
1839
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1840 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1841 let mut this = self.project();
1842 Poll::Ready(Ok(loop {
1843 match ready!(this.stream.as_mut().poll_next(cx)?) {
1844 Some(x) => this.items.extend(Some(x)),
1845 None => break mem::replace(this.items, Default::default()),
1846 }
1847 }))
1848 }
1849 }
1850
1851 pin_project! {
1852 /// Future for the [`StreamExt::partition()`] method.
1853 #[derive(Debug)]
1854 #[must_use = "futures do nothing unless you `.await` or poll them"]
1855 pub struct PartitionFuture<S, P, B> {
1856 #[pin]
1857 stream: S,
1858 predicate: P,
1859 res: Option<(B, B)>,
1860 }
1861 }
1862
1863 impl<S, P, B> Future for PartitionFuture<S, P, B>
1864 where
1865 S: Stream + Sized,
1866 P: FnMut(&S::Item) -> bool,
1867 B: Default + Extend<S::Item>,
1868 {
1869 type Output = (B, B);
1870
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1871 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1872 let mut this = self.project();
1873 loop {
1874 match ready!(this.stream.as_mut().poll_next(cx)) {
1875 Some(v) => {
1876 let res = this.res.as_mut().unwrap();
1877 if (this.predicate)(&v) {
1878 res.0.extend(Some(v))
1879 } else {
1880 res.1.extend(Some(v))
1881 }
1882 }
1883 None => return Poll::Ready(this.res.take().unwrap()),
1884 }
1885 }
1886 }
1887 }
1888
1889 pin_project! {
1890 /// Future for the [`StreamExt::fold()`] method.
1891 #[derive(Debug)]
1892 #[must_use = "futures do nothing unless you `.await` or poll them"]
1893 pub struct FoldFuture<S, F, T> {
1894 #[pin]
1895 stream: S,
1896 f: F,
1897 acc: Option<T>,
1898 }
1899 }
1900
1901 impl<S, F, T> Future for FoldFuture<S, F, T>
1902 where
1903 S: Stream,
1904 F: FnMut(T, S::Item) -> T,
1905 {
1906 type Output = T;
1907
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1908 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1909 let mut this = self.project();
1910 loop {
1911 match ready!(this.stream.as_mut().poll_next(cx)) {
1912 Some(v) => {
1913 let old = this.acc.take().unwrap();
1914 let new = (this.f)(old, v);
1915 *this.acc = Some(new);
1916 }
1917 None => return Poll::Ready(this.acc.take().unwrap()),
1918 }
1919 }
1920 }
1921 }
1922
1923 /// Future for the [`StreamExt::try_fold()`] method.
1924 #[derive(Debug)]
1925 #[must_use = "futures do nothing unless you `.await` or poll them"]
1926 pub struct TryFoldFuture<'a, S, F, B> {
1927 stream: &'a mut S,
1928 f: F,
1929 acc: Option<B>,
1930 }
1931
1932 impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
1933
1934 impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
1935 where
1936 S: Stream<Item = Result<T, E>> + Unpin,
1937 F: FnMut(B, T) -> Result<B, E>,
1938 {
1939 type Output = Result<B, E>;
1940
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1941 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1942 loop {
1943 match ready!(self.stream.poll_next(cx)) {
1944 Some(Err(e)) => return Poll::Ready(Err(e)),
1945 Some(Ok(t)) => {
1946 let old = self.acc.take().unwrap();
1947 let new = (&mut self.f)(old, t);
1948
1949 match new {
1950 Ok(t) => self.acc = Some(t),
1951 Err(e) => return Poll::Ready(Err(e)),
1952 }
1953 }
1954 None => return Poll::Ready(Ok(self.acc.take().unwrap())),
1955 }
1956 }
1957 }
1958 }
1959
1960 pin_project! {
1961 /// Stream for the [`StreamExt::scan()`] method.
1962 #[derive(Clone, Debug)]
1963 #[must_use = "streams do nothing unless polled"]
1964 pub struct Scan<S, St, F> {
1965 #[pin]
1966 stream: S,
1967 state_f: (St, F),
1968 }
1969 }
1970
1971 impl<S, St, F, B> Stream for Scan<S, St, F>
1972 where
1973 S: Stream,
1974 F: FnMut(&mut St, S::Item) -> Option<B>,
1975 {
1976 type Item = B;
1977
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>>1978 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
1979 let mut this = self.project();
1980 this.stream.as_mut().poll_next(cx).map(|item| {
1981 item.and_then(|item| {
1982 let (state, f) = this.state_f;
1983 f(state, item)
1984 })
1985 })
1986 }
1987 }
1988
1989 pin_project! {
1990 /// Stream for the [`StreamExt::fuse()`] method.
1991 #[derive(Clone, Debug)]
1992 #[must_use = "streams do nothing unless polled"]
1993 pub struct Fuse<S> {
1994 #[pin]
1995 stream: S,
1996 done: bool,
1997 }
1998 }
1999
2000 impl<S: Stream> Stream for Fuse<S> {
2001 type Item = S::Item;
2002
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>2003 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2004 let this = self.project();
2005
2006 if *this.done {
2007 Poll::Ready(None)
2008 } else {
2009 let next = ready!(this.stream.poll_next(cx));
2010 if next.is_none() {
2011 *this.done = true;
2012 }
2013 Poll::Ready(next)
2014 }
2015 }
2016 }
2017
2018 pin_project! {
2019 /// Stream for the [`StreamExt::map()`] method.
2020 #[derive(Clone, Debug)]
2021 #[must_use = "streams do nothing unless polled"]
2022 pub struct Map<S, F> {
2023 #[pin]
2024 stream: S,
2025 f: F,
2026 }
2027 }
2028
2029 impl<S, F, T> Stream for Map<S, F>
2030 where
2031 S: Stream,
2032 F: FnMut(S::Item) -> T,
2033 {
2034 type Item = T;
2035
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2036 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2037 let this = self.project();
2038 let next = ready!(this.stream.poll_next(cx));
2039 Poll::Ready(next.map(this.f))
2040 }
2041
size_hint(&self) -> (usize, Option<usize>)2042 fn size_hint(&self) -> (usize, Option<usize>) {
2043 self.stream.size_hint()
2044 }
2045 }
2046
2047 pin_project! {
2048 /// Stream for the [`StreamExt::flat_map()`] method.
2049 #[derive(Clone, Debug)]
2050 #[must_use = "streams do nothing unless polled"]
2051 pub struct FlatMap<S, U, F> {
2052 #[pin]
2053 stream: Map<S, F>,
2054 #[pin]
2055 inner_stream: Option<U>,
2056 }
2057 }
2058
2059 impl<S, U, F> Stream for FlatMap<S, U, F>
2060 where
2061 S: Stream,
2062 U: Stream,
2063 F: FnMut(S::Item) -> U,
2064 {
2065 type Item = U::Item;
2066
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2067 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2068 let mut this = self.project();
2069 loop {
2070 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2071 match ready!(inner.poll_next(cx)) {
2072 Some(item) => return Poll::Ready(Some(item)),
2073 None => this.inner_stream.set(None),
2074 }
2075 }
2076
2077 match ready!(this.stream.as_mut().poll_next(cx)) {
2078 Some(stream) => this.inner_stream.set(Some(stream)),
2079 None => return Poll::Ready(None),
2080 }
2081 }
2082 }
2083 }
2084
2085 pin_project! {
2086 /// Stream for the [`StreamExt::flat_map()`] method.
2087 #[derive(Clone, Debug)]
2088 #[must_use = "streams do nothing unless polled"]
2089 pub struct Flatten<S: Stream> {
2090 #[pin]
2091 stream: S,
2092 #[pin]
2093 inner_stream: Option<S::Item>,
2094 }
2095 }
2096
2097 impl<S, U> Stream for Flatten<S>
2098 where
2099 S: Stream<Item = U>,
2100 U: Stream,
2101 {
2102 type Item = U::Item;
2103
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2104 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2105 let mut this = self.project();
2106 loop {
2107 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2108 match ready!(inner.poll_next(cx)) {
2109 Some(item) => return Poll::Ready(Some(item)),
2110 None => this.inner_stream.set(None),
2111 }
2112 }
2113
2114 match ready!(this.stream.as_mut().poll_next(cx)) {
2115 Some(inner) => this.inner_stream.set(Some(inner)),
2116 None => return Poll::Ready(None),
2117 }
2118 }
2119 }
2120 }
2121
2122 pin_project! {
2123 /// Stream for the [`StreamExt::then()`] method.
2124 #[derive(Clone, Debug)]
2125 #[must_use = "streams do nothing unless polled"]
2126 pub struct Then<S, F, Fut> {
2127 #[pin]
2128 stream: S,
2129 #[pin]
2130 future: Option<Fut>,
2131 f: F,
2132 }
2133 }
2134
2135 impl<S, F, Fut> Stream for Then<S, F, Fut>
2136 where
2137 S: Stream,
2138 F: FnMut(S::Item) -> Fut,
2139 Fut: Future,
2140 {
2141 type Item = Fut::Output;
2142
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2143 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2144 let mut this = self.project();
2145
2146 loop {
2147 if let Some(fut) = this.future.as_mut().as_pin_mut() {
2148 let item = ready!(fut.poll(cx));
2149 this.future.set(None);
2150 return Poll::Ready(Some(item));
2151 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2152 this.future.set(Some((this.f)(item)));
2153 } else {
2154 return Poll::Ready(None);
2155 }
2156 }
2157 }
2158
size_hint(&self) -> (usize, Option<usize>)2159 fn size_hint(&self) -> (usize, Option<usize>) {
2160 let future_len = if self.future.is_some() { 1 } else { 0 };
2161 let (lower, upper) = self.stream.size_hint();
2162 let lower = lower.saturating_add(future_len);
2163 let upper = upper.and_then(|u| u.checked_add(future_len));
2164 (lower, upper)
2165 }
2166 }
2167
2168 pin_project! {
2169 /// Stream for the [`StreamExt::filter()`] method.
2170 #[derive(Clone, Debug)]
2171 #[must_use = "streams do nothing unless polled"]
2172 pub struct Filter<S, P> {
2173 #[pin]
2174 stream: S,
2175 predicate: P,
2176 }
2177 }
2178
2179 impl<S, P> Stream for Filter<S, P>
2180 where
2181 S: Stream,
2182 P: FnMut(&S::Item) -> bool,
2183 {
2184 type Item = S::Item;
2185
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2186 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2187 let mut this = self.project();
2188 loop {
2189 match ready!(this.stream.as_mut().poll_next(cx)) {
2190 None => return Poll::Ready(None),
2191 Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2192 Some(_) => {}
2193 }
2194 }
2195 }
2196 }
2197
2198 /// Merges two streams, preferring items from `stream1` whenever both streams are ready.
2199 ///
2200 /// # Examples
2201 ///
2202 /// ```
2203 /// use futures_lite::stream::{self, once, pending, StreamExt};
2204 ///
2205 /// # spin_on::spin_on(async {
2206 /// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
2207 /// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
2208 ///
2209 /// // The first stream wins.
2210 /// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
2211 /// # })
2212 /// ```
or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2> where S1: Stream<Item = T>, S2: Stream<Item = T>,2213 pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2214 where
2215 S1: Stream<Item = T>,
2216 S2: Stream<Item = T>,
2217 {
2218 Or { stream1, stream2 }
2219 }
2220
2221 pin_project! {
2222 /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2223 #[derive(Clone, Debug)]
2224 #[must_use = "streams do nothing unless polled"]
2225 pub struct Or<S1, S2> {
2226 #[pin]
2227 stream1: S1,
2228 #[pin]
2229 stream2: S2,
2230 }
2231 }
2232
2233 impl<T, S1, S2> Stream for Or<S1, S2>
2234 where
2235 S1: Stream<Item = T>,
2236 S2: Stream<Item = T>,
2237 {
2238 type Item = T;
2239
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2240 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2241 let mut this = self.project();
2242
2243 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2244 return Poll::Ready(Some(t));
2245 }
2246 this.stream2.as_mut().poll_next(cx)
2247 }
2248 }
2249
2250 /// Merges two streams, with no preference for either stream when both are ready.
2251 ///
2252 /// # Examples
2253 ///
2254 /// ```
2255 /// use futures_lite::stream::{self, once, pending, StreamExt};
2256 ///
2257 /// # spin_on::spin_on(async {
2258 /// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
2259 /// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
2260 ///
2261 /// // One of the two stream is randomly chosen as the winner.
2262 /// let res = stream::race(once(1), once(2)).next().await;
2263 /// # })
2264 #[cfg(feature = "std")]
race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2> where S1: Stream<Item = T>, S2: Stream<Item = T>,2265 pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
2266 where
2267 S1: Stream<Item = T>,
2268 S2: Stream<Item = T>,
2269 {
2270 Race { stream1, stream2 }
2271 }
2272
2273 #[cfg(feature = "std")]
2274 pin_project! {
2275 /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
2276 #[derive(Clone, Debug)]
2277 #[must_use = "streams do nothing unless polled"]
2278 pub struct Race<S1, S2> {
2279 #[pin]
2280 stream1: S1,
2281 #[pin]
2282 stream2: S2,
2283 }
2284 }
2285
2286 #[cfg(feature = "std")]
2287 impl<T, S1, S2> Stream for Race<S1, S2>
2288 where
2289 S1: Stream<Item = T>,
2290 S2: Stream<Item = T>,
2291 {
2292 type Item = T;
2293
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2294 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2295 let mut this = self.project();
2296
2297 if fastrand::bool() {
2298 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2299 return Poll::Ready(Some(t));
2300 }
2301 if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2302 return Poll::Ready(Some(t));
2303 }
2304 } else {
2305 if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2306 return Poll::Ready(Some(t));
2307 }
2308 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2309 return Poll::Ready(Some(t));
2310 }
2311 }
2312 Poll::Pending
2313 }
2314 }
2315
2316 pin_project! {
2317 /// Stream for the [`StreamExt::filter_map()`] method.
2318 #[derive(Clone, Debug)]
2319 #[must_use = "streams do nothing unless polled"]
2320 pub struct FilterMap<S, F> {
2321 #[pin]
2322 stream: S,
2323 f: F,
2324 }
2325 }
2326
2327 impl<S, F, T> Stream for FilterMap<S, F>
2328 where
2329 S: Stream,
2330 F: FnMut(S::Item) -> Option<T>,
2331 {
2332 type Item = T;
2333
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2334 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2335 let mut this = self.project();
2336 loop {
2337 match ready!(this.stream.as_mut().poll_next(cx)) {
2338 None => return Poll::Ready(None),
2339 Some(v) => {
2340 if let Some(t) = (this.f)(v) {
2341 return Poll::Ready(Some(t));
2342 }
2343 }
2344 }
2345 }
2346 }
2347 }
2348
2349 pin_project! {
2350 /// Stream for the [`StreamExt::take()`] method.
2351 #[derive(Clone, Debug)]
2352 #[must_use = "streams do nothing unless polled"]
2353 pub struct Take<S> {
2354 #[pin]
2355 stream: S,
2356 n: usize,
2357 }
2358 }
2359
2360 impl<S: Stream> Stream for Take<S> {
2361 type Item = S::Item;
2362
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>2363 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2364 let this = self.project();
2365
2366 if *this.n == 0 {
2367 Poll::Ready(None)
2368 } else {
2369 let next = ready!(this.stream.poll_next(cx));
2370 match next {
2371 Some(_) => *this.n -= 1,
2372 None => *this.n = 0,
2373 }
2374 Poll::Ready(next)
2375 }
2376 }
2377 }
2378
2379 pin_project! {
2380 /// Stream for the [`StreamExt::take_while()`] method.
2381 #[derive(Clone, Debug)]
2382 #[must_use = "streams do nothing unless polled"]
2383 pub struct TakeWhile<S, P> {
2384 #[pin]
2385 stream: S,
2386 predicate: P,
2387 }
2388 }
2389
2390 impl<S, P> Stream for TakeWhile<S, P>
2391 where
2392 S: Stream,
2393 P: FnMut(&S::Item) -> bool,
2394 {
2395 type Item = S::Item;
2396
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2397 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2398 let this = self.project();
2399
2400 match ready!(this.stream.poll_next(cx)) {
2401 Some(v) => {
2402 if (this.predicate)(&v) {
2403 Poll::Ready(Some(v))
2404 } else {
2405 Poll::Ready(None)
2406 }
2407 }
2408 None => Poll::Ready(None),
2409 }
2410 }
2411 }
2412
2413 pin_project! {
2414 /// Stream for the [`StreamExt::skip()`] method.
2415 #[derive(Clone, Debug)]
2416 #[must_use = "streams do nothing unless polled"]
2417 pub struct Skip<S> {
2418 #[pin]
2419 stream: S,
2420 n: usize,
2421 }
2422 }
2423
2424 impl<S: Stream> Stream for Skip<S> {
2425 type Item = S::Item;
2426
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2427 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2428 let mut this = self.project();
2429 loop {
2430 match ready!(this.stream.as_mut().poll_next(cx)) {
2431 Some(v) => match *this.n {
2432 0 => return Poll::Ready(Some(v)),
2433 _ => *this.n -= 1,
2434 },
2435 None => return Poll::Ready(None),
2436 }
2437 }
2438 }
2439 }
2440
2441 pin_project! {
2442 /// Stream for the [`StreamExt::skip_while()`] method.
2443 #[derive(Clone, Debug)]
2444 #[must_use = "streams do nothing unless polled"]
2445 pub struct SkipWhile<S, P> {
2446 #[pin]
2447 stream: S,
2448 predicate: Option<P>,
2449 }
2450 }
2451
2452 impl<S, P> Stream for SkipWhile<S, P>
2453 where
2454 S: Stream,
2455 P: FnMut(&S::Item) -> bool,
2456 {
2457 type Item = S::Item;
2458
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2459 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2460 let mut this = self.project();
2461 loop {
2462 match ready!(this.stream.as_mut().poll_next(cx)) {
2463 Some(v) => match this.predicate {
2464 Some(p) => {
2465 if !p(&v) {
2466 *this.predicate = None;
2467 return Poll::Ready(Some(v));
2468 }
2469 }
2470 None => return Poll::Ready(Some(v)),
2471 },
2472 None => return Poll::Ready(None),
2473 }
2474 }
2475 }
2476 }
2477
2478 pin_project! {
2479 /// Stream for the [`StreamExt::step_by()`] method.
2480 #[derive(Clone, Debug)]
2481 #[must_use = "streams do nothing unless polled"]
2482 pub struct StepBy<S> {
2483 #[pin]
2484 stream: S,
2485 step: usize,
2486 i: usize,
2487 }
2488 }
2489
2490 impl<S: Stream> Stream for StepBy<S> {
2491 type Item = S::Item;
2492
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2493 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2494 let mut this = self.project();
2495 loop {
2496 match ready!(this.stream.as_mut().poll_next(cx)) {
2497 Some(v) => {
2498 if *this.i == 0 {
2499 *this.i = *this.step - 1;
2500 return Poll::Ready(Some(v));
2501 } else {
2502 *this.i -= 1;
2503 }
2504 }
2505 None => return Poll::Ready(None),
2506 }
2507 }
2508 }
2509 }
2510
2511 pin_project! {
2512 /// Stream for the [`StreamExt::chain()`] method.
2513 #[derive(Clone, Debug)]
2514 #[must_use = "streams do nothing unless polled"]
2515 pub struct Chain<S, U> {
2516 #[pin]
2517 first: Fuse<S>,
2518 #[pin]
2519 second: Fuse<U>,
2520 }
2521 }
2522
2523 impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2524 type Item = S::Item;
2525
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2526 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2527 let mut this = self.project();
2528
2529 if !this.first.done {
2530 let next = ready!(this.first.as_mut().poll_next(cx));
2531 if let Some(next) = next {
2532 return Poll::Ready(Some(next));
2533 }
2534 }
2535
2536 if !this.second.done {
2537 let next = ready!(this.second.as_mut().poll_next(cx));
2538 if let Some(next) = next {
2539 return Poll::Ready(Some(next));
2540 }
2541 }
2542
2543 if this.first.done && this.second.done {
2544 Poll::Ready(None)
2545 } else {
2546 Poll::Pending
2547 }
2548 }
2549 }
2550
2551 pin_project! {
2552 /// Stream for the [`StreamExt::cloned()`] method.
2553 #[derive(Clone, Debug)]
2554 #[must_use = "streams do nothing unless polled"]
2555 pub struct Cloned<S> {
2556 #[pin]
2557 stream: S,
2558 }
2559 }
2560
2561 impl<'a, S, T: 'a> Stream for Cloned<S>
2562 where
2563 S: Stream<Item = &'a T>,
2564 T: Clone,
2565 {
2566 type Item = T;
2567
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2568 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2569 let this = self.project();
2570 let next = ready!(this.stream.poll_next(cx));
2571 Poll::Ready(next.cloned())
2572 }
2573 }
2574
2575 pin_project! {
2576 /// Stream for the [`StreamExt::copied()`] method.
2577 #[derive(Clone, Debug)]
2578 #[must_use = "streams do nothing unless polled"]
2579 pub struct Copied<S> {
2580 #[pin]
2581 stream: S,
2582 }
2583 }
2584
2585 impl<'a, S, T: 'a> Stream for Copied<S>
2586 where
2587 S: Stream<Item = &'a T>,
2588 T: Copy,
2589 {
2590 type Item = T;
2591
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2592 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2593 let this = self.project();
2594 let next = ready!(this.stream.poll_next(cx));
2595 Poll::Ready(next.copied())
2596 }
2597 }
2598
2599 pin_project! {
2600 /// Stream for the [`StreamExt::cycle()`] method.
2601 #[derive(Clone, Debug)]
2602 #[must_use = "streams do nothing unless polled"]
2603 pub struct Cycle<S> {
2604 orig: S,
2605 #[pin]
2606 stream: S,
2607 }
2608 }
2609
2610 impl<S> Stream for Cycle<S>
2611 where
2612 S: Stream + Clone,
2613 {
2614 type Item = S::Item;
2615
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2616 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2617 match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2618 Some(item) => Poll::Ready(Some(item)),
2619 None => {
2620 let new = self.as_mut().orig.clone();
2621 self.as_mut().project().stream.set(new);
2622 self.project().stream.poll_next(cx)
2623 }
2624 }
2625 }
2626 }
2627
2628 pin_project! {
2629 /// Stream for the [`StreamExt::cycle()`] method.
2630 #[derive(Clone, Debug)]
2631 #[must_use = "streams do nothing unless polled"]
2632 pub struct Enumerate<S> {
2633 #[pin]
2634 stream: S,
2635 i: usize,
2636 }
2637 }
2638
2639 impl<S> Stream for Enumerate<S>
2640 where
2641 S: Stream,
2642 {
2643 type Item = (usize, S::Item);
2644
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2645 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2646 let this = self.project();
2647
2648 match ready!(this.stream.poll_next(cx)) {
2649 Some(v) => {
2650 let ret = (*this.i, v);
2651 *this.i += 1;
2652 Poll::Ready(Some(ret))
2653 }
2654 None => Poll::Ready(None),
2655 }
2656 }
2657 }
2658
2659 pin_project! {
2660 /// Stream for the [`StreamExt::inspect()`] method.
2661 #[derive(Clone, Debug)]
2662 #[must_use = "streams do nothing unless polled"]
2663 pub struct Inspect<S, F> {
2664 #[pin]
2665 stream: S,
2666 f: F,
2667 }
2668 }
2669
2670 impl<S, F> Stream for Inspect<S, F>
2671 where
2672 S: Stream,
2673 F: FnMut(&S::Item),
2674 {
2675 type Item = S::Item;
2676
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2677 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2678 let mut this = self.project();
2679 let next = ready!(this.stream.as_mut().poll_next(cx));
2680 if let Some(x) = &next {
2681 (this.f)(x);
2682 }
2683 Poll::Ready(next)
2684 }
2685 }
2686
2687 /// Future for the [`StreamExt::nth()`] method.
2688 #[derive(Debug)]
2689 #[must_use = "futures do nothing unless you `.await` or poll them"]
2690 pub struct NthFuture<'a, S: ?Sized> {
2691 stream: &'a mut S,
2692 n: usize,
2693 }
2694
2695 impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2696
2697 impl<'a, S> Future for NthFuture<'a, S>
2698 where
2699 S: Stream + Unpin + ?Sized,
2700 {
2701 type Output = Option<S::Item>;
2702
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2703 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2704 loop {
2705 match ready!(self.stream.poll_next(cx)) {
2706 Some(v) => match self.n {
2707 0 => return Poll::Ready(Some(v)),
2708 _ => self.n -= 1,
2709 },
2710 None => return Poll::Ready(None),
2711 }
2712 }
2713 }
2714 }
2715
2716 pin_project! {
2717 /// Future for the [`StreamExt::last()`] method.
2718 #[derive(Debug)]
2719 #[must_use = "futures do nothing unless you `.await` or poll them"]
2720 pub struct LastFuture<S: Stream> {
2721 #[pin]
2722 stream: S,
2723 last: Option<S::Item>,
2724 }
2725 }
2726
2727 impl<S: Stream> Future for LastFuture<S> {
2728 type Output = Option<S::Item>;
2729
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2730 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2731 let mut this = self.project();
2732 loop {
2733 match ready!(this.stream.as_mut().poll_next(cx)) {
2734 Some(new) => *this.last = Some(new),
2735 None => return Poll::Ready(this.last.take()),
2736 }
2737 }
2738 }
2739 }
2740
2741 /// Future for the [`StreamExt::find()`] method.
2742 #[derive(Debug)]
2743 #[must_use = "futures do nothing unless you `.await` or poll them"]
2744 pub struct FindFuture<'a, S: ?Sized, P> {
2745 stream: &'a mut S,
2746 predicate: P,
2747 }
2748
2749 impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2750
2751 impl<'a, S, P> Future for FindFuture<'a, S, P>
2752 where
2753 S: Stream + Unpin + ?Sized,
2754 P: FnMut(&S::Item) -> bool,
2755 {
2756 type Output = Option<S::Item>;
2757
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2758 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2759 loop {
2760 match ready!(self.stream.poll_next(cx)) {
2761 Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
2762 Some(_) => {}
2763 None => return Poll::Ready(None),
2764 }
2765 }
2766 }
2767 }
2768
2769 /// Future for the [`StreamExt::find_map()`] method.
2770 #[derive(Debug)]
2771 #[must_use = "futures do nothing unless you `.await` or poll them"]
2772 pub struct FindMapFuture<'a, S: ?Sized, F> {
2773 stream: &'a mut S,
2774 f: F,
2775 }
2776
2777 impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2778
2779 impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
2780 where
2781 S: Stream + Unpin + ?Sized,
2782 F: FnMut(S::Item) -> Option<B>,
2783 {
2784 type Output = Option<B>;
2785
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2786 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2787 loop {
2788 match ready!(self.stream.poll_next(cx)) {
2789 Some(v) => {
2790 if let Some(v) = (&mut self.f)(v) {
2791 return Poll::Ready(Some(v));
2792 }
2793 }
2794 None => return Poll::Ready(None),
2795 }
2796 }
2797 }
2798 }
2799
2800 /// Future for the [`StreamExt::position()`] method.
2801 #[derive(Debug)]
2802 #[must_use = "futures do nothing unless you `.await` or poll them"]
2803 pub struct PositionFuture<'a, S: ?Sized, P> {
2804 stream: &'a mut S,
2805 predicate: P,
2806 index: usize,
2807 }
2808
2809 impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
2810
2811 impl<'a, S, P> Future for PositionFuture<'a, S, P>
2812 where
2813 S: Stream + Unpin + ?Sized,
2814 P: FnMut(S::Item) -> bool,
2815 {
2816 type Output = Option<usize>;
2817
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2818 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2819 loop {
2820 match ready!(self.stream.poll_next(cx)) {
2821 Some(v) => {
2822 if (&mut self.predicate)(v) {
2823 return Poll::Ready(Some(self.index));
2824 } else {
2825 self.index += 1;
2826 }
2827 }
2828 None => return Poll::Ready(None),
2829 }
2830 }
2831 }
2832 }
2833
2834 /// Future for the [`StreamExt::all()`] method.
2835 #[derive(Debug)]
2836 #[must_use = "futures do nothing unless you `.await` or poll them"]
2837 pub struct AllFuture<'a, S: ?Sized, P> {
2838 stream: &'a mut S,
2839 predicate: P,
2840 }
2841
2842 impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
2843
2844 impl<S, P> Future for AllFuture<'_, S, P>
2845 where
2846 S: Stream + Unpin + ?Sized,
2847 P: FnMut(S::Item) -> bool,
2848 {
2849 type Output = bool;
2850
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2851 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2852 loop {
2853 match ready!(self.stream.poll_next(cx)) {
2854 Some(v) => {
2855 if !(&mut self.predicate)(v) {
2856 return Poll::Ready(false);
2857 }
2858 }
2859 None => return Poll::Ready(true),
2860 }
2861 }
2862 }
2863 }
2864
2865 /// Future for the [`StreamExt::any()`] method.
2866 #[derive(Debug)]
2867 #[must_use = "futures do nothing unless you `.await` or poll them"]
2868 pub struct AnyFuture<'a, S: ?Sized, P> {
2869 stream: &'a mut S,
2870 predicate: P,
2871 }
2872
2873 impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
2874
2875 impl<S, P> Future for AnyFuture<'_, S, P>
2876 where
2877 S: Stream + Unpin + ?Sized,
2878 P: FnMut(S::Item) -> bool,
2879 {
2880 type Output = bool;
2881
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2882 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2883 loop {
2884 match ready!(self.stream.poll_next(cx)) {
2885 Some(v) => {
2886 if (&mut self.predicate)(v) {
2887 return Poll::Ready(true);
2888 }
2889 }
2890 None => return Poll::Ready(false),
2891 }
2892 }
2893 }
2894 }
2895
2896 pin_project! {
2897 /// Future for the [`StreamExt::for_each()`] method.
2898 #[derive(Debug)]
2899 #[must_use = "futures do nothing unless you `.await` or poll them"]
2900 pub struct ForEachFuture<S, F> {
2901 #[pin]
2902 stream: S,
2903 f: F,
2904 }
2905 }
2906
2907 impl<S, F> Future for ForEachFuture<S, F>
2908 where
2909 S: Stream,
2910 F: FnMut(S::Item),
2911 {
2912 type Output = ();
2913
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2914 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2915 let mut this = self.project();
2916 loop {
2917 match ready!(this.stream.as_mut().poll_next(cx)) {
2918 Some(v) => (this.f)(v),
2919 None => return Poll::Ready(()),
2920 }
2921 }
2922 }
2923 }
2924
2925 /// Future for the [`StreamExt::try_for_each()`] method.
2926 #[derive(Debug)]
2927 #[must_use = "futures do nothing unless you `.await` or poll them"]
2928 pub struct TryForEachFuture<'a, S: ?Sized, F> {
2929 stream: &'a mut S,
2930 f: F,
2931 }
2932
2933 impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
2934
2935 impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
2936 where
2937 S: Stream + Unpin + ?Sized,
2938 F: FnMut(S::Item) -> Result<(), E>,
2939 {
2940 type Output = Result<(), E>;
2941
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>2942 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2943 loop {
2944 match ready!(self.stream.poll_next(cx)) {
2945 None => return Poll::Ready(Ok(())),
2946 Some(v) => (&mut self.f)(v)?,
2947 }
2948 }
2949 }
2950 }
2951
2952 pin_project! {
2953 /// Stream for the [`StreamExt::zip()`] method.
2954 #[derive(Clone, Debug)]
2955 #[must_use = "streams do nothing unless polled"]
2956 pub struct Zip<A: Stream, B> {
2957 item_slot: Option<A::Item>,
2958 #[pin]
2959 first: A,
2960 #[pin]
2961 second: B,
2962 }
2963 }
2964
2965 impl<A: Stream, B: Stream> Stream for Zip<A, B> {
2966 type Item = (A::Item, B::Item);
2967
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>2968 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2969 let this = self.project();
2970
2971 if this.item_slot.is_none() {
2972 match this.first.poll_next(cx) {
2973 Poll::Pending => return Poll::Pending,
2974 Poll::Ready(None) => return Poll::Ready(None),
2975 Poll::Ready(Some(item)) => *this.item_slot = Some(item),
2976 }
2977 }
2978
2979 let second_item = ready!(this.second.poll_next(cx));
2980 let first_item = this.item_slot.take().unwrap();
2981 Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
2982 }
2983 }
2984
2985 pin_project! {
2986 /// Future for the [`StreamExt::unzip()`] method.
2987 #[derive(Debug)]
2988 #[must_use = "futures do nothing unless you `.await` or poll them"]
2989 pub struct UnzipFuture<S, FromA, FromB> {
2990 #[pin]
2991 stream: S,
2992 res: Option<(FromA, FromB)>,
2993 }
2994 }
2995
2996 impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
2997 where
2998 S: Stream<Item = (A, B)>,
2999 FromA: Default + Extend<A>,
3000 FromB: Default + Extend<B>,
3001 {
3002 type Output = (FromA, FromB);
3003
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>3004 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3005 let mut this = self.project();
3006
3007 loop {
3008 match ready!(this.stream.as_mut().poll_next(cx)) {
3009 Some((a, b)) => {
3010 let res = this.res.as_mut().unwrap();
3011 res.0.extend(Some(a));
3012 res.1.extend(Some(b));
3013 }
3014 None => return Poll::Ready(this.res.take().unwrap()),
3015 }
3016 }
3017 }
3018 }
3019