tokio 1.0: use ReceiverStream from tokio-stream

diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
index 0b0ef93bb29da21413c57601eb3fe43ce7d2b82b..bcbd6f28c26e600df83023622e3f5c7fda9f4489 100644
--- a/src/client/backup_writer.rs
+++ b/src/client/backup_writer.rs
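Background for this change: tokio 1.0 moved the `Stream` implementations out of the main crate, so `tokio::sync::mpsc::Receiver` no longer implements `futures::Stream` directly and has to be wrapped in `tokio_stream::wrappers::ReceiverStream` before stream combinators such as `try_for_each` can drive it. A minimal, standalone sketch of that pattern, assuming tokio 1.x with the `macros`, `rt-multi-thread` and `sync` features plus the `tokio-stream` and `futures` crates:

```rust
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(16);

    tokio::spawn(async move {
        for i in 0..3 {
            // send() only fails if the receiver has been dropped; ignore that here.
            let _ = tx.send(i).await;
        }
    });

    // In tokio 1.x the receiver itself is not a Stream; wrap it first.
    let mut incoming = ReceiverStream::new(rx);
    while let Some(value) = incoming.next().await {
        println!("received {}", value);
    }
}
```

The two `tokio::spawn(ReceiverStream::new(verify_queue_rx) ...)` hunks below apply exactly this wrapping to the existing combinator chains.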
@@ -4,13 +4,13 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Error};
-use chrono::{DateTime, Utc};
 use futures::*;
 use futures::stream::Stream;
 use futures::future::AbortHandle;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
+use tokio_stream::wrappers::ReceiverStream;
 
 use proxmox::tools::digest_to_hex;
 
@@ -39,6 +39,9 @@ pub struct BackupStats {
     pub csum: [u8; 32],
 }
 
+type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
+type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
+
 impl BackupWriter {
 
     fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
@@ -51,20 +54,22 @@ impl BackupWriter {
         datastore: &str,
         backup_type: &str,
         backup_id: &str,
-        backup_time: DateTime<Utc>,
+        backup_time: i64,
         debug: bool,
+        benchmark: bool
     ) -> Result<Arc<BackupWriter>, Error> {
 
         let param = json!({
             "backup-type": backup_type,
             "backup-id": backup_id,
-            "backup-time": backup_time.timestamp(),
+            "backup-time": backup_time,
             "store": datastore,
-            "debug": debug
+            "debug": debug,
+            "benchmark": benchmark
         });
 
         let req = HttpClient::request_builder(
-            client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
+            client.server(), client.port(), "GET", "/api2/json/backup", Some(param)).unwrap();
 
         let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
 
@@ -261,7 +266,7 @@ impl BackupWriter {
         let archive = if self.verbose {
             archive_name.to_string()
         } else {
-            crate::tools::format::strip_server_file_expenstion(archive_name.clone())
+            crate::tools::format::strip_server_file_extension(archive_name)
         };
         if archive_name != CATALOG_NAME {
             let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
@@ -317,7 +322,7 @@ impl BackupWriter {
         // });
         // old code for reference?
         tokio::spawn(
-            verify_queue_rx
+            ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
                 .try_for_each(move |response: h2::client::ResponseFuture| {
                     response
@@ -334,18 +339,18 @@ impl BackupWriter {
         (verify_queue_tx, verify_result_rx)
     }
 
-    fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
-        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
-        oneshot::Receiver<Result<(), Error>>,
-    ) {
+    fn append_chunk_queue(
+        h2: H2Client,
+        wid: u64,
+        path: String,
+        verbose: bool,
+    ) -> (UploadQueueSender, UploadResultReceiver) {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
         let (verify_result_tx, verify_result_rx) = oneshot::channel();
 
-        let h2_2 = h2.clone();
-
         // FIXME: async-block-ify this code!
         tokio::spawn(
-            verify_queue_rx
+            ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
                 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                     match (response, merged_chunk_info) {
@@ -380,7 +385,7 @@ impl BackupWriter {
                             let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                             let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                             let upload_data = Some(param_data);
-                            h2_2.send_request(request, upload_data)
+                            h2.send_request(request, upload_data)
                                 .and_then(move |response| {
                                     response
                                         .map_err(Error::from)
@@ -471,6 +476,13 @@ impl BackupWriter {
         Ok(index)
     }
 
+    /// Retrieve backup time of last backup
+    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
+        let data = self.h2.get("previous_backup_time", None).await?;
+        serde_json::from_value(data)
+            .map_err(|err| format_err!("Failed to parse backup time value returned by server - {}", err))
+    }
+
     /// Download backup manifest (index.json) of last backup
     pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
 
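The new `previous_backup_time()` helper issues a GET on the `previous_backup_time` endpoint of the backup protocol and deserializes the returned JSON into an `Option<i64>` Unix epoch, matching the `backup_time: i64` change above. A hypothetical call site, not part of this diff (`writer` is assumed to be the `Arc<BackupWriter>` obtained from the async constructor whose signature is changed above, and the value is presumably `None` when the group has no previous snapshot):

```rust
use anyhow::Error;

// Hypothetical usage sketch, not part of this diff: report the previous
// backup time of the group this writer was started for.
async fn report_previous_backup(writer: &BackupWriter) -> Result<(), Error> {
    match writer.previous_backup_time().await? {
        Some(epoch) => println!("previous backup time (unix epoch): {}", epoch),
        None => println!("no previous backup found"),
    }
    Ok(())
}
```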
@@ -480,13 +492,18 @@ impl BackupWriter {
         self.h2.download("previous", Some(param), &mut raw_data).await?;
 
         let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
-        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
+        // no expected digest available
+        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;
 
         let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
 
         Ok(manifest)
     }
 
+    // We have no `self` here for `h2` and `verbose`; the only other argument shared with the
+    // one other function in the same path is `wid`. Those three could be grouped into a struct
+    // (sketched after this hunk), but there is no real use since this is a private method.
+    #[allow(clippy::too_many_arguments)]
     fn upload_chunk_info_stream(
         h2: H2Client,
         wid: u64,
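To make the alternative dismissed in the comment above concrete: the grouping would look roughly like the sketch below, where `ChunkUploadParams` and its layout are hypothetical and intentionally not part of this change.

```rust
// Hypothetical grouping of the arguments shared by upload_chunk_info_stream
// and append_chunk_queue; shown only to illustrate the trade-off named in the
// comment, not part of this diff.
struct ChunkUploadParams {
    h2: H2Client,
    wid: u64,
    verbose: bool,
}
```

Both private helpers would then take this struct (or a reference to it) instead of repeating the three arguments, which is the extra type the comment judges not worth introducing for a private code path.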
@@ -513,7 +530,7 @@ impl BackupWriter {
         let is_fixed_chunk_size = prefix == "fixed";
 
         let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
+            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);
 
         let start_time = std::time::Instant::now();
 
@@ -572,10 +589,12 @@ impl BackupWriter {
                     let digest = chunk_info.digest;
                     let digest_str = digest_to_hex(&digest);
 
-                    if false && verbose { // TO verbose, needs finer verbosity setting granularity
+                    /* too verbose, needs finer verbosity setting granularity
+                    if verbose {
                         println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                                  chunk_info.chunk_len, offset);
                     }
+                    */
 
                     let chunk_data = chunk_info.chunk.into_inner();
                     let param = json!({
@@ -628,7 +647,7 @@ impl BackupWriter {
             })
     }
 
-    /// Upload speed test - prints result ot stderr
+    /// Upload speed test - prints result to stderr
     pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
 
         let mut data = vec![];