]> git.proxmox.com Git - proxmox-backup.git/commitdiff
change catalog format, use dynamic index to store catalog.
authorDietmar Maurer <dietmar@proxmox.com>
Fri, 8 Nov 2019 09:35:48 +0000 (10:35 +0100)
committerDietmar Maurer <dietmar@proxmox.com>
Fri, 8 Nov 2019 09:35:48 +0000 (10:35 +0100)
In order to remove the size restriction of a single blob.

src/backup.rs
src/backup/catalog_blob.rs
src/bin/proxmox-backup-client.rs
src/client/pxar_backup_stream.rs

index 905f60b7e66aa6b65f8c92c6027ab27103b2f67f..0cb9ad7554a86decfe7310eb747a76f9c87a811b 100644 (file)
 //!
 //! Not sure if this is better. TODO
 
-pub const CATALOG_BLOB_NAME: &str = "catalog.blob";
+// Note: .pcat1 => Proxmox Catalog Format version 1
+pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
 
 #[macro_export]
 macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
index 478c1a1fec8e3c4f2e428f659cffd58621b9f030..ac6898e3e4754e6c6dfe8b13107e9bd835c2af24 100644 (file)
@@ -1,7 +1,8 @@
 use failure::*;
-use std::sync::Arc;
 use std::ffi::{CStr, CString};
-use std::io::{Read, BufRead, BufReader, Write, Seek};
+use std::os::unix::ffi::OsStringExt;
+use std::convert::TryInto;
+use std::io::{Read, Write, Seek, SeekFrom};
 use std::convert::TryFrom;
 
 use chrono::offset::{TimeZone, Local};
@@ -10,97 +11,207 @@ use proxmox::tools::io::ReadExt;
 
 use crate::pxar::catalog::{BackupCatalogWriter, CatalogEntryType};
 
-use super::{DataBlobWriter, DataBlobReader, CryptConfig};
+enum DirEntry {
+    Directory { name: Vec<u8>, start: u64 },
+    File { name: Vec<u8>, size: u64, mtime: u64 },
+    Symlink { name: Vec<u8> },
+    Hardlink { name: Vec<u8> },
+    BlockDevice { name: Vec<u8> },
+    CharDevice { name: Vec<u8> },
+    Fifo { name: Vec<u8> },
+    Socket { name: Vec<u8> },
+}
 
