]> git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer.rs
tape: improve PoolWriter logging
[proxmox-backup.git] / src / tape / pool_writer.rs
1 use std::collections::HashSet;
2 use std::path::Path;
3 use std::time::SystemTime;
4
5 use anyhow::{bail, Error};
6
7 use proxmox::tools::Uuid;
8
9 use crate::{
10 task_log,
11 backup::{
12 DataStore,
13 },
14 server::WorkerTask,
15 tape::{
16 TAPE_STATUS_DIR,
17 MAX_CHUNK_ARCHIVE_SIZE,
18 COMMIT_BLOCK_SIZE,
19 TapeWrite,
20 SnapshotReader,
21 SnapshotChunkIterator,
22 MediaPool,
23 MediaId,
24 MediaCatalog,
25 MediaSetCatalog,
26 file_formats::{
27 MediaSetLabel,
28 ChunkArchiveWriter,
29 tape_write_snapshot_archive,
30 },
31 drive::{
32 TapeDriver,
33 request_and_load_media,
34 tape_alert_flags_critical,
35 media_changer,
36 },
37 },
38 config::tape_encryption_keys::load_key_configs,
39 };
40
41
/// Mutable per-drive state while a tape is loaded: the open drive
/// handle plus the writable catalog of the currently loaded media.
struct PoolWriterState {
    drive: Box<dyn TapeDriver>,
    catalog: MediaCatalog,
    // tell if we already moved to EOM (end of media)
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}
50
impl PoolWriterState {

    /// Flush buffered data to tape, then persist the media catalog.
    ///
    /// Note the order: the drive is synced *before* the catalog is
    /// committed, so the on-disk catalog never references data that
    /// has not yet reached the tape. Resets the dirty-byte counter.
    fn commit(&mut self) -> Result<(), Error> {
        self.drive.sync()?; // sync all data to the tape
        self.catalog.commit()?; // then commit the catalog
        self.bytes_written = 0;
        Ok(())
    }
}
60
/// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter {
    // the media pool this writer allocates media from
    pool: MediaPool,
    // name of the tape drive used for writing
    drive_name: String,
    // Some(..) while a media is loaded into the drive, else None
    status: Option<PoolWriterState>,
    // read-only catalogs of the other media in the current set
    media_set_catalog: MediaSetCatalog,
}
68
impl PoolWriter {

    /// Start a write session on `pool` using drive `drive_name`.
    ///
    /// Logs the reason if a new media set is started, then loads the
    /// catalogs of all media in the current set read-only.
    pub fn new(mut pool: MediaPool, drive_name: &str, worker: &WorkerTask) -> Result<Self, Error> {

        let current_time = proxmox::tools::time::epoch_i64();

        let new_media_set_reason = pool.start_write_session(current_time)?;
        if let Some(reason) = new_media_set_reason {
            task_log!(
                worker,
                "starting new media set - reason: {}",
                reason,
            );
        }

        task_log!(worker, "media set uuid: {}", pool.current_media_set());

        let mut media_set_catalog = MediaSetCatalog::new();

        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_catalog = MediaCatalog::open(
                Path::new(TAPE_STATUS_DIR),
                &media_uuid,
                false,
                false,
            )?;
            media_set_catalog.append_catalog(media_catalog)?;
        }

