]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/client/http_client.rs: cleanup, make login fully async
authorDietmar Maurer <dietmar@proxmox.com>
Sun, 28 Apr 2019 08:55:03 +0000 (10:55 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Sun, 28 Apr 2019 08:55:03 +0000 (10:55 +0200)
src/bin/proxmox-backup-client.rs
src/client/http_client.rs

index 4df7de160be02fa761cdddc98ee942193d2e537a..e28640ce984a21fa104082153437aa692c766076 100644 (file)
@@ -24,6 +24,7 @@ use regex::Regex;
 use xdg::BaseDirectories;
 
 use lazy_static::lazy_static;
+use futures::*;
 
 lazy_static! {
     static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:pxar|raw)):(.+)$").unwrap();
@@ -135,7 +136,7 @@ fn backup_directory<P: AsRef<Path>>(
 
     let body = Body::wrap_stream(stream);
 
-    client.upload("application/x-proxmox-backup-pxar", body, &path)?;
+    client.upload("application/x-proxmox-backup-pxar", body, &path).wait()?;
 
     Ok(())
 }
@@ -235,11 +236,11 @@ fn list_backup_groups(
     let repo_url = tools::required_string_param(&param, "repository")?;
     let repo: BackupRepository = repo_url.parse()?;
 
-    let mut client = HttpClient::new(repo.host(), repo.user());
+    let client = HttpClient::new(repo.host(), repo.user());
 
     let path = format!("api2/json/admin/datastore/{}/groups", repo.store());
 
-    let mut result = client.get(&path)?;
+    let mut result = client.get(&path).wait()?;
 
     record_repository(&repo);
 
@@ -300,12 +301,12 @@ fn list_snapshots(
         "backup-id": group.backup_id(),
     }))?;
 
-    let mut client = HttpClient::new(repo.host(), repo.user());
+    let client = HttpClient::new(repo.host(), repo.user());
 
     let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query);
 
     // fixme: params
-    let result = client.get(&path)?;
+    let result = client.get(&path).wait()?;
 
     record_repository(&repo);
 
@@ -353,7 +354,7 @@ fn forget_snapshots(
 
     let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query);
 
-    let result = client.delete(&path)?;
+    let result = client.delete(&path).wait()?;
 
     record_repository(&repo);
 
@@ -373,7 +374,7 @@ fn start_garbage_collection(
 
     let path = format!("api2/json/admin/datastore/{}/gc", repo.store());
 
-    let result = client.post(&path)?;
+    let result = client.post(&path, None).wait()?;
 
     record_repository(&repo);
 
@@ -446,8 +447,6 @@ fn create_backup(
 
     let mut client = HttpClient::new(repo.host(), repo.user());
 
-    client.login()?; // login before starting backup
-
     record_repository(&repo);
 
     println!("Starting backup");
@@ -503,8 +502,6 @@ fn restore(
 
     let mut client = HttpClient::new(repo.host(), repo.user());
 
-    client.login()?; // login before starting
-
     record_repository(&repo);
 
     let path = tools::required_string_param(&param, "snapshot")?;
@@ -520,7 +517,7 @@ fn restore(
         }))?;
 
         let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), subquery);
