]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/client/backup_writer.rs
client: factor out UploadOptions
[proxmox-backup.git] / src / client / backup_writer.rs
index 7e5adb3c9211903d5ca81cafa453452676c7bba1..38953a450a3ed20cd8d3007210c7a9dc68baf481 100644 (file)
@@ -4,13 +4,13 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Error};
-use chrono::{DateTime, Utc};
 use futures::*;
 use futures::stream::Stream;
 use futures::future::AbortHandle;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
+use tokio_stream::wrappers::ReceiverStream;
 
 use proxmox::tools::digest_to_hex;
 
@@ -39,6 +39,18 @@ pub struct BackupStats {
     pub csum: [u8; 32],
 }
 
+/// Options for uploading blobs/streams to the server
+#[derive(Default, Clone)]
+pub struct UploadOptions {
+    pub previous_manifest: Option<Arc<BackupManifest>>,
+    pub compress: bool,
+    pub encrypt: bool,
+    pub fixed_size: Option<u64>,
+}
+
+type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
+type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
+
 impl BackupWriter {
 
     fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
@@ -51,20 +63,22 @@ impl BackupWriter {
         datastore: &str,
         backup_type: &str,
         backup_id: &str,
-        backup_time: DateTime<Utc>,
+        backup_time: i64,
         debug: bool,
+        benchmark: bool
     ) -> Result<Arc<BackupWriter>, Error> {
 
         let param = json!({
             "backup-type": backup_type,
             "backup-id": backup_id,
-            "backup-time": backup_time.timestamp(),
+            "backup-time": backup_time,
             "store": datastore,
-            "debug": debug
+            "debug": debug,
+            "benchmark": benchmark
         });
 
         let req = HttpClient::request_builder(
-            client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
+            client.server(), client.port(), "GET", "/api2/json/backup", Some(param)).unwrap();
 
         let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
 
@@ -163,13 +177,12 @@ impl BackupWriter {
         &self,
         data: Vec<u8>,
         file_name: &str,
-        compress: bool,
-        encrypt: bool,
+        options: UploadOptions,
     ) -> Result<BackupStats, Error> {
-        let blob = match (encrypt, &self.crypt_config) {
-             (false, _) => DataBlob::encode(&data, None, compress)?,
+        let blob = match (options.encrypt, &self.crypt_config) {
+             (false, _) => DataBlob::encode(&data, None, options.compress)?,
              (true, None) => bail!("requested encryption without a crypt config"),
-             (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), compress)?,
+             (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), options.compress)?,
         };
 
         let raw_data = blob.into_inner();
@@ -185,8 +198,7 @@ impl BackupWriter {
         &self,
         src_path: P,
         file_name: &str,
-        compress: bool,
-        encrypt: bool,
+        options: UploadOptions,
     ) -> Result<BackupStats, Error> {
 
         let src_path = src_path.as_ref();
@@ -201,34 +213,33 @@ impl BackupWriter {
             .await
             .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
 
-        self.upload_blob_from_data(contents, file_name, compress, encrypt).await
+        self.upload_blob_from_data(contents, file_name, options).await
     }
 
     pub async fn upload_stream(
         &self,
-        previous_manifest: Option<Arc<BackupManifest>>,
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
-        prefix: &str,
-        fixed_size: Option<u64>,
-        compress: bool,
-        encrypt: bool,
+        options: UploadOptions,
     ) -> Result<BackupStats, Error> {
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
         let mut param = json!({ "archive-name": archive_name });
-        if let Some(size) = fixed_size {
+        let prefix = if let Some(size) = options.fixed_size {
             param["size"] = size.into();
-        }
+            "fixed"
+        } else {
+            "dynamic"
+        };
 
-        if encrypt && self.crypt_config.is_none() {
+        if options.encrypt && self.crypt_config.is_none() {
             bail!("requested encryption without a crypt config");
         }
 
         let index_path = format!("{}_index", prefix);
         let close_path = format!("{}_close", prefix);
 
-        if let Some(manifest) = previous_manifest {
+        if let Some(manifest) = options.previous_manifest {
             // try, but ignore errors
             match archive_type(archive_name) {
                 Ok(ArchiveType::FixedIndex) => {
@@ -250,8 +261,8 @@ impl BackupWriter {
                 stream,
                 &prefix,
                 known_chunks.clone(),
-                if encrypt { self.crypt_config.clone() } else { None },
-                compress,
+                if options.encrypt { self.crypt_config.clone() } else { None },
+                options.compress,
                 self.verbose,
             )
             .await?;
@@ -261,12 +272,12 @@ impl BackupWriter {
         let archive = if self.verbose {
             archive_name.to_string()
         } else {
-            crate::tools::format::strip_server_file_expenstion(archive_name.clone())
+            crate::tools::format::strip_server_file_extension(archive_name)
         };
         if archive_name != CATALOG_NAME {
-            let speed: HumanByte = (uploaded / (duration.as_secs() as usize)).into();
+            let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
             let uploaded: HumanByte = uploaded.into();
-            println!("{}: had to upload {} from {} in {}s, avgerage speed {}/s).", archive, uploaded, vsize_h, duration.as_secs(), speed);
+            println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
         } else {
             println!("Uploaded backup catalog ({})", vsize_h);
         }
@@ -317,7 +328,7 @@ impl BackupWriter {
         // });
         // old code for reference?
         tokio::spawn(
-            verify_queue_rx
+            ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
                 .try_for_each(move |response: h2::client::ResponseFuture| {
                     response
@@ -334,18 +345,18 @@ impl BackupWriter {
         (verify_queue_tx, verify_result_rx)
     }
 
-    fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
-        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
-        oneshot::Receiver<Result<(), Error>>,
-    ) {
+    fn append_chunk_queue(
+        h2: H2Client,
+        wid: u64,
+        path: String,
+        verbose: bool,
+    ) -> (UploadQueueSender, UploadResultReceiver) {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
         let (verify_result_tx, verify_result_rx) = oneshot::channel();
 
-        let h2_2 = h2.clone();
-
         // FIXME: async-block-ify this code!
         tokio::spawn(
-            verify_queue_rx
+            ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
                 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                     match (response, merged_chunk_info) {
@@ -380,7 +391,7 @@ impl BackupWriter {
                             let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                             let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                             let upload_data = Some(param_data);
-                            h2_2.send_request(request, upload_data)
+                            h2.send_request(request, upload_data)
                                 .and_then(move |response| {
                                     response
                                         .map_err(Error::from)
@@ -471,6 +482,13 @@ impl BackupWriter {
         Ok(index)
     }
 
+    /// Retrieve backup time of last backup
+    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
+        let data = self.h2.get("previous_backup_time", None).await?;
+        serde_json::from_value(data)
+            .map_err(|err| format_err!("Failed to parse backup time value returned by server - {}", err))
+    }
+
     /// Download backup manifest (index.json) of last backup
     pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
 
@@ -479,15 +497,19 @@ impl BackupWriter {
         let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
         self.h2.download("previous", Some(param), &mut raw_data).await?;
 
-        let blob = DataBlob::from_raw(raw_data)?;
-        blob.verify_crc()?;
-        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
+        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
+        // no expected digest available
+        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;
 
         let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
 
         Ok(manifest)
     }
 
+    // We have no `self` here for `h2` and `verbose`, the only other arg "common" with 1 other
+    // function in the same path is `wid`, so those 3 could be in a struct, but there's no real use
+    // since this is a private method.
+    #[allow(clippy::too_many_arguments)]
     fn upload_chunk_info_stream(
         h2: H2Client,
         wid: u64,
@@ -514,7 +536,7 @@ impl BackupWriter {
         let is_fixed_chunk_size = prefix == "fixed";
 
         let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
+            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);
 
         let start_time = std::time::Instant::now();
 
@@ -567,16 +589,19 @@ impl BackupWriter {
             })
             .merge_known_chunks()
             .try_for_each(move |merged_chunk_info| {
+                let upload_queue = upload_queue.clone();
 
                 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                     let offset = chunk_info.offset;
                     let digest = chunk_info.digest;
                     let digest_str = digest_to_hex(&digest);
 
-                    if false && verbose { // TO verbose, needs finer verbosity setting granularity
+                    /* too verbose, needs finer verbosity setting granularity
+                    if verbose {
                         println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                                  chunk_info.chunk_len, offset);
                     }
+                    */
 
                     let chunk_data = chunk_info.chunk.into_inner();
                     let param = json!({
@@ -592,7 +617,6 @@ impl BackupWriter {
 
                     let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
 
-                    let mut upload_queue = upload_queue.clone();
                     future::Either::Left(h2
                         .send_request(request, upload_data)
                         .and_then(move |response| async move {
@@ -603,7 +627,6 @@ impl BackupWriter {
                         })
                     )
                 } else {
-                    let mut upload_queue = upload_queue.clone();
                     future::Either::Right(async move {
                         upload_queue
                             .send((merged_chunk_info, None))
@@ -629,7 +652,7 @@ impl BackupWriter {
             })
     }
 
-    /// Upload speed test - prints result ot stderr
+    /// Upload speed test - prints result to stderr
     pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
 
         let mut data = vec![];
@@ -655,8 +678,6 @@ impl BackupWriter {
                 break;
             }
 
-            let mut upload_queue = upload_queue.clone();
-
             if verbose { eprintln!("send test data ({} bytes)", data.len()); }
             let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
             let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;