]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/tape/pool_writer/mod.rs
fix #5229: tape: remove max sequence number limit
[proxmox-backup.git] / src / tape / pool_writer / mod.rs
index 8042de9e22fc78a5d7dd3353d6301115e9c95607..a6ba4a1d3828a09714b0816c382311221e323361 100644 (file)
@@ -4,46 +4,37 @@ pub use catalog_set::*;
 mod new_chunks_iterator;
 pub use new_chunks_iterator::*;
 
-use std::path::Path;
+use std::collections::HashSet;
 use std::fs::File;
-use std::time::SystemTime;
+use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
+use std::time::SystemTime;
 
 use anyhow::{bail, Error};
 
-use proxmox::tools::Uuid;
+use proxmox_sys::{task_log, task_warn};
+use proxmox_uuid::Uuid;
 
-use pbs_datastore::task_log;
-use pbs_config::tape_encryption_keys::load_key_configs;
-use pbs_tape::{
-    TapeWrite,
-    sg_tape::tape_alert_flags_critical,
-};
+use pbs_datastore::{DataStore, SnapshotReader};
+use pbs_tape::{sg_tape::tape_alert_flags_critical, TapeWrite};
+use proxmox_rest_server::WorkerTask;
 
-use crate::{
-    backup::{DataStore, SnapshotReader},
-    server::WorkerTask,
-    tape::{
-        TAPE_STATUS_DIR,
-        MAX_CHUNK_ARCHIVE_SIZE,
-        COMMIT_BLOCK_SIZE,
-        MediaPool,
-        MediaId,
-        MediaCatalog,
-        file_formats::{
-            MediaSetLabel,
-            ChunkArchiveWriter,
-            tape_write_snapshot_archive,
-            tape_write_catalog,
-        },
-        drive::{
-            TapeDriver,
-            request_and_load_media,
-            media_changer,
-        },
+use crate::tape::{
+    drive::{media_changer, request_and_load_media, TapeDriver},
+    encryption_keys::load_key_configs,
+    file_formats::{
+        tape_write_catalog, tape_write_snapshot_archive, ChunkArchiveWriter, MediaSetLabel,
     },
+    MediaCatalog, MediaId, MediaPool, COMMIT_BLOCK_SIZE, MAX_CHUNK_ARCHIVE_SIZE, TAPE_STATUS_DIR,
 };
 
+use super::file_formats::{
+    PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
+};
+
+// Warn when the sequence number reaches this limit, as large
+// media sets are error prone and take a very long time to restore from.
+const MEDIA_SET_SEQ_NR_WARN_LIMIT: u64 = 20;
 
 struct PoolWriterState {
     drive: Box<dyn TapeDriver>,
@@ -62,27 +53,24 @@ pub struct PoolWriter {
     status: Option<PoolWriterState>,
     catalog_set: Arc<Mutex<CatalogSet>>,
     notify_email: Option<String>,
+    ns_magic: bool,
+    used_tapes: HashSet<Uuid>,
 }
 
 impl PoolWriter {
-
     pub fn new(
         mut pool: MediaPool,
         drive_name: &str,
         worker: &WorkerTask,
         notify_email: Option<String>,
         force_media_set: bool,
+        ns_magic: bool,
     ) -> Result<Self, Error> {
-
-        let current_time = proxmox::tools::time::epoch_i64();
+        let current_time = proxmox_time::epoch_i64();
 
         let new_media_set_reason = pool.start_write_session(current_time, force_media_set)?;
         if let Some(reason) = new_media_set_reason {
-            task_log!(
-                worker,
-                "starting new media set - reason: {}",
-                reason,
-            );
+            task_log!(worker, "starting new media set - reason: {}", reason,);
         }
 
         let media_set_uuid = pool.current_media_set().uuid();
@@ -93,12 +81,7 @@ impl PoolWriter {
         // load all catalogs read-only at start
         for media_uuid in pool.current_media_list()? {
             let media_info = pool.lookup_media(media_uuid).unwrap();
-            let media_catalog = MediaCatalog::open(
-                Path::new(TAPE_STATUS_DIR),
-                media_info.id(),
-                false,
-                false,
-            )?;
+            let media_catalog = MediaCatalog::open(TAPE_STATUS_DIR, media_info.id(), false, false)?;
             catalog_set.append_read_only_catalog(media_catalog)?;
         }
 
@@ -108,7 +91,9 @@ impl PoolWriter {
             status: None,
             catalog_set: Arc::new(Mutex::new(catalog_set)),
             notify_email,
-         })
+            ns_magic,
+            used_tapes: HashSet::new(),
+        })
     }
 
     pub fn pool(&mut self) -> &mut MediaPool {
@@ -117,12 +102,30 @@ impl PoolWriter {
 
     /// Set media status to FULL (persistent - stores pool status)
     pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
-        self.pool.set_media_status_full(&uuid)?;
+        self.pool.set_media_status_full(uuid)?;
         Ok(())
     }
 
-    pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool {
-        self.catalog_set.lock().unwrap().contains_snapshot(store, snapshot)
+    pub fn get_used_media_labels(&self) -> Result<Vec<String>, Error> {
+        let mut res = Vec::with_capacity(self.used_tapes.len());
+        for media_uuid in &self.used_tapes {
+            let media_info = self.pool.lookup_media(media_uuid)?;
+            res.push(media_info.label_text().to_string());
+        }
+
+        Ok(res)
+    }
+
+    pub fn contains_snapshot(
+        &self,
+        store: &str,
+        ns: &pbs_api_types::BackupNamespace,
+        snapshot: &pbs_api_types::BackupDir,
+    ) -> bool {
+        self.catalog_set
+            .lock()
+            .unwrap()
+            .contains_snapshot(store, ns, snapshot)
     }
 
     /// Eject media and drop PoolWriterState (close drive)
@@ -135,13 +138,13 @@ impl PoolWriter {
         let (drive_config, _digest) = pbs_config::drive::config()?;
 
         if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
-            worker.log("eject media");
+            task_log!(worker, "eject media");
             status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
             drop(status); // close drive
-            worker.log("unload media");
+            task_log!(worker, "unload media");
             changer.unload_media(None)?; //eject and unload
         } else {
-            worker.log("standalone drive - ejecting media");
+            task_log!(worker, "standalone drive - ejecting media");
             status.drive.eject_media()?;
         }
 
@@ -155,28 +158,36 @@ impl PoolWriter {
         let (drive_config, _digest) = pbs_config::drive::config()?;
 
         if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
-
             if let Some(ref mut status) = status {
-                worker.log("eject media");
-                status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
+                task_log!(worker, "rewind media");
+                // rewind first so that the unload command later does not run into a timeout
+                status.drive.rewind()?;
             }
             drop(status); // close drive
 
-            worker.log("unload media");
-            changer.unload_media(None)?;
-
             for media_uuid in self.pool.current_media_list()? {
                 let media = self.pool.lookup_media(media_uuid)?;
                 let label_text = media.label_text();
                 if let Some(slot) = changer.export_media(label_text)? {
-                    worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
+                    task_log!(
+                        worker,
+                        "exported media '{}' to import/export slot {}",
+                        label_text,
+                        slot
+                    );
                 } else {
-                    worker.warn(format!("export failed - media '{}' is not online", label_text));
+                    task_warn!(
+                        worker,
+                        "export failed - media '{}' is not online or in different drive",
+                        label_text
+                    );
                 }
             }
-
         } else if let Some(mut status) = status {
-            worker.log("standalone drive - ejecting media instead of export");
+            task_log!(
+                worker,
+                "standalone drive - ejecting media instead of export"
+            );
             status.drive.eject_media()?;
         }
 
@@ -188,7 +199,7 @@ impl PoolWriter {
     /// This is done automatically during a backup session, but needs to
     /// be called explicitly before dropping the PoolWriter
     pub fn commit(&mut self) -> Result<(), Error> {
-         if let Some(PoolWriterState {ref mut drive, .. }) = self.status {
+        if let Some(PoolWriterState { ref mut drive, .. }) = self.status {
             drive.sync()?; // sync all data to the tape
         }
         self.catalog_set.lock().unwrap().commit()?; // then commit the catalog
@@ -198,11 +209,11 @@ impl PoolWriter {
     /// Load a writable media into the drive
     pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
         let last_media_uuid = match self.status {
-            Some(PoolWriterState { ref media_uuid, ..}) => Some(media_uuid.clone()),
+            Some(PoolWriterState { ref media_uuid, .. }) => Some(media_uuid.clone()),
             None => None,
         };
 
-        let current_time = proxmox::tools::time::epoch_i64();
+        let current_time = proxmox_time::epoch_i64();
         let media_uuid = self.pool.alloc_writable_media(current_time)?;
 
         let media = self.pool.lookup_media(&media_uuid).unwrap();
@@ -213,12 +224,17 @@ impl PoolWriter {
         };
 
         if !media_changed {
+            self.used_tapes.insert(media_uuid.clone());
             return Ok(media_uuid);
         }
 
-        task_log!(worker, "allocated new writable media '{}'", media.label_text());
+        task_log!(
+            worker,
+            "allocated new writable media '{}'",
+            media.label_text()
+        );
 
-        if let Some(PoolWriterState {mut drive, .. }) = self.status.take() {
+        if let Some(PoolWriterState { mut drive, .. }) = self.status.take() {
             if last_media_uuid.is_some() {
                 task_log!(worker, "eject current media");
                 drive.eject_media()?;
@@ -227,16 +243,24 @@ impl PoolWriter {
 
         let (drive_config, _digest) = pbs_config::drive::config()?;
 
-        let (mut drive, old_media_id) =
-            request_and_load_media(worker, &drive_config, &self.drive_name, media.label(), &self.notify_email)?;
+        let (mut drive, old_media_id) = request_and_load_media(
+            worker,
+            &drive_config,
+            &self.drive_name,
+            media.label(),
+            &self.notify_email,
+        )?;
 
         // test for critical tape alert flags
         if let Ok(alert_flags) = drive.tape_alert_flags() {
             if !alert_flags.is_empty() {
-                worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
+                task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
                 if tape_alert_flags_critical(alert_flags) {
                     self.pool.set_media_status_damaged(&media_uuid)?;
-                    bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
+                    bail!(
+                        "aborting due to critical tape alert flags: {:?}",
+                        alert_flags
+                    );
                 }
             }
         }
@@ -250,14 +274,17 @@ impl PoolWriter {
 
         self.catalog_set.lock().unwrap().append_catalog(catalog)?;
 
-        let media_set = media.media_set_label().clone().unwrap();
+        let media_set = media.media_set_label().unwrap();
 
-        let encrypt_fingerprint = media_set
-            .encryption_key_fingerprint
-            .clone()
-            .map(|fp| (fp, media_set.uuid.clone()));
+        if is_new_media && media_set.seq_nr >= MEDIA_SET_SEQ_NR_WARN_LIMIT {
+            task_warn!(
+                worker,
+                "large media-set detected ({}), consider using a different allocation policy",
+                media_set.seq_nr
+            );
+        }
 
-        drive.set_encryption(encrypt_fingerprint)?;
+        drive.assert_encryption_mode(media_set.encryption_key_fingerprint.is_some())?;
 
         self.status = Some(PoolWriterState {
             drive,
@@ -271,19 +298,16 @@ impl PoolWriter {
             self.append_media_set_catalogs(worker)?;
         }
 
+        self.used_tapes.insert(media_uuid.clone());
         Ok(media_uuid)
     }
 
     fn open_catalog_file(uuid: &Uuid) -> Result<File, Error> {
-
-        let status_path = Path::new(TAPE_STATUS_DIR);
-        let mut path = status_path.to_owned();
+        let mut path = PathBuf::from(TAPE_STATUS_DIR);
         path.push(uuid.to_string());
         path.set_extension("log");
 
-        let file = std::fs::OpenOptions::new()
-            .read(true)
-            .open(&path)?;
+        let file = std::fs::OpenOptions::new().read(true).open(&path)?;
 
         Ok(file)
     }
@@ -291,20 +315,20 @@ impl PoolWriter {
     // Check if tape is loaded, then move to EOM (if not already there)
     //
     // Returns the tape position at EOM.
-    fn prepare_tape_write(
-        status: &mut PoolWriterState,
-        worker: &WorkerTask,
-    ) -> Result<u64, Error> {
-
+    fn prepare_tape_write(status: &mut PoolWriterState, worker: &WorkerTask) -> Result<u64, Error> {
         if !status.at_eom {
-            worker.log(String::from("moving to end of media"));
+            task_log!(worker, "moving to end of media");
             status.drive.move_to_eom(true)?;
             status.at_eom = true;
+            task_log!(worker, "arrived at end of media");
         }
 
         let current_file_number = status.drive.current_file_number()?;
         if current_file_number < 2 {
-            bail!("got strange file position number from drive ({})", current_file_number);
+            bail!(
+                "got strange file position number from drive ({})",
+                current_file_number
+            );
         }
 
         Ok(current_file_number)
@@ -317,10 +341,8 @@ impl PoolWriter {
     /// on the media (return value 'Ok(false, _)'). In that case, the
     /// archive is marked incomplete. The caller should mark the media
     /// as full and try again using another media.
-    pub fn append_catalog_archive(
-        &mut self,
-        worker: &WorkerTask,
-    ) -> Result<bool, Error> {
+    pub fn append_catalog_archive(&mut self, worker: &WorkerTask) -> Result<bool, Error> {
+        let catalog_magic = self.catalog_version();
 
         let status = match self.status {
             Some(ref mut status) => status,
@@ -362,24 +384,24 @@ impl PoolWriter {
             media_set.uuid(),
             seq_nr,
             &mut file,
-        )?.is_some();
+            catalog_magic,
+        )?
+        .is_some();
 
         Ok(done)
     }
 
     // Append catalogs for all previous media in set (without last)
-    fn append_media_set_catalogs(
-        &mut self,
-        worker: &WorkerTask,
-    ) -> Result<(), Error> {
-
+    fn append_media_set_catalogs(&mut self, worker: &WorkerTask) -> Result<(), Error> {
         let media_set = self.pool.current_media_set();
 
-        let mut media_list = &media_set.media_list()[..];
+        let mut media_list = media_set.media_list();
         if media_list.len() < 2 {
             return Ok(());
         }
-        media_list = &media_list[..(media_list.len()-1)];
+        media_list = &media_list[..(media_list.len() - 1)];
+
+        let catalog_magic = self.catalog_version();
 
         let status = match self.status {
             Some(ref mut status) => status,
@@ -389,7 +411,6 @@ impl PoolWriter {
         Self::prepare_tape_write(status, worker)?;
 
         for (seq_nr, uuid) in media_list.iter().enumerate() {
-
             let uuid = match uuid {
                 None => bail!("got incomplete media list - internal error"),
                 Some(uuid) => uuid,
@@ -407,7 +428,10 @@ impl PoolWriter {
                 media_set.uuid(),
                 seq_nr,
                 &mut file,
-            )?.is_none() {
+                catalog_magic,
+            )?
+            .is_none()
+            {
                 bail!("got EOM while writing start catalog");
             }
         }
@@ -430,7 +454,6 @@ impl PoolWriter {
         worker: &WorkerTask,
         snapshot_reader: &SnapshotReader,
     ) -> Result<(bool, usize), Error> {
-
         let status = match self.status {
             Some(ref mut status) => status,
             None => bail!("PoolWriter - no media loaded"),
@@ -446,8 +469,9 @@ impl PoolWriter {
                     self.catalog_set.lock().unwrap().register_snapshot(
                         content_uuid,
                         current_file_number,
-                        &snapshot_reader.datastore_name().to_string(),
-                        &snapshot_reader.snapshot().to_string(),
+                        snapshot_reader.datastore_name(),
+                        snapshot_reader.snapshot().backup_ns(),
+                        snapshot_reader.snapshot().as_ref(),
                     )?;
                     (true, writer.bytes_written())
                 }
@@ -476,7 +500,6 @@ impl PoolWriter {
         chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
         store: &str,
     ) -> Result<(bool, usize), Error> {
-
         let status = match self.status {
             Some(ref mut status) => status,
             None => bail!("PoolWriter - no media loaded"),
@@ -488,29 +511,29 @@ impl PoolWriter {
 
         let start_time = SystemTime::now();
 
-        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
-            worker,
-            writer,
-            chunk_iter,
-            store,
-            MAX_CHUNK_ARCHIVE_SIZE,
-        )?;
+        let (saved_chunks, content_uuid, leom, bytes_written) =
+            write_chunk_archive(worker, writer, chunk_iter, store, MAX_CHUNK_ARCHIVE_SIZE)?;
 
         status.bytes_written += bytes_written;
 
-        let elapsed =  start_time.elapsed()?.as_secs_f64();
-        worker.log(format!(
+        let elapsed = start_time.elapsed()?.as_secs_f64();
+        task_log!(
+            worker,
             "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
             saved_chunks.len(),
-            bytes_written as f64 /1_000_000.0,
-            (bytes_written as f64)/(1_000_000.0*elapsed),
-        ));
+            bytes_written as f64 / 1_000_000.0,
+            (bytes_written as f64) / (1_000_000.0 * elapsed),
+        );
 
         let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
 
         // register chunks in media_catalog
-        self.catalog_set.lock().unwrap()
-            .register_chunk_archive(content_uuid, current_file_number, store, &saved_chunks)?;
+        self.catalog_set.lock().unwrap().register_chunk_archive(
+            content_uuid,
+            current_file_number,
+            store,
+            &saved_chunks,
+        )?;
 
         if leom || request_sync {
             self.commit()?;
@@ -524,27 +547,31 @@ impl PoolWriter {
         datastore: Arc<DataStore>,
         snapshot_reader: Arc<Mutex<SnapshotReader>>,
     ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
-        NewChunksIterator::spawn(
-            datastore,
-            snapshot_reader,
-            Arc::clone(&self.catalog_set),
-        )
+        NewChunksIterator::spawn(datastore, snapshot_reader, Arc::clone(&self.catalog_set))
+    }
+
+    pub(crate) fn catalog_version(&self) -> [u8; 8] {
+        if self.ns_magic {
+            PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1
+        } else {
+            PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
+        }
     }
 }
 
 /// write up to <max_size> of chunks
+#[allow(clippy::type_complexity)]
 fn write_chunk_archive<'a>(
     _worker: &WorkerTask,
     writer: Box<dyn 'a + TapeWrite>,
     chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
     store: &str,
     max_size: usize,
-) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
-
+) -> Result<(Vec<[u8; 32]>, Uuid, bool, usize), Error> {
     let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?;
 
     // we want to get the chunk list in correct order
-    let mut chunk_list: Vec<[u8;32]> = Vec::new();
+    let mut chunk_list: Vec<[u8; 32]> = Vec::new();
 
     let mut leom = false;
 
@@ -555,9 +582,9 @@ fn write_chunk_archive<'a>(
             Some(Err(err)) => bail!("{}", err),
         };
 
-        //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
+        //println!("CHUNK {} size {}", hex::encode(digest), blob.raw_size());
 
-        match writer.try_write_chunk(&digest, &blob) {
+        match writer.try_write_chunk(digest, blob) {
             Ok(true) => {
                 chunk_list.push(*digest);
                 chunk_iter.next(); // consume
@@ -571,7 +598,7 @@ fn write_chunk_archive<'a>(
         }
 
         if writer.bytes_written() > max_size {
-            //worker.log("Chunk Archive max size reached, closing archive".to_string());
+            //task_log!(worker, "Chunk Archive max size reached, closing archive");
             break;
         }
     }
@@ -590,7 +617,6 @@ fn update_media_set_label(
     old_set: Option<MediaSetLabel>,
     media_id: &MediaId,
 ) -> Result<(MediaCatalog, bool), Error> {
-
     let media_catalog;
 
     let new_set = match media_id.media_set_label {
@@ -603,44 +629,51 @@ fn update_media_set_label(
         match config_map.get(fingerprint) {
             Some(key_config) => Some(key_config.clone()),
             None => {
-                bail!("unable to find tape encryption key config '{}'", fingerprint);
+                bail!(
+                    "unable to find tape encryption key config '{}'",
+                    fingerprint
+                );
             }
         }
     } else {
         None
     };
 
-    let status_path = Path::new(TAPE_STATUS_DIR);
-
     let new_media = match old_set {
         None => {
-            worker.log("writing new media set label".to_string());
+            task_log!(worker, "writing new media set label");
             drive.write_media_set_label(new_set, key_config.as_ref())?;
-            media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
+            media_catalog = MediaCatalog::overwrite(TAPE_STATUS_DIR, media_id, false)?;
             true
         }
         Some(media_set_label) => {
             if new_set.uuid == media_set_label.uuid {
                 if new_set.seq_nr != media_set_label.seq_nr {
-                    bail!("got media with wrong media sequence number ({} != {}",
-                          new_set.seq_nr,media_set_label.seq_nr);
+                    bail!(
+                        "got media with wrong media sequence number ({} != {}",
+                        new_set.seq_nr,
+                        media_set_label.seq_nr
+                    );
                 }
-                if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
+                if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint
+                {
                     bail!("detected changed encryption fingerprint - internal error");
                 }
-                media_catalog = MediaCatalog::open(status_path, &media_id, true, false)?;
+                media_catalog = MediaCatalog::open(TAPE_STATUS_DIR, media_id, true, false)?;
 
                 // todo: verify last content/media_catalog somehow?
 
                 false
             } else {
-                worker.log(
-                    format!("writing new media set label (overwrite '{}/{}')",
-                            media_set_label.uuid.to_string(), media_set_label.seq_nr)
+                task_log!(
+                    worker,
+                    "writing new media set label (overwrite '{}/{}')",
+                    media_set_label.uuid.to_string(),
+                    media_set_label.seq_nr,
                 );
 
                 drive.write_media_set_label(new_set, key_config.as_ref())?;
-                media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
+                media_catalog = MediaCatalog::overwrite(TAPE_STATUS_DIR, media_id, false)?;
                 true
             }
         }