]>
Commit | Line | Data |
---|---|---|
1 | use std::collections::HashSet; | |
2 | use std::sync::{Arc, Mutex}; | |
3 | use std::sync::atomic::{Ordering, AtomicUsize}; | |
4 | use std::time::Instant; | |
5 | ||
6 | use anyhow::{bail, format_err, Error}; | |
7 | ||
8 | use crate::server::WorkerTask; | |
9 | use crate::api2::types::*; | |
10 | ||
11 | use super::{ | |
12 | DataStore, DataBlob, BackupGroup, BackupDir, BackupInfo, IndexFile, | |
13 | CryptMode, | |
14 | FileInfo, ArchiveType, archive_type, | |
15 | }; | |
16 | ||
17 | fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> { | |
18 | ||
19 | let blob = datastore.load_blob(backup_dir, &info.filename)?; | |
20 | ||
21 | let raw_size = blob.raw_size(); | |
22 | if raw_size != info.size { | |
23 | bail!("wrong size ({} != {})", info.size, raw_size); | |
24 | } | |
25 | ||
26 | let csum = openssl::sha::sha256(blob.raw_data()); | |
27 | if csum != info.csum { | |
28 | bail!("wrong index checksum"); | |
29 | } | |
30 | ||
31 | match blob.crypt_mode()? { | |
32 | CryptMode::Encrypt => Ok(()), | |
33 | CryptMode::None => { | |
34 | // digest already verified above | |
35 | blob.decode(None, None)?; | |
36 | Ok(()) | |
37 | }, | |
38 | CryptMode::SignOnly => bail!("Invalid CryptMode for blob"), | |
39 | } | |
40 | } | |
41 | ||
42 | fn rename_corrupted_chunk( | |
43 | datastore: Arc<DataStore>, | |
44 | digest: &[u8;32], | |
45 | worker: Arc<WorkerTask>, | |
46 | ) { | |
47 | let (path, digest_str) = datastore.chunk_path(digest); | |
48 | ||
49 | let mut counter = 0; | |
50 | let mut new_path = path.clone(); | |
51 | loop { | |
52 | new_path.set_file_name(format!("{}.{}.bad", digest_str, counter)); | |
53 | if new_path.exists() && counter < 9 { counter += 1; } else { break; } | |
54 | } | |
55 | ||
56 | match std::fs::rename(&path, &new_path) { | |
57 | Ok(_) => { | |
58 | worker.log(format!("corrupted chunk renamed to {:?}", &new_path)); | |
59 | }, | |
60 | Err(err) => { | |
61 | match err.kind() { | |
62 | std::io::ErrorKind::NotFound => { /* ignored */ }, | |
63 | _ => worker.log(format!("could not rename corrupted chunk {:?} - {}", &path, err)) | |
64 | } | |
65 | } | |
66 | }; | |
67 | } | |
68 | ||
69 | // We use a separate thread to read/load chunks, so that we can do | |
70 | // load and verify in parallel to increase performance. | |
71 | fn chunk_reader_thread( | |
72 | datastore: Arc<DataStore>, | |
73 | index: Box<dyn IndexFile + Send>, | |
74 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
75 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
76 | errors: Arc<AtomicUsize>, | |
77 | worker: Arc<WorkerTask>, | |
78 | ) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> { | |
79 | ||
80 | let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks | |
81 | ||
82 | std::thread::spawn(move|| { | |
83 | for pos in 0..index.index_count() { | |
84 | let info = index.chunk_info(pos).unwrap(); | |
85 | let size = info.range.end - info.range.start; | |
86 | ||
87 | if verified_chunks.lock().unwrap().contains(&info.digest) { | |
88 | continue; // already verified | |
89 | } | |
90 | ||
91 | if corrupt_chunks.lock().unwrap().contains(&info.digest) { | |
92 | let digest_str = proxmox::tools::digest_to_hex(&info.digest); | |
93 | worker.log(format!("chunk {} was marked as corrupt", digest_str)); | |
94 | errors.fetch_add(1, Ordering::SeqCst); | |
95 | continue; | |
96 | } | |
97 | ||
98 | match datastore.load_chunk(&info.digest) { | |
99 | Err(err) => { | |
100 | corrupt_chunks.lock().unwrap().insert(info.digest); | |
101 | worker.log(format!("can't verify chunk, load failed - {}", err)); | |
102 | errors.fetch_add(1, Ordering::SeqCst); | |
103 | rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone()); | |
104 | continue; | |
105 | } | |
106 | Ok(chunk) => { | |
107 | if sender.send((chunk, info.digest, size)).is_err() { | |
108 | break; // receiver gone - simply stop | |
109 | } | |
110 | } | |
111 | } | |
112 | } | |
113 | }); | |
114 | ||
115 | receiver | |
116 | } | |
117 | ||
118 | fn verify_index_chunks( | |
119 | datastore: Arc<DataStore>, | |
120 | index: Box<dyn IndexFile + Send>, | |
121 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
122 | corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, | |
123 | crypt_mode: CryptMode, | |
124 | worker: Arc<WorkerTask>, | |
125 | ) -> Result<(), Error> { | |
126 | ||
127 | let errors = Arc::new(AtomicUsize::new(0)); | |
128 | ||
129 | let start_time = Instant::now(); | |
130 | ||
131 | let chunk_channel = chunk_reader_thread( | |
132 | datastore.clone(), | |
133 | index, | |
134 | verified_chunks.clone(), | |
135 | corrupt_chunks.clone(), | |
136 | errors.clone(), | |
137 | worker.clone(), | |
138 | ); | |
139 | ||
140 | let mut read_bytes = 0; | |
141 | let mut decoded_bytes = 0; | |
142 | ||
143 | loop { | |
144 | ||
145 | worker.fail_on_abort()?; | |
146 | crate::tools::fail_on_shutdown()?; | |
147 | ||
148 | let (chunk, digest, size) = match chunk_channel.recv() { | |
149 | Ok(tuple) => tuple, | |
150 | Err(std::sync::mpsc::RecvError) => break, | |
151 | }; | |
152 | ||
153 | read_bytes += chunk.raw_size(); | |
154 | decoded_bytes += size; | |
155 | ||
156 | let chunk_crypt_mode = match chunk.crypt_mode() { | |
157 | Err(err) => { | |
158 | corrupt_chunks.lock().unwrap().insert(digest); | |
159 | worker.log(format!("can't verify chunk, unknown CryptMode - {}", err)); | |
160 | errors.fetch_add(1, Ordering::SeqCst); | |
161 | continue; | |
162 | }, | |
163 | Ok(mode) => mode, | |
164 | }; | |
165 | ||
166 | if chunk_crypt_mode != crypt_mode { | |
167 | worker.log(format!( | |
168 | "chunk CryptMode {:?} does not match index CryptMode {:?}", | |
169 | chunk_crypt_mode, | |
170 | crypt_mode | |
171 | )); | |
172 | errors.fetch_add(1, Ordering::SeqCst); | |
173 | } | |
174 | ||
175 | if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) { | |
176 | corrupt_chunks.lock().unwrap().insert(digest); | |
177 | worker.log(format!("{}", err)); | |
178 | errors.fetch_add(1, Ordering::SeqCst); | |
179 | rename_corrupted_chunk(datastore.clone(), &digest, worker.clone()); | |
180 | } else { | |
181 | verified_chunks.lock().unwrap().insert(digest); | |
182 | } | |
183 | } | |
184 | ||
185 | let elapsed = start_time.elapsed().as_secs_f64(); | |
186 | ||
187 | let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0); | |
188 | let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0); | |
189 | ||
190 | let read_speed = read_bytes_mib/elapsed; | |
191 | let decode_speed = decoded_bytes_mib/elapsed; | |
192 | ||
193 | let error_count = errors.load(Ordering::SeqCst); | |
194 | ||
195 | worker.log(format!(" verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)", | |
196 | read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count)); | |
197 | ||
198 | if errors.load(Ordering::SeqCst) > 0 { | |
199 | bail!("chunks could not be verified"); | |
200 | } | |
201 | ||
202 | Ok(()) | |
203 | } | |
204 | ||
205 | fn verify_fixed_index( | |
206 | datastore: Arc<DataStore>, | |
207 | backup_dir: &BackupDir, | |
208 | info: &FileInfo, | |
209 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
210 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
211 | worker: Arc<WorkerTask>, | |
212 | ) -> Result<(), Error> { | |
213 | ||
214 | let mut path = backup_dir.relative_path(); | |
215 | path.push(&info.filename); | |
216 | ||
217 | let index = datastore.open_fixed_reader(&path)?; | |
218 | ||
219 | let (csum, size) = index.compute_csum(); | |
220 | if size != info.size { | |
221 | bail!("wrong size ({} != {})", info.size, size); | |
222 | } | |
223 | ||
224 | if csum != info.csum { | |
225 | bail!("wrong index checksum"); | |
226 | } | |
227 | ||
228 | verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker) | |
229 | } | |
230 | ||
231 | fn verify_dynamic_index( | |
232 | datastore: Arc<DataStore>, | |
233 | backup_dir: &BackupDir, | |
234 | info: &FileInfo, | |
235 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
236 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
237 | worker: Arc<WorkerTask>, | |
238 | ) -> Result<(), Error> { | |
239 | ||
240 | let mut path = backup_dir.relative_path(); | |
241 | path.push(&info.filename); | |
242 | ||
243 | let index = datastore.open_dynamic_reader(&path)?; | |
244 | ||
245 | let (csum, size) = index.compute_csum(); | |
246 | if size != info.size { | |
247 | bail!("wrong size ({} != {})", info.size, size); | |
248 | } | |
249 | ||
250 | if csum != info.csum { | |
251 | bail!("wrong index checksum"); | |
252 | } | |
253 | ||
254 | verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker) | |
255 | } | |
256 | ||
257 | /// Verify a single backup snapshot | |
258 | /// | |
259 | /// This checks all archives inside a backup snapshot. | |
260 | /// Errors are logged to the worker log. | |
261 | /// | |
262 | /// Returns | |
263 | /// - Ok(true) if verify is successful | |
264 | /// - Ok(false) if there were verification errors | |
265 | /// - Err(_) if task was aborted | |
266 | pub fn verify_backup_dir( | |
267 | datastore: Arc<DataStore>, | |
268 | backup_dir: &BackupDir, | |
269 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
270 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
271 | worker: Arc<WorkerTask> | |
272 | ) -> Result<bool, Error> { | |
273 | ||
274 | let mut manifest = match datastore.load_manifest(&backup_dir) { | |
275 | Ok((manifest, _)) => manifest, | |
276 | Err(err) => { | |
277 | worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err)); | |
278 | return Ok(false); | |
279 | } | |
280 | }; | |
281 | ||
282 | worker.log(format!("verify {}:{}", datastore.name(), backup_dir)); | |
283 | ||
284 | let mut error_count = 0; | |
285 | ||
286 | let mut verify_result = VerifyState::Ok; | |
287 | for info in manifest.files() { | |
288 | let result = proxmox::try_block!({ | |
289 | worker.log(format!(" check {}", info.filename)); | |
290 | match archive_type(&info.filename)? { | |
291 | ArchiveType::FixedIndex => | |
292 | verify_fixed_index( | |
293 | datastore.clone(), | |
294 | &backup_dir, | |
295 | info, | |
296 | verified_chunks.clone(), | |
297 | corrupt_chunks.clone(), | |
298 | worker.clone(), | |
299 | ), | |
300 | ArchiveType::DynamicIndex => | |
301 | verify_dynamic_index( | |
302 | datastore.clone(), | |
303 | &backup_dir, | |
304 | info, | |
305 | verified_chunks.clone(), | |
306 | corrupt_chunks.clone(), | |
307 | worker.clone(), | |
308 | ), | |
309 | ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info), | |
310 | } | |
311 | }); | |
312 | ||
313 | worker.fail_on_abort()?; | |
314 | crate::tools::fail_on_shutdown()?; | |
315 | ||
316 | if let Err(err) = result { | |
317 | worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err)); | |
318 | error_count += 1; | |
319 | verify_result = VerifyState::Failed; | |
320 | } | |
321 | ||
322 | } | |
323 | ||
324 | let verify_state = SnapshotVerifyState { | |
325 | state: verify_result, | |
326 | upid: worker.upid().clone(), | |
327 | }; | |
328 | manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?; | |
329 | datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?) | |
330 | .map_err(|err| format_err!("unable to store manifest blob - {}", err))?; | |
331 | ||
332 | Ok(error_count == 0) | |
333 | } | |
334 | ||
335 | /// Verify all backups inside a backup group | |
336 | /// | |
337 | /// Errors are logged to the worker log. | |
338 | /// | |
339 | /// Returns | |
340 | /// - Ok((count, failed_dirs)) where failed_dirs had verification errors | |
341 | /// - Err(_) if task was aborted | |
342 | pub fn verify_backup_group( | |
343 | datastore: Arc<DataStore>, | |
344 | group: &BackupGroup, | |
345 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
346 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
347 | progress: Option<(usize, usize)>, // (done, snapshot_count) | |
348 | worker: Arc<WorkerTask>, | |
349 | ) -> Result<(usize, Vec<String>), Error> { | |
350 | ||
351 | let mut errors = Vec::new(); | |
352 | let mut list = match group.list_backups(&datastore.base_path()) { | |
353 | Ok(list) => list, | |
354 | Err(err) => { | |
355 | worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err)); | |
356 | return Ok((0, errors)); | |
357 | } | |
358 | }; | |
359 | ||
360 | worker.log(format!("verify group {}:{}", datastore.name(), group)); | |
361 | ||
362 | let (done, snapshot_count) = progress.unwrap_or((0, list.len())); | |
363 | ||
364 | let mut count = 0; | |
365 | BackupInfo::sort_list(&mut list, false); // newest first | |
366 | for info in list { | |
367 | count += 1; | |
368 | if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{ | |
369 | errors.push(info.backup_dir.to_string()); | |
370 | } | |
371 | if snapshot_count != 0 { | |
372 | let pos = done + count; | |
373 | let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64); | |
374 | worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count)); | |
375 | } | |
376 | } | |
377 | ||
378 | Ok((count, errors)) | |
379 | } | |
380 | ||
381 | /// Verify all backups inside a datastore | |
382 | /// | |
383 | /// Errors are logged to the worker log. | |
384 | /// | |
385 | /// Returns | |
386 | /// - Ok(failed_dirs) where failed_dirs had verification errors | |
387 | /// - Err(_) if task was aborted | |
388 | pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> { | |
389 | ||
390 | let mut errors = Vec::new(); | |
391 | ||
392 | let mut list = match BackupGroup::list_groups(&datastore.base_path()) { | |
393 | Ok(list) => list | |
394 | .into_iter() | |
395 | .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark")) | |
396 | .collect::<Vec<BackupGroup>>(), | |
397 | Err(err) => { | |
398 | worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err)); | |
399 | return Ok(errors); | |
400 | } | |
401 | }; | |
402 | ||
403 | list.sort_unstable(); | |
404 | ||
405 | let mut snapshot_count = 0; | |
406 | for group in list.iter() { | |
407 | snapshot_count += group.list_backups(&datastore.base_path())?.len(); | |
408 | } | |
409 | ||
410 | // start with 16384 chunks (up to 65GB) | |
411 | let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16))); | |
412 | ||
413 | // start with 64 chunks since we assume there are few corrupt ones | |
414 | let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64))); | |
415 | ||
416 | worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count)); | |
417 | ||
418 | let mut done = 0; | |
419 | for group in list { | |
420 | let (count, mut group_errors) = verify_backup_group( | |
421 | datastore.clone(), | |
422 | &group, | |
423 | verified_chunks.clone(), | |
424 | corrupt_chunks.clone(), | |
425 | Some((done, snapshot_count)), | |
426 | worker.clone(), | |
427 | )?; | |
428 | errors.append(&mut group_errors); | |
429 | ||
430 | done += count; | |
431 | } | |
432 | ||
433 | Ok(errors) | |
434 | } |