use nix::dir::Dir;
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use anyhow::{bail, format_err, Error};

use proxmox_sys::{task_log, WorkerTaskContext};

use pbs_api_types::{
    print_ns_and_snapshot, Authid, BackupNamespace, BackupType, CryptMode, DatastoreWithNamespace,
    SnapshotVerifyState, VerifyState, UPID,
};
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, StoreProgress};
use proxmox_sys::fs::lock_dir_noblock_shared;

use crate::tools::parallel_handler::ParallelHandler;

use crate::backup::hierarchy::ListAccessibleBackupGroups;
/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
/// already been verified or detected as corrupt.
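///
/// # Example
///
/// A construction sketch (not compiled as a doctest; `worker` and `datastore`
/// are assumed to be provided by the surrounding worker task):
///
/// ```ignore
/// let verify_worker = VerifyWorker::new(worker.clone(), datastore.clone());
/// ```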
pub struct VerifyWorker {
    worker: Arc<dyn WorkerTaskContext>,
    datastore: Arc<DataStore>,
    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
}
impl VerifyWorker {
    /// Creates a new VerifyWorker for a given task worker and datastore.
    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
        Self {
            worker,
            datastore,
            // start with 16k chunks == up to 64G data (at the default 4 MiB chunk size)
            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024))),
            // start with 64 chunks since we assume there are few corrupt ones
            corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
        }
    }
}
fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
    let blob = backup_dir.load_blob(&info.filename)?;

    let raw_size = blob.raw_size();
    if raw_size != info.size {
        bail!("wrong size ({} != {})", info.size, raw_size);
    }

    let csum = openssl::sha::sha256(blob.raw_data());
    if csum != info.csum {
        bail!("wrong index checksum");
    }

    match blob.crypt_mode()? {
        CryptMode::Encrypt => Ok(()),
        CryptMode::None => {
            // digest already verified above
            blob.decode(None, None)?;
            Ok(())
        }
        CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
    }
}
fn rename_corrupted_chunk(
    datastore: Arc<DataStore>,
    digest: &[u8; 32],
    worker: &dyn WorkerTaskContext,
) {
    let (path, digest_str) = datastore.chunk_path(digest);

    let mut counter = 0;
    let mut new_path = path.clone();
    loop {
        new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
        if new_path.exists() && counter < 9 {
            counter += 1;
        } else {
            break;
        }
    }

    match std::fs::rename(&path, &new_path) {
        Ok(_) => {
            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
        }
        Err(err) => {
            match err.kind() {
                std::io::ErrorKind::NotFound => { /* ignored */ }
                _ => task_log!(
                    worker,
                    "could not rename corrupted chunk {:?} - {}",
                    &path,
                    err
                ),
            }
        }
    };
}
fn verify_index_chunks(
    verify_worker: &VerifyWorker,
    index: Box<dyn IndexFile + Send>,
    crypt_mode: CryptMode,
) -> Result<(), Error> {
    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let mut read_bytes = 0;
    let mut decoded_bytes = 0;

    let worker2 = Arc::clone(&verify_worker.worker);
    let datastore2 = Arc::clone(&verify_worker.datastore);
    let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
    let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
    let errors2 = Arc::clone(&errors);

    let decoder_pool = ParallelHandler::new(
        "verify chunk decoder",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            let chunk_crypt_mode = match chunk.crypt_mode() {
                Err(err) => {
                    corrupt_chunks2.lock().unwrap().insert(digest);
                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                    errors2.fetch_add(1, Ordering::SeqCst);
                    return Ok(());
                }
                Ok(mode) => mode,
            };

            if chunk_crypt_mode != crypt_mode {
                task_log!(
                    worker2,
                    "chunk CryptMode {:?} does not match index CryptMode {:?}",
                    chunk_crypt_mode,
                    crypt_mode
                );
                errors2.fetch_add(1, Ordering::SeqCst);
            }

            if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                corrupt_chunks2.lock().unwrap().insert(digest);
                task_log!(worker2, "{}", err);
                errors2.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
            } else {
                verified_chunks2.lock().unwrap().insert(digest);
            }

            Ok(())
        },
    );

    let skip_chunk = |digest: &[u8; 32]| -> bool {
        if verify_worker
            .verified_chunks
            .lock()
            .unwrap()
            .contains(digest)
        {
            true
        } else if verify_worker
            .corrupt_chunks
            .lock()
            .unwrap()
            .contains(digest)
        {
            let digest_str = hex::encode(digest);
            task_log!(
                verify_worker.worker,
                "chunk {} was marked as corrupt",
                digest_str
            );
            errors.fetch_add(1, Ordering::SeqCst);
            true
        } else {
            false
        }
    };

    let check_abort = |pos: usize| -> Result<(), Error> {
        if pos & 1023 == 0 {
            verify_worker.worker.check_abort()?;
            verify_worker.worker.fail_on_shutdown()?;
        }
        Ok(())
    };

    let chunk_list =
        verify_worker
            .datastore
            .get_chunks_in_order(&index, skip_chunk, check_abort)?;

    for (pos, _) in chunk_list {
        verify_worker.worker.check_abort()?;
        verify_worker.worker.fail_on_shutdown()?;

        let info = index.chunk_info(pos).unwrap();

        // we must always recheck this here, the parallel worker below may alter it!
        if skip_chunk(&info.digest) {
            continue; // already verified or marked corrupt
        }

        match verify_worker.datastore.load_chunk(&info.digest) {
            Err(err) => {
                verify_worker
                    .corrupt_chunks
                    .lock()
                    .unwrap()
                    .insert(info.digest);
                task_log!(
                    verify_worker.worker,
                    "can't verify chunk, load failed - {}",
                    err
                );
                errors.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(
                    verify_worker.datastore.clone(),
                    &info.digest,
                    &verify_worker.worker,
                );
            }
            Ok(chunk) => {
                let size = info.size();
                read_bytes += chunk.raw_size();
                decoder_pool.send((chunk, info.digest, size))?;
                decoded_bytes += size;
            }
        }
    }

    decoder_pool.complete()?;

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);

    let read_speed = read_bytes_mib / elapsed;
    let decode_speed = decoded_bytes_mib / elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    task_log!(
        verify_worker.worker,
        "  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib,
        decoded_bytes_mib,
        elapsed,
        read_speed,
        decode_speed,
        error_count
    );

    if errors.load(Ordering::SeqCst) > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}
fn verify_fixed_index(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = verify_worker.datastore.open_fixed_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}
fn verify_dynamic_index(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    info: &FileInfo,
) -> Result<(), Error> {
    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = verify_worker.datastore.open_dynamic_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}
/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
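///
/// # Example
///
/// A call-site sketch (not compiled as a doctest; `verify_worker`, `backup_dir`
/// and `upid` are assumed to come from the calling task):
///
/// ```ignore
/// // verify unconditionally, i.e. without a manifest filter
/// let ok = verify_backup_dir(&verify_worker, &backup_dir, upid.clone(), None)?;
/// ```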
pub fn verify_backup_dir(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
    let snap_lock = lock_dir_noblock_shared(
        &backup_dir.full_path(),
        "snapshot",
        "locked by another operation",
    );
    match snap_lock {
        Ok(snap_lock) => {
            verify_backup_dir_with_lock(verify_worker, backup_dir, upid, filter, snap_lock)
        }
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
                verify_worker.datastore.name(),
                backup_dir.dir(),
                err,
            );
            Ok(true)
        }
    }
}
/// See verify_backup_dir
pub fn verify_backup_dir_with_lock(
    verify_worker: &VerifyWorker,
    backup_dir: &BackupDir,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
    _snap_lock: Dir,
) -> Result<bool, Error> {
    let manifest = match backup_dir.load_manifest() {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            task_log!(
                verify_worker.worker,
                "verify {}:{} - manifest load error: {}",
                verify_worker.datastore.name(),
                backup_dir.dir(),
                err,
            );
            return Ok(false);
        }
    };

    if let Some(filter) = filter {
        if !filter(&manifest) {
            task_log!(
                verify_worker.worker,
                "SKIPPED: verify {}:{} (recently verified)",
                verify_worker.datastore.name(),
                backup_dir.dir(),
            );
            return Ok(true);
        }
    }

    task_log!(
        verify_worker.worker,
        "verify {}:{}",
        verify_worker.datastore.name(),
        backup_dir.dir()
    );

    let mut error_count = 0;

    let mut verify_result = VerifyState::Ok;
    for info in manifest.files() {
        let result = proxmox_lang::try_block!({
            task_log!(verify_worker.worker, "  check {}", info.filename);
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex => verify_fixed_index(verify_worker, backup_dir, info),
                ArchiveType::DynamicIndex => verify_dynamic_index(verify_worker, backup_dir, info),
                ArchiveType::Blob => verify_blob(backup_dir, info),
            }
        });

        verify_worker.worker.check_abort()?;
        verify_worker.worker.fail_on_shutdown()?;

        if let Err(err) = result {
            task_log!(
                verify_worker.worker,
                "verify {}:{}/{} failed: {}",
                verify_worker.datastore.name(),
                backup_dir.dir(),
                info.filename,
                err,
            );
            error_count += 1;
            verify_result = VerifyState::Failed;
        }
    }

    let verify_state = SnapshotVerifyState {
        state: verify_result,
        upid,
    };
    let verify_state = serde_json::to_value(verify_state)?;
    backup_dir
        .update_manifest(|manifest| {
            manifest.unprotected["verify_state"] = verify_state;
        })
        .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

    Ok(error_count == 0)
}
/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
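///
/// # Example
///
/// A sketch of how a caller might drive this (not compiled as a doctest;
/// `verify_worker`, `group` and `upid` are assumed to come from the caller):
///
/// ```ignore
/// let mut progress = StoreProgress::new(1); // verifying a single group
/// let failed_dirs = verify_backup_group(&verify_worker, &group, &mut progress, &upid, None)?;
/// ```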
pub fn verify_backup_group(
    verify_worker: &VerifyWorker,
    group: &BackupGroup,
    progress: &mut StoreProgress,
    upid: &UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    let mut list = match group.list_backups() {
        Ok(list) => list,
        Err(err) => {
            let store_with_ns = DatastoreWithNamespace {
                store: verify_worker.datastore.name().to_owned(),
                ns: group.backup_ns().clone(),
            };
            task_log!(
                verify_worker.worker,
                "verify {}, group {} - unable to list backups: {}",
                store_with_ns,
                group.group(),
                err,
            );
            return Ok(errors);
        }
    };

    let snapshot_count = list.len();
    task_log!(
        verify_worker.worker,
        "verify group {}:{} ({} snapshots)",
        verify_worker.datastore.name(),
        group.group(),
        snapshot_count
    );

    progress.group_snapshots = snapshot_count as u64;

    BackupInfo::sort_list(&mut list, false); // newest first
    for (pos, info) in list.into_iter().enumerate() {
        if !verify_backup_dir(verify_worker, &info.backup_dir, upid.clone(), filter)? {
            errors.push(print_ns_and_snapshot(
                info.backup_dir.backup_ns(),
                info.backup_dir.as_ref(),
            ));
        }
        progress.done_snapshots = pos as u64 + 1;
        task_log!(verify_worker.worker, "percentage done: {}", progress);
    }

    Ok(errors)
}
/// Verify all (owned) backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
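///
/// # Example
///
/// A sketch with illustrative argument values (not compiled as a doctest):
///
/// ```ignore
/// // verify every accessible group below the root namespace, two levels deep
/// let failed_dirs = verify_all_backups(
///     &verify_worker,
///     &upid,
///     BackupNamespace::root(),
///     Some(2), // maximum namespace recursion depth
///     None,    // any owner
///     None,    // no manifest filter, verify everything
/// )?;
/// ```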
pub fn verify_all_backups(
    verify_worker: &VerifyWorker,
    upid: &UPID,
    ns: BackupNamespace,
    max_depth: Option<usize>,
    owner: Option<&Authid>,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();
    let worker = Arc::clone(&verify_worker.worker);

    task_log!(
        worker,
        "verify datastore {}",
        verify_worker.datastore.name()
    );

    let owner_filtered = if let Some(owner) = &owner {
        task_log!(worker, "limiting to backups owned by {}", owner);
        true
    } else {
        false
    };

    // FIXME: This should probably simply enable recursion (or the call have a recursion parameter)
    let store = &verify_worker.datastore;
    let max_depth = max_depth.unwrap_or(pbs_api_types::MAX_NAMESPACE_DEPTH);

    let mut list = match ListAccessibleBackupGroups::new(store, ns.clone(), max_depth, owner) {
        Ok(list) => list
            .filter_map(|group| match group {
                Ok(group) => Some(group),
                Err(err) if owner_filtered => {
                    // intentionally not in task log, the user might not see this group!
                    println!("error on iterating groups in ns '{ns}' - {err}");
                    None
                }
                Err(err) => {
                    // we don't filter by owner, but we want to log the error
                    task_log!(worker, "error on iterating groups in ns '{ns}' - {err}");
                    errors.push(err.to_string());
                    None
                }
            })
            .filter(|group| {
                !(group.backup_type() == BackupType::Host && group.backup_id() == "benchmark")
            })
            .collect::<Vec<BackupGroup>>(),
        Err(err) => {
            task_log!(worker, "unable to list backups: {}", err);
            return Ok(errors);
        }
    };

    list.sort_unstable_by(|a, b| a.group().cmp(b.group()));

    let group_count = list.len();
    task_log!(worker, "found {} groups", group_count);

    let mut progress = StoreProgress::new(group_count as u64);

    for (pos, group) in list.into_iter().enumerate() {
        progress.done_groups = pos as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let mut group_errors =
            verify_backup_group(verify_worker, &group, &mut progress, upid, filter)?;
        errors.append(&mut group_errors);
    }

    Ok(errors)
}
/// Filter for the verification of snapshots
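///
/// Typically passed to the `verify_*` functions as a closure; returns true if
/// the snapshot should be (re-)verified. A sketch with illustrative values:
///
/// ```ignore
/// // skip snapshots that were already verified within the last 30 days
/// let filter = |manifest: &BackupManifest| verify_filter(true, Some(30), manifest);
/// ```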
pub fn verify_filter(
    ignore_verified_snapshots: bool,
    outdated_after: Option<i64>,
    manifest: &BackupManifest,
) -> bool {
    if !ignore_verified_snapshots {
        return true;
    }

    let raw_verify_state = manifest.unprotected["verify_state"].clone();
    match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
        Err(_) => true, // no last verification, always include
        Ok(last_verify) => {
            match outdated_after {
                None => false, // never re-verify if ignored and no max age
                Some(max_age) => {
                    let now = proxmox_time::epoch_i64();
                    let days_since_last_verify = (now - last_verify.upid.starttime) / 86400;

                    days_since_last_verify > max_age
                }
            }
        }
    }
}