use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{Ordering, AtomicUsize};
use std::time::Instant;

use anyhow::{bail, format_err, Error};

use crate::{
    api2::types::*,
    backup::{
        DataStore,
        DataBlob,
        BackupGroup,
        BackupDir,
        BackupInfo,
        IndexFile,
        CryptMode,
        FileInfo,
        ArchiveType,
        archive_type,
    },
    server::UPID,
    task::TaskState,
    task_log,
    tools::ParallelHandler,
    tools::fs::lock_dir_noblock_shared,
};

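/// Verify a single blob archive.
///
/// Checks the raw size and SHA-256 checksum against the manifest entry
/// and, for unencrypted blobs, that the data decodes cleanly.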
fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {

    let blob = datastore.load_blob(backup_dir, &info.filename)?;

    let raw_size = blob.raw_size();
    if raw_size != info.size {
        bail!("wrong size ({} != {})", info.size, raw_size);
    }

    let csum = openssl::sha::sha256(blob.raw_data());
    if csum != info.csum {
        bail!("wrong index checksum");
    }

    match blob.crypt_mode()? {
        CryptMode::Encrypt => Ok(()),
        CryptMode::None => {
            // digest already verified above
            blob.decode(None, None)?;
            Ok(())
        },
        CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
    }
}

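/// Rename a corrupt chunk out of the way so a subsequent backup run can
/// re-upload a good copy. Rename failures are only logged, never fatal.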
fn rename_corrupted_chunk(
    datastore: Arc<DataStore>,
    digest: &[u8;32],
    worker: &dyn TaskState,
) {
    let (path, digest_str) = datastore.chunk_path(digest);

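    // pick the first free <digest>.<counter>.bad name; if .0.bad through
    // .9.bad all exist already, .9.bad gets overwritten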
    let mut counter = 0;
    let mut new_path = path.clone();
    loop {
        new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
        if new_path.exists() && counter < 9 { counter += 1; } else { break; }
    }

    match std::fs::rename(&path, &new_path) {
        Ok(_) => {
            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
        },
        Err(err) => {
            match err.kind() {
                std::io::ErrorKind::NotFound => { /* ignored */ },
                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err),
            }
        }
    };
}

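/// Verify all chunks referenced by an index file.
///
/// Chunks listed in `verified_chunks` are skipped, chunks listed in
/// `corrupt_chunks` are counted as errors without re-reading them from
/// disk. Newly detected corrupt chunks are renamed to `.bad` files.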
fn verify_index_chunks(
    datastore: Arc<DataStore>,
    index: Box<dyn IndexFile + Send>,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    crypt_mode: CryptMode,
    worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {

    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let mut read_bytes = 0;
    let mut decoded_bytes = 0;

    let worker2 = Arc::clone(&worker);
    let datastore2 = Arc::clone(&datastore);
    let corrupt_chunks2 = Arc::clone(&corrupt_chunks);
    let verified_chunks2 = Arc::clone(&verified_chunks);
    let errors2 = Arc::clone(&errors);

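    // decode and verify chunks on 4 worker threads; the loop below then
    // only has to do the (serial) chunk loading from disk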
    let decoder_pool = ParallelHandler::new(
        "verify chunk decoder", 4,
        move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
            let chunk_crypt_mode = match chunk.crypt_mode() {
                Err(err) => {
                    corrupt_chunks2.lock().unwrap().insert(digest);
                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                    errors2.fetch_add(1, Ordering::SeqCst);
                    return Ok(());
                },
                Ok(mode) => mode,
            };

            if chunk_crypt_mode != crypt_mode {
                task_log!(
                    worker2,
                    "chunk CryptMode {:?} does not match index CryptMode {:?}",
                    chunk_crypt_mode,
                    crypt_mode
                );
                errors2.fetch_add(1, Ordering::SeqCst);
            }

            if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                corrupt_chunks2.lock().unwrap().insert(digest);
                task_log!(worker2, "{}", err);
                errors2.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
            } else {
                verified_chunks2.lock().unwrap().insert(digest);
            }

            Ok(())
        }
    );

    for pos in 0..index.index_count() {

        worker.check_abort()?;
        crate::tools::fail_on_shutdown()?;

        let info = index.chunk_info(pos).unwrap();
        let size = info.size();

        if verified_chunks.lock().unwrap().contains(&info.digest) {
            continue; // already verified
        }

        if corrupt_chunks.lock().unwrap().contains(&info.digest) {
            let digest_str = proxmox::tools::digest_to_hex(&info.digest);
            task_log!(worker, "chunk {} was marked as corrupt", digest_str);
            errors.fetch_add(1, Ordering::SeqCst);
            continue;
        }

        match datastore.load_chunk(&info.digest) {
            Err(err) => {
                corrupt_chunks.lock().unwrap().insert(info.digest);
                task_log!(worker, "can't verify chunk, load failed - {}", err);
                errors.fetch_add(1, Ordering::SeqCst);
                rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
                continue;
            }
            Ok(chunk) => {
                read_bytes += chunk.raw_size();
                decoder_pool.send((chunk, info.digest, size))?;
                decoded_bytes += size;
            }
        }
    }

    decoder_pool.complete()?;

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);

    let read_speed = read_bytes_mib / elapsed;
    let decode_speed = decoded_bytes_mib / elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    task_log!(
        worker,
        "  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib,
        decoded_bytes_mib,
        elapsed,
        read_speed,
        decode_speed,
        error_count,
    );

    if error_count > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}

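/// Verify a fixed-size index (.fidx) archive.
///
/// Checks the index checksum and size against the manifest entry, then
/// verifies all referenced chunks.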
fn verify_fixed_index(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {

    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = datastore.open_fixed_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(
        datastore,
        Box::new(index),
        verified_chunks,
        corrupt_chunks,
        info.chunk_crypt_mode(),
        worker,
    )
}

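/// Verify a dynamic index (.didx) archive.
///
/// Checks the index checksum and size against the manifest entry, then
/// verifies all referenced chunks.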
fn verify_dynamic_index(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {

    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = datastore.open_dynamic_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(
        datastore,
        Box::new(index),
        verified_chunks,
        corrupt_chunks,
        info.chunk_crypt_mode(),
        worker,
    )
}

/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: UPID,
) -> Result<bool, Error> {

    // bind the guard so the shared lock is held for the entire verification
    // (`if let Err(..) = res` would move the Result and drop an Ok guard
    // immediately, releasing the flock before verification even starts)
    let _snap_lock = match lock_dir_noblock_shared(
        &datastore.snapshot_path(&backup_dir),
        "snapshot",
        "locked by another operation",
    ) {
        Ok(lock) => lock,
        Err(err) => {
            task_log!(
                worker,
                "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
                datastore.name(),
                backup_dir,
                err,
            );
            return Ok(true);
        }
    };

    let manifest = match datastore.load_manifest(&backup_dir) {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            task_log!(
                worker,
                "verify {}:{} - manifest load error: {}",
                datastore.name(),
                backup_dir,
                err,
            );
            return Ok(false);
        }
    };

    task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);

    let mut error_count = 0;

    let mut verify_result = VerifyState::Ok;
    for info in manifest.files() {
        let result = proxmox::try_block!({
            task_log!(worker, "  check {}", info.filename);
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex =>
                    verify_fixed_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::DynamicIndex =>
                    verify_dynamic_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
            }
        });

        worker.check_abort()?;
        crate::tools::fail_on_shutdown()?;

        if let Err(err) = result {
            task_log!(
                worker,
                "verify {}:{}/{} failed: {}",
                datastore.name(),
                backup_dir,
                info.filename,
                err,
            );
            error_count += 1;
            verify_result = VerifyState::Failed;
        }
    }

    let verify_state = SnapshotVerifyState {
        state: verify_result,
        upid,
    };
    let verify_state = serde_json::to_value(verify_state)?;
    datastore.update_manifest(&backup_dir, |manifest| {
        manifest.unprotected["verify_state"] = verify_state;
    }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

    Ok(error_count == 0)
}

/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group(
    datastore: Arc<DataStore>,
    group: &BackupGroup,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    progress: Option<(usize, usize)>, // (done, snapshot_count)
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: &UPID,
) -> Result<(usize, Vec<String>), Error> {

    let mut errors = Vec::new();
    let mut list = match group.list_backups(&datastore.base_path()) {
        Ok(list) => list,
        Err(err) => {
            task_log!(
                worker,
                "verify group {}:{} - unable to list backups: {}",
                datastore.name(),
                group,
                err,
            );
            return Ok((0, errors));
        }
    };

    task_log!(worker, "verify group {}:{}", datastore.name(), group);

    let (done, snapshot_count) = progress.unwrap_or((0, list.len()));

    let mut count = 0;
    BackupInfo::sort_list(&mut list, false); // newest first
    for info in list {
        count += 1;
        if !verify_backup_dir(
            datastore.clone(),
            &info.backup_dir,
            verified_chunks.clone(),
            corrupt_chunks.clone(),
            worker.clone(),
            upid.clone(),
        )? {
            errors.push(info.backup_dir.to_string());
        }
        if snapshot_count != 0 {
            let pos = done + count;
            let percentage = ((pos as f64) * 100.0) / (snapshot_count as f64);
            task_log!(
                worker,
                "percentage done: {:.2}% ({} of {} snapshots)",
                percentage,
                pos,
                snapshot_count,
            );
        }
    }

    Ok((count, errors))
}

/// Verify all backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
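///
/// A minimal caller sketch (hypothetical worker context; assumes `datastore`,
/// `worker` and `upid` are set up the way the verify API handler does it):
///
/// ```ignore
/// let failed_dirs = verify_all_backups(datastore, worker.clone(), &upid)?;
/// for dir in &failed_dirs {
///     task_log!(worker, "verification failed for snapshot {}", dir);
/// }
/// ```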
pub fn verify_all_backups(
    datastore: Arc<DataStore>,
    worker: Arc<dyn TaskState + Send + Sync>,
    upid: &UPID,
) -> Result<Vec<String>, Error> {
    let mut errors = Vec::new();

    let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
        Ok(list) => list
            .into_iter()
            .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
            .collect::<Vec<BackupGroup>>(),
        Err(err) => {
            task_log!(
                worker,
                "verify datastore {} - unable to list backups: {}",
                datastore.name(),
                err,
            );
            return Ok(errors);
        }
    };

    list.sort_unstable();

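    // pre-compute the total snapshot count so progress can be reported
    // across group boundaries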
    let mut snapshot_count = 0;
    for group in list.iter() {
        snapshot_count += group.list_backups(&datastore.base_path())?.len();
    }

    // start with 16384 chunks (up to 64 GiB of data at 4 MiB chunk size)
    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 16)));

    // start with 64 chunks since we assume there are few corrupt ones
    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));

    task_log!(worker, "verify datastore {} ({} snapshots)", datastore.name(), snapshot_count);

    let mut done = 0;
    for group in list {
        let (count, mut group_errors) = verify_backup_group(
            datastore.clone(),
            &group,
            verified_chunks.clone(),
            corrupt_chunks.clone(),
            Some((done, snapshot_count)),
            worker.clone(),
            upid,
        )?;
        errors.append(&mut group_errors);

        done += count;
    }

    Ok(errors)
}