1 //! Futures
2 //!
3 //! This module contains a number of functions for working with `Future`s,
4 //! including the `FutureExt` trait which adds methods to `Future` types.
5 
6 #[cfg(feature = "alloc")]
7 use alloc::boxed::Box;
8 use core::pin::Pin;
9 
10 use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn};
11 use crate::future::{assert_future, Either};
12 use crate::never::Never;
13 use crate::stream::assert_stream;
14 #[cfg(feature = "alloc")]
15 use futures_core::future::{BoxFuture, LocalBoxFuture};
16 use futures_core::{
17     future::Future,
18     stream::Stream,
19     task::{Context, Poll},
20 };
21 use pin_utils::pin_mut;
22 
23 // Combinators
24 
25 mod flatten;
26 mod fuse;
27 mod map;
28 
// Public `Flatten<F>` type returned by `FutureExt::flatten`. It wraps the
// private `flatten::Flatten` type; `delegate_all!` is defined elsewhere and
// presumably forwards the listed traits to that inner value (TODO confirm
// against the macro definition).
delegate_all!(
    /// Future for the [`flatten`](super::FutureExt::flatten) method.
    Flatten<F>(
        // The second type argument is the output of `F`, which
        // `FutureExt::flatten` requires to be a future itself.
        flatten::Flatten<F, <F as Future>::Output>
    ): Debug + Future + FusedFuture + New[|x: F| flatten::Flatten::new(x)]
    where F: Future
);
36 
// Public `FlattenStream<F>` type returned by `FutureExt::flatten_stream`.
// It is backed by the same private `flatten::Flatten` type as the `Flatten`
// future, but here the output of `F` is consumed as a stream.
delegate_all!(
    /// Stream for the [`flatten_stream`](FutureExt::flatten_stream) method.
    FlattenStream<F>(
        flatten::Flatten<F, <F as Future>::Output>
    // NOTE(review): `Sink` is requested in addition to the stream traits,
    // presumably so sink operations reach an output that is also a sink;
    // verify against `delegate_all!`.
    ): Debug + Sink + Stream + FusedStream + New[|x: F| flatten::Flatten::new(x)]
    where F: Future
);
44 
45 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
46 pub use fuse::Fuse;
47 
// Public `Map<Fut, F>` type returned by `FutureExt::map`; a wrapper around the
// private `map::Map` type. Several other combinators in this file are built on
// top of this public wrapper.
delegate_all!(
    /// Future for the [`map`](super::FutureExt::map) method.
    Map<Fut, F>(
        map::Map<Fut, F>
    // `New[...]` supplies the construction expression: the future `x` and the
    // mapping function `f` are handed straight to `map::Map::new`.
    ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, f)]
);
54 
// Public `IntoStream<F>` type returned by `FutureExt::into_stream`. It wraps
// `crate::stream::Once<F>` and is constructed from the future alone.
delegate_all!(
    /// Stream for the [`into_stream`](FutureExt::into_stream) method.
    IntoStream<F>(
        crate::stream::Once<F>
    ): Debug + Stream + FusedStream + New[|x: F| crate::stream::Once::new(x)]
);
61 
// Public `MapInto<Fut, T>` type returned by `FutureExt::map_into`. It is a
// `Map` whose mapping function is the value produced by `into_fn()`, so the
// caller never has to name a closure type.
delegate_all!(
    /// Future for the [`map_into`](FutureExt::map_into) combinator.
    MapInto<Fut, T>(
        // `T` is the conversion target (`map_into` requires
        // `Self::Output: Into<T>`).
        Map<Fut, IntoFn<T>>
    ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, into_fn())]
);
68 
// Public `Then<Fut1, Fut2, F>` type returned by `FutureExt::then`. It is
// composed from existing pieces: a `Map` pairs `Fut1` with the closure `F`,
// and `flatten::Flatten` is layered on top with `Fut2` (the future returned
// by `F`) as its second type argument.
delegate_all!(
    /// Future for the [`then`](FutureExt::then) method.
    Then<Fut1, Fut2, F>(
        flatten::Flatten<Map<Fut1, F>, Fut2>
    // `x` is the first future and `y` is the closure that produces the second.
    ): Debug + Future + FusedFuture + New[|x: Fut1, y: F| flatten::Flatten::new(Map::new(x, y))]
);
75 
// Public `Inspect<Fut, F>` type returned by `FutureExt::inspect`. It uses the
// private `map::Map` type directly, with the user callback `f` wrapped by
// `inspect_fn` before it is stored as the mapping function.
delegate_all!(
    /// Future for the [`inspect`](FutureExt::inspect) method.
    Inspect<Fut, F>(
        map::Map<Fut, InspectFn<F>>
    ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, inspect_fn(f))]
);
82 
// Public `NeverError<Fut>` type returned by `FutureExt::never_error`. It is a
// `Map` whose mapping function comes from `ok_fn()` with `Never` as the error
// type; `never_error` asserts the output to be `Result<Fut::Output, Never>`.
delegate_all!(
    /// Future for the [`never_error`](super::FutureExt::never_error) combinator.
    NeverError<Fut>(
        Map<Fut, OkFn<Never>>
    ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())]
);
89 
// Public `UnitError<Fut>` type returned by `FutureExt::unit_error`. It is a
// `Map` whose mapping function comes from `ok_fn()` with `()` as the error
// type; `unit_error` asserts the output to be `Result<Fut::Output, ()>`.
delegate_all!(
    /// Future for the [`unit_error`](super::FutureExt::unit_error) combinator.
    UnitError<Fut>(
        Map<Fut, OkFn<()>>
    ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())]
);
96 
97 #[cfg(feature = "std")]
98 mod catch_unwind;
99 #[cfg(feature = "std")]
100 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
101 pub use self::catch_unwind::CatchUnwind;
102 
103 #[cfg(feature = "channel")]
104 #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
105 #[cfg(feature = "std")]
106 mod remote_handle;
107 #[cfg(feature = "channel")]
108 #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
109 #[cfg(feature = "std")]
110 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
111 pub use self::remote_handle::{Remote, RemoteHandle};
112 
113 #[cfg(feature = "std")]
114 mod shared;
115 #[cfg(feature = "std")]
116 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
117 pub use self::shared::{Shared, WeakShared};
118 
119 impl<T: ?Sized> FutureExt for T where T: Future {}
120 
121 /// An extension trait for `Future`s that provides a variety of convenient
122 /// adapters.
123 pub trait FutureExt: Future {
124     /// Map this future's output to a different type, returning a new future of
125     /// the resulting type.
126     ///
127     /// This function is similar to the `Option::map` or `Iterator::map` where
128     /// it will change the type of the underlying future. This is useful to
129     /// chain along a computation once a future has been resolved.
130     ///
131     /// Note that this function consumes the receiving future and returns a
132     /// wrapped version of it, similar to the existing `map` methods in the
133     /// standard library.
134     ///
135     /// # Examples
136     ///
137     /// ```
138     /// # futures::executor::block_on(async {
139     /// use futures::future::FutureExt;
140     ///
141     /// let future = async { 1 };
142     /// let new_future = future.map(|x| x + 3);
143     /// assert_eq!(new_future.await, 4);
144     /// # });
145     /// ```
map<U, F>(self, f: F) -> Map<Self, F> where F: FnOnce(Self::Output) -> U, Self: Sized,146     fn map<U, F>(self, f: F) -> Map<Self, F>
147     where
148         F: FnOnce(Self::Output) -> U,
149         Self: Sized,
150     {
151         assert_future::<U, _>(Map::new(self, f))
152     }
153 
154     /// Map this future's output to a different type, returning a new future of
155     /// the resulting type.
156     ///
157     /// This function is equivalent to calling `map(Into::into)` but allows naming
158     /// the return type.
map_into<U>(self) -> MapInto<Self, U> where Self::Output: Into<U>, Self: Sized,159     fn map_into<U>(self) -> MapInto<Self, U>
160     where
161         Self::Output: Into<U>,
162         Self: Sized,
163     {
164         assert_future::<U, _>(MapInto::new(self))
165     }
166 
167     /// Chain on a computation for when a future finished, passing the result of
168     /// the future to the provided closure `f`.
169     ///
170     /// The returned value of the closure must implement the `Future` trait
171     /// and can represent some more work to be done before the composed future
172     /// is finished.
173     ///
174     /// The closure `f` is only run *after* successful completion of the `self`
175     /// future.
176     ///
177     /// Note that this function consumes the receiving future and returns a
178     /// wrapped version of it.
179     ///
180     /// # Examples
181     ///
182     /// ```
183     /// # futures::executor::block_on(async {
184     /// use futures::future::FutureExt;
185     ///
186     /// let future_of_1 = async { 1 };
187     /// let future_of_4 = future_of_1.then(|x| async move { x + 3 });
188     /// assert_eq!(future_of_4.await, 4);
189     /// # });
190     /// ```
then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnOnce(Self::Output) -> Fut, Fut: Future, Self: Sized,191     fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
192     where
193         F: FnOnce(Self::Output) -> Fut,
194         Fut: Future,
195         Self: Sized,
196     {
197         assert_future::<Fut::Output, _>(Then::new(self, f))
198     }
199 
200     /// Wrap this future in an `Either` future, making it the left-hand variant
201     /// of that `Either`.
202     ///
203     /// This can be used in combination with the `right_future` method to write `if`
204     /// statements that evaluate to different futures in different branches.
205     ///
206     /// # Examples
207     ///
208     /// ```
209     /// # futures::executor::block_on(async {
210     /// use futures::future::FutureExt;
211     ///
212     /// let x = 6;
213     /// let future = if x < 10 {
214     ///     async { true }.left_future()
215     /// } else {
216     ///     async { false }.right_future()
217     /// };
218     ///
219     /// assert_eq!(future.await, true);
220     /// # });
221     /// ```
left_future<B>(self) -> Either<Self, B> where B: Future<Output = Self::Output>, Self: Sized,222     fn left_future<B>(self) -> Either<Self, B>
223     where
224         B: Future<Output = Self::Output>,
225         Self: Sized,
226     {
227         assert_future::<Self::Output, _>(Either::Left(self))
228     }
229 
230     /// Wrap this future in an `Either` future, making it the right-hand variant
231     /// of that `Either`.
232     ///
233     /// This can be used in combination with the `left_future` method to write `if`
234     /// statements that evaluate to different futures in different branches.
235     ///
236     /// # Examples
237     ///
238     /// ```
239     /// # futures::executor::block_on(async {
240     /// use futures::future::FutureExt;
241     ///
242     /// let x = 6;
243     /// let future = if x > 10 {
244     ///     async { true }.left_future()
245     /// } else {
246     ///     async { false }.right_future()
247     /// };
248     ///
249     /// assert_eq!(future.await, false);
250     /// # });
251     /// ```
right_future<A>(self) -> Either<A, Self> where A: Future<Output = Self::Output>, Self: Sized,252     fn right_future<A>(self) -> Either<A, Self>
253     where
254         A: Future<Output = Self::Output>,
255         Self: Sized,
256     {
257         assert_future::<Self::Output, _>(Either::Right(self))
258     }
259 
260     /// Convert this future into a single element stream.
261     ///
262     /// The returned stream contains single success if this future resolves to
263     /// success or single error if this future resolves into error.
264     ///
265     /// # Examples
266     ///
267     /// ```
268     /// # futures::executor::block_on(async {
269     /// use futures::future::FutureExt;
270     /// use futures::stream::StreamExt;
271     ///
272     /// let future = async { 17 };
273     /// let stream = future.into_stream();
274     /// let collected: Vec<_> = stream.collect().await;
275     /// assert_eq!(collected, vec![17]);
276     /// # });
277     /// ```
into_stream(self) -> IntoStream<Self> where Self: Sized,278     fn into_stream(self) -> IntoStream<Self>
279     where
280         Self: Sized,
281     {
282         assert_stream::<Self::Output, _>(IntoStream::new(self))
283     }
284 
285     /// Flatten the execution of this future when the output of this
286     /// future is itself another future.
287     ///
288     /// This can be useful when combining futures together to flatten the
289     /// computation out the final result.
290     ///
291     /// This method is roughly equivalent to `self.then(|x| x)`.
292     ///
293     /// Note that this function consumes the receiving future and returns a
294     /// wrapped version of it.
295     ///
296     /// # Examples
297     ///
298     /// ```
299     /// # futures::executor::block_on(async {
300     /// use futures::future::FutureExt;
301     ///
302     /// let nested_future = async { async { 1 } };
303     /// let future = nested_future.flatten();
304     /// assert_eq!(future.await, 1);
305     /// # });
306     /// ```
flatten(self) -> Flatten<Self> where Self::Output: Future, Self: Sized,307     fn flatten(self) -> Flatten<Self>
308     where
309         Self::Output: Future,
310         Self: Sized,
311     {
312         let f = Flatten::new(self);
313         assert_future::<<<Self as Future>::Output as Future>::Output, _>(f)
314     }
315 
316     /// Flatten the execution of this future when the successful result of this
317     /// future is a stream.
318     ///
319     /// This can be useful when stream initialization is deferred, and it is
320     /// convenient to work with that stream as if stream was available at the
321     /// call site.
322     ///
323     /// Note that this function consumes this future and returns a wrapped
324     /// version of it.
325     ///
326     /// # Examples
327     ///
328     /// ```
329     /// # futures::executor::block_on(async {
330     /// use futures::future::FutureExt;
331     /// use futures::stream::{self, StreamExt};
332     ///
333     /// let stream_items = vec![17, 18, 19];
334     /// let future_of_a_stream = async { stream::iter(stream_items) };
335     ///
336     /// let stream = future_of_a_stream.flatten_stream();
337     /// let list: Vec<_> = stream.collect().await;
338     /// assert_eq!(list, vec![17, 18, 19]);
339     /// # });
340     /// ```
flatten_stream(self) -> FlattenStream<Self> where Self::Output: Stream, Self: Sized,341     fn flatten_stream(self) -> FlattenStream<Self>
342     where
343         Self::Output: Stream,
344         Self: Sized,
345     {
346         assert_stream::<<Self::Output as Stream>::Item, _>(FlattenStream::new(self))
347     }
348 
349     /// Fuse a future such that `poll` will never again be called once it has
350     /// completed. This method can be used to turn any `Future` into a
351     /// `FusedFuture`.
352     ///
353     /// Normally, once a future has returned `Poll::Ready` from `poll`,
354     /// any further calls could exhibit bad behavior such as blocking
355     /// forever, panicking, never returning, etc. If it is known that `poll`
356     /// may be called too often then this method can be used to ensure that it
357     /// has defined semantics.
358     ///
359     /// If a `fuse`d future is `poll`ed after having returned `Poll::Ready`
360     /// previously, it will return `Poll::Pending`, from `poll` again (and will
361     /// continue to do so for all future calls to `poll`).
362     ///
363     /// This combinator will drop the underlying future as soon as it has been
364     /// completed to ensure resources are reclaimed as soon as possible.
fuse(self) -> Fuse<Self> where Self: Sized,365     fn fuse(self) -> Fuse<Self>
366     where
367         Self: Sized,
368     {
369         let f = Fuse::new(self);
370         assert_future::<Self::Output, _>(f)
371     }
372 
373     /// Do something with the output of a future before passing it on.
374     ///
375     /// When using futures, you'll often chain several of them together.  While
376     /// working on such code, you might want to check out what's happening at
377     /// various parts in the pipeline, without consuming the intermediate
378     /// value. To do that, insert a call to `inspect`.
379     ///
380     /// # Examples
381     ///
382     /// ```
383     /// # futures::executor::block_on(async {
384     /// use futures::future::FutureExt;
385     ///
386     /// let future = async { 1 };
387     /// let new_future = future.inspect(|&x| println!("about to resolve: {}", x));
388     /// assert_eq!(new_future.await, 1);
389     /// # });
390     /// ```
inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnOnce(&Self::Output), Self: Sized,391     fn inspect<F>(self, f: F) -> Inspect<Self, F>
392     where
393         F: FnOnce(&Self::Output),
394         Self: Sized,
395     {
396         assert_future::<Self::Output, _>(Inspect::new(self, f))
397     }
398 
399     /// Catches unwinding panics while polling the future.
400     ///
401     /// In general, panics within a future can propagate all the way out to the
402     /// task level. This combinator makes it possible to halt unwinding within
403     /// the future itself. It's most commonly used within task executors. It's
404     /// not recommended to use this for error handling.
405     ///
406     /// Note that this method requires the `UnwindSafe` bound from the standard
407     /// library. This isn't always applied automatically, and the standard
408     /// library provides an `AssertUnwindSafe` wrapper type to apply it
409     /// after-the fact. To assist using this method, the `Future` trait is also
410     /// implemented for `AssertUnwindSafe<F>` where `F` implements `Future`.
411     ///
412     /// This method is only available when the `std` feature of this
413     /// library is activated, and it is activated by default.
414     ///
415     /// # Examples
416     ///
417     /// ```
418     /// # futures::executor::block_on(async {
419     /// use futures::future::{self, FutureExt, Ready};
420     ///
421     /// let future = future::ready(2);
422     /// assert!(future.catch_unwind().await.is_ok());
423     ///
424     /// let future = future::lazy(|_| -> Ready<i32> {
425     ///     unimplemented!()
426     /// });
427     /// assert!(future.catch_unwind().await.is_err());
428     /// # });
429     /// ```
430     #[cfg(feature = "std")]
catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + ::std::panic::UnwindSafe,431     fn catch_unwind(self) -> CatchUnwind<Self>
432     where
433         Self: Sized + ::std::panic::UnwindSafe,
434     {
435         assert_future::<Result<Self::Output, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new(
436             self,
437         ))
438     }
439 
440     /// Create a cloneable handle to this future where all handles will resolve
441     /// to the same result.
442     ///
443     /// The `shared` combinator method provides a method to convert any future
444     /// into a cloneable future. It enables a future to be polled by multiple
445     /// threads.
446     ///
447     /// This method is only available when the `std` feature of this
448     /// library is activated, and it is activated by default.
449     ///
450     /// # Examples
451     ///
452     /// ```
453     /// # futures::executor::block_on(async {
454     /// use futures::future::FutureExt;
455     ///
456     /// let future = async { 6 };
457     /// let shared1 = future.shared();
458     /// let shared2 = shared1.clone();
459     ///
460     /// assert_eq!(6, shared1.await);
461     /// assert_eq!(6, shared2.await);
462     /// # });
463     /// ```
464     ///
465     /// ```
466     /// // Note, unlike most examples this is written in the context of a
467     /// // synchronous function to better illustrate the cross-thread aspect of
468     /// // the `shared` combinator.
469     ///
470     /// # futures::executor::block_on(async {
471     /// use futures::future::FutureExt;
472     /// use futures::executor::block_on;
473     /// use std::thread;
474     ///
475     /// let future = async { 6 };
476     /// let shared1 = future.shared();
477     /// let shared2 = shared1.clone();
478     /// let join_handle = thread::spawn(move || {
479     ///     assert_eq!(6, block_on(shared2));
480     /// });
481     /// assert_eq!(6, shared1.await);
482     /// join_handle.join().unwrap();
483     /// # });
484     /// ```
485     #[cfg(feature = "std")]
shared(self) -> Shared<Self> where Self: Sized, Self::Output: Clone,486     fn shared(self) -> Shared<Self>
487     where
488         Self: Sized,
489         Self::Output: Clone,
490     {
491         assert_future::<Self::Output, _>(Shared::new(self))
492     }
493 
494     /// Turn this future into a future that yields `()` on completion and sends
495     /// its output to another future on a separate task.
496     ///
497     /// This can be used with spawning executors to easily retrieve the result
498     /// of a future executing on a separate task or thread.
499     ///
500     /// This method is only available when the `std` feature of this
501     /// library is activated, and it is activated by default.
502     #[cfg(feature = "channel")]
503     #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
504     #[cfg(feature = "std")]
remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>) where Self: Sized,505     fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)
506     where
507         Self: Sized,
508     {
509         let (wrapped, handle) = remote_handle::remote_handle(self);
510         (assert_future::<(), _>(wrapped), handle)
511     }
512 
513     /// Wrap the future in a Box, pinning it.
514     ///
515     /// This method is only available when the `std` or `alloc` feature of this
516     /// library is activated, and it is activated by default.
517     #[cfg(feature = "alloc")]
boxed<'a>(self) -> BoxFuture<'a, Self::Output> where Self: Sized + Send + 'a,518     fn boxed<'a>(self) -> BoxFuture<'a, Self::Output>
519     where
520         Self: Sized + Send + 'a,
521     {
522         assert_future::<Self::Output, _>(Box::pin(self))
523     }
524 
525     /// Wrap the future in a Box, pinning it.
526     ///
527     /// Similar to `boxed`, but without the `Send` requirement.
528     ///
529     /// This method is only available when the `std` or `alloc` feature of this
530     /// library is activated, and it is activated by default.
531     #[cfg(feature = "alloc")]
boxed_local<'a>(self) -> LocalBoxFuture<'a, Self::Output> where Self: Sized + 'a,532     fn boxed_local<'a>(self) -> LocalBoxFuture<'a, Self::Output>
533     where
534         Self: Sized + 'a,
535     {
536         assert_future::<Self::Output, _>(Box::pin(self))
537     }
538 
539     /// Turns a [`Future<Output = T>`](Future) into a
540     /// [`TryFuture<Ok = T, Error = ()`>](futures_core::future::TryFuture).
unit_error(self) -> UnitError<Self> where Self: Sized,541     fn unit_error(self) -> UnitError<Self>
542     where
543         Self: Sized,
544     {
545         assert_future::<Result<Self::Output, ()>, _>(UnitError::new(self))
546     }
547 
548     /// Turns a [`Future<Output = T>`](Future) into a
549     /// [`TryFuture<Ok = T, Error = Never`>](futures_core::future::TryFuture).
never_error(self) -> NeverError<Self> where Self: Sized,550     fn never_error(self) -> NeverError<Self>
551     where
552         Self: Sized,
553     {
554         assert_future::<Result<Self::Output, Never>, _>(NeverError::new(self))
555     }
556 
557     /// A convenience for calling `Future::poll` on `Unpin` future types.
poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> where Self: Unpin,558     fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
559     where
560         Self: Unpin,
561     {
562         Pin::new(self).poll(cx)
563     }
564 
565     /// Evaluates and consumes the future, returning the resulting output if
566     /// the future is ready after the first call to `Future::poll`.
567     ///
568     /// If `poll` instead returns `Poll::Pending`, `None` is returned.
569     ///
570     /// This method is useful in cases where immediacy is more important than
571     /// waiting for a result. It is also convenient for quickly obtaining
572     /// the value of a future that is known to always resolve immediately.
573     ///
574     /// # Examples
575     ///
576     /// ```
577     /// # use futures::prelude::*;
578     /// use futures::{future::ready, future::pending};
579     /// let future_ready = ready("foobar");
580     /// let future_pending = pending::<&'static str>();
581     ///
582     /// assert_eq!(future_ready.now_or_never(), Some("foobar"));
583     /// assert_eq!(future_pending.now_or_never(), None);
584     /// ```
585     ///
586     /// In cases where it is absolutely known that a future should always
587     /// resolve immediately and never return `Poll::Pending`, this method can
588     /// be combined with `expect()`:
589     ///
590     /// ```
591     /// # use futures::{prelude::*, future::ready};
592     /// let future_ready = ready("foobar");
593     ///
594     /// assert_eq!(future_ready.now_or_never().expect("Future not ready"), "foobar");
595     /// ```
now_or_never(self) -> Option<Self::Output> where Self: Sized,596     fn now_or_never(self) -> Option<Self::Output>
597     where
598         Self: Sized,
599     {
600         let noop_waker = crate::task::noop_waker();
601         let mut cx = Context::from_waker(&noop_waker);
602 
603         let this = self;
604         pin_mut!(this);
605         match this.poll(&mut cx) {
606             Poll::Ready(x) => Some(x),
607             _ => None,
608         }
609     }
610 }
611