]>
Commit | Line | Data |
---|---|---|
25ddc15f DM |
1 | // Implement simple flow control for h2 client |
2 | // | |
3 | // See also: hyper/src/proto/h2/mod.rs | |
4 | ||
2107bb40 WB |
5 | use std::pin::Pin; |
6 | use std::task::{Context, Poll}; | |
7 | ||
f7d4e4b5 | 8 | use anyhow::{format_err, Error}; |
be3a0295 | 9 | use bytes::Bytes; |
2107bb40 | 10 | use futures::{ready, Future}; |
7a57cb77 | 11 | use h2::SendStream; |
25ddc15f DM |
12 | |
13 | pub struct PipeToSendStream { | |
14 | body_tx: SendStream<Bytes>, | |
15 | data: Option<Bytes>, | |
16 | } | |
17 | ||
18 | impl PipeToSendStream { | |
19 | pub fn new(data: Bytes, tx: SendStream<Bytes>) -> PipeToSendStream { | |
20 | PipeToSendStream { | |
21 | body_tx: tx, | |
22 | data: Some(data), | |
23 | } | |
24 | } | |
25 | } | |
26 | ||
27 | impl Future for PipeToSendStream { | |
2107bb40 WB |
28 | type Output = Result<(), Error>; |
29 | ||
30 | fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | |
31 | let this = self.get_mut(); | |
25ddc15f | 32 | |
bd430c22 WB |
33 | if this.data != None { |
34 | // just reserve 1 byte to make sure there's some | |
35 | // capacity available. h2 will handle the capacity | |
36 | // management for the actual body chunk. | |
37 | this.body_tx.reserve_capacity(1); | |
25ddc15f | 38 | |
bd430c22 WB |
39 | if this.body_tx.capacity() == 0 { |
40 | loop { | |
41 | match ready!(this.body_tx.poll_capacity(cx)) { | |
42 | Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), | |
43 | Some(Ok(0)) => {} | |
44 | Some(Ok(_)) => break, | |
45 | None => return Poll::Ready(Err(format_err!("protocol canceled"))), | |
25ddc15f DM |
46 | } |
47 | } | |
62ee2eb4 DM |
48 | } else if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) { |
49 | return Poll::Ready(Err(match reset { | |
50 | Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason), | |
51 | Err(err) => Error::from(err), | |
52 | })); | |
25ddc15f | 53 | } |
bd430c22 WB |
54 | |
55 | this.body_tx | |
56 | .send_data(this.data.take().unwrap(), true) | |
57 | .map_err(Error::from)?; | |
58 | ||
62ee2eb4 | 59 | Poll::Ready(Ok(())) |
bd430c22 WB |
60 | } else { |
61 | if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) { | |
62 | return Poll::Ready(Err(match reset { | |
63 | Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason), | |
64 | Err(err) => Error::from(err), | |
65 | })); | |
66 | } | |
62ee2eb4 | 67 | Poll::Ready(Ok(())) |
25ddc15f DM |
68 | } |
69 | } | |
70 | } |