]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/backup/environment.rs
clippy: is_some/none/ok/err/empty
[proxmox-backup.git] / src / api2 / backup / environment.rs
CommitLineData
9fa55e09 1use anyhow::{bail, format_err, Error};
f9578f3c 2use std::sync::{Arc, Mutex};
0698f78d
SR
3use std::collections::{HashMap, HashSet};
4use nix::dir::Dir;
d95ced64 5
c8fff67d 6use ::serde::{Serialize};
e8d1da6a 7use serde_json::{json, Value};
d95ced64 8
feaa1ad3
WB
9use proxmox::tools::digest_to_hex;
10use proxmox::tools::fs::{replace_file, CreateOptions};
a2479cfa 11use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
00388226 12
e6dc35ac 13use crate::api2::types::Authid;
21ee7912 14use crate::backup::*;
e7cb4dc5 15use crate::server::WorkerTask;
b4b63e52
DM
16use crate::server::formatter::*;
17use hyper::{Body, Response};
d95ced64 18
/// Counters for uploaded chunks: how many were sent, their raw and
/// compressed byte totals, and how many were server-side duplicates.
#[derive(Copy, Clone, Serialize)]
struct UploadStatistic {
    count: u64,           // number of uploaded chunks
    size: u64,            // raw (uncompressed) bytes
    compressed_size: u64, // bytes after compression
    duplicates: u64,      // chunks the server already had (is_duplicate)
}

