From 5a2df00004e1365b71e7eea5ad34522379e7cbc2 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Sun, 28 Apr 2019 10:55:03 +0200 Subject: [PATCH] src/client/http_client.rs: cleanup, make login fully async --- src/bin/proxmox-backup-client.rs | 35 +-- src/client/http_client.rs | 387 ++++++++++++------------------- 2 files changed, 164 insertions(+), 258 deletions(-) diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 4df7de16..e28640ce 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -24,6 +24,7 @@ use regex::Regex; use xdg::BaseDirectories; use lazy_static::lazy_static; +use futures::*; lazy_static! { static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:pxar|raw)):(.+)$").unwrap(); @@ -135,7 +136,7 @@ fn backup_directory>( let body = Body::wrap_stream(stream); - client.upload("application/x-proxmox-backup-pxar", body, &path)?; + client.upload("application/x-proxmox-backup-pxar", body, &path).wait()?; Ok(()) } @@ -235,11 +236,11 @@ fn list_backup_groups( let repo_url = tools::required_string_param(¶m, "repository")?; let repo: BackupRepository = repo_url.parse()?; - let mut client = HttpClient::new(repo.host(), repo.user()); + let client = HttpClient::new(repo.host(), repo.user()); let path = format!("api2/json/admin/datastore/{}/groups", repo.store()); - let mut result = client.get(&path)?; + let mut result = client.get(&path).wait()?; record_repository(&repo); @@ -300,12 +301,12 @@ fn list_snapshots( "backup-id": group.backup_id(), }))?; - let mut client = HttpClient::new(repo.host(), repo.user()); + let client = HttpClient::new(repo.host(), repo.user()); let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query); // fixme: params - let result = client.get(&path)?; + let result = client.get(&path).wait()?; record_repository(&repo); @@ -353,7 +354,7 @@ fn forget_snapshots( let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query); - let result = client.delete(&path)?; + let result = client.delete(&path).wait()?; record_repository(&repo); @@ -373,7 +374,7 @@ fn start_garbage_collection( let path = format!("api2/json/admin/datastore/{}/gc", repo.store()); - let result = client.post(&path)?; + let result = client.post(&path, None).wait()?; record_repository(&repo); @@ -446,8 +447,6 @@ fn create_backup( let mut client = HttpClient::new(repo.host(), repo.user()); - client.login()?; // login before starting backup - record_repository(&repo); println!("Starting backup"); @@ -503,8 +502,6 @@ fn restore( let mut client = HttpClient::new(repo.host(), repo.user()); - client.login()?; // login before starting - record_repository(&repo); let path = tools::required_string_param(¶m, "snapshot")?; @@ -520,7 +517,7 @@ fn restore( }))?; let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), subquery); - let result = client.get(&path)?; + let result = client.get(&path).wait()?; let list = result["data"].as_array().unwrap(); if list.len() == 0 { @@ -553,7 +550,7 @@ fn restore( let target = PathBuf::from(target); let writer = PxarDecodeWriter::new(&target, true)?; - client.download(&path, Box::new(writer))?; + client.download(&path, Box::new(writer)).wait()?; } else { bail!("unknown file extensions - unable to download '{}'", archive_name); } @@ -576,18 +573,19 @@ fn prune( param.as_object_mut().unwrap().remove("repository"); - let result = client.post_json(&path, param)?; + let result = client.post(&path, Some(param)).wait()?; record_repository(&repo); Ok(result) } +// like get, but simply ignore errors and return Null instead fn try_get(repo: &BackupRepository, url: &str) -> Value { - let mut client = HttpClient::new(repo.host(), repo.user()); + let client = HttpClient::new(repo.host(), repo.user()); - let mut resp = match client.try_get(url) { + let mut resp = match client.get(url).wait() { Ok(v) => v, _ => return Value::Null, }; @@ -858,6 +856,9 @@ fn main() { .insert("restore".to_owned(), restore_cmd_def.into()) .insert("snapshots".to_owned(), snapshots_cmd_def.into()); - run_cli_command(cmd_def.into()); + hyper::rt::run(futures::future::lazy(move || { + run_cli_command(cmd_def.into()); + Ok(()) + })); } diff --git a/src/client/http_client.rs b/src/client/http_client.rs index e13a0ea3..17ad0017 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -3,25 +3,32 @@ use failure::*; use http::Uri; use hyper::Body; use hyper::client::Client; -use hyper::rt::{self, Future}; use xdg::BaseDirectories; use chrono::Utc; use http::Request; +use http::header::HeaderValue; + +use futures::Future; use futures::stream::Stream; use serde_json::{json, Value}; use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; -use crate::tools::{self, tty}; +use crate::tools::{self, BroadcastFuture, tty}; + +#[derive(Clone)] +struct AuthInfo { + username: String, + ticket: String, + token: String, +} /// HTTP(S) API client pub struct HttpClient { - username: String, + client: Client>, server: String, - - ticket: Option, - token: Option + auth: BroadcastFuture, } fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> { @@ -104,15 +111,17 @@ fn load_ticket_info(server: &str, username: &str) -> Option<(String, String)> { impl HttpClient { pub fn new(server: &str, username: &str) -> Self { + let client = Self::build_client(); + let login = Self::credentials(client.clone(), server, username); + Self { + client, server: String::from(server), - username: String::from(username), - ticket: None, - token: None, + auth: BroadcastFuture::new(login), } } - fn get_password(&self) -> Result { + fn get_password(_username: &str) -> Result { use std::env::VarError::*; match std::env::var("PBS_PASSWORD") { Ok(p) => return Ok(p), @@ -130,308 +139,204 @@ impl HttpClient { bail!("no password input mechanism available"); } - fn build_client() -> Result>, Error> { + fn build_client() -> Client> { let mut builder = native_tls::TlsConnector::builder(); // FIXME: We need a CLI option for this! builder.danger_accept_invalid_certs(true); - let tlsconnector = builder.build()?; + let tlsconnector = builder.build().unwrap(); let mut httpc = hyper::client::HttpConnector::new(1); httpc.enforce_http(false); // we want https... let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector)); https.https_only(true); // force it! - let client = Client::builder().build::<_, Body>(https); - Ok(client) + Client::builder().build::<_, Body>(https) } - fn run_request( - request: Request, - ) -> Result { - let client = Self::build_client()?; + pub fn request(&self, mut req: Request) -> impl Future { - let (tx, rx) = std::sync::mpsc::channel(); + let login = self.auth.listen(); - let future = client - .request(request) - .map_err(Error::from) - .and_then(|resp| { + let client = self.client.clone(); - let status = resp.status(); + login.and_then(move |auth| { - resp.into_body().concat2().map_err(Error::from) - .and_then(move |data| { + let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); + req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); + req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap()); - let text = String::from_utf8(data.to_vec()).unwrap(); - if status.is_success() { - if text.len() > 0 { - let value: Value = serde_json::from_str(&text)?; - Ok(value) - } else { - Ok(Value::Null) - } - } else { - bail!("HTTP Error {}: {}", status, text); - } - }) - }) - .then(move |res| { - tx.send(res).unwrap(); - Ok(()) - }); + let request = Self::api_request(client, req); - // drop client, else client keeps connectioon open (keep-alive feature) - drop(client); - - rt::run(future); - - rx.recv().unwrap() + request + }) } - fn run_download( - request: Request, - mut output: Box, - ) -> Result<(), Error> { - let client = Self::build_client()?; - - let (tx, rx) = std::sync::mpsc::channel(); - - let future = client - .request(request) - .map_err(Error::from) - .and_then(move |resp| { - - let _status = resp.status(); // fixme: ?? + pub fn get(&self, path: &str) -> impl Future { - resp.into_body() - .map_err(Error::from) - .for_each(move |chunk| { - output.write_all(&chunk)?; - Ok(()) - }) - - }) - .then(move |res| { - tx.send(res).unwrap(); - Ok(()) - }); - - // drop client, else client keeps connectioon open (keep-alive feature) - drop(client); - - rt::run(future); - - rx.recv().unwrap() + let req = Self::request_builder(&self.server, "GET", path, None).unwrap(); + self.request(req) } - pub fn download(&mut self, path: &str, output: Box) -> Result<(), Error> { - - let path = path.trim_matches('/'); - let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?; - - let (ticket, _token) = self.login()?; - - let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string(); - - let request = Request::builder() - .method("GET") - .uri(url) - .header("User-Agent", "proxmox-backup-client/1.0") - .header("Cookie", format!("PBSAuthCookie={}", enc_ticket)) - .body(Body::empty())?; + pub fn delete(&mut self, path: &str) -> impl Future { - Self::run_download(request, output) + let req = Self::request_builder(&self.server, "DELETE", path, None).unwrap(); + self.request(req) } - pub fn get(&mut self, path: &str) -> Result { - - let path = path.trim_matches('/'); - let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?; - - let (ticket, _token) = self.login()?; - - let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string(); - - let request = Request::builder() - .method("GET") - .uri(url) - .header("User-Agent", "proxmox-backup-client/1.0") - .header("Cookie", format!("PBSAuthCookie={}", enc_ticket)) - .body(Body::empty())?; + pub fn post(&mut self, path: &str, data: Option) -> impl Future { - Self::run_request(request) + let req = Self::request_builder(&self.server, "POST", path, data).unwrap(); + self.request(req) } - /// like get(), but use existing credentials (never asks for password). - /// this simply fails when there is no ticket - pub fn try_get(&mut self, path: &str) -> Result { - - let path = path.trim_matches('/'); - let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?; - - let mut credentials = None; - - if let Some(ref ticket) = self.ticket { - if let Some(ref token) = self.token { - credentials = Some((ticket.clone(), token.clone())); - } - } - - if credentials == None { - if let Some((ticket, token)) = load_ticket_info(&self.server, &self.username) { - credentials = Some((ticket.clone(), token.clone())); - } - } - - if credentials == None { - bail!("unable to get credentials"); - } + pub fn download(&mut self, path: &str, mut output: Box) -> impl Future { - let (ticket, _token) = credentials.unwrap(); + let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap(); - let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string(); + let login = self.auth.listen(); - let request = Request::builder() - .method("GET") - .uri(url) - .header("User-Agent", "proxmox-backup-client/1.0") - .header("Cookie", format!("PBSAuthCookie={}", enc_ticket)) - .body(Body::empty())?; + let client = self.client.clone(); - Self::run_request(request) - } + login.and_then(move |auth| { - pub fn delete(&mut self, path: &str) -> Result { + let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); + req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); - let path = path.trim_matches('/'); - let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?; + client.request(req) + .map_err(Error::from) + .and_then(|resp| { - let (ticket, token) = self.login()?; + let _status = resp.status(); // fixme: ?? - let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string(); + resp.into_body() + .map_err(Error::from) + .for_each(move |chunk| { + output.write_all(&chunk)?; + Ok(()) + }) - let request = Request::builder() - .method("DELETE") - .uri(url) - .header("User-Agent", "proxmox-backup-client/1.0") - .header("Cookie", format!("PBSAuthCookie={}", enc_ticket)) - .header("CSRFPreventionToken", token) - .body(Body::empty())?; - - Self::run_request(request) + }) + }) } - pub fn post(&mut self, path: &str) -> Result { + pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> impl Future { let path = path.trim_matches('/'); - let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?; - - let (ticket, token) = self.login()?; + let url: Uri = format!("https://{}:8007/{}", &self.server, path).parse().unwrap(); - let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string(); - - let request = Request::builder() + let req = Request::builder() .method("POST") .uri(url) .header("User-Agent", "proxmox-backup-client/1.0") - .header("Cookie", format!("PBSAuthCookie={}", enc_ticket)) - .header("CSRFPreventionToken", token) - .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded") - .body(Body::empty())?; + .header("Content-Type", content_type) + .body(body).unwrap(); - Self::run_request(request) + self.request(req) } - pub fn post_json(&mut self, path: &str, data: Value) -> Result { - - let path = path.trim_matches('/'); - let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?; + fn credentials( + client: Client>, + server: &str, + username: &str, + ) -> Box + Send> { - let (ticket, token) = self.login()?; + let server = server.to_owned(); + let server2 = server.to_owned(); + let username = username.to_owned(); - let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string(); + let create_request = futures::future::lazy(move || { - let request = Request::builder() - .method("POST") - .uri(url) - .header("User-Agent", "proxmox-backup-client/1.0") - .header("Cookie", format!("PBSAuthCookie={}", enc_ticket)) - .header("CSRFPreventionToken", token) - .header(hyper::header::CONTENT_TYPE, "application/json") - .body(Body::from(data.to_string()))?; + let data = if let Some((ticket, _token)) = load_ticket_info(&server, &username) { + json!({ "username": username, "password": ticket }) + } else { - Self::run_request(request) - } + let password = match Self::get_password(&username) { + Ok(p) => p, + Err(err) => { + return futures::future::Either::A(futures::future::err(err)); + } + }; - fn try_login(&mut self, password: &str) -> Result<(String, String), Error> { + json!({ "username": username, "password": password }) + }; - let url: Uri = format!("https://{}:8007/{}", self.server, "/api2/json/access/ticket").parse()?; + let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap(); - let query = url::form_urlencoded::Serializer::new(String::new()) - .append_pair("username", &self.username) - .append_pair("password", &password) - .finish(); + futures::future::Either::B(Self::api_request(client, req)) + }); - let request = Request::builder() - .method("POST") - .uri(url) - .header("User-Agent", "proxmox-backup-client/1.0") - .header("Content-Type", "application/x-www-form-urlencoded") - .body(Body::from(query))?; + let login_future = create_request + .and_then(move |cred| { + let auth = AuthInfo { + username: cred["data"]["username"].as_str().unwrap().to_owned(), + ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(), + token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(), + }; - let auth_res = Self::run_request(request)?; + let _ = store_ticket_info(&server2, &auth.username, &auth.ticket, &auth.token); - let ticket = match auth_res["data"]["ticket"].as_str() { - Some(t) => t, - None => bail!("got unexpected respose for login request."), - }; - let token = match auth_res["data"]["CSRFPreventionToken"].as_str() { - Some(t) => t, - None => bail!("got unexpected respose for login request."), - }; + Ok(auth) + }); - Ok((ticket.to_owned(), token.to_owned())) + Box::new(login_future) } - pub fn login(&mut self) -> Result<(String, String), Error> { - - if let Some(ref ticket) = self.ticket { - if let Some(ref token) = self.token { - return Ok((ticket.clone(), token.clone())); - } - } + fn api_request( + client: Client>, + req: Request + ) -> impl Future { - if let Some((ticket, _token)) = load_ticket_info(&self.server, &self.username) { - if let Ok((ticket, token)) = self.try_login(&ticket) { - let _ = store_ticket_info(&self.server, &self.username, &ticket, &token); - return Ok((ticket.to_owned(), token.to_owned())) - } - } + client.request(req) + .map_err(Error::from) + .and_then(|resp| { - let password = self.get_password()?; - let (ticket, token) = self.try_login(&password)?; + let status = resp.status(); - let _ = store_ticket_info(&self.server, &self.username, &ticket, &token); + resp + .into_body() + .concat2() + .map_err(Error::from) + .and_then(move |data| { - Ok((ticket.to_owned(), token.to_owned())) + let text = String::from_utf8(data.to_vec()).unwrap(); + if status.is_success() { + if text.len() > 0 { + let value: Value = serde_json::from_str(&text)?; + Ok(value) + } else { + Ok(Value::Null) + } + } else { + bail!("HTTP Error {}: {}", status, text); + } + }) + }) } - pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> Result { - + pub fn request_builder(server: &str, method: &str, path: &str, data: Option) -> Result, Error> { let path = path.trim_matches('/'); - let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?; - - let (ticket, token) = self.login()?; + let url: Uri = format!("https://{}:8007/{}", server, path).parse()?; + + if let Some(data) = data { + if method == "POST" { + let request = Request::builder() + .method(method) + .uri(url) + .header("User-Agent", "proxmox-backup-client/1.0") + .header(hyper::header::CONTENT_TYPE, "application/json") + .body(Body::from(data.to_string()))?; + return Ok(request); + } else { + unimplemented!(); + } - let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string(); + } let request = Request::builder() - .method("POST") + .method(method) .uri(url) .header("User-Agent", "proxmox-backup-client/1.0") - .header("Cookie", format!("PBSAuthCookie={}", enc_ticket)) - .header("CSRFPreventionToken", token) - .header("Content-Type", content_type) - .body(body)?; + .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded") + .body(Body::empty())?; - Self::run_request(request) + Ok(request) } } -- 2.39.5