]> git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer/mod.rs
move chunk_store to pbs-datastore
[proxmox-backup.git] / src / tape / pool_writer / mod.rs
1 mod catalog_set;
2 pub use catalog_set::*;
3
4 mod new_chunks_iterator;
5 pub use new_chunks_iterator::*;
6
7 use std::path::Path;
8 use std::fs::File;
9 use std::time::SystemTime;
10 use std::sync::{Arc, Mutex};
11
12 use anyhow::{bail, Error};
13
14 use proxmox::tools::Uuid;
15
16 use pbs_datastore::task_log;
17
18 use crate::{
19 backup::{
20 DataStore,
21 },
22 server::WorkerTask,
23 tape::{
24 TAPE_STATUS_DIR,
25 MAX_CHUNK_ARCHIVE_SIZE,
26 COMMIT_BLOCK_SIZE,
27 TapeWrite,
28 SnapshotReader,
29 MediaPool,
30 MediaId,
31 MediaCatalog,
32 file_formats::{
33 MediaSetLabel,
34 ChunkArchiveWriter,
35 tape_write_snapshot_archive,
36 tape_write_catalog,
37 },
38 drive::{
39 TapeDriver,
40 request_and_load_media,
41 tape_alert_flags_critical,
42 media_changer,
43 },
44 },
45 config::tape_encryption_keys::load_key_configs,
46 };
47
48
/// Runtime state while a media is loaded in the drive.
struct PoolWriterState {
    // open handle to the tape drive
    drive: Box<dyn TapeDriver>,
    // Media Uuid from loaded media
    media_uuid: Uuid,
    // tell if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}
58
/// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter {
    // media pool the writer allocates writable media from
    pool: MediaPool,
    // name of the drive used for writing
    drive_name: String,
    // Some(_) while a media is loaded, None otherwise
    status: Option<PoolWriterState>,
    // catalog state, shared with the chunk reader thread
    catalog_set: Arc<Mutex<CatalogSet>>,
    // optional mail address for load-request/failure notifications
    notify_email: Option<String>,
}
67
68 impl PoolWriter {
69
70 pub fn new(
71 mut pool: MediaPool,
72 drive_name: &str,
73 worker: &WorkerTask,
74 notify_email: Option<String>,
75 force_media_set: bool,
76 ) -> Result<Self, Error> {
77
78 let current_time = proxmox::tools::time::epoch_i64();
79
80 let new_media_set_reason = pool.start_write_session(current_time, force_media_set)?;
81 if let Some(reason) = new_media_set_reason {
82 task_log!(
83 worker,
84 "starting new media set - reason: {}",
85 reason,
86 );
87 }
88
89 let media_set_uuid = pool.current_media_set().uuid();
90 task_log!(worker, "media set uuid: {}", media_set_uuid);
91
92 let mut catalog_set = CatalogSet::new();
93
94 // load all catalogs read-only at start
95 for media_uuid in pool.current_media_list()? {
96 let media_info = pool.lookup_media(media_uuid).unwrap();
97 let media_catalog = MediaCatalog::open(
98 Path::new(TAPE_STATUS_DIR),
99 media_info.id(),
100 false,
101 false,
102 )?;
103 catalog_set.append_read_only_catalog(media_catalog)?;
104 }
105
106 Ok(Self {
107 pool,
108 drive_name: drive_name.to_string(),
109 status: None,
110 catalog_set: Arc::new(Mutex::new(catalog_set)),
111 notify_email,
112 })
113 }
114
    /// Returns a mutable reference to the underlying media pool.
    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }
118
119 /// Set media status to FULL (persistent - stores pool status)
120 pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
121 self.pool.set_media_status_full(&uuid)?;
122 Ok(())
123 }
124
125 pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool {
126 self.catalog_set.lock().unwrap().contains_snapshot(store, snapshot)
127 }
128
    /// Eject media and drop PoolWriterState (close drive)
    pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        // take() the state - the drive handle gets closed either way
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            worker.log("eject media");
            status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            drop(status); // close drive
            worker.log("unload media");
            changer.unload_media(None)?; //eject and unload
        } else {
            // no changer configured - ejecting is all we can do
            worker.log("standalone drive - ejecting media");
            status.drive.eject_media()?;
        }

        Ok(())
    }
