mod catalog_set;
pub use catalog_set::*;

mod new_chunks_iterator;
pub use new_chunks_iterator::*;

use std::collections::HashSet;
use std::fs::File;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, Error};

use proxmox_sys::{task_log, task_warn};
use proxmox_uuid::Uuid;

use pbs_config::tape_encryption_keys::load_key_configs;
use pbs_datastore::{DataStore, SnapshotReader};
use pbs_tape::{sg_tape::tape_alert_flags_critical, TapeWrite};
use proxmox_rest_server::WorkerTask;

use crate::tape::{
    drive::{media_changer, request_and_load_media, TapeDriver},
    file_formats::{
        tape_write_catalog, tape_write_snapshot_archive, ChunkArchiveWriter, MediaSetLabel,
    },
    MediaCatalog, MediaId, MediaPool, COMMIT_BLOCK_SIZE, MAX_CHUNK_ARCHIVE_SIZE, TAPE_STATUS_DIR,
};

use super::file_formats::{
    PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
};
struct PoolWriterState {
    drive: Box<dyn TapeDriver>,
    // Media Uuid from the loaded media
    media_uuid: Uuid,
    // tell if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}

/// Helper to manage a backup job, writing several tapes of a pool
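///
/// A rough usage sketch (illustrative only; job setup and error
/// handling are elided, and `pool`, `worker` and `snapshot_reader`
/// are assumed to exist):
///
/// ```ignore
/// let mut pool_writer = PoolWriter::new(pool, "drive0", &worker, None, false, true)?;
/// let media_uuid = pool_writer.load_writable_media(&worker)?;
/// let (done, _bytes) = pool_writer.append_snapshot_archive(&worker, &snapshot_reader)?;
/// if !done {
///     // media is full - mark it and continue on the next tape
///     pool_writer.set_media_status_full(&media_uuid)?;
/// }
/// pool_writer.commit()?;
/// pool_writer.eject_media(&worker)?;
/// ```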
pub struct PoolWriter {
    pool: MediaPool,
    drive_name: String,
    status: Option<PoolWriterState>,
    catalog_set: Arc<Mutex<CatalogSet>>,
    notify_email: Option<String>,
    ns_magic: bool,
    used_tapes: HashSet<Uuid>,
}

impl PoolWriter {
    pub fn new(
        mut pool: MediaPool,
        drive_name: &str,
        worker: &WorkerTask,
        notify_email: Option<String>,
        force_media_set: bool,
        ns_magic: bool,
    ) -> Result<Self, Error> {
        let current_time = proxmox_time::epoch_i64();

        let new_media_set_reason = pool.start_write_session(current_time, force_media_set)?;
        if let Some(reason) = new_media_set_reason {
            task_log!(worker, "starting new media set - reason: {}", reason);
        }

        let media_set_uuid = pool.current_media_set().uuid();
        task_log!(worker, "media set uuid: {}", media_set_uuid);

        let mut catalog_set = CatalogSet::new();

        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_info = pool.lookup_media(media_uuid).unwrap();
            let media_catalog =
                MediaCatalog::open(Path::new(TAPE_STATUS_DIR), media_info.id(), false, false)?;
            catalog_set.append_read_only_catalog(media_catalog)?;
        }

        Ok(Self {
            pool,
            drive_name: drive_name.to_string(),
            status: None,
            catalog_set: Arc::new(Mutex::new(catalog_set)),
            notify_email,
            ns_magic,
            used_tapes: HashSet::new(),
        })
    }

    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }

    /// Set media status to FULL (persistent - stores pool status)
    pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
        self.pool.set_media_status_full(uuid)?;
        Ok(())
    }

    pub fn get_used_media_labels(&self) -> Result<Vec<String>, Error> {
        let mut res = Vec::with_capacity(self.used_tapes.len());
        for media_uuid in &self.used_tapes {
            let media_info = self.pool.lookup_media(media_uuid)?;
            res.push(media_info.label_text().to_string());
        }

        Ok(res)
    }

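    /// Check whether the given snapshot is already contained in the catalog set.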
    pub fn contains_snapshot(
        &self,
        store: &str,
        ns: &pbs_api_types::BackupNamespace,
        snapshot: &pbs_api_types::BackupDir,
    ) -> bool {
        self.catalog_set
            .lock()
            .unwrap()
            .contains_snapshot(store, ns, snapshot)
    }

    /// Eject media and drop PoolWriterState (close drive)
    pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = pbs_config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            task_log!(worker, "eject media");
            status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            drop(status); // close drive
            task_log!(worker, "unload media");
            changer.unload_media(None)?; // eject and unload
        } else {
            task_log!(worker, "standalone drive - ejecting media");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// Export current media set and drop PoolWriterState (close drive)
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = self.status.take();

        let (drive_config, _digest) = pbs_config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            if let Some(ref mut status) = status {
                task_log!(worker, "rewind media");
                // rewind first so that the unload command later does not run into a timeout
                status.drive.rewind()?;
            }
            drop(status); // close drive

            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
                    task_log!(
                        worker,
                        "exported media '{}' to import/export slot {}",
                        label_text,
                        slot
                    );
                } else {
                    task_warn!(
                        worker,
                        "export failed - media '{}' is not online or in a different drive",
                        label_text
                    );
                }
            }
        } else if let Some(mut status) = status {
            task_log!(worker, "standalone drive - ejecting media instead of export");
            status.drive.eject_media()?;
        }

        Ok(())
    }


    /// commit changes to tape and catalog
    ///
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter.
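    ///
    /// A minimal sketch of the intended call order (tape data is synced
    /// first, then the catalog is committed):
    ///
    /// ```ignore
    /// // ... append snapshot and chunk archives ...
    /// pool_writer.commit()?; // flush drive buffers, then persist the catalog
    /// ```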
    pub fn commit(&mut self) -> Result<(), Error> {
        if let Some(PoolWriterState { ref mut drive, .. }) = self.status {
            drive.sync()?; // sync all data to the tape
        }
        self.catalog_set.lock().unwrap().commit()?; // then commit the catalog
        Ok(())
    }

    /// Load a writable media into the drive
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref media_uuid, .. }) => Some(media_uuid.clone()),
            None => None,
        };

        let current_time = proxmox_time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        if !media_changed {
            self.used_tapes.insert(media_uuid.clone());
            return Ok(media_uuid);
        }

        task_log!(
            worker,
            "allocated new writable media '{}'",
            media.label_text()
        );

        if let Some(PoolWriterState { mut drive, .. }) = self.status.take() {
            if last_media_uuid.is_some() {
                task_log!(worker, "eject current media");
                drive.eject_media()?;
            }
        }

        let (drive_config, _digest) = pbs_config::drive::config()?;

        let (mut drive, old_media_id) = request_and_load_media(
            worker,
            &drive_config,
            &self.drive_name,
            media.label(),
            &self.notify_email,
        )?;

        // test for critical tape alert flags
        if let Ok(alert_flags) = drive.tape_alert_flags() {
            if !alert_flags.is_empty() {
                task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
                if tape_alert_flags_critical(alert_flags) {
                    self.pool.set_media_status_damaged(&media_uuid)?;
                    bail!(
                        "aborting due to critical tape alert flags: {:?}",
                        alert_flags
                    );
                }
            }
        }

        let (catalog, is_new_media) = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        self.catalog_set.lock().unwrap().append_catalog(catalog)?;

        let media_set = media.media_set_label().clone().unwrap();

        let encrypt_fingerprint = media_set
            .encryption_key_fingerprint
            .clone()
            .map(|fp| (fp, media_set.uuid.clone()));

        drive.set_encryption(encrypt_fingerprint)?;

        self.status = Some(PoolWriterState {
            drive,
            media_uuid: media_uuid.clone(),
            at_eom: false,
            bytes_written: 0,
        });

        if is_new_media {
            // add catalogs from previous media
            self.append_media_set_catalogs(worker)?;
        }

        self.used_tapes.insert(media_uuid.clone());
        Ok(media_uuid)
    }

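    // Open the on-disk media catalog file (<TAPE_STATUS_DIR>/<uuid>.log) read-only.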
    fn open_catalog_file(uuid: &Uuid) -> Result<File, Error> {
        let status_path = Path::new(TAPE_STATUS_DIR);
        let mut path = status_path.to_owned();
        path.push(uuid.to_string());
        path.set_extension("log");

        let file = std::fs::OpenOptions::new().read(true).open(&path)?;

        Ok(file)
    }

    // Check if a tape is loaded, then move to EOM (if not already there)
    //
    // Returns the current file number at EOM.
    fn prepare_tape_write(status: &mut PoolWriterState, worker: &WorkerTask) -> Result<u64, Error> {
        if !status.at_eom {
            task_log!(worker, "moving to end of media");
            status.drive.move_to_eom(true)?;
            status.at_eom = true;
            task_log!(worker, "arrived at end of media");
        }

        let current_file_number = status.drive.current_file_number()?;
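        // Note: files 0 and 1 hold the media label and the media set
        // label, so a writable position is expected at file 2 or later.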
        if current_file_number < 2 {
            bail!(
                "got strange file position number from drive ({})",
                current_file_number
            );
        }

        Ok(current_file_number)
    }

    /// Move to EOM (if not already there), then write the current
    /// catalog to the tape. On success, this returns `Ok(true)`.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value `Ok(false)`). In that case, the
    /// archive is marked incomplete. The caller should mark the media
    /// as full and try again using another media.
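    ///
    /// A hedged sketch of the retry pattern described above, assuming
    /// `media_uuid` came from an earlier `load_writable_media` call:
    ///
    /// ```ignore
    /// if !pool_writer.append_catalog_archive(&worker)? {
    ///     // not enough space - mark the media full and continue on the next one
    ///     pool_writer.set_media_status_full(&media_uuid)?;
    ///     pool_writer.load_writable_media(&worker)?;
    /// }
    /// ```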
    pub fn append_catalog_archive(&mut self, worker: &WorkerTask) -> Result<bool, Error> {
        let catalog_magic = self.catalog_version();

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        Self::prepare_tape_write(status, worker)?;

        let catalog_set = self.catalog_set.lock().unwrap();

        let catalog = match catalog_set.catalog {
            None => bail!("append_catalog_archive failed: no catalog - internal error"),
            Some(ref catalog) => catalog,
        };

        let media_set = self.pool.current_media_set();

        let media_list = media_set.media_list();
        let uuid = match media_list.last() {
            None => bail!("got empty media list - internal error"),
            Some(None) => bail!("got incomplete media list - internal error"),
            Some(Some(last_uuid)) => {
                if last_uuid != catalog.uuid() {
                    bail!("got wrong media - internal error");
                }
                last_uuid
            }
        };

        let seq_nr = media_list.len() - 1;

        let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

        let mut file = Self::open_catalog_file(uuid)?;

        let done = tape_write_catalog(
            writer.as_mut(),
            uuid,
            media_set.uuid(),
            seq_nr,
            &mut file,
            catalog_magic,
        )?
        .is_some();

        Ok(done)
    }

    // Append catalogs for all previous media in the set (excluding the last)
    fn append_media_set_catalogs(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let media_set = self.pool.current_media_set();

        let mut media_list = &media_set.media_list()[..];
        if media_list.len() < 2 {
            return Ok(());
        }
        media_list = &media_list[..(media_list.len() - 1)];

        let catalog_magic = self.catalog_version();

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        Self::prepare_tape_write(status, worker)?;

        for (seq_nr, uuid) in media_list.iter().enumerate() {
            let uuid = match uuid {
                None => bail!("got incomplete media list - internal error"),
                Some(uuid) => uuid,
            };

            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            let mut file = Self::open_catalog_file(uuid)?;

            task_log!(worker, "write catalog for previous media: {}", uuid);

            if tape_write_catalog(
                writer.as_mut(),
                uuid,
                media_set.uuid(),
                seq_nr,
                &mut file,
                catalog_magic,
            )?
            .is_none()
            {
                bail!("got EOM while writing start catalog");
            }
        }

        Ok(())
    }

    /// Move to EOM (if not already there), then create a new snapshot
    /// archive writing the specified files (as .pxar) into it. On
    /// success, this returns `Ok((true, _))` and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value `Ok((false, _))`). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
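    ///
    /// A hedged sketch of the caller-side handling, assuming `media_uuid`
    /// and `snapshot_reader` exist in the calling job:
    ///
    /// ```ignore
    /// let (done, _bytes) = pool_writer.append_snapshot_archive(&worker, &snapshot_reader)?;
    /// if !done {
    ///     pool_writer.set_media_status_full(&media_uuid)?;
    ///     pool_writer.load_writable_media(&worker)?;
    ///     // ... retry the snapshot archive on the new media
    /// }
    /// ```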
    pub fn append_snapshot_archive(
        &mut self,
        worker: &WorkerTask,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {
        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        let current_file_number = Self::prepare_tape_write(status, worker)?;

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                Some(content_uuid) => {
                    self.catalog_set.lock().unwrap().register_snapshot(
                        content_uuid,
                        current_file_number,
                        snapshot_reader.datastore_name(),
                        snapshot_reader.snapshot().backup_ns(),
                        snapshot_reader.snapshot().as_ref(),
                    )?;
                    (true, writer.bytes_written())
                }
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        if !done || request_sync {
            self.commit()?;
        }

        Ok((done, bytes_written))
    }

    /// Move to EOM (if not already there), then create a new chunk
    /// archive and write chunks from `chunk_iter`. This stops when
    /// LEOM is detected or when the maximum archive size (4 GB) is
    /// reached. Written chunks are registered in the media catalog.
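    ///
    /// A hedged sketch of the write loop, assuming `datastore`,
    /// `snapshot_reader`, `store` and `media_uuid` are provided by the
    /// surrounding backup job:
    ///
    /// ```ignore
    /// let (reader_thread, chunk_iter) =
    ///     pool_writer.spawn_chunk_reader_thread(datastore, snapshot_reader)?;
    /// let mut chunk_iter = chunk_iter.peekable();
    /// loop {
    ///     let (leom, _bytes) = pool_writer.append_chunk_archive(&worker, &mut chunk_iter, store)?;
    ///     if chunk_iter.peek().is_none() {
    ///         break; // all chunks written
    ///     }
    ///     if leom {
    ///         pool_writer.set_media_status_full(&media_uuid)?;
    ///         pool_writer.load_writable_media(&worker)?;
    ///     }
    /// }
    /// reader_thread.join().ok(); // reap the reader thread
    /// ```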
    pub fn append_chunk_archive(
        &mut self,
        worker: &WorkerTask,
        chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
        store: &str,
    ) -> Result<(bool, usize), Error> {
        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        let current_file_number = Self::prepare_tape_write(status, worker)?;

        let writer = status.drive.write_file()?;

        let start_time = SystemTime::now();

        let (saved_chunks, content_uuid, leom, bytes_written) =
            write_chunk_archive(worker, writer, chunk_iter, store, MAX_CHUNK_ARCHIVE_SIZE)?;

        status.bytes_written += bytes_written;

        let elapsed = start_time.elapsed()?.as_secs_f64();
        task_log!(
            worker,
            "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
            saved_chunks.len(),
            bytes_written as f64 / 1_000_000.0,
            (bytes_written as f64) / (1_000_000.0 * elapsed),
        );

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        self.catalog_set.lock().unwrap().register_chunk_archive(
            content_uuid,
            current_file_number,
            store,
            &saved_chunks,
        )?;

        if leom || request_sync {
            self.commit()?;
        }

        Ok((leom, bytes_written))
    }

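    /// Spawn a reader thread that feeds the snapshot's chunks through a
    /// `NewChunksIterator`, skipping chunks already present in the
    /// catalog set.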
    pub fn spawn_chunk_reader_thread(
        &self,
        datastore: Arc<DataStore>,
        snapshot_reader: Arc<Mutex<SnapshotReader>>,
    ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
        NewChunksIterator::spawn(datastore, snapshot_reader, Arc::clone(&self.catalog_set))
    }

    pub(crate) fn catalog_version(&self) -> [u8; 8] {
        if self.ns_magic {
            PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1
        } else {
            PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
        }
    }
}

/// Write up to `max_size` bytes of chunks into a new chunk archive.
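///
/// Returns `(chunk_digests, content_uuid, leom, bytes_written)`; `leom`
/// is true when the drive reported end-of-media, in which case the
/// pending chunk is left unconsumed in `chunk_iter`.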
fn write_chunk_archive<'a>(
    _worker: &WorkerTask,
    writer: Box<dyn 'a + TapeWrite>,
    chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
    store: &str,
    max_size: usize,
) -> Result<(Vec<[u8; 32]>, Uuid, bool, usize), Error> {
    let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?;

    // we want to get the chunk list in correct order
    let mut chunk_list: Vec<[u8; 32]> = Vec::new();

    let mut leom = false;

    loop {
        let (digest, blob) = match chunk_iter.peek() {
            None => break,
            Some(Ok((digest, blob))) => (digest, blob),
            Some(Err(err)) => bail!("{}", err),
        };

        //println!("CHUNK {} size {}", hex::encode(digest), blob.raw_size());

        match writer.try_write_chunk(digest, blob) {
            Ok(true) => {
                chunk_list.push(*digest);
                chunk_iter.next(); // consume
            }
            Ok(false) => {
                // Note: we do not consume the chunk (no chunk_iter.next())
                leom = true;
                break;
            }
            Err(err) => bail!("write chunk failed - {}", err),
        }

        if writer.bytes_written() > max_size {
            //task_log!(worker, "Chunk Archive max size reached, closing archive");
            break;
        }
    }

    writer.finish()?;

    Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
}

// Compare the media set label. If the media is empty, or the existing
// set label does not match the expected media set, overwrite the
// media set label.
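//
// Three cases are handled:
//  - no existing label (new media): write the new set label and start a fresh catalog
//  - same media set uuid: validate the sequence number and encryption fingerprint, then reopen the catalog
//  - different media set: overwrite both the label and the catalog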
fn update_media_set_label(
    worker: &WorkerTask,
    drive: &mut dyn TapeDriver,
    old_set: Option<MediaSetLabel>,
    media_id: &MediaId,
) -> Result<(MediaCatalog, bool), Error> {
    let media_catalog;

    let new_set = match media_id.media_set_label {
        None => bail!("got media without media set - internal error"),
        Some(ref set) => set,
    };

    let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
        let (config_map, _digest) = load_key_configs()?;
        match config_map.get(fingerprint) {
            Some(key_config) => Some(key_config.clone()),
            None => {
                bail!(
                    "unable to find tape encryption key config '{}'",
                    fingerprint
                );
            }
        }
    } else {
        None
    };

    let status_path = Path::new(TAPE_STATUS_DIR);

    let new_media = match old_set {
        None => {
            task_log!(worker, "writing new media set label");
            drive.write_media_set_label(new_set, key_config.as_ref())?;
            media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
            true
        }
        Some(media_set_label) => {
            if new_set.uuid == media_set_label.uuid {
                if new_set.seq_nr != media_set_label.seq_nr {
                    bail!(
                        "got media with wrong media sequence number ({} != {})",
                        new_set.seq_nr,
                        media_set_label.seq_nr
                    );
                }
                if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint
                {
                    bail!("detected changed encryption fingerprint - internal error");
                }
                media_catalog = MediaCatalog::open(status_path, media_id, true, false)?;

                // todo: verify last content/media_catalog somehow?

                false
            } else {
                task_log!(
                    worker,
                    "writing new media set label (overwrite '{}/{}')",
                    media_set_label.uuid.to_string(),
                    media_set_label.seq_nr,
                );

                drive.write_media_set_label(new_set, key_config.as_ref())?;
                media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
                true
            }
        }
    };

    Ok((media_catalog, new_media))
}