1 //! Streams 2 //! 3 //! This module contains a number of functions for working with `Stream`s, 4 //! including the `StreamExt` trait which adds methods to `Stream` types. 5 6 use crate::future::{assert_future, Either}; 7 #[cfg(feature = "alloc")] 8 use alloc::boxed::Box; 9 use core::pin::Pin; 10 #[cfg(feature = "sink")] 11 use futures_core::stream::TryStream; 12 #[cfg(feature = "alloc")] 13 use futures_core::stream::{BoxStream, LocalBoxStream}; 14 use futures_core::{ 15 future::Future, 16 stream::{FusedStream, Stream}, 17 task::{Context, Poll}, 18 }; 19 #[cfg(feature = "sink")] 20 use futures_sink::Sink; 21 22 use crate::fns::{InspectFn, inspect_fn}; 23 24 mod chain; 25 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 26 pub use self::chain::Chain; 27 28 mod collect; 29 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 30 pub use self::collect::Collect; 31 32 mod concat; 33 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 34 pub use self::concat::Concat; 35 36 mod cycle; 37 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 38 pub use self::cycle::Cycle; 39 40 mod enumerate; 41 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 42 pub use self::enumerate::Enumerate; 43 44 mod filter; 45 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 46 pub use self::filter::Filter; 47 48 mod filter_map; 49 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 50 pub use self::filter_map::FilterMap; 51 52 mod flatten; 53 54 delegate_all!( 55 /// Stream for the [`flatten`](StreamExt::flatten) method. 
56 Flatten<St>( 57 flatten::Flatten<St, St::Item> 58 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)] 59 where St: Stream 60 ); 61 62 mod fold; 63 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 64 pub use self::fold::Fold; 65 66 #[cfg(feature = "sink")] 67 mod forward; 68 69 #[cfg(feature = "sink")] 70 delegate_all!( 71 /// Future for the [`forward`](super::StreamExt::forward) method. 72 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 73 Forward<St, Si>( 74 forward::Forward<St, Si, St::Ok> 75 ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)] 76 where St: TryStream 77 ); 78 79 mod for_each; 80 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 81 pub use self::for_each::ForEach; 82 83 mod fuse; 84 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 85 pub use self::fuse::Fuse; 86 87 mod into_future; 88 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 89 pub use self::into_future::StreamFuture; 90 91 delegate_all!( 92 /// Stream for the [`inspect`](StreamExt::inspect) method. 93 Inspect<St, F>( 94 map::Map<St, InspectFn<F>> 95 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))] 96 ); 97 98 mod map; 99 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 100 pub use self::map::Map; 101 102 delegate_all!( 103 /// Stream for the [`flat_map`](StreamExt::flat_map) method. 104 FlatMap<St, U, F>( 105 flatten::Flatten<Map<St, F>, U> 106 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. 
.)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))] 107 ); 108 109 mod next; 110 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 111 pub use self::next::Next; 112 113 mod select_next_some; 114 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 115 pub use self::select_next_some::SelectNextSome; 116 117 mod peek; 118 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 119 pub use self::peek::{Peek, Peekable}; 120 121 mod skip; 122 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 123 pub use self::skip::Skip; 124 125 mod skip_while; 126 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 127 pub use self::skip_while::SkipWhile; 128 129 mod take; 130 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 131 pub use self::take::Take; 132 133 mod take_while; 134 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 135 pub use self::take_while::TakeWhile; 136 137 mod take_until; 138 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 139 pub use self::take_until::TakeUntil; 140 141 mod then; 142 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 143 pub use self::then::Then; 144 145 mod zip; 146 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 147 pub use self::zip::Zip; 148 149 #[cfg(feature = "alloc")] 150 mod chunks; 151 #[cfg(feature = "alloc")] 152 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 153 pub use self::chunks::Chunks; 154 155 #[cfg(feature = "alloc")] 156 mod ready_chunks; 157 #[cfg(feature = "alloc")] 158 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 159 pub use self::ready_chunks::ReadyChunks; 160 161 mod scan; 162 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 163 pub use self::scan::Scan; 164 165 
cfg_target_has_atomic! {
    #[cfg(feature = "alloc")]
    mod buffer_unordered;
    #[cfg(feature = "alloc")]
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    pub use self::buffer_unordered::BufferUnordered;

    #[cfg(feature = "alloc")]
    mod buffered;
    #[cfg(feature = "alloc")]
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    pub use self::buffered::Buffered;

    #[cfg(feature = "alloc")]
    mod for_each_concurrent;
    #[cfg(feature = "alloc")]
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    pub use self::for_each_concurrent::ForEachConcurrent;

    #[cfg(feature = "sink")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    #[cfg(feature = "alloc")]
    mod split;
    #[cfg(feature = "sink")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    #[cfg(feature = "alloc")]
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    pub use self::split::{ReuniteError, SplitSink, SplitStream};
}

#[cfg(feature = "std")]
mod catch_unwind;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::catch_unwind::CatchUnwind;
use crate::stream::assert_stream;

// Blanket impl: every `Stream` (sized or not) gets the combinators below.
impl<T: ?Sized> StreamExt for T where T: Stream {}

