git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer.rs
tape: improve media request/load
use std::collections::HashSet;
use std::path::Path;

use anyhow::{bail, Error};

use proxmox::{
    tools::Uuid,
    api::section_config::SectionConfigData,
};

use crate::{
    backup::{
        DataStore,
    },
    server::WorkerTask,
    tape::{
        TAPE_STATUS_DIR,
        MAX_CHUNK_ARCHIVE_SIZE,
        COMMIT_BLOCK_SIZE,
        TapeDriver,
        TapeWrite,
        ChunkArchiveWriter,
        SnapshotReader,
        SnapshotChunkIterator,
        MediaPool,
        MediaId,
        MediaCatalog,
        MediaSetCatalog,
        tape_write_snapshot_archive,
        request_and_load_media,
    },
};

struct PoolWriterState {
    drive: Box<dyn TapeDriver>,
    catalog: MediaCatalog,
    // tell if we already moved to EOM
    at_eom: bool,
    // bytes written after the last tape flush/sync
    bytes_written: usize,
}

impl PoolWriterState {

    /// Sync all data to the tape, then commit the catalog
    /// (so the catalog never references data that is not yet on tape).
    fn commit(&mut self) -> Result<(), Error> {
        self.drive.sync()?; // sync all data to the tape
        self.catalog.commit()?; // then commit the catalog
        self.bytes_written = 0;
        Ok(())
    }
}

/// Helper to manage a backup job, writing several tapes of a pool
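///
/// Typical usage, as a sketch only ('pool', 'worker' and the snapshot
/// readers are assumed to be provided by the calling backup job):
///
/// ```ignore
/// let mut pool_writer = PoolWriter::new(pool, "my-drive")?;
/// let media_uuid = pool_writer.load_writable_media(&worker)?;
/// // ... append chunk archives and snapshot archives ...
/// pool_writer.commit()?; // flush tape and catalog before dropping
/// ```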
pub struct PoolWriter {
    pool: MediaPool,
    drive_name: String,
    status: Option<PoolWriterState>,
    media_set_catalog: MediaSetCatalog,
}

impl PoolWriter {

    pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> {

        let current_time = proxmox::tools::time::epoch_i64();

        pool.start_write_session(current_time)?;

        let mut media_set_catalog = MediaSetCatalog::new();

        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_catalog = MediaCatalog::open(
                Path::new(TAPE_STATUS_DIR),
                &media_uuid,
                false,
                false,
            )?;
            media_set_catalog.append_catalog(media_catalog)?;
        }

        Ok(Self {
            pool,
            drive_name: drive_name.to_string(),
            status: None,
            media_set_catalog,
        })
    }

    pub fn pool(&mut self) -> &mut MediaPool {
        &mut self.pool
    }

    /// Set media status to FULL (persistent - stores pool status)
    pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> {
        self.pool.set_media_status_full(&uuid)?;
        Ok(())
    }

    pub fn contains_snapshot(&self, snapshot: &str) -> bool {
        if let Some(PoolWriterState { ref catalog, .. }) = self.status {
            if catalog.contains_snapshot(snapshot) {
                return true;
            }
        }
        self.media_set_catalog.contains_snapshot(snapshot)
    }

    /// Commit changes to tape and catalog
    ///
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter.
    pub fn commit(&mut self) -> Result<(), Error> {
        if let Some(ref mut status) = self.status {
            status.commit()?;
        }
        Ok(())
    }

    /// Load a writable media into the drive
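    ///
    /// Returns the uuid of the media now loaded. If the pool allocates a
    /// media other than the one currently in the drive, the old (writable)
    /// catalog is moved into the read-only media set catalog and the old
    /// tape is ejected before the new media is requested and labeled.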
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        let last_media_uuid = match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()),
            None => None,
        };

        let current_time = proxmox::tools::time::epoch_i64();
        let media_uuid = self.pool.alloc_writable_media(current_time)?;

        let media = self.pool.lookup_media(&media_uuid).unwrap();

        let media_changed = match last_media_uuid {
            Some(ref last_media_uuid) => last_media_uuid != &media_uuid,
            None => true,
        };

        if !media_changed {
            return Ok(media_uuid);
        }

        // remove read-only catalog (we store a writable version in status)
        self.media_set_catalog.remove_catalog(&media_uuid);

        if let Some(PoolWriterState { mut drive, catalog, .. }) = self.status.take() {
            self.media_set_catalog.append_catalog(catalog)?;
            drive.eject_media()?;
        }

        let (drive_config, _digest) = crate::config::drive::config()?;
        let (drive, catalog) = drive_load_and_label_media(worker, &drive_config, &self.drive_name, &media.id())?;
        self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 });

        Ok(media_uuid)
    }

    /// uuid of currently loaded BackupMedia
    pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
        match self.status {
            Some(PoolWriterState { ref catalog, .. }) => Ok(catalog.uuid()),
            None => bail!("PoolWriter - no media loaded"),
        }
    }

    /// Move to EOM (if not already there), then create a new snapshot
    /// archive writing the specified files (as .pxar) into it. On
    /// success, this returns 'Ok((true, _))' and the media catalog gets
    /// updated.
    ///
    /// Please note that this may fail when there is not enough space
    /// on the media (return value 'Ok((false, _))'). In that case, the
    /// archive is marked incomplete, and we do not use it. The caller
    /// should mark the media as full and try again using another
    /// media.
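    ///
    /// A sketch of that retry pattern (names like 'pool_writer', 'worker',
    /// 'snapshot_reader' and 'media_uuid' are assumed to be provided by the
    /// calling backup job):
    ///
    /// ```ignore
    /// let (done, _bytes) = pool_writer.append_snapshot_archive(&snapshot_reader)?;
    /// if !done {
    ///     // not enough space - mark this media full and retry on the next one
    ///     pool_writer.set_media_status_full(&media_uuid)?;
    ///     let media_uuid = pool_writer.load_writable_media(&worker)?;
    ///     let (done, _bytes) = pool_writer.append_snapshot_archive(&snapshot_reader)?;
    ///     // ...
    /// }
    /// ```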
    pub fn append_snapshot_archive(
        &mut self,
        snapshot_reader: &SnapshotReader,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        let (done, bytes_written) = {
            let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?;

            match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
                Some(content_uuid) => {
                    status.catalog.register_snapshot(
                        content_uuid,
                        current_file_number,
                        &snapshot_reader.snapshot().to_string(),
                    )?;
                    (true, writer.bytes_written())
                }
                None => (false, writer.bytes_written()),
            }
        };

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        if !done || request_sync {
            status.commit()?;
        }

        Ok((done, bytes_written))
    }

    /// Move to EOM (if not already there), then create a new chunk
    /// archive and write chunks from 'chunk_iter'. This stops when
    /// it detects LEOM or when we reach the maximum archive size
    /// (4GB). Written chunks are registered in the media catalog.
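    ///
    /// A sketch of a caller loop (names like 'pool_writer', 'worker',
    /// 'datastore', 'media_uuid' and the peekable 'chunk_iter' are assumed
    /// to be set up by the calling backup job):
    ///
    /// ```ignore
    /// loop {
    ///     let (leom, _bytes) = pool_writer.append_chunk_archive(&datastore, &mut chunk_iter)?;
    ///     if chunk_iter.peek().is_none() {
    ///         break; // all chunks written
    ///     }
    ///     if leom {
    ///         // tape is full - continue on the next writable media
    ///         pool_writer.set_media_status_full(&media_uuid)?;
    ///         media_uuid = pool_writer.load_writable_media(&worker)?;
    ///     }
    /// }
    /// ```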
    pub fn append_chunk_archive(
        &mut self,
        datastore: &DataStore,
        chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    ) -> Result<(bool, usize), Error> {

        let status = match self.status {
            Some(ref mut status) => status,
            None => bail!("PoolWriter - no media loaded"),
        };

        if !status.at_eom {
            status.drive.move_to_eom()?;
            status.at_eom = true;
        }

        let current_file_number = status.drive.current_file_number()?;
        if current_file_number < 2 {
            bail!("got strange file position number from drive ({})", current_file_number);
        }

        let writer = status.drive.write_file()?;

        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
            writer,
            datastore,
            chunk_iter,
            &self.media_set_catalog,
            &status.catalog,
            MAX_CHUNK_ARCHIVE_SIZE,
        )?;

        status.bytes_written += bytes_written;

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
        status.catalog.start_chunk_archive(content_uuid, current_file_number)?;
        for digest in saved_chunks {
            status.catalog.register_chunk(&digest)?;
        }
        status.catalog.end_chunk_archive()?;

        if leom || request_sync {
            status.commit()?;
        }

        Ok((leom, bytes_written))
    }
}

/// write up to 'max_size' bytes of chunks
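///
/// Returns '(saved_chunks, content_uuid, leom, bytes_written)': the digests
/// actually written (in write order), the uuid of the new chunk archive,
/// whether the drive reported LEOM, and the number of bytes written.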
fn write_chunk_archive<'a>(
    writer: Box<dyn 'a + TapeWrite>,
    datastore: &DataStore,
    chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
    media_set_catalog: &MediaSetCatalog,
    media_catalog: &MediaCatalog,
    max_size: usize,
) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {

    let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;

    let mut chunk_index: HashSet<[u8;32]> = HashSet::new();

    // we want to get the chunk list in correct order
    let mut chunk_list: Vec<[u8;32]> = Vec::new();

    let mut leom = false;

    loop {
        let digest = match chunk_iter.next() {
            None => break,
            Some(digest) => digest?,
        };

        // skip chunks that are already on this media, already in this
        // archive, or on another media of the same media set
        if media_catalog.contains_chunk(&digest)
            || chunk_index.contains(&digest)
            || media_set_catalog.contains_chunk(&digest)
        {
            continue;
        }

        let blob = datastore.load_chunk(&digest)?;
        println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());

        match writer.try_write_chunk(&digest, &blob) {
            Ok(true) => {
                chunk_index.insert(digest);
                chunk_list.push(digest);
            }
            Ok(false) => {
                leom = true; // drive reported logical end of media
                break;
            }
            Err(err) => bail!("write chunk failed - {}", err),
        }

        if writer.bytes_written() > max_size {
            println!("Chunk Archive max size reached, closing archive");
            break;
        }
    }

    writer.finish()?;

    Ok((chunk_list, content_uuid, leom, writer.bytes_written()))
}

// Request and load 'media' into the drive. Then compare the media
// set label. If the tape is empty, or the existing set label does not
// match the expected media set, overwrite the media set label.
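//
// In short, the cases handled below are:
//   - tape has no media set label         -> write our label, start a new catalog
//   - label matches our media set uuid    -> sequence number must match, reuse the catalog
//   - label belongs to another media set  -> overwrite the label, start a new catalog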
fn drive_load_and_label_media(
    worker: &WorkerTask,
    drive_config: &SectionConfigData,
    drive_name: &str,
    media_id: &MediaId,
) -> Result<(Box<dyn TapeDriver>, MediaCatalog), Error> {

    let (mut tmp_drive, info) =
        request_and_load_media(worker, &drive_config, &drive_name, &media_id.label)?;

    let media_catalog;

    let new_set = match media_id.media_set_label {
        None => {
            bail!("got media without media set - internal error");
        }
        Some(ref set) => set,
    };

    let status_path = Path::new(TAPE_STATUS_DIR);

    match &info.media_set_label {
        None => {
            println!("writing new media set label");
            tmp_drive.write_media_set_label(new_set)?;

            let info = MediaId {
                label: info.label,
                media_set_label: Some(new_set.clone()),
            };
            media_catalog = MediaCatalog::overwrite(status_path, &info, true)?;
        }
        Some(media_set_label) => {
            if new_set.uuid == media_set_label.uuid {
                if new_set.seq_nr != media_set_label.seq_nr {
                    bail!("got media with wrong media sequence number ({} != {})",
                          new_set.seq_nr, media_set_label.seq_nr);
                }
                media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?;
            } else {
                println!("writing new media set label (overwrite '{}/{}')",
                         media_set_label.uuid.to_string(), media_set_label.seq_nr);

                tmp_drive.write_media_set_label(new_set)?;

                let info = MediaId {
                    label: info.label,
                    media_set_label: Some(new_set.clone()),
                };
                media_catalog = MediaCatalog::overwrite(status_path, &info, true)?;
            }
        }
    }

    // todo: verify last content/media_catalog somehow?
    tmp_drive.move_to_eom()?;

    Ok((tmp_drive, media_catalog))
}