]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/backup/environment.rs
fix #2909: handle missing chunks gracefully in garbage collection
[proxmox-backup.git] / src / api2 / backup / environment.rs
CommitLineData
9fa55e09 1use anyhow::{bail, format_err, Error};
f9578f3c 2use std::sync::{Arc, Mutex};
d53fbe24 3use std::collections::{HashMap, HashSet};
d95ced64 4
c8fff67d 5use ::serde::{Serialize};
e8d1da6a 6use serde_json::{json, Value};
d95ced64 7
feaa1ad3
WB
8use proxmox::tools::digest_to_hex;
9use proxmox::tools::fs::{replace_file, CreateOptions};
a2479cfa 10use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
00388226 11
d95ced64 12use crate::server::WorkerTask;
21ee7912 13use crate::backup::*;
b4b63e52
DM
14use crate::server::formatter::*;
15use hyper::{Body, Response};
d95ced64 16
c8fff67d 17#[derive(Copy, Clone, Serialize)]
96482891
DM
18struct UploadStatistic {
19 count: u64,
20 size: u64,
21 compressed_size: u64,
22 duplicates: u64,
23}
24
25impl UploadStatistic {
26 fn new() -> Self {
27 Self {
28 count: 0,
29 size: 0,
30 compressed_size: 0,
31 duplicates: 0,
32 }
33 }
34}
8bea85b4 35
c8fff67d
DM
36impl std::ops::Add for UploadStatistic {
37 type Output = Self;
38
39 fn add(self, other: Self) -> Self {
40 Self {
41 count: self.count + other.count,
42 size: self.size + other.size,
43 compressed_size: self.compressed_size + other.compressed_size,
44 duplicates: self.duplicates + other.duplicates,
45 }
46 }
47}
48
/// State kept for one open dynamic (variable chunk size) index writer.
struct DynamicWriterState {
    name: String,                 // archive file name, used in error/log messages
    index: DynamicIndexWriter,
    offset: u64,                  // bytes appended so far (end offset of the last chunk)
    chunk_count: u64,             // number of chunks appended so far
    upload_stat: UploadStatistic, // statistics collected while chunks are uploaded
}
56
/// State kept for one open fixed (constant chunk size) index writer.
struct FixedWriterState {
    name: String,                 // archive file name, used in error/log messages
    index: FixedIndexWriter,
    size: usize,                  // expected total file size
    chunk_size: u32,              // size every non-final chunk must have
    chunk_count: u64,             // number of chunks appended so far
    small_chunk_count: usize,     // allow 0..1 small chunks (last chunk may be smaller)
    upload_stat: UploadStatistic, // statistics collected while chunks are uploaded
    incremental: bool,            // writer continues a previous backup (only changed chunks uploaded)
}
67
/// Mutable session state shared (behind a mutex) by all handlers of one backup.
struct SharedBackupState {
    finished: bool,               // set once the backup is closed; blocks further modification
    uid_counter: usize,           // source of unique writer IDs, see next_uid()
    file_counter: usize, // successfully uploaded files
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    known_chunks: HashMap<[u8;32], u32>, // digest -> length; clients may only reference these
    base_snapshots: HashSet<BackupDir>,  // predecessors whose existence is verified on finish
    backup_size: u64, // sums up size of all files
    backup_stat: UploadStatistic, // aggregated statistics over all writers/blobs
}
79
372724af
DM
80impl SharedBackupState {
81
82 // Raise error if finished flag is set
83 fn ensure_unfinished(&self) -> Result<(), Error> {
84 if self.finished {
85 bail!("backup already marked as finished.");
86 }
87 Ok(())
88 }
89
90 // Get an unique integer ID
91 pub fn next_uid(&mut self) -> usize {
92 self.uid_counter += 1;
93 self.uid_counter
94 }
95}
96
97
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: Value,
    user: String,
    // enables verbose logging via debug()
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    // previous backup of this group, if any (set by the caller after new())
    pub last_backup: Option<BackupInfo>,
    // Arc so that Clone keeps sharing one session state
    state: Arc<Mutex<SharedBackupState>>
}
112
113impl BackupEnvironment {
    /// Create a new backup environment with empty initial session state.
    ///
    /// `debug` starts disabled and `last_backup` unset; callers adjust
    /// those public fields afterwards if needed.
    pub fn new(
        env_type: RpcEnvironmentType,
        user: String,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {

        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
            base_snapshots: HashSet::new(),
            backup_size: 0,
            backup_stat: UploadStatistic::new(),
        };

        Self {
            result_attributes: json!({}),
            env_type,
            user,
            worker,
            datastore,
            debug: false,
            formatter: &JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }
147
    /// Register a snapshot as a predecessor of the current backup.
    /// Its existence will be ensured on finishing.
    pub fn register_base_snapshot(&self, snap: BackupDir) {
        let mut state = self.state.lock().unwrap();
        state.base_snapshots.insert(snap);
    }
154
96482891
DM
155 /// Register a Chunk with associated length.
156 ///
157 /// We do not fully trust clients, so a client may only use registered
158 /// chunks. Please use this method to register chunks from previous backups.
a09c0e38
DM
159 pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
160 let mut state = self.state.lock().unwrap();
161
162 state.ensure_unfinished()?;
163
164 state.known_chunks.insert(digest, length);
165
166 Ok(())
167 }
168
96482891
DM
169 /// Register fixed length chunks after upload.
170 ///
171 /// Like `register_chunk()`, but additionally record statistics for
172 /// the fixed index writer.
642322b4
DM
173 pub fn register_fixed_chunk(
174 &self,
175 wid: usize,
176 digest: [u8; 32],
177 size: u32,
178 compressed_size: u32,
179 is_duplicate: bool,
180 ) -> Result<(), Error> {
181 let mut state = self.state.lock().unwrap();
182
183 state.ensure_unfinished()?;
184
185 let mut data = match state.fixed_writers.get_mut(&wid) {
186 Some(data) => data,
187 None => bail!("fixed writer '{}' not registered", wid),
188 };
189
5e04ec70
DM
190 if size > data.chunk_size {
191 bail!("fixed writer '{}' - got large chunk ({} > {}", data.name, size, data.chunk_size);
192 } else if size < data.chunk_size {
193 data.small_chunk_count += 1;
194 if data.small_chunk_count > 1 {
195 bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)");
196 }
642322b4
DM
197 }
198
96482891
DM
199 // record statistics
200 data.upload_stat.count += 1;
201 data.upload_stat.size += size as u64;
202 data.upload_stat.compressed_size += compressed_size as u64;
203 if is_duplicate { data.upload_stat.duplicates += 1; }
204
205 // register chunk
642322b4
DM
206 state.known_chunks.insert(digest, size);
207
208 Ok(())
209 }
210
96482891
DM
211 /// Register dynamic length chunks after upload.
212 ///
213 /// Like `register_chunk()`, but additionally record statistics for
214 /// the dynamic index writer.
642322b4
DM
215 pub fn register_dynamic_chunk(
216 &self,
217 wid: usize,
218 digest: [u8; 32],
219 size: u32,
220 compressed_size: u32,
221 is_duplicate: bool,
222 ) -> Result<(), Error> {
223 let mut state = self.state.lock().unwrap();
224
225 state.ensure_unfinished()?;
226
227 let mut data = match state.dynamic_writers.get_mut(&wid) {
228 Some(data) => data,
229 None => bail!("dynamic writer '{}' not registered", wid),
230 };
231
96482891
DM
232 // record statistics
233 data.upload_stat.count += 1;
234 data.upload_stat.size += size as u64;
235 data.upload_stat.compressed_size += compressed_size as u64;
236 if is_duplicate { data.upload_stat.duplicates += 1; }
237
238 // register chunk
642322b4
DM
239 state.known_chunks.insert(digest, size);
240
241 Ok(())
242 }
243
a09c0e38
DM
244 pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
245 let state = self.state.lock().unwrap();
246
247 match state.known_chunks.get(digest) {
248 Some(len) => Some(*len),
249 None => None,
250 }
251 }
252
    /// Store the writer with an unique ID
    ///
    /// Returns the ID the client must use to address this writer in
    /// subsequent append/close calls.
    pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        // no new writers once the backup is finished
        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.dynamic_writers.insert(uid, DynamicWriterState {
            index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }
267
    /// Store the writer with an unique ID
    ///
    /// Returns the ID the client must use to address this writer in
    /// subsequent append/close calls.
    pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32, incremental: bool) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        // no new writers once the backup is finished
        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.fixed_writers.insert(uid, FixedWriterState {
            index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), incremental,
        });

        Ok(uid)
    }
