3 use std
::task
::{Context, Poll}
;
7 use flate2
::{Compress, Compression, FlushCompress}
;
9 use futures
::stream
::Stream
;
10 use tokio
::io
::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}
;
12 use proxmox
::io_format_err
;
13 use proxmox_io
::ByteBuffer
;
// Capacity passed to `ByteBuffer::with_capacity` for both the encoder's output
// buffer (in `with_quality`) and the temporary read buffer (in `compress`).
15 const BUFFER_SIZE
: usize = 8192;
24 #[derive(Eq, PartialEq)]
// Deflate encoder that wraps an inner value of type `T`.
//
// NOTE(review): the field list is not visible in this excerpt. The constructor
// initialises `compressor`, `buffer`, `input_buffer` and `state`, and the
// methods additionally access `self.inner`.
32 pub struct DeflateEncoder
<T
> {
40 impl<T
> DeflateEncoder
<T
> {
// Creates an encoder around `inner` with `Level::Default`; simply delegates
// to `with_quality`.
41 pub fn new(inner
: T
) -> Self {
42 Self::with_quality(inner
, Level
::Default
)
// Creates an encoder around `inner` with an explicit compression `level`.
45 pub fn with_quality(inner
: T
, level
: Level
) -> Self {
// Translate the `Level` variant into a flate2 `Compression` setting:
// `Fastest` -> `Compression::fast()`, `Best` -> `Compression::best()`,
// `Default` -> level 3, `Precise(val)` -> level `val`.
46 let level
= match level
{
47 Level
::Fastest
=> Compression
::fast(),
48 Level
::Best
=> Compression
::best(),
49 Level
::Default
=> Compression
::new(3),
50 Level
::Precise(val
) => Compression
::new(val
),
// Field initialisers of the returned value (the surrounding struct-literal
// lines are not visible in this excerpt).
// Per flate2's `Compress::new`, the `false` argument requests output without
// a zlib header.
55 compressor
: Compress
::new(level
, false),
// Output buffer that receives compressed data.
56 buffer
: ByteBuffer
::with_capacity(BUFFER_SIZE
),
// Starts empty; filled by `poll_next` with the chunk currently being encoded.
57 input_buffer
: Bytes
::new(),
// The stream state machine starts out in `Reading`.
58 state
: EncoderState
::Reading
,
// Returns the compressor's `total_in()` counter (per flate2: the number of
// input bytes the compressor has processed so far).
62 pub fn total_in(&self) -> u64 {
63 self.compressor
.total_in()
// Returns the compressor's `total_out()` counter (per flate2: the number of
// output bytes the compressor has produced so far).
66 pub fn total_out(&self) -> u64 {
67 self.compressor
.total_out()
// Consumes the encoder and returns a `T`.
// NOTE(review): the body is not visible in this excerpt; presumably it hands
// back the wrapped `inner` value — confirm.
70 pub fn into_inner(self) -> T
{
// Tail of the signature of the internal compression helper. Its `fn` line is
// not visible in this excerpt; call sites in this file invoke it as
// `self.encode(input, flush)` and destructure the result as `(read, status)`.
// NOTE(review): the final return expression is not visible; presumably the
// tuple is (`new_in`, status of the `compress` call) — confirm.
78 ) -> Result
<(usize, flate2
::Status
), io
::Error
> {
// Snapshot the compressor's running totals so the amount handled by this
// single call can be derived afterwards.
79 let old_in
= self.compressor
.total_in();
80 let old_out
= self.compressor
.total_out();
// Compress from `inbuf` into the slice returned by
// `self.buffer.get_free_mut_slice()`; errors are propagated with `?`.
83 .compress(&inbuf
[..], self.buffer
.get_free_mut_slice(), flush
)?
;
// Differences of the totals = bytes consumed / produced by this call.
84 let new_in
= (self.compressor
.total_in() - old_in
) as usize;
85 let new_out
= (self.compressor
.total_out() - old_out
) as usize;
// Tell the output buffer how many bytes were just written into its free slice.
86 self.buffer
.add_size(new_out
);
// Specialisation for an in-memory `Vec<u8>` target.
92 impl DeflateEncoder
<Vec
<u8>> {
// Reads `reader` to its end and compresses the data into `self.inner`
// with a single `compress_vec` call.
//
// `size_hint` is used both as the initial capacity of the temporary input
// vector and as the amount reserved in the output vector.
94 pub async
fn compress_vec
<R
>(&mut self, reader
: &mut R
, size_hint
: usize) -> Result
<(), Error
>
// Slurp the complete input into memory first.
98 let mut buffer
= Vec
::with_capacity(size_hint
);
99 reader
.read_to_end(&mut buffer
).await?
;
100 self.inner
.reserve(size_hint
); // should be enough since we want smaller files
// One-shot compression with `FlushCompress::Finish`.
// NOTE(review): the `Status` returned by `compress_vec` is discarded here.
// Per the flate2 docs `compress_vec` only writes into the vector's spare
// capacity, so if the compressed output were larger than the reserved space
// the result could be incomplete — verify whether that case needs handling.
101 self.compressor
.compress_vec(&buffer
[..], &mut self.inner
, FlushCompress
::Finish
)?
;
// Streaming compression into an asynchronous writer.
106 impl<T
: AsyncWrite
+ Unpin
> DeflateEncoder
<T
> {
// Reads data from `reader`, compresses it and writes the compressed bytes to
// `self.inner`.
//
// NOTE(review): the loop constructs, the `eof` bookkeeping and the closing
// lines are not visible in this excerpt; the comments below describe only
// the statements that are.
107 pub async
fn compress
<R
>(&mut self, reader
: &mut R
) -> Result
<(), Error
>
109 R
: AsyncRead
+ Unpin
,
// Temporary buffer holding input that has been read but not yet consumed by
// the compressor.
111 let mut buffer
= ByteBuffer
::with_capacity(BUFFER_SIZE
);
// Only read more input while the reader is not exhausted and the read buffer
// still has room.
114 if !eof
&& !buffer
.is_full() {
115 let read
= buffer
.read_from_async(reader
).await?
;
// Feed the buffered input to the compressor without forcing a flush, then
// drop the first `read` bytes (the first element of the tuple returned by
// `encode`) from the read buffer.
120 let (read
, _res
) = self.encode(&buffer
[..], FlushCompress
::None
)?
;
121 buffer
.consume(read
);
// Forward the contents of the encoder's output buffer to the writer.
123 self.inner
.write_all(&self.buffer
[..]).await?
;
// All input consumed and the reader is at EOF.
// NOTE(review): the branch body is not visible; presumably it leaves the
// read loop — confirm.
126 if buffer
.is_empty() && eof
{
// Finishing phase: call the compressor with empty input and
// `FlushCompress::Finish`, writing out whatever it produces.
132 let (_read
, res
) = self.encode(&[][..], FlushCompress
::Finish
)?
;
133 self.inner
.write_all(&self.buffer
[..]).await?
;
// `StreamEnd` is the status checked for completion.
// NOTE(review): the branch body is not visible; presumably it ends the
// finishing loop — confirm.
135 if res
== flate2
::Status
::StreamEnd
{
// `Stream` adapter: wraps an inner stream yielding `Result<O, io::Error>` and
// yields compressed `Bytes` chunks.
// NOTE(review): any further bounds on `O` are not visible in this excerpt.
144 impl<T
, O
> Stream
for DeflateEncoder
<T
>
146 T
: Stream
<Item
= Result
<O
, io
::Error
>> + Unpin
,
149 type Item
= Result
<Bytes
, io
::Error
>;
// Drives the encoder state machine (`Reading` -> `Writing` -> ... ->
// `Flushing` -> `Finished`).
// NOTE(review): the enclosing loop/match lines are not visible in this excerpt.
151 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<Option
<Self::Item
>> {
152 let this
= self.get_mut();
// Reading: poll the inner stream for the next chunk (`ready!` returns early
// while the inner stream is pending).
156 EncoderState
::Reading
=> {
157 if let Some(res
) = ready
!(Pin
::new(&mut this
.inner
).poll_next(cx
)) {
// A chunk arrived: store it as pending input and switch to `Writing`.
// NOTE(review): the line binding `buf` is not visible; presumably it unwraps
// `res` — confirm.
159 this
.input_buffer
= buf
.into();
160 this
.state
= EncoderState
::Writing
;
// Inner stream is exhausted: switch to `Flushing`.
162 this
.state
= EncoderState
::Flushing
;
// Writing: compress the pending input chunk.
165 EncoderState
::Writing
=> {
// An empty pending chunk in this state is reported as an error.
166 if this
.input_buffer
.is_empty() {
167 return Poll
::Ready(Some(Err(io_format_err
!("empty input during write"))));
// `split_off(0)` moves the whole pending chunk into `buf`, leaving
// `input_buffer` empty.
169 let mut buf
= this
.input_buffer
.split_off(0);
170 let (read
, res
) = this
.encode(&buf
[..], FlushCompress
::None
)?
;
// Keep everything after the first `read` bytes as the still-pending input.
171 this
.input_buffer
= buf
.split_off(read
);
// Chunk fully consumed: go back to `Reading` for the next one.
172 if this
.input_buffer
.is_empty() {
173 this
.state
= EncoderState
::Reading
;
// Emit the buffered output once the output buffer is full or the compressor
// reported `BufError`.
175 if this
.buffer
.is_full() || res
== flate2
::Status
::BufError
{
176 let bytes
= this
.buffer
.remove_data(this
.buffer
.len()).to_vec();
177 return Poll
::Ready(Some(Ok(bytes
.into())));
// Flushing: finish the deflate stream by encoding empty input with
// `FlushCompress::Finish`.
180 EncoderState
::Flushing
=> {
181 let (_read
, res
) = this
.encode(&[][..], FlushCompress
::Finish
)?
;
// Emit whatever output is buffered.
182 if !this
.buffer
.is_empty() {
183 let bytes
= this
.buffer
.remove_data(this
.buffer
.len()).to_vec();
184 return Poll
::Ready(Some(Ok(bytes
.into())));
// No buffered output and the compressor reported `StreamEnd`: mark the
// stream as finished.
186 if res
== flate2
::Status
::StreamEnd
{
187 this
.state
= EncoderState
::Finished
;
// Finished: the stream is terminated.
190 EncoderState
::Finished
=> return Poll
::Ready(None
),