]>
Commit | Line | Data |
---|---|---|
b4b63e52 | 1 | use failure::*; |
f9578f3c | 2 | use std::sync::{Arc, Mutex}; |
d95ced64 DM |
3 | use std::collections::HashMap; |
4 | ||
5 | use serde_json::Value; | |
6 | ||
7 | use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType}; | |
8 | use crate::server::WorkerTask; | |
21ee7912 | 9 | use crate::backup::*; |
b4b63e52 DM |
10 | use crate::server::formatter::*; |
11 | use hyper::{Body, Response}; | |
d95ced64 | 12 | |
96482891 DM |
/// Counters describing the chunks registered for a single index writer.
///
/// All counters start at zero and are incremented each time a chunk is
/// registered for the owning writer.
#[derive(Debug, Default)]
struct UploadStatistic {
    /// Number of registered chunks.
    count: u64,
    /// Accumulated (uncompressed) size of all registered chunks.
    size: u64,
    /// Accumulated compressed size of all registered chunks.
    compressed_size: u64,
    /// Number of registered chunks that were flagged as duplicates.
    duplicates: u64,
}

impl UploadStatistic {
    /// Create a statistic with every counter set to zero.
    ///
    /// Equivalent to `UploadStatistic::default()`; kept so existing call
    /// sites continue to work.
    fn new() -> Self {
        Self::default()
    }
}
8bea85b4 DM |
30 | |
31 | struct DynamicWriterState { | |
32 | name: String, | |
33 | index: DynamicIndexWriter, | |
34 | offset: u64, | |
35 | chunk_count: u64, | |
96482891 | 36 | upload_stat: UploadStatistic, |
8bea85b4 DM |
37 | } |
38 | ||
a42fa400 DM |
39 | struct FixedWriterState { |
40 | name: String, | |
41 | index: FixedIndexWriter, | |
42 | size: usize, | |
43 | chunk_size: u32, | |
44 | chunk_count: u64, | |
96482891 | 45 | upload_stat: UploadStatistic, |
a42fa400 DM |
46 | } |
47 | ||
f9578f3c | 48 | struct SharedBackupState { |
372724af | 49 | finished: bool, |
f9578f3c | 50 | uid_counter: usize, |
e6389f4e | 51 | file_counter: usize, // sucessfully uploaded files |
8bea85b4 | 52 | dynamic_writers: HashMap<usize, DynamicWriterState>, |
a42fa400 | 53 | fixed_writers: HashMap<usize, FixedWriterState>, |
a09c0e38 | 54 | known_chunks: HashMap<[u8;32], u32>, |
f9578f3c DM |
55 | } |
56 | ||
372724af DM |
57 | impl SharedBackupState { |
58 | ||
59 | // Raise error if finished flag is set | |
60 | fn ensure_unfinished(&self) -> Result<(), Error> { | |
61 | if self.finished { | |
62 | bail!("backup already marked as finished."); | |
63 | } | |
64 | Ok(()) | |
65 | } | |
66 | ||
67 | // Get an unique integer ID | |
68 | pub fn next_uid(&mut self) -> usize { | |
69 | self.uid_counter += 1; | |
70 | self.uid_counter | |
71 | } | |
72 | } | |
73 | ||
74 | ||
d95ced64 DM |
75 | /// `RpcEnvironmet` implementation for backup service |
76 | #[derive(Clone)] | |
77 | pub struct BackupEnvironment { | |
78 | env_type: RpcEnvironmentType, | |
79 | result_attributes: HashMap<String, Value>, | |
80 | user: String, | |
a42d1f55 | 81 | pub debug: bool, |
b4b63e52 | 82 | pub formatter: &'static OutputFormatter, |
21ee7912 DM |
83 | pub worker: Arc<WorkerTask>, |
84 | pub datastore: Arc<DataStore>, | |
f9578f3c | 85 | pub backup_dir: BackupDir, |
b02a52e3 | 86 | pub last_backup: Option<BackupInfo>, |
f9578f3c | 87 | state: Arc<Mutex<SharedBackupState>> |
d95ced64 DM |
88 | } |
89 | ||
90 | impl BackupEnvironment { | |
f9578f3c DM |
91 | pub fn new( |
92 | env_type: RpcEnvironmentType, | |
93 | user: String, | |
94 | worker: Arc<WorkerTask>, | |
95 | datastore: Arc<DataStore>, | |
96 | backup_dir: BackupDir, | |
f9578f3c DM |
97 | ) -> Self { |
98 | ||
99 | let state = SharedBackupState { | |
372724af | 100 | finished: false, |
f9578f3c | 101 | uid_counter: 0, |
e6389f4e | 102 | file_counter: 0, |
f9578f3c | 103 | dynamic_writers: HashMap::new(), |
a42fa400 | 104 | fixed_writers: HashMap::new(), |
a09c0e38 | 105 | known_chunks: HashMap::new(), |
f9578f3c DM |
106 | }; |
107 | ||
d95ced64 DM |
108 | Self { |
109 | result_attributes: HashMap::new(), | |
110 | env_type, | |
111 | user, | |
112 | worker, | |
21ee7912 | 113 | datastore, |
a42d1f55 | 114 | debug: false, |
b4b63e52 | 115 | formatter: &JSON_FORMATTER, |
f9578f3c | 116 | backup_dir, |
b02a52e3 | 117 | last_backup: None, |
f9578f3c | 118 | state: Arc::new(Mutex::new(state)), |
d95ced64 DM |
119 | } |
120 | } | |
121 | ||
96482891 DM |
122 | /// Register a Chunk with associated length. |
123 | /// | |
124 | /// We do not fully trust clients, so a client may only use registered | |
125 | /// chunks. Please use this method to register chunks from previous backups. | |
a09c0e38 DM |
126 | pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> { |
127 | let mut state = self.state.lock().unwrap(); | |
128 | ||
129 | state.ensure_unfinished()?; | |
130 | ||
131 | state.known_chunks.insert(digest, length); | |
132 | ||
133 | Ok(()) | |
134 | } | |
135 | ||
96482891 DM |
136 | /// Register fixed length chunks after upload. |
137 | /// | |
138 | /// Like `register_chunk()`, but additionally record statistics for | |
139 | /// the fixed index writer. | |
642322b4 DM |
140 | pub fn register_fixed_chunk( |
141 | &self, | |
142 | wid: usize, | |
143 | digest: [u8; 32], | |
144 | size: u32, | |
145 | compressed_size: u32, | |
146 | is_duplicate: bool, | |
147 | ) -> Result<(), Error> { | |
148 | let mut state = self.state.lock().unwrap(); | |
149 | ||
150 | state.ensure_unfinished()?; | |
151 | ||
152 | let mut data = match state.fixed_writers.get_mut(&wid) { | |
153 | Some(data) => data, | |
154 | None => bail!("fixed writer '{}' not registered", wid), | |
155 | }; | |
156 | ||
157 | if size != data.chunk_size { | |
158 | bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size); | |
159 | } | |
160 | ||
96482891 DM |
161 | // record statistics |
162 | data.upload_stat.count += 1; | |
163 | data.upload_stat.size += size as u64; | |
164 | data.upload_stat.compressed_size += compressed_size as u64; | |
165 | if is_duplicate { data.upload_stat.duplicates += 1; } | |
166 | ||
167 | // register chunk | |
642322b4 DM |
168 | state.known_chunks.insert(digest, size); |
169 | ||
170 | Ok(()) | |
171 | } | |
172 | ||
96482891 DM |
173 | /// Register dynamic length chunks after upload. |
174 | /// | |
175 | /// Like `register_chunk()`, but additionally record statistics for | |
176 | /// the dynamic index writer. | |
642322b4 DM |
177 | pub fn register_dynamic_chunk( |
178 | &self, | |
179 | wid: usize, | |
180 | digest: [u8; 32], | |
181 | size: u32, | |
182 | compressed_size: u32, | |
183 | is_duplicate: bool, | |
184 | ) -> Result<(), Error> { | |
185 | let mut state = self.state.lock().unwrap(); | |
186 | ||
187 | state.ensure_unfinished()?; | |
188 | ||
189 | let mut data = match state.dynamic_writers.get_mut(&wid) { | |
190 | Some(data) => data, | |
191 | None => bail!("dynamic writer '{}' not registered", wid), | |
192 | }; | |
193 | ||
96482891 DM |
194 | // record statistics |
195 | data.upload_stat.count += 1; | |
196 | data.upload_stat.size += size as u64; | |
197 | data.upload_stat.compressed_size += compressed_size as u64; | |
198 | if is_duplicate { data.upload_stat.duplicates += 1; } | |
199 | ||
200 | // register chunk | |
642322b4 DM |
201 | state.known_chunks.insert(digest, size); |
202 | ||
203 | Ok(()) | |
204 | } | |
205 | ||
a09c0e38 DM |
206 | pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> { |
207 | let state = self.state.lock().unwrap(); | |
208 | ||
209 | match state.known_chunks.get(digest) { | |
210 | Some(len) => Some(*len), | |
211 | None => None, | |
212 | } | |
213 | } | |
214 | ||
372724af | 215 | /// Store the writer with an unique ID |
8bea85b4 | 216 | pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> { |
f9578f3c | 217 | let mut state = self.state.lock().unwrap(); |
f9578f3c | 218 | |
372724af DM |
219 | state.ensure_unfinished()?; |
220 | ||
221 | let uid = state.next_uid(); | |
f9578f3c | 222 | |
8bea85b4 | 223 | state.dynamic_writers.insert(uid, DynamicWriterState { |
96482891 | 224 | index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(), |
8bea85b4 | 225 | }); |
372724af DM |
226 | |
227 | Ok(uid) | |
f9578f3c DM |
228 | } |
229 | ||
a42fa400 DM |
230 | /// Store the writer with an unique ID |
231 | pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> { | |
232 | let mut state = self.state.lock().unwrap(); | |
233 | ||
234 | state.ensure_unfinished()?; | |
235 | ||
236 | let uid = state.next_uid(); | |
237 | ||
238 | state.fixed_writers.insert(uid, FixedWriterState { | |
96482891 | 239 | index, name, chunk_count: 0, size, chunk_size, upload_stat: UploadStatistic::new(), |
a42fa400 DM |
240 | }); |
241 | ||
242 | Ok(uid) | |
243 | } | |
244 | ||
f9578f3c | 245 | /// Append chunk to dynamic writer |
417cb073 | 246 | pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { |
f9578f3c DM |
247 | let mut state = self.state.lock().unwrap(); |
248 | ||
372724af DM |
249 | state.ensure_unfinished()?; |
250 | ||
f9578f3c DM |
251 | let mut data = match state.dynamic_writers.get_mut(&wid) { |
252 | Some(data) => data, | |
253 | None => bail!("dynamic writer '{}' not registered", wid), | |
254 | }; | |
255 | ||
f9578f3c | 256 | |
417cb073 DM |
257 | if data.offset != offset { |
258 | bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})", | |
259 | data.name, data.offset, offset); | |
260 | } | |
261 | ||
3dc5b2a2 DM |
262 | data.offset += size as u64; |
263 | data.chunk_count += 1; | |
264 | ||
8bea85b4 | 265 | data.index.add_chunk(data.offset, digest)?; |
f9578f3c DM |
266 | |
267 | Ok(()) | |
268 | } | |
269 | ||
a42fa400 DM |
270 | /// Append chunk to fixed writer |
271 | pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { | |
272 | let mut state = self.state.lock().unwrap(); | |
273 | ||
274 | state.ensure_unfinished()?; | |
275 | ||
276 | let mut data = match state.fixed_writers.get_mut(&wid) { | |
277 | Some(data) => data, | |
278 | None => bail!("fixed writer '{}' not registered", wid), | |
279 | }; | |
280 | ||
281 | data.chunk_count += 1; | |
282 | ||
283 | if size != data.chunk_size { | |
284 | bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size); | |
285 | } | |
286 | ||
287 | let pos = (offset as usize)/(data.chunk_size as usize); | |
288 | data.index.add_digest(pos, digest)?; | |
289 | ||
290 | Ok(()) | |
291 | } | |
292 | ||
96482891 DM |
293 | fn log_upload_stat(&self, archive_name: &str, size: u64, chunk_count: u64, upload_stat: &UploadStatistic) { |
294 | self.log(format!("Upload statistics for '{}'", archive_name)); | |
295 | self.log(format!("Size: {}", size)); | |
296 | self.log(format!("Chunk count: {}", chunk_count)); | |
297 | self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size)); | |
298 | if upload_stat.size > 0 { | |
299 | self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size)); | |
300 | } | |
301 | } | |
302 | ||
a2077252 | 303 | /// Close dynamic writer |
8bea85b4 | 304 | pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> { |
a2077252 DM |
305 | let mut state = self.state.lock().unwrap(); |
306 | ||
372724af DM |
307 | state.ensure_unfinished()?; |
308 | ||
a2077252 DM |
309 | let mut data = match state.dynamic_writers.remove(&wid) { |
310 | Some(data) => data, | |
311 | None => bail!("dynamic writer '{}' not registered", wid), | |
312 | }; | |
313 | ||
8bea85b4 DM |
314 | if data.chunk_count != chunk_count { |
315 | bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count); | |
316 | } | |
317 | ||
318 | if data.offset != size { | |
319 | bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size); | |
320 | } | |
321 | ||
322 | data.index.close()?; | |
a2077252 | 323 | |
96482891 DM |
324 | self.log_upload_stat(&data.name, size, chunk_count, &data.upload_stat); |
325 | ||
e6389f4e DM |
326 | state.file_counter += 1; |
327 | ||
a2077252 DM |
328 | Ok(()) |
329 | } | |
330 | ||
a42fa400 DM |
331 | /// Close fixed writer |
332 | pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> { | |
333 | let mut state = self.state.lock().unwrap(); | |
334 | ||
335 | state.ensure_unfinished()?; | |
336 | ||
337 | let mut data = match state.fixed_writers.remove(&wid) { | |
338 | Some(data) => data, | |
339 | None => bail!("fixed writer '{}' not registered", wid), | |
340 | }; | |
341 | ||
342 | if data.chunk_count != chunk_count { | |
006f3ff4 | 343 | bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count); |
a42fa400 DM |
344 | } |
345 | ||
006f3ff4 DM |
346 | let expected_count = data.index.index_length(); |
347 | ||
348 | if chunk_count != (expected_count as u64) { | |
349 | bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count); | |
350 | } | |
351 | ||
352 | if size != (data.size as u64) { | |
353 | bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size); | |
354 | } | |
a42fa400 DM |
355 | |
356 | data.index.close()?; | |
357 | ||
96482891 DM |
358 | self.log_upload_stat(&data.name, size, chunk_count, &data.upload_stat); |
359 | ||
e6389f4e DM |
360 | state.file_counter += 1; |
361 | ||
a42fa400 DM |
362 | Ok(()) |
363 | } | |
364 | ||
372724af DM |
365 | /// Mark backup as finished |
366 | pub fn finish_backup(&self) -> Result<(), Error> { | |
367 | let mut state = self.state.lock().unwrap(); | |
368 | // test if all writer are correctly closed | |
369 | ||
370 | state.ensure_unfinished()?; | |
371 | ||
372 | state.finished = true; | |
373 | ||
374 | if state.dynamic_writers.len() != 0 { | |
375 | bail!("found open index writer - unable to finish backup"); | |
376 | } | |
377 | ||
e6389f4e DM |
378 | if state.file_counter == 0 { |
379 | bail!("backup does not contain valid files (file count == 0)"); | |
380 | } | |
381 | ||
372724af DM |
382 | Ok(()) |
383 | } | |
384 | ||
d95ced64 DM |
385 | pub fn log<S: AsRef<str>>(&self, msg: S) { |
386 | self.worker.log(msg); | |
387 | } | |
b4b63e52 | 388 | |
a42d1f55 DM |
389 | pub fn debug<S: AsRef<str>>(&self, msg: S) { |
390 | if self.debug { self.worker.log(msg); } | |
391 | } | |
392 | ||
b4b63e52 DM |
393 | pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> { |
394 | match result { | |
395 | Ok(data) => (self.formatter.format_data)(data, self), | |
396 | Err(err) => (self.formatter.format_error)(err), | |
397 | } | |
398 | } | |
372724af DM |
399 | |
400 | /// Raise error if finished flag is not set | |
401 | pub fn ensure_finished(&self) -> Result<(), Error> { | |
402 | let state = self.state.lock().unwrap(); | |
403 | if !state.finished { | |
404 | bail!("backup ended but finished flag is not set."); | |
405 | } | |
406 | Ok(()) | |
407 | } | |
408 | ||
409 | /// Remove complete backup | |
410 | pub fn remove_backup(&self) -> Result<(), Error> { | |
411 | let mut state = self.state.lock().unwrap(); | |
412 | state.finished = true; | |
413 | ||
414 | self.datastore.remove_backup_dir(&self.backup_dir)?; | |
415 | ||
416 | Ok(()) | |
417 | } | |
d95ced64 DM |
418 | } |
419 | ||
420 | impl RpcEnvironment for BackupEnvironment { | |
421 | ||
422 | fn set_result_attrib(&mut self, name: &str, value: Value) { | |
423 | self.result_attributes.insert(name.into(), value); | |
424 | } | |
425 | ||
426 | fn get_result_attrib(&self, name: &str) -> Option<&Value> { | |
427 | self.result_attributes.get(name) | |
428 | } | |
429 | ||
430 | fn env_type(&self) -> RpcEnvironmentType { | |
431 | self.env_type | |
432 | } | |
433 | ||
434 | fn set_user(&mut self, _user: Option<String>) { | |
435 | panic!("unable to change user"); | |
436 | } | |
437 | ||
438 | fn get_user(&self) -> Option<String> { | |
439 | Some(self.user.clone()) | |
440 | } | |
441 | } | |
21ee7912 | 442 | |
dd5495d6 | 443 | impl AsRef<BackupEnvironment> for dyn RpcEnvironment { |
21ee7912 DM |
444 | fn as_ref(&self) -> &BackupEnvironment { |
445 | self.as_any().downcast_ref::<BackupEnvironment>().unwrap() | |
446 | } | |
447 | } | |
dd5495d6 WB |
448 | |
449 | impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> { | |
b4b63e52 DM |
450 | fn as_ref(&self) -> &BackupEnvironment { |
451 | self.as_any().downcast_ref::<BackupEnvironment>().unwrap() | |
452 | } | |
453 | } |