//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use proxmox_human_byte::HumanByte;
use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::{task_log, task_warn};
use serde_json::json;

use pbs_api_types::{
    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{
    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
};
use pbs_tools::sha::sha256;

use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;

struct RemoteReader {
    backup_reader: Arc<BackupReader>,
    dir: BackupDir,
}

struct LocalReader {
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    path: PathBuf,
    datastore: Arc<DataStore>,
}

pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    repo: BackupRepository,
    ns: BackupNamespace,
    client: HttpClient,
}

pub(crate) struct LocalSource {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

#[derive(Default)]
pub(crate) struct PullStats {
    pub(crate) chunk_count: usize,
    pub(crate) bytes: usize,
    pub(crate) elapsed: Duration,
}

impl PullStats {
    fn add(&mut self, rhs: PullStats) {
        self.chunk_count += rhs.chunk_count;
        self.bytes += rhs.bytes;
        self.elapsed += rhs.elapsed;
    }
}

#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source.
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;
    fn get_ns(&self) -> BackupNamespace;
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}

#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }
        self.client.login().await?;

        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;
        let mut result =
            self.client.get(&path, args).await.map_err(|err| {
                format_err!("Failed to retrieve backup groups from remote - {}", err)
            })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        if !namespace.is_root() {
            args["ns"] = serde_json::to_value(namespace)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.repo.store()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}

#[async_trait::async_trait]
impl PullSource for LocalSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        ListNamespacesRecursive::new_max_depth(
            self.store.clone(),
            self.ns.clone(),
            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
        )?
        .collect()
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        Ok(ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0,
            Some(PRIV_DATASTORE_READ),
            Some(PRIV_DATASTORE_BACKUP),
            Some(owner),
        )?
        .filter_map(Result::ok)
        .map(|backup_group| backup_group.group().clone())
        .collect::<Vec<pbs_api_types::BackupGroup>>())
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        Ok(self
            .store
            .backup_group(namespace.clone(), group.clone())
            .iter_snapshots()?
            .filter_map(Result::ok)
            .map(|snapshot| snapshot.dir().to_owned())
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.store.name()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
            &dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(dir_lock)),
            path: dir.full_path(),
            datastore: dir.datastore().clone(),
        }))
    }
}

#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading the client
/// log, and checking whether chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}

#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        };
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}

#[async_trait::async_trait]
impl PullReader for LocalReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(LocalChunkReader::new(
            self.datastore.clone(),
            None,
            crypt_mode,
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        _worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let mut from_path = self.path.clone();
        from_path.push(filename);
        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        _to_path: &Path,
        _worker: &WorkerTask,
    ) -> Result<(), Error> {
        Ok(())
    }

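    // A local source pulling into the very same datastore already has all chunks in place,
    // so the chunk sync step can be skipped entirely in that case.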
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }
}

/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Vec<GroupFilter>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}

impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    pub(crate) fn new(
        store: &str,
        ns: BackupNamespace,
        remote: Option<&str>,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
        transfer_last: Option<usize>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        };
        let remove_vanished = remove_vanished.unwrap_or(false);

        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
            let (remote_config, _digest) = pbs_config::remote::config()?;
            let remote: Remote = remote_config.lookup("remote", remote)?;

            let repo = BackupRepository::new(
                Some(remote.config.auth_id.clone()),
                Some(remote.config.host.clone()),
                remote.config.port,
                remote_store.to_string(),
            );
            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
            Arc::new(RemoteSource {
                repo,
                ns: remote_ns,
                client,
            })
        } else {
            Arc::new(LocalSource {
                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
                ns: remote_ns,
            })
        };
        let target = PullTarget {
            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
            ns,
        };

        let group_filter = group_filter.unwrap_or_default();

        Ok(Self {
            source,
            target,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            transfer_last,
        })
    }
}

async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));
    let chunk_count = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let chunk_count = Arc::clone(&chunk_count);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);
                chunk_count.fetch_add(1, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?;

    let bytes = bytes.load(Ordering::SeqCst);
    let chunk_count = chunk_count.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} ({}/s)",
        HumanByte::from(bytes),
        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
    );

    Ok(PullStats {
        chunk_count,
        bytes,
        elapsed,
    })
}

fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}

/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load archive file into tmp file
/// -- Load file into tmp file
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    let mut pull_stats = PullStats::default();

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::Blob => {
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(pull_stats)
}

/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let mut pull_stats = PullStats::default();
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        return Ok(pull_stats);
    }

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(pull_stats); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        let stats = pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
        pull_stats.add(stats);
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };
    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(pull_stats)
}

/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    let pull_stats = if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            Err(err) => {
                if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                    snapshot.backup_ns(),
                    snapshot.as_ref(),
                    true,
                ) {
                    task_log!(worker, "cleanup error - {}", cleanup_err);
                }
                return Err(err);
            }
            Ok(pull_stats) => {
                task_log!(worker, "sync snapshot {} done", snapshot.dir());
                pull_stats
            }
        }
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
    };

    Ok(pull_stats)
}

#[derive(PartialEq, Eq)]
enum SkipReason {
    AlreadySynced,
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                SkipReason::AlreadySynced => "older than the newest local snapshot",
                SkipReason::TransferLast => "due to transfer-last",
            }
        )
    }
}

struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
    skip_reason: SkipReason,
}

impl SkipInfo {
    fn new(skip_reason: SkipReason) -> Self {
        SkipInfo {
            oldest: i64::MAX,
            newest: i64::MIN,
            count: 0,
            skip_reason,
        }
    }

    fn reset(&mut self) {
        self.count = 0;
        self.oldest = i64::MAX;
        self.newest = i64::MIN;
    }

    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason,
        )
    }
}

/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<PullStats, Error> {
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

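    // transfer-last: only the newest `transfer_last` snapshots are considered; snapshots before
    // the cutoff index are skipped below, except one matching the last local sync time.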
    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();

    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
                return true;
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut pull_stats = PullStats::default();

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        let stats = result?; // stop on error
        pull_stats.add(stats);
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    Ok(pull_stats)
}

fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
    let mut created = false;
    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);

    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
            .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;

        let name = match ns.components().last() {
            Some(name) => name.to_owned(),
            None => {
                bail!("Failed to determine last component of namespace.");
            }
        };

        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
            bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
        }
        created = true;
    }

    check_ns_privs(
        params.target.store.name(),
        ns,
        &params.owner,
        PRIV_DATASTORE_BACKUP,
    )
    .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;

    Ok(created)
}

fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}

fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<bool, Error> {
    let mut errors = false;
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.target.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }
        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok(errors)
}

/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<PullStats, Error> {
    // explicitly create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

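    // make sure the deepest source namespace still fits below the target namespace without
    // exceeding the maximum allowed namespace depth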
    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());
    let mut pull_stats = PullStats::default();

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
                errors |= ns_errors;

                pull_stats.add(ns_pull_stats);

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(pull_stats)
}

/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
///   not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, PullStats, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let unfiltered_count = list.len();
    let list: Vec<BackupGroup> = list
        .into_iter()
        .filter(|group| group.apply_filters(&params.group_filter))
        .collect();
    task_log!(
        worker,
        "found {} groups to sync (out of {} total)",
        list.len(),
        unfiltered_count
    );

    let mut errors = false;

    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);
    let mut pull_stats = PullStats::default();

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else {
            match pull_group(worker, params, namespace, &group, &mut progress).await {
                Ok(stats) => pull_stats.add(stats),
                Err(err) => {
                    task_log!(worker, "sync group {} failed - {}", &group, err,);
                    errors = true; // do not stop here, instead continue
                }
            }
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if !local_group.apply_filters(&params.group_filter) {
                    continue;
                }
                task_log!(worker, "delete vanished group '{local_group}'",);
                let delete_stats_result = params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group);

                match delete_stats_result {
                    Ok(stats) => {
                        if !stats.all_removed() {
                            task_log!(
                                worker,
                                "kept some protected snapshots of group '{local_group}'",
                            );
                        }
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, pull_stats, errors))
}