impl UploadStatistic {
    /// Create a statistic with all counters at zero.
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}
8bea85b4 37
c8fff67d
DM
38impl std::ops::Add for UploadStatistic {
39 type Output = Self;
40
41 fn add(self, other: Self) -> Self {
42 Self {
43 count: self.count + other.count,
44 size: self.size + other.size,
45 compressed_size: self.compressed_size + other.compressed_size,
46 duplicates: self.duplicates + other.duplicates,
47 }
48 }
49}
50
8bea85b4
DM
/// State of one open dynamic index writer (variable sized chunks).
struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,      // current end offset; the next chunk must start exactly here
    chunk_count: u64, // chunks appended so far
    upload_stat: UploadStatistic,
}
58
a42fa400
DM
/// State of one open fixed index writer (fixed sized chunks).
struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,      // expected total file size
    chunk_size: u32,  // nominal chunk size; only the final chunk may be smaller
    chunk_count: u64, // chunks appended so far
    small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
    upload_stat: UploadStatistic,
    incremental: bool, // incremental uploads skip full count/size checks on close
}
69
79f6a79c
TL
// Chunks the client is allowed to reference in index entries:
// key=digest, value=length
type KnownChunksMap = HashMap<[u8;32], u32>;
43772efc 72
f9578f3c 73struct SharedBackupState {
372724af 74 finished: bool,
f9578f3c 75 uid_counter: usize,
add5861e 76 file_counter: usize, // successfully uploaded files
8bea85b4 77 dynamic_writers: HashMap<usize, DynamicWriterState>,
a42fa400 78 fixed_writers: HashMap<usize, FixedWriterState>,
43772efc 79 known_chunks: KnownChunksMap,
c8fff67d
DM
80 backup_size: u64, // sums up size of all files
81 backup_stat: UploadStatistic,
f9578f3c
DM
82}
83
372724af
DM
84impl SharedBackupState {
85
86 // Raise error if finished flag is set
87 fn ensure_unfinished(&self) -> Result<(), Error> {
88 if self.finished {
89 bail!("backup already marked as finished.");
90 }
91 Ok(())
92 }
93
94 // Get an unique integer ID
95 pub fn next_uid(&mut self) -> usize {
96 self.uid_counter += 1;
97 self.uid_counter
98 }
99}
100
101
d95ced64
DM
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: Value,
    auth_id: Authid, // fixed at construction; set_auth_id() panics
    pub debug: bool, // when set, debug() messages go to the worker log
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    // base snapshot this backup builds on; its existence is re-checked
    // in finish_backup()
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>
}
116
117impl BackupEnvironment {
f9578f3c
DM
118 pub fn new(
119 env_type: RpcEnvironmentType,
e6dc35ac 120 auth_id: Authid,
f9578f3c
DM
121 worker: Arc<WorkerTask>,
122 datastore: Arc<DataStore>,
123 backup_dir: BackupDir,
f9578f3c
DM
124 ) -> Self {
125
126 let state = SharedBackupState {
372724af 127 finished: false,
f9578f3c 128 uid_counter: 0,
e6389f4e 129 file_counter: 0,
f9578f3c 130 dynamic_writers: HashMap::new(),
a42fa400 131 fixed_writers: HashMap::new(),
a09c0e38 132 known_chunks: HashMap::new(),
c8fff67d
DM
133 backup_size: 0,
134 backup_stat: UploadStatistic::new(),
f9578f3c
DM
135 };
136
d95ced64 137 Self {
e8d1da6a 138 result_attributes: json!({}),
d95ced64 139 env_type,
e6dc35ac 140 auth_id,
d95ced64 141 worker,
21ee7912 142 datastore,
a42d1f55 143 debug: false,
b4b63e52 144 formatter: &JSON_FORMATTER,
f9578f3c 145 backup_dir,
b02a52e3 146 last_backup: None,
f9578f3c 147 state: Arc::new(Mutex::new(state)),
d95ced64
DM
148 }
149 }
150
96482891
DM
151 /// Register a Chunk with associated length.
152 ///
153 /// We do not fully trust clients, so a client may only use registered
154 /// chunks. Please use this method to register chunks from previous backups.
a09c0e38
DM
155 pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
156 let mut state = self.state.lock().unwrap();
157
158 state.ensure_unfinished()?;
159
79f6a79c 160 state.known_chunks.insert(digest, length);
a09c0e38
DM
161
162 Ok(())
163 }
164
96482891
DM
165 /// Register fixed length chunks after upload.
166 ///
167 /// Like `register_chunk()`, but additionally record statistics for
168 /// the fixed index writer.
642322b4
DM
169 pub fn register_fixed_chunk(
170 &self,
171 wid: usize,
172 digest: [u8; 32],
173 size: u32,
174 compressed_size: u32,
175 is_duplicate: bool,
176 ) -> Result<(), Error> {
177 let mut state = self.state.lock().unwrap();
178
179 state.ensure_unfinished()?;
180
181 let mut data = match state.fixed_writers.get_mut(&wid) {
182 Some(data) => data,
183 None => bail!("fixed writer '{}' not registered", wid),
184 };
185
5e04ec70
DM
186 if size > data.chunk_size {
187 bail!("fixed writer '{}' - got large chunk ({} > {}", data.name, size, data.chunk_size);
188 } else if size < data.chunk_size {
189 data.small_chunk_count += 1;
190 if data.small_chunk_count > 1 {
191 bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)");
192 }
642322b4
DM
193 }
194
96482891
DM
195 // record statistics
196 data.upload_stat.count += 1;
197 data.upload_stat.size += size as u64;
198 data.upload_stat.compressed_size += compressed_size as u64;
199 if is_duplicate { data.upload_stat.duplicates += 1; }
200
201 // register chunk
79f6a79c 202 state.known_chunks.insert(digest, size);
642322b4
DM
203
204 Ok(())
205 }
206
96482891
DM
207 /// Register dynamic length chunks after upload.
208 ///
209 /// Like `register_chunk()`, but additionally record statistics for
210 /// the dynamic index writer.
642322b4
DM
211 pub fn register_dynamic_chunk(
212 &self,
213 wid: usize,
214 digest: [u8; 32],
215 size: u32,
216 compressed_size: u32,
217 is_duplicate: bool,
218 ) -> Result<(), Error> {
219 let mut state = self.state.lock().unwrap();
220
221 state.ensure_unfinished()?;
222
223 let mut data = match state.dynamic_writers.get_mut(&wid) {
224 Some(data) => data,
225 None => bail!("dynamic writer '{}' not registered", wid),
226 };
227
96482891
DM
228 // record statistics
229 data.upload_stat.count += 1;
230 data.upload_stat.size += size as u64;
231 data.upload_stat.compressed_size += compressed_size as u64;
232 if is_duplicate { data.upload_stat.duplicates += 1; }
233
234 // register chunk
79f6a79c 235 state.known_chunks.insert(digest, size);
642322b4
DM
236
237 Ok(())
238 }
239
a09c0e38
DM
240 pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
241 let state = self.state.lock().unwrap();
242
243 match state.known_chunks.get(digest) {
79f6a79c 244 Some(len) => Some(*len),
a09c0e38
DM
245 None => None,
246 }
247 }
248
372724af 249 /// Store the writer with an unique ID
8bea85b4 250 pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
f9578f3c 251 let mut state = self.state.lock().unwrap();
f9578f3c 252
372724af
DM
253 state.ensure_unfinished()?;
254
255 let uid = state.next_uid();
f9578f3c 256
8bea85b4 257 state.dynamic_writers.insert(uid, DynamicWriterState {
96482891 258 index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
8bea85b4 259 });
372724af
DM
260
261 Ok(uid)
f9578f3c
DM
262 }
263
a42fa400 264 /// Store the writer with an unique ID
facd9801 265 pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32, incremental: bool) -> Result<usize, Error> {
a42fa400
DM
266 let mut state = self.state.lock().unwrap();
267
268 state.ensure_unfinished()?;
269
270 let uid = state.next_uid();
271
272 state.fixed_writers.insert(uid, FixedWriterState {
facd9801 273 index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), incremental,
a42fa400
DM
274 });
275
276 Ok(uid)
277 }
278
f9578f3c 279 /// Append chunk to dynamic writer
417cb073 280 pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
f9578f3c
DM
281 let mut state = self.state.lock().unwrap();
282
372724af
DM
283 state.ensure_unfinished()?;
284
f9578f3c
DM
285 let mut data = match state.dynamic_writers.get_mut(&wid) {
286 Some(data) => data,
287 None => bail!("dynamic writer '{}' not registered", wid),
288 };
289
f9578f3c 290
417cb073
DM
291 if data.offset != offset {
292 bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
293 data.name, data.offset, offset);
294 }
295
3dc5b2a2
DM
296 data.offset += size as u64;
297 data.chunk_count += 1;
298
8bea85b4 299 data.index.add_chunk(data.offset, digest)?;
f9578f3c
DM
300
301 Ok(())
302 }
303
a42fa400
DM
304 /// Append chunk to fixed writer
305 pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
306 let mut state = self.state.lock().unwrap();
307
308 state.ensure_unfinished()?;
309
310 let mut data = match state.fixed_writers.get_mut(&wid) {
311 Some(data) => data,
312 None => bail!("fixed writer '{}' not registered", wid),
313 };
314
5e04ec70
DM
315 let end = (offset as usize) + (size as usize);
316 let idx = data.index.check_chunk_alignment(end, size as usize)?;
a42fa400 317
5e04ec70 318 data.chunk_count += 1;
a42fa400 319
5e04ec70 320 data.index.add_digest(idx, digest)?;
a42fa400
DM
321
322 Ok(())
323 }
324
00388226 325 fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
96482891 326 self.log(format!("Upload statistics for '{}'", archive_name));
e18a6c9e
DM
327 self.log(format!("UUID: {}", digest_to_hex(uuid)));
328 self.log(format!("Checksum: {}", digest_to_hex(csum)));
96482891
DM
329 self.log(format!("Size: {}", size));
330 self.log(format!("Chunk count: {}", chunk_count));
36075475
DM
331
332 if size == 0 || chunk_count == 0 {
333 return;
334 }
335
96482891 336 self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));
36075475 337
8268c9d1
SR
338 // account for zero chunk, which might be uploaded but never used
339 let client_side_duplicates = if chunk_count < upload_stat.count {
340 0
341 } else {
342 chunk_count - upload_stat.count
343 };
344
36075475
DM
345 let server_side_duplicates = upload_stat.duplicates;
346
347 if (client_side_duplicates + server_side_duplicates) > 0 {
348 let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
349 self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
350 }
351
96482891 352 if upload_stat.size > 0 {
36075475 353 self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
96482891
DM
354 }
355 }
356
a2077252 357 /// Close dynamic writer
fb6026b6 358 pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
a2077252
DM
359 let mut state = self.state.lock().unwrap();
360
372724af
DM
361 state.ensure_unfinished()?;
362
a2077252
DM
363 let mut data = match state.dynamic_writers.remove(&wid) {
364 Some(data) => data,
365 None => bail!("dynamic writer '{}' not registered", wid),
366 };
367
8bea85b4
DM
368 if data.chunk_count != chunk_count {
369 bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
370 }
371
372 if data.offset != size {
373 bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
374 }
375
00388226
DM
376 let uuid = data.index.uuid;
377
fb6026b6
DM
378 let expected_csum = data.index.close()?;
379
fb6026b6
DM
380 if csum != expected_csum {
381 bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name);
382 }
a2077252 383
00388226 384 self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 385
e6389f4e 386 state.file_counter += 1;
c8fff67d
DM
387 state.backup_size += size;
388 state.backup_stat = state.backup_stat + data.upload_stat;
e6389f4e 389
a2077252
DM
390 Ok(())
391 }
392
a42fa400 393 /// Close fixed writer
fb6026b6 394 pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
a42fa400
DM
395 let mut state = self.state.lock().unwrap();
396
397 state.ensure_unfinished()?;
398
399 let mut data = match state.fixed_writers.remove(&wid) {
400 Some(data) => data,
401 None => bail!("fixed writer '{}' not registered", wid),
402 };
403
404 if data.chunk_count != chunk_count {
006f3ff4 405 bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
a42fa400
DM
406 }
407
facd9801
SR
408 if !data.incremental {
409 let expected_count = data.index.index_length();
006f3ff4 410
facd9801
SR
411 if chunk_count != (expected_count as u64) {
412 bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
413 }
006f3ff4 414
facd9801
SR
415 if size != (data.size as u64) {
416 bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
417 }
006f3ff4 418 }
a42fa400 419
00388226 420 let uuid = data.index.uuid;
fb6026b6 421 let expected_csum = data.index.close()?;
a42fa400 422
fb6026b6
DM
423 if csum != expected_csum {
424 bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
425 }
426
427 self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 428
e6389f4e 429 state.file_counter += 1;
c8fff67d
DM
430 state.backup_size += size;
431 state.backup_stat = state.backup_stat + data.upload_stat;
e6389f4e 432
a42fa400
DM
433 Ok(())
434 }
435
46bd8800
DM
    /// Store a complete blob file (e.g. client log or config) directly
    /// in the snapshot directory, verifying its CRC first.
    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {

        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme: both values are the raw blob length, so the "bytes"/"comp" log below prints identical numbers

        // always verify blob/CRC at server side
        let blob = DataBlob::load_from_reader(&mut &data[..])?;

        let raw_data = blob.raw_data();
        replace_file(&path, raw_data, CreateOptions::new())?;

        self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));

        // account the blob like any other uploaded file
        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;
        state.backup_size += orig_len as u64;
        state.backup_stat.size += blob_len as u64;

        Ok(())
    }
