]> git.proxmox.com Git - proxmox-backup.git/blob - pbs-client/src/pipe_to_stream.rs
move fuse code from pbs-client to pbs-pxar-fuse
[proxmox-backup.git] / pbs-client / src / pipe_to_stream.rs
1 // Implement simple flow control for h2 client
2 //
3 // See also: hyper/src/proto/h2/mod.rs
4
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7
8 use anyhow::{format_err, Error};
9 use bytes::Bytes;
10 use futures::{ready, Future};
11 use h2::SendStream;
12
13 pub struct PipeToSendStream {
14 body_tx: SendStream<Bytes>,
15 data: Option<Bytes>,
16 }
17
18 impl PipeToSendStream {
19 pub fn new(data: Bytes, tx: SendStream<Bytes>) -> PipeToSendStream {
20 PipeToSendStream {
21 body_tx: tx,
22 data: Some(data),
23 }
24 }
25 }
26
27 impl Future for PipeToSendStream {
28 type Output = Result<(), Error>;
29
30 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
31 let this = self.get_mut();
32
33 if this.data.is_some() {
34 // just reserve 1 byte to make sure there's some
35 // capacity available. h2 will handle the capacity
36 // management for the actual body chunk.
37 this.body_tx.reserve_capacity(1);
38
39 if this.body_tx.capacity() == 0 {
40 loop {
41 match ready!(this.body_tx.poll_capacity(cx)) {
42 Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
43 Some(Ok(0)) => {}
44 Some(Ok(_)) => break,
45 None => return Poll::Ready(Err(format_err!("protocol canceled"))),
46 }
47 }
48 } else if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) {
49 return Poll::Ready(Err(match reset {
50 Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason),
51 Err(err) => Error::from(err),
52 }));
53 }
54
55 this.body_tx
56 .send_data(this.data.take().unwrap(), true)
57 .map_err(Error::from)?;
58
59 Poll::Ready(Ok(()))
60 } else {
61 if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) {
62 return Poll::Ready(Err(match reset {
63 Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason),
64 Err(err) => Error::from(err),
65 }));
66 }
67 Poll::Ready(Ok(()))
68 }
69 }
70 }