2 This module provides a `std::io::Write` implementation:
4 - `write::FrameEncoder` wraps another `std::io::Write` implementation, and
5 compresses the data written to it using the Snappy frame format. Use this if
6 you have an uncompressed data source and wish to write it as compressed data.
8 It would also be possible to provide a `write::FrameDecoder`, which decompresses
9 data as it writes it, but it hasn't been implemented yet.
13 use std
::io
::{self, Write}
;
15 use crate::compress
::Encoder
;
16 use crate::crc32
::CheckSummer
;
17 pub use crate::error
::IntoInnerError
;
19 compress_frame
, CHUNK_HEADER_AND_CRC_SIZE
, MAX_COMPRESS_BLOCK_SIZE
,
22 use crate::MAX_BLOCK_SIZE
;
24 /// A writer for compressing a Snappy stream.
26 /// This `FrameEncoder` wraps any other writer that implements `io::Write`.
27 /// Bytes written to this writer are compressed using the [Snappy frame
28 /// format](https://github.com/google/snappy/blob/master/framing_format.txt)
29 /// (file extension `sz`, MIME type `application/x-snappy-framed`).
31 /// Writes are buffered automatically, so there's no need to wrap the given
32 /// writer in a `std::io::BufWriter`.
34 /// The writer will be flushed automatically when it is dropped. If an error
35 /// occurs, it is ignored.
36 pub struct FrameEncoder
<W
: io
::Write
> {
37 /// Our main internal state, split out for borrowck reasons (happily paid).
39 /// Also, it's an `Option` so we can move out of it even though
40 /// `FrameEncoder` impls `Drop`.
41 inner
: Option
<Inner
<W
>>,
42 /// Our buffer of uncompressed bytes. This isn't part of `inner` because
43 /// we may write bytes directly from the caller if the given buffer was
44 /// big enough. As a result, the main `write` implementation needs to
45 /// accept either the internal buffer or the caller's bytes directly. Since
46 /// `write` requires a mutable borrow, we satisfy the borrow checker by
47 /// separating `src` from the rest of the state.
52 /// The underlying writer.
54 /// An encoder that we reuse that does the actual block based compression.
56 /// A CRC32 checksummer that is configured to either use the portable
57 /// fallback version or the SSE4.2 accelerated version when the right CPU
58 /// features are available.
59 checksummer
: CheckSummer
,
60 /// The compressed bytes buffer. Bytes are compressed from src (usually)
61 /// to dst before being written to w.
63 /// When false, the stream identifier (with magic bytes) must precede the
65 wrote_stream_ident
: bool
,
66 /// Space for writing the header of a chunk before writing it to the
67 /// underlying writer.
68 chunk_header
: [u8; 8],
71 impl<W
: io
::Write
> FrameEncoder
<W
> {
72 /// Create a new writer for streaming Snappy compression.
73 pub fn new(wtr
: W
) -> FrameEncoder
<W
> {
78 checksummer
: CheckSummer
::new(),
79 dst
: vec
![0; MAX_COMPRESS_BLOCK_SIZE
],
80 wrote_stream_ident
: false,
81 chunk_header
: [0; CHUNK_HEADER_AND_CRC_SIZE
],
83 src
: Vec
::with_capacity(MAX_BLOCK_SIZE
),
87 /// Returns the underlying stream, consuming and flushing this writer.
89 /// If flushing the writer caused an error, then an `IntoInnerError` is
90 /// returned, which contains both the writer and the original writer.
91 pub fn into_inner(mut self) -> Result
<W
, IntoInnerError
<FrameEncoder
<W
>>> {
93 Ok(()) => Ok(self.inner
.take().unwrap().w
),
94 Err(err
) => Err(IntoInnerError
::new(self, err
)),
98 /// Gets a reference to the underlying writer in this encoder.
99 pub fn get_ref(&self) -> &W
{
100 &self.inner
.as_ref().unwrap().w
103 /// Gets a reference to the underlying writer in this encoder.
105 /// Note that mutating the output/input state of the stream may corrupt
106 /// this encoder, so care must be taken when using this method.
107 pub fn get_mut(&mut self) -> &mut W
{
108 &mut self.inner
.as_mut().unwrap().w
112 impl<W
: io
::Write
> Drop
for FrameEncoder
<W
> {
114 if self.inner
.is_some() {
115 // Ignore errors because we can't conceivably return an error and
116 // panicing in a dtor is bad juju.
117 let _
= self.flush();
122 impl<W
: io
::Write
> io
::Write
for FrameEncoder
<W
> {
123 fn write(&mut self, mut buf
: &[u8]) -> io
::Result
<usize> {
125 // If there isn't enough room to add buf to src, then add only a piece
126 // of it, flush it and mush on.
128 let free
= self.src
.capacity() - self.src
.len();
129 // n is the number of bytes extracted from buf.
130 let n
= if buf
.len() <= free
{
132 } else if self.src
.is_empty() {
133 // If buf is bigger than our entire buffer then avoid
134 // the indirection and write the buffer directly.
135 self.inner
.as_mut().unwrap().write(buf
)?
137 self.src
.extend_from_slice(&buf
[0..free
]);
144 // We're only here if buf.len() will fit within the available space of
146 debug_assert
!(buf
.len() <= (self.src
.capacity() - self.src
.len()));
147 self.src
.extend_from_slice(buf
);
149 // We should never expand or contract self.src.
150 debug_assert
!(self.src
.capacity() == MAX_BLOCK_SIZE
);
154 fn flush(&mut self) -> io
::Result
<()> {
155 if self.src
.is_empty() {
158 self.inner
.as_mut().unwrap().write(&self.src
)?
;
159 self.src
.truncate(0);
164 impl<W
: io
::Write
> Inner
<W
> {
165 fn write(&mut self, mut buf
: &[u8]) -> io
::Result
<usize> {
167 if !self.wrote_stream_ident
{
168 self.wrote_stream_ident
= true;
169 self.w
.write_all(STREAM_IDENTIFIER
)?
;
171 while !buf
.is_empty() {
172 // Advance buf and get our block.
174 if src
.len() > MAX_BLOCK_SIZE
{
175 src
= &src
[0..MAX_BLOCK_SIZE
];
177 buf
= &buf
[src
.len()..];
179 let frame_data
= compress_frame(
183 &mut self.chunk_header
,
187 self.w
.write_all(&self.chunk_header
)?
;
188 self.w
.write_all(frame_data
)?
;
195 impl<W
: fmt
::Debug
+ io
::Write
> fmt
::Debug
for FrameEncoder
<W
> {
196 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
197 f
.debug_struct("FrameEncoder")
198 .field("inner", &self.inner
)
199 .field("src", &"[...]")
204 impl<W
: fmt
::Debug
+ io
::Write
> fmt
::Debug
for Inner
<W
> {
205 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
206 f
.debug_struct("Inner")
208 .field("enc", &self.enc
)
209 .field("checksummer", &self.checksummer
)
210 .field("dst", &"[...]")
211 .field("wrote_stream_ident", &self.wrote_stream_ident
)
212 .field("chunk_header", &self.chunk_header
)