]> git.proxmox.com Git - proxmox-backup.git/blame - pbs-tape/src/blocked_reader.rs
update to proxmox-sys 0.2 crate
[proxmox-backup.git] / pbs-tape / src / blocked_reader.rs
CommitLineData
2e7014e3
DM
1use std::io::Read;
2
048b43af 3use crate::{
2e7014e3 4 TapeRead,
0db57124 5 BlockRead,
318b3106 6 BlockReadError,
048b43af
DM
7 PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0,
8 BlockHeader,
9 BlockHeaderFlags,
2e7014e3
DM
10};
11
12/// Read a block stream generated by 'BlockWriter'.
13///
14/// This class implements 'TapeRead'. It always read whole blocks from
15/// the underlying reader, and does additional error checks:
16///
17/// - check magic number (detect streams not written by 'BlockWriter')
18/// - check block size
19/// - check block sequence numbers
20///
21/// The reader consumes the EOF mark after the data stream (if read to
22/// the end of the stream).
23pub struct BlockedReader<R> {
24 reader: R,
25 buffer: Box<BlockHeader>,
26 seq_nr: u32,
27 found_end_marker: bool,
28 incomplete: bool,
29 got_eod: bool,
30 read_error: bool,
31 read_pos: usize,
32}
33
0db57124 34impl <R: BlockRead> BlockedReader<R> {
2e7014e3
DM
35
36 /// Create a new BlockedReader instance.
37 ///
318b3106
DM
38 /// This tries to read the first block. Please inspect the error
39 /// to detect EOF and EOT.
40 pub fn open(mut reader: R) -> Result<Self, BlockReadError> {
2e7014e3
DM
41
42 let mut buffer = BlockHeader::new();
43
318b3106 44 Self::read_block_frame(&mut buffer, &mut reader)?;
2e7014e3
DM
45
46 let (_size, found_end_marker) = Self::check_buffer(&buffer, 0)?;
47
48 let mut incomplete = false;
7d2c156e
DM
49 let mut got_eod = false;
50
2e7014e3
DM
51 if found_end_marker {
52 incomplete = buffer.flags.contains(BlockHeaderFlags::INCOMPLETE);
7d2c156e
DM
53 Self::consume_eof_marker(&mut reader)?;
54 got_eod = true;
2e7014e3 55 }
7d2c156e 56
318b3106 57 Ok(Self {
2e7014e3
DM
58 reader,
59 buffer,
60 found_end_marker,
61 incomplete,
7d2c156e 62 got_eod,
2e7014e3 63 seq_nr: 1,
2e7014e3
DM
64 read_error: false,
65 read_pos: 0,
318b3106 66 })
2e7014e3
DM
67 }
68
69 fn check_buffer(buffer: &BlockHeader, seq_nr: u32) -> Result<(usize, bool), std::io::Error> {
70
71 if buffer.magic != PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0 {
25877d05 72 proxmox_sys::io_bail!("detected tape block with wrong magic number - not written by proxmox tape");
2e7014e3
DM
73 }
74
75 if seq_nr != buffer.seq_nr() {
25877d05 76 proxmox_sys::io_bail!(
d1d74c43 77 "detected tape block with wrong sequence number ({} != {})",
2e7014e3
DM
78 seq_nr, buffer.seq_nr())
79 }
80
81 let size = buffer.size();
82 let found_end_marker = buffer.flags.contains(BlockHeaderFlags::END_OF_STREAM);
83
84 if size > buffer.payload.len() {
25877d05 85 proxmox_sys::io_bail!("detected tape block with wrong payload size ({} > {}", size, buffer.payload.len());
6334bdc1 86 } else if size == 0 && !found_end_marker {
25877d05 87 proxmox_sys::io_bail!("detected tape block with zero payload size");
2e7014e3
DM
88 }
89
90
91 Ok((size, found_end_marker))
92 }
93
318b3106 94 fn read_block_frame(buffer: &mut BlockHeader, reader: &mut R) -> Result<(), BlockReadError> {
2e7014e3
DM
95
96 let data = unsafe {
97 std::slice::from_raw_parts_mut(
98 (buffer as *mut BlockHeader) as *mut u8,
99 BlockHeader::SIZE,
100 )
101 };
102
318b3106
DM
103 let bytes = reader.read_block(data)?;
104
105 if bytes != BlockHeader::SIZE {
25877d05 106 return Err(proxmox_sys::io_format_err!("got wrong block size").into());
0db57124 107 }
318b3106
DM
108
109 Ok(())
2e7014e3
DM
110 }
111
7d2c156e
DM
112 fn consume_eof_marker(reader: &mut R) -> Result<(), std::io::Error> {
113 let mut tmp_buf = [0u8; 512]; // use a small buffer for testing EOF
0db57124 114 match reader.read_block(&mut tmp_buf) {
318b3106 115 Ok(_) => {
25877d05 116 proxmox_sys::io_bail!("detected tape block after block-stream end marker");
0db57124 117 }
318b3106 118 Err(BlockReadError::EndOfFile) => {
0db57124
DM
119 return Ok(());
120 }
318b3106 121 Err(BlockReadError::EndOfStream) => {
25877d05 122 proxmox_sys::io_bail!("got unexpected end of tape");
0db57124 123 }
318b3106 124 Err(BlockReadError::Error(err)) => {
0db57124
DM
125 return Err(err);
126 }
7d2c156e 127 }
7d2c156e
DM
128 }
129
90461b76 130 fn read_block(&mut self, check_end_marker: bool) -> Result<usize, std::io::Error> {
2e7014e3 131
318b3106
DM
132 match Self::read_block_frame(&mut self.buffer, &mut self.reader) {
133 Ok(()) => { /* ok */ }
134 Err(BlockReadError::EndOfFile) => {
135 self.got_eod = true;
136 self.read_pos = self.buffer.payload.len();
90461b76 137 if !self.found_end_marker && check_end_marker {
25877d05 138 proxmox_sys::io_bail!("detected tape stream without end marker");
318b3106
DM
139 }
140 return Ok(0); // EOD
141 }
142 Err(BlockReadError::EndOfStream) => {
25877d05 143 proxmox_sys::io_bail!("got unexpected end of tape");
318b3106
DM
144 }
145 Err(BlockReadError::Error(err)) => {
146 return Err(err);
2e7014e3 147 }
2e7014e3
DM
148 }
149
150 let (size, found_end_marker) = Self::check_buffer(&self.buffer, self.seq_nr)?;
151 self.seq_nr += 1;
152
153 if found_end_marker { // consume EOF mark
154 self.found_end_marker = true;
155 self.incomplete = self.buffer.flags.contains(BlockHeaderFlags::INCOMPLETE);
7d2c156e
DM
156 Self::consume_eof_marker(&mut self.reader)?;
157 self.got_eod = true;
2e7014e3
DM
158 }
159
160 self.read_pos = 0;
161
162 Ok(size)
163 }
164}
165
0db57124 166impl <R: BlockRead> TapeRead for BlockedReader<R> {
2e7014e3
DM
167
168 fn is_incomplete(&self) -> Result<bool, std::io::Error> {
169 if !self.got_eod {
25877d05 170 proxmox_sys::io_bail!("is_incomplete failed: EOD not reached");
2e7014e3
DM
171 }
172 if !self.found_end_marker {
25877d05 173 proxmox_sys::io_bail!("is_incomplete failed: no end marker found");
2e7014e3
DM
174 }
175
176 Ok(self.incomplete)
177 }
178
179 fn has_end_marker(&self) -> Result<bool, std::io::Error> {
180 if !self.got_eod {
25877d05 181 proxmox_sys::io_bail!("has_end_marker failed: EOD not reached");
2e7014e3
DM
182 }
183
184 Ok(self.found_end_marker)
185 }
90461b76
DM
186
187 // like ReadExt::skip_to_end(), but does not raise an error if the
188 // stream has no end marker.
189 fn skip_data(&mut self) -> Result<usize, std::io::Error> {
190 let mut bytes = 0;
191 let buffer_size = self.buffer.size();
192 let rest = (buffer_size as isize) - (self.read_pos as isize);
193 if rest > 0 {
194 bytes = rest as usize;
195 }
196 loop {
197 if self.got_eod {
198 return Ok(bytes);
199 }
200 bytes += self.read_block(false)?;
201 }
202 }
2e7014e3
DM
203}
204
0db57124 205impl <R: BlockRead> Read for BlockedReader<R> {
2e7014e3
DM
206
207 fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
208
209 if self.read_error {
25877d05 210 proxmox_sys::io_bail!("detected read after error - internal error");
2e7014e3
DM
211 }
212
213 let mut buffer_size = self.buffer.size();
214 let mut rest = (buffer_size as isize) - (self.read_pos as isize);
215
216 if rest <= 0 && !self.got_eod { // try to refill buffer
90461b76 217 buffer_size = match self.read_block(true) {
2e7014e3
DM
218 Ok(len) => len,
219 err => {
220 self.read_error = true;
221 return err;
222 }
223 };
224 rest = buffer_size as isize;
225 }
226
227 if rest <= 0 {
38556bf6 228 Ok(0)
2e7014e3
DM
229 } else {
230 let copy_len = if (buffer.len() as isize) < rest {
231 buffer.len()
232 } else {
233 rest as usize
234 };
235 buffer[..copy_len].copy_from_slice(
236 &self.buffer.payload[self.read_pos..(self.read_pos + copy_len)]);
237 self.read_pos += copy_len;
38556bf6 238 Ok(copy_len)
2e7014e3
DM
239 }
240 }
241}
242
243#[cfg(test)]
244mod test {
245 use std::io::Read;
ce5327ba 246 use anyhow::{bail, Error};
048b43af 247 use crate::{
2e7014e3 248 TapeWrite,
ce5327ba 249 BlockReadError,
048b43af
DM
250 EmulateTapeReader,
251 EmulateTapeWriter,
252 PROXMOX_TAPE_BLOCK_SIZE,
253 BlockedReader,
254 BlockedWriter,
2e7014e3
DM
255 };
256
257 fn write_and_verify(data: &[u8]) -> Result<(), Error> {
258
259 let mut tape_data = Vec::new();
260
0db57124
DM
261 {
262 let writer = EmulateTapeWriter::new(&mut tape_data, 1024*1024*10);
263 let mut writer = BlockedWriter::new(writer);
2e7014e3 264
0db57124 265 writer.write_all(data)?;
2e7014e3 266
0db57124
DM
267 writer.finish(false)?;
268 }
2e7014e3
DM
269
270 assert_eq!(
271 tape_data.len(),
272 ((data.len() + PROXMOX_TAPE_BLOCK_SIZE)/PROXMOX_TAPE_BLOCK_SIZE)
273 *PROXMOX_TAPE_BLOCK_SIZE
274 );
275
276 let reader = &mut &tape_data[..];
0db57124 277 let reader = EmulateTapeReader::new(reader);
ce5327ba 278 let mut reader = BlockedReader::open(reader)?;
2e7014e3
DM
279
280 let mut read_data = Vec::with_capacity(PROXMOX_TAPE_BLOCK_SIZE);
281 reader.read_to_end(&mut read_data)?;
282
283 assert_eq!(data.len(), read_data.len());
284
285 assert_eq!(data, &read_data[..]);
286
287 Ok(())
288 }
289
290 #[test]
291 fn empty_stream() -> Result<(), Error> {
292 write_and_verify(b"")
293 }
294
295 #[test]
296 fn small_data() -> Result<(), Error> {
297 write_and_verify(b"ABC")
298 }
299
300 #[test]
301 fn large_data() -> Result<(), Error> {
25877d05 302 let data = proxmox_sys::linux::random_data(1024*1024*5)?;
2e7014e3
DM
303 write_and_verify(&data)
304 }
305
306 #[test]
307 fn no_data() -> Result<(), Error> {
308 let tape_data = Vec::new();
309 let reader = &mut &tape_data[..];
0db57124 310 let reader = EmulateTapeReader::new(reader);
ce5327ba
DM
311 match BlockedReader::open(reader) {
312 Err(BlockReadError::EndOfFile) => { /* OK */ },
313 _ => bail!("expected EOF"),
314 }
2e7014e3
DM
315
316 Ok(())
317 }
318
319 #[test]
320 fn no_end_marker() -> Result<(), Error> {
321 let mut tape_data = Vec::new();
322 {
0db57124
DM
323 let writer = EmulateTapeWriter::new(&mut tape_data, 1024*1024);
324 let mut writer = BlockedWriter::new(writer);
2e7014e3 325 // write at least one block
25877d05 326 let data = proxmox_sys::linux::random_data(PROXMOX_TAPE_BLOCK_SIZE)?;
2e7014e3
DM
327 writer.write_all(&data)?;
328 // but do not call finish here
329 }
0db57124 330
2e7014e3 331 let reader = &mut &tape_data[..];
0db57124 332 let reader = EmulateTapeReader::new(reader);
ce5327ba 333 let mut reader = BlockedReader::open(reader)?;
2e7014e3
DM
334
335 let mut data = Vec::with_capacity(PROXMOX_TAPE_BLOCK_SIZE);
336 assert!(reader.read_to_end(&mut data).is_err());
337
338 Ok(())
339 }
340
341 #[test]
342 fn small_read_buffer() -> Result<(), Error> {
343 let mut tape_data = Vec::new();
344
0db57124
DM
345 {
346 let writer = EmulateTapeWriter::new(&mut tape_data, 1024*1024);
347 let mut writer = BlockedWriter::new(writer);
2e7014e3 348
0db57124 349 writer.write_all(b"ABC")?;
2e7014e3 350
0db57124
DM
351 writer.finish(false)?;
352 }
2e7014e3
DM
353
354 let reader = &mut &tape_data[..];
0db57124 355 let reader = EmulateTapeReader::new(reader);
ce5327ba 356 let mut reader = BlockedReader::open(reader)?;
2e7014e3
DM
357
358 let mut buf = [0u8; 1];
359 assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count");
360 assert_eq!(&buf, b"A");
361 assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count");
362 assert_eq!(&buf, b"B");
363 assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count");
364 assert_eq!(&buf, b"C");
365 assert_eq!(reader.read(&mut buf)?, 0, "wrong byte count");
366 assert_eq!(reader.read(&mut buf)?, 0, "wrong byte count");
367
368 Ok(())
369 }
370}