]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/backup/dynamic_index.rs
update to pxar 0.1.9, update ReadAt implementations
[proxmox-backup.git] / src / backup / dynamic_index.rs
index 2fa1c2c822f26fd345ec865655ebba48949a1a51..ae8427a4c067a7329b5d98454fb127dfd6c0cf36 100644 (file)
@@ -1,23 +1,28 @@
-use std::convert::TryInto;
 use std::fs::File;
-use std::io::{BufWriter, Seek, SeekFrom, Write};
+use std::io::{self, BufWriter, Seek, SeekFrom, Write};
+use std::ops::Range;
 use std::os::unix::io::AsRawFd;
 use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::task::Context;
+use std::pin::Pin;
 
-use failure::*;
-use uuid::Uuid;
+use anyhow::{bail, format_err, Error};
 
 use proxmox::tools::io::ReadExt;
+use proxmox::tools::uuid::Uuid;
 use proxmox::tools::vec;
-use proxmox_protocol::Chunker;
+use proxmox::tools::mmap::Mmap;
+use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
 
 use super::chunk_stat::ChunkStat;
 use super::chunk_store::ChunkStore;
+use super::index::ChunkReadInfo;
 use super::read_chunk::ReadChunk;
+use super::Chunker;
 use super::IndexFile;
-use super::{DataChunk, DataChunkBuilder};
-use crate::tools;
+use super::{DataBlob, DataChunkBuilder};
+use crate::tools::{self, epoch_now_u64};
 
 /// Header format definition for dynamic index files (`.dixd`)
 #[repr(C)]
@@ -27,59 +32,59 @@ pub struct DynamicIndexHeader {
     pub ctime: u64,
     /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
     pub index_csum: [u8; 32],
-    reserved: [u8; 4030], // overall size is one page (4096 bytes)
+    reserved: [u8; 4032], // overall size is one page (4096 bytes)
+}
+proxmox::static_assert_size!(DynamicIndexHeader, 4096);
+// TODO: Once non-Copy unions are stabilized, use:
+// union DynamicIndexHeader {
+//     reserved: [u8; 4096],
+//     pub data: DynamicIndexHeaderData,
+// }
+
+#[derive(Clone, Debug)]
+#[repr(C)]
+pub struct DynamicEntry {
+    end_le: u64,
+    digest: [u8; 32],
 }
 
+impl DynamicEntry {
+    #[inline]
+    pub fn end(&self) -> u64 {
+        u64::from_le(self.end_le)
+    }
+}
 
 pub struct DynamicIndexReader {
     _file: File,
     pub size: usize,
-    index: *const u8,
-    index_entries: usize,
+    index: Mmap<DynamicEntry>,
     pub uuid: [u8; 16],
     pub ctime: u64,
     pub index_csum: [u8; 32],
 }
 
