7 PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0
,
12 /// Read a block stream generated by 'BlockedWriter'.
14 /// This type implements 'TapeRead'. It always reads whole blocks from
15 /// the underlying reader, and performs additional error checks:
17 /// - check magic number (detect streams not written by 'BlockedWriter')
18 /// - check block size
19 /// - check block sequence numbers
21 /// The reader consumes the EOF mark after the data stream (if read to
22 /// the end of the stream).
23 pub struct BlockedReader
<R
> {
// NOTE(review): the methods below also use `reader`, `seq_nr`, `incomplete`,
// `got_eod`, `read_error` and `read_pos`; their declarations are not visible
// in this excerpt.
// Most recently read block (header and payload); refilled by read_block_frame().
25 buffer
: Box
<BlockHeader
>,
// Becomes true once a block carrying the END_OF_STREAM flag has been seen.
27 found_end_marker
: bool
,
34 impl <R
: BlockRead
> BlockedReader
<R
> {
36 /// Create a new BlockedReader instance.
38 /// This tries to read the first block. Please inspect the error
39 /// to detect EOF and EOT.
///
/// # Errors
///
/// Fails if the first block cannot be read, or if it does not pass
/// `check_buffer` with the expected sequence number 0.
40 pub fn open(mut reader
: R
) -> Result
<Self, BlockReadError
> {
42 let mut buffer
= BlockHeader
::new();
// Read the first raw block; read errors are propagated unchanged to the caller.
44 Self::read_block_frame(&mut buffer
, &mut reader
)?
;
// Validate the header. The payload size is not needed here, only the end-marker flag.
46 let (_size
, found_end_marker
) = Self::check_buffer(&buffer
, 0)?
;
48 let mut incomplete
= false;
49 let mut got_eod
= false;
// NOTE(review): the guard around the next two statements is not visible in this
// excerpt; presumably they only run when `found_end_marker` is true - confirm.
52 incomplete
= buffer
.flags
.contains(BlockHeaderFlags
::INCOMPLETE
);
// Verify that nothing but the EOF mark follows the end-marker block.
53 Self::consume_eof_marker(&mut reader
)?
;
// Validate a block header against the expected sequence number.
//
// Returns `(payload size, end-marker flag)` on success. Fails with an
// `std::io::Error` on a wrong magic number, a sequence number mismatch, a
// payload size larger than the payload buffer, or a zero payload size on a
// block that is not the end marker.
69 fn check_buffer(buffer
: &BlockHeader
, seq_nr
: u32) -> Result
<(usize, bool
), std
::io
::Error
> {
// Reject blocks that do not carry the expected header magic.
71 if buffer
.magic
!= PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0
{
72 proxmox
::io_bail
!("detected tape block with wrong magic number - not written by proxmox tape");
// Blocks must arrive in order; `seq_nr` is the number the caller expects next.
75 if seq_nr
!= buffer
.seq_nr() {
77 "detected tape block with wrong sequence number ({} != {})",
78 seq_nr
, buffer
.seq_nr())
81 let size
= buffer
.size();
82 let found_end_marker
= buffer
.flags
.contains(BlockHeaderFlags
::END_OF_STREAM
);
// The declared payload size must fit into the payload buffer.
84 if size
> buffer
.payload
.len() {
// NOTE(review): the message below is missing its closing ')' after the second '{}'.
85 proxmox
::io_bail
!("detected tape block with wrong payload size ({} > {}", size
, buffer
.payload
.len());
// An empty payload is only accepted on the end-marker block.
86 } else if size
== 0 && !found_end_marker
{
87 proxmox
::io_bail
!("detected tape block with zero payload size");
91 Ok((size
, found_end_marker
))
// Read one raw block from `reader` directly into the memory of `buffer`.
//
// Errors from `reader.read_block()` are propagated; a block whose length
// differs from `BlockHeader::SIZE` is rejected with "got wrong block size".
94 fn read_block_frame(buffer
: &mut BlockHeader
, reader
: &mut R
) -> Result
<(), BlockReadError
> {
// View the header struct as a mutable byte slice so the reader can fill it in place.
// NOTE(review): the length argument and the enclosing `unsafe` block are not visible
// in this excerpt; presumably the length is BlockHeader::SIZE - confirm, and add a
// `// SAFETY:` comment stating why every byte pattern is valid for BlockHeader.
97 std
::slice
::from_raw_parts_mut(
98 (buffer
as *mut BlockHeader
) as *mut u8,
103 let bytes
= reader
.read_block(data
)?
;
// Only complete blocks are accepted; a short or long read counts as a format error.
105 if bytes
!= BlockHeader
::SIZE
{
106 return Err(proxmox
::io_format_err
!("got wrong block size").into());
// Check that the block stream is followed by an EOF mark by attempting one
// more block read after the end-marker block.
//
// NOTE(review): several match arm patterns and bodies are not visible in this
// excerpt; the comments below describe only what is shown.
112 fn consume_eof_marker(reader
: &mut R
) -> Result
<(), std
::io
::Error
> {
113 let mut tmp_buf
= [0u8; 512]; // use a small buffer for testing EOF
114 match reader
.read_block(&mut tmp_buf
) {
// Presumably the `Ok(_)` arm: more data after the end marker is an error - confirm.
116 proxmox
::io_bail
!("detected tape block after block-stream end marker");
// Presumably the expected outcome (EOF mark consumed); arm body not visible - confirm.
118 Err(BlockReadError
::EndOfFile
) => {
// Hitting the end of the tape instead of an EOF mark is reported as an error.
121 Err(BlockReadError
::EndOfStream
) => {
122 proxmox
::io_bail
!("got unexpected end of tape");
// Arm body not visible; presumably forwards the underlying error - confirm.
124 Err(BlockReadError
::Error(err
)) => {
// Read and validate the next block into `self.buffer`.
//
// `check_end_marker` selects whether reaching EOF without having seen an end
// marker is reported as an error ("detected tape stream without end marker").
// Returns a `usize`; NOTE(review): the final return expression is not visible in
// this excerpt - presumably the payload size of the block just read - confirm.
130 fn read_block(&mut self, check_end_marker
: bool
) -> Result
<usize, std
::io
::Error
> {
132 match Self::read_block_frame(&mut self.buffer
, &mut self.reader
) {
133 Ok(()) => { /* ok */ }
134 Err(BlockReadError
::EndOfFile
) => {
// Move the read position to the end of the payload buffer so no stale data is served.
136 self.read_pos
= self.buffer
.payload
.len();
// EOF without a preceding end marker is only an error when the caller asks for the check.
137 if !self.found_end_marker
&& check_end_marker
{
138 proxmox
::io_bail
!("detected tape stream without end marker");
142 Err(BlockReadError
::EndOfStream
) => {
143 proxmox
::io_bail
!("got unexpected end of tape");
// Arm body not visible; presumably forwards the underlying error - confirm.
145 Err(BlockReadError
::Error(err
)) => {
// Validate the freshly read block against the sequence number expected next.
150 let (size
, found_end_marker
) = Self::check_buffer(&self.buffer
, self.seq_nr
)?
;
153 if found_end_marker
{ // consume EOF mark
154 self.found_end_marker
= true;
// Remember whether the writer flagged the stream as incomplete.
155 self.incomplete
= self.buffer
.flags
.contains(BlockHeaderFlags
::INCOMPLETE
);
156 Self::consume_eof_marker(&mut self.reader
)?
;
166 impl <R
: BlockRead
> TapeRead
for BlockedReader
<R
> {
// Report the stream's INCOMPLETE status; fails unless an end marker was found.
// NOTE(review): the success return value is not visible in this excerpt;
// presumably `self.incomplete` - confirm.
168 fn is_incomplete(&self) -> Result
<bool
, std
::io
::Error
> {
// NOTE(review): the guard for this bail is not visible; presumably `!self.got_eod` - confirm.
170 proxmox
::io_bail
!("is_incomplete failed: EOD not reached");
// Without an end marker the INCOMPLETE flag was never read, so there is nothing to report.
172 if !self.found_end_marker
{
173 proxmox
::io_bail
!("is_incomplete failed: no end marker found");
// Report whether a block carrying the end-of-stream marker was seen.
179 fn has_end_marker(&self) -> Result
<bool
, std
::io
::Error
> {
// NOTE(review): the guard for this bail is not visible; presumably `!self.got_eod` - confirm.
181 proxmox
::io_bail
!("has_end_marker failed: EOD not reached");
184 Ok(self.found_end_marker
)
187 // like ReadExt::skip_to_end(), but does not raise an error if the
188 // stream has no end marker.
// Accumulates a byte count in `bytes` (returned as `usize`); the loop structure
// and the final return are not visible in this excerpt.
189 fn skip_data(&mut self) -> Result
<usize, std
::io
::Error
> {
// Unread remainder of the current block: payload size minus current read position.
191 let buffer_size
= self.buffer
.size();
192 let rest
= (buffer_size
as isize) - (self.read_pos
as isize);
// NOTE(review): guard not visible; presumably only taken when `rest` is positive,
// otherwise the `as usize` cast would wrap - confirm.
194 bytes
= rest
as usize;
// read_block(false): a missing end marker is tolerated while skipping.
200 bytes
+= self.read_block(false)?
;
205 impl <R
: BlockRead
> Read
for BlockedReader
<R
> {
// `Read::read` implementation: copies payload bytes of the current block into
// `buffer`, refilling via read_block(true) when the current block is used up and
// EOD has not been reached. Several guards and match arms are not visible in
// this excerpt.
207 fn read(&mut self, buffer
: &mut [u8]) -> Result
<usize, std
::io
::Error
> {
// NOTE(review): guard not visible; presumably `self.read_error` (set further down) - confirm.
210 proxmox
::io_bail
!("detected read after error - internal error");
// `rest` is the number of payload bytes of the current block not yet handed out.
213 let mut buffer_size
= self.buffer
.size();
214 let mut rest
= (buffer_size
as isize) - (self.read_pos
as isize);
216 if rest
<= 0 && !self.got_eod
{ // try to refill buffer
// read_block(true): a stream that ends without an end marker is treated as an error.
217 buffer_size
= match self.read_block(true) {
// Remember the failure so later reads are rejected (match arm pattern not visible).
220 self.read_error
= true;
// After a refill the whole new block is still unread.
224 rest
= buffer_size
as isize;
// NOTE(review): branch bodies not visible; presumably the smaller of buffer.len()
// and `rest` - confirm.
230 let copy_len
= if (buffer
.len() as isize) < rest
{
// Copy the next `copy_len` payload bytes to the caller and advance the read position.
235 buffer
[..copy_len
].copy_from_slice(
236 &self.buffer
.payload
[self.read_pos
..(self.read_pos
+ copy_len
)]);
237 self.read_pos
+= copy_len
;
246 use anyhow
::{bail, Error}
;
252 PROXMOX_TAPE_BLOCK_SIZE
,
// Test helper: writes `data` through a BlockedWriter onto an emulated tape, reads
// it back through a BlockedReader and asserts that the round trip is lossless.
257 fn write_and_verify(data
: &[u8]) -> Result
<(), Error
> {
259 let mut tape_data
= Vec
::new();
// The second argument (10 MiB) is passed to the emulated tape writer;
// presumably its size limit - confirm.
262 let writer
= EmulateTapeWriter
::new(&mut tape_data
, 1024*1024*10);
263 let mut writer
= BlockedWriter
::new(writer
);
265 writer
.write_all(data
)?
;
267 writer
.finish(false)?
;
// `data.len()` rounded up to the next multiple of the block size; this always adds
// at least one block, even when the length is already a multiple.
// NOTE(review): what this is compared against is not visible; presumably
// `tape_data.len()` - confirm.
272 ((data
.len() + PROXMOX_TAPE_BLOCK_SIZE
)/PROXMOX_TAPE_BLOCK_SIZE
)
273 *PROXMOX_TAPE_BLOCK_SIZE
276 let reader
= &mut &tape_data
[..];
277 let reader
= EmulateTapeReader
::new(reader
);
278 let mut reader
= BlockedReader
::open(reader
)?
;
280 let mut read_data
= Vec
::with_capacity(PROXMOX_TAPE_BLOCK_SIZE
);
281 reader
.read_to_end(&mut read_data
)?
;
// Compare lengths first so a size mismatch fails with a short message.
283 assert_eq
!(data
.len(), read_data
.len());
285 assert_eq
!(data
, &read_data
[..]);
// Round trip of an empty payload.
291 fn empty_stream() -> Result
<(), Error
> {
292 write_and_verify(b
"")
// Round trip of a three-byte payload.
296 fn small_data() -> Result
<(), Error
> {
297 write_and_verify(b
"ABC")
// Round trip of 5 MiB of random data.
301 fn large_data() -> Result
<(), Error
> {
302 let data
= proxmox
::sys
::linux
::random_data(1024*1024*5)?
;
303 write_and_verify(&data
)
// Opening a reader on a completely empty tape must report EndOfFile.
307 fn no_data() -> Result
<(), Error
> {
308 let tape_data
= Vec
::new();
309 let reader
= &mut &tape_data
[..];
310 let reader
= EmulateTapeReader
::new(reader
);
// Any outcome other than Err(EndOfFile), including success, fails the test.
311 match BlockedReader
::open(reader
) {
312 Err(BlockReadError
::EndOfFile
) => { /* OK */ }
,
313 _
=> bail
!("expected EOF"),
// A stream written without calling finish() must make read_to_end() fail.
320 fn no_end_marker() -> Result
<(), Error
> {
321 let mut tape_data
= Vec
::new();
323 let writer
= EmulateTapeWriter
::new(&mut tape_data
, 1024*1024);
324 let mut writer
= BlockedWriter
::new(writer
);
325 // write at least one block
326 let data
= proxmox
::sys
::linux
::random_data(PROXMOX_TAPE_BLOCK_SIZE
)?
;
327 writer
.write_all(&data
)?
;
328 // but do not call finish here
331 let reader
= &mut &tape_data
[..];
332 let reader
= EmulateTapeReader
::new(reader
);
// open() is expected to succeed here; only the subsequent read must fail.
333 let mut reader
= BlockedReader
::open(reader
)?
;
335 let mut data
= Vec
::with_capacity(PROXMOX_TAPE_BLOCK_SIZE
);
336 assert
!(reader
.read_to_end(&mut data
).is_err());
// Reading with a one-byte buffer must return the payload byte by byte and then
// keep returning 0.
342 fn small_read_buffer() -> Result
<(), Error
> {
343 let mut tape_data
= Vec
::new();
346 let writer
= EmulateTapeWriter
::new(&mut tape_data
, 1024*1024);
347 let mut writer
= BlockedWriter
::new(writer
);
349 writer
.write_all(b
"ABC")?
;
351 writer
.finish(false)?
;
354 let reader
= &mut &tape_data
[..];
355 let reader
= EmulateTapeReader
::new(reader
);
356 let mut reader
= BlockedReader
::open(reader
)?
;
// One-byte destination buffer forces a separate read() call per payload byte.
358 let mut buf
= [0u8; 1];
359 assert_eq
!(reader
.read(&mut buf
)?
, 1, "wrong byte count");
360 assert_eq
!(&buf
, b
"A");
361 assert_eq
!(reader
.read(&mut buf
)?
, 1, "wrong byte count");
362 assert_eq
!(&buf
, b
"B");
363 assert_eq
!(reader
.read(&mut buf
)?
, 1, "wrong byte count");
364 assert_eq
!(&buf
, b
"C");
// Reads past the end of the data must report 0 bytes, and do so repeatedly.
365 assert_eq
!(reader
.read(&mut buf
)?
, 0, "wrong byte count");
366 assert_eq
!(reader
.read(&mut buf
)?
, 0, "wrong byte count");