1 use anyhow
::{bail, Error}
;
2 use std
::sync
::{Arc, Mutex}
;
3 use std
::collections
::HashMap
;
7 use proxmox
::tools
::digest_to_hex
;
8 use proxmox
::tools
::fs
::{replace_file, CreateOptions}
;
9 use proxmox
::api
::{RpcEnvironment, RpcEnvironmentType}
;
11 use crate::server
::WorkerTask
;
13 use crate::server
::formatter
::*;
14 use hyper
::{Body, Response}
;
/// Running upload counters for one archive: chunk count, raw and
/// compressed byte totals, and number of server-side duplicate chunks.
// NOTE(review): field set reconstructed from the `+=` uses in
// `register_*_chunk()` and `log_upload_stat()` — confirm against original.
struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    /// Start with every counter at zero.
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}
34 struct DynamicWriterState
{
36 index
: DynamicIndexWriter
,
39 upload_stat
: UploadStatistic
,
42 struct FixedWriterState
{
44 index
: FixedIndexWriter
,
48 small_chunk_count
: usize, // allow 0..1 small chunks (last chunk may be smaller)
49 upload_stat
: UploadStatistic
,
52 struct SharedBackupState
{
55 file_counter
: usize, // sucessfully uploaded files
56 dynamic_writers
: HashMap
<usize, DynamicWriterState
>,
57 fixed_writers
: HashMap
<usize, FixedWriterState
>,
58 known_chunks
: HashMap
<[u8;32], u32>,
61 impl SharedBackupState
{
63 // Raise error if finished flag is set
64 fn ensure_unfinished(&self) -> Result
<(), Error
> {
66 bail
!("backup already marked as finished.");
71 // Get an unique integer ID
72 pub fn next_uid(&mut self) -> usize {
73 self.uid_counter
+= 1;
79 /// `RpcEnvironmet` implementation for backup service
81 pub struct BackupEnvironment
{
82 env_type
: RpcEnvironmentType
,
83 result_attributes
: HashMap
<String
, Value
>,
86 pub formatter
: &'
static OutputFormatter
,
87 pub worker
: Arc
<WorkerTask
>,
88 pub datastore
: Arc
<DataStore
>,
89 pub backup_dir
: BackupDir
,
90 pub last_backup
: Option
<BackupInfo
>,
91 state
: Arc
<Mutex
<SharedBackupState
>>
94 impl BackupEnvironment
{
96 env_type
: RpcEnvironmentType
,
98 worker
: Arc
<WorkerTask
>,
99 datastore
: Arc
<DataStore
>,
100 backup_dir
: BackupDir
,
103 let state
= SharedBackupState
{
107 dynamic_writers
: HashMap
::new(),
108 fixed_writers
: HashMap
::new(),
109 known_chunks
: HashMap
::new(),
113 result_attributes
: HashMap
::new(),
119 formatter
: &JSON_FORMATTER
,
122 state
: Arc
::new(Mutex
::new(state
)),
126 /// Register a Chunk with associated length.
128 /// We do not fully trust clients, so a client may only use registered
129 /// chunks. Please use this method to register chunks from previous backups.
130 pub fn register_chunk(&self, digest
: [u8; 32], length
: u32) -> Result
<(), Error
> {
131 let mut state
= self.state
.lock().unwrap();
133 state
.ensure_unfinished()?
;
135 state
.known_chunks
.insert(digest
, length
);
140 /// Register fixed length chunks after upload.
142 /// Like `register_chunk()`, but additionally record statistics for
143 /// the fixed index writer.
144 pub fn register_fixed_chunk(
149 compressed_size
: u32,
151 ) -> Result
<(), Error
> {
152 let mut state
= self.state
.lock().unwrap();
154 state
.ensure_unfinished()?
;
156 let mut data
= match state
.fixed_writers
.get_mut(&wid
) {
158 None
=> bail
!("fixed writer '{}' not registered", wid
),
161 if size
> data
.chunk_size
{
162 bail
!("fixed writer '{}' - got large chunk ({} > {}", data
.name
, size
, data
.chunk_size
);
163 } else if size
< data
.chunk_size
{
164 data
.small_chunk_count
+= 1;
165 if data
.small_chunk_count
> 1 {
166 bail
!("fixed writer '{}' - detected multiple end chunks (chunk size too small)");
171 data
.upload_stat
.count
+= 1;
172 data
.upload_stat
.size
+= size
as u64;
173 data
.upload_stat
.compressed_size
+= compressed_size
as u64;
174 if is_duplicate { data.upload_stat.duplicates += 1; }
177 state
.known_chunks
.insert(digest
, size
);
182 /// Register dynamic length chunks after upload.
184 /// Like `register_chunk()`, but additionally record statistics for
185 /// the dynamic index writer.
186 pub fn register_dynamic_chunk(
191 compressed_size
: u32,
193 ) -> Result
<(), Error
> {
194 let mut state
= self.state
.lock().unwrap();
196 state
.ensure_unfinished()?
;
198 let mut data
= match state
.dynamic_writers
.get_mut(&wid
) {
200 None
=> bail
!("dynamic writer '{}' not registered", wid
),
204 data
.upload_stat
.count
+= 1;
205 data
.upload_stat
.size
+= size
as u64;
206 data
.upload_stat
.compressed_size
+= compressed_size
as u64;
207 if is_duplicate { data.upload_stat.duplicates += 1; }
210 state
.known_chunks
.insert(digest
, size
);
215 pub fn lookup_chunk(&self, digest
: &[u8; 32]) -> Option
<u32> {
216 let state
= self.state
.lock().unwrap();
218 match state
.known_chunks
.get(digest
) {
219 Some(len
) => Some(*len
),
224 /// Store the writer with an unique ID
225 pub fn register_dynamic_writer(&self, index
: DynamicIndexWriter
, name
: String
) -> Result
<usize, Error
> {
226 let mut state
= self.state
.lock().unwrap();
228 state
.ensure_unfinished()?
;
230 let uid
= state
.next_uid();
232 state
.dynamic_writers
.insert(uid
, DynamicWriterState
{
233 index
, name
, offset
: 0, chunk_count
: 0, upload_stat
: UploadStatistic
::new(),
239 /// Store the writer with an unique ID
240 pub fn register_fixed_writer(&self, index
: FixedIndexWriter
, name
: String
, size
: usize, chunk_size
: u32) -> Result
<usize, Error
> {
241 let mut state
= self.state
.lock().unwrap();
243 state
.ensure_unfinished()?
;
245 let uid
= state
.next_uid();
247 state
.fixed_writers
.insert(uid
, FixedWriterState
{
248 index
, name
, chunk_count
: 0, size
, chunk_size
, small_chunk_count
: 0, upload_stat
: UploadStatistic
::new(),
254 /// Append chunk to dynamic writer
255 pub fn dynamic_writer_append_chunk(&self, wid
: usize, offset
: u64, size
: u32, digest
: &[u8; 32]) -> Result
<(), Error
> {
256 let mut state
= self.state
.lock().unwrap();
258 state
.ensure_unfinished()?
;
260 let mut data
= match state
.dynamic_writers
.get_mut(&wid
) {
262 None
=> bail
!("dynamic writer '{}' not registered", wid
),
266 if data
.offset
!= offset
{
267 bail
!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
268 data
.name
, data
.offset
, offset
);
271 data
.offset
+= size
as u64;
272 data
.chunk_count
+= 1;
274 data
.index
.add_chunk(data
.offset
, digest
)?
;
279 /// Append chunk to fixed writer
280 pub fn fixed_writer_append_chunk(&self, wid
: usize, offset
: u64, size
: u32, digest
: &[u8; 32]) -> Result
<(), Error
> {
281 let mut state
= self.state
.lock().unwrap();
283 state
.ensure_unfinished()?
;
285 let mut data
= match state
.fixed_writers
.get_mut(&wid
) {
287 None
=> bail
!("fixed writer '{}' not registered", wid
),
290 let end
= (offset
as usize) + (size
as usize);
291 let idx
= data
.index
.check_chunk_alignment(end
, size
as usize)?
;
293 data
.chunk_count
+= 1;
295 data
.index
.add_digest(idx
, digest
)?
;
300 fn log_upload_stat(&self, archive_name
: &str, csum
: &[u8; 32], uuid
: &[u8; 16], size
: u64, chunk_count
: u64, upload_stat
: &UploadStatistic
) {
301 self.log(format
!("Upload statistics for '{}'", archive_name
));
302 self.log(format
!("UUID: {}", digest_to_hex(uuid
)));
303 self.log(format
!("Checksum: {}", digest_to_hex(csum
)));
304 self.log(format
!("Size: {}", size
));
305 self.log(format
!("Chunk count: {}", chunk_count
));
307 if size
== 0 || chunk_count
== 0 {
311 self.log(format
!("Upload size: {} ({}%)", upload_stat
.size
, (upload_stat
.size
*100)/size
));
313 let client_side_duplicates
= chunk_count
- upload_stat
.count
;
314 let server_side_duplicates
= upload_stat
.duplicates
;
316 if (client_side_duplicates
+ server_side_duplicates
) > 0 {
317 let per
= (client_side_duplicates
+ server_side_duplicates
)*100/chunk_count
;
318 self.log(format
!("Duplicates: {}+{} ({}%)", client_side_duplicates
, server_side_duplicates
, per
));
321 if upload_stat
.size
> 0 {
322 self.log(format
!("Compression: {}%", (upload_stat
.compressed_size
*100)/upload_stat
.size
));
326 /// Close dynamic writer
327 pub fn dynamic_writer_close(&self, wid
: usize, chunk_count
: u64, size
: u64, csum
: [u8; 32]) -> Result
<(), Error
> {
328 let mut state
= self.state
.lock().unwrap();
330 state
.ensure_unfinished()?
;
332 let mut data
= match state
.dynamic_writers
.remove(&wid
) {
334 None
=> bail
!("dynamic writer '{}' not registered", wid
),
337 if data
.chunk_count
!= chunk_count
{
338 bail
!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data
.name
, data
.chunk_count
, chunk_count
);
341 if data
.offset
!= size
{
342 bail
!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data
.name
, data
.offset
, size
);
345 let uuid
= data
.index
.uuid
;
347 let expected_csum
= data
.index
.close()?
;
349 println
!("server checksum {:?} client: {:?}", expected_csum
, csum
);
350 if csum
!= expected_csum
{
351 bail
!("dynamic writer '{}' close failed - got unexpected checksum", data
.name
);
354 self.log_upload_stat(&data
.name
, &csum
, &uuid
, size
, chunk_count
, &data
.upload_stat
);
356 state
.file_counter
+= 1;
361 /// Close fixed writer
362 pub fn fixed_writer_close(&self, wid
: usize, chunk_count
: u64, size
: u64, csum
: [u8; 32]) -> Result
<(), Error
> {
363 let mut state
= self.state
.lock().unwrap();
365 state
.ensure_unfinished()?
;
367 let mut data
= match state
.fixed_writers
.remove(&wid
) {
369 None
=> bail
!("fixed writer '{}' not registered", wid
),
372 if data
.chunk_count
!= chunk_count
{
373 bail
!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data
.name
, data
.chunk_count
, chunk_count
);
376 let expected_count
= data
.index
.index_length();
378 if chunk_count
!= (expected_count
as u64) {
379 bail
!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data
.name
, expected_count
, chunk_count
);
382 if size
!= (data
.size
as u64) {
383 bail
!("fixed writer '{}' close failed - unexpected file size ({} != {})", data
.name
, data
.size
, size
);
386 let uuid
= data
.index
.uuid
;
388 let expected_csum
= data
.index
.close()?
;
390 println
!("server checksum {:?} client: {:?}", expected_csum
, csum
);
391 if csum
!= expected_csum
{
392 bail
!("fixed writer '{}' close failed - got unexpected checksum", data
.name
);
395 self.log_upload_stat(&data
.name
, &expected_csum
, &uuid
, size
, chunk_count
, &data
.upload_stat
);
397 state
.file_counter
+= 1;
402 pub fn add_blob(&self, file_name
: &str, data
: Vec
<u8>) -> Result
<(), Error
> {
404 let mut path
= self.datastore
.base_path();
405 path
.push(self.backup_dir
.relative_path());
406 path
.push(file_name
);
408 let blob_len
= data
.len();
409 let orig_len
= data
.len(); // fixme:
411 let blob
= DataBlob
::from_raw(data
)?
;
412 // always verify CRC at server side
415 let raw_data
= blob
.raw_data();
416 replace_file(&path
, raw_data
, CreateOptions
::new())?
;
418 self.log(format
!("add blob {:?} ({} bytes, comp: {})", path
, orig_len
, blob_len
));
420 let mut state
= self.state
.lock().unwrap();
421 state
.file_counter
+= 1;
426 /// Mark backup as finished
427 pub fn finish_backup(&self) -> Result
<(), Error
> {
428 let mut state
= self.state
.lock().unwrap();
429 // test if all writer are correctly closed
431 state
.ensure_unfinished()?
;
433 state
.finished
= true;
435 if state
.dynamic_writers
.len() != 0 {
436 bail
!("found open index writer - unable to finish backup");
439 if state
.file_counter
== 0 {
440 bail
!("backup does not contain valid files (file count == 0)");
446 pub fn log
<S
: AsRef
<str>>(&self, msg
: S
) {
447 self.worker
.log(msg
);
450 pub fn debug
<S
: AsRef
<str>>(&self, msg
: S
) {
451 if self.debug { self.worker.log(msg); }
454 pub fn format_response(&self, result
: Result
<Value
, Error
>) -> Response
<Body
> {
456 Ok(data
) => (self.formatter
.format_data
)(data
, self),
457 Err(err
) => (self.formatter
.format_error
)(err
),
461 /// Raise error if finished flag is not set
462 pub fn ensure_finished(&self) -> Result
<(), Error
> {
463 let state
= self.state
.lock().unwrap();
465 bail
!("backup ended but finished flag is not set.");
470 /// Remove complete backup
471 pub fn remove_backup(&self) -> Result
<(), Error
> {
472 let mut state
= self.state
.lock().unwrap();
473 state
.finished
= true;
475 self.datastore
.remove_backup_dir(&self.backup_dir
)?
;
481 impl RpcEnvironment
for BackupEnvironment
{
483 fn set_result_attrib(&mut self, name
: &str, value
: Value
) {
484 self.result_attributes
.insert(name
.into(), value
);
487 fn get_result_attrib(&self, name
: &str) -> Option
<&Value
> {
488 self.result_attributes
.get(name
)
491 fn env_type(&self) -> RpcEnvironmentType
{
495 fn set_user(&mut self, _user
: Option
<String
>) {
496 panic
!("unable to change user");
499 fn get_user(&self) -> Option
<String
> {
500 Some(self.user
.clone())
504 impl AsRef
<BackupEnvironment
> for dyn RpcEnvironment
{
505 fn as_ref(&self) -> &BackupEnvironment
{
506 self.as_any().downcast_ref
::<BackupEnvironment
>().unwrap()
510 impl AsRef
<BackupEnvironment
> for Box
<dyn RpcEnvironment
> {
511 fn as_ref(&self) -> &BackupEnvironment
{
512 self.as_any().downcast_ref
::<BackupEnvironment
>().unwrap()