151
    /// Export current media set and drop PoolWriterState (close drive)
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        // take() the state - the drive handle gets closed either way
        let mut status = self.status.take();

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {

            if let Some(ref mut status) = status {
                worker.log("eject media");
                status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            }
            drop(status); // close drive

            worker.log("unload media");
            changer.unload_media(None)?;

            // move each media of the current set into an import/export slot
            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
                    worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
                } else {
                    // not fatal - media may simply not be online anymore
                    worker.warn(format!("export failed - media '{}' is not online", label_text));
                }
            }

        } else if let Some(mut status) = status {
            // standalone drives cannot export, so ejecting is the best we can do
            worker.log("standalone drive - ejecting media instead of export");
            status.drive.eject_media()?;
        }

        Ok(())
    }
186
187 /// commit changes to tape and catalog
188 ///
189 /// This is done automatically during a backupsession, but needs to
190 /// be called explicitly before dropping the PoolWriter
191 pub fn commit(&mut self) -> Result<(), Error> {
192 if let Some(PoolWriterState {ref mut drive, .. }) = self.status {
193 drive.sync()?; // sync all data to the tape
194 }
195 self.catalog_set.lock().unwrap().commit()?; // then commit the catalog
196 Ok(())
197 }
198
    /// Load a writable media into the drive
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        // remember which media is currently loaded (if any)
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref media_uuid, ..}) => Some(media_uuid.clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        // fast path: the allocated media is already loaded
        if !media_changed {
            return Ok(media_uuid);
        }

        task_log!(worker, "allocated new writable media '{}'", media.label_text());

        // close the old drive handle (ejecting the previous media) first
        if let Some(PoolWriterState {mut drive, .. }) = self.status.take() {
            if last_media_uuid.is_some() {
                task_log!(worker, "eject current media");
                drive.eject_media()?;
            }
        }

        let (drive_config, _digest) = crate::config::drive::config()?;

        // blocks (notifying via email) until the requested media is loaded
        let (mut drive, old_media_id) =
            request_and_load_media(worker, &drive_config, &self.drive_name, media.label(), &self.notify_email)?;

        // test for critical tape alert flags
        if let Ok(alert_flags) = drive.tape_alert_flags() {
            if !alert_flags.is_empty() {
                worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
                if tape_alert_flags_critical(alert_flags) {
                    // persist the damaged state before aborting
                    self.pool.set_media_status_damaged(&media_uuid)?;
                    bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
                }
            }
        }

        // (re)write the media set label if required and open the catalog
        let (catalog, is_new_media) = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        self.catalog_set.lock().unwrap().append_catalog(catalog)?;

        let media_set = media.media_set_label().clone().unwrap();

        // enable hardware encryption when the media set uses a key
        let encrypt_fingerprint = media_set
            .encryption_key_fingerprint
            .clone()
            .map(|fp| (fp, media_set.uuid.clone()));

        drive.set_encryption(encrypt_fingerprint)?;

        self.status = Some(PoolWriterState {
            drive,
            media_uuid: media_uuid.clone(),
            at_eom: false,
            bytes_written: 0,
        });

        if is_new_media {
            // add catalogs from previous media
            self.append_media_set_catalogs(worker)?;
        }

        Ok(media_uuid)
    }
277
278 fn open_catalog_file(uuid: &Uuid) -> Result<File, Error> {
279
280 let status_path = Path::new(TAPE_STATUS_DIR);
281 let mut path = status_path.to_owned();
282 path.push(uuid.to_string());
283 path.set_extension("log");
284
285 let file = std::fs::OpenOptions::new()
286 .read(true)
287 .open(&path)?;
288
289 Ok(file)
290 }
291
    // Check if tape is loaded, then move to EOM (if not already there)
    //
    // Returns the tape position at EOM.
    fn prepare_tape_write(
        status: &mut PoolWriterState,
        worker: &WorkerTask,
    ) -> Result<u64, Error> {

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom(true)?;
            // remember the position so later writes skip the seek
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            // a position below 2 would overwrite the label area
            // (presumably files 0/1 hold the media/media-set labels - TODO confirm)
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        Ok(current_file_number)
    }
