proxmox-backup: src/tape/pool_writer/mod.rs (as of commit "fix #5229: tape: remove max sequence number limit")
1 mod catalog_set;
2 pub use catalog_set::*;
3
4 mod new_chunks_iterator;
5 pub use new_chunks_iterator::*;
6
7 use std::collections::HashSet;
8 use std::fs::File;
9 use std::path::PathBuf;
10 use std::sync::{Arc, Mutex};
11 use std::time::SystemTime;
12
13 use anyhow::{bail, Error};
14
15 use proxmox_sys::{task_log, task_warn};
16 use proxmox_uuid::Uuid;
17
18 use pbs_datastore::{DataStore, SnapshotReader};
19 use pbs_tape::{sg_tape::tape_alert_flags_critical, TapeWrite};
20 use proxmox_rest_server::WorkerTask;
21
22 use crate::tape::{
23 drive::{media_changer, request_and_load_media, TapeDriver},
24 encryption_keys::load_key_configs,
25 file_formats::{
26 tape_write_catalog, tape_write_snapshot_archive, ChunkArchiveWriter, MediaSetLabel,
27 },
28 MediaCatalog, MediaId, MediaPool, COMMIT_BLOCK_SIZE, MAX_CHUNK_ARCHIVE_SIZE, TAPE_STATUS_DIR,
29 };
30
31 use super::file_formats::{
32 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
33 };
34
35 // Warn when the sequence number reaches this limit, as large
36 // media sets are error prone and take a very long time to restore from.
37 const MEDIA_SET_SEQ_NR_WARN_LIMIT: u64 = 20;
38
39 struct PoolWriterState {
40 drive: Box<dyn TapeDriver>,
42 // media UUID of the currently loaded media
43 media_uuid: Uuid,
44 // whether we already moved to EOM
45 at_eom: bool,
46 // bytes written since the last tape flush/sync
46 bytes_written: usize,
47 }
48
49 /// Helper to manage a backup job, writing to several tapes of a media pool
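///
/// A rough usage sketch of the overall flow (illustrative only; the
/// `pool`, `worker` and `snapshot_reader` setup as well as the drive
/// name are assumptions, not part of this module):
///
/// ```ignore
/// let mut pool_writer = PoolWriter::new(pool, "drive0", &worker, None, false, true)?;
///
/// // make sure a writable tape of the current media set is loaded
/// let media_uuid = pool_writer.load_writable_media(&worker)?;
///
/// let (done, _bytes) = pool_writer.append_snapshot_archive(&worker, &snapshot_reader)?;
/// if !done {
///     // not enough space left - mark this tape as full and retry on the next one
///     pool_writer.set_media_status_full(&media_uuid)?;
/// }
///
/// // flush tape buffers and persist the catalog before dropping the writer
/// pool_writer.commit()?;
/// pool_writer.eject_media(&worker)?;
/// ```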
50 pub struct PoolWriter {
51 pool: MediaPool,
52 drive_name: String,
53 status: Option<PoolWriterState>,
54 catalog_set: Arc<Mutex<CatalogSet>>,
55 notify_email: Option<String>,
56 ns_magic: bool,
57 used_tapes: HashSet<Uuid>,
58 }
59
60 impl PoolWriter {
61 pub fn new(
62 mut pool: MediaPool,
63 drive_name: &str,
64 worker: &WorkerTask,
65 notify_email: Option<String>,
66 force_media_set: bool,
67 ns_magic: bool,
68 ) -> Result<Self, Error> {
69 let current_time = proxmox_time::epoch_i64();
70
71 let new_media_set_reason = pool.start_write_session(current_time, force_media_set)?;
72 if let Some(reason) = new_media_set_reason {
73 task_log!(worker, "starting new media set - reason: {}", reason,);
74 }
75
76 let media_set_uuid = pool.current_media_set().uuid();
77 task_log!(worker, "media set uuid: {}", media_set_uuid);
78
79 let mut catalog_set = CatalogSet::new();
80
81 // load all catalogs read-only at start
82 for media_uuid in pool.current_media_list()? {
83 let media_info = pool.lookup_media(media_uuid).unwrap();
84 let media_catalog = MediaCatalog::open(TAPE_STATUS_DIR, media_info.id(), false, false)?;
85 catalog_set.append_read_only_catalog(media_catalog)?;
86 }
87
88 Ok(Self {
89 pool,
90 drive_name: drive_name.to_string(),
91 status: None,
92 catalog_set: Arc::new(Mutex::new(catalog_set)),
93 notify_email,
94 ns_magic,
95 used_tapes: HashSet::new(),
96 })
97 }
98
99 pub fn pool(&mut self) -> &mut MediaPool {
100 &mut self.pool
101 }
102
103 /// Set media status to FULL (persistent - stores pool status)
104 pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
105 self.pool.set_media_status_full(uuid)?;
106 Ok(())
107 }
108
109 pub fn get_used_media_labels(&self) -> Result<Vec<String>, Error> {
110 let mut res = Vec::with_capacity(self.used_tapes.len());
111 for media_uuid in &self.used_tapes {
112 let media_info = self.pool.lookup_media(media_uuid)?;
113 res.push(media_info.label_text().to_string());
114 }
115
116 Ok(res)
117 }
118
119 pub fn contains_snapshot(
120 &self,
121 store: &str,
122 ns: &pbs_api_types::BackupNamespace,
123 snapshot: &pbs_api_types::BackupDir,
124 ) -> bool {
125 self.catalog_set
126 .lock()
127 .unwrap()
128 .contains_snapshot(store, ns, snapshot)
129 }
130
131 /// Eject media and drop PoolWriterState (close drive)
132 pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
133 let mut status = match self.status.take() {
134 Some(status) => status,
135 None => return Ok(()), // no media loaded
136 };
137
138 let (drive_config, _digest) = pbs_config::drive::config()?;
139
140 if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
141 task_log!(worker, "eject media");
142 status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
143 drop(status); // close drive
144 task_log!(worker, "unload media");
145 changer.unload_media(None)?; // eject and unload
146 } else {
147 task_log!(worker, "standalone drive - ejecting media");
148 status.drive.eject_media()?;
149 }
150
151 Ok(())
152 }
153
154 /// Export current media set and drop PoolWriterState (close drive)
155 pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
156 let mut status = self.status.take();
157
158 let (drive_config, _digest) = pbs_config::drive::config()?;
159
160 if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
161 if let Some(ref mut status) = status {
162 task_log!(worker, "rewind media");
163 // rewind first so that the unload command later does not run into a timeout
164 status.drive.rewind()?;
165 }
166 drop(status); // close drive
167
168 for media_uuid in self.pool.current_media_list()? {
169 let media = self.pool.lookup_media(media_uuid)?;
170 let label_text = media.label_text();
171 if let Some(slot) = changer.export_media(label_text)? {
172 task_log!(
173 worker,
174 "exported media '{}' to import/export slot {}",
175 label_text,
176 slot
177 );
178 } else {
179 task_warn!(
180 worker,
181 "export failed - media '{}' is not online or in different drive",
182 label_text
183 );
184 }
185 }
186 } else if let Some(mut status) = status {
187 task_log!(
188 worker,
189 "standalone drive - ejecting media instead of export"
190 );
191 status.drive.eject_media()?;
192 }
193
194 Ok(())
195 }
196
197 /// commit changes to tape and catalog
198 ///
199 /// This is done automatically during a backup session, but needs to
200 /// be called explicitly before dropping the PoolWriter
201 pub fn commit(&mut self) -> Result<(), Error> {
202 if let Some(PoolWriterState { ref mut drive, .. }) = self.status {
203 drive.sync()?; // sync all data to the tape
204 }
205 self.catalog_set.lock().unwrap().commit()?; // then commit the catalog
206 Ok(())
207 }
208
209 /// Load a writable media into the drive
210 pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
211 let last_media_uuid = match self.status {
212 Some(PoolWriterState { ref media_uuid, .. }) => Some(media_uuid.clone()),
213 None => None,
214 };
215
216 let current_time = proxmox_time::epoch_i64();
217 let media_uuid = self.pool.alloc_writable_media(current_time)?;
218
219 let media = self.pool.lookup_media(&media_uuid).unwrap();
220
221 let media_changed = match last_media_uuid {
222 Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
223 None => true,
224 };
225
226 if !media_changed {
227 self.used_tapes.insert(media_uuid.clone());
228 return Ok(media_uuid);
229 }
230
231 task_log!(
232 worker,
233 "allocated new writable media '{}'",
234 media.label_text()
235 );
236
237 if let Some(PoolWriterState { mut drive, .. }) = self.status.take() {
238 if last_media_uuid.is_some() {
239 task_log!(worker, "eject current media");
240 drive.eject_media()?;
241 }
242 }
243
244 let (drive_config, _digest) = pbs_config::drive::config()?;
245
246 let (mut drive, old_media_id) = request_and_load_media(
247 worker,
248 &drive_config,
249 &self.drive_name,
250 media.label(),
251 &self.notify_email,
252 )?;
253
254 // test for critical tape alert flags
255 if let Ok(alert_flags) = drive.tape_alert_flags() {
256 if !alert_flags.is_empty() {
257 task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
258 if tape_alert_flags_critical(alert_flags) {
259 self.pool.set_media_status_damaged(&media_uuid)?;
260 bail!(
261 "aborting due to critical tape alert flags: {:?}",
262 alert_flags
263 );
264 }
265 }
266 }
267
268 let (catalog, is_new_media) = update_media_set_label(
269 worker,
270 drive.as_mut(),
271 old_media_id.media_set_label,
272 media.id(),
273 )?;
274
275 self.catalog_set.lock().unwrap().append_catalog(catalog)?;
276
277 let media_set = media.media_set_label().unwrap();
278
279 if is_new_media && media_set.seq_nr >= MEDIA_SET_SEQ_NR_WARN_LIMIT {
280 task_warn!(
281 worker,
282 "large media-set detected ({}), consider using a different allocation policy",
283 media_set.seq_nr
284 );
285 }
286
287 drive.assert_encryption_mode(media_set.encryption_key_fingerprint.is_some())?;
288
289 self.status = Some(PoolWriterState {
290 drive,
291 media_uuid: media_uuid.clone(),
292 at_eom: false,
293 bytes_written: 0,
294 });
295
296 if is_new_media {
297 // add catalogs from previous media
298 self.append_media_set_catalogs(worker)?;
299 }
300
301 self.used_tapes.insert(media_uuid.clone());
302 Ok(media_uuid)
303 }
304
305 fn open_catalog_file(uuid: &Uuid) -> Result<File, Error> {
306 let mut path = PathBuf::from(TAPE_STATUS_DIR);
307 path.push(uuid.to_string());
308 path.set_extension("log");
309
310 let file = std::fs::OpenOptions::new().read(true).open(&path)?;
311
312 Ok(file)
313 }
314
315 // Check if a tape is loaded, then move to EOM (if not already there)
316 //
317 // Returns the tape position at EOM.
318 fn prepare_tape_write(status: &mut PoolWriterState, worker: &WorkerTask) -> Result<u64, Error> {
319 if !status.at_eom {
320 task_log!(worker, "moving to end of media");
321 status.drive.move_to_eom(true)?;
322 status.at_eom = true;
323 task_log!(worker, "arrived at end of media");
324 }
325
326 let current_file_number = status.drive.current_file_number()?;
327 if current_file_number < 2 {
328 bail!(
329 "got strange file position number from drive ({})",
330 current_file_number
331 );
332 }
333
334 Ok(current_file_number)
335 }
336
337 /// Move to EOM (if not already there), then write the current
338 /// catalog to the tape. On success, this returns 'Ok(true)'.
339 ///
340 /// Please note that this may fail when there is not enough space
341 /// on the media (return value 'Ok(false)'). In that case, the
342 /// archive is marked incomplete. The caller should mark the media
343 /// as full and try again using another media.
344 pub fn append_catalog_archive(&mut self, worker: &WorkerTask) -> Result<bool, Error> {
345 let catalog_magic = self.catalog_version();
346
347 let status = match self.status {
348 Some(ref mut status) => status,
349 None => bail!("PoolWriter - no media loaded"),
350 };
351
352 Self::prepare_tape_write(status, worker)?;
353
354 let catalog_set = self.catalog_set.lock().unwrap();
355
356 let catalog = match catalog_set.catalog {
357 None => bail!("append_catalog_archive failed: no catalog - internal error"),
358 Some(ref catalog) => catalog,
359 };
360
361 let media_set = self.pool.current_media_set();
362
363 let media_list = media_set.media_list();
364 let uuid = match media_list.last() {
365 None => bail!("got empty media list - internal error"),
366 Some(None) => bail!("got incomplete media list - internal error"),
367 Some(Some(last_uuid)) => {
368 if last_uuid != catalog.uuid() {
369 bail!("got wrong media - internal error");
370 }
371 last_uuid
372 }
373 };
374
375 let seq_nr = media_list.len() - 1;
376
377 let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;
378
379 let mut file = Self::open_catalog_file(uuid)?;
380
381 let done = tape_write_catalog(
382 writer.as_mut(),
383 uuid,
384 media_set.uuid(),
385 seq_nr,
386 &mut file,
387 catalog_magic,
388 )?
389 .is_some();
390
391 Ok(done)
392 }
393
394 // Append catalogs for all previous media in the set (excluding the last one)
395 fn append_media_set_catalogs(&mut self, worker: &WorkerTask) -> Result<(), Error> {
396 let media_set = self.pool.current_media_set();
397
398 let mut media_list = media_set.media_list();
399 if media_list.len() < 2 {
400 return Ok(());
401 }
402 media_list = &media_list[..(media_list.len() - 1)];
403
404 let catalog_magic = self.catalog_version();
405
406 let status = match self.status {
407 Some(ref mut status) => status,
408 None => bail!("PoolWriter - no media loaded"),
409 };
410
411 Self::prepare_tape_write(status, worker)?;
412
413 for (seq_nr, uuid) in media_list.iter().enumerate() {
414 let uuid = match uuid {
415 None => bail!("got incomplete media list - internal error"),
416 Some(uuid) => uuid,
417 };
418
419 let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;
420
421 let mut file = Self::open_catalog_file(uuid)?;
422
423 task_log!(worker, "write catalog for previous media: {}", uuid);
424
425 if tape_write_catalog(
426 writer.as_mut(),
427 uuid,
428 media_set.uuid(),
429 seq_nr,
430 &mut file,
431 catalog_magic,
432 )?
433 .is_none()
434 {
435 bail!("got EOM while writing start catalog");
436 }
437 }
438
439 Ok(())
440 }
441
442 /// Move to EOM (if not already there), then create a new snapshot
443 /// archive writing the specified files (as .pxar) into it. On
444 /// success, this returns 'Ok((true, bytes_written))' and the media
445 /// catalog gets updated.
446 ///
447 /// Please note that this may fail when there is not enough space
448 /// on the media (return value 'Ok((false, _))'). In that case, the
449 /// archive is marked incomplete, and we do not use it. The caller
450 /// should mark the media as full and try again using another
451 /// media.
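///
/// A hedged sketch of the caller-side retry loop (the `pool_writer`,
/// `worker` and `snapshot_reader` values are assumed to exist):
///
/// ```ignore
/// loop {
///     let media_uuid = pool_writer.load_writable_media(&worker)?;
///     let (done, _bytes) = pool_writer.append_snapshot_archive(&worker, &snapshot_reader)?;
///     if done {
///         break; // snapshot archive completely written
///     }
///     // ran out of space - mark the current tape as full and try the next one
///     pool_writer.set_media_status_full(&media_uuid)?;
/// }
/// ```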
452 pub fn append_snapshot_archive(
453 &mut self,
454 worker: &WorkerTask,
455 snapshot_reader: &SnapshotReader,
456 ) -> Result<(bool, usize), Error> {
457 let status = match self.status {
458 Some(ref mut status) => status,
459 None => bail!("PoolWriter - no media loaded"),
460 };
461
462 let current_file_number = Self::prepare_tape_write(status, worker)?;
463
464 let (done, bytes_written) = {
465 let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;
466
467 match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
468 Some(content_uuid) => {
469 self.catalog_set.lock().unwrap().register_snapshot(
470 content_uuid,
471 current_file_number,
472 snapshot_reader.datastore_name(),
473 snapshot_reader.snapshot().backup_ns(),
474 snapshot_reader.snapshot().as_ref(),
475 )?;
476 (true, writer.bytes_written())
477 }
478 None => (false, writer.bytes_written()),
479 }
480 };
481
482 status.bytes_written += bytes_written;
483
484 let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
485
486 if !done || request_sync {
487 self.commit()?;
488 }
489
490 Ok((done, bytes_written))
491 }
492
493 /// Move to EOM (if not already there), then create a new chunk
494 /// archive and write chunks from 'chunk_iter'. This stops when
495 /// it detects LEOM or when the maximum archive size (4GB) is
496 /// reached. Written chunks are registered in the media catalog.
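///
/// A rough sketch of how a caller might drive this together with
/// 'spawn_chunk_reader_thread' (the `datastore`, `snapshot_reader`,
/// `worker` values and the store name are assumptions):
///
/// ```ignore
/// let (reader_thread, chunk_iter) =
///     pool_writer.spawn_chunk_reader_thread(datastore, snapshot_reader)?;
/// let mut chunk_iter = chunk_iter.peekable();
///
/// loop {
///     let media_uuid = pool_writer.load_writable_media(&worker)?;
///     let (leom, _bytes) =
///         pool_writer.append_chunk_archive(&worker, &mut chunk_iter, "store1")?;
///     if chunk_iter.peek().is_none() {
///         break; // all new chunks written
///     }
///     if leom {
///         // tape is full - continue with the next tape of the media set
///         pool_writer.set_media_status_full(&media_uuid)?;
///     }
/// }
/// reader_thread.join().unwrap();
/// ```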
497 pub fn append_chunk_archive(
498 &mut self,
499 worker: &WorkerTask,
500 chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
501 store: &str,
502 ) -> Result<(bool, usize), Error> {
503 let status = match self.status {
504 Some(ref mut status) => status,
505 None => bail!("PoolWriter - no media loaded"),
506 };
507
508 let current_file_number = Self::prepare_tape_write(status, worker)?;
509
510 let writer = status.drive.write_file()?;
511
512 let start_time = SystemTime::now();
513
514 let (saved_chunks, content_uuid, leom, bytes_written) =
515 write_chunk_archive(worker, writer, chunk_iter, store, MAX_CHUNK_ARCHIVE_SIZE)?;
516
517 status.bytes_written += bytes_written;
518
519 let elapsed = start_time.elapsed()?.as_secs_f64();
520 task_log!(
521 worker,
522 "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
523 saved_chunks.len(),
524 bytes_written as f64 / 1_000_000.0,
525 (bytes_written as f64) / (1_000_000.0 * elapsed),
526 );
527
528 let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
529
530 // register chunks in media_catalog
531 self.catalog_set.lock().unwrap().register_chunk_archive(
532 content_uuid,
533 current_file_number,
534 store,
535 &saved_chunks,
536 )?;
537
538 if leom || request_sync {
539 self.commit()?;
540 }
541
542 Ok((leom, bytes_written))
543 }
544
545 pub fn spawn_chunk_reader_thread(
546 &self,
547 datastore: Arc<DataStore>,
548 snapshot_reader: Arc<Mutex<SnapshotReader>>,
549 ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
550 NewChunksIterator::spawn(datastore, snapshot_reader, Arc::clone(&self.catalog_set))
551 }
552
553 pub(crate) fn catalog_version(&self) -> [u8; 8] {
554 if self.ns_magic {
555 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1
556 } else {
557 PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0
558 }
559 }
560 }
561
562 /// write up to <max_size> bytes of chunks
563 #[allow(clippy::type_complexity)]
564 fn write_chunk_archive<'a>(
565 _worker: &WorkerTask,
566 writer: Box<dyn 'a + TapeWrite>,
567 chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
568 store: &str,
569 max_size: usize,
570 ) -> Result<(Vec<[u8; 32]>, Uuid, bool, usize), Error> {
571 let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?;
572
573 // we want to get the chunk list in correct order
574 let mut chunk_list: Vec<[u8; 32]> = Vec::new();
575
576 let mut leom = false;
577
578 loop {
579 let (digest, blob) = match chunk_iter.peek() {
580 None => break,
581 Some(Ok((digest, blob))) => (digest, blob),
582 Some(Err(err)) => bail!("{}", err),
583 };
584
585 //println!("CHUNK {} size {}", hex::encode(digest), blob.raw_size());
586
587 match writer.try_write_chunk(digest, blob) {
588 Ok(true) => {
589 chunk_list.push(*digest);
590 chunk_iter.next(); // consume
591 }
592 Ok(false) => {
593 // Note: we do not consume the chunk here (no chunk_iter.next())
594 leom = true;
595 break;
596 }
597 Err(err) => bail!("write chunk failed - {}", err),
598 }
599
600 if writer.bytes_written() > max_size {
601 //task_log!(worker, "Chunk Archive max size reached, closing archive");
602 break;
603 }
604 }
605
606 writer.finish()?;
607
608 Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
609 }
610
611 // Compare the media set label. If the media is empty, or the existing
612 // set label does not match the expected media set, overwrite the
613 // media set label.
614 fn update_media_set_label(
615 worker: &WorkerTask,
616 drive: &mut dyn TapeDriver,
617 old_set: Option<MediaSetLabel>,
618 media_id: &MediaId,
619 ) -> Result<(MediaCatalog, bool), Error> {
620 let media_catalog;
621
622 let new_set = match media_id.media_set_label {
623 None => bail!("got media without media set - internal error"),
624 Some(ref set) => set,
625 };
626
627 let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
628 let (config_map, _digest) = load_key_configs()?;
629 match config_map.get(fingerprint) {
630 Some(key_config) => Some(key_config.clone()),
631 None => {
632 bail!(
633 "unable to find tape encryption key config '{}'",
634 fingerprint
635 );
636 }
637 }
638 } else {
639 None
640 };
641
642 let new_media = match old_set {
643 None => {
644 task_log!(worker, "writing new media set label");
645 drive.write_media_set_label(new_set, key_config.as_ref())?;
646 media_catalog = MediaCatalog::overwrite(TAPE_STATUS_DIR, media_id, false)?;
647 true
648 }
649 Some(media_set_label) => {
650 if new_set.uuid == media_set_label.uuid {
651 if new_set.seq_nr != media_set_label.seq_nr {
652 bail!(
653 "got media with wrong media sequence number ({} != {}",
654 new_set.seq_nr,
655 media_set_label.seq_nr
656 );
657 }
658 if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint
659 {
660 bail!("detected changed encryption fingerprint - internal error");
661 }
662 media_catalog = MediaCatalog::open(TAPE_STATUS_DIR, media_id, true, false)?;
663
664 // todo: verify last content/media_catalog somehow?
665
666 false
667 } else {
668 task_log!(
669 worker,
670 "writing new media set label (overwrite '{}/{}')",
671 media_set_label.uuid.to_string(),
672 media_set_label.seq_nr,
673 );
674
675 drive.write_media_set_label(new_set, key_config.as_ref())?;
676 media_catalog = MediaCatalog::overwrite(TAPE_STATUS_DIR, media_id, false)?;
677 true
678 }
679 }
680 };
681
682 Ok((media_catalog, new_media))
683 }