2 pub use catalog_set
::*;
4 mod new_chunks_iterator
;
5 pub use new_chunks_iterator
::*;
7 use std
::collections
::HashSet
;
10 use std
::sync
::{Arc, Mutex}
;
11 use std
::time
::SystemTime
;
13 use anyhow
::{bail, Error}
;
15 use proxmox_sys
::{task_log, task_warn}
;
16 use proxmox_uuid
::Uuid
;
18 use pbs_config
::tape_encryption_keys
::load_key_configs
;
19 use pbs_datastore
::{DataStore, SnapshotReader}
;
20 use pbs_tape
::{sg_tape::tape_alert_flags_critical, TapeWrite}
;
21 use proxmox_rest_server
::WorkerTask
;
24 drive
::{media_changer, request_and_load_media, TapeDriver}
,
26 tape_write_catalog
, tape_write_snapshot_archive
, ChunkArchiveWriter
, MediaSetLabel
,
28 MediaCatalog
, MediaId
, MediaPool
, COMMIT_BLOCK_SIZE
, MAX_CHUNK_ARCHIVE_SIZE
, TAPE_STATUS_DIR
,
31 use super::file_formats
::{
32 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1
,
35 struct PoolWriterState
{
36 drive
: Box
<dyn TapeDriver
>,
37 // Media Uuid from loaded media
39 // tell if we already moved to EOM
41 // bytes written after the last tape flush/sync
45 /// Helper to manage a backup job, writing several tapes of a pool
46 pub struct PoolWriter
{
49 status
: Option
<PoolWriterState
>,
50 catalog_set
: Arc
<Mutex
<CatalogSet
>>,
51 notify_email
: Option
<String
>,
53 used_tapes
: HashSet
<Uuid
>,
61 notify_email
: Option
<String
>,
62 force_media_set
: bool
,
64 ) -> Result
<Self, Error
> {
65 let current_time
= proxmox_time
::epoch_i64();
67 let new_media_set_reason
= pool
.start_write_session(current_time
, force_media_set
)?
;
68 if let Some(reason
) = new_media_set_reason
{
69 task_log
!(worker
, "starting new media set - reason: {}", reason
,);
72 let media_set_uuid
= pool
.current_media_set().uuid();
73 task_log
!(worker
, "media set uuid: {}", media_set_uuid
);
75 let mut catalog_set
= CatalogSet
::new();
77 // load all catalogs read-only at start
78 for media_uuid
in pool
.current_media_list()?
{
79 let media_info
= pool
.lookup_media(media_uuid
).unwrap();
81 MediaCatalog
::open(Path
::new(TAPE_STATUS_DIR
), media_info
.id(), false, false)?
;
82 catalog_set
.append_read_only_catalog(media_catalog
)?
;
87 drive_name
: drive_name
.to_string(),
89 catalog_set
: Arc
::new(Mutex
::new(catalog_set
)),
92 used_tapes
: HashSet
::new(),
96 pub fn pool(&mut self) -> &mut MediaPool
{
100 /// Set media status to FULL (persistent - stores pool status)
101 pub fn set_media_status_full(&mut self, uuid
: &Uuid
) -> Result
<(), Error
> {
102 self.pool
.set_media_status_full(uuid
)?
;
106 pub fn get_used_media_labels(&self) -> Result
<Vec
<String
>, Error
> {
107 let mut res
= Vec
::with_capacity(self.used_tapes
.len());
108 for media_uuid
in &self.used_tapes
{
109 let media_info
= self.pool
.lookup_media(&media_uuid
)?
;
110 res
.push(media_info
.label_text().to_string());
116 pub fn contains_snapshot(
119 ns
: &pbs_api_types
::BackupNamespace
,
120 snapshot
: &pbs_api_types
::BackupDir
,
125 .contains_snapshot(store
, ns
, snapshot
)
128 /// Eject media and drop PoolWriterState (close drive)
129 pub fn eject_media(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
130 let mut status
= match self.status
.take() {
131 Some(status
) => status
,
132 None
=> return Ok(()), // no media loaded
135 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
137 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
138 task_log
!(worker
, "eject media");
139 status
.drive
.eject_media()?
; // rewind and eject early, so that unload_media is faster
140 drop(status
); // close drive
141 task_log
!(worker
, "unload media");
142 changer
.unload_media(None
)?
; //eject and unload
144 task_log
!(worker
, "standalone drive - ejecting media");
145 status
.drive
.eject_media()?
;
151 /// Export current media set and drop PoolWriterState (close drive)
152 pub fn export_media_set(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
153 let mut status
= self.status
.take();
155 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
157 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
158 if let Some(ref mut status
) = status
{
159 task_log
!(worker
, "rewind media");
160 // rewind first so that the unload command later does not run into a timeout
161 status
.drive
.rewind()?
;
163 drop(status
); // close drive
165 for media_uuid
in self.pool
.current_media_list()?
{
166 let media
= self.pool
.lookup_media(media_uuid
)?
;
167 let label_text
= media
.label_text();
168 if let Some(slot
) = changer
.export_media(label_text
)?
{
171 "exported media '{}' to import/export slot {}",
178 "export failed - media '{}' is not online or in different drive",
183 } else if let Some(mut status
) = status
{
186 "standalone drive - ejecting media instead of export"
188 status
.drive
.eject_media()?
;
194 /// commit changes to tape and catalog
196 /// This is done automatically during a backup session, but needs to
197 /// be called explicitly before dropping the PoolWriter
198 pub fn commit(&mut self) -> Result
<(), Error
> {
199 if let Some(PoolWriterState { ref mut drive, .. }
) = self.status
{
200 drive
.sync()?
; // sync all data to the tape
202 self.catalog_set
.lock().unwrap().commit()?
; // then commit the catalog
206 /// Load a writable media into the drive
207 pub fn load_writable_media(&mut self, worker
: &WorkerTask
) -> Result
<Uuid
, Error
> {
208 let last_media_uuid
= match self.status
{
209 Some(PoolWriterState { ref media_uuid, .. }
) => Some(media_uuid
.clone()),
213 let current_time
= proxmox_time
::epoch_i64();
214 let media_uuid
= self.pool
.alloc_writable_media(current_time
)?
;
216 let media
= self.pool
.lookup_media(&media_uuid
).unwrap();
218 let media_changed
= match last_media_uuid
{
219 Some(ref last_media_uuid
) => last_media_uuid
!= &media_uuid
,
224 self.used_tapes
.insert(media_uuid
.clone());
225 return Ok(media_uuid
);
230 "allocated new writable media '{}'",
234 if let Some(PoolWriterState { mut drive, .. }
) = self.status
.take() {
235 if last_media_uuid
.is_some() {
236 task_log
!(worker
, "eject current media");
237 drive
.eject_media()?
;
241 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
243 let (mut drive
, old_media_id
) = request_and_load_media(
251 // test for critical tape alert flags
252 if let Ok(alert_flags
) = drive
.tape_alert_flags() {
253 if !alert_flags
.is_empty() {
254 task_log
!(worker
, "TapeAlertFlags: {:?}", alert_flags
);
255 if tape_alert_flags_critical(alert_flags
) {
256 self.pool
.set_media_status_damaged(&media_uuid
)?
;
258 "aborting due to critical tape alert flags: {:?}",
265 let (catalog
, is_new_media
) = update_media_set_label(
268 old_media_id
.media_set_label
,
272 self.catalog_set
.lock().unwrap().append_catalog(catalog
)?
;
274 let media_set
= media
.media_set_label().clone().unwrap();
276 let encrypt_fingerprint
= media_set
277 .encryption_key_fingerprint
279 .map(|fp
| (fp
, media_set
.uuid
.clone()));
281 drive
.set_encryption(encrypt_fingerprint
)?
;
283 self.status
= Some(PoolWriterState
{
285 media_uuid
: media_uuid
.clone(),
291 // add catalogs from previous media
292 self.append_media_set_catalogs(worker
)?
;
295 self.used_tapes
.insert(media_uuid
.clone());
299 fn open_catalog_file(uuid
: &Uuid
) -> Result
<File
, Error
> {
300 let status_path
= Path
::new(TAPE_STATUS_DIR
);
301 let mut path
= status_path
.to_owned();
302 path
.push(uuid
.to_string());
303 path
.set_extension("log");
305 let file
= std
::fs
::OpenOptions
::new().read(true).open(&path
)?
;
310 // Check if tape is loaded, then move to EOM (if not already there)
312 // Returns the tape position at EOM.
313 fn prepare_tape_write(status
: &mut PoolWriterState
, worker
: &WorkerTask
) -> Result
<u64, Error
> {
315 task_log
!(worker
, "moving to end of media");
316 status
.drive
.move_to_eom(true)?
;
317 status
.at_eom
= true;
318 task_log
!(worker
, "arrived at end of media");
321 let current_file_number
= status
.drive
.current_file_number()?
;
322 if current_file_number
< 2 {
324 "got strange file position number from drive ({})",
329 Ok(current_file_number
)
332 /// Move to EOM (if not already there), then write the current
333 /// catalog to the tape. On success, this returns 'Ok(true)'.
335 /// Please note that this may fail when there is not enough space
336 /// on the media (return value 'Ok(false)'). In that case, the
337 /// archive is marked incomplete. The caller should mark the media
338 /// as full and try again using another media.
339 pub fn append_catalog_archive(&mut self, worker
: &WorkerTask
) -> Result
<bool
, Error
> {
340 let catalog_magic
= self.catalog_version();
342 let status
= match self.status
{
343 Some(ref mut status
) => status
,
344 None
=> bail
!("PoolWriter - no media loaded"),
347 Self::prepare_tape_write(status
, worker
)?
;
349 let catalog_set
= self.catalog_set
.lock().unwrap();
351 let catalog
= match catalog_set
.catalog
{
352 None
=> bail
!("append_catalog_archive failed: no catalog - internal error"),
353 Some(ref catalog
) => catalog
,
356 let media_set
= self.pool
.current_media_set();
358 let media_list
= media_set
.media_list();
359 let uuid
= match media_list
.last() {
360 None
=> bail
!("got empty media list - internal error"),
361 Some(None
) => bail
!("got incomplete media list - internal error"),
362 Some(Some(last_uuid
)) => {
363 if last_uuid
!= catalog
.uuid() {
364 bail
!("got wrong media - internal error");
370 let seq_nr
= media_list
.len() - 1;
372 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
374 let mut file
= Self::open_catalog_file(uuid
)?
;
376 let done
= tape_write_catalog(
389 // Append catalogs for all previous media in set (without last)
390 fn append_media_set_catalogs(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
391 let media_set
= self.pool
.current_media_set();
393 let mut media_list
= &media_set
.media_list()[..];
394 if media_list
.len() < 2 {
397 media_list
= &media_list
[..(media_list
.len() - 1)];
399 let catalog_magic
= self.catalog_version();
401 let status
= match self.status
{
402 Some(ref mut status
) => status
,
403 None
=> bail
!("PoolWriter - no media loaded"),
406 Self::prepare_tape_write(status
, worker
)?
;
408 for (seq_nr
, uuid
) in media_list
.iter().enumerate() {
409 let uuid
= match uuid
{
410 None
=> bail
!("got incomplete media list - internal error"),
414 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
416 let mut file
= Self::open_catalog_file(uuid
)?
;
418 task_log
!(worker
, "write catalog for previous media: {}", uuid
);
420 if tape_write_catalog(
430 bail
!("got EOM while writing start catalog");
437 /// Move to EOM (if not already there), then creates a new snapshot
438 /// archive writing specified files (as .pxar) into it. On
439 /// success, this returns 'Ok(true)' and the media catalog gets
442 /// Please note that this may fail when there is not enough space
443 /// on the media (return value 'Ok((false, _))'). In that case, the
444 /// archive is marked incomplete, and we do not use it. The caller
445 /// should mark the media as full and try again using another
447 pub fn append_snapshot_archive(
450 snapshot_reader
: &SnapshotReader
,
451 ) -> Result
<(bool
, usize), Error
> {
452 let status
= match self.status
{
453 Some(ref mut status
) => status
,
454 None
=> bail
!("PoolWriter - no media loaded"),
457 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
459 let (done
, bytes_written
) = {
460 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
462 match tape_write_snapshot_archive(writer
.as_mut(), snapshot_reader
)?
{
463 Some(content_uuid
) => {
464 self.catalog_set
.lock().unwrap().register_snapshot(
467 snapshot_reader
.datastore_name(),
468 snapshot_reader
.snapshot().backup_ns(),
469 snapshot_reader
.snapshot().as_ref(),
471 (true, writer
.bytes_written())
473 None
=> (false, writer
.bytes_written()),
477 status
.bytes_written
+= bytes_written
;
479 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
481 if !done
|| request_sync
{
485 Ok((done
, bytes_written
))
488 /// Move to EOM (if not already there), then creates a new chunk
489 /// archive and writes chunks from 'chunk_iter'. This stops when
490 /// it detects LEOM or when we reach max archive size
491 /// (4GB). Written chunks are registered in the media catalog.
492 pub fn append_chunk_archive(
495 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
497 ) -> Result
<(bool
, usize), Error
> {
498 let status
= match self.status
{
499 Some(ref mut status
) => status
,
500 None
=> bail
!("PoolWriter - no media loaded"),
503 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
505 let writer
= status
.drive
.write_file()?
;
507 let start_time
= SystemTime
::now();
509 let (saved_chunks
, content_uuid
, leom
, bytes_written
) =
510 write_chunk_archive(worker
, writer
, chunk_iter
, store
, MAX_CHUNK_ARCHIVE_SIZE
)?
;
512 status
.bytes_written
+= bytes_written
;
514 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
517 "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
519 bytes_written
as f64 / 1_000_000.0,
520 (bytes_written
as f64) / (1_000_000.0 * elapsed
),
523 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
525 // register chunks in media_catalog
526 self.catalog_set
.lock().unwrap().register_chunk_archive(
533 if leom
|| request_sync
{
537 Ok((leom
, bytes_written
))
540 pub fn spawn_chunk_reader_thread(
542 datastore
: Arc
<DataStore
>,
543 snapshot_reader
: Arc
<Mutex
<SnapshotReader
>>,
544 ) -> Result
<(std
::thread
::JoinHandle
<()>, NewChunksIterator
), Error
> {
545 NewChunksIterator
::spawn(datastore
, snapshot_reader
, Arc
::clone(&self.catalog_set
))
548 pub(crate) fn catalog_version(&self) -> [u8; 8] {
550 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1
552 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
557 /// write up to <max_size> of chunks
558 fn write_chunk_archive
<'a
>(
559 _worker
: &WorkerTask
,
560 writer
: Box
<dyn 'a
+ TapeWrite
>,
561 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
564 ) -> Result
<(Vec
<[u8; 32]>, Uuid
, bool
, usize), Error
> {
565 let (mut writer
, content_uuid
) = ChunkArchiveWriter
::new(writer
, store
, true)?
;
567 // we want to get the chunk list in correct order
568 let mut chunk_list
: Vec
<[u8; 32]> = Vec
::new();
570 let mut leom
= false;
573 let (digest
, blob
) = match chunk_iter
.peek() {
575 Some(Ok((digest
, blob
))) => (digest
, blob
),
576 Some(Err(err
)) => bail
!("{}", err
),
579 //println!("CHUNK {} size {}", hex::encode(digest), blob.raw_size());
581 match writer
.try_write_chunk(digest
, blob
) {
583 chunk_list
.push(*digest
);
584 chunk_iter
.next(); // consume
587 // Note: we do not consume the chunk (no chunk_iter.next())
591 Err(err
) => bail
!("write chunk failed - {}", err
),
594 if writer
.bytes_written() > max_size
{
595 //task_log!(worker, "Chunk Archive max size reached, closing archive");
602 Ok((chunk_list
, content_uuid
, leom
, writer
.bytes_written()))
605 // Compare the media set label. If the media is empty, or the existing
606 // set label does not match the expected media set, overwrite the
608 fn update_media_set_label(
610 drive
: &mut dyn TapeDriver
,
611 old_set
: Option
<MediaSetLabel
>,
613 ) -> Result
<(MediaCatalog
, bool
), Error
> {
616 let new_set
= match media_id
.media_set_label
{
617 None
=> bail
!("got media without media set - internal error"),
618 Some(ref set
) => set
,
621 let key_config
= if let Some(ref fingerprint
) = new_set
.encryption_key_fingerprint
{
622 let (config_map
, _digest
) = load_key_configs()?
;
623 match config_map
.get(fingerprint
) {
624 Some(key_config
) => Some(key_config
.clone()),
627 "unable to find tape encryption key config '{}'",
636 let status_path
= Path
::new(TAPE_STATUS_DIR
);
638 let new_media
= match old_set
{
640 task_log
!(worker
, "writing new media set label");
641 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
642 media_catalog
= MediaCatalog
::overwrite(status_path
, media_id
, false)?
;
645 Some(media_set_label
) => {
646 if new_set
.uuid
== media_set_label
.uuid
{
647 if new_set
.seq_nr
!= media_set_label
.seq_nr
{
649 "got media with wrong media sequence number ({} != {}",
651 media_set_label
.seq_nr
654 if new_set
.encryption_key_fingerprint
!= media_set_label
.encryption_key_fingerprint
656 bail
!("detected changed encryption fingerprint - internal error");
658 media_catalog
= MediaCatalog
::open(status_path
, media_id
, true, false)?
;
660 // todo: verify last content/media_catalog somehow?
666 "writing new media set label (overwrite '{}/{}')",
667 media_set_label
.uuid
.to_string(),
668 media_set_label
.seq_nr
,
671 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
672 media_catalog
= MediaCatalog
::overwrite(status_path
, media_id
, false)?
;
678 Ok((media_catalog
, new_media
))