//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::io::{Seek, SeekFrom};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, format_err, Error};
use serde_json::json;

use proxmox::api::error::{HttpError, StatusCode};

use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type,
};
use pbs_tools::sha::sha256;
use pbs_tools::task_log;
use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
use proxmox_rest_server::WorkerTask;

use crate::tools::ParallelHandler;

// FIXME: implement filters
// FIXME: delete vanished groups
// TODO: correctly lock backup groups

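/// Pull all chunks referenced by `index` from the remote and insert them into
/// the target datastore. Chunks already recorded in `downloaded_chunks` or
/// already present in the local chunk store are skipped; up to 20 downloads
/// run concurrently, with verification and writing offloaded to a thread pool.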
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = pbs_runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                pbs_runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}

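/// Download the manifest blob of a remote snapshot into `filename` and rewind
/// the file, so callers can read it back directly.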
async fn download_manifest(
    reader: &BackupReader,
    filename: &std::path::Path,
) -> Result<std::fs::File, Error> {
    let mut tmp_manifest_file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .read(true)
        .open(&filename)?;

    reader
        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
        .await?;

    tmp_manifest_file.seek(SeekFrom::Start(0))?;

    Ok(tmp_manifest_file)
}

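/// Check a downloaded archive against the size and checksum recorded in its
/// manifest entry.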
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}

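/// Download a single archive into a `.tmp` file, verify it against the
/// manifest entry, pull referenced chunks for index archives, and atomically
/// rename the file into place on success.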
async fn pull_single_archive(
    worker: &WorkerTask,
    reader: &BackupReader,
    chunk_reader: &mut RemoteChunkReader,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    archive_info: &FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let archive_name = &archive_info.filename;
    let mut path = tgt_store.base_path();
    path.push(snapshot.relative_path());
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    task_log!(worker, "sync archive {}", archive_name);

    let mut tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true) // discard stale data from an earlier, interrupted sync
        .read(true)
        .open(&tmp_path)?;

    reader.download(archive_name, &mut tmpfile).await?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(
                worker,
                chunk_reader.clone(),
                tgt_store.clone(),
                index,
                downloaded_chunks,
            )
            .await?;
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(
                worker,
                chunk_reader.clone(),
                tgt_store.clone(),
                index,
                downloaded_chunks,
            )
            .await?;
        }
        ArchiveType::Blob => {
            tmpfile.seek(SeekFrom::Start(0))?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}

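/// Try to download the client log blob of a snapshot, silently ignoring a
/// missing log.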
// Note: The client.log.blob is uploaded after the backup, so it is
// not mentioned in the manifest.
async fn try_client_log_download(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    path: &std::path::Path,
) -> Result<(), Error> {
    let mut tmp_path = path.to_owned();
    tmp_path.set_extension("tmp");

    let tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true) // discard stale data from an earlier, interrupted sync
        .read(true)
        .open(&tmp_path)?;

    // Note: be silent if there is no log - only log a successful download
    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
        if let Err(err) = std::fs::rename(&tmp_path, &path) {
            bail!("Atomic rename file {:?} failed - {}", path, err);
        }
        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
    }

    Ok(())
}

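/// Pull a single snapshot: download the remote manifest, compare it with the
/// local one (if any), re-download only archives whose size or checksum
/// changed, fetch a missing client log, and finally remove stale files.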
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = tgt_store.base_path();
    manifest_name.push(snapshot.relative_path());
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = tgt_store.base_path();
    client_log_name.push(snapshot.relative_path());
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            snapshot
                        );
                        return Ok(());
                    }
                    _ => {
                        bail!("HTTP error {} - {}", code, message);
                    }
                },
                None => {
                    return Err(err);
                }
            };
        }
    };
    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    if manifest_name.exists() {
        let manifest_blob = proxmox::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!(
                    "unable to open local manifest {:?} - {}",
                    manifest_name,
                    err
                )
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!(
                "unable to read local manifest {:?} - {}",
                manifest_name,
                err
            )
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = tgt_store.base_path();
        path.push(snapshot.relative_path());
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        let mut chunk_reader = RemoteChunkReader::new(
            reader.clone(),
            None,
            item.chunk_crypt_mode(),
            HashMap::new(),
        );

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            tgt_store.clone(),
            snapshot,
            &item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    // cleanup - remove stale files
    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;

    Ok(())
}

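/// Pull a snapshot into a newly created, locked backup directory; on failure
/// the directory is removed again. Snapshots that already exist locally are
/// re-synced in place instead.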
pub async fn pull_snapshot_from(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;

    if is_new {
        task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());

        if let Err(err) = pull_snapshot(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks,
        )
        .await
        {
            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
                task_log!(worker, "cleanup error - {}", cleanup_err);
            }
            return Err(err);
        }
        task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
    } else {
        task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
        pull_snapshot(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks,
        )
        .await?;
        task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path());
    }

    Ok(())
}

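/// Counts snapshots skipped because they are older than the newest local
/// snapshot of a group, and remembers the covered time range for logging.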
struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
}

impl SkipInfo {
    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest),
            _ => {
                Ok(format!(
                    "{} .. {}",
                    proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?,
                    proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?,
                ))
            }
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?
        )
    }
}

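/// Pull all snapshots of a single backup group, oldest first. Snapshots older
/// than the last successful local backup are skipped; with `delete` set,
/// local snapshots that vanished on the remote are removed afterwards.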
pub async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    group: &BackupGroup,
    delete: bool,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());

    let args = json!({
        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),
    });

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));

    client.login().await?; // make sure auth is complete

    let fingerprint = client.fingerprint();

    let last_sync = tgt_store.last_successful_backup(group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // start with a capacity of 64k chunks (covers up to 256 GiB at the default 4 MiB chunk size)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut skip_info = SkipInfo {
        oldest: i64::MAX,
        newest: i64::MIN,
        count: 0,
    };

    for (pos, item) in list.into_iter().enumerate() {
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

        // in-progress backups can't be synced
        if item.size.is_none() {
            task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot);
            continue;
        }

        let backup_time = snapshot.backup_time();

        remote_snapshots.insert(backup_time);

        if let Some(last_sync_time) = last_sync {
            if last_sync_time > backup_time {
                skip_info.update(backup_time);
                continue;
            }
        }

        // get updated auth_info (new tickets)
        let auth_info = client.login().await?;

        let options =
            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());

        let new_client = HttpClient::new(
            src_repo.host(),
            src_repo.port(),
            src_repo.auth_id(),
            options,
        )?;

        let reader = BackupReader::start(
            new_client,
            None,
            src_repo.store(),
            snapshot.group().backup_type(),
            snapshot.group().backup_id(),
            backup_time,
            true,
        )
        .await?;

        let result = pull_snapshot_from(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks.clone(),
        )
        .await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if delete {
        let local_list = group.list_backups(&tgt_store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) {
                continue;
            }
            task_log!(
                worker,
                "delete vanished snapshot {:?}",
                info.backup_dir.relative_path()
            );
            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
        }
    }

    if skip_info.count > 0 {
        task_log!(worker, "{}", skip_info);
    }

    Ok(())
}

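/// Entry point for a sync job: pull all backup groups of the remote datastore
/// into `tgt_store`. Errors in individual groups are logged and collected, so
/// one failing group does not abort the remaining ones.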
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    delete: bool,
    auth_id: Authid,
) -> Result<(), Error> {
    // explicitly create a shared lock to prevent GC from removing newly created chunks
    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;

    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());

    let mut result = client
        .get(&path, None)
        .await
        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    task_log!(worker, "found {} groups to sync", list.len());

    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)
        } else {
            type_order
        }
    });

    let mut errors = false;

    let mut new_groups = std::collections::HashSet::new();
    for item in list.iter() {
        new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    for (done, item) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let group = BackupGroup::new(&item.backup_type, &item.backup_id);

        let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
            Ok(result) => result,
            Err(err) => {
                task_log!(
                    worker,
                    "sync group {}/{} failed - group lock failed: {}",
                    item.backup_type, item.backup_id, err
                );
                errors = true; // do not stop here, instead continue
                continue;
            }
        };

        // permission check
        if auth_id != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {}/{} failed - owner check failed ({} != {})",
                item.backup_type, item.backup_id, auth_id, owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(
            worker,
            client,
            src_repo,
            tgt_store.clone(),
            &group,
            delete,
            &mut progress,
        )
        .await
        {
            task_log!(
                worker,
                "sync group {}/{} failed - {}",
                item.backup_type, item.backup_id, err,
            );
            errors = true; // do not stop here, instead continue
        }
    }

    if delete {
        let result: Result<(), Error> = proxmox::try_block!({
            let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) {
                    continue;
                }
                task_log!(
                    worker,
                    "delete vanished group '{}/{}'",
                    local_group.backup_type(),
                    local_group.backup_id()
                );
                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                    task_log!(worker, "{}", err.to_string());
                    errors = true;
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}