]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/backup/environment.rs
update to nix 0.14, use code from proxmox:tools
[proxmox-backup.git] / src / api2 / backup / environment.rs
CommitLineData
b4b63e52 1use failure::*;
f9578f3c 2use std::sync::{Arc, Mutex};
d95ced64
DM
3use std::collections::HashMap;
4
5use serde_json::Value;
6
e18a6c9e
DM
7use proxmox::tools::{
8 digest_to_hex,
9 fs::file_set_contents,
10};
00388226 11
d95ced64
DM
12use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
13use crate::server::WorkerTask;
21ee7912 14use crate::backup::*;
b4b63e52
DM
15use crate::server::formatter::*;
16use hyper::{Body, Response};
d95ced64 17
96482891
DM
/// Counters describing what a client uploaded for a single index writer.
///
/// All counters start at zero and are only ever incremented by the
/// `register_*_chunk()` methods of `BackupEnvironment`.
#[derive(Debug, Default)]
struct UploadStatistic {
    /// Number of chunks uploaded.
    count: u64,
    /// Sum of the sizes of all uploaded chunks.
    size: u64,
    /// Sum of the compressed sizes of all uploaded chunks.
    compressed_size: u64,
    /// Number of uploaded chunks that were flagged as duplicates.
    duplicates: u64,
}

