]> git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer.rs
clippy: collapse nested ifs
[proxmox-backup.git] / src / tape / pool_writer.rs
1 use std::collections::HashSet;
2 use std::path::Path;
3 use std::time::SystemTime;
4
5 use anyhow::{bail, Error};
6
7 use proxmox::tools::Uuid;
8
9 use crate::{
10 backup::{
11 DataStore,
12 },
13 server::WorkerTask,
14 tape::{
15 TAPE_STATUS_DIR,
16 MAX_CHUNK_ARCHIVE_SIZE,
17 COMMIT_BLOCK_SIZE,
18 TapeDriver,
19 TapeWrite,
20 ChunkArchiveWriter,
21 SnapshotReader,
22 SnapshotChunkIterator,
23 MediaPool,
24 MediaId,
25 MediaCatalog,
26 MediaSetCatalog,
27 tape_write_snapshot_archive,
28 request_and_load_media,
29 tape_alert_flags_critical,
30 media_changer,
31 file_formats::MediaSetLabel,
32 },
33 config::tape_encryption_keys::load_key_configs,
34 };
35
36
/// State of the currently loaded media: the open drive together with
/// the writable catalog for that media.
struct PoolWriterState {
    drive: Box<dyn TapeDriver>,
    /// Writable catalog of the currently loaded media
    catalog: MediaCatalog,
    // tell if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}
45
impl PoolWriterState {

    /// Flush pending data to tape, then persist the catalog.
    ///
    /// Order matters: the drive is synced first, so the catalog only
    /// ever records data that is already safely on tape.
    fn commit(&mut self) -> Result<(), Error> {
        self.drive.sync()?; // sync all data to the tape
        self.catalog.commit()?; // then commit the catalog
        self.bytes_written = 0; // restart counting towards the next commit
        Ok(())
    }
}
55
/// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter {
    pool: MediaPool,
    /// Name of the drive used for writing
    drive_name: String,
    /// Drive + writable catalog; `None` while no media is loaded
    status: Option<PoolWriterState>,
    /// Read-only catalogs of the other media in the current media set
    media_set_catalog: MediaSetCatalog,
}
63
impl PoolWriter {

    /// Start a write session on `pool` using the drive `drive_name`.
    ///
    /// Opens the catalogs of all media in the current media set
    /// read-only up front, so duplicate snapshots and chunks can be
    /// detected across the whole set, not just the loaded media.
    pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> {

        let current_time = proxmox::tools::time::epoch_i64();

        pool.start_write_session(current_time)?;

        let mut media_set_catalog = MediaSetCatalog::new();

        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_catalog = MediaCatalog::open(
                Path::new(TAPE_STATUS_DIR),
                &media_uuid,
                false,
                false,
            )?;
            media_set_catalog.append_catalog(media_catalog)?;
        }

