use std::sync::{Arc, Mutex};
use std::sync::atomic::{Ordering, AtomicUsize};
use std::time::Instant;
+use nix::dir::Dir;
use anyhow::{bail, format_err, Error};
BackupGroup,
BackupDir,
BackupInfo,
+ BackupManifest,
IndexFile,
CryptMode,
FileInfo,
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
worker: Arc<dyn TaskState + Send + Sync>,
upid: UPID,
+ filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
-
- let _guard_res = lock_dir_noblock_shared(
+ let snap_lock = lock_dir_noblock_shared(
&datastore.snapshot_path(&backup_dir),
"snapshot",
"locked by another operation");
- if let Err(err) = _guard_res {
- task_log!(
- worker,
- "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
- datastore.name(),
+ match snap_lock {
+ Ok(snap_lock) => verify_backup_dir_with_lock(
+ datastore,
backup_dir,
- err,
- );
- return Ok(true);
+ verified_chunks,
+ corrupt_chunks,
+ worker,
+ upid,
+ filter,
+ snap_lock
+ ),
+ Err(err) => {
+ task_log!(
+ worker,
+ "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
+ datastore.name(),
+ backup_dir,
+ err,
+ );
+ Ok(true)
+ }
}
+}
- let mut manifest = match datastore.load_manifest(&backup_dir) {
+/// Like verify_backup_dir, but with the snapshot lock already acquired by the caller
+pub fn verify_backup_dir_with_lock(
+ datastore: Arc<DataStore>,
+ backup_dir: &BackupDir,
+ verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+ corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+ worker: Arc<dyn TaskState + Send + Sync>,
+ upid: UPID,
+ filter: Option<&dyn Fn(&BackupManifest) -> bool>,
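+ // shared snapshot lock, held for the whole verification of this snapshot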
+ _snap_lock: Dir,
+) -> Result<bool, Error> {
+ let manifest = match datastore.load_manifest(&backup_dir) {
Ok((manifest, _)) => manifest,
Err(err) => {
task_log!(
}
};
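+ // the caller may pass a filter to skip snapshots, e.g. recently verified ones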
+ if let Some(filter) = filter {
+ if !filter(&manifest) {
+ task_log!(
+ worker,
+ "SKIPPED: verify {}:{} (recently verified)",
+ datastore.name(),
+ backup_dir,
+ );
+ return Ok(true);
+ }
+ }
+
task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);
let mut error_count = 0;
state: verify_result,
upid,
};
- manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
- datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
- .map_err(|err| format_err!("unable to store manifest blob - {}", err))?;
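+ // record the verification result in the manifest's unprotected section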
+ let verify_state = serde_json::to_value(verify_state)?;
+ datastore.update_manifest(&backup_dir, |manifest| {
+ manifest.unprotected["verify_state"] = verify_state;
+ }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
Ok(error_count == 0)
}
progress: Option<(usize, usize)>, // (done, snapshot_count)
worker: Arc<dyn TaskState + Send + Sync>,
upid: &UPID,
+ filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<(usize, Vec<String>), Error> {
let mut errors = Vec::new();
BackupInfo::sort_list(&mut list, false); // newest first
for info in list {
count += 1;
+
if !verify_backup_dir(
datastore.clone(),
&info.backup_dir,
corrupt_chunks.clone(),
worker.clone(),
upid.clone(),
+ filter,
)? {
errors.push(info.backup_dir.to_string());
}
Ok((count, errors))
}
-/// Verify all backups inside a datastore
+/// Verify all (owned) backups inside a datastore
///
/// Errors are logged to the worker log.
///
datastore: Arc<DataStore>,
worker: Arc<dyn TaskState + Send + Sync>,
upid: &UPID,
+ owner: Option<Authid>,
+ filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
let mut errors = Vec::new();
+ if let Some(owner) = &owner {
+ task_log!(
+ worker,
+ "verify datastore {} - limiting to backups owned by {}",
+ datastore.name(),
+ owner
+ );
+ }
+
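+ // restrict verification to groups owned by the given Authid, if one was passed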
+ let filter_by_owner = |group: &BackupGroup| {
+ if let Some(owner) = &owner {
+ match datastore.get_owner(group) {
+ Ok(ref group_owner) => {
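+ // exact match, or the group is owned by an API token of the requested user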
+ group_owner == owner
+ || (group_owner.is_token()
+ && !owner.is_token()
+ && group_owner.user() == owner.user())
+ },
+ Err(_) => false,
+ }
+ } else {
+ true
+ }
+ };
+
let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
Ok(list) => list
.into_iter()
.filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
+ .filter(filter_by_owner)
.collect::<Vec<BackupGroup>>(),
Err(err) => {
task_log!(
Some((done, snapshot_count)),
worker.clone(),
upid,
+ filter,
)?;
errors.append(&mut group_errors);