proxmox-backup.git - src/tape/pool_writer.rs
use std::collections::HashSet;
use std::path::Path;
use std::time::SystemTime;

use anyhow::{bail, Error};

use proxmox::tools::Uuid;

use crate::{
    backup::{
        DataStore,
    },
    server::WorkerTask,
    tape::{
        TAPE_STATUS_DIR,
        MAX_CHUNK_ARCHIVE_SIZE,
        COMMIT_BLOCK_SIZE,
        TapeDriver,
        TapeWrite,
        ChunkArchiveWriter,
        SnapshotReader,
        SnapshotChunkIterator,
        MediaPool,
        MediaId,
        MediaCatalog,
        MediaSetCatalog,
        tape_write_snapshot_archive,
        request_and_load_media,
        tape_alert_flags_critical,
        media_changer,
        file_formats::MediaSetLabel,
    },
    config::tape_encryption_keys::load_key_configs,
};


struct PoolWriterState {
    drive: Box<dyn TapeDriver>,
    catalog: MediaCatalog,
    // tell if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}

impl PoolWriterState {

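    // Flush all buffered data to tape first, and only then commit the catalog,
    // so the catalog never references data that is not yet safely on tape.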
    fn commit(&mut self) -> Result<(), Error> {
        self.drive.sync()?; // sync all data to the tape
        self.catalog.commit()?; // then commit the catalog
        self.bytes_written = 0;
        Ok(())
    }
}

/// Helper to manage a backup job, writing several tapes of a pool
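///
/// A minimal usage sketch (assumptions: `pool` is a configured `MediaPool`, the
/// drive is called "drive0", and `worker`/`snapshot_reader` already exist; error
/// handling and the chunk archive loop are elided):
/// ```ignore
/// let mut pool_writer = PoolWriter::new(pool, "drive0")?;
/// pool_writer.load_writable_media(&worker)?;
/// let (done, _bytes) = pool_writer.append_snapshot_archive(&worker, &snapshot_reader)?;
/// pool_writer.commit()?;
/// pool_writer.eject_media(&worker)?;
/// ```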
pub struct PoolWriter {
    pool: MediaPool,
    drive_name: String,
    status: Option<PoolWriterState>,
    media_set_catalog: MediaSetCatalog,
}

impl PoolWriter {

    pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> {

        let current_time = proxmox::tools::time::epoch_i64();

        pool.start_write_session(current_time)?;

        let mut media_set_catalog = MediaSetCatalog::new();

        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_catalog = MediaCatalog::open(
                Path::new(TAPE_STATUS_DIR),
                &media_uuid,
                false,
                false,
            )?;
            media_set_catalog.append_catalog(media_catalog)?;
        }

        Ok(Self {
            pool,
            drive_name: drive_name.to_string(),
            status: None,
            media_set_catalog,
        })
    }

    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }

    /// Set media status to FULL (persistent - stores pool status)
    pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
        self.pool.set_media_status_full(uuid)?;
        Ok(())
    }

    pub fn contains_snapshot(&self, snapshot: &str) -> bool {
        if let Some(PoolWriterState { ref catalog, .. }) = self.status {
            if catalog.contains_snapshot(snapshot) {
                return true;
            }
        }
        self.media_set_catalog.contains_snapshot(snapshot)
    }

    /// Eject media and drop PoolWriterState (close drive)
    pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            worker.log("eject media");
            status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            drop(status); // close drive
            worker.log("unload media");
            changer.unload_media(None)?; // eject and unload
        } else {
            worker.log("standalone drive - ejecting media");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// Export current media set and drop PoolWriterState (close drive)
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let mut status = self.status.take();

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {

            if let Some(ref mut status) = status {
                worker.log("eject media");
                status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
            }
            drop(status); // close drive

            worker.log("unload media");
            changer.unload_media(None)?;

            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
                    worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
                } else {
                    worker.warn(format!("export failed - media '{}' is not online", label_text));
                }
            }

        } else if let Some(mut status) = status {
            worker.log("standalone drive - ejecting media instead of export");
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// Commit changes to tape and catalog
    ///
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter.
    pub fn commit(&mut self) -> Result<(), Error> {
        if let Some(ref mut status) = self.status {
            status.commit()?;
        }
        Ok(())
    }

    /// Load a writable media into the drive
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

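        // the allocated media is the one already loaded - keep using it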
        if !media_changed {
            return Ok(media_uuid);
        }

        // remove read-only catalog (we store a writable version in status)
        self.media_set_catalog.remove_catalog(&media_uuid);

        if let Some(PoolWriterState { mut drive, catalog, .. }) = self.status.take() {
            self.media_set_catalog.append_catalog(catalog)?;
            drive.eject_media()?;
        }

        let (drive_config, _digest) = crate::config::drive::config()?;

        let (mut drive, old_media_id) =
            request_and_load_media(worker, &drive_config, &self.drive_name, media.label())?;

        // test for critical tape alert flags
        if let Ok(alert_flags) = drive.tape_alert_flags() {
            if !alert_flags.is_empty() {
                worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
                if tape_alert_flags_critical(alert_flags) {
                    bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
                }
            }
        }

        let catalog = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        let encrypt_fingerprint = media
            .media_set_label()
            .as_ref()
            .unwrap()
            .encryption_key_fingerprint
            .clone();

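        // configure drive encryption to match the media set label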
        drive.set_encryption(encrypt_fingerprint)?;

        self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });

        Ok(media_uuid)
    }

    /// uuid of currently loaded BackupMedia
    pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
        match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Ok(catalog.uuid()),
            None => bail!("PoolWriter - no media loaded"),
        }
    }

    /// Move to EOM (if not already there), then create a new snapshot
    /// archive writing the specified files (as .pxar) into it. On
    /// success, this returns `Ok((true, _))` and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value `Ok((false, _))`). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
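    ///
    /// A hedged caller-side sketch of that retry protocol (assumes `pool_writer`,
    /// `worker` and `snapshot_reader` are already set up; error handling elided):
    /// ```ignore
    /// let (done, _bytes) = pool_writer.append_snapshot_archive(&worker, &snapshot_reader)?;
    /// if !done {
    ///     // the media is full - record that, switch media and retry once
    ///     let uuid = pool_writer.current_media_uuid()?.clone();
    ///     pool_writer.set_media_status_full(&uuid)?;
    ///     pool_writer.load_writable_media(&worker)?;
    ///     pool_writer.append_snapshot_archive(&worker, &snapshot_reader)?;
    /// }
    /// ```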
    pub fn append_snapshot_archive(
        &mut self,
        worker: &WorkerTask,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

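        // file 0 is the media label and file 1 the media set label, so any
        // content archive has to start at file number 2 or later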
        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                Some(content_uuid) => {
                    status.catalog.register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        if !done || request_sync {
            status.commit()?;
        }

        Ok((done, bytes_written))
    }

    /// Move to EOM (if not already there), then create a new chunk
    /// archive and write chunks from 'chunk_iter'. This stops when it
    /// detects LEOM or when the maximum archive size (4GB) is reached.
    /// Written chunks are registered in the media catalog.
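    ///
    /// A minimal caller sketch (assumes a peekable `SnapshotChunkIterator` named
    /// `chunk_iter` has already been built for the snapshot; error handling elided):
    /// ```ignore
    /// while chunk_iter.peek().is_some() {
    ///     let (leom, _bytes) = pool_writer.append_chunk_archive(&worker, &datastore, &mut chunk_iter)?;
    ///     if leom {
    ///         // media is full - mark it and continue on the next writable media
    ///         let uuid = pool_writer.current_media_uuid()?.clone();
    ///         pool_writer.set_media_status_full(&uuid)?;
    ///         pool_writer.load_writable_media(&worker)?;
    ///     }
    /// }
    /// ```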
    pub fn append_chunk_archive(
        &mut self,
        worker: &WorkerTask,
        datastore: &DataStore,
        chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            worker.log(String::from("moving to end of media"));
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }
        let writer = status.drive.write_file()?;

        let start_time = SystemTime::now();

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            worker,
            writer,
            datastore,
            chunk_iter,
            &self.media_set_catalog,
            &status.catalog,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let elapsed = start_time.elapsed()?.as_secs_f64();
        worker.log(format!(
            "wrote {:.2} MB ({} MB/s)",
            bytes_written as f64 / (1024.0*1024.0),
            (bytes_written as f64)/(1024.0*1024.0*elapsed),
        ));

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
        for digest in saved_chunks {
            status.catalog.register_chunk(&digest)?;
        }
        status.catalog.end_chunk_archive()?;

        if leom || request_sync {
            status.commit()?;
        }

        Ok((leom, bytes_written))
    }
}

/// Write chunks from 'chunk_iter' into a new chunk archive, up to `max_size` bytes
fn write_chunk_archive<'a>(
    worker: &WorkerTask,
    writer: Box<dyn 'a + TapeWrite>,
    datastore: &DataStore,
    chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    media_set_catalog: &MediaSetCatalog,
    media_catalog: &MediaCatalog,
    max_size: usize,
) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {

    let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;

    let mut chunk_index: HashSet<[u8;32]> = HashSet::new();

    // we want to get the chunk list in correct order
    let mut chunk_list: Vec<[u8;32]> = Vec::new();

    let mut leom = false;

    loop {
        let digest = match chunk_iter.next() {
            None => break,
            Some(digest) => digest?,
        };
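        // skip chunks that are already on the loaded media, in this archive,
        // or on another media of the current media set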
        if media_catalog.contains_chunk(&digest)
            || chunk_index.contains(&digest)
            || media_set_catalog.contains_chunk(&digest)
        {
            continue;
        }

        let blob = datastore.load_chunk(&digest)?;
        //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());

        match writer.try_write_chunk(&digest, &blob) {
            Ok(true) => {
                chunk_index.insert(digest);
                chunk_list.push(digest);
            }
            Ok(false) => {
                leom = true;
                break;
            }
            Err(err) => bail!("write chunk failed - {}", err),
        }

        if writer.bytes_written() > max_size {
            worker.log("Chunk Archive max size reached, closing archive");
            break;
        }
    }

    writer.finish()?;

    Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
}

// Compare the media set label. If the media is empty, or the existing
// set label does not match the expected media set, overwrite the
// media set label.
fn update_media_set_label(
    worker: &WorkerTask,
    drive: &mut dyn TapeDriver,
    old_set: Option<MediaSetLabel>,
    media_id: &MediaId,
) -> Result<MediaCatalog, Error> {

    let media_catalog;

    let new_set = match media_id.media_set_label {
        None => bail!("got media without media set - internal error"),
        Some(ref set) => set,
    };

    let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
        let (config_map, _digest) = load_key_configs()?;
        match config_map.get(fingerprint) {
            Some(item) => {
                // fixme: store item.hint??? should be in key-config instead
                Some(item.key_config.clone())
            }
            None => {
                bail!("unable to find tape encryption key config '{}'", fingerprint);
            }
        }
    } else {
        None
    };

    let status_path = Path::new(TAPE_STATUS_DIR);

    match old_set {
        None => {
            worker.log("writing new media set label");
            drive.write_media_set_label(new_set, key_config.as_ref())?;
            media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
        }
        Some(media_set_label) => {
            if new_set.uuid == media_set_label.uuid {
                if new_set.seq_nr != media_set_label.seq_nr {
                    bail!("got media with wrong media sequence number ({} != {})",
                        new_set.seq_nr, media_set_label.seq_nr);
                }
                if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
                    bail!("detected changed encryption fingerprint - internal error");
                }
                media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
            } else {
                worker.log(
                    format!("writing new media set label (overwrite '{}/{}')",
                        media_set_label.uuid.to_string(), media_set_label.seq_nr)
                );

                drive.write_media_set_label(new_set, key_config.as_ref())?;
                media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
            }
        }
    }

    // todo: verify last content/media_catalog somehow?
    drive.move_to_eom()?; // just to be sure

    Ok(media_catalog)
}