]>
Commit | Line | Data |
---|---|---|
5099ac24 FG |
1 | use core::mem; |
2 | use core::pin::Pin; | |
3 | use futures_core::future::{FusedFuture, Future}; | |
4 | use futures_core::ready; | |
5 | use futures_core::stream::{FusedStream, Stream}; | |
6 | use futures_core::task::{Context, Poll}; | |
7 | use pin_project_lite::pin_project; | |
8 | ||
9 | pin_project! { | |
10 | /// Future for the [`collect`](super::StreamExt::collect) method. | |
11 | #[derive(Debug)] | |
12 | #[must_use = "futures do nothing unless you `.await` or poll them"] | |
13 | pub struct Collect<St, C> { | |
14 | #[pin] | |
15 | stream: St, | |
16 | collection: C, | |
17 | } | |
18 | } | |
19 | ||
20 | impl<St: Stream, C: Default> Collect<St, C> { | |
21 | fn finish(self: Pin<&mut Self>) -> C { | |
353b0b11 | 22 | mem::take(self.project().collection) |
5099ac24 FG |
23 | } |
24 | ||
25 | pub(super) fn new(stream: St) -> Self { | |
26 | Self { stream, collection: Default::default() } | |
27 | } | |
28 | } | |
29 | ||
30 | impl<St, C> FusedFuture for Collect<St, C> | |
31 | where | |
32 | St: FusedStream, | |
33 | C: Default + Extend<St::Item>, | |
34 | { | |
35 | fn is_terminated(&self) -> bool { | |
36 | self.stream.is_terminated() | |
37 | } | |
38 | } | |
39 | ||
40 | impl<St, C> Future for Collect<St, C> | |
41 | where | |
42 | St: Stream, | |
43 | C: Default + Extend<St::Item>, | |
44 | { | |
45 | type Output = C; | |
46 | ||
47 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> { | |
48 | let mut this = self.as_mut().project(); | |
49 | loop { | |
50 | match ready!(this.stream.as_mut().poll_next(cx)) { | |
51 | Some(e) => this.collection.extend(Some(e)), | |
52 | None => return Poll::Ready(self.finish()), | |
53 | } | |
54 | } | |
55 | } | |
56 | } |