]>
Commit | Line | Data |
---|---|---|
5099ac24 FG |
1 | //! Definition of the `TryJoinAll` combinator, waiting for all of a list of |
2 | //! futures to finish with either success or error. | |
3 | ||
4 | use alloc::boxed::Box; | |
5 | use alloc::vec::Vec; | |
6 | use core::fmt; | |
7 | use core::future::Future; | |
8 | use core::iter::FromIterator; | |
9 | use core::mem; | |
10 | use core::pin::Pin; | |
11 | use core::task::{Context, Poll}; | |
12 | ||
353b0b11 | 13 | use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; |
5099ac24 | 14 | |
353b0b11 FG |
15 | #[cfg(not(futures_no_atomic_cas))] |
16 | use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; | |
17 | use crate::TryFutureExt; | |
5099ac24 FG |
18 | |
/// Outcome of one polling pass over the elements of a small `TryJoinAll`.
///
/// The error type defaults to `()` so the plain variants can be named without
/// spelling out `E`.
enum FinalState<E = ()> {
    /// At least one element reported `Poll::Pending` during the pass.
    Pending,
    /// No element was pending and none produced an error.
    AllDone,
    /// An element resolved to an error; the payload is that error.
    Error(E),
}
24 | ||
25 | /// Future for the [`try_join_all`] function. | |
26 | #[must_use = "futures do nothing unless you `.await` or poll them"] | |
27 | pub struct TryJoinAll<F> | |
28 | where | |
29 | F: TryFuture, | |
30 | { | |
353b0b11 FG |
31 | kind: TryJoinAllKind<F>, |
32 | } | |
33 | ||
34 | enum TryJoinAllKind<F> | |
35 | where | |
36 | F: TryFuture, | |
37 | { | |
38 | Small { | |
39 | elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>, | |
40 | }, | |
41 | #[cfg(not(futures_no_atomic_cas))] | |
42 | Big { | |
43 | fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>, | |
44 | }, | |
5099ac24 FG |
45 | } |
46 | ||
47 | impl<F> fmt::Debug for TryJoinAll<F> | |
48 | where | |
49 | F: TryFuture + fmt::Debug, | |
50 | F::Ok: fmt::Debug, | |
51 | F::Error: fmt::Debug, | |
353b0b11 | 52 | F::Output: fmt::Debug, |
5099ac24 FG |
53 | { |
54 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
353b0b11 FG |
55 | match self.kind { |
56 | TryJoinAllKind::Small { ref elems } => { | |
57 | f.debug_struct("TryJoinAll").field("elems", elems).finish() | |
58 | } | |
59 | #[cfg(not(futures_no_atomic_cas))] | |
60 | TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), | |
61 | } | |
5099ac24 FG |
62 | } |
63 | } | |
64 | ||
65 | /// Creates a future which represents either a collection of the results of the | |
66 | /// futures given or an error. | |
67 | /// | |
68 | /// The returned future will drive execution for all of its underlying futures, | |
69 | /// collecting the results into a destination `Vec<T>` in the same order as they | |
70 | /// were provided. | |
71 | /// | |
72 | /// If any future returns an error then all other futures will be canceled and | |
73 | /// an error will be returned immediately. If all futures complete successfully, | |
74 | /// however, then the returned future will succeed with a `Vec` of all the | |
75 | /// successful results. | |
76 | /// | |
77 | /// This function is only available when the `std` or `alloc` feature of this | |
78 | /// library is activated, and it is activated by default. | |
79 | /// | |
353b0b11 FG |
80 | /// # See Also |
81 | /// | |
82 | /// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance | |
83 | /// reasons if the number of futures is large. You may want to look into using it or | |
84 | /// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. | |
85 | /// | |
86 | /// Some examples for additional functionality provided by these are: | |
87 | /// | |
88 | /// * Adding new futures to the set even after it has been started. | |
89 | /// | |
90 | /// * Only polling the specific futures that have been woken. In cases where | |
91 | /// you have a lot of futures this will result in much more efficient polling. | |
92 | /// | |
93 | /// | |
5099ac24 FG |
94 | /// # Examples |
95 | /// | |
96 | /// ``` | |
97 | /// # futures::executor::block_on(async { | |
98 | /// use futures::future::{self, try_join_all}; | |
99 | /// | |
100 | /// let futures = vec![ | |
101 | /// future::ok::<u32, u32>(1), | |
102 | /// future::ok::<u32, u32>(2), | |
103 | /// future::ok::<u32, u32>(3), | |
104 | /// ]; | |
105 | /// | |
106 | /// assert_eq!(try_join_all(futures).await, Ok(vec![1, 2, 3])); | |
107 | /// | |
108 | /// let futures = vec![ | |
109 | /// future::ok::<u32, u32>(1), | |
110 | /// future::err::<u32, u32>(2), | |
111 | /// future::ok::<u32, u32>(3), | |
112 | /// ]; | |
113 | /// | |
114 | /// assert_eq!(try_join_all(futures).await, Err(2)); | |
115 | /// # }); | |
116 | /// ``` | |
353b0b11 | 117 | pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item> |
5099ac24 FG |
118 | where |
119 | I: IntoIterator, | |
120 | I::Item: TryFuture, | |
121 | { | |
353b0b11 FG |
122 | let iter = iter.into_iter().map(TryFutureExt::into_future); |
123 | ||
124 | #[cfg(futures_no_atomic_cas)] | |
125 | { | |
126 | let kind = TryJoinAllKind::Small { | |
127 | elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), | |
128 | }; | |
129 | ||
130 | assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( | |
131 | TryJoinAll { kind }, | |
132 | ) | |
133 | } | |
134 | ||
135 | #[cfg(not(futures_no_atomic_cas))] | |
136 | { | |
137 | let kind = match iter.size_hint().1 { | |
138 | Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { | |
139 | elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), | |
140 | }, | |
141 | _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() }, | |
142 | }; | |
143 | ||
144 | assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( | |
145 | TryJoinAll { kind }, | |
146 | ) | |
147 | } | |
5099ac24 FG |
148 | } |
149 | ||
150 | impl<F> Future for TryJoinAll<F> | |
151 | where | |
152 | F: TryFuture, | |
153 | { | |
154 | type Output = Result<Vec<F::Ok>, F::Error>; | |
155 | ||
156 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
353b0b11 FG |
157 | match &mut self.kind { |
158 | TryJoinAllKind::Small { elems } => { | |
159 | let mut state = FinalState::AllDone; | |
160 | ||
161 | for elem in join_all::iter_pin_mut(elems.as_mut()) { | |
162 | match elem.try_poll(cx) { | |
163 | Poll::Pending => state = FinalState::Pending, | |
164 | Poll::Ready(Ok(())) => {} | |
165 | Poll::Ready(Err(e)) => { | |
166 | state = FinalState::Error(e); | |
167 | break; | |
168 | } | |
169 | } | |
5099ac24 | 170 | } |
5099ac24 | 171 | |
353b0b11 FG |
172 | match state { |
173 | FinalState::Pending => Poll::Pending, | |
174 | FinalState::AllDone => { | |
175 | let mut elems = mem::replace(elems, Box::pin([])); | |
176 | let results = join_all::iter_pin_mut(elems.as_mut()) | |
177 | .map(|e| e.take_output().unwrap()) | |
178 | .collect(); | |
179 | Poll::Ready(Ok(results)) | |
180 | } | |
181 | FinalState::Error(e) => { | |
182 | let _ = mem::replace(elems, Box::pin([])); | |
183 | Poll::Ready(Err(e)) | |
184 | } | |
185 | } | |
5099ac24 | 186 | } |
353b0b11 FG |
187 | #[cfg(not(futures_no_atomic_cas))] |
188 | TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), | |
5099ac24 FG |
189 | } |
190 | } | |
191 | } | |
192 | ||
353b0b11 FG |
193 | impl<F> FromIterator<F> for TryJoinAll<F> |
194 | where | |
195 | F: TryFuture, | |
196 | { | |
5099ac24 FG |
197 | fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self { |
198 | try_join_all(iter) | |
199 | } | |
200 | } |