]> git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer.rs
tape: change code hierarchy to improve docs
[proxmox-backup.git] / src / tape / pool_writer.rs
1 use std::collections::HashSet;
2 use std::path::Path;
3 use std::time::SystemTime;
4
5 use anyhow::{bail, Error};
6
7 use proxmox::tools::Uuid;
8
9 use crate::{
10 backup::{
11 DataStore,
12 },
13 server::WorkerTask,
14 tape::{
15 TAPE_STATUS_DIR,
16 MAX_CHUNK_ARCHIVE_SIZE,
17 COMMIT_BLOCK_SIZE,
18 TapeWrite,
19 ChunkArchiveWriter,
20 SnapshotReader,
21 SnapshotChunkIterator,
22 MediaPool,
23 MediaId,
24 MediaCatalog,
25 MediaSetCatalog,
26 tape_write_snapshot_archive,
27 file_formats::MediaSetLabel,
28 drive::{
29 TapeDriver,
30 request_and_load_media,
31 tape_alert_flags_critical,
32 media_changer,
33 },
34 },
35 config::tape_encryption_keys::load_key_configs,
36 };
37
38
/// Internal state of a [PoolWriter] while a tape is loaded into the drive.
struct PoolWriterState {
    /// Open handle to the tape drive holding the current media
    drive: Box<dyn TapeDriver>,
    /// Writable catalog of the currently loaded media
    catalog: MediaCatalog,
    // tell if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}
47
impl PoolWriterState {

    /// Flush pending data to tape, then persist the media catalog.
    ///
    /// The order matters: data is synced to tape *before* the catalog
    /// is committed, so the catalog never references data that did not
    /// reach the media. Resets the `bytes_written` counter afterwards.
    fn commit(&mut self) -> Result<(), Error> {
        self.drive.sync()?; // sync all data to the tape
        self.catalog.commit()?; // then commit the catalog
        self.bytes_written = 0;
        Ok(())
    }
}
57
/// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter {
    // media pool that tapes are allocated from
    pool: MediaPool,
    // name of the drive used for writing
    drive_name: String,
    // Some(..) while a media is loaded; None means no drive/media open
    status: Option<PoolWriterState>,
    // read-only catalogs of the other media in the current media set
    media_set_catalog: MediaSetCatalog,
}
65
impl PoolWriter {

    /// Create a new PoolWriter for `pool`, using drive `drive_name`.
    ///
    /// Starts a write session on the pool and loads the catalogs of
    /// all media in the current media set read-only.
    pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> {

        let current_time = proxmox::tools::time::epoch_i64();

        pool.start_write_session(current_time)?;

        let mut media_set_catalog = MediaSetCatalog::new();

        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_catalog = MediaCatalog::open(
                Path::new(TAPE_STATUS_DIR),
                &media_uuid,
                false,
                false,
            )?;
            media_set_catalog.append_catalog(media_catalog)?;
        }

