store: &'a ChunkStore,
chunker: Chunker,
file: File,
+ closed: bool,
filename: PathBuf,
tmp_filename: PathBuf,
uuid: [u8; 16],
store,
chunker: Chunker::new(chunk_size),
file: file,
+ closed: false,
filename: full_path,
tmp_filename: tmp_path,
ctime,
chunk_buffer: Vec::with_capacity(chunk_size*4),
})
}
+
+ pub fn close(&mut self) -> Result<(), Error> {
+
+ if self.closed {
+ bail!("cannot close already closed archive index file {:?}", self.filename);
+ }
+
+ self.closed = true;
+
+ self.write_chunk_buffer()?;
+
+ self.file.sync_all()?;
+
+ // fixme:
+
+ if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
+ bail!("Atomic rename file {:?} failed - {}", self.filename, err);
+ }
+
+ Ok(())
+ }
+
+ fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> {
+
+ use std::io::{Error, ErrorKind};
+
+ let chunk_size = self.chunk_buffer.len();
+
+ if chunk_size == 0 { return Ok(()); }
+
+ let expected_chunk_size = self.chunk_offset - self.last_chunk;
+ if expected_chunk_size != self.chunk_buffer.len() {
+ return Err(Error::new(
+ ErrorKind::Other,
+ format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size)));
+ }
+
+ self.last_chunk = self.chunk_offset;
+
+ match self.store.insert_chunk(&self.chunk_buffer) {
+ Ok((is_duplicate, digest)) => {
+ println!("ADD CHUNK {} {} {} {}", self.chunk_offset, chunk_size, is_duplicate, digest_to_hex(&digest));
+ self.chunk_buffer.truncate(0);
+ return Ok(());
+ }
+ Err(err) => {
+ self.chunk_buffer.truncate(0);
+ return Err(Error::new(ErrorKind::Other, err.to_string()));
+ }
+ }
+
+ Ok(())
+ }
}
impl <'a> Write for ArchiveIndexWriter<'a> {
self.chunk_buffer.extend(&data[0..pos]);
self.chunk_offset += pos;
- let chunk_size = self.chunk_buffer.len();
-
- let expected_chunk_size = self.chunk_offset - self.last_chunk;
- if expected_chunk_size != self.chunk_buffer.len() {
- panic!("wrong chunk size {} != {}",
- expected_chunk_size, chunk_size);
- }
-
- self.last_chunk = self.chunk_offset;
-
- match self.store.insert_chunk(&self.chunk_buffer) {
- Ok((is_duplicate, digest)) => {
- println!("ADD CHUNK {} {} {} {}", self.chunk_offset, chunk_size, is_duplicate, digest_to_hex(&digest));
- self.chunk_buffer.truncate(0);
- return Ok(pos);
- }
- Err(err) => {
- self.chunk_buffer.truncate(0);
- return Err(Error::new(ErrorKind::Other, err.to_string()));
- }
- }
+ self.write_chunk_buffer()?;
+ Ok(pos)
} else {
self.chunk_offset += data.len();
self.chunk_buffer.extend(data);
- return Ok(data.len());
+ Ok(data.len())
}
}
use std::io::{Error, ErrorKind};
- let chunk_size = self.chunk_buffer.len();
-
- if chunk_size == 0 { return Ok(()); }
-
- // fixme: finalize index, disable further writes
- match self.store.insert_chunk(&self.chunk_buffer) {
- Ok((is_duplicate, digest)) => {
- println!("ADD LAST CHUNK {} {} {} {}", self.last_chunk, chunk_size, is_duplicate, digest_to_hex(&digest));
- self.chunk_buffer.truncate(0);
- Ok(())
- }
- Err(err) => {
- self.chunk_buffer.truncate(0);
- Err(Error::new(ErrorKind::Other, err.to_string()))
- }
- }
+ Err(Error::new(ErrorKind::Other, "please use close() instead of flush()"))
}
}
target.set_extension("aidx");
}
- // fixme: implement chunked writer
- // let writer = std::fs::OpenOptions::new()
- // .create(true)
- // .write(true)
- // .truncate(true)
- // .open("mytest.catar")?;
-
- let index = datastore.create_archive_writer(&target, chunk_size)?;
+ let mut index = datastore.create_archive_writer(&target, chunk_size)?;
let path = std::path::PathBuf::from(path);
- CaTarEncoder::encode(path, dir, index)?;
+ CaTarEncoder::encode(path, dir, &mut index)?;
+
+ index.close()?; // commit changes
Ok(())
}
/// maximum memory usage.
pub const MAX_DIRECTORY_ENTRIES: usize = 256*1024;
-pub struct CaTarEncoder<W: Write> {
+pub struct CaTarEncoder<'a, W: Write> {
current_path: PathBuf, // used for error reporting
- writer: W,
+ writer: &'a mut W,
writer_pos: usize,
size: usize,
file_copy_buffer: Vec<u8>,
}
+impl <'a, W: Write> CaTarEncoder<'a, W> {
-impl <W: Write> CaTarEncoder<W> {
-
- pub fn encode(path: PathBuf, dir: &mut nix::dir::Dir, writer: W) -> Result<(), Error> {
+ pub fn encode(path: PathBuf, dir: &mut nix::dir::Dir, writer: &'a mut W) -> Result<(), Error> {
const FILE_COPY_BUFFER_SIZE: usize = 1024*1024;
// todo: use scandirat??
me.encode_dir(dir)?;
- me.writer.flush()?;
-
Ok(())
}