]> git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer/mod.rs
eed0d61e1c24a9f6cc875967f6a36136a4452944
[proxmox-backup.git] / src / tape / pool_writer / mod.rs
1 mod catalog_set;
2 pub use catalog_set::*;
3
4 mod new_chunks_iterator;
5 pub use new_chunks_iterator::*;
6
7 use std::path::Path;
8 use std::fs::File;
9 use std::time::SystemTime;
10 use std::sync::{Arc, Mutex};
11
12 use anyhow::{bail, Error};
13
14 use proxmox::tools::Uuid;
15
16 use pbs_tools::{task_log, task_warn};
17 use pbs_config::tape_encryption_keys::load_key_configs;
18 use pbs_tape::{
19 TapeWrite,
20 sg_tape::tape_alert_flags_critical,
21 };
22 use pbs_datastore::{DataStore, SnapshotReader};
23 use proxmox_rest_server::WorkerTask;
24
25 use crate::{
26 tape::{
27 TAPE_STATUS_DIR,
28 MAX_CHUNK_ARCHIVE_SIZE,
29 COMMIT_BLOCK_SIZE,
30 MediaPool,
31 MediaId,
32 MediaCatalog,
33 file_formats::{
34 MediaSetLabel,
35 ChunkArchiveWriter,
36 tape_write_snapshot_archive,
37 tape_write_catalog,
38 },
39 drive::{
40 TapeDriver,
41 request_and_load_media,
42 media_changer,
43 },
44 },
45 };
46
47
/// Runtime state while a specific tape is loaded into the drive.
///
/// Dropping this value closes the drive handle.
struct PoolWriterState {
    /// Open handle to the tape drive
    drive: Box<dyn TapeDriver>,
    /// Media Uuid from loaded media
    media_uuid: Uuid,
    /// Tell if we already moved to EOM (end of media)
    at_eom: bool,
    /// Bytes written after the last tape flush/sync
    bytes_written: usize,
}
57
/// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter {
    // media pool the writer allocates writable media from
    pool: MediaPool,
    // name of the tape drive used for writing
    drive_name: String,
    // drive/media state; `None` while no media is loaded
    status: Option<PoolWriterState>,
    // catalogs of the current media set (shared with the chunk reader thread)
    catalog_set: Arc<Mutex<CatalogSet>>,
    // email address passed to request_and_load_media (operator notification)
    notify_email: Option<String>,
}
66
67 impl PoolWriter {
68
69 pub fn new(
70 mut pool: MediaPool,
71 drive_name: &str,
72 worker: &WorkerTask,
73 notify_email: Option<String>,
74 force_media_set: bool,
75 ) -> Result<Self, Error> {
76
77 let current_time = proxmox::tools::time::epoch_i64();
78
79 let new_media_set_reason = pool.start_write_session(current_time, force_media_set)?;
80 if let Some(reason) = new_media_set_reason {
81 task_log!(
82 worker,
83 "starting new media set - reason: {}",
84 reason,
85 );
86 }
87
88 let media_set_uuid = pool.current_media_set().uuid();
89 task_log!(worker, "media set uuid: {}", media_set_uuid);
90
91 let mut catalog_set = CatalogSet::new();
92
93 // load all catalogs read-only at start
94 for media_uuid in pool.current_media_list()? {
95 let media_info = pool.lookup_media(media_uuid).unwrap();
96 let media_catalog = MediaCatalog::open(
97 Path::new(TAPE_STATUS_DIR),
98 media_info.id(),
99 false,
100 false,
101 )?;
102 catalog_set.append_read_only_catalog(media_catalog)?;
103 }
104
105 Ok(Self {
106 pool,
107 drive_name: drive_name.to_string(),
108 status: None,
109 catalog_set: Arc::new(Mutex::new(catalog_set)),
110 notify_email,
111 })
112 }
113
    /// Returns a mutable reference to the underlying media pool.
    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }
117
118 /// Set media status to FULL (persistent - stores pool status)
119 pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
120 self.pool.set_media_status_full(&uuid)?;
121 Ok(())
122 }
123
124 pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool {
125 self.catalog_set.lock().unwrap().contains_snapshot(store, snapshot)
126 }
127
    /// Eject media and drop PoolWriterState (close drive)
    ///
    /// With a media changer, the drive handle is dropped *before* the
    /// changer unloads the media - the ordering matters here.
    pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = pbs_config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            task_log!(worker, "eject media");
            status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            drop(status); // close drive before the changer moves the media
            task_log!(worker, "unload media");
            changer.unload_media(None)?; //eject and unload
        } else {
            // standalone drive - ejecting is all we can do
            task_log!(worker, "standalone drive - ejecting media");
            status.drive.eject_media()?;
        }

        Ok(())
    }
