]>
Commit | Line | Data |
---|---|---|
b4b63e52 | 1 | use failure::*; |
f9578f3c | 2 | use std::sync::{Arc, Mutex}; |
d95ced64 DM |
3 | use std::collections::HashMap; |
4 | ||
5 | use serde_json::Value; | |
6 | ||
e18a6c9e DM |
7 | use proxmox::tools::{ |
8 | digest_to_hex, | |
9 | fs::file_set_contents, | |
10 | }; | |
00388226 | 11 | |
d95ced64 DM |
12 | use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType}; |
13 | use crate::server::WorkerTask; | |
21ee7912 | 14 | use crate::backup::*; |
b4b63e52 DM |
15 | use crate::server::formatter::*; |
16 | use hyper::{Body, Response}; | |
d95ced64 | 17 | |
96482891 DM |
/// Per-writer upload statistics (chunk counts and byte totals).
#[derive(Default)]
struct UploadStatistic {
    /// Number of chunks actually uploaded by the client.
    count: u64,
    /// Uncompressed payload bytes.
    size: u64,
    /// Compressed payload bytes.
    compressed_size: u64,
    /// Uploaded chunks that were already known on the server side.
    duplicates: u64,
}

impl UploadStatistic {
    /// Create an empty statistic (all counters zero).
    fn new() -> Self {
        // all fields are u64, so the derived Default is all-zero
        Self::default()
    }
}
8bea85b4 DM |
35 | |
/// Server-side state for one open dynamic (variable chunk size) index writer.
struct DynamicWriterState {
    /// Archive name, used in error messages.
    name: String,
    index: DynamicIndexWriter,
    /// Current end offset of the archive; the next appended chunk must
    /// start exactly here.
    offset: u64,
    /// Number of chunks appended so far.
    chunk_count: u64,
    upload_stat: UploadStatistic,
}
43 | ||
a42fa400 DM |
44 | struct FixedWriterState { |
45 | name: String, | |
46 | index: FixedIndexWriter, | |
47 | size: usize, | |
48 | chunk_size: u32, | |
49 | chunk_count: u64, | |
5e04ec70 | 50 | small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller) |
96482891 | 51 | upload_stat: UploadStatistic, |
a42fa400 DM |
52 | } |
53 | ||
/// Mutable backup-session state, shared between request handlers
/// via the `Arc<Mutex<..>>` in `BackupEnvironment`.
struct SharedBackupState {
    /// Set once the backup is finished (or removed); most operations
    /// refuse to run afterwards (see `ensure_unfinished`).
    finished: bool,
    /// Source for unique writer IDs (monotonic counter).
    uid_counter: usize,
    file_counter: usize, // successfully uploaded files
    /// Open dynamic index writers, keyed by writer ID.
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    /// Open fixed index writers, keyed by writer ID.
    fixed_writers: HashMap<usize, FixedWriterState>,
    /// Chunk digests the client may reference, with their length.
    known_chunks: HashMap<[u8;32], u32>,
}
62 | ||
372724af DM |
63 | impl SharedBackupState { |
64 | ||
65 | // Raise error if finished flag is set | |
66 | fn ensure_unfinished(&self) -> Result<(), Error> { | |
67 | if self.finished { | |
68 | bail!("backup already marked as finished."); | |
69 | } | |
70 | Ok(()) | |
71 | } | |
72 | ||
73 | // Get an unique integer ID | |
74 | pub fn next_uid(&mut self) -> usize { | |
75 | self.uid_counter += 1; | |
76 | self.uid_counter | |
77 | } | |
78 | } | |
79 | ||
80 | ||
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    /// Attributes returned with the RPC result (see `set_result_attrib`).
    result_attributes: HashMap<String, Value>,
    user: String,
    /// Enables verbose logging via `debug()`.
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    // NOTE(review): presumably the previous backup this session may reuse
    // chunks from — confirm against the code that sets it
    pub last_backup: Option<BackupInfo>,
    /// Session state shared by all request handlers.
    state: Arc<Mutex<SharedBackupState>>
}
95 | ||
96 | impl BackupEnvironment { | |
f9578f3c DM |
97 | pub fn new( |
98 | env_type: RpcEnvironmentType, | |
99 | user: String, | |
100 | worker: Arc<WorkerTask>, | |
101 | datastore: Arc<DataStore>, | |
102 | backup_dir: BackupDir, | |
f9578f3c DM |
103 | ) -> Self { |
104 | ||
105 | let state = SharedBackupState { | |
372724af | 106 | finished: false, |
f9578f3c | 107 | uid_counter: 0, |
e6389f4e | 108 | file_counter: 0, |
f9578f3c | 109 | dynamic_writers: HashMap::new(), |
a42fa400 | 110 | fixed_writers: HashMap::new(), |
a09c0e38 | 111 | known_chunks: HashMap::new(), |
f9578f3c DM |
112 | }; |
113 | ||
d95ced64 DM |
114 | Self { |
115 | result_attributes: HashMap::new(), | |
116 | env_type, | |
117 | user, | |
118 | worker, | |
21ee7912 | 119 | datastore, |
a42d1f55 | 120 | debug: false, |
b4b63e52 | 121 | formatter: &JSON_FORMATTER, |
f9578f3c | 122 | backup_dir, |
b02a52e3 | 123 | last_backup: None, |
f9578f3c | 124 | state: Arc::new(Mutex::new(state)), |
d95ced64 DM |
125 | } |
126 | } | |
127 | ||
96482891 DM |
128 | /// Register a Chunk with associated length. |
129 | /// | |
130 | /// We do not fully trust clients, so a client may only use registered | |
131 | /// chunks. Please use this method to register chunks from previous backups. | |
a09c0e38 DM |
132 | pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> { |
133 | let mut state = self.state.lock().unwrap(); | |
134 | ||
135 | state.ensure_unfinished()?; | |
136 | ||
137 | state.known_chunks.insert(digest, length); | |
138 | ||
139 | Ok(()) | |
140 | } | |
141 | ||
96482891 DM |
142 | /// Register fixed length chunks after upload. |
143 | /// | |
144 | /// Like `register_chunk()`, but additionally record statistics for | |
145 | /// the fixed index writer. | |
642322b4 DM |
146 | pub fn register_fixed_chunk( |
147 | &self, | |
148 | wid: usize, | |
149 | digest: [u8; 32], | |
150 | size: u32, | |
151 | compressed_size: u32, | |
152 | is_duplicate: bool, | |
153 | ) -> Result<(), Error> { | |
154 | let mut state = self.state.lock().unwrap(); | |
155 | ||
156 | state.ensure_unfinished()?; | |
157 | ||
158 | let mut data = match state.fixed_writers.get_mut(&wid) { | |
159 | Some(data) => data, | |
160 | None => bail!("fixed writer '{}' not registered", wid), | |
161 | }; | |
162 | ||
5e04ec70 DM |
163 | if size > data.chunk_size { |
164 | bail!("fixed writer '{}' - got large chunk ({} > {}", data.name, size, data.chunk_size); | |
165 | } else if size < data.chunk_size { | |
166 | data.small_chunk_count += 1; | |
167 | if data.small_chunk_count > 1 { | |
168 | bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)"); | |
169 | } | |
642322b4 DM |
170 | } |
171 | ||
96482891 DM |
172 | // record statistics |
173 | data.upload_stat.count += 1; | |
174 | data.upload_stat.size += size as u64; | |
175 | data.upload_stat.compressed_size += compressed_size as u64; | |
176 | if is_duplicate { data.upload_stat.duplicates += 1; } | |
177 | ||
178 | // register chunk | |
642322b4 DM |
179 | state.known_chunks.insert(digest, size); |
180 | ||
181 | Ok(()) | |
182 | } | |
183 | ||
96482891 DM |
184 | /// Register dynamic length chunks after upload. |
185 | /// | |
186 | /// Like `register_chunk()`, but additionally record statistics for | |
187 | /// the dynamic index writer. | |
642322b4 DM |
188 | pub fn register_dynamic_chunk( |
189 | &self, | |
190 | wid: usize, | |
191 | digest: [u8; 32], | |
192 | size: u32, | |
193 | compressed_size: u32, | |
194 | is_duplicate: bool, | |
195 | ) -> Result<(), Error> { | |
196 | let mut state = self.state.lock().unwrap(); | |
197 | ||
198 | state.ensure_unfinished()?; | |
199 | ||
200 | let mut data = match state.dynamic_writers.get_mut(&wid) { | |
201 | Some(data) => data, | |
202 | None => bail!("dynamic writer '{}' not registered", wid), | |
203 | }; | |
204 | ||
96482891 DM |
205 | // record statistics |
206 | data.upload_stat.count += 1; | |
207 | data.upload_stat.size += size as u64; | |
208 | data.upload_stat.compressed_size += compressed_size as u64; | |
209 | if is_duplicate { data.upload_stat.duplicates += 1; } | |
210 | ||
211 | // register chunk | |
642322b4 DM |
212 | state.known_chunks.insert(digest, size); |
213 | ||
214 | Ok(()) | |
215 | } | |
216 | ||
a09c0e38 DM |
217 | pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> { |
218 | let state = self.state.lock().unwrap(); | |
219 | ||
220 | match state.known_chunks.get(digest) { | |
221 | Some(len) => Some(*len), | |
222 | None => None, | |
223 | } | |
224 | } | |
225 | ||
372724af | 226 | /// Store the writer with an unique ID |
8bea85b4 | 227 | pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> { |
f9578f3c | 228 | let mut state = self.state.lock().unwrap(); |
f9578f3c | 229 | |
372724af DM |
230 | state.ensure_unfinished()?; |
231 | ||
232 | let uid = state.next_uid(); | |
f9578f3c | 233 | |
8bea85b4 | 234 | state.dynamic_writers.insert(uid, DynamicWriterState { |
96482891 | 235 | index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(), |
8bea85b4 | 236 | }); |
372724af DM |
237 | |
238 | Ok(uid) | |
f9578f3c DM |
239 | } |
240 | ||
a42fa400 DM |
241 | /// Store the writer with an unique ID |
242 | pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> { | |
243 | let mut state = self.state.lock().unwrap(); | |
244 | ||
245 | state.ensure_unfinished()?; | |
246 | ||
247 | let uid = state.next_uid(); | |
248 | ||
249 | state.fixed_writers.insert(uid, FixedWriterState { | |
5e04ec70 | 250 | index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), |
a42fa400 DM |
251 | }); |
252 | ||
253 | Ok(uid) | |
254 | } | |
255 | ||
    /// Append chunk to dynamic writer
    ///
    /// The client must append chunks strictly in order: `offset` has to
    /// match the writer's current end offset.
    pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };


        // reject out-of-order or overlapping appends
        if data.offset != offset {
            bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
                  data.name, data.offset, offset);
        }

        data.offset += size as u64;
        data.chunk_count += 1;

        // note: the offset was already advanced above, so add_chunk()
        // receives the chunk *end* offset
        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }
