]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/backup/environment.rs
move worker_task.rs into proxmox-rest-server crate
[proxmox-backup.git] / src / api2 / backup / environment.rs
CommitLineData
9fa55e09 1use anyhow::{bail, format_err, Error};
f9578f3c 2use std::sync::{Arc, Mutex};
9c26a3d6 3use std::collections::HashMap;
0698f78d 4use nix::dir::Dir;
d95ced64 5
c8fff67d 6use ::serde::{Serialize};
e8d1da6a 7use serde_json::{json, Value};
d95ced64 8
feaa1ad3
WB
9use proxmox::tools::digest_to_hex;
10use proxmox::tools::fs::{replace_file, CreateOptions};
a2479cfa 11use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
00388226 12
b2065dc7
WB
13use pbs_datastore::DataBlob;
14use pbs_datastore::backup_info::{BackupDir, BackupInfo};
15use pbs_datastore::dynamic_index::DynamicIndexWriter;
16use pbs_datastore::fixed_index::FixedIndexWriter;
6227654a 17use pbs_api_types::Authid;
b9700a9f 18use proxmox_rest_server::{WorkerTask, formatter::*};
b2065dc7 19
b2065dc7 20use crate::backup::{verify_backup_dir_with_lock, DataStore};
b9700a9f 21
b4b63e52 22use hyper::{Body, Response};
d95ced64 23
c8fff67d 24#[derive(Copy, Clone, Serialize)]
96482891
DM
25struct UploadStatistic {
26 count: u64,
27 size: u64,
28 compressed_size: u64,
29 duplicates: u64,
30}
31
32impl UploadStatistic {
33 fn new() -> Self {
34 Self {
35 count: 0,
36 size: 0,
37 compressed_size: 0,
38 duplicates: 0,
39 }
40 }
41}
8bea85b4 42
c8fff67d
DM
43impl std::ops::Add for UploadStatistic {
44 type Output = Self;
45
46 fn add(self, other: Self) -> Self {
47 Self {
48 count: self.count + other.count,
49 size: self.size + other.size,
50 compressed_size: self.compressed_size + other.compressed_size,
51 duplicates: self.duplicates + other.duplicates,
52 }
53 }
54}
55
8bea85b4
DM
/// Server-side state of one open dynamic index writer.
struct DynamicWriterState {
    // archive name, used in error and log messages
    name: String,
    index: DynamicIndexWriter,
    // current end offset; appended chunks must start exactly here
    offset: u64,
    // number of chunks appended so far
    chunk_count: u64,
    upload_stat: UploadStatistic,
}
63
a42fa400
DM
/// Server-side state of one open fixed index writer.
struct FixedWriterState {
    // archive name, used in error and log messages
    name: String,
    index: FixedIndexWriter,
    // expected total file size (checked on close for non-incremental uploads)
    size: usize,
    // fixed chunk size; uploaded chunks may not exceed this
    chunk_size: u32,
    chunk_count: u64,
    small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
    upload_stat: UploadStatistic,
    // incremental uploads skip the full-coverage checks on close
    incremental: bool,
}
74
79f6a79c
TL
/// Maps a chunk digest (key) to the chunk length in bytes (value).
type KnownChunksMap = HashMap<[u8;32], u32>;
43772efc 77
f9578f3c 78struct SharedBackupState {
372724af 79 finished: bool,
f9578f3c 80 uid_counter: usize,
add5861e 81 file_counter: usize, // successfully uploaded files
8bea85b4 82 dynamic_writers: HashMap<usize, DynamicWriterState>,
a42fa400 83 fixed_writers: HashMap<usize, FixedWriterState>,
43772efc 84 known_chunks: KnownChunksMap,
c8fff67d
DM
85 backup_size: u64, // sums up size of all files
86 backup_stat: UploadStatistic,
f9578f3c
DM
87}
88
372724af
DM
89impl SharedBackupState {
90
91 // Raise error if finished flag is set
92 fn ensure_unfinished(&self) -> Result<(), Error> {
93 if self.finished {
94 bail!("backup already marked as finished.");
95 }
96 Ok(())
97 }
98
99 // Get an unique integer ID
100 pub fn next_uid(&mut self) -> usize {
101 self.uid_counter += 1;
102 self.uid_counter
103 }
104}
105
106
d95ced64
DM
/// `RpcEnvironment` implementation for the backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: Value,
    // fixed at construction; set_auth_id() panics
    auth_id: Authid,
    // when set, debug() messages are forwarded to the worker log
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    // previous snapshot of this group (its presence is re-checked in
    // finish_backup, since its chunks may be referenced)
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>
}
121
122impl BackupEnvironment {
f9578f3c
DM
123 pub fn new(
124 env_type: RpcEnvironmentType,
e6dc35ac 125 auth_id: Authid,
f9578f3c
DM
126 worker: Arc<WorkerTask>,
127 datastore: Arc<DataStore>,
128 backup_dir: BackupDir,
f9578f3c
DM
129 ) -> Self {
130
131 let state = SharedBackupState {
372724af 132 finished: false,
f9578f3c 133 uid_counter: 0,
e6389f4e 134 file_counter: 0,
f9578f3c 135 dynamic_writers: HashMap::new(),
a42fa400 136 fixed_writers: HashMap::new(),
a09c0e38 137 known_chunks: HashMap::new(),
c8fff67d
DM
138 backup_size: 0,
139 backup_stat: UploadStatistic::new(),
f9578f3c
DM
140 };
141
d95ced64 142 Self {
e8d1da6a 143 result_attributes: json!({}),
d95ced64 144 env_type,
e6dc35ac 145 auth_id,
d95ced64 146 worker,
21ee7912 147 datastore,
a42d1f55 148 debug: false,
b4b63e52 149 formatter: &JSON_FORMATTER,
f9578f3c 150 backup_dir,
b02a52e3 151 last_backup: None,
f9578f3c 152 state: Arc::new(Mutex::new(state)),
d95ced64
DM
153 }
154 }
155
96482891
DM
156 /// Register a Chunk with associated length.
157 ///
158 /// We do not fully trust clients, so a client may only use registered
159 /// chunks. Please use this method to register chunks from previous backups.
a09c0e38
DM
160 pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
161 let mut state = self.state.lock().unwrap();
162
163 state.ensure_unfinished()?;
164
79f6a79c 165 state.known_chunks.insert(digest, length);
a09c0e38
DM
166
167 Ok(())
168 }
169
96482891
DM
170 /// Register fixed length chunks after upload.
171 ///
172 /// Like `register_chunk()`, but additionally record statistics for
173 /// the fixed index writer.
642322b4
DM
174 pub fn register_fixed_chunk(
175 &self,
176 wid: usize,
177 digest: [u8; 32],
178 size: u32,
179 compressed_size: u32,
180 is_duplicate: bool,
181 ) -> Result<(), Error> {
182 let mut state = self.state.lock().unwrap();
183
184 state.ensure_unfinished()?;
185
186 let mut data = match state.fixed_writers.get_mut(&wid) {
187 Some(data) => data,
188 None => bail!("fixed writer '{}' not registered", wid),
189 };
190
5e04ec70
DM
191 if size > data.chunk_size {
192 bail!("fixed writer '{}' - got large chunk ({} > {}", data.name, size, data.chunk_size);
43313c2e
FG
193 }
194
195 if size < data.chunk_size {
5e04ec70
DM
196 data.small_chunk_count += 1;
197 if data.small_chunk_count > 1 {
198 bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)");
199 }
642322b4
DM
200 }
201
96482891
DM
202 // record statistics
203 data.upload_stat.count += 1;
204 data.upload_stat.size += size as u64;
205 data.upload_stat.compressed_size += compressed_size as u64;
206 if is_duplicate { data.upload_stat.duplicates += 1; }
207
208 // register chunk
79f6a79c 209 state.known_chunks.insert(digest, size);
642322b4
DM
210
211 Ok(())
212 }
213
96482891
DM
214 /// Register dynamic length chunks after upload.
215 ///
216 /// Like `register_chunk()`, but additionally record statistics for
217 /// the dynamic index writer.
642322b4
DM
218 pub fn register_dynamic_chunk(
219 &self,
220 wid: usize,
221 digest: [u8; 32],
222 size: u32,
223 compressed_size: u32,
224 is_duplicate: bool,
225 ) -> Result<(), Error> {
226 let mut state = self.state.lock().unwrap();
227
228 state.ensure_unfinished()?;
229
230 let mut data = match state.dynamic_writers.get_mut(&wid) {
231 Some(data) => data,
232 None => bail!("dynamic writer '{}' not registered", wid),
233 };
234
96482891
DM
235 // record statistics
236 data.upload_stat.count += 1;
237 data.upload_stat.size += size as u64;
238 data.upload_stat.compressed_size += compressed_size as u64;
239 if is_duplicate { data.upload_stat.duplicates += 1; }
240
241 // register chunk
79f6a79c 242 state.known_chunks.insert(digest, size);
642322b4
DM
243
244 Ok(())
245 }
246
a09c0e38
DM
247 pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
248 let state = self.state.lock().unwrap();
249
250 match state.known_chunks.get(digest) {
79f6a79c 251 Some(len) => Some(*len),
a09c0e38
DM
252 None => None,
253 }
254 }
255
372724af 256 /// Store the writer with an unique ID
8bea85b4 257 pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
f9578f3c 258 let mut state = self.state.lock().unwrap();
f9578f3c 259
372724af
DM
260 state.ensure_unfinished()?;
261
262 let uid = state.next_uid();
f9578f3c 263
8bea85b4 264 state.dynamic_writers.insert(uid, DynamicWriterState {
96482891 265 index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
8bea85b4 266 });
372724af
DM
267
268 Ok(uid)
f9578f3c
DM
269 }
270
a42fa400 271 /// Store the writer with an unique ID
facd9801 272 pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32, incremental: bool) -> Result<usize, Error> {
a42fa400
DM
273 let mut state = self.state.lock().unwrap();
274
275 state.ensure_unfinished()?;
276
277 let uid = state.next_uid();
278
279 state.fixed_writers.insert(uid, FixedWriterState {
facd9801 280 index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), incremental,
a42fa400
DM
281 });
282
283 Ok(uid)
284 }
285
f9578f3c 286 /// Append chunk to dynamic writer
417cb073 287 pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
f9578f3c
DM
288 let mut state = self.state.lock().unwrap();
289
372724af
DM
290 state.ensure_unfinished()?;
291
f9578f3c
DM
292 let mut data = match state.dynamic_writers.get_mut(&wid) {
293 Some(data) => data,
294 None => bail!("dynamic writer '{}' not registered", wid),
295 };
296
f9578f3c 297
417cb073
DM
298 if data.offset != offset {
299 bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
300 data.name, data.offset, offset);
301 }
302
3dc5b2a2
DM
303 data.offset += size as u64;
304 data.chunk_count += 1;
305
8bea85b4 306 data.index.add_chunk(data.offset, digest)?;
f9578f3c
DM
307
308 Ok(())
309 }
310
a42fa400
DM
311 /// Append chunk to fixed writer
312 pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
313 let mut state = self.state.lock().unwrap();
314
315 state.ensure_unfinished()?;
316
317 let mut data = match state.fixed_writers.get_mut(&wid) {
318 Some(data) => data,
319 None => bail!("fixed writer '{}' not registered", wid),
320 };
321
5e04ec70
DM
322 let end = (offset as usize) + (size as usize);
323 let idx = data.index.check_chunk_alignment(end, size as usize)?;
a42fa400 324
5e04ec70 325 data.chunk_count += 1;
a42fa400 326
5e04ec70 327 data.index.add_digest(idx, digest)?;
a42fa400
DM
328
329 Ok(())
330 }
331
    /// Write a human-readable upload summary for one archive to the worker log.
    fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", digest_to_hex(uuid)));
        self.log(format!("Checksum: {}", digest_to_hex(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        // the percentages below divide by size / chunk_count - skip them
        // entirely for empty archives
        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));

        // account for zero chunk, which might be uploaded but never used
        let client_side_duplicates = if chunk_count < upload_stat.count {
            0
        } else {
            chunk_count - upload_stat.count
        };

        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
            self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
        }

        // guard against division by zero when nothing was uploaded
        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
        }
    }
