//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::{task_log, task_warn};
use serde_json::json;

use pbs_api_types::{
    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{
    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
};
use pbs_tools::sha::sha256;

use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;

struct RemoteReader {
    backup_reader: Arc<BackupReader>,
    dir: BackupDir,
}

struct LocalReader {
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    path: PathBuf,
    datastore: Arc<DataStore>,
}

pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    repo: BackupRepository,
    ns: BackupNamespace,
    client: HttpClient,
}

pub(crate) struct LocalSource {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

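// `RemoteSource`/`LocalSource` live for the whole sync job, while a
// `RemoteReader`/`LocalReader` is created per snapshot via `PullSource::reader()`.
// `LocalReader` keeps the shared snapshot directory lock alive for as long as
// the reader exists.
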
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source.
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;
    fn get_ns(&self) -> BackupNamespace;
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}

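// Two implementations exist below: `RemoteSource` pulls via the HTTP API of a
// remote Proxmox Backup Server, `LocalSource` reads directly from a datastore
// on this host (sync jobs without a configured remote).
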
#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }
        self.client.login().await?;

        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;

        let mut result = self.client.get(&path, args).await.map_err(|err| {
            format_err!("Failed to retrieve backup groups from remote - {}", err)
        })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        if !namespace.is_root() {
            args["ns"] = serde_json::to_value(namespace)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.repo.store()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}

#[async_trait::async_trait]
impl PullSource for LocalSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        ListNamespacesRecursive::new_max_depth(
            self.store.clone(),
            self.ns.clone(),
            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
        )?
        .collect()
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        Ok(ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0,
            Some(PRIV_DATASTORE_READ),
            Some(PRIV_DATASTORE_BACKUP),
            Some(owner),
        )?
        .filter_map(Result::ok)
        .map(|backup_group| backup_group.group().clone())
        .collect::<Vec<pbs_api_types::BackupGroup>>())
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        Ok(self
            .store
            .backup_group(namespace.clone(), group.clone())
            .iter_snapshots()?
            .filter_map(Result::ok)
            .map(|snapshot| snapshot.dir().to_owned())
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.store.name()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
            &dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(dir_lock)),
            path: dir.full_path(),
            datastore: dir.datastore().clone(),
        }))
    }
}

#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading the
/// client log, and checking whether the chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}

#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        };
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}

#[async_trait::async_trait]
impl PullReader for LocalReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(LocalChunkReader::new(
            self.datastore.clone(),
            None,
            crypt_mode,
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        _worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let mut from_path = self.path.clone();
        from_path.push(filename);
        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        _to_path: &Path,
        _worker: &WorkerTask,
    ) -> Result<(), Error> {
        Ok(())
    }

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }
}

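// For a local sync within one datastore, `LocalReader::skip_chunk_sync` returns
// true, letting `pull_single_archive` skip re-reading and re-inserting chunks
// that are already present in the shared chunk store.
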
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Option<Vec<GroupFilter>>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}

impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    pub(crate) fn new(
        store: &str,
        ns: BackupNamespace,
        remote: Option<&str>,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
        transfer_last: Option<usize>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        };
        let remove_vanished = remove_vanished.unwrap_or(false);

        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
            let (remote_config, _digest) = pbs_config::remote::config()?;
            let remote: Remote = remote_config.lookup("remote", remote)?;

            let repo = BackupRepository::new(
                Some(remote.config.auth_id.clone()),
                Some(remote.config.host.clone()),
                remote.config.port,
                remote_store.to_string(),
            );
            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
            Arc::new(RemoteSource {
                repo,
                ns: remote_ns,
                client,
            })
        } else {
            Arc::new(LocalSource {
                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
                ns: remote_ns,
            })
        };
        let target = PullTarget {
            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
            ns,
        };

        Ok(Self {
            source,
            target,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            transfer_last,
        })
    }
}

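// Illustrative call with hypothetical values (a sync job would normally supply
// these from its configuration); passing `None` as `remote` turns this into a
// datastore-local sync:
//
//     let params = PullParameters::new(
//         "local-store",
//         BackupNamespace::root(),
//         Some("remote-pbs"),
//         "remote-store",
//         BackupNamespace::root(),
//         auth_id,                     // Authid owning the synced groups
//         Some(true),                  // remove-vanished
//         None,                        // max-depth: full recursion
//         None,                        // no group filters
//         RateLimitConfig::default(),
//         Some(3),                     // transfer-last: newest 3 snapshots
//     )?;
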
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

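    // Overview of the pipeline below: chunk infos are streamed from the index,
    // de-duplicated against `downloaded_chunks`, fetched with up to 20 concurrent
    // downloads (`try_buffer_unordered`), and handed to a `ParallelHandler`
    // thread pool which verifies and inserts them into the target chunk store.
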
    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}

fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}

/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load archive file into tmp file
/// -- Load file into tmp file
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::Blob => {
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}

/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        return Ok(());
    }

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };
    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(())
}

/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                snapshot.backup_ns(),
                snapshot.as_ref(),
                true,
            ) {
                task_log!(worker, "cleanup error - {}", cleanup_err);
            }
            return Err(err);
        }
        task_log!(worker, "sync snapshot {} done", snapshot.dir());
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
    }

    Ok(())
}

#[derive(PartialEq, Eq)]
enum SkipReason {
    AlreadySynced,
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                SkipReason::AlreadySynced => "older than the newest local snapshot",
                SkipReason::TransferLast => "due to transfer-last",
            }
        )
    }
}

struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
    skip_reason: SkipReason,
}

impl SkipInfo {
    fn new(skip_reason: SkipReason) -> Self {
        SkipInfo {
            oldest: i64::MAX,
            newest: i64::MIN,
            count: 0,
            skip_reason,
        }
    }

    fn reset(&mut self) {
        self.count = 0;
        self.oldest = i64::MAX;
        self.newest = i64::MIN;
    }

    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason,
        )
    }
}

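// A minimal sanity check (illustrative): `SkipInfo` only aggregates the count
// and timestamp range of skipped snapshots, so the task log can print a single
// summary line per skip reason instead of one line per snapshot.
#[cfg(test)]
mod skip_info_tests {
    use super::*;

    #[test]
    fn skip_info_tracks_range_and_count() {
        let mut info = SkipInfo::new(SkipReason::TransferLast);
        for backup_time in [1700000200, 1700000000, 1700000100] {
            info.update(backup_time);
        }
        assert_eq!(info.count, 3);
        assert_eq!(info.oldest, 1700000000);
        assert_eq!(info.newest, 1700000200);

        info.reset();
        assert_eq!(info.count, 0);
    }
}
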
/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than the last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is the root namespace, only the group type and ID will
/// be sent to the remote when querying snapshots. This allows us to interact with old remotes
/// that don't have namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();

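    // `cutoff` is the list index of the oldest snapshot still covered by
    // `transfer_last`: with e.g. 5 snapshots and transfer-last = 2, cutoff is 3,
    // so the filter below skips indices 0..3 unless one of them is exactly the
    // last locally synced snapshot (which is always re-synced).
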
    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
                return true;
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    Ok(())
}

fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
    let mut created = false;
    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);

    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
            .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;

        let name = match ns.components().last() {
            Some(name) => name.to_owned(),
            None => {
                bail!("Failed to determine last component of namespace.");
            }
        };

        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
            bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
        }
        created = true;
    }

    check_ns_privs(
        params.target.store.name(),
        ns,
        &params.owner,
        PRIV_DATASTORE_BACKUP,
    )
    .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;

    Ok(created)
}

fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}

fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<bool, Error> {
    let mut errors = false;
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.target.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }
        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok(errors)
}

/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<(), Error> {
    // explicitly create a shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

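    // Example: pulling remote ns `a/b` (depth 2) whose deepest listed
    // sub-namespace is `a/b/c/d` (depth 4) yields ns_layers_to_be_pulled
    // = 4 - 2 = 2; the sync is rejected if those 2 layers plus the target
    // namespace depth exceed MAX_NAMESPACE_DEPTH.
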
    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_errors)) => {
                errors |= ns_errors;

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}

/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
///   not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    let total_count = list.len();
    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
        filters.iter().any(|filter| group.matches(filter))
    };

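    // Note: group filters are OR-ed - a group is synced if it matches at least
    // one configured filter.
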
    let list = if let Some(ref group_filter) = &params.group_filter {
        let unfiltered_count = list.len();
        let list: Vec<BackupGroup> = list
            .into_iter()
            .filter(|group| apply_filters(group, group_filter))
            .collect();
        task_log!(
            worker,
            "found {} groups to sync (out of {} total)",
            list.len(),
            unfiltered_count
        );
        list
    } else {
        task_log!(worker, "found {} groups to sync", total_count);
        list
    };

    let mut errors = false;

    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
        {
            task_log!(worker, "sync group {} failed - {}", &group, err);
            errors = true; // do not stop here, instead continue
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if let Some(ref group_filter) = &params.group_filter {
                    if !apply_filters(local_group, group_filter) {
                        continue;
                    }
                }
                task_log!(worker, "delete vanished group '{local_group}'");
                match params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group)
                {
                    Ok(true) => {}
                    Ok(false) => {
                        task_log!(
                            worker,
                            "kept some protected snapshots of group '{}'",
                            local_group
                        );
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, errors))
}