use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
-use serde_json::json;
use http::StatusCode;
+use serde_json::json;
use proxmox_router::HttpError;
use proxmox_sys::task_log;
-use pbs_api_types::{Authid, GroupFilter, GroupListItem, Remote, SnapshotListItem};
+use pbs_api_types::{
+ Authid, BackupNamespace, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote,
+ SnapshotListItem,
+};
-use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
+use pbs_client::{
+ BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
+};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
- CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
+ archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
+use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
-use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
use proxmox_rest_server::WorkerTask;
-use crate::tools::ParallelHandler;
-
-// fixme: implement filters
-// fixme: delete vanished groups
-// Todo: correctly lock backup groups
+use crate::tools::parallel_handler::ParallelHandler;
+/// Parameters for a pull operation.
pub struct PullParameters {
+ /// Remote that is pulled from
remote: Remote,
+ /// Full specification of remote datastore
source: BackupRepository,
+ /// Local store that is pulled into
store: Arc<DataStore>,
+ /// Owner of synced groups (needs to match local owner of pre-existing groups)
owner: Authid,
+ /// Whether to remove groups which exist locally, but not on the remote end
remove_vanished: bool,
+ /// Filters for reducing the pull scope
group_filter: Option<Vec<GroupFilter>>,
+ /// Rate limits for all transfers from `remote`
+ limit: RateLimitConfig,
}
impl PullParameters {
+ /// Creates a new instance of `PullParameters`.
+ ///
+ /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
+ /// [BackupRepository] with `remote_store`.
pub fn new(
store: &str,
remote: &str,
owner: Authid,
remove_vanished: Option<bool>,
group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
) -> Result<Self, Error> {
- let store = DataStore::lookup_datastore(store)?;
+ let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
let (remote_config, _digest) = pbs_config::remote::config()?;
let remote: Remote = remote_config.lookup("remote", remote)?;
remote_store.to_string(),
);
- Ok(Self { remote, source, store, owner, remove_vanished, group_filter })
+ Ok(Self {
+ remote,
+ source,
+ store,
+ owner,
+ remove_vanished,
+ group_filter,
+ limit,
+ })
}
+ /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
pub async fn client(&self) -> Result<HttpClient, Error> {
- crate::api2::config::remote::remote_client(&self.remote).await
+ crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
}
}
"sync chunk writer",
4,
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
- // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
+ // println!("verify and write {}", hex::encode(&digest));
chunk.verify_unencrypted(size as usize, &digest)?;
target2.insert_chunk(&chunk, &digest)?;
Ok(())
target.cond_touch_chunk(&info.digest, false)
})?;
if chunk_exists {
- //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
+ //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
return Ok::<_, Error>(());
}
- //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
+ //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
let raw_size = chunk.raw_size() as usize;
let bytes = bytes.load(Ordering::SeqCst);
task_log!(
- worker,
+ worker,
"downloaded {} bytes ({:.2} MiB/s)",
bytes,
(bytes as f64) / (1024.0 * 1024.0 * elapsed)
Ok(())
}
+/// Pulls a single file referenced by a manifest.
+///
+/// Pulling an archive consists of the following steps:
+/// - Create tmp file for archive
+/// - Download archive file into tmp file
+/// - Verify tmp file checksum
+/// - if archive is an index, pull referenced chunks
+/// - Rename tmp file into real path
async fn pull_single_archive(
worker: &WorkerTask,
reader: &BackupReader,
chunk_reader: &mut RemoteChunkReader,
tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
+ snapshot: &pbs_api_types::BackupDir,
archive_info: &FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let archive_name = &archive_info.filename;
let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
+ path.push(snapshot.to_string());
path.push(archive_name);
let mut tmp_path = path.clone();
Ok(())
}
+/// Actual implementation of pulling a snapshot.
+///
+/// Pulling a snapshot consists of the following steps:
+/// - (Re)download the manifest
+/// -- if it matches, only download log and treat snapshot as already synced
+/// - Iterate over referenced files
+/// -- if file already exists, verify contents
+/// -- if not, pull it from the remote
+/// - Download log if not already existing
async fn pull_snapshot(
worker: &WorkerTask,
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
+ snapshot: &pbs_api_types::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
+ let snapshot_relative_path = snapshot.to_string();
+
let mut manifest_name = tgt_store.base_path();
- manifest_name.push(snapshot.relative_path());
+ manifest_name.push(&snapshot_relative_path);
manifest_name.push(MANIFEST_BLOB_NAME);
let mut client_log_name = tgt_store.base_path();
- client_log_name.push(snapshot.relative_path());
+ client_log_name.push(&snapshot_relative_path);
client_log_name.push(CLIENT_LOG_BLOB_NAME);
let mut tmp_manifest_name = manifest_name.clone();
for item in manifest.files() {
let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
+ path.push(&snapshot_relative_path);
path.push(&item.filename);
if path.exists() {
&mut chunk_reader,
tgt_store.clone(),
snapshot,
- &item,
+ item,
downloaded_chunks.clone(),
)
.await?;
Ok(())
}
-pub async fn pull_snapshot_from(
+/// Pulls a `snapshot` into `tgt_store`, differentiating between new snapshots (removed on error)
+/// and existing ones (kept even on error).
+async fn pull_snapshot_from(
worker: &WorkerTask,
reader: Arc<BackupReader>,
tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
+ snapshot: &pbs_api_types::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
- let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
+ // FIXME: Namespace support requires source AND target namespace
+ let ns = BackupNamespace::root();
+ let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&ns, snapshot)?;
+ let snapshot_path = snapshot.to_string();
if is_new {
- task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());
+ task_log!(worker, "sync snapshot {:?}", snapshot_path);
if let Err(err) = pull_snapshot(
worker,
reader,
tgt_store.clone(),
- &snapshot,
+ snapshot,
downloaded_chunks,
)
.await
{
- if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
+ if let Err(cleanup_err) = tgt_store.remove_backup_dir(&ns, snapshot, true) {
task_log!(worker, "cleanup error - {}", cleanup_err);
}
return Err(err);
}
- task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
+ task_log!(worker, "sync snapshot {:?} done", snapshot_path);
} else {
- task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
+ task_log!(worker, "re-sync snapshot {:?}", snapshot_path);
pull_snapshot(
worker,
reader,
tgt_store.clone(),
- &snapshot,
+ snapshot,
downloaded_chunks,
)
.await?;
- task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path());
+ task_log!(worker, "re-sync snapshot {:?} done", snapshot_path);
}
Ok(())
match self.count {
0 => Ok(String::new()),
1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
- _ => {
- Ok(format!(
- "{} .. {}",
- proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
- proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
- ))
- }
+ _ => Ok(format!(
+ "{} .. {}",
+ proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
+ proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
+ )),
}
}
}
}
}
-pub async fn pull_group(
+/// Pulls a group according to `params`.
+///
+/// Pulling a group consists of the following steps:
+/// - Query the list of snapshots available for this group on the remote, sort by snapshot time
+/// - Get last snapshot timestamp on local datastore
+/// - Iterate over list of snapshots
+/// -- Recreate client/BackupReader
+/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
+/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
+///
+/// Permission checks:
+/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
+/// - local group owner is already checked by pull_store
+async fn pull_group(
worker: &WorkerTask,
client: &HttpClient,
params: &PullParameters,
- group: &BackupGroup,
+ group: &pbs_api_types::BackupGroup,
progress: &mut StoreProgress,
) -> Result<(), Error> {
- let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());
+ // FIXME: Namespace support
+ let ns = BackupNamespace::root();
+
+ let path = format!(
+ "api2/json/admin/datastore/{}/snapshots",
+ params.source.store()
+ );
let args = json!({
- "backup-type": group.backup_type(),
- "backup-id": group.backup_id(),
+ "backup-type": group.ty,
+ "backup-id": group.id,
});
let mut result = client.get(&path, Some(args)).await?;
let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
- list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
+ list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
client.login().await?; // make sure auth is complete
let fingerprint = client.fingerprint();
- let last_sync = params.store.last_successful_backup(group)?;
+ let last_sync = params.store.last_successful_backup(&ns, group)?;
let mut remote_snapshots = std::collections::HashSet::new();
- // start with 16384 chunks (up to 65GB)
+ // start with 65536 chunks (up to 256 GiB)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
progress.group_snapshots = list.len() as u64;
};
for (pos, item) in list.into_iter().enumerate() {
- let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
+ let snapshot = item.backup;
// in-progress backups can't be synced
if item.size.is_none() {
- task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot);
+ task_log!(
+ worker,
+ "skipping snapshot {} - in-progress backup",
+ snapshot
+ );
continue;
}
- let backup_time = snapshot.backup_time();
-
- remote_snapshots.insert(backup_time);
+ remote_snapshots.insert(snapshot.time);
if let Some(last_sync_time) = last_sync {
- if last_sync_time > backup_time {
- skip_info.update(backup_time);
+ if last_sync_time > snapshot.time {
+ skip_info.update(snapshot.time);
continue;
}
}
// get updated auth_info (new tickets)
let auth_info = client.login().await?;
- let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
+ let options =
+ HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
+ .rate_limit(params.limit.clone());
let new_client = HttpClient::new(
params.source.host(),
new_client,
None,
params.source.store(),
- snapshot.group().backup_type(),
- snapshot.group().backup_id(),
- backup_time,
+ &ns,
+ &snapshot,
true,
)
.await?;
}
if params.remove_vanished {
- let local_list = group.list_backups(¶ms.store.base_path())?;
+ let group = params.store.backup_group(ns.clone(), group.clone());
+ let local_list = group.list_backups()?;
for info in local_list {
let backup_time = info.backup_dir.backup_time();
if remote_snapshots.contains(&backup_time) {
continue;
}
- if info.backup_dir.is_protected(params.store.base_path()) {
+ if info.backup_dir.is_protected() {
task_log!(
worker,
"don't delete vanished snapshot {:?} (protected)",
);
continue;
}
- task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
- params.store.remove_backup_dir(&info.backup_dir, false)?;
+ task_log!(
+ worker,
+ "delete vanished snapshot {:?}",
+ info.backup_dir.relative_path()
+ );
+ params
+ .store
+ .remove_backup_dir(&ns, info.backup_dir.as_ref(), false)?;
}
}
Ok(())
}
+/// Pulls a store according to `params`.
+///
+/// Pulling a store consists of the following steps:
+/// - Query list of groups on the remote
+/// - Filter list according to configured group filters
+/// - Iterate list and attempt to pull each group in turn
+/// - (remove_vanished) remove groups with matching owner and matching the configured group
+///   filters which are no longer available on the remote
+///
+/// Permission checks:
+/// - access to local datastore and remote entry need to be checked at call site
+/// - remote groups are filtered by remote
+/// - owner check for vanished groups done here
pub async fn pull_store(
worker: &WorkerTask,
client: &HttpClient,
params: &PullParameters,
) -> Result<(), Error> {
+ // FIXME: Namespace support requires source AND target namespace
+ let ns = BackupNamespace::root();
+ let local_ns = BackupNamespace::root();
+
// explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+ // FIXME: Namespaces! AND: If we make this API call recurse down namespaces we need to do the
+ // same down in the `remove_vanished` case!
let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
let mut result = client
let total_count = list.len();
list.sort_unstable_by(|a, b| {
- let type_order = a.backup_type.cmp(&b.backup_type);
+ let type_order = a.backup.ty.cmp(&b.backup.ty);
if type_order == std::cmp::Ordering::Equal {
- a.backup_id.cmp(&b.backup_id)
+ a.backup.id.cmp(&b.backup.id)
} else {
type_order
}
});
- let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
- filters
- .iter()
- .any(|filter| group.matches(filter))
+ let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+ filters.iter().any(|filter| group.matches(filter))
};
- let list:Vec<BackupGroup> = list
- .into_iter()
- .map(|item| BackupGroup::new(item.backup_type, item.backup_id))
- .collect();
+ let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
let list = if let Some(ref group_filter) = ¶ms.group_filter {
let unfiltered_count = list.len();
- let list:Vec<BackupGroup> = list
+ let list: Vec<pbs_api_types::BackupGroup> = list
.into_iter()
- .filter(|group| {
- apply_filters(&group, group_filter)
- })
+ .filter(|group| apply_filters(group, group_filter))
.collect();
- task_log!(worker, "found {} groups to sync (out of {} total)", list.len(), unfiltered_count);
+ task_log!(
+ worker,
+ "found {} groups to sync (out of {} total)",
+ list.len(),
+ unfiltered_count
+ );
list
} else {
task_log!(worker, "found {} groups to sync", total_count);
progress.done_snapshots = 0;
progress.group_snapshots = 0;
- let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, ¶ms.owner) {
- Ok(result) => result,
- Err(err) => {
- task_log!(
- worker,
- "sync group {} failed - group lock failed: {}",
- &group, err
- );
- errors = true; // do not stop here, instead continue
- continue;
- }
- };
+ let (owner, _lock_guard) =
+ match params
+ .store
+ .create_locked_backup_group(&ns, &group, ¶ms.owner)
+ {
+ Ok(result) => result,
+ Err(err) => {
+ task_log!(
+ worker,
+ "sync group {} failed - group lock failed: {}",
+ &group,
+ err
+ );
+ errors = true; // do not stop here, instead continue
+ continue;
+ }
+ };
// permission check
if params.owner != owner {
task_log!(
worker,
"sync group {} failed - owner check failed ({} != {})",
- &group, params.owner, owner
+ &group,
+ params.owner,
+ owner
);
errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(
- worker,
- client,
- params,
- &group,
- &mut progress,
- )
- .await
- {
- task_log!(
- worker,
- "sync group {} failed - {}",
- &group, err,
- );
+ } else if let Err(err) = pull_group(worker, client, params, &group, &mut progress).await {
+ task_log!(worker, "sync group {} failed - {}", &group, err,);
errors = true; // do not stop here, instead continue
}
}
if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
- let local_groups = BackupInfo::list_backup_groups(¶ms.store.base_path())?;
- for local_group in local_groups {
- if new_groups.contains(&local_group) {
+ // FIXME: See above comment about namespaces & recursion
+ for local_group in params.store.iter_backup_groups(Default::default())? {
+ let local_group = local_group?;
+ if new_groups.contains(local_group.as_ref()) {
+ continue;
+ }
+ let owner = params.store.get_owner(&local_ns, &local_group.group())?;
+ if check_backup_owner(&owner, ¶ms.owner).is_err() {
continue;
}
if let Some(ref group_filter) = ¶ms.group_filter {
- if !apply_filters(&local_group, group_filter) {
+ if !apply_filters(local_group.as_ref(), group_filter) {
continue;
}
}
local_group.backup_type(),
local_group.backup_id()
);
- match params.store.remove_backup_group(&local_group) {
- Ok(true) => {},
+ match params.store.remove_backup_group(&ns, local_group.as_ref()) {
+ Ok(true) => {}
Ok(false) => {
- task_log!(worker, "kept some protected snapshots of group '{}'", local_group);
- },
+ task_log!(
+ worker,
+ "kept some protected snapshots of group '{}'",
+ local_group
+ );
+ }
Err(err) => {
task_log!(worker, "{}", err);
errors = true;