src/backup/verify.rs (proxmox-backup.git)
backup: switch over to streaming Iterator improving memory usage
use nix::dir::Dir;
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use anyhow::{bail, format_err, Error};

use proxmox_sys::{task_log, WorkerTaskContext};

use pbs_api_types::{Authid, CryptMode, SnapshotVerifyState, VerifyState, UPID};
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, StoreProgress};
use proxmox_sys::fs::lock_dir_noblock_shared;

use crate::tools::parallel_handler::ParallelHandler;

/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
/// already been verified or detected as corrupt.
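///
/// # Example
///
/// Minimal usage sketch (not taken from the upstream docs): it assumes an already running task
/// context `worker`, an opened `datastore` and the `upid` of the current task.
///
/// ```ignore
/// let verify_worker = VerifyWorker::new(worker.clone(), datastore.clone());
/// let failed_dirs = verify_all_backups(&verify_worker, &upid, None, None)?;
/// if !failed_dirs.is_empty() {
///     task_log!(worker, "failed to verify {} snapshots", failed_dirs.len());
/// }
/// ```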
pub struct VerifyWorker {
    worker: Arc<dyn WorkerTaskContext>,
    datastore: Arc<DataStore>,
    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
}

impl VerifyWorker {
    /// Creates a new VerifyWorker for a given task worker and datastore.
    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
        Self {
            worker,
            datastore,
            // start with 16k chunks == up to 64 GiB of data (at the 4 MiB default chunk size)
            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
            // start with 64 chunks since we assume there are few corrupt ones
            corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
        }
    }
}

fn verify_blob(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let blob = datastore.load_blob(backup_dir, &info.filename)?;

    let raw_size = blob.raw_size();
    if raw_size != info.size {
        bail!("wrong size ({} != {})", info.size, raw_size);
    }

    let csum = openssl::sha::sha256(blob.raw_data());
    if csum != info.csum {
        bail!("wrong index checksum");
    }

    match blob.crypt_mode()? {
        CryptMode::Encrypt => Ok(()),
        CryptMode::None => {
            // digest already verified above
            blob.decode(None, None)?;
            Ok(())
        }
        CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
    }
}

fn rename_corrupted_chunk(
    datastore: Arc<DataStore>,
    digest: &[u8; 32],
    worker: &dyn WorkerTaskContext,
) {
    let (path, digest_str) = datastore.chunk_path(digest);

    let mut counter = 0;
    let mut new_path = path.clone();
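    // probe <digest>.<counter>.bad until an unused name is found; after ten tries (0..=9) the
    // last name is simply reused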
    loop {
        new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
        if new_path.exists() && counter < 9 {
            counter += 1;
        } else {
            break;
        }
    }

    match std::fs::rename(&path, &new_path) {
        Ok(_) => {
            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
        }
        Err(err) => {
            match err.kind() {
                std::io::ErrorKind::NotFound => { /* ignored */ }
                _ => task_log!(
                    worker,
                    "could not rename corrupted chunk {:?} - {}",
                    &path,
                    err
                ),
            }
        }
    };
}

fn verify_index_chunks(
    verify_worker: &VerifyWorker,
    index: Box<dyn IndexFile + Send>,
    crypt_mode: CryptMode,
) -> Result<(), Error> {
    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let mut read_bytes = 0;
    let mut decoded_bytes = 0;

    let worker2 = Arc::clone(&verify_worker.worker);
    let datastore2 = Arc::clone(&verify_worker.datastore);
    let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
    let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
    let errors2 = Arc::clone(&errors);

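    // Chunk decoding and digest verification are offloaded to a pool of 4 worker threads; the
    // loop further down only reads chunks from disk and feeds them into this pool.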
    let decoder_pool = ParallelHandler::new(
        "verify chunk decoder",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            let chunk_crypt_mode = match chunk.crypt_mode() {
                Err(err) => {
                    corrupt_chunks2.lock().unwrap().insert(digest);
                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                    errors2.fetch_add(1, Ordering::SeqCst);
                    return Ok(());
                }
                Ok(mode) => mode,
            };

            if chunk_crypt_mode != crypt_mode {
                task_log!(
                    worker2,
                    "chunk CryptMode {:?} does not match index CryptMode {:?}",
                    chunk_crypt_mode,
                    crypt_mode
                );
                errors2.fetch_add(1, Ordering::SeqCst);
            }

            if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                corrupt_chunks2.lock().unwrap().insert(digest);
                task_log!(worker2, "{}", err);
                errors2.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
            } else {
                verified_chunks2.lock().unwrap().insert(digest);
            }

            Ok(())
        },
    );

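    // Chunks shared between snapshots only need to be checked once per task: anything already in
    // verified_chunks is skipped silently, anything already in corrupt_chunks is skipped but
    // still counted as an error for this index.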
    let skip_chunk = |digest: &[u8; 32]| -> bool {
        if verify_worker
            .verified_chunks
            .lock()
            .unwrap()
            .contains(digest)
        {
            true
        } else if verify_worker
            .corrupt_chunks
            .lock()
            .unwrap()
            .contains(digest)
        {
            let digest_str = hex::encode(digest);
            task_log!(
                verify_worker.worker,
                "chunk {} was marked as corrupt",
                digest_str
            );
            errors.fetch_add(1, Ordering::SeqCst);
            true
        } else {
            false
        }
    };

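    // Abort/shutdown is only checked once every 1024 chunk positions (pos & 1023 == 0) to keep
    // the per-chunk overhead while building the ordered chunk list low.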
    let check_abort = |pos: usize| -> Result<(), Error> {
        if pos & 1023 == 0 {
            verify_worker.worker.check_abort()?;
            verify_worker.worker.fail_on_shutdown()?;
        }
        Ok(())
    };

    let chunk_list =
        verify_worker
            .datastore
            .get_chunks_in_order(&index, skip_chunk, check_abort)?;

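    // chunk_list yields (pos, _) pairs, pre-filtered through skip_chunk and periodically checked
    // for abort via check_abort; the loop below processes them in the order the datastore
    // returns them.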
    for (pos, _) in chunk_list {
        verify_worker.worker.check_abort()?;
        verify_worker.worker.fail_on_shutdown()?;

        let info = index.chunk_info(pos).unwrap();

        // we must always recheck this here, the parallel worker below alters it!
        if skip_chunk(&info.digest) {
            continue; // already verified or marked corrupt
        }

        match verify_worker.datastore.load_chunk(&info.digest) {
            Err(err) => {
                verify_worker
                    .corrupt_chunks
                    .lock()
                    .unwrap()
                    .insert(info.digest);
                task_log!(
                    verify_worker.worker,
                    "can't verify chunk, load failed - {}",
                    err
                );
                errors.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(
                    verify_worker.datastore.clone(),
                    &info.digest,
                    &verify_worker.worker,
                );
            }
            Ok(chunk) => {
                let size = info.size();
                read_bytes += chunk.raw_size();
                decoder_pool.send((chunk, info.digest, size))?;
                decoded_bytes += size;
            }
        }
    }

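    // Wait until the decoder pool has processed all queued chunks (and propagate any error it
    // returned) before computing the summary below.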
    decoder_pool.complete()?;

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);

    let read_speed = read_bytes_mib / elapsed;
    let decode_speed = decoded_bytes_mib / elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    task_log!(
        verify_worker.worker,
        " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib,
        decoded_bytes_mib,
        elapsed,
        read_speed,
        decode_speed,
        error_count,
    );

    if errors.load(Ordering::SeqCst) > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}

fn verify_fixed_index(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = verify_worker.datastore.open_fixed_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}

fn verify_dynamic_index(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = verify_worker.datastore.open_dynamic_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}

/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
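///
/// # Example
///
/// Illustrative sketch only (it assumes `verify_worker`, `backup_dir` and `upid` are already
/// set up by the caller):
///
/// ```ignore
/// if verify_backup_dir(&verify_worker, &backup_dir, upid, None)? {
///     // snapshot is fine
/// } else {
///     // verification errors were logged to the task log
/// }
/// ```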
pub fn verify_backup_dir(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
    let snap_lock = lock_dir_noblock_shared(
        &verify_worker.datastore.snapshot_path(backup_dir),
        "snapshot",
        "locked by another operation",
    );
    match snap_lock {
        Ok(snap_lock) => {
            verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
        }
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
                verify_worker.datastore.name(),
                backup_dir,
                err,
            );
            Ok(true)
        }
    }
}

/// See verify_backup_dir
pub fn verify_backup_dir_with_lock(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
    _snap_lock: Dir,
) -> Result<bool, Error> {
    let manifest = match verify_worker.datastore.load_manifest(backup_dir) {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "verify {}:{} - manifest load error: {}",
                verify_worker.datastore.name(),
                backup_dir,
                err,
            );
            return Ok(false);
        }
    };

    if let Some(filter) = filter {
        if !filter(&manifest) {
            task_log!(
                verify_worker.worker,
                "SKIPPED: verify {}:{} (recently verified)",
                verify_worker.datastore.name(),
                backup_dir,
            );
            return Ok(true);
        }
    }

    task_log!(
        verify_worker.worker,
        "verify {}:{}",
        verify_worker.datastore.name(),
        backup_dir
    );

    let mut error_count = 0;

    let mut verify_result = VerifyState::Ok;
    for info in manifest.files() {
        let result = proxmox_lang::try_block!({
            task_log!(verify_worker.worker, " check {}", info.filename);
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex => verify_fixed_index(verify_worker, backup_dir, info),
                ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, backup_dir, info),
                ArchiveType::Blob => verify_blob(verify_worker.datastore.clone(), backup_dir, info),
            }
        });

        verify_worker.worker.check_abort()?;
        verify_worker.worker.fail_on_shutdown()?;

        if let Err(err) = result {
            task_log!(
                verify_worker.worker,
                "verify {}:{}/{} failed: {}",
                verify_worker.datastore.name(),
                backup_dir,
                info.filename,
                err,
            );
            error_count += 1;
            verify_result = VerifyState::Failed;
        }
    }

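    // Persist the result in the snapshot manifest (in its "unprotected" section), so that a
    // later run can use verify_filter() to skip recently verified snapshots.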
    let verify_state = SnapshotVerifyState {
        state: verify_result,
        upid,
    };
    let verify_state = serde_json::to_value(verify_state)?;
    verify_worker
        .datastore
        .update_manifest(backup_dir, |manifest| {
            manifest.unprotected["verify_state"] = verify_state;
        })
        .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

    Ok(error_count == 0)
}

/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group(
    verify_worker: &VerifyWorker,
    group: &BackupGroup,
    progress: &mut StoreProgress,
    upid: &UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
        Ok(list) => list,
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "verify group {}:{} - unable to list backups: {}",
                verify_worker.datastore.name(),
                group,
                err,
            );
            return Ok(errors);
        }
    };

    let snapshot_count = list.len();
    task_log!(
        verify_worker.worker,
        "verify group {}:{} ({} snapshots)",
        verify_worker.datastore.name(),
        group,
        snapshot_count
    );

    progress.group_snapshots = snapshot_count as u64;

    BackupInfo::sort_list(&mut list, false); // newest first
    for (pos, info) in list.into_iter().enumerate() {
        if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
            errors.push(info.backup_dir.to_string());
        }
        progress.done_snapshots = pos as u64 + 1;
        task_log!(verify_worker.worker, "percentage done: {}", progress);
    }

    Ok(errors)
}

/// Verify all (owned) backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_all_backups(
    verify_worker: &VerifyWorker,
    upid: &UPID,
    owner: Option<Authid>,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    let worker = Arc::clone(&verify_worker.worker);

    task_log!(
        worker,
        "verify datastore {}",
        verify_worker.datastore.name()
    );

    if let Some(owner) = &owner {
        task_log!(worker, "limiting to backups owned by {}", owner);
    }

    let filter_by_owner = |group: &BackupGroup| {
        match (verify_worker.datastore.get_owner(group), &owner) {
            (Ok(ref group_owner), Some(owner)) => {
                group_owner == owner
                    || (group_owner.is_token()
                        && !owner.is_token()
                        && group_owner.user() == owner.user())
            }
            (Ok(_), None) => true,
            (Err(err), Some(_)) => {
                // intentionally not in task log
                // the task user might not be allowed to see this group!
                println!("Failed to get owner of group '{}' - {}", group, err);
                false
            }
            (Err(err), None) => {
                // we don't filter by owner, but we want to log the error
                task_log!(worker, "Failed to get owner of group '{}' - {}", group, err);
                errors.push(group.to_string());
                true
            }
        }
    };

    let mut list = match verify_worker.datastore.iter_backup_groups_ok() {
        Ok(list) => list
            .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
            .filter(filter_by_owner)
            .collect::<Vec<BackupGroup>>(),
        Err(err) => {
            task_log!(worker, "unable to list backups: {}", err);
            return Ok(errors);
        }
    };

    list.sort_unstable();

    let group_count = list.len();
    task_log!(worker, "found {} groups", group_count);

    let mut progress = StoreProgress::new(group_count as u64);

    for (pos, group) in list.into_iter().enumerate() {
        progress.done_groups = pos as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let mut group_errors =
            verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
        errors.append(&mut group_errors);
    }

    Ok(errors)
}

/// Filter for the verification of snapshots
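///
/// Usage sketch (illustrative only; the concrete `outdated_after` value is just an example):
/// the verify functions take an `Option<&dyn Fn(&BackupManifest) -> bool>`, so this helper is
/// usually wrapped in a closure that fixes the first two parameters.
///
/// ```ignore
/// let filter = |manifest: &BackupManifest| verify_filter(true, Some(30), manifest);
/// verify_all_backups(&verify_worker, &upid, None, Some(&filter))?;
/// ```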
pub fn verify_filter(
    ignore_verified_snapshots: bool,
    outdated_after: Option<i64>,
    manifest: &BackupManifest,
) -> bool {
    if !ignore_verified_snapshots {
        return true;
    }

    let raw_verify_state = manifest.unprotected["verify_state"].clone();
    match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
        Err(_) => true, // no last verification, always include
        Ok(last_verify) => {
            match outdated_after {
                None => false, // never re-verify if ignored and no max age
                Some(max_age) => {
                    let now = proxmox_time::epoch_i64();
                    let days_since_last_verify = (now - last_verify.upid.starttime) / 86400;

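                    // outdated_after == 0 is treated as "always re-verify"; otherwise re-verify
                    // only once the last verification is more than max_age days old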
                    max_age == 0 || days_since_last_verify > max_age
                }
            }
        }
    }
}