2 use std
::sync
::{Arc, Mutex}
;
3 use std
::collections
::HashMap
;
11 use proxmox
::api
::{RpcEnvironment, RpcEnvironmentType}
;
13 use crate::server
::WorkerTask
;
15 use crate::server
::formatter
::*;
16 use hyper
::{Body, Response}
;
/// Running counters for one upload stream: number of chunks received,
/// raw and compressed byte totals, and server-side duplicate hits.
/// Field set reconstructed from the usages in `register_*_chunk()` and
/// `log_upload_stat()` (all arithmetic is done in u64).
struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    /// Start with all counters at zero.
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}
36 struct DynamicWriterState
{
38 index
: DynamicIndexWriter
,
41 upload_stat
: UploadStatistic
,
44 struct FixedWriterState
{
46 index
: FixedIndexWriter
,
50 small_chunk_count
: usize, // allow 0..1 small chunks (last chunk may be smaller)
51 upload_stat
: UploadStatistic
,
54 struct SharedBackupState
{
57 file_counter
: usize, // sucessfully uploaded files
58 dynamic_writers
: HashMap
<usize, DynamicWriterState
>,
59 fixed_writers
: HashMap
<usize, FixedWriterState
>,
60 known_chunks
: HashMap
<[u8;32], u32>,
63 impl SharedBackupState
{
65 // Raise error if finished flag is set
66 fn ensure_unfinished(&self) -> Result
<(), Error
> {
68 bail
!("backup already marked as finished.");
73 // Get an unique integer ID
74 pub fn next_uid(&mut self) -> usize {
75 self.uid_counter
+= 1;
81 /// `RpcEnvironmet` implementation for backup service
83 pub struct BackupEnvironment
{
84 env_type
: RpcEnvironmentType
,
85 result_attributes
: HashMap
<String
, Value
>,
88 pub formatter
: &'
static OutputFormatter
,
89 pub worker
: Arc
<WorkerTask
>,
90 pub datastore
: Arc
<DataStore
>,
91 pub backup_dir
: BackupDir
,
92 pub last_backup
: Option
<BackupInfo
>,
93 state
: Arc
<Mutex
<SharedBackupState
>>
impl BackupEnvironment {
// NOTE: this `impl` block continues through the rest of the file; its
// closing brace lies in a gap of this dump and is not emitted here.

/// Create the environment for one backup session.
///
/// NOTE(review): reconstructed from a garbled dump. The `user` parameter
/// corresponds to a missing source line between `env_type` and `worker`
/// and is required because `get_user()` reads `self.user` — confirm the
/// exact parameter order upstream.
pub fn new(
    env_type: RpcEnvironmentType,
    user: String,
    worker: Arc<WorkerTask>,
    datastore: Arc<DataStore>,
    backup_dir: BackupDir,
) -> Self {

    // Fresh, lock-protected mutable session state.
    let state = SharedBackupState {
        finished: false,
        uid_counter: 0,
        file_counter: 0,
        dynamic_writers: HashMap::new(),
        fixed_writers: HashMap::new(),
        known_chunks: HashMap::new(),
    };

    Self {
        result_attributes: HashMap::new(),
        env_type,
        worker,
        datastore,
        user,
        debug: false,
        formatter: &JSON_FORMATTER, // JSON output by default
        backup_dir,
        last_backup: None,
        state: Arc::new(Mutex::new(state)),
    }
}
128 /// Register a Chunk with associated length.
130 /// We do not fully trust clients, so a client may only use registered
131 /// chunks. Please use this method to register chunks from previous backups.
132 pub fn register_chunk(&self, digest
: [u8; 32], length
: u32) -> Result
<(), Error
> {
133 let mut state
= self.state
.lock().unwrap();
135 state
.ensure_unfinished()?
;
137 state
.known_chunks
.insert(digest
, length
);
142 /// Register fixed length chunks after upload.
144 /// Like `register_chunk()`, but additionally record statistics for
145 /// the fixed index writer.
146 pub fn register_fixed_chunk(
151 compressed_size
: u32,
153 ) -> Result
<(), Error
> {
154 let mut state
= self.state
.lock().unwrap();
156 state
.ensure_unfinished()?
;
158 let mut data
= match state
.fixed_writers
.get_mut(&wid
) {
160 None
=> bail
!("fixed writer '{}' not registered", wid
),
163 if size
> data
.chunk_size
{
164 bail
!("fixed writer '{}' - got large chunk ({} > {}", data
.name
, size
, data
.chunk_size
);
165 } else if size
< data
.chunk_size
{
166 data
.small_chunk_count
+= 1;
167 if data
.small_chunk_count
> 1 {
168 bail
!("fixed writer '{}' - detected multiple end chunks (chunk size too small)");
173 data
.upload_stat
.count
+= 1;
174 data
.upload_stat
.size
+= size
as u64;
175 data
.upload_stat
.compressed_size
+= compressed_size
as u64;
176 if is_duplicate { data.upload_stat.duplicates += 1; }
179 state
.known_chunks
.insert(digest
, size
);
184 /// Register dynamic length chunks after upload.
186 /// Like `register_chunk()`, but additionally record statistics for
187 /// the dynamic index writer.
188 pub fn register_dynamic_chunk(
193 compressed_size
: u32,
195 ) -> Result
<(), Error
> {
196 let mut state
= self.state
.lock().unwrap();
198 state
.ensure_unfinished()?
;
200 let mut data
= match state
.dynamic_writers
.get_mut(&wid
) {
202 None
=> bail
!("dynamic writer '{}' not registered", wid
),
206 data
.upload_stat
.count
+= 1;
207 data
.upload_stat
.size
+= size
as u64;
208 data
.upload_stat
.compressed_size
+= compressed_size
as u64;
209 if is_duplicate { data.upload_stat.duplicates += 1; }
212 state
.known_chunks
.insert(digest
, size
);
217 pub fn lookup_chunk(&self, digest
: &[u8; 32]) -> Option
<u32> {
218 let state
= self.state
.lock().unwrap();
220 match state
.known_chunks
.get(digest
) {
221 Some(len
) => Some(*len
),
226 /// Store the writer with an unique ID
227 pub fn register_dynamic_writer(&self, index
: DynamicIndexWriter
, name
: String
) -> Result
<usize, Error
> {
228 let mut state
= self.state
.lock().unwrap();
230 state
.ensure_unfinished()?
;
232 let uid
= state
.next_uid();
234 state
.dynamic_writers
.insert(uid
, DynamicWriterState
{
235 index
, name
, offset
: 0, chunk_count
: 0, upload_stat
: UploadStatistic
::new(),
241 /// Store the writer with an unique ID
242 pub fn register_fixed_writer(&self, index
: FixedIndexWriter
, name
: String
, size
: usize, chunk_size
: u32) -> Result
<usize, Error
> {
243 let mut state
= self.state
.lock().unwrap();
245 state
.ensure_unfinished()?
;
247 let uid
= state
.next_uid();
249 state
.fixed_writers
.insert(uid
, FixedWriterState
{
250 index
, name
, chunk_count
: 0, size
, chunk_size
, small_chunk_count
: 0, upload_stat
: UploadStatistic
::new(),
256 /// Append chunk to dynamic writer
257 pub fn dynamic_writer_append_chunk(&self, wid
: usize, offset
: u64, size
: u32, digest
: &[u8; 32]) -> Result
<(), Error
> {
258 let mut state
= self.state
.lock().unwrap();
260 state
.ensure_unfinished()?
;
262 let mut data
= match state
.dynamic_writers
.get_mut(&wid
) {
264 None
=> bail
!("dynamic writer '{}' not registered", wid
),
268 if data
.offset
!= offset
{
269 bail
!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
270 data
.name
, data
.offset
, offset
);
273 data
.offset
+= size
as u64;
274 data
.chunk_count
+= 1;
276 data
.index
.add_chunk(data
.offset
, digest
)?
;
281 /// Append chunk to fixed writer
282 pub fn fixed_writer_append_chunk(&self, wid
: usize, offset
: u64, size
: u32, digest
: &[u8; 32]) -> Result
<(), Error
> {
283 let mut state
= self.state
.lock().unwrap();
285 state
.ensure_unfinished()?
;
287 let mut data
= match state
.fixed_writers
.get_mut(&wid
) {
289 None
=> bail
!("fixed writer '{}' not registered", wid
),
292 let end
= (offset
as usize) + (size
as usize);
293 let idx
= data
.index
.check_chunk_alignment(end
, size
as usize)?
;
295 data
.chunk_count
+= 1;
297 data
.index
.add_digest(idx
, digest
)?
;
302 fn log_upload_stat(&self, archive_name
: &str, csum
: &[u8; 32], uuid
: &[u8; 16], size
: u64, chunk_count
: u64, upload_stat
: &UploadStatistic
) {
303 self.log(format
!("Upload statistics for '{}'", archive_name
));
304 self.log(format
!("UUID: {}", digest_to_hex(uuid
)));
305 self.log(format
!("Checksum: {}", digest_to_hex(csum
)));
306 self.log(format
!("Size: {}", size
));
307 self.log(format
!("Chunk count: {}", chunk_count
));
309 if size
== 0 || chunk_count
== 0 {
313 self.log(format
!("Upload size: {} ({}%)", upload_stat
.size
, (upload_stat
.size
*100)/size
));
315 let client_side_duplicates
= chunk_count
- upload_stat
.count
;
316 let server_side_duplicates
= upload_stat
.duplicates
;
318 if (client_side_duplicates
+ server_side_duplicates
) > 0 {
319 let per
= (client_side_duplicates
+ server_side_duplicates
)*100/chunk_count
;
320 self.log(format
!("Duplicates: {}+{} ({}%)", client_side_duplicates
, server_side_duplicates
, per
));
323 if upload_stat
.size
> 0 {
324 self.log(format
!("Compression: {}%", (upload_stat
.compressed_size
*100)/upload_stat
.size
));
328 /// Close dynamic writer
329 pub fn dynamic_writer_close(&self, wid
: usize, chunk_count
: u64, size
: u64, csum
: [u8; 32]) -> Result
<(), Error
> {
330 let mut state
= self.state
.lock().unwrap();
332 state
.ensure_unfinished()?
;
334 let mut data
= match state
.dynamic_writers
.remove(&wid
) {
336 None
=> bail
!("dynamic writer '{}' not registered", wid
),
339 if data
.chunk_count
!= chunk_count
{
340 bail
!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data
.name
, data
.chunk_count
, chunk_count
);
343 if data
.offset
!= size
{
344 bail
!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data
.name
, data
.offset
, size
);
347 let uuid
= data
.index
.uuid
;
349 let expected_csum
= data
.index
.close()?
;
351 println
!("server checksum {:?} client: {:?}", expected_csum
, csum
);
352 if csum
!= expected_csum
{
353 bail
!("dynamic writer '{}' close failed - got unexpected checksum", data
.name
);
356 self.log_upload_stat(&data
.name
, &csum
, &uuid
, size
, chunk_count
, &data
.upload_stat
);
358 state
.file_counter
+= 1;
363 /// Close fixed writer
364 pub fn fixed_writer_close(&self, wid
: usize, chunk_count
: u64, size
: u64, csum
: [u8; 32]) -> Result
<(), Error
> {
365 let mut state
= self.state
.lock().unwrap();
367 state
.ensure_unfinished()?
;
369 let mut data
= match state
.fixed_writers
.remove(&wid
) {
371 None
=> bail
!("fixed writer '{}' not registered", wid
),
374 if data
.chunk_count
!= chunk_count
{
375 bail
!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data
.name
, data
.chunk_count
, chunk_count
);
378 let expected_count
= data
.index
.index_length();
380 if chunk_count
!= (expected_count
as u64) {
381 bail
!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data
.name
, expected_count
, chunk_count
);
384 if size
!= (data
.size
as u64) {
385 bail
!("fixed writer '{}' close failed - unexpected file size ({} != {})", data
.name
, data
.size
, size
);
388 let uuid
= data
.index
.uuid
;
390 let expected_csum
= data
.index
.close()?
;
392 println
!("server checksum {:?} client: {:?}", expected_csum
, csum
);
393 if csum
!= expected_csum
{
394 bail
!("fixed writer '{}' close failed - got unexpected checksum", data
.name
);
397 self.log_upload_stat(&data
.name
, &expected_csum
, &uuid
, size
, chunk_count
, &data
.upload_stat
);
399 state
.file_counter
+= 1;
404 pub fn add_blob(&self, file_name
: &str, data
: Vec
<u8>) -> Result
<(), Error
> {
406 let mut path
= self.datastore
.base_path();
407 path
.push(self.backup_dir
.relative_path());
408 path
.push(file_name
);
410 let blob_len
= data
.len();
411 let orig_len
= data
.len(); // fixme:
413 let blob
= DataBlob
::from_raw(data
)?
;
414 // always verify CRC at server side
417 let raw_data
= blob
.raw_data();
418 file_set_contents(&path
, raw_data
, None
)?
;
420 self.log(format
!("add blob {:?} ({} bytes, comp: {})", path
, orig_len
, blob_len
));
422 let mut state
= self.state
.lock().unwrap();
423 state
.file_counter
+= 1;
428 /// Mark backup as finished
429 pub fn finish_backup(&self) -> Result
<(), Error
> {
430 let mut state
= self.state
.lock().unwrap();
431 // test if all writer are correctly closed
433 state
.ensure_unfinished()?
;
435 state
.finished
= true;
437 if state
.dynamic_writers
.len() != 0 {
438 bail
!("found open index writer - unable to finish backup");
441 if state
.file_counter
== 0 {
442 bail
!("backup does not contain valid files (file count == 0)");
448 pub fn log
<S
: AsRef
<str>>(&self, msg
: S
) {
449 self.worker
.log(msg
);
452 pub fn debug
<S
: AsRef
<str>>(&self, msg
: S
) {
453 if self.debug { self.worker.log(msg); }
456 pub fn format_response(&self, result
: Result
<Value
, Error
>) -> Response
<Body
> {
458 Ok(data
) => (self.formatter
.format_data
)(data
, self),
459 Err(err
) => (self.formatter
.format_error
)(err
),
463 /// Raise error if finished flag is not set
464 pub fn ensure_finished(&self) -> Result
<(), Error
> {
465 let state
= self.state
.lock().unwrap();
467 bail
!("backup ended but finished flag is not set.");
472 /// Remove complete backup
473 pub fn remove_backup(&self) -> Result
<(), Error
> {
474 let mut state
= self.state
.lock().unwrap();
475 state
.finished
= true;
477 self.datastore
.remove_backup_dir(&self.backup_dir
)?
;
483 impl RpcEnvironment
for BackupEnvironment
{
485 fn set_result_attrib(&mut self, name
: &str, value
: Value
) {
486 self.result_attributes
.insert(name
.into(), value
);
489 fn get_result_attrib(&self, name
: &str) -> Option
<&Value
> {
490 self.result_attributes
.get(name
)
493 fn env_type(&self) -> RpcEnvironmentType
{
497 fn set_user(&mut self, _user
: Option
<String
>) {
498 panic
!("unable to change user");
501 fn get_user(&self) -> Option
<String
> {
502 Some(self.user
.clone())
506 impl AsRef
<BackupEnvironment
> for dyn RpcEnvironment
{
507 fn as_ref(&self) -> &BackupEnvironment
{
508 self.as_any().downcast_ref
::<BackupEnvironment
>().unwrap()
// Same downcast helper as above, for boxed trait objects.
// NOTE(review): this definition runs past the end of the visible chunk
// (its closing braces are not in view), so the code is left byte-identical.
512 impl AsRef
<BackupEnvironment
> for Box
<dyn RpcEnvironment
> {
513 fn as_ref(&self) -> &BackupEnvironment
{
514 self.as_any().downcast_ref
::<BackupEnvironment
>().unwrap()