//! Sync datastore from remote server
-use std::sync::{Arc};
+use std::convert::TryFrom;
use anyhow::{format_err, Error};
+use futures::{select, future::FutureExt};
-use proxmox::api::api;
-use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
-
-use crate::server::{WorkerTask};
-use crate::backup::DataStore;
-use crate::client::{HttpClient, HttpClientOptions, BackupRepository, pull::pull_store};
-use crate::api2::types::*;
-use crate::config::{
- remote,
- acl::{PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ},
- cached_user_info::CachedUserInfo,
+use proxmox_schema::api;
+use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
+use proxmox_sys::task_log;
+
+use pbs_api_types::{
+ Authid, SyncJobConfig, GroupFilter, RateLimitConfig, GROUP_FILTER_LIST_SCHEMA,
+ DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
+ PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
};
+use proxmox_rest_server::WorkerTask;
+use pbs_config::CachedUserInfo;
+
+use crate::server::pull::{PullParameters, pull_store};
+use crate::server::jobstate::Job;
pub fn check_pull_privs(
- username: &str,
+ auth_id: &Authid,
store: &str,
remote: &str,
remote_store: &str,
let user_info = CachedUserInfo::new()?;
- user_info.check_privs(username, &["datastore", store], PRIV_DATASTORE_BACKUP, false)?;
- user_info.check_privs(username, &["remote", remote, remote_store], PRIV_REMOTE_READ, false)?;
+ user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_BACKUP, false)?;
+ user_info.check_privs(auth_id, &["remote", remote, remote_store], PRIV_REMOTE_READ, false)?;
if delete {
- user_info.check_privs(username, &["datastore", store], PRIV_DATASTORE_PRUNE, false)?;
+ user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_PRUNE, false)?;
}
Ok(())
}
-pub async fn get_pull_parameters(
- store: &str,
- remote: &str,
- remote_store: &str,
-) -> Result<(HttpClient, BackupRepository, Arc<DataStore>), Error> {
+impl TryFrom<&SyncJobConfig> for PullParameters {
+ type Error = Error;
+
+ fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
+ PullParameters::new(
+ &sync_job.store,
+ &sync_job.remote,
+ &sync_job.remote_store,
+ sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
+ sync_job.remove_vanished,
+ sync_job.group_filter.clone(),
+ sync_job.limit.clone(),
+ )
+ }
+}
+
+pub fn do_sync_job(
+ mut job: Job,
+ sync_job: SyncJobConfig,
+ auth_id: &Authid,
+ schedule: Option<String>,
+ to_stdout: bool,
+) -> Result<String, Error> {
+
+ let job_id = format!("{}:{}:{}:{}",
+ sync_job.remote,
+ sync_job.remote_store,
+ sync_job.store,
+ job.jobname());
+ let worker_type = job.jobtype().to_string();
+
+ let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
+
+ let upid_str = WorkerTask::spawn(
+ &worker_type,
+ Some(job_id.clone()),
+ auth_id.to_string(),
+ to_stdout,
+ move |worker| async move {
+
+ job.start(&worker.upid().to_string())?;
- let tgt_store = DataStore::lookup_datastore(store)?;
+ let worker2 = worker.clone();
+ let sync_job2 = sync_job.clone();
- let (remote_config, _digest) = remote::config()?;
- let remote: remote::Remote = remote_config.lookup("remote", remote)?;
+ let worker_future = async move {
- let options = HttpClientOptions::new()
- .password(Some(remote.password.clone()))
- .fingerprint(remote.fingerprint.clone());
+ let pull_params = PullParameters::try_from(&sync_job)?;
+ let client = pull_params.client().await?;
- let client = HttpClient::new(&remote.host, &remote.userid, options)?;
- let _auth_info = client.login() // make sure we can auth
- .await
- .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
+ task_log!(worker, "Starting datastore sync job '{}'", job_id);
+ if let Some(event_str) = schedule {
+ task_log!(worker, "task triggered by schedule '{}'", event_str);
+ }
+ task_log!(
+ worker,
+ "sync datastore '{}' from '{}/{}'",
+ sync_job.store,
+ sync_job.remote,
+ sync_job.remote_store,
+ );
- let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store.to_string());
+ pull_store(&worker, &client, &pull_params).await?;
- Ok((client, src_repo, tgt_store))
+ task_log!(worker, "sync job '{}' end", &job_id);
+
+ Ok(())
+ };
+
+ let mut abort_future = worker2.abort_future().map(|_| Err(format_err!("sync aborted")));
+
+ let result = select!{
+ worker = worker_future.fuse() => worker,
+ abort = abort_future => abort,
+ };
+
+ let status = worker2.create_state(&result);
+
+ match job.finish(status) {
+ Ok(_) => {},
+ Err(err) => {
+ eprintln!("could not finish job state: {}", err);
+ }
+ }
+
+ if let Some(email) = email {
+ if let Err(err) = crate::server::send_sync_status(&email, notify, &sync_job2, &result) {
+ eprintln!("send sync notification failed: {}", err);
+ }
+ }
+
+ result
+ })?;
+
+ Ok(upid_str)
}
#[api(
schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
optional: true,
},
+ "group-filter": {
+ schema: GROUP_FILTER_LIST_SCHEMA,
+ optional: true,
+ },
+ limit: {
+ type: RateLimitConfig,
+ flatten: true,
+ }
},
},
access: {
remote: String,
remote_store: String,
remove_vanished: Option<bool>,
+ group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
- let username = rpcenv.get_user().unwrap();
- let delete = remove_vanished.unwrap_or(true);
+ let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+ let delete = remove_vanished.unwrap_or(false);
- check_pull_privs(&username, &store, &remote, &remote_store, delete)?;
+ check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?;
- let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
+ let pull_params = PullParameters::new(
+ &store,
+ &remote,
+ &remote_store,
+ auth_id.clone(),
+ remove_vanished,
+ group_filter,
+ limit,
+ )?;
+ let client = pull_params.client().await?;
// fixme: set to_stdout to false?
- let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move {
+ let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
+
+ task_log!(worker, "sync datastore '{}' start", store);
- worker.log(format!("sync datastore '{}' start", store));
+ let pull_future = pull_store(&worker, &client, &pull_params);
+ let future = select!{
+ success = pull_future.fuse() => success,
+ abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
+ };
- pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, username).await?;
+ let _ = future?;
- worker.log(format!("sync datastore '{}' end", store));
+ task_log!(worker, "sync datastore '{}' end", store);
Ok(())
})?;