use std::collections::HashSet;
use std::io::{self, Read, Write, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Context;

use anyhow::{bail, format_err, Error};
use futures::stream::{StreamExt, TryStreamExt};
use serde_json::{json, Value};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use xdg::BaseDirectories;

use pathpatterns::{MatchEntry, MatchType, PatternFlag};
use proxmox_io::StdChannelWriter;
use proxmox_sys::fs::{file_get_json, replace_file, CreateOptions, image_size};
use proxmox_router::{ApiMethod, RpcEnvironment, cli::*};
use proxmox_schema::api;
use proxmox_time::{strftime_local, epoch_i64};
use proxmox_async::blocking::TokioWriterAdapter;
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};

use pbs_api_types::{
    BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA,
    TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA,
    Authid, CryptMode, Fingerprint, GroupListItem, HumanByte,
    PruneListItem, PruneOptions, RateLimitConfig, SnapshotListItem, StorageStatus,
};
use pbs_client::{
    BACKUP_SOURCE_SCHEMA,
    BackupReader,
    BackupRepository,
    BackupSpecificationType,
    BackupStats,
    BackupWriter,
    ChunkStream,
    FixedChunkStream,
    HttpClient,
    PxarBackupStream,
    RemoteChunkReader,
    UploadOptions,
    delete_ticket_info,
    parse_backup_specification,
    view_task_result,
};

use pbs_client::catalog_shell::Shell;
use pbs_client::tools::{
    complete_archive_name, complete_auth_id, complete_backup_group, complete_backup_snapshot,
    complete_backup_source, complete_chunk_size, complete_group_or_snapshot,
    complete_img_archive_name, complete_pxar_archive_name, complete_repository, connect,
    connect_rate_limited, extract_repository_from_value,
    key_source::{
        crypto_parameters, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
        KEYFILE_SCHEMA, MASTER_PUBKEY_FD_SCHEMA, MASTER_PUBKEY_FILE_SCHEMA,
    },
    CHUNK_SIZE_SCHEMA, REPO_URL_SCHEMA,
};

use pbs_config::key_config::{KeyConfig, decrypt_key, rsa_encrypt_key_config};
use pbs_datastore::CATALOG_NAME;
use pbs_datastore::backup_info::{BackupDir, BackupGroup};
use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
use pbs_datastore::chunk_store::verify_chunk_size;
use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader};
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    ENCRYPTED_KEY_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, archive_type,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_tools::json;
use pbs_tools::crypt_config::CryptConfig;
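
/// Remember a repository in the per-user cache file and bump its use
/// count; only the ten most frequently used entries are kept, so the
/// cache stays small enough to offer for completion later.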
fn record_repository(repo: &BackupRepository) {

    let base = match BaseDirectories::with_prefix("proxmox-backup") {
        Ok(v) => v,
        _ => return,
    };

    // usually $HOME/.cache/proxmox-backup/repo-list
    let path = match base.place_cache_file("repo-list") {
        Ok(v) => v,
        _ => return,
    };

    let mut data = file_get_json(&path, None).unwrap_or_else(|_| json!({}));

    let repo = repo.to_string();

    data[&repo] = json!{ data[&repo].as_i64().unwrap_or(0) + 1 };

    let mut map = serde_json::map::Map::new();

    loop {
        let mut max_used = 0;
        let mut max_repo = None;
        for (repo, count) in data.as_object().unwrap() {
            if map.contains_key(repo) { continue; }
            if let Some(count) = count.as_i64() {
                if count > max_used {
                    max_used = count;
                    max_repo = Some(repo);
                }
            }
        }
        if let Some(repo) = max_repo {
            map.insert(repo.to_owned(), json!(max_used));
        } else {
            break;
        }
        if map.len() > 10 { // store max. 10 repos
            break;
        }
    }

    let new_data = json!(map);

    let _ = replace_file(path, new_data.to_string().as_bytes(), CreateOptions::new(), false);
}
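
/// Query the snapshot list of a datastore via the API, optionally
/// restricted to a single backup group; returns the raw JSON "data"
/// array of the response.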
async fn api_datastore_list_snapshots(
    client: &HttpClient,
    store: &str,
    group: Option<BackupGroup>,
) -> Result<Value, Error> {

    let path = format!("api2/json/admin/datastore/{}/snapshots", store);

    let mut args = json!({});
    if let Some(group) = group {
        args["backup-type"] = group.backup_type().into();
        args["backup-id"] = group.backup_id().into();
    }

    let mut result = client.get(&path, Some(args)).await?;

    Ok(result["data"].take())
}

pub async fn api_datastore_latest_snapshot(
    client: &HttpClient,
    store: &str,
    group: BackupGroup,
) -> Result<(String, String, i64), Error> {

    let list = api_datastore_list_snapshots(client, store, Some(group.clone())).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(list)?;

    if list.is_empty() {
        bail!("backup group {:?} does not contain any snapshots.", group.group_path());
    }

    list.sort_unstable_by(|a, b| b.backup_time.cmp(&a.backup_time));

    let backup_time = list[0].backup_time;

    Ok((group.backup_type().to_owned(), group.backup_id().to_owned(), backup_time))
}
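
/// Stream a directory as a pxar archive to the server: the encoder
/// output is cut into content-defined (dynamic) chunks, buffered through
/// an mpsc channel, and uploaded as `archive_name`.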
async fn backup_directory<P: AsRef<Path>>(
    client: &BackupWriter,
    dir_path: P,
    archive_name: &str,
    chunk_size: Option<usize>,
    catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter<Error>>>>>,
    pxar_create_options: pbs_client::pxar::PxarCreateOptions,
    upload_options: UploadOptions,
) -> Result<BackupStats, Error> {

    let pxar_stream = PxarBackupStream::open(
        dir_path.as_ref(),
        catalog,
        pxar_create_options,
    )?;
    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);

    let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks

    let stream = ReceiverStream::new(rx)
        .map_err(Error::from);

    // spawn chunker inside a separate task so that it can run in parallel
    tokio::spawn(async move {
        while let Some(v) = chunk_stream.next().await {
            let _ = tx.send(v).await;
        }
    });

    if upload_options.fixed_size.is_some() {
        bail!("cannot backup directory with fixed chunk size!");
    }

    let stats = client
        .upload_stream(archive_name, stream, upload_options)
        .await?;

    Ok(stats)
}
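
/// Stream a file or block device to the server as a fixed-index image,
/// using fixed-size chunks (4 MiB unless a chunk size is given).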
async fn backup_image<P: AsRef<Path>>(
    client: &BackupWriter,
    image_path: P,
    archive_name: &str,
    chunk_size: Option<usize>,
    upload_options: UploadOptions,
) -> Result<BackupStats, Error> {

    let path = image_path.as_ref().to_owned();

    let file = tokio::fs::File::open(path).await?;

    let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
        .map_err(Error::from);

    let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));

    if upload_options.fixed_size.is_none() {
        bail!("cannot backup image with dynamic chunk size!");
    }

    let stats = client
        .upload_stream(archive_name, stream, upload_options)
        .await?;

    Ok(stats)
}

#[api(
    input: {
        properties: {
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
            "output-format": {
                schema: OUTPUT_FORMAT,
                optional: true,
            },
        }
    }
)]
/// List backup groups.
async fn list_backup_groups(param: Value) -> Result<Value, Error> {

    let output_format = get_output_format(&param);

    let repo = extract_repository_from_value(&param)?;

    let client = connect(&repo)?;

    let path = format!("api2/json/admin/datastore/{}/groups", repo.store());

    let mut result = client.get(&path, None).await?;

    record_repository(&repo);

    let render_group_path = |_v: &Value, record: &Value| -> Result<String, Error> {
        let item: GroupListItem = serde_json::from_value(record.to_owned())?;
        let group = BackupGroup::new(item.backup_type, item.backup_id);
        Ok(group.group_path().to_str().unwrap().to_owned())
    };

    let render_last_backup = |_v: &Value, record: &Value| -> Result<String, Error> {
        let item: GroupListItem = serde_json::from_value(record.to_owned())?;
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.last_backup)?;
        Ok(snapshot.relative_path().to_str().unwrap().to_owned())
    };

    let render_files = |_v: &Value, record: &Value| -> Result<String, Error> {
        let item: GroupListItem = serde_json::from_value(record.to_owned())?;
        Ok(pbs_tools::format::render_backup_file_list(&item.files))
    };

    let options = default_table_format_options()
        .sortby("backup-type", false)
        .sortby("backup-id", false)
        .column(ColumnConfig::new("backup-id").renderer(render_group_path).header("group"))
        .column(
            ColumnConfig::new("last-backup")
                .renderer(render_last_backup)
                .header("last snapshot")
        )
        .column(ColumnConfig::new("backup-count"))
        .column(ColumnConfig::new("files").renderer(render_files));

    let mut data: Value = result["data"].take();

    let return_type = &pbs_api_types::ADMIN_DATASTORE_LIST_GROUPS_RETURN_TYPE;

    format_and_print_result_full(&mut data, return_type, &output_format, &options);

    Ok(Value::Null)
}

#[api(
    input: {
        properties: {
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
            group: {
                type: String,
                description: "Backup group.",
            },
            "new-owner": {
                type: Authid,
            },
        }
    }
)]
/// Change owner of a backup group
async fn change_backup_owner(group: String, mut param: Value) -> Result<(), Error> {

    let repo = extract_repository_from_value(&param)?;

    let client = connect(&repo)?;

    param.as_object_mut().unwrap().remove("repository");

    let group: BackupGroup = group.parse()?;

    param["backup-type"] = group.backup_type().into();
    param["backup-id"] = group.backup_id().into();

    let path = format!("api2/json/admin/datastore/{}/change-owner", repo.store());
    client.post(&path, Some(param)).await?;

    record_repository(&repo);

    Ok(())
}

#[api(
    input: {
        properties: {
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
        }
    }
)]
/// Try to login. If successful, store ticket.
async fn api_login(param: Value) -> Result<Value, Error> {

    let repo = extract_repository_from_value(&param)?;

    let client = connect(&repo)?;
    client.login().await?;

    record_repository(&repo);

    Ok(Value::Null)
}

#[api(
    input: {
        properties: {
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
        }
    }
)]
/// Logout (delete stored ticket).
fn api_logout(param: Value) -> Result<Value, Error> {

    let repo = extract_repository_from_value(&param)?;

    delete_ticket_info("proxmox-backup", repo.host(), repo.user())?;

    Ok(Value::Null)
}

#[api(
    input: {
        properties: {
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
            "output-format": {
                schema: OUTPUT_FORMAT,
                optional: true,
            },
        }
    }
)]
/// Show client and optional server version
async fn api_version(param: Value) -> Result<(), Error> {

    let output_format = get_output_format(&param);

    let mut version_info = json!({
        "client": {
            "version": pbs_buildcfg::PROXMOX_PKG_VERSION,
            "release": pbs_buildcfg::PROXMOX_PKG_RELEASE,
            "repoid": pbs_buildcfg::PROXMOX_PKG_REPOID,
        }
    });

    let repo = extract_repository_from_value(&param);
    if let Ok(repo) = repo {
        let client = connect(&repo)?;

        match client.get("api2/json/version", None).await {
            Ok(mut result) => version_info["server"] = result["data"].take(),
            Err(e) => eprintln!("could not connect to server - {}", e),
        }
    }
    if output_format == "text" {
        println!(
            "client version: {}.{}",
            pbs_buildcfg::PROXMOX_PKG_VERSION,
            pbs_buildcfg::PROXMOX_PKG_RELEASE,
        );
        if let Some(server) = version_info["server"].as_object() {
            let server_version = server["version"].as_str().unwrap();
            let server_release = server["release"].as_str().unwrap();
            println!("server version: {}.{}", server_version, server_release);
        }
    } else {
        format_and_print_result(&version_info, &output_format);
    }

    Ok(())
}

#[api(
    input: {
        properties: {
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
            "output-format": {
                schema: OUTPUT_FORMAT,
                optional: true,
            },
        }
    }
)]
/// Start garbage collection for a specific repository.
async fn start_garbage_collection(param: Value) -> Result<Value, Error> {

    let repo = extract_repository_from_value(&param)?;

    let output_format = get_output_format(&param);

    let client = connect(&repo)?;

    let path = format!("api2/json/admin/datastore/{}/gc", repo.store());

    let result = client.post(&path, None).await?;

    record_repository(&repo);

    view_task_result(&client, result, &output_format).await?;

    Ok(Value::Null)
}
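
/// Result of spawn_catalog_upload(): `catalog_writer` feeds entries
/// through a channel into a background upload task, and `result`
/// resolves to that task's BackupStats once the stream has been closed.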
struct CatalogUploadResult {
    catalog_writer: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter<Error>>>>>,
    result: tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>,
}

fn spawn_catalog_upload(
    client: Arc<BackupWriter>,
    encrypt: bool,
) -> Result<CatalogUploadResult, Error> {
    let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
    let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
    let catalog_chunk_size = 512*1024;
    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));

    let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(StdChannelWriter::new(catalog_tx)))?));

    let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();

    let upload_options = UploadOptions {
        encrypt,
        compress: true,
        ..UploadOptions::default()
    };

    tokio::spawn(async move {
        let catalog_upload_result = client
            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
            .await;

        if let Err(ref err) = catalog_upload_result {
            eprintln!("catalog upload error - {}", err);
        }

        let _ = catalog_result_tx.send(catalog_upload_result);
    });

    Ok(CatalogUploadResult { catalog_writer, result: catalog_result_rx })
}
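
// Usage sketch (mirrored by create_backup() below): spawn the upload
// once, share `catalog_writer` across all pxar archives, then drop the
// writer and await `result` for the catalog's upload statistics.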

#[api(
    input: {
        properties: {
            backupspec: {
                type: Array,
                description: "List of backup source specifications ([<label.ext>:<path>] ...)",
                items: {
                    schema: BACKUP_SOURCE_SCHEMA,
                }
            },
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
            "include-dev": {
                description: "Include mountpoints with same st_dev number (see ``man fstat``) as specified files.",
                optional: true,
                items: {
                    type: String,
                    description: "Path to file.",
                }
            },
            "all-file-systems": {
                type: Boolean,
                description: "Include all mounted subdirectories.",
                optional: true,
            },
            keyfile: {
                schema: KEYFILE_SCHEMA,
                optional: true,
            },
            "keyfd": {
                schema: KEYFD_SCHEMA,
                optional: true,
            },
            "master-pubkey-file": {
                schema: MASTER_PUBKEY_FILE_SCHEMA,
                optional: true,
            },
            "master-pubkey-fd": {
                schema: MASTER_PUBKEY_FD_SCHEMA,
                optional: true,
            },
            "crypt-mode": {
                type: CryptMode,
                optional: true,
            },
            "skip-lost-and-found": {
                type: Boolean,
                description: "Skip lost+found directory.",
                optional: true,
            },
            "backup-type": {
                schema: BACKUP_TYPE_SCHEMA,
                optional: true,
            },
            "backup-id": {
                schema: BACKUP_ID_SCHEMA,
                optional: true,
            },
            "backup-time": {
                schema: BACKUP_TIME_SCHEMA,
                optional: true,
            },
            "chunk-size": {
                schema: CHUNK_SIZE_SCHEMA,
                optional: true,
            },
            rate: {
                schema: TRAFFIC_CONTROL_RATE_SCHEMA,
                optional: true,
            },
            burst: {
                schema: TRAFFIC_CONTROL_BURST_SCHEMA,
                optional: true,
            },
            "exclude": {
                type: Array,
                description: "List of paths or patterns for matching files to exclude.",
                optional: true,
                items: {
                    type: String,
                    description: "Path or match pattern.",
                }
            },
            "entries-max": {
                type: Integer,
                description: "Max number of entries to hold in memory.",
                optional: true,
                default: pbs_client::pxar::ENCODER_MAX_ENTRIES as isize,
            },
            "verbose": {
                type: Boolean,
                description: "Verbose output.",
                optional: true,
            },
        }
    }
)]
/// Create (host) backup.
async fn create_backup(
    param: Value,
    _info: &ApiMethod,
    _rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
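    // Parse and validate all CLI parameters before opening any
    // connection to the server.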
    let repo = extract_repository_from_value(&param)?;

    let backupspec_list = json::required_array_param(&param, "backupspec")?;

    let all_file_systems = param["all-file-systems"].as_bool().unwrap_or(false);

    let skip_lost_and_found = param["skip-lost-and-found"].as_bool().unwrap_or(false);

    let verbose = param["verbose"].as_bool().unwrap_or(false);

    let backup_time_opt = param["backup-time"].as_i64();

    let chunk_size_opt = param["chunk-size"].as_u64().map(|v| (v*1024) as usize);

    if let Some(size) = chunk_size_opt {
        verify_chunk_size(size)?;
    }

    let rate = match param["rate"].as_str() {
        Some(s) => Some(s.parse::<HumanByte>()?),
        None => None,
    };
    let burst = match param["burst"].as_str() {
        Some(s) => Some(s.parse::<HumanByte>()?),
        None => None,
    };

    let rate_limit = RateLimitConfig::with_same_inout(rate, burst);

    let crypto = crypto_parameters(&param)?;

    let backup_id = param["backup-id"].as_str().unwrap_or(proxmox_sys::nodename());

    let backup_type = param["backup-type"].as_str().unwrap_or("host");

    let include_dev = param["include-dev"].as_array();

    let entries_max = param["entries-max"].as_u64()
        .unwrap_or(pbs_client::pxar::ENCODER_MAX_ENTRIES as u64);

    let empty = Vec::new();
    let exclude_args = param["exclude"].as_array().unwrap_or(&empty);

    let mut pattern_list = Vec::with_capacity(exclude_args.len());
    for entry in exclude_args {
        let entry = entry.as_str().ok_or_else(|| format_err!("Invalid pattern string slice"))?;
        pattern_list.push(
            MatchEntry::parse_pattern(entry, PatternFlag::PATH_NAME, MatchType::Exclude)
                .map_err(|err| format_err!("invalid exclude pattern entry: {}", err))?
        );
    }

    let mut devices = if all_file_systems { None } else { Some(HashSet::new()) };
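
    // `devices == None` means no restriction (cross all file systems),
    // while `Some(set)` limits traversal to mountpoints whose st_dev is
    // contained in the set.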
    if let Some(include_dev) = include_dev {
        if all_file_systems {
            bail!("option 'all-file-systems' conflicts with option 'include-dev'");
        }

        let mut set = HashSet::new();
        for path in include_dev {
            let path = path.as_str().unwrap();
            let stat = nix::sys::stat::stat(path)
                .map_err(|err| format_err!("fstat {:?} failed - {}", path, err))?;
            set.insert(stat.st_dev);
        }
        devices = Some(set);
    }

    let mut upload_list = vec![];
    let mut target_set = HashSet::new();

    for backupspec in backupspec_list {
        let spec = parse_backup_specification(backupspec.as_str().unwrap())?;
        let filename = &spec.config_string;
        let target = &spec.archive_name;

        if target_set.contains(target) {
            bail!("got target twice: '{}'", target);
        }
        target_set.insert(target.to_string());

        use std::os::unix::fs::FileTypeExt;

        let metadata = std::fs::metadata(filename)
            .map_err(|err| format_err!("unable to access '{}' - {}", filename, err))?;
        let file_type = metadata.file_type();

        match spec.spec_type {
            BackupSpecificationType::PXAR => {
                if !file_type.is_dir() {
                    bail!("got unexpected file type (expected directory)");
                }
                upload_list.push((BackupSpecificationType::PXAR, filename.to_owned(), format!("{}.didx", target), 0));
            }
            BackupSpecificationType::IMAGE => {
                if !(file_type.is_file() || file_type.is_block_device()) {
                    bail!("got unexpected file type (expected file or block device)");
                }

                let size = image_size(&PathBuf::from(filename))?;

                if size == 0 { bail!("got zero-sized file '{}'", filename); }

                upload_list.push((BackupSpecificationType::IMAGE, filename.to_owned(), format!("{}.fidx", target), size));
            }
            BackupSpecificationType::CONFIG => {
                if !file_type.is_file() {
                    bail!("got unexpected file type (expected regular file)");
                }
                upload_list.push((BackupSpecificationType::CONFIG, filename.to_owned(), format!("{}.blob", target), metadata.len()));
            }
            BackupSpecificationType::LOGFILE => {
                if !file_type.is_file() {
                    bail!("got unexpected file type (expected regular file)");
                }
                upload_list.push((BackupSpecificationType::LOGFILE, filename.to_owned(), format!("{}.blob", target), metadata.len()));
            }
        }
    }

    let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);

    let client = connect_rate_limited(&repo, rate_limit)?;
    record_repository(&repo);

    println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)?);

    println!("Client name: {}", proxmox_sys::nodename());

    let start_time = std::time::Instant::now();

    println!("Starting backup protocol: {}", strftime_local("%c", epoch_i64())?);
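
    // Set up client-side encryption: decrypt the configured key (if any)
    // and, if a master public key is present, RSA-encrypt the key config
    // so it can be uploaded alongside the backup.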
    let (crypt_config, rsa_encrypted_key) = match crypto.enc_key {
        None => (None, None),
        Some(key_with_source) => {
            println!(
                "{}",
                format_key_source(&key_with_source.source, "encryption")
            );

            let (key, created, fingerprint) =
                decrypt_key(&key_with_source.key, &get_encryption_key_password)?;
            println!("Encryption key fingerprint: {}", fingerprint);

            let crypt_config = CryptConfig::new(key)?;

            match crypto.master_pubkey {
                Some(pem_with_source) => {
                    println!("{}", format_key_source(&pem_with_source.source, "master"));

                    let rsa = openssl::rsa::Rsa::public_key_from_pem(&pem_with_source.key)?;

                    let mut key_config = KeyConfig::without_password(key)?;
                    key_config.created = created; // keep original value

                    let enc_key = rsa_encrypt_key_config(rsa, &key_config)?;

                    (Some(Arc::new(crypt_config)), Some(enc_key))
                }
                _ => (Some(Arc::new(crypt_config)), None),
            }
        }
    };

    let client = BackupWriter::start(
        client,
        crypt_config.clone(),
        repo.store(),
        backup_type,
        backup_id,
        backup_time,
        verbose,
        false
    ).await?;

    let download_previous_manifest = match client.previous_backup_time().await {
        Ok(Some(backup_time)) => {
            println!(
                "Downloading previous manifest ({})",
                strftime_local("%c", backup_time)?
            );
            true
        }
        Ok(None) => {
            println!("No previous manifest available.");
            false
        }
        Err(_) => {
            // Fallback for outdated server, TODO remove/bubble up with 2.0
            true
        }
    };

    let previous_manifest = if download_previous_manifest {
        match client.download_previous_manifest().await {
            Ok(previous_manifest) => {
                match previous_manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref)) {
                    Ok(()) => Some(Arc::new(previous_manifest)),
                    Err(err) => {
                        println!("Couldn't re-use previous manifest - {}", err);
                        None
                    }
                }
            }
            Err(err) => {
                println!("Couldn't download previous manifest - {}", err);
                None
            }
        }
    } else {
        None
    };

    let snapshot = BackupDir::new(backup_type, backup_id, backup_time)?;
    let mut manifest = BackupManifest::new(snapshot);

    let mut catalog = None;
    let mut catalog_result_rx = None;

    for (backup_type, filename, target, size) in upload_list {
        match backup_type {
            BackupSpecificationType::CONFIG => {
                let upload_options = UploadOptions {
                    compress: true,
                    encrypt: crypto.mode == CryptMode::Encrypt,
                    ..UploadOptions::default()
                };

                println!("Upload config file '{}' to '{}' as {}", filename, repo, target);
                let stats = client
                    .upload_blob_from_file(&filename, &target, upload_options)
                    .await?;
                manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
            }
            BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ?
                let upload_options = UploadOptions {
                    compress: true,
                    encrypt: crypto.mode == CryptMode::Encrypt,
                    ..UploadOptions::default()
                };

                println!("Upload log file '{}' to '{}' as {}", filename, repo, target);
                let stats = client
                    .upload_blob_from_file(&filename, &target, upload_options)
                    .await?;
                manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
            }
            BackupSpecificationType::PXAR => {
                // start catalog upload on first use
                if catalog.is_none() {
                    let catalog_upload_res = spawn_catalog_upload(client.clone(), crypto.mode == CryptMode::Encrypt)?;
                    catalog = Some(catalog_upload_res.catalog_writer);
                    catalog_result_rx = Some(catalog_upload_res.result);
                }
                let catalog = catalog.as_ref().unwrap();

                println!("Upload directory '{}' to '{}' as {}", filename, repo, target);
                catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;

                let pxar_options = pbs_client::pxar::PxarCreateOptions {
                    device_set: devices.clone(),
                    patterns: pattern_list.clone(),
                    entries_max: entries_max as usize,
                    skip_lost_and_found,
                    verbose,
                };

                let upload_options = UploadOptions {
                    previous_manifest: previous_manifest.clone(),
                    compress: true,
                    encrypt: crypto.mode == CryptMode::Encrypt,
                    ..UploadOptions::default()
                };

                let stats = backup_directory(
                    &client,
                    &filename,
                    &target,
                    chunk_size_opt,
                    catalog.clone(),
                    pxar_options,
                    upload_options,
                ).await?;
                manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
                catalog.lock().unwrap().end_directory()?;
            }
            BackupSpecificationType::IMAGE => {
                println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);

                let upload_options = UploadOptions {
                    previous_manifest: previous_manifest.clone(),
                    fixed_size: Some(size),
                    compress: true,
                    encrypt: crypto.mode == CryptMode::Encrypt,
                    ..UploadOptions::default()
                };

                let stats = backup_image(
                    &client,
                    &filename,
                    &target,
                    chunk_size_opt,
                    upload_options,
                ).await?;
                manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
            }
        }
    }

    // finalize and upload catalog
    if let Some(catalog) = catalog {
        let mutex = Arc::try_unwrap(catalog)
            .map_err(|_| format_err!("unable to get catalog (still used)"))?;
        let mut catalog = mutex.into_inner().unwrap();

        catalog.finish()?;

        drop(catalog); // close upload stream

        if let Some(catalog_result_rx) = catalog_result_rx {
            let stats = catalog_result_rx.await??;
            manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum, crypto.mode)?;
        }
    }

    if let Some(rsa_encrypted_key) = rsa_encrypted_key {
        let target = ENCRYPTED_KEY_BLOB_NAME;
        println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
        let options = UploadOptions { compress: false, encrypt: false, ..UploadOptions::default() };
        let stats = client
            .upload_blob_from_data(rsa_encrypted_key, target, options)
            .await?;
        manifest.add_file(target.to_string(), stats.size, stats.csum, crypto.mode)?;
    }
    // create manifest (index.json)
    // manifests are never encrypted, but include a signature
    let manifest = manifest.to_string(crypt_config.as_ref().map(Arc::as_ref))
        .map_err(|err| format_err!("unable to format manifest - {}", err))?;

    if verbose { println!("Upload index.json to '{}'", repo) };
    let options = UploadOptions { compress: true, encrypt: false, ..UploadOptions::default() };
    client
        .upload_blob_from_data(manifest.into_bytes(), MANIFEST_BLOB_NAME, options)
        .await?;

    client.finish().await?;

    let end_time = std::time::Instant::now();
    let elapsed = end_time.duration_since(start_time);
    println!("Duration: {:.2}s", elapsed.as_secs_f64());

    println!("End Time: {}", strftime_local("%c", epoch_i64())?);

    Ok(Value::Null)
}

async fn dump_image<W: Write>(
    client: Arc<BackupReader>,
    crypt_config: Option<Arc<CryptConfig>>,
    crypt_mode: CryptMode,
    index: FixedIndexReader,
    mut writer: W,
    verbose: bool,
) -> Result<(), Error> {

    let most_used = index.find_most_used_chunks(8);

    let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, crypt_mode, most_used);

    // Note: we avoid using BufferedFixedReader, because that adds an additional buffer/copy
    // and thus slows down reading. Instead, read chunks directly via the RemoteChunkReader.

    let mut per = 0;
    let mut bytes = 0;
    let start_time = std::time::Instant::now();

    for pos in 0..index.index_count() {
        let digest = index.index_digest(pos).unwrap();
        let raw_data = chunk_reader.read_chunk(&digest).await?;
        writer.write_all(&raw_data)?;
        bytes += raw_data.len();
        if verbose {
            let next_per = ((pos+1)*100)/index.index_count();
            if per != next_per {
                eprintln!("progress {}% (read {} bytes, duration {} sec)",
                          next_per, bytes, start_time.elapsed().as_secs());
                per = next_per;
            }
        }
    }

    let end_time = std::time::Instant::now();
    let elapsed = end_time.duration_since(start_time);
    eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
              bytes,
              elapsed.as_secs_f64(),
              bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
    );

    Ok(())
}
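
/// Map a user-supplied archive name to its server-side file name and
/// ArchiveType, e.g. "root.pxar" -> ("root.pxar.didx", DynamicIndex) and
/// "disk.img" -> ("disk.img.fidx", FixedIndex); names without a known
/// extension are treated as "<name>.blob".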
fn parse_archive_type(name: &str) -> (String, ArchiveType) {
    if name.ends_with(".didx") || name.ends_with(".fidx") || name.ends_with(".blob") {
        (name.into(), archive_type(name).unwrap())
    } else if name.ends_with(".pxar") {
        (format!("{}.didx", name), ArchiveType::DynamicIndex)
    } else if name.ends_with(".img") {
        (format!("{}.fidx", name), ArchiveType::FixedIndex)
    } else {
        (format!("{}.blob", name), ArchiveType::Blob)
    }
}

#[api(
    input: {
        properties: {
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
            snapshot: {
                type: String,
                description: "Group/Snapshot path.",
            },
            "archive-name": {
                type: String,
                description: "Backup archive name.",
            },
            target: {
                type: String,
                description: r###"Target directory path. Use '-' to write to standard output.

We do not extract '.pxar' archives when writing to standard output.

"###
            },
            rate: {
                schema: TRAFFIC_CONTROL_RATE_SCHEMA,
                optional: true,
            },
            burst: {
                schema: TRAFFIC_CONTROL_BURST_SCHEMA,
                optional: true,
            },
            "allow-existing-dirs": {
                type: Boolean,
                description: "Do not fail if directories already exist.",
                optional: true,
            },
            keyfile: {
                schema: KEYFILE_SCHEMA,
                optional: true,
            },
            "keyfd": {
                schema: KEYFD_SCHEMA,
                optional: true,
            },
            "crypt-mode": {
                type: CryptMode,
                optional: true,
            },
        }
    }
)]
/// Restore backup repository.
async fn restore(param: Value) -> Result<Value, Error> {
    let repo = extract_repository_from_value(&param)?;

    let verbose = param["verbose"].as_bool().unwrap_or(false);

    let allow_existing_dirs = param["allow-existing-dirs"].as_bool().unwrap_or(false);

    let archive_name = json::required_string_param(&param, "archive-name")?;

    let rate = match param["rate"].as_str() {
        Some(s) => Some(s.parse::<HumanByte>()?),
        None => None,
    };
    let burst = match param["burst"].as_str() {
        Some(s) => Some(s.parse::<HumanByte>()?),
        None => None,
    };

    let rate_limit = RateLimitConfig::with_same_inout(rate, burst);

    let client = connect_rate_limited(&repo, rate_limit)?;
    record_repository(&repo);

    let path = json::required_string_param(&param, "snapshot")?;

    let (backup_type, backup_id, backup_time) = if path.matches('/').count() == 1 {
        let group: BackupGroup = path.parse()?;
        api_datastore_latest_snapshot(&client, repo.store(), group).await?
    } else {
        let snapshot: BackupDir = path.parse()?;
        (snapshot.group().backup_type().to_owned(), snapshot.group().backup_id().to_owned(), snapshot.backup_time())
    };

    let target = json::required_string_param(&param, "target")?;
    let target = if target == "-" { None } else { Some(target) };

    let crypto = crypto_parameters(&param)?;

    let crypt_config = match crypto.enc_key {
        None => None,
        Some(ref key) => {
            let (key, _, _) =
                decrypt_key(&key.key, &get_encryption_key_password).map_err(|err| {
                    eprintln!("{}", format_key_source(&key.source, "encryption"));
                    err
                })?;
            Some(Arc::new(CryptConfig::new(key)?))
        }
    };

    let client = BackupReader::start(
        client,
        crypt_config.clone(),
        repo.store(),
        &backup_type,
        &backup_id,
        backup_time,
        true,
    ).await?;

    let (archive_name, archive_type) = parse_archive_type(archive_name);

    let (manifest, backup_index_data) = client.download_manifest().await?;

    if archive_name == ENCRYPTED_KEY_BLOB_NAME && crypt_config.is_none() {
        eprintln!("Restoring encrypted key blob without original key - skipping manifest fingerprint check!")
    } else {
        if manifest.signature.is_some() {
            if let Some(key) = &crypto.enc_key {
                eprintln!("{}", format_key_source(&key.source, "encryption"));
            }
            if let Some(config) = &crypt_config {
                eprintln!("Fingerprint: {}", Fingerprint::new(config.fingerprint()));
            }
        }
        manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
    }

    if archive_name == MANIFEST_BLOB_NAME {
        if let Some(target) = target {
            replace_file(target, &backup_index_data, CreateOptions::new(), false)?;
        } else {
            let stdout = std::io::stdout();
            let mut writer = stdout.lock();
            writer.write_all(&backup_index_data)
                .map_err(|err| format_err!("unable to pipe data - {}", err))?;
        }

        return Ok(Value::Null);
    }

    let file_info = manifest.lookup_file_info(&archive_name)?;

    if archive_type == ArchiveType::Blob {

        let mut reader = client.download_blob(&manifest, &archive_name).await?;

        if let Some(target) = target {
            let mut writer = std::fs::OpenOptions::new()
                .write(true)
                .create(true)
                .create_new(true)
                .open(target)
                .map_err(|err| format_err!("unable to create target file {:?} - {}", target, err))?;
            std::io::copy(&mut reader, &mut writer)?;
        } else {
            let stdout = std::io::stdout();
            let mut writer = stdout.lock();
            std::io::copy(&mut reader, &mut writer)
                .map_err(|err| format_err!("unable to pipe data - {}", err))?;
        }

    } else if archive_type == ArchiveType::DynamicIndex {

        let index = client.download_dynamic_index(&manifest, &archive_name).await?;

        let most_used = index.find_most_used_chunks(8);

        let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), most_used);

        let mut reader = BufferedDynamicReader::new(index, chunk_reader);

        let options = pbs_client::pxar::PxarExtractOptions {
            match_list: &[],
            extract_match_default: true,
            allow_existing_dirs,
            on_error: None,
        };

        if let Some(target) = target {
            pbs_client::pxar::extract_archive(
                pxar::decoder::Decoder::from_std(reader)?,
                Path::new(target),
                pbs_client::pxar::Flags::DEFAULT,
                |path| {
                    if verbose {
                        println!("{:?}", path);
                    }
                },
                options,
            )
            .map_err(|err| format_err!("error extracting archive - {}", err))?;
        } else {
            let mut writer = std::fs::OpenOptions::new()
                .write(true)
                .open("/dev/stdout")
                .map_err(|err| format_err!("unable to open /dev/stdout - {}", err))?;

            std::io::copy(&mut reader, &mut writer)
                .map_err(|err| format_err!("unable to pipe data - {}", err))?;
        }
    } else if archive_type == ArchiveType::FixedIndex {

        let index = client.download_fixed_index(&manifest, &archive_name).await?;

        let mut writer = if let Some(target) = target {
            std::fs::OpenOptions::new()
                .write(true)
                .create(true)
                .create_new(true)
                .open(target)
                .map_err(|err| format_err!("unable to create target file {:?} - {}", target, err))?
        } else {
            std::fs::OpenOptions::new()
                .write(true)
                .open("/dev/stdout")
                .map_err(|err| format_err!("unable to open /dev/stdout - {}", err))?
        };

        dump_image(client.clone(), crypt_config.clone(), file_info.chunk_crypt_mode(), index, &mut writer, verbose).await?;
    }

    Ok(Value::Null)
}

#[api(
    input: {
        properties: {
            "dry-run": {
                type: bool,
                optional: true,
                description: "Just show what prune would do, but do not delete anything.",
            },
            group: {
                type: String,
                description: "Backup group",
            },
            "prune-options": {
                type: PruneOptions,
                flatten: true,
            },
            "output-format": {
                schema: OUTPUT_FORMAT,
                optional: true,
            },
            quiet: {
                type: bool,
                optional: true,
                default: false,
                description: "Minimal output - only show removals.",
            },
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
        }
    }
)]
/// Prune a backup repository.
async fn prune(
    dry_run: Option<bool>,
    group: String,
    prune_options: PruneOptions,
    quiet: bool,
    mut param: Value,
) -> Result<Value, Error> {
    let repo = extract_repository_from_value(&param)?;

    let client = connect(&repo)?;

    let path = format!("api2/json/admin/datastore/{}/prune", repo.store());

    let group: BackupGroup = group.parse()?;

    let output_format = extract_output_format(&mut param);

    let mut api_param = serde_json::to_value(prune_options)?;
    if let Some(dry_run) = dry_run {
        api_param["dry-run"] = dry_run.into();
    }
    api_param["backup-type"] = group.backup_type().into();
    api_param["backup-id"] = group.backup_id().into();

    let mut result = client.post(&path, Some(api_param)).await?;

    record_repository(&repo);

    let render_snapshot_path = |_v: &Value, record: &Value| -> Result<String, Error> {
        let item: PruneListItem = serde_json::from_value(record.to_owned())?;
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
        Ok(snapshot.relative_path().to_str().unwrap().to_owned())
    };

    let render_prune_action = |v: &Value, _record: &Value| -> Result<String, Error> {
        Ok(match v.as_bool() {
            Some(true) => "keep",
            Some(false) => "remove",
            None => "unknown",
        }.to_string())
    };

    let options = default_table_format_options()
        .sortby("backup-type", false)
        .sortby("backup-id", false)
        .sortby("backup-time", false)
        .column(ColumnConfig::new("backup-id").renderer(render_snapshot_path).header("snapshot"))
        .column(ColumnConfig::new("backup-time").renderer(pbs_tools::format::render_epoch).header("date"))
        .column(ColumnConfig::new("keep").renderer(render_prune_action).header("action"));

    let return_type = &pbs_api_types::ADMIN_DATASTORE_PRUNE_RETURN_TYPE;

    let mut data = result["data"].take();

    if quiet {
        let list: Vec<Value> = data.as_array().unwrap().iter().filter(|item| {
            item["keep"].as_bool() == Some(false)
        }).cloned().collect();
        data = list.into();
    }

    format_and_print_result_full(&mut data, return_type, &output_format, &options);

    Ok(Value::Null)
}

#[api(
    input: {
        properties: {
            repository: {
                schema: REPO_URL_SCHEMA,
                optional: true,
            },
            "output-format": {
                schema: OUTPUT_FORMAT,
                optional: true,
            },
        }
    },
    returns: {
        type: StorageStatus,
    },
)]
/// Get repository status.
async fn status(param: Value) -> Result<Value, Error> {

    let repo = extract_repository_from_value(&param)?;

    let output_format = get_output_format(&param);

    let client = connect(&repo)?;

    let path = format!("api2/json/admin/datastore/{}/status", repo.store());

    let mut result = client.get(&path, None).await?;
    let mut data = result["data"].take();

    record_repository(&repo);

    let render_total_percentage = |v: &Value, record: &Value| -> Result<String, Error> {
        let v = v.as_u64().unwrap();
        let total = record["total"].as_u64().unwrap();
        let roundup = total/200;
        let per = ((v+roundup)*100)/total;
        let info = format!(" ({} %)", per);
        Ok(format!("{} {:>8}", v, info))
    };

    let options = default_table_format_options()
        .noheader(true)
        .column(ColumnConfig::new("total").renderer(render_total_percentage))
        .column(ColumnConfig::new("used").renderer(render_total_percentage))
        .column(ColumnConfig::new("avail").renderer(render_total_percentage));

    let return_type = &API_METHOD_STATUS.returns;

    format_and_print_result_full(&mut data, return_type, &output_format, &options);

    Ok(Value::Null)
}

/// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
/// async use!
///
/// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
/// so that we can properly access it from multiple threads simultaneously while not issuing
/// duplicate simultaneous reads over http.
pub struct BufferedDynamicReadAt {
    inner: Mutex<BufferedDynamicReader<RemoteChunkReader>>,
}

impl BufferedDynamicReadAt {
    fn new(inner: BufferedDynamicReader<RemoteChunkReader>) -> Self {
        Self {
            inner: Mutex::new(inner),
        }
    }
}

impl ReadAt for BufferedDynamicReadAt {
    fn start_read_at<'a>(
        self: Pin<&'a Self>,
        _cx: &mut Context,
        buf: &'a mut [u8],
        offset: u64,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        MaybeReady::Ready(tokio::task::block_in_place(move || {
            let mut reader = self.inner.lock().unwrap();
            reader.seek(SeekFrom::Start(offset))?;
            Ok(reader.read(buf)?)
        }))
    }

    fn poll_complete<'a>(
        self: Pin<&'a Self>,
        _op: ReadAtOperation<'a>,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        panic!("BufferedDynamicReadAt::start_read_at returned Pending");
    }
}

fn main() {

    let backup_cmd_def = CliCommand::new(&API_METHOD_CREATE_BACKUP)
        .arg_param(&["backupspec"])
        .completion_cb("repository", complete_repository)
        .completion_cb("backupspec", complete_backup_source)
        .completion_cb("keyfile", complete_file_name)
        .completion_cb("master-pubkey-file", complete_file_name)
        .completion_cb("chunk-size", complete_chunk_size);

    let benchmark_cmd_def = CliCommand::new(&API_METHOD_BENCHMARK)
        .completion_cb("repository", complete_repository)
        .completion_cb("keyfile", complete_file_name);

    let list_cmd_def = CliCommand::new(&API_METHOD_LIST_BACKUP_GROUPS)
        .completion_cb("repository", complete_repository);

    let garbage_collect_cmd_def = CliCommand::new(&API_METHOD_START_GARBAGE_COLLECTION)
        .completion_cb("repository", complete_repository);

    let restore_cmd_def = CliCommand::new(&API_METHOD_RESTORE)
        .arg_param(&["snapshot", "archive-name", "target"])
        .completion_cb("repository", complete_repository)
        .completion_cb("snapshot", complete_group_or_snapshot)
        .completion_cb("archive-name", complete_archive_name)
        .completion_cb("target", complete_file_name);

    let prune_cmd_def = CliCommand::new(&API_METHOD_PRUNE)
        .arg_param(&["group"])
        .completion_cb("group", complete_backup_group)
        .completion_cb("repository", complete_repository);

    let status_cmd_def = CliCommand::new(&API_METHOD_STATUS)
        .completion_cb("repository", complete_repository);

    let login_cmd_def = CliCommand::new(&API_METHOD_API_LOGIN)
        .completion_cb("repository", complete_repository);

    let logout_cmd_def = CliCommand::new(&API_METHOD_API_LOGOUT)
        .completion_cb("repository", complete_repository);

    let version_cmd_def = CliCommand::new(&API_METHOD_API_VERSION)
        .completion_cb("repository", complete_repository);

    let change_owner_cmd_def = CliCommand::new(&API_METHOD_CHANGE_BACKUP_OWNER)
        .arg_param(&["group", "new-owner"])
        .completion_cb("group", complete_backup_group)
        .completion_cb("new-owner", complete_auth_id)
        .completion_cb("repository", complete_repository);

    let cmd_def = CliCommandMap::new()
        .insert("backup", backup_cmd_def)
        .insert("garbage-collect", garbage_collect_cmd_def)
        .insert("list", list_cmd_def)
        .insert("login", login_cmd_def)
        .insert("logout", logout_cmd_def)
        .insert("prune", prune_cmd_def)
        .insert("restore", restore_cmd_def)
        .insert("snapshot", snapshot_mgtm_cli())
        .insert("status", status_cmd_def)
        .insert("key", key::cli())
        .insert("mount", mount_cmd_def())
        .insert("map", map_cmd_def())
        .insert("unmap", unmap_cmd_def())
        .insert("catalog", catalog_mgmt_cli())
        .insert("task", task_mgmt_cli())
        .insert("version", version_cmd_def)
        .insert("benchmark", benchmark_cmd_def)
        .insert("change-owner", change_owner_cmd_def)
        .alias(&["files"], &["snapshot", "files"])
        .alias(&["forget"], &["snapshot", "forget"])
        .alias(&["upload-log"], &["snapshot", "upload-log"])
        .alias(&["snapshots"], &["snapshot", "list"]);

    let rpcenv = CliEnvironment::new();
    run_cli_command(cmd_def, rpcenv, Some(|future| {
        proxmox_async::runtime::main(future)
    }));
}
)