]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/verify.rs
17179c13a424240b73ab1d0cbdfeea58cf5cb0de
[proxmox-backup.git] / src / backup / verify.rs
1 use nix::dir::Dir;
2 use std::collections::HashSet;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::{Arc, Mutex};
5 use std::time::Instant;
6
7 use anyhow::{bail, format_err, Error};
8
9 use pbs_api_types::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState};
10 use pbs_datastore::{DataStore, DataBlob, StoreProgress};
11 use pbs_datastore::backup_info::{BackupGroup, BackupDir, BackupInfo};
12 use pbs_datastore::index::IndexFile;
13 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
14 use pbs_tools::fs::lock_dir_noblock_shared;
15 use pbs_tools::{task_log, task::WorkerTaskContext};
16
17 use crate::tools::ParallelHandler;
18
19 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
20 /// already been verified or detected as corrupt.
21 pub struct VerifyWorker {
22 worker: Arc<dyn WorkerTaskContext>,
23 datastore: Arc<DataStore>,
24 verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
25 corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
26 }
27
28 impl VerifyWorker {
29 /// Creates a new VerifyWorker for a given task worker and datastore.
30 pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
31 Self {
32 worker,
33 datastore,
34 // start with 16k chunks == up to 64G data
35 verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
36 // start with 64 chunks since we assume there are few corrupt ones
37 corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
38 }
39 }
40 }
41
42 fn verify_blob(
43 datastore: Arc<DataStore>,
44 backup_dir: &BackupDir,
45 info: &FileInfo,
46 ) -> Result<(), Error> {
47 let blob = datastore.load_blob(backup_dir, &info.filename)?;
48
49 let raw_size = blob.raw_size();
50 if raw_size != info.size {
51 bail!("wrong size ({} != {})", info.size, raw_size);
52 }
53
54 let csum = openssl::sha::sha256(blob.raw_data());
55 if csum != info.csum {
56 bail!("wrong index checksum");
57 }
58
59 match blob.crypt_mode()? {
60 CryptMode::Encrypt => Ok(()),
61 CryptMode::None => {
62 // digest already verified above
63 blob.decode(None, None)?;
64 Ok(())
65 },
66 CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
67 }
68 }
69
70 fn rename_corrupted_chunk(
71 datastore: Arc<DataStore>,
72 digest: &[u8;32],
73 worker: &dyn WorkerTaskContext,
74 ) {
75 let (path, digest_str) = datastore.chunk_path(digest);
76
77 let mut counter = 0;
78 let mut new_path = path.clone();
79 loop {
80 new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
81 if new_path.exists() && counter < 9 {
82 counter += 1;
83 } else {
84 break;
85 }
86 }
87
88 match std::fs::rename(&path, &new_path) {
89 Ok(_) => {
90 task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
91 },
92 Err(err) => {
93 match err.kind() {
94 std::io::ErrorKind::NotFound => { /* ignored */ },
95 _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
96 }
97 }
98 };
99 }
100
101 fn verify_index_chunks(
102 verify_worker: &VerifyWorker,
103 index: Box<dyn IndexFile + Send>,
104 crypt_mode: CryptMode,
105 ) -> Result<(), Error> {
106 let errors = Arc::new(AtomicUsize::new(0));
107
108 let start_time = Instant::now();
109
110 let mut read_bytes = 0;
111 let mut decoded_bytes = 0;
112
113 let worker2 = Arc::clone(&verify_worker.worker);
114 let datastore2 = Arc::clone(&verify_worker.datastore);
115 let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
116 let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
117 let errors2 = Arc::clone(&errors);
118
119 let decoder_pool = ParallelHandler::new(
120 "verify chunk decoder",
121 4,
122 move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
123 let chunk_crypt_mode = match chunk.crypt_mode() {
124 Err(err) => {
125 corrupt_chunks2.lock().unwrap().insert(digest);
126 task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
127 errors2.fetch_add(1, Ordering::SeqCst);
128 return Ok(());
129 },
130 Ok(mode) => mode,
131 };
132
133 if chunk_crypt_mode != crypt_mode {
134 task_log!(
135 worker2,
136 "chunk CryptMode {:?} does not match index CryptMode {:?}",
137 chunk_crypt_mode,
138 crypt_mode
139 );
140 errors2.fetch_add(1, Ordering::SeqCst);
141 }
142
143 if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
144 corrupt_chunks2.lock().unwrap().insert(digest);
145 task_log!(worker2, "{}", err);
146 errors2.fetch_add(1, Ordering::SeqCst);
147 rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
148 } else {
149 verified_chunks2.lock().unwrap().insert(digest);
150 }
151
152 Ok(())
153 }
154 );
155
156 let skip_chunk = |digest: &[u8; 32]| -> bool {
157 if verify_worker.verified_chunks.lock().unwrap().contains(digest) {
158 true
159 } else if verify_worker.corrupt_chunks.lock().unwrap().contains(digest) {
160 let digest_str = proxmox::tools::digest_to_hex(digest);
161 task_log!(verify_worker.worker, "chunk {} was marked as corrupt", digest_str);
162 errors.fetch_add(1, Ordering::SeqCst);
163 true
164 } else {
165 false
166 }
167 };
168
169 let check_abort = |pos: usize| -> Result<(), Error> {
170 if pos & 1023 == 0 {
171 verify_worker.worker.check_abort()?;
172 verify_worker.worker.fail_on_shutdown()?;
173 }
174 Ok(())
175 };
176
177 let chunk_list =
178 verify_worker
179 .datastore
180 .get_chunks_in_order(&index, skip_chunk, check_abort)?;
181
182 for (pos, _) in chunk_list {
183 verify_worker.worker.check_abort()?;
184 verify_worker.worker.fail_on_shutdown()?;
185
186 let info = index.chunk_info(pos).unwrap();
187
188 // we must always recheck this here, the parallel worker below alter it!
189 if skip_chunk(&info.digest) {
190 continue; // already verified or marked corrupt
191 }
192
193 match verify_worker.datastore.load_chunk(&info.digest) {
194 Err(err) => {
195 verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
196 task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
197 errors.fetch_add(1, Ordering::SeqCst);
198 rename_corrupted_chunk(
199 verify_worker.datastore.clone(),
200 &info.digest,
201 &verify_worker.worker,
202 );
203 }
204 Ok(chunk) => {
205 let size = info.size();
206 read_bytes += chunk.raw_size();
207 decoder_pool.send((chunk, info.digest, size))?;
208 decoded_bytes += size;
209 }
210 }
211 }
212
213 decoder_pool.complete()?;
214
215 let elapsed = start_time.elapsed().as_secs_f64();
216
217 let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
218 let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
219
220 let read_speed = read_bytes_mib / elapsed;
221 let decode_speed = decoded_bytes_mib / elapsed;
222
223 let error_count = errors.load(Ordering::SeqCst);
224
225 task_log!(
226 verify_worker.worker,
227 " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
228 read_bytes_mib,
229 decoded_bytes_mib,
230 elapsed,
231 read_speed,
232 decode_speed,
233 error_count,
234 );
235
236 if errors.load(Ordering::SeqCst) > 0 {
237 bail!("chunks could not be verified");
238 }
239
240 Ok(())
241 }
242
243 fn verify_fixed_index(
244 verify_worker: &VerifyWorker,
245 backup_dir: &BackupDir,
246 info: &FileInfo,
247 ) -> Result<(), Error> {
248 let mut path = backup_dir.relative_path();
249 path.push(&info.filename);
250
251 let index = verify_worker.datastore.open_fixed_reader(&path)?;
252
253 let (csum, size) = index.compute_csum();
254 if size != info.size {
255 bail!("wrong size ({} != {})", info.size, size);
256 }
257
258 if csum != info.csum {
259 bail!("wrong index checksum");
260 }
261
262 verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
263 }
264
265 fn verify_dynamic_index(
266 verify_worker: &VerifyWorker,
267 backup_dir: &BackupDir,
268 info: &FileInfo,
269 ) -> Result<(), Error> {
270 let mut path = backup_dir.relative_path();
271 path.push(&info.filename);
272
273 let index = verify_worker.datastore.open_dynamic_reader(&path)?;
274
275 let (csum, size) = index.compute_csum();
276 if size != info.size {
277 bail!("wrong size ({} != {})", info.size, size);
278 }
279
280 if csum != info.csum {
281 bail!("wrong index checksum");
282 }
283
284 verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
285 }
286
287 /// Verify a single backup snapshot
288 ///
289 /// This checks all archives inside a backup snapshot.
290 /// Errors are logged to the worker log.
291 ///
292 /// Returns
293 /// - Ok(true) if verify is successful
294 /// - Ok(false) if there were verification errors
295 /// - Err(_) if task was aborted
296 pub fn verify_backup_dir(
297 verify_worker: &VerifyWorker,
298 backup_dir: &BackupDir,
299 upid: UPID,
300 filter: Option<&dyn Fn(&BackupManifest) -> bool>,
301 ) -> Result<bool, Error> {
302 let snap_lock = lock_dir_noblock_shared(
303 &verify_worker.datastore.snapshot_path(&backup_dir),
304 "snapshot",
305 "locked by another operation",
306 );
307 match snap_lock {
308 Ok(snap_lock) => {
309 verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
310 }
311 Err(err) => {
312 task_log!(
313 verify_worker.worker,
314 "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
315 verify_worker.datastore.name(),
316 backup_dir,
317 err,
318 );
319 Ok(true)
320 }
321 }
322 }
323
324 /// See verify_backup_dir
325 pub fn verify_backup_dir_with_lock(
326 verify_worker: &VerifyWorker,
327 backup_dir: &BackupDir,
328 upid: UPID,
329 filter: Option<&dyn Fn(&BackupManifest) -> bool>,
330 _snap_lock: Dir,
331 ) -> Result<bool, Error> {
332 let manifest = match verify_worker.datastore.load_manifest(&backup_dir) {
333 Ok((manifest, _)) => manifest,
334 Err(err) => {
335 task_log!(
336 verify_worker.worker,
337 "verify {}:{} - manifest load error: {}",
338 verify_worker.datastore.name(),
339 backup_dir,
340 err,
341 );
342 return Ok(false);
343 }
344 };
345
346 if let Some(filter) = filter {
347 if !filter(&manifest) {
348 task_log!(
349 verify_worker.worker,
350 "SKIPPED: verify {}:{} (recently verified)",
351 verify_worker.datastore.name(),
352 backup_dir,
353 );
354 return Ok(true);
355 }
356 }
357
358 task_log!(verify_worker.worker, "verify {}:{}", verify_worker.datastore.name(), backup_dir);
359
360 let mut error_count = 0;
361
362 let mut verify_result = VerifyState::Ok;
363 for info in manifest.files() {
364 let result = proxmox::try_block!({
365 task_log!(verify_worker.worker, " check {}", info.filename);
366 match archive_type(&info.filename)? {
367 ArchiveType::FixedIndex => verify_fixed_index(verify_worker, &backup_dir, info),
368 ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, &backup_dir, info),
369 ArchiveType::Blob => {
370 verify_blob(verify_worker.datastore.clone(), &backup_dir, info)
371 }
372 }
373 });
374
375 verify_worker.worker.check_abort()?;
376 verify_worker.worker.fail_on_shutdown()?;
377
378 if let Err(err) = result {
379 task_log!(
380 verify_worker.worker,
381 "verify {}:{}/{} failed: {}",
382 verify_worker.datastore.name(),
383 backup_dir,
384 info.filename,
385 err,
386 );
387 error_count += 1;
388 verify_result = VerifyState::Failed;
389 }
390 }
391
392 let verify_state = SnapshotVerifyState {
393 state: verify_result,
394 upid,
395 };
396 let verify_state = serde_json::to_value(verify_state)?;
397 verify_worker
398 .datastore
399 .update_manifest(&backup_dir, |manifest| {
400 manifest.unprotected["verify_state"] = verify_state;
401 })
402 .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
403
404 Ok(error_count == 0)
405 }
406
407 /// Verify all backups inside a backup group
408 ///
409 /// Errors are logged to the worker log.
410 ///
411 /// Returns
412 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
413 /// - Err(_) if task was aborted
414 pub fn verify_backup_group(
415 verify_worker: &VerifyWorker,
416 group: &BackupGroup,
417 progress: &mut StoreProgress,
418 upid: &UPID,
419 filter: Option<&dyn Fn(&BackupManifest) -> bool>,
420 ) -> Result<Vec<String>, Error> {
421 let mut errors = Vec::new();
422 let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
423 Ok(list) => list,
424 Err(err) => {
425 task_log!(
426 verify_worker.worker,
427 "verify group {}:{} - unable to list backups: {}",
428 verify_worker.datastore.name(),
429 group,
430 err,
431 );
432 return Ok(errors);
433 }
434 };
435
436 let snapshot_count = list.len();
437 task_log!(
438 verify_worker.worker,
439 "verify group {}:{} ({} snapshots)",
440 verify_worker.datastore.name(),
441 group,
442 snapshot_count
443 );
444
445 progress.group_snapshots = snapshot_count as u64;
446
447 BackupInfo::sort_list(&mut list, false); // newest first
448 for (pos, info) in list.into_iter().enumerate() {
449 if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
450 errors.push(info.backup_dir.to_string());
451 }
452 progress.done_snapshots = pos as u64 + 1;
453 task_log!(verify_worker.worker, "percentage done: {}", progress);
454 }
455
456 Ok(errors)
457 }
458
459 /// Verify all (owned) backups inside a datastore
460 ///
461 /// Errors are logged to the worker log.
462 ///
463 /// Returns
464 /// - Ok(failed_dirs) where failed_dirs had verification errors
465 /// - Err(_) if task was aborted
466 pub fn verify_all_backups(
467 verify_worker: &VerifyWorker,
468 upid: &UPID,
469 owner: Option<Authid>,
470 filter: Option<&dyn Fn(&BackupManifest) -> bool>,
471 ) -> Result<Vec<String>, Error> {
472 let mut errors = Vec::new();
473 let worker = Arc::clone(&verify_worker.worker);
474
475 task_log!(worker, "verify datastore {}", verify_worker.datastore.name());
476
477 if let Some(owner) = &owner {
478 task_log!(worker, "limiting to backups owned by {}", owner);
479 }
480
481 let filter_by_owner = |group: &BackupGroup| {
482 match (verify_worker.datastore.get_owner(group), &owner) {
483 (Ok(ref group_owner), Some(owner)) => {
484 group_owner == owner
485 || (group_owner.is_token()
486 && !owner.is_token()
487 && group_owner.user() == owner.user())
488 },
489 (Ok(_), None) => true,
490 (Err(err), Some(_)) => {
491 // intentionally not in task log
492 // the task user might not be allowed to see this group!
493 println!("Failed to get owner of group '{}' - {}", group, err);
494 false
495 },
496 (Err(err), None) => {
497 // we don't filter by owner, but we want to log the error
498 task_log!(
499 worker,
500 "Failed to get owner of group '{} - {}",
501 group,
502 err,
503 );
504 errors.push(group.to_string());
505 true
506 },
507 }
508 };
509
510 let mut list = match BackupInfo::list_backup_groups(&verify_worker.datastore.base_path()) {
511 Ok(list) => list
512 .into_iter()
513 .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
514 .filter(filter_by_owner)
515 .collect::<Vec<BackupGroup>>(),
516 Err(err) => {
517 task_log!(worker, "unable to list backups: {}", err,);
518 return Ok(errors);
519 }
520 };
521
522 list.sort_unstable();
523
524 let group_count = list.len();
525 task_log!(worker, "found {} groups", group_count);
526
527 let mut progress = StoreProgress::new(group_count as u64);
528
529 for (pos, group) in list.into_iter().enumerate() {
530 progress.done_groups = pos as u64;
531 progress.done_snapshots = 0;
532 progress.group_snapshots = 0;
533
534 let mut group_errors =
535 verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
536 errors.append(&mut group_errors);
537 }
538
539 Ok(errors)
540 }
541
542 /// Filter for the verification of snapshots
543 pub fn verify_filter(
544 ignore_verified_snapshots: bool,
545 outdated_after: Option<i64>,
546 manifest: &BackupManifest,
547 ) -> bool {
548 if !ignore_verified_snapshots {
549 return true;
550 }
551
552 let raw_verify_state = manifest.unprotected["verify_state"].clone();
553 match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
554 Err(_) => true, // no last verification, always include
555 Ok(last_verify) => {
556 match outdated_after {
557 None => false, // never re-verify if ignored and no max age
558 Some(max_age) => {
559 let now = proxmox::tools::time::epoch_i64();
560 let days_since_last_verify = (now - last_verify.upid.starttime) / 86400;
561
562 days_since_last_verify > max_age
563 }
564 }
565 }
566 }
567 }