// proxmox-backup: src/api2/admin/datastore.rs
// Admin API endpoints for datastore management (commit: use correct path for download).
1 use failure::*;
2 use futures::*;
3
4 use crate::tools;
5 use crate::api_schema::*;
6 use crate::api_schema::router::*;
7 //use crate::server::rest::*;
8 use serde_json::{json, Value};
9 use std::collections::{HashSet, HashMap};
10 use chrono::{DateTime, Datelike, TimeZone, Local};
11 use std::path::PathBuf;
12 use std::sync::Arc;
13
14 use crate::config::datastore;
15
16 use crate::backup::*;
17 use crate::server::WorkerTask;
18
19 use hyper::{header, Body, Response, StatusCode};
20 use hyper::http::request::Parts;
21
22 fn group_backups(backup_list: Vec<BackupInfo>) -> HashMap<String, Vec<BackupInfo>> {
23
24 let mut group_hash = HashMap::new();
25
26 for info in backup_list {
27 let group_id = info.backup_dir.group().group_path().to_str().unwrap().to_owned();
28 let time_list = group_hash.entry(group_id).or_insert(vec![]);
29 time_list.push(info);
30 }
31
32 group_hash
33 }
34
35 fn mark_selections<F: Fn(DateTime<Local>, &BackupInfo) -> String> (
36 mark: &mut HashSet<PathBuf>,
37 list: &Vec<BackupInfo>,
38 keep: usize,
39 select_id: F,
40 ){
41 let mut hash = HashSet::new();
42 for info in list {
43 let local_time = info.backup_dir.backup_time().with_timezone(&Local);
44 if hash.len() >= keep as usize { break; }
45 let backup_id = info.backup_dir.relative_path();
46 let sel_id: String = select_id(local_time, &info);
47 if !hash.contains(&sel_id) {
48 hash.insert(sel_id);
49 //println!(" KEEP ID {} {}", backup_id, local_time.format("%c"));
50 mark.insert(backup_id);
51 }
52 }
53 }
54
55 fn list_groups(
56 param: Value,
57 _info: &ApiMethod,
58 _rpcenv: &mut dyn RpcEnvironment,
59 ) -> Result<Value, Error> {
60
61 let store = param["store"].as_str().unwrap();
62
63 let datastore = DataStore::lookup_datastore(store)?;
64
65 let backup_list = BackupInfo::list_backups(&datastore.base_path())?;
66
67 let group_hash = group_backups(backup_list);
68
69 let mut groups = vec![];
70
71 for (_group_id, mut list) in group_hash {
72
73 BackupInfo::sort_list(&mut list, false);
74
75 let info = &list[0];
76 let group = info.backup_dir.group();
77
78 groups.push(json!({
79 "backup-type": group.backup_type(),
80 "backup-id": group.backup_id(),
81 "last-backup": info.backup_dir.backup_time().timestamp(),
82 "backup-count": list.len() as u64,
83 "files": info.files,
84 }));
85 }
86
87 Ok(json!(groups))
88 }
89
90 fn list_snapshot_files (
91 param: Value,
92 _info: &ApiMethod,
93 _rpcenv: &mut dyn RpcEnvironment,
94 ) -> Result<Value, Error> {
95
96 let store = tools::required_string_param(&param, "store")?;
97 let backup_type = tools::required_string_param(&param, "backup-type")?;
98 let backup_id = tools::required_string_param(&param, "backup-id")?;
99 let backup_time = tools::required_integer_param(&param, "backup-time")?;
100
101 let snapshot = BackupDir::new(backup_type, backup_id, backup_time);
102
103 let datastore = DataStore::lookup_datastore(store)?;
104
105 let path = datastore.base_path();
106 let files = BackupInfo::list_files(&path, &snapshot)?;
107
108 Ok(json!(files))
109 }
110
111 fn delete_snapshots (
112 param: Value,
113 _info: &ApiMethod,
114 _rpcenv: &mut dyn RpcEnvironment,
115 ) -> Result<Value, Error> {
116
117 let store = tools::required_string_param(&param, "store")?;
118 let backup_type = tools::required_string_param(&param, "backup-type")?;
119 let backup_id = tools::required_string_param(&param, "backup-id")?;
120 let backup_time = tools::required_integer_param(&param, "backup-time")?;
121
122 let snapshot = BackupDir::new(backup_type, backup_id, backup_time);
123
124 let datastore = DataStore::lookup_datastore(store)?;
125
126 datastore.remove_backup_dir(&snapshot)?;
127
128 Ok(Value::Null)
129 }
130
131 fn list_snapshots (
132 param: Value,
133 _info: &ApiMethod,
134 _rpcenv: &mut dyn RpcEnvironment,
135 ) -> Result<Value, Error> {
136
137 let store = tools::required_string_param(&param, "store")?;
138 let backup_type = tools::required_string_param(&param, "backup-type")?;
139 let backup_id = tools::required_string_param(&param, "backup-id")?;
140
141 let group = BackupGroup::new(backup_type, backup_id);
142
143 let datastore = DataStore::lookup_datastore(store)?;
144
145 let base_path = datastore.base_path();
146
147 let backup_list = group.list_backups(&base_path)?;
148
149 let mut snapshots = vec![];
150
151 for info in backup_list {
152 snapshots.push(json!({
153 "backup-type": group.backup_type(),
154 "backup-id": group.backup_id(),
155 "backup-time": info.backup_dir.backup_time().timestamp(),
156 "files": info.files,
157 }));
158 }
159
160 Ok(json!(snapshots))
161 }
162
163 fn prune(
164 param: Value,
165 _info: &ApiMethod,
166 _rpcenv: &mut dyn RpcEnvironment,
167 ) -> Result<Value, Error> {
168
169 let store = param["store"].as_str().unwrap();
170
171 let datastore = DataStore::lookup_datastore(store)?;
172
173 let mut keep_all = true;
174
175 for opt in &["keep-last", "keep-daily", "keep-weekly", "keep-weekly", "keep-yearly"] {
176 if !param[opt].is_null() {
177 keep_all = false;
178 break;
179 }
180 }
181
182 let worker = WorkerTask::new("prune", Some(store.to_owned()), "root@pam", true)?;
183 let result = try_block! {
184 if keep_all {
185 worker.log("No selection - keeping all files.");
186 return Ok(());
187 } else {
188 worker.log(format!("Starting prune on store {}", store));
189 }
190
191 let backup_list = BackupInfo::list_backups(&datastore.base_path())?;
192
193 let group_hash = group_backups(backup_list);
194
195 for (_group_id, mut list) in group_hash {
196
197 let mut mark = HashSet::new();
198
199 BackupInfo::sort_list(&mut list, false);
200
201 if let Some(keep_last) = param["keep-last"].as_u64() {
202 list.iter().take(keep_last as usize).for_each(|info| {
203 mark.insert(info.backup_dir.relative_path());
204 });
205 }
206
207 if let Some(keep_daily) = param["keep-daily"].as_u64() {
208 mark_selections(&mut mark, &list, keep_daily as usize, |local_time, _info| {
209 format!("{}/{}/{}", local_time.year(), local_time.month(), local_time.day())
210 });
211 }
212
213 if let Some(keep_weekly) = param["keep-weekly"].as_u64() {
214 mark_selections(&mut mark, &list, keep_weekly as usize, |local_time, _info| {
215 format!("{}/{}", local_time.year(), local_time.iso_week().week())
216 });
217 }
218
219 if let Some(keep_monthly) = param["keep-monthly"].as_u64() {
220 mark_selections(&mut mark, &list, keep_monthly as usize, |local_time, _info| {
221 format!("{}/{}", local_time.year(), local_time.month())
222 });
223 }
224
225 if let Some(keep_yearly) = param["keep-yearly"].as_u64() {
226 mark_selections(&mut mark, &list, keep_yearly as usize, |local_time, _info| {
227 format!("{}/{}", local_time.year(), local_time.year())
228 });
229 }
230
231 let mut remove_list: Vec<BackupInfo> = list.into_iter()
232 .filter(|info| !mark.contains(&info.backup_dir.relative_path())).collect();
233
234 BackupInfo::sort_list(&mut remove_list, true);
235
236 for info in remove_list {
237 worker.log(format!("remove {:?}", info.backup_dir));
238 datastore.remove_backup_dir(&info.backup_dir)?;
239 }
240 }
241
242 Ok(())
243 };
244
245 worker.log_result(&result);
246
247 if let Err(err) = result {
248 bail!("prune failed - {}", err);
249 }
250
251 Ok(json!(null))
252 }
253
254 pub fn add_common_prune_prameters(schema: ObjectSchema) -> ObjectSchema {
255
256 schema
257 .optional(
258 "keep-last",
259 IntegerSchema::new("Number of backups to keep.")
260 .minimum(1)
261 )
262 .optional(
263 "keep-daily",
264 IntegerSchema::new("Number of daily backups to keep.")
265 .minimum(1)
266 )
267 .optional(
268 "keep-weekly",
269 IntegerSchema::new("Number of weekly backups to keep.")
270 .minimum(1)
271 )
272 .optional(
273 "keep-monthly",
274 IntegerSchema::new("Number of monthly backups to keep.")
275 .minimum(1)
276 )
277 .optional(
278 "keep-yearly",
279 IntegerSchema::new("Number of yearly backups to keep.")
280 .minimum(1)
281 )
282 }
283
284 fn api_method_prune() -> ApiMethod {
285 ApiMethod::new(
286 prune,
287 add_common_prune_prameters(
288 ObjectSchema::new("Prune the datastore.")
289 .required(
290 "store",
291 StringSchema::new("Datastore name.")
292 )
293 )
294 )
295 }
296
297 fn start_garbage_collection(
298 param: Value,
299 _info: &ApiMethod,
300 rpcenv: &mut dyn RpcEnvironment,
301 ) -> Result<Value, Error> {
302
303 let store = param["store"].as_str().unwrap().to_string();
304
305 let datastore = DataStore::lookup_datastore(&store)?;
306
307 println!("Starting garbage collection on store {}", store);
308
309 let to_stdout = if rpcenv.env_type() == RpcEnvironmentType::CLI { true } else { false };
310
311 let upid_str = WorkerTask::new_thread(
312 "garbage_collection", Some(store.clone()), "root@pam", to_stdout, move |worker|
313 {
314 worker.log(format!("starting garbage collection on store {}", store));
315 datastore.garbage_collection(worker)
316 })?;
317
318 Ok(json!(upid_str))
319 }
320
321 pub fn api_method_start_garbage_collection() -> ApiMethod {
322 ApiMethod::new(
323 start_garbage_collection,
324 ObjectSchema::new("Start garbage collection.")
325 .required("store", StringSchema::new("Datastore name."))
326 )
327 }
328
329 fn garbage_collection_status(
330 param: Value,
331 _info: &ApiMethod,
332 _rpcenv: &mut dyn RpcEnvironment,
333 ) -> Result<Value, Error> {
334
335 let store = param["store"].as_str().unwrap();
336
337 let datastore = DataStore::lookup_datastore(&store)?;
338
339 println!("Garbage collection status on store {}", store);
340
341 let status = datastore.last_gc_status();
342
343 Ok(serde_json::to_value(&status)?)
344 }
345
346 pub fn api_method_garbage_collection_status() -> ApiMethod {
347 ApiMethod::new(
348 garbage_collection_status,
349 ObjectSchema::new("Garbage collection status.")
350 .required("store", StringSchema::new("Datastore name."))
351 )
352 }
353
354 fn get_backup_list(
355 param: Value,
356 _info: &ApiMethod,
357 _rpcenv: &mut dyn RpcEnvironment,
358 ) -> Result<Value, Error> {
359
360 //let config = datastore::config()?;
361
362 let store = param["store"].as_str().unwrap();
363
364 let datastore = DataStore::lookup_datastore(store)?;
365
366 let mut list = vec![];
367
368 let backup_list = BackupInfo::list_backups(&datastore.base_path())?;
369
370 for info in backup_list {
371 list.push(json!({
372 "backup-type": info.backup_dir.group().backup_type(),
373 "backup-id": info.backup_dir.group().backup_id(),
374 "backup-time": info.backup_dir.backup_time().timestamp(),
375 "files": info.files,
376 }));
377 }
378
379 let result = json!(list);
380
381 Ok(result)
382 }
383
384 fn get_datastore_list(
385 _param: Value,
386 _info: &ApiMethod,
387 _rpcenv: &mut dyn RpcEnvironment,
388 ) -> Result<Value, Error> {
389
390 let config = datastore::config()?;
391
392 Ok(config.convert_to_array("store"))
393 }
394
395
/// Async API handler: download a single raw file from a backup snapshot.
///
/// Opens `<datastore base>/<snapshot relative path>/<file-name>` and
/// streams its contents as an `application/octet-stream` HTTP response.
/// A file open failure is mapped to HTTP 400 (BAD_REQUEST).
fn download_file(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiAsyncMethod,
    _rpcenv: Box<dyn RpcEnvironment>,
) -> Result<BoxFut, Error> {

    let store = tools::required_string_param(&param, "store")?;

    let datastore = DataStore::lookup_datastore(store)?;

    // owned copy — the name is moved into the path / log line below
    let file_name = tools::required_string_param(&param, "file-name")?.to_owned();

    let backup_type = tools::required_string_param(&param, "backup-type")?;
    let backup_id = tools::required_string_param(&param, "backup-id")?;
    let backup_time = tools::required_integer_param(&param, "backup-time")?;

    println!("Download {} from {} ({}/{}/{}/{})", file_name, store,
             backup_type, backup_id, Local.timestamp(backup_time, 0), file_name);

    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);

    // full on-disk path: base path + snapshot relative path + file name
    let mut path = datastore.base_path();
    path.push(backup_dir.relative_path());
    path.push(&file_name);

    // futures-0.1 style chain: open the file, then wrap it into a chunked
    // response body
    let response_future = tokio::fs::File::open(path)
        .map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err)))
        .and_then(move |file| {
            // BytesCodec frames the file into raw byte blocks
            let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()).
                map(|bytes| {
                    //sigh - howto avoid copy here? or the whole map() ??
                    hyper::Chunk::from(bytes.to_vec())
                });
            let body = Body::wrap_stream(payload);

            // fixme: set other headers ?
            Ok(Response::builder()
               .status(StatusCode::OK)
               .header(header::CONTENT_TYPE, "application/octet-stream")
               .body(body)
               .unwrap())
        });

    Ok(Box::new(response_future))
}
443
444 pub fn api_method_download_file() -> ApiAsyncMethod {
445 ApiAsyncMethod::new(
446 download_file,
447 ObjectSchema::new("Download single raw file from backup snapshot.")
448 .required("store", StringSchema::new("Datastore name."))
449 .required("backup-type", StringSchema::new("Backup type.")
450 .format(Arc::new(ApiStringFormat::Enum(&["ct", "host"]))))
451 .required("backup-id", StringSchema::new("Backup ID."))
452 .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
453 .minimum(1547797308))
454 .required("file-name", StringSchema::new("Raw file name."))
455 )
456 }
457
458 pub fn router() -> Router {
459
460 let store_schema: Arc<Schema> = Arc::new(
461 StringSchema::new("Datastore name.").into()
462 );
463
464 let datastore_info = Router::new()
465 .subdir(
466 "backups",
467 Router::new()
468 .get(ApiMethod::new(
469 get_backup_list,
470 ObjectSchema::new("List backups.")
471 .required("store", store_schema.clone()))))
472 .subdir(
473 "download",
474 Router::new()
475 .download(api_method_download_file())
476 )
477 .subdir(
478 "gc",
479 Router::new()
480 .get(api_method_garbage_collection_status())
481 .post(api_method_start_garbage_collection()))
482 .subdir(
483 "files",
484 Router::new()
485 .get(
486 ApiMethod::new(
487 list_snapshot_files,
488 ObjectSchema::new("List snapshot files.")
489 .required("store", store_schema.clone())
490 .required("backup-type", StringSchema::new("Backup type."))
491 .required("backup-id", StringSchema::new("Backup ID."))
492 .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
493 .minimum(1547797308))
494 )
495 )
496 )
497 .subdir(
498 "groups",
499 Router::new()
500 .get(ApiMethod::new(
501 list_groups,
502 ObjectSchema::new("List backup groups.")
503 .required("store", store_schema.clone()))))
504 .subdir(
505 "snapshots",
506 Router::new()
507 .get(
508 ApiMethod::new(
509 list_snapshots,
510 ObjectSchema::new("List backup groups.")
511 .required("store", store_schema.clone())
512 .required("backup-type", StringSchema::new("Backup type."))
513 .required("backup-id", StringSchema::new("Backup ID."))
514 )
515 )
516 .delete(
517 ApiMethod::new(
518 delete_snapshots,
519 ObjectSchema::new("Delete backup snapshot.")
520 .required("store", store_schema.clone())
521 .required("backup-type", StringSchema::new("Backup type."))
522 .required("backup-id", StringSchema::new("Backup ID."))
523 .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
524 .minimum(1547797308))
525 )
526 )
527 )
528 .subdir(
529 "prune",
530 Router::new()
531 .post(api_method_prune())
532 )
533 .list_subdirs();
534
535
536
537 let route = Router::new()
538 .get(ApiMethod::new(
539 get_datastore_list,
540 ObjectSchema::new("Directory index.")))
541 .match_all("store", datastore_info);
542
543
544
545 route
546 }