use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
-use futures::*;
-use futures::stream::Stream;
use futures::future::AbortHandle;
+use futures::stream::Stream;
+use futures::*;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use proxmox::tools::digest_to_hex;
-use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
+use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
use crate::backup::*;
use crate::tools::format::HumanByte;
-use super::{HttpClient, H2Client};
+use super::{H2Client, HttpClient};
pub struct BackupWriter {
h2: H2Client,
}
impl Drop for BackupWriter {
-
fn drop(&mut self) {
self.abort.abort();
}
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
impl BackupWriter {
-
- fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
- Arc::new(Self { h2, abort, crypt_config, verbose })
+ fn new(
+ h2: H2Client,
+ abort: AbortHandle,
+ crypt_config: Option<Arc<CryptConfig>>,
+ verbose: bool,
+ ) -> Arc<Self> {
+ Arc::new(Self {
+ h2,
+ abort,
+ crypt_config,
+ verbose,
+ })
}
// FIXME: extract into (flattened) parameter struct?
backup_id: &str,
backup_time: i64,
debug: bool,
- benchmark: bool
+ benchmark: bool,
) -> Result<Arc<BackupWriter>, Error> {
-
let param = json!({
"backup-type": backup_type,
"backup-id": backup_id,
});
let req = HttpClient::request_builder(
- client.server(), client.port(), "GET", "/api2/json/backup", Some(param)).unwrap();
-
- let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
+ client.server(),
+ client.port(),
+ "GET",
+ "/api2/json/backup",
+ Some(param),
+ )
+ .unwrap();
+
+ let (h2, abort) = client
+ .start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
+ .await?;
Ok(BackupWriter::new(h2, abort, crypt_config, debug))
}
- pub async fn get(
- &self,
- path: &str,
- param: Option<Value>,
- ) -> Result<Value, Error> {
+ pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
self.h2.get(path, param).await
}
- pub async fn put(
- &self,
- path: &str,
- param: Option<Value>,
- ) -> Result<Value, Error> {
+ pub async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
self.h2.put(path, param).await
}
- pub async fn post(
- &self,
- path: &str,
- param: Option<Value>,
- ) -> Result<Value, Error> {
+ pub async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
self.h2.post(path, param).await
}
content_type: &str,
data: Vec<u8>,
) -> Result<Value, Error> {
- self.h2.upload("POST", path, param, content_type, data).await
+ self.h2
+ .upload("POST", path, param, content_type, data)
+ .await
}
pub async fn send_upload_request(
content_type: &str,
data: Vec<u8>,
) -> Result<h2::client::ResponseFuture, Error> {
-
- let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
- let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
+ let request =
+ H2Client::request_builder("localhost", method, path, param, Some(content_type))
+ .unwrap();
+ let response_future = self
+ .h2
+ .send_request(request, Some(bytes::Bytes::from(data.clone())))
+ .await?;
Ok(response_future)
}
&self,
mut reader: R,
file_name: &str,
- ) -> Result<BackupStats, Error> {
+ ) -> Result<BackupStats, Error> {
let mut raw_data = Vec::new();
// fixme: avoid loading into memory
reader.read_to_end(&mut raw_data)?;
let csum = openssl::sha::sha256(&raw_data);
let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
let size = raw_data.len() as u64;
- let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
+ let _value = self
+ .h2
+ .upload(
+ "POST",
+ "blob",
+ Some(param),
+ "application/octet-stream",
+ raw_data,
+ )
+ .await?;
Ok(BackupStats { size, csum })
}
options: UploadOptions,
) -> Result<BackupStats, Error> {
let blob = match (options.encrypt, &self.crypt_config) {
- (false, _) => DataBlob::encode(&data, None, options.compress)?,
- (true, None) => bail!("requested encryption without a crypt config"),
- (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), options.compress)?,
+ (false, _) => DataBlob::encode(&data, None, options.compress)?,
+ (true, None) => bail!("requested encryption without a crypt config"),
+ (true, Some(crypt_config)) => {
+ DataBlob::encode(&data, Some(crypt_config), options.compress)?
+ }
};
let raw_data = blob.into_inner();
let csum = openssl::sha::sha256(&raw_data);
let param = json!({"encoded-size": size, "file-name": file_name });
- let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
+ let _value = self
+ .h2
+ .upload(
+ "POST",
+ "blob",
+ Some(param),
+ "application/octet-stream",
+ raw_data,
+ )
+ .await?;
Ok(BackupStats { size, csum })
}
file_name: &str,
options: UploadOptions,
) -> Result<BackupStats, Error> {
-
let src_path = src_path.as_ref();
let mut file = tokio::fs::File::open(src_path)
.await
.map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
- self.upload_blob_from_data(contents, file_name, options).await
+ self.upload_blob_from_data(contents, file_name, options)
+ .await
}
pub async fn upload_stream(
// try, but ignore errors
match archive_type(archive_name) {
Ok(ArchiveType::FixedIndex) => {
- let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
+ let _ = self
+ .download_previous_fixed_index(
+ archive_name,
+ &manifest,
+ known_chunks.clone(),
+ )
+ .await;
}
Ok(ArchiveType::DynamicIndex) => {
- let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
+ let _ = self
+ .download_previous_dynamic_index(
+ archive_name,
+ &manifest,
+ known_chunks.clone(),
+ )
+ .await;
}
_ => { /* do nothing */ }
}
}
- let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
+ let wid = self
+ .h2
+ .post(&index_path, Some(param))
+ .await?
+ .as_u64()
+ .unwrap();
let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
Self::upload_chunk_info_stream(
stream,
&prefix,
known_chunks.clone(),
- if options.encrypt { self.crypt_config.clone() } else { None },
+ if options.encrypt {
+ self.crypt_config.clone()
+ } else {
+ None
+ },
options.compress,
self.verbose,
)
crate::tools::format::strip_server_file_extension(archive_name)
};
if archive_name != CATALOG_NAME {
- let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
+ let speed: HumanByte =
+ ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
let uploaded: HumanByte = uploaded.into();
- println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
+ println!(
+ "{}: had to upload {} of {} in {:.2}s, average speed {}/s).",
+ archive,
+ uploaded,
+ vsize_h,
+ duration.as_secs_f64(),
+ speed
+ );
} else {
println!("Uploaded backup catalog ({})", vsize_h);
}
- if size_reused > 0 && size > 1024*1024 {
+ if size_reused > 0 && size > 1024 * 1024 {
let reused_percent = size_reused as f64 * 100. / size as f64;
let reused: HumanByte = size_reused.into();
- println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
+ println!(
+ "{}: backup was done incrementally, reused {} ({:.1}%)",
+ archive, reused, reused_percent
+ );
}
if self.verbose && chunk_count > 0 {
- println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
- println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
- println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
+ println!(
+ "{}: Reused {} from {} chunks.",
+ archive, chunk_reused, chunk_count
+ );
+ println!(
+ "{}: Average chunk size was {}.",
+ archive,
+ HumanByte::from(size / chunk_count)
+ );
+ println!(
+ "{}: Average time per request: {} microseconds.",
+ archive,
+ (duration.as_micros()) / (chunk_count as u128)
+ );
}
let param = json!({
})
}
- fn response_queue(verbose: bool) -> (
+ fn response_queue(
+ verbose: bool,
+ ) -> (
mpsc::Sender<h2::client::ResponseFuture>,
- oneshot::Receiver<Result<(), Error>>
+ oneshot::Receiver<Result<(), Error>>,
) {
let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
let (verify_result_tx, verify_result_rx) = oneshot::channel();
response
.map_err(Error::from)
.and_then(H2Client::h2api_response)
- .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
+ .map_ok(move |result| {
+ if verbose {
+ println!("RESPONSE: {:?}", result)
+ }
+ })
.map_err(|err| format_err!("pipelined request failed: {}", err))
})
.map(|result| {
- let _ignore_closed_channel = verify_result_tx.send(result);
- })
+ let _ignore_closed_channel = verify_result_tx.send(result);
+ }),
);
(verify_queue_tx, verify_result_rx)
&self,
archive_name: &str,
manifest: &BackupManifest,
- known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+ known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<FixedIndexReader, Error> {
-
let mut tmpfile = std::fs::OpenOptions::new()
.write(true)
.read(true)
.open("/tmp")?;
let param = json!({ "archive-name": archive_name });
- self.h2.download("previous", Some(param), &mut tmpfile).await?;
+ self.h2
+ .download("previous", Some(param), &mut tmpfile)
+ .await?;
- let index = FixedIndexReader::new(tmpfile)
- .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
+ let index = FixedIndexReader::new(tmpfile).map_err(|err| {
+ format_err!("unable to read fixed index '{}' - {}", archive_name, err)
+ })?;
// Note: do not use values stored in index (not trusted) - instead, computed them again
let (csum, size) = index.compute_csum();
manifest.verify_file(archive_name, &csum, size)?;
}
if self.verbose {
- println!("{}: known chunks list length is {}", archive_name, index.index_count());
+ println!(
+ "{}: known chunks list length is {}",
+ archive_name,
+ index.index_count()
+ );
}
Ok(index)
&self,
archive_name: &str,
manifest: &BackupManifest,
- known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+ known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<DynamicIndexReader, Error> {
-
let mut tmpfile = std::fs::OpenOptions::new()
.write(true)
.read(true)
.open("/tmp")?;
let param = json!({ "archive-name": archive_name });
- self.h2.download("previous", Some(param), &mut tmpfile).await?;
+ self.h2
+ .download("previous", Some(param), &mut tmpfile)
+ .await?;
- let index = DynamicIndexReader::new(tmpfile)
- .map_err(|err| format_err!("unable to read dynmamic index '{}' - {}", archive_name, err))?;
+ let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
+ format_err!("unable to read dynmamic index '{}' - {}", archive_name, err)
+ })?;
// Note: do not use values stored in index (not trusted) - instead, computed them again
let (csum, size) = index.compute_csum();
manifest.verify_file(archive_name, &csum, size)?;
}
if self.verbose {
- println!("{}: known chunks list length is {}", archive_name, index.index_count());
+ println!(
+ "{}: known chunks list length is {}",
+ archive_name,
+ index.index_count()
+ );
}
Ok(index)
/// Retrieve backup time of last backup
pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
let data = self.h2.get("previous_backup_time", None).await?;
- serde_json::from_value(data)
- .map_err(|err| format_err!("Failed to parse backup time value returned by server - {}", err))
+ serde_json::from_value(data).map_err(|err| {
+ format_err!(
+ "Failed to parse backup time value returned by server - {}",
+ err
+ )
+ })
}
/// Download backup manifest (index.json) of last backup
pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
-
let mut raw_data = Vec::with_capacity(64 * 1024);
let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
- self.h2.download("previous", Some(param), &mut raw_data).await?;
+ self.h2
+ .download("previous", Some(param), &mut raw_data)
+ .await?;
let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
// no expected digest available
let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;
- let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
+ let manifest =
+ BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
Ok(manifest)
}
wid: u64,
stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
prefix: &str,
- known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+ known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
verbose: bool,
- ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {
-
+ ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>>
+ {
let total_chunks = Arc::new(AtomicUsize::new(0));
let total_chunks2 = total_chunks.clone();
let known_chunk_count = Arc::new(AtomicUsize::new(0));
stream
.and_then(move |data| {
-
let chunk_len = data.len();
total_chunks.fetch_add(1, Ordering::SeqCst);
let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
- let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
- .compress(compress);
+ let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
if let Some(ref crypt_config) = crypt_config {
chunk_builder = chunk_builder.crypt_config(crypt_config);
let chunk_end = offset + chunk_len as u64;
- if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
+ if !is_fixed_chunk_size {
+ csum.update(&chunk_end.to_le_bytes());
+ }
csum.update(digest);
let chunk_is_known = known_chunks.contains(digest);
future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
} else {
known_chunks.insert(*digest);
- future::ready(chunk_builder
- .build()
- .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
+ future::ready(chunk_builder.build().map(move |(chunk, digest)| {
+ MergedChunkInfo::New(ChunkInfo {
chunk,
digest,
chunk_len: chunk_len as u64,
offset,
- }))
- )
+ })
+ }))
}
})
.merge_known_chunks()
});
let ct = "application/octet-stream";
- let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
+ let request = H2Client::request_builder(
+ "localhost",
+ "POST",
+ &upload_chunk_path,
+ Some(param),
+ Some(ct),
+ )
+ .unwrap();
let upload_data = Some(bytes::Bytes::from(chunk_data));
let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
- future::Either::Left(h2
- .send_request(request, upload_data)
- .and_then(move |response| async move {
+ future::Either::Left(h2.send_request(request, upload_data).and_then(
+ move |response| async move {
upload_queue
.send((new_info, Some(response)))
.await
- .map_err(|err| format_err!("failed to send to upload queue: {}", err))
- })
- )
+ .map_err(|err| {
+ format_err!("failed to send to upload queue: {}", err)
+ })
+ },
+ ))
} else {
future::Either::Right(async move {
upload_queue
})
}
})
- .then(move |result| async move {
- upload_result.await?.and(result)
- }.boxed())
+ .then(move |result| async move { upload_result.await?.and(result) }.boxed())
.and_then(move |_| {
let duration = start_time.elapsed();
let total_chunks = total_chunks2.load(Ordering::SeqCst);
let mut guard = index_csum_2.lock().unwrap();
let csum = guard.take().unwrap().finish();
- futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
+ futures::future::ok((
+ total_chunks,
+ known_chunk_count,
+ stream_len,
+ reused_len,
+ duration,
+ csum,
+ ))
})
}
/// Upload speed test - prints result to stderr
pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
-
let mut data = vec![];
// generate pseudo random byte sequence
- for i in 0..1024*1024 {
+ for i in 0..1024 * 1024 {
for j in 0..4 {
- let byte = ((i >> (j<<3))&0xff) as u8;
+ let byte = ((i >> (j << 3)) & 0xff) as u8;
data.push(byte);
}
}
break;
}
- if verbose { eprintln!("send test data ({} bytes)", data.len()); }
- let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
- let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
+ if verbose {
+ eprintln!("send test data ({} bytes)", data.len());
+ }
+ let request =
+ H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
+ let request_future = self
+ .h2
+ .send_request(request, Some(bytes::Bytes::from(data.clone())))
+ .await?;
upload_queue.send(request_future).await?;
}
let _ = upload_result.await?;
- eprintln!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
- let speed = ((item_len*(repeat as usize)) as f64)/start_time.elapsed().as_secs_f64();
- eprintln!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
+ eprintln!(
+ "Uploaded {} chunks in {} seconds.",
+ repeat,
+ start_time.elapsed().as_secs()
+ );
+ let speed = ((item_len * (repeat as usize)) as f64) / start_time.elapsed().as_secs_f64();
+ eprintln!(
+ "Time per request: {} microseconds.",
+ (start_time.elapsed().as_micros()) / (repeat as u128)
+ );
Ok(speed)
}