2 use std
::sync
::{Arc, Mutex}
;
3 use std
::collections
::HashMap
;
12 use crate::api_schema
::router
::{RpcEnvironment, RpcEnvironmentType}
;
13 use crate::server
::WorkerTask
;
15 use crate::server
::formatter
::*;
16 use hyper
::{Body, Response}
;
/// Per-writer upload statistics, accumulated while chunks are registered
/// and reported by `log_upload_stat()` when a writer is closed.
struct UploadStatistic {
    /// Number of chunks actually uploaded by the client.
    count: u64,
    /// Uncompressed payload bytes uploaded.
    size: u64,
    /// Compressed payload bytes uploaded.
    compressed_size: u64,
    /// Chunks the server already knew (server-side duplicates).
    duplicates: u64,
}

impl UploadStatistic {
    /// Start with all counters at zero.
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}
36 struct DynamicWriterState
{
38 index
: DynamicIndexWriter
,
41 upload_stat
: UploadStatistic
,
44 struct FixedWriterState
{
46 index
: FixedIndexWriter
,
50 small_chunk_count
: usize, // allow 0..1 small chunks (last chunk may be smaller)
51 upload_stat
: UploadStatistic
,
54 struct SharedBackupState
{
57 file_counter
: usize, // sucessfully uploaded files
58 dynamic_writers
: HashMap
<usize, DynamicWriterState
>,
59 fixed_writers
: HashMap
<usize, FixedWriterState
>,
60 known_chunks
: HashMap
<[u8;32], u32>,
63 impl SharedBackupState
{
65 // Raise error if finished flag is set
66 fn ensure_unfinished(&self) -> Result
<(), Error
> {
68 bail
!("backup already marked as finished.");
73 // Get an unique integer ID
74 pub fn next_uid(&mut self) -> usize {
75 self.uid_counter
+= 1;
81 /// `RpcEnvironmet` implementation for backup service
83 pub struct BackupEnvironment
{
84 env_type
: RpcEnvironmentType
,
85 result_attributes
: HashMap
<String
, Value
>,
88 pub formatter
: &'
static OutputFormatter
,
89 pub worker
: Arc
<WorkerTask
>,
90 pub datastore
: Arc
<DataStore
>,
91 pub backup_dir
: BackupDir
,
92 pub last_backup
: Option
<BackupInfo
>,
93 state
: Arc
<Mutex
<SharedBackupState
>>
96 impl BackupEnvironment
{
98 env_type
: RpcEnvironmentType
,
100 worker
: Arc
<WorkerTask
>,
101 datastore
: Arc
<DataStore
>,
102 backup_dir
: BackupDir
,
105 let state
= SharedBackupState
{
109 dynamic_writers
: HashMap
::new(),
110 fixed_writers
: HashMap
::new(),
111 known_chunks
: HashMap
::new(),
115 result_attributes
: HashMap
::new(),
121 formatter
: &JSON_FORMATTER
,
124 state
: Arc
::new(Mutex
::new(state
)),
128 /// Register a Chunk with associated length.
130 /// We do not fully trust clients, so a client may only use registered
131 /// chunks. Please use this method to register chunks from previous backups.
132 pub fn register_chunk(&self, digest
: [u8; 32], length
: u32) -> Result
<(), Error
> {
133 let mut state
= self.state
.lock().unwrap();
135 state
.ensure_unfinished()?
;
137 state
.known_chunks
.insert(digest
, length
);
142 /// Register fixed length chunks after upload.
144 /// Like `register_chunk()`, but additionally record statistics for
145 /// the fixed index writer.
146 pub fn register_fixed_chunk(
151 compressed_size
: u32,
153 ) -> Result
<(), Error
> {
154 let mut state
= self.state
.lock().unwrap();
156 state
.ensure_unfinished()?
;
158 let mut data
= match state
.fixed_writers
.get_mut(&wid
) {
160 None
=> bail
!("fixed writer '{}' not registered", wid
),
163 if size
> data
.chunk_size
{
164 bail
!("fixed writer '{}' - got large chunk ({} > {}", data
.name
, size
, data
.chunk_size
);
165 } else if size
< data
.chunk_size
{
166 data
.small_chunk_count
+= 1;
167 if data
.small_chunk_count
> 1 {
168 bail
!("fixed writer '{}' - detected multiple end chunks (chunk size too small)");
173 data
.upload_stat
.count
+= 1;
174 data
.upload_stat
.size
+= size
as u64;
175 data
.upload_stat
.compressed_size
+= compressed_size
as u64;
176 if is_duplicate { data.upload_stat.duplicates += 1; }
179 state
.known_chunks
.insert(digest
, size
);
184 /// Register dynamic length chunks after upload.
186 /// Like `register_chunk()`, but additionally record statistics for
187 /// the dynamic index writer.
188 pub fn register_dynamic_chunk(
193 compressed_size
: u32,
195 ) -> Result
<(), Error
> {
196 let mut state
= self.state
.lock().unwrap();
198 state
.ensure_unfinished()?
;
200 let mut data
= match state
.dynamic_writers
.get_mut(&wid
) {
202 None
=> bail
!("dynamic writer '{}' not registered", wid
),
206 data
.upload_stat
.count
+= 1;
207 data
.upload_stat
.size
+= size
as u64;
208 data
.upload_stat
.compressed_size
+= compressed_size
as u64;
209 if is_duplicate { data.upload_stat.duplicates += 1; }
212 state
.known_chunks
.insert(digest
, size
);
217 pub fn lookup_chunk(&self, digest
: &[u8; 32]) -> Option
<u32> {
218 let state
= self.state
.lock().unwrap();
220 match state
.known_chunks
.get(digest
) {
221 Some(len
) => Some(*len
),
226 /// Store the writer with an unique ID
227 pub fn register_dynamic_writer(&self, index
: DynamicIndexWriter
, name
: String
) -> Result
<usize, Error
> {
228 let mut state
= self.state
.lock().unwrap();
230 state
.ensure_unfinished()?
;
232 let uid
= state
.next_uid();
234 state
.dynamic_writers
.insert(uid
, DynamicWriterState
{
235 index
, name
, offset
: 0, chunk_count
: 0, upload_stat
: UploadStatistic
::new(),
241 /// Store the writer with an unique ID
242 pub fn register_fixed_writer(&self, index
: FixedIndexWriter
, name
: String
, size
: usize, chunk_size
: u32) -> Result
<usize, Error
> {
243 let mut state
= self.state
.lock().unwrap();
245 state
.ensure_unfinished()?
;
247 let uid
= state
.next_uid();
249 state
.fixed_writers
.insert(uid
, FixedWriterState
{
250 index
, name
, chunk_count
: 0, size
, chunk_size
, small_chunk_count
: 0, upload_stat
: UploadStatistic
::new(),
256 /// Append chunk to dynamic writer
257 pub fn dynamic_writer_append_chunk(&self, wid
: usize, offset
: u64, size
: u32, digest
: &[u8; 32]) -> Result
<(), Error
> {
258 let mut state
= self.state
.lock().unwrap();
260 state
.ensure_unfinished()?
;
262 let mut data
= match state
.dynamic_writers
.get_mut(&wid
) {
264 None
=> bail
!("dynamic writer '{}' not registered", wid
),
268 if data
.offset
!= offset
{
269 bail
!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
270 data
.name
, data
.offset
, offset
);
273 data
.offset
+= size
as u64;
274 data
.chunk_count
+= 1;
276 data
.index
.add_chunk(data
.offset
, digest
)?
;
281 /// Append chunk to fixed writer
282 pub fn fixed_writer_append_chunk(&self, wid
: usize, offset
: u64, size
: u32, digest
: &[u8; 32]) -> Result
<(), Error
> {
283 let mut state
= self.state
.lock().unwrap();
285 state
.ensure_unfinished()?
;
287 let mut data
= match state
.fixed_writers
.get_mut(&wid
) {
289 None
=> bail
!("fixed writer '{}' not registered", wid
),
292 let end
= (offset
as usize) + (size
as usize);
293 let idx
= data
.index
.check_chunk_alignment(end
, size
as usize)?
;
295 data
.chunk_count
+= 1;
297 data
.index
.add_digest(idx
, digest
)?
;
302 fn log_upload_stat(&self, archive_name
: &str, csum
: &[u8; 32], uuid
: &[u8; 16], size
: u64, chunk_count
: u64, upload_stat
: &UploadStatistic
) {
303 self.log(format
!("Upload statistics for '{}'", archive_name
));
304 self.log(format
!("UUID: {}", digest_to_hex(uuid
)));
305 self.log(format
!("Checksum: {}", digest_to_hex(csum
)));
306 self.log(format
!("Size: {}", size
));
307 self.log(format
!("Chunk count: {}", chunk_count
));
309 if size
== 0 || chunk_count
== 0 {
313 self.log(format
!("Upload size: {} ({}%)", upload_stat
.size
, (upload_stat
.size
*100)/size
));
315 let client_side_duplicates
= chunk_count
- upload_stat
.count
;
316 let server_side_duplicates
= upload_stat
.duplicates
;
318 if (client_side_duplicates
+ server_side_duplicates
) > 0 {
319 let per
= (client_side_duplicates
+ server_side_duplicates
)*100/chunk_count
;
320 self.log(format
!("Duplicates: {}+{} ({}%)", client_side_duplicates
, server_side_duplicates
, per
));
323 if upload_stat
.size
> 0 {
324 self.log(format
!("Compression: {}%", (upload_stat
.compressed_size
*100)/upload_stat
.size
));
328 /// Close dynamic writer
329 pub fn dynamic_writer_close(&self, wid
: usize, chunk_count
: u64, size
: u64) -> Result
<(), Error
> {
330 let mut state
= self.state
.lock().unwrap();
332 state
.ensure_unfinished()?
;
334 let mut data
= match state
.dynamic_writers
.remove(&wid
) {
336 None
=> bail
!("dynamic writer '{}' not registered", wid
),
339 if data
.chunk_count
!= chunk_count
{
340 bail
!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data
.name
, data
.chunk_count
, chunk_count
);
343 if data
.offset
!= size
{
344 bail
!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data
.name
, data
.offset
, size
);
347 let uuid
= data
.index
.uuid
;
349 let csum
= data
.index
.close()?
;
351 self.log_upload_stat(&data
.name
, &csum
, &uuid
, size
, chunk_count
, &data
.upload_stat
);
353 state
.file_counter
+= 1;
358 /// Close fixed writer
359 pub fn fixed_writer_close(&self, wid
: usize, chunk_count
: u64, size
: u64) -> Result
<(), Error
> {
360 let mut state
= self.state
.lock().unwrap();
362 state
.ensure_unfinished()?
;
364 let mut data
= match state
.fixed_writers
.remove(&wid
) {
366 None
=> bail
!("fixed writer '{}' not registered", wid
),
369 if data
.chunk_count
!= chunk_count
{
370 bail
!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data
.name
, data
.chunk_count
, chunk_count
);
373 let expected_count
= data
.index
.index_length();
375 if chunk_count
!= (expected_count
as u64) {
376 bail
!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data
.name
, expected_count
, chunk_count
);
379 if size
!= (data
.size
as u64) {
380 bail
!("fixed writer '{}' close failed - unexpected file size ({} != {})", data
.name
, data
.size
, size
);
383 let uuid
= data
.index
.uuid
;
385 let csum
= data
.index
.close()?
;
387 self.log_upload_stat(&data
.name
, &csum
, &uuid
, size
, chunk_count
, &data
.upload_stat
);
389 state
.file_counter
+= 1;
394 pub fn add_blob(&self, file_name
: &str, data
: Vec
<u8>) -> Result
<(), Error
> {
396 let mut path
= self.datastore
.base_path();
397 path
.push(self.backup_dir
.relative_path());
398 path
.push(file_name
);
400 let blob_len
= data
.len();
401 let orig_len
= data
.len(); // fixme:
403 let mut blob
= DataBlob
::from_raw(data
)?
;
404 // always comput CRC at server side
405 blob
.set_crc(blob
.compute_crc());
407 let raw_data
= blob
.raw_data();
408 file_set_contents(&path
, raw_data
, None
)?
;
410 self.log(format
!("add blob {:?} ({} bytes, comp: {})", path
, orig_len
, blob_len
));
412 let mut state
= self.state
.lock().unwrap();
413 state
.file_counter
+= 1;
418 /// Mark backup as finished
419 pub fn finish_backup(&self) -> Result
<(), Error
> {
420 let mut state
= self.state
.lock().unwrap();
421 // test if all writer are correctly closed
423 state
.ensure_unfinished()?
;
425 state
.finished
= true;
427 if state
.dynamic_writers
.len() != 0 {
428 bail
!("found open index writer - unable to finish backup");
431 if state
.file_counter
== 0 {
432 bail
!("backup does not contain valid files (file count == 0)");
438 pub fn log
<S
: AsRef
<str>>(&self, msg
: S
) {
439 self.worker
.log(msg
);
442 pub fn debug
<S
: AsRef
<str>>(&self, msg
: S
) {
443 if self.debug { self.worker.log(msg); }
446 pub fn format_response(&self, result
: Result
<Value
, Error
>) -> Response
<Body
> {
448 Ok(data
) => (self.formatter
.format_data
)(data
, self),
449 Err(err
) => (self.formatter
.format_error
)(err
),
453 /// Raise error if finished flag is not set
454 pub fn ensure_finished(&self) -> Result
<(), Error
> {
455 let state
= self.state
.lock().unwrap();
457 bail
!("backup ended but finished flag is not set.");
462 /// Remove complete backup
463 pub fn remove_backup(&self) -> Result
<(), Error
> {
464 let mut state
= self.state
.lock().unwrap();
465 state
.finished
= true;
467 self.datastore
.remove_backup_dir(&self.backup_dir
)?
;
473 impl RpcEnvironment
for BackupEnvironment
{
475 fn set_result_attrib(&mut self, name
: &str, value
: Value
) {
476 self.result_attributes
.insert(name
.into(), value
);
479 fn get_result_attrib(&self, name
: &str) -> Option
<&Value
> {
480 self.result_attributes
.get(name
)
483 fn env_type(&self) -> RpcEnvironmentType
{
487 fn set_user(&mut self, _user
: Option
<String
>) {
488 panic
!("unable to change user");
491 fn get_user(&self) -> Option
<String
> {
492 Some(self.user
.clone())
496 impl AsRef
<BackupEnvironment
> for dyn RpcEnvironment
{
497 fn as_ref(&self) -> &BackupEnvironment
{
498 self.as_any().downcast_ref
::<BackupEnvironment
>().unwrap()
502 impl AsRef
<BackupEnvironment
> for Box
<dyn RpcEnvironment
> {
503 fn as_ref(&self) -> &BackupEnvironment
{
504 self.as_any().downcast_ref
::<BackupEnvironment
>().unwrap()