]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/pipe_to_stream.rs
move 'wait_for_local_worker' from client to server
[proxmox-backup.git] / src / client / pipe_to_stream.rs
CommitLineData
25ddc15f
DM
// Implement simple flow control for h2 client
//
// See also: hyper/src/proto/h2/mod.rs
4
2107bb40
WB
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
f7d4e4b5 8use anyhow::{format_err, Error};
be3a0295 9use bytes::Bytes;
2107bb40 10use futures::{ready, Future};
7a57cb77 11use h2::SendStream;
25ddc15f
DM
12
13pub struct PipeToSendStream {
14 body_tx: SendStream<Bytes>,
15 data: Option<Bytes>,
16}
17
18impl PipeToSendStream {
19 pub fn new(data: Bytes, tx: SendStream<Bytes>) -> PipeToSendStream {
20 PipeToSendStream {
21 body_tx: tx,
22 data: Some(data),
23 }
24 }
25}
26
27impl Future for PipeToSendStream {
2107bb40
WB
28 type Output = Result<(), Error>;
29
30 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
31 let this = self.get_mut();
25ddc15f 32
bd430c22
WB
33 if this.data != None {
34 // just reserve 1 byte to make sure there's some
35 // capacity available. h2 will handle the capacity
36 // management for the actual body chunk.
37 this.body_tx.reserve_capacity(1);
25ddc15f 38
bd430c22
WB
39 if this.body_tx.capacity() == 0 {
40 loop {
41 match ready!(this.body_tx.poll_capacity(cx)) {
42 Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
43 Some(Ok(0)) => {}
44 Some(Ok(_)) => break,
45 None => return Poll::Ready(Err(format_err!("protocol canceled"))),
25ddc15f
DM
46 }
47 }
62ee2eb4
DM
48 } else if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) {
49 return Poll::Ready(Err(match reset {
50 Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason),
51 Err(err) => Error::from(err),
52 }));
25ddc15f 53 }
bd430c22
WB
54
55 this.body_tx
56 .send_data(this.data.take().unwrap(), true)
57 .map_err(Error::from)?;
58
62ee2eb4 59 Poll::Ready(Ok(()))
bd430c22
WB
60 } else {
61 if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) {
62 return Poll::Ready(Err(match reset {
63 Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason),
64 Err(err) => Error::from(err),
65 }));
66 }
62ee2eb4 67 Poll::Ready(Ok(()))
25ddc15f
DM
68 }
69 }
70}