]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/backup/verify.rs
use new proxmox-sys crate
[proxmox-backup.git] / src / backup / verify.rs
index 2f196bc0290aafa27aefc0acdd4a31f80bc9b9c7..a1d8a838b49a2a71bfd251f33472ca4c80f1b2c4 100644 (file)
@@ -1,21 +1,50 @@
+use nix::dir::Dir;
 use std::collections::HashSet;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
-use std::sync::atomic::{Ordering, AtomicUsize};
 use std::time::Instant;
 
 use anyhow::{bail, format_err, Error};
 
-use crate::server::WorkerTask;
-use crate::api2::types::*;
+use proxmox_sys::{task_log, worker_task_context::WorkerTaskContext};
 
-use super::{
-    DataStore, DataBlob, BackupGroup, BackupDir, BackupInfo, IndexFile,
-    CryptMode,
-    FileInfo, ArchiveType, archive_type,
-};
+use pbs_api_types::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState};
+use pbs_datastore::{DataStore, DataBlob, StoreProgress};
+use pbs_datastore::backup_info::{BackupGroup, BackupDir, BackupInfo};
+use pbs_datastore::index::IndexFile;
+use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
+use pbs_tools::fs::lock_dir_noblock_shared;
 
-fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
+use crate::tools::ParallelHandler;
 
+/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
+/// already been verified or detected as corrupt.
+pub struct VerifyWorker {
+    worker: Arc<dyn WorkerTaskContext>,
+    datastore: Arc<DataStore>,
+    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+}
+
+impl VerifyWorker {
+    /// Creates a new VerifyWorker for a given task worker and datastore.
+    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
+        Self {
+            worker,
+            datastore,
+            // start with 16k chunks == up to 64G data
+            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
+            // start with 64 chunks since we assume there are few corrupt ones
+            corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
+        }
+    }
+}
+
+fn verify_blob(
+    datastore: Arc<DataStore>,
+    backup_dir: &BackupDir,
+    info: &FileInfo,
+) -> Result<(), Error> {
     let blob = datastore.load_blob(backup_dir, &info.filename)?;
 
     let raw_size = blob.raw_size();
@@ -39,131 +68,171 @@ fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInf
     }
 }
 
