2 pub use catalog_set
::*;
4 mod new_chunks_iterator
;
5 pub use new_chunks_iterator
::*;
7 use std
::collections
::HashSet
;
9 use std
::path
::PathBuf
;
10 use std
::sync
::{Arc, Mutex}
;
11 use std
::time
::SystemTime
;
13 use anyhow
::{bail, Error}
;
15 use proxmox_sys
::{task_log, task_warn}
;
16 use proxmox_uuid
::Uuid
;
18 use pbs_datastore
::{DataStore, SnapshotReader}
;
19 use pbs_tape
::{sg_tape::tape_alert_flags_critical, TapeWrite}
;
20 use proxmox_rest_server
::WorkerTask
;
23 drive
::{media_changer, request_and_load_media, TapeDriver}
,
24 encryption_keys
::load_key_configs
,
26 tape_write_catalog
, tape_write_snapshot_archive
, ChunkArchiveWriter
, MediaSetLabel
,
28 MediaCatalog
, MediaId
, MediaPool
, COMMIT_BLOCK_SIZE
, MAX_CHUNK_ARCHIVE_SIZE
, TAPE_STATUS_DIR
,
31 use super::file_formats
::{
32 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1
,
// Warn when the sequence number reaches this limit, as large
// media sets are error prone and take a very long time to restore from.
const MEDIA_SET_SEQ_NR_WARN_LIMIT: u64 = 20;
// Per-session state for the currently loaded tape drive.
// NOTE(review): this extraction appears to have dropped the field lines that
// the three trailing comments describe (presumably `media_uuid`, `at_eom`,
// and `bytes_written`) — confirm against the original file.
39 struct PoolWriterState
{
// Open handle to the loaded tape drive; all writes in this session go here.
40 drive
: Box
<dyn TapeDriver
>,
41 // Media Uuid from loaded media
43 // tell if we already moved to EOM
45 // bytes written after the last tape flush/sync
49 /// Helper to manage a backup job, writing several tapes of a pool
// NOTE(review): field lines for the media pool and drive name appear to have
// been dropped by this extraction (original lines 51-52) — confirm upstream.
50 pub struct PoolWriter
{
// Drive state; `None` until a writable medium has been loaded.
53 status
: Option
<PoolWriterState
>,
// Shared set of media catalogs; `Arc<Mutex<..>>` because a chunk reader
// thread also registers entries (see `spawn_chunk_reader_thread`).
54 catalog_set
: Arc
<Mutex
<CatalogSet
>>,
// Optional address for job notification mails.
55 notify_email
: Option
<String
>,
// Uuids of all tapes written during this session (see `get_used_media_labels`).
57 used_tapes
: HashSet
<Uuid
>,
// Constructor fragment: starts a write session on the pool and preloads all
// catalogs of the current media set read-only.
// NOTE(review): the opening `pub fn new(` lines (with the pool / drive-name /
// worker parameters) and the final `Ok(Self { .. })` lines are missing from
// this extraction — confirm against the original file.
65 notify_email
: Option
<String
>,
66 force_media_set
: bool
,
68 ) -> Result
<Self, Error
> {
69 let current_time
= proxmox_time
::epoch_i64();
// May rotate to a new media set (e.g. allocation policy or forced).
71 let new_media_set_reason
= pool
.start_write_session(current_time
, force_media_set
)?
;
72 if let Some(reason
) = new_media_set_reason
{
73 task_log
!(worker
, "starting new media set - reason: {}", reason
,);
76 let media_set_uuid
= pool
.current_media_set().uuid();
77 task_log
!(worker
, "media set uuid: {}", media_set_uuid
);
79 let mut catalog_set
= CatalogSet
::new();
81 // load all catalogs read-only at start
82 for media_uuid
in pool
.current_media_list()?
{
83 let media_info
= pool
.lookup_media(media_uuid
).unwrap();
84 let media_catalog
= MediaCatalog
::open(TAPE_STATUS_DIR
, media_info
.id(), false, false)?
;
85 catalog_set
.append_read_only_catalog(media_catalog
)?
;
// Struct construction (surrounding `Ok(Self {` lines missing from this view).
90 drive_name
: drive_name
.to_string(),
92 catalog_set
: Arc
::new(Mutex
::new(catalog_set
)),
95 used_tapes
: HashSet
::new(),
// Mutable accessor for the underlying media pool.
// NOTE(review): the body line (presumably `&mut self.pool`) and closing brace
// are missing from this extraction.
99 pub fn pool(&mut self) -> &mut MediaPool
{
103 /// Set media status to FULL (persistent - stores pool status)
// Delegates to the pool; NOTE(review): trailing `Ok(())` and closing brace
// are missing from this extraction.
104 pub fn set_media_status_full(&mut self, uuid
: &Uuid
) -> Result
<(), Error
> {
105 self.pool
.set_media_status_full(uuid
)?
;
// Returns the label texts of all tapes written during this session,
// resolved from `self.used_tapes` via the pool.
// NOTE(review): final `Ok(res)` and closing brace missing from this extraction.
109 pub fn get_used_media_labels(&self) -> Result
<Vec
<String
>, Error
> {
110 let mut res
= Vec
::with_capacity(self.used_tapes
.len());
111 for media_uuid
in &self.used_tapes
{
112 let media_info
= self.pool
.lookup_media(media_uuid
)?
;
113 res
.push(media_info
.label_text().to_string());
// Checks whether the catalog set already contains the given snapshot.
// NOTE(review): the `&self` / `store` parameter lines and the
// `self.catalog_set.lock().unwrap()` receiver lines appear to be missing
// from this extraction.
119 pub fn contains_snapshot(
122 ns
: &pbs_api_types
::BackupNamespace
,
123 snapshot
: &pbs_api_types
::BackupDir
,
128 .contains_snapshot(store
, ns
, snapshot
)
131 /// Eject media and drop PoolWriterState (close drive)
// With a changer: eject from the drive first (faster unload), close the
// drive, then unload the tape back to its slot. Without a changer
// (standalone drive) only eject. NOTE(review): the `} else {` line and the
// trailing `Ok(())` / closing braces are missing from this extraction.
132 pub fn eject_media(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
133 let mut status
= match self.status
.take() {
134 Some(status
) => status
,
135 None
=> return Ok(()), // no media loaded
138 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
140 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
141 task_log
!(worker
, "eject media");
142 status
.drive
.eject_media()?
; // rewind and eject early, so that unload_media is faster
143 drop(status
); // close drive
144 task_log
!(worker
, "unload media");
145 changer
.unload_media(None
)?
; //eject and unload
// Standalone-drive branch (its `} else {` line is not visible here).
147 task_log
!(worker
, "standalone drive - ejecting media");
148 status
.drive
.eject_media()?
;
154 /// Export current media set and drop PoolWriterState (close drive)
// With a changer: rewind (avoids unload timeout), close the drive, then move
// every medium of the current set to an import/export slot, logging failures.
// Standalone drives can only eject. NOTE(review): several task_log argument
// lines, `else` arms and closing braces are missing from this extraction.
155 pub fn export_media_set(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
156 let mut status
= self.status
.take();
158 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
160 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
161 if let Some(ref mut status
) = status
{
162 task_log
!(worker
, "rewind media");
163 // rewind first so that the unload command later does not run into a timeout
164 status
.drive
.rewind()?
;
166 drop(status
); // close drive
168 for media_uuid
in self.pool
.current_media_list()?
{
169 let media
= self.pool
.lookup_media(media_uuid
)?
;
170 let label_text
= media
.label_text();
171 if let Some(slot
) = changer
.export_media(label_text
)?
{
174 "exported media '{}' to import/export slot {}",
// Export can fail when the tape is offline or loaded in another drive.
181 "export failed - media '{}' is not online or in different drive",
186 } else if let Some(mut status
) = status
{
189 "standalone drive - ejecting media instead of export"
191 status
.drive
.eject_media()?
;
197 /// commit changes to tape and catalog
199 /// This is done automatically during a backup session, but needs to
200 /// be called explicitly before dropping the PoolWriter
// Order matters: tape data is synced before the catalog is committed, so the
// catalog never claims data that is not yet on tape.
// NOTE(review): trailing `Ok(())` and closing brace missing from this extraction.
201 pub fn commit(&mut self) -> Result
<(), Error
> {
202 if let Some(PoolWriterState { ref mut drive, .. }
) = self.status
{
203 drive
.sync()?
; // sync all data to the tape
205 self.catalog_set
.lock().unwrap().commit()?
; // then commit the catalog
209 /// Load a writable media into the drive
// Allocates a writable medium from the pool; if it is already loaded this is
// a no-op (early return). Otherwise ejects the old tape, loads the new one,
// checks tape-alert flags, updates/writes the media set label, registers the
// catalog, and stores the new drive state.
// NOTE(review): many lines (match arms, task_log/bail argument lists, the
// `request_and_load_media` arguments, the final `Ok(media_uuid)` and closing
// braces) are missing from this extraction — confirm against the original.
210 pub fn load_writable_media(&mut self, worker
: &WorkerTask
) -> Result
<Uuid
, Error
> {
211 let last_media_uuid
= match self.status
{
212 Some(PoolWriterState { ref media_uuid, .. }
) => Some(media_uuid
.clone()),
216 let current_time
= proxmox_time
::epoch_i64();
217 let media_uuid
= self.pool
.alloc_writable_media(current_time
)?
;
219 let media
= self.pool
.lookup_media(&media_uuid
).unwrap();
221 let media_changed
= match last_media_uuid
{
222 Some(ref last_media_uuid
) => last_media_uuid
!= &media_uuid
,
// Fast path: requested medium is already loaded.
227 self.used_tapes
.insert(media_uuid
.clone());
228 return Ok(media_uuid
);
233 "allocated new writable media '{}'",
// Eject the previously loaded tape before requesting the new one.
237 if let Some(PoolWriterState { mut drive, .. }
) = self.status
.take() {
238 if last_media_uuid
.is_some() {
239 task_log
!(worker
, "eject current media");
240 drive
.eject_media()?
;
244 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
246 let (mut drive
, old_media_id
) = request_and_load_media(
254 // test for critical tape alert flags
255 if let Ok(alert_flags
) = drive
.tape_alert_flags() {
256 if !alert_flags
.is_empty() {
257 task_log
!(worker
, "TapeAlertFlags: {:?}", alert_flags
);
258 if tape_alert_flags_critical(alert_flags
) {
// Critical alert: persistently mark the medium damaged, then abort.
259 self.pool
.set_media_status_damaged(&media_uuid
)?
;
261 "aborting due to critical tape alert flags: {:?}",
// Write or validate the media set label; returns the catalog to register.
268 let (catalog
, is_new_media
) = update_media_set_label(
271 old_media_id
.media_set_label
,
275 self.catalog_set
.lock().unwrap().append_catalog(catalog
)?
;
277 let media_set
= media
.media_set_label().unwrap();
279 if is_new_media
&& media_set
.seq_nr
>= MEDIA_SET_SEQ_NR_WARN_LIMIT
{
282 "large media-set detected ({}), consider using a different allocation policy",
// Ensure drive encryption mode matches the media set's key fingerprint.
287 drive
.assert_encryption_mode(media_set
.encryption_key_fingerprint
.is_some())?
;
289 self.status
= Some(PoolWriterState
{
291 media_uuid
: media_uuid
.clone(),
// On brand-new media, replay catalogs of the previous set members first.
297 // add catalogs from previous media
298 self.append_media_set_catalogs(worker
)?
;
301 self.used_tapes
.insert(media_uuid
.clone());
// Opens the on-disk catalog file `<TAPE_STATUS_DIR>/<uuid>.log` read-only.
// NOTE(review): the trailing `Ok(file)` and closing brace are missing from
// this extraction; the `File` import (std::fs::File) is also not visible.
305 fn open_catalog_file(uuid
: &Uuid
) -> Result
<File
, Error
> {
306 let mut path
= PathBuf
::from(TAPE_STATUS_DIR
);
307 path
.push(uuid
.to_string());
308 path
.set_extension("log");
310 let file
= std
::fs
::OpenOptions
::new().read(true).open(&path
)?
;
315 // Check if tape is loaded, then move to EOM (if not already there)
317 // Returns the tape position at EOM.
// NOTE(review): the guard around the move (presumably `if !status.at_eom {`)
// and the bail! argument lines for the strange-position error are missing
// from this extraction.
318 fn prepare_tape_write(status
: &mut PoolWriterState
, worker
: &WorkerTask
) -> Result
<u64, Error
> {
320 task_log
!(worker
, "moving to end of media");
321 status
.drive
.move_to_eom(true)?
;
// Remember EOM so subsequent writes skip the (slow) seek.
322 status
.at_eom
= true;
323 task_log
!(worker
, "arrived at end of media");
326 let current_file_number
= status
.drive
.current_file_number()?
;
// File number < 2 at EOM is implausible (label + media-set label come first).
327 if current_file_number
< 2 {
329 "got strange file position number from drive ({})",
334 Ok(current_file_number
)
337 /// Move to EOM (if not already there), then write the current
338 /// catalog to the tape. On success, this return 'Ok(true)'.
340 /// Please note that this may fail when there is not enough space
341 /// on the media (return value 'Ok(false, _)'). In that case, the
342 /// archive is marked incomplete. The caller should mark the media
343 /// as full and try again using another media.
// NOTE(review): the tape_write_catalog argument lines, the final return and
// closing braces are missing from this extraction.
344 pub fn append_catalog_archive(&mut self, worker
: &WorkerTask
) -> Result
<bool
, Error
> {
345 let catalog_magic
= self.catalog_version();
346 let status
347 let status
// (line above retained verbatim if present in original; see extraction note)
347 let status
= match self.status
{
348 Some(ref mut status
) => status
,
349 None
=> bail
!("PoolWriter - no media loaded"),
352 Self::prepare_tape_write(status
, worker
)?
;
354 let catalog_set
= self.catalog_set
.lock().unwrap();
// The "current" catalog must exist; it always belongs to the last medium.
356 let catalog
= match catalog_set
.catalog
{
357 None
=> bail
!("append_catalog_archive failed: no catalog - internal error"),
358 Some(ref catalog
) => catalog
,
361 let media_set
= self.pool
.current_media_set();
363 let media_list
= media_set
.media_list();
// Sanity checks: the catalog on disk must match the last medium of the set.
364 let uuid
= match media_list
.last() {
365 None
=> bail
!("got empty media list - internal error"),
366 Some(None
) => bail
!("got incomplete media list - internal error"),
367 Some(Some(last_uuid
)) => {
368 if last_uuid
!= catalog
.uuid() {
369 bail
!("got wrong media - internal error");
375 let seq_nr
= media_list
.len() - 1;
377 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
379 let mut file
= Self::open_catalog_file(uuid
)?
;
381 let done
= tape_write_catalog(
394 // Append catalogs for all previous media in set (without last)
// Writes a catalog archive per previous medium of the current set to the
// loaded tape. An incomplete write (EOM) is a hard error here, because this
// runs at the start of a fresh tape. NOTE(review): the early-return when the
// list has < 2 entries, the match arm extracting the uuid, tape_write_catalog
// arguments and closing braces are missing from this extraction.
395 fn append_media_set_catalogs(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
396 let media_set
= self.pool
.current_media_set();
398 let mut media_list
= media_set
.media_list();
399 if media_list
.len() < 2 {
// Drop the last entry — that is the currently loaded medium itself.
402 media_list
= &media_list
[..(media_list
.len() - 1)];
404 let catalog_magic
= self.catalog_version();
406 let status
= match self.status
{
407 Some(ref mut status
) => status
,
408 None
=> bail
!("PoolWriter - no media loaded"),
411 Self::prepare_tape_write(status
, worker
)?
;
413 for (seq_nr
, uuid
) in media_list
.iter().enumerate() {
414 let uuid
= match uuid
{
415 None
=> bail
!("got incomplete media list - internal error"),
419 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
421 let mut file
= Self::open_catalog_file(uuid
)?
;
423 task_log
!(worker
, "write catalog for previous media: {}", uuid
);
425 if tape_write_catalog(
// Running out of tape while writing start catalogs is not recoverable.
435 bail
!("got EOM while writing start catalog");
442 /// Move to EOM (if not already there), then creates a new snapshot
443 /// archive writing specified files (as .pxar) into it. On
444 /// success, this return 'Ok(true)' and the media catalog gets
447 /// Please note that this may fail when there is not enough space
448 /// on the media (return value 'Ok(false, _)'). In that case, the
449 /// archive is marked incomplete, and we do not use it. The caller
450 /// should mark the media as full and try again using another
// NOTE(review): the `&mut self` / `worker` parameter lines, the
// register_snapshot argument lines, the commit call inside the
// `!done || request_sync` branch and closing braces are missing from this
// extraction.
452 pub fn append_snapshot_archive(
455 snapshot_reader
: &SnapshotReader
,
456 ) -> Result
<(bool
, usize), Error
> {
457 let status
= match self.status
{
458 Some(ref mut status
) => status
,
459 None
=> bail
!("PoolWriter - no media loaded"),
462 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
464 let (done
, bytes_written
) = {
465 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
// Some(uuid) => archive completed; None => ran out of space (incomplete).
467 match tape_write_snapshot_archive(writer
.as_mut(), snapshot_reader
)?
{
468 Some(content_uuid
) => {
// Only a completed archive is registered in the catalog.
469 self.catalog_set
.lock().unwrap().register_snapshot(
472 snapshot_reader
.datastore_name(),
473 snapshot_reader
.snapshot().backup_ns(),
474 snapshot_reader
.snapshot().as_ref(),
476 (true, writer
.bytes_written())
478 None
=> (false, writer
.bytes_written()),
482 status
.bytes_written
+= bytes_written
;
// Sync/commit once enough data accumulated since the last flush.
484 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
486 if !done
|| request_sync
{
490 Ok((done
, bytes_written
))
493 /// Move to EOM (if not already there), then creates a new chunk
494 /// archive and writes chunks from 'chunk_iter'. This stops when
495 /// it detects LEOM or when we reach max archive size
496 /// (4GB). Written chunks are registered in the media catalog.
// NOTE(review): the `&mut self` / `worker` / `store` parameter lines, the
// task_log elapsed-stats argument lines, the register_chunk_archive argument
// lines, the commit branch and closing braces are missing from this
// extraction.
497 pub fn append_chunk_archive(
500 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
502 ) -> Result
<(bool
, usize), Error
> {
503 let status
= match self.status
{
504 Some(ref mut status
) => status
,
505 None
=> bail
!("PoolWriter - no media loaded"),
508 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
510 let writer
= status
.drive
.write_file()?
;
// Wall-clock timing only used for the throughput log line below.
512 let start_time
= SystemTime
::now();
514 let (saved_chunks
, content_uuid
, leom
, bytes_written
) =
515 write_chunk_archive(worker
, writer
, chunk_iter
, store
, MAX_CHUNK_ARCHIVE_SIZE
)?
;
517 status
.bytes_written
+= bytes_written
;
519 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
522 "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
524 bytes_written
as f64 / 1_000_000.0,
525 (bytes_written
as f64) / (1_000_000.0 * elapsed
),
528 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
530 // register chunks in media_catalog
531 self.catalog_set
.lock().unwrap().register_chunk_archive(
538 if leom
|| request_sync
{
542 Ok((leom
, bytes_written
))
// Spawns the background thread that reads new chunks from the snapshot,
// skipping chunks already present in the shared catalog set; returns the
// thread handle plus the iterator consumed by `append_chunk_archive`.
// NOTE(review): the `&self` parameter line and the closing brace are missing
// from this extraction.
545 pub fn spawn_chunk_reader_thread(
547 datastore
: Arc
<DataStore
>,
548 snapshot_reader
: Arc
<Mutex
<SnapshotReader
>>,
549 ) -> Result
<(std
::thread
::JoinHandle
<()>, NewChunksIterator
), Error
> {
550 NewChunksIterator
::spawn(datastore
, snapshot_reader
, Arc
::clone(&self.catalog_set
))
// Selects the catalog archive magic to write.
// NOTE(review): the conditional choosing between the 1.1 and 1.0 magic (and
// the closing braces) is missing from this extraction — confirm the
// selection criterion against the original file.
553 pub(crate) fn catalog_version(&self) -> [u8; 8] {
555 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1
557 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
562 /// write up to <max_size> of chunks
// Free function driving ChunkArchiveWriter: peeks chunks from the iterator,
// writes each, and only consumes a chunk after a successful write so an
// unconsumed chunk can be retried on the next tape. Stops on LEOM, iterator
// end, or when `max_size` is exceeded.
// NOTE(review): the `store` / `max_size` parameter lines, the loop header,
// the end-of-stream match arm, the Ok/LEOM match arms of try_write_chunk and
// the loop-break lines are missing from this extraction.
563 #[allow(clippy::type_complexity)]
564 fn write_chunk_archive
<'a
>(
565 _worker
: &WorkerTask
,
566 writer
: Box
<dyn 'a
+ TapeWrite
>,
567 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
570 ) -> Result
<(Vec
<[u8; 32]>, Uuid
, bool
, usize), Error
> {
571 let (mut writer
, content_uuid
) = ChunkArchiveWriter
::new(writer
, store
, true)?
;
573 // we want to get the chunk list in correct order
574 let mut chunk_list
: Vec
<[u8; 32]> = Vec
::new();
576 let mut leom
= false;
// Peek so the chunk stays in the iterator if the write does not complete.
579 let (digest
, blob
) = match chunk_iter
.peek() {
581 Some(Ok((digest
, blob
))) => (digest
, blob
),
582 Some(Err(err
)) => bail
!("{}", err
),
585 //println!("CHUNK {} size {}", hex::encode(digest), blob.raw_size());
587 match writer
.try_write_chunk(digest
, blob
) {
589 chunk_list
.push(*digest
);
590 chunk_iter
.next(); // consume
593 // Note; we do not consume the chunk (no chunk_iter.next())
597 Err(err
) => bail
!("write chunk failed - {}", err
),
600 if writer
.bytes_written() > max_size
{
601 //task_log!(worker, "Chunk Archive max size reached, closing archive");
608 Ok((chunk_list
, content_uuid
, leom
, writer
.bytes_written()))
611 // Compare the media set label. If the media is empty, or the existing
612 // set label does not match the expected media set, overwrite the
// Returns the (possibly freshly overwritten) media catalog plus a flag
// telling whether the tape counts as new media for this set.
// NOTE(review): the `worker` / `media_id` parameter lines, the `None =>`
// new-media arm opener, the `let media_catalog;` declaration, several bail!
// argument lines, the else-branch opener for a different media set, and the
// closing braces are missing from this extraction.
614 fn update_media_set_label(
616 drive
: &mut dyn TapeDriver
,
617 old_set
: Option
<MediaSetLabel
>,
619 ) -> Result
<(MediaCatalog
, bool
), Error
> {
622 let new_set
= match media_id
.media_set_label
{
623 None
=> bail
!("got media without media set - internal error"),
624 Some(ref set
) => set
,
// Resolve the encryption key config matching the set's key fingerprint.
627 let key_config
= if let Some(ref fingerprint
) = new_set
.encryption_key_fingerprint
{
628 let (config_map
, _digest
) = load_key_configs()?
;
629 match config_map
.get(fingerprint
) {
630 Some(key_config
) => Some(key_config
.clone()),
633 "unable to find tape encryption key config '{}'",
642 let new_media
= match old_set
{
// Empty tape: write a fresh media set label and start a new catalog.
644 task_log
!(worker
, "writing new media set label");
645 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
646 media_catalog
= MediaCatalog
::overwrite(TAPE_STATUS_DIR
, media_id
, false)?
;
649 Some(media_set_label
) => {
650 if new_set
.uuid
== media_set_label
.uuid
{
// Same media set: label must match exactly; reuse the existing catalog.
651 if new_set
.seq_nr
!= media_set_label
.seq_nr
{
653 "got media with wrong media sequence number ({} != {}",
655 media_set_label
.seq_nr
658 if new_set
.encryption_key_fingerprint
!= media_set_label
.encryption_key_fingerprint
660 bail
!("detected changed encryption fingerprint - internal error");
662 media_catalog
= MediaCatalog
::open(TAPE_STATUS_DIR
, media_id
, true, false)?
;
664 // todo: verify last content/media_catalog somehow?
// Different media set: overwrite the old label and start a new catalog.
670 "writing new media set label (overwrite '{}/{}')",
671 media_set_label
.uuid
.to_string(),
672 media_set_label
.seq_nr
,
675 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
676 media_catalog
= MediaCatalog
::overwrite(TAPE_STATUS_DIR
, media_id
, false)?
;
682 Ok((media_catalog
, new_media
))