2 pub use catalog_set
::*;
4 mod new_chunks_iterator
;
5 pub use new_chunks_iterator
::*;
9 use std
::time
::SystemTime
;
10 use std
::sync
::{Arc, Mutex}
;
12 use anyhow
::{bail, Error}
;
14 use proxmox
::tools
::Uuid
;
16 use pbs_tools
::{task_log, task_warn}
;
17 use pbs_config
::tape_encryption_keys
::load_key_configs
;
20 sg_tape
::tape_alert_flags_critical
,
22 use pbs_datastore
::{DataStore, SnapshotReader}
;
23 use proxmox_rest_server
::WorkerTask
;
28 MAX_CHUNK_ARCHIVE_SIZE
,
36 tape_write_snapshot_archive
,
41 request_and_load_media
,
48 struct PoolWriterState
{
49 drive
: Box
<dyn TapeDriver
>,
50 // Media Uuid from loaded media
52 // tell if we already moved to EOM
54 // bytes written after the last tape flush/sync
58 /// Helper to manage a backup job, writing several tapes of a pool
59 pub struct PoolWriter
{
62 status
: Option
<PoolWriterState
>,
63 catalog_set
: Arc
<Mutex
<CatalogSet
>>,
64 notify_email
: Option
<String
>,
73 notify_email
: Option
<String
>,
74 force_media_set
: bool
,
75 ) -> Result
<Self, Error
> {
77 let current_time
= proxmox
::tools
::time
::epoch_i64();
79 let new_media_set_reason
= pool
.start_write_session(current_time
, force_media_set
)?
;
80 if let Some(reason
) = new_media_set_reason
{
83 "starting new media set - reason: {}",
88 let media_set_uuid
= pool
.current_media_set().uuid();
89 task_log
!(worker
, "media set uuid: {}", media_set_uuid
);
91 let mut catalog_set
= CatalogSet
::new();
93 // load all catalogs read-only at start
94 for media_uuid
in pool
.current_media_list()?
{
95 let media_info
= pool
.lookup_media(media_uuid
).unwrap();
96 let media_catalog
= MediaCatalog
::open(
97 Path
::new(TAPE_STATUS_DIR
),
102 catalog_set
.append_read_only_catalog(media_catalog
)?
;
107 drive_name
: drive_name
.to_string(),
109 catalog_set
: Arc
::new(Mutex
::new(catalog_set
)),
114 pub fn pool(&mut self) -> &mut MediaPool
{
118 /// Set media status to FULL (persistent - stores pool status)
119 pub fn set_media_status_full(&mut self, uuid
: &Uuid
) -> Result
<(), Error
> {
120 self.pool
.set_media_status_full(&uuid
)?
;
124 pub fn contains_snapshot(&self, store
: &str, snapshot
: &str) -> bool
{
125 self.catalog_set
.lock().unwrap().contains_snapshot(store
, snapshot
)
128 /// Eject media and drop PoolWriterState (close drive)
129 pub fn eject_media(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
130 let mut status
= match self.status
.take() {
131 Some(status
) => status
,
132 None
=> return Ok(()), // no media loaded
135 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
137 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
138 task_log
!(worker
, "eject media");
139 status
.drive
.eject_media()?
; // rewind and eject early, so that unload_media is faster
140 drop(status
); // close drive
141 task_log
!(worker
, "unload media");
142 changer
.unload_media(None
)?
; //eject and unload
144 task_log
!(worker
, "standalone drive - ejecting media");
145 status
.drive
.eject_media()?
;
151 /// Export current media set and drop PoolWriterState (close drive)
152 pub fn export_media_set(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
153 let mut status
= self.status
.take();
155 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
157 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
159 if let Some(ref mut status
) = status
{
160 task_log
!(worker
, "eject media");
161 status
.drive
.eject_media()?
; // rewind and eject early, so that unload_media is faster
163 drop(status
); // close drive
165 task_log
!(worker
, "unload media");
166 changer
.unload_media(None
)?
;
168 for media_uuid
in self.pool
.current_media_list()?
{
169 let media
= self.pool
.lookup_media(media_uuid
)?
;
170 let label_text
= media
.label_text();
171 if let Some(slot
) = changer
.export_media(label_text
)?
{
172 task_log
!(worker
, "exported media '{}' to import/export slot {}", label_text
, slot
);
174 task_warn
!(worker
, "export failed - media '{}' is not online", label_text
);
178 } else if let Some(mut status
) = status
{
179 task_log
!(worker
, "standalone drive - ejecting media instead of export");
180 status
.drive
.eject_media()?
;
186 /// commit changes to tape and catalog
188 /// This is done automatically during a backup session, but needs to
189 /// be called explicitly before dropping the PoolWriter
190 pub fn commit(&mut self) -> Result
<(), Error
> {
191 if let Some(PoolWriterState {ref mut drive, .. }
) = self.status
{
192 drive
.sync()?
; // sync all data to the tape
194 self.catalog_set
.lock().unwrap().commit()?
; // then commit the catalog
198 /// Load a writable media into the drive
199 pub fn load_writable_media(&mut self, worker
: &WorkerTask
) -> Result
<Uuid
, Error
> {
200 let last_media_uuid
= match self.status
{
201 Some(PoolWriterState { ref media_uuid, ..}
) => Some(media_uuid
.clone()),
205 let current_time
= proxmox
::tools
::time
::epoch_i64();
206 let media_uuid
= self.pool
.alloc_writable_media(current_time
)?
;
208 let media
= self.pool
.lookup_media(&media_uuid
).unwrap();
210 let media_changed
= match last_media_uuid
{
211 Some(ref last_media_uuid
) => last_media_uuid
!= &media_uuid
,
216 return Ok(media_uuid
);
219 task_log
!(worker
, "allocated new writable media '{}'", media
.label_text());
221 if let Some(PoolWriterState {mut drive, .. }
) = self.status
.take() {
222 if last_media_uuid
.is_some() {
223 task_log
!(worker
, "eject current media");
224 drive
.eject_media()?
;
228 let (drive_config
, _digest
) = pbs_config
::drive
::config()?
;
230 let (mut drive
, old_media_id
) =
231 request_and_load_media(worker
, &drive_config
, &self.drive_name
, media
.label(), &self.notify_email
)?
;
233 // test for critical tape alert flags
234 if let Ok(alert_flags
) = drive
.tape_alert_flags() {
235 if !alert_flags
.is_empty() {
236 task_log
!(worker
, "TapeAlertFlags: {:?}", alert_flags
);
237 if tape_alert_flags_critical(alert_flags
) {
238 self.pool
.set_media_status_damaged(&media_uuid
)?
;
239 bail
!("aborting due to critical tape alert flags: {:?}", alert_flags
);
244 let (catalog
, is_new_media
) = update_media_set_label(
247 old_media_id
.media_set_label
,
251 self.catalog_set
.lock().unwrap().append_catalog(catalog
)?
;
253 let media_set
= media
.media_set_label().clone().unwrap();
255 let encrypt_fingerprint
= media_set
256 .encryption_key_fingerprint
258 .map(|fp
| (fp
, media_set
.uuid
.clone()));
260 drive
.set_encryption(encrypt_fingerprint
)?
;
262 self.status
= Some(PoolWriterState
{
264 media_uuid
: media_uuid
.clone(),
270 // add catalogs from previous media
271 self.append_media_set_catalogs(worker
)?
;
277 fn open_catalog_file(uuid
: &Uuid
) -> Result
<File
, Error
> {
279 let status_path
= Path
::new(TAPE_STATUS_DIR
);
280 let mut path
= status_path
.to_owned();
281 path
.push(uuid
.to_string());
282 path
.set_extension("log");
284 let file
= std
::fs
::OpenOptions
::new()
291 // Check if tape is loaded, then move to EOM (if not already there)
293 // Returns the tape position at EOM.
294 fn prepare_tape_write(
295 status
: &mut PoolWriterState
,
297 ) -> Result
<u64, Error
> {
300 task_log
!(worker
, "moving to end of media");
301 status
.drive
.move_to_eom(true)?
;
302 status
.at_eom
= true;
305 let current_file_number
= status
.drive
.current_file_number()?
;
306 if current_file_number
< 2 {
307 bail
!("got strange file position number from drive ({})", current_file_number
);
310 Ok(current_file_number
)
313 /// Move to EOM (if not already there), then write the current
314 /// catalog to the tape. On success, this returns 'Ok(true)'.
316 /// Please note that this may fail when there is not enough space
317 /// on the media (return value 'Ok(false)'). In that case, the
318 /// archive is marked incomplete. The caller should mark the media
319 /// as full and try again using another media.
320 pub fn append_catalog_archive(
323 ) -> Result
<bool
, Error
> {
325 let status
= match self.status
{
326 Some(ref mut status
) => status
,
327 None
=> bail
!("PoolWriter - no media loaded"),
330 Self::prepare_tape_write(status
, worker
)?
;
332 let catalog_set
= self.catalog_set
.lock().unwrap();
334 let catalog
= match catalog_set
.catalog
{
335 None
=> bail
!("append_catalog_archive failed: no catalog - internal error"),
336 Some(ref catalog
) => catalog
,
339 let media_set
= self.pool
.current_media_set();
341 let media_list
= media_set
.media_list();
342 let uuid
= match media_list
.last() {
343 None
=> bail
!("got empty media list - internal error"),
344 Some(None
) => bail
!("got incomplete media list - internal error"),
345 Some(Some(last_uuid
)) => {
346 if last_uuid
!= catalog
.uuid() {
347 bail
!("got wrong media - internal error");
353 let seq_nr
= media_list
.len() - 1;
355 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
357 let mut file
= Self::open_catalog_file(uuid
)?
;
359 let done
= tape_write_catalog(
370 // Append catalogs for all previous media in set (without last)
371 fn append_media_set_catalogs(
374 ) -> Result
<(), Error
> {
376 let media_set
= self.pool
.current_media_set();
378 let mut media_list
= &media_set
.media_list()[..];
379 if media_list
.len() < 2 {
382 media_list
= &media_list
[..(media_list
.len()-1)];
384 let status
= match self.status
{
385 Some(ref mut status
) => status
,
386 None
=> bail
!("PoolWriter - no media loaded"),
389 Self::prepare_tape_write(status
, worker
)?
;
391 for (seq_nr
, uuid
) in media_list
.iter().enumerate() {
393 let uuid
= match uuid
{
394 None
=> bail
!("got incomplete media list - internal error"),
398 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
400 let mut file
= Self::open_catalog_file(uuid
)?
;
402 task_log
!(worker
, "write catalog for previous media: {}", uuid
);
404 if tape_write_catalog(
411 bail
!("got EOM while writing start catalog");
418 /// Move to EOM (if not already there), then creates a new snapshot
419 /// archive writing specified files (as .pxar) into it. On
420 /// success, this returns 'Ok((true, _))' and the media catalog gets
423 /// Please note that this may fail when there is not enough space
424 /// on the media (return value 'Ok((false, _))'). In that case, the
425 /// archive is marked incomplete, and we do not use it. The caller
426 /// should mark the media as full and try again using another
428 pub fn append_snapshot_archive(
431 snapshot_reader
: &SnapshotReader
,
432 ) -> Result
<(bool
, usize), Error
> {
434 let status
= match self.status
{
435 Some(ref mut status
) => status
,
436 None
=> bail
!("PoolWriter - no media loaded"),
439 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
441 let (done
, bytes_written
) = {
442 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
444 match tape_write_snapshot_archive(writer
.as_mut(), snapshot_reader
)?
{
445 Some(content_uuid
) => {
446 self.catalog_set
.lock().unwrap().register_snapshot(
449 &snapshot_reader
.datastore_name().to_string(),
450 &snapshot_reader
.snapshot().to_string(),
452 (true, writer
.bytes_written())
454 None
=> (false, writer
.bytes_written()),
458 status
.bytes_written
+= bytes_written
;
460 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
462 if !done
|| request_sync
{
466 Ok((done
, bytes_written
))
469 /// Move to EOM (if not already there), then creates a new chunk
470 /// archive and writes chunks from 'chunk_iter'. This stops when
471 /// it detects LEOM or when we reach max archive size
472 /// (4GB). Written chunks are registered in the media catalog.
473 pub fn append_chunk_archive(
476 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
478 ) -> Result
<(bool
, usize), Error
> {
480 let status
= match self.status
{
481 Some(ref mut status
) => status
,
482 None
=> bail
!("PoolWriter - no media loaded"),
485 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
487 let writer
= status
.drive
.write_file()?
;
489 let start_time
= SystemTime
::now();
491 let (saved_chunks
, content_uuid
, leom
, bytes_written
) = write_chunk_archive(
496 MAX_CHUNK_ARCHIVE_SIZE
,
499 status
.bytes_written
+= bytes_written
;
501 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
504 "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
506 bytes_written
as f64 /1_000_000.0,
507 (bytes_written
as f64)/(1_000_000.0*elapsed
),
510 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
512 // register chunks in media_catalog
513 self.catalog_set
.lock().unwrap()
514 .register_chunk_archive(content_uuid
, current_file_number
, store
, &saved_chunks
)?
;
516 if leom
|| request_sync
{
520 Ok((leom
, bytes_written
))
523 pub fn spawn_chunk_reader_thread(
525 datastore
: Arc
<DataStore
>,
526 snapshot_reader
: Arc
<Mutex
<SnapshotReader
>>,
527 ) -> Result
<(std
::thread
::JoinHandle
<()>, NewChunksIterator
), Error
> {
528 NewChunksIterator
::spawn(
531 Arc
::clone(&self.catalog_set
),
536 /// write up to <max_size> of chunks
537 fn write_chunk_archive
<'a
>(
538 _worker
: &WorkerTask
,
539 writer
: Box
<dyn 'a
+ TapeWrite
>,
540 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
543 ) -> Result
<(Vec
<[u8;32]>, Uuid
, bool
, usize), Error
> {
545 let (mut writer
, content_uuid
) = ChunkArchiveWriter
::new(writer
, store
, true)?
;
547 // we want to get the chunk list in correct order
548 let mut chunk_list
: Vec
<[u8;32]> = Vec
::new();
550 let mut leom
= false;
553 let (digest
, blob
) = match chunk_iter
.peek() {
555 Some(Ok((digest
, blob
))) => (digest
, blob
),
556 Some(Err(err
)) => bail
!("{}", err
),
559 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
561 match writer
.try_write_chunk(&digest
, &blob
) {
563 chunk_list
.push(*digest
);
564 chunk_iter
.next(); // consume
567 // Note: we do not consume the chunk (no chunk_iter.next())
571 Err(err
) => bail
!("write chunk failed - {}", err
),
574 if writer
.bytes_written() > max_size
{
575 //task_log!(worker, "Chunk Archive max size reached, closing archive");
582 Ok((chunk_list
, content_uuid
, leom
, writer
.bytes_written()))
585 // Compare the media set label. If the media is empty, or the existing
586 // set label does not match the expected media set, overwrite the
588 fn update_media_set_label(
590 drive
: &mut dyn TapeDriver
,
591 old_set
: Option
<MediaSetLabel
>,
593 ) -> Result
<(MediaCatalog
, bool
), Error
> {
597 let new_set
= match media_id
.media_set_label
{
598 None
=> bail
!("got media without media set - internal error"),
599 Some(ref set
) => set
,
602 let key_config
= if let Some(ref fingerprint
) = new_set
.encryption_key_fingerprint
{
603 let (config_map
, _digest
) = load_key_configs()?
;
604 match config_map
.get(fingerprint
) {
605 Some(key_config
) => Some(key_config
.clone()),
607 bail
!("unable to find tape encryption key config '{}'", fingerprint
);
614 let status_path
= Path
::new(TAPE_STATUS_DIR
);
616 let new_media
= match old_set
{
618 task_log
!(worker
, "writing new media set label");
619 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
620 media_catalog
= MediaCatalog
::overwrite(status_path
, media_id
, false)?
;
623 Some(media_set_label
) => {
624 if new_set
.uuid
== media_set_label
.uuid
{
625 if new_set
.seq_nr
!= media_set_label
.seq_nr
{
626 bail
!("got media with wrong media sequence number ({} != {}",
627 new_set
.seq_nr
,media_set_label
.seq_nr
);
629 if new_set
.encryption_key_fingerprint
!= media_set_label
.encryption_key_fingerprint
{
630 bail
!("detected changed encryption fingerprint - internal error");
632 media_catalog
= MediaCatalog
::open(status_path
, &media_id
, true, false)?
;
634 // todo: verify last content/media_catalog somehow?
640 "writing new media set label (overwrite '{}/{}')",
641 media_set_label
.uuid
.to_string(),
642 media_set_label
.seq_nr
,
645 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
646 media_catalog
= MediaCatalog
::overwrite(status_path
, media_id
, false)?
;
652 Ok((media_catalog
, new_media
))