// Source: git.proxmox.com, proxmox-backup.git, blob src/client/pipe_to_stream.rs
// Implement simple flow control for h2 client
// See also: hyper/src/proto/h2/mod.rs
6 use std
::task
::{Context, Poll}
;
10 use futures
::{ready, Future}
;
13 pub struct PipeToSendStream
{
14 body_tx
: SendStream
<Bytes
>,
18 impl PipeToSendStream
{
19 pub fn new(data
: Bytes
, tx
: SendStream
<Bytes
>) -> PipeToSendStream
{
27 impl Future
for PipeToSendStream
{
28 type Output
= Result
<(), Error
>;
30 fn poll(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
31 let this
= self.get_mut();
33 if this
.data
!= None
{
34 // just reserve 1 byte to make sure there's some
35 // capacity available. h2 will handle the capacity
36 // management for the actual body chunk.
37 this
.body_tx
.reserve_capacity(1);
39 if this
.body_tx
.capacity() == 0 {
41 match ready
!(this
.body_tx
.poll_capacity(cx
)) {
42 Some(Err(err
)) => return Poll
::Ready(Err(Error
::from(err
))),
45 None
=> return Poll
::Ready(Err(format_err
!("protocol canceled"))),
48 } else if let Poll
::Ready(reset
) = this
.body_tx
.poll_reset(cx
) {
49 return Poll
::Ready(Err(match reset
{
50 Ok(reason
) => format_err
!("stream received RST_STREAM: {:?}", reason
),
51 Err(err
) => Error
::from(err
),
56 .send_data(this
.data
.take().unwrap(), true)
57 .map_err(Error
::from
)?
;
61 if let Poll
::Ready(reset
) = this
.body_tx
.poll_reset(cx
) {
62 return Poll
::Ready(Err(match reset
{
63 Ok(reason
) => format_err
!("stream received RST_STREAM: {:?}", reason
),
64 Err(err
) => Error
::from(err
),