150
    /// Export current media set and drop PoolWriterState (close drive)
    ///
    /// With a media changer, ejects/unloads the currently loaded media
    /// (if any), then moves every media of the current media set to an
    /// import/export slot. Without a changer it can only eject.
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = self.status.take();

        let (drive_config, _digest) = pbs_config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {

            if let Some(ref mut status) = status {
                task_log!(worker, "eject media");
                status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            }
            drop(status); // close drive before the changer moves the media

            task_log!(worker, "unload media");
            changer.unload_media(None)?;

            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
                    task_log!(worker, "exported media '{}' to import/export slot {}", label_text, slot);
                } else {
                    // not fatal - continue exporting the remaining media
                    task_warn!(worker, "export failed - media '{}' is not online", label_text);
                }
            }

        } else if let Some(mut status) = status {
            task_log!(worker, "standalone drive - ejecting media instead of export");
            status.drive.eject_media()?;
        }

        Ok(())
    }
185
186 /// commit changes to tape and catalog
187 ///
188 /// This is done automatically during a backupsession, but needs to
189 /// be called explicitly before dropping the PoolWriter
190 pub fn commit(&mut self) -> Result<(), Error> {
191 if let Some(PoolWriterState {ref mut drive, .. }) = self.status {
192 drive.sync()?; // sync all data to the tape
193 }
194 self.catalog_set.lock().unwrap().commit()?; // then commit the catalog
195 Ok(())
196 }
197
    /// Load a writable media into the drive
    ///
    /// Allocates a writable media from the pool. If it differs from
    /// the currently loaded one, the old media is ejected, the new one
    /// is requested/loaded, its media set label is verified/updated,
    /// its catalog is registered, and drive encryption is configured.
    ///
    /// Returns the uuid of the (now loaded) writable media.
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref media_uuid, ..}) => Some(media_uuid.clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        // fast path: the right media is already loaded
        if !media_changed {
            return Ok(media_uuid);
        }

        task_log!(worker, "allocated new writable media '{}'", media.label_text());

        // drop the old state (closes the drive) after ejecting
        if let Some(PoolWriterState {mut drive, .. }) = self.status.take() {
            if last_media_uuid.is_some() {
                task_log!(worker, "eject current media");
                drive.eject_media()?;
            }
        }

        let (drive_config, _digest) = pbs_config::drive::config()?;

        let (mut drive, old_media_id) =
            request_and_load_media(worker, &drive_config, &self.drive_name, media.label(), &self.notify_email)?;

        // test for critical tape alert flags
        if let Ok(alert_flags) = drive.tape_alert_flags() {
            if !alert_flags.is_empty() {
                task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
                if tape_alert_flags_critical(alert_flags) {
                    // persist the damaged state, then abort the job
                    self.pool.set_media_status_damaged(&media_uuid)?;
                    bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
                }
            }
        }

        // write/verify the media set label; also opens the media catalog
        let (catalog, is_new_media) = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        self.catalog_set.lock().unwrap().append_catalog(catalog)?;

        let media_set = media.media_set_label().clone().unwrap();

        // encryption key fingerprint (if any) plus media set uuid
        let encrypt_fingerprint = media_set
            .encryption_key_fingerprint
            .clone()
            .map(|fp| (fp, media_set.uuid.clone()));

        drive.set_encryption(encrypt_fingerprint)?;

        self.status = Some(PoolWriterState {
            drive,
            media_uuid: media_uuid.clone(),
            at_eom: false,
            bytes_written: 0,
        });

        if is_new_media {
            // add catalogs from previous media
            self.append_media_set_catalogs(worker)?;
        }

        Ok(media_uuid)
    }
