]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/client/http_client.rs: move low level H2 code into separate class
authorDietmar Maurer <dietmar@proxmox.com>
Wed, 22 May 2019 15:28:25 +0000 (17:28 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Wed, 22 May 2019 15:28:25 +0000 (17:28 +0200)
src/client/http_client.rs

index afeb9c93c8e3b5673a6c273a67477b27577e2e2b..dc94e8f68ca76cf34d55577d1e9789e57bdcf779 100644 (file)
@@ -275,12 +275,11 @@ impl HttpClient {
 
                     let status = resp.status();
                     if status != http::StatusCode::SWITCHING_PROTOCOLS {
-                        bail!("got status {:?} instead of protocol switch", status);
+                        future::Either::A(Self::api_response(resp).and_then(|_| { bail!("unknown error"); }))
+                    } else {
+                        future::Either::B(resp.into_body().on_upgrade().map_err(Error::from))
                     }
-
-                    Ok(resp.into_body().on_upgrade().map_err(Error::from))
                 })
-                .flatten()
                 .and_then(|upgraded| {
                     h2::client::handshake(upgraded).map_err(Error::from)
                 })
@@ -403,53 +402,25 @@ impl HttpClient {
 
 //#[derive(Clone)]
 pub struct BackupClient {
-    h2: h2::client::SendRequest<bytes::Bytes>,
+    h2: H2Client,
 }
 
 impl BackupClient {
 
     pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
-        Self { h2 }
+        Self { h2: H2Client::new(h2) }
     }
 
     pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
-        let req = Self::request_builder("localhost", "GET", path, param).unwrap();
-        Self::request(self.h2.clone(), req)
+        self.h2.get(path, param)
     }
 
     pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
-        let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
-        Self::request(self.h2.clone(), req)
+        self.h2.put(path, param)
     }
 
     pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
