]>
Commit | Line | Data |
---|---|---|
b4b63e52 | 1 | use failure::*; |
f9578f3c | 2 | use std::sync::{Arc, Mutex}; |
d95ced64 DM |
3 | use std::collections::HashMap; |
4 | ||
5 | use serde_json::Value; | |
6 | ||
7 | use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType}; | |
8 | use crate::server::WorkerTask; | |
21ee7912 | 9 | use crate::backup::*; |
b4b63e52 DM |
10 | use crate::server::formatter::*; |
11 | use hyper::{Body, Response}; | |
d95ced64 | 12 | |
8bea85b4 DM |
13 | |
14 | struct DynamicWriterState { | |
15 | name: String, | |
16 | index: DynamicIndexWriter, | |
17 | offset: u64, | |
18 | chunk_count: u64, | |
19 | } | |
20 | ||
a42fa400 DM |
21 | struct FixedWriterState { |
22 | name: String, | |
23 | index: FixedIndexWriter, | |
24 | size: usize, | |
25 | chunk_size: u32, | |
26 | chunk_count: u64, | |
27 | } | |
28 | ||
f9578f3c | 29 | struct SharedBackupState { |
372724af | 30 | finished: bool, |
f9578f3c | 31 | uid_counter: usize, |
8bea85b4 | 32 | dynamic_writers: HashMap<usize, DynamicWriterState>, |
a42fa400 | 33 | fixed_writers: HashMap<usize, FixedWriterState>, |
a09c0e38 | 34 | known_chunks: HashMap<[u8;32], u32>, |
f9578f3c DM |
35 | } |
36 | ||
372724af DM |
37 | impl SharedBackupState { |
38 | ||
39 | // Raise error if finished flag is set | |
40 | fn ensure_unfinished(&self) -> Result<(), Error> { | |
41 | if self.finished { | |
42 | bail!("backup already marked as finished."); | |
43 | } | |
44 | Ok(()) | |
45 | } | |
46 | ||
47 | // Get an unique integer ID | |
48 | pub fn next_uid(&mut self) -> usize { | |
49 | self.uid_counter += 1; | |
50 | self.uid_counter | |
51 | } | |
52 | } | |
53 | ||
54 | ||
d95ced64 DM |
55 | /// `RpcEnvironmet` implementation for backup service |
56 | #[derive(Clone)] | |
57 | pub struct BackupEnvironment { | |
58 | env_type: RpcEnvironmentType, | |
59 | result_attributes: HashMap<String, Value>, | |
60 | user: String, | |
b4b63e52 | 61 | pub formatter: &'static OutputFormatter, |
21ee7912 DM |
62 | pub worker: Arc<WorkerTask>, |
63 | pub datastore: Arc<DataStore>, | |
f9578f3c | 64 | pub backup_dir: BackupDir, |
b02a52e3 | 65 | pub last_backup: Option<BackupInfo>, |
f9578f3c | 66 | state: Arc<Mutex<SharedBackupState>> |
d95ced64 DM |
67 | } |
68 | ||
69 | impl BackupEnvironment { | |
f9578f3c DM |
70 | pub fn new( |
71 | env_type: RpcEnvironmentType, | |
72 | user: String, | |
73 | worker: Arc<WorkerTask>, | |
74 | datastore: Arc<DataStore>, | |
75 | backup_dir: BackupDir, | |
f9578f3c DM |
76 | ) -> Self { |
77 | ||
78 | let state = SharedBackupState { | |
372724af | 79 | finished: false, |
f9578f3c DM |
80 | uid_counter: 0, |
81 | dynamic_writers: HashMap::new(), | |
a42fa400 | 82 | fixed_writers: HashMap::new(), |
a09c0e38 | 83 | known_chunks: HashMap::new(), |
f9578f3c DM |
84 | }; |
85 | ||
d95ced64 DM |
86 | Self { |
87 | result_attributes: HashMap::new(), | |
88 | env_type, | |
89 | user, | |
90 | worker, | |
21ee7912 | 91 | datastore, |
b4b63e52 | 92 | formatter: &JSON_FORMATTER, |
f9578f3c | 93 | backup_dir, |
b02a52e3 | 94 | last_backup: None, |
f9578f3c | 95 | state: Arc::new(Mutex::new(state)), |
d95ced64 DM |
96 | } |
97 | } | |
98 | ||
a09c0e38 DM |
99 | // Register a Chunk with associated length. A client may only use registered |
100 | // chunks (we do not trust clients that far ...) | |
101 | pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> { | |
102 | let mut state = self.state.lock().unwrap(); | |
103 | ||
104 | state.ensure_unfinished()?; | |
105 | ||
106 | state.known_chunks.insert(digest, length); | |
107 | ||
108 | Ok(()) | |
109 | } | |
110 | ||
111 | pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> { | |
112 | let state = self.state.lock().unwrap(); | |
113 | ||
114 | match state.known_chunks.get(digest) { | |
115 | Some(len) => Some(*len), | |
116 | None => None, | |
117 | } | |
118 | } | |
119 | ||
372724af | 120 | /// Store the writer with an unique ID |
8bea85b4 | 121 | pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> { |
f9578f3c | 122 | let mut state = self.state.lock().unwrap(); |
f9578f3c | 123 | |
372724af DM |
124 | state.ensure_unfinished()?; |
125 | ||
126 | let uid = state.next_uid(); | |
f9578f3c | 127 | |
8bea85b4 DM |
128 | state.dynamic_writers.insert(uid, DynamicWriterState { |
129 | index, name, offset: 0, chunk_count: 0, | |
130 | }); | |
372724af DM |
131 | |
132 | Ok(uid) | |
f9578f3c DM |
133 | } |
134 | ||
a42fa400 DM |
135 | /// Store the writer with an unique ID |
136 | pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> { | |
137 | let mut state = self.state.lock().unwrap(); | |
138 | ||
139 | state.ensure_unfinished()?; | |
140 | ||
141 | let uid = state.next_uid(); | |
142 | ||
143 | state.fixed_writers.insert(uid, FixedWriterState { | |
144 | index, name, chunk_count: 0, size, chunk_size, | |
145 | }); | |
146 | ||
147 | Ok(uid) | |
148 | } | |
149 | ||
f9578f3c | 150 | /// Append chunk to dynamic writer |
417cb073 | 151 | pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { |
f9578f3c DM |
152 | let mut state = self.state.lock().unwrap(); |
153 | ||
372724af DM |
154 | state.ensure_unfinished()?; |
155 | ||
f9578f3c DM |
156 | let mut data = match state.dynamic_writers.get_mut(&wid) { |
157 | Some(data) => data, | |
158 | None => bail!("dynamic writer '{}' not registered", wid), | |
159 | }; | |
160 | ||
8bea85b4 DM |
161 | data.offset += size as u64; |
162 | data.chunk_count += 1; | |
f9578f3c | 163 | |
417cb073 DM |
164 | if data.offset != offset { |
165 | bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})", | |
166 | data.name, data.offset, offset); | |
167 | } | |
168 | ||
8bea85b4 | 169 | data.index.add_chunk(data.offset, digest)?; |
f9578f3c DM |
170 | |
171 | Ok(()) | |
172 | } | |
173 | ||
a42fa400 DM |
174 | /// Append chunk to fixed writer |
175 | pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { | |
176 | let mut state = self.state.lock().unwrap(); | |
177 | ||
178 | state.ensure_unfinished()?; | |
179 | ||
180 | let mut data = match state.fixed_writers.get_mut(&wid) { | |
181 | Some(data) => data, | |
182 | None => bail!("fixed writer '{}' not registered", wid), | |
183 | }; | |
184 | ||
185 | data.chunk_count += 1; | |
186 | ||
187 | if size != data.chunk_size { | |
188 | bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size); | |
189 | } | |
190 | ||
191 | let pos = (offset as usize)/(data.chunk_size as usize); | |
192 | data.index.add_digest(pos, digest)?; | |
193 | ||
194 | Ok(()) | |
195 | } | |
196 | ||
a2077252 | 197 | /// Close dynamic writer |
8bea85b4 | 198 | pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> { |
a2077252 DM |
199 | let mut state = self.state.lock().unwrap(); |
200 | ||
372724af DM |
201 | state.ensure_unfinished()?; |
202 | ||
a2077252 DM |
203 | let mut data = match state.dynamic_writers.remove(&wid) { |
204 | Some(data) => data, | |
205 | None => bail!("dynamic writer '{}' not registered", wid), | |
206 | }; | |
207 | ||
8bea85b4 DM |
208 | if data.chunk_count != chunk_count { |
209 | bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count); | |
210 | } | |
211 | ||
212 | if data.offset != size { | |
213 | bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size); | |
214 | } | |
215 | ||
216 | data.index.close()?; | |
a2077252 DM |
217 | |
218 | Ok(()) | |
219 | } | |
220 | ||
a42fa400 DM |
221 | /// Close fixed writer |
222 | pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> { | |
223 | let mut state = self.state.lock().unwrap(); | |
224 | ||
225 | state.ensure_unfinished()?; | |
226 | ||
227 | let mut data = match state.fixed_writers.remove(&wid) { | |
228 | Some(data) => data, | |
229 | None => bail!("fixed writer '{}' not registered", wid), | |
230 | }; | |
231 | ||
232 | if data.chunk_count != chunk_count { | |
233 | bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count); | |
234 | } | |
235 | ||
236 | ||
237 | data.index.close()?; | |
238 | ||
239 | Ok(()) | |
240 | } | |
241 | ||
372724af DM |
242 | /// Mark backup as finished |
243 | pub fn finish_backup(&self) -> Result<(), Error> { | |
244 | let mut state = self.state.lock().unwrap(); | |
245 | // test if all writer are correctly closed | |
246 | ||
247 | state.ensure_unfinished()?; | |
248 | ||
249 | state.finished = true; | |
250 | ||
251 | if state.dynamic_writers.len() != 0 { | |
252 | bail!("found open index writer - unable to finish backup"); | |
253 | } | |
254 | ||
255 | Ok(()) | |
256 | } | |
257 | ||
d95ced64 DM |
258 | pub fn log<S: AsRef<str>>(&self, msg: S) { |
259 | self.worker.log(msg); | |
260 | } | |
b4b63e52 DM |
261 | |
262 | pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> { | |
263 | match result { | |
264 | Ok(data) => (self.formatter.format_data)(data, self), | |
265 | Err(err) => (self.formatter.format_error)(err), | |
266 | } | |
267 | } | |
372724af DM |
268 | |
269 | /// Raise error if finished flag is not set | |
270 | pub fn ensure_finished(&self) -> Result<(), Error> { | |
271 | let state = self.state.lock().unwrap(); | |
272 | if !state.finished { | |
273 | bail!("backup ended but finished flag is not set."); | |
274 | } | |
275 | Ok(()) | |
276 | } | |
277 | ||
278 | /// Remove complete backup | |
279 | pub fn remove_backup(&self) -> Result<(), Error> { | |
280 | let mut state = self.state.lock().unwrap(); | |
281 | state.finished = true; | |
282 | ||
283 | self.datastore.remove_backup_dir(&self.backup_dir)?; | |
284 | ||
285 | Ok(()) | |
286 | } | |
d95ced64 DM |
287 | } |
288 | ||
289 | impl RpcEnvironment for BackupEnvironment { | |
290 | ||
291 | fn set_result_attrib(&mut self, name: &str, value: Value) { | |
292 | self.result_attributes.insert(name.into(), value); | |
293 | } | |
294 | ||
295 | fn get_result_attrib(&self, name: &str) -> Option<&Value> { | |
296 | self.result_attributes.get(name) | |
297 | } | |
298 | ||
299 | fn env_type(&self) -> RpcEnvironmentType { | |
300 | self.env_type | |
301 | } | |
302 | ||
303 | fn set_user(&mut self, _user: Option<String>) { | |
304 | panic!("unable to change user"); | |
305 | } | |
306 | ||
307 | fn get_user(&self) -> Option<String> { | |
308 | Some(self.user.clone()) | |
309 | } | |
310 | } | |
21ee7912 DM |
311 | |
312 | impl AsRef<BackupEnvironment> for RpcEnvironment { | |
313 | fn as_ref(&self) -> &BackupEnvironment { | |
314 | self.as_any().downcast_ref::<BackupEnvironment>().unwrap() | |
315 | } | |
316 | } | |
b4b63e52 DM |
317 | impl AsRef<BackupEnvironment> for Box<RpcEnvironment> { |
318 | fn as_ref(&self) -> &BackupEnvironment { | |
319 | self.as_any().downcast_ref::<BackupEnvironment>().unwrap() | |
320 | } | |
321 | } |