-// `index` is mmap()ed which cannot be thread-local so should be sendable
-// FIXME: Introduce an mmap wrapper type for this?
-unsafe impl Send for DynamicIndexReader {}
-
-impl Drop for DynamicIndexReader {
-
-    fn drop(&mut self) {
-        if let Err(err) = self.unmap() {
-            eprintln!("Unable to unmap dynamic index - {}", err);
-        }
-    }
-}
-
 impl DynamicIndexReader {
-
     pub fn open(path: &Path) -> Result<Self, Error> {
-
         File::open(path)
             .map_err(Error::from)
-            .and_then(|file| Self::new(file))
+            .and_then(Self::new)
             .map_err(|err| format_err!("Unable to open dynamic index {:?} - {}", path, err))
     }
 
     pub fn new(mut file: std::fs::File) -> Result<Self, Error> {
-
-        if let Err(err) = nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) {
+        if let Err(err) =
+            nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock)
+        {
             bail!("unable to get shared lock - {}", err);
         }
 
+        // FIXME: This is NOT OUR job! Check the callers of this method and remove this!
         file.seek(SeekFrom::Start(0))?;
 
         let header_size = std::mem::size_of::<DynamicIndexHeader>();
 
-        // todo: use static assertion when available in rust
-        if header_size != 4096 { bail!("got unexpected header size"); }
-
-        let buffer = file.read_exact_allocated(header_size)?;
-
-        let header = unsafe { &* (buffer.as_ptr() as *const DynamicIndexHeader) };
+        let header: Box<DynamicIndexHeader> = unsafe { file.read_host_value_boxed()? };
 
         if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 {
             bail!("got unknown magic number");
@@ -94,102 +99,94 @@ impl DynamicIndexReader {
         let size = stat.st_size as usize;
 
         let index_size = size - header_size;
-        if (index_size % 40) != 0 {
+        let index_count = index_size / 40;
+        if index_count * 40 != index_size {
             bail!("got unexpected file size");
         }
 
-        let data = unsafe { nix::sys::mman::mmap(
-            std::ptr::null_mut(),
-            index_size,
-            nix::sys::mman::ProtFlags::PROT_READ,
-            nix::sys::mman::MapFlags::MAP_PRIVATE,
-            rawfd,
-            header_size as i64) }? as *const u8;
+        let index = unsafe {
+            Mmap::map_fd(
+                rawfd,
+                header_size as u64,
+                index_count,
+                nix::sys::mman::ProtFlags::PROT_READ,
+                nix::sys::mman::MapFlags::MAP_PRIVATE,
+            )?
+        };
 
         Ok(Self {
             _file: file,
             size,
-            index: data,
-            index_entries: index_size/40,
+            index,
             ctime,
             uuid: header.uuid,
             index_csum: header.index_csum,
         })
     }
 
-    fn unmap(&mut self) -> Result<(), Error> {
-
-        if self.index == std::ptr::null_mut() { return Ok(()); }
-
-        if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } {
-            bail!("unmap dynamic index failed - {}", err);
-        }
-
-        self.index = std::ptr::null_mut();
-
-        Ok(())
-    }
-
-    pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
-        if pos >= self.index_entries {
+    #[allow(clippy::cast_ptr_alignment)]
+    pub fn chunk_info(&self, pos: usize) -> Result<ChunkReadInfo, Error> {
+        if pos >= self.index.len() {
             bail!("chunk index out of range");
         }
         let start = if pos == 0 {
             0
         } else {
-            unsafe { *(self.index.add((pos-1)*40) as *const u64) }
+            self.index[pos - 1].end()
         };
 
-        let end = unsafe { *(self.index.add(pos*40) as *const u64) };
-        let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
-        unsafe { std::ptr::copy_nonoverlapping(self.index.add(pos*40+8), digest.as_mut_ptr(), 32); }
+        let end = self.index[pos].end();
 
-        Ok((start, end, digest))
+        Ok(ChunkReadInfo {
+            range: start..end,
+            digest: self.index[pos].digest.clone(),
+        })
     }
 
     #[inline]
+    #[allow(clippy::cast_ptr_alignment)]
     fn chunk_end(&self, pos: usize) -> u64 {
-        if pos >= self.index_entries {
+        if pos >= self.index.len() {
             panic!("chunk index out of range");
         }
-        unsafe { *(self.index.add(pos*40) as *const u64) }
+        self.index[pos].end()
     }
 
     #[inline]
     fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
-        if pos >= self.index_entries {
+        if pos >= self.index.len() {
             panic!("chunk index out of range");
         }
-        let slice = unsafe {  std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
-        slice.try_into().unwrap()
+        &self.index[pos].digest
     }
 
-    /*
-    pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
-
-        for pos in 0..self.index_entries {
-            let _end = self.chunk_end(pos);
-            let digest = self.chunk_digest(pos);
-            //println!("Dump {:08x}", end );
-            let chunk = self.store.read_chunk(digest)?;
-            // fimxe: handle encrypted chunks
-            let data = chunk.decode(None)?;
-            writer.write_all(&data)?;
+    /// Compute checksum and data size
+    pub fn compute_csum(&self) -> ([u8; 32], u64) {
+        let mut csum = openssl::sha::Sha256::new();
+        for entry in &self.index {
+            csum.update(&entry.end_le.to_ne_bytes());
+            csum.update(&entry.digest);
         }
+        let csum = csum.finish();
 
-        Ok(())
+        (
+            csum,
+            self.index
+                .last()
+                .map(|entry| entry.end())
+                .unwrap_or(0)
+        )
     }
-    */
 
+    // TODO: can we use std::slice::binary_search with Mmap now?
     fn binary_search(
         &self,
         start_idx: usize,
         start: u64,
         end_idx: usize,
         end: u64,
-        offset: u64
+        offset: u64,
     ) -> Result<usize, Error> {
-
         if (offset >= end) || (offset < start) {
             bail!("offset out of range");
         }
@@ -197,38 +194,55 @@ impl DynamicIndexReader {
         if end_idx == start_idx {
             return Ok(start_idx); // found
         }
-        let middle_idx = (start_idx + end_idx)/2;
+        let middle_idx = (start_idx + end_idx) / 2;
         let middle_end = self.chunk_end(middle_idx);
 
         if offset < middle_end {
-            return self.binary_search(start_idx, start, middle_idx, middle_end, offset);
+            self.binary_search(start_idx, start, middle_idx, middle_end, offset)
         } else {
-            return self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset);
+            self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset)
         }
     }
 }
 
 impl IndexFile for DynamicIndexReader {
     fn index_count(&self) -> usize {
-        self.index_entries
+        self.index.len()
     }
 
     fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
-        if pos >= self.index_entries {
+        if pos >= self.index.len() {
             None
         } else {
-            Some(unsafe {
-                std::mem::transmute(self.chunk_digest(pos).as_ptr())
-            })
+            Some(unsafe { std::mem::transmute(self.chunk_digest(pos).as_ptr()) })
         }
     }
 
     fn index_bytes(&self) -> u64 {
-        if self.index_entries == 0 {
+        if self.index.is_empty() {
             0
         } else {
-            self.chunk_end((self.index_entries - 1) as usize)
+            self.chunk_end(self.index.len() - 1)
+        }
+    }
+}
+
+struct CachedChunk {
+    range: Range<u64>,
+    data: Vec<u8>,
+}
+
+impl CachedChunk {
+    /// Perform sanity checks on the range and data size:
+    pub fn new(range: Range<u64>, data: Vec<u8>) -> Result<Self, Error> {
+        if data.len() as u64 != range.end - range.start {
+            bail!(
+                "read chunk with wrong size ({} != {})",
+                data.len(),
+                range.end - range.start,
+            );
         }
+        Ok(Self { range, data })
     }
 }
 
@@ -240,63 +254,77 @@ pub struct BufferedDynamicReader<S> {
     buffered_chunk_idx: usize,
     buffered_chunk_start: u64,
     read_offset: u64,
+    lru_cache: crate::tools::lru_cache::LruCache<usize, CachedChunk>,
 }
 
-impl <S: ReadChunk> BufferedDynamicReader<S> {
+struct ChunkCacher<'a, S> {
+    store: &'a mut S,
+    index: &'a DynamicIndexReader,
+}
 
-    pub fn new(index: DynamicIndexReader, store: S) -> Self {
+impl<'a, S: ReadChunk> crate::tools::lru_cache::Cacher<usize, CachedChunk> for ChunkCacher<'a, S> {
+    fn fetch(&mut self, index: usize) -> Result<Option<CachedChunk>, Error> {
+        let info = self.index.chunk_info(index)?;
+        let range = info.range;
+        let data = self.store.read_chunk(&info.digest)?;
+        CachedChunk::new(range, data).map(Some)
+    }
+}
 
-        let archive_size = index.chunk_end(index.index_entries - 1);
+impl<S: ReadChunk> BufferedDynamicReader<S> {
+    pub fn new(index: DynamicIndexReader, store: S) -> Self {
+        let archive_size = index.index_bytes();
         Self {
             store,
-            index: index,
-            archive_size: archive_size,
-            read_buffer: Vec::with_capacity(1024*1024),
+            index,
+            archive_size,
+            read_buffer: Vec::with_capacity(1024 * 1024),
             buffered_chunk_idx: 0,
             buffered_chunk_start: 0,
             read_offset: 0,
+            lru_cache: crate::tools::lru_cache::LruCache::new(32),
         }
     }
 
-    pub fn archive_size(&self) -> u64 { self.archive_size }
+    pub fn archive_size(&self) -> u64 {
+        self.archive_size
+    }
 
     fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
-
-        let index = &self.index;
-        let (start, end, digest) = index.chunk_info(idx)?;
+        //let (start, end, data) = self.lru_cache.access(
+        let cached_chunk = self.lru_cache.access(
+            idx,
+            &mut ChunkCacher {
+                store: &mut self.store,
+                index: &self.index,
+            },
+        )?.ok_or_else(|| format_err!("chunk not found by cacher"))?;
 
         // fixme: avoid copy
-
-        let data = self.store.read_chunk(&digest)?;
-
-        if (end - start) != data.len() as u64  {
-            bail!("read chunk with wrong size ({} != {}", (end - start), data.len());
-        }
-
         self.read_buffer.clear();
-        self.read_buffer.extend_from_slice(&data);
+        self.read_buffer.extend_from_slice(&cached_chunk.data);
 
         self.buffered_chunk_idx = idx;
 
-        self.buffered_chunk_start = start as u64;
+        self.buffered_chunk_start = cached_chunk.range.start;
         //println!("BUFFER {} {}",  self.buffered_chunk_start, end);
         Ok(())
     }
 }
 
-impl <S: ReadChunk> crate::tools::BufferedRead for BufferedDynamicReader<S> {
-
+impl<S: ReadChunk> crate::tools::BufferedRead for BufferedDynamicReader<S> {
     fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
-
-        if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); }
+        if offset == self.archive_size {
+            return Ok(&self.read_buffer[0..0]);
+        }
 
         let buffer_len = self.read_buffer.len();
         let index = &self.index;
 
         // optimization for sequential read
-        if buffer_len > 0 &&
-            ((self.buffered_chunk_idx + 1) < index.index_entries) &&
-            (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
+        if buffer_len > 0
+            && ((self.buffered_chunk_idx + 1) < index.index.len())
+            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
         {
             let next_idx = self.buffered_chunk_idx + 1;
             let next_end = index.chunk_end(next_idx);
@@ -307,50 +335,50 @@ impl <S: ReadChunk> crate::tools::BufferedRead for BufferedDynamicReader<S> {
             }
         }
 
-        if (buffer_len == 0) ||
-            (offset < self.buffered_chunk_start) ||
-            (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
+        if (buffer_len == 0)
+            || (offset < self.buffered_chunk_start)
+            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
         {
-            let end_idx = index.index_entries - 1;
+            let end_idx = index.index.len() - 1;
             let end = index.chunk_end(end_idx);
             let idx = index.binary_search(0, 0, end_idx, end, offset)?;
             self.buffer_chunk(idx)?;
-         }
+        }
 
         let buffer_offset = (offset - self.buffered_chunk_start) as usize;
         Ok(&self.read_buffer[buffer_offset..])
     }
 }
 
-impl <S: ReadChunk> std::io::Read for  BufferedDynamicReader<S> {
-
+impl<S: ReadChunk> std::io::Read for BufferedDynamicReader<S> {
     fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
-
-        use std::io::{Error, ErrorKind};
         use crate::tools::BufferedRead;
+        use std::io::{Error, ErrorKind};
 
         let data = match self.buffered_read(self.read_offset) {
             Ok(v) => v,
             Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
         };
 
-        let n = if data.len() > buf.len() { buf.len() } else { data.len() };
+        let n = if data.len() > buf.len() {
+            buf.len()
+        } else {
+            data.len()
+        };
 
-        unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); }
+        buf[0..n].copy_from_slice(&data[0..n]);
 
         self.read_offset += n as u64;
 
-        return Ok(n);
+        Ok(n)
     }
 }
 
-impl <S: ReadChunk> std::io::Seek for  BufferedDynamicReader<S> {
-
+impl<S: ReadChunk> std::io::Seek for BufferedDynamicReader<S> {
     fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
-
         let new_offset = match pos {
-            SeekFrom::Start(start_offset) =>  start_offset as i64,
-            SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset,
+            SeekFrom::Start(start_offset) => start_offset as i64,
+            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
             SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
         };
 
@@ -358,7 +386,11 @@ impl <S: ReadChunk> std::io::Seek for  BufferedDynamicReader<S> {
         if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
             return Err(Error::new(
                 ErrorKind::Other,
-                format!("seek is out of range {} ([0..{}])", new_offset, self.archive_size)));
+                format!(
+                    "seek is out of range {} ([0..{}])",
+                    new_offset, self.archive_size
+                ),
+            ));
         }
         self.read_offset = new_offset as u64;
 
@@ -366,6 +398,49 @@ impl <S: ReadChunk> std::io::Seek for  BufferedDynamicReader<S> {
     }
 }
 
