]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/pull.rs
api: sync job: log stats for removed vanished entities
[proxmox-backup.git] / src / api2 / pull.rs
CommitLineData
eb506c83 1//! Sync datastore from remote server
4ec73327 2use anyhow::{bail, format_err, Error};
dc7a5b34 3use futures::{future::FutureExt, select};
de8ec041 4
1e85f97b 5use proxmox_router::{Permission, Router, RpcEnvironment};
6ef1b649 6use proxmox_schema::api;
d5790a9f 7use proxmox_sys::task_log;
de8ec041 8
6afdda88 9use pbs_api_types::{
c06c1b4b 10 Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
e40c7fb9
FG
11 GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
12 PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
9b67352a 13 TRANSFER_LAST_SCHEMA,
6afdda88 14};
1ec0d70d 15use pbs_config::CachedUserInfo;
d3852556 16use proxmox_human_byte::HumanByte;
dc7a5b34 17use proxmox_rest_server::WorkerTask;
2b7f8dd5 18
6e9e6c7a 19use crate::server::jobstate::Job;
dc7a5b34 20use crate::server::pull::{pull_store, PullParameters};
de8ec041 21
268687dd 22pub fn check_pull_privs(
e6dc35ac 23 auth_id: &Authid,
268687dd 24 store: &str,
c06c1b4b 25 ns: Option<&str>,
4ec73327 26 remote: Option<&str>,
268687dd
DC
27 remote_store: &str,
28 delete: bool,
29) -> Result<(), Error> {
268687dd
DC
30 let user_info = CachedUserInfo::new()?;
31
c06c1b4b
FG
32 let local_store_ns_acl_path = match ns {
33 Some(ns) => vec!["datastore", store, ns],
34 None => vec!["datastore", store],
35 };
36
37 user_info.check_privs(
38 auth_id,
39 &local_store_ns_acl_path,
40 PRIV_DATASTORE_BACKUP,
41 false,
42 )?;
4ec73327
HL
43
44 if let Some(remote) = remote {
45 user_info.check_privs(
46 auth_id,
47 &["remote", remote, remote_store],
48 PRIV_REMOTE_READ,
49 false,
50 )?;
51 } else {
52 user_info.check_privs(
53 auth_id,
54 &["datastore", remote_store],
55 PRIV_DATASTORE_BACKUP,
56 false,
57 )?;
58 }
268687dd
DC
59
60 if delete {
c06c1b4b
FG
61 user_info.check_privs(
62 auth_id,
63 &local_store_ns_acl_path,
64 PRIV_DATASTORE_PRUNE,
65 false,
66 )?;
268687dd
DC
67 }
68
69 Ok(())
70}
71
6e9e6c7a
FG
72impl TryFrom<&SyncJobConfig> for PullParameters {
73 type Error = Error;
74
75 fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
76 PullParameters::new(
77 &sync_job.store,
c06c1b4b 78 sync_job.ns.clone().unwrap_or_default(),
4ec73327 79 sync_job.remote.as_deref(),
6e9e6c7a 80 &sync_job.remote_store,
c06c1b4b 81 sync_job.remote_ns.clone().unwrap_or_default(),
dc7a5b34
TL
82 sync_job
83 .owner
84 .as_ref()
85 .unwrap_or_else(|| Authid::root_auth_id())
86 .clone(),
6e9e6c7a 87 sync_job.remove_vanished,
c06c1b4b 88 sync_job.max_depth,
062edce2 89 sync_job.group_filter.clone(),
2d5287fb 90 sync_job.limit.clone(),
9b67352a 91 sync_job.transfer_last,
6e9e6c7a
FG
92 )
93 }
268687dd
DC
94}
95
42b68f72 96pub fn do_sync_job(
713b66b6 97 mut job: Job,
42b68f72 98 sync_job: SyncJobConfig,
e6dc35ac 99 auth_id: &Authid,
02543a5c 100 schedule: Option<String>,
bfa942c0 101 to_stdout: bool,
42b68f72 102) -> Result<String, Error> {
dc7a5b34 103 let job_id = format!(
c06c1b4b 104 "{}:{}:{}:{}:{}",
4ec73327 105 sync_job.remote.as_deref().unwrap_or("-"),
dc7a5b34
TL
106 sync_job.remote_store,
107 sync_job.store,
c06c1b4b 108 sync_job.ns.clone().unwrap_or_default(),
dc7a5b34
TL
109 job.jobname()
110 );
713b66b6 111 let worker_type = job.jobtype().to_string();
42b68f72 112
4ec73327
HL
113 if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
114 bail!("can't sync to same datastore");
115 }
116
f47c1d3a 117 let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
9e733dae 118
02543a5c 119 let upid_str = WorkerTask::spawn(
713b66b6 120 &worker_type,
dbd45a72 121 Some(job_id.clone()),
049a22a3 122 auth_id.to_string(),
bfa942c0 123 to_stdout,
02543a5c 124 move |worker| async move {
02543a5c 125 job.start(&worker.upid().to_string())?;
42b68f72 126
02543a5c 127 let worker2 = worker.clone();
9e733dae 128 let sync_job2 = sync_job.clone();
42b68f72 129
02543a5c 130 let worker_future = async move {
6e9e6c7a 131 let pull_params = PullParameters::try_from(&sync_job)?;
02543a5c 132
1ec0d70d 133 task_log!(worker, "Starting datastore sync job '{}'", job_id);
02543a5c 134 if let Some(event_str) = schedule {
1ec0d70d 135 task_log!(worker, "task triggered by schedule '{}'", event_str);
02543a5c 136 }
1ec0d70d
DM
137 task_log!(
138 worker,
4ec73327 139 "sync datastore '{}' from '{}{}'",
1ec0d70d 140 sync_job.store,
4ec73327
HL
141 sync_job
142 .remote
143 .as_deref()
144 .map_or(String::new(), |remote| format!("{remote}/")),
1ec0d70d
DM
145 sync_job.remote_store,
146 );
02543a5c 147
d3852556 148 let pull_stats = pull_store(&worker, pull_params).await?;
ed9721f2
TL
149
150 if pull_stats.bytes != 0 {
151 let amount = HumanByte::from(pull_stats.bytes);
152 let rate = HumanByte::new_binary(
153 pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
154 );
155 task_log!(
156 worker,
157 "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
158 pull_stats.chunk_count,
159 );
160 } else {
161 task_log!(worker, "Summary: sync job found no new data to pull");
162 }
02543a5c 163
f4a8be4b
CE
164 if let Some(removed) = pull_stats.removed {
165 task_log!(
166 worker,
167 "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
168 removed.snapshots,
169 removed.groups,
170 removed.namespaces,
171 );
172 }
173
1ec0d70d 174 task_log!(worker, "sync job '{}' end", &job_id);
02543a5c
DC
175
176 Ok(())
177 };
178
dc7a5b34
TL
179 let mut abort_future = worker2
180 .abort_future()
181 .map(|_| Err(format_err!("sync aborted")));
02543a5c 182
dc7a5b34 183 let result = select! {
02543a5c
DC
184 worker = worker_future.fuse() => worker,
185 abort = abort_future => abort,
186 };
187
9e733dae 188 let status = worker2.create_state(&result);
02543a5c
DC
189
190 match job.finish(status) {
dc7a5b34 191 Ok(_) => {}
02543a5c
DC
192 Err(err) => {
193 eprintln!("could not finish job state: {}", err);
194 }
195 }
196
9e733dae 197 if let Some(email) = email {
dc7a5b34
TL
198 if let Err(err) =
199 crate::server::send_sync_status(&email, notify, &sync_job2, &result)
200 {
9e733dae
DM
201 eprintln!("send sync notification failed: {}", err);
202 }
203 }
204
205 result
dc7a5b34
TL
206 },
207 )?;
42b68f72
DC
208
209 Ok(upid_str)
210}
211
de8ec041
DM
212#[api(
213 input: {
214 properties: {
215 store: {
216 schema: DATASTORE_SCHEMA,
217 },
c06c1b4b
FG
218 ns: {
219 type: BackupNamespace,
220 optional: true,
221 },
94609e23
DM
222 remote: {
223 schema: REMOTE_ID_SCHEMA,
4ec73327 224 optional: true,
de8ec041
DM
225 },
226 "remote-store": {
227 schema: DATASTORE_SCHEMA,
228 },
c06c1b4b
FG
229 "remote-ns": {
230 type: BackupNamespace,
231 optional: true,
232 },
b4900286
DM
233 "remove-vanished": {
234 schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
4b4eba0b 235 optional: true,
4b4eba0b 236 },
c06c1b4b 237 "max-depth": {
e40c7fb9 238 schema: NS_MAX_DEPTH_REDUCED_SCHEMA,
c06c1b4b
FG
239 optional: true,
240 },
062edce2 241 "group-filter": {
71e53463
FG
242 schema: GROUP_FILTER_LIST_SCHEMA,
243 optional: true,
244 },
2d5287fb
DM
245 limit: {
246 type: RateLimitConfig,
247 flatten: true,
9b67352a
SH
248 },
249 "transfer-last": {
250 schema: TRANSFER_LAST_SCHEMA,
251 optional: true,
252 },
de8ec041
DM
253 },
254 },
404d78c4 255 access: {
365f0f72 256 // Note: used parameters are no uri parameters, so we need to test inside function body
54552dda 257 description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}',
8247db5b 258and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
ec67af9a 259The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
54552dda 260"###,
365f0f72 261 permission: &Permission::Anybody,
404d78c4 262 },
de8ec041 263)]
eb506c83 264/// Sync store from other repository
e1db0670 265#[allow(clippy::too_many_arguments)]
dc7a5b34 266async fn pull(
de8ec041 267 store: String,
c06c1b4b 268 ns: Option<BackupNamespace>,
4ec73327 269 remote: Option<String>,
de8ec041 270 remote_store: String,
c06c1b4b 271 remote_ns: Option<BackupNamespace>,
b4900286 272 remove_vanished: Option<bool>,
c06c1b4b 273 max_depth: Option<usize>,
062edce2 274 group_filter: Option<Vec<GroupFilter>>,
2d5287fb 275 limit: RateLimitConfig,
9b67352a 276 transfer_last: Option<usize>,
de8ec041
DM
277 rpcenv: &mut dyn RpcEnvironment,
278) -> Result<String, Error> {
e6dc35ac 279 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
61ef4ae8 280 let delete = remove_vanished.unwrap_or(false);
4b4eba0b 281
4ec73327
HL
282 if remote.is_none() && store == remote_store {
283 bail!("can't sync to same datastore");
284 }
285
c06c1b4b 286 let ns = ns.unwrap_or_default();
c06c1b4b
FG
287 let ns_str = if ns.is_root() {
288 None
289 } else {
290 Some(ns.to_string())
291 };
292
293 check_pull_privs(
294 &auth_id,
295 &store,
296 ns_str.as_deref(),
4ec73327 297 remote.as_deref(),
c06c1b4b
FG
298 &remote_store,
299 delete,
300 )?;
de8ec041 301
6e9e6c7a
FG
302 let pull_params = PullParameters::new(
303 &store,
c06c1b4b 304 ns,
4ec73327 305 remote.as_deref(),
6e9e6c7a 306 &remote_store,
c06c1b4b 307 remote_ns.unwrap_or_default(),
6e9e6c7a
FG
308 auth_id.clone(),
309 remove_vanished,
c06c1b4b 310 max_depth,
062edce2 311 group_filter,
2d5287fb 312 limit,
9b67352a 313 transfer_last,
6e9e6c7a 314 )?;
de8ec041
DM
315
316 // fixme: set to_stdout to false?
b9b2d635 317 // FIXME: add namespace to worker id?
dc7a5b34
TL
318 let upid_str = WorkerTask::spawn(
319 "sync",
320 Some(store.clone()),
321 auth_id.to_string(),
322 true,
323 move |worker| async move {
c06c1b4b
FG
324 task_log!(
325 worker,
326 "pull datastore '{}' from '{}/{}'",
327 store,
4ec73327 328 remote.as_deref().unwrap_or("-"),
c06c1b4b
FG
329 remote_store,
330 );
de8ec041 331
05a52d01 332 let pull_future = pull_store(&worker, pull_params);
10dac693 333 (select! {
dc7a5b34
TL
334 success = pull_future.fuse() => success,
335 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
10dac693 336 })?;
de8ec041 337
c06c1b4b 338 task_log!(worker, "pull datastore '{}' end", store);
de8ec041 339
dc7a5b34
TL
340 Ok(())
341 },
342 )?;
de8ec041
DM
343
344 Ok(upid_str)
345}
346
// Router for this module: only POST is registered, dispatching to `API_METHOD_PULL`.
pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);