proxmox-backup.git: src/backup/verify.rs
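//! Verify datastore content: blobs, fixed and dynamic index archives,
//! single snapshots, backup groups, and whole datastores.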
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{Ordering, AtomicUsize};
use std::time::Instant;

use anyhow::{bail, format_err, Error};

use crate::server::WorkerTask;
use crate::api2::types::*;

use super::{
    DataStore, DataBlob, BackupGroup, BackupDir, BackupInfo, IndexFile,
    CryptMode,
    FileInfo, ArchiveType, archive_type,
};

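/// Verify a single blob archive against its manifest entry.
///
/// Checks the blob's raw size and SHA-256 checksum; unencrypted blobs are
/// additionally decoded to make sure their content is readable.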
fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {

    let blob = datastore.load_blob(backup_dir, &info.filename)?;

    let raw_size = blob.raw_size();
    if raw_size != info.size {
        bail!("wrong size ({} != {})", info.size, raw_size);
    }

    let csum = openssl::sha::sha256(blob.raw_data());
    if csum != info.csum {
        bail!("wrong index checksum");
    }

    match blob.crypt_mode()? {
        // the server has no encryption key, so the digest check above is all
        // we can do for encrypted blobs
        CryptMode::Encrypt => Ok(()),
        CryptMode::None => {
            // digest already verified above
            blob.decode(None, None)?;
            Ok(())
        },
        CryptMode::SignOnly => bail!("Invalid CryptMode for blob"),
    }
}

// We use a separate thread to load chunks, so that loading and verifying
// run in parallel and increase performance.
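// The bounded sync_channel below (capacity 3) provides backpressure, so the
// reader thread never runs more than a few chunks ahead of the verifier.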
fn chunk_reader_thread(
    datastore: Arc<DataStore>,
    index: Box<dyn IndexFile + Send>,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    errors: Arc<AtomicUsize>,
    worker: Arc<WorkerTask>,
) -> std::sync::mpsc::Receiver<(DataBlob, [u8;32], u64)> {

    let (sender, receiver) = std::sync::mpsc::sync_channel(3); // buffer up to 3 chunks

    std::thread::spawn(move || {
        for pos in 0..index.index_count() {
            let info = index.chunk_info(pos).unwrap();
            let size = info.range.end - info.range.start;

            if verified_chunks.lock().unwrap().contains(&info.digest) {
                continue; // already verified
            }

            if corrupt_chunks.lock().unwrap().contains(&info.digest) {
                let digest_str = proxmox::tools::digest_to_hex(&info.digest);
                worker.log(format!("chunk {} was marked as corrupt", digest_str));
                errors.fetch_add(1, Ordering::SeqCst);
                continue;
            }

            match datastore.load_chunk(&info.digest) {
                Err(err) => {
                    corrupt_chunks.lock().unwrap().insert(info.digest);
                    worker.log(format!("can't verify chunk, load failed - {}", err));
                    errors.fetch_add(1, Ordering::SeqCst);
                    continue;
                }
                Ok(chunk) => {
                    if sender.send((chunk, info.digest, size)).is_err() {
                        break; // receiver gone - simply stop
                    }
                }
            }
        }
    });

    receiver
}

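/// Verify all chunks referenced by an index file.
///
/// Chunks are loaded by a background reader thread while this function checks
/// the crypt mode and digest of each one. Verified and corrupt digests are
/// recorded in the shared sets, so later indexes can skip duplicate work.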
fn verify_index_chunks(
    datastore: Arc<DataStore>,
    index: Box<dyn IndexFile + Send>,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    crypt_mode: CryptMode,
    worker: Arc<WorkerTask>,
) -> Result<(), Error> {

    let errors = Arc::new(AtomicUsize::new(0));

    let start_time = Instant::now();

    let chunk_channel = chunk_reader_thread(
        datastore,
        index,
        verified_chunks.clone(),
        corrupt_chunks.clone(),
        errors.clone(),
        worker.clone(),
    );

    let mut read_bytes = 0;
    let mut decoded_bytes = 0;

    loop {

        worker.fail_on_abort()?;

        let (chunk, digest, size) = match chunk_channel.recv() {
            Ok(tuple) => tuple,
            Err(std::sync::mpsc::RecvError) => break,
        };

        read_bytes += chunk.raw_size();
        decoded_bytes += size;

        let chunk_crypt_mode = match chunk.crypt_mode() {
            Err(err) => {
                corrupt_chunks.lock().unwrap().insert(digest);
                worker.log(format!("can't verify chunk, unknown CryptMode - {}", err));
                errors.fetch_add(1, Ordering::SeqCst);
                continue;
            },
            Ok(mode) => mode,
        };

        if chunk_crypt_mode != crypt_mode {
            worker.log(format!(
                "chunk CryptMode {:?} does not match index CryptMode {:?}",
                chunk_crypt_mode,
                crypt_mode
            ));
            errors.fetch_add(1, Ordering::SeqCst);
        }

        if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
            corrupt_chunks.lock().unwrap().insert(digest);
            worker.log(format!("{}", err));
            errors.fetch_add(1, Ordering::SeqCst);
        } else {
            verified_chunks.lock().unwrap().insert(digest);
        }
    }

    let elapsed = start_time.elapsed().as_secs_f64();

    let read_bytes_mib = (read_bytes as f64)/(1024.0*1024.0);
    let decoded_bytes_mib = (decoded_bytes as f64)/(1024.0*1024.0);

    let read_speed = read_bytes_mib/elapsed;
    let decode_speed = decoded_bytes_mib/elapsed;

    let error_count = errors.load(Ordering::SeqCst);

    worker.log(format!(" verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
        read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));

    if error_count > 0 {
        bail!("chunks could not be verified");
    }

    Ok(())
}

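/// Verify a fixed index archive: check the index file's size and checksum,
/// then verify every chunk it references.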
fn verify_fixed_index(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<WorkerTask>,
) -> Result<(), Error> {

    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = datastore.open_fixed_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
}

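/// Verify a dynamic index archive: check the index file's size and checksum,
/// then verify every chunk it references.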
fn verify_dynamic_index(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    info: &FileInfo,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<WorkerTask>,
) -> Result<(), Error> {

    let mut path = backup_dir.relative_path();
    path.push(&info.filename);

    let index = datastore.open_dynamic_reader(&path)?;

    let (csum, size) = index.compute_csum();
    if size != info.size {
        bail!("wrong size ({} != {})", info.size, size);
    }

    if csum != info.csum {
        bail!("wrong index checksum");
    }

    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
}

/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
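///
/// # Examples
///
/// A hypothetical call site (sketch); `datastore`, `backup_dir` and `worker`
/// are assumed to be provided by the enclosing task:
///
/// ```ignore
/// let verified_chunks = Arc::new(Mutex::new(HashSet::new()));
/// let corrupt_chunks = Arc::new(Mutex::new(HashSet::new()));
/// if !verify_backup_dir(datastore, &backup_dir, verified_chunks, corrupt_chunks, worker)? {
///     // verification errors were already logged to the worker log
/// }
/// ```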
pub fn verify_backup_dir(
    datastore: Arc<DataStore>,
    backup_dir: &BackupDir,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<WorkerTask>,
) -> Result<bool, Error> {

    let mut manifest = match datastore.load_manifest(&backup_dir) {
        Ok((manifest, _)) => manifest,
        Err(err) => {
            worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
            return Ok(false);
        }
    };

    worker.log(format!("verify {}:{}", datastore.name(), backup_dir));

    let mut error_count = 0;

    let mut verify_result = "ok";
    for info in manifest.files() {
        let result = proxmox::try_block!({
            worker.log(format!(" check {}", info.filename));
            match archive_type(&info.filename)? {
                ArchiveType::FixedIndex =>
                    verify_fixed_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::DynamicIndex =>
                    verify_dynamic_index(
                        datastore.clone(),
                        &backup_dir,
                        info,
                        verified_chunks.clone(),
                        corrupt_chunks.clone(),
                        worker.clone(),
                    ),
                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
            }
        });

        worker.fail_on_abort()?;

        if let Err(err) = result {
            worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
            error_count += 1;
            verify_result = "failed";
        }
    }

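    // Record the result in the manifest's "unprotected" section, which can be
    // modified on the server without invalidating the client-side signature.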
    let verify_state = SnapshotVerifyState {
        state: verify_result.to_string(),
        upid: worker.upid().clone(),
    };
    manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
    datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
        .map_err(|err| format_err!("unable to store manifest blob - {}", err))?;

    Ok(error_count == 0)
}

/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
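///
/// # Examples
///
/// A hypothetical call site (sketch), reusing the shared digest sets from the
/// surrounding datastore verification:
///
/// ```ignore
/// let failed = verify_backup_group(datastore, &group, verified_chunks, corrupt_chunks, worker)?;
/// // `failed` lists the snapshots in this group that had verification errors
/// ```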
pub fn verify_backup_group(
    datastore: Arc<DataStore>,
    group: &BackupGroup,
    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    worker: Arc<WorkerTask>,
) -> Result<Vec<String>, Error> {

    let mut errors = Vec::new();
    let mut list = match group.list_backups(&datastore.base_path()) {
        Ok(list) => list,
        Err(err) => {
            worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
            return Ok(errors);
        }
    };

    worker.log(format!("verify group {}:{}", datastore.name(), group));

    BackupInfo::sort_list(&mut list, false); // newest first
    for info in list {
        if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())? {
            errors.push(info.backup_dir.to_string());
        }
    }

    Ok(errors)
}

/// Verify all backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
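///
/// # Examples
///
/// A hypothetical call site (sketch); `datastore` and `worker` are assumed to
/// come from the enclosing API handler:
///
/// ```ignore
/// let failed_dirs = verify_all_backups(datastore, worker.clone())?;
/// if !failed_dirs.is_empty() {
///     worker.log(format!("verification failed for: {:?}", failed_dirs));
/// }
/// ```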
pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {

    let mut errors = Vec::new();

    let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
        Ok(list) => list,
        Err(err) => {
            worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
            return Ok(errors);
        }
    };

    list.sort_unstable();

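    // The digest sets are shared across all groups, so each deduplicated
    // chunk is verified at most once per datastore run.
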
    // start with 16384 chunk digests (enough for ~64 GiB at 4 MiB per chunk)
    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));

    // start with 64 chunks since we assume there are few corrupt ones
    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));

    worker.log(format!("verify datastore {}", datastore.name()));

    for group in list {
        let mut group_errors = verify_backup_group(
            datastore.clone(),
            &group,
            verified_chunks.clone(),
            corrupt_chunks.clone(),
            worker.clone(),
        )?;
        errors.append(&mut group_errors);
    }

    Ok(errors)
}