]>
git.proxmox.com Git - rustc.git/blob - vendor/futures-util/src/stream/select_with_strategy.rs
1 use super::assert_stream
;
2 use core
::{fmt, pin::Pin}
;
3 use futures_core
::stream
::{FusedStream, Stream}
;
4 use futures_core
::task
::{Context, Poll}
;
5 use pin_project_lite
::pin_project
;
7 /// Type to tell [`SelectWithStrategy`] which stream to poll next.
8 #[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
10 /// Poll the first stream.
12 /// Poll the second stream.
17 /// Toggle the value and return the old one.
18 pub fn toggle(&mut self) -> Self {
24 fn other(&self) -> PollNext
{
26 PollNext
::Left
=> PollNext
::Right
,
27 PollNext
::Right
=> PollNext
::Left
,
32 impl Default
for PollNext
{
33 fn default() -> Self {
46 fn finish(&mut self, ps
: PollNext
) {
48 (InternalState
::Start
, PollNext
::Left
) => {
49 *self = InternalState
::LeftFinished
;
51 (InternalState
::Start
, PollNext
::Right
) => {
52 *self = InternalState
::RightFinished
;
54 (InternalState
::LeftFinished
, PollNext
::Right
)
55 | (InternalState
::RightFinished
, PollNext
::Left
) => {
56 *self = InternalState
::BothFinished
;
64 /// Stream for the [`select_with_strategy()`] function. See function docs for details.
65 #[must_use = "streams do nothing unless polled"]
66 #[project = SelectWithStrategyProj]
67 pub struct SelectWithStrategy
<St1
, St2
, Clos
, State
> {
72 internal_state
: InternalState
,
78 /// This function will attempt to pull items from both streams. You provide a
79 /// closure to tell [`SelectWithStrategy`] which stream to poll. The closure can
80 /// store state on `SelectWithStrategy` to which it will receive a `&mut` on every
81 /// invocation. This allows basing the strategy on prior choices.
83 /// After one of the two input streams completes, the remaining one will be
84 /// polled exclusively. The returned stream completes when both input
85 /// streams have completed.
87 /// Note that this function consumes both streams and returns a wrapped
93 /// This example shows how to always prioritize the left stream.
96 /// # futures::executor::block_on(async {
97 /// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
99 /// let left = repeat(1);
100 /// let right = repeat(2);
102 /// // We don't need any state, so let's make it an empty tuple.
103 /// // We must provide some type here, as there is no way for the compiler
104 /// // to infer it. As we don't need to capture variables, we can just
105 /// // use a function pointer instead of a closure.
106 /// fn prio_left(_: &mut ()) -> PollNext { PollNext::Left }
108 /// let mut out = select_with_strategy(left, right, prio_left);
110 /// for _ in 0..100 {
111 /// // Whenever we poll out, we will alwas get `1`.
112 /// assert_eq!(1, out.select_next_some().await);
118 /// This example shows how to select from both streams round robin.
119 /// Note: this special case is provided by [`futures-util::stream::select`].
122 /// # futures::executor::block_on(async {
123 /// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
125 /// let left = repeat(1);
126 /// let right = repeat(2);
128 /// let rrobin = |last: &mut PollNext| last.toggle();
130 /// let mut out = select_with_strategy(left, right, rrobin);
132 /// for _ in 0..100 {
133 /// // We should be alternating now.
134 /// assert_eq!(1, out.select_next_some().await);
135 /// assert_eq!(2, out.select_next_some().await);
139 pub fn select_with_strategy
<St1
, St2
, Clos
, State
>(
143 ) -> SelectWithStrategy
<St1
, St2
, Clos
, State
>
146 St2
: Stream
<Item
= St1
::Item
>,
147 Clos
: FnMut(&mut State
) -> PollNext
,
150 assert_stream
::<St1
::Item
, _
>(SelectWithStrategy
{
153 state
: Default
::default(),
154 internal_state
: InternalState
::Start
,
159 impl<St1
, St2
, Clos
, State
> SelectWithStrategy
<St1
, St2
, Clos
, State
> {
160 /// Acquires a reference to the underlying streams that this combinator is
162 pub fn get_ref(&self) -> (&St1
, &St2
) {
163 (&self.stream1
, &self.stream2
)
166 /// Acquires a mutable reference to the underlying streams that this
167 /// combinator is pulling from.
169 /// Note that care must be taken to avoid tampering with the state of the
170 /// stream which may otherwise confuse this combinator.
171 pub fn get_mut(&mut self) -> (&mut St1
, &mut St2
) {
172 (&mut self.stream1
, &mut self.stream2
)
175 /// Acquires a pinned mutable reference to the underlying streams that this
176 /// combinator is pulling from.
178 /// Note that care must be taken to avoid tampering with the state of the
179 /// stream which may otherwise confuse this combinator.
180 pub fn get_pin_mut(self: Pin
<&mut Self>) -> (Pin
<&mut St1
>, Pin
<&mut St2
>) {
181 let this
= self.project();
182 (this
.stream1
, this
.stream2
)
185 /// Consumes this combinator, returning the underlying streams.
187 /// Note that this may discard intermediate state of this combinator, so
188 /// care should be taken to avoid losing resources when this is called.
189 pub fn into_inner(self) -> (St1
, St2
) {
190 (self.stream1
, self.stream2
)
194 impl<St1
, St2
, Clos
, State
> FusedStream
for SelectWithStrategy
<St1
, St2
, Clos
, State
>
197 St2
: Stream
<Item
= St1
::Item
>,
198 Clos
: FnMut(&mut State
) -> PollNext
,
200 fn is_terminated(&self) -> bool
{
201 match self.internal_state
{
202 InternalState
::BothFinished
=> true,
209 fn poll_side
<St1
, St2
, Clos
, State
>(
210 select
: &mut SelectWithStrategyProj
<'_
, St1
, St2
, Clos
, State
>,
212 cx
: &mut Context
<'_
>,
213 ) -> Poll
<Option
<St1
::Item
>>
216 St2
: Stream
<Item
= St1
::Item
>,
219 PollNext
::Left
=> select
.stream1
.as_mut().poll_next(cx
),
220 PollNext
::Right
=> select
.stream2
.as_mut().poll_next(cx
),
225 fn poll_inner
<St1
, St2
, Clos
, State
>(
226 select
: &mut SelectWithStrategyProj
<'_
, St1
, St2
, Clos
, State
>,
228 cx
: &mut Context
<'_
>,
229 ) -> Poll
<Option
<St1
::Item
>>
232 St2
: Stream
<Item
= St1
::Item
>,
234 let first_done
= match poll_side(select
, side
, cx
) {
235 Poll
::Ready(Some(item
)) => return Poll
::Ready(Some(item
)),
236 Poll
::Ready(None
) => {
237 select
.internal_state
.finish(side
);
240 Poll
::Pending
=> false,
242 let other
= side
.other();
243 match poll_side(select
, other
, cx
) {
244 Poll
::Ready(None
) => {
245 select
.internal_state
.finish(other
);
256 impl<St1
, St2
, Clos
, State
> Stream
for SelectWithStrategy
<St1
, St2
, Clos
, State
>
259 St2
: Stream
<Item
= St1
::Item
>,
260 Clos
: FnMut(&mut State
) -> PollNext
,
262 type Item
= St1
::Item
;
264 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<Option
<St1
::Item
>> {
265 let mut this
= self.project();
267 match this
.internal_state
{
268 InternalState
::Start
=> {
269 let next_side
= (this
.clos
)(this
.state
);
270 poll_inner(&mut this
, next_side
, cx
)
272 InternalState
::LeftFinished
=> match this
.stream2
.poll_next(cx
) {
273 Poll
::Ready(None
) => {
274 *this
.internal_state
= InternalState
::BothFinished
;
279 InternalState
::RightFinished
=> match this
.stream1
.poll_next(cx
) {
280 Poll
::Ready(None
) => {
281 *this
.internal_state
= InternalState
::BothFinished
;
286 InternalState
::BothFinished
=> Poll
::Ready(None
),
291 impl<St1
, St2
, Clos
, State
> fmt
::Debug
for SelectWithStrategy
<St1
, St2
, Clos
, State
>
297 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
298 f
.debug_struct("SelectWithStrategy")
299 .field("stream1", &self.stream1
)
300 .field("stream2", &self.stream2
)
301 .field("state", &self.state
)