]>
Commit | Line | Data |
---|---|---|
add651ee FG |
1 | use super::{ |
2 | store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId, | |
3 | StreamIdOverflow, WindowSize, | |
4 | }; | |
5 | use crate::codec::UserError; | |
6 | use crate::frame::{self, Reason}; | |
4b012472 | 7 | use crate::proto::{self, Error, Initiator}; |
add651ee FG |
8 | |
9 | use bytes::Buf; | |
10 | use tokio::io::AsyncWrite; | |
11 | ||
12 | use std::cmp::Ordering; | |
13 | use std::io; | |
14 | use std::task::{Context, Poll, Waker}; | |
15 | ||
16 | /// Manages state transitions related to outbound frames. | |
17 | #[derive(Debug)] | |
18 | pub(super) struct Send { | |
19 | /// Stream identifier to use for next initialized stream. | |
20 | next_stream_id: Result<StreamId, StreamIdOverflow>, | |
21 | ||
22 | /// Any streams with a higher ID are ignored. | |
23 | /// | |
24 | /// This starts as MAX, but is lowered when a GOAWAY is received. | |
25 | /// | |
26 | /// > After sending a GOAWAY frame, the sender can discard frames for | |
27 | /// > streams initiated by the receiver with identifiers higher than | |
28 | /// > the identified last stream. | |
29 | max_stream_id: StreamId, | |
30 | ||
31 | /// Initial window size of locally initiated streams | |
32 | init_window_sz: WindowSize, | |
33 | ||
34 | /// Prioritization layer | |
35 | prioritize: Prioritize, | |
36 | ||
37 | is_push_enabled: bool, | |
38 | ||
39 | /// If extended connect protocol is enabled. | |
40 | is_extended_connect_protocol_enabled: bool, | |
41 | } | |
42 | ||
43 | /// A value to detect which public API has called `poll_reset`. | |
44 | #[derive(Debug)] | |
45 | pub(crate) enum PollReset { | |
46 | AwaitingHeaders, | |
47 | Streaming, | |
48 | } | |
49 | ||
50 | impl Send { | |
51 | /// Create a new `Send` | |
52 | pub fn new(config: &Config) -> Self { | |
53 | Send { | |
54 | init_window_sz: config.remote_init_window_sz, | |
55 | max_stream_id: StreamId::MAX, | |
56 | next_stream_id: Ok(config.local_next_stream_id), | |
57 | prioritize: Prioritize::new(config), | |
58 | is_push_enabled: true, | |
59 | is_extended_connect_protocol_enabled: false, | |
60 | } | |
61 | } | |
62 | ||
63 | /// Returns the initial send window size | |
64 | pub fn init_window_sz(&self) -> WindowSize { | |
65 | self.init_window_sz | |
66 | } | |
67 | ||
68 | pub fn open(&mut self) -> Result<StreamId, UserError> { | |
69 | let stream_id = self.ensure_next_stream_id()?; | |
70 | self.next_stream_id = stream_id.next_id(); | |
71 | Ok(stream_id) | |
72 | } | |
73 | ||
74 | pub fn reserve_local(&mut self) -> Result<StreamId, UserError> { | |
75 | let stream_id = self.ensure_next_stream_id()?; | |
76 | self.next_stream_id = stream_id.next_id(); | |
77 | Ok(stream_id) | |
78 | } | |
79 | ||
80 | fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> { | |
81 | // 8.1.2.2. Connection-Specific Header Fields | |
82 | if fields.contains_key(http::header::CONNECTION) | |
83 | || fields.contains_key(http::header::TRANSFER_ENCODING) | |
84 | || fields.contains_key(http::header::UPGRADE) | |
85 | || fields.contains_key("keep-alive") | |
86 | || fields.contains_key("proxy-connection") | |
87 | { | |
88 | tracing::debug!("illegal connection-specific headers found"); | |
89 | return Err(UserError::MalformedHeaders); | |
90 | } else if let Some(te) = fields.get(http::header::TE) { | |
91 | if te != "trailers" { | |
92 | tracing::debug!("illegal connection-specific headers found"); | |
93 | return Err(UserError::MalformedHeaders); | |
94 | } | |
95 | } | |
96 | Ok(()) | |
97 | } | |
98 | ||
99 | pub fn send_push_promise<B>( | |
100 | &mut self, | |
101 | frame: frame::PushPromise, | |
102 | buffer: &mut Buffer<Frame<B>>, | |
103 | stream: &mut store::Ptr, | |
104 | task: &mut Option<Waker>, | |
105 | ) -> Result<(), UserError> { | |
106 | if !self.is_push_enabled { | |
107 | return Err(UserError::PeerDisabledServerPush); | |
108 | } | |
109 | ||
110 | tracing::trace!( | |
111 | "send_push_promise; frame={:?}; init_window={:?}", | |
112 | frame, | |
113 | self.init_window_sz | |
114 | ); | |
115 | ||
116 | Self::check_headers(frame.fields())?; | |
117 | ||
118 | // Queue the frame for sending | |
119 | self.prioritize | |
120 | .queue_frame(frame.into(), buffer, stream, task); | |
121 | ||
122 | Ok(()) | |
123 | } | |
124 | ||
125 | pub fn send_headers<B>( | |
126 | &mut self, | |
127 | frame: frame::Headers, | |
128 | buffer: &mut Buffer<Frame<B>>, | |
129 | stream: &mut store::Ptr, | |
130 | counts: &mut Counts, | |
131 | task: &mut Option<Waker>, | |
132 | ) -> Result<(), UserError> { | |
133 | tracing::trace!( | |
134 | "send_headers; frame={:?}; init_window={:?}", | |
135 | frame, | |
136 | self.init_window_sz | |
137 | ); | |
138 | ||
139 | Self::check_headers(frame.fields())?; | |
140 | ||
141 | let end_stream = frame.is_end_stream(); | |
142 | ||
143 | // Update the state | |
144 | stream.state.send_open(end_stream)?; | |
145 | ||
4b012472 FG |
146 | let mut pending_open = false; |
147 | if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push { | |
148 | self.prioritize.queue_open(stream); | |
149 | pending_open = true; | |
add651ee FG |
150 | } |
151 | ||
152 | // Queue the frame for sending | |
4b012472 FG |
153 | // |
154 | // This call expects that, since new streams are in the open queue, new | |
155 | // streams won't be pushed on pending_send. | |
add651ee FG |
156 | self.prioritize |
157 | .queue_frame(frame.into(), buffer, stream, task); | |
158 | ||
4b012472 FG |
159 | // Need to notify the connection when pushing onto pending_open since |
160 | // queue_frame only notifies for pending_send. | |
161 | if pending_open { | |
162 | if let Some(task) = task.take() { | |
163 | task.wake(); | |
164 | } | |
165 | } | |
166 | ||
add651ee FG |
167 | Ok(()) |
168 | } | |
169 | ||
170 | /// Send an explicit RST_STREAM frame | |
171 | pub fn send_reset<B>( | |
172 | &mut self, | |
173 | reason: Reason, | |
174 | initiator: Initiator, | |
175 | buffer: &mut Buffer<Frame<B>>, | |
176 | stream: &mut store::Ptr, | |
177 | counts: &mut Counts, | |
178 | task: &mut Option<Waker>, | |
179 | ) { | |
180 | let is_reset = stream.state.is_reset(); | |
181 | let is_closed = stream.state.is_closed(); | |
182 | let is_empty = stream.pending_send.is_empty(); | |
183 | let stream_id = stream.id; | |
184 | ||
185 | tracing::trace!( | |
186 | "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \ | |
187 | is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ | |
188 | state={:?} \ | |
189 | ", | |
190 | reason, | |
191 | initiator, | |
192 | stream_id, | |
193 | is_reset, | |
194 | is_closed, | |
195 | is_empty, | |
196 | stream.state | |
197 | ); | |
198 | ||
199 | if is_reset { | |
200 | // Don't double reset | |
201 | tracing::trace!( | |
202 | " -> not sending RST_STREAM ({:?} is already reset)", | |
203 | stream_id | |
204 | ); | |
205 | return; | |
206 | } | |
207 | ||
208 | // Transition the state to reset no matter what. | |
209 | stream.state.set_reset(stream_id, reason, initiator); | |
210 | ||
211 | // If closed AND the send queue is flushed, then the stream cannot be | |
212 | // reset explicitly, either. Implicit resets can still be queued. | |
213 | if is_closed && is_empty { | |
214 | tracing::trace!( | |
215 | " -> not sending explicit RST_STREAM ({:?} was closed \ | |
216 | and send queue was flushed)", | |
217 | stream_id | |
218 | ); | |
219 | return; | |
220 | } | |
221 | ||
222 | // Clear all pending outbound frames. | |
223 | // Note that we don't call `self.recv_err` because we want to enqueue | |
224 | // the reset frame before transitioning the stream inside | |
225 | // `reclaim_all_capacity`. | |
226 | self.prioritize.clear_queue(buffer, stream); | |
227 | ||
228 | let frame = frame::Reset::new(stream.id, reason); | |
229 | ||
230 | tracing::trace!("send_reset -- queueing; frame={:?}", frame); | |
231 | self.prioritize | |
232 | .queue_frame(frame.into(), buffer, stream, task); | |
233 | self.prioritize.reclaim_all_capacity(stream, counts); | |
234 | } | |
235 | ||
236 | pub fn schedule_implicit_reset( | |
237 | &mut self, | |
238 | stream: &mut store::Ptr, | |
239 | reason: Reason, | |
240 | counts: &mut Counts, | |
241 | task: &mut Option<Waker>, | |
242 | ) { | |
243 | if stream.state.is_closed() { | |
244 | // Stream is already closed, nothing more to do | |
245 | return; | |
246 | } | |
247 | ||
248 | stream.state.set_scheduled_reset(reason); | |
249 | ||
250 | self.prioritize.reclaim_reserved_capacity(stream, counts); | |
251 | self.prioritize.schedule_send(stream, task); | |
252 | } | |
253 | ||
254 | pub fn send_data<B>( | |
255 | &mut self, | |
256 | frame: frame::Data<B>, | |
257 | buffer: &mut Buffer<Frame<B>>, | |
258 | stream: &mut store::Ptr, | |
259 | counts: &mut Counts, | |
260 | task: &mut Option<Waker>, | |
261 | ) -> Result<(), UserError> | |
262 | where | |
263 | B: Buf, | |
264 | { | |
265 | self.prioritize | |
266 | .send_data(frame, buffer, stream, counts, task) | |
267 | } | |
268 | ||
269 | pub fn send_trailers<B>( | |
270 | &mut self, | |
271 | frame: frame::Headers, | |
272 | buffer: &mut Buffer<Frame<B>>, | |
273 | stream: &mut store::Ptr, | |
274 | counts: &mut Counts, | |
275 | task: &mut Option<Waker>, | |
276 | ) -> Result<(), UserError> { | |
277 | // TODO: Should this logic be moved into state.rs? | |
278 | if !stream.state.is_send_streaming() { | |
279 | return Err(UserError::UnexpectedFrameType); | |
280 | } | |
281 | ||
282 | stream.state.send_close(); | |
283 | ||
284 | tracing::trace!("send_trailers -- queuing; frame={:?}", frame); | |
285 | self.prioritize | |
286 | .queue_frame(frame.into(), buffer, stream, task); | |
287 | ||
288 | // Release any excess capacity | |
289 | self.prioritize.reserve_capacity(0, stream, counts); | |
290 | ||
291 | Ok(()) | |
292 | } | |
293 | ||
294 | pub fn poll_complete<T, B>( | |
295 | &mut self, | |
296 | cx: &mut Context, | |
297 | buffer: &mut Buffer<Frame<B>>, | |
298 | store: &mut Store, | |
299 | counts: &mut Counts, | |
300 | dst: &mut Codec<T, Prioritized<B>>, | |
301 | ) -> Poll<io::Result<()>> | |
302 | where | |
303 | T: AsyncWrite + Unpin, | |
304 | B: Buf, | |
305 | { | |
306 | self.prioritize | |
307 | .poll_complete(cx, buffer, store, counts, dst) | |
308 | } | |
309 | ||
310 | /// Request capacity to send data | |
311 | pub fn reserve_capacity( | |
312 | &mut self, | |
313 | capacity: WindowSize, | |
314 | stream: &mut store::Ptr, | |
315 | counts: &mut Counts, | |
316 | ) { | |
317 | self.prioritize.reserve_capacity(capacity, stream, counts) | |
318 | } | |
319 | ||
320 | pub fn poll_capacity( | |
321 | &mut self, | |
322 | cx: &Context, | |
323 | stream: &mut store::Ptr, | |
324 | ) -> Poll<Option<Result<WindowSize, UserError>>> { | |
325 | if !stream.state.is_send_streaming() { | |
326 | return Poll::Ready(None); | |
327 | } | |
328 | ||
329 | if !stream.send_capacity_inc { | |
330 | stream.wait_send(cx); | |
331 | return Poll::Pending; | |
332 | } | |
333 | ||
334 | stream.send_capacity_inc = false; | |
335 | ||
336 | Poll::Ready(Some(Ok(self.capacity(stream)))) | |
337 | } | |
338 | ||
339 | /// Current available stream send capacity | |
340 | pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { | |
341 | stream.capacity(self.prioritize.max_buffer_size()) | |
342 | } | |
343 | ||
344 | pub fn poll_reset( | |
345 | &self, | |
346 | cx: &Context, | |
347 | stream: &mut Stream, | |
348 | mode: PollReset, | |
349 | ) -> Poll<Result<Reason, crate::Error>> { | |
350 | match stream.state.ensure_reason(mode)? { | |
351 | Some(reason) => Poll::Ready(Ok(reason)), | |
352 | None => { | |
353 | stream.wait_send(cx); | |
354 | Poll::Pending | |
355 | } | |
356 | } | |
357 | } | |
358 | ||
359 | pub fn recv_connection_window_update( | |
360 | &mut self, | |
361 | frame: frame::WindowUpdate, | |
362 | store: &mut Store, | |
363 | counts: &mut Counts, | |
364 | ) -> Result<(), Reason> { | |
365 | self.prioritize | |
366 | .recv_connection_window_update(frame.size_increment(), store, counts) | |
367 | } | |
368 | ||
369 | pub fn recv_stream_window_update<B>( | |
370 | &mut self, | |
371 | sz: WindowSize, | |
372 | buffer: &mut Buffer<Frame<B>>, | |
373 | stream: &mut store::Ptr, | |
374 | counts: &mut Counts, | |
375 | task: &mut Option<Waker>, | |
376 | ) -> Result<(), Reason> { | |
377 | if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { | |
378 | tracing::debug!("recv_stream_window_update !!; err={:?}", e); | |
379 | ||
380 | self.send_reset( | |
381 | Reason::FLOW_CONTROL_ERROR, | |
382 | Initiator::Library, | |
383 | buffer, | |
384 | stream, | |
385 | counts, | |
386 | task, | |
387 | ); | |
388 | ||
389 | return Err(e); | |
390 | } | |
391 | ||
392 | Ok(()) | |
393 | } | |
394 | ||
395 | pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> { | |
396 | if last_stream_id > self.max_stream_id { | |
397 | // The remote endpoint sent a `GOAWAY` frame indicating a stream | |
398 | // that we never sent, or that we have already terminated on account | |
399 | // of previous `GOAWAY` frame. In either case, that is illegal. | |
400 | // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase | |
401 | // the value they send in the last stream identifier, since the | |
402 | // peers might already have retried unprocessed requests on another | |
403 | // connection.") | |
404 | proto_err!(conn: | |
405 | "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})", | |
406 | last_stream_id, self.max_stream_id, | |
407 | ); | |
408 | return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); | |
409 | } | |
410 | ||
411 | self.max_stream_id = last_stream_id; | |
412 | Ok(()) | |
413 | } | |
414 | ||
415 | pub fn handle_error<B>( | |
416 | &mut self, | |
417 | buffer: &mut Buffer<Frame<B>>, | |
418 | stream: &mut store::Ptr, | |
419 | counts: &mut Counts, | |
420 | ) { | |
421 | // Clear all pending outbound frames | |
422 | self.prioritize.clear_queue(buffer, stream); | |
423 | self.prioritize.reclaim_all_capacity(stream, counts); | |
424 | } | |
425 | ||
426 | pub fn apply_remote_settings<B>( | |
427 | &mut self, | |
428 | settings: &frame::Settings, | |
429 | buffer: &mut Buffer<Frame<B>>, | |
430 | store: &mut Store, | |
431 | counts: &mut Counts, | |
432 | task: &mut Option<Waker>, | |
433 | ) -> Result<(), Error> { | |
434 | if let Some(val) = settings.is_extended_connect_protocol_enabled() { | |
435 | self.is_extended_connect_protocol_enabled = val; | |
436 | } | |
437 | ||
438 | // Applies an update to the remote endpoint's initial window size. | |
439 | // | |
440 | // Per RFC 7540 §6.9.2: | |
441 | // | |
442 | // In addition to changing the flow-control window for streams that are | |
443 | // not yet active, a SETTINGS frame can alter the initial flow-control | |
444 | // window size for streams with active flow-control windows (that is, | |
445 | // streams in the "open" or "half-closed (remote)" state). When the | |
446 | // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust | |
447 | // the size of all stream flow-control windows that it maintains by the | |
448 | // difference between the new value and the old value. | |
449 | // | |
450 | // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available | |
451 | // space in a flow-control window to become negative. A sender MUST | |
452 | // track the negative flow-control window and MUST NOT send new | |
453 | // flow-controlled frames until it receives WINDOW_UPDATE frames that | |
454 | // cause the flow-control window to become positive. | |
455 | if let Some(val) = settings.initial_window_size() { | |
456 | let old_val = self.init_window_sz; | |
457 | self.init_window_sz = val; | |
458 | ||
459 | match val.cmp(&old_val) { | |
460 | Ordering::Less => { | |
461 | // We must decrease the (remote) window on every open stream. | |
462 | let dec = old_val - val; | |
463 | tracing::trace!("decrementing all windows; dec={}", dec); | |
464 | ||
465 | let mut total_reclaimed = 0; | |
4b012472 | 466 | store.try_for_each(|mut stream| { |
add651ee FG |
467 | let stream = &mut *stream; |
468 | ||
4b012472 FG |
469 | tracing::trace!( |
470 | "decrementing stream window; id={:?}; decr={}; flow={:?}", | |
471 | stream.id, | |
472 | dec, | |
473 | stream.send_flow | |
474 | ); | |
475 | ||
476 | // TODO: this decrement can underflow based on received frames! | |
477 | stream | |
478 | .send_flow | |
479 | .dec_send_window(dec) | |
480 | .map_err(proto::Error::library_go_away)?; | |
add651ee FG |
481 | |
482 | // It's possible that decreasing the window causes | |
483 | // `window_size` (the stream-specific window) to fall below | |
484 | // `available` (the portion of the connection-level window | |
485 | // that we have allocated to the stream). | |
486 | // In this case, we should take that excess allocation away | |
487 | // and reassign it to other streams. | |
488 | let window_size = stream.send_flow.window_size(); | |
489 | let available = stream.send_flow.available().as_size(); | |
490 | let reclaimed = if available > window_size { | |
491 | // Drop down to `window_size`. | |
492 | let reclaim = available - window_size; | |
4b012472 FG |
493 | stream |
494 | .send_flow | |
495 | .claim_capacity(reclaim) | |
496 | .map_err(proto::Error::library_go_away)?; | |
add651ee FG |
497 | total_reclaimed += reclaim; |
498 | reclaim | |
499 | } else { | |
500 | 0 | |
501 | }; | |
502 | ||
503 | tracing::trace!( | |
504 | "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}", | |
505 | stream.id, | |
506 | dec, | |
507 | reclaimed, | |
508 | stream.send_flow | |
509 | ); | |
510 | ||
511 | // TODO: Should this notify the producer when the capacity | |
512 | // of a stream is reduced? Maybe it should if the capacity | |
513 | // is reduced to zero, allowing the producer to stop work. | |
4b012472 FG |
514 | |
515 | Ok::<_, proto::Error>(()) | |
516 | })?; | |
add651ee FG |
517 | |
518 | self.prioritize | |
519 | .assign_connection_capacity(total_reclaimed, store, counts); | |
520 | } | |
521 | Ordering::Greater => { | |
522 | let inc = val - old_val; | |
523 | ||
524 | store.try_for_each(|mut stream| { | |
525 | self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) | |
526 | .map_err(Error::library_go_away) | |
527 | })?; | |
528 | } | |
529 | Ordering::Equal => (), | |
530 | } | |
531 | } | |
532 | ||
533 | if let Some(val) = settings.is_push_enabled() { | |
534 | self.is_push_enabled = val | |
535 | } | |
536 | ||
537 | Ok(()) | |
538 | } | |
539 | ||
540 | pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) { | |
541 | self.prioritize.clear_pending_capacity(store, counts); | |
542 | self.prioritize.clear_pending_send(store, counts); | |
543 | self.prioritize.clear_pending_open(store, counts); | |
544 | } | |
545 | ||
546 | pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { | |
547 | if let Ok(next) = self.next_stream_id { | |
548 | if id >= next { | |
549 | return Err(Reason::PROTOCOL_ERROR); | |
550 | } | |
551 | } | |
552 | // if next_stream_id is overflowed, that's ok. | |
553 | ||
554 | Ok(()) | |
555 | } | |
556 | ||
557 | pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> { | |
558 | self.next_stream_id | |
559 | .map_err(|_| UserError::OverflowedStreamId) | |
560 | } | |
561 | ||
562 | pub fn may_have_created_stream(&self, id: StreamId) -> bool { | |
563 | if let Ok(next_id) = self.next_stream_id { | |
564 | // Peer::is_local_init should have been called beforehand | |
565 | debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); | |
566 | id < next_id | |
567 | } else { | |
568 | true | |
569 | } | |
570 | } | |
571 | ||
572 | pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) { | |
573 | if let Ok(next_id) = self.next_stream_id { | |
574 | // Peer::is_local_init should have been called beforehand | |
575 | debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated()); | |
576 | if id >= next_id { | |
577 | self.next_stream_id = id.next_id(); | |
578 | } | |
579 | } | |
580 | } | |
581 | ||
582 | pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { | |
583 | self.is_extended_connect_protocol_enabled | |
584 | } | |
585 | } |