//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::{task_log, task_warn};
use serde_json::json;

use pbs_api_types::{
    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{
    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
};
use pbs_tools::sha::sha256;

use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;

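/// Reads backup data for one snapshot from a remote PBS instance via a `BackupReader` session.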
struct RemoteReader {
    backup_reader: Arc<BackupReader>,
    dir: BackupDir,
}

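/// Reads backup data for one snapshot from a snapshot directory on a local datastore.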
struct LocalReader {
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    path: PathBuf,
    datastore: Arc<DataStore>,
}

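/// Target datastore and namespace a pull job syncs into.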
pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

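/// Pull source backed by a remote PBS instance, accessed through its HTTP API.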
pub(crate) struct RemoteSource {
    repo: BackupRepository,
    ns: BackupNamespace,
    client: HttpClient,
}

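/// Pull source backed by a datastore on the local PBS instance.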
pub(crate) struct LocalSource {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data and metadata from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source.
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;
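    /// Returns the source namespace configured for this sync.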
    fn get_ns(&self) -> BackupNamespace;
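    /// Returns the name of the source datastore (or remote repository store).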
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}

#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }
        self.client.login().await?;

        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;
        let mut result =
            self.client.get(&path, args).await.map_err(|err| {
                format_err!("Failed to retrieve backup groups from remote - {}", err)
            })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        if !namespace.is_root() {
            args["ns"] = serde_json::to_value(&namespace)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        &self.repo.store()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}

#[async_trait::async_trait]
impl PullSource for LocalSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        ListNamespacesRecursive::new_max_depth(
            self.store.clone(),
            self.ns.clone(),
            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
        )?
        .collect()
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        Ok(ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0,
            Some(PRIV_DATASTORE_READ),
            Some(PRIV_DATASTORE_BACKUP),
            Some(owner),
        )?
        .filter_map(Result::ok)
        .map(|backup_group| backup_group.group().clone())
        .collect::<Vec<pbs_api_types::BackupGroup>>())
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        Ok(self
            .store
            .backup_group(namespace.clone(), group.clone())
            .iter_snapshots()?
            .filter_map(Result::ok)
            .map(|snapshot| snapshot.dir().to_owned())
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.store.name()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
            &dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(dir_lock)),
            path: dir.full_path(),
            datastore: dir.datastore().clone(),
        }))
    }
}

#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading the client
/// log, and checking whether chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

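    /// Returns true if chunk sync can be skipped, e.g. because source and target use the same
    /// local datastore.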
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}

#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        };
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}

#[async_trait::async_trait]
impl PullReader for LocalReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(LocalChunkReader::new(
            self.datastore.clone(),
            None,
            crypt_mode,
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        _worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let mut from_path = self.path.clone();
        from_path.push(filename);
        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        _to_path: &Path,
        _worker: &WorkerTask,
    ) -> Result<(), Error> {
        Ok(())
    }

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }
}

/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Vec<GroupFilter>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}

impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    pub(crate) fn new(
        store: &str,
        ns: BackupNamespace,
        remote: Option<&str>,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
        transfer_last: Option<usize>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        };
        let remove_vanished = remove_vanished.unwrap_or(false);

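        // without a `remote`, the source is simply another datastore on this host (local sync)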
        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
            let (remote_config, _digest) = pbs_config::remote::config()?;
            let remote: Remote = remote_config.lookup("remote", remote)?;

            let repo = BackupRepository::new(
                Some(remote.config.auth_id.clone()),
                Some(remote.config.host.clone()),
                remote.config.port,
                remote_store.to_string(),
            );
            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
            Arc::new(RemoteSource {
                repo,
                ns: remote_ns,
                client,
            })
        } else {
            Arc::new(LocalSource {
                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
                ns: remote_ns,
            })
        };
        let target = PullTarget {
            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
            ns,
        };

        let group_filter = match group_filter {
            Some(f) => f,
            None => Vec::<GroupFilter>::new(),
        };

        Ok(Self {
            source,
            target,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            transfer_last,
        })
    }
}

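/// Pulls the chunks referenced by `index` into the target datastore.
///
/// Chunks whose digests are already in `downloaded_chunks` (or that already exist in the target
/// chunk store) are skipped.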
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));

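    // up to 20 chunks are fetched concurrently; decoding, verification and writing happen on the
    // verify_pool worker threads so network and CPU/disk work can overlap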
    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}

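/// Checks the size and checksum of a downloaded archive against its manifest entry.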
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}

/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load the archive file into a tmp file
/// - Verify the tmp file's checksum
/// - If the archive is an index, pull the referenced chunks
/// - Rename the tmp file into its final path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::Blob => {
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}

/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        return Ok(());
    }

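    // compare with the existing local manifest (if any): identical manifests mean the snapshot
    // is already in sync and at most a missing client log has to be fetched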
    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };
    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(())
}

/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                snapshot.backup_ns(),
                snapshot.as_ref(),
                true,
            ) {
                task_log!(worker, "cleanup error - {}", cleanup_err);
            }
            return Err(err);
        }
        task_log!(worker, "sync snapshot {} done", snapshot.dir());
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
    }

    Ok(())
}

#[derive(PartialEq, Eq)]
enum SkipReason {
    AlreadySynced,
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                SkipReason::AlreadySynced => "older than the newest local snapshot",
                SkipReason::TransferLast => "due to transfer-last",
            }
        )
    }
}

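/// Tracks the time range and number of skipped snapshots so they can be reported as a single
/// summary log line.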
struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
    skip_reason: SkipReason,
}

impl SkipInfo {
    fn new(skip_reason: SkipReason) -> Self {
        SkipInfo {
            oldest: i64::MAX,
            newest: i64::MIN,
            count: 0,
            skip_reason,
        }
    }

    fn reset(&mut self) {
        self.count = 0;
        self.oldest = i64::MAX;
        self.newest = i64::MIN;
    }

    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason,
        )
    }
}

/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is the root namespace, only the group type and ID are
/// sent to the remote when querying snapshots. This allows us to interact with old remotes that
/// don't have namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

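    // transfer-last: if set, only the newest `transfer_last` snapshots are synced; everything
    // before the cutoff is skipped below (unless it matches the last locally synced snapshot)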
    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();

    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
                return true;
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    Ok(())
}

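/// Makes sure the target namespace exists (creating it if necessary and allowed) and that the
/// configured owner may back up into it. Returns whether the namespace was newly created.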
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
    let mut created = false;
    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);

    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
            .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;

        let name = match ns.components().last() {
            Some(name) => name.to_owned(),
            None => {
                bail!("Failed to determine last component of namespace.");
            }
        };

        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
            bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
        }
        created = true;
    }

    check_ns_privs(
        params.target.store.name(),
        ns,
        &params.owner,
        PRIV_DATASTORE_BACKUP,
    )
    .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;

    Ok(created)
}

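/// Recursively removes a local namespace after checking the owner's modification privileges.
/// Returns whether the namespace (including all contained groups) was removed completely.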
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}

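/// Removes local namespaces below the sync target that were not part of this sync run (i.e. that
/// no longer exist on the source). Returns whether any errors occurred during removal.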
fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<bool, Error> {
    let mut errors = false;
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.target.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }
        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok(errors)
}

/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<(), Error> {
    // explicitly create a shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

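    // the combined depth of the pulled sub-namespaces and the target prefix must still fit into
    // the maximum supported namespace depth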
    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_errors)) => {
                errors |= ns_errors;

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}

/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
/// not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

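    // sort by type, then id, so that the sync order (and the task log) is deterministic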
    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let unfiltered_count = list.len();
    let list: Vec<BackupGroup> = list
        .into_iter()
        .filter(|group| group.apply_filters(&params.group_filter))
        .collect();
    task_log!(
        worker,
        "found {} groups to sync (out of {} total)",
        list.len(),
        unfiltered_count
    );

    let mut errors = false;

    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
        {
            task_log!(worker, "sync group {} failed - {}", &group, err,);
            errors = true; // do not stop here, instead continue
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if !local_group.apply_filters(&params.group_filter) {
                    continue;
                }
                task_log!(worker, "delete vanished group '{local_group}'",);
                match params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group)
                {
                    Ok(true) => {}
                    Ok(false) => {
                        task_log!(
                            worker,
                            "kept some protected snapshots of group '{}'",
                            local_group
                        );
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, errors))
}