// src/api2/backup/environment.rs

use failure::*;
use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use serde_json::Value;

use proxmox::tools;

use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
use crate::server::WorkerTask;
use crate::backup::*;
use crate::server::formatter::*;
use hyper::{Body, Response};

struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}

struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,
    chunk_count: u64,
    upload_stat: UploadStatistic,
}

struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,
    chunk_size: u32,
    chunk_count: u64,
    small_chunk_count: usize, // allow 0..1 small chunks (only the last chunk may be smaller)
    upload_stat: UploadStatistic,
}

struct SharedBackupState {
    finished: bool,
    uid_counter: usize,
    file_counter: usize, // successfully uploaded files
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    known_chunks: HashMap<[u8; 32], u32>,
}

impl SharedBackupState {

    // Raise an error if the finished flag is set
    fn ensure_unfinished(&self) -> Result<(), Error> {
        if self.finished {
            bail!("backup already marked as finished.");
        }
        Ok(())
    }

    // Get a unique integer ID
    pub fn next_uid(&mut self) -> usize {
        self.uid_counter += 1;
        self.uid_counter
    }
}

/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: HashMap<String, Value>,
    user: String,
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>,
}

impl BackupEnvironment {
    pub fn new(
        env_type: RpcEnvironmentType,
        user: String,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {

        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
        };

        Self {
            result_attributes: HashMap::new(),
            env_type,
            user,
            worker,
            datastore,
            debug: false,
            formatter: &JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Register a chunk with associated length.
    ///
    /// We do not fully trust clients, so a client may only use registered
    /// chunks. Please use this method to register chunks from previous backups.
    pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        state.known_chunks.insert(digest, length);

        Ok(())
    }
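
    // Usage sketch (hypothetical bindings `env: &BackupEnvironment`, `digest: [u8; 32]`,
    // `size: u32`, typically taken from an index of the last backup):
    //
    //     env.register_chunk(digest, size)?;
    //     assert_eq!(env.lookup_chunk(&digest), Some(size));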

    /// Register fixed length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the fixed index writer.
    pub fn register_fixed_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if size > data.chunk_size {
            bail!("fixed writer '{}' - got large chunk ({} > {})", data.name, size, data.chunk_size);
        } else if size < data.chunk_size {
            data.small_chunk_count += 1;
            if data.small_chunk_count > 1 {
                bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)", data.name);
            }
        }

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    /// Register dynamic length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the dynamic index writer.
    pub fn register_dynamic_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
        let state = self.state.lock().unwrap();

        match state.known_chunks.get(digest) {
            Some(len) => Some(*len),
            None => None,
        }
    }

    /// Store the writer with a unique ID
    pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.dynamic_writers.insert(uid, DynamicWriterState {
            index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }
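
    // Lifecycle sketch for a dynamic archive upload (hypothetical bindings;
    // `env: &BackupEnvironment`, an `index: DynamicIndexWriter`, and per-chunk
    // values taken from the client's requests):
    //
    //     let wid = env.register_dynamic_writer(index, "archive.didx".to_string())?;
    //     // for each uploaded chunk:
    //     env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
    //     env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
    //     // once the client closes the archive:
    //     env.dynamic_writer_close(wid, chunk_count, total_size)?;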

    /// Store the writer with a unique ID
    pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.fixed_writers.insert(uid, FixedWriterState {
            index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }
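
    // Analogous sketch for a fixed archive (hypothetical bindings; `image_size: usize`
    // and `chunk_size: u32` come from the client's create request):
    //
    //     let wid = env.register_fixed_writer(index, "disk.fidx".to_string(), image_size, chunk_size)?;
    //     env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
    //     env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
    //     env.fixed_writer_close(wid, chunk_count, image_size as u64)?;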

    /// Append chunk to dynamic writer
    pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.offset != offset {
            bail!("dynamic writer '{}' append chunk failed - unexpected chunk offset ({} != {})",
                  data.name, data.offset, offset);
        }

        data.offset += size as u64;
        data.chunk_count += 1;

        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }

    /// Append chunk to fixed writer
    pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        let end = (offset as usize) + (size as usize);
        let idx = data.index.check_chunk_alignment(end, size as usize)?;

        data.chunk_count += 1;

        data.index.add_digest(idx, digest)?;

        Ok(())
    }

    fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", tools::digest_to_hex(uuid)));
        self.log(format!("Checksum: {}", tools::digest_to_hex(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));

        let client_side_duplicates = chunk_count - upload_stat.count;
        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
            self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
        }

        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
        }
    }
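
    // Worked example (illustrative numbers): for chunk_count = 100, an
    // upload_stat with count = 60 and duplicates = 10 yields
    // client_side_duplicates = 100 - 60 = 40 and per = (40 + 10)*100/100 = 50,
    // so the task log shows "Duplicates: 40+10 (50%)"; with size = 400 and
    // compressed_size = 100 the compression line shows 100*100/400 = 25%.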

    /// Close dynamic writer
    pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        if data.offset != size {
            bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
        }

        let uuid = data.index.uuid;

        let csum = data.index.close()?;

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;

        Ok(())
    }

    /// Close fixed writer
    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("fixed writer '{}' close failed - received wrong number of chunks ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        let expected_count = data.index.index_length();

        if chunk_count != (expected_count as u64) {
            bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
        }

        if size != (data.size as u64) {
            bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
        }

        let uuid = data.index.uuid;

        let csum = data.index.close()?;

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;

        Ok(())
    }

    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {

        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme:

        let mut blob = DataBlob::from_raw(data)?;
        // always compute the CRC at the server side
        blob.set_crc(blob.compute_crc());

        let raw_data = blob.raw_data();
        crate::tools::file_set_contents(&path, raw_data, None)?;

        self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));

        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;

        Ok(())
    }
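
    // Usage sketch (hypothetical bindings): the upload handler passes the raw
    // request body, which must already be encoded in DataBlob format because it
    // is parsed via DataBlob::from_raw() above, together with the client-supplied
    // file name:
    //
    //     env.add_blob(file_name, body_bytes)?;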

    /// Mark backup as finished
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        // check that all writers are correctly closed

        state.ensure_unfinished()?;

        state.finished = true;

        if state.dynamic_writers.len() != 0 {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        Ok(())
    }

    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log(msg);
    }

    pub fn debug<S: AsRef<str>>(&self, msg: S) {
        if self.debug { self.worker.log(msg); }
    }

    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
        match result {
            Ok(data) => (self.formatter.format_data)(data, self),
            Err(err) => (self.formatter.format_error)(err),
        }
    }

    /// Raise error if finished flag is not set
    pub fn ensure_finished(&self) -> Result<(), Error> {
        let state = self.state.lock().unwrap();
        if !state.finished {
            bail!("backup ended but finished flag is not set.");
        }
        Ok(())
    }

    /// Remove complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        state.finished = true;

        self.datastore.remove_backup_dir(&self.backup_dir)?;

        Ok(())
    }
}

impl RpcEnvironment for BackupEnvironment {

    fn set_result_attrib(&mut self, name: &str, value: Value) {
        self.result_attributes.insert(name.into(), value);
    }

    fn get_result_attrib(&self, name: &str) -> Option<&Value> {
        self.result_attributes.get(name)
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    fn set_user(&mut self, _user: Option<String>) {
        panic!("unable to change user");
    }

    fn get_user(&self) -> Option<String> {
        Some(self.user.clone())
    }
}

impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}

impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}
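
// A minimal test sketch for the plain-data helpers above; it only exercises
// SharedBackupState, which is constructible without a WorkerTask or DataStore.
#[cfg(test)]
mod test {
    use super::*;
    use std::collections::HashMap;

    #[test]
    fn shared_state_uid_and_finish_flag() {
        let mut state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
        };

        // IDs are handed out sequentially, starting at 1
        assert_eq!(state.next_uid(), 1);
        assert_eq!(state.next_uid(), 2);

        // ensure_unfinished() only fails once the finished flag is set
        assert!(state.ensure_unfinished().is_ok());
        state.finished = true;
        assert!(state.ensure_unfinished().is_err());
    }
}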