]>
Commit | Line | Data |
---|---|---|
07ad6470 DM |
1 | //! Sync datastore from remote server |
2 | ||
e2956c60 | 3 | use std::collections::{HashMap, HashSet}; |
076f36ec HL |
4 | use std::io::{Seek, Write}; |
5 | use std::path::{Path, PathBuf}; | |
54417086 | 6 | use std::sync::atomic::{AtomicUsize, Ordering}; |
e2956c60 | 7 | use std::sync::{Arc, Mutex}; |
68ac365f | 8 | use std::time::{Duration, SystemTime}; |
07ad6470 | 9 | |
c23192d3 | 10 | use anyhow::{bail, format_err, Error}; |
6ef1b649 | 11 | use http::StatusCode; |
12632250 | 12 | use proxmox_human_byte::HumanByte; |
05a52d01 | 13 | use proxmox_rest_server::WorkerTask; |
6ef1b649 | 14 | use proxmox_router::HttpError; |
05a52d01 HL |
15 | use proxmox_sys::{task_log, task_warn}; |
16 | use serde_json::json; | |
c23192d3 | 17 | |
2d5287fb | 18 | use pbs_api_types::{ |
05a52d01 HL |
19 | print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter, |
20 | GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, | |
7aeabff2 | 21 | PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ, |
2d5287fb | 22 | }; |
05a52d01 HL |
23 | use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader}; |
24 | use pbs_config::CachedUserInfo; | |
ea584a75 WB |
25 | use pbs_datastore::data_blob::DataBlob; |
26 | use pbs_datastore::dynamic_index::DynamicIndexReader; | |
27 | use pbs_datastore::fixed_index::FixedIndexReader; | |
28 | use pbs_datastore::index::IndexFile; | |
29 | use pbs_datastore::manifest::{ | |
ee0ea735 | 30 | archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, |
ea584a75 | 31 | }; |
05a52d01 | 32 | use pbs_datastore::read_chunk::AsyncReadChunk; |
076f36ec HL |
33 | use pbs_datastore::{ |
34 | check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress, | |
35 | }; | |
ba0ccc59 | 36 | use pbs_tools::sha::sha256; |
c23192d3 | 37 | |
076f36ec | 38 | use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups}; |
260147bd | 39 | use crate::tools::parallel_handler::ParallelHandler; |
07ad6470 | 40 | |
/// Reader for a single snapshot on a remote server, wrapping an established
/// `BackupReader` session.
struct RemoteReader {
    // Open reader session on the remote server.
    backup_reader: Arc<BackupReader>,
    // Snapshot this reader refers to (used for log messages).
    dir: BackupDir,
}
45 | ||
/// Reader for a single snapshot residing on a datastore of the local host.
struct LocalReader {
    // Shared lock on the snapshot directory, held for the lifetime of the reader.
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    // Full path of the snapshot directory.
    path: PathBuf,
    // Datastore the snapshot belongs to.
    datastore: Arc<DataStore>,
}
51 | ||
/// Target of a pull operation: local datastore plus the namespace to sync into.
pub(crate) struct PullTarget {
    // Datastore data is pulled into.
    store: Arc<DataStore>,
    // Namespace data is pulled into.
    ns: BackupNamespace,
}
56 | ||
/// Pull source on a remote server, accessed through its HTTP API.
pub(crate) struct RemoteSource {
    // Repository (auth id, host, port, store name) on the remote side.
    repo: BackupRepository,
    // Namespace on the remote side that is pulled from.
    ns: BackupNamespace,
    // HTTP client used for all API requests to the remote.
    client: HttpClient,
}
62 | ||
/// Pull source on a datastore of the local host (local sync).
pub(crate) struct LocalSource {
    // Datastore data is pulled from.
    store: Arc<DataStore>,
    // Namespace data is pulled from.
    ns: BackupNamespace,
}
67 | ||
/// Counters for entities that were removed during sync because they vanished
/// on the source.
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
    // Number of removed backup groups.
    pub(crate) groups: usize,
    // Number of removed snapshots.
    pub(crate) snapshots: usize,
    // Number of removed namespaces.
    pub(crate) namespaces: usize,
}
74 | ||
75 | impl RemovedVanishedStats { | |
76 | fn add(&mut self, rhs: RemovedVanishedStats) { | |
77 | self.groups += rhs.groups; | |
78 | self.snapshots += rhs.snapshots; | |
79 | self.namespaces += rhs.namespaces; | |
80 | } | |
81 | } | |
82 | ||
/// Accumulated statistics of a pull operation.
#[derive(Default)]
pub(crate) struct PullStats {
    // Number of chunks downloaded.
    pub(crate) chunk_count: usize,
    // Number of raw bytes downloaded.
    pub(crate) bytes: usize,
    // Wall-clock time spent downloading chunks.
    pub(crate) elapsed: Duration,
    // Statistics about removed vanished entities, if any were removed.
    pub(crate) removed: Option<RemovedVanishedStats>,
}
90 | ||
91 | impl From<RemovedVanishedStats> for PullStats { | |
92 | fn from(removed: RemovedVanishedStats) -> Self { | |
93 | Self { | |
94 | removed: Some(removed), | |
95 | ..Default::default() | |
96 | } | |
97 | } | |
68ac365f CE |
98 | } |
99 | ||
100 | impl PullStats { | |
101 | fn add(&mut self, rhs: PullStats) { | |
102 | self.chunk_count += rhs.chunk_count; | |
103 | self.bytes += rhs.bytes; | |
104 | self.elapsed += rhs.elapsed; | |
ceb639bd CE |
105 | |
106 | if let Some(rhs_removed) = rhs.removed { | |
107 | if let Some(ref mut removed) = self.removed { | |
108 | removed.add(rhs_removed); | |
109 | } else { | |
110 | self.removed = Some(rhs_removed); | |
111 | } | |
112 | } | |
68ac365f CE |
113 | } |
114 | } | |
115 | ||
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source.
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;

    /// Returns the (root) namespace data is pulled from.
    fn get_ns(&self) -> BackupNamespace;

    /// Returns the name of the source datastore.
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}
152 | ||
#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        // Backwards-compat mode (root namespace, max-depth == 0): only the source
        // namespace itself is synced, no remote query necessary.
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        // Only pass "parent" for non-root namespaces.
        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }
        self.client.login().await?;

        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        // Remote does not offer the namespace endpoint: switch to
                        // backwards-compat mode unless namespace support was
                        // explicitly requested (max-depth set or non-root namespace).
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        // Extract the plain namespaces from the API list items.
        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        // Only pass "ns" for non-root namespaces.
        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;
        let mut result =
            self.client.get(&path, args).await.map_err(|err| {
                format_err!("Failed to retrieve backup groups from remote - {}", err)
            })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        // Only pass "ns" for non-root namespaces.
        if !namespace.is_root() {
            args["ns"] = serde_json::to_value(namespace)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced (no size recorded yet)
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.repo.store()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        // Start a reader session on the remote for the given snapshot.
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}
298 | ||
076f36ec HL |
299 | #[async_trait::async_trait] |
300 | impl PullSource for LocalSource { | |
301 | async fn list_namespaces( | |
302 | &self, | |
303 | max_depth: &mut Option<usize>, | |
304 | _worker: &WorkerTask, | |
305 | ) -> Result<Vec<BackupNamespace>, Error> { | |
306 | ListNamespacesRecursive::new_max_depth( | |
307 | self.store.clone(), | |
308 | self.ns.clone(), | |
309 | max_depth.unwrap_or(MAX_NAMESPACE_DEPTH), | |
310 | )? | |
311 | .collect() | |
312 | } | |
313 | ||
314 | async fn list_groups( | |
315 | &self, | |
316 | namespace: &BackupNamespace, | |
317 | owner: &Authid, | |
318 | ) -> Result<Vec<BackupGroup>, Error> { | |
319 | Ok(ListAccessibleBackupGroups::new_with_privs( | |
320 | &self.store, | |
321 | namespace.clone(), | |
322 | 0, | |
7aeabff2 HL |
323 | Some(PRIV_DATASTORE_READ), |
324 | Some(PRIV_DATASTORE_BACKUP), | |
076f36ec HL |
325 | Some(owner), |
326 | )? | |
327 | .filter_map(Result::ok) | |
328 | .map(|backup_group| backup_group.group().clone()) | |
329 | .collect::<Vec<pbs_api_types::BackupGroup>>()) | |
330 | } | |
331 | ||
332 | async fn list_backup_dirs( | |
333 | &self, | |
334 | namespace: &BackupNamespace, | |
335 | group: &BackupGroup, | |
336 | _worker: &WorkerTask, | |
337 | ) -> Result<Vec<BackupDir>, Error> { | |
338 | Ok(self | |
339 | .store | |
340 | .backup_group(namespace.clone(), group.clone()) | |
341 | .iter_snapshots()? | |
342 | .filter_map(Result::ok) | |
343 | .map(|snapshot| snapshot.dir().to_owned()) | |
344 | .collect::<Vec<BackupDir>>()) | |
345 | } | |
346 | ||
347 | fn get_ns(&self) -> BackupNamespace { | |
348 | self.ns.clone() | |
349 | } | |
350 | ||
4cc4ea64 FG |
351 | fn get_store(&self) -> &str { |
352 | self.store.name() | |
076f36ec HL |
353 | } |
354 | ||
355 | async fn reader( | |
356 | &self, | |
357 | ns: &BackupNamespace, | |
358 | dir: &BackupDir, | |
359 | ) -> Result<Arc<dyn PullReader>, Error> { | |
360 | let dir = self.store.backup_dir(ns.clone(), dir.clone())?; | |
361 | let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared( | |
362 | &dir.full_path(), | |
363 | "snapshot", | |
364 | "locked by another operation", | |
365 | )?; | |
366 | Ok(Arc::new(LocalReader { | |
367 | _dir_lock: Arc::new(Mutex::new(dir_lock)), | |
368 | path: dir.full_path(), | |
369 | datastore: dir.datastore().clone(), | |
370 | })) | |
371 | } | |
372 | } | |
373 | ||
#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

    /// Returns whether syncing chunks into the named target datastore can be
    /// skipped (e.g. source and target are the same local datastore).
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}
400 | ||
#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        // Download into a freshly truncated local file, then parse it as blob
        // from the same (rewound) handle.
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        // Snapshot disappeared on the remote between listing and
                        // download - not an error, just skip it.
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        };
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        // Download to a ".tmp" sibling first, then atomically rename into place.
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        // Chunks always need to be fetched when pulling from a remote.
        false
    }
}
482 | ||
076f36ec HL |
483 | #[async_trait::async_trait] |
484 | impl PullReader for LocalReader { | |
485 | fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> { | |
486 | Arc::new(LocalChunkReader::new( | |
487 | self.datastore.clone(), | |
488 | None, | |
489 | crypt_mode, | |
490 | )) | |
491 | } | |
492 | ||
493 | async fn load_file_into( | |
494 | &self, | |
495 | filename: &str, | |
496 | into: &Path, | |
497 | _worker: &WorkerTask, | |
498 | ) -> Result<Option<DataBlob>, Error> { | |
499 | let mut tmp_file = std::fs::OpenOptions::new() | |
500 | .write(true) | |
501 | .create(true) | |
502 | .truncate(true) | |
503 | .read(true) | |
504 | .open(into)?; | |
505 | let mut from_path = self.path.clone(); | |
506 | from_path.push(filename); | |
507 | tmp_file.write_all(std::fs::read(from_path)?.as_slice())?; | |
508 | tmp_file.rewind()?; | |
509 | Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) | |
510 | } | |
511 | ||
512 | async fn try_download_client_log( | |
513 | &self, | |
514 | _to_path: &Path, | |
515 | _worker: &WorkerTask, | |
516 | ) -> Result<(), Error> { | |
517 | Ok(()) | |
518 | } | |
519 | ||
520 | fn skip_chunk_sync(&self, target_store_name: &str) -> bool { | |
521 | self.datastore.name() == target_store_name | |
522 | } | |
523 | } | |
524 | ||
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Vec<GroupFilter>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}
542 | ||
543 | impl PullParameters { | |
29c56859 | 544 | /// Creates a new instance of `PullParameters`. |
5b6cb51d | 545 | pub(crate) fn new( |
6e9e6c7a | 546 | store: &str, |
c06c1b4b | 547 | ns: BackupNamespace, |
05a52d01 | 548 | remote: Option<&str>, |
6e9e6c7a | 549 | remote_store: &str, |
c06c1b4b | 550 | remote_ns: BackupNamespace, |
6e9e6c7a FG |
551 | owner: Authid, |
552 | remove_vanished: Option<bool>, | |
b9310489 | 553 | max_depth: Option<usize>, |
71e53463 | 554 | group_filter: Option<Vec<GroupFilter>>, |
2d5287fb | 555 | limit: RateLimitConfig, |
9b67352a | 556 | transfer_last: Option<usize>, |
6e9e6c7a | 557 | ) -> Result<Self, Error> { |
66abc4cb FG |
558 | if let Some(max_depth) = max_depth { |
559 | ns.check_max_depth(max_depth)?; | |
560 | remote_ns.check_max_depth(max_depth)?; | |
05a52d01 | 561 | }; |
61ef4ae8 | 562 | let remove_vanished = remove_vanished.unwrap_or(false); |
6e9e6c7a | 563 | |
05a52d01 HL |
564 | let source: Arc<dyn PullSource> = if let Some(remote) = remote { |
565 | let (remote_config, _digest) = pbs_config::remote::config()?; | |
566 | let remote: Remote = remote_config.lookup("remote", remote)?; | |
6e9e6c7a | 567 | |
05a52d01 HL |
568 | let repo = BackupRepository::new( |
569 | Some(remote.config.auth_id.clone()), | |
570 | Some(remote.config.host.clone()), | |
571 | remote.config.port, | |
572 | remote_store.to_string(), | |
573 | ); | |
574 | let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?; | |
575 | Arc::new(RemoteSource { | |
576 | repo, | |
577 | ns: remote_ns, | |
578 | client, | |
579 | }) | |
580 | } else { | |
076f36ec HL |
581 | Arc::new(LocalSource { |
582 | store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?, | |
583 | ns: remote_ns, | |
584 | }) | |
05a52d01 HL |
585 | }; |
586 | let target = PullTarget { | |
587 | store: DataStore::lookup_datastore(store, Some(Operation::Write))?, | |
c06c1b4b | 588 | ns, |
05a52d01 HL |
589 | }; |
590 | ||
2dd9f98f | 591 | let group_filter = group_filter.unwrap_or_default(); |
59c92736 | 592 | |
05a52d01 | 593 | Ok(Self { |
ee0ea735 | 594 | source, |
05a52d01 | 595 | target, |
ee0ea735 TL |
596 | owner, |
597 | remove_vanished, | |
c06c1b4b | 598 | max_depth, |
ee0ea735 | 599 | group_filter, |
9b67352a | 600 | transfer_last, |
ee0ea735 | 601 | }) |
6e9e6c7a | 602 | } |
6e9e6c7a FG |
603 | } |
604 | ||
/// Pulls all chunks referenced by `index` via `chunk_reader` into the `target`
/// datastore, returning download statistics.
///
/// Chunks already recorded in `downloaded_chunks` are skipped, as are chunks
/// already present on `target`. Downloads run concurrently (up to 20 in
/// flight); verification and writing happen on a 4-thread worker pool.
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    // Stream of chunk infos not yet handled in this sync run.
    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before its actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    // Worker pool that verifies each downloaded chunk and inserts it into the
    // target datastore.
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    // Shared counters, updated concurrently by the download futures.
    let bytes = Arc::new(AtomicUsize::new(0));
    let chunk_count = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let chunk_count = Arc::clone(&chunk_count);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                // Skip download if the chunk already exists on the target
                // (cond_touch_chunk also refreshes its timestamp).
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in a separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);
                chunk_count.fetch_add(1, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    // Close the channel so the worker pool can drain and finish.
    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?;

    let bytes = bytes.load(Ordering::SeqCst);
    let chunk_count = chunk_count.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} ({}/s)",
        HumanByte::from(bytes),
        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
    );

    Ok(PullStats {
        chunk_count,
        bytes,
        elapsed,
        removed: None,
    })
}
706 | ||
e2956c60 | 707 | fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { |
2ce15934 | 708 | if size != info.size { |
e2956c60 FG |
709 | bail!( |
710 | "wrong size for file '{}' ({} != {})", | |
711 | info.filename, | |
712 | info.size, | |
713 | size | |
714 | ); | |
2ce15934 FG |
715 | } |
716 | ||
717 | if csum != &info.csum { | |
718 | bail!("wrong checksum for file '{}'", info.filename); | |
719 | } | |
720 | ||
721 | Ok(()) | |
722 | } | |
723 | ||
/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load archive file into tmp file
/// -- Load file into tmp file
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    // Work on a ".tmp" sibling first; only rename into place once complete.
    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    let mut pull_stats = PullStats::default();

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    // NOTE(review): the Dynamic/Fixed branches below are identical except for
    // the index reader type - a generic helper could deduplicate them.
    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::Blob => {
            // Blobs carry no chunk references - just checksum the file itself.
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(pull_stats)
}
810 | ||
29c56859 FG |
811 | /// Actual implementation of pulling a snapshot. |
812 | /// | |
813 | /// Pulling a snapshot consists of the following steps: | |
814 | /// - (Re)download the manifest | |
815 | /// -- if it matches, only download log and treat snapshot as already synced | |
816 | /// - Iterate over referenced files | |
817 | /// -- if file already exists, verify contents | |
818 | /// -- if not, pull it from the remote | |
819 | /// - Download log if not already existing | |
05a52d01 HL |
820 | async fn pull_snapshot<'a>( |
821 | worker: &'a WorkerTask, | |
822 | reader: Arc<dyn PullReader + 'a>, | |
823 | snapshot: &'a pbs_datastore::BackupDir, | |
e2956c60 | 824 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
68ac365f CE |
825 | ) -> Result<PullStats, Error> { |
826 | let mut pull_stats = PullStats::default(); | |
c06c1b4b | 827 | let mut manifest_name = snapshot.full_path(); |
07ad6470 DM |
828 | manifest_name.push(MANIFEST_BLOB_NAME); |
829 | ||
c06c1b4b | 830 | let mut client_log_name = snapshot.full_path(); |
1610c45a DM |
831 | client_log_name.push(CLIENT_LOG_BLOB_NAME); |
832 | ||
07ad6470 DM |
833 | let mut tmp_manifest_name = manifest_name.clone(); |
834 | tmp_manifest_name.set_extension("tmp"); | |
05a52d01 HL |
835 | let tmp_manifest_blob; |
836 | if let Some(data) = reader | |
837 | .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker) | |
838 | .await? | |
839 | { | |
840 | tmp_manifest_blob = data; | |
841 | } else { | |
68ac365f | 842 | return Ok(pull_stats); |
05a52d01 | 843 | } |
07ad6470 DM |
844 | |
845 | if manifest_name.exists() { | |
6ef1b649 | 846 | let manifest_blob = proxmox_lang::try_block!({ |
e2956c60 | 847 | let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| { |
87cdc327 | 848 | format_err!("unable to open local manifest {manifest_name:?} - {err}") |
e2956c60 | 849 | })?; |
07ad6470 | 850 | |
39f18b30 | 851 | let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?; |
07ad6470 | 852 | Ok(manifest_blob) |
e2956c60 FG |
853 | }) |
854 | .map_err(|err: Error| { | |
87cdc327 | 855 | format_err!("unable to read local manifest {manifest_name:?} - {err}") |
07ad6470 DM |
856 | })?; |
857 | ||
858 | if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { | |
1610c45a | 859 | if !client_log_name.exists() { |
05a52d01 HL |
860 | reader |
861 | .try_download_client_log(&client_log_name, worker) | |
862 | .await?; | |
863 | }; | |
1ec0d70d | 864 | task_log!(worker, "no data changes"); |
e0085e66 | 865 | let _ = std::fs::remove_file(&tmp_manifest_name); |
68ac365f | 866 | return Ok(pull_stats); // nothing changed |
07ad6470 DM |
867 | } |
868 | } | |
869 | ||
870 | let manifest = BackupManifest::try_from(tmp_manifest_blob)?; | |
871 | ||
07ad6470 | 872 | for item in manifest.files() { |
c06c1b4b | 873 | let mut path = snapshot.full_path(); |
07ad6470 DM |
874 | path.push(&item.filename); |
875 | ||
876 | if path.exists() { | |
877 | match archive_type(&item.filename)? { | |
878 | ArchiveType::DynamicIndex => { | |
879 | let index = DynamicIndexReader::open(&path)?; | |
880 | let (csum, size) = index.compute_csum(); | |
881 | match manifest.verify_file(&item.filename, &csum, size) { | |
882 | Ok(_) => continue, | |
883 | Err(err) => { | |
1ec0d70d | 884 | task_log!(worker, "detected changed file {:?} - {}", path, err); |
07ad6470 DM |
885 | } |
886 | } | |
887 | } | |
888 | ArchiveType::FixedIndex => { | |
889 | let index = FixedIndexReader::open(&path)?; | |
890 | let (csum, size) = index.compute_csum(); | |
891 | match manifest.verify_file(&item.filename, &csum, size) { | |
892 | Ok(_) => continue, | |
893 | Err(err) => { | |
1ec0d70d | 894 | task_log!(worker, "detected changed file {:?} - {}", path, err); |
07ad6470 DM |
895 | } |
896 | } | |
897 | } | |
898 | ArchiveType::Blob => { | |
899 | let mut tmpfile = std::fs::File::open(&path)?; | |
ba0ccc59 | 900 | let (csum, size) = sha256(&mut tmpfile)?; |
07ad6470 DM |
901 | match manifest.verify_file(&item.filename, &csum, size) { |
902 | Ok(_) => continue, | |
903 | Err(err) => { | |
1ec0d70d | 904 | task_log!(worker, "detected changed file {:?} - {}", path, err); |
07ad6470 DM |
905 | } |
906 | } | |
907 | } | |
908 | } | |
909 | } | |
910 | ||
68ac365f | 911 | let stats = pull_single_archive( |
07ad6470 | 912 | worker, |
05a52d01 | 913 | reader.clone(), |
07ad6470 | 914 | snapshot, |
9a37bd6c | 915 | item, |
ebbe4958 | 916 | downloaded_chunks.clone(), |
e2956c60 FG |
917 | ) |
918 | .await?; | |
68ac365f | 919 | pull_stats.add(stats); |
07ad6470 DM |
920 | } |
921 | ||
922 | if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { | |
923 | bail!("Atomic rename file {:?} failed - {}", manifest_name, err); | |
924 | } | |
925 | ||
1610c45a | 926 | if !client_log_name.exists() { |
05a52d01 HL |
927 | reader |
928 | .try_download_client_log(&client_log_name, worker) | |
929 | .await?; | |
930 | }; | |
fe79687c TL |
931 | snapshot |
932 | .cleanup_unreferenced_files(&manifest) | |
933 | .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?; | |
07ad6470 | 934 | |
68ac365f | 935 | Ok(pull_stats) |
07ad6470 DM |
936 | } |
937 | ||
c06c1b4b FG |
938 | /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case. |
939 | /// | |
05a52d01 HL |
940 | /// The `reader` is configured to read from the source backup directory, while the |
941 | /// `snapshot` is pointing to the local datastore and target namespace. | |
942 | async fn pull_snapshot_from<'a>( | |
943 | worker: &'a WorkerTask, | |
944 | reader: Arc<dyn PullReader + 'a>, | |
945 | snapshot: &'a pbs_datastore::BackupDir, | |
e2956c60 | 946 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
68ac365f | 947 | ) -> Result<PullStats, Error> { |
c06c1b4b FG |
948 | let (_path, is_new, _snap_lock) = snapshot |
949 | .datastore() | |
1afce610 | 950 | .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?; |
07ad6470 | 951 | |
68ac365f | 952 | let pull_stats = if is_new { |
1afce610 | 953 | task_log!(worker, "sync snapshot {}", snapshot.dir()); |
07ad6470 | 954 | |
68ac365f CE |
955 | match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await { |
956 | Err(err) => { | |
957 | if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir( | |
958 | snapshot.backup_ns(), | |
959 | snapshot.as_ref(), | |
960 | true, | |
961 | ) { | |
962 | task_log!(worker, "cleanup error - {}", cleanup_err); | |
963 | } | |
964 | return Err(err); | |
965 | } | |
966 | Ok(pull_stats) => { | |
967 | task_log!(worker, "sync snapshot {} done", snapshot.dir()); | |
968 | pull_stats | |
07ad6470 | 969 | } |
07ad6470 DM |
970 | } |
971 | } else { | |
1afce610 | 972 | task_log!(worker, "re-sync snapshot {}", snapshot.dir()); |
68ac365f CE |
973 | pull_snapshot(worker, reader, snapshot, downloaded_chunks).await? |
974 | }; | |
07ad6470 | 975 | |
68ac365f | 976 | Ok(pull_stats) |
07ad6470 DM |
977 | } |
978 | ||
/// Why a remote snapshot was not pulled.
#[derive(PartialEq, Eq)]
enum SkipReason {
    /// The snapshot is older than the newest one already present locally.
    AlreadySynced,
    /// The snapshot was cut off by the configured `transfer-last` limit.
    TransferLast,
}
984 | ||
40a57cfa FG |
985 | impl std::fmt::Display for SkipReason { |
986 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
987 | write!( | |
988 | f, | |
989 | "{}", | |
990 | match self { | |
991 | SkipReason::AlreadySynced => "older than the newest local snapshot", | |
992 | SkipReason::TransferLast => "due to transfer-last", | |
993 | } | |
994 | ) | |
995 | } | |
996 | } | |
997 | ||
d2354a16 DC |
/// Accumulates skipped snapshots so a single summary line can be logged
/// instead of one line per skipped snapshot.
struct SkipInfo {
    // earliest skipped backup time (epoch seconds); i64::MAX while empty
    oldest: i64,
    // latest skipped backup time (epoch seconds); i64::MIN while empty
    newest: i64,
    // number of snapshots skipped so far
    count: u64,
    // why these snapshots were skipped
    skip_reason: SkipReason,
}
1004 | ||
1005 | impl SkipInfo { | |
71db1615 SH |
1006 | fn new(skip_reason: SkipReason) -> Self { |
1007 | SkipInfo { | |
1008 | oldest: i64::MAX, | |
1009 | newest: i64::MIN, | |
1010 | count: 0, | |
1011 | skip_reason, | |
1012 | } | |
1013 | } | |
1014 | ||
1015 | fn reset(&mut self) { | |
1016 | self.count = 0; | |
1017 | self.oldest = i64::MAX; | |
1018 | self.newest = i64::MIN; | |
1019 | } | |
1020 | ||
d2354a16 DC |
1021 | fn update(&mut self, backup_time: i64) { |
1022 | self.count += 1; | |
1023 | ||
1024 | if backup_time < self.oldest { | |
1025 | self.oldest = backup_time; | |
1026 | } | |
1027 | ||
1028 | if backup_time > self.newest { | |
1029 | self.newest = backup_time; | |
1030 | } | |
1031 | } | |
1032 | ||
1033 | fn affected(&self) -> Result<String, Error> { | |
1034 | match self.count { | |
1035 | 0 => Ok(String::new()), | |
6ef1b649 | 1036 | 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?), |
ee0ea735 TL |
1037 | _ => Ok(format!( |
1038 | "{} .. {}", | |
1039 | proxmox_time::epoch_to_rfc3339_utc(self.oldest)?, | |
1040 | proxmox_time::epoch_to_rfc3339_utc(self.newest)?, | |
1041 | )), | |
d2354a16 DC |
1042 | } |
1043 | } | |
1044 | } | |
1045 | ||
impl std::fmt::Display for SkipInfo {
    /// One-line summary: count, affected time range, and skip reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            // time formatting may fail; map that into the opaque fmt::Error
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason,
        )
    }
}
1057 | ||
29c56859 FG |
/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<PullStats, Error> {
    // aggregate skips so we log one summary line per category, not per snapshot
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    // oldest first, so progress and skip accounting follow backup time order
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

    // transfer_last = N means: only pull the newest N snapshots; everything
    // before index `cutoff` is skipped (unless it is the last synced one)
    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();

    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            // record EVERY remote snapshot time (also skipped ones), so the
            // remove_vanished pass below does not delete them locally
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                // first snapshot after a run of already-synced ones: flush summary
                // NOTE(review): this `return true` bypasses the transfer-last
                // cutoff check below for this one snapshot - confirm intended
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
                return true;
            }

            // `last_sync_time != dir.time`: always re-pull the last synced
            // snapshot, even if it falls before the cutoff
            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut pull_stats = PullStats::default();

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        let stats = result?; // stop on error
        pull_stats.add(stats);
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            // still exists on the remote - keep it
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
            pull_stats.add(PullStats::from(RemovedVanishedStats {
                snapshots: 1,
                groups: 0,
                namespaces: 0,
            }));
        }
    }

    Ok(pull_stats)
}
1194 | ||
abd82485 | 1195 | fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> { |
c06c1b4b | 1196 | let mut created = false; |
05a52d01 | 1197 | let store_ns_str = print_store_and_ns(params.target.store.name(), ns); |
c06c1b4b | 1198 | |
05a52d01 HL |
1199 | if !ns.is_root() && !params.target.store.namespace_path(ns).exists() { |
1200 | check_ns_modification_privs(params.target.store.name(), ns, ¶ms.owner) | |
77bd14f6 FG |
1201 | .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?; |
1202 | ||
ea2e91e5 FG |
1203 | let name = match ns.components().last() { |
1204 | Some(name) => name.to_owned(), | |
1205 | None => { | |
1206 | bail!("Failed to determine last component of namespace."); | |
c06c1b4b | 1207 | } |
ea2e91e5 FG |
1208 | }; |
1209 | ||
05a52d01 | 1210 | if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) { |
abd82485 | 1211 | bail!("sync into {store_ns_str} failed - namespace creation failed: {err}"); |
c06c1b4b | 1212 | } |
ea2e91e5 | 1213 | created = true; |
c06c1b4b FG |
1214 | } |
1215 | ||
abd82485 | 1216 | check_ns_privs( |
05a52d01 | 1217 | params.target.store.name(), |
abd82485 FG |
1218 | ns, |
1219 | ¶ms.owner, | |
1220 | PRIV_DATASTORE_BACKUP, | |
1221 | ) | |
1222 | .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?; | |
c06c1b4b FG |
1223 | |
1224 | Ok(created) | |
1225 | } | |
1226 | ||
/// Removes `local_ns` (recursively) after verifying the sync owner has
/// namespace-modification privileges.
///
/// Returns the result of `remove_namespace_recursive`; callers treat `Ok(false)`
/// as "not fully removed because protected snapshots remain" (see
/// `check_and_remove_vanished_ns`).
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}
1236 | ||
/// Removes local sub-namespaces below the sync target that were not part of
/// this sync run (`synced_ns`).
///
/// Returns `(errors, removed_stats)` - `errors` is true if any removal failed;
/// individual failures are logged but do not abort the pass.
fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<(bool, RemovedVanishedStats), Error> {
    let mut errors = false;
    let mut removed_stats = RemovedVanishedStats::default();
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    // only consider namespaces the sync owner can actually see
    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        // never remove the sync anchor itself
        if local_ns == params.target.ns {
            continue;
        }

        // namespace was (re-)synced in this run - keep it
        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }
        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => {
                task_log!(worker, "Removed namespace {local_ns}");
                removed_stats.namespaces += 1;
            }
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok((errors, removed_stats))
}
1296 | ||
29c56859 FG |
/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<PullStats, Error> {
    // explicit create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        // may adjust params.max_depth to what the remote actually supports
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

    // make sure mapping the deepest remote namespace under the target anchor
    // cannot exceed the global depth limit
    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    // parents before children, so parent namespaces get created first
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());
    let mut pull_stats = PullStats::default();

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        // remember even namespaces that later fail, so remove_vanished keeps them
        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        // errors inside a namespace are accumulated, not fatal for the store sync
        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
                errors |= ns_errors;

                pull_stats.add(ns_pull_stats);

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        let (has_errors, stats) = check_and_remove_vanished_ns(worker, &params, synced_ns)?;
        errors |= has_errors;
        pull_stats.add(PullStats::from(stats));
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(pull_stats)
}
1429 | ||
/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
/// not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, PullStats, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    // deterministic order: by group type first, then by id
    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let unfiltered_count = list.len();
    let list: Vec<BackupGroup> = list
        .into_iter()
        .filter(|group| group.apply_filters(&params.group_filter))
        .collect();
    task_log!(
        worker,
        "found {} groups to sync (out of {} total)",
        list.len(),
        unfiltered_count
    );

    let mut errors = false;

    // groups present on the remote after filtering; used by remove_vanished below
    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);
    let mut pull_stats = PullStats::default();

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        // lock the local group (creating it if needed); also yields its owner
        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else {
            match pull_group(worker, params, namespace, &group, &mut progress).await {
                Ok(stats) => pull_stats.add(stats),
                Err(err) => {
                    task_log!(worker, "sync group {} failed - {}", &group, err,);
                    errors = true; // do not stop here, instead continue
                }
            }
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                // still exists on remote - keep
                if new_groups.contains(local_group) {
                    continue;
                }
                // only remove groups owned by the sync owner
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                // groups excluded by filters were never synced - leave them alone
                if !local_group.apply_filters(&params.group_filter) {
                    continue;
                }
                task_log!(worker, "delete vanished group '{local_group}'",);
                let delete_stats_result = params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group);

                match delete_stats_result {
                    Ok(stats) => {
                        if !stats.all_removed() {
                            // protected snapshots block full removal: count only
                            // the removed snapshots, not the group
                            task_log!(
                                worker,
                                "kept some protected snapshots of group '{local_group}'",
                            );
                            pull_stats.add(PullStats::from(RemovedVanishedStats {
                                snapshots: stats.removed_snapshots(),
                                groups: 0,
                                namespaces: 0,
                            }));
                        } else {
                            pull_stats.add(PullStats::from(RemovedVanishedStats {
                                snapshots: stats.removed_snapshots(),
                                groups: 1,
                                namespaces: 0,
                            }));
                        }
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, pull_stats, errors))
}