        Ok(Self {
            pool,
            drive_name: drive_name.to_string(),
            status: None,
            media_set_catalog,
        })
    }

    /// Mutable access to the underlying media pool
    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }

    /// Set media status to FULL (persistent - stores pool status)
    pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
        self.pool.set_media_status_full(&uuid)?;
        Ok(())
    }

    /// Test if a snapshot is already stored somewhere in the media set.
    ///
    /// Checks the writable catalog of the loaded media first, then the
    /// read-only catalogs of the other media in the set.
    pub fn contains_snapshot(&self, snapshot: &str) -> bool {
        if let Some(PoolWriterState { ref catalog, .. }) = self.status {
            if catalog.contains_snapshot(snapshot) {
                return true;
            }
        }
        self.media_set_catalog.contains_snapshot(snapshot)
    }

    /// Eject media and drop PoolWriterState (close drive)
    pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            worker.log("eject media");
            status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            drop(status); // close drive
            worker.log("unload media");
            changer.unload_media(None)?; //eject and unload
        } else {
            worker.log("standalone drive - ejecting media");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// Export current media set and drop PoolWriterState (close drive)
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = self.status.take();

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {

            if let Some(ref mut status) = status {
                worker.log("eject media");
                status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            }
            drop(status); // close drive

            worker.log("unload media");
            changer.unload_media(None)?;

            // move every media of the current set to an import/export slot
            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
                    worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
                } else {
                    worker.warn(format!("export failed - media '{}' is not online", label_text));
                }
            }

        } else if let Some(mut status) = status {
            worker.log("standalone drive - ejecting media instead of export");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// commit changes to tape and catalog
    ///
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter
    pub fn commit(&mut self) -> Result<(), Error> {
        if let Some(ref mut status) = self.status {
            status.commit()?;
        }
        Ok(())
    }

    /// Load a writable media into the drive
    ///
    /// Allocates a writable media from the pool; if it differs from the
    /// currently loaded one, the old media is ejected (its catalog moved
    /// back into the read-only set catalog), the new media is requested
    /// and loaded, its media set label verified/updated, and encryption
    /// configured. Returns the uuid of the loaded media.
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        if !media_changed {
            return Ok(media_uuid);
        }

        // remove read-only catalog (we store a writable version in status)
        self.media_set_catalog.remove_catalog(&media_uuid);

        // hand the old media's catalog back to the set catalog and close the drive
        if let Some(PoolWriterState {mut drive, catalog, .. }) = self.status.take() {
            self.media_set_catalog.append_catalog(catalog)?;
            drive.eject_media()?;
        }

        let (drive_config, _digest) = crate::config::drive::config()?;

        let (mut drive, old_media_id) =
            request_and_load_media(worker, &drive_config, &self.drive_name, media.label())?;

        // test for critical tape alert flags
        if let Ok(alert_flags) = drive.tape_alert_flags() {
            if !alert_flags.is_empty() {
                worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
                if tape_alert_flags_critical(alert_flags) {
                    bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
                }
            }
        }

        let catalog = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        let media_set = media.media_set_label().clone().unwrap();

        // derive the drive encryption setting from the media set label
        let encrypt_fingerprint = media_set
            .encryption_key_fingerprint
            .clone()
            .map(|fp| (fp, media_set.uuid.clone()));

        drive.set_encryption(encrypt_fingerprint)?;

        self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });

        Ok(media_uuid)
    }

    /// uuid of currently loaded BackupMedia
    pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
        match self.status {
            Some(PoolWriterState { ref catalog, ..}) => Ok(catalog.uuid()),
            None => bail!("PoolWriter - no media loaded"),
        }
    }

    /// Move to EOM (if not already there), then creates a new snapshot
    /// archive writing specified files (as .pxar) into it. On
    /// success, this return 'Ok(true)' and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok(false, _)'). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
    pub fn append_snapshot_archive(
        &mut self,
        worker: &WorkerTask,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        // file 0 is the media label, file 1 the set label - anything
        // below 2 means the drive position is not plausible
        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                Some(content_uuid) => {
                    // only register the snapshot if the archive was written completely
                    status.catalog.register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        if !done || request_sync {
            status.commit()?;
        }

        Ok((done, bytes_written))
    }

    /// Move to EOM (if not already there), then creates a new chunk
    /// archive and writes chunks from 'chunk_iter'. This stops when
    /// it detects LEOM or when we reach max archive size
    /// (4GB). Written chunks are registered in the media catalog.
    pub fn append_chunk_archive(
        &mut self,
        worker: &WorkerTask,
        datastore: &DataStore,
        chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        // file 0 is the media label, file 1 the set label - anything
        // below 2 means the drive position is not plausible
        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }
        let writer = status.drive.write_file()?;

        let start_time = SystemTime::now();

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            worker,
            writer,
            datastore,
            chunk_iter,
            &self.media_set_catalog,
            &status.catalog,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let elapsed = start_time.elapsed()?.as_secs_f64();
        worker.log(format!(
            "wrote {:.2} MB ({} MB/s)",
            bytes_written as f64 / (1024.0*1024.0),
            (bytes_written as f64)/(1024.0*1024.0*elapsed),
        ));

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
        for digest in saved_chunks {
            status.catalog.register_chunk(&digest)?;
        }
        status.catalog.end_chunk_archive()?;

        if leom || request_sync {
            status.commit()?;
        }

        Ok((leom, bytes_written))
    }
}
380
/// write up to <max_size> of chunks
///
/// Pulls digests from `chunk_iter`, skipping chunks that are already
/// present in the loaded media's catalog, in the read-only media set
/// catalogs, or earlier in this archive. Stops when the iterator is
/// exhausted, when the writer signals it is out of space (LEOM), or
/// once more than `max_size` bytes were written.
///
/// Returns the written chunk digests (in write order), the archive
/// content uuid, whether LEOM was hit, and the bytes written.
fn write_chunk_archive<'a>(
    worker: &WorkerTask,
    writer: Box<dyn 'a + TapeWrite>,
    datastore: &DataStore,
    chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    media_set_catalog: &MediaSetCatalog,
    media_catalog: &MediaCatalog,
    max_size: usize,
) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {

    let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;

    // digests already written into *this* archive (fast duplicate check)
    let mut chunk_index: HashSet<[u8;32]> = HashSet::new();

    // we want to get the chunk list in correct order
    let mut chunk_list: Vec<[u8;32]> = Vec::new();

    let mut leom = false;

    loop {
        let digest = match chunk_iter.next() {
            None => break,
            Some(digest) => digest?,
        };
        // skip chunks the media set already stores
        if media_catalog.contains_chunk(&digest)
            || chunk_index.contains(&digest)
            || media_set_catalog.contains_chunk(&digest)
        {
            continue;
        }

        let blob = datastore.load_chunk(&digest)?;
        //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());

        match writer.try_write_chunk(&digest, &blob) {
            Ok(true) => {
                chunk_index.insert(digest);
                chunk_list.push(digest);
            }
            Ok(false) => {
                // writer could not fit the chunk - flag logical end of media
                leom = true;
                break;
            }
            Err(err) => bail!("write chunk failed - {}", err),
        }

        if writer.bytes_written() > max_size {
            worker.log("Chunk Archive max size reached, closing archive".to_string());
            break;
        }
    }

    writer.finish()?;

    Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
}
438
439 // Compare the media set label. If the media is empty, or the existing
440 // set label does not match the expected media set, overwrite the
441 // media set label.
442 fn update_media_set_label(
443 worker: &WorkerTask,
444 drive: &mut dyn TapeDriver,
445 old_set: Option<MediaSetLabel>,
446 media_id: &MediaId,
447 ) -> Result<MediaCatalog, Error> {
448
449 let media_catalog;
450
451 let new_set = match media_id.media_set_label {
452 None => bail!("got media without media set - internal error"),
453 Some(ref set) => set,
454 };
455
456 let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
457 let (config_map, _digest) = load_key_configs()?;
458 match config_map.get(fingerprint) {
459 Some(key_config) => Some(key_config.clone()),
460 None => {
461 bail!("unable to find tape encryption key config '{}'", fingerprint);
462 }
463 }
464 } else {
465 None
466 };
467
468 let status_path = Path::new(TAPE_STATUS_DIR);
469
470 match old_set {
471 None => {
472 worker.log("wrinting new media set label".to_string());
473 drive.write_media_set_label(new_set, key_config.as_ref())?;
474 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
475 }
476 Some(media_set_label) => {
477 if new_set.uuid == media_set_label.uuid {
478 if new_set.seq_nr != media_set_label.seq_nr {
479 bail!("got media with wrong media sequence number ({} != {}",
480 new_set.seq_nr,media_set_label.seq_nr);
481 }
482 if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
483 bail!("detected changed encryption fingerprint - internal error");
484 }
485 media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
486 } else {
487 worker.log(
488 format!("wrinting new media set label (overwrite '{}/{}')",
489 media_set_label.uuid.to_string(), media_set_label.seq_nr)
490 );
491
492 drive.write_media_set_label(new_set, key_config.as_ref())?;
493 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
494 }
495 }
496 }
497
498 // todo: verify last content/media_catalog somehow?
499 drive.move_to_eom()?; // just to be sure
500
501 Ok(media_catalog)
502 }