]>
Commit | Line | Data |
---|---|---|
152764ec DM |
1 | use failure::*; |
2 | ||
72375ce6 | 3 | use std::sync::Arc; |
92ac375a DM |
4 | |
5 | use futures::*; | |
152764ec | 6 | use hyper::header::{HeaderValue, UPGRADE}; |
bd1507c4 | 7 | use hyper::{Body, Response, StatusCode}; |
152764ec | 8 | use hyper::http::request::Parts; |
0aadd40b | 9 | use chrono::{Local, TimeZone}; |
152764ec | 10 | |
f9578f3c | 11 | use serde_json::{json, Value}; |
152764ec | 12 | |
92ac375a | 13 | use crate::tools; |
d3611366 | 14 | use crate::tools::wrapped_reader_stream::*; |
152764ec DM |
15 | use crate::api_schema::router::*; |
16 | use crate::api_schema::*; | |
58c8d7d9 | 17 | use crate::server::WorkerTask; |
21ee7912 | 18 | use crate::backup::*; |
152764ec | 19 | |
d95ced64 DM |
20 | mod environment; |
21 | use environment::*; | |
22 | ||
bd1507c4 DM |
23 | mod service; |
24 | use service::*; | |
25 | ||
21ee7912 DM |
26 | mod upload_chunk; |
27 | use upload_chunk::*; | |
28 | ||
29 | ||
ca60c371 | 30 | pub fn api_method_upgrade_backup() -> ApiAsyncMethod { |
152764ec | 31 | ApiAsyncMethod::new( |
0aadd40b | 32 | upgrade_to_backup_protocol, |
ca60c371 | 33 | ObjectSchema::new("Upgraded to backup protocol.") |
0aadd40b DM |
34 | .required("store", StringSchema::new("Datastore name.")) |
35 | .required("backup-type", StringSchema::new("Backup type.") | |
cc84a830 | 36 | .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"])))) |
0aadd40b | 37 | .required("backup-id", StringSchema::new("Backup ID.")) |
152764ec DM |
38 | ) |
39 | } | |
40 | ||
0aadd40b | 41 | fn upgrade_to_backup_protocol( |
152764ec DM |
42 | parts: Parts, |
43 | req_body: Body, | |
0aadd40b | 44 | param: Value, |
152764ec | 45 | _info: &ApiAsyncMethod, |
b4b63e52 | 46 | rpcenv: Box<RpcEnvironment>, |
152764ec | 47 | ) -> Result<BoxFut, Error> { |
0aadd40b | 48 | |
bd1507c4 DM |
49 | static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2"; |
50 | ||
bb105f9d DM |
51 | let store = tools::required_string_param(¶m, "store")?.to_owned(); |
52 | let datastore = DataStore::lookup_datastore(&store)?; | |
21ee7912 | 53 | |
0aadd40b DM |
54 | let backup_type = tools::required_string_param(¶m, "backup-type")?; |
55 | let backup_id = tools::required_string_param(¶m, "backup-id")?; | |
f9578f3c | 56 | let backup_time = Local.timestamp(Local::now().timestamp(), 0); |
152764ec DM |
57 | |
58 | let protocols = parts | |
59 | .headers | |
60 | .get("UPGRADE") | |
61 | .ok_or_else(|| format_err!("missing Upgrade header"))? | |
62 | .to_str()?; | |
63 | ||
0aadd40b | 64 | if protocols != PROXMOX_BACKUP_PROTOCOL_ID { |
152764ec DM |
65 | bail!("invalid protocol name"); |
66 | } | |
67 | ||
96e95fc1 DM |
68 | if parts.version >= http::version::Version::HTTP_2 { |
69 | bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); | |
70 | } | |
71 | ||
0aadd40b | 72 | let worker_id = format!("{}_{}_{}", store, backup_type, backup_id); |
d9bd06ea | 73 | |
58c8d7d9 DM |
74 | let username = rpcenv.get_user().unwrap(); |
75 | let env_type = rpcenv.env_type(); | |
92ac375a | 76 | |
51a4f63f | 77 | let backup_group = BackupGroup::new(backup_type, backup_id); |
fbb798f6 | 78 | let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None); |
51a4f63f | 79 | let backup_dir = BackupDir::new_with_group(backup_group, backup_time.timestamp()); |
f9578f3c | 80 | |
bb105f9d | 81 | let (path, is_new) = datastore.create_backup_dir(&backup_dir)?; |
f9578f3c DM |
82 | if !is_new { bail!("backup directorty already exists."); } |
83 | ||
0aadd40b | 84 | WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| { |
bb105f9d | 85 | let mut env = BackupEnvironment::new( |
6b95c7df | 86 | env_type, username.clone(), worker.clone(), datastore, backup_dir); |
b02a52e3 | 87 | |
bb105f9d | 88 | env.last_backup = last_backup; |
b02a52e3 | 89 | |
bb105f9d DM |
90 | env.log(format!("starting new backup on datastore '{}': {:?}", store, path)); |
91 | ||
372724af | 92 | let service = BackupService::new(env.clone(), worker.clone()); |
72375ce6 | 93 | |
a66ab8ae DM |
94 | let abort_future = worker.abort_future(); |
95 | ||
372724af | 96 | let env2 = env.clone(); |
bb105f9d | 97 | |
152764ec DM |
98 | req_body |
99 | .on_upgrade() | |
92ac375a | 100 | .map_err(Error::from) |
152764ec | 101 | .and_then(move |conn| { |
d9bd06ea | 102 | worker.log("upgrade done"); |
92ac375a DM |
103 | |
104 | let mut http = hyper::server::conn::Http::new(); | |
105 | http.http2_only(true); | |
adec8ea2 | 106 | // increase window size: todo - find optiomal size |
771953f9 DM |
107 | let window_size = 32*1024*1024; // max = (1 << 31) - 2 |
108 | http.http2_initial_stream_window_size(window_size); | |
109 | http.http2_initial_connection_window_size(window_size); | |
92ac375a | 110 | |
d9bd06ea DM |
111 | http.serve_connection(conn, service) |
112 | .map_err(Error::from) | |
0aadd40b | 113 | }) |
a66ab8ae | 114 | .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); })) |
372724af | 115 | .map_err(|(err, _)| err) |
bb105f9d | 116 | .and_then(move |(_result, _)| { |
372724af DM |
117 | env.ensure_finished()?; |
118 | env.log("backup finished sucessfully"); | |
bb105f9d DM |
119 | Ok(()) |
120 | }) | |
372724af DM |
121 | .then(move |result| { |
122 | if let Err(err) = result { | |
a9584932 DM |
123 | match env2.ensure_finished() { |
124 | Ok(()) => {}, // ignorte error after finish | |
125 | _ => { | |
126 | env2.log(format!("backup failed: {}", err)); | |
127 | env2.log("removing failed backup"); | |
128 | env2.remove_backup()?; | |
129 | return Err(err); | |
130 | } | |
131 | } | |
372724af DM |
132 | } |
133 | Ok(()) | |
bb105f9d | 134 | }) |
090ac9f7 DM |
135 | })?; |
136 | ||
137 | let response = Response::builder() | |
138 | .status(StatusCode::SWITCHING_PROTOCOLS) | |
0aadd40b | 139 | .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID)) |
090ac9f7 DM |
140 | .body(Body::empty())?; |
141 | ||
142 | Ok(Box::new(futures::future::ok(response))) | |
152764ec | 143 | } |
92ac375a DM |
144 | |
145 | fn backup_api() -> Router { | |
146 | ||
92ac375a | 147 | let router = Router::new() |
f9578f3c | 148 | .subdir( |
a1e7cff3 DM |
149 | "upload_chunk", Router::new() |
150 | .upload(api_method_upload_chunk()) | |
f9578f3c DM |
151 | ) |
152 | .subdir( | |
153 | "dynamic_index", Router::new() | |
d3611366 | 154 | .download(api_method_dynamic_chunk_index()) |
f9578f3c | 155 | .post(api_method_create_dynamic_index()) |
82ab7230 | 156 | .put(api_method_dynamic_append()) |
f9578f3c | 157 | ) |
a2077252 DM |
158 | .subdir( |
159 | "dynamic_close", Router::new() | |
160 | .post(api_method_close_dynamic_index()) | |
161 | ) | |
372724af DM |
162 | .subdir( |
163 | "finish", Router::new() | |
a55fcd74 | 164 | .post( |
372724af DM |
165 | ApiMethod::new( |
166 | finish_backup, | |
167 | ObjectSchema::new("Mark backup as finished.") | |
168 | ) | |
169 | ) | |
170 | ) | |
adec8ea2 DM |
171 | .subdir( |
172 | "speedtest", Router::new() | |
173 | .upload(api_method_upload_speedtest()) | |
174 | ) | |
92ac375a DM |
175 | .list_subdirs(); |
176 | ||
177 | router | |
178 | } | |
179 | ||
d3611366 DM |
180 | pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod { |
181 | ApiAsyncMethod::new( | |
182 | dynamic_chunk_index, | |
183 | ObjectSchema::new(r###" | |
184 | Download the dynamic chunk index from the previous backup. | |
185 | Simply returns an empty list if this is the first backup. | |
186 | "### | |
187 | ) | |
4e93f8c1 | 188 | .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone()) |
d3611366 DM |
189 | ) |
190 | } | |
191 | ||
f9578f3c DM |
192 | pub fn api_method_create_dynamic_index() -> ApiMethod { |
193 | ApiMethod::new( | |
194 | create_dynamic_index, | |
195 | ObjectSchema::new("Create dynamic chunk index file.") | |
4e93f8c1 | 196 | .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone()) |
f9578f3c DM |
197 | ) |
198 | } | |
199 | ||
200 | fn create_dynamic_index( | |
201 | param: Value, | |
202 | _info: &ApiMethod, | |
203 | rpcenv: &mut RpcEnvironment, | |
204 | ) -> Result<Value, Error> { | |
205 | ||
206 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
f9578f3c | 207 | |
8bea85b4 | 208 | let name = tools::required_string_param(¶m, "archive-name")?.to_owned(); |
f9578f3c | 209 | |
8bea85b4 | 210 | let mut archive_name = name.clone(); |
f9578f3c DM |
211 | if !archive_name.ends_with(".pxar") { |
212 | bail!("wrong archive extension"); | |
213 | } else { | |
214 | archive_name.push_str(".didx"); | |
215 | } | |
216 | ||
6b95c7df | 217 | let mut path = env.backup_dir.relative_path(); |
f9578f3c DM |
218 | path.push(archive_name); |
219 | ||
220 | let chunk_size = 4096*1024; // todo: ?? | |
221 | ||
bb105f9d | 222 | let index = env.datastore.create_dynamic_writer(&path, chunk_size)?; |
8bea85b4 | 223 | let wid = env.register_dynamic_writer(index, name)?; |
f9578f3c | 224 | |
bb105f9d | 225 | env.log(format!("created new dynamic index {} ({:?})", wid, path)); |
f9578f3c | 226 | |
bb105f9d | 227 | Ok(json!(wid)) |
f9578f3c DM |
228 | } |
229 | ||
82ab7230 DM |
230 | pub fn api_method_dynamic_append() -> ApiMethod { |
231 | ApiMethod::new( | |
232 | dynamic_append, | |
233 | ObjectSchema::new("Append chunk to dynamic index writer.") | |
417cb073 DM |
234 | .required("wid", IntegerSchema::new("Dynamic writer ID.") |
235 | .minimum(1) | |
236 | .maximum(256) | |
237 | ) | |
aa1b2e04 DM |
238 | .required("digest-list", ArraySchema::new( |
239 | "Chunk digest list.", | |
240 | StringSchema::new("Chunk digest.").into()) | |
241 | ) | |
417cb073 DM |
242 | .required("offset-list", ArraySchema::new( |
243 | "Chunk offset list.", | |
244 | IntegerSchema::new("Corresponding chunk offsets.") | |
245 | .minimum(0) | |
246 | .into()) | |
82ab7230 DM |
247 | ) |
248 | ) | |
249 | } | |
250 | ||
251 | fn dynamic_append ( | |
252 | param: Value, | |
253 | _info: &ApiMethod, | |
254 | rpcenv: &mut RpcEnvironment, | |
255 | ) -> Result<Value, Error> { | |
256 | ||
257 | let wid = tools::required_integer_param(¶m, "wid")? as usize; | |
aa1b2e04 | 258 | let digest_list = tools::required_array_param(¶m, "digest-list")?; |
417cb073 | 259 | let offset_list = tools::required_array_param(¶m, "offset-list")?; |
aa1b2e04 DM |
260 | |
261 | println!("DIGEST LIST LEN {}", digest_list.len()); | |
82ab7230 | 262 | |
417cb073 DM |
263 | if offset_list.len() != digest_list.len() { |
264 | bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len()); | |
265 | } | |
266 | ||
82ab7230 DM |
267 | let env: &BackupEnvironment = rpcenv.as_ref(); |
268 | ||
417cb073 | 269 | for (i, item) in digest_list.iter().enumerate() { |
aa1b2e04 DM |
270 | let digest_str = item.as_str().unwrap(); |
271 | let digest = crate::tools::hex_to_digest(digest_str)?; | |
417cb073 | 272 | let offset = offset_list[i].as_u64().unwrap(); |
aa1b2e04 | 273 | let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?; |
417cb073 | 274 | env.dynamic_writer_append_chunk(wid, offset, size, &digest)?; |
82ab7230 | 275 | |
aa1b2e04 DM |
276 | env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid)); |
277 | } | |
82ab7230 DM |
278 | |
279 | Ok(Value::Null) | |
280 | } | |
281 | ||
a2077252 DM |
282 | pub fn api_method_close_dynamic_index() -> ApiMethod { |
283 | ApiMethod::new( | |
284 | close_dynamic_index, | |
285 | ObjectSchema::new("Close dynamic index writer.") | |
286 | .required("wid", IntegerSchema::new("Dynamic writer ID.") | |
287 | .minimum(1) | |
288 | .maximum(256) | |
289 | ) | |
8bea85b4 DM |
290 | .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.") |
291 | .minimum(1) | |
292 | ) | |
293 | .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.") | |
294 | .minimum(1) | |
295 | ) | |
a2077252 DM |
296 | ) |
297 | } | |
298 | ||
299 | fn close_dynamic_index ( | |
300 | param: Value, | |
301 | _info: &ApiMethod, | |
302 | rpcenv: &mut RpcEnvironment, | |
303 | ) -> Result<Value, Error> { | |
304 | ||
305 | let wid = tools::required_integer_param(¶m, "wid")? as usize; | |
8bea85b4 DM |
306 | let chunk_count = tools::required_integer_param(¶m, "chunk-count")? as u64; |
307 | let size = tools::required_integer_param(¶m, "size")? as u64; | |
a2077252 DM |
308 | |
309 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
310 | ||
8bea85b4 | 311 | env.dynamic_writer_close(wid, chunk_count, size)?; |
a2077252 | 312 | |
bb105f9d DM |
313 | env.log(format!("sucessfully closed dynamic index {}", wid)); |
314 | ||
a2077252 DM |
315 | Ok(Value::Null) |
316 | } | |
317 | ||
318 | ||
372724af DM |
319 | fn finish_backup ( |
320 | _param: Value, | |
321 | _info: &ApiMethod, | |
322 | rpcenv: &mut RpcEnvironment, | |
323 | ) -> Result<Value, Error> { | |
324 | ||
325 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
326 | ||
327 | env.finish_backup()?; | |
328 | ||
329 | Ok(Value::Null) | |
330 | } | |
a2077252 | 331 | |
d3611366 DM |
332 | fn dynamic_chunk_index( |
333 | _parts: Parts, | |
334 | _req_body: Body, | |
335 | param: Value, | |
336 | _info: &ApiAsyncMethod, | |
337 | rpcenv: Box<RpcEnvironment>, | |
338 | ) -> Result<BoxFut, Error> { | |
339 | ||
340 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
d3611366 | 341 | |
a9584932 DM |
342 | println!("TEST CHUNK DOWNLOAD"); |
343 | ||
d3611366 DM |
344 | let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); |
345 | ||
346 | if !archive_name.ends_with(".pxar") { | |
347 | bail!("wrong archive extension"); | |
348 | } else { | |
349 | archive_name.push_str(".didx"); | |
350 | } | |
351 | ||
a9584932 DM |
352 | let empty_response = { |
353 | Response::builder() | |
354 | .status(StatusCode::OK) | |
355 | .body(Body::empty())? | |
356 | }; | |
357 | ||
d3611366 DM |
358 | let last_backup = match &env.last_backup { |
359 | Some(info) => info, | |
a9584932 | 360 | None => return Ok(Box::new(future::ok(empty_response))), |
d3611366 DM |
361 | }; |
362 | ||
363 | let mut path = last_backup.backup_dir.relative_path(); | |
a9584932 | 364 | path.push(&archive_name); |
d3611366 | 365 | |
a9584932 DM |
366 | let index = match env.datastore.open_dynamic_reader(path) { |
367 | Ok(index) => index, | |
368 | Err(_) => { | |
369 | env.log(format!("there is no last backup for archive '{}'", archive_name)); | |
370 | return Ok(Box::new(future::ok(empty_response))); | |
371 | } | |
372 | }; | |
373 | ||
374 | let count = index.index_count(); | |
375 | for pos in 0..count { | |
376 | let (start, end, digest) = index.chunk_info(pos)?; | |
377 | let size = (end - start) as u32; | |
378 | env.register_chunk(digest, size)?; | |
379 | } | |
d3611366 | 380 | |
7f3d2ffa | 381 | let reader = DigestListEncoder::new(Box::new(index)); |
d3611366 DM |
382 | |
383 | let stream = WrappedReaderStream::new(reader); | |
384 | ||
385 | // fixme: set size, content type? | |
386 | let response = http::Response::builder() | |
387 | .status(200) | |
388 | .body(Body::wrap_stream(stream))?; | |
389 | ||
390 | Ok(Box::new(future::ok(response))) | |
391 | } |