proxmox-backup.git: src/api2/backup/environment.rs (at commit "make datastore BackupGroup/Dir ctors private")
1 use anyhow::{bail, format_err, Error};
2 use nix::dir::Dir;
3 use std::collections::HashMap;
4 use std::sync::{Arc, Mutex};
5
6 use ::serde::Serialize;
7 use serde_json::{json, Value};
8
9 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
10 use proxmox_sys::fs::{replace_file, CreateOptions};
11
12 use pbs_api_types::Authid;
13 use pbs_datastore::backup_info::{BackupDir, BackupInfo};
14 use pbs_datastore::dynamic_index::DynamicIndexWriter;
15 use pbs_datastore::fixed_index::FixedIndexWriter;
16 use pbs_datastore::{DataBlob, DataStore};
17 use proxmox_rest_server::{formatter::*, WorkerTask};
18
19 use crate::backup::verify_backup_dir_with_lock;
20
21 use hyper::{Body, Response};
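// Rough lifecycle of a backup session as handled by this environment:
//  1. index writers are registered (register_dynamic_writer / register_fixed_writer)
//  2. chunks are uploaded and registered (register_chunk, register_fixed_chunk, register_dynamic_chunk)
//  3. chunk digests are appended to the writers (dynamic_writer_append_chunk / fixed_writer_append_chunk)
//  4. writers are closed and checked against the client-supplied counts and checksums (dynamic_writer_close / fixed_writer_close)
//  5. finish_backup() marks the whole backup as successful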
22
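/// Upload counters (chunk count, raw size, compressed size, duplicates),
/// kept per index writer and summed into the per-backup statistics.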
23 #[derive(Copy, Clone, Serialize)]
24 struct UploadStatistic {
25 count: u64,
26 size: u64,
27 compressed_size: u64,
28 duplicates: u64,
29 }
30
31 impl UploadStatistic {
32 fn new() -> Self {
33 Self {
34 count: 0,
35 size: 0,
36 compressed_size: 0,
37 duplicates: 0,
38 }
39 }
40 }
41
42 impl std::ops::Add for UploadStatistic {
43 type Output = Self;
44
45 fn add(self, other: Self) -> Self {
46 Self {
47 count: self.count + other.count,
48 size: self.size + other.size,
49 compressed_size: self.compressed_size + other.compressed_size,
50 duplicates: self.duplicates + other.duplicates,
51 }
52 }
53 }
54
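/// In-progress state of a dynamic (variable chunk size) index writer.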
55 struct DynamicWriterState {
56 name: String,
57 index: DynamicIndexWriter,
58 offset: u64,
59 chunk_count: u64,
60 upload_stat: UploadStatistic,
61 }
62
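/// In-progress state of a fixed chunk size index writer.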
63 struct FixedWriterState {
64 name: String,
65 index: FixedIndexWriter,
66 size: usize,
67 chunk_size: u32,
68 chunk_count: u64,
69 small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
70 upload_stat: UploadStatistic,
71 incremental: bool,
72 }
73
74 // key=digest, value=length
75 type KnownChunksMap = HashMap<[u8; 32], u32>;
76
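/// Mutable per-session state, shared between clones of the `BackupEnvironment`
/// and protected by a mutex.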
77 struct SharedBackupState {
78 finished: bool,
79 uid_counter: usize,
80 file_counter: usize, // successfully uploaded files
81 dynamic_writers: HashMap<usize, DynamicWriterState>,
82 fixed_writers: HashMap<usize, FixedWriterState>,
83 known_chunks: KnownChunksMap,
84 backup_size: u64, // sums up size of all files
85 backup_stat: UploadStatistic,
86 }
87
88 impl SharedBackupState {
89 // Raise error if finished flag is set
90 fn ensure_unfinished(&self) -> Result<(), Error> {
91 if self.finished {
92 bail!("backup already marked as finished.");
93 }
94 Ok(())
95 }
96
97 // Get a unique integer ID
98 pub fn next_uid(&mut self) -> usize {
99 self.uid_counter += 1;
100 self.uid_counter
101 }
102 }
103
104 /// `RpcEnvironment` implementation for the backup service
105 #[derive(Clone)]
106 pub struct BackupEnvironment {
107 env_type: RpcEnvironmentType,
108 result_attributes: Value,
109 auth_id: Authid,
110 pub debug: bool,
111 pub formatter: &'static dyn OutputFormatter,
112 pub worker: Arc<WorkerTask>,
113 pub datastore: Arc<DataStore>,
114 pub backup_dir: BackupDir,
115 pub last_backup: Option<BackupInfo>,
116 state: Arc<Mutex<SharedBackupState>>,
117 }
118
119 impl BackupEnvironment {
120 pub fn new(
121 env_type: RpcEnvironmentType,
122 auth_id: Authid,
123 worker: Arc<WorkerTask>,
124 datastore: Arc<DataStore>,
125 backup_dir: BackupDir,
126 ) -> Self {
127 let state = SharedBackupState {
128 finished: false,
129 uid_counter: 0,
130 file_counter: 0,
131 dynamic_writers: HashMap::new(),
132 fixed_writers: HashMap::new(),
133 known_chunks: HashMap::new(),
134 backup_size: 0,
135 backup_stat: UploadStatistic::new(),
136 };
137
138 Self {
139 result_attributes: json!({}),
140 env_type,
141 auth_id,
142 worker,
143 datastore,
144 debug: false,
145 formatter: JSON_FORMATTER,
146 backup_dir,
147 last_backup: None,
148 state: Arc::new(Mutex::new(state)),
149 }
150 }
151
152 /// Register a chunk with its associated length.
153 ///
154 /// We do not fully trust clients, so a client may only use registered
155 /// chunks. Please use this method to register chunks from previous backups.
156 pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
157 let mut state = self.state.lock().unwrap();
158
159 state.ensure_unfinished()?;
160
161 state.known_chunks.insert(digest, length);
162
163 Ok(())
164 }
165
166 /// Register fixed length chunks after upload.
167 ///
168 /// Like `register_chunk()`, but additionally record statistics for
169 /// the fixed index writer.
170 pub fn register_fixed_chunk(
171 &self,
172 wid: usize,
173 digest: [u8; 32],
174 size: u32,
175 compressed_size: u32,
176 is_duplicate: bool,
177 ) -> Result<(), Error> {
178 let mut state = self.state.lock().unwrap();
179
180 state.ensure_unfinished()?;
181
182 let mut data = match state.fixed_writers.get_mut(&wid) {
183 Some(data) => data,
184 None => bail!("fixed writer '{}' not registered", wid),
185 };
186
187 if size > data.chunk_size {
188 bail!(
189 "fixed writer '{}' - got large chunk ({} > {}",
190 data.name,
191 size,
192 data.chunk_size
193 );
194 }
195
196 if size < data.chunk_size {
197 data.small_chunk_count += 1;
198 if data.small_chunk_count > 1 {
199 bail!(
200 "fixed writer '{}' - detected multiple end chunks (chunk size too small)",
201 wid
202 );
203 }
204 }
205
206 // record statistics
207 data.upload_stat.count += 1;
208 data.upload_stat.size += size as u64;
209 data.upload_stat.compressed_size += compressed_size as u64;
210 if is_duplicate {
211 data.upload_stat.duplicates += 1;
212 }
213
214 // register chunk
215 state.known_chunks.insert(digest, size);
216
217 Ok(())
218 }
219
220 /// Register dynamic length chunks after upload.
221 ///
222 /// Like `register_chunk()`, but additionally record statistics for
223 /// the dynamic index writer.
224 pub fn register_dynamic_chunk(
225 &self,
226 wid: usize,
227 digest: [u8; 32],
228 size: u32,
229 compressed_size: u32,
230 is_duplicate: bool,
231 ) -> Result<(), Error> {
232 let mut state = self.state.lock().unwrap();
233
234 state.ensure_unfinished()?;
235
236 let mut data = match state.dynamic_writers.get_mut(&wid) {
237 Some(data) => data,
238 None => bail!("dynamic writer '{}' not registered", wid),
239 };
240
241 // record statistics
242 data.upload_stat.count += 1;
243 data.upload_stat.size += size as u64;
244 data.upload_stat.compressed_size += compressed_size as u64;
245 if is_duplicate {
246 data.upload_stat.duplicates += 1;
247 }
248
249 // register chunk
250 state.known_chunks.insert(digest, size);
251
252 Ok(())
253 }
254
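/// Return the length of a registered chunk, or `None` if the digest is unknown.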
255 pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
256 let state = self.state.lock().unwrap();
257
258 state.known_chunks.get(digest).copied()
259 }
260
261 /// Store the writer under a unique ID
262 pub fn register_dynamic_writer(
263 &self,
264 index: DynamicIndexWriter,
265 name: String,
266 ) -> Result<usize, Error> {
267 let mut state = self.state.lock().unwrap();
268
269 state.ensure_unfinished()?;
270
271 let uid = state.next_uid();
272
273 state.dynamic_writers.insert(
274 uid,
275 DynamicWriterState {
276 index,
277 name,
278 offset: 0,
279 chunk_count: 0,
280 upload_stat: UploadStatistic::new(),
281 },
282 );
283
284 Ok(uid)
285 }
286
287 /// Store the writer under a unique ID
288 pub fn register_fixed_writer(
289 &self,
290 index: FixedIndexWriter,
291 name: String,
292 size: usize,
293 chunk_size: u32,
294 incremental: bool,
295 ) -> Result<usize, Error> {
296 let mut state = self.state.lock().unwrap();
297
298 state.ensure_unfinished()?;
299
300 let uid = state.next_uid();
301
302 state.fixed_writers.insert(
303 uid,
304 FixedWriterState {
305 index,
306 name,
307 chunk_count: 0,
308 size,
309 chunk_size,
310 small_chunk_count: 0,
311 upload_stat: UploadStatistic::new(),
312 incremental,
313 },
314 );
315
316 Ok(uid)
317 }
318
319 /// Append chunk to dynamic writer
320 pub fn dynamic_writer_append_chunk(
321 &self,
322 wid: usize,
323 offset: u64,
324 size: u32,
325 digest: &[u8; 32],
326 ) -> Result<(), Error> {
327 let mut state = self.state.lock().unwrap();
328
329 state.ensure_unfinished()?;
330
331 let mut data = match state.dynamic_writers.get_mut(&wid) {
332 Some(data) => data,
333 None => bail!("dynamic writer '{}' not registered", wid),
334 };
335
336 if data.offset != offset {
337 bail!(
338 "dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
339 data.name,
340 data.offset,
341 offset
342 );
343 }
344
345 data.offset += size as u64;
346 data.chunk_count += 1;
347
348 data.index.add_chunk(data.offset, digest)?;
349
350 Ok(())
351 }
352
353 /// Append chunk to fixed writer
354 pub fn fixed_writer_append_chunk(
355 &self,
356 wid: usize,
357 offset: u64,
358 size: u32,
359 digest: &[u8; 32],
360 ) -> Result<(), Error> {
361 let mut state = self.state.lock().unwrap();
362
363 state.ensure_unfinished()?;
364
365 let mut data = match state.fixed_writers.get_mut(&wid) {
366 Some(data) => data,
367 None => bail!("fixed writer '{}' not registered", wid),
368 };
369
370 let end = (offset as usize) + (size as usize);
371 let idx = data.index.check_chunk_alignment(end, size as usize)?;
372
373 data.chunk_count += 1;
374
375 data.index.add_digest(idx, digest)?;
376
377 Ok(())
378 }
379
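/// Write the upload statistics for one archive (size, duplicate ratio,
/// compression ratio) to the worker task log.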
380 fn log_upload_stat(
381 &self,
382 archive_name: &str,
383 csum: &[u8; 32],
384 uuid: &[u8; 16],
385 size: u64,
386 chunk_count: u64,
387 upload_stat: &UploadStatistic,
388 ) {
389 self.log(format!("Upload statistics for '{}'", archive_name));
390 self.log(format!("UUID: {}", hex::encode(uuid)));
391 self.log(format!("Checksum: {}", hex::encode(csum)));
392 self.log(format!("Size: {}", size));
393 self.log(format!("Chunk count: {}", chunk_count));
394
395 if size == 0 || chunk_count == 0 {
396 return;
397 }
398
399 self.log(format!(
400 "Upload size: {} ({}%)",
401 upload_stat.size,
402 (upload_stat.size * 100) / size
403 ));
404
405 // account for the zero chunk, which might be uploaded but never used
406 let client_side_duplicates = if chunk_count < upload_stat.count {
407 0
408 } else {
409 chunk_count - upload_stat.count
410 };
411
412 let server_side_duplicates = upload_stat.duplicates;
413
414 if (client_side_duplicates + server_side_duplicates) > 0 {
415 let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
416 self.log(format!(
417 "Duplicates: {}+{} ({}%)",
418 client_side_duplicates, server_side_duplicates, per
419 ));
420 }
421
422 if upload_stat.size > 0 {
423 self.log(format!(
424 "Compression: {}%",
425 (upload_stat.compressed_size * 100) / upload_stat.size
426 ));
427 }
428 }
429
430 /// Close dynamic writer
431 pub fn dynamic_writer_close(
432 &self,
433 wid: usize,
434 chunk_count: u64,
435 size: u64,
436 csum: [u8; 32],
437 ) -> Result<(), Error> {
438 let mut state = self.state.lock().unwrap();
439
440 state.ensure_unfinished()?;
441
442 let mut data = match state.dynamic_writers.remove(&wid) {
443 Some(data) => data,
444 None => bail!("dynamic writer '{}' not registered", wid),
445 };
446
447 if data.chunk_count != chunk_count {
448 bail!(
449 "dynamic writer '{}' close failed - unexpected chunk count ({} != {})",
450 data.name,
451 data.chunk_count,
452 chunk_count
453 );
454 }
455
456 if data.offset != size {
457 bail!(
458 "dynamic writer '{}' close failed - unexpected file size ({} != {})",
459 data.name,
460 data.offset,
461 size
462 );
463 }
464
465 let uuid = data.index.uuid;
466
467 let expected_csum = data.index.close()?;
468
469 if csum != expected_csum {
470 bail!(
471 "dynamic writer '{}' close failed - got unexpected checksum",
472 data.name
473 );
474 }
475
476 self.log_upload_stat(
477 &data.name,
478 &csum,
479 &uuid,
480 size,
481 chunk_count,
482 &data.upload_stat,
483 );
484
485 state.file_counter += 1;
486 state.backup_size += size;
487 state.backup_stat = state.backup_stat + data.upload_stat;
488
489 Ok(())
490 }
491
492 /// Close fixed writer
493 pub fn fixed_writer_close(
494 &self,
495 wid: usize,
496 chunk_count: u64,
497 size: u64,
498 csum: [u8; 32],
499 ) -> Result<(), Error> {
500 let mut state = self.state.lock().unwrap();
501
502 state.ensure_unfinished()?;
503
504 let mut data = match state.fixed_writers.remove(&wid) {
505 Some(data) => data,
506 None => bail!("fixed writer '{}' not registered", wid),
507 };
508
509 if data.chunk_count != chunk_count {
510 bail!(
511 "fixed writer '{}' close failed - received wrong number of chunk ({} != {})",
512 data.name,
513 data.chunk_count,
514 chunk_count
515 );
516 }
517
518 if !data.incremental {
519 let expected_count = data.index.index_length();
520
521 if chunk_count != (expected_count as u64) {
522 bail!(
523 "fixed writer '{}' close failed - unexpected chunk count ({} != {})",
524 data.name,
525 expected_count,
526 chunk_count
527 );
528 }
529
530 if size != (data.size as u64) {
531 bail!(
532 "fixed writer '{}' close failed - unexpected file size ({} != {})",
533 data.name,
534 data.size,
535 size
536 );
537 }
538 }
539
540 let uuid = data.index.uuid;
541 let expected_csum = data.index.close()?;
542
543 if csum != expected_csum {
544 bail!(
545 "fixed writer '{}' close failed - got unexpected checksum",
546 data.name
547 );
548 }
549
550 self.log_upload_stat(
551 &data.name,
552 &expected_csum,
553 &uuid,
554 size,
555 chunk_count,
556 &data.upload_stat,
557 );
558
559 state.file_counter += 1;
560 state.backup_size += size;
561 state.backup_stat = state.backup_stat + data.upload_stat;
562
563 Ok(())
564 }
565
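/// Store an uploaded blob file directly inside the snapshot directory,
/// verifying its format/CRC first.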
566 pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {
567 let mut path = self.datastore.base_path();
568 path.push(self.backup_dir.relative_path());
569 path.push(file_name);
570
571 let blob_len = data.len();
572 let orig_len = data.len(); // fixme: this is the raw blob length, not the original (uncompressed) data size
573
574 // always verify blob/CRC at server side
575 let blob = DataBlob::load_from_reader(&mut &data[..])?;
576
577 let raw_data = blob.raw_data();
578 replace_file(&path, raw_data, CreateOptions::new(), false)?;
579
580 self.log(format!(
581 "add blob {:?} ({} bytes, comp: {})",
582 path, orig_len, blob_len
583 ));
584
585 let mut state = self.state.lock().unwrap();
586 state.file_counter += 1;
587 state.backup_size += orig_len as u64;
588 state.backup_stat.size += blob_len as u64;
589
590 Ok(())
591 }
592
593 /// Mark backup as finished
594 pub fn finish_backup(&self) -> Result<(), Error> {
595 let mut state = self.state.lock().unwrap();
596
597 state.ensure_unfinished()?;
598
599 // test if all writers are correctly closed
600 if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
601 bail!("found open index writer - unable to finish backup");
602 }
603
604 if state.file_counter == 0 {
605 bail!("backup does not contain valid files (file count == 0)");
606 }
607
608 // check for valid manifest and store stats
609 let stats = serde_json::to_value(state.backup_stat)?;
610 self.datastore
611 .update_manifest(&self.backup_dir, |manifest| {
612 manifest.unprotected["chunk_upload_stats"] = stats;
613 })
614 .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
615
616 if let Some(base) = &self.last_backup {
617 let path = self.datastore.snapshot_path(base.backup_dir.as_ref());
618 if !path.exists() {
619 bail!(
620 "base snapshot {} was removed during backup, cannot finish as chunks might be missing",
621 base.backup_dir
622 );
623 }
624 }
625
626 // marks the backup as successful
627 state.finished = true;
628
629 Ok(())
630 }
631
632 /// If verify-new is set on the datastore, this will run a new verify task
633 /// for the backup. If not, this will return and also drop the passed lock
634 /// immediately.
635 pub fn verify_after_complete(&self, snap_lock: Dir) -> Result<(), Error> {
636 self.ensure_finished()?;
637
638 if !self.datastore.verify_new() {
639 // no verify requested, do nothing
640 return Ok(());
641 }
642
643 let worker_id = format!(
644 "{}:{}/{}/{:08X}",
645 self.datastore.name(),
646 self.backup_dir.backup_type(),
647 self.backup_dir.backup_id(),
648 self.backup_dir.backup_time()
649 );
650
651 let datastore = self.datastore.clone();
652 let backup_dir = self.backup_dir.clone();
653
654 WorkerTask::new_thread(
655 "verify",
656 Some(worker_id),
657 self.auth_id.to_string(),
658 false,
659 move |worker| {
660 worker.log_message("Automatically verifying newly added snapshot");
661
662 let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
663 if !verify_backup_dir_with_lock(
664 &verify_worker,
665 &backup_dir,
666 worker.upid().clone(),
667 None,
668 snap_lock,
669 )? {
670 bail!("verification failed - please check the log for details");
671 }
672
673 Ok(())
674 },
675 )
676 .map(|_| ())
677 }
678
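/// Write a message to the worker task log.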
679 pub fn log<S: AsRef<str>>(&self, msg: S) {
680 self.worker.log_message(msg);
681 }
682
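/// Write a message to the worker task log, but only if debug logging is enabled.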
683 pub fn debug<S: AsRef<str>>(&self, msg: S) {
684 if self.debug {
685 self.worker.log_message(msg);
686 }
687 }
688
689 pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
690 self.formatter.format_result(result, self)
691 }
692
693 /// Raise error if finished flag is not set
694 pub fn ensure_finished(&self) -> Result<(), Error> {
695 let state = self.state.lock().unwrap();
696 if !state.finished {
697 bail!("backup ended but finished flag is not set.");
698 }
699 Ok(())
700 }
701
702 /// Return true if the finished flag is set
703 pub fn finished(&self) -> bool {
704 let state = self.state.lock().unwrap();
705 state.finished
706 }
707
708 /// Remove complete backup
709 pub fn remove_backup(&self) -> Result<(), Error> {
710 let mut state = self.state.lock().unwrap();
711 state.finished = true;
712
713 self.datastore
714 .remove_backup_dir(self.backup_dir.as_ref(), true)?;
715
716 Ok(())
717 }
718 }
719
720 impl RpcEnvironment for BackupEnvironment {
721 fn result_attrib_mut(&mut self) -> &mut Value {
722 &mut self.result_attributes
723 }
724
725 fn result_attrib(&self) -> &Value {
726 &self.result_attributes
727 }
728
729 fn env_type(&self) -> RpcEnvironmentType {
730 self.env_type
731 }
732
733 fn set_auth_id(&mut self, _auth_id: Option<String>) {
734 panic!("unable to change auth_id");
735 }
736
737 fn get_auth_id(&self) -> Option<String> {
738 Some(self.auth_id.to_string())
739 }
740 }
741
742 impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
743 fn as_ref(&self) -> &BackupEnvironment {
744 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
745 }
746 }
747
748 impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
749 fn as_ref(&self) -> &BackupEnvironment {
750 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
751 }
752 }