]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/tape/restore.rs
61e17d1ba566f5eeec14b32909b4cbf182b9178c
[proxmox-backup.git] / src / api2 / tape / restore.rs
1 use std::path::{Path, PathBuf};
2 use std::ffi::OsStr;
3 use std::collections::{HashMap, HashSet, BTreeMap};
4 use std::convert::TryFrom;
5 use std::io::{Seek, SeekFrom};
6 use std::sync::Arc;
7
8 use anyhow::{bail, format_err, Error};
9 use serde_json::Value;
10
11 use proxmox::{
12 api::{
13 api,
14 RpcEnvironment,
15 RpcEnvironmentType,
16 Router,
17 Permission,
18 schema::parse_property_string,
19 section_config::SectionConfigData,
20 },
21 tools::{
22 Uuid,
23 io::ReadExt,
24 fs::{
25 replace_file,
26 CreateOptions,
27 },
28 },
29 };
30
31 use pbs_api_types::{
32 Authid, Userid, CryptMode,
33 DATASTORE_MAP_ARRAY_SCHEMA, DATASTORE_MAP_LIST_SCHEMA, DRIVE_NAME_SCHEMA,
34 UPID_SCHEMA, TAPE_RESTORE_SNAPSHOT_SCHEMA,
35 PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_TAPE_READ,
36 };
37 use pbs_datastore::{DataStore, DataBlob};
38 use pbs_datastore::backup_info::BackupDir;
39 use pbs_datastore::dynamic_index::DynamicIndexReader;
40 use pbs_datastore::fixed_index::FixedIndexReader;
41 use pbs_datastore::index::IndexFile;
42 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
43 use pbs_config::CachedUserInfo;
44 use pbs_tape::{
45 TapeRead, BlockReadError, MediaContentHeader,
46 PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0,
47 };
48 use pbs_tools::{task_log, task_warn, task::WorkerTaskContext};
49 use proxmox_rest_server::WorkerTask;
50
51 use crate::{
52 tools::ParallelHandler,
53 server::lookup_user_email,
54 tape::{
55 TAPE_STATUS_DIR,
56 MediaId,
57 MediaSet,
58 MediaCatalog,
59 MediaSetCatalog,
60 Inventory,
61 lock_media_set,
62 file_formats::{
63 PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0,
64 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0,
65 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1,
66 PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0,
67 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0,
68 PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1,
69 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0,
70 ChunkArchiveHeader,
71 ChunkArchiveDecoder,
72 SnapshotArchiveHeader,
73 CatalogArchiveHeader,
74 },
75 drive::{
76 TapeDriver,
77 request_and_load_media,
78 lock_tape_device,
79 set_tape_device_state,
80 },
81 },
82 };
83
/// Base directory for temporarily restored snapshots (used by the
/// selective restore before copying into the target datastore).
const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
85
/// Maps source datastore names to target datastores for a restore.
pub struct DataStoreMap {
    // explicit "source=target" mappings
    map: HashMap<String, Arc<DataStore>>,
    // fallback target for sources without an explicit mapping
    default: Option<Arc<DataStore>>,
}
90
91 impl TryFrom<String> for DataStoreMap {
92 type Error = Error;
93
94 fn try_from(value: String) -> Result<Self, Error> {
95 let value = parse_property_string(&value, &DATASTORE_MAP_ARRAY_SCHEMA)?;
96 let mut mapping: Vec<String> = value
97 .as_array()
98 .unwrap()
99 .iter()
100 .map(|v| v.as_str().unwrap().to_string())
101 .collect();
102
103 let mut map = HashMap::new();
104 let mut default = None;
105 while let Some(mut store) = mapping.pop() {
106 if let Some(index) = store.find('=') {
107 let mut target = store.split_off(index);
108 target.remove(0); // remove '='
109 let datastore = DataStore::lookup_datastore(&target)?;
110 map.insert(store, datastore);
111 } else if default.is_none() {
112 default = Some(DataStore::lookup_datastore(&store)?);
113 } else {
114 bail!("multiple default stores given");
115 }
116 }
117
118 Ok(Self { map, default })
119 }
120 }
121
122 impl DataStoreMap {
123 fn used_datastores<'a>(&self) -> HashSet<&str> {
124 let mut set = HashSet::new();
125 for store in self.map.values() {
126 set.insert(store.name());
127 }
128
129 if let Some(ref store) = self.default {
130 set.insert(store.name());
131 }
132
133 set
134 }
135
136 fn get_datastore(&self, source: &str) -> Option<Arc<DataStore>> {
137 if let Some(store) = self.map.get(source) {
138 return Some(Arc::clone(store));
139 }
140 if let Some(ref store) = self.default {
141 return Some(Arc::clone(store));
142 }
143
144 return None;
145 }
146 }
147
148 fn check_datastore_privs(
149 user_info: &CachedUserInfo,
150 store: &str,
151 auth_id: &Authid,
152 owner: &Option<Authid>,
153 ) -> Result<(), Error> {
154 let privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
155 if (privs & PRIV_DATASTORE_BACKUP) == 0 {
156 bail!("no permissions on /datastore/{}", store);
157 }
158
159 if let Some(ref owner) = owner {
160 let correct_owner = owner == auth_id
161 || (owner.is_token() && !auth_id.is_token() && owner.user() == auth_id.user());
162
163 // same permission as changing ownership after syncing
164 if !correct_owner && privs & PRIV_DATASTORE_MODIFY == 0 {
165 bail!("no permission to restore as '{}'", owner);
166 }
167 }
168
169 Ok(())
170 }
171
/// Tape restore API router (POST triggers the restore worker).
pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
173
174 #[api(
175 input: {
176 properties: {
177 store: {
178 schema: DATASTORE_MAP_LIST_SCHEMA,
179 },
180 drive: {
181 schema: DRIVE_NAME_SCHEMA,
182 },
183 "media-set": {
184 description: "Media set UUID.",
185 type: String,
186 },
187 "notify-user": {
188 type: Userid,
189 optional: true,
190 },
191 "snapshots": {
192 description: "List of snapshots.",
193 type: Array,
194 optional: true,
195 items: {
196 schema: TAPE_RESTORE_SNAPSHOT_SCHEMA,
197 },
198 },
199 owner: {
200 type: Authid,
201 optional: true,
202 },
203 },
204 },
205 returns: {
206 schema: UPID_SCHEMA,
207 },
208 access: {
209 // Note: parameters are no uri parameter, so we need to test inside function body
210 description: "The user needs Tape.Read privilege on /tape/pool/{pool} \
211 and /tape/drive/{drive}, Datastore.Backup privilege on /datastore/{store}.",
212 permission: &Permission::Anybody,
213 },
214 )]
215 /// Restore data from media-set
216 pub fn restore(
217 store: String,
218 drive: String,
219 media_set: String,
220 notify_user: Option<Userid>,
221 snapshots: Option<Vec<String>>,
222 owner: Option<Authid>,
223 rpcenv: &mut dyn RpcEnvironment,
224 ) -> Result<Value, Error> {
225 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
226 let user_info = CachedUserInfo::new()?;
227
228 let store_map = DataStoreMap::try_from(store)
229 .map_err(|err| format_err!("cannot parse store mapping: {}", err))?;
230 let used_datastores = store_map.used_datastores();
231 if used_datastores.len() == 0 {
232 bail!("no datastores given");
233 }
234
235 for store in used_datastores.iter() {
236 check_datastore_privs(&user_info, &store, &auth_id, &owner)?;
237 }
238
239 let privs = user_info.lookup_privs(&auth_id, &["tape", "drive", &drive]);
240 if (privs & PRIV_TAPE_READ) == 0 {
241 bail!("no permissions on /tape/drive/{}", drive);
242 }
243
244 let media_set_uuid = media_set.parse()?;
245
246 let status_path = Path::new(TAPE_STATUS_DIR);
247
248 let _lock = lock_media_set(status_path, &media_set_uuid, None)?;
249
250 let inventory = Inventory::load(status_path)?;
251
252 let pool = inventory.lookup_media_set_pool(&media_set_uuid)?;
253
254 let privs = user_info.lookup_privs(&auth_id, &["tape", "pool", &pool]);
255 if (privs & PRIV_TAPE_READ) == 0 {
256 bail!("no permissions on /tape/pool/{}", pool);
257 }
258
259 let (drive_config, _digest) = pbs_config::drive::config()?;
260
261 // early check/lock before starting worker
262 let drive_lock = lock_tape_device(&drive_config, &drive)?;
263
264 let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
265
266 let taskid = used_datastores
267 .iter()
268 .map(|s| s.to_string())
269 .collect::<Vec<String>>()
270 .join(", ");
271
272 let upid_str = WorkerTask::new_thread(
273 "tape-restore",
274 Some(taskid),
275 auth_id.to_string(),
276 to_stdout,
277 move |worker| {
278 let _drive_lock = drive_lock; // keep lock guard
279
280 set_tape_device_state(&drive, &worker.upid().to_string())?;
281
282 let restore_owner = owner.as_ref().unwrap_or(&auth_id);
283
284 let email = notify_user
285 .as_ref()
286 .and_then(|userid| lookup_user_email(userid))
287 .or_else(|| lookup_user_email(&auth_id.clone().into()));
288
289 task_log!(worker, "Mediaset '{}'", media_set);
290 task_log!(worker, "Pool: {}", pool);
291
292 let res = if let Some(snapshots) = snapshots {
293 restore_list_worker(
294 worker.clone(),
295 snapshots,
296 inventory,
297 media_set_uuid,
298 drive_config,
299 &drive,
300 store_map,
301 restore_owner,
302 email,
303 )
304 } else {
305 restore_full_worker(
306 worker.clone(),
307 inventory,
308 media_set_uuid,
309 drive_config,
310 &drive,
311 store_map,
312 restore_owner,
313 email,
314 )
315 };
316
317 if res.is_ok() {
318 task_log!(worker, "Restore mediaset '{}' done", media_set);
319 }
320
321 if let Err(err) = set_tape_device_state(&drive, "") {
322 task_log!(
323 worker,
324 "could not unset drive state for {}: {}",
325 drive,
326 err
327 );
328 }
329
330 res
331 }
332 )?;
333
334 Ok(upid_str.into())
335 }
336
/// Restore a complete media set: request each member tape in sequence
/// and restore all archives found on it into the mapped datastores.
///
/// Fails if the media set is incomplete (a member is missing from the
/// inventory).
fn restore_full_worker(
    worker: Arc<WorkerTask>,
    inventory: Inventory,
    media_set_uuid: Uuid,
    drive_config: SectionConfigData,
    drive_name: &str,
    store_map: DataStoreMap,
    restore_owner: &Authid,
    email: Option<String>,
) -> Result<(), Error> {
    let members = inventory.compute_media_set_members(&media_set_uuid)?;

    let media_list = members.media_list();

    let mut media_id_list = Vec::new();

    let mut encryption_key_fingerprint = None;

    // collect all member media and remember the first encryption key
    // fingerprint seen (for logging below)
    for (seq_nr, media_uuid) in media_list.iter().enumerate() {
        match media_uuid {
            None => {
                bail!("media set {} is incomplete (missing member {}).", media_set_uuid, seq_nr);
            }
            Some(media_uuid) => {
                let media_id = inventory.lookup_media(media_uuid).unwrap();
                if let Some(ref set) = media_id.media_set_label { // always true here
                    if encryption_key_fingerprint.is_none() && set.encryption_key_fingerprint.is_some() {
                        encryption_key_fingerprint = set.encryption_key_fingerprint.clone();
                    }
                }
                media_id_list.push(media_id);
            }
        }
    }

    if let Some(fingerprint) = encryption_key_fingerprint {
        task_log!(worker, "Encryption key fingerprint: {}", fingerprint);
    }

    task_log!(
        worker,
        "Datastore(s): {}",
        store_map
            .used_datastores()
            .into_iter()
            .map(String::from)
            .collect::<Vec<String>>()
            .join(", "),
    );

    task_log!(worker, "Drive: {}", drive_name);
    task_log!(
        worker,
        "Required media list: {}",
        media_id_list.iter()
            .map(|media_id| media_id.label.label_text.as_str())
            .collect::<Vec<&str>>()
            .join(";")
    );

    let mut datastore_locks = Vec::new();
    for store_name in store_map.used_datastores() {
        // explicit create shared lock to prevent GC on newly created chunks
        if let Some(store) = store_map.get_datastore(store_name) {
            let shared_store_lock = store.try_shared_chunk_store_lock()?;
            datastore_locks.push(shared_store_lock);
        }
    }

    let mut checked_chunks_map = HashMap::new();

    // restore member tapes one after the other
    for media_id in media_id_list.iter() {
        request_and_restore_media(
            worker.clone(),
            media_id,
            &drive_config,
            drive_name,
            &store_map,
            &mut checked_chunks_map,
            restore_owner,
            &email,
        )?;
    }

    Ok(())
}
423
/// Selectively restore a list of snapshots (`"<source-store>:<snapshot>"`)
/// from a media set, in three phases:
///
/// 1. restore the snapshot archives (index/blob files) to a temp dir,
/// 2. restore only the chunks not yet present in the target datastores,
/// 3. copy the snapshot files from the temp dir into the datastores.
///
/// On error, already restored snapshots are NOT cleaned up; the temp
/// dir is removed in any case (best effort).
fn restore_list_worker(
    worker: Arc<WorkerTask>,
    snapshots: Vec<String>,
    inventory: Inventory,
    media_set_uuid: Uuid,
    drive_config: SectionConfigData,
    drive_name: &str,
    store_map: DataStoreMap,
    restore_owner: &Authid,
    email: Option<String>,
) -> Result<(), Error> {
    let base_path: PathBuf = format!("{}/{}", RESTORE_TMP_DIR, media_set_uuid).into();
    std::fs::create_dir_all(&base_path)?;

    let catalog = get_media_set_catalog(&inventory, &media_set_uuid)?;

    let mut datastore_locks = Vec::new();
    // media uuid => list of tape file numbers holding wanted snapshots
    let mut snapshot_file_hash: BTreeMap<Uuid, Vec<u64>> = BTreeMap::new();
    // "store:snapshot" => lock on the (new) snapshot dir
    let mut snapshot_locks = HashMap::new();

    let res = proxmox::try_block!({
        // assemble snapshot files/locks
        for store_snapshot in snapshots.iter() {
            // entries have the form "<source-datastore>:<snapshot>"
            let mut split = store_snapshot.splitn(2, ':');
            let source_datastore = split
                .next()
                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
            let snapshot = split
                .next()
                .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
            let backup_dir: BackupDir = snapshot.parse()?;

            let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
                format_err!(
                    "could not find mapping for source datastore: {}",
                    source_datastore
                )
            })?;

            let (owner, _group_lock) =
                datastore.create_locked_backup_group(backup_dir.group(), &restore_owner)?;
            if restore_owner != &owner {
                // only the owner is allowed to create additional snapshots
                bail!(
                    "restore '{}' failed - owner check failed ({} != {})",
                    snapshot,
                    restore_owner,
                    owner
                );
            }

            // locate the snapshot archive on tape via the catalog
            let (media_id, file_num) = if let Some((media_uuid, file_num)) =
                catalog.lookup_snapshot(&source_datastore, &snapshot)
            {
                let media_id = inventory.lookup_media(media_uuid).unwrap();
                (media_id, file_num)
            } else {
                task_warn!(
                    worker,
                    "did not find snapshot '{}' in media set {}",
                    snapshot,
                    media_set_uuid
                );
                continue;
            };

            let (_rel_path, is_new, snap_lock) = datastore.create_locked_backup_dir(&backup_dir)?;

            if !is_new {
                task_log!(
                    worker,
                    "found snapshot {} on target datastore, skipping...",
                    snapshot
                );
                continue;
            }

            // keep the lock so nobody else writes the snapshot dir
            snapshot_locks.insert(store_snapshot.to_string(), snap_lock);

            // shared chunk store lock prevents GC of newly created chunks
            let shared_store_lock = datastore.try_shared_chunk_store_lock()?;
            datastore_locks.push(shared_store_lock);

            let file_list = snapshot_file_hash
                .entry(media_id.label.uuid.clone())
                .or_insert_with(Vec::new);
            file_list.push(file_num);

            task_log!(
                worker,
                "found snapshot {} on {}: file {}",
                snapshot,
                media_id.label.label_text,
                file_num
            );
        }

        if snapshot_file_hash.is_empty() {
            task_log!(worker, "nothing to restore, skipping remaining phases...");
            return Ok(());
        }

        task_log!(worker, "Phase 1: temporarily restore snapshots to temp dir");
        let mut datastore_chunk_map: HashMap<String, HashSet<[u8; 32]>> = HashMap::new();
        for (media_uuid, file_list) in snapshot_file_hash.iter_mut() {
            let media_id = inventory.lookup_media(media_uuid).unwrap();
            let (drive, info) = request_and_load_media(
                &worker,
                &drive_config,
                &drive_name,
                &media_id.label,
                &email,
            )?;
            // read files in ascending order to avoid tape seeking back
            file_list.sort_unstable();
            restore_snapshots_to_tmpdir(
                worker.clone(),
                &base_path,
                file_list,
                drive,
                &info,
                &media_set_uuid,
                &mut datastore_chunk_map,
            ).map_err(|err| format_err!("could not restore snapshots to tmpdir: {}", err))?;
        }

        // sorted media_uuid => (sorted file_num => (set of digests)))
        let mut media_file_chunk_map: BTreeMap<Uuid, BTreeMap<u64, HashSet<[u8; 32]>>> = BTreeMap::new();

        for (source_datastore, chunks) in datastore_chunk_map.into_iter() {
            let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
                format_err!(
                    "could not find mapping for source datastore: {}",
                    source_datastore
                )
            })?;
            for digest in chunks.into_iter() {
                // we only want to restore chunks that we do not have yet
                if !datastore.cond_touch_chunk(&digest, false)? {
                    if let Some((uuid, nr)) = catalog.lookup_chunk(&source_datastore, &digest) {
                        let file = media_file_chunk_map.entry(uuid.clone()).or_insert_with(BTreeMap::new);
                        let chunks = file.entry(nr).or_insert_with(HashSet::new);
                        chunks.insert(digest);
                    }
                }
            }
        }

        // we do not need it anymore, saves memory
        drop(catalog);

        if !media_file_chunk_map.is_empty() {
            task_log!(worker, "Phase 2: restore chunks to datastores");
        } else {
            task_log!(worker, "all chunks exist already, skipping phase 2...");
        }

        for (media_uuid, file_chunk_map) in media_file_chunk_map.iter_mut() {
            let media_id = inventory.lookup_media(media_uuid).unwrap();
            let (mut drive, _info) = request_and_load_media(
                &worker,
                &drive_config,
                &drive_name,
                &media_id.label,
                &email,
            )?;
            restore_file_chunk_map(worker.clone(), &mut drive, &store_map, file_chunk_map)?;
        }

        task_log!(
            worker,
            "Phase 3: copy snapshots from temp dir to datastores"
        );
        for (store_snapshot, _lock) in snapshot_locks.into_iter() {
            proxmox::try_block!({
                // re-split the "store:snapshot" key as in phase 1
                let mut split = store_snapshot.splitn(2, ':');
                let source_datastore = split
                    .next()
                    .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
                let snapshot = split
                    .next()
                    .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
                let backup_dir: BackupDir = snapshot.parse()?;

                let datastore = store_map
                    .get_datastore(&source_datastore)
                    .ok_or_else(|| format_err!("unexpected source datastore: {}", source_datastore))?;

                let mut tmp_path = base_path.clone();
                tmp_path.push(&source_datastore);
                tmp_path.push(snapshot);

                let path = datastore.snapshot_path(&backup_dir);

                // copy every restored snapshot file into the datastore
                for entry in std::fs::read_dir(tmp_path)? {
                    let entry = entry?;
                    let mut new_path = path.clone();
                    new_path.push(entry.file_name());
                    std::fs::copy(entry.path(), new_path)?;
                }
                task_log!(worker, "Restore snapshot '{}' done", snapshot);
                Ok(())
            }).map_err(|err: Error| format_err!("could not copy {}: {}", store_snapshot, err))?;
        }
        Ok(())
    });

    if res.is_err() {
        task_warn!(worker, "Error during restore, partially restored snapshots will NOT be cleaned up");
    }

    // always try to clean up the temp dir
    match std::fs::remove_dir_all(&base_path) {
        Ok(()) => {}
        Err(err) => task_warn!(worker, "error cleaning up: {}", err),
    }

    res
}
640
641 fn get_media_set_catalog(
642 inventory: &Inventory,
643 media_set_uuid: &Uuid,
644 ) -> Result<MediaSetCatalog, Error> {
645 let status_path = Path::new(TAPE_STATUS_DIR);
646
647 let members = inventory.compute_media_set_members(media_set_uuid)?;
648 let media_list = members.media_list();
649 let mut catalog = MediaSetCatalog::new();
650
651 for (seq_nr, media_uuid) in media_list.iter().enumerate() {
652 match media_uuid {
653 None => {
654 bail!(
655 "media set {} is incomplete (missing member {}).",
656 media_set_uuid,
657 seq_nr
658 );
659 }
660 Some(media_uuid) => {
661 let media_id = inventory.lookup_media(media_uuid).unwrap();
662 let media_catalog = MediaCatalog::open(status_path, &media_id, false, false)?;
663 catalog.append_catalog(media_catalog)?;
664 }
665 }
666 }
667
668 Ok(catalog)
669 }
670
671 fn restore_snapshots_to_tmpdir(
672 worker: Arc<WorkerTask>,
673 path: &PathBuf,
674 file_list: &[u64],
675 mut drive: Box<dyn TapeDriver>,
676 media_id: &MediaId,
677 media_set_uuid: &Uuid,
678 chunks_list: &mut HashMap<String, HashSet<[u8; 32]>>,
679 ) -> Result<(), Error> {
680 match media_id.media_set_label {
681 None => {
682 bail!(
683 "missing media set label on media {} ({})",
684 media_id.label.label_text,
685 media_id.label.uuid
686 );
687 }
688 Some(ref set) => {
689 if set.uuid != *media_set_uuid {
690 bail!(
691 "wrong media set label on media {} ({} != {})",
692 media_id.label.label_text,
693 media_id.label.uuid,
694 media_set_uuid
695 );
696 }
697 let encrypt_fingerprint = set.encryption_key_fingerprint.clone().map(|fp| {
698 task_log!(worker, "Encryption key fingerprint: {}", fp);
699 (fp, set.uuid.clone())
700 });
701
702 drive.set_encryption(encrypt_fingerprint)?;
703 }
704 }
705
706 for file_num in file_list {
707 let current_file_number = drive.current_file_number()?;
708 if current_file_number != *file_num {
709 task_log!(worker, "was at file {}, moving to {}", current_file_number, file_num);
710 drive.move_to_file(*file_num)?;
711 let current_file_number = drive.current_file_number()?;
712 task_log!(worker, "now at file {}", current_file_number);
713 }
714 let mut reader = drive.read_next_file()?;
715
716 let header: MediaContentHeader = unsafe { reader.read_le_value()? };
717 if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
718 bail!("missing MediaContentHeader");
719 }
720
721 match header.content_magic {
722 PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
723 let header_data = reader.read_exact_allocated(header.size as usize)?;
724
725 let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
726 .map_err(|err| {
727 format_err!("unable to parse snapshot archive header - {}", err)
728 })?;
729
730 let source_datastore = archive_header.store;
731 let snapshot = archive_header.snapshot;
732
733 task_log!(
734 worker,
735 "File {}: snapshot archive {}:{}",
736 file_num,
737 source_datastore,
738 snapshot
739 );
740
741 let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
742
743 let mut tmp_path = path.clone();
744 tmp_path.push(&source_datastore);
745 tmp_path.push(snapshot);
746 std::fs::create_dir_all(&tmp_path)?;
747
748 let chunks = chunks_list
749 .entry(source_datastore)
750 .or_insert_with(HashSet::new);
751 let manifest = try_restore_snapshot_archive(worker.clone(), &mut decoder, &tmp_path)?;
752 for item in manifest.files() {
753 let mut archive_path = tmp_path.to_owned();
754 archive_path.push(&item.filename);
755
756 let index: Box<dyn IndexFile> = match archive_type(&item.filename)? {
757 ArchiveType::DynamicIndex => {
758 Box::new(DynamicIndexReader::open(&archive_path)?)
759 }
760 ArchiveType::FixedIndex => {
761 Box::new(FixedIndexReader::open(&archive_path)?)
762 }
763 ArchiveType::Blob => continue,
764 };
765 for i in 0..index.index_count() {
766 if let Some(digest) = index.index_digest(i) {
767 chunks.insert(*digest);
768 }
769 }
770 }
771 }
772 other => bail!("unexpected file type: {:?}", other),
773 }
774 }
775
776 Ok(())
777 }
778
/// Restore chunks from the currently loaded tape.
///
/// `file_chunk_map` maps tape file numbers (ascending, BTreeMap order)
/// to the set of chunk digests wanted from that chunk archive; the sets
/// are consumed by `restore_partial_chunk_archive`.
fn restore_file_chunk_map(
    worker: Arc<WorkerTask>,
    drive: &mut Box<dyn TapeDriver>,
    store_map: &DataStoreMap,
    file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
    for (nr, chunk_map) in file_chunk_map.iter_mut() {
        // seek to the archive if we are not already positioned there
        let current_file_number = drive.current_file_number()?;
        if current_file_number != *nr {
            task_log!(worker, "was at file {}, moving to {}", current_file_number, nr);
            drive.move_to_file(*nr)?;
            let current_file_number = drive.current_file_number()?;
            task_log!(worker, "now at file {}", current_file_number);
        }
        let mut reader = drive.read_next_file()?;
        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
            bail!("missing MediaContentHeader");
        }

        match header.content_magic {
            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
                let header_data = reader.read_exact_allocated(header.size as usize)?;

                let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
                    .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;

                let source_datastore = archive_header.store;

                task_log!(
                    worker,
                    "File {}: chunk archive for datastore '{}'",
                    nr,
                    source_datastore
                );

                let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
                    format_err!("unexpected chunk archive for store: {}", source_datastore)
                })?;

                // restore only the chunks listed in chunk_map
                let count = restore_partial_chunk_archive(worker.clone(), reader, datastore.clone(), chunk_map)?;
                task_log!(worker, "restored {} chunks", count);
            }
            _ => bail!("unexpected content magic {:?}", header.content_magic),
        }
    }

    Ok(())
}
828
829 fn restore_partial_chunk_archive<'a>(
830 worker: Arc<WorkerTask>,
831 reader: Box<dyn 'a + TapeRead>,
832 datastore: Arc<DataStore>,
833 chunk_list: &mut HashSet<[u8; 32]>,
834 ) -> Result<usize, Error> {
835 let mut decoder = ChunkArchiveDecoder::new(reader);
836
837 let mut count = 0;
838
839 let start_time = std::time::SystemTime::now();
840 let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
841 let bytes2 = bytes.clone();
842
843 let writer_pool = ParallelHandler::new(
844 "tape restore chunk writer",
845 4,
846 move |(chunk, digest): (DataBlob, [u8; 32])| {
847 if !datastore.cond_touch_chunk(&digest, false)? {
848 bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
849 chunk.verify_crc()?;
850 if chunk.crypt_mode()? == CryptMode::None {
851 chunk.decode(None, Some(&digest))?; // verify digest
852 }
853
854 datastore.insert_chunk(&chunk, &digest)?;
855 }
856 Ok(())
857 },
858 );
859
860 let verify_and_write_channel = writer_pool.channel();
861
862 loop {
863 let (digest, blob) = match decoder.next_chunk()? {
864 Some((digest, blob)) => (digest, blob),
865 None => break,
866 };
867
868 worker.check_abort()?;
869
870 if chunk_list.remove(&digest) {
871 verify_and_write_channel.send((blob, digest.clone()))?;
872 count += 1;
873 }
874
875 if chunk_list.is_empty() {
876 break;
877 }
878 }
879
880 drop(verify_and_write_channel);
881
882 writer_pool.complete()?;
883
884 let elapsed = start_time.elapsed()?.as_secs_f64();
885
886 let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
887
888 task_log!(
889 worker,
890 "restored {} bytes ({:.2} MB/s)",
891 bytes,
892 (bytes as f64) / (1_000_000.0 * elapsed)
893 );
894
895 Ok(count)
896 }
897
898
899 /// Request and restore complete media without using existing catalog (create catalog instead)
900 pub fn request_and_restore_media(
901 worker: Arc<WorkerTask>,
902 media_id: &MediaId,
903 drive_config: &SectionConfigData,
904 drive_name: &str,
905 store_map: &DataStoreMap,
906 checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
907 restore_owner: &Authid,
908 email: &Option<String>,
909 ) -> Result<(), Error> {
910 let media_set_uuid = match media_id.media_set_label {
911 None => bail!("restore_media: no media set - internal error"),
912 Some(ref set) => &set.uuid,
913 };
914
915 let (mut drive, info) = request_and_load_media(&worker, &drive_config, &drive_name, &media_id.label, email)?;
916
917 match info.media_set_label {
918 None => {
919 bail!("missing media set label on media {} ({})",
920 media_id.label.label_text, media_id.label.uuid);
921 }
922 Some(ref set) => {
923 if &set.uuid != media_set_uuid {
924 bail!("wrong media set label on media {} ({} != {})",
925 media_id.label.label_text, media_id.label.uuid,
926 media_set_uuid);
927 }
928 let encrypt_fingerprint = set.encryption_key_fingerprint.clone()
929 .map(|fp| (fp, set.uuid.clone()));
930
931 drive.set_encryption(encrypt_fingerprint)?;
932 }
933 }
934
935 restore_media(
936 worker,
937 &mut drive,
938 &info,
939 Some((&store_map, restore_owner)),
940 checked_chunks_map,
941 false,
942 )
943 }
944
/// Restore complete media content and catalog
///
/// Only create the catalog if target is None.
pub fn restore_media(
    worker: Arc<WorkerTask>,
    drive: &mut Box<dyn TapeDriver>,
    media_id: &MediaId,
    target: Option<(&DataStoreMap, &Authid)>,
    checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
    verbose: bool,
) -> Result<(), Error> {

    let status_path = Path::new(TAPE_STATUS_DIR);
    // build the catalog in a temporary database; it is only moved in
    // place after the whole media was processed successfully
    let mut catalog = MediaCatalog::create_temporary_database(status_path, media_id, false)?;

    // read every file on the tape until end-of-tape
    loop {
        let current_file_number = drive.current_file_number()?;
        let reader = match drive.read_next_file() {
            Err(BlockReadError::EndOfFile) => {
                // stray filemark: log it and try the next file
                task_log!(worker, "skip unexpected filemark at pos {}", current_file_number);
                continue;
            }
            Err(BlockReadError::EndOfStream) => {
                task_log!(worker, "detected EOT after {} files", current_file_number);
                break;
            }
            Err(BlockReadError::Error(err)) => {
                return Err(err.into());
            }
            Ok(reader) => reader,
        };

        restore_archive(worker.clone(), reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?;
    }

    catalog.commit()?;

    MediaCatalog::finish_temporary_database(status_path, &media_id.label.uuid, true)?;

    Ok(())
}
986
/// Restore (or, when `target` is None, only catalog) a single content
/// file from tape, dispatching on its content magic.
///
/// Snapshot and chunk archives are restored into the mapped datastore
/// (if any) and registered in `catalog`; catalog archives are skipped.
fn restore_archive<'a>(
    worker: Arc<WorkerTask>,
    mut reader: Box<dyn 'a + TapeRead>,
    current_file_number: u64,
    target: Option<(&DataStoreMap, &Authid)>,
    catalog: &mut MediaCatalog,
    checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
    verbose: bool,
) -> Result<(), Error> {
    let header: MediaContentHeader = unsafe { reader.read_le_value()? };
    if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
        bail!("missing MediaContentHeader");
    }

    //println!("Found MediaContentHeader: {:?}", header);

    match header.content_magic {
        PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0 | PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0 => {
            bail!("unexpected content magic (label)");
        }
        PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0 => {
            bail!("unexpected snapshot archive version (v1.0)");
        }
        PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
            let header_data = reader.read_exact_allocated(header.size as usize)?;

            let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
                .map_err(|err| format_err!("unable to parse snapshot archive header - {}", err))?;

            let datastore_name = archive_header.store;
            let snapshot = archive_header.snapshot;

            task_log!(worker, "File {}: snapshot archive {}:{}", current_file_number, datastore_name, snapshot);

            let backup_dir: BackupDir = snapshot.parse()?;

            if let Some((store_map, authid)) = target.as_ref() {
                if let Some(datastore) = store_map.get_datastore(&datastore_name) {
                    let (owner, _group_lock) =
                        datastore.create_locked_backup_group(backup_dir.group(), authid)?;
                    if *authid != &owner {
                        // only the owner is allowed to create additional snapshots
                        bail!(
                            "restore '{}' failed - owner check failed ({} != {})",
                            snapshot,
                            authid,
                            owner
                        );
                    }

                    let (rel_path, is_new, _snap_lock) =
                        datastore.create_locked_backup_dir(&backup_dir)?;
                    let mut path = datastore.base_path();
                    path.push(rel_path);

                    // only restore if the snapshot does not exist yet
                    if is_new {
                        task_log!(worker, "restore snapshot {}", backup_dir);

                        match restore_snapshot_archive(worker.clone(), reader, &path) {
                            Err(err) => {
                                // remove the partially written snapshot dir
                                std::fs::remove_dir_all(&path)?;
                                bail!("restore snapshot {} failed - {}", backup_dir, err);
                            }
                            Ok(false) => {
                                std::fs::remove_dir_all(&path)?;
                                task_log!(worker, "skip incomplete snapshot {}", backup_dir);
                            }
                            Ok(true) => {
                                catalog.register_snapshot(
                                    Uuid::from(header.uuid),
                                    current_file_number,
                                    &datastore_name,
                                    &snapshot,
                                )?;
                                catalog.commit_if_large()?;
                            }
                        }
                        return Ok(());
                    }
                }
            } else {
                task_log!(worker, "skipping...");
            }

            // not restored: skip the payload, but still record complete
            // archives in the catalog
            reader.skip_data()?; // read all data
            if let Ok(false) = reader.is_incomplete() {
                catalog.register_snapshot(Uuid::from(header.uuid), current_file_number, &datastore_name, &snapshot)?;
                catalog.commit_if_large()?;
            }
        }
        PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0 => {
            bail!("unexpected chunk archive version (v1.0)");
        }
        PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
            let header_data = reader.read_exact_allocated(header.size as usize)?;

            let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
                .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;

            let source_datastore = archive_header.store;

            task_log!(worker, "File {}: chunk archive for datastore '{}'", current_file_number, source_datastore);
            let datastore = target
                .as_ref()
                .and_then(|t| t.0.get_datastore(&source_datastore));

            // restore when a target datastore is mapped; with no target
            // at all (catalog-only mode) just scan the archive
            if datastore.is_some() || target.is_none() {
                let checked_chunks = checked_chunks_map
                    .entry(datastore.as_ref().map(|d| d.name()).unwrap_or("_unused_").to_string())
                    .or_insert(HashSet::new());

                let chunks = if let Some(datastore) = datastore {
                    restore_chunk_archive(worker.clone(), reader, datastore, checked_chunks, verbose)?
                } else {
                    scan_chunk_archive(worker.clone(), reader, verbose)?
                };

                if let Some(chunks) = chunks {
                    catalog.register_chunk_archive(
                        Uuid::from(header.uuid),
                        current_file_number,
                        &source_datastore,
                        &chunks[..],
                    )?;
                    task_log!(worker, "register {} chunks", chunks.len());
                    catalog.commit_if_large()?;
                }
                return Ok(());
            } else if target.is_some() {
                task_log!(worker, "skipping...");
            }

            reader.skip_data()?; // read all data
        }
        PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0 => {
            let header_data = reader.read_exact_allocated(header.size as usize)?;

            let archive_header: CatalogArchiveHeader = serde_json::from_slice(&header_data)
                .map_err(|err| format_err!("unable to parse catalog archive header - {}", err))?;

            task_log!(worker, "File {}: skip catalog '{}'", current_file_number, archive_header.uuid);

            reader.skip_data()?; // read all data
        }
        _ => bail!("unknown content magic {:?}", header.content_magic),
    }

    Ok(())
}
1136
1137 // Read chunk archive without restoring data - just record contained chunks
1138 fn scan_chunk_archive<'a>(
1139 worker: Arc<WorkerTask>,
1140 reader: Box<dyn 'a + TapeRead>,
1141 verbose: bool,
1142 ) -> Result<Option<Vec<[u8;32]>>, Error> {
1143
1144 let mut chunks = Vec::new();
1145
1146 let mut decoder = ChunkArchiveDecoder::new(reader);
1147
1148 loop {
1149 let digest = match decoder.next_chunk() {
1150 Ok(Some((digest, _blob))) => digest,
1151 Ok(None) => break,
1152 Err(err) => {
1153 let reader = decoder.reader();
1154
1155 // check if this stream is marked incomplete
1156 if let Ok(true) = reader.is_incomplete() {
1157 return Ok(Some(chunks));
1158 }
1159
1160 // check if this is an aborted stream without end marker
1161 if let Ok(false) = reader.has_end_marker() {
1162 task_log!(worker, "missing stream end marker");
1163 return Ok(None);
1164 }
1165
1166 // else the archive is corrupt
1167 return Err(err);
1168 }
1169 };
1170
1171 worker.check_abort()?;
1172
1173 if verbose {
1174 task_log!(worker, "Found chunk: {}", proxmox::tools::digest_to_hex(&digest));
1175 }
1176
1177 chunks.push(digest);
1178 }
1179
1180 Ok(Some(chunks))
1181 }
1182
1183 fn restore_chunk_archive<'a>(
1184 worker: Arc<WorkerTask>,
1185 reader: Box<dyn 'a + TapeRead>,
1186 datastore: Arc<DataStore>,
1187 checked_chunks: &mut HashSet<[u8;32]>,
1188 verbose: bool,
1189 ) -> Result<Option<Vec<[u8;32]>>, Error> {
1190
1191 let mut chunks = Vec::new();
1192
1193 let mut decoder = ChunkArchiveDecoder::new(reader);
1194
1195 let datastore2 = datastore.clone();
1196 let start_time = std::time::SystemTime::now();
1197 let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
1198 let bytes2 = bytes.clone();
1199
1200 let worker2 = worker.clone();
1201
1202 let writer_pool = ParallelHandler::new(
1203 "tape restore chunk writer",
1204 4,
1205 move |(chunk, digest): (DataBlob, [u8; 32])| {
1206 let chunk_exists = datastore2.cond_touch_chunk(&digest, false)?;
1207 if !chunk_exists {
1208 if verbose {
1209 task_log!(worker2, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest));
1210 }
1211 bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
1212 // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
1213 chunk.verify_crc()?;
1214 if chunk.crypt_mode()? == CryptMode::None {
1215 chunk.decode(None, Some(&digest))?; // verify digest
1216 }
1217
1218 datastore2.insert_chunk(&chunk, &digest)?;
1219 } else if verbose {
1220 task_log!(worker2, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
1221 }
1222 Ok(())
1223 },
1224 );
1225
1226 let verify_and_write_channel = writer_pool.channel();
1227
1228
1229 loop {
1230 let (digest, blob) = match decoder.next_chunk() {
1231 Ok(Some((digest, blob))) => (digest, blob),
1232 Ok(None) => break,
1233 Err(err) => {
1234 let reader = decoder.reader();
1235
1236 // check if this stream is marked incomplete
1237 if let Ok(true) = reader.is_incomplete() {
1238 return Ok(Some(chunks));
1239 }
1240
1241 // check if this is an aborted stream without end marker
1242 if let Ok(false) = reader.has_end_marker() {
1243 task_log!(worker, "missing stream end marker");
1244 return Ok(None);
1245 }
1246
1247 // else the archive is corrupt
1248 return Err(err);
1249 }
1250 };
1251
1252 worker.check_abort()?;
1253
1254 if !checked_chunks.contains(&digest) {
1255 verify_and_write_channel.send((blob, digest.clone()))?;
1256 checked_chunks.insert(digest.clone());
1257 }
1258 chunks.push(digest);
1259 }
1260
1261 drop(verify_and_write_channel);
1262
1263 writer_pool.complete()?;
1264
1265 let elapsed = start_time.elapsed()?.as_secs_f64();
1266
1267 let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
1268
1269 task_log!(
1270 worker,
1271 "restored {} bytes ({:.2} MB/s)",
1272 bytes,
1273 (bytes as f64) / (1_000_000.0 * elapsed)
1274 );
1275
1276 Ok(Some(chunks))
1277 }
1278
1279 fn restore_snapshot_archive<'a>(
1280 worker: Arc<WorkerTask>,
1281 reader: Box<dyn 'a + TapeRead>,
1282 snapshot_path: &Path,
1283 ) -> Result<bool, Error> {
1284
1285 let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
1286 match try_restore_snapshot_archive(worker, &mut decoder, snapshot_path) {
1287 Ok(_) => Ok(true),
1288 Err(err) => {
1289 let reader = decoder.input();
1290
1291 // check if this stream is marked incomplete
1292 if let Ok(true) = reader.is_incomplete() {
1293 return Ok(false);
1294 }
1295
1296 // check if this is an aborted stream without end marker
1297 if let Ok(false) = reader.has_end_marker() {
1298 return Ok(false);
1299 }
1300
1301 // else the archive is corrupt
1302 Err(err)
1303 }
1304 }
1305 }
1306
1307 fn try_restore_snapshot_archive<R: pxar::decoder::SeqRead>(
1308 worker: Arc<WorkerTask>,
1309 decoder: &mut pxar::decoder::sync::Decoder<R>,
1310 snapshot_path: &Path,
1311 ) -> Result<BackupManifest, Error> {
1312
1313 let _root = match decoder.next() {
1314 None => bail!("missing root entry"),
1315 Some(root) => {
1316 let root = root?;
1317 match root.kind() {
1318 pxar::EntryKind::Directory => { /* Ok */ }
1319 _ => bail!("wrong root entry type"),
1320 }
1321 root
1322 }
1323 };
1324
1325 let root_path = Path::new("/");
1326 let manifest_file_name = OsStr::new(MANIFEST_BLOB_NAME);
1327
1328 let mut manifest = None;
1329
1330 loop {
1331 worker.check_abort()?;
1332
1333 let entry = match decoder.next() {
1334 None => break,
1335 Some(entry) => entry?,
1336 };
1337 let entry_path = entry.path();
1338
1339 match entry.kind() {
1340 pxar::EntryKind::File { .. } => { /* Ok */ }
1341 _ => bail!("wrong entry type for {:?}", entry_path),
1342 }
1343 match entry_path.parent() {
1344 None => bail!("wrong parent for {:?}", entry_path),
1345 Some(p) => {
1346 if p != root_path {
1347 bail!("wrong parent for {:?}", entry_path);
1348 }
1349 }
1350 }
1351
1352 let filename = entry.file_name();
1353 let mut contents = match decoder.contents() {
1354 None => bail!("missing file content"),
1355 Some(contents) => contents,
1356 };
1357
1358 let mut archive_path = snapshot_path.to_owned();
1359 archive_path.push(&filename);
1360
1361 let mut tmp_path = archive_path.clone();
1362 tmp_path.set_extension("tmp");
1363
1364 if filename == manifest_file_name {
1365
1366 let blob = DataBlob::load_from_reader(&mut contents)?;
1367 let mut old_manifest = BackupManifest::try_from(blob)?;
1368
1369 // Remove verify_state to indicate that this snapshot is not verified
1370 old_manifest.unprotected
1371 .as_object_mut()
1372 .map(|m| m.remove("verify_state"));
1373
1374 let old_manifest = serde_json::to_string_pretty(&old_manifest)?;
1375 let blob = DataBlob::encode(old_manifest.as_bytes(), None, true)?;
1376
1377 let options = CreateOptions::new();
1378 replace_file(&tmp_path, blob.raw_data(), options)?;
1379
1380 manifest = Some(BackupManifest::try_from(blob)?);
1381 } else {
1382 let mut tmpfile = std::fs::OpenOptions::new()
1383 .write(true)
1384 .create(true)
1385 .read(true)
1386 .open(&tmp_path)
1387 .map_err(|err| format_err!("restore {:?} failed - {}", tmp_path, err))?;
1388
1389 std::io::copy(&mut contents, &mut tmpfile)?;
1390
1391 if let Err(err) = std::fs::rename(&tmp_path, &archive_path) {
1392 bail!("Atomic rename file {:?} failed - {}", archive_path, err);
1393 }
1394 }
1395 }
1396
1397 let manifest = match manifest {
1398 None => bail!("missing manifest"),
1399 Some(manifest) => manifest,
1400 };
1401
1402 // Do not verify anything here, because this would be to slow (causes tape stops).
1403
1404 // commit manifest
1405 let mut manifest_path = snapshot_path.to_owned();
1406 manifest_path.push(MANIFEST_BLOB_NAME);
1407 let mut tmp_manifest_path = manifest_path.clone();
1408 tmp_manifest_path.set_extension("tmp");
1409
1410 if let Err(err) = std::fs::rename(&tmp_manifest_path, &manifest_path) {
1411 bail!("Atomic rename manifest {:?} failed - {}", manifest_path, err);
1412 }
1413
1414 Ok(manifest)
1415 }
1416
1417 /// Try to restore media catalogs (form catalog_archives)
1418 pub fn fast_catalog_restore(
1419 worker: &WorkerTask,
1420 drive: &mut Box<dyn TapeDriver>,
1421 media_set: &MediaSet,
1422 uuid: &Uuid, // current media Uuid
1423 ) -> Result<bool, Error> {
1424
1425 let status_path = Path::new(TAPE_STATUS_DIR);
1426
1427 let current_file_number = drive.current_file_number()?;
1428 if current_file_number != 2 {
1429 bail!("fast_catalog_restore: wrong media position - internal error");
1430 }
1431
1432 let mut found_catalog = false;
1433
1434 let mut moved_to_eom = false;
1435
1436 loop {
1437 let current_file_number = drive.current_file_number()?;
1438
1439 { // limit reader scope
1440 let mut reader = match drive.read_next_file() {
1441 Err(BlockReadError::EndOfFile) => {
1442 task_log!(worker, "skip unexpected filemark at pos {}", current_file_number);
1443 continue;
1444 }
1445 Err(BlockReadError::EndOfStream) => {
1446 task_log!(worker, "detected EOT after {} files", current_file_number);
1447 break;
1448 }
1449 Err(BlockReadError::Error(err)) => {
1450 return Err(err.into());
1451 }
1452 Ok(reader) => reader,
1453 };
1454
1455 let header: MediaContentHeader = unsafe { reader.read_le_value()? };
1456 if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
1457 bail!("missing MediaContentHeader");
1458 }
1459
1460 if header.content_magic == PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0 {
1461 task_log!(worker, "found catalog at pos {}", current_file_number);
1462
1463 let header_data = reader.read_exact_allocated(header.size as usize)?;
1464
1465 let archive_header: CatalogArchiveHeader = serde_json::from_slice(&header_data)
1466 .map_err(|err| format_err!("unable to parse catalog archive header - {}", err))?;
1467
1468 if &archive_header.media_set_uuid != media_set.uuid() {
1469 task_log!(worker, "skipping unrelated catalog at pos {}", current_file_number);
1470 reader.skip_data()?; // read all data
1471 continue;
1472 }
1473
1474 let catalog_uuid = &archive_header.uuid;
1475
1476 let wanted = media_set
1477 .media_list()
1478 .iter()
1479 .find(|e| {
1480 match e {
1481 None => false,
1482 Some(uuid) => uuid == catalog_uuid,
1483 }
1484 })
1485 .is_some();
1486
1487 if !wanted {
1488 task_log!(worker, "skip catalog because media '{}' not inventarized", catalog_uuid);
1489 reader.skip_data()?; // read all data
1490 continue;
1491 }
1492
1493 if catalog_uuid == uuid {
1494 // always restore and overwrite catalog
1495 } else {
1496 // only restore if catalog does not exist
1497 if MediaCatalog::exists(status_path, catalog_uuid) {
1498 task_log!(worker, "catalog for media '{}' already exists", catalog_uuid);
1499 reader.skip_data()?; // read all data
1500 continue;
1501 }
1502 }
1503
1504 let mut file = MediaCatalog::create_temporary_database_file(status_path, catalog_uuid)?;
1505
1506 std::io::copy(&mut reader, &mut file)?;
1507
1508 file.seek(SeekFrom::Start(0))?;
1509
1510 match MediaCatalog::parse_catalog_header(&mut file)? {
1511 (true, Some(media_uuid), Some(media_set_uuid)) => {
1512 if &media_uuid != catalog_uuid {
1513 task_log!(worker, "catalog uuid missmatch at pos {}", current_file_number);
1514 continue;
1515 }
1516 if media_set_uuid != archive_header.media_set_uuid {
1517 task_log!(worker, "catalog media_set missmatch at pos {}", current_file_number);
1518 continue;
1519 }
1520
1521 MediaCatalog::finish_temporary_database(status_path, &media_uuid, true)?;
1522
1523 if catalog_uuid == uuid {
1524 task_log!(worker, "successfully restored catalog");
1525 found_catalog = true
1526 } else {
1527 task_log!(worker, "successfully restored related catalog {}", media_uuid);
1528 }
1529 }
1530 _ => {
1531 task_warn!(worker, "got incomplete catalog header - skip file");
1532 continue;
1533 }
1534 }
1535
1536 continue;
1537 }
1538 }
1539
1540 if moved_to_eom {
1541 break; // already done - stop
1542 }
1543 moved_to_eom = true;
1544
1545 task_log!(worker, "searching for catalog at EOT (moving to EOT)");
1546 drive.move_to_last_file()?;
1547
1548 let new_file_number = drive.current_file_number()?;
1549
1550 if new_file_number < (current_file_number + 1) {
1551 break; // no new content - stop
1552 }
1553 }
1554
1555 Ok(found_catalog)
1556 }