        Ok(Self {
            pool,
            drive_name: drive_name.to_string(),
            status: None,
            media_set_catalog,
        })
    }

    /// Mutable access to the underlying media pool.
    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }

    /// Set media status to FULL (persistent - stores pool status)
    pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
        self.pool.set_media_status_full(&uuid)?;
        Ok(())
    }

    /// Test if `snapshot` is already stored on the current media set,
    /// checking the writable catalog first, then the read-only ones.
    pub fn contains_snapshot(&self, snapshot: &str) -> bool {
        if let Some(PoolWriterState { ref catalog, .. }) = self.status {
            if catalog.contains_snapshot(snapshot) {
                return true;
            }
        }
        self.media_set_catalog.contains_snapshot(snapshot)
    }

    /// Eject media and drop PoolWriterState (close drive)
    pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            worker.log("eject media");
            status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            drop(status); // close drive
            worker.log("unload media");
            changer.unload_media(None)?; //eject and unload
        } else {
            worker.log("standalone drive - ejecting media");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// Export current media set and drop PoolWriterState (close drive)
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = self.status.take();

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {

            if let Some(ref mut status) = status {
                worker.log("eject media");
                status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            }
            drop(status); // close drive

            worker.log("unload media");
            changer.unload_media(None)?;

            // move every media of the current set to an import/export slot
            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
                    worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
                } else {
                    worker.warn(format!("export failed - media '{}' is not online", label_text));
                }
            }

        } else if let Some(mut status) = status {
            worker.log("standalone drive - ejecting media instead of export");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// commit changes to tape and catalog
    ///
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter
    pub fn commit(&mut self) -> Result<(), Error> {
        if let Some(ref mut status) = self.status {
            status.commit()?;
        }
        Ok(())
    }

    /// Load a writable media into the drive
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        // uuid of the media currently loaded, if any
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        // correct media already loaded - nothing to do
        if !media_changed {
            return Ok(media_uuid);
        }

        task_log!(worker, "allocated new writable media '{}'", media.label_text());

        // remove read-only catalog (we store a writable version in status)
        self.media_set_catalog.remove_catalog(&media_uuid);

        // hand the previous (now read-only) catalog back to the set,
        // then eject the previous media
        if let Some(PoolWriterState {mut drive, catalog, .. }) = self.status.take() {
            self.media_set_catalog.append_catalog(catalog)?;
            task_log!(worker, "eject current media");
            drive.eject_media()?;
        }

        let (drive_config, _digest) = crate::config::drive::config()?;

        let (mut drive, old_media_id) =
            request_and_load_media(worker, &drive_config, &self.drive_name, media.label())?;

        // test for critical tape alert flags
        if let Ok(alert_flags) = drive.tape_alert_flags() {
            if !alert_flags.is_empty() {
                worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
                if tape_alert_flags_critical(alert_flags) {
                    self.pool.set_media_status_damaged(&media_uuid)?;
                    bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
                }
            }
        }

        // label the media for this set if necessary, get writable catalog
        let catalog = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        let media_set = media.media_set_label().clone().unwrap();

        // enable hardware encryption when the set has a key fingerprint
        let encrypt_fingerprint = media_set
            .encryption_key_fingerprint
            .clone()
            .map(|fp| (fp, media_set.uuid.clone()));

        drive.set_encryption(encrypt_fingerprint)?;

        self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });

        Ok(media_uuid)
    }

    /// uuid of currently loaded BackupMedia
    pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
        match self.status {
            Some(PoolWriterState { ref catalog, ..}) => Ok(catalog.uuid()),
            None => bail!("PoolWriter - no media loaded"),
        }
    }

    /// Move to EOM (if not already there), then creates a new snapshot
    /// archive writing specified files (as .pxar) into it. On
    /// success, this return 'Ok(true)' and the media catalog gets
    /// updated.

    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok(false, _)'). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
    pub fn append_snapshot_archive(
        &mut self,
        worker: &WorkerTask,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        // file 0 is the label, file 1 the media set label, so content
        // files always start at position 2
        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                // Some(uuid) means the archive was written completely
                Some(content_uuid) => {
                    status.catalog.register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // always commit on an incomplete archive, so the catalog and
        // tape stay consistent
        if !done || request_sync {
            status.commit()?;
        }

        Ok((done, bytes_written))
    }

    /// Move to EOM (if not already there), then creates a new chunk
    /// archive and writes chunks from 'chunk_iter'. This stops when
    /// it detect LEOM or when we reach max archive size
    /// (4GB). Written chunks are registered in the media catalog.
    pub fn append_chunk_archive(
        &mut self,
        worker: &WorkerTask,
        datastore: &DataStore,
        chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        // file 0 is the label, file 1 the media set label, so content
        // files always start at position 2
        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }
        let writer = status.drive.write_file()?;

        let start_time = SystemTime::now();

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            worker,
            writer,
            datastore,
            chunk_iter,
            &self.media_set_catalog,
            &status.catalog,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        // log throughput for this archive
        let elapsed = start_time.elapsed()?.as_secs_f64();
        worker.log(format!(
            "wrote {:.2} MB ({} MB/s)",
            bytes_written as f64 / (1024.0*1024.0),
            (bytes_written as f64)/(1024.0*1024.0*elapsed),
        ));

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
        for digest in saved_chunks {
            status.catalog.register_chunk(&digest)?;
        }
        status.catalog.end_chunk_archive()?;

        if leom || request_sync {
            status.commit()?;
        }

        Ok((leom, bytes_written))
    }
}
396
397 /// write up to <max_size> of chunks
398 fn write_chunk_archive<'a>(
399 worker: &WorkerTask,
400 writer: Box<dyn 'a + TapeWrite>,
401 datastore: &DataStore,
402 chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
403 media_set_catalog: &MediaSetCatalog,
404 media_catalog: &MediaCatalog,
405 max_size: usize,
406 ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
407
408 let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;
409
410 let mut chunk_index: HashSet<[u8;32]> = HashSet::new();
411
412 // we want to get the chunk list in correct order
413 let mut chunk_list: Vec<[u8;32]> = Vec::new();
414
415 let mut leom = false;
416
417 loop {
418 let digest = match chunk_iter.next() {
419 None => break,
420 Some(digest) => digest?,
421 };
422 if media_catalog.contains_chunk(&digest)
423 || chunk_index.contains(&digest)
424 || media_set_catalog.contains_chunk(&digest)
425 {
426 continue;
427 }
428
429 let blob = datastore.load_chunk(&digest)?;
430 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());
431
432 match writer.try_write_chunk(&digest, &blob) {
433 Ok(true) => {
434 chunk_index.insert(digest);
435 chunk_list.push(digest);
436 }
437 Ok(false) => {
438 leom = true;
439 break;
440 }
441 Err(err) => bail!("write chunk failed - {}", err),
442 }
443
444 if writer.bytes_written() > max_size {
445 worker.log("Chunk Archive max size reached, closing archive".to_string());
446 break;
447 }
448 }
449
450 writer.finish()?;
451
452 Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
453 }
454
455 // Compare the media set label. If the media is empty, or the existing
456 // set label does not match the expected media set, overwrite the
457 // media set label.
458 fn update_media_set_label(
459 worker: &WorkerTask,
460 drive: &mut dyn TapeDriver,
461 old_set: Option<MediaSetLabel>,
462 media_id: &MediaId,
463 ) -> Result<MediaCatalog, Error> {
464
465 let media_catalog;
466
467 let new_set = match media_id.media_set_label {
468 None => bail!("got media without media set - internal error"),
469 Some(ref set) => set,
470 };
471
472 let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
473 let (config_map, _digest) = load_key_configs()?;
474 match config_map.get(fingerprint) {
475 Some(key_config) => Some(key_config.clone()),
476 None => {
477 bail!("unable to find tape encryption key config '{}'", fingerprint);
478 }
479 }
480 } else {
481 None
482 };
483
484 let status_path = Path::new(TAPE_STATUS_DIR);
485
486 match old_set {
487 None => {
488 worker.log("wrinting new media set label".to_string());
489 drive.write_media_set_label(new_set, key_config.as_ref())?;
490 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
491 }
492 Some(media_set_label) => {
493 if new_set.uuid == media_set_label.uuid {
494 if new_set.seq_nr != media_set_label.seq_nr {
495 bail!("got media with wrong media sequence number ({} != {}",
496 new_set.seq_nr,media_set_label.seq_nr);
497 }
498 if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
499 bail!("detected changed encryption fingerprint - internal error");
500 }
501 media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
502 } else {
503 worker.log(
504 format!("wrinting new media set label (overwrite '{}/{}')",
505 media_set_label.uuid.to_string(), media_set_label.seq_nr)
506 );
507
508 drive.write_media_set_label(new_set, key_config.as_ref())?;
509 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
510 }
511 }
512 }
513
514 // todo: verify last content/media_catalog somehow?
515 drive.move_to_eom()?; // just to be sure
516
517 Ok(media_catalog)
518 }