1 //! Sync datastore from remote server
4 use anyhow
::{format_err, Error}
;
5 use futures
::{select, future::FutureExt}
;
8 use proxmox
::api
::{ApiMethod, Router, RpcEnvironment, Permission}
;
10 use pbs_client
::{HttpClient, BackupRepository}
;
12 Remote
, Authid
, SyncJobConfig
,
13 DATASTORE_SCHEMA
, REMOTE_ID_SCHEMA
, REMOVE_VANISHED_BACKUPS_SCHEMA
,
14 PRIV_DATASTORE_BACKUP
, PRIV_DATASTORE_PRUNE
, PRIV_REMOTE_READ
,
17 use crate::server
::{WorkerTask, jobstate::Job, pull::pull_store}
;
18 use crate::backup
::DataStore
;
19 use pbs_config
::CachedUserInfo
;
21 pub fn check_pull_privs(
27 ) -> Result
<(), Error
> {
29 let user_info
= CachedUserInfo
::new()?
;
31 user_info
.check_privs(auth_id
, &["datastore", store
], PRIV_DATASTORE_BACKUP
, false)?
;
32 user_info
.check_privs(auth_id
, &["remote", remote
, remote_store
], PRIV_REMOTE_READ
, false)?
;
35 user_info
.check_privs(auth_id
, &["datastore", store
], PRIV_DATASTORE_PRUNE
, false)?
;
41 pub async
fn get_pull_parameters(
45 ) -> Result
<(HttpClient
, BackupRepository
, Arc
<DataStore
>), Error
> {
47 let tgt_store
= DataStore
::lookup_datastore(store
)?
;
49 let (remote_config
, _digest
) = pbs_config
::remote
::config()?
;
50 let remote
: Remote
= remote_config
.lookup("remote", remote
)?
;
52 let src_repo
= BackupRepository
::new(
53 Some(remote
.config
.auth_id
.clone()),
54 Some(remote
.config
.host
.clone()),
56 remote_store
.to_string(),
59 let client
= crate::api2
::config
::remote
::remote_client(remote
).await?
;
61 Ok((client
, src_repo
, tgt_store
))
66 sync_job
: SyncJobConfig
,
68 schedule
: Option
<String
>,
70 ) -> Result
<String
, Error
> {
72 let job_id
= format
!("{}:{}:{}:{}",
74 sync_job
.remote_store
,
77 let worker_type
= job
.jobtype().to_string();
79 let (email
, notify
) = crate::server
::lookup_datastore_notify_settings(&sync_job
.store
);
81 let upid_str
= WorkerTask
::spawn(
86 move |worker
| async
move {
88 job
.start(&worker
.upid().to_string())?
;
90 let worker2
= worker
.clone();
91 let sync_job2
= sync_job
.clone();
93 let worker_future
= async
move {
95 let delete
= sync_job
.remove_vanished
.unwrap_or(true);
96 let sync_owner
= sync_job
.owner
.unwrap_or_else(|| Authid
::root_auth_id().clone());
97 let (client
, src_repo
, tgt_store
) = get_pull_parameters(&sync_job
.store
, &sync_job
.remote
, &sync_job
.remote_store
).await?
;
99 worker
.log(format
!("Starting datastore sync job '{}'", job_id
));
100 if let Some(event_str
) = schedule
{
101 worker
.log(format
!("task triggered by schedule '{}'", event_str
));
103 worker
.log(format
!("Sync datastore '{}' from '{}/{}'",
104 sync_job
.store
, sync_job
.remote
, sync_job
.remote_store
));
106 pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
, sync_owner
).await?
;
108 worker
.log(format
!("sync job '{}' end", &job_id
));
113 let mut abort_future
= worker2
.abort_future().map(|_
| Err(format_err
!("sync aborted")));
115 let result
= select
!{
116 worker
= worker_future
.fuse() => worker
,
117 abort
= abort_future
=> abort
,
120 let status
= worker2
.create_state(&result
);
122 match job
.finish(status
) {
125 eprintln
!("could not finish job state: {}", err
);
129 if let Some(email
) = email
{
130 if let Err(err
) = crate::server
::send_sync_status(&email
, notify
, &sync_job2
, &result
) {
131 eprintln
!("send sync notification failed: {}", err
);
145 schema
: DATASTORE_SCHEMA
,
148 schema
: REMOTE_ID_SCHEMA
,
151 schema
: DATASTORE_SCHEMA
,
154 schema
: REMOVE_VANISHED_BACKUPS_SCHEMA
,
160 // Note: used parameters are no uri parameters, so we need to test inside function body
161 description
: r
###"The user needs Datastore.Backup privilege on '/datastore/{store}',
162 and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
163 The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
165 permission
: &Permission
::Anybody
,
168 /// Sync store from other repository
172 remote_store
: String
,
173 remove_vanished
: Option
<bool
>,
175 rpcenv
: &mut dyn RpcEnvironment
,
176 ) -> Result
<String
, Error
> {
178 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
179 let delete
= remove_vanished
.unwrap_or(true);
181 check_pull_privs(&auth_id
, &store
, &remote
, &remote_store
, delete
)?
;
183 let (client
, src_repo
, tgt_store
) = get_pull_parameters(&store
, &remote
, &remote_store
).await?
;
185 // fixme: set to_stdout to false?
186 let upid_str
= WorkerTask
::spawn("sync", Some(store
.clone()), auth_id
.to_string(), true, move |worker
| async
move {
188 worker
.log(format
!("sync datastore '{}' start", store
));
190 let pull_future
= pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
, auth_id
);
191 let future
= select
!{
192 success
= pull_future
.fuse() => success
,
193 abort
= worker
.abort_future().map(|_
| Err(format_err
!("pull aborted"))) => abort
,
198 worker
.log(format
!("sync datastore '{}' end", store
));
206 pub const ROUTER
: Router
= Router
::new()
207 .post(&API_METHOD_PULL
);