use hyper::Body;
use hyper::client::Client;
use xdg::BaseDirectories;
-use chrono::{DateTime, Local, Utc};
+use chrono::{DateTime, Utc};
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::io::Write;
use serde_json::{json, Value};
use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
+use proxmox::tools::{
+ digest_to_hex,
+ fs::{file_get_json, file_set_contents},
+};
+
use crate::tools::{self, BroadcastFuture, tty};
use crate::tools::futures::{cancellable, Canceller};
use super::pipe_to_stream::*;
let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
- let mut data = tools::file_get_json(&path, Some(json!({})))?;
+ let mut data = file_get_json(&path, Some(json!({})))?;
let now = Utc::now().timestamp();
}
}
- tools::file_set_contents(path, new_data.to_string().as_bytes(), Some(mode))?;
+ file_set_contents(path, new_data.to_string().as_bytes(), Some(mode))?;
Ok(())
}
_ => return None,
};
- let data = match tools::file_get_json(&path, None) {
+ let data = match file_get_json(&path, None) {
Ok(v) => v,
_ => return None,
};
let mut httpc = hyper::client::HttpConnector::new(1);
httpc.set_nodelay(true); // important for h2 download performance!
- httpc.set_recv_buf_size(Some(1024*1024)); //important for h2 download performance!
+ httpc.set_recv_buffer_size(Some(1024*1024)); // important for h2 download performance!
httpc.enforce_http(false); // we want https...
let https = hyper_openssl::HttpsConnector::with_connector(httpc, ssl_connector_builder).unwrap();
})
}
- pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> impl Future<Item=Value, Error=Error> {
+ pub fn upload(
+ &mut self,
+ content_type: &str,
+ body: Body,
+ path: &str,
+ data: Option<Value>,
+ ) -> impl Future<Item=Value, Error=Error> {
let path = path.trim_matches('/');
- let url: Uri = format!("https://{}:8007/{}", &self.server, path).parse().unwrap();
+ let mut url = format!("https://{}:8007/{}", &self.server, path);
+
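+ // append the optional parameters as a URL query string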
+ if let Some(data) = data {
+ let query = tools::json_object_to_query(data).unwrap();
+ url.push('?');
+ url.push_str(&query);
+ }
+
+ let url: Uri = url.parse().unwrap();
let req = Request::builder()
.method("POST")
datastore: &str,
backup_type: &str,
backup_id: &str,
+ backup_time: DateTime<Utc>,
debug: bool,
) -> impl Future<Item=Arc<BackupClient>, Error=Error> {
- let param = json!({"backup-type": backup_type, "backup-id": backup_id, "store": datastore, "debug": debug});
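+ // pass the backup start time as a Unix epoch timestamp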
+ let param = json!({
+ "backup-type": backup_type,
+ "backup-id": backup_id,
+ "backup-time": backup_time.timestamp(),
+ "store": datastore,
+ "debug": debug
+ });
+
let req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap();
self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
datastore: &str,
backup_type: &str,
backup_id: &str,
- backup_time: DateTime<Local>,
+ backup_time: DateTime<Utc>,
debug: bool,
) -> impl Future<Item=Arc<BackupReader>, Error=Error> {
output: W,
) -> impl Future<Item=W, Error=Error> {
let path = "chunk";
- let param = json!({ "digest": proxmox::tools::digest_to_hex(digest) });
+ let param = json!({ "digest": digest_to_hex(digest) });
self.h2.download(path, Some(param), output)
}
}
}
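+/// Upload statistics returned by the various upload methods.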
+pub struct BackupStats {
+ pub size: u64,
+}
+
impl BackupClient {
pub fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
}
pub fn finish(self: Arc<Self>) -> impl Future<Item=(), Error=Error> {
- let canceller = self.canceller.clone();
- self.h2.clone().post("finish", None).map(move |_| {
- canceller.cancel();
- ()
- })
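+ // cancel the h2 connection once the server has acknowledged the finish request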
+ self.h2.clone()
+ .post("finish", None)
+ .map(move |_| {
+ self.canceller.cancel();
+ })
}
pub fn force_close(self) {
file_name: &str,
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
- ) -> impl Future<Item=(), Error=Error> {
+ sign_only: bool,
+ ) -> impl Future<Item=BackupStats, Error=Error> {
let h2 = self.h2.clone();
let file_name = file_name.to_owned();
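+ // remember the unencoded data size for the returned statistics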
+ let size = data.len() as u64;
futures::future::ok(())
.and_then(move |_| {
let blob = if let Some(ref crypt_config) = crypt_config {
- DataBlob::encode(&data, Some(crypt_config), compress)?
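+ // with sign_only, the blob is stored unencrypted but signed with the crypt config key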
+ if sign_only {
+ DataBlob::create_signed(&data, crypt_config, compress)?
+ } else {
+ DataBlob::encode(&data, Some(crypt_config), compress)?
+ }
} else {
DataBlob::encode(&data, None, compress)?
};
.and_then(move |raw_data| {
let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
h2.upload("blob", Some(param), raw_data)
- .map(|_| {})
+ .map(move |_| {
+ BackupStats { size }
+ })
})
}
file_name: &str,
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
- ) -> impl Future<Item=(), Error=Error> {
+ ) -> impl Future<Item=BackupStats, Error=Error> {
let h2 = self.h2.clone();
let file_name = file_name.to_owned();
DataBlob::encode(&contents, None, compress)?
};
let raw_data = blob.into_inner();
- Ok(raw_data)
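+ // keep the original content length for the upload statistics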
+ Ok((raw_data, contents.len()))
})
- .and_then(move |raw_data| {
+ .and_then(move |(raw_data, size)| {
let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
h2.upload("blob", Some(param), raw_data)
- .map(|_| {})
+ .map(move |_| {
+ BackupStats { size: size as u64 }
+ })
})
});
prefix: &str,
fixed_size: Option<u64>,
crypt_config: Option<Arc<CryptConfig>>,
- ) -> impl Future<Item=(), Error=Error> {
+ ) -> impl Future<Item=BackupStats, Error=Error> {
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
.and_then(move |res| {
let wid = res.as_u64().unwrap();
Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone(), crypt_config)
- .and_then(move |(chunk_count, size, _speed)| {
+ .and_then(move |(chunk_count, size, _speed)| {
let param = json!({
"wid": wid ,
"chunk-count": chunk_count,
"size": size,
});
h2_4.post(&close_path, Some(param))
- })
- .map(|_| ())
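+ // report the total number of uploaded bytes back to the caller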
+ .map(move |_| {
+ BackupStats { size: size as u64 }
+ })
+ })
})
}
let mut offset_list = vec![];
for (offset, digest) in chunk_list {
//println!("append chunk {} (offset {})", proxmox::tools::digest_to_hex(&digest), offset);
- digest_list.push(proxmox::tools::digest_to_hex(&digest));
+ digest_list.push(digest_to_hex(&digest));
offset_list.push(offset);
}
println!("append chunks list len ({})", digest_list.len());
DigestListDecoder::new(body.map_err(Error::from))
.for_each(move |chunk| {
let _ = release_capacity.release_capacity(chunk.len());
- println!("GOT DOWNLOAD {}", proxmox::tools::digest_to_hex(&chunk));
+ println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
known_chunks.lock().unwrap().insert(chunk);
Ok(())
})
if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
let offset = chunk_info.offset;
let digest = *chunk_info.chunk.digest();
- let digest_str = proxmox::tools::digest_to_hex(&digest);
+ let digest_str = digest_to_hex(&digest);
let upload_queue = upload_queue.clone();
println!("upload new chunk {} ({} bytes, offset {})", digest_str,
}
})
.then(move |result| {
- println!("RESULT {:?}", result);
+ //println!("RESULT {:?}", result);
upload_result.map_err(Error::from).and_then(|upload1_result| {
Ok(upload1_result.and(result))
})