git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer.rs
tape: add PoolWriter
1 use std::collections::HashSet;
2 use std::path::Path;
3
4 use anyhow::{bail, Error};
5
6 use proxmox::{
7 tools::Uuid,
8 api::section_config::SectionConfigData,
9 };
10
11 use crate::{
12 backup::{
13 DataStore,
14 },
15 tape::{
16 TAPE_STATUS_DIR,
17 MAX_CHUNK_ARCHIVE_SIZE,
18 COMMIT_BLOCK_SIZE,
19 TapeDriver,
20 TapeWrite,
21 ChunkArchiveWriter,
22 SnapshotReader,
23 SnapshotChunkIterator,
24 MediaPool,
25 MediaId,
26 MediaCatalog,
27 MediaSetCatalog,
28 tape_write_snapshot_archive,
29 request_and_load_media,
30 },
31 };
32
33
34 struct PoolWriterState {
35 drive: Box<dyn TapeDriver>,
36 catalog: MediaCatalog,
37 // tells whether we already moved to EOM
38 at_eom: bool,
39 // bytes written after the last tape flush/sync
40 bytes_written: usize,
41 }
42
43 impl PoolWriterState {
44
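/// Sync all pending data to tape, then commit the media catalog.
///
/// The ordering matters: the catalog is committed only after the drive
/// sync succeeds, so the catalog never claims data that did not make it
/// onto tape.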
45 fn commit(&mut self) -> Result<(), Error> {
46 self.drive.sync()?; // sync all data to the tape
47 self.catalog.commit()?; // then commit the catalog
48 self.bytes_written = 0;
49 Ok(())
50 }
51 }
52
53 /// Helper to manage a backup job, writing several tapes of a pool
54 pub struct PoolWriter {
55 pool: MediaPool,
56 drive_name: String,
57 status: Option<PoolWriterState>,
58 media_set_catalog: MediaSetCatalog,
59 }
60
61 impl PoolWriter {
62
63 pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> {
64
65 let current_time = proxmox::tools::time::epoch_i64();
66
67 pool.start_write_session(current_time)?;
68
69 let mut media_set_catalog = MediaSetCatalog::new();
70
71 // load all catalogs read-only at start
72 for media_uuid in pool.current_media_list()? {
73 let media_catalog = MediaCatalog::open(
74 Path::new(TAPE_STATUS_DIR),
75 &media_uuid,
76 false,
77 false,
78 )?;
79 media_set_catalog.append_catalog(media_catalog)?;
80 }
81
82 Ok(Self {
83 pool,
84 drive_name: drive_name.to_string(),
85 status: None,
86 media_set_catalog,
87 })
88 }
89
90 pub fn pool(&mut self) -> &mut MediaPool {
91 &mut self.pool
92 }
93
94 /// Set media status to FULL (persistent - stores pool status)
95 pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
96 self.pool.set_media_status_full(&uuid)?;
97 Ok(())
98 }
99
100 pub fn contains_snapshot(&self, snapshot: &str) -> bool {
101 if let Some(PoolWriterState { ref catalog, .. }) = self.status {
102 if catalog.contains_snapshot(snapshot) {
103 return true;
104 }
105 }
106 self.media_set_catalog.contains_snapshot(snapshot)
107 }
108
109 /// commit changes to tape and catalog
110 ///
111 /// This is done automatically during a backup session, but needs to
112 /// be called explicitly before dropping the PoolWriter
113 pub fn commit(&mut self) -> Result<(), Error> {
114 if let Some(ref mut status) = self.status {
115 status.commit()?;
116 }
117 Ok(())
118 }
119
120 /// Load a writable media into the drive
121 pub fn load_writable_media(&mut self) -> Result<Uuid, Error> {
122 let last_media_uuid = match self.status {
123 Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
124 None => None,
125 };
126
127 let current_time = proxmox::tools::time::epoch_i64();
128 let media_uuid = self.pool.alloc_writable_media(current_time)?;
129
130 let media = self.pool.lookup_media(&media_uuid).unwrap();
131
132 let media_changed = match last_media_uuid {
133 Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
134 None => true,
135 };
136
137 if !media_changed {
138 return Ok(media_uuid);
139 }
140
141 // remove read-only catalog (we store a writable version in status)
142 self.media_set_catalog.remove_catalog(&media_uuid);
143
144 if let Some(PoolWriterState { mut drive, catalog, .. }) = self.status.take() {
145 self.media_set_catalog.append_catalog(catalog)?;
146 drive.eject_media()?;
147 }
148
149 let (drive_config, _digest) = crate::config::drive::config()?;
150 let (drive, catalog) = drive_load_and_label_media(&drive_config, &self.drive_name, &media.id())?;
151 self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });
152
153 Ok(media_uuid)
154 }
155
156 /// uuid of currently loaded BackupMedia
157 pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
158 match self.status {
159 Some(PoolWriterState { ref catalog, ..}) => Ok(catalog.uuid()),
160 None => bail!("PoolWriter - no media loaded"),
161 }
162 }
163
164 /// Move to EOM (if not already there), then create a new snapshot
165 /// archive writing the specified files (as .pxar) into it. On
166 /// success, this returns 'Ok((true, _))' and the media catalog gets
167 /// updated.
168 ///
169 /// Please note that this may fail when there is not enough space
170 /// on the media (return value 'Ok((false, _))'). In that case, the
171 /// archive is marked incomplete, and we do not use it. The caller
172 /// should mark the media as full and try again using another
173 /// media.
174 pub fn append_snapshot_archive(
175 &mut self,
176 snapshot_reader: &SnapshotReader,
177 ) -> Result<(bool, usize), Error> {
178
179 let status = match self.status {
180 Some(ref mut status) => status,
181 None => bail!("PoolWriter - no media loaded"),
182 };
183
184 if !status.at_eom {
185 status.drive.move_to_eom()?;
186 status.at_eom = true;
187 }
188
189 let current_file_number = status.drive.current_file_number()?;
190 if current_file_number < 2 {
191 bail!("got strange file position number from drive ({})", current_file_number);
192 }
193
194 let (done, bytes_written) = {
195 let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;
196
197 match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
198 Some(content_uuid) => {
199 status.catalog.register_snapshot(
200 content_uuid,
201 current_file_number,
202 &snapshot_reader.snapshot().to_string(),
203 )?;
204 (true, writer.bytes_written())
205 }
206 None => (false, writer.bytes_written()),
207 }
208 };
209
210 status.bytes_written += bytes_written;
211
212 let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
213
214 if !done || request_sync {
215 status.commit()?;
216 }
217
218 Ok((done, bytes_written))
219 }
220
221 /// Move to EOM (if not already there), then create a new chunk
222 /// archive and write chunks from 'chunk_iter'. This stops when
223 /// it detects LEOM or when we reach the maximum archive size
224 /// (4GB). Written chunks are registered in the media catalog.
225 pub fn append_chunk_archive(
226 &mut self,
227 datastore: &DataStore,
228 chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
229 ) -> Result<(bool, usize), Error> {
230
231 let status = match self.status {
232 Some(ref mut status) => status,
233 None => bail!("PoolWriter - no media loaded"),
234 };
235
236 if !status.at_eom {
237 status.drive.move_to_eom()?;
238 status.at_eom = true;
239 }
240
241 let current_file_number = status.drive.current_file_number()?;
242 if current_file_number < 2 {
243 bail!("got strange file position number from drive ({})", current_file_number);
244 }
245 let writer = status.drive.write_file()?;
246
247 let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
248 writer,
249 datastore,
250 chunk_iter,
251 &self.media_set_catalog,
252 &status.catalog,
253 MAX_CHUNK_ARCHIVE_SIZE,
254 )?;
255
256 status.bytes_written += bytes_written;
257
258 let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
259
260 // register chunks in media_catalog
261 status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
262 for digest in saved_chunks {
263 status.catalog.register_chunk(&digest)?;
264 }
265 status.catalog.end_chunk_archive()?;
266
267 if leom || request_sync {
268 status.commit()?;
269 }
270
271 Ok((leom, bytes_written))
272 }
273 }
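// Usage sketch (not part of the original file): one way a tape backup job
// could drive the PoolWriter API above. `datastore`, `pool` and
// `snapshot_reader` are assumed to come from the surrounding job code, and
// `chunk_iterator()` is a hypothetical helper producing the peekable
// SnapshotChunkIterator expected by append_chunk_archive().
//
//     let mut pool_writer = PoolWriter::new(pool, "my-drive")?;
//     let mut chunk_iter = chunk_iterator(&snapshot_reader)?.peekable();
//
//     loop {
//         let media_uuid = pool_writer.load_writable_media()?;
//
//         if chunk_iter.peek().is_some() {
//             // write chunks not yet stored on this media set
//             let (leom, _bytes) = pool_writer.append_chunk_archive(&datastore, &mut chunk_iter)?;
//             if leom {
//                 // tape is full - mark it, then continue on the next writable media
//                 pool_writer.set_media_status_full(&media_uuid)?;
//             }
//             continue;
//         }
//
//         // all chunks stored; now append the snapshot archive itself
//         let (done, _bytes) = pool_writer.append_snapshot_archive(&snapshot_reader)?;
//         if done {
//             break;
//         }
//         // not enough space - mark the media full and retry on another tape
//         pool_writer.set_media_status_full(&media_uuid)?;
//     }
//
//     // flush data to tape and persist the media catalog
//     pool_writer.commit()?;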
274
275 /// write up to <max_size> bytes of chunks
276 fn write_chunk_archive<'a>(
277 writer: Box<dyn 'a + TapeWrite>,
278 datastore: &DataStore,
279 chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
280 media_set_catalog: &MediaSetCatalog,
281 media_catalog: &MediaCatalog,
282 max_size: usize,
283 ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
284
285 let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;
286
287 let mut chunk_index: HashSet<[u8;32]> = HashSet::new();
288
289 // we want to get the chunk list in the correct order
290 let mut chunk_list: Vec<[u8;32]> = Vec::new();
291
292 let mut leom = false;
293
294 loop {
295 let digest = match chunk_iter.next() {
296 None => break,
297 Some(digest) => digest?,
298 };
299 if media_catalog.contains_chunk(&digest)
300 || chunk_index.contains(&digest)
301 || media_set_catalog.contains_chunk(&digest)
302 {
303 continue;
304 }
305
306 let blob = datastore.load_chunk(&digest)?;
307 println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());
308
309 match writer.try_write_chunk(&digest, &blob) {
310 Ok(true) => {
311 chunk_index.insert(digest);
312 chunk_list.push(digest);
313 }
314 Ok(false) => {
315 leom = true;
316 break;
317 }
318 Err(err) => bail!("write chunk failed - {}", err),
319 }
320
321 if writer.bytes_written() > max_size {
322 println!("Chunk Archive max size reached, closing archive");
323 break;
324 }
325 }
326
327 writer.finish()?;
328
329 Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
330 }
331
332 // Request and load 'media' into the drive. Then compare the media
333 // set label. If the tape is empty, or the existing set label does not
334 // match the expected media set, overwrite the media set label.
335 fn drive_load_and_label_media(
336 drive_config: &SectionConfigData,
337 drive_name: &str,
338 media_id: &MediaId,
339 ) -> Result<(Box<dyn TapeDriver>, MediaCatalog), Error> {
340
341 let (mut tmp_drive, info) =
342 request_and_load_media(&drive_config, &drive_name, &media_id.label)?;
343
344 let media_catalog;
345
346 let new_set = match media_id.media_set_label {
347 None => {
348 bail!("got media without media set - internal error");
349 }
350 Some(ref set) => set,
351 };
352
353 let status_path = Path::new(TAPE_STATUS_DIR);
354
355 match &info.media_set_label {
356 None => {
357 println!("wrinting new media set label");
358 tmp_drive.write_media_set_label(new_set)?;
359
360 let info = MediaId {
361 label: info.label,
362 media_set_label: Some(new_set.clone()),
363 };
364 media_catalog = MediaCatalog::overwrite(status_path, &info, true)?;
365 }
366 Some(media_set_label) => {
367 if new_set.uuid == media_set_label.uuid {
368 if new_set.seq_nr != media_set_label.seq_nr {
369 bail!("got media with wrong media sequence number ({} != {}",
370 new_set.seq_nr,media_set_label.seq_nr);
371 }
372 media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
373 } else {
374 println!("wrinting new media set label (overwrite '{}/{}')",
375 media_set_label.uuid.to_string(), media_set_label.seq_nr);
376
377 tmp_drive.write_media_set_label(new_set)?;
378
379 let info = MediaId {
380 label: info.label,
381 media_set_label: Some(new_set.clone()),
382 };
383 media_catalog = MediaCatalog::overwrite(status_path, &info, true)?;
384 }
385 }
386 }
387
388 // todo: verify last content/media_catalog somehow?
389 tmp_drive.move_to_eom()?;
390
391 Ok((tmp_drive, media_catalog))
392 }