]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/verify.rs
2b36e9fe45cf36c91ddc7e4f7eb8e67f5ea5e126
[proxmox-backup.git] / src / backup / verify.rs
1 use nix::dir::Dir;
2 use std::collections::HashSet;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::{Arc, Mutex};
5 use std::time::Instant;
6
7 use anyhow::{bail, format_err, Error};
8
9 use proxmox_sys::{task_log, WorkerTaskContext};
10
11 use pbs_api_types::{
12 print_ns_and_snapshot, Authid, BackupNamespace, BackupType, CryptMode, DatastoreWithNamespace,
13 SnapshotVerifyState, VerifyState, UPID,
14 };
15 use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
16 use pbs_datastore::index::IndexFile;
17 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
18 use pbs_datastore::{DataBlob, DataStore, StoreProgress};
19 use proxmox_sys::fs::lock_dir_noblock_shared;
20
21 use crate::tools::parallel_handler::ParallelHandler;
22
23 use crate::backup::hierarchy::ListAccessibleBackupGroups;
24
/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
/// already been verified or detected as corrupt.
pub struct VerifyWorker {
    /// Task context used for logging and abort/shutdown checks.
    worker: Arc<dyn WorkerTaskContext>,
    /// Datastore whose snapshots and chunks are being verified.
    datastore: Arc<DataStore>,
    /// Digests of chunks already verified successfully; shared with the
    /// parallel decoder threads (see `verify_index_chunks`).
    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    /// Digests of chunks detected as corrupt; shared the same way.
    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
}
33
34 impl VerifyWorker {
35 /// Creates a new VerifyWorker for a given task worker and datastore.
36 pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
37 Self {
38 worker,
39 datastore,
40 // start with 16k chunks == up to 64G data
41 verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
42 // start with 64 chunks since we assume there are few corrupt ones
43 corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
44 }
45 }
46 }
47
48 fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
49 let blob = backup_dir.load_blob(&info.filename)?;
50
51 let raw_size = blob.raw_size();
52 if raw_size != info.size {
53 bail!("wrong size ({} != {})", info.size, raw_size);
54 }
55
56 let csum = openssl::sha::sha256(blob.raw_data());
57 if csum != info.csum {
58 bail!("wrong index checksum");
59 }
60
61 match blob.crypt_mode()? {
62 CryptMode::Encrypt => Ok(()),
63 CryptMode::None => {
64 // digest already verified above
65 blob.decode(None, None)?;
66 Ok(())
67 }
68 CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
69 }
70 }
71
72 fn rename_corrupted_chunk(
73 datastore: Arc<DataStore>,
74 digest: &[u8; 32],
75 worker: &dyn WorkerTaskContext,
76 ) {
77 let (path, digest_str) = datastore.chunk_path(digest);
78
79 let mut counter = 0;
80 let mut new_path = path.clone();
81 loop {
82 new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
83 if new_path.exists() && counter < 9 {
84 counter += 1;
85 } else {
86 break;
87 }
88 }
89
90 match std::fs::rename(&path, &new_path) {
91 Ok(_) => {
92 task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
93 }
94 Err(err) => {
95 match err.kind() {
96 std::io::ErrorKind::NotFound => { /* ignored */ }
97 _ => task_log!(
98 worker,
99 "could not rename corrupted chunk {:?} - {}",
100 &path,
101 err
102 ),
103 }
104 }
105 };
106 }
107
/// Verify all chunks referenced by the given index.
///
/// Chunks are loaded sequentially (in on-disk order) on the calling thread
/// and handed to a pool of 4 decoder threads for digest verification. The
/// worker's shared `verified_chunks`/`corrupt_chunks` sets ensure each chunk
/// is only verified once per VerifyWorker; chunks already known to be corrupt
/// are counted as errors again without being re-read.
fn verify_index_chunks(
    verify_worker: &VerifyWorker,
    index: Box<dyn IndexFile + Send>,
    crypt_mode: CryptMode,
) -> Result<(), Error> {
    // Error counter shared between this thread and the decoder pool threads.
    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let mut read_bytes = 0;
    let mut decoded_bytes = 0;

    // Clones moved into the decoder-pool closure below.
    let worker2 = Arc::clone(&verify_worker.worker);
    let datastore2 = Arc::clone(&verify_worker.datastore);
    let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
    let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
    let errors2 = Arc::clone(&errors);

    let decoder_pool = ParallelHandler::new(
        "verify chunk decoder",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // A chunk with an unreadable CryptMode is treated as corrupt.
            let chunk_crypt_mode = match chunk.crypt_mode() {
                Err(err) => {
                    corrupt_chunks2.lock().unwrap().insert(digest);
                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                    errors2.fetch_add(1, Ordering::SeqCst);
                    return Ok(());
                }
                Ok(mode) => mode,
            };

            // A CryptMode mismatch is logged and counted as an error, but the
            // chunk digest is still verified below.
            if chunk_crypt_mode != crypt_mode {
                task_log!(
                    worker2,
                    "chunk CryptMode {:?} does not match index CryptMode {:?}",
                    chunk_crypt_mode,
                    crypt_mode
                );
                errors2.fetch_add(1, Ordering::SeqCst);
            }

            // Verify the chunk digest; corrupt chunks are renamed aside so a
            // later backup run can re-upload them.
            if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                corrupt_chunks2.lock().unwrap().insert(digest);
                task_log!(worker2, "{}", err);
                errors2.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
            } else {
                verified_chunks2.lock().unwrap().insert(digest);
            }

            Ok(())
        },
    );

    // Returns true when the chunk can be skipped: either already verified, or
    // already marked corrupt (the latter is counted as an error again here).
    let skip_chunk = |digest: &[u8; 32]| -> bool {
        if verify_worker
            .verified_chunks
            .lock()
            .unwrap()
            .contains(digest)
        {
            true
        } else if verify_worker
            .corrupt_chunks
            .lock()
            .unwrap()
            .contains(digest)
        {
            let digest_str = hex::encode(digest);
            task_log!(
                verify_worker.worker,
                "chunk {} was marked as corrupt",
                digest_str
            );
            errors.fetch_add(1, Ordering::SeqCst);
            true
        } else {
            false
        }
    };

    // Only check for abort/shutdown every 1024 chunks to keep overhead low.
    let check_abort = |pos: usize| -> Result<(), Error> {
        if pos & 1023 == 0 {
            verify_worker.worker.check_abort()?;
            verify_worker.worker.fail_on_shutdown()?;
        }
        Ok(())
    };

    let chunk_list =
        verify_worker
            .datastore
            .get_chunks_in_order(&index, skip_chunk, check_abort)?;

    for (pos, _) in chunk_list {
        verify_worker.worker.check_abort()?;
        verify_worker.worker.fail_on_shutdown()?;

        let info = index.chunk_info(pos).unwrap();

        // we must always recheck this here, the parallel worker below alters it!
        if skip_chunk(&info.digest) {
            continue; // already verified or marked corrupt
        }

        match verify_worker.datastore.load_chunk(&info.digest) {
            Err(err) => {
                // Chunk could not be loaded at all: mark it corrupt, count the
                // error and move the on-disk file aside.
                verify_worker
                    .corrupt_chunks
                    .lock()
                    .unwrap()
                    .insert(info.digest);
                task_log!(
                    verify_worker.worker,
                    "can't verify chunk, load failed - {}",
                    err
                );
                errors.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(
                    verify_worker.datastore.clone(),
                    &info.digest,
                    &verify_worker.worker,
                );
            }
            Ok(chunk) => {
                let size = info.size();
                read_bytes += chunk.raw_size();
                decoder_pool.send((chunk, info.digest, size))?;
                decoded_bytes += size;
            }
        }
    }

    // Wait for all decoder threads to finish and propagate any send errors.
    decoder_pool.complete()?;

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);

    let read_speed = read_bytes_mib / elapsed;
    let decode_speed = decoded_bytes_mib / elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    task_log!(
        verify_worker.worker,
        " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib,
        decoded_bytes_mib,
        elapsed,
        read_speed,
        decode_speed,
        error_count,
    );

    if errors.load(Ordering::SeqCst) > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}
