]> git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer/mod.rs
5d8d1644aa70d89e656479a093ae8cf087c613b9
[proxmox-backup.git] / src / tape / pool_writer / mod.rs
1 mod catalog_set;
2 pub use catalog_set::*;
3
4 mod new_chunks_iterator;
5 pub use new_chunks_iterator::*;
6
7 use std::path::Path;
8 use std::fs::File;
9 use std::time::SystemTime;
10 use std::sync::{Arc, Mutex};
11
12 use anyhow::{bail, Error};
13
14 use proxmox_uuid::Uuid;
15
16 use pbs_tools::{task_log, task_warn};
17 use pbs_config::tape_encryption_keys::load_key_configs;
18 use pbs_tape::{
19 TapeWrite,
20 sg_tape::tape_alert_flags_critical,
21 };
22 use pbs_datastore::{DataStore, SnapshotReader};
23 use proxmox_rest_server::WorkerTask;
24
25 use crate::{
26 tape::{
27 TAPE_STATUS_DIR,
28 MAX_CHUNK_ARCHIVE_SIZE,
29 COMMIT_BLOCK_SIZE,
30 MediaPool,
31 MediaId,
32 MediaCatalog,
33 file_formats::{
34 MediaSetLabel,
35 ChunkArchiveWriter,
36 tape_write_snapshot_archive,
37 tape_write_catalog,
38 },
39 drive::{
40 TapeDriver,
41 request_and_load_media,
42 media_changer,
43 },
44 },
45 };
46
47
/// Runtime state while a media is loaded in the drive for writing.
struct PoolWriterState {
    /// Open handle to the tape drive
    drive: Box<dyn TapeDriver>,
    /// Media Uuid from loaded media
    media_uuid: Uuid,
    /// Tell if we already moved to EOM (end of media)
    at_eom: bool,
    /// Bytes written after the last tape flush/sync
    bytes_written: usize,
}
57
/// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter {
    /// Media pool used to allocate writable media
    pool: MediaPool,
    /// Name of the tape drive used for writing
    drive_name: String,
    /// Drive/media state; `None` while no media is loaded
    status: Option<PoolWriterState>,
    /// Catalogs of the current media set (shared with the chunk reader thread,
    /// see `spawn_chunk_reader_thread`)
    catalog_set: Arc<Mutex<CatalogSet>>,
    /// Optional email address used when requesting media loads
    notify_email: Option<String>,
}
66
67 impl PoolWriter {
68
69 pub fn new(
70 mut pool: MediaPool,
71 drive_name: &str,
72 worker: &WorkerTask,
73 notify_email: Option<String>,
74 force_media_set: bool,
75 ) -> Result<Self, Error> {
76
77 let current_time = proxmox_time::epoch_i64();
78
79 let new_media_set_reason = pool.start_write_session(current_time, force_media_set)?;
80 if let Some(reason) = new_media_set_reason {
81 task_log!(
82 worker,
83 "starting new media set - reason: {}",
84 reason,
85 );
86 }
87
88 let media_set_uuid = pool.current_media_set().uuid();
89 task_log!(worker, "media set uuid: {}", media_set_uuid);
90
91 let mut catalog_set = CatalogSet::new();
92
93 // load all catalogs read-only at start
94 for media_uuid in pool.current_media_list()? {
95 let media_info = pool.lookup_media(media_uuid).unwrap();
96 let media_catalog = MediaCatalog::open(
97 Path::new(TAPE_STATUS_DIR),
98 media_info.id(),
99 false,
100 false,
101 )?;
102 catalog_set.append_read_only_catalog(media_catalog)?;
103 }
104
105 Ok(Self {
106 pool,
107 drive_name: drive_name.to_string(),
108 status: None,
109 catalog_set: Arc::new(Mutex::new(catalog_set)),
110 notify_email,
111 })
112 }
113
    /// Returns a mutable reference to the underlying media pool.
    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }
117
118 /// Set media status to FULL (persistent - stores pool status)
119 pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
120 self.pool.set_media_status_full(&uuid)?;
121 Ok(())
122 }
123
124 pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool {
125 self.catalog_set.lock().unwrap().contains_snapshot(store, snapshot)
126 }
127
    /// Eject media and drop PoolWriterState (close drive)
    ///
    /// With a media changer, the media is also unloaded from the
    /// drive; without one, it is only ejected. Does nothing when no
    /// media is loaded.
    pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = pbs_config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            task_log!(worker, "eject media");
            status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            drop(status); // close drive
            task_log!(worker, "unload media");
            changer.unload_media(None)?; //eject and unload
        } else {
            task_log!(worker, "standalone drive - ejecting media");
            status.drive.eject_media()?;
        }

        Ok(())
    }