313
    /// Move to EOM (if not already there), then write the current
    /// catalog to the tape. On success, this returns 'Ok(true)'.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok(false)'). In that case, the
    /// archive is marked incomplete. The caller should mark the media
    /// as full and try again using another media.
    pub fn append_catalog_archive(
        &mut self,
        worker: &WorkerTask,
    ) -> Result<bool, Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        Self::prepare_tape_write(status, worker)?;

        let catalog_set = self.catalog_set.lock().unwrap();

        let catalog = match catalog_set.catalog {
            None => bail!("append_catalog_archive failed: no catalog - internal error"),
            Some(ref catalog) => catalog,
        };

        let media_set = self.pool.current_media_set();

        // sanity check: the open catalog must belong to the last
        // media of the current media set
        let media_list = media_set.media_list();
        let uuid = match media_list.last() {
            None => bail!("got empty media list - internal error"),
            Some(None) => bail!("got incomplete media list - internal error"),
            Some(Some(last_uuid)) => {
                if last_uuid != catalog.uuid() {
                    bail!("got wrong media - internal error");
                }
                last_uuid
            }
        };

        // sequence number of the last media in the set
        let seq_nr = media_list.len() - 1;

        let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

        let mut file = Self::open_catalog_file(uuid)?;

        // tape_write_catalog returns None on EOM (incomplete write)
        let done = tape_write_catalog(
            writer.as_mut(),
            uuid,
            media_set.uuid(),
            seq_nr,
            &mut file,
        )?.is_some();

        Ok(done)
    }
370
    // Append catalogs for all previous media in set (without last)
    fn append_media_set_catalogs(
        &mut self,
        worker: &WorkerTask,
    ) -> Result<(), Error> {

        let media_set = self.pool.current_media_set();

        // strip the last media - its catalog is still being written
        let mut media_list = &media_set.media_list()[..];
        if media_list.len() < 2 {
            return Ok(());
        }
        media_list = &media_list[..(media_list.len()-1)];

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        Self::prepare_tape_write(status, worker)?;

        for (seq_nr, uuid) in media_list.iter().enumerate() {

            let uuid = match uuid {
                None => bail!("got incomplete media list - internal error"),
                Some(uuid) => uuid,
            };

            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            let mut file = Self::open_catalog_file(uuid)?;

            task_log!(worker, "write catalog for previous media: {}", uuid);

            // None means we hit EOM - the previous catalogs must fit
            // completely on the new media, so treat this as an error
            if tape_write_catalog(
                writer.as_mut(),
                uuid,
                media_set.uuid(),
                seq_nr,
                &mut file,
            )?.is_none() {
                bail!("got EOM while writing start catalog");
            }
        }

        Ok(())
    }
418
419 /// Move to EOM (if not already there), then creates a new snapshot
420 /// archive writing specified files (as .pxar) into it. On
421 /// success, this return 'Ok(true)' and the media catalog gets
422 /// updated.
423
424 /// Please note that this may fail when there is not enough space
425 /// on the media (return value 'Ok(false, _)'). In that case, the
426 /// archive is marked incomplete, and we do not use it. The caller
427 /// should mark the media as full and try again using another
428 /// media.
429 pub fn append_snapshot_archive(
430 &mut self,
431 worker: &WorkerTask,
432 snapshot_reader: &SnapshotReader,
433 ) -> Result<(bool, usize), Error> {
434
435 let status = match self.status {
436 Some(ref mut status) => status,
437 None => bail!("PoolWriter - no media loaded"),
438 };
439
440 let current_file_number = Self::prepare_tape_write(status, worker)?;
441
442 let (done, bytes_written) = {
443 let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;
444
445 match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
446 Some(content_uuid) => {
447 self.catalog_set.lock().unwrap().register_snapshot(
448 content_uuid,
449 current_file_number,
450 &snapshot_reader.datastore_name().to_string(),
451 &snapshot_reader.snapshot().to_string(),
452 )?;
453 (true, writer.bytes_written())
454 }
455 None => (false, writer.bytes_written()),
456 }
457 };
458
459 status.bytes_written += bytes_written;
460
461 let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
462
463 if !done || request_sync {
464 self.commit()?;
465 }
466
467 Ok((done, bytes_written))
468 }
469
    /// Move to EOM (if not already there), then creates a new chunk
    /// archive and writes chunks from 'chunk_iter'. This stops when
    /// it detect LEOM or when we reach max archive size
    /// (4GB). Written chunks are registered in the media catalog.
    pub fn append_chunk_archive(
        &mut self,
        worker: &WorkerTask,
        chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
        store: &str,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        let current_file_number = Self::prepare_tape_write(status, worker)?;

        let writer = status.drive.write_file()?;

        // measure elapsed time for the throughput log line below
        let start_time = SystemTime::now();

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            worker,
            writer,
            chunk_iter,
            store,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let elapsed = start_time.elapsed()?.as_secs_f64();
        worker.log(format!(
            "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
            saved_chunks.len(),
            bytes_written as f64 /1_000_000.0,
            (bytes_written as f64)/(1_000_000.0*elapsed),
        ));

        // commit periodically to bound the amount of catalog updates
        // lost on failure
        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        self.catalog_set.lock().unwrap()
            .register_chunk_archive(content_uuid, current_file_number, store, &saved_chunks)?;

        if leom || request_sync {
            self.commit()?;
        }

        Ok((leom, bytes_written))
    }
