//! Sync datastore from remote server
-
-use failure::*;
-use serde_json::json;
-use std::convert::TryFrom;
-use std::sync::Arc;
-use std::collections::HashMap;
-use std::io::{Seek, SeekFrom};
-use chrono::{Utc, TimeZone};
-
-use proxmox::api::api;
-use proxmox::api::{ApiMethod, Router, RpcEnvironment};
-
-use crate::server::{WorkerTask};
-use crate::backup::*;
-use crate::client::*;
-use crate::config::remote;
-use crate::api2::types::*;
-
-// fixme: implement filters
-// fixme: delete vanished groups
-// Todo: correctly lock backup groups
-
-async fn pull_index_chunks<I: IndexFile>(
- _worker: &WorkerTask,
- chunk_reader: &mut RemoteChunkReader,
- target: Arc<DataStore>,
- index: I,
+use anyhow::{bail, format_err, Error};
+use futures::{future::FutureExt, select};
+
+use proxmox_router::{Permission, Router, RpcEnvironment};
+use proxmox_schema::api;
+use proxmox_sys::task_log;
+
+use pbs_api_types::{
+ Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
+ GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+ PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
+ TRANSFER_LAST_SCHEMA,
+};
+use pbs_config::CachedUserInfo;
+use proxmox_human_byte::HumanByte;
+use proxmox_rest_server::WorkerTask;
+
+use crate::server::jobstate::Job;
+use crate::server::pull::{pull_store, PullParameters};
+
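+/// Check the privileges required for a pull: Datastore.Backup on the target
+/// datastore (and namespace, if any), Remote.Read on a remote source (or
+/// Datastore.Backup on a local source), plus Datastore.Prune when `delete` is set.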
+pub fn check_pull_privs(
+ auth_id: &Authid,
+ store: &str,
+ ns: Option<&str>,
+ remote: Option<&str>,
+ remote_store: &str,
+ delete: bool,
) -> Result<(), Error> {
+ let user_info = CachedUserInfo::new()?;
+
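+    // the namespace, if any, is part of the ACL object path on the target datastore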
+ let local_store_ns_acl_path = match ns {
+ Some(ns) => vec!["datastore", store, ns],
+ None => vec!["datastore", store],
+ };
+
+ user_info.check_privs(
+ auth_id,
+ &local_store_ns_acl_path,
+ PRIV_DATASTORE_BACKUP,
+ false,
+ )?;
+
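+    // a remote source requires Remote.Read; a local source datastore only needs Datastore.Backup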
+ if let Some(remote) = remote {
+ user_info.check_privs(
+ auth_id,
+ &["remote", remote, remote_store],
+ PRIV_REMOTE_READ,
+ false,
+ )?;
+ } else {
+ user_info.check_privs(
+ auth_id,
+ &["datastore", remote_store],
+ PRIV_DATASTORE_BACKUP,
+ false,
+ )?;
+ }
-
- for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
- let chunk_exists = target.cond_touch_chunk(digest, false)?;
- if chunk_exists {
- //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
- continue;
- }
- //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
- let chunk = chunk_reader.read_raw_chunk(&digest)?;
-
- target.insert_chunk(&chunk, &digest)?;
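+    // removing vanished snapshots from the target additionally requires Datastore.Prune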
+ if delete {
+ user_info.check_privs(
+ auth_id,
+ &local_store_ns_acl_path,
+ PRIV_DATASTORE_PRUNE,
+ false,
+ )?;
}
Ok(())
}
-async fn download_manifest(
- reader: &BackupReader,
- filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
-
- let tmp_manifest_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&filename)?;
-
- let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;
-
- tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
- Ok(tmp_manifest_file)
-}
-
-async fn pull_single_archive(
- worker: &WorkerTask,
- reader: &BackupReader,
- chunk_reader: &mut RemoteChunkReader,
- tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
- archive_name: &str,
-) -> Result<(), Error> {
-
- let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
- path.push(archive_name);
-
- let mut tmp_path = path.clone();
- tmp_path.set_extension("tmp");
-
- worker.log(format!("sync archive {}", archive_name));
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- let tmpfile = reader.download(archive_name, tmpfile).await?;
-
- match archive_type(archive_name)? {
- ArchiveType::DynamicIndex => {
- let index = DynamicIndexReader::new(tmpfile)
- .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
-
- pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
- }
- ArchiveType::FixedIndex => {
- let index = FixedIndexReader::new(tmpfile)
- .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
-
- pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
- }
- ArchiveType::Blob => { /* nothing to do */ }
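+// Build the pull parameters from a sync job config; a job without an explicit owner falls back to root@pam.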
+impl TryFrom<&SyncJobConfig> for PullParameters {
+ type Error = Error;
+
+ fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
+ PullParameters::new(
+ &sync_job.store,
+ sync_job.ns.clone().unwrap_or_default(),
+ sync_job.remote.as_deref(),
+ &sync_job.remote_store,
+ sync_job.remote_ns.clone().unwrap_or_default(),
+ sync_job
+ .owner
+ .as_ref()
+ .unwrap_or_else(|| Authid::root_auth_id())
+ .clone(),
+ sync_job.remove_vanished,
+ sync_job.max_depth,
+ sync_job.group_filter.clone(),
+ sync_job.limit.clone(),
+ sync_job.transfer_last,
+ )
}
- if let Err(err) = std::fs::rename(&tmp_path, &path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
- }
- Ok(())
}
-async fn pull_snapshot(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
-) -> Result<(), Error> {
-
- let mut manifest_name = tgt_store.base_path();
- manifest_name.push(snapshot.relative_path());
- manifest_name.push(MANIFEST_BLOB_NAME);
-
- let mut tmp_manifest_name = manifest_name.clone();
- tmp_manifest_name.set_extension("tmp");
+pub fn do_sync_job(
+ mut job: Job,
+ sync_job: SyncJobConfig,
+ auth_id: &Authid,
+ schedule: Option<String>,
+ to_stdout: bool,
+) -> Result<String, Error> {
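+    // the worker id encodes the sync source (remote, remote store), the target (store, namespace) and the job name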
+ let job_id = format!(
+ "{}:{}:{}:{}:{}",
+ sync_job.remote.as_deref().unwrap_or("-"),
+ sync_job.remote_store,
+ sync_job.store,
+ sync_job.ns.clone().unwrap_or_default(),
+ job.jobname()
+ );
+ let worker_type = job.jobtype().to_string();
+
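+    // a sync without a remote pulls from a local datastore, which must differ from the target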
+ if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
+ bail!("can't sync to same datastore");
+ }
- let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?;
- let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?;
- tmp_manifest_blob.verify_crc()?;
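+    // resolve the notification settings of the target datastore before spawning the worker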
+ let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
- if manifest_name.exists() {
- let manifest_blob = proxmox::tools::try_block!({
- let mut manifest_file = std::fs::File::open(&manifest_name)
- .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
+ let upid_str = WorkerTask::spawn(
+ &worker_type,
+ Some(job_id.clone()),
+ auth_id.to_string(),
+ to_stdout,
+ move |worker| async move {
+ job.start(&worker.upid().to_string())?;
- let manifest_blob = DataBlob::load(&mut manifest_file)?;
- manifest_blob.verify_crc()?;
- Ok(manifest_blob)
- }).map_err(|err: Error| {
- format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
- })?;
+ let worker2 = worker.clone();
+ let sync_job2 = sync_job.clone();
- if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
- return Ok(()); // nothing changed
- }
- }
+ let worker_future = async move {
+ let pull_params = PullParameters::try_from(&sync_job)?;
- let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
-
- let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new());
-
- for item in manifest.files() {
- let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
- path.push(&item.filename);
-
- if path.exists() {
- match archive_type(&item.filename)? {
- ArchiveType::DynamicIndex => {
- let index = DynamicIndexReader::open(&path)?;
- let (csum, size) = index.compute_csum();
- match manifest.verify_file(&item.filename, &csum, size) {
- Ok(_) => continue,
- Err(err) => {
- worker.log(format!("detected changed file {:?} - {}", path, err));
- }
- }
- }
- ArchiveType::FixedIndex => {
- let index = FixedIndexReader::open(&path)?;
- let (csum, size) = index.compute_csum();
- match manifest.verify_file(&item.filename, &csum, size) {
- Ok(_) => continue,
- Err(err) => {
- worker.log(format!("detected changed file {:?} - {}", path, err));
- }
- }
+ task_log!(worker, "Starting datastore sync job '{}'", job_id);
+ if let Some(event_str) = schedule {
+ task_log!(worker, "task triggered by schedule '{}'", event_str);
}
- ArchiveType::Blob => {
- let mut tmpfile = std::fs::File::open(&path)?;
- let (csum, size) = compute_file_csum(&mut tmpfile)?;
- match manifest.verify_file(&item.filename, &csum, size) {
- Ok(_) => continue,
- Err(err) => {
- worker.log(format!("detected changed file {:?} - {}", path, err));
- }
- }
+ task_log!(
+ worker,
+ "sync datastore '{}' from '{}{}'",
+ sync_job.store,
+ sync_job
+ .remote
+ .as_deref()
+ .map_or(String::new(), |remote| format!("{remote}/")),
+ sync_job.remote_store,
+ );
+
+ let pull_stats = pull_store(&worker, pull_params).await?;
+
+ if pull_stats.bytes != 0 {
+ let amount = HumanByte::from(pull_stats.bytes);
+ let rate = HumanByte::new_binary(
+ pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
+ );
+ task_log!(
+ worker,
+ "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
+ pull_stats.chunk_count,
+ );
+ } else {
+ task_log!(worker, "Summary: sync job found no new data to pull");
}
- }
- }
-
- pull_single_archive(
- worker,
- &reader,
- &mut chunk_reader,
- tgt_store.clone(),
- snapshot,
- &item.filename,
- ).await?;
- }
- if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
- bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
- }
+ if let Some(removed) = pull_stats.removed {
+ task_log!(
+ worker,
+ "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
+ removed.snapshots,
+ removed.groups,
+ removed.namespaces,
+ );
+ }
- // cleanup - remove stale files
- tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
+ task_log!(worker, "sync job '{}' end", &job_id);
- Ok(())
-}
+ Ok(())
+ };
-pub async fn pull_snapshot_from(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
-) -> Result<(), Error> {
+ let mut abort_future = worker2
+ .abort_future()
+ .map(|_| Err(format_err!("sync aborted")));
- let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?;
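+            // race the pull against the abort future so the task reacts to cancellation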
+ let result = select! {
+ worker = worker_future.fuse() => worker,
+ abort = abort_future => abort,
+ };
- if is_new {
- worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
+ let status = worker2.create_state(&result);
- if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
- if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) {
- worker.log(format!("cleanup error - {}", cleanup_err));
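+            // persist the final job state; errors here must not overwrite the sync result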
+ match job.finish(status) {
+ Ok(_) => {}
+ Err(err) => {
+ eprintln!("could not finish job state: {}", err);
+ }
}
- return Err(err);
- }
- } else {
- worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
- pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
- }
-
- Ok(())
-}
-
-pub async fn pull_group(
- worker: &WorkerTask,
- client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
- group: &BackupGroup,
-) -> Result<(), Error> {
-
- let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
-
- let args = json!({
- "backup-type": group.backup_type(),
- "backup-id": group.backup_id(),
- });
-
- let mut result = client.get(&path, Some(args)).await?;
- let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
- list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
-
- let auth_info = client.login().await?;
-
- let last_sync = group.last_successful_backup(&tgt_store.base_path())?;
-
- for item in list {
- let backup_time = Utc.timestamp(item.backup_time, 0);
- if let Some(last_sync_time) = last_sync {
- if last_sync_time > backup_time { continue; }
- }
-
- let new_client = HttpClient::new(
- src_repo.host(),
- src_repo.user(),
- Some(auth_info.ticket.clone())
- )?;
-
- let reader = BackupReader::start(
- new_client,
- None,
- src_repo.store(),
- &item.backup_type,
- &item.backup_id,
- backup_time,
- true,
- ).await?;
-
- let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
-
- pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
- }
-
- Ok(())
-}
-
-pub async fn pull_store(
- worker: &WorkerTask,
- client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
-) -> Result<(), Error> {
-
- let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
-
- let mut result = client.get(&path, None).await?;
-
- let list = result["data"].as_array_mut().unwrap();
-
- list.sort_unstable_by(|a, b| {
- let a_id = a["backup-id"].as_str().unwrap();
- let a_backup_type = a["backup-type"].as_str().unwrap();
- let b_id = b["backup-id"].as_str().unwrap();
- let b_backup_type = b["backup-type"].as_str().unwrap();
-
- let type_order = a_backup_type.cmp(b_backup_type);
- if type_order == std::cmp::Ordering::Equal {
- a_id.cmp(b_id)
- } else {
- type_order
- }
- });
-
- let mut errors = false;
-
- for item in list {
-
- let id = item["backup-id"].as_str().unwrap();
- let btype = item["backup-type"].as_str().unwrap();
-
- let group = BackupGroup::new(btype, id);
- if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group).await {
- worker.log(format!("sync group {}/{} failed - {}", btype, id, err));
- errors = true;
- // continue
- }
- }
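+            // send a notification mail if one is configured for the target datastore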
+ if let Some(email) = email {
+ if let Err(err) =
+ crate::server::send_sync_status(&email, notify, &sync_job2, &result)
+ {
+ eprintln!("send sync notification failed: {}", err);
+ }
+ }
- if errors {
- bail!("sync failed with some errors.");
- }
+ result
+ },
+ )?;
- Ok(())
+ Ok(upid_str)
}
#[api(
    input: {
        properties: {
store: {
schema: DATASTORE_SCHEMA,
},
+ ns: {
+ type: BackupNamespace,
+ optional: true,
+ },
remote: {
schema: REMOTE_ID_SCHEMA,
+ optional: true,
},
"remote-store": {
schema: DATASTORE_SCHEMA,
},
+ "remote-ns": {
+ type: BackupNamespace,
+ optional: true,
+ },
+ "remove-vanished": {
+ schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
+ optional: true,
+ },
+ "max-depth": {
+ schema: NS_MAX_DEPTH_REDUCED_SCHEMA,
+ optional: true,
+ },
+ "group-filter": {
+ schema: GROUP_FILTER_LIST_SCHEMA,
+ optional: true,
+ },
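+            // the rate limit settings are flattened into the top-level parameter list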
+ limit: {
+ type: RateLimitConfig,
+ flatten: true,
+ },
+ "transfer-last": {
+ schema: TRANSFER_LAST_SCHEMA,
+ optional: true,
+ },
},
},
+ access: {
+        // Note: the parameters used here are not URI parameters, so we need to check them inside the function body
+        description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}[/{ns}]',
+and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
+The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}[/{ns}]'.
+"###,
+ permission: &Permission::Anybody,
+ },
)]
/// Sync store from other repository
-async fn pull (
+#[allow(clippy::too_many_arguments)]
+async fn pull(
store: String,
- remote: String,
+ ns: Option<BackupNamespace>,
+ remote: Option<String>,
remote_store: String,
- _info: &ApiMethod,
+ remote_ns: Option<BackupNamespace>,
+ remove_vanished: Option<bool>,
+ max_depth: Option<usize>,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
+ transfer_last: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
+ let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+ let delete = remove_vanished.unwrap_or(false);
- let username = rpcenv.get_user().unwrap();
-
- let tgt_store = DataStore::lookup_datastore(&store)?;
-
- let (remote_config, _digest) = remote::config()?;
- let remote: remote::Remote = remote_config.lookup("remote", &remote)?;
-
- let client = HttpClient::new(&remote.host, &remote.userid, Some(remote.password.clone()))?;
- let _auth_info = client.login() // make sure we can auth
- .await
- .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
+ if remote.is_none() && store == remote_store {
+ bail!("can't sync to same datastore");
+ }
- let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store);
+ let ns = ns.unwrap_or_default();
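+    // the root namespace does not add a component to the ACL object path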
+ let ns_str = if ns.is_root() {
+ None
+ } else {
+ Some(ns.to_string())
+ };
+
+ check_pull_privs(
+ &auth_id,
+ &store,
+ ns_str.as_deref(),
+ remote.as_deref(),
+ &remote_store,
+ delete,
+ )?;
+
+ let pull_params = PullParameters::new(
+ &store,
+ ns,
+ remote.as_deref(),
+ &remote_store,
+ remote_ns.unwrap_or_default(),
+ auth_id.clone(),
+ remove_vanished,
+ max_depth,
+ group_filter,
+ limit,
+ transfer_last,
+ )?;
// fixme: set to_stdout to false?
- let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move {
-
- worker.log(format!("sync datastore '{}' start", store));
-
- // explicit create shared lock to prevent GC on newly created chunks
- let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
-
- pull_store(&worker, &client, &src_repo, tgt_store.clone()).await?;
-
- worker.log(format!("sync datastore '{}' end", store));
-
- Ok(())
- })?;
+ // FIXME: add namespace to worker id?
+ let upid_str = WorkerTask::spawn(
+ "sync",
+ Some(store.clone()),
+ auth_id.to_string(),
+ true,
+ move |worker| async move {
+ task_log!(
+ worker,
+ "pull datastore '{}' from '{}/{}'",
+ store,
+ remote.as_deref().unwrap_or("-"),
+ remote_store,
+ );
+
+ let pull_future = pull_store(&worker, pull_params);
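+            // stop early if the task is aborted while the pull is running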
+ (select! {
+ success = pull_future.fuse() => success,
+ abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
+ })?;
+
+ task_log!(worker, "pull datastore '{}' end", store);
+
+ Ok(())
+ },
+ )?;
Ok(upid_str)
}
-pub const ROUTER: Router = Router::new()
- .post(&API_METHOD_PULL);
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);