]> git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer.rs
a15273f13489c6eb9b1a5f618045040e3e3b6adc
[proxmox-backup.git] / src / tape / pool_writer.rs
1 use std::collections::HashSet;
2 use std::path::Path;
3 use std::time::SystemTime;
4
5 use anyhow::{bail, Error};
6
7 use proxmox::tools::Uuid;
8
9 use crate::{
10 backup::{
11 DataStore,
12 },
13 server::WorkerTask,
14 tape::{
15 TAPE_STATUS_DIR,
16 MAX_CHUNK_ARCHIVE_SIZE,
17 COMMIT_BLOCK_SIZE,
18 TapeWrite,
19 SnapshotReader,
20 SnapshotChunkIterator,
21 MediaPool,
22 MediaId,
23 MediaCatalog,
24 MediaSetCatalog,
25 file_formats::{
26 MediaSetLabel,
27 ChunkArchiveWriter,
28 tape_write_snapshot_archive,
29 },
30 drive::{
31 TapeDriver,
32 request_and_load_media,
33 tape_alert_flags_critical,
34 media_changer,
35 },
36 },
37 config::tape_encryption_keys::load_key_configs,
38 };
39
40
41 struct PoolWriterState {
42 drive: Box<dyn TapeDriver>,
43 catalog: MediaCatalog,
44 // tell if we already moved to EOM
45 at_eom: bool,
46 // bytes written after the last tape fush/sync
47 bytes_written: usize,
48 }
49
50 impl PoolWriterState {
51
52 fn commit(&mut self) -> Result<(), Error> {
53 self.drive.sync()?; // sync all data to the tape
54 self.catalog.commit()?; // then commit the catalog
55 self.bytes_written = 0;
56 Ok(())
57 }
58 }
59
60 /// Helper to manage a backup job, writing several tapes of a pool
61 pub struct PoolWriter {
62 pool: MediaPool,
63 drive_name: String,
64 status: Option<PoolWriterState>,
65 media_set_catalog: MediaSetCatalog,
66 }
67
68 impl PoolWriter {
69
70 pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> {
71
72 let current_time = proxmox::tools::time::epoch_i64();
73
74 pool.start_write_session(current_time)?;
75
76 let mut media_set_catalog = MediaSetCatalog::new();
77
78 // load all catalogs read-only at start
79 for media_uuid in pool.current_media_list()? {
80 let media_catalog = MediaCatalog::open(
81 Path::new(TAPE_STATUS_DIR),
82 &media_uuid,
83 false,
84 false,
85 )?;
86 media_set_catalog.append_catalog(media_catalog)?;
87 }
88
89 Ok(Self {
90 pool,
91 drive_name: drive_name.to_string(),
92 status: None,
93 media_set_catalog,
94 })
95 }
96
97 pub fn pool(&mut self) -> &mut MediaPool {
98 &mut self.pool
99 }
100
101 /// Set media status to FULL (persistent - stores pool status)
102 pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
103 self.pool.set_media_status_full(&uuid)?;
104 Ok(())
105 }
106
107 pub fn contains_snapshot(&self, snapshot: &str) -> bool {
108 if let Some(PoolWriterState { ref catalog, .. }) = self.status {
109 if catalog.contains_snapshot(snapshot) {
110 return true;
111 }
112 }
113 self.media_set_catalog.contains_snapshot(snapshot)
114 }
115
116 /// Eject media and drop PoolWriterState (close drive)
117 pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
118 let mut status = match self.status.take() {
119 Some(status) => status,
120 None => return Ok(()), // no media loaded
121 };
122
123 let (drive_config, _digest) = crate::config::drive::config()?;
124
125 if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
126 worker.log("eject media");
127 status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
128 drop(status); // close drive
129 worker.log("unload media");
130 changer.unload_media(None)?; //eject and unload
131 } else {
132 worker.log("standalone drive - ejecting media");
133 status.drive.eject_media()?;
134 }
135
136 Ok(())
137 }
138
139 /// Export current media set and drop PoolWriterState (close drive)
140 pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
141 let mut status = self.status.take();
142
143 let (drive_config, _digest) = crate::config::drive::config()?;
144
145 if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
146
147 if let Some(ref mut status) = status {
148 worker.log("eject media");
149 status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
150 }
151 drop(status); // close drive
152
153 worker.log("unload media");
154 changer.unload_media(None)?;
155
156 for media_uuid in self.pool.current_media_list()? {
157 let media = self.pool.lookup_media(media_uuid)?;
158 let label_text = media.label_text();
159 if let Some(slot) = changer.export_media(label_text)? {
160 worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
161 } else {
162 worker.warn(format!("export failed - media '{}' is not online", label_text));
163 }
164 }
165
166 } else if let Some(mut status) = status {
167 worker.log("standalone drive - ejecting media instead of export");
168 status.drive.eject_media()?;
169 }
170
171 Ok(())
172 }
173
174 /// commit changes to tape and catalog
175 ///
176 /// This is done automatically during a backupsession, but needs to
177 /// be called explicitly before dropping the PoolWriter
178 pub fn commit(&mut self) -> Result<(), Error> {
179 if let Some(ref mut status) = self.status {
180 status.commit()?;
181 }
182 Ok(())
183 }
184
185 /// Load a writable media into the drive
186 pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
187 let last_media_uuid = match self.status {
188 Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
189 None => None,
190 };
191
192 let current_time = proxmox::tools::time::epoch_i64();
193 let media_uuid = self.pool.alloc_writable_media(current_time)?;
194
195 let media = self.pool.lookup_media(&media_uuid).unwrap();
196
197 let media_changed = match last_media_uuid {
198 Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
199 None => true,
200 };
201
202 if !media_changed {
203 return Ok(media_uuid);
204 }
205
206 // remove read-only catalog (we store a writable version in status)
207 self.media_set_catalog.remove_catalog(&media_uuid);
208
209 if let Some(PoolWriterState {mut drive, catalog, .. }) = self.status.take() {
210 self.media_set_catalog.append_catalog(catalog)?;
211 drive.eject_media()?;
212 }
213
214 let (drive_config, _digest) = crate::config::drive::config()?;
215
216 let (mut drive, old_media_id) =
217 request_and_load_media(worker, &drive_config, &self.drive_name, media.label())?;
218
219 // test for critical tape alert flags
220 if let Ok(alert_flags) = drive.tape_alert_flags() {
221 if !alert_flags.is_empty() {
222 worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
223 if tape_alert_flags_critical(alert_flags) {
224 bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
225 }
226 }
227 }
228
229 let catalog = update_media_set_label(
230 worker,
231 drive.as_mut(),
232 old_media_id.media_set_label,
233 media.id(),
234 )?;
235
236 let media_set = media.media_set_label().clone().unwrap();
237
238 let encrypt_fingerprint = media_set
239 .encryption_key_fingerprint
240 .clone()
241 .map(|fp| (fp, media_set.uuid.clone()));
242
243 drive.set_encryption(encrypt_fingerprint)?;
244
245 self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });
246
247 Ok(media_uuid)
248 }
249
250 /// uuid of currently loaded BackupMedia
251 pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
252 match self.status {
253 Some(PoolWriterState { ref catalog, ..}) => Ok(catalog.uuid()),
254 None => bail!("PoolWriter - no media loaded"),
255 }
256 }
257
258 /// Move to EOM (if not aleady there), then creates a new snapshot
259 /// archive writing specified files (as .pxar) into it. On
260 /// success, this return 'Ok(true)' and the media catalog gets
261 /// updated.
262
263 /// Please note that this may fail when there is not enough space
264 /// on the media (return value 'Ok(false, _)'). In that case, the
265 /// archive is marked incomplete, and we do not use it. The caller
266 /// should mark the media as full and try again using another
267 /// media.
268 pub fn append_snapshot_archive(
269 &mut self,
270 worker: &WorkerTask,
271 snapshot_reader: &SnapshotReader,
272 ) -> Result<(bool, usize), Error> {
273
274 let status = match self.status {
275 Some(ref mut status) => status,
276 None => bail!("PoolWriter - no media loaded"),
277 };
278
279 if !status.at_eom {
280 worker.log(String::from("moving to end of media"));
281 status.drive.move_to_eom()?;
282 status.at_eom = true;
283 }
284
285 let current_file_number = status.drive.current_file_number()?;
286 if current_file_number < 2 {
287 bail!("got strange file position number from drive ({})", current_file_number);
288 }
289
290 let (done, bytes_written) = {
291 let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;
292
293 match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
294 Some(content_uuid) => {
295 status.catalog.register_snapshot(
296 content_uuid,
297 current_file_number,
298 &snapshot_reader.snapshot().to_string(),
299 )?;
300 (true, writer.bytes_written())
301 }
302 None => (false, writer.bytes_written()),
303 }
304 };
305
306 status.bytes_written += bytes_written;
307
308 let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
309
310 if !done || request_sync {
311 status.commit()?;
312 }
313
314 Ok((done, bytes_written))
315 }
316
317 /// Move to EOM (if not aleady there), then creates a new chunk
318 /// archive and writes chunks from 'chunk_iter'. This stops when
319 /// it detect LEOM or when we reach max archive size
320 /// (4GB). Written chunks are registered in the media catalog.
321 pub fn append_chunk_archive(
322 &mut self,
323 worker: &WorkerTask,
324 datastore: &DataStore,
325 chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
326 ) -> Result<(bool, usize), Error> {
327
328 let status = match self.status {
329 Some(ref mut status) => status,
330 None => bail!("PoolWriter - no media loaded"),
331 };
332
333 if !status.at_eom {
334 worker.log(String::from("moving to end of media"));
335 status.drive.move_to_eom()?;
336 status.at_eom = true;
337 }
338
339 let current_file_number = status.drive.current_file_number()?;
340 if current_file_number < 2 {
341 bail!("got strange file position number from drive ({})", current_file_number);
342 }
343 let writer = status.drive.write_file()?;
344
345 let start_time = SystemTime::now();
346
347 let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
348 worker,
349 writer,
350 datastore,
351 chunk_iter,
352 &self.media_set_catalog,
353 &status.catalog,
354 MAX_CHUNK_ARCHIVE_SIZE,
355 )?;
356
357 status.bytes_written += bytes_written;
358
359 let elapsed = start_time.elapsed()?.as_secs_f64();
360 worker.log(format!(
361 "wrote {:.2} MB ({} MB/s)",
362 bytes_written as f64 / (1024.0*1024.0),
363 (bytes_written as f64)/(1024.0*1024.0*elapsed),
364 ));
365
366 let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
367
368 // register chunks in media_catalog
369 status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
370 for digest in saved_chunks {
371 status.catalog.register_chunk(&digest)?;
372 }
373 status.catalog.end_chunk_archive()?;
374
375 if leom || request_sync {
376 status.commit()?;
377 }
378
379 Ok((leom, bytes_written))
380 }
381 }
382
383 /// write up to <max_size> of chunks
384 fn write_chunk_archive<'a>(
385 worker: &WorkerTask,
386 writer: Box<dyn 'a + TapeWrite>,
387 datastore: &DataStore,
388 chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
389 media_set_catalog: &MediaSetCatalog,
390 media_catalog: &MediaCatalog,
391 max_size: usize,
392 ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
393
394 let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;
395
396 let mut chunk_index: HashSet<[u8;32]> = HashSet::new();
397
398 // we want to get the chunk list in correct order
399 let mut chunk_list: Vec<[u8;32]> = Vec::new();
400
401 let mut leom = false;
402
403 loop {
404 let digest = match chunk_iter.next() {
405 None => break,
406 Some(digest) => digest?,
407 };
408 if media_catalog.contains_chunk(&digest)
409 || chunk_index.contains(&digest)
410 || media_set_catalog.contains_chunk(&digest)
411 {
412 continue;
413 }
414
415 let blob = datastore.load_chunk(&digest)?;
416 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());
417
418 match writer.try_write_chunk(&digest, &blob) {
419 Ok(true) => {
420 chunk_index.insert(digest);
421 chunk_list.push(digest);
422 }
423 Ok(false) => {
424 leom = true;
425 break;
426 }
427 Err(err) => bail!("write chunk failed - {}", err),
428 }
429
430 if writer.bytes_written() > max_size {
431 worker.log("Chunk Archive max size reached, closing archive".to_string());
432 break;
433 }
434 }
435
436 writer.finish()?;
437
438 Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
439 }
440
441 // Compare the media set label. If the media is empty, or the existing
442 // set label does not match the expected media set, overwrite the
443 // media set label.
444 fn update_media_set_label(
445 worker: &WorkerTask,
446 drive: &mut dyn TapeDriver,
447 old_set: Option<MediaSetLabel>,
448 media_id: &MediaId,
449 ) -> Result<MediaCatalog, Error> {
450
451 let media_catalog;
452
453 let new_set = match media_id.media_set_label {
454 None => bail!("got media without media set - internal error"),
455 Some(ref set) => set,
456 };
457
458 let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
459 let (config_map, _digest) = load_key_configs()?;
460 match config_map.get(fingerprint) {
461 Some(key_config) => Some(key_config.clone()),
462 None => {
463 bail!("unable to find tape encryption key config '{}'", fingerprint);
464 }
465 }
466 } else {
467 None
468 };
469
470 let status_path = Path::new(TAPE_STATUS_DIR);
471
472 match old_set {
473 None => {
474 worker.log("wrinting new media set label".to_string());
475 drive.write_media_set_label(new_set, key_config.as_ref())?;
476 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
477 }
478 Some(media_set_label) => {
479 if new_set.uuid == media_set_label.uuid {
480 if new_set.seq_nr != media_set_label.seq_nr {
481 bail!("got media with wrong media sequence number ({} != {}",
482 new_set.seq_nr,media_set_label.seq_nr);
483 }
484 if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
485 bail!("detected changed encryption fingerprint - internal error");
486 }
487 media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
488 } else {
489 worker.log(
490 format!("wrinting new media set label (overwrite '{}/{}')",
491 media_set_label.uuid.to_string(), media_set_label.seq_nr)
492 );
493
494 drive.write_media_set_label(new_set, key_config.as_ref())?;
495 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
496 }
497 }
498 }
499
500 // todo: verify last content/media_catalog somehow?
501 drive.move_to_eom()?; // just to be sure
502
503 Ok(media_catalog)
504 }