280 | ||
a42fa400 DM |
281 | /// Append chunk to fixed writer |
282 | pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { | |
283 | let mut state = self.state.lock().unwrap(); | |
284 | ||
285 | state.ensure_unfinished()?; | |
286 | ||
287 | let mut data = match state.fixed_writers.get_mut(&wid) { | |
288 | Some(data) => data, | |
289 | None => bail!("fixed writer '{}' not registered", wid), | |
290 | }; | |
291 | ||
5e04ec70 DM |
292 | let end = (offset as usize) + (size as usize); |
293 | let idx = data.index.check_chunk_alignment(end, size as usize)?; | |
a42fa400 | 294 | |
5e04ec70 | 295 | data.chunk_count += 1; |
a42fa400 | 296 | |
5e04ec70 | 297 | data.index.add_digest(idx, digest)?; |
a42fa400 DM |
298 | |
299 | Ok(()) | |
300 | } | |
301 | ||
    /// Write a human-readable upload summary for one archive to the
    /// worker log (UUID, checksum, sizes, duplicate and compression ratios).
    fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("UUID: {}", digest_to_hex(uuid)));
        self.log(format!("Checksum: {}", digest_to_hex(csum)));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));

        // avoid division by zero in the percentage computations below
        if size == 0 || chunk_count == 0 {
            return;
        }

        self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));

        // chunks the client did not re-upload vs. uploads the server
        // recognized as duplicates
        let client_side_duplicates = chunk_count - upload_stat.count;
        let server_side_duplicates = upload_stat.duplicates;

        if (client_side_duplicates + server_side_duplicates) > 0 {
            let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
            self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
        }

        // guard the second ratio separately (upload size may be zero when
        // everything was deduplicated)
        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
        }
    }