522
    /// Spawn the background chunk reader thread for a snapshot.
    ///
    /// Returns the thread's join handle together with the iterator
    /// yielding the chunks (presumably only chunks not yet present in
    /// the shared catalog set - see NewChunksIterator).
    pub fn spawn_chunk_reader_thread(
        &self,
        datastore: Arc<DataStore>,
        snapshot_reader: Arc<Mutex<SnapshotReader>>,
    ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
        NewChunksIterator::spawn(
            datastore,
            snapshot_reader,
            Arc::clone(&self.catalog_set),
        )
    }
534 }
535
536 /// write up to <max_size> of chunks
537 fn write_chunk_archive<'a>(
538 _worker: &WorkerTask,
539 writer: Box<dyn 'a + TapeWrite>,
540 chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
541 store: &str,
542 max_size: usize,
543 ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
544
545 let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?;
546
547 // we want to get the chunk list in correct order
548 let mut chunk_list: Vec<[u8;32]> = Vec::new();
549
550 let mut leom = false;
551
552 loop {
553 let (digest, blob) = match chunk_iter.peek() {
554 None => break,
555 Some(Ok((digest, blob))) => (digest, blob),
556 Some(Err(err)) => bail!("{}", err),
557 };
558
559 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
560
561 match writer.try_write_chunk(&digest, &blob) {
562 Ok(true) => {
563 chunk_list.push(*digest);
564 chunk_iter.next(); // consume
565 }
566 Ok(false) => {
567 // Note; we do not consume the chunk (no chunk_iter.next())
568 leom = true;
569 break;
570 }
571 Err(err) => bail!("write chunk failed - {}", err),
572 }
573
574 if writer.bytes_written() > max_size {
575 //worker.log("Chunk Archive max size reached, closing archive".to_string());
576 break;
577 }
578 }
579
580 writer.finish()?;
581
582 Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
583 }
584
585 // Compare the media set label. If the media is empty, or the existing
586 // set label does not match the expected media set, overwrite the
587 // media set label.
588 fn update_media_set_label(
589 worker: &WorkerTask,
590 drive: &mut dyn TapeDriver,
591 old_set: Option<MediaSetLabel>,
592 media_id: &MediaId,
593 ) -> Result<(MediaCatalog, bool), Error> {
594
595 let media_catalog;
596
597 let new_set = match media_id.media_set_label {
598 None => bail!("got media without media set - internal error"),
599 Some(ref set) => set,
600 };
601
602 let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
603 let (config_map, _digest) = load_key_configs()?;
604 match config_map.get(fingerprint) {
605 Some(key_config) => Some(key_config.clone()),
606 None => {
607 bail!("unable to find tape encryption key config '{}'", fingerprint);
608 }
609 }
610 } else {
611 None
612 };
613
614 let status_path = Path::new(TAPE_STATUS_DIR);
615
616 let new_media = match old_set {
617 None => {
618 worker.log("wrinting new media set label".to_string());
619 drive.write_media_set_label(new_set, key_config.as_ref())?;
620 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
621 true
622 }
623 Some(media_set_label) => {
624 if new_set.uuid == media_set_label.uuid {
625 if new_set.seq_nr != media_set_label.seq_nr {
626 bail!("got media with wrong media sequence number ({} != {}",
627 new_set.seq_nr,media_set_label.seq_nr);
628 }
629 if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
630 bail!("detected changed encryption fingerprint - internal error");
631 }
632 media_catalog = MediaCatalog::open(status_path, &media_id, true, false)?;
633
634 // todo: verify last content/media_catalog somehow?
635
636 false
637 } else {
638 worker.log(
639 format!("wrinting new media set label (overwrite '{}/{}')",
640 media_set_label.uuid.to_string(), media_set_label.seq_nr)
641 );
642
643 drive.write_media_set_label(new_set, key_config.as_ref())?;
644 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
645 true
646 }
647 }
648 };
649
650 Ok((media_catalog, new_media))
651 }