]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/pull.rs
use RateLimitConfig for HttpClient and pull
[proxmox-backup.git] / src / api2 / pull.rs
CommitLineData
eb506c83 1//! Sync datastore from remote server
6e9e6c7a 2use std::convert::TryFrom;
eb506c83 3
07ad6470 4use anyhow::{format_err, Error};
02543a5c 5use futures::{select, future::FutureExt};
de8ec041 6
6ef1b649
WB
7use proxmox_schema::api;
8use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
d5790a9f 9use proxmox_sys::task_log;
de8ec041 10
6afdda88 11use pbs_api_types::{
2d5287fb 12 Authid, SyncJobConfig, GroupFilter, RateLimitConfig, GROUP_FILTER_LIST_SCHEMA,
8cc3760e
DM
13 DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
14 PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
6afdda88 15};
b9700a9f 16use proxmox_rest_server::WorkerTask;
1ec0d70d 17use pbs_config::CachedUserInfo;
2b7f8dd5 18
6e9e6c7a
FG
19use crate::server::pull::{PullParameters, pull_store};
20use crate::server::jobstate::Job;
21
de8ec041 22
268687dd 23pub fn check_pull_privs(
e6dc35ac 24 auth_id: &Authid,
268687dd
DC
25 store: &str,
26 remote: &str,
27 remote_store: &str,
28 delete: bool,
29) -> Result<(), Error> {
30
31 let user_info = CachedUserInfo::new()?;
32
e6dc35ac
FG
33 user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_BACKUP, false)?;
34 user_info.check_privs(auth_id, &["remote", remote, remote_store], PRIV_REMOTE_READ, false)?;
268687dd
DC
35
36 if delete {
e6dc35ac 37 user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_PRUNE, false)?;
268687dd
DC
38 }
39
40 Ok(())
41}
42
6e9e6c7a
FG
43impl TryFrom<&SyncJobConfig> for PullParameters {
44 type Error = Error;
45
46 fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
47 PullParameters::new(
48 &sync_job.store,
49 &sync_job.remote,
50 &sync_job.remote_store,
51 sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
52 sync_job.remove_vanished,
062edce2 53 sync_job.group_filter.clone(),
2d5287fb 54 sync_job.limit.clone(),
6e9e6c7a
FG
55 )
56 }
268687dd
DC
57}
58
42b68f72 59pub fn do_sync_job(
713b66b6 60 mut job: Job,
42b68f72 61 sync_job: SyncJobConfig,
e6dc35ac 62 auth_id: &Authid,
02543a5c 63 schedule: Option<String>,
bfa942c0 64 to_stdout: bool,
42b68f72
DC
65) -> Result<String, Error> {
66
dbd45a72
FG
67 let job_id = format!("{}:{}:{}:{}",
68 sync_job.remote,
69 sync_job.remote_store,
70 sync_job.store,
71 job.jobname());
713b66b6 72 let worker_type = job.jobtype().to_string();
42b68f72 73
f47c1d3a 74 let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
9e733dae 75
02543a5c 76 let upid_str = WorkerTask::spawn(
713b66b6 77 &worker_type,
dbd45a72 78 Some(job_id.clone()),
049a22a3 79 auth_id.to_string(),
bfa942c0 80 to_stdout,
02543a5c 81 move |worker| async move {
42b68f72 82
02543a5c 83 job.start(&worker.upid().to_string())?;
42b68f72 84
02543a5c 85 let worker2 = worker.clone();
9e733dae 86 let sync_job2 = sync_job.clone();
42b68f72 87
02543a5c 88 let worker_future = async move {
42b68f72 89
6e9e6c7a
FG
90 let pull_params = PullParameters::try_from(&sync_job)?;
91 let client = pull_params.client().await?;
02543a5c 92
1ec0d70d 93 task_log!(worker, "Starting datastore sync job '{}'", job_id);
02543a5c 94 if let Some(event_str) = schedule {
1ec0d70d 95 task_log!(worker, "task triggered by schedule '{}'", event_str);
02543a5c 96 }
1ec0d70d
DM
97 task_log!(
98 worker,
99 "sync datastore '{}' from '{}/{}'",
100 sync_job.store,
101 sync_job.remote,
102 sync_job.remote_store,
103 );
02543a5c 104
6e9e6c7a 105 pull_store(&worker, &client, &pull_params).await?;
02543a5c 106
1ec0d70d 107 task_log!(worker, "sync job '{}' end", &job_id);
02543a5c
DC
108
109 Ok(())
110 };
111
112 let mut abort_future = worker2.abort_future().map(|_| Err(format_err!("sync aborted")));
113
9e733dae 114 let result = select!{
02543a5c
DC
115 worker = worker_future.fuse() => worker,
116 abort = abort_future => abort,
117 };
118
9e733dae 119 let status = worker2.create_state(&result);
02543a5c
DC
120
121 match job.finish(status) {
122 Ok(_) => {},
123 Err(err) => {
124 eprintln!("could not finish job state: {}", err);
125 }
126 }
127
9e733dae 128 if let Some(email) = email {
f47c1d3a 129 if let Err(err) = crate::server::send_sync_status(&email, notify, &sync_job2, &result) {
9e733dae
DM
130 eprintln!("send sync notification failed: {}", err);
131 }
132 }
133
134 result
02543a5c 135 })?;
42b68f72
DC
136
137 Ok(upid_str)
138}
139
de8ec041
DM
140#[api(
141 input: {
142 properties: {
143 store: {
144 schema: DATASTORE_SCHEMA,
145 },
94609e23
DM
146 remote: {
147 schema: REMOTE_ID_SCHEMA,
de8ec041
DM
148 },
149 "remote-store": {
150 schema: DATASTORE_SCHEMA,
151 },
b4900286
DM
152 "remove-vanished": {
153 schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
4b4eba0b 154 optional: true,
4b4eba0b 155 },
062edce2 156 "group-filter": {
71e53463
FG
157 schema: GROUP_FILTER_LIST_SCHEMA,
158 optional: true,
159 },
2d5287fb
DM
160 limit: {
161 type: RateLimitConfig,
162 flatten: true,
163 }
de8ec041
DM
164 },
165 },
404d78c4 166 access: {
365f0f72 167 // Note: used parameters are no uri parameters, so we need to test inside function body
54552dda 168 description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}',
8247db5b 169and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
ec67af9a 170The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
54552dda 171"###,
365f0f72 172 permission: &Permission::Anybody,
404d78c4 173 },
de8ec041 174)]
eb506c83
DM
175/// Sync store from other repository
176async fn pull (
de8ec041 177 store: String,
94609e23 178 remote: String,
de8ec041 179 remote_store: String,
b4900286 180 remove_vanished: Option<bool>,
062edce2 181 group_filter: Option<Vec<GroupFilter>>,
2d5287fb 182 limit: RateLimitConfig,
de8ec041
DM
183 _info: &ApiMethod,
184 rpcenv: &mut dyn RpcEnvironment,
185) -> Result<String, Error> {
186
e6dc35ac 187 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
61ef4ae8 188 let delete = remove_vanished.unwrap_or(false);
4b4eba0b 189
e6dc35ac 190 check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?;
de8ec041 191
6e9e6c7a
FG
192 let pull_params = PullParameters::new(
193 &store,
194 &remote,
195 &remote_store,
196 auth_id.clone(),
197 remove_vanished,
062edce2 198 group_filter,
2d5287fb 199 limit,
6e9e6c7a
FG
200 )?;
201 let client = pull_params.client().await?;
de8ec041
DM
202
203 // fixme: set to_stdout to false?
049a22a3 204 let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
de8ec041 205
1ec0d70d 206 task_log!(worker, "sync datastore '{}' start", store);
de8ec041 207
6e9e6c7a 208 let pull_future = pull_store(&worker, &client, &pull_params);
36700a0a
DC
209 let future = select!{
210 success = pull_future.fuse() => success,
211 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
212 };
213
214 let _ = future?;
de8ec041 215
1ec0d70d 216 task_log!(worker, "sync datastore '{}' end", store);
de8ec041
DM
217
218 Ok(())
219 })?;
220
221 Ok(upid_str)
222}
223
224pub const ROUTER: Router = Router::new()
eb506c83 225 .post(&API_METHOD_PULL);