2 pub use catalog_set
::*;
4 mod new_chunks_iterator
;
5 pub use new_chunks_iterator
::*;
9 use std
::time
::SystemTime
;
10 use std
::sync
::{Arc, Mutex}
;
12 use anyhow
::{bail, Error}
;
14 use proxmox_uuid
::Uuid
;
16 use pbs_tools
::{task_log, task_warn}
;
17 use pbs_config
::tape_encryption_keys
::load_key_configs
;
20 sg_tape
::tape_alert_flags_critical
,
22 use pbs_datastore
::{DataStore, SnapshotReader}
;
23 use proxmox_rest_server
::WorkerTask
;
28 MAX_CHUNK_ARCHIVE_SIZE
,
36 tape_write_snapshot_archive
,
41 request_and_load_media
,
48 struct PoolWriterState
{
49 drive
: Box
<dyn TapeDriver
>,
50 // Media Uuid from loaded media
52 // tell if we already moved to EOM
54 // bytes written after the last tape flush/sync
58 /// Helper to manage a backup job, writing several tapes of a pool
59 pub struct PoolWriter
{
62 status
: Option
<PoolWriterState
>,
63 catalog_set
: Arc
<Mutex
<CatalogSet
>>,
64 notify_email
: Option
<String
>,
73 notify_email
: Option
<String
>,
74 force_media_set
: bool
,
75 ) -> Result
<Self, Error
> {
77 let current_time
= proxmox_time
::epoch_i64();
79 let new_media_set_reason
= pool
.start_write_session(current_time
, force_media_set
)?
;
80 if let Some(reason
) = new_media_set_reason
{
83 "starting new media set - reason: {}",
88 let media_set_uuid
= pool
.current_media_set().uuid();
89 task_log
!(worker
, "media set uuid: {}", media_set_uuid
);
91 let mut catalog_set
= CatalogSet
::new();
93 // load all catalogs read-only at start
94 for media_uuid
in pool
.current_media_list()?
{
95 let media_info
= pool
.lookup_media(media_uuid
).unwrap();
96 let media_catalog
= MediaCatalog
::open(
97 Path
::new(TAPE_STATUS_DIR
),
102 catalog_set
.append_read_only_catalog(media_catalog
)?
;
107 drive_name
: drive_name
.to_string(),
109 catalog_set
: Arc
::new(Mutex
::new(catalog_set
)),
114 pub fn pool(&mut self) -> &mut MediaPool
{
118 /// Set media status to FULL (persistent - stores pool status)
119 pub fn set_media_status_full(&mut self, uuid
: &Uuid
) -> Result
<(), Error
> {
120 self.pool
.set_media_status_full(&uuid
)?
;
124 pub fn contains_snapshot(&self, store
: &str, snapshot
: &str) -> bool
{
125 self.catalog_set
.lock().unwrap().contains_snapshot(store
, snapshot
)
128 /// Eject media and drop PoolWriterState (close drive)
129 pub fn eject_media(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
130 let mut status
= match self.status
.take() {
131 Some(status
) => status
,
132 None
=> return Ok(()), // no media loaded
135 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
137 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
138 task_log
!(worker
, "eject media");
139 status
.drive
.eject_media()?
; // rewind and eject early, so that unload_media is faster
140 drop(status
); // close drive
141 task_log
!(worker
, "unload media");
142 changer
.unload_media(None
)?
; //eject and unload
144 task_log
!(worker
, "standalone drive - ejecting media");
145 status
.drive
.eject_media()?
;
151 /// Export current media set and drop PoolWriterState (close drive)
152 pub fn export_media_set(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
153 let mut status
= self.status
.take();
155 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
157 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
159 if let Some(ref mut status
) = status
{
160 task_log
!(worker
, "rewind media");
161 // rewind first so that the unload command later does not run into a timeout
162 status
.drive
.rewind()?
;
164 drop(status
); // close drive
166 for media_uuid
in self.pool
.current_media_list()?
{
167 let media
= self.pool
.lookup_media(media_uuid
)?
;
168 let label_text
= media
.label_text();
169 if let Some(slot
) = changer
.export_media(label_text
)?
{
170 task_log
!(worker
, "exported media '{}' to import/export slot {}", label_text
, slot
);
172 task_warn
!(worker
, "export failed - media '{}' is not online or in different drive", label_text
);
176 } else if let Some(mut status
) = status
{
177 task_log
!(worker
, "standalone drive - ejecting media instead of export");
178 status
.drive
.eject_media()?
;
184 /// commit changes to tape and catalog
186 /// This is done automatically during a backup session, but needs to
187 /// be called explicitly before dropping the PoolWriter
188 pub fn commit(&mut self) -> Result
<(), Error
> {
189 if let Some(PoolWriterState {ref mut drive, .. }
) = self.status
{
190 drive
.sync()?
; // sync all data to the tape
192 self.catalog_set
.lock().unwrap().commit()?
; // then commit the catalog
196 /// Load a writable media into the drive
197 pub fn load_writable_media(&mut self, worker
: &WorkerTask
) -> Result
<Uuid
, Error
> {
198 let last_media_uuid
= match self.status
{
199 Some(PoolWriterState { ref media_uuid, ..}
) => Some(media_uuid
.clone()),
203 let current_time
= proxmox_time
::epoch_i64();
204 let media_uuid
= self.pool
.alloc_writable_media(current_time
)?
;
206 let media
= self.pool
.lookup_media(&media_uuid
).unwrap();
208 let media_changed
= match last_media_uuid
{
209 Some(ref last_media_uuid
) => last_media_uuid
!= &media_uuid
,
214 return Ok(media_uuid
);
217 task_log
!(worker
, "allocated new writable media '{}'", media
.label_text());
219 if let Some(PoolWriterState {mut drive, .. }
) = self.status
.take() {
220 if last_media_uuid
.is_some() {
221 task_log
!(worker
, "eject current media");
222 drive
.eject_media()?
;
226 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
228 let (mut drive
, old_media_id
) =
229 request_and_load_media(worker
, &drive_config
, &self.drive_name
, media
.label(), &self.notify_email
)?
;
231 // test for critical tape alert flags
232 if let Ok(alert_flags
) = drive
.tape_alert_flags() {
233 if !alert_flags
.is_empty() {
234 task_log
!(worker
, "TapeAlertFlags: {:?}", alert_flags
);
235 if tape_alert_flags_critical(alert_flags
) {
236 self.pool
.set_media_status_damaged(&media_uuid
)?
;
237 bail
!("aborting due to critical tape alert flags: {:?}", alert_flags
);
242 let (catalog
, is_new_media
) = update_media_set_label(
245 old_media_id
.media_set_label
,
249 self.catalog_set
.lock().unwrap().append_catalog(catalog
)?
;
251 let media_set
= media
.media_set_label().clone().unwrap();
253 let encrypt_fingerprint
= media_set
254 .encryption_key_fingerprint
256 .map(|fp
| (fp
, media_set
.uuid
.clone()));
258 drive
.set_encryption(encrypt_fingerprint
)?
;
260 self.status
= Some(PoolWriterState
{
262 media_uuid
: media_uuid
.clone(),
268 // add catalogs from previous media
269 self.append_media_set_catalogs(worker
)?
;
275 fn open_catalog_file(uuid
: &Uuid
) -> Result
<File
, Error
> {
277 let status_path
= Path
::new(TAPE_STATUS_DIR
);
278 let mut path
= status_path
.to_owned();
279 path
.push(uuid
.to_string());
280 path
.set_extension("log");
282 let file
= std
::fs
::OpenOptions
::new()
289 // Check if tape is loaded, then move to EOM (if not already there)
291 // Returns the tape position at EOM.
292 fn prepare_tape_write(
293 status
: &mut PoolWriterState
,
295 ) -> Result
<u64, Error
> {
298 task_log
!(worker
, "moving to end of media");
299 status
.drive
.move_to_eom(true)?
;
300 status
.at_eom
= true;
303 let current_file_number
= status
.drive
.current_file_number()?
;
304 if current_file_number
< 2 {
305 bail
!("got strange file position number from drive ({})", current_file_number
);
308 Ok(current_file_number
)
311 /// Move to EOM (if not already there), then write the current
312 /// catalog to the tape. On success, this returns 'Ok(true)'.
314 /// Please note that this may fail when there is not enough space
315 /// on the media (return value 'Ok(false)'). In that case, the
316 /// archive is marked incomplete. The caller should mark the media
317 /// as full and try again using another media.
318 pub fn append_catalog_archive(
321 ) -> Result
<bool
, Error
> {
323 let status
= match self.status
{
324 Some(ref mut status
) => status
,
325 None
=> bail
!("PoolWriter - no media loaded"),
328 Self::prepare_tape_write(status
, worker
)?
;
330 let catalog_set
= self.catalog_set
.lock().unwrap();
332 let catalog
= match catalog_set
.catalog
{
333 None
=> bail
!("append_catalog_archive failed: no catalog - internal error"),
334 Some(ref catalog
) => catalog
,
337 let media_set
= self.pool
.current_media_set();
339 let media_list
= media_set
.media_list();
340 let uuid
= match media_list
.last() {
341 None
=> bail
!("got empty media list - internal error"),
342 Some(None
) => bail
!("got incomplete media list - internal error"),
343 Some(Some(last_uuid
)) => {
344 if last_uuid
!= catalog
.uuid() {
345 bail
!("got wrong media - internal error");
351 let seq_nr
= media_list
.len() - 1;
353 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
355 let mut file
= Self::open_catalog_file(uuid
)?
;
357 let done
= tape_write_catalog(
368 // Append catalogs for all previous media in set (without last)
369 fn append_media_set_catalogs(
372 ) -> Result
<(), Error
> {
374 let media_set
= self.pool
.current_media_set();
376 let mut media_list
= &media_set
.media_list()[..];
377 if media_list
.len() < 2 {
380 media_list
= &media_list
[..(media_list
.len()-1)];
382 let status
= match self.status
{
383 Some(ref mut status
) => status
,
384 None
=> bail
!("PoolWriter - no media loaded"),
387 Self::prepare_tape_write(status
, worker
)?
;
389 for (seq_nr
, uuid
) in media_list
.iter().enumerate() {
391 let uuid
= match uuid
{
392 None
=> bail
!("got incomplete media list - internal error"),
396 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
398 let mut file
= Self::open_catalog_file(uuid
)?
;
400 task_log
!(worker
, "write catalog for previous media: {}", uuid
);
402 if tape_write_catalog(
409 bail
!("got EOM while writing start catalog");
416 /// Move to EOM (if not already there), then creates a new snapshot
417 /// archive writing specified files (as .pxar) into it. On
418 /// success, this returns 'Ok((true, _))' and the media catalog gets
421 /// Please note that this may fail when there is not enough space
422 /// on the media (return value 'Ok((false, _))'). In that case, the
423 /// archive is marked incomplete, and we do not use it. The caller
424 /// should mark the media as full and try again using another
426 pub fn append_snapshot_archive(
429 snapshot_reader
: &SnapshotReader
,
430 ) -> Result
<(bool
, usize), Error
> {
432 let status
= match self.status
{
433 Some(ref mut status
) => status
,
434 None
=> bail
!("PoolWriter - no media loaded"),
437 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
439 let (done
, bytes_written
) = {
440 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
442 match tape_write_snapshot_archive(writer
.as_mut(), snapshot_reader
)?
{
443 Some(content_uuid
) => {
444 self.catalog_set
.lock().unwrap().register_snapshot(
447 &snapshot_reader
.datastore_name().to_string(),
448 &snapshot_reader
.snapshot().to_string(),
450 (true, writer
.bytes_written())
452 None
=> (false, writer
.bytes_written()),
456 status
.bytes_written
+= bytes_written
;
458 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
460 if !done
|| request_sync
{
464 Ok((done
, bytes_written
))
467 /// Move to EOM (if not already there), then creates a new chunk
468 /// archive and writes chunks from 'chunk_iter'. This stops when
469 /// it detects LEOM or when we reach max archive size
470 /// (4GB). Written chunks are registered in the media catalog.
471 pub fn append_chunk_archive(
474 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
476 ) -> Result
<(bool
, usize), Error
> {
478 let status
= match self.status
{
479 Some(ref mut status
) => status
,
480 None
=> bail
!("PoolWriter - no media loaded"),
483 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
485 let writer
= status
.drive
.write_file()?
;
487 let start_time
= SystemTime
::now();
489 let (saved_chunks
, content_uuid
, leom
, bytes_written
) = write_chunk_archive(
494 MAX_CHUNK_ARCHIVE_SIZE
,
497 status
.bytes_written
+= bytes_written
;
499 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
502 "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
504 bytes_written
as f64 /1_000_000.0,
505 (bytes_written
as f64)/(1_000_000.0*elapsed
),
508 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
510 // register chunks in media_catalog
511 self.catalog_set
.lock().unwrap()
512 .register_chunk_archive(content_uuid
, current_file_number
, store
, &saved_chunks
)?
;
514 if leom
|| request_sync
{
518 Ok((leom
, bytes_written
))
521 pub fn spawn_chunk_reader_thread(
523 datastore
: Arc
<DataStore
>,
524 snapshot_reader
: Arc
<Mutex
<SnapshotReader
>>,
525 ) -> Result
<(std
::thread
::JoinHandle
<()>, NewChunksIterator
), Error
> {
526 NewChunksIterator
::spawn(
529 Arc
::clone(&self.catalog_set
),
534 /// write up to <max_size> of chunks
535 fn write_chunk_archive
<'a
>(
536 _worker
: &WorkerTask
,
537 writer
: Box
<dyn 'a
+ TapeWrite
>,
538 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
541 ) -> Result
<(Vec
<[u8;32]>, Uuid
, bool
, usize), Error
> {
543 let (mut writer
, content_uuid
) = ChunkArchiveWriter
::new(writer
, store
, true)?
;
545 // we want to get the chunk list in correct order
546 let mut chunk_list
: Vec
<[u8;32]> = Vec
::new();
548 let mut leom
= false;
551 let (digest
, blob
) = match chunk_iter
.peek() {
553 Some(Ok((digest
, blob
))) => (digest
, blob
),
554 Some(Err(err
)) => bail
!("{}", err
),
557 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
559 match writer
.try_write_chunk(&digest
, &blob
) {
561 chunk_list
.push(*digest
);
562 chunk_iter
.next(); // consume
565 // Note: we do not consume the chunk (no chunk_iter.next())
569 Err(err
) => bail
!("write chunk failed - {}", err
),
572 if writer
.bytes_written() > max_size
{
573 //task_log!(worker, "Chunk Archive max size reached, closing archive");
580 Ok((chunk_list
, content_uuid
, leom
, writer
.bytes_written()))
583 // Compare the media set label. If the media is empty, or the existing
584 // set label does not match the expected media set, overwrite the
586 fn update_media_set_label(
588 drive
: &mut dyn TapeDriver
,
589 old_set
: Option
<MediaSetLabel
>,
591 ) -> Result
<(MediaCatalog
, bool
), Error
> {
595 let new_set
= match media_id
.media_set_label
{
596 None
=> bail
!("got media without media set - internal error"),
597 Some(ref set
) => set
,
600 let key_config
= if let Some(ref fingerprint
) = new_set
.encryption_key_fingerprint
{
601 let (config_map
, _digest
) = load_key_configs()?
;
602 match config_map
.get(fingerprint
) {
603 Some(key_config
) => Some(key_config
.clone()),
605 bail
!("unable to find tape encryption key config '{}'", fingerprint
);
612 let status_path
= Path
::new(TAPE_STATUS_DIR
);
614 let new_media
= match old_set
{
616 task_log
!(worker
, "writing new media set label");
617 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
618 media_catalog
= MediaCatalog
::overwrite(status_path
, media_id
, false)?
;
621 Some(media_set_label
) => {
622 if new_set
.uuid
== media_set_label
.uuid
{
623 if new_set
.seq_nr
!= media_set_label
.seq_nr
{
624 bail
!("got media with wrong media sequence number ({} != {}",
625 new_set
.seq_nr
,media_set_label
.seq_nr
);
627 if new_set
.encryption_key_fingerprint
!= media_set_label
.encryption_key_fingerprint
{
628 bail
!("detected changed encryption fingerprint - internal error");
630 media_catalog
= MediaCatalog
::open(status_path
, &media_id
, true, false)?
;
632 // todo: verify last content/media_catalog somehow?
638 "writing new media set label (overwrite '{}/{}')",
639 media_set_label
.uuid
.to_string(),
640 media_set_label
.seq_nr
,
643 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
644 media_catalog
= MediaCatalog
::overwrite(status_path
, media_id
, false)?
;
650 Ok((media_catalog
, new_media
))