282
    /// Append chunk to dynamic writer
    ///
    /// The client supplies the expected start `offset`; chunks must arrive
    /// strictly in order, so it has to match our own bookkeeping.
    pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };


        // reject out-of-order or overlapping appends
        if data.offset != offset {
            bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
                  data.name, data.offset, offset);
        }

        data.offset += size as u64;
        data.chunk_count += 1;

        // note: the index records the *end* offset of the chunk
        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }
307
    /// Append chunk to fixed writer
    ///
    /// Chunk placement is derived from `offset`/`size`; alignment to the
    /// writer's chunk grid is validated by the index itself.
    pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        // verify the chunk ends on a proper boundary and get its slot index
        let end = (offset as usize) + (size as usize);
        let idx = data.index.check_chunk_alignment(end, size as usize)?;

        data.chunk_count += 1;

        data.index.add_digest(idx, digest)?;

        Ok(())
    }
328
    /// Log a human-readable summary of the upload statistics for one archive.
    fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", digest_to_hex(uuid)));
        self.log(format!("Checksum: {}", digest_to_hex(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        // guard the percentage computations below against division by zero
        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));

        // account for zero chunk, which might be uploaded but never used
        // (saturate at 0 instead of underflowing the unsigned subtraction)
        let client_side_duplicates = if chunk_count < upload_stat.count {
            0
        } else {
            chunk_count - upload_stat.count
        };

        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
            self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
        }

        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
        }
    }
