use failure::*;
-use std::sync::Arc;
use std::ffi::{CStr, CString};
-use std::io::{Read, BufRead, BufReader, Write, Seek};
+use std::os::unix::ffi::OsStringExt;
+use std::convert::TryInto;
+use std::io::{Read, Write, Seek, SeekFrom};
use std::convert::TryFrom;
use chrono::offset::{TimeZone, Local};
use crate::pxar::catalog::{BackupCatalogWriter, CatalogEntryType};
-use super::{DataBlobWriter, DataBlobReader, CryptConfig};
+enum DirEntry {
+ Directory { name: Vec<u8>, start: u64 },
+ File { name: Vec<u8>, size: u64, mtime: u64 },
+ Symlink { name: Vec<u8> },
+ Hardlink { name: Vec<u8> },
+ BlockDevice { name: Vec<u8> },
+ CharDevice { name: Vec<u8> },
+ Fifo { name: Vec<u8> },
+ Socket { name: Vec<u8> },
+}
-pub struct CatalogBlobWriter<W: Write + Seek> {
- writer: DataBlobWriter<W>,
- level: usize,
- pos: u64,
+struct DirInfo {
+ name: CString,
+ entries: Vec<DirEntry>,
}
-impl <W: Write + Seek> CatalogBlobWriter<W> {
- pub fn new_compressed(writer: W) -> Result<Self, Error> {
- let writer = DataBlobWriter::new_compressed(writer)?;
- Ok(Self { writer, level: 0, pos: 0 })
+impl DirInfo {
+
+ fn new(name: CString) -> Self {
+ DirInfo { name, entries: Vec::new() }
+ }
+
+ fn new_rootdir() -> Self {
+ DirInfo::new(CString::new(b"/".to_vec()).unwrap())
+ }
+
+ fn encode_entry(data: &mut Vec<u8>, entry: &DirEntry, pos: u64) {
+ match entry {
+ DirEntry::Directory { name, start } => {
+ data.push(CatalogEntryType::Directory as u8);
+ data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+ data.extend_from_slice(name);
+ data.extend_from_slice(&(pos-start).to_le_bytes());
+ }
+ DirEntry::File { name, size, mtime } => {
+ data.push(CatalogEntryType::File as u8);
+ data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+ data.extend_from_slice(name);
+ data.extend_from_slice(&size.to_le_bytes());
+ data.extend_from_slice(&mtime.to_le_bytes());
+ }
+ DirEntry::Symlink { name } => {
+ data.push(CatalogEntryType::Symlink as u8);
+ data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+ data.extend_from_slice(name);
+ }
+ DirEntry::Hardlink { name } => {
+ data.push(CatalogEntryType::Hardlink as u8);
+ data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+ data.extend_from_slice(name);
+ }
+ DirEntry::BlockDevice { name } => {
+ data.push(CatalogEntryType::BlockDevice as u8);
+ data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+ data.extend_from_slice(name);
+ }
+ DirEntry::CharDevice { name } => {
+ data.push(CatalogEntryType::CharDevice as u8);
+ data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+ data.extend_from_slice(name);
+ }
+ DirEntry::Fifo { name } => {
+ data.push(CatalogEntryType::Fifo as u8);
+ data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+ data.extend_from_slice(name);
+ }
+ DirEntry::Socket { name } => {
+ data.push(CatalogEntryType::Socket as u8);
+ data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+ data.extend_from_slice(name);
+ }
+ }
}
- pub fn new_signed_compressed(writer: W, config: Arc<CryptConfig>) -> Result<Self, Error> {
- let writer = DataBlobWriter::new_signed_compressed(writer, config)?;
- Ok(Self { writer, level: 0, pos: 0 })
+
+ fn encode(self, start: u64) -> Result<(CString, Vec<u8>), Error> {
+ let mut table = Vec::new();
+ let count: u32 = self.entries.len().try_into()?;
+ for entry in self.entries {
+ Self::encode_entry(&mut table, &entry, start);
+ }
+
+ let data = Vec::new();
+ let mut writer = std::io::Cursor::new(data);
+ let size: u32 = (4 + 4 + table.len()).try_into()?;
+ writer.write_all(&size.to_le_bytes())?;
+ writer.write_all(&count.to_le_bytes())?;
+ writer.write_all(&table)?;
+ Ok((self.name, writer.into_inner()))
}
- pub fn new_encrypted_compressed(writer: W, config: Arc<CryptConfig>) -> Result<Self, Error> {
- let writer = DataBlobWriter::new_encrypted_compressed(writer, config)?;
- Ok(Self { writer, level: 0, pos: 0 })
+}
+
+pub struct CatalogWriter<W> {
+ writer: W,
+ dirstack: Vec<DirInfo>,
+ pos: u64,
+}
+
+impl <W: Write> CatalogWriter<W> {
+
+ pub fn new(writer: W) -> Result<Self, Error> {
+ Ok(Self { writer, dirstack: vec![ DirInfo::new_rootdir() ], pos: 0 })
}
- pub fn finish(self) -> Result<W, Error> {
- self.writer.finish()
+
+ pub fn finish(&mut self) -> Result<(), Error> {
+ if self.dirstack.len() != 1 {
+ bail!("unable to finish catalog at level {}", self.dirstack.len());
+ }
+
+ let dir = self.dirstack.pop().unwrap();
+
+ let start = self.pos;
+ let (_, data) = dir.encode(start)?;
+ self.write_all(&data)?;
+
+ self.write_all(&start.to_le_bytes())?;
+
+ self.writer.flush()?;
+
+ Ok(())
}
}
-impl <W: Write + Seek> BackupCatalogWriter for CatalogBlobWriter<W> {
+impl <W: Write> BackupCatalogWriter for CatalogWriter<W> {
fn start_directory(&mut self, name: &CStr) -> Result<(), Error> {
- self.write_all(&[CatalogEntryType::Directory as u8])?;
- self.write_all(name.to_bytes_with_nul())?;
- self.write_all(b"{")?;
- self.level += 1;
+ let new = DirInfo::new(name.to_owned());
+ self.dirstack.push(new);
Ok(())
}
fn end_directory(&mut self) -> Result<(), Error> {
- if self.level == 0 {
- bail!("got unexpected end_directory level 0");
- }
- self.write_all(b"}")?;
- self.level -= 1;
+ let (start, name) = match self.dirstack.pop() {
+ Some(dir) => {
+ let start = self.pos;
+ let (name, data) = dir.encode(start)?;
+ self.write_all(&data)?;
+ (start, name)
+ }
+ None => {
+ bail!("got unexpected end_directory level 0");
+ }
+ };
+
+ let current = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+ let name = name.to_bytes().to_vec();
+ current.entries.push(DirEntry::Directory { name, start });
+
Ok(())
}
fn add_file(&mut self, name: &CStr, size: u64, mtime: u64) -> Result<(), Error> {
- self.write_all(&[CatalogEntryType::File as u8])?;
- self.write_all(&size.to_le_bytes())?;
- self.write_all(&mtime.to_le_bytes())?;
- self.write_all(name.to_bytes_with_nul())?;
+ let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+ let name = name.to_bytes().to_vec();
+ dir.entries.push(DirEntry::File { name, size, mtime });
Ok(())
}
fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> {
- self.write_all(&[CatalogEntryType::Symlink as u8])?;
- self.write_all(name.to_bytes_with_nul())?;
+ let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+ let name = name.to_bytes().to_vec();
+ dir.entries.push(DirEntry::Symlink { name });
Ok(())
}
fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> {
- self.write_all(&[CatalogEntryType::Hardlink as u8])?;
- self.write_all(name.to_bytes_with_nul())?;
+ let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+ let name = name.to_bytes().to_vec();
+ dir.entries.push(DirEntry::Hardlink { name });
Ok(())
}
fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> {
- self.write_all(&[CatalogEntryType::BlockDevice as u8])?;
- self.write_all(name.to_bytes_with_nul())?;
- Ok(())
+ let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+ let name = name.to_bytes().to_vec();
+ dir.entries.push(DirEntry::BlockDevice { name });
+ Ok(())
}
fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> {
- self.write_all(&[CatalogEntryType::CharDevice as u8])?;
- self.write_all(name.to_bytes_with_nul())?;
+ let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+ let name = name.to_bytes().to_vec();
+ dir.entries.push(DirEntry::CharDevice { name });
Ok(())
}
fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> {
- self.write_all(&[CatalogEntryType::Fifo as u8])?;
- self.write_all(name.to_bytes_with_nul())?;
+ let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+ let name = name.to_bytes().to_vec();
+ dir.entries.push(DirEntry::Fifo { name });
Ok(())
}
fn add_socket(&mut self, name: &CStr) -> Result<(), Error> {
- self.write_all(&[CatalogEntryType::Socket as u8])?;
- self.write_all(name.to_bytes_with_nul())?;
+ let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+ let name = name.to_bytes().to_vec();
+ dir.entries.push(DirEntry::Socket { name });
Ok(())
}
}
-impl<W: Write + Seek> CatalogBlobWriter<W> {
+impl<W: Write> CatalogWriter<W> {
fn write_all(&mut self, data: &[u8]) -> Result<(), Error> {
self.writer.write_all(data)?;
self.pos += u64::try_from(data.len())?;
}
}
-pub struct CatalogBlobReader<R: Read + BufRead> {
- reader: BufReader<DataBlobReader<R>>,
- dir_stack: Vec<CString>,
+// fixme: move to somewhere else?
+/// Implement Write to tokio mpsc channel Sender
+pub struct SenderWriter(tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>);
+
+impl SenderWriter {
+ pub fn new(sender: tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>) -> Self {
+ Self(sender)
+ }
}
-impl <R: Read + BufRead> CatalogBlobReader<R> {
+impl Write for SenderWriter {
+ fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+ futures::executor::block_on(async move {
+ self.0.send(Ok(buf.to_vec())).await
+ .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
+ Ok(buf.len())
+ })
+ }
- pub fn new(reader: R, crypt_config: Option<Arc<CryptConfig>>) -> Result<Self, Error> {
- let dir_stack = Vec::new();
+ fn flush(&mut self) -> Result<(), std::io::Error> {
+ Ok(())
+ }
+}
- let reader = BufReader::new(DataBlobReader::new(reader, crypt_config)?);
+pub struct CatalogReader<R> {
+ reader: R,
+}
- Ok(Self { reader, dir_stack })
- }
+impl <R: Read + Seek> CatalogReader<R> {
- fn read_filename(&mut self) -> Result<std::ffi::CString, Error> {
- let mut filename = Vec::new();
- self.reader.read_until(0u8, &mut filename)?;
- if filename.len() > 0 && filename[filename.len()-1] == 0u8 {
- filename.pop();
- }
- if filename.len() == 0 {
- bail!("got zero length filename");
- }
- if filename.iter().find(|b| **b == b'/').is_some() {
- bail!("found invalid filename with slashes.");
- }
- Ok(unsafe { CString::from_vec_unchecked(filename) })
+ pub fn new(reader: R) -> Self {
+ Self { reader }
}
- fn next_byte(&mut self) -> Result<u8, std::io::Error> {
+ fn next_byte<C: Read>(mut reader: C) -> Result<u8, std::io::Error> {
let mut buf = [0u8; 1];
- self.reader.read_exact(&mut buf)?;
+ reader.read_exact(&mut buf)?;
Ok(buf[0])
}
- fn expect_next(&mut self, expect: u8) -> Result<(), Error> {
- let next = self.next_byte()?;
- if next != expect {
- bail!("got unexpected byte ({} != {})", next, expect);
- }
- Ok(())
- }
+ pub fn dump(&mut self) -> Result<(), Error> {
- fn print_entry(&self, etype: CatalogEntryType, filename: &CStr, size: u64, mtime: u64) -> Result<(), Error> {
- let mut out = Vec::new();
+ self.reader.seek(SeekFrom::End(-8))?;
- write!(out, "{} ", char::from(etype as u8))?;
+ let start = unsafe { self.reader.read_le_value::<u64>()? };
- for name in &self.dir_stack {
- out.extend(name.to_bytes());
- out.push(b'/');
- }
+ self.dump_dir(std::path::Path::new("./"), start)
+ }
- out.extend(filename.to_bytes());
+ pub fn dump_dir(&mut self, prefix: &std::path::Path, start: u64) -> Result<(), Error> {
- let dt = Local.timestamp(mtime as i64, 0);
+ self.reader.seek(SeekFrom::Start(start))?;
- if etype == CatalogEntryType::File {
- write!(out, " {} {}", size, dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, false))?;
- }
+ let size = unsafe { self.reader.read_le_value::<u32>()? } as usize;
- writeln!(out)?;
- std::io::stdout().write_all(&out)?;
+ if size < 8 { bail!("got small directory size {}", size) };
- Ok(())
- }
+ let data = self.reader.read_exact_allocated(size - 4)?;
- fn parse_entries(&mut self) -> Result<(), Error> {
+ let mut cursor = &data[..];
- loop {
- let etype = match self.next_byte() {
- Ok(v) => v,
- Err(err) => {
- if err.kind() == std::io::ErrorKind::UnexpectedEof && self.dir_stack.len() == 0 {
- break;
- }
+ let entries = unsafe { cursor.read_le_value::<u32>()? };
- return Err(err.into());
- }
- };
- if etype == b'}' {
- if self.dir_stack.pop().is_none() {
- bail!("got unexpected '}'");
- }
- break;
- }
+ //println!("TEST {} {} size {}", start, entries, size);
+
+ for _ in 0..entries {
+ let etype = CatalogEntryType::try_from(Self::next_byte(&mut cursor)?)?;
+ let name_len = unsafe { cursor.read_le_value::<u32>()? };
+ let name = cursor.read_exact_allocated(name_len as usize)?;
+
+ let mut path = std::path::PathBuf::from(prefix);
+ path.push(std::ffi::OsString::from_vec(name));
- let etype = CatalogEntryType::try_from(etype)?;
match etype {
CatalogEntryType::Directory => {
- let filename = self.read_filename()?;
- self.print_entry(etype.into(), &filename, 0, 0)?;
- self.dir_stack.push(filename);
- self.expect_next(b'{')?;
- self.parse_entries()?;
+ println!("{} {:?}", char::from(etype as u8), path);
+ let offset = unsafe { cursor.read_le_value::<u64>()? };
+ if offset > start {
+ bail!("got wrong directory offset ({} > {})", offset, start);
+ }
+ let pos = start - offset;
+ self.dump_dir(&path, pos)?;
}
CatalogEntryType::File => {
- let size = unsafe { self.reader.read_le_value::<u64>()? };
- let mtime = unsafe { self.reader.read_le_value::<u64>()? };
- let filename = self.read_filename()?;
- self.print_entry(etype.into(), &filename, size, mtime)?;
+ let size = unsafe { cursor.read_le_value::<u64>()? };
+ let mtime = unsafe { cursor.read_le_value::<u64>()? };
+
+ let dt = Local.timestamp(mtime as i64, 0);
+
+ println!("{} {:?} {} {}",
+ char::from(etype as u8),
+ path,
+ size,
+ dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, false)
+ );
}
- CatalogEntryType::Symlink |
- CatalogEntryType::Hardlink |
- CatalogEntryType::Fifo |
- CatalogEntryType::Socket |
- CatalogEntryType::BlockDevice |
- CatalogEntryType::CharDevice => {
- let filename = self.read_filename()?;
- self.print_entry(etype.into(), &filename, 0, 0)?;
+ _ => {
+ println!("{} {:?}", char::from(etype as u8), path);
}
}
}
- Ok(())
- }
- pub fn dump(&mut self) -> Result<(), Error> {
- self.parse_entries()?;
Ok(())
}
+
}
use std::path::{Path, PathBuf};
use std::collections::{HashSet, HashMap};
use std::ffi::OsStr;
-use std::io::{BufReader, Read, Write, Seek, SeekFrom};
+use std::io::{Read, Write, Seek, SeekFrom};
use std::os::unix::fs::OpenOptionsExt;
use proxmox::tools::fs::{file_get_contents, file_get_json, file_set_contents, image_size};
verbose: bool,
skip_lost_and_found: bool,
crypt_config: Option<Arc<CryptConfig>>,
- catalog: Arc<Mutex<CatalogBlobWriter<std::fs::File>>>,
+ catalog: Arc<Mutex<CatalogWriter<SenderWriter>>>,
) -> Result<BackupStats, Error> {
let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), device_set, verbose, skip_lost_and_found, catalog)?;
true,
).await?;
+ let tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .read(true)
+ .custom_flags(libc::O_TMPFILE)
+ .open("/tmp")?;
+
let manifest = client.download_manifest().await?;
- let blob_file = std::fs::OpenOptions::new()
- .read(true)
+ let tmpfile = client.download(CATALOG_NAME, tmpfile).await?;
+
+ let index = DynamicIndexReader::new(tmpfile)
+ .map_err(|err| format_err!("unable to read catalog index - {}", err))?;
+
+ // Note: do not use values stored in index (not trusted) - instead, compute them again
+ let (csum, size) = index.compute_csum();
+ manifest.verify_file(CATALOG_NAME, &csum, size)?;
+
+ let most_used = index.find_most_used_chunks(8);
+
+ let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
+
+ let mut reader = BufferedDynamicReader::new(index, chunk_reader);
+
+ let mut catalogfile = std::fs::OpenOptions::new()
.write(true)
+ .read(true)
.custom_flags(libc::O_TMPFILE)
.open("/tmp")?;
- let mut blob_file = client.download(CATALOG_BLOB_NAME, blob_file).await?;
-
- let (csum, size) = compute_file_csum(&mut blob_file)?;
- manifest.verify_file(CATALOG_BLOB_NAME, &csum, size)?;
+ std::io::copy(&mut reader, &mut catalogfile)
+ .map_err(|err| format_err!("unable to download catalog - {}", err))?;
- blob_file.seek(SeekFrom::Start(0))?;
+ catalogfile.seek(SeekFrom::Start(0))?;
- let reader = BufReader::new(blob_file);
- let mut catalog_reader = CatalogBlobReader::new(reader, crypt_config)?;
+ let mut catalog_reader = CatalogReader::new(catalogfile);
catalog_reader.dump()?;
bail!("unable to parse directory specification '{}'", value);
}
+fn spawn_catalog_upload(
+ client: Arc<BackupWriter>,
+ crypt_config: Option<Arc<CryptConfig>>,
+) -> Result<
+ (
+ Arc<Mutex<CatalogWriter<SenderWriter>>>,
+ tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>
+ ), Error>
+{
+ let (catalog_tx, catalog_rx) = mpsc::channel(10); // allow to buffer 10 writes
+ let catalog_stream = catalog_rx.map_err(Error::from);
+ let catalog_chunk_size = 512*1024;
+ let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
+
+ let catalog = Arc::new(Mutex::new(CatalogWriter::new(SenderWriter::new(catalog_tx))?));
+
+ let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
+
+ tokio::spawn(async move {
+ let catalog_upload_result = client
+ .upload_stream(CATALOG_NAME, catalog_chunk_stream, "dynamic", None, crypt_config)
+ .await;
+
+ if let Err(ref err) = catalog_upload_result {
+ eprintln!("catalog upload error - {}", err);
+ client.cancel();
+ }
+
+ let _ = catalog_result_tx.send(catalog_upload_result);
+ });
+
+ Ok((catalog, catalog_result_rx))
+}
+
fn create_backup(
param: Value,
_info: &ApiMethod,
enum BackupType { PXAR, IMAGE, CONFIG, LOGFILE };
+ let mut upload_catalog = false;
+
for backupspec in backupspec_list {
let (target, filename) = parse_backupspec(backupspec.as_str().unwrap())?;
bail!("got unexpected file type (expected directory)");
}
upload_list.push((BackupType::PXAR, filename.to_owned(), format!("{}.didx", target), 0));
+ upload_catalog = true;
}
"img" => {
let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
let mut manifest = BackupManifest::new(snapshot);
- // fixme: encrypt/sign catalog?
- let catalog_file = std::fs::OpenOptions::new()
- .write(true)
- .read(true)
- .custom_flags(libc::O_TMPFILE)
- .open("/tmp")?;
-
- let catalog = Arc::new(Mutex::new(CatalogBlobWriter::new_compressed(catalog_file)?));
- let mut upload_catalog = false;
+ let (catalog, catalog_result_rx) = spawn_catalog_upload(client.clone(), crypt_config.clone())?;
for (backup_type, filename, target, size) in upload_list {
match backup_type {
manifest.add_file(target, stats.size, stats.csum);
}
BackupType::PXAR => {
- upload_catalog = true;
println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
let stats = backup_directory(
if upload_catalog {
let mutex = Arc::try_unwrap(catalog)
.map_err(|_| format_err!("unable to get catalog (still used)"))?;
- let mut catalog_file = mutex.into_inner().unwrap().finish()?;
+ let mut catalog = mutex.into_inner().unwrap();
+
+ catalog.finish()?;
- let target = CATALOG_BLOB_NAME;
+ drop(catalog); // close upload stream
- catalog_file.seek(SeekFrom::Start(0))?;
+ let stats = catalog_result_rx.await??;
- let stats = client.upload_blob(catalog_file, target).await?;
- manifest.add_file(target.to_owned(), stats.size, stats.csum);
+ manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum);
}
if let Some(rsa_encrypted_key) = rsa_encrypted_key {