impl UploadStatistic {
    /// Create a statistic with every counter set to zero.
    fn new() -> Self {
        Self::default()
    }
}
8bea85b4
DM
35
36struct DynamicWriterState {
37 name: String,
38 index: DynamicIndexWriter,
39 offset: u64,
40 chunk_count: u64,
96482891 41 upload_stat: UploadStatistic,
8bea85b4
DM
42}
43
a42fa400
DM
44struct FixedWriterState {
45 name: String,
46 index: FixedIndexWriter,
47 size: usize,
48 chunk_size: u32,
49 chunk_count: u64,
5e04ec70 50 small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
96482891 51 upload_stat: UploadStatistic,
a42fa400
DM
52}
53
f9578f3c 54struct SharedBackupState {
372724af 55 finished: bool,
f9578f3c 56 uid_counter: usize,
e6389f4e 57 file_counter: usize, // sucessfully uploaded files
8bea85b4 58 dynamic_writers: HashMap<usize, DynamicWriterState>,
a42fa400 59 fixed_writers: HashMap<usize, FixedWriterState>,
a09c0e38 60 known_chunks: HashMap<[u8;32], u32>,
f9578f3c
DM
61}
62
372724af
DM
63impl SharedBackupState {
64
65 // Raise error if finished flag is set
66 fn ensure_unfinished(&self) -> Result<(), Error> {
67 if self.finished {
68 bail!("backup already marked as finished.");
69 }
70 Ok(())
71 }
72
73 // Get an unique integer ID
74 pub fn next_uid(&mut self) -> usize {
75 self.uid_counter += 1;
76 self.uid_counter
77 }
78}
79
80
d95ced64
DM
81/// `RpcEnvironmet` implementation for backup service
82#[derive(Clone)]
83pub struct BackupEnvironment {
84 env_type: RpcEnvironmentType,
85 result_attributes: HashMap<String, Value>,
86 user: String,
a42d1f55 87 pub debug: bool,
b4b63e52 88 pub formatter: &'static OutputFormatter,
21ee7912
DM
89 pub worker: Arc<WorkerTask>,
90 pub datastore: Arc<DataStore>,
f9578f3c 91 pub backup_dir: BackupDir,
b02a52e3 92 pub last_backup: Option<BackupInfo>,
f9578f3c 93 state: Arc<Mutex<SharedBackupState>>
d95ced64
DM
94}
95
96impl BackupEnvironment {
f9578f3c
DM
97 pub fn new(
98 env_type: RpcEnvironmentType,
99 user: String,
100 worker: Arc<WorkerTask>,
101 datastore: Arc<DataStore>,
102 backup_dir: BackupDir,
f9578f3c
DM
103 ) -> Self {
104
105 let state = SharedBackupState {
372724af 106 finished: false,
f9578f3c 107 uid_counter: 0,
e6389f4e 108 file_counter: 0,
f9578f3c 109 dynamic_writers: HashMap::new(),
a42fa400 110 fixed_writers: HashMap::new(),
a09c0e38 111 known_chunks: HashMap::new(),
f9578f3c
DM
112 };
113
d95ced64
DM
114 Self {
115 result_attributes: HashMap::new(),
116 env_type,
117 user,
118 worker,
21ee7912 119 datastore,
a42d1f55 120 debug: false,
b4b63e52 121 formatter: &JSON_FORMATTER,
f9578f3c 122 backup_dir,
b02a52e3 123 last_backup: None,
f9578f3c 124 state: Arc::new(Mutex::new(state)),
d95ced64
DM
125 }
126 }
127
96482891
DM
128 /// Register a Chunk with associated length.
129 ///
130 /// We do not fully trust clients, so a client may only use registered
131 /// chunks. Please use this method to register chunks from previous backups.
a09c0e38
DM
132 pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
133 let mut state = self.state.lock().unwrap();
134
135 state.ensure_unfinished()?;
136
137 state.known_chunks.insert(digest, length);
138
139 Ok(())
140 }
141
96482891
DM
142 /// Register fixed length chunks after upload.
143 ///
144 /// Like `register_chunk()`, but additionally record statistics for
145 /// the fixed index writer.
642322b4
DM
146 pub fn register_fixed_chunk(
147 &self,
148 wid: usize,
149 digest: [u8; 32],
150 size: u32,
151 compressed_size: u32,
152 is_duplicate: bool,
153 ) -> Result<(), Error> {
154 let mut state = self.state.lock().unwrap();
155
156 state.ensure_unfinished()?;
157
158 let mut data = match state.fixed_writers.get_mut(&wid) {
159 Some(data) => data,
160 None => bail!("fixed writer '{}' not registered", wid),
161 };
162
5e04ec70
DM
163 if size > data.chunk_size {
164 bail!("fixed writer '{}' - got large chunk ({} > {}", data.name, size, data.chunk_size);
165 } else if size < data.chunk_size {
166 data.small_chunk_count += 1;
167 if data.small_chunk_count > 1 {
168 bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)");
169 }
642322b4
DM
170 }
171
96482891
DM
172 // record statistics
173 data.upload_stat.count += 1;
174 data.upload_stat.size += size as u64;
175 data.upload_stat.compressed_size += compressed_size as u64;
176 if is_duplicate { data.upload_stat.duplicates += 1; }
177
178 // register chunk
642322b4
DM
179 state.known_chunks.insert(digest, size);
180
181 Ok(())
182 }
183
96482891
DM
184 /// Register dynamic length chunks after upload.
185 ///
186 /// Like `register_chunk()`, but additionally record statistics for
187 /// the dynamic index writer.
642322b4
DM
188 pub fn register_dynamic_chunk(
189 &self,
190 wid: usize,
191 digest: [u8; 32],
192 size: u32,
193 compressed_size: u32,
194 is_duplicate: bool,
195 ) -> Result<(), Error> {
196 let mut state = self.state.lock().unwrap();
197
198 state.ensure_unfinished()?;
199
200 let mut data = match state.dynamic_writers.get_mut(&wid) {
201 Some(data) => data,
202 None => bail!("dynamic writer '{}' not registered", wid),
203 };
204
96482891
DM
205 // record statistics
206 data.upload_stat.count += 1;
207 data.upload_stat.size += size as u64;
208 data.upload_stat.compressed_size += compressed_size as u64;
209 if is_duplicate { data.upload_stat.duplicates += 1; }
210
211 // register chunk
642322b4
DM
212 state.known_chunks.insert(digest, size);
213
214 Ok(())
215 }
216
a09c0e38
DM
217 pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
218 let state = self.state.lock().unwrap();
219
220 match state.known_chunks.get(digest) {
221 Some(len) => Some(*len),
222 None => None,
223 }
224 }
225
372724af 226 /// Store the writer with an unique ID
8bea85b4 227 pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
f9578f3c 228 let mut state = self.state.lock().unwrap();
f9578f3c 229
372724af
DM
230 state.ensure_unfinished()?;
231
232 let uid = state.next_uid();
f9578f3c 233
8bea85b4 234 state.dynamic_writers.insert(uid, DynamicWriterState {
96482891 235 index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
8bea85b4 236 });
372724af
DM
237
238 Ok(uid)
f9578f3c
DM
239 }
240
a42fa400
DM
241 /// Store the writer with an unique ID
242 pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
243 let mut state = self.state.lock().unwrap();
244
245 state.ensure_unfinished()?;
246
247 let uid = state.next_uid();
248
249 state.fixed_writers.insert(uid, FixedWriterState {
5e04ec70 250 index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(),
a42fa400
DM
251 });
252
253 Ok(uid)
254 }
255
f9578f3c 256 /// Append chunk to dynamic writer
417cb073 257 pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
f9578f3c
DM
258 let mut state = self.state.lock().unwrap();
259
372724af
DM
260 state.ensure_unfinished()?;
261
f9578f3c
DM
262 let mut data = match state.dynamic_writers.get_mut(&wid) {
263 Some(data) => data,
264 None => bail!("dynamic writer '{}' not registered", wid),
265 };
266
f9578f3c 267
417cb073
DM
268 if data.offset != offset {
269 bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
270 data.name, data.offset, offset);
271 }
272
3dc5b2a2
DM
273 data.offset += size as u64;
274 data.chunk_count += 1;
275
8bea85b4 276 data.index.add_chunk(data.offset, digest)?;
f9578f3c
DM
277
278 Ok(())
279 }
280
a42fa400
DM
281 /// Append chunk to fixed writer
282 pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
283 let mut state = self.state.lock().unwrap();
284
285 state.ensure_unfinished()?;
286
287 let mut data = match state.fixed_writers.get_mut(&wid) {
288 Some(data) => data,
289 None => bail!("fixed writer '{}' not registered", wid),
290 };
291
5e04ec70
DM
292 let end = (offset as usize) + (size as usize);
293 let idx = data.index.check_chunk_alignment(end, size as usize)?;
a42fa400 294
5e04ec70 295 data.chunk_count += 1;
a42fa400 296
5e04ec70 297 data.index.add_digest(idx, digest)?;
a42fa400
DM
298
299 Ok(())
300 }
301
00388226 302 fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
96482891 303 self.log(format!("Upload statistics for '{}'", archive_name));
e18a6c9e
DM
304 self.log(format!("UUID: {}", digest_to_hex(uuid)));
305 self.log(format!("Checksum: {}", digest_to_hex(csum)));
96482891
DM
306 self.log(format!("Size: {}", size));
307 self.log(format!("Chunk count: {}", chunk_count));
36075475
DM
308
309 if size == 0 || chunk_count == 0 {
310 return;
311 }
312
96482891 313 self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));
36075475
DM
314
315 let client_side_duplicates = chunk_count - upload_stat.count;
316 let server_side_duplicates = upload_stat.duplicates;
317
318 if (client_side_duplicates + server_side_duplicates) > 0 {
319 let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
320 self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
321 }
322
96482891 323 if upload_stat.size > 0 {
36075475 324 self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
96482891
DM
325 }
326 }
327
a2077252 328 /// Close dynamic writer
8bea85b4 329 pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
a2077252
DM
330 let mut state = self.state.lock().unwrap();
331
372724af
DM
332 state.ensure_unfinished()?;
333
a2077252
DM
334 let mut data = match state.dynamic_writers.remove(&wid) {
335 Some(data) => data,
336 None => bail!("dynamic writer '{}' not registered", wid),
337 };
338
8bea85b4
DM
339 if data.chunk_count != chunk_count {
340 bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
341 }
342
343 if data.offset != size {
344 bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
345 }
346
00388226
DM
347 let uuid = data.index.uuid;
348
349 let csum = data.index.close()?;
a2077252 350
00388226 351 self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 352
e6389f4e
DM
353 state.file_counter += 1;
354
a2077252
DM
355 Ok(())
356 }
357
a42fa400
DM
358 /// Close fixed writer
359 pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
360 let mut state = self.state.lock().unwrap();
361
362 state.ensure_unfinished()?;
363
364 let mut data = match state.fixed_writers.remove(&wid) {
365 Some(data) => data,
366 None => bail!("fixed writer '{}' not registered", wid),
367 };
368
369 if data.chunk_count != chunk_count {
006f3ff4 370 bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
a42fa400
DM
371 }
372
006f3ff4
DM
373 let expected_count = data.index.index_length();
374
375 if chunk_count != (expected_count as u64) {
376 bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
377 }
378
379 if size != (data.size as u64) {
380 bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
381 }
a42fa400 382
00388226
DM
383 let uuid = data.index.uuid;
384
385 let csum = data.index.close()?;
a42fa400 386
00388226 387 self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);
96482891 388
e6389f4e
DM
389 state.file_counter += 1;
390
a42fa400
DM
391 Ok(())
392 }
393
46bd8800
DM
394 pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {
395
396 let mut path = self.datastore.base_path();
397 path.push(self.backup_dir.relative_path());
398 path.push(file_name);
399
400 let blob_len = data.len();
401 let orig_len = data.len(); // fixme:
402
403 let mut blob = DataBlob::from_raw(data)?;
404 // always comput CRC at server side
405 blob.set_crc(blob.compute_crc());
406
407 let raw_data = blob.raw_data();
e18a6c9e 408 file_set_contents(&path, raw_data, None)?;
46bd8800
DM
409
410 self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));
411
412 let mut state = self.state.lock().unwrap();
413 state.file_counter += 1;
414
415 Ok(())
416 }
417
372724af
DM
418 /// Mark backup as finished
419 pub fn finish_backup(&self) -> Result<(), Error> {
420 let mut state = self.state.lock().unwrap();
421 // test if all writer are correctly closed
422
423 state.ensure_unfinished()?;
424
425 state.finished = true;
426
427 if state.dynamic_writers.len() != 0 {
428 bail!("found open index writer - unable to finish backup");
429 }
430
e6389f4e
DM
431 if state.file_counter == 0 {
432 bail!("backup does not contain valid files (file count == 0)");
433 }
434
372724af
DM
435 Ok(())
436 }
437
d95ced64
DM
438 pub fn log<S: AsRef<str>>(&self, msg: S) {
439 self.worker.log(msg);
440 }
b4b63e52 441
a42d1f55
DM
442 pub fn debug<S: AsRef<str>>(&self, msg: S) {
443 if self.debug { self.worker.log(msg); }
444 }
445
b4b63e52
DM
446 pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
447 match result {
448 Ok(data) => (self.formatter.format_data)(data, self),
449 Err(err) => (self.formatter.format_error)(err),
450 }
451 }
372724af
DM
452
453 /// Raise error if finished flag is not set
454 pub fn ensure_finished(&self) -> Result<(), Error> {
455 let state = self.state.lock().unwrap();
456 if !state.finished {
457 bail!("backup ended but finished flag is not set.");
458 }
459 Ok(())
460 }
461
462 /// Remove complete backup
463 pub fn remove_backup(&self) -> Result<(), Error> {
464 let mut state = self.state.lock().unwrap();
465 state.finished = true;
466
467 self.datastore.remove_backup_dir(&self.backup_dir)?;
468
469 Ok(())
470 }
d95ced64
DM
471}
472
473impl RpcEnvironment for BackupEnvironment {
474
475 fn set_result_attrib(&mut self, name: &str, value: Value) {
476 self.result_attributes.insert(name.into(), value);
477 }
478
479 fn get_result_attrib(&self, name: &str) -> Option<&Value> {
480 self.result_attributes.get(name)
481 }
482
483 fn env_type(&self) -> RpcEnvironmentType {
484 self.env_type
485 }
486
487 fn set_user(&mut self, _user: Option<String>) {
488 panic!("unable to change user");
489 }
490
491 fn get_user(&self) -> Option<String> {
492 Some(self.user.clone())
493 }
494}
21ee7912 495
dd5495d6 496impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
21ee7912
DM
497 fn as_ref(&self) -> &BackupEnvironment {
498 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
499 }
500}
dd5495d6
WB
501
502impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
b4b63e52
DM
503 fn as_ref(&self) -> &BackupEnvironment {
504 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
505 }
506}