use proxmox_sys::task_log;
use pbs_api_types::{
- Authid, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem,
+ Authid, BackupNamespace, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote,
+ SnapshotListItem,
};
use pbs_client::{
use pbs_datastore::manifest::{
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
-use pbs_datastore::{BackupDir, BackupGroup, BackupInfo, DataStore, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
use proxmox_rest_server::WorkerTask;
use crate::tools::parallel_handler::ParallelHandler;
-// fixme: implement filters
-// fixme: delete vanished groups
-// Todo: correctly lock backup groups
-
+/// Parameters for a pull operation.
pub struct PullParameters {
+ /// Remote that is pulled from
remote: Remote,
+ /// Full specification of remote datastore
source: BackupRepository,
+ /// Local store that is pulled into
store: Arc<DataStore>,
+ /// Owner of synced groups (needs to match local owner of pre-existing groups)
owner: Authid,
+ /// Whether to remove groups which exist locally, but not on the remote end
remove_vanished: bool,
+ /// Filters for reducing the pull scope
group_filter: Option<Vec<GroupFilter>>,
+ /// Rate limits for all transfers from `remote`
limit: RateLimitConfig,
}
impl PullParameters {
+ /// Creates a new instance of `PullParameters`.
+ ///
+ /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
+ /// [BackupRepository] with `remote_store`.
pub fn new(
store: &str,
remote: &str,
})
}
+ /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
pub async fn client(&self) -> Result<HttpClient, Error> {
crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
}
Ok(())
}
+/// Pulls a single file referenced by a manifest.
+///
+/// Pulling an archive consists of the following steps:
+/// - Create tmp file for archive
+/// - Download archive file into tmp file
+/// - Verify tmp file checksum
+/// - If the archive is an index, pull the referenced chunks
+/// - Rename tmp file into real path
async fn pull_single_archive(
worker: &WorkerTask,
reader: &BackupReader,
chunk_reader: &mut RemoteChunkReader,
tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
+ snapshot: &pbs_api_types::BackupDir,
archive_info: &FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let archive_name = &archive_info.filename;
let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
+ path.push(snapshot.to_string());
path.push(archive_name);
let mut tmp_path = path.clone();
Ok(())
}
+/// Actual implementation of pulling a snapshot.
+///
+/// Pulling a snapshot consists of the following steps:
+/// - (Re)download the manifest
+/// -- if it matches, only download log and treat snapshot as already synced
+/// - Iterate over referenced files
+/// -- if file already exists, verify contents
+/// -- if not, pull it from the remote
+/// - Download log if not already existing
async fn pull_snapshot(
worker: &WorkerTask,
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
+ snapshot: &pbs_api_types::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
+ let snapshot_relative_path = snapshot.to_string();
+
let mut manifest_name = tgt_store.base_path();
- manifest_name.push(snapshot.relative_path());
+ manifest_name.push(&snapshot_relative_path);
manifest_name.push(MANIFEST_BLOB_NAME);
let mut client_log_name = tgt_store.base_path();
- client_log_name.push(snapshot.relative_path());
+ client_log_name.push(&snapshot_relative_path);
client_log_name.push(CLIENT_LOG_BLOB_NAME);
let mut tmp_manifest_name = manifest_name.clone();
for item in manifest.files() {
let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
+ path.push(&snapshot_relative_path);
path.push(&item.filename);
if path.exists() {
Ok(())
}
-pub async fn pull_snapshot_from(
+/// Pulls a `snapshot` into `tgt_store`, differentiating between new snapshots (removed on error)
+/// and existing ones (kept even on error).
+async fn pull_snapshot_from(
worker: &WorkerTask,
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
+ snapshot: &pbs_api_types::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
- let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(snapshot)?;
+ // FIXME: Namespace support requires source AND target namespace
+ let ns = BackupNamespace::root();
+ let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&ns, snapshot)?;
+ let snapshot_path = snapshot.to_string();
if is_new {
- task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());
+ task_log!(worker, "sync snapshot {:?}", snapshot_path);
if let Err(err) = pull_snapshot(
worker,
)
.await
{
- if let Err(cleanup_err) = tgt_store.remove_backup_dir(snapshot, true) {
+ if let Err(cleanup_err) = tgt_store.remove_backup_dir(&ns, snapshot, true) {
task_log!(worker, "cleanup error - {}", cleanup_err);
}
return Err(err);
}
- task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
+ task_log!(worker, "sync snapshot {:?} done", snapshot_path);
} else {
- task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
+ task_log!(worker, "re-sync snapshot {:?}", snapshot_path);
pull_snapshot(
worker,
reader,
downloaded_chunks,
)
.await?;
- task_log!(
- worker,
- "re-sync snapshot {:?} done",
- snapshot.relative_path()
- );
+ task_log!(worker, "re-sync snapshot {:?} done", snapshot_path);
}
Ok(())
}
}
-pub async fn pull_group(
+/// Pulls a group according to `params`.
+///
+/// Pulling a group consists of the following steps:
+/// - Query the list of snapshots available for this group on the remote, sort by snapshot time
+/// - Get last snapshot timestamp on local datastore
+/// - Iterate over list of snapshots
+/// -- Recreate client/BackupReader
+/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
+/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
+///
+/// Permission checks:
+/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
+/// - local group owner is already checked by pull_store
+async fn pull_group(
worker: &WorkerTask,
client: &HttpClient,
params: &PullParameters,
- group: &BackupGroup,
+ group: &pbs_api_types::BackupGroup,
progress: &mut StoreProgress,
) -> Result<(), Error> {
+ // FIXME: Namespace support
+ let ns = BackupNamespace::root();
+
let path = format!(
"api2/json/admin/datastore/{}/snapshots",
params.source.store()
);
let args = json!({
- "backup-type": group.backup_type(),
- "backup-id": group.backup_id(),
+ "backup-type": group.ty,
+ "backup-id": group.id,
});
let mut result = client.get(&path, Some(args)).await?;
let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
- list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
+ list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
client.login().await?; // make sure auth is complete
let fingerprint = client.fingerprint();
- let last_sync = params.store.last_successful_backup(group)?;
+ let last_sync = params.store.last_successful_backup(&ns, group)?;
let mut remote_snapshots = std::collections::HashSet::new();
- // start with 16384 chunks (up to 65GB)
+ // start with 65536 chunks (up to 256 GiB)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
progress.group_snapshots = list.len() as u64;
};
for (pos, item) in list.into_iter().enumerate() {
- let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
+ let snapshot = item.backup;
// in-progress backups can't be synced
if item.size.is_none() {
continue;
}
- let backup_time = snapshot.backup_time();
-
- remote_snapshots.insert(backup_time);
+ remote_snapshots.insert(snapshot.time);
if let Some(last_sync_time) = last_sync {
- if last_sync_time > backup_time {
- skip_info.update(backup_time);
+ if last_sync_time > snapshot.time {
+ skip_info.update(snapshot.time);
continue;
}
}
new_client,
None,
params.source.store(),
- snapshot.group().backup_type(),
- snapshot.group().backup_id(),
- backup_time,
+ &ns,
+ &snapshot,
true,
)
.await?;
}
if params.remove_vanished {
- let local_list = group.list_backups(¶ms.store.base_path())?;
+ let group = params.store.backup_group(ns.clone(), group.clone());
+ let local_list = group.list_backups()?;
for info in local_list {
let backup_time = info.backup_dir.backup_time();
if remote_snapshots.contains(&backup_time) {
continue;
}
- if info.backup_dir.is_protected(params.store.base_path()) {
+ if info.backup_dir.is_protected() {
task_log!(
worker,
"don't delete vanished snapshot {:?} (protected)",
"delete vanished snapshot {:?}",
info.backup_dir.relative_path()
);
- params.store.remove_backup_dir(&info.backup_dir, false)?;
+ params
+ .store
+ .remove_backup_dir(&ns, info.backup_dir.as_ref(), false)?;
}
}
Ok(())
}
+/// Pulls a store according to `params`.
+///
+/// Pulling a store consists of the following steps:
+/// - Query list of groups on the remote
+/// - Filter list according to configured group filters
+/// - Iterate list and attempt to pull each group in turn
+/// - (remove_vanished) remove groups that have a matching owner and match the configured group
+///   filters, but are not or no longer available on the remote
+///
+/// Permission checks:
+/// - access to local datastore and remote entry need to be checked at call site
+/// - remote groups are filtered by remote
+/// - owner check for vanished groups done here
pub async fn pull_store(
worker: &WorkerTask,
client: &HttpClient,
params: &PullParameters,
) -> Result<(), Error> {
+ // FIXME: Namespace support requires source AND target namespace
+ let ns = BackupNamespace::root();
+ let local_ns = BackupNamespace::root();
+
// explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+ // FIXME: Namespaces! AND: If we make this API call recurse down namespaces we need to do the
+ // same down in the `remove_vanished` case!
let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
let mut result = client
let total_count = list.len();
list.sort_unstable_by(|a, b| {
- let type_order = a.backup_type.cmp(&b.backup_type);
+ let type_order = a.backup.ty.cmp(&b.backup.ty);
if type_order == std::cmp::Ordering::Equal {
- a.backup_id.cmp(&b.backup_id)
+ a.backup.id.cmp(&b.backup.id)
} else {
type_order
}
});
- let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
+ let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
filters.iter().any(|filter| group.matches(filter))
};
- let list: Vec<BackupGroup> = list
- .into_iter()
- .map(|item| BackupGroup::new(item.backup_type, item.backup_id))
- .collect();
+ let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
let list = if let Some(ref group_filter) = ¶ms.group_filter {
let unfiltered_count = list.len();
- let list: Vec<BackupGroup> = list
+ let list: Vec<pbs_api_types::BackupGroup> = list
.into_iter()
.filter(|group| apply_filters(group, group_filter))
.collect();
progress.done_snapshots = 0;
progress.group_snapshots = 0;
- let (owner, _lock_guard) = match params
- .store
- .create_locked_backup_group(&group, ¶ms.owner)
- {
- Ok(result) => result,
- Err(err) => {
- task_log!(
- worker,
- "sync group {} failed - group lock failed: {}",
- &group,
- err
- );
- errors = true; // do not stop here, instead continue
- continue;
- }
- };
+ let (owner, _lock_guard) =
+ match params
+ .store
+ .create_locked_backup_group(&ns, &group, ¶ms.owner)
+ {
+ Ok(result) => result,
+ Err(err) => {
+ task_log!(
+ worker,
+ "sync group {} failed - group lock failed: {}",
+ &group,
+ err
+ );
+ errors = true; // do not stop here, instead continue
+ continue;
+ }
+ };
// permission check
if params.owner != owner {
if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
- let local_groups = BackupInfo::list_backup_groups(¶ms.store.base_path())?;
- for local_group in local_groups {
- if new_groups.contains(&local_group) {
+ // FIXME: See above comment about namespaces & recursion
+ for local_group in params.store.iter_backup_groups(Default::default())? {
+ let local_group = local_group?;
+ if new_groups.contains(local_group.as_ref()) {
+ continue;
+ }
+ let owner = params.store.get_owner(&local_ns, &local_group.group())?;
+ if check_backup_owner(&owner, ¶ms.owner).is_err() {
continue;
}
if let Some(ref group_filter) = ¶ms.group_filter {
- if !apply_filters(&local_group, group_filter) {
+ if !apply_filters(local_group.as_ref(), group_filter) {
continue;
}
}
local_group.backup_type(),
local_group.backup_id()
);
- match params.store.remove_backup_group(&local_group) {
+ match params.store.remove_backup_group(&ns, local_group.as_ref()) {
Ok(true) => {}
Ok(false) => {
task_log!(