]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/backup/dynamic_index.rs
backup: Add support for client side encryption
[proxmox-backup.git] / src / backup / dynamic_index.rs
index a0f5f74ec18b2e6fbf242c46227d7c95a1f6a5d0..9af4e0e539db294d4c76dd1c8fe8d6d32e3e6c79 100644 (file)
@@ -1,18 +1,25 @@
 use failure::*;
+use std::convert::TryInto;
 
 use crate::tools;
+use super::IndexFile;
 use super::chunk_stat::*;
 use super::chunk_store::*;
-use super::chunker::*;
+use proxmox_protocol::Chunker;
 
 use std::sync::Arc;
-use std::io::{Read, Write, BufWriter};
+use std::io::{Write, BufWriter};
 use std::fs::File;
 use std::path::{Path, PathBuf};
 use std::os::unix::io::AsRawFd;
 use uuid::Uuid;
 //use chrono::{Local, TimeZone};
 
+use crate::tools::io::ops::*;
+use crate::tools::vec;
+
+use super::{DataChunk, DataChunkBuilder};
+
 /// Header format definition for dynamic index files (`.dixd`)
 #[repr(C)]
 pub struct DynamicIndexHeader {
@@ -36,7 +43,8 @@ pub struct DynamicIndexReader {
     pub ctime: u64,
 }
 
-// fixme: ???!!!
+// `index` is mmap()ed which cannot be thread-local so should be sendable
+// FIXME: Introduce an mmap wrapper type for this?
 unsafe impl Send for DynamicIndexReader {}
 
 impl Drop for DynamicIndexReader {
@@ -56,15 +64,18 @@ impl DynamicIndexReader {
 
         let mut file = std::fs::File::open(&full_path)?;
 
+        if let Err(err) = nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) {
+            bail!("unable to get shared lock on {:?} - {}", full_path, err);
+        }
+
         let header_size = std::mem::size_of::<DynamicIndexHeader>();
 
         // todo: use static assertion when available in rust
         if header_size != 4096 { bail!("got unexpected header size for {:?}", path); }
 
-        let mut buffer = vec![0u8; header_size];
-        file.read_exact(&mut buffer)?;
+        let buffer = file.read_exact_allocated(header_size)?;
 
-        let header = unsafe { &mut * (buffer.as_ptr() as *mut DynamicIndexHeader) };
+        let header = unsafe { &* (buffer.as_ptr() as *const DynamicIndexHeader) };
 
         if header.magic != *b"PROXMOX-DIDX" {
             bail!("got unknown magic number for {:?}", path);
@@ -124,6 +135,23 @@ impl DynamicIndexReader {
         Ok(())
     }
 
+    pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
+        if pos >= self.index_entries {
+            bail!("chunk index out of range");
+        }
+        let start = if pos == 0 {
+            0
+        } else {
+            unsafe { *(self.index.add((pos-1)*40) as *const u64) }
+        };
+
+        let end = unsafe { *(self.index.add(pos*40) as *const u64) };
+        let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
+        unsafe { std::ptr::copy_nonoverlapping(self.index.add(pos*40+8), digest.as_mut_ptr(), 32); }
+
+        Ok((start, end, digest))
+    }
+
     #[inline]
     fn chunk_end(&self, pos: usize) -> u64 {
         if pos >= self.index_entries {
@@ -133,16 +161,20 @@ impl DynamicIndexReader {
     }
 
     #[inline]
-    fn chunk_digest(&self, pos: usize) -> &[u8] {
+    fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
         if pos >= self.index_entries {
             panic!("chunk index out of range");
         }
-        unsafe {  std::slice::from_raw_parts(self.index.add(pos*40+8), 32) }
+        let slice = unsafe {  std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
+        slice.try_into().unwrap()
     }
 
     pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
 
         for pos in 0..self.index_entries {
+
+            tools::fail_on_shutdown()?;
+
             let digest = self.chunk_digest(pos);
             if let Err(err) = self.store.touch_chunk(digest) {
                 bail!("unable to access chunk {}, required by {:?} - {}",
@@ -152,17 +184,16 @@ impl DynamicIndexReader {
         Ok(())
     }
 
-    pub fn dump_catar(&self, mut writer: Box<Write>) -> Result<(), Error> {
-
-        let mut buffer = Vec::with_capacity(1024*1024);
+    pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
 
         for pos in 0..self.index_entries {
             let _end = self.chunk_end(pos);
             let digest = self.chunk_digest(pos);
             //println!("Dump {:08x}", end );
-            self.store.read_chunk(digest, &mut buffer)?;
-            writer.write_all(&buffer)?;
-
+            let chunk = self.store.read_chunk(digest)?;
+            // fimxe: handle encrypted chunks
+            let data = chunk.decode(None)?;
+            writer.write_all(&data)?;
         }
 
         Ok(())
@@ -195,6 +226,22 @@ impl DynamicIndexReader {
     }
 }
 
+impl IndexFile for DynamicIndexReader {
+    fn index_count(&self) -> usize {
+        self.index_entries
+    }
+
+    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
+        if pos >= self.index_entries {
+            None
+        } else {
+            Some(unsafe {
+                std::mem::transmute(self.chunk_digest(pos).as_ptr())
+            })
+        }
+    }
+}
+
 pub struct BufferedDynamicReader {
     index: DynamicIndexReader,
     archive_size: u64,
@@ -226,7 +273,14 @@ impl BufferedDynamicReader {
         let index = &self.index;
         let end = index.chunk_end(idx);
         let digest = index.chunk_digest(idx);
-        index.store.read_chunk(digest, &mut self.read_buffer)?;
+
+        let chunk = index.store.read_chunk(digest)?;
+        // fimxe: handle encrypted chunks
+        // fixme: avoid copy
+        let data = chunk.decode(None)?;
+
+        self.read_buffer.clear();
+        self.read_buffer.extend_from_slice(&data);
 
         self.buffered_chunk_idx = idx;
         self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
@@ -235,7 +289,7 @@ impl BufferedDynamicReader {
     }
 }
 
-impl crate::tools::BufferedReader for  BufferedDynamicReader {
+impl crate::tools::BufferedRead for BufferedDynamicReader {
 
     fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
 
@@ -279,7 +333,7 @@ impl std::io::Read for  BufferedDynamicReader {
     fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
         use std::io::{Error, ErrorKind};
-        use crate::tools::BufferedReader;
+        use crate::tools::BufferedRead;
 
         let data = match self.buffered_read(self.read_offset) {
             Ok(v) => v,
@@ -320,21 +374,16 @@ impl std::io::Seek for  BufferedDynamicReader {
     }
 }
 
+/// Create dynamic index files (`.dixd`)
 pub struct DynamicIndexWriter {
     store: Arc<ChunkStore>,
-    chunker: Chunker,
+    _lock: tools::ProcessLockSharedGuard,
     writer: BufWriter<File>,
     closed: bool,
     filename: PathBuf,
     tmp_filename: PathBuf,
     pub uuid: [u8; 16],
     pub ctime: u64,
-
-    stat: ChunkStat,
-
-    chunk_offset: usize,
-    last_chunk: usize,
-    chunk_buffer: Vec<u8>,
 }
 
 impl Drop for DynamicIndexWriter {
@@ -346,7 +395,9 @@ impl Drop for DynamicIndexWriter {
 
 impl DynamicIndexWriter {
 
-    pub fn create(store: Arc<ChunkStore>, path: &Path, chunk_size: usize) -> Result<Self, Error> {
+    pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
+
+        let shared_lock = store.try_shared_lock()?;
 
         let full_path = store.relative_path(path);
         let mut tmp_path = full_path.clone();
@@ -370,7 +421,7 @@ impl DynamicIndexWriter {
 
         let uuid = Uuid::new_v4();
 
-        let mut buffer = vec![0u8; header_size];
+        let mut buffer = vec::undefined(header_size);
         let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?;
 
         header.magic = *b"PROXMOX-DIDX";
@@ -382,22 +433,21 @@ impl DynamicIndexWriter {
 
         Ok(Self {
             store,
-            chunker: Chunker::new(chunk_size),
+            _lock: shared_lock,
             writer: writer,
             closed: false,
             filename: full_path,
             tmp_filename: tmp_path,
             ctime,
             uuid: *uuid.as_bytes(),
-
-            stat: ChunkStat::new(0),
-
-            chunk_offset: 0,
-            last_chunk: 0,
-            chunk_buffer: Vec::with_capacity(chunk_size*4),
         })
     }
 
+    // fixme: use add_chunk instead?
+    pub fn insert_chunk(&self, chunk: &DataChunk) -> Result<(bool, u64), Error> {
+        self.store.insert_chunk(chunk)
+    }
+
     pub fn close(&mut self)  -> Result<(), Error> {
 
         if self.closed {
@@ -406,19 +456,8 @@ impl DynamicIndexWriter {
 
         self.closed = true;
 
-        self.write_chunk_buffer()?;
-
         self.writer.flush()?;
 
-        self.stat.size = self.chunk_offset as u64;
-
-        // add size of index file
-        self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
-
-        println!("STAT: {:?}", self.stat);
-
-        // fixme:
-
         if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
             bail!("Atomic rename file {:?} failed - {}", self.filename, err);
         }
@@ -426,13 +465,69 @@ impl DynamicIndexWriter {
         Ok(())
     }
 
+    // fixme: rename to add_digest
+    pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> {
+        if self.closed {
+            bail!("cannot write to closed dynamic index file {:?}", self.filename);
+        }
+        self.writer.write(unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) })?;
+        self.writer.write(digest)?;
+        Ok(())
+    }
+}
+
+/// Writer which splits a binary stream into dynamic sized chunks
+///
+/// And store the resulting chunk list into the index file.
+pub struct DynamicChunkWriter {
+    index: DynamicIndexWriter,
+    closed: bool,
+    chunker: Chunker,
+    stat: ChunkStat,
+    chunk_offset: usize,
+    last_chunk: usize,
+    chunk_buffer: Vec<u8>,
+}
+
+impl DynamicChunkWriter {
+
+    pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
+        Self {
+            index,
+            closed: false,
+            chunker: Chunker::new(chunk_size),
+            stat: ChunkStat::new(0),
+            chunk_offset: 0,
+            last_chunk: 0,
+            chunk_buffer: Vec::with_capacity(chunk_size*4),
+        }
+    }
+
     pub fn stat(&self) -> &ChunkStat {
         &self.stat
     }
 
-    fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> {
+    pub fn close(&mut self)  -> Result<(), Error> {
 
-        use std::io::{Error, ErrorKind};
+        if self.closed {
+            return Ok(());
+        }
+
+        self.closed = true;
+
+        self.write_chunk_buffer()?;
+
+        self.index.close()?;
+
+        self.stat.size = self.chunk_offset as u64;
+
+        // add size of index file
+        self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
+
+        Ok(())
+    }
+
+    fn write_chunk_buffer(&mut self) -> Result<(), Error> {
 
         let chunk_size = self.chunk_buffer.len();
 
@@ -440,17 +535,21 @@ impl DynamicIndexWriter {
 
         let expected_chunk_size = self.chunk_offset - self.last_chunk;
         if expected_chunk_size != self.chunk_buffer.len() {
-            return Err(Error::new(
-                ErrorKind::Other,
-                format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size)));
+            bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size);
         }
 
         self.stat.chunk_count += 1;
 
         self.last_chunk = self.chunk_offset;
 
-        match self.store.insert_chunk(&self.chunk_buffer) {
-            Ok((is_duplicate, digest, compressed_size)) => {
+        let chunk = DataChunkBuilder::new(&self.chunk_buffer)
+            .compress(true)
+            .build()?;
+
+        let digest = chunk.digest();
+
+        match self.index.insert_chunk(&chunk) {
+            Ok((is_duplicate, compressed_size)) => {
 
                 self.stat.compressed_size += compressed_size;
                 if is_duplicate {
@@ -460,21 +559,20 @@ impl DynamicIndexWriter {
                 }
 
                 println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size,
-                         (compressed_size*100)/(chunk_size as u64), is_duplicate,  tools::digest_to_hex(&digest));
-                self.writer.write(unsafe { &std::mem::transmute::<u64, [u8;8]>(self.chunk_offset as u64) })?;
-                self.writer.write(&digest)?;
+                         (compressed_size*100)/(chunk_size as u64), is_duplicate,  tools::digest_to_hex(digest));
+                self.index.add_chunk(self.chunk_offset as u64, &digest)?;
                 self.chunk_buffer.truncate(0);
                 return Ok(());
             }
             Err(err) => {
                 self.chunk_buffer.truncate(0);
-                return Err(Error::new(ErrorKind::Other, err.to_string()));
+                return Err(err);
             }
         }
     }
 }
 
-impl Write for DynamicIndexWriter {
+impl Write for DynamicChunkWriter {
 
     fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
 
@@ -486,7 +584,9 @@ impl Write for DynamicIndexWriter {
             self.chunk_buffer.extend(&data[0..pos]);
             self.chunk_offset += pos;
 
-            self.write_chunk_buffer()?;
+            if let Err(err) = self.write_chunk_buffer() {
+                return Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()));
+            }
             Ok(pos)
 
         } else {
@@ -497,9 +597,6 @@ impl Write for DynamicIndexWriter {
     }
 
     fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
-
-        use std::io::{Error, ErrorKind};
-
-        Err(Error::new(ErrorKind::Other, "please use close() instead of flush()"))
+        Err(std::io::Error::new(std::io::ErrorKind::Other, "please use close() instead of flush()"))
     }
 }