]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/client/http_client.rs: switch to async
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Fri, 23 Aug 2019 12:33:48 +0000 (14:33 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 2 Sep 2019 13:21:26 +0000 (15:21 +0200)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
src/client/http_client.rs

index 1bdd0d7d90b90082f96860bfe78452beccf026a1..0c6b1e931a9ea815dab2947bb6ba1f38f0b8b35b 100644 (file)
@@ -14,7 +14,8 @@ use hyper::Body;
 use hyper::client::Client;
 use openssl::ssl::{SslConnector, SslMethod};
 use serde_json::{json, Value};
-use tokio::sync::mpsc;
+use tokio::io::AsyncReadExt;
+use tokio::sync::{mpsc, oneshot};
 use url::percent_encoding::{percent_encode,  DEFAULT_ENCODE_SET};
 use xdg::BaseDirectories;
 
@@ -145,7 +146,7 @@ impl HttpClient {
     ///
     /// Login is done on demand, so this is onyl required if you need
     /// access to authentication data in 'AuthInfo'.
-    pub fn login(&self) -> impl Future<Item=AuthInfo, Error=Error> {
+    pub fn login(&self) -> impl Future<Output = Result<AuthInfo, Error>> {
         self.auth.listen()
     }
 
@@ -186,7 +187,7 @@ impl HttpClient {
             .build::<_, Body>(https)
     }
 
-    pub fn request(&self, mut req: Request<Body>) -> impl Future<Item=Value, Error=Error>  {
+    pub fn request(&self, mut req: Request<Body>) -> impl Future<Output = Result<Value, Error>> {
 
         let login = self.auth.listen();
 
@@ -204,26 +205,38 @@ impl HttpClient {
         })
     }
 
-    pub fn get(&self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
-
+    pub fn get(
+        &self,
+        path: &str,
+        data: Option<Value>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
         self.request(req)
     }
 
-    pub fn delete(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
-
+    pub fn delete(
+        &mut self,
+        path: &str,
+        data: Option<Value>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
         self.request(req)
     }
 
-    pub fn post(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
-
+    pub fn post(
+        &mut self,
+        path: &str,
+        data: Option<Value>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
         self.request(req)
     }
 
-    pub fn download<W: Write>(&mut self, path: &str, output: W) ->  impl Future<Item=W, Error=Error> {
-
+    pub fn download<W: Write + Send + 'static>(
+        &mut self,
+        path: &str,
+        output: W,
+    ) ->  impl Future<Output = Result<W, Error>> {
         let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
 
         let login = self.auth.listen();
@@ -240,15 +253,15 @@ impl HttpClient {
                 .and_then(|resp| {
                     let status = resp.status();
                     if !status.is_success() {
-                        future::Either::A(
+                        future::Either::Left(
                             HttpClient::api_response(resp)
-                                .and_then(|_| { bail!("unknown error"); })
+                                .map(|_| Err(format_err!("unknown error")))
                         )
                     } else {
-                        future::Either::B(
+                        future::Either::Right(
                             resp.into_body()
                                 .map_err(Error::from)
-                                .fold(output, move |mut acc, chunk| {
+                                .try_fold(output, move |mut acc, chunk| async move {
                                     acc.write_all(&chunk)?;
                                     Ok::<_, Error>(acc)
                                 })
@@ -264,7 +277,7 @@ impl HttpClient {
         body: Body,
         path: &str,
         data: Option<Value>,
-    ) -> impl Future<Item=Value, Error=Error> {
+    ) -> impl Future<Output = Result<Value, Error>> {
 
         let path = path.trim_matches('/');
         let mut url = format!("https://{}:8007/{}", &self.server, path);
@@ -294,7 +307,7 @@ impl HttpClient {
         backup_id: &str,
         backup_time: DateTime<Utc>,
         debug: bool,
-    ) -> impl Future<Item=Arc<BackupClient>, Error=Error> {
+    ) -> impl Future<Output = Result<Arc<BackupClient>, Error>> {
 
         let param = json!({
             "backup-type": backup_type,
@@ -307,7 +320,7 @@ impl HttpClient {
         let req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap();
 
         self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
-            .map(|(h2, canceller)| BackupClient::new(h2, canceller))
+            .map_ok(|(h2, canceller)| BackupClient::new(h2, canceller))
     }
 
     pub fn start_backup_reader(
@@ -317,7 +330,7 @@ impl HttpClient {
         backup_id: &str,
         backup_time: DateTime<Utc>,
         debug: bool,
-    ) -> impl Future<Item=Arc<BackupReader>, Error=Error> {
+    ) -> impl Future<Output = Result<Arc<BackupReader>, Error>> {
 
         let param = json!({
             "backup-type": backup_type,
@@ -329,14 +342,14 @@ impl HttpClient {
         let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap();
 
         self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
-            .map(|(h2, canceller)| BackupReader::new(h2, canceller))
+            .map_ok(|(h2, canceller)| BackupReader::new(h2, canceller))
     }
 
     pub fn start_h2_connection(
         &self,
         mut req: Request<Body>,
         protocol_name: String,
-    ) -> impl Future<Item=(H2Client, Canceller), Error=Error> {
+    ) -> impl Future<Output = Result<(H2Client, Canceller), Error>> {
 
         let login = self.auth.listen();
         let client = self.client.clone();
@@ -353,9 +366,17 @@ impl HttpClient {
 
                     let status = resp.status();
                     if status != http::StatusCode::SWITCHING_PROTOCOLS {
-                        future::Either::A(Self::api_response(resp).and_then(|_| { bail!("unknown error"); }))
+                        future::Either::Left(
+                            Self::api_response(resp)
+                                .map(|_| Err(format_err!("unknown error")))
+                        )
                     } else {
-                        future::Either::B(resp.into_body().on_upgrade().map_err(Error::from))
+                        future::Either::Right(
+                            resp
+                                .into_body()
+                                .on_upgrade()
+                                .map_err(Error::from)
+                        )
                     }
                 })
                 .and_then(|upgraded| {
@@ -368,7 +389,7 @@ impl HttpClient {
                         .handshake(upgraded)
                         .map_err(Error::from)
                 })
-                .and_then(|(h2, connection)| {
+                .and_then(|(h2, connection)| async move {
                     let connection = connection
                         .map_err(|_| panic!("HTTP/2.0 connection failed"));
 
@@ -382,11 +403,9 @@ impl HttpClient {
                     hyper::rt::spawn(connection);
 
                     // Wait until the `SendRequest` handle has available capacity.
-                    Ok(h2.ready()
-                       .map(move |c| (H2Client::new(c), canceller))
-                       .map_err(Error::from))
-                })
-                .flatten()
+                    let c = h2.ready().await?;
+                    Ok((H2Client::new(c), canceller))
+                }.boxed())
         })
     }
 
@@ -395,60 +414,47 @@ impl HttpClient {
         server: String,
         username: String,
         password: String,
-    ) -> Box<dyn Future<Item=AuthInfo, Error=Error> + Send> {
-
-        let server2 = server.clone();
-
-        let create_request = futures::future::lazy(move || {
+    ) -> Box<dyn Future<Output = Result<AuthInfo, Error>> + Send> {
+        Box::new(async move {
             let data = json!({ "username": username, "password": password });
             let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
-            Self::api_request(client, req)
-        });
-
-        let login_future = create_request
-            .and_then(move |cred| {
-                let auth = AuthInfo {
-                    username: cred["data"]["username"].as_str().unwrap().to_owned(),
-                    ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
-                    token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
-                };
+            let cred = Self::api_request(client, req).await?;
+            let auth = AuthInfo {
+                username: cred["data"]["username"].as_str().unwrap().to_owned(),
+                ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
+                token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
+            };
 
-                let _ = store_ticket_info(&server2, &auth.username, &auth.ticket, &auth.token);
+            let _ = store_ticket_info(&server, &auth.username, &auth.ticket, &auth.token);
 
-                Ok(auth)
-            });
-
-        Box::new(login_future)
+            Ok(auth)
+        })
     }
 
-    fn api_response(response: Response<Body>) -> impl Future<Item=Value, Error=Error> {
-
+    async fn api_response(response: Response<Body>) -> Result<Value, Error> {
         let status = response.status();
-
-        response
+        let data = response
             .into_body()
-            .concat2()
-            .map_err(Error::from)
-            .and_then(move |data| {
-
-                let text = String::from_utf8(data.to_vec()).unwrap();
-                if status.is_success() {
-                    if text.len() > 0 {
-                        let value: Value = serde_json::from_str(&text)?;
-                        Ok(value)
-                    } else {
-                        Ok(Value::Null)
-                    }
-                } else {
-                    bail!("HTTP Error {}: {}", status, text);
-                }
-            })
+            .try_concat()
+            .await?;
+
+        let text = String::from_utf8(data.to_vec()).unwrap();
+        if status.is_success() {
+            if text.len() > 0 {
+                let value: Value = serde_json::from_str(&text)?;
+                Ok(value)
+            } else {
+                Ok(Value::Null)
+            }
+        } else {
+            bail!("HTTP Error {}: {}", status, text);
+        }
     }
 
     fn api_request(
         client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
         req: Request<Body>
-    ) -> impl Future<Item=Value, Error=Error> {
+    ) -> impl Future<Output = Result<Value, Error>> {
 
         client.request(req)
             .map_err(Error::from)
@@ -511,40 +517,52 @@ impl BackupReader {
         Arc::new(Self { h2, canceller: canceller })
     }
 
-    pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+    pub fn get(
+        &self,
+        path: &str,
+        param: Option<Value>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         self.h2.get(path, param)
     }
 
-    pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+    pub fn put(
+        &self,
+        path: &str,
+        param: Option<Value>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         self.h2.put(path, param)
     }
 
-    pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+    pub fn post(
+        &self,
+        path: &str,
+        param: Option<Value>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         self.h2.post(path, param)
     }
 
-    pub fn download<W: Write>(
+    pub fn download<W: Write + Send + 'static>(
         &self,
         file_name: &str,
         output: W,
-    ) -> impl Future<Item=W, Error=Error> {
+    ) -> impl Future<Output = Result<W, Error>> {
         let path = "download";
         let param = json!({ "file-name": file_name });
         self.h2.download(path, Some(param), output)
     }
 
-    pub fn speedtest<W: Write>(
+    pub fn speedtest<W: Write + Send + 'static>(
         &self,
         output: W,
-    ) -> impl Future<Item=W, Error=Error> {
+    ) -> impl Future<Output = Result<W, Error>> {
         self.h2.download("speedtest", None, output)
     }
 
-    pub fn download_chunk<W: Write>(
+    pub fn download_chunk<W: Write + Send + 'static>(
         &self,
         digest: &[u8; 32],
         output: W,
-    ) -> impl Future<Item=W, Error=Error> {
+    ) -> impl Future<Output = Result<W, Error>> {
         let path = "chunk";
         let param = json!({ "digest": digest_to_hex(digest) });
         self.h2.download(path, Some(param), output)
@@ -573,27 +591,38 @@ pub struct BackupStats {
 }
 
 impl BackupClient {
-
     pub fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
         Arc::new(Self { h2, canceller })
     }
 
-    pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+    pub fn get(
+        &self,
+        path: &str,
+        param: Option<Value>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         self.h2.get(path, param)
     }
 
-    pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+    pub fn put(
+        &self,
+        path: &str,
+        param: Option<Value>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         self.h2.put(path, param)
     }
 
-    pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+    pub fn post(
+        &self,
+        path: &str,
+        param: Option<Value>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         self.h2.post(path, param)
     }
 
-    pub fn finish(self: Arc<Self>) -> impl Future<Item=(), Error=Error> {
+    pub fn finish(self: Arc<Self>) -> impl Future<Output = Result<(), Error>> {
         self.h2.clone()
             .post("finish", None)
-            .map(move |_| {
+            .map_ok(move |_| {
                 self.canceller.cancel();
             })
     }
@@ -606,27 +635,22 @@ impl BackupClient {
         &self,
         mut reader: R,
         file_name: &str,
-     ) -> impl Future<Item=BackupStats, Error=Error> {
+     ) -> impl Future<Output = Result<BackupStats, Error>> {
 
         let h2 = self.h2.clone();
         let file_name = file_name.to_owned();
 
-        futures::future::ok(())
-            .and_then(move |_| {
-                let mut raw_data = Vec::new();
-                // fixme: avoid loading into memory
-                reader.read_to_end(&mut raw_data)?;
-                Ok(raw_data)
-            })
-            .and_then(move |raw_data| {
-                let csum = openssl::sha::sha256(&raw_data);
-                let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
-                let size = raw_data.len() as u64; // fixme: should be decoded size instead??
-                h2.upload("blob", Some(param), raw_data)
-                    .map(move |_| {
-                        BackupStats { size, csum }
-                    })
-            })
+        async move {
+            let mut raw_data = Vec::new();
+            // fixme: avoid loading into memory
+            reader.read_to_end(&mut raw_data)?;
+
+            let csum = openssl::sha::sha256(&raw_data);
+            let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
+            let size = raw_data.len() as u64; // fixme: should be decoded size instead??
+            let _value = h2.upload("blob", Some(param), raw_data).await?;
+            Ok(BackupStats { size, csum })
+        }
     }
 
     pub fn upload_blob_from_data(
@@ -636,35 +660,30 @@ impl BackupClient {
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
         sign_only: bool,
-     ) -> impl Future<Item=BackupStats, Error=Error> {
+     ) -> impl Future<Output = Result<BackupStats, Error>> {
 
         let h2 = self.h2.clone();
         let file_name = file_name.to_owned();
         let size = data.len() as u64;
 
-        futures::future::ok(())
-            .and_then(move |_| {
-                let blob = if let Some(crypt_config) = crypt_config {
-                    if sign_only {
-                        DataBlob::create_signed(&data, crypt_config, compress)?
-                    } else {
-                        DataBlob::encode(&data, Some(crypt_config.clone()), compress)?
-                    }
+        async move {
+            let blob = if let Some(crypt_config) = crypt_config {
+                if sign_only {
+                    DataBlob::create_signed(&data, crypt_config, compress)?
                 } else {
-                    DataBlob::encode(&data, None, compress)?
-                };
+                    DataBlob::encode(&data, Some(crypt_config.clone()), compress)?
+                }
+            } else {
+                DataBlob::encode(&data, None, compress)?
+            };
 
-                let raw_data = blob.into_inner();
-                Ok(raw_data)
-            })
-            .and_then(move |raw_data| {
-                let csum = openssl::sha::sha256(&raw_data);
-                let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
-                h2.upload("blob", Some(param), raw_data)
-                    .map(move |_| {
-                        BackupStats { size, csum }
-                    })
-            })
+            let raw_data = blob.into_inner();
+
+            let csum = openssl::sha::sha256(&raw_data);
+            let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
+            let _value = h2.upload("blob", Some(param), raw_data).await?;
+            Ok(BackupStats { size, csum })
+        }
     }
 
     pub fn upload_blob_from_file<P: AsRef<std::path::Path>>(
@@ -673,52 +692,43 @@ impl BackupClient {
         file_name: &str,
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
-     ) -> impl Future<Item=BackupStats, Error=Error> {
+     ) -> impl Future<Output = Result<BackupStats, Error>> {
 
         let h2 = self.h2.clone();
         let file_name = file_name.to_owned();
         let src_path = src_path.as_ref().to_owned();
 
-        let task = tokio::fs::File::open(src_path.clone())
-            .map_err(move |err| format_err!("unable to open file {:?} - {}", src_path, err))
-            .and_then(move |file| {
-                let contents = vec![];
-                tokio::io::read_to_end(file, contents)
-                    .map_err(Error::from)
-                    .and_then(move |(_, contents)| {
-                        let blob = DataBlob::encode(&contents, crypt_config, compress)?;
-                        let raw_data = blob.into_inner();
-                        Ok((raw_data, contents.len() as u64))
-                    })
-                    .and_then(move |(raw_data, size)| {
-                        let csum = openssl::sha::sha256(&raw_data);
-                        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
-                        h2.upload("blob", Some(param), raw_data)
-                            .map(move |_| {
-                                BackupStats { size, csum }
-                            })
-                    })
+        async move {
+            let mut file = tokio::fs::File::open(src_path.clone())
+                .await
+                .map_err(move |err| format_err!("unable to open file {:?} - {}", src_path, err))?;
+
+            let mut contents = Vec::new();
+            file.read_to_end(&mut contents).await.map_err(Error::from)?;
+
+            let size: u64 = contents.len() as u64;
+            let blob = DataBlob::encode(&contents, crypt_config, compress)?;
+            let raw_data = blob.into_inner();
+            let csum = openssl::sha::sha256(&raw_data);
+            let param = json!({
+                "encoded-size": raw_data.len(),
+                "file-name": file_name,
             });
-
-        task
+            h2.upload("blob", Some(param), raw_data).await?;
+            Ok(BackupStats { size, csum })
+        }
     }
 
     pub fn upload_stream(
         &self,
         archive_name: &str,
-        stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
+        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
         prefix: &str,
         fixed_size: Option<u64>,
         crypt_config: Option<Arc<CryptConfig>>,
-    ) -> impl Future<Item=BackupStats, Error=Error> {
-
+    ) -> impl Future<Output = Result<BackupStats, Error>> {
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
-        let h2 = self.h2.clone();
-        let h2_2 = self.h2.clone();
-        let h2_3 = self.h2.clone();
-        let h2_4 = self.h2.clone();
-
         let mut param = json!({ "archive-name": archive_name });
         if let Some(size) = fixed_size {
             param["size"] = size.into();
@@ -729,51 +739,59 @@ impl BackupClient {
 
         let prefix = prefix.to_owned();
 
-        Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone())
-            .and_then(move |_| {
-                h2_2.post(&index_path, Some(param))
-            })
-            .and_then(move |res| {
-                let wid = res.as_u64().unwrap();
-                Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone(), crypt_config)
-                    .and_then(move |(chunk_count, size, _speed, csum)| {
-                        let param = json!({
-                            "wid": wid ,
-                            "chunk-count": chunk_count,
-                            "size": size,
-                        });
-                        h2_4.post(&close_path, Some(param))
-                            .map(move |_| {
-                                BackupStats { size: size as u64, csum }
-                            })
-                    })
+        let h2 = self.h2.clone();
+
+        let download_future =
+            Self::download_chunk_list(h2.clone(), &index_path, archive_name, known_chunks.clone());
+
+        async move {
+            download_future.await?;
+
+            let wid = h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
+
+            let (chunk_count, size, _speed, csum) = Self::upload_chunk_info_stream(
+                h2.clone(),
+                wid,
+                stream,
+                &prefix,
+                known_chunks.clone(),
+                crypt_config,
+            )
+            .await?;
+
+            let param = json!({
+                "wid": wid ,
+                "chunk-count": chunk_count,
+                "size": size,
+            });
+            let _value = h2.post(&close_path, Some(param)).await?;
+            Ok(BackupStats {
+                size: size as u64,
+                csum,
             })
+        }
     }
 
     fn response_queue() -> (
         mpsc::Sender<h2::client::ResponseFuture>,
-        sync::oneshot::Receiver<Result<(), Error>>
+        oneshot::Receiver<Result<(), Error>>
     ) {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
-        let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
+        let (verify_result_tx, verify_result_rx) = oneshot::channel();
 
         hyper::rt::spawn(
             verify_queue_rx
-                .map_err(Error::from)
-                .for_each(|response: h2::client::ResponseFuture| {
+                .map(Ok::<_, Error>)
+                .try_for_each(|response: h2::client::ResponseFuture| {
                     response
                         .map_err(Error::from)
                         .and_then(H2Client::h2api_response)
-                        .and_then(|result| {
-                            println!("RESPONSE: {:?}", result);
-                            Ok(())
-                        })
+                        .map_ok(|result| println!("RESPONSE: {:?}", result))
                         .map_err(|err| format_err!("pipelined request failed: {}", err))
                 })
-                .then(|result|
-                      verify_result_tx.send(result)
-                )
-                .map_err(|_| { /* ignore closed channel */ })
+                .map(|result| {
+                      let _ignore_closed_channel = verify_result_tx.send(result);
+                })
         );
 
         (verify_queue_tx, verify_result_rx)
@@ -781,30 +799,30 @@ impl BackupClient {
 
     fn append_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
         mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
-        sync::oneshot::Receiver<Result<(), Error>>
+        oneshot::Receiver<Result<(), Error>>
     ) {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
-        let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
+        let (verify_result_tx, verify_result_rx) = oneshot::channel();
 
         let h2_2 = h2.clone();
 
         hyper::rt::spawn(
             verify_queue_rx
-                .map_err(Error::from)
+                .map(Ok::<_, Error>)
                 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                     match (response, merged_chunk_info) {
                         (Some(response), MergedChunkInfo::Known(list)) => {
-                            future::Either::A(
+                            future::Either::Left(
                                 response
                                     .map_err(Error::from)
                                     .and_then(H2Client::h2api_response)
                                     .and_then(move |_result| {
-                                        Ok(MergedChunkInfo::Known(list))
+                                        future::ok(MergedChunkInfo::Known(list))
                                     })
                             )
                         }
                         (None, MergedChunkInfo::Known(list)) => {
-                            future::Either::B(future::ok(MergedChunkInfo::Known(list)))
+                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                         }
                         _ => unreachable!(),
                     }
@@ -831,18 +849,17 @@ impl BackupClient {
                                     response
                                         .map_err(Error::from)
                                         .and_then(H2Client::h2api_response)
-                                        .and_then(|_| Ok(()))
+                                        .map_ok(|_| ())
                                 })
                                 .map_err(|err| format_err!("pipelined request failed: {}", err))
                         }
                         _ => unreachable!(),
                     }
                 })
-                .for_each(|_| Ok(()))
-                .then(|result|
-                      verify_result_tx.send(result)
-                )
-                .map_err(|_| { /* ignore closed channel */ })
+                .try_for_each(|_| future::ok(()))
+                .map(|result| {
+                      let _ignore_closed_channel = verify_result_tx.send(result);
+                })
         );
 
         (verify_queue_tx, verify_result_rx)
@@ -853,7 +870,7 @@ impl BackupClient {
         path: &str,
         archive_name: &str,
         known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    ) -> impl Future<Item=(), Error=Error> {
+    ) -> impl Future<Output = Result<(), Error>> {
 
         let param = json!({ "archive-name": archive_name });
         let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap();
@@ -866,9 +883,12 @@ impl BackupClient {
                         let status = resp.status();
 
                         if !status.is_success() {
-                            future::Either::A(H2Client::h2api_response(resp).and_then(|_| { bail!("unknown error"); }))
+                            future::Either::Left(
+                                H2Client::h2api_response(resp)
+                                    .map(|_| Err(format_err!("unknown error")))
+                            )
                         } else {
-                            future::Either::B(future::ok(resp.into_body()))
+                            future::Either::Right(future::ok(resp.into_body()))
                         }
                     })
                     .and_then(move |mut body| {
@@ -876,11 +896,11 @@ impl BackupClient {
                         let mut release_capacity = body.release_capacity().clone();
 
                         DigestListDecoder::new(body.map_err(Error::from))
-                            .for_each(move |chunk| {
+                            .try_for_each(move |chunk| {
                                 let _ = release_capacity.release_capacity(chunk.len());
                                 println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
                                 known_chunks.lock().unwrap().insert(chunk);
-                                Ok(())
+                                futures::future::ok(())
                             })
                        })
             })
@@ -889,22 +909,23 @@ impl BackupClient {
     fn upload_chunk_info_stream(
         h2: H2Client,
         wid: u64,
-        stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
+        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
         prefix: &str,
         known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
         crypt_config: Option<Arc<CryptConfig>>,
-    ) -> impl Future<Item=(usize, usize, usize, [u8; 32]), Error=Error> {
+    ) -> impl Future<Output = Result<(usize, usize, usize, [u8; 32]), Error>> {
 
-        let repeat = std::sync::Arc::new(AtomicUsize::new(0));
+        let repeat = Arc::new(AtomicUsize::new(0));
         let repeat2 = repeat.clone();
 
-        let stream_len = std::sync::Arc::new(AtomicUsize::new(0));
+        let stream_len = Arc::new(AtomicUsize::new(0));
         let stream_len2 = stream_len.clone();
 
         let append_chunk_path = format!("{}_index", prefix);
         let upload_chunk_path = format!("{}_chunk", prefix);
 
-        let (upload_queue, upload_result) = Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned());
+        let (upload_queue, upload_result) =
+            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned());
 
         let start_time = std::time::Instant::now();
 
@@ -936,21 +957,26 @@ impl BackupClient {
 
                 let chunk_is_known = known_chunks.contains(digest);
                 if chunk_is_known {
-                    Ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
+                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                 } else {
                     known_chunks.insert(*digest);
-                    let chunk = chunk_builder.build()?;
-                    Ok(MergedChunkInfo::New(ChunkInfo { chunk, chunk_len: chunk_len as u64, offset }))
+                    future::ready(chunk_builder
+                        .build()
+                        .map(move |chunk| MergedChunkInfo::New(ChunkInfo {
+                            chunk,
+                            chunk_len: chunk_len as u64,
+                            offset,
+                        }))
+                    )
                 }
             })
             .merge_known_chunks()
-            .for_each(move |merged_chunk_info| {
+            .try_for_each(move |merged_chunk_info| {
 
                 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                     let offset = chunk_info.offset;
                     let digest = *chunk_info.chunk.digest();
                     let digest_str = digest_to_hex(&digest);
-                    let upload_queue = upload_queue.clone();
 
                     println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                              chunk_info.chunk_len, offset);
@@ -968,28 +994,29 @@ impl BackupClient {
 
                     let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
 
-                    future::Either::A(
-                        h2.send_request(request, upload_data)
-                            .and_then(move |response| {
-                                upload_queue.clone().send((new_info, Some(response)))
-                                    .map(|_| ()).map_err(Error::from)
-                            })
+                    let mut upload_queue = upload_queue.clone();
+                    future::Either::Left(h2
+                        .send_request(request, upload_data)
+                        .and_then(move |response| async move {
+                            upload_queue
+                                .send((new_info, Some(response)))
+                                .await
+                                .map_err(Error::from)
+                        })
                     )
                 } else {
-
-                    future::Either::B(
-                        upload_queue.clone().send((merged_chunk_info, None))
-                            .map(|_| ()).map_err(Error::from)
-                    )
+                    let mut upload_queue = upload_queue.clone();
+                    future::Either::Right(async move {
+                        upload_queue
+                            .send((merged_chunk_info, None))
+                            .await
+                            .map_err(Error::from)
+                    })
                 }
             })
-            .then(move |result| {
-                //println!("RESULT {:?}", result);
-                upload_result.map_err(Error::from).and_then(|upload1_result| {
-                    Ok(upload1_result.and(result))
-                })
-            })
-            .flatten()
+            .then(move |result| async move {
+                upload_result.await?.and(result)
+            }.boxed())
             .and_then(move |_| {
                 let repeat = repeat2.load(Ordering::SeqCst);
                 let stream_len = stream_len2.load(Ordering::SeqCst);
@@ -1003,11 +1030,11 @@ impl BackupClient {
                 let mut guard = index_csum_2.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
-                Ok((repeat, stream_len, speed, csum))
+                futures::future::ok((repeat, stream_len, speed, csum))
             })
     }
 
-    pub fn upload_speedtest(&self) -> impl Future<Item=usize, Error=Error> {
+    pub fn upload_speedtest(&self) -> impl Future<Output = Result<usize, Error>> {
 
         let mut data = vec![];
         // generate pseudo random byte sequence
@@ -1020,7 +1047,7 @@ impl BackupClient {
 
         let item_len = data.len();
 
-        let repeat = std::sync::Arc::new(AtomicUsize::new(0));
+        let repeat = Arc::new(AtomicUsize::new(0));
         let repeat2 = repeat.clone();
 
         let (upload_queue, upload_result) = Self::response_queue();
@@ -1031,29 +1058,32 @@ impl BackupClient {
 
         futures::stream::repeat(data)
             .take_while(move |_| {
-                repeat.fetch_add(1, Ordering::SeqCst);
-                Ok(start_time.elapsed().as_secs() < 5)
+                let repeat = Arc::clone(&repeat);
+                async move {
+                    repeat.fetch_add(1, Ordering::SeqCst);
+                    start_time.elapsed().as_secs() < 5
+                }
             })
-            .for_each(move |data| {
+            .map(Ok)
+            .try_for_each(move |data| {
                 let h2 = h2.clone();
 
-                let upload_queue = upload_queue.clone();
+                let mut upload_queue = upload_queue.clone();
 
                 println!("send test data ({} bytes)", data.len());
                 let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
                 h2.send_request(request, Some(bytes::Bytes::from(data)))
-                    .and_then(move |response| {
-                        upload_queue.send(response)
-                            .map(|_| ()).map_err(Error::from)
+                    .and_then(move |response| async move {
+                        upload_queue
+                            .send(response)
+                            .await
+                            .map_err(Error::from)
                     })
             })
-            .then(move |result| {
+            .then(move |result| async move {
                 println!("RESULT {:?}", result);
-                upload_result.map_err(Error::from).and_then(|upload1_result| {
-                    Ok(upload1_result.and(result))
-                })
+                upload_result.await?.and(result)
             })
-            .flatten()
             .and_then(move |_| {
                 let repeat = repeat2.load(Ordering::SeqCst);
                 println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
@@ -1061,7 +1091,7 @@ impl BackupClient {
                 if repeat > 0 {
                     println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
                 }
-                Ok(speed)
+                futures::future::ok(speed)
             })
     }
 }
@@ -1077,22 +1107,27 @@ impl H2Client {
         Self { h2 }
     }
 
-    pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+    pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Output = Result<Value, Error>> {
         let req = Self::request_builder("localhost", "GET", path, param).unwrap();
         self.request(req)
     }
 
-    pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+    pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Output = Result<Value, Error>> {
         let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
         self.request(req)
     }
 
-    pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+    pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Output = Result<Value, Error>> {
         let req = Self::request_builder("localhost", "POST", path, param).unwrap();
         self.request(req)
     }
 
-    pub fn download<W: Write>(&self, path: &str, param: Option<Value>, output: W) -> impl Future<Item=W, Error=Error> {
+    pub fn download<W: Write + Send + 'static>(
+        &self,
+        path: &str,
+        param: Option<Value>,
+        output: W,
+    ) -> impl Future<Output = Result<W, Error>> {
         let request = Self::request_builder("localhost", "GET", path, param).unwrap();
 
         self.send_request(request, None)
@@ -1102,21 +1137,24 @@ impl H2Client {
                     .and_then(move |resp| {
                         let status = resp.status();
                         if !status.is_success() {
-                            future::Either::A(
+                            future::Either::Left(
                                 H2Client::h2api_response(resp)
-                                    .and_then(|_| { bail!("unknown error"); })
+                                    .map(|_| Err(format_err!("unknown error")))
                             )
                         } else {
                             let mut body = resp.into_body();
-                            let mut release_capacity = body.release_capacity().clone();
+                            let release_capacity = body.release_capacity().clone();
 
-                            future::Either::B(
+                            future::Either::Right(
                                 body
                                     .map_err(Error::from)
-                                    .fold(output, move |mut acc, chunk| {
-                                        let _ = release_capacity.release_capacity(chunk.len());
-                                        acc.write_all(&chunk)?;
-                                        Ok::<_, Error>(acc)
+                                    .try_fold(output, move |mut acc, chunk| {
+                                        let mut release_capacity = release_capacity.clone();
+                                        async move {
+                                            let _ = release_capacity.release_capacity(chunk.len());
+                                            acc.write_all(&chunk)?;
+                                            Ok::<_, Error>(acc)
+                                        }
                                     })
                             )
                         }
@@ -1124,7 +1162,12 @@ impl H2Client {
             })
     }
 
-    pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
+    pub fn upload(
+        &self,
+        path: &str,
+        param: Option<Value>,
+        data: Vec<u8>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         let request = Self::request_builder("localhost", "POST", path, param).unwrap();
 
         self.h2.clone()
@@ -1144,7 +1187,7 @@ impl H2Client {
     fn request(
         &self,
         request: Request<()>,
-    ) -> impl Future<Item=Value, Error=Error> {
+    ) -> impl Future<Output = Result<Value, Error>> {
 
         self.send_request(request, None)
             .and_then(move |response| {
@@ -1158,7 +1201,7 @@ impl H2Client {
         &self,
         request: Request<()>,
         data: Option<bytes::Bytes>,
-    ) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> {
+    ) -> impl Future<Output = Result<h2::client::ResponseFuture, Error>> {
 
         self.h2.clone()
             .ready()
@@ -1166,19 +1209,20 @@ impl H2Client {
             .and_then(move |mut send_request| {
                 if let Some(data) = data {
                     let (response, stream) = send_request.send_request(request, false).unwrap();
-                    future::Either::A(PipeToSendStream::new(data, stream)
+                    future::Either::Left(PipeToSendStream::new(data, stream)
                         .and_then(move |_| {
                             future::ok(response)
                         }))
                 } else {
                     let (response, _stream) = send_request.send_request(request, true).unwrap();
-                    future::Either::B(future::ok(response))
+                    future::Either::Right(future::ok(response))
                 }
             })
     }
 
-    fn h2api_response(response: Response<h2::RecvStream>) -> impl Future<Item=Value, Error=Error> {
-
+    fn h2api_response(
+        response: Response<h2::RecvStream>,
+    ) -> impl Future<Output = Result<Value, Error>> {
         let status = response.status();
 
         let (_head, mut body) = response.into_parts();
@@ -1192,14 +1236,14 @@ impl H2Client {
         let mut release_capacity = body.release_capacity().clone();
 
         body
-            .map(move |chunk| {
+            .map_ok(move |chunk| {
                 // Let the server send more data.
                 let _ = release_capacity.release_capacity(chunk.len());
                 chunk
             })
-            .concat2()
+            .try_concat()
             .map_err(Error::from)
-            .and_then(move |data| {
+            .and_then(move |data| async move {
                 let text = String::from_utf8(data.to_vec()).unwrap();
                 if status.is_success() {
                     if text.len() > 0 {
@@ -1216,7 +1260,7 @@ impl H2Client {
                 } else {
                     bail!("HTTP Error {}: {}", status, text);
                 }
-            })
+            }.boxed())
     }
 
     // Note: We always encode parameters with the url