-        Self::h2post(self.h2.clone(), path, param)
-    }
-
-    fn h2post(
-        h2: h2::client::SendRequest<bytes::Bytes>,
-        path: &str,
-        param: Option<Value>
-    ) -> impl Future<Item=Value, Error=Error> {
-        let req = Self::request_builder("localhost", "POST", path, param).unwrap();
-        Self::request(h2, req)
-    }
-
-    pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
-        let request = Self::request_builder("localhost", "POST", path, param).unwrap();
-
-        self.h2.clone()
-            .ready()
-            .map_err(Error::from)
-            .and_then(move |mut send_request| {
-                let (response, stream) = send_request.send_request(request, false).unwrap();
-                PipeToSendStream::new(bytes::Bytes::from(data), stream)
-                    .and_then(|_| {
-                        response
-                            .map_err(Error::from)
-                            .and_then(Self::h2api_response)
-                    })
-            })
+        self.h2.post(path, param)
     }
 
     fn response_queue() -> (
@@ -465,7 +436,7 @@ impl BackupClient {
                 .for_each(|response: h2::client::ResponseFuture| {
                     response
                         .map_err(Error::from)
-                        .and_then(Self::h2api_response)
+                        .and_then(H2Client::h2api_response)
                         .and_then(|result| {
                             println!("RESPONSE: {:?}", result);
                             Ok(())
@@ -482,16 +453,16 @@ impl BackupClient {
     }
 
     fn download_chunk_list(
-        h2: h2::client::SendRequest<bytes::Bytes>,
+        h2: H2Client,
         path: &str,
         archive_name: &str,
         known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     ) -> impl Future<Item=(), Error=Error> {
 
         let param = json!({ "archive-name": archive_name });
-        let request = Self::request_builder("localhost", "GET", path, Some(param)).unwrap();
+        let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap();
 
-        Self::send_request(h2.clone(), request, None)
+        h2.send_request(request, None)
             .and_then(move |response| {
                 response
                     .map_err(Error::from)
@@ -521,8 +492,7 @@ impl BackupClient {
     }
 
     pub fn finish(&self) -> impl Future<Item=(), Error=Error> {
-        Self::h2post(self.h2.clone(), "finish", None)
-            .map(|_| ())
+        self.h2.clone().post("finish", None).map(|_| ())
     }
 
     pub fn upload_dynamic_stream(
@@ -542,25 +512,20 @@ impl BackupClient {
 
         Self::download_chunk_list(h2, "dynamic_index", archive_name, known_chunks.clone())
             .and_then(move |_| {
-                Self::h2post(h2_2, "dynamic_index", Some(param))
+                h2_2.post("dynamic_index", Some(param))
             })
             .and_then(move |res| {
-                println!("GOT1 {:?}", res);
                 let wid = res.as_u64().unwrap();
-                //let dir_path = PathBuf::from("../casync-pve");
-                //let dir_path = PathBuf::from(".");
-
-                //upload_pxar(h2, known_chunks, &dir_path, wid).unwrap()
                 Self::upload_stream(h2_3, wid, stream, known_chunks.clone())
                     .and_then(move |_size| {
-                        Self::h2post(h2_4, "dynamic_close", Some(json!({ "wid": wid })))
+                        h2_4.post("dynamic_close", Some(json!({ "wid": wid })))
                     })
                     .map(|_| ())
             })
     }
 
     fn upload_stream(
-        h2: h2::client::SendRequest<bytes::Bytes>,
+        h2: H2Client,
         wid: u64,
         stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
         known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
@@ -576,7 +541,6 @@ impl BackupClient {
 
         let start_time = std::time::Instant::now();
 
-
         stream
             .for_each(move |data| {
                 let h2 = h2.clone();
@@ -597,17 +561,17 @@ impl BackupClient {
                 if chunk_is_known {
                     println!("append existing chunk ({} bytes)", data.len());
                     let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) });
-                    request = Self::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
+                    request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
                     upload_data = None;
                 } else {
                     println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&digest), data.len());
                     known_chunks.insert(digest);
                     let param = json!({ "wid": wid, "size" : data.len() });
-                    request = Self::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
+                    request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
                     upload_data = Some(bytes::Bytes::from(data));
                 }
 
-                Self::send_request(h2, request, upload_data)
+                h2.send_request(request, upload_data)
                     .and_then(move |response| {
                         upload_queue.send(response)
                             .map(|_| ()).map_err(Error::from)
@@ -666,8 +630,8 @@ impl BackupClient {
                 let upload_queue = upload_queue.clone();
 
                 println!("send test data ({} bytes)", data.len());
-                let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
-                Self::send_request(h2, request, Some(bytes::Bytes::from(data)))
+                let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
+                h2.send_request(request, Some(bytes::Bytes::from(data)))
                     .and_then(move |response| {
                         upload_queue.send(response)
                             .map(|_| ()).map_err(Error::from)
@@ -690,13 +654,58 @@ impl BackupClient {
                 Ok(speed)
             })
     }
+}
+
+#[derive(Clone)]
+pub struct H2Client {
+    h2: h2::client::SendRequest<bytes::Bytes>,
+}
+
+impl H2Client {
+
+    pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
+        Self { h2 }
+    }
+
+    pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+        let req = Self::request_builder("localhost", "GET", path, param).unwrap();
+        self.request(req)
+    }
+
+    pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+        let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
+        self.request(req)
+    }
+
+    pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+        let req = Self::request_builder("localhost", "POST", path, param).unwrap();
+        self.request(req)
+    }
+
+    pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
+        let request = Self::request_builder("localhost", "POST", path, param).unwrap();
+
+
+        self.h2.clone()
+            .ready()
+            .map_err(Error::from)
+            .and_then(move |mut send_request| {
+                let (response, stream) = send_request.send_request(request, false).unwrap();
+                PipeToSendStream::new(bytes::Bytes::from(data), stream)
+                    .and_then(|_| {
+                        response
+                            .map_err(Error::from)
+                            .and_then(Self::h2api_response)
+                    })
+            })
+    }
 
     fn request(
-        h2: h2::client::SendRequest<bytes::Bytes>,
+        &self,
         request: Request<()>,
     ) -> impl Future<Item=Value, Error=Error> {
 
-        Self::send_request(h2, request, None)
+        self.send_request(request, None)
             .and_then(move |response| {
                 response
                     .map_err(Error::from)
@@ -705,12 +714,12 @@ impl BackupClient {
     }
 
     fn send_request(
-        h2: h2::client::SendRequest<bytes::Bytes>,
+        &self,
         request: Request<()>,
         data: Option<bytes::Bytes>,
     ) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> {
 
-        h2
+        self.h2.clone()
             .ready()
             .map_err(Error::from)
             .and_then(move |mut send_request| {