use std::path::{Path, PathBuf};
use std::ffi::OsStr;
use std::collections::{HashMap, HashSet, BTreeMap};
use std::convert::TryFrom;
use std::io::{Seek, SeekFrom};
use std::sync::Arc;

use anyhow::{bail, format_err, Error};
use serde_json::Value;
use proxmox::{
    api::{
        api, Permission, Router, RpcEnvironment, RpcEnvironmentType,
        schema::parse_property_string,
        section_config::SectionConfigData,
    },
    tools::{
        Uuid,
        io::ReadExt,
        fs::{replace_file, CreateOptions},
    },
};
use pbs_api_types::{
    Authid, Userid, CryptMode,
    DATASTORE_MAP_ARRAY_SCHEMA, DATASTORE_MAP_LIST_SCHEMA, DRIVE_NAME_SCHEMA,
    UPID_SCHEMA, TAPE_RESTORE_SNAPSHOT_SCHEMA,
    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_TAPE_READ,
};
use pbs_datastore::{DataStore, DataBlob};
use pbs_datastore::backup_info::BackupDir;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
use pbs_config::CachedUserInfo;
use pbs_tape::{
    TapeRead, BlockReadError, MediaContentHeader,
    PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0,
};
use pbs_tools::{task_log, task_warn, task::WorkerTaskContext};
use proxmox_rest_server::WorkerTask;
use crate::{
    tools::ParallelHandler,
    server::lookup_user_email,
    tape::{
        TAPE_STATUS_DIR,
        MediaId,
        MediaSet,
        MediaCatalog,
        MediaSetCatalog,
        Inventory,
        lock_media_set,
        file_formats::{
            PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0,
            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0,
            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1,
            PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0,
            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0,
            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1,
            PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0,
            ChunkArchiveHeader,
            ChunkArchiveDecoder,
            SnapshotArchiveHeader,
            CatalogArchiveHeader,
        },
        drive::{
            TapeDriver,
            request_and_load_media,
            lock_tape_device,
            set_tape_device_state,
        },
    },
};
const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
pub struct DataStoreMap {
    map: HashMap<String, Arc<DataStore>>,
    default: Option<Arc<DataStore>>,
}
impl TryFrom<String> for DataStoreMap {
    type Error = Error;

    fn try_from(value: String) -> Result<Self, Error> {
        let value = parse_property_string(&value, &DATASTORE_MAP_ARRAY_SCHEMA)?;
        let mut mapping: Vec<String> = value
            .as_array()
            .unwrap()
            .iter()
            .map(|v| v.as_str().unwrap().to_string())
            .collect();

        let mut map = HashMap::new();
        let mut default = None;
        while let Some(mut store) = mapping.pop() {
            if let Some(index) = store.find('=') {
                let mut target = store.split_off(index);
                target.remove(0); // remove '='
                let datastore = DataStore::lookup_datastore(&target)?;
                map.insert(store, datastore);
            } else if default.is_none() {
                default = Some(DataStore::lookup_datastore(&store)?);
            } else {
                bail!("multiple default stores given");
            }
        }

        Ok(Self { map, default })
    }
}
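// Illustrative example (not part of the original source): the property
// string "store1=tank,store2" maps the source datastore "store1" to the
// local datastore "tank" and makes "store2" the default target, so
// `map.get_datastore("store1")` yields "tank" while any other source
// datastore falls back to "store2".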
impl DataStoreMap {
    fn used_datastores<'a>(&self) -> HashSet<&str> {
        let mut set = HashSet::new();
        for store in self.map.values() {
            set.insert(store.name());
        }

        if let Some(ref store) = self.default {
            set.insert(store.name());
        }

        set
    }

    fn get_datastore(&self, source: &str) -> Option<Arc<DataStore>> {
        if let Some(store) = self.map.get(source) {
            return Some(Arc::clone(store));
        }
        if let Some(ref store) = self.default {
            return Some(Arc::clone(store));
        }

        None
    }
}
fn check_datastore_privs(
    user_info: &CachedUserInfo,
    store: &str,
    auth_id: &Authid,
    owner: &Option<Authid>,
) -> Result<(), Error> {
    let privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
    if (privs & PRIV_DATASTORE_BACKUP) == 0 {
        bail!("no permissions on /datastore/{}", store);
    }

    if let Some(ref owner) = owner {
        let correct_owner = owner == auth_id
            || (owner.is_token() && !auth_id.is_token() && owner.user() == auth_id.user());

        // same permission as changing ownership after syncing
        if !correct_owner && privs & PRIV_DATASTORE_MODIFY == 0 {
            bail!("no permission to restore as '{}'", owner);
        }
    }

    Ok(())
}
pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
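// The route only exposes a POST handler; the accepted parameters (store
// mapping, drive, media-set UUID, plus the optional snapshots, notify-user
// and owner fields) are documented in the #[api] block below.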
#[api(
    input: {
        properties: {
            store: {
                schema: DATASTORE_MAP_LIST_SCHEMA,
            },
            drive: {
                schema: DRIVE_NAME_SCHEMA,
            },
            "media-set": {
                description: "Media set UUID.",
                type: String,
            },
            "notify-user": {
                type: Userid,
                optional: true,
            },
            snapshots: {
                description: "List of snapshots.",
                type: Array,
                optional: true,
                items: {
                    schema: TAPE_RESTORE_SNAPSHOT_SCHEMA,
                },
            },
            owner: {
                type: Authid,
                optional: true,
            },
        },
    },
    returns: {
        schema: UPID_SCHEMA,
    },
    access: {
        // Note: parameters are not URI parameters, so we need to check them inside the function body
        description: "The user needs Tape.Read privilege on /tape/pool/{pool} \
            and /tape/drive/{drive}, Datastore.Backup privilege on /datastore/{store}.",
        permission: &Permission::Anybody,
    },
)]
/// Restore data from media-set
pub fn restore(
    store: String,
    drive: String,
    media_set: String,
    notify_user: Option<Userid>,
    snapshots: Option<Vec<String>>,
    owner: Option<Authid>,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
    let user_info = CachedUserInfo::new()?;

    let store_map = DataStoreMap::try_from(store)
        .map_err(|err| format_err!("cannot parse store mapping: {}", err))?;
    let used_datastores = store_map.used_datastores();
    if used_datastores.is_empty() {
        bail!("no datastores given");
    }

    for store in used_datastores.iter() {
        check_datastore_privs(&user_info, &store, &auth_id, &owner)?;
    }
    let privs = user_info.lookup_privs(&auth_id, &["tape", "drive", &drive]);
    if (privs & PRIV_TAPE_READ) == 0 {
        bail!("no permissions on /tape/drive/{}", drive);
    }

    let media_set_uuid = media_set.parse()?;

    let status_path = Path::new(TAPE_STATUS_DIR);

    let _lock = lock_media_set(status_path, &media_set_uuid, None)?;

    let inventory = Inventory::load(status_path)?;

    let pool = inventory.lookup_media_set_pool(&media_set_uuid)?;

    let privs = user_info.lookup_privs(&auth_id, &["tape", "pool", &pool]);
    if (privs & PRIV_TAPE_READ) == 0 {
        bail!("no permissions on /tape/pool/{}", pool);
    }

    let (drive_config, _digest) = pbs_config::drive::config()?;

    // early check/lock before starting worker
    let drive_lock = lock_tape_device(&drive_config, &drive)?;

    let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;

    let taskid = used_datastores
        .iter()
        .map(|s| s.to_string())
        .collect::<Vec<String>>()
        .join(", ");

    let upid_str = WorkerTask::new_thread(
        "tape-restore",
        Some(taskid),
        auth_id.to_string(),
        to_stdout,
        move |worker| {
            let _drive_lock = drive_lock; // keep lock guard

            set_tape_device_state(&drive, &worker.upid().to_string())?;

            let restore_owner = owner.as_ref().unwrap_or(&auth_id);

            let email = notify_user
                .as_ref()
                .and_then(|userid| lookup_user_email(userid))
                .or_else(|| lookup_user_email(&auth_id.clone().into()));

            task_log!(worker, "Mediaset '{}'", media_set);
            task_log!(worker, "Pool: {}", pool);
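            // Either restore only the explicitly requested snapshots, or the
            // complete media set.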
            let res = if let Some(snapshots) = snapshots {
                restore_list_worker(
                    worker.clone(),
                    snapshots,
                    inventory,
                    media_set_uuid,
                    drive_config,
                    &drive,
                    store_map,
                    restore_owner,
                    email,
                )
            } else {
                restore_full_worker(
                    worker.clone(),
                    inventory,
                    media_set_uuid,
                    drive_config,
                    &drive,
                    store_map,
                    restore_owner,
                    email,
                )
            };
            if res.is_ok() {
                task_log!(worker, "Restore mediaset '{}' done", media_set);
            }
            if let Err(err) = set_tape_device_state(&drive, "") {
                task_log!(
                    worker,
                    "could not unset drive state for {}: {}",
                    drive,
                    err
                );
            }

            res
        },
    )?;

    Ok(upid_str.into())
}
324 "could not unset drive state for {}: {}",
337 fn restore_full_worker(
338 worker
: Arc
<WorkerTask
>,
339 inventory
: Inventory
,
340 media_set_uuid
: Uuid
,
341 drive_config
: SectionConfigData
,
343 store_map
: DataStoreMap
,
344 restore_owner
: &Authid
,
345 email
: Option
<String
>,
346 ) -> Result
<(), Error
> {
347 let members
= inventory
.compute_media_set_members(&media_set_uuid
)?
;
349 let media_list
= members
.media_list();
351 let mut media_id_list
= Vec
::new();
353 let mut encryption_key_fingerprint
= None
;
355 for (seq_nr
, media_uuid
) in media_list
.iter().enumerate() {
358 bail
!("media set {} is incomplete (missing member {}).", media_set_uuid
, seq_nr
);
360 Some(media_uuid
) => {
361 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
362 if let Some(ref set
) = media_id
.media_set_label
{ // always true here
363 if encryption_key_fingerprint
.is_none() && set
.encryption_key_fingerprint
.is_some() {
364 encryption_key_fingerprint
= set
.encryption_key_fingerprint
.clone();
367 media_id_list
.push(media_id
);
372 if let Some(fingerprint
) = encryption_key_fingerprint
{
373 task_log
!(worker
, "Encryption key fingerprint: {}", fingerprint
);
383 .collect
::<Vec
<String
>>()
387 task_log
!(worker
, "Drive: {}", drive_name
);
390 "Required media list: {}",
392 .map(|media_id
| media_id
.label
.label_text
.as_str())
393 .collect
::<Vec
<&str>>()
397 let mut datastore_locks
= Vec
::new();
398 for store_name
in store_map
.used_datastores() {
399 // explicit create shared lock to prevent GC on newly created chunks
400 if let Some(store
) = store_map
.get_datastore(store_name
) {
401 let shared_store_lock
= store
.try_shared_chunk_store_lock()?
;
402 datastore_locks
.push(shared_store_lock
);
406 let mut checked_chunks_map
= HashMap
::new();
408 for media_id
in media_id_list
.iter() {
409 request_and_restore_media(
415 &mut checked_chunks_map
,
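// Restore only selected snapshots from a media set. This runs in three
// phases (mirrored by the task log messages below): snapshots are first
// restored to a temporary directory, then the referenced chunks are written
// to the target datastores, and finally the snapshot files are copied into
// place.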
fn restore_list_worker(
    worker: Arc<WorkerTask>,
    snapshots: Vec<String>,
    inventory: Inventory,
    media_set_uuid: Uuid,
    drive_config: SectionConfigData,
    drive_name: &str,
    store_map: DataStoreMap,
    restore_owner: &Authid,
    email: Option<String>,
) -> Result<(), Error> {
    let base_path: PathBuf = format!("{}/{}", RESTORE_TMP_DIR, media_set_uuid).into();
    std::fs::create_dir_all(&base_path)?;

    let catalog = get_media_set_catalog(&inventory, &media_set_uuid)?;

    let mut datastore_locks = Vec::new();
    let mut snapshot_file_hash: BTreeMap<Uuid, Vec<u64>> = BTreeMap::new();
    let mut snapshot_locks = HashMap::new();

    let res = proxmox::try_block!({
        // assemble snapshot files/locks
        for store_snapshot in snapshots.iter() {
            let mut split = store_snapshot.splitn(2, ':');
            let source_datastore = split
                .next()
                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
            let snapshot = split
                .next()
                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
            let backup_dir: BackupDir = snapshot.parse()?;
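            // The spec parsed above pairs a source datastore and a snapshot
            // path separated by ':', e.g. "store1:host/pbs/2021-01-01T00:00:00Z"
            // (illustrative values).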
            let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
                format_err!(
                    "could not find mapping for source datastore: {}",
                    source_datastore
                )
            })?;

            let (owner, _group_lock) =
                datastore.create_locked_backup_group(backup_dir.group(), &restore_owner)?;
            if restore_owner != &owner {
                // only the owner is allowed to create additional snapshots
                bail!(
                    "restore '{}' failed - owner check failed ({} != {})",
                    store_snapshot, restore_owner, owner
                );
            }
            let (media_id, file_num) = if let Some((media_uuid, file_num)) =
                catalog.lookup_snapshot(&source_datastore, &snapshot)
            {
                let media_id = inventory.lookup_media(media_uuid).unwrap();
                (media_id, file_num)
            } else {
                bail!(
                    "did not find snapshot '{}' in media set {}",
                    store_snapshot, media_set_uuid
                );
            };

            let (_rel_path, is_new, snap_lock) = datastore.create_locked_backup_dir(&backup_dir)?;

            if !is_new {
                task_log!(
                    worker,
                    "found snapshot {} on target datastore, skipping...",
                    store_snapshot
                );
                continue;
            }

            snapshot_locks.insert(store_snapshot.to_string(), snap_lock);

            let shared_store_lock = datastore.try_shared_chunk_store_lock()?;
            datastore_locks.push(shared_store_lock);

            let file_list = snapshot_file_hash
                .entry(media_id.label.uuid.clone())
                .or_insert_with(Vec::new);
            file_list.push(file_num);

            task_log!(
                worker,
                "found snapshot {} on {}: file {}",
                store_snapshot,
                media_id.label.label_text,
                file_num
            );
        }
        if snapshot_file_hash.is_empty() {
            task_log!(worker, "nothing to restore, skipping remaining phases...");
            return Ok(());
        }

        task_log!(worker, "Phase 1: temporarily restore snapshots to temp dir");
        let mut datastore_chunk_map: HashMap<String, HashSet<[u8; 32]>> = HashMap::new();
        for (media_uuid, file_list) in snapshot_file_hash.iter_mut() {
            let media_id = inventory.lookup_media(media_uuid).unwrap();
            let (drive, info) = request_and_load_media(
                &worker,
                &drive_config,
                &drive_name,
                &media_id.label,
                &email,
            )?;
            file_list.sort_unstable();
            restore_snapshots_to_tmpdir(
                worker.clone(),
                &base_path,
                file_list,
                drive,
                &info,
                &media_set_uuid,
                &mut datastore_chunk_map,
            ).map_err(|err| format_err!("could not restore snapshots to tmpdir: {}", err))?;
        }
        // sorted media_uuid => (sorted file_num => (set of digests))
        let mut media_file_chunk_map: BTreeMap<Uuid, BTreeMap<u64, HashSet<[u8; 32]>>> = BTreeMap::new();

        for (source_datastore, chunks) in datastore_chunk_map.into_iter() {
            let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
                format_err!(
                    "could not find mapping for source datastore: {}",
                    source_datastore
                )
            })?;
            for digest in chunks.into_iter() {
                // we only want to restore chunks that we do not have yet
                if !datastore.cond_touch_chunk(&digest, false)? {
                    if let Some((uuid, nr)) = catalog.lookup_chunk(&source_datastore, &digest) {
                        let file = media_file_chunk_map.entry(uuid.clone()).or_insert_with(BTreeMap::new);
                        let chunks = file.entry(nr).or_insert_with(HashSet::new);
                        chunks.insert(digest);
                    }
                }
            }
        }
        // the catalog is no longer needed; dropping it here saves memory
        drop(catalog);

        if !media_file_chunk_map.is_empty() {
            task_log!(worker, "Phase 2: restore chunks to datastores");
        } else {
            task_log!(worker, "all chunks exist already, skipping phase 2...");
        }

        for (media_uuid, file_chunk_map) in media_file_chunk_map.iter_mut() {
            let media_id = inventory.lookup_media(media_uuid).unwrap();
            let (mut drive, _info) = request_and_load_media(
                &worker,
                &drive_config,
                &drive_name,
                &media_id.label,
                &email,
            )?;
            restore_file_chunk_map(worker.clone(), &mut drive, &store_map, file_chunk_map)?;
        }

        task_log!(
            worker,
            "Phase 3: copy snapshots from temp dir to datastores"
        );
        for (store_snapshot, _lock) in snapshot_locks.into_iter() {
            proxmox::try_block!({
                let mut split = store_snapshot.splitn(2, ':');
                let source_datastore = split
                    .next()
                    .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
                let snapshot = split
                    .next()
                    .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
                let backup_dir: BackupDir = snapshot.parse()?;

                let datastore = store_map
                    .get_datastore(&source_datastore)
                    .ok_or_else(|| format_err!("unexpected source datastore: {}", source_datastore))?;

                let mut tmp_path = base_path.clone();
                tmp_path.push(&source_datastore);
                tmp_path.push(snapshot);

                let path = datastore.snapshot_path(&backup_dir);

                for entry in std::fs::read_dir(tmp_path)? {
                    let entry = entry?;
                    let mut new_path = path.clone();
                    new_path.push(entry.file_name());
                    std::fs::copy(entry.path(), new_path)?;
                }
                task_log!(worker, "Restore snapshot '{}' done", snapshot);
                Ok(())
            }).map_err(|err: Error| format_err!("could not copy {}: {}", store_snapshot, err))?;
        }

        Ok(())
    });
    if res.is_err() {
        task_warn!(worker, "Error during restore, partially restored snapshots will NOT be cleaned up");
    }

    match std::fs::remove_dir_all(&base_path) {
        Ok(()) => {}
        Err(err) => task_warn!(worker, "error cleaning up: {}", err),
    }

    res
}
fn get_media_set_catalog(
    inventory: &Inventory,
    media_set_uuid: &Uuid,
) -> Result<MediaSetCatalog, Error> {
    let status_path = Path::new(TAPE_STATUS_DIR);

    let members = inventory.compute_media_set_members(media_set_uuid)?;
    let media_list = members.media_list();
    let mut catalog = MediaSetCatalog::new();

    for (seq_nr, media_uuid) in media_list.iter().enumerate() {
        match media_uuid {
            None => {
                bail!(
                    "media set {} is incomplete (missing member {}).",
                    media_set_uuid, seq_nr
                );
            }
            Some(media_uuid) => {
                let media_id = inventory.lookup_media(media_uuid).unwrap();
                let media_catalog = MediaCatalog::open(status_path, &media_id, false, false)?;
                catalog.append_catalog(media_catalog)?;
            }
        }
    }

    Ok(catalog)
}
fn restore_snapshots_to_tmpdir(
    worker: Arc<WorkerTask>,
    path: &PathBuf,
    file_list: &[u64],
    mut drive: Box<dyn TapeDriver>,
    media_id: &MediaId,
    media_set_uuid: &Uuid,
    chunks_list: &mut HashMap<String, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
    match media_id.media_set_label {
        None => {
            bail!(
                "missing media set label on media {} ({})",
                media_id.label.label_text,
                media_id.label.uuid
            );
        }
        Some(ref set) => {
            if set.uuid != *media_set_uuid {
                bail!(
                    "wrong media set label on media {} ({} != {})",
                    media_id.label.label_text,
                    media_id.label.uuid,
                    media_set_uuid
                );
            }
            let encrypt_fingerprint = set.encryption_key_fingerprint.clone().map(|fp| {
                task_log!(worker, "Encryption key fingerprint: {}", fp);
                (fp, set.uuid.clone())
            });

            drive.set_encryption(encrypt_fingerprint)?;
        }
    }
    for file_num in file_list {
        let current_file_number = drive.current_file_number()?;
        if current_file_number != *file_num {
            task_log!(worker, "was at file {}, moving to {}", current_file_number, file_num);
            drive.move_to_file(*file_num)?;
            let current_file_number = drive.current_file_number()?;
            task_log!(worker, "now at file {}", current_file_number);
        }
        let mut reader = drive.read_next_file()?;

        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
            bail!("missing MediaContentHeader");
        }

        match header.content_magic {
            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
                let header_data = reader.read_exact_allocated(header.size as usize)?;

                let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
                    .map_err(|err| {
                        format_err!("unable to parse snapshot archive header - {}", err)
                    })?;

                let source_datastore = archive_header.store;
                let snapshot = archive_header.snapshot;

                task_log!(
                    worker,
                    "File {}: snapshot archive {}:{}",
                    file_num, source_datastore, snapshot
                );

                let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;

                let mut tmp_path = path.clone();
                tmp_path.push(&source_datastore);
                tmp_path.push(snapshot);
                std::fs::create_dir_all(&tmp_path)?;

                let chunks = chunks_list
                    .entry(source_datastore)
                    .or_insert_with(HashSet::new);
                let manifest = try_restore_snapshot_archive(worker.clone(), &mut decoder, &tmp_path)?;
                for item in manifest.files() {
                    let mut archive_path = tmp_path.to_owned();
                    archive_path.push(&item.filename);

                    let index: Box<dyn IndexFile> = match archive_type(&item.filename)? {
                        ArchiveType::DynamicIndex => {
                            Box::new(DynamicIndexReader::open(&archive_path)?)
                        }
                        ArchiveType::FixedIndex => {
                            Box::new(FixedIndexReader::open(&archive_path)?)
                        }
                        ArchiveType::Blob => continue,
                    };
                    for i in 0..index.index_count() {
                        if let Some(digest) = index.index_digest(i) {
                            chunks.insert(*digest);
                        }
                    }
                }
            }
            other => bail!("unexpected file type: {:?}", other),
        }
    }

    Ok(())
}
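// Phase 2 helper: position the drive at each chunk archive recorded in
// `file_chunk_map` and restore only the chunks still listed as missing.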
fn restore_file_chunk_map(
    worker: Arc<WorkerTask>,
    drive: &mut Box<dyn TapeDriver>,
    store_map: &DataStoreMap,
    file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
    for (nr, chunk_map) in file_chunk_map.iter_mut() {
        let current_file_number = drive.current_file_number()?;
        if current_file_number != *nr {
            task_log!(worker, "was at file {}, moving to {}", current_file_number, nr);
            drive.move_to_file(*nr)?;
            let current_file_number = drive.current_file_number()?;
            task_log!(worker, "now at file {}", current_file_number);
        }
        let mut reader = drive.read_next_file()?;
        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
            bail!("missing MediaContentHeader");
        }

        match header.content_magic {
            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
                let header_data = reader.read_exact_allocated(header.size as usize)?;

                let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
                    .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;

                let source_datastore = archive_header.store;

                task_log!(
                    worker,
                    "File {}: chunk archive for datastore '{}'",
                    nr, source_datastore
                );

                let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
                    format_err!("unexpected chunk archive for store: {}", source_datastore)
                })?;

                let count = restore_partial_chunk_archive(worker.clone(), reader, datastore.clone(), chunk_map)?;
                task_log!(worker, "restored {} chunks", count);
            }
            _ => bail!("unexpected content magic {:?}", header.content_magic),
        }
    }

    Ok(())
}
fn restore_partial_chunk_archive<'a>(
    worker: Arc<WorkerTask>,
    reader: Box<dyn 'a + TapeRead>,
    datastore: Arc<DataStore>,
    chunk_list: &mut HashSet<[u8; 32]>,
) -> Result<usize, Error> {
    let mut decoder = ChunkArchiveDecoder::new(reader);

    let mut count = 0;

    let start_time = std::time::SystemTime::now();
    let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
    let bytes2 = bytes.clone();

    let writer_pool = ParallelHandler::new(
        "tape restore chunk writer",
        4,
        move |(chunk, digest): (DataBlob, [u8; 32])| {
            if !datastore.cond_touch_chunk(&digest, false)? {
                bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
                chunk.verify_crc()?;
                if chunk.crypt_mode()? == CryptMode::None {
                    chunk.decode(None, Some(&digest))?; // verify digest
                }

                datastore.insert_chunk(&chunk, &digest)?;
            }
            Ok(())
        },
    );

    let verify_and_write_channel = writer_pool.channel();
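    // The loop below is the producing side: it feeds (blob, digest) pairs
    // into the channel while the ParallelHandler threads verify and insert
    // the chunks concurrently.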
    loop {
        let (digest, blob) = match decoder.next_chunk()? {
            Some((digest, blob)) => (digest, blob),
            None => break,
        };

        worker.check_abort()?;

        if chunk_list.remove(&digest) {
            verify_and_write_channel.send((blob, digest.clone()))?;
            count += 1;
        }

        if chunk_list.is_empty() {
            break;
        }
    }

    drop(verify_and_write_channel);

    writer_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);

    task_log!(
        worker,
        "restored {} bytes ({:.2} MB/s)",
        bytes,
        (bytes as f64) / (1_000_000.0 * elapsed)
    );

    Ok(count)
}
/// Request and restore a complete media, without using the existing catalog (a catalog is created instead)
pub fn request_and_restore_media(
    worker: Arc<WorkerTask>,
    media_id: &MediaId,
    drive_config: &SectionConfigData,
    drive_name: &str,
    store_map: &DataStoreMap,
    checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
    restore_owner: &Authid,
    email: &Option<String>,
) -> Result<(), Error> {
    let media_set_uuid = match media_id.media_set_label {
        None => bail!("restore_media: no media set - internal error"),
        Some(ref set) => &set.uuid,
    };

    let (mut drive, info) = request_and_load_media(&worker, &drive_config, &drive_name, &media_id.label, email)?;

    match info.media_set_label {
        None => {
            bail!("missing media set label on media {} ({})",
                  media_id.label.label_text, media_id.label.uuid);
        }
        Some(ref set) => {
            if &set.uuid != media_set_uuid {
                bail!("wrong media set label on media {} ({} != {})",
                      media_id.label.label_text, media_id.label.uuid,
                      media_set_uuid);
            }
            let encrypt_fingerprint = set.encryption_key_fingerprint.clone()
                .map(|fp| (fp, set.uuid.clone()));

            drive.set_encryption(encrypt_fingerprint)?;
        }
    }

    restore_media(
        worker,
        &mut drive,
        &info,
        Some((&store_map, restore_owner)),
        checked_chunks_map,
        false,
    )
}
/// Restore complete media content and catalog
///
/// Only create the catalog if target is None.
pub fn restore_media(
    worker: Arc<WorkerTask>,
    drive: &mut Box<dyn TapeDriver>,
    media_id: &MediaId,
    target: Option<(&DataStoreMap, &Authid)>,
    checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
    verbose: bool,
) -> Result<(), Error> {
    let status_path = Path::new(TAPE_STATUS_DIR);
    let mut catalog = MediaCatalog::create_temporary_database(status_path, media_id, false)?;

    loop {
        let current_file_number = drive.current_file_number()?;
        let reader = match drive.read_next_file() {
            Err(BlockReadError::EndOfFile) => {
                task_log!(worker, "skip unexpected filemark at pos {}", current_file_number);
                continue;
            }
            Err(BlockReadError::EndOfStream) => {
                task_log!(worker, "detected EOT after {} files", current_file_number);
                break;
            }
            Err(BlockReadError::Error(err)) => {
                return Err(err.into());
            }
            Ok(reader) => reader,
        };

        restore_archive(worker.clone(), reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?;
    }

    catalog.commit()?;

    MediaCatalog::finish_temporary_database(status_path, &media_id.label.uuid, true)?;

    Ok(())
}
fn restore_archive<'a>(
    worker: Arc<WorkerTask>,
    mut reader: Box<dyn 'a + TapeRead>,
    current_file_number: u64,
    target: Option<(&DataStoreMap, &Authid)>,
    catalog: &mut MediaCatalog,
    checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
    verbose: bool,
) -> Result<(), Error> {
    let header: MediaContentHeader = unsafe { reader.read_le_value()? };
    if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
        bail!("missing MediaContentHeader");
    }
1001 //println!("Found MediaContentHeader: {:?}", header);
1003 match header
.content_magic
{
1004 PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0
| PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0
=> {
1005 bail
!("unexpected content magic (label)");
1007 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0
=> {
1008 bail
!("unexpected snapshot archive version (v1.0)");
1010 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1
=> {
1011 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
1013 let archive_header
: SnapshotArchiveHeader
= serde_json
::from_slice(&header_data
)
1014 .map_err(|err
| format_err
!("unable to parse snapshot archive header - {}", err
))?
;
1016 let datastore_name
= archive_header
.store
;
1017 let snapshot
= archive_header
.snapshot
;
1019 task_log
!(worker
, "File {}: snapshot archive {}:{}", current_file_number
, datastore_name
, snapshot
);
1021 let backup_dir
: BackupDir
= snapshot
.parse()?
;
1023 if let Some((store_map
, authid
)) = target
.as_ref() {
1024 if let Some(datastore
) = store_map
.get_datastore(&datastore_name
) {
1025 let (owner
, _group_lock
) =
1026 datastore
.create_locked_backup_group(backup_dir
.group(), authid
)?
;
1027 if *authid
!= &owner
{
1028 // only the owner is allowed to create additional snapshots
1030 "restore '{}' failed - owner check failed ({} != {})",
1037 let (rel_path
, is_new
, _snap_lock
) =
1038 datastore
.create_locked_backup_dir(&backup_dir
)?
;
1039 let mut path
= datastore
.base_path();
1040 path
.push(rel_path
);
1043 task_log
!(worker
, "restore snapshot {}", backup_dir
);
1045 match restore_snapshot_archive(worker
.clone(), reader
, &path
) {
1047 std
::fs
::remove_dir_all(&path
)?
;
1048 bail
!("restore snapshot {} failed - {}", backup_dir
, err
);
1051 std
::fs
::remove_dir_all(&path
)?
;
1052 task_log
!(worker
, "skip incomplete snapshot {}", backup_dir
);
1055 catalog
.register_snapshot(
1056 Uuid
::from(header
.uuid
),
1057 current_file_number
,
1061 catalog
.commit_if_large()?
;
1067 task_log
!(worker
, "skipping...");
1071 reader
.skip_data()?
; // read all data
1072 if let Ok(false) = reader
.is_incomplete() {
1073 catalog
.register_snapshot(Uuid
::from(header
.uuid
), current_file_number
, &datastore_name
, &snapshot
)?
;
1074 catalog
.commit_if_large()?
;
        PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0 => {
            bail!("unexpected chunk archive version (v1.0)");
        }
        PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
            let header_data = reader.read_exact_allocated(header.size as usize)?;

            let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
                .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;

            let source_datastore = archive_header.store;

            task_log!(worker, "File {}: chunk archive for datastore '{}'", current_file_number, source_datastore);
            let datastore = target
                .as_ref()
                .and_then(|t| t.0.get_datastore(&source_datastore));

            if datastore.is_some() || target.is_none() {
                let checked_chunks = checked_chunks_map
                    .entry(datastore.as_ref().map(|d| d.name()).unwrap_or("_unused_").to_string())
                    .or_insert(HashSet::new());

                let chunks = if let Some(datastore) = datastore {
                    restore_chunk_archive(worker.clone(), reader, datastore, checked_chunks, verbose)?
                } else {
                    scan_chunk_archive(worker.clone(), reader, verbose)?
                };

                if let Some(chunks) = chunks {
                    catalog.register_chunk_archive(
                        Uuid::from(header.uuid),
                        current_file_number,
                        &source_datastore,
                        &chunks[..],
                    )?;
                    task_log!(worker, "register {} chunks", chunks.len());
                    catalog.commit_if_large()?;
                }
                return Ok(());
            } else if target.is_some() {
                task_log!(worker, "skipping...");
            }

            reader.skip_data()?; // read all data
        }
        PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0 => {
            let header_data = reader.read_exact_allocated(header.size as usize)?;

            let archive_header: CatalogArchiveHeader = serde_json::from_slice(&header_data)
                .map_err(|err| format_err!("unable to parse catalog archive header - {}", err))?;

            task_log!(worker, "File {}: skip catalog '{}'", current_file_number, archive_header.uuid);

            reader.skip_data()?; // read all data
        }
        _ => bail!("unknown content magic {:?}", header.content_magic),
    }

    Ok(())
}
// Read a chunk archive without restoring data - just record the contained chunks
fn scan_chunk_archive<'a>(
    worker: Arc<WorkerTask>,
    reader: Box<dyn 'a + TapeRead>,
    verbose: bool,
) -> Result<Option<Vec<[u8; 32]>>, Error> {
    let mut chunks = Vec::new();

    let mut decoder = ChunkArchiveDecoder::new(reader);

    loop {
        let digest = match decoder.next_chunk() {
            Ok(Some((digest, _blob))) => digest,
            Ok(None) => break,
            Err(err) => {
                let reader = decoder.reader();

                // check if this stream is marked incomplete
                if let Ok(true) = reader.is_incomplete() {
                    return Ok(Some(chunks));
                }

                // check if this is an aborted stream without end marker
                if let Ok(false) = reader.has_end_marker() {
                    task_log!(worker, "missing stream end marker");
                    return Ok(None);
                }

                // else the archive is corrupt
                return Err(err);
            }
        };

        worker.check_abort()?;

        if verbose {
            task_log!(worker, "Found chunk: {}", proxmox::tools::digest_to_hex(&digest));
        }

        chunks.push(digest);
    }

    Ok(Some(chunks))
}
fn restore_chunk_archive<'a>(
    worker: Arc<WorkerTask>,
    reader: Box<dyn 'a + TapeRead>,
    datastore: Arc<DataStore>,
    checked_chunks: &mut HashSet<[u8; 32]>,
    verbose: bool,
) -> Result<Option<Vec<[u8; 32]>>, Error> {
    let mut chunks = Vec::new();

    let mut decoder = ChunkArchiveDecoder::new(reader);

    let datastore2 = datastore.clone();
    let start_time = std::time::SystemTime::now();
    let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
    let bytes2 = bytes.clone();

    let worker2 = worker.clone();

    let writer_pool = ParallelHandler::new(
        "tape restore chunk writer",
        4,
        move |(chunk, digest): (DataBlob, [u8; 32])| {
            let chunk_exists = datastore2.cond_touch_chunk(&digest, false)?;
            if !chunk_exists {
                if verbose {
                    task_log!(worker2, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest));
                }
                bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
                // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
                chunk.verify_crc()?;
                if chunk.crypt_mode()? == CryptMode::None {
                    chunk.decode(None, Some(&digest))?; // verify digest
                }

                datastore2.insert_chunk(&chunk, &digest)?;
            } else if verbose {
                task_log!(worker2, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
            }
            Ok(())
        },
    );

    let verify_and_write_channel = writer_pool.channel();

    loop {
        let (digest, blob) = match decoder.next_chunk() {
            Ok(Some((digest, blob))) => (digest, blob),
            Ok(None) => break,
            Err(err) => {
                let reader = decoder.reader();

                // check if this stream is marked incomplete
                if let Ok(true) = reader.is_incomplete() {
                    return Ok(Some(chunks));
                }

                // check if this is an aborted stream without end marker
                if let Ok(false) = reader.has_end_marker() {
                    task_log!(worker, "missing stream end marker");
                    return Ok(None);
                }

                // else the archive is corrupt
                return Err(err);
            }
        };

        worker.check_abort()?;

        if !checked_chunks.contains(&digest) {
            verify_and_write_channel.send((blob, digest.clone()))?;
            checked_chunks.insert(digest.clone());
        }
        chunks.push(digest);
    }

    drop(verify_and_write_channel);

    writer_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);

    task_log!(
        worker,
        "restored {} bytes ({:.2} MB/s)",
        bytes,
        (bytes as f64) / (1_000_000.0 * elapsed)
    );

    Ok(Some(chunks))
}
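// Restore a snapshot pxar archive to `snapshot_path`. Returns Ok(false) for
// an incomplete or aborted stream (the caller then removes the partial
// snapshot directory) and Ok(true) on success.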
fn restore_snapshot_archive<'a>(
    worker: Arc<WorkerTask>,
    reader: Box<dyn 'a + TapeRead>,
    snapshot_path: &Path,
) -> Result<bool, Error> {
    let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
    match try_restore_snapshot_archive(worker, &mut decoder, snapshot_path) {
        Ok(_) => Ok(true),
        Err(err) => {
            let reader = decoder.input();

            // check if this stream is marked incomplete
            if let Ok(true) = reader.is_incomplete() {
                return Ok(false);
            }

            // check if this is an aborted stream without end marker
            if let Ok(false) = reader.has_end_marker() {
                return Ok(false);
            }

            // else the archive is corrupt
            Err(err)
        }
    }
}
fn try_restore_snapshot_archive<R: pxar::decoder::SeqRead>(
    worker: Arc<WorkerTask>,
    decoder: &mut pxar::decoder::sync::Decoder<R>,
    snapshot_path: &Path,
) -> Result<BackupManifest, Error> {
    let _root = match decoder.next() {
        None => bail!("missing root entry"),
        Some(root) => {
            let root = root?;
            match root.kind() {
                pxar::EntryKind::Directory => { /* Ok */ }
                _ => bail!("wrong root entry type"),
            }
            root
        }
    };

    let root_path = Path::new("/");
    let manifest_file_name = OsStr::new(MANIFEST_BLOB_NAME);

    let mut manifest = None;

    loop {
        worker.check_abort()?;

        let entry = match decoder.next() {
            None => break,
            Some(entry) => entry?,
        };
        let entry_path = entry.path();

        match entry.kind() {
            pxar::EntryKind::File { .. } => { /* Ok */ }
            _ => bail!("wrong entry type for {:?}", entry_path),
        }
        match entry_path.parent() {
            None => bail!("wrong parent for {:?}", entry_path),
            Some(p) => {
                if p != root_path {
                    bail!("wrong parent for {:?}", entry_path);
                }
            }
        }

        let filename = entry.file_name();
        let mut contents = match decoder.contents() {
            None => bail!("missing file content"),
            Some(contents) => contents,
        };

        let mut archive_path = snapshot_path.to_owned();
        archive_path.push(&filename);

        let mut tmp_path = archive_path.clone();
        tmp_path.set_extension("tmp");

        if filename == manifest_file_name {
            let blob = DataBlob::load_from_reader(&mut contents)?;
            let mut old_manifest = BackupManifest::try_from(blob)?;

            // Remove verify_state to indicate that this snapshot is not verified
            old_manifest
                .unprotected
                .as_object_mut()
                .map(|m| m.remove("verify_state"));

            let old_manifest = serde_json::to_string_pretty(&old_manifest)?;
            let blob = DataBlob::encode(old_manifest.as_bytes(), None, true)?;

            let options = CreateOptions::new();
            replace_file(&tmp_path, blob.raw_data(), options)?;

            manifest = Some(BackupManifest::try_from(blob)?);
        } else {
            let mut tmpfile = std::fs::OpenOptions::new()
                .write(true)
                .create(true)
                .read(true)
                .open(&tmp_path)
                .map_err(|err| format_err!("restore {:?} failed - {}", tmp_path, err))?;

            std::io::copy(&mut contents, &mut tmpfile)?;

            if let Err(err) = std::fs::rename(&tmp_path, &archive_path) {
                bail!("Atomic rename file {:?} failed - {}", archive_path, err);
            }
        }
    }

    let manifest = match manifest {
        None => bail!("missing manifest"),
        Some(manifest) => manifest,
    };
    // Do not verify anything here, because it would be too slow (causes tape stops).

    // commit the manifest
    let mut manifest_path = snapshot_path.to_owned();
    manifest_path.push(MANIFEST_BLOB_NAME);
    let mut tmp_manifest_path = manifest_path.clone();
    tmp_manifest_path.set_extension("tmp");

    if let Err(err) = std::fs::rename(&tmp_manifest_path, &manifest_path) {
        bail!("Atomic rename manifest {:?} failed - {}", manifest_path, err);
    }

    Ok(manifest)
}
/// Try to restore media catalogs (from catalog archives)
pub fn fast_catalog_restore(
    worker: &WorkerTask,
    drive: &mut Box<dyn TapeDriver>,
    media_set: &MediaSet,
    uuid: &Uuid, // current media Uuid
) -> Result<bool, Error> {
    let status_path = Path::new(TAPE_STATUS_DIR);

    let current_file_number = drive.current_file_number()?;
    if current_file_number != 2 {
        bail!("fast_catalog_restore: wrong media position - internal error");
    }

    let mut found_catalog = false;

    let mut moved_to_eom = false;

    loop {
        let current_file_number = drive.current_file_number()?;

        { // limit reader scope
            let mut reader = match drive.read_next_file() {
                Err(BlockReadError::EndOfFile) => {
                    task_log!(worker, "skip unexpected filemark at pos {}", current_file_number);
                    continue;
                }
                Err(BlockReadError::EndOfStream) => {
                    task_log!(worker, "detected EOT after {} files", current_file_number);
                    break;
                }
                Err(BlockReadError::Error(err)) => {
                    return Err(err.into());
                }
                Ok(reader) => reader,
            };

            let header: MediaContentHeader = unsafe { reader.read_le_value()? };
            if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
                bail!("missing MediaContentHeader");
            }

            if header.content_magic == PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0 {
                task_log!(worker, "found catalog at pos {}", current_file_number);

                let header_data = reader.read_exact_allocated(header.size as usize)?;

                let archive_header: CatalogArchiveHeader = serde_json::from_slice(&header_data)
                    .map_err(|err| format_err!("unable to parse catalog archive header - {}", err))?;

                if &archive_header.media_set_uuid != media_set.uuid() {
                    task_log!(worker, "skipping unrelated catalog at pos {}", current_file_number);
                    reader.skip_data()?; // read all data
                    continue;
                }

                let catalog_uuid = &archive_header.uuid;

                let wanted = media_set
                    .media_list()
                    .iter()
                    .any(|e| match e {
                        None => false,
                        Some(uuid) => uuid == catalog_uuid,
                    });

                if !wanted {
                    task_log!(worker, "skip catalog because media '{}' is not inventoried", catalog_uuid);
                    reader.skip_data()?; // read all data
                    continue;
                }

                if catalog_uuid == uuid {
                    // always restore and overwrite the catalog
                } else {
                    // only restore if the catalog does not exist yet
                    if MediaCatalog::exists(status_path, catalog_uuid) {
                        task_log!(worker, "catalog for media '{}' already exists", catalog_uuid);
                        reader.skip_data()?; // read all data
                        continue;
                    }
                }

                let mut file = MediaCatalog::create_temporary_database_file(status_path, catalog_uuid)?;

                std::io::copy(&mut reader, &mut file)?;

                file.seek(SeekFrom::Start(0))?;

                match MediaCatalog::parse_catalog_header(&mut file)? {
                    (true, Some(media_uuid), Some(media_set_uuid)) => {
                        if &media_uuid != catalog_uuid {
                            task_log!(worker, "catalog uuid mismatch at pos {}", current_file_number);
                            continue;
                        }
                        if media_set_uuid != archive_header.media_set_uuid {
                            task_log!(worker, "catalog media_set mismatch at pos {}", current_file_number);
                            continue;
                        }

                        MediaCatalog::finish_temporary_database(status_path, &media_uuid, true)?;

                        if catalog_uuid == uuid {
                            task_log!(worker, "successfully restored catalog");
                            found_catalog = true
                        } else {
                            task_log!(worker, "successfully restored related catalog {}", media_uuid);
                        }
                    }
                    _ => {
                        task_warn!(worker, "got incomplete catalog header - skip file");
                        continue;
                    }
                }

                continue;
            }
        }

        if moved_to_eom {
            break; // already done - stop
        }
        moved_to_eom = true;

        task_log!(worker, "searching for catalog at EOT (moving to EOT)");
        drive.move_to_last_file()?;

        let new_file_number = drive.current_file_number()?;

        if new_file_number < (current_file_number + 1) {
            break; // no new content - stop
        }
    }

    Ok(found_catalog)
}