//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use serde_json::json;

use proxmox_human_byte::HumanByte;
use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::{task_log, task_warn};

use pbs_api_types::{
    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{
    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
};
use pbs_tools::sha::sha256;

use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
struct RemoteReader {
    backup_reader: Arc<BackupReader>,
    dir: BackupDir,
}

struct LocalReader {
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    path: PathBuf,
    datastore: Arc<DataStore>,
}

pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    repo: BackupRepository,
    ns: BackupNamespace,
    client: HttpClient,
}

pub(crate) struct LocalSource {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

#[derive(Default)]
pub(crate) struct PullStats {
    pub(crate) chunk_count: usize,
    pub(crate) bytes: usize,
    pub(crate) elapsed: Duration,
}

impl PullStats {
    fn add(&mut self, rhs: PullStats) {
        self.chunk_count += rhs.chunk_count;
        self.bytes += rhs.bytes;
        self.elapsed += rhs.elapsed;
    }
}
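
// Illustrative only (not part of the sync logic itself): per-archive stats are
// merged into per-snapshot and per-job totals via `add`, exactly as the pull
// functions below do.
//
//     let mut total = PullStats::default();
//     total.add(PullStats {
//         chunk_count: 10,
//         bytes: 4 * 1024 * 1024,
//         elapsed: Duration::from_secs(2),
//     });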
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source.
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;

    fn get_ns(&self) -> BackupNamespace;
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}
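
// Sketch of how a `PullSource` is consumed during a sync job (the actual
// orchestration lives in `pull_store`, `pull_ns` and `pull_group` below;
// variable names here are illustrative):
//
//     let namespaces = source.list_namespaces(&mut max_depth, worker).await?;
//     for ns in namespaces {
//         for group in source.list_groups(&ns, &owner).await? {
//             for dir in source.list_backup_dirs(&ns, &group, worker).await? {
//                 let reader = source.reader(&ns, &dir).await?;
//                 // pull the snapshot contents through the returned `PullReader`
//             }
//         }
//     }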
#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }
        self.client.login().await?;

        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;

        let mut result = self.client.get(&path, args).await.map_err(|err| {
            format_err!("Failed to retrieve backup groups from remote - {}", err)
        })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        if !namespace.is_root() {
            args["ns"] = serde_json::to_value(namespace)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.repo.store()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}
#[async_trait::async_trait]
impl PullSource for LocalSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        ListNamespacesRecursive::new_max_depth(
            self.store.clone(),
            self.ns.clone(),
            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
        )?
        .collect()
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        Ok(ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0,
            Some(PRIV_DATASTORE_READ),
            Some(PRIV_DATASTORE_BACKUP),
            Some(owner),
        )?
        .filter_map(Result::ok)
        .map(|backup_group| backup_group.group().clone())
        .collect::<Vec<pbs_api_types::BackupGroup>>())
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        Ok(self
            .store
            .backup_group(namespace.clone(), group.clone())
            .iter_snapshots()?
            .filter_map(Result::ok)
            .map(|snapshot| snapshot.dir().to_owned())
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.store.name()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
            &dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(dir_lock)),
            path: dir.full_path(),
            datastore: dir.datastore().clone(),
        }))
    }
}
#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading the
/// client log, and checking whether chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}
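
// Sketch of the per-snapshot use of a `PullReader`, mirroring the call sites
// in `pull_snapshot` and `pull_single_archive` below (names are illustrative):
//
//     let manifest_blob = reader
//         .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
//         .await?;
//     if !reader.skip_chunk_sync(target_store_name) {
//         let chunk_reader = reader.chunk_reader(archive_info.crypt_mode);
//         // fetch referenced chunks through `chunk_reader`
//     }
//     reader.try_download_client_log(&client_log_name, worker).await?;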
#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        }
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}
#[async_trait::async_trait]
impl PullReader for LocalReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(LocalChunkReader::new(
            self.datastore.clone(),
            None,
            crypt_mode,
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        _worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let mut from_path = self.path.clone();
        from_path.push(filename);
        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        _to_path: &Path,
        _worker: &WorkerTask,
    ) -> Result<(), Error> {
        Ok(())
    }

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }
}
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Vec<GroupFilter>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}
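
// How the optional knobs are normalized (illustrative, mirroring the handling
// in `PullParameters::new` and the namespace listing below):
//
//     let remove_vanished = remove_vanished.unwrap_or(false);   // default: keep vanished groups
//     let group_filter = group_filter.unwrap_or_default();      // default: empty list, sync everything
//     let max_depth = max_depth.unwrap_or(MAX_NAMESPACE_DEPTH); // None: recurse as deep as allowed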
impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    pub(crate) fn new(
        store: &str,
        ns: BackupNamespace,
        remote: Option<&str>,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
        transfer_last: Option<usize>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        };
        let remove_vanished = remove_vanished.unwrap_or(false);

        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
            let (remote_config, _digest) = pbs_config::remote::config()?;
            let remote: Remote = remote_config.lookup("remote", remote)?;

            let repo = BackupRepository::new(
                Some(remote.config.auth_id.clone()),
                Some(remote.config.host.clone()),
                remote.config.port,
                remote_store.to_string(),
            );
            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
            Arc::new(RemoteSource {
                repo,
                ns: remote_ns,
                client,
            })
        } else {
            Arc::new(LocalSource {
                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
                ns: remote_ns,
            })
        };
        let target = PullTarget {
            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
            ns,
        };

        let group_filter = group_filter.unwrap_or_default();

        Ok(Self {
            source,
            target,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            transfer_last,
        })
    }
}
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));
    let chunk_count = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let chunk_count = Arc::clone(&chunk_count);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);
                chunk_count.fetch_add(1, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?;

    let bytes = bytes.load(Ordering::SeqCst);
    let chunk_count = chunk_count.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} ({}/s)",
        HumanByte::from(bytes),
        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
    );

    Ok(PullStats {
        chunk_count,
        bytes,
        elapsed,
    })
}
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}
/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load archive file into tmp file
/// -- Load file into tmp file
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    let mut pull_stats = PullStats::default();

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::Blob => {
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(pull_stats)
}
/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let mut pull_stats = PullStats::default();
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        return Ok(pull_stats);
    }

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(pull_stats); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        let stats = pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
        pull_stats.add(stats);
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };

    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(pull_stats)
}
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    let pull_stats = if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            Err(err) => {
                if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                    snapshot.backup_ns(),
                    snapshot.as_ref(),
                    true,
                ) {
                    task_log!(worker, "cleanup error - {}", cleanup_err);
                }
                return Err(err);
            }
            Ok(pull_stats) => {
                task_log!(worker, "sync snapshot {} done", snapshot.dir());
                pull_stats
            }
        }
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
    };

    Ok(pull_stats)
}
#[derive(PartialEq, Eq)]
enum SkipReason {
    AlreadySynced,
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                SkipReason::AlreadySynced => "older than the newest local snapshot",
                SkipReason::TransferLast => "due to transfer-last",
            }
        )
    }
}

struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
    skip_reason: SkipReason,
}

impl SkipInfo {
    fn new(skip_reason: SkipReason) -> Self {
        SkipInfo {
            oldest: i64::MAX,
            newest: i64::MIN,
            count: 0,
            skip_reason,
        }
    }

    fn reset(&mut self) {
        self.count = 0;
        self.oldest = i64::MAX;
        self.newest = i64::MIN;
    }

    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason,
        )
    }
}
/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
///    (the selection is sketched in the comment below)
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
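// Worked example of the snapshot selection below (values are hypothetical):
// with 5 remote snapshots sorted by time and `transfer_last = Some(2)`,
//
//     let cutoff = total_amount.saturating_sub(count); // 5 - 2 = 3
//
// snapshots at positions 0..3 are skipped "due to transfer-last" unless their
// timestamp equals `last_sync_time`, and any snapshot older than
// `last_sync_time` is skipped as already synced.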
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<PullStats, Error> {
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();

    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut pull_stats = PullStats::default();

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        let stats = result?; // stop on error
        pull_stats.add(stats);
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    Ok(pull_stats)
}
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
    let mut created = false;
    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);

    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
            .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;

        let name = match ns.components().last() {
            Some(name) => name.to_owned(),
            None => {
                bail!("Failed to determine last component of namespace.");
            }
        };

        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
            bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
        }
        created = true;
    }

    check_ns_privs(
        params.target.store.name(),
        ns,
        &params.owner,
        PRIV_DATASTORE_BACKUP,
    )
    .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;

    Ok(created)
}
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}
fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<bool, Error> {
    let mut errors = false;
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // remove children first, so sort deepest namespaces to the front
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.target.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }

        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok(errors)
}
/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
///
/// (The overall call chain is sketched in the comment below.)
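// Overall call chain of a sync job as implemented in this module:
//
//     pull_store
//     └── pull_ns                      (per source namespace)
//         └── pull_group               (per backup group)
//             └── pull_snapshot_from   (per snapshot)
//                 └── pull_snapshot
//                     └── pull_single_archive
//                         └── pull_index_chunks   (for index archives)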
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<PullStats, Error> {
    // explicitly create a shared lock so GC cannot remove newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());
    let mut pull_stats = PullStats::default();

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
                errors |= ns_errors;

                pull_stats.add(ns_pull_stats);

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        }
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(pull_stats)
}
/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
/// not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, PullStats, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let unfiltered_count = list.len();
    let list: Vec<BackupGroup> = list
        .into_iter()
        .filter(|group| group.apply_filters(&params.group_filter))
        .collect();
    task_log!(
        worker,
        "found {} groups to sync (out of {} total)",
        list.len(),
        unfiltered_count
    );

    let mut errors = false;

    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);
    let mut pull_stats = PullStats::default();

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else {
            match pull_group(worker, params, namespace, &group, &mut progress).await {
                Ok(stats) => pull_stats.add(stats),
                Err(err) => {
                    task_log!(worker, "sync group {} failed - {}", &group, err);
                    errors = true; // do not stop here, instead continue
                }
            }
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if !local_group.apply_filters(&params.group_filter) {
                    continue;
                }
                task_log!(worker, "delete vanished group '{local_group}'");
                let delete_stats_result = params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group);

                match delete_stats_result {
                    Ok(stats) => {
                        if !stats.all_removed() {
                            task_log!(
                                worker,
                                "kept some protected snapshots of group '{local_group}'"
                            );
                        }
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        }
    }

    Ok((progress, pull_stats, errors))
}