]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup.rs
src/api2.rs: move backup api to /backup
[proxmox-backup.git] / src / api2 / backup.rs
1 use failure::*;
2
3 use std::sync::Arc;
4
5 use futures::*;
6 use hyper::header::{HeaderValue, UPGRADE};
7 use hyper::{Body, Response, StatusCode};
8 use hyper::http::request::Parts;
9 use chrono::{Local, TimeZone};
10
11 use serde_json::{json, Value};
12
13 use crate::tools;
14 use crate::tools::wrapped_reader_stream::*;
15 use crate::api_schema::router::*;
16 use crate::api_schema::*;
17 use crate::server::WorkerTask;
18 use crate::backup::*;
19
20 mod environment;
21 use environment::*;
22
23 mod service;
24 use service::*;
25
26 mod upload_chunk;
27 use upload_chunk::*;
28
29 pub fn router() -> Router {
30 Router::new()
31 .upgrade(api_method_upgrade_backup())
32 }
33
34 pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
35 ApiAsyncMethod::new(
36 upgrade_to_backup_protocol,
37 ObjectSchema::new("Upgraded to backup protocol.")
38 .required("store", StringSchema::new("Datastore name."))
39 .required("backup-type", StringSchema::new("Backup type.")
40 .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
41 .required("backup-id", StringSchema::new("Backup ID."))
42 .optional("debug", BooleanSchema::new("Enable verbose debug logging."))
43 )
44 }
45
46 fn upgrade_to_backup_protocol(
47 parts: Parts,
48 req_body: Body,
49 param: Value,
50 _info: &ApiAsyncMethod,
51 rpcenv: Box<RpcEnvironment>,
52 ) -> Result<BoxFut, Error> {
53
54 static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2";
55
56 let debug = param["debug"].as_bool().unwrap_or(false);
57
58 let store = tools::required_string_param(&param, "store")?.to_owned();
59 let datastore = DataStore::lookup_datastore(&store)?;
60
61 let backup_type = tools::required_string_param(&param, "backup-type")?;
62 let backup_id = tools::required_string_param(&param, "backup-id")?;
63 let backup_time = Local.timestamp(Local::now().timestamp(), 0);
64
65 let protocols = parts
66 .headers
67 .get("UPGRADE")
68 .ok_or_else(|| format_err!("missing Upgrade header"))?
69 .to_str()?;
70
71 if protocols != PROXMOX_BACKUP_PROTOCOL_ID {
72 bail!("invalid protocol name");
73 }
74
75 if parts.version >= http::version::Version::HTTP_2 {
76 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
77 }
78
79 let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
80
81 let username = rpcenv.get_user().unwrap();
82 let env_type = rpcenv.env_type();
83
84 let backup_group = BackupGroup::new(backup_type, backup_id);
85 let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
86 let backup_dir = BackupDir::new_with_group(backup_group, backup_time.timestamp());
87
88 let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
89 if !is_new { bail!("backup directorty already exists."); }
90
91 WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
92 let mut env = BackupEnvironment::new(
93 env_type, username.clone(), worker.clone(), datastore, backup_dir);
94
95 env.debug = debug;
96 env.last_backup = last_backup;
97
98 env.log(format!("starting new backup on datastore '{}': {:?}", store, path));
99
100 let service = BackupService::new(env.clone(), worker.clone(), debug);
101
102 let abort_future = worker.abort_future();
103
104 let env2 = env.clone();
105 let env3 = env.clone();
106
107 req_body
108 .on_upgrade()
109 .map_err(Error::from)
110 .and_then(move |conn| {
111 env3.debug("protocol upgrade done");
112
113 let mut http = hyper::server::conn::Http::new();
114 http.http2_only(true);
115 // increase window size: todo - find optiomal size
116 let window_size = 32*1024*1024; // max = (1 << 31) - 2
117 http.http2_initial_stream_window_size(window_size);
118 http.http2_initial_connection_window_size(window_size);
119
120 http.serve_connection(conn, service)
121 .map_err(Error::from)
122 })
123 .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
124 .map_err(|(err, _)| err)
125 .and_then(move |(_result, _)| {
126 env.ensure_finished()?;
127 env.log("backup finished sucessfully");
128 Ok(())
129 })
130 .then(move |result| {
131 if let Err(err) = result {
132 match env2.ensure_finished() {
133 Ok(()) => {}, // ignore error after finish
134 _ => {
135 env2.log(format!("backup failed: {}", err));
136 env2.log("removing failed backup");
137 env2.remove_backup()?;
138 return Err(err);
139 }
140 }
141 }
142 Ok(())
143 })
144 })?;
145
146 let response = Response::builder()
147 .status(StatusCode::SWITCHING_PROTOCOLS)
148 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID))
149 .body(Body::empty())?;
150
151 Ok(Box::new(futures::future::ok(response)))
152 }
153
154 pub fn backup_api() -> Router {
155
156 let router = Router::new()
157 .subdir(
158 "config", Router::new()
159 .upload(api_method_upload_config())
160 )
161 .subdir(
162 "dynamic_chunk", Router::new()
163 .upload(api_method_upload_dynamic_chunk())
164 )
165 .subdir(
166 "dynamic_index", Router::new()
167 .download(api_method_dynamic_chunk_index())
168 .post(api_method_create_dynamic_index())
169 .put(api_method_dynamic_append())
170 )
171 .subdir(
172 "dynamic_close", Router::new()
173 .post(api_method_close_dynamic_index())
174 )
175 .subdir(
176 "fixed_chunk", Router::new()
177 .upload(api_method_upload_fixed_chunk())
178 )
179 .subdir(
180 "fixed_index", Router::new()
181 .download(api_method_fixed_chunk_index())
182 .post(api_method_create_fixed_index())
183 .put(api_method_fixed_append())
184 )
185 .subdir(
186 "fixed_close", Router::new()
187 .post(api_method_close_fixed_index())
188 )
189 .subdir(
190 "finish", Router::new()
191 .post(
192 ApiMethod::new(
193 finish_backup,
194 ObjectSchema::new("Mark backup as finished.")
195 )
196 )
197 )
198 .subdir(
199 "speedtest", Router::new()
200 .upload(api_method_upload_speedtest())
201 )
202 .list_subdirs();
203
204 router
205 }
206
207 pub fn api_method_create_dynamic_index() -> ApiMethod {
208 ApiMethod::new(
209 create_dynamic_index,
210 ObjectSchema::new("Create dynamic chunk index file.")
211 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
212 )
213 }
214
215 fn create_dynamic_index(
216 param: Value,
217 _info: &ApiMethod,
218 rpcenv: &mut RpcEnvironment,
219 ) -> Result<Value, Error> {
220
221 let env: &BackupEnvironment = rpcenv.as_ref();
222
223 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
224
225 let mut archive_name = name.clone();
226 if !archive_name.ends_with(".pxar") {
227 bail!("wrong archive extension: '{}'", archive_name);
228 } else {
229 archive_name.push_str(".didx");
230 }
231
232 let mut path = env.backup_dir.relative_path();
233 path.push(archive_name);
234
235 let index = env.datastore.create_dynamic_writer(&path)?;
236 let wid = env.register_dynamic_writer(index, name)?;
237
238 env.log(format!("created new dynamic index {} ({:?})", wid, path));
239
240 Ok(json!(wid))
241 }
242
243 pub fn api_method_create_fixed_index() -> ApiMethod {
244 ApiMethod::new(
245 create_fixed_index,
246 ObjectSchema::new("Create fixed chunk index file.")
247 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
248 .required("size", IntegerSchema::new("File size.")
249 .minimum(1)
250 )
251 )
252 }
253
254 fn create_fixed_index(
255 param: Value,
256 _info: &ApiMethod,
257 rpcenv: &mut RpcEnvironment,
258 ) -> Result<Value, Error> {
259
260 let env: &BackupEnvironment = rpcenv.as_ref();
261
262 println!("PARAM: {:?}", param);
263
264 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
265 let size = tools::required_integer_param(&param, "size")? as usize;
266
267 let mut archive_name = name.clone();
268 if !archive_name.ends_with(".img") {
269 bail!("wrong archive extension: '{}'", archive_name);
270 } else {
271 archive_name.push_str(".fidx");
272 }
273
274 let mut path = env.backup_dir.relative_path();
275 path.push(archive_name);
276
277 let chunk_size = 4096*1024; // todo: ??
278
279 let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
280 let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
281
282 env.log(format!("created new fixed index {} ({:?})", wid, path));
283
284 Ok(json!(wid))
285 }
286
287 pub fn api_method_dynamic_append() -> ApiMethod {
288 ApiMethod::new(
289 dynamic_append,
290 ObjectSchema::new("Append chunk to dynamic index writer.")
291 .required("wid", IntegerSchema::new("Dynamic writer ID.")
292 .minimum(1)
293 .maximum(256)
294 )
295 .required("digest-list", ArraySchema::new(
296 "Chunk digest list.",
297 StringSchema::new("Chunk digest.").into())
298 )
299 .required("offset-list", ArraySchema::new(
300 "Chunk offset list.",
301 IntegerSchema::new("Corresponding chunk offsets.")
302 .minimum(0)
303 .into())
304 )
305 )
306 }
307
308 fn dynamic_append (
309 param: Value,
310 _info: &ApiMethod,
311 rpcenv: &mut RpcEnvironment,
312 ) -> Result<Value, Error> {
313
314 let wid = tools::required_integer_param(&param, "wid")? as usize;
315 let digest_list = tools::required_array_param(&param, "digest-list")?;
316 let offset_list = tools::required_array_param(&param, "offset-list")?;
317
318 if offset_list.len() != digest_list.len() {
319 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
320 }
321
322 let env: &BackupEnvironment = rpcenv.as_ref();
323
324 env.debug(format!("dynamic_append {} chunks", digest_list.len()));
325
326 for (i, item) in digest_list.iter().enumerate() {
327 let digest_str = item.as_str().unwrap();
328 let digest = crate::tools::hex_to_digest(digest_str)?;
329 let offset = offset_list[i].as_u64().unwrap();
330 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
331
332 env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
333
334 env.debug(format!("sucessfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
335 }
336
337 Ok(Value::Null)
338 }
339
340 pub fn api_method_fixed_append() -> ApiMethod {
341 ApiMethod::new(
342 fixed_append,
343 ObjectSchema::new("Append chunk to fixed index writer.")
344 .required("wid", IntegerSchema::new("Fixed writer ID.")
345 .minimum(1)
346 .maximum(256)
347 )
348 .required("digest-list", ArraySchema::new(
349 "Chunk digest list.",
350 StringSchema::new("Chunk digest.").into())
351 )
352 .required("offset-list", ArraySchema::new(
353 "Chunk offset list.",
354 IntegerSchema::new("Corresponding chunk offsets.")
355 .minimum(0)
356 .into())
357 )
358 )
359 }
360
361 fn fixed_append (
362 param: Value,
363 _info: &ApiMethod,
364 rpcenv: &mut RpcEnvironment,
365 ) -> Result<Value, Error> {
366
367 let wid = tools::required_integer_param(&param, "wid")? as usize;
368 let digest_list = tools::required_array_param(&param, "digest-list")?;
369 let offset_list = tools::required_array_param(&param, "offset-list")?;
370
371 if offset_list.len() != digest_list.len() {
372 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
373 }
374
375 let env: &BackupEnvironment = rpcenv.as_ref();
376
377 env.debug(format!("fixed_append {} chunks", digest_list.len()));
378
379 for (i, item) in digest_list.iter().enumerate() {
380 let digest_str = item.as_str().unwrap();
381 let digest = crate::tools::hex_to_digest(digest_str)?;
382 let offset = offset_list[i].as_u64().unwrap();
383 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
384
385 env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
386
387 env.debug(format!("sucessfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
388 }
389
390 Ok(Value::Null)
391 }
392
393 pub fn api_method_close_dynamic_index() -> ApiMethod {
394 ApiMethod::new(
395 close_dynamic_index,
396 ObjectSchema::new("Close dynamic index writer.")
397 .required("wid", IntegerSchema::new("Dynamic writer ID.")
398 .minimum(1)
399 .maximum(256)
400 )
401 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
402 .minimum(1)
403 )
404 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
405 .minimum(1)
406 )
407 )
408 }
409
410 fn close_dynamic_index (
411 param: Value,
412 _info: &ApiMethod,
413 rpcenv: &mut RpcEnvironment,
414 ) -> Result<Value, Error> {
415
416 let wid = tools::required_integer_param(&param, "wid")? as usize;
417 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
418 let size = tools::required_integer_param(&param, "size")? as u64;
419
420 let env: &BackupEnvironment = rpcenv.as_ref();
421
422 env.dynamic_writer_close(wid, chunk_count, size)?;
423
424 env.log(format!("sucessfully closed dynamic index {}", wid));
425
426 Ok(Value::Null)
427 }
428
429 pub fn api_method_close_fixed_index() -> ApiMethod {
430 ApiMethod::new(
431 close_fixed_index,
432 ObjectSchema::new("Close fixed index writer.")
433 .required("wid", IntegerSchema::new("Fixed writer ID.")
434 .minimum(1)
435 .maximum(256)
436 )
437 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
438 .minimum(1)
439 )
440 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
441 .minimum(1)
442 )
443 )
444 }
445
446 fn close_fixed_index (
447 param: Value,
448 _info: &ApiMethod,
449 rpcenv: &mut RpcEnvironment,
450 ) -> Result<Value, Error> {
451
452 let wid = tools::required_integer_param(&param, "wid")? as usize;
453 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
454 let size = tools::required_integer_param(&param, "size")? as u64;
455
456 let env: &BackupEnvironment = rpcenv.as_ref();
457
458 env.fixed_writer_close(wid, chunk_count, size)?;
459
460 env.log(format!("sucessfully closed fixed index {}", wid));
461
462 Ok(Value::Null)
463 }
464
465 fn finish_backup (
466 _param: Value,
467 _info: &ApiMethod,
468 rpcenv: &mut RpcEnvironment,
469 ) -> Result<Value, Error> {
470
471 let env: &BackupEnvironment = rpcenv.as_ref();
472
473 env.finish_backup()?;
474 env.log("sucessfully finished backup");
475
476 Ok(Value::Null)
477 }
478
479 pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
480 ApiAsyncMethod::new(
481 dynamic_chunk_index,
482 ObjectSchema::new(r###"
483 Download the dynamic chunk index from the previous backup.
484 Simply returns an empty list if this is the first backup.
485 "###
486 )
487 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
488 )
489 }
490
491 fn dynamic_chunk_index(
492 _parts: Parts,
493 _req_body: Body,
494 param: Value,
495 _info: &ApiAsyncMethod,
496 rpcenv: Box<RpcEnvironment>,
497 ) -> Result<BoxFut, Error> {
498
499 let env: &BackupEnvironment = rpcenv.as_ref();
500
501 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
502
503 if !archive_name.ends_with(".pxar") {
504 bail!("wrong archive extension: '{}'", archive_name);
505 } else {
506 archive_name.push_str(".didx");
507 }
508
509 let empty_response = {
510 Response::builder()
511 .status(StatusCode::OK)
512 .body(Body::empty())?
513 };
514
515 let last_backup = match &env.last_backup {
516 Some(info) => info,
517 None => return Ok(Box::new(future::ok(empty_response))),
518 };
519
520 let mut path = last_backup.backup_dir.relative_path();
521 path.push(&archive_name);
522
523 let index = match env.datastore.open_dynamic_reader(path) {
524 Ok(index) => index,
525 Err(_) => {
526 env.log(format!("there is no last backup for archive '{}'", archive_name));
527 return Ok(Box::new(future::ok(empty_response)));
528 }
529 };
530
531 env.log(format!("download last backup index for archive '{}'", archive_name));
532
533 let count = index.index_count();
534 for pos in 0..count {
535 let (start, end, digest) = index.chunk_info(pos)?;
536 let size = (end - start) as u32;
537 env.register_chunk(digest, size)?;
538 }
539
540 let reader = DigestListEncoder::new(Box::new(index));
541
542 let stream = WrappedReaderStream::new(reader);
543
544 // fixme: set size, content type?
545 let response = http::Response::builder()
546 .status(200)
547 .body(Body::wrap_stream(stream))?;
548
549 Ok(Box::new(future::ok(response)))
550 }
551
552 pub fn api_method_fixed_chunk_index() -> ApiAsyncMethod {
553 ApiAsyncMethod::new(
554 fixed_chunk_index,
555 ObjectSchema::new(r###"
556 Download the fixed chunk index from the previous backup.
557 Simply returns an empty list if this is the first backup.
558 "###
559 )
560 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
561 )
562 }
563
564 fn fixed_chunk_index(
565 _parts: Parts,
566 _req_body: Body,
567 param: Value,
568 _info: &ApiAsyncMethod,
569 rpcenv: Box<RpcEnvironment>,
570 ) -> Result<BoxFut, Error> {
571
572 let env: &BackupEnvironment = rpcenv.as_ref();
573
574 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
575
576 if !archive_name.ends_with(".img") {
577 bail!("wrong archive extension: '{}'", archive_name);
578 } else {
579 archive_name.push_str(".fidx");
580 }
581
582 let empty_response = {
583 Response::builder()
584 .status(StatusCode::OK)
585 .body(Body::empty())?
586 };
587
588 let last_backup = match &env.last_backup {
589 Some(info) => info,
590 None => return Ok(Box::new(future::ok(empty_response))),
591 };
592
593 let mut path = last_backup.backup_dir.relative_path();
594 path.push(&archive_name);
595
596 let index = match env.datastore.open_fixed_reader(path) {
597 Ok(index) => index,
598 Err(_) => {
599 env.log(format!("there is no last backup for archive '{}'", archive_name));
600 return Ok(Box::new(future::ok(empty_response)));
601 }
602 };
603
604 env.log(format!("download last backup index for archive '{}'", archive_name));
605
606 let count = index.index_count();
607 for pos in 0..count {
608 let digest = index.index_digest(pos).unwrap();
609 let size = index.chunk_size as u32;
610 env.register_chunk(*digest, size)?;
611 }
612
613 let reader = DigestListEncoder::new(Box::new(index));
614
615 let stream = WrappedReaderStream::new(reader);
616
617 // fixme: set size, content type?
618 let response = http::Response::builder()
619 .status(200)
620 .body(Body::wrap_stream(stream))?;
621
622 Ok(Box::new(future::ok(response)))
623 }