// proxmox-backup: src/api2/backup/environment.rs

use anyhow::{bail, format_err, Error};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use nix::dir::Dir;

use ::serde::{Serialize};
use serde_json::{json, Value};

use proxmox::tools::digest_to_hex;
use proxmox::tools::fs::{replace_file, CreateOptions};
use proxmox::api::{RpcEnvironment, RpcEnvironmentType};

use pbs_datastore::{DataStore, DataBlob};
use pbs_datastore::backup_info::{BackupDir, BackupInfo};
use pbs_datastore::dynamic_index::DynamicIndexWriter;
use pbs_datastore::fixed_index::FixedIndexWriter;
use pbs_api_types::Authid;
use proxmox_rest_server::{WorkerTask, formatter::*};

use crate::backup::verify_backup_dir_with_lock;

use hyper::{Body, Response};

#[derive(Copy, Clone, Serialize)]
struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}

impl std::ops::Add for UploadStatistic {
    type Output = Self;

    fn add(self, other: Self) -> Self {
        Self {
            count: self.count + other.count,
            size: self.size + other.size,
            compressed_size: self.compressed_size + other.compressed_size,
            duplicates: self.duplicates + other.duplicates,
        }
    }
}
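
// Hedged example (not part of the upstream file): a minimal test showing that the
// `Add` impl above combines upload statistics field-wise. All numbers are made up.
#[cfg(test)]
mod upload_statistic_example {
    use super::UploadStatistic;

    #[test]
    fn adds_fieldwise() {
        let first = UploadStatistic { count: 2, size: 1024, compressed_size: 512, duplicates: 0 };
        let second = UploadStatistic { count: 3, size: 2048, compressed_size: 1024, duplicates: 1 };
        let sum = first + second;
        assert_eq!(sum.count, 5);
        assert_eq!(sum.size, 3072);
        assert_eq!(sum.compressed_size, 1536);
        assert_eq!(sum.duplicates, 1);
    }
}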

struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,
    chunk_count: u64,
    upload_stat: UploadStatistic,
}

struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,
    chunk_size: u32,
    chunk_count: u64,
    small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
    upload_stat: UploadStatistic,
    incremental: bool,
}

// key=digest, value=length
type KnownChunksMap = HashMap<[u8;32], u32>;

struct SharedBackupState {
    finished: bool,
    uid_counter: usize,
    file_counter: usize, // successfully uploaded files
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    known_chunks: KnownChunksMap,
    backup_size: u64, // sums up size of all files
    backup_stat: UploadStatistic,
}

impl SharedBackupState {

    // Raise error if finished flag is set
    fn ensure_unfinished(&self) -> Result<(), Error> {
        if self.finished {
            bail!("backup already marked as finished.");
        }
        Ok(())
    }

    // Get a unique integer ID
    pub fn next_uid(&mut self) -> usize {
        self.uid_counter += 1;
        self.uid_counter
    }
}


/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: Value,
    auth_id: Authid,
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>
}

impl BackupEnvironment {
    pub fn new(
        env_type: RpcEnvironmentType,
        auth_id: Authid,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {

        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
            backup_size: 0,
            backup_stat: UploadStatistic::new(),
        };

        Self {
            result_attributes: json!({}),
            env_type,
            auth_id,
            worker,
            datastore,
            debug: false,
            formatter: &JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Register a Chunk with associated length.
    ///
    /// We do not fully trust clients, so a client may only use registered
    /// chunks. Please use this method to register chunks from previous backups.
    pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        state.known_chunks.insert(digest, length);

        Ok(())
    }
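
    // Typical flow (illustrative, inferred from the doc comment above): for an
    // incremental backup the API handler registers every digest/length pair taken
    // from the previous snapshot's index here, so that later append calls may
    // reference those chunks without re-uploading them.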

    /// Register fixed length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the fixed index writer.
    pub fn register_fixed_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if size > data.chunk_size {
            bail!("fixed writer '{}' - got large chunk ({} > {})", data.name, size, data.chunk_size);
        }

        if size < data.chunk_size {
            data.small_chunk_count += 1;
            if data.small_chunk_count > 1 {
                bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)", data.name);
            }
        }

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    /// Register dynamic length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the dynamic index writer.
    pub fn register_dynamic_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
        let state = self.state.lock().unwrap();

        match state.known_chunks.get(digest) {
            Some(len) => Some(*len),
            None => None,
        }
    }

    /// Store the writer with a unique ID
    pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.dynamic_writers.insert(uid, DynamicWriterState {
            index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }

    /// Store the writer with a unique ID
    pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32, incremental: bool) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.fixed_writers.insert(uid, FixedWriterState {
            index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), incremental,
        });

        Ok(uid)
    }
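
    // Writer lifecycle (summary): `register_dynamic_writer()` / `register_fixed_writer()`
    // hand out a writer ID (wid), `*_writer_append_chunk()` adds previously registered
    // chunks to that writer, and `*_writer_close()` cross-checks the client-supplied
    // totals before the index is committed.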

    /// Append chunk to dynamic writer
    pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.offset != offset {
            bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
                  data.name, data.offset, offset);
        }

        data.offset += size as u64;
        data.chunk_count += 1;

        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }
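
    // Worked example (illustrative numbers): appending two chunks of sizes 1000 and
    // 2000 at offsets 0 and 1000 stores index entries with end offsets 1000 and 3000;
    // an `offset` argument that does not match the writer's current end offset fails
    // with the "strange chunk offset" error above.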

    /// Append chunk to fixed writer
    pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        let end = (offset as usize) + (size as usize);
        let idx = data.index.check_chunk_alignment(end, size as usize)?;

        data.chunk_count += 1;

        data.index.add_digest(idx, digest)?;

        Ok(())
    }
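
    // Note (hedged): `check_chunk_alignment()` from pbs_datastore::fixed_index maps
    // the chunk's end offset and size to the chunk's slot in the fixed index, and
    // errors out when the chunk does not fall on the expected boundary.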

    fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", digest_to_hex(uuid)));
        self.log(format!("Checksum: {}", digest_to_hex(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));

        // account for zero chunk, which might be uploaded but never used
        let client_side_duplicates = if chunk_count < upload_stat.count {
            0
        } else {
            chunk_count - upload_stat.count
        };

        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
            self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
        }

        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
        }
    }
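
    // Worked example (illustrative numbers) for the percentages above: with
    // size = 1000, chunk_count = 100, upload_stat.size = 250, upload_stat.count = 60,
    // upload_stat.duplicates = 10 and upload_stat.compressed_size = 125, this logs
    // "Upload size: 250 (25%)", "Duplicates: 40+10 (50%)" and "Compression: 50%".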

    /// Close dynamic writer
    pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        if data.offset != size {
            bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
        }

        let uuid = data.index.uuid;

        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }

    /// Close fixed writer
    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
412 bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        if !data.incremental {
            let expected_count = data.index.index_length();

            if chunk_count != (expected_count as u64) {
                bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
            }

            if size != (data.size as u64) {
                bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
            }
        }

        let uuid = data.index.uuid;
        let expected_csum = data.index.close()?;

        if csum != expected_csum {
            bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
        }

        self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;
        state.backup_size += size;
        state.backup_stat = state.backup_stat + data.upload_stat;

        Ok(())
    }
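
    // Both close paths above verify the client-supplied chunk count, size and
    // checksum against the server-side bookkeeping before the file is counted
    // towards the backup totals.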

    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {

        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme:

        // always verify blob/CRC at server side
        let blob = DataBlob::load_from_reader(&mut &data[..])?;

        let raw_data = blob.raw_data();
        replace_file(&path, raw_data, CreateOptions::new())?;

        self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));

        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;
        state.backup_size += orig_len as u64;
        state.backup_stat.size += blob_len as u64;

        Ok(())
    }

    /// Mark backup as finished
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // test if all writers are correctly closed
        if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        // check for valid manifest and store stats
        let stats = serde_json::to_value(state.backup_stat)?;
        self.datastore.update_manifest(&self.backup_dir, |manifest| {
            manifest.unprotected["chunk_upload_stats"] = stats;
        }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

        if let Some(base) = &self.last_backup {
            let path = self.datastore.snapshot_path(&base.backup_dir);
            if !path.exists() {
                bail!(
                    "base snapshot {} was removed during backup, cannot finish as chunks might be missing",
                    base.backup_dir
                );
            }
        }

        // marks the backup as successful
        state.finished = true;

        Ok(())
    }
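
    // Note: the finished flag is only set after the manifest update and the
    // base-snapshot existence check succeed, so a failed finish leaves the
    // backup marked as unfinished.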

    /// If verify-new is set on the datastore, this will run a new verify task
    /// for the backup. If not, this will return and also drop the passed lock
    /// immediately.
    pub fn verify_after_complete(&self, snap_lock: Dir) -> Result<(), Error> {
        self.ensure_finished()?;

        if !self.datastore.verify_new() {
            // no verify requested, do nothing
            return Ok(());
        }

        let worker_id = format!("{}:{}/{}/{:08X}",
            self.datastore.name(),
            self.backup_dir.group().backup_type(),
            self.backup_dir.group().backup_id(),
            self.backup_dir.backup_time());

        let datastore = self.datastore.clone();
        let backup_dir = self.backup_dir.clone();

        WorkerTask::new_thread(
            "verify",
            Some(worker_id),
            self.auth_id.to_string(),
            false,
            move |worker| {
                worker.log_message("Automatically verifying newly added snapshot");

                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
                if !verify_backup_dir_with_lock(
                    &verify_worker,
                    &backup_dir,
                    worker.upid().clone(),
                    None,
                    snap_lock,
                )? {
                    bail!("verification failed - please check the log for details");
                }

                Ok(())
            },
        ).map(|_| ())
    }
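
    // The verification itself runs in the spawned worker task; this method returns
    // as soon as that task has been created, and verification failures show up in
    // the verify task's log rather than here.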

    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log_message(msg);
    }

    pub fn debug<S: AsRef<str>>(&self, msg: S) {
        if self.debug { self.worker.log_message(msg); }
    }

    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
        match result {
            Ok(data) => (self.formatter.format_data)(data, self),
            Err(err) => (self.formatter.format_error)(err),
        }
    }

    /// Raise error if finished flag is not set
    pub fn ensure_finished(&self) -> Result<(), Error> {
        let state = self.state.lock().unwrap();
        if !state.finished {
            bail!("backup ended but finished flag is not set.");
        }
        Ok(())
    }

    /// Return true if the finished flag is set
    pub fn finished(&self) -> bool {
        let state = self.state.lock().unwrap();
        state.finished
    }

    /// Remove complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        state.finished = true;

        self.datastore.remove_backup_dir(&self.backup_dir, true)?;

        Ok(())
    }
}

impl RpcEnvironment for BackupEnvironment {

    fn result_attrib_mut(&mut self) -> &mut Value {
        &mut self.result_attributes
    }

    fn result_attrib(&self) -> &Value {
        &self.result_attributes
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    fn set_auth_id(&mut self, _auth_id: Option<String>) {
        panic!("unable to change auth_id");
    }

    fn get_auth_id(&self) -> Option<String> {
        Some(self.auth_id.to_string())
    }
}

impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}

impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}
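
// Hedged usage sketch (assumed, not from this file): API handlers that receive a
// `&dyn RpcEnvironment` (or a `Box<dyn RpcEnvironment>`) can recover the concrete
// backup environment through the `AsRef` impls above, roughly like:
//
//     let env: &BackupEnvironment = rpcenv.as_ref();
//     env.log("handler message");
//
// Note that the downcast uses `unwrap()`, so calling this in a context where the
// environment is not actually a `BackupEnvironment` would panic.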