3 use crate::codec
::UserError
;
4 use crate::frame
::{self, Reason, StreamId}
;
5 use crate::proto
::{self, Error, Initiator, PollReset}
;
10 /// Represents the state of an H2 stream
14 /// send PP | | recv PP
15 /// ,--------| idle |--------.
18 /// +----------+ | +----------+
19 /// | | | send H / | |
20 /// ,------| reserved | | recv H | reserved |------.
21 /// | | (local) | | | (remote) | |
22 /// | +----------+ v +----------+ |
23 /// | | +--------+ | |
24 /// | | recv ES | | send ES | |
25 /// | send H | ,-------| open |-------. | recv H |
27 /// | v v +--------+ v v |
28 /// | +----------+ | +----------+ |
29 /// | | half | | | half | |
30 /// | | closed | | send R / | closed | |
31 /// | | (remote) | | recv R | (local) | |
32 /// | +----------+ | +----------+ |
34 /// | | send ES / | recv ES / | |
35 /// | | send R / v send R / | |
36 /// | | recv R +--------+ recv R | |
37 /// | send R / `----------->| |<-----------' send R / |
38 /// | recv R | closed | recv R |
39 /// `----------------------->| |<----------------------'
42 /// send: endpoint sends this frame
43 /// recv: endpoint receives this frame
45 /// H: HEADERS frame (with implied CONTINUATIONs)
46 /// PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
47 /// ES: END_STREAM flag
48 /// R: RST_STREAM frame
50 #[derive(Debug, Clone)]
55 #[derive(Debug, Clone)]
58 // TODO: these states shouldn't count against concurrency limits:
61 Open { local: Peer, remote: Peer }
,
62 HalfClosedLocal(Peer
), // TODO: explicitly name this value
63 HalfClosedRemote(Peer
),
67 #[derive(Debug, Copy, Clone, Default)]
74 #[derive(Debug, Clone)]
79 /// This indicates to the connection that a reset frame must be sent out
80 /// once the send queue has been flushed.
82 /// Examples of when this could happen:
83 /// - User drops all references to a stream, so we want to CANCEL it.
84 /// - Header block size was too large, so we want to REFUSE, possibly
85 /// after sending a 431 response frame.
86 ScheduledLibraryReset(Reason
),
90 /// Opens the send-half of a stream if it is not already open.
91 pub fn send_open(&mut self, eos
: bool
) -> Result
<(), UserError
> {
92 let local
= Streaming
;
94 self.inner
= match self.inner
{
97 HalfClosedLocal(AwaitingHeaders
)
101 remote
: AwaitingHeaders
,
106 local
: AwaitingHeaders
,
110 HalfClosedLocal(remote
)
112 Open { local, remote }
115 HalfClosedRemote(AwaitingHeaders
) | ReservedLocal
=> {
117 Closed(Cause
::EndStream
)
119 HalfClosedRemote(local
)
123 // All other transitions result in a protocol error
124 return Err(UserError
::UnexpectedFrameType
);
131 /// Opens the receive-half of the stream when a HEADERS frame is received.
133 /// Returns true if this transitions the state to Open.
134 pub fn recv_open(&mut self, frame
: &frame
::Headers
) -> Result
<bool
, Error
> {
135 let mut initial
= false;
136 let eos
= frame
.is_end_stream();
138 self.inner
= match self.inner
{
143 HalfClosedRemote(AwaitingHeaders
)
146 local
: AwaitingHeaders
,
147 remote
: if frame
.is_informational() {
148 tracing
::trace
!("skipping 1xx response headers");
160 Closed(Cause
::EndStream
)
161 } else if frame
.is_informational() {
162 tracing
::trace
!("skipping 1xx response headers");
165 HalfClosedLocal(Streaming
)
170 remote
: AwaitingHeaders
,
173 HalfClosedRemote(local
)
177 remote
: if frame
.is_informational() {
178 tracing
::trace
!("skipping 1xx response headers");
186 HalfClosedLocal(AwaitingHeaders
) => {
188 Closed(Cause
::EndStream
)
189 } else if frame
.is_informational() {
190 tracing
::trace
!("skipping 1xx response headers");
191 HalfClosedLocal(AwaitingHeaders
)
193 HalfClosedLocal(Streaming
)
197 // All other transitions result in a protocol error
198 proto_err
!(conn
: "recv_open: in unexpected state {:?}", state
);
199 return Err(Error
::library_go_away(Reason
::PROTOCOL_ERROR
));
206 /// Transition from Idle -> ReservedRemote
207 pub fn reserve_remote(&mut self) -> Result
<(), Error
> {
210 self.inner
= ReservedRemote
;
214 proto_err
!(conn
: "reserve_remote: in unexpected state {:?}", state
);
215 Err(Error
::library_go_away(Reason
::PROTOCOL_ERROR
))
220 /// Transition from Idle -> ReservedLocal
221 pub fn reserve_local(&mut self) -> Result
<(), UserError
> {
224 self.inner
= ReservedLocal
;
227 _
=> Err(UserError
::UnexpectedFrameType
),
231 /// Indicates that the remote side will not send more data to the local.
232 pub fn recv_close(&mut self) -> Result
<(), Error
> {
234 Open { local, .. }
=> {
235 // The remote side will continue to receive data.
236 tracing
::trace
!("recv_close: Open => HalfClosedRemote({:?})", local
);
237 self.inner
= HalfClosedRemote(local
);
240 HalfClosedLocal(..) => {
241 tracing
::trace
!("recv_close: HalfClosedLocal => Closed");
242 self.inner
= Closed(Cause
::EndStream
);
246 proto_err
!(conn
: "recv_close: in unexpected state {:?}", state
);
247 Err(Error
::library_go_away(Reason
::PROTOCOL_ERROR
))
252 /// The remote explicitly sent a RST_STREAM.
255 /// - `frame`: the received RST_STREAM frame.
256 /// - `queued`: true if this stream has frames in the pending send queue.
257 pub fn recv_reset(&mut self, frame
: frame
::Reset
, queued
: bool
) {
259 // If the stream is already in a `Closed` state, do nothing,
260 // provided that there are no frames still in the send queue.
261 Closed(..) if !queued
=> {}
262 // A notionally `Closed` stream may still have queued frames in
263 // the following cases:
265 // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
266 // actually closed the stream yet).
267 // - if the cause is `Cause::EndStream`: we transition to this
268 // state when an EOS frame is *enqueued* (so that it's invalid
269 // to enqueue more frames), not when the EOS frame is *sent*;
270 // therefore, there may still be frames ahead of the EOS frame
271 // in the send queue.
273 // In either of these cases, we want to overwrite the stream's
274 // previous state with the received RST_STREAM, so that the queue
275 // will be cleared by `Prioritize::pop_frame`.
278 "recv_reset; frame={:?}; state={:?}; queued={:?}",
283 self.inner
= Closed(Cause
::Error(Error
::remote_reset(
291 /// Handle a connection-level error.
292 pub fn handle_error(&mut self, err
: &proto
::Error
) {
296 tracing
::trace
!("handle_error; err={:?}", err
);
297 self.inner
= Closed(Cause
::Error(err
.clone()));
302 pub fn recv_eof(&mut self) {
306 tracing
::trace
!("recv_eof; state={:?}", state
);
307 self.inner
= Closed(Cause
::Error(
309 io
::ErrorKind
::BrokenPipe
,
310 "stream closed because of a broken pipe",
318 /// Indicates that the local side will not send more data to the remote.
319 pub fn send_close(&mut self) {
321 Open { remote, .. }
=> {
322 // The local side will continue to receive data.
323 tracing
::trace
!("send_close: Open => HalfClosedLocal({:?})", remote
);
324 self.inner
= HalfClosedLocal(remote
);
326 HalfClosedRemote(..) => {
327 tracing
::trace
!("send_close: HalfClosedRemote => Closed");
328 self.inner
= Closed(Cause
::EndStream
);
330 ref state
=> panic
!("send_close: unexpected state {:?}", state
),
334 /// Set the stream state to reset locally.
335 pub fn set_reset(&mut self, stream_id
: StreamId
, reason
: Reason
, initiator
: Initiator
) {
336 self.inner
= Closed(Cause
::Error(Error
::Reset(stream_id
, reason
, initiator
)));
339 /// Set the stream state to a scheduled reset.
340 pub fn set_scheduled_reset(&mut self, reason
: Reason
) {
341 debug_assert
!(!self.is_closed());
342 self.inner
= Closed(Cause
::ScheduledLibraryReset(reason
));
345 pub fn get_scheduled_reset(&self) -> Option
<Reason
> {
347 Closed(Cause
::ScheduledLibraryReset(reason
)) => Some(reason
),
352 pub fn is_scheduled_reset(&self) -> bool
{
353 matches
!(self.inner
, Closed(Cause
::ScheduledLibraryReset(..)))
356 pub fn is_local_error(&self) -> bool
{
358 Closed(Cause
::Error(ref e
)) => e
.is_local(),
359 Closed(Cause
::ScheduledLibraryReset(..)) => true,
364 pub fn is_remote_reset(&self) -> bool
{
367 Closed(Cause
::Error(Error
::Reset(_
, _
, Initiator
::Remote
)))
371 /// Returns true if the stream is already reset.
372 pub fn is_reset(&self) -> bool
{
374 Closed(Cause
::EndStream
) => false,
380 pub fn is_send_streaming(&self) -> bool
{
386 } | HalfClosedRemote(Streaming
)
390 /// Returns true when the stream is in a state to receive headers
391 pub fn is_recv_headers(&self) -> bool
{
395 remote
: AwaitingHeaders
,
397 } | HalfClosedLocal(AwaitingHeaders
)
402 pub fn is_recv_streaming(&self) -> bool
{
408 } | HalfClosedLocal(Streaming
)
412 pub fn is_closed(&self) -> bool
{
413 matches
!(self.inner
, Closed(_
))
416 pub fn is_recv_closed(&self) -> bool
{
419 Closed(..) | HalfClosedRemote(..) | ReservedLocal
423 pub fn is_send_closed(&self) -> bool
{
426 Closed(..) | HalfClosedLocal(..) | ReservedRemote
430 pub fn is_idle(&self) -> bool
{
431 matches
!(self.inner
, Idle
)
434 pub fn ensure_recv_open(&self) -> Result
<bool
, proto
::Error
> {
435 // TODO: Is this correct?
437 Closed(Cause
::Error(ref e
)) => Err(e
.clone()),
438 Closed(Cause
::ScheduledLibraryReset(reason
)) => {
439 Err(proto
::Error
::library_go_away(reason
))
441 Closed(Cause
::EndStream
) | HalfClosedRemote(..) | ReservedLocal
=> Ok(false),
446 /// Returns a reason if the stream has been reset.
447 pub(super) fn ensure_reason(&self, mode
: PollReset
) -> Result
<Option
<Reason
>, crate::Error
> {
449 Closed(Cause
::Error(Error
::Reset(_
, reason
, _
)))
450 | Closed(Cause
::Error(Error
::GoAway(_
, reason
, _
)))
451 | Closed(Cause
::ScheduledLibraryReset(reason
)) => Ok(Some(reason
)),
452 Closed(Cause
::Error(ref e
)) => Err(e
.clone().into()),
456 | HalfClosedRemote(Streaming
) => match mode
{
457 PollReset
::AwaitingHeaders
=> Err(UserError
::PollResetAfterSendResponse
.into()),
458 PollReset
::Streaming
=> Ok(None
),
465 impl Default
for State
{
466 fn default() -> State
{
467 State { inner: Inner::Idle }