150
    /// Export current media set and drop PoolWriterState (close drive)
    ///
    /// With a media changer, every media of the current set is moved
    /// to an import/export slot; a media that cannot be exported only
    /// produces a task warning. Without a changer, the loaded media
    /// (if any) is simply ejected.
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = self.status.take();

        let (drive_config, _digest) = pbs_config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {

            if let Some(ref mut status) = status {
                task_log!(worker, "rewind media");
                // rewind first so that the unload command later does not run into a timeout
                status.drive.rewind()?;
            }
            drop(status); // close drive

            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
                    task_log!(worker, "exported media '{}' to import/export slot {}", label_text, slot);
                } else {
                    task_warn!(worker, "export failed - media '{}' is not online or in different drive", label_text);
                }
            }

        } else if let Some(mut status) = status {
            task_log!(worker, "standalone drive - ejecting media instead of export");
            status.drive.eject_media()?;
        }

        Ok(())
    }
183
184 /// commit changes to tape and catalog
185 ///
186 /// This is done automatically during a backupsession, but needs to
187 /// be called explicitly before dropping the PoolWriter
188 pub fn commit(&mut self) -> Result<(), Error> {
189 if let Some(PoolWriterState {ref mut drive, .. }) = self.status {
190 drive.sync()?; // sync all data to the tape
191 }
192 self.catalog_set.lock().unwrap().commit()?; // then commit the catalog
193 Ok(())
194 }
195
196 /// Load a writable media into the drive
197 pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
198 let last_media_uuid = match self.status {
199 Some(PoolWriterState { ref media_uuid, ..}) => Some(media_uuid.clone()),
200 None => None,
201 };
202
203 let current_time = proxmox_time::epoch_i64();
204 let media_uuid = self.pool.alloc_writable_media(current_time)?;
205
206 let media = self.pool.lookup_media(&media_uuid).unwrap();
207
208 let media_changed = match last_media_uuid {
209 Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
210 None => true,
211 };
212
213 if !media_changed {
214 return Ok(media_uuid);
215 }
216
217 task_log!(worker, "allocated new writable media '{}'", media.label_text());
218
219 if let Some(PoolWriterState {mut drive, .. }) = self.status.take() {
220 if last_media_uuid.is_some() {
221 task_log!(worker, "eject current media");
222 drive.eject_media()?;
223 }
224 }
225
226 let (drive_config, _digest) = pbs_config::drive::config()?;
227
228 let (mut drive, old_media_id) =
229 request_and_load_media(worker, &drive_config, &self.drive_name, media.label(), &self.notify_email)?;
230
231 // test for critical tape alert flags
232 if let Ok(alert_flags) = drive.tape_alert_flags() {
233 if !alert_flags.is_empty() {
234 task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
235 if tape_alert_flags_critical(alert_flags) {
236 self.pool.set_media_status_damaged(&media_uuid)?;
237 bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
238 }
239 }
240 }
241
242 let (catalog, is_new_media) = update_media_set_label(
243 worker,
244 drive.as_mut(),
245 old_media_id.media_set_label,
246 media.id(),
247 )?;
248
249 self.catalog_set.lock().unwrap().append_catalog(catalog)?;
250
251 let media_set = media.media_set_label().clone().unwrap();
252
253 let encrypt_fingerprint = media_set
254 .encryption_key_fingerprint
255 .clone()
256 .map(|fp| (fp, media_set.uuid.clone()));
257
258 drive.set_encryption(encrypt_fingerprint)?;
259
260 self.status = Some(PoolWriterState {
261 drive,
262 media_uuid: media_uuid.clone(),
263 at_eom: false,
264 bytes_written: 0,
265 });
266
267 if is_new_media {
268 // add catalogs from previous media
269 self.append_media_set_catalogs(worker)?;
270 }
271
272 Ok(media_uuid)
273 }
274
275 fn open_catalog_file(uuid: &Uuid) -> Result<File, Error> {
276
277 let status_path = Path::new(TAPE_STATUS_DIR);
278 let mut path = status_path.to_owned();
279 path.push(uuid.to_string());
280 path.set_extension("log");
281
282 let file = std::fs::OpenOptions::new()
283 .read(true)
284 .open(&path)?;
285
286 Ok(file)
287 }
288
    // Check if tape is loaded, then move to EOM (if not already there)
    //
    // Returns the tape position (file number) at EOM.
    fn prepare_tape_write(
        status: &mut PoolWriterState,
        worker: &WorkerTask,
    ) -> Result<u64, Error> {
        if !status.at_eom {
            task_log!(worker, "moving to end of media");
            status.drive.move_to_eom(true)?;
            // remember the position so we only seek once per loaded media
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            // NOTE(review): positions < 2 presumably overlap the label
            // area at the start of the tape - treated as a drive error
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        Ok(current_file_number)
    }
310
    /// Move to EOM (if not already there), then write the current
    /// catalog to the tape. On success, this returns 'Ok(true)'.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok(false)'). In that case, the
    /// archive is marked incomplete. The caller should mark the media
    /// as full and try again using another media.
    pub fn append_catalog_archive(
        &mut self,
        worker: &WorkerTask,
    ) -> Result<bool, Error> {
        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        Self::prepare_tape_write(status, worker)?;

        let catalog_set = self.catalog_set.lock().unwrap();

        // the writable catalog must exist once a media is loaded
        let catalog = match catalog_set.catalog {
            None => bail!("append_catalog_archive failed: no catalog - internal error"),
            Some(ref catalog) => catalog,
        };

        let media_set = self.pool.current_media_set();

        // sanity check: the catalog must belong to the last media of the set
        let media_list = media_set.media_list();
        let uuid = match media_list.last() {
            None => bail!("got empty media list - internal error"),
            Some(None) => bail!("got incomplete media list - internal error"),
            Some(Some(last_uuid)) => {
                if last_uuid != catalog.uuid() {
                    bail!("got wrong media - internal error");
                }
                last_uuid
            }
        };

        // sequence number of the last media inside the set
        let seq_nr = media_list.len() - 1;

        let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

        let mut file = Self::open_catalog_file(uuid)?;

        // `None` means the catalog could not be written completely
        // (out of space) -> report 'not done'
        let done = tape_write_catalog(
            writer.as_mut(),
            uuid,
            media_set.uuid(),
            seq_nr,
            &mut file,
        )?.is_some();

        Ok(done)
    }
367
    // Append catalogs for all previous media in set (without last)
    //
    // Called after a fresh media was added to the media set (see
    // load_writable_media), so the new media starts with a copy of
    // the catalogs of all earlier media in the set.
    fn append_media_set_catalogs(
        &mut self,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let media_set = self.pool.current_media_set();

        // drop the last entry - that is the newly added media itself
        let mut media_list = &media_set.media_list()[..];
        if media_list.len() < 2 {
            return Ok(());
        }
        media_list = &media_list[..(media_list.len() - 1)];

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        Self::prepare_tape_write(status, worker)?;

        for (seq_nr, uuid) in media_list.iter().enumerate() {
            let uuid = match uuid {
                None => bail!("got incomplete media list - internal error"),
                Some(uuid) => uuid,
            };

            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            let mut file = Self::open_catalog_file(uuid)?;

            task_log!(worker, "write catalog for previous media: {}", uuid);

            // `None` means we hit EOM before the catalog was fully written
            if tape_write_catalog(
                writer.as_mut(),
                uuid,
                media_set.uuid(),
                seq_nr,
                &mut file,
            )?.is_none() {
                bail!("got EOM while writing start catalog");
            }
        }

        Ok(())
    }
415
    /// Move to EOM (if not already there), then creates a new snapshot
    /// archive writing specified files (as .pxar) into it. On
    /// success, this returns 'Ok(true)' and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok(false, _)'). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
    pub fn append_snapshot_archive(
        &mut self,
        worker: &WorkerTask,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {
        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        let current_file_number = Self::prepare_tape_write(status, worker)?;

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                // archive complete - register the snapshot in the catalog
                Some(content_uuid) => {
                    self.catalog_set.lock().unwrap().register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.datastore_name().to_string(),
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                // out of space - the incomplete archive is not registered
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        // sync to tape after enough data accumulated since the last sync
        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        if !done || request_sync {
            self.commit()?;
        }

        Ok((done, bytes_written))
    }
466
    /// Move to EOM (if not already there), then creates a new chunk
    /// archive and writes chunks from 'chunk_iter'. This stops when
    /// it detects LEOM or when we reach the max archive size
    /// (MAX_CHUNK_ARCHIVE_SIZE). Written chunks are registered in the
    /// media catalog.
    ///
    /// Returns `(leom, bytes_written)` - `leom` tells the caller the
    /// media is full.
    pub fn append_chunk_archive(
        &mut self,
        worker: &WorkerTask,
        chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
        store: &str,
    ) -> Result<(bool, usize), Error> {
        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        let current_file_number = Self::prepare_tape_write(status, worker)?;

        let writer = status.drive.write_file()?;

        // measure wall-clock time for the throughput log below
        let start_time = SystemTime::now();

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            worker,
            writer,
            chunk_iter,
            store,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let elapsed = start_time.elapsed()?.as_secs_f64();
        task_log!(
            worker,
            "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
            saved_chunks.len(),
            bytes_written as f64 / 1_000_000.0,
            (bytes_written as f64) / (1_000_000.0 * elapsed),
        );

        // sync to tape after enough data accumulated since the last sync
        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        self.catalog_set.lock().unwrap()
            .register_chunk_archive(content_uuid, current_file_number, store, &saved_chunks)?;

        if leom || request_sync {
            self.commit()?;
        }

        Ok((leom, bytes_written))
    }
520
521 pub fn spawn_chunk_reader_thread(
522 &self,
523 datastore: Arc<DataStore>,
524 snapshot_reader: Arc<Mutex<SnapshotReader>>,
525 ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
526 NewChunksIterator::spawn(
527 datastore,
528 snapshot_reader,
529 Arc::clone(&self.catalog_set),
530 )
531 }
532 }
533
534 /// write up to <max_size> of chunks
535 fn write_chunk_archive<'a>(
536 _worker: &WorkerTask,
537 writer: Box<dyn 'a + TapeWrite>,
538 chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
539 store: &str,
540 max_size: usize,
541 ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
542
543 let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?;
544
545 // we want to get the chunk list in correct order
546 let mut chunk_list: Vec<[u8;32]> = Vec::new();
547
548 let mut leom = false;
549
550 loop {
551 let (digest, blob) = match chunk_iter.peek() {
552 None => break,
553 Some(Ok((digest, blob))) => (digest, blob),
554 Some(Err(err)) => bail!("{}", err),
555 };
556
557 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
558
559 match writer.try_write_chunk(&digest, &blob) {
560 Ok(true) => {
561 chunk_list.push(*digest);
562 chunk_iter.next(); // consume
563 }
564 Ok(false) => {
565 // Note; we do not consume the chunk (no chunk_iter.next())
566 leom = true;
567 break;
568 }
569 Err(err) => bail!("write chunk failed - {}", err),
570 }
571
572 if writer.bytes_written() > max_size {
573 //task_log!(worker, "Chunk Archive max size reached, closing archive");
574 break;
575 }
576 }
577
578 writer.finish()?;
579
580 Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
581 }
582
583 // Compare the media set label. If the media is empty, or the existing
584 // set label does not match the expected media set, overwrite the
585 // media set label.
586 fn update_media_set_label(
587 worker: &WorkerTask,
588 drive: &mut dyn TapeDriver,
589 old_set: Option<MediaSetLabel>,
590 media_id: &MediaId,
591 ) -> Result<(MediaCatalog, bool), Error> {
592
593 let media_catalog;
594
595 let new_set = match media_id.media_set_label {
596 None => bail!("got media without media set - internal error"),
597 Some(ref set) => set,
598 };
599
600 let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
601 let (config_map, _digest) = load_key_configs()?;
602 match config_map.get(fingerprint) {
603 Some(key_config) => Some(key_config.clone()),
604 None => {
605 bail!("unable to find tape encryption key config '{}'", fingerprint);
606 }
607 }
608 } else {
609 None
610 };
611
612 let status_path = Path::new(TAPE_STATUS_DIR);
613
614 let new_media = match old_set {
615 None => {
616 task_log!(worker, "writing new media set label");
617 drive.write_media_set_label(new_set, key_config.as_ref())?;
618 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
619 true
620 }
621 Some(media_set_label) => {
622 if new_set.uuid == media_set_label.uuid {
623 if new_set.seq_nr != media_set_label.seq_nr {
624 bail!("got media with wrong media sequence number ({} != {}",
625 new_set.seq_nr,media_set_label.seq_nr);
626 }
627 if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
628 bail!("detected changed encryption fingerprint - internal error");
629 }
630 media_catalog = MediaCatalog::open(status_path, &media_id, true, false)?;
631
632 // todo: verify last content/media_catalog somehow?
633
634 false
635 } else {
636 task_log!(
637 worker,
638 "writing new media set label (overwrite '{}/{}')",
639 media_set_label.uuid.to_string(),
640 media_set_label.seq_nr,
641 );
642
643 drive.write_media_set_label(new_set, key_config.as_ref())?;
644 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
645 true
646 }
647 }
648 };
649
650 Ok((media_catalog, new_media))
651 }