git.proxmox.com Git - proxmox-backup.git/commitdiff
don't require WorkerTask in backup/
author: Wolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 12 Oct 2020 09:46:34 +0000 (11:46 +0200)
committer: Wolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 12 Oct 2020 12:11:57 +0000 (14:11 +0200)
To untangle the server code from the actual backup
implementation.
It would be ideal if the whole backup/ dir could become its
own crate with minimal dependencies, certainly without
depending on the actual api server. That would then also be
used more easily to create forensic tools for all the data
file types we have in the backup repositories.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
src/api2/admin/datastore.rs
src/backup/chunk_store.rs
src/backup/datastore.rs
src/backup/verify.rs
src/bin/proxmox-backup-proxy.rs

index af3af0ad9c1a73dbc2403ecdc31b7e5144d3bf15..c260b62da7721b002fc97fd125edb06bd5e16410 100644 (file)
@@ -518,7 +518,14 @@ pub fn verify(
 
             let failed_dirs = if let Some(backup_dir) = backup_dir {
                 let mut res = Vec::new();
-                if !verify_backup_dir(datastore, &backup_dir, verified_chunks, corrupt_chunks, worker.clone())? {
+                if !verify_backup_dir(
+                    datastore,
+                    &backup_dir,
+                    verified_chunks,
+                    corrupt_chunks,
+                    worker.clone(),
+                    worker.upid().clone(),
+                )? {
                     res.push(backup_dir.to_string());
                 }
                 res
@@ -530,10 +537,11 @@ pub fn verify(
                     corrupt_chunks,
                     None,
                     worker.clone(),
+                    worker.upid(),
                 )?;
                 failed_dirs
             } else {
-                verify_all_backups(datastore, worker.clone())?
+                verify_all_backups(datastore, worker.clone(), worker.upid())?
             };
             if failed_dirs.len() > 0 {
                 worker.log("Failed to verify following snapshots:");
@@ -770,7 +778,7 @@ fn start_garbage_collection(
         to_stdout,
         move |worker| {
             worker.log(format!("starting garbage collection on store {}", store));
-            datastore.garbage_collection(&worker)
+            datastore.garbage_collection(&*worker, worker.upid())
         },
     )?;
 
index 9c81ff27ce4942a353dc685f48ff9cf64f80b167..4397648b767ccca2fcdb86c3915d6a6336e5e127 100644 (file)
@@ -11,7 +11,7 @@ use crate::tools;
 use crate::api2::types::GarbageCollectionStatus;
 
 use super::DataBlob;
-use crate::server::WorkerTask;
+use crate::task::TaskState;
 
 /// File system based chunk store
 pub struct ChunkStore {
@@ -278,7 +278,7 @@ impl ChunkStore {
         oldest_writer: i64,
         phase1_start_time: i64,
         status: &mut GarbageCollectionStatus,
-        worker: &WorkerTask,
+        worker: &dyn TaskState,
     ) -> Result<(), Error> {
         use nix::sys::stat::fstatat;
         use nix::unistd::{unlinkat, UnlinkatFlags};
@@ -297,10 +297,15 @@ impl ChunkStore {
         for (entry, percentage, bad) in self.get_chunk_iterator()? {
             if last_percentage != percentage {
                 last_percentage = percentage;
-                worker.log(format!("percentage done: phase2 {}% (processed {} chunks)", percentage, chunk_count));
+                crate::task_log!(
+                    worker,
+                    "percentage done: phase2 {}% (processed {} chunks)",
+                    percentage,
+                    chunk_count,
+                );
             }
 
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
 
             let (dirfd, entry) = match entry {
@@ -334,12 +339,13 @@ impl ChunkStore {
                         Ok(_) => {
                             match unlinkat(Some(dirfd), filename, UnlinkatFlags::NoRemoveDir) {
                                 Err(err) =>
-                                    worker.warn(format!(
+                                    crate::task_warn!(
+                                        worker,
                                         "unlinking corrupt chunk {:?} failed on store '{}' - {}",
                                         filename,
                                         self.name,
                                         err,
-                                    )),
+                                    ),
                                 Ok(_) => {
                                     status.removed_bad += 1;
                                     status.removed_bytes += stat.st_size as u64;
@@ -351,11 +357,12 @@ impl ChunkStore {
                         },
                         Err(err) => {
                             // some other error, warn user and keep .bad file around too
-                            worker.warn(format!(
+                            crate::task_warn!(
+                                worker,
                                 "error during stat on '{:?}' - {}",
                                 orig_filename,
                                 err,
-                            ));
+                            );
                         }
                     }
                 } else if stat.st_atime < min_atime {
index 1b5f7f8aa3b7727e3cc88042184b7d381c282d63..1f708293aa0b81872718372a5356b4d914db80d3 100644 (file)
@@ -18,11 +18,12 @@ use super::manifest::{MANIFEST_BLOB_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest};
 use super::index::*;
 use super::{DataBlob, ArchiveType, archive_type};
 use crate::config::datastore;
-use crate::server::WorkerTask;
+use crate::task::TaskState;
 use crate::tools;
 use crate::tools::format::HumanByte;
 use crate::tools::fs::{lock_dir_noblock, DirLockGuard};
 use crate::api2::types::{GarbageCollectionStatus, Userid};
+use crate::server::UPID;
 
 lazy_static! {
     static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
@@ -411,25 +412,34 @@ impl DataStore {
         index: I,
         file_name: &Path, // only used for error reporting
         status: &mut GarbageCollectionStatus,
-        worker: &WorkerTask,
+        worker: &dyn TaskState,
     ) -> Result<(), Error> {
 
         status.index_file_count += 1;
         status.index_data_bytes += index.index_bytes();
 
         for pos in 0..index.index_count() {
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
             let digest = index.index_digest(pos).unwrap();
             if let Err(err) = self.chunk_store.touch_chunk(digest) {
-                worker.warn(&format!("warning: unable to access chunk {}, required by {:?} - {}",
-                      proxmox::tools::digest_to_hex(digest), file_name, err));
+                crate::task_warn!(
+                    worker,
+                    "warning: unable to access chunk {}, required by {:?} - {}",
+                    proxmox::tools::digest_to_hex(digest),
+                    file_name,
+                    err,
+                );
             }
         }
         Ok(())
     }
 
-    fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus, worker: &WorkerTask) -> Result<(), Error> {
+    fn mark_used_chunks(
+        &self,
+        status: &mut GarbageCollectionStatus,
+        worker: &dyn TaskState,
+    ) -> Result<(), Error> {
 
         let image_list = self.list_images()?;
 
@@ -441,7 +451,7 @@ impl DataStore {
 
         for path in image_list {
 
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
 
             if let Ok(archive_type) = archive_type(&path) {
@@ -457,8 +467,13 @@ impl DataStore {
 
             let percentage = done*100/image_count;
             if percentage > last_percentage {
-                worker.log(format!("percentage done: phase1 {}% ({} of {} index files)",
-                                   percentage, done, image_count));
+                crate::task_log!(
+                    worker,
+                    "percentage done: phase1 {}% ({} of {} index files)",
+                    percentage,
+                    done,
+                    image_count,
+                );
                 last_percentage = percentage;
             }
         }
@@ -474,7 +489,7 @@ impl DataStore {
         if let Ok(_) = self.gc_mutex.try_lock() { false } else { true }
     }
 
-    pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
+    pub fn garbage_collection(&self, worker: &dyn TaskState, upid: &UPID) -> Result<(), Error> {
 
         if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
 
@@ -487,36 +502,59 @@ impl DataStore {
             let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
 
             let mut gc_status = GarbageCollectionStatus::default();
-            gc_status.upid = Some(worker.to_string());
-
-            worker.log("Start GC phase1 (mark used chunks)");
-
-            self.mark_used_chunks(&mut gc_status, &worker)?;
-
-            worker.log("Start GC phase2 (sweep unused chunks)");
-            self.chunk_store.sweep_unused_chunks(oldest_writer, phase1_start_time, &mut gc_status, &worker)?;
-
-            worker.log(&format!("Removed garbage: {}", HumanByte::from(gc_status.removed_bytes)));
-            worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));
+            gc_status.upid = Some(upid.to_string());
+
+            crate::task_log!(worker, "Start GC phase1 (mark used chunks)");
+
+            self.mark_used_chunks(&mut gc_status, worker)?;
+
+            crate::task_log!(worker, "Start GC phase2 (sweep unused chunks)");
+            self.chunk_store.sweep_unused_chunks(
+                oldest_writer,
+                phase1_start_time,
+                &mut gc_status,
+                worker,
+            )?;
+
+            crate::task_log!(
+                worker,
+                "Removed garbage: {}",
+                HumanByte::from(gc_status.removed_bytes),
+            );
+            crate::task_log!(worker, "Removed chunks: {}", gc_status.removed_chunks);
             if gc_status.pending_bytes > 0 {
-                worker.log(&format!("Pending removals: {} (in {} chunks)", HumanByte::from(gc_status.pending_bytes), gc_status.pending_chunks));
+                crate::task_log!(
+                    worker,
+                    "Pending removals: {} (in {} chunks)",
+                    HumanByte::from(gc_status.pending_bytes),
+                    gc_status.pending_chunks,
+                );
             }
             if gc_status.removed_bad > 0 {
-                worker.log(&format!("Removed bad files: {}", gc_status.removed_bad));
+                crate::task_log!(worker, "Removed bad files: {}", gc_status.removed_bad);
             }
 
-            worker.log(&format!("Original data usage: {}", HumanByte::from(gc_status.index_data_bytes)));
+            crate::task_log!(
+                worker,
+                "Original data usage: {}",
+                HumanByte::from(gc_status.index_data_bytes),
+            );
 
             if gc_status.index_data_bytes > 0 {
                 let comp_per = (gc_status.disk_bytes as f64 * 100.)/gc_status.index_data_bytes as f64;
-                worker.log(&format!("On-Disk usage: {} ({:.2}%)", HumanByte::from(gc_status.disk_bytes), comp_per));
+                crate::task_log!(
+                    worker,
+                    "On-Disk usage: {} ({:.2}%)",
+                    HumanByte::from(gc_status.disk_bytes),
+                    comp_per,
+                );
             }
 
-            worker.log(&format!("On-Disk chunks: {}", gc_status.disk_chunks));
+            crate::task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks);
 
             if gc_status.disk_chunks > 0 {
                 let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64);
-                worker.log(&format!("Average chunk size: {}", HumanByte::from(avg_chunk)));
+                crate::task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk));
             }
 
             *self.last_gc_status.lock().unwrap() = gc_status;
index 0c55305fafd3ae9ebb86e581dd7d553e40f2fb37..f2c38eae4ac7b2b0b5bfe5dcfa6d3a47313474f6 100644 (file)
@@ -6,9 +6,7 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 
 use crate::{
-    server::WorkerTask,
     api2::types::*,
-    tools::ParallelHandler,
     backup::{
         DataStore,
         DataBlob,
@@ -21,6 +19,10 @@ use crate::{
         ArchiveType,
         archive_type,
     },
+    server::UPID,
+    task::TaskState,
+    task_log,
+    tools::ParallelHandler,
 };
 
 fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
@@ -51,7 +53,7 @@ fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInf
 fn rename_corrupted_chunk(
     datastore: Arc<DataStore>,
     digest: &[u8;32],
-    worker: Arc<WorkerTask>,
+    worker: &dyn TaskState,
 ) {
     let (path, digest_str) = datastore.chunk_path(digest);
 
@@ -64,12 +66,12 @@ fn rename_corrupted_chunk(
 
     match std::fs::rename(&path, &new_path) {
         Ok(_) => {
-            worker.log(format!("corrupted chunk renamed to {:?}", &new_path));
+            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
         },
         Err(err) => {
             match err.kind() {
                 std::io::ErrorKind::NotFound => { /* ignored */ },
-                _ => worker.log(format!("could not rename corrupted chunk {:?} - {}", &path, err))
+                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
             }
         }
     };
@@ -81,7 +83,7 @@ fn verify_index_chunks(
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     crypt_mode: CryptMode,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let errors = Arc::new(AtomicUsize::new(0));
@@ -103,7 +105,7 @@ fn verify_index_chunks(
             let chunk_crypt_mode = match chunk.crypt_mode() {
                 Err(err) => {
                     corrupt_chunks2.lock().unwrap().insert(digest);
-                    worker2.log(format!("can't verify chunk, unknown CryptMode - {}", err));
+                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                     errors2.fetch_add(1, Ordering::SeqCst);
                     return Ok(());
                 },
@@ -111,19 +113,20 @@ fn verify_index_chunks(
             };
 
             if chunk_crypt_mode != crypt_mode {
-                worker2.log(format!(
+                task_log!(
+                    worker2,
                     "chunk CryptMode {:?} does not match index CryptMode {:?}",
                     chunk_crypt_mode,
                     crypt_mode
-                ));
+                );
                 errors2.fetch_add(1, Ordering::SeqCst);
             }
 
             if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                 corrupt_chunks2.lock().unwrap().insert(digest);
-                worker2.log(format!("{}", err));
+                task_log!(worker2, "{}", err);
                 errors2.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore2.clone(), &digest, worker2.clone());
+                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
             } else {
                 verified_chunks2.lock().unwrap().insert(digest);
             }
@@ -134,7 +137,7 @@ fn verify_index_chunks(
 
     for pos in 0..index.index_count() {
 
-        worker.fail_on_abort()?;
+        worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         let info = index.chunk_info(pos).unwrap();
@@ -146,7 +149,7 @@ fn verify_index_chunks(
 
         if corrupt_chunks.lock().unwrap().contains(&info.digest) {
             let digest_str = proxmox::tools::digest_to_hex(&info.digest);
-            worker.log(format!("chunk {} was marked as corrupt", digest_str));
+            task_log!(worker, "chunk {} was marked as corrupt", digest_str);
             errors.fetch_add(1, Ordering::SeqCst);
             continue;
         }
@@ -154,9 +157,9 @@ fn verify_index_chunks(
         match datastore.load_chunk(&info.digest) {
             Err(err) => {
                 corrupt_chunks.lock().unwrap().insert(info.digest);
-                worker.log(format!("can't verify chunk, load failed - {}", err));
+                task_log!(worker, "can't verify chunk, load failed - {}", err);
                 errors.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone());
+                rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
                 continue;
             }
             Ok(chunk) => {
@@ -179,8 +182,16 @@ fn verify_index_chunks(
 
     let error_count = errors.load(Ordering::SeqCst);
 
-    worker.log(format!("  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
-                       read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));
+    task_log!(
+        worker,
+        "  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
+        read_bytes_mib,
+        decoded_bytes_mib,
+        elapsed,
+        read_speed,
+        decode_speed,
+        error_count,
+    );
 
     if errors.load(Ordering::SeqCst) > 0 {
         bail!("chunks could not be verified");
@@ -195,7 +206,7 @@ fn verify_fixed_index(
     info: &FileInfo,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
@@ -212,7 +223,14 @@ fn verify_fixed_index(
         bail!("wrong index checksum");
     }
 
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(
+        datastore,
+        Box::new(index),
+        verified_chunks,
+        corrupt_chunks,
+        info.chunk_crypt_mode(),
+        worker,
+    )
 }
 
 fn verify_dynamic_index(
@@ -221,7 +239,7 @@ fn verify_dynamic_index(
     info: &FileInfo,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
@@ -238,7 +256,14 @@ fn verify_dynamic_index(
         bail!("wrong index checksum");
     }
 
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(
+        datastore,
+        Box::new(index),
+        verified_chunks,
+        corrupt_chunks,
+        info.chunk_crypt_mode(),
+        worker,
+    )
 }
 
 /// Verify a single backup snapshot
@@ -255,25 +280,32 @@ pub fn verify_backup_dir(
     backup_dir: &BackupDir,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: UPID,
 ) -> Result<bool, Error> {
 
     let mut manifest = match datastore.load_manifest(&backup_dir) {
         Ok((manifest, _)) => manifest,
         Err(err) => {
-            worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
+            task_log!(
+                worker,
+                "verify {}:{} - manifest load error: {}",
+                datastore.name(),
+                backup_dir,
+                err,
+            );
             return Ok(false);
         }
     };
 
-    worker.log(format!("verify {}:{}", datastore.name(), backup_dir));
+    task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);
 
     let mut error_count = 0;
 
     let mut verify_result = VerifyState::Ok;
     for info in manifest.files() {
         let result = proxmox::try_block!({
-            worker.log(format!("  check {}", info.filename));
+            task_log!(worker, "  check {}", info.filename);
             match archive_type(&info.filename)? {
                 ArchiveType::FixedIndex =>
                     verify_fixed_index(
@@ -297,11 +329,18 @@ pub fn verify_backup_dir(
             }
         });
 
-        worker.fail_on_abort()?;
+        worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         if let Err(err) = result {
-            worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
+            task_log!(
+                worker,
+                "verify {}:{}/{} failed: {}",
+                datastore.name(),
+                backup_dir,
+                info.filename,
+                err,
+            );
             error_count += 1;
             verify_result = VerifyState::Failed;
         }
@@ -310,7 +349,7 @@ pub fn verify_backup_dir(
 
     let verify_state = SnapshotVerifyState {
         state: verify_result,
-        upid: worker.upid().clone(),
+        upid,
     };
     manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
     datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
@@ -332,19 +371,26 @@ pub fn verify_backup_group(
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     progress: Option<(usize, usize)>, // (done, snapshot_count)
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: &UPID,
 ) -> Result<(usize, Vec<String>), Error> {
 
     let mut errors = Vec::new();
     let mut list = match group.list_backups(&datastore.base_path()) {
         Ok(list) => list,
         Err(err) => {
-            worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
+            task_log!(
+                worker,
+                "verify group {}:{} - unable to list backups: {}",
+                datastore.name(),
+                group,
+                err,
+            );
             return Ok((0, errors));
         }
     };
 
-    worker.log(format!("verify group {}:{}", datastore.name(), group));
+    task_log!(worker, "verify group {}:{}", datastore.name(), group);
 
     let (done, snapshot_count) = progress.unwrap_or((0, list.len()));
 
@@ -352,13 +398,26 @@ pub fn verify_backup_group(
     BackupInfo::sort_list(&mut list, false); // newest first
     for info in list {
         count += 1;
-        if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
+        if !verify_backup_dir(
+            datastore.clone(),
+            &info.backup_dir,
+            verified_chunks.clone(),
+            corrupt_chunks.clone(),
+            worker.clone(),
+            upid.clone(),
+        )? {
             errors.push(info.backup_dir.to_string());
         }
         if snapshot_count != 0 {
             let pos = done + count;
             let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
-            worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count));
+            task_log!(
+                worker,
+                "percentage done: {:.2}% ({} of {} snapshots)",
+                percentage,
+                pos,
+                snapshot_count,
+            );
         }
     }
 
@@ -372,8 +431,11 @@ pub fn verify_backup_group(
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
-
+pub fn verify_all_backups(
+    datastore: Arc<DataStore>,
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: &UPID,
+) -> Result<Vec<String>, Error> {
     let mut errors = Vec::new();
 
     let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
@@ -382,7 +444,12 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
             .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
             .collect::<Vec<BackupGroup>>(),
         Err(err) => {
-            worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
+            task_log!(
+                worker,
+                "verify datastore {} - unable to list backups: {}",
+                datastore.name(),
+                err,
+            );
             return Ok(errors);
         }
     };
@@ -400,7 +467,7 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
     // start with 64 chunks since we assume there are few corrupt ones
     let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
 
-    worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count));
+    task_log!(worker, "verify datastore {} ({} snapshots)", datastore.name(), snapshot_count);
 
     let mut done = 0;
     for group in list {
@@ -411,6 +478,7 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
             corrupt_chunks.clone(),
             Some((done, snapshot_count)),
             worker.clone(),
+            upid,
         )?;
         errors.append(&mut group_errors);
 
index 67fbc54174ef54da65ea593eae5d304324d7ea10..b28ac035594b945ed856afe2041b792f67245ebd 100644 (file)
@@ -306,7 +306,7 @@ async fn schedule_datastore_garbage_collection() {
                 worker.log(format!("starting garbage collection on store {}", store));
                 worker.log(format!("task triggered by schedule '{}'", event_str));
 
-                let result = datastore.garbage_collection(&worker);
+                let result = datastore.garbage_collection(&*worker, worker.upid());
 
                 let status = worker.create_state(&result);
 
@@ -557,7 +557,8 @@ async fn schedule_datastore_verification() {
                 worker.log(format!("starting verification on store {}", store2));
                 worker.log(format!("task triggered by schedule '{}'", event_str));
                 let result = try_block!({
-                    let failed_dirs = verify_all_backups(datastore, worker.clone())?;
+                    let failed_dirs =
+                        verify_all_backups(datastore, worker.clone(), worker.upid())?;
                     if failed_dirs.len() > 0 {
                         worker.log("Failed to verify following snapshots:");
                         for dir in failed_dirs {