1 //! Sync datastore from remote server
2
3 use std::collections::{HashMap, HashSet};
4 use std::io::Seek;
5 use std::path::Path;
6 use std::sync::atomic::{AtomicUsize, Ordering};
7 use std::sync::{Arc, Mutex};
8 use std::time::SystemTime;
9
10 use anyhow::{bail, format_err, Error};
11 use http::StatusCode;
12 use proxmox_rest_server::WorkerTask;
13 use proxmox_router::HttpError;
14 use proxmox_sys::{task_log, task_warn};
15 use serde_json::json;
16
17 use pbs_api_types::{
18 print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
19 GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
20 PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
21 };
22 use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
23 use pbs_config::CachedUserInfo;
24 use pbs_datastore::data_blob::DataBlob;
25 use pbs_datastore::dynamic_index::DynamicIndexReader;
26 use pbs_datastore::fixed_index::FixedIndexReader;
27 use pbs_datastore::index::IndexFile;
28 use pbs_datastore::manifest::{
29 archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
30 };
31 use pbs_datastore::read_chunk::AsyncReadChunk;
32 use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
33 use pbs_tools::sha::sha256;
34
35 use crate::backup::{check_ns_modification_privs, check_ns_privs};
36 use crate::tools::parallel_handler::ParallelHandler;
37
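/// Reads data for one snapshot from a remote Proxmox Backup Server via a [`BackupReader`] session.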
38 struct RemoteReader {
39 backup_reader: Arc<BackupReader>,
40 dir: BackupDir,
41 }
42
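/// Local datastore and namespace that data is pulled into.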
43 pub(crate) struct PullTarget {
44 store: Arc<DataStore>,
45 ns: BackupNamespace,
46 }
47
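/// Remote repository, source namespace and HTTP client used as the source of a pull.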
48 pub(crate) struct RemoteSource {
49 repo: BackupRepository,
50 ns: BackupNamespace,
51 client: HttpClient,
52 }
53
54 #[async_trait::async_trait]
55 /// `PullSource` is a trait that provides an interface for pulling data and related metadata from a source.
56 /// The trait includes methods for listing namespaces, groups, and backup directories,
57 /// as well as retrieving a reader for reading data from the source.
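///
/// A rough usage sketch (illustrative only; `source`, `worker`, `owner` and error handling
/// are assumed to be provided by the caller):
///
/// ```ignore
/// let mut max_depth = Some(2);
/// for ns in source.list_namespaces(&mut max_depth, worker).await? {
///     for group in source.list_groups(&ns, &owner).await? {
///         for dir in source.list_backup_dirs(&ns, &group, worker).await? {
///             let reader = source.reader(&ns, &dir).await?;
///             // hand `reader` to the snapshot pull logic
///         }
///     }
/// }
/// ```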
58 trait PullSource: Send + Sync {
59 /// Lists namespaces from the source.
60 async fn list_namespaces(
61 &self,
62 max_depth: &mut Option<usize>,
63 worker: &WorkerTask,
64 ) -> Result<Vec<BackupNamespace>, Error>;
65
66 /// Lists groups within a specific namespace from the source.
67 async fn list_groups(
68 &self,
69 namespace: &BackupNamespace,
70 owner: &Authid,
71 ) -> Result<Vec<BackupGroup>, Error>;
72
73 /// Lists backup directories for a specific group within a specific namespace from the source.
74 async fn list_backup_dirs(
75 &self,
76 namespace: &BackupNamespace,
77 group: &BackupGroup,
78 worker: &WorkerTask,
79 ) -> Result<Vec<BackupDir>, Error>;
80 fn get_ns(&self) -> BackupNamespace;
81 fn print_store_and_ns(&self) -> String;
82
83 /// Returns a reader for reading data from a specific backup directory.
84 async fn reader(
85 &self,
86 ns: &BackupNamespace,
87 dir: &BackupDir,
88 ) -> Result<Arc<dyn PullReader>, Error>;
89 }
90
91 #[async_trait::async_trait]
92 impl PullSource for RemoteSource {
93 async fn list_namespaces(
94 &self,
95 max_depth: &mut Option<usize>,
96 worker: &WorkerTask,
97 ) -> Result<Vec<BackupNamespace>, Error> {
98 if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
99 return Ok(vec![self.ns.clone()]);
100 }
101
102 let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
103 let mut data = json!({});
104 if let Some(max_depth) = max_depth {
105 data["max-depth"] = json!(max_depth);
106 }
107
108 if !self.ns.is_root() {
109 data["parent"] = json!(self.ns);
110 }
111 self.client.login().await?;
112
113 let mut result = match self.client.get(&path, Some(data)).await {
114 Ok(res) => res,
115 Err(err) => match err.downcast_ref::<HttpError>() {
116 Some(HttpError { code, message }) => match code {
117 &StatusCode::NOT_FOUND => {
118 if self.ns.is_root() && max_depth.is_none() {
119 task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
120 task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
121 max_depth.replace(0);
122 } else {
123 bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
124 }
125
126 return Ok(vec![self.ns.clone()]);
127 }
128 _ => {
129 bail!("Querying namespaces failed - HTTP error {code} - {message}");
130 }
131 },
132 None => {
133 bail!("Querying namespaces failed - {err}");
134 }
135 },
136 };
137
138 let list: Vec<BackupNamespace> =
139 serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
140 .into_iter()
141 .map(|list_item| list_item.ns)
142 .collect();
143
144 Ok(list)
145 }
146
147 async fn list_groups(
148 &self,
149 namespace: &BackupNamespace,
150 _owner: &Authid,
151 ) -> Result<Vec<BackupGroup>, Error> {
152 let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
153
154 let args = if !namespace.is_root() {
155 Some(json!({ "ns": namespace.clone() }))
156 } else {
157 None
158 };
159
160 self.client.login().await?;
161 let mut result =
162 self.client.get(&path, args).await.map_err(|err| {
163 format_err!("Failed to retrieve backup groups from remote - {}", err)
164 })?;
165
166 Ok(
167 serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
168 .map_err(Error::from)?
169 .into_iter()
170 .map(|item| item.backup)
171 .collect::<Vec<BackupGroup>>(),
172 )
173 }
174
175 async fn list_backup_dirs(
176 &self,
177 _namespace: &BackupNamespace,
178 group: &BackupGroup,
179 worker: &WorkerTask,
180 ) -> Result<Vec<BackupDir>, Error> {
181 let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
182
183 let mut args = json!({
184 "backup-type": group.ty,
185 "backup-id": group.id,
186 });
187
188 if !self.ns.is_root() {
189 args["ns"] = serde_json::to_value(&self.ns)?;
190 }
191
192 self.client.login().await?;
193
194 let mut result = self.client.get(&path, Some(args)).await?;
195 let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
196 Ok(snapshot_list
197 .into_iter()
198 .filter_map(|item: SnapshotListItem| {
199 let snapshot = item.backup;
200 // in-progress backups can't be synced
201 if item.size.is_none() {
202 task_log!(
203 worker,
204 "skipping snapshot {} - in-progress backup",
205 snapshot
206 );
207 return None;
208 }
209
210 Some(snapshot)
211 })
212 .collect::<Vec<BackupDir>>())
213 }
214
215 fn get_ns(&self) -> BackupNamespace {
216 self.ns.clone()
217 }
218
219 fn print_store_and_ns(&self) -> String {
220 print_store_and_ns(self.repo.store(), &self.ns)
221 }
222
223 async fn reader(
224 &self,
225 ns: &BackupNamespace,
226 dir: &BackupDir,
227 ) -> Result<Arc<dyn PullReader>, Error> {
228 let backup_reader =
229 BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
230 Ok(Arc::new(RemoteReader {
231 backup_reader,
232 dir: dir.clone(),
233 }))
234 }
235 }
236
237 #[async_trait::async_trait]
238 /// `PullReader` is a trait that provides an interface for reading data from a source.
239 /// The trait includes methods for getting a chunk reader, loading a file, downloading the client log, and checking whether chunk sync should be skipped.
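///
/// A minimal sketch of how a reader is used (illustrative only; `reader`, `worker` and the
/// temporary manifest path are assumed to be in scope):
///
/// ```ignore
/// if let Some(manifest_blob) = reader
///     .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_path, worker)
///     .await?
/// {
///     // manifest is available; chunks can now be fetched with a matching crypt mode
///     let chunk_reader = reader.chunk_reader(CryptMode::None);
/// }
/// ```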
240 trait PullReader: Send + Sync {
241 /// Returns a chunk reader with the specified encryption mode.
242 fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
243
244 /// Asynchronously loads a file from the source into a local file.
245 /// `filename` is the name of the file to load from the source.
246 /// `into` is the path of the local file to load the source file into.
247 async fn load_file_into(
248 &self,
249 filename: &str,
250 into: &Path,
251 worker: &WorkerTask,
252 ) -> Result<Option<DataBlob>, Error>;
253
254 /// Tries to download the client log from the source and save it into a local file.
255 async fn try_download_client_log(
256 &self,
257 to_path: &Path,
258 worker: &WorkerTask,
259 ) -> Result<(), Error>;
260
261 fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
262 }
263
264 #[async_trait::async_trait]
265 impl PullReader for RemoteReader {
266 fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
267 Arc::new(RemoteChunkReader::new(
268 self.backup_reader.clone(),
269 None,
270 crypt_mode,
271 HashMap::new(),
272 ))
273 }
274
275 async fn load_file_into(
276 &self,
277 filename: &str,
278 into: &Path,
279 worker: &WorkerTask,
280 ) -> Result<Option<DataBlob>, Error> {
281 let mut tmp_file = std::fs::OpenOptions::new()
282 .write(true)
283 .create(true)
284 .truncate(true)
285 .read(true)
286 .open(into)?;
287 let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
288 if let Err(err) = download_result {
289 match err.downcast_ref::<HttpError>() {
290 Some(HttpError { code, message }) => match *code {
291 StatusCode::NOT_FOUND => {
292 task_log!(
293 worker,
294 "skipping snapshot {} - vanished since start of sync",
295 &self.dir,
296 );
297 return Ok(None);
298 }
299 _ => {
300 bail!("HTTP error {code} - {message}");
301 }
302 },
303 None => {
304 return Err(err);
305 }
306 };
307 };
308 tmp_file.rewind()?;
309 Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
310 }
311
312 async fn try_download_client_log(
313 &self,
314 to_path: &Path,
315 worker: &WorkerTask,
316 ) -> Result<(), Error> {
317 let mut tmp_path = to_path.to_owned();
318 tmp_path.set_extension("tmp");
319
320 let tmpfile = std::fs::OpenOptions::new()
321 .write(true)
322 .create(true)
323 .read(true)
324 .open(&tmp_path)?;
325
326 // Note: be silent if there is no log - only log successful download
327 if let Ok(()) = self
328 .backup_reader
329 .download(CLIENT_LOG_BLOB_NAME, tmpfile)
330 .await
331 {
332 if let Err(err) = std::fs::rename(&tmp_path, to_path) {
333 bail!("Atomic rename file {:?} failed - {}", to_path, err);
334 }
335 task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
336 }
337
338 Ok(())
339 }
340
341 fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
342 false
343 }
344 }
345
346 /// Parameters for a pull operation.
347 pub(crate) struct PullParameters {
348 /// Where data is pulled from
349 source: Arc<dyn PullSource>,
350 /// Where data should be pulled into
351 target: PullTarget,
352 /// Owner of synced groups (needs to match local owner of pre-existing groups)
353 owner: Authid,
354 /// Whether to remove groups which exist locally, but not on the remote end
355 remove_vanished: bool,
356 /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
357 max_depth: Option<usize>,
358 /// Filters for reducing the pull scope
359 group_filter: Option<Vec<GroupFilter>>,
360 /// How many snapshots should be transferred at most (taking the newest N snapshots)
361 transfer_last: Option<usize>,
362 }
363
364 impl PullParameters {
365 /// Creates a new instance of `PullParameters`.
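///
/// Illustrative call (all values are made up; `auth_id` stands in for the sync job's owner):
///
/// ```ignore
/// let params = PullParameters::new(
///     "local-store",
///     BackupNamespace::root(),
///     Some("my-remote"),
///     "remote-store",
///     BackupNamespace::root(),
///     auth_id,
///     Some(false),                // remove_vanished
///     None,                       // max_depth: no recursion limit
///     None,                       // group_filter
///     RateLimitConfig::default(), // no rate limit
///     None,                       // transfer_last
/// )?;
/// ```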
366 pub(crate) fn new(
367 store: &str,
368 ns: BackupNamespace,
369 remote: Option<&str>,
370 remote_store: &str,
371 remote_ns: BackupNamespace,
372 owner: Authid,
373 remove_vanished: Option<bool>,
374 max_depth: Option<usize>,
375 group_filter: Option<Vec<GroupFilter>>,
376 limit: RateLimitConfig,
377 transfer_last: Option<usize>,
378 ) -> Result<Self, Error> {
379 if let Some(max_depth) = max_depth {
380 ns.check_max_depth(max_depth)?;
381 remote_ns.check_max_depth(max_depth)?;
382 };
383 let remove_vanished = remove_vanished.unwrap_or(false);
384
385 let source: Arc<dyn PullSource> = if let Some(remote) = remote {
386 let (remote_config, _digest) = pbs_config::remote::config()?;
387 let remote: Remote = remote_config.lookup("remote", remote)?;
388
389 let repo = BackupRepository::new(
390 Some(remote.config.auth_id.clone()),
391 Some(remote.config.host.clone()),
392 remote.config.port,
393 remote_store.to_string(),
394 );
395 let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
396 Arc::new(RemoteSource {
397 repo,
398 ns: remote_ns,
399 client,
400 })
401 } else {
402 bail!("local sync not implemented yet")
403 };
404 let target = PullTarget {
405 store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
406 ns,
407 };
408
409 Ok(Self {
410 source,
411 target,
412 owner,
413 remove_vanished,
414 max_depth,
415 group_filter,
416 transfer_last,
417 })
418 }
419 }
420
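/// Pulls all chunks referenced by `index` into the `target` datastore.
///
/// Chunks already present locally or already recorded in `downloaded_chunks` are skipped;
/// the rest are downloaded via `chunk_reader`, then verified and written by a small worker pool.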
421 async fn pull_index_chunks<I: IndexFile>(
422 worker: &WorkerTask,
423 chunk_reader: Arc<dyn AsyncReadChunk>,
424 target: Arc<DataStore>,
425 index: I,
426 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
427 ) -> Result<(), Error> {
428 use futures::stream::{self, StreamExt, TryStreamExt};
429
430 let start_time = SystemTime::now();
431
432 let stream = stream::iter(
433 (0..index.index_count())
434 .map(|pos| index.chunk_info(pos).unwrap())
435 .filter(|info| {
436 let mut guard = downloaded_chunks.lock().unwrap();
437 let done = guard.contains(&info.digest);
438 if !done {
439 // Note: We mark a chunk as downloaded before it's actually downloaded
440 // to avoid duplicate downloads.
441 guard.insert(info.digest);
442 }
443 !done
444 }),
445 );
446
447 let target2 = target.clone();
448 let verify_pool = ParallelHandler::new(
449 "sync chunk writer",
450 4,
451 move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
452 // println!("verify and write {}", hex::encode(&digest));
453 chunk.verify_unencrypted(size as usize, &digest)?;
454 target2.insert_chunk(&chunk, &digest)?;
455 Ok(())
456 },
457 );
458
459 let verify_and_write_channel = verify_pool.channel();
460
461 let bytes = Arc::new(AtomicUsize::new(0));
462
463 stream
464 .map(|info| {
465 let target = Arc::clone(&target);
466 let chunk_reader = chunk_reader.clone();
467 let bytes = Arc::clone(&bytes);
468 let verify_and_write_channel = verify_and_write_channel.clone();
469
470 Ok::<_, Error>(async move {
471 let chunk_exists = proxmox_async::runtime::block_in_place(|| {
472 target.cond_touch_chunk(&info.digest, false)
473 })?;
474 if chunk_exists {
475 //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
476 return Ok::<_, Error>(());
477 }
478 //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
479 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
480 let raw_size = chunk.raw_size() as usize;
481
482 // decode, verify and write in separate threads to maximize throughput
483 proxmox_async::runtime::block_in_place(|| {
484 verify_and_write_channel.send((chunk, info.digest, info.size()))
485 })?;
486
487 bytes.fetch_add(raw_size, Ordering::SeqCst);
488
489 Ok(())
490 })
491 })
492 .try_buffer_unordered(20)
493 .try_for_each(|_res| futures::future::ok(()))
494 .await?;
495
496 drop(verify_and_write_channel);
497
498 verify_pool.complete()?;
499
500 let elapsed = start_time.elapsed()?.as_secs_f64();
501
502 let bytes = bytes.load(Ordering::SeqCst);
503
504 task_log!(
505 worker,
506 "downloaded {} bytes ({:.2} MiB/s)",
507 bytes,
508 (bytes as f64) / (1024.0 * 1024.0 * elapsed)
509 );
510
511 Ok(())
512 }
513
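/// Checks a downloaded archive's size and checksum against the values recorded in the manifest's [`FileInfo`].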
514 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
515 if size != info.size {
516 bail!(
517 "wrong size for file '{}' ({} != {})",
518 info.filename,
519 info.size,
520 size
521 );
522 }
523
524 if csum != &info.csum {
525 bail!("wrong checksum for file '{}'", info.filename);
526 }
527
528 Ok(())
529 }
530
531 /// Pulls a single file referenced by a manifest.
532 ///
533 /// Pulling an archive consists of the following steps:
534 /// - Load archive file into a tmp file
535 /// -- Download file from the source into the tmp file
536 /// -- Verify tmp file checksum
537 /// - If archive is an index, pull referenced chunks
538 /// - Rename tmp file into real path
539 async fn pull_single_archive<'a>(
540 worker: &'a WorkerTask,
541 reader: Arc<dyn PullReader + 'a>,
542 snapshot: &'a pbs_datastore::BackupDir,
543 archive_info: &'a FileInfo,
544 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
545 ) -> Result<(), Error> {
546 let archive_name = &archive_info.filename;
547 let mut path = snapshot.full_path();
548 path.push(archive_name);
549
550 let mut tmp_path = path.clone();
551 tmp_path.set_extension("tmp");
552
553 task_log!(worker, "sync archive {}", archive_name);
554
555 reader
556 .load_file_into(archive_name, &tmp_path, worker)
557 .await?;
558
559 let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
560
561 match archive_type(archive_name)? {
562 ArchiveType::DynamicIndex => {
563 let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
564 format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
565 })?;
566 let (csum, size) = index.compute_csum();
567 verify_archive(archive_info, &csum, size)?;
568
569 if reader.skip_chunk_sync(snapshot.datastore().name()) {
570 task_log!(worker, "skipping chunk sync for same datastore");
571 } else {
572 pull_index_chunks(
573 worker,
574 reader.chunk_reader(archive_info.crypt_mode),
575 snapshot.datastore().clone(),
576 index,
577 downloaded_chunks,
578 )
579 .await?;
580 }
581 }
582 ArchiveType::FixedIndex => {
583 let index = FixedIndexReader::new(tmpfile).map_err(|err| {
584 format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
585 })?;
586 let (csum, size) = index.compute_csum();
587 verify_archive(archive_info, &csum, size)?;
588
589 if reader.skip_chunk_sync(snapshot.datastore().name()) {
590 task_log!(worker, "skipping chunk sync for same datastore");
591 } else {
592 pull_index_chunks(
593 worker,
594 reader.chunk_reader(archive_info.crypt_mode),
595 snapshot.datastore().clone(),
596 index,
597 downloaded_chunks,
598 )
599 .await?;
600 }
601 }
602 ArchiveType::Blob => {
603 tmpfile.rewind()?;
604 let (csum, size) = sha256(&mut tmpfile)?;
605 verify_archive(archive_info, &csum, size)?;
606 }
607 }
608 if let Err(err) = std::fs::rename(&tmp_path, &path) {
609 bail!("Atomic rename file {:?} failed - {}", path, err);
610 }
611 Ok(())
612 }
613
614 /// Actual implementation of pulling a snapshot.
615 ///
616 /// Pulling a snapshot consists of the following steps:
617 /// - (Re)download the manifest
618 /// -- if it matches, only download log and treat snapshot as already synced
619 /// - Iterate over referenced files
620 /// -- if file already exists, verify contents
621 /// -- if not, pull it from the remote
622 /// - Download log if not already existing
623 async fn pull_snapshot<'a>(
624 worker: &'a WorkerTask,
625 reader: Arc<dyn PullReader + 'a>,
626 snapshot: &'a pbs_datastore::BackupDir,
627 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
628 ) -> Result<(), Error> {
629 let mut manifest_name = snapshot.full_path();
630 manifest_name.push(MANIFEST_BLOB_NAME);
631
632 let mut client_log_name = snapshot.full_path();
633 client_log_name.push(CLIENT_LOG_BLOB_NAME);
634
635 let mut tmp_manifest_name = manifest_name.clone();
636 tmp_manifest_name.set_extension("tmp");
637 let tmp_manifest_blob;
638 if let Some(data) = reader
639 .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
640 .await?
641 {
642 tmp_manifest_blob = data;
643 } else {
644 return Ok(());
645 }
646
647 if manifest_name.exists() {
648 let manifest_blob = proxmox_lang::try_block!({
649 let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
650 format_err!("unable to open local manifest {manifest_name:?} - {err}")
651 })?;
652
653 let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
654 Ok(manifest_blob)
655 })
656 .map_err(|err: Error| {
657 format_err!("unable to read local manifest {manifest_name:?} - {err}")
658 })?;
659
660 if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
661 if !client_log_name.exists() {
662 reader
663 .try_download_client_log(&client_log_name, worker)
664 .await?;
665 };
666 task_log!(worker, "no data changes");
667 let _ = std::fs::remove_file(&tmp_manifest_name);
668 return Ok(()); // nothing changed
669 }
670 }
671
672 let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
673
674 for item in manifest.files() {
675 let mut path = snapshot.full_path();
676 path.push(&item.filename);
677
678 if path.exists() {
679 match archive_type(&item.filename)? {
680 ArchiveType::DynamicIndex => {
681 let index = DynamicIndexReader::open(&path)?;
682 let (csum, size) = index.compute_csum();
683 match manifest.verify_file(&item.filename, &csum, size) {
684 Ok(_) => continue,
685 Err(err) => {
686 task_log!(worker, "detected changed file {:?} - {}", path, err);
687 }
688 }
689 }
690 ArchiveType::FixedIndex => {
691 let index = FixedIndexReader::open(&path)?;
692 let (csum, size) = index.compute_csum();
693 match manifest.verify_file(&item.filename, &csum, size) {
694 Ok(_) => continue,
695 Err(err) => {
696 task_log!(worker, "detected changed file {:?} - {}", path, err);
697 }
698 }
699 }
700 ArchiveType::Blob => {
701 let mut tmpfile = std::fs::File::open(&path)?;
702 let (csum, size) = sha256(&mut tmpfile)?;
703 match manifest.verify_file(&item.filename, &csum, size) {
704 Ok(_) => continue,
705 Err(err) => {
706 task_log!(worker, "detected changed file {:?} - {}", path, err);
707 }
708 }
709 }
710 }
711 }
712
713 pull_single_archive(
714 worker,
715 reader.clone(),
716 snapshot,
717 item,
718 downloaded_chunks.clone(),
719 )
720 .await?;
721 }
722
723 if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
724 bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
725 }
726
727 if !client_log_name.exists() {
728 reader
729 .try_download_client_log(&client_log_name, worker)
730 .await?;
731 };
732 snapshot
733 .cleanup_unreferenced_files(&manifest)
734 .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
735
736 Ok(())
737 }
738
739 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
740 ///
741 /// The `reader` is configured to read from the source backup directory, while the
742 /// `snapshot` is pointing to the local datastore and target namespace.
743 async fn pull_snapshot_from<'a>(
744 worker: &'a WorkerTask,
745 reader: Arc<dyn PullReader + 'a>,
746 snapshot: &'a pbs_datastore::BackupDir,
747 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
748 ) -> Result<(), Error> {
749 let (_path, is_new, _snap_lock) = snapshot
750 .datastore()
751 .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
752
753 if is_new {
754 task_log!(worker, "sync snapshot {}", snapshot.dir());
755
756 if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
757 if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
758 snapshot.backup_ns(),
759 snapshot.as_ref(),
760 true,
761 ) {
762 task_log!(worker, "cleanup error - {}", cleanup_err);
763 }
764 return Err(err);
765 }
766 task_log!(worker, "sync snapshot {} done", snapshot.dir());
767 } else {
768 task_log!(worker, "re-sync snapshot {}", snapshot.dir());
769 pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
770 }
771
772 Ok(())
773 }
774
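/// Why a snapshot is skipped during sync.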
775 #[derive(PartialEq, Eq)]
776 enum SkipReason {
777 AlreadySynced,
778 TransferLast,
779 }
780
781 impl std::fmt::Display for SkipReason {
782 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
783 write!(
784 f,
785 "{}",
786 match self {
787 SkipReason::AlreadySynced => "older than the newest local snapshot",
788 SkipReason::TransferLast => "due to transfer-last",
789 }
790 )
791 }
792 }
793
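/// Aggregates the count and time range of snapshots skipped for one [`SkipReason`], for compact logging.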
794 struct SkipInfo {
795 oldest: i64,
796 newest: i64,
797 count: u64,
798 skip_reason: SkipReason,
799 }
800
801 impl SkipInfo {
802 fn new(skip_reason: SkipReason) -> Self {
803 SkipInfo {
804 oldest: i64::MAX,
805 newest: i64::MIN,
806 count: 0,
807 skip_reason,
808 }
809 }
810
811 fn reset(&mut self) {
812 self.count = 0;
813 self.oldest = i64::MAX;
814 self.newest = i64::MIN;
815 }
816
817 fn update(&mut self, backup_time: i64) {
818 self.count += 1;
819
820 if backup_time < self.oldest {
821 self.oldest = backup_time;
822 }
823
824 if backup_time > self.newest {
825 self.newest = backup_time;
826 }
827 }
828
829 fn affected(&self) -> Result<String, Error> {
830 match self.count {
831 0 => Ok(String::new()),
832 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
833 _ => Ok(format!(
834 "{} .. {}",
835 proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
836 proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
837 )),
838 }
839 }
840 }
841
842 impl std::fmt::Display for SkipInfo {
843 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
844 write!(
845 f,
846 "skipped: {} snapshot(s) ({}) - {}",
847 self.count,
848 self.affected().map_err(|_| std::fmt::Error)?,
849 self.skip_reason,
850 )
851 }
852 }
853
854 /// Pulls a group according to `params`.
855 ///
856 /// Pulling a group consists of the following steps:
857 /// - Query the list of snapshots available for this group in the source namespace on the remote
858 /// - Sort by snapshot time
859 /// - Get last snapshot timestamp on local datastore
860 /// - Iterate over list of snapshots
861 /// -- pull each snapshot, unless it is still in progress or older than the newest local snapshot
862 /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
863 ///
864 /// Backwards-compat: if the remote source namespace is the root namespace, only the group type and ID
865 /// will be sent to the remote when querying snapshots. This allows us to interact with old remotes that
866 /// don't have namespace support yet.
867 ///
868 /// Permission checks:
869 /// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
870 /// - local group owner is already checked by pull_store
871 async fn pull_group(
872 worker: &WorkerTask,
873 params: &PullParameters,
874 source_namespace: &BackupNamespace,
875 group: &BackupGroup,
876 progress: &mut StoreProgress,
877 ) -> Result<(), Error> {
878 let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
879 let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
880
881 let mut raw_list: Vec<BackupDir> = params
882 .source
883 .list_backup_dirs(source_namespace, group, worker)
884 .await?;
885 raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
886
887 let total_amount = raw_list.len();
888
889 let cutoff = params
890 .transfer_last
891 .map(|count| total_amount.saturating_sub(count))
892 .unwrap_or_default();
893
894 let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
895
896 let mut source_snapshots = HashSet::new();
897 let last_sync_time = params
898 .target
899 .store
900 .last_successful_backup(&target_ns, group)?
901 .unwrap_or(i64::MIN);
902
903 let list: Vec<BackupDir> = raw_list
904 .into_iter()
905 .enumerate()
906 .filter(|&(pos, ref dir)| {
907 source_snapshots.insert(dir.time);
908 if last_sync_time > dir.time {
909 already_synced_skip_info.update(dir.time);
910 return false;
911 } else if already_synced_skip_info.count > 0 {
912 task_log!(worker, "{}", already_synced_skip_info);
913 already_synced_skip_info.reset();
914 return true;
915 }
916
917 if pos < cutoff && last_sync_time != dir.time {
918 transfer_last_skip_info.update(dir.time);
919 return false;
920 } else if transfer_last_skip_info.count > 0 {
921 task_log!(worker, "{}", transfer_last_skip_info);
922 transfer_last_skip_info.reset();
923 }
924 true
925 })
926 .map(|(_, dir)| dir)
927 .collect();
928
929 // start with 65536 chunks (up to 256 GiB)
930 let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
931
932 progress.group_snapshots = list.len() as u64;
933
934 for (pos, from_snapshot) in list.into_iter().enumerate() {
935 let to_snapshot = params
936 .target
937 .store
938 .backup_dir(target_ns.clone(), from_snapshot.clone())?;
939
940 let reader = params
941 .source
942 .reader(source_namespace, &from_snapshot)
943 .await?;
944 let result =
945 pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
946
947 progress.done_snapshots = pos as u64 + 1;
948 task_log!(worker, "percentage done: {}", progress);
949
950 result?; // stop on error
951 }
952
953 if params.remove_vanished {
954 let group = params
955 .target
956 .store
957 .backup_group(target_ns.clone(), group.clone());
958 let local_list = group.list_backups()?;
959 for info in local_list {
960 let snapshot = info.backup_dir;
961 if source_snapshots.contains(&snapshot.backup_time()) {
962 continue;
963 }
964 if snapshot.is_protected() {
965 task_log!(
966 worker,
967 "don't delete vanished snapshot {} (protected)",
968 snapshot.dir()
969 );
970 continue;
971 }
972 task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
973 params
974 .target
975 .store
976 .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
977 }
978 }
979
980 Ok(())
981 }
982
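/// Ensures the target namespace `ns` exists, creating it if necessary and permitted, and checks that
/// the configured owner may back up into it. Returns whether the namespace was newly created.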
983 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
984 let mut created = false;
985 let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
986
987 if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
988 check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
989 .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
990
991 let name = match ns.components().last() {
992 Some(name) => name.to_owned(),
993 None => {
994 bail!("Failed to determine last component of namespace.");
995 }
996 };
997
998 if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
999 bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
1000 }
1001 created = true;
1002 }
1003
1004 check_ns_privs(
1005 params.target.store.name(),
1006 ns,
1007 &params.owner,
1008 PRIV_DATASTORE_BACKUP,
1009 )
1010 .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;
1011
1012 Ok(created)
1013 }
1014
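/// Recursively removes a local namespace after checking modification privileges.
/// Returns whether the namespace was removed completely.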
1015 fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
1016 check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
1017 .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
1018
1019 params
1020 .target
1021 .store
1022 .remove_namespace_recursive(local_ns, true)
1023 }
1024
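/// Removes local namespaces below the sync target that were not part of this sync run, limited to
/// namespaces the configured owner may access. Returns whether any removal failed.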
1025 fn check_and_remove_vanished_ns(
1026 worker: &WorkerTask,
1027 params: &PullParameters,
1028 synced_ns: HashSet<BackupNamespace>,
1029 ) -> Result<bool, Error> {
1030 let mut errors = false;
1031 let user_info = CachedUserInfo::new()?;
1032
1033 // clamp like remote does so that we don't list more than we can ever have synced.
1034 let max_depth = params
1035 .max_depth
1036 .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
1037
1038 let mut local_ns_list: Vec<BackupNamespace> = params
1039 .target
1040 .store
1041 .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
1042 .filter(|ns| {
1043 let user_privs =
1044 user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
1045 user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
1046 })
1047 .collect();
1048
1049 // children first!
1050 local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
1051
1052 for local_ns in local_ns_list {
1053 if local_ns == params.target.ns {
1054 continue;
1055 }
1056
1057 if synced_ns.contains(&local_ns) {
1058 continue;
1059 }
1060
1061 if local_ns.is_root() {
1062 continue;
1063 }
1064 match check_and_remove_ns(params, &local_ns) {
1065 Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
1066 Ok(false) => task_log!(
1067 worker,
1068 "Did not remove namespace {} - protected snapshots remain",
1069 local_ns
1070 ),
1071 Err(err) => {
1072 task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
1073 errors = true;
1074 }
1075 }
1076 }
1077
1078 Ok(errors)
1079 }
1080
1081 /// Pulls a store according to `params`.
1082 ///
1083 /// Pulling a store consists of the following steps:
1084 /// - Query list of namespaces on the remote
1085 /// - Iterate list
1086 /// -- create sub-NS if needed (and allowed)
1087 /// -- attempt to pull each NS in turn
1088 /// - (remove_vanished && max_depth > 0) remove sub-NS which do not (or no longer) exist on the remote
1089 ///
1090 /// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
1091 /// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
1092 ///
1093 /// Permission checks:
1094 /// - access to local datastore, namespace anchor and remote entry need to be checked at call site
1095 /// - remote namespaces are filtered by remote
1096 /// - creation and removal of sub-NS checked here
1097 /// - access to sub-NS checked here
1098 pub(crate) async fn pull_store(
1099 worker: &WorkerTask,
1100 mut params: PullParameters,
1101 ) -> Result<(), Error> {
1102 // explicitly create a shared lock to prevent GC on newly created chunks
1103 let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
1104 let mut errors = false;
1105
1106 let old_max_depth = params.max_depth;
1107 let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
1108 vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
1109 } else {
1110 params
1111 .source
1112 .list_namespaces(&mut params.max_depth, worker)
1113 .await?
1114 };
1115
1116 let ns_layers_to_be_pulled = namespaces
1117 .iter()
1118 .map(BackupNamespace::depth)
1119 .max()
1120 .map_or(0, |v| v - params.source.get_ns().depth());
1121 let target_depth = params.target.ns.depth();
1122
1123 if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
1124 bail!(
1125 "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
1126 ns_layers_to_be_pulled,
1127 target_depth,
1128 MAX_NAMESPACE_DEPTH
1129 );
1130 }
1131
1132 errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
1133 namespaces.sort_unstable_by_key(|a| a.name_len());
1134
1135 let (mut groups, mut snapshots) = (0, 0);
1136 let mut synced_ns = HashSet::with_capacity(namespaces.len());
1137
1138 for namespace in namespaces {
1139 let source_store_ns_str = params.source.print_store_and_ns();
1140
1141 let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
1142 let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
1143
1144 task_log!(worker, "----");
1145 task_log!(
1146 worker,
1147 "Syncing {} into {}",
1148 source_store_ns_str,
1149 target_store_ns_str
1150 );
1151
1152 synced_ns.insert(target_ns.clone());
1153
1154 match check_and_create_ns(&params, &target_ns) {
1155 Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
1156 Ok(false) => {}
1157 Err(err) => {
1158 task_log!(
1159 worker,
1160 "Cannot sync {} into {} - {}",
1161 source_store_ns_str,
1162 target_store_ns_str,
1163 err,
1164 );
1165 errors = true;
1166 continue;
1167 }
1168 }
1169
1170 match pull_ns(worker, &namespace, &mut params).await {
1171 Ok((ns_progress, ns_errors)) => {
1172 errors |= ns_errors;
1173
1174 if params.max_depth != Some(0) {
1175 groups += ns_progress.done_groups;
1176 snapshots += ns_progress.done_snapshots;
1177 task_log!(
1178 worker,
1179 "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
1180 namespace,
1181 groups,
1182 snapshots,
1183 );
1184 }
1185 }
1186 Err(err) => {
1187 errors = true;
1188 task_log!(
1189 worker,
1190 "Encountered errors while syncing namespace {} - {}",
1191 &namespace,
1192 err,
1193 );
1194 }
1195 };
1196 }
1197
1198 if params.remove_vanished {
1199 errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
1200 }
1201
1202 if errors {
1203 bail!("sync failed with some errors.");
1204 }
1205
1206 Ok(())
1207 }
1208
1209 /// Pulls a namespace according to `params`.
1210 ///
1211 /// Pulling a namespace consists of the following steps:
1212 /// - Query list of groups on the remote (in `source_ns`)
1213 /// - Filter list according to configured group filters
1214 /// - Iterate list and attempt to pull each group in turn
1215 /// - (remove_vanished) remove groups with matching owner and matching the configured group filters which
1216 /// do not (or no longer) exist on the remote
1217 ///
1218 /// Permission checks:
1219 /// - remote namespaces are filtered by remote
1220 /// - owner check for vanished groups done here
1221 pub(crate) async fn pull_ns(
1222 worker: &WorkerTask,
1223 namespace: &BackupNamespace,
1224 params: &mut PullParameters,
1225 ) -> Result<(StoreProgress, bool), Error> {
1226 let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
1227
1228 let total_count = list.len();
1229 list.sort_unstable_by(|a, b| {
1230 let type_order = a.ty.cmp(&b.ty);
1231 if type_order == std::cmp::Ordering::Equal {
1232 a.id.cmp(&b.id)
1233 } else {
1234 type_order
1235 }
1236 });
1237
1238 let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
1239 filters.iter().any(|filter| group.matches(filter))
1240 };
1241
1242 let list = if let Some(ref group_filter) = &params.group_filter {
1243 let unfiltered_count = list.len();
1244 let list: Vec<BackupGroup> = list
1245 .into_iter()
1246 .filter(|group| apply_filters(group, group_filter))
1247 .collect();
1248 task_log!(
1249 worker,
1250 "found {} groups to sync (out of {} total)",
1251 list.len(),
1252 unfiltered_count
1253 );
1254 list
1255 } else {
1256 task_log!(worker, "found {} groups to sync", total_count);
1257 list
1258 };
1259
1260 let mut errors = false;
1261
1262 let mut new_groups = HashSet::new();
1263 for group in list.iter() {
1264 new_groups.insert(group.clone());
1265 }
1266
1267 let mut progress = StoreProgress::new(list.len() as u64);
1268
1269 let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
1270
1271 for (done, group) in list.into_iter().enumerate() {
1272 progress.done_groups = done as u64;
1273 progress.done_snapshots = 0;
1274 progress.group_snapshots = 0;
1275
1276 let (owner, _lock_guard) =
1277 match params
1278 .target
1279 .store
1280 .create_locked_backup_group(&target_ns, &group, &params.owner)
1281 {
1282 Ok(result) => result,
1283 Err(err) => {
1284 task_log!(
1285 worker,
1286 "sync group {} failed - group lock failed: {}",
1287 &group,
1288 err
1289 );
1290 errors = true;
1291 // do not stop here, instead continue
1292 task_log!(worker, "create_locked_backup_group failed");
1293 continue;
1294 }
1295 };
1296
1297 // permission check
1298 if params.owner != owner {
1299 // only the owner is allowed to create additional snapshots
1300 task_log!(
1301 worker,
1302 "sync group {} failed - owner check failed ({} != {})",
1303 &group,
1304 params.owner,
1305 owner
1306 );
1307 errors = true; // do not stop here, instead continue
1308 } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
1309 {
1310 task_log!(worker, "sync group {} failed - {}", &group, err,);
1311 errors = true; // do not stop here, instead continue
1312 }
1313 }
1314
1315 if params.remove_vanished {
1316 let result: Result<(), Error> = proxmox_lang::try_block!({
1317 for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
1318 let local_group = local_group?;
1319 let local_group = local_group.group();
1320 if new_groups.contains(local_group) {
1321 continue;
1322 }
1323 let owner = params.target.store.get_owner(&target_ns, local_group)?;
1324 if check_backup_owner(&owner, &params.owner).is_err() {
1325 continue;
1326 }
1327 if let Some(ref group_filter) = &params.group_filter {
1328 if !apply_filters(local_group, group_filter) {
1329 continue;
1330 }
1331 }
1332 task_log!(worker, "delete vanished group '{local_group}'",);
1333 match params
1334 .target
1335 .store
1336 .remove_backup_group(&target_ns, local_group)
1337 {
1338 Ok(true) => {}
1339 Ok(false) => {
1340 task_log!(
1341 worker,
1342 "kept some protected snapshots of group '{}'",
1343 local_group
1344 );
1345 }
1346 Err(err) => {
1347 task_log!(worker, "{}", err);
1348 errors = true;
1349 }
1350 }
1351 }
1352 Ok(())
1353 });
1354 if let Err(err) = result {
1355 task_log!(worker, "error during cleanup: {}", err);
1356 errors = true;
1357 };
1358 }
1359
1360 Ok((progress, errors))
1361 }