-        let result = client.get(&path)?;
+        let result = client.get(&path).wait()?;
 
         let list = result["data"].as_array().unwrap();
         if list.len() == 0 {
@@ -553,7 +550,7 @@ fn restore(
 
         let target = PathBuf::from(target);
         let writer = PxarDecodeWriter::new(&target, true)?;
-        client.download(&path, Box::new(writer))?;
+        client.download(&path, Box::new(writer)).wait()?;
     } else {
         bail!("unknown file extensions - unable to download '{}'", archive_name);
     }
@@ -576,18 +573,19 @@ fn prune(
 
     param.as_object_mut().unwrap().remove("repository");
 
-    let result = client.post_json(&path, param)?;
+    let result = client.post(&path, Some(param)).wait()?;
 
     record_repository(&repo);
 
     Ok(result)
 }
 
+// like get, but simply ignore errors and return Null instead
 fn try_get(repo: &BackupRepository, url: &str) -> Value {
 
-    let mut client = HttpClient::new(repo.host(), repo.user());
+    let client = HttpClient::new(repo.host(), repo.user());
 
-    let mut resp = match client.try_get(url) {
+    let mut resp = match client.get(url).wait() {
         Ok(v) => v,
         _ => return Value::Null,
     };
@@ -858,6 +856,9 @@ fn main() {
         .insert("restore".to_owned(), restore_cmd_def.into())
         .insert("snapshots".to_owned(), snapshots_cmd_def.into());
 
-    run_cli_command(cmd_def.into());
+    hyper::rt::run(futures::future::lazy(move || {
+        run_cli_command(cmd_def.into());
+        Ok(())
+    }));
 
 }
index e13a0ea3b4a9c686a611acad0df9ba45abfb1600..17ad00170469c258ba36fc21086eb6ad71daa3f6 100644 (file)
@@ -3,25 +3,32 @@ use failure::*;
 use http::Uri;
 use hyper::Body;
 use hyper::client::Client;
-use hyper::rt::{self, Future};
 use xdg::BaseDirectories;
 use chrono::Utc;
 
 use http::Request;
+use http::header::HeaderValue;
+
+use futures::Future;
 use futures::stream::Stream;
 
 use serde_json::{json, Value};
 use url::percent_encoding::{percent_encode,  DEFAULT_ENCODE_SET};
 
-use crate::tools::{self, tty};
+use crate::tools::{self, BroadcastFuture, tty};
+
+#[derive(Clone)]
+struct AuthInfo {
+    username: String,
+    ticket: String,
+    token: String,
+}
 
 /// HTTP(S) API client
 pub struct HttpClient {
-    username: String,
+    client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
     server: String,
-
-    ticket: Option<String>,
-    token: Option<String>
+    auth: BroadcastFuture<AuthInfo>,
 }
 
 fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
@@ -104,15 +111,17 @@ fn load_ticket_info(server: &str, username: &str) -> Option<(String, String)> {
 impl HttpClient {
 
     pub fn new(server: &str, username: &str) -> Self {
+        let client = Self::build_client();
+        let login = Self::credentials(client.clone(), server, username);
+
         Self {
+            client,
             server: String::from(server),
-            username: String::from(username),
-            ticket: None,
-            token: None,
+            auth: BroadcastFuture::new(login),
         }
     }
 
-    fn get_password(&self) -> Result<String, Error> {
+    fn get_password(_username: &str) -> Result<String, Error> {
         use std::env::VarError::*;
         match std::env::var("PBS_PASSWORD") {
             Ok(p) => return Ok(p),
@@ -130,308 +139,204 @@ impl HttpClient {
         bail!("no password input mechanism available");
     }
 
-    fn build_client() -> Result<Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>, Error> {
+    fn build_client() -> Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>> {
         let mut builder = native_tls::TlsConnector::builder();
         // FIXME: We need a CLI option for this!
         builder.danger_accept_invalid_certs(true);
-        let tlsconnector = builder.build()?;
+        let tlsconnector = builder.build().unwrap();
         let mut httpc = hyper::client::HttpConnector::new(1);
         httpc.enforce_http(false); // we want https...
         let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
         https.https_only(true); // force it!
-        let client = Client::builder().build::<_, Body>(https);
-        Ok(client)
+        Client::builder().build::<_, Body>(https)
     }
 
-    fn run_request(
-        request: Request<Body>,
-    ) -> Result<Value, Error> {
-        let client = Self::build_client()?;
+    pub fn request(&self, mut req: Request<Body>) -> impl Future<Item=Value, Error=Error>  {
 
-        let (tx, rx) = std::sync::mpsc::channel();
+        let login = self.auth.listen();
 
-        let future = client
-            .request(request)
-            .map_err(Error::from)
-            .and_then(|resp| {
+        let client = self.client.clone();
 
-                let status = resp.status();
+        login.and_then(move |auth| {
 
-                resp.into_body().concat2().map_err(Error::from)
-                    .and_then(move |data| {
+            let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+            req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
+            req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
 
-                        let text = String::from_utf8(data.to_vec()).unwrap();
-                        if status.is_success() {
-                            if text.len() > 0 {
-                                let value: Value = serde_json::from_str(&text)?;
-                                Ok(value)
-                            } else {
-                                Ok(Value::Null)
-                            }
-                        } else {
-                            bail!("HTTP Error {}: {}", status, text);
-                        }
-                    })
-            })
-            .then(move |res| {
-                tx.send(res).unwrap();
-                Ok(())
-            });
+            let request = Self::api_request(client, req);
 
-        // drop client, else client keeps connectioon open (keep-alive feature)
-        drop(client);
-
-        rt::run(future);
-
-        rx.recv().unwrap()
+            request
+        })
     }
 
-    fn run_download(
-        request: Request<Body>,
-        mut output: Box<dyn std::io::Write + Send>,
-    ) -> Result<(), Error> {
-        let client = Self::build_client()?;
-
-        let (tx, rx) = std::sync::mpsc::channel();
-
-        let future = client
-            .request(request)
-            .map_err(Error::from)
-            .and_then(move |resp| {
-
-                let _status = resp.status(); // fixme: ??
+    pub fn get(&self, path: &str) -> impl Future<Item=Value, Error=Error> {
 
-                resp.into_body()
-                    .map_err(Error::from)
-                    .for_each(move |chunk| {
-                        output.write_all(&chunk)?;
-                        Ok(())
-                    })
-
-            })
-            .then(move |res| {
-                tx.send(res).unwrap();
-                Ok(())
-            });
-
-        // drop client, else client keeps connectioon open (keep-alive feature)
-        drop(client);
-
-        rt::run(future);
-
-        rx.recv().unwrap()
+        let req = Self::request_builder(&self.server, "GET", path, None).unwrap();
+        self.request(req)
     }
 
-    pub fn download(&mut self, path: &str, output: Box<dyn std::io::Write + Send>) -> Result<(), Error> {
-
-        let path = path.trim_matches('/');
-        let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
-        let (ticket, _token) = self.login()?;
-
-        let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
-
-        let request = Request::builder()
-            .method("GET")
-            .uri(url)
-            .header("User-Agent", "proxmox-backup-client/1.0")
-            .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
-            .body(Body::empty())?;
+    pub fn delete(&mut self, path: &str) -> impl Future<Item=Value, Error=Error> {
 
-        Self::run_download(request, output)
+        let req = Self::request_builder(&self.server, "DELETE", path, None).unwrap();
+        self.request(req)
     }
 
-    pub fn get(&mut self, path: &str) -> Result<Value, Error> {
-
-        let path = path.trim_matches('/');
-        let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
-        let (ticket, _token) = self.login()?;
-
-        let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
-
-        let request = Request::builder()
-            .method("GET")
-            .uri(url)
-            .header("User-Agent", "proxmox-backup-client/1.0")
-            .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
-            .body(Body::empty())?;
+    pub fn post(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
 
-        Self::run_request(request)
+        let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
+        self.request(req)
     }
 
-    /// like get(), but use existing credentials (never asks for password).
-    /// this simply fails when there is no ticket
-    pub fn try_get(&mut self, path: &str) -> Result<Value, Error> {
-
-        let path = path.trim_matches('/');
-        let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
-        let mut credentials = None;
-
-        if let Some(ref ticket) = self.ticket {
-            if let Some(ref token) = self.token {
-                credentials = Some((ticket.clone(), token.clone()));
-            }
-        }
-
-        if credentials == None {
-            if let Some((ticket, token)) = load_ticket_info(&self.server, &self.username) {
-                credentials = Some((ticket.clone(), token.clone()));
-            }
-        }
-
-        if credentials == None {
-            bail!("unable to get credentials");
-        }
+    pub fn download(&mut self, path: &str, mut output: Box<dyn std::io::Write + Send>) ->  impl Future<Item=(), Error=Error> {
 
-        let (ticket, _token) = credentials.unwrap();
+        let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
 
-        let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
+        let login = self.auth.listen();
 
-        let request = Request::builder()
-            .method("GET")
-            .uri(url)
-            .header("User-Agent", "proxmox-backup-client/1.0")
-            .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
-            .body(Body::empty())?;
+        let client = self.client.clone();
 
-        Self::run_request(request)
-    }
+        login.and_then(move |auth| {
 
-    pub fn delete(&mut self, path: &str) -> Result<Value, Error> {
+            let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+            req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
 
-        let path = path.trim_matches('/');
-        let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
+            client.request(req)
+                .map_err(Error::from)
+                .and_then(|resp| {
 
-        let (ticket, token) = self.login()?;
+                    let _status = resp.status(); // fixme: ??
 
-        let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
+                    resp.into_body()
+                        .map_err(Error::from)
+                        .for_each(move |chunk| {
+                            output.write_all(&chunk)?;
+                            Ok(())
+                        })
 
-        let request = Request::builder()
-            .method("DELETE")
-            .uri(url)
-            .header("User-Agent", "proxmox-backup-client/1.0")
-            .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
-            .header("CSRFPreventionToken", token)
-            .body(Body::empty())?;
-
-        Self::run_request(request)
+                })
+        })
     }
 
-    pub fn post(&mut self, path: &str) -> Result<Value, Error> {
+    pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> impl Future<Item=Value, Error=Error> {
 
         let path = path.trim_matches('/');
-        let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
-        let (ticket, token) = self.login()?;
+        let url: Uri = format!("https://{}:8007/{}", &self.server, path).parse().unwrap();
 
-        let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
-
-        let request = Request::builder()
+        let req = Request::builder()
             .method("POST")
             .uri(url)
             .header("User-Agent", "proxmox-backup-client/1.0")
-            .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
-            .header("CSRFPreventionToken", token)
-            .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
-            .body(Body::empty())?;
+            .header("Content-Type", content_type)
+            .body(body).unwrap();
 
-        Self::run_request(request)
+        self.request(req)
     }
 
-    pub fn post_json(&mut self, path: &str, data: Value) -> Result<Value, Error> {
-
-        let path = path.trim_matches('/');
-        let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
+    fn credentials(
+        client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
+        server: &str,
+        username: &str,
+    ) -> Box<Future<Item=AuthInfo, Error=Error> + Send> {
 
-        let (ticket, token) = self.login()?;
+        let server = server.to_owned();
+        let server2 = server.to_owned();
+        let username = username.to_owned();
 
-        let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
+        let create_request = futures::future::lazy(move || {
 
-        let request = Request::builder()
-            .method("POST")
-            .uri(url)
-            .header("User-Agent", "proxmox-backup-client/1.0")
-            .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
-            .header("CSRFPreventionToken", token)
-            .header(hyper::header::CONTENT_TYPE, "application/json")
-            .body(Body::from(data.to_string()))?;
+            let data = if let Some((ticket, _token)) = load_ticket_info(&server, &username) {
+                json!({ "username": username, "password": ticket })
+            } else {
 
-        Self::run_request(request)
-    }
+                let password = match Self::get_password(&username) {
+                    Ok(p) => p,
+                    Err(err) => {
+                        return futures::future::Either::A(futures::future::err(err));
+                    }
+                };
 
-    fn try_login(&mut self, password: &str) -> Result<(String, String), Error> {
+                json!({ "username": username, "password": password })
+            };
 
-        let url: Uri = format!("https://{}:8007/{}", self.server, "/api2/json/access/ticket").parse()?;
+            let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
 
-        let query = url::form_urlencoded::Serializer::new(String::new())
-            .append_pair("username", &self.username)
-            .append_pair("password", &password)
-            .finish();
+            futures::future::Either::B(Self::api_request(client, req))
+        });
 
-        let request = Request::builder()
-            .method("POST")
-            .uri(url)
-            .header("User-Agent", "proxmox-backup-client/1.0")
-            .header("Content-Type", "application/x-www-form-urlencoded")
-            .body(Body::from(query))?;
+        let login_future = create_request
+            .and_then(move |cred| {
+                let auth = AuthInfo {
+                    username: cred["data"]["username"].as_str().unwrap().to_owned(),
+                    ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
+                    token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
+                };
 
-        let auth_res = Self::run_request(request)?;
+                let _ = store_ticket_info(&server2, &auth.username, &auth.ticket, &auth.token);
 
-        let ticket = match auth_res["data"]["ticket"].as_str() {
-            Some(t) => t,
-            None => bail!("got unexpected respose for login request."),
-        };
-        let token = match auth_res["data"]["CSRFPreventionToken"].as_str() {
-            Some(t) => t,
-            None => bail!("got unexpected respose for login request."),
-        };
+                Ok(auth)
+            });
 
-        Ok((ticket.to_owned(), token.to_owned()))
+        Box::new(login_future)
     }
 
-    pub fn login(&mut self) ->  Result<(String, String), Error> {
-
-        if let Some(ref ticket) = self.ticket {
-            if let Some(ref token) = self.token {
-                return Ok((ticket.clone(), token.clone()));
-            }
-        }
+    fn api_request(
+        client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
+        req: Request<Body>
+    ) -> impl Future<Item=Value, Error=Error> {
 
-        if let Some((ticket, _token)) = load_ticket_info(&self.server, &self.username) {
-            if let Ok((ticket, token)) = self.try_login(&ticket) {
-                let _ = store_ticket_info(&self.server, &self.username, &ticket, &token);
-                return Ok((ticket.to_owned(), token.to_owned()))
-            }
-        }
+        client.request(req)
+            .map_err(Error::from)
+            .and_then(|resp| {
 
-        let password = self.get_password()?;
-        let (ticket, token) = self.try_login(&password)?;
+                let status = resp.status();
 
-        let _ = store_ticket_info(&self.server, &self.username, &ticket, &token);
+                resp
+                    .into_body()
+                    .concat2()
+                    .map_err(Error::from)
+                    .and_then(move |data| {
 
-        Ok((ticket.to_owned(), token.to_owned()))
+                        let text = String::from_utf8(data.to_vec()).unwrap();
+                        if status.is_success() {
+                            if text.len() > 0 {
+                                let value: Value = serde_json::from_str(&text)?;
+                                Ok(value)
+                            } else {
+                                Ok(Value::Null)
+                            }
+                        } else {
+                            bail!("HTTP Error {}: {}", status, text);
+                        }
+                    })
+            })
     }
 
-    pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> Result<Value, Error> {
-
+    pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
         let path = path.trim_matches('/');
-        let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
-        let (ticket, token) = self.login()?;
+        let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
+
+        if let Some(data) = data {
+            if method == "POST" {
+                let request = Request::builder()
+                    .method(method)
+                    .uri(url)
+                    .header("User-Agent", "proxmox-backup-client/1.0")
+                    .header(hyper::header::CONTENT_TYPE, "application/json")
+                    .body(Body::from(data.to_string()))?;
+                return Ok(request);
+            } else {
+                unimplemented!();
+            }
 
-        let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
+        }
 
         let request = Request::builder()
-            .method("POST")
+            .method(method)
             .uri(url)
             .header("User-Agent", "proxmox-backup-client/1.0")
-            .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
-            .header("CSRFPreventionToken", token)
-            .header("Content-Type", content_type)
-            .body(body)?;
+            .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
+            .body(Body::empty())?;
 
-        Self::run_request(request)
+        Ok(request)
     }
 }