// src/api2/backup.rs
use failure::*;
use futures::*;
use hyper::header::{HeaderValue, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Response, StatusCode};
use serde_json::{json, Value};

use proxmox::{sortable, identity};
use proxmox::api::list_subdirs_api_method;
use proxmox::api::{ApiFuture, ApiHandler, ApiMethod, Router, RpcEnvironment};
use proxmox::api::router::SubdirMap;
use proxmox::api::schema::*;

use crate::tools;
use crate::tools::wrapped_reader_stream::*;
use crate::server::{WorkerTask, H2Service};
use crate::backup::*;
use crate::api2::types::*;

mod environment;
use environment::*;

mod upload_chunk;
use upload_chunk::*;

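// The backup API entry point: a client starts a backup session with a
// plain HTTP request carrying an `Upgrade` header, and all further
// communication happens over HTTP/2 on the upgraded connection using
// BACKUP_API_ROUTER (defined below).
//
// Illustrative session sketch (the URL prefix depends on where this
// router is mounted; parameter values are made up):
//
//   GET .../backup?store=store1&backup-type=host&backup-id=host1&backup-time=<epoch>
//       Upgrade: <PROXMOX_BACKUP_PROTOCOL_ID_V1>
//   <- 101 Switching Protocols
//   POST /dynamic_index                  -> writer ID ("wid")
//   upload chunks via /dynamic_chunk, append them with PUT /dynamic_index
//   POST /dynamic_close, then POST /finish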
pub const ROUTER: Router = Router::new()
    .upgrade(&API_METHOD_UPGRADE_BACKUP);

#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::Async(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &StringSchema::new("Datastore name.").schema()),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
        ]),
    )
);

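// Handler for the protocol upgrade. Validates the parameters and the
// `Upgrade` header, creates the new backup directory, and spawns a
// worker task that serves the backup API on the upgraded connection
// until the client calls `finish` or the task is aborted; a backup
// that never reached the finished state is removed again.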
fn upgrade_to_backup_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<ApiFuture, Error> {

    let debug = param["debug"].as_bool().unwrap_or(false);

    let store = tools::required_string_param(&param, "store")?.to_owned();
    let datastore = DataStore::lookup_datastore(&store)?;

    let backup_type = tools::required_string_param(&param, "backup-type")?;
    let backup_id = tools::required_string_param(&param, "backup-id")?;
    let backup_time = tools::required_integer_param(&param, "backup-time")?;

    let protocols = parts
        .headers
        .get("UPGRADE")
        .ok_or_else(|| format_err!("missing Upgrade header"))?
        .to_str()?;

    if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
        bail!("invalid protocol name");
    }

    if parts.version >= http::version::Version::HTTP_2 {
        bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
    }

    let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);

    let username = rpcenv.get_user().unwrap();
    let env_type = rpcenv.env_type();

    let backup_group = BackupGroup::new(backup_type, backup_id);
    let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
    let backup_dir = BackupDir::new_with_group(backup_group, backup_time);

    if let Some(last) = &last_backup {
        if backup_dir.backup_time() <= last.backup_dir.backup_time() {
            bail!("backup timestamp is not newer than the last backup.");
        }
    }

    let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
    if !is_new { bail!("backup directory already exists."); }

    WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
        let mut env = BackupEnvironment::new(
            env_type, username.clone(), worker.clone(), datastore, backup_dir);

        env.debug = debug;
        env.last_backup = last_backup;

        env.log(format!("starting new backup on datastore '{}': {:?}", store, path));

        let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);

        let abort_future = worker.abort_future();

        let env2 = env.clone();
        let env3 = env.clone();

        let req_fut = req_body
            .on_upgrade()
            .map_err(Error::from)
            .and_then(move |conn| {
                env3.debug("protocol upgrade done");

                let mut http = hyper::server::conn::Http::new();
                http.http2_only(true);
                // increase window size: todo - find optimal size
                let window_size = 32*1024*1024; // max = (1 << 31) - 2
                http.http2_initial_stream_window_size(window_size);
                http.http2_initial_connection_window_size(window_size);

                http.serve_connection(conn, service)
                    .map_err(Error::from)
            });
        let abort_future = abort_future
            .map(|_| Err(format_err!("task aborted")));

        use futures::future::Either;
        future::select(req_fut, abort_future)
            .map(|res| match res {
                Either::Left((Ok(res), _)) => Ok(res),
                Either::Left((Err(err), _)) => Err(err),
                Either::Right((Ok(res), _)) => Ok(res),
                Either::Right((Err(err), _)) => Err(err),
            })
            .and_then(move |_result| async move {
                env.ensure_finished()?;
                env.log("backup finished successfully");
                Ok(())
            })
            .then(move |result| async move {
                if let Err(err) = result {
                    match env2.ensure_finished() {
                        Ok(()) => {}, // ignore error after finish
                        _ => {
                            env2.log(format!("backup failed: {}", err));
                            env2.log("removing failed backup");
                            env2.remove_backup()?;
                            return Err(err);
                        }
                    }
                }
                Ok(())
            })
    })?;

    let response = Response::builder()
        .status(StatusCode::SWITCHING_PROTOCOLS)
        .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
        .body(Body::empty())?;

    Ok(Box::new(futures::future::ok(response)))
}

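// The API methods available on the upgraded HTTP/2 connection. The
// `dynamic_*` endpoints handle variable sized chunks (".didx"
// archives), the `fixed_*` endpoints handle fixed size chunks
// (".fidx" archives, e.g. disk images).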
pub const BACKUP_API_SUBDIRS: SubdirMap = &[
    (
        "blob", &Router::new()
            .upload(&API_METHOD_UPLOAD_BLOB)
    ),
    (
        "dynamic_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK)
    ),
    (
        "dynamic_close", &Router::new()
            .post(&API_METHOD_CLOSE_DYNAMIC_INDEX)
    ),
    (
        "dynamic_index", &Router::new()
            .download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND)
    ),
    (
        "finish", &Router::new()
            .post(
                &ApiMethod::new(
                    &ApiHandler::Sync(&finish_backup),
                    &ObjectSchema::new("Mark backup as finished.", &[])
                )
            )
    ),
    (
        "fixed_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_FIXED_CHUNK)
    ),
    (
        "fixed_close", &Router::new()
            .post(&API_METHOD_CLOSE_FIXED_INDEX)
    ),
    (
        "fixed_index", &Router::new()
            .download(&API_METHOD_FIXED_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND)
    ),
    (
        "speedtest", &Router::new()
            .upload(&API_METHOD_UPLOAD_SPEEDTEST)
    ),
];

pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);

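// POST /dynamic_index: create a dynamic chunk index writer for a
// ".didx" archive and return its writer ID.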
#[sortable]
pub const API_METHOD_CREATE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_dynamic_index),
    &ObjectSchema::new(
        "Create dynamic chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
        ]),
    )
);

fn create_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = tools::required_string_param(&param, "archive-name")?.to_owned();

    let archive_name = name.clone();
    if !archive_name.ends_with(".didx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let index = env.datastore.create_dynamic_writer(&path)?;
    let wid = env.register_dynamic_writer(index, name)?;

    env.log(format!("created new dynamic index {} ({:?})", wid, path));

    Ok(json!(wid))
}

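// POST /fixed_index: create a fixed chunk index writer for a ".fidx"
// archive of the given size and return its writer ID.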
#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            ("size", false, &IntegerSchema::new("File size.")
                .minimum(1)
                .schema()
            ),
        ]),
    )
);

fn create_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("create_fixed_index: {:?}", param));

    let name = tools::required_string_param(&param, "archive-name")?.to_owned();
    let size = tools::required_integer_param(&param, "size")? as usize;

    let archive_name = name.clone();
    if !archive_name.ends_with(".fidx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let chunk_size = 4096*1024; // todo: ??

    let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
    let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;

    env.log(format!("created new fixed index {} ({:?})", wid, path));

    Ok(json!(wid))
}

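// PUT /dynamic_index: append previously uploaded chunks (identified
// by their digests) to a dynamic index writer; `offset-list` must
// contain the corresponding archive offsets.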
#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            ),
        ]),
    )
);

fn dynamic_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let digest_list = tools::required_array_param(&param, "digest-list")?;
    let offset_list = tools::required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("dynamic_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = proxmox::tools::hex_to_digest(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!("successfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
    }

    Ok(Value::Null)
}

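// PUT /fixed_index: append previously uploaded chunks to a fixed
// index writer; `offset-list` must contain the corresponding image
// offsets.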
#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            ),
        ]),
    )
);

fn fixed_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let digest_list = tools::required_array_param(&param, "digest-list")?;
    let offset_list = tools::required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("fixed_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = proxmox::tools::hex_to_digest(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.fixed_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!("successfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
    }

    Ok(Value::Null)
}

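// POST /dynamic_close: close a dynamic index writer. The expected
// chunk count, size and digest list checksum let the server verify
// that it received everything the client sent.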
#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);

fn close_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
    let size = tools::required_integer_param(&param, "size")? as u64;
    let csum_str = tools::required_string_param(&param, "csum")?;
    let csum = proxmox::tools::hex_to_digest(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.dynamic_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed dynamic index {}", wid));

    Ok(Value::Null)
}

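// POST /fixed_close: close a fixed index writer, with the same
// verification as for the dynamic case above.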
#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);

fn close_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
    let size = tools::required_integer_param(&param, "size")? as u64;
    let csum_str = tools::required_string_param(&param, "csum")?;
    let csum = proxmox::tools::hex_to_digest(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.fixed_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed fixed index {}", wid));

    Ok(Value::Null)
}

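// POST /finish: mark the backup as successfully finished. The worker
// task checks this state via `ensure_finished()` once the connection
// ends.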
fn finish_backup(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.finish_backup()?;
    env.log("successfully finished backup");

    Ok(Value::Null)
}

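// GET /dynamic_index: stream the dynamic chunk index of the previous
// backup in this group, registering each contained chunk as known on
// the server side, so the client can reference unchanged chunks
// without uploading them again.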
#[sortable]
pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Async(&dynamic_chunk_index),
    &ObjectSchema::new(
        r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);

fn dynamic_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<ApiFuture, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

    if !archive_name.ends_with(".didx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let empty_response = {
        Response::builder()
            .status(StatusCode::OK)
            .body(Body::empty())?
    };

    let last_backup = match &env.last_backup {
        Some(info) => info,
        None => return Ok(Box::new(future::ok(empty_response))),
    };

    let mut path = last_backup.backup_dir.relative_path();
    path.push(&archive_name);

    let index = match env.datastore.open_dynamic_reader(path) {
        Ok(index) => index,
        Err(_) => {
            env.log(format!("there is no last backup for archive '{}'", archive_name));
            return Ok(Box::new(future::ok(empty_response)));
        }
    };

    env.log(format!("download last backup index for archive '{}'", archive_name));

    let count = index.index_count();
    for pos in 0..count {
        let (start, end, digest) = index.chunk_info(pos)?;
        let size = (end - start) as u32;
        env.register_chunk(digest, size)?;
    }

    let reader = DigestListEncoder::new(Box::new(index));

    let stream = WrappedReaderStream::new(reader);

    // fixme: set size, content type?
    let response = http::Response::builder()
        .status(200)
        .body(Body::wrap_stream(stream))?;

    Ok(Box::new(future::ok(response)))
}

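// GET /fixed_index: same as above for fixed chunk indexes. Note that
// the last chunk of an image may be smaller than `chunk_size`.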
#[sortable]
pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Async(&fixed_chunk_index),
    &ObjectSchema::new(
        r###"
Download the fixed chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);

fn fixed_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<ApiFuture, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

    if !archive_name.ends_with(".fidx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let empty_response = {
        Response::builder()
            .status(StatusCode::OK)
            .body(Body::empty())?
    };

    let last_backup = match &env.last_backup {
        Some(info) => info,
        None => return Ok(Box::new(future::ok(empty_response))),
    };

    let mut path = last_backup.backup_dir.relative_path();
    path.push(&archive_name);

    let index = match env.datastore.open_fixed_reader(path) {
        Ok(index) => index,
        Err(_) => {
            env.log(format!("there is no last backup for archive '{}'", archive_name));
            return Ok(Box::new(future::ok(empty_response)));
        }
    };

    env.log(format!("download last backup index for archive '{}'", archive_name));

    let count = index.index_count();
    let image_size = index.index_bytes();
    for pos in 0..count {
        let digest = index.index_digest(pos).unwrap();
        // Note: the last chunk can be smaller than chunk_size
        let start = (pos*index.chunk_size) as u64;
        let mut end = start + index.chunk_size as u64;
        if end > image_size { end = image_size; }
        let size = (end - start) as u32;
        env.register_chunk(*digest, size)?;
    }

    let reader = DigestListEncoder::new(Box::new(index));

    let stream = WrappedReaderStream::new(reader);

    // fixme: set size, content type?
    let response = http::Response::builder()
        .status(200)
        .body(Body::wrap_stream(stream))?;

    Ok(Box::new(future::ok(response)))
}