]>
Commit | Line | Data |
---|---|---|
2e7014e3 DM |
1 | use std::io::Read; |
2 | ||
048b43af | 3 | use crate::{ |
2e7014e3 | 4 | TapeRead, |
0db57124 | 5 | BlockRead, |
318b3106 | 6 | BlockReadError, |
048b43af DM |
7 | PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0, |
8 | BlockHeader, | |
9 | BlockHeaderFlags, | |
2e7014e3 DM |
10 | }; |
11 | ||
/// Read a block stream generated by 'BlockedWriter'.
///
/// This class implements 'TapeRead'. It always reads whole blocks from
/// the underlying reader, and does additional error checks:
///
/// - check magic number (detect streams not written by 'BlockedWriter')
/// - check block size
/// - check block sequence numbers
///
/// The reader consumes the EOF mark after the data stream (if read to
/// the end of the stream).
pub struct BlockedReader<R> {
    /// Underlying source that delivers whole tape blocks.
    reader: R,
    /// The most recently read block (header fields and payload).
    buffer: Box<BlockHeader>,
    /// Sequence number the next block read from `reader` must carry.
    seq_nr: u32,
    /// Set once a block flagged `END_OF_STREAM` has been seen.
    found_end_marker: bool,
    /// `INCOMPLETE` flag taken from the block that carried the end marker.
    incomplete: bool,
    /// Set once the EOF mark was consumed or the reader reported end of file.
    got_eod: bool,
    /// Set when refilling the buffer failed; later `read` calls are rejected.
    read_error: bool,
    /// Offset into `buffer.payload` of the next byte handed out by `read`.
    read_pos: usize,
}
33 | ||
0db57124 | 34 | impl <R: BlockRead> BlockedReader<R> { |
2e7014e3 DM |
35 | |
36 | /// Create a new BlockedReader instance. | |
37 | /// | |
318b3106 DM |
38 | /// This tries to read the first block. Please inspect the error |
39 | /// to detect EOF and EOT. | |
40 | pub fn open(mut reader: R) -> Result<Self, BlockReadError> { | |
2e7014e3 DM |
41 | |
42 | let mut buffer = BlockHeader::new(); | |
43 | ||
318b3106 | 44 | Self::read_block_frame(&mut buffer, &mut reader)?; |
2e7014e3 DM |
45 | |
46 | let (_size, found_end_marker) = Self::check_buffer(&buffer, 0)?; | |
47 | ||
48 | let mut incomplete = false; | |
7d2c156e DM |
49 | let mut got_eod = false; |
50 | ||
2e7014e3 DM |
51 | if found_end_marker { |
52 | incomplete = buffer.flags.contains(BlockHeaderFlags::INCOMPLETE); | |
7d2c156e DM |
53 | Self::consume_eof_marker(&mut reader)?; |
54 | got_eod = true; | |
2e7014e3 | 55 | } |
7d2c156e | 56 | |
318b3106 | 57 | Ok(Self { |
2e7014e3 DM |
58 | reader, |
59 | buffer, | |
60 | found_end_marker, | |
61 | incomplete, | |
7d2c156e | 62 | got_eod, |
2e7014e3 | 63 | seq_nr: 1, |
2e7014e3 DM |
64 | read_error: false, |
65 | read_pos: 0, | |
318b3106 | 66 | }) |
2e7014e3 DM |
67 | } |
68 | ||
69 | fn check_buffer(buffer: &BlockHeader, seq_nr: u32) -> Result<(usize, bool), std::io::Error> { | |
70 | ||
71 | if buffer.magic != PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0 { | |
25877d05 | 72 | proxmox_sys::io_bail!("detected tape block with wrong magic number - not written by proxmox tape"); |
2e7014e3 DM |
73 | } |
74 | ||
75 | if seq_nr != buffer.seq_nr() { | |
25877d05 | 76 | proxmox_sys::io_bail!( |
d1d74c43 | 77 | "detected tape block with wrong sequence number ({} != {})", |
2e7014e3 DM |
78 | seq_nr, buffer.seq_nr()) |
79 | } | |
80 | ||
81 | let size = buffer.size(); | |
82 | let found_end_marker = buffer.flags.contains(BlockHeaderFlags::END_OF_STREAM); | |
83 | ||
84 | if size > buffer.payload.len() { | |
25877d05 | 85 | proxmox_sys::io_bail!("detected tape block with wrong payload size ({} > {}", size, buffer.payload.len()); |
6334bdc1 | 86 | } else if size == 0 && !found_end_marker { |
25877d05 | 87 | proxmox_sys::io_bail!("detected tape block with zero payload size"); |
2e7014e3 DM |
88 | } |
89 | ||
90 | ||
91 | Ok((size, found_end_marker)) | |
92 | } | |
93 | ||
318b3106 | 94 | fn read_block_frame(buffer: &mut BlockHeader, reader: &mut R) -> Result<(), BlockReadError> { |
2e7014e3 DM |
95 | |
96 | let data = unsafe { | |
97 | std::slice::from_raw_parts_mut( | |
98 | (buffer as *mut BlockHeader) as *mut u8, | |
99 | BlockHeader::SIZE, | |
100 | ) | |
101 | }; | |
102 | ||
318b3106 DM |
103 | let bytes = reader.read_block(data)?; |
104 | ||
105 | if bytes != BlockHeader::SIZE { | |
25877d05 | 106 | return Err(proxmox_sys::io_format_err!("got wrong block size").into()); |
0db57124 | 107 | } |
318b3106 DM |
108 | |
109 | Ok(()) | |
2e7014e3 DM |
110 | } |
111 | ||
7d2c156e DM |
112 | fn consume_eof_marker(reader: &mut R) -> Result<(), std::io::Error> { |
113 | let mut tmp_buf = [0u8; 512]; // use a small buffer for testing EOF | |
0db57124 | 114 | match reader.read_block(&mut tmp_buf) { |
318b3106 | 115 | Ok(_) => { |
25877d05 | 116 | proxmox_sys::io_bail!("detected tape block after block-stream end marker"); |
0db57124 | 117 | } |
318b3106 | 118 | Err(BlockReadError::EndOfFile) => { |
0db57124 DM |
119 | return Ok(()); |
120 | } | |
318b3106 | 121 | Err(BlockReadError::EndOfStream) => { |
25877d05 | 122 | proxmox_sys::io_bail!("got unexpected end of tape"); |
0db57124 | 123 | } |
318b3106 | 124 | Err(BlockReadError::Error(err)) => { |
0db57124 DM |
125 | return Err(err); |
126 | } | |
7d2c156e | 127 | } |
7d2c156e DM |
128 | } |
129 | ||
90461b76 | 130 | fn read_block(&mut self, check_end_marker: bool) -> Result<usize, std::io::Error> { |
2e7014e3 | 131 | |
318b3106 DM |
132 | match Self::read_block_frame(&mut self.buffer, &mut self.reader) { |
133 | Ok(()) => { /* ok */ } | |
134 | Err(BlockReadError::EndOfFile) => { | |
135 | self.got_eod = true; | |
136 | self.read_pos = self.buffer.payload.len(); | |
90461b76 | 137 | if !self.found_end_marker && check_end_marker { |
25877d05 | 138 | proxmox_sys::io_bail!("detected tape stream without end marker"); |
318b3106 DM |
139 | } |
140 | return Ok(0); // EOD | |
141 | } | |
142 | Err(BlockReadError::EndOfStream) => { | |
25877d05 | 143 | proxmox_sys::io_bail!("got unexpected end of tape"); |
318b3106 DM |
144 | } |
145 | Err(BlockReadError::Error(err)) => { | |
146 | return Err(err); | |
2e7014e3 | 147 | } |
2e7014e3 DM |
148 | } |
149 | ||
150 | let (size, found_end_marker) = Self::check_buffer(&self.buffer, self.seq_nr)?; | |
151 | self.seq_nr += 1; | |
152 | ||
153 | if found_end_marker { // consume EOF mark | |
154 | self.found_end_marker = true; | |
155 | self.incomplete = self.buffer.flags.contains(BlockHeaderFlags::INCOMPLETE); | |
7d2c156e DM |
156 | Self::consume_eof_marker(&mut self.reader)?; |
157 | self.got_eod = true; | |
2e7014e3 DM |
158 | } |
159 | ||
160 | self.read_pos = 0; | |
161 | ||
162 | Ok(size) | |
163 | } | |
164 | } | |
165 | ||
0db57124 | 166 | impl <R: BlockRead> TapeRead for BlockedReader<R> { |
2e7014e3 DM |
167 | |
168 | fn is_incomplete(&self) -> Result<bool, std::io::Error> { | |
169 | if !self.got_eod { | |
25877d05 | 170 | proxmox_sys::io_bail!("is_incomplete failed: EOD not reached"); |
2e7014e3 DM |
171 | } |
172 | if !self.found_end_marker { | |
25877d05 | 173 | proxmox_sys::io_bail!("is_incomplete failed: no end marker found"); |
2e7014e3 DM |
174 | } |
175 | ||
176 | Ok(self.incomplete) | |
177 | } | |
178 | ||
179 | fn has_end_marker(&self) -> Result<bool, std::io::Error> { | |
180 | if !self.got_eod { | |
25877d05 | 181 | proxmox_sys::io_bail!("has_end_marker failed: EOD not reached"); |
2e7014e3 DM |
182 | } |
183 | ||
184 | Ok(self.found_end_marker) | |
185 | } | |
90461b76 DM |
186 | |
187 | // like ReadExt::skip_to_end(), but does not raise an error if the | |
188 | // stream has no end marker. | |
189 | fn skip_data(&mut self) -> Result<usize, std::io::Error> { | |
190 | let mut bytes = 0; | |
191 | let buffer_size = self.buffer.size(); | |
192 | let rest = (buffer_size as isize) - (self.read_pos as isize); | |
193 | if rest > 0 { | |
194 | bytes = rest as usize; | |
195 | } | |
196 | loop { | |
197 | if self.got_eod { | |
198 | return Ok(bytes); | |
199 | } | |
200 | bytes += self.read_block(false)?; | |
201 | } | |
202 | } | |
2e7014e3 DM |
203 | } |
204 | ||
0db57124 | 205 | impl <R: BlockRead> Read for BlockedReader<R> { |
2e7014e3 DM |
206 | |
207 | fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> { | |
208 | ||
209 | if self.read_error { | |
25877d05 | 210 | proxmox_sys::io_bail!("detected read after error - internal error"); |
2e7014e3 DM |
211 | } |
212 | ||
213 | let mut buffer_size = self.buffer.size(); | |
214 | let mut rest = (buffer_size as isize) - (self.read_pos as isize); | |
215 | ||
216 | if rest <= 0 && !self.got_eod { // try to refill buffer | |
90461b76 | 217 | buffer_size = match self.read_block(true) { |
2e7014e3 DM |
218 | Ok(len) => len, |
219 | err => { | |
220 | self.read_error = true; | |
221 | return err; | |
222 | } | |
223 | }; | |
224 | rest = buffer_size as isize; | |
225 | } | |
226 | ||
227 | if rest <= 0 { | |
38556bf6 | 228 | Ok(0) |
2e7014e3 DM |
229 | } else { |
230 | let copy_len = if (buffer.len() as isize) < rest { | |
231 | buffer.len() | |
232 | } else { | |
233 | rest as usize | |
234 | }; | |
235 | buffer[..copy_len].copy_from_slice( | |
236 | &self.buffer.payload[self.read_pos..(self.read_pos + copy_len)]); | |
237 | self.read_pos += copy_len; | |
38556bf6 | 238 | Ok(copy_len) |
2e7014e3 DM |
239 | } |
240 | } | |
241 | } | |
242 | ||
#[cfg(test)]
mod test {
    use std::io::Read;
    use anyhow::{bail, Error};
    use crate::{
        TapeWrite,
        BlockReadError,
        EmulateTapeReader,
        EmulateTapeWriter,
        PROXMOX_TAPE_BLOCK_SIZE,
        BlockedReader,
        BlockedWriter,
    };