327 | ||
    /// Close dynamic writer
    ///
    /// Verifies the client-supplied chunk count and size against the
    /// server-side bookkeeping, then closes the index and logs statistics.
    pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // removing the entry also makes further appends to this wid fail
        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        if data.offset != size {
            bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
        }

        // read the uuid before closing; close() yields the final checksum
        let uuid = data.index.uuid;

        let csum = data.index.close()?;

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        // the archive now counts as a successfully uploaded file
        state.file_counter += 1;

        Ok(())
    }
357 | ||
    /// Close fixed writer
    ///
    /// Verifies the client-supplied chunk count and size against the
    /// server-side bookkeeping and the index's expected slot count, then
    /// closes the index and logs statistics.
    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        // removing the entry also makes further appends to this wid fail
        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        // number of chunk slots the index itself expects
        // (presumably derived from the registered file size — TODO confirm)
        let expected_count = data.index.index_length();

        if chunk_count != (expected_count as u64) {
            bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
        }

        if size != (data.size as u64) {
            bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
        }

        // read the uuid before closing; close() yields the final checksum
        let uuid = data.index.uuid;

        let csum = data.index.close()?;

        self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);

        // the archive now counts as a successfully uploaded file
        state.file_counter += 1;

        Ok(())
    }
393 | ||
46bd8800 DM |
394 | pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> { |
395 | ||
396 | let mut path = self.datastore.base_path(); | |
397 | path.push(self.backup_dir.relative_path()); | |
398 | path.push(file_name); | |
399 | ||
400 | let blob_len = data.len(); | |
401 | let orig_len = data.len(); // fixme: | |
402 | ||
403 | let mut blob = DataBlob::from_raw(data)?; | |
404 | // always comput CRC at server side | |
405 | blob.set_crc(blob.compute_crc()); | |
406 | ||
407 | let raw_data = blob.raw_data(); | |
e18a6c9e | 408 | file_set_contents(&path, raw_data, None)?; |
46bd8800 DM |
409 | |
410 | self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len)); | |
411 | ||
412 | let mut state = self.state.lock().unwrap(); | |
413 | state.file_counter += 1; | |
414 | ||
415 | Ok(()) | |
416 | } | |
417 | ||
372724af DM |
418 | /// Mark backup as finished |
419 | pub fn finish_backup(&self) -> Result<(), Error> { | |
420 | let mut state = self.state.lock().unwrap(); | |
421 | // test if all writer are correctly closed | |
422 | ||
423 | state.ensure_unfinished()?; | |
424 | ||
425 | state.finished = true; | |
426 | ||
427 | if state.dynamic_writers.len() != 0 { | |
428 | bail!("found open index writer - unable to finish backup"); | |
429 | } | |
430 | ||
e6389f4e DM |
431 | if state.file_counter == 0 { |
432 | bail!("backup does not contain valid files (file count == 0)"); | |
433 | } | |
434 | ||
372724af DM |
435 | Ok(()) |
436 | } | |
437 | ||
    /// Append a message to the worker task log.
    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log(msg);
    }
