//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::{task_log, task_warn};
use serde_json::json;

use pbs_api_types::{
    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{
    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
};
use pbs_tools::sha::sha256;

use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;

struct RemoteReader {
    backup_reader: Arc<BackupReader>,
    dir: BackupDir,
}

struct LocalReader {
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    path: PathBuf,
    datastore: Arc<DataStore>,
}

pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    repo: BackupRepository,
    ns: BackupNamespace,
    client: HttpClient,
}

pub(crate) struct LocalSource {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;
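
    /// Returns the namespace on the source side that this sync reads from.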
    fn get_ns(&self) -> BackupNamespace;
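
    /// Returns the name of the source datastore (for remote sources, the store name on the remote).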
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}

#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }
        self.client.login().await?;

        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;
        let mut result =
            self.client.get(&path, args).await.map_err(|err| {
                format_err!("Failed to retrieve backup groups from remote - {}", err)
            })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        if !namespace.is_root() {
            args["ns"] = serde_json::to_value(&namespace)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.repo.store()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}

#[async_trait::async_trait]
impl PullSource for LocalSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        ListNamespacesRecursive::new_max_depth(
            self.store.clone(),
            self.ns.clone(),
            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
        )?
        .collect()
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        Ok(ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0,
            Some(PRIV_DATASTORE_READ),
            Some(PRIV_DATASTORE_BACKUP),
            Some(owner),
        )?
        .filter_map(Result::ok)
        .map(|backup_group| backup_group.group().clone())
        .collect::<Vec<pbs_api_types::BackupGroup>>())
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        Ok(self
            .store
            .backup_group(namespace.clone(), group.clone())
            .iter_snapshots()?
            .filter_map(Result::ok)
            .map(|snapshot| snapshot.dir().to_owned())
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.store.name()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
            &dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(dir_lock)),
            path: dir.full_path(),
            datastore: dir.datastore().clone(),
        }))
    }
}

#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading the
/// client log, and checking whether chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;
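
    /// Returns whether syncing the chunks referenced by an index can be skipped
    /// (e.g. because source and target share the same chunk store).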
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}

#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        };
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}

#[async_trait::async_trait]
impl PullReader for LocalReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(LocalChunkReader::new(
            self.datastore.clone(),
            None,
            crypt_mode,
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        _worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let mut from_path = self.path.clone();
        from_path.push(filename);
        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        _to_path: &Path,
        _worker: &WorkerTask,
    ) -> Result<(), Error> {
        Ok(())
    }
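
    // Chunk sync can be skipped when source and target use the same datastore:
    // the (content-addressed) chunk store already contains every referenced chunk.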
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }
}

/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Option<Vec<GroupFilter>>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}

impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    pub(crate) fn new(
        store: &str,
        ns: BackupNamespace,
        remote: Option<&str>,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
        transfer_last: Option<usize>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        };
        let remove_vanished = remove_vanished.unwrap_or(false);

        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
            let (remote_config, _digest) = pbs_config::remote::config()?;
            let remote: Remote = remote_config.lookup("remote", remote)?;

            let repo = BackupRepository::new(
                Some(remote.config.auth_id.clone()),
                Some(remote.config.host.clone()),
                remote.config.port,
                remote_store.to_string(),
            );
            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
            Arc::new(RemoteSource {
                repo,
                ns: remote_ns,
                client,
            })
        } else {
            Arc::new(LocalSource {
                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
                ns: remote_ns,
            })
        };
        let target = PullTarget {
            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
            ns,
        };

        Ok(Self {
            source,
            target,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            transfer_last,
        })
    }
}
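
/// Pulls the chunks referenced by `index` from `chunk_reader` into the `target` datastore.
///
/// Chunks already downloaded during this sync run (tracked in `downloaded_chunks`) or already
/// present in the target chunk store are skipped.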
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}
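
/// Checks that a downloaded archive's size and checksum match the manifest entry.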
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}

/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load archive file into tmp file
/// -- Load file into tmp file
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::Blob => {
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}

/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        return Ok(());
    }

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };
    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(())
}

/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                snapshot.backup_ns(),
                snapshot.as_ref(),
                true,
            ) {
                task_log!(worker, "cleanup error - {}", cleanup_err);
            }
            return Err(err);
        }
        task_log!(worker, "sync snapshot {} done", snapshot.dir());
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
    }

    Ok(())
}

#[derive(PartialEq, Eq)]
enum SkipReason {
    AlreadySynced,
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                SkipReason::AlreadySynced => "older than the newest local snapshot",
                SkipReason::TransferLast => "due to transfer-last",
            }
        )
    }
}
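
/// Bookkeeping for a run of skipped snapshots, emitted as a single summary log line.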
struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
    skip_reason: SkipReason,
}

impl SkipInfo {
    fn new(skip_reason: SkipReason) -> Self {
        SkipInfo {
            oldest: i64::MAX,
            newest: i64::MIN,
            count: 0,
            skip_reason,
        }
    }

    fn reset(&mut self) {
        self.count = 0;
        self.oldest = i64::MAX;
        self.newest = i64::MIN;
    }

    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason,
        )
    }
}
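
// Example of the rendered summary line (timestamps illustrative):
// "skipped: 2 snapshot(s) (2023-08-01T08:00:00Z .. 2023-08-02T08:00:00Z) - due to transfer-last"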

/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it is in-progress or older than the last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is the root namespace, only the group type and ID will
/// be sent to the remote when querying snapshots. This allows us to interact with old remotes
/// that don't have namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();

    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
                return true;
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    Ok(())
}
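
/// Ensures the target namespace exists and that the sync owner may back up into it.
///
/// Creates the namespace (which requires namespace modification privileges) if it does not exist
/// yet; returns whether it was newly created.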
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
    let mut created = false;
    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);

    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
            .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;

        let name = match ns.components().last() {
            Some(name) => name.to_owned(),
            None => {
                bail!("Failed to determine last component of namespace.");
            }
        };

        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
            bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
        }
        created = true;
    }

    check_ns_privs(
        params.target.store.name(),
        ns,
        &params.owner,
        PRIV_DATASTORE_BACKUP,
    )
    .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;

    Ok(created)
}
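
/// Checks whether the sync owner may delete `local_ns`, then removes it recursively; returns
/// whether the namespace (including everything below it) could be removed completely.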
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}
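
/// Removes local namespaces that were not seen during this sync run (`synced_ns`), children
/// first; returns whether any removal failed.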
fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<bool, Error> {
    let mut errors = false;
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.target.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }
        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok(errors)
}

/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<(), Error> {
    // explicit create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_errors)) => {
                errors |= ns_errors;

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}

/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
///   not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    let total_count = list.len();
    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
        filters.iter().any(|filter| group.matches(filter))
    };

    let list = if let Some(ref group_filter) = &params.group_filter {
        let unfiltered_count = list.len();
        let list: Vec<BackupGroup> = list
            .into_iter()
            .filter(|group| apply_filters(group, group_filter))
            .collect();
        task_log!(
            worker,
            "found {} groups to sync (out of {} total)",
            list.len(),
            unfiltered_count
        );
        list
    } else {
        task_log!(worker, "found {} groups to sync", total_count);
        list
    };

    let mut errors = false;

    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
        {
            task_log!(worker, "sync group {} failed - {}", &group, err,);
            errors = true; // do not stop here, instead continue
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if let Some(ref group_filter) = &params.group_filter {
                    if !apply_filters(local_group, group_filter) {
                        continue;
                    }
                }
                task_log!(worker, "delete vanished group '{local_group}'",);
                match params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group)
                {
                    Ok(true) => {}
                    Ok(false) => {
                        task_log!(
                            worker,
                            "kept some protected snapshots of group '{}'",
                            local_group
                        );
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, errors))
}