use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{Ordering, AtomicUsize};
use std::time::Instant;
use nix::dir::Dir;

use anyhow::{bail, format_err, Error};
use crate::{
    // other crate-local imports (DataStore, BackupDir, CryptMode,
    // TaskState, task_log!, ...) elided in this excerpt
    tools::ParallelHandler,
    tools::fs::lock_dir_noblock_shared,
};

fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {

    let blob = datastore.load_blob(backup_dir, &info.filename)?;

    let raw_size = blob.raw_size();
    if raw_size != info.size {
        bail!("wrong size ({} != {})", info.size, raw_size);
    }

    let csum = openssl::sha::sha256(blob.raw_data());
    if csum != info.csum {
        bail!("wrong index checksum");
    }

    match blob.crypt_mode()? {
        CryptMode::Encrypt => Ok(()),
        CryptMode::None => {
            // digest already verified above
            blob.decode(None, None)?;
            Ok(())
        },
        CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
    }
}

fn rename_corrupted_chunk(
    datastore: Arc<DataStore>,
    digest: &[u8;32],
    worker: &dyn TaskState,
) {
    let (path, digest_str) = datastore.chunk_path(digest);

    // pick a free name: <digest>.0.bad .. <digest>.9.bad (at most 10 copies kept)
    let mut counter = 0;
    let mut new_path = path.clone();
    loop {
        new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
        if new_path.exists() && counter < 9 { counter += 1; } else { break; }
    }

    match std::fs::rename(&path, &new_path) {
        Ok(_) => {
            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
        },
        Err(err) => {
            match err.kind() {
                std::io::ErrorKind::NotFound => { /* ignored */ },
                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err),
            }
        }
    };
}
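
// verify_index_chunks() is the shared work horse for fixed and dynamic
// indexes: the calling thread streams chunks from disk while a small
// ParallelHandler pool verifies digests and crypt modes. The shared
// verified/corrupt sets let a whole-datastore run check each
// deduplicated chunk only once, no matter how many indexes reference it.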

fn verify_index_chunks(
    datastore: Arc<DataStore>,
    index: Box<dyn IndexFile + Send>,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    crypt_mode: CryptMode,
    worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {

    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let mut read_bytes = 0;
    let mut decoded_bytes = 0;

    // clones moved into the decoder threads; the originals stay usable
    // in the read loop below
    let worker2 = Arc::clone(&worker);
    let datastore2 = Arc::clone(&datastore);
    let corrupt_chunks2 = Arc::clone(&corrupt_chunks);
    let verified_chunks2 = Arc::clone(&verified_chunks);
    let errors2 = Arc::clone(&errors);

    let decoder_pool = ParallelHandler::new(
        "verify chunk decoder", 4,
        move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
            let chunk_crypt_mode = match chunk.crypt_mode() {
                Err(err) => {
                    corrupt_chunks2.lock().unwrap().insert(digest);
                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                    errors2.fetch_add(1, Ordering::SeqCst);
                    return Ok(());
                },
                Ok(mode) => mode,
            };

            if chunk_crypt_mode != crypt_mode {
                task_log!(
                    worker2,
                    "chunk CryptMode {:?} does not match index CryptMode {:?}",
                    chunk_crypt_mode,
                    crypt_mode
                );
                errors2.fetch_add(1, Ordering::SeqCst);
            }

            if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                corrupt_chunks2.lock().unwrap().insert(digest);
                task_log!(worker2, "{}", err);
                errors2.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
            } else {
                verified_chunks2.lock().unwrap().insert(digest);
            }

            Ok(())
        }
    );
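
    // Read loop: runs on this thread and feeds the decoder pool above,
    // so disk reads overlap with the CPU-bound digest checks.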
    for pos in 0..index.index_count() {

        worker.check_abort()?;
        crate::tools::fail_on_shutdown()?;

        let info = index.chunk_info(pos).unwrap();
        let size = info.size();

        if verified_chunks.lock().unwrap().contains(&info.digest) {
            continue; // already verified
        }

        if corrupt_chunks.lock().unwrap().contains(&info.digest) {
            let digest_str = proxmox::tools::digest_to_hex(&info.digest);
            task_log!(worker, "chunk {} was marked as corrupt", digest_str);
            errors.fetch_add(1, Ordering::SeqCst);
            continue;
        }

        match datastore.load_chunk(&info.digest) {
            Err(err) => {
                corrupt_chunks.lock().unwrap().insert(info.digest);
                task_log!(worker, "can't verify chunk, load failed - {}", err);
                errors.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
                continue;
            }
            Ok(chunk) => {
                read_bytes += chunk.raw_size();
                decoder_pool.send((chunk, info.digest, size))?;
                decoded_bytes += size;
            }
        }
    }

    decoder_pool.complete()?;

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);

    let read_speed = read_bytes_mib/elapsed;
    let decode_speed = decoded_bytes_mib/elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    task_log!(
        worker,
        " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib,
        decoded_bytes_mib,
        elapsed,
        read_speed,
        decode_speed,
        error_count,
    );

    if errors.load(Ordering::SeqCst) > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}

fn verify_fixed_index(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {

    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = datastore.open_fixed_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(
        datastore,
        Box::new(index),
        verified_chunks,
        corrupt_chunks,
        info.chunk_crypt_mode(),
        worker,
    )
}

fn verify_dynamic_index(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {

    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = datastore.open_dynamic_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(
        datastore,
        Box::new(index),
        verified_chunks,
        corrupt_chunks,
        info.chunk_crypt_mode(),
        worker,
    )
}
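
// Snapshot verification takes a shared, non-blocking lock on the
// snapshot directory: concurrent readers are fine, but if another
// operation holds the directory exclusively, the snapshot is skipped
// (and logged) instead of blocking the whole verification task.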

/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
    let snap_lock = lock_dir_noblock_shared(
        &datastore.snapshot_path(&backup_dir),
        "snapshot",
        "locked by another operation",
    );
    match snap_lock {
        Ok(snap_lock) => verify_backup_dir_with_lock(
            datastore,
            backup_dir,
            verified_chunks,
            corrupt_chunks,
            worker,
            upid,
            filter,
            snap_lock,
        ),
        Err(err) => {
            task_log!(
                worker,
                "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
                datastore.name(),
                backup_dir,
                err,
            );
            Ok(true)
        }
    }
}

/// See verify_backup_dir
pub fn verify_backup_dir_with_lock(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
    _snap_lock: Dir,
) -> Result<bool, Error> {
    let manifest = match datastore.load_manifest(&backup_dir) {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            task_log!(
                worker,
                "verify {}:{} - manifest load error: {}",
                datastore.name(),
                backup_dir,
                err,
            );
            return Ok(false);
        }
    };

    if let Some(filter) = filter {
        if !filter(&manifest) {
            task_log!(
                worker,
                "SKIPPED: verify {}:{} (recently verified)",
                datastore.name(),
                backup_dir,
            );
            return Ok(true);
        }
    }

    task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);

    let mut error_count = 0;

    let mut verify_result = VerifyState::Ok;
    for info in manifest.files() {
        let result = proxmox::try_block!({
            task_log!(worker, " check {}", info.filename);
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex =>
                    verify_fixed_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::DynamicIndex =>
                    verify_dynamic_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
            }
        });

        worker.check_abort()?;
        crate::tools::fail_on_shutdown()?;

        if let Err(err) = result {
            task_log!(
                worker,
                "verify {}:{}/{} failed: {}",
                datastore.name(),
                backup_dir,
                info.filename,
                err,
            );
            error_count += 1;
            verify_result = VerifyState::Failed;
        }
    }

    // record the result in the snapshot's manifest
    let verify_state = SnapshotVerifyState {
        state: verify_result,
        upid,
    };
    let verify_state = serde_json::to_value(verify_state)?;
    datastore.update_manifest(&backup_dir, |manifest| {
        manifest.unprotected["verify_state"] = verify_state;
    }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

    Ok(error_count == 0)
}
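
// The verify result is persisted in the manifest even when verification
// fails, so later runs (and the GUI) can tell "failed" apart from "never
// verified". The manifest then carries roughly
//   "unprotected": { "verify_state": { "state": ..., "upid": ... } }
// with the exact encoding defined by SnapshotVerifyState's serde derive.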

/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group(
    datastore: Arc<DataStore>,
    group: &BackupGroup,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    progress: Option<(usize, usize)>, // (done, snapshot_count)
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: &UPID,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<(usize, Vec<String>), Error> {

    let mut errors = Vec::new();
    let mut list = match group.list_backups(&datastore.base_path()) {
        Ok(list) => list,
        Err(err) => {
            task_log!(
                worker,
                "verify group {}:{} - unable to list backups: {}",
                datastore.name(),
                group,
                err,
            );
            return Ok((0, errors));
        }
    };

    task_log!(worker, "verify group {}:{}", datastore.name(), group);

    let (done, snapshot_count) = progress.unwrap_or((0, list.len()));

    let mut count = 0;
    BackupInfo::sort_list(&mut list, false); // newest first
    for info in list {
        count += 1;

        if !verify_backup_dir(
            datastore.clone(),
            &info.backup_dir,
            verified_chunks.clone(),
            corrupt_chunks.clone(),
            worker.clone(),
            upid.clone(),
            filter,
        )? {
            errors.push(info.backup_dir.to_string());
        }

        if snapshot_count != 0 {
            let pos = done + count;
            let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
            task_log!(
                worker,
                "percentage done: {:.2}% ({} of {} snapshots)",
                percentage,
                pos,
                snapshot_count,
            );
        }
    }

    Ok((count, errors))
}

/// Verify all (owned) backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
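///
/// # Example (sketch)
///
/// Hypothetical call site inside a verification worker task; `worker`
/// and `upid` come from the surrounding task and are assumptions here,
/// not code from this module:
///
/// ```ignore
/// let failed_dirs = verify_all_backups(datastore, worker.clone(), &upid, None, None)?;
/// if !failed_dirs.is_empty() {
///     task_log!(worker, "Failed to verify the following snapshots:");
///     for dir in failed_dirs {
///         task_log!(worker, "\t{}", dir);
///     }
/// }
/// ```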
pub fn verify_all_backups(
    datastore: Arc<DataStore>,
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: &UPID,
    owner: Option<Authid>,
    filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();

    if let Some(owner) = &owner {
        task_log!(
            worker,
            "verify datastore {} - limiting to backups owned by {}",
            datastore.name(),
            owner,
        );
    }

    // a group passes if it is owned by the given auth id directly, or by
    // a token belonging to the same user
    let filter_by_owner = |group: &BackupGroup| {
        if let Some(owner) = &owner {
            match datastore.get_owner(group) {
                Ok(ref group_owner) => {
                    group_owner == owner
                        || (group_owner.is_token()
                            && !owner.is_token()
                            && group_owner.user() == owner.user())
                },
                Err(_) => false,
            }
        } else {
            true
        }
    };

    let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
        Ok(list) => list
            .into_iter()
            // never verify the ephemeral "host/benchmark" group
            .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
            .filter(filter_by_owner)
            .collect::<Vec<BackupGroup>>(),
        Err(err) => {
            task_log!(
                worker,
                "verify datastore {} - unable to list backups: {}",
                datastore.name(),
                err,
            );
            return Ok(errors);
        }
    };

    list.sort_unstable();

    let mut snapshot_count = 0;
    for group in list.iter() {
        snapshot_count += group.list_backups(&datastore.base_path())?.len();
    }

    // start with 16384 chunks (up to 65GB)
    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));

    // start with 64 chunks since we assume there are few corrupt ones
    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));

    task_log!(worker, "verify datastore {} ({} snapshots)", datastore.name(), snapshot_count);

    let mut done = 0;
    for group in list {
        let (count, mut group_errors) = verify_backup_group(
            datastore.clone(),
            &group,
            verified_chunks.clone(),
            corrupt_chunks.clone(),
            Some((done, snapshot_count)),
            worker.clone(),
            upid,
            filter,
        )?;
        errors.append(&mut group_errors);

        done += count;
    }

    Ok(errors)
}