1 //! Composable asynchronous iteration. 2 //! 3 //! This module is an async version of [`std::iter`]. 4 //! 5 //! If you've found yourself with an asynchronous collection of some kind, 6 //! and needed to perform an operation on the elements of said collection, 7 //! you'll quickly run into 'streams'. Streams are heavily used in idiomatic 8 //! asynchronous Rust code, so it's worth becoming familiar with them. 9 //! 10 //! Before explaining more, let's talk about how this module is structured: 11 //! 12 //! # Organization 13 //! 14 //! This module is largely organized by type: 15 //! 16 //! * [Traits] are the core portion: these traits define what kind of streams 17 //! exist and what you can do with them. The methods of these traits are worth 18 //! putting some extra study time into. 19 //! * [Functions] provide some helpful ways to create some basic streams. 20 //! * [Structs] are often the return types of the various methods on this 21 //! module's traits. You'll usually want to look at the method that creates 22 //! the `struct`, rather than the `struct` itself. For more detail about why, 23 //! see '[Implementing Stream](#implementing-stream)'. 24 //! 25 //! [Traits]: #traits 26 //! [Functions]: #functions 27 //! [Structs]: #structs 28 //! 29 //! That's it! Let's dig into streams. 30 //! 31 //! # Stream 32 //! 33 //! The heart and soul of this module is the [`Stream`] trait. The core of 34 //! [`Stream`] looks like this: 35 //! 36 //! ``` 37 //! # use async_std::task::{Context, Poll}; 38 //! # use std::pin::Pin; 39 //! trait Stream { 40 //! type Item; 41 //! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; 42 //! } 43 //! ``` 44 //! 45 //! A stream has a method, [`next`], which when called, returns an 46 //! [`Poll`]<[`Option`]`<Item>>`. [`next`] will return `Ready(Some(Item))` 47 //! as long as there are elements, and once they've all been exhausted, will 48 //! return `None` to indicate that iteration is finished. If we're waiting on 49 //! something asynchronous to resolve `Pending` is returned. 50 //! 51 //! Individual streams may choose to resume iteration, and so calling 52 //! [`next`] again may or may not eventually start returning `Ready(Some(Item))` 53 //! again at some point. 54 //! 55 //! [`Stream`]'s full definition includes a number of other methods as well, 56 //! but they are default methods, built on top of [`next`], and so you get 57 //! them for free. 58 //! 59 //! Streams are also composable, and it's common to chain them together to do 60 //! more complex forms of processing. See the [Adapters](#adapters) section 61 //! below for more details. 62 //! 63 //! [`Poll`]: ../task/enum.Poll.html 64 //! [`Stream`]: trait.Stream.html 65 //! [`next`]: trait.Stream.html#tymethod.next 66 //! [`Option`]: ../../std/option/enum.Option.html 67 //! 68 //! # The three forms of streaming 69 //! 70 //! There are three common methods which can create streams from a collection: 71 //! 72 //! * `stream()`, which iterates over `&T`. 73 //! * `stream_mut()`, which iterates over `&mut T`. 74 //! * `into_stream()`, which iterates over `T`. 75 //! 76 //! Various things in async-std may implement one or more of the 77 //! three, where appropriate. 78 //! 79 //! # Implementing Stream 80 //! 81 //! Creating a stream of your own involves two steps: creating a `struct` to 82 //! hold the stream's state, and then `impl`ementing [`Stream`] for that 83 //! `struct`. This is why there are so many `struct`s in this module: there is 84 //! one for each stream and iterator adapter. 85 //! 86 //! Let's make a stream named `Counter` which counts from `1` to `5`: 87 //! 88 //! ``` 89 //! # use async_std::prelude::*; 90 //! # use async_std::task::{Context, Poll}; 91 //! # use std::pin::Pin; 92 //! // First, the struct: 93 //! 94 //! /// A stream which counts from one to five 95 //! struct Counter { 96 //! count: usize, 97 //! } 98 //! 99 //! // we want our count to start at one, so let's add a new() method to help. 100 //! // This isn't strictly necessary, but is convenient. Note that we start 101 //! // `count` at zero, we'll see why in `next()`'s implementation below. 102 //! impl Counter { 103 //! fn new() -> Counter { 104 //! Counter { count: 0 } 105 //! } 106 //! } 107 //! 108 //! // Then, we implement `Stream` for our `Counter`: 109 //! 110 //! impl Stream for Counter { 111 //! // we will be counting with usize 112 //! type Item = usize; 113 //! 114 //! // poll_next() is the only required method 115 //! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 116 //! // Increment our count. This is why we started at zero. 117 //! self.count += 1; 118 //! 119 //! // Check to see if we've finished counting or not. 120 //! if self.count < 6 { 121 //! Poll::Ready(Some(self.count)) 122 //! } else { 123 //! Poll::Ready(None) 124 //! } 125 //! } 126 //! } 127 //! 128 //! // And now we can use it! 129 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 130 //! # 131 //! let mut counter = Counter::new(); 132 //! 133 //! let x = counter.next().await.unwrap(); 134 //! println!("{}", x); 135 //! 136 //! let x = counter.next().await.unwrap(); 137 //! println!("{}", x); 138 //! 139 //! let x = counter.next().await.unwrap(); 140 //! println!("{}", x); 141 //! 142 //! let x = counter.next().await.unwrap(); 143 //! println!("{}", x); 144 //! 145 //! let x = counter.next().await.unwrap(); 146 //! println!("{}", x); 147 //! # 148 //! # Ok(()) }) } 149 //! ``` 150 //! 151 //! This will print `1` through `5`, each on their own line. 152 //! 153 //! Calling `next().await` this way gets repetitive. Rust has a construct which 154 //! can call `next()` on your stream, until it reaches `None`. Let's go over 155 //! that next. 156 //! 157 //! # while let Loops and IntoStream 158 //! 159 //! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic 160 //! example of `while let`: 161 //! 162 //! ``` 163 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 164 //! # 165 //! # use async_std::prelude::*; 166 //! # use async_std::stream; 167 //! let mut values = stream::repeat(1u8).take(5); 168 //! 169 //! while let Some(x) = values.next().await { 170 //! println!("{}", x); 171 //! } 172 //! # 173 //! # Ok(()) }) } 174 //! ``` 175 //! 176 //! This will print the numbers one through five, each on their own line. But 177 //! you'll notice something here: we never called anything on our vector to 178 //! produce a stream. What gives? 179 //! 180 //! There's a trait in the standard library for converting something into an 181 //! stream: [`IntoStream`]. This trait has one method, [`into_stream`], 182 //! which converts the thing implementing [`IntoStream`] into a stream. 183 //! 184 //! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler 185 //! support yet. This means that automatic conversions like with `for` loops 186 //! doesn't occur yet, and `into_stream` will always have to be called manually. 187 //! 188 //! [`IntoStream`]: trait.IntoStream.html 189 //! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream 190 //! 191 //! # Adapters 192 //! 193 //! Functions which take an [`Stream`] and return another [`Stream`] are 194 //! often called 'stream adapters', as they are a form of the 'adapter 195 //! pattern'. 196 //! 197 //! Common stream adapters include [`map`], [`take`], and [`filter`]. 198 //! For more, see their documentation. 199 //! 200 //! [`map`]: trait.Stream.html#method.map 201 //! [`take`]: trait.Stream.html#method.take 202 //! [`filter`]: trait.Stream.html#method.filter 203 //! 204 //! # Laziness 205 //! 206 //! Streams (and stream [adapters](#adapters)) are *lazy*. This means that 207 //! just creating a stream doesn't _do_ a whole lot. Nothing really happens 208 //! until you call [`next`]. This is sometimes a source of confusion when 209 //! creating a stream solely for its side effects. For example, the [`map`] 210 //! method calls a closure on each element it iterates over: 211 //! 212 //! ``` 213 //! # #![allow(unused_must_use)] 214 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 215 //! # 216 //! # use async_std::prelude::*; 217 //! # use async_std::stream; 218 //! let v = stream::repeat(1u8).take(5); 219 //! v.map(|x| println!("{}", x)); 220 //! # 221 //! # Ok(()) }) } 222 //! ``` 223 //! 224 //! This will not print any values, as we only created a stream, rather than 225 //! using it. The compiler will warn us about this kind of behavior: 226 //! 227 //! ```text 228 //! warning: unused result that must be used: streams are lazy and 229 //! do nothing unless consumed 230 //! ``` 231 //! 232 //! The idiomatic way to write a [`map`] for its side effects is to use a 233 //! `while let` loop instead: 234 //! 235 //! ``` 236 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 237 //! # 238 //! # use async_std::prelude::*; 239 //! # use async_std::stream; 240 //! let mut v = stream::repeat(1u8).take(5); 241 //! 242 //! while let Some(x) = &v.next().await { 243 //! println!("{}", x); 244 //! } 245 //! # 246 //! # Ok(()) }) } 247 //! ``` 248 //! 249 //! [`map`]: trait.Stream.html#method.map 250 //! 251 //! The two most common ways to evaluate a stream are to use a `while let` loop 252 //! like this, or using the [`collect`] method to produce a new collection. 253 //! 254 //! [`collect`]: trait.Stream.html#method.collect 255 //! 256 //! # Infinity 257 //! 258 //! Streams do not have to be finite. As an example, an repeat stream is 259 //! an infinite stream: 260 //! 261 //! ``` 262 //! # use async_std::stream; 263 //! let numbers = stream::repeat(1u8); 264 //! ``` 265 //! 266 //! It is common to use the [`take`] stream adapter to turn an infinite 267 //! stream into a finite one: 268 //! 269 //! ``` 270 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 271 //! # 272 //! # use async_std::prelude::*; 273 //! # use async_std::stream; 274 //! let numbers = stream::repeat(1u8); 275 //! let mut five_numbers = numbers.take(5); 276 //! 277 //! while let Some(number) = five_numbers.next().await { 278 //! println!("{}", number); 279 //! } 280 //! # 281 //! # Ok(()) }) } 282 //! ``` 283 //! 284 //! This will print the numbers `0` through `4`, each on their own line. 285 //! 286 //! Bear in mind that methods on infinite streams, even those for which a 287 //! result can be determined mathematically in finite time, may not terminate. 288 //! Specifically, methods such as [`min`], which in the general case require 289 //! traversing every element in the stream, are likely not to return 290 //! successfully for any infinite streams. 291 //! 292 //! ```ignore 293 //! let ones = async_std::stream::repeat(1); 294 //! let least = ones.min().await.unwrap(); // Oh no! An infinite loop! 295 //! // `ones.min()` causes an infinite loop, so we won't reach this point! 296 //! println!("The smallest number one is {}.", least); 297 //! ``` 298 //! 299 //! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html 300 //! [`take`]: trait.Stream.html#method.take 301 //! [`min`]: trait.Stream.html#method.min 302 303 pub use empty::{empty, Empty}; 304 pub use from_fn::{from_fn, FromFn}; 305 pub use from_iter::{from_iter, FromIter}; 306 pub use once::{once, Once}; 307 pub use repeat::{repeat, Repeat}; 308 pub use repeat_with::{repeat_with, RepeatWith}; 309 pub use stream::*; 310 311 pub(crate) mod stream; 312 313 mod empty; 314 mod from_fn; 315 mod from_iter; 316 mod once; 317 mod repeat; 318 mod repeat_with; 319 320 cfg_unstable! { 321 mod double_ended_stream; 322 mod exact_size_stream; 323 mod extend; 324 mod from_stream; 325 mod fused_stream; 326 mod interval; 327 mod into_stream; 328 mod pending; 329 mod product; 330 mod successors; 331 mod sum; 332 333 pub use double_ended_stream::DoubleEndedStream; 334 pub use exact_size_stream::ExactSizeStream; 335 pub use extend::{extend, Extend}; 336 pub use from_stream::FromStream; 337 pub use fused_stream::FusedStream; 338 pub use interval::{interval, Interval}; 339 pub use into_stream::IntoStream; 340 pub use pending::{pending, Pending}; 341 pub use product::Product; 342 pub use stream::Merge; 343 pub use successors::{successors, Successors}; 344 pub use sum::Sum; 345 } 346