]>
Commit | Line | Data |
---|---|---|
eb506c83 | 1 | //! Sync datastore from remote server |
4ec73327 | 2 | use anyhow::{bail, format_err, Error}; |
dc7a5b34 | 3 | use futures::{future::FutureExt, select}; |
de8ec041 | 4 | |
1e85f97b | 5 | use proxmox_router::{Permission, Router, RpcEnvironment}; |
6ef1b649 | 6 | use proxmox_schema::api; |
d5790a9f | 7 | use proxmox_sys::task_log; |
de8ec041 | 8 | |
6afdda88 | 9 | use pbs_api_types::{ |
c06c1b4b | 10 | Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA, |
e40c7fb9 FG |
11 | GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP, |
12 | PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, | |
9b67352a | 13 | TRANSFER_LAST_SCHEMA, |
6afdda88 | 14 | }; |
1ec0d70d | 15 | use pbs_config::CachedUserInfo; |
d3852556 | 16 | use proxmox_human_byte::HumanByte; |
dc7a5b34 | 17 | use proxmox_rest_server::WorkerTask; |
2b7f8dd5 | 18 | |
6e9e6c7a | 19 | use crate::server::jobstate::Job; |
dc7a5b34 | 20 | use crate::server::pull::{pull_store, PullParameters}; |
de8ec041 | 21 | |
268687dd | 22 | pub fn check_pull_privs( |
e6dc35ac | 23 | auth_id: &Authid, |
268687dd | 24 | store: &str, |
c06c1b4b | 25 | ns: Option<&str>, |
4ec73327 | 26 | remote: Option<&str>, |
268687dd DC |
27 | remote_store: &str, |
28 | delete: bool, | |
29 | ) -> Result<(), Error> { | |
268687dd DC |
30 | let user_info = CachedUserInfo::new()?; |
31 | ||
c06c1b4b FG |
32 | let local_store_ns_acl_path = match ns { |
33 | Some(ns) => vec!["datastore", store, ns], | |
34 | None => vec!["datastore", store], | |
35 | }; | |
36 | ||
37 | user_info.check_privs( | |
38 | auth_id, | |
39 | &local_store_ns_acl_path, | |
40 | PRIV_DATASTORE_BACKUP, | |
41 | false, | |
42 | )?; | |
4ec73327 HL |
43 | |
44 | if let Some(remote) = remote { | |
45 | user_info.check_privs( | |
46 | auth_id, | |
47 | &["remote", remote, remote_store], | |
48 | PRIV_REMOTE_READ, | |
49 | false, | |
50 | )?; | |
51 | } else { | |
52 | user_info.check_privs( | |
53 | auth_id, | |
54 | &["datastore", remote_store], | |
55 | PRIV_DATASTORE_BACKUP, | |
56 | false, | |
57 | )?; | |
58 | } | |
268687dd DC |
59 | |
60 | if delete { | |
c06c1b4b FG |
61 | user_info.check_privs( |
62 | auth_id, | |
63 | &local_store_ns_acl_path, | |
64 | PRIV_DATASTORE_PRUNE, | |
65 | false, | |
66 | )?; | |
268687dd DC |
67 | } |
68 | ||
69 | Ok(()) | |
70 | } | |
71 | ||
6e9e6c7a FG |
72 | impl TryFrom<&SyncJobConfig> for PullParameters { |
73 | type Error = Error; | |
74 | ||
75 | fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> { | |
76 | PullParameters::new( | |
77 | &sync_job.store, | |
c06c1b4b | 78 | sync_job.ns.clone().unwrap_or_default(), |
4ec73327 | 79 | sync_job.remote.as_deref(), |
6e9e6c7a | 80 | &sync_job.remote_store, |
c06c1b4b | 81 | sync_job.remote_ns.clone().unwrap_or_default(), |
dc7a5b34 TL |
82 | sync_job |
83 | .owner | |
84 | .as_ref() | |
85 | .unwrap_or_else(|| Authid::root_auth_id()) | |
86 | .clone(), | |
6e9e6c7a | 87 | sync_job.remove_vanished, |
c06c1b4b | 88 | sync_job.max_depth, |
062edce2 | 89 | sync_job.group_filter.clone(), |
2d5287fb | 90 | sync_job.limit.clone(), |
9b67352a | 91 | sync_job.transfer_last, |
6e9e6c7a FG |
92 | ) |
93 | } | |
268687dd DC |
94 | } |
95 | ||
42b68f72 | 96 | pub fn do_sync_job( |
713b66b6 | 97 | mut job: Job, |
42b68f72 | 98 | sync_job: SyncJobConfig, |
e6dc35ac | 99 | auth_id: &Authid, |
02543a5c | 100 | schedule: Option<String>, |
bfa942c0 | 101 | to_stdout: bool, |
42b68f72 | 102 | ) -> Result<String, Error> { |
dc7a5b34 | 103 | let job_id = format!( |
c06c1b4b | 104 | "{}:{}:{}:{}:{}", |
4ec73327 | 105 | sync_job.remote.as_deref().unwrap_or("-"), |
dc7a5b34 TL |
106 | sync_job.remote_store, |
107 | sync_job.store, | |
c06c1b4b | 108 | sync_job.ns.clone().unwrap_or_default(), |
dc7a5b34 TL |
109 | job.jobname() |
110 | ); | |
713b66b6 | 111 | let worker_type = job.jobtype().to_string(); |
42b68f72 | 112 | |
4ec73327 HL |
113 | if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store { |
114 | bail!("can't sync to same datastore"); | |
115 | } | |
116 | ||
02543a5c | 117 | let upid_str = WorkerTask::spawn( |
713b66b6 | 118 | &worker_type, |
dbd45a72 | 119 | Some(job_id.clone()), |
049a22a3 | 120 | auth_id.to_string(), |
bfa942c0 | 121 | to_stdout, |
02543a5c | 122 | move |worker| async move { |
02543a5c | 123 | job.start(&worker.upid().to_string())?; |
42b68f72 | 124 | |
02543a5c | 125 | let worker2 = worker.clone(); |
9e733dae | 126 | let sync_job2 = sync_job.clone(); |
42b68f72 | 127 | |
02543a5c | 128 | let worker_future = async move { |
6e9e6c7a | 129 | let pull_params = PullParameters::try_from(&sync_job)?; |
02543a5c | 130 | |
1ec0d70d | 131 | task_log!(worker, "Starting datastore sync job '{}'", job_id); |
02543a5c | 132 | if let Some(event_str) = schedule { |
1ec0d70d | 133 | task_log!(worker, "task triggered by schedule '{}'", event_str); |
02543a5c | 134 | } |
1ec0d70d DM |
135 | task_log!( |
136 | worker, | |
4ec73327 | 137 | "sync datastore '{}' from '{}{}'", |
1ec0d70d | 138 | sync_job.store, |
4ec73327 HL |
139 | sync_job |
140 | .remote | |
141 | .as_deref() | |
142 | .map_or(String::new(), |remote| format!("{remote}/")), | |
1ec0d70d DM |
143 | sync_job.remote_store, |
144 | ); | |
02543a5c | 145 | |
d3852556 | 146 | let pull_stats = pull_store(&worker, pull_params).await?; |
ed9721f2 TL |
147 | |
148 | if pull_stats.bytes != 0 { | |
149 | let amount = HumanByte::from(pull_stats.bytes); | |
150 | let rate = HumanByte::new_binary( | |
151 | pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(), | |
152 | ); | |
153 | task_log!( | |
154 | worker, | |
155 | "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)", | |
156 | pull_stats.chunk_count, | |
157 | ); | |
158 | } else { | |
159 | task_log!(worker, "Summary: sync job found no new data to pull"); | |
160 | } | |
02543a5c | 161 | |
f4a8be4b CE |
162 | if let Some(removed) = pull_stats.removed { |
163 | task_log!( | |
164 | worker, | |
165 | "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}", | |
166 | removed.snapshots, | |
167 | removed.groups, | |
168 | removed.namespaces, | |
169 | ); | |
170 | } | |
171 | ||
1ec0d70d | 172 | task_log!(worker, "sync job '{}' end", &job_id); |
02543a5c DC |
173 | |
174 | Ok(()) | |
175 | }; | |
176 | ||
dc7a5b34 TL |
177 | let mut abort_future = worker2 |
178 | .abort_future() | |
179 | .map(|_| Err(format_err!("sync aborted"))); | |
02543a5c | 180 | |
dc7a5b34 | 181 | let result = select! { |
02543a5c DC |
182 | worker = worker_future.fuse() => worker, |
183 | abort = abort_future => abort, | |
184 | }; | |
185 | ||
9e733dae | 186 | let status = worker2.create_state(&result); |
02543a5c DC |
187 | |
188 | match job.finish(status) { | |
dc7a5b34 | 189 | Ok(_) => {} |
02543a5c DC |
190 | Err(err) => { |
191 | eprintln!("could not finish job state: {}", err); | |
192 | } | |
193 | } | |
194 | ||
5b23a707 LW |
195 | if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) { |
196 | eprintln!("send sync notification failed: {err}"); | |
9e733dae DM |
197 | } |
198 | ||
199 | result | |
dc7a5b34 TL |
200 | }, |
201 | )?; | |
42b68f72 DC |
202 | |
203 | Ok(upid_str) | |
204 | } | |
205 | ||
de8ec041 DM |
206 | #[api( |
207 | input: { | |
208 | properties: { | |
209 | store: { | |
210 | schema: DATASTORE_SCHEMA, | |
211 | }, | |
c06c1b4b FG |
212 | ns: { |
213 | type: BackupNamespace, | |
214 | optional: true, | |
215 | }, | |
94609e23 DM |
216 | remote: { |
217 | schema: REMOTE_ID_SCHEMA, | |
4ec73327 | 218 | optional: true, |
de8ec041 DM |
219 | }, |
220 | "remote-store": { | |
221 | schema: DATASTORE_SCHEMA, | |
222 | }, | |
c06c1b4b FG |
223 | "remote-ns": { |
224 | type: BackupNamespace, | |
225 | optional: true, | |
226 | }, | |
b4900286 DM |
227 | "remove-vanished": { |
228 | schema: REMOVE_VANISHED_BACKUPS_SCHEMA, | |
4b4eba0b | 229 | optional: true, |
4b4eba0b | 230 | }, |
c06c1b4b | 231 | "max-depth": { |
e40c7fb9 | 232 | schema: NS_MAX_DEPTH_REDUCED_SCHEMA, |
c06c1b4b FG |
233 | optional: true, |
234 | }, | |
062edce2 | 235 | "group-filter": { |
71e53463 FG |
236 | schema: GROUP_FILTER_LIST_SCHEMA, |
237 | optional: true, | |
238 | }, | |
2d5287fb DM |
239 | limit: { |
240 | type: RateLimitConfig, | |
241 | flatten: true, | |
9b67352a SH |
242 | }, |
243 | "transfer-last": { | |
244 | schema: TRANSFER_LAST_SCHEMA, | |
245 | optional: true, | |
246 | }, | |
de8ec041 DM |
247 | }, |
248 | }, | |
404d78c4 | 249 | access: { |
365f0f72 | 250 | // Note: used parameters are no uri parameters, so we need to test inside function body |
54552dda | 251 | description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}', |
8247db5b | 252 | and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'. |
ec67af9a | 253 | The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'. |
54552dda | 254 | "###, |
365f0f72 | 255 | permission: &Permission::Anybody, |
404d78c4 | 256 | }, |
de8ec041 | 257 | )] |
eb506c83 | 258 | /// Sync store from other repository |
e1db0670 | 259 | #[allow(clippy::too_many_arguments)] |
dc7a5b34 | 260 | async fn pull( |
de8ec041 | 261 | store: String, |
c06c1b4b | 262 | ns: Option<BackupNamespace>, |
4ec73327 | 263 | remote: Option<String>, |
de8ec041 | 264 | remote_store: String, |
c06c1b4b | 265 | remote_ns: Option<BackupNamespace>, |
b4900286 | 266 | remove_vanished: Option<bool>, |
c06c1b4b | 267 | max_depth: Option<usize>, |
062edce2 | 268 | group_filter: Option<Vec<GroupFilter>>, |
2d5287fb | 269 | limit: RateLimitConfig, |
9b67352a | 270 | transfer_last: Option<usize>, |
de8ec041 DM |
271 | rpcenv: &mut dyn RpcEnvironment, |
272 | ) -> Result<String, Error> { | |
e6dc35ac | 273 | let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; |
61ef4ae8 | 274 | let delete = remove_vanished.unwrap_or(false); |
4b4eba0b | 275 | |
4ec73327 HL |
276 | if remote.is_none() && store == remote_store { |
277 | bail!("can't sync to same datastore"); | |
278 | } | |
279 | ||
c06c1b4b | 280 | let ns = ns.unwrap_or_default(); |
c06c1b4b FG |
281 | let ns_str = if ns.is_root() { |
282 | None | |
283 | } else { | |
284 | Some(ns.to_string()) | |
285 | }; | |
286 | ||
287 | check_pull_privs( | |
288 | &auth_id, | |
289 | &store, | |
290 | ns_str.as_deref(), | |
4ec73327 | 291 | remote.as_deref(), |
c06c1b4b FG |
292 | &remote_store, |
293 | delete, | |
294 | )?; | |
de8ec041 | 295 | |
6e9e6c7a FG |
296 | let pull_params = PullParameters::new( |
297 | &store, | |
c06c1b4b | 298 | ns, |
4ec73327 | 299 | remote.as_deref(), |
6e9e6c7a | 300 | &remote_store, |
c06c1b4b | 301 | remote_ns.unwrap_or_default(), |
6e9e6c7a FG |
302 | auth_id.clone(), |
303 | remove_vanished, | |
c06c1b4b | 304 | max_depth, |
062edce2 | 305 | group_filter, |
2d5287fb | 306 | limit, |
9b67352a | 307 | transfer_last, |
6e9e6c7a | 308 | )?; |
de8ec041 DM |
309 | |
310 | // fixme: set to_stdout to false? | |
b9b2d635 | 311 | // FIXME: add namespace to worker id? |
dc7a5b34 TL |
312 | let upid_str = WorkerTask::spawn( |
313 | "sync", | |
314 | Some(store.clone()), | |
315 | auth_id.to_string(), | |
316 | true, | |
317 | move |worker| async move { | |
c06c1b4b FG |
318 | task_log!( |
319 | worker, | |
320 | "pull datastore '{}' from '{}/{}'", | |
321 | store, | |
4ec73327 | 322 | remote.as_deref().unwrap_or("-"), |
c06c1b4b FG |
323 | remote_store, |
324 | ); | |
de8ec041 | 325 | |
05a52d01 | 326 | let pull_future = pull_store(&worker, pull_params); |
10dac693 | 327 | (select! { |
dc7a5b34 TL |
328 | success = pull_future.fuse() => success, |
329 | abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort, | |
10dac693 | 330 | })?; |
de8ec041 | 331 | |
c06c1b4b | 332 | task_log!(worker, "pull datastore '{}' end", store); |
de8ec041 | 333 | |
dc7a5b34 TL |
334 | Ok(()) |
335 | }, | |
336 | )?; | |
de8ec041 DM |
337 | |
338 | Ok(upid_str) | |
339 | } | |
340 | ||
dc7a5b34 | 341 | pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL); |