]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/client/http_client.rs: use async for more functions
authorDietmar Maurer <dietmar@proxmox.com>
Wed, 4 Sep 2019 10:47:01 +0000 (12:47 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Wed, 4 Sep 2019 11:48:16 +0000 (13:48 +0200)
src/client/http_client.rs

index 60faa69e409991213cdc5a61c89e88e477ad601e..5478e69fbb48f679eb9fd67b67a9acaa5525eb77 100644 (file)
@@ -143,7 +143,7 @@ impl HttpClient {
         })
     }
 
-    /// Login future
+    /// Login
     ///
     /// Login is done on demand, so this is onyl required if you need
     /// access to authentication data in 'AuthInfo'.
@@ -188,97 +188,85 @@ impl HttpClient {
             .build::<_, Body>(https)
     }
 
-    pub fn request(&self, mut req: Request<Body>) -> impl Future<Output = Result<Value, Error>> {
-
-        let login = self.auth.listen();
+    pub async fn request(&self, mut req: Request<Body>) -> Result<Value, Error> {
 
         let client = self.client.clone();
 
-        login.and_then(move |auth| {
-
-            let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
-            req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
-            req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
+        let auth =  self.login().await?;
 
-            let request = Self::api_request(client, req);
+        let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+        req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
+        req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
 
-            request
-        })
+        Self::api_request(client, req).await
     }
 
-    pub fn get(
+    pub async fn get(
         &self,
         path: &str,
         data: Option<Value>,
-    ) -> impl Future<Output = Result<Value, Error>> {
+    ) -> Result<Value, Error> {
         let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
-        self.request(req)
+        self.request(req).await
     }
 
-    pub fn delete(
+    pub async fn delete(
         &mut self,
         path: &str,
         data: Option<Value>,
-    ) -> impl Future<Output = Result<Value, Error>> {
+    ) -> Result<Value, Error> {
         let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
-        self.request(req)
+        self.request(req).await
     }
 
-    pub fn post(
+    pub async fn post(
         &mut self,
         path: &str,
         data: Option<Value>,
-    ) -> impl Future<Output = Result<Value, Error>> {
+    ) -> Result<Value, Error> {
         let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
-        self.request(req)
+        self.request(req).await
     }
 
-    pub fn download<W: Write + Send + 'static>(
+    pub async fn download(
         &mut self,
         path: &str,
-        output: W,
-    ) ->  impl Future<Output = Result<W, Error>> {
+        output: &mut (dyn Write + Send),
+    ) ->  Result<(), Error> {
         let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
 
-        let login = self.auth.listen();
-
         let client = self.client.clone();
 
-        login.and_then(move |auth| {
+        let auth = self.login().await?;
 
-            let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
-            req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
+        let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+        req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
 
-            client.request(req)
+        let resp = client.request(req).await?;
+        let status = resp.status();
+        if !status.is_success() {
+            HttpClient::api_response(resp)
+                .map(|_| Err(format_err!("unknown error")))
+                .await?
+        } else {
+            resp.into_body()
                 .map_err(Error::from)
-                .and_then(|resp| {
-                    let status = resp.status();
-                    if !status.is_success() {
-                        future::Either::Left(
-                            HttpClient::api_response(resp)
-                                .map(|_| Err(format_err!("unknown error")))
-                        )
-                    } else {
-                        future::Either::Right(
-                            resp.into_body()
-                                .map_err(Error::from)
-                                .try_fold(output, move |mut acc, chunk| async move {
-                                    acc.write_all(&chunk)?;
-                                    Ok::<_, Error>(acc)
-                                })
-                        )
-                    }
+                .try_fold(output, move |acc, chunk| async move {
+                    acc.write_all(&chunk)?;
+                    Ok::<_, Error>(acc)
                 })
-        })
+                .await?;
+        }
+        Ok(())
     }
 
-    pub fn upload(
+    pub async fn upload(
         &mut self,
         content_type: &str,
         body: Body,
         path: &str,
         data: Option<Value>,
-    ) -> impl Future<Output = Result<Value, Error>> {
+    ) -> Result<Value, Error> {
 
         let path = path.trim_matches('/');
         let mut url = format!("https://{}:8007/{}", &self.server, path);
@@ -298,17 +286,17 @@ impl HttpClient {
             .header("Content-Type", content_type)
             .body(body).unwrap();
 
-        self.request(req)
+        self.request(req).await
     }
 
-    pub fn start_backup(
+    pub async fn start_backup(
         &self,
         datastore: &str,
         backup_type: &str,
         backup_id: &str,
         backup_time: DateTime<Utc>,
         debug: bool,
-    ) -> impl Future<Output = Result<Arc<BackupClient>, Error>> {
+    ) -> Result<Arc<BackupClient>, Error> {
 
         let param = json!({
             "backup-type": backup_type,
@@ -320,18 +308,19 @@ impl HttpClient {
 
         let req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap();
 
-        self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
-            .map_ok(|(h2, canceller)| BackupClient::new(h2, canceller))
+        let (h2, canceller) = self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
+
+        Ok(BackupClient::new(h2, canceller))
     }
 
-    pub fn start_backup_reader(
+    pub async fn start_backup_reader(
         &self,
         datastore: &str,
         backup_type: &str,
         backup_id: &str,
         backup_time: DateTime<Utc>,
         debug: bool,
-    ) -> impl Future<Output = Result<Arc<BackupReader>, Error>> {
+    ) -> Result<Arc<BackupReader>, Error> {
 
         let param = json!({
             "backup-type": backup_type,
@@ -342,72 +331,63 @@ impl HttpClient {
         });
         let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap();
 
-        self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
-            .map_ok(|(h2, canceller)| BackupReader::new(h2, canceller))
+        let (h2, canceller) = self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
+
+        Ok(BackupReader::new(h2, canceller))
     }
 
-    pub fn start_h2_connection(
+    pub async fn start_h2_connection(
         &self,
         mut req: Request<Body>,
         protocol_name: String,
-    ) -> impl Future<Output = Result<(H2Client, Canceller), Error>> {
+    ) -> Result<(H2Client, Canceller), Error> {
 
-        let login = self.auth.listen();
+        let auth = self.login().await?;
         let client = self.client.clone();
 
-        login.and_then(move |auth| {
+        let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+        req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
+        req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
 
-            let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
-            req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
-            req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
+        let resp = client.request(req).await?;
+        let status = resp.status();
 
-            client.request(req)
-                .map_err(Error::from)
-                .and_then(|resp| {
-
-                    let status = resp.status();
-                    if status != http::StatusCode::SWITCHING_PROTOCOLS {
-                        future::Either::Left(
-                            Self::api_response(resp)
-                                .map(|_| Err(format_err!("unknown error")))
-                        )
-                    } else {
-                        future::Either::Right(
-                            resp
-                                .into_body()
-                                .on_upgrade()
-                                .map_err(Error::from)
-                        )
-                    }
-                })
-                .and_then(|upgraded| {
-                   let max_window_size = (1 << 31) - 2;
-
-                    h2::client::Builder::new()
-                        .initial_connection_window_size(max_window_size)
-                        .initial_window_size(max_window_size)
-                        .max_frame_size(4*1024*1024)
-                        .handshake(upgraded)
-                        .map_err(Error::from)
-                })
-                .and_then(|(h2, connection)| async move {
-                    let connection = connection
-                        .map_err(|_| panic!("HTTP/2.0 connection failed"));
-
-                    let (connection, canceller) = cancellable(connection)?;
-                    // A cancellable future returns an Option which is None when cancelled and
-                    // Some when it finished instead, since we don't care about the return type we
-                    // need to map it away:
-                    let connection = connection.map(|_| ());
-
-                    // Spawn a new task to drive the connection state
-                    hyper::rt::spawn(connection);
-
-                    // Wait until the `SendRequest` handle has available capacity.
-                    let c = h2.ready().await?;
-                    Ok((H2Client::new(c), canceller))
-                }.boxed())
-        })
+        if status != http::StatusCode::SWITCHING_PROTOCOLS {
+            Self::api_response(resp)
+                .map(|_| Err(format_err!("unknown error")))
+                .await?;
+            unreachable!();
+        }
+
+        let upgraded = resp
+            .into_body()
+            .on_upgrade()
+            .await?;
+
+        let max_window_size = (1 << 31) - 2;
+
+        let (h2, connection) = h2::client::Builder::new()
+            .initial_connection_window_size(max_window_size)
+            .initial_window_size(max_window_size)
+            .max_frame_size(4*1024*1024)
+            .handshake(upgraded)
+            .await?;
+
+        let connection = connection
+            .map_err(|_| panic!("HTTP/2.0 connection failed"));
+
+        let (connection, canceller) = cancellable(connection)?;
+        // A cancellable future returns an Option which is None when cancelled and
+        // Some when it finished instead, since we don't care about the return type we
+        // need to map it away:
+        let connection = connection.map(|_| ());
+
+        // Spawn a new task to drive the connection state
+        hyper::rt::spawn(connection);
+
+        // Wait until the `SendRequest` handle has available capacity.
+        let c = h2.ready().await?;
+        Ok((H2Client::new(c), canceller))
     }
 
     async fn credentials(
@@ -450,14 +430,15 @@ impl HttpClient {
         }
     }
 
-    fn api_request(
+    async fn api_request(
         client: Client<HttpsConnector>,
         req: Request<Body>
-    ) -> impl Future<Output = Result<Value, Error>> {
+    ) -> Result<Value, Error> {
 
         client.request(req)
             .map_err(Error::from)
             .and_then(Self::api_response)
+            .await
     }
 
     pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
@@ -953,7 +934,7 @@ impl BackupClient {
                 let csum = guard.as_mut().unwrap();
 
                 let chunk_end = offset + chunk_len as u64;
-                
+
                 csum.update(&chunk_end.to_le_bytes());
                 csum.update(digest);