proxmox-backup.git: proxmox-backup-client/src/main.rs
(blob as of commit "make datastore BackupGroup/Dir ctors private")
1 use std::collections::HashSet;
2 use std::io::{self, Read, Seek, SeekFrom, Write};
3 use std::path::{Path, PathBuf};
4 use std::pin::Pin;
5 use std::sync::{Arc, Mutex};
6 use std::task::Context;
7
8 use anyhow::{bail, format_err, Error};
9 use futures::stream::{StreamExt, TryStreamExt};
10 use serde_json::{json, Value};
11 use tokio::sync::mpsc;
12 use tokio_stream::wrappers::ReceiverStream;
13 use xdg::BaseDirectories;
14
15 use pathpatterns::{MatchEntry, MatchType, PatternFlag};
16 use proxmox_async::blocking::TokioWriterAdapter;
17 use proxmox_io::StdChannelWriter;
18 use proxmox_router::{cli::*, ApiMethod, RpcEnvironment};
19 use proxmox_schema::api;
20 use proxmox_sys::fs::{file_get_json, image_size, replace_file, CreateOptions};
21 use proxmox_time::{epoch_i64, strftime_local};
22 use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
23
24 use pbs_api_types::{
25 Authid, BackupDir, BackupGroup, BackupType, CryptMode, Fingerprint, GroupListItem, HumanByte,
26 PruneListItem, PruneOptions, RateLimitConfig, SnapshotListItem, StorageStatus,
27 BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, TRAFFIC_CONTROL_BURST_SCHEMA,
28 TRAFFIC_CONTROL_RATE_SCHEMA,
29 };
30 use pbs_client::catalog_shell::Shell;
31 use pbs_client::tools::{
32 complete_archive_name, complete_auth_id, complete_backup_group, complete_backup_snapshot,
33 complete_backup_source, complete_chunk_size, complete_group_or_snapshot,
34 complete_img_archive_name, complete_pxar_archive_name, complete_repository, connect,
35 connect_rate_limited, extract_repository_from_value,
36 key_source::{
37 crypto_parameters, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
38 KEYFILE_SCHEMA, MASTER_PUBKEY_FD_SCHEMA, MASTER_PUBKEY_FILE_SCHEMA,
39 },
40 CHUNK_SIZE_SCHEMA, REPO_URL_SCHEMA,
41 };
42 use pbs_client::{
43 delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
44 BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
45 FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
46 BACKUP_SOURCE_SCHEMA,
47 };
48 use pbs_config::key_config::{decrypt_key, rsa_encrypt_key_config, KeyConfig};
49 use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
50 use pbs_datastore::chunk_store::verify_chunk_size;
51 use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader};
52 use pbs_datastore::fixed_index::FixedIndexReader;
53 use pbs_datastore::index::IndexFile;
54 use pbs_datastore::manifest::{
55 archive_type, ArchiveType, BackupManifest, ENCRYPTED_KEY_BLOB_NAME, MANIFEST_BLOB_NAME,
56 };
57 use pbs_datastore::read_chunk::AsyncReadChunk;
58 use pbs_datastore::CATALOG_NAME;
59 use pbs_tools::crypt_config::CryptConfig;
60 use pbs_tools::json;
61
62 mod benchmark;
63 pub use benchmark::*;
64 mod mount;
65 pub use mount::*;
66 mod task;
67 pub use task::*;
68 mod catalog;
69 pub use catalog::*;
70 mod snapshot;
71 pub use snapshot::*;
72 pub mod key;
73
74 fn record_repository(repo: &BackupRepository) {
75 let base = match BaseDirectories::with_prefix("proxmox-backup") {
76 Ok(v) => v,
77 _ => return,
78 };
79
80 // usually $HOME/.cache/proxmox-backup/repo-list
81 let path = match base.place_cache_file("repo-list") {
82 Ok(v) => v,
83 _ => return,
84 };
85
86 let mut data = file_get_json(&path, None).unwrap_or_else(|_| json!({}));
87
88 let repo = repo.to_string();
89
90 data[&repo] = json! { data[&repo].as_i64().unwrap_or(0) + 1 };
91
92 let mut map = serde_json::map::Map::new();
93
94 loop {
95 let mut max_used = 0;
96 let mut max_repo = None;
97 for (repo, count) in data.as_object().unwrap() {
98 if map.contains_key(repo) {
99 continue;
100 }
101 if let Some(count) = count.as_i64() {
102 if count > max_used {
103 max_used = count;
104 max_repo = Some(repo);
105 }
106 }
107 }
108 if let Some(repo) = max_repo {
109 map.insert(repo.to_owned(), json!(max_used));
110 } else {
111 break;
112 }
113 if map.len() > 10 {
114 // store max. 10 repos
115 break;
116 }
117 }
118
119 let new_data = json!(map);
120
121 let _ = replace_file(
122 path,
123 new_data.to_string().as_bytes(),
124 CreateOptions::new(),
125 false,
126 );
127 }
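// The repo-list cache written above is a flat JSON object mapping repository
// strings to usage counters, for example (illustrative values only):
//   { "root@pam@pbs.example.org:store1": 5, "backup.example.net:store2": 1 }
// Each call increments the counter for the given repository and rewrites the
// file with only the most frequently used entries (capped at roughly ten).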
128
129 async fn api_datastore_list_snapshots(
130 client: &HttpClient,
131 store: &str,
132 group: Option<BackupGroup>,
133 ) -> Result<Value, Error> {
134 let path = format!("api2/json/admin/datastore/{}/snapshots", store);
135
136 let mut args = json!({});
137 if let Some(group) = group {
138 args["backup-type"] = group.ty.to_string().into();
139 args["backup-id"] = group.id.into();
140 }
141
142 let mut result = client.get(&path, Some(args)).await?;
143
144 Ok(result["data"].take())
145 }
146
147 pub async fn api_datastore_latest_snapshot(
148 client: &HttpClient,
149 store: &str,
150 group: BackupGroup,
151 ) -> Result<(BackupType, String, i64), Error> {
152 let list = api_datastore_list_snapshots(client, store, Some(group.clone())).await?;
153 let mut list: Vec<SnapshotListItem> = serde_json::from_value(list)?;
154
155 if list.is_empty() {
156 bail!("backup group {} does not contain any snapshots.", group);
157 }
158
159 list.sort_unstable_by(|a, b| b.backup.time.cmp(&a.backup.time));
160
161 Ok((group.ty, group.id, list[0].backup.time))
162 }
163
164 async fn backup_directory<P: AsRef<Path>>(
165 client: &BackupWriter,
166 dir_path: P,
167 archive_name: &str,
168 chunk_size: Option<usize>,
169 catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter<Error>>>>>,
170 pxar_create_options: pbs_client::pxar::PxarCreateOptions,
171 upload_options: UploadOptions,
172 ) -> Result<BackupStats, Error> {
173 let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?;
174 let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
175
176 let (tx, rx) = mpsc::channel(10); // buffer up to 10 chunks
177
178 let stream = ReceiverStream::new(rx).map_err(Error::from);
179
180 // spawn the chunker inside a separate task so that it can run in parallel
181 tokio::spawn(async move {
182 while let Some(v) = chunk_stream.next().await {
183 let _ = tx.send(v).await;
184 }
185 });
186
187 if upload_options.fixed_size.is_some() {
188 bail!("cannot backup directory with fixed chunk size!");
189 }
190
191 let stats = client
192 .upload_stream(archive_name, stream, upload_options)
193 .await?;
194
195 Ok(stats)
196 }
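// Data flow wired up above, roughly:
//   PxarBackupStream (pxar encoding of the directory)
//     -> ChunkStream (content-defined chunking)
//     -> bounded mpsc channel (the chunker runs in its own task)
//     -> BackupWriter::upload_stream() for the dynamic (.didx) archive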
197
198 async fn backup_image<P: AsRef<Path>>(
199 client: &BackupWriter,
200 image_path: P,
201 archive_name: &str,
202 chunk_size: Option<usize>,
203 upload_options: UploadOptions,
204 ) -> Result<BackupStats, Error> {
205 let path = image_path.as_ref().to_owned();
206
207 let file = tokio::fs::File::open(path).await?;
208
209 let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
210 .map_err(Error::from);
211
212 let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4 * 1024 * 1024));
213
214 if upload_options.fixed_size.is_none() {
215 bail!("cannot backup image with dynamic chunk size!");
216 }
217
218 let stats = client
219 .upload_stream(archive_name, stream, upload_options)
220 .await?;
221
222 Ok(stats)
223 }
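// Images are streamed as raw bytes and cut into fixed-size chunks
// (4 MiB unless a chunk size was given); the result is uploaded as a
// fixed (.fidx) index, hence the check that `fixed_size` is set.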
224
225 #[api(
226 input: {
227 properties: {
228 repository: {
229 schema: REPO_URL_SCHEMA,
230 optional: true,
231 },
232 "output-format": {
233 schema: OUTPUT_FORMAT,
234 optional: true,
235 },
236 }
237 }
238 )]
239 /// List backup groups.
240 async fn list_backup_groups(param: Value) -> Result<Value, Error> {
241 let output_format = get_output_format(&param);
242
243 let repo = extract_repository_from_value(&param)?;
244
245 let client = connect(&repo)?;
246
247 let path = format!("api2/json/admin/datastore/{}/groups", repo.store());
248
249 let mut result = client.get(&path, None).await?;
250
251 record_repository(&repo);
252
253 let render_group_path = |_v: &Value, record: &Value| -> Result<String, Error> {
254 let item: GroupListItem = serde_json::from_value(record.to_owned())?;
255 let group = BackupGroup::new(item.backup.ty, item.backup.id);
256 Ok(group.to_string())
257 };
258
259 let render_last_backup = |_v: &Value, record: &Value| -> Result<String, Error> {
260 let item: GroupListItem = serde_json::from_value(record.to_owned())?;
261 let snapshot = BackupDir {
262 group: item.backup,
263 time: item.last_backup,
264 };
265 Ok(snapshot.to_string())
266 };
267
268 let render_files = |_v: &Value, record: &Value| -> Result<String, Error> {
269 let item: GroupListItem = serde_json::from_value(record.to_owned())?;
270 Ok(pbs_tools::format::render_backup_file_list(&item.files))
271 };
272
273 let options = default_table_format_options()
274 .sortby("backup-type", false)
275 .sortby("backup-id", false)
276 .column(
277 ColumnConfig::new("backup-id")
278 .renderer(render_group_path)
279 .header("group"),
280 )
281 .column(
282 ColumnConfig::new("last-backup")
283 .renderer(render_last_backup)
284 .header("last snapshot")
285 .right_align(false),
286 )
287 .column(ColumnConfig::new("backup-count"))
288 .column(ColumnConfig::new("files").renderer(render_files));
289
290 let mut data: Value = result["data"].take();
291
292 let return_type = &pbs_api_types::ADMIN_DATASTORE_LIST_GROUPS_RETURN_TYPE;
293
294 format_and_print_result_full(&mut data, return_type, &output_format, &options);
295
296 Ok(Value::Null)
297 }
298
299 #[api(
300 input: {
301 properties: {
302 repository: {
303 schema: REPO_URL_SCHEMA,
304 optional: true,
305 },
306 group: {
307 type: String,
308 description: "Backup group.",
309 },
310 "new-owner": {
311 type: Authid,
312 },
313 }
314 }
315 )]
316 /// Change owner of a backup group
317 async fn change_backup_owner(group: String, mut param: Value) -> Result<(), Error> {
318 let repo = extract_repository_from_value(&param)?;
319
320 let client = connect(&repo)?;
321
322 param.as_object_mut().unwrap().remove("repository");
323
324 let group: BackupGroup = group.parse()?;
325
326 param["backup-type"] = group.ty.to_string().into();
327 param["backup-id"] = group.id.into();
328
329 let path = format!("api2/json/admin/datastore/{}/change-owner", repo.store());
330 client.post(&path, Some(param)).await?;
331
332 record_repository(&repo);
333
334 Ok(())
335 }
336
337 #[api(
338 input: {
339 properties: {
340 repository: {
341 schema: REPO_URL_SCHEMA,
342 optional: true,
343 },
344 }
345 }
346 )]
347 /// Try to log in. If successful, store the ticket.
348 async fn api_login(param: Value) -> Result<Value, Error> {
349 let repo = extract_repository_from_value(&param)?;
350
351 let client = connect(&repo)?;
352 client.login().await?;
353
354 record_repository(&repo);
355
356 Ok(Value::Null)
357 }
358
359 #[api(
360 input: {
361 properties: {
362 repository: {
363 schema: REPO_URL_SCHEMA,
364 optional: true,
365 },
366 }
367 }
368 )]
369 /// Logout (delete stored ticket).
370 fn api_logout(param: Value) -> Result<Value, Error> {
371 let repo = extract_repository_from_value(&param)?;
372
373 delete_ticket_info("proxmox-backup", repo.host(), repo.user())?;
374
375 Ok(Value::Null)
376 }
377
378 #[api(
379 input: {
380 properties: {
381 repository: {
382 schema: REPO_URL_SCHEMA,
383 optional: true,
384 },
385 "output-format": {
386 schema: OUTPUT_FORMAT,
387 optional: true,
388 },
389 }
390 }
391 )]
392 /// Show client and optional server version
393 async fn api_version(param: Value) -> Result<(), Error> {
394 let output_format = get_output_format(&param);
395
396 let mut version_info = json!({
397 "client": {
398 "version": pbs_buildcfg::PROXMOX_PKG_VERSION,
399 "release": pbs_buildcfg::PROXMOX_PKG_RELEASE,
400 "repoid": pbs_buildcfg::PROXMOX_PKG_REPOID,
401 }
402 });
403
404 let repo = extract_repository_from_value(&param);
405 if let Ok(repo) = repo {
406 let client = connect(&repo)?;
407
408 match client.get("api2/json/version", None).await {
409 Ok(mut result) => version_info["server"] = result["data"].take(),
410 Err(e) => eprintln!("could not connect to server - {}", e),
411 }
412 }
413 if output_format == "text" {
414 println!(
415 "client version: {}.{}",
416 pbs_buildcfg::PROXMOX_PKG_VERSION,
417 pbs_buildcfg::PROXMOX_PKG_RELEASE,
418 );
419 if let Some(server) = version_info["server"].as_object() {
420 let server_version = server["version"].as_str().unwrap();
421 let server_release = server["release"].as_str().unwrap();
422 println!("server version: {}.{}", server_version, server_release);
423 }
424 } else {
425 format_and_print_result(&version_info, &output_format);
426 }
427
428 Ok(())
429 }
430
431 #[api(
432 input: {
433 properties: {
434 repository: {
435 schema: REPO_URL_SCHEMA,
436 optional: true,
437 },
438 "output-format": {
439 schema: OUTPUT_FORMAT,
440 optional: true,
441 },
442 },
443 },
444 )]
445 /// Start garbage collection for a specific repository.
446 async fn start_garbage_collection(param: Value) -> Result<Value, Error> {
447 let repo = extract_repository_from_value(&param)?;
448
449 let output_format = get_output_format(&param);
450
451 let client = connect(&repo)?;
452
453 let path = format!("api2/json/admin/datastore/{}/gc", repo.store());
454
455 let result = client.post(&path, None).await?;
456
457 record_repository(&repo);
458
459 view_task_result(&client, result, &output_format).await?;
460
461 Ok(Value::Null)
462 }
463
464 struct CatalogUploadResult {
465 catalog_writer: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter<Error>>>>>,
466 result: tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>,
467 }
468
469 fn spawn_catalog_upload(
470 client: Arc<BackupWriter>,
471 encrypt: bool,
472 ) -> Result<CatalogUploadResult, Error> {
473 let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // buffer up to 10 writes
474 let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
475 let catalog_chunk_size = 512 * 1024;
476 let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
477
478 let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
479 StdChannelWriter::new(catalog_tx),
480 ))?));
481
482 let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
483
484 let upload_options = UploadOptions {
485 encrypt,
486 compress: true,
487 ..UploadOptions::default()
488 };
489
490 tokio::spawn(async move {
491 let catalog_upload_result = client
492 .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
493 .await;
494
495 if let Err(ref err) = catalog_upload_result {
496 eprintln!("catalog upload error - {}", err);
497 client.cancel();
498 }
499
500 let _ = catalog_result_tx.send(catalog_upload_result);
501 });
502
503 Ok(CatalogUploadResult {
504 catalog_writer,
505 result: catalog_result_rx,
506 })
507 }
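// The catalog upload runs concurrently with the archive uploads:
//   CatalogWriter -> TokioWriterAdapter -> StdChannelWriter -> sync_channel(10)
//     -> StdChannelStream -> ChunkStream (512 KiB chunks) -> upload_stream(CATALOG_NAME)
// On an upload error the whole BackupWriter is cancelled; the final
// BackupStats (or error) is delivered through the oneshot `result` receiver.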
508
509 #[api(
510 input: {
511 properties: {
512 backupspec: {
513 type: Array,
514 description: "List of backup source specifications ([<label.ext>:<path>] ...)",
515 items: {
516 schema: BACKUP_SOURCE_SCHEMA,
517 }
518 },
519 repository: {
520 schema: REPO_URL_SCHEMA,
521 optional: true,
522 },
523 "include-dev": {
524 description: "Include mountpoints with same st_dev number (see ``man fstat``) as specified files.",
525 optional: true,
526 items: {
527 type: String,
528 description: "Path to file.",
529 }
530 },
531 "all-file-systems": {
532 type: Boolean,
533 description: "Include all mounted subdirectories.",
534 optional: true,
535 default: false,
536 },
537 keyfile: {
538 schema: KEYFILE_SCHEMA,
539 optional: true,
540 },
541 "keyfd": {
542 schema: KEYFD_SCHEMA,
543 optional: true,
544 },
545 "master-pubkey-file": {
546 schema: MASTER_PUBKEY_FILE_SCHEMA,
547 optional: true,
548 },
549 "master-pubkey-fd": {
550 schema: MASTER_PUBKEY_FD_SCHEMA,
551 optional: true,
552 },
553 "crypt-mode": {
554 type: CryptMode,
555 optional: true,
556 },
557 "skip-lost-and-found": {
558 type: Boolean,
559 description: "Skip lost+found directory.",
560 optional: true,
561 default: false,
562 },
563 "backup-type": {
564 schema: BACKUP_TYPE_SCHEMA,
565 optional: true,
566 },
567 "backup-id": {
568 schema: BACKUP_ID_SCHEMA,
569 optional: true,
570 },
571 "backup-time": {
572 schema: BACKUP_TIME_SCHEMA,
573 optional: true,
574 },
575 "chunk-size": {
576 schema: CHUNK_SIZE_SCHEMA,
577 optional: true,
578 },
579 rate: {
580 schema: TRAFFIC_CONTROL_RATE_SCHEMA,
581 optional: true,
582 },
583 burst: {
584 schema: TRAFFIC_CONTROL_BURST_SCHEMA,
585 optional: true,
586 },
587 "exclude": {
588 type: Array,
589 description: "List of paths or patterns for matching files to exclude.",
590 optional: true,
591 items: {
592 type: String,
593 description: "Path or match pattern.",
594 }
595 },
596 "entries-max": {
597 type: Integer,
598 description: "Max number of entries to hold in memory.",
599 optional: true,
600 default: pbs_client::pxar::ENCODER_MAX_ENTRIES as isize,
601 },
602 "verbose": {
603 type: Boolean,
604 description: "Verbose output.",
605 optional: true,
606 default: false,
607 },
608 "dry-run": {
609 type: Boolean,
610 description: "Just show what backup would do, but do not upload anything.",
611 optional: true,
612 default: false,
613 },
614 }
615 }
616 )]
617 /// Create (host) backup.
618 async fn create_backup(
619 param: Value,
620 all_file_systems: bool,
621 skip_lost_and_found: bool,
622 dry_run: bool,
623 verbose: bool,
624 _info: &ApiMethod,
625 _rpcenv: &mut dyn RpcEnvironment,
626 ) -> Result<Value, Error> {
627 let repo = extract_repository_from_value(&param)?;
628
629 let backupspec_list = json::required_array_param(&param, "backupspec")?;
630
631 let backup_time_opt = param["backup-time"].as_i64();
632
633 let chunk_size_opt = param["chunk-size"].as_u64().map(|v| (v * 1024) as usize);
634
635 if let Some(size) = chunk_size_opt {
636 verify_chunk_size(size)?;
637 }
638
639 let rate = match param["rate"].as_str() {
640 Some(s) => Some(s.parse::<HumanByte>()?),
641 None => None,
642 };
643 let burst = match param["burst"].as_str() {
644 Some(s) => Some(s.parse::<HumanByte>()?),
645 None => None,
646 };
647
648 let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
649
650 let crypto = crypto_parameters(&param)?;
651
652 let backup_id = param["backup-id"]
653 .as_str()
654 .unwrap_or(proxmox_sys::nodename());
655
656 let backup_type: BackupType = param["backup-type"].as_str().unwrap_or("host").parse()?;
657
658 let include_dev = param["include-dev"].as_array();
659
660 let entries_max = param["entries-max"]
661 .as_u64()
662 .unwrap_or(pbs_client::pxar::ENCODER_MAX_ENTRIES as u64);
663
664 let empty = Vec::new();
665 let exclude_args = param["exclude"].as_array().unwrap_or(&empty);
666
667 let mut pattern_list = Vec::with_capacity(exclude_args.len());
668 for entry in exclude_args {
669 let entry = entry
670 .as_str()
671 .ok_or_else(|| format_err!("Invalid pattern string slice"))?;
672 pattern_list.push(
673 MatchEntry::parse_pattern(entry, PatternFlag::PATH_NAME, MatchType::Exclude)
674 .map_err(|err| format_err!("invalid exclude pattern entry: {}", err))?,
675 );
676 }
677
678 let mut devices = if all_file_systems {
679 None
680 } else {
681 Some(HashSet::new())
682 };
683
684 if let Some(include_dev) = include_dev {
685 if all_file_systems {
686 bail!("option 'all-file-systems' conflicts with option 'include-dev'");
687 }
688
689 let mut set = HashSet::new();
690 for path in include_dev {
691 let path = path.as_str().unwrap();
692 let stat = nix::sys::stat::stat(path)
693 .map_err(|err| format_err!("stat {:?} failed - {}", path, err))?;
694 set.insert(stat.st_dev);
695 }
696 devices = Some(set);
697 }
698
699 let mut upload_list = vec![];
700 let mut target_set = HashSet::new();
701
702 for backupspec in backupspec_list {
703 let spec = parse_backup_specification(backupspec.as_str().unwrap())?;
704 let filename = &spec.config_string;
705 let target = &spec.archive_name;
706
707 if target_set.contains(target) {
708 bail!("got target twice: '{}'", target);
709 }
710 target_set.insert(target.to_string());
711
712 use std::os::unix::fs::FileTypeExt;
713
714 let metadata = std::fs::metadata(filename)
715 .map_err(|err| format_err!("unable to access '{}' - {}", filename, err))?;
716 let file_type = metadata.file_type();
717
718 match spec.spec_type {
719 BackupSpecificationType::PXAR => {
720 if !file_type.is_dir() {
721 bail!("got unexpected file type (expected directory)");
722 }
723 upload_list.push((
724 BackupSpecificationType::PXAR,
725 filename.to_owned(),
726 format!("{}.didx", target),
727 0,
728 ));
729 }
730 BackupSpecificationType::IMAGE => {
731 if !(file_type.is_file() || file_type.is_block_device()) {
732 bail!("got unexpected file type (expected file or block device)");
733 }
734
735 let size = image_size(&PathBuf::from(filename))?;
736
737 if size == 0 {
738 bail!("got zero-sized file '{}'", filename);
739 }
740
741 upload_list.push((
742 BackupSpecificationType::IMAGE,
743 filename.to_owned(),
744 format!("{}.fidx", target),
745 size,
746 ));
747 }
748 BackupSpecificationType::CONFIG => {
749 if !file_type.is_file() {
750 bail!("got unexpected file type (expected regular file)");
751 }
752 upload_list.push((
753 BackupSpecificationType::CONFIG,
754 filename.to_owned(),
755 format!("{}.blob", target),
756 metadata.len(),
757 ));
758 }
759 BackupSpecificationType::LOGFILE => {
760 if !file_type.is_file() {
761 bail!("got unexpected file type (expected regular file)");
762 }
763 upload_list.push((
764 BackupSpecificationType::LOGFILE,
765 filename.to_owned(),
766 format!("{}.blob", target),
767 metadata.len(),
768 ));
769 }
770 }
771 }
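// Each upload_list entry is (spec type, source path, target archive name, raw size):
// pxar sources become "<label>.pxar.didx" (size 0, since it is not known up front),
// images "<label>.img.fidx" with the detected image size, and config/log files
// "<label>.<ext>.blob" with their file length.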
772
773 let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
774
775 let client = connect_rate_limited(&repo, rate_limit)?;
776 record_repository(&repo);
777
778 println!(
779 "Starting backup: {}/{}/{}",
780 backup_type,
781 backup_id,
782 pbs_datastore::BackupDir::backup_time_to_string(backup_time)?
783 );
784
785 println!("Client name: {}", proxmox_sys::nodename());
786
787 let start_time = std::time::Instant::now();
788
789 println!(
790 "Starting backup protocol: {}",
791 strftime_local("%c", epoch_i64())?
792 );
793
794 let (crypt_config, rsa_encrypted_key) = match crypto.enc_key {
795 None => (None, None),
796 Some(key_with_source) => {
797 println!(
798 "{}",
799 format_key_source(&key_with_source.source, "encryption")
800 );
801
802 let (key, created, fingerprint) =
803 decrypt_key(&key_with_source.key, &get_encryption_key_password)?;
804 println!("Encryption key fingerprint: {}", fingerprint);
805
806 let crypt_config = CryptConfig::new(key)?;
807
808 match crypto.master_pubkey {
809 Some(pem_with_source) => {
810 println!("{}", format_key_source(&pem_with_source.source, "master"));
811
812 let rsa = openssl::rsa::Rsa::public_key_from_pem(&pem_with_source.key)?;
813
814 let mut key_config = KeyConfig::without_password(key)?;
815 key_config.created = created; // keep original value
816
817 let enc_key = rsa_encrypt_key_config(rsa, &key_config)?;
818
819 (Some(Arc::new(crypt_config)), Some(enc_key))
820 }
821 _ => (Some(Arc::new(crypt_config)), None),
822 }
823 }
824 };
825
826 let client = BackupWriter::start(
827 client,
828 crypt_config.clone(),
829 repo.store(),
830 backup_type,
831 backup_id,
832 backup_time,
833 verbose,
834 false,
835 )
836 .await?;
837
838 let download_previous_manifest = match client.previous_backup_time().await {
839 Ok(Some(backup_time)) => {
840 println!(
841 "Downloading previous manifest ({})",
842 strftime_local("%c", backup_time)?
843 );
844 true
845 }
846 Ok(None) => {
847 println!("No previous manifest available.");
848 false
849 }
850 Err(_) => {
851 // Fallback for outdated server, TODO remove/bubble up with 2.0
852 true
853 }
854 };
855
856 let previous_manifest = if download_previous_manifest {
857 match client.download_previous_manifest().await {
858 Ok(previous_manifest) => {
859 match previous_manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref)) {
860 Ok(()) => Some(Arc::new(previous_manifest)),
861 Err(err) => {
862 println!("Couldn't re-use previous manifest - {}", err);
863 None
864 }
865 }
866 }
867 Err(err) => {
868 println!("Couldn't download previous manifest - {}", err);
869 None
870 }
871 }
872 } else {
873 None
874 };
875
876 let snapshot = BackupDir::from((backup_type, backup_id.to_owned(), backup_time));
877 let mut manifest = BackupManifest::new(snapshot);
878
879 let mut catalog = None;
880 let mut catalog_result_rx = None;
881
882 let log_file = |desc: &str, file: &str, target: &str| {
883 let what = if dry_run { "Would upload" } else { "Upload" };
884 println!("{} {} '{}' to '{}' as {}", what, desc, file, repo, target);
885 };
886
887 for (backup_type, filename, target, size) in upload_list {
888 match (backup_type, dry_run) {
889 // dry-run
890 (BackupSpecificationType::CONFIG, true) => log_file("config file", &filename, &target),
891 (BackupSpecificationType::LOGFILE, true) => log_file("log file", &filename, &target),
892 (BackupSpecificationType::PXAR, true) => log_file("directory", &filename, &target),
893 (BackupSpecificationType::IMAGE, true) => log_file("image", &filename, &target),
894 // no dry-run
895 (BackupSpecificationType::CONFIG, false) => {
896 let upload_options = UploadOptions {
897 compress: true,
898 encrypt: crypto.mode == CryptMode::Encrypt,
899 ..UploadOptions::default()
900 };
901
902 log_file("config file", &filename, &target);
903 let stats = client
904 .upload_blob_from_file(&filename, &target, upload_options)
905 .await?;
906 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
907 }
908 (BackupSpecificationType::LOGFILE, false) => {
909 // fixme: remove - not needed anymore ?
910 let upload_options = UploadOptions {
911 compress: true,
912 encrypt: crypto.mode == CryptMode::Encrypt,
913 ..UploadOptions::default()
914 };
915
916 log_file("log file", &filename, &target);
917 let stats = client
918 .upload_blob_from_file(&filename, &target, upload_options)
919 .await?;
920 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
921 }
922 (BackupSpecificationType::PXAR, false) => {
923 // start catalog upload on first use
924 if catalog.is_none() {
925 let catalog_upload_res =
926 spawn_catalog_upload(client.clone(), crypto.mode == CryptMode::Encrypt)?;
927 catalog = Some(catalog_upload_res.catalog_writer);
928 catalog_result_rx = Some(catalog_upload_res.result);
929 }
930 let catalog = catalog.as_ref().unwrap();
931
932 log_file("directory", &filename, &target);
933 catalog
934 .lock()
935 .unwrap()
936 .start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
937
938 let pxar_options = pbs_client::pxar::PxarCreateOptions {
939 device_set: devices.clone(),
940 patterns: pattern_list.clone(),
941 entries_max: entries_max as usize,
942 skip_lost_and_found,
943 verbose,
944 };
945
946 let upload_options = UploadOptions {
947 previous_manifest: previous_manifest.clone(),
948 compress: true,
949 encrypt: crypto.mode == CryptMode::Encrypt,
950 ..UploadOptions::default()
951 };
952
953 let stats = backup_directory(
954 &client,
955 &filename,
956 &target,
957 chunk_size_opt,
958 catalog.clone(),
959 pxar_options,
960 upload_options,
961 )
962 .await?;
963 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
964 catalog.lock().unwrap().end_directory()?;
965 }
966 (BackupSpecificationType::IMAGE, false) => {
967 log_file("image", &filename, &target);
968
969 let upload_options = UploadOptions {
970 previous_manifest: previous_manifest.clone(),
971 fixed_size: Some(size),
972 compress: true,
973 encrypt: crypto.mode == CryptMode::Encrypt,
974 };
975
976 let stats =
977 backup_image(&client, &filename, &target, chunk_size_opt, upload_options)
978 .await?;
979 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
980 }
981 }
982 }
983
984 if dry_run {
985 println!("dry-run: no upload happened");
986 return Ok(Value::Null);
987 }
988
989 // finalize and upload catalog
990 if let Some(catalog) = catalog {
991 let mutex = Arc::try_unwrap(catalog)
992 .map_err(|_| format_err!("unable to get catalog (still used)"))?;
993 let mut catalog = mutex.into_inner().unwrap();
994
995 catalog.finish()?;
996
997 drop(catalog); // close upload stream
998
999 if let Some(catalog_result_rx) = catalog_result_rx {
1000 let stats = catalog_result_rx.await??;
1001 manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum, crypto.mode)?;
1002 }
1003 }
1004
1005 if let Some(rsa_encrypted_key) = rsa_encrypted_key {
1006 let target = ENCRYPTED_KEY_BLOB_NAME;
1007 println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
1008 let options = UploadOptions {
1009 compress: false,
1010 encrypt: false,
1011 ..UploadOptions::default()
1012 };
1013 let stats = client
1014 .upload_blob_from_data(rsa_encrypted_key, target, options)
1015 .await?;
1016 manifest.add_file(target.to_string(), stats.size, stats.csum, crypto.mode)?;
1017 }
1018 // create manifest (index.json)
1019 // manifests are never encrypted, but include a signature
1020 let manifest = manifest
1021 .to_string(crypt_config.as_ref().map(Arc::as_ref))
1022 .map_err(|err| format_err!("unable to format manifest - {}", err))?;
1023
1024 if verbose {
1025 println!("Upload index.json to '{}'", repo)
1026 };
1027 let options = UploadOptions {
1028 compress: true,
1029 encrypt: false,
1030 ..UploadOptions::default()
1031 };
1032 client
1033 .upload_blob_from_data(manifest.into_bytes(), MANIFEST_BLOB_NAME, options)
1034 .await?;
1035
1036 client.finish().await?;
1037
1038 let end_time = std::time::Instant::now();
1039 let elapsed = end_time.duration_since(start_time);
1040 println!("Duration: {:.2}s", elapsed.as_secs_f64());
1041
1042 println!("End Time: {}", strftime_local("%c", epoch_i64())?);
1043
1044 Ok(Value::Null)
1045 }
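// Illustrative invocations (hypothetical repository, hosts and paths) that reach
// create_backup() via the `backup` subcommand registered in main():
//   proxmox-backup-client backup root.pxar:/ --repository root@pam@pbs.example.org:store1
//   proxmox-backup-client backup disk.img:/dev/sdb --crypt-mode encrypt --keyfile ./backup.key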
1046
1047 async fn dump_image<W: Write>(
1048 client: Arc<BackupReader>,
1049 crypt_config: Option<Arc<CryptConfig>>,
1050 crypt_mode: CryptMode,
1051 index: FixedIndexReader,
1052 mut writer: W,
1053 verbose: bool,
1054 ) -> Result<(), Error> {
1055 let most_used = index.find_most_used_chunks(8);
1056
1057 let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, crypt_mode, most_used);
1058
1059 // Note: we avoid using BufferedFixedReader, because that adds an additional buffer/copy
1060 // and thus slows down reading. Instead, we use RemoteChunkReader directly.
1061 let mut per = 0;
1062 let mut bytes = 0;
1063 let start_time = std::time::Instant::now();
1064
1065 for pos in 0..index.index_count() {
1066 let digest = index.index_digest(pos).unwrap();
1067 let raw_data = chunk_reader.read_chunk(digest).await?;
1068 writer.write_all(&raw_data)?;
1069 bytes += raw_data.len();
1070 if verbose {
1071 let next_per = ((pos + 1) * 100) / index.index_count();
1072 if per != next_per {
1073 eprintln!(
1074 "progress {}% (read {} bytes, duration {} sec)",
1075 next_per,
1076 bytes,
1077 start_time.elapsed().as_secs()
1078 );
1079 per = next_per;
1080 }
1081 }
1082 }
1083
1084 let end_time = std::time::Instant::now();
1085 let elapsed = end_time.duration_since(start_time);
1086 eprintln!(
1087 "restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
1088 bytes,
1089 elapsed.as_secs_f64(),
1090 bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
1091 );
1092
1093 Ok(())
1094 }
1095
1096 fn parse_archive_type(name: &str) -> (String, ArchiveType) {
1097 if name.ends_with(".didx") || name.ends_with(".fidx") || name.ends_with(".blob") {
1098 (name.into(), archive_type(name).unwrap())
1099 } else if name.ends_with(".pxar") {
1100 (format!("{}.didx", name), ArchiveType::DynamicIndex)
1101 } else if name.ends_with(".img") {
1102 (format!("{}.fidx", name), ArchiveType::FixedIndex)
1103 } else {
1104 (format!("{}.blob", name), ArchiveType::Blob)
1105 }
1106 }
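// Examples of the mapping above:
//   "root.pxar"  -> ("root.pxar.didx",  ArchiveType::DynamicIndex)
//   "disk.img"   -> ("disk.img.fidx",   ArchiveType::FixedIndex)
//   "index.json" -> ("index.json.blob", ArchiveType::Blob)
//   names already ending in .didx/.fidx/.blob are passed through with their type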
1107
1108 #[api(
1109 input: {
1110 properties: {
1111 repository: {
1112 schema: REPO_URL_SCHEMA,
1113 optional: true,
1114 },
1115 snapshot: {
1116 type: String,
1117 description: "Group/Snapshot path.",
1118 },
1119 "archive-name": {
1120 description: "Backup archive name.",
1121 type: String,
1122 },
1123 target: {
1124 type: String,
1125 description: r###"Target directory path. Use '-' to write to standard output.
1126
1127 We do not extract '.pxar' archives when writing to standard output.
1128
1129 "###
1130 },
1131 rate: {
1132 schema: TRAFFIC_CONTROL_RATE_SCHEMA,
1133 optional: true,
1134 },
1135 burst: {
1136 schema: TRAFFIC_CONTROL_BURST_SCHEMA,
1137 optional: true,
1138 },
1139 "allow-existing-dirs": {
1140 type: Boolean,
1141 description: "Do not fail if directories already exists.",
1142 optional: true,
1143 },
1144 keyfile: {
1145 schema: KEYFILE_SCHEMA,
1146 optional: true,
1147 },
1148 "keyfd": {
1149 schema: KEYFD_SCHEMA,
1150 optional: true,
1151 },
1152 "crypt-mode": {
1153 type: CryptMode,
1154 optional: true,
1155 },
1156 }
1157 }
1158 )]
1159 /// Restore a backup archive from a repository.
1160 async fn restore(param: Value) -> Result<Value, Error> {
1161 let repo = extract_repository_from_value(&param)?;
1162
1163 let verbose = param["verbose"].as_bool().unwrap_or(false);
1164
1165 let allow_existing_dirs = param["allow-existing-dirs"].as_bool().unwrap_or(false);
1166
1167 let archive_name = json::required_string_param(&param, "archive-name")?;
1168
1169 let rate = match param["rate"].as_str() {
1170 Some(s) => Some(s.parse::<HumanByte>()?),
1171 None => None,
1172 };
1173 let burst = match param["burst"].as_str() {
1174 Some(s) => Some(s.parse::<HumanByte>()?),
1175 None => None,
1176 };
1177
1178 let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
1179
1180 let client = connect_rate_limited(&repo, rate_limit)?;
1181 record_repository(&repo);
1182
1183 let path = json::required_string_param(&param, "snapshot")?;
1184
1185 let (backup_type, backup_id, backup_time) = if path.matches('/').count() == 1 {
1186 let group: BackupGroup = path.parse()?;
1187 api_datastore_latest_snapshot(&client, repo.store(), group).await?
1188 } else {
1189 let snapshot: BackupDir = path.parse()?;
1190 (snapshot.group.ty, snapshot.group.id, snapshot.time)
1191 };
1192
1193 let target = json::required_string_param(&param, "target")?;
1194 let target = if target == "-" { None } else { Some(target) };
1195
1196 let crypto = crypto_parameters(&param)?;
1197
1198 let crypt_config = match crypto.enc_key {
1199 None => None,
1200 Some(ref key) => {
1201 let (key, _, _) =
1202 decrypt_key(&key.key, &get_encryption_key_password).map_err(|err| {
1203 eprintln!("{}", format_key_source(&key.source, "encryption"));
1204 err
1205 })?;
1206 Some(Arc::new(CryptConfig::new(key)?))
1207 }
1208 };
1209
1210 let client = BackupReader::start(
1211 client,
1212 crypt_config.clone(),
1213 repo.store(),
1214 backup_type,
1215 &backup_id,
1216 backup_time,
1217 true,
1218 )
1219 .await?;
1220
1221 let (archive_name, archive_type) = parse_archive_type(archive_name);
1222
1223 let (manifest, backup_index_data) = client.download_manifest().await?;
1224
1225 if archive_name == ENCRYPTED_KEY_BLOB_NAME && crypt_config.is_none() {
1226 eprintln!("Restoring encrypted key blob without original key - skipping manifest fingerprint check!")
1227 } else {
1228 if manifest.signature.is_some() {
1229 if let Some(key) = &crypto.enc_key {
1230 eprintln!("{}", format_key_source(&key.source, "encryption"));
1231 }
1232 if let Some(config) = &crypt_config {
1233 eprintln!("Fingerprint: {}", Fingerprint::new(config.fingerprint()));
1234 }
1235 }
1236 manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
1237 }
1238
1239 if archive_name == MANIFEST_BLOB_NAME {
1240 if let Some(target) = target {
1241 replace_file(target, &backup_index_data, CreateOptions::new(), false)?;
1242 } else {
1243 let stdout = std::io::stdout();
1244 let mut writer = stdout.lock();
1245 writer
1246 .write_all(&backup_index_data)
1247 .map_err(|err| format_err!("unable to pipe data - {}", err))?;
1248 }
1249
1250 return Ok(Value::Null);
1251 }
1252
1253 let file_info = manifest.lookup_file_info(&archive_name)?;
1254
1255 if archive_type == ArchiveType::Blob {
1256 let mut reader = client.download_blob(&manifest, &archive_name).await?;
1257
1258 if let Some(target) = target {
1259 let mut writer = std::fs::OpenOptions::new()
1260 .write(true)
1261 .create(true)
1262 .create_new(true)
1263 .open(target)
1264 .map_err(|err| {
1265 format_err!("unable to create target file {:?} - {}", target, err)
1266 })?;
1267 std::io::copy(&mut reader, &mut writer)?;
1268 } else {
1269 let stdout = std::io::stdout();
1270 let mut writer = stdout.lock();
1271 std::io::copy(&mut reader, &mut writer)
1272 .map_err(|err| format_err!("unable to pipe data - {}", err))?;
1273 }
1274 } else if archive_type == ArchiveType::DynamicIndex {
1275 let index = client
1276 .download_dynamic_index(&manifest, &archive_name)
1277 .await?;
1278
1279 let most_used = index.find_most_used_chunks(8);
1280
1281 let chunk_reader = RemoteChunkReader::new(
1282 client.clone(),
1283 crypt_config,
1284 file_info.chunk_crypt_mode(),
1285 most_used,
1286 );
1287
1288 let mut reader = BufferedDynamicReader::new(index, chunk_reader);
1289
1290 let options = pbs_client::pxar::PxarExtractOptions {
1291 match_list: &[],
1292 extract_match_default: true,
1293 allow_existing_dirs,
1294 on_error: None,
1295 };
1296
1297 if let Some(target) = target {
1298 pbs_client::pxar::extract_archive(
1299 pxar::decoder::Decoder::from_std(reader)?,
1300 Path::new(target),
1301 pbs_client::pxar::Flags::DEFAULT,
1302 |path| {
1303 if verbose {
1304 println!("{:?}", path);
1305 }
1306 },
1307 options,
1308 )
1309 .map_err(|err| format_err!("error extracting archive - {}", err))?;
1310 } else {
1311 let mut writer = std::fs::OpenOptions::new()
1312 .write(true)
1313 .open("/dev/stdout")
1314 .map_err(|err| format_err!("unable to open /dev/stdout - {}", err))?;
1315
1316 std::io::copy(&mut reader, &mut writer)
1317 .map_err(|err| format_err!("unable to pipe data - {}", err))?;
1318 }
1319 } else if archive_type == ArchiveType::FixedIndex {
1320 let index = client
1321 .download_fixed_index(&manifest, &archive_name)
1322 .await?;
1323
1324 let mut writer = if let Some(target) = target {
1325 std::fs::OpenOptions::new()
1326 .write(true)
1327 .create(true)
1328 .create_new(true)
1329 .open(target)
1330 .map_err(|err| format_err!("unable to create target file {:?} - {}", target, err))?
1331 } else {
1332 std::fs::OpenOptions::new()
1333 .write(true)
1334 .open("/dev/stdout")
1335 .map_err(|err| format_err!("unable to open /dev/stdout - {}", err))?
1336 };
1337
1338 dump_image(
1339 client.clone(),
1340 crypt_config.clone(),
1341 file_info.chunk_crypt_mode(),
1342 index,
1343 &mut writer,
1344 verbose,
1345 )
1346 .await?;
1347 }
1348
1349 Ok(Value::Null)
1350 }
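// Illustrative invocations (hypothetical snapshot names and paths); positional
// arguments follow the ["snapshot", "archive-name", "target"] order set in main():
//   proxmox-backup-client restore host/myhost/2022-01-01T00:00:00Z root.pxar /tmp/restore
//   proxmox-backup-client restore host/myhost root.pxar -   # group path: use latest snapshot, '-' writes to stdout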
1351
1352 #[api(
1353 input: {
1354 properties: {
1355 "dry-run": {
1356 type: bool,
1357 optional: true,
1358 description: "Just show what prune would do, but do not delete anything.",
1359 },
1360 group: {
1361 type: String,
1362 description: "Backup group",
1363 },
1364 "prune-options": {
1365 type: PruneOptions,
1366 flatten: true,
1367 },
1368 "output-format": {
1369 schema: OUTPUT_FORMAT,
1370 optional: true,
1371 },
1372 quiet: {
1373 type: bool,
1374 optional: true,
1375 default: false,
1376 description: "Minimal output - only show removals.",
1377 },
1378 repository: {
1379 schema: REPO_URL_SCHEMA,
1380 optional: true,
1381 },
1382 },
1383 },
1384 )]
1385 /// Prune a backup repository.
1386 async fn prune(
1387 dry_run: Option<bool>,
1388 group: String,
1389 prune_options: PruneOptions,
1390 quiet: bool,
1391 mut param: Value,
1392 ) -> Result<Value, Error> {
1393 let repo = extract_repository_from_value(&param)?;
1394
1395 let client = connect(&repo)?;
1396
1397 let path = format!("api2/json/admin/datastore/{}/prune", repo.store());
1398
1399 let group: BackupGroup = group.parse()?;
1400
1401 let output_format = extract_output_format(&mut param);
1402
1403 let mut api_param = serde_json::to_value(prune_options)?;
1404 if let Some(dry_run) = dry_run {
1405 api_param["dry-run"] = dry_run.into();
1406 }
1407 api_param["backup-type"] = group.ty.to_string().into();
1408 api_param["backup-id"] = group.id.into();
1409
1410 let mut result = client.post(&path, Some(api_param)).await?;
1411
1412 record_repository(&repo);
1413
1414 let render_snapshot_path = |_v: &Value, record: &Value| -> Result<String, Error> {
1415 let item: PruneListItem = serde_json::from_value(record.to_owned())?;
1416 Ok(item.backup.to_string())
1417 };
1418
1419 let render_prune_action = |v: &Value, _record: &Value| -> Result<String, Error> {
1420 Ok(match v.as_bool() {
1421 Some(true) => "keep",
1422 Some(false) => "remove",
1423 None => "unknown",
1424 }
1425 .to_string())
1426 };
1427
1428 let options = default_table_format_options()
1429 .sortby("backup-type", false)
1430 .sortby("backup-id", false)
1431 .sortby("backup-time", false)
1432 .column(
1433 ColumnConfig::new("backup-id")
1434 .renderer(render_snapshot_path)
1435 .header("snapshot"),
1436 )
1437 .column(
1438 ColumnConfig::new("backup-time")
1439 .renderer(pbs_tools::format::render_epoch)
1440 .header("date"),
1441 )
1442 .column(
1443 ColumnConfig::new("keep")
1444 .renderer(render_prune_action)
1445 .header("action"),
1446 );
1447
1448 let return_type = &pbs_api_types::ADMIN_DATASTORE_PRUNE_RETURN_TYPE;
1449
1450 let mut data = result["data"].take();
1451
1452 if quiet {
1453 let list: Vec<Value> = data
1454 .as_array()
1455 .unwrap()
1456 .iter()
1457 .filter(|item| item["keep"].as_bool() == Some(false))
1458 .cloned()
1459 .collect();
1460 data = list.into();
1461 }
1462
1463 format_and_print_result_full(&mut data, return_type, &output_format, &options);
1464
1465 Ok(Value::Null)
1466 }
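// Illustrative invocation (hypothetical group and repository); PruneOptions
// fields such as keep-last are passed through flattened:
//   proxmox-backup-client prune host/myhost --keep-last 3 --dry-run --repository root@pam@pbs.example.org:store1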
1467
1468 #[api(
1469 input: {
1470 properties: {
1471 repository: {
1472 schema: REPO_URL_SCHEMA,
1473 optional: true,
1474 },
1475 "output-format": {
1476 schema: OUTPUT_FORMAT,
1477 optional: true,
1478 },
1479 }
1480 },
1481 returns: {
1482 type: StorageStatus,
1483 },
1484 )]
1485 /// Get repository status.
1486 async fn status(param: Value) -> Result<Value, Error> {
1487 let repo = extract_repository_from_value(&param)?;
1488
1489 let output_format = get_output_format(&param);
1490
1491 let client = connect(&repo)?;
1492
1493 let path = format!("api2/json/admin/datastore/{}/status", repo.store());
1494
1495 let mut result = client.get(&path, None).await?;
1496 let mut data = result["data"].take();
1497
1498 record_repository(&repo);
1499
1500 let render_total_percentage = |v: &Value, record: &Value| -> Result<String, Error> {
1501 let v = v.as_u64().unwrap();
1502 let total = record["total"].as_u64().unwrap();
1503 let roundup = total / 200;
1504 let per = ((v + roundup) * 100) / total;
1505 let info = format!(" ({} %)", per);
1506 Ok(format!("{} {:>8}", v, info))
1507 };
1508
1509 let options = default_table_format_options()
1510 .noheader(true)
1511 .column(ColumnConfig::new("total").renderer(render_total_percentage))
1512 .column(ColumnConfig::new("used").renderer(render_total_percentage))
1513 .column(ColumnConfig::new("avail").renderer(render_total_percentage));
1514
1515 let return_type = &API_METHOD_STATUS.returns;
1516
1517 format_and_print_result_full(&mut data, return_type, &output_format, &options);
1518
1519 Ok(Value::Null)
1520 }
1521
1522 /// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
1523 /// async use!
1524 ///
1525 /// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
1526 /// so that we can properly access it from multiple threads simultaneously while not issuing
1527 /// duplicate simultaneous reads over http.
1528 pub struct BufferedDynamicReadAt {
1529 inner: Mutex<BufferedDynamicReader<RemoteChunkReader>>,
1530 }
1531
1532 impl BufferedDynamicReadAt {
1533 fn new(inner: BufferedDynamicReader<RemoteChunkReader>) -> Self {
1534 Self {
1535 inner: Mutex::new(inner),
1536 }
1537 }
1538 }
1539
1540 impl ReadAt for BufferedDynamicReadAt {
1541 fn start_read_at<'a>(
1542 self: Pin<&'a Self>,
1543 _cx: &mut Context,
1544 buf: &'a mut [u8],
1545 offset: u64,
1546 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
1547 MaybeReady::Ready(tokio::task::block_in_place(move || {
1548 let mut reader = self.inner.lock().unwrap();
1549 reader.seek(SeekFrom::Start(offset))?;
1550 reader.read(buf)
1551 }))
1552 }
1553
1554 fn poll_complete<'a>(
1555 self: Pin<&'a Self>,
1556 _op: ReadAtOperation<'a>,
1557 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
1558 panic!("BufferedDynamicReadAt::start_read_at returned Pending");
1559 }
1560 }
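// start_read_at() performs the seek+read synchronously inside
// tokio::task::block_in_place() and always returns MaybeReady::Ready, so
// poll_complete() can never be reached; hence the panic there.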
1561
1562 fn main() {
1563 pbs_tools::setup_libc_malloc_opts();
1564
1565 let backup_cmd_def = CliCommand::new(&API_METHOD_CREATE_BACKUP)
1566 .arg_param(&["backupspec"])
1567 .completion_cb("repository", complete_repository)
1568 .completion_cb("backupspec", complete_backup_source)
1569 .completion_cb("keyfile", complete_file_name)
1570 .completion_cb("master-pubkey-file", complete_file_name)
1571 .completion_cb("chunk-size", complete_chunk_size);
1572
1573 let benchmark_cmd_def = CliCommand::new(&API_METHOD_BENCHMARK)
1574 .completion_cb("repository", complete_repository)
1575 .completion_cb("keyfile", complete_file_name);
1576
1577 let list_cmd_def = CliCommand::new(&API_METHOD_LIST_BACKUP_GROUPS)
1578 .completion_cb("repository", complete_repository);
1579
1580 let garbage_collect_cmd_def = CliCommand::new(&API_METHOD_START_GARBAGE_COLLECTION)
1581 .completion_cb("repository", complete_repository);
1582
1583 let restore_cmd_def = CliCommand::new(&API_METHOD_RESTORE)
1584 .arg_param(&["snapshot", "archive-name", "target"])
1585 .completion_cb("repository", complete_repository)
1586 .completion_cb("snapshot", complete_group_or_snapshot)
1587 .completion_cb("archive-name", complete_archive_name)
1588 .completion_cb("target", complete_file_name);
1589
1590 let prune_cmd_def = CliCommand::new(&API_METHOD_PRUNE)
1591 .arg_param(&["group"])
1592 .completion_cb("group", complete_backup_group)
1593 .completion_cb("repository", complete_repository);
1594
1595 let status_cmd_def =
1596 CliCommand::new(&API_METHOD_STATUS).completion_cb("repository", complete_repository);
1597
1598 let login_cmd_def =
1599 CliCommand::new(&API_METHOD_API_LOGIN).completion_cb("repository", complete_repository);
1600
1601 let logout_cmd_def =
1602 CliCommand::new(&API_METHOD_API_LOGOUT).completion_cb("repository", complete_repository);
1603
1604 let version_cmd_def =
1605 CliCommand::new(&API_METHOD_API_VERSION).completion_cb("repository", complete_repository);
1606
1607 let change_owner_cmd_def = CliCommand::new(&API_METHOD_CHANGE_BACKUP_OWNER)
1608 .arg_param(&["group", "new-owner"])
1609 .completion_cb("group", complete_backup_group)
1610 .completion_cb("new-owner", complete_auth_id)
1611 .completion_cb("repository", complete_repository);
1612
1613 let cmd_def = CliCommandMap::new()
1614 .insert("backup", backup_cmd_def)
1615 .insert("garbage-collect", garbage_collect_cmd_def)
1616 .insert("list", list_cmd_def)
1617 .insert("login", login_cmd_def)
1618 .insert("logout", logout_cmd_def)
1619 .insert("prune", prune_cmd_def)
1620 .insert("restore", restore_cmd_def)
1621 .insert("snapshot", snapshot_mgtm_cli())
1622 .insert("status", status_cmd_def)
1623 .insert("key", key::cli())
1624 .insert("mount", mount_cmd_def())
1625 .insert("map", map_cmd_def())
1626 .insert("unmap", unmap_cmd_def())
1627 .insert("catalog", catalog_mgmt_cli())
1628 .insert("task", task_mgmt_cli())
1629 .insert("version", version_cmd_def)
1630 .insert("benchmark", benchmark_cmd_def)
1631 .insert("change-owner", change_owner_cmd_def)
1632 .alias(&["files"], &["snapshot", "files"])
1633 .alias(&["forget"], &["snapshot", "forget"])
1634 .alias(&["upload-log"], &["snapshot", "upload-log"])
1635 .alias(&["snapshots"], &["snapshot", "list"]);
1636
1637 let rpcenv = CliEnvironment::new();
1638 run_cli_command(
1639 cmd_def,
1640 rpcenv,
1641 Some(|future| proxmox_async::runtime::main(future)),
1642 );
1643 }