]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/admin/datastore/backup.rs
src/backup/fixed_index.rs: remove ChunkStat from struct
[proxmox-backup.git] / src / api2 / admin / datastore / backup.rs
CommitLineData
152764ec
DM
1use failure::*;
2
72375ce6 3use std::sync::Arc;
92ac375a
DM
4
5use futures::*;
152764ec 6use hyper::header::{HeaderValue, UPGRADE};
bd1507c4 7use hyper::{Body, Response, StatusCode};
152764ec 8use hyper::http::request::Parts;
0aadd40b 9use chrono::{Local, TimeZone};
152764ec 10
f9578f3c 11use serde_json::{json, Value};
152764ec 12
92ac375a 13use crate::tools;
d3611366 14use crate::tools::wrapped_reader_stream::*;
152764ec
DM
15use crate::api_schema::router::*;
16use crate::api_schema::*;
58c8d7d9 17use crate::server::WorkerTask;
21ee7912 18use crate::backup::*;
152764ec 19
d95ced64
DM
20mod environment;
21use environment::*;
22
bd1507c4
DM
23mod service;
24use service::*;
25
21ee7912
DM
26mod upload_chunk;
27use upload_chunk::*;
28
29
ca60c371 30pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
152764ec 31 ApiAsyncMethod::new(
0aadd40b 32 upgrade_to_backup_protocol,
ca60c371 33 ObjectSchema::new("Upgraded to backup protocol.")
0aadd40b
DM
34 .required("store", StringSchema::new("Datastore name."))
35 .required("backup-type", StringSchema::new("Backup type.")
cc84a830 36 .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
0aadd40b 37 .required("backup-id", StringSchema::new("Backup ID."))
152764ec
DM
38 )
39}
40
0aadd40b 41fn upgrade_to_backup_protocol(
152764ec
DM
42 parts: Parts,
43 req_body: Body,
0aadd40b 44 param: Value,
152764ec 45 _info: &ApiAsyncMethod,
b4b63e52 46 rpcenv: Box<RpcEnvironment>,
152764ec 47) -> Result<BoxFut, Error> {
0aadd40b 48
bd1507c4
DM
49 static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2";
50
bb105f9d
DM
51 let store = tools::required_string_param(&param, "store")?.to_owned();
52 let datastore = DataStore::lookup_datastore(&store)?;
21ee7912 53
0aadd40b
DM
54 let backup_type = tools::required_string_param(&param, "backup-type")?;
55 let backup_id = tools::required_string_param(&param, "backup-id")?;
f9578f3c 56 let backup_time = Local.timestamp(Local::now().timestamp(), 0);
152764ec
DM
57
58 let protocols = parts
59 .headers
60 .get("UPGRADE")
61 .ok_or_else(|| format_err!("missing Upgrade header"))?
62 .to_str()?;
63
0aadd40b 64 if protocols != PROXMOX_BACKUP_PROTOCOL_ID {
152764ec
DM
65 bail!("invalid protocol name");
66 }
67
96e95fc1
DM
68 if parts.version >= http::version::Version::HTTP_2 {
69 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
70 }
71
0aadd40b 72 let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
d9bd06ea 73
58c8d7d9
DM
74 let username = rpcenv.get_user().unwrap();
75 let env_type = rpcenv.env_type();
92ac375a 76
51a4f63f 77 let backup_group = BackupGroup::new(backup_type, backup_id);
fbb798f6 78 let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
51a4f63f 79 let backup_dir = BackupDir::new_with_group(backup_group, backup_time.timestamp());
f9578f3c 80
bb105f9d 81 let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
f9578f3c
DM
82 if !is_new { bail!("backup directorty already exists."); }
83
0aadd40b 84 WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
bb105f9d 85 let mut env = BackupEnvironment::new(
6b95c7df 86 env_type, username.clone(), worker.clone(), datastore, backup_dir);
b02a52e3 87
bb105f9d 88 env.last_backup = last_backup;
b02a52e3 89
bb105f9d
DM
90 env.log(format!("starting new backup on datastore '{}': {:?}", store, path));
91
372724af 92 let service = BackupService::new(env.clone(), worker.clone());
72375ce6 93
a66ab8ae
DM
94 let abort_future = worker.abort_future();
95
372724af 96 let env2 = env.clone();
bb105f9d 97
152764ec
DM
98 req_body
99 .on_upgrade()
92ac375a 100 .map_err(Error::from)
152764ec 101 .and_then(move |conn| {
d9bd06ea 102 worker.log("upgrade done");
92ac375a
DM
103
104 let mut http = hyper::server::conn::Http::new();
105 http.http2_only(true);
adec8ea2 106 // increase window size: todo - find optiomal size
771953f9
DM
107 let window_size = 32*1024*1024; // max = (1 << 31) - 2
108 http.http2_initial_stream_window_size(window_size);
109 http.http2_initial_connection_window_size(window_size);
92ac375a 110
d9bd06ea
DM
111 http.serve_connection(conn, service)
112 .map_err(Error::from)
0aadd40b 113 })
a66ab8ae 114 .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
372724af 115 .map_err(|(err, _)| err)
bb105f9d 116 .and_then(move |(_result, _)| {
372724af
DM
117 env.ensure_finished()?;
118 env.log("backup finished sucessfully");
bb105f9d
DM
119 Ok(())
120 })
372724af
DM
121 .then(move |result| {
122 if let Err(err) = result {
a9584932 123 match env2.ensure_finished() {
e3d525fe 124 Ok(()) => {}, // ignore error after finish
a9584932
DM
125 _ => {
126 env2.log(format!("backup failed: {}", err));
127 env2.log("removing failed backup");
128 env2.remove_backup()?;
129 return Err(err);
130 }
131 }
372724af
DM
132 }
133 Ok(())
bb105f9d 134 })
090ac9f7
DM
135 })?;
136
137 let response = Response::builder()
138 .status(StatusCode::SWITCHING_PROTOCOLS)
0aadd40b 139 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID))
090ac9f7
DM
140 .body(Body::empty())?;
141
142 Ok(Box::new(futures::future::ok(response)))
152764ec 143}
92ac375a
DM
144
145fn backup_api() -> Router {
146
92ac375a 147 let router = Router::new()
f9578f3c 148 .subdir(
a1e7cff3
DM
149 "upload_chunk", Router::new()
150 .upload(api_method_upload_chunk())
f9578f3c
DM
151 )
152 .subdir(
153 "dynamic_index", Router::new()
d3611366 154 .download(api_method_dynamic_chunk_index())
f9578f3c 155 .post(api_method_create_dynamic_index())
82ab7230 156 .put(api_method_dynamic_append())
f9578f3c 157 )
a2077252
DM
158 .subdir(
159 "dynamic_close", Router::new()
160 .post(api_method_close_dynamic_index())
161 )
a42fa400
DM
162 .subdir(
163 "fixed_index", Router::new()
164 .download(api_method_fixed_chunk_index())
165 .post(api_method_create_fixed_index())
166 .put(api_method_fixed_append())
167 )
168 .subdir(
169 "fixed_close", Router::new()
170 .post(api_method_close_fixed_index())
171 )
372724af
DM
172 .subdir(
173 "finish", Router::new()
a55fcd74 174 .post(
372724af
DM
175 ApiMethod::new(
176 finish_backup,
177 ObjectSchema::new("Mark backup as finished.")
178 )
179 )
180 )
adec8ea2
DM
181 .subdir(
182 "speedtest", Router::new()
183 .upload(api_method_upload_speedtest())
184 )
92ac375a
DM
185 .list_subdirs();
186
187 router
188}
189
f9578f3c
DM
190pub fn api_method_create_dynamic_index() -> ApiMethod {
191 ApiMethod::new(
192 create_dynamic_index,
193 ObjectSchema::new("Create dynamic chunk index file.")
4e93f8c1 194 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
f9578f3c
DM
195 )
196}
197
198fn create_dynamic_index(
199 param: Value,
200 _info: &ApiMethod,
201 rpcenv: &mut RpcEnvironment,
202) -> Result<Value, Error> {
203
204 let env: &BackupEnvironment = rpcenv.as_ref();
f9578f3c 205
8bea85b4 206 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
f9578f3c 207
8bea85b4 208 let mut archive_name = name.clone();
f9578f3c 209 if !archive_name.ends_with(".pxar") {
a42fa400 210 bail!("wrong archive extension: '{}'", archive_name);
f9578f3c
DM
211 } else {
212 archive_name.push_str(".didx");
213 }
214
6b95c7df 215 let mut path = env.backup_dir.relative_path();
f9578f3c
DM
216 path.push(archive_name);
217
218 let chunk_size = 4096*1024; // todo: ??
219
bb105f9d 220 let index = env.datastore.create_dynamic_writer(&path, chunk_size)?;
8bea85b4 221 let wid = env.register_dynamic_writer(index, name)?;
f9578f3c 222
bb105f9d 223 env.log(format!("created new dynamic index {} ({:?})", wid, path));
f9578f3c 224
bb105f9d 225 Ok(json!(wid))
f9578f3c
DM
226}
227
a42fa400
DM
228pub fn api_method_create_fixed_index() -> ApiMethod {
229 ApiMethod::new(
230 create_fixed_index,
231 ObjectSchema::new("Create fixed chunk index file.")
232 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
233 .required("size", IntegerSchema::new("File size.")
234 .minimum(1)
235 )
236 )
237}
238
239fn create_fixed_index(
240 param: Value,
241 _info: &ApiMethod,
242 rpcenv: &mut RpcEnvironment,
243) -> Result<Value, Error> {
244
245 let env: &BackupEnvironment = rpcenv.as_ref();
246
247 println!("PARAM: {:?}", param);
248
249 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
250 let size = tools::required_integer_param(&param, "size")? as usize;
251
252 let mut archive_name = name.clone();
253 if !archive_name.ends_with(".img") {
254 bail!("wrong archive extension: '{}'", archive_name);
255 } else {
256 archive_name.push_str(".fidx");
257 }
258
259 let mut path = env.backup_dir.relative_path();
260 path.push(archive_name);
261
262 let chunk_size = 4096*1024; // todo: ??
263
264 let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
265 let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
266
267 env.log(format!("created new fixed index {} ({:?})", wid, path));
268
269 Ok(json!(wid))
270}
271
82ab7230
DM
272pub fn api_method_dynamic_append() -> ApiMethod {
273 ApiMethod::new(
274 dynamic_append,
275 ObjectSchema::new("Append chunk to dynamic index writer.")
417cb073
DM
276 .required("wid", IntegerSchema::new("Dynamic writer ID.")
277 .minimum(1)
278 .maximum(256)
279 )
aa1b2e04
DM
280 .required("digest-list", ArraySchema::new(
281 "Chunk digest list.",
282 StringSchema::new("Chunk digest.").into())
283 )
417cb073
DM
284 .required("offset-list", ArraySchema::new(
285 "Chunk offset list.",
286 IntegerSchema::new("Corresponding chunk offsets.")
287 .minimum(0)
288 .into())
82ab7230
DM
289 )
290 )
291}
292
293fn dynamic_append (
294 param: Value,
295 _info: &ApiMethod,
296 rpcenv: &mut RpcEnvironment,
297) -> Result<Value, Error> {
298
299 let wid = tools::required_integer_param(&param, "wid")? as usize;
aa1b2e04 300 let digest_list = tools::required_array_param(&param, "digest-list")?;
417cb073 301 let offset_list = tools::required_array_param(&param, "offset-list")?;
aa1b2e04
DM
302
303 println!("DIGEST LIST LEN {}", digest_list.len());
82ab7230 304
417cb073
DM
305 if offset_list.len() != digest_list.len() {
306 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
307 }
308
82ab7230
DM
309 let env: &BackupEnvironment = rpcenv.as_ref();
310
417cb073 311 for (i, item) in digest_list.iter().enumerate() {
aa1b2e04
DM
312 let digest_str = item.as_str().unwrap();
313 let digest = crate::tools::hex_to_digest(digest_str)?;
417cb073 314 let offset = offset_list[i].as_u64().unwrap();
aa1b2e04 315 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
417cb073 316 env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
82ab7230 317
aa1b2e04
DM
318 env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid));
319 }
82ab7230
DM
320
321 Ok(Value::Null)
322}
323
a42fa400
DM
324pub fn api_method_fixed_append() -> ApiMethod {
325 ApiMethod::new(
326 fixed_append,
327 ObjectSchema::new("Append chunk to fixed index writer.")
328 .required("wid", IntegerSchema::new("Fixed writer ID.")
329 .minimum(1)
330 .maximum(256)
331 )
332 .required("digest-list", ArraySchema::new(
333 "Chunk digest list.",
334 StringSchema::new("Chunk digest.").into())
335 )
336 .required("offset-list", ArraySchema::new(
337 "Chunk offset list.",
338 IntegerSchema::new("Corresponding chunk offsets.")
339 .minimum(0)
340 .into())
341 )
342 )
343}
344
345fn fixed_append (
346 param: Value,
347 _info: &ApiMethod,
348 rpcenv: &mut RpcEnvironment,
349) -> Result<Value, Error> {
350
351 let wid = tools::required_integer_param(&param, "wid")? as usize;
352 let digest_list = tools::required_array_param(&param, "digest-list")?;
353 let offset_list = tools::required_array_param(&param, "offset-list")?;
354
355 println!("DIGEST LIST LEN {}", digest_list.len());
356
357 if offset_list.len() != digest_list.len() {
358 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
359 }
360
361 let env: &BackupEnvironment = rpcenv.as_ref();
362
363 for (i, item) in digest_list.iter().enumerate() {
364 let digest_str = item.as_str().unwrap();
365 let digest = crate::tools::hex_to_digest(digest_str)?;
366 let offset = offset_list[i].as_u64().unwrap();
367 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
368 println!("DEBUG {} {}", offset, size);
369 env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
370
371 env.log(format!("sucessfully added chunk {} to fixed index {}", digest_str, wid));
372 }
373
374 Ok(Value::Null)
375}
376
a2077252
DM
377pub fn api_method_close_dynamic_index() -> ApiMethod {
378 ApiMethod::new(
379 close_dynamic_index,
380 ObjectSchema::new("Close dynamic index writer.")
381 .required("wid", IntegerSchema::new("Dynamic writer ID.")
382 .minimum(1)
383 .maximum(256)
384 )
8bea85b4
DM
385 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
386 .minimum(1)
387 )
388 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
389 .minimum(1)
390 )
a2077252
DM
391 )
392}
393
394fn close_dynamic_index (
395 param: Value,
396 _info: &ApiMethod,
397 rpcenv: &mut RpcEnvironment,
398) -> Result<Value, Error> {
399
400 let wid = tools::required_integer_param(&param, "wid")? as usize;
8bea85b4
DM
401 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
402 let size = tools::required_integer_param(&param, "size")? as u64;
a2077252
DM
403
404 let env: &BackupEnvironment = rpcenv.as_ref();
405
8bea85b4 406 env.dynamic_writer_close(wid, chunk_count, size)?;
a2077252 407
bb105f9d
DM
408 env.log(format!("sucessfully closed dynamic index {}", wid));
409
a2077252
DM
410 Ok(Value::Null)
411}
412
a42fa400
DM
413pub fn api_method_close_fixed_index() -> ApiMethod {
414 ApiMethod::new(
415 close_fixed_index,
416 ObjectSchema::new("Close fixed index writer.")
417 .required("wid", IntegerSchema::new("Fixed writer ID.")
418 .minimum(1)
419 .maximum(256)
420 )
421 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
422 .minimum(1)
423 )
424 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
425 .minimum(1)
426 )
427 )
428}
429
430fn close_fixed_index (
431 param: Value,
432 _info: &ApiMethod,
433 rpcenv: &mut RpcEnvironment,
434) -> Result<Value, Error> {
435
436 let wid = tools::required_integer_param(&param, "wid")? as usize;
437 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
438 let size = tools::required_integer_param(&param, "size")? as u64;
439
440 let env: &BackupEnvironment = rpcenv.as_ref();
441
442 env.fixed_writer_close(wid, chunk_count, size)?;
443
444 env.log(format!("sucessfully closed fixed index {}", wid));
445
446 Ok(Value::Null)
447}
a2077252 448
372724af
DM
449fn finish_backup (
450 _param: Value,
451 _info: &ApiMethod,
452 rpcenv: &mut RpcEnvironment,
453) -> Result<Value, Error> {
454
455 let env: &BackupEnvironment = rpcenv.as_ref();
456
457 env.finish_backup()?;
60e589a1 458 env.log("sucessfully finished backup");
372724af
DM
459
460 Ok(Value::Null)
461}
a2077252 462
a42fa400
DM
463pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
464 ApiAsyncMethod::new(
465 dynamic_chunk_index,
466 ObjectSchema::new(r###"
467Download the dynamic chunk index from the previous backup.
468Simply returns an empty list if this is the first backup.
469"###
470 )
471 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
472 )
473}
474
d3611366
DM
475fn dynamic_chunk_index(
476 _parts: Parts,
477 _req_body: Body,
478 param: Value,
479 _info: &ApiAsyncMethod,
480 rpcenv: Box<RpcEnvironment>,
481) -> Result<BoxFut, Error> {
482
483 let env: &BackupEnvironment = rpcenv.as_ref();
d3611366 484
a9584932
DM
485 println!("TEST CHUNK DOWNLOAD");
486
d3611366
DM
487 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
488
489 if !archive_name.ends_with(".pxar") {
a42fa400 490 bail!("wrong archive extension: '{}'", archive_name);
d3611366
DM
491 } else {
492 archive_name.push_str(".didx");
493 }
494
a9584932
DM
495 let empty_response = {
496 Response::builder()
497 .status(StatusCode::OK)
498 .body(Body::empty())?
499 };
500
d3611366
DM
501 let last_backup = match &env.last_backup {
502 Some(info) => info,
a9584932 503 None => return Ok(Box::new(future::ok(empty_response))),
d3611366
DM
504 };
505
506 let mut path = last_backup.backup_dir.relative_path();
a9584932 507 path.push(&archive_name);
d3611366 508
a9584932
DM
509 let index = match env.datastore.open_dynamic_reader(path) {
510 Ok(index) => index,
511 Err(_) => {
512 env.log(format!("there is no last backup for archive '{}'", archive_name));
513 return Ok(Box::new(future::ok(empty_response)));
514 }
515 };
516
517 let count = index.index_count();
518 for pos in 0..count {
519 let (start, end, digest) = index.chunk_info(pos)?;
520 let size = (end - start) as u32;
521 env.register_chunk(digest, size)?;
522 }
d3611366 523
7f3d2ffa 524 let reader = DigestListEncoder::new(Box::new(index));
d3611366
DM
525
526 let stream = WrappedReaderStream::new(reader);
527
528 // fixme: set size, content type?
529 let response = http::Response::builder()
530 .status(200)
531 .body(Body::wrap_stream(stream))?;
532
533 Ok(Box::new(future::ok(response)))
534}
a42fa400
DM
535
536pub fn api_method_fixed_chunk_index() -> ApiAsyncMethod {
537 ApiAsyncMethod::new(
538 fixed_chunk_index,
539 ObjectSchema::new(r###"
540Download the fixed chunk index from the previous backup.
541Simply returns an empty list if this is the first backup.
542"###
543 )
544 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
545 )
546}
547
548fn fixed_chunk_index(
549 _parts: Parts,
550 _req_body: Body,
551 param: Value,
552 _info: &ApiAsyncMethod,
553 rpcenv: Box<RpcEnvironment>,
554) -> Result<BoxFut, Error> {
555
556 let env: &BackupEnvironment = rpcenv.as_ref();
557
558 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
559
560 if !archive_name.ends_with(".img") {
561 bail!("wrong archive extension: '{}'", archive_name);
562 } else {
563 archive_name.push_str(".fidx");
564 }
565
566 let empty_response = {
567 Response::builder()
568 .status(StatusCode::OK)
569 .body(Body::empty())?
570 };
571
572 let last_backup = match &env.last_backup {
573 Some(info) => info,
574 None => return Ok(Box::new(future::ok(empty_response))),
575 };
576
577 let mut path = last_backup.backup_dir.relative_path();
578 path.push(&archive_name);
579
580 let index = match env.datastore.open_fixed_reader(path) {
581 Ok(index) => index,
582 Err(_) => {
583 env.log(format!("there is no last backup for archive '{}'", archive_name));
584 return Ok(Box::new(future::ok(empty_response)));
585 }
586 };
587
588 let count = index.index_count();
589 for pos in 0..count {
590 let digest = index.index_digest(pos).unwrap();
591 let size = index.chunk_size as u32;
592 env.register_chunk(*digest, size)?;
593 }
594
595 let reader = DigestListEncoder::new(Box::new(index));
596
597 let stream = WrappedReaderStream::new(reader);
598
599 // fixme: set size, content type?
600 let response = http::Response::builder()
601 .status(200)
602 .body(Body::wrap_stream(stream))?;
603
604 Ok(Box::new(future::ok(response)))
605}