]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup.rs
src/backup/fixed_index.rs: use correct size
[proxmox-backup.git] / src / api2 / backup.rs
1 use failure::*;
2 use lazy_static::lazy_static;
3
4 use std::sync::Arc;
5
6 use futures::*;
7 use hyper::header::{HeaderValue, UPGRADE};
8 use hyper::{Body, Response, StatusCode};
9 use hyper::http::request::Parts;
10 use chrono::{Local, TimeZone};
11
12 use serde_json::{json, Value};
13
14 use crate::tools;
15 use crate::tools::wrapped_reader_stream::*;
16 use crate::api_schema::router::*;
17 use crate::api_schema::*;
18 use crate::server::{WorkerTask, H2Service};
19 use crate::backup::*;
20 use crate::api2::types::*;
21
22 mod environment;
23 use environment::*;
24
25 mod upload_chunk;
26 use upload_chunk::*;
27
28 pub fn router() -> Router {
29 Router::new()
30 .upgrade(api_method_upgrade_backup())
31 }
32
33 pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
34 ApiAsyncMethod::new(
35 upgrade_to_backup_protocol,
36 ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."))
37 .required("store", StringSchema::new("Datastore name."))
38 .required("backup-type", StringSchema::new("Backup type.")
39 .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
40 .required("backup-id", StringSchema::new("Backup ID."))
41 .optional("debug", BooleanSchema::new("Enable verbose debug logging."))
42 )
43 }
44
45 fn upgrade_to_backup_protocol(
46 parts: Parts,
47 req_body: Body,
48 param: Value,
49 _info: &ApiAsyncMethod,
50 rpcenv: Box<dyn RpcEnvironment>,
51 ) -> Result<BoxFut, Error> {
52
53 let debug = param["debug"].as_bool().unwrap_or(false);
54
55 let store = tools::required_string_param(&param, "store")?.to_owned();
56 let datastore = DataStore::lookup_datastore(&store)?;
57
58 let backup_type = tools::required_string_param(&param, "backup-type")?;
59 let backup_id = tools::required_string_param(&param, "backup-id")?;
60 let backup_time = Local.timestamp(Local::now().timestamp(), 0);
61
62 let protocols = parts
63 .headers
64 .get("UPGRADE")
65 .ok_or_else(|| format_err!("missing Upgrade header"))?
66 .to_str()?;
67
68 if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
69 bail!("invalid protocol name");
70 }
71
72 if parts.version >= http::version::Version::HTTP_2 {
73 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
74 }
75
76 let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
77
78 let username = rpcenv.get_user().unwrap();
79 let env_type = rpcenv.env_type();
80
81 let backup_group = BackupGroup::new(backup_type, backup_id);
82 let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
83 let backup_dir = BackupDir::new_with_group(backup_group, backup_time.timestamp());
84
85 let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
86 if !is_new { bail!("backup directorty already exists."); }
87
88 WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
89 let mut env = BackupEnvironment::new(
90 env_type, username.clone(), worker.clone(), datastore, backup_dir);
91
92 env.debug = debug;
93 env.last_backup = last_backup;
94
95 env.log(format!("starting new backup on datastore '{}': {:?}", store, path));
96
97 let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_ROUTER, debug);
98
99 let abort_future = worker.abort_future();
100
101 let env2 = env.clone();
102 let env3 = env.clone();
103
104 req_body
105 .on_upgrade()
106 .map_err(Error::from)
107 .and_then(move |conn| {
108 env3.debug("protocol upgrade done");
109
110 let mut http = hyper::server::conn::Http::new();
111 http.http2_only(true);
112 // increase window size: todo - find optiomal size
113 let window_size = 32*1024*1024; // max = (1 << 31) - 2
114 http.http2_initial_stream_window_size(window_size);
115 http.http2_initial_connection_window_size(window_size);
116
117 http.serve_connection(conn, service)
118 .map_err(Error::from)
119 })
120 .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
121 .map_err(|(err, _)| err)
122 .and_then(move |(_result, _)| {
123 env.ensure_finished()?;
124 env.log("backup finished sucessfully");
125 Ok(())
126 })
127 .then(move |result| {
128 if let Err(err) = result {
129 match env2.ensure_finished() {
130 Ok(()) => {}, // ignore error after finish
131 _ => {
132 env2.log(format!("backup failed: {}", err));
133 env2.log("removing failed backup");
134 env2.remove_backup()?;
135 return Err(err);
136 }
137 }
138 }
139 Ok(())
140 })
141 })?;
142
143 let response = Response::builder()
144 .status(StatusCode::SWITCHING_PROTOCOLS)
145 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
146 .body(Body::empty())?;
147
148 Ok(Box::new(futures::future::ok(response)))
149 }
150
// Router for the per-connection (upgraded HTTP/2) backup API, shared by
// all backup worker tasks; built lazily on first use.
lazy_static!{
    static ref BACKUP_ROUTER: Router = backup_api();
}
154
155 pub fn backup_api() -> Router {
156
157 let router = Router::new()
158 .subdir(
159 "blob", Router::new()
160 .upload(api_method_upload_blob())
161 )
162 .subdir(
163 "dynamic_chunk", Router::new()
164 .upload(api_method_upload_dynamic_chunk())
165 )
166 .subdir(
167 "dynamic_index", Router::new()
168 .download(api_method_dynamic_chunk_index())
169 .post(api_method_create_dynamic_index())
170 .put(api_method_dynamic_append())
171 )
172 .subdir(
173 "dynamic_close", Router::new()
174 .post(api_method_close_dynamic_index())
175 )
176 .subdir(
177 "fixed_chunk", Router::new()
178 .upload(api_method_upload_fixed_chunk())
179 )
180 .subdir(
181 "fixed_index", Router::new()
182 .download(api_method_fixed_chunk_index())
183 .post(api_method_create_fixed_index())
184 .put(api_method_fixed_append())
185 )
186 .subdir(
187 "fixed_close", Router::new()
188 .post(api_method_close_fixed_index())
189 )
190 .subdir(
191 "finish", Router::new()
192 .post(
193 ApiMethod::new(
194 finish_backup,
195 ObjectSchema::new("Mark backup as finished.")
196 )
197 )
198 )
199 .subdir(
200 "speedtest", Router::new()
201 .upload(api_method_upload_speedtest())
202 )
203 .list_subdirs();
204
205 router
206 }
207
208 pub fn api_method_create_dynamic_index() -> ApiMethod {
209 ApiMethod::new(
210 create_dynamic_index,
211 ObjectSchema::new("Create dynamic chunk index file.")
212 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
213 )
214 }
215
216 fn create_dynamic_index(
217 param: Value,
218 _info: &ApiMethod,
219 rpcenv: &mut dyn RpcEnvironment,
220 ) -> Result<Value, Error> {
221
222 let env: &BackupEnvironment = rpcenv.as_ref();
223
224 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
225
226 let mut archive_name = name.clone();
227 if !archive_name.ends_with(".pxar") {
228 bail!("wrong archive extension: '{}'", archive_name);
229 } else {
230 archive_name.push_str(".didx");
231 }
232
233 let mut path = env.backup_dir.relative_path();
234 path.push(archive_name);
235
236 let index = env.datastore.create_dynamic_writer(&path)?;
237 let wid = env.register_dynamic_writer(index, name)?;
238
239 env.log(format!("created new dynamic index {} ({:?})", wid, path));
240
241 Ok(json!(wid))
242 }
243
244 pub fn api_method_create_fixed_index() -> ApiMethod {
245 ApiMethod::new(
246 create_fixed_index,
247 ObjectSchema::new("Create fixed chunk index file.")
248 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
249 .required("size", IntegerSchema::new("File size.")
250 .minimum(1)
251 )
252 )
253 }
254
255 fn create_fixed_index(
256 param: Value,
257 _info: &ApiMethod,
258 rpcenv: &mut dyn RpcEnvironment,
259 ) -> Result<Value, Error> {
260
261 let env: &BackupEnvironment = rpcenv.as_ref();
262
263 println!("PARAM: {:?}", param);
264
265 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
266 let size = tools::required_integer_param(&param, "size")? as usize;
267
268 let mut archive_name = name.clone();
269 if !archive_name.ends_with(".img") {
270 bail!("wrong archive extension: '{}'", archive_name);
271 } else {
272 archive_name.push_str(".fidx");
273 }
274
275 let mut path = env.backup_dir.relative_path();
276 path.push(archive_name);
277
278 let chunk_size = 4096*1024; // todo: ??
279
280 let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
281 let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
282
283 env.log(format!("created new fixed index {} ({:?})", wid, path));
284
285 Ok(json!(wid))
286 }
287
288 pub fn api_method_dynamic_append() -> ApiMethod {
289 ApiMethod::new(
290 dynamic_append,
291 ObjectSchema::new("Append chunk to dynamic index writer.")
292 .required("wid", IntegerSchema::new("Dynamic writer ID.")
293 .minimum(1)
294 .maximum(256)
295 )
296 .required("digest-list", ArraySchema::new(
297 "Chunk digest list.", CHUNK_DIGEST_SCHEMA.clone())
298 )
299 .required("offset-list", ArraySchema::new(
300 "Chunk offset list.",
301 IntegerSchema::new("Corresponding chunk offsets.")
302 .minimum(0)
303 .into())
304 )
305 )
306 }
307
308 fn dynamic_append (
309 param: Value,
310 _info: &ApiMethod,
311 rpcenv: &mut dyn RpcEnvironment,
312 ) -> Result<Value, Error> {
313
314 let wid = tools::required_integer_param(&param, "wid")? as usize;
315 let digest_list = tools::required_array_param(&param, "digest-list")?;
316 let offset_list = tools::required_array_param(&param, "offset-list")?;
317
318 if offset_list.len() != digest_list.len() {
319 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
320 }
321
322 let env: &BackupEnvironment = rpcenv.as_ref();
323
324 env.debug(format!("dynamic_append {} chunks", digest_list.len()));
325
326 for (i, item) in digest_list.iter().enumerate() {
327 let digest_str = item.as_str().unwrap();
328 let digest = proxmox::tools::hex_to_digest(digest_str)?;
329 let offset = offset_list[i].as_u64().unwrap();
330 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
331
332 env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
333
334 env.debug(format!("sucessfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
335 }
336
337 Ok(Value::Null)
338 }
339
340 pub fn api_method_fixed_append() -> ApiMethod {
341 ApiMethod::new(
342 fixed_append,
343 ObjectSchema::new("Append chunk to fixed index writer.")
344 .required("wid", IntegerSchema::new("Fixed writer ID.")
345 .minimum(1)
346 .maximum(256)
347 )
348 .required("digest-list", ArraySchema::new(
349 "Chunk digest list.", CHUNK_DIGEST_SCHEMA.clone())
350 )
351 .required("offset-list", ArraySchema::new(
352 "Chunk offset list.",
353 IntegerSchema::new("Corresponding chunk offsets.")
354 .minimum(0)
355 .into())
356 )
357 )
358 }
359
360 fn fixed_append (
361 param: Value,
362 _info: &ApiMethod,
363 rpcenv: &mut dyn RpcEnvironment,
364 ) -> Result<Value, Error> {
365
366 let wid = tools::required_integer_param(&param, "wid")? as usize;
367 let digest_list = tools::required_array_param(&param, "digest-list")?;
368 let offset_list = tools::required_array_param(&param, "offset-list")?;
369
370 if offset_list.len() != digest_list.len() {
371 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
372 }
373
374 let env: &BackupEnvironment = rpcenv.as_ref();
375
376 env.debug(format!("fixed_append {} chunks", digest_list.len()));
377
378 for (i, item) in digest_list.iter().enumerate() {
379 let digest_str = item.as_str().unwrap();
380 let digest = proxmox::tools::hex_to_digest(digest_str)?;
381 let offset = offset_list[i].as_u64().unwrap();
382 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
383
384 env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
385
386 env.debug(format!("sucessfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
387 }
388
389 Ok(Value::Null)
390 }
391
392 pub fn api_method_close_dynamic_index() -> ApiMethod {
393 ApiMethod::new(
394 close_dynamic_index,
395 ObjectSchema::new("Close dynamic index writer.")
396 .required("wid", IntegerSchema::new("Dynamic writer ID.")
397 .minimum(1)
398 .maximum(256)
399 )
400 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
401 .minimum(1)
402 )
403 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
404 .minimum(1)
405 )
406 )
407 }
408
409 fn close_dynamic_index (
410 param: Value,
411 _info: &ApiMethod,
412 rpcenv: &mut dyn RpcEnvironment,
413 ) -> Result<Value, Error> {
414
415 let wid = tools::required_integer_param(&param, "wid")? as usize;
416 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
417 let size = tools::required_integer_param(&param, "size")? as u64;
418
419 let env: &BackupEnvironment = rpcenv.as_ref();
420
421 env.dynamic_writer_close(wid, chunk_count, size)?;
422
423 env.log(format!("sucessfully closed dynamic index {}", wid));
424
425 Ok(Value::Null)
426 }
427
428 pub fn api_method_close_fixed_index() -> ApiMethod {
429 ApiMethod::new(
430 close_fixed_index,
431 ObjectSchema::new("Close fixed index writer.")
432 .required("wid", IntegerSchema::new("Fixed writer ID.")
433 .minimum(1)
434 .maximum(256)
435 )
436 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
437 .minimum(1)
438 )
439 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
440 .minimum(1)
441 )
442 )
443 }
444
445 fn close_fixed_index (
446 param: Value,
447 _info: &ApiMethod,
448 rpcenv: &mut dyn RpcEnvironment,
449 ) -> Result<Value, Error> {
450
451 let wid = tools::required_integer_param(&param, "wid")? as usize;
452 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
453 let size = tools::required_integer_param(&param, "size")? as u64;
454
455 let env: &BackupEnvironment = rpcenv.as_ref();
456
457 env.fixed_writer_close(wid, chunk_count, size)?;
458
459 env.log(format!("sucessfully closed fixed index {}", wid));
460
461 Ok(Value::Null)
462 }
463
464 fn finish_backup (
465 _param: Value,
466 _info: &ApiMethod,
467 rpcenv: &mut dyn RpcEnvironment,
468 ) -> Result<Value, Error> {
469
470 let env: &BackupEnvironment = rpcenv.as_ref();
471
472 env.finish_backup()?;
473 env.log("sucessfully finished backup");
474
475 Ok(Value::Null)
476 }
477
478 pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
479 ApiAsyncMethod::new(
480 dynamic_chunk_index,
481 ObjectSchema::new(r###"
482 Download the dynamic chunk index from the previous backup.
483 Simply returns an empty list if this is the first backup.
484 "###
485 )
486 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
487 )
488 }
489
/// Stream the chunk digest list of the previous backup's dynamic index
/// for `archive-name`, pre-registering every chunk of that index with
/// the environment so the client can reference them without re-upload.
/// Returns an empty body if there is no previous backup or the archive
/// did not exist in it.
fn dynamic_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiAsyncMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<BoxFut, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

    if !archive_name.ends_with(".pxar") {
        bail!("wrong archive extension: '{}'", archive_name);
    } else {
        // index file of a '.pxar' archive is named '<name>.pxar.didx'
        archive_name.push_str(".didx");
    }

    let empty_response = {
        Response::builder()
            .status(StatusCode::OK)
            .body(Body::empty())?
    };

    // first backup in this group -> nothing to download
    let last_backup = match &env.last_backup {
        Some(info) => info,
        None => return Ok(Box::new(future::ok(empty_response))),
    };

    let mut path = last_backup.backup_dir.relative_path();
    path.push(&archive_name);

    // archive missing from the previous backup is not an error
    let index = match env.datastore.open_dynamic_reader(path) {
        Ok(index) => index,
        Err(_) => {
            env.log(format!("there is no last backup for archive '{}'", archive_name));
            return Ok(Box::new(future::ok(empty_response)));
        }
    };

    env.log(format!("download last backup index for archive '{}'", archive_name));

    // register every chunk of the previous index as known so subsequent
    // append calls can resolve its size via lookup_chunk()
    let count = index.index_count();
    for pos in 0..count {
        let (start, end, digest) = index.chunk_info(pos)?;
        let size = (end - start) as u32;
        env.register_chunk(digest, size)?;
    }

    let reader = DigestListEncoder::new(Box::new(index));

    let stream = WrappedReaderStream::new(reader);

    // fixme: set size, content type?
    let response = http::Response::builder()
        .status(200)
        .body(Body::wrap_stream(stream))?;

    Ok(Box::new(future::ok(response)))
}
550
551 pub fn api_method_fixed_chunk_index() -> ApiAsyncMethod {
552 ApiAsyncMethod::new(
553 fixed_chunk_index,
554 ObjectSchema::new(r###"
555 Download the fixed chunk index from the previous backup.
556 Simply returns an empty list if this is the first backup.
557 "###
558 )
559 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
560 )
561 }
562
/// Stream the chunk digest list of the previous backup's fixed index
/// for `archive-name`, pre-registering every chunk of that index with
/// the environment so the client can reference them without re-upload.
/// Returns an empty body if there is no previous backup or the archive
/// did not exist in it.
fn fixed_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiAsyncMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<BoxFut, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

    if !archive_name.ends_with(".img") {
        bail!("wrong archive extension: '{}'", archive_name);
    } else {
        // index file of an '.img' archive is named '<name>.img.fidx'
        archive_name.push_str(".fidx");
    }

    let empty_response = {
        Response::builder()
            .status(StatusCode::OK)
            .body(Body::empty())?
    };

    // first backup in this group -> nothing to download
    let last_backup = match &env.last_backup {
        Some(info) => info,
        None => return Ok(Box::new(future::ok(empty_response))),
    };

    let mut path = last_backup.backup_dir.relative_path();
    path.push(&archive_name);

    // archive missing from the previous backup is not an error
    let index = match env.datastore.open_fixed_reader(path) {
        Ok(index) => index,
        Err(_) => {
            env.log(format!("there is no last backup for archive '{}'", archive_name));
            return Ok(Box::new(future::ok(empty_response)));
        }
    };

    env.log(format!("download last backup index for archive '{}'", archive_name));

    // register every chunk of the previous index as known so subsequent
    // append calls can resolve its size via lookup_chunk()
    let count = index.index_count();
    let image_size = index.index_bytes();
    for pos in 0..count {
        let digest = index.index_digest(pos).unwrap();
        // Note: last chunk can be smaller
        let start = (pos*index.chunk_size) as u64;
        let mut end = start + index.chunk_size as u64;
        if end > image_size { end = image_size; }
        let size = (end - start) as u32;
        env.register_chunk(*digest, size)?;
    }

    let reader = DigestListEncoder::new(Box::new(index));

    let stream = WrappedReaderStream::new(reader);

    // fixme: set size, content type?
    let response = http::Response::builder()
        .status(200)
        .body(Body::wrap_stream(stream))?;

    Ok(Box::new(future::ok(response)))
}