use anyhow::{bail, format_err, Error};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use serde::Serialize;
use serde_json::{json, Value};

use proxmox::tools::digest_to_hex;
use proxmox::tools::fs::{replace_file, CreateOptions};
use proxmox::api::{RpcEnvironment, RpcEnvironmentType};

use crate::api2::types::{Userid, SnapshotVerifyState, VerifyState};
use crate::backup::*;
use crate::server::WorkerTask;
use crate::server::formatter::*;
use hyper::{Body, Response};

#[derive(Copy, Clone, Serialize)]
struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}

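// Summing two statistics field by field; the writer close methods below use
// this to fold per-writer upload statistics into the overall backup totals.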
impl std::ops::Add for UploadStatistic {
    type Output = Self;

    fn add(self, other: Self) -> Self {
        Self {
            count: self.count + other.count,
            size: self.size + other.size,
            compressed_size: self.compressed_size + other.compressed_size,
            duplicates: self.duplicates + other.duplicates,
        }
    }
}

struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,
    chunk_count: u64,
    upload_stat: UploadStatistic,
}

struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,
    chunk_size: u32,
    chunk_count: u64,
    small_chunk_count: usize, // allow 0..1 small chunks (the last chunk may be smaller)
    upload_stat: UploadStatistic,
    incremental: bool,
}

// key=digest, value=(length, existence checked)
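// The bool records whether the chunk is known to exist on disk: it is set for
// chunks uploaded in this session, while chunks merely registered from a
// previous backup stay unchecked until verify_chunk_existance() stats them.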
type KnownChunksMap = HashMap<[u8; 32], (u32, bool)>;

struct SharedBackupState {
    finished: bool,
    uid_counter: usize,
    file_counter: usize, // successfully uploaded files
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    known_chunks: KnownChunksMap,
    backup_size: u64, // sums up size of all files
    backup_stat: UploadStatistic,
}

impl SharedBackupState {

    // Raise error if finished flag is set
    fn ensure_unfinished(&self) -> Result<(), Error> {
        if self.finished {
            bail!("backup already marked as finished.");
        }
        Ok(())
    }

    // Get a unique integer ID
    pub fn next_uid(&mut self) -> usize {
        self.uid_counter += 1;
        self.uid_counter
    }
}

/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: Value,
    user: Userid,
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>,
}

impl BackupEnvironment {
    pub fn new(
        env_type: RpcEnvironmentType,
        user: Userid,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {

        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
            backup_size: 0,
            backup_stat: UploadStatistic::new(),
        };

        Self {
            result_attributes: json!({}),
            env_type,
            user,
            worker,
            datastore,
            debug: false,
            formatter: &JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Register a chunk with its associated length.
    ///
    /// We do not fully trust clients, so a client may only use registered
    /// chunks. Please use this method to register chunks from previous backups.
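    ///
    /// A minimal sketch of seeding chunks from the last snapshot. This is an
    /// illustration, not the exact client code path: `last_index` stands in
    /// for a previously loaded index, and `chunk_len(pos)` for whatever way
    /// the caller derives each chunk's length.
    ///
    /// ```ignore
    /// for pos in 0..last_index.index_count() {
    ///     let digest = last_index.index_digest(pos).unwrap();
    ///     env.register_chunk(*digest, chunk_len(pos))?;
    /// }
    /// ```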
    pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        state.known_chunks.insert(digest, (length, false));

        Ok(())
    }

    /// Register fixed length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the fixed index writer.
    pub fn register_fixed_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if size > data.chunk_size {
            bail!("fixed writer '{}' - got large chunk ({} > {})", data.name, size, data.chunk_size);
        } else if size < data.chunk_size {
            data.small_chunk_count += 1;
            if data.small_chunk_count > 1 {
                bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)", data.name);
            }
        }

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, (size, true));

        Ok(())
    }

    /// Register dynamic length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the dynamic index writer.
    pub fn register_dynamic_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, (size, true));

        Ok(())
    }

    pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
        let state = self.state.lock().unwrap();

        match state.known_chunks.get(digest) {
            Some((len, _)) => Some(*len),
            None => None,
        }
    }

    /// Store the writer with a unique ID
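    ///
    /// A rough sketch of the dynamic writer protocol, with hypothetical
    /// values for `index`, `size`, `digest` and `csum` (a real client
    /// appends many chunks before closing):
    ///
    /// ```ignore
    /// let wid = env.register_dynamic_writer(index, "archive.didx".to_string())?;
    /// env.dynamic_writer_append_chunk(wid, 0, size, &digest)?;
    /// env.dynamic_writer_close(wid, 1, size as u64, csum)?;
    /// ```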
    pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.dynamic_writers.insert(uid, DynamicWriterState {
            index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }

    /// Store the writer with a unique ID
    pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32, incremental: bool) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.fixed_writers.insert(uid, FixedWriterState {
            index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), incremental,
        });

        Ok(uid)
    }

    /// Append chunk to dynamic writer
    pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

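        // chunks must be appended in order - each one has to start exactly at
        // the current end of the archive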
        if data.offset != offset {
            bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
                  data.name, data.offset, offset);
        }

        data.offset += size as u64;
        data.chunk_count += 1;

        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }

    /// Append chunk to fixed writer
    pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

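        // validate the chunk's size and alignment against the fixed index
        // layout and compute the index slot for this chunk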
        let end = (offset as usize) + (size as usize);
        let idx = data.index.check_chunk_alignment(end, size as usize)?;

        data.chunk_count += 1;

        data.index.add_digest(idx, digest)?;

        Ok(())
    }

    fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", digest_to_hex(uuid)));
        self.log(format!("Checksum: {}", digest_to_hex(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size * 100) / size));

        // account for the zero chunk, which might be uploaded but never used
        let client_side_duplicates = if chunk_count < upload_stat.count {
            0
        } else {
            chunk_count - upload_stat.count
        };

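        // client-side duplicates were already known from a previous backup and
        // never uploaded; server-side duplicates were uploaded, but the chunk
        // store already contained them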
        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
            self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
        }

        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size * 100) / upload_stat.size));
        }
    }

    /// Close dynamic writer
    pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        if data.offset != size {
            bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
        }

        let uuid = data.index.uuid;

        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }

    /// Close fixed writer
    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("fixed writer '{}' close failed - received wrong number of chunks ({} != {})", data.name, data.chunk_count, chunk_count);
        }

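        // incremental fixed-index backups only upload changed chunks, so the
        // strict count/size checks below only apply to full uploads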
        if !data.incremental {
            let expected_count = data.index.index_length();

            if chunk_count != (expected_count as u64) {
                bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
            }

            if size != (data.size as u64) {
                bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
            }
        }

        let uuid = data.index.uuid;
        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }

    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {

        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme:

        // always verify blob/CRC at server side
        let blob = DataBlob::load_from_reader(&mut &data[..])?;

        let raw_data = blob.raw_data();
        replace_file(&path, raw_data, CreateOptions::new())?;

        self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));

        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;
        state.backup_size += orig_len as u64;
        state.backup_stat.size += blob_len as u64;

        Ok(())
    }

    /// Ensure all chunks referenced in this backup actually exist.
    /// Only call this *after* all writers have been closed, to avoid a race with the GC.
    /// In case of error, mark the previous backup as 'verify failed'.
    fn verify_chunk_existance(&self, known_chunks: &KnownChunksMap) -> Result<(), Error> {
        for (digest, (_, checked)) in known_chunks.iter() {
            if !checked && !self.datastore.chunk_path(digest).0.exists() {
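                // a chunk the client wanted to reuse is gone - flag the base
                // snapshot's manifest as failed verification so the damage is
                // visible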
                let mark_msg = if let Some(ref last_backup) = self.last_backup {
                    let last_dir = &last_backup.backup_dir;
                    let verify_state = SnapshotVerifyState {
                        state: VerifyState::Failed,
                        upid: self.worker.upid().clone(),
                    };

                    let res = proxmox::try_block!{
                        let (mut manifest, _) = self.datastore.load_manifest(last_dir)?;
                        manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
                        self.datastore.store_manifest(last_dir, serde_json::to_value(manifest)?)
                    };

                    if let Err(err) = res {
                        format!("tried marking previous snapshot as bad, \
                                 but got error accessing manifest: {}", err)
                    } else {
                        "marked previous snapshot as bad, please use \
                         'verify' for a detailed check".to_owned()
                    }
                } else {
                    "internal error: no base backup registered to mark invalid".to_owned()
                };

                bail!(
                    "chunk '{}' was attempted to be reused but doesn't exist - {}",
                    digest_to_hex(digest),
                    mark_msg
                );
            }
        }

        Ok(())
    }

    /// Mark backup as finished
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // check that all index writers are correctly closed
        if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        // update the manifest: record the chunk upload statistics in its unprotected section
        let mut manifest = self.datastore.load_manifest_json(&self.backup_dir)
            .map_err(|err| format_err!("unable to load manifest blob - {}", err))?;

        let stats = serde_json::to_value(state.backup_stat)?;

        manifest["unprotected"]["chunk_upload_stats"] = stats;

        self.datastore.store_manifest(&self.backup_dir, manifest)
            .map_err(|err| format_err!("unable to store manifest blob - {}", err))?;

        if let Some(base) = &self.last_backup {
            let path = self.datastore.snapshot_path(&base.backup_dir);
            if !path.exists() {
                bail!(
                    "base snapshot {} was removed during backup, cannot finish as chunks might be missing",
                    base.backup_dir
                );
            }
        }

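        // all index writers are closed at this point, so checking chunk
        // existence cannot race with a concurrent garbage collection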
        self.verify_chunk_existance(&state.known_chunks)?;

        // marks the backup as successful
        state.finished = true;

        Ok(())
    }

    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log(msg);
    }

    pub fn debug<S: AsRef<str>>(&self, msg: S) {
        if self.debug { self.worker.log(msg); }
    }

    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
        match result {
            Ok(data) => (self.formatter.format_data)(data, self),
            Err(err) => (self.formatter.format_error)(err),
        }
    }

    /// Raise error if finished flag is not set
    pub fn ensure_finished(&self) -> Result<(), Error> {
        let state = self.state.lock().unwrap();
        if !state.finished {
            bail!("backup ended but finished flag is not set.");
        }
        Ok(())
    }

    /// Remove complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
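        // set the finished flag first, so any still-running upload calls
        // fail their ensure_unfinished() check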
        state.finished = true;

        self.datastore.remove_backup_dir(&self.backup_dir, true)?;

        Ok(())
    }
}

impl RpcEnvironment for BackupEnvironment {

    fn result_attrib_mut(&mut self) -> &mut Value {
        &mut self.result_attributes
    }

    fn result_attrib(&self) -> &Value {
        &self.result_attributes
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    fn set_user(&mut self, _user: Option<String>) {
        panic!("unable to change user");
    }

    fn get_user(&self) -> Option<String> {
        Some(self.user.to_string())
    }
}

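// Allow API handlers to downcast the generic RpcEnvironment they receive back
// to the concrete BackupEnvironment (panics if the type does not match).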
impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}

impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}