use failure::*;

use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use serde_json::Value;

use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
use crate::server::WorkerTask;
use crate::server::formatter::*;
use crate::backup::*;

use hyper::{Body, Response};
13 struct UploadStatistic
{
20 impl UploadStatistic
{
31 struct DynamicWriterState
{
33 index
: DynamicIndexWriter
,
36 upload_stat
: UploadStatistic
,
39 struct FixedWriterState
{
41 index
: FixedIndexWriter
,
45 upload_stat
: UploadStatistic
,
48 struct SharedBackupState
{
51 file_counter
: usize, // sucessfully uploaded files
52 dynamic_writers
: HashMap
<usize, DynamicWriterState
>,
53 fixed_writers
: HashMap
<usize, FixedWriterState
>,
54 known_chunks
: HashMap
<[u8;32], u32>,
57 impl SharedBackupState
{
59 // Raise error if finished flag is set
60 fn ensure_unfinished(&self) -> Result
<(), Error
> {
62 bail
!("backup already marked as finished.");
67 // Get an unique integer ID
68 pub fn next_uid(&mut self) -> usize {
69 self.uid_counter
+= 1;
75 /// `RpcEnvironmet` implementation for backup service
77 pub struct BackupEnvironment
{
78 env_type
: RpcEnvironmentType
,
79 result_attributes
: HashMap
<String
, Value
>,
82 pub formatter
: &'
static OutputFormatter
,
83 pub worker
: Arc
<WorkerTask
>,
84 pub datastore
: Arc
<DataStore
>,
85 pub backup_dir
: BackupDir
,
86 pub last_backup
: Option
<BackupInfo
>,
87 state
: Arc
<Mutex
<SharedBackupState
>>
90 impl BackupEnvironment
{
92 env_type
: RpcEnvironmentType
,
94 worker
: Arc
<WorkerTask
>,
95 datastore
: Arc
<DataStore
>,
96 backup_dir
: BackupDir
,
99 let state
= SharedBackupState
{
103 dynamic_writers
: HashMap
::new(),
104 fixed_writers
: HashMap
::new(),
105 known_chunks
: HashMap
::new(),
109 result_attributes
: HashMap
::new(),
115 formatter
: &JSON_FORMATTER
,
118 state
: Arc
::new(Mutex
::new(state
)),
122 /// Register a Chunk with associated length.
124 /// We do not fully trust clients, so a client may only use registered
125 /// chunks. Please use this method to register chunks from previous backups.
126 pub fn register_chunk(&self, digest
: [u8; 32], length
: u32) -> Result
<(), Error
> {
127 let mut state
= self.state
.lock().unwrap();
129 state
.ensure_unfinished()?
;
131 state
.known_chunks
.insert(digest
, length
);
136 /// Register fixed length chunks after upload.
138 /// Like `register_chunk()`, but additionally record statistics for
139 /// the fixed index writer.
140 pub fn register_fixed_chunk(
145 compressed_size
: u32,
147 ) -> Result
<(), Error
> {
148 let mut state
= self.state
.lock().unwrap();
150 state
.ensure_unfinished()?
;
152 let mut data
= match state
.fixed_writers
.get_mut(&wid
) {
154 None
=> bail
!("fixed writer '{}' not registered", wid
),
157 if size
!= data
.chunk_size
{
158 bail
!("fixed writer '{}' - got unexpected chunk size ({} != {}", data
.name
, size
, data
.chunk_size
);
162 data
.upload_stat
.count
+= 1;
163 data
.upload_stat
.size
+= size
as u64;
164 data
.upload_stat
.compressed_size
+= compressed_size
as u64;
165 if is_duplicate { data.upload_stat.duplicates += 1; }
168 state
.known_chunks
.insert(digest
, size
);
173 /// Register dynamic length chunks after upload.
175 /// Like `register_chunk()`, but additionally record statistics for
176 /// the dynamic index writer.
177 pub fn register_dynamic_chunk(
182 compressed_size
: u32,
184 ) -> Result
<(), Error
> {
185 let mut state
= self.state
.lock().unwrap();
187 state
.ensure_unfinished()?
;
189 let mut data
= match state
.dynamic_writers
.get_mut(&wid
) {
191 None
=> bail
!("dynamic writer '{}' not registered", wid
),
195 data
.upload_stat
.count
+= 1;
196 data
.upload_stat
.size
+= size
as u64;
197 data
.upload_stat
.compressed_size
+= compressed_size
as u64;
198 if is_duplicate { data.upload_stat.duplicates += 1; }
201 state
.known_chunks
.insert(digest
, size
);
206 pub fn lookup_chunk(&self, digest
: &[u8; 32]) -> Option
<u32> {
207 let state
= self.state
.lock().unwrap();
209 match state
.known_chunks
.get(digest
) {
210 Some(len
) => Some(*len
),
215 /// Store the writer with an unique ID
216 pub fn register_dynamic_writer(&self, index
: DynamicIndexWriter
, name
: String
) -> Result
<usize, Error
> {
217 let mut state
= self.state
.lock().unwrap();
219 state
.ensure_unfinished()?
;
221 let uid
= state
.next_uid();
223 state
.dynamic_writers
.insert(uid
, DynamicWriterState
{
224 index
, name
, offset
: 0, chunk_count
: 0, upload_stat
: UploadStatistic
::new(),
230 /// Store the writer with an unique ID
231 pub fn register_fixed_writer(&self, index
: FixedIndexWriter
, name
: String
, size
: usize, chunk_size
: u32) -> Result
<usize, Error
> {
232 let mut state
= self.state
.lock().unwrap();
234 state
.ensure_unfinished()?
;
236 let uid
= state
.next_uid();
238 state
.fixed_writers
.insert(uid
, FixedWriterState
{
239 index
, name
, chunk_count
: 0, size
, chunk_size
, upload_stat
: UploadStatistic
::new(),
245 /// Append chunk to dynamic writer
246 pub fn dynamic_writer_append_chunk(&self, wid
: usize, offset
: u64, size
: u32, digest
: &[u8; 32]) -> Result
<(), Error
> {
247 let mut state
= self.state
.lock().unwrap();
249 state
.ensure_unfinished()?
;
251 let mut data
= match state
.dynamic_writers
.get_mut(&wid
) {
253 None
=> bail
!("dynamic writer '{}' not registered", wid
),
257 if data
.offset
!= offset
{
258 bail
!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
259 data
.name
, data
.offset
, offset
);
262 data
.offset
+= size
as u64;
263 data
.chunk_count
+= 1;
265 data
.index
.add_chunk(data
.offset
, digest
)?
;
270 /// Append chunk to fixed writer
271 pub fn fixed_writer_append_chunk(&self, wid
: usize, offset
: u64, size
: u32, digest
: &[u8; 32]) -> Result
<(), Error
> {
272 let mut state
= self.state
.lock().unwrap();
274 state
.ensure_unfinished()?
;
276 let mut data
= match state
.fixed_writers
.get_mut(&wid
) {
278 None
=> bail
!("fixed writer '{}' not registered", wid
),
281 data
.chunk_count
+= 1;
283 if size
!= data
.chunk_size
{
284 bail
!("fixed writer '{}' - got unexpected chunk size ({} != {}", data
.name
, size
, data
.chunk_size
);
287 let pos
= (offset
as usize)/(data
.chunk_size
as usize);
288 data
.index
.add_digest(pos
, digest
)?
;
293 fn log_upload_stat(&self, archive_name
: &str, size
: u64, chunk_count
: u64, upload_stat
: &UploadStatistic
) {
294 self.log(format
!("Upload statistics for '{}'", archive_name
));
295 self.log(format
!("Size: {}", size
));
296 self.log(format
!("Chunk count: {}", chunk_count
));
297 self.log(format
!("Upload size: {} ({}%)", upload_stat
.size
, (upload_stat
.size
*100)/size
));
298 if upload_stat
.size
> 0 {
299 self.log(format
!("Compression: {}%", (upload_stat
.compressed_size
*100)/upload_stat
.size
));
303 /// Close dynamic writer
304 pub fn dynamic_writer_close(&self, wid
: usize, chunk_count
: u64, size
: u64) -> Result
<(), Error
> {
305 let mut state
= self.state
.lock().unwrap();
307 state
.ensure_unfinished()?
;
309 let mut data
= match state
.dynamic_writers
.remove(&wid
) {
311 None
=> bail
!("dynamic writer '{}' not registered", wid
),
314 if data
.chunk_count
!= chunk_count
{
315 bail
!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data
.name
, data
.chunk_count
, chunk_count
);
318 if data
.offset
!= size
{
319 bail
!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data
.name
, data
.offset
, size
);
324 self.log_upload_stat(&data
.name
, size
, chunk_count
, &data
.upload_stat
);
326 state
.file_counter
+= 1;
331 /// Close fixed writer
332 pub fn fixed_writer_close(&self, wid
: usize, chunk_count
: u64, size
: u64) -> Result
<(), Error
> {
333 let mut state
= self.state
.lock().unwrap();
335 state
.ensure_unfinished()?
;
337 let mut data
= match state
.fixed_writers
.remove(&wid
) {
339 None
=> bail
!("fixed writer '{}' not registered", wid
),
342 if data
.chunk_count
!= chunk_count
{
343 bail
!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data
.name
, data
.chunk_count
, chunk_count
);
346 let expected_count
= data
.index
.index_length();
348 if chunk_count
!= (expected_count
as u64) {
349 bail
!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data
.name
, expected_count
, chunk_count
);
352 if size
!= (data
.size
as u64) {
353 bail
!("fixed writer '{}' close failed - unexpected file size ({} != {})", data
.name
, data
.size
, size
);
358 self.log_upload_stat(&data
.name
, size
, chunk_count
, &data
.upload_stat
);
360 state
.file_counter
+= 1;
365 /// Mark backup as finished
366 pub fn finish_backup(&self) -> Result
<(), Error
> {
367 let mut state
= self.state
.lock().unwrap();
368 // test if all writer are correctly closed
370 state
.ensure_unfinished()?
;
372 state
.finished
= true;
374 if state
.dynamic_writers
.len() != 0 {
375 bail
!("found open index writer - unable to finish backup");
378 if state
.file_counter
== 0 {
379 bail
!("backup does not contain valid files (file count == 0)");
385 pub fn log
<S
: AsRef
<str>>(&self, msg
: S
) {
386 self.worker
.log(msg
);
389 pub fn debug
<S
: AsRef
<str>>(&self, msg
: S
) {
390 if self.debug { self.worker.log(msg); }
393 pub fn format_response(&self, result
: Result
<Value
, Error
>) -> Response
<Body
> {
395 Ok(data
) => (self.formatter
.format_data
)(data
, self),
396 Err(err
) => (self.formatter
.format_error
)(err
),
400 /// Raise error if finished flag is not set
401 pub fn ensure_finished(&self) -> Result
<(), Error
> {
402 let state
= self.state
.lock().unwrap();
404 bail
!("backup ended but finished flag is not set.");
409 /// Remove complete backup
410 pub fn remove_backup(&self) -> Result
<(), Error
> {
411 let mut state
= self.state
.lock().unwrap();
412 state
.finished
= true;
414 self.datastore
.remove_backup_dir(&self.backup_dir
)?
;
420 impl RpcEnvironment
for BackupEnvironment
{
422 fn set_result_attrib(&mut self, name
: &str, value
: Value
) {
423 self.result_attributes
.insert(name
.into(), value
);
426 fn get_result_attrib(&self, name
: &str) -> Option
<&Value
> {
427 self.result_attributes
.get(name
)
430 fn env_type(&self) -> RpcEnvironmentType
{
434 fn set_user(&mut self, _user
: Option
<String
>) {
435 panic
!("unable to change user");
438 fn get_user(&self) -> Option
<String
> {
439 Some(self.user
.clone())
443 impl AsRef
<BackupEnvironment
> for RpcEnvironment
{
444 fn as_ref(&self) -> &BackupEnvironment
{
445 self.as_any().downcast_ref
::<BackupEnvironment
>().unwrap()
448 impl AsRef
<BackupEnvironment
> for Box
<RpcEnvironment
> {
449 fn as_ref(&self) -> &BackupEnvironment
{
450 self.as_any().downcast_ref
::<BackupEnvironment
>().unwrap()