// src/api2/admin/datastore.rs
use failure::*;
use futures::*;

use crate::tools;
use crate::api_schema::*;
use crate::api_schema::router::*;
//use crate::server::rest::*;
use serde_json::{json, Value};
use std::collections::{HashSet, HashMap};
use chrono::{DateTime, Datelike, TimeZone, Local};
use std::path::PathBuf;
use std::sync::Arc;

use crate::config::datastore;

use crate::backup::*;
use crate::server::WorkerTask;

use hyper::{header, Body, Response, StatusCode};
use hyper::http::request::Parts;

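/// Group a flat backup list by its group path, so that retention
/// rules can be applied to each backup group separately.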
fn group_backups(backup_list: Vec<BackupInfo>) -> HashMap<String, Vec<BackupInfo>> {

    let mut group_hash = HashMap::new();

    for info in backup_list {
        let group_id = info.backup_dir.group().group_path().to_str().unwrap().to_owned();
        let time_list = group_hash.entry(group_id).or_insert(vec![]);
        time_list.push(info);
    }

    group_hash
}

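/// Mark up to `keep` backups for retention. Each backup is mapped to a
/// selection ID by `select_id` (e.g. "year/month/day" for daily pruning);
/// only the first backup seen for each distinct ID is kept.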
fn mark_selections<F: Fn(DateTime<Local>, &BackupInfo) -> String>(
    mark: &mut HashSet<PathBuf>,
    list: &[BackupInfo],
    keep: usize,
    select_id: F,
) {
    let mut hash = HashSet::new();
    for info in list {
        let local_time = info.backup_dir.backup_time().with_timezone(&Local);
        if hash.len() >= keep { break; }
        let backup_id = info.backup_dir.relative_path();
        let sel_id: String = select_id(local_time, info);
        if !hash.contains(&sel_id) {
            hash.insert(sel_id);
            //println!(" KEEP ID {} {}", backup_id, local_time.format("%c"));
            mark.insert(backup_id);
        }
    }
}

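/// List all backup groups in a datastore, together with the most
/// recent snapshot and the backup count of each group.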
fn list_groups(
    param: Value,
    _info: &ApiMethod,
    _rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let store = param["store"].as_str().unwrap();

    let datastore = DataStore::lookup_datastore(store)?;

    let backup_list = BackupInfo::list_backups(&datastore.base_path())?;

    let group_hash = group_backups(backup_list);

    let mut groups = vec![];

    for (_group_id, mut list) in group_hash {

        BackupInfo::sort_list(&mut list, false);

        let info = &list[0];
        let group = info.backup_dir.group();

        groups.push(json!({
            "backup-type": group.backup_type(),
            "backup-id": group.backup_id(),
            "last-backup": info.backup_dir.backup_time().timestamp(),
            "backup-count": list.len() as u64,
            "files": info.files,
        }));
    }

    Ok(json!(groups))
}

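/// List the files contained in a single backup snapshot.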
fn list_snapshot_files(
    param: Value,
    _info: &ApiMethod,
    _rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let store = tools::required_string_param(&param, "store")?;
    let backup_type = tools::required_string_param(&param, "backup-type")?;
    let backup_id = tools::required_string_param(&param, "backup-id")?;
    let backup_time = tools::required_integer_param(&param, "backup-time")?;

    let snapshot = BackupDir::new(backup_type, backup_id, backup_time);

    let datastore = DataStore::lookup_datastore(store)?;

    let path = datastore.base_path();
    let files = BackupInfo::list_files(&path, &snapshot)?;

    Ok(json!(files))
}

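/// Remove a single backup snapshot (and all files it contains) from the datastore.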
fn delete_snapshots(
    param: Value,
    _info: &ApiMethod,
    _rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let store = tools::required_string_param(&param, "store")?;
    let backup_type = tools::required_string_param(&param, "backup-type")?;
    let backup_id = tools::required_string_param(&param, "backup-id")?;
    let backup_time = tools::required_integer_param(&param, "backup-time")?;

    let snapshot = BackupDir::new(backup_type, backup_id, backup_time);

    let datastore = DataStore::lookup_datastore(store)?;

    datastore.remove_backup_dir(&snapshot)?;

    Ok(Value::Null)
}

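/// List backup snapshots, optionally filtered by backup type and/or backup ID.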
fn list_snapshots(
    param: Value,
    _info: &ApiMethod,
    _rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let store = tools::required_string_param(&param, "store")?;
    let backup_type = param["backup-type"].as_str();
    let backup_id = param["backup-id"].as_str();

    let datastore = DataStore::lookup_datastore(store)?;

    let base_path = datastore.base_path();

    let backup_list = BackupInfo::list_backups(&base_path)?;

    let mut snapshots = vec![];

    for info in backup_list {
        let group = info.backup_dir.group();
        if let Some(backup_type) = backup_type {
            if backup_type != group.backup_type() { continue; }
        }
        if let Some(backup_id) = backup_id {
            if backup_id != group.backup_id() { continue; }
        }
        snapshots.push(json!({
            "backup-type": group.backup_type(),
            "backup-id": group.backup_id(),
            "backup-time": info.backup_dir.backup_time().timestamp(),
            "files": info.files,
        }));
    }

    Ok(json!(snapshots))
}

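/// Report file system usage (total/used/available bytes) for the
/// directory backing the datastore, via statfs64().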
fn status(
    param: Value,
    _info: &ApiMethod,
    _rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let store = param["store"].as_str().unwrap();

    let datastore = DataStore::lookup_datastore(store)?;

    let base_path = datastore.base_path();

    let mut stat: libc::statfs64 = unsafe { std::mem::zeroed() };

    use nix::NixPath;

    let res = base_path.with_nix_path(|cstr| unsafe { libc::statfs64(cstr.as_ptr(), &mut stat) })?;
    nix::errno::Errno::result(res)?;

    let bsize = stat.f_bsize as u64;
    Ok(json!({
        "total": stat.f_blocks * bsize,
        "used": (stat.f_blocks - stat.f_bfree) * bsize,
        "avail": stat.f_bavail * bsize,
    }))
}

fn api_method_status() -> ApiMethod {
    ApiMethod::new(
        status,
        // status takes no prune options, so the schema only needs "store"
        ObjectSchema::new("Get datastore status.")
            .required(
                "store",
                StringSchema::new("Datastore name.")
            )
    )
}

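/// Prune a datastore: for every backup group, mark the snapshots
/// selected by the `keep-*` options and remove the rest. If no
/// `keep-*` option is given, nothing is removed. Selection IDs are
/// calendar based, e.g. `keep-daily` keeps the newest snapshot per
/// `year/month/day` bucket.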
fn prune(
    param: Value,
    _info: &ApiMethod,
    _rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let store = param["store"].as_str().unwrap();

    let datastore = DataStore::lookup_datastore(store)?;

    let mut keep_all = true;

    for opt in &["keep-last", "keep-daily", "keep-weekly", "keep-monthly", "keep-yearly"] {
        if !param[opt].is_null() {
            keep_all = false;
            break;
        }
    }

    let worker = WorkerTask::new("prune", Some(store.to_owned()), "root@pam", true)?;
    let result = try_block! {
        if keep_all {
            worker.log("No selection - keeping all files.");
            return Ok(());
        } else {
            worker.log(format!("Starting prune on store {}", store));
        }

        let backup_list = BackupInfo::list_backups(&datastore.base_path())?;

        let group_hash = group_backups(backup_list);

        for (_group_id, mut list) in group_hash {

            let mut mark = HashSet::new();

            BackupInfo::sort_list(&mut list, false);

            if let Some(keep_last) = param["keep-last"].as_u64() {
                list.iter().take(keep_last as usize).for_each(|info| {
                    mark.insert(info.backup_dir.relative_path());
                });
            }

            if let Some(keep_daily) = param["keep-daily"].as_u64() {
                mark_selections(&mut mark, &list, keep_daily as usize, |local_time, _info| {
                    format!("{}/{}/{}", local_time.year(), local_time.month(), local_time.day())
                });
            }

            if let Some(keep_weekly) = param["keep-weekly"].as_u64() {
                mark_selections(&mut mark, &list, keep_weekly as usize, |local_time, _info| {
                    // use the ISO week-based year, so week 1 at a year
                    // boundary lands in the correct bucket
                    format!("{}/{}", local_time.iso_week().year(), local_time.iso_week().week())
                });
            }

            if let Some(keep_monthly) = param["keep-monthly"].as_u64() {
                mark_selections(&mut mark, &list, keep_monthly as usize, |local_time, _info| {
                    format!("{}/{}", local_time.year(), local_time.month())
                });
            }

            if let Some(keep_yearly) = param["keep-yearly"].as_u64() {
                mark_selections(&mut mark, &list, keep_yearly as usize, |local_time, _info| {
                    format!("{}", local_time.year())
                });
            }

            let mut remove_list: Vec<BackupInfo> = list.into_iter()
                .filter(|info| !mark.contains(&info.backup_dir.relative_path())).collect();

            BackupInfo::sort_list(&mut remove_list, true);

            for info in remove_list {
                worker.log(format!("remove {:?}", info.backup_dir));
                datastore.remove_backup_dir(&info.backup_dir)?;
            }
        }

        Ok(())
    };

    worker.log_result(&result);

    if let Err(err) = result {
        bail!("prune failed - {}", err);
    }

    Ok(json!(null))
}

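/// Add the common `keep-*` prune options to an object schema.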
pub fn add_common_prune_parameters(schema: ObjectSchema) -> ObjectSchema {

    schema
        .optional(
            "keep-last",
            IntegerSchema::new("Number of backups to keep.")
                .minimum(1)
        )
        .optional(
            "keep-daily",
            IntegerSchema::new("Number of daily backups to keep.")
                .minimum(1)
        )
        .optional(
            "keep-weekly",
            IntegerSchema::new("Number of weekly backups to keep.")
                .minimum(1)
        )
        .optional(
            "keep-monthly",
            IntegerSchema::new("Number of monthly backups to keep.")
                .minimum(1)
        )
        .optional(
            "keep-yearly",
            IntegerSchema::new("Number of yearly backups to keep.")
                .minimum(1)
        )
}

fn api_method_prune() -> ApiMethod {
    ApiMethod::new(
        prune,
        add_common_prune_parameters(
            ObjectSchema::new("Prune the datastore.")
                .required(
                    "store",
                    StringSchema::new("Datastore name.")
                )
        )
    )
}

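/// Start garbage collection for a datastore in a background worker
/// task and return the task's UPID.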
fn start_garbage_collection(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let store = param["store"].as_str().unwrap().to_string();

    let datastore = DataStore::lookup_datastore(&store)?;

    println!("Starting garbage collection on store {}", store);

    let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;

    let upid_str = WorkerTask::new_thread(
        "garbage_collection", Some(store.clone()), "root@pam", to_stdout, move |worker|
        {
            worker.log(format!("starting garbage collection on store {}", store));
            datastore.garbage_collection(worker)
        })?;

    Ok(json!(upid_str))
}

pub fn api_method_start_garbage_collection() -> ApiMethod {
    ApiMethod::new(
        start_garbage_collection,
        ObjectSchema::new("Start garbage collection.")
            .required("store", StringSchema::new("Datastore name."))
    )
}

fn garbage_collection_status(
    param: Value,
    _info: &ApiMethod,
    _rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let store = param["store"].as_str().unwrap();

    let datastore = DataStore::lookup_datastore(store)?;

    println!("Garbage collection status on store {}", store);

    let status = datastore.last_gc_status();

    Ok(serde_json::to_value(&status)?)
}

pub fn api_method_garbage_collection_status() -> ApiMethod {
    ApiMethod::new(
        garbage_collection_status,
        ObjectSchema::new("Garbage collection status.")
            .required("store", StringSchema::new("Datastore name."))
    )
}

fn get_datastore_list(
    _param: Value,
    _info: &ApiMethod,
    _rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let config = datastore::config()?;

    Ok(config.convert_to_array("store"))
}

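/// Stream a single raw file out of a backup snapshot. The file is
/// read with tokio and sent as a chunked `application/octet-stream`
/// response body.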
fn download_file(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiAsyncMethod,
    _rpcenv: Box<dyn RpcEnvironment>,
) -> Result<BoxFut, Error> {

    let store = tools::required_string_param(&param, "store")?;

    let datastore = DataStore::lookup_datastore(store)?;

    let file_name = tools::required_string_param(&param, "file-name")?.to_owned();

    let backup_type = tools::required_string_param(&param, "backup-type")?;
    let backup_id = tools::required_string_param(&param, "backup-id")?;
    let backup_time = tools::required_integer_param(&param, "backup-time")?;

    println!("Download {} from {} ({}/{}/{}/{})", file_name, store,
             backup_type, backup_id, Local.timestamp(backup_time, 0), file_name);

    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);

    let mut path = datastore.base_path();
    path.push(backup_dir.relative_path());
    path.push(&file_name);

    let response_future = tokio::fs::File::open(path)
        .map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err)))
        .and_then(move |file| {
            let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
                .map(|bytes| hyper::Chunk::from(bytes.freeze()));
            let body = Body::wrap_stream(payload);

            // fixme: set other headers ?
            Ok(Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "application/octet-stream")
                .body(body)
                .unwrap())
        });

    Ok(Box::new(response_future))
}

pub fn api_method_download_file() -> ApiAsyncMethod {
    ApiAsyncMethod::new(
        download_file,
        ObjectSchema::new("Download single raw file from backup snapshot.")
            .required("store", StringSchema::new("Datastore name."))
            .required("backup-type", StringSchema::new("Backup type.")
                .format(Arc::new(ApiStringFormat::Enum(&["ct", "host"]))))
            .required("backup-id", StringSchema::new("Backup ID."))
            .required("backup-time", IntegerSchema::new("Backup time (Unix epoch).")
                .minimum(1547797308))
            .required("file-name", StringSchema::new("Raw file name."))
    )
}

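/// Upload the client log file for an existing backup snapshot. The
/// log is stored as `client.log.blob`; a snapshot may contain at most
/// one log, so the upload fails if the file already exists.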
fn upload_backup_log(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiAsyncMethod,
    _rpcenv: Box<dyn RpcEnvironment>,
) -> Result<BoxFut, Error> {

    let store = tools::required_string_param(&param, "store")?;

    let datastore = DataStore::lookup_datastore(store)?;

    let file_name = "client.log.blob";

    let backup_type = tools::required_string_param(&param, "backup-type")?;
    let backup_id = tools::required_string_param(&param, "backup-id")?;
    let backup_time = tools::required_integer_param(&param, "backup-time")?;

    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);

    let mut path = datastore.base_path();
    path.push(backup_dir.relative_path());
    path.push(&file_name);

    if path.exists() {
        bail!("backup already contains a log.");
    }

    println!("Upload backup log to {}/{}/{}/{}/{}", store,
             backup_type, backup_id, BackupDir::backup_time_to_string(backup_dir.backup_time()), file_name);

    let resp = req_body
        .map_err(Error::from)
        .fold(Vec::new(), |mut acc, chunk| {
            acc.extend_from_slice(&*chunk);
            Ok::<_, Error>(acc)
        })
        .and_then(move |data| {
            let mut blob = DataBlob::from_raw(data)?;
            // always compute the CRC at the server side
            blob.set_crc(blob.compute_crc());
            let raw_data = blob.raw_data();
            crate::tools::file_set_contents(&path, raw_data, None)?;
            Ok(())
        })
        .and_then(move |_| {
            Ok(crate::server::formatter::json_response(Ok(Value::Null)))
        });

    Ok(Box::new(resp))
}

pub fn api_method_upload_backup_log() -> ApiAsyncMethod {
    ApiAsyncMethod::new(
        upload_backup_log,
        ObjectSchema::new("Upload the client log file for a backup snapshot.")
            .required("store", StringSchema::new("Datastore name."))
            .required("backup-type", StringSchema::new("Backup type.")
                .format(Arc::new(ApiStringFormat::Enum(&["ct", "host"]))))
            .required("backup-id", StringSchema::new("Backup ID."))
            .required("backup-time", IntegerSchema::new("Backup time (Unix epoch).")
                .minimum(1547797308))
    )
}

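/// Build the router for the datastore API: a directory index at the
/// top level, and per-datastore subdirectories (download,
/// upload-backup-log, gc, files, groups, snapshots, prune, status)
/// matched via the `store` parameter.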
pub fn router() -> Router {

    let store_schema: Arc<Schema> = Arc::new(
        StringSchema::new("Datastore name.").into()
    );

    let datastore_info = Router::new()
        .subdir(
            "download",
            Router::new()
                .download(api_method_download_file())
        )
        .subdir(
            "upload-backup-log",
            Router::new()
                .upload(api_method_upload_backup_log())
        )
        .subdir(
            "gc",
            Router::new()
                .get(api_method_garbage_collection_status())
                .post(api_method_start_garbage_collection()))
        .subdir(
            "files",
            Router::new()
                .get(
                    ApiMethod::new(
                        list_snapshot_files,
                        ObjectSchema::new("List snapshot files.")
                            .required("store", store_schema.clone())
                            .required("backup-type", StringSchema::new("Backup type."))
                            .required("backup-id", StringSchema::new("Backup ID."))
                            .required("backup-time", IntegerSchema::new("Backup time (Unix epoch).")
                                .minimum(1547797308))
                    )
                )
        )
        .subdir(
            "groups",
            Router::new()
                .get(ApiMethod::new(
                    list_groups,
                    ObjectSchema::new("List backup groups.")
                        .required("store", store_schema.clone()))))
        .subdir(
            "snapshots",
            Router::new()
                .get(
                    ApiMethod::new(
                        list_snapshots,
                        ObjectSchema::new("List backup snapshots.")
                            .required("store", store_schema.clone())
                            .optional("backup-type", StringSchema::new("Backup type."))
                            .optional("backup-id", StringSchema::new("Backup ID."))
                    )
                )
                .delete(
                    ApiMethod::new(
                        delete_snapshots,
                        ObjectSchema::new("Delete backup snapshot.")
                            .required("store", store_schema.clone())
                            .required("backup-type", StringSchema::new("Backup type."))
                            .required("backup-id", StringSchema::new("Backup ID."))
                            .required("backup-time", IntegerSchema::new("Backup time (Unix epoch).")
                                .minimum(1547797308))
                    )
                )
        )
        .subdir(
            "prune",
            Router::new()
                .post(api_method_prune())
        )
        .subdir(
            "status",
            Router::new()
                .get(api_method_status())
        )
        .list_subdirs();

    Router::new()
        .get(ApiMethod::new(
            get_datastore_list,
            ObjectSchema::new("Directory index.")))
        .match_all("store", datastore_info)
}