//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::io::{Seek, SeekFrom};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use pbs_config::CachedUserInfo;
use serde_json::json;

use proxmox_router::HttpError;
use proxmox_sys::task_log;

use pbs_api_types::{
    privs_to_priv_names, Authid, BackupNamespace, DatastoreWithNamespace, GroupFilter,
    GroupListItem, NamespaceListItem, Operation, RateLimitConfig, Remote, SnapshotListItem,
    MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY,
};

use pbs_client::{
    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
use proxmox_rest_server::WorkerTask;

use crate::tools::parallel_handler::ParallelHandler;

/// Parameters for a pull operation.
pub struct PullParameters {
    /// Remote that is pulled from
    remote: Remote,
    /// Full specification of remote datastore
    source: BackupRepository,
    /// Local store that is pulled into
    store: Arc<DataStore>,
    /// Remote namespace
    remote_ns: BackupNamespace,
    /// Local namespace (anchor)
    ns: BackupNamespace,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Option<Vec<GroupFilter>>,
    /// Rate limits for all transfers from `remote`
    limit: RateLimitConfig,
}

impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    ///
    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
    /// [BackupRepository] with `remote_store`.
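    ///
    /// # Example
    ///
    /// Illustrative sketch only - the store/remote names and the `Authid`/`RateLimitConfig`
    /// constructors used here are assumptions, not taken from this crate's tests:
    ///
    /// ```ignore
    /// let params = PullParameters::new(
    ///     "local-store",
    ///     BackupNamespace::root(),
    ///     "my-remote",
    ///     "remote-store",
    ///     BackupNamespace::root(),
    ///     Authid::root_auth_id().clone(),
    ///     Some(true),                 // remove_vanished
    ///     None,                       // max_depth: full recursion
    ///     None,                       // group_filter: sync all groups
    ///     RateLimitConfig::default(),
    /// )?;
    /// ```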
    pub fn new(
        store: &str,
        ns: BackupNamespace,
        remote: &str,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
    ) -> Result<Self, Error> {
        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;

        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        }

        let (remote_config, _digest) = pbs_config::remote::config()?;
        let remote: Remote = remote_config.lookup("remote", remote)?;

        let remove_vanished = remove_vanished.unwrap_or(false);

        let source = BackupRepository::new(
            Some(remote.config.auth_id.clone()),
            Some(remote.config.host.clone()),
            remote.config.port,
            remote_store.to_string(),
        );

        Ok(Self {
            remote,
            remote_ns,
            ns,
            source,
            store,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            limit,
        })
    }

    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
    pub async fn client(&self) -> Result<HttpClient, Error> {
        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
    }

    /// Returns the [DatastoreWithNamespace] for the given namespace (or the local namespace anchor).
    pub fn store_with_ns(&self, ns: BackupNamespace) -> DatastoreWithNamespace {
        DatastoreWithNamespace {
            store: self.store.name().to_string(),
            ns,
        }
    }
}

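/// Pulls all chunks referenced by `index` from the remote into `target`.
///
/// Chunks already recorded in `downloaded_chunks` (shared across the archives of one snapshot)
/// or already present in the local chunk store are skipped; everything else is downloaded,
/// verified and inserted into the target datastore.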
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

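    // Verification and chunk insertion happen in a small worker-thread pool so the async
    // download stream below (up to 20 in-flight requests) is not blocked on CPU-bound work.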
    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}

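/// Downloads the manifest blob of a remote snapshot into `filename` and returns the (rewound)
/// temporary file handle for further processing.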
async fn download_manifest(
    reader: &BackupReader,
    filename: &std::path::Path,
) -> Result<std::fs::File, Error> {
    let mut tmp_manifest_file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .read(true)
        .open(&filename)?;

    reader
        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
        .await?;

    tmp_manifest_file.seek(SeekFrom::Start(0))?;

    Ok(tmp_manifest_file)
}

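/// Checks that a downloaded file matches the size and checksum recorded in the manifest's
/// [FileInfo] entry.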
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}

/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Create tmp file for archive
/// - Download archive file into tmp file
/// - Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive(
    worker: &WorkerTask,
    reader: &BackupReader,
    chunk_reader: &mut RemoteChunkReader,
    snapshot: &pbs_datastore::BackupDir,
    archive_info: &FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    task_log!(worker, "sync archive {}", archive_name);

    let mut tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&tmp_path)?;

    reader.download(archive_name, &mut tmpfile).await?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(
                worker,
                chunk_reader.clone(),
                snapshot.datastore().clone(),
                index,
                downloaded_chunks,
            )
            .await?;
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(
                worker,
                chunk_reader.clone(),
                snapshot.datastore().clone(),
                index,
                downloaded_chunks,
            )
            .await?;
        }
        ArchiveType::Blob => {
            tmpfile.seek(SeekFrom::Start(0))?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}

// Note: The client.log.blob is uploaded after the backup, so it is
// not mentioned in the manifest.
async fn try_client_log_download(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    path: &std::path::Path,
) -> Result<(), Error> {
    let mut tmp_path = path.to_owned();
    tmp_path.set_extension("tmp");

    let tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&tmp_path)?;

    // Note: be silent if there is no log - only log successful download
    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
        if let Err(err) = std::fs::rename(&tmp_path, &path) {
            bail!("Atomic rename file {:?} failed - {}", path, err);
        }
        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
    }

    Ok(())
}

/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    snapshot: &pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            snapshot.dir(),
                        );
                        return Ok(());
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        }
    };
    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        let mut chunk_reader = RemoteChunkReader::new(
            reader.clone(),
            None,
            item.chunk_crypt_mode(),
            HashMap::new(),
        );

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(())
}

/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
/// pointing to the local datastore and target namespace.
async fn pull_snapshot_from(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    snapshot: &pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                snapshot.backup_ns(),
                snapshot.as_ref(),
                true,
            ) {
                task_log!(worker, "cleanup error - {}", cleanup_err);
            }
            return Err(err);
        }
        task_log!(worker, "sync snapshot {} done", snapshot.dir());
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
        task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
    }

    Ok(())
}

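/// Bookkeeping for snapshots that are skipped because they are older than the newest snapshot
/// already present locally; only used to emit a single summary log line per group.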
struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
}

impl SkipInfo {
    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?
        )
    }
}

/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- Recreate client/BackupReader
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `remote_ns` is the root namespace, only the group type and ID will be
/// sent to the remote when querying snapshots. This allows us to interact with old remotes that
/// don't have namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    params: &PullParameters,
    group: &pbs_api_types::BackupGroup,
    remote_ns: BackupNamespace,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let path = format!(
        "api2/json/admin/datastore/{}/snapshots",
        params.source.store()
    );

    let mut args = json!({
        "backup-type": group.ty,
        "backup-id": group.id,
    });

    if !remote_ns.is_root() {
        args["ns"] = serde_json::to_value(&remote_ns)?;
    }

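    // Derive the local target namespace by swapping the configured remote prefix for the local
    // anchor, e.g. remote "a/b/c" with remote_ns "a" and local ns "x" maps to "x/b/c" (example
    // values for illustration only).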
    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));

    client.login().await?; // make sure auth is complete

    let fingerprint = client.fingerprint();

    let last_sync = params.store.last_successful_backup(&target_ns, group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut skip_info = SkipInfo {
        oldest: i64::MAX,
        newest: i64::MIN,
        count: 0,
    };

    for (pos, item) in list.into_iter().enumerate() {
        let snapshot = item.backup;

        // in-progress backups can't be synced
        if item.size.is_none() {
            task_log!(
                worker,
                "skipping snapshot {} - in-progress backup",
                snapshot
            );
            continue;
        }

        remote_snapshots.insert(snapshot.time);

        if let Some(last_sync_time) = last_sync {
            if last_sync_time > snapshot.time {
                skip_info.update(snapshot.time);
                continue;
            }
        }

        // get updated auth_info (new tickets)
        let auth_info = client.login().await?;

        let options =
            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
                .rate_limit(params.limit.clone());

        let new_client = HttpClient::new(
            params.source.host(),
            params.source.port(),
            params.source.auth_id(),
            options,
        )?;

        let reader = BackupReader::start(
            new_client,
            None,
            params.source.store(),
            &remote_ns,
            &snapshot,
            true,
        )
        .await?;

        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;

        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if params.remove_vanished {
        let group = params.store.backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if remote_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    if skip_info.count > 0 {
        task_log!(worker, "{}", skip_info);
    }

    Ok(())
}

// will modify params if switching to backwards mode for lack of NS support on remote end
async fn query_namespaces(
    worker: &WorkerTask,
    client: &HttpClient,
    params: &mut PullParameters,
) -> Result<Vec<BackupNamespace>, Error> {
    let path = format!(
        "api2/json/admin/datastore/{}/namespace",
        params.source.store()
    );
    let mut data = json!({});
    if let Some(max_depth) = params.max_depth {
        data["max-depth"] = json!(max_depth);
    }

    if !params.remote_ns.is_root() {
        data["parent"] = json!(params.remote_ns);
    }

    let mut result = match client.get(&path, Some(data)).await {
        Ok(res) => res,
        Err(err) => match err.downcast_ref::<HttpError>() {
            Some(HttpError { code, message }) => match *code {
                StatusCode::NOT_FOUND => {
                    if params.remote_ns.is_root() && params.max_depth.is_none() {
                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                        params.max_depth = Some(0);
                    } else {
                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                    }

                    return Ok(vec![params.remote_ns.clone()]);
                }
                _ => {
                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
                }
            },
            None => {
                bail!("Querying namespaces failed - {err}");
            }
        },
    };

    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;

    // parents first
    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));

    Ok(list.iter().map(|item| item.ns.clone()).collect())
}

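/// Checks that `owner` has at least one of the privileges in `privs` on the ACL path of the given
/// datastore/namespace pair, reporting the missing privilege names on failure.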
fn check_ns_privs(
    store_with_ns: &DatastoreWithNamespace,
    owner: &Authid,
    privs: u64,
) -> Result<(), Error> {
    let user_info = CachedUserInfo::new()?;

    // TODO re-sync with API, maybe find common place?

    let path = &store_with_ns.acl_path();
    let user_privs = user_info.lookup_privs(owner, path);

    if (user_privs & privs) == 0 {
        let priv_names = privs_to_priv_names(privs).join("|");
        let path = path.join("/");
        bail!("privilege(s) {priv_names} missing on /{path}");
    }
    Ok(())
}

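/// Ensures the sync target namespace exists, creating the deepest missing level if the owner has
/// PRIV_DATASTORE_MODIFY on its parent, and verifies PRIV_DATASTORE_BACKUP on the namespace
/// itself. Returns whether a new namespace was created.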
fn check_and_create_ns(
    params: &PullParameters,
    store_with_ns: &DatastoreWithNamespace,
) -> Result<bool, Error> {
    let ns = &store_with_ns.ns;
    let mut created = false;

    if !ns.is_root() && !params.store.namespace_path(&ns).exists() {
        let mut parent = ns.clone();
        let name = parent.pop();

        let parent = params.store_with_ns(parent);

        if let Err(err) = check_ns_privs(&parent, &params.owner, PRIV_DATASTORE_MODIFY) {
            bail!(
                "Not allowed to create namespace {} - {}",
                store_with_ns,
                err,
            );
        }
        if let Some(name) = name {
            if let Err(err) = params.store.create_namespace(&parent.ns, name) {
                bail!(
                    "sync namespace {} failed - namespace creation failed: {}",
                    &store_with_ns,
                    err
                );
            }
            created = true;
        } else {
            bail!(
                "sync namespace {} failed - namespace creation failed - couldn't determine parent namespace",
                &store_with_ns,
            );
        }
    }

    // TODO re-sync with API, maybe find common place?
    if let Err(err) = check_ns_privs(&store_with_ns, &params.owner, PRIV_DATASTORE_BACKUP) {
        bail!("sync namespace {} failed - {}", &store_with_ns, err);
    }

    Ok(created)
}

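/// Removes a local namespace (recursively) after checking that the owner may modify its parent;
/// returns whether everything could be removed (false if protected snapshots remain).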
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    let parent = local_ns.clone().parent();
    check_ns_privs(
        &params.store_with_ns(parent),
        &params.owner,
        PRIV_DATASTORE_MODIFY,
    )?;
    params.store.remove_namespace_recursive(local_ns, true)
}

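/// Removes local namespaces that are visible to `params.owner` but were not part of this sync run
/// (and have thus vanished on the remote); returns whether any removal failed.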
fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<bool, Error> {
    let mut errors = false;
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .store
        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let store_with_ns = params.store_with_ns(ns.clone());
            let user_privs = user_info.lookup_privs(&params.owner, &store_with_ns.acl_path());
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }
        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok(errors)
}

/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    mut params: PullParameters,
) -> Result<(), Error> {
    // explicitly create a shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
    } else {
        query_namespaces(worker, client, &mut params).await?
    };
    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());

    for namespace in namespaces {
        let source_store_ns = DatastoreWithNamespace {
            store: params.source.store().to_owned(),
            ns: namespace.clone(),
        };
        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
        let target_store_ns = params.store_with_ns(target_ns.clone());

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns,
            target_store_ns
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_store_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns,
                    target_store_ns,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
            Ok((ns_progress, ns_errors)) => {
                errors |= ns_errors;

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}

/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters
///   which are not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub async fn pull_ns(
    worker: &WorkerTask,
    client: &HttpClient,
    params: &PullParameters,
    source_ns: BackupNamespace,
    target_ns: BackupNamespace,
) -> Result<(StoreProgress, bool), Error> {
    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());

    let args = if !source_ns.is_root() {
        Some(json!({
            "ns": source_ns,
        }))
    } else {
        None
    };

    let mut result = client
        .get(&path, args)
        .await
        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    let total_count = list.len();
    list.sort_unstable_by(|a, b| {
        let type_order = a.backup.ty.cmp(&b.backup.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.backup.id.cmp(&b.backup.id)
        } else {
            type_order
        }
    });

    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
        filters.iter().any(|filter| group.matches(filter))
    };

    // Get groups with target NS set
    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();

    let list = if let Some(ref group_filter) = &params.group_filter {
        let unfiltered_count = list.len();
        let list: Vec<pbs_api_types::BackupGroup> = list
            .into_iter()
            .filter(|group| apply_filters(group, group_filter))
            .collect();
        task_log!(
            worker,
            "found {} groups to sync (out of {} total)",
            list.len(),
            unfiltered_count
        );
        list
    } else {
        task_log!(worker, "found {} groups to sync", total_count);
        list
    };

    let mut errors = false;

    let mut new_groups = std::collections::HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true; // do not stop here, instead continue
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(
            worker,
            client,
            params,
            &group,
            source_ns.clone(),
            &mut progress,
        )
        .await
        {
            task_log!(worker, "sync group {} failed - {}", &group, err,);
            errors = true; // do not stop here, instead continue
        }
    }

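    // Remove vanished groups: only groups owned by `params.owner` and (if configured) matching
    // the group filters are considered - everything else is left untouched.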
    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if let Some(ref group_filter) = &params.group_filter {
                    if !apply_filters(local_group, group_filter) {
                        continue;
                    }
                }
                task_log!(worker, "delete vanished group '{local_group}'",);
                match params.store.remove_backup_group(&target_ns, local_group) {
                    Ok(true) => {}
                    Ok(false) => {
                        task_log!(
                            worker,
                            "kept some protected snapshots of group '{}'",
                            local_group
                        );
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, errors))
}