]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/backup/environment.rs
src/api2/backup/environment.rs: remove debug code
[proxmox-backup.git] / src / api2 / backup / environment.rs
CommitLineData
9fa55e09 1use anyhow::{bail, format_err, Error};
f9578f3c 2use std::sync::{Arc, Mutex};
d53fbe24 3use std::collections::{HashMap, HashSet};
d95ced64 4
c8fff67d 5use ::serde::{Serialize};
e8d1da6a 6use serde_json::{json, Value};
d95ced64 7
feaa1ad3
WB
8use proxmox::tools::digest_to_hex;
9use proxmox::tools::fs::{replace_file, CreateOptions};
a2479cfa 10use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
00388226 11
d95ced64 12use crate::server::WorkerTask;
21ee7912 13use crate::backup::*;
b4b63e52
DM
14use crate::server::formatter::*;
15use hyper::{Body, Response};
d95ced64 16
c8fff67d 17#[derive(Copy, Clone, Serialize)]
96482891
DM
18struct UploadStatistic {
19 count: u64,
20 size: u64,
21 compressed_size: u64,
22 duplicates: u64,
23}
24
25impl UploadStatistic {
26 fn new() -> Self {
27 Self {
28 count: 0,
29 size: 0,
30 compressed_size: 0,
31 duplicates: 0,
32 }
33 }
34}
8bea85b4 35
c8fff67d
DM
36impl std::ops::Add for UploadStatistic {
37 type Output = Self;
38
39 fn add(self, other: Self) -> Self {
40 Self {
41 count: self.count + other.count,
42 size: self.size + other.size,
43 compressed_size: self.compressed_size + other.compressed_size,
44 duplicates: self.duplicates + other.duplicates,
45 }
46 }
47}
48
8bea85b4
DM
49struct DynamicWriterState {
50 name: String,
51 index: DynamicIndexWriter,
52 offset: u64,
53 chunk_count: u64,
96482891 54 upload_stat: UploadStatistic,
8bea85b4
DM
55}
56
a42fa400
DM
57struct FixedWriterState {
58 name: String,
59 index: FixedIndexWriter,
60 size: usize,
61 chunk_size: u32,
62 chunk_count: u64,
5e04ec70 63 small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
96482891 64 upload_stat: UploadStatistic,
facd9801 65 incremental: bool,
a42fa400
DM
66}
67
f9578f3c 68struct SharedBackupState {
372724af 69 finished: bool,
f9578f3c 70 uid_counter: usize,
add5861e 71 file_counter: usize, // successfully uploaded files
8bea85b4 72 dynamic_writers: HashMap<usize, DynamicWriterState>,
a42fa400 73 fixed_writers: HashMap<usize, FixedWriterState>,
a09c0e38 74 known_chunks: HashMap<[u8;32], u32>,
d53fbe24 75 base_snapshots: HashSet<BackupDir>,
c8fff67d
DM
76 backup_size: u64, // sums up size of all files
77 backup_stat: UploadStatistic,
f9578f3c
DM
78}
79
372724af
DM
80impl SharedBackupState {
81
82 // Raise error if finished flag is set
83 fn ensure_unfinished(&self) -> Result<(), Error> {
84 if self.finished {
85 bail!("backup already marked as finished.");
86 }
87 Ok(())
88 }
89
90 // Get an unique integer ID
91 pub fn next_uid(&mut self) -> usize {
92 self.uid_counter += 1;
93 self.uid_counter
94 }
95}
96
97
d95ced64
DM
98/// `RpcEnvironmet` implementation for backup service
99#[derive(Clone)]
100pub struct BackupEnvironment {
101 env_type: RpcEnvironmentType,
e8d1da6a 102 result_attributes: Value,
d95ced64 103 user: String,
a42d1f55 104 pub debug: bool,
b4b63e52 105 pub formatter: &'static OutputFormatter,
21ee7912
DM
106 pub worker: Arc<WorkerTask>,
107 pub datastore: Arc<DataStore>,
f9578f3c 108 pub backup_dir: BackupDir,
b02a52e3 109 pub last_backup: Option<BackupInfo>,
f9578f3c 110 state: Arc<Mutex<SharedBackupState>>
d95ced64
DM
111}
112
113impl BackupEnvironment {
f9578f3c
DM
114 pub fn new(
115 env_type: RpcEnvironmentType,
116 user: String,
117 worker: Arc<WorkerTask>,
118 datastore: Arc<DataStore>,
119 backup_dir: BackupDir,
f9578f3c
DM
120 ) -> Self {
121
122 let state = SharedBackupState {
372724af 123 finished: false,
f9578f3c 124 uid_counter: 0,
e6389f4e 125 file_counter: 0,
f9578f3c 126 dynamic_writers: HashMap::new(),
a42fa400 127 fixed_writers: HashMap::new(),
a09c0e38 128 known_chunks: HashMap::new(),
d53fbe24 129 base_snapshots: HashSet::new(),
c8fff67d
DM
130 backup_size: 0,
131 backup_stat: UploadStatistic::new(),
f9578f3c
DM
132 };
133
d95ced64 134 Self {
e8d1da6a 135 result_attributes: json!({}),
d95ced64
DM
136 env_type,
137 user,
138 worker,
21ee7912 139 datastore,
a42d1f55 140 debug: false,
b4b63e52 141 formatter: &JSON_FORMATTER,
f9578f3c 142 backup_dir,
b02a52e3 143 last_backup: None,
f9578f3c 144 state: Arc::new(Mutex::new(state)),
d95ced64
DM
145 }
146 }
147
d53fbe24
SR
148 /// Register a snapshot as a predecessor of the current backup.
149 /// It's existance will be ensured on finishing.
150 pub fn register_base_snapshot(&self, snap: BackupDir) {
151 let mut state = self.state.lock().unwrap();
152 state.base_snapshots.insert(snap);
153 }
154
96482891
DM
155 /// Register a Chunk with associated length.
156 ///
157 /// We do not fully trust clients, so a client may only use registered
158 /// chunks. Please use this method to register chunks from previous backups.
a09c0e38
DM
159 pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
160 let mut state = self.state.lock().unwrap();
161
162 state.ensure_unfinished()?;
163
164 state.known_chunks.insert(digest, length);
165
166 Ok(())
167 }
168
96482891
DM
169 /// Register fixed length chunks after upload.
170 ///
171 /// Like `register_chunk()`, but additionally record statistics for
172 /// the fixed index writer.
642322b4
DM
173 pub fn register_fixed_chunk(
174 &self,
175 wid: usize,
176 digest: [u8; 32],
177 size: u32,
178 compressed_size: u32,
179 is_duplicate: bool,
180 ) -> Result<(), Error> {
181 let mut state = self.state.lock().unwrap();
182
183 state.ensure_unfinished()?;
184
185 let mut data = match state.fixed_writers.get_mut(&wid) {
186 Some(data) => data,
187 None => bail!("fixed writer '{}' not registered", wid),
188 };
189
5e04ec70
DM
190 if size > data.chunk_size {
191 bail!("fixed writer '{}' - got large chunk ({} > {}", data.name, size, data.chunk_size);
192 } else if size < data.chunk_size {
193 data.small_chunk_count += 1;
194 if data.small_chunk_count > 1 {
195 bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)");
196 }
642322b4
DM
197 }
198
96482891
DM
199 // record statistics
200 data.upload_stat.count += 1;
201 data.upload_stat.size += size as u64;
202 data.upload_stat.compressed_size += compressed_size as u64;
203 if is_duplicate { data.upload_stat.duplicates += 1; }
204
205 // register chunk
642322b4
DM
206 state.known_chunks.insert(digest, size);
207
208 Ok(())
209 }
210
96482891
DM
211 /// Register dynamic length chunks after upload.
212 ///
213 /// Like `register_chunk()`, but additionally record statistics for
214 /// the dynamic index writer.
642322b4
DM
215 pub fn register_dynamic_chunk(
216 &self,
217 wid: usize,
218 digest: [u8; 32],
219 size: u32,
220 compressed_size: u32,
221 is_duplicate: bool,
222 ) -> Result<(), Error> {
223 let mut state = self.state.lock().unwrap();
224
225 state.ensure_unfinished()?;
226
227 let mut data = match state.dynamic_writers.get_mut(&wid) {
228 Some(data) => data,
229 None => bail!("dynamic writer '{}' not registered", wid),
230 };
231
96482891
DM
232 // record statistics
233 data.upload_stat.count += 1;
234 data.upload_stat.size += size as u64;
235 data.upload_stat.compressed_size += compressed_size as u64;
236 if is_duplicate { data.upload_stat.duplicates += 1; }
237
238 // register chunk
642322b4
DM
239 state.known_chunks.insert(digest, size);
240
241 Ok(())
242 }
243
a09c0e38
DM
244 pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
245 let state = self.state.lock().unwrap();
246
247 match state.known_chunks.get(digest) {
248 Some(len) => Some(*len),
249 None => None,
250 }
251 }
252
372724af 253 /// Store the writer with an unique ID
8bea85b4 254 pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
f9578f3c 255 let mut state = self.state.lock().unwrap();
f9578f3c 256
372724af
DM
257 state.ensure_unfinished()?;
258
259 let uid = state.next_uid();
f9578f3c 260
8bea85b4 261 state.dynamic_writers.insert(uid, DynamicWriterState {
96482891 262 index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
8bea85b4 263 });
372724af
DM
264
265 Ok(uid)
f9578f3c
DM
266 }
267
a42fa400 268 /// Store the writer with an unique ID
facd9801 269 pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32, incremental: bool) -> Result<usize, Error> {
a42fa400
DM
270 let mut state = self.state.lock().unwrap();
271
272 state.ensure_unfinished()?;
273
274 let uid = state.next_uid();
275
276 state.fixed_writers.insert(uid, FixedWriterState {
facd9801 277 index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), incremental,
a42fa400
DM
278 });
279
280 Ok(uid)
281 }
282
f9578f3c 283 /// Append chunk to dynamic writer
417cb073 284 pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
f9578f3c
DM
285 let mut state = self.state.lock().unwrap();
286
372724af
DM
287 state.ensure_unfinished()?;
288
f9578f3c
DM
289 let mut data = match state.dynamic_writers.get_mut(&wid) {
290 Some(data) => data,
291 None => bail!("dynamic writer '{}' not registered", wid),
292 };
293
f9578f3c 294
417cb073
DM
295 if data.offset != offset {
296 bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
297 data.name, data.offset, offset);
298 }
299
3dc5b2a2
DM
300 data.offset += size as u64;
301 data.chunk_count += 1;
302
8bea85b4 303 data.index.add_chunk(data.offset, digest)?;
f9578f3c
DM
304
305 Ok(())
306 }
307
a42fa400
DM
308 /// Append chunk to fixed writer
309 pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
310 let mut state = self.state.lock().unwrap();
311
312 state.ensure_unfinished()?;
313
314 let mut data = match state.fixed_writers.get_mut(&wid) {
315 Some(data) => data,
316 None => bail!("fixed writer '{}' not registered", wid),
317 };
318
5e04ec70
DM
319 let end = (offset as usize) + (size as usize);
320 let idx = data.index.check_chunk_alignment(end, size as usize)?;
a42fa400 321
5e04ec70 322 data.chunk_count += 1;
a42fa400 323
5e04ec70 324 data.index.add_digest(idx, digest)?;
a42fa400
DM
325
326 Ok(())
327 }
328
00388226 329 fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
96482891 330 self.log(format!("Upload statistics for '{}'", archive_name));
e18a6c9e
DM
331 self.log(format!("UUID: {}", digest_to_hex(uuid)));
332 self.log(format!("Checksum: {}", digest_to_hex(csum)));
96482891
DM
333 self.log(format!("Size: {}", size));
334 self.log(format!("Chunk count: {}", chunk_count));
36075475
DM
335
336 if size == 0 || chunk_count == 0 {
337 return;
338 }
339
96482891 340 self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));
36075475 341
8268c9d1
SR
342 // account for zero chunk, which might be uploaded but never used
343 let client_side_duplicates = if chunk_count < upload_stat.count {
344 0
345 } else {
346 chunk_count - upload_stat.count
347 };
348
36075475
DM
349 let server_side_duplicates = upload_stat.duplicates;
350
351 if (client_side_duplicates + server_side_duplicates) > 0 {
352 let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
353 self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
354 }
355
96482891 356 if upload_stat.size > 0 {
36075475 357 self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
96482891
DM
358 }
359 }
360
a2077252 361 /// Close dynamic writer
fb6026b6 362 pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
a2077252
DM
363 let mut state = self.state.lock().unwrap();
364
372724af
DM
365 state.ensure_unfinished()?;
366
a2077252
DM
367 let mut data = match state.dynamic_writers.remove(&wid) {
368 Some(data) => data,
369 None => bail!("dynamic writer '{}' not registered", wid),
370 };
371
8bea85b4
DM
372 if data.chunk_count != chunk_count {
373 bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
374 }
375
376 if data.offset != size {
377 bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
378 }
379
00388226
DM
380 let uuid = data.index.uuid;
381
fb6026b6
DM
382 let expected_csum = data.index.close()?;
383
fb6026b6
DM
384 if csum != expected_csum {
385 bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name);
386 }
a2077252 387
00388226 388 self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 389
e6389f4e 390 state.file_counter += 1;
c8fff67d
DM
391 state.backup_size += size;
392 state.backup_stat = state.backup_stat + data.upload_stat;
e6389f4e 393
a2077252
DM
394 Ok(())
395 }
396
a42fa400 397 /// Close fixed writer
fb6026b6 398 pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
a42fa400
DM
399 let mut state = self.state.lock().unwrap();
400
401 state.ensure_unfinished()?;
402
403 let mut data = match state.fixed_writers.remove(&wid) {
404 Some(data) => data,
405 None => bail!("fixed writer '{}' not registered", wid),
406 };
407
408 if data.chunk_count != chunk_count {
006f3ff4 409 bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
a42fa400
DM
410 }
411
facd9801
SR
412 if !data.incremental {
413 let expected_count = data.index.index_length();
006f3ff4 414
facd9801
SR
415 if chunk_count != (expected_count as u64) {
416 bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
417 }
006f3ff4 418
facd9801
SR
419 if size != (data.size as u64) {
420 bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
421 }
006f3ff4 422 }
a42fa400 423
00388226 424 let uuid = data.index.uuid;
fb6026b6 425 let expected_csum = data.index.close()?;
a42fa400 426
facd9801 427 println!("server checksum: {:?} client: {:?} (incremental: {})", expected_csum, csum, data.incremental);
fb6026b6
DM
428 if csum != expected_csum {
429 bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
430 }
431
432 self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 433
e6389f4e 434 state.file_counter += 1;
c8fff67d
DM
435 state.backup_size += size;
436 state.backup_stat = state.backup_stat + data.upload_stat;
e6389f4e 437
a42fa400
DM
438 Ok(())
439 }
440
46bd8800
DM
441 pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {
442
443 let mut path = self.datastore.base_path();
444 path.push(self.backup_dir.relative_path());
445 path.push(file_name);
446
447 let blob_len = data.len();
448 let orig_len = data.len(); // fixme:
449
39f18b30
DM
450 // always verify blob/CRC at server side
451 let blob = DataBlob::load_from_reader(&mut &data[..])?;
46bd8800
DM
452
453 let raw_data = blob.raw_data();
feaa1ad3 454 replace_file(&path, raw_data, CreateOptions::new())?;
46bd8800
DM
455
456 self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));
457
458 let mut state = self.state.lock().unwrap();
459 state.file_counter += 1;
c8fff67d
DM
460 state.backup_size += orig_len as u64;
461 state.backup_stat.size += blob_len as u64;
46bd8800
DM
462
463 Ok(())
464 }
465
372724af
DM
466 /// Mark backup as finished
467 pub fn finish_backup(&self) -> Result<(), Error> {
468 let mut state = self.state.lock().unwrap();
469 // test if all writer are correctly closed
470
471 state.ensure_unfinished()?;
472
372724af
DM
473 if state.dynamic_writers.len() != 0 {
474 bail!("found open index writer - unable to finish backup");
475 }
476
e6389f4e
DM
477 if state.file_counter == 0 {
478 bail!("backup does not contain valid files (file count == 0)");
479 }
480
9fa55e09
DM
481 state.finished = true;
482
483 // check manifest
c8fff67d 484 let mut manifest = self.datastore.load_manifest_json(&self.backup_dir)
9fa55e09
DM
485 .map_err(|err| format_err!("unable to load manifest blob - {}", err))?;
486
c8fff67d
DM
487 let stats = serde_json::to_value(state.backup_stat)?;
488
489 manifest["unprotected"]["chunk_upload_stats"] = stats;
490
491 self.datastore.store_manifest(&self.backup_dir, manifest)
492 .map_err(|err| format_err!("unable to store manifest blob - {}", err))?;
9fa55e09 493
d53fbe24
SR
494 for snap in &state.base_snapshots {
495 let path = self.datastore.snapshot_path(snap);
496 if !path.exists() {
497 bail!(
498 "base snapshot {} was removed during backup, cannot finish as chunks might be missing",
499 snap
500 );
501 }
502 }
503
476b4aca 504
372724af
DM
505 Ok(())
506 }
507
d95ced64
DM
508 pub fn log<S: AsRef<str>>(&self, msg: S) {
509 self.worker.log(msg);
510 }
b4b63e52 511
a42d1f55
DM
512 pub fn debug<S: AsRef<str>>(&self, msg: S) {
513 if self.debug { self.worker.log(msg); }
514 }
515
b4b63e52
DM
516 pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
517 match result {
518 Ok(data) => (self.formatter.format_data)(data, self),
519 Err(err) => (self.formatter.format_error)(err),
520 }
521 }
372724af
DM
522
523 /// Raise error if finished flag is not set
524 pub fn ensure_finished(&self) -> Result<(), Error> {
525 let state = self.state.lock().unwrap();
526 if !state.finished {
527 bail!("backup ended but finished flag is not set.");
528 }
529 Ok(())
530 }
531
532 /// Remove complete backup
533 pub fn remove_backup(&self) -> Result<(), Error> {
534 let mut state = self.state.lock().unwrap();
535 state.finished = true;
536
c9756b40 537 self.datastore.remove_backup_dir(&self.backup_dir, true)?;
372724af
DM
538
539 Ok(())
540 }
d95ced64
DM
541}
542
543impl RpcEnvironment for BackupEnvironment {
544
e8d1da6a
DM
545 fn result_attrib_mut(&mut self) -> &mut Value {
546 &mut self.result_attributes
d95ced64
DM
547 }
548
e8d1da6a
DM
549 fn result_attrib(&self) -> &Value {
550 &self.result_attributes
d95ced64
DM
551 }
552
553 fn env_type(&self) -> RpcEnvironmentType {
554 self.env_type
555 }
556
557 fn set_user(&mut self, _user: Option<String>) {
558 panic!("unable to change user");
559 }
560
561 fn get_user(&self) -> Option<String> {
562 Some(self.user.clone())
563 }
564}
21ee7912 565
dd5495d6 566impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
21ee7912
DM
567 fn as_ref(&self) -> &BackupEnvironment {
568 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
569 }
570}
dd5495d6
WB
571
572impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
b4b63e52
DM
573 fn as_ref(&self) -> &BackupEnvironment {
574 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
575 }
576}