1 use std
::path
::{Path, PathBuf}
;
3 use std
::collections
::{HashMap, HashSet, BTreeMap}
;
4 use std
::convert
::TryFrom
;
5 use std
::io
::{Seek, SeekFrom}
;
8 use anyhow
::{bail, format_err, Error}
;
11 use proxmox
::tools
::fs
::{replace_file, CreateOptions}
;
12 use proxmox_io
::ReadExt
;
13 use proxmox_router
::{Permission, Router, RpcEnvironment, RpcEnvironmentType}
;
14 use proxmox_schema
::{api, parse_property_string}
;
15 use proxmox_section_config
::SectionConfigData
;
16 use proxmox_uuid
::Uuid
;
19 Authid
, Userid
, CryptMode
,
20 DATASTORE_MAP_ARRAY_SCHEMA
, DATASTORE_MAP_LIST_SCHEMA
, DRIVE_NAME_SCHEMA
,
21 UPID_SCHEMA
, TAPE_RESTORE_SNAPSHOT_SCHEMA
,
22 PRIV_DATASTORE_BACKUP
, PRIV_DATASTORE_MODIFY
, PRIV_TAPE_READ
,
24 use pbs_datastore
::{DataStore, DataBlob}
;
25 use pbs_datastore
::backup_info
::BackupDir
;
26 use pbs_datastore
::dynamic_index
::DynamicIndexReader
;
27 use pbs_datastore
::fixed_index
::FixedIndexReader
;
28 use pbs_datastore
::index
::IndexFile
;
29 use pbs_datastore
::manifest
::{archive_type, ArchiveType, BackupManifest, MANIFEST_BLOB_NAME}
;
30 use pbs_config
::CachedUserInfo
;
32 TapeRead
, BlockReadError
, MediaContentHeader
,
33 PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
,
35 use pbs_tools
::{task_log, task_warn, task::WorkerTaskContext}
;
36 use proxmox_rest_server
::WorkerTask
;
39 tools
::ParallelHandler
,
40 server
::lookup_user_email
,
50 PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0
,
51 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0
,
52 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1
,
53 PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0
,
54 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0
,
55 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1
,
56 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
,
59 SnapshotArchiveHeader
,
64 request_and_load_media
,
66 set_tape_device_state
,
71 const RESTORE_TMP_DIR
: &str = "/var/tmp/proxmox-backup";
/// Maps source datastore names (as recorded on tape) to target datastores
/// on this host for a restore operation.
pub struct DataStoreMap {
    // explicit `source=target` mappings: source datastore name -> opened target store
    map: HashMap<String, Arc<DataStore>>,
    // fallback target used for any source name without an explicit mapping
    default: Option<Arc<DataStore>>,
}
78 impl TryFrom
<String
> for DataStoreMap
{
81 fn try_from(value
: String
) -> Result
<Self, Error
> {
82 let value
= parse_property_string(&value
, &DATASTORE_MAP_ARRAY_SCHEMA
)?
;
83 let mut mapping
: Vec
<String
> = value
87 .map(|v
| v
.as_str().unwrap().to_string())
90 let mut map
= HashMap
::new();
91 let mut default = None
;
92 while let Some(mut store
) = mapping
.pop() {
93 if let Some(index
) = store
.find('
='
) {
94 let mut target
= store
.split_off(index
);
95 target
.remove(0); // remove '='
96 let datastore
= DataStore
::lookup_datastore(&target
)?
;
97 map
.insert(store
, datastore
);
98 } else if default.is_none() {
99 default = Some(DataStore
::lookup_datastore(&store
)?
);
101 bail
!("multiple default stores given");
105 Ok(Self { map, default }
)
110 fn used_datastores
<'a
>(&self) -> HashSet
<&str> {
111 let mut set
= HashSet
::new();
112 for store
in self.map
.values() {
113 set
.insert(store
.name());
116 if let Some(ref store
) = self.default {
117 set
.insert(store
.name());
123 fn get_datastore(&self, source
: &str) -> Option
<Arc
<DataStore
>> {
124 if let Some(store
) = self.map
.get(source
) {
125 return Some(Arc
::clone(store
));
127 if let Some(ref store
) = self.default {
128 return Some(Arc
::clone(store
));
135 fn check_datastore_privs(
136 user_info
: &CachedUserInfo
,
139 owner
: &Option
<Authid
>,
140 ) -> Result
<(), Error
> {
141 let privs
= user_info
.lookup_privs(&auth_id
, &["datastore", &store
]);
142 if (privs
& PRIV_DATASTORE_BACKUP
) == 0 {
143 bail
!("no permissions on /datastore/{}", store
);
146 if let Some(ref owner
) = owner
{
147 let correct_owner
= owner
== auth_id
148 || (owner
.is_token() && !auth_id
.is_token() && owner
.user() == auth_id
.user());
150 // same permission as changing ownership after syncing
151 if !correct_owner
&& privs
& PRIV_DATASTORE_MODIFY
== 0 {
152 bail
!("no permission to restore as '{}'", owner
);
/// API router for tape restore: POST starts the restore worker task.
pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
165 schema
: DATASTORE_MAP_LIST_SCHEMA
,
168 schema
: DRIVE_NAME_SCHEMA
,
171 description
: "Media set UUID.",
179 description
: "List of snapshots.",
183 schema
: TAPE_RESTORE_SNAPSHOT_SCHEMA
,
196 // Note: parameters are no uri parameter, so we need to test inside function body
197 description
: "The user needs Tape.Read privilege on /tape/pool/{pool} \
198 and /tape/drive/{drive}, Datastore.Backup privilege on /datastore/{store}.",
199 permission
: &Permission
::Anybody
,
202 /// Restore data from media-set
207 notify_user
: Option
<Userid
>,
208 snapshots
: Option
<Vec
<String
>>,
209 owner
: Option
<Authid
>,
210 rpcenv
: &mut dyn RpcEnvironment
,
211 ) -> Result
<Value
, Error
> {
212 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
213 let user_info
= CachedUserInfo
::new()?
;
215 let store_map
= DataStoreMap
::try_from(store
)
216 .map_err(|err
| format_err
!("cannot parse store mapping: {}", err
))?
;
217 let used_datastores
= store_map
.used_datastores();
218 if used_datastores
.len() == 0 {
219 bail
!("no datastores given");
222 for store
in used_datastores
.iter() {
223 check_datastore_privs(&user_info
, &store
, &auth_id
, &owner
)?
;
226 let privs
= user_info
.lookup_privs(&auth_id
, &["tape", "drive", &drive
]);
227 if (privs
& PRIV_TAPE_READ
) == 0 {
228 bail
!("no permissions on /tape/drive/{}", drive
);
231 let media_set_uuid
= media_set
.parse()?
;
233 let status_path
= Path
::new(TAPE_STATUS_DIR
);
235 let _lock
= lock_media_set(status_path
, &media_set_uuid
, None
)?
;
237 let inventory
= Inventory
::load(status_path
)?
;
239 let pool
= inventory
.lookup_media_set_pool(&media_set_uuid
)?
;
241 let privs
= user_info
.lookup_privs(&auth_id
, &["tape", "pool", &pool
]);
242 if (privs
& PRIV_TAPE_READ
) == 0 {
243 bail
!("no permissions on /tape/pool/{}", pool
);
246 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
248 // early check/lock before starting worker
249 let drive_lock
= lock_tape_device(&drive_config
, &drive
)?
;
251 let to_stdout
= rpcenv
.env_type() == RpcEnvironmentType
::CLI
;
253 let taskid
= used_datastores
255 .map(|s
| s
.to_string())
256 .collect
::<Vec
<String
>>()
259 let upid_str
= WorkerTask
::new_thread(
265 let _drive_lock
= drive_lock
; // keep lock guard
267 set_tape_device_state(&drive
, &worker
.upid().to_string())?
;
269 let restore_owner
= owner
.as_ref().unwrap_or(&auth_id
);
271 let email
= notify_user
273 .and_then(|userid
| lookup_user_email(userid
))
274 .or_else(|| lookup_user_email(&auth_id
.clone().into()));
276 task_log
!(worker
, "Mediaset '{}'", media_set
);
277 task_log
!(worker
, "Pool: {}", pool
);
279 let res
= if let Some(snapshots
) = snapshots
{
305 task_log
!(worker
, "Restore mediaset '{}' done", media_set
);
308 if let Err(err
) = set_tape_device_state(&drive
, "") {
311 "could not unset drive state for {}: {}",
324 fn restore_full_worker(
325 worker
: Arc
<WorkerTask
>,
326 inventory
: Inventory
,
327 media_set_uuid
: Uuid
,
328 drive_config
: SectionConfigData
,
330 store_map
: DataStoreMap
,
331 restore_owner
: &Authid
,
332 email
: Option
<String
>,
333 ) -> Result
<(), Error
> {
334 let members
= inventory
.compute_media_set_members(&media_set_uuid
)?
;
336 let media_list
= members
.media_list();
338 let mut media_id_list
= Vec
::new();
340 let mut encryption_key_fingerprint
= None
;
342 for (seq_nr
, media_uuid
) in media_list
.iter().enumerate() {
345 bail
!("media set {} is incomplete (missing member {}).", media_set_uuid
, seq_nr
);
347 Some(media_uuid
) => {
348 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
349 if let Some(ref set
) = media_id
.media_set_label
{ // always true here
350 if encryption_key_fingerprint
.is_none() && set
.encryption_key_fingerprint
.is_some() {
351 encryption_key_fingerprint
= set
.encryption_key_fingerprint
.clone();
354 media_id_list
.push(media_id
);
359 if let Some(fingerprint
) = encryption_key_fingerprint
{
360 task_log
!(worker
, "Encryption key fingerprint: {}", fingerprint
);
370 .collect
::<Vec
<String
>>()
374 task_log
!(worker
, "Drive: {}", drive_name
);
377 "Required media list: {}",
379 .map(|media_id
| media_id
.label
.label_text
.as_str())
380 .collect
::<Vec
<&str>>()
384 let mut datastore_locks
= Vec
::new();
385 for store_name
in store_map
.used_datastores() {
386 // explicit create shared lock to prevent GC on newly created chunks
387 if let Some(store
) = store_map
.get_datastore(store_name
) {
388 let shared_store_lock
= store
.try_shared_chunk_store_lock()?
;
389 datastore_locks
.push(shared_store_lock
);
393 let mut checked_chunks_map
= HashMap
::new();
395 for media_id
in media_id_list
.iter() {
396 request_and_restore_media(
402 &mut checked_chunks_map
,
411 fn restore_list_worker(
412 worker
: Arc
<WorkerTask
>,
413 snapshots
: Vec
<String
>,
414 inventory
: Inventory
,
415 media_set_uuid
: Uuid
,
416 drive_config
: SectionConfigData
,
418 store_map
: DataStoreMap
,
419 restore_owner
: &Authid
,
420 email
: Option
<String
>,
421 ) -> Result
<(), Error
> {
422 let base_path
: PathBuf
= format
!("{}/{}", RESTORE_TMP_DIR
, media_set_uuid
).into();
423 std
::fs
::create_dir_all(&base_path
)?
;
425 let catalog
= get_media_set_catalog(&inventory
, &media_set_uuid
)?
;
427 let mut datastore_locks
= Vec
::new();
428 let mut snapshot_file_hash
: BTreeMap
<Uuid
, Vec
<u64>> = BTreeMap
::new();
429 let mut snapshot_locks
= HashMap
::new();
431 let res
= proxmox_lang
::try_block
!({
432 // assemble snapshot files/locks
433 for store_snapshot
in snapshots
.iter() {
434 let mut split
= store_snapshot
.splitn(2, '
:'
);
435 let source_datastore
= split
437 .ok_or_else(|| format_err
!("invalid snapshot: {}", store_snapshot
))?
;
440 .ok_or_else(|| format_err
!("invalid snapshot:{}", store_snapshot
))?
;
441 let backup_dir
: BackupDir
= snapshot
.parse()?
;
443 let datastore
= store_map
.get_datastore(source_datastore
).ok_or_else(|| {
445 "could not find mapping for source datastore: {}",
450 let (owner
, _group_lock
) =
451 datastore
.create_locked_backup_group(backup_dir
.group(), &restore_owner
)?
;
452 if restore_owner
!= &owner
{
453 // only the owner is allowed to create additional snapshots
455 "restore '{}' failed - owner check failed ({} != {})",
462 let (media_id
, file_num
) = if let Some((media_uuid
, file_num
)) =
463 catalog
.lookup_snapshot(&source_datastore
, &snapshot
)
465 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
470 "did not find snapshot '{}' in media set {}",
477 let (_rel_path
, is_new
, snap_lock
) = datastore
.create_locked_backup_dir(&backup_dir
)?
;
482 "found snapshot {} on target datastore, skipping...",
488 snapshot_locks
.insert(store_snapshot
.to_string(), snap_lock
);
490 let shared_store_lock
= datastore
.try_shared_chunk_store_lock()?
;
491 datastore_locks
.push(shared_store_lock
);
493 let file_list
= snapshot_file_hash
494 .entry(media_id
.label
.uuid
.clone())
495 .or_insert_with(Vec
::new
);
496 file_list
.push(file_num
);
500 "found snapshot {} on {}: file {}",
502 media_id
.label
.label_text
,
507 if snapshot_file_hash
.is_empty() {
508 task_log
!(worker
, "nothing to restore, skipping remaining phases...");
512 task_log
!(worker
, "Phase 1: temporarily restore snapshots to temp dir");
513 let mut datastore_chunk_map
: HashMap
<String
, HashSet
<[u8; 32]>> = HashMap
::new();
514 for (media_uuid
, file_list
) in snapshot_file_hash
.iter_mut() {
515 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
516 let (drive
, info
) = request_and_load_media(
523 file_list
.sort_unstable();
524 restore_snapshots_to_tmpdir(
531 &mut datastore_chunk_map
,
532 ).map_err(|err
| format_err
!("could not restore snapshots to tmpdir: {}", err
))?
;
535 // sorted media_uuid => (sorted file_num => (set of digests)))
536 let mut media_file_chunk_map
: BTreeMap
<Uuid
, BTreeMap
<u64, HashSet
<[u8; 32]>>> = BTreeMap
::new();
538 for (source_datastore
, chunks
) in datastore_chunk_map
.into_iter() {
539 let datastore
= store_map
.get_datastore(&source_datastore
).ok_or_else(|| {
541 "could not find mapping for source datastore: {}",
545 for digest
in chunks
.into_iter() {
546 // we only want to restore chunks that we do not have yet
547 if !datastore
.cond_touch_chunk(&digest
, false)?
{
548 if let Some((uuid
, nr
)) = catalog
.lookup_chunk(&source_datastore
, &digest
) {
549 let file
= media_file_chunk_map
.entry(uuid
.clone()).or_insert_with(BTreeMap
::new
);
550 let chunks
= file
.entry(nr
).or_insert_with(HashSet
::new
);
551 chunks
.insert(digest
);
557 // we do not need it anymore, saves memory
560 if !media_file_chunk_map
.is_empty() {
561 task_log
!(worker
, "Phase 2: restore chunks to datastores");
563 task_log
!(worker
, "all chunks exist already, skipping phase 2...");
566 for (media_uuid
, file_chunk_map
) in media_file_chunk_map
.iter_mut() {
567 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
568 let (mut drive
, _info
) = request_and_load_media(
575 restore_file_chunk_map(worker
.clone(), &mut drive
, &store_map
, file_chunk_map
)?
;
580 "Phase 3: copy snapshots from temp dir to datastores"
582 for (store_snapshot
, _lock
) in snapshot_locks
.into_iter() {
583 proxmox_lang
::try_block
!({
584 let mut split
= store_snapshot
.splitn(2, '
:'
);
585 let source_datastore
= split
587 .ok_or_else(|| format_err
!("invalid snapshot: {}", store_snapshot
))?
;
590 .ok_or_else(|| format_err
!("invalid snapshot:{}", store_snapshot
))?
;
591 let backup_dir
: BackupDir
= snapshot
.parse()?
;
593 let datastore
= store_map
594 .get_datastore(&source_datastore
)
595 .ok_or_else(|| format_err
!("unexpected source datastore: {}", source_datastore
))?
;
597 let mut tmp_path
= base_path
.clone();
598 tmp_path
.push(&source_datastore
);
599 tmp_path
.push(snapshot
);
601 let path
= datastore
.snapshot_path(&backup_dir
);
603 for entry
in std
::fs
::read_dir(tmp_path
)?
{
605 let mut new_path
= path
.clone();
606 new_path
.push(entry
.file_name());
607 std
::fs
::copy(entry
.path(), new_path
)?
;
609 task_log
!(worker
, "Restore snapshot '{}' done", snapshot
);
611 }).map_err(|err
: Error
| format_err
!("could not copy {}: {}", store_snapshot
, err
))?
;
617 task_warn
!(worker
, "Error during restore, partially restored snapshots will NOT be cleaned up");
620 match std
::fs
::remove_dir_all(&base_path
) {
622 Err(err
) => task_warn
!(worker
, "error cleaning up: {}", err
),
628 fn get_media_set_catalog(
629 inventory
: &Inventory
,
630 media_set_uuid
: &Uuid
,
631 ) -> Result
<MediaSetCatalog
, Error
> {
632 let status_path
= Path
::new(TAPE_STATUS_DIR
);
634 let members
= inventory
.compute_media_set_members(media_set_uuid
)?
;
635 let media_list
= members
.media_list();
636 let mut catalog
= MediaSetCatalog
::new();
638 for (seq_nr
, media_uuid
) in media_list
.iter().enumerate() {
642 "media set {} is incomplete (missing member {}).",
647 Some(media_uuid
) => {
648 let media_id
= inventory
.lookup_media(media_uuid
).unwrap();
649 let media_catalog
= MediaCatalog
::open(status_path
, &media_id
, false, false)?
;
650 catalog
.append_catalog(media_catalog
)?
;
658 fn restore_snapshots_to_tmpdir(
659 worker
: Arc
<WorkerTask
>,
662 mut drive
: Box
<dyn TapeDriver
>,
664 media_set_uuid
: &Uuid
,
665 chunks_list
: &mut HashMap
<String
, HashSet
<[u8; 32]>>,
666 ) -> Result
<(), Error
> {
667 match media_id
.media_set_label
{
670 "missing media set label on media {} ({})",
671 media_id
.label
.label_text
,
676 if set
.uuid
!= *media_set_uuid
{
678 "wrong media set label on media {} ({} != {})",
679 media_id
.label
.label_text
,
684 let encrypt_fingerprint
= set
.encryption_key_fingerprint
.clone().map(|fp
| {
685 task_log
!(worker
, "Encryption key fingerprint: {}", fp
);
686 (fp
, set
.uuid
.clone())
689 drive
.set_encryption(encrypt_fingerprint
)?
;
693 for file_num
in file_list
{
694 let current_file_number
= drive
.current_file_number()?
;
695 if current_file_number
!= *file_num
{
696 task_log
!(worker
, "was at file {}, moving to {}", current_file_number
, file_num
);
697 drive
.move_to_file(*file_num
)?
;
698 let current_file_number
= drive
.current_file_number()?
;
699 task_log
!(worker
, "now at file {}", current_file_number
);
701 let mut reader
= drive
.read_next_file()?
;
703 let header
: MediaContentHeader
= unsafe { reader.read_le_value()? }
;
704 if header
.magic
!= PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
{
705 bail
!("missing MediaContentHeader");
708 match header
.content_magic
{
709 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1
=> {
710 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
712 let archive_header
: SnapshotArchiveHeader
= serde_json
::from_slice(&header_data
)
714 format_err
!("unable to parse snapshot archive header - {}", err
)
717 let source_datastore
= archive_header
.store
;
718 let snapshot
= archive_header
.snapshot
;
722 "File {}: snapshot archive {}:{}",
728 let mut decoder
= pxar
::decoder
::sync
::Decoder
::from_std(reader
)?
;
730 let mut tmp_path
= path
.clone();
731 tmp_path
.push(&source_datastore
);
732 tmp_path
.push(snapshot
);
733 std
::fs
::create_dir_all(&tmp_path
)?
;
735 let chunks
= chunks_list
736 .entry(source_datastore
)
737 .or_insert_with(HashSet
::new
);
738 let manifest
= try_restore_snapshot_archive(worker
.clone(), &mut decoder
, &tmp_path
)?
;
739 for item
in manifest
.files() {
740 let mut archive_path
= tmp_path
.to_owned();
741 archive_path
.push(&item
.filename
);
743 let index
: Box
<dyn IndexFile
> = match archive_type(&item
.filename
)?
{
744 ArchiveType
::DynamicIndex
=> {
745 Box
::new(DynamicIndexReader
::open(&archive_path
)?
)
747 ArchiveType
::FixedIndex
=> {
748 Box
::new(FixedIndexReader
::open(&archive_path
)?
)
750 ArchiveType
::Blob
=> continue,
752 for i
in 0..index
.index_count() {
753 if let Some(digest
) = index
.index_digest(i
) {
754 chunks
.insert(*digest
);
759 other
=> bail
!("unexpected file type: {:?}", other
),
766 fn restore_file_chunk_map(
767 worker
: Arc
<WorkerTask
>,
768 drive
: &mut Box
<dyn TapeDriver
>,
769 store_map
: &DataStoreMap
,
770 file_chunk_map
: &mut BTreeMap
<u64, HashSet
<[u8; 32]>>,
771 ) -> Result
<(), Error
> {
772 for (nr
, chunk_map
) in file_chunk_map
.iter_mut() {
773 let current_file_number
= drive
.current_file_number()?
;
774 if current_file_number
!= *nr
{
775 task_log
!(worker
, "was at file {}, moving to {}", current_file_number
, nr
);
776 drive
.move_to_file(*nr
)?
;
777 let current_file_number
= drive
.current_file_number()?
;
778 task_log
!(worker
, "now at file {}", current_file_number
);
780 let mut reader
= drive
.read_next_file()?
;
781 let header
: MediaContentHeader
= unsafe { reader.read_le_value()? }
;
782 if header
.magic
!= PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
{
783 bail
!("missing MediaContentHeader");
786 match header
.content_magic
{
787 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1
=> {
788 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
790 let archive_header
: ChunkArchiveHeader
= serde_json
::from_slice(&header_data
)
791 .map_err(|err
| format_err
!("unable to parse chunk archive header - {}", err
))?
;
793 let source_datastore
= archive_header
.store
;
797 "File {}: chunk archive for datastore '{}'",
802 let datastore
= store_map
.get_datastore(&source_datastore
).ok_or_else(|| {
803 format_err
!("unexpected chunk archive for store: {}", source_datastore
)
806 let count
= restore_partial_chunk_archive(worker
.clone(), reader
, datastore
.clone(), chunk_map
)?
;
807 task_log
!(worker
, "restored {} chunks", count
);
809 _
=> bail
!("unexpected content magic {:?}", header
.content_magic
),
816 fn restore_partial_chunk_archive
<'a
>(
817 worker
: Arc
<WorkerTask
>,
818 reader
: Box
<dyn 'a
+ TapeRead
>,
819 datastore
: Arc
<DataStore
>,
820 chunk_list
: &mut HashSet
<[u8; 32]>,
821 ) -> Result
<usize, Error
> {
822 let mut decoder
= ChunkArchiveDecoder
::new(reader
);
826 let start_time
= std
::time
::SystemTime
::now();
827 let bytes
= Arc
::new(std
::sync
::atomic
::AtomicU64
::new(0));
828 let bytes2
= bytes
.clone();
830 let writer_pool
= ParallelHandler
::new(
831 "tape restore chunk writer",
833 move |(chunk
, digest
): (DataBlob
, [u8; 32])| {
834 if !datastore
.cond_touch_chunk(&digest
, false)?
{
835 bytes2
.fetch_add(chunk
.raw_size(), std
::sync
::atomic
::Ordering
::SeqCst
);
837 if chunk
.crypt_mode()?
== CryptMode
::None
{
838 chunk
.decode(None
, Some(&digest
))?
; // verify digest
841 datastore
.insert_chunk(&chunk
, &digest
)?
;
847 let verify_and_write_channel
= writer_pool
.channel();
850 let (digest
, blob
) = match decoder
.next_chunk()?
{
851 Some((digest
, blob
)) => (digest
, blob
),
855 worker
.check_abort()?
;
857 if chunk_list
.remove(&digest
) {
858 verify_and_write_channel
.send((blob
, digest
.clone()))?
;
862 if chunk_list
.is_empty() {
867 drop(verify_and_write_channel
);
869 writer_pool
.complete()?
;
871 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
873 let bytes
= bytes
.load(std
::sync
::atomic
::Ordering
::SeqCst
);
877 "restored {} bytes ({:.2} MB/s)",
879 (bytes
as f64) / (1_000_000.0 * elapsed
)
886 /// Request and restore complete media without using existing catalog (create catalog instead)
887 pub fn request_and_restore_media(
888 worker
: Arc
<WorkerTask
>,
890 drive_config
: &SectionConfigData
,
892 store_map
: &DataStoreMap
,
893 checked_chunks_map
: &mut HashMap
<String
, HashSet
<[u8;32]>>,
894 restore_owner
: &Authid
,
895 email
: &Option
<String
>,
896 ) -> Result
<(), Error
> {
897 let media_set_uuid
= match media_id
.media_set_label
{
898 None
=> bail
!("restore_media: no media set - internal error"),
899 Some(ref set
) => &set
.uuid
,
902 let (mut drive
, info
) = request_and_load_media(&worker
, &drive_config
, &drive_name
, &media_id
.label
, email
)?
;
904 match info
.media_set_label
{
906 bail
!("missing media set label on media {} ({})",
907 media_id
.label
.label_text
, media_id
.label
.uuid
);
910 if &set
.uuid
!= media_set_uuid
{
911 bail
!("wrong media set label on media {} ({} != {})",
912 media_id
.label
.label_text
, media_id
.label
.uuid
,
915 let encrypt_fingerprint
= set
.encryption_key_fingerprint
.clone()
916 .map(|fp
| (fp
, set
.uuid
.clone()));
918 drive
.set_encryption(encrypt_fingerprint
)?
;
926 Some((&store_map
, restore_owner
)),
932 /// Restore complete media content and catalog
934 /// Only create the catalog if target is None.
935 pub fn restore_media(
936 worker
: Arc
<WorkerTask
>,
937 drive
: &mut Box
<dyn TapeDriver
>,
939 target
: Option
<(&DataStoreMap
, &Authid
)>,
940 checked_chunks_map
: &mut HashMap
<String
, HashSet
<[u8;32]>>,
942 ) -> Result
<(), Error
> {
944 let status_path
= Path
::new(TAPE_STATUS_DIR
);
945 let mut catalog
= MediaCatalog
::create_temporary_database(status_path
, media_id
, false)?
;
948 let current_file_number
= drive
.current_file_number()?
;
949 let reader
= match drive
.read_next_file() {
950 Err(BlockReadError
::EndOfFile
) => {
951 task_log
!(worker
, "skip unexpected filemark at pos {}", current_file_number
);
954 Err(BlockReadError
::EndOfStream
) => {
955 task_log
!(worker
, "detected EOT after {} files", current_file_number
);
958 Err(BlockReadError
::Error(err
)) => {
959 return Err(err
.into());
961 Ok(reader
) => reader
,
964 restore_archive(worker
.clone(), reader
, current_file_number
, target
, &mut catalog
, checked_chunks_map
, verbose
)?
;
969 MediaCatalog
::finish_temporary_database(status_path
, &media_id
.label
.uuid
, true)?
;
974 fn restore_archive
<'a
>(
975 worker
: Arc
<WorkerTask
>,
976 mut reader
: Box
<dyn 'a
+ TapeRead
>,
977 current_file_number
: u64,
978 target
: Option
<(&DataStoreMap
, &Authid
)>,
979 catalog
: &mut MediaCatalog
,
980 checked_chunks_map
: &mut HashMap
<String
, HashSet
<[u8;32]>>,
982 ) -> Result
<(), Error
> {
983 let header
: MediaContentHeader
= unsafe { reader.read_le_value()? }
;
984 if header
.magic
!= PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
{
985 bail
!("missing MediaContentHeader");
988 //println!("Found MediaContentHeader: {:?}", header);
990 match header
.content_magic
{
991 PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0
| PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0
=> {
992 bail
!("unexpected content magic (label)");
994 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0
=> {
995 bail
!("unexpected snapshot archive version (v1.0)");
997 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1
=> {
998 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
1000 let archive_header
: SnapshotArchiveHeader
= serde_json
::from_slice(&header_data
)
1001 .map_err(|err
| format_err
!("unable to parse snapshot archive header - {}", err
))?
;
1003 let datastore_name
= archive_header
.store
;
1004 let snapshot
= archive_header
.snapshot
;
1006 task_log
!(worker
, "File {}: snapshot archive {}:{}", current_file_number
, datastore_name
, snapshot
);
1008 let backup_dir
: BackupDir
= snapshot
.parse()?
;
1010 if let Some((store_map
, authid
)) = target
.as_ref() {
1011 if let Some(datastore
) = store_map
.get_datastore(&datastore_name
) {
1012 let (owner
, _group_lock
) =
1013 datastore
.create_locked_backup_group(backup_dir
.group(), authid
)?
;
1014 if *authid
!= &owner
{
1015 // only the owner is allowed to create additional snapshots
1017 "restore '{}' failed - owner check failed ({} != {})",
1024 let (rel_path
, is_new
, _snap_lock
) =
1025 datastore
.create_locked_backup_dir(&backup_dir
)?
;
1026 let mut path
= datastore
.base_path();
1027 path
.push(rel_path
);
1030 task_log
!(worker
, "restore snapshot {}", backup_dir
);
1032 match restore_snapshot_archive(worker
.clone(), reader
, &path
) {
1034 std
::fs
::remove_dir_all(&path
)?
;
1035 bail
!("restore snapshot {} failed - {}", backup_dir
, err
);
1038 std
::fs
::remove_dir_all(&path
)?
;
1039 task_log
!(worker
, "skip incomplete snapshot {}", backup_dir
);
1042 catalog
.register_snapshot(
1043 Uuid
::from(header
.uuid
),
1044 current_file_number
,
1048 catalog
.commit_if_large()?
;
1054 task_log
!(worker
, "skipping...");
1058 reader
.skip_data()?
; // read all data
1059 if let Ok(false) = reader
.is_incomplete() {
1060 catalog
.register_snapshot(Uuid
::from(header
.uuid
), current_file_number
, &datastore_name
, &snapshot
)?
;
1061 catalog
.commit_if_large()?
;
1064 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0
=> {
1065 bail
!("unexpected chunk archive version (v1.0)");
1067 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1
=> {
1068 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
1070 let archive_header
: ChunkArchiveHeader
= serde_json
::from_slice(&header_data
)
1071 .map_err(|err
| format_err
!("unable to parse chunk archive header - {}", err
))?
;
1073 let source_datastore
= archive_header
.store
;
1075 task_log
!(worker
, "File {}: chunk archive for datastore '{}'", current_file_number
, source_datastore
);
1076 let datastore
= target
1078 .and_then(|t
| t
.0.get_datastore(&source_datastore
));
1080 if datastore
.is_some() || target
.is_none() {
1081 let checked_chunks
= checked_chunks_map
1082 .entry(datastore
.as_ref().map(|d
| d
.name()).unwrap_or("_unused_").to_string())
1083 .or_insert(HashSet
::new());
1085 let chunks
= if let Some(datastore
) = datastore
{
1086 restore_chunk_archive(worker
.clone(), reader
, datastore
, checked_chunks
, verbose
)?
1088 scan_chunk_archive(worker
.clone(), reader
, verbose
)?
1091 if let Some(chunks
) = chunks
{
1092 catalog
.register_chunk_archive(
1093 Uuid
::from(header
.uuid
),
1094 current_file_number
,
1098 task_log
!(worker
, "register {} chunks", chunks
.len());
1099 catalog
.commit_if_large()?
;
1102 } else if target
.is_some() {
1103 task_log
!(worker
, "skipping...");
1106 reader
.skip_data()?
; // read all data
1108 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
=> {
1109 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
1111 let archive_header
: CatalogArchiveHeader
= serde_json
::from_slice(&header_data
)
1112 .map_err(|err
| format_err
!("unable to parse catalog archive header - {}", err
))?
;
1114 task_log
!(worker
, "File {}: skip catalog '{}'", current_file_number
, archive_header
.uuid
);
1116 reader
.skip_data()?
; // read all data
1118 _
=> bail
!("unknown content magic {:?}", header
.content_magic
),
1124 // Read chunk archive without restoring data - just record contained chunks
1125 fn scan_chunk_archive
<'a
>(
1126 worker
: Arc
<WorkerTask
>,
1127 reader
: Box
<dyn 'a
+ TapeRead
>,
1129 ) -> Result
<Option
<Vec
<[u8;32]>>, Error
> {
1131 let mut chunks
= Vec
::new();
1133 let mut decoder
= ChunkArchiveDecoder
::new(reader
);
1136 let digest
= match decoder
.next_chunk() {
1137 Ok(Some((digest
, _blob
))) => digest
,
1140 let reader
= decoder
.reader();
1142 // check if this stream is marked incomplete
1143 if let Ok(true) = reader
.is_incomplete() {
1144 return Ok(Some(chunks
));
1147 // check if this is an aborted stream without end marker
1148 if let Ok(false) = reader
.has_end_marker() {
1149 task_log
!(worker
, "missing stream end marker");
1153 // else the archive is corrupt
1158 worker
.check_abort()?
;
1161 task_log
!(worker
, "Found chunk: {}", proxmox
::tools
::digest_to_hex(&digest
));
1164 chunks
.push(digest
);
1170 fn restore_chunk_archive
<'a
>(
1171 worker
: Arc
<WorkerTask
>,
1172 reader
: Box
<dyn 'a
+ TapeRead
>,
1173 datastore
: Arc
<DataStore
>,
1174 checked_chunks
: &mut HashSet
<[u8;32]>,
1176 ) -> Result
<Option
<Vec
<[u8;32]>>, Error
> {
1178 let mut chunks
= Vec
::new();
1180 let mut decoder
= ChunkArchiveDecoder
::new(reader
);
1182 let datastore2
= datastore
.clone();
1183 let start_time
= std
::time
::SystemTime
::now();
1184 let bytes
= Arc
::new(std
::sync
::atomic
::AtomicU64
::new(0));
1185 let bytes2
= bytes
.clone();
1187 let worker2
= worker
.clone();
1189 let writer_pool
= ParallelHandler
::new(
1190 "tape restore chunk writer",
1192 move |(chunk
, digest
): (DataBlob
, [u8; 32])| {
1193 let chunk_exists
= datastore2
.cond_touch_chunk(&digest
, false)?
;
1196 task_log
!(worker2
, "Insert chunk: {}", proxmox
::tools
::digest_to_hex(&digest
));
1198 bytes2
.fetch_add(chunk
.raw_size(), std
::sync
::atomic
::Ordering
::SeqCst
);
1199 // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
1200 chunk
.verify_crc()?
;
1201 if chunk
.crypt_mode()?
== CryptMode
::None
{
1202 chunk
.decode(None
, Some(&digest
))?
; // verify digest
1205 datastore2
.insert_chunk(&chunk
, &digest
)?
;
1207 task_log
!(worker2
, "Found existing chunk: {}", proxmox
::tools
::digest_to_hex(&digest
));
1213 let verify_and_write_channel
= writer_pool
.channel();
1217 let (digest
, blob
) = match decoder
.next_chunk() {
1218 Ok(Some((digest
, blob
))) => (digest
, blob
),
1221 let reader
= decoder
.reader();
1223 // check if this stream is marked incomplete
1224 if let Ok(true) = reader
.is_incomplete() {
1225 return Ok(Some(chunks
));
1228 // check if this is an aborted stream without end marker
1229 if let Ok(false) = reader
.has_end_marker() {
1230 task_log
!(worker
, "missing stream end marker");
1234 // else the archive is corrupt
1239 worker
.check_abort()?
;
1241 if !checked_chunks
.contains(&digest
) {
1242 verify_and_write_channel
.send((blob
, digest
.clone()))?
;
1243 checked_chunks
.insert(digest
.clone());
1245 chunks
.push(digest
);
1248 drop(verify_and_write_channel
);
1250 writer_pool
.complete()?
;
1252 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
1254 let bytes
= bytes
.load(std
::sync
::atomic
::Ordering
::SeqCst
);
1258 "restored {} bytes ({:.2} MB/s)",
1260 (bytes
as f64) / (1_000_000.0 * elapsed
)
// Restore a single snapshot archive (a pxar stream read from tape) into
// `snapshot_path`.
//
// Returns `Ok(bool)` — presumably `true` on a fully restored snapshot and
// `false` for a benign incomplete/aborted stream; the success-path return
// statements are elided in this view — TODO confirm against the full file.
// On any other decode failure the archive is treated as corrupt.
1266 fn restore_snapshot_archive
<'a
>(
1267 worker
: Arc
<WorkerTask
>,
1268 reader
: Box
<dyn 'a
+ TapeRead
>,
1269 snapshot_path
: &Path
,
1270 ) -> Result
<bool
, Error
> {
// Wrap the raw tape reader in a synchronous pxar decoder.
1272 let mut decoder
= pxar
::decoder
::sync
::Decoder
::from_std(reader
)?
;
1273 match try_restore_snapshot_archive(worker
, &mut decoder
, snapshot_path
) {
// Error path: look at the underlying tape stream to classify the failure
// instead of immediately propagating the pxar error.
1276 let reader
= decoder
.input();
1278 // check if this stream is marked incomplete
1279 if let Ok(true) = reader
.is_incomplete() {
1283 // check if this is an aborted stream without end marker
1284 if let Ok(false) = reader
.has_end_marker() {
1288 // else the archive is corrupt
// Decode a snapshot pxar archive entry-by-entry and write each contained
// file into `snapshot_path`, returning the decoded `BackupManifest`.
//
// Each file is first written to a "<name>.tmp" sibling and then atomically
// renamed into place; the manifest itself is intentionally left as
// "<manifest>.tmp" here and renamed by the caller's final step below only
// after all other entries were restored (NOTE(review): the loop header and
// some early-continue lines are elided in this view — confirm ordering
// against the full file).
1294 fn try_restore_snapshot_archive
<R
: pxar
::decoder
::SeqRead
>(
1295 worker
: Arc
<WorkerTask
>,
1296 decoder
: &mut pxar
::decoder
::sync
::Decoder
<R
>,
1297 snapshot_path
: &Path
,
1298 ) -> Result
<BackupManifest
, Error
> {
// The archive must start with a directory root entry.
1300 let _root
= match decoder
.next() {
1301 None
=> bail
!("missing root entry"),
1305 pxar
::EntryKind
::Directory
=> { /* Ok */ }
1306 _
=> bail
!("wrong root entry type"),
1312 let root_path
= Path
::new("/");
1313 let manifest_file_name
= OsStr
::new(MANIFEST_BLOB_NAME
);
// Filled in once the manifest entry is seen; checked after the loop.
1315 let mut manifest
= None
;
// Honor task abort requests between entries.
1318 worker
.check_abort()?
;
1320 let entry
= match decoder
.next() {
1322 Some(entry
) => entry?
,
1324 let entry_path
= entry
.path();
// Only plain file entries are allowed below the root directory.
1326 match entry
.kind() {
1327 pxar
::EntryKind
::File { .. }
=> { /* Ok */ }
1328 _
=> bail
!("wrong entry type for {:?}", entry_path
),
// All entries must live directly under the root ("/") — no subdirectories.
1330 match entry_path
.parent() {
1331 None
=> bail
!("wrong parent for {:?}", entry_path
),
1334 bail
!("wrong parent for {:?}", entry_path
);
1339 let filename
= entry
.file_name();
1340 let mut contents
= match decoder
.contents() {
1341 None
=> bail
!("missing file content"),
1342 Some(contents
) => contents
,
// Final destination and its temporary sibling ("<name>.tmp").
1345 let mut archive_path
= snapshot_path
.to_owned();
1346 archive_path
.push(&filename
);
1348 let mut tmp_path
= archive_path
.clone();
1349 tmp_path
.set_extension("tmp");
1351 if filename
== manifest_file_name
{
// Manifest entry: decode it so the stored copy can be sanitized.
1353 let blob
= DataBlob
::load_from_reader(&mut contents
)?
;
1354 let mut old_manifest
= BackupManifest
::try_from(blob
)?
;
1356 // Remove verify_state to indicate that this snapshot is not verified
1357 old_manifest
.unprotected
1359 .map(|m
| m
.remove("verify_state"));
// Re-encode the sanitized manifest (uncompressed, with CRC) and write it
// to the .tmp path only — the rename happens after the loop.
1361 let old_manifest
= serde_json
::to_string_pretty(&old_manifest
)?
;
1362 let blob
= DataBlob
::encode(old_manifest
.as_bytes(), None
, true)?
;
1364 let options
= CreateOptions
::new();
1365 replace_file(&tmp_path
, blob
.raw_data(), options
)?
;
1367 manifest
= Some(BackupManifest
::try_from(blob
)?
);
// Non-manifest entry: stream the raw contents into the .tmp file
// (the OpenOptions flags on lines 1370-1373 are elided in this view).
1369 let mut tmpfile
= std
::fs
::OpenOptions
::new()
1374 .map_err(|err
| format_err
!("restore {:?} failed - {}", tmp_path
, err
))?
;
1376 std
::io
::copy(&mut contents
, &mut tmpfile
)?
;
// Atomically move the finished file into place.
1378 if let Err(err
) = std
::fs
::rename(&tmp_path
, &archive_path
) {
1379 bail
!("Atomic rename file {:?} failed - {}", archive_path
, err
);
// The archive is invalid if it never contained a manifest.
1384 let manifest
= match manifest
{
1385 None
=> bail
!("missing manifest"),
1386 Some(manifest
) => manifest
,
1389 // Do not verify anything here, because this would be too slow (causes tape stops).
// Commit the manifest last: renaming "<manifest>.tmp" into place marks the
// snapshot restore as complete.
1392 let mut manifest_path
= snapshot_path
.to_owned();
1393 manifest_path
.push(MANIFEST_BLOB_NAME
);
1394 let mut tmp_manifest_path
= manifest_path
.clone();
1395 tmp_manifest_path
.set_extension("tmp");
1397 if let Err(err
) = std
::fs
::rename(&tmp_manifest_path
, &manifest_path
) {
1398 bail
!("Atomic rename manifest {:?} failed - {}", manifest_path
, err
);
1404 /// Try to restore media catalogs (from catalog_archives)
1405 pub fn fast_catalog_restore(
1406 worker
: &WorkerTask
,
1407 drive
: &mut Box
<dyn TapeDriver
>,
1408 media_set
: &MediaSet
,
1409 uuid
: &Uuid
, // current media Uuid
1410 ) -> Result
<bool
, Error
> {
1412 let status_path
= Path
::new(TAPE_STATUS_DIR
);
1414 let current_file_number
= drive
.current_file_number()?
;
1415 if current_file_number
!= 2 {
1416 bail
!("fast_catalog_restore: wrong media position - internal error");
1419 let mut found_catalog
= false;
1421 let mut moved_to_eom
= false;
1424 let current_file_number
= drive
.current_file_number()?
;
1426 { // limit reader scope
1427 let mut reader
= match drive
.read_next_file() {
1428 Err(BlockReadError
::EndOfFile
) => {
1429 task_log
!(worker
, "skip unexpected filemark at pos {}", current_file_number
);
1432 Err(BlockReadError
::EndOfStream
) => {
1433 task_log
!(worker
, "detected EOT after {} files", current_file_number
);
1436 Err(BlockReadError
::Error(err
)) => {
1437 return Err(err
.into());
1439 Ok(reader
) => reader
,
1442 let header
: MediaContentHeader
= unsafe { reader.read_le_value()? }
;
1443 if header
.magic
!= PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0
{
1444 bail
!("missing MediaContentHeader");
1447 if header
.content_magic
== PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
{
1448 task_log
!(worker
, "found catalog at pos {}", current_file_number
);
1450 let header_data
= reader
.read_exact_allocated(header
.size
as usize)?
;
1452 let archive_header
: CatalogArchiveHeader
= serde_json
::from_slice(&header_data
)
1453 .map_err(|err
| format_err
!("unable to parse catalog archive header - {}", err
))?
;
1455 if &archive_header
.media_set_uuid
!= media_set
.uuid() {
1456 task_log
!(worker
, "skipping unrelated catalog at pos {}", current_file_number
);
1457 reader
.skip_data()?
; // read all data
1461 let catalog_uuid
= &archive_header
.uuid
;
1463 let wanted
= media_set
1469 Some(uuid
) => uuid
== catalog_uuid
,
1475 task_log
!(worker
, "skip catalog because media '{}' not inventarized", catalog_uuid
);
1476 reader
.skip_data()?
; // read all data
1480 if catalog_uuid
== uuid
{
1481 // always restore and overwrite catalog
1483 // only restore if catalog does not exist
1484 if MediaCatalog
::exists(status_path
, catalog_uuid
) {
1485 task_log
!(worker
, "catalog for media '{}' already exists", catalog_uuid
);
1486 reader
.skip_data()?
; // read all data
1491 let mut file
= MediaCatalog
::create_temporary_database_file(status_path
, catalog_uuid
)?
;
1493 std
::io
::copy(&mut reader
, &mut file
)?
;
1495 file
.seek(SeekFrom
::Start(0))?
;
1497 match MediaCatalog
::parse_catalog_header(&mut file
)?
{
1498 (true, Some(media_uuid
), Some(media_set_uuid
)) => {
1499 if &media_uuid
!= catalog_uuid
{
1500 task_log
!(worker
, "catalog uuid missmatch at pos {}", current_file_number
);
1503 if media_set_uuid
!= archive_header
.media_set_uuid
{
1504 task_log
!(worker
, "catalog media_set missmatch at pos {}", current_file_number
);
1508 MediaCatalog
::finish_temporary_database(status_path
, &media_uuid
, true)?
;
1510 if catalog_uuid
== uuid
{
1511 task_log
!(worker
, "successfully restored catalog");
1512 found_catalog
= true
1514 task_log
!(worker
, "successfully restored related catalog {}", media_uuid
);
1518 task_warn
!(worker
, "got incomplete catalog header - skip file");
1528 break; // already done - stop
1530 moved_to_eom
= true;
1532 task_log
!(worker
, "searching for catalog at EOT (moving to EOT)");
1533 drive
.move_to_last_file()?
;
1535 let new_file_number
= drive
.current_file_number()?
;
1537 if new_file_number
< (current_file_number
+ 1) {
1538 break; // no new content - stop