2 use std
::collections
::HashSet
;
3 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
4 use std
::sync
::{Arc, Mutex}
;
5 use std
::time
::Instant
;
7 use anyhow
::{bail, format_err, Error}
;
9 use pbs_api_types
::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState}
;
10 use pbs_datastore
::{DataBlob, StoreProgress}
;
11 use pbs_datastore
::backup_info
::{BackupGroup, BackupDir, BackupInfo}
;
12 use pbs_datastore
::index
::IndexFile
;
13 use pbs_datastore
::manifest
::{archive_type, ArchiveType, BackupManifest, FileInfo}
;
14 use pbs_tools
::fs
::lock_dir_noblock_shared
;
15 use pbs_tools
::{task_log, task::WorkerTaskContext}
;
19 tools
::ParallelHandler
,
22 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
23 /// already been verified or detected as corrupt.
24 pub struct VerifyWorker
{
25 worker
: Arc
<dyn WorkerTaskContext
+ Send
+ Sync
>,
26 datastore
: Arc
<DataStore
>,
27 verified_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
28 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
32 /// Creates a new VerifyWorker for a given task worker and datastore.
33 pub fn new(worker
: Arc
<dyn WorkerTaskContext
+ Send
+ Sync
>, datastore
: Arc
<DataStore
>) -> Self {
37 // start with 16k chunks == up to 64G data
38 verified_chunks
: Arc
::new(Mutex
::new(HashSet
::with_capacity(16 * 1024))),
39 // start with 64 chunks since we assume there are few corrupt ones
40 corrupt_chunks
: Arc
::new(Mutex
::new(HashSet
::with_capacity(64))),
46 datastore
: Arc
<DataStore
>,
47 backup_dir
: &BackupDir
,
49 ) -> Result
<(), Error
> {
50 let blob
= datastore
.load_blob(backup_dir
, &info
.filename
)?
;
52 let raw_size
= blob
.raw_size();
53 if raw_size
!= info
.size
{
54 bail
!("wrong size ({} != {})", info
.size
, raw_size
);
57 let csum
= openssl
::sha
::sha256(blob
.raw_data());
58 if csum
!= info
.csum
{
59 bail
!("wrong index checksum");
62 match blob
.crypt_mode()?
{
63 CryptMode
::Encrypt
=> Ok(()),
65 // digest already verified above
66 blob
.decode(None
, None
)?
;
69 CryptMode
::SignOnly
=> bail
!("Invalid CryptMode for blob"),
73 fn rename_corrupted_chunk(
74 datastore
: Arc
<DataStore
>,
76 worker
: &dyn WorkerTaskContext
,
78 let (path
, digest_str
) = datastore
.chunk_path(digest
);
81 let mut new_path
= path
.clone();
83 new_path
.set_file_name(format
!("{}.{}.bad", digest_str
, counter
));
84 if new_path
.exists() && counter
< 9 {
91 match std
::fs
::rename(&path
, &new_path
) {
93 task_log
!(worker
, "corrupted chunk renamed to {:?}", &new_path
);
97 std
::io
::ErrorKind
::NotFound
=> { /* ignored */ }
,
98 _
=> task_log
!(worker
, "could not rename corrupted chunk {:?} - {}", &path
, err
)
104 fn verify_index_chunks(
105 verify_worker
: &VerifyWorker
,
106 index
: Box
<dyn IndexFile
+ Send
>,
107 crypt_mode
: CryptMode
,
108 ) -> Result
<(), Error
> {
109 let errors
= Arc
::new(AtomicUsize
::new(0));
111 let start_time
= Instant
::now();
113 let mut read_bytes
= 0;
114 let mut decoded_bytes
= 0;
116 let worker2
= Arc
::clone(&verify_worker
.worker
);
117 let datastore2
= Arc
::clone(&verify_worker
.datastore
);
118 let corrupt_chunks2
= Arc
::clone(&verify_worker
.corrupt_chunks
);
119 let verified_chunks2
= Arc
::clone(&verify_worker
.verified_chunks
);
120 let errors2
= Arc
::clone(&errors
);
122 let decoder_pool
= ParallelHandler
::new(
123 "verify chunk decoder",
125 move |(chunk
, digest
, size
): (DataBlob
, [u8; 32], u64)| {
126 let chunk_crypt_mode
= match chunk
.crypt_mode() {
128 corrupt_chunks2
.lock().unwrap().insert(digest
);
129 task_log
!(worker2
, "can't verify chunk, unknown CryptMode - {}", err
);
130 errors2
.fetch_add(1, Ordering
::SeqCst
);
136 if chunk_crypt_mode
!= crypt_mode
{
139 "chunk CryptMode {:?} does not match index CryptMode {:?}",
143 errors2
.fetch_add(1, Ordering
::SeqCst
);
146 if let Err(err
) = chunk
.verify_unencrypted(size
as usize, &digest
) {
147 corrupt_chunks2
.lock().unwrap().insert(digest
);
148 task_log
!(worker2
, "{}", err
);
149 errors2
.fetch_add(1, Ordering
::SeqCst
);
150 rename_corrupted_chunk(datastore2
.clone(), &digest
, &worker2
);
152 verified_chunks2
.lock().unwrap().insert(digest
);
159 let skip_chunk
= |digest
: &[u8; 32]| -> bool
{
160 if verify_worker
.verified_chunks
.lock().unwrap().contains(digest
) {
162 } else if verify_worker
.corrupt_chunks
.lock().unwrap().contains(digest
) {
163 let digest_str
= proxmox
::tools
::digest_to_hex(digest
);
164 task_log
!(verify_worker
.worker
, "chunk {} was marked as corrupt", digest_str
);
165 errors
.fetch_add(1, Ordering
::SeqCst
);
172 let check_abort
= |pos
: usize| -> Result
<(), Error
> {
174 verify_worker
.worker
.check_abort()?
;
175 proxmox_rest_server
::fail_on_shutdown()?
;
183 .get_chunks_in_order(&index
, skip_chunk
, check_abort
)?
;
185 for (pos
, _
) in chunk_list
{
186 verify_worker
.worker
.check_abort()?
;
187 proxmox_rest_server
::fail_on_shutdown()?
;
189 let info
= index
.chunk_info(pos
).unwrap();
191 // we must always recheck this here, the parallel worker below alter it!
192 if skip_chunk(&info
.digest
) {
193 continue; // already verified or marked corrupt
196 match verify_worker
.datastore
.load_chunk(&info
.digest
) {
198 verify_worker
.corrupt_chunks
.lock().unwrap().insert(info
.digest
);
199 task_log
!(verify_worker
.worker
, "can't verify chunk, load failed - {}", err
);
200 errors
.fetch_add(1, Ordering
::SeqCst
);
201 rename_corrupted_chunk(
202 verify_worker
.datastore
.clone(),
204 &verify_worker
.worker
,
208 let size
= info
.size();
209 read_bytes
+= chunk
.raw_size();
210 decoder_pool
.send((chunk
, info
.digest
, size
))?
;
211 decoded_bytes
+= size
;
216 decoder_pool
.complete()?
;
218 let elapsed
= start_time
.elapsed().as_secs_f64();
220 let read_bytes_mib
= (read_bytes
as f64) / (1024.0 * 1024.0);
221 let decoded_bytes_mib
= (decoded_bytes
as f64) / (1024.0 * 1024.0);
223 let read_speed
= read_bytes_mib
/ elapsed
;
224 let decode_speed
= decoded_bytes_mib
/ elapsed
;
226 let error_count
= errors
.load(Ordering
::SeqCst
);
229 verify_worker
.worker
,
230 " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
239 if errors
.load(Ordering
::SeqCst
) > 0 {
240 bail
!("chunks could not be verified");
246 fn verify_fixed_index(
247 verify_worker
: &VerifyWorker
,
248 backup_dir
: &BackupDir
,
250 ) -> Result
<(), Error
> {
251 let mut path
= backup_dir
.relative_path();
252 path
.push(&info
.filename
);
254 let index
= verify_worker
.datastore
.open_fixed_reader(&path
)?
;
256 let (csum
, size
) = index
.compute_csum();
257 if size
!= info
.size
{
258 bail
!("wrong size ({} != {})", info
.size
, size
);
261 if csum
!= info
.csum
{
262 bail
!("wrong index checksum");
265 verify_index_chunks(verify_worker
, Box
::new(index
), info
.chunk_crypt_mode())
268 fn verify_dynamic_index(
269 verify_worker
: &VerifyWorker
,
270 backup_dir
: &BackupDir
,
272 ) -> Result
<(), Error
> {
273 let mut path
= backup_dir
.relative_path();
274 path
.push(&info
.filename
);
276 let index
= verify_worker
.datastore
.open_dynamic_reader(&path
)?
;
278 let (csum
, size
) = index
.compute_csum();
279 if size
!= info
.size
{
280 bail
!("wrong size ({} != {})", info
.size
, size
);
283 if csum
!= info
.csum
{
284 bail
!("wrong index checksum");
287 verify_index_chunks(verify_worker
, Box
::new(index
), info
.chunk_crypt_mode())
290 /// Verify a single backup snapshot
292 /// This checks all archives inside a backup snapshot.
293 /// Errors are logged to the worker log.
296 /// - Ok(true) if verify is successful
297 /// - Ok(false) if there were verification errors
298 /// - Err(_) if task was aborted
299 pub fn verify_backup_dir(
300 verify_worker
: &VerifyWorker
,
301 backup_dir
: &BackupDir
,
303 filter
: Option
<&dyn Fn(&BackupManifest
) -> bool
>,
304 ) -> Result
<bool
, Error
> {
305 let snap_lock
= lock_dir_noblock_shared(
306 &verify_worker
.datastore
.snapshot_path(&backup_dir
),
308 "locked by another operation",
312 verify_backup_dir_with_lock(verify_worker
, backup_dir
, upid
, filter
, snap_lock
)
316 verify_worker
.worker
,
317 "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
318 verify_worker
.datastore
.name(),
327 /// See verify_backup_dir
328 pub fn verify_backup_dir_with_lock(
329 verify_worker
: &VerifyWorker
,
330 backup_dir
: &BackupDir
,
332 filter
: Option
<&dyn Fn(&BackupManifest
) -> bool
>,
334 ) -> Result
<bool
, Error
> {
335 let manifest
= match verify_worker
.datastore
.load_manifest(&backup_dir
) {
336 Ok((manifest
, _
)) => manifest
,
339 verify_worker
.worker
,
340 "verify {}:{} - manifest load error: {}",
341 verify_worker
.datastore
.name(),
349 if let Some(filter
) = filter
{
350 if !filter(&manifest
) {
352 verify_worker
.worker
,
353 "SKIPPED: verify {}:{} (recently verified)",
354 verify_worker
.datastore
.name(),
361 task_log
!(verify_worker
.worker
, "verify {}:{}", verify_worker
.datastore
.name(), backup_dir
);
363 let mut error_count
= 0;
365 let mut verify_result
= VerifyState
::Ok
;
366 for info
in manifest
.files() {
367 let result
= proxmox
::try_block
!({
368 task_log
!(verify_worker
.worker
, " check {}", info
.filename
);
369 match archive_type(&info
.filename
)?
{
370 ArchiveType
::FixedIndex
=> verify_fixed_index(verify_worker
, &backup_dir
, info
),
371 ArchiveType
::DynamicIndex
=> verify_dynamic_index(verify_worker
, &backup_dir
, info
),
372 ArchiveType
::Blob
=> {
373 verify_blob(verify_worker
.datastore
.clone(), &backup_dir
, info
)
378 verify_worker
.worker
.check_abort()?
;
379 proxmox_rest_server
::fail_on_shutdown()?
;
381 if let Err(err
) = result
{
383 verify_worker
.worker
,
384 "verify {}:{}/{} failed: {}",
385 verify_worker
.datastore
.name(),
391 verify_result
= VerifyState
::Failed
;
395 let verify_state
= SnapshotVerifyState
{
396 state
: verify_result
,
399 let verify_state
= serde_json
::to_value(verify_state
)?
;
402 .update_manifest(&backup_dir
, |manifest
| {
403 manifest
.unprotected
["verify_state"] = verify_state
;
405 .map_err(|err
| format_err
!("unable to update manifest blob - {}", err
))?
;
410 /// Verify all backups inside a backup group
412 /// Errors are logged to the worker log.
415 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
416 /// - Err(_) if task was aborted
417 pub fn verify_backup_group(
418 verify_worker
: &VerifyWorker
,
420 progress
: &mut StoreProgress
,
422 filter
: Option
<&dyn Fn(&BackupManifest
) -> bool
>,
423 ) -> Result
<Vec
<String
>, Error
> {
424 let mut errors
= Vec
::new();
425 let mut list
= match group
.list_backups(&verify_worker
.datastore
.base_path()) {
429 verify_worker
.worker
,
430 "verify group {}:{} - unable to list backups: {}",
431 verify_worker
.datastore
.name(),
439 let snapshot_count
= list
.len();
441 verify_worker
.worker
,
442 "verify group {}:{} ({} snapshots)",
443 verify_worker
.datastore
.name(),
448 progress
.group_snapshots
= snapshot_count
as u64;
450 BackupInfo
::sort_list(&mut list
, false); // newest first
451 for (pos
, info
) in list
.into_iter().enumerate() {
452 if !verify_backup_dir(verify_worker
, &info
.backup_dir
, upid
.clone(), filter
)?
{
453 errors
.push(info
.backup_dir
.to_string());
455 progress
.done_snapshots
= pos
as u64 + 1;
456 task_log
!(verify_worker
.worker
, "percentage done: {}", progress
);
462 /// Verify all (owned) backups inside a datastore
464 /// Errors are logged to the worker log.
467 /// - Ok(failed_dirs) where failed_dirs had verification errors
468 /// - Err(_) if task was aborted
469 pub fn verify_all_backups(
470 verify_worker
: &VerifyWorker
,
472 owner
: Option
<Authid
>,
473 filter
: Option
<&dyn Fn(&BackupManifest
) -> bool
>,
474 ) -> Result
<Vec
<String
>, Error
> {
475 let mut errors
= Vec
::new();
476 let worker
= Arc
::clone(&verify_worker
.worker
);
478 task_log
!(worker
, "verify datastore {}", verify_worker
.datastore
.name());
480 if let Some(owner
) = &owner
{
481 task_log
!(worker
, "limiting to backups owned by {}", owner
);
484 let filter_by_owner
= |group
: &BackupGroup
| {
485 match (verify_worker
.datastore
.get_owner(group
), &owner
) {
486 (Ok(ref group_owner
), Some(owner
)) => {
488 || (group_owner
.is_token()
490 && group_owner
.user() == owner
.user())
492 (Ok(_
), None
) => true,
493 (Err(err
), Some(_
)) => {
494 // intentionally not in task log
495 // the task user might not be allowed to see this group!
496 println
!("Failed to get owner of group '{}' - {}", group
, err
);
499 (Err(err
), None
) => {
500 // we don't filter by owner, but we want to log the error
503 "Failed to get owner of group '{} - {}",
507 errors
.push(group
.to_string());
513 let mut list
= match BackupInfo
::list_backup_groups(&verify_worker
.datastore
.base_path()) {
516 .filter(|group
| !(group
.backup_type() == "host" && group
.backup_id() == "benchmark"))
517 .filter(filter_by_owner
)
518 .collect
::<Vec
<BackupGroup
>>(),
520 task_log
!(worker
, "unable to list backups: {}", err
,);
525 list
.sort_unstable();
527 let group_count
= list
.len();
528 task_log
!(worker
, "found {} groups", group_count
);
530 let mut progress
= StoreProgress
::new(group_count
as u64);
532 for (pos
, group
) in list
.into_iter().enumerate() {
533 progress
.done_groups
= pos
as u64;
534 progress
.done_snapshots
= 0;
535 progress
.group_snapshots
= 0;
537 let mut group_errors
=
538 verify_backup_group(verify_worker
, &group
, &mut progress
, upid
, filter
)?
;
539 errors
.append(&mut group_errors
);
545 /// Filter for the verification of snapshots
546 pub fn verify_filter(
547 ignore_verified_snapshots
: bool
,
548 outdated_after
: Option
<i64>,
549 manifest
: &BackupManifest
,
551 if !ignore_verified_snapshots
{
555 let raw_verify_state
= manifest
.unprotected
["verify_state"].clone();
556 match serde_json
::from_value
::<SnapshotVerifyState
>(raw_verify_state
) {
557 Err(_
) => true, // no last verification, always include
559 match outdated_after
{
560 None
=> false, // never re-verify if ignored and no max age
562 let now
= proxmox
::tools
::time
::epoch_i64();
563 let days_since_last_verify
= (now
- last_verify
.upid
.starttime
) / 86400;
565 days_since_last_verify
> max_age