use xdg::BaseDirectories;
use lazy_static::lazy_static;
+use futures::*;
lazy_static! {
static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:pxar|raw)):(.+)$").unwrap();
let body = Body::wrap_stream(stream);
- client.upload("application/x-proxmox-backup-pxar", body, &path)?;
+ client.upload("application/x-proxmox-backup-pxar", body, &path).wait()?;
Ok(())
}
let repo_url = tools::required_string_param(¶m, "repository")?;
let repo: BackupRepository = repo_url.parse()?;
- let mut client = HttpClient::new(repo.host(), repo.user());
+ let client = HttpClient::new(repo.host(), repo.user());
let path = format!("api2/json/admin/datastore/{}/groups", repo.store());
- let mut result = client.get(&path)?;
+ let mut result = client.get(&path).wait()?;
record_repository(&repo);
"backup-id": group.backup_id(),
}))?;
- let mut client = HttpClient::new(repo.host(), repo.user());
+ let client = HttpClient::new(repo.host(), repo.user());
let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query);
// fixme: params
- let result = client.get(&path)?;
+ let result = client.get(&path).wait()?;
record_repository(&repo);
let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query);
- let result = client.delete(&path)?;
+ let result = client.delete(&path).wait()?;
record_repository(&repo);
let path = format!("api2/json/admin/datastore/{}/gc", repo.store());
- let result = client.post(&path)?;
+ let result = client.post(&path, None).wait()?;
record_repository(&repo);
let mut client = HttpClient::new(repo.host(), repo.user());
- client.login()?; // login before starting backup
-
record_repository(&repo);
println!("Starting backup");
let mut client = HttpClient::new(repo.host(), repo.user());
- client.login()?; // login before starting
-
record_repository(&repo);
let path = tools::required_string_param(¶m, "snapshot")?;
}))?;
let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), subquery);
- let result = client.get(&path)?;
+ let result = client.get(&path).wait()?;
let list = result["data"].as_array().unwrap();
if list.len() == 0 {
let target = PathBuf::from(target);
let writer = PxarDecodeWriter::new(&target, true)?;
- client.download(&path, Box::new(writer))?;
+ client.download(&path, Box::new(writer)).wait()?;
} else {
bail!("unknown file extensions - unable to download '{}'", archive_name);
}
param.as_object_mut().unwrap().remove("repository");
- let result = client.post_json(&path, param)?;
+ let result = client.post(&path, Some(param)).wait()?;
record_repository(&repo);
Ok(result)
}
+// like get, but simply ignore errors and return Null instead
fn try_get(repo: &BackupRepository, url: &str) -> Value {
- let mut client = HttpClient::new(repo.host(), repo.user());
+ let client = HttpClient::new(repo.host(), repo.user());
- let mut resp = match client.try_get(url) {
+ let mut resp = match client.get(url).wait() {
Ok(v) => v,
_ => return Value::Null,
};
.insert("restore".to_owned(), restore_cmd_def.into())
.insert("snapshots".to_owned(), snapshots_cmd_def.into());
- run_cli_command(cmd_def.into());
+ hyper::rt::run(futures::future::lazy(move || {
+ run_cli_command(cmd_def.into());
+ Ok(())
+ }));
}
use http::Uri;
use hyper::Body;
use hyper::client::Client;
-use hyper::rt::{self, Future};
use xdg::BaseDirectories;
use chrono::Utc;
use http::Request;
+use http::header::HeaderValue;
+
+use futures::Future;
use futures::stream::Stream;
use serde_json::{json, Value};
use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
-use crate::tools::{self, tty};
+use crate::tools::{self, BroadcastFuture, tty};
+
+#[derive(Clone)]
+struct AuthInfo {
+ username: String,
+ ticket: String,
+ token: String,
+}
/// HTTP(S) API client
pub struct HttpClient {
- username: String,
+ client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
server: String,
-
- ticket: Option<String>,
- token: Option<String>
+ auth: BroadcastFuture<AuthInfo>,
}
fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
impl HttpClient {
pub fn new(server: &str, username: &str) -> Self {
+ let client = Self::build_client();
+ let login = Self::credentials(client.clone(), server, username);
+
Self {
+ client,
server: String::from(server),
- username: String::from(username),
- ticket: None,
- token: None,
+ auth: BroadcastFuture::new(login),
}
}
- fn get_password(&self) -> Result<String, Error> {
+ fn get_password(_username: &str) -> Result<String, Error> {
use std::env::VarError::*;
match std::env::var("PBS_PASSWORD") {
Ok(p) => return Ok(p),
bail!("no password input mechanism available");
}
- fn build_client() -> Result<Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>, Error> {
+ fn build_client() -> Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>> {
let mut builder = native_tls::TlsConnector::builder();
// FIXME: We need a CLI option for this!
builder.danger_accept_invalid_certs(true);
- let tlsconnector = builder.build()?;
+ let tlsconnector = builder.build().unwrap();
let mut httpc = hyper::client::HttpConnector::new(1);
httpc.enforce_http(false); // we want https...
let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
https.https_only(true); // force it!
- let client = Client::builder().build::<_, Body>(https);
- Ok(client)
+ Client::builder().build::<_, Body>(https)
}
- fn run_request(
- request: Request<Body>,
- ) -> Result<Value, Error> {
- let client = Self::build_client()?;
+ pub fn request(&self, mut req: Request<Body>) -> impl Future<Item=Value, Error=Error> {
- let (tx, rx) = std::sync::mpsc::channel();
+ let login = self.auth.listen();
- let future = client
- .request(request)
- .map_err(Error::from)
- .and_then(|resp| {
+ let client = self.client.clone();
- let status = resp.status();
+ login.and_then(move |auth| {
- resp.into_body().concat2().map_err(Error::from)
- .and_then(move |data| {
+ let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+ req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
+ req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
- let text = String::from_utf8(data.to_vec()).unwrap();
- if status.is_success() {
- if text.len() > 0 {
- let value: Value = serde_json::from_str(&text)?;
- Ok(value)
- } else {
- Ok(Value::Null)
- }
- } else {
- bail!("HTTP Error {}: {}", status, text);
- }
- })
- })
- .then(move |res| {
- tx.send(res).unwrap();
- Ok(())
- });
+ let request = Self::api_request(client, req);
- // drop client, else client keeps connectioon open (keep-alive feature)
- drop(client);
-
- rt::run(future);
-
- rx.recv().unwrap()
+ request
+ })
}
- fn run_download(
- request: Request<Body>,
- mut output: Box<dyn std::io::Write + Send>,
- ) -> Result<(), Error> {
- let client = Self::build_client()?;
-
- let (tx, rx) = std::sync::mpsc::channel();
-
- let future = client
- .request(request)
- .map_err(Error::from)
- .and_then(move |resp| {
-
- let _status = resp.status(); // fixme: ??
+ pub fn get(&self, path: &str) -> impl Future<Item=Value, Error=Error> {
- resp.into_body()
- .map_err(Error::from)
- .for_each(move |chunk| {
- output.write_all(&chunk)?;
- Ok(())
- })
-
- })
- .then(move |res| {
- tx.send(res).unwrap();
- Ok(())
- });
-
- // drop client, else client keeps connectioon open (keep-alive feature)
- drop(client);
-
- rt::run(future);
-
- rx.recv().unwrap()
+ let req = Self::request_builder(&self.server, "GET", path, None).unwrap();
+ self.request(req)
}
- pub fn download(&mut self, path: &str, output: Box<dyn std::io::Write + Send>) -> Result<(), Error> {
-
- let path = path.trim_matches('/');
- let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
- let (ticket, _token) = self.login()?;
-
- let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
-
- let request = Request::builder()
- .method("GET")
- .uri(url)
- .header("User-Agent", "proxmox-backup-client/1.0")
- .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
- .body(Body::empty())?;
+ pub fn delete(&mut self, path: &str) -> impl Future<Item=Value, Error=Error> {
- Self::run_download(request, output)
+ let req = Self::request_builder(&self.server, "DELETE", path, None).unwrap();
+ self.request(req)
}
- pub fn get(&mut self, path: &str) -> Result<Value, Error> {
-
- let path = path.trim_matches('/');
- let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
- let (ticket, _token) = self.login()?;
-
- let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
-
- let request = Request::builder()
- .method("GET")
- .uri(url)
- .header("User-Agent", "proxmox-backup-client/1.0")
- .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
- .body(Body::empty())?;
+ pub fn post(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
- Self::run_request(request)
+ let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
+ self.request(req)
}
- /// like get(), but use existing credentials (never asks for password).
- /// this simply fails when there is no ticket
- pub fn try_get(&mut self, path: &str) -> Result<Value, Error> {
-
- let path = path.trim_matches('/');
- let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
- let mut credentials = None;
-
- if let Some(ref ticket) = self.ticket {
- if let Some(ref token) = self.token {
- credentials = Some((ticket.clone(), token.clone()));
- }
- }
-
- if credentials == None {
- if let Some((ticket, token)) = load_ticket_info(&self.server, &self.username) {
- credentials = Some((ticket.clone(), token.clone()));
- }
- }
-
- if credentials == None {
- bail!("unable to get credentials");
- }
+ pub fn download(&mut self, path: &str, mut output: Box<dyn std::io::Write + Send>) -> impl Future<Item=(), Error=Error> {
- let (ticket, _token) = credentials.unwrap();
+ let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
- let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
+ let login = self.auth.listen();
- let request = Request::builder()
- .method("GET")
- .uri(url)
- .header("User-Agent", "proxmox-backup-client/1.0")
- .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
- .body(Body::empty())?;
+ let client = self.client.clone();
- Self::run_request(request)
- }
+ login.and_then(move |auth| {
- pub fn delete(&mut self, path: &str) -> Result<Value, Error> {
+ let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+ req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
- let path = path.trim_matches('/');
- let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
+ client.request(req)
+ .map_err(Error::from)
+ .and_then(|resp| {
- let (ticket, token) = self.login()?;
+ let _status = resp.status(); // fixme: ??
- let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
+ resp.into_body()
+ .map_err(Error::from)
+ .for_each(move |chunk| {
+ output.write_all(&chunk)?;
+ Ok(())
+ })
- let request = Request::builder()
- .method("DELETE")
- .uri(url)
- .header("User-Agent", "proxmox-backup-client/1.0")
- .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
- .header("CSRFPreventionToken", token)
- .body(Body::empty())?;
-
- Self::run_request(request)
+ })
+ })
}
- pub fn post(&mut self, path: &str) -> Result<Value, Error> {
+ pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> impl Future<Item=Value, Error=Error> {
let path = path.trim_matches('/');
- let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
- let (ticket, token) = self.login()?;
+ let url: Uri = format!("https://{}:8007/{}", &self.server, path).parse().unwrap();
- let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
-
- let request = Request::builder()
+ let req = Request::builder()
.method("POST")
.uri(url)
.header("User-Agent", "proxmox-backup-client/1.0")
- .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
- .header("CSRFPreventionToken", token)
- .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
- .body(Body::empty())?;
+ .header("Content-Type", content_type)
+ .body(body).unwrap();
- Self::run_request(request)
+ self.request(req)
}
- pub fn post_json(&mut self, path: &str, data: Value) -> Result<Value, Error> {
-
- let path = path.trim_matches('/');
- let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
+ fn credentials(
+ client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
+ server: &str,
+ username: &str,
+ ) -> Box<Future<Item=AuthInfo, Error=Error> + Send> {
- let (ticket, token) = self.login()?;
+ let server = server.to_owned();
+ let server2 = server.to_owned();
+ let username = username.to_owned();
- let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
+ let create_request = futures::future::lazy(move || {
- let request = Request::builder()
- .method("POST")
- .uri(url)
- .header("User-Agent", "proxmox-backup-client/1.0")
- .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
- .header("CSRFPreventionToken", token)
- .header(hyper::header::CONTENT_TYPE, "application/json")
- .body(Body::from(data.to_string()))?;
+ let data = if let Some((ticket, _token)) = load_ticket_info(&server, &username) {
+ json!({ "username": username, "password": ticket })
+ } else {
- Self::run_request(request)
- }
+ let password = match Self::get_password(&username) {
+ Ok(p) => p,
+ Err(err) => {
+ return futures::future::Either::A(futures::future::err(err));
+ }
+ };
- fn try_login(&mut self, password: &str) -> Result<(String, String), Error> {
+ json!({ "username": username, "password": password })
+ };
- let url: Uri = format!("https://{}:8007/{}", self.server, "/api2/json/access/ticket").parse()?;
+ let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
- let query = url::form_urlencoded::Serializer::new(String::new())
- .append_pair("username", &self.username)
- .append_pair("password", &password)
- .finish();
+ futures::future::Either::B(Self::api_request(client, req))
+ });
- let request = Request::builder()
- .method("POST")
- .uri(url)
- .header("User-Agent", "proxmox-backup-client/1.0")
- .header("Content-Type", "application/x-www-form-urlencoded")
- .body(Body::from(query))?;
+ let login_future = create_request
+ .and_then(move |cred| {
+ let auth = AuthInfo {
+ username: cred["data"]["username"].as_str().unwrap().to_owned(),
+ ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
+ token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
+ };
- let auth_res = Self::run_request(request)?;
+ let _ = store_ticket_info(&server2, &auth.username, &auth.ticket, &auth.token);
- let ticket = match auth_res["data"]["ticket"].as_str() {
- Some(t) => t,
- None => bail!("got unexpected respose for login request."),
- };
- let token = match auth_res["data"]["CSRFPreventionToken"].as_str() {
- Some(t) => t,
- None => bail!("got unexpected respose for login request."),
- };
+ Ok(auth)
+ });
- Ok((ticket.to_owned(), token.to_owned()))
+ Box::new(login_future)
}
- pub fn login(&mut self) -> Result<(String, String), Error> {
-
- if let Some(ref ticket) = self.ticket {
- if let Some(ref token) = self.token {
- return Ok((ticket.clone(), token.clone()));
- }
- }
+ fn api_request(
+ client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
+ req: Request<Body>
+ ) -> impl Future<Item=Value, Error=Error> {
- if let Some((ticket, _token)) = load_ticket_info(&self.server, &self.username) {
- if let Ok((ticket, token)) = self.try_login(&ticket) {
- let _ = store_ticket_info(&self.server, &self.username, &ticket, &token);
- return Ok((ticket.to_owned(), token.to_owned()))
- }
- }
+ client.request(req)
+ .map_err(Error::from)
+ .and_then(|resp| {
- let password = self.get_password()?;
- let (ticket, token) = self.try_login(&password)?;
+ let status = resp.status();
- let _ = store_ticket_info(&self.server, &self.username, &ticket, &token);
+ resp
+ .into_body()
+ .concat2()
+ .map_err(Error::from)
+ .and_then(move |data| {
- Ok((ticket.to_owned(), token.to_owned()))
+ let text = String::from_utf8(data.to_vec()).unwrap();
+ if status.is_success() {
+ if text.len() > 0 {
+ let value: Value = serde_json::from_str(&text)?;
+ Ok(value)
+ } else {
+ Ok(Value::Null)
+ }
+ } else {
+ bail!("HTTP Error {}: {}", status, text);
+ }
+ })
+ })
}
- pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> Result<Value, Error> {
-
+ pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
let path = path.trim_matches('/');
- let url: Uri = format!("https://{}:8007/{}", self.server, path).parse()?;
-
- let (ticket, token) = self.login()?;
+ let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
+
+ if let Some(data) = data {
+ if method == "POST" {
+ let request = Request::builder()
+ .method(method)
+ .uri(url)
+ .header("User-Agent", "proxmox-backup-client/1.0")
+ .header(hyper::header::CONTENT_TYPE, "application/json")
+ .body(Body::from(data.to_string()))?;
+ return Ok(request);
+ } else {
+ unimplemented!();
+ }
- let enc_ticket = percent_encode(ticket.as_bytes(), DEFAULT_ENCODE_SET).to_string();
+ }
let request = Request::builder()
- .method("POST")
+ .method(method)
.uri(url)
.header("User-Agent", "proxmox-backup-client/1.0")
- .header("Cookie", format!("PBSAuthCookie={}", enc_ticket))
- .header("CSRFPreventionToken", token)
- .header("Content-Type", content_type)
- .body(body)?;
+ .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
+ .body(Body::empty())?;
- Self::run_request(request)
+ Ok(request)
}
}