]>
Commit | Line | Data |
---|---|---|
d37da6b7 DM |
1 | use std::collections::HashSet; |
2 | use std::path::Path; | |
31cf625a | 3 | use std::time::SystemTime; |
5c4755ad | 4 | use std::sync::{Arc, Mutex}; |
d37da6b7 | 5 | |
5c4755ad | 6 | use anyhow::{bail, format_err, Error}; |
d37da6b7 | 7 | |
66e42bec | 8 | use proxmox::tools::Uuid; |
d37da6b7 DM |
9 | |
10 | use crate::{ | |
3fbf2311 | 11 | task_log, |
d37da6b7 DM |
12 | backup::{ |
13 | DataStore, | |
5c4755ad | 14 | DataBlob, |
d37da6b7 | 15 | }, |
ff58c519 | 16 | server::WorkerTask, |
d37da6b7 DM |
17 | tape::{ |
18 | TAPE_STATUS_DIR, | |
19 | MAX_CHUNK_ARCHIVE_SIZE, | |
20 | COMMIT_BLOCK_SIZE, | |
d37da6b7 | 21 | TapeWrite, |
d37da6b7 | 22 | SnapshotReader, |
d37da6b7 DM |
23 | MediaPool, |
24 | MediaId, | |
25 | MediaCatalog, | |
26 | MediaSetCatalog, | |
f47e0357 DM |
27 | file_formats::{ |
28 | MediaSetLabel, | |
29 | ChunkArchiveWriter, | |
30 | tape_write_snapshot_archive, | |
31 | }, | |
37796ff7 DM |
32 | drive::{ |
33 | TapeDriver, | |
34 | request_and_load_media, | |
35 | tape_alert_flags_critical, | |
36 | media_changer, | |
37 | }, | |
d37da6b7 | 38 | }, |
feb1645f | 39 | config::tape_encryption_keys::load_key_configs, |
d37da6b7 DM |
40 | }; |
41 | ||
5c4755ad DM |
42 | /// Helper to build and query sets of catalogs |
43 | pub struct CatalogBuilder { | |
44 | // read only part | |
45 | media_set_catalog: MediaSetCatalog, | |
46 | // catalog to modify (latest in set) | |
47 | catalog: Option<MediaCatalog>, | |
48 | } | |
49 | ||
50 | impl CatalogBuilder { | |
51 | ||
52 | /// Test if the catalog already contains a snapshot | |
53 | pub fn contains_snapshot(&self, snapshot: &str) -> bool { | |
54 | if let Some(ref catalog) = self.catalog { | |
55 | if catalog.contains_snapshot(snapshot) { | |
56 | return true; | |
57 | } | |
58 | } | |
59 | self.media_set_catalog.contains_snapshot(snapshot) | |
60 | } | |
61 | ||
62 | /// Test if the catalog already contains a chunk | |
63 | pub fn contains_chunk(&self, digest: &[u8;32]) -> bool { | |
64 | if let Some(ref catalog) = self.catalog { | |
65 | if catalog.contains_chunk(digest) { | |
66 | return true; | |
67 | } | |
68 | } | |
69 | self.media_set_catalog.contains_chunk(digest) | |
70 | } | |
71 | ||
72 | /// Add a new catalog, move the old on to the read-only set | |
73 | pub fn append_catalog(&mut self, new_catalog: MediaCatalog) -> Result<(), Error> { | |
74 | ||
75 | // append current catalog to read-only set | |
76 | if let Some(catalog) = self.catalog.take() { | |
77 | self.media_set_catalog.append_catalog(catalog)?; | |
78 | } | |
79 | ||
80 | // remove read-only version from set (in case it is there) | |
81 | self.media_set_catalog.remove_catalog(&new_catalog.uuid()); | |
82 | ||
83 | self.catalog = Some(new_catalog); | |
84 | ||
85 | Ok(()) | |
86 | } | |
87 | ||
88 | /// Register a snapshot | |
89 | pub fn register_snapshot( | |
90 | &mut self, | |
91 | uuid: Uuid, // Uuid form MediaContentHeader | |
92 | file_number: u64, | |
93 | snapshot: &str, | |
94 | ) -> Result<(), Error> { | |
95 | match self.catalog { | |
96 | Some(ref mut catalog) => { | |
97 | catalog.register_snapshot(uuid, file_number, snapshot)?; | |
98 | } | |
99 | None => bail!("no catalog loaded - internal error"), | |
100 | } | |
101 | Ok(()) | |
102 | } | |
103 | ||
104 | /// Register a chunk archive | |
105 | pub fn register_chunk_archive( | |
106 | &mut self, | |
107 | uuid: Uuid, // Uuid form MediaContentHeader | |
108 | file_number: u64, | |
109 | chunk_list: &[[u8; 32]], | |
110 | ) -> Result<(), Error> { | |
111 | match self.catalog { | |
112 | Some(ref mut catalog) => { | |
113 | catalog.start_chunk_archive(uuid, file_number)?; | |
114 | for digest in chunk_list { | |
115 | catalog.register_chunk(digest)?; | |
116 | } | |
117 | catalog.end_chunk_archive()?; | |
118 | } | |
119 | None => bail!("no catalog loaded - internal error"), | |
120 | } | |
121 | Ok(()) | |
122 | } | |
123 | ||
124 | /// Commit the catalog changes | |
125 | pub fn commit(&mut self) -> Result<(), Error> { | |
126 | if let Some(ref mut catalog) = self.catalog { | |
127 | catalog.commit()?; | |
128 | } | |
129 | Ok(()) | |
130 | } | |
131 | } | |
132 | ||
133 | /// Chunk iterator which use a separate thread to read chunks | |
134 | /// | |
135 | /// The iterator skips duplicate chunks and chunks already in the | |
136 | /// catalog. | |
137 | pub struct NewChunksIterator { | |
138 | rx: std::sync::mpsc::Receiver<Result<Option<([u8; 32], DataBlob)>, Error>>, | |
139 | } | |
140 | ||
141 | impl NewChunksIterator { | |
142 | ||
143 | /// Creates the iterator, spawning a new thread | |
144 | /// | |
145 | /// Make sure to join() the returnd thread handle. | |
146 | pub fn spawn( | |
147 | datastore: Arc<DataStore>, | |
148 | snapshot_reader: Arc<Mutex<SnapshotReader>>, | |
149 | catalog_builder: Arc<Mutex<CatalogBuilder>>, | |
150 | ) -> Result<(std::thread::JoinHandle<()>, Self), Error> { | |
151 | ||
152 | let (tx, rx) = std::sync::mpsc::sync_channel(3); | |
153 | ||
154 | let reader_thread = std::thread::spawn(move || { | |
155 | ||
156 | let snapshot_reader = snapshot_reader.lock().unwrap(); | |
157 | ||
158 | let mut chunk_index: HashSet<[u8;32]> = HashSet::new(); | |
159 | ||
160 | let result: Result<(), Error> = proxmox::try_block!({ | |
161 | ||
162 | let mut chunk_iter = snapshot_reader.chunk_iterator()?; | |
163 | ||
164 | loop { | |
165 | let digest = match chunk_iter.next() { | |
166 | None => { | |
167 | tx.send(Ok(None)).unwrap(); | |
168 | break; | |
169 | } | |
170 | Some(digest) => digest?, | |
171 | }; | |
172 | ||
173 | if chunk_index.contains(&digest) { | |
174 | continue; | |
175 | } | |
176 | ||
177 | if catalog_builder.lock().unwrap().contains_chunk(&digest) { | |
178 | continue; | |
179 | }; | |
180 | ||
181 | let blob = datastore.load_chunk(&digest)?; | |
182 | //println!("LOAD CHUNK {}", proxmox::tools::digest_to_hex(&digest)); | |
183 | tx.send(Ok(Some((digest, blob)))).unwrap(); | |
184 | ||
185 | chunk_index.insert(digest); | |
186 | } | |
187 | ||
188 | Ok(()) | |
189 | }); | |
190 | if let Err(err) = result { | |
191 | tx.send(Err(err)).unwrap(); | |
192 | } | |
193 | }); | |
194 | ||
195 | Ok((reader_thread, Self { rx })) | |
196 | } | |
197 | } | |
198 | ||
199 | // We do not use Receiver::into_iter(). The manual implementation | |
200 | // returns a simpler type. | |
201 | impl Iterator for NewChunksIterator { | |
202 | type Item = Result<([u8; 32], DataBlob), Error>; | |
203 | ||
204 | fn next(&mut self) -> Option<Self::Item> { | |
205 | match self.rx.recv() { | |
206 | Ok(Ok(None)) => None, | |
207 | Ok(Ok(Some((digest, blob)))) => Some(Ok((digest, blob))), | |
208 | Ok(Err(err)) => Some(Err(err)), | |
209 | Err(_) => Some(Err(format_err!("reader thread failed"))), | |
210 | } | |
211 | } | |
212 | } | |
d37da6b7 DM |
213 | |
214 | struct PoolWriterState { | |
215 | drive: Box<dyn TapeDriver>, | |
d37da6b7 DM |
216 | // tell if we already moved to EOM |
217 | at_eom: bool, | |
218 | // bytes written after the last tape fush/sync | |
219 | bytes_written: usize, | |
220 | } | |
221 | ||
d37da6b7 DM |
222 | /// Helper to manage a backup job, writing several tapes of a pool |
223 | pub struct PoolWriter { | |
224 | pool: MediaPool, | |
225 | drive_name: String, | |
226 | status: Option<PoolWriterState>, | |
5c4755ad | 227 | catalog_builder: Arc<Mutex<CatalogBuilder>>, |
c9793d47 | 228 | notify_email: Option<String>, |
d37da6b7 DM |
229 | } |
230 | ||
231 | impl PoolWriter { | |
232 | ||
c9793d47 | 233 | pub fn new(mut pool: MediaPool, drive_name: &str, worker: &WorkerTask, notify_email: Option<String>) -> Result<Self, Error> { |
d37da6b7 DM |
234 | |
235 | let current_time = proxmox::tools::time::epoch_i64(); | |
236 | ||
90e16be3 DM |
237 | let new_media_set_reason = pool.start_write_session(current_time)?; |
238 | if let Some(reason) = new_media_set_reason { | |
239 | task_log!( | |
240 | worker, | |
241 | "starting new media set - reason: {}", | |
242 | reason, | |
243 | ); | |
244 | } | |
245 | ||
246 | task_log!(worker, "media set uuid: {}", pool.current_media_set()); | |
d37da6b7 DM |
247 | |
248 | let mut media_set_catalog = MediaSetCatalog::new(); | |
249 | ||
250 | // load all catalogs read-only at start | |
251 | for media_uuid in pool.current_media_list()? { | |
252 | let media_catalog = MediaCatalog::open( | |
253 | Path::new(TAPE_STATUS_DIR), | |
254 | &media_uuid, | |
255 | false, | |
256 | false, | |
257 | )?; | |
258 | media_set_catalog.append_catalog(media_catalog)?; | |
259 | } | |
260 | ||
5c4755ad DM |
261 | let catalog_builder = CatalogBuilder { media_set_catalog, catalog: None }; |
262 | ||
d37da6b7 DM |
263 | Ok(Self { |
264 | pool, | |
265 | drive_name: drive_name.to_string(), | |
266 | status: None, | |
5c4755ad | 267 | catalog_builder: Arc::new(Mutex::new(catalog_builder)), |
c9793d47 | 268 | notify_email, |
d37da6b7 DM |
269 | }) |
270 | } | |
271 | ||
272 | pub fn pool(&mut self) -> &mut MediaPool { | |
273 | &mut self.pool | |
274 | } | |
275 | ||
276 | /// Set media status to FULL (persistent - stores pool status) | |
277 | pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> { | |
278 | self.pool.set_media_status_full(&uuid)?; | |
279 | Ok(()) | |
280 | } | |
281 | ||
282 | pub fn contains_snapshot(&self, snapshot: &str) -> bool { | |
5c4755ad | 283 | self.catalog_builder.lock().unwrap().contains_snapshot(snapshot) |
d37da6b7 DM |
284 | } |
285 | ||
42967bf1 | 286 | /// Eject media and drop PoolWriterState (close drive) |
5654d8ce | 287 | pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> { |
42967bf1 DM |
288 | let mut status = match self.status.take() { |
289 | Some(status) => status, | |
290 | None => return Ok(()), // no media loaded | |
291 | }; | |
292 | ||
293 | let (drive_config, _digest) = crate::config::drive::config()?; | |
294 | ||
295 | if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? { | |
5654d8ce DM |
296 | worker.log("eject media"); |
297 | status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster | |
edb90f6a | 298 | drop(status); // close drive |
5654d8ce DM |
299 | worker.log("unload media"); |
300 | changer.unload_media(None)?; //eject and unload | |
42967bf1 | 301 | } else { |
5654d8ce | 302 | worker.log("standalone drive - ejecting media"); |
42967bf1 DM |
303 | status.drive.eject_media()?; |
304 | } | |
305 | ||
306 | Ok(()) | |
307 | } | |
308 | ||
edb90f6a DM |
309 | /// Export current media set and drop PoolWriterState (close drive) |
310 | pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> { | |
5654d8ce | 311 | let mut status = self.status.take(); |
edb90f6a DM |
312 | |
313 | let (drive_config, _digest) = crate::config::drive::config()?; | |
314 | ||
315 | if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? { | |
5654d8ce DM |
316 | |
317 | if let Some(ref mut status) = status { | |
318 | worker.log("eject media"); | |
319 | status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster | |
320 | } | |
edb90f6a DM |
321 | drop(status); // close drive |
322 | ||
5654d8ce | 323 | worker.log("unload media"); |
edb90f6a DM |
324 | changer.unload_media(None)?; |
325 | ||
326 | for media_uuid in self.pool.current_media_list()? { | |
327 | let media = self.pool.lookup_media(media_uuid)?; | |
8446fbca DM |
328 | let label_text = media.label_text(); |
329 | if let Some(slot) = changer.export_media(label_text)? { | |
330 | worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot)); | |
edb90f6a | 331 | } else { |
8446fbca | 332 | worker.warn(format!("export failed - media '{}' is not online", label_text)); |
edb90f6a DM |
333 | } |
334 | } | |
335 | ||
6334bdc1 FG |
336 | } else if let Some(mut status) = status { |
337 | worker.log("standalone drive - ejecting media instead of export"); | |
338 | status.drive.eject_media()?; | |
edb90f6a DM |
339 | } |
340 | ||
341 | Ok(()) | |
342 | } | |
343 | ||
d37da6b7 DM |
344 | /// commit changes to tape and catalog |
345 | /// | |
346 | /// This is done automatically during a backupsession, but needs to | |
347 | /// be called explicitly before dropping the PoolWriter | |
348 | pub fn commit(&mut self) -> Result<(), Error> { | |
5c4755ad DM |
349 | if let Some(PoolWriterState {ref mut drive, .. }) = self.status { |
350 | drive.sync()?; // sync all data to the tape | |
d37da6b7 | 351 | } |
5c4755ad | 352 | self.catalog_builder.lock().unwrap().commit()?; // then commit the catalog |
d37da6b7 DM |
353 | Ok(()) |
354 | } | |
355 | ||
356 | /// Load a writable media into the drive | |
ff58c519 | 357 | pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> { |
5c4755ad DM |
358 | let last_media_uuid = match self.catalog_builder.lock().unwrap().catalog { |
359 | Some(ref catalog) => Some(catalog.uuid().clone()), | |
d37da6b7 DM |
360 | None => None, |
361 | }; | |
362 | ||
363 | let current_time = proxmox::tools::time::epoch_i64(); | |
364 | let media_uuid = self.pool.alloc_writable_media(current_time)?; | |
365 | ||
366 | let media = self.pool.lookup_media(&media_uuid).unwrap(); | |
367 | ||
368 | let media_changed = match last_media_uuid { | |
369 | Some(ref last_media_uuid) => last_media_uuid != &media_uuid, | |
370 | None => true, | |
371 | }; | |
372 | ||
373 | if !media_changed { | |
374 | return Ok(media_uuid); | |
375 | } | |
376 | ||
3fbf2311 DM |
377 | task_log!(worker, "allocated new writable media '{}'", media.label_text()); |
378 | ||
5c4755ad DM |
379 | if let Some(PoolWriterState {mut drive, .. }) = self.status.take() { |
380 | if last_media_uuid.is_some() { | |
381 | task_log!(worker, "eject current media"); | |
382 | drive.eject_media()?; | |
383 | } | |
d37da6b7 DM |
384 | } |
385 | ||
386 | let (drive_config, _digest) = crate::config::drive::config()?; | |
66e42bec DM |
387 | |
388 | let (mut drive, old_media_id) = | |
c9793d47 | 389 | request_and_load_media(worker, &drive_config, &self.drive_name, media.label(), &self.notify_email)?; |
66e42bec | 390 | |
5843268c | 391 | // test for critical tape alert flags |
a08a1985 DM |
392 | if let Ok(alert_flags) = drive.tape_alert_flags() { |
393 | if !alert_flags.is_empty() { | |
394 | worker.log(format!("TapeAlertFlags: {:?}", alert_flags)); | |
395 | if tape_alert_flags_critical(alert_flags) { | |
25350f33 | 396 | self.pool.set_media_status_damaged(&media_uuid)?; |
a08a1985 DM |
397 | bail!("aborting due to critical tape alert flags: {:?}", alert_flags); |
398 | } | |
5843268c DM |
399 | } |
400 | } | |
401 | ||
66e42bec | 402 | let catalog = update_media_set_label( |
c503ea70 | 403 | worker, |
66e42bec DM |
404 | drive.as_mut(), |
405 | old_media_id.media_set_label, | |
406 | media.id(), | |
c503ea70 DM |
407 | )?; |
408 | ||
5c4755ad DM |
409 | self.catalog_builder.lock().unwrap().append_catalog(catalog)?; |
410 | ||
2b191385 DM |
411 | let media_set = media.media_set_label().clone().unwrap(); |
412 | ||
413 | let encrypt_fingerprint = media_set | |
8a0046f5 | 414 | .encryption_key_fingerprint |
2b191385 DM |
415 | .clone() |
416 | .map(|fp| (fp, media_set.uuid.clone())); | |
8a0046f5 DM |
417 | |
418 | drive.set_encryption(encrypt_fingerprint)?; | |
419 | ||
5c4755ad | 420 | self.status = Some(PoolWriterState { drive, at_eom: false, bytes_written: 0 }); |
d37da6b7 DM |
421 | |
422 | Ok(media_uuid) | |
423 | } | |
424 | ||
d1d74c43 | 425 | /// Move to EOM (if not already there), then creates a new snapshot |
d37da6b7 DM |
426 | /// archive writing specified files (as .pxar) into it. On |
427 | /// success, this return 'Ok(true)' and the media catalog gets | |
428 | /// updated. | |
429 | ||
430 | /// Please note that this may fail when there is not enough space | |
431 | /// on the media (return value 'Ok(false, _)'). In that case, the | |
432 | /// archive is marked incomplete, and we do not use it. The caller | |
433 | /// should mark the media as full and try again using another | |
434 | /// media. | |
435 | pub fn append_snapshot_archive( | |
436 | &mut self, | |
5654d8ce | 437 | worker: &WorkerTask, |
d37da6b7 DM |
438 | snapshot_reader: &SnapshotReader, |
439 | ) -> Result<(bool, usize), Error> { | |
440 | ||
441 | let status = match self.status { | |
442 | Some(ref mut status) => status, | |
443 | None => bail!("PoolWriter - no media loaded"), | |
444 | }; | |
445 | ||
446 | if !status.at_eom { | |
5654d8ce | 447 | worker.log(String::from("moving to end of media")); |
d37da6b7 DM |
448 | status.drive.move_to_eom()?; |
449 | status.at_eom = true; | |
450 | } | |
451 | ||
452 | let current_file_number = status.drive.current_file_number()?; | |
453 | if current_file_number < 2 { | |
454 | bail!("got strange file position number from drive ({})", current_file_number); | |
455 | } | |
456 | ||
457 | let (done, bytes_written) = { | |
458 | let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?; | |
459 | ||
460 | match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? { | |
461 | Some(content_uuid) => { | |
5c4755ad | 462 | self.catalog_builder.lock().unwrap().register_snapshot( |
d37da6b7 DM |
463 | content_uuid, |
464 | current_file_number, | |
465 | &snapshot_reader.snapshot().to_string(), | |
466 | )?; | |
467 | (true, writer.bytes_written()) | |
468 | } | |
469 | None => (false, writer.bytes_written()), | |
470 | } | |
471 | }; | |
472 | ||
473 | status.bytes_written += bytes_written; | |
474 | ||
39735609 | 475 | let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; |
d37da6b7 DM |
476 | |
477 | if !done || request_sync { | |
5c4755ad | 478 | self.commit()?; |
d37da6b7 DM |
479 | } |
480 | ||
481 | Ok((done, bytes_written)) | |
482 | } | |
483 | ||
d1d74c43 | 484 | /// Move to EOM (if not already there), then creates a new chunk |
d37da6b7 DM |
485 | /// archive and writes chunks from 'chunk_iter'. This stops when |
486 | /// it detect LEOM or when we reach max archive size | |
487 | /// (4GB). Written chunks are registered in the media catalog. | |
488 | pub fn append_chunk_archive( | |
489 | &mut self, | |
31cf625a | 490 | worker: &WorkerTask, |
5c4755ad | 491 | chunk_iter: &mut std::iter::Peekable<NewChunksIterator>, |
d37da6b7 DM |
492 | ) -> Result<(bool, usize), Error> { |
493 | ||
494 | let status = match self.status { | |
495 | Some(ref mut status) => status, | |
496 | None => bail!("PoolWriter - no media loaded"), | |
497 | }; | |
498 | ||
499 | if !status.at_eom { | |
5654d8ce | 500 | worker.log(String::from("moving to end of media")); |
d37da6b7 DM |
501 | status.drive.move_to_eom()?; |
502 | status.at_eom = true; | |
503 | } | |
504 | ||
505 | let current_file_number = status.drive.current_file_number()?; | |
506 | if current_file_number < 2 { | |
507 | bail!("got strange file position number from drive ({})", current_file_number); | |
508 | } | |
509 | let writer = status.drive.write_file()?; | |
510 | ||
31cf625a DM |
511 | let start_time = SystemTime::now(); |
512 | ||
d37da6b7 | 513 | let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive( |
31cf625a | 514 | worker, |
d37da6b7 | 515 | writer, |
d37da6b7 | 516 | chunk_iter, |
d37da6b7 DM |
517 | MAX_CHUNK_ARCHIVE_SIZE, |
518 | )?; | |
519 | ||
520 | status.bytes_written += bytes_written; | |
521 | ||
31cf625a DM |
522 | let elapsed = start_time.elapsed()?.as_secs_f64(); |
523 | worker.log(format!( | |
776dabfb | 524 | "wrote {} chunks ({:.2} MB at {:.2} MB/s)", |
2c10410b | 525 | saved_chunks.len(), |
776dabfb DM |
526 | bytes_written as f64 /1_000_000.0, |
527 | (bytes_written as f64)/(1_000_000.0*elapsed), | |
31cf625a DM |
528 | )); |
529 | ||
39735609 | 530 | let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; |
d37da6b7 DM |
531 | |
532 | // register chunks in media_catalog | |
5c4755ad DM |
533 | self.catalog_builder.lock().unwrap() |
534 | .register_chunk_archive(content_uuid, current_file_number, &saved_chunks)?; | |
d37da6b7 DM |
535 | |
536 | if leom || request_sync { | |
5c4755ad | 537 | self.commit()?; |
d37da6b7 DM |
538 | } |
539 | ||
540 | Ok((leom, bytes_written)) | |
541 | } | |
5c4755ad DM |
542 | |
543 | pub fn spawn_chunk_reader_thread( | |
544 | &self, | |
545 | datastore: Arc<DataStore>, | |
546 | snapshot_reader: Arc<Mutex<SnapshotReader>>, | |
547 | ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> { | |
548 | NewChunksIterator::spawn( | |
549 | datastore, | |
550 | snapshot_reader, | |
551 | Arc::clone(&self.catalog_builder), | |
552 | ) | |
553 | } | |
d37da6b7 DM |
554 | } |
555 | ||
556 | /// write up to <max_size> of chunks | |
557 | fn write_chunk_archive<'a>( | |
2c10410b | 558 | _worker: &WorkerTask, |
d37da6b7 | 559 | writer: Box<dyn 'a + TapeWrite>, |
5c4755ad | 560 | chunk_iter: &mut std::iter::Peekable<NewChunksIterator>, |
d37da6b7 DM |
561 | max_size: usize, |
562 | ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> { | |
563 | ||
564 | let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?; | |
565 | ||
d37da6b7 DM |
566 | // we want to get the chunk list in correct order |
567 | let mut chunk_list: Vec<[u8;32]> = Vec::new(); | |
568 | ||
569 | let mut leom = false; | |
570 | ||
571 | loop { | |
5c4755ad | 572 | let (digest, blob) = match chunk_iter.peek() { |
d37da6b7 | 573 | None => break, |
5c4755ad | 574 | Some(Ok((digest, blob))) => (digest, blob), |
e8913fea | 575 | Some(Err(err)) => bail!("{}", err), |
d37da6b7 | 576 | }; |
e8913fea | 577 | |
5c4755ad | 578 | //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size()); |
d37da6b7 DM |
579 | |
580 | match writer.try_write_chunk(&digest, &blob) { | |
e8913fea | 581 | Ok(true) => { |
5c4755ad | 582 | chunk_list.push(*digest); |
e8913fea | 583 | chunk_iter.next(); // consume |
d37da6b7 DM |
584 | } |
585 | Ok(false) => { | |
e8913fea | 586 | // Note; we do not consume the chunk (no chunk_iter.next()) |
d37da6b7 DM |
587 | leom = true; |
588 | break; | |
589 | } | |
590 | Err(err) => bail!("write chunk failed - {}", err), | |
591 | } | |
592 | ||
593 | if writer.bytes_written() > max_size { | |
2c10410b | 594 | //worker.log("Chunk Archive max size reached, closing archive".to_string()); |
d37da6b7 DM |
595 | break; |
596 | } | |
597 | } | |
598 | ||
599 | writer.finish()?; | |
600 | ||
601 | Ok((chunk_list, content_uuid, leom, writer.bytes_written())) | |
602 | } | |
603 | ||
66e42bec DM |
604 | // Compare the media set label. If the media is empty, or the existing |
605 | // set label does not match the expected media set, overwrite the | |
606 | // media set label. | |
607 | fn update_media_set_label( | |
ff58c519 | 608 | worker: &WorkerTask, |
66e42bec DM |
609 | drive: &mut dyn TapeDriver, |
610 | old_set: Option<MediaSetLabel>, | |
d37da6b7 | 611 | media_id: &MediaId, |
66e42bec | 612 | ) -> Result<MediaCatalog, Error> { |
d37da6b7 DM |
613 | |
614 | let media_catalog; | |
615 | ||
616 | let new_set = match media_id.media_set_label { | |
66e42bec | 617 | None => bail!("got media without media set - internal error"), |
d37da6b7 DM |
618 | Some(ref set) => set, |
619 | }; | |
620 | ||
feb1645f DM |
621 | let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint { |
622 | let (config_map, _digest) = load_key_configs()?; | |
623 | match config_map.get(fingerprint) { | |
82a103c8 | 624 | Some(key_config) => Some(key_config.clone()), |
feb1645f DM |
625 | None => { |
626 | bail!("unable to find tape encryption key config '{}'", fingerprint); | |
627 | } | |
628 | } | |
629 | } else { | |
630 | None | |
631 | }; | |
632 | ||
d37da6b7 DM |
633 | let status_path = Path::new(TAPE_STATUS_DIR); |
634 | ||
66e42bec | 635 | match old_set { |
d37da6b7 | 636 | None => { |
3b82f3ee | 637 | worker.log("wrinting new media set label".to_string()); |
feb1645f | 638 | drive.write_media_set_label(new_set, key_config.as_ref())?; |
31cf625a | 639 | media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?; |
d37da6b7 DM |
640 | } |
641 | Some(media_set_label) => { | |
642 | if new_set.uuid == media_set_label.uuid { | |
643 | if new_set.seq_nr != media_set_label.seq_nr { | |
644 | bail!("got media with wrong media sequence number ({} != {}", | |
645 | new_set.seq_nr,media_set_label.seq_nr); | |
646 | } | |
8a0046f5 DM |
647 | if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint { |
648 | bail!("detected changed encryption fingerprint - internal error"); | |
649 | } | |
d37da6b7 DM |
650 | media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?; |
651 | } else { | |
66e42bec DM |
652 | worker.log( |
653 | format!("wrinting new media set label (overwrite '{}/{}')", | |
654 | media_set_label.uuid.to_string(), media_set_label.seq_nr) | |
655 | ); | |
d37da6b7 | 656 | |
feb1645f | 657 | drive.write_media_set_label(new_set, key_config.as_ref())?; |
31cf625a | 658 | media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?; |
d37da6b7 DM |
659 | } |
660 | } | |
661 | } | |
662 | ||
663 | // todo: verify last content/media_catalog somehow? | |
d37da6b7 | 664 | |
66e42bec | 665 | Ok(media_catalog) |
d37da6b7 | 666 | } |