mod new_chunks_iterator;
pub use new_chunks_iterator::*;
use std::collections::HashSet;
use std::fs::File;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, Error};

use proxmox_sys::{task_log, task_warn};
use proxmox_uuid::Uuid;

use pbs_datastore::{DataStore, SnapshotReader};
use pbs_tape::{sg_tape::tape_alert_flags_critical, TapeWrite};
use proxmox_rest_server::WorkerTask;

use crate::tape::{
    drive::{media_changer, request_and_load_media, TapeDriver},
    encryption_keys::load_key_configs,
    file_formats::{
        tape_write_catalog, tape_write_snapshot_archive, ChunkArchiveWriter, MediaSetLabel,
    },
    MediaCatalog, MediaId, MediaPool, COMMIT_BLOCK_SIZE, MAX_CHUNK_ARCHIVE_SIZE, TAPE_STATUS_DIR,
};

use super::file_formats::{
    PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
};
+
// Warn when the sequence number reaches this limit, as large
// media sets are error prone and take a very long time to restore from.
const MEDIA_SET_SEQ_NR_WARN_LIMIT: u64 = 20;
struct PoolWriterState {
drive: Box<dyn TapeDriver>,
status: Option<PoolWriterState>,
catalog_set: Arc<Mutex<CatalogSet>>,
notify_email: Option<String>,
+ ns_magic: bool,
+ used_tapes: HashSet<Uuid>,
}
impl PoolWriter {
-
pub fn new(
mut pool: MediaPool,
drive_name: &str,
worker: &WorkerTask,
notify_email: Option<String>,
force_media_set: bool,
+ ns_magic: bool,
) -> Result<Self, Error> {
-
- let current_time = proxmox::tools::time::epoch_i64();
+ let current_time = proxmox_time::epoch_i64();
let new_media_set_reason = pool.start_write_session(current_time, force_media_set)?;
if let Some(reason) = new_media_set_reason {
- task_log!(
- worker,
- "starting new media set - reason: {}",
- reason,
- );
+ task_log!(worker, "starting new media set - reason: {}", reason,);
}
let media_set_uuid = pool.current_media_set().uuid();
// load all catalogs read-only at start
for media_uuid in pool.current_media_list()? {
let media_info = pool.lookup_media(media_uuid).unwrap();
- let media_catalog = MediaCatalog::open(
- Path::new(TAPE_STATUS_DIR),
- media_info.id(),
- false,
- false,
- )?;
+ let media_catalog = MediaCatalog::open(TAPE_STATUS_DIR, media_info.id(), false, false)?;
catalog_set.append_read_only_catalog(media_catalog)?;
}
status: None,
catalog_set: Arc::new(Mutex::new(catalog_set)),
notify_email,
- })
+ ns_magic,
+ used_tapes: HashSet::new(),
+ })
}
pub fn pool(&mut self) -> &mut MediaPool {
/// Set media status to FULL (persistent - stores pool status)
///
/// Delegates to the underlying [`MediaPool`]; the status change is
/// persisted by the pool. Returns an error if the pool update fails.
pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
    // `uuid` is already a reference, no need to re-borrow it
    self.pool.set_media_status_full(uuid)?;
    Ok(())
}
- pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool {
- self.catalog_set.lock().unwrap().contains_snapshot(store, snapshot)
+ pub fn get_used_media_labels(&self) -> Result<Vec<String>, Error> {
+ let mut res = Vec::with_capacity(self.used_tapes.len());
+ for media_uuid in &self.used_tapes {
+ let media_info = self.pool.lookup_media(media_uuid)?;
+ res.push(media_info.label_text().to_string());
+ }
+
+ Ok(res)
+ }
+
+ pub fn contains_snapshot(
+ &self,
+ store: &str,
+ ns: &pbs_api_types::BackupNamespace,
+ snapshot: &pbs_api_types::BackupDir,
+ ) -> bool {
+ self.catalog_set
+ .lock()
+ .unwrap()
+ .contains_snapshot(store, ns, snapshot)
}
/// Eject media and drop PoolWriterState (close drive)
let (drive_config, _digest) = pbs_config::drive::config()?;
if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
- worker.log("eject media");
+ task_log!(worker, "eject media");
status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
drop(status); // close drive
- worker.log("unload media");
+ task_log!(worker, "unload media");
changer.unload_media(None)?; //eject and unload
} else {
- worker.log("standalone drive - ejecting media");
+ task_log!(worker, "standalone drive - ejecting media");
status.drive.eject_media()?;
}
let (drive_config, _digest) = pbs_config::drive::config()?;
if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
-
if let Some(ref mut status) = status {
- worker.log("eject media");
- status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
+ task_log!(worker, "rewind media");
+ // rewind first so that the unload command later does not run into a timeout
+ status.drive.rewind()?;
}
drop(status); // close drive
- worker.log("unload media");
- changer.unload_media(None)?;
-
for media_uuid in self.pool.current_media_list()? {
let media = self.pool.lookup_media(media_uuid)?;
let label_text = media.label_text();
if let Some(slot) = changer.export_media(label_text)? {
- worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
+ task_log!(
+ worker,
+ "exported media '{}' to import/export slot {}",
+ label_text,
+ slot
+ );
} else {
- worker.warn(format!("export failed - media '{}' is not online", label_text));
+ task_warn!(
+ worker,
+ "export failed - media '{}' is not online or in different drive",
+ label_text
+ );
}
}
-
} else if let Some(mut status) = status {
- worker.log("standalone drive - ejecting media instead of export");
+ task_log!(
+ worker,
+ "standalone drive - ejecting media instead of export"
+ );
status.drive.eject_media()?;
}
/// This is done automatically during a backupsession, but needs to
/// be called explicitly before dropping the PoolWriter
pub fn commit(&mut self) -> Result<(), Error> {
- if let Some(PoolWriterState {ref mut drive, .. }) = self.status {
+ if let Some(PoolWriterState { ref mut drive, .. }) = self.status {
drive.sync()?; // sync all data to the tape
}
self.catalog_set.lock().unwrap().commit()?; // then commit the catalog
/// Load a writable media into the drive
pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
let last_media_uuid = match self.status {
- Some(PoolWriterState { ref media_uuid, ..}) => Some(media_uuid.clone()),
+ Some(PoolWriterState { ref media_uuid, .. }) => Some(media_uuid.clone()),
None => None,
};
- let current_time = proxmox::tools::time::epoch_i64();
+ let current_time = proxmox_time::epoch_i64();
let media_uuid = self.pool.alloc_writable_media(current_time)?;
let media = self.pool.lookup_media(&media_uuid).unwrap();
};
if !media_changed {
+ self.used_tapes.insert(media_uuid.clone());
return Ok(media_uuid);
}
- task_log!(worker, "allocated new writable media '{}'", media.label_text());
+ task_log!(
+ worker,
+ "allocated new writable media '{}'",
+ media.label_text()
+ );
- if let Some(PoolWriterState {mut drive, .. }) = self.status.take() {
+ if let Some(PoolWriterState { mut drive, .. }) = self.status.take() {
if last_media_uuid.is_some() {
task_log!(worker, "eject current media");
drive.eject_media()?;
let (drive_config, _digest) = pbs_config::drive::config()?;
- let (mut drive, old_media_id) =
- request_and_load_media(worker, &drive_config, &self.drive_name, media.label(), &self.notify_email)?;
+ let (mut drive, old_media_id) = request_and_load_media(
+ worker,
+ &drive_config,
+ &self.drive_name,
+ media.label(),
+ &self.notify_email,
+ )?;
// test for critical tape alert flags
if let Ok(alert_flags) = drive.tape_alert_flags() {
if !alert_flags.is_empty() {
- worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
+ task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
if tape_alert_flags_critical(alert_flags) {
self.pool.set_media_status_damaged(&media_uuid)?;
- bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
+ bail!(
+ "aborting due to critical tape alert flags: {:?}",
+ alert_flags
+ );
}
}
}
self.catalog_set.lock().unwrap().append_catalog(catalog)?;
- let media_set = media.media_set_label().clone().unwrap();
+ let media_set = media.media_set_label().unwrap();
- let encrypt_fingerprint = media_set
- .encryption_key_fingerprint
- .clone()
- .map(|fp| (fp, media_set.uuid.clone()));
+ if is_new_media && media_set.seq_nr >= MEDIA_SET_SEQ_NR_WARN_LIMIT {
+ task_warn!(
+ worker,
+ "large media-set detected ({}), consider using a different allocation policy",
+ media_set.seq_nr
+ );
+ }
- drive.set_encryption(encrypt_fingerprint)?;
+ drive.assert_encryption_mode(media_set.encryption_key_fingerprint.is_some())?;
self.status = Some(PoolWriterState {
drive,
self.append_media_set_catalogs(worker)?;
}
+ self.used_tapes.insert(media_uuid.clone());
Ok(media_uuid)
}
fn open_catalog_file(uuid: &Uuid) -> Result<File, Error> {
-
- let status_path = Path::new(TAPE_STATUS_DIR);
- let mut path = status_path.to_owned();
+ let mut path = PathBuf::from(TAPE_STATUS_DIR);
path.push(uuid.to_string());
path.set_extension("log");
- let file = std::fs::OpenOptions::new()
- .read(true)
- .open(&path)?;
+ let file = std::fs::OpenOptions::new().read(true).open(&path)?;
Ok(file)
}
// Check it tape is loaded, then move to EOM (if not already there)
//
// Returns the tape position at EOM.
- fn prepare_tape_write(
- status: &mut PoolWriterState,
- worker: &WorkerTask,
- ) -> Result<u64, Error> {
-
+ fn prepare_tape_write(status: &mut PoolWriterState, worker: &WorkerTask) -> Result<u64, Error> {
if !status.at_eom {
- worker.log(String::from("moving to end of media"));
+ task_log!(worker, "moving to end of media");
status.drive.move_to_eom(true)?;
status.at_eom = true;
+ task_log!(worker, "arrived at end of media");
}
let current_file_number = status.drive.current_file_number()?;
if current_file_number < 2 {
- bail!("got strange file position number from drive ({})", current_file_number);
+ bail!(
+ "got strange file position number from drive ({})",
+ current_file_number
+ );
}
Ok(current_file_number)
/// on the media (return value 'Ok(false, _)'). In that case, the
/// archive is marked incomplete. The caller should mark the media
/// as full and try again using another media.
- pub fn append_catalog_archive(
- &mut self,
- worker: &WorkerTask,
- ) -> Result<bool, Error> {
+ pub fn append_catalog_archive(&mut self, worker: &WorkerTask) -> Result<bool, Error> {
+ let catalog_magic = self.catalog_version();
let status = match self.status {
Some(ref mut status) => status,
media_set.uuid(),
seq_nr,
&mut file,
- )?.is_some();
+ catalog_magic,
+ )?
+ .is_some();
Ok(done)
}
// Append catalogs for all previous media in set (without last)
- fn append_media_set_catalogs(
- &mut self,
- worker: &WorkerTask,
- ) -> Result<(), Error> {
-
+ fn append_media_set_catalogs(&mut self, worker: &WorkerTask) -> Result<(), Error> {
let media_set = self.pool.current_media_set();
- let mut media_list = &media_set.media_list()[..];
+ let mut media_list = media_set.media_list();
if media_list.len() < 2 {
return Ok(());
}
- media_list = &media_list[..(media_list.len()-1)];
+ media_list = &media_list[..(media_list.len() - 1)];
+
+ let catalog_magic = self.catalog_version();
let status = match self.status {
Some(ref mut status) => status,
Self::prepare_tape_write(status, worker)?;
for (seq_nr, uuid) in media_list.iter().enumerate() {
-
let uuid = match uuid {
None => bail!("got incomplete media list - internal error"),
Some(uuid) => uuid,
media_set.uuid(),
seq_nr,
&mut file,
- )?.is_none() {
+ catalog_magic,
+ )?
+ .is_none()
+ {
bail!("got EOM while writing start catalog");
}
}
worker: &WorkerTask,
snapshot_reader: &SnapshotReader,
) -> Result<(bool, usize), Error> {
-
let status = match self.status {
Some(ref mut status) => status,
None => bail!("PoolWriter - no media loaded"),
self.catalog_set.lock().unwrap().register_snapshot(
content_uuid,
current_file_number,
- &snapshot_reader.datastore_name().to_string(),
- &snapshot_reader.snapshot().to_string(),
+ snapshot_reader.datastore_name(),
+ snapshot_reader.snapshot().backup_ns(),
+ snapshot_reader.snapshot().as_ref(),
)?;
(true, writer.bytes_written())
}
chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
store: &str,
) -> Result<(bool, usize), Error> {
-
let status = match self.status {
Some(ref mut status) => status,
None => bail!("PoolWriter - no media loaded"),
let start_time = SystemTime::now();
- let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
- worker,
- writer,
- chunk_iter,
- store,
- MAX_CHUNK_ARCHIVE_SIZE,
- )?;
+ let (saved_chunks, content_uuid, leom, bytes_written) =
+ write_chunk_archive(worker, writer, chunk_iter, store, MAX_CHUNK_ARCHIVE_SIZE)?;
status.bytes_written += bytes_written;
- let elapsed = start_time.elapsed()?.as_secs_f64();
- worker.log(format!(
+ let elapsed = start_time.elapsed()?.as_secs_f64();
+ task_log!(
+ worker,
"wrote {} chunks ({:.2} MB at {:.2} MB/s)",
saved_chunks.len(),
- bytes_written as f64 /1_000_000.0,
- (bytes_written as f64)/(1_000_000.0*elapsed),
- ));
+ bytes_written as f64 / 1_000_000.0,
+ (bytes_written as f64) / (1_000_000.0 * elapsed),
+ );
let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
// register chunks in media_catalog
- self.catalog_set.lock().unwrap()
- .register_chunk_archive(content_uuid, current_file_number, store, &saved_chunks)?;
+ self.catalog_set.lock().unwrap().register_chunk_archive(
+ content_uuid,
+ current_file_number,
+ store,
+ &saved_chunks,
+ )?;
if leom || request_sync {
self.commit()?;
datastore: Arc<DataStore>,
snapshot_reader: Arc<Mutex<SnapshotReader>>,
) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
- NewChunksIterator::spawn(
- datastore,
- snapshot_reader,
- Arc::clone(&self.catalog_set),
- )
+ NewChunksIterator::spawn(datastore, snapshot_reader, Arc::clone(&self.catalog_set))
+ }
+
+ pub(crate) fn catalog_version(&self) -> [u8; 8] {
+ if self.ns_magic {
+ PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1
+ } else {
+ PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
+ }
}
}
/// write up to <max_size> of chunks
+#[allow(clippy::type_complexity)]
fn write_chunk_archive<'a>(
_worker: &WorkerTask,
writer: Box<dyn 'a + TapeWrite>,
chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
store: &str,
max_size: usize,
-) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
-
+) -> Result<(Vec<[u8; 32]>, Uuid, bool, usize), Error> {
let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?;
// we want to get the chunk list in correct order
- let mut chunk_list: Vec<[u8;32]> = Vec::new();
+ let mut chunk_list: Vec<[u8; 32]> = Vec::new();
let mut leom = false;
Some(Err(err)) => bail!("{}", err),
};
- //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
+ //println!("CHUNK {} size {}", hex::encode(digest), blob.raw_size());
- match writer.try_write_chunk(&digest, &blob) {
+ match writer.try_write_chunk(digest, blob) {
Ok(true) => {
chunk_list.push(*digest);
chunk_iter.next(); // consume
}
if writer.bytes_written() > max_size {
- //worker.log("Chunk Archive max size reached, closing archive".to_string());
+ //task_log!(worker, "Chunk Archive max size reached, closing archive");
break;
}
}
old_set: Option<MediaSetLabel>,
media_id: &MediaId,
) -> Result<(MediaCatalog, bool), Error> {
-
let media_catalog;
let new_set = match media_id.media_set_label {
match config_map.get(fingerprint) {
Some(key_config) => Some(key_config.clone()),
None => {
- bail!("unable to find tape encryption key config '{}'", fingerprint);
+ bail!(
+ "unable to find tape encryption key config '{}'",
+ fingerprint
+ );
}
}
} else {
None
};
- let status_path = Path::new(TAPE_STATUS_DIR);
-
let new_media = match old_set {
None => {
- worker.log("writing new media set label".to_string());
+ task_log!(worker, "writing new media set label");
drive.write_media_set_label(new_set, key_config.as_ref())?;
- media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
+ media_catalog = MediaCatalog::overwrite(TAPE_STATUS_DIR, media_id, false)?;
true
}
Some(media_set_label) => {
if new_set.uuid == media_set_label.uuid {
if new_set.seq_nr != media_set_label.seq_nr {
- bail!("got media with wrong media sequence number ({} != {}",
- new_set.seq_nr,media_set_label.seq_nr);
+ bail!(
+ "got media with wrong media sequence number ({} != {}",
+ new_set.seq_nr,
+ media_set_label.seq_nr
+ );
}
- if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
+ if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint
+ {
bail!("detected changed encryption fingerprint - internal error");
}
- media_catalog = MediaCatalog::open(status_path, &media_id, true, false)?;
+ media_catalog = MediaCatalog::open(TAPE_STATUS_DIR, media_id, true, false)?;
// todo: verify last content/media_catalog somehow?
false
} else {
- worker.log(
- format!("writing new media set label (overwrite '{}/{}')",
- media_set_label.uuid.to_string(), media_set_label.seq_nr)
+ task_log!(
+ worker,
+ "writing new media set label (overwrite '{}/{}')",
+ media_set_label.uuid.to_string(),
+ media_set_label.seq_nr,
);
drive.write_media_set_label(new_set, key_config.as_ref())?;
- media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
+ media_catalog = MediaCatalog::overwrite(TAPE_STATUS_DIR, media_id, false)?;
true
}
}