b4b63e52 | 441 | |
a42d1f55 DM |
442 | pub fn debug<S: AsRef<str>>(&self, msg: S) { |
443 | if self.debug { self.worker.log(msg); } | |
444 | } | |
445 | ||
    /// Turn an API handler result into an HTTP response using the
    /// output formatter configured for this environment.
    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
        match result {
            Ok(data) => (self.formatter.format_data)(data, self),
            Err(err) => (self.formatter.format_error)(err),
        }
    }
372724af DM |
452 | |
453 | /// Raise error if finished flag is not set | |
454 | pub fn ensure_finished(&self) -> Result<(), Error> { | |
455 | let state = self.state.lock().unwrap(); | |
456 | if !state.finished { | |
457 | bail!("backup ended but finished flag is not set."); | |
458 | } | |
459 | Ok(()) | |
460 | } | |
461 | ||
    /// Remove complete backup
    ///
    /// Marks the session as finished so other handlers refuse further
    /// work, then deletes the backup directory from the datastore.
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        state.finished = true;

        // NOTE(review): the datastore call runs while the state lock is
        // held — presumably intentional to serialize with other session
        // operations; confirm before restructuring
        self.datastore.remove_backup_dir(&self.backup_dir)?;

        Ok(())
    }
d95ced64 DM |
471 | } |
472 | ||
impl RpcEnvironment for BackupEnvironment {

    /// Store a result attribute under `name` (overwrites any previous value).
    fn set_result_attrib(&mut self, name: &str, value: Value) {
        self.result_attributes.insert(name.into(), value);
    }

    /// Look up a previously stored result attribute.
    fn get_result_attrib(&self, name: &str) -> Option<&Value> {
        self.result_attributes.get(name)
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    /// The user is fixed at construction time for a backup session,
    /// so changing it is a programming error.
    fn set_user(&mut self, _user: Option<String>) {
        panic!("unable to change user");
    }

    fn get_user(&self) -> Option<String> {
        Some(self.user.clone())
    }
}
21ee7912 | 495 | |
impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    /// Downcast a dynamic `RpcEnvironment` reference.
    ///
    /// Panics if the environment is not actually a `BackupEnvironment`.
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}
dd5495d6 WB |
501 | |
502 | impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> { | |
b4b63e52 DM |
503 | fn as_ref(&self) -> &BackupEnvironment { |
504 | self.as_any().downcast_ref::<BackupEnvironment>().unwrap() | |
505 | } | |
506 | } |