]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/pull.rs
gc: don't limit index listing to same filesystem
[proxmox-backup.git] / src / client / pull.rs
CommitLineData
07ad6470
DM
1//! Sync datastore from remote server
2
3use anyhow::{bail, format_err, Error};
4use serde_json::json;
5use std::convert::TryFrom;
ebbe4958
DM
6use std::sync::{Arc, Mutex};
7use std::collections::{HashSet, HashMap};
07ad6470 8use std::io::{Seek, SeekFrom};
998db639 9use std::time::SystemTime;
54417086 10use std::sync::atomic::{AtomicUsize, Ordering};
07ad6470 11
c1c4a18f 12use proxmox::api::error::{StatusCode, HttpError};
1bc1d81a 13use crate::{
54417086 14 tools::{ParallelHandler, compute_file_csum},
1bc1d81a
DM
15 server::WorkerTask,
16 backup::*,
17 api2::types::*,
18 client::*,
19};
07ad6470
DM
20
21
22// fixme: implement filters
23// fixme: delete vanished groups
24// Todo: correctly lock backup groups
25
26async fn pull_index_chunks<I: IndexFile>(
998db639 27 worker: &WorkerTask,
73b2cc49 28 chunk_reader: RemoteChunkReader,
07ad6470
DM
29 target: Arc<DataStore>,
30 index: I,
ebbe4958 31 downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
07ad6470
DM
32) -> Result<(), Error> {
33
73b2cc49 34 use futures::stream::{self, StreamExt, TryStreamExt};
07ad6470 35
998db639
DM
36 let start_time = SystemTime::now();
37
ebbe4958
DM
38 let stream = stream::iter(
39 (0..index.index_count())
40 .map(|pos| index.chunk_info(pos).unwrap())
41 .filter(|info| {
42 let mut guard = downloaded_chunks.lock().unwrap();
43 let done = guard.contains(&info.digest);
44 if !done {
45 // Note: We mark a chunk as downloaded before its actually downloaded
46 // to avoid duplicate downloads.
47 guard.insert(info.digest);
48 }
49 !done
50 })
51 );
07ad6470 52
a71bc08f 53 let target2 = target.clone();
54417086
DM
54 let verify_pool = ParallelHandler::new(
55 "sync chunk writer", 4,
a71bc08f 56 move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
54417086
DM
57 // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
58 chunk.verify_unencrypted(size as usize, &digest)?;
a71bc08f 59 target2.insert_chunk(&chunk, &digest)?;
54417086
DM
60 Ok(())
61 }
62 );
63
64 let verify_and_write_channel = verify_pool.channel();
998db639 65
54417086
DM
66 let bytes = Arc::new(AtomicUsize::new(0));
67
68 stream
73b2cc49 69 .map(|info| {
9e496ff6 70
73b2cc49
DM
71 let target = Arc::clone(&target);
72 let chunk_reader = chunk_reader.clone();
54417086
DM
73 let bytes = Arc::clone(&bytes);
74 let verify_and_write_channel = verify_and_write_channel.clone();
73b2cc49
DM
75
76 Ok::<_, Error>(async move {
77 let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
78 if chunk_exists {
79 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
80 return Ok::<_, Error>(());
81 }
82 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
83 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
54417086 84 let raw_size = chunk.raw_size() as usize;
73b2cc49 85
998db639 86 // decode, verify and write in a separate threads to maximize throughput
54417086
DM
87 crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?;
88
89 bytes.fetch_add(raw_size, Ordering::SeqCst);
998db639
DM
90
91 Ok(())
92 })
73b2cc49
DM
93 })
94 .try_buffer_unordered(20)
95 .try_for_each(|_res| futures::future::ok(()))
54417086 96 .await?;
998db639 97
54417086 98 drop(verify_and_write_channel);
998db639 99
54417086 100 verify_pool.complete()?;
998db639
DM
101
102 let elapsed = start_time.elapsed()?.as_secs_f64();
103
54417086
DM
104 let bytes = bytes.load(Ordering::SeqCst);
105
42ca9e91 106 worker.log(format!("downloaded {} bytes ({:.2} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));
07ad6470
DM
107
108 Ok(())
109}
110
111async fn download_manifest(
112 reader: &BackupReader,
113 filename: &std::path::Path,
114) -> Result<std::fs::File, Error> {
115
3d571d55 116 let mut tmp_manifest_file = std::fs::OpenOptions::new()
07ad6470
DM
117 .write(true)
118 .create(true)
194da6f8 119 .truncate(true)
07ad6470
DM
120 .read(true)
121 .open(&filename)?;
122
3d571d55 123 reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?;
07ad6470
DM
124
125 tmp_manifest_file.seek(SeekFrom::Start(0))?;
126
127 Ok(tmp_manifest_file)
128}
129
2ce15934
FG
130fn verify_archive(
131 info: &FileInfo,
132 csum: &[u8; 32],
133 size: u64,
134) -> Result<(), Error> {
135 if size != info.size {
136 bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size);
137 }
138
139 if csum != &info.csum {
140 bail!("wrong checksum for file '{}'", info.filename);
141 }
142
143 Ok(())
144}
145
07ad6470
DM
146async fn pull_single_archive(
147 worker: &WorkerTask,
148 reader: &BackupReader,
149 chunk_reader: &mut RemoteChunkReader,
150 tgt_store: Arc<DataStore>,
151 snapshot: &BackupDir,
2ce15934 152 archive_info: &FileInfo,
ebbe4958 153 downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
07ad6470
DM
154) -> Result<(), Error> {
155
2ce15934 156 let archive_name = &archive_info.filename;
07ad6470
DM
157 let mut path = tgt_store.base_path();
158 path.push(snapshot.relative_path());
159 path.push(archive_name);
160
161 let mut tmp_path = path.clone();
162 tmp_path.set_extension("tmp");
163
164 worker.log(format!("sync archive {}", archive_name));
3d571d55 165 let mut tmpfile = std::fs::OpenOptions::new()
07ad6470
DM
166 .write(true)
167 .create(true)
168 .read(true)
169 .open(&tmp_path)?;
170
3d571d55 171 reader.download(archive_name, &mut tmpfile).await?;
07ad6470
DM
172
173 match archive_type(archive_name)? {
174 ArchiveType::DynamicIndex => {
175 let index = DynamicIndexReader::new(tmpfile)
176 .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
2ce15934
FG
177 let (csum, size) = index.compute_csum();
178 verify_archive(archive_info, &csum, size)?;
07ad6470 179
ebbe4958 180 pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
07ad6470
DM
181 }
182 ArchiveType::FixedIndex => {
183 let index = FixedIndexReader::new(tmpfile)
184 .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
2ce15934
FG
185 let (csum, size) = index.compute_csum();
186 verify_archive(archive_info, &csum, size)?;
07ad6470 187
ebbe4958 188 pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
07ad6470 189 }
2ce15934
FG
190 ArchiveType::Blob => {
191 let (csum, size) = compute_file_csum(&mut tmpfile)?;
192 verify_archive(archive_info, &csum, size)?;
193 }
07ad6470
DM
194 }
195 if let Err(err) = std::fs::rename(&tmp_path, &path) {
196 bail!("Atomic rename file {:?} failed - {}", path, err);
197 }
198 Ok(())
199}
200
1610c45a
DM
201// Note: The client.log.blob is uploaded after the backup, so it is
202// not mentioned in the manifest.
203async fn try_client_log_download(
204 worker: &WorkerTask,
205 reader: Arc<BackupReader>,
206 path: &std::path::Path,
207) -> Result<(), Error> {
208
209 let mut tmp_path = path.to_owned();
210 tmp_path.set_extension("tmp");
211
212 let tmpfile = std::fs::OpenOptions::new()
213 .write(true)
214 .create(true)
215 .read(true)
216 .open(&tmp_path)?;
217
add5861e 218 // Note: be silent if there is no log - only log successful download
3d571d55 219 if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
1610c45a
DM
220 if let Err(err) = std::fs::rename(&tmp_path, &path) {
221 bail!("Atomic rename file {:?} failed - {}", path, err);
222 }
add5861e 223 worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
1610c45a
DM
224 }
225
226 Ok(())
227}
228
07ad6470
DM
229async fn pull_snapshot(
230 worker: &WorkerTask,
231 reader: Arc<BackupReader>,
232 tgt_store: Arc<DataStore>,
233 snapshot: &BackupDir,
ebbe4958 234 downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
07ad6470
DM
235) -> Result<(), Error> {
236
237 let mut manifest_name = tgt_store.base_path();
238 manifest_name.push(snapshot.relative_path());
239 manifest_name.push(MANIFEST_BLOB_NAME);
240
1610c45a
DM
241 let mut client_log_name = tgt_store.base_path();
242 client_log_name.push(snapshot.relative_path());
243 client_log_name.push(CLIENT_LOG_BLOB_NAME);
244
07ad6470
DM
245 let mut tmp_manifest_name = manifest_name.clone();
246 tmp_manifest_name.set_extension("tmp");
247
c1c4a18f
FG
248 let download_res = download_manifest(&reader, &tmp_manifest_name).await;
249 let mut tmp_manifest_file = match download_res {
250 Ok(manifest_file) => manifest_file,
251 Err(err) => {
252 match err.downcast_ref::<HttpError>() {
253 Some(HttpError { code, message }) => {
54aec2fa
WB
254 match *code {
255 StatusCode::NOT_FOUND => {
c1c4a18f
FG
256 worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot));
257 return Ok(());
258 },
259 _ => {
260 bail!("HTTP error {} - {}", code, message);
261 },
262 }
263 },
264 None => {
265 return Err(err);
266 },
267 };
268 },
269 };
39f18b30 270 let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
07ad6470
DM
271
272 if manifest_name.exists() {
273 let manifest_blob = proxmox::try_block!({
274 let mut manifest_file = std::fs::File::open(&manifest_name)
275 .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
276
39f18b30 277 let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
07ad6470
DM
278 Ok(manifest_blob)
279 }).map_err(|err: Error| {
280 format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
281 })?;
282
283 if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
1610c45a
DM
284 if !client_log_name.exists() {
285 try_client_log_download(worker, reader, &client_log_name).await?;
286 }
287 worker.log("no data changes");
e0085e66 288 let _ = std::fs::remove_file(&tmp_manifest_name);
07ad6470
DM
289 return Ok(()); // nothing changed
290 }
291 }
292
293 let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
294
07ad6470
DM
295 for item in manifest.files() {
296 let mut path = tgt_store.base_path();
297 path.push(snapshot.relative_path());
298 path.push(&item.filename);
299
300 if path.exists() {
301 match archive_type(&item.filename)? {
302 ArchiveType::DynamicIndex => {
303 let index = DynamicIndexReader::open(&path)?;
304 let (csum, size) = index.compute_csum();
305 match manifest.verify_file(&item.filename, &csum, size) {
306 Ok(_) => continue,
307 Err(err) => {
308 worker.log(format!("detected changed file {:?} - {}", path, err));
309 }
310 }
311 }
312 ArchiveType::FixedIndex => {
313 let index = FixedIndexReader::open(&path)?;
314 let (csum, size) = index.compute_csum();
315 match manifest.verify_file(&item.filename, &csum, size) {
316 Ok(_) => continue,
317 Err(err) => {
318 worker.log(format!("detected changed file {:?} - {}", path, err));
319 }
320 }
321 }
322 ArchiveType::Blob => {
323 let mut tmpfile = std::fs::File::open(&path)?;
324 let (csum, size) = compute_file_csum(&mut tmpfile)?;
325 match manifest.verify_file(&item.filename, &csum, size) {
326 Ok(_) => continue,
327 Err(err) => {
328 worker.log(format!("detected changed file {:?} - {}", path, err));
329 }
330 }
331 }
332 }
333 }
334
14f6c9cb
FG
335 let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new());
336
07ad6470
DM
337 pull_single_archive(
338 worker,
339 &reader,
340 &mut chunk_reader,
341 tgt_store.clone(),
342 snapshot,
2ce15934 343 &item,
ebbe4958 344 downloaded_chunks.clone(),
07ad6470
DM
345 ).await?;
346 }
347
348 if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
349 bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
350 }
351
1610c45a
DM
352 if !client_log_name.exists() {
353 try_client_log_download(worker, reader, &client_log_name).await?;
354 }
355
07ad6470
DM
356 // cleanup - remove stale files
357 tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
358
359 Ok(())
360}
361
362pub async fn pull_snapshot_from(
363 worker: &WorkerTask,
364 reader: Arc<BackupReader>,
365 tgt_store: Arc<DataStore>,
366 snapshot: &BackupDir,
ebbe4958 367 downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
07ad6470
DM
368) -> Result<(), Error> {
369
f23f7543 370 let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
07ad6470
DM
371
372 if is_new {
373 worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
374
ebbe4958 375 if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
c9756b40 376 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
07ad6470
DM
377 worker.log(format!("cleanup error - {}", cleanup_err));
378 }
379 return Err(err);
380 }
4856a218 381 worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
07ad6470
DM
382 } else {
383 worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
ebbe4958 384 pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
4856a218 385 worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
07ad6470
DM
386 }
387
388 Ok(())
389}
390
391pub async fn pull_group(
392 worker: &WorkerTask,
393 client: &HttpClient,
394 src_repo: &BackupRepository,
395 tgt_store: Arc<DataStore>,
396 group: &BackupGroup,
397 delete: bool,
7b8aa893 398 progress: Option<(usize, usize)>, // (groups_done, group_count)
07ad6470
DM
399) -> Result<(), Error> {
400
401 let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
402
403 let args = json!({
404 "backup-type": group.backup_type(),
405 "backup-id": group.backup_id(),
406 });
407
408 let mut result = client.get(&path, Some(args)).await?;
409 let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
410
411 list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
412
0081903f
DM
413 client.login().await?; // make sure auth is complete
414
07ad6470
DM
415 let fingerprint = client.fingerprint();
416
417 let last_sync = tgt_store.last_successful_backup(group)?;
418
419 let mut remote_snapshots = std::collections::HashSet::new();
420
7b8aa893
DM
421 let (per_start, per_group) = if let Some((groups_done, group_count)) = progress {
422 let per_start = (groups_done as f64)/(group_count as f64);
423 let per_group = 1.0/(group_count as f64);
424 (per_start, per_group)
425 } else {
426 (0.0, 1.0)
427 };
428
ebbe4958
DM
429 // start with 16384 chunks (up to 65GB)
430 let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));
431
7b8aa893
DM
432 let snapshot_count = list.len();
433
434 for (pos, item) in list.into_iter().enumerate() {
e0e5b442 435 let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
86f6f741
FG
436
437 // in-progress backups can't be synced
54aec2fa 438 if item.size.is_none() {
86f6f741
FG
439 worker.log(format!("skipping snapshot {} - in-progress backup", snapshot));
440 continue;
441 }
442
443 let backup_time = snapshot.backup_time();
444
07ad6470
DM
445 remote_snapshots.insert(backup_time);
446
447 if let Some(last_sync_time) = last_sync {
448 if last_sync_time > backup_time { continue; }
449 }
450
0081903f
DM
451 // get updated auth_info (new tickets)
452 let auth_info = client.login().await?;
453
07ad6470
DM
454 let options = HttpClientOptions::new()
455 .password(Some(auth_info.ticket.clone()))
456 .fingerprint(fingerprint.clone());
457
34aa8e13 458 let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.auth_id(), options)?;
07ad6470
DM
459
460 let reader = BackupReader::start(
461 new_client,
462 None,
463 src_repo.store(),
86f6f741
FG
464 snapshot.group().backup_type(),
465 snapshot.group().backup_id(),
07ad6470
DM
466 backup_time,
467 true,
468 ).await?;
469
7b8aa893
DM
470 let result = pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await;
471
472 let percentage = (pos as f64)/(snapshot_count as f64);
473 let percentage = per_start + percentage*per_group;
474 worker.log(format!("percentage done: {:.2}%", percentage*100.0));
475
476 result?; // stop on error
07ad6470
DM
477 }
478
479 if delete {
480 let local_list = group.list_backups(&tgt_store.base_path())?;
481 for info in local_list {
482 let backup_time = info.backup_dir.backup_time();
483 if remote_snapshots.contains(&backup_time) { continue; }
484 worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
c9756b40 485 tgt_store.remove_backup_dir(&info.backup_dir, false)?;
07ad6470
DM
486 }
487 }
488
489 Ok(())
490}
491
492pub async fn pull_store(
493 worker: &WorkerTask,
494 client: &HttpClient,
495 src_repo: &BackupRepository,
496 tgt_store: Arc<DataStore>,
497 delete: bool,
e6dc35ac 498 auth_id: Authid,
07ad6470
DM
499) -> Result<(), Error> {
500
501 // explicit create shared lock to prevent GC on newly created chunks
502 let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
503
504 let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
505
506 let mut result = client.get(&path, None).await?;
507
508 let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
509
510 list.sort_unstable_by(|a, b| {
511 let type_order = a.backup_type.cmp(&b.backup_type);
512 if type_order == std::cmp::Ordering::Equal {
513 a.backup_id.cmp(&b.backup_id)
514 } else {
515 type_order
516 }
517 });
518
519 let mut errors = false;
520
521 let mut new_groups = std::collections::HashSet::new();
522 for item in list.iter() {
523 new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
524 }
525
7b8aa893 526 let group_count = list.len();
7b8aa893 527
54aec2fa 528 for (groups_done, item) in list.into_iter().enumerate() {
07ad6470
DM
529 let group = BackupGroup::new(&item.backup_type, &item.backup_id);
530
30f73fa2
DM
531 let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
532 Ok(result) => result,
533 Err(err) => {
534 worker.log(format!("sync group {}/{} failed - group lock failed: {}",
535 item.backup_type, item.backup_id, err));
536 errors = true; // do not stop here, instead continue
537 continue;
538 }
539 };
540
07ad6470 541 // permission check
e6dc35ac 542 if auth_id != owner { // only the owner is allowed to create additional snapshots
07ad6470 543 worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})",
e6dc35ac 544 item.backup_type, item.backup_id, auth_id, owner));
7b8aa893 545 errors = true; // do not stop here, instead continue
07ad6470 546
20813274
WB
547 } else if let Err(err) = pull_group(
548 worker,
549 client,
550 src_repo,
551 tgt_store.clone(),
552 &group,
553 delete,
554 Some((groups_done, group_count)),
555 ).await {
556 worker.log(format!(
557 "sync group {}/{} failed - {}",
558 item.backup_type,
559 item.backup_id,
560 err,
561 ));
562 errors = true; // do not stop here, instead continue
07ad6470
DM
563 }
564 }
565
566 if delete {
567 let result: Result<(), Error> = proxmox::try_block!({
568 let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
569 for local_group in local_groups {
570 if new_groups.contains(&local_group) { continue; }
571 worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
572 if let Err(err) = tgt_store.remove_backup_group(&local_group) {
573 worker.log(err.to_string());
574 errors = true;
575 }
576 }
577 Ok(())
578 });
579 if let Err(err) = result {
580 worker.log(format!("error during cleanup: {}", err));
581 errors = true;
582 };
583 }
584
585 if errors {
586 bail!("sync failed with some errors.");
587 }
588
589 Ok(())
590}