// proxmox-backup: src/api2/backup.rs
// (from commit: "tree-wide: use 'dyn' for all trait objects")
1 use failure::*;
2
3 use std::sync::Arc;
4
5 use futures::*;
6 use hyper::header::{HeaderValue, UPGRADE};
7 use hyper::{Body, Response, StatusCode};
8 use hyper::http::request::Parts;
9 use chrono::{Local, TimeZone};
10
11 use serde_json::{json, Value};
12
13 use crate::tools;
14 use crate::tools::wrapped_reader_stream::*;
15 use crate::api_schema::router::*;
16 use crate::api_schema::*;
17 use crate::server::WorkerTask;
18 use crate::backup::*;
19
20 mod environment;
21 use environment::*;
22
23 mod service;
24 use service::*;
25
26 mod upload_chunk;
27 use upload_chunk::*;
28
29 pub fn router() -> Router {
30 Router::new()
31 .upgrade(api_method_upgrade_backup())
32 }
33
34 pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
35 ApiAsyncMethod::new(
36 upgrade_to_backup_protocol,
37 ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."))
38 .required("store", StringSchema::new("Datastore name."))
39 .required("backup-type", StringSchema::new("Backup type.")
40 .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
41 .required("backup-id", StringSchema::new("Backup ID."))
42 .optional("debug", BooleanSchema::new("Enable verbose debug logging."))
43 )
44 }
45
46 fn upgrade_to_backup_protocol(
47 parts: Parts,
48 req_body: Body,
49 param: Value,
50 _info: &ApiAsyncMethod,
51 rpcenv: Box<dyn RpcEnvironment>,
52 ) -> Result<BoxFut, Error> {
53
54 let debug = param["debug"].as_bool().unwrap_or(false);
55
56 let store = tools::required_string_param(&param, "store")?.to_owned();
57 let datastore = DataStore::lookup_datastore(&store)?;
58
59 let backup_type = tools::required_string_param(&param, "backup-type")?;
60 let backup_id = tools::required_string_param(&param, "backup-id")?;
61 let backup_time = Local.timestamp(Local::now().timestamp(), 0);
62
63 let protocols = parts
64 .headers
65 .get("UPGRADE")
66 .ok_or_else(|| format_err!("missing Upgrade header"))?
67 .to_str()?;
68
69 if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
70 bail!("invalid protocol name");
71 }
72
73 if parts.version >= http::version::Version::HTTP_2 {
74 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
75 }
76
77 let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
78
79 let username = rpcenv.get_user().unwrap();
80 let env_type = rpcenv.env_type();
81
82 let backup_group = BackupGroup::new(backup_type, backup_id);
83 let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
84 let backup_dir = BackupDir::new_with_group(backup_group, backup_time.timestamp());
85
86 let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
87 if !is_new { bail!("backup directorty already exists."); }
88
89 WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
90 let mut env = BackupEnvironment::new(
91 env_type, username.clone(), worker.clone(), datastore, backup_dir);
92
93 env.debug = debug;
94 env.last_backup = last_backup;
95
96 env.log(format!("starting new backup on datastore '{}': {:?}", store, path));
97
98 let service = BackupService::new(env.clone(), worker.clone(), debug);
99
100 let abort_future = worker.abort_future();
101
102 let env2 = env.clone();
103 let env3 = env.clone();
104
105 req_body
106 .on_upgrade()
107 .map_err(Error::from)
108 .and_then(move |conn| {
109 env3.debug("protocol upgrade done");
110
111 let mut http = hyper::server::conn::Http::new();
112 http.http2_only(true);
113 // increase window size: todo - find optiomal size
114 let window_size = 32*1024*1024; // max = (1 << 31) - 2
115 http.http2_initial_stream_window_size(window_size);
116 http.http2_initial_connection_window_size(window_size);
117
118 http.serve_connection(conn, service)
119 .map_err(Error::from)
120 })
121 .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
122 .map_err(|(err, _)| err)
123 .and_then(move |(_result, _)| {
124 env.ensure_finished()?;
125 env.log("backup finished sucessfully");
126 Ok(())
127 })
128 .then(move |result| {
129 if let Err(err) = result {
130 match env2.ensure_finished() {
131 Ok(()) => {}, // ignore error after finish
132 _ => {
133 env2.log(format!("backup failed: {}", err));
134 env2.log("removing failed backup");
135 env2.remove_backup()?;
136 return Err(err);
137 }
138 }
139 }
140 Ok(())
141 })
142 })?;
143
144 let response = Response::builder()
145 .status(StatusCode::SWITCHING_PROTOCOLS)
146 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
147 .body(Body::empty())?;
148
149 Ok(Box::new(futures::future::ok(response)))
150 }
151
152 pub fn backup_api() -> Router {
153
154 let router = Router::new()
155 .subdir(
156 "config", Router::new()
157 .upload(api_method_upload_config())
158 )
159 .subdir(
160 "dynamic_chunk", Router::new()
161 .upload(api_method_upload_dynamic_chunk())
162 )
163 .subdir(
164 "dynamic_index", Router::new()
165 .download(api_method_dynamic_chunk_index())
166 .post(api_method_create_dynamic_index())
167 .put(api_method_dynamic_append())
168 )
169 .subdir(
170 "dynamic_close", Router::new()
171 .post(api_method_close_dynamic_index())
172 )
173 .subdir(
174 "fixed_chunk", Router::new()
175 .upload(api_method_upload_fixed_chunk())
176 )
177 .subdir(
178 "fixed_index", Router::new()
179 .download(api_method_fixed_chunk_index())
180 .post(api_method_create_fixed_index())
181 .put(api_method_fixed_append())
182 )
183 .subdir(
184 "fixed_close", Router::new()
185 .post(api_method_close_fixed_index())
186 )
187 .subdir(
188 "finish", Router::new()
189 .post(
190 ApiMethod::new(
191 finish_backup,
192 ObjectSchema::new("Mark backup as finished.")
193 )
194 )
195 )
196 .subdir(
197 "speedtest", Router::new()
198 .upload(api_method_upload_speedtest())
199 )
200 .list_subdirs();
201
202 router
203 }
204
205 pub fn api_method_create_dynamic_index() -> ApiMethod {
206 ApiMethod::new(
207 create_dynamic_index,
208 ObjectSchema::new("Create dynamic chunk index file.")
209 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
210 )
211 }
212
213 fn create_dynamic_index(
214 param: Value,
215 _info: &ApiMethod,
216 rpcenv: &mut dyn RpcEnvironment,
217 ) -> Result<Value, Error> {
218
219 let env: &BackupEnvironment = rpcenv.as_ref();
220
221 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
222
223 let mut archive_name = name.clone();
224 if !archive_name.ends_with(".pxar") {
225 bail!("wrong archive extension: '{}'", archive_name);
226 } else {
227 archive_name.push_str(".didx");
228 }
229
230 let mut path = env.backup_dir.relative_path();
231 path.push(archive_name);
232
233 let index = env.datastore.create_dynamic_writer(&path)?;
234 let wid = env.register_dynamic_writer(index, name)?;
235
236 env.log(format!("created new dynamic index {} ({:?})", wid, path));
237
238 Ok(json!(wid))
239 }
240
241 pub fn api_method_create_fixed_index() -> ApiMethod {
242 ApiMethod::new(
243 create_fixed_index,
244 ObjectSchema::new("Create fixed chunk index file.")
245 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
246 .required("size", IntegerSchema::new("File size.")
247 .minimum(1)
248 )
249 )
250 }
251
252 fn create_fixed_index(
253 param: Value,
254 _info: &ApiMethod,
255 rpcenv: &mut dyn RpcEnvironment,
256 ) -> Result<Value, Error> {
257
258 let env: &BackupEnvironment = rpcenv.as_ref();
259
260 println!("PARAM: {:?}", param);
261
262 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
263 let size = tools::required_integer_param(&param, "size")? as usize;
264
265 let mut archive_name = name.clone();
266 if !archive_name.ends_with(".img") {
267 bail!("wrong archive extension: '{}'", archive_name);
268 } else {
269 archive_name.push_str(".fidx");
270 }
271
272 let mut path = env.backup_dir.relative_path();
273 path.push(archive_name);
274
275 let chunk_size = 4096*1024; // todo: ??
276
277 let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
278 let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
279
280 env.log(format!("created new fixed index {} ({:?})", wid, path));
281
282 Ok(json!(wid))
283 }
284
285 pub fn api_method_dynamic_append() -> ApiMethod {
286 ApiMethod::new(
287 dynamic_append,
288 ObjectSchema::new("Append chunk to dynamic index writer.")
289 .required("wid", IntegerSchema::new("Dynamic writer ID.")
290 .minimum(1)
291 .maximum(256)
292 )
293 .required("digest-list", ArraySchema::new(
294 "Chunk digest list.",
295 StringSchema::new("Chunk digest.").into())
296 )
297 .required("offset-list", ArraySchema::new(
298 "Chunk offset list.",
299 IntegerSchema::new("Corresponding chunk offsets.")
300 .minimum(0)
301 .into())
302 )
303 )
304 }
305
306 fn dynamic_append (
307 param: Value,
308 _info: &ApiMethod,
309 rpcenv: &mut dyn RpcEnvironment,
310 ) -> Result<Value, Error> {
311
312 let wid = tools::required_integer_param(&param, "wid")? as usize;
313 let digest_list = tools::required_array_param(&param, "digest-list")?;
314 let offset_list = tools::required_array_param(&param, "offset-list")?;
315
316 if offset_list.len() != digest_list.len() {
317 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
318 }
319
320 let env: &BackupEnvironment = rpcenv.as_ref();
321
322 env.debug(format!("dynamic_append {} chunks", digest_list.len()));
323
324 for (i, item) in digest_list.iter().enumerate() {
325 let digest_str = item.as_str().unwrap();
326 let digest = crate::tools::hex_to_digest(digest_str)?;
327 let offset = offset_list[i].as_u64().unwrap();
328 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
329
330 env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
331
332 env.debug(format!("sucessfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
333 }
334
335 Ok(Value::Null)
336 }
337
338 pub fn api_method_fixed_append() -> ApiMethod {
339 ApiMethod::new(
340 fixed_append,
341 ObjectSchema::new("Append chunk to fixed index writer.")
342 .required("wid", IntegerSchema::new("Fixed writer ID.")
343 .minimum(1)
344 .maximum(256)
345 )
346 .required("digest-list", ArraySchema::new(
347 "Chunk digest list.",
348 StringSchema::new("Chunk digest.").into())
349 )
350 .required("offset-list", ArraySchema::new(
351 "Chunk offset list.",
352 IntegerSchema::new("Corresponding chunk offsets.")
353 .minimum(0)
354 .into())
355 )
356 )
357 }
358
359 fn fixed_append (
360 param: Value,
361 _info: &ApiMethod,
362 rpcenv: &mut dyn RpcEnvironment,
363 ) -> Result<Value, Error> {
364
365 let wid = tools::required_integer_param(&param, "wid")? as usize;
366 let digest_list = tools::required_array_param(&param, "digest-list")?;
367 let offset_list = tools::required_array_param(&param, "offset-list")?;
368
369 if offset_list.len() != digest_list.len() {
370 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
371 }
372
373 let env: &BackupEnvironment = rpcenv.as_ref();
374
375 env.debug(format!("fixed_append {} chunks", digest_list.len()));
376
377 for (i, item) in digest_list.iter().enumerate() {
378 let digest_str = item.as_str().unwrap();
379 let digest = crate::tools::hex_to_digest(digest_str)?;
380 let offset = offset_list[i].as_u64().unwrap();
381 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
382
383 env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
384
385 env.debug(format!("sucessfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
386 }
387
388 Ok(Value::Null)
389 }
390
391 pub fn api_method_close_dynamic_index() -> ApiMethod {
392 ApiMethod::new(
393 close_dynamic_index,
394 ObjectSchema::new("Close dynamic index writer.")
395 .required("wid", IntegerSchema::new("Dynamic writer ID.")
396 .minimum(1)
397 .maximum(256)
398 )
399 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
400 .minimum(1)
401 )
402 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
403 .minimum(1)
404 )
405 )
406 }
407
408 fn close_dynamic_index (
409 param: Value,
410 _info: &ApiMethod,
411 rpcenv: &mut dyn RpcEnvironment,
412 ) -> Result<Value, Error> {
413
414 let wid = tools::required_integer_param(&param, "wid")? as usize;
415 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
416 let size = tools::required_integer_param(&param, "size")? as u64;
417
418 let env: &BackupEnvironment = rpcenv.as_ref();
419
420 env.dynamic_writer_close(wid, chunk_count, size)?;
421
422 env.log(format!("sucessfully closed dynamic index {}", wid));
423
424 Ok(Value::Null)
425 }
426
427 pub fn api_method_close_fixed_index() -> ApiMethod {
428 ApiMethod::new(
429 close_fixed_index,
430 ObjectSchema::new("Close fixed index writer.")
431 .required("wid", IntegerSchema::new("Fixed writer ID.")
432 .minimum(1)
433 .maximum(256)
434 )
435 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
436 .minimum(1)
437 )
438 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
439 .minimum(1)
440 )
441 )
442 }
443
444 fn close_fixed_index (
445 param: Value,
446 _info: &ApiMethod,
447 rpcenv: &mut dyn RpcEnvironment,
448 ) -> Result<Value, Error> {
449
450 let wid = tools::required_integer_param(&param, "wid")? as usize;
451 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
452 let size = tools::required_integer_param(&param, "size")? as u64;
453
454 let env: &BackupEnvironment = rpcenv.as_ref();
455
456 env.fixed_writer_close(wid, chunk_count, size)?;
457
458 env.log(format!("sucessfully closed fixed index {}", wid));
459
460 Ok(Value::Null)
461 }
462
463 fn finish_backup (
464 _param: Value,
465 _info: &ApiMethod,
466 rpcenv: &mut dyn RpcEnvironment,
467 ) -> Result<Value, Error> {
468
469 let env: &BackupEnvironment = rpcenv.as_ref();
470
471 env.finish_backup()?;
472 env.log("sucessfully finished backup");
473
474 Ok(Value::Null)
475 }
476
477 pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
478 ApiAsyncMethod::new(
479 dynamic_chunk_index,
480 ObjectSchema::new(r###"
481 Download the dynamic chunk index from the previous backup.
482 Simply returns an empty list if this is the first backup.
483 "###
484 )
485 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
486 )
487 }
488
489 fn dynamic_chunk_index(
490 _parts: Parts,
491 _req_body: Body,
492 param: Value,
493 _info: &ApiAsyncMethod,
494 rpcenv: Box<dyn RpcEnvironment>,
495 ) -> Result<BoxFut, Error> {
496
497 let env: &BackupEnvironment = rpcenv.as_ref();
498
499 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
500
501 if !archive_name.ends_with(".pxar") {
502 bail!("wrong archive extension: '{}'", archive_name);
503 } else {
504 archive_name.push_str(".didx");
505 }
506
507 let empty_response = {
508 Response::builder()
509 .status(StatusCode::OK)
510 .body(Body::empty())?
511 };
512
513 let last_backup = match &env.last_backup {
514 Some(info) => info,
515 None => return Ok(Box::new(future::ok(empty_response))),
516 };
517
518 let mut path = last_backup.backup_dir.relative_path();
519 path.push(&archive_name);
520
521 let index = match env.datastore.open_dynamic_reader(path) {
522 Ok(index) => index,
523 Err(_) => {
524 env.log(format!("there is no last backup for archive '{}'", archive_name));
525 return Ok(Box::new(future::ok(empty_response)));
526 }
527 };
528
529 env.log(format!("download last backup index for archive '{}'", archive_name));
530
531 let count = index.index_count();
532 for pos in 0..count {
533 let (start, end, digest) = index.chunk_info(pos)?;
534 let size = (end - start) as u32;
535 env.register_chunk(digest, size)?;
536 }
537
538 let reader = DigestListEncoder::new(Box::new(index));
539
540 let stream = WrappedReaderStream::new(reader);
541
542 // fixme: set size, content type?
543 let response = http::Response::builder()
544 .status(200)
545 .body(Body::wrap_stream(stream))?;
546
547 Ok(Box::new(future::ok(response)))
548 }
549
550 pub fn api_method_fixed_chunk_index() -> ApiAsyncMethod {
551 ApiAsyncMethod::new(
552 fixed_chunk_index,
553 ObjectSchema::new(r###"
554 Download the fixed chunk index from the previous backup.
555 Simply returns an empty list if this is the first backup.
556 "###
557 )
558 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
559 )
560 }
561
562 fn fixed_chunk_index(
563 _parts: Parts,
564 _req_body: Body,
565 param: Value,
566 _info: &ApiAsyncMethod,
567 rpcenv: Box<dyn RpcEnvironment>,
568 ) -> Result<BoxFut, Error> {
569
570 let env: &BackupEnvironment = rpcenv.as_ref();
571
572 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
573
574 if !archive_name.ends_with(".img") {
575 bail!("wrong archive extension: '{}'", archive_name);
576 } else {
577 archive_name.push_str(".fidx");
578 }
579
580 let empty_response = {
581 Response::builder()
582 .status(StatusCode::OK)
583 .body(Body::empty())?
584 };
585
586 let last_backup = match &env.last_backup {
587 Some(info) => info,
588 None => return Ok(Box::new(future::ok(empty_response))),
589 };
590
591 let mut path = last_backup.backup_dir.relative_path();
592 path.push(&archive_name);
593
594 let index = match env.datastore.open_fixed_reader(path) {
595 Ok(index) => index,
596 Err(_) => {
597 env.log(format!("there is no last backup for archive '{}'", archive_name));
598 return Ok(Box::new(future::ok(empty_response)));
599 }
600 };
601
602 env.log(format!("download last backup index for archive '{}'", archive_name));
603
604 let count = index.index_count();
605 for pos in 0..count {
606 let digest = index.index_digest(pos).unwrap();
607 let size = index.chunk_size as u32;
608 env.register_chunk(*digest, size)?;
609 }
610
611 let reader = DigestListEncoder::new(Box::new(index));
612
613 let stream = WrappedReaderStream::new(reader);
614
615 // fixme: set size, content type?
616 let response = http::Response::builder()
617 .status(200)
618 .body(Body::wrap_stream(stream))?;
619
620 Ok(Box::new(future::ok(response)))
621 }