use failure::*;
use futures::*;
use hyper::header::{HeaderValue, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Response, StatusCode};
use serde_json::{json, Value};

use proxmox::{sortable, identity};
use proxmox::api::{api, list_subdirs_api_method};
use proxmox::api::{ApiFuture, ApiHandler, ApiMethod, Router, RpcEnvironment};
use proxmox::api::router::SubdirMap;
use proxmox::api::schema::*;

use crate::tools;
use crate::tools::wrapped_reader_stream::*;
use crate::server::{WorkerTask, H2Service};
use crate::backup::*;
use crate::api2::types::*;

mod environment;
use environment::*;

mod upload_chunk;
use upload_chunk::*;
pub const ROUTER: Router = Router::new()
    .upgrade(&API_METHOD_UPGRADE_BACKUP);

#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &StringSchema::new("Datastore name.").schema()),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
        ]),
    )
);
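// Illustrative handshake (not from this file): the client is expected to open
// the session with an HTTP/1.1 upgrade request roughly like the following,
// where the Upgrade value is assumed to be the expansion of
// PROXMOX_BACKUP_PROTOCOL_ID_V1!() and the URL path and all parameter values
// are made-up examples:
//
//   GET /api2/json/backup?store=store2&backup-type=host&backup-id=myhost&backup-time=1560000000 HTTP/1.1
//   Upgrade: proxmox-backup-protocol-v1
//
// On success the handler below answers with "101 Switching Protocols" and
// both sides continue speaking HTTP/2 on the same connection.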

fn upgrade_to_backup_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiFuture {

    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);

        let store = tools::required_string_param(&param, "store")?.to_owned();
        let datastore = DataStore::lookup_datastore(&store)?;

        let backup_type = tools::required_string_param(&param, "backup-type")?;
        let backup_id = tools::required_string_param(&param, "backup-id")?;
        let backup_time = tools::required_integer_param(&param, "backup-time")?;

        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        // protocol upgrades are only defined for HTTP/1.x connections
        if parts.version >= http::version::Version::HTTP_2 {
            bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
        }

        let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);

        let username = rpcenv.get_user().unwrap();
        let env_type = rpcenv.env_type();

        let backup_group = BackupGroup::new(backup_type, backup_id);
        let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
        let backup_dir = BackupDir::new_with_group(backup_group, backup_time);

        if let Some(last) = &last_backup {
            if backup_dir.backup_time() <= last.backup_dir.backup_time() {
                bail!("backup timestamp is older than last backup.");
            }
        }

        let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
        if !is_new { bail!("backup directory already exists."); }

        // run the protocol handler as a worker task, so it shows up in the
        // task list and can be aborted
        WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
            let mut env = BackupEnvironment::new(
                env_type, username.clone(), worker.clone(), datastore, backup_dir);

            env.debug = debug;
            env.last_backup = last_backup;

            env.log(format!("starting new backup on datastore '{}': {:?}", store, path));

            let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);

            let abort_future = worker.abort_future();

            let env2 = env.clone();
            let env3 = env.clone();

            let req_fut = req_body
                .on_upgrade()
                .map_err(Error::from)
                .and_then(move |conn| {
                    env3.debug("protocol upgrade done");

                    let mut http = hyper::server::conn::Http::new();
                    http.http2_only(true);
                    // increase window size: todo - find optimal size
                    let window_size = 32*1024*1024; // max = (1 << 31) - 2
                    http.http2_initial_stream_window_size(window_size);
                    http.http2_initial_connection_window_size(window_size);

                    http.serve_connection(conn, service)
                        .map_err(Error::from)
                });
            let abort_future = abort_future
                .map(|_| Err(format_err!("task aborted")));

            use futures::future::Either;
            future::select(req_fut, abort_future)
                .map(|res| match res {
                    Either::Left((Ok(res), _)) => Ok(res),
                    Either::Left((Err(err), _)) => Err(err),
                    Either::Right((Ok(res), _)) => Ok(res),
                    Either::Right((Err(err), _)) => Err(err),
                })
                .and_then(move |_result| async move {
                    env.ensure_finished()?;
                    env.log("backup finished successfully");
                    Ok(())
                })
                .then(move |result| async move {
                    if let Err(err) = result {
                        match env2.ensure_finished() {
                            Ok(()) => {}, // ignore error after finish
                            _ => {
                                env2.log(format!("backup failed: {}", err));
                                env2.log("removing failed backup");
                                env2.remove_backup()?;
                                return Err(err);
                            }
                        }
                    }
                    Ok(())
                })
        })?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .body(Body::empty())?;

        Ok(response)
    }.boxed()
}

pub const BACKUP_API_SUBDIRS: SubdirMap = &[
    (
        "blob", &Router::new()
            .upload(&API_METHOD_UPLOAD_BLOB)
    ),
    (
        "dynamic_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK)
    ),
    (
        "dynamic_close", &Router::new()
            .post(&API_METHOD_CLOSE_DYNAMIC_INDEX)
    ),
    (
        "dynamic_index", &Router::new()
            .download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND)
    ),
    (
        "finish", &Router::new()
            .post(
                &ApiMethod::new(
                    &ApiHandler::Sync(&finish_backup),
                    &ObjectSchema::new("Mark backup as finished.", &[])
                )
            )
    ),
    (
        "fixed_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_FIXED_CHUNK)
    ),
    (
        "fixed_close", &Router::new()
            .post(&API_METHOD_CLOSE_FIXED_INDEX)
    ),
    (
        "fixed_index", &Router::new()
            .download(&API_METHOD_FIXED_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND)
    ),
    (
        "speedtest", &Router::new()
            .upload(&API_METHOD_UPLOAD_SPEEDTEST)
    ),
];

pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);
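// Sketch of a typical session over the upgraded HTTP/2 connection, using the
// subdir routes defined above. Parameter values are invented for
// illustration; only the route names and parameter keys come from this file:
//
//   POST dynamic_index  {"archive-name": "root.pxar.didx"}                => wid
//   ...  dynamic_chunk  (one upload request per chunk)
//   PUT  dynamic_append {"wid": 1, "digest-list": [...], "offset-list": [...]}
//   POST dynamic_close  {"wid": 1, "chunk-count": 2, "size": 8388608, "csum": "..."}
//   POST finish
//
// Fixed-size (.fidx) archives follow the same pattern via the fixed_* routes.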

#[api(
    input: {
        properties: {
            "archive-name": {
                schema: crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA,
            },
        },
    },
)]
/// Create dynamic chunk index file.
fn create_dynamic_index(
    param: Value,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = tools::required_string_param(&param, "archive-name")?.to_owned();

    let archive_name = name.clone();
    if !archive_name.ends_with(".didx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let index = env.datastore.create_dynamic_writer(&path)?;
    let wid = env.register_dynamic_writer(index, name)?;

    env.log(format!("created new dynamic index {} ({:?})", wid, path));

    Ok(json!(wid))
}

#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            ("size", false, &IntegerSchema::new("File size.")
                .minimum(1)
                .schema()
            ),
        ]),
    )
);

fn create_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    println!("PARAM: {:?}", param);

    let name = tools::required_string_param(&param, "archive-name")?.to_owned();
    let size = tools::required_integer_param(&param, "size")? as usize;

    let archive_name = name.clone();
    if !archive_name.ends_with(".fidx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let chunk_size = 4096*1024; // todo: ??

    let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
    let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;

    env.log(format!("created new fixed index {} ({:?})", wid, path));

    Ok(json!(wid))
}

#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            ),
        ]),
    )
);

fn dynamic_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let digest_list = tools::required_array_param(&param, "digest-list")?;
    let offset_list = tools::required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("dynamic_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = proxmox::tools::hex_to_digest(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!("successfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
    }

    Ok(Value::Null)
}
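// Example of the parallel-list semantics checked above (digests shortened to
// placeholders, offsets invented): given
//   digest-list: ["aaaa...", "bbbb..."]
//   offset-list: [0, 4194304]
// the chunk "aaaa..." is appended at offset 0 and "bbbb..." at offset
// 4194304. Every digest must already be known to the server (freshly
// uploaded, or registered from the previous backup index), otherwise the
// lookup_chunk() call above fails.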

#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            )
        ]),
    )
);

fn fixed_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let digest_list = tools::required_array_param(&param, "digest-list")?;
    let offset_list = tools::required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("fixed_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = proxmox::tools::hex_to_digest(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.fixed_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!("successfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
    }

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);

fn close_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
    let size = tools::required_integer_param(&param, "size")? as u64;
    let csum_str = tools::required_string_param(&param, "csum")?;
    let csum = proxmox::tools::hex_to_digest(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.dynamic_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed dynamic index {}", wid));

    Ok(Value::Null)
}
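// Note on "csum": per the schema above it is a checksum over the digest
// list. It is handed to dynamic_writer_close(), which presumably verifies it
// against the chunks actually received; the exact hash algorithm is not
// visible in this file.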

#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);

fn close_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
    let size = tools::required_integer_param(&param, "size")? as u64;
    let csum_str = tools::required_string_param(&param, "csum")?;
    let csum = proxmox::tools::hex_to_digest(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.fixed_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed fixed index {}", wid));

    Ok(Value::Null)
}

fn finish_backup(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.finish_backup()?;
    env.log("successfully finished backup");

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&dynamic_chunk_index),
    &ObjectSchema::new(
        r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);

fn dynamic_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiFuture {

    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

        if !archive_name.ends_with(".didx") {
            bail!("wrong archive extension: '{}'", archive_name);
        }

        let empty_response = {
            Response::builder()
                .status(StatusCode::OK)
                .body(Body::empty())?
        };

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => return Ok(empty_response),
        };

        let mut path = last_backup.backup_dir.relative_path();
        path.push(&archive_name);

        let index = match env.datastore.open_dynamic_reader(path) {
            Ok(index) => index,
            Err(_) => {
                env.log(format!("there is no last backup for archive '{}'", archive_name));
                return Ok(empty_response);
            }
        };

        env.log(format!("download last backup index for archive '{}'", archive_name));

        let count = index.index_count();
        for pos in 0..count {
            let (start, end, digest) = index.chunk_info(pos)?;
            let size = (end - start) as u32;
            env.register_chunk(digest, size)?;
        }

        let reader = DigestListEncoder::new(Box::new(index));

        let stream = WrappedReaderStream::new(reader);

        // fixme: set size, content type?
        let response = http::Response::builder()
            .status(200)
            .body(Body::wrap_stream(stream))?;

        Ok(response)
    }.boxed()
}

#[sortable]
pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&fixed_chunk_index),
    &ObjectSchema::new(
        r###"
Download the fixed chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);

fn fixed_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiFuture {

    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

        if !archive_name.ends_with(".fidx") {
            bail!("wrong archive extension: '{}'", archive_name);
        }

        let empty_response = {
            Response::builder()
                .status(StatusCode::OK)
                .body(Body::empty())?
        };

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => return Ok(empty_response),
        };

        let mut path = last_backup.backup_dir.relative_path();
        path.push(&archive_name);

        let index = match env.datastore.open_fixed_reader(path) {
            Ok(index) => index,
            Err(_) => {
                env.log(format!("there is no last backup for archive '{}'", archive_name));
                return Ok(empty_response);
            }
        };

        env.log(format!("download last backup index for archive '{}'", archive_name));

        let count = index.index_count();
        let image_size = index.index_bytes();
        for pos in 0..count {
            let digest = index.index_digest(pos).unwrap();
            // Note: last chunk can be smaller
            let start = (pos*index.chunk_size) as u64;
            let mut end = start + index.chunk_size as u64;
            if end > image_size { end = image_size; }
            let size = (end - start) as u32;
            env.register_chunk(*digest, size)?;
        }
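
        // Worked example for the boundary math above (numbers invented):
        // with chunk_size = 4 MiB and image_size = 10 MiB the loop yields
        //   pos 0: [0 MiB,  4 MiB) -> size 4 MiB
        //   pos 1: [4 MiB,  8 MiB) -> size 4 MiB
        //   pos 2: [8 MiB, 10 MiB) -> size 2 MiB  (last chunk is smaller)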

        let reader = DigestListEncoder::new(Box::new(index));

        let stream = WrappedReaderStream::new(reader);

        // fixme: set size, content type?
        let response = http::Response::builder()
            .status(200)
            .body(Body::wrap_stream(stream))?;

        Ok(response)
    }.boxed()
}