client: factor out UploadOptions
diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
index 769f3e946a2bfabfd36b4a3753f7195fb5570f11..38953a450a3ed20cd8d3007210c7a9dc68baf481 100644
--- a/src/client/backup_writer.rs
+++ b/src/client/backup_writer.rs
@@ -3,19 +3,20 @@ use std::os::unix::fs::OpenOptionsExt;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
-use anyhow::{format_err, Error};
-use chrono::{DateTime, Utc};
+use anyhow::{bail, format_err, Error};
 use futures::*;
 use futures::stream::Stream;
 use futures::future::AbortHandle;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
+use tokio_stream::wrappers::ReceiverStream;
 
 use proxmox::tools::digest_to_hex;
 
 use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
 use crate::backup::*;
+use crate::tools::format::HumanByte;
 
 use super::{HttpClient, H2Client};
 
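Context for the new `tokio_stream` import: tokio 1.x dropped the `Stream` impl from `mpsc::Receiver`, so a receiver must be wrapped in `ReceiverStream` before stream combinators apply, which is what the queue hunks below do. A minimal standalone sketch (not part of this patch):

```rust
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(8);
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx); // close the channel so the stream ends

    // ReceiverStream restores the Stream impl that tokio 0.2 receivers had.
    let items: Vec<u32> = ReceiverStream::new(rx).collect().await;
    assert_eq!(items, vec![1, 2]);
}
```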
@@ -38,6 +39,18 @@ pub struct BackupStats {
     pub csum: [u8; 32],
 }
 
+/// Options for uploading blobs/streams to the server
+#[derive(Default, Clone)]
+pub struct UploadOptions {
+    pub previous_manifest: Option<Arc<BackupManifest>>,
+    pub compress: bool,
+    pub encrypt: bool,
+    pub fixed_size: Option<u64>,
+}
+
+type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
+type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
+
 impl BackupWriter {
 
     fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
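With the new struct, call sites pass a single `UploadOptions` value instead of the positional `compress`/`crypt_or_sign`/`fixed_size` arguments. A hedged sketch of a call site after this change (the `writer` and `data` bindings and the blob name are assumptions, not part of the patch):

```rust
// Defaults via #[derive(Default)]: no previous manifest, no compression,
// no encryption, dynamic index (fixed_size: None).
let options = UploadOptions {
    compress: true,
    ..UploadOptions::default()
};
let stats = writer
    .upload_blob_from_data(data, "example.blob", options)
    .await?;
```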
@@ -50,20 +63,22 @@ impl BackupWriter {
         datastore: &str,
         backup_type: &str,
         backup_id: &str,
-        backup_time: DateTime<Utc>,
+        backup_time: i64,
         debug: bool,
+        benchmark: bool,
     ) -> Result<Arc<BackupWriter>, Error> {
 
         let param = json!({
             "backup-type": backup_type,
             "backup-id": backup_id,
-            "backup-time": backup_time.timestamp(),
+            "backup-time": backup_time,
             "store": datastore,
-            "debug": debug
+            "debug": debug,
+            "benchmark": benchmark
         });
 
         let req = HttpClient::request_builder(
-            client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
+            client.server(), client.port(), "GET", "/api2/json/backup", Some(param)).unwrap();
 
         let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
 
@@ -162,22 +177,12 @@ impl BackupWriter {
         &self,
         data: Vec<u8>,
         file_name: &str,
-        compress: bool,
-        crypt_or_sign: Option<bool>,
-     ) -> Result<BackupStats, Error> {
-
-        let blob = if let Some(ref crypt_config) = self.crypt_config {
-            if let Some(encrypt) = crypt_or_sign {
-                if encrypt {
-                    DataBlob::encode(&data, Some(crypt_config), compress)?
-                } else {
-                    DataBlob::create_signed(&data, crypt_config, compress)?
-                }
-            } else {
-                DataBlob::encode(&data, None, compress)?
-            }
-        } else {
-            DataBlob::encode(&data, None, compress)?
+        options: UploadOptions,
+    ) -> Result<BackupStats, Error> {
+        let blob = match (options.encrypt, &self.crypt_config) {
+            (false, _) => DataBlob::encode(&data, None, options.compress)?,
+            (true, None) => bail!("requested encryption without a crypt config"),
+            (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), options.compress)?,
         };
 
         let raw_data = blob.into_inner();
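The match above turns the encode decision into a closed table: requesting encryption on a writer created without a crypt config now fails up front via `bail!` instead of silently falling back to a plain (or, previously, signed) blob. A hedged call-site sketch of that error path (`writer` and `data` are assumptions):

```rust
// Assumption: `writer` was created without a CryptConfig.
let options = UploadOptions { encrypt: true, compress: true, ..UploadOptions::default() };
let err = writer
    .upload_blob_from_data(data, "example.blob", options)
    .await
    .unwrap_err();
assert!(err.to_string().contains("requested encryption without a crypt config"));
```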
@@ -193,9 +198,8 @@ impl BackupWriter {
         &self,
         src_path: P,
         file_name: &str,
-        compress: bool,
-        crypt_or_sign: Option<bool>,
-     ) -> Result<BackupStats, Error> {
+        options: UploadOptions,
+    ) -> Result<BackupStats, Error> {
 
         let src_path = src_path.as_ref();
 
@@ -209,28 +213,33 @@ impl BackupWriter {
             .await
             .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
 
-        self.upload_blob_from_data(contents, file_name, compress, crypt_or_sign).await
+        self.upload_blob_from_data(contents, file_name, options).await
     }
 
     pub async fn upload_stream(
         &self,
-        previous_manifest: Option<Arc<BackupManifest>>,
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
-        prefix: &str,
-        fixed_size: Option<u64>,
+        options: UploadOptions,
     ) -> Result<BackupStats, Error> {
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
         let mut param = json!({ "archive-name": archive_name });
-        if let Some(size) = fixed_size {
+        let prefix = if let Some(size) = options.fixed_size {
             param["size"] = size.into();
+            "fixed"
+        } else {
+            "dynamic"
+        };
+
+        if options.encrypt && self.crypt_config.is_none() {
+            bail!("requested encryption without a crypt config");
         }
 
         let index_path = format!("{}_index", prefix);
         let close_path = format!("{}_close", prefix);
 
-        if let Some(manifest) = previous_manifest {
+        if let Some(manifest) = options.previous_manifest {
             // try, but ignore errors
             match archive_type(archive_name) {
                 Ok(ArchiveType::FixedIndex) => {
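Note the inversion here: callers previously passed the `prefix` string explicitly; now the index type follows from `options.fixed_size`, and the derived prefix selects the API paths. A small sketch of the mapping, matching the `format!` calls above:

```rust
// fixed_size: Some(_) -> ("fixed_index", "fixed_close")
// fixed_size: None    -> ("dynamic_index", "dynamic_close")
fn index_endpoints(fixed_size: Option<u64>) -> (String, String) {
    let prefix = if fixed_size.is_some() { "fixed" } else { "dynamic" };
    (format!("{}_index", prefix), format!("{}_close", prefix))
}
```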
@@ -245,22 +254,43 @@ impl BackupWriter {
 
         let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
 
-        let (chunk_count, size, duration, speed, csum) =
+        let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
             Self::upload_chunk_info_stream(
                 self.h2.clone(),
                 wid,
                 stream,
                 &prefix,
                 known_chunks.clone(),
-                self.crypt_config.clone(),
+                if options.encrypt { self.crypt_config.clone() } else { None },
+                options.compress,
                 self.verbose,
             )
             .await?;
 
-        println!("{}: Uploaded {} bytes as {} chunks in {} seconds ({} MB/s).", archive_name, size, chunk_count, duration.as_secs(), speed);
-        if chunk_count > 0 {
-            println!("{}: Average chunk size was {} bytes.", archive_name, size/chunk_count);
-            println!("{}: Time per request: {} microseconds.", archive_name, (duration.as_micros())/(chunk_count as u128));
+        let uploaded = size - size_reused;
+        let vsize_h: HumanByte = size.into();
+        let archive = if self.verbose {
+            archive_name.to_string()
+        } else {
+            crate::tools::format::strip_server_file_extension(archive_name)
+        };
+        if archive_name != CATALOG_NAME {
+            let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
+            let uploaded: HumanByte = uploaded.into();
+            println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
+        } else {
+            println!("Uploaded backup catalog ({})", vsize_h);
+        }
+
+        if size_reused > 0 && size > 1024*1024 {
+            let reused_percent = size_reused as f64 * 100. / size as f64;
+            let reused: HumanByte = size_reused.into();
+            println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
+        }
+        if self.verbose && chunk_count > 0 {
+            println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
+            println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
+            println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
         }
 
         let param = json!({
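The new summary measures speed over bytes actually sent (`size - size_reused`) rather than the whole stream. A worked example of the formulas above, with made-up numbers:

```rust
// Hypothetical run: 100 MiB stream, 60 MiB reused from the previous
// snapshot, 8 seconds elapsed.
let size = 100 * 1024 * 1024usize;
let size_reused = 60 * 1024 * 1024usize;
let micros = 8_000_000usize;

let uploaded = size - size_reused;            // 40 MiB actually sent
let speed = (uploaded * 1_000_000) / micros;  // = 5_242_880 B/s, i.e. 5 MiB/s
let reused_percent = size_reused as f64 * 100. / size as f64; // 60.0%
```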
@@ -276,7 +306,7 @@ impl BackupWriter {
         })
     }
 
-    fn response_queue() -> (
+    fn response_queue(verbose: bool) -> (
         mpsc::Sender<h2::client::ResponseFuture>,
         oneshot::Receiver<Result<(), Error>>
     ) {
@@ -298,13 +328,13 @@ impl BackupWriter {
         // });
         // old code for reference?
         tokio::spawn(
-            verify_queue_rx
+            ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
-                .try_for_each(|response: h2::client::ResponseFuture| {
+                .try_for_each(move |response: h2::client::ResponseFuture| {
                     response
                         .map_err(Error::from)
                         .and_then(H2Client::h2api_response)
-                        .map_ok(|result| println!("RESPONSE: {:?}", result))
+                        .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
                         .map_err(|err| format_err!("pipelined request failed: {}", err))
                 })
                 .map(|result| {
@@ -315,18 +345,18 @@ impl BackupWriter {
         (verify_queue_tx, verify_result_rx)
     }
 
-    fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
-        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
-        oneshot::Receiver<Result<(), Error>>,
-    ) {
+    fn append_chunk_queue(
+        h2: H2Client,
+        wid: u64,
+        path: String,
+        verbose: bool,
+    ) -> (UploadQueueSender, UploadResultReceiver) {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
         let (verify_result_tx, verify_result_rx) = oneshot::channel();
 
-        let h2_2 = h2.clone();
-
         // FIXME: async-block-ify this code!
         tokio::spawn(
-            verify_queue_rx
+            ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
                 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                     match (response, merged_chunk_info) {
@@ -361,7 +391,7 @@ impl BackupWriter {
                             let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                             let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                             let upload_data = Some(param_data);
-                            h2_2.send_request(request, upload_data)
+                            h2.send_request(request, upload_data)
                                 .and_then(move |response| {
                                     response
                                         .map_err(Error::from)
@@ -434,7 +464,7 @@ impl BackupWriter {
         self.h2.download("previous", Some(param), &mut tmpfile).await?;
 
         let index = DynamicIndexReader::new(tmpfile)
-            .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
+            .map_err(|err| format_err!("unable to read dynmamic index '{}' - {}", archive_name, err))?;
         // Note: do not use values stored in index (not trusted) - instead, compute them again
         let (csum, size) = index.compute_csum();
         manifest.verify_file(archive_name, &csum, size)?;
@@ -452,25 +482,34 @@ impl BackupWriter {
         Ok(index)
     }
 
+    /// Retrieve backup time of last backup
+    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
+        let data = self.h2.get("previous_backup_time", None).await?;
+        serde_json::from_value(data)
+            .map_err(|err| format_err!("Failed to parse backup time value returned by server - {}", err))
+    }
+
     /// Download backup manifest (index.json) of last backup
     pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
 
-        use std::convert::TryFrom;
-
         let mut raw_data = Vec::with_capacity(64 * 1024);
 
         let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
         self.h2.download("previous", Some(param), &mut raw_data).await?;
 
-        let blob = DataBlob::from_raw(raw_data)?;
-        blob.verify_crc()?;
-        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
-        let json: Value = serde_json::from_slice(&data[..])?;
-        let manifest = BackupManifest::try_from(json)?;
+        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
+        // no expected digest available
+        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;
+
+        let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
 
         Ok(manifest)
     }
 
+    // We have no `self` here to carry `h2` and `verbose`, and the only other argument shared
+    // with another function on this path is `wid`, so those three could be grouped in a struct;
+    // but there is no real benefit, since this is a private method.
+    #[allow(clippy::too_many_arguments)]
     fn upload_chunk_info_stream(
         h2: H2Client,
         wid: u64,
@@ -478,21 +517,26 @@ impl BackupWriter {
         prefix: &str,
         known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
         crypt_config: Option<Arc<CryptConfig>>,
+        compress: bool,
         verbose: bool,
-    ) -> impl Future<Output = Result<(usize, usize, std::time::Duration, usize, [u8; 32]), Error>> {
+    ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {
 
-        let repeat = Arc::new(AtomicUsize::new(0));
-        let repeat2 = repeat.clone();
+        let total_chunks = Arc::new(AtomicUsize::new(0));
+        let total_chunks2 = total_chunks.clone();
+        let known_chunk_count = Arc::new(AtomicUsize::new(0));
+        let known_chunk_count2 = known_chunk_count.clone();
 
         let stream_len = Arc::new(AtomicUsize::new(0));
         let stream_len2 = stream_len.clone();
+        let reused_len = Arc::new(AtomicUsize::new(0));
+        let reused_len2 = reused_len.clone();
 
         let append_chunk_path = format!("{}_index", prefix);
         let upload_chunk_path = format!("{}_chunk", prefix);
         let is_fixed_chunk_size = prefix == "fixed";
 
         let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
+            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);
 
         let start_time = std::time::Instant::now();
 
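The four counter pairs above all follow one pattern: each `Arc<AtomicUsize>` is moved into the per-chunk closure, while its clone survives for the final statistics closure. A stripped-down sketch of that pattern:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

fn main() {
    let total = Arc::new(AtomicUsize::new(0));
    let total2 = total.clone(); // kept for the final read

    let count_one = move || {
        total.fetch_add(1, Ordering::SeqCst); // `total` moved into the closure
    };
    count_one();

    assert_eq!(total2.load(Ordering::SeqCst), 1);
}
```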
@@ -504,11 +548,11 @@ impl BackupWriter {
 
                 let chunk_len = data.len();
 
-                repeat.fetch_add(1, Ordering::SeqCst);
+                total_chunks.fetch_add(1, Ordering::SeqCst);
                 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
 
                 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
-                    .compress(true);
+                    .compress(compress);
 
                 if let Some(ref crypt_config) = crypt_config {
                     chunk_builder = chunk_builder.crypt_config(crypt_config);
@@ -527,6 +571,8 @@ impl BackupWriter {
 
                 let chunk_is_known = known_chunks.contains(digest);
                 if chunk_is_known {
+                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
+                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                     future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                 } else {
                     known_chunks.insert(*digest);
@@ -543,16 +589,19 @@ impl BackupWriter {
             })
             .merge_known_chunks()
             .try_for_each(move |merged_chunk_info| {
+                let upload_queue = upload_queue.clone();
 
                 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                     let offset = chunk_info.offset;
                     let digest = chunk_info.digest;
                     let digest_str = digest_to_hex(&digest);
 
+                    /* too verbose, needs finer verbosity setting granularity
                     if verbose {
                         println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                                  chunk_info.chunk_len, offset);
                     }
+                    */
 
                     let chunk_data = chunk_info.chunk.into_inner();
                     let param = json!({
@@ -568,7 +617,6 @@ impl BackupWriter {
 
                     let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
 
-                    let mut upload_queue = upload_queue.clone();
                     future::Either::Left(h2
                         .send_request(request, upload_data)
                         .and_then(move |response| async move {
@@ -579,7 +627,6 @@ impl BackupWriter {
                         })
                     )
                 } else {
-                    let mut upload_queue = upload_queue.clone();
                     future::Either::Right(async move {
                         upload_queue
                             .send((merged_chunk_info, None))
@@ -592,18 +639,21 @@ impl BackupWriter {
                 upload_result.await?.and(result)
             }.boxed())
             .and_then(move |_| {
-                let repeat = repeat2.load(Ordering::SeqCst);
+                let duration = start_time.elapsed();
+                let total_chunks = total_chunks2.load(Ordering::SeqCst);
+                let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
                 let stream_len = stream_len2.load(Ordering::SeqCst);
-                let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
+                let reused_len = reused_len2.load(Ordering::SeqCst);
 
                 let mut guard = index_csum_2.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
-                futures::future::ok((repeat, stream_len, start_time.elapsed(), speed, csum))
+                futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
             })
     }
 
-    pub async fn upload_speedtest(&self) -> Result<usize, Error> {
+    /// Upload speed test - prints result to stderr
+    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
 
         let mut data = vec![];
         // generate pseudo random byte sequence
@@ -618,7 +668,7 @@ impl BackupWriter {
 
         let mut repeat = 0;
 
-        let (upload_queue, upload_result) = Self::response_queue();
+        let (upload_queue, upload_result) = Self::response_queue(verbose);
 
         let start_time = std::time::Instant::now();
 
@@ -628,9 +678,7 @@ impl BackupWriter {
                 break;
             }
 
-            let mut upload_queue = upload_queue.clone();
-
-            println!("send test data ({} bytes)", data.len());
+            if verbose { eprintln!("send test data ({} bytes)", data.len()); }
             let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
             let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
 
@@ -641,9 +689,9 @@ impl BackupWriter {
 
         let _ = upload_result.await?;
 
-        println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
-        let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
-        println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
+        eprintln!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
+        let speed = ((item_len*(repeat as usize)) as f64)/start_time.elapsed().as_secs_f64();
+        eprintln!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
 
         Ok(speed)
     }
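The return type change matters for callers: the old code returned an integer MB/s (`usize`), truncating twice along the way; the new code returns raw bytes per second as `f64` and leaves formatting to the caller. A worked example of the new formula, with made-up numbers:

```rust
// Hypothetical run: 64 uploads of a 1 MiB buffer in 2.0 seconds.
let item_len = 1024 * 1024usize;
let repeat = 64u64;
let elapsed_secs = 2.0_f64;

let speed = ((item_len * repeat as usize) as f64) / elapsed_secs;
assert_eq!(speed, 33_554_432.0); // bytes/s, i.e. 32 MiB/s
```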