use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use anyhow::{bail, format_err, Error};
use nix::dir::Dir;

use proxmox_sys::fs::lock_dir_noblock_shared;
use proxmox_sys::{task_log, WorkerTaskContext};

use pbs_api_types::{Authid, CryptMode, SnapshotVerifyState, VerifyState, UPID};
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, StoreProgress};

use crate::tools::ParallelHandler;
20 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
21 /// already been verified or detected as corrupt.
22 pub struct VerifyWorker
{
23 worker
: Arc
<dyn WorkerTaskContext
>,
24 datastore
: Arc
<DataStore
>,
25 verified_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
26 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
30 /// Creates a new VerifyWorker for a given task worker and datastore.
31 pub fn new(worker
: Arc
<dyn WorkerTaskContext
>, datastore
: Arc
<DataStore
>) -> Self {
35 // start with 16k chunks == up to 64G data
36 verified_chunks
: Arc
::new(Mutex
::new(HashSet
::with_capacity(16 * 1024))),
37 // start with 64 chunks since we assume there are few corrupt ones
38 corrupt_chunks
: Arc
::new(Mutex
::new(HashSet
::with_capacity(64))),
44 datastore
: Arc
<DataStore
>,
45 backup_dir
: &BackupDir
,
47 ) -> Result
<(), Error
> {
48 let blob
= datastore
.load_blob(backup_dir
, &info
.filename
)?
;
50 let raw_size
= blob
.raw_size();
51 if raw_size
!= info
.size
{
52 bail
!("wrong size ({} != {})", info
.size
, raw_size
);
55 let csum
= openssl
::sha
::sha256(blob
.raw_data());
56 if csum
!= info
.csum
{
57 bail
!("wrong index checksum");
60 match blob
.crypt_mode()?
{
61 CryptMode
::Encrypt
=> Ok(()),
63 // digest already verified above
64 blob
.decode(None
, None
)?
;
67 CryptMode
::SignOnly
=> bail
!("Invalid CryptMode for blob"),
71 fn rename_corrupted_chunk(
72 datastore
: Arc
<DataStore
>,
74 worker
: &dyn WorkerTaskContext
,
76 let (path
, digest_str
) = datastore
.chunk_path(digest
);
79 let mut new_path
= path
.clone();
81 new_path
.set_file_name(format
!("{}.{}.bad", digest_str
, counter
));
82 if new_path
.exists() && counter
< 9 {
89 match std
::fs
::rename(&path
, &new_path
) {
91 task_log
!(worker
, "corrupted chunk renamed to {:?}", &new_path
);
95 std
::io
::ErrorKind
::NotFound
=> { /* ignored */ }
,
96 _
=> task_log
!(worker
, "could not rename corrupted chunk {:?} - {}", &path
, err
)
102 fn verify_index_chunks(
103 verify_worker
: &VerifyWorker
,
104 index
: Box
<dyn IndexFile
+ Send
>,
105 crypt_mode
: CryptMode
,
106 ) -> Result
<(), Error
> {
107 let errors
= Arc
::new(AtomicUsize
::new(0));
109 let start_time
= Instant
::now();
111 let mut read_bytes
= 0;
112 let mut decoded_bytes
= 0;
114 let worker2
= Arc
::clone(&verify_worker
.worker
);
115 let datastore2
= Arc
::clone(&verify_worker
.datastore
);
116 let corrupt_chunks2
= Arc
::clone(&verify_worker
.corrupt_chunks
);
117 let verified_chunks2
= Arc
::clone(&verify_worker
.verified_chunks
);
118 let errors2
= Arc
::clone(&errors
);
120 let decoder_pool
= ParallelHandler
::new(
121 "verify chunk decoder",
123 move |(chunk
, digest
, size
): (DataBlob
, [u8; 32], u64)| {
124 let chunk_crypt_mode
= match chunk
.crypt_mode() {
126 corrupt_chunks2
.lock().unwrap().insert(digest
);
127 task_log
!(worker2
, "can't verify chunk, unknown CryptMode - {}", err
);
128 errors2
.fetch_add(1, Ordering
::SeqCst
);
134 if chunk_crypt_mode
!= crypt_mode
{
137 "chunk CryptMode {:?} does not match index CryptMode {:?}",
141 errors2
.fetch_add(1, Ordering
::SeqCst
);
144 if let Err(err
) = chunk
.verify_unencrypted(size
as usize, &digest
) {
145 corrupt_chunks2
.lock().unwrap().insert(digest
);
146 task_log
!(worker2
, "{}", err
);
147 errors2
.fetch_add(1, Ordering
::SeqCst
);
148 rename_corrupted_chunk(datastore2
.clone(), &digest
, &worker2
);
150 verified_chunks2
.lock().unwrap().insert(digest
);
157 let skip_chunk
= |digest
: &[u8; 32]| -> bool
{
158 if verify_worker
.verified_chunks
.lock().unwrap().contains(digest
) {
160 } else if verify_worker
.corrupt_chunks
.lock().unwrap().contains(digest
) {
161 let digest_str
= hex
::encode(digest
);
162 task_log
!(verify_worker
.worker
, "chunk {} was marked as corrupt", digest_str
);
163 errors
.fetch_add(1, Ordering
::SeqCst
);
170 let check_abort
= |pos
: usize| -> Result
<(), Error
> {
172 verify_worker
.worker
.check_abort()?
;
173 verify_worker
.worker
.fail_on_shutdown()?
;
181 .get_chunks_in_order(&index
, skip_chunk
, check_abort
)?
;
183 for (pos
, _
) in chunk_list
{
184 verify_worker
.worker
.check_abort()?
;
185 verify_worker
.worker
.fail_on_shutdown()?
;
187 let info
= index
.chunk_info(pos
).unwrap();
189 // we must always recheck this here, the parallel worker below alter it!
190 if skip_chunk(&info
.digest
) {
191 continue; // already verified or marked corrupt
194 match verify_worker
.datastore
.load_chunk(&info
.digest
) {
196 verify_worker
.corrupt_chunks
.lock().unwrap().insert(info
.digest
);
197 task_log
!(verify_worker
.worker
, "can't verify chunk, load failed - {}", err
);
198 errors
.fetch_add(1, Ordering
::SeqCst
);
199 rename_corrupted_chunk(
200 verify_worker
.datastore
.clone(),
202 &verify_worker
.worker
,
206 let size
= info
.size();
207 read_bytes
+= chunk
.raw_size();
208 decoder_pool
.send((chunk
, info
.digest
, size
))?
;
209 decoded_bytes
+= size
;
214 decoder_pool
.complete()?
;
216 let elapsed
= start_time
.elapsed().as_secs_f64();
218 let read_bytes_mib
= (read_bytes
as f64) / (1024.0 * 1024.0);
219 let decoded_bytes_mib
= (decoded_bytes
as f64) / (1024.0 * 1024.0);
221 let read_speed
= read_bytes_mib
/ elapsed
;
222 let decode_speed
= decoded_bytes_mib
/ elapsed
;
224 let error_count
= errors
.load(Ordering
::SeqCst
);
227 verify_worker
.worker
,
228 " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
237 if errors
.load(Ordering
::SeqCst
) > 0 {
238 bail
!("chunks could not be verified");
244 fn verify_fixed_index(
245 verify_worker
: &VerifyWorker
,
246 backup_dir
: &BackupDir
,
248 ) -> Result
<(), Error
> {
249 let mut path
= backup_dir
.relative_path();
250 path
.push(&info
.filename
);
252 let index
= verify_worker
.datastore
.open_fixed_reader(&path
)?
;
254 let (csum
, size
) = index
.compute_csum();
255 if size
!= info
.size
{
256 bail
!("wrong size ({} != {})", info
.size
, size
);
259 if csum
!= info
.csum
{
260 bail
!("wrong index checksum");
263 verify_index_chunks(verify_worker
, Box
::new(index
), info
.chunk_crypt_mode())
266 fn verify_dynamic_index(
267 verify_worker
: &VerifyWorker
,
268 backup_dir
: &BackupDir
,
270 ) -> Result
<(), Error
> {
271 let mut path
= backup_dir
.relative_path();
272 path
.push(&info
.filename
);
274 let index
= verify_worker
.datastore
.open_dynamic_reader(&path
)?
;
276 let (csum
, size
) = index
.compute_csum();
277 if size
!= info
.size
{
278 bail
!("wrong size ({} != {})", info
.size
, size
);
281 if csum
!= info
.csum
{
282 bail
!("wrong index checksum");
285 verify_index_chunks(verify_worker
, Box
::new(index
), info
.chunk_crypt_mode())
288 /// Verify a single backup snapshot
290 /// This checks all archives inside a backup snapshot.
291 /// Errors are logged to the worker log.
294 /// - Ok(true) if verify is successful
295 /// - Ok(false) if there were verification errors
296 /// - Err(_) if task was aborted
297 pub fn verify_backup_dir(
298 verify_worker
: &VerifyWorker
,
299 backup_dir
: &BackupDir
,
301 filter
: Option
<&dyn Fn(&BackupManifest
) -> bool
>,
302 ) -> Result
<bool
, Error
> {
303 let snap_lock
= lock_dir_noblock_shared(
304 &verify_worker
.datastore
.snapshot_path(backup_dir
),
306 "locked by another operation",
310 verify_backup_dir_with_lock(verify_worker
, backup_dir
, upid
, filter
, snap_lock
)
314 verify_worker
.worker
,
315 "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
316 verify_worker
.datastore
.name(),
325 /// See verify_backup_dir
326 pub fn verify_backup_dir_with_lock(
327 verify_worker
: &VerifyWorker
,
328 backup_dir
: &BackupDir
,
330 filter
: Option
<&dyn Fn(&BackupManifest
) -> bool
>,
332 ) -> Result
<bool
, Error
> {
333 let manifest
= match verify_worker
.datastore
.load_manifest(backup_dir
) {
334 Ok((manifest
, _
)) => manifest
,
337 verify_worker
.worker
,
338 "verify {}:{} - manifest load error: {}",
339 verify_worker
.datastore
.name(),
347 if let Some(filter
) = filter
{
348 if !filter(&manifest
) {
350 verify_worker
.worker
,
351 "SKIPPED: verify {}:{} (recently verified)",
352 verify_worker
.datastore
.name(),
359 task_log
!(verify_worker
.worker
, "verify {}:{}", verify_worker
.datastore
.name(), backup_dir
);
361 let mut error_count
= 0;
363 let mut verify_result
= VerifyState
::Ok
;
364 for info
in manifest
.files() {
365 let result
= proxmox_lang
::try_block
!({
366 task_log
!(verify_worker
.worker
, " check {}", info
.filename
);
367 match archive_type(&info
.filename
)?
{
368 ArchiveType
::FixedIndex
=> verify_fixed_index(verify_worker
, backup_dir
, info
),
369 ArchiveType
::DynamicIndex
=> verify_dynamic_index(verify_worker
, backup_dir
, info
),
370 ArchiveType
::Blob
=> {
371 verify_blob(verify_worker
.datastore
.clone(), backup_dir
, info
)
376 verify_worker
.worker
.check_abort()?
;
377 verify_worker
.worker
.fail_on_shutdown()?
;
379 if let Err(err
) = result
{
381 verify_worker
.worker
,
382 "verify {}:{}/{} failed: {}",
383 verify_worker
.datastore
.name(),
389 verify_result
= VerifyState
::Failed
;
393 let verify_state
= SnapshotVerifyState
{
394 state
: verify_result
,
397 let verify_state
= serde_json
::to_value(verify_state
)?
;
400 .update_manifest(backup_dir
, |manifest
| {
401 manifest
.unprotected
["verify_state"] = verify_state
;
403 .map_err(|err
| format_err
!("unable to update manifest blob - {}", err
))?
;
408 /// Verify all backups inside a backup group
410 /// Errors are logged to the worker log.
413 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
414 /// - Err(_) if task was aborted
415 pub fn verify_backup_group(
416 verify_worker
: &VerifyWorker
,
418 progress
: &mut StoreProgress
,
420 filter
: Option
<&dyn Fn(&BackupManifest
) -> bool
>,
421 ) -> Result
<Vec
<String
>, Error
> {
422 let mut errors
= Vec
::new();
423 let mut list
= match group
.list_backups(&verify_worker
.datastore
.base_path()) {
427 verify_worker
.worker
,
428 "verify group {}:{} - unable to list backups: {}",
429 verify_worker
.datastore
.name(),
437 let snapshot_count
= list
.len();
439 verify_worker
.worker
,
440 "verify group {}:{} ({} snapshots)",
441 verify_worker
.datastore
.name(),
446 progress
.group_snapshots
= snapshot_count
as u64;
448 BackupInfo
::sort_list(&mut list
, false); // newest first
449 for (pos
, info
) in list
.into_iter().enumerate() {
450 if !verify_backup_dir(verify_worker
, &info
.backup_dir
, upid
.clone(), filter
)?
{
451 errors
.push(info
.backup_dir
.to_string());
453 progress
.done_snapshots
= pos
as u64 + 1;
454 task_log
!(verify_worker
.worker
, "percentage done: {}", progress
);
460 /// Verify all (owned) backups inside a datastore
462 /// Errors are logged to the worker log.
465 /// - Ok(failed_dirs) where failed_dirs had verification errors
466 /// - Err(_) if task was aborted
467 pub fn verify_all_backups(
468 verify_worker
: &VerifyWorker
,
470 owner
: Option
<Authid
>,
471 filter
: Option
<&dyn Fn(&BackupManifest
) -> bool
>,
472 ) -> Result
<Vec
<String
>, Error
> {
473 let mut errors
= Vec
::new();
474 let worker
= Arc
::clone(&verify_worker
.worker
);
476 task_log
!(worker
, "verify datastore {}", verify_worker
.datastore
.name());
478 if let Some(owner
) = &owner
{
479 task_log
!(worker
, "limiting to backups owned by {}", owner
);
482 let filter_by_owner
= |group
: &BackupGroup
| {
483 match (verify_worker
.datastore
.get_owner(group
), &owner
) {
484 (Ok(ref group_owner
), Some(owner
)) => {
486 || (group_owner
.is_token()
488 && group_owner
.user() == owner
.user())
490 (Ok(_
), None
) => true,
491 (Err(err
), Some(_
)) => {
492 // intentionally not in task log
493 // the task user might not be allowed to see this group!
494 println
!("Failed to get owner of group '{}' - {}", group
, err
);
497 (Err(err
), None
) => {
498 // we don't filter by owner, but we want to log the error
501 "Failed to get owner of group '{} - {}",
505 errors
.push(group
.to_string());
511 let mut list
= match BackupInfo
::list_backup_groups(&verify_worker
.datastore
.base_path()) {
514 .filter(|group
| !(group
.backup_type() == "host" && group
.backup_id() == "benchmark"))
515 .filter(filter_by_owner
)
516 .collect
::<Vec
<BackupGroup
>>(),
518 task_log
!(worker
, "unable to list backups: {}", err
,);
523 list
.sort_unstable();
525 let group_count
= list
.len();
526 task_log
!(worker
, "found {} groups", group_count
);
528 let mut progress
= StoreProgress
::new(group_count
as u64);
530 for (pos
, group
) in list
.into_iter().enumerate() {
531 progress
.done_groups
= pos
as u64;
532 progress
.done_snapshots
= 0;
533 progress
.group_snapshots
= 0;
535 let mut group_errors
=
536 verify_backup_group(verify_worker
, &group
, &mut progress
, upid
, filter
)?
;
537 errors
.append(&mut group_errors
);
543 /// Filter for the verification of snapshots
544 pub fn verify_filter(
545 ignore_verified_snapshots
: bool
,
546 outdated_after
: Option
<i64>,
547 manifest
: &BackupManifest
,
549 if !ignore_verified_snapshots
{
553 let raw_verify_state
= manifest
.unprotected
["verify_state"].clone();
554 match serde_json
::from_value
::<SnapshotVerifyState
>(raw_verify_state
) {
555 Err(_
) => true, // no last verification, always include
557 match outdated_after
{
558 None
=> false, // never re-verify if ignored and no max age
560 let now
= proxmox_time
::epoch_i64();
561 let days_since_last_verify
= (now
- last_verify
.upid
.starttime
) / 86400;
563 days_since_last_verify
> max_age