//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use proxmox_human_byte::HumanByte;
use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::{task_log, task_warn};
use serde_json::json;

use pbs_api_types::{
    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{
    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
};
use pbs_tools::sha::sha256;

use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;

struct RemoteReader {
    backup_reader: Arc<BackupReader>,
    dir: BackupDir,
}

struct LocalReader {
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    path: PathBuf,
    datastore: Arc<DataStore>,
}

pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    repo: BackupRepository,
    ns: BackupNamespace,
    client: HttpClient,
}

pub(crate) struct LocalSource {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
    pub(crate) groups: usize,
    pub(crate) snapshots: usize,
    pub(crate) namespaces: usize,
}

impl RemovedVanishedStats {
    fn add(&mut self, rhs: RemovedVanishedStats) {
        self.groups += rhs.groups;
        self.snapshots += rhs.snapshots;
        self.namespaces += rhs.namespaces;
    }
}

#[derive(Default)]
pub(crate) struct PullStats {
    pub(crate) chunk_count: usize,
    pub(crate) bytes: usize,
    pub(crate) elapsed: Duration,
    pub(crate) removed: Option<RemovedVanishedStats>,
}

impl From<RemovedVanishedStats> for PullStats {
    fn from(removed: RemovedVanishedStats) -> Self {
        Self {
            removed: Some(removed),
            ..Default::default()
        }
    }
}

impl PullStats {
    fn add(&mut self, rhs: PullStats) {
        self.chunk_count += rhs.chunk_count;
        self.bytes += rhs.bytes;
        self.elapsed += rhs.elapsed;

        if let Some(rhs_removed) = rhs.removed {
            if let Some(ref mut removed) = self.removed {
                removed.add(rhs_removed);
            } else {
                self.removed = Some(rhs_removed);
            }
        }
    }
}

#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;
    fn get_ns(&self) -> BackupNamespace;
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}

#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }
        self.client.login().await?;

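        // Note: older remotes may not expose the namespace API at all; a 404 response below is
        // treated as "remote without namespace support" and handled via the backwards-compat path.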
        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;
        let mut result =
            self.client.get(&path, args).await.map_err(|err| {
                format_err!("Failed to retrieve backup groups from remote - {}", err)
            })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        if !namespace.is_root() {
            args["ns"] = serde_json::to_value(namespace)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.repo.store()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}

#[async_trait::async_trait]
impl PullSource for LocalSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        ListNamespacesRecursive::new_max_depth(
            self.store.clone(),
            self.ns.clone(),
            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
        )?
        .collect()
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        Ok(ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0,
            Some(PRIV_DATASTORE_READ),
            Some(PRIV_DATASTORE_BACKUP),
            Some(owner),
        )?
        .filter_map(Result::ok)
        .map(|backup_group| backup_group.group().clone())
        .collect::<Vec<pbs_api_types::BackupGroup>>())
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        Ok(self
            .store
            .backup_group(namespace.clone(), group.clone())
            .iter_snapshots()?
            .filter_map(Result::ok)
            .map(|snapshot| snapshot.dir().to_owned())
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.store.name()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
            &dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(dir_lock)),
            path: dir.full_path(),
            datastore: dir.datastore().clone(),
        }))
    }
}

#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}

#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        };
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}

#[async_trait::async_trait]
impl PullReader for LocalReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(LocalChunkReader::new(
            self.datastore.clone(),
            None,
            crypt_mode,
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        _worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let mut from_path = self.path.clone();
        from_path.push(filename);
        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        _to_path: &Path,
        _worker: &WorkerTask,
    ) -> Result<(), Error> {
        Ok(())
    }

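    // When source and target are the very same datastore, the chunk store is shared and chunk
    // data does not need to be copied again - only indexes and blobs are (re-)written.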
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }
}

/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Vec<GroupFilter>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}

impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    pub(crate) fn new(
        store: &str,
        ns: BackupNamespace,
        remote: Option<&str>,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
        transfer_last: Option<usize>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        };
        let remove_vanished = remove_vanished.unwrap_or(false);

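        // A configured remote means pulling over HTTP(S); without one, the source is another
        // datastore on this host and the sync runs locally.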
        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
            let (remote_config, _digest) = pbs_config::remote::config()?;
            let remote: Remote = remote_config.lookup("remote", remote)?;

            let repo = BackupRepository::new(
                Some(remote.config.auth_id.clone()),
                Some(remote.config.host.clone()),
                remote.config.port,
                remote_store.to_string(),
            );
            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
            Arc::new(RemoteSource {
                repo,
                ns: remote_ns,
                client,
            })
        } else {
            Arc::new(LocalSource {
                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
                ns: remote_ns,
            })
        };
        let target = PullTarget {
            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
            ns,
        };

        let group_filter = group_filter.unwrap_or_default();

        Ok(Self {
            source,
            target,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            transfer_last,
        })
    }
}

async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
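    // decoding, verification and insertion run on a small thread pool so this CPU-bound work
    // does not stall the async download stream feeding it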
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));
    let chunk_count = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let chunk_count = Arc::clone(&chunk_count);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);
                chunk_count.fetch_add(1, Ordering::SeqCst);

                Ok(())
            })
        })
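        // download up to 20 chunks concurrently; the stream finishes once every chunk of the
        // index was either found locally or handed off to the verify/write pool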
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?;

    let bytes = bytes.load(Ordering::SeqCst);
    let chunk_count = chunk_count.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} ({}/s)",
        HumanByte::from(bytes),
        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
    );

    Ok(PullStats {
        chunk_count,
        bytes,
        elapsed,
        removed: None,
    })
}

fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}

/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load archive file into tmp file
/// -- Load file into tmp file
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    let mut pull_stats = PullStats::default();

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::Blob => {
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(pull_stats)
}

/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let mut pull_stats = PullStats::default();
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        return Ok(pull_stats);
    }

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(pull_stats); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        let stats = pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
        pull_stats.add(stats);
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };
    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(pull_stats)
}

/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    let pull_stats = if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            Err(err) => {
                if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                    snapshot.backup_ns(),
                    snapshot.as_ref(),
                    true,
                ) {
                    task_log!(worker, "cleanup error - {}", cleanup_err);
                }
                return Err(err);
            }
            Ok(pull_stats) => {
                task_log!(worker, "sync snapshot {} done", snapshot.dir());
                pull_stats
            }
        }
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
    };

    Ok(pull_stats)
}

#[derive(PartialEq, Eq)]
enum SkipReason {
    AlreadySynced,
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                SkipReason::AlreadySynced => "older than the newest local snapshot",
                SkipReason::TransferLast => "due to transfer-last",
            }
        )
    }
}

struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
    skip_reason: SkipReason,
}

impl SkipInfo {
    fn new(skip_reason: SkipReason) -> Self {
        SkipInfo {
            oldest: i64::MAX,
            newest: i64::MIN,
            count: 0,
            skip_reason,
        }
    }

    fn reset(&mut self) {
        self.count = 0;
        self.oldest = i64::MAX;
        self.newest = i64::MIN;
    }

    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason,
        )
    }
}

/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<PullStats, Error> {
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

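    // transfer-last: everything before `cutoff` in the time-sorted list is skipped, so that at
    // most the newest N snapshots are transferred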
    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();

    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

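    // Build the actual worklist: remember all source snapshots for the remove_vanished pass,
    // skip snapshots older than the newest local one, and apply the transfer-last cutoff.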
    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
                return true;
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut pull_stats = PullStats::default();

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        let stats = result?; // stop on error
        pull_stats.add(stats);
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
            pull_stats.add(PullStats::from(RemovedVanishedStats {
                snapshots: 1,
                groups: 0,
                namespaces: 0,
            }));
        }
    }

    Ok(pull_stats)
}

fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
    let mut created = false;
    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);

    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
            .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;

        let name = match ns.components().last() {
            Some(name) => name.to_owned(),
            None => {
                bail!("Failed to determine last component of namespace.");
            }
        };

        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
            bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
        }
        created = true;
    }

    check_ns_privs(
        params.target.store.name(),
        ns,
        &params.owner,
        PRIV_DATASTORE_BACKUP,
    )
    .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;

    Ok(created)
}

fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}

fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<(bool, RemovedVanishedStats), Error> {
    let mut errors = false;
    let mut removed_stats = RemovedVanishedStats::default();
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.target.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }
        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => {
                task_log!(worker, "Removed namespace {local_ns}");
                removed_stats.namespaces += 1;
            }
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok((errors, removed_stats))
}

/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<PullStats, Error> {
    // explicitly create a shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

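    // mapping the deepest source namespace onto the target namespace must still fit within the
    // global depth limit, otherwise the sync is refused up front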
    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());
    let mut pull_stats = PullStats::default();

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
                errors |= ns_errors;

                pull_stats.add(ns_pull_stats);

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        let (has_errors, stats) = check_and_remove_vanished_ns(worker, &params, synced_ns)?;
        errors |= has_errors;
        pull_stats.add(PullStats::from(stats));
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(pull_stats)
}

/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
/// not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, PullStats, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let unfiltered_count = list.len();
    let list: Vec<BackupGroup> = list
        .into_iter()
        .filter(|group| group.apply_filters(&params.group_filter))
        .collect();
    task_log!(
        worker,
        "found {} groups to sync (out of {} total)",
        list.len(),
        unfiltered_count
    );

    let mut errors = false;

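    // remember all groups picked for syncing; the remove_vanished pass below only deletes local
    // groups that are absent from this set (and match owner and group filters)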
    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);
    let mut pull_stats = PullStats::default();

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else {
            match pull_group(worker, params, namespace, &group, &mut progress).await {
                Ok(stats) => pull_stats.add(stats),
                Err(err) => {
                    task_log!(worker, "sync group {} failed - {}", &group, err,);
                    errors = true; // do not stop here, instead continue
                }
            }
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if !local_group.apply_filters(&params.group_filter) {
                    continue;
                }
                task_log!(worker, "delete vanished group '{local_group}'",);
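                // group removal reports how many snapshots were actually deleted; protected
                // snapshots survive, in which case the group itself is kept and not counted below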
                let delete_stats_result = params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group);

                match delete_stats_result {
                    Ok(stats) => {
                        if !stats.all_removed() {
                            task_log!(
                                worker,
                                "kept some protected snapshots of group '{local_group}'",
                            );
                            pull_stats.add(PullStats::from(RemovedVanishedStats {
                                snapshots: stats.removed_snapshots(),
                                groups: 0,
                                namespaces: 0,
                            }));
                        } else {
                            pull_stats.add(PullStats::from(RemovedVanishedStats {
                                snapshots: stats.removed_snapshots(),
                                groups: 1,
                                namespaces: 0,
                            }));
                        }
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, pull_stats, errors))
}