]> git.proxmox.com Git - rustc.git/blob - vendor/futures-util/src/stream/stream/buffer_unordered.rs
91b0f6bcce325a3c8829b0f3e3c6dca1e639f291
[rustc.git] / vendor / futures-util / src / stream / stream / buffer_unordered.rs
1 use crate::stream::{Fuse, FuturesUnordered, StreamExt};
2 use core::fmt;
3 use core::pin::Pin;
4 use futures_core::future::Future;
5 use futures_core::stream::{FusedStream, Stream};
6 use futures_core::task::{Context, Poll};
7 #[cfg(feature = "sink")]
8 use futures_sink::Sink;
9 use pin_project_lite::pin_project;
10
11 pin_project! {
12 /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
13 /// method.
14 #[must_use = "streams do nothing unless polled"]
15 pub struct BufferUnordered<St>
16 where
17 St: Stream,
18 {
19 #[pin]
20 stream: Fuse<St>,
21 in_progress_queue: FuturesUnordered<St::Item>,
22 max: usize,
23 }
24 }
25
26 impl<St> fmt::Debug for BufferUnordered<St>
27 where
28 St: Stream + fmt::Debug,
29 {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 f.debug_struct("BufferUnordered")
32 .field("stream", &self.stream)
33 .field("in_progress_queue", &self.in_progress_queue)
34 .field("max", &self.max)
35 .finish()
36 }
37 }
38
39 impl<St> BufferUnordered<St>
40 where
41 St: Stream,
42 St::Item: Future,
43 {
44 pub(super) fn new(stream: St, n: usize) -> Self {
45 Self {
46 stream: super::Fuse::new(stream),
47 in_progress_queue: FuturesUnordered::new(),
48 max: n,
49 }
50 }
51
52 delegate_access_inner!(stream, St, (.));
53 }
54
55 impl<St> Stream for BufferUnordered<St>
56 where
57 St: Stream,
58 St::Item: Future,
59 {
60 type Item = <St::Item as Future>::Output;
61
62 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63 let mut this = self.project();
64
65 // First up, try to spawn off as many futures as possible by filling up
66 // our queue of futures.
67 while this.in_progress_queue.len() < *this.max {
68 match this.stream.as_mut().poll_next(cx) {
69 Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
70 Poll::Ready(None) | Poll::Pending => break,
71 }
72 }
73
74 // Attempt to pull the next value from the in_progress_queue
75 match this.in_progress_queue.poll_next_unpin(cx) {
76 x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
77 Poll::Ready(None) => {}
78 }
79
80 // If more values are still coming from the stream, we're not done yet
81 if this.stream.is_done() {
82 Poll::Ready(None)
83 } else {
84 Poll::Pending
85 }
86 }
87
88 fn size_hint(&self) -> (usize, Option<usize>) {
89 let queue_len = self.in_progress_queue.len();
90 let (lower, upper) = self.stream.size_hint();
91 let lower = lower.saturating_add(queue_len);
92 let upper = match upper {
93 Some(x) => x.checked_add(queue_len),
94 None => None,
95 };
96 (lower, upper)
97 }
98 }
99
100 impl<St> FusedStream for BufferUnordered<St>
101 where
102 St: Stream,
103 St::Item: Future,
104 {
105 fn is_terminated(&self) -> bool {
106 self.in_progress_queue.is_terminated() && self.stream.is_terminated()
107 }
108 }
109
// When the wrapped stream is also a `Sink`, expose that sink through this
// adapter by forwarding to the `stream` field. Only compiled with the `sink`
// feature enabled.
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for BufferUnordered<St>
where
    St: Stream + Sink<Item>,
    St::Item: Future,
{
    /// Errors are exactly those of the wrapped sink.
    type Error = St::Error;

    // NOTE(review): macro defined elsewhere in the crate; presumably it
    // generates the `Sink` methods by delegating to `stream` — confirm
    // against the macro definition.
    delegate_sink!(stream, Item);
}