+/// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
+/// async use!
+///
+/// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
+/// so that we can properly access it from multiple threads simultaneously while not issuing
+/// duplicate simultaneous reads over http.
+#[derive(Clone)]
+pub struct LocalDynamicReadAt<R: ReadChunk> {
+    inner: Arc<Mutex<BufferedDynamicReader<R>>>,
+}
+
+impl<R: ReadChunk> LocalDynamicReadAt<R> {
+    pub fn new(inner: BufferedDynamicReader<R>) -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(inner)),
+        }
+    }
+}
+
+impl<R: ReadChunk> ReadAt for LocalDynamicReadAt<R> {
+    fn start_read_at<'a>(
+        self: Pin<&'a Self>,
+        _cx: &mut Context,
+        buf: &'a mut [u8],
+        offset: u64,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        use std::io::Read;
+        MaybeReady::Ready(tokio::task::block_in_place(move || {
+            let mut reader = self.inner.lock().unwrap();
+            reader.seek(SeekFrom::Start(offset))?;
+            Ok(reader.read(buf)?)
+        }))
+    }
+
+    fn poll_complete<'a>(
+        self: Pin<&'a Self>,
+        _op: ReadAtOperation<'a>,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        panic!("LocalDynamicReadAt::start_read_at returned Pending");
+    }
+}
+
+
 /// Create dynamic index files (`.dixd`)
 pub struct DynamicIndexWriter {
     store: Arc<ChunkStore>,
@@ -380,16 +455,13 @@ pub struct DynamicIndexWriter {
 }
 
 impl Drop for DynamicIndexWriter {
-
     fn drop(&mut self) {
         let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
     }
 }
 
 impl DynamicIndexWriter {
-
     pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
-
         let shared_lock = store.try_shared_lock()?;
 
         let full_path = store.relative_path(path);
@@ -397,24 +469,26 @@ impl DynamicIndexWriter {
         tmp_path.set_extension("tmp_didx");
 
         let file = std::fs::OpenOptions::new()
-            .create(true).truncate(true)
+            .create(true)
+            .truncate(true)
             .read(true)
             .write(true)
             .open(&tmp_path)?;
 
-        let mut writer = BufWriter::with_capacity(1024*1024, file);
+        let mut writer = BufWriter::with_capacity(1024 * 1024, file);
 
         let header_size = std::mem::size_of::<DynamicIndexHeader>();
 
         // todo: use static assertion when available in rust
-        if header_size != 4096 { panic!("got unexpected header size"); }
+        if header_size != 4096 {
+            panic!("got unexpected header size");
+        }
 
-        let ctime = std::time::SystemTime::now().duration_since(
-            std::time::SystemTime::UNIX_EPOCH)?.as_secs();
+        let ctime = epoch_now_u64()?;
 
-        let uuid = Uuid::new_v4();
+        let uuid = Uuid::generate();
 
-        let mut buffer = vec::undefined(header_size);
+        let mut buffer = vec::zeroed(header_size);
         let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?;
 
         header.magic = super::DYNAMIC_SIZED_CHUNK_INDEX_1_0;
@@ -430,7 +504,7 @@ impl DynamicIndexWriter {
         Ok(Self {
             store,
             _lock: shared_lock,
-            writer: writer,
+            writer,
             closed: false,
             filename: full_path,
             tmp_filename: tmp_path,
@@ -441,21 +515,23 @@ impl DynamicIndexWriter {
     }
 
     // fixme: use add_chunk instead?
-    pub fn insert_chunk(&self, chunk: &DataChunk) -> Result<(bool, u64), Error> {
-        self.store.insert_chunk(chunk)
+    pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
+        self.store.insert_chunk(chunk, digest)
     }
 
-    pub fn close(&mut self)  -> Result<[u8; 32], Error> {
-
+    pub fn close(&mut self) -> Result<[u8; 32], Error> {
         if self.closed {
-            bail!("cannot close already closed archive index file {:?}", self.filename);
+            bail!(
+                "cannot close already closed archive index file {:?}",
+                self.filename
+            );
         }
 
         self.closed = true;
 
         self.writer.flush()?;
 
-        let csum_offset = proxmox::tools::offsetof!(DynamicIndexHeader, index_csum);
+        let csum_offset = proxmox::offsetof!(DynamicIndexHeader, index_csum);
         self.writer.seek(SeekFrom::Start(csum_offset as u64))?;
 
         let csum = self.csum.take().unwrap();
@@ -464,7 +540,6 @@ impl DynamicIndexWriter {
         self.writer.write_all(&index_csum)?;
         self.writer.flush()?;
 
-
         if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
             bail!("Atomic rename file {:?} failed - {}", self.filename, err);
         }
@@ -475,18 +550,21 @@ impl DynamicIndexWriter {
     // fixme: rename to add_digest
     pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> {
         if self.closed {
-            bail!("cannot write to closed dynamic index file {:?}", self.filename);
+            bail!(
+                "cannot write to closed dynamic index file {:?}",
+                self.filename
+            );
         }
 
-        let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) };
+        let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8; 8]>(offset.to_le()) };
 
         if let Some(ref mut csum) = self.csum {
             csum.update(offset_le);
             csum.update(digest);
         }
 
-        self.writer.write(offset_le)?;
-        self.writer.write(digest)?;
+        self.writer.write_all(offset_le)?;
+        self.writer.write_all(digest)?;
         Ok(())
     }
 }
@@ -505,7 +583,6 @@ pub struct DynamicChunkWriter {
 }
 
 impl DynamicChunkWriter {
-
     pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
         Self {
             index,
@@ -514,7 +591,7 @@ impl DynamicChunkWriter {
             stat: ChunkStat::new(0),
             chunk_offset: 0,
             last_chunk: 0,
-            chunk_buffer: Vec::with_capacity(chunk_size*4),
+            chunk_buffer: Vec::with_capacity(chunk_size * 4),
         }
     }
 
@@ -522,8 +599,7 @@ impl DynamicChunkWriter {
         &self.stat
     }
 
-    pub fn close(&mut self)  -> Result<(), Error> {
-
+    pub fn close(&mut self) -> Result<(), Error> {
         if self.closed {
             return Ok(());
         }
@@ -537,16 +613,18 @@ impl DynamicChunkWriter {
         self.stat.size = self.chunk_offset as u64;
 
         // add size of index file
-        self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
+        self.stat.size +=
+            (self.stat.chunk_count * 40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
 
         Ok(())
     }
 
     fn write_chunk_buffer(&mut self) -> Result<(), Error> {
-
         let chunk_size = self.chunk_buffer.len();
 
-        if chunk_size == 0 { return Ok(()); }
+        if chunk_size == 0 {
+            return Ok(());
+        }
 
         let expected_chunk_size = self.chunk_offset - self.last_chunk;
         if expected_chunk_size != self.chunk_buffer.len() {
@@ -557,15 +635,12 @@ impl DynamicChunkWriter {
 
         self.last_chunk = self.chunk_offset;
 
-        let chunk = DataChunkBuilder::new(&self.chunk_buffer)
+        let (chunk, digest) = DataChunkBuilder::new(&self.chunk_buffer)
             .compress(true)
             .build()?;
 
-        let digest = chunk.digest();
-
-        match self.index.insert_chunk(&chunk) {
+        match self.index.insert_chunk(&chunk, &digest) {
             Ok((is_duplicate, compressed_size)) => {
-
                 self.stat.compressed_size += compressed_size;
                 if is_duplicate {
                     self.stat.duplicate_chunks += 1;
@@ -573,45 +648,54 @@ impl DynamicChunkWriter {
                     self.stat.disk_size += compressed_size;
                 }
 
-                println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size,
-                         (compressed_size*100)/(chunk_size as u64), is_duplicate, proxmox::tools::digest_to_hex(digest));
+                println!(
+                    "ADD CHUNK {:016x} {} {}% {} {}",
+                    self.chunk_offset,
+                    chunk_size,
+                    (compressed_size * 100) / (chunk_size as u64),
+                    is_duplicate,
+                    proxmox::tools::digest_to_hex(&digest)
+                );
                 self.index.add_chunk(self.chunk_offset as u64, &digest)?;
                 self.chunk_buffer.truncate(0);
-                return Ok(());
+                Ok(())
             }
             Err(err) => {
                 self.chunk_buffer.truncate(0);
-                return Err(err);
+                Err(err)
             }
         }
     }
 }
 
 impl Write for DynamicChunkWriter {
-
     fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
-
         let chunker = &mut self.chunker;
 
         let pos = chunker.scan(data);
 
         if pos > 0 {
-            self.chunk_buffer.extend(&data[0..pos]);
+            self.chunk_buffer.extend_from_slice(&data[0..pos]);
             self.chunk_offset += pos;
 
             if let Err(err) = self.write_chunk_buffer() {
-                return Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()));
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::Other,
+                    err.to_string(),
+                ));
             }
             Ok(pos)
-
         } else {
             self.chunk_offset += data.len();
-            self.chunk_buffer.extend(data);
+            self.chunk_buffer.extend_from_slice(data);
             Ok(data.len())
         }
     }
 
     fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
-        Err(std::io::Error::new(std::io::ErrorKind::Other, "please use close() instead of flush()"))
+        Err(std::io::Error::new(
+            std::io::ErrorKind::Other,
+            "please use close() instead of flush()",
+        ))
     }
 }