use http::StatusCode;
use proxmox_router::HttpError;
+use proxmox_sys::task_log;
-use pbs_api_types::{Authid, GroupListItem, Remote, SnapshotListItem};
-use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_api_types::{
+ Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote,
+ SnapshotListItem,
+};
+
+use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
};
use pbs_tools::sha::sha256;
-use pbs_tools::task_log;
use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
use proxmox_rest_server::WorkerTask;
store: Arc<DataStore>,
owner: Authid,
remove_vanished: bool,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
}
impl PullParameters {
remote_store: &str,
owner: Authid,
remove_vanished: Option<bool>,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
) -> Result<Self, Error> {
let store = DataStore::lookup_datastore(store)?;
let (remote_config, _digest) = pbs_config::remote::config()?;
let remote: Remote = remote_config.lookup("remote", remote)?;
- let remove_vanished = remove_vanished.unwrap_or(true);
+ let remove_vanished = remove_vanished.unwrap_or(false);
let source = BackupRepository::new(
Some(remote.config.auth_id.clone()),
remote_store.to_string(),
);
- Ok(Self { remote, source, store, owner, remove_vanished })
+ Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit })
}
/// Build an authenticated [`HttpClient`] for the configured remote,
/// applying this pull job's traffic rate limit.
///
/// Resolved from leftover diff/conflict residue: the post-image call site
/// passes `Some(self.limit.clone())`, matching the `limit: RateLimitConfig`
/// field carried by `PullParameters`.
///
/// # Errors
/// Propagates any error from `remote_client` (e.g. connection/auth setup).
pub async fn client(&self) -> Result<HttpClient, Error> {
    // NOTE(review): limit is cloned because remote_client takes it by value —
    // confirm against the remote_client signature elsewhere in the crate.
    crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
}
}
let verify_and_write_channel = verify_and_write_channel.clone();
Ok::<_, Error>(async move {
- let chunk_exists = pbs_runtime::block_in_place(|| {
+ let chunk_exists = proxmox_async::runtime::block_in_place(|| {
target.cond_touch_chunk(&info.digest, false)
})?;
if chunk_exists {
let raw_size = chunk.raw_size() as usize;
// decode, verify and write in a separate threads to maximize throughput
- pbs_runtime::block_in_place(|| {
+ proxmox_async::runtime::block_in_place(|| {
verify_and_write_channel.send((chunk, info.digest, info.size()))
})?;
let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
- task_log!(worker, "found {} groups to sync", list.len());
-
+ let total_count = list.len();
list.sort_unstable_by(|a, b| {
let type_order = a.backup_type.cmp(&b.backup_type);
if type_order == std::cmp::Ordering::Equal {
}
});
+ let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
+ filters
+ .iter()
+ .any(|filter| group.matches(filter))
+ };
+
let list:Vec<BackupGroup> = list
.into_iter()
.map(|item| BackupGroup::new(item.backup_type, item.backup_id))
.collect();
+ let list = if let Some(ref group_filter) = ¶ms.group_filter {
+ let unfiltered_count = list.len();
+ let list:Vec<BackupGroup> = list
+ .into_iter()
+ .filter(|group| {
+ apply_filters(&group, group_filter)
+ })
+ .collect();
+ task_log!(worker, "found {} groups to sync (out of {} total)", list.len(), unfiltered_count);
+ list
+ } else {
+ task_log!(worker, "found {} groups to sync", total_count);
+ list
+ };
+
let mut errors = false;
let mut new_groups = std::collections::HashSet::new();
if new_groups.contains(&local_group) {
continue;
}
+ if let Some(ref group_filter) = ¶ms.group_filter {
+ if !apply_filters(&local_group, group_filter) {
+ continue;
+ }
+ }
task_log!(
worker,
"delete vanished group '{}/{}'",