]> git.proxmox.com Git - proxmox-backup.git/blame - src/server/pull.rs
use new proxmox-async crate
[proxmox-backup.git] / src / server / pull.rs
CommitLineData
07ad6470
DM
1//! Sync datastore from remote server
2
e2956c60 3use std::collections::{HashMap, HashSet};
07ad6470 4use std::convert::TryFrom;
07ad6470 5use std::io::{Seek, SeekFrom};
54417086 6use std::sync::atomic::{AtomicUsize, Ordering};
e2956c60
FG
7use std::sync::{Arc, Mutex};
8use std::time::SystemTime;
07ad6470 9
c23192d3
WB
10use anyhow::{bail, format_err, Error};
11use serde_json::json;
6ef1b649 12use http::StatusCode;
c23192d3 13
6ef1b649 14use proxmox_router::HttpError;
d5790a9f 15use proxmox_sys::task_log;
c23192d3 16
71e53463
FG
17use pbs_api_types::{Authid, GroupFilter, GroupListItem, Remote, SnapshotListItem};
18
19use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
ea584a75
WB
20use pbs_datastore::data_blob::DataBlob;
21use pbs_datastore::dynamic_index::DynamicIndexReader;
22use pbs_datastore::fixed_index::FixedIndexReader;
23use pbs_datastore::index::IndexFile;
24use pbs_datastore::manifest::{
25 CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
26};
ba0ccc59 27use pbs_tools::sha::sha256;
2b7f8dd5 28use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
b9700a9f 29use proxmox_rest_server::WorkerTask;
c23192d3 30
6d5d305d 31use crate::tools::ParallelHandler;
07ad6470
DM
32
33// fixme: implement filters
34// fixme: delete vanished groups
35// Todo: correctly lock backup groups
36
6e9e6c7a
FG
/// Parameters describing one pull (sync) job: the remote source and the
/// local target datastore, plus ownership and filtering options.
pub struct PullParameters {
    // remote definition (host/port/auth) looked up from pbs_config in `new()`
    remote: Remote,
    // repository on the remote side that serves as the sync source
    source: BackupRepository,
    // local target datastore that receives the pulled snapshots
    store: Arc<DataStore>,
    // auth id that owns the locally created backup groups/snapshots
    owner: Authid,
    // when true, groups/snapshots that vanished on the remote are deleted locally
    remove_vanished: bool,
    // optional group filters; only matching backup groups are synced (see pull_store)
    group_filter: Option<Vec<GroupFilter>>,
}
45
46impl PullParameters {
47 pub fn new(
48 store: &str,
49 remote: &str,
50 remote_store: &str,
51 owner: Authid,
52 remove_vanished: Option<bool>,
71e53463 53 group_filter: Option<Vec<GroupFilter>>,
6e9e6c7a
FG
54 ) -> Result<Self, Error> {
55 let store = DataStore::lookup_datastore(store)?;
56
57 let (remote_config, _digest) = pbs_config::remote::config()?;
58 let remote: Remote = remote_config.lookup("remote", remote)?;
59
61ef4ae8 60 let remove_vanished = remove_vanished.unwrap_or(false);
6e9e6c7a
FG
61
62 let source = BackupRepository::new(
63 Some(remote.config.auth_id.clone()),
64 Some(remote.config.host.clone()),
65 remote.config.port,
66 remote_store.to_string(),
67 );
68
71e53463 69 Ok(Self { remote, source, store, owner, remove_vanished, group_filter })
6e9e6c7a
FG
70 }
71
72 pub async fn client(&self) -> Result<HttpClient, Error> {
73 crate::api2::config::remote::remote_client(&self.remote).await
74 }
75}
76
07ad6470 77async fn pull_index_chunks<I: IndexFile>(
998db639 78 worker: &WorkerTask,
73b2cc49 79 chunk_reader: RemoteChunkReader,
07ad6470
DM
80 target: Arc<DataStore>,
81 index: I,
e2956c60 82 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
07ad6470 83) -> Result<(), Error> {
73b2cc49 84 use futures::stream::{self, StreamExt, TryStreamExt};
07ad6470 85
998db639
DM
86 let start_time = SystemTime::now();
87
ebbe4958
DM
88 let stream = stream::iter(
89 (0..index.index_count())
90 .map(|pos| index.chunk_info(pos).unwrap())
91 .filter(|info| {
92 let mut guard = downloaded_chunks.lock().unwrap();
93 let done = guard.contains(&info.digest);
94 if !done {
95 // Note: We mark a chunk as downloaded before its actually downloaded
96 // to avoid duplicate downloads.
97 guard.insert(info.digest);
98 }
99 !done
e2956c60 100 }),
ebbe4958 101 );
07ad6470 102
a71bc08f 103 let target2 = target.clone();
54417086 104 let verify_pool = ParallelHandler::new(
e2956c60
FG
105 "sync chunk writer",
106 4,
107 move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
54417086
DM
108 // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
109 chunk.verify_unencrypted(size as usize, &digest)?;
a71bc08f 110 target2.insert_chunk(&chunk, &digest)?;
54417086 111 Ok(())
e2956c60 112 },
54417086
DM
113 );
114
115 let verify_and_write_channel = verify_pool.channel();
998db639 116
54417086
DM
117 let bytes = Arc::new(AtomicUsize::new(0));
118
119 stream
73b2cc49 120 .map(|info| {
73b2cc49
DM
121 let target = Arc::clone(&target);
122 let chunk_reader = chunk_reader.clone();
54417086
DM
123 let bytes = Arc::clone(&bytes);
124 let verify_and_write_channel = verify_and_write_channel.clone();
73b2cc49
DM
125
126 Ok::<_, Error>(async move {
9a1b24b6 127 let chunk_exists = proxmox_async::runtime::block_in_place(|| {
e2956c60
FG
128 target.cond_touch_chunk(&info.digest, false)
129 })?;
73b2cc49 130 if chunk_exists {
1ec0d70d 131 //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
73b2cc49
DM
132 return Ok::<_, Error>(());
133 }
1ec0d70d 134 //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
73b2cc49 135 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
54417086 136 let raw_size = chunk.raw_size() as usize;
73b2cc49 137
998db639 138 // decode, verify and write in a separate threads to maximize throughput
9a1b24b6 139 proxmox_async::runtime::block_in_place(|| {
e2956c60
FG
140 verify_and_write_channel.send((chunk, info.digest, info.size()))
141 })?;
54417086
DM
142
143 bytes.fetch_add(raw_size, Ordering::SeqCst);
998db639
DM
144
145 Ok(())
e2956c60 146 })
73b2cc49
DM
147 })
148 .try_buffer_unordered(20)
149 .try_for_each(|_res| futures::future::ok(()))
54417086 150 .await?;
998db639 151
54417086 152 drop(verify_and_write_channel);
998db639 153
54417086 154 verify_pool.complete()?;
998db639
DM
155
156 let elapsed = start_time.elapsed()?.as_secs_f64();
157
54417086
DM
158 let bytes = bytes.load(Ordering::SeqCst);
159
1ec0d70d
DM
160 task_log!(
161 worker,
e2956c60
FG
162 "downloaded {} bytes ({:.2} MiB/s)",
163 bytes,
164 (bytes as f64) / (1024.0 * 1024.0 * elapsed)
1ec0d70d 165 );
07ad6470
DM
166
167 Ok(())
168}
169
170async fn download_manifest(
171 reader: &BackupReader,
172 filename: &std::path::Path,
173) -> Result<std::fs::File, Error> {
3d571d55 174 let mut tmp_manifest_file = std::fs::OpenOptions::new()
07ad6470
DM
175 .write(true)
176 .create(true)
194da6f8 177 .truncate(true)
07ad6470
DM
178 .read(true)
179 .open(&filename)?;
180
e2956c60
FG
181 reader
182 .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
183 .await?;
07ad6470
DM
184
185 tmp_manifest_file.seek(SeekFrom::Start(0))?;
186
187 Ok(tmp_manifest_file)
188}
189
e2956c60 190fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
2ce15934 191 if size != info.size {
e2956c60
FG
192 bail!(
193 "wrong size for file '{}' ({} != {})",
194 info.filename,
195 info.size,
196 size
197 );
2ce15934
FG
198 }
199
200 if csum != &info.csum {
201 bail!("wrong checksum for file '{}'", info.filename);
202 }
203
204 Ok(())
205}
206
07ad6470
DM
207async fn pull_single_archive(
208 worker: &WorkerTask,
209 reader: &BackupReader,
210 chunk_reader: &mut RemoteChunkReader,
211 tgt_store: Arc<DataStore>,
212 snapshot: &BackupDir,
2ce15934 213 archive_info: &FileInfo,
e2956c60 214 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
07ad6470 215) -> Result<(), Error> {
2ce15934 216 let archive_name = &archive_info.filename;
07ad6470
DM
217 let mut path = tgt_store.base_path();
218 path.push(snapshot.relative_path());
219 path.push(archive_name);
220
221 let mut tmp_path = path.clone();
222 tmp_path.set_extension("tmp");
223
1ec0d70d
DM
224 task_log!(worker, "sync archive {}", archive_name);
225
3d571d55 226 let mut tmpfile = std::fs::OpenOptions::new()
07ad6470
DM
227 .write(true)
228 .create(true)
229 .read(true)
230 .open(&tmp_path)?;
231
3d571d55 232 reader.download(archive_name, &mut tmpfile).await?;
07ad6470
DM
233
234 match archive_type(archive_name)? {
235 ArchiveType::DynamicIndex => {
e2956c60
FG
236 let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
237 format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
238 })?;
2ce15934
FG
239 let (csum, size) = index.compute_csum();
240 verify_archive(archive_info, &csum, size)?;
07ad6470 241
e2956c60
FG
242 pull_index_chunks(
243 worker,
244 chunk_reader.clone(),
245 tgt_store.clone(),
246 index,
247 downloaded_chunks,
248 )
249 .await?;
07ad6470
DM
250 }
251 ArchiveType::FixedIndex => {
e2956c60
FG
252 let index = FixedIndexReader::new(tmpfile).map_err(|err| {
253 format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
254 })?;
2ce15934
FG
255 let (csum, size) = index.compute_csum();
256 verify_archive(archive_info, &csum, size)?;
07ad6470 257
e2956c60
FG
258 pull_index_chunks(
259 worker,
260 chunk_reader.clone(),
261 tgt_store.clone(),
262 index,
263 downloaded_chunks,
264 )
265 .await?;
07ad6470 266 }
2ce15934 267 ArchiveType::Blob => {
ba0ccc59
WB
268 tmpfile.seek(SeekFrom::Start(0))?;
269 let (csum, size) = sha256(&mut tmpfile)?;
2ce15934
FG
270 verify_archive(archive_info, &csum, size)?;
271 }
07ad6470
DM
272 }
273 if let Err(err) = std::fs::rename(&tmp_path, &path) {
274 bail!("Atomic rename file {:?} failed - {}", path, err);
275 }
276 Ok(())
277}
278
1610c45a
DM
279// Note: The client.log.blob is uploaded after the backup, so it is
280// not mentioned in the manifest.
281async fn try_client_log_download(
282 worker: &WorkerTask,
283 reader: Arc<BackupReader>,
284 path: &std::path::Path,
285) -> Result<(), Error> {
1610c45a
DM
286 let mut tmp_path = path.to_owned();
287 tmp_path.set_extension("tmp");
288
289 let tmpfile = std::fs::OpenOptions::new()
290 .write(true)
291 .create(true)
292 .read(true)
293 .open(&tmp_path)?;
294
add5861e 295 // Note: be silent if there is no log - only log successful download
3d571d55 296 if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
1610c45a
DM
297 if let Err(err) = std::fs::rename(&tmp_path, &path) {
298 bail!("Atomic rename file {:?} failed - {}", path, err);
299 }
1ec0d70d 300 task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
1610c45a
DM
301 }
302
303 Ok(())
304}
305
07ad6470
DM
/// Sync one snapshot from the remote into `tgt_store`.
///
/// Strategy: download the remote manifest to `<manifest>.tmp` first. If the
/// snapshot vanished on the remote (HTTP 404) it is skipped. If a local
/// manifest exists and is byte-identical to the downloaded one, nothing is
/// transferred (only a missing client log is fetched). Otherwise every file
/// listed in the manifest is re-verified locally and re-downloaded when its
/// checksum no longer matches; finally the new manifest is renamed into
/// place and stale files are cleaned up.
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = tgt_store.base_path();
    manifest_name.push(snapshot.relative_path());
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = tgt_store.base_path();
    client_log_name.push(snapshot.relative_path());
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    // a 404 means the snapshot was pruned on the remote since the group
    // listing was taken - treat that as "nothing to do", not an error
    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            snapshot
                        );
                        return Ok(());
                    }
                    _ => {
                        bail!("HTTP error {} - {}", code, message);
                    }
                },
                None => {
                    return Err(err);
                }
            };
        }
    };
    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    if manifest_name.exists() {
        // compare against the existing local manifest to detect a no-op sync
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!(
                    "unable to open local manifest {:?} - {}",
                    manifest_name,
                    err
                )
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!(
                "unable to read local manifest {:?} - {}",
                manifest_name,
                err
            )
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            // identical manifest: only fetch the client log if it is missing
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = tgt_store.base_path();
        path.push(snapshot.relative_path());
        path.push(&item.filename);

        // if the file already exists locally and still matches the (new)
        // manifest, skip the download entirely
        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        // fresh chunk reader per file, using the crypt mode of that file
        let mut chunk_reader = RemoteChunkReader::new(
            reader.clone(),
            None,
            item.chunk_crypt_mode(),
            HashMap::new(),
        );

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            tgt_store.clone(),
            snapshot,
            &item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    // all files are in place - activate the new manifest atomically
    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    // cleanup - remove stale files
    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;

    Ok(())
}
455
456pub async fn pull_snapshot_from(
457 worker: &WorkerTask,
458 reader: Arc<BackupReader>,
459 tgt_store: Arc<DataStore>,
460 snapshot: &BackupDir,
e2956c60 461 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
07ad6470 462) -> Result<(), Error> {
f23f7543 463 let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
07ad6470
DM
464
465 if is_new {
1ec0d70d 466 task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());
07ad6470 467
e2956c60
FG
468 if let Err(err) = pull_snapshot(
469 worker,
470 reader,
471 tgt_store.clone(),
472 &snapshot,
473 downloaded_chunks,
474 )
475 .await
476 {
c9756b40 477 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
1ec0d70d 478 task_log!(worker, "cleanup error - {}", cleanup_err);
07ad6470
DM
479 }
480 return Err(err);
481 }
1ec0d70d 482 task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
07ad6470 483 } else {
1ec0d70d 484 task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
e2956c60
FG
485 pull_snapshot(
486 worker,
487 reader,
488 tgt_store.clone(),
489 &snapshot,
490 downloaded_chunks,
491 )
492 .await?;
1ec0d70d 493 task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path());
07ad6470
DM
494 }
495
496 Ok(())
497}
498
d2354a16
DC
/// Accumulates information about snapshots skipped because they are older
/// than the newest local snapshot (see `SkipInfo::update` / `Display`).
struct SkipInfo {
    // oldest skipped backup time (epoch seconds); initialized to i64::MAX
    oldest: i64,
    // newest skipped backup time (epoch seconds); initialized to i64::MIN
    newest: i64,
    // number of skipped snapshots
    count: u64,
}
504
505impl SkipInfo {
506 fn update(&mut self, backup_time: i64) {
507 self.count += 1;
508
509 if backup_time < self.oldest {
510 self.oldest = backup_time;
511 }
512
513 if backup_time > self.newest {
514 self.newest = backup_time;
515 }
516 }
517
518 fn affected(&self) -> Result<String, Error> {
519 match self.count {
520 0 => Ok(String::new()),
6ef1b649 521 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
d2354a16
DC
522 _ => {
523 Ok(format!(
524 "{} .. {}",
6ef1b649
WB
525 proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
526 proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
d2354a16
DC
527 ))
528 }
529 }
530 }
531}
532
533impl std::fmt::Display for SkipInfo {
534 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
535 write!(
536 f,
537 "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
538 self.count,
539 self.affected().map_err(|_| std::fmt::Error)?
540 )
541 }
542}
543
07ad6470
DM
/// Sync all snapshots of one backup group from the remote.
///
/// Lists the remote snapshots (oldest first), skips in-progress backups and
/// snapshots older than the last successful local backup, and pulls the rest
/// one by one - each with a freshly authenticated reader connection. When
/// `remove_vanished` is set, local snapshots that no longer exist remotely
/// are deleted (protected ones are kept).
pub async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    params: &PullParameters,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());

    let args = json!({
        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),
    });

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    // sync oldest first so a partial run still leaves a consistent history
    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));

    client.login().await?; // make sure auth is complete

    let fingerprint = client.fingerprint();

    let last_sync = params.store.last_successful_backup(group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // shared dedup set of chunk digests, pre-sized for 64k entries
    // (NOTE(review): the original "16384 chunks (up to 65GB)" comment predates
    // the 1024 * 64 capacity below)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut skip_info = SkipInfo {
        oldest: i64::MAX,
        newest: i64::MIN,
        count: 0,
    };

    for (pos, item) in list.into_iter().enumerate() {
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

        // in-progress backups can't be synced
        if item.size.is_none() {
            task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot);
            continue;
        }

        let backup_time = snapshot.backup_time();

        remote_snapshots.insert(backup_time);

        // snapshots older than the newest local one are only counted, not pulled
        if let Some(last_sync_time) = last_sync {
            if last_sync_time > backup_time {
                skip_info.update(backup_time);
                continue;
            }
        }

        // get updated auth_info (new tickets)
        let auth_info = client.login().await?;

        let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());

        // dedicated client per snapshot: the reader protocol occupies the connection
        let new_client = HttpClient::new(
            params.source.host(),
            params.source.port(),
            params.source.auth_id(),
            options,
        )?;

        let reader = BackupReader::start(
            new_client,
            None,
            params.source.store(),
            snapshot.group().backup_type(),
            snapshot.group().backup_id(),
            backup_time,
            true,
        )
        .await?;

        let result = pull_snapshot_from(
            worker,
            reader,
            params.store.clone(),
            &snapshot,
            downloaded_chunks.clone(),
        )
        .await;

        // update progress before error handling so the log reflects this snapshot
        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if params.remove_vanished {
        // delete local snapshots that are gone on the remote (except protected ones)
        let local_list = group.list_backups(&params.store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) {
                continue;
            }
            if info.backup_dir.is_protected(params.store.base_path()) {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {:?} (protected)",
                    info.backup_dir.relative_path()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
            params.store.remove_backup_dir(&info.backup_dir, false)?;
        }
    }

    if skip_info.count > 0 {
        task_log!(worker, "{}", skip_info);
    }

    Ok(())
}
666
/// Sync a whole datastore from the remote described by `params`.
///
/// Fetches the remote group list, optionally filters it, and pulls each group
/// in deterministic (type, id) order. Group-level failures (lock, ownership,
/// pull errors) are logged and collected instead of aborting the run; the
/// function bails at the end if any occurred. With `remove_vanished`, local
/// groups absent from the remote (and matching the filter, if any) are removed.
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    params: &PullParameters,
) -> Result<(), Error> {
    // explicit create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;

    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());

    let mut result = client
        .get(&path, None)
        .await
        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    let total_count = list.len();
    // stable processing order: by backup type, then by id
    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)
        } else {
            type_order
        }
    });

    // a group is synced if it matches ANY of the configured filters
    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
        filters
            .iter()
            .any(|filter| group.matches(filter))
    };

    let list:Vec<BackupGroup> = list
        .into_iter()
        .map(|item| BackupGroup::new(item.backup_type, item.backup_id))
        .collect();

    let list = if let Some(ref group_filter) = &params.group_filter {
        let unfiltered_count = list.len();
        let list:Vec<BackupGroup> = list
            .into_iter()
            .filter(|group| {
                apply_filters(&group, group_filter)
            })
            .collect();
        task_log!(worker, "found {} groups to sync (out of {} total)", list.len(), unfiltered_count);
        list
    } else {
        task_log!(worker, "found {} groups to sync", total_count);
        list
    };

    let mut errors = false;

    // remember the (filtered) remote groups for the remove_vanished pass below
    let mut new_groups = std::collections::HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, &params.owner) {
            Ok(result) => result,
            Err(err) => {
                task_log!(
                    worker,
                    "sync group {} failed - group lock failed: {}",
                    &group, err
                );
                errors = true; // do not stop here, instead continue
                continue;
            }
        };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group, params.owner, owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(
            worker,
            client,
            params,
            &group,
            &mut progress,
        )
        .await
        {
            task_log!(
                worker,
                "sync group {} failed - {}",
                &group, err,
            );
            errors = true; // do not stop here, instead continue
        }
    }

    if params.remove_vanished {
        // best-effort cleanup of local groups that vanished on the remote;
        // failures here only mark the run as errored
        let result: Result<(), Error> = proxmox_lang::try_block!({
            let local_groups = BackupInfo::list_backup_groups(&params.store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) {
                    continue;
                }
                // only remove groups the filter would have synced
                if let Some(ref group_filter) = &params.group_filter {
                    if !apply_filters(&local_group, group_filter) {
                        continue;
                    }
                }
                task_log!(
                    worker,
                    "delete vanished group '{}/{}'",
                    local_group.backup_type(),
                    local_group.backup_id()
                );
                match params.store.remove_backup_group(&local_group) {
                    Ok(true) => {},
                    Ok(false) => {
                        task_log!(worker, "kept some protected snapshots of group '{}'", local_group);
                    },
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}