]> git.proxmox.com Git - proxmox-backup.git/blame - pbs-tools/src/compression.rs
update to first proxmox crate split
[proxmox-backup.git] / pbs-tools / src / compression.rs
CommitLineData
2b7f8dd5
WB
1use std::io;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use anyhow::Error;
6use bytes::Bytes;
7use flate2::{Compress, Compression, FlushCompress};
8use futures::ready;
9use futures::stream::Stream;
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
12use proxmox::io_format_err;
6ef1b649 13use proxmox_io::ByteBuffer;
2b7f8dd5
WB
14
/// Capacity in bytes of the intermediate buffers: the encoder's compressed
/// output buffer and the read buffer used by `DeflateEncoder::compress`.
const BUFFER_SIZE: usize = 8 * 1024;
16
/// Compression level selector accepted by `DeflateEncoder::with_quality`.
///
/// The common traits are derived so callers can log, copy and compare levels.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Level {
    /// Maps to `Compression::fast()`.
    Fastest,
    /// Maps to `Compression::best()`.
    Best,
    /// Maps to `Compression::new(3)`; this is what `DeflateEncoder::new` uses.
    Default,
    /// Explicit numeric level, passed unchanged to `Compression::new`.
    Precise(u32),
}
23
/// Phases of the state machine that drives the `Stream` implementation of
/// `DeflateEncoder`.
#[derive(PartialEq, Eq)]
enum EncoderState {
    /// Waiting for the next chunk from the inner stream (initial state).
    Reading,
    /// Feeding the current input chunk to the compressor.
    Writing,
    /// The inner stream has ended; the compressor is being finished.
    Flushing,
    /// Everything has been emitted; the stream yields `None` from now on.
    Finished,
}
31
32pub struct DeflateEncoder<T> {
33 inner: T,
34 compressor: Compress,
35 buffer: ByteBuffer,
36 input_buffer: Bytes,
37 state: EncoderState,
38}
39
40impl<T> DeflateEncoder<T> {
41 pub fn new(inner: T) -> Self {
42 Self::with_quality(inner, Level::Default)
43 }
44
45 pub fn with_quality(inner: T, level: Level) -> Self {
46 let level = match level {
47 Level::Fastest => Compression::fast(),
48 Level::Best => Compression::best(),
49 Level::Default => Compression::new(3),
50 Level::Precise(val) => Compression::new(val),
51 };
52
53 Self {
54 inner,
55 compressor: Compress::new(level, false),
56 buffer: ByteBuffer::with_capacity(BUFFER_SIZE),
57 input_buffer: Bytes::new(),
58 state: EncoderState::Reading,
59 }
60 }
61
62 pub fn total_in(&self) -> u64 {
63 self.compressor.total_in()
64 }
65
66 pub fn total_out(&self) -> u64 {
67 self.compressor.total_out()
68 }
69
70 pub fn into_inner(self) -> T {
71 self.inner
72 }
73
74 fn encode(
75 &mut self,
76 inbuf: &[u8],
77 flush: FlushCompress,
78 ) -> Result<(usize, flate2::Status), io::Error> {
79 let old_in = self.compressor.total_in();
80 let old_out = self.compressor.total_out();
81 let res = self
82 .compressor
83 .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?;
84 let new_in = (self.compressor.total_in() - old_in) as usize;
85 let new_out = (self.compressor.total_out() - old_out) as usize;
86 self.buffer.add_size(new_out);
87
88 Ok((new_in, res))
89 }
90}
91
92impl DeflateEncoder<Vec<u8>> {
93 // assume small files
94 pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
95 where
96 R: AsyncRead + Unpin,
97 {
98 let mut buffer = Vec::with_capacity(size_hint);
99 reader.read_to_end(&mut buffer).await?;
100 self.inner.reserve(size_hint); // should be enough since we want smalller files
101 self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
102 Ok(())
103 }
104}
105
106impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
107 pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
108 where
109 R: AsyncRead + Unpin,
110 {
111 let mut buffer = ByteBuffer::with_capacity(BUFFER_SIZE);
112 let mut eof = false;
113 loop {
114 if !eof && !buffer.is_full() {
115 let read = buffer.read_from_async(reader).await?;
116 if read == 0 {
117 eof = true;
118 }
119 }
120 let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
121 buffer.consume(read);
122
123 self.inner.write_all(&self.buffer[..]).await?;
124 self.buffer.clear();
125
126 if buffer.is_empty() && eof {
127 break;
128 }
129 }
130
131 loop {
132 let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?;
133 self.inner.write_all(&self.buffer[..]).await?;
134 self.buffer.clear();
135 if res == flate2::Status::StreamEnd {
136 break;
137 }
138 }
139
140 Ok(())
141 }
142}
143
144impl<T, O> Stream for DeflateEncoder<T>
145where
146 T: Stream<Item = Result<O, io::Error>> + Unpin,
147 O: Into<Bytes>
148{
149 type Item = Result<Bytes, io::Error>;
150
151 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
152 let this = self.get_mut();
153
154 loop {
155 match this.state {
156 EncoderState::Reading => {
157 if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
158 let buf = res?;
159 this.input_buffer = buf.into();
160 this.state = EncoderState::Writing;
161 } else {
162 this.state = EncoderState::Flushing;
163 }
164 }
165 EncoderState::Writing => {
166 if this.input_buffer.is_empty() {
167 return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
168 }
169 let mut buf = this.input_buffer.split_off(0);
170 let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
171 this.input_buffer = buf.split_off(read);
172 if this.input_buffer.is_empty() {
173 this.state = EncoderState::Reading;
174 }
175 if this.buffer.is_full() || res == flate2::Status::BufError {
176 let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
177 return Poll::Ready(Some(Ok(bytes.into())));
178 }
179 }
180 EncoderState::Flushing => {
181 let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
182 if !this.buffer.is_empty() {
183 let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
184 return Poll::Ready(Some(Ok(bytes.into())));
185 }
186 if res == flate2::Status::StreamEnd {
187 this.state = EncoderState::Finished;
188 }
189 }
190 EncoderState::Finished => return Poll::Ready(None),
191 }
192 }
193 }
194}