]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/backup/environment.rs
src/backup/chunk_store.rs: coding style fixes
[proxmox-backup.git] / src / api2 / backup / environment.rs
CommitLineData
b4b63e52 1use failure::*;
f9578f3c 2use std::sync::{Arc, Mutex};
d95ced64
DM
3use std::collections::HashMap;
4
5use serde_json::Value;
6
bffd40d6 7use proxmox::tools;
00388226 8
d95ced64
DM
9use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
10use crate::server::WorkerTask;
21ee7912 11use crate::backup::*;
b4b63e52
DM
12use crate::server::formatter::*;
13use hyper::{Body, Response};
d95ced64 14
96482891
DM
15struct UploadStatistic {
16 count: u64,
17 size: u64,
18 compressed_size: u64,
19 duplicates: u64,
20}
21
22impl UploadStatistic {
23 fn new() -> Self {
24 Self {
25 count: 0,
26 size: 0,
27 compressed_size: 0,
28 duplicates: 0,
29 }
30 }
31}
8bea85b4
DM
32
33struct DynamicWriterState {
34 name: String,
35 index: DynamicIndexWriter,
36 offset: u64,
37 chunk_count: u64,
96482891 38 upload_stat: UploadStatistic,
8bea85b4
DM
39}
40
a42fa400
DM
41struct FixedWriterState {
42 name: String,
43 index: FixedIndexWriter,
44 size: usize,
45 chunk_size: u32,
46 chunk_count: u64,
96482891 47 upload_stat: UploadStatistic,
a42fa400
DM
48}
49
f9578f3c 50struct SharedBackupState {
372724af 51 finished: bool,
f9578f3c 52 uid_counter: usize,
e6389f4e 53 file_counter: usize, // sucessfully uploaded files
8bea85b4 54 dynamic_writers: HashMap<usize, DynamicWriterState>,
a42fa400 55 fixed_writers: HashMap<usize, FixedWriterState>,
a09c0e38 56 known_chunks: HashMap<[u8;32], u32>,
f9578f3c
DM
57}
58
372724af
DM
59impl SharedBackupState {
60
61 // Raise error if finished flag is set
62 fn ensure_unfinished(&self) -> Result<(), Error> {
63 if self.finished {
64 bail!("backup already marked as finished.");
65 }
66 Ok(())
67 }
68
69 // Get an unique integer ID
70 pub fn next_uid(&mut self) -> usize {
71 self.uid_counter += 1;
72 self.uid_counter
73 }
74}
75
76
d95ced64
DM
77/// `RpcEnvironmet` implementation for backup service
78#[derive(Clone)]
79pub struct BackupEnvironment {
80 env_type: RpcEnvironmentType,
81 result_attributes: HashMap<String, Value>,
82 user: String,
a42d1f55 83 pub debug: bool,
b4b63e52 84 pub formatter: &'static OutputFormatter,
21ee7912
DM
85 pub worker: Arc<WorkerTask>,
86 pub datastore: Arc<DataStore>,
f9578f3c 87 pub backup_dir: BackupDir,
b02a52e3 88 pub last_backup: Option<BackupInfo>,
f9578f3c 89 state: Arc<Mutex<SharedBackupState>>
d95ced64
DM
90}
91
92impl BackupEnvironment {
f9578f3c
DM
93 pub fn new(
94 env_type: RpcEnvironmentType,
95 user: String,
96 worker: Arc<WorkerTask>,
97 datastore: Arc<DataStore>,
98 backup_dir: BackupDir,
f9578f3c
DM
99 ) -> Self {
100
101 let state = SharedBackupState {
372724af 102 finished: false,
f9578f3c 103 uid_counter: 0,
e6389f4e 104 file_counter: 0,
f9578f3c 105 dynamic_writers: HashMap::new(),
a42fa400 106 fixed_writers: HashMap::new(),
a09c0e38 107 known_chunks: HashMap::new(),
f9578f3c
DM
108 };
109
d95ced64
DM
110 Self {
111 result_attributes: HashMap::new(),
112 env_type,
113 user,
114 worker,
21ee7912 115 datastore,
a42d1f55 116 debug: false,
b4b63e52 117 formatter: &JSON_FORMATTER,
f9578f3c 118 backup_dir,
b02a52e3 119 last_backup: None,
f9578f3c 120 state: Arc::new(Mutex::new(state)),
d95ced64
DM
121 }
122 }
123
96482891
DM
124 /// Register a Chunk with associated length.
125 ///
126 /// We do not fully trust clients, so a client may only use registered
127 /// chunks. Please use this method to register chunks from previous backups.
a09c0e38
DM
128 pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
129 let mut state = self.state.lock().unwrap();
130
131 state.ensure_unfinished()?;
132
133 state.known_chunks.insert(digest, length);
134
135 Ok(())
136 }
137
96482891
DM
138 /// Register fixed length chunks after upload.
139 ///
140 /// Like `register_chunk()`, but additionally record statistics for
141 /// the fixed index writer.
642322b4
DM
142 pub fn register_fixed_chunk(
143 &self,
144 wid: usize,
145 digest: [u8; 32],
146 size: u32,
147 compressed_size: u32,
148 is_duplicate: bool,
149 ) -> Result<(), Error> {
150 let mut state = self.state.lock().unwrap();
151
152 state.ensure_unfinished()?;
153
154 let mut data = match state.fixed_writers.get_mut(&wid) {
155 Some(data) => data,
156 None => bail!("fixed writer '{}' not registered", wid),
157 };
158
159 if size != data.chunk_size {
160 bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size);
161 }
162
96482891
DM
163 // record statistics
164 data.upload_stat.count += 1;
165 data.upload_stat.size += size as u64;
166 data.upload_stat.compressed_size += compressed_size as u64;
167 if is_duplicate { data.upload_stat.duplicates += 1; }
168
169 // register chunk
642322b4
DM
170 state.known_chunks.insert(digest, size);
171
172 Ok(())
173 }
174
96482891
DM
175 /// Register dynamic length chunks after upload.
176 ///
177 /// Like `register_chunk()`, but additionally record statistics for
178 /// the dynamic index writer.
642322b4
DM
179 pub fn register_dynamic_chunk(
180 &self,
181 wid: usize,
182 digest: [u8; 32],
183 size: u32,
184 compressed_size: u32,
185 is_duplicate: bool,
186 ) -> Result<(), Error> {
187 let mut state = self.state.lock().unwrap();
188
189 state.ensure_unfinished()?;
190
191 let mut data = match state.dynamic_writers.get_mut(&wid) {
192 Some(data) => data,
193 None => bail!("dynamic writer '{}' not registered", wid),
194 };
195
96482891
DM
196 // record statistics
197 data.upload_stat.count += 1;
198 data.upload_stat.size += size as u64;
199 data.upload_stat.compressed_size += compressed_size as u64;
200 if is_duplicate { data.upload_stat.duplicates += 1; }
201
202 // register chunk
642322b4
DM
203 state.known_chunks.insert(digest, size);
204
205 Ok(())
206 }
207
a09c0e38
DM
208 pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
209 let state = self.state.lock().unwrap();
210
211 match state.known_chunks.get(digest) {
212 Some(len) => Some(*len),
213 None => None,
214 }
215 }
216
372724af 217 /// Store the writer with an unique ID
8bea85b4 218 pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
f9578f3c 219 let mut state = self.state.lock().unwrap();
f9578f3c 220
372724af
DM
221 state.ensure_unfinished()?;
222
223 let uid = state.next_uid();
f9578f3c 224
8bea85b4 225 state.dynamic_writers.insert(uid, DynamicWriterState {
96482891 226 index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
8bea85b4 227 });
372724af
DM
228
229 Ok(uid)
f9578f3c
DM
230 }
231
a42fa400
DM
232 /// Store the writer with an unique ID
233 pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
234 let mut state = self.state.lock().unwrap();
235
236 state.ensure_unfinished()?;
237
238 let uid = state.next_uid();
239
240 state.fixed_writers.insert(uid, FixedWriterState {
96482891 241 index, name, chunk_count: 0, size, chunk_size, upload_stat: UploadStatistic::new(),
a42fa400
DM
242 });
243
244 Ok(uid)
245 }
246
f9578f3c 247 /// Append chunk to dynamic writer
417cb073 248 pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
f9578f3c
DM
249 let mut state = self.state.lock().unwrap();
250
372724af
DM
251 state.ensure_unfinished()?;
252
f9578f3c
DM
253 let mut data = match state.dynamic_writers.get_mut(&wid) {
254 Some(data) => data,
255 None => bail!("dynamic writer '{}' not registered", wid),
256 };
257
f9578f3c 258
417cb073
DM
259 if data.offset != offset {
260 bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
261 data.name, data.offset, offset);
262 }
263
3dc5b2a2
DM
264 data.offset += size as u64;
265 data.chunk_count += 1;
266
8bea85b4 267 data.index.add_chunk(data.offset, digest)?;
f9578f3c
DM
268
269 Ok(())
270 }
271
a42fa400
DM
272 /// Append chunk to fixed writer
273 pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
274 let mut state = self.state.lock().unwrap();
275
276 state.ensure_unfinished()?;
277
278 let mut data = match state.fixed_writers.get_mut(&wid) {
279 Some(data) => data,
280 None => bail!("fixed writer '{}' not registered", wid),
281 };
282
283 data.chunk_count += 1;
284
285 if size != data.chunk_size {
286 bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size);
287 }
288
289 let pos = (offset as usize)/(data.chunk_size as usize);
290 data.index.add_digest(pos, digest)?;
291
292 Ok(())
293 }
294
00388226 295 fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
96482891 296 self.log(format!("Upload statistics for '{}'", archive_name));
00388226
DM
297 self.log(format!("UUID: {}", tools::digest_to_hex(uuid)));
298 self.log(format!("Checksum: {}", tools::digest_to_hex(csum)));
96482891
DM
299 self.log(format!("Size: {}", size));
300 self.log(format!("Chunk count: {}", chunk_count));
36075475
DM
301
302 if size == 0 || chunk_count == 0 {
303 return;
304 }
305
96482891 306 self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));
36075475
DM
307
308 let client_side_duplicates = chunk_count - upload_stat.count;
309 let server_side_duplicates = upload_stat.duplicates;
310
311 if (client_side_duplicates + server_side_duplicates) > 0 {
312 let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
313 self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
314 }
315
96482891 316 if upload_stat.size > 0 {
36075475 317 self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
96482891
DM
318 }
319 }
320
a2077252 321 /// Close dynamic writer
8bea85b4 322 pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
a2077252
DM
323 let mut state = self.state.lock().unwrap();
324
372724af
DM
325 state.ensure_unfinished()?;
326
a2077252
DM
327 let mut data = match state.dynamic_writers.remove(&wid) {
328 Some(data) => data,
329 None => bail!("dynamic writer '{}' not registered", wid),
330 };
331
8bea85b4
DM
332 if data.chunk_count != chunk_count {
333 bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
334 }
335
336 if data.offset != size {
337 bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
338 }
339
00388226
DM
340 let uuid = data.index.uuid;
341
342 let csum = data.index.close()?;
a2077252 343
00388226 344 self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 345
e6389f4e
DM
346 state.file_counter += 1;
347
a2077252
DM
348 Ok(())
349 }
350
a42fa400
DM
351 /// Close fixed writer
352 pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
353 let mut state = self.state.lock().unwrap();
354
355 state.ensure_unfinished()?;
356
357 let mut data = match state.fixed_writers.remove(&wid) {
358 Some(data) => data,
359 None => bail!("fixed writer '{}' not registered", wid),
360 };
361
362 if data.chunk_count != chunk_count {
006f3ff4 363 bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
a42fa400
DM
364 }
365
006f3ff4
DM
366 let expected_count = data.index.index_length();
367
368 if chunk_count != (expected_count as u64) {
369 bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
370 }
371
372 if size != (data.size as u64) {
373 bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
374 }
a42fa400 375
00388226
DM
376 let uuid = data.index.uuid;
377
378 let csum = data.index.close()?;
a42fa400 379
00388226 380 self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 381
e6389f4e
DM
382 state.file_counter += 1;
383
a42fa400
DM
384 Ok(())
385 }
386
46bd8800
DM
387 pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {
388
389 let mut path = self.datastore.base_path();
390 path.push(self.backup_dir.relative_path());
391 path.push(file_name);
392
393 let blob_len = data.len();
394 let orig_len = data.len(); // fixme:
395
396 let mut blob = DataBlob::from_raw(data)?;
397 // always comput CRC at server side
398 blob.set_crc(blob.compute_crc());
399
400 let raw_data = blob.raw_data();
401 crate::tools::file_set_contents(&path, raw_data, None)?;
402
403 self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));
404
405 let mut state = self.state.lock().unwrap();
406 state.file_counter += 1;
407
408 Ok(())
409 }
410
372724af
DM
411 /// Mark backup as finished
412 pub fn finish_backup(&self) -> Result<(), Error> {
413 let mut state = self.state.lock().unwrap();
414 // test if all writer are correctly closed
415
416 state.ensure_unfinished()?;
417
418 state.finished = true;
419
420 if state.dynamic_writers.len() != 0 {
421 bail!("found open index writer - unable to finish backup");
422 }
423
e6389f4e
DM
424 if state.file_counter == 0 {
425 bail!("backup does not contain valid files (file count == 0)");
426 }
427
372724af
DM
428 Ok(())
429 }
430
d95ced64
DM
431 pub fn log<S: AsRef<str>>(&self, msg: S) {
432 self.worker.log(msg);
433 }
b4b63e52 434
a42d1f55
DM
435 pub fn debug<S: AsRef<str>>(&self, msg: S) {
436 if self.debug { self.worker.log(msg); }
437 }
438
b4b63e52
DM
439 pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
440 match result {
441 Ok(data) => (self.formatter.format_data)(data, self),
442 Err(err) => (self.formatter.format_error)(err),
443 }
444 }
372724af
DM
445
446 /// Raise error if finished flag is not set
447 pub fn ensure_finished(&self) -> Result<(), Error> {
448 let state = self.state.lock().unwrap();
449 if !state.finished {
450 bail!("backup ended but finished flag is not set.");
451 }
452 Ok(())
453 }
454
455 /// Remove complete backup
456 pub fn remove_backup(&self) -> Result<(), Error> {
457 let mut state = self.state.lock().unwrap();
458 state.finished = true;
459
460 self.datastore.remove_backup_dir(&self.backup_dir)?;
461
462 Ok(())
463 }
d95ced64
DM
464}
465
466impl RpcEnvironment for BackupEnvironment {
467
468 fn set_result_attrib(&mut self, name: &str, value: Value) {
469 self.result_attributes.insert(name.into(), value);
470 }
471
472 fn get_result_attrib(&self, name: &str) -> Option<&Value> {
473 self.result_attributes.get(name)
474 }
475
476 fn env_type(&self) -> RpcEnvironmentType {
477 self.env_type
478 }
479
480 fn set_user(&mut self, _user: Option<String>) {
481 panic!("unable to change user");
482 }
483
484 fn get_user(&self) -> Option<String> {
485 Some(self.user.clone())
486 }
487}
21ee7912 488
dd5495d6 489impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
21ee7912
DM
490 fn as_ref(&self) -> &BackupEnvironment {
491 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
492 }
493}
dd5495d6
WB
494
495impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
b4b63e52
DM
496 fn as_ref(&self) -> &BackupEnvironment {
497 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
498 }
499}