src/api2/backup/environment.rs (proxmox-backup.git, commit "update to nix 0.14, use code from proxmox:tools")
use failure::*;
use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use serde_json::Value;

use proxmox::tools::{
    digest_to_hex,
    fs::file_set_contents,
};

use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
use crate::server::WorkerTask;
use crate::backup::*;
use crate::server::formatter::*;
use hyper::{Body, Response};

struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}

struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,
    chunk_count: u64,
    upload_stat: UploadStatistic,
}

struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,
    chunk_size: u32,
    chunk_count: u64,
    small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
    upload_stat: UploadStatistic,
}

struct SharedBackupState {
    finished: bool,
    uid_counter: usize,
    file_counter: usize, // successfully uploaded files
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    known_chunks: HashMap<[u8;32], u32>,
}

impl SharedBackupState {

    // Raise error if finished flag is set
    fn ensure_unfinished(&self) -> Result<(), Error> {
        if self.finished {
            bail!("backup already marked as finished.");
        }
        Ok(())
    }

    // Get a unique integer ID
    pub fn next_uid(&mut self) -> usize {
        self.uid_counter += 1;
        self.uid_counter
    }
}


/// `RpcEnvironment` implementation for the backup service
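///
/// The environment is `Clone`; all upload handlers of one backup
/// session share the same `SharedBackupState` behind an
/// `Arc<Mutex<_>>`, so registered chunks and writers are visible to
/// every request of that session.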
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: HashMap<String, Value>,
    user: String,
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>,
}

impl BackupEnvironment {
    pub fn new(
        env_type: RpcEnvironmentType,
        user: String,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {

        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
        };

        Self {
            result_attributes: HashMap::new(),
            env_type,
            user,
            worker,
            datastore,
            debug: false,
            formatter: &JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Register a chunk with associated length.
    ///
    /// We do not fully trust clients, so a client may only use registered
    /// chunks. Please use this method to register chunks from previous backups.
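    ///
    /// A minimal sketch of the intended use (not a doctest; assumes an
    /// `env: &BackupEnvironment` is in scope and the digest and length
    /// come from a previous backup's index):
    ///
    /// ```ignore
    /// let digest = [0u8; 32]; // hypothetical chunk digest
    /// env.register_chunk(digest, 4096)?;
    /// // the client may now reference this chunk
    /// assert_eq!(env.lookup_chunk(&digest), Some(4096));
    /// ```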
    pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        state.known_chunks.insert(digest, length);

        Ok(())
    }

    /// Register fixed length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the fixed index writer.
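    ///
    /// At most one chunk smaller than `chunk_size` is accepted per
    /// writer (the trailing chunk of an image may be shorter); a second
    /// undersized chunk is rejected as an error.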
    pub fn register_fixed_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if size > data.chunk_size {
            bail!("fixed writer '{}' - got large chunk ({} > {})", data.name, size, data.chunk_size);
        } else if size < data.chunk_size {
            data.small_chunk_count += 1;
            if data.small_chunk_count > 1 {
                bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)", data.name);
            }
        }

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    /// Register dynamic length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the dynamic index writer.
    pub fn register_dynamic_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
        let state = self.state.lock().unwrap();

        state.known_chunks.get(digest).copied()
    }

    /// Store the writer with a unique ID
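    ///
    /// A sketch of the writer lifecycle (not a doctest; assumes `env`, a
    /// `DynamicIndexWriter` `index` and the per-chunk values are in
    /// scope; the archive name is hypothetical):
    ///
    /// ```ignore
    /// let wid = env.register_dynamic_writer(index, "catalog.didx".into())?;
    /// // per uploaded chunk: record statistics, then append it at `offset`
    /// env.register_dynamic_chunk(wid, digest, size, compressed_size, false)?;
    /// env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
    /// // close with the expected totals so the server can verify them
    /// env.dynamic_writer_close(wid, chunk_count, total_size)?;
    /// ```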
    pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.dynamic_writers.insert(uid, DynamicWriterState {
            index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }

    /// Store the writer with a unique ID
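    ///
    /// A sketch mirroring the dynamic writer lifecycle (not a doctest;
    /// `env`, `index` and the chunk values are assumed to be in scope,
    /// the archive name is hypothetical):
    ///
    /// ```ignore
    /// let wid = env.register_fixed_writer(index, "disk.fidx".into(), image_size, chunk_size)?;
    /// env.register_fixed_chunk(wid, digest, size, compressed_size, false)?;
    /// env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
    /// env.fixed_writer_close(wid, chunk_count, image_size as u64)?;
    /// ```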
    pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.fixed_writers.insert(uid, FixedWriterState {
            index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }

    /// Append chunk to dynamic writer
    pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.offset != offset {
            bail!("dynamic writer '{}' append chunk failed - unexpected chunk offset ({} != {})",
                  data.name, data.offset, offset);
        }

        data.offset += size as u64;
        data.chunk_count += 1;

        // record the chunk end offset (start offset + size)
        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }

    /// Append chunk to fixed writer
    pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        let end = (offset as usize) + (size as usize);
        let idx = data.index.check_chunk_alignment(end, size as usize)?;

        data.chunk_count += 1;

        data.index.add_digest(idx, digest)?;

        Ok(())
    }

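    /// Log the upload statistics for one archive. `chunk_count` is the
    /// total number of chunks referenced by the index; chunks the client
    /// did not upload at all count as client side duplicates, uploaded
    /// but already known chunks as server side duplicates.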
    fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", digest_to_hex(uuid)));
        self.log(format!("Checksum: {}", digest_to_hex(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));

        let client_side_duplicates = chunk_count - upload_stat.count;
        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
            self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
        }

        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
        }
    }

    /// Close dynamic writer
    pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        if data.offset != size {
            bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
        }

        let uuid = data.index.uuid;

        let csum = data.index.close()?;

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;

        Ok(())
    }

    /// Close fixed writer
    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("fixed writer '{}' close failed - received wrong number of chunks ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        let expected_count = data.index.index_length();

        if chunk_count != (expected_count as u64) {
            bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
        }

        if size != (data.size as u64) {
            bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
        }

        let uuid = data.index.uuid;

        let csum = data.index.close()?;

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;

        Ok(())
    }

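    /// Store a client supplied blob as `file_name` inside the backup
    /// directory. The data is expected to be an encoded `DataBlob`;
    /// the CRC is always recomputed on the server side, since we do
    /// not trust client supplied checksums.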
    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {

        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme:

        let mut blob = DataBlob::from_raw(data)?;
        // always compute CRC at server side
        blob.set_crc(blob.compute_crc());

        let raw_data = blob.raw_data();
        file_set_contents(&path, raw_data, None)?;

        self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));

        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;

        Ok(())
    }

    /// Mark backup as finished
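    ///
    /// Fails unless all index writers have been closed and at least one
    /// file was successfully uploaded.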
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // check that all writers are correctly closed
        if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        state.finished = true;

        Ok(())
    }

    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log(msg);
    }

    pub fn debug<S: AsRef<str>>(&self, msg: S) {
        if self.debug { self.worker.log(msg); }
    }

    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
        match result {
            Ok(data) => (self.formatter.format_data)(data, self),
            Err(err) => (self.formatter.format_error)(err),
        }
    }

    /// Raise error if finished flag is not set
    pub fn ensure_finished(&self) -> Result<(), Error> {
        let state = self.state.lock().unwrap();
        if !state.finished {
            bail!("backup ended but finished flag is not set.");
        }
        Ok(())
    }

    /// Remove complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        state.finished = true;

        self.datastore.remove_backup_dir(&self.backup_dir)?;

        Ok(())
    }
}

impl RpcEnvironment for BackupEnvironment {

    fn set_result_attrib(&mut self, name: &str, value: Value) {
        self.result_attributes.insert(name.into(), value);
    }

    fn get_result_attrib(&self, name: &str) -> Option<&Value> {
        self.result_attributes.get(name)
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    fn set_user(&mut self, _user: Option<String>) {
        panic!("unable to change user");
    }

    fn get_user(&self) -> Option<String> {
        Some(self.user.clone())
    }
}

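// Allow handlers to recover the concrete `BackupEnvironment` from the
// type-erased `RpcEnvironment` they are handed. The `unwrap()` panics
// if the environment has a different concrete type, which would be an
// internal logic error.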
impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}

impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}