// src/api2/backup/environment.rs
use failure::*;
use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use serde_json::Value;

use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
use crate::server::WorkerTask;
use crate::backup::*;
use crate::server::formatter::*;
use hyper::{Body, Response};

struct UploadStatistic {
    count: u64,
    size: u64,
    compressed_size: u64,
    duplicates: u64,
}

impl UploadStatistic {
    fn new() -> Self {
        Self {
            count: 0,
            size: 0,
            compressed_size: 0,
            duplicates: 0,
        }
    }
}

struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,
    chunk_count: u64,
    upload_stat: UploadStatistic,
}

struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,
    chunk_size: u32,
    chunk_count: u64,
    upload_stat: UploadStatistic,
}

struct SharedBackupState {
    finished: bool,
    uid_counter: usize,
    file_counter: usize, // successfully uploaded files
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    known_chunks: HashMap<[u8; 32], u32>,
}

impl SharedBackupState {

    // Raise error if finished flag is set
    fn ensure_unfinished(&self) -> Result<(), Error> {
        if self.finished {
            bail!("backup already marked as finished.");
        }
        Ok(())
    }

    // Get a unique integer ID
    pub fn next_uid(&mut self) -> usize {
        self.uid_counter += 1;
        self.uid_counter
    }
}

/// `RpcEnvironment` implementation for the backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: HashMap<String, Value>,
    user: String,
    pub debug: bool,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>,
}

impl BackupEnvironment {
    pub fn new(
        env_type: RpcEnvironmentType,
        user: String,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {

        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
        };

        Self {
            result_attributes: HashMap::new(),
            env_type,
            user,
            worker,
            datastore,
            debug: false,
            formatter: &JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Register a chunk with its associated length.
    ///
    /// We do not fully trust clients, so a client may only use registered
    /// chunks. Please use this method to register chunks from previous backups.
    pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        state.known_chunks.insert(digest, length);

        Ok(())
    }

    /// Register fixed length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the fixed index writer.
    pub fn register_fixed_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if size != data.chunk_size {
            bail!("fixed writer '{}' - got unexpected chunk size ({} != {})", data.name, size, data.chunk_size);
        }

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    /// Register dynamic length chunks after upload.
    ///
    /// Like `register_chunk()`, but additionally record statistics for
    /// the dynamic index writer.
    pub fn register_dynamic_chunk(
        &self,
        wid: usize,
        digest: [u8; 32],
        size: u32,
        compressed_size: u32,
        is_duplicate: bool,
    ) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        // record statistics
        data.upload_stat.count += 1;
        data.upload_stat.size += size as u64;
        data.upload_stat.compressed_size += compressed_size as u64;
        if is_duplicate { data.upload_stat.duplicates += 1; }

        // register chunk
        state.known_chunks.insert(digest, size);

        Ok(())
    }

    pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
        let state = self.state.lock().unwrap();

        match state.known_chunks.get(digest) {
            Some(len) => Some(*len),
            None => None,
        }
    }

    /// Store the writer with a unique ID
    pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.dynamic_writers.insert(uid, DynamicWriterState {
            index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }

    /// Store the writer with a unique ID
    pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let uid = state.next_uid();

        state.fixed_writers.insert(uid, FixedWriterState {
            index, name, chunk_count: 0, size, chunk_size, upload_stat: UploadStatistic::new(),
        });

        Ok(uid)
    }

    /// Append chunk to dynamic writer
    pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.offset != offset {
            bail!("dynamic writer '{}' append chunk failed - got unexpected chunk offset ({} != {})",
                  data.name, data.offset, offset);
        }

        data.offset += size as u64;
        data.chunk_count += 1;

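        // `data.offset` now points just past this chunk, so the index entry
        // records the cumulative end offset together with the digest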
        data.index.add_chunk(data.offset, digest)?;

        Ok(())
    }

    /// Append chunk to fixed writer
    pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.get_mut(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        data.chunk_count += 1;

        if size != data.chunk_size {
            bail!("fixed writer '{}' - got unexpected chunk size ({} != {})", data.name, size, data.chunk_size);
        }

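        // the fixed index is addressed by chunk slot, so map the byte offset
        // to the corresponding slot index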
        let pos = (offset as usize)/(data.chunk_size as usize);
        data.index.add_digest(pos, digest)?;

        Ok(())
    }

    fn log_upload_stat(&self, archive_name: &str, size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
        self.log(format!("Upload statistics for '{}'", archive_name));
        self.log(format!("Size: {}", size));
        self.log(format!("Chunk count: {}", chunk_count));
        if size > 0 {
            self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));
        }
        if upload_stat.size > 0 {
            self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
        }
    }

    /// Close dynamic writer
    pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.dynamic_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("dynamic writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        if data.offset != size {
            bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
        }

        data.index.close()?;

        self.log_upload_stat(&data.name, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;

        Ok(())
    }

    /// Close fixed writer
    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        let mut data = match state.fixed_writers.remove(&wid) {
            Some(data) => data,
            None => bail!("fixed writer '{}' not registered", wid),
        };

        if data.chunk_count != chunk_count {
            bail!("fixed writer '{}' close failed - received wrong number of chunks ({} != {})", data.name, data.chunk_count, chunk_count);
        }

        let expected_count = data.index.index_length();

        if chunk_count != (expected_count as u64) {
            bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
        }

        if size != (data.size as u64) {
            bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
        }

        data.index.close()?;

        self.log_upload_stat(&data.name, size, chunk_count, &data.upload_stat);

        state.file_counter += 1;

        Ok(())
    }

    /// Mark backup as finished
    pub fn finish_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();

        state.ensure_unfinished()?;

        state.finished = true;

        // check that all index writers have been closed
        if !state.dynamic_writers.is_empty() || !state.fixed_writers.is_empty() {
            bail!("found open index writer - unable to finish backup");
        }

        if state.file_counter == 0 {
            bail!("backup does not contain valid files (file count == 0)");
        }

        Ok(())
    }

    pub fn log<S: AsRef<str>>(&self, msg: S) {
        self.worker.log(msg);
    }

    pub fn debug<S: AsRef<str>>(&self, msg: S) {
        if self.debug { self.worker.log(msg); }
    }

    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
        match result {
            Ok(data) => (self.formatter.format_data)(data, self),
            Err(err) => (self.formatter.format_error)(err),
        }
    }

    /// Raise error if finished flag is not set
    pub fn ensure_finished(&self) -> Result<(), Error> {
        let state = self.state.lock().unwrap();
        if !state.finished {
            bail!("backup ended but finished flag is not set.");
        }
        Ok(())
    }

    /// Remove complete backup
    pub fn remove_backup(&self) -> Result<(), Error> {
        let mut state = self.state.lock().unwrap();
        state.finished = true;

        self.datastore.remove_backup_dir(&self.backup_dir)?;

        Ok(())
    }
}

impl RpcEnvironment for BackupEnvironment {

    fn set_result_attrib(&mut self, name: &str, value: Value) {
        self.result_attributes.insert(name.into(), value);
    }

    fn get_result_attrib(&self, name: &str) -> Option<&Value> {
        self.result_attributes.get(name)
    }

    fn env_type(&self) -> RpcEnvironmentType {
        self.env_type
    }

    fn set_user(&mut self, _user: Option<String>) {
        panic!("unable to change user");
    }

    fn get_user(&self) -> Option<String> {
        Some(self.user.clone())
    }
}

impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}

impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
    fn as_ref(&self) -> &BackupEnvironment {
        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
    }
}
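
// A minimal sketch of the `SharedBackupState` bookkeeping (uid allocation and
// the finished flag). It deliberately avoids `DataStore`, `WorkerTask` and the
// index writers, since those require a real datastore on disk, so it only
// demonstrates the pure in-memory state handling.
#[cfg(test)]
mod shared_state_tests {
    use super::*;
    use std::collections::HashMap;

    #[test]
    fn uid_counter_and_finished_flag() {
        let mut state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            file_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
        };

        // uids are handed out sequentially, starting at 1
        assert_eq!(state.next_uid(), 1);
        assert_eq!(state.next_uid(), 2);

        // mutations are allowed while the backup is not finished ...
        assert!(state.ensure_unfinished().is_ok());

        // ... and rejected once the finished flag is set
        state.finished = true;
        assert!(state.ensure_unfinished().is_err());
    }
}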