use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{Ordering, AtomicUsize};
use std::time::Instant;
use nix::dir::Dir;

use anyhow::{bail, format_err, Error};

use crate::{
    api2::types::*,
    backup::{
        DataStore,
        DataBlob,
        BackupGroup,
        BackupDir,
        BackupInfo,
        IndexFile,
        CryptMode,
        FileInfo,
        ArchiveType,
        archive_type,
    },
    server::UPID,
    task::TaskState,
    task_log,
    tools::ParallelHandler,
    tools::fs::lock_dir_noblock_shared,
};

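/// Verify a single blob archive: compare size and checksum to the manifest
/// entry and try to decode unencrypted blobs. Encrypted blobs cannot be
/// decoded server-side, and SignOnly is not a valid blob CryptMode.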
fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {

    let blob = datastore.load_blob(backup_dir, &info.filename)?;

    let raw_size = blob.raw_size();
    if raw_size != info.size {
        bail!("wrong size ({} != {})", info.size, raw_size);
    }

    let csum = openssl::sha::sha256(blob.raw_data());
    if csum != info.csum {
        bail!("wrong index checksum");
    }

    match blob.crypt_mode()? {
        CryptMode::Encrypt => Ok(()),
        CryptMode::None => {
            // digest already verified above
            blob.decode(None, None)?;
            Ok(())
        },
        CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
    }
}

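/// Rename a corrupt chunk file to `<digest>.<counter>.bad` (counter 0-9)
/// instead of deleting it, so it stays available for inspection while no
/// longer being picked up as a valid chunk.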
fn rename_corrupted_chunk(
    datastore: Arc<DataStore>,
    digest: &[u8;32],
    worker: &dyn TaskState,
) {
    let (path, digest_str) = datastore.chunk_path(digest);

    let mut counter = 0;
    let mut new_path = path.clone();
    loop {
        new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
        if new_path.exists() && counter < 9 { counter += 1; } else { break; }
    }

    match std::fs::rename(&path, &new_path) {
        Ok(_) => {
            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
        },
        Err(err) => {
            match err.kind() {
                std::io::ErrorKind::NotFound => { /* ignored */ },
                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
            }
        }
    };
}

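/// Verify all chunks referenced by an index file.
///
/// Chunks are read sequentially on the calling thread, while decoding and
/// digest verification run on a small thread pool. Each chunk's CryptMode
/// must match the expected `crypt_mode` from the index's manifest entry.
/// The shared `verified_chunks`/`corrupt_chunks` sets ensure every chunk is
/// only checked once per verification task.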
fn verify_index_chunks(
    datastore: Arc<DataStore>,
    index: Box<dyn IndexFile + Send>,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    crypt_mode: CryptMode,
    worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {

    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let mut read_bytes = 0;
    let mut decoded_bytes = 0;

    let worker2 = Arc::clone(&worker);
    let datastore2 = Arc::clone(&datastore);
    let corrupt_chunks2 = Arc::clone(&corrupt_chunks);
    let verified_chunks2 = Arc::clone(&verified_chunks);
    let errors2 = Arc::clone(&errors);

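    // Chunk decoding and digest verification are CPU-bound, so they run on
    // a pool of four decoder threads while the loop below stays on disk I/O.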
    let decoder_pool = ParallelHandler::new(
        "verify chunk decoder", 4,
        move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
            let chunk_crypt_mode = match chunk.crypt_mode() {
                Err(err) => {
                    corrupt_chunks2.lock().unwrap().insert(digest);
                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                    errors2.fetch_add(1, Ordering::SeqCst);
                    return Ok(());
                },
                Ok(mode) => mode,
            };

            if chunk_crypt_mode != crypt_mode {
                task_log!(
                    worker2,
                    "chunk CryptMode {:?} does not match index CryptMode {:?}",
                    chunk_crypt_mode,
                    crypt_mode
                );
                errors2.fetch_add(1, Ordering::SeqCst);
            }

            if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                corrupt_chunks2.lock().unwrap().insert(digest);
                task_log!(worker2, "{}", err);
                errors2.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
            } else {
                verified_chunks2.lock().unwrap().insert(digest);
            }

            Ok(())
        }
    );

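    // Feed chunks to the decoder pool sequentially, skipping digests that
    // are already known good (or known corrupt) from earlier snapshots.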
    for pos in 0..index.index_count() {

        worker.check_abort()?;
        crate::tools::fail_on_shutdown()?;

        let info = index.chunk_info(pos).unwrap();
        let size = info.size();

        if verified_chunks.lock().unwrap().contains(&info.digest) {
            continue; // already verified
        }

        if corrupt_chunks.lock().unwrap().contains(&info.digest) {
            let digest_str = proxmox::tools::digest_to_hex(&info.digest);
            task_log!(worker, "chunk {} was marked as corrupt", digest_str);
            errors.fetch_add(1, Ordering::SeqCst);
            continue;
        }

        match datastore.load_chunk(&info.digest) {
            Err(err) => {
                corrupt_chunks.lock().unwrap().insert(info.digest);
                task_log!(worker, "can't verify chunk, load failed - {}", err);
                errors.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
                continue;
            }
            Ok(chunk) => {
                read_bytes += chunk.raw_size();
                decoder_pool.send((chunk, info.digest, size))?;
                decoded_bytes += size;
            }
        }
    }

    decoder_pool.complete()?;

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);

    let read_speed = read_bytes_mib/elapsed;
    let decode_speed = decoded_bytes_mib/elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    task_log!(
        worker,
        "  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib,
        decoded_bytes_mib,
        elapsed,
        read_speed,
        decode_speed,
        error_count,
    );

    if error_count > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}

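/// Verify a fixed index (.fidx) archive: check the index file's checksum
/// and size against the manifest, then verify all chunks it references.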
fn verify_fixed_index(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {

    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = datastore.open_fixed_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(
        datastore,
        Box::new(index),
        verified_chunks,
        corrupt_chunks,
        info.chunk_crypt_mode(),
        worker,
    )
}

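/// Verify a dynamic index (.didx) archive; same checks as for a fixed index.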
fn verify_dynamic_index(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {

    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = datastore.open_dynamic_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(
        datastore,
        Box::new(index),
        verified_chunks,
        corrupt_chunks,
        info.chunk_crypt_mode(),
        worker,
    )
}

/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: UPID,
) -> Result<bool, Error> {
    let snap_lock = lock_dir_noblock_shared(
        &datastore.snapshot_path(&backup_dir),
        "snapshot",
        "locked by another operation");
    match snap_lock {
        Ok(snap_lock) => verify_backup_dir_with_lock(
            datastore,
            backup_dir,
            verified_chunks,
            corrupt_chunks,
            worker,
            upid,
            snap_lock
        ),
        Err(err) => {
            task_log!(
                worker,
                "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
                datastore.name(),
                backup_dir,
                err,
            );
            Ok(true)
        }
    }
}

/// Like verify_backup_dir, but the caller already holds (and passes in) the snapshot lock
pub fn verify_backup_dir_with_lock(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: UPID,
    _snap_lock: Dir,
) -> Result<bool, Error> {
    let manifest = match datastore.load_manifest(&backup_dir) {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            task_log!(
                worker,
                "verify {}:{} - manifest load error: {}",
                datastore.name(),
                backup_dir,
                err,
            );
            return Ok(false);
        }
    };

    task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);

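    // Check every file listed in the manifest; a failing archive is logged
    // and counted but does not abort verification of the remaining files.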
    let mut error_count = 0;

    let mut verify_result = VerifyState::Ok;
    for info in manifest.files() {
        let result = proxmox::try_block!({
            task_log!(worker, "  check {}", info.filename);
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex =>
                    verify_fixed_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::DynamicIndex =>
                    verify_dynamic_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
            }
        });

        worker.check_abort()?;
        crate::tools::fail_on_shutdown()?;

        if let Err(err) = result {
            task_log!(
                worker,
                "verify {}:{}/{} failed: {}",
                datastore.name(),
                backup_dir,
                info.filename,
                err,
            );
            error_count += 1;
            verify_result = VerifyState::Failed;
        }
    }

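    // Record the outcome in the manifest's unprotected section; verify
    // filters use this state to skip already-verified snapshots.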
    let verify_state = SnapshotVerifyState {
        state: verify_result,
        upid,
    };
    let verify_state = serde_json::to_value(verify_state)?;
    datastore.update_manifest(&backup_dir, |manifest| {
        manifest.unprotected["verify_state"] = verify_state;
    }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

    Ok(error_count == 0)
}

/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group<F: Fn(&BackupInfo) -> bool>(
    datastore: Arc<DataStore>,
    group: &BackupGroup,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    progress: Option<(usize, usize)>, // (done, snapshot_count)
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: &UPID,
    filter: &F,
) -> Result<(usize, Vec<String>), Error> {

    let mut errors = Vec::new();
    let mut list = match group.list_backups(&datastore.base_path()) {
        Ok(list) => list,
        Err(err) => {
            task_log!(
                worker,
                "verify group {}:{} - unable to list backups: {}",
                datastore.name(),
                group,
                err,
            );
            return Ok((0, errors));
        }
    };

    task_log!(worker, "verify group {}:{}", datastore.name(), group);

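    // When called from verify_all_backups, `progress` carries (done, total)
    // over the whole datastore; standalone calls fall back to the size of
    // this group's own snapshot list.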
    let (done, snapshot_count) = progress.unwrap_or((0, list.len()));

    let mut count = 0;
    BackupInfo::sort_list(&mut list, false); // newest first
    for info in list {
        count += 1;

        if !filter(&info) {
            task_log!(
                worker,
                "SKIPPED: verify {}:{} (already verified)",
                datastore.name(),
                info.backup_dir,
            );
            continue;
        }

        if !verify_backup_dir(
            datastore.clone(),
            &info.backup_dir,
            verified_chunks.clone(),
            corrupt_chunks.clone(),
            worker.clone(),
            upid.clone(),
        )? {
            errors.push(info.backup_dir.to_string());
        }
        if snapshot_count != 0 {
            let pos = done + count;
            let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
            task_log!(
                worker,
                "percentage done: {:.2}% ({} of {} snapshots)",
                percentage,
                pos,
                snapshot_count,
            );
        }
    }

    Ok((count, errors))
}

/// Verify all backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_all_backups<F: Fn(&BackupInfo) -> bool>(
    datastore: Arc<DataStore>,
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: &UPID,
    filter: &F,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();

    let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
        Ok(list) => list
            .into_iter()
            .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
            .collect::<Vec<BackupGroup>>(),
        Err(err) => {
            task_log!(
                worker,
                "verify datastore {} - unable to list backups: {}",
                datastore.name(),
                err,
            );
            return Ok(errors);
        }
    };

    list.sort_unstable();

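    // Count all snapshots up front so per-snapshot progress can be reported
    // relative to the whole datastore.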
    let mut snapshot_count = 0;
    for group in list.iter() {
        snapshot_count += group.list_backups(&datastore.base_path())?.len();
    }

    // start with 16384 chunks (up to 65GB)
    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));

    // start with 64 chunks since we assume there are few corrupt ones
    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));

    task_log!(worker, "verify datastore {} ({} snapshots)", datastore.name(), snapshot_count);

    let mut done = 0;
    for group in list {
        let (count, mut group_errors) = verify_backup_group(
            datastore.clone(),
            &group,
            verified_chunks.clone(),
            corrupt_chunks.clone(),
            Some((done, snapshot_count)),
            worker.clone(),
            upid,
            filter,
        )?;
        errors.append(&mut group_errors);

        done += count;
    }

    Ok(errors)
}
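
// A minimal usage sketch, not part of the original module: run a full
// datastore verification with a filter that skips nothing. The function
// name is hypothetical; worker and upid are assumed to be supplied by the
// surrounding task machinery (e.g. a WorkerTask).
#[allow(dead_code)]
fn example_verify_datastore(
    datastore: Arc<DataStore>,
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: &UPID,
) -> Result<(), Error> {
    // re-verify every snapshot, regardless of any previous verify state
    let filter = |_info: &BackupInfo| true;

    let failed_dirs = verify_all_backups(datastore, worker.clone(), upid, &filter)?;
    if !failed_dirs.is_empty() {
        task_log!(worker, "Failed to verify these snapshots:");
        for dir in failed_dirs {
            task_log!(worker, "\t{}", dir);
        }
        bail!("verification failed - please check the log for details");
    }
    Ok(())
}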