// proxmox-backup: src/api2/backup/environment.rs
use anyhow::{bail, Error};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use serde_json::Value;

use proxmox::tools::digest_to_hex;
use proxmox::tools::fs::{replace_file, CreateOptions};
use proxmox::api::{RpcEnvironment, RpcEnvironmentType};

use crate::server::WorkerTask;
use crate::backup::*;
use crate::server::formatter::*;
use hyper::{Body, Response};

/// Chunk upload statistics, kept per index writer.
struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}

/// State of an open dynamic index writer, including upload progress.
struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,
    chunk_count: u64,
    upload_stat: UploadStatistic,
}

/// State of an open fixed index writer, including upload progress.
struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,
    chunk_size: u32,
    chunk_count: u64,
    small_chunk_count: usize, // allow 0..1 small chunks (the last chunk may be smaller)
    upload_stat: UploadStatistic,
}

struct SharedBackupState {
    finished: bool,
    uid_counter: usize,
    file_counter: usize, // successfully uploaded files
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    known_chunks: HashMap<[u8; 32], u32>,
}

impl SharedBackupState {

    // Raise an error if the finished flag is set
    fn ensure_unfinished(&self) -> Result<(), Error> {
        if self.finished {
            bail!("backup already marked as finished.");
        }
        Ok(())
    }

    // Get a unique integer ID
    pub fn next_uid(&mut self) -> usize {
        self.uid_counter += 1;
        self.uid_counter
    }
}


/// `RpcEnvironment` implementation for the backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: HashMap<String, Value>,
    user: String,
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>,
}

impl BackupEnvironment {
    pub fn new(
        env_type: RpcEnvironmentType,
        user: String,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {

        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
        };

        Self {
            result_attributes: HashMap::new(),
            env_type,
            user,
            worker,
            datastore,
            debug: false,
            formatter: &JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Register a chunk with its associated length.
    ///
    /// We do not fully trust clients, so a client may only use registered
    /// chunks. Use this method to register chunks from previous backups.
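    ///
    /// # Example
    ///
    /// A minimal sketch of a hypothetical caller; `previous_chunks` (an
    /// iterator over digest/length pairs read from the last backup's
    /// indexes) is an assumption for illustration, not part of this module:
    ///
    /// ```ignore
    /// for (digest, length) in previous_chunks {
    ///     env.register_chunk(digest, length)?;
    /// }
    /// ```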
    pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        state.known_chunks.insert(digest, length);

        Ok(())
    }

    /// Register a fixed length chunk after upload.
    ///
    /// Like `register_chunk()`, but additionally records statistics for
    /// the fixed index writer.
    pub fn register_fixed_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if size > data.chunk_size {
            bail!("fixed writer '{}' - got large chunk ({} > {})", data.name, size, data.chunk_size);
        } else if size < data.chunk_size {
            data.small_chunk_count += 1;
            if data.small_chunk_count > 1 {
                bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)", data.name);
            }
        }

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    /// Register a dynamic length chunk after upload.
    ///
    /// Like `register_chunk()`, but additionally records statistics for
    /// the dynamic index writer.
    pub fn register_dynamic_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    /// Return the length of a registered chunk, if known.
    pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
        let state = self.state.lock().unwrap();

        state.known_chunks.get(digest).copied()
    }

    /// Store the writer with a unique ID
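    ///
    /// # Example
    ///
    /// A rough sketch of the writer lifecycle, assuming a hypothetical
    /// `writer: DynamicIndexWriter` plus illustrative `offset`, `size`,
    /// `digest`, `chunk_count`, `total_size` and `csum` values:
    ///
    /// ```ignore
    /// let wid = env.register_dynamic_writer(writer, "archive.didx".to_string())?;
    /// env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
    /// env.dynamic_writer_close(wid, chunk_count, total_size, csum)?;
    /// ```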
    pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.dynamic_writers.insert(uid, DynamicWriterState {
            index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }

    /// Store the writer with a unique ID
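    ///
    /// # Example
    ///
    /// A rough sketch, assuming a hypothetical `writer: FixedIndexWriter`
    /// for a 4 GiB image split into 4 MiB chunks (names and values are
    /// illustrative):
    ///
    /// ```ignore
    /// let size = 4 * 1024 * 1024 * 1024;
    /// let chunk_size = 4 * 1024 * 1024;
    /// let wid = env.register_fixed_writer(writer, "disk.fidx".to_string(), size, chunk_size)?;
    /// ```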
    pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.fixed_writers.insert(uid, FixedWriterState {
            index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }

    /// Append chunk to dynamic writer
    pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.offset != offset {
            bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
                  data.name, data.offset, offset);
        }

        data.offset += size as u64;
        data.chunk_count += 1;

        // the index records the end offset of each chunk
        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }

    /// Append chunk to fixed writer
    pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        let end = (offset as usize) + (size as usize);
        let idx = data.index.check_chunk_alignment(end, size as usize)?;

        data.chunk_count += 1;

        data.index.add_digest(idx, digest)?;

        Ok(())
    }

    fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", digest_to_hex(uuid)));
        self.log(format!("Checksum: {}", digest_to_hex(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));

        // chunks the client did not upload because it already knew them
        let client_side_duplicates = chunk_count - upload_stat.count;
        // uploaded chunks the server already had in the datastore
        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
            self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
        }

        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
        }
    }

    /// Close dynamic writer
    pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        if data.offset != size {
            bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
        }

        let uuid = data.index.uuid;

        let expected_csum = data.index.close()?;

        println!("server checksum {:?} client: {:?}", expected_csum, csum);
        if csum != expected_csum {
            bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;

        Ok(())
    }

    /// Close fixed writer
    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("fixed writer '{}' close failed - received wrong number of chunks ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        let expected_count = data.index.index_length();

        if chunk_count != (expected_count as u64) {
            bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
        }

        if size != (data.size as u64) {
            bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
        }

        let uuid = data.index.uuid;

        let expected_csum = data.index.close()?;

        println!("server checksum {:?} client: {:?}", expected_csum, csum);
        if csum != expected_csum {
            bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;

        Ok(())
    }

    /// Store an uploaded blob (raw `DataBlob` bytes) as `file_name` inside the backup directory.
    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {

        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme: this is the raw (compressed) length, not the original size

        let blob = DataBlob::from_raw(data)?;
        // always verify CRC at server side
        blob.verify_crc()?;

        let raw_data = blob.raw_data();
        replace_file(&path, raw_data, CreateOptions::new())?;

        self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));

        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;

        Ok(())
    }

    /// Mark backup as finished
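    ///
    /// # Example
    ///
    /// Sketch of the final protocol step in a hypothetical handler; after
    /// this call the environment rejects any further writer operations:
    ///
    /// ```ignore
    /// env.finish_backup()?;
    /// env.ensure_finished()?; // fails if finish_backup() was never reached
    /// ```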
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        // check that all writers are correctly closed

        state.ensure_unfinished()?;

        state.finished = true;

        if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        Ok(())
    }

    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log(msg);
    }

    pub fn debug<S: AsRef<str>>(&self, msg: S) {
        if self.debug { self.worker.log(msg); }
    }

    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
        match result {
            Ok(data) => (self.formatter.format_data)(data, self),
            Err(err) => (self.formatter.format_error)(err),
        }
    }

    /// Raise error if finished flag is not set
    pub fn ensure_finished(&self) -> Result<(), Error> {
        let state = self.state.lock().unwrap();
        if !state.finished {
            bail!("backup ended but finished flag is not set.");
        }
        Ok(())
    }

    /// Remove complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        state.finished = true;

        self.datastore.remove_backup_dir(&self.backup_dir)?;

        Ok(())
    }
}

impl RpcEnvironment for BackupEnvironment {

    fn set_result_attrib(&mut self, name: &str, value: Value) {
        self.result_attributes.insert(name.into(), value);
    }

    fn get_result_attrib(&self, name: &str) -> Option<&Value> {
        self.result_attributes.get(name)
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    fn set_user(&mut self, _user: Option<String>) {
        panic!("unable to change user");
    }

    fn get_user(&self) -> Option<String> {
        Some(self.user.clone())
    }
}

// Allow API handlers to recover the concrete `BackupEnvironment` from a
// generic `RpcEnvironment` reference via downcasting.
impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}

impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}