363
a2077252 364 /// Close dynamic writer
fb6026b6 365 pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
a2077252
DM
366 let mut state = self.state.lock().unwrap();
367
372724af
DM
368 state.ensure_unfinished()?;
369
a2077252
DM
370 let mut data = match state.dynamic_writers.remove(&wid) {
371 Some(data) => data,
372 None => bail!("dynamic writer '{}' not registered", wid),
373 };
374
8bea85b4
DM
375 if data.chunk_count != chunk_count {
376 bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
377 }
378
379 if data.offset != size {
380 bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
381 }
382
00388226
DM
383 let uuid = data.index.uuid;
384
fb6026b6
DM
385 let expected_csum = data.index.close()?;
386
fb6026b6
DM
387 if csum != expected_csum {
388 bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name);
389 }
a2077252 390
00388226 391 self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 392
e6389f4e 393 state.file_counter += 1;
c8fff67d
DM
394 state.backup_size += size;
395 state.backup_stat = state.backup_stat + data.upload_stat;
e6389f4e 396
a2077252
DM
397 Ok(())
398 }
399
a42fa400 400 /// Close fixed writer
fb6026b6 401 pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
a42fa400
DM
402 let mut state = self.state.lock().unwrap();
403
404 state.ensure_unfinished()?;
405
406 let mut data = match state.fixed_writers.remove(&wid) {
407 Some(data) => data,
408 None => bail!("fixed writer '{}' not registered", wid),
409 };
410
411 if data.chunk_count != chunk_count {
006f3ff4 412 bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
a42fa400
DM
413 }
414
facd9801
SR
415 if !data.incremental {
416 let expected_count = data.index.index_length();
006f3ff4 417
facd9801
SR
418 if chunk_count != (expected_count as u64) {
419 bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
420 }
006f3ff4 421
facd9801
SR
422 if size != (data.size as u64) {
423 bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
424 }
006f3ff4 425 }
a42fa400 426
00388226 427 let uuid = data.index.uuid;
fb6026b6 428 let expected_csum = data.index.close()?;
a42fa400 429
fb6026b6
DM
430 if csum != expected_csum {
431 bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
432 }
433
434 self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 435
e6389f4e 436 state.file_counter += 1;
c8fff67d
DM
437 state.backup_size += size;
438 state.backup_stat = state.backup_stat + data.upload_stat;
e6389f4e 439
a42fa400
DM
440 Ok(())
441 }
442
46bd8800
DM
    /// Store an already-encoded blob as a file inside the snapshot directory.
    ///
    /// The blob is verified (CRC) server-side before being written, and the
    /// file is accounted in the backup statistics.
    pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {

        let mut path = self.datastore.base_path();
        path.push(self.backup_dir.relative_path());
        path.push(file_name);

        let blob_len = data.len();
        let orig_len = data.len(); // fixme: this is the encoded blob length, not the original payload length

        // always verify blob/CRC at server side
        let blob = DataBlob::load_from_reader(&mut &data[..])?;

        let raw_data = blob.raw_data();
        replace_file(&path, raw_data, CreateOptions::new())?;

        self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));

        let mut state = self.state.lock().unwrap();
        state.file_counter += 1;
        state.backup_size += orig_len as u64;
        state.backup_stat.size += blob_len as u64;

        Ok(())
    }
