1 //! Definition of the `TryJoinAll` combinator, waiting for all of a list of
2 //! futures to finish with either success or error.
3 
4 use alloc::boxed::Box;
5 use alloc::vec::Vec;
6 use core::fmt;
7 use core::future::Future;
8 use core::iter::FromIterator;
9 use core::mem;
10 use core::pin::Pin;
11 use core::task::{Context, Poll};
12 
13 use super::{assert_future, TryFuture, TryMaybeDone};
14 
iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>>15 fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
16     // Safety: `std` _could_ make this unsound if it were to decide Pin's
17     // invariants aren't required to transmit through slices. Otherwise this has
18     // the same safety as a normal field pin projection.
19     unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
20 }
21 
22 enum FinalState<E = ()> {
23     Pending,
24     AllDone,
25     Error(E),
26 }
27 
28 /// Future for the [`try_join_all`] function.
29 #[must_use = "futures do nothing unless you `.await` or poll them"]
30 pub struct TryJoinAll<F>
31 where
32     F: TryFuture,
33 {
34     elems: Pin<Box<[TryMaybeDone<F>]>>,
35 }
36 
37 impl<F> fmt::Debug for TryJoinAll<F>
38 where
39     F: TryFuture + fmt::Debug,
40     F::Ok: fmt::Debug,
41     F::Error: fmt::Debug,
42 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result43     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44         f.debug_struct("TryJoinAll").field("elems", &self.elems).finish()
45     }
46 }
47 
48 /// Creates a future which represents either a collection of the results of the
49 /// futures given or an error.
50 ///
51 /// The returned future will drive execution for all of its underlying futures,
52 /// collecting the results into a destination `Vec<T>` in the same order as they
53 /// were provided.
54 ///
55 /// If any future returns an error then all other futures will be canceled and
56 /// an error will be returned immediately. If all futures complete successfully,
57 /// however, then the returned future will succeed with a `Vec` of all the
58 /// successful results.
59 ///
60 /// This function is only available when the `std` or `alloc` feature of this
61 /// library is activated, and it is activated by default.
62 ///
63 /// # Examples
64 ///
65 /// ```
66 /// # futures::executor::block_on(async {
67 /// use futures::future::{self, try_join_all};
68 ///
69 /// let futures = vec![
70 ///     future::ok::<u32, u32>(1),
71 ///     future::ok::<u32, u32>(2),
72 ///     future::ok::<u32, u32>(3),
73 /// ];
74 ///
75 /// assert_eq!(try_join_all(futures).await, Ok(vec![1, 2, 3]));
76 ///
77 /// let futures = vec![
78 ///     future::ok::<u32, u32>(1),
79 ///     future::err::<u32, u32>(2),
80 ///     future::ok::<u32, u32>(3),
81 /// ];
82 ///
83 /// assert_eq!(try_join_all(futures).await, Err(2));
84 /// # });
85 /// ```
try_join_all<I>(i: I) -> TryJoinAll<I::Item> where I: IntoIterator, I::Item: TryFuture,86 pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item>
87 where
88     I: IntoIterator,
89     I::Item: TryFuture,
90 {
91     let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect();
92     assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
93         TryJoinAll { elems: elems.into() },
94     )
95 }
96 
97 impl<F> Future for TryJoinAll<F>
98 where
99     F: TryFuture,
100 {
101     type Output = Result<Vec<F::Ok>, F::Error>;
102 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>103     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
104         let mut state = FinalState::AllDone;
105 
106         for elem in iter_pin_mut(self.elems.as_mut()) {
107             match elem.try_poll(cx) {
108                 Poll::Pending => state = FinalState::Pending,
109                 Poll::Ready(Ok(())) => {}
110                 Poll::Ready(Err(e)) => {
111                     state = FinalState::Error(e);
112                     break;
113                 }
114             }
115         }
116 
117         match state {
118             FinalState::Pending => Poll::Pending,
119             FinalState::AllDone => {
120                 let mut elems = mem::replace(&mut self.elems, Box::pin([]));
121                 let results =
122                     iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
123                 Poll::Ready(Ok(results))
124             }
125             FinalState::Error(e) => {
126                 let _ = mem::replace(&mut self.elems, Box::pin([]));
127                 Poll::Ready(Err(e))
128             }
129         }
130     }
131 }
132 
133 impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> {
from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self134     fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
135         try_join_all(iter)
136     }
137 }
138