use anyhow::{bail, format_err, Error};
use nix::dir::Dir;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use ::serde::Serialize;
use serde_json::{json, Value};

use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
use proxmox_sys::fs::{replace_file, CreateOptions};

use pbs_api_types::Authid;
use pbs_datastore::backup_info::{BackupDir, BackupInfo};
use pbs_datastore::dynamic_index::DynamicIndexWriter;
use pbs_datastore::fixed_index::FixedIndexWriter;
use pbs_datastore::{DataBlob, DataStore};
use proxmox_rest_server::{formatter::*, WorkerTask};

use crate::backup::verify_backup_dir_with_lock;

use hyper::{Body, Response};
#[derive(Copy, Clone, Serialize)]
struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}

impl std::ops::Add for UploadStatistic {
    type Output = Self;

    fn add(self, other: Self) -> Self {
        Self {
            count: self.count + other.count,
            size: self.size + other.size,
            compressed_size: self.compressed_size + other.compressed_size,
            duplicates: self.duplicates + other.duplicates,
        }
    }
}
struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,
    chunk_count: u64,
    upload_stat: UploadStatistic,
}

struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,
    chunk_size: u32,
    chunk_count: u64,
    small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
    upload_stat: UploadStatistic,
    incremental: bool,
}

// key=digest, value=length
type KnownChunksMap = HashMap<[u8; 32], u32>;
struct SharedBackupState {
    finished: bool,
    uid_counter: usize,
    file_counter: usize, // successfully uploaded files
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    known_chunks: KnownChunksMap,
    backup_size: u64, // sums up size of all files
    backup_stat: UploadStatistic,
}

impl SharedBackupState {
    // Raise error if finished flag is set
    fn ensure_unfinished(&self) -> Result<(), Error> {
        if self.finished {
            bail!("backup already marked as finished.");
        }
        Ok(())
    }

    // Get a unique integer ID
    pub fn next_uid(&mut self) -> usize {
        self.uid_counter += 1;
        self.uid_counter
    }
}
/// `RpcEnvironment` implementation for backup service
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: Value,
    auth_id: Authid,
    pub debug: bool,
    pub formatter: &'static dyn OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>,
}
impl BackupEnvironment {
    pub fn new(
        env_type: RpcEnvironmentType,
        auth_id: Authid,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {
        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
            backup_size: 0,
            backup_stat: UploadStatistic::new(),
        };

        Self {
            result_attributes: json!({}),
            env_type,
            auth_id,
            worker,
            datastore,
            debug: false,
            formatter: JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Register a chunk with associated length.
    ///
    /// We do not fully trust clients, so a client may only use registered
    /// chunks. Please use this method to register chunks from previous backups.
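    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here); `env`, the digest and the length
    /// are hypothetical stand-ins for values taken from a previous backup's
    /// index:
    ///
    /// ```ignore
    /// env.register_chunk(digest, 4096)?;
    /// // the client may now reference the chunk without re-uploading it
    /// assert_eq!(env.lookup_chunk(&digest), Some(4096));
    /// ```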
    pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        state.known_chunks.insert(digest, length);

        Ok(())
    }

    /// Register fixed length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the fixed index writer.
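    ///
    /// # Example
    ///
    /// A hedged sketch of the expected call after a chunk upload; `env`,
    /// `wid` and all size values are placeholders:
    ///
    /// ```ignore
    /// // wid comes from register_fixed_writer(); size must not exceed the
    /// // writer's chunk_size, and only the last chunk may be smaller
    /// env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
    /// ```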
    pub fn register_fixed_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if size > data.chunk_size {
            bail!(
                "fixed writer '{}' - got large chunk ({} > {})",
                data.name,
                size,
                data.chunk_size
            );
        }

        if size < data.chunk_size {
            data.small_chunk_count += 1;
            if data.small_chunk_count > 1 {
                bail!(
                    "fixed writer '{}' - detected multiple end chunks (chunk size too small)",
                    wid
                );
            }
        }

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate {
            data.upload_stat.duplicates += 1;
        }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    /// Register dynamic length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the dynamic index writer.
    pub fn register_dynamic_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate {
            data.upload_stat.duplicates += 1;
        }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

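    /// Return the length of a previously registered chunk, or `None` if the
    /// digest is unknown.
    ///
    /// A small usage sketch (`env` and `digest` are placeholders):
    ///
    /// ```ignore
    /// if env.lookup_chunk(&digest).is_none() {
    ///     // the client must upload the chunk data before referencing it
    /// }
    /// ```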
    pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
        let state = self.state.lock().unwrap();

        state.known_chunks.get(digest).copied()
    }

    /// Store the writer with a unique ID
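    ///
    /// A sketch of the writer life cycle (all names and values are
    /// placeholders; error handling elided):
    ///
    /// ```ignore
    /// let wid = env.register_dynamic_writer(index, "catalog.pcat1.didx".to_string())?;
    /// env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
    /// env.dynamic_writer_close(wid, chunk_count, file_size, csum)?;
    /// ```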
    pub fn register_dynamic_writer(
        &self,
        index: DynamicIndexWriter,
        name: String,
    ) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.dynamic_writers.insert(
            uid,
            DynamicWriterState {
                index,
                name,
                offset: 0,
                chunk_count: 0,
                upload_stat: UploadStatistic::new(),
            },
        );

        Ok(uid)
    }

    /// Store the writer with a unique ID
    pub fn register_fixed_writer(
        &self,
        index: FixedIndexWriter,
        name: String,
        size: usize,
        chunk_size: u32,
        incremental: bool,
    ) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.fixed_writers.insert(
            uid,
            FixedWriterState {
                index,
                name,
                size,
                chunk_size,
                chunk_count: 0,
                small_chunk_count: 0,
                upload_stat: UploadStatistic::new(),
                incremental,
            },
        );

        Ok(uid)
    }

    /// Append chunk to dynamic writer
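    ///
    /// Chunks have to be appended in order: the passed `offset` must match
    /// the writer's current offset. A sketch with two placeholder chunks:
    ///
    /// ```ignore
    /// env.dynamic_writer_append_chunk(wid, 0, 4096, &digest_a)?;    // offset 0
    /// env.dynamic_writer_append_chunk(wid, 4096, 4096, &digest_b)?; // offset 0 + 4096
    /// ```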
    pub fn dynamic_writer_append_chunk(
        &self,
        wid: usize,
        offset: u64,
        size: u32,
        digest: &[u8; 32],
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.offset != offset {
            bail!(
                "dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
                data.name,
                data.offset,
                offset
            );
        }

        data.offset += size as u64;
        data.chunk_count += 1;

        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }

    /// Append chunk to fixed writer
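    ///
    /// For a fixed index the chunk position is derived from `offset`, so the
    /// offset has to line up with the writer's chunk size (this is what
    /// `check_chunk_alignment()` verifies below). Placeholder sketch,
    /// assuming a 4 MiB chunk size:
    ///
    /// ```ignore
    /// const CHUNK: u64 = 4 * 1024 * 1024;
    /// // valid offsets are 0, CHUNK, 2 * CHUNK, ...
    /// env.fixed_writer_append_chunk(wid, CHUNK, CHUNK as u32, &digest)?;
    /// ```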
    pub fn fixed_writer_append_chunk(
        &self,
        wid: usize,
        offset: u64,
        size: u32,
        digest: &[u8; 32],
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        let end = (offset as usize) + (size as usize);
        let idx = data.index.check_chunk_alignment(end, size as usize)?;

        data.chunk_count += 1;

        data.index.add_digest(idx, digest)?;

        Ok(())
    }

    fn log_upload_stat(
        &self,
        archive_name: &str,
        csum: &[u8; 32],
        uuid: &[u8; 16],
        size: u64,
        chunk_count: u64,
        upload_stat: &UploadStatistic,
    ) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", hex::encode(uuid)));
        self.log(format!("Checksum: {}", hex::encode(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!(
            "Upload size: {} ({}%)",
            upload_stat.size,
            (upload_stat.size * 100) / size
        ));

        // account for zero chunk, which might be uploaded but never used
        let client_side_duplicates = if chunk_count < upload_stat.count {
            0
        } else {
            chunk_count - upload_stat.count
        };

        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
            self.log(format!(
                "Duplicates: {}+{} ({}%)",
                client_side_duplicates, server_side_duplicates, per
            ));
        }

        if upload_stat.size > 0 {
            self.log(format!(
                "Compression: {}%",
                (upload_stat.compressed_size * 100) / upload_stat.size
            ));
        }
    }

    /// Close dynamic writer
    pub fn dynamic_writer_close(
        &self,
        wid: usize,
        chunk_count: u64,
        size: u64,
        csum: [u8; 32],
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!(
                "dynamic writer '{}' close failed - unexpected chunk count ({} != {})",
                data.name,
                data.chunk_count,
                chunk_count
            );
        }

        if data.offset != size {
            bail!(
                "dynamic writer '{}' close failed - unexpected file size ({} != {})",
                data.name,
                data.offset,
                size
            );
        }

        let uuid = data.index.uuid;

        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!(
                "dynamic writer '{}' close failed - got unexpected checksum",
                data.name
            );
        }

        self.log_upload_stat(
            &data.name,
            &csum,
            &uuid,
            size,
            chunk_count,
            &data.upload_stat,
        );

        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }

    /// Close fixed writer
    pub fn fixed_writer_close(
        &self,
        wid: usize,
        chunk_count: u64,
        size: u64,
        csum: [u8; 32],
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!(
                "fixed writer '{}' close failed - received wrong number of chunks ({} != {})",
                data.name,
                data.chunk_count,
                chunk_count
            );
        }

        if !data.incremental {
            let expected_count = data.index.index_length();

            if chunk_count != (expected_count as u64) {
                bail!(
                    "fixed writer '{}' close failed - unexpected chunk count ({} != {})",
                    data.name,
                    expected_count,
                    chunk_count
                );
            }

            if size != (data.size as u64) {
                bail!(
                    "fixed writer '{}' close failed - unexpected file size ({} != {})",
                    data.name,
                    data.size,
                    size
                );
            }
        }

        let uuid = data.index.uuid;
        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!(
                "fixed writer '{}' close failed - got unexpected checksum",
                data.name
            );
        }

        self.log_upload_stat(
            &data.name,
            &csum,
            &uuid,
            size,
            chunk_count,
            &data.upload_stat,
        );

        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }

    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {
        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme:

        // always verify blob/CRC at server side
        let blob = DataBlob::load_from_reader(&mut &data[..])?;

        let raw_data = blob.raw_data();
        replace_file(&path, raw_data, CreateOptions::new(), false)?;

        self.log(format!(
            "add blob {:?} ({} bytes, comp: {})",
            path, orig_len, blob_len
        ));

        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;
        state.backup_size += orig_len as u64;
        state.backup_stat.size += blob_len as u64;

        Ok(())
    }

    /// Mark backup as finished
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // test if all writers are correctly closed
        if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        // check for valid manifest and store stats
        let stats = serde_json::to_value(state.backup_stat)?;
        self.backup_dir
            .update_manifest(|manifest| {
                manifest.unprotected["chunk_upload_stats"] = stats;
            })
            .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

        if let Some(base) = &self.last_backup {
            let path = base.backup_dir.full_path();
            if !path.exists() {
                bail!(
                    "base snapshot {} was removed during backup, cannot finish as chunks might be missing",
                    base.backup_dir
                );
            }
        }

        // marks the backup as successful
        state.finished = true;

        Ok(())
    }

    /// If verify-new is set on the datastore, this will run a new verify task
    /// for the backup. If not, this will return and also drop the passed lock
    /// immediately.
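    ///
    /// A sketch of the final protocol steps (placeholder names, lock
    /// acquisition elided):
    ///
    /// ```ignore
    /// env.finish_backup()?;
    /// // spawns a verify worker task if verify-new is enabled
    /// env.verify_after_complete(snap_lock)?;
    /// ```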
    pub fn verify_after_complete(&self, snap_lock: Dir) -> Result<(), Error> {
        self.ensure_finished()?;

        if !self.datastore.verify_new() {
            // no verify requested, do nothing
            return Ok(());
        }

        let worker_id = format!(
            "{}:{}/{}/{:08X}",
            self.datastore.name(),
            self.backup_dir.backup_type(),
            self.backup_dir.backup_id(),
            self.backup_dir.backup_time()
        );

        let datastore = self.datastore.clone();
        let backup_dir = self.backup_dir.clone();

        WorkerTask::new_thread(
            "verify",
            Some(worker_id),
            self.auth_id.to_string(),
            false,
            move |worker| {
                worker.log_message("Automatically verifying newly added snapshot");

                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
                if !verify_backup_dir_with_lock(
                    &verify_worker,
                    &backup_dir,
                    worker.upid().clone(),
                    None,
                    snap_lock,
                )? {
                    bail!("verification failed - please check the log for details");
                }

                Ok(())
            },
        )
        .map(|_| ())
    }

    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log_message(msg);
    }

    pub fn debug<S: AsRef<str>>(&self, msg: S) {
        // only forwarded when debug logging is enabled for this environment
        if self.debug {
            self.worker.log_message(msg);
        }
    }

    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
        self.formatter.format_result(result, self)
    }

    /// Raise error if finished flag is not set
    pub fn ensure_finished(&self) -> Result<(), Error> {
        let state = self.state.lock().unwrap();
        if !state.finished {
            bail!("backup ended but finished flag is not set.");
        }
        Ok(())
    }

    /// Return true if the finished flag is set
    pub fn finished(&self) -> bool {
        let state = self.state.lock().unwrap();
        state.finished
    }

    /// Remove complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        state.finished = true;

        self.datastore.remove_backup_dir(
            self.backup_dir.backup_ns(),
            self.backup_dir.as_ref(),
            true,
        )?;

        Ok(())
    }
}

impl RpcEnvironment for BackupEnvironment {
    fn result_attrib_mut(&mut self) -> &mut Value {
        &mut self.result_attributes
    }

    fn result_attrib(&self) -> &Value {
        &self.result_attributes
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    fn set_auth_id(&mut self, _auth_id: Option<String>) {
        panic!("unable to change auth_id");
    }

    fn get_auth_id(&self) -> Option<String> {
        Some(self.auth_id.to_string())
    }
}

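// API handlers receive a `dyn RpcEnvironment` and can downcast it to the
// concrete `BackupEnvironment` via the `AsRef` impls below. A hedged usage
// sketch (handler signature abbreviated):
//
// ```ignore
// fn handler(rpcenv: &dyn RpcEnvironment) -> Result<Value, Error> {
//     let env: &BackupEnvironment = rpcenv.as_ref();
//     env.log("hello from the backup API");
//     Ok(Value::Null)
// }
// ```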
impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}

impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}