1 //! Sync datastore from remote server
3 use std
::collections
::{HashMap, HashSet}
;
4 use std
::convert
::TryFrom
;
5 use std
::io
::{Seek, SeekFrom}
;
6 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
7 use std
::sync
::{Arc, Mutex}
;
8 use std
::time
::SystemTime
;
10 use anyhow
::{bail, format_err, Error}
;
14 use proxmox_router
::HttpError
;
15 use proxmox_sys
::task_log
;
18 Authid
, GroupFilter
, GroupListItem
, RateLimitConfig
, Remote
,
22 use pbs_datastore
::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress}
;
23 use pbs_datastore
::data_blob
::DataBlob
;
24 use pbs_datastore
::dynamic_index
::DynamicIndexReader
;
25 use pbs_datastore
::fixed_index
::FixedIndexReader
;
26 use pbs_datastore
::index
::IndexFile
;
27 use pbs_datastore
::manifest
::{
28 CLIENT_LOG_BLOB_NAME
, MANIFEST_BLOB_NAME
, ArchiveType
, BackupManifest
, FileInfo
, archive_type
30 use pbs_tools
::sha
::sha256
;
31 use pbs_client
::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader}
;
32 use proxmox_rest_server
::WorkerTask
;
34 use crate::tools
::ParallelHandler
;
36 // fixme: implement filters
37 // fixme: delete vanished groups
38 // TODO: correctly lock backup groups
// Parameters controlling a sync ("pull") of a datastore from a remote server.
// NOTE(review): this extraction is incomplete — the embedded original line
// numbers jump (40, 42, 43, 45, 46, 47), so at least two fields are missing
// from this view (presumably `remote` and `owner`, since `new()` below builds
// `Self { remote, .., owner, .. }`). Do not treat this fragment as the full
// struct definition; confirm against the original file.
40 pub struct PullParameters
{
// Remote repository to pull from (constructed in `new` from the remote config).
42 source
: BackupRepository
,
// Local target datastore.
43 store
: Arc
<DataStore
>,
// When true, locally delete snapshots/groups that vanished on the remote side.
45 remove_vanished
: bool
,
// Optional filters restricting which backup groups get synced.
46 group_filter
: Option
<Vec
<GroupFilter
>>,
// Rate limit configuration applied to the HTTP client (see `client()`).
47 limit
: RateLimitConfig
,
// Constructor for `PullParameters`: resolves the local datastore and the
// remote definition, then assembles the source repository.
// NOTE(review): the head of this constructor (original lines 48-55, i.e. the
// `pub fn new(` line and its leading parameters such as `store`, `remote`,
// `remote_store` and `owner`, all referenced below) is missing from this
// extraction; only the tail of the parameter list and body fragments remain.
56 remove_vanished
: Option
<bool
>,
57 group_filter
: Option
<Vec
<GroupFilter
>>,
58 limit
: RateLimitConfig
,
59 ) -> Result
<Self, Error
> {
// Resolve the local target datastore by name.
60 let store
= DataStore
::lookup_datastore(store
)?
;
// Load the remote definitions from the on-disk remote config.
62 let (remote_config
, _digest
) = pbs_config
::remote
::config()?
;
63 let remote
: Remote
= remote_config
.lookup("remote", remote
)?
;
// Deleting vanished snapshots is opt-in; default is to keep them.
65 let remove_vanished
= remove_vanished
.unwrap_or(false);
67 let source
= BackupRepository
::new(
68 Some(remote
.config
.auth_id
.clone()),
69 Some(remote
.config
.host
.clone()),
// NOTE(review): original line 70 is missing here — presumably the port
// argument of BackupRepository::new; verify against the original source.
71 remote_store
.to_string(),
74 Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit }
)
// Build an authenticated HTTP client for the configured remote, applying the
// rate limit stored in `self.limit`.
// NOTE(review): the closing braces of this method (original lines 79-80) are
// missing from this extraction.
77 pub async
fn client(&self) -> Result
<HttpClient
, Error
> {
78 crate::api2
::config
::remote
::remote_client(&self.remote
, Some(self.limit
.clone())).await
// Download every chunk referenced by an index into `target`, skipping digests
// already recorded in the shared `downloaded_chunks` set. Downloads run with
// up to 20 concurrent futures (`try_buffer_unordered(20)`); each downloaded
// chunk is handed to a ParallelHandler pool that verifies it and writes it
// into the datastore. Finally logs the downloaded byte count and throughput.
// NOTE(review): this extraction is incomplete — numerous original lines are
// missing (e.g. 83, 86, 90, 96, 99, 103-107, 110-111, 116-119, 121-125,
// 130, 134-135, 138, 142, 146-147, 149-152, 155-156, 158, 160, 162,
// 164-166, 168, 170-174), so the fragments below do not form a compilable
// function. Confirm details against the original file.
82 async
fn pull_index_chunks
<I
: IndexFile
>(
84 chunk_reader
: RemoteChunkReader
,
85 target
: Arc
<DataStore
>,
87 downloaded_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
88 ) -> Result
<(), Error
> {
89 use futures
::stream
::{self, StreamExt, TryStreamExt}
;
// Wall-clock start, used for the MiB/s summary at the end.
91 let start_time
= SystemTime
::now();
// Stream of chunk infos, one per index entry.
93 let stream
= stream
::iter(
94 (0..index
.index_count())
95 .map(|pos
| index
.chunk_info(pos
).unwrap())
// Check/record the digest under the shared lock so concurrent pulls of
// multiple indexes do not download the same chunk twice.
97 let mut guard
= downloaded_chunks
.lock().unwrap();
98 let done
= guard
.contains(&info
.digest
);
100 // Note: We mark a chunk as downloaded before it's actually downloaded
101 // to avoid duplicate downloads.
102 guard
.insert(info
.digest
);
// Verification + write workers: run on the ParallelHandler's threads so
// decoding does not block the download stream.
108 let target2
= target
.clone();
109 let verify_pool
= ParallelHandler
::new(
112 move |(chunk
, digest
, size
): (DataBlob
, [u8; 32], u64)| {
113 // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
114 chunk
.verify_unencrypted(size
as usize, &digest
)?
;
115 target2
.insert_chunk(&chunk
, &digest
)?
;
120 let verify_and_write_channel
= verify_pool
.channel();
// Raw-byte counter shared between download futures for the final log line.
122 let bytes
= Arc
::new(AtomicUsize
::new(0));
126 let target
= Arc
::clone(&target
);
127 let chunk_reader
= chunk_reader
.clone();
128 let bytes
= Arc
::clone(&bytes
);
129 let verify_and_write_channel
= verify_and_write_channel
.clone();
131 Ok
::<_
, Error
>(async
move {
// If the chunk already exists locally, just touch it (protects it from GC)
// and skip the download.
132 let chunk_exists
= proxmox_async
::runtime
::block_in_place(|| {
133 target
.cond_touch_chunk(&info
.digest
, false)
136 //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
137 return Ok
::<_
, Error
>(());
139 //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
140 let chunk
= chunk_reader
.read_raw_chunk(&info
.digest
).await?
;
141 let raw_size
= chunk
.raw_size() as usize;
143 // decode, verify and write in separate threads to maximize throughput
144 proxmox_async
::runtime
::block_in_place(|| {
145 verify_and_write_channel
.send((chunk
, info
.digest
, info
.size()))
// Account raw (on-wire) bytes for the throughput log message below.
148 bytes
.fetch_add(raw_size
, Ordering
::SeqCst
);
153 .try_buffer_unordered(20)
154 .try_for_each(|_res
| futures
::future
::ok(()))
// Close the channel so the worker pool can drain and terminate.
157 drop(verify_and_write_channel
);
159 verify_pool
.complete()?
;
161 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
163 let bytes
= bytes
.load(Ordering
::SeqCst
);
167 "downloaded {} bytes ({:.2} MiB/s)",
169 (bytes
as f64) / (1024.0 * 1024.0 * elapsed
)
// Download the remote manifest blob into the file at `filename` and rewind
// the file handle so the caller can immediately read the blob back.
// NOTE(review): original lines 180-186 (the OpenOptions flags and the start
// of the download call chain) plus 188-189, 191 and 193 are missing from
// this extraction.
175 async
fn download_manifest(
176 reader
: &BackupReader
,
177 filename
: &std
::path
::Path
,
178 ) -> Result
<std
::fs
::File
, Error
> {
179 let mut tmp_manifest_file
= std
::fs
::OpenOptions
::new()
187 .download(MANIFEST_BLOB_NAME
, &mut tmp_manifest_file
)
// Rewind: the file position is at EOF after the download.
190 tmp_manifest_file
.seek(SeekFrom
::Start(0))?
;
192 Ok(tmp_manifest_file
)
// Validate a downloaded archive against its manifest entry: the actual size
// must equal `info.size` and the computed checksum must equal `info.csum`;
// otherwise bail with a descriptive error.
// NOTE(review): the bail! for the size mismatch (original lines 197, 199-204)
// is only partially visible; lines 207-211 (closing braces / Ok(())) are
// missing from this extraction.
195 fn verify_archive(info
: &FileInfo
, csum
: &[u8; 32], size
: u64) -> Result
<(), Error
> {
196 if size
!= info
.size
{
198 "wrong size for file '{}' ({} != {})",
205 if csum
!= &info
.csum
{
206 bail
!("wrong checksum for file '{}'", info
.filename
);
// Download one archive of a snapshot into the target store. The archive is
// first written to a "<name>.tmp" file; depending on its type the index is
// parsed and its referenced chunks pulled (dynamic/fixed index) or the blob
// is checksummed directly, each validated via `verify_archive`. On success
// the tmp file is atomically renamed onto the final path.
// NOTE(review): this extraction is incomplete — original lines 213, 225, 228,
// 230, 232-236, 238, 243, 246-248, 250-255, 259, 262-264, 266-271, 276-277,
// 280-283 are missing (including the OpenOptions flags and the
// pull_index_chunks invocations whose leading arguments are not visible).
212 async
fn pull_single_archive(
214 reader
: &BackupReader
,
215 chunk_reader
: &mut RemoteChunkReader
,
216 tgt_store
: Arc
<DataStore
>,
217 snapshot
: &BackupDir
,
218 archive_info
: &FileInfo
,
219 downloaded_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
220 ) -> Result
<(), Error
> {
221 let archive_name
= &archive_info
.filename
;
// Final on-disk location: <store base>/<snapshot path>/<archive name>.
222 let mut path
= tgt_store
.base_path();
223 path
.push(snapshot
.relative_path());
224 path
.push(archive_name
);
// Download goes to a sibling .tmp file first, renamed only after validation.
226 let mut tmp_path
= path
.clone();
227 tmp_path
.set_extension("tmp");
229 task_log
!(worker
, "sync archive {}", archive_name
);
231 let mut tmpfile
= std
::fs
::OpenOptions
::new()
237 reader
.download(archive_name
, &mut tmpfile
).await?
;
// Dispatch on the archive type encoded in the file name.
239 match archive_type(archive_name
)?
{
240 ArchiveType
::DynamicIndex
=> {
241 let index
= DynamicIndexReader
::new(tmpfile
).map_err(|err
| {
242 format_err
!("unable to read dynamic index {:?} - {}", tmp_path
, err
)
244 let (csum
, size
) = index
.compute_csum();
245 verify_archive(archive_info
, &csum
, size
)?
;
249 chunk_reader
.clone(),
256 ArchiveType
::FixedIndex
=> {
257 let index
= FixedIndexReader
::new(tmpfile
).map_err(|err
| {
258 format_err
!("unable to read fixed index '{:?}' - {}", tmp_path
, err
)
260 let (csum
, size
) = index
.compute_csum();
261 verify_archive(archive_info
, &csum
, size
)?
;
265 chunk_reader
.clone(),
272 ArchiveType
::Blob
=> {
// Blobs are checksummed directly; rewind first since the position is at EOF.
273 tmpfile
.seek(SeekFrom
::Start(0))?
;
274 let (csum
, size
) = sha256(&mut tmpfile
)?
;
275 verify_archive(archive_info
, &csum
, size
)?
;
// Atomically move the validated tmp file onto the final path.
278 if let Err(err
) = std
::fs
::rename(&tmp_path
, &path
) {
279 bail
!("Atomic rename file {:?} failed - {}", path
, err
);
// Best-effort download of the client log blob for a snapshot: downloads to a
// .tmp file and atomically renames it into place; silently does nothing if
// the remote has no log.
// NOTE(review): this extraction is incomplete — original lines 287, 293,
// 295-299, 304, 306-309 are missing (including the OpenOptions flags and the
// closing braces).
284 // Note: The client.log.blob is uploaded after the backup, so it is
285 // not mentioned in the manifest.
286 async
fn try_client_log_download(
288 reader
: Arc
<BackupReader
>,
289 path
: &std
::path
::Path
,
290 ) -> Result
<(), Error
> {
291 let mut tmp_path
= path
.to_owned();
292 tmp_path
.set_extension("tmp");
294 let tmpfile
= std
::fs
::OpenOptions
::new()
300 // Note: be silent if there is no log - only log successful download
301 if let Ok(()) = reader
.download(CLIENT_LOG_BLOB_NAME
, tmpfile
).await
{
302 if let Err(err
) = std
::fs
::rename(&tmp_path
, &path
) {
303 bail
!("Atomic rename file {:?} failed - {}", path
, err
);
305 task_log
!(worker
, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME
);
// Sync a single snapshot into the target store. Downloads the remote manifest
// to a .tmp file; if a local manifest exists and its raw data is identical,
// only a missing client log is fetched and the function returns early.
// Otherwise every file listed in the manifest is checked against its local
// copy (index/blob checksums) and re-pulled when changed, the manifest is
// atomically renamed into place, and stale files are cleaned up.
// NOTE(review): this extraction is incomplete — many original lines are
// missing (e.g. 312, 321, 325, 328, 332, 336-337, 339-343, 345-352, 354,
// 358, 360-364, 366-367, 369, 371-375, 379, 383-385, 387, 392-393,
// 399-400, 402-404, 409-410, 412-414, 419-420, 422-427, 429-430, 432-446,
// 449-450, 453-454, 457-459), including the match arms that skip unchanged
// files and the pull_single_archive invocation.
311 async
fn pull_snapshot(
313 reader
: Arc
<BackupReader
>,
314 tgt_store
: Arc
<DataStore
>,
315 snapshot
: &BackupDir
,
316 downloaded_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
317 ) -> Result
<(), Error
> {
// Local paths: manifest, client log, and the tmp manifest download target.
318 let mut manifest_name
= tgt_store
.base_path();
319 manifest_name
.push(snapshot
.relative_path());
320 manifest_name
.push(MANIFEST_BLOB_NAME
);
322 let mut client_log_name
= tgt_store
.base_path();
323 client_log_name
.push(snapshot
.relative_path());
324 client_log_name
.push(CLIENT_LOG_BLOB_NAME
);
326 let mut tmp_manifest_name
= manifest_name
.clone();
327 tmp_manifest_name
.set_extension("tmp");
329 let download_res
= download_manifest(&reader
, &tmp_manifest_name
).await
;
330 let mut tmp_manifest_file
= match download_res
{
331 Ok(manifest_file
) => manifest_file
,
// A 404 means the snapshot vanished on the remote since the sync started —
// treated as a skip, not an error; other HTTP errors bail.
333 match err
.downcast_ref
::<HttpError
>() {
334 Some(HttpError { code, message }
) => match *code
{
335 StatusCode
::NOT_FOUND
=> {
338 "skipping snapshot {} - vanished since start of sync",
344 bail
!("HTTP error {} - {}", code
, message
);
353 let tmp_manifest_blob
= DataBlob
::load_from_reader(&mut tmp_manifest_file
)?
;
// Compare against an existing local manifest to detect "nothing changed".
355 if manifest_name
.exists() {
356 let manifest_blob
= proxmox_lang
::try_block
!({
357 let mut manifest_file
= std
::fs
::File
::open(&manifest_name
).map_err(|err
| {
359 "unable to open local manifest {:?} - {}",
365 let manifest_blob
= DataBlob
::load_from_reader(&mut manifest_file
)?
;
368 .map_err(|err
: Error
| {
370 "unable to read local manifest {:?} - {}",
376 if manifest_blob
.raw_data() == tmp_manifest_blob
.raw_data() {
// Manifest unchanged: only fetch a missing client log, drop the tmp file.
377 if !client_log_name
.exists() {
378 try_client_log_download(worker
, reader
, &client_log_name
).await?
;
380 task_log
!(worker
, "no data changes");
381 let _
= std
::fs
::remove_file(&tmp_manifest_name
);
382 return Ok(()); // nothing changed
386 let manifest
= BackupManifest
::try_from(tmp_manifest_blob
)?
;
// Re-pull every manifest entry whose local copy is missing or fails
// verification against the manifest's checksum/size.
388 for item
in manifest
.files() {
389 let mut path
= tgt_store
.base_path();
390 path
.push(snapshot
.relative_path());
391 path
.push(&item
.filename
);
394 match archive_type(&item
.filename
)?
{
395 ArchiveType
::DynamicIndex
=> {
396 let index
= DynamicIndexReader
::open(&path
)?
;
397 let (csum
, size
) = index
.compute_csum();
398 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
401 task_log
!(worker
, "detected changed file {:?} - {}", path
, err
);
405 ArchiveType
::FixedIndex
=> {
406 let index
= FixedIndexReader
::open(&path
)?
;
407 let (csum
, size
) = index
.compute_csum();
408 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
411 task_log
!(worker
, "detected changed file {:?} - {}", path
, err
);
415 ArchiveType
::Blob
=> {
416 let mut tmpfile
= std
::fs
::File
::open(&path
)?
;
417 let (csum
, size
) = sha256(&mut tmpfile
)?
;
418 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
421 task_log
!(worker
, "detected changed file {:?} - {}", path
, err
);
428 let mut chunk_reader
= RemoteChunkReader
::new(
431 item
.chunk_crypt_mode(),
442 downloaded_chunks
.clone(),
// Commit: atomically move the downloaded manifest into place.
447 if let Err(err
) = std
::fs
::rename(&tmp_manifest_name
, &manifest_name
) {
448 bail
!("Atomic rename file {:?} failed - {}", manifest_name
, err
);
451 if !client_log_name
.exists() {
452 try_client_log_download(worker
, reader
, &client_log_name
).await?
;
455 // cleanup - remove stale files
456 tgt_store
.cleanup_backup_dir(snapshot
, &manifest
)?
;
// Wrapper around `pull_snapshot` that creates/locks the local snapshot dir
// first. For a newly created dir a failed pull removes the partial dir again;
// for an existing dir this is logged as a "re-sync".
// NOTE(review): this extraction is incomplete — original lines 462, 469-470,
// 472, 474-481, 484-486, 488, 490-497, 499-510 are missing (including the
// `is_new` branch structure and the pull_snapshot argument lists).
461 pub async
fn pull_snapshot_from(
463 reader
: Arc
<BackupReader
>,
464 tgt_store
: Arc
<DataStore
>,
465 snapshot
: &BackupDir
,
466 downloaded_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
467 ) -> Result
<(), Error
> {
468 let (_path
, is_new
, _snap_lock
) = tgt_store
.create_locked_backup_dir(&snapshot
)?
;
471 task_log
!(worker
, "sync snapshot {:?}", snapshot
.relative_path());
473 if let Err(err
) = pull_snapshot(
// Failed pull of a brand-new snapshot dir: remove the partial dir, but only
// log (don't fail on) cleanup errors.
482 if let Err(cleanup_err
) = tgt_store
.remove_backup_dir(&snapshot
, true) {
483 task_log
!(worker
, "cleanup error - {}", cleanup_err
);
487 task_log
!(worker
, "sync snapshot {:?} done", snapshot
.relative_path());
489 task_log
!(worker
, "re-sync snapshot {:?}", snapshot
.relative_path());
498 task_log
!(worker
, "re-sync snapshot {:?} done", snapshot
.relative_path());
// Fold one skipped snapshot's backup_time into the running [oldest, newest]
// range tracked by SkipInfo.
// NOTE(review): original lines 512-513 and 516-517/520-522 are missing from
// this extraction — presumably the `count` increment (pull_group reads
// `skip_info.count` below) and closing braces; confirm against the original.
511 fn update(&mut self, backup_time
: i64) {
514 if backup_time
< self.oldest
{
515 self.oldest
= backup_time
;
518 if backup_time
> self.newest
{
519 self.newest
= backup_time
;
// Render the affected time range as a human-readable string: empty for zero
// skips, a single RFC3339 timestamp for one skip, and (per the fragments at
// original lines 530-531) both oldest and newest timestamps otherwise.
// NOTE(review): original lines 524 (the match scrutinee, presumably on
// `self.count`), 527-529 and 532-537 are missing from this extraction.
523 fn affected(&self) -> Result
<String
, Error
> {
525 0 => Ok(String
::new()),
526 1 => Ok(proxmox_time
::epoch_to_rfc3339_utc(self.oldest
)?
),
530 proxmox_time
::epoch_to_rfc3339_utc(self.oldest
)?
,
531 proxmox_time
::epoch_to_rfc3339_utc(self.newest
)?
,
// Human-readable summary of skipped snapshots, used by pull_group's
// `task_log!(worker, "{}", skip_info)`. Maps an `affected()` error to
// `fmt::Error` since Display cannot carry the original error.
// NOTE(review): original lines 540-541, 543 and 545-548 are missing from
// this extraction (the write!/writeln! head and the count argument).
538 impl std
::fmt
::Display
for SkipInfo
{
539 fn fmt(&self, f
: &mut std
::fmt
::Formatter
<'_
>) -> std
::fmt
::Result
{
542 "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
544 self.affected().map_err(|_
| std
::fmt
::Error
)?
// Sync all snapshots of one backup group. Lists remote snapshots, sorts them
// by backup time, and pulls each one (skipping in-progress backups and, when
// a last successful local backup exists, snapshots older than it — those are
// only counted in `skip_info`). Each snapshot gets a fresh, re-authenticated
// HttpClient/BackupReader. Afterwards, with `remove_vanished`, local
// snapshots absent from the remote are deleted unless protected.
// NOTE(review): this extraction is incomplete — many original lines are
// missing (e.g. 550-551, 553, 557-558, 561-562, 565, 567, 569, 571, 573,
// 575, 578, 580, 582-586, 589, 593-595, 597, 599, 603-606, 609, 611,
// 616-618, 620-621, 625-629, 631-632, 634, 636-638, 641, 643-644, 650-651,
// 653-654, 657-659, 662-664, 667-671), including the SkipInfo initializer
// and the pull_snapshot_from argument lists.
// NOTE(review): line 646 contains mojibake — "(¶ms" is almost certainly a
// mis-encoding of "(&params"; preserved byte-identical here, fix at source.
549 pub async
fn pull_group(
552 params
: &PullParameters
,
554 progress
: &mut StoreProgress
,
555 ) -> Result
<(), Error
> {
556 let path
= format
!("api2/json/admin/datastore/{}/snapshots", params
.source
.store());
559 "backup-type": group
.backup_type(),
560 "backup-id": group
.backup_id(),
563 let mut result
= client
.get(&path
, Some(args
)).await?
;
564 let mut list
: Vec
<SnapshotListItem
> = serde_json
::from_value(result
["data"].take())?
;
// Oldest first, so progress moves forward in time.
566 list
.sort_unstable_by(|a
, b
| a
.backup_time
.cmp(&b
.backup_time
));
568 client
.login().await?
; // make sure auth is complete
570 let fingerprint
= client
.fingerprint();
572 let last_sync
= params
.store
.last_successful_backup(group
)?
;
574 let mut remote_snapshots
= std
::collections
::HashSet
::new();
576 // start with 16384 chunks (up to 65GB)
577 let downloaded_chunks
= Arc
::new(Mutex
::new(HashSet
::with_capacity(1024 * 64)));
579 progress
.group_snapshots
= list
.len() as u64;
581 let mut skip_info
= SkipInfo
{
587 for (pos
, item
) in list
.into_iter().enumerate() {
588 let snapshot
= BackupDir
::new(item
.backup_type
, item
.backup_id
, item
.backup_time
)?
;
590 // in-progress backups can't be synced
591 if item
.size
.is_none() {
592 task_log
!(worker
, "skipping snapshot {} - in-progress backup", snapshot
);
596 let backup_time
= snapshot
.backup_time();
598 remote_snapshots
.insert(backup_time
);
// Snapshots older than the last successful local backup are only counted,
// not pulled.
600 if let Some(last_sync_time
) = last_sync
{
601 if last_sync_time
> backup_time
{
602 skip_info
.update(backup_time
);
607 // get updated auth_info (new tickets)
608 let auth_info
= client
.login().await?
;
610 let options
= HttpClientOptions
::new_non_interactive(auth_info
.ticket
.clone(), fingerprint
.clone());
612 let new_client
= HttpClient
::new(
613 params
.source
.host(),
614 params
.source
.port(),
615 params
.source
.auth_id(),
619 let reader
= BackupReader
::start(
622 params
.source
.store(),
623 snapshot
.group().backup_type(),
624 snapshot
.group().backup_id(),
630 let result
= pull_snapshot_from(
633 params
.store
.clone(),
635 downloaded_chunks
.clone(),
639 progress
.done_snapshots
= pos
as u64 + 1;
640 task_log
!(worker
, "percentage done: {}", progress
);
642 result?
; // stop on error
// Delete local snapshots that vanished on the remote (unless protected).
645 if params
.remove_vanished
{
646 let local_list
= group
.list_backups(¶ms
.store
.base_path())?
;
647 for info
in local_list
{
648 let backup_time
= info
.backup_dir
.backup_time();
649 if remote_snapshots
.contains(&backup_time
) {
652 if info
.backup_dir
.is_protected(params
.store
.base_path()) {
655 "don't delete vanished snapshot {:?} (protected)",
656 info
.backup_dir
.relative_path()
660 task_log
!(worker
, "delete vanished snapshot {:?}", info
.backup_dir
.relative_path());
661 params
.store
.remove_backup_dir(&info
.backup_dir
, false)?
;
665 if skip_info
.count
> 0 {
666 task_log
!(worker
, "{}", skip_info
);
// Top-level entry point: sync a whole datastore from the remote. Takes a
// shared chunk-store lock (so GC cannot remove freshly pulled chunks), lists
// remote groups, sorts them by type then id, optionally applies group
// filters, then pulls each group under a per-group lock and owner check.
// Per-group failures set an `errors` flag but do not abort the run. With
// `remove_vanished`, local groups absent from the remote (and matching the
// filters, if any) are removed; finally, bails if any errors occurred.
// NOTE(review): this extraction is incomplete — many original lines are
// missing (e.g. 673-674, 679, 681, 683-684, 686, 688, 694-698, 700-701,
// 703-704, 706, 708-709, 713-714, 716-717, 719-720, 722-724, 726, 730-731,
// 733, 738, 741-743, 745-746, 748-752, 755-756, 759, 762-771, 773-774,
// 776-778, 784-785, 788-792, 796, 798-799, 801-802, 804-809, 812-816),
// including the pull_group argument list and several match arms.
// NOTE(review): lines 710, 739, 781 and 786 contain mojibake — "¶ms" is
// almost certainly a mis-encoding of "&params"; preserved byte-identical
// here, fix at source.
672 pub async
fn pull_store(
675 params
: &PullParameters
,
676 ) -> Result
<(), Error
> {
677 // explicit create shared lock to prevent GC on newly created chunks
678 let _shared_store_lock
= params
.store
.try_shared_chunk_store_lock()?
;
680 let path
= format
!("api2/json/admin/datastore/{}/groups", params
.source
.store());
682 let mut result
= client
685 .map_err(|err
| format_err
!("Failed to retrieve backup groups from remote - {}", err
))?
;
687 let mut list
: Vec
<GroupListItem
> = serde_json
::from_value(result
["data"].take())?
;
689 let total_count
= list
.len();
// Deterministic order: by backup type, then by backup id.
690 list
.sort_unstable_by(|a
, b
| {
691 let type_order
= a
.backup_type
.cmp(&b
.backup_type
);
692 if type_order
== std
::cmp
::Ordering
::Equal
{
693 a
.backup_id
.cmp(&b
.backup_id
)
// A group passes if ANY configured filter matches it.
699 let apply_filters
= |group
: &BackupGroup
, filters
: &[GroupFilter
]| -> bool
{
702 .any(|filter
| group
.matches(filter
))
705 let list
:Vec
<BackupGroup
> = list
707 .map(|item
| BackupGroup
::new(item
.backup_type
, item
.backup_id
))
710 let list
= if let Some(ref group_filter
) = ¶ms
.group_filter
{
711 let unfiltered_count
= list
.len();
712 let list
:Vec
<BackupGroup
> = list
715 apply_filters(&group
, group_filter
)
718 task_log
!(worker
, "found {} groups to sync (out of {} total)", list
.len(), unfiltered_count
);
721 task_log
!(worker
, "found {} groups to sync", total_count
);
725 let mut errors
= false;
// Remember every remote group so remove_vanished below can spare them.
727 let mut new_groups
= std
::collections
::HashSet
::new();
728 for group
in list
.iter() {
729 new_groups
.insert(group
.clone());
732 let mut progress
= StoreProgress
::new(list
.len() as u64);
734 for (done
, group
) in list
.into_iter().enumerate() {
735 progress
.done_groups
= done
as u64;
736 progress
.done_snapshots
= 0;
737 progress
.group_snapshots
= 0;
739 let (owner
, _lock_guard
) = match params
.store
.create_locked_backup_group(&group
, ¶ms
.owner
) {
740 Ok(result
) => result
,
744 "sync group {} failed - group lock failed: {}",
747 errors
= true; // do not stop here, instead continue
753 if params
.owner
!= owner
{
754 // only the owner is allowed to create additional snapshots
757 "sync group {} failed - owner check failed ({} != {})",
758 &group
, params
.owner
, owner
760 errors
= true; // do not stop here, instead continue
761 } else if let Err(err
) = pull_group(
772 "sync group {} failed - {}",
775 errors
= true; // do not stop here, instead continue
// Remove local groups that no longer exist remotely; cleanup errors are
// logged but do not fail the sync.
779 if params
.remove_vanished
{
780 let result
: Result
<(), Error
> = proxmox_lang
::try_block
!({
781 let local_groups
= BackupInfo
::list_backup_groups(¶ms
.store
.base_path())?
;
782 for local_group
in local_groups
{
783 if new_groups
.contains(&local_group
) {
786 if let Some(ref group_filter
) = ¶ms
.group_filter
{
787 if !apply_filters(&local_group
, group_filter
) {
793 "delete vanished group '{}/{}'",
794 local_group
.backup_type(),
795 local_group
.backup_id()
797 match params
.store
.remove_backup_group(&local_group
) {
800 task_log
!(worker
, "kept some protected snapshots of group '{}'", local_group
);
803 task_log
!(worker
, "{}", err
);
810 if let Err(err
) = result
{
811 task_log
!(worker
, "error during cleanup: {}", err
);
817 bail
!("sync failed with some errors.");