360
    /// Close dynamic writer
    ///
    /// Cross-checks the client-supplied totals (`chunk_count`, `size`,
    /// `csum`) against our own bookkeeping before accepting the archive.
    pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // remove the writer - it may not be used again after close
        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        if data.offset != size {
            bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
        }

        // read uuid before close() consumes the index
        let uuid = data.index.uuid;

        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        // account the finished file in the overall backup statistics
        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }
396
    /// Close fixed writer
    ///
    /// Cross-checks the client-supplied totals (`chunk_count`, `size`,
    /// `csum`) against our own bookkeeping before accepting the archive.
    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // remove the writer - it may not be used again after close
        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        // incremental uploads only transfer changed chunks, so the
        // full-size consistency checks only apply to complete uploads
        if !data.incremental {
            let expected_count = data.index.index_length();

            if chunk_count != (expected_count as u64) {
                bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
            }

            if size != (data.size as u64) {
                bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
            }
        }

        // read uuid before close() consumes the index
        let uuid = data.index.uuid;
        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat);

        // account the finished file in the overall backup statistics
        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }
439
    /// Store an uploaded blob file inside the backup directory.
    ///
    /// The blob is verified (format/CRC) server-side before it is written.
    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {

        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme: this is the encoded length, not the original data length

        // always verify blob/CRC at server side
        let blob = DataBlob::load_from_reader(&mut &data[..])?;

        let raw_data = blob.raw_data();
        replace_file(&path, raw_data, CreateOptions::new())?;

        self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));

        // account the blob in the session statistics
        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;
        state.backup_size += orig_len as u64;
        state.backup_stat.size += blob_len as u64;

        Ok(())
    }
464
    /// Mark backup as finished
    ///
    /// Verifies the session is complete (no open writers, at least one
    /// file), stores upload statistics into the manifest, and re-checks
    /// that all registered base snapshots still exist.
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        // test if all writer are correctly closed

        state.ensure_unfinished()?;

        // NOTE(review): only dynamic writers are checked here; an open
        // fixed writer would not block finishing - confirm this is intended
        if state.dynamic_writers.len() != 0 {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        state.finished = true;

        // check manifest
        let mut manifest = self.datastore.load_manifest_json(&self.backup_dir)
            .map_err(|err| format_err!("unable to load manifest blob - {}", err))?;

        // record the aggregated upload statistics in the unprotected manifest part
        let stats = serde_json::to_value(state.backup_stat)?;

        manifest["unprotected"]["chunk_upload_stats"] = stats;

        self.datastore.store_manifest(&self.backup_dir, manifest)
            .map_err(|err| format_err!("unable to store manifest blob - {}", err))?;

        // base snapshots must still exist - otherwise chunks this backup
        // references from them may already be gone
        for snap in &state.base_snapshots {
            let path = self.datastore.snapshot_path(snap);
            if !path.exists() {
                bail!(
                    "base snapshot {} was removed during backup, cannot finish as chunks might be missing",
                    snap
                );
            }
        }


        Ok(())
    }
506
d95ced64
DM
507 pub fn log<S: AsRef<str>>(&self, msg: S) {
508 self.worker.log(msg);
509 }
b4b63e52 510
a42d1f55
DM
511 pub fn debug<S: AsRef<str>>(&self, msg: S) {
512 if self.debug { self.worker.log(msg); }
513 }
514
b4b63e52
DM
515 pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
516 match result {
517 Ok(data) => (self.formatter.format_data)(data, self),
518 Err(err) => (self.formatter.format_error)(err),
519 }
520 }
372724af
DM
521
522 /// Raise error if finished flag is not set
523 pub fn ensure_finished(&self) -> Result<(), Error> {
524 let state = self.state.lock().unwrap();
525 if !state.finished {
526 bail!("backup ended but finished flag is not set.");
527 }
528 Ok(())
529 }
530
    /// Remove complete backup
    ///
    /// Marks the session finished (so no further uploads are accepted)
    /// and deletes the backup directory from the datastore. The state
    /// lock is deliberately held across the removal.
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        state.finished = true;

        self.datastore.remove_backup_dir(&self.backup_dir, true)?;

        Ok(())
    }
d95ced64
DM
540}
541
impl RpcEnvironment for BackupEnvironment {

    fn result_attrib_mut(&mut self) -> &mut Value {
        &mut self.result_attributes
    }

    fn result_attrib(&self) -> &Value {
        &self.result_attributes
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    // the user is fixed at session creation time and must never change
    fn set_user(&mut self, _user: Option<String>) {
        panic!("unable to change user");
    }

    fn get_user(&self) -> Option<String> {
        Some(self.user.clone())
    }
}
21ee7912 564
impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    // Downcast a generic RPC environment to a BackupEnvironment.
    // Panics if the environment is of a different concrete type.
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}
dd5495d6
WB
570
impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    // Same downcast as for `dyn RpcEnvironment`, but through a Box.
    // Panics if the environment is of a different concrete type.
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}