1 use std
::collections
::HashSet
;
2 use std
::sync
::{Arc, Mutex}
;
3 use std
::sync
::atomic
::{Ordering, AtomicUsize}
;
4 use std
::time
::Instant
;
6 use anyhow
::{bail, format_err, Error}
;
8 use crate::server
::WorkerTask
;
9 use crate::api2
::types
::*;
12 DataStore
, DataBlob
, BackupGroup
, BackupDir
, BackupInfo
, IndexFile
,
14 FileInfo
, ArchiveType
, archive_type
,
/// Verifies one blob archive of a snapshot against its manifest `FileInfo`.
///
/// Checks visible here, in order:
/// - the blob's raw size must equal `info.size`, otherwise the function bails;
/// - the SHA-256 of the raw blob data must equal `info.csum`, otherwise it bails;
/// - the blob's crypt mode then selects how the blob is handled (see the match below).
///
/// Errors from `load_blob`, `crypt_mode` and `decode` propagate through `?`.
//
// NOTE(review): this listing omits some original lines (closing braces and at least
// one match-arm header) - confirm against the full file before editing code here.
17 fn verify_blob(datastore
: Arc
<DataStore
>, backup_dir
: &BackupDir
, info
: &FileInfo
) -> Result
<(), Error
> {
// Load the blob named by `info.filename` for this snapshot directory.
19 let blob
= datastore
.load_blob(backup_dir
, &info
.filename
)?
;
// Size check against the value recorded in `info`.
21 let raw_size
= blob
.raw_size();
22 if raw_size
!= info
.size
{
23 bail
!("wrong size ({} != {})", info
.size
, raw_size
);
// Checksum check: SHA-256 over the raw blob bytes.
26 let csum
= openssl
::sha
::sha256(blob
.raw_data());
27 if csum
!= info
.csum
{
// NOTE(review): the message says "index" although this is the blob path; it is a
// runtime string, so it is intentionally left untouched here.
28 bail
!("wrong index checksum");
31 match blob
.crypt_mode()?
{
// Encrypted blobs are accepted without decoding.
32 CryptMode
::Encrypt
=> Ok(()),
// NOTE(review): the header of the arm that owns the following statement is not
// visible in this listing; the blob is decoded with `None, None` as arguments.
34 // digest already verified above
35 blob
.decode(None
, None
)?
;
// Sign-only blobs are rejected outright.
38 CryptMode
::SignOnly
=> bail
!("Invalid CryptMode for blob"),
42 // We use a separate thread to read/load chunks, so that we can do
43 // load and verify in parallel to increase performance.
//
// Spawns the reader thread and hands back the receiving end of a bounded channel
// (capacity 3) that carries `(chunk, digest, size)` tuples, where `size` is
// `info.range.end - info.range.start` of the index entry.
//
// For each index position the thread:
// - skips digests already present in `verified_chunks`;
// - logs and counts (in `errors`) digests already present in `corrupt_chunks`;
// - on a failed `load_chunk`, records the digest in `corrupt_chunks`, logs, and counts;
// - sends a successfully loaded chunk, and stops once the receiver is gone.
//
// NOTE(review): this listing omits some original lines (closing braces, match-arm
// headers, possibly `continue` statements and the final return of `receiver`) -
// confirm against the full file before editing code here.
44 fn chunk_reader_thread(
45 datastore
: Arc
<DataStore
>,
46 index
: Box
<dyn IndexFile
+ Send
>,
47 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
48 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
49 errors
: Arc
<AtomicUsize
>,
50 worker
: Arc
<WorkerTask
>,
51 ) -> std
::sync
::mpsc
::Receiver
<(DataBlob
, [u8;32], u64)> {
// Bounded channel: `send` blocks while 3 chunks are already queued, which limits
// how far the reader can run ahead of the consumer.
53 let (sender
, receiver
) = std
::sync
::mpsc
::sync_channel(3); // buffer up to 3 chunks
// The closure takes ownership of every captured handle (`move`).
55 std
::thread
::spawn(move|| {
56 for pos
in 0..index
.index_count() {
// NOTE(review): `unwrap()` panics inside the reader thread if `chunk_info(pos)`
// does not yield an entry - presumably impossible for `pos < index_count()`; confirm.
57 let info
= index
.chunk_info(pos
).unwrap();
58 let size
= info
.range
.end
- info
.range
.start
;
// The lock guard is a temporary, so the mutex is released at the end of the condition.
60 if verified_chunks
.lock().unwrap().contains(&info
.digest
) {
61 continue; // already verified
// A chunk already known to be corrupt is reported and counted again for this index.
64 if corrupt_chunks
.lock().unwrap().contains(&info
.digest
) {
65 let digest_str
= proxmox
::tools
::digest_to_hex(&info
.digest
);
66 worker
.log(format
!("chunk {} was marked as corrupt", digest_str
));
67 errors
.fetch_add(1, Ordering
::SeqCst
);
71 match datastore
.load_chunk(&info
.digest
) {
// Load failure: remember the digest as corrupt, log the error, bump the counter.
73 corrupt_chunks
.lock().unwrap().insert(info
.digest
);
74 worker
.log(format
!("can't verify chunk, load failed - {}", err
));
75 errors
.fetch_add(1, Ordering
::SeqCst
);
// Hand the loaded chunk to the consumer; a send error means the receiver was dropped.
79 if sender
.send((chunk
, info
.digest
, size
)).is_err() {
80 break; // receiver gone - simply stop
/// Verifies the chunks referenced by `index`, using a background reader thread.
///
/// Chunks arrive over the channel returned by `chunk_reader_thread`. For each one this
/// function compares the chunk's crypt mode with `crypt_mode` and calls
/// `verify_unencrypted` with the expected size and digest. Problems are logged to
/// `worker` and counted; a summary line with throughput figures is logged at the end.
///
/// Fails with "chunks could not be verified" when the error counter is non-zero, and
/// propagates the error from `worker.fail_on_abort()`.
//
// NOTE(review): this listing omits some original lines (closing braces, the loop
// header, match-arm headers, some call arguments and the final `Ok`) - confirm
// against the full file before editing code here.
90 fn verify_index_chunks(
91 datastore
: Arc
<DataStore
>,
92 index
: Box
<dyn IndexFile
+ Send
>,
93 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
94 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8; 32]>>>,
95 crypt_mode
: CryptMode
,
96 worker
: Arc
<WorkerTask
>,
97 ) -> Result
<(), Error
> {
// Error counter; presumably also handed to the reader thread (that argument line is
// not visible in this listing).
99 let errors
= Arc
::new(AtomicUsize
::new(0));
// Start of the measurement used for the statistics line at the end.
101 let start_time
= Instant
::now();
// Only part of the argument list is visible here.
103 let chunk_channel
= chunk_reader_thread(
106 verified_chunks
.clone(),
107 corrupt_chunks
.clone(),
// Byte totals, used only for the statistics line below.
112 let mut read_bytes
= 0;
113 let mut decoded_bytes
= 0;
// Abort check before waiting for the next chunk.
117 worker
.fail_on_abort()?
;
// A `RecvError` means the sending side was dropped, i.e. the reader is done.
119 let (chunk
, digest
, size
) = match chunk_channel
.recv() {
121 Err(std
::sync
::mpsc
::RecvError
) => break,
124 read_bytes
+= chunk
.raw_size();
125 decoded_bytes
+= size
;
// If the crypt mode cannot be determined, the chunk is recorded as corrupt,
// logged, and counted.
127 let chunk_crypt_mode
= match chunk
.crypt_mode() {
129 corrupt_chunks
.lock().unwrap().insert(digest
);
130 worker
.log(format
!("can't verify chunk, unknown CryptMode - {}", err
));
131 errors
.fetch_add(1, Ordering
::SeqCst
);
// A mode mismatch is counted as an error; no `corrupt_chunks` insert is visible
// in this branch.
137 if chunk_crypt_mode
!= crypt_mode
{
139 "chunk CryptMode {:?} does not match index CryptMode {:?}",
143 errors
.fetch_add(1, Ordering
::SeqCst
);
// NOTE(review): `size` is a `u64` (see the channel item type); `as usize` would
// truncate on targets where `usize` is narrower - confirm this cannot matter.
146 if let Err(err
) = chunk
.verify_unencrypted(size
as usize, &digest
) {
147 corrupt_chunks
.lock().unwrap().insert(digest
);
148 worker
.log(format
!("{}", err
));
149 errors
.fetch_add(1, Ordering
::SeqCst
);
// Presumably the `else` branch of the check above (the `else` line itself is not
// visible): remember the digest as verified.
151 verified_chunks
.lock().unwrap().insert(digest
);
// Statistics: totals converted to MiB (divide by 1024*1024) and divided by elapsed seconds.
155 let elapsed
= start_time
.elapsed().as_secs_f64();
157 let read_bytes_mib
= (read_bytes
as f64)/(1024.0*1024.0);
158 let decoded_bytes_mib
= (decoded_bytes
as f64)/(1024.0*1024.0);
160 let read_speed
= read_bytes_mib
/elapsed
;
161 let decode_speed
= decoded_bytes_mib
/elapsed
;
163 let error_count
= errors
.load(Ordering
::SeqCst
);
// NOTE(review): the unit is printed as "Mib" although the values are MiB; it is a
// runtime string, so it is intentionally left untouched here.
165 worker
.log(format
!(" verified {:.2}/{:.2} Mib in {:.2} seconds, speed {:.2}/{:.2} Mib/s ({} errors)",
166 read_bytes_mib
, decoded_bytes_mib
, elapsed
, read_speed
, decode_speed
, error_count
));
// Any counted error makes the whole index verification fail.
168 if errors
.load(Ordering
::SeqCst
) > 0 {
169 bail
!("chunks could not be verified");
/// Verifies a fixed index archive and then the chunks it references.
///
/// Opens the index at the snapshot's relative path joined with `info.filename`,
/// recomputes checksum and size via `compute_csum()`, bails when either differs from
/// `info.size` / `info.csum`, and finally delegates to `verify_index_chunks` using
/// `info.chunk_crypt_mode()` as the expected chunk crypt mode.
//
// NOTE(review): `info` is used below but its parameter line is not visible in this
// listing (presumably `info: &FileInfo`, as in `verify_blob`); closing braces are
// missing as well - confirm against the full file before editing code here.
175 fn verify_fixed_index(
176 datastore
: Arc
<DataStore
>,
177 backup_dir
: &BackupDir
,
179 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
180 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
181 worker
: Arc
<WorkerTask
>,
182 ) -> Result
<(), Error
> {
// Build the index path: snapshot directory plus archive file name.
184 let mut path
= backup_dir
.relative_path();
185 path
.push(&info
.filename
);
187 let index
= datastore
.open_fixed_reader(&path
)?
;
// Compare the recomputed size and checksum with the values recorded in `info`.
189 let (csum
, size
) = index
.compute_csum();
190 if size
!= info
.size
{
191 bail
!("wrong size ({} != {})", info
.size
, size
);
194 if csum
!= info
.csum
{
195 bail
!("wrong index checksum");
// The index itself matches; now verify every chunk it refers to.
198 verify_index_chunks(datastore
, Box
::new(index
), verified_chunks
, corrupt_chunks
, info
.chunk_crypt_mode(), worker
)
/// Verifies a dynamic index archive and then the chunks it references.
///
/// Same steps as the fixed-index variant, but the index is opened with
/// `open_dynamic_reader`: recompute checksum and size via `compute_csum()`, bail when
/// either differs from `info.size` / `info.csum`, then delegate to
/// `verify_index_chunks` using `info.chunk_crypt_mode()`.
//
// NOTE(review): `info` is used below but its parameter line is not visible in this
// listing (presumably `info: &FileInfo`, as in `verify_blob`); closing braces are
// missing as well - confirm against the full file before editing code here.
201 fn verify_dynamic_index(
202 datastore
: Arc
<DataStore
>,
203 backup_dir
: &BackupDir
,
205 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
206 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
207 worker
: Arc
<WorkerTask
>,
208 ) -> Result
<(), Error
> {
// Build the index path: snapshot directory plus archive file name.
210 let mut path
= backup_dir
.relative_path();
211 path
.push(&info
.filename
);
213 let index
= datastore
.open_dynamic_reader(&path
)?
;
// Compare the recomputed size and checksum with the values recorded in `info`.
215 let (csum
, size
) = index
.compute_csum();
216 if size
!= info
.size
{
217 bail
!("wrong size ({} != {})", info
.size
, size
);
220 if csum
!= info
.csum
{
221 bail
!("wrong index checksum");
// The index itself matches; now verify every chunk it refers to.
224 verify_index_chunks(datastore
, Box
::new(index
), verified_chunks
, corrupt_chunks
, info
.chunk_crypt_mode(), worker
)
227 /// Verify a single backup snapshot
229 /// This checks all archives inside a backup snapshot.
230 /// Errors are logged to the worker log.
///
/// `verified_chunks` and `corrupt_chunks` are shared digest sets; clones of the
/// handles are passed on to the per-index verification.
///
/// After all files were checked, a `SnapshotVerifyState` (state "ok" or "failed" plus
/// the worker's UPID) is written into the manifest under `unprotected["verify_state"]`
/// and the manifest is stored again.
///
/// Returns:
233 /// - Ok(true) if verify is successful
234 /// - Ok(false) if there were verification errors
235 /// - Err(_) if task was aborted
//
// NOTE(review): this listing omits some original lines (closing braces, match-arm
// headers, some call arguments, the early return after a manifest load error and the
// final return expression) - confirm against the full file before editing code here.
236 pub fn verify_backup_dir(
237 datastore
: Arc
<DataStore
>,
238 backup_dir
: &BackupDir
,
239 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
240 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
241 worker
: Arc
<WorkerTask
>
242 ) -> Result
<bool
, Error
> {
// Load the manifest; only the first element of the returned tuple is used.
244 let mut manifest
= match datastore
.load_manifest(&backup_dir
) {
245 Ok((manifest
, _
)) => manifest
,
// A manifest that cannot be loaded is logged.
247 worker
.log(format
!("verify {}:{} - manifest load error: {}", datastore
.name(), backup_dir
, err
));
252 worker
.log(format
!("verify {}:{}", datastore
.name(), backup_dir
));
// NOTE(review): no visible line updates `error_count`; presumably it is incremented
// next to `verify_result = "failed"` and decides the returned bool - confirm.
254 let mut error_count
= 0;
256 let mut verify_result
= "ok";
257 for info
in manifest
.files() {
// Run the per-file check inside `try_block!` so its error can be inspected below
// instead of leaving the function.
258 let result
= proxmox
::try_block
!({
259 worker
.log(format
!(" check {}", info
.filename
));
// Dispatch on the archive type derived from the file name.
260 match archive_type(&info
.filename
)?
{
261 ArchiveType
::FixedIndex
=>
266 verified_chunks
.clone(),
267 corrupt_chunks
.clone(),
270 ArchiveType
::DynamicIndex
=>
271 verify_dynamic_index(
275 verified_chunks
.clone(),
276 corrupt_chunks
.clone(),
279 ArchiveType
::Blob
=> verify_blob(datastore
.clone(), &backup_dir
, info
),
// An abort is the only condition that propagates out of the per-file handling here.
283 worker
.fail_on_abort()?
;
// A failed file is logged and marks the whole snapshot as failed.
285 if let Err(err
) = result
{
286 worker
.log(format
!("verify {}:{}/{} failed: {}", datastore
.name(), backup_dir
, info
.filename
, err
));
288 verify_result
= "failed";
// Record the outcome together with the UPID of the task that produced it.
293 let verify_state
= SnapshotVerifyState
{
294 state
: verify_result
.to_string(),
295 upid
: worker
.upid().clone(),
297 manifest
.unprotected
["verify_state"] = serde_json
::to_value(verify_state
)?
;
// Persist the updated manifest; a store failure is wrapped with extra context.
298 datastore
.store_manifest(&backup_dir
, serde_json
::to_value(manifest
)?
)
299 .map_err(|err
| format_err
!("unable to store manifest blob - {}", err
))?
;
305 /// Verify all backups inside a backup group
307 /// Errors are logged to the worker log.
///
/// Snapshots are processed newest first. `verified_chunks` and `corrupt_chunks` are
/// shared digest sets; clones of the handles are passed to every `verify_backup_dir`
/// call.
///
/// Returns:
310 /// - Ok(failed_dirs) where failed_dirs had verification errors
311 /// - Err(_) if task was aborted
//
// NOTE(review): `group` is used below but its parameter line is not visible in this
// listing; closing braces, match-arm headers, the loop header over `list` and the
// final return expression are missing as well - confirm against the full file.
312 pub fn verify_backup_group(
313 datastore
: Arc
<DataStore
>,
315 verified_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
316 corrupt_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
317 worker
: Arc
<WorkerTask
>,
318 ) -> Result
<Vec
<String
>, Error
> {
// Collects the names of snapshots whose verification returned `false`.
320 let mut errors
= Vec
::new();
// List the group's snapshots below the datastore base path; a failure is logged.
321 let mut list
= match group
.list_backups(&datastore
.base_path()) {
324 worker
.log(format
!("verify group {}:{} - unable to list backups: {}", datastore
.name(), group
, err
));
329 worker
.log(format
!("verify group {}:{}", datastore
.name(), group
));
331 BackupInfo
::sort_list(&mut list
, false); // newest first
// `?` propagates an `Err` from `verify_backup_dir`; `Ok(false)` is recorded as a
// failed snapshot.
333 if !verify_backup_dir(datastore
.clone(), &info
.backup_dir
, verified_chunks
.clone(), corrupt_chunks
.clone(), worker
.clone())?
{
334 errors
.push(info
.backup_dir
.to_string());
341 /// Verify all backups inside a datastore
343 /// Errors are logged to the worker log.
///
/// The sets of verified and corrupt chunk digests are created once here and shared
/// (via cloned `Arc` handles) with every `verify_backup_group` call.
///
/// Returns:
346 /// - Ok(failed_dirs) where failed_dirs had verification errors
347 /// - Err(_) if task was aborted
//
// NOTE(review): this listing omits some original lines (closing braces, match-arm
// headers, the loop over `list`, some call arguments and the final return
// expression) - confirm against the full file before editing code here.
348 pub fn verify_all_backups(datastore
: Arc
<DataStore
>, worker
: Arc
<WorkerTask
>) -> Result
<Vec
<String
>, Error
> {
// Accumulates the failed snapshot names reported by each group.
350 let mut errors
= Vec
::new();
// List all groups below the datastore base path; a failure is logged.
352 let mut list
= match BackupGroup
::list_groups(&datastore
.base_path()) {
355 worker
.log(format
!("verify datastore {} - unable to list backups: {}", datastore
.name(), err
));
// Sort the groups so they are processed in a deterministic order.
360 list
.sort_unstable();
362 // start with 16384 chunks (up to 65GB)
363 let verified_chunks
= Arc
::new(Mutex
::new(HashSet
::with_capacity(1024*16)));
365 // start with 64 chunks since we assume there are few corrupt ones
366 let corrupt_chunks
= Arc
::new(Mutex
::new(HashSet
::with_capacity(64)));
368 worker
.log(format
!("verify datastore {}", datastore
.name()));
// Only part of the argument list is visible here.
371 let mut group_errors
= verify_backup_group(
374 verified_chunks
.clone(),
375 corrupt_chunks
.clone(),
// Move this group's failures into the overall list (`append` drains `group_errors`).
378 errors
.append(&mut group_errors
);