460
372724af
DM
    /// Mark backup as finished
    ///
    /// Performs final consistency checks (no open writers, at least one
    /// file, manifest present, base snapshot still there) and only then
    /// sets the `finished` flag.
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // test if all writer are correctly closed
        if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        // check for valid manifest and store stats
        let stats = serde_json::to_value(state.backup_stat)?;
        self.datastore.update_manifest(&self.backup_dir, |manifest| {
            manifest.unprotected["chunk_upload_stats"] = stats;
        }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

        // NOTE(review): presumably the client referenced chunks from the
        // base snapshot instead of re-uploading them, so a vanished base
        // means missing chunks - confirm against the upload protocol
        if let Some(base) = &self.last_backup {
            let path = self.datastore.snapshot_path(&base.backup_dir);
            if !path.exists() {
                bail!(
                    "base snapshot {} was removed during backup, cannot finish as chunks might be missing",
                    base.backup_dir
                );
            }
        }

        // marks the backup as successful
        state.finished = true;

        Ok(())
    }
497
0698f78d
SR
    /// If verify-new is set on the datastore, this will run a new verify task
    /// for the backup. If not, this will return and also drop the passed lock
    /// immediately.
    pub fn verify_after_complete(&self, snap_lock: Dir) -> Result<(), Error> {
        // verification only makes sense after a successful finish_backup()
        self.ensure_finished()?;

        if !self.datastore.verify_new() {
            // no verify requested, do nothing
            return Ok(());
        }

        let worker_id = format!("{}:{}/{}/{:08X}",
            self.datastore.name(),
            self.backup_dir.group().backup_type(),
            self.backup_dir.group().backup_id(),
            self.backup_dir.backup_time());

        // owned clones are moved into the worker thread below
        let datastore = self.datastore.clone();
        let backup_dir = self.backup_dir.clone();

        WorkerTask::new_thread(
            "verify",
            Some(worker_id),
            self.auth_id.clone(),
            false,
            move |worker| {
                worker.log("Automatically verifying newly added snapshot");

                let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
                let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));

                // snap_lock is moved into the task, keeping the snapshot
                // locked until verification completes
                if !verify_backup_dir_with_lock(
                    datastore,
                    &backup_dir,
                    verified_chunks,
                    corrupt_chunks,
                    worker.clone(),
                    worker.upid().clone(),
                    None,
                    snap_lock,
                )? {
                    bail!("verification failed - please check the log for details");
                }

                Ok(())
            },
        ).map(|_| ())
    }
