proxmox-backup-client/src/main.rs
1 use std::collections::HashSet;
2 use std::io::{self, Read, Seek, SeekFrom, Write};
3 use std::path::{Path, PathBuf};
4 use std::pin::Pin;
5 use std::sync::{Arc, Mutex};
6 use std::task::Context;
7
8 use anyhow::{bail, format_err, Error};
9 use futures::stream::{StreamExt, TryStreamExt};
10 use serde::Deserialize;
11 use serde_json::{json, Value};
12 use tokio::sync::mpsc;
13 use tokio_stream::wrappers::ReceiverStream;
14 use xdg::BaseDirectories;
15
16 use pathpatterns::{MatchEntry, MatchType, PatternFlag};
17 use proxmox_async::blocking::TokioWriterAdapter;
18 use proxmox_io::StdChannelWriter;
19 use proxmox_router::{cli::*, ApiMethod, RpcEnvironment};
20 use proxmox_schema::api;
21 use proxmox_sys::fs::{file_get_json, image_size, replace_file, CreateOptions};
22 use proxmox_time::{epoch_i64, strftime_local};
23 use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
24
25 use pbs_api_types::{
26 Authid, BackupDir, BackupGroup, BackupNamespace, BackupPart, BackupType, CryptMode,
27 Fingerprint, GroupListItem, HumanByte, PruneListItem, PruneOptions, RateLimitConfig,
28 SnapshotListItem, StorageStatus, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
29 BACKUP_TYPE_SCHEMA, TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA,
30 };
31 use pbs_client::catalog_shell::Shell;
32 use pbs_client::tools::{
33 complete_archive_name, complete_auth_id, complete_backup_group, complete_backup_snapshot,
34 complete_backup_source, complete_chunk_size, complete_group_or_snapshot,
35 complete_img_archive_name, complete_pxar_archive_name, complete_repository, connect,
36 connect_rate_limited, extract_repository_from_value,
37 key_source::{
38 crypto_parameters, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
39 KEYFILE_SCHEMA, MASTER_PUBKEY_FD_SCHEMA, MASTER_PUBKEY_FILE_SCHEMA,
40 },
41 CHUNK_SIZE_SCHEMA, REPO_URL_SCHEMA,
42 };
43 use pbs_client::{
44 delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
45 BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
46 FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
47 BACKUP_SOURCE_SCHEMA,
48 };
49 use pbs_config::key_config::{decrypt_key, rsa_encrypt_key_config, KeyConfig};
50 use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
51 use pbs_datastore::chunk_store::verify_chunk_size;
52 use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader};
53 use pbs_datastore::fixed_index::FixedIndexReader;
54 use pbs_datastore::index::IndexFile;
55 use pbs_datastore::manifest::{
56 archive_type, ArchiveType, BackupManifest, ENCRYPTED_KEY_BLOB_NAME, MANIFEST_BLOB_NAME,
57 };
58 use pbs_datastore::read_chunk::AsyncReadChunk;
59 use pbs_datastore::CATALOG_NAME;
60 use pbs_tools::crypt_config::CryptConfig;
61 use pbs_tools::json;
62
63 mod benchmark;
64 pub use benchmark::*;
65 mod mount;
66 pub use mount::*;
67 mod task;
68 pub use task::*;
69 mod catalog;
70 pub use catalog::*;
71 mod snapshot;
72 pub use snapshot::*;
73 pub mod key;
74
75 fn record_repository(repo: &BackupRepository) {
76 let base = match BaseDirectories::with_prefix("proxmox-backup") {
77 Ok(v) => v,
78 _ => return,
79 };
80
81 // usually $HOME/.cache/proxmox-backup/repo-list
82 let path = match base.place_cache_file("repo-list") {
83 Ok(v) => v,
84 _ => return,
85 };
86
87 let mut data = file_get_json(&path, None).unwrap_or_else(|_| json!({}));
88
89 let repo = repo.to_string();
90
91 data[&repo] = json! { data[&repo].as_i64().unwrap_or(0) + 1 };
92
93 let mut map = serde_json::map::Map::new();
94
95 loop {
96 let mut max_used = 0;
97 let mut max_repo = None;
98 for (repo, count) in data.as_object().unwrap() {
99 if map.contains_key(repo) {
100 continue;
101 }
102 if let Some(count) = count.as_i64() {
103 if count > max_used {
104 max_used = count;
105 max_repo = Some(repo);
106 }
107 }
108 }
109 if let Some(repo) = max_repo {
110 map.insert(repo.to_owned(), json!(max_used));
111 } else {
112 break;
113 }
114 if map.len() > 10 {
115 // store max. 10 repos
116 break;
117 }
118 }
119
120 let new_data = json!(map);
121
122 let _ = replace_file(
123 path,
124 new_data.to_string().as_bytes(),
125 CreateOptions::new(),
126 false,
127 );
128 }
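// [Editor's addition - illustrative sketch, not part of the original file.]
// record_repository() keeps a small JSON map of "repository string -> use count"
// and trims it to the ten most-used entries. A minimal sketch of reading that
// cache back, most-used first; the helper name is hypothetical and relies on
// the imports already present above.
fn list_recorded_repositories() -> Vec<String> {
    let base = match BaseDirectories::with_prefix("proxmox-backup") {
        Ok(v) => v,
        _ => return Vec::new(),
    };
    let path = match base.place_cache_file("repo-list") {
        Ok(v) => v,
        _ => return Vec::new(),
    };
    let data = file_get_json(&path, None).unwrap_or_else(|_| json!({}));
    let mut repos: Vec<(String, i64)> = data
        .as_object()
        .map(|map| {
            map.iter()
                .map(|(repo, count)| (repo.clone(), count.as_i64().unwrap_or(0)))
                .collect()
        })
        .unwrap_or_default();
    repos.sort_by(|a, b| b.1.cmp(&a.1)); // highest use count first
    repos.into_iter().map(|(repo, _count)| repo).collect()
}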
129
130 async fn api_datastore_list_snapshots(
131 client: &HttpClient,
132 store: &str,
133 ns: &BackupNamespace,
134 group: Option<&BackupGroup>,
135 ) -> Result<Value, Error> {
136 let path = format!("api2/json/admin/datastore/{}/snapshots", store);
137
138 let mut args = match group {
139 Some(group) => serde_json::to_value(group)?,
140 None => json!({}),
141 };
142 if !ns.is_root() {
143 args["backup-ns"] = serde_json::to_value(ns)?;
144 }
145
146 let mut result = client.get(&path, Some(args)).await?;
147
148 Ok(result["data"].take())
149 }
150
151 pub async fn api_datastore_latest_snapshot(
152 client: &HttpClient,
153 store: &str,
154 ns: &BackupNamespace,
155 group: BackupGroup,
156 ) -> Result<BackupDir, Error> {
157 let list = api_datastore_list_snapshots(client, store, ns, Some(&group)).await?;
158 let mut list: Vec<SnapshotListItem> = serde_json::from_value(list)?;
159
160 if list.is_empty() {
161 bail!("backup group {} does not contain any snapshots.", group);
162 }
163
164 list.sort_unstable_by(|a, b| b.backup.time.cmp(&a.backup.time));
165
166 Ok((group, list[0].backup.time).into())
167 }
168
169 pub async fn dir_or_last_from_group(
170 client: &HttpClient,
171 repo: &BackupRepository,
172 ns: &BackupNamespace,
173 path: &str,
174 ) -> Result<BackupDir, Error> {
175 match path.parse::<BackupPart>()? {
176 BackupPart::Dir(dir) => Ok(dir),
177 BackupPart::Group(group) => {
178 api_datastore_latest_snapshot(&client, repo.store(), ns, group).await
179 }
180 }
181 }
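// [Editor's addition - illustrative sketch, not part of the original file.]
// How a user-supplied "group or snapshot" path is resolved: a full snapshot
// path is used verbatim, while a plain group path is expanded to that group's
// most recent snapshot via the snapshot-list API call above. The repository
// handle and the "host/myhost" group path are example values.
async fn resolve_path_example(
    client: &HttpClient,
    repo: &BackupRepository,
) -> Result<(), Error> {
    let ns = BackupNamespace::root();
    let latest = dir_or_last_from_group(client, repo, &ns, "host/myhost").await?;
    println!("latest snapshot in group host/myhost: {latest}");
    Ok(())
}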
182
183 async fn backup_directory<P: AsRef<Path>>(
184 client: &BackupWriter,
185 dir_path: P,
186 archive_name: &str,
187 chunk_size: Option<usize>,
188 catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter<Error>>>>>,
189 pxar_create_options: pbs_client::pxar::PxarCreateOptions,
190 upload_options: UploadOptions,
191 ) -> Result<BackupStats, Error> {
192 let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?;
193 let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
194
195     let (tx, rx) = mpsc::channel(10); // allow buffering of up to 10 chunks
196
197 let stream = ReceiverStream::new(rx).map_err(Error::from);
198
199     // spawn the chunker inside a separate task so that it can run in parallel
200 tokio::spawn(async move {
201 while let Some(v) = chunk_stream.next().await {
202 let _ = tx.send(v).await;
203 }
204 });
205
206 if upload_options.fixed_size.is_some() {
207         bail!("cannot back up directory with fixed chunk size!");
208 }
209
210 let stats = client
211 .upload_stream(archive_name, stream, upload_options)
212 .await?;
213
214 Ok(stats)
215 }
216
217 async fn backup_image<P: AsRef<Path>>(
218 client: &BackupWriter,
219 image_path: P,
220 archive_name: &str,
221 chunk_size: Option<usize>,
222 upload_options: UploadOptions,
223 ) -> Result<BackupStats, Error> {
224 let path = image_path.as_ref().to_owned();
225
226 let file = tokio::fs::File::open(path).await?;
227
228 let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
229 .map_err(Error::from);
230
231 let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4 * 1024 * 1024));
232
233 if upload_options.fixed_size.is_none() {
234         bail!("cannot back up image with dynamic chunk size!");
235 }
236
237 let stats = client
238 .upload_stream(archive_name, stream, upload_options)
239 .await?;
240
241 Ok(stats)
242 }
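// [Editor's addition - illustrative sketch, not part of the original file.]
// The two helpers above enforce complementary settings: backup_directory()
// rejects a set `fixed_size` (directories use content-defined dynamic chunks),
// while backup_image() requires it (images are cut into fixed-size chunks and
// the total size must be known up front). A minimal sketch of the two option
// sets; the `encrypt: false` choice and the size value are examples only.
fn example_upload_options(image_bytes: u64) -> (UploadOptions, UploadOptions) {
    let directory_options = UploadOptions {
        compress: true,
        encrypt: false,
        ..UploadOptions::default() // fixed_size stays None for .didx archives
    };
    let image_options = UploadOptions {
        fixed_size: Some(image_bytes), // required for .fidx archives
        compress: true,
        encrypt: false,
        ..UploadOptions::default()
    };
    (directory_options, image_options)
}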
243
244 pub fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error> {
245 Ok(match param.get("ns") {
246 Some(Value::String(ns)) => ns.parse()?,
247 Some(_) => bail!("invalid namespace parameter"),
248 None => BackupNamespace::root(),
249 })
250 }
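// [Editor's addition - illustrative sketch, not part of the original file.]
// The "ns" parameter is optional everywhere: a missing value means the root
// namespace, a string value is parsed as a namespace path. The literal values
// below are examples only.
fn ns_param_example() -> Result<(), Error> {
    let ns = optional_ns_param(&json!({ "ns": "dev/team-a" }))?;
    assert!(!ns.is_root());

    let root = optional_ns_param(&json!({}))?; // no "ns" key -> root namespace
    assert!(root.is_root());
    Ok(())
}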
251
252 #[api(
253 input: {
254 properties: {
255 repository: {
256 schema: REPO_URL_SCHEMA,
257 optional: true,
258 },
259 "ns": {
260 type: BackupNamespace,
261 optional: true,
262 },
263 "output-format": {
264 schema: OUTPUT_FORMAT,
265 optional: true,
266 },
267 }
268 }
269 )]
270 /// List backup groups.
271 async fn list_backup_groups(param: Value) -> Result<Value, Error> {
272 let output_format = get_output_format(&param);
273
274 let repo = extract_repository_from_value(&param)?;
275
276 let client = connect(&repo)?;
277
278 let path = format!("api2/json/admin/datastore/{}/groups", repo.store());
279
280 let backup_ns = optional_ns_param(&param)?;
281 let mut result = client
282 .get(&path, Some(json!({ "backup-ns": backup_ns })))
283 .await?;
284
285 record_repository(&repo);
286
287 let render_group_path = |_v: &Value, record: &Value| -> Result<String, Error> {
288 let item = GroupListItem::deserialize(record)?;
289 Ok(item.backup.to_string())
290 };
291
292 let render_last_backup = |_v: &Value, record: &Value| -> Result<String, Error> {
293 let item = GroupListItem::deserialize(record)?;
294 let snapshot = BackupDir {
295 group: item.backup,
296 time: item.last_backup,
297 };
298 Ok(snapshot.to_string())
299 };
300
301 let render_files = |_v: &Value, record: &Value| -> Result<String, Error> {
302 let item = GroupListItem::deserialize(record)?;
303 Ok(pbs_tools::format::render_backup_file_list(&item.files))
304 };
305
306 let options = default_table_format_options()
307 .sortby("backup-type", false)
308 .sortby("backup-id", false)
309 .column(
310 ColumnConfig::new("backup-id")
311 .renderer(render_group_path)
312 .header("group"),
313 )
314 .column(
315 ColumnConfig::new("last-backup")
316 .renderer(render_last_backup)
317 .header("last snapshot")
318 .right_align(false),
319 )
320 .column(ColumnConfig::new("backup-count"))
321 .column(ColumnConfig::new("files").renderer(render_files));
322
323 let mut data: Value = result["data"].take();
324
325 let return_type = &pbs_api_types::ADMIN_DATASTORE_LIST_GROUPS_RETURN_TYPE;
326
327 format_and_print_result_full(&mut data, return_type, &output_format, &options);
328
329 Ok(Value::Null)
330 }
331
332 fn merge_group_into(to: &mut serde_json::Map<String, Value>, group: BackupGroup) {
333 match serde_json::to_value(group).unwrap() {
334 Value::Object(group) => to.extend(group),
335 _ => unreachable!(),
336 }
337 }
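// [Editor's addition - illustrative sketch, not part of the original file.]
// BackupGroup serializes to an object with "backup-type" and "backup-id"
// keys, so merging it into a parameter map flattens the group into the same
// top-level fields the API expects. "vm/100" is an example group path; the
// key names are assumed from how the serialized group is consumed below.
fn merge_group_example() -> Result<(), Error> {
    let group: BackupGroup = "vm/100".parse()?;
    let mut param = serde_json::Map::new();
    merge_group_into(&mut param, group);
    assert_eq!(param["backup-type"], json!("vm"));
    assert_eq!(param["backup-id"], json!("100"));
    Ok(())
}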
338
339 #[api(
340 input: {
341 properties: {
342 repository: {
343 schema: REPO_URL_SCHEMA,
344 optional: true,
345 },
346 group: {
347 type: String,
348 description: "Backup group.",
349 },
350 "new-owner": {
351 type: Authid,
352 },
353 }
354 }
355 )]
356 /// Change owner of a backup group
357 async fn change_backup_owner(group: String, mut param: Value) -> Result<(), Error> {
358 let repo = extract_repository_from_value(&param)?;
359
360 let client = connect(&repo)?;
361
362 param.as_object_mut().unwrap().remove("repository");
363
364 let group: BackupGroup = group.parse()?;
365
366 merge_group_into(param.as_object_mut().unwrap(), group);
367
368 let path = format!("api2/json/admin/datastore/{}/change-owner", repo.store());
369 client.post(&path, Some(param)).await?;
370
371 record_repository(&repo);
372
373 Ok(())
374 }
375
376 #[api(
377 input: {
378 properties: {
379 repository: {
380 schema: REPO_URL_SCHEMA,
381 optional: true,
382 },
383 }
384 }
385 )]
386 /// Try to log in. If successful, store the ticket.
387 async fn api_login(param: Value) -> Result<Value, Error> {
388 let repo = extract_repository_from_value(&param)?;
389
390 let client = connect(&repo)?;
391 client.login().await?;
392
393 record_repository(&repo);
394
395 Ok(Value::Null)
396 }
397
398 #[api(
399 input: {
400 properties: {
401 repository: {
402 schema: REPO_URL_SCHEMA,
403 optional: true,
404 },
405 }
406 }
407 )]
408 /// Log out (delete the stored ticket).
409 fn api_logout(param: Value) -> Result<Value, Error> {
410 let repo = extract_repository_from_value(&param)?;
411
412 delete_ticket_info("proxmox-backup", repo.host(), repo.user())?;
413
414 Ok(Value::Null)
415 }
416
417 #[api(
418 input: {
419 properties: {
420 repository: {
421 schema: REPO_URL_SCHEMA,
422 optional: true,
423 },
424 "output-format": {
425 schema: OUTPUT_FORMAT,
426 optional: true,
427 },
428 }
429 }
430 )]
431 /// Show client and optional server version
432 async fn api_version(param: Value) -> Result<(), Error> {
433 let output_format = get_output_format(&param);
434
435 let mut version_info = json!({
436 "client": {
437 "version": pbs_buildcfg::PROXMOX_PKG_VERSION,
438 "release": pbs_buildcfg::PROXMOX_PKG_RELEASE,
439 "repoid": pbs_buildcfg::PROXMOX_PKG_REPOID,
440 }
441 });
442
443 let repo = extract_repository_from_value(&param);
444 if let Ok(repo) = repo {
445 let client = connect(&repo)?;
446
447 match client.get("api2/json/version", None).await {
448 Ok(mut result) => version_info["server"] = result["data"].take(),
449 Err(e) => eprintln!("could not connect to server - {}", e),
450 }
451 }
452 if output_format == "text" {
453 println!(
454 "client version: {}.{}",
455 pbs_buildcfg::PROXMOX_PKG_VERSION,
456 pbs_buildcfg::PROXMOX_PKG_RELEASE,
457 );
458 if let Some(server) = version_info["server"].as_object() {
459 let server_version = server["version"].as_str().unwrap();
460 let server_release = server["release"].as_str().unwrap();
461 println!("server version: {}.{}", server_version, server_release);
462 }
463 } else {
464 format_and_print_result(&version_info, &output_format);
465 }
466
467 Ok(())
468 }
469
470 #[api(
471 input: {
472 properties: {
473 repository: {
474 schema: REPO_URL_SCHEMA,
475 optional: true,
476 },
477 "output-format": {
478 schema: OUTPUT_FORMAT,
479 optional: true,
480 },
481 },
482 },
483 )]
484 /// Start garbage collection for a specific repository.
485 async fn start_garbage_collection(param: Value) -> Result<Value, Error> {
486 let repo = extract_repository_from_value(&param)?;
487
488 let output_format = get_output_format(&param);
489
490 let client = connect(&repo)?;
491
492 let path = format!("api2/json/admin/datastore/{}/gc", repo.store());
493
494 let result = client.post(&path, None).await?;
495
496 record_repository(&repo);
497
498 view_task_result(&client, result, &output_format).await?;
499
500 Ok(Value::Null)
501 }
502
503 struct CatalogUploadResult {
504 catalog_writer: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter<Error>>>>>,
505 result: tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>,
506 }
507
508 fn spawn_catalog_upload(
509 client: Arc<BackupWriter>,
510 encrypt: bool,
511 ) -> Result<CatalogUploadResult, Error> {
512     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow buffering of up to 10 writes
513 let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
514 let catalog_chunk_size = 512 * 1024;
515 let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
516
517 let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
518 StdChannelWriter::new(catalog_tx),
519 ))?));
520
521 let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
522
523 let upload_options = UploadOptions {
524 encrypt,
525 compress: true,
526 ..UploadOptions::default()
527 };
528
529 tokio::spawn(async move {
530 let catalog_upload_result = client
531 .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
532 .await;
533
534 if let Err(ref err) = catalog_upload_result {
535 eprintln!("catalog upload error - {}", err);
536 client.cancel();
537 }
538
539 let _ = catalog_result_tx.send(catalog_upload_result);
540 });
541
542 Ok(CatalogUploadResult {
543 catalog_writer,
544 result: catalog_result_rx,
545 })
546 }
547
548 #[api(
549 input: {
550 properties: {
551 backupspec: {
552 type: Array,
553 description: "List of backup source specifications ([<label.ext>:<path>] ...)",
554 items: {
555 schema: BACKUP_SOURCE_SCHEMA,
556 }
557 },
558 repository: {
559 schema: REPO_URL_SCHEMA,
560 optional: true,
561 },
562 "include-dev": {
563                description: "Include mountpoints with the same st_dev number (see ``man fstat``) as the specified files.",
564 optional: true,
565 items: {
566 type: String,
567 description: "Path to file.",
568 }
569 },
570 "all-file-systems": {
571 type: Boolean,
572 description: "Include all mounted subdirectories.",
573 optional: true,
574 default: false,
575 },
576 keyfile: {
577 schema: KEYFILE_SCHEMA,
578 optional: true,
579 },
580 "keyfd": {
581 schema: KEYFD_SCHEMA,
582 optional: true,
583 },
584 "master-pubkey-file": {
585 schema: MASTER_PUBKEY_FILE_SCHEMA,
586 optional: true,
587 },
588 "master-pubkey-fd": {
589 schema: MASTER_PUBKEY_FD_SCHEMA,
590 optional: true,
591 },
592 "crypt-mode": {
593 type: CryptMode,
594 optional: true,
595 },
596 "skip-lost-and-found": {
597 type: Boolean,
598 description: "Skip lost+found directory.",
599 optional: true,
600 default: false,
601 },
602 "backup-ns": {
603 schema: BACKUP_NAMESPACE_SCHEMA,
604 optional: true,
605 },
606 "backup-type": {
607 schema: BACKUP_TYPE_SCHEMA,
608 optional: true,
609 },
610 "backup-id": {
611 schema: BACKUP_ID_SCHEMA,
612 optional: true,
613 },
614 "backup-time": {
615 schema: BACKUP_TIME_SCHEMA,
616 optional: true,
617 },
618 "chunk-size": {
619 schema: CHUNK_SIZE_SCHEMA,
620 optional: true,
621 },
622 rate: {
623 schema: TRAFFIC_CONTROL_RATE_SCHEMA,
624 optional: true,
625 },
626 burst: {
627 schema: TRAFFIC_CONTROL_BURST_SCHEMA,
628 optional: true,
629 },
630 "exclude": {
631 type: Array,
632 description: "List of paths or patterns for matching files to exclude.",
633 optional: true,
634 items: {
635 type: String,
636 description: "Path or match pattern.",
637 }
638 },
639 "entries-max": {
640 type: Integer,
641 description: "Max number of entries to hold in memory.",
642 optional: true,
643 default: pbs_client::pxar::ENCODER_MAX_ENTRIES as isize,
644 },
645 "verbose": {
646 type: Boolean,
647 description: "Verbose output.",
648 optional: true,
649 default: false,
650 },
651 "dry-run": {
652 type: Boolean,
653 description: "Just show what backup would do, but do not upload anything.",
654 optional: true,
655 default: false,
656 },
657 }
658 }
659 )]
660 /// Create (host) backup.
661 async fn create_backup(
662 param: Value,
663 all_file_systems: bool,
664 skip_lost_and_found: bool,
665 dry_run: bool,
666 verbose: bool,
667 _info: &ApiMethod,
668 _rpcenv: &mut dyn RpcEnvironment,
669 ) -> Result<Value, Error> {
670 let repo = extract_repository_from_value(&param)?;
671
672 let backupspec_list = json::required_array_param(&param, "backupspec")?;
673
674 let backup_time_opt = param["backup-time"].as_i64();
675
676 let chunk_size_opt = param["chunk-size"].as_u64().map(|v| (v * 1024) as usize);
677
678 if let Some(size) = chunk_size_opt {
679 verify_chunk_size(size)?;
680 }
681
682 let rate = match param["rate"].as_str() {
683 Some(s) => Some(s.parse::<HumanByte>()?),
684 None => None,
685 };
686 let burst = match param["burst"].as_str() {
687 Some(s) => Some(s.parse::<HumanByte>()?),
688 None => None,
689 };
690
691 let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
692
693 let crypto = crypto_parameters(&param)?;
694
695 let backup_id = param["backup-id"]
696 .as_str()
697 .unwrap_or(proxmox_sys::nodename());
698
699 let backup_ns: BackupNamespace = match param.get("backup-ns") {
700 Some(ns) => ns
701 .as_str()
702 .ok_or_else(|| format_err!("bad namespace {:?}", ns))?
703 .parse()?,
704 None => BackupNamespace::root(),
705 };
706
707 let backup_type: BackupType = param["backup-type"].as_str().unwrap_or("host").parse()?;
708
709 let include_dev = param["include-dev"].as_array();
710
711 let entries_max = param["entries-max"]
712 .as_u64()
713 .unwrap_or(pbs_client::pxar::ENCODER_MAX_ENTRIES as u64);
714
715 let empty = Vec::new();
716 let exclude_args = param["exclude"].as_array().unwrap_or(&empty);
717
718 let mut pattern_list = Vec::with_capacity(exclude_args.len());
719 for entry in exclude_args {
720 let entry = entry
721 .as_str()
722 .ok_or_else(|| format_err!("Invalid pattern string slice"))?;
723 pattern_list.push(
724 MatchEntry::parse_pattern(entry, PatternFlag::PATH_NAME, MatchType::Exclude)
725 .map_err(|err| format_err!("invalid exclude pattern entry: {}", err))?,
726 );
727 }
728
729 let mut devices = if all_file_systems {
730 None
731 } else {
732 Some(HashSet::new())
733 };
734
735 if let Some(include_dev) = include_dev {
736 if all_file_systems {
737 bail!("option 'all-file-systems' conflicts with option 'include-dev'");
738 }
739
740 let mut set = HashSet::new();
741 for path in include_dev {
742 let path = path.as_str().unwrap();
743 let stat = nix::sys::stat::stat(path)
744 .map_err(|err| format_err!("fstat {:?} failed - {}", path, err))?;
745 set.insert(stat.st_dev);
746 }
747 devices = Some(set);
748 }
749
750 let mut upload_list = vec![];
751 let mut target_set = HashSet::new();
752
753 for backupspec in backupspec_list {
754 let spec = parse_backup_specification(backupspec.as_str().unwrap())?;
755 let filename = &spec.config_string;
756 let target = &spec.archive_name;
757
758 if target_set.contains(target) {
759 bail!("got target twice: '{}'", target);
760 }
761 target_set.insert(target.to_string());
762
763 use std::os::unix::fs::FileTypeExt;
764
765 let metadata = std::fs::metadata(filename)
766 .map_err(|err| format_err!("unable to access '{}' - {}", filename, err))?;
767 let file_type = metadata.file_type();
768
769 match spec.spec_type {
770 BackupSpecificationType::PXAR => {
771 if !file_type.is_dir() {
772 bail!("got unexpected file type (expected directory)");
773 }
774 upload_list.push((
775 BackupSpecificationType::PXAR,
776 filename.to_owned(),
777 format!("{}.didx", target),
778 0,
779 ));
780 }
781 BackupSpecificationType::IMAGE => {
782 if !(file_type.is_file() || file_type.is_block_device()) {
783 bail!("got unexpected file type (expected file or block device)");
784 }
785
786 let size = image_size(&PathBuf::from(filename))?;
787
788 if size == 0 {
789 bail!("got zero-sized file '{}'", filename);
790 }
791
792 upload_list.push((
793 BackupSpecificationType::IMAGE,
794 filename.to_owned(),
795 format!("{}.fidx", target),
796 size,
797 ));
798 }
799 BackupSpecificationType::CONFIG => {
800 if !file_type.is_file() {
801 bail!("got unexpected file type (expected regular file)");
802 }
803 upload_list.push((
804 BackupSpecificationType::CONFIG,
805 filename.to_owned(),
806 format!("{}.blob", target),
807 metadata.len(),
808 ));
809 }
810 BackupSpecificationType::LOGFILE => {
811 if !file_type.is_file() {
812 bail!("got unexpected file type (expected regular file)");
813 }
814 upload_list.push((
815 BackupSpecificationType::LOGFILE,
816 filename.to_owned(),
817 format!("{}.blob", target),
818 metadata.len(),
819 ));
820 }
821 }
822 }
823
824 let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
825
826 let client = connect_rate_limited(&repo, rate_limit)?;
827 record_repository(&repo);
828
829 let snapshot = BackupDir::from((backup_type, backup_id.to_owned(), backup_time));
830 if backup_ns.is_root() {
831 println!("Starting backup: {snapshot}");
832 } else {
833 println!("Starting backup: [{backup_ns}]:{snapshot}");
834 }
835
836 println!("Client name: {}", proxmox_sys::nodename());
837
838 let start_time = std::time::Instant::now();
839
840 println!(
841 "Starting backup protocol: {}",
842 strftime_local("%c", epoch_i64())?
843 );
844
845 let (crypt_config, rsa_encrypted_key) = match crypto.enc_key {
846 None => (None, None),
847 Some(key_with_source) => {
848 println!(
849 "{}",
850 format_key_source(&key_with_source.source, "encryption")
851 );
852
853 let (key, created, fingerprint) =
854 decrypt_key(&key_with_source.key, &get_encryption_key_password)?;
855 println!("Encryption key fingerprint: {}", fingerprint);
856
857 let crypt_config = CryptConfig::new(key)?;
858
859 match crypto.master_pubkey {
860 Some(pem_with_source) => {
861 println!("{}", format_key_source(&pem_with_source.source, "master"));
862
863 let rsa = openssl::rsa::Rsa::public_key_from_pem(&pem_with_source.key)?;
864
865 let mut key_config = KeyConfig::without_password(key)?;
866 key_config.created = created; // keep original value
867
868 let enc_key = rsa_encrypt_key_config(rsa, &key_config)?;
869
870 (Some(Arc::new(crypt_config)), Some(enc_key))
871 }
872 _ => (Some(Arc::new(crypt_config)), None),
873 }
874 }
875 };
876
877 let client = BackupWriter::start(
878 client,
879 crypt_config.clone(),
880 repo.store(),
881 &backup_ns,
882 &snapshot,
883 verbose,
884 false,
885 )
886 .await?;
887
888 let download_previous_manifest = match client.previous_backup_time().await {
889 Ok(Some(backup_time)) => {
890 println!(
891 "Downloading previous manifest ({})",
892 strftime_local("%c", backup_time)?
893 );
894 true
895 }
896 Ok(None) => {
897 println!("No previous manifest available.");
898 false
899 }
900 Err(_) => {
901 // Fallback for outdated server, TODO remove/bubble up with 2.0
902 true
903 }
904 };
905
906 let previous_manifest = if download_previous_manifest {
907 match client.download_previous_manifest().await {
908 Ok(previous_manifest) => {
909 match previous_manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref)) {
910 Ok(()) => Some(Arc::new(previous_manifest)),
911 Err(err) => {
912 println!("Couldn't re-use previous manifest - {}", err);
913 None
914 }
915 }
916 }
917 Err(err) => {
918 println!("Couldn't download previous manifest - {}", err);
919 None
920 }
921 }
922 } else {
923 None
924 };
925
926 let mut manifest = BackupManifest::new(snapshot);
927
928 let mut catalog = None;
929 let mut catalog_result_rx = None;
930
931 let log_file = |desc: &str, file: &str, target: &str| {
932 let what = if dry_run { "Would upload" } else { "Upload" };
933 println!("{} {} '{}' to '{}' as {}", what, desc, file, repo, target);
934 };
935
936 for (backup_type, filename, target, size) in upload_list {
937 match (backup_type, dry_run) {
938 // dry-run
939 (BackupSpecificationType::CONFIG, true) => log_file("config file", &filename, &target),
940 (BackupSpecificationType::LOGFILE, true) => log_file("log file", &filename, &target),
941 (BackupSpecificationType::PXAR, true) => log_file("directory", &filename, &target),
942 (BackupSpecificationType::IMAGE, true) => log_file("image", &filename, &target),
943 // no dry-run
944 (BackupSpecificationType::CONFIG, false) => {
945 let upload_options = UploadOptions {
946 compress: true,
947 encrypt: crypto.mode == CryptMode::Encrypt,
948 ..UploadOptions::default()
949 };
950
951 log_file("config file", &filename, &target);
952 let stats = client
953 .upload_blob_from_file(&filename, &target, upload_options)
954 .await?;
955 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
956 }
957 (BackupSpecificationType::LOGFILE, false) => {
958 // fixme: remove - not needed anymore ?
959 let upload_options = UploadOptions {
960 compress: true,
961 encrypt: crypto.mode == CryptMode::Encrypt,
962 ..UploadOptions::default()
963 };
964
965 log_file("log file", &filename, &target);
966 let stats = client
967 .upload_blob_from_file(&filename, &target, upload_options)
968 .await?;
969 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
970 }
971 (BackupSpecificationType::PXAR, false) => {
972 // start catalog upload on first use
973 if catalog.is_none() {
974 let catalog_upload_res =
975 spawn_catalog_upload(client.clone(), crypto.mode == CryptMode::Encrypt)?;
976 catalog = Some(catalog_upload_res.catalog_writer);
977 catalog_result_rx = Some(catalog_upload_res.result);
978 }
979 let catalog = catalog.as_ref().unwrap();
980
981 log_file("directory", &filename, &target);
982 catalog
983 .lock()
984 .unwrap()
985 .start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
986
987 let pxar_options = pbs_client::pxar::PxarCreateOptions {
988 device_set: devices.clone(),
989 patterns: pattern_list.clone(),
990 entries_max: entries_max as usize,
991 skip_lost_and_found,
992 verbose,
993 };
994
995 let upload_options = UploadOptions {
996 previous_manifest: previous_manifest.clone(),
997 compress: true,
998 encrypt: crypto.mode == CryptMode::Encrypt,
999 ..UploadOptions::default()
1000 };
1001
1002 let stats = backup_directory(
1003 &client,
1004 &filename,
1005 &target,
1006 chunk_size_opt,
1007 catalog.clone(),
1008 pxar_options,
1009 upload_options,
1010 )
1011 .await?;
1012 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
1013 catalog.lock().unwrap().end_directory()?;
1014 }
1015 (BackupSpecificationType::IMAGE, false) => {
1016 log_file("image", &filename, &target);
1017
1018 let upload_options = UploadOptions {
1019 previous_manifest: previous_manifest.clone(),
1020 fixed_size: Some(size),
1021 compress: true,
1022 encrypt: crypto.mode == CryptMode::Encrypt,
1023 };
1024
1025 let stats =
1026 backup_image(&client, &filename, &target, chunk_size_opt, upload_options)
1027 .await?;
1028 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
1029 }
1030 }
1031 }
1032
1033 if dry_run {
1034         println!("dry-run: no upload happened");
1035 return Ok(Value::Null);
1036 }
1037
1038 // finalize and upload catalog
1039 if let Some(catalog) = catalog {
1040 let mutex = Arc::try_unwrap(catalog)
1041 .map_err(|_| format_err!("unable to get catalog (still used)"))?;
1042 let mut catalog = mutex.into_inner().unwrap();
1043
1044 catalog.finish()?;
1045
1046 drop(catalog); // close upload stream
1047
1048 if let Some(catalog_result_rx) = catalog_result_rx {
1049 let stats = catalog_result_rx.await??;
1050 manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum, crypto.mode)?;
1051 }
1052 }
1053
1054 if let Some(rsa_encrypted_key) = rsa_encrypted_key {
1055 let target = ENCRYPTED_KEY_BLOB_NAME;
1056         println!("Upload RSA-encrypted key to '{:?}' as {}", repo, target);
1057 let options = UploadOptions {
1058 compress: false,
1059 encrypt: false,
1060 ..UploadOptions::default()
1061 };
1062 let stats = client
1063 .upload_blob_from_data(rsa_encrypted_key, target, options)
1064 .await?;
1065 manifest.add_file(target.to_string(), stats.size, stats.csum, crypto.mode)?;
1066 }
1067 // create manifest (index.json)
1068 // manifests are never encrypted, but include a signature
1069 let manifest = manifest
1070 .to_string(crypt_config.as_ref().map(Arc::as_ref))
1071 .map_err(|err| format_err!("unable to format manifest - {}", err))?;
1072
1073 if verbose {
1074 println!("Upload index.json to '{}'", repo)
1075 };
1076 let options = UploadOptions {
1077 compress: true,
1078 encrypt: false,
1079 ..UploadOptions::default()
1080 };
1081 client
1082 .upload_blob_from_data(manifest.into_bytes(), MANIFEST_BLOB_NAME, options)
1083 .await?;
1084
1085 client.finish().await?;
1086
1087 let end_time = std::time::Instant::now();
1088 let elapsed = end_time.duration_since(start_time);
1089 println!("Duration: {:.2}s", elapsed.as_secs_f64());
1090
1091 println!("End Time: {}", strftime_local("%c", epoch_i64())?);
1092
1093 Ok(Value::Null)
1094 }
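// [Editor's addition - illustrative sketch, not part of the original file.]
// How a single backupspec argument from the command line is interpreted in
// create_backup() above: the part before ':' names the archive and (via its
// extension) the archive type, the part after ':' is the local source.
// "root.pxar:/" is an example value.
fn backupspec_example() -> Result<(), Error> {
    let spec = parse_backup_specification("root.pxar:/")?;
    assert!(matches!(spec.spec_type, BackupSpecificationType::PXAR));
    assert_eq!(spec.archive_name, "root.pxar");
    assert_eq!(spec.config_string, "/");
    // create_backup() then uploads this source as the "root.pxar.didx" archive
    Ok(())
}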
1095
1096 async fn dump_image<W: Write>(
1097 client: Arc<BackupReader>,
1098 crypt_config: Option<Arc<CryptConfig>>,
1099 crypt_mode: CryptMode,
1100 index: FixedIndexReader,
1101 mut writer: W,
1102 verbose: bool,
1103 ) -> Result<(), Error> {
1104 let most_used = index.find_most_used_chunks(8);
1105
1106 let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, crypt_mode, most_used);
1107
1108     // Note: we avoid using BufferedFixedReader, because that adds an additional buffer/copy
1109 // and thus slows down reading. Instead, directly use RemoteChunkReader
1110 let mut per = 0;
1111 let mut bytes = 0;
1112 let start_time = std::time::Instant::now();
1113
1114 for pos in 0..index.index_count() {
1115 let digest = index.index_digest(pos).unwrap();
1116 let raw_data = chunk_reader.read_chunk(digest).await?;
1117 writer.write_all(&raw_data)?;
1118 bytes += raw_data.len();
1119 if verbose {
1120 let next_per = ((pos + 1) * 100) / index.index_count();
1121 if per != next_per {
1122 eprintln!(
1123 "progress {}% (read {} bytes, duration {} sec)",
1124 next_per,
1125 bytes,
1126 start_time.elapsed().as_secs()
1127 );
1128 per = next_per;
1129 }
1130 }
1131 }
1132
1133 let end_time = std::time::Instant::now();
1134 let elapsed = end_time.duration_since(start_time);
1135 eprintln!(
1136 "restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
1137 bytes,
1138 elapsed.as_secs_f64(),
1139 bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
1140 );
1141
1142 Ok(())
1143 }
1144
1145 fn parse_archive_type(name: &str) -> (String, ArchiveType) {
1146 if name.ends_with(".didx") || name.ends_with(".fidx") || name.ends_with(".blob") {
1147 (name.into(), archive_type(name).unwrap())
1148 } else if name.ends_with(".pxar") {
1149 (format!("{}.didx", name), ArchiveType::DynamicIndex)
1150 } else if name.ends_with(".img") {
1151 (format!("{}.fidx", name), ArchiveType::FixedIndex)
1152 } else {
1153 (format!("{}.blob", name), ArchiveType::Blob)
1154 }
1155 }
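// [Editor's addition - illustrative sketch, not part of the original file.]
// Examples of the mapping performed above: explicit server-side names are
// kept as-is, otherwise the extension picks the index/blob type and the
// matching suffix is appended.
fn parse_archive_type_examples() {
    let (name, kind) = parse_archive_type("root.pxar");
    assert_eq!(name, "root.pxar.didx");
    assert!(matches!(kind, ArchiveType::DynamicIndex));

    let (name, kind) = parse_archive_type("disk.img");
    assert_eq!(name, "disk.img.fidx");
    assert!(matches!(kind, ArchiveType::FixedIndex));

    let (name, kind) = parse_archive_type("qemu-server.conf");
    assert_eq!(name, "qemu-server.conf.blob");
    assert!(matches!(kind, ArchiveType::Blob));

    let (name, kind) = parse_archive_type("catalog.pcat1.didx");
    assert_eq!(name, "catalog.pcat1.didx");
    assert!(matches!(kind, ArchiveType::DynamicIndex));
}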
1156
1157 #[api(
1158 input: {
1159 properties: {
1160 repository: {
1161 schema: REPO_URL_SCHEMA,
1162 optional: true,
1163 },
1164 ns: {
1165 type: BackupNamespace,
1166 optional: true,
1167 },
1168 snapshot: {
1169 type: String,
1170 description: "Group/Snapshot path.",
1171 },
1172 "archive-name": {
1173 description: "Backup archive name.",
1174 type: String,
1175 },
1176 target: {
1177 type: String,
1178 description: r###"Target directory path. Use '-' to write to standard output.
1179
1180 We do not extract '.pxar' archives when writing to standard output.
1181
1182 "###
1183 },
1184 rate: {
1185 schema: TRAFFIC_CONTROL_RATE_SCHEMA,
1186 optional: true,
1187 },
1188 burst: {
1189 schema: TRAFFIC_CONTROL_BURST_SCHEMA,
1190 optional: true,
1191 },
1192 "allow-existing-dirs": {
1193 type: Boolean,
1194                description: "Do not fail if directories already exist.",
1195 optional: true,
1196 },
1197 keyfile: {
1198 schema: KEYFILE_SCHEMA,
1199 optional: true,
1200 },
1201 "keyfd": {
1202 schema: KEYFD_SCHEMA,
1203 optional: true,
1204 },
1205 "crypt-mode": {
1206 type: CryptMode,
1207 optional: true,
1208 },
1209 }
1210 }
1211 )]
1212 /// Restore a backup archive from a snapshot.
1213 async fn restore(param: Value) -> Result<Value, Error> {
1214 let repo = extract_repository_from_value(&param)?;
1215
1216 let verbose = param["verbose"].as_bool().unwrap_or(false);
1217
1218 let allow_existing_dirs = param["allow-existing-dirs"].as_bool().unwrap_or(false);
1219
1220 let archive_name = json::required_string_param(&param, "archive-name")?;
1221
1222 let rate = match param["rate"].as_str() {
1223 Some(s) => Some(s.parse::<HumanByte>()?),
1224 None => None,
1225 };
1226 let burst = match param["burst"].as_str() {
1227 Some(s) => Some(s.parse::<HumanByte>()?),
1228 None => None,
1229 };
1230
1231 let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
1232
1233 let client = connect_rate_limited(&repo, rate_limit)?;
1234 record_repository(&repo);
1235
1236 let ns = match param.get("ns") {
1237 Some(Value::String(ns)) => ns.parse()?,
1238 Some(_) => bail!("invalid namespace parameter"),
1239 None => BackupNamespace::root(),
1240 };
1241 let path = json::required_string_param(&param, "snapshot")?;
1242
1243 let backup_dir = dir_or_last_from_group(&client, &repo, &ns, &path).await?;
1244
1245 let target = json::required_string_param(&param, "target")?;
1246 let target = if target == "-" { None } else { Some(target) };
1247
1248 let crypto = crypto_parameters(&param)?;
1249
1250 let crypt_config = match crypto.enc_key {
1251 None => None,
1252 Some(ref key) => {
1253 let (key, _, _) =
1254 decrypt_key(&key.key, &get_encryption_key_password).map_err(|err| {
1255 eprintln!("{}", format_key_source(&key.source, "encryption"));
1256 err
1257 })?;
1258 Some(Arc::new(CryptConfig::new(key)?))
1259 }
1260 };
1261
1262 let client = BackupReader::start(
1263 client,
1264 crypt_config.clone(),
1265 repo.store(),
1266 &ns,
1267 &backup_dir,
1268 true,
1269 )
1270 .await?;
1271
1272 let (archive_name, archive_type) = parse_archive_type(archive_name);
1273
1274 let (manifest, backup_index_data) = client.download_manifest().await?;
1275
1276 if archive_name == ENCRYPTED_KEY_BLOB_NAME && crypt_config.is_none() {
1277 eprintln!("Restoring encrypted key blob without original key - skipping manifest fingerprint check!")
1278 } else {
1279 if manifest.signature.is_some() {
1280 if let Some(key) = &crypto.enc_key {
1281 eprintln!("{}", format_key_source(&key.source, "encryption"));
1282 }
1283 if let Some(config) = &crypt_config {
1284 eprintln!("Fingerprint: {}", Fingerprint::new(config.fingerprint()));
1285 }
1286 }
1287 manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
1288 }
1289
1290 if archive_name == MANIFEST_BLOB_NAME {
1291 if let Some(target) = target {
1292 replace_file(target, &backup_index_data, CreateOptions::new(), false)?;
1293 } else {
1294 let stdout = std::io::stdout();
1295 let mut writer = stdout.lock();
1296 writer
1297 .write_all(&backup_index_data)
1298 .map_err(|err| format_err!("unable to pipe data - {}", err))?;
1299 }
1300
1301 return Ok(Value::Null);
1302 }
1303
1304 let file_info = manifest.lookup_file_info(&archive_name)?;
1305
1306 if archive_type == ArchiveType::Blob {
1307 let mut reader = client.download_blob(&manifest, &archive_name).await?;
1308
1309 if let Some(target) = target {
1310 let mut writer = std::fs::OpenOptions::new()
1311 .write(true)
1312 .create(true)
1313 .create_new(true)
1314 .open(target)
1315 .map_err(|err| {
1316 format_err!("unable to create target file {:?} - {}", target, err)
1317 })?;
1318 std::io::copy(&mut reader, &mut writer)?;
1319 } else {
1320 let stdout = std::io::stdout();
1321 let mut writer = stdout.lock();
1322 std::io::copy(&mut reader, &mut writer)
1323 .map_err(|err| format_err!("unable to pipe data - {}", err))?;
1324 }
1325 } else if archive_type == ArchiveType::DynamicIndex {
1326 let index = client
1327 .download_dynamic_index(&manifest, &archive_name)
1328 .await?;
1329
1330 let most_used = index.find_most_used_chunks(8);
1331
1332 let chunk_reader = RemoteChunkReader::new(
1333 client.clone(),
1334 crypt_config,
1335 file_info.chunk_crypt_mode(),
1336 most_used,
1337 );
1338
1339 let mut reader = BufferedDynamicReader::new(index, chunk_reader);
1340
1341 let options = pbs_client::pxar::PxarExtractOptions {
1342 match_list: &[],
1343 extract_match_default: true,
1344 allow_existing_dirs,
1345 on_error: None,
1346 };
1347
1348 if let Some(target) = target {
1349 pbs_client::pxar::extract_archive(
1350 pxar::decoder::Decoder::from_std(reader)?,
1351 Path::new(target),
1352 pbs_client::pxar::Flags::DEFAULT,
1353 |path| {
1354 if verbose {
1355 println!("{:?}", path);
1356 }
1357 },
1358 options,
1359 )
1360 .map_err(|err| format_err!("error extracting archive - {}", err))?;
1361 } else {
1362 let mut writer = std::fs::OpenOptions::new()
1363 .write(true)
1364 .open("/dev/stdout")
1365 .map_err(|err| format_err!("unable to open /dev/stdout - {}", err))?;
1366
1367 std::io::copy(&mut reader, &mut writer)
1368 .map_err(|err| format_err!("unable to pipe data - {}", err))?;
1369 }
1370 } else if archive_type == ArchiveType::FixedIndex {
1371 let index = client
1372 .download_fixed_index(&manifest, &archive_name)
1373 .await?;
1374
1375 let mut writer = if let Some(target) = target {
1376 std::fs::OpenOptions::new()
1377 .write(true)
1378 .create(true)
1379 .create_new(true)
1380 .open(target)
1381 .map_err(|err| format_err!("unable to create target file {:?} - {}", target, err))?
1382 } else {
1383 std::fs::OpenOptions::new()
1384 .write(true)
1385 .open("/dev/stdout")
1386 .map_err(|err| format_err!("unable to open /dev/stdout - {}", err))?
1387 };
1388
1389 dump_image(
1390 client.clone(),
1391 crypt_config.clone(),
1392 file_info.chunk_crypt_mode(),
1393 index,
1394 &mut writer,
1395 verbose,
1396 )
1397 .await?;
1398 }
1399
1400 Ok(Value::Null)
1401 }
1402
1403 #[api(
1404 input: {
1405 properties: {
1406 "dry-run": {
1407 type: bool,
1408 optional: true,
1409 description: "Just show what prune would do, but do not delete anything.",
1410 },
1411 group: {
1412 type: String,
1413 description: "Backup group",
1414 },
1415 "prune-options": {
1416 type: PruneOptions,
1417 flatten: true,
1418 },
1419 "output-format": {
1420 schema: OUTPUT_FORMAT,
1421 optional: true,
1422 },
1423 quiet: {
1424 type: bool,
1425 optional: true,
1426 default: false,
1427 description: "Minimal output - only show removals.",
1428 },
1429 repository: {
1430 schema: REPO_URL_SCHEMA,
1431 optional: true,
1432 },
1433 },
1434 },
1435 )]
1436 /// Prune a backup repository.
1437 async fn prune(
1438 dry_run: Option<bool>,
1439 group: String,
1440 prune_options: PruneOptions,
1441 quiet: bool,
1442 mut param: Value,
1443 ) -> Result<Value, Error> {
1444 let repo = extract_repository_from_value(&param)?;
1445
1446 let client = connect(&repo)?;
1447
1448 let path = format!("api2/json/admin/datastore/{}/prune", repo.store());
1449
1450 let group: BackupGroup = group.parse()?;
1451
1452 let output_format = extract_output_format(&mut param);
1453
1454 let mut api_param = serde_json::to_value(prune_options)?;
1455 if let Some(dry_run) = dry_run {
1456 api_param["dry-run"] = dry_run.into();
1457 }
1458 merge_group_into(api_param.as_object_mut().unwrap(), group);
1459
1460 let mut result = client.post(&path, Some(api_param)).await?;
1461
1462 record_repository(&repo);
1463
1464 let render_snapshot_path = |_v: &Value, record: &Value| -> Result<String, Error> {
1465 let item: PruneListItem = serde_json::from_value(record.to_owned())?;
1466 Ok(item.backup.to_string())
1467 };
1468
1469 let render_prune_action = |v: &Value, _record: &Value| -> Result<String, Error> {
1470 Ok(match v.as_bool() {
1471 Some(true) => "keep",
1472 Some(false) => "remove",
1473 None => "unknown",
1474 }
1475 .to_string())
1476 };
1477
1478 let options = default_table_format_options()
1479 .sortby("backup-type", false)
1480 .sortby("backup-id", false)
1481 .sortby("backup-time", false)
1482 .column(
1483 ColumnConfig::new("backup-id")
1484 .renderer(render_snapshot_path)
1485 .header("snapshot"),
1486 )
1487 .column(
1488 ColumnConfig::new("backup-time")
1489 .renderer(pbs_tools::format::render_epoch)
1490 .header("date"),
1491 )
1492 .column(
1493 ColumnConfig::new("keep")
1494 .renderer(render_prune_action)
1495 .header("action"),
1496 );
1497
1498 let return_type = &pbs_api_types::ADMIN_DATASTORE_PRUNE_RETURN_TYPE;
1499
1500 let mut data = result["data"].take();
1501
1502 if quiet {
1503 let list: Vec<Value> = data
1504 .as_array()
1505 .unwrap()
1506 .iter()
1507 .filter(|item| item["keep"].as_bool() == Some(false))
1508 .cloned()
1509 .collect();
1510 data = list.into();
1511 }
1512
1513 format_and_print_result_full(&mut data, return_type, &output_format, &options);
1514
1515 Ok(Value::Null)
1516 }
1517
1518 #[api(
1519 input: {
1520 properties: {
1521 repository: {
1522 schema: REPO_URL_SCHEMA,
1523 optional: true,
1524 },
1525 "output-format": {
1526 schema: OUTPUT_FORMAT,
1527 optional: true,
1528 },
1529 }
1530 },
1531 returns: {
1532 type: StorageStatus,
1533 },
1534 )]
1535 /// Get repository status.
1536 async fn status(param: Value) -> Result<Value, Error> {
1537 let repo = extract_repository_from_value(&param)?;
1538
1539 let output_format = get_output_format(&param);
1540
1541 let client = connect(&repo)?;
1542
1543 let path = format!("api2/json/admin/datastore/{}/status", repo.store());
1544
1545 let mut result = client.get(&path, None).await?;
1546 let mut data = result["data"].take();
1547
1548 record_repository(&repo);
1549
1550 let render_total_percentage = |v: &Value, record: &Value| -> Result<String, Error> {
1551 let v = v.as_u64().unwrap();
1552 let total = record["total"].as_u64().unwrap();
1553 let roundup = total / 200;
1554 let per = ((v + roundup) * 100) / total;
1555 let info = format!(" ({} %)", per);
1556 Ok(format!("{} {:>8}", v, info))
1557 };
1558
1559 let options = default_table_format_options()
1560 .noheader(true)
1561 .column(ColumnConfig::new("total").renderer(render_total_percentage))
1562 .column(ColumnConfig::new("used").renderer(render_total_percentage))
1563 .column(ColumnConfig::new("avail").renderer(render_total_percentage));
1564
1565 let return_type = &API_METHOD_STATUS.returns;
1566
1567 format_and_print_result_full(&mut data, return_type, &output_format, &options);
1568
1569 Ok(Value::Null)
1570 }
1571
1572 /// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
1573 /// async use!
1574 ///
1575 /// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
1576 /// so that we can properly access it from multiple threads simultaneously while not issuing
1577 /// duplicate simultaneous reads over http.
1578 pub struct BufferedDynamicReadAt {
1579 inner: Mutex<BufferedDynamicReader<RemoteChunkReader>>,
1580 }
1581
1582 impl BufferedDynamicReadAt {
1583 fn new(inner: BufferedDynamicReader<RemoteChunkReader>) -> Self {
1584 Self {
1585 inner: Mutex::new(inner),
1586 }
1587 }
1588 }
1589
1590 impl ReadAt for BufferedDynamicReadAt {
1591 fn start_read_at<'a>(
1592 self: Pin<&'a Self>,
1593 _cx: &mut Context,
1594 buf: &'a mut [u8],
1595 offset: u64,
1596 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
1597 MaybeReady::Ready(tokio::task::block_in_place(move || {
1598 let mut reader = self.inner.lock().unwrap();
1599 reader.seek(SeekFrom::Start(offset))?;
1600 reader.read(buf)
1601 }))
1602 }
1603
1604 fn poll_complete<'a>(
1605 self: Pin<&'a Self>,
1606 _op: ReadAtOperation<'a>,
1607 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
1608 panic!("BufferedDynamicReadAt::start_read_at returned Pending");
1609 }
1610 }
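// [Editor's addition - illustrative sketch, not part of the original file.]
// The wrapper above is consumed through the `ReadAt` trait (e.g. by the pxar
// accessor used in the mount/catalog code); a caller only needs to hand over
// a fully set-up BufferedDynamicReader, typically behind an Arc:
fn wrap_dynamic_reader(
    reader: BufferedDynamicReader<RemoteChunkReader>,
) -> Arc<BufferedDynamicReadAt> {
    Arc::new(BufferedDynamicReadAt::new(reader))
}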
1611
1612 fn main() {
1613 pbs_tools::setup_libc_malloc_opts();
1614
1615 let backup_cmd_def = CliCommand::new(&API_METHOD_CREATE_BACKUP)
1616 .arg_param(&["backupspec"])
1617 .completion_cb("repository", complete_repository)
1618 .completion_cb("backupspec", complete_backup_source)
1619 .completion_cb("keyfile", complete_file_name)
1620 .completion_cb("master-pubkey-file", complete_file_name)
1621 .completion_cb("chunk-size", complete_chunk_size);
1622
1623 let benchmark_cmd_def = CliCommand::new(&API_METHOD_BENCHMARK)
1624 .completion_cb("repository", complete_repository)
1625 .completion_cb("keyfile", complete_file_name);
1626
1627 let list_cmd_def = CliCommand::new(&API_METHOD_LIST_BACKUP_GROUPS)
1628 .completion_cb("repository", complete_repository);
1629
1630 let garbage_collect_cmd_def = CliCommand::new(&API_METHOD_START_GARBAGE_COLLECTION)
1631 .completion_cb("repository", complete_repository);
1632
1633 let restore_cmd_def = CliCommand::new(&API_METHOD_RESTORE)
1634 .arg_param(&["snapshot", "archive-name", "target"])
1635 .completion_cb("repository", complete_repository)
1636 .completion_cb("snapshot", complete_group_or_snapshot)
1637 .completion_cb("archive-name", complete_archive_name)
1638 .completion_cb("target", complete_file_name);
1639
1640 let prune_cmd_def = CliCommand::new(&API_METHOD_PRUNE)
1641 .arg_param(&["group"])
1642 .completion_cb("group", complete_backup_group)
1643 .completion_cb("repository", complete_repository);
1644
1645 let status_cmd_def =
1646 CliCommand::new(&API_METHOD_STATUS).completion_cb("repository", complete_repository);
1647
1648 let login_cmd_def =
1649 CliCommand::new(&API_METHOD_API_LOGIN).completion_cb("repository", complete_repository);
1650
1651 let logout_cmd_def =
1652 CliCommand::new(&API_METHOD_API_LOGOUT).completion_cb("repository", complete_repository);
1653
1654 let version_cmd_def =
1655 CliCommand::new(&API_METHOD_API_VERSION).completion_cb("repository", complete_repository);
1656
1657 let change_owner_cmd_def = CliCommand::new(&API_METHOD_CHANGE_BACKUP_OWNER)
1658 .arg_param(&["group", "new-owner"])
1659 .completion_cb("group", complete_backup_group)
1660 .completion_cb("new-owner", complete_auth_id)
1661 .completion_cb("repository", complete_repository);
1662
1663 let cmd_def = CliCommandMap::new()
1664 .insert("backup", backup_cmd_def)
1665 .insert("garbage-collect", garbage_collect_cmd_def)
1666 .insert("list", list_cmd_def)
1667 .insert("login", login_cmd_def)
1668 .insert("logout", logout_cmd_def)
1669 .insert("prune", prune_cmd_def)
1670 .insert("restore", restore_cmd_def)
1671 .insert("snapshot", snapshot_mgtm_cli())
1672 .insert("status", status_cmd_def)
1673 .insert("key", key::cli())
1674 .insert("mount", mount_cmd_def())
1675 .insert("map", map_cmd_def())
1676 .insert("unmap", unmap_cmd_def())
1677 .insert("catalog", catalog_mgmt_cli())
1678 .insert("task", task_mgmt_cli())
1679 .insert("version", version_cmd_def)
1680 .insert("benchmark", benchmark_cmd_def)
1681 .insert("change-owner", change_owner_cmd_def)
1682 .alias(&["files"], &["snapshot", "files"])
1683 .alias(&["forget"], &["snapshot", "forget"])
1684 .alias(&["upload-log"], &["snapshot", "upload-log"])
1685 .alias(&["snapshots"], &["snapshot", "list"]);
1686
1687 let rpcenv = CliEnvironment::new();
1688 run_cli_command(
1689 cmd_def,
1690 rpcenv,
1691 Some(|future| proxmox_async::runtime::main(future)),
1692 );
1693 }