]>
Commit | Line | Data |
---|---|---|
5099ac24 FG |
1 | //! An unbounded set of streams |
2 | ||
3 | use core::fmt::{self, Debug}; | |
4 | use core::iter::FromIterator; | |
5 | use core::pin::Pin; | |
6 | ||
7 | use futures_core::ready; | |
8 | use futures_core::stream::{FusedStream, Stream}; | |
9 | use futures_core::task::{Context, Poll}; | |
10 | ||
5099ac24 FG |
11 | use super::assert_stream; |
12 | use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture}; | |
13 | ||
353b0b11 FG |
14 | /// An unbounded set of streams |
15 | /// | |
16 | /// This "combinator" provides the ability to maintain a set of streams | |
17 | /// and drive them all to completion. | |
18 | /// | |
19 | /// Streams are pushed into this set and their realized values are | |
20 | /// yielded as they become ready. Streams will only be polled when they | |
21 | /// generate notifications. This allows to coordinate a large number of streams. | |
22 | /// | |
23 | /// Note that you can create a ready-made `SelectAll` via the | |
24 | /// `select_all` function in the `stream` module, or you can start with an | |
25 | /// empty set with the `SelectAll::new` constructor. | |
26 | #[must_use = "streams do nothing unless polled"] | |
27 | pub struct SelectAll<St> { | |
28 | inner: FuturesUnordered<StreamFuture<St>>, | |
5099ac24 FG |
29 | } |
30 | ||
31 | impl<St: Debug> Debug for SelectAll<St> { | |
32 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
33 | write!(f, "SelectAll {{ ... }}") | |
34 | } | |
35 | } | |
36 | ||
37 | impl<St: Stream + Unpin> SelectAll<St> { | |
38 | /// Constructs a new, empty `SelectAll` | |
39 | /// | |
40 | /// The returned `SelectAll` does not contain any streams and, in this | |
41 | /// state, `SelectAll::poll` will return `Poll::Ready(None)`. | |
42 | pub fn new() -> Self { | |
43 | Self { inner: FuturesUnordered::new() } | |
44 | } | |
45 | ||
46 | /// Returns the number of streams contained in the set. | |
47 | /// | |
48 | /// This represents the total number of in-flight streams. | |
49 | pub fn len(&self) -> usize { | |
50 | self.inner.len() | |
51 | } | |
52 | ||
53 | /// Returns `true` if the set contains no streams | |
54 | pub fn is_empty(&self) -> bool { | |
55 | self.inner.is_empty() | |
56 | } | |
57 | ||
58 | /// Push a stream into the set. | |
59 | /// | |
60 | /// This function submits the given stream to the set for managing. This | |
61 | /// function will not call `poll` on the submitted stream. The caller must | |
62 | /// ensure that `SelectAll::poll` is called in order to receive task | |
63 | /// notifications. | |
64 | pub fn push(&mut self, stream: St) { | |
65 | self.inner.push(stream.into_future()); | |
66 | } | |
67 | ||
68 | /// Returns an iterator that allows inspecting each stream in the set. | |
69 | pub fn iter(&self) -> Iter<'_, St> { | |
70 | Iter(self.inner.iter()) | |
71 | } | |
72 | ||
73 | /// Returns an iterator that allows modifying each stream in the set. | |
74 | pub fn iter_mut(&mut self) -> IterMut<'_, St> { | |
75 | IterMut(self.inner.iter_mut()) | |
76 | } | |
77 | ||
78 | /// Clears the set, removing all streams. | |
79 | pub fn clear(&mut self) { | |
80 | self.inner.clear() | |
81 | } | |
82 | } | |
83 | ||
84 | impl<St: Stream + Unpin> Default for SelectAll<St> { | |
85 | fn default() -> Self { | |
86 | Self::new() | |
87 | } | |
88 | } | |
89 | ||
90 | impl<St: Stream + Unpin> Stream for SelectAll<St> { | |
91 | type Item = St::Item; | |
92 | ||
93 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
94 | loop { | |
95 | match ready!(self.inner.poll_next_unpin(cx)) { | |
96 | Some((Some(item), remaining)) => { | |
97 | self.push(remaining); | |
98 | return Poll::Ready(Some(item)); | |
99 | } | |
100 | Some((None, _)) => { | |
101 | // `FuturesUnordered` thinks it isn't terminated | |
102 | // because it yielded a Some. | |
103 | // We do not return, but poll `FuturesUnordered` | |
104 | // in the next loop iteration. | |
105 | } | |
106 | None => return Poll::Ready(None), | |
107 | } | |
108 | } | |
109 | } | |
110 | } | |
111 | ||
112 | impl<St: Stream + Unpin> FusedStream for SelectAll<St> { | |
113 | fn is_terminated(&self) -> bool { | |
114 | self.inner.is_terminated() | |
115 | } | |
116 | } | |
117 | ||
118 | /// Convert a list of streams into a `Stream` of results from the streams. | |
119 | /// | |
120 | /// This essentially takes a list of streams (e.g. a vector, an iterator, etc.) | |
121 | /// and bundles them together into a single stream. | |
122 | /// The stream will yield items as they become available on the underlying | |
123 | /// streams internally, in the order they become available. | |
124 | /// | |
125 | /// Note that the returned set can also be used to dynamically push more | |
126 | /// streams into the set as they become available. | |
127 | /// | |
128 | /// This function is only available when the `std` or `alloc` feature of this | |
129 | /// library is activated, and it is activated by default. | |
130 | pub fn select_all<I>(streams: I) -> SelectAll<I::Item> | |
131 | where | |
132 | I: IntoIterator, | |
133 | I::Item: Stream + Unpin, | |
134 | { | |
135 | let mut set = SelectAll::new(); | |
136 | ||
137 | for stream in streams { | |
138 | set.push(stream); | |
139 | } | |
140 | ||
141 | assert_stream::<<I::Item as Stream>::Item, _>(set) | |
142 | } | |
143 | ||
144 | impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> { | |
145 | fn from_iter<T: IntoIterator<Item = St>>(iter: T) -> Self { | |
146 | select_all(iter) | |
147 | } | |
148 | } | |
149 | ||
150 | impl<St: Stream + Unpin> Extend<St> for SelectAll<St> { | |
151 | fn extend<T: IntoIterator<Item = St>>(&mut self, iter: T) { | |
152 | for st in iter { | |
153 | self.push(st) | |
154 | } | |
155 | } | |
156 | } | |
157 | ||
158 | impl<St: Stream + Unpin> IntoIterator for SelectAll<St> { | |
159 | type Item = St; | |
160 | type IntoIter = IntoIter<St>; | |
161 | ||
162 | fn into_iter(self) -> Self::IntoIter { | |
163 | IntoIter(self.inner.into_iter()) | |
164 | } | |
165 | } | |
166 | ||
167 | impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll<St> { | |
168 | type Item = &'a St; | |
169 | type IntoIter = Iter<'a, St>; | |
170 | ||
171 | fn into_iter(self) -> Self::IntoIter { | |
172 | self.iter() | |
173 | } | |
174 | } | |
175 | ||
176 | impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll<St> { | |
177 | type Item = &'a mut St; | |
178 | type IntoIter = IterMut<'a, St>; | |
179 | ||
180 | fn into_iter(self) -> Self::IntoIter { | |
181 | self.iter_mut() | |
182 | } | |
183 | } | |
184 | ||
185 | /// Immutable iterator over all streams in the unordered set. | |
186 | #[derive(Debug)] | |
187 | pub struct Iter<'a, St: Unpin>(futures_unordered::Iter<'a, StreamFuture<St>>); | |
188 | ||
189 | /// Mutable iterator over all streams in the unordered set. | |
190 | #[derive(Debug)] | |
191 | pub struct IterMut<'a, St: Unpin>(futures_unordered::IterMut<'a, StreamFuture<St>>); | |
192 | ||
193 | /// Owned iterator over all streams in the unordered set. | |
194 | #[derive(Debug)] | |
195 | pub struct IntoIter<St: Unpin>(futures_unordered::IntoIter<StreamFuture<St>>); | |
196 | ||
197 | impl<'a, St: Stream + Unpin> Iterator for Iter<'a, St> { | |
198 | type Item = &'a St; | |
199 | ||
200 | fn next(&mut self) -> Option<Self::Item> { | |
201 | let st = self.0.next()?; | |
202 | let next = st.get_ref(); | |
203 | // This should always be true because FuturesUnordered removes completed futures. | |
204 | debug_assert!(next.is_some()); | |
205 | next | |
206 | } | |
207 | ||
208 | fn size_hint(&self) -> (usize, Option<usize>) { | |
209 | self.0.size_hint() | |
210 | } | |
211 | } | |
212 | ||
213 | impl<St: Stream + Unpin> ExactSizeIterator for Iter<'_, St> {} | |
214 | ||
215 | impl<'a, St: Stream + Unpin> Iterator for IterMut<'a, St> { | |
216 | type Item = &'a mut St; | |
217 | ||
218 | fn next(&mut self) -> Option<Self::Item> { | |
219 | let st = self.0.next()?; | |
220 | let next = st.get_mut(); | |
221 | // This should always be true because FuturesUnordered removes completed futures. | |
222 | debug_assert!(next.is_some()); | |
223 | next | |
224 | } | |
225 | ||
226 | fn size_hint(&self) -> (usize, Option<usize>) { | |
227 | self.0.size_hint() | |
228 | } | |
229 | } | |
230 | ||
231 | impl<St: Stream + Unpin> ExactSizeIterator for IterMut<'_, St> {} | |
232 | ||
233 | impl<St: Stream + Unpin> Iterator for IntoIter<St> { | |
234 | type Item = St; | |
235 | ||
236 | fn next(&mut self) -> Option<Self::Item> { | |
237 | let st = self.0.next()?; | |
238 | let next = st.into_inner(); | |
239 | // This should always be true because FuturesUnordered removes completed futures. | |
240 | debug_assert!(next.is_some()); | |
241 | next | |
242 | } | |
243 | ||
244 | fn size_hint(&self) -> (usize, Option<usize>) { | |
245 | self.0.size_hint() | |
246 | } | |
247 | } | |
248 | ||
249 | impl<St: Stream + Unpin> ExactSizeIterator for IntoIter<St> {} |