]> git.proxmox.com Git - proxmox-backup.git/blob - src/server/pull.rs
use new fsync parameter to replace_file and atomic_open_or_create
[proxmox-backup.git] / src / server / pull.rs
1 //! Sync datastore from remote server
2
3 use std::collections::{HashMap, HashSet};
4 use std::convert::TryFrom;
5 use std::io::{Seek, SeekFrom};
6 use std::sync::atomic::{AtomicUsize, Ordering};
7 use std::sync::{Arc, Mutex};
8 use std::time::SystemTime;
9
10 use anyhow::{bail, format_err, Error};
11 use serde_json::json;
12 use http::StatusCode;
13
14 use proxmox_router::HttpError;
15
16 use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
17 use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
18 use pbs_datastore::data_blob::DataBlob;
19 use pbs_datastore::dynamic_index::DynamicIndexReader;
20 use pbs_datastore::fixed_index::FixedIndexReader;
21 use pbs_datastore::index::IndexFile;
22 use pbs_datastore::manifest::{
23 CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
24 };
25 use pbs_tools::sha::sha256;
26 use pbs_tools::task_log;
27 use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
28 use proxmox_rest_server::WorkerTask;
29
30 use crate::tools::ParallelHandler;
31
// fixme: implement filters
// note: vanished groups are already removed by pull_store() when `delete` is set
// Todo: correctly lock backup groups
35
36 async fn pull_index_chunks<I: IndexFile>(
37 worker: &WorkerTask,
38 chunk_reader: RemoteChunkReader,
39 target: Arc<DataStore>,
40 index: I,
41 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
42 ) -> Result<(), Error> {
43 use futures::stream::{self, StreamExt, TryStreamExt};
44
45 let start_time = SystemTime::now();
46
47 let stream = stream::iter(
48 (0..index.index_count())
49 .map(|pos| index.chunk_info(pos).unwrap())
50 .filter(|info| {
51 let mut guard = downloaded_chunks.lock().unwrap();
52 let done = guard.contains(&info.digest);
53 if !done {
54 // Note: We mark a chunk as downloaded before its actually downloaded
55 // to avoid duplicate downloads.
56 guard.insert(info.digest);
57 }
58 !done
59 }),
60 );
61
62 let target2 = target.clone();
63 let verify_pool = ParallelHandler::new(
64 "sync chunk writer",
65 4,
66 move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
67 // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
68 chunk.verify_unencrypted(size as usize, &digest)?;
69 target2.insert_chunk(&chunk, &digest)?;
70 Ok(())
71 },
72 );
73
74 let verify_and_write_channel = verify_pool.channel();
75
76 let bytes = Arc::new(AtomicUsize::new(0));
77
78 stream
79 .map(|info| {
80 let target = Arc::clone(&target);
81 let chunk_reader = chunk_reader.clone();
82 let bytes = Arc::clone(&bytes);
83 let verify_and_write_channel = verify_and_write_channel.clone();
84
85 Ok::<_, Error>(async move {
86 let chunk_exists = pbs_runtime::block_in_place(|| {
87 target.cond_touch_chunk(&info.digest, false)
88 })?;
89 if chunk_exists {
90 //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
91 return Ok::<_, Error>(());
92 }
93 //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
94 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
95 let raw_size = chunk.raw_size() as usize;
96
97 // decode, verify and write in a separate threads to maximize throughput
98 pbs_runtime::block_in_place(|| {
99 verify_and_write_channel.send((chunk, info.digest, info.size()))
100 })?;
101
102 bytes.fetch_add(raw_size, Ordering::SeqCst);
103
104 Ok(())
105 })
106 })
107 .try_buffer_unordered(20)
108 .try_for_each(|_res| futures::future::ok(()))
109 .await?;
110
111 drop(verify_and_write_channel);
112
113 verify_pool.complete()?;
114
115 let elapsed = start_time.elapsed()?.as_secs_f64();
116
117 let bytes = bytes.load(Ordering::SeqCst);
118
119 task_log!(
120 worker,
121 "downloaded {} bytes ({:.2} MiB/s)",
122 bytes,
123 (bytes as f64) / (1024.0 * 1024.0 * elapsed)
124 );
125
126 Ok(())
127 }
128
129 async fn download_manifest(
130 reader: &BackupReader,
131 filename: &std::path::Path,
132 ) -> Result<std::fs::File, Error> {
133 let mut tmp_manifest_file = std::fs::OpenOptions::new()
134 .write(true)
135 .create(true)
136 .truncate(true)
137 .read(true)
138 .open(&filename)?;
139
140 reader
141 .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
142 .await?;
143
144 tmp_manifest_file.seek(SeekFrom::Start(0))?;
145
146 Ok(tmp_manifest_file)
147 }
148
149 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
150 if size != info.size {
151 bail!(
152 "wrong size for file '{}' ({} != {})",
153 info.filename,
154 info.size,
155 size
156 );
157 }
158
159 if csum != &info.csum {
160 bail!("wrong checksum for file '{}'", info.filename);
161 }
162
163 Ok(())
164 }
165
166 async fn pull_single_archive(
167 worker: &WorkerTask,
168 reader: &BackupReader,
169 chunk_reader: &mut RemoteChunkReader,
170 tgt_store: Arc<DataStore>,
171 snapshot: &BackupDir,
172 archive_info: &FileInfo,
173 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
174 ) -> Result<(), Error> {
175 let archive_name = &archive_info.filename;
176 let mut path = tgt_store.base_path();
177 path.push(snapshot.relative_path());
178 path.push(archive_name);
179
180 let mut tmp_path = path.clone();
181 tmp_path.set_extension("tmp");
182
183 task_log!(worker, "sync archive {}", archive_name);
184
185 let mut tmpfile = std::fs::OpenOptions::new()
186 .write(true)
187 .create(true)
188 .read(true)
189 .open(&tmp_path)?;
190
191 reader.download(archive_name, &mut tmpfile).await?;
192
193 match archive_type(archive_name)? {
194 ArchiveType::DynamicIndex => {
195 let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
196 format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
197 })?;
198 let (csum, size) = index.compute_csum();
199 verify_archive(archive_info, &csum, size)?;
200
201 pull_index_chunks(
202 worker,
203 chunk_reader.clone(),
204 tgt_store.clone(),
205 index,
206 downloaded_chunks,
207 )
208 .await?;
209 }
210 ArchiveType::FixedIndex => {
211 let index = FixedIndexReader::new(tmpfile).map_err(|err| {
212 format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
213 })?;
214 let (csum, size) = index.compute_csum();
215 verify_archive(archive_info, &csum, size)?;
216
217 pull_index_chunks(
218 worker,
219 chunk_reader.clone(),
220 tgt_store.clone(),
221 index,
222 downloaded_chunks,
223 )
224 .await?;
225 }
226 ArchiveType::Blob => {
227 tmpfile.seek(SeekFrom::Start(0))?;
228 let (csum, size) = sha256(&mut tmpfile)?;
229 verify_archive(archive_info, &csum, size)?;
230 }
231 }
232 if let Err(err) = std::fs::rename(&tmp_path, &path) {
233 bail!("Atomic rename file {:?} failed - {}", path, err);
234 }
235 Ok(())
236 }
237
238 // Note: The client.log.blob is uploaded after the backup, so it is
239 // not mentioned in the manifest.
240 async fn try_client_log_download(
241 worker: &WorkerTask,
242 reader: Arc<BackupReader>,
243 path: &std::path::Path,
244 ) -> Result<(), Error> {
245 let mut tmp_path = path.to_owned();
246 tmp_path.set_extension("tmp");
247
248 let tmpfile = std::fs::OpenOptions::new()
249 .write(true)
250 .create(true)
251 .read(true)
252 .open(&tmp_path)?;
253
254 // Note: be silent if there is no log - only log successful download
255 if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
256 if let Err(err) = std::fs::rename(&tmp_path, &path) {
257 bail!("Atomic rename file {:?} failed - {}", path, err);
258 }
259 task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
260 }
261
262 Ok(())
263 }
264
265 async fn pull_snapshot(
266 worker: &WorkerTask,
267 reader: Arc<BackupReader>,
268 tgt_store: Arc<DataStore>,
269 snapshot: &BackupDir,
270 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
271 ) -> Result<(), Error> {
272 let mut manifest_name = tgt_store.base_path();
273 manifest_name.push(snapshot.relative_path());
274 manifest_name.push(MANIFEST_BLOB_NAME);
275
276 let mut client_log_name = tgt_store.base_path();
277 client_log_name.push(snapshot.relative_path());
278 client_log_name.push(CLIENT_LOG_BLOB_NAME);
279
280 let mut tmp_manifest_name = manifest_name.clone();
281 tmp_manifest_name.set_extension("tmp");
282
283 let download_res = download_manifest(&reader, &tmp_manifest_name).await;
284 let mut tmp_manifest_file = match download_res {
285 Ok(manifest_file) => manifest_file,
286 Err(err) => {
287 match err.downcast_ref::<HttpError>() {
288 Some(HttpError { code, message }) => match *code {
289 StatusCode::NOT_FOUND => {
290 task_log!(
291 worker,
292 "skipping snapshot {} - vanished since start of sync",
293 snapshot
294 );
295 return Ok(());
296 }
297 _ => {
298 bail!("HTTP error {} - {}", code, message);
299 }
300 },
301 None => {
302 return Err(err);
303 }
304 };
305 }
306 };
307 let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
308
309 if manifest_name.exists() {
310 let manifest_blob = proxmox_lang::try_block!({
311 let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
312 format_err!(
313 "unable to open local manifest {:?} - {}",
314 manifest_name,
315 err
316 )
317 })?;
318
319 let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
320 Ok(manifest_blob)
321 })
322 .map_err(|err: Error| {
323 format_err!(
324 "unable to read local manifest {:?} - {}",
325 manifest_name,
326 err
327 )
328 })?;
329
330 if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
331 if !client_log_name.exists() {
332 try_client_log_download(worker, reader, &client_log_name).await?;
333 }
334 task_log!(worker, "no data changes");
335 let _ = std::fs::remove_file(&tmp_manifest_name);
336 return Ok(()); // nothing changed
337 }
338 }
339
340 let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
341
342 for item in manifest.files() {
343 let mut path = tgt_store.base_path();
344 path.push(snapshot.relative_path());
345 path.push(&item.filename);
346
347 if path.exists() {
348 match archive_type(&item.filename)? {
349 ArchiveType::DynamicIndex => {
350 let index = DynamicIndexReader::open(&path)?;
351 let (csum, size) = index.compute_csum();
352 match manifest.verify_file(&item.filename, &csum, size) {
353 Ok(_) => continue,
354 Err(err) => {
355 task_log!(worker, "detected changed file {:?} - {}", path, err);
356 }
357 }
358 }
359 ArchiveType::FixedIndex => {
360 let index = FixedIndexReader::open(&path)?;
361 let (csum, size) = index.compute_csum();
362 match manifest.verify_file(&item.filename, &csum, size) {
363 Ok(_) => continue,
364 Err(err) => {
365 task_log!(worker, "detected changed file {:?} - {}", path, err);
366 }
367 }
368 }
369 ArchiveType::Blob => {
370 let mut tmpfile = std::fs::File::open(&path)?;
371 let (csum, size) = sha256(&mut tmpfile)?;
372 match manifest.verify_file(&item.filename, &csum, size) {
373 Ok(_) => continue,
374 Err(err) => {
375 task_log!(worker, "detected changed file {:?} - {}", path, err);
376 }
377 }
378 }
379 }
380 }
381
382 let mut chunk_reader = RemoteChunkReader::new(
383 reader.clone(),
384 None,
385 item.chunk_crypt_mode(),
386 HashMap::new(),
387 );
388
389 pull_single_archive(
390 worker,
391 &reader,
392 &mut chunk_reader,
393 tgt_store.clone(),
394 snapshot,
395 &item,
396 downloaded_chunks.clone(),
397 )
398 .await?;
399 }
400
401 if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
402 bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
403 }
404
405 if !client_log_name.exists() {
406 try_client_log_download(worker, reader, &client_log_name).await?;
407 }
408
409 // cleanup - remove stale files
410 tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
411
412 Ok(())
413 }
414
415 pub async fn pull_snapshot_from(
416 worker: &WorkerTask,
417 reader: Arc<BackupReader>,
418 tgt_store: Arc<DataStore>,
419 snapshot: &BackupDir,
420 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
421 ) -> Result<(), Error> {
422 let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
423
424 if is_new {
425 task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());
426
427 if let Err(err) = pull_snapshot(
428 worker,
429 reader,
430 tgt_store.clone(),
431 &snapshot,
432 downloaded_chunks,
433 )
434 .await
435 {
436 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
437 task_log!(worker, "cleanup error - {}", cleanup_err);
438 }
439 return Err(err);
440 }
441 task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
442 } else {
443 task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
444 pull_snapshot(
445 worker,
446 reader,
447 tgt_store.clone(),
448 &snapshot,
449 downloaded_chunks,
450 )
451 .await?;
452 task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path());
453 }
454
455 Ok(())
456 }
457
458 struct SkipInfo {
459 oldest: i64,
460 newest: i64,
461 count: u64,
462 }
463
464 impl SkipInfo {
465 fn update(&mut self, backup_time: i64) {
466 self.count += 1;
467
468 if backup_time < self.oldest {
469 self.oldest = backup_time;
470 }
471
472 if backup_time > self.newest {
473 self.newest = backup_time;
474 }
475 }
476
477 fn affected(&self) -> Result<String, Error> {
478 match self.count {
479 0 => Ok(String::new()),
480 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
481 _ => {
482 Ok(format!(
483 "{} .. {}",
484 proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
485 proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
486 ))
487 }
488 }
489 }
490 }
491
492 impl std::fmt::Display for SkipInfo {
493 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
494 write!(
495 f,
496 "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
497 self.count,
498 self.affected().map_err(|_| std::fmt::Error)?
499 )
500 }
501 }
502
503 pub async fn pull_group(
504 worker: &WorkerTask,
505 client: &HttpClient,
506 src_repo: &BackupRepository,
507 tgt_store: Arc<DataStore>,
508 group: &BackupGroup,
509 delete: bool,
510 progress: &mut StoreProgress,
511 ) -> Result<(), Error> {
512 let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
513
514 let args = json!({
515 "backup-type": group.backup_type(),
516 "backup-id": group.backup_id(),
517 });
518
519 let mut result = client.get(&path, Some(args)).await?;
520 let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
521
522 list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
523
524 client.login().await?; // make sure auth is complete
525
526 let fingerprint = client.fingerprint();
527
528 let last_sync = tgt_store.last_successful_backup(group)?;
529
530 let mut remote_snapshots = std::collections::HashSet::new();
531
532 // start with 16384 chunks (up to 65GB)
533 let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
534
535 progress.group_snapshots = list.len() as u64;
536
537 let mut skip_info = SkipInfo {
538 oldest: i64::MAX,
539 newest: i64::MIN,
540 count: 0,
541 };
542
543 for (pos, item) in list.into_iter().enumerate() {
544 let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
545
546 // in-progress backups can't be synced
547 if item.size.is_none() {
548 task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot);
549 continue;
550 }
551
552 let backup_time = snapshot.backup_time();
553
554 remote_snapshots.insert(backup_time);
555
556 if let Some(last_sync_time) = last_sync {
557 if last_sync_time > backup_time {
558 skip_info.update(backup_time);
559 continue;
560 }
561 }
562
563 // get updated auth_info (new tickets)
564 let auth_info = client.login().await?;
565
566 let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
567
568 let new_client = HttpClient::new(
569 src_repo.host(),
570 src_repo.port(),
571 src_repo.auth_id(),
572 options,
573 )?;
574
575 let reader = BackupReader::start(
576 new_client,
577 None,
578 src_repo.store(),
579 snapshot.group().backup_type(),
580 snapshot.group().backup_id(),
581 backup_time,
582 true,
583 )
584 .await?;
585
586 let result = pull_snapshot_from(
587 worker,
588 reader,
589 tgt_store.clone(),
590 &snapshot,
591 downloaded_chunks.clone(),
592 )
593 .await;
594
595 progress.done_snapshots = pos as u64 + 1;
596 task_log!(worker, "percentage done: {}", progress);
597
598 result?; // stop on error
599 }
600
601 if delete {
602 let local_list = group.list_backups(&tgt_store.base_path())?;
603 for info in local_list {
604 let backup_time = info.backup_dir.backup_time();
605 if remote_snapshots.contains(&backup_time) {
606 continue;
607 }
608 task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
609 tgt_store.remove_backup_dir(&info.backup_dir, false)?;
610 }
611 }
612
613 if skip_info.count > 0 {
614 task_log!(worker, "{}", skip_info);
615 }
616
617 Ok(())
618 }
619
620 pub async fn pull_store(
621 worker: &WorkerTask,
622 client: &HttpClient,
623 src_repo: &BackupRepository,
624 tgt_store: Arc<DataStore>,
625 delete: bool,
626 auth_id: Authid,
627 ) -> Result<(), Error> {
628 // explicit create shared lock to prevent GC on newly created chunks
629 let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
630
631 let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
632
633 let mut result = client
634 .get(&path, None)
635 .await
636 .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
637
638 let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
639
640 task_log!(worker, "found {} groups to sync", list.len());
641
642 list.sort_unstable_by(|a, b| {
643 let type_order = a.backup_type.cmp(&b.backup_type);
644 if type_order == std::cmp::Ordering::Equal {
645 a.backup_id.cmp(&b.backup_id)
646 } else {
647 type_order
648 }
649 });
650
651 let mut errors = false;
652
653 let mut new_groups = std::collections::HashSet::new();
654 for item in list.iter() {
655 new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
656 }
657
658 let mut progress = StoreProgress::new(list.len() as u64);
659
660 for (done, item) in list.into_iter().enumerate() {
661 progress.done_groups = done as u64;
662 progress.done_snapshots = 0;
663 progress.group_snapshots = 0;
664
665 let group = BackupGroup::new(&item.backup_type, &item.backup_id);
666
667 let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
668 Ok(result) => result,
669 Err(err) => {
670 task_log!(
671 worker,
672 "sync group {}/{} failed - group lock failed: {}",
673 item.backup_type, item.backup_id, err
674 );
675 errors = true; // do not stop here, instead continue
676 continue;
677 }
678 };
679
680 // permission check
681 if auth_id != owner {
682 // only the owner is allowed to create additional snapshots
683 task_log!(
684 worker,
685 "sync group {}/{} failed - owner check failed ({} != {})",
686 item.backup_type, item.backup_id, auth_id, owner
687 );
688 errors = true; // do not stop here, instead continue
689 } else if let Err(err) = pull_group(
690 worker,
691 client,
692 src_repo,
693 tgt_store.clone(),
694 &group,
695 delete,
696 &mut progress,
697 )
698 .await
699 {
700 task_log!(
701 worker,
702 "sync group {}/{} failed - {}",
703 item.backup_type, item.backup_id, err,
704 );
705 errors = true; // do not stop here, instead continue
706 }
707 }
708
709 if delete {
710 let result: Result<(), Error> = proxmox_lang::try_block!({
711 let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
712 for local_group in local_groups {
713 if new_groups.contains(&local_group) {
714 continue;
715 }
716 task_log!(
717 worker,
718 "delete vanished group '{}/{}'",
719 local_group.backup_type(),
720 local_group.backup_id()
721 );
722 if let Err(err) = tgt_store.remove_backup_group(&local_group) {
723 task_log!(worker, "{}", err.to_string());
724 errors = true;
725 }
726 }
727 Ok(())
728 });
729 if let Err(err) = result {
730 task_log!(worker, "error during cleanup: {}", err);
731 errors = true;
732 };
733 }
734
735 if errors {
736 bail!("sync failed with some errors.");
737 }
738
739 Ok(())
740 }