]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup.rs
src/api2/backup.rs: new required backup-time parameter
[proxmox-backup.git] / src / api2 / backup.rs
1 use failure::*;
2 use lazy_static::lazy_static;
3
4 use std::sync::Arc;
5
6 use futures::*;
7 use hyper::header::{HeaderValue, UPGRADE};
8 use hyper::{Body, Response, StatusCode};
9 use hyper::http::request::Parts;
10
11 use serde_json::{json, Value};
12
13 use crate::tools;
14 use crate::tools::wrapped_reader_stream::*;
15 use crate::api_schema::router::*;
16 use crate::api_schema::*;
17 use crate::server::{WorkerTask, H2Service};
18 use crate::backup::*;
19 use crate::api2::types::*;
20
21 mod environment;
22 use environment::*;
23
24 mod upload_chunk;
25 use upload_chunk::*;
26
27 pub fn router() -> Router {
28 Router::new()
29 .upgrade(api_method_upgrade_backup())
30 }
31
32 pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
33 ApiAsyncMethod::new(
34 upgrade_to_backup_protocol,
35 ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."))
36 .required("store", StringSchema::new("Datastore name."))
37 .required("backup-type", StringSchema::new("Backup type.")
38 .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
39 .required("backup-id", StringSchema::new("Backup ID."))
40 .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
41 .minimum(1547797308))
42 .optional("debug", BooleanSchema::new("Enable verbose debug logging."))
43 )
44 }
45
46 fn upgrade_to_backup_protocol(
47 parts: Parts,
48 req_body: Body,
49 param: Value,
50 _info: &ApiAsyncMethod,
51 rpcenv: Box<dyn RpcEnvironment>,
52 ) -> Result<BoxFut, Error> {
53
54 let debug = param["debug"].as_bool().unwrap_or(false);
55
56 let store = tools::required_string_param(&param, "store")?.to_owned();
57 let datastore = DataStore::lookup_datastore(&store)?;
58
59 let backup_type = tools::required_string_param(&param, "backup-type")?;
60 let backup_id = tools::required_string_param(&param, "backup-id")?;
61 let backup_time = tools::required_integer_param(&param, "backup-time")?;
62
63 let protocols = parts
64 .headers
65 .get("UPGRADE")
66 .ok_or_else(|| format_err!("missing Upgrade header"))?
67 .to_str()?;
68
69 if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
70 bail!("invalid protocol name");
71 }
72
73 if parts.version >= http::version::Version::HTTP_2 {
74 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
75 }
76
77 let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
78
79 let username = rpcenv.get_user().unwrap();
80 let env_type = rpcenv.env_type();
81
82 let backup_group = BackupGroup::new(backup_type, backup_id);
83 let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
84 let backup_dir = BackupDir::new_with_group(backup_group, backup_time);
85
86 if let Some(last) = &last_backup {
87 if backup_dir.backup_time() <= last.backup_dir.backup_time() {
88 bail!("backup timestamp is older than last backup.");
89 }
90 }
91
92 let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
93 if !is_new { bail!("backup directorty already exists."); }
94
95 WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
96 let mut env = BackupEnvironment::new(
97 env_type, username.clone(), worker.clone(), datastore, backup_dir);
98
99 env.debug = debug;
100 env.last_backup = last_backup;
101
102 env.log(format!("starting new backup on datastore '{}': {:?}", store, path));
103
104 let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_ROUTER, debug);
105
106 let abort_future = worker.abort_future();
107
108 let env2 = env.clone();
109 let env3 = env.clone();
110
111 req_body
112 .on_upgrade()
113 .map_err(Error::from)
114 .and_then(move |conn| {
115 env3.debug("protocol upgrade done");
116
117 let mut http = hyper::server::conn::Http::new();
118 http.http2_only(true);
119 // increase window size: todo - find optiomal size
120 let window_size = 32*1024*1024; // max = (1 << 31) - 2
121 http.http2_initial_stream_window_size(window_size);
122 http.http2_initial_connection_window_size(window_size);
123
124 http.serve_connection(conn, service)
125 .map_err(Error::from)
126 })
127 .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
128 .map_err(|(err, _)| err)
129 .and_then(move |(_result, _)| {
130 env.ensure_finished()?;
131 env.log("backup finished sucessfully");
132 Ok(())
133 })
134 .then(move |result| {
135 if let Err(err) = result {
136 match env2.ensure_finished() {
137 Ok(()) => {}, // ignore error after finish
138 _ => {
139 env2.log(format!("backup failed: {}", err));
140 env2.log("removing failed backup");
141 env2.remove_backup()?;
142 return Err(err);
143 }
144 }
145 }
146 Ok(())
147 })
148 })?;
149
150 let response = Response::builder()
151 .status(StatusCode::SWITCHING_PROTOCOLS)
152 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
153 .body(Body::empty())?;
154
155 Ok(Box::new(futures::future::ok(response)))
156 }
157
lazy_static!{
    /// Router serving the backup API over the upgraded H2 connection
    /// (referenced by the `H2Service` created in `upgrade_to_backup_protocol`).
    static ref BACKUP_ROUTER: Router = backup_api();
}
161
162 pub fn backup_api() -> Router {
163
164 let router = Router::new()
165 .subdir(
166 "blob", Router::new()
167 .upload(api_method_upload_blob())
168 )
169 .subdir(
170 "dynamic_chunk", Router::new()
171 .upload(api_method_upload_dynamic_chunk())
172 )
173 .subdir(
174 "dynamic_index", Router::new()
175 .download(api_method_dynamic_chunk_index())
176 .post(api_method_create_dynamic_index())
177 .put(api_method_dynamic_append())
178 )
179 .subdir(
180 "dynamic_close", Router::new()
181 .post(api_method_close_dynamic_index())
182 )
183 .subdir(
184 "fixed_chunk", Router::new()
185 .upload(api_method_upload_fixed_chunk())
186 )
187 .subdir(
188 "fixed_index", Router::new()
189 .download(api_method_fixed_chunk_index())
190 .post(api_method_create_fixed_index())
191 .put(api_method_fixed_append())
192 )
193 .subdir(
194 "fixed_close", Router::new()
195 .post(api_method_close_fixed_index())
196 )
197 .subdir(
198 "finish", Router::new()
199 .post(
200 ApiMethod::new(
201 finish_backup,
202 ObjectSchema::new("Mark backup as finished.")
203 )
204 )
205 )
206 .subdir(
207 "speedtest", Router::new()
208 .upload(api_method_upload_speedtest())
209 )
210 .list_subdirs();
211
212 router
213 }
214
215 pub fn api_method_create_dynamic_index() -> ApiMethod {
216 ApiMethod::new(
217 create_dynamic_index,
218 ObjectSchema::new("Create dynamic chunk index file.")
219 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
220 )
221 }
222
223 fn create_dynamic_index(
224 param: Value,
225 _info: &ApiMethod,
226 rpcenv: &mut dyn RpcEnvironment,
227 ) -> Result<Value, Error> {
228
229 let env: &BackupEnvironment = rpcenv.as_ref();
230
231 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
232
233 let mut archive_name = name.clone();
234 if !archive_name.ends_with(".pxar") {
235 bail!("wrong archive extension: '{}'", archive_name);
236 } else {
237 archive_name.push_str(".didx");
238 }
239
240 let mut path = env.backup_dir.relative_path();
241 path.push(archive_name);
242
243 let index = env.datastore.create_dynamic_writer(&path)?;
244 let wid = env.register_dynamic_writer(index, name)?;
245
246 env.log(format!("created new dynamic index {} ({:?})", wid, path));
247
248 Ok(json!(wid))
249 }
250
251 pub fn api_method_create_fixed_index() -> ApiMethod {
252 ApiMethod::new(
253 create_fixed_index,
254 ObjectSchema::new("Create fixed chunk index file.")
255 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
256 .required("size", IntegerSchema::new("File size.")
257 .minimum(1)
258 )
259 )
260 }
261
262 fn create_fixed_index(
263 param: Value,
264 _info: &ApiMethod,
265 rpcenv: &mut dyn RpcEnvironment,
266 ) -> Result<Value, Error> {
267
268 let env: &BackupEnvironment = rpcenv.as_ref();
269
270 println!("PARAM: {:?}", param);
271
272 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
273 let size = tools::required_integer_param(&param, "size")? as usize;
274
275 let mut archive_name = name.clone();
276 if !archive_name.ends_with(".img") {
277 bail!("wrong archive extension: '{}'", archive_name);
278 } else {
279 archive_name.push_str(".fidx");
280 }
281
282 let mut path = env.backup_dir.relative_path();
283 path.push(archive_name);
284
285 let chunk_size = 4096*1024; // todo: ??
286
287 let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
288 let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
289
290 env.log(format!("created new fixed index {} ({:?})", wid, path));
291
292 Ok(json!(wid))
293 }
294
295 pub fn api_method_dynamic_append() -> ApiMethod {
296 ApiMethod::new(
297 dynamic_append,
298 ObjectSchema::new("Append chunk to dynamic index writer.")
299 .required("wid", IntegerSchema::new("Dynamic writer ID.")
300 .minimum(1)
301 .maximum(256)
302 )
303 .required("digest-list", ArraySchema::new(
304 "Chunk digest list.", CHUNK_DIGEST_SCHEMA.clone())
305 )
306 .required("offset-list", ArraySchema::new(
307 "Chunk offset list.",
308 IntegerSchema::new("Corresponding chunk offsets.")
309 .minimum(0)
310 .into())
311 )
312 )
313 }
314
315 fn dynamic_append (
316 param: Value,
317 _info: &ApiMethod,
318 rpcenv: &mut dyn RpcEnvironment,
319 ) -> Result<Value, Error> {
320
321 let wid = tools::required_integer_param(&param, "wid")? as usize;
322 let digest_list = tools::required_array_param(&param, "digest-list")?;
323 let offset_list = tools::required_array_param(&param, "offset-list")?;
324
325 if offset_list.len() != digest_list.len() {
326 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
327 }
328
329 let env: &BackupEnvironment = rpcenv.as_ref();
330
331 env.debug(format!("dynamic_append {} chunks", digest_list.len()));
332
333 for (i, item) in digest_list.iter().enumerate() {
334 let digest_str = item.as_str().unwrap();
335 let digest = proxmox::tools::hex_to_digest(digest_str)?;
336 let offset = offset_list[i].as_u64().unwrap();
337 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
338
339 env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
340
341 env.debug(format!("sucessfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
342 }
343
344 Ok(Value::Null)
345 }
346
347 pub fn api_method_fixed_append() -> ApiMethod {
348 ApiMethod::new(
349 fixed_append,
350 ObjectSchema::new("Append chunk to fixed index writer.")
351 .required("wid", IntegerSchema::new("Fixed writer ID.")
352 .minimum(1)
353 .maximum(256)
354 )
355 .required("digest-list", ArraySchema::new(
356 "Chunk digest list.", CHUNK_DIGEST_SCHEMA.clone())
357 )
358 .required("offset-list", ArraySchema::new(
359 "Chunk offset list.",
360 IntegerSchema::new("Corresponding chunk offsets.")
361 .minimum(0)
362 .into())
363 )
364 )
365 }
366
367 fn fixed_append (
368 param: Value,
369 _info: &ApiMethod,
370 rpcenv: &mut dyn RpcEnvironment,
371 ) -> Result<Value, Error> {
372
373 let wid = tools::required_integer_param(&param, "wid")? as usize;
374 let digest_list = tools::required_array_param(&param, "digest-list")?;
375 let offset_list = tools::required_array_param(&param, "offset-list")?;
376
377 if offset_list.len() != digest_list.len() {
378 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
379 }
380
381 let env: &BackupEnvironment = rpcenv.as_ref();
382
383 env.debug(format!("fixed_append {} chunks", digest_list.len()));
384
385 for (i, item) in digest_list.iter().enumerate() {
386 let digest_str = item.as_str().unwrap();
387 let digest = proxmox::tools::hex_to_digest(digest_str)?;
388 let offset = offset_list[i].as_u64().unwrap();
389 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
390
391 env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
392
393 env.debug(format!("sucessfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
394 }
395
396 Ok(Value::Null)
397 }
398
399 pub fn api_method_close_dynamic_index() -> ApiMethod {
400 ApiMethod::new(
401 close_dynamic_index,
402 ObjectSchema::new("Close dynamic index writer.")
403 .required("wid", IntegerSchema::new("Dynamic writer ID.")
404 .minimum(1)
405 .maximum(256)
406 )
407 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
408 .minimum(1)
409 )
410 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
411 .minimum(1)
412 )
413 )
414 }
415
416 fn close_dynamic_index (
417 param: Value,
418 _info: &ApiMethod,
419 rpcenv: &mut dyn RpcEnvironment,
420 ) -> Result<Value, Error> {
421
422 let wid = tools::required_integer_param(&param, "wid")? as usize;
423 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
424 let size = tools::required_integer_param(&param, "size")? as u64;
425
426 let env: &BackupEnvironment = rpcenv.as_ref();
427
428 env.dynamic_writer_close(wid, chunk_count, size)?;
429
430 env.log(format!("sucessfully closed dynamic index {}", wid));
431
432 Ok(Value::Null)
433 }
434
435 pub fn api_method_close_fixed_index() -> ApiMethod {
436 ApiMethod::new(
437 close_fixed_index,
438 ObjectSchema::new("Close fixed index writer.")
439 .required("wid", IntegerSchema::new("Fixed writer ID.")
440 .minimum(1)
441 .maximum(256)
442 )
443 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
444 .minimum(1)
445 )
446 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
447 .minimum(1)
448 )
449 )
450 }
451
452 fn close_fixed_index (
453 param: Value,
454 _info: &ApiMethod,
455 rpcenv: &mut dyn RpcEnvironment,
456 ) -> Result<Value, Error> {
457
458 let wid = tools::required_integer_param(&param, "wid")? as usize;
459 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
460 let size = tools::required_integer_param(&param, "size")? as u64;
461
462 let env: &BackupEnvironment = rpcenv.as_ref();
463
464 env.fixed_writer_close(wid, chunk_count, size)?;
465
466 env.log(format!("sucessfully closed fixed index {}", wid));
467
468 Ok(Value::Null)
469 }
470
471 fn finish_backup (
472 _param: Value,
473 _info: &ApiMethod,
474 rpcenv: &mut dyn RpcEnvironment,
475 ) -> Result<Value, Error> {
476
477 let env: &BackupEnvironment = rpcenv.as_ref();
478
479 env.finish_backup()?;
480 env.log("sucessfully finished backup");
481
482 Ok(Value::Null)
483 }
484
485 pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
486 ApiAsyncMethod::new(
487 dynamic_chunk_index,
488 ObjectSchema::new(r###"
489 Download the dynamic chunk index from the previous backup.
490 Simply returns an empty list if this is the first backup.
491 "###
492 )
493 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
494 )
495 }
496
497 fn dynamic_chunk_index(
498 _parts: Parts,
499 _req_body: Body,
500 param: Value,
501 _info: &ApiAsyncMethod,
502 rpcenv: Box<dyn RpcEnvironment>,
503 ) -> Result<BoxFut, Error> {
504
505 let env: &BackupEnvironment = rpcenv.as_ref();
506
507 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
508
509 if !archive_name.ends_with(".pxar") {
510 bail!("wrong archive extension: '{}'", archive_name);
511 } else {
512 archive_name.push_str(".didx");
513 }
514
515 let empty_response = {
516 Response::builder()
517 .status(StatusCode::OK)
518 .body(Body::empty())?
519 };
520
521 let last_backup = match &env.last_backup {
522 Some(info) => info,
523 None => return Ok(Box::new(future::ok(empty_response))),
524 };
525
526 let mut path = last_backup.backup_dir.relative_path();
527 path.push(&archive_name);
528
529 let index = match env.datastore.open_dynamic_reader(path) {
530 Ok(index) => index,
531 Err(_) => {
532 env.log(format!("there is no last backup for archive '{}'", archive_name));
533 return Ok(Box::new(future::ok(empty_response)));
534 }
535 };
536
537 env.log(format!("download last backup index for archive '{}'", archive_name));
538
539 let count = index.index_count();
540 for pos in 0..count {
541 let (start, end, digest) = index.chunk_info(pos)?;
542 let size = (end - start) as u32;
543 env.register_chunk(digest, size)?;
544 }
545
546 let reader = DigestListEncoder::new(Box::new(index));
547
548 let stream = WrappedReaderStream::new(reader);
549
550 // fixme: set size, content type?
551 let response = http::Response::builder()
552 .status(200)
553 .body(Body::wrap_stream(stream))?;
554
555 Ok(Box::new(future::ok(response)))
556 }
557
558 pub fn api_method_fixed_chunk_index() -> ApiAsyncMethod {
559 ApiAsyncMethod::new(
560 fixed_chunk_index,
561 ObjectSchema::new(r###"
562 Download the fixed chunk index from the previous backup.
563 Simply returns an empty list if this is the first backup.
564 "###
565 )
566 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
567 )
568 }
569
570 fn fixed_chunk_index(
571 _parts: Parts,
572 _req_body: Body,
573 param: Value,
574 _info: &ApiAsyncMethod,
575 rpcenv: Box<dyn RpcEnvironment>,
576 ) -> Result<BoxFut, Error> {
577
578 let env: &BackupEnvironment = rpcenv.as_ref();
579
580 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
581
582 if !archive_name.ends_with(".img") {
583 bail!("wrong archive extension: '{}'", archive_name);
584 } else {
585 archive_name.push_str(".fidx");
586 }
587
588 let empty_response = {
589 Response::builder()
590 .status(StatusCode::OK)
591 .body(Body::empty())?
592 };
593
594 let last_backup = match &env.last_backup {
595 Some(info) => info,
596 None => return Ok(Box::new(future::ok(empty_response))),
597 };
598
599 let mut path = last_backup.backup_dir.relative_path();
600 path.push(&archive_name);
601
602 let index = match env.datastore.open_fixed_reader(path) {
603 Ok(index) => index,
604 Err(_) => {
605 env.log(format!("there is no last backup for archive '{}'", archive_name));
606 return Ok(Box::new(future::ok(empty_response)));
607 }
608 };
609
610 env.log(format!("download last backup index for archive '{}'", archive_name));
611
612 let count = index.index_count();
613 let image_size = index.index_bytes();
614 for pos in 0..count {
615 let digest = index.index_digest(pos).unwrap();
616 // Note: last chunk can be smaller
617 let start = (pos*index.chunk_size) as u64;
618 let mut end = start + index.chunk_size as u64;
619 if end > image_size { end = image_size; }
620 let size = (end - start) as u32;
621 env.register_chunk(*digest, size)?;
622 }
623
624 let reader = DigestListEncoder::new(Box::new(index));
625
626 let stream = WrappedReaderStream::new(reader);
627
628 // fixme: set size, content type?
629 let response = http::Response::builder()
630 .status(200)
631 .body(Body::wrap_stream(stream))?;
632
633 Ok(Box::new(future::ok(response)))
634 }