2 This module provides two `std::io::Read` implementations:
4 * [`read::FrameDecoder`](struct.FrameDecoder.html)
5 wraps another `std::io::Read` implementation, and decompresses data encoded
6 using the Snappy frame format. Use this if you have a compressed data source
7 and wish to read it as uncompressed data.
8 * [`read::FrameEncoder`](struct.FrameEncoder.html)
9 wraps another `std::io::Read` implementation, and compresses data, encoding it
10 using the Snappy frame format. Use this if you have an uncompressed data source
11 and wish to read it as compressed data.
13 Typically, `read::FrameDecoder` is the version that you'll want.
21 use crate::compress
::Encoder
;
22 use crate::crc32
::CheckSummer
;
23 use crate::decompress
::{decompress_len, Decoder}
;
24 use crate::error
::Error
;
26 compress_frame
, ChunkType
, CHUNK_HEADER_AND_CRC_SIZE
,
27 MAX_COMPRESS_BLOCK_SIZE
, STREAM_BODY
, STREAM_IDENTIFIER
,
29 use crate::MAX_BLOCK_SIZE
;
31 /// The maximum size of a compressed block, including the header and stream
32 /// identifier, that can be emitted by `FrameEncoder`.
///
/// This is the sum of the stream identifier length, the chunk header and CRC
/// size, and the maximum compressed block size.
33 const MAX_READ_FRAME_ENCODER_BLOCK_SIZE
: usize = STREAM_IDENTIFIER
.len()
34 + CHUNK_HEADER_AND_CRC_SIZE
35 + MAX_COMPRESS_BLOCK_SIZE
;
37 /// A reader for decompressing a Snappy stream.
39 /// This `FrameDecoder` wraps any other reader that implements `std::io::Read`.
40 /// Bytes read from this reader are decompressed using the
41 /// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt)
42 /// (file extension `sz`, MIME type `application/x-snappy-framed`).
44 /// This reader can potentially make many small reads from the underlying
45 /// stream depending on its format; therefore, passing in a buffered reader
46 /// may be beneficial.
47 pub struct FrameDecoder
<R
: io
::Read
> {
48 /// The underlying reader (`r`).
50 /// A Snappy decoder (`dec`) that we reuse that does the actual block based decompression.
53 /// A CRC32 checksummer that is configured to either use the portable
54 /// fallback version or the SSE4.2 accelerated version when the right CPU
55 /// features are available.
56 checksummer
: CheckSummer
,
57 /// The compressed bytes buffer (`src`), taken from the underlying reader. Allocated with `MAX_COMPRESS_BLOCK_SIZE` bytes in `new`.
59 /// The decompressed bytes buffer (`dst`). Bytes are decompressed from src to dst
60 /// before being passed back to the caller. Allocated with `MAX_BLOCK_SIZE` bytes in `new`.
62 /// Index into dst (`dsts`): starting point of bytes not yet given back to caller.
64 /// Index into dst (`dste`): ending point of bytes not yet given back to caller.
66 /// Whether we've read the special stream header or not.
67 read_stream_ident
: bool
,
70 impl<R
: io
::Read
> FrameDecoder
<R
> {
71 /// Create a new reader for streaming Snappy decompression.
///
/// `rdr` is the reader that compressed bytes are pulled from. The `src`
/// buffer is allocated with `MAX_COMPRESS_BLOCK_SIZE` zeroed bytes and the
/// `dst` buffer with `MAX_BLOCK_SIZE` zeroed bytes; the stream identifier
/// is marked as not yet read.
72 pub fn new(rdr
: R
) -> FrameDecoder
<R
> {
76 checksummer
: CheckSummer
::new(),
77 src
: vec
![0; MAX_COMPRESS_BLOCK_SIZE
],
78 dst
: vec
![0; MAX_BLOCK_SIZE
],
81 read_stream_ident
: false,
85 /// Gets a shared reference to the underlying reader in this decoder.
86 pub fn get_ref(&self) -> &R
{
90 /// Gets a mutable reference to the underlying reader in this decoder.
92 /// Note that mutating the stream may lead to surprising results if
93 /// this decoder continues to be used afterwards.
94 pub fn get_mut(&mut self) -> &mut R
{
99 impl<R
: io
::Read
> io
::Read
for FrameDecoder
<R
> {
/// Reads decompressed bytes into `buf`.
///
/// If `dst` still holds bytes that have not been handed out
/// (`dsts < dste`), at most `buf.len()` of them are copied into `buf`.
/// Otherwise a 4-byte chunk header is read from the underlying reader and
/// the chunk is handled according to its type. Snappy errors are converted
/// into `io::Error` values.
100 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
// Helper macro body: convert the given error into an `io::Error` and
// return it from `read`.
103 return Err(io
::Error
::from($err
));
// Serve bytes that are already decompressed before reading more input.
107 if self.dsts
< self.dste
{
// Copy no more than what is buffered and no more than `buf` can hold.
108 let len
= cmp
::min(self.dste
- self.dsts
, buf
.len());
109 let dste
= self.dsts
.checked_add(len
).unwrap();
110 buf
[0..len
].copy_from_slice(&self.dst
[self.dsts
..dste
]);
// Read the 4-byte chunk header. `read_exact_eof` reports `false`
// instead of an error when the reader hits an unexpected EOF.
114 if !read_exact_eof(&mut self.r
, &mut self.src
[0..4])?
{
// The first header byte identifies the chunk type.
117 let ty
= ChunkType
::from_u8(self.src
[0]);
// Until a stream identifier has been seen, any other chunk type is an
// error.
118 if !self.read_stream_ident
{
119 if ty
!= Ok(ChunkType
::Stream
) {
120 fail
!(Error
::StreamHeader { byte: self.src[0] }
);
122 self.read_stream_ident
= true;
// The remaining three header bytes hold the chunk length as a
// little-endian 24-bit integer.
124 let len64
= bytes
::read_u24_le(&self.src
[1..]) as u64;
// Reject chunks that are longer than the `src` buffer.
125 if len64
> self.src
.len() as u64 {
126 fail
!(Error
::UnsupportedChunkLength
{
131 let len
= len64
as usize;
133 Err(b
) if 0x02 <= b
&& b
<= 0x7F => {
134 // Spec says that chunk types 0x02-0x7F are reserved and
135 // conformant decoders must return an error.
136 fail
!(Error
::UnsupportedChunkType { byte: b }
);
138 Err(b
) if 0x80 <= b
&& b
<= 0xFD => {
139 // Spec says that chunk types 0x80-0xFD are reserved but skippable, so read the payload and ignore it.
141 self.r
.read_exact(&mut self.src
[0..len
])?
;
144 // Can never happen. 0x02-0x7F and 0x80-0xFD are handled
145 // above in the error case. That leaves 0x00, 0x01, 0xFE
146 // and 0xFF, each of which correspond to one of the four
147 // defined chunk types.
148 unreachable
!("BUG: unhandled chunk type: {}", b
);
150 Ok(ChunkType
::Padding
) => {
151 // Just read and move on.
152 self.r
.read_exact(&mut self.src
[0..len
])?
;
154 Ok(ChunkType
::Stream
) => {
// A stream identifier chunk must be exactly as long as `STREAM_BODY`
// and its payload must match `STREAM_BODY` byte for byte.
155 if len
!= STREAM_BODY
.len() {
156 fail
!(Error
::UnsupportedChunkLength
{
161 self.r
.read_exact(&mut self.src
[0..len
])?
;
162 if &self.src
[0..len
] != STREAM_BODY
{
163 fail
!(Error
::StreamHeaderMismatch
{
164 bytes
: self.src
[0..len
].to_vec(),
168 Ok(ChunkType
::Uncompressed
) => {
170 fail
!(Error
::UnsupportedChunkLength
{
// The payload is preceded by a little-endian u32 checksum.
175 let expected_sum
= bytes
::io_read_u32_le(&mut self.r
)?
;
// The payload is read straight into `dst`, so it must fit there.
177 if n
> self.dst
.len() {
178 fail
!(Error
::UnsupportedChunkLength
{
183 self.r
.read_exact(&mut self.dst
[0..n
])?
;
// Verify the stored checksum against the masked CRC32C of the data.
185 self.checksummer
.crc32c_masked(&self.dst
[0..n
]);
186 if expected_sum
!= got_sum
{
187 fail
!(Error
::Checksum
{
188 expected
: expected_sum
,
195 Ok(ChunkType
::Compressed
) => {
197 fail
!(Error
::UnsupportedChunkLength
{
// The payload is preceded by a little-endian u32 checksum.
202 let expected_sum
= bytes
::io_read_u32_le(&mut self.r
)?
;
// The compressed payload must fit in `src`.
204 if sn
> self.src
.len() {
205 fail
!(Error
::UnsupportedChunkLength
{
210 self.r
.read_exact(&mut self.src
[0..sn
])?
;
// The decompressed length must fit in `dst` before we decompress.
211 let dn
= decompress_len(&self.src
)?
;
212 if dn
> self.dst
.len() {
213 fail
!(Error
::UnsupportedChunkLength
{
// Decompress from `src[0..sn]` into `dst[0..dn]`.
219 .decompress(&self.src
[0..sn
], &mut self.dst
[0..dn
])?
;
// Verify the stored checksum against the masked CRC32C of the
// decompressed data.
221 self.checksummer
.crc32c_masked(&self.dst
[0..dn
]);
222 if expected_sum
!= got_sum
{
223 fail
!(Error
::Checksum
{
224 expected
: expected_sum
,
236 impl<R
: fmt
::Debug
+ io
::Read
> fmt
::Debug
for FrameDecoder
<R
> {
/// Formats the decoder for debugging. The `src` and `dst` buffers are
/// rendered as the placeholder `[...]` rather than dumping their contents.
237 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
238 f
.debug_struct("FrameDecoder")
240 .field("dec", &self.dec
)
241 .field("checksummer", &self.checksummer
)
242 .field("src", &"[...]")
243 .field("dst", &"[...]")
244 .field("dsts", &self.dsts
)
245 .field("dste", &self.dste
)
246 .field("read_stream_ident", &self.read_stream_ident
)
251 /// A reader for compressing data using Snappy as it is read.
253 /// This `FrameEncoder` wraps any other reader that implements `std::io::Read`.
254 /// Bytes read from this reader are compressed using the
255 /// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt)
256 /// (file extension `sz`, MIME type `application/x-snappy-framed`).
258 /// Usually you'll want
259 /// [`read::FrameDecoder`](struct.FrameDecoder.html)
260 /// (for decompressing while reading) or
261 /// [`write::FrameEncoder`](../write/struct.FrameEncoder.html)
262 /// (for compressing while writing) instead.
264 /// Unlike `FrameDecoder`, this will attempt to make large reads roughly
265 /// equivalent to the size of a single Snappy block. Therefore, callers may not
266 /// benefit from using a buffered reader.
267 pub struct FrameEncoder
<R
: io
::Read
> {
268 /// Internally, we split `FrameEncoder` in two to keep the borrow checker
269 /// happy. The `inner` member contains everything that `read_frame` needs
270 /// to fetch a frame's worth of data and compress it.
272 /// Data (`dst`) that we've encoded and are ready to return to our caller. Allocated with `MAX_READ_FRAME_ENCODER_BLOCK_SIZE` bytes in `new`.
274 /// Starting point (`dsts`) of bytes in `dst` not yet given back to the caller.
276 /// Ending point (`dste`) of bytes in `dst` that we want to give to our caller.
/// The part of `FrameEncoder` that `read_frame` operates on: the underlying
/// reader, the encoder, the checksummer, the uncompressed input buffer and
/// the flag recording whether the stream identifier has been written.
280 struct Inner
<R
: io
::Read
> {
281 /// The underlying data source (`r`).
283 /// An encoder (`enc`) that we reuse that does the actual block based compression.
285 /// A CRC32 checksummer that is configured to either use the portable
286 /// fallback version or the SSE4.2 accelerated version when the right CPU
287 /// features are available.
288 checksummer
: CheckSummer
,
289 /// Data (`src`) taken from the underlying `r`, and not yet compressed. Allocated with `MAX_BLOCK_SIZE` bytes in `FrameEncoder::new`.
291 /// Have we written the standard Snappy header to `dst` yet?
292 wrote_stream_ident
: bool
,
295 impl<R
: io
::Read
> FrameEncoder
<R
> {
296 /// Create a new reader for streaming Snappy compression.
///
/// `rdr` is the reader that uncompressed bytes are pulled from. The input
/// buffer `src` is allocated with `MAX_BLOCK_SIZE` zeroed bytes and the
/// output buffer `dst` with `MAX_READ_FRAME_ENCODER_BLOCK_SIZE` zeroed
/// bytes; the stream identifier is marked as not yet written.
297 pub fn new(rdr
: R
) -> FrameEncoder
<R
> {
302 checksummer
: CheckSummer
::new(),
303 src
: vec
![0; MAX_BLOCK_SIZE
],
304 wrote_stream_ident
: false,
306 dst
: vec
![0; MAX_READ_FRAME_ENCODER_BLOCK_SIZE
],
312 /// Gets a reference to the underlying reader in this encoder.
313 pub fn get_ref(&self) -> &R
{
317 /// Gets a mutable reference to the underlying reader in this encoder.
319 /// Note that mutating the stream may lead to surprising results if
320 /// this encoder continues to be used afterwards.
321 pub fn get_mut(&mut self) -> &mut R
{
325 /// Read previously compressed data from `self.dst`, returning the number of
326 /// bytes read. If `self.dst` is empty, returns 0.
///
/// At most `buf.len()` bytes are copied, starting at `self.dsts`.
327 fn read_from_dst(&mut self, buf
: &mut [u8]) -> usize {
// Bytes buffered in `dst` that have not been handed out yet.
328 let available_bytes
= self.dste
- self.dsts
;
// Copy no more than what is buffered and no more than `buf` can hold.
329 let count
= cmp
::min(available_bytes
, buf
.len());
330 buf
[..count
].copy_from_slice(&self.dst
[self.dsts
..self.dsts
+ count
]);
336 impl<R
: io
::Read
> io
::Read
for FrameEncoder
<R
> {
/// Reads compressed bytes into `buf`.
///
/// Bytes already buffered in `dst` are served first. Otherwise a new frame
/// is produced with `read_frame`: directly into `buf` when `buf` holds at
/// least `MAX_READ_FRAME_ENCODER_BLOCK_SIZE` bytes, or into `dst` followed
/// by a copy into `buf` when it does not.
337 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
338 // Try reading previously compressed bytes from our `dst` buffer, if any.
340 let count
= self.read_from_dst(buf
);
343 // We had some bytes in our `dst` buffer that we used.
345 } else if buf
.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE
{
346 // Our output `buf` is big enough that we can directly write into
347 // it, so bypass `dst` entirely.
348 self.inner
.read_frame(buf
)
350 // We need to refill `self.dst`, and then return some bytes from it.
352 let count
= self.inner
.read_frame(&mut self.dst
)?
;
355 Ok(self.read_from_dst(buf
))
360 impl<R
: io
::Read
> Inner
<R
> {
361 /// Read from `self.r`, and create a new frame, writing it to `dst`, which
362 /// must be at least `MAX_READ_FRAME_ENCODER_BLOCK_SIZE` bytes in size.
///
/// On success, returns the number of bytes written to `dst`: the stream
/// identifier (only when it had not been written before), the chunk header
/// and the frame data. Errors from the underlying reader are propagated.
363 fn read_frame(&mut self, dst
: &mut [u8]) -> io
::Result
<usize> {
// The size requirement on `dst` is only checked in debug builds.
364 debug_assert
!(dst
.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE
);
366 // We make one read to the underlying reader. If the underlying reader
367 // doesn't fill the buffer but there are still bytes to be read, then
368 // compression won't be optimal. The alternative would be to block
369 // until our buffer is maximally full (or we see EOF), but this seems
370 // more surprising. In general, io::Read implementations should try to
371 // fill the caller's buffer as much as they can, so this seems like the better choice.
373 let nread
= self.r
.read(&mut self.src
)?
;
378 // If we haven't yet written the stream header to `dst`, write it.
// `dst_write_start` tracks how many bytes of `dst` are already used.
379 let mut dst_write_start
= 0;
380 if !self.wrote_stream_ident
{
381 dst
[0..STREAM_IDENTIFIER
.len()].copy_from_slice(STREAM_IDENTIFIER
);
382 dst_write_start
+= STREAM_IDENTIFIER
.len();
383 self.wrote_stream_ident
= true;
386 // Reserve space for our chunk header. We need to use `split_at_mut` so
387 // that we can get two mutable slices pointing at non-overlapping parts of `dst`.
389 let (chunk_header
, remaining_dst
) =
390 dst
[dst_write_start
..].split_at_mut(CHUNK_HEADER_AND_CRC_SIZE
);
391 dst_write_start
+= CHUNK_HEADER_AND_CRC_SIZE
;
393 // Compress our frame if possible, telling `compress_frame` to always
394 // put the output in `dst`.
395 let frame_data
= compress_frame(
// Total bytes written: everything before the frame data plus the frame
// data itself.
403 Ok(dst_write_start
+ frame_data
.len())
407 impl<R
: fmt
::Debug
+ io
::Read
> fmt
::Debug
for FrameEncoder
<R
> {
/// Formats the encoder for debugging. The `dst` buffer is rendered as the
/// placeholder `[...]` rather than dumping its contents.
408 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
409 f
.debug_struct("FrameEncoder")
410 .field("inner", &self.inner
)
411 .field("dst", &"[...]")
412 .field("dsts", &self.dsts
)
413 .field("dste", &self.dste
)
418 impl<R
: fmt
::Debug
+ io
::Read
> fmt
::Debug
for Inner
<R
> {
/// Formats the inner encoder state for debugging. The `src` buffer is
/// rendered as the placeholder `[...]` rather than dumping its contents.
419 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
420 f
.debug_struct("Inner")
422 .field("enc", &self.enc
)
423 .field("checksummer", &self.checksummer
)
424 .field("src", &"[...]")
425 .field("wrote_stream_ident", &self.wrote_stream_ident
)
430 // read_exact_eof is like Read::read_exact, except it converts an UnexpectedEof
431 // error to a bool of false.
433 // If no error occurred, then this returns true.
//
// Any error other than UnexpectedEof is returned to the caller unchanged.
434 fn read_exact_eof
<R
: io
::Read
>(
437 ) -> io
::Result
<bool
> {
438 use std
::io
::ErrorKind
::UnexpectedEof
;
439 match rdr
.read_exact(buf
) {
// Running out of input is reported as `false` rather than as an error.
441 Err(ref err
) if err
.kind() == UnexpectedEof
=> Ok(false),
// Every other I/O error is passed through.
442 Err(err
) => Err(err
),