1 //! Sync datastore from remote server
4 use anyhow
::{format_err, Error}
;
5 use futures
::{select, future::FutureExt}
;
8 use proxmox
::api
::{ApiMethod, Router, RpcEnvironment, Permission}
;
10 use crate::server
::{WorkerTask}
;
11 use crate::backup
::DataStore
;
12 use crate::client
::{HttpClient, HttpClientOptions, BackupRepository, pull::pull_store}
;
13 use crate::api2
::types
::*;
18 acl
::{PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ}
,
19 cached_user_info
::CachedUserInfo
,
/// Verifies that `userid` holds the privileges required for a pull.
///
/// The visible body creates a `CachedUserInfo` and calls `check_privs` three
/// times, propagating the first failure with `?`:
/// - `PRIV_DATASTORE_BACKUP` on the ACL path `["datastore", store]`
/// - `PRIV_REMOTE_READ` on the ACL path `["remote", remote, remote_store]`
/// - `PRIV_DATASTORE_PRUNE` on the ACL path `["datastore", store]`
///
/// NOTE(review): the parameter list and the lines surrounding the prune check
/// are not part of this excerpt. The API description further down says the
/// delete flag "additionally requires" Datastore.Prune, so the third check is
/// presumably guarded by that flag - confirm against the full file.
23 pub fn check_pull_privs(
29 ) -> Result
<(), Error
> {
31 let user_info
= CachedUserInfo
::new()?
;
// Privilege on the local target datastore.
33 user_info
.check_privs(userid
, &["datastore", store
], PRIV_DATASTORE_BACKUP
, false)?
;
// Privilege on the remote source store.
34 user_info
.check_privs(userid
, &["remote", remote
, remote_store
], PRIV_REMOTE_READ
, false)?
;
// Prune privilege on the local target datastore (see NOTE(review) above
// regarding the guard that is not visible here).
37 user_info
.check_privs(userid
, &["datastore", store
], PRIV_DATASTORE_PRUNE
, false)?
;
/// Resolves everything needed to pull from a remote into a local datastore.
///
/// On success returns the tuple `(client, src_repo, tgt_store)`:
/// - `client`: an `HttpClient` built for the remote's host and userid
/// - `src_repo`: a `BackupRepository` built from the remote's userid, host and
///   `remote_store`
/// - `tgt_store`: the `Arc<DataStore>` returned by `DataStore::lookup_datastore`
///
/// # Errors
///
/// Propagates failures of the datastore lookup, `remote::config()`, the
/// `"remote"` section lookup and `HttpClient::new`. A failed login is reported
/// as `remote connection to '<host>' failed - <err>`.
///
/// NOTE(review): the parameter list is not part of this excerpt; the body reads
/// `store`, `remote` and `remote_store`.
43 pub async
fn get_pull_parameters(
47 ) -> Result
<(HttpClient
, BackupRepository
, Arc
<DataStore
>), Error
> {
49 let tgt_store
= DataStore
::lookup_datastore(store
)?
;
// Load the remote configuration; the digest is not used here.
51 let (remote_config
, _digest
) = remote
::config()?
;
// Shadows the `remote` name argument with the looked-up `remote::Remote` entry.
52 let remote
: remote
::Remote
= remote_config
.lookup("remote", remote
)?
;
// Client options carry the password and fingerprint stored for the remote.
54 let options
= HttpClientOptions
::new()
55 .password(Some(remote
.password
.clone()))
56 .fingerprint(remote
.fingerprint
.clone());
58 let client
= HttpClient
::new(&remote
.host
, &remote
.userid
, options
)?
;
// NOTE(review): an `.await` between `login()` and `map_err` is presumably on
// a line missing from this excerpt - confirm.
59 let _auth_info
= client
.login() // make sure we can auth
61 .map_err(|err
| format_err
!("remote connection to '{}' failed - {}", remote
.host
, err
))?
;
// `remote.userid` and `remote.host` are moved into the repository here.
63 let src_repo
= BackupRepository
::new(Some(remote
.userid
), Some(remote
.host
), remote_store
.to_string());
65 Ok((client
, src_repo
, tgt_store
))
// NOTE(review): the header of the function these lines belong to is not part
// of this excerpt; only two of its parameters (`sync_job`, `schedule`) and the
// `Result<String, Error>` return type are visible. The body spawns a
// `WorkerTask` of type "syncjob" that runs one pull and records the outcome
// through `job`.
70 sync_job
: SyncJobConfig
,
72 schedule
: Option
<String
>,
74 ) -> Result
<String
, Error
> {
76 let job_id
= id
.to_string();
77 let worker_type
= "syncjob";
// NOTE(review): the leading arguments of `spawn` are not visible here.
79 let upid_str
= WorkerTask
::spawn(
84 move |worker
| async
move {
// Hand the worker's UPID string to `job.start`; a failure aborts the task.
86 job
.start(&worker
.upid().to_string())?
;
// Second handle: `worker` is moved into `worker_future` below, while
// `worker2` is used for `abort_future()` and `create_state()`.
88 let worker2
= worker
.clone();
90 let worker_future
= async
move {
// `remove_vanished` defaults to `true` when unset in the job config.
92 let delete
= sync_job
.remove_vanished
.unwrap_or(true);
93 let (client
, src_repo
, tgt_store
) = get_pull_parameters(&sync_job
.store
, &sync_job
.remote
, &sync_job
.remote_store
).await?
;
95 worker
.log(format
!("Starting datastore sync job '{}'", job_id
));
// Only logged when a schedule string was passed in.
96 if let Some(event_str
) = schedule
{
97 worker
.log(format
!("task triggered by schedule '{}'", event_str
));
99 worker
.log(format
!("Sync datastore '{}' from '{}/{}'",
100 sync_job
.store
, sync_job
.remote
, sync_job
.remote_store
));
// The pull is invoked with `Userid::backup_userid()` as its last argument.
102 crate::client
::pull
::pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
, Userid
::backup_userid().clone()).await?
;
104 worker
.log(format
!("sync job '{}' end", &job_id
));
// Turns the worker's abort signal into an `Err("sync aborted")` result.
109 let mut abort_future
= worker2
.abort_future().map(|_
| Err(format_err
!("sync aborted")));
// NOTE(review): these look like the two arms of a `select!` (imported at the
// top of the file) racing the pull against the abort signal; the macro line
// itself is not part of this excerpt - confirm.
112 worker
= worker_future
.fuse() => worker
,
113 abort
= abort_future
=> abort
,
// Derive the task state from the result and pass it to `job.finish`.
116 let status
= worker2
.create_state(&res
);
// NOTE(review): the match arms are only partly visible; the visible one
// reports a `job.finish` problem on stderr via `eprintln!`.
118 match job
.finish(status
) {
121 eprintln
!("could not finish job state: {}", err
);
135 schema
: DATASTORE_SCHEMA
,
138 schema
: REMOTE_ID_SCHEMA
,
141 schema
: DATASTORE_SCHEMA
,
144 schema
: REMOVE_VANISHED_BACKUPS_SCHEMA
,
150 // Note: the parameters used for the permission check are not URI parameters, so the check has to be done inside the function body
151 description
: r
###"The user needs Datastore.Backup privilege on '/datastore/{store}',
152 and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
153 The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
155 permission
: &Permission
::Anybody
,
158 /// Sync store from other repository
///
/// Visible steps: parse the caller's user id from `rpcenv`, check privileges
/// with `check_pull_privs`, resolve client/source/target with
/// `get_pull_parameters`, then spawn a `"sync"` `WorkerTask` that logs a start
/// message, runs `pull_store` and logs an end message.
///
/// `remove_vanished` defaults to `true` when it is `None`.
///
/// NOTE(review): the `fn` line and the leading parameters (`store` and `remote`
/// are used in the body) are not part of this excerpt, nor is the tail that
/// produces the `Result<String, Error>` value.
162 remote_store
: String
,
163 remove_vanished
: Option
<bool
>,
165 rpcenv
: &mut dyn RpcEnvironment
,
166 ) -> Result
<String
, Error
> {
// NOTE(review): `unwrap()` panics if `get_user()` yields no value - confirm
// that an authenticated user is always present on this code path.
168 let userid
: Userid
= rpcenv
.get_user().unwrap().parse()?
;
169 let delete
= remove_vanished
.unwrap_or(true);
// Privileges are checked here, in the body, before any remote connection is made.
171 check_pull_privs(&userid
, &store
, &remote
, &remote_store
, delete
)?
;
173 let (client
, src_repo
, tgt_store
) = get_pull_parameters(&store
, &remote
, &remote_store
).await?
;
175 // fixme: set to_stdout to false?
176 let upid_str
= WorkerTask
::spawn("sync", Some(store
.clone()), userid
.clone(), true, move |worker
| async
move {
178 worker
.log(format
!("sync datastore '{}' start", store
));
// The pull is invoked with the calling user's id as its last argument.
180 pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
, userid
).await?
;
182 worker
.log(format
!("sync datastore '{}' end", store
));
/// Router constant that registers `API_METHOD_PULL` through `.post(...)`.
190 pub const ROUTER
: Router
= Router
::new()
191 .post(&API_METHOD_PULL
);