use nix::dir::Dir;
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use anyhow::{bail, format_err, Error};

use pbs_api_types::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState};
use pbs_datastore::{DataBlob, StoreProgress};
use pbs_datastore::backup_info::{BackupGroup, BackupDir, BackupInfo};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
use pbs_tools::fs::lock_dir_noblock_shared;
use pbs_tools::{task_log, task::WorkerTaskContext};

use crate::{
    backup::DataStore,
    tools::ParallelHandler,
};

/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
/// already been verified or detected as corrupt.
pub struct VerifyWorker {
    worker: Arc<dyn WorkerTaskContext + Send + Sync>,
    datastore: Arc<DataStore>,
    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
}

impl VerifyWorker {
    /// Creates a new VerifyWorker for a given task worker and datastore.
    pub fn new(worker: Arc<dyn WorkerTaskContext + Send + Sync>, datastore: Arc<DataStore>) -> Self {
        Self {
            worker,
            datastore,
            // start with 16k chunks == up to 64G data
            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
            // start with 64 chunks since we assume there are few corrupt ones
            corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
        }
    }
}

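/// Verify a single blob archive of a snapshot: check size and checksum against the manifest
/// entry and, for unencrypted blobs, make sure the content can be decoded.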
fn verify_blob(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let blob = datastore.load_blob(backup_dir, &info.filename)?;

    let raw_size = blob.raw_size();
    if raw_size != info.size {
        bail!("wrong size ({} != {})", info.size, raw_size);
    }

    let csum = openssl::sha::sha256(blob.raw_data());
    if csum != info.csum {
        bail!("wrong index checksum");
    }

    match blob.crypt_mode()? {
        CryptMode::Encrypt => Ok(()),
        CryptMode::None => {
            // digest already verified above
            blob.decode(None, None)?;
            Ok(())
        },
        CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
    }
}

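/// Rename a chunk that failed verification to `<digest>.<counter>.bad` so it is no longer
/// used; at most ten `.bad` versions of a chunk are kept around.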
fn rename_corrupted_chunk(
    datastore: Arc<DataStore>,
    digest: &[u8; 32],
    worker: &dyn WorkerTaskContext,
) {
    let (path, digest_str) = datastore.chunk_path(digest);

    let mut counter = 0;
    let mut new_path = path.clone();
    loop {
        new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
        if new_path.exists() && counter < 9 {
            counter += 1;
        } else {
            break;
        }
    }

    match std::fs::rename(&path, &new_path) {
        Ok(_) => {
            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
        },
        Err(err) => {
            match err.kind() {
                std::io::ErrorKind::NotFound => { /* ignored */ },
                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
            }
        }
    };
}

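/// Verify all chunks referenced by an index: skip chunks already handled in this run, load
/// the rest from the datastore and check CryptMode, size and digest on a decoder pool.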
fn verify_index_chunks(
    verify_worker: &VerifyWorker,
    index: Box<dyn IndexFile + Send>,
    crypt_mode: CryptMode,
) -> Result<(), Error> {
    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let mut read_bytes = 0;
    let mut decoded_bytes = 0;

    let worker2 = Arc::clone(&verify_worker.worker);
    let datastore2 = Arc::clone(&verify_worker.datastore);
    let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
    let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
    let errors2 = Arc::clone(&errors);

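    // decode and verify chunks on a pool of 4 worker threads, while this thread keeps
    // loading chunks from disk and feeding them in via decoder_pool.send()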
    let decoder_pool = ParallelHandler::new(
        "verify chunk decoder",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            let chunk_crypt_mode = match chunk.crypt_mode() {
                Err(err) => {
                    corrupt_chunks2.lock().unwrap().insert(digest);
                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                    errors2.fetch_add(1, Ordering::SeqCst);
                    return Ok(());
                },
                Ok(mode) => mode,
            };

            if chunk_crypt_mode != crypt_mode {
                task_log!(
                    worker2,
                    "chunk CryptMode {:?} does not match index CryptMode {:?}",
                    chunk_crypt_mode,
                    crypt_mode
                );
                errors2.fetch_add(1, Ordering::SeqCst);
            }

            if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                corrupt_chunks2.lock().unwrap().insert(digest);
                task_log!(worker2, "{}", err);
                errors2.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
            } else {
                verified_chunks2.lock().unwrap().insert(digest);
            }

            Ok(())
        }
    );

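    // chunks already verified in this run are skipped; chunks known corrupt count as errors again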
    let skip_chunk = |digest: &[u8; 32]| -> bool {
        if verify_worker.verified_chunks.lock().unwrap().contains(digest) {
            true
        } else if verify_worker.corrupt_chunks.lock().unwrap().contains(digest) {
            let digest_str = proxmox::tools::digest_to_hex(digest);
            task_log!(verify_worker.worker, "chunk {} was marked as corrupt", digest_str);
            errors.fetch_add(1, Ordering::SeqCst);
            true
        } else {
            false
        }
    };

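    // while building the ordered chunk list, check for abort/shutdown every 1024 chunks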
    let check_abort = |pos: usize| -> Result<(), Error> {
        if pos & 1023 == 0 {
            verify_worker.worker.check_abort()?;
            proxmox_rest_server::fail_on_shutdown()?;
        }
        Ok(())
    };

    let chunk_list =
        verify_worker
            .datastore
            .get_chunks_in_order(&index, skip_chunk, check_abort)?;

    for (pos, _) in chunk_list {
        verify_worker.worker.check_abort()?;
        proxmox_rest_server::fail_on_shutdown()?;

        let info = index.chunk_info(pos).unwrap();

        // we must always recheck this here, the parallel worker below might alter it!
        if skip_chunk(&info.digest) {
            continue; // already verified or marked corrupt
        }

        match verify_worker.datastore.load_chunk(&info.digest) {
            Err(err) => {
                verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
                task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
                errors.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(
                    verify_worker.datastore.clone(),
                    &info.digest,
                    &verify_worker.worker,
                );
            }
            Ok(chunk) => {
                let size = info.size();
                read_bytes += chunk.raw_size();
                decoder_pool.send((chunk, info.digest, size))?;
                decoded_bytes += size;
            }
        }
    }

    decoder_pool.complete()?;

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);

    let read_speed = read_bytes_mib / elapsed;
    let decode_speed = decoded_bytes_mib / elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    task_log!(
        verify_worker.worker,
        " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib,
        decoded_bytes_mib,
        elapsed,
        read_speed,
        decode_speed,
        error_count,
    );

    if errors.load(Ordering::SeqCst) > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}

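/// Verify a fixed index (.fidx) archive: check its size and checksum against the manifest
/// entry, then verify all chunks it references.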
fn verify_fixed_index(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = verify_worker.datastore.open_fixed_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}

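/// Verify a dynamic index (.didx) archive: check its size and checksum against the manifest
/// entry, then verify all chunks it references.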
fn verify_dynamic_index(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = verify_worker.datastore.open_dynamic_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}

/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
    let snap_lock = lock_dir_noblock_shared(
        &verify_worker.datastore.snapshot_path(&backup_dir),
        "snapshot",
        "locked by another operation",
    );
    match snap_lock {
        Ok(snap_lock) => {
            verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
        }
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
                verify_worker.datastore.name(),
                backup_dir,
                err,
            );
            Ok(true)
        }
    }
}

/// Like verify_backup_dir, but the caller already holds the shared snapshot lock
pub fn verify_backup_dir_with_lock(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
    _snap_lock: Dir,
) -> Result<bool, Error> {
    let manifest = match verify_worker.datastore.load_manifest(&backup_dir) {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "verify {}:{} - manifest load error: {}",
                verify_worker.datastore.name(),
                backup_dir,
                err,
            );
            return Ok(false);
        }
    };

    if let Some(filter) = filter {
        if !filter(&manifest) {
            task_log!(
                verify_worker.worker,
                "SKIPPED: verify {}:{} (recently verified)",
                verify_worker.datastore.name(),
                backup_dir,
            );
            return Ok(true);
        }
    }

    task_log!(verify_worker.worker, "verify {}:{}", verify_worker.datastore.name(), backup_dir);

    let mut error_count = 0;

    let mut verify_result = VerifyState::Ok;
    for info in manifest.files() {
        let result = proxmox::try_block!({
            task_log!(verify_worker.worker, " check {}", info.filename);
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex => verify_fixed_index(verify_worker, &backup_dir, info),
                ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, &backup_dir, info),
                ArchiveType::Blob => {
                    verify_blob(verify_worker.datastore.clone(), &backup_dir, info)
                }
            }
        });

        verify_worker.worker.check_abort()?;
        proxmox_rest_server::fail_on_shutdown()?;

        if let Err(err) = result {
            task_log!(
                verify_worker.worker,
                "verify {}:{}/{} failed: {}",
                verify_worker.datastore.name(),
                backup_dir,
                info.filename,
                err,
            );
            error_count += 1;
            verify_result = VerifyState::Failed;
        }
    }

    let verify_state = SnapshotVerifyState {
        state: verify_result,
        upid,
    };
    let verify_state = serde_json::to_value(verify_state)?;
    verify_worker
        .datastore
        .update_manifest(&backup_dir, |manifest| {
            manifest.unprotected["verify_state"] = verify_state;
        })
        .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

    Ok(error_count == 0)
}

/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group(
    verify_worker: &VerifyWorker,
    group: &BackupGroup,
    progress: &mut StoreProgress,
    upid: &UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
        Ok(list) => list,
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "verify group {}:{} - unable to list backups: {}",
                verify_worker.datastore.name(),
                group,
                err,
            );
            return Ok(errors);
        }
    };

    let snapshot_count = list.len();
    task_log!(
        verify_worker.worker,
        "verify group {}:{} ({} snapshots)",
        verify_worker.datastore.name(),
        group,
        snapshot_count
    );

    progress.group_snapshots = snapshot_count as u64;

    BackupInfo::sort_list(&mut list, false); // newest first
    for (pos, info) in list.into_iter().enumerate() {
        if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
            errors.push(info.backup_dir.to_string());
        }
        progress.done_snapshots = pos as u64 + 1;
        task_log!(verify_worker.worker, "percentage done: {}", progress);
    }

    Ok(errors)
}

/// Verify all (owned) backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_all_backups(
    verify_worker: &VerifyWorker,
    upid: &UPID,
    owner: Option<Authid>,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    let worker = Arc::clone(&verify_worker.worker);

    task_log!(worker, "verify datastore {}", verify_worker.datastore.name());

    if let Some(owner) = &owner {
        task_log!(worker, "limiting to backups owned by {}", owner);
    }

    let filter_by_owner = |group: &BackupGroup| {
        match (verify_worker.datastore.get_owner(group), &owner) {
            (Ok(ref group_owner), Some(owner)) => {
                group_owner == owner
                    || (group_owner.is_token()
                        && !owner.is_token()
                        && group_owner.user() == owner.user())
            },
            (Ok(_), None) => true,
            (Err(err), Some(_)) => {
                // intentionally not in task log
                // the task user might not be allowed to see this group!
                println!("Failed to get owner of group '{}' - {}", group, err);
                false
            },
            (Err(err), None) => {
                // we don't filter by owner, but we want to log the error
                task_log!(
                    worker,
                    "Failed to get owner of group '{}' - {}",
                    group,
                    err,
                );
                errors.push(group.to_string());
                true
            },
        }
    };

    let mut list = match BackupInfo::list_backup_groups(&verify_worker.datastore.base_path()) {
        Ok(list) => list
            .into_iter()
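            // skip the fixed 'host/benchmark' group (only used for benchmark uploads)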
            .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
            .filter(filter_by_owner)
            .collect::<Vec<BackupGroup>>(),
        Err(err) => {
            task_log!(worker, "unable to list backups: {}", err);
            return Ok(errors);
        }
    };

    list.sort_unstable();

    let group_count = list.len();
    task_log!(worker, "found {} groups", group_count);

    let mut progress = StoreProgress::new(group_count as u64);

    for (pos, group) in list.into_iter().enumerate() {
        progress.done_groups = pos as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let mut group_errors =
            verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
        errors.append(&mut group_errors);
    }

    Ok(errors)
}

/// Filter for the verification of snapshots
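///
/// Returns `true` if a snapshot should be verified. With `ignore_verified_snapshots` set,
/// snapshots that already have a recorded verification state are skipped unless that
/// verification is older than `outdated_after` days.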
pub fn verify_filter(
    ignore_verified_snapshots: bool,
    outdated_after: Option<i64>,
    manifest: &BackupManifest,
) -> bool {
    if !ignore_verified_snapshots {
        return true;
    }

    let raw_verify_state = manifest.unprotected["verify_state"].clone();
    match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
        Err(_) => true, // no last verification, always include
        Ok(last_verify) => {
            match outdated_after {
                None => false, // never re-verify if ignored and no max age
                Some(max_age) => {
                    let now = proxmox::tools::time::epoch_i64();
                    let days_since_last_verify = (now - last_verify.upid.starttime) / 86400;

                    days_since_last_verify > max_age
                }
            }
        }
    }
}