1 use core
::marker
::PhantomData
;
4 use futures_core
::ready
;
5 use futures_core
::stream
::{FusedStream, Stream, TryStream}
;
6 use futures_core
::task
::{Context, Poll}
;
7 #[cfg(feature = "sink")]
8 use futures_sink
::Sink
;
10 use pin_project_lite
::pin_project
;
12 use crate::future
::Either
;
13 use crate::stream
::stream
::flatten_unordered
::{
14 FlattenUnorderedWithFlowController
, FlowController
, FlowStep
,
16 use crate::stream
::IntoStream
;
17 use crate::TryStreamExt
;
20 /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
// NOTE(review): the macro invocation that encloses this declaration opens on a
// line that is not visible in this excerpt.
// `TryFlattenUnordered<St>` wraps the flow-controlled flattening type named below,
// parameterised with the base-stream adapter and the error-propagating controller.
21 TryFlattenUnordered
<St
>(
22 FlattenUnorderedWithFlowController
<NestedTryStreamIntoEitherTryStream
<St
>, PropagateBaseStreamError
<St
>>
23 ): Debug
+ Sink
+ Stream
+ FusedStream
+ AccessInner
[St
, (. .)]
// Constructor closure: takes the base `stream` and a `limit` convertible into
// `Option<usize>`, and wraps `stream` in `NestedTryStreamIntoEitherTryStream`
// before handing it to `FlattenUnorderedWithFlowController::new`.
25 |stream
: St
, limit
: impl Into
<Option
<usize>>|
26 FlattenUnorderedWithFlowController
::new(
27 NestedTryStreamIntoEitherTryStream
::new(stream
),
// Bound: the inner streams' error type must be constructible from the base
// stream's error type.
35 <St
::Ok
as TryStream
>::Error
: From
<St
::Error
>
39 /// Emits either successful streams or single-item streams containing the underlying errors.
40 /// This is a wrapper that lets the `FlattenUnordered` logic be reused over a `TryStream`.
42 #[must_use = "streams do nothing unless polled"]
43 pub struct NestedTryStreamIntoEitherTryStream
<St
>
// Bound: errors of the base stream must convert into the inner streams' error type.
// NOTE(review): the remaining bounds and the struct's fields are not visible in this excerpt.
48 <St
::Ok
as TryStream
>::Error
: From
<St
::Error
>
// Inherent items of the wrapper: a private constructor plus whatever
// `delegate_access_inner!` generates.
55 impl<St
> NestedTryStreamIntoEitherTryStream
<St
>
// The items yielded successfully by `St` must themselves be `Unpin` try-streams.
58 St
::Ok
: TryStream
+ Unpin
,
// Errors of the base stream must convert into the inner streams' error type.
59 <St
::Ok
as TryStream
>::Error
: From
<St
::Error
>,
// Builds the wrapper from the base `stream` (the body is not visible in this excerpt).
61 fn new(stream
: St
) -> Self {
// NOTE(review): `delegate_access_inner!` is a crate macro defined elsewhere;
// presumably it generates accessors for the `stream` field of type `St` — confirm.
65 delegate_access_inner
!(stream
, St
, ());
68 /// A stream that emits a single item immediately and then terminates.
///
/// The pending item is held in the wrapped `Option<T>`.
69 #[derive(Debug, Clone)]
70 pub struct Single
<T
>(Option
<T
>);
73 /// Constructs a new `Single` with the given value.
// NOTE(review): the body of this constructor is not visible in this excerpt.
74 fn new(val
: T
) -> Self {
78 /// Attempts to take the inner item immediately. Always succeeds if the stream isn't terminated.
// NOTE(review): the body of this method is not visible in this excerpt.
79 fn next_immediate(&mut self) -> Option
<T
> {
// `Single<T>` is declared `Unpin` for every `T`; no bound is placed on `T`.
84 impl<T
> Unpin
for Single
<T
> {}
// `Stream` implementation for `Single`: the stored value is handed out by the
// first poll that finds it.
86 impl<T
> Stream
for Single
<T
> {
// Never returns `Poll::Pending`: it takes whatever is left in the inner `Option`,
// so the item comes out once and every later poll yields `None`.
// The task context is ignored.
89 fn poll_next(mut self: Pin
<&mut Self>, _
: &mut Context
<'_
>) -> Poll
<Option
<Self::Item
>> {
90 Poll
::Ready(self.0.take())
// Exact bounds: `(1, Some(1))` while the item is still stored,
// `(0, Some(0))` once it has been taken.
93 fn size_hint(&self) -> (usize, Option
<usize>) {
94 self.0.as_ref().map_or((0, Some(0)), |_
| (1, Some(1)))
98 /// Immediately propagates errors that occurred in the base stream.
///
/// Holds no data of its own; `St` is only tracked through `PhantomData`.
99 #[derive(Debug, Clone, Copy)]
100 pub struct PropagateBaseStreamError
<St
>(PhantomData
<St
>);
// Item type produced by `NestedTryStreamIntoEitherTryStream<St>` when used as a `Stream`.
102 type BaseStreamItem
<St
> = <NestedTryStreamIntoEitherTryStream
<St
> as Stream
>::Item
;
// Item type of the streams that the base stream yields: `BaseStreamItem<St>` is
// itself treated as a `Stream` here and this alias names its `Item`.
103 type InnerStreamItem
<St
> = <BaseStreamItem
<St
> as Stream
>::Item
;
// Flow-control policy used by the flattening machinery: successful inner streams
// are passed on, errors from the base stream are returned right away.
105 impl<St
> FlowController
<BaseStreamItem
<St
>, InnerStreamItem
<St
>> for PropagateBaseStreamError
<St
>
// The items yielded successfully by `St` must themselves be `Unpin` try-streams.
108 St
::Ok
: TryStream
+ Unpin
,
// Errors of the base stream must convert into the inner streams' error type.
109 <St
::Ok
as TryStream
>::Error
: From
<St
::Error
>,
// Decides what happens to one item of the base stream:
// `Either::Left` is forwarded via `FlowStep::Continue`, while `Either::Right`
// becomes `FlowStep::Return` carrying the item pulled out of it.
// NOTE(review): the line opening the `match` is not visible in this excerpt.
111 fn next_step(item
: BaseStreamItem
<St
>) -> FlowStep
<BaseStreamItem
<St
>, InnerStreamItem
<St
>> {
113 // A new successful inner stream was received: hand it on unchanged
114 st @ Either
::Left(_
) => FlowStep
::Continue(st
),
115 // An error was encountered: take its item out right away and return it
// NOTE(review): `unwrap()` panics if `next_immediate()` yields `None`; presumably the
// `Single` on this branch always still holds its item — confirm.
116 Either
::Right(mut err
) => FlowStep
::Return(err
.next_immediate().unwrap()),
// A `Single` stream whose one item is a `Result` built from the `Ok` and `Error`
// types of `St`'s `TryStream` implementation.
121 type SingleStreamResult
<St
> = Single
<Result
<<St
as TryStream
>::Ok
, <St
as TryStream
>::Error
>>;
// `Stream` implementation of the adapter: each item of the wrapped try-stream is
// re-emitted as an `Either` of two stream types.
123 impl<St
> Stream
for NestedTryStreamIntoEitherTryStream
<St
>
// The items yielded successfully by `St` must themselves be `Unpin` try-streams.
126 St
::Ok
: TryStream
+ Unpin
,
// Errors of the base stream must convert into the inner streams' error type.
127 <St
::Ok
as TryStream
>::Error
: From
<St
::Error
>,
129 // Item is either an inner stream or a stream containing a single error.
130 // This allows using `Either`'s `Stream` implementation, as both branches are actually streams of `Result`s.
131 type Item
= Either
<IntoStream
<St
::Ok
>, SingleStreamResult
<St
::Ok
>>;
// Polls the wrapped stream once and maps what it yields:
// `Ok(stream)` becomes `Either::Left`, the error path becomes
// `Either::Right(Single)`, and `None` is forwarded as `Poll::Ready(None)`.
133 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<Option
<Self::Item
>> {
// `ready!` propagates `Poll::Pending` from the wrapped stream before `item` is bound.
134 let item
= ready
!(self.project().stream
.try_poll_next(cx
));
136 let out
= match item
{
137 Some(res
) => match res
{
138 // Emit a successful inner stream as is
139 Ok(stream
) => Either
::Left(stream
.into_stream()),
140 // Wrap an error into a stream containing a single item
// The `map` closure is marked `unreachable!()`, and the error is converted
// with `Into::into`.
// NOTE(review): the arm that binds `err` is not visible in this excerpt.
142 let res
= err
.map(|_
: St
::Ok
| unreachable
!()).map_err(Into
::into
);
144 Either
::Right(Single
::new(res
))
// End of the wrapped stream: finish this stream as well.
147 None
=> return Poll
::Ready(None
),
150 Poll
::Ready(Some(out
))
// `FusedStream` is available when the wrapped stream itself is a `FusedStream`.
154 impl<St
> FusedStream
for NestedTryStreamIntoEitherTryStream
<St
>
156 St
: TryStream
+ FusedStream
,
// The items yielded successfully by `St` must themselves be `Unpin` try-streams.
157 St
::Ok
: TryStream
+ Unpin
,
// Errors of the base stream must convert into the inner streams' error type.
158 <St
::Ok
as TryStream
>::Error
: From
<St
::Error
>,
// Termination is reported exactly as the wrapped stream reports it.
160 fn is_terminated(&self) -> bool
{
161 self.stream
.is_terminated()
165 // Forwarding impl of `Sink` to the underlying stream; only compiled with the `sink` feature
166 #[cfg(feature = "sink")]
167 impl<St
, Item
> Sink
<Item
> for NestedTryStreamIntoEitherTryStream
<St
>
// The wrapped stream must itself be a `Sink<Item>`.
169 St
: TryStream
+ Sink
<Item
>,
// The items yielded successfully by `St` must themselves be `Unpin` try-streams.
170 St
::Ok
: TryStream
+ Unpin
,
// Errors of the base stream must convert into the inner streams' error type.
171 <St
::Ok
as TryStream
>::Error
: From
<<St
as TryStream
>::Error
>,
// The sink error type is taken unchanged from `St`'s own `Sink<Item>` implementation.
173 type Error
= <St
as Sink
<Item
>>::Error
;
// NOTE(review): `delegate_sink!` is a crate macro defined elsewhere; presumably it
// forwards the `Sink` methods to the `stream` field — confirm.
175 delegate_sink
!(stream
, Item
);