2 pub use catalog_set
::*;
4 mod new_chunks_iterator
;
5 pub use new_chunks_iterator
::*;
9 use std
::time
::SystemTime
;
10 use std
::sync
::{Arc, Mutex}
;
12 use anyhow
::{bail, Error}
;
14 use proxmox
::tools
::Uuid
;
16 use pbs_datastore
::task_log
;
25 MAX_CHUNK_ARCHIVE_SIZE
,
35 tape_write_snapshot_archive
,
40 request_and_load_media
,
41 tape_alert_flags_critical
,
45 config
::tape_encryption_keys
::load_key_configs
,
49 struct PoolWriterState
{
50 drive
: Box
<dyn TapeDriver
>,
51 // Media Uuid from loaded media
53 // tell if we already moved to EOM
55 // bytes written after the last tape fush/sync
59 /// Helper to manage a backup job, writing several tapes of a pool
60 pub struct PoolWriter
{
63 status
: Option
<PoolWriterState
>,
64 catalog_set
: Arc
<Mutex
<CatalogSet
>>,
65 notify_email
: Option
<String
>,
74 notify_email
: Option
<String
>,
75 force_media_set
: bool
,
76 ) -> Result
<Self, Error
> {
78 let current_time
= proxmox
::tools
::time
::epoch_i64();
80 let new_media_set_reason
= pool
.start_write_session(current_time
, force_media_set
)?
;
81 if let Some(reason
) = new_media_set_reason
{
84 "starting new media set - reason: {}",
89 let media_set_uuid
= pool
.current_media_set().uuid();
90 task_log
!(worker
, "media set uuid: {}", media_set_uuid
);
92 let mut catalog_set
= CatalogSet
::new();
94 // load all catalogs read-only at start
95 for media_uuid
in pool
.current_media_list()?
{
96 let media_info
= pool
.lookup_media(media_uuid
).unwrap();
97 let media_catalog
= MediaCatalog
::open(
98 Path
::new(TAPE_STATUS_DIR
),
103 catalog_set
.append_read_only_catalog(media_catalog
)?
;
108 drive_name
: drive_name
.to_string(),
110 catalog_set
: Arc
::new(Mutex
::new(catalog_set
)),
115 pub fn pool(&mut self) -> &mut MediaPool
{
119 /// Set media status to FULL (persistent - stores pool status)
120 pub fn set_media_status_full(&mut self, uuid
: &Uuid
) -> Result
<(), Error
> {
121 self.pool
.set_media_status_full(&uuid
)?
;
125 pub fn contains_snapshot(&self, store
: &str, snapshot
: &str) -> bool
{
126 self.catalog_set
.lock().unwrap().contains_snapshot(store
, snapshot
)
129 /// Eject media and drop PoolWriterState (close drive)
130 pub fn eject_media(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
131 let mut status
= match self.status
.take() {
132 Some(status
) => status
,
133 None
=> return Ok(()), // no media loaded
136 let (drive_config
, _digest
) = crate::config
::drive
::config()?
;
138 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
139 worker
.log("eject media");
140 status
.drive
.eject_media()?
; // rewind and eject early, so that unload_media is faster
141 drop(status
); // close drive
142 worker
.log("unload media");
143 changer
.unload_media(None
)?
; //eject and unload
145 worker
.log("standalone drive - ejecting media");
146 status
.drive
.eject_media()?
;
152 /// Export current media set and drop PoolWriterState (close drive)
153 pub fn export_media_set(&mut self, worker
: &WorkerTask
) -> Result
<(), Error
> {
154 let mut status
= self.status
.take();
156 let (drive_config
, _digest
) = crate::config
::drive
::config()?
;
158 if let Some((mut changer
, _
)) = media_changer(&drive_config
, &self.drive_name
)?
{
160 if let Some(ref mut status
) = status
{
161 worker
.log("eject media");
162 status
.drive
.eject_media()?
; // rewind and eject early, so that unload_media is faster
164 drop(status
); // close drive
166 worker
.log("unload media");
167 changer
.unload_media(None
)?
;
169 for media_uuid
in self.pool
.current_media_list()?
{
170 let media
= self.pool
.lookup_media(media_uuid
)?
;
171 let label_text
= media
.label_text();
172 if let Some(slot
) = changer
.export_media(label_text
)?
{
173 worker
.log(format
!("exported media '{}' to import/export slot {}", label_text
, slot
));
175 worker
.warn(format
!("export failed - media '{}' is not online", label_text
));
179 } else if let Some(mut status
) = status
{
180 worker
.log("standalone drive - ejecting media instead of export");
181 status
.drive
.eject_media()?
;
187 /// commit changes to tape and catalog
189 /// This is done automatically during a backupsession, but needs to
190 /// be called explicitly before dropping the PoolWriter
191 pub fn commit(&mut self) -> Result
<(), Error
> {
192 if let Some(PoolWriterState {ref mut drive, .. }
) = self.status
{
193 drive
.sync()?
; // sync all data to the tape
195 self.catalog_set
.lock().unwrap().commit()?
; // then commit the catalog
199 /// Load a writable media into the drive
200 pub fn load_writable_media(&mut self, worker
: &WorkerTask
) -> Result
<Uuid
, Error
> {
201 let last_media_uuid
= match self.status
{
202 Some(PoolWriterState { ref media_uuid, ..}
) => Some(media_uuid
.clone()),
206 let current_time
= proxmox
::tools
::time
::epoch_i64();
207 let media_uuid
= self.pool
.alloc_writable_media(current_time
)?
;
209 let media
= self.pool
.lookup_media(&media_uuid
).unwrap();
211 let media_changed
= match last_media_uuid
{
212 Some(ref last_media_uuid
) => last_media_uuid
!= &media_uuid
,
217 return Ok(media_uuid
);
220 task_log
!(worker
, "allocated new writable media '{}'", media
.label_text());
222 if let Some(PoolWriterState {mut drive, .. }
) = self.status
.take() {
223 if last_media_uuid
.is_some() {
224 task_log
!(worker
, "eject current media");
225 drive
.eject_media()?
;
229 let (drive_config
, _digest
) = crate::config
::drive
::config()?
;
231 let (mut drive
, old_media_id
) =
232 request_and_load_media(worker
, &drive_config
, &self.drive_name
, media
.label(), &self.notify_email
)?
;
234 // test for critical tape alert flags
235 if let Ok(alert_flags
) = drive
.tape_alert_flags() {
236 if !alert_flags
.is_empty() {
237 worker
.log(format
!("TapeAlertFlags: {:?}", alert_flags
));
238 if tape_alert_flags_critical(alert_flags
) {
239 self.pool
.set_media_status_damaged(&media_uuid
)?
;
240 bail
!("aborting due to critical tape alert flags: {:?}", alert_flags
);
245 let (catalog
, is_new_media
) = update_media_set_label(
248 old_media_id
.media_set_label
,
252 self.catalog_set
.lock().unwrap().append_catalog(catalog
)?
;
254 let media_set
= media
.media_set_label().clone().unwrap();
256 let encrypt_fingerprint
= media_set
257 .encryption_key_fingerprint
259 .map(|fp
| (fp
, media_set
.uuid
.clone()));
261 drive
.set_encryption(encrypt_fingerprint
)?
;
263 self.status
= Some(PoolWriterState
{
265 media_uuid
: media_uuid
.clone(),
271 // add catalogs from previous media
272 self.append_media_set_catalogs(worker
)?
;
278 fn open_catalog_file(uuid
: &Uuid
) -> Result
<File
, Error
> {
280 let status_path
= Path
::new(TAPE_STATUS_DIR
);
281 let mut path
= status_path
.to_owned();
282 path
.push(uuid
.to_string());
283 path
.set_extension("log");
285 let file
= std
::fs
::OpenOptions
::new()
292 // Check it tape is loaded, then move to EOM (if not already there)
294 // Returns the tape position at EOM.
295 fn prepare_tape_write(
296 status
: &mut PoolWriterState
,
298 ) -> Result
<u64, Error
> {
301 worker
.log(String
::from("moving to end of media"));
302 status
.drive
.move_to_eom(true)?
;
303 status
.at_eom
= true;
306 let current_file_number
= status
.drive
.current_file_number()?
;
307 if current_file_number
< 2 {
308 bail
!("got strange file position number from drive ({})", current_file_number
);
311 Ok(current_file_number
)
314 /// Move to EOM (if not already there), then write the current
315 /// catalog to the tape. On success, this return 'Ok(true)'.
317 /// Please note that this may fail when there is not enough space
318 /// on the media (return value 'Ok(false, _)'). In that case, the
319 /// archive is marked incomplete. The caller should mark the media
320 /// as full and try again using another media.
321 pub fn append_catalog_archive(
324 ) -> Result
<bool
, Error
> {
326 let status
= match self.status
{
327 Some(ref mut status
) => status
,
328 None
=> bail
!("PoolWriter - no media loaded"),
331 Self::prepare_tape_write(status
, worker
)?
;
333 let catalog_set
= self.catalog_set
.lock().unwrap();
335 let catalog
= match catalog_set
.catalog
{
336 None
=> bail
!("append_catalog_archive failed: no catalog - internal error"),
337 Some(ref catalog
) => catalog
,
340 let media_set
= self.pool
.current_media_set();
342 let media_list
= media_set
.media_list();
343 let uuid
= match media_list
.last() {
344 None
=> bail
!("got empty media list - internal error"),
345 Some(None
) => bail
!("got incomplete media list - internal error"),
346 Some(Some(last_uuid
)) => {
347 if last_uuid
!= catalog
.uuid() {
348 bail
!("got wrong media - internal error");
354 let seq_nr
= media_list
.len() - 1;
356 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
358 let mut file
= Self::open_catalog_file(uuid
)?
;
360 let done
= tape_write_catalog(
371 // Append catalogs for all previous media in set (without last)
372 fn append_media_set_catalogs(
375 ) -> Result
<(), Error
> {
377 let media_set
= self.pool
.current_media_set();
379 let mut media_list
= &media_set
.media_list()[..];
380 if media_list
.len() < 2 {
383 media_list
= &media_list
[..(media_list
.len()-1)];
385 let status
= match self.status
{
386 Some(ref mut status
) => status
,
387 None
=> bail
!("PoolWriter - no media loaded"),
390 Self::prepare_tape_write(status
, worker
)?
;
392 for (seq_nr
, uuid
) in media_list
.iter().enumerate() {
394 let uuid
= match uuid
{
395 None
=> bail
!("got incomplete media list - internal error"),
399 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
401 let mut file
= Self::open_catalog_file(uuid
)?
;
403 task_log
!(worker
, "write catalog for previous media: {}", uuid
);
405 if tape_write_catalog(
412 bail
!("got EOM while writing start catalog");
419 /// Move to EOM (if not already there), then creates a new snapshot
420 /// archive writing specified files (as .pxar) into it. On
421 /// success, this return 'Ok(true)' and the media catalog gets
424 /// Please note that this may fail when there is not enough space
425 /// on the media (return value 'Ok(false, _)'). In that case, the
426 /// archive is marked incomplete, and we do not use it. The caller
427 /// should mark the media as full and try again using another
429 pub fn append_snapshot_archive(
432 snapshot_reader
: &SnapshotReader
,
433 ) -> Result
<(bool
, usize), Error
> {
435 let status
= match self.status
{
436 Some(ref mut status
) => status
,
437 None
=> bail
!("PoolWriter - no media loaded"),
440 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
442 let (done
, bytes_written
) = {
443 let mut writer
: Box
<dyn TapeWrite
> = status
.drive
.write_file()?
;
445 match tape_write_snapshot_archive(writer
.as_mut(), snapshot_reader
)?
{
446 Some(content_uuid
) => {
447 self.catalog_set
.lock().unwrap().register_snapshot(
450 &snapshot_reader
.datastore_name().to_string(),
451 &snapshot_reader
.snapshot().to_string(),
453 (true, writer
.bytes_written())
455 None
=> (false, writer
.bytes_written()),
459 status
.bytes_written
+= bytes_written
;
461 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
463 if !done
|| request_sync
{
467 Ok((done
, bytes_written
))
470 /// Move to EOM (if not already there), then creates a new chunk
471 /// archive and writes chunks from 'chunk_iter'. This stops when
472 /// it detect LEOM or when we reach max archive size
473 /// (4GB). Written chunks are registered in the media catalog.
474 pub fn append_chunk_archive(
477 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
479 ) -> Result
<(bool
, usize), Error
> {
481 let status
= match self.status
{
482 Some(ref mut status
) => status
,
483 None
=> bail
!("PoolWriter - no media loaded"),
486 let current_file_number
= Self::prepare_tape_write(status
, worker
)?
;
488 let writer
= status
.drive
.write_file()?
;
490 let start_time
= SystemTime
::now();
492 let (saved_chunks
, content_uuid
, leom
, bytes_written
) = write_chunk_archive(
497 MAX_CHUNK_ARCHIVE_SIZE
,
500 status
.bytes_written
+= bytes_written
;
502 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
504 "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
506 bytes_written
as f64 /1_000_000.0,
507 (bytes_written
as f64)/(1_000_000.0*elapsed
),
510 let request_sync
= status
.bytes_written
>= COMMIT_BLOCK_SIZE
;
512 // register chunks in media_catalog
513 self.catalog_set
.lock().unwrap()
514 .register_chunk_archive(content_uuid
, current_file_number
, store
, &saved_chunks
)?
;
516 if leom
|| request_sync
{
520 Ok((leom
, bytes_written
))
523 pub fn spawn_chunk_reader_thread(
525 datastore
: Arc
<DataStore
>,
526 snapshot_reader
: Arc
<Mutex
<SnapshotReader
>>,
527 ) -> Result
<(std
::thread
::JoinHandle
<()>, NewChunksIterator
), Error
> {
528 NewChunksIterator
::spawn(
531 Arc
::clone(&self.catalog_set
),
536 /// write up to <max_size> of chunks
537 fn write_chunk_archive
<'a
>(
538 _worker
: &WorkerTask
,
539 writer
: Box
<dyn 'a
+ TapeWrite
>,
540 chunk_iter
: &mut std
::iter
::Peekable
<NewChunksIterator
>,
543 ) -> Result
<(Vec
<[u8;32]>, Uuid
, bool
, usize), Error
> {
545 let (mut writer
, content_uuid
) = ChunkArchiveWriter
::new(writer
, store
, true)?
;
547 // we want to get the chunk list in correct order
548 let mut chunk_list
: Vec
<[u8;32]> = Vec
::new();
550 let mut leom
= false;
553 let (digest
, blob
) = match chunk_iter
.peek() {
555 Some(Ok((digest
, blob
))) => (digest
, blob
),
556 Some(Err(err
)) => bail
!("{}", err
),
559 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
561 match writer
.try_write_chunk(&digest
, &blob
) {
563 chunk_list
.push(*digest
);
564 chunk_iter
.next(); // consume
567 // Note; we do not consume the chunk (no chunk_iter.next())
571 Err(err
) => bail
!("write chunk failed - {}", err
),
574 if writer
.bytes_written() > max_size
{
575 //worker.log("Chunk Archive max size reached, closing archive".to_string());
582 Ok((chunk_list
, content_uuid
, leom
, writer
.bytes_written()))
585 // Compare the media set label. If the media is empty, or the existing
586 // set label does not match the expected media set, overwrite the
588 fn update_media_set_label(
590 drive
: &mut dyn TapeDriver
,
591 old_set
: Option
<MediaSetLabel
>,
593 ) -> Result
<(MediaCatalog
, bool
), Error
> {
597 let new_set
= match media_id
.media_set_label
{
598 None
=> bail
!("got media without media set - internal error"),
599 Some(ref set
) => set
,
602 let key_config
= if let Some(ref fingerprint
) = new_set
.encryption_key_fingerprint
{
603 let (config_map
, _digest
) = load_key_configs()?
;
604 match config_map
.get(fingerprint
) {
605 Some(key_config
) => Some(key_config
.clone()),
607 bail
!("unable to find tape encryption key config '{}'", fingerprint
);
614 let status_path
= Path
::new(TAPE_STATUS_DIR
);
616 let new_media
= match old_set
{
618 worker
.log("wrinting new media set label".to_string());
619 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
620 media_catalog
= MediaCatalog
::overwrite(status_path
, media_id
, false)?
;
623 Some(media_set_label
) => {
624 if new_set
.uuid
== media_set_label
.uuid
{
625 if new_set
.seq_nr
!= media_set_label
.seq_nr
{
626 bail
!("got media with wrong media sequence number ({} != {}",
627 new_set
.seq_nr
,media_set_label
.seq_nr
);
629 if new_set
.encryption_key_fingerprint
!= media_set_label
.encryption_key_fingerprint
{
630 bail
!("detected changed encryption fingerprint - internal error");
632 media_catalog
= MediaCatalog
::open(status_path
, &media_id
, true, false)?
;
634 // todo: verify last content/media_catalog somehow?
639 format
!("wrinting new media set label (overwrite '{}/{}')",
640 media_set_label
.uuid
.to_string(), media_set_label
.seq_nr
)
643 drive
.write_media_set_label(new_set
, key_config
.as_ref())?
;
644 media_catalog
= MediaCatalog
::overwrite(status_path
, media_id
, false)?
;
650 Ok((media_catalog
, new_media
))