]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/verify.rs
src/client/pull.rs: simplify code
[proxmox-backup.git] / src / backup / verify.rs
CommitLineData
2aaae970 1use std::collections::HashSet;
6b809ff5
DM
2use std::sync::{Arc, Mutex};
3use std::sync::atomic::{Ordering, AtomicUsize};
4use std::time::Instant;
2aaae970 5
3b2046d2 6use anyhow::{bail, format_err, Error};
c2009e53
DM
7
8use crate::server::WorkerTask;
3b2046d2 9use crate::api2::types::*;
c2009e53
DM
10
11use super::{
6b809ff5 12 DataStore, DataBlob, BackupGroup, BackupDir, BackupInfo, IndexFile,
8819d1f2 13 CryptMode,
c2009e53
DM
14 FileInfo, ArchiveType, archive_type,
15};
16
6b809ff5 17fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
c2009e53 18
39f18b30 19 let blob = datastore.load_blob(backup_dir, &info.filename)?;
c2009e53 20
2aaae970 21 let raw_size = blob.raw_size();
c2009e53
DM
22 if raw_size != info.size {
23 bail!("wrong size ({} != {})", info.size, raw_size);
24 }
25
39f18b30 26 let csum = openssl::sha::sha256(blob.raw_data());
c2009e53
DM
27 if csum != info.csum {
28 bail!("wrong index checksum");
29 }
30
8819d1f2
FG
31 match blob.crypt_mode()? {
32 CryptMode::Encrypt => Ok(()),
33 CryptMode::None => {
34 // digest already verified above
35 blob.decode(None, None)?;
36 Ok(())
37 },
38 CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
c2009e53 39 }
c2009e53
DM
40}
41
0f3b7efa
SR
42fn rename_corrupted_chunk(
43 datastore: Arc<DataStore>,
44 digest: &[u8;32],
45 worker: Arc<WorkerTask>,
46) {
47 let (path, digest_str) = datastore.chunk_path(digest);
48
49 let mut counter = 0;
50 let mut new_path = path.clone();
aadcc281 51 loop {
0f3b7efa 52 new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
aadcc281 53 if new_path.exists() && counter < 9 { counter += 1; } else { break; }
0f3b7efa
SR
54 }
55
56 match std::fs::rename(&path, &new_path) {
57 Ok(_) => {
58 worker.log(format!("corrupted chunk renamed to {:?}", &new_path));
59 },
60 Err(err) => {
61 match err.kind() {
62 std::io::ErrorKind::NotFound => { /* ignored */ },
63 _ => worker.log(format!("could not rename corrupted chunk {:?} - {}", &path, err))
64 }
65 }
66 };
67}
68
6b809ff5
DM
69// We use a separate thread to read/load chunks, so that we can do
70// load and verify in parallel to increase performance.
71fn chunk_reader_thread(
72 datastore: Arc<DataStore>,
73 index: Box<dyn IndexFile + Send>,
74 verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
75 corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
76 errors: Arc<AtomicUsize>,
77 worker: Arc<WorkerTask>,
78) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> {
79
80 let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks
81
82 std::thread::spawn(move|| {
83 for pos in 0..index.index_count() {
84 let info = index.chunk_info(pos).unwrap();
85 let size = info.range.end - info.range.start;
86
87 if verified_chunks.lock().unwrap().contains(&info.digest) {
88 continue; // already verified
89 }
90
91 if corrupt_chunks.lock().unwrap().contains(&info.digest) {
92 let digest_str = proxmox::tools::digest_to_hex(&info.digest);
93 worker.log(format!("chunk {} was marked as corrupt", digest_str));
94 errors.fetch_add(1, Ordering::SeqCst);
95 continue;
96 }
97
98 match datastore.load_chunk(&info.digest) {
99 Err(err) => {
100 corrupt_chunks.lock().unwrap().insert(info.digest);
101 worker.log(format!("can't verify chunk, load failed - {}", err));
102 errors.fetch_add(1, Ordering::SeqCst);
0f3b7efa 103 rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone());
6b809ff5
DM
104 continue;
105 }
106 Ok(chunk) => {
107 if sender.send((chunk, info.digest, size)).is_err() {
108 break; // receiver gone - simply stop
109 }
110 }
111 }
112 }
113 });
114
115 receiver
116}
117
/// Verify every chunk referenced by an index file.
///
/// Chunks are loaded on a separate reader thread (see `chunk_reader_thread`)
/// and verified here, so I/O and hashing overlap. Results are shared across
/// snapshots through `verified_chunks` / `corrupt_chunks` so each chunk is
/// only checked once per datastore-wide verification run.
///
/// Returns `Ok(())` only if no chunk produced an error; aborts early with
/// `Err(_)` if the worker task was aborted or a shutdown was requested.
fn verify_index_chunks(
    datastore: Arc<DataStore>,
    index: Box<dyn IndexFile + Send>,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    crypt_mode: CryptMode,
    worker: Arc<WorkerTask>,
) -> Result<(), Error> {

    // Shared with the reader thread, which also counts load failures.
    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let chunk_channel = chunk_reader_thread(
        datastore.clone(),
        index,
        verified_chunks.clone(),
        corrupt_chunks.clone(),
        errors.clone(),
        worker.clone(),
    );

    let mut read_bytes = 0;      // raw (on-disk) bytes
    let mut decoded_bytes = 0;   // decoded (logical) bytes

    loop {

        // Check for abort/shutdown between chunks so long verifications
        // stay responsive to task cancellation.
        worker.fail_on_abort()?;
        crate::tools::fail_on_shutdown()?;

        // RecvError means the reader thread is done (or died) - we are finished.
        let (chunk, digest, size) = match chunk_channel.recv() {
            Ok(tuple) => tuple,
            Err(std::sync::mpsc::RecvError) => break,
        };

        read_bytes += chunk.raw_size();
        decoded_bytes += size;

        let chunk_crypt_mode = match chunk.crypt_mode() {
            Err(err) => {
                corrupt_chunks.lock().unwrap().insert(digest);
                worker.log(format!("can't verify chunk, unknown CryptMode - {}", err));
                errors.fetch_add(1, Ordering::SeqCst);
                continue;
            },
            Ok(mode) => mode,
        };

        // A chunk whose crypt mode disagrees with the index is counted as an
        // error, but verification still proceeds below.
        // NOTE(review): on mismatch the chunk can still end up in
        // `verified_chunks` (if the digest check passes), so other indexes
        // referencing it will skip it without re-reporting - confirm this is
        // intended.
        if chunk_crypt_mode != crypt_mode {
            worker.log(format!(
                "chunk CryptMode {:?} does not match index CryptMode {:?}",
                chunk_crypt_mode,
                crypt_mode
            ));
            errors.fetch_add(1, Ordering::SeqCst);
        }

        if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
            corrupt_chunks.lock().unwrap().insert(digest);
            worker.log(format!("{}", err));
            errors.fetch_add(1, Ordering::SeqCst);
            rename_corrupted_chunk(datastore.clone(), &digest, worker.clone());
        } else {
            verified_chunks.lock().unwrap().insert(digest);
        }
    }

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);

    let read_speed = read_bytes_mib/elapsed;
    let decode_speed = decoded_bytes_mib/elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    worker.log(format!("  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));

    if errors.load(Ordering::SeqCst) > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}
204
2aaae970 205fn verify_fixed_index(
6b809ff5 206 datastore: Arc<DataStore>,
2aaae970
DM
207 backup_dir: &BackupDir,
208 info: &FileInfo,
6b809ff5
DM
209 verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
210 corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
211 worker: Arc<WorkerTask>,
2aaae970 212) -> Result<(), Error> {
c2009e53
DM
213
214 let mut path = backup_dir.relative_path();
215 path.push(&info.filename);
216
217 let index = datastore.open_fixed_reader(&path)?;
218
219 let (csum, size) = index.compute_csum();
220 if size != info.size {
221 bail!("wrong size ({} != {})", info.size, size);
222 }
223
224 if csum != info.csum {
225 bail!("wrong index checksum");
226 }
227
9a38fa29 228 verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
c2009e53
DM
229}
230
2aaae970 231fn verify_dynamic_index(
6b809ff5 232 datastore: Arc<DataStore>,
2aaae970
DM
233 backup_dir: &BackupDir,
234 info: &FileInfo,
6b809ff5
DM
235 verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
236 corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
237 worker: Arc<WorkerTask>,
2aaae970
DM
238) -> Result<(), Error> {
239
c2009e53
DM
240 let mut path = backup_dir.relative_path();
241 path.push(&info.filename);
242
243 let index = datastore.open_dynamic_reader(&path)?;
244
245 let (csum, size) = index.compute_csum();
246 if size != info.size {
247 bail!("wrong size ({} != {})", info.size, size);
248 }
249
250 if csum != info.csum {
251 bail!("wrong index checksum");
252 }
253
9a38fa29 254 verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
c2009e53
DM
255}
256
/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// The verification outcome is persisted into the manifest's unprotected
/// `verify_state` field so the GUI/API can display it later.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<WorkerTask>
) -> Result<bool, Error> {

    // A snapshot without a loadable manifest cannot be verified at all;
    // report it as failed instead of aborting the whole task.
    let mut manifest = match datastore.load_manifest(&backup_dir) {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
            return Ok(false);
        }
    };

    worker.log(format!("verify {}:{}", datastore.name(), backup_dir));

    let mut error_count = 0;

    let mut verify_result = VerifyState::Ok;
    for info in manifest.files() {
        // try_block collects any error from the per-archive verification so
        // one bad archive does not stop the remaining archives being checked.
        let result = proxmox::try_block!({
            worker.log(format!("  check {}", info.filename));
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex =>
                    verify_fixed_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::DynamicIndex =>
                    verify_dynamic_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
            }
        });

        // Abort/shutdown checks between archives keep the task cancellable.
        worker.fail_on_abort()?;
        crate::tools::fail_on_shutdown()?;

        if let Err(err) = result {
            worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
            error_count += 1;
            verify_result = VerifyState::Failed;
        }

    }

    // Persist the verification result in the manifest so it survives restarts
    // and is visible to API clients.
    let verify_state = SnapshotVerifyState {
        state: verify_result,
        upid: worker.upid().clone(),
    };
    manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
    datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
        .map_err(|err| format_err!("unable to store manifest blob - {}", err))?;

    Ok(error_count == 0)
}
334
8ea00f6e
DM
335/// Verify all backups inside a backup group
336///
337/// Errors are logged to the worker log.
338///
339/// Returns
63d9aca9 340/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
8ea00f6e 341/// - Err(_) if task was aborted
4f09d310
DM
342pub fn verify_backup_group(
343 datastore: Arc<DataStore>,
344 group: &BackupGroup,
345 verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
346 corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
63d9aca9 347 progress: Option<(usize, usize)>, // (done, snapshot_count)
4f09d310 348 worker: Arc<WorkerTask>,
63d9aca9 349) -> Result<(usize, Vec<String>), Error> {
c2009e53 350
adfdc369 351 let mut errors = Vec::new();
c2009e53
DM
352 let mut list = match group.list_backups(&datastore.base_path()) {
353 Ok(list) => list,
354 Err(err) => {
355 worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
63d9aca9 356 return Ok((0, errors));
c2009e53
DM
357 }
358 };
359
360 worker.log(format!("verify group {}:{}", datastore.name(), group));
361
63d9aca9
DM
362 let (done, snapshot_count) = progress.unwrap_or((0, list.len()));
363
364 let mut count = 0;
c2009e53
DM
365 BackupInfo::sort_list(&mut list, false); // newest first
366 for info in list {
63d9aca9 367 count += 1;
6b809ff5 368 if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
adfdc369 369 errors.push(info.backup_dir.to_string());
c2009e53 370 }
63d9aca9
DM
371 if snapshot_count != 0 {
372 let pos = done + count;
373 let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
374 worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count));
375 }
c2009e53
DM
376 }
377
63d9aca9 378 Ok((count, errors))
c2009e53
DM
379}
380
8ea00f6e
DM
381/// Verify all backups inside a datastore
382///
383/// Errors are logged to the worker log.
384///
385/// Returns
adfdc369 386/// - Ok(failed_dirs) where failed_dirs had verification errors
8ea00f6e 387/// - Err(_) if task was aborted
6b809ff5 388pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
adfdc369
DC
389
390 let mut errors = Vec::new();
c2009e53 391
4264c502 392 let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
5656888c
DM
393 Ok(list) => list
394 .into_iter()
395 .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
396 .collect::<Vec<BackupGroup>>(),
c2009e53
DM
397 Err(err) => {
398 worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
adfdc369 399 return Ok(errors);
c2009e53
DM
400 }
401 };
402
4264c502
DM
403 list.sort_unstable();
404
63d9aca9
DM
405 let mut snapshot_count = 0;
406 for group in list.iter() {
407 snapshot_count += group.list_backups(&datastore.base_path())?.len();
408 }
409
4f09d310
DM
410 // start with 16384 chunks (up to 65GB)
411 let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
412
413 // start with 64 chunks since we assume there are few corrupt ones
414 let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
415
63d9aca9 416 worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count));
c2009e53 417
63d9aca9 418 let mut done = 0;
c2009e53 419 for group in list {
63d9aca9 420 let (count, mut group_errors) = verify_backup_group(
4f09d310
DM
421 datastore.clone(),
422 &group,
423 verified_chunks.clone(),
424 corrupt_chunks.clone(),
63d9aca9 425 Some((done, snapshot_count)),
4f09d310
DM
426 worker.clone(),
427 )?;
adfdc369 428 errors.append(&mut group_errors);
63d9aca9
DM
429
430 done += count;
c2009e53
DM
431 }
432
adfdc369 433 Ok(errors)
c2009e53 434}