use nix::dir::Dir;
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use anyhow::{bail, format_err, Error};

use proxmox_sys::{task_log, WorkerTaskContext};

use pbs_api_types::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState};
use pbs_datastore::{DataStore, DataBlob, StoreProgress};
use pbs_datastore::backup_info::{BackupGroup, BackupDir, BackupInfo};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
use proxmox_sys::fs::lock_dir_noblock_shared;

use crate::tools::ParallelHandler;

/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
/// already been verified or detected as corrupt.
pub struct VerifyWorker {
    worker: Arc<dyn WorkerTaskContext>,
    datastore: Arc<DataStore>,
    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
}

impl VerifyWorker {
    /// Creates a new VerifyWorker for a given task worker and datastore.
    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
        Self {
            worker,
            datastore,
            // start with 16k chunks == up to 64G data
            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
            // start with 64 chunks since we assume there are few corrupt ones
            corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
        }
    }
}
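
// Illustrative sketch (not part of the original module): how a verification task might
// drive this worker. `worker`, `datastore` and `upid` are assumed to be provided by the
// surrounding WorkerTask/job machinery; the filter re-verifies snapshots older than 30 days.
//
//     let verify_worker = VerifyWorker::new(worker.clone(), datastore.clone());
//     let failed_dirs = verify_all_backups(
//         &verify_worker,
//         &upid,
//         None, // no owner restriction
//         Some(&|manifest| verify_filter(true, Some(30), manifest)),
//     )?;
//     if !failed_dirs.is_empty() {
//         bail!("verification failed for {} snapshot(s)", failed_dirs.len());
//     }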

fn verify_blob(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let blob = datastore.load_blob(backup_dir, &info.filename)?;

    let raw_size = blob.raw_size();
    if raw_size != info.size {
        bail!("wrong size ({} != {})", info.size, raw_size);
    }

    let csum = openssl::sha::sha256(blob.raw_data());
    if csum != info.csum {
        bail!("wrong index checksum");
    }

    match blob.crypt_mode()? {
        CryptMode::Encrypt => Ok(()),
        CryptMode::None => {
            // digest already verified above
            blob.decode(None, None)?;
            Ok(())
        },
        CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
    }
}

fn rename_corrupted_chunk(
    datastore: Arc<DataStore>,
    digest: &[u8;32],
    worker: &dyn WorkerTaskContext,
) {
    let (path, digest_str) = datastore.chunk_path(digest);

    let mut counter = 0;
    let mut new_path = path.clone();
    loop {
        new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
        if new_path.exists() && counter < 9 {
            counter += 1;
        } else {
            break;
        }
    }

    match std::fs::rename(&path, &new_path) {
        Ok(_) => {
            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
        },
        Err(err) => {
            match err.kind() {
                std::io::ErrorKind::NotFound => { /* ignored */ },
                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
            }
        }
    };
}
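
// For illustration (hypothetical digest, assuming the usual `.chunks/<prefix>/<digest>`
// datastore layout): a corrupt chunk
//     <store>/.chunks/b7d4/b7d4...e2f1
// is renamed to
//     <store>/.chunks/b7d4/b7d4...e2f1.0.bad
// and, if that name is already taken, to `.1.bad`, `.2.bad`, ... up to `.9.bad`, which is
// overwritten once all ten names exist.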

fn verify_index_chunks(
    verify_worker: &VerifyWorker,
    index: Box<dyn IndexFile + Send>,
    crypt_mode: CryptMode,
) -> Result<(), Error> {
    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let mut read_bytes = 0;
    let mut decoded_bytes = 0;

    let worker2 = Arc::clone(&verify_worker.worker);
    let datastore2 = Arc::clone(&verify_worker.datastore);
    let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
    let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
    let errors2 = Arc::clone(&errors);

    let decoder_pool = ParallelHandler::new(
        "verify chunk decoder",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            let chunk_crypt_mode = match chunk.crypt_mode() {
                Err(err) => {
                    corrupt_chunks2.lock().unwrap().insert(digest);
                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                    errors2.fetch_add(1, Ordering::SeqCst);
                    return Ok(());
                },
                Ok(mode) => mode,
            };

            if chunk_crypt_mode != crypt_mode {
                task_log!(
                    worker2,
                    "chunk CryptMode {:?} does not match index CryptMode {:?}",
                    chunk_crypt_mode,
                    crypt_mode
                );
                errors2.fetch_add(1, Ordering::SeqCst);
            }

            if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                corrupt_chunks2.lock().unwrap().insert(digest);
                task_log!(worker2, "{}", err);
                errors2.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
            } else {
                verified_chunks2.lock().unwrap().insert(digest);
            }

            Ok(())
        }
    );

    let skip_chunk = |digest: &[u8; 32]| -> bool {
        if verify_worker.verified_chunks.lock().unwrap().contains(digest) {
            true
        } else if verify_worker.corrupt_chunks.lock().unwrap().contains(digest) {
            let digest_str = hex::encode(digest);
            task_log!(verify_worker.worker, "chunk {} was marked as corrupt", digest_str);
            errors.fetch_add(1, Ordering::SeqCst);
            true
        } else {
            false
        }
    };

    let check_abort = |pos: usize| -> Result<(), Error> {
        if pos & 1023 == 0 {
            verify_worker.worker.check_abort()?;
            verify_worker.worker.fail_on_shutdown()?;
        }
        Ok(())
    };

    let chunk_list =
        verify_worker
            .datastore
            .get_chunks_in_order(&index, skip_chunk, check_abort)?;

    for (pos, _) in chunk_list {
        verify_worker.worker.check_abort()?;
        verify_worker.worker.fail_on_shutdown()?;

        let info = index.chunk_info(pos).unwrap();

        // we must always recheck this here, the parallel worker below alters it!
        if skip_chunk(&info.digest) {
            continue; // already verified or marked corrupt
        }

        match verify_worker.datastore.load_chunk(&info.digest) {
            Err(err) => {
                verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
                task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
                errors.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(
                    verify_worker.datastore.clone(),
                    &info.digest,
                    &verify_worker.worker,
                );
            }
            Ok(chunk) => {
                let size = info.size();
                read_bytes += chunk.raw_size();
                decoder_pool.send((chunk, info.digest, size))?;
                decoded_bytes += size;
            }
        }
    }

    decoder_pool.complete()?;

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);

    let read_speed = read_bytes_mib / elapsed;
    let decode_speed = decoded_bytes_mib / elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    task_log!(
        verify_worker.worker,
        " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib,
        decoded_bytes_mib,
        elapsed,
        read_speed,
        decode_speed,
        error_count,
    );

    if errors.load(Ordering::SeqCst) > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}

fn verify_fixed_index(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = verify_worker.datastore.open_fixed_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}

fn verify_dynamic_index(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = verify_worker.datastore.open_dynamic_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}

/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
    let snap_lock = lock_dir_noblock_shared(
        &verify_worker.datastore.snapshot_path(backup_dir),
        "snapshot",
        "locked by another operation",
    );
    match snap_lock {
        Ok(snap_lock) => {
            verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
        }
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
                verify_worker.datastore.name(),
                backup_dir,
                err,
            );
            Ok(true)
        }
    }
}
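
// Minimal usage sketch (hypothetical caller, not from the original sources): verifying a
// single snapshot, e.g. directly after a backup finished. `worker`, `datastore`, `backup_dir`
// and `upid` are assumed to come from the running task.
//
//     let verify_worker = VerifyWorker::new(worker.clone(), datastore.clone());
//     if !verify_backup_dir(&verify_worker, &backup_dir, upid.clone(), None)? {
//         bail!("verification of snapshot {} failed", backup_dir);
//     }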

/// See verify_backup_dir
pub fn verify_backup_dir_with_lock(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
    _snap_lock: Dir,
) -> Result<bool, Error> {
    let manifest = match verify_worker.datastore.load_manifest(backup_dir) {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "verify {}:{} - manifest load error: {}",
                verify_worker.datastore.name(),
                backup_dir,
                err,
            );
            return Ok(false);
        }
    };

    if let Some(filter) = filter {
        if !filter(&manifest) {
            task_log!(
                verify_worker.worker,
                "SKIPPED: verify {}:{} (recently verified)",
                verify_worker.datastore.name(),
                backup_dir,
            );
            return Ok(true);
        }
    }

    task_log!(verify_worker.worker, "verify {}:{}", verify_worker.datastore.name(), backup_dir);

    let mut error_count = 0;

    let mut verify_result = VerifyState::Ok;
    for info in manifest.files() {
        let result = proxmox_lang::try_block!({
            task_log!(verify_worker.worker, " check {}", info.filename);
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex => verify_fixed_index(verify_worker, backup_dir, info),
                ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, backup_dir, info),
                ArchiveType::Blob => {
                    verify_blob(verify_worker.datastore.clone(), backup_dir, info)
                }
            }
        });

        verify_worker.worker.check_abort()?;
        verify_worker.worker.fail_on_shutdown()?;

        if let Err(err) = result {
            task_log!(
                verify_worker.worker,
                "verify {}:{}/{} failed: {}",
                verify_worker.datastore.name(),
                backup_dir,
                info.filename,
                err,
            );
            error_count += 1;
            verify_result = VerifyState::Failed;
        }
    }

    let verify_state = SnapshotVerifyState {
        state: verify_result,
        upid,
    };
    let verify_state = serde_json::to_value(verify_state)?;
    verify_worker
        .datastore
        .update_manifest(backup_dir, |manifest| {
            manifest.unprotected["verify_state"] = verify_state;
        })
        .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

    Ok(error_count == 0)
}

/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group(
    verify_worker: &VerifyWorker,
    group: &BackupGroup,
    progress: &mut StoreProgress,
    upid: &UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
        Ok(list) => list,
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "verify group {}:{} - unable to list backups: {}",
                verify_worker.datastore.name(),
                group,
                err,
            );
            return Ok(errors);
        }
    };

    let snapshot_count = list.len();
    task_log!(
        verify_worker.worker,
        "verify group {}:{} ({} snapshots)",
        verify_worker.datastore.name(),
        group,
        snapshot_count
    );

    progress.group_snapshots = snapshot_count as u64;

    BackupInfo::sort_list(&mut list, false); // newest first
    for (pos, info) in list.into_iter().enumerate() {
        if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
            errors.push(info.backup_dir.to_string());
        }
        progress.done_snapshots = pos as u64 + 1;
        task_log!(verify_worker.worker, "percentage done: {}", progress);
    }

    Ok(errors)
}
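
// Sketch of a direct single-group verification (hypothetical caller; `verify_all_backups`
// below does the same per group with a shared progress counter):
//
//     let mut progress = StoreProgress::new(1); // one group in total
//     let failed_dirs =
//         verify_backup_group(&verify_worker, &group, &mut progress, &upid, None)?;
//     task_log!(
//         verify_worker.worker,
//         "group done, {} snapshot(s) failed verification",
//         failed_dirs.len()
//     );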

/// Verify all (owned) backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_all_backups(
    verify_worker: &VerifyWorker,
    upid: &UPID,
    owner: Option<Authid>,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    let worker = Arc::clone(&verify_worker.worker);

    task_log!(worker, "verify datastore {}", verify_worker.datastore.name());

    if let Some(owner) = &owner {
        task_log!(worker, "limiting to backups owned by {}", owner);
    }

    let filter_by_owner = |group: &BackupGroup| {
        match (verify_worker.datastore.get_owner(group), &owner) {
            (Ok(ref group_owner), Some(owner)) => {
                group_owner == owner
                    || (group_owner.is_token()
                        && !owner.is_token()
                        && group_owner.user() == owner.user())
            },
            (Ok(_), None) => true,
            (Err(err), Some(_)) => {
                // intentionally not in task log
                // the task user might not be allowed to see this group!
                println!("Failed to get owner of group '{}' - {}", group, err);
                false
            },
            (Err(err), None) => {
                // we don't filter by owner, but we want to log the error
                task_log!(
                    worker,
501 "Failed to get owner of group '{} - {}",
                    group,
                    err,
                );
                errors.push(group.to_string());
                true
            },
        }
    };

    let mut list = match BackupInfo::list_backup_groups(&verify_worker.datastore.base_path()) {
        Ok(list) => list
            .into_iter()
            .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
            .filter(filter_by_owner)
            .collect::<Vec<BackupGroup>>(),
        Err(err) => {
            task_log!(worker, "unable to list backups: {}", err,);
            return Ok(errors);
        }
    };

    list.sort_unstable();

    let group_count = list.len();
    task_log!(worker, "found {} groups", group_count);

    let mut progress = StoreProgress::new(group_count as u64);

    for (pos, group) in list.into_iter().enumerate() {
        progress.done_groups = pos as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let mut group_errors =
            verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
        errors.append(&mut group_errors);
    }

    Ok(errors)
}

/// Filter for the verification of snapshots
pub fn verify_filter(
    ignore_verified_snapshots: bool,
    outdated_after: Option<i64>,
    manifest: &BackupManifest,
) -> bool {
    if !ignore_verified_snapshots {
        return true;
    }

    let raw_verify_state = manifest.unprotected["verify_state"].clone();
    match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
        Err(_) => true, // no last verification, always include
        Ok(last_verify) => {
            match outdated_after {
                None => false, // never re-verify if ignored and no max age
                Some(max_age) => {
                    let now = proxmox_time::epoch_i64();
                    let days_since_last_verify = (now - last_verify.upid.starttime) / 86400;

                    days_since_last_verify > max_age
                }
            }
        }
    }
}
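
// Behaviour of `verify_filter` at a glance (derived from the logic above; ages are
// illustrative):
//
//     ignore_verified_snapshots = false                       -> verify (always)
//     true, manifest has no usable "verify_state"             -> verify
//     true, outdated_after = None, previously verified        -> skip
//     true, outdated_after = Some(30), verified 45 days ago   -> verify again
//     true, outdated_after = Some(30), verified  3 days ago   -> skip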