//! Sync datastore from remote server
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::io::{Seek, SeekFrom};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use serde_json::json;
use proxmox_router::HttpError;

use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type,
};
use pbs_tools::sha::sha256;
use pbs_tools::task_log;
use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
use proxmox_rest_server::WorkerTask;

use crate::tools::ParallelHandler;
// fixme: implement filters
// fixme: delete vanished groups
// Todo: correctly lock backup groups
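/// Pull the chunks referenced by `index` from the remote and insert them into
/// the target datastore.
///
/// Digests already recorded in `downloaded_chunks` are skipped, so chunks
/// shared between snapshots of a group are fetched only once. Up to 20
/// downloads run concurrently, while decoding, verification and writing
/// happen on a separate thread pool.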
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it is actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = pbs_runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                pbs_runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}
async fn download_manifest(
    reader: &BackupReader,
    filename: &std::path::Path,
) -> Result<std::fs::File, Error> {
    let mut tmp_manifest_file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .read(true)
        .open(&filename)?;

    reader
        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
        .await?;

    tmp_manifest_file.seek(SeekFrom::Start(0))?;

    Ok(tmp_manifest_file)
}
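/// Check a downloaded archive against the size and checksum recorded in the
/// manifest's `FileInfo` entry.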
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}
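/// Pull a single archive (dynamic index, fixed index or blob) of one snapshot
/// into a `.tmp` file, verify it, fetch any chunks it references that are
/// still missing, and atomically rename it into place.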
async fn pull_single_archive(
    worker: &WorkerTask,
    reader: &BackupReader,
    chunk_reader: &mut RemoteChunkReader,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    archive_info: &FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let archive_name = &archive_info.filename;
    let mut path = tgt_store.base_path();
    path.push(snapshot.relative_path());
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    task_log!(worker, "sync archive {}", archive_name);

    let mut tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&tmp_path)?;

    reader.download(archive_name, &mut tmpfile).await?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(
                worker,
                chunk_reader.clone(),
                tgt_store.clone(),
                index,
                downloaded_chunks,
            )
            .await?;
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(
                worker,
                chunk_reader.clone(),
                tgt_store.clone(),
                index,
                downloaded_chunks,
            )
            .await?;
        }
        ArchiveType::Blob => {
            tmpfile.seek(SeekFrom::Start(0))?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}
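/// Try to download the client log blob, staying silent if the remote has none.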
// Note: The client.log.blob is uploaded after the backup, so it is
// not mentioned in the manifest.
async fn try_client_log_download(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    path: &std::path::Path,
) -> Result<(), Error> {
    let mut tmp_path = path.to_owned();
    tmp_path.set_extension("tmp");

    let tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&tmp_path)?;

    // Note: be silent if there is no log - only log successful download
    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
        if let Err(err) = std::fs::rename(&tmp_path, &path) {
            bail!("Atomic rename file {:?} failed - {}", path, err);
        }
        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
    }

    Ok(())
}
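/// Pull one snapshot into the target datastore.
///
/// The remote manifest is downloaded to a `.tmp` file first; if it is
/// identical to the local manifest, nothing needs to be transferred.
/// Otherwise only the archives that changed are re-synced, and the new
/// manifest is atomically renamed into place at the end.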
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = tgt_store.base_path();
    manifest_name.push(snapshot.relative_path());
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = tgt_store.base_path();
    client_log_name.push(snapshot.relative_path());
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => match err.downcast_ref::<HttpError>() {
            Some(HttpError { code, message }) => match *code {
                StatusCode::NOT_FOUND => {
                    task_log!(
                        worker,
                        "skipping snapshot {} - vanished since start of sync",
                        snapshot
                    );
                    return Ok(());
                }
                _ => {
                    bail!("HTTP error {} - {}", code, message);
                }
            },
            None => {
                return Err(err);
            }
        },
    };

    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!(
                    "unable to open local manifest {:?} - {}",
                    manifest_name,
                    err
                )
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!(
                "unable to read local manifest {:?} - {}",
                manifest_name,
                err
            )
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = tgt_store.base_path();
        path.push(snapshot.relative_path());
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        let mut chunk_reader = RemoteChunkReader::new(
            reader.clone(),
            None,
            item.chunk_crypt_mode(),
            HashMap::new(),
        );

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            tgt_store.clone(),
            snapshot,
            &item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    // cleanup - remove stale files
    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;

    Ok(())
}
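/// Pull a snapshot, creating (and locking) the local backup directory first.
///
/// If the directory was newly created, a failed pull removes it again;
/// an already existing snapshot is re-synced in place.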
pub async fn pull_snapshot_from(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;

    if is_new {
        task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());

        if let Err(err) = pull_snapshot(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks,
        )
        .await
        {
            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
                task_log!(worker, "cleanup error - {}", cleanup_err);
            }
            return Err(err);
        }
        task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
    } else {
        task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
        pull_snapshot(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks,
        )
        .await?;
        task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path());
    }

    Ok(())
}
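/// Bookkeeping for skipped snapshots: counts them and remembers the time
/// range they cover for the summary log line.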
struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
}

impl SkipInfo {
    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?
        )
    }
}
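/// Pull a single backup group: list the remote snapshots, skip those older
/// than the newest local snapshot, sync the remaining ones in chronological
/// order and, if deletion is enabled, remove local snapshots that vanished
/// on the remote side.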
pub async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    group: &BackupGroup,
    delete: bool,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());

    let args = json!({
        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),
    });

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));

    client.login().await?; // make sure auth is complete

    let fingerprint = client.fingerprint();

    let last_sync = tgt_store.last_successful_backup(group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // start with 16384 chunks (up to 65GB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut skip_info = SkipInfo {
        oldest: i64::MAX,
        newest: i64::MIN,
        count: 0,
    };

    for (pos, item) in list.into_iter().enumerate() {
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

        // in-progress backups can't be synced
        if item.size.is_none() {
            task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot);
            continue;
        }

        let backup_time = snapshot.backup_time();

        remote_snapshots.insert(backup_time);

        if let Some(last_sync_time) = last_sync {
            if last_sync_time > backup_time {
                skip_info.update(backup_time);
                continue;
            }
        }

        // get updated auth_info (new tickets)
        let auth_info = client.login().await?;

        let options =
            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());

        let new_client = HttpClient::new(
            src_repo.host(),
            src_repo.port(),
            src_repo.auth_id(),
            options,
        )?;

        let reader = BackupReader::start(
            new_client,
            None,
            src_repo.store(),
            snapshot.group().backup_type(),
            snapshot.group().backup_id(),
            backup_time,
            true,
        )
        .await?;

        let result = pull_snapshot_from(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks.clone(),
        )
        .await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if delete {
        let local_list = group.list_backups(&tgt_store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) {
                continue;
            }
            task_log!(
                worker,
                "delete vanished snapshot {:?}",
                info.backup_dir.relative_path()
            );
            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
        }
    }

    if skip_info.count > 0 {
        task_log!(worker, "{}", skip_info);
    }

    Ok(())
}
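/// Pull a whole datastore: iterate over the remote backup groups (sorted by
/// type and id), sync every group owned by `auth_id`, and optionally delete
/// vanished groups afterwards. Per-group errors are logged and collected so
/// the task only fails at the very end.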
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    delete: bool,
    auth_id: Authid,
) -> Result<(), Error> {
    // explicitly create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;

    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());

    let mut result = client
        .get(&path, None)
        .await
        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    task_log!(worker, "found {} groups to sync", list.len());

    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)
        } else {
            type_order
        }
    });

    let mut errors = false;

    let mut new_groups = std::collections::HashSet::new();
    for item in list.iter() {
        new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    for (done, item) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let group = BackupGroup::new(&item.backup_type, &item.backup_id);

        let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
            Ok(result) => result,
            Err(err) => {
                task_log!(
                    worker,
                    "sync group {}/{} failed - group lock failed: {}",
                    item.backup_type,
                    item.backup_id,
                    err
                );
                errors = true; // do not stop here, instead continue
                continue;
            }
        };

        if auth_id != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {}/{} failed - owner check failed ({} != {})",
                item.backup_type,
                item.backup_id,
                auth_id,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(
            worker,
            client,
            src_repo,
            tgt_store.clone(),
            &group,
            delete,
            &mut progress,
        )
        .await
        {
            task_log!(
                worker,
                "sync group {}/{} failed - {}",
                item.backup_type,
                item.backup_id,
                err,
            );
            errors = true; // do not stop here, instead continue
        }
    }

    if delete {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) {
                    continue;
                }
                task_log!(
                    worker,
                    "delete vanished group '{}/{}'",
                    local_group.backup_type(),
                    local_group.backup_id()
                );
                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                    task_log!(worker, "{}", err.to_string());
                    errors = true;
                }
            }
            Ok(())
        });

        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        }
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}