467
372724af
DM
    /// Mark backup as finished
    ///
    /// Performs final consistency checks (all writers closed, at least one
    /// file uploaded, base snapshot still present), stores the upload
    /// statistics in the manifest, and only then sets the finished flag.
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // test if all writer are correctly closed
        if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        // check for valid manifest and store stats
        let stats = serde_json::to_value(state.backup_stat)?;
        self.datastore.update_manifest(&self.backup_dir, |manifest| {
            manifest.unprotected["chunk_upload_stats"] = stats;
        }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;

        // the base snapshot's chunks may be referenced by this backup, so
        // refuse to finish if it disappeared while we were running
        if let Some(base) = &self.last_backup {
            let path = self.datastore.snapshot_path(&base.backup_dir);
            if !path.exists() {
                bail!(
                    "base snapshot {} was removed during backup, cannot finish as chunks might be missing",
                    base.backup_dir
                );
            }
        }

        // marks the backup as successful
        state.finished = true;

        Ok(())
    }
504
0698f78d
SR
    /// If verify-new is set on the datastore, this will run a new verify task
    /// for the backup. If not, this will return and also drop the passed lock
    /// immediately.
    pub fn verify_after_complete(&self, snap_lock: Dir) -> Result<(), Error> {
        // verification only makes sense for a successfully finished backup
        self.ensure_finished()?;

        if !self.datastore.verify_new() {
            // no verify requested, do nothing
            return Ok(());
        }

        let worker_id = format!("{}:{}/{}/{:08X}",
            self.datastore.name(),
            self.backup_dir.group().backup_type(),
            self.backup_dir.group().backup_id(),
            self.backup_dir.backup_time());

        // clones moved into the worker closure below
        let datastore = self.datastore.clone();
        let backup_dir = self.backup_dir.clone();

        WorkerTask::new_thread(
            "verify",
            Some(worker_id),
            self.auth_id.to_string(),
            false,
            move |worker| {
                worker.log("Automatically verifying newly added snapshot");


                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
                // snap_lock is moved into the verification, which drops it
                // when done
                if !verify_backup_dir_with_lock(
                    &verify_worker,
                    &backup_dir,
                    worker.upid().clone(),
                    None,
                    snap_lock,
                )? {
                    bail!("verification failed - please check the log for details");
                }

                Ok(())
            },
        ).map(|_| ())
    }