/// An extension trait for `Stream`s that provides a variety of convenient
/// combinator functions.
pub trait StreamExt: Stream {
    /// Creates a future that resolves to the next item in the stream.
    ///
    /// Note that because `next` doesn't take ownership over the stream,
    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
    /// be done by boxing the stream using [`Box::pin`] or
    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
    /// crate.
215 /// 216 /// # Examples 217 /// 218 /// ``` 219 /// # futures::executor::block_on(async { 220 /// use futures::stream::{self, StreamExt}; 221 /// 222 /// let mut stream = stream::iter(1..=3); 223 /// 224 /// assert_eq!(stream.next().await, Some(1)); 225 /// assert_eq!(stream.next().await, Some(2)); 226 /// assert_eq!(stream.next().await, Some(3)); 227 /// assert_eq!(stream.next().await, None); 228 /// # }); 229 /// ``` next(&mut self) -> Next<'_, Self> where Self: Unpin,230 fn next(&mut self) -> Next<'_, Self> 231 where 232 Self: Unpin, 233 { 234 assert_future::<Option<Self::Item>, _>(Next::new(self)) 235 } 236 237 /// Converts this stream into a future of `(next_item, tail_of_stream)`. 238 /// If the stream terminates, then the next item is [`None`]. 239 /// 240 /// The returned future can be used to compose streams and futures together 241 /// by placing everything into the "world of futures". 242 /// 243 /// Note that because `into_future` moves the stream, the [`Stream`] type 244 /// must be [`Unpin`]. If you want to use `into_future` with a 245 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can 246 /// be done by boxing the stream using [`Box::pin`] or 247 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` 248 /// crate. 
249 /// 250 /// # Examples 251 /// 252 /// ``` 253 /// # futures::executor::block_on(async { 254 /// use futures::stream::{self, StreamExt}; 255 /// 256 /// let stream = stream::iter(1..=3); 257 /// 258 /// let (item, stream) = stream.into_future().await; 259 /// assert_eq!(Some(1), item); 260 /// 261 /// let (item, stream) = stream.into_future().await; 262 /// assert_eq!(Some(2), item); 263 /// # }); 264 /// ``` into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,265 fn into_future(self) -> StreamFuture<Self> 266 where 267 Self: Sized + Unpin, 268 { 269 assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self)) 270 } 271 272 /// Maps this stream's items to a different type, returning a new stream of 273 /// the resulting type. 274 /// 275 /// The provided closure is executed over all elements of this stream as 276 /// they are made available. It is executed inline with calls to 277 /// [`poll_next`](Stream::poll_next). 278 /// 279 /// Note that this function consumes the stream passed into it and returns a 280 /// wrapped version of it, similar to the existing `map` methods in the 281 /// standard library. 282 /// 283 /// # Examples 284 /// 285 /// ``` 286 /// # futures::executor::block_on(async { 287 /// use futures::stream::{self, StreamExt}; 288 /// 289 /// let stream = stream::iter(1..=3); 290 /// let stream = stream.map(|x| x + 3); 291 /// 292 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 293 /// # }); 294 /// ``` map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,295 fn map<T, F>(self, f: F) -> Map<Self, F> 296 where 297 F: FnMut(Self::Item) -> T, 298 Self: Sized, 299 { 300 assert_stream::<T, _>(Map::new(self, f)) 301 } 302 303 /// Creates a stream which gives the current iteration count as well as 304 /// the next value. 
305 /// 306 /// The stream returned yields pairs `(i, val)`, where `i` is the 307 /// current index of iteration and `val` is the value returned by the 308 /// stream. 309 /// 310 /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a 311 /// different sized integer, the [`zip`](StreamExt::zip) function provides similar 312 /// functionality. 313 /// 314 /// # Overflow Behavior 315 /// 316 /// The method does no guarding against overflows, so enumerating more than 317 /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If 318 /// debug assertions are enabled, a panic is guaranteed. 319 /// 320 /// # Panics 321 /// 322 /// The returned stream might panic if the to-be-returned index would 323 /// overflow a [`usize`]. 324 /// 325 /// # Examples 326 /// 327 /// ``` 328 /// # futures::executor::block_on(async { 329 /// use futures::stream::{self, StreamExt}; 330 /// 331 /// let stream = stream::iter(vec!['a', 'b', 'c']); 332 /// 333 /// let mut stream = stream.enumerate(); 334 /// 335 /// assert_eq!(stream.next().await, Some((0, 'a'))); 336 /// assert_eq!(stream.next().await, Some((1, 'b'))); 337 /// assert_eq!(stream.next().await, Some((2, 'c'))); 338 /// assert_eq!(stream.next().await, None); 339 /// # }); 340 /// ``` enumerate(self) -> Enumerate<Self> where Self: Sized,341 fn enumerate(self) -> Enumerate<Self> 342 where 343 Self: Sized, 344 { 345 assert_stream::<(usize, Self::Item), _>(Enumerate::new(self)) 346 } 347 348 /// Filters the values produced by this stream according to the provided 349 /// asynchronous predicate. 350 /// 351 /// As values of this stream are made available, the provided predicate `f` 352 /// will be run against them. If the predicate returns a `Future` which 353 /// resolves to `true`, then the stream will yield the value, but if the 354 /// predicate returns a `Future` which resolves to `false`, then the value 355 /// will be discarded and the next value will be produced. 
356 /// 357 /// Note that this function consumes the stream passed into it and returns a 358 /// wrapped version of it, similar to the existing `filter` methods in the 359 /// standard library. 360 /// 361 /// # Examples 362 /// 363 /// ``` 364 /// # futures::executor::block_on(async { 365 /// use futures::future; 366 /// use futures::stream::{self, StreamExt}; 367 /// 368 /// let stream = stream::iter(1..=10); 369 /// let evens = stream.filter(|x| future::ready(x % 2 == 0)); 370 /// 371 /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await); 372 /// # }); 373 /// ``` filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,374 fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> 375 where 376 F: FnMut(&Self::Item) -> Fut, 377 Fut: Future<Output = bool>, 378 Self: Sized, 379 { 380 assert_stream::<Self::Item, _>(Filter::new(self, f)) 381 } 382 383 /// Filters the values produced by this stream while simultaneously mapping 384 /// them to a different type according to the provided asynchronous closure. 385 /// 386 /// As values of this stream are made available, the provided function will 387 /// be run on them. If the future returned by the predicate `f` resolves to 388 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if 389 /// it resolves to [`None`] then the next value will be produced. 390 /// 391 /// Note that this function consumes the stream passed into it and returns a 392 /// wrapped version of it, similar to the existing `filter_map` methods in 393 /// the standard library. 
394 /// 395 /// # Examples 396 /// ``` 397 /// # futures::executor::block_on(async { 398 /// use futures::stream::{self, StreamExt}; 399 /// 400 /// let stream = stream::iter(1..=10); 401 /// let evens = stream.filter_map(|x| async move { 402 /// if x % 2 == 0 { Some(x + 1) } else { None } 403 /// }); 404 /// 405 /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await); 406 /// # }); 407 /// ``` filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,408 fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> 409 where 410 F: FnMut(Self::Item) -> Fut, 411 Fut: Future<Output = Option<T>>, 412 Self: Sized, 413 { 414 assert_stream::<T, _>(FilterMap::new(self, f)) 415 } 416 417 /// Computes from this stream's items new items of a different type using 418 /// an asynchronous closure. 419 /// 420 /// The provided closure `f` will be called with an `Item` once a value is 421 /// ready, it returns a future which will then be run to completion 422 /// to produce the next value on this stream. 423 /// 424 /// Note that this function consumes the stream passed into it and returns a 425 /// wrapped version of it. 
426 /// 427 /// # Examples 428 /// 429 /// ``` 430 /// # futures::executor::block_on(async { 431 /// use futures::stream::{self, StreamExt}; 432 /// 433 /// let stream = stream::iter(1..=3); 434 /// let stream = stream.then(|x| async move { x + 3 }); 435 /// 436 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 437 /// # }); 438 /// ``` then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,439 fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> 440 where 441 F: FnMut(Self::Item) -> Fut, 442 Fut: Future, 443 Self: Sized, 444 { 445 assert_stream::<Fut::Output, _>(Then::new(self, f)) 446 } 447 448 /// Transforms a stream into a collection, returning a 449 /// future representing the result of that computation. 450 /// 451 /// The returned future will be resolved when the stream terminates. 452 /// 453 /// # Examples 454 /// 455 /// ``` 456 /// # futures::executor::block_on(async { 457 /// use futures::channel::mpsc; 458 /// use futures::stream::StreamExt; 459 /// use std::thread; 460 /// 461 /// let (tx, rx) = mpsc::unbounded(); 462 /// 463 /// thread::spawn(move || { 464 /// for i in 1..=5 { 465 /// tx.unbounded_send(i).unwrap(); 466 /// } 467 /// }); 468 /// 469 /// let output = rx.collect::<Vec<i32>>().await; 470 /// assert_eq!(output, vec![1, 2, 3, 4, 5]); 471 /// # }); 472 /// ``` collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,473 fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> 474 where 475 Self: Sized, 476 { 477 assert_future::<C, _>(Collect::new(self)) 478 } 479 480 /// Concatenate all items of a stream into a single extendable 481 /// destination, returning a future representing the end result. 482 /// 483 /// This combinator will extend the first item with the contents 484 /// of all the subsequent results of the stream. If the stream is 485 /// empty, the default value will be returned. 
486 /// 487 /// Works with all collections that implement the 488 /// [`Extend`](std::iter::Extend) trait. 489 /// 490 /// # Examples 491 /// 492 /// ``` 493 /// # futures::executor::block_on(async { 494 /// use futures::channel::mpsc; 495 /// use futures::stream::StreamExt; 496 /// use std::thread; 497 /// 498 /// let (tx, rx) = mpsc::unbounded(); 499 /// 500 /// thread::spawn(move || { 501 /// for i in (0..3).rev() { 502 /// let n = i * 3; 503 /// tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap(); 504 /// } 505 /// }); 506 /// 507 /// let result = rx.concat().await; 508 /// 509 /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]); 510 /// # }); 511 /// ``` concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,512 fn concat(self) -> Concat<Self> 513 where 514 Self: Sized, 515 Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, 516 { 517 assert_future::<Self::Item, _>(Concat::new(self)) 518 } 519 520 /// Repeats a stream endlessly. 521 /// 522 /// The stream never terminates. Note that you likely want to avoid 523 /// usage of `collect` or such on the returned stream as it will exhaust 524 /// available memory as it tries to just fill up all RAM. 
525 /// 526 /// # Examples 527 /// 528 /// ``` 529 /// # futures::executor::block_on(async { 530 /// use futures::stream::{self, StreamExt}; 531 /// let a = [1, 2, 3]; 532 /// let mut s = stream::iter(a.iter()).cycle(); 533 /// 534 /// assert_eq!(s.next().await, Some(&1)); 535 /// assert_eq!(s.next().await, Some(&2)); 536 /// assert_eq!(s.next().await, Some(&3)); 537 /// assert_eq!(s.next().await, Some(&1)); 538 /// assert_eq!(s.next().await, Some(&2)); 539 /// assert_eq!(s.next().await, Some(&3)); 540 /// assert_eq!(s.next().await, Some(&1)); 541 /// # }); 542 /// ``` cycle(self) -> Cycle<Self> where Self: Sized + Clone,543 fn cycle(self) -> Cycle<Self> 544 where 545 Self: Sized + Clone, 546 { 547 assert_stream::<Self::Item, _>(Cycle::new(self)) 548 } 549 550 /// Execute an accumulating asynchronous computation over a stream, 551 /// collecting all the values into one final result. 552 /// 553 /// This combinator will accumulate all values returned by this stream 554 /// according to the closure provided. The initial state is also provided to 555 /// this method and then is returned again by each execution of the closure. 556 /// Once the entire stream has been exhausted the returned future will 557 /// resolve to this value. 
558 /// 559 /// # Examples 560 /// 561 /// ``` 562 /// # futures::executor::block_on(async { 563 /// use futures::stream::{self, StreamExt}; 564 /// 565 /// let number_stream = stream::iter(0..6); 566 /// let sum = number_stream.fold(0, |acc, x| async move { acc + x }); 567 /// assert_eq!(sum.await, 15); 568 /// # }); 569 /// ``` fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,570 fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> 571 where 572 F: FnMut(T, Self::Item) -> Fut, 573 Fut: Future<Output = T>, 574 Self: Sized, 575 { 576 assert_future::<T, _>(Fold::new(self, f, init)) 577 } 578 579 /// Flattens a stream of streams into just one continuous stream. 580 /// 581 /// # Examples 582 /// 583 /// ``` 584 /// # futures::executor::block_on(async { 585 /// use futures::channel::mpsc; 586 /// use futures::stream::StreamExt; 587 /// use std::thread; 588 /// 589 /// let (tx1, rx1) = mpsc::unbounded(); 590 /// let (tx2, rx2) = mpsc::unbounded(); 591 /// let (tx3, rx3) = mpsc::unbounded(); 592 /// 593 /// thread::spawn(move || { 594 /// tx1.unbounded_send(1).unwrap(); 595 /// tx1.unbounded_send(2).unwrap(); 596 /// }); 597 /// thread::spawn(move || { 598 /// tx2.unbounded_send(3).unwrap(); 599 /// tx2.unbounded_send(4).unwrap(); 600 /// }); 601 /// thread::spawn(move || { 602 /// tx3.unbounded_send(rx1).unwrap(); 603 /// tx3.unbounded_send(rx2).unwrap(); 604 /// }); 605 /// 606 /// let output = rx3.flatten().collect::<Vec<i32>>().await; 607 /// assert_eq!(output, vec![1, 2, 3, 4]); 608 /// # }); 609 /// ``` flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,610 fn flatten(self) -> Flatten<Self> 611 where 612 Self::Item: Stream, 613 Self: Sized, 614 { 615 assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) 616 } 617 618 /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. 
619 /// 620 /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, 621 /// you would have to chain combinators like `.map(f).flatten()` while this 622 /// combinator provides ability to write `.flat_map(f)` instead of chaining. 623 /// 624 /// The provided closure which produce inner streams is executed over all elements 625 /// of stream as last inner stream is terminated and next stream item is available. 626 /// 627 /// Note that this function consumes the stream passed into it and returns a 628 /// wrapped version of it, similar to the existing `flat_map` methods in the 629 /// standard library. 630 /// 631 /// # Examples 632 /// 633 /// ``` 634 /// # futures::executor::block_on(async { 635 /// use futures::stream::{self, StreamExt}; 636 /// 637 /// let stream = stream::iter(1..=3); 638 /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x])); 639 /// 640 /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await); 641 /// # }); 642 /// ``` flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,643 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> 644 where 645 F: FnMut(Self::Item) -> U, 646 U: Stream, 647 Self: Sized, 648 { 649 FlatMap::new(self, f) 650 } 651 652 /// Combinator similar to [`StreamExt::fold`] that holds internal state 653 /// and produces a new stream. 654 /// 655 /// Accepts initial state and closure which will be applied to each element 656 /// of the stream until provided closure returns `None`. Once `None` is 657 /// returned, stream will be terminated. 
658 /// 659 /// # Examples 660 /// 661 /// ``` 662 /// # futures::executor::block_on(async { 663 /// use futures::future; 664 /// use futures::stream::{self, StreamExt}; 665 /// 666 /// let stream = stream::iter(1..=10); 667 /// 668 /// let stream = stream.scan(0, |state, x| { 669 /// *state += x; 670 /// future::ready(if *state < 10 { Some(x) } else { None }) 671 /// }); 672 /// 673 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 674 /// # }); 675 /// ``` scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,676 fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> 677 where 678 F: FnMut(&mut S, Self::Item) -> Fut, 679 Fut: Future<Output = Option<B>>, 680 Self: Sized, 681 { 682 Scan::new(self, initial_state, f) 683 } 684 685 /// Skip elements on this stream while the provided asynchronous predicate 686 /// resolves to `true`. 687 /// 688 /// This function, like `Iterator::skip_while`, will skip elements on the 689 /// stream until the predicate `f` resolves to `false`. Once one element 690 /// returns `false`, all future elements will be returned from the underlying 691 /// stream. 
692 /// 693 /// # Examples 694 /// 695 /// ``` 696 /// # futures::executor::block_on(async { 697 /// use futures::future; 698 /// use futures::stream::{self, StreamExt}; 699 /// 700 /// let stream = stream::iter(1..=10); 701 /// 702 /// let stream = stream.skip_while(|x| future::ready(*x <= 5)); 703 /// 704 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 705 /// # }); 706 /// ``` skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,707 fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> 708 where 709 F: FnMut(&Self::Item) -> Fut, 710 Fut: Future<Output = bool>, 711 Self: Sized, 712 { 713 assert_stream::<Self::Item, _>(SkipWhile::new(self, f)) 714 } 715 716 /// Take elements from this stream while the provided asynchronous predicate 717 /// resolves to `true`. 718 /// 719 /// This function, like `Iterator::take_while`, will take elements from the 720 /// stream until the predicate `f` resolves to `false`. Once one element 721 /// returns `false`, it will always return that the stream is done. 722 /// 723 /// # Examples 724 /// 725 /// ``` 726 /// # futures::executor::block_on(async { 727 /// use futures::future; 728 /// use futures::stream::{self, StreamExt}; 729 /// 730 /// let stream = stream::iter(1..=10); 731 /// 732 /// let stream = stream.take_while(|x| future::ready(*x <= 5)); 733 /// 734 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 735 /// # }); 736 /// ``` take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,737 fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> 738 where 739 F: FnMut(&Self::Item) -> Fut, 740 Fut: Future<Output = bool>, 741 Self: Sized, 742 { 743 assert_stream::<Self::Item, _>(TakeWhile::new(self, f)) 744 } 745 746 /// Take elements from this stream until the provided future resolves. 
747 /// 748 /// This function will take elements from the stream until the provided 749 /// stopping future `fut` resolves. Once the `fut` future becomes ready, 750 /// this stream combinator will always return that the stream is done. 751 /// 752 /// The stopping future may return any type. Once the stream is stopped 753 /// the result of the stopping future may be aceessed with `TakeUntil::take_result()`. 754 /// The stream may also be resumed with `TakeUntil::take_future()`. 755 /// See the documentation of [`TakeUntil`] for more information. 756 /// 757 /// # Examples 758 /// 759 /// ``` 760 /// # futures::executor::block_on(async { 761 /// use futures::future; 762 /// use futures::stream::{self, StreamExt}; 763 /// use futures::task::Poll; 764 /// 765 /// let stream = stream::iter(1..=10); 766 /// 767 /// let mut i = 0; 768 /// let stop_fut = future::poll_fn(|_cx| { 769 /// i += 1; 770 /// if i <= 5 { 771 /// Poll::Pending 772 /// } else { 773 /// Poll::Ready(()) 774 /// } 775 /// }); 776 /// 777 /// let stream = stream.take_until(stop_fut); 778 /// 779 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 780 /// # }); 781 /// ``` take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,782 fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> 783 where 784 Fut: Future, 785 Self: Sized, 786 { 787 TakeUntil::new(self, fut) 788 } 789 790 /// Runs this stream to completion, executing the provided asynchronous 791 /// closure for each element on the stream. 792 /// 793 /// The closure provided will be called for each item this stream produces, 794 /// yielding a future. That future will then be executed to completion 795 /// before moving on to the next item. 796 /// 797 /// The returned value is a `Future` where the `Output` type is `()`; it is 798 /// executed entirely for its side effects. 
799 /// 800 /// To process each item in the stream and produce another stream instead 801 /// of a single future, use `then` instead. 802 /// 803 /// # Examples 804 /// 805 /// ``` 806 /// # futures::executor::block_on(async { 807 /// use futures::future; 808 /// use futures::stream::{self, StreamExt}; 809 /// 810 /// let mut x = 0; 811 /// 812 /// { 813 /// let fut = stream::repeat(1).take(3).for_each(|item| { 814 /// x += item; 815 /// future::ready(()) 816 /// }); 817 /// fut.await; 818 /// } 819 /// 820 /// assert_eq!(x, 3); 821 /// # }); 822 /// ``` for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,823 fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> 824 where 825 F: FnMut(Self::Item) -> Fut, 826 Fut: Future<Output = ()>, 827 Self: Sized, 828 { 829 assert_future::<(), _>(ForEach::new(self, f)) 830 } 831 832 /// Runs this stream to completion, executing the provided asynchronous 833 /// closure for each element on the stream concurrently as elements become 834 /// available. 835 /// 836 /// This is similar to [`StreamExt::for_each`], but the futures 837 /// produced by the closure are run concurrently (but not in parallel-- 838 /// this combinator does not introduce any threads). 839 /// 840 /// The closure provided will be called for each item this stream produces, 841 /// yielding a future. That future will then be executed to completion 842 /// concurrently with the other futures produced by the closure. 843 /// 844 /// The first argument is an optional limit on the number of concurrent 845 /// futures. If this limit is not `None`, no more than `limit` futures 846 /// will be run concurrently. The `limit` argument is of type 847 /// `Into<Option<usize>>`, and so can be provided as either `None`, 848 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as 849 /// no limit at all, and will have the same result as passing in `None`. 
850 /// 851 /// This method is only available when the `std` or `alloc` feature of this 852 /// library is activated, and it is activated by default. 853 /// 854 /// # Examples 855 /// 856 /// ``` 857 /// # futures::executor::block_on(async { 858 /// use futures::channel::oneshot; 859 /// use futures::stream::{self, StreamExt}; 860 /// 861 /// let (tx1, rx1) = oneshot::channel(); 862 /// let (tx2, rx2) = oneshot::channel(); 863 /// let (tx3, rx3) = oneshot::channel(); 864 /// 865 /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent( 866 /// /* limit */ 2, 867 /// |rx| async move { 868 /// rx.await.unwrap(); 869 /// } 870 /// ); 871 /// tx1.send(()).unwrap(); 872 /// tx2.send(()).unwrap(); 873 /// tx3.send(()).unwrap(); 874 /// fut.await; 875 /// # }) 876 /// ``` 877 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 878 #[cfg(feature = "alloc")] for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,879 fn for_each_concurrent<Fut, F>( 880 self, 881 limit: impl Into<Option<usize>>, 882 f: F, 883 ) -> ForEachConcurrent<Self, Fut, F> 884 where 885 F: FnMut(Self::Item) -> Fut, 886 Fut: Future<Output = ()>, 887 Self: Sized, 888 { 889 assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f)) 890 } 891 892 /// Creates a new stream of at most `n` items of the underlying stream. 893 /// 894 /// Once `n` items have been yielded from this stream then it will always 895 /// return that the stream is done. 
896 /// 897 /// # Examples 898 /// 899 /// ``` 900 /// # futures::executor::block_on(async { 901 /// use futures::stream::{self, StreamExt}; 902 /// 903 /// let stream = stream::iter(1..=10).take(3); 904 /// 905 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 906 /// # }); 907 /// ``` take(self, n: usize) -> Take<Self> where Self: Sized,908 fn take(self, n: usize) -> Take<Self> 909 where 910 Self: Sized, 911 { 912 assert_stream::<Self::Item, _>(Take::new(self, n)) 913 } 914 915 /// Creates a new stream which skips `n` items of the underlying stream. 916 /// 917 /// Once `n` items have been skipped from this stream then it will always 918 /// return the remaining items on this stream. 919 /// 920 /// # Examples 921 /// 922 /// ``` 923 /// # futures::executor::block_on(async { 924 /// use futures::stream::{self, StreamExt}; 925 /// 926 /// let stream = stream::iter(1..=10).skip(5); 927 /// 928 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 929 /// # }); 930 /// ``` skip(self, n: usize) -> Skip<Self> where Self: Sized,931 fn skip(self, n: usize) -> Skip<Self> 932 where 933 Self: Sized, 934 { 935 assert_stream::<Self::Item, _>(Skip::new(self, n)) 936 } 937 938 /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never 939 /// again be called once it has finished. This method can be used to turn 940 /// any `Stream` into a `FusedStream`. 941 /// 942 /// Normally, once a stream has returned [`None`] from 943 /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad 944 /// behavior such as block forever, panic, never return, etc. If it is known 945 /// that [`poll_next`](Stream::poll_next) may be called after stream 946 /// has already finished, then this method can be used to ensure that it has 947 /// defined semantics. 948 /// 949 /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream 950 /// is guaranteed to return [`None`] after the underlying stream has 951 /// finished. 
952 /// 953 /// # Examples 954 /// 955 /// ``` 956 /// use futures::executor::block_on_stream; 957 /// use futures::stream::{self, StreamExt}; 958 /// use futures::task::Poll; 959 /// 960 /// let mut x = 0; 961 /// let stream = stream::poll_fn(|_| { 962 /// x += 1; 963 /// match x { 964 /// 0..=2 => Poll::Ready(Some(x)), 965 /// 3 => Poll::Ready(None), 966 /// _ => panic!("should not happen") 967 /// } 968 /// }).fuse(); 969 /// 970 /// let mut iter = block_on_stream(stream); 971 /// assert_eq!(Some(1), iter.next()); 972 /// assert_eq!(Some(2), iter.next()); 973 /// assert_eq!(None, iter.next()); 974 /// assert_eq!(None, iter.next()); 975 /// // ... 976 /// ``` fuse(self) -> Fuse<Self> where Self: Sized,977 fn fuse(self) -> Fuse<Self> 978 where 979 Self: Sized, 980 { 981 assert_stream::<Self::Item, _>(Fuse::new(self)) 982 } 983 984 /// Borrows a stream, rather than consuming it. 985 /// 986 /// This is useful to allow applying stream adaptors while still retaining 987 /// ownership of the original stream. 988 /// 989 /// # Examples 990 /// 991 /// ``` 992 /// # futures::executor::block_on(async { 993 /// use futures::stream::{self, StreamExt}; 994 /// 995 /// let mut stream = stream::iter(1..5); 996 /// 997 /// let sum = stream.by_ref() 998 /// .take(2) 999 /// .fold(0, |a, b| async move { a + b }) 1000 /// .await; 1001 /// assert_eq!(sum, 3); 1002 /// 1003 /// // You can use the stream again 1004 /// let sum = stream.take(2) 1005 /// .fold(0, |a, b| async move { a + b }) 1006 /// .await; 1007 /// assert_eq!(sum, 7); 1008 /// # }); 1009 /// ``` by_ref(&mut self) -> &mut Self1010 fn by_ref(&mut self) -> &mut Self { 1011 self 1012 } 1013 1014 /// Catches unwinding panics while polling the stream. 1015 /// 1016 /// Caught panic (if any) will be the last element of the resulting stream. 1017 /// 1018 /// In general, panics within a stream can propagate all the way out to the 1019 /// task level. 
This combinator makes it possible to halt unwinding within 1020 /// the stream itself. It's most commonly used within task executors. This 1021 /// method should not be used for error handling. 1022 /// 1023 /// Note that this method requires the `UnwindSafe` bound from the standard 1024 /// library. This isn't always applied automatically, and the standard 1025 /// library provides an `AssertUnwindSafe` wrapper type to apply it 1026 /// after-the fact. To assist using this method, the [`Stream`] trait is 1027 /// also implemented for `AssertUnwindSafe<St>` where `St` implements 1028 /// [`Stream`]. 1029 /// 1030 /// This method is only available when the `std` feature of this 1031 /// library is activated, and it is activated by default. 1032 /// 1033 /// # Examples 1034 /// 1035 /// ``` 1036 /// # futures::executor::block_on(async { 1037 /// use futures::stream::{self, StreamExt}; 1038 /// 1039 /// let stream = stream::iter(vec![Some(10), None, Some(11)]); 1040 /// // Panic on second element 1041 /// let stream_panicking = stream.map(|o| o.unwrap()); 1042 /// // Collect all the results 1043 /// let stream = stream_panicking.catch_unwind(); 1044 /// 1045 /// let results: Vec<Result<i32, _>> = stream.collect().await; 1046 /// match results[0] { 1047 /// Ok(10) => {} 1048 /// _ => panic!("unexpected result!"), 1049 /// } 1050 /// assert!(results[1].is_err()); 1051 /// assert_eq!(results.len(), 2); 1052 /// # }); 1053 /// ``` 1054 #[cfg(feature = "std")] catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe,1055 fn catch_unwind(self) -> CatchUnwind<Self> 1056 where 1057 Self: Sized + std::panic::UnwindSafe, 1058 { 1059 assert_stream(CatchUnwind::new(self)) 1060 } 1061 1062 /// Wrap the stream in a Box, pinning it. 1063 /// 1064 /// This method is only available when the `std` or `alloc` feature of this 1065 /// library is activated, and it is activated by default. 
1066 #[cfg(feature = "alloc")] boxed<'a>(self) -> BoxStream<'a, Self::Item> where Self: Sized + Send + 'a,1067 fn boxed<'a>(self) -> BoxStream<'a, Self::Item> 1068 where 1069 Self: Sized + Send + 'a, 1070 { 1071 assert_stream::<Self::Item, _>(Box::pin(self)) 1072 } 1073 1074 /// Wrap the stream in a Box, pinning it. 1075 /// 1076 /// Similar to `boxed`, but without the `Send` requirement. 1077 /// 1078 /// This method is only available when the `std` or `alloc` feature of this 1079 /// library is activated, and it is activated by default. 1080 #[cfg(feature = "alloc")] boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> where Self: Sized + 'a,1081 fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> 1082 where 1083 Self: Sized + 'a, 1084 { 1085 assert_stream::<Self::Item, _>(Box::pin(self)) 1086 } 1087 1088 /// An adaptor for creating a buffered list of pending futures. 1089 /// 1090 /// If this stream's item can be converted into a future, then this adaptor 1091 /// will buffer up to at most `n` futures and then return the outputs in the 1092 /// same order as the underlying stream. No more than `n` futures will be 1093 /// buffered at any point in time, and less than `n` may also be buffered 1094 /// depending on the state of each future. 1095 /// 1096 /// The returned stream will be a stream of each future's output. 1097 /// 1098 /// This method is only available when the `std` or `alloc` feature of this 1099 /// library is activated, and it is activated by default. 1100 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 1101 #[cfg(feature = "alloc")] buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized,1102 fn buffered(self, n: usize) -> Buffered<Self> 1103 where 1104 Self::Item: Future, 1105 Self: Sized, 1106 { 1107 assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n)) 1108 } 1109 1110 /// An adaptor for creating a buffered list of pending futures (unordered). 
1111 /// 1112 /// If this stream's item can be converted into a future, then this adaptor 1113 /// will buffer up to `n` futures and then return the outputs in the order 1114 /// in which they complete. No more than `n` futures will be buffered at 1115 /// any point in time, and less than `n` may also be buffered depending on 1116 /// the state of each future. 1117 /// 1118 /// The returned stream will be a stream of each future's output. 1119 /// 1120 /// This method is only available when the `std` or `alloc` feature of this 1121 /// library is activated, and it is activated by default. 1122 /// 1123 /// # Examples 1124 /// 1125 /// ``` 1126 /// # futures::executor::block_on(async { 1127 /// use futures::channel::oneshot; 1128 /// use futures::stream::{self, StreamExt}; 1129 /// 1130 /// let (send_one, recv_one) = oneshot::channel(); 1131 /// let (send_two, recv_two) = oneshot::channel(); 1132 /// 1133 /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]); 1134 /// let mut buffered = stream_of_futures.buffer_unordered(10); 1135 /// 1136 /// send_two.send(2i32)?; 1137 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 1138 /// 1139 /// send_one.send(1i32)?; 1140 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 1141 /// 1142 /// assert_eq!(buffered.next().await, None); 1143 /// # Ok::<(), i32>(()) }).unwrap(); 1144 /// ``` 1145 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 1146 #[cfg(feature = "alloc")] buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized,1147 fn buffer_unordered(self, n: usize) -> BufferUnordered<Self> 1148 where 1149 Self::Item: Future, 1150 Self: Sized, 1151 { 1152 assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n)) 1153 } 1154 1155 /// An adapter for zipping two streams together. 1156 /// 1157 /// The zipped stream waits for both streams to produce an item, and then 1158 /// returns that pair. 
If either stream ends then the zipped stream will 1159 /// also end. 1160 /// 1161 /// # Examples 1162 /// 1163 /// ``` 1164 /// # futures::executor::block_on(async { 1165 /// use futures::stream::{self, StreamExt}; 1166 /// 1167 /// let stream1 = stream::iter(1..=3); 1168 /// let stream2 = stream::iter(5..=10); 1169 /// 1170 /// let vec = stream1.zip(stream2) 1171 /// .collect::<Vec<_>>() 1172 /// .await; 1173 /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec); 1174 /// # }); 1175 /// ``` 1176 /// zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1177 fn zip<St>(self, other: St) -> Zip<Self, St> 1178 where 1179 St: Stream, 1180 Self: Sized, 1181 { 1182 assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other)) 1183 } 1184 1185 /// Adapter for chaining two streams. 1186 /// 1187 /// The resulting stream emits elements from the first stream, and when 1188 /// first stream reaches the end, emits the elements from the second stream. 1189 /// 1190 /// ``` 1191 /// # futures::executor::block_on(async { 1192 /// use futures::stream::{self, StreamExt}; 1193 /// 1194 /// let stream1 = stream::iter(vec![Ok(10), Err(false)]); 1195 /// let stream2 = stream::iter(vec![Err(true), Ok(20)]); 1196 /// 1197 /// let stream = stream1.chain(stream2); 1198 /// 1199 /// let result: Vec<_> = stream.collect().await; 1200 /// assert_eq!(result, vec![ 1201 /// Ok(10), 1202 /// Err(false), 1203 /// Err(true), 1204 /// Ok(20), 1205 /// ]); 1206 /// # }); 1207 /// ``` chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1208 fn chain<St>(self, other: St) -> Chain<Self, St> 1209 where 1210 St: Stream<Item = Self::Item>, 1211 Self: Sized, 1212 { 1213 assert_stream::<Self::Item, _>(Chain::new(self, other)) 1214 } 1215 1216 /// Creates a new stream which exposes a `peek` method. 1217 /// 1218 /// Calling `peek` returns a reference to the next item in the stream. 
peekable(self) -> Peekable<Self> where Self: Sized,1219 fn peekable(self) -> Peekable<Self> 1220 where 1221 Self: Sized, 1222 { 1223 assert_stream::<Self::Item, _>(Peekable::new(self)) 1224 } 1225 1226 /// An adaptor for chunking up items of the stream inside a vector. 1227 /// 1228 /// This combinator will attempt to pull items from this stream and buffer 1229 /// them into a local vector. At most `capacity` items will get buffered 1230 /// before they're yielded from the returned stream. 1231 /// 1232 /// Note that the vectors returned from this iterator may not always have 1233 /// `capacity` elements. If the underlying stream ended and only a partial 1234 /// vector was created, it'll be returned. Additionally if an error happens 1235 /// from the underlying stream then the currently buffered items will be 1236 /// yielded. 1237 /// 1238 /// This method is only available when the `std` or `alloc` feature of this 1239 /// library is activated, and it is activated by default. 1240 /// 1241 /// # Panics 1242 /// 1243 /// This method will panic if `capacity` is zero. 1244 #[cfg(feature = "alloc")] chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,1245 fn chunks(self, capacity: usize) -> Chunks<Self> 1246 where 1247 Self: Sized, 1248 { 1249 assert_stream::<alloc::vec::Vec<Self::Item>, _>(Chunks::new(self, capacity)) 1250 } 1251 1252 /// An adaptor for chunking up ready items of the stream inside a vector. 1253 /// 1254 /// This combinator will attempt to pull ready items from this stream and 1255 /// buffer them into a local vector. At most `capacity` items will get 1256 /// buffered before they're yielded from the returned stream. If underlying 1257 /// stream returns `Poll::Pending`, and collected chunk is not empty, it will 1258 /// be immediately returned. 1259 /// 1260 /// If the underlying stream ended and only a partial vector was created, 1261 /// it'll be returned. 
Additionally if an error happens from the underlying 1262 /// stream then the currently buffered items will be yielded. 1263 /// 1264 /// This method is only available when the `std` or `alloc` feature of this 1265 /// library is activated, and it is activated by default. 1266 /// 1267 /// # Panics 1268 /// 1269 /// This method will panic if `capacity` is zero. 1270 #[cfg(feature = "alloc")] ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,1271 fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> 1272 where 1273 Self: Sized, 1274 { 1275 ReadyChunks::new(self, capacity) 1276 } 1277 1278 /// A future that completes after the given stream has been fully processed 1279 /// into the sink and the sink has been flushed and closed. 1280 /// 1281 /// This future will drive the stream to keep producing items until it is 1282 /// exhausted, sending each item to the sink. It will complete once the 1283 /// stream is exhausted, the sink has received and flushed all items, and 1284 /// the sink is closed. Note that neither the original stream nor provided 1285 /// sink will be output by this future. Pass the sink by `Pin<&mut S>` 1286 /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in 1287 /// order to preserve access to the `Sink`. 1288 #[cfg(feature = "sink")] 1289 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<Self::Ok, Error = Self::Error>, Self: TryStream + Sized,1290 fn forward<S>(self, sink: S) -> Forward<Self, S> 1291 where 1292 S: Sink<Self::Ok, Error = Self::Error>, 1293 Self: TryStream + Sized, 1294 { 1295 Forward::new(self, sink) 1296 } 1297 1298 /// Splits this `Stream + Sink` object into separate `Sink` and `Stream` 1299 /// objects. 1300 /// 1301 /// This can be useful when you want to split ownership between tasks, or 1302 /// allow direct interaction between the two objects (e.g. via 1303 /// `Sink::send_all`). 
1304 /// 1305 /// This method is only available when the `std` or `alloc` feature of this 1306 /// library is activated, and it is activated by default. 1307 #[cfg(feature = "sink")] 1308 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 1309 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 1310 #[cfg(feature = "alloc")] split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized,1311 fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) 1312 where 1313 Self: Sink<Item> + Sized, 1314 { 1315 let (sink, stream) = split::split(self); 1316 (sink, assert_stream::<Self::Item, _>(stream)) 1317 } 1318 1319 /// Do something with each item of this stream, afterwards passing it on. 1320 /// 1321 /// This is similar to the `Iterator::inspect` method in the standard 1322 /// library where it allows easily inspecting each value as it passes 1323 /// through the stream, for example to debug what's going on. inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1324 fn inspect<F>(self, f: F) -> Inspect<Self, F> 1325 where 1326 F: FnMut(&Self::Item), 1327 Self: Sized, 1328 { 1329 assert_stream::<Self::Item, _>(Inspect::new(self, f)) 1330 } 1331 1332 /// Wrap this stream in an `Either` stream, making it the left-hand variant 1333 /// of that `Either`. 1334 /// 1335 /// This can be used in combination with the `right_stream` method to write `if` 1336 /// statements that evaluate to different streams in different branches. left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1337 fn left_stream<B>(self) -> Either<Self, B> 1338 where 1339 B: Stream<Item = Self::Item>, 1340 Self: Sized, 1341 { 1342 assert_stream::<Self::Item, _>(Either::Left(self)) 1343 } 1344 1345 /// Wrap this stream in an `Either` stream, making it the right-hand variant 1346 /// of that `Either`. 
1347 /// 1348 /// This can be used in combination with the `left_stream` method to write `if` 1349 /// statements that evaluate to different streams in different branches. right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1350 fn right_stream<B>(self) -> Either<B, Self> 1351 where 1352 B: Stream<Item = Self::Item>, 1353 Self: Sized, 1354 { 1355 assert_stream::<Self::Item, _>(Either::Right(self)) 1356 } 1357 1358 /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`] 1359 /// stream types. poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1360 fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> 1361 where 1362 Self: Unpin, 1363 { 1364 Pin::new(self).poll_next(cx) 1365 } 1366 1367 /// Returns a [`Future`] that resolves when the next item in this stream is 1368 /// ready. 1369 /// 1370 /// This is similar to the [`next`][StreamExt::next] method, but it won't 1371 /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the 1372 /// returned future type will return `true` from 1373 /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing 1374 /// [`select_next_some`][StreamExt::select_next_some] to be easily used with 1375 /// the [`select!`] macro. 1376 /// 1377 /// If the future is polled after this [`Stream`] is empty it will panic. 1378 /// Using the future with a [`FusedFuture`][]-aware primitive like the 1379 /// [`select!`] macro will prevent this. 
1380 /// 1381 /// [`FusedFuture`]: futures_core::future::FusedFuture 1382 /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated 1383 /// 1384 /// # Examples 1385 /// 1386 /// ``` 1387 /// # futures::executor::block_on(async { 1388 /// use futures::{future, select}; 1389 /// use futures::stream::{StreamExt, FuturesUnordered}; 1390 /// 1391 /// let mut fut = future::ready(1); 1392 /// let mut async_tasks = FuturesUnordered::new(); 1393 /// let mut total = 0; 1394 /// loop { 1395 /// select! { 1396 /// num = fut => { 1397 /// // First, the `ready` future completes. 1398 /// total += num; 1399 /// // Then we spawn a new task onto `async_tasks`, 1400 /// async_tasks.push(async { 5 }); 1401 /// }, 1402 /// // On the next iteration of the loop, the task we spawned 1403 /// // completes. 1404 /// num = async_tasks.select_next_some() => { 1405 /// total += num; 1406 /// } 1407 /// // Finally, both the `ready` future and `async_tasks` have 1408 /// // finished, so we enter the `complete` branch. 1409 /// complete => break, 1410 /// } 1411 /// } 1412 /// assert_eq!(total, 6); 1413 /// # }); 1414 /// ``` select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1415 fn select_next_some(&mut self) -> SelectNextSome<'_, Self> 1416 where 1417 Self: Unpin + FusedStream, 1418 { 1419 SelectNextSome::new(self) 1420 } 1421 } 1422