    /// Write `data` as a finished block stream to an emulated tape, read it
    /// back through `BlockedReader` and check that the data round-trips.
    fn write_and_verify(data: &[u8]) -> Result<(), Error> {
        let mut tape_data = Vec::new();

        {
            let tape = EmulateTapeWriter::new(&mut tape_data, 1024*1024*10);
            let mut writer = BlockedWriter::new(tape);
            writer.write_all(data)?;
            writer.finish(false)?;
        }

        // The tape must hold a whole number of blocks.
        let expected_len = (data.len() / PROXMOX_TAPE_BLOCK_SIZE + 1) * PROXMOX_TAPE_BLOCK_SIZE;
        assert_eq!(tape_data.len(), expected_len);

        let mut tape_slice = &tape_data[..];
        let tape = EmulateTapeReader::new(&mut tape_slice);
        let mut reader = BlockedReader::open(tape)?;

        let mut read_data = Vec::with_capacity(PROXMOX_TAPE_BLOCK_SIZE);
        reader.read_to_end(&mut read_data)?;

        assert_eq!(data.len(), read_data.len());
        assert_eq!(data, &read_data[..]);

        Ok(())
    }

    #[test]
    fn empty_stream() -> Result<(), Error> {
        write_and_verify(b"")
    }

    #[test]
    fn small_data() -> Result<(), Error> {
        write_and_verify(b"ABC")
    }