271
272 fn verify_fixed_index(
273 verify_worker: &VerifyWorker,
274 backup_dir: &BackupDir,
275 info: &FileInfo,
276 ) -> Result<(), Error> {
277 let mut path = backup_dir.relative_path();
278 path.push(&info.filename);
279
280 let index = verify_worker.datastore.open_fixed_reader(&path)?;
281
282 let (csum, size) = index.compute_csum();
283 if size != info.size {
284 bail!("wrong size ({} != {})", info.size, size);
285 }
286
287 if csum != info.csum {
288 bail!("wrong index checksum");
289 }
290
291 verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
292 }
293
294 fn verify_dynamic_index(
295 verify_worker: &VerifyWorker,
296 backup_dir: &BackupDir,
297 info: &FileInfo,
298 ) -> Result<(), Error> {
299 let mut path = backup_dir.relative_path();
300 path.push(&info.filename);
301
302 let index = verify_worker.datastore.open_dynamic_reader(&path)?;
303
304 let (csum, size) = index.compute_csum();
305 if size != info.size {
306 bail!("wrong size ({} != {})", info.size, size);
307 }
308
309 if csum != info.csum {
310 bail!("wrong index checksum");
311 }
312
313 verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
314 }
315
316 /// Verify a single backup snapshot
317 ///
318 /// This checks all archives inside a backup snapshot.
319 /// Errors are logged to the worker log.
320 ///
321 /// Returns
322 /// - Ok(true) if verify is successful
323 /// - Ok(false) if there were verification errors
324 /// - Err(_) if task was aborted
325 pub fn verify_backup_dir(
326 verify_worker: &VerifyWorker,
327 backup_dir: &BackupDir,
328 upid: UPID,
329 filter: Option<&dyn Fn(&BackupManifest) -> bool>,
330 ) -> Result<bool, Error> {
331 let snap_lock = lock_dir_noblock_shared(
332 &backup_dir.full_path(),
333 "snapshot",
334 "locked by another operation",
335 );
336 match snap_lock {
337 Ok(snap_lock) => {
338 verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
339 }
340 Err(err) => {
341 task_log!(
342 verify_worker.worker,
343 "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
344 verify_worker.datastore.name(),
345 backup_dir.dir(),
346 err,
347 );
348 Ok(true)
349 }
350 }
351 }
352
/// See verify_backup_dir
pub fn verify_backup_dir_with_lock(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
    _snap_lock: Dir,
) -> Result<bool, Error> {
    // A missing or unreadable manifest counts as a verification failure
    // (Ok(false)), not as a task abort.
    let manifest = match backup_dir.load_manifest() {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "verify {}:{} - manifest load error: {}",
                verify_worker.datastore.name(),
                backup_dir.dir(),
                err,
            );
            return Ok(false);
        }
    };

    // Allow the caller-supplied filter (e.g. "recently verified") to skip
    // this snapshot entirely; skipping counts as success.
    if let Some(filter) = filter {
        if !filter(&manifest) {
            task_log!(
                verify_worker.worker,
                "SKIPPED: verify {}:{} (recently verified)",
                verify_worker.datastore.name(),
                backup_dir.dir(),
            );
            return Ok(true);
        }
    }

    task_log!(
        verify_worker.worker,
        "verify {}:{}",
        verify_worker.datastore.name(),
        backup_dir.dir()
    );

    let mut error_count = 0;

    let mut verify_result = VerifyState::Ok;
    // Verify every archive listed in the manifest; failures are logged and
    // counted but do not stop verification of the remaining files.
    for info in manifest.files() {
        let result = proxmox_lang::try_block!({
            task_log!(verify_worker.worker, " check {}", info.filename);
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex => verify_fixed_index(verify_worker, backup_dir, info),
                ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, backup_dir, info),
                ArchiveType::Blob => verify_blob(backup_dir, info),
            }
        });

        verify_worker.worker.check_abort()?;
        verify_worker.worker.fail_on_shutdown()?;

        if let Err(err) = result {
            task_log!(
                verify_worker.worker,
                "verify {}:{}/{} failed: {}",
                verify_worker.datastore.name(),
                backup_dir.dir(),
                info.filename,
                err,
            );
            error_count += 1;
            verify_result = VerifyState::Failed;
        }
    }

    // Persist the verification result in the manifest's unprotected section
    // so later runs (and the GUI) can see when/how this snapshot was verified.
    let verify_state = SnapshotVerifyState {
        state: verify_result,
        upid,
    };
    let verify_state = serde_json::to_value(verify_state)?;
    backup_dir
        .update_manifest(|manifest| {
            manifest.unprotected["verify_state"] = verify_state;
        })
        .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

    Ok(error_count == 0)
}
437
/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group(
    verify_worker: &VerifyWorker,
    group: &BackupGroup,
    progress: &mut StoreProgress,
    upid: &UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    // Failing to list the group's snapshots is logged but is not fatal for
    // the overall verification run (returns an empty error list).
    let mut list = match group.list_backups() {
        Ok(list) => list,
        Err(err) => {
            let store_with_ns = DatastoreWithNamespace {
                store: verify_worker.datastore.name().to_owned(),
                ns: group.backup_ns().clone(),
            };
            task_log!(
                verify_worker.worker,
                "verify {}, group {} - unable to list backups: {}",
                store_with_ns,
                group.group(),
                err,
            );
            return Ok(errors);
        }
    };

    let snapshot_count = list.len();
    task_log!(
        verify_worker.worker,
        "verify group {}:{} ({} snapshots)",
        verify_worker.datastore.name(),
        group.group(),
        snapshot_count
    );

    progress.group_snapshots = snapshot_count as u64;

    BackupInfo::sort_list(&mut list, false); // newest first
    for (pos, info) in list.into_iter().enumerate() {
        // Record failed snapshots by their namespace+snapshot path.
        if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
            errors.push(print_ns_and_snapshot(
                info.backup_dir.backup_ns(),
                info.backup_dir.as_ref(),
            ));
        }
        progress.done_snapshots = pos as u64 + 1;
        task_log!(verify_worker.worker, "percentage done: {}", progress);
    }

    Ok(errors)
}
496
/// Verify all (owned) backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_all_backups(
    verify_worker: &VerifyWorker,
    upid: &UPID,
    ns: BackupNamespace,
    max_depth: Option<usize>,
    owner: Option<&Authid>,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    let worker = Arc::clone(&verify_worker.worker);

    task_log!(
        worker,
        "verify datastore {}",
        verify_worker.datastore.name()
    );

    // When filtering by owner, group-iteration errors may concern groups the
    // requesting user is not allowed to see, so they must not end up in the
    // (user-visible) task log.
    let owner_filtered = if let Some(owner) = &owner {
        task_log!(worker, "limiting to backups owned by {}", owner);
        true
    } else {
        false
    };

    // FIXME: This should probably simply enable recursion (or the call have a recursion parameter)
    let store = &verify_worker.datastore;
    let max_depth = max_depth.unwrap_or(pbs_api_types::MAX_NAMESPACE_DEPTH);

    let mut list = match ListAccessibleBackupGroups::new(store, ns.clone(), max_depth, owner) {
        Ok(list) => list
            .filter_map(|group| match group {
                Ok(group) => Some(group),
                Err(err) if owner_filtered => {
                    // intentionally not in task log, the user might not see this group!
                    println!("error on iterating groups in ns '{ns}' - {err}");
                    None
                }
                Err(err) => {
                    // we don't filter by owner, but we want to log the error
                    task_log!(worker, "error on iterating groups in ns '{ns}' - {err}");
                    errors.push(err.to_string());
                    None
                }
            })
            // skip the throwaway group written by the client benchmark
            .filter(|group| {
                !(group.backup_type() == BackupType::Host && group.backup_id() == "benchmark")
            })
            .collect::<Vec<BackupGroup>>(),
        Err(err) => {
            task_log!(worker, "unable to list backups: {}", err,);
            return Ok(errors);
        }
    };

    // Sort for a deterministic verification order.
    list.sort_unstable_by(|a, b| a.group().cmp(b.group()));

    let group_count = list.len();
    task_log!(worker, "found {} groups", group_count);

    let mut progress = StoreProgress::new(group_count as u64);

    for (pos, group) in list.into_iter().enumerate() {
        progress.done_groups = pos as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let mut group_errors =
            verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
        errors.append(&mut group_errors);
    }

    Ok(errors)
}
577
578 /// Filter for the verification of snapshots
579 pub fn verify_filter(
580 ignore_verified_snapshots: bool,
581 outdated_after: Option<i64>,
582 manifest: &BackupManifest,
583 ) -> bool {
584 if !ignore_verified_snapshots {
585 return true;
586 }
587
588 let raw_verify_state = manifest.unprotected["verify_state"].clone();
589 match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
590 Err(_) => true, // no last verification, always include
591 Ok(last_verify) => {
592 match outdated_after {
593 None => false, // never re-verify if ignored and no max age
594 Some(max_age) => {
595 let now = proxmox_time::epoch_i64();
596 let days_since_last_verify = (now - last_verify.upid.starttime) / 86400;
597
598 days_since_last_verify > max_age
599 }
600 }
601 }
602 }
603 }