1 use std
::collections
::HashSet
;
2 use std
::sync
::{Arc, Mutex}
;
3 use std
::sync
::atomic
::{Ordering, AtomicUsize}
;
4 use std
::time
::Instant
;
6 use anyhow
::{bail, format_err, Error}
;
/// Verify one blob archive of a snapshot against its manifest file entry.
///
/// The blob `info.filename` is loaded from `backup_dir` through
/// `datastore.load_blob`. Its raw size must equal `info.size` and the SHA-256
/// of its raw data must equal `info.csum`; otherwise the function bails.
/// The final result then depends on the blob's `CryptMode`.
///
/// NOTE(review): this listing has gaps (closing braces and at least one match
/// arm header are not visible), so the arm boundaries noted below are inferred.
25 fn verify_blob(datastore
: Arc
<DataStore
>, backup_dir
: &BackupDir
, info
: &FileInfo
) -> Result
<(), Error
> {
// A load failure is propagated to the caller by `?`.
27 let blob
= datastore
.load_blob(backup_dir
, &info
.filename
)?
;
// Size check: the blob's raw size must match the size stored in `info`.
29 let raw_size
= blob
.raw_size();
30 if raw_size
!= info
.size
{
31 bail
!("wrong size ({} != {})", info
.size
, raw_size
);
// Checksum check: SHA-256 over the raw blob data must match `info.csum`.
34 let csum
= openssl
::sha
::sha256(blob
.raw_data());
35 if csum
!= info
.csum
{
36 bail
!("wrong index checksum");
// Dispatch on the crypt mode; a failure to determine it is propagated by `?`.
39 match blob
.crypt_mode()?
{
// Encrypted blobs are accepted here without any decode attempt.
40 CryptMode
::Encrypt
=> Ok(()),
// NOTE(review): the decode below presumably belongs to the arm for
// unencrypted blobs (arm header not visible in this listing) - confirm.
42 // digest already verified above
43 blob
.decode(None
, None
)?
;
// Sign-only is rejected as an invalid mode for a blob.
46 CryptMode
::SignOnly
=> bail
!("Invalid CryptMode for blob"),
/// Rename a corrupted chunk file to `<digest_str>.<counter>.bad`.
///
/// The chunk path and the digest string come from `datastore.chunk_path`.
/// Only the file name component of that path is replaced, so the renamed
/// file keeps the same parent directory. The outcome is reported through
/// `worker.log`; nothing is returned in the visible code.
///
/// NOTE(review): the `digest` parameter, the initialisation of `counter`, the
/// loop header and the `Ok`/`Err` arm headers are not visible in this
/// listing; the comments below hedge accordingly.
50 fn rename_corrupted_chunk(
51 datastore
: Arc
<DataStore
>,
53 worker
: Arc
<WorkerTask
>,
55 let (path
, digest_str
) = datastore
.chunk_path(digest
);
// Work on a copy so `path` still names the original file for the rename.
58 let mut new_path
= path
.clone();
// Candidate name for the current counter value.
60 new_path
.set_file_name(format
!("{}.{}.bad", digest_str
, counter
));
// Advance to the next counter while the candidate exists and counter < 9;
// otherwise stop. Once counter is 9 the search stops even if that candidate
// already exists.
61 if new_path
.exists() && counter
< 9 { counter += 1; }
else { break; }
64 match std
::fs
::rename(&path
, &new_path
) {
// Presumably the success arm (header not visible): log the new location.
66 worker
.log(format
!("corrupted chunk renamed to {:?}", &new_path
));
// A `NotFound` error kind is deliberately ignored; any other error is only
// logged, not returned.
70 std
::io
::ErrorKind
::NotFound
=> { /* ignored */ }
,
71 _
=> worker
.log(format
!("could not rename corrupted chunk {:?} - {}", &path
, err
))
77 // We use a separate thread to read/load chunks, so that we can do
78 // load and verify in parallel to increase performance.
//
// Spawns a thread that walks all positions of `index`, loads each chunk
// that is in neither `verified_chunks` nor `corrupt_chunks`, and sends
// `(chunk, digest, size)` tuples over a bounded channel. The declared return
// type is the receiving end of that channel.
//
// Chunks already in `corrupt_chunks` and chunks that fail to load are logged
// through `worker` and counted in `errors`; chunks that fail to load are
// also added to `corrupt_chunks` and passed to `rename_corrupted_chunk`.
//
// NOTE(review): the `Ok`/`Err` arm headers of the `load_chunk` match, the
// statement following the "marked as corrupt" branch and the final return
// expression are not visible in this listing.
79 fn chunk_reader_thread(
80 datastore
: Arc
<DataStore
>,
81 index
: Box
<dyn IndexFile
+ Send
>,
82 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
83 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
84 errors
: Arc
<AtomicUsize
>,
85 worker
: Arc
<WorkerTask
>,
86 ) -> std
::sync
::mpsc
::Receiver
<(DataBlob
, [u8;32], u64)> {
// Bounded channel: with a bound of 3, `send` blocks once three chunks are
// queued and not yet received.
88 let (sender
, receiver
) = std
::sync
::mpsc
::sync_channel(3); // buffer up to 3 chunks
// The closure takes ownership of everything it captures (`move`).
90 std
::thread
::spawn(move|| {
91 for pos
in 0..index
.index_count() {
// `pos` is always below `index_count()`; the `unwrap` panics the reader
// thread if `chunk_info` still yields nothing for it.
92 let info
= index
.chunk_info(pos
).unwrap();
// Length of the chunk's byte range; sent along with the chunk.
93 let size
= info
.range
.end
- info
.range
.start
;
// Each lock guard below is a temporary dropped at the end of its condition
// expression, so no lock is held while loading or sending.
95 if verified_chunks
.lock().unwrap().contains(&info
.digest
) {
96 continue; // already verified
// Known-corrupt chunk: log and count it again for this index.
// NOTE(review): presumably followed by a `continue` (not visible) - confirm.
99 if corrupt_chunks
.lock().unwrap().contains(&info
.digest
) {
100 let digest_str
= proxmox
::tools
::digest_to_hex(&info
.digest
);
101 worker
.log(format
!("chunk {} was marked as corrupt", digest_str
));
102 errors
.fetch_add(1, Ordering
::SeqCst
);
106 match datastore
.load_chunk(&info
.digest
) {
// Presumably the `Err(err)` arm (header not visible): remember the digest
// as corrupt, log, count the error and rename the chunk file.
108 corrupt_chunks
.lock().unwrap().insert(info
.digest
);
109 worker
.log(format
!("can't verify chunk, load failed - {}", err
));
110 errors
.fetch_add(1, Ordering
::SeqCst
);
111 rename_corrupted_chunk(datastore
.clone(), &info
.digest
, worker
.clone());
// Presumably the `Ok(chunk)` arm (header not visible): pass the loaded
// chunk on; a send error ends the loop.
115 if sender
.send((chunk
, info
.digest
, size
)).is_err() {
116 break; // receiver gone - simply stop
/// Verify the chunks referenced by `index`.
///
/// Chunks are received from `chunk_reader_thread`. For each received chunk
/// the crypt mode is compared with `crypt_mode` and
/// `chunk.verify_unencrypted` is called with the expected size and digest.
/// Failures are logged through `worker` and counted in a shared error
/// counter. Transfer statistics are logged at the end, and the function
/// bails if the error counter is non-zero.
///
/// `worker.fail_on_abort()` and `crate::tools::fail_on_shutdown()` errors are
/// propagated by `?`.
///
/// NOTE(review): the loop header, several match arm headers and parts of the
/// CryptMode-mismatch log call are not visible in this listing.
126 fn verify_index_chunks(
127 datastore
: Arc
<DataStore
>,
128 index
: Box
<dyn IndexFile
+ Send
>,
129 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
130 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
131 crypt_mode
: CryptMode
,
132 worker
: Arc
<WorkerTask
>,
133 ) -> Result
<(), Error
> {
// Error counter; wrapped in an `Arc` like the reader thread's `errors`
// parameter.
135 let errors
= Arc
::new(AtomicUsize
::new(0));
137 let start_time
= Instant
::now();
// Receiving end of the channel fed by the reader thread.
139 let chunk_channel
= chunk_reader_thread(
142 verified_chunks
.clone(),
143 corrupt_chunks
.clone(),
// Byte counters for the statistics line: raw sizes of received chunks and
// their sizes as sent by the reader thread.
148 let mut read_bytes
= 0;
149 let mut decoded_bytes
= 0;
// Abort/shutdown checks; presumably run once per loop iteration (loop header
// not visible).
153 worker
.fail_on_abort()?
;
154 crate::tools
::fail_on_shutdown()?
;
156 let (chunk
, digest
, size
) = match chunk_channel
.recv() {
// A receive error ends the loop.
158 Err(std
::sync
::mpsc
::RecvError
) => break,
161 read_bytes
+= chunk
.raw_size();
162 decoded_bytes
+= size
;
164 let chunk_crypt_mode
= match chunk
.crypt_mode() {
// Presumably the `Err(err)` arm (header not visible): mark the chunk corrupt,
// log and count the error.
166 corrupt_chunks
.lock().unwrap().insert(digest
);
167 worker
.log(format
!("can't verify chunk, unknown CryptMode - {}", err
));
168 errors
.fetch_add(1, Ordering
::SeqCst
);
// A crypt mode mismatch is counted as an error; no insert into
// `corrupt_chunks` is visible for this case.
174 if chunk_crypt_mode
!= crypt_mode
{
176 "chunk CryptMode {:?} does not match index CryptMode {:?}",
180 errors
.fetch_add(1, Ordering
::SeqCst
);
// Content check against the expected size and digest. On failure the chunk
// is marked corrupt, logged, counted and its file renamed.
183 if let Err(err
) = chunk
.verify_unencrypted(size
as usize, &digest
) {
184 corrupt_chunks
.lock().unwrap().insert(digest
);
185 worker
.log(format
!("{}", err
));
186 errors
.fetch_add(1, Ordering
::SeqCst
);
187 rename_corrupted_chunk(datastore
.clone(), &digest
, worker
.clone());
// Presumably the `else` branch (not visible): remember the digest as
// verified so the reader thread skips it from now on.
189 verified_chunks
.lock().unwrap().insert(digest
);
// Statistics: byte counts converted to MiB, divided by the elapsed seconds.
193 let elapsed
= start_time
.elapsed().as_secs_f64();
195 let read_bytes_mib
= (read_bytes
as f64)/(1024.0*1024.0);
196 let decoded_bytes_mib
= (decoded_bytes
as f64)/(1024.0*1024.0);
198 let read_speed
= read_bytes_mib
/elapsed
;
199 let decode_speed
= decoded_bytes_mib
/elapsed
;
201 let error_count
= errors
.load(Ordering
::SeqCst
);
203 worker
.log(format
!(" verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
204 read_bytes_mib
, decoded_bytes_mib
, elapsed
, read_speed
, decode_speed
, error_count
));
// The counter is loaded a second time here instead of reusing `error_count`.
206 if errors
.load(Ordering
::SeqCst
) > 0 {
207 bail
!("chunks could not be verified");
/// Verify a fixed index archive and the chunks it references.
///
/// Opens `<backup_dir relative path>/<info.filename>` with
/// `datastore.open_fixed_reader`, compares the index's computed checksum and
/// size with `info.csum` and `info.size`, and then hands the index to
/// `verify_index_chunks` together with `info.chunk_crypt_mode()`.
///
/// NOTE(review): the `info` parameter is used in the body but its declaration
/// line is not visible in this listing.
213 fn verify_fixed_index(
214 datastore
: Arc
<DataStore
>,
215 backup_dir
: &BackupDir
,
217 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
218 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
219 worker
: Arc
<WorkerTask
>,
220 ) -> Result
<(), Error
> {
// Path of the index file: the snapshot's relative path plus the file name.
222 let mut path
= backup_dir
.relative_path();
223 path
.push(&info
.filename
);
// A failure to open the index is propagated by `?`.
225 let index
= datastore
.open_fixed_reader(&path
)?
;
// Check the index's own size and checksum before looking at any chunk.
227 let (csum
, size
) = index
.compute_csum();
228 if size
!= info
.size
{
229 bail
!("wrong size ({} != {})", info
.size
, size
);
232 if csum
!= info
.csum
{
233 bail
!("wrong index checksum");
// The result of the chunk verification is this function's result.
236 verify_index_chunks(datastore
, Box
::new(index
), verified_chunks
, corrupt_chunks
, info
.chunk_crypt_mode(), worker
)
/// Verify a dynamic index archive and the chunks it references.
///
/// Opens `<backup_dir relative path>/<info.filename>` with
/// `datastore.open_dynamic_reader`, compares the index's computed checksum
/// and size with `info.csum` and `info.size`, and then hands the index to
/// `verify_index_chunks` together with `info.chunk_crypt_mode()`.
///
/// NOTE(review): the `info` parameter is used in the body but its declaration
/// line is not visible in this listing.
239 fn verify_dynamic_index(
240 datastore
: Arc
<DataStore
>,
241 backup_dir
: &BackupDir
,
243 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
244 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
245 worker
: Arc
<WorkerTask
>,
246 ) -> Result
<(), Error
> {
// Path of the index file: the snapshot's relative path plus the file name.
248 let mut path
= backup_dir
.relative_path();
249 path
.push(&info
.filename
);
// A failure to open the index is propagated by `?`.
251 let index
= datastore
.open_dynamic_reader(&path
)?
;
// Check the index's own size and checksum before looking at any chunk.
253 let (csum
, size
) = index
.compute_csum();
254 if size
!= info
.size
{
255 bail
!("wrong size ({} != {})", info
.size
, size
);
258 if csum
!= info
.csum
{
259 bail
!("wrong index checksum");
// The result of the chunk verification is this function's result.
262 verify_index_chunks(datastore
, Box
::new(index
), verified_chunks
, corrupt_chunks
, info
.chunk_crypt_mode(), worker
)
265 /// Verify a single backup snapshot
267 /// This checks all archives inside a backup snapshot.
268 /// Errors are logged to the worker log.
271 /// - Ok(true) if verify is successful
272 /// - Ok(false) if there were verification errors
273 /// - Err(_) if task was aborted
//
// Besides abort/shutdown, the visible code also propagates errors from
// serialising the verify state and from storing the manifest (`?` at the end
// of the function).
//
// `verified_chunks` and `corrupt_chunks` are cloned into the index verify
// calls for every archive of the snapshot.
//
// NOTE(review): the manifest-load error arm header, the `verify_fixed_index`
// call line, any update of `error_count` and the final return expression are
// not visible in this listing.
274 pub fn verify_backup_dir(
275 datastore
: Arc
<DataStore
>,
276 backup_dir
: &BackupDir
,
277 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
278 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
279 worker
: Arc
<WorkerTask
>
280 ) -> Result
<bool
, Error
> {
// The second element of the loaded tuple is discarded.
282 let mut manifest
= match datastore
.load_manifest(&backup_dir
) {
283 Ok((manifest
, _
)) => manifest
,
// Presumably the `Err(err)` arm (header not visible): the load error is logged.
285 worker
.log(format
!("verify {}:{} - manifest load error: {}", datastore
.name(), backup_dir
, err
));
290 worker
.log(format
!("verify {}:{}", datastore
.name(), backup_dir
));
292 let mut error_count
= 0;
// Starts as `Ok` and is switched to `Failed` by any failing archive.
294 let mut verify_result
= VerifyState
::Ok
;
295 for info
in manifest
.files() {
// `try_block!` collects the outcome of this archive into `result`, which is
// inspected below instead of ending the loop.
296 let result
= proxmox
::try_block
!({
297 worker
.log(format
!(" check {}", info
.filename
));
// Dispatch on the archive type derived from the file name.
298 match archive_type(&info
.filename
)?
{
299 ArchiveType
::FixedIndex
=>
304 verified_chunks
.clone(),
305 corrupt_chunks
.clone(),
308 ArchiveType
::DynamicIndex
=>
309 verify_dynamic_index(
313 verified_chunks
.clone(),
314 corrupt_chunks
.clone(),
317 ArchiveType
::Blob
=> verify_blob(datastore
.clone(), &backup_dir
, info
),
// Abort and shutdown requests end the whole verification with `Err`.
321 worker
.fail_on_abort()?
;
322 crate::tools
::fail_on_shutdown()?
;
// A failed archive is logged and marks the snapshot as failed.
324 if let Err(err
) = result
{
325 worker
.log(format
!("verify {}:{}/{} failed: {}", datastore
.name(), backup_dir
, info
.filename
, err
));
327 verify_result
= VerifyState
::Failed
;
// Record the outcome and this worker's UPID under the "verify_state" key of
// the manifest's `unprotected` value, then store the manifest again.
332 let verify_state
= SnapshotVerifyState
{
333 state
: verify_result
,
334 upid
: worker
.upid().clone(),
336 manifest
.unprotected
["verify_state"] = serde_json
::to_value(verify_state
)?
;
337 datastore
.store_manifest(&backup_dir
, serde_json
::to_value(manifest
)?
)
338 .map_err(|err
| format_err
!("unable to store manifest blob - {}", err
))?
;
343 /// Verify all backups inside a backup group
345 /// Errors are logged to the worker log.
348 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
349 /// - Err(_) if task was aborted
//
// `progress` optionally carries `(done, snapshot_count)` from the caller so
// the logged percentage can cover more than this one group. Without it,
// progress is reported relative to this group's own snapshot list.
//
// NOTE(review): the `group` parameter, the loop over `list`, the update of
// `count` and the final return expression are not visible in this listing.
350 pub fn verify_backup_group(
351 datastore
: Arc
<DataStore
>,
353 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
354 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
355 progress
: Option
<(usize, usize)>, // (done, snapshot_count)
356 worker
: Arc
<WorkerTask
>,
357 ) -> Result
<(usize, Vec
<String
>), Error
> {
// Collects the string form of every snapshot that failed verification.
359 let mut errors
= Vec
::new();
360 let mut list
= match group
.list_backups(&datastore
.base_path()) {
// Presumably the `Err(err)` arm (header not visible): a listing failure is
// logged and reported as "zero snapshots, no failures" rather than as `Err`.
363 worker
.log(format
!("verify group {}:{} - unable to list backups: {}", datastore
.name(), group
, err
));
364 return Ok((0, errors
));
368 worker
.log(format
!("verify group {}:{}", datastore
.name(), group
));
// Fall back to "nothing done yet, this group's snapshot count" when the
// caller passed no progress information.
370 let (done
, snapshot_count
) = progress
.unwrap_or((0, list
.len()));
373 BackupInfo
::sort_list(&mut list
, false); // newest first
// `Ok(false)` from `verify_backup_dir` records the snapshot as failed; an
// `Err` is propagated to the caller by `?`.
376 if !verify_backup_dir(datastore
.clone(), &info
.backup_dir
, verified_chunks
.clone(), corrupt_chunks
.clone(), worker
.clone())?
{
377 errors
.push(info
.backup_dir
.to_string());
// Guard against division by zero when there are no snapshots at all.
379 if snapshot_count
!= 0 {
380 let pos
= done
+ count
;
381 let percentage
= ((pos
as f64) * 100.0)/(snapshot_count
as f64);
382 worker
.log(format
!("percentage done: {:.2}% ({} of {} snapshots)", percentage
, pos
, snapshot_count
));
389 /// Verify all backups inside a datastore
391 /// Errors are logged to the worker log.
394 /// - Ok(failed_dirs) where failed_dirs had verification errors
395 /// - Err(_) if task was aborted
//
// NOTE(review): besides aborts, the visible code also returns `Err` when
// `group.list_backups` fails while the snapshots are being counted (see the
// `?` in the counting loop below).
//
// NOTE(review): the `Ok`/`Err` arm headers of the group listing, the loop
// that calls `verify_backup_group`, the definition of `done` and the final
// return expression are not visible in this listing.
396 pub fn verify_all_backups(datastore
: Arc
<DataStore
>, worker
: Arc
<WorkerTask
>) -> Result
<Vec
<String
>, Error
> {
// Accumulates the failed snapshots reported by every group.
398 let mut errors
= Vec
::new();
400 let mut list
= match BackupGroup
::list_groups(&datastore
.base_path()) {
// Groups with backup type "host" and backup id "benchmark" are excluded from
// verification.
403 .filter(|group
| !(group
.backup_type() == "host" && group
.backup_id() == "benchmark"))
404 .collect
::<Vec
<BackupGroup
>>(),
// Presumably the `Err(err)` arm (header not visible): the failure is logged.
406 worker
.log(format
!("verify datastore {} - unable to list backups: {}", datastore
.name(), err
));
411 list
.sort_unstable();
// Count all snapshots up front; the total is logged below and passed on as
// part of the progress tuple. A listing failure here is propagated by `?`.
413 let mut snapshot_count
= 0;
414 for group
in list
.iter() {
415 snapshot_count
+= group
.list_backups(&datastore
.base_path())?
.len();
// Both sets are created once here and cloned (as `Arc`s) into the group
// verification call.
418 // start with 16384 chunks (up to 65GB)
419 let verified_chunks
= Arc
::new(Mutex
::new(HashSet
::with_capacity(1024*16)));
421 // start with 64 chunks since we assume there are few corrupt ones
422 let corrupt_chunks
= Arc
::new(Mutex
::new(HashSet
::with_capacity(64)));
424 worker
.log(format
!("verify datastore {} ({} snapshots)", datastore
.name(), snapshot_count
));
// The call receives `(done, snapshot_count)` as its progress argument and
// returns a count plus the group's failed snapshots.
428 let (count
, mut group_errors
) = verify_backup_group(
431 verified_chunks
.clone(),
432 corrupt_chunks
.clone(),
433 Some((done
, snapshot_count
)),
// Move the group's failures into the overall list.
436 errors
.append(&mut group_errors
);