1 //! Sync datastore from remote server
4 use anyhow
::{format_err, Error}
;
5 use futures
::{select, future::FutureExt}
;
8 use proxmox
::api
::{ApiMethod, Router, RpcEnvironment, Permission}
;
10 use crate::server
::{WorkerTask, jobstate::Job}
;
11 use crate::backup
::DataStore
;
12 use crate::client
::{HttpClient, HttpClientOptions, BackupRepository, pull::pull_store}
;
13 use crate::api2
::types
::*;
17 acl
::{PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ}
,
18 cached_user_info
::CachedUserInfo
,
22 pub fn check_pull_privs(
28 ) -> Result
<(), Error
> {
30 let user_info
= CachedUserInfo
::new()?
;
32 user_info
.check_privs(userid
, &["datastore", store
], PRIV_DATASTORE_BACKUP
, false)?
;
33 user_info
.check_privs(userid
, &["remote", remote
, remote_store
], PRIV_REMOTE_READ
, false)?
;
36 user_info
.check_privs(userid
, &["datastore", store
], PRIV_DATASTORE_PRUNE
, false)?
;
42 pub async
fn get_pull_parameters(
46 ) -> Result
<(HttpClient
, BackupRepository
, Arc
<DataStore
>), Error
> {
48 let tgt_store
= DataStore
::lookup_datastore(store
)?
;
50 let (remote_config
, _digest
) = remote
::config()?
;
51 let remote
: remote
::Remote
= remote_config
.lookup("remote", remote
)?
;
53 let options
= HttpClientOptions
::new()
54 .password(Some(remote
.password
.clone()))
55 .fingerprint(remote
.fingerprint
.clone());
57 let src_repo
= BackupRepository
::new(Some(remote
.userid
.clone()), Some(remote
.host
.clone()), remote
.port
, remote_store
.to_string());
59 let client
= HttpClient
::new(&src_repo
.host(), src_repo
.port(), &src_repo
.user(), options
)?
;
60 let _auth_info
= client
.login() // make sure we can auth
62 .map_err(|err
| format_err
!("remote connection to '{}' failed - {}", remote
.host
, err
))?
;
65 Ok((client
, src_repo
, tgt_store
))
70 sync_job
: SyncJobConfig
,
72 schedule
: Option
<String
>,
73 ) -> Result
<String
, Error
> {
75 let job_id
= job
.jobname().to_string();
76 let worker_type
= job
.jobtype().to_string();
78 let upid_str
= WorkerTask
::spawn(
80 Some(job
.jobname().to_string()),
83 move |worker
| async
move {
85 job
.start(&worker
.upid().to_string())?
;
87 let worker2
= worker
.clone();
89 let worker_future
= async
move {
91 let delete
= sync_job
.remove_vanished
.unwrap_or(true);
92 let (client
, src_repo
, tgt_store
) = get_pull_parameters(&sync_job
.store
, &sync_job
.remote
, &sync_job
.remote_store
).await?
;
94 worker
.log(format
!("Starting datastore sync job '{}'", job_id
));
95 if let Some(event_str
) = schedule
{
96 worker
.log(format
!("task triggered by schedule '{}'", event_str
));
98 worker
.log(format
!("Sync datastore '{}' from '{}/{}'",
99 sync_job
.store
, sync_job
.remote
, sync_job
.remote_store
));
101 crate::client
::pull
::pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
, Userid
::backup_userid().clone()).await?
;
103 worker
.log(format
!("sync job '{}' end", &job_id
));
108 let mut abort_future
= worker2
.abort_future().map(|_
| Err(format_err
!("sync aborted")));
111 worker
= worker_future
.fuse() => worker
,
112 abort
= abort_future
=> abort
,
115 let status
= worker2
.create_state(&res
);
117 match job
.finish(status
) {
120 eprintln
!("could not finish job state: {}", err
);
134 schema
: DATASTORE_SCHEMA
,
137 schema
: REMOTE_ID_SCHEMA
,
140 schema
: DATASTORE_SCHEMA
,
143 schema
: REMOVE_VANISHED_BACKUPS_SCHEMA
,
149 // Note: used parameters are no uri parameters, so we need to test inside function body
150 description
: r
###"The user needs Datastore.Backup privilege on '/datastore/{store}',
151 and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
152 The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
154 permission
: &Permission
::Anybody
,
157 /// Sync store from other repository
161 remote_store
: String
,
162 remove_vanished
: Option
<bool
>,
164 rpcenv
: &mut dyn RpcEnvironment
,
165 ) -> Result
<String
, Error
> {
167 let userid
: Userid
= rpcenv
.get_user().unwrap().parse()?
;
168 let delete
= remove_vanished
.unwrap_or(true);
170 check_pull_privs(&userid
, &store
, &remote
, &remote_store
, delete
)?
;
172 let (client
, src_repo
, tgt_store
) = get_pull_parameters(&store
, &remote
, &remote_store
).await?
;
174 // fixme: set to_stdout to false?
175 let upid_str
= WorkerTask
::spawn("sync", Some(store
.clone()), userid
.clone(), true, move |worker
| async
move {
177 worker
.log(format
!("sync datastore '{}' start", store
));
179 let pull_future
= pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
, userid
);
180 let future
= select
!{
181 success
= pull_future
.fuse() => success
,
182 abort
= worker
.abort_future().map(|_
| Err(format_err
!("pull aborted"))) => abort
,
187 worker
.log(format
!("sync datastore '{}' end", store
));
195 pub const ROUTER
: Router
= Router
::new()
196 .post(&API_METHOD_PULL
);