use std::collections::HashSet;
use std::path::Path;
use std::time::SystemTime;

use anyhow::{bail, Error};

use proxmox::tools::Uuid;

use crate::{
    task_log,
    backup::{
        DataStore,
    },
    server::WorkerTask,
    tape::{
        TAPE_STATUS_DIR,
        MAX_CHUNK_ARCHIVE_SIZE,
        COMMIT_BLOCK_SIZE,
        TapeWrite,
        SnapshotReader,
        SnapshotChunkIterator,
        MediaPool,
        MediaId,
        MediaCatalog,
        MediaSetCatalog,
        file_formats::{
            MediaSetLabel,
            ChunkArchiveWriter,
            tape_write_snapshot_archive,
        },
        drive::{
            TapeDriver,
            request_and_load_media,
            tape_alert_flags_critical,
            media_changer,
        },
    },
    config::tape_encryption_keys::load_key_configs,
};

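/// Runtime state while a tape is loaded: the open drive handle plus the
/// writable catalog of the currently loaded media.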
struct PoolWriterState {
    drive: Box<dyn TapeDriver>,
    catalog: MediaCatalog,
    // tell if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}

impl PoolWriterState {

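    /// Sync all data to the tape, then commit the catalog, so the
    /// catalog never records data that has not been flushed to tape.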
    fn commit(&mut self) -> Result<(), Error> {
        self.drive.sync()?; // sync all data to the tape
        self.catalog.commit()?; // then commit the catalog
        self.bytes_written = 0;
        Ok(())
    }
}

/// Helper to manage a backup job, writing several tapes of a pool
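///
/// Intended call sequence (a sketch only - the pool, worker task and
/// snapshot reader setup happen outside this module, and the drive
/// name is a placeholder):
///
/// ```ignore
/// let mut pool_writer = PoolWriter::new(pool, "my-drive")?;
///
/// let media_uuid = pool_writer.load_writable_media(&worker)?;
///
/// let (done, _bytes) = pool_writer.append_snapshot_archive(&worker, &snapshot_reader)?;
/// if !done {
///     // media is full - mark it and retry with the next writable media
///     pool_writer.set_media_status_full(&media_uuid)?;
/// }
///
/// pool_writer.commit()?;
/// pool_writer.eject_media(&worker)?;
/// ```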
pub struct PoolWriter {
    pool: MediaPool,
    drive_name: String,
    status: Option<PoolWriterState>,
    media_set_catalog: MediaSetCatalog,
}

impl PoolWriter {

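    /// Create a new pool writer: start a write session on the pool and
    /// load all catalogs of the current media set read-only (used to
    /// detect snapshots and chunks that are already stored).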
    pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> {

        let current_time = proxmox::tools::time::epoch_i64();

        pool.start_write_session(current_time)?;

        let mut media_set_catalog = MediaSetCatalog::new();

        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_catalog = MediaCatalog::open(
                Path::new(TAPE_STATUS_DIR),
                &media_uuid,
                false,
                false,
            )?;
            media_set_catalog.append_catalog(media_catalog)?;
        }

        Ok(Self {
            pool,
            drive_name: drive_name.to_string(),
            status: None,
            media_set_catalog,
        })
    }

    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }

    /// Set media status to FULL (persistent - stores pool status)
    pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
        self.pool.set_media_status_full(&uuid)?;
        Ok(())
    }

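    /// Returns true if the given snapshot is already stored on the
    /// currently loaded media or on another media of the current media set.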
    pub fn contains_snapshot(&self, snapshot: &str) -> bool {
        if let Some(PoolWriterState { ref catalog, .. }) = self.status {
            if catalog.contains_snapshot(snapshot) {
                return true;
            }
        }
        self.media_set_catalog.contains_snapshot(snapshot)
    }

    /// Eject media and drop PoolWriterState (close drive)
    pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            worker.log("eject media");
            status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            drop(status); // close drive
            worker.log("unload media");
            changer.unload_media(None)?; // eject and unload
        } else {
            worker.log("standalone drive - ejecting media");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// Export current media set and drop PoolWriterState (close drive)
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = self.status.take();

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {

            if let Some(ref mut status) = status {
                worker.log("eject media");
                status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            }
            drop(status); // close drive

            worker.log("unload media");
            changer.unload_media(None)?;

            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
                    worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
                } else {
                    worker.warn(format!("export failed - media '{}' is not online", label_text));
                }
            }

        } else if let Some(mut status) = status {
            worker.log("standalone drive - ejecting media instead of export");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// Commit changes to tape and catalog
    ///
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter
    pub fn commit(&mut self) -> Result<(), Error> {
        if let Some(ref mut status) = self.status {
            status.commit()?;
        }
        Ok(())
    }

    /// Load a writable media into the drive
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        if !media_changed {
            return Ok(media_uuid);
        }

        task_log!(worker, "allocated new writable media '{}'", media.label_text());

        // remove read-only catalog (we store a writable version in status)
        self.media_set_catalog.remove_catalog(&media_uuid);

        if let Some(PoolWriterState { mut drive, catalog, .. }) = self.status.take() {
            self.media_set_catalog.append_catalog(catalog)?;
            task_log!(worker, "eject current media");
            drive.eject_media()?;
        }

        let (drive_config, _digest) = crate::config::drive::config()?;

        let (mut drive, old_media_id) =
            request_and_load_media(worker, &drive_config, &self.drive_name, media.label())?;

        // test for critical tape alert flags
        if let Ok(alert_flags) = drive.tape_alert_flags() {
            if !alert_flags.is_empty() {
                worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
                if tape_alert_flags_critical(alert_flags) {
                    bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
                }
            }
        }

        let catalog = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        let media_set = media.media_set_label().clone().unwrap();

        let encrypt_fingerprint = media_set
            .encryption_key_fingerprint
            .clone()
            .map(|fp| (fp, media_set.uuid.clone()));

        drive.set_encryption(encrypt_fingerprint)?;

        self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });

        Ok(media_uuid)
    }

    /// uuid of currently loaded BackupMedia
    pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
        match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Ok(catalog.uuid()),
            None => bail!("PoolWriter - no media loaded"),
        }
    }

    /// Move to EOM (if not already there), then create a new snapshot
    /// archive writing specified files (as .pxar) into it. On
    /// success, this returns 'Ok((true, _))' and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok((false, _))'). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
    pub fn append_snapshot_archive(
        &mut self,
        worker: &WorkerTask,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                Some(content_uuid) => {
                    status.catalog.register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

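        // request a commit (tape sync + catalog commit) once
        // COMMIT_BLOCK_SIZE bytes were written since the last sync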
        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        if !done || request_sync {
            status.commit()?;
        }

        Ok((done, bytes_written))
    }

    /// Move to EOM (if not already there), then create a new chunk
    /// archive and write chunks from 'chunk_iter'. This stops when
    /// it detects LEOM or when we reach the max archive size
    /// (4GB). Written chunks are registered in the media catalog.
    pub fn append_chunk_archive(
        &mut self,
        worker: &WorkerTask,
        datastore: &DataStore,
        chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }
        let writer = status.drive.write_file()?;

        let start_time = SystemTime::now();

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            worker,
            writer,
            datastore,
            chunk_iter,
            &self.media_set_catalog,
            &status.catalog,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let elapsed = start_time.elapsed()?.as_secs_f64();
        worker.log(format!(
            "wrote {:.2} MB ({} MB/s)",
            bytes_written as f64 / (1024.0*1024.0),
            (bytes_written as f64)/(1024.0*1024.0*elapsed),
        ));

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
        for digest in saved_chunks {
            status.catalog.register_chunk(&digest)?;
        }
        status.catalog.end_chunk_archive()?;

        if leom || request_sync {
            status.commit()?;
        }

        Ok((leom, bytes_written))
    }
}

/// Write up to <max_size> bytes of chunk data into a new chunk archive
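///
/// Chunks already registered in the media catalog, the media set
/// catalog, or earlier in this archive are skipped. Returns the digests
/// of the newly written chunks, the archive's content UUID, whether
/// LEOM was reached, and the number of bytes written.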
fn write_chunk_archive<'a>(
    worker: &WorkerTask,
    writer: Box<dyn 'a + TapeWrite>,
    datastore: &DataStore,
    chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    media_set_catalog: &MediaSetCatalog,
    media_catalog: &MediaCatalog,
    max_size: usize,
) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {

    let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;

    let mut chunk_index: HashSet<[u8;32]> = HashSet::new();

    // we want to get the chunk list in correct order
    let mut chunk_list: Vec<[u8;32]> = Vec::new();

    let mut leom = false;

    loop {
        let digest = match chunk_iter.next() {
            None => break,
            Some(digest) => digest?,
        };
        if media_catalog.contains_chunk(&digest)
            || chunk_index.contains(&digest)
            || media_set_catalog.contains_chunk(&digest)
        {
            continue;
        }

        let blob = datastore.load_chunk(&digest)?;
        //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());

        match writer.try_write_chunk(&digest, &blob) {
            Ok(true) => {
                chunk_index.insert(digest);
                chunk_list.push(digest);
            }
            Ok(false) => {
                leom = true;
                break;
            }
            Err(err) => bail!("write chunk failed - {}", err),
        }

        if writer.bytes_written() > max_size {
            worker.log("Chunk Archive max size reached, closing archive".to_string());
            break;
        }
    }

    writer.finish()?;

    Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
}

// Compare the media set label. If the media is empty, or the existing
// set label does not match the expected media set, overwrite the
// media set label.
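// Returns the media catalog for this media (freshly overwritten if a
// new media set label was written).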
fn update_media_set_label(
    worker: &WorkerTask,
    drive: &mut dyn TapeDriver,
    old_set: Option<MediaSetLabel>,
    media_id: &MediaId,
) -> Result<MediaCatalog, Error> {

    let media_catalog;

    let new_set = match media_id.media_set_label {
        None => bail!("got media without media set - internal error"),
        Some(ref set) => set,
    };

    let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
        let (config_map, _digest) = load_key_configs()?;
        match config_map.get(fingerprint) {
            Some(key_config) => Some(key_config.clone()),
            None => {
                bail!("unable to find tape encryption key config '{}'", fingerprint);
            }
        }
    } else {
        None
    };

    let status_path = Path::new(TAPE_STATUS_DIR);

    match old_set {
        None => {
            worker.log("writing new media set label".to_string());
            drive.write_media_set_label(new_set, key_config.as_ref())?;
            media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
        }
        Some(media_set_label) => {
            if new_set.uuid == media_set_label.uuid {
                if new_set.seq_nr != media_set_label.seq_nr {
                    bail!("got media with wrong media sequence number ({} != {})",
                          new_set.seq_nr, media_set_label.seq_nr);
                }
                if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
                    bail!("detected changed encryption fingerprint - internal error");
                }
                media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
            } else {
                worker.log(
                    format!("writing new media set label (overwrite '{}/{}')",
                            media_set_label.uuid.to_string(), media_set_label.seq_nr)
                );

                drive.write_media_set_label(new_set, key_config.as_ref())?;
                media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
            }
        }
    }

    // todo: verify last content/media_catalog somehow?
    drive.move_to_eom()?; // just to be sure

    Ok(media_catalog)
}