]>
Commit | Line | Data |
---|---|---|
1 | use std::collections::HashSet; | |
2 | use std::sync::{Arc, Mutex}; | |
3 | use std::sync::atomic::{Ordering, AtomicUsize}; | |
4 | use std::time::Instant; | |
5 | use nix::dir::Dir; | |
6 | ||
7 | use anyhow::{bail, format_err, Error}; | |
8 | ||
9 | use crate::{ | |
10 | api2::types::*, | |
11 | backup::{ | |
12 | DataStore, | |
13 | DataBlob, | |
14 | BackupGroup, | |
15 | BackupDir, | |
16 | BackupInfo, | |
17 | BackupManifest, | |
18 | IndexFile, | |
19 | CryptMode, | |
20 | FileInfo, | |
21 | ArchiveType, | |
22 | archive_type, | |
23 | }, | |
24 | server::UPID, | |
25 | task::TaskState, | |
26 | task_log, | |
27 | tools::ParallelHandler, | |
28 | tools::fs::lock_dir_noblock_shared, | |
29 | }; | |
30 | ||
31 | fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> { | |
32 | ||
33 | let blob = datastore.load_blob(backup_dir, &info.filename)?; | |
34 | ||
35 | let raw_size = blob.raw_size(); | |
36 | if raw_size != info.size { | |
37 | bail!("wrong size ({} != {})", info.size, raw_size); | |
38 | } | |
39 | ||
40 | let csum = openssl::sha::sha256(blob.raw_data()); | |
41 | if csum != info.csum { | |
42 | bail!("wrong index checksum"); | |
43 | } | |
44 | ||
45 | match blob.crypt_mode()? { | |
46 | CryptMode::Encrypt => Ok(()), | |
47 | CryptMode::None => { | |
48 | // digest already verified above | |
49 | blob.decode(None, None)?; | |
50 | Ok(()) | |
51 | }, | |
52 | CryptMode::SignOnly => bail!("Invalid CryptMode for blob"), | |
53 | } | |
54 | } | |
55 | ||
56 | fn rename_corrupted_chunk( | |
57 | datastore: Arc<DataStore>, | |
58 | digest: &[u8;32], | |
59 | worker: &dyn TaskState, | |
60 | ) { | |
61 | let (path, digest_str) = datastore.chunk_path(digest); | |
62 | ||
63 | let mut counter = 0; | |
64 | let mut new_path = path.clone(); | |
65 | loop { | |
66 | new_path.set_file_name(format!("{}.{}.bad", digest_str, counter)); | |
67 | if new_path.exists() && counter < 9 { counter += 1; } else { break; } | |
68 | } | |
69 | ||
70 | match std::fs::rename(&path, &new_path) { | |
71 | Ok(_) => { | |
72 | task_log!(worker, "corrupted chunk renamed to {:?}", &new_path); | |
73 | }, | |
74 | Err(err) => { | |
75 | match err.kind() { | |
76 | std::io::ErrorKind::NotFound => { /* ignored */ }, | |
77 | _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err) | |
78 | } | |
79 | } | |
80 | }; | |
81 | } | |
82 | ||
83 | fn verify_index_chunks( | |
84 | datastore: Arc<DataStore>, | |
85 | index: Box<dyn IndexFile + Send>, | |
86 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
87 | corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, | |
88 | crypt_mode: CryptMode, | |
89 | worker: Arc<dyn TaskState + Send + Sync>, | |
90 | ) -> Result<(), Error> { | |
91 | ||
92 | let errors = Arc::new(AtomicUsize::new(0)); | |
93 | ||
94 | let start_time = Instant::now(); | |
95 | ||
96 | let mut read_bytes = 0; | |
97 | let mut decoded_bytes = 0; | |
98 | ||
99 | let worker2 = Arc::clone(&worker); | |
100 | let datastore2 = Arc::clone(&datastore); | |
101 | let corrupt_chunks2 = Arc::clone(&corrupt_chunks); | |
102 | let verified_chunks2 = Arc::clone(&verified_chunks); | |
103 | let errors2 = Arc::clone(&errors); | |
104 | ||
105 | let decoder_pool = ParallelHandler::new( | |
106 | "verify chunk decoder", 4, | |
107 | move |(chunk, digest, size): (DataBlob, [u8;32], u64)| { | |
108 | let chunk_crypt_mode = match chunk.crypt_mode() { | |
109 | Err(err) => { | |
110 | corrupt_chunks2.lock().unwrap().insert(digest); | |
111 | task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err); | |
112 | errors2.fetch_add(1, Ordering::SeqCst); | |
113 | return Ok(()); | |
114 | }, | |
115 | Ok(mode) => mode, | |
116 | }; | |
117 | ||
118 | if chunk_crypt_mode != crypt_mode { | |
119 | task_log!( | |
120 | worker2, | |
121 | "chunk CryptMode {:?} does not match index CryptMode {:?}", | |
122 | chunk_crypt_mode, | |
123 | crypt_mode | |
124 | ); | |
125 | errors2.fetch_add(1, Ordering::SeqCst); | |
126 | } | |
127 | ||
128 | if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) { | |
129 | corrupt_chunks2.lock().unwrap().insert(digest); | |
130 | task_log!(worker2, "{}", err); | |
131 | errors2.fetch_add(1, Ordering::SeqCst); | |
132 | rename_corrupted_chunk(datastore2.clone(), &digest, &worker2); | |
133 | } else { | |
134 | verified_chunks2.lock().unwrap().insert(digest); | |
135 | } | |
136 | ||
137 | Ok(()) | |
138 | } | |
139 | ); | |
140 | ||
141 | for pos in 0..index.index_count() { | |
142 | ||
143 | worker.check_abort()?; | |
144 | crate::tools::fail_on_shutdown()?; | |
145 | ||
146 | let info = index.chunk_info(pos).unwrap(); | |
147 | let size = info.size(); | |
148 | ||
149 | if verified_chunks.lock().unwrap().contains(&info.digest) { | |
150 | continue; // already verified | |
151 | } | |
152 | ||
153 | if corrupt_chunks.lock().unwrap().contains(&info.digest) { | |
154 | let digest_str = proxmox::tools::digest_to_hex(&info.digest); | |
155 | task_log!(worker, "chunk {} was marked as corrupt", digest_str); | |
156 | errors.fetch_add(1, Ordering::SeqCst); | |
157 | continue; | |
158 | } | |
159 | ||
160 | match datastore.load_chunk(&info.digest) { | |
161 | Err(err) => { | |
162 | corrupt_chunks.lock().unwrap().insert(info.digest); | |
163 | task_log!(worker, "can't verify chunk, load failed - {}", err); | |
164 | errors.fetch_add(1, Ordering::SeqCst); | |
165 | rename_corrupted_chunk(datastore.clone(), &info.digest, &worker); | |
166 | continue; | |
167 | } | |
168 | Ok(chunk) => { | |
169 | read_bytes += chunk.raw_size(); | |
170 | decoder_pool.send((chunk, info.digest, size))?; | |
171 | decoded_bytes += size; | |
172 | } | |
173 | } | |
174 | } | |
175 | ||
176 | decoder_pool.complete()?; | |
177 | ||
178 | let elapsed = start_time.elapsed().as_secs_f64(); | |
179 | ||
180 | let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0); | |
181 | let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0); | |
182 | ||
183 | let read_speed = read_bytes_mib/elapsed; | |
184 | let decode_speed = decoded_bytes_mib/elapsed; | |
185 | ||
186 | let error_count = errors.load(Ordering::SeqCst); | |
187 | ||
188 | task_log!( | |
189 | worker, | |
190 | " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)", | |
191 | read_bytes_mib, | |
192 | decoded_bytes_mib, | |
193 | elapsed, | |
194 | read_speed, | |
195 | decode_speed, | |
196 | error_count, | |
197 | ); | |
198 | ||
199 | if errors.load(Ordering::SeqCst) > 0 { | |
200 | bail!("chunks could not be verified"); | |
201 | } | |
202 | ||
203 | Ok(()) | |
204 | } | |
205 | ||
206 | fn verify_fixed_index( | |
207 | datastore: Arc<DataStore>, | |
208 | backup_dir: &BackupDir, | |
209 | info: &FileInfo, | |
210 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
211 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
212 | worker: Arc<dyn TaskState + Send + Sync>, | |
213 | ) -> Result<(), Error> { | |
214 | ||
215 | let mut path = backup_dir.relative_path(); | |
216 | path.push(&info.filename); | |
217 | ||
218 | let index = datastore.open_fixed_reader(&path)?; | |
219 | ||
220 | let (csum, size) = index.compute_csum(); | |
221 | if size != info.size { | |
222 | bail!("wrong size ({} != {})", info.size, size); | |
223 | } | |
224 | ||
225 | if csum != info.csum { | |
226 | bail!("wrong index checksum"); | |
227 | } | |
228 | ||
229 | verify_index_chunks( | |
230 | datastore, | |
231 | Box::new(index), | |
232 | verified_chunks, | |
233 | corrupt_chunks, | |
234 | info.chunk_crypt_mode(), | |
235 | worker, | |
236 | ) | |
237 | } | |
238 | ||
239 | fn verify_dynamic_index( | |
240 | datastore: Arc<DataStore>, | |
241 | backup_dir: &BackupDir, | |
242 | info: &FileInfo, | |
243 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
244 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
245 | worker: Arc<dyn TaskState + Send + Sync>, | |
246 | ) -> Result<(), Error> { | |
247 | ||
248 | let mut path = backup_dir.relative_path(); | |
249 | path.push(&info.filename); | |
250 | ||
251 | let index = datastore.open_dynamic_reader(&path)?; | |
252 | ||
253 | let (csum, size) = index.compute_csum(); | |
254 | if size != info.size { | |
255 | bail!("wrong size ({} != {})", info.size, size); | |
256 | } | |
257 | ||
258 | if csum != info.csum { | |
259 | bail!("wrong index checksum"); | |
260 | } | |
261 | ||
262 | verify_index_chunks( | |
263 | datastore, | |
264 | Box::new(index), | |
265 | verified_chunks, | |
266 | corrupt_chunks, | |
267 | info.chunk_crypt_mode(), | |
268 | worker, | |
269 | ) | |
270 | } | |
271 | ||
272 | /// Verify a single backup snapshot | |
273 | /// | |
274 | /// This checks all archives inside a backup snapshot. | |
275 | /// Errors are logged to the worker log. | |
276 | /// | |
277 | /// Returns | |
278 | /// - Ok(true) if verify is successful | |
279 | /// - Ok(false) if there were verification errors | |
280 | /// - Err(_) if task was aborted | |
281 | pub fn verify_backup_dir( | |
282 | datastore: Arc<DataStore>, | |
283 | backup_dir: &BackupDir, | |
284 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
285 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
286 | worker: Arc<dyn TaskState + Send + Sync>, | |
287 | upid: UPID, | |
288 | filter: Option<&dyn Fn(&BackupManifest) -> bool>, | |
289 | ) -> Result<bool, Error> { | |
290 | let snap_lock = lock_dir_noblock_shared( | |
291 | &datastore.snapshot_path(&backup_dir), | |
292 | "snapshot", | |
293 | "locked by another operation"); | |
294 | match snap_lock { | |
295 | Ok(snap_lock) => verify_backup_dir_with_lock( | |
296 | datastore, | |
297 | backup_dir, | |
298 | verified_chunks, | |
299 | corrupt_chunks, | |
300 | worker, | |
301 | upid, | |
302 | filter, | |
303 | snap_lock | |
304 | ), | |
305 | Err(err) => { | |
306 | task_log!( | |
307 | worker, | |
308 | "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}", | |
309 | datastore.name(), | |
310 | backup_dir, | |
311 | err, | |
312 | ); | |
313 | Ok(true) | |
314 | } | |
315 | } | |
316 | } | |
317 | ||
318 | /// See verify_backup_dir | |
319 | pub fn verify_backup_dir_with_lock( | |
320 | datastore: Arc<DataStore>, | |
321 | backup_dir: &BackupDir, | |
322 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
323 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
324 | worker: Arc<dyn TaskState + Send + Sync>, | |
325 | upid: UPID, | |
326 | filter: Option<&dyn Fn(&BackupManifest) -> bool>, | |
327 | _snap_lock: Dir, | |
328 | ) -> Result<bool, Error> { | |
329 | let manifest = match datastore.load_manifest(&backup_dir) { | |
330 | Ok((manifest, _)) => manifest, | |
331 | Err(err) => { | |
332 | task_log!( | |
333 | worker, | |
334 | "verify {}:{} - manifest load error: {}", | |
335 | datastore.name(), | |
336 | backup_dir, | |
337 | err, | |
338 | ); | |
339 | return Ok(false); | |
340 | } | |
341 | }; | |
342 | ||
343 | if let Some(filter) = filter { | |
344 | if filter(&manifest) == false { | |
345 | task_log!( | |
346 | worker, | |
347 | "SKIPPED: verify {}:{} (recently verified)", | |
348 | datastore.name(), | |
349 | backup_dir, | |
350 | ); | |
351 | return Ok(true); | |
352 | } | |
353 | } | |
354 | ||
355 | task_log!(worker, "verify {}:{}", datastore.name(), backup_dir); | |
356 | ||
357 | let mut error_count = 0; | |
358 | ||
359 | let mut verify_result = VerifyState::Ok; | |
360 | for info in manifest.files() { | |
361 | let result = proxmox::try_block!({ | |
362 | task_log!(worker, " check {}", info.filename); | |
363 | match archive_type(&info.filename)? { | |
364 | ArchiveType::FixedIndex => | |
365 | verify_fixed_index( | |
366 | datastore.clone(), | |
367 | &backup_dir, | |
368 | info, | |
369 | verified_chunks.clone(), | |
370 | corrupt_chunks.clone(), | |
371 | worker.clone(), | |
372 | ), | |
373 | ArchiveType::DynamicIndex => | |
374 | verify_dynamic_index( | |
375 | datastore.clone(), | |
376 | &backup_dir, | |
377 | info, | |
378 | verified_chunks.clone(), | |
379 | corrupt_chunks.clone(), | |
380 | worker.clone(), | |
381 | ), | |
382 | ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info), | |
383 | } | |
384 | }); | |
385 | ||
386 | worker.check_abort()?; | |
387 | crate::tools::fail_on_shutdown()?; | |
388 | ||
389 | if let Err(err) = result { | |
390 | task_log!( | |
391 | worker, | |
392 | "verify {}:{}/{} failed: {}", | |
393 | datastore.name(), | |
394 | backup_dir, | |
395 | info.filename, | |
396 | err, | |
397 | ); | |
398 | error_count += 1; | |
399 | verify_result = VerifyState::Failed; | |
400 | } | |
401 | ||
402 | } | |
403 | ||
404 | let verify_state = SnapshotVerifyState { | |
405 | state: verify_result, | |
406 | upid, | |
407 | }; | |
408 | let verify_state = serde_json::to_value(verify_state)?; | |
409 | datastore.update_manifest(&backup_dir, |manifest| { | |
410 | manifest.unprotected["verify_state"] = verify_state; | |
411 | }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?; | |
412 | ||
413 | Ok(error_count == 0) | |
414 | } | |
415 | ||
416 | /// Verify all backups inside a backup group | |
417 | /// | |
418 | /// Errors are logged to the worker log. | |
419 | /// | |
420 | /// Returns | |
421 | /// - Ok((count, failed_dirs)) where failed_dirs had verification errors | |
422 | /// - Err(_) if task was aborted | |
423 | pub fn verify_backup_group( | |
424 | datastore: Arc<DataStore>, | |
425 | group: &BackupGroup, | |
426 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
427 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
428 | progress: Option<(usize, usize)>, // (done, snapshot_count) | |
429 | worker: Arc<dyn TaskState + Send + Sync>, | |
430 | upid: &UPID, | |
431 | filter: Option<&dyn Fn(&BackupManifest) -> bool>, | |
432 | ) -> Result<(usize, Vec<String>), Error> { | |
433 | ||
434 | let mut errors = Vec::new(); | |
435 | let mut list = match group.list_backups(&datastore.base_path()) { | |
436 | Ok(list) => list, | |
437 | Err(err) => { | |
438 | task_log!( | |
439 | worker, | |
440 | "verify group {}:{} - unable to list backups: {}", | |
441 | datastore.name(), | |
442 | group, | |
443 | err, | |
444 | ); | |
445 | return Ok((0, errors)); | |
446 | } | |
447 | }; | |
448 | ||
449 | task_log!(worker, "verify group {}:{}", datastore.name(), group); | |
450 | ||
451 | let (done, snapshot_count) = progress.unwrap_or((0, list.len())); | |
452 | ||
453 | let mut count = 0; | |
454 | BackupInfo::sort_list(&mut list, false); // newest first | |
455 | for info in list { | |
456 | count += 1; | |
457 | ||
458 | if !verify_backup_dir( | |
459 | datastore.clone(), | |
460 | &info.backup_dir, | |
461 | verified_chunks.clone(), | |
462 | corrupt_chunks.clone(), | |
463 | worker.clone(), | |
464 | upid.clone(), | |
465 | filter, | |
466 | )? { | |
467 | errors.push(info.backup_dir.to_string()); | |
468 | } | |
469 | if snapshot_count != 0 { | |
470 | let pos = done + count; | |
471 | let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64); | |
472 | task_log!( | |
473 | worker, | |
474 | "percentage done: {:.2}% ({} of {} snapshots)", | |
475 | percentage, | |
476 | pos, | |
477 | snapshot_count, | |
478 | ); | |
479 | } | |
480 | } | |
481 | ||
482 | Ok((count, errors)) | |
483 | } | |
484 | ||
485 | /// Verify all backups inside a datastore | |
486 | /// | |
487 | /// Errors are logged to the worker log. | |
488 | /// | |
489 | /// Returns | |
490 | /// - Ok(failed_dirs) where failed_dirs had verification errors | |
491 | /// - Err(_) if task was aborted | |
492 | pub fn verify_all_backups( | |
493 | datastore: Arc<DataStore>, | |
494 | worker: Arc<dyn TaskState + Send + Sync>, | |
495 | upid: &UPID, | |
496 | filter: Option<&dyn Fn(&BackupManifest) -> bool>, | |
497 | ) -> Result<Vec<String>, Error> { | |
498 | let mut errors = Vec::new(); | |
499 | ||
500 | let mut list = match BackupGroup::list_groups(&datastore.base_path()) { | |
501 | Ok(list) => list | |
502 | .into_iter() | |
503 | .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark")) | |
504 | .collect::<Vec<BackupGroup>>(), | |
505 | Err(err) => { | |
506 | task_log!( | |
507 | worker, | |
508 | "verify datastore {} - unable to list backups: {}", | |
509 | datastore.name(), | |
510 | err, | |
511 | ); | |
512 | return Ok(errors); | |
513 | } | |
514 | }; | |
515 | ||
516 | list.sort_unstable(); | |
517 | ||
518 | let mut snapshot_count = 0; | |
519 | for group in list.iter() { | |
520 | snapshot_count += group.list_backups(&datastore.base_path())?.len(); | |
521 | } | |
522 | ||
523 | // start with 16384 chunks (up to 65GB) | |
524 | let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16))); | |
525 | ||
526 | // start with 64 chunks since we assume there are few corrupt ones | |
527 | let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64))); | |
528 | ||
529 | task_log!(worker, "verify datastore {} ({} snapshots)", datastore.name(), snapshot_count); | |
530 | ||
531 | let mut done = 0; | |
532 | for group in list { | |
533 | let (count, mut group_errors) = verify_backup_group( | |
534 | datastore.clone(), | |
535 | &group, | |
536 | verified_chunks.clone(), | |
537 | corrupt_chunks.clone(), | |
538 | Some((done, snapshot_count)), | |
539 | worker.clone(), | |
540 | upid, | |
541 | filter, | |
542 | )?; | |
543 | errors.append(&mut group_errors); | |
544 | ||
545 | done += count; | |
546 | } | |
547 | ||
548 | Ok(errors) | |
549 | } |