-// We use a separate thread to read/load chunks, so that we can do
-// load and verify in parallel to increase performance.
-fn chunk_reader_thread(
+fn rename_corrupted_chunk(
     datastore: Arc<DataStore>,
-    index: Box<dyn IndexFile + Send>,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    errors: Arc<AtomicUsize>,
-    worker: Arc<WorkerTask>,
-) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> {
-
-    let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks
+    digest: &[u8;32],
+    worker: &dyn WorkerTaskContext,
+) {
+    let (path, digest_str) = datastore.chunk_path(digest);
 
-    std::thread::spawn(move|| {
-        for pos in 0..index.index_count() {
-            let info = index.chunk_info(pos).unwrap();
-            let size = info.range.end - info.range.start;
-
-            if verified_chunks.lock().unwrap().contains(&info.digest) {
-                continue; // already verified
-            }
-
-            if corrupt_chunks.lock().unwrap().contains(&info.digest) {
-                let digest_str = proxmox::tools::digest_to_hex(&info.digest);
-                worker.log(format!("chunk {} was marked as corrupt", digest_str));
-                errors.fetch_add(1, Ordering::SeqCst);
-                continue;
-            }
+    let mut counter = 0;
+    let mut new_path = path.clone();
+    loop {
+        new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
+        if new_path.exists() && counter < 9 {
+            counter += 1;
+        } else {
+            break;
+        }
+    }
 
-            match datastore.load_chunk(&info.digest) {
-                Err(err) => {
-                    corrupt_chunks.lock().unwrap().insert(info.digest);
-                    worker.log(format!("can't verify chunk, load failed - {}", err));
-                    errors.fetch_add(1, Ordering::SeqCst);
-                    continue;
-                }
-                Ok(chunk) => {
-                    if sender.send((chunk, info.digest, size)).is_err() {
-                        break; // receiver gone - simply stop
-                    }
-                }
+    match std::fs::rename(&path, &new_path) {
+        Ok(_) => {
+            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
+        },
+        Err(err) => {
+            match err.kind() {
+                std::io::ErrorKind::NotFound => { /* ignored */ },
+                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
             }
         }
-    });
-
-    receiver
+    };
 }
 
 fn verify_index_chunks(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     index: Box<dyn IndexFile + Send>,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     crypt_mode: CryptMode,
-    worker: Arc<WorkerTask>,
 ) -> Result<(), Error> {
-
     let errors = Arc::new(AtomicUsize::new(0));
 
     let start_time = Instant::now();
 
-    let chunk_channel = chunk_reader_thread(
-        datastore,
-        index,
-        verified_chunks.clone(),
-        corrupt_chunks.clone(),
-        errors.clone(),
-        worker.clone(),
-    );
-
     let mut read_bytes = 0;
     let mut decoded_bytes = 0;
 
-    loop {
-
-        worker.fail_on_abort()?;
-
-        let (chunk, digest, size) = match chunk_channel.recv() {
-            Ok(tuple) => tuple,
-            Err(std::sync::mpsc::RecvError) => break,
-        };
+    let worker2 = Arc::clone(&verify_worker.worker);
+    let datastore2 = Arc::clone(&verify_worker.datastore);
+    let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
+    let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
+    let errors2 = Arc::clone(&errors);
+
+    let decoder_pool = ParallelHandler::new(
+        "verify chunk decoder",
+        4,
+        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
+            let chunk_crypt_mode = match chunk.crypt_mode() {
+                Err(err) => {
+                    corrupt_chunks2.lock().unwrap().insert(digest);
+                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
+                    errors2.fetch_add(1, Ordering::SeqCst);
+                    return Ok(());
+                },
+                Ok(mode) => mode,
+            };
+
+            if chunk_crypt_mode != crypt_mode {
+                task_log!(
+                    worker2,
+                    "chunk CryptMode {:?} does not match index CryptMode {:?}",
+                    chunk_crypt_mode,
+                    crypt_mode
+                );
+                errors2.fetch_add(1, Ordering::SeqCst);
+            }
 
-        read_bytes += chunk.raw_size();
-        decoded_bytes += size;
+            if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
+                corrupt_chunks2.lock().unwrap().insert(digest);
+                task_log!(worker2, "{}", err);
+                errors2.fetch_add(1, Ordering::SeqCst);
+                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
+            } else {
+                verified_chunks2.lock().unwrap().insert(digest);
+            }
 
-        let chunk_crypt_mode = match chunk.crypt_mode() {
-            Err(err) => {
-                corrupt_chunks.lock().unwrap().insert(digest);
-                worker.log(format!("can't verify chunk, unknown CryptMode - {}", err));
-                errors.fetch_add(1, Ordering::SeqCst);
-                continue;
-            },
-            Ok(mode) => mode,
-        };
-
-        if chunk_crypt_mode != crypt_mode {
-            worker.log(format!(
-                "chunk CryptMode {:?} does not match index CryptMode {:?}",
-                chunk_crypt_mode,
-                crypt_mode
-            ));
-            errors.fetch_add(1, Ordering::SeqCst);
+            Ok(())
         }
+    );
 
-        if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
-            corrupt_chunks.lock().unwrap().insert(digest);
-            worker.log(format!("{}", err));
+    let skip_chunk = |digest: &[u8; 32]| -> bool {
+        if verify_worker.verified_chunks.lock().unwrap().contains(digest) {
+            true
+        } else if verify_worker.corrupt_chunks.lock().unwrap().contains(digest) {
+            let digest_str = proxmox::tools::digest_to_hex(digest);
+            task_log!(verify_worker.worker, "chunk {} was marked as corrupt", digest_str);
             errors.fetch_add(1, Ordering::SeqCst);
+            true
         } else {
-            verified_chunks.lock().unwrap().insert(digest);
+            false
+        }
+    };
+
+    let check_abort = |pos: usize| -> Result<(), Error> {
+        if pos & 1023 == 0 {
+            verify_worker.worker.check_abort()?;
+            verify_worker.worker.fail_on_shutdown()?;
+        }
+        Ok(())
+    };
+
+    let chunk_list =
+        verify_worker
+            .datastore
+            .get_chunks_in_order(&index, skip_chunk, check_abort)?;
+
+    for (pos, _) in chunk_list {
+        verify_worker.worker.check_abort()?;
+        verify_worker.worker.fail_on_shutdown()?;
+
+        let info = index.chunk_info(pos).unwrap();
+
+        // we must always recheck this here, since the parallel worker below may alter it!
+        if skip_chunk(&info.digest) {
+            continue; // already verified or marked corrupt
+        }
+
+        match verify_worker.datastore.load_chunk(&info.digest) {
+            Err(err) => {
+                verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
+                task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
+                errors.fetch_add(1, Ordering::SeqCst);
+                rename_corrupted_chunk(
+                    verify_worker.datastore.clone(),
+                    &info.digest,
+                    &verify_worker.worker,
+                );
+            }
+            Ok(chunk) => {
+                let size = info.size();
+                read_bytes += chunk.raw_size();
+                decoder_pool.send((chunk, info.digest, size))?;
+                decoded_bytes += size;
+            }
         }
     }
 
+    decoder_pool.complete()?;
+
     let elapsed = start_time.elapsed().as_secs_f64();
 
-    let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
-    let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);
+    let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
+    let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
 
-    let read_speed = read_bytes_mib/elapsed;
-    let decode_speed = decoded_bytes_mib/elapsed;
+    let read_speed = read_bytes_mib / elapsed;
+    let decode_speed = decoded_bytes_mib / elapsed;
 
     let error_count = errors.load(Ordering::SeqCst);
 
-    worker.log(format!("  verified {:.2}/{:.2} Mib in {:.2} seconds, speed {:.2}/{:.2} Mib/s ({} errors)",
-                       read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));
+    task_log!(
+        verify_worker.worker,
+        "  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
+        read_bytes_mib,
+        decoded_bytes_mib,
+        elapsed,
+        read_speed,
+        decode_speed,
+        error_count,
+    );
 
     if errors.load(Ordering::SeqCst) > 0 {
         bail!("chunks could not be verified");
@@ -173,18 +242,14 @@ fn verify_index_chunks(
 }
 
 fn verify_fixed_index(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
     info: &FileInfo,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
 ) -> Result<(), Error> {
-
     let mut path = backup_dir.relative_path();
     path.push(&info.filename);
 
-    let index = datastore.open_fixed_reader(&path)?;
+    let index = verify_worker.datastore.open_fixed_reader(&path)?;
 
     let (csum, size) = index.compute_csum();
     if size != info.size {
@@ -195,22 +260,18 @@ fn verify_fixed_index(
         bail!("wrong index checksum");
     }
 
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
 }
 
 fn verify_dynamic_index(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
     info: &FileInfo,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
 ) -> Result<(), Error> {
-
     let mut path = backup_dir.relative_path();
     path.push(&info.filename);
 
-    let index = datastore.open_dynamic_reader(&path)?;
+    let index = verify_worker.datastore.open_dynamic_reader(&path)?;
 
     let (csum, size) = index.compute_csum();
     if size != info.size {
@@ -221,7 +282,7 @@ fn verify_dynamic_index(
         bail!("wrong index checksum");
     }
 
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
 }
 
 /// Verify a single backup snapshot
@@ -234,70 +295,112 @@ fn verify_dynamic_index(
 /// - Ok(false) if there were verification errors
 /// - Err(_) if task was aborted
 pub fn verify_backup_dir(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>
+    upid: UPID,
+    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<bool, Error> {
+    let snap_lock = lock_dir_noblock_shared(
+        &verify_worker.datastore.snapshot_path(&backup_dir),
+        "snapshot",
+        "locked by another operation",
+    );
+    match snap_lock {
+        Ok(snap_lock) => {
+            verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
+        }
+        Err(err) => {
+            task_log!(
+                verify_worker.worker,
+                "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
+                verify_worker.datastore.name(),
+                backup_dir,
+                err,
+            );
+            Ok(true)
+        }
+    }
+}
 
-    let mut manifest = match datastore.load_manifest(&backup_dir) {
+/// See verify_backup_dir
+pub fn verify_backup_dir_with_lock(
+    verify_worker: &VerifyWorker,
+    backup_dir: &BackupDir,
+    upid: UPID,
+    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
+    _snap_lock: Dir,
+) -> Result<bool, Error> {
+    let manifest = match verify_worker.datastore.load_manifest(&backup_dir) {
         Ok((manifest, _)) => manifest,
         Err(err) => {
-            worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
+            task_log!(
+                verify_worker.worker,
+                "verify {}:{} - manifest load error: {}",
+                verify_worker.datastore.name(),
+                backup_dir,
+                err,
+            );
             return Ok(false);
         }
     };
 
-    worker.log(format!("verify {}:{}", datastore.name(), backup_dir));
+    if let Some(filter) = filter {
+        if !filter(&manifest) {
+            task_log!(
+                verify_worker.worker,
+                "SKIPPED: verify {}:{} (recently verified)",
+                verify_worker.datastore.name(),
+                backup_dir,
+            );
+            return Ok(true);
+        }
+    }
+
+    task_log!(verify_worker.worker, "verify {}:{}", verify_worker.datastore.name(), backup_dir);
 
     let mut error_count = 0;
 
-    let mut verify_result = "ok";
+    let mut verify_result = VerifyState::Ok;
     for info in manifest.files() {
-        let result = proxmox::try_block!({
-            worker.log(format!("  check {}", info.filename));
+        let result = proxmox_lang::try_block!({
+            task_log!(verify_worker.worker, "  check {}", info.filename);
             match archive_type(&info.filename)? {
-                ArchiveType::FixedIndex =>
-                    verify_fixed_index(
-                        datastore.clone(),
-                        &backup_dir,
-                        info,
-                        verified_chunks.clone(),
-                        corrupt_chunks.clone(),
-                        worker.clone(),
-                    ),
-                ArchiveType::DynamicIndex =>
-                    verify_dynamic_index(
-                        datastore.clone(),
-                        &backup_dir,
-                        info,
-                        verified_chunks.clone(),
-                        corrupt_chunks.clone(),
-                        worker.clone(),
-                    ),
-                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
+                ArchiveType::FixedIndex => verify_fixed_index(verify_worker, &backup_dir, info),
+                ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, &backup_dir, info),
+                ArchiveType::Blob => {
+                    verify_blob(verify_worker.datastore.clone(), &backup_dir, info)
+                }
             }
         });
 
-        worker.fail_on_abort()?;
+        verify_worker.worker.check_abort()?;
+        verify_worker.worker.fail_on_shutdown()?;
 
         if let Err(err) = result {
-            worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
+            task_log!(
+                verify_worker.worker,
+                "verify {}:{}/{} failed: {}",
+                verify_worker.datastore.name(),
+                backup_dir,
+                info.filename,
+                err,
+            );
             error_count += 1;
-            verify_result = "failed";
+            verify_result = VerifyState::Failed;
         }
-
     }
 
     let verify_state = SnapshotVerifyState {
-        state: verify_result.to_string(),
-        upid: worker.upid().clone(),
+        state: verify_result,
+        upid,
     };
-    manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
-    datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
-        .map_err(|err| format_err!("unable to store manifest blob - {}", err))?;
-
+    let verify_state = serde_json::to_value(verify_state)?;
+    verify_worker
+        .datastore
+        .update_manifest(&backup_dir, |manifest| {
+            manifest.unprotected["verify_state"] = verify_state;
+        })
+        .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
 
     Ok(error_count == 0)
 }
@@ -310,92 +413,156 @@ pub fn verify_backup_dir(
 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
 pub fn verify_backup_group(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     group: &BackupGroup,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    progress: Option<(usize, usize)>, // (done, snapshot_count)
-    worker: Arc<WorkerTask>,
-) -> Result<(usize, Vec<String>), Error> {
-
+    progress: &mut StoreProgress,
+    upid: &UPID,
+    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
+) -> Result<Vec<String>, Error> {
     let mut errors = Vec::new();
-    let mut list = match group.list_backups(&datastore.base_path()) {
+    let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
         Ok(list) => list,
         Err(err) => {
-            worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
-            return Ok((0, errors));
+            task_log!(
+                verify_worker.worker,
+                "verify group {}:{} - unable to list backups: {}",
+                verify_worker.datastore.name(),
+                group,
+                err,
+            );
+            return Ok(errors);
         }
     };
 
-    worker.log(format!("verify group {}:{}", datastore.name(), group));
+    let snapshot_count = list.len();
+    task_log!(
+        verify_worker.worker,
+        "verify group {}:{} ({} snapshots)",
+        verify_worker.datastore.name(),
+        group,
+        snapshot_count
+    );
 
-    let (done, snapshot_count) = progress.unwrap_or((0, list.len()));
+    progress.group_snapshots = snapshot_count as u64;
 
-    let mut count = 0;
     BackupInfo::sort_list(&mut list, false); // newest first
-    for info in list {
-        count += 1;
-        if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
+    for (pos, info) in list.into_iter().enumerate() {
+        if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
             errors.push(info.backup_dir.to_string());
         }
-        if snapshot_count != 0 {
-            let pos = done + count;
-            let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
-            worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count));
-        }
+        progress.done_snapshots = pos as u64 + 1;
+        task_log!(verify_worker.worker, "percentage done: {}", progress);
     }
 
-    Ok((count, errors))
+    Ok(errors)
 }
 
-/// Verify all backups inside a datastore
+/// Verify all (owned) backups inside a datastore
 ///
 /// Errors are logged to the worker log.
 ///
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
-
+pub fn verify_all_backups(
+    verify_worker: &VerifyWorker,
+    upid: &UPID,
+    owner: Option<Authid>,
+    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
+) -> Result<Vec<String>, Error> {
     let mut errors = Vec::new();
+    let worker = Arc::clone(&verify_worker.worker);
 
-    let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
-        Ok(list) => list,
+    task_log!(worker, "verify datastore {}", verify_worker.datastore.name());
+
+    if let Some(owner) = &owner {
+        task_log!(worker, "limiting to backups owned by {}", owner);
+    }
+
+    let filter_by_owner = |group: &BackupGroup| {
+        match (verify_worker.datastore.get_owner(group), &owner) {
+            (Ok(ref group_owner), Some(owner)) => {
+                group_owner == owner
+                    || (group_owner.is_token()
+                        && !owner.is_token()
+                        && group_owner.user() == owner.user())
+            },
+            (Ok(_), None) => true,
+            (Err(err), Some(_)) => {
+                // intentionally not in task log
+                // the task user might not be allowed to see this group!
+                println!("Failed to get owner of group '{}' - {}", group, err);
+                false
+            },
+            (Err(err), None) => {
+                // we don't filter by owner, but we want to log the error
+                task_log!(
+                    worker,
+                    "Failed to get owner of group '{}' - {}",
+                    group,
+                    err,
+                );
+                errors.push(group.to_string());
+                true
+            },
+        }
+    };
+
+    let mut list = match BackupInfo::list_backup_groups(&verify_worker.datastore.base_path()) {
+        Ok(list) => list
+            .into_iter()
+            .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
+            .filter(filter_by_owner)
+            .collect::<Vec<BackupGroup>>(),
         Err(err) => {
-            worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
+            task_log!(worker, "unable to list backups: {}", err,);
             return Ok(errors);
         }
     };
 
     list.sort_unstable();
 
-    let mut snapshot_count = 0;
-    for group in list.iter() {
-        snapshot_count += group.list_backups(&datastore.base_path())?.len();
-    }
-
-    // start with 16384 chunks (up to 65GB)
-    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
+    let group_count = list.len();
+    task_log!(worker, "found {} groups", group_count);
 
-    // start with 64 chunks since we assume there are few corrupt ones
-    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
+    let mut progress = StoreProgress::new(group_count as u64);
 
-    worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count));
+    for (pos, group) in list.into_iter().enumerate() {
+        progress.done_groups = pos as u64;
+        progress.done_snapshots = 0;
+        progress.group_snapshots = 0;
 
-    let mut done = 0;
-    for group in list {
-        let (count, mut group_errors) = verify_backup_group(
-            datastore.clone(),
-            &group,
-            verified_chunks.clone(),
-            corrupt_chunks.clone(),
-            Some((done, snapshot_count)),
-            worker.clone(),
-        )?;
+        let mut group_errors =
+            verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
         errors.append(&mut group_errors);
-
-        done += count;
     }
 
     Ok(errors)
 }
+
+/// Filter for the verification of snapshots
+pub fn verify_filter(
+    ignore_verified_snapshots: bool,
+    outdated_after: Option<i64>,
+    manifest: &BackupManifest,
+) -> bool {
+    if !ignore_verified_snapshots {
+        return true;
+    }
+
+    let raw_verify_state = manifest.unprotected["verify_state"].clone();
+    match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
+        Err(_) => true, // no last verification, always include
+        Ok(last_verify) => {
+            match outdated_after {
+                None => false, // never re-verify if ignored and no max age
+                Some(max_age) => {
+                    let now = proxmox_time::epoch_i64();
+                    let days_since_last_verify = (now - last_verify.upid.starttime) / 86400;
+
+                    days_since_last_verify > max_age
+                }
+            }
+        }
+    }
+}