1 //! Streams 2 //! 3 //! This module contains a number of functions for working with `Stream`s, 4 //! including the `StreamExt` trait which adds methods to `Stream` types. 5 6 use crate::future::{assert_future, Either}; 7 use crate::stream::assert_stream; 8 #[cfg(feature = "alloc")] 9 use alloc::boxed::Box; 10 #[cfg(feature = "alloc")] 11 use alloc::vec::Vec; 12 use core::pin::Pin; 13 #[cfg(feature = "sink")] 14 use futures_core::stream::TryStream; 15 #[cfg(feature = "alloc")] 16 use futures_core::stream::{BoxStream, LocalBoxStream}; 17 use futures_core::{ 18 future::Future, 19 stream::{FusedStream, Stream}, 20 task::{Context, Poll}, 21 }; 22 #[cfg(feature = "sink")] 23 use futures_sink::Sink; 24 25 use crate::fns::{inspect_fn, InspectFn}; 26 27 mod chain; 28 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 29 pub use self::chain::Chain; 30 31 mod collect; 32 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 33 pub use self::collect::Collect; 34 35 mod unzip; 36 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 37 pub use self::unzip::Unzip; 38 39 mod concat; 40 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 41 pub use self::concat::Concat; 42 43 mod count; 44 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 45 pub use self::count::Count; 46 47 mod cycle; 48 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 49 pub use self::cycle::Cycle; 50 51 mod enumerate; 52 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 53 pub use self::enumerate::Enumerate; 54 55 mod filter; 56 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 57 pub use self::filter::Filter; 58 59 mod filter_map; 60 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 61 pub use self::filter_map::FilterMap; 62 63 mod flatten; 64 65 delegate_all!( 66 /// Stream for the [`flatten`](StreamExt::flatten) method. 67 Flatten<St>( 68 flatten::Flatten<St, St::Item> 69 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)] 70 where St: Stream 71 ); 72 73 mod fold; 74 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 75 pub use self::fold::Fold; 76 77 mod any; 78 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 79 pub use self::any::Any; 80 81 mod all; 82 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 83 pub use self::all::All; 84 85 #[cfg(feature = "sink")] 86 mod forward; 87 88 #[cfg(feature = "sink")] 89 delegate_all!( 90 /// Future for the [`forward`](super::StreamExt::forward) method. 91 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 92 Forward<St, Si>( 93 forward::Forward<St, Si, St::Ok> 94 ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)] 95 where St: TryStream 96 ); 97 98 mod for_each; 99 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 100 pub use self::for_each::ForEach; 101 102 mod fuse; 103 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 104 pub use self::fuse::Fuse; 105 106 mod into_future; 107 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 108 pub use self::into_future::StreamFuture; 109 110 delegate_all!( 111 /// Stream for the [`inspect`](StreamExt::inspect) method. 112 Inspect<St, F>( 113 map::Map<St, InspectFn<F>> 114 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))] 115 ); 116 117 mod map; 118 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 119 pub use self::map::Map; 120 121 delegate_all!( 122 /// Stream for the [`flat_map`](StreamExt::flat_map) method. 123 FlatMap<St, U, F>( 124 flatten::Flatten<Map<St, F>, U> 125 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))] 126 ); 127 128 mod next; 129 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 130 pub use self::next::Next; 131 132 mod select_next_some; 133 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 134 pub use self::select_next_some::SelectNextSome; 135 136 mod peek; 137 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 138 pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable}; 139 140 mod skip; 141 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 142 pub use self::skip::Skip; 143 144 mod skip_while; 145 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 146 pub use self::skip_while::SkipWhile; 147 148 mod take; 149 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 150 pub use self::take::Take; 151 152 mod take_while; 153 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 154 pub use self::take_while::TakeWhile; 155 156 mod take_until; 157 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 158 pub use self::take_until::TakeUntil; 159 160 mod then; 161 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 162 pub use self::then::Then; 163 164 mod zip; 165 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 166 pub use self::zip::Zip; 167 168 #[cfg(feature = "alloc")] 169 mod chunks; 170 #[cfg(feature = "alloc")] 171 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 172 pub use self::chunks::Chunks; 173 174 #[cfg(feature = "alloc")] 175 mod ready_chunks; 176 #[cfg(feature = "alloc")] 177 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 178 pub use self::ready_chunks::ReadyChunks; 179 180 mod scan; 181 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 182 pub use self::scan::Scan; 183 184 #[cfg(not(futures_no_atomic_cas))] 185 #[cfg(feature = "alloc")] 186 mod buffer_unordered; 187 #[cfg(not(futures_no_atomic_cas))] 188 #[cfg(feature = "alloc")] 189 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 190 pub use self::buffer_unordered::BufferUnordered; 191 192 #[cfg(not(futures_no_atomic_cas))] 193 #[cfg(feature = "alloc")] 194 mod buffered; 195 #[cfg(not(futures_no_atomic_cas))] 196 #[cfg(feature = "alloc")] 197 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 198 pub use self::buffered::Buffered; 199 200 #[cfg(not(futures_no_atomic_cas))] 201 #[cfg(feature = "alloc")] 202 mod flatten_unordered; 203 204 #[cfg(not(futures_no_atomic_cas))] 205 #[cfg(feature = "alloc")] 206 #[allow(unreachable_pub)] 207 pub use self::flatten_unordered::FlattenUnordered; 208 209 #[cfg(not(futures_no_atomic_cas))] 210 #[cfg(feature = "alloc")] 211 delegate_all!( 212 /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method. 213 FlatMapUnordered<St, U, F>( 214 FlattenUnordered<Map<St, F>> 215 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)] 216 where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U 217 ); 218 219 #[cfg(not(futures_no_atomic_cas))] 220 #[cfg(feature = "alloc")] 221 mod for_each_concurrent; 222 #[cfg(not(futures_no_atomic_cas))] 223 #[cfg(feature = "alloc")] 224 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 225 pub use self::for_each_concurrent::ForEachConcurrent; 226 227 #[cfg(not(futures_no_atomic_cas))] 228 #[cfg(feature = "sink")] 229 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 230 #[cfg(feature = "alloc")] 231 mod split; 232 #[cfg(not(futures_no_atomic_cas))] 233 #[cfg(feature = "sink")] 234 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 235 #[cfg(feature = "alloc")] 236 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 237 pub use self::split::{ReuniteError, SplitSink, SplitStream}; 238 239 #[cfg(feature = "std")] 240 mod catch_unwind; 241 #[cfg(feature = "std")] 242 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 243 pub use self::catch_unwind::CatchUnwind; 244 245 impl<T: ?Sized> StreamExt for T where T: Stream {} 246 247 /// An extension trait for `Stream`s that provides a variety of convenient 248 /// combinator functions. 249 pub trait StreamExt: Stream { 250 /// Creates a future that resolves to the next item in the stream. 251 /// 252 /// Note that because `next` doesn't take ownership over the stream, 253 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a 254 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can 255 /// be done by boxing the stream using [`Box::pin`] or 256 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` 257 /// crate. 258 /// 259 /// # Examples 260 /// 261 /// ``` 262 /// # futures::executor::block_on(async { 263 /// use futures::stream::{self, StreamExt}; 264 /// 265 /// let mut stream = stream::iter(1..=3); 266 /// 267 /// assert_eq!(stream.next().await, Some(1)); 268 /// assert_eq!(stream.next().await, Some(2)); 269 /// assert_eq!(stream.next().await, Some(3)); 270 /// assert_eq!(stream.next().await, None); 271 /// # }); 272 /// ``` next(&mut self) -> Next<'_, Self> where Self: Unpin,273 fn next(&mut self) -> Next<'_, Self> 274 where 275 Self: Unpin, 276 { 277 assert_future::<Option<Self::Item>, _>(Next::new(self)) 278 } 279 280 /// Converts this stream into a future of `(next_item, tail_of_stream)`. 281 /// If the stream terminates, then the next item is [`None`]. 282 /// 283 /// The returned future can be used to compose streams and futures together 284 /// by placing everything into the "world of futures". 285 /// 286 /// Note that because `into_future` moves the stream, the [`Stream`] type 287 /// must be [`Unpin`]. If you want to use `into_future` with a 288 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can 289 /// be done by boxing the stream using [`Box::pin`] or 290 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` 291 /// crate. 292 /// 293 /// # Examples 294 /// 295 /// ``` 296 /// # futures::executor::block_on(async { 297 /// use futures::stream::{self, StreamExt}; 298 /// 299 /// let stream = stream::iter(1..=3); 300 /// 301 /// let (item, stream) = stream.into_future().await; 302 /// assert_eq!(Some(1), item); 303 /// 304 /// let (item, stream) = stream.into_future().await; 305 /// assert_eq!(Some(2), item); 306 /// # }); 307 /// ``` into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,308 fn into_future(self) -> StreamFuture<Self> 309 where 310 Self: Sized + Unpin, 311 { 312 assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self)) 313 } 314 315 /// Maps this stream's items to a different type, returning a new stream of 316 /// the resulting type. 317 /// 318 /// The provided closure is executed over all elements of this stream as 319 /// they are made available. It is executed inline with calls to 320 /// [`poll_next`](Stream::poll_next). 321 /// 322 /// Note that this function consumes the stream passed into it and returns a 323 /// wrapped version of it, similar to the existing `map` methods in the 324 /// standard library. 325 /// 326 /// # Examples 327 /// 328 /// ``` 329 /// # futures::executor::block_on(async { 330 /// use futures::stream::{self, StreamExt}; 331 /// 332 /// let stream = stream::iter(1..=3); 333 /// let stream = stream.map(|x| x + 3); 334 /// 335 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 336 /// # }); 337 /// ``` map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,338 fn map<T, F>(self, f: F) -> Map<Self, F> 339 where 340 F: FnMut(Self::Item) -> T, 341 Self: Sized, 342 { 343 assert_stream::<T, _>(Map::new(self, f)) 344 } 345 346 /// Creates a stream which gives the current iteration count as well as 347 /// the next value. 348 /// 349 /// The stream returned yields pairs `(i, val)`, where `i` is the 350 /// current index of iteration and `val` is the value returned by the 351 /// stream. 352 /// 353 /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a 354 /// different sized integer, the [`zip`](StreamExt::zip) function provides similar 355 /// functionality. 356 /// 357 /// # Overflow Behavior 358 /// 359 /// The method does no guarding against overflows, so enumerating more than 360 /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If 361 /// debug assertions are enabled, a panic is guaranteed. 362 /// 363 /// # Panics 364 /// 365 /// The returned stream might panic if the to-be-returned index would 366 /// overflow a [`usize`]. 367 /// 368 /// # Examples 369 /// 370 /// ``` 371 /// # futures::executor::block_on(async { 372 /// use futures::stream::{self, StreamExt}; 373 /// 374 /// let stream = stream::iter(vec!['a', 'b', 'c']); 375 /// 376 /// let mut stream = stream.enumerate(); 377 /// 378 /// assert_eq!(stream.next().await, Some((0, 'a'))); 379 /// assert_eq!(stream.next().await, Some((1, 'b'))); 380 /// assert_eq!(stream.next().await, Some((2, 'c'))); 381 /// assert_eq!(stream.next().await, None); 382 /// # }); 383 /// ``` enumerate(self) -> Enumerate<Self> where Self: Sized,384 fn enumerate(self) -> Enumerate<Self> 385 where 386 Self: Sized, 387 { 388 assert_stream::<(usize, Self::Item), _>(Enumerate::new(self)) 389 } 390 391 /// Filters the values produced by this stream according to the provided 392 /// asynchronous predicate. 393 /// 394 /// As values of this stream are made available, the provided predicate `f` 395 /// will be run against them. If the predicate returns a `Future` which 396 /// resolves to `true`, then the stream will yield the value, but if the 397 /// predicate returns a `Future` which resolves to `false`, then the value 398 /// will be discarded and the next value will be produced. 399 /// 400 /// Note that this function consumes the stream passed into it and returns a 401 /// wrapped version of it, similar to the existing `filter` methods in the 402 /// standard library. 403 /// 404 /// # Examples 405 /// 406 /// ``` 407 /// # futures::executor::block_on(async { 408 /// use futures::future; 409 /// use futures::stream::{self, StreamExt}; 410 /// 411 /// let stream = stream::iter(1..=10); 412 /// let events = stream.filter(|x| future::ready(x % 2 == 0)); 413 /// 414 /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await); 415 /// # }); 416 /// ``` filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,417 fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> 418 where 419 F: FnMut(&Self::Item) -> Fut, 420 Fut: Future<Output = bool>, 421 Self: Sized, 422 { 423 assert_stream::<Self::Item, _>(Filter::new(self, f)) 424 } 425 426 /// Filters the values produced by this stream while simultaneously mapping 427 /// them to a different type according to the provided asynchronous closure. 428 /// 429 /// As values of this stream are made available, the provided function will 430 /// be run on them. If the future returned by the predicate `f` resolves to 431 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if 432 /// it resolves to [`None`] then the next value will be produced. 433 /// 434 /// Note that this function consumes the stream passed into it and returns a 435 /// wrapped version of it, similar to the existing `filter_map` methods in 436 /// the standard library. 437 /// 438 /// # Examples 439 /// ``` 440 /// # futures::executor::block_on(async { 441 /// use futures::stream::{self, StreamExt}; 442 /// 443 /// let stream = stream::iter(1..=10); 444 /// let events = stream.filter_map(|x| async move { 445 /// if x % 2 == 0 { Some(x + 1) } else { None } 446 /// }); 447 /// 448 /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await); 449 /// # }); 450 /// ``` filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,451 fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> 452 where 453 F: FnMut(Self::Item) -> Fut, 454 Fut: Future<Output = Option<T>>, 455 Self: Sized, 456 { 457 assert_stream::<T, _>(FilterMap::new(self, f)) 458 } 459 460 /// Computes from this stream's items new items of a different type using 461 /// an asynchronous closure. 462 /// 463 /// The provided closure `f` will be called with an `Item` once a value is 464 /// ready, it returns a future which will then be run to completion 465 /// to produce the next value on this stream. 466 /// 467 /// Note that this function consumes the stream passed into it and returns a 468 /// wrapped version of it. 469 /// 470 /// # Examples 471 /// 472 /// ``` 473 /// # futures::executor::block_on(async { 474 /// use futures::stream::{self, StreamExt}; 475 /// 476 /// let stream = stream::iter(1..=3); 477 /// let stream = stream.then(|x| async move { x + 3 }); 478 /// 479 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 480 /// # }); 481 /// ``` then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,482 fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> 483 where 484 F: FnMut(Self::Item) -> Fut, 485 Fut: Future, 486 Self: Sized, 487 { 488 assert_stream::<Fut::Output, _>(Then::new(self, f)) 489 } 490 491 /// Transforms a stream into a collection, returning a 492 /// future representing the result of that computation. 493 /// 494 /// The returned future will be resolved when the stream terminates. 495 /// 496 /// # Examples 497 /// 498 /// ``` 499 /// # futures::executor::block_on(async { 500 /// use futures::channel::mpsc; 501 /// use futures::stream::StreamExt; 502 /// use std::thread; 503 /// 504 /// let (tx, rx) = mpsc::unbounded(); 505 /// 506 /// thread::spawn(move || { 507 /// for i in 1..=5 { 508 /// tx.unbounded_send(i).unwrap(); 509 /// } 510 /// }); 511 /// 512 /// let output = rx.collect::<Vec<i32>>().await; 513 /// assert_eq!(output, vec![1, 2, 3, 4, 5]); 514 /// # }); 515 /// ``` collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,516 fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> 517 where 518 Self: Sized, 519 { 520 assert_future::<C, _>(Collect::new(self)) 521 } 522 523 /// Converts a stream of pairs into a future, which 524 /// resolves to pair of containers. 525 /// 526 /// `unzip()` produces a future, which resolves to two 527 /// collections: one from the left elements of the pairs, 528 /// and one from the right elements. 529 /// 530 /// The returned future will be resolved when the stream terminates. 531 /// 532 /// # Examples 533 /// 534 /// ``` 535 /// # futures::executor::block_on(async { 536 /// use futures::channel::mpsc; 537 /// use futures::stream::StreamExt; 538 /// use std::thread; 539 /// 540 /// let (tx, rx) = mpsc::unbounded(); 541 /// 542 /// thread::spawn(move || { 543 /// tx.unbounded_send((1, 2)).unwrap(); 544 /// tx.unbounded_send((3, 4)).unwrap(); 545 /// tx.unbounded_send((5, 6)).unwrap(); 546 /// }); 547 /// 548 /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await; 549 /// assert_eq!(o1, vec![1, 3, 5]); 550 /// assert_eq!(o2, vec![2, 4, 6]); 551 /// # }); 552 /// ``` unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>,553 fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> 554 where 555 FromA: Default + Extend<A>, 556 FromB: Default + Extend<B>, 557 Self: Sized + Stream<Item = (A, B)>, 558 { 559 assert_future::<(FromA, FromB), _>(Unzip::new(self)) 560 } 561 562 /// Concatenate all items of a stream into a single extendable 563 /// destination, returning a future representing the end result. 564 /// 565 /// This combinator will extend the first item with the contents 566 /// of all the subsequent results of the stream. If the stream is 567 /// empty, the default value will be returned. 568 /// 569 /// Works with all collections that implement the 570 /// [`Extend`](std::iter::Extend) trait. 571 /// 572 /// # Examples 573 /// 574 /// ``` 575 /// # futures::executor::block_on(async { 576 /// use futures::channel::mpsc; 577 /// use futures::stream::StreamExt; 578 /// use std::thread; 579 /// 580 /// let (tx, rx) = mpsc::unbounded(); 581 /// 582 /// thread::spawn(move || { 583 /// for i in (0..3).rev() { 584 /// let n = i * 3; 585 /// tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap(); 586 /// } 587 /// }); 588 /// 589 /// let result = rx.concat().await; 590 /// 591 /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]); 592 /// # }); 593 /// ``` concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,594 fn concat(self) -> Concat<Self> 595 where 596 Self: Sized, 597 Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, 598 { 599 assert_future::<Self::Item, _>(Concat::new(self)) 600 } 601 602 /// Drives the stream to completion, counting the number of items. 603 /// 604 /// # Overflow Behavior 605 /// 606 /// The method does no guarding against overflows, so counting elements of a 607 /// stream with more than [`usize::MAX`] elements either produces the wrong 608 /// result or panics. If debug assertions are enabled, a panic is guaranteed. 609 /// 610 /// # Panics 611 /// 612 /// This function might panic if the iterator has more than [`usize::MAX`] 613 /// elements. 614 /// 615 /// # Examples 616 /// 617 /// ``` 618 /// # futures::executor::block_on(async { 619 /// use futures::stream::{self, StreamExt}; 620 /// 621 /// let stream = stream::iter(1..=10); 622 /// let count = stream.count().await; 623 /// 624 /// assert_eq!(count, 10); 625 /// # }); 626 /// ``` count(self) -> Count<Self> where Self: Sized,627 fn count(self) -> Count<Self> 628 where 629 Self: Sized, 630 { 631 assert_future::<usize, _>(Count::new(self)) 632 } 633 634 /// Repeats a stream endlessly. 635 /// 636 /// The stream never terminates. Note that you likely want to avoid 637 /// usage of `collect` or such on the returned stream as it will exhaust 638 /// available memory as it tries to just fill up all RAM. 639 /// 640 /// # Examples 641 /// 642 /// ``` 643 /// # futures::executor::block_on(async { 644 /// use futures::stream::{self, StreamExt}; 645 /// let a = [1, 2, 3]; 646 /// let mut s = stream::iter(a.iter()).cycle(); 647 /// 648 /// assert_eq!(s.next().await, Some(&1)); 649 /// assert_eq!(s.next().await, Some(&2)); 650 /// assert_eq!(s.next().await, Some(&3)); 651 /// assert_eq!(s.next().await, Some(&1)); 652 /// assert_eq!(s.next().await, Some(&2)); 653 /// assert_eq!(s.next().await, Some(&3)); 654 /// assert_eq!(s.next().await, Some(&1)); 655 /// # }); 656 /// ``` cycle(self) -> Cycle<Self> where Self: Sized + Clone,657 fn cycle(self) -> Cycle<Self> 658 where 659 Self: Sized + Clone, 660 { 661 assert_stream::<Self::Item, _>(Cycle::new(self)) 662 } 663 664 /// Execute an accumulating asynchronous computation over a stream, 665 /// collecting all the values into one final result. 666 /// 667 /// This combinator will accumulate all values returned by this stream 668 /// according to the closure provided. The initial state is also provided to 669 /// this method and then is returned again by each execution of the closure. 670 /// Once the entire stream has been exhausted the returned future will 671 /// resolve to this value. 672 /// 673 /// # Examples 674 /// 675 /// ``` 676 /// # futures::executor::block_on(async { 677 /// use futures::stream::{self, StreamExt}; 678 /// 679 /// let number_stream = stream::iter(0..6); 680 /// let sum = number_stream.fold(0, |acc, x| async move { acc + x }); 681 /// assert_eq!(sum.await, 15); 682 /// # }); 683 /// ``` fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,684 fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> 685 where 686 F: FnMut(T, Self::Item) -> Fut, 687 Fut: Future<Output = T>, 688 Self: Sized, 689 { 690 assert_future::<T, _>(Fold::new(self, f, init)) 691 } 692 693 /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate. 694 /// 695 /// # Examples 696 /// 697 /// ``` 698 /// # futures::executor::block_on(async { 699 /// use futures::stream::{self, StreamExt}; 700 /// 701 /// let number_stream = stream::iter(0..10); 702 /// let contain_three = number_stream.any(|i| async move { i == 3 }); 703 /// assert_eq!(contain_three.await, true); 704 /// # }); 705 /// ``` any<Fut, F>(self, f: F) -> Any<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,706 fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F> 707 where 708 F: FnMut(Self::Item) -> Fut, 709 Fut: Future<Output = bool>, 710 Self: Sized, 711 { 712 assert_future::<bool, _>(Any::new(self, f)) 713 } 714 715 /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate. 716 /// 717 /// # Examples 718 /// 719 /// ``` 720 /// # futures::executor::block_on(async { 721 /// use futures::stream::{self, StreamExt}; 722 /// 723 /// let number_stream = stream::iter(0..10); 724 /// let less_then_twenty = number_stream.all(|i| async move { i < 20 }); 725 /// assert_eq!(less_then_twenty.await, true); 726 /// # }); 727 /// ``` all<Fut, F>(self, f: F) -> All<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,728 fn all<Fut, F>(self, f: F) -> All<Self, Fut, F> 729 where 730 F: FnMut(Self::Item) -> Fut, 731 Fut: Future<Output = bool>, 732 Self: Sized, 733 { 734 assert_future::<bool, _>(All::new(self, f)) 735 } 736 737 /// Flattens a stream of streams into just one continuous stream. 738 /// 739 /// # Examples 740 /// 741 /// ``` 742 /// # futures::executor::block_on(async { 743 /// use futures::channel::mpsc; 744 /// use futures::stream::StreamExt; 745 /// use std::thread; 746 /// 747 /// let (tx1, rx1) = mpsc::unbounded(); 748 /// let (tx2, rx2) = mpsc::unbounded(); 749 /// let (tx3, rx3) = mpsc::unbounded(); 750 /// 751 /// thread::spawn(move || { 752 /// tx1.unbounded_send(1).unwrap(); 753 /// tx1.unbounded_send(2).unwrap(); 754 /// }); 755 /// thread::spawn(move || { 756 /// tx2.unbounded_send(3).unwrap(); 757 /// tx2.unbounded_send(4).unwrap(); 758 /// }); 759 /// thread::spawn(move || { 760 /// tx3.unbounded_send(rx1).unwrap(); 761 /// tx3.unbounded_send(rx2).unwrap(); 762 /// }); 763 /// 764 /// let output = rx3.flatten().collect::<Vec<i32>>().await; 765 /// assert_eq!(output, vec![1, 2, 3, 4]); 766 /// # }); 767 /// ``` flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,768 fn flatten(self) -> Flatten<Self> 769 where 770 Self::Item: Stream, 771 Self: Sized, 772 { 773 assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) 774 } 775 776 /// Flattens a stream of streams into just one continuous stream. Polls 777 /// inner streams concurrently. 778 /// 779 /// # Examples 780 /// 781 /// ``` 782 /// # futures::executor::block_on(async { 783 /// use futures::channel::mpsc; 784 /// use futures::stream::StreamExt; 785 /// use std::thread; 786 /// 787 /// let (tx1, rx1) = mpsc::unbounded(); 788 /// let (tx2, rx2) = mpsc::unbounded(); 789 /// let (tx3, rx3) = mpsc::unbounded(); 790 /// 791 /// thread::spawn(move || { 792 /// tx1.unbounded_send(1).unwrap(); 793 /// tx1.unbounded_send(2).unwrap(); 794 /// }); 795 /// thread::spawn(move || { 796 /// tx2.unbounded_send(3).unwrap(); 797 /// tx2.unbounded_send(4).unwrap(); 798 /// }); 799 /// thread::spawn(move || { 800 /// tx3.unbounded_send(rx1).unwrap(); 801 /// tx3.unbounded_send(rx2).unwrap(); 802 /// }); 803 /// 804 /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await; 805 /// output.sort(); 806 /// 807 /// assert_eq!(output, vec![1, 2, 3, 4]); 808 /// # }); 809 /// ``` 810 #[cfg(not(futures_no_atomic_cas))] 811 #[cfg(feature = "alloc")] flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> where Self::Item: Stream + Unpin, Self: Sized,812 fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> 813 where 814 Self::Item: Stream + Unpin, 815 Self: Sized, 816 { 817 FlattenUnordered::new(self, limit.into()) 818 } 819 820 /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. 821 /// 822 /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, 823 /// you would have to chain combinators like `.map(f).flatten()` while this 824 /// combinator provides ability to write `.flat_map(f)` instead of chaining. 825 /// 826 /// The provided closure which produces inner streams is executed over all elements 827 /// of stream as last inner stream is terminated and next stream item is available. 828 /// 829 /// Note that this function consumes the stream passed into it and returns a 830 /// wrapped version of it, similar to the existing `flat_map` methods in the 831 /// standard library. 832 /// 833 /// # Examples 834 /// 835 /// ``` 836 /// # futures::executor::block_on(async { 837 /// use futures::stream::{self, StreamExt}; 838 /// 839 /// let stream = stream::iter(1..=3); 840 /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x])); 841 /// 842 /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await); 843 /// # }); 844 /// ``` flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,845 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> 846 where 847 F: FnMut(Self::Item) -> U, 848 U: Stream, 849 Self: Sized, 850 { 851 assert_stream::<U::Item, _>(FlatMap::new(self, f)) 852 } 853 854 /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s 855 /// and polls them concurrently, yielding items in any order, as they made 856 /// available. 857 /// 858 /// [`StreamExt::map`] is very useful, but if it produces `Stream`s 859 /// instead, and you need to poll all of them concurrently, you would 860 /// have to use something like `for_each_concurrent` and merge values 861 /// by hand. This combinator provides ability to collect all values 862 /// from concurrently polled streams into one stream. 863 /// 864 /// The first argument is an optional limit on the number of concurrently 865 /// polled streams. If this limit is not `None`, no more than `limit` streams 866 /// will be polled concurrently. The `limit` argument is of type 867 /// `Into<Option<usize>>`, and so can be provided as either `None`, 868 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as 869 /// no limit at all, and will have the same result as passing in `None`. 870 /// 871 /// The provided closure which produces inner streams is executed over 872 /// all elements of stream as next stream item is available and limit 873 /// of concurrently processed streams isn't exceeded. 874 /// 875 /// Note that this function consumes the stream passed into it and 876 /// returns a wrapped version of it. 877 /// 878 /// # Examples 879 /// 880 /// ``` 881 /// # futures::executor::block_on(async { 882 /// use futures::stream::{self, StreamExt}; 883 /// 884 /// let stream = stream::iter(1..5); 885 /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x])); 886 /// let mut values = stream.collect::<Vec<_>>().await; 887 /// values.sort(); 888 /// 889 /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values); 890 /// # }); 891 /// ``` 892 #[cfg(not(futures_no_atomic_cas))] 893 #[cfg(feature = "alloc")] flat_map_unordered<U, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> FlatMapUnordered<Self, U, F> where U: Stream + Unpin, F: FnMut(Self::Item) -> U, Self: Sized,894 fn flat_map_unordered<U, F>( 895 self, 896 limit: impl Into<Option<usize>>, 897 f: F, 898 ) -> FlatMapUnordered<Self, U, F> 899 where 900 U: Stream + Unpin, 901 F: FnMut(Self::Item) -> U, 902 Self: Sized, 903 { 904 FlatMapUnordered::new(self, limit.into(), f) 905 } 906 907 /// Combinator similar to [`StreamExt::fold`] that holds internal state 908 /// and produces a new stream. 909 /// 910 /// Accepts initial state and closure which will be applied to each element 911 /// of the stream until provided closure returns `None`. Once `None` is 912 /// returned, stream will be terminated. 913 /// 914 /// # Examples 915 /// 916 /// ``` 917 /// # futures::executor::block_on(async { 918 /// use futures::future; 919 /// use futures::stream::{self, StreamExt}; 920 /// 921 /// let stream = stream::iter(1..=10); 922 /// 923 /// let stream = stream.scan(0, |state, x| { 924 /// *state += x; 925 /// future::ready(if *state < 10 { Some(x) } else { None }) 926 /// }); 927 /// 928 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 929 /// # }); 930 /// ``` scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,931 fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> 932 where 933 F: FnMut(&mut S, Self::Item) -> Fut, 934 Fut: Future<Output = Option<B>>, 935 Self: Sized, 936 { 937 assert_stream::<B, _>(Scan::new(self, initial_state, f)) 938 } 939 940 /// Skip elements on this stream while the provided asynchronous predicate 941 /// resolves to `true`. 942 /// 943 /// This function, like `Iterator::skip_while`, will skip elements on the 944 /// stream until the predicate `f` resolves to `false`. Once one element 945 /// returns `false`, all future elements will be returned from the underlying 946 /// stream. 947 /// 948 /// # Examples 949 /// 950 /// ``` 951 /// # futures::executor::block_on(async { 952 /// use futures::future; 953 /// use futures::stream::{self, StreamExt}; 954 /// 955 /// let stream = stream::iter(1..=10); 956 /// 957 /// let stream = stream.skip_while(|x| future::ready(*x <= 5)); 958 /// 959 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 960 /// # }); 961 /// ``` skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,962 fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> 963 where 964 F: FnMut(&Self::Item) -> Fut, 965 Fut: Future<Output = bool>, 966 Self: Sized, 967 { 968 assert_stream::<Self::Item, _>(SkipWhile::new(self, f)) 969 } 970 971 /// Take elements from this stream while the provided asynchronous predicate 972 /// resolves to `true`. 973 /// 974 /// This function, like `Iterator::take_while`, will take elements from the 975 /// stream until the predicate `f` resolves to `false`. Once one element 976 /// returns `false`, it will always return that the stream is done. 977 /// 978 /// # Examples 979 /// 980 /// ``` 981 /// # futures::executor::block_on(async { 982 /// use futures::future; 983 /// use futures::stream::{self, StreamExt}; 984 /// 985 /// let stream = stream::iter(1..=10); 986 /// 987 /// let stream = stream.take_while(|x| future::ready(*x <= 5)); 988 /// 989 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 990 /// # }); 991 /// ``` take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,992 fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> 993 where 994 F: FnMut(&Self::Item) -> Fut, 995 Fut: Future<Output = bool>, 996 Self: Sized, 997 { 998 assert_stream::<Self::Item, _>(TakeWhile::new(self, f)) 999 } 1000 1001 /// Take elements from this stream until the provided future resolves. 1002 /// 1003 /// This function will take elements from the stream until the provided 1004 /// stopping future `fut` resolves. Once the `fut` future becomes ready, 1005 /// this stream combinator will always return that the stream is done. 1006 /// 1007 /// The stopping future may return any type. Once the stream is stopped 1008 /// the result of the stopping future may be accessed with `TakeUntil::take_result()`. 1009 /// The stream may also be resumed with `TakeUntil::take_future()`. 1010 /// See the documentation of [`TakeUntil`] for more information. 1011 /// 1012 /// # Examples 1013 /// 1014 /// ``` 1015 /// # futures::executor::block_on(async { 1016 /// use futures::future; 1017 /// use futures::stream::{self, StreamExt}; 1018 /// use futures::task::Poll; 1019 /// 1020 /// let stream = stream::iter(1..=10); 1021 /// 1022 /// let mut i = 0; 1023 /// let stop_fut = future::poll_fn(|_cx| { 1024 /// i += 1; 1025 /// if i <= 5 { 1026 /// Poll::Pending 1027 /// } else { 1028 /// Poll::Ready(()) 1029 /// } 1030 /// }); 1031 /// 1032 /// let stream = stream.take_until(stop_fut); 1033 /// 1034 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 1035 /// # }); 1036 /// ``` take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,1037 fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> 1038 where 1039 Fut: Future, 1040 Self: Sized, 1041 { 1042 assert_stream::<Self::Item, _>(TakeUntil::new(self, fut)) 1043 } 1044 1045 /// Runs this stream to completion, executing the provided asynchronous 1046 /// closure for each element on the stream. 1047 /// 1048 /// The closure provided will be called for each item this stream produces, 1049 /// yielding a future. That future will then be executed to completion 1050 /// before moving on to the next item. 1051 /// 1052 /// The returned value is a `Future` where the `Output` type is `()`; it is 1053 /// executed entirely for its side effects. 1054 /// 1055 /// To process each item in the stream and produce another stream instead 1056 /// of a single future, use `then` instead. 1057 /// 1058 /// # Examples 1059 /// 1060 /// ``` 1061 /// # futures::executor::block_on(async { 1062 /// use futures::future; 1063 /// use futures::stream::{self, StreamExt}; 1064 /// 1065 /// let mut x = 0; 1066 /// 1067 /// { 1068 /// let fut = stream::repeat(1).take(3).for_each(|item| { 1069 /// x += item; 1070 /// future::ready(()) 1071 /// }); 1072 /// fut.await; 1073 /// } 1074 /// 1075 /// assert_eq!(x, 3); 1076 /// # }); 1077 /// ``` for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,1078 fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> 1079 where 1080 F: FnMut(Self::Item) -> Fut, 1081 Fut: Future<Output = ()>, 1082 Self: Sized, 1083 { 1084 assert_future::<(), _>(ForEach::new(self, f)) 1085 } 1086 1087 /// Runs this stream to completion, executing the provided asynchronous 1088 /// closure for each element on the stream concurrently as elements become 1089 /// available. 1090 /// 1091 /// This is similar to [`StreamExt::for_each`], but the futures 1092 /// produced by the closure are run concurrently (but not in parallel-- 1093 /// this combinator does not introduce any threads). 1094 /// 1095 /// The closure provided will be called for each item this stream produces, 1096 /// yielding a future. That future will then be executed to completion 1097 /// concurrently with the other futures produced by the closure. 1098 /// 1099 /// The first argument is an optional limit on the number of concurrent 1100 /// futures. If this limit is not `None`, no more than `limit` futures 1101 /// will be run concurrently. The `limit` argument is of type 1102 /// `Into<Option<usize>>`, and so can be provided as either `None`, 1103 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as 1104 /// no limit at all, and will have the same result as passing in `None`. 1105 /// 1106 /// This method is only available when the `std` or `alloc` feature of this 1107 /// library is activated, and it is activated by default. 1108 /// 1109 /// # Examples 1110 /// 1111 /// ``` 1112 /// # futures::executor::block_on(async { 1113 /// use futures::channel::oneshot; 1114 /// use futures::stream::{self, StreamExt}; 1115 /// 1116 /// let (tx1, rx1) = oneshot::channel(); 1117 /// let (tx2, rx2) = oneshot::channel(); 1118 /// let (tx3, rx3) = oneshot::channel(); 1119 /// 1120 /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent( 1121 /// /* limit */ 2, 1122 /// |rx| async move { 1123 /// rx.await.unwrap(); 1124 /// } 1125 /// ); 1126 /// tx1.send(()).unwrap(); 1127 /// tx2.send(()).unwrap(); 1128 /// tx3.send(()).unwrap(); 1129 /// fut.await; 1130 /// # }) 1131 /// ``` 1132 #[cfg(not(futures_no_atomic_cas))] 1133 #[cfg(feature = "alloc")] for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,1134 fn for_each_concurrent<Fut, F>( 1135 self, 1136 limit: impl Into<Option<usize>>, 1137 f: F, 1138 ) -> ForEachConcurrent<Self, Fut, F> 1139 where 1140 F: FnMut(Self::Item) -> Fut, 1141 Fut: Future<Output = ()>, 1142 Self: Sized, 1143 { 1144 assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f)) 1145 } 1146 1147 /// Creates a new stream of at most `n` items of the underlying stream. 1148 /// 1149 /// Once `n` items have been yielded from this stream then it will always 1150 /// return that the stream is done. 1151 /// 1152 /// # Examples 1153 /// 1154 /// ``` 1155 /// # futures::executor::block_on(async { 1156 /// use futures::stream::{self, StreamExt}; 1157 /// 1158 /// let stream = stream::iter(1..=10).take(3); 1159 /// 1160 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 1161 /// # }); 1162 /// ``` take(self, n: usize) -> Take<Self> where Self: Sized,1163 fn take(self, n: usize) -> Take<Self> 1164 where 1165 Self: Sized, 1166 { 1167 assert_stream::<Self::Item, _>(Take::new(self, n)) 1168 } 1169 1170 /// Creates a new stream which skips `n` items of the underlying stream. 1171 /// 1172 /// Once `n` items have been skipped from this stream then it will always 1173 /// return the remaining items on this stream. 1174 /// 1175 /// # Examples 1176 /// 1177 /// ``` 1178 /// # futures::executor::block_on(async { 1179 /// use futures::stream::{self, StreamExt}; 1180 /// 1181 /// let stream = stream::iter(1..=10).skip(5); 1182 /// 1183 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 1184 /// # }); 1185 /// ``` skip(self, n: usize) -> Skip<Self> where Self: Sized,1186 fn skip(self, n: usize) -> Skip<Self> 1187 where 1188 Self: Sized, 1189 { 1190 assert_stream::<Self::Item, _>(Skip::new(self, n)) 1191 } 1192 1193 /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never 1194 /// again be called once it has finished. This method can be used to turn 1195 /// any `Stream` into a `FusedStream`. 1196 /// 1197 /// Normally, once a stream has returned [`None`] from 1198 /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad 1199 /// behavior such as block forever, panic, never return, etc. If it is known 1200 /// that [`poll_next`](Stream::poll_next) may be called after stream 1201 /// has already finished, then this method can be used to ensure that it has 1202 /// defined semantics. 1203 /// 1204 /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream 1205 /// is guaranteed to return [`None`] after the underlying stream has 1206 /// finished. 1207 /// 1208 /// # Examples 1209 /// 1210 /// ``` 1211 /// use futures::executor::block_on_stream; 1212 /// use futures::stream::{self, StreamExt}; 1213 /// use futures::task::Poll; 1214 /// 1215 /// let mut x = 0; 1216 /// let stream = stream::poll_fn(|_| { 1217 /// x += 1; 1218 /// match x { 1219 /// 0..=2 => Poll::Ready(Some(x)), 1220 /// 3 => Poll::Ready(None), 1221 /// _ => panic!("should not happen") 1222 /// } 1223 /// }).fuse(); 1224 /// 1225 /// let mut iter = block_on_stream(stream); 1226 /// assert_eq!(Some(1), iter.next()); 1227 /// assert_eq!(Some(2), iter.next()); 1228 /// assert_eq!(None, iter.next()); 1229 /// assert_eq!(None, iter.next()); 1230 /// // ... 1231 /// ``` fuse(self) -> Fuse<Self> where Self: Sized,1232 fn fuse(self) -> Fuse<Self> 1233 where 1234 Self: Sized, 1235 { 1236 assert_stream::<Self::Item, _>(Fuse::new(self)) 1237 } 1238 1239 /// Borrows a stream, rather than consuming it. 1240 /// 1241 /// This is useful to allow applying stream adaptors while still retaining 1242 /// ownership of the original stream. 1243 /// 1244 /// # Examples 1245 /// 1246 /// ``` 1247 /// # futures::executor::block_on(async { 1248 /// use futures::stream::{self, StreamExt}; 1249 /// 1250 /// let mut stream = stream::iter(1..5); 1251 /// 1252 /// let sum = stream.by_ref() 1253 /// .take(2) 1254 /// .fold(0, |a, b| async move { a + b }) 1255 /// .await; 1256 /// assert_eq!(sum, 3); 1257 /// 1258 /// // You can use the stream again 1259 /// let sum = stream.take(2) 1260 /// .fold(0, |a, b| async move { a + b }) 1261 /// .await; 1262 /// assert_eq!(sum, 7); 1263 /// # }); 1264 /// ``` by_ref(&mut self) -> &mut Self1265 fn by_ref(&mut self) -> &mut Self { 1266 self 1267 } 1268 1269 /// Catches unwinding panics while polling the stream. 1270 /// 1271 /// Caught panic (if any) will be the last element of the resulting stream. 1272 /// 1273 /// In general, panics within a stream can propagate all the way out to the 1274 /// task level. This combinator makes it possible to halt unwinding within 1275 /// the stream itself. It's most commonly used within task executors. This 1276 /// method should not be used for error handling. 1277 /// 1278 /// Note that this method requires the `UnwindSafe` bound from the standard 1279 /// library. This isn't always applied automatically, and the standard 1280 /// library provides an `AssertUnwindSafe` wrapper type to apply it 1281 /// after-the fact. To assist using this method, the [`Stream`] trait is 1282 /// also implemented for `AssertUnwindSafe<St>` where `St` implements 1283 /// [`Stream`]. 1284 /// 1285 /// This method is only available when the `std` feature of this 1286 /// library is activated, and it is activated by default. 1287 /// 1288 /// # Examples 1289 /// 1290 /// ``` 1291 /// # futures::executor::block_on(async { 1292 /// use futures::stream::{self, StreamExt}; 1293 /// 1294 /// let stream = stream::iter(vec![Some(10), None, Some(11)]); 1295 /// // Panic on second element 1296 /// let stream_panicking = stream.map(|o| o.unwrap()); 1297 /// // Collect all the results 1298 /// let stream = stream_panicking.catch_unwind(); 1299 /// 1300 /// let results: Vec<Result<i32, _>> = stream.collect().await; 1301 /// match results[0] { 1302 /// Ok(10) => {} 1303 /// _ => panic!("unexpected result!"), 1304 /// } 1305 /// assert!(results[1].is_err()); 1306 /// assert_eq!(results.len(), 2); 1307 /// # }); 1308 /// ``` 1309 #[cfg(feature = "std")] catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe,1310 fn catch_unwind(self) -> CatchUnwind<Self> 1311 where 1312 Self: Sized + std::panic::UnwindSafe, 1313 { 1314 assert_stream(CatchUnwind::new(self)) 1315 } 1316 1317 /// Wrap the stream in a Box, pinning it. 1318 /// 1319 /// This method is only available when the `std` or `alloc` feature of this 1320 /// library is activated, and it is activated by default. 1321 #[cfg(feature = "alloc")] boxed<'a>(self) -> BoxStream<'a, Self::Item> where Self: Sized + Send + 'a,1322 fn boxed<'a>(self) -> BoxStream<'a, Self::Item> 1323 where 1324 Self: Sized + Send + 'a, 1325 { 1326 assert_stream::<Self::Item, _>(Box::pin(self)) 1327 } 1328 1329 /// Wrap the stream in a Box, pinning it. 1330 /// 1331 /// Similar to `boxed`, but without the `Send` requirement. 1332 /// 1333 /// This method is only available when the `std` or `alloc` feature of this 1334 /// library is activated, and it is activated by default. 1335 #[cfg(feature = "alloc")] boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> where Self: Sized + 'a,1336 fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> 1337 where 1338 Self: Sized + 'a, 1339 { 1340 assert_stream::<Self::Item, _>(Box::pin(self)) 1341 } 1342 1343 /// An adaptor for creating a buffered list of pending futures. 1344 /// 1345 /// If this stream's item can be converted into a future, then this adaptor 1346 /// will buffer up to at most `n` futures and then return the outputs in the 1347 /// same order as the underlying stream. No more than `n` futures will be 1348 /// buffered at any point in time, and less than `n` may also be buffered 1349 /// depending on the state of each future. 1350 /// 1351 /// The returned stream will be a stream of each future's output. 1352 /// 1353 /// This method is only available when the `std` or `alloc` feature of this 1354 /// library is activated, and it is activated by default. 1355 #[cfg(not(futures_no_atomic_cas))] 1356 #[cfg(feature = "alloc")] buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized,1357 fn buffered(self, n: usize) -> Buffered<Self> 1358 where 1359 Self::Item: Future, 1360 Self: Sized, 1361 { 1362 assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n)) 1363 } 1364 1365 /// An adaptor for creating a buffered list of pending futures (unordered). 1366 /// 1367 /// If this stream's item can be converted into a future, then this adaptor 1368 /// will buffer up to `n` futures and then return the outputs in the order 1369 /// in which they complete. No more than `n` futures will be buffered at 1370 /// any point in time, and less than `n` may also be buffered depending on 1371 /// the state of each future. 1372 /// 1373 /// The returned stream will be a stream of each future's output. 1374 /// 1375 /// This method is only available when the `std` or `alloc` feature of this 1376 /// library is activated, and it is activated by default. 1377 /// 1378 /// # Examples 1379 /// 1380 /// ``` 1381 /// # futures::executor::block_on(async { 1382 /// use futures::channel::oneshot; 1383 /// use futures::stream::{self, StreamExt}; 1384 /// 1385 /// let (send_one, recv_one) = oneshot::channel(); 1386 /// let (send_two, recv_two) = oneshot::channel(); 1387 /// 1388 /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]); 1389 /// let mut buffered = stream_of_futures.buffer_unordered(10); 1390 /// 1391 /// send_two.send(2i32)?; 1392 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 1393 /// 1394 /// send_one.send(1i32)?; 1395 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 1396 /// 1397 /// assert_eq!(buffered.next().await, None); 1398 /// # Ok::<(), i32>(()) }).unwrap(); 1399 /// ``` 1400 #[cfg(not(futures_no_atomic_cas))] 1401 #[cfg(feature = "alloc")] buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized,1402 fn buffer_unordered(self, n: usize) -> BufferUnordered<Self> 1403 where 1404 Self::Item: Future, 1405 Self: Sized, 1406 { 1407 assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n)) 1408 } 1409 1410 /// An adapter for zipping two streams together. 1411 /// 1412 /// The zipped stream waits for both streams to produce an item, and then 1413 /// returns that pair. If either stream ends then the zipped stream will 1414 /// also end. 1415 /// 1416 /// # Examples 1417 /// 1418 /// ``` 1419 /// # futures::executor::block_on(async { 1420 /// use futures::stream::{self, StreamExt}; 1421 /// 1422 /// let stream1 = stream::iter(1..=3); 1423 /// let stream2 = stream::iter(5..=10); 1424 /// 1425 /// let vec = stream1.zip(stream2) 1426 /// .collect::<Vec<_>>() 1427 /// .await; 1428 /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec); 1429 /// # }); 1430 /// ``` 1431 /// zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1432 fn zip<St>(self, other: St) -> Zip<Self, St> 1433 where 1434 St: Stream, 1435 Self: Sized, 1436 { 1437 assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other)) 1438 } 1439 1440 /// Adapter for chaining two streams. 1441 /// 1442 /// The resulting stream emits elements from the first stream, and when 1443 /// first stream reaches the end, emits the elements from the second stream. 1444 /// 1445 /// ``` 1446 /// # futures::executor::block_on(async { 1447 /// use futures::stream::{self, StreamExt}; 1448 /// 1449 /// let stream1 = stream::iter(vec![Ok(10), Err(false)]); 1450 /// let stream2 = stream::iter(vec![Err(true), Ok(20)]); 1451 /// 1452 /// let stream = stream1.chain(stream2); 1453 /// 1454 /// let result: Vec<_> = stream.collect().await; 1455 /// assert_eq!(result, vec![ 1456 /// Ok(10), 1457 /// Err(false), 1458 /// Err(true), 1459 /// Ok(20), 1460 /// ]); 1461 /// # }); 1462 /// ``` chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1463 fn chain<St>(self, other: St) -> Chain<Self, St> 1464 where 1465 St: Stream<Item = Self::Item>, 1466 Self: Sized, 1467 { 1468 assert_stream::<Self::Item, _>(Chain::new(self, other)) 1469 } 1470 1471 /// Creates a new stream which exposes a `peek` method. 1472 /// 1473 /// Calling `peek` returns a reference to the next item in the stream. peekable(self) -> Peekable<Self> where Self: Sized,1474 fn peekable(self) -> Peekable<Self> 1475 where 1476 Self: Sized, 1477 { 1478 assert_stream::<Self::Item, _>(Peekable::new(self)) 1479 } 1480 1481 /// An adaptor for chunking up items of the stream inside a vector. 1482 /// 1483 /// This combinator will attempt to pull items from this stream and buffer 1484 /// them into a local vector. At most `capacity` items will get buffered 1485 /// before they're yielded from the returned stream. 1486 /// 1487 /// Note that the vectors returned from this iterator may not always have 1488 /// `capacity` elements. If the underlying stream ended and only a partial 1489 /// vector was created, it'll be returned. Additionally if an error happens 1490 /// from the underlying stream then the currently buffered items will be 1491 /// yielded. 1492 /// 1493 /// This method is only available when the `std` or `alloc` feature of this 1494 /// library is activated, and it is activated by default. 1495 /// 1496 /// # Panics 1497 /// 1498 /// This method will panic if `capacity` is zero. 1499 #[cfg(feature = "alloc")] chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,1500 fn chunks(self, capacity: usize) -> Chunks<Self> 1501 where 1502 Self: Sized, 1503 { 1504 assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity)) 1505 } 1506 1507 /// An adaptor for chunking up ready items of the stream inside a vector. 1508 /// 1509 /// This combinator will attempt to pull ready items from this stream and 1510 /// buffer them into a local vector. At most `capacity` items will get 1511 /// buffered before they're yielded from the returned stream. If underlying 1512 /// stream returns `Poll::Pending`, and collected chunk is not empty, it will 1513 /// be immediately returned. 1514 /// 1515 /// If the underlying stream ended and only a partial vector was created, 1516 /// it'll be returned. Additionally if an error happens from the underlying 1517 /// stream then the currently buffered items will be yielded. 1518 /// 1519 /// This method is only available when the `std` or `alloc` feature of this 1520 /// library is activated, and it is activated by default. 1521 /// 1522 /// # Panics 1523 /// 1524 /// This method will panic if `capacity` is zero. 1525 #[cfg(feature = "alloc")] ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,1526 fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> 1527 where 1528 Self: Sized, 1529 { 1530 assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity)) 1531 } 1532 1533 /// A future that completes after the given stream has been fully processed 1534 /// into the sink and the sink has been flushed and closed. 1535 /// 1536 /// This future will drive the stream to keep producing items until it is 1537 /// exhausted, sending each item to the sink. It will complete once the 1538 /// stream is exhausted, the sink has received and flushed all items, and 1539 /// the sink is closed. Note that neither the original stream nor provided 1540 /// sink will be output by this future. Pass the sink by `Pin<&mut S>` 1541 /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in 1542 /// order to preserve access to the `Sink`. If the stream produces an error, 1543 /// that error will be returned by this future without flushing/closing the sink. 1544 #[cfg(feature = "sink")] 1545 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<Self::Ok, Error = Self::Error>, Self: TryStream + Sized,1546 fn forward<S>(self, sink: S) -> Forward<Self, S> 1547 where 1548 S: Sink<Self::Ok, Error = Self::Error>, 1549 Self: TryStream + Sized, 1550 // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>, 1551 { 1552 // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>` 1553 // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink)) 1554 Forward::new(self, sink) 1555 } 1556 1557 /// Splits this `Stream + Sink` object into separate `Sink` and `Stream` 1558 /// objects. 1559 /// 1560 /// This can be useful when you want to split ownership between tasks, or 1561 /// allow direct interaction between the two objects (e.g. via 1562 /// `Sink::send_all`). 1563 /// 1564 /// This method is only available when the `std` or `alloc` feature of this 1565 /// library is activated, and it is activated by default. 1566 #[cfg(feature = "sink")] 1567 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 1568 #[cfg(not(futures_no_atomic_cas))] 1569 #[cfg(feature = "alloc")] split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized,1570 fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) 1571 where 1572 Self: Sink<Item> + Sized, 1573 { 1574 let (sink, stream) = split::split(self); 1575 ( 1576 crate::sink::assert_sink::<Item, Self::Error, _>(sink), 1577 assert_stream::<Self::Item, _>(stream), 1578 ) 1579 } 1580 1581 /// Do something with each item of this stream, afterwards passing it on. 1582 /// 1583 /// This is similar to the `Iterator::inspect` method in the standard 1584 /// library where it allows easily inspecting each value as it passes 1585 /// through the stream, for example to debug what's going on. inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1586 fn inspect<F>(self, f: F) -> Inspect<Self, F> 1587 where 1588 F: FnMut(&Self::Item), 1589 Self: Sized, 1590 { 1591 assert_stream::<Self::Item, _>(Inspect::new(self, f)) 1592 } 1593 1594 /// Wrap this stream in an `Either` stream, making it the left-hand variant 1595 /// of that `Either`. 1596 /// 1597 /// This can be used in combination with the `right_stream` method to write `if` 1598 /// statements that evaluate to different streams in different branches. left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1599 fn left_stream<B>(self) -> Either<Self, B> 1600 where 1601 B: Stream<Item = Self::Item>, 1602 Self: Sized, 1603 { 1604 assert_stream::<Self::Item, _>(Either::Left(self)) 1605 } 1606 1607 /// Wrap this stream in an `Either` stream, making it the right-hand variant 1608 /// of that `Either`. 1609 /// 1610 /// This can be used in combination with the `left_stream` method to write `if` 1611 /// statements that evaluate to different streams in different branches. right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1612 fn right_stream<B>(self) -> Either<B, Self> 1613 where 1614 B: Stream<Item = Self::Item>, 1615 Self: Sized, 1616 { 1617 assert_stream::<Self::Item, _>(Either::Right(self)) 1618 } 1619 1620 /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`] 1621 /// stream types. poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1622 fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> 1623 where 1624 Self: Unpin, 1625 { 1626 Pin::new(self).poll_next(cx) 1627 } 1628 1629 /// Returns a [`Future`] that resolves when the next item in this stream is 1630 /// ready. 1631 /// 1632 /// This is similar to the [`next`][StreamExt::next] method, but it won't 1633 /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the 1634 /// returned future type will return `true` from 1635 /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing 1636 /// [`select_next_some`][StreamExt::select_next_some] to be easily used with 1637 /// the [`select!`] macro. 1638 /// 1639 /// If the future is polled after this [`Stream`] is empty it will panic. 1640 /// Using the future with a [`FusedFuture`][]-aware primitive like the 1641 /// [`select!`] macro will prevent this. 1642 /// 1643 /// [`FusedFuture`]: futures_core::future::FusedFuture 1644 /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated 1645 /// 1646 /// # Examples 1647 /// 1648 /// ``` 1649 /// # futures::executor::block_on(async { 1650 /// use futures::{future, select}; 1651 /// use futures::stream::{StreamExt, FuturesUnordered}; 1652 /// 1653 /// let mut fut = future::ready(1); 1654 /// let mut async_tasks = FuturesUnordered::new(); 1655 /// let mut total = 0; 1656 /// loop { 1657 /// select! { 1658 /// num = fut => { 1659 /// // First, the `ready` future completes. 1660 /// total += num; 1661 /// // Then we spawn a new task onto `async_tasks`, 1662 /// async_tasks.push(async { 5 }); 1663 /// }, 1664 /// // On the next iteration of the loop, the task we spawned 1665 /// // completes. 1666 /// num = async_tasks.select_next_some() => { 1667 /// total += num; 1668 /// } 1669 /// // Finally, both the `ready` future and `async_tasks` have 1670 /// // finished, so we enter the `complete` branch. 1671 /// complete => break, 1672 /// } 1673 /// } 1674 /// assert_eq!(total, 6); 1675 /// # }); 1676 /// ``` select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1677 fn select_next_some(&mut self) -> SelectNextSome<'_, Self> 1678 where 1679 Self: Unpin + FusedStream, 1680 { 1681 assert_future::<Self::Item, _>(SelectNextSome::new(self)) 1682 } 1683 } 1684