1 use std
::collections
::{BTreeMap, HashMap, HashSet}
;
2 use std
::convert
::TryFrom
;
4 use std
::io
::{Seek, SeekFrom}
;
5 use std
::path
::{Path, PathBuf}
;
8 use anyhow
::{bail, format_err, Error}
;
11 use proxmox_io
::ReadExt
;
12 use proxmox_router
::{Permission, Router, RpcEnvironment, RpcEnvironmentType}
;
13 use proxmox_schema
::api
;
14 use proxmox_section_config
::SectionConfigData
;
15 use proxmox_sys
::fs
::{replace_file, CreateOptions}
;
16 use proxmox_sys
::{task_log, task_warn, WorkerTaskContext}
;
17 use proxmox_uuid
::Uuid
;
20 Authid
, CryptMode
, Userid
, DATASTORE_MAP_ARRAY_SCHEMA
, DATASTORE_MAP_LIST_SCHEMA
,
21 DRIVE_NAME_SCHEMA
, PRIV_DATASTORE_BACKUP
, PRIV_DATASTORE_MODIFY
, PRIV_TAPE_READ
,
22 TAPE_RESTORE_SNAPSHOT_SCHEMA
, UPID_SCHEMA
,
24 use pbs_config
::CachedUserInfo
;
25 use pbs_datastore
::backup_info
::BackupDir
;
26 use pbs_datastore
::dynamic_index
::DynamicIndexReader
;
27 use pbs_datastore
::fixed_index
::FixedIndexReader
;
28 use pbs_datastore
::index
::IndexFile
;
29 use pbs_datastore
::manifest
::{archive_type, ArchiveType, BackupManifest, MANIFEST_BLOB_NAME}
;
30 use pbs_datastore
::{DataBlob, DataStore}
;
32 BlockReadError
, MediaContentHeader
, TapeRead
, PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
,
34 use proxmox_rest_server
::WorkerTask
;
37 server
::lookup_user_email
,
39 drive
::{lock_tape_device, request_and_load_media, set_tape_device_state, TapeDriver}
,
41 CatalogArchiveHeader
, ChunkArchiveDecoder
, ChunkArchiveHeader
, SnapshotArchiveHeader
,
42 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
, PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0
,
43 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1
, PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0
,
44 PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0
, PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0
,
45 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1
,
47 lock_media_set
, Inventory
, MediaCatalog
, MediaId
, MediaSet
, MediaSetCatalog
,
50 tools
::parallel_handler
::ParallelHandler
,
/// Base directory used to stage temporarily restored snapshots before
/// they are copied into the target datastores (phase 1/3 of a selective
/// tape restore writes under `<dir>/<media-set-uuid>/…`).
const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
55 pub struct DataStoreMap
{
56 map
: HashMap
<String
, Arc
<DataStore
>>,
57 default: Option
<Arc
<DataStore
>>,
60 impl TryFrom
<String
> for DataStoreMap
{
63 fn try_from(value
: String
) -> Result
<Self, Error
> {
64 let value
= DATASTORE_MAP_ARRAY_SCHEMA
.parse_property_string(&value
)?
;
65 let mut mapping
: Vec
<String
> = value
69 .map(|v
| v
.as_str().unwrap().to_string())
72 let mut map
= HashMap
::new();
73 let mut default = None
;
74 while let Some(mut store
) = mapping
.pop() {
75 if let Some(index
) = store
.find('
='
) {
76 let mut target
= store
.split_off(index
);
77 target
.remove(0); // remove '='
78 let datastore
= DataStore
::lookup_datastore(&target
)?
;
79 map
.insert(store
, datastore
);
80 } else if default.is_none() {
81 default = Some(DataStore
::lookup_datastore(&store
)?
);
83 bail
!("multiple default stores given");
87 Ok(Self { map, default }
)
92 fn used_datastores
<'a
>(&self) -> HashSet
<&str> {
93 let mut set
= HashSet
::new();
94 for store
in self.map
.values() {
95 set
.insert(store
.name());
98 if let Some(ref store
) = self.default {
99 set
.insert(store
.name());
105 fn get_datastore(&self, source
: &str) -> Option
<Arc
<DataStore
>> {
106 if let Some(store
) = self.map
.get(source
) {
107 return Some(Arc
::clone(store
));
109 if let Some(ref store
) = self.default {
110 return Some(Arc
::clone(store
));
117 fn check_datastore_privs(
118 user_info
: &CachedUserInfo
,
121 owner
: &Option
<Authid
>,
122 ) -> Result
<(), Error
> {
123 let privs
= user_info
.lookup_privs(auth_id
, &["datastore", store
]);
124 if (privs
& PRIV_DATASTORE_BACKUP
) == 0 {
125 bail
!("no permissions on /datastore/{}", store
);
128 if let Some(ref owner
) = owner
{
129 let correct_owner
= owner
== auth_id
130 || (owner
.is_token() && !auth_id
.is_token() && owner
.user() == auth_id
.user());
132 // same permission as changing ownership after syncing
133 if !correct_owner
&& privs
& PRIV_DATASTORE_MODIFY
== 0 {
134 bail
!("no permission to restore as '{}'", owner
);
141 pub const ROUTER
: Router
= Router
::new().post(&API_METHOD_RESTORE
);
147 schema
: DATASTORE_MAP_LIST_SCHEMA
,
150 schema
: DRIVE_NAME_SCHEMA
,
153 description
: "Media set UUID.",
161 description
: "List of snapshots.",
165 schema
: TAPE_RESTORE_SNAPSHOT_SCHEMA
,
178 // Note: parameters are no uri parameter, so we need to test inside function body
179 description
: "The user needs Tape.Read privilege on /tape/pool/{pool} \
180 and /tape/drive/{drive}, Datastore.Backup privilege on /datastore/{store}.",
181 permission
: &Permission
::Anybody
,
184 /// Restore data from media-set
189 notify_user
: Option
<Userid
>,
190 snapshots
: Option
<Vec
<String
>>,
191 owner
: Option
<Authid
>,
192 rpcenv
: &mut dyn RpcEnvironment
,
193 ) -> Result
<Value
, Error
> {
194 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
195 let user_info
= CachedUserInfo
::new()?
;
197 let store_map
= DataStoreMap
::try_from(store
)
198 .map_err(|err
| format_err
!("cannot parse store mapping: {}", err
))?
;
199 let used_datastores
= store_map
.used_datastores();
200 if used_datastores
.is_empty() {
201 bail
!("no datastores given");
204 for store
in used_datastores
.iter() {
205 check_datastore_privs(&user_info
, store
, &auth_id
, &owner
)?
;
208 let privs
= user_info
.lookup_privs(&auth_id
, &["tape", "drive", &drive
]);
209 if (privs
& PRIV_TAPE_READ
) == 0 {
210 bail
!("no permissions on /tape/drive/{}", drive
);
213 let media_set_uuid
= media_set
.parse()?
;
215 let status_path
= Path
::new(TAPE_STATUS_DIR
);
217 let _lock
= lock_media_set(status_path
, &media_set_uuid
, None
)?
;
219 let inventory
= Inventory
::load(status_path
)?
;
221 let pool
= inventory
.lookup_media_set_pool(&media_set_uuid
)?
;
223 let privs
= user_info
.lookup_privs(&auth_id
, &["tape", "pool", &pool
]);
224 if (privs
& PRIV_TAPE_READ
) == 0 {
225 bail
!("no permissions on /tape/pool/{}", pool
);
228 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
230 // early check/lock before starting worker
231 let drive_lock
= lock_tape_device(&drive_config
, &drive
)?
;
233 let to_stdout
= rpcenv
.env_type() == RpcEnvironmentType
::CLI
;
235 let taskid
= used_datastores
237 .map(|s
| s
.to_string())
238 .collect
::<Vec
<String
>>()
241 let upid_str
= WorkerTask
::new_thread(
247 let _drive_lock
= drive_lock
; // keep lock guard
249 set_tape_device_state(&drive
, &worker
.upid().to_string())?
;
251 let restore_owner
= owner
.as_ref().unwrap_or(&auth_id
);
253 let email
= notify_user
255 .and_then(|userid
| lookup_user_email(userid
))
256 .or_else(|| lookup_user_email(&auth_id
.clone().into()));
258 task_log
!(worker
, "Mediaset '{}'", media_set
);
259 task_log
!(worker
, "Pool: {}", pool
);
261 let res
= if let Some(snapshots
) = snapshots
{
287 task_log
!(worker
, "Restore mediaset '{}' done", media_set
);
290 if let Err(err
) = set_tape_device_state(&drive
, "") {
291 task_log
!(worker
, "could not unset drive state for {}: {}", drive
, err
);
301 fn restore_full_worker(
302 worker
: Arc
<WorkerTask
>,
303 inventory
: Inventory
,
304 media_set_uuid
: Uuid
,
305 drive_config
: SectionConfigData
,
307 store_map
: DataStoreMap
,
308 restore_owner
: &Authid
,
309 email
: Option
<String
>,
310 ) -> Result
<(), Error
> {
311 let members
= inventory
.compute_media_set_members(&media_set_uuid
)?
;
313 let media_list
= members
.media_list();
315 let mut media_id_list
= Vec
::new();
317 let mut encryption_key_fingerprint
= None
;
319 for (seq_nr
, media_uuid
) in media_list
.iter().enumerate() {
323 "media set {} is incomplete (missing member {}).",
328 Some(media_uuid
) => {
329 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
330 if let Some(ref set
) = media_id
.media_set_label
{
332 if encryption_key_fingerprint
.is_none()
333 && set
.encryption_key_fingerprint
.is_some()
335 encryption_key_fingerprint
= set
.encryption_key_fingerprint
.clone();
338 media_id_list
.push(media_id
);
343 if let Some(fingerprint
) = encryption_key_fingerprint
{
344 task_log
!(worker
, "Encryption key fingerprint: {}", fingerprint
);
354 .collect
::<Vec
<String
>>()
358 task_log
!(worker
, "Drive: {}", drive_name
);
361 "Required media list: {}",
364 .map(|media_id
| media_id
.label
.label_text
.as_str())
365 .collect
::<Vec
<&str>>()
369 let mut datastore_locks
= Vec
::new();
370 for store_name
in store_map
.used_datastores() {
371 // explicit create shared lock to prevent GC on newly created chunks
372 if let Some(store
) = store_map
.get_datastore(store_name
) {
373 let shared_store_lock
= store
.try_shared_chunk_store_lock()?
;
374 datastore_locks
.push(shared_store_lock
);
378 let mut checked_chunks_map
= HashMap
::new();
380 for media_id
in media_id_list
.iter() {
381 request_and_restore_media(
387 &mut checked_chunks_map
,
396 fn restore_list_worker(
397 worker
: Arc
<WorkerTask
>,
398 snapshots
: Vec
<String
>,
399 inventory
: Inventory
,
400 media_set_uuid
: Uuid
,
401 drive_config
: SectionConfigData
,
403 store_map
: DataStoreMap
,
404 restore_owner
: &Authid
,
405 email
: Option
<String
>,
406 ) -> Result
<(), Error
> {
407 let base_path
: PathBuf
= format
!("{}/{}", RESTORE_TMP_DIR
, media_set_uuid
).into();
408 std
::fs
::create_dir_all(&base_path
)?
;
410 let catalog
= get_media_set_catalog(&inventory
, &media_set_uuid
)?
;
412 let mut datastore_locks
= Vec
::new();
413 let mut snapshot_file_hash
: BTreeMap
<Uuid
, Vec
<u64>> = BTreeMap
::new();
414 let mut snapshot_locks
= HashMap
::new();
416 let res
= proxmox_lang
::try_block
!({
417 // assemble snapshot files/locks
418 for store_snapshot
in snapshots
.iter() {
419 let mut split
= store_snapshot
.splitn(2, '
:'
);
420 let source_datastore
= split
422 .ok_or_else(|| format_err
!("invalid snapshot: {}", store_snapshot
))?
;
425 .ok_or_else(|| format_err
!("invalid snapshot:{}", store_snapshot
))?
;
426 let backup_dir
: BackupDir
= snapshot
.parse()?
;
428 let datastore
= store_map
.get_datastore(source_datastore
).ok_or_else(|| {
430 "could not find mapping for source datastore: {}",
435 let (owner
, _group_lock
) =
436 datastore
.create_locked_backup_group(backup_dir
.group(), restore_owner
)?
;
437 if restore_owner
!= &owner
{
438 // only the owner is allowed to create additional snapshots
440 "restore '{}' failed - owner check failed ({} != {})",
447 let (media_id
, file_num
) = if let Some((media_uuid
, file_num
)) =
448 catalog
.lookup_snapshot(source_datastore
, snapshot
)
450 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
455 "did not find snapshot '{}' in media set {}",
462 let (_rel_path
, is_new
, snap_lock
) = datastore
.create_locked_backup_dir(&backup_dir
)?
;
467 "found snapshot {} on target datastore, skipping...",
473 snapshot_locks
.insert(store_snapshot
.to_string(), snap_lock
);
475 let shared_store_lock
= datastore
.try_shared_chunk_store_lock()?
;
476 datastore_locks
.push(shared_store_lock
);
478 let file_list
= snapshot_file_hash
479 .entry(media_id
.label
.uuid
.clone())
480 .or_insert_with(Vec
::new
);
481 file_list
.push(file_num
);
485 "found snapshot {} on {}: file {}",
487 media_id
.label
.label_text
,
492 if snapshot_file_hash
.is_empty() {
493 task_log
!(worker
, "nothing to restore, skipping remaining phases...");
497 task_log
!(worker
, "Phase 1: temporarily restore snapshots to temp dir");
498 let mut datastore_chunk_map
: HashMap
<String
, HashSet
<[u8; 32]>> = HashMap
::new();
499 for (media_uuid
, file_list
) in snapshot_file_hash
.iter_mut() {
500 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
501 let (drive
, info
) = request_and_load_media(
508 file_list
.sort_unstable();
509 restore_snapshots_to_tmpdir(
516 &mut datastore_chunk_map
,
518 .map_err(|err
| format_err
!("could not restore snapshots to tmpdir: {}", err
))?
;
521 // sorted media_uuid => (sorted file_num => (set of digests)))
522 let mut media_file_chunk_map
: BTreeMap
<Uuid
, BTreeMap
<u64, HashSet
<[u8; 32]>>> =
525 for (source_datastore
, chunks
) in datastore_chunk_map
.into_iter() {
526 let datastore
= store_map
.get_datastore(&source_datastore
).ok_or_else(|| {
528 "could not find mapping for source datastore: {}",
532 for digest
in chunks
.into_iter() {
533 // we only want to restore chunks that we do not have yet
534 if !datastore
.cond_touch_chunk(&digest
, false)?
{
535 if let Some((uuid
, nr
)) = catalog
.lookup_chunk(&source_datastore
, &digest
) {
536 let file
= media_file_chunk_map
538 .or_insert_with(BTreeMap
::new
);
539 let chunks
= file
.entry(nr
).or_insert_with(HashSet
::new
);
540 chunks
.insert(digest
);
546 // we do not need it anymore, saves memory
549 if !media_file_chunk_map
.is_empty() {
550 task_log
!(worker
, "Phase 2: restore chunks to datastores");
552 task_log
!(worker
, "all chunks exist already, skipping phase 2...");
555 for (media_uuid
, file_chunk_map
) in media_file_chunk_map
.iter_mut() {
556 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
557 let (mut drive
, _info
) = request_and_load_media(
564 restore_file_chunk_map(worker
.clone(), &mut drive
, &store_map
, file_chunk_map
)?
;
569 "Phase 3: copy snapshots from temp dir to datastores"
571 for (store_snapshot
, _lock
) in snapshot_locks
.into_iter() {
572 proxmox_lang
::try_block
!({
573 let mut split
= store_snapshot
.splitn(2, '
:'
);
574 let source_datastore
= split
576 .ok_or_else(|| format_err
!("invalid snapshot: {}", store_snapshot
))?
;
579 .ok_or_else(|| format_err
!("invalid snapshot:{}", store_snapshot
))?
;
580 let backup_dir
: BackupDir
= snapshot
.parse()?
;
582 let datastore
= store_map
.get_datastore(source_datastore
).ok_or_else(|| {
583 format_err
!("unexpected source datastore: {}", source_datastore
)
586 let mut tmp_path
= base_path
.clone();
587 tmp_path
.push(&source_datastore
);
588 tmp_path
.push(snapshot
);
590 let path
= datastore
.snapshot_path(&backup_dir
);
592 for entry
in std
::fs
::read_dir(tmp_path
)?
{
594 let mut new_path
= path
.clone();
595 new_path
.push(entry
.file_name());
596 std
::fs
::copy(entry
.path(), new_path
)?
;
598 task_log
!(worker
, "Restore snapshot '{}' done", snapshot
);
601 .map_err(|err
: Error
| format_err
!("could not copy {}: {}", store_snapshot
, err
))?
;
609 "Error during restore, partially restored snapshots will NOT be cleaned up"
613 match std
::fs
::remove_dir_all(&base_path
) {
615 Err(err
) => task_warn
!(worker
, "error cleaning up: {}", err
),
621 fn get_media_set_catalog(
622 inventory
: &Inventory
,
623 media_set_uuid
: &Uuid
,
624 ) -> Result
<MediaSetCatalog
, Error
> {
625 let status_path
= Path
::new(TAPE_STATUS_DIR
);
627 let members
= inventory
.compute_media_set_members(media_set_uuid
)?
;
628 let media_list
= members
.media_list();
629 let mut catalog
= MediaSetCatalog
::new();
631 for (seq_nr
, media_uuid
) in media_list
.iter().enumerate() {
635 "media set {} is incomplete (missing member {}).",
640 Some(media_uuid
) => {
641 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
642 let media_catalog
= MediaCatalog
::open(status_path
, media_id
, false, false)?
;
643 catalog
.append_catalog(media_catalog
)?
;
651 fn restore_snapshots_to_tmpdir(
652 worker
: Arc
<WorkerTask
>,
655 mut drive
: Box
<dyn TapeDriver
>,
657 media_set_uuid
: &Uuid
,
658 chunks_list
: &mut HashMap
<String
, HashSet
<[u8; 32]>>,
659 ) -> Result
<(), Error
> {
660 match media_id
.media_set_label
{
663 "missing media set label on media {} ({})",
664 media_id
.label
.label_text
,
669 if set
.uuid
!= *media_set_uuid
{
671 "wrong media set label on media {} ({} != {})",
672 media_id
.label
.label_text
,
677 let encrypt_fingerprint
= set
.encryption_key_fingerprint
.clone().map(|fp
| {
678 task_log
!(worker
, "Encryption key fingerprint: {}", fp
);
679 (fp
, set
.uuid
.clone())
682 drive
.set_encryption(encrypt_fingerprint
)?
;
686 for file_num
in file_list
{
687 let current_file_number
= drive
.current_file_number()?
;
688 if current_file_number
!= *file_num
{
691 "was at file {}, moving to {}",
695 drive
.move_to_file(*file_num
)?
;
696 let current_file_number
= drive
.current_file_number()?
;
697 task_log
!(worker
, "now at file {}", current_file_number
);
699 let mut reader
= drive
.read_next_file()?
;
701 let header
: MediaContentHeader
= unsafe { reader.read_le_value()? }
;
702 if header
.magic
!= PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
{
703 bail
!("missing MediaContentHeader");
706 match header
.content_magic
{
707 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1
=> {
708 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
710 let archive_header
: SnapshotArchiveHeader
= serde_json
::from_slice(&header_data
)
712 format_err
!("unable to parse snapshot archive header - {}", err
)
715 let source_datastore
= archive_header
.store
;
716 let snapshot
= archive_header
.snapshot
;
720 "File {}: snapshot archive {}:{}",
726 let mut decoder
= pxar
::decoder
::sync
::Decoder
::from_std(reader
)?
;
728 let mut tmp_path
= path
.clone();
729 tmp_path
.push(&source_datastore
);
730 tmp_path
.push(snapshot
);
731 std
::fs
::create_dir_all(&tmp_path
)?
;
733 let chunks
= chunks_list
734 .entry(source_datastore
)
735 .or_insert_with(HashSet
::new
);
737 try_restore_snapshot_archive(worker
.clone(), &mut decoder
, &tmp_path
)?
;
738 for item
in manifest
.files() {
739 let mut archive_path
= tmp_path
.to_owned();
740 archive_path
.push(&item
.filename
);
742 let index
: Box
<dyn IndexFile
> = match archive_type(&item
.filename
)?
{
743 ArchiveType
::DynamicIndex
=> {
744 Box
::new(DynamicIndexReader
::open(&archive_path
)?
)
746 ArchiveType
::FixedIndex
=> Box
::new(FixedIndexReader
::open(&archive_path
)?
),
747 ArchiveType
::Blob
=> continue,
749 for i
in 0..index
.index_count() {
750 if let Some(digest
) = index
.index_digest(i
) {
751 chunks
.insert(*digest
);
756 other
=> bail
!("unexpected file type: {:?}", other
),
763 fn restore_file_chunk_map(
764 worker
: Arc
<WorkerTask
>,
765 drive
: &mut Box
<dyn TapeDriver
>,
766 store_map
: &DataStoreMap
,
767 file_chunk_map
: &mut BTreeMap
<u64, HashSet
<[u8; 32]>>,
768 ) -> Result
<(), Error
> {
769 for (nr
, chunk_map
) in file_chunk_map
.iter_mut() {
770 let current_file_number
= drive
.current_file_number()?
;
771 if current_file_number
!= *nr
{
774 "was at file {}, moving to {}",
778 drive
.move_to_file(*nr
)?
;
779 let current_file_number
= drive
.current_file_number()?
;
780 task_log
!(worker
, "now at file {}", current_file_number
);
782 let mut reader
= drive
.read_next_file()?
;
783 let header
: MediaContentHeader
= unsafe { reader.read_le_value()? }
;
784 if header
.magic
!= PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
{
785 bail
!("missing MediaContentHeader");
788 match header
.content_magic
{
789 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1
=> {
790 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
792 let archive_header
: ChunkArchiveHeader
= serde_json
::from_slice(&header_data
)
793 .map_err(|err
| format_err
!("unable to parse chunk archive header - {}", err
))?
;
795 let source_datastore
= archive_header
.store
;
799 "File {}: chunk archive for datastore '{}'",
804 let datastore
= store_map
.get_datastore(&source_datastore
).ok_or_else(|| {
805 format_err
!("unexpected chunk archive for store: {}", source_datastore
)
808 let count
= restore_partial_chunk_archive(
814 task_log
!(worker
, "restored {} chunks", count
);
816 _
=> bail
!("unexpected content magic {:?}", header
.content_magic
),
823 fn restore_partial_chunk_archive
<'a
>(
824 worker
: Arc
<WorkerTask
>,
825 reader
: Box
<dyn 'a
+ TapeRead
>,
826 datastore
: Arc
<DataStore
>,
827 chunk_list
: &mut HashSet
<[u8; 32]>,
828 ) -> Result
<usize, Error
> {
829 let mut decoder
= ChunkArchiveDecoder
::new(reader
);
833 let start_time
= std
::time
::SystemTime
::now();
834 let bytes
= Arc
::new(std
::sync
::atomic
::AtomicU64
::new(0));
835 let bytes2
= bytes
.clone();
837 let writer_pool
= ParallelHandler
::new(
838 "tape restore chunk writer",
840 move |(chunk
, digest
): (DataBlob
, [u8; 32])| {
841 if !datastore
.cond_touch_chunk(&digest
, false)?
{
842 bytes2
.fetch_add(chunk
.raw_size(), std
::sync
::atomic
::Ordering
::SeqCst
);
844 if chunk
.crypt_mode()?
== CryptMode
::None
{
845 chunk
.decode(None
, Some(&digest
))?
; // verify digest
848 datastore
.insert_chunk(&chunk
, &digest
)?
;
854 let verify_and_write_channel
= writer_pool
.channel();
857 let (digest
, blob
) = match decoder
.next_chunk()?
{
858 Some((digest
, blob
)) => (digest
, blob
),
862 worker
.check_abort()?
;
864 if chunk_list
.remove(&digest
) {
865 verify_and_write_channel
.send((blob
, digest
.clone()))?
;
869 if chunk_list
.is_empty() {
874 drop(verify_and_write_channel
);
876 writer_pool
.complete()?
;
878 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
880 let bytes
= bytes
.load(std
::sync
::atomic
::Ordering
::SeqCst
);
884 "restored {} bytes ({:.2} MB/s)",
886 (bytes
as f64) / (1_000_000.0 * elapsed
)
892 /// Request and restore complete media without using existing catalog (create catalog instead)
893 pub fn request_and_restore_media(
894 worker
: Arc
<WorkerTask
>,
896 drive_config
: &SectionConfigData
,
898 store_map
: &DataStoreMap
,
899 checked_chunks_map
: &mut HashMap
<String
, HashSet
<[u8; 32]>>,
900 restore_owner
: &Authid
,
901 email
: &Option
<String
>,
902 ) -> Result
<(), Error
> {
903 let media_set_uuid
= match media_id
.media_set_label
{
904 None
=> bail
!("restore_media: no media set - internal error"),
905 Some(ref set
) => &set
.uuid
,
908 let (mut drive
, info
) =
909 request_and_load_media(&worker
, drive_config
, drive_name
, &media_id
.label
, email
)?
;
911 match info
.media_set_label
{
914 "missing media set label on media {} ({})",
915 media_id
.label
.label_text
,
920 if &set
.uuid
!= media_set_uuid
{
922 "wrong media set label on media {} ({} != {})",
923 media_id
.label
.label_text
,
928 let encrypt_fingerprint
= set
929 .encryption_key_fingerprint
931 .map(|fp
| (fp
, set
.uuid
.clone()));
933 drive
.set_encryption(encrypt_fingerprint
)?
;
941 Some((store_map
, restore_owner
)),
947 /// Restore complete media content and catalog
949 /// Only create the catalog if target is None.
950 pub fn restore_media(
951 worker
: Arc
<WorkerTask
>,
952 drive
: &mut Box
<dyn TapeDriver
>,
954 target
: Option
<(&DataStoreMap
, &Authid
)>,
955 checked_chunks_map
: &mut HashMap
<String
, HashSet
<[u8; 32]>>,
957 ) -> Result
<(), Error
> {
958 let status_path
= Path
::new(TAPE_STATUS_DIR
);
959 let mut catalog
= MediaCatalog
::create_temporary_database(status_path
, media_id
, false)?
;
962 let current_file_number
= drive
.current_file_number()?
;
963 let reader
= match drive
.read_next_file() {
964 Err(BlockReadError
::EndOfFile
) => {
967 "skip unexpected filemark at pos {}",
972 Err(BlockReadError
::EndOfStream
) => {
973 task_log
!(worker
, "detected EOT after {} files", current_file_number
);
976 Err(BlockReadError
::Error(err
)) => {
977 return Err(err
.into());
979 Ok(reader
) => reader
,
995 MediaCatalog
::finish_temporary_database(status_path
, &media_id
.label
.uuid
, true)?
;
1000 fn restore_archive
<'a
>(
1001 worker
: Arc
<WorkerTask
>,
1002 mut reader
: Box
<dyn 'a
+ TapeRead
>,
1003 current_file_number
: u64,
1004 target
: Option
<(&DataStoreMap
, &Authid
)>,
1005 catalog
: &mut MediaCatalog
,
1006 checked_chunks_map
: &mut HashMap
<String
, HashSet
<[u8; 32]>>,
1008 ) -> Result
<(), Error
> {
1009 let header
: MediaContentHeader
= unsafe { reader.read_le_value()? }
;
1010 if header
.magic
!= PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
{
1011 bail
!("missing MediaContentHeader");
1014 //println!("Found MediaContentHeader: {:?}", header);
1016 match header
.content_magic
{
1017 PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0
| PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0
=> {
1018 bail
!("unexpected content magic (label)");
1020 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0
=> {
1021 bail
!("unexpected snapshot archive version (v1.0)");
1023 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1
=> {
1024 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
1026 let archive_header
: SnapshotArchiveHeader
= serde_json
::from_slice(&header_data
)
1027 .map_err(|err
| format_err
!("unable to parse snapshot archive header - {}", err
))?
;
1029 let datastore_name
= archive_header
.store
;
1030 let snapshot
= archive_header
.snapshot
;
1034 "File {}: snapshot archive {}:{}",
1035 current_file_number
,
1040 let backup_dir
: BackupDir
= snapshot
.parse()?
;
1042 if let Some((store_map
, authid
)) = target
.as_ref() {
1043 if let Some(datastore
) = store_map
.get_datastore(&datastore_name
) {
1044 let (owner
, _group_lock
) =
1045 datastore
.create_locked_backup_group(backup_dir
.group(), authid
)?
;
1046 if *authid
!= &owner
{
1047 // only the owner is allowed to create additional snapshots
1049 "restore '{}' failed - owner check failed ({} != {})",
1056 let (rel_path
, is_new
, _snap_lock
) =
1057 datastore
.create_locked_backup_dir(&backup_dir
)?
;
1058 let mut path
= datastore
.base_path();
1059 path
.push(rel_path
);
1062 task_log
!(worker
, "restore snapshot {}", backup_dir
);
1064 match restore_snapshot_archive(worker
.clone(), reader
, &path
) {
1066 std
::fs
::remove_dir_all(&path
)?
;
1067 bail
!("restore snapshot {} failed - {}", backup_dir
, err
);
1070 std
::fs
::remove_dir_all(&path
)?
;
1071 task_log
!(worker
, "skip incomplete snapshot {}", backup_dir
);
1074 catalog
.register_snapshot(
1075 Uuid
::from(header
.uuid
),
1076 current_file_number
,
1080 catalog
.commit_if_large()?
;
1086 task_log
!(worker
, "skipping...");
1090 reader
.skip_data()?
; // read all data
1091 if let Ok(false) = reader
.is_incomplete() {
1092 catalog
.register_snapshot(
1093 Uuid
::from(header
.uuid
),
1094 current_file_number
,
1098 catalog
.commit_if_large()?
;
1101 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0
=> {
1102 bail
!("unexpected chunk archive version (v1.0)");
1104 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1
=> {
1105 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
1107 let archive_header
: ChunkArchiveHeader
= serde_json
::from_slice(&header_data
)
1108 .map_err(|err
| format_err
!("unable to parse chunk archive header - {}", err
))?
;
1110 let source_datastore
= archive_header
.store
;
1114 "File {}: chunk archive for datastore '{}'",
1115 current_file_number
,
1118 let datastore
= target
1120 .and_then(|t
| t
.0.get_datastore(&source_datastore
));
1122 if datastore
.is_some() || target
.is_none() {
1123 let checked_chunks
= checked_chunks_map
1128 .unwrap_or("_unused_")
1131 .or_insert(HashSet
::new());
1133 let chunks
= if let Some(datastore
) = datastore
{
1134 restore_chunk_archive(
1142 scan_chunk_archive(worker
.clone(), reader
, verbose
)?
1145 if let Some(chunks
) = chunks
{
1146 catalog
.register_chunk_archive(
1147 Uuid
::from(header
.uuid
),
1148 current_file_number
,
1152 task_log
!(worker
, "register {} chunks", chunks
.len());
1153 catalog
.commit_if_large()?
;
1156 } else if target
.is_some() {
1157 task_log
!(worker
, "skipping...");
1160 reader
.skip_data()?
; // read all data
1162 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
=> {
1163 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
1165 let archive_header
: CatalogArchiveHeader
= serde_json
::from_slice(&header_data
)
1166 .map_err(|err
| format_err
!("unable to parse catalog archive header - {}", err
))?
;
1170 "File {}: skip catalog '{}'",
1171 current_file_number
,
1175 reader
.skip_data()?
; // read all data
1177 _
=> bail
!("unknown content magic {:?}", header
.content_magic
),
1183 // Read chunk archive without restoring data - just record contained chunks
1184 fn scan_chunk_archive
<'a
>(
1185 worker
: Arc
<WorkerTask
>,
1186 reader
: Box
<dyn 'a
+ TapeRead
>,
1188 ) -> Result
<Option
<Vec
<[u8; 32]>>, Error
> {
1189 let mut chunks
= Vec
::new();
1191 let mut decoder
= ChunkArchiveDecoder
::new(reader
);
1194 let digest
= match decoder
.next_chunk() {
1195 Ok(Some((digest
, _blob
))) => digest
,
1198 let reader
= decoder
.reader();
1200 // check if this stream is marked incomplete
1201 if let Ok(true) = reader
.is_incomplete() {
1202 return Ok(Some(chunks
));
1205 // check if this is an aborted stream without end marker
1206 if let Ok(false) = reader
.has_end_marker() {
1207 task_log
!(worker
, "missing stream end marker");
1211 // else the archive is corrupt
1216 worker
.check_abort()?
;
1219 task_log
!(worker
, "Found chunk: {}", hex
::encode(&digest
));
1222 chunks
.push(digest
);
1228 fn restore_chunk_archive
<'a
>(
1229 worker
: Arc
<WorkerTask
>,
1230 reader
: Box
<dyn 'a
+ TapeRead
>,
1231 datastore
: Arc
<DataStore
>,
1232 checked_chunks
: &mut HashSet
<[u8; 32]>,
1234 ) -> Result
<Option
<Vec
<[u8; 32]>>, Error
> {
1235 let mut chunks
= Vec
::new();
1237 let mut decoder
= ChunkArchiveDecoder
::new(reader
);
1239 let start_time
= std
::time
::SystemTime
::now();
1240 let bytes
= Arc
::new(std
::sync
::atomic
::AtomicU64
::new(0));
1241 let bytes2
= bytes
.clone();
1243 let worker2
= worker
.clone();
1245 let writer_pool
= ParallelHandler
::new(
1246 "tape restore chunk writer",
1248 move |(chunk
, digest
): (DataBlob
, [u8; 32])| {
1249 let chunk_exists
= datastore
.cond_touch_chunk(&digest
, false)?
;
1252 task_log
!(worker2
, "Insert chunk: {}", hex
::encode(&digest
));
1254 bytes2
.fetch_add(chunk
.raw_size(), std
::sync
::atomic
::Ordering
::SeqCst
);
1255 // println!("verify and write {}", hex::encode(&digest));
1256 chunk
.verify_crc()?
;
1257 if chunk
.crypt_mode()?
== CryptMode
::None
{
1258 chunk
.decode(None
, Some(&digest
))?
; // verify digest
1261 datastore
.insert_chunk(&chunk
, &digest
)?
;
1263 task_log
!(worker2
, "Found existing chunk: {}", hex
::encode(&digest
));
1269 let verify_and_write_channel
= writer_pool
.channel();
1272 let (digest
, blob
) = match decoder
.next_chunk() {
1273 Ok(Some((digest
, blob
))) => (digest
, blob
),
1276 let reader
= decoder
.reader();
1278 // check if this stream is marked incomplete
1279 if let Ok(true) = reader
.is_incomplete() {
1280 return Ok(Some(chunks
));
1283 // check if this is an aborted stream without end marker
1284 if let Ok(false) = reader
.has_end_marker() {
1285 task_log
!(worker
, "missing stream end marker");
1289 // else the archive is corrupt
1294 worker
.check_abort()?
;
1296 if !checked_chunks
.contains(&digest
) {
1297 verify_and_write_channel
.send((blob
, digest
.clone()))?
;
1298 checked_chunks
.insert(digest
.clone());
1300 chunks
.push(digest
);
1303 drop(verify_and_write_channel
);
1305 writer_pool
.complete()?
;
1307 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
1309 let bytes
= bytes
.load(std
::sync
::atomic
::Ordering
::SeqCst
);
1313 "restored {} bytes ({:.2} MB/s)",
1315 (bytes
as f64) / (1_000_000.0 * elapsed
)
1321 fn restore_snapshot_archive
<'a
>(
1322 worker
: Arc
<WorkerTask
>,
1323 reader
: Box
<dyn 'a
+ TapeRead
>,
1324 snapshot_path
: &Path
,
1325 ) -> Result
<bool
, Error
> {
1326 let mut decoder
= pxar
::decoder
::sync
::Decoder
::from_std(reader
)?
;
1327 match try_restore_snapshot_archive(worker
, &mut decoder
, snapshot_path
) {
1330 let reader
= decoder
.input();
1332 // check if this stream is marked incomplete
1333 if let Ok(true) = reader
.is_incomplete() {
1337 // check if this is an aborted stream without end marker
1338 if let Ok(false) = reader
.has_end_marker() {
1342 // else the archive is corrupt
1348 fn try_restore_snapshot_archive
<R
: pxar
::decoder
::SeqRead
>(
1349 worker
: Arc
<WorkerTask
>,
1350 decoder
: &mut pxar
::decoder
::sync
::Decoder
<R
>,
1351 snapshot_path
: &Path
,
1352 ) -> Result
<BackupManifest
, Error
> {
1353 let _root
= match decoder
.next() {
1354 None
=> bail
!("missing root entry"),
1358 pxar
::EntryKind
::Directory
=> { /* Ok */ }
1359 _
=> bail
!("wrong root entry type"),
1365 let root_path
= Path
::new("/");
1366 let manifest_file_name
= OsStr
::new(MANIFEST_BLOB_NAME
);
1368 let mut manifest
= None
;
1371 worker
.check_abort()?
;
1373 let entry
= match decoder
.next() {
1375 Some(entry
) => entry?
,
1377 let entry_path
= entry
.path();
1379 match entry
.kind() {
1380 pxar
::EntryKind
::File { .. }
=> { /* Ok */ }
1381 _
=> bail
!("wrong entry type for {:?}", entry_path
),
1383 match entry_path
.parent() {
1384 None
=> bail
!("wrong parent for {:?}", entry_path
),
1387 bail
!("wrong parent for {:?}", entry_path
);
1392 let filename
= entry
.file_name();
1393 let mut contents
= match decoder
.contents() {
1394 None
=> bail
!("missing file content"),
1395 Some(contents
) => contents
,
1398 let mut archive_path
= snapshot_path
.to_owned();
1399 archive_path
.push(&filename
);
1401 let mut tmp_path
= archive_path
.clone();
1402 tmp_path
.set_extension("tmp");
1404 if filename
== manifest_file_name
{
1405 let blob
= DataBlob
::load_from_reader(&mut contents
)?
;
1406 let mut old_manifest
= BackupManifest
::try_from(blob
)?
;
1408 // Remove verify_state to indicate that this snapshot is not verified
1412 .map(|m
| m
.remove("verify_state"));
1414 let old_manifest
= serde_json
::to_string_pretty(&old_manifest
)?
;
1415 let blob
= DataBlob
::encode(old_manifest
.as_bytes(), None
, true)?
;
1417 let options
= CreateOptions
::new();
1418 replace_file(&tmp_path
, blob
.raw_data(), options
, false)?
;
1420 manifest
= Some(BackupManifest
::try_from(blob
)?
);
1422 let mut tmpfile
= std
::fs
::OpenOptions
::new()
1427 .map_err(|err
| format_err
!("restore {:?} failed - {}", tmp_path
, err
))?
;
1429 std
::io
::copy(&mut contents
, &mut tmpfile
)?
;
1431 if let Err(err
) = std
::fs
::rename(&tmp_path
, &archive_path
) {
1432 bail
!("Atomic rename file {:?} failed - {}", archive_path
, err
);
1437 let manifest
= match manifest
{
1438 None
=> bail
!("missing manifest"),
1439 Some(manifest
) => manifest
,
1442 // Do not verify anything here, because this would be to slow (causes tape stops).
1445 let mut manifest_path
= snapshot_path
.to_owned();
1446 manifest_path
.push(MANIFEST_BLOB_NAME
);
1447 let mut tmp_manifest_path
= manifest_path
.clone();
1448 tmp_manifest_path
.set_extension("tmp");
1450 if let Err(err
) = std
::fs
::rename(&tmp_manifest_path
, &manifest_path
) {
1452 "Atomic rename manifest {:?} failed - {}",
/// Try to restore media catalogs (from catalog_archives)
1462 pub fn fast_catalog_restore(
1463 worker
: &WorkerTask
,
1464 drive
: &mut Box
<dyn TapeDriver
>,
1465 media_set
: &MediaSet
,
1466 uuid
: &Uuid
, // current media Uuid
1467 ) -> Result
<bool
, Error
> {
1468 let status_path
= Path
::new(TAPE_STATUS_DIR
);
1470 let current_file_number
= drive
.current_file_number()?
;
1471 if current_file_number
!= 2 {
1472 bail
!("fast_catalog_restore: wrong media position - internal error");
1475 let mut found_catalog
= false;
1477 let mut moved_to_eom
= false;
1480 let current_file_number
= drive
.current_file_number()?
;
1483 // limit reader scope
1484 let mut reader
= match drive
.read_next_file() {
1485 Err(BlockReadError
::EndOfFile
) => {
1488 "skip unexpected filemark at pos {}",
1493 Err(BlockReadError
::EndOfStream
) => {
1494 task_log
!(worker
, "detected EOT after {} files", current_file_number
);
1497 Err(BlockReadError
::Error(err
)) => {
1498 return Err(err
.into());
1500 Ok(reader
) => reader
,
1503 let header
: MediaContentHeader
= unsafe { reader.read_le_value()? }
;
1504 if header
.magic
!= PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
{
1505 bail
!("missing MediaContentHeader");
1508 if header
.content_magic
== PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
{
1509 task_log
!(worker
, "found catalog at pos {}", current_file_number
);
1511 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
1513 let archive_header
: CatalogArchiveHeader
= serde_json
::from_slice(&header_data
)
1515 format_err
!("unable to parse catalog archive header - {}", err
)
1518 if &archive_header
.media_set_uuid
!= media_set
.uuid() {
1521 "skipping unrelated catalog at pos {}",
1524 reader
.skip_data()?
; // read all data
1528 let catalog_uuid
= &archive_header
.uuid
;
1530 let wanted
= media_set
1535 Some(uuid
) => uuid
== catalog_uuid
,
1542 "skip catalog because media '{}' not inventarized",
1545 reader
.skip_data()?
; // read all data
1549 if catalog_uuid
== uuid
{
1550 // always restore and overwrite catalog
1552 // only restore if catalog does not exist
1553 if MediaCatalog
::exists(status_path
, catalog_uuid
) {
1556 "catalog for media '{}' already exists",
1559 reader
.skip_data()?
; // read all data
1565 MediaCatalog
::create_temporary_database_file(status_path
, catalog_uuid
)?
;
1567 std
::io
::copy(&mut reader
, &mut file
)?
;
1569 file
.seek(SeekFrom
::Start(0))?
;
1571 match MediaCatalog
::parse_catalog_header(&mut file
)?
{
1572 (true, Some(media_uuid
), Some(media_set_uuid
)) => {
1573 if &media_uuid
!= catalog_uuid
{
1576 "catalog uuid missmatch at pos {}",
1581 if media_set_uuid
!= archive_header
.media_set_uuid
{
1584 "catalog media_set missmatch at pos {}",
1590 MediaCatalog
::finish_temporary_database(status_path
, &media_uuid
, true)?
;
1592 if catalog_uuid
== uuid
{
1593 task_log
!(worker
, "successfully restored catalog");
1594 found_catalog
= true
1598 "successfully restored related catalog {}",
1604 task_warn
!(worker
, "got incomplete catalog header - skip file");
1614 break; // already done - stop
1616 moved_to_eom
= true;
1618 task_log
!(worker
, "searching for catalog at EOT (moving to EOT)");
1619 drive
.move_to_last_file()?
;
1621 let new_file_number
= drive
.current_file_number()?
;
1623 if new_file_number
< (current_file_number
+ 1) {
1624 break; // no new content - stop