276
277 fn open_catalog_file(uuid: &Uuid) -> Result<File, Error> {
278
279 let status_path = Path::new(TAPE_STATUS_DIR);
280 let mut path = status_path.to_owned();
281 path.push(uuid.to_string());
282 path.set_extension("log");
283
284 let file = std::fs::OpenOptions::new()
285 .read(true)
286 .open(&path)?;
287
288 Ok(file)
289 }
290
291 // Check it tape is loaded, then move to EOM (if not already there)
292 //
293 // Returns the tape position at EOM.
294 fn prepare_tape_write(
295 status: &mut PoolWriterState,
296 worker: &WorkerTask,
297 ) -> Result<u64, Error> {
298
299 if !status.at_eom {
300 task_log!(worker, "moving to end of media");
301 status.drive.move_to_eom(true)?;
302 status.at_eom = true;
303 }
304
305 let current_file_number = status.drive.current_file_number()?;
306 if current_file_number < 2 {
307 bail!("got strange file position number from drive ({})", current_file_number);
308 }
309
310 Ok(current_file_number)
311 }
312
    /// Move to EOM (if not already there), then write the current
    /// catalog to the tape. On success, this returns `Ok(true)`.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value `Ok(false)`). In that case, the
    /// archive is marked incomplete. The caller should mark the media
    /// as full and try again using another media.
    pub fn append_catalog_archive(
        &mut self,
        worker: &WorkerTask,
    ) -> Result<bool, Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        Self::prepare_tape_write(status, worker)?;

        let catalog_set = self.catalog_set.lock().unwrap();

        let catalog = match catalog_set.catalog {
            None => bail!("append_catalog_archive failed: no catalog - internal error"),
            Some(ref catalog) => catalog,
        };

        let media_set = self.pool.current_media_set();

        // the current catalog must belong to the last media in the set
        let media_list = media_set.media_list();
        let uuid = match media_list.last() {
            None => bail!("got empty media list - internal error"),
            Some(None) => bail!("got incomplete media list - internal error"),
            Some(Some(last_uuid)) => {
                if last_uuid != catalog.uuid() {
                    bail!("got wrong media - internal error");
                }
                last_uuid
            }
        };

        // sequence number of the last media within the media set
        let seq_nr = media_list.len() - 1;

        let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

        let mut file = Self::open_catalog_file(uuid)?;

        // tape_write_catalog returns None when it runs out of space (EOM)
        let done = tape_write_catalog(
            writer.as_mut(),
            uuid,
            media_set.uuid(),
            seq_nr,
            &mut file,
        )?.is_some();

        Ok(done)
    }
369
    // Append catalogs for all previous media in set (without last)
    //
    // Called after starting a new media so that each tape carries the
    // catalogs of its predecessors in the media set.
    fn append_media_set_catalogs(
        &mut self,
        worker: &WorkerTask,
    ) -> Result<(), Error> {

        let media_set = self.pool.current_media_set();

        // skip the last media - its catalog is still being written
        let mut media_list = &media_set.media_list()[..];
        if media_list.len() < 2 {
            return Ok(());
        }
        media_list = &media_list[..(media_list.len()-1)];

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        Self::prepare_tape_write(status, worker)?;

        for (seq_nr, uuid) in media_list.iter().enumerate() {

            let uuid = match uuid {
                None => bail!("got incomplete media list - internal error"),
                Some(uuid) => uuid,
            };

            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            let mut file = Self::open_catalog_file(uuid)?;

            task_log!(worker, "write catalog for previous media: {}", uuid);

            // None return value means we hit EOM - treat as hard error,
            // since the media set catalogs must be written completely
            if tape_write_catalog(
                writer.as_mut(),
                uuid,
                media_set.uuid(),
                seq_nr,
                &mut file,
            )?.is_none() {
                bail!("got EOM while writing start catalog");
            }
        }

        Ok(())
    }
