//! Sync datastore from remote server
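//!
//! `pull_store` iterates over the backup groups of a remote datastore,
//! `pull_group` over the snapshots of a group, and `pull_snapshot` fetches
//! the manifest, the archive files and the chunks they reference into the
//! local target datastore.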
use anyhow::{bail, format_err, Error};

use std::convert::TryFrom;
use std::sync::{Arc, Mutex};
use std::collections::{HashSet, HashMap};
use std::io::{Seek, SeekFrom};
use std::time::SystemTime;
use std::sync::atomic::{AtomicUsize, Ordering};

use proxmox::api::error::{StatusCode, HttpError};
    tools::{ParallelHandler, compute_file_csum},
// fixme: implement filters
// fixme: delete vanished groups
// Todo: correctly lock backup groups
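
/// Pull all chunks referenced by `index` from the remote chunk reader into
/// the target datastore. Digests already recorded in `downloaded_chunks` are
/// skipped; downloaded chunks are verified and written on a worker pool.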
async fn pull_index_chunks<I: IndexFile>(
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    use futures::stream::{self, StreamExt, TryStreamExt};
    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);

                // Note: We mark a chunk as downloaded before it is actually downloaded
                // to avoid duplicate downloads.
                guard.insert(info.digest);
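
    // Chunk downloads run concurrently on the stream below; decoding,
    // verification and insertion are offloaded to this blocking worker pool.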
    let verify_pool = ParallelHandler::new(
        "sync chunk writer", 4,
        |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target.insert_chunk(&chunk, &digest)?;
    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();
            Ok::<_, Error>(async move {
                let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;

                    //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
                    return Ok::<_, Error>(());

                //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    worker.log(format!("downloaded {} bytes ({} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));
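
/// Download the manifest blob of a snapshot into a temporary file at
/// `filename` and return the open file handle, rewound to the start.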
async fn download_manifest(
    reader: &BackupReader,
    filename: &std::path::Path,
) -> Result<std::fs::File, Error> {

    let mut tmp_manifest_file = std::fs::OpenOptions::new()

    reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?;

    tmp_manifest_file.seek(SeekFrom::Start(0))?;

    Ok(tmp_manifest_file)
) -> Result<(), Error> {
    if size != info.size {
        bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size);

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
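
/// Download a single archive of a snapshot into the target datastore and
/// verify its checksum and size against `archive_info`; for fixed and
/// dynamic indexes the referenced chunks are pulled as well.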
async fn pull_single_archive(
    reader: &BackupReader,
    chunk_reader: &mut RemoteChunkReader,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    archive_info: &FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    let archive_name = &archive_info.filename;
    let mut path = tgt_store.base_path();
    path.push(snapshot.relative_path());
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    worker.log(format!("sync archive {}", archive_name));
    let mut tmpfile = std::fs::OpenOptions::new()

    reader.download(archive_name, &mut tmpfile).await?;
    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile)
                .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;

        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile)
                .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;

        ArchiveType::Blob => {
            let (csum, size) = compute_file_csum(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;

    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
// Note: The client.log.blob is uploaded after the backup, so it is
// not mentioned in the manifest.
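/// Try to download the client log blob into `path`; a missing log is ignored
/// silently, only a successful download is logged.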
async fn try_client_log_download(
    reader: Arc<BackupReader>,
    path: &std::path::Path,
) -> Result<(), Error> {

    let mut tmp_path = path.to_owned();
    tmp_path.set_extension("tmp");

    let tmpfile = std::fs::OpenOptions::new()

    // Note: be silent if there is no log - only log successful download
    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
        if let Err(err) = std::fs::rename(&tmp_path, &path) {
            bail!("Atomic rename file {:?} failed - {}", path, err);

        worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
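
/// Pull a single snapshot: download the manifest, compare it with the local
/// one, re-download archives that are missing or fail verification, then
/// move the new manifest into place and clean up stale files.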
async fn pull_snapshot(
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    let mut manifest_name = tgt_store.base_path();
    manifest_name.push(snapshot.relative_path());
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = tgt_store.base_path();
    client_log_name.push(snapshot.relative_path());
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,

            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => {

                        &StatusCode::NOT_FOUND => {
                            worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot));

                            bail!("HTTP error {} - {}", code, message);

    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
    if manifest_name.exists() {
        let manifest_blob = proxmox::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name)
                .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;

        }).map_err(|err: Error| {
            format_err!("unable to read local manifest {:?} - {}", manifest_name, err)

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;

            worker.log("no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
    for item in manifest.files() {
        let mut path = tgt_store.base_path();
        path.push(snapshot.relative_path());
        path.push(&item.filename);

            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {

                            worker.log(format!("detected changed file {:?} - {}", path, err));

                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {

                            worker.log(format!("detected changed file {:?} - {}", path, err));

                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = compute_file_csum(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {

                            worker.log(format!("detected changed file {:?} - {}", path, err));

        let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new());
            downloaded_chunks.clone(),

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;

    // cleanup - remove stale files
    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
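
/// Pull a snapshot into a locked backup directory. A newly created directory
/// is removed again if the pull fails; existing snapshots are re-synced.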
pub async fn pull_snapshot_from(
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;

        worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));

        if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
                worker.log(format!("cleanup error - {}", cleanup_err));

        worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));

        worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
        pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
        worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
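
/// Pull all snapshots of a group that are newer than the last successful
/// local backup, and delete local snapshots that vanished on the remote
/// (when deletion is requested).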
pub async fn pull_group(
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
) -> Result<(), Error> {

    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());

        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
    let auth_info = client.login().await?;
    let fingerprint = client.fingerprint();

    let last_sync = tgt_store.last_successful_backup(group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // preallocate the digest set for 64k chunks (1024*64)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

        // in-progress backups can't be synced
        if let None = item.size {
            worker.log(format!("skipping snapshot {} - in-progress backup", snapshot));

        let backup_time = snapshot.backup_time();

        remote_snapshots.insert(backup_time);

        if let Some(last_sync_time) = last_sync {
            if last_sync_time > backup_time { continue; }

        let options = HttpClientOptions::new()
            .password(Some(auth_info.ticket.clone()))
            .fingerprint(fingerprint.clone());

        let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.user(), options)?;

        let reader = BackupReader::start(

            snapshot.group().backup_type(),
            snapshot.group().backup_id(),

        pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await?;
        let local_list = group.list_backups(&tgt_store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) { continue; }
            worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
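
/// Pull all backup groups of a remote datastore. A shared chunk store lock
/// protects newly written chunks from garbage collection; groups whose local
/// owner differs from `userid` are skipped, and vanished groups can be
/// deleted after the sync.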
pub async fn pull_store(
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
) -> Result<(), Error> {

    // explicitly create a shared lock to prevent GC on newly created chunks
    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;

    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());

    let mut result = client.get(&path, None).await?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)

    let mut errors = false;

    let mut new_groups = std::collections::HashSet::new();
    for item in list.iter() {
        new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));

        let group = BackupGroup::new(&item.backup_type, &item.backup_id);

        let (owner, _lock_guard) = tgt_store.create_locked_backup_group(&group, &userid)?;

        if userid != owner { // only the owner is allowed to create additional snapshots
            worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})",
                item.backup_type, item.backup_id, userid, owner));

            continue; // do not stop here, instead continue

        if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group, delete).await {
            worker.log(format!("sync group {}/{} failed - {}", item.backup_type, item.backup_id, err));

            continue; // do not stop here, instead continue
        let result: Result<(), Error> = proxmox::try_block!({
            let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) { continue; }
                worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                    worker.log(err.to_string());

        if let Err(err) = result {
            worker.log(format!("error during cleanup: {}", err));

        bail!("sync failed with some errors.");