1 //! Composable asynchronous iteration.
2 //!
3 //! This module is an async version of [`std::iter`].
4 //!
5 //! If you've found yourself with an asynchronous collection of some kind,
6 //! and needed to perform an operation on the elements of said collection,
7 //! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
8 //! asynchronous Rust code, so it's worth becoming familiar with them.
9 //!
10 //! Before explaining more, let's talk about how this module is structured:
11 //!
12 //! # Organization
13 //!
14 //! This module is largely organized by type:
15 //!
16 //! * [Traits] are the core portion: these traits define what kind of streams
17 //!   exist and what you can do with them. The methods of these traits are worth
18 //!   putting some extra study time into.
19 //! * [Functions] provide some helpful ways to create some basic streams.
20 //! * [Structs] are often the return types of the various methods on this
21 //!   module's traits. You'll usually want to look at the method that creates
22 //!   the `struct`, rather than the `struct` itself. For more detail about why,
23 //!   see '[Implementing Stream](#implementing-stream)'.
24 //!
25 //! [Traits]: #traits
26 //! [Functions]: #functions
27 //! [Structs]: #structs
28 //!
29 //! That's it! Let's dig into streams.
30 //!
31 //! # Stream
32 //!
33 //! The heart and soul of this module is the [`Stream`] trait. The core of
34 //! [`Stream`] looks like this:
35 //!
36 //! ```
37 //! # use async_std::task::{Context, Poll};
38 //! # use std::pin::Pin;
39 //! trait Stream {
40 //!     type Item;
41 //!     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
42 //! }
43 //! ```
44 //!
45 //! A stream has a method, [`next`], which when called, returns an
46 //! [`Poll`]<[`Option`]`<Item>>`. [`next`] will return `Ready(Some(Item))`
47 //! as long as there are elements, and once they've all been exhausted, will
48 //! return `None` to indicate that iteration is finished. If we're waiting on
49 //! something asynchronous to resolve `Pending` is returned.
50 //!
51 //! Individual streams may choose to resume iteration, and so calling
52 //! [`next`] again may or may not eventually start returning `Ready(Some(Item))`
53 //! again at some point.
54 //!
55 //! [`Stream`]'s full definition includes a number of other methods as well,
56 //! but they are default methods, built on top of [`next`], and so you get
57 //! them for free.
58 //!
59 //! Streams are also composable, and it's common to chain them together to do
60 //! more complex forms of processing. See the [Adapters](#adapters) section
61 //! below for more details.
62 //!
63 //! [`Poll`]: ../task/enum.Poll.html
64 //! [`Stream`]: trait.Stream.html
65 //! [`next`]: trait.Stream.html#tymethod.next
66 //! [`Option`]: ../../std/option/enum.Option.html
67 //!
68 //! # The three forms of streaming
69 //!
70 //! There are three common methods which can create streams from a collection:
71 //!
72 //! * `stream()`, which iterates over `&T`.
73 //! * `stream_mut()`, which iterates over `&mut T`.
74 //! * `into_stream()`, which iterates over `T`.
75 //!
76 //! Various things in async-std may implement one or more of the
77 //! three, where appropriate.
78 //!
79 //! # Implementing Stream
80 //!
81 //! Creating a stream of your own involves two steps: creating a `struct` to
82 //! hold the stream's state, and then `impl`ementing [`Stream`] for that
83 //! `struct`. This is why there are so many `struct`s in this module: there is
84 //! one for each stream and iterator adapter.
85 //!
86 //! Let's make a stream named `Counter` which counts from `1` to `5`:
87 //!
88 //! ```
89 //! # use async_std::prelude::*;
90 //! # use async_std::task::{Context, Poll};
91 //! # use std::pin::Pin;
92 //! // First, the struct:
93 //!
94 //! /// A stream which counts from one to five
95 //! struct Counter {
96 //!     count: usize,
97 //! }
98 //!
99 //! // we want our count to start at one, so let's add a new() method to help.
100 //! // This isn't strictly necessary, but is convenient. Note that we start
101 //! // `count` at zero, we'll see why in `next()`'s implementation below.
102 //! impl Counter {
103 //!     fn new() -> Counter {
104 //!         Counter { count: 0 }
105 //!     }
106 //! }
107 //!
108 //! // Then, we implement `Stream` for our `Counter`:
109 //!
110 //! impl Stream for Counter {
111 //!     // we will be counting with usize
112 //!     type Item = usize;
113 //!
114 //!     // poll_next() is the only required method
115 //!     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
116 //!         // Increment our count. This is why we started at zero.
117 //!         self.count += 1;
118 //!
119 //!         // Check to see if we've finished counting or not.
120 //!         if self.count < 6 {
121 //!             Poll::Ready(Some(self.count))
122 //!         } else {
123 //!             Poll::Ready(None)
124 //!         }
125 //!     }
126 //! }
127 //!
128 //! // And now we can use it!
129 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
130 //! #
131 //! let mut counter = Counter::new();
132 //!
133 //! let x = counter.next().await.unwrap();
134 //! println!("{}", x);
135 //!
136 //! let x = counter.next().await.unwrap();
137 //! println!("{}", x);
138 //!
139 //! let x = counter.next().await.unwrap();
140 //! println!("{}", x);
141 //!
142 //! let x = counter.next().await.unwrap();
143 //! println!("{}", x);
144 //!
145 //! let x = counter.next().await.unwrap();
146 //! println!("{}", x);
147 //! #
148 //! # Ok(()) }) }
149 //! ```
150 //!
151 //! This will print `1` through `5`, each on their own line.
152 //!
153 //! Calling `next().await` this way gets repetitive. Rust has a construct which
154 //! can call `next()` on your stream, until it reaches `None`. Let's go over
155 //! that next.
156 //!
157 //! # while let Loops and IntoStream
158 //!
159 //! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic
160 //! example of `while let`:
161 //!
162 //! ```
163 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
164 //! #
165 //! # use async_std::prelude::*;
166 //! # use async_std::stream;
167 //! let mut values = stream::repeat(1u8).take(5);
168 //!
169 //! while let Some(x) = values.next().await {
170 //!     println!("{}", x);
171 //! }
172 //! #
173 //! # Ok(()) }) }
174 //! ```
175 //!
176 //! This will print the numbers one through five, each on their own line. But
177 //! you'll notice something here: we never called anything on our vector to
178 //! produce a stream. What gives?
179 //!
180 //! There's a trait in the standard library for converting something into an
181 //! stream: [`IntoStream`]. This trait has one method, [`into_stream`],
182 //! which converts the thing implementing [`IntoStream`] into a stream.
183 //!
184 //! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler
185 //! support yet. This means that automatic conversions like with `for` loops
186 //! doesn't occur yet, and `into_stream` will always have to be called manually.
187 //!
188 //! [`IntoStream`]: trait.IntoStream.html
189 //! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream
190 //!
191 //! # Adapters
192 //!
193 //! Functions which take an [`Stream`] and return another [`Stream`] are
194 //! often called 'stream adapters', as they are a form of the 'adapter
195 //! pattern'.
196 //!
197 //! Common stream adapters include [`map`], [`take`], and [`filter`].
198 //! For more, see their documentation.
199 //!
200 //! [`map`]: trait.Stream.html#method.map
201 //! [`take`]: trait.Stream.html#method.take
202 //! [`filter`]: trait.Stream.html#method.filter
203 //!
204 //! # Laziness
205 //!
206 //! Streams (and stream [adapters](#adapters)) are *lazy*. This means that
207 //! just creating a stream doesn't _do_ a whole lot. Nothing really happens
208 //! until you call [`next`]. This is sometimes a source of confusion when
209 //! creating a stream solely for its side effects. For example, the [`map`]
210 //! method calls a closure on each element it iterates over:
211 //!
212 //! ```
213 //! # #![allow(unused_must_use)]
214 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
215 //! #
216 //! # use async_std::prelude::*;
217 //! # use async_std::stream;
218 //! let v = stream::repeat(1u8).take(5);
219 //! v.map(|x| println!("{}", x));
220 //! #
221 //! # Ok(()) }) }
222 //! ```
223 //!
224 //! This will not print any values, as we only created a stream, rather than
225 //! using it. The compiler will warn us about this kind of behavior:
226 //!
227 //! ```text
228 //! warning: unused result that must be used: streams are lazy and
229 //! do nothing unless consumed
230 //! ```
231 //!
232 //! The idiomatic way to write a [`map`] for its side effects is to use a
233 //! `while let` loop instead:
234 //!
235 //! ```
236 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
237 //! #
238 //! # use async_std::prelude::*;
239 //! # use async_std::stream;
240 //! let mut v = stream::repeat(1u8).take(5);
241 //!
242 //! while let Some(x) = &v.next().await {
243 //!     println!("{}", x);
244 //! }
245 //! #
246 //! # Ok(()) }) }
247 //! ```
248 //!
249 //! [`map`]: trait.Stream.html#method.map
250 //!
251 //! The two most common ways to evaluate a stream are to use a `while let` loop
252 //! like this, or using the [`collect`] method to produce a new collection.
253 //!
254 //! [`collect`]: trait.Stream.html#method.collect
255 //!
256 //! # Infinity
257 //!
258 //! Streams do not have to be finite. As an example, an repeat stream is
259 //! an infinite stream:
260 //!
261 //! ```
262 //! # use async_std::stream;
263 //! let numbers = stream::repeat(1u8);
264 //! ```
265 //!
266 //! It is common to use the [`take`] stream adapter to turn an infinite
267 //! stream into a finite one:
268 //!
269 //! ```
270 //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
271 //! #
272 //! # use async_std::prelude::*;
273 //! # use async_std::stream;
274 //! let numbers = stream::repeat(1u8);
275 //! let mut five_numbers = numbers.take(5);
276 //!
277 //! while let Some(number) = five_numbers.next().await {
278 //!     println!("{}", number);
279 //! }
280 //! #
281 //! # Ok(()) }) }
282 //! ```
283 //!
284 //! This will print the numbers `0` through `4`, each on their own line.
285 //!
286 //! Bear in mind that methods on infinite streams, even those for which a
287 //! result can be determined mathematically in finite time, may not terminate.
288 //! Specifically, methods such as [`min`], which in the general case require
289 //! traversing every element in the stream, are likely not to return
290 //! successfully for any infinite streams.
291 //!
292 //! ```ignore
293 //! let ones = async_std::stream::repeat(1);
294 //! let least = ones.min().await.unwrap(); // Oh no! An infinite loop!
295 //! // `ones.min()` causes an infinite loop, so we won't reach this point!
296 //! println!("The smallest number one is {}.", least);
297 //! ```
298 //!
299 //! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
300 //! [`take`]: trait.Stream.html#method.take
301 //! [`min`]: trait.Stream.html#method.min
302 
303 pub use empty::{empty, Empty};
304 pub use from_fn::{from_fn, FromFn};
305 pub use from_iter::{from_iter, FromIter};
306 pub use once::{once, Once};
307 pub use repeat::{repeat, Repeat};
308 pub use repeat_with::{repeat_with, RepeatWith};
309 pub use stream::*;
310 
311 pub(crate) mod stream;
312 
313 mod empty;
314 mod from_fn;
315 mod from_iter;
316 mod once;
317 mod repeat;
318 mod repeat_with;
319 
320 cfg_unstable! {
321     mod double_ended_stream;
322     mod exact_size_stream;
323     mod extend;
324     mod from_stream;
325     mod fused_stream;
326     mod interval;
327     mod into_stream;
328     mod pending;
329     mod product;
330     mod successors;
331     mod sum;
332 
333     pub use double_ended_stream::DoubleEndedStream;
334     pub use exact_size_stream::ExactSizeStream;
335     pub use extend::{extend, Extend};
336     pub use from_stream::FromStream;
337     pub use fused_stream::FusedStream;
338     pub use interval::{interval, Interval};
339     pub use into_stream::IntoStream;
340     pub use pending::{pending, Pending};
341     pub use product::Product;
342     pub use stream::Merge;
343     pub use successors::{successors, Successors};
344     pub use sum::Sum;
345 }
346