417
    /// Move to EOM (if not already there), then creates a new snapshot
    /// archive writing specified files (as .pxar) into it. On
    /// success, this returns `Ok((true, _))` and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value `Ok((false, _))`). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
    pub fn append_snapshot_archive(
        &mut self,
        worker: &WorkerTask,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        let current_file_number = Self::prepare_tape_write(status, worker)?;

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                Some(content_uuid) => {
                    // archive completely written - register it in the catalog
                    self.catalog_set.lock().unwrap().register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.datastore_name().to_string(),
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                // None means EOM - archive is incomplete and not registered
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // sync tape and catalog when the archive was incomplete, or after
        // enough data accumulated since the last sync
        if !done || request_sync {
            self.commit()?;
        }

        Ok((done, bytes_written))
    }
468
    /// Move to EOM (if not already there), then creates a new chunk
    /// archive and writes chunks from 'chunk_iter'. This stops when
    /// it detects LEOM or when we reach max archive size
    /// (4GB). Written chunks are registered in the media catalog.
    ///
    /// Returns `(leom, bytes_written)`.
    pub fn append_chunk_archive(
        &mut self,
        worker: &WorkerTask,
        chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
        store: &str,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        let current_file_number = Self::prepare_tape_write(status, worker)?;

        let writer = status.drive.write_file()?;

        // measure wall-clock time for the throughput log message below
        let start_time = SystemTime::now();

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            worker,
            writer,
            chunk_iter,
            store,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let elapsed = start_time.elapsed()?.as_secs_f64();
        task_log!(
            worker,
            "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
            saved_chunks.len(),
            bytes_written as f64 /1_000_000.0,
            (bytes_written as f64)/(1_000_000.0*elapsed),
        );

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        self.catalog_set.lock().unwrap()
            .register_chunk_archive(content_uuid, current_file_number, store, &saved_chunks)?;

        // sync tape and catalog on LEOM, or after enough data accumulated
        // since the last sync
        if leom || request_sync {
            self.commit()?;
        }

        Ok((leom, bytes_written))
    }
522
523 pub fn spawn_chunk_reader_thread(
524 &self,
525 datastore: Arc<DataStore>,
526 snapshot_reader: Arc<Mutex<SnapshotReader>>,
527 ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
528 NewChunksIterator::spawn(
529 datastore,
530 snapshot_reader,
531 Arc::clone(&self.catalog_set),
532 )
533 }
534 }
535
536 /// write up to <max_size> of chunks
537 fn write_chunk_archive<'a>(
538 _worker: &WorkerTask,
539 writer: Box<dyn 'a + TapeWrite>,
540 chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
541 store: &str,
542 max_size: usize,
543 ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
544
545 let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?;
546
547 // we want to get the chunk list in correct order
548 let mut chunk_list: Vec<[u8;32]> = Vec::new();
549
550 let mut leom = false;
551
552 loop {
553 let (digest, blob) = match chunk_iter.peek() {
554 None => break,
555 Some(Ok((digest, blob))) => (digest, blob),
556 Some(Err(err)) => bail!("{}", err),
557 };
558
559 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
560
561 match writer.try_write_chunk(&digest, &blob) {
562 Ok(true) => {
563 chunk_list.push(*digest);
564 chunk_iter.next(); // consume
565 }
566 Ok(false) => {
567 // Note; we do not consume the chunk (no chunk_iter.next())
568 leom = true;
569 break;
570 }
571 Err(err) => bail!("write chunk failed - {}", err),
572 }
573
574 if writer.bytes_written() > max_size {
575 //task_log!(worker, "Chunk Archive max size reached, closing archive");
576 break;
577 }
578 }
579
580 writer.finish()?;
581
582 Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
583 }
584
585 // Compare the media set label. If the media is empty, or the existing
586 // set label does not match the expected media set, overwrite the
587 // media set label.
588 fn update_media_set_label(
589 worker: &WorkerTask,
590 drive: &mut dyn TapeDriver,
591 old_set: Option<MediaSetLabel>,
592 media_id: &MediaId,
593 ) -> Result<(MediaCatalog, bool), Error> {
594
595 let media_catalog;
596
597 let new_set = match media_id.media_set_label {
598 None => bail!("got media without media set - internal error"),
599 Some(ref set) => set,
600 };
601
602 let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
603 let (config_map, _digest) = load_key_configs()?;
604 match config_map.get(fingerprint) {
605 Some(key_config) => Some(key_config.clone()),
606 None => {
607 bail!("unable to find tape encryption key config '{}'", fingerprint);
608 }
609 }
610 } else {
611 None
612 };
613
614 let status_path = Path::new(TAPE_STATUS_DIR);
615
616 let new_media = match old_set {
617 None => {
618 task_log!(worker, "writing new media set label");
619 drive.write_media_set_label(new_set, key_config.as_ref())?;
620 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
621 true
622 }
623 Some(media_set_label) => {
624 if new_set.uuid == media_set_label.uuid {
625 if new_set.seq_nr != media_set_label.seq_nr {
626 bail!("got media with wrong media sequence number ({} != {}",
627 new_set.seq_nr,media_set_label.seq_nr);
628 }
629 if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
630 bail!("detected changed encryption fingerprint - internal error");
631 }
632 media_catalog = MediaCatalog::open(status_path, &media_id, true, false)?;
633
634 // todo: verify last content/media_catalog somehow?
635
636 false
637 } else {
638 task_log!(
639 worker,
640 "writing new media set label (overwrite '{}/{}')",
641 media_set_label.uuid.to_string(),
642 media_set_label.seq_nr,
643 );
644
645 drive.write_media_set_label(new_set, key_config.as_ref())?;
646 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
647 true
648 }
649 }
650 };
651
652 Ok((media_catalog, new_media))
653 }