]> git.proxmox.com Git - proxmox-backup.git/blame - src/tape/pool_writer.rs
server/email_notifications: do not double html escape
[proxmox-backup.git] / src / tape / pool_writer.rs
CommitLineData
d37da6b7
DM
1use std::collections::HashSet;
2use std::path::Path;
32b75d36 3use std::fs::File;
31cf625a 4use std::time::SystemTime;
5c4755ad 5use std::sync::{Arc, Mutex};
d37da6b7 6
5c4755ad 7use anyhow::{bail, format_err, Error};
d37da6b7 8
66e42bec 9use proxmox::tools::Uuid;
d37da6b7
DM
10
11use crate::{
3fbf2311 12 task_log,
d37da6b7
DM
13 backup::{
14 DataStore,
5c4755ad 15 DataBlob,
d37da6b7 16 },
ff58c519 17 server::WorkerTask,
d37da6b7
DM
18 tape::{
19 TAPE_STATUS_DIR,
20 MAX_CHUNK_ARCHIVE_SIZE,
21 COMMIT_BLOCK_SIZE,
d37da6b7 22 TapeWrite,
d37da6b7 23 SnapshotReader,
d37da6b7
DM
24 MediaPool,
25 MediaId,
26 MediaCatalog,
27 MediaSetCatalog,
f47e0357
DM
28 file_formats::{
29 MediaSetLabel,
30 ChunkArchiveWriter,
31 tape_write_snapshot_archive,
32b75d36 32 tape_write_catalog,
f47e0357 33 },
37796ff7
DM
34 drive::{
35 TapeDriver,
36 request_and_load_media,
37 tape_alert_flags_critical,
38 media_changer,
39 },
d37da6b7 40 },
feb1645f 41 config::tape_encryption_keys::load_key_configs,
d37da6b7
DM
42};
43
5c4755ad
DM
44/// Helper to build and query sets of catalogs
45pub struct CatalogBuilder {
46 // read only part
47 media_set_catalog: MediaSetCatalog,
48 // catalog to modify (latest in set)
49 catalog: Option<MediaCatalog>,
50}
51
52impl CatalogBuilder {
53
54 /// Test if the catalog already contains a snapshot
54722aca 55 pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool {
5c4755ad 56 if let Some(ref catalog) = self.catalog {
54722aca 57 if catalog.contains_snapshot(store, snapshot) {
5c4755ad
DM
58 return true;
59 }
60 }
54722aca 61 self.media_set_catalog.contains_snapshot(store, snapshot)
5c4755ad
DM
62 }
63
64 /// Test if the catalog already contains a chunk
54722aca 65 pub fn contains_chunk(&self, store: &str, digest: &[u8;32]) -> bool {
5c4755ad 66 if let Some(ref catalog) = self.catalog {
54722aca 67 if catalog.contains_chunk(store, digest) {
5c4755ad
DM
68 return true;
69 }
70 }
54722aca 71 self.media_set_catalog.contains_chunk(store, digest)
5c4755ad
DM
72 }
73
74 /// Add a new catalog, move the old on to the read-only set
75 pub fn append_catalog(&mut self, new_catalog: MediaCatalog) -> Result<(), Error> {
76
77 // append current catalog to read-only set
78 if let Some(catalog) = self.catalog.take() {
79 self.media_set_catalog.append_catalog(catalog)?;
80 }
81
82 // remove read-only version from set (in case it is there)
83 self.media_set_catalog.remove_catalog(&new_catalog.uuid());
84
85 self.catalog = Some(new_catalog);
86
87 Ok(())
88 }
89
90 /// Register a snapshot
91 pub fn register_snapshot(
92 &mut self,
93 uuid: Uuid, // Uuid form MediaContentHeader
94 file_number: u64,
54722aca 95 store: &str,
5c4755ad
DM
96 snapshot: &str,
97 ) -> Result<(), Error> {
98 match self.catalog {
99 Some(ref mut catalog) => {
54722aca 100 catalog.register_snapshot(uuid, file_number, store, snapshot)?;
5c4755ad
DM
101 }
102 None => bail!("no catalog loaded - internal error"),
103 }
104 Ok(())
105 }
106
107 /// Register a chunk archive
108 pub fn register_chunk_archive(
109 &mut self,
110 uuid: Uuid, // Uuid form MediaContentHeader
111 file_number: u64,
54722aca 112 store: &str,
5c4755ad
DM
113 chunk_list: &[[u8; 32]],
114 ) -> Result<(), Error> {
115 match self.catalog {
116 Some(ref mut catalog) => {
54722aca 117 catalog.start_chunk_archive(uuid, file_number, store)?;
5c4755ad
DM
118 for digest in chunk_list {
119 catalog.register_chunk(digest)?;
120 }
121 catalog.end_chunk_archive()?;
122 }
123 None => bail!("no catalog loaded - internal error"),
124 }
125 Ok(())
126 }
127
128 /// Commit the catalog changes
129 pub fn commit(&mut self) -> Result<(), Error> {
130 if let Some(ref mut catalog) = self.catalog {
131 catalog.commit()?;
132 }
133 Ok(())
134 }
135}
136
137/// Chunk iterator which use a separate thread to read chunks
138///
139/// The iterator skips duplicate chunks and chunks already in the
140/// catalog.
141pub struct NewChunksIterator {
142 rx: std::sync::mpsc::Receiver<Result<Option<([u8; 32], DataBlob)>, Error>>,
143}
144
145impl NewChunksIterator {
146
147 /// Creates the iterator, spawning a new thread
148 ///
149 /// Make sure to join() the returnd thread handle.
150 pub fn spawn(
151 datastore: Arc<DataStore>,
152 snapshot_reader: Arc<Mutex<SnapshotReader>>,
153 catalog_builder: Arc<Mutex<CatalogBuilder>>,
154 ) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
155
156 let (tx, rx) = std::sync::mpsc::sync_channel(3);
157
158 let reader_thread = std::thread::spawn(move || {
159
160 let snapshot_reader = snapshot_reader.lock().unwrap();
161
162 let mut chunk_index: HashSet<[u8;32]> = HashSet::new();
163
54722aca
DM
164 let datastore_name = snapshot_reader.datastore_name();
165
5c4755ad
DM
166 let result: Result<(), Error> = proxmox::try_block!({
167
168 let mut chunk_iter = snapshot_reader.chunk_iterator()?;
169
170 loop {
171 let digest = match chunk_iter.next() {
172 None => {
173 tx.send(Ok(None)).unwrap();
174 break;
175 }
176 Some(digest) => digest?,
177 };
178
179 if chunk_index.contains(&digest) {
180 continue;
181 }
182
54722aca 183 if catalog_builder.lock().unwrap().contains_chunk(&datastore_name, &digest) {
5c4755ad
DM
184 continue;
185 };
186
187 let blob = datastore.load_chunk(&digest)?;
188 //println!("LOAD CHUNK {}", proxmox::tools::digest_to_hex(&digest));
189 tx.send(Ok(Some((digest, blob)))).unwrap();
190
191 chunk_index.insert(digest);
192 }
193
194 Ok(())
195 });
196 if let Err(err) = result {
197 tx.send(Err(err)).unwrap();
198 }
199 });
200
201 Ok((reader_thread, Self { rx }))
202 }
203}
204
205// We do not use Receiver::into_iter(). The manual implementation
206// returns a simpler type.
207impl Iterator for NewChunksIterator {
208 type Item = Result<([u8; 32], DataBlob), Error>;
209
210 fn next(&mut self) -> Option<Self::Item> {
211 match self.rx.recv() {
212 Ok(Ok(None)) => None,
213 Ok(Ok(Some((digest, blob)))) => Some(Ok((digest, blob))),
214 Ok(Err(err)) => Some(Err(err)),
215 Err(_) => Some(Err(format_err!("reader thread failed"))),
216 }
217 }
218}
d37da6b7
DM
219
/// State of the currently loaded media (drive handle plus write position info)
struct PoolWriterState {
    drive: Box<dyn TapeDriver>,
    // tell if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}
227
d37da6b7
DM
/// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter {
    pool: MediaPool,
    drive_name: String,
    // None as long as no media is loaded
    status: Option<PoolWriterState>,
    // shared with the chunk reader thread (see spawn_chunk_reader_thread)
    catalog_builder: Arc<Mutex<CatalogBuilder>>,
    // passed on to request_and_load_media()
    notify_email: Option<String>,
}
236
237impl PoolWriter {
238
32b75d36
DM
239 pub fn new(
240 mut pool: MediaPool,
241 drive_name: &str,
242 worker: &WorkerTask,
243 notify_email: Option<String>,
244 ) -> Result<Self, Error> {
d37da6b7
DM
245
246 let current_time = proxmox::tools::time::epoch_i64();
247
90e16be3
DM
248 let new_media_set_reason = pool.start_write_session(current_time)?;
249 if let Some(reason) = new_media_set_reason {
250 task_log!(
251 worker,
252 "starting new media set - reason: {}",
253 reason,
254 );
255 }
256
32b75d36
DM
257 let media_set_uuid = pool.current_media_set().uuid();
258 task_log!(worker, "media set uuid: {}", media_set_uuid);
d37da6b7
DM
259
260 let mut media_set_catalog = MediaSetCatalog::new();
261
262 // load all catalogs read-only at start
263 for media_uuid in pool.current_media_list()? {
237314ad 264 let media_info = pool.lookup_media(media_uuid).unwrap();
d37da6b7
DM
265 let media_catalog = MediaCatalog::open(
266 Path::new(TAPE_STATUS_DIR),
237314ad 267 media_info.id(),
d37da6b7
DM
268 false,
269 false,
270 )?;
271 media_set_catalog.append_catalog(media_catalog)?;
272 }
273
5c4755ad
DM
274 let catalog_builder = CatalogBuilder { media_set_catalog, catalog: None };
275
d37da6b7
DM
276 Ok(Self {
277 pool,
278 drive_name: drive_name.to_string(),
279 status: None,
5c4755ad 280 catalog_builder: Arc::new(Mutex::new(catalog_builder)),
c9793d47 281 notify_email,
d37da6b7
DM
282 })
283 }
284
/// Returns mutable access to the media pool used by this writer
pub fn pool(&mut self) -> &mut MediaPool {
    &mut self.pool
}
288
289 /// Set media status to FULL (persistent - stores pool status)
290 pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
291 self.pool.set_media_status_full(&uuid)?;
292 Ok(())
293 }
294
54722aca
DM
295 pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool {
296 self.catalog_builder.lock().unwrap().contains_snapshot(store, snapshot)
d37da6b7
DM
297 }
298
42967bf1 299 /// Eject media and drop PoolWriterState (close drive)
5654d8ce 300 pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> {
42967bf1
DM
301 let mut status = match self.status.take() {
302 Some(status) => status,
303 None => return Ok(()), // no media loaded
304 };
305
306 let (drive_config, _digest) = crate::config::drive::config()?;
307
308 if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
5654d8ce
DM
309 worker.log("eject media");
310 status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
edb90f6a 311 drop(status); // close drive
5654d8ce
DM
312 worker.log("unload media");
313 changer.unload_media(None)?; //eject and unload
42967bf1 314 } else {
5654d8ce 315 worker.log("standalone drive - ejecting media");
42967bf1
DM
316 status.drive.eject_media()?;
317 }
318
319 Ok(())
320 }
321
edb90f6a
DM
322 /// Export current media set and drop PoolWriterState (close drive)
323 pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
5654d8ce 324 let mut status = self.status.take();
edb90f6a
DM
325
326 let (drive_config, _digest) = crate::config::drive::config()?;
327
328 if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
5654d8ce
DM
329
330 if let Some(ref mut status) = status {
331 worker.log("eject media");
332 status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster
333 }
edb90f6a
DM
334 drop(status); // close drive
335
5654d8ce 336 worker.log("unload media");
edb90f6a
DM
337 changer.unload_media(None)?;
338
339 for media_uuid in self.pool.current_media_list()? {
340 let media = self.pool.lookup_media(media_uuid)?;
8446fbca
DM
341 let label_text = media.label_text();
342 if let Some(slot) = changer.export_media(label_text)? {
343 worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot));
edb90f6a 344 } else {
8446fbca 345 worker.warn(format!("export failed - media '{}' is not online", label_text));
edb90f6a
DM
346 }
347 }
348
6334bdc1
FG
349 } else if let Some(mut status) = status {
350 worker.log("standalone drive - ejecting media instead of export");
351 status.drive.eject_media()?;
edb90f6a
DM
352 }
353
354 Ok(())
355 }
356
d37da6b7
DM
357 /// commit changes to tape and catalog
358 ///
359 /// This is done automatically during a backupsession, but needs to
360 /// be called explicitly before dropping the PoolWriter
361 pub fn commit(&mut self) -> Result<(), Error> {
5c4755ad
DM
362 if let Some(PoolWriterState {ref mut drive, .. }) = self.status {
363 drive.sync()?; // sync all data to the tape
d37da6b7 364 }
5c4755ad 365 self.catalog_builder.lock().unwrap().commit()?; // then commit the catalog
d37da6b7
DM
366 Ok(())
367 }
368
369 /// Load a writable media into the drive
ff58c519 370 pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
5c4755ad
DM
371 let last_media_uuid = match self.catalog_builder.lock().unwrap().catalog {
372 Some(ref catalog) => Some(catalog.uuid().clone()),
d37da6b7
DM
373 None => None,
374 };
375
376 let current_time = proxmox::tools::time::epoch_i64();
377 let media_uuid = self.pool.alloc_writable_media(current_time)?;
378
379 let media = self.pool.lookup_media(&media_uuid).unwrap();
380
381 let media_changed = match last_media_uuid {
382 Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
383 None => true,
384 };
385
386 if !media_changed {
387 return Ok(media_uuid);
388 }
389
3fbf2311
DM
390 task_log!(worker, "allocated new writable media '{}'", media.label_text());
391
5c4755ad
DM
392 if let Some(PoolWriterState {mut drive, .. }) = self.status.take() {
393 if last_media_uuid.is_some() {
394 task_log!(worker, "eject current media");
395 drive.eject_media()?;
396 }
d37da6b7
DM
397 }
398
399 let (drive_config, _digest) = crate::config::drive::config()?;
66e42bec
DM
400
401 let (mut drive, old_media_id) =
c9793d47 402 request_and_load_media(worker, &drive_config, &self.drive_name, media.label(), &self.notify_email)?;
66e42bec 403
5843268c 404 // test for critical tape alert flags
a08a1985
DM
405 if let Ok(alert_flags) = drive.tape_alert_flags() {
406 if !alert_flags.is_empty() {
407 worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
408 if tape_alert_flags_critical(alert_flags) {
25350f33 409 self.pool.set_media_status_damaged(&media_uuid)?;
a08a1985
DM
410 bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
411 }
5843268c
DM
412 }
413 }
414
32b75d36 415 let (catalog, new_media) = update_media_set_label(
c503ea70 416 worker,
66e42bec
DM
417 drive.as_mut(),
418 old_media_id.media_set_label,
419 media.id(),
c503ea70
DM
420 )?;
421
5c4755ad
DM
422 self.catalog_builder.lock().unwrap().append_catalog(catalog)?;
423
2b191385
DM
424 let media_set = media.media_set_label().clone().unwrap();
425
426 let encrypt_fingerprint = media_set
8a0046f5 427 .encryption_key_fingerprint
2b191385
DM
428 .clone()
429 .map(|fp| (fp, media_set.uuid.clone()));
8a0046f5
DM
430
431 drive.set_encryption(encrypt_fingerprint)?;
432
5c4755ad 433 self.status = Some(PoolWriterState { drive, at_eom: false, bytes_written: 0 });
d37da6b7 434
32b75d36
DM
435 if new_media {
436 // add catalogs from previous media
437 self.append_media_set_catalogs(worker)?;
438 }
439
d37da6b7
DM
440 Ok(media_uuid)
441 }
442
32b75d36
DM
443 fn open_catalog_file(uuid: &Uuid) -> Result<File, Error> {
444
445 let status_path = Path::new(TAPE_STATUS_DIR);
446 let mut path = status_path.to_owned();
447 path.push(uuid.to_string());
448 path.set_extension("log");
449
450 let file = std::fs::OpenOptions::new()
451 .read(true)
452 .open(&path)?;
453
454 Ok(file)
455 }
456
457 /// Move to EOM (if not already there), then write the current
458 /// catalog to the tape. On success, this return 'Ok(true)'.
459
460 /// Please note that this may fail when there is not enough space
461 /// on the media (return value 'Ok(false, _)'). In that case, the
462 /// archive is marked incomplete. The caller should mark the media
463 /// as full and try again using another media.
464 pub fn append_catalog_archive(
465 &mut self,
466 worker: &WorkerTask,
467 ) -> Result<bool, Error> {
468
469 let status = match self.status {
470 Some(ref mut status) => status,
471 None => bail!("PoolWriter - no media loaded"),
472 };
473
474 if !status.at_eom {
475 worker.log(String::from("moving to end of media"));
476 status.drive.move_to_eom()?;
477 status.at_eom = true;
478 }
479
480 let current_file_number = status.drive.current_file_number()?;
481 if current_file_number < 2 {
482 bail!("got strange file position number from drive ({})", current_file_number);
483 }
484
485 let catalog_builder = self.catalog_builder.lock().unwrap();
486
487 let catalog = match catalog_builder.catalog {
488 None => bail!("append_catalog_archive failed: no catalog - internal error"),
489 Some(ref catalog) => catalog,
490 };
491
492 let media_set = self.pool.current_media_set();
493
494 let media_list = media_set.media_list();
495 let uuid = match media_list.last() {
496 None => bail!("got empty media list - internal error"),
497 Some(None) => bail!("got incomplete media list - internal error"),
498 Some(Some(last_uuid)) => {
499 if last_uuid != catalog.uuid() {
500 bail!("got wrong media - internal error");
501 }
502 last_uuid
503 }
504 };
505
506 let seq_nr = media_list.len() - 1;
507
508 let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;
509
510 let mut file = Self::open_catalog_file(uuid)?;
511
512 let done = tape_write_catalog(
513 writer.as_mut(),
514 uuid,
515 media_set.uuid(),
516 seq_nr,
517 &mut file,
518 )?.is_some();
519
520 Ok(done)
521 }
522
523 // Append catalogs for all previous media in set (without last)
524 fn append_media_set_catalogs(
525 &mut self,
526 worker: &WorkerTask,
527 ) -> Result<(), Error> {
528
529 let media_set = self.pool.current_media_set();
530
531 let mut media_list = &media_set.media_list()[..];
532 if media_list.len() < 2 {
533 return Ok(());
534 }
535 media_list = &media_list[..(media_list.len()-1)];
536
537 let status = match self.status {
538 Some(ref mut status) => status,
539 None => bail!("PoolWriter - no media loaded"),
540 };
541
542 if !status.at_eom {
543 worker.log(String::from("moving to end of media"));
544 status.drive.move_to_eom()?;
545 status.at_eom = true;
546 }
547
548 let current_file_number = status.drive.current_file_number()?;
549 if current_file_number < 2 {
550 bail!("got strange file position number from drive ({})", current_file_number);
551 }
552
553 for (seq_nr, uuid) in media_list.iter().enumerate() {
554
555 let uuid = match uuid {
556 None => bail!("got incomplete media list - internal error"),
557 Some(uuid) => uuid,
558 };
559
560 let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;
561
562 let mut file = Self::open_catalog_file(uuid)?;
563
564 task_log!(worker, "write catalog for previous media: {}", uuid);
565
566 if tape_write_catalog(
567 writer.as_mut(),
568 uuid,
569 media_set.uuid(),
570 seq_nr,
571 &mut file,
572 )?.is_none() {
573 bail!("got EOM while writing start catalog");
574 }
575 }
576
577 Ok(())
578 }
579
d1d74c43 580 /// Move to EOM (if not already there), then creates a new snapshot
d37da6b7
DM
581 /// archive writing specified files (as .pxar) into it. On
582 /// success, this return 'Ok(true)' and the media catalog gets
583 /// updated.
584
585 /// Please note that this may fail when there is not enough space
586 /// on the media (return value 'Ok(false, _)'). In that case, the
587 /// archive is marked incomplete, and we do not use it. The caller
588 /// should mark the media as full and try again using another
589 /// media.
590 pub fn append_snapshot_archive(
591 &mut self,
5654d8ce 592 worker: &WorkerTask,
d37da6b7
DM
593 snapshot_reader: &SnapshotReader,
594 ) -> Result<(bool, usize), Error> {
595
596 let status = match self.status {
597 Some(ref mut status) => status,
598 None => bail!("PoolWriter - no media loaded"),
599 };
600
601 if !status.at_eom {
5654d8ce 602 worker.log(String::from("moving to end of media"));
d37da6b7
DM
603 status.drive.move_to_eom()?;
604 status.at_eom = true;
605 }
606
607 let current_file_number = status.drive.current_file_number()?;
608 if current_file_number < 2 {
609 bail!("got strange file position number from drive ({})", current_file_number);
610 }
611
612 let (done, bytes_written) = {
613 let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;
614
615 match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
616 Some(content_uuid) => {
5c4755ad 617 self.catalog_builder.lock().unwrap().register_snapshot(
d37da6b7
DM
618 content_uuid,
619 current_file_number,
54722aca 620 &snapshot_reader.datastore_name().to_string(),
d37da6b7
DM
621 &snapshot_reader.snapshot().to_string(),
622 )?;
623 (true, writer.bytes_written())
624 }
625 None => (false, writer.bytes_written()),
626 }
627 };
628
629 status.bytes_written += bytes_written;
630
39735609 631 let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
d37da6b7
DM
632
633 if !done || request_sync {
5c4755ad 634 self.commit()?;
d37da6b7
DM
635 }
636
637 Ok((done, bytes_written))
638 }
639
d1d74c43 640 /// Move to EOM (if not already there), then creates a new chunk
d37da6b7
DM
641 /// archive and writes chunks from 'chunk_iter'. This stops when
642 /// it detect LEOM or when we reach max archive size
643 /// (4GB). Written chunks are registered in the media catalog.
644 pub fn append_chunk_archive(
645 &mut self,
31cf625a 646 worker: &WorkerTask,
5c4755ad 647 chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
54722aca 648 store: &str,
d37da6b7
DM
649 ) -> Result<(bool, usize), Error> {
650
651 let status = match self.status {
652 Some(ref mut status) => status,
653 None => bail!("PoolWriter - no media loaded"),
654 };
655
656 if !status.at_eom {
5654d8ce 657 worker.log(String::from("moving to end of media"));
d37da6b7
DM
658 status.drive.move_to_eom()?;
659 status.at_eom = true;
660 }
661
662 let current_file_number = status.drive.current_file_number()?;
663 if current_file_number < 2 {
664 bail!("got strange file position number from drive ({})", current_file_number);
665 }
666 let writer = status.drive.write_file()?;
667
31cf625a
DM
668 let start_time = SystemTime::now();
669
d37da6b7 670 let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
31cf625a 671 worker,
d37da6b7 672 writer,
d37da6b7 673 chunk_iter,
54722aca 674 store,
d37da6b7
DM
675 MAX_CHUNK_ARCHIVE_SIZE,
676 )?;
677
678 status.bytes_written += bytes_written;
679
31cf625a
DM
680 let elapsed = start_time.elapsed()?.as_secs_f64();
681 worker.log(format!(
776dabfb 682 "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
2c10410b 683 saved_chunks.len(),
776dabfb
DM
684 bytes_written as f64 /1_000_000.0,
685 (bytes_written as f64)/(1_000_000.0*elapsed),
31cf625a
DM
686 ));
687
39735609 688 let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
d37da6b7
DM
689
690 // register chunks in media_catalog
5c4755ad 691 self.catalog_builder.lock().unwrap()
54722aca 692 .register_chunk_archive(content_uuid, current_file_number, store, &saved_chunks)?;
d37da6b7
DM
693
694 if leom || request_sync {
5c4755ad 695 self.commit()?;
d37da6b7
DM
696 }
697
698 Ok((leom, bytes_written))
699 }
5c4755ad
DM
700
701 pub fn spawn_chunk_reader_thread(
702 &self,
703 datastore: Arc<DataStore>,
704 snapshot_reader: Arc<Mutex<SnapshotReader>>,
705 ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
706 NewChunksIterator::spawn(
707 datastore,
708 snapshot_reader,
709 Arc::clone(&self.catalog_builder),
710 )
711 }
d37da6b7
DM
712}
713
714/// write up to <max_size> of chunks
715fn write_chunk_archive<'a>(
2c10410b 716 _worker: &WorkerTask,
d37da6b7 717 writer: Box<dyn 'a + TapeWrite>,
5c4755ad 718 chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
54722aca 719 store: &str,
d37da6b7
DM
720 max_size: usize,
721) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
722
54722aca 723 let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?;
d37da6b7 724
d37da6b7
DM
725 // we want to get the chunk list in correct order
726 let mut chunk_list: Vec<[u8;32]> = Vec::new();
727
728 let mut leom = false;
729
730 loop {
5c4755ad 731 let (digest, blob) = match chunk_iter.peek() {
d37da6b7 732 None => break,
5c4755ad 733 Some(Ok((digest, blob))) => (digest, blob),
e8913fea 734 Some(Err(err)) => bail!("{}", err),
d37da6b7 735 };
e8913fea 736
5c4755ad 737 //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
d37da6b7
DM
738
739 match writer.try_write_chunk(&digest, &blob) {
e8913fea 740 Ok(true) => {
5c4755ad 741 chunk_list.push(*digest);
e8913fea 742 chunk_iter.next(); // consume
d37da6b7
DM
743 }
744 Ok(false) => {
e8913fea 745 // Note; we do not consume the chunk (no chunk_iter.next())
d37da6b7
DM
746 leom = true;
747 break;
748 }
749 Err(err) => bail!("write chunk failed - {}", err),
750 }
751
752 if writer.bytes_written() > max_size {
2c10410b 753 //worker.log("Chunk Archive max size reached, closing archive".to_string());
d37da6b7
DM
754 break;
755 }
756 }
757
758 writer.finish()?;
759
760 Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
761}
762
66e42bec
DM
763// Compare the media set label. If the media is empty, or the existing
764// set label does not match the expected media set, overwrite the
765// media set label.
766fn update_media_set_label(
ff58c519 767 worker: &WorkerTask,
66e42bec
DM
768 drive: &mut dyn TapeDriver,
769 old_set: Option<MediaSetLabel>,
d37da6b7 770 media_id: &MediaId,
32b75d36 771) -> Result<(MediaCatalog, bool), Error> {
d37da6b7
DM
772
773 let media_catalog;
774
775 let new_set = match media_id.media_set_label {
66e42bec 776 None => bail!("got media without media set - internal error"),
d37da6b7
DM
777 Some(ref set) => set,
778 };
779
feb1645f
DM
780 let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint {
781 let (config_map, _digest) = load_key_configs()?;
782 match config_map.get(fingerprint) {
82a103c8 783 Some(key_config) => Some(key_config.clone()),
feb1645f
DM
784 None => {
785 bail!("unable to find tape encryption key config '{}'", fingerprint);
786 }
787 }
788 } else {
789 None
790 };
791
d37da6b7
DM
792 let status_path = Path::new(TAPE_STATUS_DIR);
793
32b75d36 794 let new_media = match old_set {
d37da6b7 795 None => {
3b82f3ee 796 worker.log("wrinting new media set label".to_string());
feb1645f 797 drive.write_media_set_label(new_set, key_config.as_ref())?;
31cf625a 798 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
32b75d36 799 true
d37da6b7
DM
800 }
801 Some(media_set_label) => {
802 if new_set.uuid == media_set_label.uuid {
803 if new_set.seq_nr != media_set_label.seq_nr {
804 bail!("got media with wrong media sequence number ({} != {}",
805 new_set.seq_nr,media_set_label.seq_nr);
806 }
8a0046f5
DM
807 if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
808 bail!("detected changed encryption fingerprint - internal error");
809 }
237314ad 810 media_catalog = MediaCatalog::open(status_path, &media_id, true, false)?;
32b75d36
DM
811
812 // todo: verify last content/media_catalog somehow?
813
814 false
d37da6b7 815 } else {
66e42bec
DM
816 worker.log(
817 format!("wrinting new media set label (overwrite '{}/{}')",
818 media_set_label.uuid.to_string(), media_set_label.seq_nr)
819 );
d37da6b7 820
feb1645f 821 drive.write_media_set_label(new_set, key_config.as_ref())?;
31cf625a 822 media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?;
32b75d36 823 true
d37da6b7
DM
824 }
825 }
32b75d36 826 };
d37da6b7 827
32b75d36 828 Ok((media_catalog, new_media))
d37da6b7 829}