1 //! Sync datastore from remote server
3 use std
::collections
::{HashMap, HashSet}
;
4 use std
::convert
::TryFrom
;
5 use std
::io
::{Seek, SeekFrom}
;
6 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
7 use std
::sync
::{Arc, Mutex}
;
8 use std
::time
::SystemTime
;
10 use anyhow
::{bail, format_err, Error}
;
13 use proxmox
::api
::error
::{HttpError, StatusCode}
;
15 use pbs_api_types
::{Authid, SnapshotListItem, GroupListItem}
;
16 use pbs_datastore
::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress}
;
17 use pbs_datastore
::data_blob
::DataBlob
;
18 use pbs_datastore
::dynamic_index
::DynamicIndexReader
;
19 use pbs_datastore
::fixed_index
::FixedIndexReader
;
20 use pbs_datastore
::index
::IndexFile
;
21 use pbs_datastore
::manifest
::{
22 CLIENT_LOG_BLOB_NAME
, MANIFEST_BLOB_NAME
, ArchiveType
, BackupManifest
, FileInfo
, archive_type
24 use pbs_tools
::sha
::sha256
;
25 use pbs_tools
::task_log
;
26 use pbs_client
::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader}
;
27 use proxmox_rest_server
::WorkerTask
;
29 use crate::tools
::ParallelHandler
;
31 // fixme: implement filters
32 // fixme: delete vanished groups
33 // Todo: correctly lock backup groups
// Download every chunk referenced by `index` from the remote (via
// `chunk_reader`) and insert it into the local `target` datastore.
// NOTE(review): the extraction is missing several original lines here (the
// embedded numbering jumps, e.g. 35->37, 48->50, 55->61), so the generic
// `index: I` parameter, the stream/filter closure headers and several
// closing braces are not visible in this view.
35 async
fn pull_index_chunks
<I
: IndexFile
>(
37 chunk_reader
: RemoteChunkReader
,
38 target
: Arc
<DataStore
>,
// Shared set of chunk digests already fetched during this sync run, so the
// same chunk is not downloaded twice across snapshots/archives.
40 downloaded_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
41 ) -> Result
<(), Error
> {
42 use futures
::stream
::{self, StreamExt, TryStreamExt}
;
// Wall-clock start; used below for the MiB/s throughput log line.
44 let start_time
= SystemTime
::now();
// Stream over all chunk infos of the index (one entry per chunk slot).
46 let stream
= stream
::iter(
47 (0..index
.index_count())
48 .map(|pos
| index
.chunk_info(pos
).unwrap())
// NOTE(review): the following lines appear to run inside a filter closure
// over `info` (its header is among the missing lines) — they deduplicate
// digests against `downloaded_chunks` before scheduling a download.
50 let mut guard
= downloaded_chunks
.lock().unwrap();
51 let done
= guard
.contains(&info
.digest
);
53 // Note: We mark a chunk as downloaded before its actually downloaded
54 // to avoid duplicate downloads.
55 guard
.insert(info
.digest
);
// Worker pool that, per downloaded chunk, verifies the unencrypted data
// against its digest/size and writes it into the target chunk store.
61 let target2
= target
.clone();
62 let verify_pool
= ParallelHandler
::new(
65 move |(chunk
, digest
, size
): (DataBlob
, [u8; 32], u64)| {
66 // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
67 chunk
.verify_unencrypted(size
as usize, &digest
)?
;
68 target2
.insert_chunk(&chunk
, &digest
)?
;
73 let verify_and_write_channel
= verify_pool
.channel();
// Running total of raw bytes downloaded (for the throughput log).
75 let bytes
= Arc
::new(AtomicUsize
::new(0));
// Per-chunk clones captured by the async download future below.
79 let target
= Arc
::clone(&target
);
80 let chunk_reader
= chunk_reader
.clone();
81 let bytes
= Arc
::clone(&bytes
);
82 let verify_and_write_channel
= verify_and_write_channel
.clone();
84 Ok
::<_
, Error
>(async
move {
// `cond_touch_chunk(..., false)` — presumably checks (without failing)
// whether the chunk already exists locally; TODO confirm semantics.
85 let chunk_exists
= pbs_runtime
::block_in_place(|| {
86 target
.cond_touch_chunk(&info
.digest
, false)
89 //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
// Chunk already present locally: skip the download.
90 return Ok
::<_
, Error
>(());
92 //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
// Fetch the raw (still encoded) chunk from the remote reader.
93 let chunk
= chunk_reader
.read_raw_chunk(&info
.digest
).await?
;
94 let raw_size
= chunk
.raw_size() as usize;
96 // decode, verify and write in a separate threads to maximize throughput
97 pbs_runtime
::block_in_place(|| {
98 verify_and_write_channel
.send((chunk
, info
.digest
, info
.size()))
101 bytes
.fetch_add(raw_size
, Ordering
::SeqCst
);
// Run up to 20 chunk downloads concurrently; fail fast on the first error.
106 .try_buffer_unordered(20)
107 .try_for_each(|_res
| futures
::future
::ok(()))
// Close the channel so the verify pool can drain its queue and finish.
110 drop(verify_and_write_channel
);
112 verify_pool
.complete()?
;
// Log total bytes and average throughput for this index.
114 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
116 let bytes
= bytes
.load(Ordering
::SeqCst
);
120 "downloaded {} bytes ({:.2} MiB/s)",
122 (bytes
as f64) / (1024.0 * 1024.0 * elapsed
)
// Download the remote snapshot manifest (MANIFEST_BLOB_NAME) into the file
// at `filename` and return the open handle, rewound to the start so the
// caller can read/parse it immediately.
// NOTE(review): original lines 133-139 (the OpenOptions builder chain and
// the open call) and 141-142/144 are missing from this extraction.
128 async
fn download_manifest(
129 reader
: &BackupReader
,
130 filename
: &std
::path
::Path
,
131 ) -> Result
<std
::fs
::File
, Error
> {
// Open/create the temporary manifest file (builder chain not visible).
132 let mut tmp_manifest_file
= std
::fs
::OpenOptions
::new()
140 .download(MANIFEST_BLOB_NAME
, &mut tmp_manifest_file
)
// Rewind so the returned handle reads from the beginning of the blob.
143 tmp_manifest_file
.seek(SeekFrom
::Start(0))?
;
145 Ok(tmp_manifest_file
)
// Check a downloaded archive against the manifest entry `info`: both the
// byte size and the content checksum must match, otherwise bail with a
// descriptive error.
// NOTE(review): the bail! body for the size mismatch (original lines
// 150/152-157) is only partially visible here.
148 fn verify_archive(info
: &FileInfo
, csum
: &[u8; 32], size
: u64) -> Result
<(), Error
> {
// Size mismatch -> error (message continues on missing lines).
149 if size
!= info
.size
{
151 "wrong size for file '{}' ({} != {})",
// Checksum mismatch -> error.
158 if csum
!= &info
.csum
{
159 bail
!("wrong checksum for file '{}'", info
.filename
);
// Pull one archive file of a snapshot from the remote into the target
// datastore: download it to a ".tmp" file, verify it against the manifest
// entry, pull the referenced chunks (for index archives), then atomically
// rename the tmp file into place.
// NOTE(review): several original lines are missing here (e.g. 185-189
// OpenOptions chain, 199-208/215-224 pull_index_chunks call arguments and
// match-arm closers), so the control flow is only partially visible.
165 async
fn pull_single_archive(
167 reader
: &BackupReader
,
168 chunk_reader
: &mut RemoteChunkReader
,
169 tgt_store
: Arc
<DataStore
>,
170 snapshot
: &BackupDir
,
171 archive_info
: &FileInfo
,
172 downloaded_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
173 ) -> Result
<(), Error
> {
// Final on-disk path: <store base>/<snapshot dir>/<archive name>.
174 let archive_name
= &archive_info
.filename
;
175 let mut path
= tgt_store
.base_path();
176 path
.push(snapshot
.relative_path());
177 path
.push(archive_name
);
// Download into a sibling ".tmp" file first; renamed into place at the end.
179 let mut tmp_path
= path
.clone();
180 tmp_path
.set_extension("tmp");
182 task_log
!(worker
, "sync archive {}", archive_name
);
184 let mut tmpfile
= std
::fs
::OpenOptions
::new()
190 reader
.download(archive_name
, &mut tmpfile
).await?
;
// Dispatch on archive kind: index archives additionally pull their chunks.
192 match archive_type(archive_name
)?
{
193 ArchiveType
::DynamicIndex
=> {
194 let index
= DynamicIndexReader
::new(tmpfile
).map_err(|err
| {
195 format_err
!("unable to read dynamic index {:?} - {}", tmp_path
, err
)
// Verify the downloaded index against the manifest entry.
197 let (csum
, size
) = index
.compute_csum();
198 verify_archive(archive_info
, &csum
, size
)?
;
// Presumably a pull_index_chunks(...) call — its surrounding lines are
// missing from this extraction.
202 chunk_reader
.clone(),
209 ArchiveType
::FixedIndex
=> {
210 let index
= FixedIndexReader
::new(tmpfile
).map_err(|err
| {
211 format_err
!("unable to read fixed index '{:?}' - {}", tmp_path
, err
)
213 let (csum
, size
) = index
.compute_csum();
214 verify_archive(archive_info
, &csum
, size
)?
;
218 chunk_reader
.clone(),
225 ArchiveType
::Blob
=> {
// Blobs are self-contained: rewind and checksum the downloaded file.
226 tmpfile
.seek(SeekFrom
::Start(0))?
;
227 let (csum
, size
) = sha256(&mut tmpfile
)?
;
228 verify_archive(archive_info
, &csum
, size
)?
;
// Atomically move the verified tmp file to its final name.
231 if let Err(err
) = std
::fs
::rename(&tmp_path
, &path
) {
232 bail
!("Atomic rename file {:?} failed - {}", path
, err
);
237 // Note: The client.log.blob is uploaded after the backup, so it is
238 // not mentioned in the manifest.
// Best-effort download of the client log blob for a snapshot: download to
// a ".tmp" file and atomically rename into place. A missing remote log is
// silently ignored; only a successful download is logged.
// NOTE(review): original lines 248-252 (OpenOptions chain) and some closing
// braces are missing from this extraction.
239 async
fn try_client_log_download(
241 reader
: Arc
<BackupReader
>,
242 path
: &std
::path
::Path
,
243 ) -> Result
<(), Error
> {
244 let mut tmp_path
= path
.to_owned();
245 tmp_path
.set_extension("tmp");
247 let tmpfile
= std
::fs
::OpenOptions
::new()
253 // Note: be silent if there is no log - only log successful download
254 if let Ok(()) = reader
.download(CLIENT_LOG_BLOB_NAME
, tmpfile
).await
{
// Rename failure is a hard error: the log was downloaded but cannot be
// moved into its final place.
255 if let Err(err
) = std
::fs
::rename(&tmp_path
, &path
) {
256 bail
!("Atomic rename file {:?} failed - {}", path
, err
);
258 task_log
!(worker
, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME
);
// Sync a single snapshot: download the remote manifest, short-circuit when
// nothing changed, otherwise (re-)download every archive whose local copy
// is missing or fails manifest verification, then install the new manifest
// and clean up stale files.
// NOTE(review): many original lines are missing in this extraction
// (e.g. 289-305 error-match arms, 313-317/324-328 format_err bodies,
// 345-346, 375-399 per-archive skip/pull logic), so only the visible
// skeleton is documented here.
264 async
fn pull_snapshot(
266 reader
: Arc
<BackupReader
>,
267 tgt_store
: Arc
<DataStore
>,
268 snapshot
: &BackupDir
,
269 downloaded_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
270 ) -> Result
<(), Error
> {
// Local paths for the installed manifest and client log of this snapshot.
271 let mut manifest_name
= tgt_store
.base_path();
272 manifest_name
.push(snapshot
.relative_path());
273 manifest_name
.push(MANIFEST_BLOB_NAME
);
275 let mut client_log_name
= tgt_store
.base_path();
276 client_log_name
.push(snapshot
.relative_path());
277 client_log_name
.push(CLIENT_LOG_BLOB_NAME
);
// The remote manifest is downloaded to "<manifest>.tmp" first.
279 let mut tmp_manifest_name
= manifest_name
.clone();
280 tmp_manifest_name
.set_extension("tmp");
282 let download_res
= download_manifest(&reader
, &tmp_manifest_name
).await
;
// A 404 means the snapshot vanished on the remote since the sync started
// (treated as a skip); other HTTP errors are fatal.
283 let mut tmp_manifest_file
= match download_res
{
284 Ok(manifest_file
) => manifest_file
,
286 match err
.downcast_ref
::<HttpError
>() {
287 Some(HttpError { code, message }
) => match *code
{
288 StatusCode
::NOT_FOUND
=> {
291 "skipping snapshot {} - vanished since start of sync",
297 bail
!("HTTP error {} - {}", code
, message
);
306 let tmp_manifest_blob
= DataBlob
::load_from_reader(&mut tmp_manifest_file
)?
;
// If a local manifest exists and its raw bytes equal the freshly
// downloaded one, nothing changed: optionally fetch the client log,
// drop the tmp file and return early.
308 if manifest_name
.exists() {
309 let manifest_blob
= proxmox
::try_block
!({
310 let mut manifest_file
= std
::fs
::File
::open(&manifest_name
).map_err(|err
| {
312 "unable to open local manifest {:?} - {}",
318 let manifest_blob
= DataBlob
::load_from_reader(&mut manifest_file
)?
;
321 .map_err(|err
: Error
| {
323 "unable to read local manifest {:?} - {}",
329 if manifest_blob
.raw_data() == tmp_manifest_blob
.raw_data() {
330 if !client_log_name
.exists() {
331 try_client_log_download(worker
, reader
, &client_log_name
).await?
;
333 task_log
!(worker
, "no data changes");
334 let _
= std
::fs
::remove_file(&tmp_manifest_name
);
335 return Ok(()); // nothing changed
// Parse the new manifest and walk its file list: for each archive, check
// whether the local copy still verifies; changed/missing files are
// re-pulled (the pull call itself is among the missing lines).
339 let manifest
= BackupManifest
::try_from(tmp_manifest_blob
)?
;
341 for item
in manifest
.files() {
342 let mut path
= tgt_store
.base_path();
343 path
.push(snapshot
.relative_path());
344 path
.push(&item
.filename
);
347 match archive_type(&item
.filename
)?
{
348 ArchiveType
::DynamicIndex
=> {
349 let index
= DynamicIndexReader
::open(&path
)?
;
350 let (csum
, size
) = index
.compute_csum();
351 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
354 task_log
!(worker
, "detected changed file {:?} - {}", path
, err
);
358 ArchiveType
::FixedIndex
=> {
359 let index
= FixedIndexReader
::open(&path
)?
;
360 let (csum
, size
) = index
.compute_csum();
361 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
364 task_log
!(worker
, "detected changed file {:?} - {}", path
, err
);
368 ArchiveType
::Blob
=> {
369 let mut tmpfile
= std
::fs
::File
::open(&path
)?
;
370 let (csum
, size
) = sha256(&mut tmpfile
)?
;
371 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
374 task_log
!(worker
, "detected changed file {:?} - {}", path
, err
);
// Chunk reader for this archive, using the item's crypt mode and the
// shared downloaded-chunks set (constructor arguments partly missing).
381 let mut chunk_reader
= RemoteChunkReader
::new(
384 item
.chunk_crypt_mode(),
395 downloaded_chunks
.clone(),
// Install the new manifest atomically once all archives are in place.
400 if let Err(err
) = std
::fs
::rename(&tmp_manifest_name
, &manifest_name
) {
401 bail
!("Atomic rename file {:?} failed - {}", manifest_name
, err
);
404 if !client_log_name
.exists() {
405 try_client_log_download(worker
, reader
, &client_log_name
).await?
;
408 // cleanup - remove stale files
409 tgt_store
.cleanup_backup_dir(snapshot
, &manifest
)?
;
// Pull one snapshot under a locked local backup directory. For a newly
// created snapshot directory, a failed pull removes the partial directory
// again; an existing snapshot is "re-synced" instead.
// NOTE(review): original lines 422-423, 427-434, 437-439 and 443-450 are
// missing, so the is_new/else branch structure and the pull_snapshot
// arguments are only partially visible.
414 pub async
fn pull_snapshot_from(
416 reader
: Arc
<BackupReader
>,
417 tgt_store
: Arc
<DataStore
>,
418 snapshot
: &BackupDir
,
419 downloaded_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
420 ) -> Result
<(), Error
> {
// Create (or open) the snapshot dir under lock; `is_new` distinguishes a
// first-time sync from a re-sync.
421 let (_path
, is_new
, _snap_lock
) = tgt_store
.create_locked_backup_dir(&snapshot
)?
;
424 task_log
!(worker
, "sync snapshot {:?}", snapshot
.relative_path());
426 if let Err(err
) = pull_snapshot(
// On failure of a fresh snapshot, remove the partially created backup dir
// (cleanup errors are logged, not propagated over the original error).
435 if let Err(cleanup_err
) = tgt_store
.remove_backup_dir(&snapshot
, true) {
436 task_log
!(worker
, "cleanup error - {}", cleanup_err
);
440 task_log
!(worker
, "sync snapshot {:?} done", snapshot
.relative_path());
// Existing snapshot: re-sync path (the pull_snapshot call between these
// two log lines is among the missing lines).
442 task_log
!(worker
, "re-sync snapshot {:?}", snapshot
.relative_path());
451 task_log
!(worker
, "re-sync snapshot {:?} done", snapshot
.relative_path());
// Record one skipped snapshot's backup time, widening the tracked
// [oldest, newest] epoch range. NOTE(review): the enclosing `impl SkipInfo`
// header and a likely counter increment (original lines 465-466) are not
// visible in this extraction.
464 fn update(&mut self, backup_time
: i64) {
467 if backup_time
< self.oldest
{
468 self.oldest
= backup_time
;
471 if backup_time
> self.newest
{
472 self.newest
= backup_time
;
// Render the affected time range of skipped snapshots as a human-readable
// string: empty for zero, a single RFC 3339 timestamp for one, and
// presumably an "oldest..newest" range otherwise (the match scrutinee on
// original line 477 and the range format string are missing here).
476 fn affected(&self) -> Result
<String
, Error
> {
478 0 => Ok(String
::new()),
479 1 => proxmox
::tools
::time
::epoch_to_rfc3339_utc(self.oldest
),
483 proxmox
::tools
::time
::epoch_to_rfc3339_utc(self.oldest
)?
,
484 proxmox
::tools
::time
::epoch_to_rfc3339_utc(self.newest
)?
,
// Display: one summary line with the skip count and the affected time
// range from `affected()`; any formatting of the range that fails is
// mapped to std::fmt::Error since Display cannot carry a payload.
491 impl std
::fmt
::Display
for SkipInfo
{
492 fn fmt(&self, f
: &mut std
::fmt
::Formatter
<'_
>) -> std
::fmt
::Result
{
495 "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
497 self.affected().map_err(|_
| std
::fmt
::Error
)?
// Sync all snapshots of one backup group from the remote repository:
// list the remote snapshots, skip in-progress and already-synced ones,
// pull the rest (with a fresh ticket/HttpClient per snapshot), then delete
// local snapshots that vanished on the remote.
// NOTE(review): several original lines are missing (e.g. 503-504 worker/
// client params, 512-517 json args, 537-541 SkipInfo init, 548-550,
// 558-561, 568-584 client/reader construction, 586-593 call args), so the
// parameter list and some call sites are only partially visible.
502 pub async
fn pull_group(
505 src_repo
: &BackupRepository
,
506 tgt_store
: Arc
<DataStore
>,
509 progress
: &mut StoreProgress
,
510 ) -> Result
<(), Error
> {
// Remote API endpoint listing the snapshots of this datastore/group.
511 let path
= format
!("api2/json/admin/datastore/{}/snapshots", src_repo
.store());
514 "backup-type": group
.backup_type(),
515 "backup-id": group
.backup_id(),
518 let mut result
= client
.get(&path
, Some(args
)).await?
;
519 let mut list
: Vec
<SnapshotListItem
> = serde_json
::from_value(result
["data"].take())?
;
// Sync oldest-first so progress/last-sync semantics stay monotonic.
521 list
.sort_unstable_by(|a
, b
| a
.backup_time
.cmp(&b
.backup_time
));
523 client
.login().await?
; // make sure auth is complete
525 let fingerprint
= client
.fingerprint();
// Newest fully-synced local snapshot; older remote snapshots are skipped.
527 let last_sync
= tgt_store
.last_successful_backup(group
)?
;
529 let mut remote_snapshots
= std
::collections
::HashSet
::new();
531 // start with 16384 chunks (up to 65GB)
532 let downloaded_chunks
= Arc
::new(Mutex
::new(HashSet
::with_capacity(1024 * 64)));
534 progress
.group_snapshots
= list
.len() as u64;
536 let mut skip_info
= SkipInfo
{
542 for (pos
, item
) in list
.into_iter().enumerate() {
543 let snapshot
= BackupDir
::new(item
.backup_type
, item
.backup_id
, item
.backup_time
)?
;
545 // in-progress backups can't be synced
// A missing `size` in the listing marks a backup still being written.
546 if item
.size
.is_none() {
547 task_log
!(worker
, "skipping snapshot {} - in-progress backup", snapshot
);
551 let backup_time
= snapshot
.backup_time();
// Remember every remote snapshot so the vanished-snapshot cleanup below
// knows what still exists upstream.
553 remote_snapshots
.insert(backup_time
);
// Skip snapshots older than the last successful local sync, but record
// them in skip_info for the summary log.
555 if let Some(last_sync_time
) = last_sync
{
556 if last_sync_time
> backup_time
{
557 skip_info
.update(backup_time
);
562 // get updated auth_info (new tickets)
563 let auth_info
= client
.login().await?
;
// Fresh non-interactive client per snapshot using the renewed ticket.
565 let options
= HttpClientOptions
::new_non_interactive(auth_info
.ticket
.clone(), fingerprint
.clone());
567 let new_client
= HttpClient
::new(
574 let reader
= BackupReader
::start(
578 snapshot
.group().backup_type(),
579 snapshot
.group().backup_id(),
585 let result
= pull_snapshot_from(
590 downloaded_chunks
.clone(),
594 progress
.done_snapshots
= pos
as u64 + 1;
595 task_log
!(worker
, "percentage done: {}", progress
);
597 result?
; // stop on error
// Delete local snapshots that no longer exist on the remote
// (presumably gated on a remove-vanished flag among the missing lines).
601 let local_list
= group
.list_backups(&tgt_store
.base_path())?
;
602 for info
in local_list
{
603 let backup_time
= info
.backup_dir
.backup_time();
604 if remote_snapshots
.contains(&backup_time
) {
607 task_log
!(worker
, "delete vanished snapshot {:?}", info
.backup_dir
.relative_path());
608 tgt_store
.remove_backup_dir(&info
.backup_dir
, false)?
;
// Summarize skipped (already-synced) snapshots once per group.
612 if skip_info
.count
> 0 {
613 task_log
!(worker
, "{}", skip_info
);
// Top-level sync: pull every backup group from the remote datastore into
// the local one. Holds a shared chunk-store lock for the whole run so GC
// cannot remove freshly written chunks; per-group failures are logged and
// collected instead of aborting the run, and vanished groups are removed
// locally at the end.
// NOTE(review): original lines 620-621/624-625 (worker/client/auth_id
// params), 633-634 (the GET call), 645-649, 655-656, 665, 668-700 (error
// logging and the pull_group arguments) and several closers are missing
// from this extraction.
619 pub async
fn pull_store(
622 src_repo
: &BackupRepository
,
623 tgt_store
: Arc
<DataStore
>,
626 ) -> Result
<(), Error
> {
627 // explicit create shared lock to prevent GC on newly created chunks
628 let _shared_store_lock
= tgt_store
.try_shared_chunk_store_lock()?
;
630 let path
= format
!("api2/json/admin/datastore/{}/groups", src_repo
.store());
632 let mut result
= client
635 .map_err(|err
| format_err
!("Failed to retrieve backup groups from remote - {}", err
))?
;
637 let mut list
: Vec
<GroupListItem
> = serde_json
::from_value(result
["data"].take())?
;
639 task_log
!(worker
, "found {} groups to sync", list
.len());
// Deterministic sync order: by backup type, then by backup id.
641 list
.sort_unstable_by(|a
, b
| {
642 let type_order
= a
.backup_type
.cmp(&b
.backup_type
);
643 if type_order
== std
::cmp
::Ordering
::Equal
{
644 a
.backup_id
.cmp(&b
.backup_id
)
650 let mut errors
= false;
// Remember every remote group so the vanished-group cleanup below knows
// what still exists upstream.
652 let mut new_groups
= std
::collections
::HashSet
::new();
653 for item
in list
.iter() {
654 new_groups
.insert(BackupGroup
::new(&item
.backup_type
, &item
.backup_id
));
657 let mut progress
= StoreProgress
::new(list
.len() as u64);
659 for (done
, item
) in list
.into_iter().enumerate() {
660 progress
.done_groups
= done
as u64;
661 progress
.done_snapshots
= 0;
662 progress
.group_snapshots
= 0;
664 let group
= BackupGroup
::new(&item
.backup_type
, &item
.backup_id
);
// Lock the local group (creating it if needed) and learn its owner;
// a lock failure is logged and skipped, not fatal for the whole run.
666 let (owner
, _lock_guard
) = match tgt_store
.create_locked_backup_group(&group
, &auth_id
) {
667 Ok(result
) => result
,
671 "sync group {}/{} failed - group lock failed: {}",
672 item
.backup_type
, item
.backup_id
, err
674 errors
= true; // do not stop here, instead continue
// Ownership check: only the owning auth id may add snapshots.
680 if auth_id
!= owner
{
681 // only the owner is allowed to create additional snapshots
684 "sync group {}/{} failed - owner check failed ({} != {})",
685 item
.backup_type
, item
.backup_id
, auth_id
, owner
687 errors
= true; // do not stop here, instead continue
688 } else if let Err(err
) = pull_group(
701 "sync group {}/{} failed - {}",
702 item
.backup_type
, item
.backup_id
, err
,
704 errors
= true; // do not stop here, instead continue
// Cleanup pass: remove local groups that vanished on the remote
// (presumably gated on a remove-vanished flag among the missing lines);
// cleanup errors are logged, not propagated.
709 let result
: Result
<(), Error
> = proxmox
::try_block
!({
710 let local_groups
= BackupInfo
::list_backup_groups(&tgt_store
.base_path())?
;
711 for local_group
in local_groups
{
712 if new_groups
.contains(&local_group
) {
717 "delete vanished group '{}/{}'",
718 local_group
.backup_type(),
719 local_group
.backup_id()
721 if let Err(err
) = tgt_store
.remove_backup_group(&local_group
) {
722 task_log
!(worker
, "{}", err
.to_string());
728 if let Err(err
) = result
{
729 task_log
!(worker
, "error during cleanup: {}", err
);
// Any per-group failure makes the whole sync task report failure.
735 bail
!("sync failed with some errors.");