+use nix::dir::Dir;
use std::collections::HashSet;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
-use std::sync::atomic::{Ordering, AtomicUsize};
use std::time::Instant;
use anyhow::{bail, format_err, Error};
-use crate::server::WorkerTask;
-use crate::api2::types::*;
+use proxmox_sys::{task_log, worker_task_context::WorkerTaskContext};
-use super::{
- DataStore, DataBlob, BackupGroup, BackupDir, BackupInfo, IndexFile,
- CryptMode,
- FileInfo, ArchiveType, archive_type,
-};
+use pbs_api_types::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState};
+use pbs_datastore::{DataStore, DataBlob, StoreProgress};
+use pbs_datastore::backup_info::{BackupGroup, BackupDir, BackupInfo};
+use pbs_datastore::index::IndexFile;
+use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
+use pbs_tools::fs::lock_dir_noblock_shared;
-fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
+use crate::tools::ParallelHandler;
+/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
+/// already been verified or detected as corrupt.
+pub struct VerifyWorker {
+ worker: Arc<dyn WorkerTaskContext>,
+ datastore: Arc<DataStore>,
+ verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+}
+
+impl VerifyWorker {
+ /// Creates a new VerifyWorker for a given task worker and datastore.
+ pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
+ Self {
+ worker,
+ datastore,
+ // start with 16k chunks == up to 64G data
+ verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
+ // start with 64 chunks since we assume there are few corrupt ones
+ corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
+ }
+ }
+}
+
+fn verify_blob(
+ datastore: Arc<DataStore>,
+ backup_dir: &BackupDir,
+ info: &FileInfo,
+) -> Result<(), Error> {
let blob = datastore.load_blob(backup_dir, &info.filename)?;
let raw_size = blob.raw_size();
}
}
-// We use a separate thread to read/load chunks, so that we can do
-// load and verify in parallel to increase performance.
-fn chunk_reader_thread(
+fn rename_corrupted_chunk(
datastore: Arc<DataStore>,
- index: Box<dyn IndexFile + Send>,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- errors: Arc<AtomicUsize>,
- worker: Arc<WorkerTask>,
-) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> {
-
- let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks
+ digest: &[u8;32],
+ worker: &dyn WorkerTaskContext,
+) {
+ let (path, digest_str) = datastore.chunk_path(digest);
- std::thread::spawn(move|| {
- for pos in 0..index.index_count() {
- let info = index.chunk_info(pos).unwrap();
- let size = info.range.end - info.range.start;
-
- if verified_chunks.lock().unwrap().contains(&info.digest) {
- continue; // already verified
- }
-
- if corrupt_chunks.lock().unwrap().contains(&info.digest) {
- let digest_str = proxmox::tools::digest_to_hex(&info.digest);
- worker.log(format!("chunk {} was marked as corrupt", digest_str));
- errors.fetch_add(1, Ordering::SeqCst);
- continue;
- }
+ let mut counter = 0;
+ let mut new_path = path.clone();
+ loop {
+ new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
+ if new_path.exists() && counter < 9 {
+ counter += 1;
+ } else {
+ break;
+ }
+ }
- match datastore.load_chunk(&info.digest) {
- Err(err) => {
- corrupt_chunks.lock().unwrap().insert(info.digest);
- worker.log(format!("can't verify chunk, load failed - {}", err));
- errors.fetch_add(1, Ordering::SeqCst);
- continue;
- }
- Ok(chunk) => {
- if sender.send((chunk, info.digest, size)).is_err() {
- break; // receiver gone - simply stop
- }
- }
+ match std::fs::rename(&path, &new_path) {
+ Ok(_) => {
+ task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
+ },
+ Err(err) => {
+ match err.kind() {
+ std::io::ErrorKind::NotFound => { /* ignored */ },
+ _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
}
}
- });
-
- receiver
+ };
}
fn verify_index_chunks(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
index: Box<dyn IndexFile + Send>,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
crypt_mode: CryptMode,
- worker: Arc<WorkerTask>,
) -> Result<(), Error> {
-
let errors = Arc::new(AtomicUsize::new(0));
let start_time = Instant::now();
- let chunk_channel = chunk_reader_thread(
- datastore,
- index,
- verified_chunks.clone(),
- corrupt_chunks.clone(),
- errors.clone(),
- worker.clone(),
- );
-
let mut read_bytes = 0;
let mut decoded_bytes = 0;
- loop {
-
- worker.fail_on_abort()?;
-
- let (chunk, digest, size) = match chunk_channel.recv() {
- Ok(tuple) => tuple,
- Err(std::sync::mpsc::RecvError) => break,
- };
+ let worker2 = Arc::clone(&verify_worker.worker);
+ let datastore2 = Arc::clone(&verify_worker.datastore);
+ let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
+ let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
+ let errors2 = Arc::clone(&errors);
+
+ let decoder_pool = ParallelHandler::new(
+ "verify chunk decoder",
+ 4,
+ move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
+ let chunk_crypt_mode = match chunk.crypt_mode() {
+ Err(err) => {
+ corrupt_chunks2.lock().unwrap().insert(digest);
+ task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
+ errors2.fetch_add(1, Ordering::SeqCst);
+ return Ok(());
+ },
+ Ok(mode) => mode,
+ };
+
+ if chunk_crypt_mode != crypt_mode {
+ task_log!(
+ worker2,
+ "chunk CryptMode {:?} does not match index CryptMode {:?}",
+ chunk_crypt_mode,
+ crypt_mode
+ );
+ errors2.fetch_add(1, Ordering::SeqCst);
+ }
- read_bytes += chunk.raw_size();
- decoded_bytes += size;
+ if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
+ corrupt_chunks2.lock().unwrap().insert(digest);
+ task_log!(worker2, "{}", err);
+ errors2.fetch_add(1, Ordering::SeqCst);
+ rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
+ } else {
+ verified_chunks2.lock().unwrap().insert(digest);
+ }
- let chunk_crypt_mode = match chunk.crypt_mode() {
- Err(err) => {
- corrupt_chunks.lock().unwrap().insert(digest);
- worker.log(format!("can't verify chunk, unknown CryptMode - {}", err));
- errors.fetch_add(1, Ordering::SeqCst);
- continue;
- },
- Ok(mode) => mode,
- };
-
- if chunk_crypt_mode != crypt_mode {
- worker.log(format!(
- "chunk CryptMode {:?} does not match index CryptMode {:?}",
- chunk_crypt_mode,
- crypt_mode
- ));
- errors.fetch_add(1, Ordering::SeqCst);
+ Ok(())
}
+ );
- if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
- corrupt_chunks.lock().unwrap().insert(digest);
- worker.log(format!("{}", err));
+ let skip_chunk = |digest: &[u8; 32]| -> bool {
+ if verify_worker.verified_chunks.lock().unwrap().contains(digest) {
+ true
+ } else if verify_worker.corrupt_chunks.lock().unwrap().contains(digest) {
+ let digest_str = proxmox::tools::digest_to_hex(digest);
+ task_log!(verify_worker.worker, "chunk {} was marked as corrupt", digest_str);
errors.fetch_add(1, Ordering::SeqCst);
+ true
} else {
- verified_chunks.lock().unwrap().insert(digest);
+ false
+ }
+ };
+
+ let check_abort = |pos: usize| -> Result<(), Error> {
+ if pos & 1023 == 0 {
+ verify_worker.worker.check_abort()?;
+ verify_worker.worker.fail_on_shutdown()?;
+ }
+ Ok(())
+ };
+
+ let chunk_list =
+ verify_worker
+ .datastore
+ .get_chunks_in_order(&index, skip_chunk, check_abort)?;
+
+ for (pos, _) in chunk_list {
+ verify_worker.worker.check_abort()?;
+ verify_worker.worker.fail_on_shutdown()?;
+
+ let info = index.chunk_info(pos).unwrap();
+
+ // we must always recheck this here, the parallel worker below alter it!
+ if skip_chunk(&info.digest) {
+ continue; // already verified or marked corrupt
+ }
+
+ match verify_worker.datastore.load_chunk(&info.digest) {
+ Err(err) => {
+ verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
+ task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
+ errors.fetch_add(1, Ordering::SeqCst);
+ rename_corrupted_chunk(
+ verify_worker.datastore.clone(),
+ &info.digest,
+ &verify_worker.worker,
+ );
+ }
+ Ok(chunk) => {
+ let size = info.size();
+ read_bytes += chunk.raw_size();
+ decoder_pool.send((chunk, info.digest, size))?;
+ decoded_bytes += size;
+ }
}
}
+ decoder_pool.complete()?;
+
let elapsed = start_time.elapsed().as_secs_f64();
- let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
- let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);
+ let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
+ let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
- let read_speed = read_bytes_mib/elapsed;
- let decode_speed = decoded_bytes_mib/elapsed;
+ let read_speed = read_bytes_mib / elapsed;
+ let decode_speed = decoded_bytes_mib / elapsed;
let error_count = errors.load(Ordering::SeqCst);
- worker.log(format!(" verified {:.2}/{:.2} Mib in {:.2} seconds, speed {:.2}/{:.2} Mib/s ({} errors)",
- read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));
+ task_log!(
+ verify_worker.worker,
+ " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
+ read_bytes_mib,
+ decoded_bytes_mib,
+ elapsed,
+ read_speed,
+ decode_speed,
+ error_count,
+ );
if errors.load(Ordering::SeqCst) > 0 {
bail!("chunks could not be verified");
}
fn verify_fixed_index(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
backup_dir: &BackupDir,
info: &FileInfo,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<WorkerTask>,
) -> Result<(), Error> {
-
let mut path = backup_dir.relative_path();
path.push(&info.filename);
- let index = datastore.open_fixed_reader(&path)?;
+ let index = verify_worker.datastore.open_fixed_reader(&path)?;
let (csum, size) = index.compute_csum();
if size != info.size {
bail!("wrong index checksum");
}
- verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+ verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}
fn verify_dynamic_index(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
backup_dir: &BackupDir,
info: &FileInfo,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<WorkerTask>,
) -> Result<(), Error> {
-
let mut path = backup_dir.relative_path();
path.push(&info.filename);
- let index = datastore.open_dynamic_reader(&path)?;
+ let index = verify_worker.datastore.open_dynamic_reader(&path)?;
let (csum, size) = index.compute_csum();
if size != info.size {
bail!("wrong index checksum");
}
- verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+ verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}
/// Verify a single backup snapshot
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
backup_dir: &BackupDir,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<WorkerTask>
+ upid: UPID,
+ filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
+ let snap_lock = lock_dir_noblock_shared(
+ &verify_worker.datastore.snapshot_path(&backup_dir),
+ "snapshot",
+ "locked by another operation",
+ );
+ match snap_lock {
+ Ok(snap_lock) => {
+ verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
+ }
+ Err(err) => {
+ task_log!(
+ verify_worker.worker,
+ "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
+ verify_worker.datastore.name(),
+ backup_dir,
+ err,
+ );
+ Ok(true)
+ }
+ }
+}
- let mut manifest = match datastore.load_manifest(&backup_dir) {
+/// See verify_backup_dir
+pub fn verify_backup_dir_with_lock(
+ verify_worker: &VerifyWorker,
+ backup_dir: &BackupDir,
+ upid: UPID,
+ filter: Option<&dyn Fn(&BackupManifest) -> bool>,
+ _snap_lock: Dir,
+) -> Result<bool, Error> {
+ let manifest = match verify_worker.datastore.load_manifest(&backup_dir) {
Ok((manifest, _)) => manifest,
Err(err) => {
- worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
+ task_log!(
+ verify_worker.worker,
+ "verify {}:{} - manifest load error: {}",
+ verify_worker.datastore.name(),
+ backup_dir,
+ err,
+ );
return Ok(false);
}
};
- worker.log(format!("verify {}:{}", datastore.name(), backup_dir));
+ if let Some(filter) = filter {
+ if !filter(&manifest) {
+ task_log!(
+ verify_worker.worker,
+ "SKIPPED: verify {}:{} (recently verified)",
+ verify_worker.datastore.name(),
+ backup_dir,
+ );
+ return Ok(true);
+ }
+ }
+
+ task_log!(verify_worker.worker, "verify {}:{}", verify_worker.datastore.name(), backup_dir);
let mut error_count = 0;
- let mut verify_result = "ok";
+ let mut verify_result = VerifyState::Ok;
for info in manifest.files() {
- let result = proxmox::try_block!({
- worker.log(format!(" check {}", info.filename));
+ let result = proxmox_lang::try_block!({
+ task_log!(verify_worker.worker, " check {}", info.filename);
match archive_type(&info.filename)? {
- ArchiveType::FixedIndex =>
- verify_fixed_index(
- datastore.clone(),
- &backup_dir,
- info,
- verified_chunks.clone(),
- corrupt_chunks.clone(),
- worker.clone(),
- ),
- ArchiveType::DynamicIndex =>
- verify_dynamic_index(
- datastore.clone(),
- &backup_dir,
- info,
- verified_chunks.clone(),
- corrupt_chunks.clone(),
- worker.clone(),
- ),
- ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
+ ArchiveType::FixedIndex => verify_fixed_index(verify_worker, &backup_dir, info),
+ ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, &backup_dir, info),
+ ArchiveType::Blob => {
+ verify_blob(verify_worker.datastore.clone(), &backup_dir, info)
+ }
}
});
- worker.fail_on_abort()?;
+ verify_worker.worker.check_abort()?;
+ verify_worker.worker.fail_on_shutdown()?;
if let Err(err) = result {
- worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
+ task_log!(
+ verify_worker.worker,
+ "verify {}:{}/{} failed: {}",
+ verify_worker.datastore.name(),
+ backup_dir,
+ info.filename,
+ err,
+ );
error_count += 1;
- verify_result = "failed";
+ verify_result = VerifyState::Failed;
}
-
}
let verify_state = SnapshotVerifyState {
- state: verify_result.to_string(),
- upid: worker.upid().clone(),
+ state: verify_result,
+ upid,
};
- manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
- datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
- .map_err(|err| format_err!("unable to store manifest blob - {}", err))?;
-
+ let verify_state = serde_json::to_value(verify_state)?;
+ verify_worker
+ .datastore
+ .update_manifest(&backup_dir, |manifest| {
+ manifest.unprotected["verify_state"] = verify_state;
+ })
+ .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
Ok(error_count == 0)
}
/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
group: &BackupGroup,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- progress: Option<(usize, usize)>, // (done, snapshot_count)
- worker: Arc<WorkerTask>,
-) -> Result<(usize, Vec<String>), Error> {
-
+ progress: &mut StoreProgress,
+ upid: &UPID,
+ filter: Option<&dyn Fn(&BackupManifest) -> bool>,
+) -> Result<Vec<String>, Error> {
let mut errors = Vec::new();
- let mut list = match group.list_backups(&datastore.base_path()) {
+ let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
Ok(list) => list,
Err(err) => {
- worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
- return Ok((0, errors));
+ task_log!(
+ verify_worker.worker,
+ "verify group {}:{} - unable to list backups: {}",
+ verify_worker.datastore.name(),
+ group,
+ err,
+ );
+ return Ok(errors);
}
};
- worker.log(format!("verify group {}:{}", datastore.name(), group));
+ let snapshot_count = list.len();
+ task_log!(
+ verify_worker.worker,
+ "verify group {}:{} ({} snapshots)",
+ verify_worker.datastore.name(),
+ group,
+ snapshot_count
+ );
- let (done, snapshot_count) = progress.unwrap_or((0, list.len()));
+ progress.group_snapshots = snapshot_count as u64;
- let mut count = 0;
BackupInfo::sort_list(&mut list, false); // newest first
- for info in list {
- count += 1;
- if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
+ for (pos, info) in list.into_iter().enumerate() {
+ if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
errors.push(info.backup_dir.to_string());
}
- if snapshot_count != 0 {
- let pos = done + count;
- let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
- worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count));
- }
+ progress.done_snapshots = pos as u64 + 1;
+ task_log!(verify_worker.worker, "percentage done: {}", progress);
}
- Ok((count, errors))
+ Ok(errors)
}
-/// Verify all backups inside a datastore
+/// Verify all (owned) backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
-pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
-
+pub fn verify_all_backups(
+ verify_worker: &VerifyWorker,
+ upid: &UPID,
+ owner: Option<Authid>,
+ filter: Option<&dyn Fn(&BackupManifest) -> bool>,
+) -> Result<Vec<String>, Error> {
let mut errors = Vec::new();
+ let worker = Arc::clone(&verify_worker.worker);
- let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
- Ok(list) => list,
+ task_log!(worker, "verify datastore {}", verify_worker.datastore.name());
+
+ if let Some(owner) = &owner {
+ task_log!(worker, "limiting to backups owned by {}", owner);
+ }
+
+ let filter_by_owner = |group: &BackupGroup| {
+ match (verify_worker.datastore.get_owner(group), &owner) {
+ (Ok(ref group_owner), Some(owner)) => {
+ group_owner == owner
+ || (group_owner.is_token()
+ && !owner.is_token()
+ && group_owner.user() == owner.user())
+ },
+ (Ok(_), None) => true,
+ (Err(err), Some(_)) => {
+ // intentionally not in task log
+ // the task user might not be allowed to see this group!
+ println!("Failed to get owner of group '{}' - {}", group, err);
+ false
+ },
+ (Err(err), None) => {
+ // we don't filter by owner, but we want to log the error
+ task_log!(
+ worker,
+ "Failed to get owner of group '{} - {}",
+ group,
+ err,
+ );
+ errors.push(group.to_string());
+ true
+ },
+ }
+ };
+
+ let mut list = match BackupInfo::list_backup_groups(&verify_worker.datastore.base_path()) {
+ Ok(list) => list
+ .into_iter()
+ .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
+ .filter(filter_by_owner)
+ .collect::<Vec<BackupGroup>>(),
Err(err) => {
- worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
+ task_log!(worker, "unable to list backups: {}", err,);
return Ok(errors);
}
};
list.sort_unstable();
- let mut snapshot_count = 0;
- for group in list.iter() {
- snapshot_count += group.list_backups(&datastore.base_path())?.len();
- }
-
- // start with 16384 chunks (up to 65GB)
- let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
+ let group_count = list.len();
+ task_log!(worker, "found {} groups", group_count);
- // start with 64 chunks since we assume there are few corrupt ones
- let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
+ let mut progress = StoreProgress::new(group_count as u64);
- worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count));
+ for (pos, group) in list.into_iter().enumerate() {
+ progress.done_groups = pos as u64;
+ progress.done_snapshots = 0;
+ progress.group_snapshots = 0;
- let mut done = 0;
- for group in list {
- let (count, mut group_errors) = verify_backup_group(
- datastore.clone(),
- &group,
- verified_chunks.clone(),
- corrupt_chunks.clone(),
- Some((done, snapshot_count)),
- worker.clone(),
- )?;
+ let mut group_errors =
+ verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
errors.append(&mut group_errors);
-
- done += count;
}
Ok(errors)
}
+
+/// Filter for the verification of snapshots
+pub fn verify_filter(
+ ignore_verified_snapshots: bool,
+ outdated_after: Option<i64>,
+ manifest: &BackupManifest,
+) -> bool {
+ if !ignore_verified_snapshots {
+ return true;
+ }
+
+ let raw_verify_state = manifest.unprotected["verify_state"].clone();
+ match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
+ Err(_) => true, // no last verification, always include
+ Ok(last_verify) => {
+ match outdated_after {
+ None => false, // never re-verify if ignored and no max age
+ Some(max_age) => {
+ let now = proxmox_time::epoch_i64();
+ let days_since_last_verify = (now - last_verify.upid.starttime) / 86400;
+
+ days_since_last_verify > max_age
+ }
+ }
+ }
+ }
+}