use http::StatusCode;
use proxmox_router::HttpError;
+use proxmox_sys::task_log;
-use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
-use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_api_types::{
+ Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote,
+ SnapshotListItem,
+};
+
+use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
};
use pbs_tools::sha::sha256;
-use pbs_tools::task_log;
use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
use proxmox_rest_server::WorkerTask;
// fixme: delete vanished groups
// Todo: correctly lock backup groups
+pub struct PullParameters {
+ remote: Remote,
+ source: BackupRepository,
+ store: Arc<DataStore>,
+ owner: Authid,
+ remove_vanished: bool,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
+}
+
+impl PullParameters {
+ pub fn new(
+ store: &str,
+ remote: &str,
+ remote_store: &str,
+ owner: Authid,
+ remove_vanished: Option<bool>,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
+ ) -> Result<Self, Error> {
+ let store = DataStore::lookup_datastore(store)?;
+
+ let (remote_config, _digest) = pbs_config::remote::config()?;
+ let remote: Remote = remote_config.lookup("remote", remote)?;
+
+ let remove_vanished = remove_vanished.unwrap_or(false);
+
+ let source = BackupRepository::new(
+ Some(remote.config.auth_id.clone()),
+ Some(remote.config.host.clone()),
+ remote.config.port,
+ remote_store.to_string(),
+ );
+
+ Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit })
+ }
+
+ pub async fn client(&self) -> Result<HttpClient, Error> {
+ crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+ }
+}
+
async fn pull_index_chunks<I: IndexFile>(
worker: &WorkerTask,
chunk_reader: RemoteChunkReader,
let verify_and_write_channel = verify_and_write_channel.clone();
Ok::<_, Error>(async move {
- let chunk_exists = pbs_runtime::block_in_place(|| {
+ let chunk_exists = proxmox_async::runtime::block_in_place(|| {
target.cond_touch_chunk(&info.digest, false)
})?;
if chunk_exists {
let raw_size = chunk.raw_size() as usize;
// decode, verify and write in a separate threads to maximize throughput
- pbs_runtime::block_in_place(|| {
+ proxmox_async::runtime::block_in_place(|| {
verify_and_write_channel.send((chunk, info.digest, info.size()))
})?;
pub async fn pull_group(
worker: &WorkerTask,
client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
+ params: &PullParameters,
group: &BackupGroup,
- delete: bool,
progress: &mut StoreProgress,
) -> Result<(), Error> {
- let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
+ let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());
let args = json!({
"backup-type": group.backup_type(),
let fingerprint = client.fingerprint();
- let last_sync = tgt_store.last_successful_backup(group)?;
+ let last_sync = params.store.last_successful_backup(group)?;
let mut remote_snapshots = std::collections::HashSet::new();
let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
let new_client = HttpClient::new(
- src_repo.host(),
- src_repo.port(),
- src_repo.auth_id(),
+ params.source.host(),
+ params.source.port(),
+ params.source.auth_id(),
options,
)?;
let reader = BackupReader::start(
new_client,
None,
- src_repo.store(),
+ params.source.store(),
snapshot.group().backup_type(),
snapshot.group().backup_id(),
backup_time,
let result = pull_snapshot_from(
worker,
reader,
- tgt_store.clone(),
+ params.store.clone(),
&snapshot,
downloaded_chunks.clone(),
)
result?; // stop on error
}
- if delete {
- let local_list = group.list_backups(&tgt_store.base_path())?;
+ if params.remove_vanished {
+ let local_list = group.list_backups(¶ms.store.base_path())?;
for info in local_list {
let backup_time = info.backup_dir.backup_time();
if remote_snapshots.contains(&backup_time) {
continue;
}
- if info.backup_dir.is_protected(tgt_store.base_path()) {
+ if info.backup_dir.is_protected(params.store.base_path()) {
task_log!(
worker,
"don't delete vanished snapshot {:?} (protected)",
continue;
}
task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
- tgt_store.remove_backup_dir(&info.backup_dir, false)?;
+ params.store.remove_backup_dir(&info.backup_dir, false)?;
}
}
pub async fn pull_store(
worker: &WorkerTask,
client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
- delete: bool,
- auth_id: Authid,
+ params: &PullParameters,
) -> Result<(), Error> {
// explicit create shared lock to prevent GC on newly created chunks
- let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
+ let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
- let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
+ let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
let mut result = client
.get(&path, None)
let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
- task_log!(worker, "found {} groups to sync", list.len());
-
+ let total_count = list.len();
list.sort_unstable_by(|a, b| {
let type_order = a.backup_type.cmp(&b.backup_type);
if type_order == std::cmp::Ordering::Equal {
}
});
+ let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
+ filters
+ .iter()
+ .any(|filter| group.matches(filter))
+ };
+
+ let list:Vec<BackupGroup> = list
+ .into_iter()
+ .map(|item| BackupGroup::new(item.backup_type, item.backup_id))
+ .collect();
+
+ let list = if let Some(ref group_filter) = ¶ms.group_filter {
+ let unfiltered_count = list.len();
+ let list:Vec<BackupGroup> = list
+ .into_iter()
+ .filter(|group| {
+ apply_filters(&group, group_filter)
+ })
+ .collect();
+ task_log!(worker, "found {} groups to sync (out of {} total)", list.len(), unfiltered_count);
+ list
+ } else {
+ task_log!(worker, "found {} groups to sync", total_count);
+ list
+ };
+
let mut errors = false;
let mut new_groups = std::collections::HashSet::new();
- for item in list.iter() {
- new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
+ for group in list.iter() {
+ new_groups.insert(group.clone());
}
let mut progress = StoreProgress::new(list.len() as u64);
- for (done, item) in list.into_iter().enumerate() {
+ for (done, group) in list.into_iter().enumerate() {
progress.done_groups = done as u64;
progress.done_snapshots = 0;
progress.group_snapshots = 0;
- let group = BackupGroup::new(&item.backup_type, &item.backup_id);
-
- let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
+ let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, ¶ms.owner) {
Ok(result) => result,
Err(err) => {
task_log!(
worker,
- "sync group {}/{} failed - group lock failed: {}",
- item.backup_type, item.backup_id, err
+ "sync group {} failed - group lock failed: {}",
+ &group, err
);
errors = true; // do not stop here, instead continue
continue;
};
// permission check
- if auth_id != owner {
+ if params.owner != owner {
// only the owner is allowed to create additional snapshots
task_log!(
worker,
- "sync group {}/{} failed - owner check failed ({} != {})",
- item.backup_type, item.backup_id, auth_id, owner
+ "sync group {} failed - owner check failed ({} != {})",
+ &group, params.owner, owner
);
errors = true; // do not stop here, instead continue
} else if let Err(err) = pull_group(
worker,
client,
- src_repo,
- tgt_store.clone(),
+ params,
&group,
- delete,
&mut progress,
)
.await
{
task_log!(
worker,
- "sync group {}/{} failed - {}",
- item.backup_type, item.backup_id, err,
+ "sync group {} failed - {}",
+ &group, err,
);
errors = true; // do not stop here, instead continue
}
}
- if delete {
+ if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
- let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
+ let local_groups = BackupInfo::list_backup_groups(¶ms.store.base_path())?;
for local_group in local_groups {
if new_groups.contains(&local_group) {
continue;
}
+ if let Some(ref group_filter) = ¶ms.group_filter {
+ if !apply_filters(&local_group, group_filter) {
+ continue;
+ }
+ }
task_log!(
worker,
"delete vanished group '{}/{}'",
local_group.backup_type(),
local_group.backup_id()
);
- match tgt_store.remove_backup_group(&local_group) {
+ match params.store.remove_backup_group(&local_group) {
Ok(true) => {},
Ok(false) => {
task_log!(worker, "kept some protected snapshots of group '{}'", local_group);