1 //! Sync datastore from remote server
4 use anyhow
::{format_err, Error}
;
5 use futures
::{select, future::FutureExt}
;
8 use proxmox
::api
::{ApiMethod, Router, RpcEnvironment, Permission}
;
10 use pbs_client
::{HttpClient, BackupRepository}
;
12 Remote
, Authid
, SyncJobConfig
,
13 DATASTORE_SCHEMA
, REMOTE_ID_SCHEMA
, REMOVE_VANISHED_BACKUPS_SCHEMA
,
14 PRIV_DATASTORE_BACKUP
, PRIV_DATASTORE_PRUNE
, PRIV_REMOTE_READ
,
17 use crate::server
::{WorkerTask, jobstate::Job, pull::pull_store}
;
18 use crate::backup
::DataStore
;
19 use crate::config
::cached_user_info
::CachedUserInfo
;
/// Checks the privileges needed to pull from a remote store into a local datastore.
///
/// As far as visible here, a `CachedUserInfo` is created and `check_privs` is called for:
/// - `PRIV_DATASTORE_BACKUP` on the path `["datastore", store]`,
/// - `PRIV_REMOTE_READ` on the path `["remote", remote, remote_store]`,
/// - `PRIV_DATASTORE_PRUNE` on the path `["datastore", store]`.
///
/// Every one of these calls is followed by `?`, so the first failure is returned to the
/// caller as `Err`.
///
/// NOTE(review): the parameter list and whatever guards the prune check are not visible in
/// this view. Presumably the prune check only applies when vanished backups are to be
/// removed - confirm against the complete source.
21 pub fn check_pull_privs(
27 ) -> Result
<(), Error
> {
29 let user_info
= CachedUserInfo
::new()?
;
// Backup privilege on the target datastore.
31 user_info
.check_privs(auth_id
, &["datastore", store
], PRIV_DATASTORE_BACKUP
, false)?
;
// Read privilege on the remote/remote-store pair that is pulled from.
32 user_info
.check_privs(auth_id
, &["remote", remote
, remote_store
], PRIV_REMOTE_READ
, false)?
;
// Prune privilege on the target datastore (the surrounding condition is not visible here).
35 user_info
.check_privs(auth_id
, &["datastore", store
], PRIV_DATASTORE_PRUNE
, false)?
;
/// Collects the three objects a pull operates on and returns them as
/// `(HttpClient, BackupRepository, Arc<DataStore>)`.
///
/// Visible steps:
/// - the target datastore comes from `DataStore::lookup_datastore(store)`,
/// - the remote configuration is loaded with `pbs_config::remote::config()` and the
///   `Remote` entry is fetched via `lookup("remote", remote)`,
/// - a `BackupRepository` is built from the remote's `auth_id`, its `host` and
///   `remote_store`,
/// - the client is created by `crate::api2::config::remote::remote_client(remote)`.
///
/// Each fallible step uses `?`, so the first error is returned to the caller.
///
/// NOTE(review): the parameter list and part of the `BackupRepository::new` argument list
/// are not visible in this view - confirm against the complete source.
41 pub async
fn get_pull_parameters(
45 ) -> Result
<(HttpClient
, BackupRepository
, Arc
<DataStore
>), Error
> {
// Local datastore that receives the pulled data.
47 let tgt_store
= DataStore
::lookup_datastore(store
)?
;
// The digest returned next to the configuration is deliberately unused (`_digest`).
49 let (remote_config
, _digest
) = pbs_config
::remote
::config()?
;
50 let remote
: Remote
= remote_config
.lookup("remote", remote
)?
;
// Describes the source side: who to authenticate as, which host, which remote store.
52 let src_repo
= BackupRepository
::new(
53 Some(remote
.config
.auth_id
.clone()),
54 Some(remote
.config
.host
.clone()),
56 remote_store
.to_string(),
// `remote` is moved into the client constructor; it is not used afterwards.
59 let client
= crate::api2
::config
::remote
::remote_client(remote
).await?
;
61 Ok((client
, src_repo
, tgt_store
))
// Fragment of the sync-job entry point; the line carrying its `fn` header is not visible in
// this view. Visible parameters are `sync_job: SyncJobConfig` and `schedule: Option<String>`,
// the return type is `Result<String, Error>`.
//
// Visible flow: build a job id string, read the worker type from `job.jobtype()`, look up the
// notification settings of `sync_job.store`, then spawn a `WorkerTask` whose async closure
// runs the pull and races it against the worker's abort future.
66 sync_job
: SyncJobConfig
,
68 schedule
: Option
<String
>,
69 ) -> Result
<String
, Error
> {
// Four-part, colon-separated identifier; only the `remote_store` argument is visible here.
71 let job_id
= format
!("{}:{}:{}:{}",
73 sync_job
.remote_store
,
76 let worker_type
= job
.jobtype().to_string();
78 let (email
, notify
) = crate::server
::lookup_datastore_notify_settings(&sync_job
.store
);
80 let upid_str
= WorkerTask
::spawn(
85 move |worker
| async
move {
// The job is started with the worker's UPID string; a failure aborts the closure via `?`.
87 job
.start(&worker
.upid().to_string())?
;
// `worker_future` below is an `async move` block that takes `worker` and `sync_job`;
// these clones stay available for the abort handling and the notification afterwards.
89 let worker2
= worker
.clone();
90 let sync_job2
= sync_job
.clone();
92 let worker_future
= async
move {
// Defaults: `delete` is `true` when `remove_vanished` is unset, and the owner falls back
// to `Authid::root_auth_id()` when no owner is configured.
94 let delete
= sync_job
.remove_vanished
.unwrap_or(true);
95 let sync_owner
= sync_job
.owner
.unwrap_or_else(|| Authid
::root_auth_id().clone());
96 let (client
, src_repo
, tgt_store
) = get_pull_parameters(&sync_job
.store
, &sync_job
.remote
, &sync_job
.remote_store
).await?
;
98 worker
.log(format
!("Starting datastore sync job '{}'", job_id
));
// Only logged when a schedule string was passed in.
99 if let Some(event_str
) = schedule
{
100 worker
.log(format
!("task triggered by schedule '{}'", event_str
));
102 worker
.log(format
!("Sync datastore '{}' from '{}/{}'",
103 sync_job
.store
, sync_job
.remote
, sync_job
.remote_store
));
105 pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
, sync_owner
).await?
;
107 worker
.log(format
!("sync job '{}' end", &job_id
));
// Turns completion of the worker's abort future into an `Err` carrying "sync aborted".
112 let mut abort_future
= worker2
.abort_future().map(|_
| Err(format_err
!("sync aborted")));
// Whichever of the two futures completes first supplies `result`.
114 let result
= select
!{
115 worker
= worker_future
.fuse() => worker
,
116 abort
= abort_future
=> abort
,
// The task state derived from `result` is handed to `job.finish`.
// NOTE(review): the match arms are not fully visible; the `eprintln!` below appears to
// report a failure of `job.finish` - confirm against the complete source.
119 let status
= worker2
.create_state(&result
);
121 match job
.finish(status
) {
124 eprintln
!("could not finish job state: {}", err
);
// When `email` is set, the sync status is sent; a send failure is only printed to stderr.
128 if let Some(email
) = email
{
129 if let Err(err
) = crate::server
::send_sync_status(&email
, notify
, &sync_job2
, &result
) {
130 eprintln
!("send sync notification failed: {}", err
);
// Fragment of the API declaration and handler body for pulling ("sync") a datastore. The
// lines that open the declaration and the handler's `fn` header are not visible in this view.
//
// Visible declaration parts: four schema entries (`DATASTORE_SCHEMA`, `REMOTE_ID_SCHEMA`,
// `DATASTORE_SCHEMA`, `REMOVE_VANISHED_BACKUPS_SCHEMA`) and an access entry whose
// `permission` is `Permission::Anybody`.
//
// Visible handler steps (return type `Result<String, Error>`):
// 1. the caller's `Authid` is parsed from `rpcenv.get_auth_id()`; the `unwrap()` panics if
//    `get_auth_id()` does not yield a value;
// 2. `delete` defaults to `true` when `remove_vanished` is `None`;
// 3. `check_pull_privs` is called, because the declaration itself admits anybody;
// 4. `get_pull_parameters` yields the client, the source repository and the target datastore;
// 5. `WorkerTask::spawn("sync", ...)` starts a task that logs start/end messages and races
//    `pull_store` via `select!` against the worker's abort future, which is mapped to an
//    `Err` carrying "pull aborted".
144 schema
: DATASTORE_SCHEMA
,
147 schema
: REMOTE_ID_SCHEMA
,
150 schema
: DATASTORE_SCHEMA
,
153 schema
: REMOVE_VANISHED_BACKUPS_SCHEMA
,
159 // Note: the parameters used here are not URI parameters, so the privileges have to be checked inside the function body
160 description
: r
###"The user needs Datastore.Backup privilege on '/datastore/{store}',
161 and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
162 The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
164 permission
: &Permission
::Anybody
,
167 /// Sync store from other repository
171 remote_store
: String
,
172 remove_vanished
: Option
<bool
>,
174 rpcenv
: &mut dyn RpcEnvironment
,
175 ) -> Result
<String
, Error
> {
177 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
178 let delete
= remove_vanished
.unwrap_or(true);
180 check_pull_privs(&auth_id
, &store
, &remote
, &remote_store
, delete
)?
;
182 let (client
, src_repo
, tgt_store
) = get_pull_parameters(&store
, &remote
, &remote_store
).await?
;
184 // fixme: set to_stdout to false?
185 let upid_str
= WorkerTask
::spawn("sync", Some(store
.clone()), auth_id
.clone(), true, move |worker
| async
move {
187 worker
.log(format
!("sync datastore '{}' start", store
));
189 let pull_future
= pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
, auth_id
);
190 let future
= select
!{
191 success
= pull_future
.fuse() => success
,
192 abort
= worker
.abort_future().map(|_
| Err(format_err
!("pull aborted"))) => abort
,
197 worker
.log(format
!("sync datastore '{}' end", store
));
/// Router exported by this module: it registers `API_METHOD_PULL` as the handler for
/// `POST` requests and defines no other method.
205 pub const ROUTER
: Router
= Router
::new()
206 .post(&API_METHOD_PULL
);