git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer.rs
tape: cleanup: s/transfer/transfer_media/, avoid compiler warnings
use std::collections::HashSet;
use std::path::Path;

use anyhow::{bail, Error};

use proxmox::tools::Uuid;

use crate::{
    backup::{
        DataStore,
    },
    server::WorkerTask,
    tape::{
        TAPE_STATUS_DIR,
        MAX_CHUNK_ARCHIVE_SIZE,
        COMMIT_BLOCK_SIZE,
        TapeDriver,
        TapeWrite,
        ChunkArchiveWriter,
        SnapshotReader,
        SnapshotChunkIterator,
        MediaPool,
        MediaId,
        MediaCatalog,
        MediaSetCatalog,
        tape_write_snapshot_archive,
        request_and_load_media,
        tape_alert_flags_critical,
        media_changer,
        file_formats::MediaSetLabel,
    },
};

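/// State of the currently loaded media/drive
///
/// Bundles the open tape drive with the writable catalog of the
/// loaded media, and tracks the bytes written since the last commit.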
struct PoolWriterState {
    drive: Box<dyn TapeDriver>,
    catalog: MediaCatalog,
    // tell if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}

impl PoolWriterState {

    fn commit(&mut self) -> Result<(), Error> {
        self.drive.sync()?; // sync all data to the tape
        self.catalog.commit()?; // then commit the catalog
        self.bytes_written = 0;
        Ok(())
    }
}

/// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter {
    pool: MediaPool,
    drive_name: String,
    status: Option<PoolWriterState>,
    media_set_catalog: MediaSetCatalog,
}

impl PoolWriter {

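    /// Create a new pool writer
    ///
    /// This starts a write session on the given media pool and loads
    /// the catalogs of all media in the current media set read-only.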
    pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> {

        let current_time = proxmox::tools::time::epoch_i64();

        pool.start_write_session(current_time)?;

        let mut media_set_catalog = MediaSetCatalog::new();

        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_catalog = MediaCatalog::open(
                Path::new(TAPE_STATUS_DIR),
                &media_uuid,
                false,
                false,
            )?;
            media_set_catalog.append_catalog(media_catalog)?;
        }

        Ok(Self {
            pool,
            drive_name: drive_name.to_string(),
            status: None,
            media_set_catalog,
        })
    }

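    /// Access the underlying media pool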
    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }

    /// Set media status to FULL (persistent - stores pool status)
    pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
        self.pool.set_media_status_full(&uuid)?;
        Ok(())
    }

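    /// Check if the given snapshot is already stored on the currently
    /// loaded media or on another media of the current media set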
    pub fn contains_snapshot(&self, snapshot: &str) -> bool {
        if let Some(PoolWriterState { ref catalog, .. }) = self.status {
            if catalog.contains_snapshot(snapshot) {
                return true;
            }
        }
        self.media_set_catalog.contains_snapshot(snapshot)
    }

    /// Eject media and drop PoolWriterState (close drive)
    pub fn eject_media(&mut self) -> Result<(), Error> {
        let mut status = match self.status.take() {
            Some(status) => status,
            None => return Ok(()), // no media loaded
        };

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            drop(status); // close drive
            changer.unload_media(None)?;
        } else {
            status.drive.eject_media()?;
        }

        Ok(())
    }

    /// Export current media set and drop PoolWriterState (close drive)
    pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let status = self.status.take();

        let (drive_config, _digest) = crate::config::drive::config()?;

        if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
            drop(status); // close drive

            changer.unload_media(None)?;

            for media_uuid in self.pool.current_media_list()? {
                let media = self.pool.lookup_media(media_uuid)?;
                let changer_id = media.changer_id();
                if let Some(slot) = changer.export_media(changer_id)? {
                    worker.log(format!("exported media '{}' to import/export slot {}", changer_id, slot));
                } else {
                    worker.warn(format!("export failed - media '{}' is not online", changer_id));
                }
            }

        } else {
            worker.log("standalone drive - ejecting media instead of export");
            if let Some(mut status) = status {
                status.drive.eject_media()?;
            }
        }

        Ok(())
    }

    /// Commit changes to tape and catalog
    ///
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter.
    pub fn commit(&mut self) -> Result<(), Error> {
        if let Some(ref mut status) = self.status {
            status.commit()?;
        }
        Ok(())
    }

    /// Load a writable media into the drive
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        if !media_changed {
            return Ok(media_uuid);
        }

        // remove read-only catalog (we store a writable version in status)
        self.media_set_catalog.remove_catalog(&media_uuid);

        if let Some(PoolWriterState { mut drive, catalog, .. }) = self.status.take() {
            self.media_set_catalog.append_catalog(catalog)?;
            drive.eject_media()?;
        }

        let (drive_config, _digest) = crate::config::drive::config()?;

        let (mut drive, old_media_id) =
            request_and_load_media(worker, &drive_config, &self.drive_name, media.label())?;

        // test for critical tape alert flags
        let alert_flags = drive.tape_alert_flags()?;
        if !alert_flags.is_empty() {
            worker.log(format!("TapeAlertFlags: {:?}", alert_flags));
            if tape_alert_flags_critical(alert_flags) {
                bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
            }
        }

        let catalog = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });

        Ok(media_uuid)
    }

    /// Uuid of currently loaded BackupMedia
    pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
        match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Ok(catalog.uuid()),
            None => bail!("PoolWriter - no media loaded"),
        }
    }

    /// Move to EOM (if not already there), then create a new snapshot
    /// archive, writing the specified files (as .pxar) into it. On
    /// success, this returns 'Ok((true, _))' and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok((false, _))'). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
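    ///
    /// A typical caller loop might look like this (sketch only; the
    /// surrounding job code and the `pool_writer`/`snapshot_reader`
    /// values are assumptions, not part of this file):
    ///
    /// ```ignore
    /// loop {
    ///     pool_writer.load_writable_media(worker)?;
    ///     let (done, _bytes) = pool_writer.append_snapshot_archive(&snapshot_reader)?;
    ///     if done {
    ///         break; // snapshot archive fully written
    ///     }
    ///     // media is full - mark it and retry on the next writable media
    ///     let uuid = pool_writer.current_media_uuid()?.clone();
    ///     pool_writer.set_media_status_full(&uuid)?;
    /// }
    /// ```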
    pub fn append_snapshot_archive(
        &mut self,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                Some(content_uuid) => {
                    status.catalog.register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        if !done || request_sync {
            status.commit()?;
        }

        Ok((done, bytes_written))
    }

    /// Move to EOM (if not already there), then create a new chunk
    /// archive and write chunks from 'chunk_iter'. This stops when
    /// it detects LEOM or when we reach the maximum archive size
    /// (4GB). Written chunks are registered in the media catalog.
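    ///
    /// Returns whether LEOM was reached and the number of bytes
    /// written to the archive.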
    pub fn append_chunk_archive(
        &mut self,
        datastore: &DataStore,
        chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }
        let writer = status.drive.write_file()?;

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            writer,
            datastore,
            chunk_iter,
            &self.media_set_catalog,
            &status.catalog,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
        for digest in saved_chunks {
            status.catalog.register_chunk(&digest)?;
        }
        status.catalog.end_chunk_archive()?;

        if leom || request_sync {
            status.commit()?;
        }

        Ok((leom, bytes_written))
    }
}

/// Write chunks from 'chunk_iter' until roughly 'max_size' bytes have been written
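///
/// Chunks already stored in 'media_catalog' or 'media_set_catalog'
/// are skipped (deduplication within the media set). Writing also
/// stops when the drive reports LEOM.
///
/// Returns the digests of the written chunks, the archive's content
/// UUID, whether LEOM was reached, and the number of bytes written.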
fn write_chunk_archive<'a>(
    writer: Box<dyn 'a + TapeWrite>,
    datastore: &DataStore,
    chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    media_set_catalog: &MediaSetCatalog,
    media_catalog: &MediaCatalog,
    max_size: usize,
) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {

    let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;

    let mut chunk_index: HashSet<[u8;32]> = HashSet::new();

    // we want to get the chunk list in correct order
    let mut chunk_list: Vec<[u8;32]> = Vec::new();

    let mut leom = false;

    loop {
        let digest = match chunk_iter.next() {
            None => break,
            Some(digest) => digest?,
        };
        if media_catalog.contains_chunk(&digest)
            || chunk_index.contains(&digest)
            || media_set_catalog.contains_chunk(&digest)
        {
            continue;
        }

        let blob = datastore.load_chunk(&digest)?;
        println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());

        match writer.try_write_chunk(&digest, &blob) {
            Ok(true) => {
                chunk_index.insert(digest);
                chunk_list.push(digest);
            }
            Ok(false) => {
                leom = true;
                break;
            }
            Err(err) => bail!("write chunk failed - {}", err),
        }

        if writer.bytes_written() > max_size {
            println!("Chunk Archive max size reached, closing archive");
            break;
        }
    }

    writer.finish()?;

    Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
}

// Compare the media set label. If the media is empty, or the existing
// set label does not match the expected media set, overwrite the
// media set label.
fn update_media_set_label(
    worker: &WorkerTask,
    drive: &mut dyn TapeDriver,
    old_set: Option<MediaSetLabel>,
    media_id: &MediaId,
) -> Result<MediaCatalog, Error> {

    let media_catalog;

    let new_set = match media_id.media_set_label {
        None => bail!("got media without media set - internal error"),
        Some(ref set) => set,
    };

    let status_path = Path::new(TAPE_STATUS_DIR);

    match old_set {
        None => {
            worker.log(format!("writing new media set label"));
            drive.write_media_set_label(new_set)?;
            media_catalog = MediaCatalog::overwrite(status_path, media_id, true)?;
        }
        Some(media_set_label) => {
            if new_set.uuid == media_set_label.uuid {
                if new_set.seq_nr != media_set_label.seq_nr {
                    bail!("got media with wrong media sequence number ({} != {})",
                          new_set.seq_nr, media_set_label.seq_nr);
                }
                media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
            } else {
                worker.log(
                    format!("writing new media set label (overwrite '{}/{}')",
                            media_set_label.uuid.to_string(), media_set_label.seq_nr)
                );

                drive.write_media_set_label(new_set)?;
                media_catalog = MediaCatalog::overwrite(status_path, media_id, true)?;
            }
        }
    }

    // todo: verify last content/media_catalog somehow?
    drive.move_to_eom()?;

    Ok(media_catalog)
}