//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use serde_json::json;

use proxmox_human_byte::HumanByte;
use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::{task_log, task_warn};
use pbs_api_types::{
    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{
    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
};
use pbs_tools::sha::sha256;

use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;

struct RemoteReader {
    backup_reader: Arc<BackupReader>,
    dir: BackupDir,
}

struct LocalReader {
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    path: PathBuf,
    datastore: Arc<DataStore>,
}

pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    repo: BackupRepository,
    ns: BackupNamespace,
    client: HttpClient,
}

pub(crate) struct LocalSource {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
    pub(crate) groups: usize,
    pub(crate) snapshots: usize,
    pub(crate) namespaces: usize,
}

impl RemovedVanishedStats {
    fn add(&mut self, rhs: RemovedVanishedStats) {
        self.groups += rhs.groups;
        self.snapshots += rhs.snapshots;
        self.namespaces += rhs.namespaces;
    }
}

#[derive(Default)]
pub(crate) struct PullStats {
    pub(crate) chunk_count: usize,
    pub(crate) bytes: usize,
    pub(crate) elapsed: Duration,
    pub(crate) removed: Option<RemovedVanishedStats>,
}

impl From<RemovedVanishedStats> for PullStats {
    fn from(removed: RemovedVanishedStats) -> Self {
        Self {
            removed: Some(removed),
            ..Default::default()
        }
    }
}

impl PullStats {
    fn add(&mut self, rhs: PullStats) {
        self.chunk_count += rhs.chunk_count;
        self.bytes += rhs.bytes;
        self.elapsed += rhs.elapsed;

        if let Some(rhs_removed) = rhs.removed {
            if let Some(ref mut removed) = self.removed {
                removed.add(rhs_removed);
            } else {
                self.removed = Some(rhs_removed);
            }
        }
    }
}
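
// A small illustrative test of how `PullStats::add` accumulates transfer counters
// and folds vanished-removal stats into `removed`; the values are arbitrary.
#[cfg(test)]
mod pull_stats_example {
    use super::*;

    #[test]
    fn add_accumulates_and_merges_removed() {
        let mut total = PullStats::default();
        total.add(PullStats {
            chunk_count: 10,
            bytes: 4096,
            elapsed: Duration::from_secs(1),
            removed: None,
        });
        // stats from a remove-vanished pass arrive via `From<RemovedVanishedStats>`
        total.add(PullStats::from(RemovedVanishedStats {
            groups: 1,
            snapshots: 2,
            namespaces: 0,
        }));
        assert_eq!((total.chunk_count, total.bytes), (10, 4096));
        assert_eq!(total.removed.as_ref().map(|r| r.snapshots), Some(2));
    }
}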

#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source.
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;

    fn get_ns(&self) -> BackupNamespace;
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}

#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }

        self.client.login().await?;

        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;

        let mut result = self.client.get(&path, args).await.map_err(|err| {
            format_err!("Failed to retrieve backup groups from remote - {}", err)
        })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        if !namespace.is_root() {
            args["ns"] = serde_json::to_value(namespace)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.repo.store()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}

#[async_trait::async_trait]
impl PullSource for LocalSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        ListNamespacesRecursive::new_max_depth(
            self.store.clone(),
            self.ns.clone(),
            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
        )?
        .collect()
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        Ok(ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0,
            Some(PRIV_DATASTORE_READ),
            Some(PRIV_DATASTORE_BACKUP),
            Some(owner),
        )?
        .filter_map(Result::ok)
        .map(|backup_group| backup_group.group().clone())
        .collect::<Vec<pbs_api_types::BackupGroup>>())
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        Ok(self
            .store
            .backup_group(namespace.clone(), group.clone())
            .iter_snapshots()?
            .filter_map(Result::ok)
            .map(|snapshot| snapshot.dir().to_owned())
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.store.name()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
            &dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(dir_lock)),
            path: dir.full_path(),
            datastore: dir.datastore().clone(),
        }))
    }
}
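
// The shared directory lock taken in `reader()` above keeps the source snapshot
// from being pruned or modified while it is read for a local sync; it stays held
// for the lifetime of the returned `LocalReader` via its `_dir_lock` field.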

#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading the
/// client log, and checking whether the chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}
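
// `skip_chunk_sync` lets a reader opt out of transferring chunk data: when source
// and target are the same local datastore they share one chunk store, so the
// referenced chunks already exist and only indexes/blobs need to be copied.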

#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        };
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}

#[async_trait::async_trait]
impl PullReader for LocalReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(LocalChunkReader::new(
            self.datastore.clone(),
            None,
            crypt_mode,
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        _worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let mut from_path = self.path.clone();
        from_path.push(filename);
        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        _to_path: &Path,
        _worker: &WorkerTask,
    ) -> Result<(), Error> {
        Ok(())
    }

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }
}

/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Vec<GroupFilter>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}

impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    pub(crate) fn new(
        store: &str,
        ns: BackupNamespace,
        remote: Option<&str>,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
        transfer_last: Option<usize>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        };
        let remove_vanished = remove_vanished.unwrap_or(false);

        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
            let (remote_config, _digest) = pbs_config::remote::config()?;
            let remote: Remote = remote_config.lookup("remote", remote)?;

            let repo = BackupRepository::new(
                Some(remote.config.auth_id.clone()),
                Some(remote.config.host.clone()),
                remote.config.port,
                remote_store.to_string(),
            );
            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
            Arc::new(RemoteSource {
                repo,
                ns: remote_ns,
                client,
            })
        } else {
            Arc::new(LocalSource {
                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
                ns: remote_ns,
            })
        };
        let target = PullTarget {
            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
            ns,
        };

        let group_filter = group_filter.unwrap_or_default();

        Ok(Self {
            source,
            target,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            transfer_last,
        })
    }
}
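
// Sketch of a typical invocation (argument values are invented for illustration,
// following the parameter order of `PullParameters::new` above):
//
//     let params = PullParameters::new(
//         "local-store",                   // target datastore
//         BackupNamespace::root(),         // target namespace anchor
//         Some("my-remote"),               // remote.cfg entry; None = local sync
//         "remote-store",                  // source datastore
//         BackupNamespace::root(),         // source namespace anchor
//         Authid::root_auth_id().clone(),  // owner of synced groups
//         Some(true),                      // remove_vanished
//         None,                            // max_depth: full recursion
//         None,                            // group_filter: all groups
//         RateLimitConfig::default(),      // no rate limit
//         Some(3),                         // transfer_last: newest three snapshots
//     )?;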

async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));
    let chunk_count = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let chunk_count = Arc::clone(&chunk_count);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);
                chunk_count.fetch_add(1, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?;

    let bytes = bytes.load(Ordering::SeqCst);
    let chunk_count = chunk_count.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} ({}/s)",
        HumanByte::from(bytes),
        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
    );

    Ok(PullStats {
        chunk_count,
        bytes,
        elapsed,
        removed: None,
    })
}
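
// The stream pipeline in `pull_index_chunks` overlaps network and disk work: up to
// 20 chunk downloads are in flight at once (`try_buffer_unordered(20)`), chunks
// already present locally are skipped via `cond_touch_chunk`, and decoding,
// verification and chunk-store inserts run on the `ParallelHandler` worker
// threads, so slow downloads and slow writes don't serialize each other.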

fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}
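
// Small illustrative test for `verify_archive` (assumes `FileInfo`'s public
// fields `filename`, `crypt_mode`, `size` and `csum`; the values are arbitrary).
#[cfg(test)]
mod verify_archive_example {
    use super::*;

    #[test]
    fn rejects_wrong_size() {
        let info = FileInfo {
            filename: "catalog.pcat1.didx".to_string(),
            crypt_mode: CryptMode::None,
            size: 1024,
            csum: [0u8; 32],
        };
        assert!(verify_archive(&info, &[0u8; 32], 512).is_err());
        assert!(verify_archive(&info, &[0u8; 32], 1024).is_ok());
    }
}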

/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load the archive into a tmp file
/// -- Download the file from the source
/// -- Verify the tmp file checksum
/// - If the archive is an index, pull the referenced chunks
/// - Rename the tmp file into the real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    let mut pull_stats = PullStats::default();

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks.clone(),
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::Blob => {
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(pull_stats)
}

/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let mut pull_stats = PullStats::default();
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        return Ok(pull_stats);
    }

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(pull_stats); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        let stats = pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
        pull_stats.add(stats);
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };

    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(pull_stats)
}

/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    let pull_stats = if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            Err(err) => {
                if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                    snapshot.backup_ns(),
                    snapshot.as_ref(),
                    true,
                ) {
                    task_log!(worker, "cleanup error - {}", cleanup_err);
                }
                return Err(err);
            }
            Ok(pull_stats) => {
                task_log!(worker, "sync snapshot {} done", snapshot.dir());
                pull_stats
            }
        }
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
    };

    Ok(pull_stats)
}

#[derive(PartialEq, Eq)]
enum SkipReason {
    AlreadySynced,
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                SkipReason::AlreadySynced => "older than the newest local snapshot",
                SkipReason::TransferLast => "due to transfer-last",
            }
        )
    }
}

struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
    skip_reason: SkipReason,
}

impl SkipInfo {
    fn new(skip_reason: SkipReason) -> Self {
        SkipInfo {
            oldest: i64::MAX,
            newest: i64::MIN,
            count: 0,
            skip_reason,
        }
    }

    fn reset(&mut self) {
        self.count = 0;
        self.oldest = i64::MAX;
        self.newest = i64::MIN;
    }

    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason
        )
    }
}
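
// For reference, a populated `SkipInfo` renders in the task log like (sample
// values): skipped: 2 snapshot(s) (2023-11-02T09:00:00Z .. 2023-11-03T09:00:00Z)
// - older than the newest local snapshot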

/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is the root namespace, only the group type and ID will
/// be sent to the remote when querying snapshots. This allows us to interact with old remotes that
/// don't have namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<PullStats, Error> {
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();
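
    // Worked example: with 10 snapshots in `raw_list` and `transfer-last = 3`,
    // `cutoff` is 7, so the filter below skips positions 0..7 (the oldest seven)
    // unless one of them is exactly the last locally synced snapshot.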

    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
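
    // e.g. a source namespace "outer/inner" with source prefix "outer" and target
    // prefix "local" maps to "local/inner" (prefix swap, illustrative values).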

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut pull_stats = PullStats::default();

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        let stats = result?; // stop on error
        pull_stats.add(stats);
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
            pull_stats.add(PullStats::from(RemovedVanishedStats {
                snapshots: 1,
                groups: 0,
                namespaces: 0,
            }));
        }
    }

    Ok(pull_stats)
}

fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
    let mut created = false;
    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);

    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
            .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;

        let name = match ns.components().last() {
            Some(name) => name.to_owned(),
            None => {
                bail!("Failed to determine last component of namespace.");
            }
        };

        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
            bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
        }
        created = true;
    }

    check_ns_privs(
        params.target.store.name(),
        ns,
        &params.owner,
        PRIV_DATASTORE_BACKUP,
    )
    .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;

    Ok(created)
}

fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}

fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<(bool, RemovedVanishedStats), Error> {
    let mut errors = false;
    let mut removed_stats = RemovedVanishedStats::default();
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // sort by length, deepest first, so child namespaces are removed before their parents
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.target.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }

        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => {
                task_log!(worker, "Removed namespace {local_ns}");
                removed_stats.namespaces += 1;
            }
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok((errors, removed_stats))
}

/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<PullStats, Error> {
    // explicitly create a shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    // sort by depth, shallowest first, so parent namespaces are created before their children
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());
    let mut pull_stats = PullStats::default();

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
                errors |= ns_errors;

                pull_stats.add(ns_pull_stats);

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err
                );
            }
        }
    }

    if params.remove_vanished {
        let (has_errors, stats) = check_and_remove_vanished_ns(worker, &params, synced_ns)?;
        errors |= has_errors;
        pull_stats.add(PullStats::from(stats));
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(pull_stats)
}

/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters
///   which are not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, PullStats, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let unfiltered_count = list.len();
    let list: Vec<BackupGroup> = list
        .into_iter()
        .filter(|group| group.apply_filters(&params.group_filter))
        .collect();
    task_log!(
        worker,
        "found {} groups to sync (out of {} total)",
        list.len(),
        unfiltered_count
    );

    let mut errors = false;

    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);
    let mut pull_stats = PullStats::default();

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else {
            match pull_group(worker, params, namespace, &group, &mut progress).await {
                Ok(stats) => pull_stats.add(stats),
                Err(err) => {
                    task_log!(worker, "sync group {} failed - {}", &group, err);
                    errors = true; // do not stop here, instead continue
                }
            }
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if !local_group.apply_filters(&params.group_filter) {
                    continue;
                }
                task_log!(worker, "delete vanished group '{local_group}'");
                let delete_stats_result = params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group);

                match delete_stats_result {
                    Ok(stats) => {
                        if !stats.all_removed() {
                            task_log!(
                                worker,
                                "kept some protected snapshots of group '{local_group}'"
                            );
                            pull_stats.add(PullStats::from(RemovedVanishedStats {
                                snapshots: stats.removed_snapshots(),
                                groups: 0,
                                namespaces: 0,
                            }));
                        } else {
                            pull_stats.add(PullStats::from(RemovedVanishedStats {
                                snapshots: stats.removed_snapshots(),
                                groups: 1,
                                namespaces: 0,
                            }));
                        }
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, pull_stats, errors))
}