use anyhow::{bail, format_err, Error};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use nix::dir::Dir;

use serde::Serialize;
use serde_json::{json, Value};

use proxmox::tools::digest_to_hex;
use proxmox::tools::fs::{replace_file, CreateOptions};
use proxmox::api::{RpcEnvironment, RpcEnvironmentType};

use pbs_datastore::DataBlob;
use pbs_datastore::backup_info::{BackupDir, BackupInfo};
use pbs_datastore::dynamic_index::DynamicIndexWriter;
use pbs_datastore::fixed_index::FixedIndexWriter;
use pbs_api_types::Authid;
use proxmox_rest_server::{WorkerTask, formatter::*};

use crate::backup::{verify_backup_dir_with_lock, DataStore};

use hyper::{Body, Response};
#[derive(Copy, Clone, Serialize)]
struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}

impl std::ops::Add for UploadStatistic {
    type Output = Self;

    fn add(self, other: Self) -> Self {
        Self {
            count: self.count + other.count,
            size: self.size + other.size,
            compressed_size: self.compressed_size + other.compressed_size,
            duplicates: self.duplicates + other.duplicates,
        }
    }
}
struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,
    chunk_count: u64,
    upload_stat: UploadStatistic,
}

struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,
    chunk_count: u64,
    chunk_size: u32,
    small_chunk_count: usize, // allow 0..1 small chunks (the last chunk may be smaller)
    upload_stat: UploadStatistic,
    incremental: bool,
}
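// Illustrative numbers for `small_chunk_count`: a 10 MiB file written with a
// 4 MiB chunk size yields chunks of 4 + 4 + 2 MiB; only that final chunk may
// be undersized, so a second undersized chunk indicates a client-side error.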
// key=digest, value=length
type KnownChunksMap = HashMap<[u8; 32], u32>;

struct SharedBackupState {
    finished: bool,
    uid_counter: usize,
    file_counter: usize, // successfully uploaded files
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    known_chunks: KnownChunksMap,
    backup_size: u64, // sums up size of all files
    backup_stat: UploadStatistic,
}
impl SharedBackupState {

    // Raise an error if the finished flag is set
    fn ensure_unfinished(&self) -> Result<(), Error> {
        if self.finished {
            bail!("backup already marked as finished.");
        }
        Ok(())
    }

    // Get a unique integer ID
    pub fn next_uid(&mut self) -> usize {
        self.uid_counter += 1;
        self.uid_counter
    }
}
/// `RpcEnvironment` implementation for the backup service
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: Value,
    pub auth_id: Authid,
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>,
}
impl BackupEnvironment {
    pub fn new(
        env_type: RpcEnvironmentType,
        auth_id: Authid,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {
        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
            backup_size: 0,
            backup_stat: UploadStatistic::new(),
        };

        Self {
            result_attributes: json!({}),
            env_type,
            auth_id,
            worker,
            datastore,
            debug: false,
            formatter: &JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }
    /// Register a chunk with its associated length.
    ///
    /// We do not fully trust clients, so a client may only use registered
    /// chunks. Please use this method to register chunks from previous backups.
    pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        state.known_chunks.insert(digest, length);

        Ok(())
    }
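    // Rough sketch of the intended flow (illustrative only, not actual client
    // code; `chunk_list()` is a hypothetical helper): the server registers all
    // chunks of the previous snapshot, so the client may reference them
    // without re-uploading:
    //
    //     for (digest, len) in previous_index.chunk_list() {
    //         env.register_chunk(digest, len)?;
    //     }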
    /// Register fixed length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the fixed index writer.
    pub fn register_fixed_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if size > data.chunk_size {
            bail!("fixed writer '{}' - got large chunk ({} > {})", data.name, size, data.chunk_size);
        }

        if size < data.chunk_size {
            data.small_chunk_count += 1;
            if data.small_chunk_count > 1 {
                bail!(
                    "fixed writer '{}' - detected multiple end chunks (chunk size too small)",
                    data.name
                );
            }
        }

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }
    /// Register dynamic length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the dynamic index writer.
    pub fn register_dynamic_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }
    pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
        let state = self.state.lock().unwrap();

        match state.known_chunks.get(digest) {
            Some(len) => Some(*len),
            None => None,
        }
    }
    /// Store the writer under a unique ID
    pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.dynamic_writers.insert(uid, DynamicWriterState {
            index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }
    /// Store the writer under a unique ID
    pub fn register_fixed_writer(
        &self,
        index: FixedIndexWriter,
        name: String,
        size: usize,
        chunk_size: u32,
        incremental: bool,
    ) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.fixed_writers.insert(uid, FixedWriterState {
            index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0,
            upload_stat: UploadStatistic::new(), incremental,
        });

        Ok(uid)
    }
    /// Append a chunk to the dynamic writer
    pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.offset != offset {
            bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
                  data.name, data.offset, offset);
        }

        data.offset += size as u64;
        data.chunk_count += 1;

        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }
    /// Append a chunk to the fixed writer
    pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        let end = (offset as usize) + (size as usize);
        let idx = data.index.check_chunk_alignment(end, size as usize)?;

        data.chunk_count += 1;

        data.index.add_digest(idx, digest)?;

        Ok(())
    }
    fn log_upload_stat(
        &self,
        archive_name: &str,
        csum: &[u8; 32],
        uuid: &[u8; 16],
        size: u64,
        chunk_count: u64,
        upload_stat: &UploadStatistic,
    ) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", digest_to_hex(uuid)));
        self.log(format!("Checksum: {}", digest_to_hex(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size * 100) / size));

        // account for the zero chunk, which might be uploaded but never used
        let client_side_duplicates = if chunk_count < upload_stat.count {
            0
        } else {
            chunk_count - upload_stat.count
        };

        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
            self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
        }

        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size * 100) / upload_stat.size));
        }
    }
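    // Worked example (illustrative numbers): with size = 1000, chunk_count = 10
    // and upload_stat { count: 8, size: 250, compressed_size: 100, duplicates: 1 },
    // the above logs "Upload size: 250 (25%)", client-side duplicates
    // 10 - 8 = 2, "Duplicates: 2+1 (30%)" and "Compression: 40%".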
    /// Close the dynamic writer
    pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})",
                  data.name, data.chunk_count, chunk_count);
        }

        if data.offset != size {
            bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})",
                  data.name, data.offset, size);
        }

        let uuid = data.index.uuid;

        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }
    /// Close the fixed writer
    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("fixed writer '{}' close failed - received wrong number of chunks ({} != {})",
                  data.name, data.chunk_count, chunk_count);
        }

        if !data.incremental {
            let expected_count = data.index.index_length();

            if chunk_count != (expected_count as u64) {
                bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})",
                      data.name, expected_count, chunk_count);
            }

            if size != (data.size as u64) {
                bail!("fixed writer '{}' close failed - unexpected file size ({} != {})",
                      data.name, data.size, size);
            }
        }

        let uuid = data.index.uuid;
        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }
    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {
        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme:

        // always verify blob/CRC at server side
        let blob = DataBlob::load_from_reader(&mut &data[..])?;

        let raw_data = blob.raw_data();
        replace_file(&path, raw_data, CreateOptions::new())?;

        self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));

        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;
        state.backup_size += orig_len as u64;
        state.backup_stat.size += blob_len as u64;

        Ok(())
    }
    /// Mark the backup as finished
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // test if all writers are correctly closed
        if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        // check for valid manifest and store stats
        let stats = serde_json::to_value(state.backup_stat)?;
        self.datastore.update_manifest(&self.backup_dir, |manifest| {
            manifest.unprotected["chunk_upload_stats"] = stats;
        }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

        if let Some(base) = &self.last_backup {
            let path = self.datastore.snapshot_path(&base.backup_dir);
            if !path.exists() {
                bail!(
                    "base snapshot {} was removed during backup, cannot finish as chunks might be missing",
                    base.backup_dir
                );
            }
        }

        // mark the backup as successful
        state.finished = true;

        Ok(())
    }
    /// If verify-new is set on the datastore, this will run a new verify task
    /// for the backup. If not, this will return and also drop the passed lock.
    pub fn verify_after_complete(&self, snap_lock: Dir) -> Result<(), Error> {
        self.ensure_finished()?;

        if !self.datastore.verify_new() {
            // no verify requested, do nothing
            return Ok(());
        }

        let worker_id = format!("{}:{}/{}/{:08X}",
            self.datastore.name(),
            self.backup_dir.group().backup_type(),
            self.backup_dir.group().backup_id(),
            self.backup_dir.backup_time());

        let datastore = self.datastore.clone();
        let backup_dir = self.backup_dir.clone();

        WorkerTask::new_thread(
            "verify",
            Some(worker_id),
            self.auth_id.to_string(),
            false,
            move |worker| {
                worker.log_message("Automatically verifying newly added snapshot");

                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
                if !verify_backup_dir_with_lock(
                    &verify_worker,
                    &backup_dir,
                    worker.upid().clone(),
                    None,
                    snap_lock,
                )? {
                    bail!("verification failed - please check the log for details");
                }

                Ok(())
            },
        ).map(|_| ())
    }
    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log_message(msg);
    }

    pub fn debug<S: AsRef<str>>(&self, msg: S) {
        if self.debug { self.worker.log_message(msg); }
    }
    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
        match result {
            Ok(data) => (self.formatter.format_data)(data, self),
            Err(err) => (self.formatter.format_error)(err),
        }
    }
    /// Raise an error if the finished flag is not set
    pub fn ensure_finished(&self) -> Result<(), Error> {
        let state = self.state.lock().unwrap();
        if !state.finished {
            bail!("backup ended but finished flag is not set.");
        }
        Ok(())
    }

    /// Return true if the finished flag is set
    pub fn finished(&self) -> bool {
        let state = self.state.lock().unwrap();
        state.finished
    }

    /// Remove the complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        state.finished = true;

        self.datastore.remove_backup_dir(&self.backup_dir, true)?;

        Ok(())
    }
}
impl RpcEnvironment for BackupEnvironment {
    fn result_attrib_mut(&mut self) -> &mut Value {
        &mut self.result_attributes
    }

    fn result_attrib(&self) -> &Value {
        &self.result_attributes
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    fn set_auth_id(&mut self, _auth_id: Option<String>) {
        panic!("unable to change auth_id");
    }

    fn get_auth_id(&self) -> Option<String> {
        Some(self.auth_id.to_string())
    }
}
impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}

impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}