]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/tape/restore.rs
cleanup schema function calls
[proxmox-backup.git] / src / api2 / tape / restore.rs
1 use std::path::{Path, PathBuf};
2 use std::ffi::OsStr;
3 use std::collections::{HashMap, HashSet, BTreeMap};
4 use std::convert::TryFrom;
5 use std::io::{Seek, SeekFrom};
6 use std::sync::Arc;
7
8 use anyhow::{bail, format_err, Error};
9 use serde_json::Value;
10
11 use proxmox_sys::fs::{replace_file, CreateOptions};
12 use proxmox_io::ReadExt;
13 use proxmox_router::{Permission, Router, RpcEnvironment, RpcEnvironmentType};
14 use proxmox_schema::api;
15 use proxmox_section_config::SectionConfigData;
16 use proxmox_uuid::Uuid;
17 use proxmox_sys::{task_log, task_warn, WorkerTaskContext};
18
19 use pbs_api_types::{
20 Authid, Userid, CryptMode,
21 DATASTORE_MAP_ARRAY_SCHEMA, DATASTORE_MAP_LIST_SCHEMA, DRIVE_NAME_SCHEMA,
22 UPID_SCHEMA, TAPE_RESTORE_SNAPSHOT_SCHEMA,
23 PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_TAPE_READ,
24 };
25 use pbs_datastore::{DataStore, DataBlob};
26 use pbs_datastore::backup_info::BackupDir;
27 use pbs_datastore::dynamic_index::DynamicIndexReader;
28 use pbs_datastore::fixed_index::FixedIndexReader;
29 use pbs_datastore::index::IndexFile;
30 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
31 use pbs_config::CachedUserInfo;
32 use pbs_tape::{
33 TapeRead, BlockReadError, MediaContentHeader,
34 PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0,
35 };
36 use proxmox_rest_server::WorkerTask;
37
38 use crate::{
39 tools::ParallelHandler,
40 server::lookup_user_email,
41 tape::{
42 TAPE_STATUS_DIR,
43 MediaId,
44 MediaSet,
45 MediaCatalog,
46 MediaSetCatalog,
47 Inventory,
48 lock_media_set,
49 file_formats::{
50 PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0,
51 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0,
52 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1,
53 PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0,
54 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0,
55 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1,
56 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0,
57 ChunkArchiveHeader,
58 ChunkArchiveDecoder,
59 SnapshotArchiveHeader,
60 CatalogArchiveHeader,
61 },
62 drive::{
63 TapeDriver,
64 request_and_load_media,
65 lock_tape_device,
66 set_tape_device_state,
67 },
68 },
69 };
70
/// Base directory for temporary restore data.
///
/// The snapshot-list restore stages snapshot archives below
/// `<RESTORE_TMP_DIR>/<media-set-uuid>` before copying them to the datastores.
const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
72
73 pub struct DataStoreMap {
74 map: HashMap<String, Arc<DataStore>>,
75 default: Option<Arc<DataStore>>,
76 }
77
78 impl TryFrom<String> for DataStoreMap {
79 type Error = Error;
80
81 fn try_from(value: String) -> Result<Self, Error> {
82 let value = DATASTORE_MAP_ARRAY_SCHEMA.parse_property_string(&value)?;
83 let mut mapping: Vec<String> = value
84 .as_array()
85 .unwrap()
86 .iter()
87 .map(|v| v.as_str().unwrap().to_string())
88 .collect();
89
90 let mut map = HashMap::new();
91 let mut default = None;
92 while let Some(mut store) = mapping.pop() {
93 if let Some(index) = store.find('=') {
94 let mut target = store.split_off(index);
95 target.remove(0); // remove '='
96 let datastore = DataStore::lookup_datastore(&target)?;
97 map.insert(store, datastore);
98 } else if default.is_none() {
99 default = Some(DataStore::lookup_datastore(&store)?);
100 } else {
101 bail!("multiple default stores given");
102 }
103 }
104
105 Ok(Self { map, default })
106 }
107 }
108
109 impl DataStoreMap {
110 fn used_datastores<'a>(&self) -> HashSet<&str> {
111 let mut set = HashSet::new();
112 for store in self.map.values() {
113 set.insert(store.name());
114 }
115
116 if let Some(ref store) = self.default {
117 set.insert(store.name());
118 }
119
120 set
121 }
122
123 fn get_datastore(&self, source: &str) -> Option<Arc<DataStore>> {
124 if let Some(store) = self.map.get(source) {
125 return Some(Arc::clone(store));
126 }
127 if let Some(ref store) = self.default {
128 return Some(Arc::clone(store));
129 }
130
131 return None;
132 }
133 }
134
135 fn check_datastore_privs(
136 user_info: &CachedUserInfo,
137 store: &str,
138 auth_id: &Authid,
139 owner: &Option<Authid>,
140 ) -> Result<(), Error> {
141 let privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
142 if (privs & PRIV_DATASTORE_BACKUP) == 0 {
143 bail!("no permissions on /datastore/{}", store);
144 }
145
146 if let Some(ref owner) = owner {
147 let correct_owner = owner == auth_id
148 || (owner.is_token() && !auth_id.is_token() && owner.user() == auth_id.user());
149
150 // same permission as changing ownership after syncing
151 if !correct_owner && privs & PRIV_DATASTORE_MODIFY == 0 {
152 bail!("no permission to restore as '{}'", owner);
153 }
154 }
155
156 Ok(())
157 }
158
159 pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
160
161 #[api(
162 input: {
163 properties: {
164 store: {
165 schema: DATASTORE_MAP_LIST_SCHEMA,
166 },
167 drive: {
168 schema: DRIVE_NAME_SCHEMA,
169 },
170 "media-set": {
171 description: "Media set UUID.",
172 type: String,
173 },
174 "notify-user": {
175 type: Userid,
176 optional: true,
177 },
178 "snapshots": {
179 description: "List of snapshots.",
180 type: Array,
181 optional: true,
182 items: {
183 schema: TAPE_RESTORE_SNAPSHOT_SCHEMA,
184 },
185 },
186 owner: {
187 type: Authid,
188 optional: true,
189 },
190 },
191 },
192 returns: {
193 schema: UPID_SCHEMA,
194 },
195 access: {
196 // Note: parameters are no uri parameter, so we need to test inside function body
197 description: "The user needs Tape.Read privilege on /tape/pool/{pool} \
198 and /tape/drive/{drive}, Datastore.Backup privilege on /datastore/{store}.",
199 permission: &Permission::Anybody,
200 },
201 )]
202 /// Restore data from media-set
203 pub fn restore(
204 store: String,
205 drive: String,
206 media_set: String,
207 notify_user: Option<Userid>,
208 snapshots: Option<Vec<String>>,
209 owner: Option<Authid>,
210 rpcenv: &mut dyn RpcEnvironment,
211 ) -> Result<Value, Error> {
212 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
213 let user_info = CachedUserInfo::new()?;
214
215 let store_map = DataStoreMap::try_from(store)
216 .map_err(|err| format_err!("cannot parse store mapping: {}", err))?;
217 let used_datastores = store_map.used_datastores();
218 if used_datastores.len() == 0 {
219 bail!("no datastores given");
220 }
221
222 for store in used_datastores.iter() {
223 check_datastore_privs(&user_info, &store, &auth_id, &owner)?;
224 }
225
226 let privs = user_info.lookup_privs(&auth_id, &["tape", "drive", &drive]);
227 if (privs & PRIV_TAPE_READ) == 0 {
228 bail!("no permissions on /tape/drive/{}", drive);
229 }
230
231 let media_set_uuid = media_set.parse()?;
232
233 let status_path = Path::new(TAPE_STATUS_DIR);
234
235 let _lock = lock_media_set(status_path, &media_set_uuid, None)?;
236
237 let inventory = Inventory::load(status_path)?;
238
239 let pool = inventory.lookup_media_set_pool(&media_set_uuid)?;
240
241 let privs = user_info.lookup_privs(&auth_id, &["tape", "pool", &pool]);
242 if (privs & PRIV_TAPE_READ) == 0 {
243 bail!("no permissions on /tape/pool/{}", pool);
244 }
245
246 let (drive_config, _digest) = pbs_config::drive::config()?;
247
248 // early check/lock before starting worker
249 let drive_lock = lock_tape_device(&drive_config, &drive)?;
250
251 let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
252
253 let taskid = used_datastores
254 .iter()
255 .map(|s| s.to_string())
256 .collect::<Vec<String>>()
257 .join(", ");
258
259 let upid_str = WorkerTask::new_thread(
260 "tape-restore",
261 Some(taskid),
262 auth_id.to_string(),
263 to_stdout,
264 move |worker| {
265 let _drive_lock = drive_lock; // keep lock guard
266
267 set_tape_device_state(&drive, &worker.upid().to_string())?;
268
269 let restore_owner = owner.as_ref().unwrap_or(&auth_id);
270
271 let email = notify_user
272 .as_ref()
273 .and_then(|userid| lookup_user_email(userid))
274 .or_else(|| lookup_user_email(&auth_id.clone().into()));
275
276 task_log!(worker, "Mediaset '{}'", media_set);
277 task_log!(worker, "Pool: {}", pool);
278
279 let res = if let Some(snapshots) = snapshots {
280 restore_list_worker(
281 worker.clone(),
282 snapshots,
283 inventory,
284 media_set_uuid,
285 drive_config,
286 &drive,
287 store_map,
288 restore_owner,
289 email,
290 )
291 } else {
292 restore_full_worker(
293 worker.clone(),
294 inventory,
295 media_set_uuid,
296 drive_config,
297 &drive,
298 store_map,
299 restore_owner,
300 email,
301 )
302 };
303
304 if res.is_ok() {
305 task_log!(worker, "Restore mediaset '{}' done", media_set);
306 }
307
308 if let Err(err) = set_tape_device_state(&drive, "") {
309 task_log!(
310 worker,
311 "could not unset drive state for {}: {}",
312 drive,
313 err
314 );
315 }
316
317 res
318 }
319 )?;
320
321 Ok(upid_str.into())
322 }
323
324 fn restore_full_worker(
325 worker: Arc<WorkerTask>,
326 inventory: Inventory,
327 media_set_uuid: Uuid,
328 drive_config: SectionConfigData,
329 drive_name: &str,
330 store_map: DataStoreMap,
331 restore_owner: &Authid,
332 email: Option<String>,
333 ) -> Result<(), Error> {
334 let members = inventory.compute_media_set_members(&media_set_uuid)?;
335
336 let media_list = members.media_list();
337
338 let mut media_id_list = Vec::new();
339
340 let mut encryption_key_fingerprint = None;
341
342 for (seq_nr, media_uuid) in media_list.iter().enumerate() {
343 match media_uuid {
344 None => {
345 bail!("media set {} is incomplete (missing member {}).", media_set_uuid, seq_nr);
346 }
347 Some(media_uuid) => {
348 let media_id = inventory.lookup_media(media_uuid).unwrap();
349 if let Some(ref set) = media_id.media_set_label { // always true here
350 if encryption_key_fingerprint.is_none() && set.encryption_key_fingerprint.is_some() {
351 encryption_key_fingerprint = set.encryption_key_fingerprint.clone();
352 }
353 }
354 media_id_list.push(media_id);
355 }
356 }
357 }
358
359 if let Some(fingerprint) = encryption_key_fingerprint {
360 task_log!(worker, "Encryption key fingerprint: {}", fingerprint);
361 }
362
363 task_log!(
364 worker,
365 "Datastore(s): {}",
366 store_map
367 .used_datastores()
368 .into_iter()
369 .map(String::from)
370 .collect::<Vec<String>>()
371 .join(", "),
372 );
373
374 task_log!(worker, "Drive: {}", drive_name);
375 task_log!(
376 worker,
377 "Required media list: {}",
378 media_id_list.iter()
379 .map(|media_id| media_id.label.label_text.as_str())
380 .collect::<Vec<&str>>()
381 .join(";")
382 );
383
384 let mut datastore_locks = Vec::new();
385 for store_name in store_map.used_datastores() {
386 // explicit create shared lock to prevent GC on newly created chunks
387 if let Some(store) = store_map.get_datastore(store_name) {
388 let shared_store_lock = store.try_shared_chunk_store_lock()?;
389 datastore_locks.push(shared_store_lock);
390 }
391 }
392
393 let mut checked_chunks_map = HashMap::new();
394
395 for media_id in media_id_list.iter() {
396 request_and_restore_media(
397 worker.clone(),
398 media_id,
399 &drive_config,
400 drive_name,
401 &store_map,
402 &mut checked_chunks_map,
403 restore_owner,
404 &email,
405 )?;
406 }
407
408 Ok(())
409 }
410
411 fn restore_list_worker(
412 worker: Arc<WorkerTask>,
413 snapshots: Vec<String>,
414 inventory: Inventory,
415 media_set_uuid: Uuid,
416 drive_config: SectionConfigData,
417 drive_name: &str,
418 store_map: DataStoreMap,
419 restore_owner: &Authid,
420 email: Option<String>,
421 ) -> Result<(), Error> {
422 let base_path: PathBuf = format!("{}/{}", RESTORE_TMP_DIR, media_set_uuid).into();
423 std::fs::create_dir_all(&base_path)?;
424
425 let catalog = get_media_set_catalog(&inventory, &media_set_uuid)?;
426
427 let mut datastore_locks = Vec::new();
428 let mut snapshot_file_hash: BTreeMap<Uuid, Vec<u64>> = BTreeMap::new();
429 let mut snapshot_locks = HashMap::new();
430
431 let res = proxmox_lang::try_block!({
432 // assemble snapshot files/locks
433 for store_snapshot in snapshots.iter() {
434 let mut split = store_snapshot.splitn(2, ':');
435 let source_datastore = split
436 .next()
437 .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
438 let snapshot = split
439 .next()
440 .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
441 let backup_dir: BackupDir = snapshot.parse()?;
442
443 let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
444 format_err!(
445 "could not find mapping for source datastore: {}",
446 source_datastore
447 )
448 })?;
449
450 let (owner, _group_lock) =
451 datastore.create_locked_backup_group(backup_dir.group(), &restore_owner)?;
452 if restore_owner != &owner {
453 // only the owner is allowed to create additional snapshots
454 bail!(
455 "restore '{}' failed - owner check failed ({} != {})",
456 snapshot,
457 restore_owner,
458 owner
459 );
460 }
461
462 let (media_id, file_num) = if let Some((media_uuid, file_num)) =
463 catalog.lookup_snapshot(&source_datastore, &snapshot)
464 {
465 let media_id = inventory.lookup_media(media_uuid).unwrap();
466 (media_id, file_num)
467 } else {
468 task_warn!(
469 worker,
470 "did not find snapshot '{}' in media set {}",
471 snapshot,
472 media_set_uuid
473 );
474 continue;
475 };
476
477 let (_rel_path, is_new, snap_lock) = datastore.create_locked_backup_dir(&backup_dir)?;
478
479 if !is_new {
480 task_log!(
481 worker,
482 "found snapshot {} on target datastore, skipping...",
483 snapshot
484 );
485 continue;
486 }
487
488 snapshot_locks.insert(store_snapshot.to_string(), snap_lock);
489
490 let shared_store_lock = datastore.try_shared_chunk_store_lock()?;
491 datastore_locks.push(shared_store_lock);
492
493 let file_list = snapshot_file_hash
494 .entry(media_id.label.uuid.clone())
495 .or_insert_with(Vec::new);
496 file_list.push(file_num);
497
498 task_log!(
499 worker,
500 "found snapshot {} on {}: file {}",
501 snapshot,
502 media_id.label.label_text,
503 file_num
504 );
505 }
506
507 if snapshot_file_hash.is_empty() {
508 task_log!(worker, "nothing to restore, skipping remaining phases...");
509 return Ok(());
510 }
511
512 task_log!(worker, "Phase 1: temporarily restore snapshots to temp dir");
513 let mut datastore_chunk_map: HashMap<String, HashSet<[u8; 32]>> = HashMap::new();
514 for (media_uuid, file_list) in snapshot_file_hash.iter_mut() {
515 let media_id = inventory.lookup_media(media_uuid).unwrap();
516 let (drive, info) = request_and_load_media(
517 &worker,
518 &drive_config,
519 &drive_name,
520 &media_id.label,
521 &email,
522 )?;
523 file_list.sort_unstable();
524 restore_snapshots_to_tmpdir(
525 worker.clone(),
526 &base_path,
527 file_list,
528 drive,
529 &info,
530 &media_set_uuid,
531 &mut datastore_chunk_map,
532 ).map_err(|err| format_err!("could not restore snapshots to tmpdir: {}", err))?;
533 }
534
535 // sorted media_uuid => (sorted file_num => (set of digests)))
536 let mut media_file_chunk_map: BTreeMap<Uuid, BTreeMap<u64, HashSet<[u8; 32]>>> = BTreeMap::new();
537
538 for (source_datastore, chunks) in datastore_chunk_map.into_iter() {
539 let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
540 format_err!(
541 "could not find mapping for source datastore: {}",
542 source_datastore
543 )
544 })?;
545 for digest in chunks.into_iter() {
546 // we only want to restore chunks that we do not have yet
547 if !datastore.cond_touch_chunk(&digest, false)? {
548 if let Some((uuid, nr)) = catalog.lookup_chunk(&source_datastore, &digest) {
549 let file = media_file_chunk_map.entry(uuid.clone()).or_insert_with(BTreeMap::new);
550 let chunks = file.entry(nr).or_insert_with(HashSet::new);
551 chunks.insert(digest);
552 }
553 }
554 }
555 }
556
557 // we do not need it anymore, saves memory
558 drop(catalog);
559
560 if !media_file_chunk_map.is_empty() {
561 task_log!(worker, "Phase 2: restore chunks to datastores");
562 } else {
563 task_log!(worker, "all chunks exist already, skipping phase 2...");
564 }
565
566 for (media_uuid, file_chunk_map) in media_file_chunk_map.iter_mut() {
567 let media_id = inventory.lookup_media(media_uuid).unwrap();
568 let (mut drive, _info) = request_and_load_media(
569 &worker,
570 &drive_config,
571 &drive_name,
572 &media_id.label,
573 &email,
574 )?;
575 restore_file_chunk_map(worker.clone(), &mut drive, &store_map, file_chunk_map)?;
576 }
577
578 task_log!(
579 worker,
580 "Phase 3: copy snapshots from temp dir to datastores"
581 );
582 for (store_snapshot, _lock) in snapshot_locks.into_iter() {
583 proxmox_lang::try_block!({
584 let mut split = store_snapshot.splitn(2, ':');
585 let source_datastore = split
586 .next()
587 .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
588 let snapshot = split
589 .next()
590 .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
591 let backup_dir: BackupDir = snapshot.parse()?;
592
593 let datastore = store_map
594 .get_datastore(&source_datastore)
595 .ok_or_else(|| format_err!("unexpected source datastore: {}", source_datastore))?;
596
597 let mut tmp_path = base_path.clone();
598 tmp_path.push(&source_datastore);
599 tmp_path.push(snapshot);
600
601 let path = datastore.snapshot_path(&backup_dir);
602
603 for entry in std::fs::read_dir(tmp_path)? {
604 let entry = entry?;
605 let mut new_path = path.clone();
606 new_path.push(entry.file_name());
607 std::fs::copy(entry.path(), new_path)?;
608 }
609 task_log!(worker, "Restore snapshot '{}' done", snapshot);
610 Ok(())
611 }).map_err(|err: Error| format_err!("could not copy {}: {}", store_snapshot, err))?;
612 }
613 Ok(())
614 });
615
616 if res.is_err() {
617 task_warn!(worker, "Error during restore, partially restored snapshots will NOT be cleaned up");
618 }
619
620 match std::fs::remove_dir_all(&base_path) {
621 Ok(()) => {}
622 Err(err) => task_warn!(worker, "error cleaning up: {}", err),
623 }
624
625 res
626 }
627
628 fn get_media_set_catalog(
629 inventory: &Inventory,
630 media_set_uuid: &Uuid,
631 ) -> Result<MediaSetCatalog, Error> {
632 let status_path = Path::new(TAPE_STATUS_DIR);
633
634 let members = inventory.compute_media_set_members(media_set_uuid)?;
635 let media_list = members.media_list();
636 let mut catalog = MediaSetCatalog::new();
637
638 for (seq_nr, media_uuid) in media_list.iter().enumerate() {
639 match media_uuid {
640 None => {
641 bail!(
642 "media set {} is incomplete (missing member {}).",
643 media_set_uuid,
644 seq_nr
645 );
646 }
647 Some(media_uuid) => {
648 let media_id = inventory.lookup_media(media_uuid).unwrap();
649 let media_catalog = MediaCatalog::open(status_path, &media_id, false, false)?;
650 catalog.append_catalog(media_catalog)?;
651 }
652 }
653 }
654
655 Ok(catalog)
656 }
657
658 fn restore_snapshots_to_tmpdir(
659 worker: Arc<WorkerTask>,
660 path: &PathBuf,
661 file_list: &[u64],
662 mut drive: Box<dyn TapeDriver>,
663 media_id: &MediaId,
664 media_set_uuid: &Uuid,
665 chunks_list: &mut HashMap<String, HashSet<[u8; 32]>>,
666 ) -> Result<(), Error> {
667 match media_id.media_set_label {
668 None => {
669 bail!(
670 "missing media set label on media {} ({})",
671 media_id.label.label_text,
672 media_id.label.uuid
673 );
674 }
675 Some(ref set) => {
676 if set.uuid != *media_set_uuid {
677 bail!(
678 "wrong media set label on media {} ({} != {})",
679 media_id.label.label_text,
680 media_id.label.uuid,
681 media_set_uuid
682 );
683 }
684 let encrypt_fingerprint = set.encryption_key_fingerprint.clone().map(|fp| {
685 task_log!(worker, "Encryption key fingerprint: {}", fp);
686 (fp, set.uuid.clone())
687 });
688
689 drive.set_encryption(encrypt_fingerprint)?;
690 }
691 }
692
693 for file_num in file_list {
694 let current_file_number = drive.current_file_number()?;
695 if current_file_number != *file_num {
696 task_log!(worker, "was at file {}, moving to {}", current_file_number, file_num);
697 drive.move_to_file(*file_num)?;
698 let current_file_number = drive.current_file_number()?;
699 task_log!(worker, "now at file {}", current_file_number);
700 }
701 let mut reader = drive.read_next_file()?;
702
703 let header: MediaContentHeader = unsafe { reader.read_le_value()? };
704 if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
705 bail!("missing MediaContentHeader");
706 }
707
708 match header.content_magic {
709 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
710 let header_data = reader.read_exact_allocated(header.size as usize)?;
711
712 let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
713 .map_err(|err| {
714 format_err!("unable to parse snapshot archive header - {}", err)
715 })?;
716
717 let source_datastore = archive_header.store;
718 let snapshot = archive_header.snapshot;
719
720 task_log!(
721 worker,
722 "File {}: snapshot archive {}:{}",
723 file_num,
724 source_datastore,
725 snapshot
726 );
727
728 let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
729
730 let mut tmp_path = path.clone();
731 tmp_path.push(&source_datastore);
732 tmp_path.push(snapshot);
733 std::fs::create_dir_all(&tmp_path)?;
734
735 let chunks = chunks_list
736 .entry(source_datastore)
737 .or_insert_with(HashSet::new);
738 let manifest = try_restore_snapshot_archive(worker.clone(), &mut decoder, &tmp_path)?;
739 for item in manifest.files() {
740 let mut archive_path = tmp_path.to_owned();
741 archive_path.push(&item.filename);
742
743 let index: Box<dyn IndexFile> = match archive_type(&item.filename)? {
744 ArchiveType::DynamicIndex => {
745 Box::new(DynamicIndexReader::open(&archive_path)?)
746 }
747 ArchiveType::FixedIndex => {
748 Box::new(FixedIndexReader::open(&archive_path)?)
749 }
750 ArchiveType::Blob => continue,
751 };
752 for i in 0..index.index_count() {
753 if let Some(digest) = index.index_digest(i) {
754 chunks.insert(*digest);
755 }
756 }
757 }
758 }
759 other => bail!("unexpected file type: {:?}", other),
760 }
761 }
762
763 Ok(())
764 }
765
766 fn restore_file_chunk_map(
767 worker: Arc<WorkerTask>,
768 drive: &mut Box<dyn TapeDriver>,
769 store_map: &DataStoreMap,
770 file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
771 ) -> Result<(), Error> {
772 for (nr, chunk_map) in file_chunk_map.iter_mut() {
773 let current_file_number = drive.current_file_number()?;
774 if current_file_number != *nr {
775 task_log!(worker, "was at file {}, moving to {}", current_file_number, nr);
776 drive.move_to_file(*nr)?;
777 let current_file_number = drive.current_file_number()?;
778 task_log!(worker, "now at file {}", current_file_number);
779 }
780 let mut reader = drive.read_next_file()?;
781 let header: MediaContentHeader = unsafe { reader.read_le_value()? };
782 if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
783 bail!("missing MediaContentHeader");
784 }
785
786 match header.content_magic {
787 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
788 let header_data = reader.read_exact_allocated(header.size as usize)?;
789
790 let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
791 .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;
792
793 let source_datastore = archive_header.store;
794
795 task_log!(
796 worker,
797 "File {}: chunk archive for datastore '{}'",
798 nr,
799 source_datastore
800 );
801
802 let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
803 format_err!("unexpected chunk archive for store: {}", source_datastore)
804 })?;
805
806 let count = restore_partial_chunk_archive(worker.clone(), reader, datastore.clone(), chunk_map)?;
807 task_log!(worker, "restored {} chunks", count);
808 }
809 _ => bail!("unexpected content magic {:?}", header.content_magic),
810 }
811 }
812
813 Ok(())
814 }
815
816 fn restore_partial_chunk_archive<'a>(
817 worker: Arc<WorkerTask>,
818 reader: Box<dyn 'a + TapeRead>,
819 datastore: Arc<DataStore>,
820 chunk_list: &mut HashSet<[u8; 32]>,
821 ) -> Result<usize, Error> {
822 let mut decoder = ChunkArchiveDecoder::new(reader);
823
824 let mut count = 0;
825
826 let start_time = std::time::SystemTime::now();
827 let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
828 let bytes2 = bytes.clone();
829
830 let writer_pool = ParallelHandler::new(
831 "tape restore chunk writer",
832 4,
833 move |(chunk, digest): (DataBlob, [u8; 32])| {
834 if !datastore.cond_touch_chunk(&digest, false)? {
835 bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
836 chunk.verify_crc()?;
837 if chunk.crypt_mode()? == CryptMode::None {
838 chunk.decode(None, Some(&digest))?; // verify digest
839 }
840
841 datastore.insert_chunk(&chunk, &digest)?;
842 }
843 Ok(())
844 },
845 );
846
847 let verify_and_write_channel = writer_pool.channel();
848
849 loop {
850 let (digest, blob) = match decoder.next_chunk()? {
851 Some((digest, blob)) => (digest, blob),
852 None => break,
853 };
854
855 worker.check_abort()?;
856
857 if chunk_list.remove(&digest) {
858 verify_and_write_channel.send((blob, digest.clone()))?;
859 count += 1;
860 }
861
862 if chunk_list.is_empty() {
863 break;
864 }
865 }
866
867 drop(verify_and_write_channel);
868
869 writer_pool.complete()?;
870
871 let elapsed = start_time.elapsed()?.as_secs_f64();
872
873 let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
874
875 task_log!(
876 worker,
877 "restored {} bytes ({:.2} MB/s)",
878 bytes,
879 (bytes as f64) / (1_000_000.0 * elapsed)
880 );
881
882 Ok(count)
883 }
884
885
886 /// Request and restore complete media without using existing catalog (create catalog instead)
887 pub fn request_and_restore_media(
888 worker: Arc<WorkerTask>,
889 media_id: &MediaId,
890 drive_config: &SectionConfigData,
891 drive_name: &str,
892 store_map: &DataStoreMap,
893 checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
894 restore_owner: &Authid,
895 email: &Option<String>,
896 ) -> Result<(), Error> {
897 let media_set_uuid = match media_id.media_set_label {
898 None => bail!("restore_media: no media set - internal error"),
899 Some(ref set) => &set.uuid,
900 };
901
902 let (mut drive, info) = request_and_load_media(&worker, &drive_config, &drive_name, &media_id.label, email)?;
903
904 match info.media_set_label {
905 None => {
906 bail!("missing media set label on media {} ({})",
907 media_id.label.label_text, media_id.label.uuid);
908 }
909 Some(ref set) => {
910 if &set.uuid != media_set_uuid {
911 bail!("wrong media set label on media {} ({} != {})",
912 media_id.label.label_text, media_id.label.uuid,
913 media_set_uuid);
914 }
915 let encrypt_fingerprint = set.encryption_key_fingerprint.clone()
916 .map(|fp| (fp, set.uuid.clone()));
917
918 drive.set_encryption(encrypt_fingerprint)?;
919 }
920 }
921
922 restore_media(
923 worker,
924 &mut drive,
925 &info,
926 Some((&store_map, restore_owner)),
927 checked_chunks_map,
928 false,
929 )
930 }
931
932 /// Restore complete media content and catalog
933 ///
934 /// Only create the catalog if target is None.
935 pub fn restore_media(
936 worker: Arc<WorkerTask>,
937 drive: &mut Box<dyn TapeDriver>,
938 media_id: &MediaId,
939 target: Option<(&DataStoreMap, &Authid)>,
940 checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
941 verbose: bool,
942 ) -> Result<(), Error> {
943
944 let status_path = Path::new(TAPE_STATUS_DIR);
945 let mut catalog = MediaCatalog::create_temporary_database(status_path, media_id, false)?;
946
947 loop {
948 let current_file_number = drive.current_file_number()?;
949 let reader = match drive.read_next_file() {
950 Err(BlockReadError::EndOfFile) => {
951 task_log!(worker, "skip unexpected filemark at pos {}", current_file_number);
952 continue;
953 }
954 Err(BlockReadError::EndOfStream) => {
955 task_log!(worker, "detected EOT after {} files", current_file_number);
956 break;
957 }
958 Err(BlockReadError::Error(err)) => {
959 return Err(err.into());
960 }
961 Ok(reader) => reader,
962 };
963
964 restore_archive(worker.clone(), reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?;
965 }
966
967 catalog.commit()?;
968
969 MediaCatalog::finish_temporary_database(status_path, &media_id.label.uuid, true)?;
970
971 Ok(())
972 }
973
974 fn restore_archive<'a>(
975 worker: Arc<WorkerTask>,
976 mut reader: Box<dyn 'a + TapeRead>,
977 current_file_number: u64,
978 target: Option<(&DataStoreMap, &Authid)>,
979 catalog: &mut MediaCatalog,
980 checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
981 verbose: bool,
982 ) -> Result<(), Error> {
983 let header: MediaContentHeader = unsafe { reader.read_le_value()? };
984 if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
985 bail!("missing MediaContentHeader");
986 }
987
988 //println!("Found MediaContentHeader: {:?}", header);
989
990 match header.content_magic {
991 PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0 | PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0 => {
992 bail!("unexpected content magic (label)");
993 }
994 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0 => {
995 bail!("unexpected snapshot archive version (v1.0)");
996 }
997 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
998 let header_data = reader.read_exact_allocated(header.size as usize)?;
999
1000 let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
1001 .map_err(|err| format_err!("unable to parse snapshot archive header - {}", err))?;
1002
1003 let datastore_name = archive_header.store;
1004 let snapshot = archive_header.snapshot;
1005
1006 task_log!(worker, "File {}: snapshot archive {}:{}", current_file_number, datastore_name, snapshot);
1007
1008 let backup_dir: BackupDir = snapshot.parse()?;
1009
1010 if let Some((store_map, authid)) = target.as_ref() {
1011 if let Some(datastore) = store_map.get_datastore(&datastore_name) {
1012 let (owner, _group_lock) =
1013 datastore.create_locked_backup_group(backup_dir.group(), authid)?;
1014 if *authid != &owner {
1015 // only the owner is allowed to create additional snapshots
1016 bail!(
1017 "restore '{}' failed - owner check failed ({} != {})",
1018 snapshot,
1019 authid,
1020 owner
1021 );
1022 }
1023
1024 let (rel_path, is_new, _snap_lock) =
1025 datastore.create_locked_backup_dir(&backup_dir)?;
1026 let mut path = datastore.base_path();
1027 path.push(rel_path);
1028
1029 if is_new {
1030 task_log!(worker, "restore snapshot {}", backup_dir);
1031
1032 match restore_snapshot_archive(worker.clone(), reader, &path) {
1033 Err(err) => {
1034 std::fs::remove_dir_all(&path)?;
1035 bail!("restore snapshot {} failed - {}", backup_dir, err);
1036 }
1037 Ok(false) => {
1038 std::fs::remove_dir_all(&path)?;
1039 task_log!(worker, "skip incomplete snapshot {}", backup_dir);
1040 }
1041 Ok(true) => {
1042 catalog.register_snapshot(
1043 Uuid::from(header.uuid),
1044 current_file_number,
1045 &datastore_name,
1046 &snapshot,
1047 )?;
1048 catalog.commit_if_large()?;
1049 }
1050 }
1051 return Ok(());
1052 }
1053 } else {
1054 task_log!(worker, "skipping...");
1055 }
1056 }
1057
1058 reader.skip_data()?; // read all data
1059 if let Ok(false) = reader.is_incomplete() {
1060 catalog.register_snapshot(Uuid::from(header.uuid), current_file_number, &datastore_name, &snapshot)?;
1061 catalog.commit_if_large()?;
1062 }
1063 }
1064 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0 => {
1065 bail!("unexpected chunk archive version (v1.0)");
1066 }
1067 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
1068 let header_data = reader.read_exact_allocated(header.size as usize)?;
1069
1070 let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
1071 .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;
1072
1073 let source_datastore = archive_header.store;
1074
1075 task_log!(worker, "File {}: chunk archive for datastore '{}'", current_file_number, source_datastore);
1076 let datastore = target
1077 .as_ref()
1078 .and_then(|t| t.0.get_datastore(&source_datastore));
1079
1080 if datastore.is_some() || target.is_none() {
1081 let checked_chunks = checked_chunks_map
1082 .entry(datastore.as_ref().map(|d| d.name()).unwrap_or("_unused_").to_string())
1083 .or_insert(HashSet::new());
1084
1085 let chunks = if let Some(datastore) = datastore {
1086 restore_chunk_archive(worker.clone(), reader, datastore, checked_chunks, verbose)?
1087 } else {
1088 scan_chunk_archive(worker.clone(), reader, verbose)?
1089 };
1090
1091 if let Some(chunks) = chunks {
1092 catalog.register_chunk_archive(
1093 Uuid::from(header.uuid),
1094 current_file_number,
1095 &source_datastore,
1096 &chunks[..],
1097 )?;
1098 task_log!(worker, "register {} chunks", chunks.len());
1099 catalog.commit_if_large()?;
1100 }
1101 return Ok(());
1102 } else if target.is_some() {
1103 task_log!(worker, "skipping...");
1104 }
1105
1106 reader.skip_data()?; // read all data
1107 }
1108 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0 => {
1109 let header_data = reader.read_exact_allocated(header.size as usize)?;
1110
1111 let archive_header: CatalogArchiveHeader = serde_json::from_slice(&header_data)
1112 .map_err(|err| format_err!("unable to parse catalog archive header - {}", err))?;
1113
1114 task_log!(worker, "File {}: skip catalog '{}'", current_file_number, archive_header.uuid);
1115
1116 reader.skip_data()?; // read all data
1117 }
1118 _ => bail!("unknown content magic {:?}", header.content_magic),
1119 }
1120
1121 Ok(())
1122 }
1123
1124 // Read chunk archive without restoring data - just record contained chunks
1125 fn scan_chunk_archive<'a>(
1126 worker: Arc<WorkerTask>,
1127 reader: Box<dyn 'a + TapeRead>,
1128 verbose: bool,
1129 ) -> Result<Option<Vec<[u8;32]>>, Error> {
1130
1131 let mut chunks = Vec::new();
1132
1133 let mut decoder = ChunkArchiveDecoder::new(reader);
1134
1135 loop {
1136 let digest = match decoder.next_chunk() {
1137 Ok(Some((digest, _blob))) => digest,
1138 Ok(None) => break,
1139 Err(err) => {
1140 let reader = decoder.reader();
1141
1142 // check if this stream is marked incomplete
1143 if let Ok(true) = reader.is_incomplete() {
1144 return Ok(Some(chunks));
1145 }
1146
1147 // check if this is an aborted stream without end marker
1148 if let Ok(false) = reader.has_end_marker() {
1149 task_log!(worker, "missing stream end marker");
1150 return Ok(None);
1151 }
1152
1153 // else the archive is corrupt
1154 return Err(err);
1155 }
1156 };
1157
1158 worker.check_abort()?;
1159
1160 if verbose {
1161 task_log!(worker, "Found chunk: {}", hex::encode(&digest));
1162 }
1163
1164 chunks.push(digest);
1165 }
1166
1167 Ok(Some(chunks))
1168 }
1169
1170 fn restore_chunk_archive<'a>(
1171 worker: Arc<WorkerTask>,
1172 reader: Box<dyn 'a + TapeRead>,
1173 datastore: Arc<DataStore>,
1174 checked_chunks: &mut HashSet<[u8;32]>,
1175 verbose: bool,
1176 ) -> Result<Option<Vec<[u8;32]>>, Error> {
1177
1178 let mut chunks = Vec::new();
1179
1180 let mut decoder = ChunkArchiveDecoder::new(reader);
1181
1182 let datastore2 = datastore.clone();
1183 let start_time = std::time::SystemTime::now();
1184 let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
1185 let bytes2 = bytes.clone();
1186
1187 let worker2 = worker.clone();
1188
1189 let writer_pool = ParallelHandler::new(
1190 "tape restore chunk writer",
1191 4,
1192 move |(chunk, digest): (DataBlob, [u8; 32])| {
1193 let chunk_exists = datastore2.cond_touch_chunk(&digest, false)?;
1194 if !chunk_exists {
1195 if verbose {
1196 task_log!(worker2, "Insert chunk: {}", hex::encode(&digest));
1197 }
1198 bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
1199 // println!("verify and write {}", hex::encode(&digest));
1200 chunk.verify_crc()?;
1201 if chunk.crypt_mode()? == CryptMode::None {
1202 chunk.decode(None, Some(&digest))?; // verify digest
1203 }
1204
1205 datastore2.insert_chunk(&chunk, &digest)?;
1206 } else if verbose {
1207 task_log!(worker2, "Found existing chunk: {}", hex::encode(&digest));
1208 }
1209 Ok(())
1210 },
1211 );
1212
1213 let verify_and_write_channel = writer_pool.channel();
1214
1215
1216 loop {
1217 let (digest, blob) = match decoder.next_chunk() {
1218 Ok(Some((digest, blob))) => (digest, blob),
1219 Ok(None) => break,
1220 Err(err) => {
1221 let reader = decoder.reader();
1222
1223 // check if this stream is marked incomplete
1224 if let Ok(true) = reader.is_incomplete() {
1225 return Ok(Some(chunks));
1226 }
1227
1228 // check if this is an aborted stream without end marker
1229 if let Ok(false) = reader.has_end_marker() {
1230 task_log!(worker, "missing stream end marker");
1231 return Ok(None);
1232 }
1233
1234 // else the archive is corrupt
1235 return Err(err);
1236 }
1237 };
1238
1239 worker.check_abort()?;
1240
1241 if !checked_chunks.contains(&digest) {
1242 verify_and_write_channel.send((blob, digest.clone()))?;
1243 checked_chunks.insert(digest.clone());
1244 }
1245 chunks.push(digest);
1246 }
1247
1248 drop(verify_and_write_channel);
1249
1250 writer_pool.complete()?;
1251
1252 let elapsed = start_time.elapsed()?.as_secs_f64();
1253
1254 let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
1255
1256 task_log!(
1257 worker,
1258 "restored {} bytes ({:.2} MB/s)",
1259 bytes,
1260 (bytes as f64) / (1_000_000.0 * elapsed)
1261 );
1262
1263 Ok(Some(chunks))
1264 }
1265
1266 fn restore_snapshot_archive<'a>(
1267 worker: Arc<WorkerTask>,
1268 reader: Box<dyn 'a + TapeRead>,
1269 snapshot_path: &Path,
1270 ) -> Result<bool, Error> {
1271
1272 let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
1273 match try_restore_snapshot_archive(worker, &mut decoder, snapshot_path) {
1274 Ok(_) => Ok(true),
1275 Err(err) => {
1276 let reader = decoder.input();
1277
1278 // check if this stream is marked incomplete
1279 if let Ok(true) = reader.is_incomplete() {
1280 return Ok(false);
1281 }
1282
1283 // check if this is an aborted stream without end marker
1284 if let Ok(false) = reader.has_end_marker() {
1285 return Ok(false);
1286 }
1287
1288 // else the archive is corrupt
1289 Err(err)
1290 }
1291 }
1292 }
1293
1294 fn try_restore_snapshot_archive<R: pxar::decoder::SeqRead>(
1295 worker: Arc<WorkerTask>,
1296 decoder: &mut pxar::decoder::sync::Decoder<R>,
1297 snapshot_path: &Path,
1298 ) -> Result<BackupManifest, Error> {
1299
1300 let _root = match decoder.next() {
1301 None => bail!("missing root entry"),
1302 Some(root) => {
1303 let root = root?;
1304 match root.kind() {
1305 pxar::EntryKind::Directory => { /* Ok */ }
1306 _ => bail!("wrong root entry type"),
1307 }
1308 root
1309 }
1310 };
1311
1312 let root_path = Path::new("/");
1313 let manifest_file_name = OsStr::new(MANIFEST_BLOB_NAME);
1314
1315 let mut manifest = None;
1316
1317 loop {
1318 worker.check_abort()?;
1319
1320 let entry = match decoder.next() {
1321 None => break,
1322 Some(entry) => entry?,
1323 };
1324 let entry_path = entry.path();
1325
1326 match entry.kind() {
1327 pxar::EntryKind::File { .. } => { /* Ok */ }
1328 _ => bail!("wrong entry type for {:?}", entry_path),
1329 }
1330 match entry_path.parent() {
1331 None => bail!("wrong parent for {:?}", entry_path),
1332 Some(p) => {
1333 if p != root_path {
1334 bail!("wrong parent for {:?}", entry_path);
1335 }
1336 }
1337 }
1338
1339 let filename = entry.file_name();
1340 let mut contents = match decoder.contents() {
1341 None => bail!("missing file content"),
1342 Some(contents) => contents,
1343 };
1344
1345 let mut archive_path = snapshot_path.to_owned();
1346 archive_path.push(&filename);
1347
1348 let mut tmp_path = archive_path.clone();
1349 tmp_path.set_extension("tmp");
1350
1351 if filename == manifest_file_name {
1352
1353 let blob = DataBlob::load_from_reader(&mut contents)?;
1354 let mut old_manifest = BackupManifest::try_from(blob)?;
1355
1356 // Remove verify_state to indicate that this snapshot is not verified
1357 old_manifest.unprotected
1358 .as_object_mut()
1359 .map(|m| m.remove("verify_state"));
1360
1361 let old_manifest = serde_json::to_string_pretty(&old_manifest)?;
1362 let blob = DataBlob::encode(old_manifest.as_bytes(), None, true)?;
1363
1364 let options = CreateOptions::new();
1365 replace_file(&tmp_path, blob.raw_data(), options, false)?;
1366
1367 manifest = Some(BackupManifest::try_from(blob)?);
1368 } else {
1369 let mut tmpfile = std::fs::OpenOptions::new()
1370 .write(true)
1371 .create(true)
1372 .read(true)
1373 .open(&tmp_path)
1374 .map_err(|err| format_err!("restore {:?} failed - {}", tmp_path, err))?;
1375
1376 std::io::copy(&mut contents, &mut tmpfile)?;
1377
1378 if let Err(err) = std::fs::rename(&tmp_path, &archive_path) {
1379 bail!("Atomic rename file {:?} failed - {}", archive_path, err);
1380 }
1381 }
1382 }
1383
1384 let manifest = match manifest {
1385 None => bail!("missing manifest"),
1386 Some(manifest) => manifest,
1387 };
1388
1389 // Do not verify anything here, because this would be to slow (causes tape stops).
1390
1391 // commit manifest
1392 let mut manifest_path = snapshot_path.to_owned();
1393 manifest_path.push(MANIFEST_BLOB_NAME);
1394 let mut tmp_manifest_path = manifest_path.clone();
1395 tmp_manifest_path.set_extension("tmp");
1396
1397 if let Err(err) = std::fs::rename(&tmp_manifest_path, &manifest_path) {
1398 bail!("Atomic rename manifest {:?} failed - {}", manifest_path, err);
1399 }
1400
1401 Ok(manifest)
1402 }
1403
1404 /// Try to restore media catalogs (form catalog_archives)
1405 pub fn fast_catalog_restore(
1406 worker: &WorkerTask,
1407 drive: &mut Box<dyn TapeDriver>,
1408 media_set: &MediaSet,
1409 uuid: &Uuid, // current media Uuid
1410 ) -> Result<bool, Error> {
1411
1412 let status_path = Path::new(TAPE_STATUS_DIR);
1413
1414 let current_file_number = drive.current_file_number()?;
1415 if current_file_number != 2 {
1416 bail!("fast_catalog_restore: wrong media position - internal error");
1417 }
1418
1419 let mut found_catalog = false;
1420
1421 let mut moved_to_eom = false;
1422
1423 loop {
1424 let current_file_number = drive.current_file_number()?;
1425
1426 { // limit reader scope
1427 let mut reader = match drive.read_next_file() {
1428 Err(BlockReadError::EndOfFile) => {
1429 task_log!(worker, "skip unexpected filemark at pos {}", current_file_number);
1430 continue;
1431 }
1432 Err(BlockReadError::EndOfStream) => {
1433 task_log!(worker, "detected EOT after {} files", current_file_number);
1434 break;
1435 }
1436 Err(BlockReadError::Error(err)) => {
1437 return Err(err.into());
1438 }
1439 Ok(reader) => reader,
1440 };
1441
1442 let header: MediaContentHeader = unsafe { reader.read_le_value()? };
1443 if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
1444 bail!("missing MediaContentHeader");
1445 }
1446
1447 if header.content_magic == PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0 {
1448 task_log!(worker, "found catalog at pos {}", current_file_number);
1449
1450 let header_data = reader.read_exact_allocated(header.size as usize)?;
1451
1452 let archive_header: CatalogArchiveHeader = serde_json::from_slice(&header_data)
1453 .map_err(|err| format_err!("unable to parse catalog archive header - {}", err))?;
1454
1455 if &archive_header.media_set_uuid != media_set.uuid() {
1456 task_log!(worker, "skipping unrelated catalog at pos {}", current_file_number);
1457 reader.skip_data()?; // read all data
1458 continue;
1459 }
1460
1461 let catalog_uuid = &archive_header.uuid;
1462
1463 let wanted = media_set
1464 .media_list()
1465 .iter()
1466 .find(|e| {
1467 match e {
1468 None => false,
1469 Some(uuid) => uuid == catalog_uuid,
1470 }
1471 })
1472 .is_some();
1473
1474 if !wanted {
1475 task_log!(worker, "skip catalog because media '{}' not inventarized", catalog_uuid);
1476 reader.skip_data()?; // read all data
1477 continue;
1478 }
1479
1480 if catalog_uuid == uuid {
1481 // always restore and overwrite catalog
1482 } else {
1483 // only restore if catalog does not exist
1484 if MediaCatalog::exists(status_path, catalog_uuid) {
1485 task_log!(worker, "catalog for media '{}' already exists", catalog_uuid);
1486 reader.skip_data()?; // read all data
1487 continue;
1488 }
1489 }
1490
1491 let mut file = MediaCatalog::create_temporary_database_file(status_path, catalog_uuid)?;
1492
1493 std::io::copy(&mut reader, &mut file)?;
1494
1495 file.seek(SeekFrom::Start(0))?;
1496
1497 match MediaCatalog::parse_catalog_header(&mut file)? {
1498 (true, Some(media_uuid), Some(media_set_uuid)) => {
1499 if &media_uuid != catalog_uuid {
1500 task_log!(worker, "catalog uuid missmatch at pos {}", current_file_number);
1501 continue;
1502 }
1503 if media_set_uuid != archive_header.media_set_uuid {
1504 task_log!(worker, "catalog media_set missmatch at pos {}", current_file_number);
1505 continue;
1506 }
1507
1508 MediaCatalog::finish_temporary_database(status_path, &media_uuid, true)?;
1509
1510 if catalog_uuid == uuid {
1511 task_log!(worker, "successfully restored catalog");
1512 found_catalog = true
1513 } else {
1514 task_log!(worker, "successfully restored related catalog {}", media_uuid);
1515 }
1516 }
1517 _ => {
1518 task_warn!(worker, "got incomplete catalog header - skip file");
1519 continue;
1520 }
1521 }
1522
1523 continue;
1524 }
1525 }
1526
1527 if moved_to_eom {
1528 break; // already done - stop
1529 }
1530 moved_to_eom = true;
1531
1532 task_log!(worker, "searching for catalog at EOT (moving to EOT)");
1533 drive.move_to_last_file()?;
1534
1535 let new_file_number = drive.current_file_number()?;
1536
1537 if new_file_number < (current_file_number + 1) {
1538 break; // no new content - stop
1539 }
1540 }
1541
1542 Ok(found_catalog)
1543 }