        Ok(Self {
            pool,
            drive_name: drive_name.to_string(),
            status: None, // no media loaded yet
            media_set_catalog,
        })
    }

    /// Mutable access to the underlying media pool
    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }

    /// Set media status to FULL (persistent - stores pool status)
    pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
        self.pool.set_media_status_full(&uuid)?;
        Ok(())
    }

    /// Returns true if `snapshot` is already stored in the current
    /// media set (on the loaded media or in any read-only catalog).
    pub fn contains_snapshot(&self, snapshot: &str) -> bool {
        // check the catalog of the currently loaded media first
        if let Some(PoolWriterState { ref catalog, .. }) = self.status {
            if catalog.contains_snapshot(snapshot) {
                return true;
            }
        }
        self.media_set_catalog.contains_snapshot(snapshot)
    }

    /// Eject media and drop PoolWriterState (close drive)
    pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            worker.log("eject media");
            status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            drop(status); // close drive
            worker.log("unload media");
            changer.unload_media(None)?; //eject and unload
        } else {
            // no changer available - we can only eject
            worker.log("standalone drive - ejecting media");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// Export current media set and drop PoolWriterState (close drive)
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = self.status.take();

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {

            if let Some(ref mut status) = status {
                worker.log("eject media");
                status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            }
            drop(status); // close drive

            worker.log("unload media");
            changer.unload_media(None)?;

            // move each media of the current set to an import/export slot
            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
                    worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
                } else {
                    // best effort - only warn when a media is not online
                    worker.warn(format!("export failed - media '{}' is not online", label_text));
                }
            }

        } else if let Some(mut status) = status {
            worker.log("standalone drive - ejecting media instead of export");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// commit changes to tape and catalog
    ///
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter
    pub fn commit(&mut self) -> Result<(), Error> {
        if let Some(ref mut status) = self.status {
            status.commit()?;
        }
        Ok(())
    }

    /// Load a writable media into the drive
    ///
    /// Allocates a writable media from the pool. If that media is
    /// already loaded, this is a no-op. Otherwise the old media is
    /// ejected, the new one is requested and loaded, its media set
    /// label is verified/updated and encryption is configured.
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        // uuid of the media currently in the drive (if any)
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        if !media_changed {
            return Ok(media_uuid);
        }

        // remove read-only catalog (we store a writable version in status)
        self.media_set_catalog.remove_catalog(&media_uuid);

        // move the previous media's catalog back into the read-only
        // set, then eject that media
        if let Some(PoolWriterState {mut drive, catalog, .. }) = self.status.take() {
            self.media_set_catalog.append_catalog(catalog)?;
            drive.eject_media()?;
        }

        let (drive_config, _digest) = crate::config::drive::config()?;

        let (mut drive, old_media_id) =
            request_and_load_media(worker, &drive_config, &self.drive_name, media.label())?;

        // test for critical tape alert flags
        if let Ok(alert_flags) = drive.tape_alert_flags() {
            if !alert_flags.is_empty() {
                worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
                if tape_alert_flags_critical(alert_flags) {
                    bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
                }
            }
        }

        let catalog = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        // safe: update_media_set_label already bailed out if the media
        // had no media set label
        let encrypt_fingerprint = media
            .media_set_label()
            .as_ref()
            .unwrap()
            .encryption_key_fingerprint
            .clone();

        drive.set_encryption(encrypt_fingerprint)?;

        self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });

        Ok(media_uuid)
    }

    /// uuid of currently loaded BackupMedia
    pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
        match self.status {
            Some(PoolWriterState { ref catalog, ..}) => Ok(catalog.uuid()),
            None => bail!("PoolWriter - no media loaded"),
        }
    }

    /// Move to EOM (if not already there), then creates a new snapshot
    /// archive writing specified files (as .pxar) into it. On
    /// success, this returns 'Ok(true)' and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok(false, _)'). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
    pub fn append_snapshot_archive(
        &mut self,
        worker: &WorkerTask,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                Some(content_uuid) => {
                    // archive written completely - register it in the catalog
                    status.catalog.register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                // archive incomplete (no content uuid) - do not register
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        // sync/commit once enough data accumulated since the last sync
        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        if !done || request_sync {
            status.commit()?;
        }

        Ok((done, bytes_written))
    }

    /// Move to EOM (if not already there), then creates a new chunk
    /// archive and writes chunks from 'chunk_iter'. This stops when
    /// it detects LEOM or when we reach max archive size
    /// (4GB). Written chunks are registered in the media catalog.
    pub fn append_chunk_archive(
        &mut self,
        worker: &WorkerTask,
        datastore: &DataStore,
        chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }
        let writer = status.drive.write_file()?;

        let start_time = SystemTime::now();

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            worker,
            writer,
            datastore,
            chunk_iter,
            &self.media_set_catalog,
            &status.catalog,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let elapsed = start_time.elapsed()?.as_secs_f64();
        worker.log(format!(
            "wrote {:.2} MB ({} MB/s)",
            bytes_written as f64 / (1024.0*1024.0),
            (bytes_written as f64)/(1024.0*1024.0*elapsed),
        ));

        // sync/commit once enough data accumulated since the last sync
        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
        for digest in saved_chunks {
            status.catalog.register_chunk(&digest)?;
        }
        status.catalog.end_chunk_archive()?;

        if leom || request_sync {
            status.commit()?;
        }

        Ok((leom, bytes_written))
    }
}
378
379 /// write up to <max_size> of chunks
380 fn write_chunk_archive<'a>(
381 worker: &WorkerTask,
382 writer: Box<dyn 'a + TapeWrite>,
383 datastore: &DataStore,
384 chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
385 media_set_catalog: &MediaSetCatalog,
386 media_catalog: &MediaCatalog,
387 max_size: usize,
388 ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
389
390 let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;
391
392 let mut chunk_index: HashSet<[u8;32]> = HashSet::new();
393
394 // we want to get the chunk list in correct order
395 let mut chunk_list: Vec<[u8;32]> = Vec::new();
396
397 let mut leom = false;
398
399 loop {
400 let digest = match chunk_iter.next() {
401 None => break,
402 Some(digest) => digest?,
403 };
404 if media_catalog.contains_chunk(&digest)
405 || chunk_index.contains(&digest)
406 || media_set_catalog.contains_chunk(&digest)
407 {
408 continue;
409 }
410
411 let blob = datastore.load_chunk(&digest)?;
412 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());
413
414 match writer.try_write_chunk(&digest, &blob) {
415 Ok(true) => {
416 chunk_index.insert(digest);
417 chunk_list.push(digest);
418 }
419 Ok(false) => {
420 leom = true;
421 break;
422 }
423 Err(err) => bail!("write chunk failed - {}", err),
424 }
425
426 if writer.bytes_written() > max_size {
427 worker.log("Chunk Archive max size reached, closing archive".to_string());
428 break;
429 }
430 }
431
432 writer.finish()?;
433
434 Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
435 }
436
437 // Compare the media set label. If the media is empty, or the existing
438 // set label does not match the expected media set, overwrite the
439 // media set label.
440 fn update_media_set_label(
441 worker: &WorkerTask,
442 drive: &mut dyn TapeDriver,
443 old_set: Option<MediaSetLabel>,
444 media_id: &MediaId,
445 ) -> Result<MediaCatalog, Error> {
446
447 let media_catalog;
448
449 let new_set = match media_id.media_set_label {
450 None => bail!("got media without media set - internal error"),
451 Some(ref set) => set,
452 };
453
454 let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
455 let (config_map, _digest) = load_key_configs()?;
456 match config_map.get(fingerprint) {
457 Some(item) => {
458 // fixme: store item.hint??? should be in key-config instead
459 Some(item.key_config.clone())
460 }
461 None => {
462 bail!("unable to find tape encryption key config '{}'", fingerprint);
463 }
464 }
465 } else {
466 None
467 };
468
469 let status_path = Path::new(TAPE_STATUS_DIR);
470
471 match old_set {
472 None => {
473 worker.log("wrinting new media set label".to_string());
474 drive.write_media_set_label(new_set, key_config.as_ref())?;
475 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
476 }
477 Some(media_set_label) => {
478 if new_set.uuid == media_set_label.uuid {
479 if new_set.seq_nr != media_set_label.seq_nr {
480 bail!("got media with wrong media sequence number ({} != {}",
481 new_set.seq_nr,media_set_label.seq_nr);
482 }
483 if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
484 bail!("detected changed encryption fingerprint - internal error");
485 }
486 media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
487 } else {
488 worker.log(
489 format!("wrinting new media set label (overwrite '{}/{}')",
490 media_set_label.uuid.to_string(), media_set_label.seq_nr)
491 );
492
493 drive.write_media_set_label(new_set, key_config.as_ref())?;
494 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
495 }
496 }
497 }
498
499 // todo: verify last content/media_catalog somehow?
500 drive.move_to_eom()?; // just to be sure
501
502 Ok(media_catalog)
503 }