]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/admin/datastore/backup/environment.rs
src/client/http_client.rs: code cleanup
[proxmox-backup.git] / src / api2 / admin / datastore / backup / environment.rs
CommitLineData
b4b63e52 1use failure::*;
f9578f3c 2use std::sync::{Arc, Mutex};
d95ced64
DM
3use std::collections::HashMap;
4
5use serde_json::Value;
6
7use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
8use crate::server::WorkerTask;
21ee7912 9use crate::backup::*;
b4b63e52
DM
10use crate::server::formatter::*;
11use hyper::{Body, Response};
d95ced64 12
8bea85b4
DM
13
14struct DynamicWriterState {
15 name: String,
16 index: DynamicIndexWriter,
17 offset: u64,
18 chunk_count: u64,
19}
20
a42fa400
DM
21struct FixedWriterState {
22 name: String,
23 index: FixedIndexWriter,
24 size: usize,
25 chunk_size: u32,
26 chunk_count: u64,
27}
28
f9578f3c 29struct SharedBackupState {
372724af 30 finished: bool,
f9578f3c 31 uid_counter: usize,
8bea85b4 32 dynamic_writers: HashMap<usize, DynamicWriterState>,
a42fa400 33 fixed_writers: HashMap<usize, FixedWriterState>,
a09c0e38 34 known_chunks: HashMap<[u8;32], u32>,
f9578f3c
DM
35}
36
372724af
DM
37impl SharedBackupState {
38
39 // Raise error if finished flag is set
40 fn ensure_unfinished(&self) -> Result<(), Error> {
41 if self.finished {
42 bail!("backup already marked as finished.");
43 }
44 Ok(())
45 }
46
47 // Get an unique integer ID
48 pub fn next_uid(&mut self) -> usize {
49 self.uid_counter += 1;
50 self.uid_counter
51 }
52}
53
54
d95ced64
DM
55/// `RpcEnvironmet` implementation for backup service
56#[derive(Clone)]
57pub struct BackupEnvironment {
58 env_type: RpcEnvironmentType,
59 result_attributes: HashMap<String, Value>,
60 user: String,
b4b63e52 61 pub formatter: &'static OutputFormatter,
21ee7912
DM
62 pub worker: Arc<WorkerTask>,
63 pub datastore: Arc<DataStore>,
f9578f3c 64 pub backup_dir: BackupDir,
b02a52e3 65 pub last_backup: Option<BackupInfo>,
f9578f3c 66 state: Arc<Mutex<SharedBackupState>>
d95ced64
DM
67}
68
69impl BackupEnvironment {
f9578f3c
DM
70 pub fn new(
71 env_type: RpcEnvironmentType,
72 user: String,
73 worker: Arc<WorkerTask>,
74 datastore: Arc<DataStore>,
75 backup_dir: BackupDir,
f9578f3c
DM
76 ) -> Self {
77
78 let state = SharedBackupState {
372724af 79 finished: false,
f9578f3c
DM
80 uid_counter: 0,
81 dynamic_writers: HashMap::new(),
a42fa400 82 fixed_writers: HashMap::new(),
a09c0e38 83 known_chunks: HashMap::new(),
f9578f3c
DM
84 };
85
d95ced64
DM
86 Self {
87 result_attributes: HashMap::new(),
88 env_type,
89 user,
90 worker,
21ee7912 91 datastore,
b4b63e52 92 formatter: &JSON_FORMATTER,
f9578f3c 93 backup_dir,
b02a52e3 94 last_backup: None,
f9578f3c 95 state: Arc::new(Mutex::new(state)),
d95ced64
DM
96 }
97 }
98
a09c0e38
DM
99 // Register a Chunk with associated length. A client may only use registered
100 // chunks (we do not trust clients that far ...)
101 pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
102 let mut state = self.state.lock().unwrap();
103
104 state.ensure_unfinished()?;
105
106 state.known_chunks.insert(digest, length);
107
108 Ok(())
109 }
110
111 pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
112 let state = self.state.lock().unwrap();
113
114 match state.known_chunks.get(digest) {
115 Some(len) => Some(*len),
116 None => None,
117 }
118 }
119
372724af 120 /// Store the writer with an unique ID
8bea85b4 121 pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
f9578f3c 122 let mut state = self.state.lock().unwrap();
f9578f3c 123
372724af
DM
124 state.ensure_unfinished()?;
125
126 let uid = state.next_uid();
f9578f3c 127
8bea85b4
DM
128 state.dynamic_writers.insert(uid, DynamicWriterState {
129 index, name, offset: 0, chunk_count: 0,
130 });
372724af
DM
131
132 Ok(uid)
f9578f3c
DM
133 }
134
a42fa400
DM
135 /// Store the writer with an unique ID
136 pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
137 let mut state = self.state.lock().unwrap();
138
139 state.ensure_unfinished()?;
140
141 let uid = state.next_uid();
142
143 state.fixed_writers.insert(uid, FixedWriterState {
144 index, name, chunk_count: 0, size, chunk_size,
145 });
146
147 Ok(uid)
148 }
149
f9578f3c 150 /// Append chunk to dynamic writer
417cb073 151 pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
f9578f3c
DM
152 let mut state = self.state.lock().unwrap();
153
372724af
DM
154 state.ensure_unfinished()?;
155
f9578f3c
DM
156 let mut data = match state.dynamic_writers.get_mut(&wid) {
157 Some(data) => data,
158 None => bail!("dynamic writer '{}' not registered", wid),
159 };
160
8bea85b4
DM
161 data.offset += size as u64;
162 data.chunk_count += 1;
f9578f3c 163
417cb073
DM
164 if data.offset != offset {
165 bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
166 data.name, data.offset, offset);
167 }
168
8bea85b4 169 data.index.add_chunk(data.offset, digest)?;
f9578f3c
DM
170
171 Ok(())
172 }
173
a42fa400
DM
174 /// Append chunk to fixed writer
175 pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
176 let mut state = self.state.lock().unwrap();
177
178 state.ensure_unfinished()?;
179
180 let mut data = match state.fixed_writers.get_mut(&wid) {
181 Some(data) => data,
182 None => bail!("fixed writer '{}' not registered", wid),
183 };
184
185 data.chunk_count += 1;
186
187 if size != data.chunk_size {
188 bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size);
189 }
190
191 let pos = (offset as usize)/(data.chunk_size as usize);
192 data.index.add_digest(pos, digest)?;
193
194 Ok(())
195 }
196
a2077252 197 /// Close dynamic writer
8bea85b4 198 pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
a2077252
DM
199 let mut state = self.state.lock().unwrap();
200
372724af
DM
201 state.ensure_unfinished()?;
202
a2077252
DM
203 let mut data = match state.dynamic_writers.remove(&wid) {
204 Some(data) => data,
205 None => bail!("dynamic writer '{}' not registered", wid),
206 };
207
8bea85b4
DM
208 if data.chunk_count != chunk_count {
209 bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
210 }
211
212 if data.offset != size {
213 bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
214 }
215
216 data.index.close()?;
a2077252
DM
217
218 Ok(())
219 }
220
a42fa400
DM
221 /// Close fixed writer
222 pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
223 let mut state = self.state.lock().unwrap();
224
225 state.ensure_unfinished()?;
226
227 let mut data = match state.fixed_writers.remove(&wid) {
228 Some(data) => data,
229 None => bail!("fixed writer '{}' not registered", wid),
230 };
231
232 if data.chunk_count != chunk_count {
233 bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
234 }
235
236
237 data.index.close()?;
238
239 Ok(())
240 }
241
372724af
DM
242 /// Mark backup as finished
243 pub fn finish_backup(&self) -> Result<(), Error> {
244 let mut state = self.state.lock().unwrap();
245 // test if all writer are correctly closed
246
247 state.ensure_unfinished()?;
248
249 state.finished = true;
250
251 if state.dynamic_writers.len() != 0 {
252 bail!("found open index writer - unable to finish backup");
253 }
254
255 Ok(())
256 }
257
d95ced64
DM
258 pub fn log<S: AsRef<str>>(&self, msg: S) {
259 self.worker.log(msg);
260 }
b4b63e52
DM
261
262 pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
263 match result {
264 Ok(data) => (self.formatter.format_data)(data, self),
265 Err(err) => (self.formatter.format_error)(err),
266 }
267 }
372724af
DM
268
269 /// Raise error if finished flag is not set
270 pub fn ensure_finished(&self) -> Result<(), Error> {
271 let state = self.state.lock().unwrap();
272 if !state.finished {
273 bail!("backup ended but finished flag is not set.");
274 }
275 Ok(())
276 }
277
278 /// Remove complete backup
279 pub fn remove_backup(&self) -> Result<(), Error> {
280 let mut state = self.state.lock().unwrap();
281 state.finished = true;
282
283 self.datastore.remove_backup_dir(&self.backup_dir)?;
284
285 Ok(())
286 }
d95ced64
DM
287}
288
289impl RpcEnvironment for BackupEnvironment {
290
291 fn set_result_attrib(&mut self, name: &str, value: Value) {
292 self.result_attributes.insert(name.into(), value);
293 }
294
295 fn get_result_attrib(&self, name: &str) -> Option<&Value> {
296 self.result_attributes.get(name)
297 }
298
299 fn env_type(&self) -> RpcEnvironmentType {
300 self.env_type
301 }
302
303 fn set_user(&mut self, _user: Option<String>) {
304 panic!("unable to change user");
305 }
306
307 fn get_user(&self) -> Option<String> {
308 Some(self.user.clone())
309 }
310}
21ee7912
DM
311
312impl AsRef<BackupEnvironment> for RpcEnvironment {
313 fn as_ref(&self) -> &BackupEnvironment {
314 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
315 }
316}
b4b63e52
DM
317impl AsRef<BackupEnvironment> for Box<RpcEnvironment> {
318 fn as_ref(&self) -> &BackupEnvironment {
319 self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
320 }
321}