-pub struct CatalogBlobWriter<W: Write + Seek> {
-    writer: DataBlobWriter<W>,
-    level: usize,
-    pos: u64,
+struct DirInfo {
+    name: CString,
+    entries: Vec<DirEntry>,
 }
 
-impl <W: Write + Seek> CatalogBlobWriter<W> {
-    pub fn new_compressed(writer: W) -> Result<Self, Error> {
-        let writer = DataBlobWriter::new_compressed(writer)?;
-        Ok(Self { writer, level: 0, pos: 0 })
+impl DirInfo {
+
+    fn new(name: CString) -> Self {
+        DirInfo { name, entries: Vec::new() }
+    }
+
+    fn new_rootdir() -> Self {
+        DirInfo::new(CString::new(b"/".to_vec()).unwrap())
+    }
+
+    fn encode_entry(data: &mut Vec<u8>, entry: &DirEntry, pos: u64) {
+        match entry {
+            DirEntry::Directory { name, start } => {
+                data.push(CatalogEntryType::Directory as u8);
+                data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+                data.extend_from_slice(name);
+                data.extend_from_slice(&(pos-start).to_le_bytes());
+            }
+            DirEntry::File { name, size, mtime } => {
+                data.push(CatalogEntryType::File as u8);
+                data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+                data.extend_from_slice(name);
+                data.extend_from_slice(&size.to_le_bytes());
+                data.extend_from_slice(&mtime.to_le_bytes());
+            }
+            DirEntry::Symlink { name } => {
+                data.push(CatalogEntryType::Symlink as u8);
+                data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+                data.extend_from_slice(name);
+            }
+            DirEntry::Hardlink { name } => {
+                data.push(CatalogEntryType::Hardlink as u8);
+                data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+                data.extend_from_slice(name);
+            }
+            DirEntry::BlockDevice { name } => {
+                data.push(CatalogEntryType::BlockDevice as u8);
+                data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+                data.extend_from_slice(name);
+            }
+             DirEntry::CharDevice { name } => {
+                data.push(CatalogEntryType::CharDevice as u8);
+                data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+                data.extend_from_slice(name);
+            }
+            DirEntry::Fifo { name } => {
+                data.push(CatalogEntryType::Fifo as u8);
+                data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+                data.extend_from_slice(name);
+            }
+            DirEntry::Socket { name } => {
+                data.push(CatalogEntryType::Socket as u8);
+                data.extend_from_slice(&(name.len() as u32).to_le_bytes());
+                data.extend_from_slice(name);
+            }
+        }
     }
-    pub fn new_signed_compressed(writer: W, config: Arc<CryptConfig>) -> Result<Self, Error> {
-        let writer = DataBlobWriter::new_signed_compressed(writer, config)?;
-        Ok(Self { writer, level: 0, pos: 0 })
+
+    fn encode(self, start: u64) -> Result<(CString, Vec<u8>), Error> {
+        let mut table = Vec::new();
+        let count: u32 = self.entries.len().try_into()?;
+        for entry in self.entries {
+            Self::encode_entry(&mut table, &entry, start);
+        }
+
+        let data = Vec::new();
+        let mut writer = std::io::Cursor::new(data);
+        let size: u32 = (4 + 4 + table.len()).try_into()?;
+        writer.write_all(&size.to_le_bytes())?;
+        writer.write_all(&count.to_le_bytes())?;
+        writer.write_all(&table)?;
+        Ok((self.name, writer.into_inner()))
     }
-    pub fn new_encrypted_compressed(writer: W, config: Arc<CryptConfig>) -> Result<Self, Error> {
-        let writer = DataBlobWriter::new_encrypted_compressed(writer, config)?;
-        Ok(Self { writer, level: 0, pos: 0 })
+}
+
+pub struct CatalogWriter<W> {
+    writer: W,
+    dirstack: Vec<DirInfo>,
+    pos: u64,
+}
+
+impl <W: Write> CatalogWriter<W> {
+
+    pub fn new(writer: W) -> Result<Self, Error> {
+        Ok(Self { writer, dirstack: vec![ DirInfo::new_rootdir() ], pos: 0 })
     }
-    pub fn finish(self) -> Result<W, Error> {
-        self.writer.finish()
+
+    pub fn finish(&mut self) -> Result<(), Error> {
+        if self.dirstack.len() != 1 {
+            bail!("unable to finish catalog at level {}", self.dirstack.len());
+        }
+
+        let dir = self.dirstack.pop().unwrap();
+
+        let start = self.pos;
+        let (_, data) = dir.encode(start)?;
+        self.write_all(&data)?;
+
+        self.write_all(&start.to_le_bytes())?;
+
+        self.writer.flush()?;
+
+        Ok(())
     }
 }
 
-impl <W: Write + Seek> BackupCatalogWriter for CatalogBlobWriter<W> {
+impl <W: Write> BackupCatalogWriter for CatalogWriter<W> {
 
     fn start_directory(&mut self, name: &CStr) -> Result<(), Error> {
-        self.write_all(&[CatalogEntryType::Directory as u8])?;
-        self.write_all(name.to_bytes_with_nul())?;
-        self.write_all(b"{")?;
-        self.level += 1;
+        let new = DirInfo::new(name.to_owned());
+        self.dirstack.push(new);
         Ok(())
     }
 
     fn end_directory(&mut self) -> Result<(), Error> {
-        if self.level == 0 {
-            bail!("got unexpected end_directory level 0");
-        }
-        self.write_all(b"}")?;
-        self.level -= 1;
+        let (start, name) = match self.dirstack.pop() {
+            Some(dir) => {
+                let start = self.pos;
+                let (name, data) = dir.encode(start)?;
+                self.write_all(&data)?;
+                (start, name)
+            }
+            None => {
+                bail!("got unexpected end_directory level 0");
+            }
+        };
+
+        let current = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+        let name = name.to_bytes().to_vec();
+        current.entries.push(DirEntry::Directory { name, start });
+
         Ok(())
     }
 
     fn add_file(&mut self, name: &CStr, size: u64, mtime: u64) -> Result<(), Error> {
-        self.write_all(&[CatalogEntryType::File as u8])?;
-        self.write_all(&size.to_le_bytes())?;
-        self.write_all(&mtime.to_le_bytes())?;
-        self.write_all(name.to_bytes_with_nul())?;
+        let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+        let name = name.to_bytes().to_vec();
+        dir.entries.push(DirEntry::File { name, size, mtime });
         Ok(())
     }
 
     fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> {
-        self.write_all(&[CatalogEntryType::Symlink as u8])?;
-        self.write_all(name.to_bytes_with_nul())?;
+        let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+        let name = name.to_bytes().to_vec();
+        dir.entries.push(DirEntry::Symlink { name });
         Ok(())
     }
 
     fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> {
-        self.write_all(&[CatalogEntryType::Hardlink as u8])?;
-        self.write_all(name.to_bytes_with_nul())?;
+        let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+        let name = name.to_bytes().to_vec();
+        dir.entries.push(DirEntry::Hardlink { name });
         Ok(())
     }
 
     fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> {
-        self.write_all(&[CatalogEntryType::BlockDevice as u8])?;
-        self.write_all(name.to_bytes_with_nul())?;
-        Ok(())
+        let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+        let name = name.to_bytes().to_vec();
+        dir.entries.push(DirEntry::BlockDevice { name });
+         Ok(())
     }
 
     fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> {
-        self.write_all(&[CatalogEntryType::CharDevice as u8])?;
-        self.write_all(name.to_bytes_with_nul())?;
+        let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+        let name = name.to_bytes().to_vec();
+        dir.entries.push(DirEntry::CharDevice { name });
         Ok(())
     }
 
     fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> {
-        self.write_all(&[CatalogEntryType::Fifo as u8])?;
-        self.write_all(name.to_bytes_with_nul())?;
+        let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+        let name = name.to_bytes().to_vec();
+        dir.entries.push(DirEntry::Fifo { name });
         Ok(())
     }
 
     fn add_socket(&mut self, name: &CStr) -> Result<(), Error> {
-        self.write_all(&[CatalogEntryType::Socket as u8])?;
-        self.write_all(name.to_bytes_with_nul())?;
+        let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
+        let name = name.to_bytes().to_vec();
+        dir.entries.push(DirEntry::Socket { name });
         Ok(())
     }
 }
 
-impl<W: Write + Seek> CatalogBlobWriter<W> {
+impl<W: Write> CatalogWriter<W> {
     fn write_all(&mut self, data: &[u8]) -> Result<(), Error> {
         self.writer.write_all(data)?;
         self.pos += u64::try_from(data.len())?;
@@ -108,125 +219,109 @@ impl<W: Write + Seek> CatalogBlobWriter<W> {
     }
 }
 
-pub struct CatalogBlobReader<R: Read + BufRead> {
-    reader: BufReader<DataBlobReader<R>>,
-    dir_stack: Vec<CString>,
+// fixme: move to somewhere else?
+/// Implement Write to tokio mpsc channel Sender
+pub struct SenderWriter(tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>);
+
+impl SenderWriter {
+    pub fn new(sender: tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>) -> Self {
+        Self(sender)
+    }
 }
 
-impl <R: Read + BufRead> CatalogBlobReader<R> {
+impl Write for SenderWriter {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+        futures::executor::block_on(async move {
+            self.0.send(Ok(buf.to_vec())).await
+                .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
+            Ok(buf.len())
+        })
+    }
 
-    pub fn new(reader: R, crypt_config: Option<Arc<CryptConfig>>) -> Result<Self, Error> {
-        let dir_stack = Vec::new();
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        Ok(())
+    }
+}
 
-        let reader = BufReader::new(DataBlobReader::new(reader, crypt_config)?);
+pub struct CatalogReader<R> {
+    reader: R,
+}
 
-        Ok(Self { reader, dir_stack })
-    }
+impl <R: Read + Seek> CatalogReader<R> {
 
-    fn read_filename(&mut self) ->  Result<std::ffi::CString, Error> {
-        let mut filename = Vec::new();
-        self.reader.read_until(0u8, &mut filename)?;
-        if filename.len() > 0 && filename[filename.len()-1] == 0u8 {
-            filename.pop();
-        }
-        if filename.len() == 0 {
-            bail!("got zero length filename");
-        }
-        if filename.iter().find(|b| **b == b'/').is_some() {
-            bail!("found invalid filename with slashes.");
-        }
-        Ok(unsafe { CString::from_vec_unchecked(filename) })
+    pub fn new(reader: R) -> Self {
+        Self { reader }
     }
 
-    fn next_byte(&mut self) ->  Result<u8, std::io::Error> {
+    fn next_byte<C: Read>(mut reader: C) ->  Result<u8, std::io::Error> {
         let mut buf = [0u8; 1];
-        self.reader.read_exact(&mut buf)?;
+        reader.read_exact(&mut buf)?;
         Ok(buf[0])
     }
 
-    fn expect_next(&mut self, expect: u8) -> Result<(), Error> {
-        let next = self.next_byte()?;
-        if next != expect {
-            bail!("got unexpected byte ({} != {})", next, expect);
-        }
-        Ok(())
-    }
+    pub fn dump(&mut self) -> Result<(), Error> {
 
-    fn print_entry(&self, etype: CatalogEntryType, filename: &CStr, size: u64, mtime: u64)  -> Result<(), Error> {
-        let mut out = Vec::new();
+        self.reader.seek(SeekFrom::End(-8))?;
 
-        write!(out, "{} ", char::from(etype as u8))?;
+        let start = unsafe { self.reader.read_le_value::<u64>()? };
 
-        for name in &self.dir_stack {
-            out.extend(name.to_bytes());
-            out.push(b'/');
-        }
+        self.dump_dir(std::path::Path::new("./"), start)
+    }
 
-        out.extend(filename.to_bytes());
+    pub fn dump_dir(&mut self, prefix: &std::path::Path, start: u64) -> Result<(), Error> {
 
-        let dt = Local.timestamp(mtime as i64, 0);
+        self.reader.seek(SeekFrom::Start(start))?;
 
-        if etype == CatalogEntryType::File {
-            write!(out, " {} {}", size, dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, false))?;
-        }
+        let size = unsafe { self.reader.read_le_value::<u32>()? } as usize;
 
-        writeln!(out)?;
-        std::io::stdout().write_all(&out)?;
+        if size < 8 { bail!("got small directory size {}", size) };
 
-        Ok(())
-    }
+        let data = self.reader.read_exact_allocated(size - 4)?;
 
-    fn parse_entries(&mut self) ->  Result<(), Error> {
+        let mut cursor = &data[..];
 
-        loop {
-            let etype = match self.next_byte() {
-                Ok(v) => v,
-                Err(err) => {
-                    if err.kind() == std::io::ErrorKind::UnexpectedEof && self.dir_stack.len() == 0 {
-                        break;
-                    }
+        let entries = unsafe { cursor.read_le_value::<u32>()? };
 
-                    return Err(err.into());
-                }
-            };
-            if etype == b'}' {
-                if self.dir_stack.pop().is_none() {
-                    bail!("got unexpected '}'");
-                }
-                break;
-            }
+        //println!("TEST {} {} size {}", start, entries, size);
+
+        for _ in 0..entries {
+            let etype = CatalogEntryType::try_from(Self::next_byte(&mut cursor)?)?;
+            let name_len = unsafe { cursor.read_le_value::<u32>()? };
+            let name = cursor.read_exact_allocated(name_len as usize)?;
+
+            let mut path = std::path::PathBuf::from(prefix);
+            path.push(std::ffi::OsString::from_vec(name));
 
-            let etype = CatalogEntryType::try_from(etype)?;
             match etype {
                 CatalogEntryType::Directory => {
-                    let filename = self.read_filename()?;
-                    self.print_entry(etype.into(), &filename, 0, 0)?;
-                    self.dir_stack.push(filename);
-                    self.expect_next(b'{')?;
-                    self.parse_entries()?;
+                    println!("{} {:?}", char::from(etype as u8), path);
+                    let offset = unsafe { cursor.read_le_value::<u64>()? };
+                    if offset > start {
+                        bail!("got wrong directory offset ({} > {})", offset, start);
+                    }
+                    let pos = start - offset;
+                    self.dump_dir(&path, pos)?;
                 }
                 CatalogEntryType::File => {
-                    let size = unsafe { self.reader.read_le_value::<u64>()? };
-                    let mtime = unsafe { self.reader.read_le_value::<u64>()? };
-                    let filename = self.read_filename()?;
-                    self.print_entry(etype.into(), &filename, size, mtime)?;
+                    let size = unsafe { cursor.read_le_value::<u64>()? };
+                    let mtime = unsafe { cursor.read_le_value::<u64>()? };
+
+                    let dt = Local.timestamp(mtime as i64, 0);
+
+                    println!("{} {:?} {} {}",
+                             char::from(etype as u8),
+                             path,
+                             size,
+                             dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, false)
+                    );
                 }
-                CatalogEntryType::Symlink |
-                CatalogEntryType::Hardlink |
-                CatalogEntryType::Fifo |
-                CatalogEntryType::Socket |
-                CatalogEntryType::BlockDevice |
-                CatalogEntryType::CharDevice => {
-                    let filename = self.read_filename()?;
-                    self.print_entry(etype.into(), &filename, 0, 0)?;
+                _ => {
+                    println!("{} {:?}", char::from(etype as u8), path);
                 }
             }
         }
-        Ok(())
-    }
 
-    pub fn dump(&mut self) -> Result<(), Error> {
-        self.parse_entries()?;
         Ok(())
     }
+
 }
index 38bdb26e5c8d88d7b6922f8129a3d154ef870e63..1fd2c54ae64b1e4022554d39ec33ef0db974e70c 100644 (file)
@@ -8,7 +8,7 @@ use chrono::{Local, Utc, TimeZone};
 use std::path::{Path, PathBuf};
 use std::collections::{HashSet, HashMap};
 use std::ffi::OsStr;
-use std::io::{BufReader, Read, Write, Seek, SeekFrom};
+use std::io::{Read, Write, Seek, SeekFrom};
 use std::os::unix::fs::OpenOptionsExt;
 
 use proxmox::tools::fs::{file_get_contents, file_get_json, file_set_contents, image_size};
@@ -188,7 +188,7 @@ async fn backup_directory<P: AsRef<Path>>(
     verbose: bool,
     skip_lost_and_found: bool,
     crypt_config: Option<Arc<CryptConfig>>,
-    catalog: Arc<Mutex<CatalogBlobWriter<std::fs::File>>>,
+    catalog: Arc<Mutex<CatalogWriter<SenderWriter>>>,
 ) -> Result<BackupStats, Error> {
 
     let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), device_set, verbose, skip_lost_and_found, catalog)?;
@@ -485,23 +485,41 @@ fn dump_catalog(
             true,
         ).await?;
 
+        let tmpfile = std::fs::OpenOptions::new()
+            .write(true)
+            .read(true)
+            .custom_flags(libc::O_TMPFILE)
+            .open("/tmp")?;
+
         let manifest = client.download_manifest().await?;
 
-        let blob_file = std::fs::OpenOptions::new()
-            .read(true)
+        let tmpfile = client.download(CATALOG_NAME, tmpfile).await?;
+
+        let index = DynamicIndexReader::new(tmpfile)
+            .map_err(|err| format_err!("unable to read catalog index - {}", err))?;
+
+        // Note: do not use values stored in index (not trusted) - instead, compute them again
+        let (csum, size) = index.compute_csum();
+        manifest.verify_file(CATALOG_NAME, &csum, size)?;
+
+        let most_used = index.find_most_used_chunks(8);
+
+        let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
+
+        let mut reader = BufferedDynamicReader::new(index, chunk_reader);
+
+        let mut catalogfile = std::fs::OpenOptions::new()
             .write(true)
+            .read(true)
             .custom_flags(libc::O_TMPFILE)
             .open("/tmp")?;
 
-        let mut blob_file = client.download(CATALOG_BLOB_NAME, blob_file).await?;
-
-        let (csum, size) = compute_file_csum(&mut blob_file)?;
-        manifest.verify_file(CATALOG_BLOB_NAME, &csum, size)?;
+        std::io::copy(&mut reader, &mut catalogfile)
+            .map_err(|err| format_err!("unable to download catalog - {}", err))?;
 
-        blob_file.seek(SeekFrom::Start(0))?;
+        catalogfile.seek(SeekFrom::Start(0))?;
 
-        let reader = BufReader::new(blob_file);
-        let mut catalog_reader = CatalogBlobReader::new(reader, crypt_config)?;
+        let mut catalog_reader = CatalogReader::new(catalogfile);
 
         catalog_reader.dump()?;
 
@@ -584,6 +602,40 @@ fn parse_backupspec(value: &str) -> Result<(&str, &str), Error> {
     bail!("unable to parse directory specification '{}'", value);
 }
 
+fn spawn_catalog_upload(
+    client: Arc<BackupWriter>,
+    crypt_config: Option<Arc<CryptConfig>>,
+) -> Result<
+        (
+            Arc<Mutex<CatalogWriter<SenderWriter>>>,
+            tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>
+        ), Error>
+{
+    let (catalog_tx, catalog_rx) = mpsc::channel(10); // allow to buffer 10 writes
+    let catalog_stream = catalog_rx.map_err(Error::from);
+    let catalog_chunk_size = 512*1024;
+    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
+
+    let catalog = Arc::new(Mutex::new(CatalogWriter::new(SenderWriter::new(catalog_tx))?));
+
+    let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
+
+    tokio::spawn(async move {
+        let catalog_upload_result = client
+            .upload_stream(CATALOG_NAME, catalog_chunk_stream, "dynamic", None, crypt_config)
+            .await;
+
+        if let Err(ref err) = catalog_upload_result {
+            eprintln!("catalog upload error - {}", err);
+            client.cancel();
+        }
+
+        let _ = catalog_result_tx.send(catalog_upload_result);
+    });
+
+    Ok((catalog, catalog_result_rx))
+}
+
 fn create_backup(
     param: Value,
     _info: &ApiMethod,
@@ -637,6 +689,8 @@ fn create_backup(
 
     enum BackupType { PXAR, IMAGE, CONFIG, LOGFILE };
 
+    let mut upload_catalog = false;
+
     for backupspec in backupspec_list {
         let (target, filename) = parse_backupspec(backupspec.as_str().unwrap())?;
 
@@ -655,6 +709,7 @@ fn create_backup(
                     bail!("got unexpected file type (expected directory)");
                 }
                 upload_list.push((BackupType::PXAR, filename.to_owned(), format!("{}.didx", target), 0));
+                upload_catalog = true;
             }
             "img" => {
 
@@ -731,15 +786,7 @@ fn create_backup(
         let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
         let mut manifest = BackupManifest::new(snapshot);
 
-        // fixme: encrypt/sign catalog?
-         let catalog_file = std::fs::OpenOptions::new()
-            .write(true)
-            .read(true)
-            .custom_flags(libc::O_TMPFILE)
-            .open("/tmp")?;
-
-        let catalog = Arc::new(Mutex::new(CatalogBlobWriter::new_compressed(catalog_file)?));
-        let mut upload_catalog = false;
+        let (catalog, catalog_result_rx) = spawn_catalog_upload(client.clone(), crypt_config.clone())?;
 
         for (backup_type, filename, target, size) in upload_list {
             match backup_type {
@@ -758,7 +805,6 @@ fn create_backup(
                     manifest.add_file(target, stats.size, stats.csum);
                 }
                 BackupType::PXAR => {
-                    upload_catalog = true;
                     println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
                     catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
                     let stats = backup_directory(
@@ -795,14 +841,15 @@ fn create_backup(
         if upload_catalog {
             let mutex = Arc::try_unwrap(catalog)
                 .map_err(|_| format_err!("unable to get catalog (still used)"))?;
-            let mut catalog_file = mutex.into_inner().unwrap().finish()?;
+            let mut catalog = mutex.into_inner().unwrap();
+
+            catalog.finish()?;
 
-            let target = CATALOG_BLOB_NAME;
+            drop(catalog); // close upload stream
 
-            catalog_file.seek(SeekFrom::Start(0))?;
+            let stats = catalog_result_rx.await??;
 
-            let stats = client.upload_blob(catalog_file, target).await?;
-            manifest.add_file(target.to_owned(), stats.size, stats.csum);
+            manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum);
         }
 
         if let Some(rsa_encrypted_key) = rsa_encrypted_key {
index 629943661d291e14f5ac31e620e0d0376f8c1588..8a1abd03cae9d7c420f7c97de4619340044d018b 100644 (file)
@@ -1,5 +1,5 @@
 use std::collections::HashSet;
-use std::io::{Seek, Write};
+use std::io::Write;
 use std::os::unix::io::FromRawFd;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
@@ -15,7 +15,7 @@ use nix::sys::stat::Mode;
 use nix::dir::Dir;
 
 use crate::pxar;
-use crate::backup::CatalogBlobWriter;
+use crate::backup::CatalogWriter;
 
 use crate::tools::wrapped_reader_stream::WrappedReaderStream;
 
@@ -41,13 +41,13 @@ impl Drop for PxarBackupStream {
 impl PxarBackupStream {
     pin_utils::unsafe_pinned!(stream: Option<WrappedReaderStream<std::fs::File>>);
 
-    pub fn new<W: Write + Seek + Send + 'static>(
+    pub fn new<W: Write + Send + 'static>(
         mut dir: Dir,
         path: PathBuf,
         device_set: Option<HashSet<u64>>,
         verbose: bool,
         skip_lost_and_found: bool,
-        catalog: Arc<Mutex<CatalogBlobWriter<W>>>,
+        catalog: Arc<Mutex<CatalogWriter<W>>>,
     ) -> Result<Self, Error> {
 
         let (rx, tx) = nix::unistd::pipe()?;
@@ -89,12 +89,12 @@ impl PxarBackupStream {
         })
     }
 
-    pub fn open<W: Write + Seek + Send + 'static>(
+    pub fn open<W: Write + Send + 'static>(
         dirname: &Path,
         device_set: Option<HashSet<u64>>,
         verbose: bool,
         skip_lost_and_found: bool,
-        catalog: Arc<Mutex<CatalogBlobWriter<W>>>,
+        catalog: Arc<Mutex<CatalogWriter<W>>>,
     ) -> Result<Self, Error> {
 
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;