]>
Commit | Line | Data |
---|---|---|
2aaae970 | 1 | use std::collections::HashSet; |
6b809ff5 DM |
2 | use std::sync::{Arc, Mutex}; |
3 | use std::sync::atomic::{Ordering, AtomicUsize}; | |
4 | use std::time::Instant; | |
2aaae970 | 5 | |
3b2046d2 | 6 | use anyhow::{bail, format_err, Error}; |
c2009e53 DM |
7 | |
8 | use crate::server::WorkerTask; | |
3b2046d2 | 9 | use crate::api2::types::*; |
c2009e53 DM |
10 | |
11 | use super::{ | |
6b809ff5 | 12 | DataStore, DataBlob, BackupGroup, BackupDir, BackupInfo, IndexFile, |
8819d1f2 | 13 | CryptMode, |
c2009e53 DM |
14 | FileInfo, ArchiveType, archive_type, |
15 | }; | |
16 | ||
6b809ff5 | 17 | fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> { |
c2009e53 | 18 | |
39f18b30 | 19 | let blob = datastore.load_blob(backup_dir, &info.filename)?; |
c2009e53 | 20 | |
2aaae970 | 21 | let raw_size = blob.raw_size(); |
c2009e53 DM |
22 | if raw_size != info.size { |
23 | bail!("wrong size ({} != {})", info.size, raw_size); | |
24 | } | |
25 | ||
39f18b30 | 26 | let csum = openssl::sha::sha256(blob.raw_data()); |
c2009e53 DM |
27 | if csum != info.csum { |
28 | bail!("wrong index checksum"); | |
29 | } | |
30 | ||
8819d1f2 FG |
31 | match blob.crypt_mode()? { |
32 | CryptMode::Encrypt => Ok(()), | |
33 | CryptMode::None => { | |
34 | // digest already verified above | |
35 | blob.decode(None, None)?; | |
36 | Ok(()) | |
37 | }, | |
38 | CryptMode::SignOnly => bail!("Invalid CryptMode for blob"), | |
c2009e53 | 39 | } |
c2009e53 DM |
40 | } |
41 | ||
// We use a separate thread to read/load chunks, so that we can do
// load and verify in parallel to increase performance.
//
// The returned receiver yields `(chunk, digest, size)` tuples, where `size`
// is the logical (decoded) chunk size taken from the index entry range.
// Chunks already present in `verified_chunks` are skipped silently; chunks
// in `corrupt_chunks` are skipped but logged and counted via `errors`.
// Load failures mark the digest corrupt, log, count an error, and continue.
// The thread stops early (without error) as soon as the receiver is dropped.
fn chunk_reader_thread(
    datastore: Arc<DataStore>,
    index: Box<dyn IndexFile + Send>,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    errors: Arc<AtomicUsize>,
    worker: Arc<WorkerTask>,
) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> {

    let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks

    std::thread::spawn(move|| {
        for pos in 0..index.index_count() {
            // NOTE(review): unwrap assumes chunk_info is Some for every
            // pos < index_count() — presumably an index invariant; a
            // violation would only panic this reader thread
            let info = index.chunk_info(pos).unwrap();
            let size = info.range.end - info.range.start;

            if verified_chunks.lock().unwrap().contains(&info.digest) {
                continue; // already verified
            }

            if corrupt_chunks.lock().unwrap().contains(&info.digest) {
                let digest_str = proxmox::tools::digest_to_hex(&info.digest);
                worker.log(format!("chunk {} was marked as corrupt", digest_str));
                errors.fetch_add(1, Ordering::SeqCst);
                continue;
            }

            match datastore.load_chunk(&info.digest) {
                Err(err) => {
                    // remember the failure so other indexes referencing the
                    // same chunk don't retry the load
                    corrupt_chunks.lock().unwrap().insert(info.digest);
                    worker.log(format!("can't verify chunk, load failed - {}", err));
                    errors.fetch_add(1, Ordering::SeqCst);
                    continue;
                }
                Ok(chunk) => {
                    if sender.send((chunk, info.digest, size)).is_err() {
                        break; // receiver gone - simply stop
                    }
                }
            }
        }
    });

    receiver
}
89 | ||
fdaab0df | 90 | fn verify_index_chunks( |
6b809ff5 DM |
91 | datastore: Arc<DataStore>, |
92 | index: Box<dyn IndexFile + Send>, | |
93 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
94 | corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, | |
9a38fa29 | 95 | crypt_mode: CryptMode, |
6b809ff5 | 96 | worker: Arc<WorkerTask>, |
fdaab0df DM |
97 | ) -> Result<(), Error> { |
98 | ||
6b809ff5 | 99 | let errors = Arc::new(AtomicUsize::new(0)); |
fdaab0df | 100 | |
6b809ff5 | 101 | let start_time = Instant::now(); |
fdaab0df | 102 | |
6b809ff5 DM |
103 | let chunk_channel = chunk_reader_thread( |
104 | datastore, | |
105 | index, | |
106 | verified_chunks.clone(), | |
107 | corrupt_chunks.clone(), | |
108 | errors.clone(), | |
109 | worker.clone(), | |
110 | ); | |
7ae571e7 | 111 | |
6b809ff5 DM |
112 | let mut read_bytes = 0; |
113 | let mut decoded_bytes = 0; | |
7ae571e7 | 114 | |
6b809ff5 | 115 | loop { |
2aaae970 | 116 | |
6b809ff5 | 117 | worker.fail_on_abort()?; |
deef6369 | 118 | crate::tools::fail_on_shutdown()?; |
6b809ff5 DM |
119 | |
120 | let (chunk, digest, size) = match chunk_channel.recv() { | |
121 | Ok(tuple) => tuple, | |
122 | Err(std::sync::mpsc::RecvError) => break, | |
9a38fa29 FG |
123 | }; |
124 | ||
6b809ff5 DM |
125 | read_bytes += chunk.raw_size(); |
126 | decoded_bytes += size; | |
127 | ||
9a38fa29 FG |
128 | let chunk_crypt_mode = match chunk.crypt_mode() { |
129 | Err(err) => { | |
6b809ff5 | 130 | corrupt_chunks.lock().unwrap().insert(digest); |
9a38fa29 | 131 | worker.log(format!("can't verify chunk, unknown CryptMode - {}", err)); |
6b809ff5 | 132 | errors.fetch_add(1, Ordering::SeqCst); |
9a38fa29 FG |
133 | continue; |
134 | }, | |
135 | Ok(mode) => mode, | |
136 | }; | |
137 | ||
138 | if chunk_crypt_mode != crypt_mode { | |
139 | worker.log(format!( | |
140 | "chunk CryptMode {:?} does not match index CryptMode {:?}", | |
141 | chunk_crypt_mode, | |
142 | crypt_mode | |
143 | )); | |
6b809ff5 | 144 | errors.fetch_add(1, Ordering::SeqCst); |
9a38fa29 FG |
145 | } |
146 | ||
6b809ff5 DM |
147 | if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) { |
148 | corrupt_chunks.lock().unwrap().insert(digest); | |
7ae571e7 | 149 | worker.log(format!("{}", err)); |
6b809ff5 | 150 | errors.fetch_add(1, Ordering::SeqCst); |
7ae571e7 | 151 | } else { |
6b809ff5 | 152 | verified_chunks.lock().unwrap().insert(digest); |
2aaae970 | 153 | } |
fdaab0df DM |
154 | } |
155 | ||
6b809ff5 DM |
156 | let elapsed = start_time.elapsed().as_secs_f64(); |
157 | ||
158 | let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0); | |
159 | let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0); | |
160 | ||
161 | let read_speed = read_bytes_mib/elapsed; | |
162 | let decode_speed = decoded_bytes_mib/elapsed; | |
163 | ||
164 | let error_count = errors.load(Ordering::SeqCst); | |
165 | ||
166 | worker.log(format!(" verified {:.2}/{:.2} Mib in {:.2} seconds, speed {:.2}/{:.2} Mib/s ({} errors)", | |
167 | read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count)); | |
168 | ||
169 | if errors.load(Ordering::SeqCst) > 0 { | |
f66f537d DC |
170 | bail!("chunks could not be verified"); |
171 | } | |
172 | ||
fdaab0df DM |
173 | Ok(()) |
174 | } | |
175 | ||
2aaae970 | 176 | fn verify_fixed_index( |
6b809ff5 | 177 | datastore: Arc<DataStore>, |
2aaae970 DM |
178 | backup_dir: &BackupDir, |
179 | info: &FileInfo, | |
6b809ff5 DM |
180 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, |
181 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
182 | worker: Arc<WorkerTask>, | |
2aaae970 | 183 | ) -> Result<(), Error> { |
c2009e53 DM |
184 | |
185 | let mut path = backup_dir.relative_path(); | |
186 | path.push(&info.filename); | |
187 | ||
188 | let index = datastore.open_fixed_reader(&path)?; | |
189 | ||
190 | let (csum, size) = index.compute_csum(); | |
191 | if size != info.size { | |
192 | bail!("wrong size ({} != {})", info.size, size); | |
193 | } | |
194 | ||
195 | if csum != info.csum { | |
196 | bail!("wrong index checksum"); | |
197 | } | |
198 | ||
9a38fa29 | 199 | verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker) |
c2009e53 DM |
200 | } |
201 | ||
2aaae970 | 202 | fn verify_dynamic_index( |
6b809ff5 | 203 | datastore: Arc<DataStore>, |
2aaae970 DM |
204 | backup_dir: &BackupDir, |
205 | info: &FileInfo, | |
6b809ff5 DM |
206 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, |
207 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
208 | worker: Arc<WorkerTask>, | |
2aaae970 DM |
209 | ) -> Result<(), Error> { |
210 | ||
c2009e53 DM |
211 | let mut path = backup_dir.relative_path(); |
212 | path.push(&info.filename); | |
213 | ||
214 | let index = datastore.open_dynamic_reader(&path)?; | |
215 | ||
216 | let (csum, size) = index.compute_csum(); | |
217 | if size != info.size { | |
218 | bail!("wrong size ({} != {})", info.size, size); | |
219 | } | |
220 | ||
221 | if csum != info.csum { | |
222 | bail!("wrong index checksum"); | |
223 | } | |
224 | ||
9a38fa29 | 225 | verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker) |
c2009e53 DM |
226 | } |
227 | ||
/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// The outcome ("ok"/"failed") is persisted into the snapshot manifest as
/// `verify_state` together with the worker UPID, so the UI can display the
/// last verification result.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<WorkerTask>
) -> Result<bool, Error> {

    // a snapshot without a loadable manifest cannot be verified at all, but
    // that is a verification failure, not a task error
    let mut manifest = match datastore.load_manifest(&backup_dir) {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
            return Ok(false);
        }
    };

    worker.log(format!("verify {}:{}", datastore.name(), backup_dir));

    let mut error_count = 0;

    let mut verify_result = "ok";
    for info in manifest.files() {
        // try_block so one broken archive doesn't stop checking the others
        let result = proxmox::try_block!({
            worker.log(format!(" check {}", info.filename));
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex =>
                    verify_fixed_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::DynamicIndex =>
                    verify_dynamic_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
            }
        });

        // abort/shutdown checks happen between archives, after each result
        worker.fail_on_abort()?;
        crate::tools::fail_on_shutdown()?;

        if let Err(err) = result {
            worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
            error_count += 1;
            verify_result = "failed";
        }

    }

    // persist the verification outcome into the (unprotected part of the)
    // manifest, even when some archives failed
    let verify_state = SnapshotVerifyState {
        state: verify_result.to_string(),
        upid: worker.upid().clone(),
    };
    manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
    datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
        .map_err(|err| format_err!("unable to store manifest blob - {}", err))?;


    Ok(error_count == 0)
}
306 | ||
8ea00f6e DM |
307 | /// Verify all backups inside a backup group |
308 | /// | |
309 | /// Errors are logged to the worker log. | |
310 | /// | |
311 | /// Returns | |
63d9aca9 | 312 | /// - Ok((count, failed_dirs)) where failed_dirs had verification errors |
8ea00f6e | 313 | /// - Err(_) if task was aborted |
4f09d310 DM |
314 | pub fn verify_backup_group( |
315 | datastore: Arc<DataStore>, | |
316 | group: &BackupGroup, | |
317 | verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
318 | corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
63d9aca9 | 319 | progress: Option<(usize, usize)>, // (done, snapshot_count) |
4f09d310 | 320 | worker: Arc<WorkerTask>, |
63d9aca9 | 321 | ) -> Result<(usize, Vec<String>), Error> { |
c2009e53 | 322 | |
adfdc369 | 323 | let mut errors = Vec::new(); |
c2009e53 DM |
324 | let mut list = match group.list_backups(&datastore.base_path()) { |
325 | Ok(list) => list, | |
326 | Err(err) => { | |
327 | worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err)); | |
63d9aca9 | 328 | return Ok((0, errors)); |
c2009e53 DM |
329 | } |
330 | }; | |
331 | ||
332 | worker.log(format!("verify group {}:{}", datastore.name(), group)); | |
333 | ||
63d9aca9 DM |
334 | let (done, snapshot_count) = progress.unwrap_or((0, list.len())); |
335 | ||
336 | let mut count = 0; | |
c2009e53 DM |
337 | BackupInfo::sort_list(&mut list, false); // newest first |
338 | for info in list { | |
63d9aca9 | 339 | count += 1; |
6b809ff5 | 340 | if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{ |
adfdc369 | 341 | errors.push(info.backup_dir.to_string()); |
c2009e53 | 342 | } |
63d9aca9 DM |
343 | if snapshot_count != 0 { |
344 | let pos = done + count; | |
345 | let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64); | |
346 | worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count)); | |
347 | } | |
c2009e53 DM |
348 | } |
349 | ||
63d9aca9 | 350 | Ok((count, errors)) |
c2009e53 DM |
351 | } |
352 | ||
8ea00f6e DM |
353 | /// Verify all backups inside a datastore |
354 | /// | |
355 | /// Errors are logged to the worker log. | |
356 | /// | |
357 | /// Returns | |
adfdc369 | 358 | /// - Ok(failed_dirs) where failed_dirs had verification errors |
8ea00f6e | 359 | /// - Err(_) if task was aborted |
6b809ff5 | 360 | pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> { |
adfdc369 DC |
361 | |
362 | let mut errors = Vec::new(); | |
c2009e53 | 363 | |
4264c502 | 364 | let mut list = match BackupGroup::list_groups(&datastore.base_path()) { |
c2009e53 DM |
365 | Ok(list) => list, |
366 | Err(err) => { | |
367 | worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err)); | |
adfdc369 | 368 | return Ok(errors); |
c2009e53 DM |
369 | } |
370 | }; | |
371 | ||
4264c502 DM |
372 | list.sort_unstable(); |
373 | ||
63d9aca9 DM |
374 | let mut snapshot_count = 0; |
375 | for group in list.iter() { | |
376 | snapshot_count += group.list_backups(&datastore.base_path())?.len(); | |
377 | } | |
378 | ||
4f09d310 DM |
379 | // start with 16384 chunks (up to 65GB) |
380 | let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16))); | |
381 | ||
382 | // start with 64 chunks since we assume there are few corrupt ones | |
383 | let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64))); | |
384 | ||
63d9aca9 | 385 | worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count)); |
c2009e53 | 386 | |
63d9aca9 | 387 | let mut done = 0; |
c2009e53 | 388 | for group in list { |
63d9aca9 | 389 | let (count, mut group_errors) = verify_backup_group( |
4f09d310 DM |
390 | datastore.clone(), |
391 | &group, | |
392 | verified_chunks.clone(), | |
393 | corrupt_chunks.clone(), | |
63d9aca9 | 394 | Some((done, snapshot_count)), |
4f09d310 DM |
395 | worker.clone(), |
396 | )?; | |
adfdc369 | 397 | errors.append(&mut group_errors); |
63d9aca9 DM |
398 | |
399 | done += count; | |
c2009e53 DM |
400 | } |
401 | ||
adfdc369 | 402 | Ok(errors) |
c2009e53 | 403 | } |