use crate::stream::{Fuse, FuturesUnordered, StreamExt};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;
12 /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
14 #[must_use = "streams do nothing unless polled"]
15 pub struct BufferUnordered
<St
>
21 in_progress_queue
: FuturesUnordered
<St
::Item
>,
26 impl<St
> fmt
::Debug
for BufferUnordered
<St
>
28 St
: Stream
+ fmt
::Debug
,
30 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
31 f
.debug_struct("BufferUnordered")
32 .field("stream", &self.stream
)
33 .field("in_progress_queue", &self.in_progress_queue
)
34 .field("max", &self.max
)
39 impl<St
> BufferUnordered
<St
>
44 pub(super) fn new(stream
: St
, n
: usize) -> Self {
46 stream
: super::Fuse
::new(stream
),
47 in_progress_queue
: FuturesUnordered
::new(),
52 delegate_access_inner
!(stream
, St
, (.));
55 impl<St
> Stream
for BufferUnordered
<St
>
60 type Item
= <St
::Item
as Future
>::Output
;
62 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<Option
<Self::Item
>> {
63 let mut this
= self.project();
65 // First up, try to spawn off as many futures as possible by filling up
66 // our queue of futures.
67 while this
.in_progress_queue
.len() < *this
.max
{
68 match this
.stream
.as_mut().poll_next(cx
) {
69 Poll
::Ready(Some(fut
)) => this
.in_progress_queue
.push(fut
),
70 Poll
::Ready(None
) | Poll
::Pending
=> break,
74 // Attempt to pull the next value from the in_progress_queue
75 match this
.in_progress_queue
.poll_next_unpin(cx
) {
76 x @ Poll
::Pending
| x @ Poll
::Ready(Some(_
)) => return x
,
77 Poll
::Ready(None
) => {}
80 // If more values are still coming from the stream, we're not done yet
81 if this
.stream
.is_done() {
88 fn size_hint(&self) -> (usize, Option
<usize>) {
89 let queue_len
= self.in_progress_queue
.len();
90 let (lower
, upper
) = self.stream
.size_hint();
91 let lower
= lower
.saturating_add(queue_len
);
92 let upper
= match upper
{
93 Some(x
) => x
.checked_add(queue_len
),
100 impl<St
> FusedStream
for BufferUnordered
<St
>
105 fn is_terminated(&self) -> bool
{
106 self.in_progress_queue
.is_terminated() && self.stream
.is_terminated()
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for BufferUnordered<S>
where
    S: Stream + Sink<Item>,
    S::Item: Future,
{
    /// Errors are those of the wrapped sink.
    type Error = S::Error;

    // Generates the `Sink` methods by delegating to the `stream` field.
    delegate_sink!(stream, Item);
}