546
d95ced64
DM
    /// Write a message to the worker task log.
    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log(msg);
    }
b4b63e52 550
a42d1f55
DM
551 pub fn debug<S: AsRef<str>>(&self, msg: S) {
552 if self.debug { self.worker.log(msg); }
553 }
554
b4b63e52
DM
555 pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
556 match result {
557 Ok(data) => (self.formatter.format_data)(data, self),
558 Err(err) => (self.formatter.format_error)(err),
559 }
560 }
372724af
DM
561
562 /// Raise error if finished flag is not set
563 pub fn ensure_finished(&self) -> Result<(), Error> {
564 let state = self.state.lock().unwrap();
565 if !state.finished {
566 bail!("backup ended but finished flag is not set.");
567 }
568 Ok(())
569 }
570
b428af97
DM
571 /// Return true if the finished flag is set
572 pub fn finished(&self) -> bool {
573 let state = self.state.lock().unwrap();
574 state.finished
575 }
576
372724af
DM
    /// Remove complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        // set finished first so no further uploads can race with the removal
        state.finished = true;

        // NOTE(review): the `true` flag presumably forces removal -
        // confirm against DataStore::remove_backup_dir
        self.datastore.remove_backup_dir(&self.backup_dir, true)?;

        Ok(())
    }
d95ced64
DM
586}
587
impl RpcEnvironment for BackupEnvironment {

    fn result_attrib_mut(&mut self) -> &mut Value {
        &mut self.result_attributes
    }

    fn result_attrib(&self) -> &Value {
        &self.result_attributes
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    // the auth_id is fixed at construction time for a backup session
    fn set_auth_id(&mut self, _auth_id: Option<String>) {
        panic!("unable to change auth_id");
    }

    fn get_auth_id(&self) -> Option<String> {
        Some(self.auth_id.to_string())
    }
}
21ee7912 610
dd5495d6 611impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
21ee7912
DM
612 fn as_ref(&self) -> &BackupEnvironment {
613 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
614 }
615}
dd5495d6
WB
616
impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    fn as_ref(&self) -> &BackupEnvironment {
        // panics if the environment is not actually a BackupEnvironment
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}