2 store
, Buffer
, Codec
, Config
, Counts
, Frame
, Prioritize
, Prioritized
, Store
, Stream
, StreamId
,
3 StreamIdOverflow
, WindowSize
,
5 use crate::codec
::UserError
;
6 use crate::frame
::{self, Reason}
;
7 use crate::proto
::{self, Error, Initiator}
;
10 use tokio
::io
::AsyncWrite
;
12 use std
::cmp
::Ordering
;
14 use std
::task
::{Context, Poll, Waker}
;
16 /// Manages state transitions related to outbound frames.
///
/// Holds the next locally usable stream ID, the stream ID ceiling lowered by
/// a received GOAWAY, the initial send window size, the prioritization layer
/// that frames are queued on, and two peer-controlled settings flags.
18 pub(super) struct Send
{
19 /// Stream identifier to use for next initialized stream.
///
/// `open` and `reserve_local` overwrite this with `next_id()` of the ID they
/// read; `ensure_next_stream_id` maps the `Err` case to
/// `UserError::OverflowedStreamId`.
20 next_stream_id
: Result
<StreamId
, StreamIdOverflow
>,
22 /// Any streams with a higher ID are ignored.
24 /// This starts as MAX, but is lowered when a GOAWAY is received.
26 /// > After sending a GOAWAY frame, the sender can discard frames for
27 /// > streams initiated by the receiver with identifiers higher than
28 /// > the identified last stream.
29 max_stream_id
: StreamId
,
31 /// Initial window size of locally initiated streams
///
/// Seeded from `config.remote_init_window_sz` in `new` and replaced in
/// `apply_remote_settings` when the SETTINGS frame carries a new value.
32 init_window_sz
: WindowSize
,
34 /// Prioritization layer
35 prioritize
: Prioritize
,
/// Whether PUSH_PROMISE frames may be sent.
///
/// Starts as `true` in `new`, is overwritten in `apply_remote_settings` when
/// the SETTINGS frame carries a value, and makes `send_push_promise` return
/// `UserError::PeerDisabledServerPush` while it is `false`.
37 is_push_enabled
: bool
,
39 /// If extended connect protocol is enabled.
///
/// Starts as `false` in `new`; updated in `apply_remote_settings`.
40 is_extended_connect_protocol_enabled
: bool
,
43 /// A value to detect which public API has called `poll_reset`.
45 pub(crate) enum PollReset
{
51 /// Create a new `Send`
///
/// Seeds the state from `config`: the initial send window comes from
/// `config.remote_init_window_sz`, the first stream ID from
/// `config.local_next_stream_id`, and the prioritization layer is built by
/// `Prioritize::new(config)`.
52 pub fn new(config
: &Config
) -> Self {
54 init_window_sz
: config
.remote_init_window_sz
,
// No GOAWAY has been received yet, so no stream ID is excluded.
55 max_stream_id
: StreamId
::MAX
,
56 next_stream_id
: Ok(config
.local_next_stream_id
),
57 prioritize
: Prioritize
::new(config
),
// Push is assumed enabled and extended CONNECT disabled until
// `apply_remote_settings` stores a different value.
58 is_push_enabled
: true,
59 is_extended_connect_protocol_enabled
: false,
63 /// Returns the initial send window size
64 pub fn init_window_sz(&self) -> WindowSize
{
/// Claims the next locally initiated stream ID.
///
/// Reads the current ID through `ensure_next_stream_id`, propagating its
/// `UserError` with `?`, then stores `stream_id.next_id()` back into
/// `next_stream_id` so that a later call observes the following ID or the
/// overflow.
///
/// NOTE(review): the statement that produces the return value is not visible
/// in this excerpt; presumably it is `Ok(stream_id)` -- confirm.
68 pub fn open(&mut self) -> Result
<StreamId
, UserError
> {
69 let stream_id
= self.ensure_next_stream_id()?
;
70 self.next_stream_id
= stream_id
.next_id();
/// Reserves the next locally initiated stream ID.
///
/// The visible statements are the same as in `open`: the current ID is read
/// through `ensure_next_stream_id` (errors propagate with `?`) and
/// `next_stream_id` is advanced with `stream_id.next_id()`.
///
/// NOTE(review): the statement that produces the return value is not visible
/// in this excerpt; presumably it is `Ok(stream_id)` -- confirm.
74 pub fn reserve_local(&mut self) -> Result
<StreamId
, UserError
> {
75 let stream_id
= self.ensure_next_stream_id()?
;
76 self.next_stream_id
= stream_id
.next_id();
/// Rejects header maps that carry connection-specific header fields.
///
/// Returns `Err(UserError::MalformedHeaders)` when `fields` contains
/// `connection`, `transfer-encoding`, `upgrade`, `keep-alive` or
/// `proxy-connection`. A present `TE` header is inspected in a second branch
/// that can also return `MalformedHeaders`.
///
/// NOTE(review): the condition applied to the `te` value is not visible in
/// this excerpt; RFC 7540 only permits the value `trailers` -- confirm that
/// this is what the missing line checks.
80 fn check_headers(fields
: &http
::HeaderMap
) -> Result
<(), UserError
> {
81 // RFC 7540 §8.1.2.2: Connection-Specific Header Fields
82 if fields
.contains_key(http
::header
::CONNECTION
)
83 || fields
.contains_key(http
::header
::TRANSFER_ENCODING
)
84 || fields
.contains_key(http
::header
::UPGRADE
)
85 || fields
.contains_key("keep-alive")
86 || fields
.contains_key("proxy-connection")
88 tracing
::debug
!("illegal connection-specific headers found");
89 return Err(UserError
::MalformedHeaders
);
90 } else if let Some(te
) = fields
.get(http
::header
::TE
) {
92 tracing
::debug
!("illegal connection-specific headers found");
93 return Err(UserError
::MalformedHeaders
);
/// Queues a PUSH_PROMISE frame for sending on `stream`.
///
/// # Errors
///
/// * `UserError::PeerDisabledServerPush` when `is_push_enabled` is `false`.
/// * Whatever `check_headers` returns for the frame fields (propagated
///   with `?`).
///
/// After both checks pass the frame is converted with `into()` and handed to
/// a `queue_frame` call together with `buffer`, `stream` and `task`.
/// NOTE(review): the receiver of `queue_frame` is not visible in this
/// excerpt; presumably `self.prioritize` -- confirm.
99 pub fn send_push_promise
<B
>(
101 frame
: frame
::PushPromise
,
102 buffer
: &mut Buffer
<Frame
<B
>>,
103 stream
: &mut store
::Ptr
,
104 task
: &mut Option
<Waker
>,
105 ) -> Result
<(), UserError
> {
// Refuse to push when the flag stored by `apply_remote_settings` is off.
106 if !self.is_push_enabled
{
107 return Err(UserError
::PeerDisabledServerPush
);
111 "send_push_promise; frame={:?}; init_window={:?}",
116 Self::check_headers(frame
.fields())?
;
118 // Queue the frame for sending
120 .queue_frame(frame
.into(), buffer
, stream
, task
);
/// Validates and queues a HEADERS frame that opens the send half of `stream`.
///
/// Visible steps, in order:
///
/// 1. `check_headers` validates the frame fields; its error propagates.
/// 2. `stream.state.send_open(end_stream)` transitions the stream state,
///    where `end_stream` is `frame.is_end_stream()`; its error propagates.
/// 3. When `counts.peer().is_local_init(frame.stream_id())` holds and the
///    stream is not a pending push, the stream is placed on the open queue
///    through `self.prioritize.queue_open`.
/// 4. The frame is handed to `queue_frame`.
/// 5. The waker in `task`, if any, is taken in the final visible branch.
///
/// NOTE(review): the `counts` parameter, the assignment to `pending_open`,
/// the condition guarding step 5 and the use of the taken waker are on lines
/// missing from this excerpt; the existing comments suggest the waker
/// notifies the connection when the stream went onto `pending_open`.
125 pub fn send_headers
<B
>(
127 frame
: frame
::Headers
,
128 buffer
: &mut Buffer
<Frame
<B
>>,
129 stream
: &mut store
::Ptr
,
131 task
: &mut Option
<Waker
>,
132 ) -> Result
<(), UserError
> {
134 "send_headers; frame={:?}; init_window={:?}",
139 Self::check_headers(frame
.fields())?
;
// Captured before `frame` is consumed by `into()` further down.
141 let end_stream
= frame
.is_end_stream();
144 stream
.state
.send_open(end_stream
)?
;
146 let mut pending_open
= false;
147 if counts
.peer().is_local_init(frame
.stream_id()) && !stream
.is_pending_push
{
148 self.prioritize
.queue_open(stream
);
152 // Queue the frame for sending
154 // This call expects that, since new streams are in the open queue, new
155 // streams won't be pushed on pending_send.
157 .queue_frame(frame
.into(), buffer
, stream
, task
);
159 // Need to notify the connection when pushing onto pending_open since
160 // queue_frame only notifies for pending_send.
162 if let Some(task
) = task
.take() {
170 /// Send an explicit RST_STREAM frame
///
/// The stream state is sampled first (`is_reset`, `is_closed`, whether
/// `pending_send` is empty, and the stream ID) because the state is then
/// unconditionally moved to reset with `set_reset(stream_id, reason,
/// initiator)`.
///
/// According to the log messages and comments in the body, no frame is sent
/// when the stream was already reset, or when it was closed and its send
/// queue was already flushed. Otherwise the pending outbound frames are
/// cleared with `prioritize.clear_queue`, a `frame::Reset` built from the
/// stream ID and `reason` is queued, and finally
/// `prioritize.reclaim_all_capacity` is called.
///
/// NOTE(review): the `reason` and `counts` parameters and the early-return
/// statements are on lines missing from this excerpt -- confirm against the
/// full file.
171 pub fn send_reset
<B
>(
174 initiator
: Initiator
,
175 buffer
: &mut Buffer
<Frame
<B
>>,
176 stream
: &mut store
::Ptr
,
178 task
: &mut Option
<Waker
>,
// Snapshot taken before `set_reset` below changes the state.
180 let is_reset
= stream
.state
.is_reset();
181 let is_closed
= stream
.state
.is_closed();
182 let is_empty
= stream
.pending_send
.is_empty();
183 let stream_id
= stream
.id
;
186 "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \
187 is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
200 // Don't double reset
202 " -> not sending RST_STREAM ({:?} is already reset)",
208 // Transition the state to reset no matter what.
209 stream
.state
.set_reset(stream_id
, reason
, initiator
);
211 // If closed AND the send queue is flushed, then the stream cannot be
212 // reset explicitly, either. Implicit resets can still be queued.
213 if is_closed
&& is_empty
{
215 " -> not sending explicit RST_STREAM ({:?} was closed \
216 and send queue was flushed)",
222 // Clear all pending outbound frames.
223 // Note that we don't call `self.recv_err` because we want to enqueue
224 // the reset frame before transitioning the stream inside
225 // `reclaim_all_capacity`.
226 self.prioritize
.clear_queue(buffer
, stream
);
228 let frame
= frame
::Reset
::new(stream
.id
, reason
);
230 tracing
::trace
!("send_reset -- queueing; frame={:?}", frame
);
232 .queue_frame(frame
.into(), buffer
, stream
, task
);
233 self.prioritize
.reclaim_all_capacity(stream
, counts
);
/// Marks `stream` for a reset that is scheduled rather than queued here.
///
/// When the stream state is already closed the first branch applies and,
/// per its comment, nothing more is done. Otherwise the state records the
/// scheduled reset via `set_scheduled_reset(reason)`, reserved capacity is
/// returned through `prioritize.reclaim_reserved_capacity`, and the stream
/// is scheduled for sending with `prioritize.schedule_send`.
///
/// NOTE(review): the `reason` and `counts` parameters and the early return
/// of the closed branch are on lines missing from this excerpt.
236 pub fn schedule_implicit_reset(
238 stream
: &mut store
::Ptr
,
241 task
: &mut Option
<Waker
>,
243 if stream
.state
.is_closed() {
244 // Stream is already closed, nothing more to do
248 stream
.state
.set_scheduled_reset(reason
);
250 self.prioritize
.reclaim_reserved_capacity(stream
, counts
);
251 self.prioritize
.schedule_send(stream
, task
);
256 frame
: frame
::Data
<B
>,
257 buffer
: &mut Buffer
<Frame
<B
>>,
258 stream
: &mut store
::Ptr
,
260 task
: &mut Option
<Waker
>,
261 ) -> Result
<(), UserError
>
266 .send_data(frame
, buffer
, stream
, counts
, task
)
/// Queues a trailing HEADERS frame and closes the send half of `stream`.
///
/// # Errors
///
/// Returns `UserError::UnexpectedFrameType` when
/// `stream.state.is_send_streaming()` is `false`.
///
/// Otherwise the state is closed for sending with `send_close()`, the frame
/// is handed to `queue_frame`, and `prioritize.reserve_capacity(0, ...)` is
/// called so that the stream no longer requests any send capacity.
///
/// NOTE(review): the `counts` parameter and the receiver of `queue_frame`
/// are on lines missing from this excerpt.
269 pub fn send_trailers
<B
>(
271 frame
: frame
::Headers
,
272 buffer
: &mut Buffer
<Frame
<B
>>,
273 stream
: &mut store
::Ptr
,
275 task
: &mut Option
<Waker
>,
276 ) -> Result
<(), UserError
> {
277 // TODO: Should this logic be moved into state.rs?
278 if !stream
.state
.is_send_streaming() {
279 return Err(UserError
::UnexpectedFrameType
);
282 stream
.state
.send_close();
284 tracing
::trace
!("send_trailers -- queuing; frame={:?}", frame
);
286 .queue_frame(frame
.into(), buffer
, stream
, task
);
288 // Release any excess capacity
289 self.prioritize
.reserve_capacity(0, stream
, counts
);
/// Drives queued outbound frames towards the codec `dst`.
///
/// The only visible statement forwards `cx`, `buffer`, `store`, `counts`
/// and `dst` to another `poll_complete` and returns its
/// `Poll<io::Result<()>>`. `T` must implement `AsyncWrite + Unpin`.
///
/// NOTE(review): the `cx`, `store` and `counts` parameters and the receiver
/// of the forwarded call are on lines missing from this excerpt; the
/// receiver is presumably `self.prioritize` -- confirm.
294 pub fn poll_complete
<T
, B
>(
297 buffer
: &mut Buffer
<Frame
<B
>>,
300 dst
: &mut Codec
<T
, Prioritized
<B
>>,
301 ) -> Poll
<io
::Result
<()>>
303 T
: AsyncWrite
+ Unpin
,
307 .poll_complete(cx
, buffer
, store
, counts
, dst
)
310 /// Request capacity to send data
///
/// Thin wrapper: forwards `capacity`, `stream` and `counts` unchanged to
/// `self.prioritize.reserve_capacity`.
/// NOTE(review): the `counts` parameter is declared on a line missing from
/// this excerpt.
311 pub fn reserve_capacity(
313 capacity
: WindowSize
,
314 stream
: &mut store
::Ptr
,
317 self.prioritize
.reserve_capacity(capacity
, stream
, counts
)
/// Polls for a change in the send capacity of `stream`.
///
/// # Returns
///
/// * `Poll::Ready(None)` when the stream state is not send-streaming.
/// * `Poll::Pending` when `stream.send_capacity_inc` is `false`; the task
///   is registered first through `stream.wait_send(cx)`.
/// * `Poll::Ready(Some(Ok(capacity)))` otherwise, where `capacity` is
///   `self.capacity(stream)`.
///
/// NOTE(review): the `cx` parameter is declared on a line missing from this
/// excerpt.
320 pub fn poll_capacity(
323 stream
: &mut store
::Ptr
,
324 ) -> Poll
<Option
<Result
<WindowSize
, UserError
>>> {
325 if !stream
.state
.is_send_streaming() {
326 return Poll
::Ready(None
);
329 if !stream
.send_capacity_inc
{
330 stream
.wait_send(cx
);
331 return Poll
::Pending
;
// Consume the flag so the next poll waits for a fresh increase.
334 stream
.send_capacity_inc
= false;
336 Poll
::Ready(Some(Ok(self.capacity(stream
))))
339 /// Current available stream send capacity
///
/// Computed by `stream.capacity(..)`, which is given the value of
/// `self.prioritize.max_buffer_size()`.
340 pub fn capacity(&self, stream
: &mut store
::Ptr
) -> WindowSize
{
341 stream
.capacity(self.prioritize
.max_buffer_size())
349 ) -> Poll
<Result
<Reason
, crate::Error
>> {
350 match stream
.state
.ensure_reason(mode
)?
{
351 Some(reason
) => Poll
::Ready(Ok(reason
)),
353 stream
.wait_send(cx
);
/// Handles a WINDOW_UPDATE frame that targets the connection-level window.
///
/// The only visible statement forwards `frame.size_increment()`, `store`
/// and `counts` to another `recv_connection_window_update` and returns its
/// `Result<(), Reason>`.
///
/// NOTE(review): the `store` and `counts` parameters and the receiver of
/// the forwarded call are on lines missing from this excerpt; the receiver
/// is presumably `self.prioritize` -- confirm.
359 pub fn recv_connection_window_update(
361 frame
: frame
::WindowUpdate
,
364 ) -> Result
<(), Reason
> {
366 .recv_connection_window_update(frame
.size_increment(), store
, counts
)
/// Applies a stream-level window increment of `sz` to `stream`.
///
/// The increment is forwarded to
/// `self.prioritize.recv_stream_window_update(sz, stream)`. When that call
/// fails the error is logged at debug level and the error branch goes on to
/// use `Reason::FLOW_CONTROL_ERROR`.
///
/// NOTE(review): the `sz` and `counts` parameters, the statement that
/// consumes `Reason::FLOW_CONTROL_ERROR` (presumably a reset of the stream
/// using `buffer` and `task`) and the return statements are on lines
/// missing from this excerpt -- confirm against the full file.
369 pub fn recv_stream_window_update
<B
>(
372 buffer
: &mut Buffer
<Frame
<B
>>,
373 stream
: &mut store
::Ptr
,
375 task
: &mut Option
<Waker
>,
376 ) -> Result
<(), Reason
> {
377 if let Err(e
) = self.prioritize
.recv_stream_window_update(sz
, stream
) {
378 tracing
::debug
!("recv_stream_window_update !!; err={:?}", e
);
381 Reason
::FLOW_CONTROL_ERROR
,
/// Records the last stream ID announced by a received GOAWAY frame.
///
/// # Errors
///
/// Returns `Error::library_go_away(Reason::PROTOCOL_ERROR)` when
/// `last_stream_id` is greater than the current `max_stream_id`, i.e. when
/// the peer tries to raise the limit.
///
/// Otherwise `max_stream_id` is lowered to (or kept at) `last_stream_id`.
395 pub(super) fn recv_go_away(&mut self, last_stream_id
: StreamId
) -> Result
<(), Error
> {
396 if last_stream_id
> self.max_stream_id
{
397 // The remote endpoint sent a `GOAWAY` frame indicating a stream
398 // that we never sent, or that we have already terminated on account
399 // of previous `GOAWAY` frame. In either case, that is illegal.
400 // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase
401 // the value they send in the last stream identifier, since the
402 // peers might already have retried unprocessed requests on another
// connection." -- RFC 7540 §6.8)
405 "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
406 last_stream_id
, self.max_stream_id
,
408 return Err(Error
::library_go_away(Reason
::PROTOCOL_ERROR
));
411 self.max_stream_id
= last_stream_id
;
/// Drops everything `stream` still had queued for sending after an error.
///
/// Clears the pending outbound frames of the stream from `buffer` with
/// `prioritize.clear_queue`, then calls `prioritize.reclaim_all_capacity`
/// for the stream.
/// NOTE(review): the `counts` parameter is declared on a line missing from
/// this excerpt.
415 pub fn handle_error
<B
>(
417 buffer
: &mut Buffer
<Frame
<B
>>,
418 stream
: &mut store
::Ptr
,
421 // Clear all pending outbound frames
422 self.prioritize
.clear_queue(buffer
, stream
);
423 self.prioritize
.reclaim_all_capacity(stream
, counts
);
/// Applies a SETTINGS frame received from the peer to the send state.
///
/// Each setting is applied only when the frame carries a value for it:
///
/// * extended CONNECT: stored in `is_extended_connect_protocol_enabled`.
/// * initial window size: stored in `init_window_sz`, then compared with
///   the previous value.
///   - Smaller: every stream in `store` has its send window decremented by
///     the difference; any capacity that then exceeds the stream window is
///     claimed back, summed in `total_reclaimed`, and passed to
///     `assign_connection_capacity` afterwards.
///   - Greater: every stream in `store` receives the difference through
///     `recv_stream_window_update`.
///   - Equal: nothing to do.
/// * push enabled: stored in `is_push_enabled`.
///
/// # Errors
///
/// Failures from `dec_send_window`, `claim_capacity` and
/// `recv_stream_window_update` are converted with `library_go_away` and
/// propagated.
///
/// NOTE(review): the `store` and `counts` parameters, the pattern of the
/// decreasing match arm and the receivers of `dec_send_window`,
/// `claim_capacity` and `assign_connection_capacity` are on lines missing
/// from this excerpt -- confirm against the full file.
426 pub fn apply_remote_settings
<B
>(
428 settings
: &frame
::Settings
,
429 buffer
: &mut Buffer
<Frame
<B
>>,
432 task
: &mut Option
<Waker
>,
433 ) -> Result
<(), Error
> {
434 if let Some(val
) = settings
.is_extended_connect_protocol_enabled() {
435 self.is_extended_connect_protocol_enabled
= val
;
438 // Applies an update to the remote endpoint's initial window size.
440 // Per RFC 7540 §6.9.2:
442 // In addition to changing the flow-control window for streams that are
443 // not yet active, a SETTINGS frame can alter the initial flow-control
444 // window size for streams with active flow-control windows (that is,
445 // streams in the "open" or "half-closed (remote)" state). When the
446 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
447 // the size of all stream flow-control windows that it maintains by the
448 // difference between the new value and the old value.
450 // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
451 // space in a flow-control window to become negative. A sender MUST
452 // track the negative flow-control window and MUST NOT send new
453 // flow-controlled frames until it receives WINDOW_UPDATE frames that
454 // cause the flow-control window to become positive.
455 if let Some(val
) = settings
.initial_window_size() {
456 let old_val
= self.init_window_sz
;
457 self.init_window_sz
= val
;
459 match val
.cmp(&old_val
) {
461 // We must decrease the (remote) window on every open stream.
462 let dec
= old_val
- val
;
463 tracing
::trace
!("decrementing all windows; dec={}", dec
);
465 let mut total_reclaimed
= 0;
466 store
.try_for_each(|mut stream
| {
467 let stream
= &mut *stream
;
470 "decrementing stream window; id={:?}; decr={}; flow={:?}",
476 // TODO: this decrement can underflow based on received frames!
479 .dec_send_window(dec
)
480 .map_err(proto
::Error
::library_go_away
)?
;
482 // It's possible that decreasing the window causes
483 // `window_size` (the stream-specific window) to fall below
484 // `available` (the portion of the connection-level window
485 // that we have allocated to the stream).
486 // In this case, we should take that excess allocation away
487 // and reassign it to other streams.
488 let window_size
= stream
.send_flow
.window_size();
489 let available
= stream
.send_flow
.available().as_size();
490 let reclaimed
= if available
> window_size
{
491 // Drop down to `window_size`.
492 let reclaim
= available
- window_size
;
495 .claim_capacity(reclaim
)
496 .map_err(proto
::Error
::library_go_away
)?
;
497 total_reclaimed
+= reclaim
;
504 "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
511 // TODO: Should this notify the producer when the capacity
512 // of a stream is reduced? Maybe it should if the capacity
513 // is reduced to zero, allowing the producer to stop work.
515 Ok
::<_
, proto
::Error
>(())
519 .assign_connection_capacity(total_reclaimed
, store
, counts
);
521 Ordering
::Greater
=> {
522 let inc
= val
- old_val
;
524 store
.try_for_each(|mut stream
| {
525 self.recv_stream_window_update(inc
, buffer
, &mut stream
, counts
, task
)
526 .map_err(Error
::library_go_away
)
529 Ordering
::Equal
=> (),
533 if let Some(val
) = settings
.is_push_enabled() {
534 self.is_push_enabled
= val
/// Empties the three queues kept by the prioritization layer.
///
/// Calls, in this order, `clear_pending_capacity`, `clear_pending_send` and
/// `clear_pending_open` on `self.prioritize`, each with `store` and
/// `counts`.
540 pub fn clear_queues(&mut self, store
: &mut Store
, counts
: &mut Counts
) {
541 self.prioritize
.clear_pending_capacity(store
, counts
);
542 self.prioritize
.clear_pending_send(store
, counts
);
543 self.prioritize
.clear_pending_open(store
, counts
);
/// Checks that the locally initiated stream `id` is not still idle.
///
/// Only consults `next_stream_id` when it is `Ok`; inside that branch the
/// function can return `Err(Reason::PROTOCOL_ERROR)`. When `next_stream_id`
/// has overflowed the check is skipped, as the existing comment states.
///
/// NOTE(review): the comparison between `id` and `next` that guards the
/// error, and the final `Ok(())`, are on lines missing from this excerpt;
/// presumably the error is raised for `id >= next` -- confirm.
546 pub fn ensure_not_idle(&self, id
: StreamId
) -> Result
<(), Reason
> {
547 if let Ok(next
) = self.next_stream_id
{
549 return Err(Reason
::PROTOCOL_ERROR
);
552 // if next_stream_id is overflowed, that's ok.
/// Returns the next usable stream ID, or `UserError::OverflowedStreamId`.
///
/// The visible statement maps any error of its receiver to
/// `UserError::OverflowedStreamId`.
/// NOTE(review): the receiver of `map_err` is on a line missing from this
/// excerpt; presumably it is `self.next_stream_id` -- confirm.
557 pub fn ensure_next_stream_id(&self) -> Result
<StreamId
, UserError
> {
559 .map_err(|_
| UserError
::OverflowedStreamId
)
/// Reports whether this endpoint may already have created stream `id`.
///
/// When `next_stream_id` is `Ok`, debug builds assert that `id` and the
/// next ID agree on `is_server_initiated()`; the existing comment states
/// that callers must have checked `Peer::is_local_init` beforehand.
///
/// NOTE(review): the expressions that produce the `bool` result (for both
/// the `Ok` and the overflowed case) are on lines missing from this
/// excerpt; presumably the `Ok` case compares `id` with `next_id` --
/// confirm.
562 pub fn may_have_created_stream(&self, id
: StreamId
) -> bool
{
563 if let Ok(next_id
) = self.next_stream_id
{
564 // Peer::is_local_init should have been called beforehand
565 debug_assert_eq
!(id
.is_server_initiated(), next_id
.is_server_initiated(),);
/// Possibly moves `next_stream_id` forward to the ID following `id`.
///
/// Does nothing visible when `next_stream_id` has already overflowed.
/// Otherwise debug builds assert that `id` and the next ID agree on
/// `is_server_initiated()`, and `next_stream_id` can be replaced by
/// `id.next_id()`.
///
/// NOTE(review): the condition guarding the assignment is on a line missing
/// from this excerpt; presumably it only applies when `id >= next_id` --
/// confirm.
572 pub(super) fn maybe_reset_next_stream_id(&mut self, id
: StreamId
) {
573 if let Ok(next_id
) = self.next_stream_id
{
574 // Peer::is_local_init should have been called beforehand
575 debug_assert_eq
!(id
.is_server_initiated(), next_id
.is_server_initiated());
577 self.next_stream_id
= id
.next_id();
/// Returns the stored extended CONNECT flag.
///
/// The flag is `false` after `new` and is overwritten in
/// `apply_remote_settings` when the SETTINGS frame carries a value.
582 pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool
{
583 self.is_extended_connect_protocol_enabled