//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::io::Seek;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use serde_json::json;

use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::{task_log, task_warn};
use pbs_api_types::{
    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;

use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
struct RemoteReader {
    backup_reader: Arc<BackupReader>,
    dir: BackupDir,
}

pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    repo: BackupRepository,
    ns: BackupNamespace,
    client: HttpClient,
}
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source.
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;

    fn get_ns(&self) -> BackupNamespace;
    fn print_store_and_ns(&self) -> String;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}
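
// Illustrative sketch (not part of the original API): since callers depend only on
// `Arc<dyn PullSource>`, the pull logic never cares where data comes from. A
// hypothetical helper written purely against the trait could look like:
//
//     async fn sync_overview(source: &dyn PullSource, worker: &WorkerTask) -> Result<String, Error> {
//         let mut max_depth = None;
//         let namespaces = source.list_namespaces(&mut max_depth, worker).await?;
//         Ok(format!("{}: {} namespace(s)", source.print_store_and_ns(), namespaces.len()))
//     }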
#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }
        self.client.login().await?;

        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;

        let mut result = self.client.get(&path, args).await.map_err(|err| {
            format_err!("Failed to retrieve backup groups from remote - {}", err)
        })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }
    async fn list_backup_dirs(
        &self,
        _namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        if !self.ns.is_root() {
            args["ns"] = serde_json::to_value(&self.ns)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }
    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn print_store_and_ns(&self) -> String {
        print_store_and_ns(self.repo.store(), &self.ns)
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}
#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading the
/// client log, and checking whether chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}
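
// Illustrative sketch (hypothetical helper, not in the original file): consumers only
// need the trait object, e.g. to fetch a snapshot's manifest blob into a temp path:
//
//     async fn fetch_manifest(
//         reader: &dyn PullReader,
//         tmp: &Path,
//         worker: &WorkerTask,
//     ) -> Result<Option<DataBlob>, Error> {
//         reader.load_file_into(MANIFEST_BLOB_NAME, tmp, worker).await
//     }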
#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        };
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Option<Vec<GroupFilter>>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}
impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    pub(crate) fn new(
        store: &str,
        ns: BackupNamespace,
        remote: Option<&str>,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
        transfer_last: Option<usize>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        };
        let remove_vanished = remove_vanished.unwrap_or(false);

        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
            let (remote_config, _digest) = pbs_config::remote::config()?;
            let remote: Remote = remote_config.lookup("remote", remote)?;

            let repo = BackupRepository::new(
                Some(remote.config.auth_id.clone()),
                Some(remote.config.host.clone()),
                remote.config.port,
                remote_store.to_string(),
            );
            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
            Arc::new(RemoteSource {
                repo,
                ns: remote_ns,
                client,
            })
        } else {
            bail!("local sync not implemented yet")
        };
        let target = PullTarget {
            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
            ns,
        };

        Ok(Self {
            source,
            target,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            transfer_last,
        })
    }
}
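
// Illustrative call (hypothetical values; the argument order follows the signature
// above): a sync job would construct its parameters roughly like this, where
// `max_depth: None` means "recurse as deep as the remote allows" and
// `transfer_last: Some(3)` limits the pull to the newest three snapshots per group:
//
//     let params = PullParameters::new(
//         "local-store", BackupNamespace::root(),
//         Some("my-remote"), "remote-store", BackupNamespace::root(),
//         owner, Some(true), None, None, RateLimitConfig::default(), Some(3),
//     )?;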
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}
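
// Pipeline summary (derived from the code above): up to 20 chunk downloads run
// concurrently (`try_buffer_unordered(20)`); each downloaded chunk is handed over a
// channel to the 4 "sync chunk writer" threads, which verify and insert it; and
// `downloaded_chunks` deduplicates digests across all archives of a group, so a
// chunk shared by several indexes is fetched only once.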
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}
/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load archive file into tmp file
/// - Verify tmp file checksum
/// - If archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::Blob => {
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}
/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        return Ok(());
    }

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };
    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(())
}
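
// Note on the fast path above: when the re-downloaded manifest is byte-identical to
// the local one, only a possibly missing client log is fetched and the snapshot is
// treated as already synced; no chunk data is touched.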
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                snapshot.backup_ns(),
                snapshot.as_ref(),
                true,
            ) {
                task_log!(worker, "cleanup error - {}", cleanup_err);
            }
            return Err(err);
        }
        task_log!(worker, "sync snapshot {} done", snapshot.dir());
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
    }

    Ok(())
}
#[derive(PartialEq, Eq)]
enum SkipReason {
    AlreadySynced,
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                SkipReason::AlreadySynced => "older than the newest local snapshot",
                SkipReason::TransferLast => "due to transfer-last",
            }
        )
    }
}

struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
    skip_reason: SkipReason,
}

impl SkipInfo {
    fn new(skip_reason: SkipReason) -> Self {
        SkipInfo {
            oldest: i64::MAX,
            newest: i64::MIN,
            count: 0,
            skip_reason,
        }
    }

    fn reset(&mut self) {
        self.count = 0;
        self.oldest = i64::MAX;
        self.newest = i64::MIN;
    }

    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason,
        )
    }
}
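
// Example of a resulting task-log line (illustrative timestamps):
//   skipped: 2 snapshot(s) (2023-08-01T10:00:00Z .. 2023-08-02T10:00:00Z) - older than the newest local snapshot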
/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if the source namespace is the root namespace, only the group type and ID
/// will be sent to the remote when querying snapshots. This allows us to interact with old
/// remotes that don't have namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();

    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    Ok(())
}
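
// Worked example of the transfer-last cutoff (hypothetical numbers): with 10 remote
// snapshots sorted oldest-first and `transfer_last = Some(2)`, `cutoff` is
// 10 - 2 = 8, so positions 0..=7 are skipped, except a snapshot whose time equals
// `last_sync_time`, which is always re-synced; only the newest two are pulled.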
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
    let mut created = false;
    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);

    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
            .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;

        let name = match ns.components().last() {
            Some(name) => name.to_owned(),
            None => {
                bail!("Failed to determine last component of namespace.");
            }
        };

        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
            bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
        }
        created = true;
    }

    check_ns_privs(
        params.target.store.name(),
        ns,
        &params.owner,
        PRIV_DATASTORE_BACKUP,
    )
    .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;

    Ok(created)
}
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}
fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<bool, Error> {
    let mut errors = false;
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.target.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }

        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok(errors)
}
/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<(), Error> {
    // explicitly create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());

    for namespace in namespaces {
        let source_store_ns_str = params.source.print_store_and_ns();

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_errors)) => {
                errors |= ns_errors;

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}
/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
///   not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    let total_count = list.len();
    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
        filters.iter().any(|filter| group.matches(filter))
    };

    let list = if let Some(ref group_filter) = &params.group_filter {
        let unfiltered_count = list.len();
        let list: Vec<BackupGroup> = list
            .into_iter()
            .filter(|group| apply_filters(group, group_filter))
            .collect();
        task_log!(
            worker,
            "found {} groups to sync (out of {} total)",
            list.len(),
            unfiltered_count
        );
        list
    } else {
        task_log!(worker, "found {} groups to sync", total_count);
        list
    };

    let mut errors = false;

    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
        {
            task_log!(worker, "sync group {} failed - {}", &group, err);
            errors = true; // do not stop here, instead continue
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if let Some(ref group_filter) = &params.group_filter {
                    if !apply_filters(local_group, group_filter) {
                        continue;
                    }
                }
                task_log!(worker, "delete vanished group '{local_group}'");
                match params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group)
                {
                    Ok(true) => {}
                    Ok(false) => {
                        task_log!(
                            worker,
                            "kept some protected snapshots of group '{}'",
                            local_group
                        );
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });

        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, errors))
}
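
// Illustrative group-filter semantics (derived from `apply_filters` above): filters
// are OR-ed, so a group is synced if it matches at least one of them. For example, a
// hypothetical job with `group-filter=type:vm` and `group-filter=group:ct/100` pulls
// every VM group plus the single container group `ct/100`.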