git.proxmox.com Git - proxmox-backup.git/commitdiff
src/backup/archive_index.rs: use close() instead of flush()
author    Dietmar Maurer <dietmar@proxmox.com>
          Wed, 2 Jan 2019 10:02:56 +0000 (11:02 +0100)
committer Dietmar Maurer <dietmar@proxmox.com>
          Wed, 2 Jan 2019 10:02:56 +0000 (11:02 +0100)
Also pass the writer to the encoder by mutable reference, so the caller
keeps ownership and can call close() afterwards.
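
In practice the caller now drives the writer's life cycle explicitly: create
the archive writer, hand the encoder a mutable borrow, then commit the result.
Condensed from the backup-client.rs hunk below:

    let mut index = datastore.create_archive_writer(&target, chunk_size)?;
    CaTarEncoder::encode(path, dir, &mut index)?;
    index.close()?; // flush last chunk, sync, atomically rename into place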

src/backup/archive_index.rs
src/backup/chunker.rs
src/bin/backup-client.rs
src/catar/encoder.rs
tests/catar.rs

diff --git a/src/backup/archive_index.rs b/src/backup/archive_index.rs
index 8a5a89cfc29e529b0a48dbf639b0640243247537..b6b7542c8ca1b94d75fb96db6b1a32b6896a990a 100644
@@ -23,6 +23,7 @@ pub struct ArchiveIndexWriter<'a> {
     store: &'a ChunkStore,
     chunker: Chunker,
     file: File,
+    closed: bool,
     filename: PathBuf,
     tmp_filename: PathBuf,
     uuid: [u8; 16],
@@ -71,6 +72,7 @@ impl <'a> ArchiveIndexWriter<'a> {
             store,
             chunker: Chunker::new(chunk_size),
             file: file,
+            closed: false,
             filename: full_path,
             tmp_filename: tmp_path,
             ctime,
@@ -81,6 +83,57 @@ impl <'a> ArchiveIndexWriter<'a> {
             chunk_buffer: Vec::with_capacity(chunk_size*4),
         })
     }
+
+    pub fn close(&mut self) -> Result<(), Error> {
+
+        if self.closed {
+            bail!("cannot close already closed archive index file {:?}", self.filename);
+        }
+
+        self.closed = true;
+
+        self.write_chunk_buffer()?;
+
+        self.file.sync_all()?;
+
+        // fixme:
+
+        if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
+            bail!("Atomic rename file {:?} failed - {}", self.filename, err);
+        }
+
+        Ok(())
+    }
+
+    fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> {
+
+        use std::io::{Error, ErrorKind};
+
+        let chunk_size = self.chunk_buffer.len();
+
+        if chunk_size == 0 { return Ok(()); }
+
+        let expected_chunk_size = self.chunk_offset - self.last_chunk;
+        if expected_chunk_size != self.chunk_buffer.len() {
+            return Err(Error::new(
+                ErrorKind::Other,
+                format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size)));
+        }
+
+        self.last_chunk = self.chunk_offset;
+
+        match self.store.insert_chunk(&self.chunk_buffer) {
+            Ok((is_duplicate, digest)) => {
+                println!("ADD CHUNK {} {} {} {}", self.chunk_offset, chunk_size, is_duplicate, digest_to_hex(&digest));
+                self.chunk_buffer.truncate(0);
+                return Ok(());
+            }
+            Err(err) => {
+                self.chunk_buffer.truncate(0);
+                return Err(Error::new(ErrorKind::Other, err.to_string()));
+            }
+        }
+    }
 }
 
 impl <'a> Write for ArchiveIndexWriter<'a> {
@@ -97,32 +150,13 @@ impl <'a> Write for ArchiveIndexWriter<'a> {
             self.chunk_buffer.extend(&data[0..pos]);
             self.chunk_offset += pos;
 
-            let chunk_size = self.chunk_buffer.len();
-
-            let expected_chunk_size = self.chunk_offset - self.last_chunk;
-            if expected_chunk_size != self.chunk_buffer.len() {
-                panic!("wrong chunk size {} != {}",
-                       expected_chunk_size, chunk_size);
-            }
-
-            self.last_chunk = self.chunk_offset;
-
-            match self.store.insert_chunk(&self.chunk_buffer) {
-                Ok((is_duplicate, digest)) => {
-                    println!("ADD CHUNK {} {} {} {}", self.chunk_offset, chunk_size, is_duplicate,  digest_to_hex(&digest));
-                    self.chunk_buffer.truncate(0);
-                    return Ok(pos);
-                }
-                Err(err) => {
-                    self.chunk_buffer.truncate(0);
-                    return Err(Error::new(ErrorKind::Other, err.to_string()));
-                }
-            }
+            self.write_chunk_buffer()?;
+            Ok(pos)
 
         } else {
             self.chunk_offset += data.len();
             self.chunk_buffer.extend(data);
-            return Ok(data.len());
+            Ok(data.len())
         }
     }
 
@@ -130,21 +164,6 @@ impl <'a> Write for ArchiveIndexWriter<'a> {
 
         use std::io::{Error, ErrorKind};
 
-        let chunk_size = self.chunk_buffer.len();
-
-        if chunk_size == 0 { return Ok(()); }
-
-        // fixme: finalize index, disable further writes
-        match self.store.insert_chunk(&self.chunk_buffer) {
-            Ok((is_duplicate, digest)) => {
-                println!("ADD LAST CHUNK {} {} {} {}", self.last_chunk, chunk_size, is_duplicate,  digest_to_hex(&digest));
-                self.chunk_buffer.truncate(0);
-                Ok(())
-            }
-            Err(err) => {
-                self.chunk_buffer.truncate(0);
-                Err(Error::new(ErrorKind::Other, err.to_string()))
-            }
-        }
+        Err(Error::new(ErrorKind::Other, "please use close() instead of flush()"))
     }
 }
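
The new close() commits the index with a write-to-temp-then-rename scheme:
flush the remaining chunk buffer, sync_all() the file, then atomically rename
the temporary file into place; flush() now fails on purpose so a half-written
index can never be mistaken for a committed one. A minimal standalone sketch
of that pattern (hypothetical helper, not the proxmox-backup API):

    use std::fs::File;
    use std::io::Write;
    use std::path::Path;

    fn commit_atomically(tmp: &Path, dst: &Path, data: &[u8]) -> std::io::Result<()> {
        let mut file = File::create(tmp)?;
        file.write_all(data)?;
        file.sync_all()?;           // data must be durable before the rename
        std::fs::rename(tmp, dst)?; // atomic within a single filesystem
        Ok(())
    }
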
diff --git a/src/backup/chunker.rs b/src/backup/chunker.rs
index f652deb0393914c41a203137a61d74340e3cbf7b..eee2dd794fea817a04267a5742b13e9287746895 100644
@@ -24,9 +24,6 @@ pub struct Chunker {
     discriminator: u32,
 
     window: [u8; CA_CHUNKER_WINDOW_SIZE],
-
-    offset: usize, // only used for debug
-    last_offset: usize, // only used for debug
 }
 
 const BUZHASH_TABLE: [u32; 256] = [
@@ -122,8 +119,6 @@ impl Chunker {
             chunk_size_avg: chunk_size_avg,
             discriminator:  discriminator,
             window: [0u8; CA_CHUNKER_WINDOW_SIZE],
-            offset: 0,
-            last_offset: 0,
         }
     }
 
diff --git a/src/bin/backup-client.rs b/src/bin/backup-client.rs
index d807f46ed0906a9264a72b22a769d9d1cf79d807..5c728aa53d176c00875fa7cdb01c97ed155b59c7 100644
@@ -36,18 +36,13 @@ fn backup_dir(
         target.set_extension("aidx");
     }
 
-    // fixme: implement chunked writer
-    // let writer = std::fs::OpenOptions::new()
-    //    .create(true)
-    //    .write(true)
-    //    .truncate(true)
-    //    .open("mytest.catar")?;
-
-    let index = datastore.create_archive_writer(&target, chunk_size)?;
+    let mut index = datastore.create_archive_writer(&target, chunk_size)?;
 
     let path = std::path::PathBuf::from(path);
 
-    CaTarEncoder::encode(path, dir, index)?;
+    CaTarEncoder::encode(path, dir, &mut index)?;
+
+    index.close()?; // commit changes
 
     Ok(())
 }
diff --git a/src/catar/encoder.rs b/src/catar/encoder.rs
index df8cac688138a9e1d16b7ce0dd0bfd62c23c4479..0e4abcc64acfc7c08f2e417a03a0a486cde0ed38 100644
@@ -26,18 +26,17 @@ use nix::sys::stat::FileStat;
 /// maximum memory usage.
 pub const MAX_DIRECTORY_ENTRIES: usize = 256*1024;
 
-pub struct CaTarEncoder<W: Write> {
+pub struct CaTarEncoder<'a, W: Write> {
     current_path: PathBuf, // used for error reporting
-    writer: W,
+    writer: &'a mut W,
     writer_pos: usize,
     size: usize,
     file_copy_buffer: Vec<u8>,
 }
 
+impl <'a, W: Write> CaTarEncoder<'a, W> {
 
-impl <W: Write> CaTarEncoder<W> {
-
-    pub fn encode(path: PathBuf, dir: &mut nix::dir::Dir, writer: W) -> Result<(), Error> {
+    pub fn encode(path: PathBuf, dir: &mut nix::dir::Dir, writer: &'a mut W) -> Result<(), Error> {
 
         const FILE_COPY_BUFFER_SIZE: usize = 1024*1024;
 
@@ -55,8 +54,6 @@ impl <W: Write> CaTarEncoder<W> {
         // todo: use scandirat??
         me.encode_dir(dir)?;
 
-        me.writer.flush()?;
-
         Ok(())
     }
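
Borrowing the writer (&'a mut W) instead of consuming it is what makes the
explicit close() call possible: the encoder only uses the writer while
encoding, and ownership stays with the caller afterwards. A minimal sketch of
the borrowing pattern (hypothetical Encoder type, not the CaTarEncoder API):

    use std::io::Write;

    struct Encoder<'a, W: Write> {
        writer: &'a mut W,
    }

    impl<'a, W: Write> Encoder<'a, W> {
        fn encode(writer: &'a mut W, data: &[u8]) -> std::io::Result<()> {
            let me = Encoder { writer };
            me.writer.write_all(data) // uses, but never owns, the writer
        }
    }

    fn main() -> std::io::Result<()> {
        let mut buf: Vec<u8> = Vec::new();      // any Write works
        Encoder::encode(&mut buf, b"payload")?; // borrow ends here
        assert_eq!(buf, b"payload");            // caller still owns buf
        Ok(())
    }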
 
diff --git a/tests/catar.rs b/tests/catar.rs
index 6b274ce0b288ffeba928c3234231576053d9316d..cca9b94f0f755194df717cf17656cac6459dcc3a 100644
@@ -14,7 +14,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         .status()
         .expect("failed to execute casync");
 
-    let writer = std::fs::OpenOptions::new()
+    let mut writer = std::fs::OpenOptions::new()
         .create(true)
         .write(true)
         .truncate(true)
@@ -26,7 +26,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
 
     let path = std::path::PathBuf::from(dir_name);
 
-    CaTarEncoder::encode(path, &mut dir, writer)?;
+    CaTarEncoder::encode(path, &mut dir, &mut writer)?;
 
     Command::new("cmp")
         .arg("--verbose")