use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
+use http::StatusCode;
use serde_json::json;
-use proxmox::api::error::{HttpError, StatusCode};
+use proxmox_router::HttpError;
+use proxmox_sys::task_log;
-use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
-use pbs_datastore::{task_log, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_api_types::{
+ Authid, BackupNamespace, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote,
+ SnapshotListItem,
+};
+
+use pbs_client::{
+ BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
+};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
- CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
+ archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
+use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
-use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
+use proxmox_rest_server::WorkerTask;
+
+use crate::tools::parallel_handler::ParallelHandler;
+
+/// Parameters for a pull operation.
+pub struct PullParameters {
+ /// Remote that is pulled from
+ remote: Remote,
+ /// Full specification of remote datastore
+ source: BackupRepository,
+ /// Local store that is pulled into
+ store: Arc<DataStore>,
+ /// Owner of synced groups (needs to match local owner of pre-existing groups)
+ owner: Authid,
+ /// Whether to remove groups which exist locally, but not on the remote end
+ remove_vanished: bool,
+ /// Filters for reducing the pull scope
+ group_filter: Option<Vec<GroupFilter>>,
+ /// Rate limits for all transfers from `remote`
+ limit: RateLimitConfig,
+}
-use crate::{
- backup::DataStore,
- server::WorkerTask,
- tools::ParallelHandler,
-};
+impl PullParameters {
+ /// Creates a new instance of `PullParameters`.
+ ///
+ /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
+ /// [BackupRepository] with `remote_store`.
+ pub fn new(
+ store: &str,
+ remote: &str,
+ remote_store: &str,
+ owner: Authid,
+ remove_vanished: Option<bool>,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
+ ) -> Result<Self, Error> {
+ let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
+
+ let (remote_config, _digest) = pbs_config::remote::config()?;
+ let remote: Remote = remote_config.lookup("remote", remote)?;
+
+ let remove_vanished = remove_vanished.unwrap_or(false);
+
+ let source = BackupRepository::new(
+ Some(remote.config.auth_id.clone()),
+ Some(remote.config.host.clone()),
+ remote.config.port,
+ remote_store.to_string(),
+ );
-// fixme: implement filters
-// fixme: delete vanished groups
-// Todo: correctly lock backup groups
+ Ok(Self {
+ remote,
+ source,
+ store,
+ owner,
+ remove_vanished,
+ group_filter,
+ limit,
+ })
+ }
+
+ /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
+ pub async fn client(&self) -> Result<HttpClient, Error> {
+ crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+ }
+}
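+
+// Usage sketch: a caller (e.g. the sync job runner or the manual pull API)
+// could wire the pieces up roughly like this, where `sync_job` stands in for
+// a hypothetical config struct exposing matching fields:
+//
+//     let params = PullParameters::new(
+//         &sync_job.store,
+//         &sync_job.remote,
+//         &sync_job.remote_store,
+//         sync_job.owner.clone(),
+//         sync_job.remove_vanished,
+//         sync_job.group_filter.clone(),
+//         sync_job.limit.clone(),
+//     )?;
+//     let client = params.client().await?;
+//     pull_store(&worker, &client, &params).await?;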
async fn pull_index_chunks<I: IndexFile>(
worker: &WorkerTask,
"sync chunk writer",
4,
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
- // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
+ // println!("verify and write {}", hex::encode(&digest));
chunk.verify_unencrypted(size as usize, &digest)?;
target2.insert_chunk(&chunk, &digest)?;
Ok(())
let verify_and_write_channel = verify_and_write_channel.clone();
Ok::<_, Error>(async move {
- let chunk_exists = pbs_runtime::block_in_place(|| {
+ let chunk_exists = proxmox_async::runtime::block_in_place(|| {
target.cond_touch_chunk(&info.digest, false)
})?;
if chunk_exists {
- //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
+ //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
return Ok::<_, Error>(());
}
- //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
+ //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
let raw_size = chunk.raw_size() as usize;
// decode, verify and write in separate threads to maximize throughput
- pbs_runtime::block_in_place(|| {
+ proxmox_async::runtime::block_in_place(|| {
verify_and_write_channel.send((chunk, info.digest, info.size()))
})?;
let bytes = bytes.load(Ordering::SeqCst);
- worker.log(format!(
+ task_log!(
+ worker,
"downloaded {} bytes ({:.2} MiB/s)",
bytes,
(bytes as f64) / (1024.0 * 1024.0 * elapsed)
- ));
+ );
Ok(())
}
Ok(())
}
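+
+// Note on pull_index_chunks() above: the CPU-bound verify + write work runs in
+// a 4-thread ParallelHandler pool ("sync chunk writer"), while the async part
+// only downloads raw chunks and hands them to the pool via block_in_place(),
+// so checksum verification and disk writes never block the tokio executor.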
+/// Pulls a single file referenced by a manifest.
+///
+/// Pulling an archive consists of the following steps:
+/// - Create tmp file for archive
+/// - Download archive file into tmp file
+/// - Verify tmp file checksum
+/// - if archive is an index, pull referenced chunks
+/// - Rename tmp file into real path
async fn pull_single_archive(
worker: &WorkerTask,
reader: &BackupReader,
chunk_reader: &mut RemoteChunkReader,
tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
+ snapshot: &pbs_api_types::BackupDir,
archive_info: &FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let archive_name = &archive_info.filename;
let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
+ path.push(snapshot.to_string());
path.push(archive_name);
let mut tmp_path = path.clone();
tmp_path.set_extension("tmp");
- worker.log(format!("sync archive {}", archive_name));
+ task_log!(worker, "sync archive {}", archive_name);
+
let mut tmpfile = std::fs::OpenOptions::new()
.write(true)
.create(true)
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
- worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
+ task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
}
Ok(())
}
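+
+// Note on pull_single_archive() above: downloading into "<archive>.tmp" and
+// renaming only after the checksum (and, for indexes, all referenced chunks)
+// checked out means an aborted pull never leaves a truncated archive under
+// its final name - at worst a stale .tmp file remains.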
+/// Actual implementation of pulling a snapshot.
+///
+/// Pulling a snapshot consists of the following steps:
+/// - (Re)download the manifest
+/// -- if it matches, only download log and treat snapshot as already synced
+/// - Iterate over referenced files
+/// -- if file already exists, verify contents
+/// -- if not, pull it from the remote
+/// - Download log if not already existing
async fn pull_snapshot(
worker: &WorkerTask,
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
+ snapshot: &pbs_api_types::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
+ let snapshot_relative_path = snapshot.to_string();
+
let mut manifest_name = tgt_store.base_path();
- manifest_name.push(snapshot.relative_path());
+ manifest_name.push(&snapshot_relative_path);
manifest_name.push(MANIFEST_BLOB_NAME);
let mut client_log_name = tgt_store.base_path();
- client_log_name.push(snapshot.relative_path());
+ client_log_name.push(&snapshot_relative_path);
client_log_name.push(CLIENT_LOG_BLOB_NAME);
let mut tmp_manifest_name = manifest_name.clone();
match err.downcast_ref::<HttpError>() {
Some(HttpError { code, message }) => match *code {
StatusCode::NOT_FOUND => {
- worker.log(format!(
+ task_log!(
+ worker,
"skipping snapshot {} - vanished since start of sync",
snapshot
- ));
+ );
return Ok(());
}
_ => {
let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
if manifest_name.exists() {
- let manifest_blob = proxmox::try_block!({
+ let manifest_blob = proxmox_lang::try_block!({
let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
format_err!(
"unable to open local manifest {:?} - {}",
if !client_log_name.exists() {
try_client_log_download(worker, reader, &client_log_name).await?;
}
- worker.log("no data changes");
+ task_log!(worker, "no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(()); // nothing changed
}
for item in manifest.files() {
let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
+ path.push(&snapshot_relative_path);
path.push(&item.filename);
if path.exists() {
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- worker.log(format!("detected changed file {:?} - {}", path, err));
+ task_log!(worker, "detected changed file {:?} - {}", path, err);
}
}
}
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- worker.log(format!("detected changed file {:?} - {}", path, err));
+ task_log!(worker, "detected changed file {:?} - {}", path, err);
}
}
}
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- worker.log(format!("detected changed file {:?} - {}", path, err));
+ task_log!(worker, "detected changed file {:?} - {}", path, err);
}
}
}
&mut chunk_reader,
tgt_store.clone(),
snapshot,
- &item,
+ item,
downloaded_chunks.clone(),
)
.await?;
Ok(())
}
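+
+// Note on pull_snapshot() above: the freshly downloaded manifest is first
+// compared against the locally stored one; if the raw blob data matches, the
+// snapshot counts as already synced ("no data changes") and only a missing
+// client log is fetched, so re-syncing unchanged snapshots stays cheap.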
-pub async fn pull_snapshot_from(
+/// Pulls a `snapshot` into `tgt_store`, differentiating between new snapshots (removed on error)
+/// and existing ones (kept even on error).
+async fn pull_snapshot_from(
worker: &WorkerTask,
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
+ snapshot: &pbs_api_types::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
- let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
+ // FIXME: Namespace support requires source AND target namespace
+ let ns = BackupNamespace::root();
+ let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&ns, snapshot)?;
+ let snapshot_path = snapshot.to_string();
if is_new {
- worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
+ task_log!(worker, "sync snapshot {:?}", snapshot_path);
if let Err(err) = pull_snapshot(
worker,
reader,
tgt_store.clone(),
- &snapshot,
+ snapshot,
downloaded_chunks,
)
.await
{
- if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
- worker.log(format!("cleanup error - {}", cleanup_err));
+ if let Err(cleanup_err) = tgt_store.remove_backup_dir(&ns, snapshot, true) {
+ task_log!(worker, "cleanup error - {}", cleanup_err);
}
return Err(err);
}
- worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
+ task_log!(worker, "sync snapshot {:?} done", snapshot_path);
} else {
- worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
+ task_log!(worker, "re-sync snapshot {:?}", snapshot_path);
pull_snapshot(
worker,
reader,
tgt_store.clone(),
- &snapshot,
+ snapshot,
downloaded_chunks,
)
.await?;
- worker.log(format!(
- "re-sync snapshot {:?} done",
- snapshot.relative_path()
- ));
+ task_log!(worker, "re-sync snapshot {:?} done", snapshot_path);
}
Ok(())
fn affected(&self) -> Result<String, Error> {
match self.count {
0 => Ok(String::new()),
- 1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest),
- _ => {
- Ok(format!(
- "{} .. {}",
- proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?,
- proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?,
- ))
- }
+ 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
+ _ => Ok(format!(
+ "{} .. {}",
+ proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
+ proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
+ )),
}
}
}
}
}
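+
+// For example, the affected() helper above renders a single skipped snapshot
+// as one RFC 3339 timestamp such as "2021-11-02T10:00:00Z", and several as a
+// range like "2021-11-02T10:00:00Z .. 2021-11-05T10:00:00Z".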
-pub async fn pull_group(
+/// Pulls a group according to `params`.
+///
+/// Pulling a group consists of the following steps:
+/// - Query the list of snapshots available for this group on the remote, sort by snapshot time
+/// - Get last snapshot timestamp on local datastore
+/// - Iterate over list of snapshots
+/// -- Recreate client/BackupReader
+/// -- pull snapshot, unless it is still in progress or older than the last local snapshot
+/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
+///
+/// Permission checks:
+/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
+/// - local group owner is already checked by pull_store
+async fn pull_group(
worker: &WorkerTask,
client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
- group: &BackupGroup,
- delete: bool,
+ params: &PullParameters,
+ group: &pbs_api_types::BackupGroup,
progress: &mut StoreProgress,
) -> Result<(), Error> {
- let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
+ // FIXME: Namespace support
+ let ns = BackupNamespace::root();
+
+ let path = format!(
+ "api2/json/admin/datastore/{}/snapshots",
+ params.source.store()
+ );
let args = json!({
- "backup-type": group.backup_type(),
- "backup-id": group.backup_id(),
+ "backup-type": group.ty,
+ "backup-id": group.id,
});
let mut result = client.get(&path, Some(args)).await?;
let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
- list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
+ list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
client.login().await?; // make sure auth is complete
let fingerprint = client.fingerprint();
- let last_sync = tgt_store.last_successful_backup(group)?;
+ let last_sync = params.store.last_successful_backup(&ns, group)?;
let mut remote_snapshots = std::collections::HashSet::new();
- // start with 16384 chunks (up to 65GB)
+ // start with 65536 chunks (up to 256 GiB)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
progress.group_snapshots = list.len() as u64;
};
for (pos, item) in list.into_iter().enumerate() {
- let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
+ let snapshot = item.backup;
// in-progress backups can't be synced
if item.size.is_none() {
- worker.log(format!(
+ task_log!(
+ worker,
"skipping snapshot {} - in-progress backup",
snapshot
- ));
+ );
continue;
}
- let backup_time = snapshot.backup_time();
-
- remote_snapshots.insert(backup_time);
+ remote_snapshots.insert(snapshot.time);
if let Some(last_sync_time) = last_sync {
- if last_sync_time > backup_time {
- skip_info.update(backup_time);
+ if last_sync_time > snapshot.time {
+ skip_info.update(snapshot.time);
continue;
}
}
// get updated auth_info (new tickets)
let auth_info = client.login().await?;
- let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
+ let options =
+ HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
+ .rate_limit(params.limit.clone());
let new_client = HttpClient::new(
- src_repo.host(),
- src_repo.port(),
- src_repo.auth_id(),
+ params.source.host(),
+ params.source.port(),
+ params.source.auth_id(),
options,
)?;
let reader = BackupReader::start(
new_client,
None,
- src_repo.store(),
- snapshot.group().backup_type(),
- snapshot.group().backup_id(),
- backup_time,
+ params.source.store(),
+ &ns,
+ &snapshot,
true,
)
.await?;
let result = pull_snapshot_from(
worker,
reader,
- tgt_store.clone(),
+ params.store.clone(),
&snapshot,
downloaded_chunks.clone(),
)
.await;
progress.done_snapshots = pos as u64 + 1;
- worker.log(format!("percentage done: {}", progress));
+ task_log!(worker, "percentage done: {}", progress);
result?; // stop on error
}
- if delete {
- let local_list = group.list_backups(&tgt_store.base_path())?;
+ if params.remove_vanished {
+ let group = params.store.backup_group(ns.clone(), group.clone());
+ let local_list = group.list_backups()?;
for info in local_list {
let backup_time = info.backup_dir.backup_time();
if remote_snapshots.contains(&backup_time) {
continue;
}
- worker.log(format!(
+ if info.backup_dir.is_protected() {
+ task_log!(
+ worker,
+ "don't delete vanished snapshot {:?} (protected)",
+ info.backup_dir.relative_path()
+ );
+ continue;
+ }
+ task_log!(
+ worker,
"delete vanished snapshot {:?}",
info.backup_dir.relative_path()
- ));
- tgt_store.remove_backup_dir(&info.backup_dir, false)?;
+ );
+ params
+ .store
+ .remove_backup_dir(&ns, info.backup_dir.as_ref(), false)?;
}
}
Ok(())
}
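+
+// Group filters are OR-ed: a group is pulled if it matches at least one of the
+// configured filters (see apply_filters() in pull_store() below). For example,
+// assuming the usual "type:"/"group:" filter syntax, a filter list of
+// "type:vm" plus "group:host/backup-server" would sync every VM group and
+// that one specific host group.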
+/// Pulls a store according to `params`.
+///
+/// Pulling a store consists of the following steps:
+/// - Query list of groups on the remote
+/// - Filter list according to configured group filters
+/// - Iterate list and attempt to pull each group in turn
+/// - (remove_vanished) remove local groups that have a matching owner and match the configured group
+///   filters, but are not (or no longer) present on the remote
+///
+/// Permission checks:
+/// - access to local datastore and remote entry need to be checked at call site
+/// - remote groups are filtered by remote
+/// - owner check for vanished groups done here
pub async fn pull_store(
worker: &WorkerTask,
client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
- delete: bool,
- auth_id: Authid,
+ params: &PullParameters,
) -> Result<(), Error> {
+ // FIXME: Namespace support requires source AND target namespace
+ let ns = BackupNamespace::root();
+ let local_ns = BackupNamespace::root();
+
// explicit create shared lock to prevent GC on newly created chunks
- let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
+ let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
- let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
+ // FIXME: Namespaces! AND: If we make this API call recurse down namespaces we need to do the
+ // same down in the `remove_vanished` case!
+ let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
let mut result = client
.get(&path, None)
let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
- worker.log(format!("found {} groups to sync", list.len()));
-
+ let total_count = list.len();
list.sort_unstable_by(|a, b| {
- let type_order = a.backup_type.cmp(&b.backup_type);
+ let type_order = a.backup.ty.cmp(&b.backup.ty);
if type_order == std::cmp::Ordering::Equal {
- a.backup_id.cmp(&b.backup_id)
+ a.backup.id.cmp(&b.backup.id)
} else {
type_order
}
});
+ let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+ filters.iter().any(|filter| group.matches(filter))
+ };
+
+ let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
+
+ let list = if let Some(ref group_filter) = ¶ms.group_filter {
+ let unfiltered_count = list.len();
+ let list: Vec<pbs_api_types::BackupGroup> = list
+ .into_iter()
+ .filter(|group| apply_filters(group, group_filter))
+ .collect();
+ task_log!(
+ worker,
+ "found {} groups to sync (out of {} total)",
+ list.len(),
+ unfiltered_count
+ );
+ list
+ } else {
+ task_log!(worker, "found {} groups to sync", total_count);
+ list
+ };
+
let mut errors = false;
let mut new_groups = std::collections::HashSet::new();
- for item in list.iter() {
- new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
+ for group in list.iter() {
+ new_groups.insert(group.clone());
}
let mut progress = StoreProgress::new(list.len() as u64);
- for (done, item) in list.into_iter().enumerate() {
+ for (done, group) in list.into_iter().enumerate() {
progress.done_groups = done as u64;
progress.done_snapshots = 0;
progress.group_snapshots = 0;
- let group = BackupGroup::new(&item.backup_type, &item.backup_id);
-
- let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
- Ok(result) => result,
- Err(err) => {
- worker.log(format!(
- "sync group {}/{} failed - group lock failed: {}",
- item.backup_type, item.backup_id, err
- ));
- errors = true; // do not stop here, instead continue
- continue;
- }
- };
+ let (owner, _lock_guard) =
+ match params
+ .store
+ .create_locked_backup_group(&ns, &group, ¶ms.owner)
+ {
+ Ok(result) => result,
+ Err(err) => {
+ task_log!(
+ worker,
+ "sync group {} failed - group lock failed: {}",
+ &group,
+ err
+ );
+ errors = true; // do not stop here, instead continue
+ continue;
+ }
+ };
// permission check
- if auth_id != owner {
+ if params.owner != owner {
// only the owner is allowed to create additional snapshots
- worker.log(format!(
- "sync group {}/{} failed - owner check failed ({} != {})",
- item.backup_type, item.backup_id, auth_id, owner
- ));
+ task_log!(
+ worker,
+ "sync group {} failed - owner check failed ({} != {})",
+ &group,
+ params.owner,
+ owner
+ );
errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(
- worker,
- client,
- src_repo,
- tgt_store.clone(),
- &group,
- delete,
- &mut progress,
- )
- .await
- {
- worker.log(format!(
- "sync group {}/{} failed - {}",
- item.backup_type, item.backup_id, err,
- ));
+ } else if let Err(err) = pull_group(worker, client, params, &group, &mut progress).await {
+ task_log!(worker, "sync group {} failed - {}", &group, err,);
errors = true; // do not stop here, instead continue
}
}
- if delete {
- let result: Result<(), Error> = proxmox::try_block!({
- let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
- for local_group in local_groups {
- if new_groups.contains(&local_group) {
+ if params.remove_vanished {
+ let result: Result<(), Error> = proxmox_lang::try_block!({
+ // FIXME: See above comment about namespaces & recursion
+ for local_group in params.store.iter_backup_groups(Default::default())? {
+ let local_group = local_group?;
+ if new_groups.contains(local_group.as_ref()) {
+ continue;
+ }
+ let owner = params.store.get_owner(&local_ns, &local_group.group())?;
+ if check_backup_owner(&owner, ¶ms.owner).is_err() {
continue;
}
- worker.log(format!(
+ if let Some(ref group_filter) = ¶ms.group_filter {
+ if !apply_filters(local_group.as_ref(), group_filter) {
+ continue;
+ }
+ }
+ task_log!(
+ worker,
"delete vanished group '{}/{}'",
local_group.backup_type(),
local_group.backup_id()
- ));
- if let Err(err) = tgt_store.remove_backup_group(&local_group) {
- worker.log(err.to_string());
- errors = true;
+ );
+ match params.store.remove_backup_group(&ns, local_group.as_ref()) {
+ Ok(true) => {}
+ Ok(false) => {
+ task_log!(
+ worker,
+ "kept some protected snapshots of group '{}'",
+ local_group
+ );
+ }
+ Err(err) => {
+ task_log!(worker, "{}", err);
+ errors = true;
+ }
}
}
Ok(())
});
if let Err(err) = result {
- worker.log(format!("error during cleanup: {}", err));
+ task_log!(worker, "error during cleanup: {}", err);
errors = true;
};
}
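+
+ // Note: remove_backup_group() reports via its bool return value whether the
+ // group is actually gone; Ok(false) means protected snapshots were kept,
+ // which is logged above but not counted as an error.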