549
d95ced64
DM
    /// Write a message to the worker task log.
    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log(msg);
    }

    /// Write a message to the worker task log, but only when `self.debug`
    /// is enabled.
    pub fn debug<S: AsRef<str>>(&self, msg: S) {
        if self.debug { self.worker.log(msg); }
    }
557
b4b63e52
DM
558 pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
559 match result {
560 Ok(data) => (self.formatter.format_data)(data, self),
561 Err(err) => (self.formatter.format_error)(err),
562 }
563 }
372724af
DM
564
565 /// Raise error if finished flag is not set
566 pub fn ensure_finished(&self) -> Result<(), Error> {
567 let state = self.state.lock().unwrap();
568 if !state.finished {
569 bail!("backup ended but finished flag is not set.");
570 }
571 Ok(())
572 }
573
b428af97
DM
574 /// Return true if the finished flag is set
575 pub fn finished(&self) -> bool {
576 let state = self.state.lock().unwrap();
577 state.finished
578 }
579
372724af
DM
    /// Remove complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        // set the flag first so no further uploads can race with the removal
        state.finished = true;

        self.datastore.remove_backup_dir(&self.backup_dir, true)?;

        Ok(())
    }
d95ced64
DM
589}
590
impl RpcEnvironment for BackupEnvironment {

    fn result_attrib_mut(&mut self) -> &mut Value {
        &mut self.result_attributes
    }

    fn result_attrib(&self) -> &Value {
        &self.result_attributes
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    // the auth_id is fixed at construction time; changing it is a bug
    fn set_auth_id(&mut self, _auth_id: Option<String>) {
        panic!("unable to change auth_id");
    }

    fn get_auth_id(&self) -> Option<String> {
        Some(self.auth_id.to_string())
    }
}
21ee7912 613
dd5495d6 614impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
21ee7912
DM
615 fn as_ref(&self) -> &BackupEnvironment {
616 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
617 }
618}
dd5495d6
WB
619
620impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
b4b63e52
DM
621 fn as_ref(&self) -> &BackupEnvironment {
622 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
623 }
624}