]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup.rs
update backup api for incremental backup
[proxmox-backup.git] / src / api2 / backup.rs
1 use anyhow::{bail, format_err, Error};
2 use futures::*;
3 use hyper::header::{HeaderValue, UPGRADE};
4 use hyper::http::request::Parts;
5 use hyper::{Body, Response, StatusCode};
6 use serde_json::{json, Value};
7
8 use proxmox::{sortable, identity, list_subdirs_api_method};
9 use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
10 use proxmox::api::router::SubdirMap;
11 use proxmox::api::schema::*;
12
13 use crate::tools;
14 use crate::server::{WorkerTask, H2Service};
15 use crate::backup::*;
16 use crate::api2::types::*;
17 use crate::config::acl::PRIV_DATASTORE_BACKUP;
18 use crate::config::cached_user_info::CachedUserInfo;
19
20 mod environment;
21 use environment::*;
22
23 mod upload_chunk;
24 use upload_chunk::*;
25
// Entry point of the backup API: the only operation is upgrading the
// HTTP/1.1 connection to the proxmox backup protocol.
pub const ROUTER: Router = Router::new()
    .upgrade(&API_METHOD_UPGRADE_BACKUP);
28
// Method descriptor for the protocol upgrade call. Parameters identify the
// target snapshot (store/type/id/time); 'debug' optionally enables verbose
// logging in the spawned worker task.
#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
        ]),
    )
).access(
    // Note: parameter 'store' is no uri parameter, so we need to test inside function body
    Some("The user needs Datastore.Backup privilege on /datastore/{store} and needs to own the backup group."),
    &Permission::Anybody
);
47
/// Handle the backup protocol upgrade request.
///
/// Checks privileges and snapshot ordering, creates the backup group and
/// snapshot directory, then spawns a worker task that serves the HTTP/2
/// backup API (`BACKUP_API_ROUTER`) on the upgraded connection. On success
/// the HTTP/1.1 response is `101 Switching Protocols`.
fn upgrade_to_backup_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);

        let username = rpcenv.get_user().unwrap();

        let store = tools::required_string_param(&param, "store")?.to_owned();

        // 'store' is not part of the URI, so the ACL check cannot happen in
        // the router and is done manually here (see access spec above).
        let user_info = CachedUserInfo::new()?;
        user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_BACKUP, false)?;

        let datastore = DataStore::lookup_datastore(&store)?;

        let backup_type = tools::required_string_param(&param, "backup-type")?;
        let backup_id = tools::required_string_param(&param, "backup-id")?;
        let backup_time = tools::required_integer_param(&param, "backup-time")?;

        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        // The upgrade request itself must arrive over HTTP/1.x; HTTP/2 is
        // only spoken on the connection after the upgrade.
        if parts.version >= http::version::Version::HTTP_2 {
            bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
        }

        let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);

        let env_type = rpcenv.env_type();

        let backup_group = BackupGroup::new(backup_type, backup_id);
        let owner = datastore.create_backup_group(&backup_group, &username)?;
        // permission check
        if owner != username { // only the owner is allowed to create additional snapshots
            bail!("backup owner check failed ({} != {})", username, owner);
        }

        let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
        let backup_dir = BackupDir::new_with_group(backup_group, backup_time);

        // snapshot timestamps within a group must be strictly increasing
        if let Some(last) = &last_backup {
            if backup_dir.backup_time() <= last.backup_dir.backup_time() {
                bail!("backup timestamp is older than last backup.");
            }
            // fixme: abort if last backup is still running - howto test?
            // Idea: write upid into a file inside snapshot dir. then test if
            // it is still running here.
        }

        let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
        if !is_new { bail!("backup directory already exists."); }

        WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
            let mut env = BackupEnvironment::new(
                env_type, username.clone(), worker.clone(), datastore, backup_dir);

            env.debug = debug;
            // `last_backup` enables incremental features (previous download,
            // index reuse) inside the worker.
            env.last_backup = last_backup;

            env.log(format!("starting new backup on datastore '{}': {:?}", store, path));

            let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);

            let abort_future = worker.abort_future();

            let env2 = env.clone();

            let mut req_fut = req_body
                .on_upgrade()
                .map_err(Error::from)
                .and_then(move |conn| {
                    env2.debug("protocol upgrade done");

                    let mut http = hyper::server::conn::Http::new();
                    http.http2_only(true);
                    // increase window size: todo - find optimal size
                    let window_size = 32*1024*1024; // max = (1 << 31) - 2
                    http.http2_initial_stream_window_size(window_size);
                    http.http2_initial_connection_window_size(window_size);

                    http.serve_connection(conn, service)
                        .map_err(Error::from)
                });
            let mut abort_future = abort_future
                .map(|_| Err(format_err!("task aborted")));

            async move {
                // drive the h2 connection until it ends or the task is aborted
                let res = select!{
                    req = req_fut => req,
                    abrt = abort_future => abrt,
                };

                // Combine the connection outcome with whether the client
                // actually called 'finish' before disconnecting; an
                // unfinished backup is removed again.
                match (res, env.ensure_finished()) {
                    (Ok(_), Ok(())) => {
                        env.log("backup finished successfully");
                        Ok(())
                    },
                    (Err(err), Ok(())) => {
                        // ignore errors after finish
                        env.log(format!("backup had errors but finished: {}", err));
                        Ok(())
                    },
                    (Ok(_), Err(err)) => {
                        env.log(format!("backup ended and finish failed: {}", err));
                        env.log("removing unfinished backup");
                        env.remove_backup()?;
                        Err(err)
                    },
                    (Err(err), Err(_)) => {
                        env.log(format!("backup failed: {}", err));
                        env.log("removing failed backup");
                        env.remove_backup()?;
                        Err(err)
                    },
                }
            }
        })?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .body(Body::empty())?;

        Ok(response)
    }.boxed()
}
186
// Subdirectory map of the h2 backup API served on the upgraded connection.
// Each entry routes one step of the backup dialog (index creation, chunk
// upload, append, close, finish, previous-snapshot download, speedtest).
pub const BACKUP_API_SUBDIRS: SubdirMap = &[
    (
        "blob", &Router::new()
            .upload(&API_METHOD_UPLOAD_BLOB)
    ),
    (
        "dynamic_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK)
    ),
    (
        "dynamic_close", &Router::new()
            .post(&API_METHOD_CLOSE_DYNAMIC_INDEX)
    ),
    (
        "dynamic_index", &Router::new()
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND)
    ),
    (
        "finish", &Router::new()
            .post(
                &ApiMethod::new(
                    &ApiHandler::Sync(&finish_backup),
                    &ObjectSchema::new("Mark backup as finished.", &[])
                )
            )
    ),
    (
        "fixed_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_FIXED_CHUNK)
    ),
    (
        "fixed_close", &Router::new()
            .post(&API_METHOD_CLOSE_FIXED_INDEX)
    ),
    (
        "fixed_index", &Router::new()
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND)
    ),
    (
        "previous", &Router::new()
            .download(&API_METHOD_DOWNLOAD_PREVIOUS)
    ),
    (
        "speedtest", &Router::new()
            .upload(&API_METHOD_UPLOAD_SPEEDTEST)
    ),
];
236
// Router served by H2Service over the upgraded connection; GET lists the
// available subdirectories.
pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);
240
// Method descriptor: create a dynamic (variable chunk size) index writer.
#[sortable]
pub const API_METHOD_CREATE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_dynamic_index),
    &ObjectSchema::new(
        "Create dynamic chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
        ]),
    )
);
251
252 fn create_dynamic_index(
253 param: Value,
254 _info: &ApiMethod,
255 rpcenv: &mut dyn RpcEnvironment,
256 ) -> Result<Value, Error> {
257
258 let env: &BackupEnvironment = rpcenv.as_ref();
259
260 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
261
262 let archive_name = name.clone();
263 if !archive_name.ends_with(".didx") {
264 bail!("wrong archive extension: '{}'", archive_name);
265 }
266
267 let mut path = env.backup_dir.relative_path();
268 path.push(archive_name);
269
270 let index = env.datastore.create_dynamic_writer(&path)?;
271 let wid = env.register_dynamic_writer(index, name)?;
272
273 env.log(format!("created new dynamic index {} ({:?})", wid, path));
274
275 Ok(json!(wid))
276 }
277
// Method descriptor: create a fixed (constant chunk size) index writer.
// 'reuse-csum' triggers the incremental path in create_fixed_index.
#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            ("size", false, &IntegerSchema::new("File size.")
                .minimum(1)
                .schema()
            ),
            ("reuse-csum", true, &StringSchema::new("If set, compare last backup's \
                csum and reuse index for incremental backup if it matches.").schema()),
        ]),
    )
);
294
295 fn create_fixed_index(
296 param: Value,
297 _info: &ApiMethod,
298 rpcenv: &mut dyn RpcEnvironment,
299 ) -> Result<Value, Error> {
300
301 let env: &BackupEnvironment = rpcenv.as_ref();
302
303 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
304 let size = tools::required_integer_param(&param, "size")? as usize;
305 let reuse_csum = param["reuse-csum"].as_str();
306
307 let archive_name = name.clone();
308 if !archive_name.ends_with(".fidx") {
309 bail!("wrong archive extension: '{}'", archive_name);
310 }
311
312 let mut path = env.backup_dir.relative_path();
313 path.push(&archive_name);
314
315 let chunk_size = 4096*1024; // todo: ??
316
317 // do incremental backup if csum is set
318 let mut reader = None;
319 let mut incremental = false;
320 if let Some(csum) = reuse_csum {
321 incremental = true;
322 let last_backup = match &env.last_backup {
323 Some(info) => info,
324 None => {
325 bail!("cannot reuse index - no previous backup exists");
326 }
327 };
328
329 let mut last_path = last_backup.backup_dir.relative_path();
330 last_path.push(&archive_name);
331
332 let index = match env.datastore.open_fixed_reader(last_path) {
333 Ok(index) => index,
334 Err(_) => {
335 bail!("cannot reuse index - no previous backup exists for archive");
336 }
337 };
338
339 let (old_csum, _) = index.compute_csum();
340 let old_csum = proxmox::tools::digest_to_hex(&old_csum);
341 if old_csum != csum {
342 bail!("expected csum ({}) doesn't match last backup's ({}), cannot do incremental backup",
343 csum, old_csum);
344 }
345
346 reader = Some(index);
347 }
348
349 let mut writer = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
350
351 if let Some(reader) = reader {
352 writer.clone_data_from(&reader)?;
353 }
354
355 let wid = env.register_fixed_writer(writer, name, size, chunk_size as u32, incremental)?;
356
357 env.log(format!("created new fixed index {} ({:?})", wid, path));
358
359 Ok(json!(wid))
360 }
361
// Method descriptor: append previously uploaded chunks (by digest) to a
// dynamic index writer; offset-list must be parallel to digest-list.
#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            ),
        ]),
    )
);
394
395 fn dynamic_append (
396 param: Value,
397 _info: &ApiMethod,
398 rpcenv: &mut dyn RpcEnvironment,
399 ) -> Result<Value, Error> {
400
401 let wid = tools::required_integer_param(&param, "wid")? as usize;
402 let digest_list = tools::required_array_param(&param, "digest-list")?;
403 let offset_list = tools::required_array_param(&param, "offset-list")?;
404
405 if offset_list.len() != digest_list.len() {
406 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
407 }
408
409 let env: &BackupEnvironment = rpcenv.as_ref();
410
411 env.debug(format!("dynamic_append {} chunks", digest_list.len()));
412
413 for (i, item) in digest_list.iter().enumerate() {
414 let digest_str = item.as_str().unwrap();
415 let digest = proxmox::tools::hex_to_digest(digest_str)?;
416 let offset = offset_list[i].as_u64().unwrap();
417 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
418
419 env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
420
421 env.debug(format!("successfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
422 }
423
424 Ok(Value::Null)
425 }
426
// Method descriptor: append previously uploaded chunks (by digest) to a
// fixed index writer; offset-list must be parallel to digest-list.
#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            )
        ]),
    )
);
459
460 fn fixed_append (
461 param: Value,
462 _info: &ApiMethod,
463 rpcenv: &mut dyn RpcEnvironment,
464 ) -> Result<Value, Error> {
465
466 let wid = tools::required_integer_param(&param, "wid")? as usize;
467 let digest_list = tools::required_array_param(&param, "digest-list")?;
468 let offset_list = tools::required_array_param(&param, "offset-list")?;
469
470 if offset_list.len() != digest_list.len() {
471 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
472 }
473
474 let env: &BackupEnvironment = rpcenv.as_ref();
475
476 env.debug(format!("fixed_append {} chunks", digest_list.len()));
477
478 for (i, item) in digest_list.iter().enumerate() {
479 let digest_str = item.as_str().unwrap();
480 let digest = proxmox::tools::hex_to_digest(digest_str)?;
481 let offset = offset_list[i].as_u64().unwrap();
482 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
483
484 env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
485
486 env.debug(format!("successfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
487 }
488
489 Ok(Value::Null)
490 }
491
// Method descriptor: close a dynamic index writer; chunk-count/size/csum
// let the server verify it received everything the client sent.
#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);
524
525 fn close_dynamic_index (
526 param: Value,
527 _info: &ApiMethod,
528 rpcenv: &mut dyn RpcEnvironment,
529 ) -> Result<Value, Error> {
530
531 let wid = tools::required_integer_param(&param, "wid")? as usize;
532 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
533 let size = tools::required_integer_param(&param, "size")? as u64;
534 let csum_str = tools::required_string_param(&param, "csum")?;
535 let csum = proxmox::tools::hex_to_digest(csum_str)?;
536
537 let env: &BackupEnvironment = rpcenv.as_ref();
538
539 env.dynamic_writer_close(wid, chunk_count, size, csum)?;
540
541 env.log(format!("successfully closed dynamic index {}", wid));
542
543 Ok(Value::Null)
544 }
545
// Method descriptor: close a fixed index writer. chunk-count/size have a
// minimum of 0 (unlike the dynamic variant) because they are ignored for
// incremental backups.
#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks. Ignored for incremental backups.")
                    .minimum(0)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data. Ignored for incremental backups.")
                    .minimum(0)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);
578
579 fn close_fixed_index (
580 param: Value,
581 _info: &ApiMethod,
582 rpcenv: &mut dyn RpcEnvironment,
583 ) -> Result<Value, Error> {
584
585 let wid = tools::required_integer_param(&param, "wid")? as usize;
586 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
587 let size = tools::required_integer_param(&param, "size")? as u64;
588 let csum_str = tools::required_string_param(&param, "csum")?;
589 let csum = proxmox::tools::hex_to_digest(csum_str)?;
590
591 let env: &BackupEnvironment = rpcenv.as_ref();
592
593 env.fixed_writer_close(wid, chunk_count, size, csum)?;
594
595 env.log(format!("successfully closed fixed index {}", wid));
596
597 Ok(Value::Null)
598 }
599
/// Mark the running backup as finished.
///
/// Delegates to the backup environment; a backup whose connection ends
/// without this call is treated as failed and removed (see the match in
/// `upgrade_to_backup_protocol`).
fn finish_backup (
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.finish_backup()?;
    env.log("successfully finished backup");

    Ok(Value::Null)
}
613
// Method descriptor: download an archive from the previous snapshot of the
// same backup group (used by clients for incremental backups).
#[sortable]
pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_previous),
    &ObjectSchema::new(
        "Download archive from previous backup.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);
624
/// Stream an archive file from the previous snapshot to the client.
///
/// Fails if no previous backup exists for this group (i.e. this is the
/// first backup). The file path is built from the last snapshot's
/// directory plus the requested archive name.
fn download_previous(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

        // `last_backup` is populated by upgrade_to_backup_protocol when a
        // previous snapshot exists in the group
        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => bail!("no previous backup"),
        };

        env.log(format!("download '{}' from previous backup.", archive_name));

        let mut path = env.datastore.snapshot_path(&last_backup.backup_dir);
        path.push(&archive_name);

        crate::api2::helpers::create_download_response(path).await
    }.boxed()
}