1 //! Streams 2 //! 3 //! This module contains a number of functions for working with `Stream`s, 4 //! including the `StreamExt` trait which adds methods to `Stream` types. 5 6 use crate::future::{assert_future, Either}; 7 use crate::stream::assert_stream; 8 #[cfg(feature = "alloc")] 9 use alloc::boxed::Box; 10 #[cfg(feature = "alloc")] 11 use alloc::vec::Vec; 12 use core::pin::Pin; 13 #[cfg(feature = "sink")] 14 use futures_core::stream::TryStream; 15 #[cfg(feature = "alloc")] 16 use futures_core::stream::{BoxStream, LocalBoxStream}; 17 use futures_core::{ 18 future::Future, 19 stream::{FusedStream, Stream}, 20 task::{Context, Poll}, 21 }; 22 #[cfg(feature = "sink")] 23 use futures_sink::Sink; 24 25 use crate::fns::{inspect_fn, InspectFn}; 26 27 mod chain; 28 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 29 pub use self::chain::Chain; 30 31 mod collect; 32 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 33 pub use self::collect::Collect; 34 35 mod unzip; 36 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 37 pub use self::unzip::Unzip; 38 39 mod concat; 40 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 41 pub use self::concat::Concat; 42 43 mod cycle; 44 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 45 pub use self::cycle::Cycle; 46 47 mod enumerate; 48 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 49 pub use self::enumerate::Enumerate; 50 51 mod filter; 52 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 53 pub use self::filter::Filter; 54 55 mod filter_map; 56 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 57 pub use self::filter_map::FilterMap; 58 59 mod flatten; 60 61 delegate_all!( 62 /// Stream for the [`flatten`](StreamExt::flatten) method. 63 Flatten<St>( 64 flatten::Flatten<St, St::Item> 65 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)] 66 where St: Stream 67 ); 68 69 mod fold; 70 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 71 pub use self::fold::Fold; 72 73 mod any; 74 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 75 pub use self::any::Any; 76 77 mod all; 78 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 79 pub use self::all::All; 80 81 #[cfg(feature = "sink")] 82 mod forward; 83 84 #[cfg(feature = "sink")] 85 delegate_all!( 86 /// Future for the [`forward`](super::StreamExt::forward) method. 87 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 88 Forward<St, Si>( 89 forward::Forward<St, Si, St::Ok> 90 ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)] 91 where St: TryStream 92 ); 93 94 mod for_each; 95 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 96 pub use self::for_each::ForEach; 97 98 mod fuse; 99 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 100 pub use self::fuse::Fuse; 101 102 mod into_future; 103 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 104 pub use self::into_future::StreamFuture; 105 106 delegate_all!( 107 /// Stream for the [`inspect`](StreamExt::inspect) method. 108 Inspect<St, F>( 109 map::Map<St, InspectFn<F>> 110 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))] 111 ); 112 113 mod map; 114 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 115 pub use self::map::Map; 116 117 delegate_all!( 118 /// Stream for the [`flat_map`](StreamExt::flat_map) method. 119 FlatMap<St, U, F>( 120 flatten::Flatten<Map<St, F>, U> 121 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))] 122 ); 123 124 mod next; 125 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 126 pub use self::next::Next; 127 128 mod select_next_some; 129 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 130 pub use self::select_next_some::SelectNextSome; 131 132 mod peek; 133 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 134 pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable}; 135 136 mod skip; 137 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 138 pub use self::skip::Skip; 139 140 mod skip_while; 141 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 142 pub use self::skip_while::SkipWhile; 143 144 mod take; 145 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 146 pub use self::take::Take; 147 148 mod take_while; 149 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 150 pub use self::take_while::TakeWhile; 151 152 mod take_until; 153 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 154 pub use self::take_until::TakeUntil; 155 156 mod then; 157 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 158 pub use self::then::Then; 159 160 mod zip; 161 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 162 pub use self::zip::Zip; 163 164 #[cfg(feature = "alloc")] 165 mod chunks; 166 #[cfg(feature = "alloc")] 167 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 168 pub use self::chunks::Chunks; 169 170 #[cfg(feature = "alloc")] 171 mod ready_chunks; 172 #[cfg(feature = "alloc")] 173 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 174 pub use self::ready_chunks::ReadyChunks; 175 176 mod scan; 177 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 178 pub use self::scan::Scan; 179 180 #[cfg(not(futures_no_atomic_cas))] 181 #[cfg(feature = "alloc")] 182 mod buffer_unordered; 183 #[cfg(not(futures_no_atomic_cas))] 184 #[cfg(feature = "alloc")] 185 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 186 pub use self::buffer_unordered::BufferUnordered; 187 188 #[cfg(not(futures_no_atomic_cas))] 189 #[cfg(feature = "alloc")] 190 mod buffered; 191 #[cfg(not(futures_no_atomic_cas))] 192 #[cfg(feature = "alloc")] 193 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 194 pub use self::buffered::Buffered; 195 196 #[cfg(not(futures_no_atomic_cas))] 197 #[cfg(feature = "alloc")] 198 mod for_each_concurrent; 199 #[cfg(not(futures_no_atomic_cas))] 200 #[cfg(feature = "alloc")] 201 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 202 pub use self::for_each_concurrent::ForEachConcurrent; 203 204 #[cfg(not(futures_no_atomic_cas))] 205 #[cfg(feature = "sink")] 206 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 207 #[cfg(feature = "alloc")] 208 mod split; 209 #[cfg(not(futures_no_atomic_cas))] 210 #[cfg(feature = "sink")] 211 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 212 #[cfg(feature = "alloc")] 213 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 214 pub use self::split::{ReuniteError, SplitSink, SplitStream}; 215 216 #[cfg(feature = "std")] 217 mod catch_unwind; 218 #[cfg(feature = "std")] 219 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 220 pub use self::catch_unwind::CatchUnwind; 221 222 impl<T: ?Sized> StreamExt for T where T: Stream {} 223 224 /// An extension trait for `Stream`s that provides a variety of convenient 225 /// combinator functions. 226 pub trait StreamExt: Stream { 227 /// Creates a future that resolves to the next item in the stream. 228 /// 229 /// Note that because `next` doesn't take ownership over the stream, 230 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a 231 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can 232 /// be done by boxing the stream using [`Box::pin`] or 233 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` 234 /// crate. 235 /// 236 /// # Examples 237 /// 238 /// ``` 239 /// # futures::executor::block_on(async { 240 /// use futures::stream::{self, StreamExt}; 241 /// 242 /// let mut stream = stream::iter(1..=3); 243 /// 244 /// assert_eq!(stream.next().await, Some(1)); 245 /// assert_eq!(stream.next().await, Some(2)); 246 /// assert_eq!(stream.next().await, Some(3)); 247 /// assert_eq!(stream.next().await, None); 248 /// # }); 249 /// ``` next(&mut self) -> Next<'_, Self> where Self: Unpin,250 fn next(&mut self) -> Next<'_, Self> 251 where 252 Self: Unpin, 253 { 254 assert_future::<Option<Self::Item>, _>(Next::new(self)) 255 } 256 257 /// Converts this stream into a future of `(next_item, tail_of_stream)`. 258 /// If the stream terminates, then the next item is [`None`]. 259 /// 260 /// The returned future can be used to compose streams and futures together 261 /// by placing everything into the "world of futures". 262 /// 263 /// Note that because `into_future` moves the stream, the [`Stream`] type 264 /// must be [`Unpin`]. If you want to use `into_future` with a 265 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can 266 /// be done by boxing the stream using [`Box::pin`] or 267 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` 268 /// crate. 269 /// 270 /// # Examples 271 /// 272 /// ``` 273 /// # futures::executor::block_on(async { 274 /// use futures::stream::{self, StreamExt}; 275 /// 276 /// let stream = stream::iter(1..=3); 277 /// 278 /// let (item, stream) = stream.into_future().await; 279 /// assert_eq!(Some(1), item); 280 /// 281 /// let (item, stream) = stream.into_future().await; 282 /// assert_eq!(Some(2), item); 283 /// # }); 284 /// ``` into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,285 fn into_future(self) -> StreamFuture<Self> 286 where 287 Self: Sized + Unpin, 288 { 289 assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self)) 290 } 291 292 /// Maps this stream's items to a different type, returning a new stream of 293 /// the resulting type. 294 /// 295 /// The provided closure is executed over all elements of this stream as 296 /// they are made available. It is executed inline with calls to 297 /// [`poll_next`](Stream::poll_next). 298 /// 299 /// Note that this function consumes the stream passed into it and returns a 300 /// wrapped version of it, similar to the existing `map` methods in the 301 /// standard library. 302 /// 303 /// # Examples 304 /// 305 /// ``` 306 /// # futures::executor::block_on(async { 307 /// use futures::stream::{self, StreamExt}; 308 /// 309 /// let stream = stream::iter(1..=3); 310 /// let stream = stream.map(|x| x + 3); 311 /// 312 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 313 /// # }); 314 /// ``` map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,315 fn map<T, F>(self, f: F) -> Map<Self, F> 316 where 317 F: FnMut(Self::Item) -> T, 318 Self: Sized, 319 { 320 assert_stream::<T, _>(Map::new(self, f)) 321 } 322 323 /// Creates a stream which gives the current iteration count as well as 324 /// the next value. 325 /// 326 /// The stream returned yields pairs `(i, val)`, where `i` is the 327 /// current index of iteration and `val` is the value returned by the 328 /// stream. 329 /// 330 /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a 331 /// different sized integer, the [`zip`](StreamExt::zip) function provides similar 332 /// functionality. 333 /// 334 /// # Overflow Behavior 335 /// 336 /// The method does no guarding against overflows, so enumerating more than 337 /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If 338 /// debug assertions are enabled, a panic is guaranteed. 339 /// 340 /// # Panics 341 /// 342 /// The returned stream might panic if the to-be-returned index would 343 /// overflow a [`usize`]. 344 /// 345 /// # Examples 346 /// 347 /// ``` 348 /// # futures::executor::block_on(async { 349 /// use futures::stream::{self, StreamExt}; 350 /// 351 /// let stream = stream::iter(vec!['a', 'b', 'c']); 352 /// 353 /// let mut stream = stream.enumerate(); 354 /// 355 /// assert_eq!(stream.next().await, Some((0, 'a'))); 356 /// assert_eq!(stream.next().await, Some((1, 'b'))); 357 /// assert_eq!(stream.next().await, Some((2, 'c'))); 358 /// assert_eq!(stream.next().await, None); 359 /// # }); 360 /// ``` enumerate(self) -> Enumerate<Self> where Self: Sized,361 fn enumerate(self) -> Enumerate<Self> 362 where 363 Self: Sized, 364 { 365 assert_stream::<(usize, Self::Item), _>(Enumerate::new(self)) 366 } 367 368 /// Filters the values produced by this stream according to the provided 369 /// asynchronous predicate. 370 /// 371 /// As values of this stream are made available, the provided predicate `f` 372 /// will be run against them. If the predicate returns a `Future` which 373 /// resolves to `true`, then the stream will yield the value, but if the 374 /// predicate returns a `Future` which resolves to `false`, then the value 375 /// will be discarded and the next value will be produced. 376 /// 377 /// Note that this function consumes the stream passed into it and returns a 378 /// wrapped version of it, similar to the existing `filter` methods in the 379 /// standard library. 380 /// 381 /// # Examples 382 /// 383 /// ``` 384 /// # futures::executor::block_on(async { 385 /// use futures::future; 386 /// use futures::stream::{self, StreamExt}; 387 /// 388 /// let stream = stream::iter(1..=10); 389 /// let evens = stream.filter(|x| future::ready(x % 2 == 0)); 390 /// 391 /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await); 392 /// # }); 393 /// ``` filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,394 fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> 395 where 396 F: FnMut(&Self::Item) -> Fut, 397 Fut: Future<Output = bool>, 398 Self: Sized, 399 { 400 assert_stream::<Self::Item, _>(Filter::new(self, f)) 401 } 402 403 /// Filters the values produced by this stream while simultaneously mapping 404 /// them to a different type according to the provided asynchronous closure. 405 /// 406 /// As values of this stream are made available, the provided function will 407 /// be run on them. If the future returned by the predicate `f` resolves to 408 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if 409 /// it resolves to [`None`] then the next value will be produced. 410 /// 411 /// Note that this function consumes the stream passed into it and returns a 412 /// wrapped version of it, similar to the existing `filter_map` methods in 413 /// the standard library. 414 /// 415 /// # Examples 416 /// ``` 417 /// # futures::executor::block_on(async { 418 /// use futures::stream::{self, StreamExt}; 419 /// 420 /// let stream = stream::iter(1..=10); 421 /// let evens = stream.filter_map(|x| async move { 422 /// if x % 2 == 0 { Some(x + 1) } else { None } 423 /// }); 424 /// 425 /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await); 426 /// # }); 427 /// ``` filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,428 fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> 429 where 430 F: FnMut(Self::Item) -> Fut, 431 Fut: Future<Output = Option<T>>, 432 Self: Sized, 433 { 434 assert_stream::<T, _>(FilterMap::new(self, f)) 435 } 436 437 /// Computes from this stream's items new items of a different type using 438 /// an asynchronous closure. 439 /// 440 /// The provided closure `f` will be called with an `Item` once a value is 441 /// ready, it returns a future which will then be run to completion 442 /// to produce the next value on this stream. 443 /// 444 /// Note that this function consumes the stream passed into it and returns a 445 /// wrapped version of it. 446 /// 447 /// # Examples 448 /// 449 /// ``` 450 /// # futures::executor::block_on(async { 451 /// use futures::stream::{self, StreamExt}; 452 /// 453 /// let stream = stream::iter(1..=3); 454 /// let stream = stream.then(|x| async move { x + 3 }); 455 /// 456 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 457 /// # }); 458 /// ``` then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,459 fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> 460 where 461 F: FnMut(Self::Item) -> Fut, 462 Fut: Future, 463 Self: Sized, 464 { 465 assert_stream::<Fut::Output, _>(Then::new(self, f)) 466 } 467 468 /// Transforms a stream into a collection, returning a 469 /// future representing the result of that computation. 470 /// 471 /// The returned future will be resolved when the stream terminates. 472 /// 473 /// # Examples 474 /// 475 /// ``` 476 /// # futures::executor::block_on(async { 477 /// use futures::channel::mpsc; 478 /// use futures::stream::StreamExt; 479 /// use std::thread; 480 /// 481 /// let (tx, rx) = mpsc::unbounded(); 482 /// 483 /// thread::spawn(move || { 484 /// for i in 1..=5 { 485 /// tx.unbounded_send(i).unwrap(); 486 /// } 487 /// }); 488 /// 489 /// let output = rx.collect::<Vec<i32>>().await; 490 /// assert_eq!(output, vec![1, 2, 3, 4, 5]); 491 /// # }); 492 /// ``` collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,493 fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> 494 where 495 Self: Sized, 496 { 497 assert_future::<C, _>(Collect::new(self)) 498 } 499 500 /// Converts a stream of pairs into a future, which 501 /// resolves to pair of containers. 502 /// 503 /// `unzip()` produces a future, which resolves to two 504 /// collections: one from the left elements of the pairs, 505 /// and one from the right elements. 506 /// 507 /// The returned future will be resolved when the stream terminates. 508 /// 509 /// # Examples 510 /// 511 /// ``` 512 /// # futures::executor::block_on(async { 513 /// use futures::channel::mpsc; 514 /// use futures::stream::StreamExt; 515 /// use std::thread; 516 /// 517 /// let (tx, rx) = mpsc::unbounded(); 518 /// 519 /// thread::spawn(move || { 520 /// tx.unbounded_send((1, 2)).unwrap(); 521 /// tx.unbounded_send((3, 4)).unwrap(); 522 /// tx.unbounded_send((5, 6)).unwrap(); 523 /// }); 524 /// 525 /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await; 526 /// assert_eq!(o1, vec![1, 3, 5]); 527 /// assert_eq!(o2, vec![2, 4, 6]); 528 /// # }); 529 /// ``` unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>,530 fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> 531 where 532 FromA: Default + Extend<A>, 533 FromB: Default + Extend<B>, 534 Self: Sized + Stream<Item = (A, B)>, 535 { 536 assert_future::<(FromA, FromB), _>(Unzip::new(self)) 537 } 538 539 /// Concatenate all items of a stream into a single extendable 540 /// destination, returning a future representing the end result. 541 /// 542 /// This combinator will extend the first item with the contents 543 /// of all the subsequent results of the stream. If the stream is 544 /// empty, the default value will be returned. 545 /// 546 /// Works with all collections that implement the 547 /// [`Extend`](std::iter::Extend) trait. 548 /// 549 /// # Examples 550 /// 551 /// ``` 552 /// # futures::executor::block_on(async { 553 /// use futures::channel::mpsc; 554 /// use futures::stream::StreamExt; 555 /// use std::thread; 556 /// 557 /// let (tx, rx) = mpsc::unbounded(); 558 /// 559 /// thread::spawn(move || { 560 /// for i in (0..3).rev() { 561 /// let n = i * 3; 562 /// tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap(); 563 /// } 564 /// }); 565 /// 566 /// let result = rx.concat().await; 567 /// 568 /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]); 569 /// # }); 570 /// ``` concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,571 fn concat(self) -> Concat<Self> 572 where 573 Self: Sized, 574 Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, 575 { 576 assert_future::<Self::Item, _>(Concat::new(self)) 577 } 578 579 /// Repeats a stream endlessly. 580 /// 581 /// The stream never terminates. Note that you likely want to avoid 582 /// usage of `collect` or such on the returned stream as it will exhaust 583 /// available memory as it tries to just fill up all RAM. 584 /// 585 /// # Examples 586 /// 587 /// ``` 588 /// # futures::executor::block_on(async { 589 /// use futures::stream::{self, StreamExt}; 590 /// let a = [1, 2, 3]; 591 /// let mut s = stream::iter(a.iter()).cycle(); 592 /// 593 /// assert_eq!(s.next().await, Some(&1)); 594 /// assert_eq!(s.next().await, Some(&2)); 595 /// assert_eq!(s.next().await, Some(&3)); 596 /// assert_eq!(s.next().await, Some(&1)); 597 /// assert_eq!(s.next().await, Some(&2)); 598 /// assert_eq!(s.next().await, Some(&3)); 599 /// assert_eq!(s.next().await, Some(&1)); 600 /// # }); 601 /// ``` cycle(self) -> Cycle<Self> where Self: Sized + Clone,602 fn cycle(self) -> Cycle<Self> 603 where 604 Self: Sized + Clone, 605 { 606 assert_stream::<Self::Item, _>(Cycle::new(self)) 607 } 608 609 /// Execute an accumulating asynchronous computation over a stream, 610 /// collecting all the values into one final result. 611 /// 612 /// This combinator will accumulate all values returned by this stream 613 /// according to the closure provided. The initial state is also provided to 614 /// this method and then is returned again by each execution of the closure. 615 /// Once the entire stream has been exhausted the returned future will 616 /// resolve to this value. 617 /// 618 /// # Examples 619 /// 620 /// ``` 621 /// # futures::executor::block_on(async { 622 /// use futures::stream::{self, StreamExt}; 623 /// 624 /// let number_stream = stream::iter(0..6); 625 /// let sum = number_stream.fold(0, |acc, x| async move { acc + x }); 626 /// assert_eq!(sum.await, 15); 627 /// # }); 628 /// ``` fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,629 fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> 630 where 631 F: FnMut(T, Self::Item) -> Fut, 632 Fut: Future<Output = T>, 633 Self: Sized, 634 { 635 assert_future::<T, _>(Fold::new(self, f, init)) 636 } 637 638 /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate. 639 /// 640 /// # Examples 641 /// 642 /// ``` 643 /// # futures::executor::block_on(async { 644 /// use futures::stream::{self, StreamExt}; 645 /// 646 /// let number_stream = stream::iter(0..10); 647 /// let contain_three = number_stream.any(|i| async move { i == 3 }); 648 /// assert_eq!(contain_three.await, true); 649 /// # }); 650 /// ``` any<Fut, F>(self, f: F) -> Any<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,651 fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F> 652 where 653 F: FnMut(Self::Item) -> Fut, 654 Fut: Future<Output = bool>, 655 Self: Sized, 656 { 657 assert_future::<bool, _>(Any::new(self, f)) 658 } 659 660 /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate. 661 /// 662 /// # Examples 663 /// 664 /// ``` 665 /// # futures::executor::block_on(async { 666 /// use futures::stream::{self, StreamExt}; 667 /// 668 /// let number_stream = stream::iter(0..10); 669 /// let less_then_twenty = number_stream.all(|i| async move { i < 20 }); 670 /// assert_eq!(less_then_twenty.await, true); 671 /// # }); 672 /// ``` all<Fut, F>(self, f: F) -> All<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,673 fn all<Fut, F>(self, f: F) -> All<Self, Fut, F> 674 where 675 F: FnMut(Self::Item) -> Fut, 676 Fut: Future<Output = bool>, 677 Self: Sized, 678 { 679 assert_future::<bool, _>(All::new(self, f)) 680 } 681 682 /// Flattens a stream of streams into just one continuous stream. 683 /// 684 /// # Examples 685 /// 686 /// ``` 687 /// # futures::executor::block_on(async { 688 /// use futures::channel::mpsc; 689 /// use futures::stream::StreamExt; 690 /// use std::thread; 691 /// 692 /// let (tx1, rx1) = mpsc::unbounded(); 693 /// let (tx2, rx2) = mpsc::unbounded(); 694 /// let (tx3, rx3) = mpsc::unbounded(); 695 /// 696 /// thread::spawn(move || { 697 /// tx1.unbounded_send(1).unwrap(); 698 /// tx1.unbounded_send(2).unwrap(); 699 /// }); 700 /// thread::spawn(move || { 701 /// tx2.unbounded_send(3).unwrap(); 702 /// tx2.unbounded_send(4).unwrap(); 703 /// }); 704 /// thread::spawn(move || { 705 /// tx3.unbounded_send(rx1).unwrap(); 706 /// tx3.unbounded_send(rx2).unwrap(); 707 /// }); 708 /// 709 /// let output = rx3.flatten().collect::<Vec<i32>>().await; 710 /// assert_eq!(output, vec![1, 2, 3, 4]); 711 /// # }); 712 /// ``` flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,713 fn flatten(self) -> Flatten<Self> 714 where 715 Self::Item: Stream, 716 Self: Sized, 717 { 718 assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) 719 } 720 721 /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. 722 /// 723 /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, 724 /// you would have to chain combinators like `.map(f).flatten()` while this 725 /// combinator provides ability to write `.flat_map(f)` instead of chaining. 726 /// 727 /// The provided closure which produce inner streams is executed over all elements 728 /// of stream as last inner stream is terminated and next stream item is available. 729 /// 730 /// Note that this function consumes the stream passed into it and returns a 731 /// wrapped version of it, similar to the existing `flat_map` methods in the 732 /// standard library. 733 /// 734 /// # Examples 735 /// 736 /// ``` 737 /// # futures::executor::block_on(async { 738 /// use futures::stream::{self, StreamExt}; 739 /// 740 /// let stream = stream::iter(1..=3); 741 /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x])); 742 /// 743 /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await); 744 /// # }); 745 /// ``` flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,746 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> 747 where 748 F: FnMut(Self::Item) -> U, 749 U: Stream, 750 Self: Sized, 751 { 752 assert_stream::<U::Item, _>(FlatMap::new(self, f)) 753 } 754 755 /// Combinator similar to [`StreamExt::fold`] that holds internal state 756 /// and produces a new stream. 757 /// 758 /// Accepts initial state and closure which will be applied to each element 759 /// of the stream until provided closure returns `None`. Once `None` is 760 /// returned, stream will be terminated. 761 /// 762 /// # Examples 763 /// 764 /// ``` 765 /// # futures::executor::block_on(async { 766 /// use futures::future; 767 /// use futures::stream::{self, StreamExt}; 768 /// 769 /// let stream = stream::iter(1..=10); 770 /// 771 /// let stream = stream.scan(0, |state, x| { 772 /// *state += x; 773 /// future::ready(if *state < 10 { Some(x) } else { None }) 774 /// }); 775 /// 776 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 777 /// # }); 778 /// ``` scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,779 fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> 780 where 781 F: FnMut(&mut S, Self::Item) -> Fut, 782 Fut: Future<Output = Option<B>>, 783 Self: Sized, 784 { 785 assert_stream::<B, _>(Scan::new(self, initial_state, f)) 786 } 787 788 /// Skip elements on this stream while the provided asynchronous predicate 789 /// resolves to `true`. 790 /// 791 /// This function, like `Iterator::skip_while`, will skip elements on the 792 /// stream until the predicate `f` resolves to `false`. Once one element 793 /// returns `false`, all future elements will be returned from the underlying 794 /// stream. 795 /// 796 /// # Examples 797 /// 798 /// ``` 799 /// # futures::executor::block_on(async { 800 /// use futures::future; 801 /// use futures::stream::{self, StreamExt}; 802 /// 803 /// let stream = stream::iter(1..=10); 804 /// 805 /// let stream = stream.skip_while(|x| future::ready(*x <= 5)); 806 /// 807 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 808 /// # }); 809 /// ``` skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,810 fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> 811 where 812 F: FnMut(&Self::Item) -> Fut, 813 Fut: Future<Output = bool>, 814 Self: Sized, 815 { 816 assert_stream::<Self::Item, _>(SkipWhile::new(self, f)) 817 } 818 819 /// Take elements from this stream while the provided asynchronous predicate 820 /// resolves to `true`. 821 /// 822 /// This function, like `Iterator::take_while`, will take elements from the 823 /// stream until the predicate `f` resolves to `false`. Once one element 824 /// returns `false`, it will always return that the stream is done. 825 /// 826 /// # Examples 827 /// 828 /// ``` 829 /// # futures::executor::block_on(async { 830 /// use futures::future; 831 /// use futures::stream::{self, StreamExt}; 832 /// 833 /// let stream = stream::iter(1..=10); 834 /// 835 /// let stream = stream.take_while(|x| future::ready(*x <= 5)); 836 /// 837 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 838 /// # }); 839 /// ``` take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,840 fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> 841 where 842 F: FnMut(&Self::Item) -> Fut, 843 Fut: Future<Output = bool>, 844 Self: Sized, 845 { 846 assert_stream::<Self::Item, _>(TakeWhile::new(self, f)) 847 } 848 849 /// Take elements from this stream until the provided future resolves. 850 /// 851 /// This function will take elements from the stream until the provided 852 /// stopping future `fut` resolves. Once the `fut` future becomes ready, 853 /// this stream combinator will always return that the stream is done. 854 /// 855 /// The stopping future may return any type. Once the stream is stopped 856 /// the result of the stopping future may be accessed with `TakeUntil::take_result()`. 857 /// The stream may also be resumed with `TakeUntil::take_future()`. 858 /// See the documentation of [`TakeUntil`] for more information. 859 /// 860 /// # Examples 861 /// 862 /// ``` 863 /// # futures::executor::block_on(async { 864 /// use futures::future; 865 /// use futures::stream::{self, StreamExt}; 866 /// use futures::task::Poll; 867 /// 868 /// let stream = stream::iter(1..=10); 869 /// 870 /// let mut i = 0; 871 /// let stop_fut = future::poll_fn(|_cx| { 872 /// i += 1; 873 /// if i <= 5 { 874 /// Poll::Pending 875 /// } else { 876 /// Poll::Ready(()) 877 /// } 878 /// }); 879 /// 880 /// let stream = stream.take_until(stop_fut); 881 /// 882 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 883 /// # }); 884 /// ``` take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,885 fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> 886 where 887 Fut: Future, 888 Self: Sized, 889 { 890 assert_stream::<Self::Item, _>(TakeUntil::new(self, fut)) 891 } 892 893 /// Runs this stream to completion, executing the provided asynchronous 894 /// closure for each element on the stream. 895 /// 896 /// The closure provided will be called for each item this stream produces, 897 /// yielding a future. That future will then be executed to completion 898 /// before moving on to the next item. 899 /// 900 /// The returned value is a `Future` where the `Output` type is `()`; it is 901 /// executed entirely for its side effects. 902 /// 903 /// To process each item in the stream and produce another stream instead 904 /// of a single future, use `then` instead. 905 /// 906 /// # Examples 907 /// 908 /// ``` 909 /// # futures::executor::block_on(async { 910 /// use futures::future; 911 /// use futures::stream::{self, StreamExt}; 912 /// 913 /// let mut x = 0; 914 /// 915 /// { 916 /// let fut = stream::repeat(1).take(3).for_each(|item| { 917 /// x += item; 918 /// future::ready(()) 919 /// }); 920 /// fut.await; 921 /// } 922 /// 923 /// assert_eq!(x, 3); 924 /// # }); 925 /// ``` for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,926 fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> 927 where 928 F: FnMut(Self::Item) -> Fut, 929 Fut: Future<Output = ()>, 930 Self: Sized, 931 { 932 assert_future::<(), _>(ForEach::new(self, f)) 933 } 934 935 /// Runs this stream to completion, executing the provided asynchronous 936 /// closure for each element on the stream concurrently as elements become 937 /// available. 938 /// 939 /// This is similar to [`StreamExt::for_each`], but the futures 940 /// produced by the closure are run concurrently (but not in parallel-- 941 /// this combinator does not introduce any threads). 942 /// 943 /// The closure provided will be called for each item this stream produces, 944 /// yielding a future. That future will then be executed to completion 945 /// concurrently with the other futures produced by the closure. 946 /// 947 /// The first argument is an optional limit on the number of concurrent 948 /// futures. If this limit is not `None`, no more than `limit` futures 949 /// will be run concurrently. The `limit` argument is of type 950 /// `Into<Option<usize>>`, and so can be provided as either `None`, 951 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as 952 /// no limit at all, and will have the same result as passing in `None`. 953 /// 954 /// This method is only available when the `std` or `alloc` feature of this 955 /// library is activated, and it is activated by default. 956 /// 957 /// # Examples 958 /// 959 /// ``` 960 /// # futures::executor::block_on(async { 961 /// use futures::channel::oneshot; 962 /// use futures::stream::{self, StreamExt}; 963 /// 964 /// let (tx1, rx1) = oneshot::channel(); 965 /// let (tx2, rx2) = oneshot::channel(); 966 /// let (tx3, rx3) = oneshot::channel(); 967 /// 968 /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent( 969 /// /* limit */ 2, 970 /// |rx| async move { 971 /// rx.await.unwrap(); 972 /// } 973 /// ); 974 /// tx1.send(()).unwrap(); 975 /// tx2.send(()).unwrap(); 976 /// tx3.send(()).unwrap(); 977 /// fut.await; 978 /// # }) 979 /// ``` 980 #[cfg(not(futures_no_atomic_cas))] 981 #[cfg(feature = "alloc")] for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,982 fn for_each_concurrent<Fut, F>( 983 self, 984 limit: impl Into<Option<usize>>, 985 f: F, 986 ) -> ForEachConcurrent<Self, Fut, F> 987 where 988 F: FnMut(Self::Item) -> Fut, 989 Fut: Future<Output = ()>, 990 Self: Sized, 991 { 992 assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f)) 993 } 994 995 /// Creates a new stream of at most `n` items of the underlying stream. 996 /// 997 /// Once `n` items have been yielded from this stream then it will always 998 /// return that the stream is done. 999 /// 1000 /// # Examples 1001 /// 1002 /// ``` 1003 /// # futures::executor::block_on(async { 1004 /// use futures::stream::{self, StreamExt}; 1005 /// 1006 /// let stream = stream::iter(1..=10).take(3); 1007 /// 1008 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 1009 /// # }); 1010 /// ``` take(self, n: usize) -> Take<Self> where Self: Sized,1011 fn take(self, n: usize) -> Take<Self> 1012 where 1013 Self: Sized, 1014 { 1015 assert_stream::<Self::Item, _>(Take::new(self, n)) 1016 } 1017 1018 /// Creates a new stream which skips `n` items of the underlying stream. 1019 /// 1020 /// Once `n` items have been skipped from this stream then it will always 1021 /// return the remaining items on this stream. 1022 /// 1023 /// # Examples 1024 /// 1025 /// ``` 1026 /// # futures::executor::block_on(async { 1027 /// use futures::stream::{self, StreamExt}; 1028 /// 1029 /// let stream = stream::iter(1..=10).skip(5); 1030 /// 1031 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 1032 /// # }); 1033 /// ``` skip(self, n: usize) -> Skip<Self> where Self: Sized,1034 fn skip(self, n: usize) -> Skip<Self> 1035 where 1036 Self: Sized, 1037 { 1038 assert_stream::<Self::Item, _>(Skip::new(self, n)) 1039 } 1040 1041 /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never 1042 /// again be called once it has finished. This method can be used to turn 1043 /// any `Stream` into a `FusedStream`. 1044 /// 1045 /// Normally, once a stream has returned [`None`] from 1046 /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad 1047 /// behavior such as block forever, panic, never return, etc. If it is known 1048 /// that [`poll_next`](Stream::poll_next) may be called after stream 1049 /// has already finished, then this method can be used to ensure that it has 1050 /// defined semantics. 1051 /// 1052 /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream 1053 /// is guaranteed to return [`None`] after the underlying stream has 1054 /// finished. 1055 /// 1056 /// # Examples 1057 /// 1058 /// ``` 1059 /// use futures::executor::block_on_stream; 1060 /// use futures::stream::{self, StreamExt}; 1061 /// use futures::task::Poll; 1062 /// 1063 /// let mut x = 0; 1064 /// let stream = stream::poll_fn(|_| { 1065 /// x += 1; 1066 /// match x { 1067 /// 0..=2 => Poll::Ready(Some(x)), 1068 /// 3 => Poll::Ready(None), 1069 /// _ => panic!("should not happen") 1070 /// } 1071 /// }).fuse(); 1072 /// 1073 /// let mut iter = block_on_stream(stream); 1074 /// assert_eq!(Some(1), iter.next()); 1075 /// assert_eq!(Some(2), iter.next()); 1076 /// assert_eq!(None, iter.next()); 1077 /// assert_eq!(None, iter.next()); 1078 /// // ... 1079 /// ``` fuse(self) -> Fuse<Self> where Self: Sized,1080 fn fuse(self) -> Fuse<Self> 1081 where 1082 Self: Sized, 1083 { 1084 assert_stream::<Self::Item, _>(Fuse::new(self)) 1085 } 1086 1087 /// Borrows a stream, rather than consuming it. 1088 /// 1089 /// This is useful to allow applying stream adaptors while still retaining 1090 /// ownership of the original stream. 1091 /// 1092 /// # Examples 1093 /// 1094 /// ``` 1095 /// # futures::executor::block_on(async { 1096 /// use futures::stream::{self, StreamExt}; 1097 /// 1098 /// let mut stream = stream::iter(1..5); 1099 /// 1100 /// let sum = stream.by_ref() 1101 /// .take(2) 1102 /// .fold(0, |a, b| async move { a + b }) 1103 /// .await; 1104 /// assert_eq!(sum, 3); 1105 /// 1106 /// // You can use the stream again 1107 /// let sum = stream.take(2) 1108 /// .fold(0, |a, b| async move { a + b }) 1109 /// .await; 1110 /// assert_eq!(sum, 7); 1111 /// # }); 1112 /// ``` by_ref(&mut self) -> &mut Self1113 fn by_ref(&mut self) -> &mut Self { 1114 self 1115 } 1116 1117 /// Catches unwinding panics while polling the stream. 1118 /// 1119 /// Caught panic (if any) will be the last element of the resulting stream. 1120 /// 1121 /// In general, panics within a stream can propagate all the way out to the 1122 /// task level. This combinator makes it possible to halt unwinding within 1123 /// the stream itself. It's most commonly used within task executors. This 1124 /// method should not be used for error handling. 1125 /// 1126 /// Note that this method requires the `UnwindSafe` bound from the standard 1127 /// library. This isn't always applied automatically, and the standard 1128 /// library provides an `AssertUnwindSafe` wrapper type to apply it 1129 /// after-the fact. To assist using this method, the [`Stream`] trait is 1130 /// also implemented for `AssertUnwindSafe<St>` where `St` implements 1131 /// [`Stream`]. 1132 /// 1133 /// This method is only available when the `std` feature of this 1134 /// library is activated, and it is activated by default. 1135 /// 1136 /// # Examples 1137 /// 1138 /// ``` 1139 /// # futures::executor::block_on(async { 1140 /// use futures::stream::{self, StreamExt}; 1141 /// 1142 /// let stream = stream::iter(vec![Some(10), None, Some(11)]); 1143 /// // Panic on second element 1144 /// let stream_panicking = stream.map(|o| o.unwrap()); 1145 /// // Collect all the results 1146 /// let stream = stream_panicking.catch_unwind(); 1147 /// 1148 /// let results: Vec<Result<i32, _>> = stream.collect().await; 1149 /// match results[0] { 1150 /// Ok(10) => {} 1151 /// _ => panic!("unexpected result!"), 1152 /// } 1153 /// assert!(results[1].is_err()); 1154 /// assert_eq!(results.len(), 2); 1155 /// # }); 1156 /// ``` 1157 #[cfg(feature = "std")] catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe,1158 fn catch_unwind(self) -> CatchUnwind<Self> 1159 where 1160 Self: Sized + std::panic::UnwindSafe, 1161 { 1162 assert_stream(CatchUnwind::new(self)) 1163 } 1164 1165 /// Wrap the stream in a Box, pinning it. 1166 /// 1167 /// This method is only available when the `std` or `alloc` feature of this 1168 /// library is activated, and it is activated by default. 1169 #[cfg(feature = "alloc")] boxed<'a>(self) -> BoxStream<'a, Self::Item> where Self: Sized + Send + 'a,1170 fn boxed<'a>(self) -> BoxStream<'a, Self::Item> 1171 where 1172 Self: Sized + Send + 'a, 1173 { 1174 assert_stream::<Self::Item, _>(Box::pin(self)) 1175 } 1176 1177 /// Wrap the stream in a Box, pinning it. 1178 /// 1179 /// Similar to `boxed`, but without the `Send` requirement. 1180 /// 1181 /// This method is only available when the `std` or `alloc` feature of this 1182 /// library is activated, and it is activated by default. 1183 #[cfg(feature = "alloc")] boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> where Self: Sized + 'a,1184 fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> 1185 where 1186 Self: Sized + 'a, 1187 { 1188 assert_stream::<Self::Item, _>(Box::pin(self)) 1189 } 1190 1191 /// An adaptor for creating a buffered list of pending futures. 1192 /// 1193 /// If this stream's item can be converted into a future, then this adaptor 1194 /// will buffer up to at most `n` futures and then return the outputs in the 1195 /// same order as the underlying stream. No more than `n` futures will be 1196 /// buffered at any point in time, and less than `n` may also be buffered 1197 /// depending on the state of each future. 1198 /// 1199 /// The returned stream will be a stream of each future's output. 1200 /// 1201 /// This method is only available when the `std` or `alloc` feature of this 1202 /// library is activated, and it is activated by default. 1203 #[cfg(not(futures_no_atomic_cas))] 1204 #[cfg(feature = "alloc")] buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized,1205 fn buffered(self, n: usize) -> Buffered<Self> 1206 where 1207 Self::Item: Future, 1208 Self: Sized, 1209 { 1210 assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n)) 1211 } 1212 1213 /// An adaptor for creating a buffered list of pending futures (unordered). 1214 /// 1215 /// If this stream's item can be converted into a future, then this adaptor 1216 /// will buffer up to `n` futures and then return the outputs in the order 1217 /// in which they complete. No more than `n` futures will be buffered at 1218 /// any point in time, and less than `n` may also be buffered depending on 1219 /// the state of each future. 1220 /// 1221 /// The returned stream will be a stream of each future's output. 1222 /// 1223 /// This method is only available when the `std` or `alloc` feature of this 1224 /// library is activated, and it is activated by default. 1225 /// 1226 /// # Examples 1227 /// 1228 /// ``` 1229 /// # futures::executor::block_on(async { 1230 /// use futures::channel::oneshot; 1231 /// use futures::stream::{self, StreamExt}; 1232 /// 1233 /// let (send_one, recv_one) = oneshot::channel(); 1234 /// let (send_two, recv_two) = oneshot::channel(); 1235 /// 1236 /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]); 1237 /// let mut buffered = stream_of_futures.buffer_unordered(10); 1238 /// 1239 /// send_two.send(2i32)?; 1240 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 1241 /// 1242 /// send_one.send(1i32)?; 1243 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 1244 /// 1245 /// assert_eq!(buffered.next().await, None); 1246 /// # Ok::<(), i32>(()) }).unwrap(); 1247 /// ``` 1248 #[cfg(not(futures_no_atomic_cas))] 1249 #[cfg(feature = "alloc")] buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized,1250 fn buffer_unordered(self, n: usize) -> BufferUnordered<Self> 1251 where 1252 Self::Item: Future, 1253 Self: Sized, 1254 { 1255 assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n)) 1256 } 1257 1258 /// An adapter for zipping two streams together. 1259 /// 1260 /// The zipped stream waits for both streams to produce an item, and then 1261 /// returns that pair. If either stream ends then the zipped stream will 1262 /// also end. 1263 /// 1264 /// # Examples 1265 /// 1266 /// ``` 1267 /// # futures::executor::block_on(async { 1268 /// use futures::stream::{self, StreamExt}; 1269 /// 1270 /// let stream1 = stream::iter(1..=3); 1271 /// let stream2 = stream::iter(5..=10); 1272 /// 1273 /// let vec = stream1.zip(stream2) 1274 /// .collect::<Vec<_>>() 1275 /// .await; 1276 /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec); 1277 /// # }); 1278 /// ``` 1279 /// zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1280 fn zip<St>(self, other: St) -> Zip<Self, St> 1281 where 1282 St: Stream, 1283 Self: Sized, 1284 { 1285 assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other)) 1286 } 1287 1288 /// Adapter for chaining two streams. 1289 /// 1290 /// The resulting stream emits elements from the first stream, and when 1291 /// first stream reaches the end, emits the elements from the second stream. 1292 /// 1293 /// ``` 1294 /// # futures::executor::block_on(async { 1295 /// use futures::stream::{self, StreamExt}; 1296 /// 1297 /// let stream1 = stream::iter(vec![Ok(10), Err(false)]); 1298 /// let stream2 = stream::iter(vec![Err(true), Ok(20)]); 1299 /// 1300 /// let stream = stream1.chain(stream2); 1301 /// 1302 /// let result: Vec<_> = stream.collect().await; 1303 /// assert_eq!(result, vec![ 1304 /// Ok(10), 1305 /// Err(false), 1306 /// Err(true), 1307 /// Ok(20), 1308 /// ]); 1309 /// # }); 1310 /// ``` chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1311 fn chain<St>(self, other: St) -> Chain<Self, St> 1312 where 1313 St: Stream<Item = Self::Item>, 1314 Self: Sized, 1315 { 1316 assert_stream::<Self::Item, _>(Chain::new(self, other)) 1317 } 1318 1319 /// Creates a new stream which exposes a `peek` method. 1320 /// 1321 /// Calling `peek` returns a reference to the next item in the stream. peekable(self) -> Peekable<Self> where Self: Sized,1322 fn peekable(self) -> Peekable<Self> 1323 where 1324 Self: Sized, 1325 { 1326 assert_stream::<Self::Item, _>(Peekable::new(self)) 1327 } 1328 1329 /// An adaptor for chunking up items of the stream inside a vector. 1330 /// 1331 /// This combinator will attempt to pull items from this stream and buffer 1332 /// them into a local vector. At most `capacity` items will get buffered 1333 /// before they're yielded from the returned stream. 1334 /// 1335 /// Note that the vectors returned from this iterator may not always have 1336 /// `capacity` elements. If the underlying stream ended and only a partial 1337 /// vector was created, it'll be returned. Additionally if an error happens 1338 /// from the underlying stream then the currently buffered items will be 1339 /// yielded. 1340 /// 1341 /// This method is only available when the `std` or `alloc` feature of this 1342 /// library is activated, and it is activated by default. 1343 /// 1344 /// # Panics 1345 /// 1346 /// This method will panic if `capacity` is zero. 1347 #[cfg(feature = "alloc")] chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,1348 fn chunks(self, capacity: usize) -> Chunks<Self> 1349 where 1350 Self: Sized, 1351 { 1352 assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity)) 1353 } 1354 1355 /// An adaptor for chunking up ready items of the stream inside a vector. 1356 /// 1357 /// This combinator will attempt to pull ready items from this stream and 1358 /// buffer them into a local vector. At most `capacity` items will get 1359 /// buffered before they're yielded from the returned stream. If underlying 1360 /// stream returns `Poll::Pending`, and collected chunk is not empty, it will 1361 /// be immediately returned. 1362 /// 1363 /// If the underlying stream ended and only a partial vector was created, 1364 /// it'll be returned. Additionally if an error happens from the underlying 1365 /// stream then the currently buffered items will be yielded. 1366 /// 1367 /// This method is only available when the `std` or `alloc` feature of this 1368 /// library is activated, and it is activated by default. 1369 /// 1370 /// # Panics 1371 /// 1372 /// This method will panic if `capacity` is zero. 1373 #[cfg(feature = "alloc")] ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,1374 fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> 1375 where 1376 Self: Sized, 1377 { 1378 assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity)) 1379 } 1380 1381 /// A future that completes after the given stream has been fully processed 1382 /// into the sink and the sink has been flushed and closed. 1383 /// 1384 /// This future will drive the stream to keep producing items until it is 1385 /// exhausted, sending each item to the sink. It will complete once the 1386 /// stream is exhausted, the sink has received and flushed all items, and 1387 /// the sink is closed. Note that neither the original stream nor provided 1388 /// sink will be output by this future. Pass the sink by `Pin<&mut S>` 1389 /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in 1390 /// order to preserve access to the `Sink`. If the stream produces an error, 1391 /// that error will be returned by this future without flushing/closing the sink. 1392 #[cfg(feature = "sink")] 1393 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<Self::Ok, Error = Self::Error>, Self: TryStream + Sized,1394 fn forward<S>(self, sink: S) -> Forward<Self, S> 1395 where 1396 S: Sink<Self::Ok, Error = Self::Error>, 1397 Self: TryStream + Sized, 1398 // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>, 1399 { 1400 // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>` 1401 // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink)) 1402 Forward::new(self, sink) 1403 } 1404 1405 /// Splits this `Stream + Sink` object into separate `Sink` and `Stream` 1406 /// objects. 1407 /// 1408 /// This can be useful when you want to split ownership between tasks, or 1409 /// allow direct interaction between the two objects (e.g. via 1410 /// `Sink::send_all`). 1411 /// 1412 /// This method is only available when the `std` or `alloc` feature of this 1413 /// library is activated, and it is activated by default. 1414 #[cfg(feature = "sink")] 1415 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 1416 #[cfg(not(futures_no_atomic_cas))] 1417 #[cfg(feature = "alloc")] split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized,1418 fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) 1419 where 1420 Self: Sink<Item> + Sized, 1421 { 1422 let (sink, stream) = split::split(self); 1423 ( 1424 crate::sink::assert_sink::<Item, Self::Error, _>(sink), 1425 assert_stream::<Self::Item, _>(stream), 1426 ) 1427 } 1428 1429 /// Do something with each item of this stream, afterwards passing it on. 1430 /// 1431 /// This is similar to the `Iterator::inspect` method in the standard 1432 /// library where it allows easily inspecting each value as it passes 1433 /// through the stream, for example to debug what's going on. inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1434 fn inspect<F>(self, f: F) -> Inspect<Self, F> 1435 where 1436 F: FnMut(&Self::Item), 1437 Self: Sized, 1438 { 1439 assert_stream::<Self::Item, _>(Inspect::new(self, f)) 1440 } 1441 1442 /// Wrap this stream in an `Either` stream, making it the left-hand variant 1443 /// of that `Either`. 1444 /// 1445 /// This can be used in combination with the `right_stream` method to write `if` 1446 /// statements that evaluate to different streams in different branches. left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1447 fn left_stream<B>(self) -> Either<Self, B> 1448 where 1449 B: Stream<Item = Self::Item>, 1450 Self: Sized, 1451 { 1452 assert_stream::<Self::Item, _>(Either::Left(self)) 1453 } 1454 1455 /// Wrap this stream in an `Either` stream, making it the right-hand variant 1456 /// of that `Either`. 1457 /// 1458 /// This can be used in combination with the `left_stream` method to write `if` 1459 /// statements that evaluate to different streams in different branches. right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1460 fn right_stream<B>(self) -> Either<B, Self> 1461 where 1462 B: Stream<Item = Self::Item>, 1463 Self: Sized, 1464 { 1465 assert_stream::<Self::Item, _>(Either::Right(self)) 1466 } 1467 1468 /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`] 1469 /// stream types. poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1470 fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> 1471 where 1472 Self: Unpin, 1473 { 1474 Pin::new(self).poll_next(cx) 1475 } 1476 1477 /// Returns a [`Future`] that resolves when the next item in this stream is 1478 /// ready. 1479 /// 1480 /// This is similar to the [`next`][StreamExt::next] method, but it won't 1481 /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the 1482 /// returned future type will return `true` from 1483 /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing 1484 /// [`select_next_some`][StreamExt::select_next_some] to be easily used with 1485 /// the [`select!`] macro. 1486 /// 1487 /// If the future is polled after this [`Stream`] is empty it will panic. 1488 /// Using the future with a [`FusedFuture`][]-aware primitive like the 1489 /// [`select!`] macro will prevent this. 1490 /// 1491 /// [`FusedFuture`]: futures_core::future::FusedFuture 1492 /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated 1493 /// 1494 /// # Examples 1495 /// 1496 /// ``` 1497 /// # futures::executor::block_on(async { 1498 /// use futures::{future, select}; 1499 /// use futures::stream::{StreamExt, FuturesUnordered}; 1500 /// 1501 /// let mut fut = future::ready(1); 1502 /// let mut async_tasks = FuturesUnordered::new(); 1503 /// let mut total = 0; 1504 /// loop { 1505 /// select! { 1506 /// num = fut => { 1507 /// // First, the `ready` future completes. 1508 /// total += num; 1509 /// // Then we spawn a new task onto `async_tasks`, 1510 /// async_tasks.push(async { 5 }); 1511 /// }, 1512 /// // On the next iteration of the loop, the task we spawned 1513 /// // completes. 1514 /// num = async_tasks.select_next_some() => { 1515 /// total += num; 1516 /// } 1517 /// // Finally, both the `ready` future and `async_tasks` have 1518 /// // finished, so we enter the `complete` branch. 1519 /// complete => break, 1520 /// } 1521 /// } 1522 /// assert_eq!(total, 6); 1523 /// # }); 1524 /// ``` select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1525 fn select_next_some(&mut self) -> SelectNextSome<'_, Self> 1526 where 1527 Self: Unpin + FusedStream, 1528 { 1529 assert_future::<Self::Item, _>(SelectNextSome::new(self)) 1530 } 1531 } 1532