//! Sync datastore from remote server
use anyhow::{bail, format_err, Error};
use serde_json::json;

use std::convert::TryFrom;
use std::sync::{Arc, Mutex};
use std::collections::{HashSet, HashMap};
use std::io::{Seek, SeekFrom};
use std::time::SystemTime;
use std::sync::atomic::{AtomicUsize, Ordering};

use proxmox::api::error::{StatusCode, HttpError};

use crate::{
    tools::{ParallelHandler, compute_file_csum},
    server::WorkerTask,
    backup::*,
    api2::types::*,
    client::*,
};

// fixme: implement filters
// fixme: delete vanished groups
// Todo: correctly lock backup groups
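
/// Fetch the chunks referenced by an index, verify and insert them into the
/// target datastore; chunks already downloaded or already present are skipped.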
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            })
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer", 4,
        move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        }
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {

            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
                if chunk_exists {
                    //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
                    return Ok::<_, Error>(());
                }
                //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    worker.log(format!("downloaded {} bytes ({:.2} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));

    Ok(())
}
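
/// Download the remote manifest into the given temporary file and rewind it
/// so the caller can read it back.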
async fn download_manifest(
    reader: &BackupReader,
    filename: &std::path::Path,
) -> Result<std::fs::File, Error> {

    let mut tmp_manifest_file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .read(true)
        .open(&filename)?;

    reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?;

    tmp_manifest_file.seek(SeekFrom::Start(0))?;

    Ok(tmp_manifest_file)
}
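
/// Check a downloaded file against the size and checksum recorded in the manifest.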
fn verify_archive(
    info: &FileInfo,
    csum: &[u8;32],
    size: u64,
) -> Result<(), Error> {
    if size != info.size {
        bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size);
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}
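
/// Download a single archive file (dynamic/fixed index or blob) of a snapshot,
/// verify its checksum and pull any chunks it references.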
async fn pull_single_archive(
    worker: &WorkerTask,
    reader: &BackupReader,
    chunk_reader: &mut RemoteChunkReader,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    archive_info: &FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    let archive_name = &archive_info.filename;
    let mut path = tgt_store.base_path();
    path.push(snapshot.relative_path());
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    worker.log(format!("sync archive {}", archive_name));
    let mut tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&tmp_path)?;

    reader.download(archive_name, &mut tmpfile).await?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile)
                .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile)
                .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
        }
        ArchiveType::Blob => {
            let (csum, size) = compute_file_csum(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }

    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }

    Ok(())
}

// Note: The client.log.blob is uploaded after the backup, so it is
// not mentioned in the manifest.
async fn try_client_log_download(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    path: &std::path::Path,
) -> Result<(), Error> {

    let mut tmp_path = path.to_owned();
    tmp_path.set_extension("tmp");

    let tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&tmp_path)?;

    // Note: be silent if there is no log - only log successful download
    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
        if let Err(err) = std::fs::rename(&tmp_path, &path) {
            bail!("Atomic rename file {:?} failed - {}", path, err);
        }
        worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
    }

    Ok(())
}
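
/// Pull a single snapshot: download the manifest, keep local archives that
/// still verify against it, re-download the rest and clean up stale files.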
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    let mut manifest_name = tgt_store.base_path();
    manifest_name.push(snapshot.relative_path());
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = tgt_store.base_path();
    client_log_name.push(snapshot.relative_path());
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => {
                    match *code {
                        StatusCode::NOT_FOUND => {
                            worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot));
                            return Ok(());
                        }
                        _ => {
                            bail!("HTTP error {} - {}", code, message);
                        }
                    }
                }
                None => {
                    return Err(err);
                }
            }
        }
    };

    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    if manifest_name.exists() {
        let manifest_blob = proxmox::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name)
                .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        }).map_err(|err: Error| {
            format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            worker.log("no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = tgt_store.base_path();
        path.push(snapshot.relative_path());
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = compute_file_csum(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
            }
        }

        let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new());

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            tgt_store.clone(),
            snapshot,
            &item,
            downloaded_chunks.clone(),
        ).await?;
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    // cleanup - remove stale files
    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;

    Ok(())
}
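
/// Create (and lock) the local backup dir, then pull the snapshot into it.
/// If the dir was newly created and the pull fails, the partial dir is removed again.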
pub async fn pull_snapshot_from(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;

    if is_new {
        worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));

        if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
                worker.log(format!("cleanup error - {}", cleanup_err));
            }
            return Err(err);
        }
        worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
    } else {
        worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
        pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
        worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
    }

    Ok(())
}
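
/// Pull all snapshots of one backup group in ascending backup-time order,
/// skipping snapshots older than the last successful local backup; local
/// snapshots that vanished on the remote side are removed when `delete` is set.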
pub async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    group: &BackupGroup,
    delete: bool,
    progress: &mut StoreProgress,
) -> Result<(), Error> {

    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());

    let args = json!({
        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),
    });

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));

    client.login().await?; // make sure auth is complete

    let fingerprint = client.fingerprint();

    let last_sync = tgt_store.last_successful_backup(group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // start with 16384 chunks (up to 65GB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));

    progress.group_snapshots = list.len() as u64;

    for (pos, item) in list.into_iter().enumerate() {
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

        // in-progress backups can't be synced
        if item.size.is_none() {
            worker.log(format!("skipping snapshot {} - in-progress backup", snapshot));
            continue;
        }

        let backup_time = snapshot.backup_time();

        remote_snapshots.insert(backup_time);

        if let Some(last_sync_time) = last_sync {
            if last_sync_time > backup_time { continue; }
        }

        // get updated auth_info (new tickets)
        let auth_info = client.login().await?;

        let options = HttpClientOptions::new()
            .password(Some(auth_info.ticket.clone()))
            .fingerprint(fingerprint.clone());

        let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.auth_id(), options)?;

        let reader = BackupReader::start(
            new_client,
            None,
            src_repo.store(),
            snapshot.group().backup_type(),
            snapshot.group().backup_id(),
            backup_time,
            true,
        ).await?;

        let result = pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        worker.log(format!("percentage done: {}", progress.clone()));

        result?; // stop on error
    }

    if delete {
        let local_list = group.list_backups(&tgt_store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) { continue; }
            worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
        }
    }

    Ok(())
}
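
/// Pull all backup groups of the remote datastore. Groups whose local owner
/// does not match `auth_id` are skipped; errors are logged and collected,
/// and the sync fails at the end if any occurred.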
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    delete: bool,
    auth_id: Authid,
) -> Result<(), Error> {

    // explicit create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;

    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());

    let mut result = client.get(&path, None).await?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)
        } else {
            type_order
        }
    });

    let mut errors = false;

    let mut new_groups = std::collections::HashSet::new();
    for item in list.iter() {
        new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    for (done, item) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let group = BackupGroup::new(&item.backup_type, &item.backup_id);

        let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
            Ok(result) => result,
            Err(err) => {
                worker.log(format!("sync group {}/{} failed - group lock failed: {}",
                    item.backup_type, item.backup_id, err));
                errors = true; // do not stop here, instead continue
                continue;
            }
        };

        if auth_id != owner { // only the owner is allowed to create additional snapshots
            worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})",
                item.backup_type, item.backup_id, auth_id, owner));
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(
            worker,
            client,
            src_repo,
            tgt_store.clone(),
            &group,
            delete,
            &mut progress,
        ).await {
            worker.log(format!(
                "sync group {}/{} failed - {}",
                item.backup_type, item.backup_id, err,
            ));
            errors = true; // do not stop here, instead continue
        }
    }

    if delete {
        let result: Result<(), Error> = proxmox::try_block!({
            let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) { continue; }
                worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                    worker.log(err.to_string());
                    errors = true;
                }
            }
            Ok(())
        });

        if let Err(err) = result {
            worker.log(format!("error during cleanup: {}", err));
            errors = true;
        }
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}