]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/tools/compression.rs
move client to pbs-client subcrate
[proxmox-backup.git] / src / tools / compression.rs
index b27d7e70ef9977ad2d5c6092e24727f15360f869..19626efc80155d8bf947c1181d65773a33784cde 100644 (file)
@@ -1,19 +1,5 @@
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
 use anyhow::{bail, Error};
-use bytes::Bytes;
-use flate2::{Compress, Compression, FlushCompress};
-use futures::ready;
-use futures::stream::Stream;
 use hyper::header;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-
-use proxmox::io_format_err;
-use proxmox::tools::byte_buffer::ByteBuffer;
-
-const BUFFER_SIZE: usize = 8192;
 
 /// Possible Compression Methods, order determines preference (later is preferred)
 #[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
@@ -51,182 +37,3 @@ impl std::str::FromStr for CompressionMethod {
         }
     }
 }
-
-pub enum Level {
-    Fastest,
-    Best,
-    Default,
-    Precise(u32),
-}
-
-#[derive(Eq, PartialEq)]
-enum EncoderState {
-    Reading,
-    Writing,
-    Flushing,
-    Finished,
-}
-
-pub struct DeflateEncoder<T> {
-    inner: T,
-    compressor: Compress,
-    buffer: ByteBuffer,
-    input_buffer: Bytes,
-    state: EncoderState,
-}
-
-impl<T> DeflateEncoder<T> {
-    pub fn new(inner: T) -> Self {
-        Self::with_quality(inner, Level::Default)
-    }
-
-    pub fn with_quality(inner: T, level: Level) -> Self {
-        let level = match level {
-            Level::Fastest => Compression::fast(),
-            Level::Best => Compression::best(),
-            Level::Default => Compression::new(3),
-            Level::Precise(val) => Compression::new(val),
-        };
-
-        Self {
-            inner,
-            compressor: Compress::new(level, false),
-            buffer: ByteBuffer::with_capacity(BUFFER_SIZE),
-            input_buffer: Bytes::new(),
-            state: EncoderState::Reading,
-        }
-    }
-
-    pub fn total_in(&self) -> u64 {
-        self.compressor.total_in()
-    }
-
-    pub fn total_out(&self) -> u64 {
-        self.compressor.total_out()
-    }
-
-    pub fn into_inner(self) -> T {
-        self.inner
-    }
-
-    fn encode(
-        &mut self,
-        inbuf: &[u8],
-        flush: FlushCompress,
-    ) -> Result<(usize, flate2::Status), io::Error> {
-        let old_in = self.compressor.total_in();
-        let old_out = self.compressor.total_out();
-        let res = self
-            .compressor
-            .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?;
-        let new_in = (self.compressor.total_in() - old_in) as usize;
-        let new_out = (self.compressor.total_out() - old_out) as usize;
-        self.buffer.add_size(new_out);
-
-        Ok((new_in, res))
-    }
-}
-
-impl DeflateEncoder<Vec<u8>> {
-    // assume small files
-    pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
-    where
-        R: AsyncRead + Unpin,
-    {
-        let mut buffer = Vec::with_capacity(size_hint);
-        reader.read_to_end(&mut buffer).await?;
-        self.inner.reserve(size_hint); // should be enough since we want smalller files
-        self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
-        Ok(())
-    }
-}
-
-impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
-    pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
-    where
-        R: AsyncRead + Unpin,
-    {
-        let mut buffer = ByteBuffer::with_capacity(BUFFER_SIZE);
-        let mut eof = false;
-        loop {
-            if !eof && !buffer.is_full() {
-                let read = buffer.read_from_async(reader).await?;
-                if read == 0 {
-                    eof = true;
-                }
-            }
-            let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
-            buffer.consume(read);
-
-            self.inner.write_all(&self.buffer[..]).await?;
-            self.buffer.clear();
-
-            if buffer.is_empty() && eof {
-                break;
-            }
-        }
-
-        loop {
-            let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?;
-            self.inner.write_all(&self.buffer[..]).await?;
-            self.buffer.clear();
-            if res == flate2::Status::StreamEnd {
-                break;
-            }
-        }
-
-        Ok(())
-    }
-}
-
-impl<T, O> Stream for DeflateEncoder<T>
-where
-    T: Stream<Item = Result<O, io::Error>> + Unpin,
-    O: Into<Bytes>
-{
-    type Item = Result<Bytes, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-
-        loop {
-            match this.state {
-                EncoderState::Reading => {
-                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
-                        let buf = res?;
-                        this.input_buffer = buf.into();
-                        this.state = EncoderState::Writing;
-                    } else {
-                        this.state = EncoderState::Flushing;
-                    }
-                }
-                EncoderState::Writing => {
-                    if this.input_buffer.is_empty() {
-                        return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
-                    }
-                    let mut buf = this.input_buffer.split_off(0);
-                    let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
-                    this.input_buffer = buf.split_off(read);
-                    if this.input_buffer.is_empty() {
-                        this.state = EncoderState::Reading;
-                    }
-                    if this.buffer.is_full() || res == flate2::Status::BufError {
-                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
-                        return Poll::Ready(Some(Ok(bytes.into())));
-                    }
-                }
-                EncoderState::Flushing => {
-                    let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
-                    if !this.buffer.is_empty() {
-                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
-                        return Poll::Ready(Some(Ok(bytes.into())));
-                    }
-                    if res == flate2::Status::StreamEnd {
-                        this.state = EncoderState::Finished;
-                    }
-                }
-                EncoderState::Finished => return Poll::Ready(None),
-            }
-        }
-    }
-}