]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/verify.rs
503a404b2f72cfbc04fe47480422da3a68eae833
[proxmox-backup.git] / src / backup / verify.rs
1 use nix::dir::Dir;
2 use std::collections::HashSet;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::{Arc, Mutex};
5 use std::time::Instant;
6
7 use anyhow::{bail, format_err, Error};
8
9 use crate::{
10 api2::types::*,
11 backup::{
12 DataStore,
13 StoreProgress,
14 DataBlob,
15 BackupGroup,
16 BackupDir,
17 BackupInfo,
18 BackupManifest,
19 IndexFile,
20 CryptMode,
21 FileInfo,
22 ArchiveType,
23 archive_type,
24 },
25 server::UPID,
26 task::TaskState,
27 task_log,
28 tools::fs::lock_dir_noblock_shared,
29 tools::ParallelHandler,
30 };
31
32 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
33 /// already been verified or detected as corrupt.
34 pub struct VerifyWorker {
35 worker: Arc<dyn TaskState + Send + Sync>,
36 datastore: Arc<DataStore>,
37 verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
38 corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
39 }
40
41 impl VerifyWorker {
42 /// Creates a new VerifyWorker for a given task worker and datastore.
43 pub fn new(worker: Arc<dyn TaskState + Send + Sync>, datastore: Arc<DataStore>) -> Self {
44 Self {
45 worker,
46 datastore,
47 // start with 16k chunks == up to 64G data
48 verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
49 // start with 64 chunks since we assume there are few corrupt ones
50 corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
51 }
52 }
53 }
54
55 fn verify_blob(
56 datastore: Arc<DataStore>,
57 backup_dir: &BackupDir,
58 info: &FileInfo,
59 ) -> Result<(), Error> {
60 let blob = datastore.load_blob(backup_dir, &info.filename)?;
61
62 let raw_size = blob.raw_size();
63 if raw_size != info.size {
64 bail!("wrong size ({} != {})", info.size, raw_size);
65 }
66
67 let csum = openssl::sha::sha256(blob.raw_data());
68 if csum != info.csum {
69 bail!("wrong index checksum");
70 }
71
72 match blob.crypt_mode()? {
73 CryptMode::Encrypt => Ok(()),
74 CryptMode::None => {
75 // digest already verified above
76 blob.decode(None, None)?;
77 Ok(())
78 },
79 CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
80 }
81 }
82
83 fn rename_corrupted_chunk(
84 datastore: Arc<DataStore>,
85 digest: &[u8;32],
86 worker: &dyn TaskState,
87 ) {
88 let (path, digest_str) = datastore.chunk_path(digest);
89
90 let mut counter = 0;
91 let mut new_path = path.clone();
92 loop {
93 new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
94 if new_path.exists() && counter < 9 {
95 counter += 1;
96 } else {
97 break;
98 }
99 }
100
101 match std::fs::rename(&path, &new_path) {
102 Ok(_) => {
103 task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
104 },
105 Err(err) => {
106 match err.kind() {
107 std::io::ErrorKind::NotFound => { /* ignored */ },
108 _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
109 }
110 }
111 };
112 }
113
114 fn verify_index_chunks(
115 verify_worker: &VerifyWorker,
116 index: Box<dyn IndexFile + Send>,
117 crypt_mode: CryptMode,
118 ) -> Result<(), Error> {
119 let errors = Arc::new(AtomicUsize::new(0));
120
121 let start_time = Instant::now();
122
123 let mut read_bytes = 0;
124 let mut decoded_bytes = 0;
125
126 let worker2 = Arc::clone(&verify_worker.worker);
127 let datastore2 = Arc::clone(&verify_worker.datastore);
128 let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
129 let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
130 let errors2 = Arc::clone(&errors);
131
132 let decoder_pool = ParallelHandler::new(
133 "verify chunk decoder",
134 4,
135 move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
136 let chunk_crypt_mode = match chunk.crypt_mode() {
137 Err(err) => {
138 corrupt_chunks2.lock().unwrap().insert(digest);
139 task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
140 errors2.fetch_add(1, Ordering::SeqCst);
141 return Ok(());
142 },
143 Ok(mode) => mode,
144 };
145
146 if chunk_crypt_mode != crypt_mode {
147 task_log!(
148 worker2,
149 "chunk CryptMode {:?} does not match index CryptMode {:?}",
150 chunk_crypt_mode,
151 crypt_mode
152 );
153 errors2.fetch_add(1, Ordering::SeqCst);
154 }
155
156 if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
157 corrupt_chunks2.lock().unwrap().insert(digest);
158 task_log!(worker2, "{}", err);
159 errors2.fetch_add(1, Ordering::SeqCst);
160 rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
161 } else {
162 verified_chunks2.lock().unwrap().insert(digest);
163 }
164
165 Ok(())
166 }
167 );
168
169 let skip_chunk = |digest: &[u8; 32]| -> bool {
170 if verify_worker.verified_chunks.lock().unwrap().contains(digest) {
171 true
172 } else if verify_worker.corrupt_chunks.lock().unwrap().contains(digest) {
173 let digest_str = proxmox::tools::digest_to_hex(digest);
174 task_log!(verify_worker.worker, "chunk {} was marked as corrupt", digest_str);
175 errors.fetch_add(1, Ordering::SeqCst);
176 true
177 } else {
178 false
179 }
180 };
181
182 let check_abort = |pos: usize| -> Result<(), Error> {
183 if pos & 1023 == 0 {
184 verify_worker.worker.check_abort()?;
185 crate::tools::fail_on_shutdown()?;
186 }
187 Ok(())
188 };
189
190 let chunk_list =
191 verify_worker
192 .datastore
193 .get_chunks_in_order(&index, skip_chunk, check_abort)?;
194
195 for (pos, _) in chunk_list {
196 verify_worker.worker.check_abort()?;
197 crate::tools::fail_on_shutdown()?;
198
199 let info = index.chunk_info(pos).unwrap();
200
201 // we must always recheck this here, the parallel worker below alter it!
202 if skip_chunk(&info.digest) {
203 continue; // already verified or marked corrupt
204 }
205
206 match verify_worker.datastore.load_chunk(&info.digest) {
207 Err(err) => {
208 verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
209 task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
210 errors.fetch_add(1, Ordering::SeqCst);
211 rename_corrupted_chunk(
212 verify_worker.datastore.clone(),
213 &info.digest,
214 &verify_worker.worker,
215 );
216 }
217 Ok(chunk) => {
218 let size = info.size();
219 read_bytes += chunk.raw_size();
220 decoder_pool.send((chunk, info.digest, size))?;
221 decoded_bytes += size;
222 }
223 }
224 }
225
226 decoder_pool.complete()?;
227
228 let elapsed = start_time.elapsed().as_secs_f64();
229
230 let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
231 let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
232
233 let read_speed = read_bytes_mib / elapsed;
234 let decode_speed = decoded_bytes_mib / elapsed;
235
236 let error_count = errors.load(Ordering::SeqCst);
237
238 task_log!(
239 verify_worker.worker,
240 " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
241 read_bytes_mib,
242 decoded_bytes_mib,
243 elapsed,
244 read_speed,
245 decode_speed,
246 error_count,
247 );
248
249 if errors.load(Ordering::SeqCst) > 0 {
250 bail!("chunks could not be verified");
251 }
252
253 Ok(())
254 }
255
256 fn verify_fixed_index(
257 verify_worker: &VerifyWorker,
258 backup_dir: &BackupDir,
259 info: &FileInfo,
260 ) -> Result<(), Error> {
261 let mut path = backup_dir.relative_path();
262 path.push(&info.filename);
263
264 let index = verify_worker.datastore.open_fixed_reader(&path)?;
265
266 let (csum, size) = index.compute_csum();
267 if size != info.size {
268 bail!("wrong size ({} != {})", info.size, size);
269 }
270
271 if csum != info.csum {
272 bail!("wrong index checksum");
273 }
274
275 verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
276 }
277
278 fn verify_dynamic_index(
279 verify_worker: &VerifyWorker,
280 backup_dir: &BackupDir,
281 info: &FileInfo,
282 ) -> Result<(), Error> {
283 let mut path = backup_dir.relative_path();
284 path.push(&info.filename);
285
286 let index = verify_worker.datastore.open_dynamic_reader(&path)?;
287
288 let (csum, size) = index.compute_csum();
289 if size != info.size {
290 bail!("wrong size ({} != {})", info.size, size);
291 }
292
293 if csum != info.csum {
294 bail!("wrong index checksum");
295 }
296
297 verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
298 }
299
300 /// Verify a single backup snapshot
301 ///
302 /// This checks all archives inside a backup snapshot.
303 /// Errors are logged to the worker log.
304 ///
305 /// Returns
306 /// - Ok(true) if verify is successful
307 /// - Ok(false) if there were verification errors
308 /// - Err(_) if task was aborted
309 pub fn verify_backup_dir(
310 verify_worker: &VerifyWorker,
311 backup_dir: &BackupDir,
312 upid: UPID,
313 filter: Option<&dyn Fn(&BackupManifest) -> bool>,
314 ) -> Result<bool, Error> {
315 let snap_lock = lock_dir_noblock_shared(
316 &verify_worker.datastore.snapshot_path(&backup_dir),
317 "snapshot",
318 "locked by another operation",
319 );
320 match snap_lock {
321 Ok(snap_lock) => {
322 verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
323 }
324 Err(err) => {
325 task_log!(
326 verify_worker.worker,
327 "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
328 verify_worker.datastore.name(),
329 backup_dir,
330 err,
331 );
332 Ok(true)
333 }
334 }
335 }
336
337 /// See verify_backup_dir
338 pub fn verify_backup_dir_with_lock(
339 verify_worker: &VerifyWorker,
340 backup_dir: &BackupDir,
341 upid: UPID,
342 filter: Option<&dyn Fn(&BackupManifest) -> bool>,
343 _snap_lock: Dir,
344 ) -> Result<bool, Error> {
345 let manifest = match verify_worker.datastore.load_manifest(&backup_dir) {
346 Ok((manifest, _)) => manifest,
347 Err(err) => {
348 task_log!(
349 verify_worker.worker,
350 "verify {}:{} - manifest load error: {}",
351 verify_worker.datastore.name(),
352 backup_dir,
353 err,
354 );
355 return Ok(false);
356 }
357 };
358
359 if let Some(filter) = filter {
360 if !filter(&manifest) {
361 task_log!(
362 verify_worker.worker,
363 "SKIPPED: verify {}:{} (recently verified)",
364 verify_worker.datastore.name(),
365 backup_dir,
366 );
367 return Ok(true);
368 }
369 }
370
371 task_log!(verify_worker.worker, "verify {}:{}", verify_worker.datastore.name(), backup_dir);
372
373 let mut error_count = 0;
374
375 let mut verify_result = VerifyState::Ok;
376 for info in manifest.files() {
377 let result = proxmox::try_block!({
378 task_log!(verify_worker.worker, " check {}", info.filename);
379 match archive_type(&info.filename)? {
380 ArchiveType::FixedIndex => verify_fixed_index(verify_worker, &backup_dir, info),
381 ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, &backup_dir, info),
382 ArchiveType::Blob => {
383 verify_blob(verify_worker.datastore.clone(), &backup_dir, info)
384 }
385 }
386 });
387
388 verify_worker.worker.check_abort()?;
389 crate::tools::fail_on_shutdown()?;
390
391 if let Err(err) = result {
392 task_log!(
393 verify_worker.worker,
394 "verify {}:{}/{} failed: {}",
395 verify_worker.datastore.name(),
396 backup_dir,
397 info.filename,
398 err,
399 );
400 error_count += 1;
401 verify_result = VerifyState::Failed;
402 }
403 }
404
405 let verify_state = SnapshotVerifyState {
406 state: verify_result,
407 upid,
408 };
409 let verify_state = serde_json::to_value(verify_state)?;
410 verify_worker
411 .datastore
412 .update_manifest(&backup_dir, |manifest| {
413 manifest.unprotected["verify_state"] = verify_state;
414 })
415 .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
416
417 Ok(error_count == 0)
418 }
419
420 /// Verify all backups inside a backup group
421 ///
422 /// Errors are logged to the worker log.
423 ///
424 /// Returns
425 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
426 /// - Err(_) if task was aborted
427 pub fn verify_backup_group(
428 verify_worker: &VerifyWorker,
429 group: &BackupGroup,
430 progress: &mut StoreProgress,
431 upid: &UPID,
432 filter: Option<&dyn Fn(&BackupManifest) -> bool>,
433 ) -> Result<Vec<String>, Error> {
434 let mut errors = Vec::new();
435 let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
436 Ok(list) => list,
437 Err(err) => {
438 task_log!(
439 verify_worker.worker,
440 "verify group {}:{} - unable to list backups: {}",
441 verify_worker.datastore.name(),
442 group,
443 err,
444 );
445 return Ok(errors);
446 }
447 };
448
449 let snapshot_count = list.len();
450 task_log!(
451 verify_worker.worker,
452 "verify group {}:{} ({} snapshots)",
453 verify_worker.datastore.name(),
454 group,
455 snapshot_count
456 );
457
458 progress.group_snapshots = snapshot_count as u64;
459
460 BackupInfo::sort_list(&mut list, false); // newest first
461 for (pos, info) in list.into_iter().enumerate() {
462 if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
463 errors.push(info.backup_dir.to_string());
464 }
465 progress.done_snapshots = pos as u64 + 1;
466 task_log!(verify_worker.worker, "percentage done: {}", progress);
467 }
468
469 Ok(errors)
470 }
471
472 /// Verify all (owned) backups inside a datastore
473 ///
474 /// Errors are logged to the worker log.
475 ///
476 /// Returns
477 /// - Ok(failed_dirs) where failed_dirs had verification errors
478 /// - Err(_) if task was aborted
479 pub fn verify_all_backups(
480 verify_worker: &VerifyWorker,
481 upid: &UPID,
482 owner: Option<Authid>,
483 filter: Option<&dyn Fn(&BackupManifest) -> bool>,
484 ) -> Result<Vec<String>, Error> {
485 let mut errors = Vec::new();
486 let worker = Arc::clone(&verify_worker.worker);
487
488 task_log!(worker, "verify datastore {}", verify_worker.datastore.name());
489
490 if let Some(owner) = &owner {
491 task_log!(worker, "limiting to backups owned by {}", owner);
492 }
493
494 let filter_by_owner = |group: &BackupGroup| {
495 match (verify_worker.datastore.get_owner(group), &owner) {
496 (Ok(ref group_owner), Some(owner)) => {
497 group_owner == owner
498 || (group_owner.is_token()
499 && !owner.is_token()
500 && group_owner.user() == owner.user())
501 },
502 (Ok(_), None) => true,
503 (Err(err), Some(_)) => {
504 // intentionally not in task log
505 // the task user might not be allowed to see this group!
506 println!("Failed to get owner of group '{}' - {}", group, err);
507 false
508 },
509 (Err(err), None) => {
510 // we don't filter by owner, but we want to log the error
511 task_log!(
512 worker,
513 "Failed to get owner of group '{} - {}",
514 group,
515 err,
516 );
517 errors.push(group.to_string());
518 true
519 },
520 }
521 };
522
523 let mut list = match BackupInfo::list_backup_groups(&verify_worker.datastore.base_path()) {
524 Ok(list) => list
525 .into_iter()
526 .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
527 .filter(filter_by_owner)
528 .collect::<Vec<BackupGroup>>(),
529 Err(err) => {
530 task_log!(worker, "unable to list backups: {}", err,);
531 return Ok(errors);
532 }
533 };
534
535 list.sort_unstable();
536
537 let group_count = list.len();
538 task_log!(worker, "found {} groups", group_count);
539
540 let mut progress = StoreProgress::new(group_count as u64);
541
542 for (pos, group) in list.into_iter().enumerate() {
543 progress.done_groups = pos as u64;
544 progress.done_snapshots = 0;
545 progress.group_snapshots = 0;
546
547 let mut group_errors =
548 verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
549 errors.append(&mut group_errors);
550 }
551
552 Ok(errors)
553 }
554
555 /// Filter for the verification of snapshots
556 pub fn verify_filter(
557 ignore_verified_snapshots: bool,
558 outdated_after: Option<i64>,
559 manifest: &BackupManifest,
560 ) -> bool {
561 if !ignore_verified_snapshots {
562 return true;
563 }
564
565 let raw_verify_state = manifest.unprotected["verify_state"].clone();
566 match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
567 Err(_) => true, // no last verification, always include
568 Ok(last_verify) => {
569 match outdated_after {
570 None => false, // never re-verify if ignored and no max age
571 Some(max_age) => {
572 let now = proxmox::tools::time::epoch_i64();
573 let days_since_last_verify = (now - last_verify.upid.starttime) / 86400;
574
575 days_since_last_verify > max_age
576 }
577 }
578 }
579 }
580 }