use std::collections::HashSet;
use std::path::Path;

use anyhow::{bail, Error};

use proxmox::tools::Uuid;

use crate::{
    backup::{
        DataStore,
    },
    server::WorkerTask,
    tape::{
        TAPE_STATUS_DIR,
        MAX_CHUNK_ARCHIVE_SIZE,
        COMMIT_BLOCK_SIZE,
        TapeDriver,
        TapeWrite,
        ChunkArchiveWriter,
        SnapshotReader,
        SnapshotChunkIterator,
        MediaPool,
        MediaId,
        MediaCatalog,
        MediaSetCatalog,
        tape_write_snapshot_archive,
        request_and_load_media,
        file_formats::MediaSetLabel,
    },
};


struct PoolWriterState {
    drive: Box<dyn TapeDriver>,
    catalog: MediaCatalog,
    // tells if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}

impl PoolWriterState {

    fn commit(&mut self) -> Result<(), Error> {
        self.drive.sync()?; // sync all data to the tape
        self.catalog.commit()?; // then commit the catalog
        self.bytes_written = 0;
        Ok(())
    }
}

/// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter {
    pool: MediaPool,
    drive_name: String,
    status: Option<PoolWriterState>,
    media_set_catalog: MediaSetCatalog,
}
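//
// NOTE: a minimal usage sketch, not taken from a real caller. It assumes a
// `MediaPool`, a `WorkerTask` and a `SnapshotReader` have already been set up
// elsewhere, and the drive name is a placeholder; only calls provided by this
// module are shown:
//
//     let mut pool_writer = PoolWriter::new(pool, "my-drive")?;
//
//     // skip snapshots that are already stored in this media set
//     if !pool_writer.contains_snapshot(&snapshot_name) {
//         pool_writer.load_writable_media(&worker)?;
//         let (done, _bytes) = pool_writer.append_snapshot_archive(&snapshot_reader)?;
//         // if `done` is false, mark the media as full and retry on new media
//     }
//
//     // changes must be committed before the PoolWriter is dropped
//     pool_writer.commit()?;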

impl PoolWriter {

    pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> {

        let current_time = proxmox::tools::time::epoch_i64();

        pool.start_write_session(current_time)?;

        let mut media_set_catalog = MediaSetCatalog::new();

        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_catalog = MediaCatalog::open(
                Path::new(TAPE_STATUS_DIR),
                &media_uuid,
                false,
                false,
            )?;
            media_set_catalog.append_catalog(media_catalog)?;
        }

        Ok(Self {
            pool,
            drive_name: drive_name.to_string(),
            status: None,
            media_set_catalog,
        })
    }

    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }

    /// Set media status to FULL (persistent - stores pool status)
    pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
        self.pool.set_media_status_full(uuid)?;
        Ok(())
    }

    pub fn contains_snapshot(&self, snapshot: &str) -> bool {
        if let Some(PoolWriterState { ref catalog, .. }) = self.status {
            if catalog.contains_snapshot(snapshot) {
                return true;
            }
        }
        self.media_set_catalog.contains_snapshot(snapshot)
    }

    /// Commit changes to tape and catalog
    ///
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter
    pub fn commit(&mut self) -> Result<(), Error> {
        if let Some(ref mut status) = self.status {
            status.commit()?;
        }
        Ok(())
    }
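
    // NOTE: a hypothetical end-of-job sequence (not from a real caller) to
    // illustrate why the explicit commit matters - dropping the writer without
    // it would leave the last data and catalog updates unsynced:
    //
    //     pool_writer.append_snapshot_archive(&snapshot_reader)?;
    //     pool_writer.commit()?;   // sync the tape, then commit the catalog
    //     drop(pool_writer);       // safe to drop now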

    /// Load a writable media into the drive
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        if !media_changed {
            return Ok(media_uuid);
        }

        // remove read-only catalog (we store a writable version in status)
        self.media_set_catalog.remove_catalog(&media_uuid);

        if let Some(PoolWriterState { mut drive, catalog, .. }) = self.status.take() {
            self.media_set_catalog.append_catalog(catalog)?;
            drive.eject_media()?;
        }

        let (drive_config, _digest) = crate::config::drive::config()?;

        let (mut drive, old_media_id) =
            request_and_load_media(worker, &drive_config, &self.drive_name, media.label())?;

        let catalog = update_media_set_label(
            worker,
            drive.as_mut(),
            old_media_id.media_set_label,
            media.id(),
        )?;

        self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });

        Ok(media_uuid)
    }

    /// Uuid of the currently loaded BackupMedia
    pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
        match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Ok(catalog.uuid()),
            None => bail!("PoolWriter - no media loaded"),
        }
    }

    /// Move to EOM (if not already there), then create a new snapshot
    /// archive writing the specified files (as .pxar) into it. On
    /// success, this returns 'Ok((true, _))' and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok((false, _))'). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
    pub fn append_snapshot_archive(
        &mut self,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                Some(content_uuid) => {
                    status.catalog.register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        if !done || request_sync {
            status.commit()?;
        }

        Ok((done, bytes_written))
    }
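
    // NOTE: a simplified sketch of the retry protocol described above (the real
    // caller lives elsewhere; `worker`, `snapshot_reader` and the loop shape are
    // assumptions made for illustration):
    //
    //     loop {
    //         let media_uuid = pool_writer.load_writable_media(&worker)?;
    //         let (done, _bytes) = pool_writer.append_snapshot_archive(&snapshot_reader)?;
    //         if done {
    //             break; // snapshot archive fully written and registered
    //         }
    //         // not enough space - mark this media full and retry on the next one
    //         pool_writer.set_media_status_full(&media_uuid)?;
    //     }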

    /// Move to EOM (if not already there), then create a new chunk
    /// archive and write chunks from 'chunk_iter'. This stops when
    /// it detects LEOM or when we reach the maximum archive size
    /// (4GB). Written chunks are registered in the media catalog.
    pub fn append_chunk_archive(
        &mut self,
        datastore: &DataStore,
        chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }
        let writer = status.drive.write_file()?;

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            writer,
            datastore,
            chunk_iter,
            &self.media_set_catalog,
            &status.catalog,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
        for digest in saved_chunks {
            status.catalog.register_chunk(&digest)?;
        }
        status.catalog.end_chunk_archive()?;

        if leom || request_sync {
            status.commit()?;
        }

        Ok((leom, bytes_written))
    }
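
    // NOTE: a hypothetical caller loop for chunk archives (the way `chunk_iter`
    // is obtained here is an assumption for illustration, not part of this
    // module): drain the peekable iterator, starting a new archive per call and
    // switching media whenever LEOM is reported:
    //
    //     let mut chunk_iter = snapshot_chunk_iter.peekable();
    //     while chunk_iter.peek().is_some() {
    //         let media_uuid = pool_writer.load_writable_media(&worker)?;
    //         let (leom, _bytes) = pool_writer.append_chunk_archive(&datastore, &mut chunk_iter)?;
    //         if leom {
    //             pool_writer.set_media_status_full(&media_uuid)?;
    //         }
    //     }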
}

/// Write up to 'max_size' bytes of chunks
fn write_chunk_archive<'a>(
    writer: Box<dyn 'a + TapeWrite>,
    datastore: &DataStore,
    chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    media_set_catalog: &MediaSetCatalog,
    media_catalog: &MediaCatalog,
    max_size: usize,
) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {

    let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;

    let mut chunk_index: HashSet<[u8;32]> = HashSet::new();

    // we want to get the chunk list in correct order
    let mut chunk_list: Vec<[u8;32]> = Vec::new();

    let mut leom = false;

    loop {
        let digest = match chunk_iter.next() {
            None => break,
            Some(digest) => digest?,
        };
        // skip chunks already stored on the current media, in this archive,
        // or on another media of the same media set
        if media_catalog.contains_chunk(&digest)
            || chunk_index.contains(&digest)
            || media_set_catalog.contains_chunk(&digest)
        {
            continue;
        }

        let blob = datastore.load_chunk(&digest)?;
        println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());

        match writer.try_write_chunk(&digest, &blob) {
            Ok(true) => {
                chunk_index.insert(digest);
                chunk_list.push(digest);
            }
            Ok(false) => {
                // LEOM reached - close the archive and report it to the caller
                leom = true;
                break;
            }
            Err(err) => bail!("write chunk failed - {}", err),
        }

        if writer.bytes_written() > max_size {
            println!("Chunk Archive max size reached, closing archive");
            break;
        }
    }

    writer.finish()?;

    Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
}

// Compare the media set label. If the media is empty, or the existing
// set label does not match the expected media set, overwrite the
// media set label.
fn update_media_set_label(
    worker: &WorkerTask,
    drive: &mut dyn TapeDriver,
    old_set: Option<MediaSetLabel>,
    media_id: &MediaId,
) -> Result<MediaCatalog, Error> {

    let media_catalog;

    let new_set = match media_id.media_set_label {
        None => bail!("got media without media set - internal error"),
        Some(ref set) => set,
    };

    let status_path = Path::new(TAPE_STATUS_DIR);

    match old_set {
        None => {
            worker.log(format!("writing new media set label"));
            drive.write_media_set_label(new_set)?;
            media_catalog = MediaCatalog::overwrite(status_path, media_id, true)?;
        }
        Some(media_set_label) => {
            if new_set.uuid == media_set_label.uuid {
                if new_set.seq_nr != media_set_label.seq_nr {
                    bail!("got media with wrong media sequence number ({} != {})",
                          new_set.seq_nr, media_set_label.seq_nr);
                }
                media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
            } else {
                worker.log(
                    format!("writing new media set label (overwrite '{}/{}')",
                            media_set_label.uuid.to_string(), media_set_label.seq_nr)
                );

                drive.write_media_set_label(new_set)?;
                media_catalog = MediaCatalog::overwrite(status_path, media_id, true)?;
            }
        }
    }

    // todo: verify last content/media_catalog somehow?
    drive.move_to_eom()?;

    Ok(media_catalog)
}