    #[test]
    fn large_data() -> Result<(), Error> {
        let data = proxmox_sys::linux::random_data(1024*1024*5)?;
        write_and_verify(&data)
    }

    /// Opening a reader on an empty tape must report end of file.
    #[test]
    fn no_data() -> Result<(), Error> {
        let tape_data = Vec::new();
        let mut tape_slice = &tape_data[..];
        let tape = EmulateTapeReader::new(&mut tape_slice);

        if !matches!(BlockedReader::open(tape), Err(BlockReadError::EndOfFile)) {
            bail!("expected EOF");
        }

        Ok(())
    }

    /// Reading a stream that was never finished must fail.
    #[test]
    fn no_end_marker() -> Result<(), Error> {
        let mut tape_data = Vec::new();
        {
            let tape = EmulateTapeWriter::new(&mut tape_data, 1024*1024);
            let mut writer = BlockedWriter::new(tape);
            // write at least one block
            let payload = proxmox_sys::linux::random_data(PROXMOX_TAPE_BLOCK_SIZE)?;
            writer.write_all(&payload)?;
            // but do not call finish here
        }

        let mut tape_slice = &tape_data[..];
        let tape = EmulateTapeReader::new(&mut tape_slice);
        let mut reader = BlockedReader::open(tape)?;

        let mut read_back = Vec::with_capacity(PROXMOX_TAPE_BLOCK_SIZE);
        assert!(reader.read_to_end(&mut read_back).is_err());

        Ok(())
    }

    /// Reading byte by byte must yield the data and then keep returning 0.
    #[test]
    fn small_read_buffer() -> Result<(), Error> {
        let mut tape_data = Vec::new();

        {
            let tape = EmulateTapeWriter::new(&mut tape_data, 1024*1024);
            let mut writer = BlockedWriter::new(tape);
            writer.write_all(b"ABC")?;
            writer.finish(false)?;
        }

        let mut tape_slice = &tape_data[..];
        let tape = EmulateTapeReader::new(&mut tape_slice);
        let mut reader = BlockedReader::open(tape)?;

        let mut buf = [0u8; 1];
        for &expected in b"ABC" {
            assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count");
            assert_eq!(buf, [expected]);
        }
        // Past the end, every further read reports zero bytes.
        for _ in 0..2 {
            assert_eq!(reader.read(&mut buf)?, 0, "wrong byte count");
        }

        Ok(())
    }
}