]> git.proxmox.com Git - proxmox-backup.git/commitdiff
move chunk_store to pbs-datastore
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Wed, 7 Jul 2021 12:37:47 +0000 (14:37 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Wed, 7 Jul 2021 12:37:47 +0000 (14:37 +0200)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
21 files changed:
pbs-api-types/src/lib.rs
pbs-datastore/src/chunk_store.rs [new file with mode: 0644]
pbs-datastore/src/lib.rs
pbs-datastore/src/task.rs
src/api2/config/datastore.rs
src/api2/tape/backup.rs
src/api2/tape/drive.rs
src/api2/tape/restore.rs
src/api2/types/mod.rs
src/backup/chunk_store.rs [deleted file]
src/backup/datastore.rs
src/backup/mod.rs
src/backup/verify.rs
src/client/pull.rs
src/lib.rs
src/server/prune_job.rs
src/server/verify_job.rs
src/server/worker_task.rs
src/tape/drive/mod.rs
src/tape/pool_writer/mod.rs
src/task.rs [deleted file]

index 50072bf40145eda63a9e31308024506655f5258e..cc5103f4e0723101145eb8d0a64d8417e31a7f7e 100644 (file)
@@ -1,5 +1,8 @@
 //! Basic API types used by most of the PBS code.
 
+use serde::{Deserialize, Serialize};
+
+use proxmox::api::api;
 use proxmox::api::schema::{ApiStringFormat, Schema, StringSchema};
 use proxmox::const_regex;
 
@@ -37,6 +40,7 @@ pub use userid::{Username, UsernameRef};
 pub use userid::{PROXMOX_GROUP_ID_SCHEMA, PROXMOX_TOKEN_ID_SCHEMA, PROXMOX_TOKEN_NAME_SCHEMA};
 
 pub mod upid;
+pub use upid::UPID;
 
 const_regex! {
     pub BACKUP_TYPE_REGEX = concat!(r"^(", BACKUP_TYPE_RE!(), r")$");
@@ -84,3 +88,56 @@ pub const SINGLE_LINE_COMMENT_SCHEMA: Schema = StringSchema::new("Comment (singl
     .schema();
 
 pub const BACKUP_ID_FORMAT: ApiStringFormat = ApiStringFormat::Pattern(&BACKUP_ID_REGEX);
+
+#[api(
+    properties: {
+        "upid": {
+            optional: true,
+            type: UPID,
+        },
+    },
+)]
+#[derive(Clone, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+/// Garbage collection status.
+pub struct GarbageCollectionStatus {
+    pub upid: Option<String>,
+    /// Number of processed index files.
+    pub index_file_count: usize,
+    /// Sum of bytes referred by index files.
+    pub index_data_bytes: u64,
+    /// Bytes used on disk.
+    pub disk_bytes: u64,
+    /// Chunks used on disk.
+    pub disk_chunks: usize,
+    /// Sum of removed bytes.
+    pub removed_bytes: u64,
+    /// Number of removed chunks.
+    pub removed_chunks: usize,
+    /// Sum of pending bytes (pending removal - kept for safety).
+    pub pending_bytes: u64,
+    /// Number of pending chunks (pending removal - kept for safety).
+    pub pending_chunks: usize,
+    /// Number of chunks marked as .bad by verify that have been removed by GC.
+    pub removed_bad: usize,
+    /// Number of chunks still marked as .bad after garbage collection.
+    pub still_bad: usize,
+}
+
+impl Default for GarbageCollectionStatus {
+    fn default() -> Self {
+        GarbageCollectionStatus {
+            upid: None,
+            index_file_count: 0,
+            index_data_bytes: 0,
+            disk_bytes: 0,
+            disk_chunks: 0,
+            removed_bytes: 0,
+            removed_chunks: 0,
+            pending_bytes: 0,
+            pending_chunks: 0,
+            removed_bad: 0,
+            still_bad: 0,
+        }
+    }
+}
diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs
new file mode 100644 (file)
index 0000000..ddf0a76
--- /dev/null
@@ -0,0 +1,487 @@
+use std::io::Write;
+use std::os::unix::io::AsRawFd;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
+
+use anyhow::{bail, format_err, Error};
+
+use proxmox::tools::fs::{CreateOptions, create_path, create_dir};
+
+use pbs_api_types::GarbageCollectionStatus;
+use pbs_tools::process_locker::{self, ProcessLocker};
+
+use crate::DataBlob;
+use crate::task_log;
+use crate::task::TaskState;
+
+/// File system based chunk store
+pub struct ChunkStore {
+    name: String, // used for error reporting
+    pub (crate) base: PathBuf,
+    chunk_dir: PathBuf,
+    mutex: Mutex<()>,
+    locker: Arc<Mutex<ProcessLocker>>,
+}
+
+// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
+
+pub fn verify_chunk_size(size: usize) -> Result<(), Error> {
+
+    static SIZES: [usize; 7] = [64*1024, 128*1024, 256*1024, 512*1024, 1024*1024, 2048*1024, 4096*1024];
+
+    if !SIZES.contains(&size) {
+        bail!("Got unsupported chunk size '{}'", size);
+    }
+    Ok(())
+}
+
+fn digest_to_prefix(digest: &[u8]) -> PathBuf {
+
+    let mut buf = Vec::<u8>::with_capacity(2+1+2+1);
+
+    const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";
+
+    buf.push(HEX_CHARS[(digest[0] as usize) >> 4]);
+    buf.push(HEX_CHARS[(digest[0] as usize) &0xf]);
+    buf.push(HEX_CHARS[(digest[1] as usize) >> 4]);
+    buf.push(HEX_CHARS[(digest[1] as usize) & 0xf]);
+    buf.push(b'/');
+
+    let path = unsafe { String::from_utf8_unchecked(buf)};
+
+    path.into()
+}
+
+impl ChunkStore {
+
+    fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf {
+
+        let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref());
+        chunk_dir.push(".chunks");
+
+        chunk_dir
+    }
+
+    pub fn base(&self) -> &Path {
+        &self.base
+    }
+
+    pub fn create<P>(name: &str, path: P, uid: nix::unistd::Uid, gid: nix::unistd::Gid, worker: Option<&dyn TaskState>) -> Result<Self, Error>
+    where
+        P: Into<PathBuf>,
+    {
+
+        let base: PathBuf = path.into();
+
+        if !base.is_absolute() {
+            bail!("expected absolute path - got {:?}", base);
+        }
+
+        let chunk_dir = Self::chunk_dir(&base);
+
+        let options = CreateOptions::new()
+            .owner(uid)
+            .group(gid);
+
+        let default_options = CreateOptions::new();
+
+        match create_path(&base, Some(default_options), Some(options.clone())) {
+            Err(err) => bail!("unable to create chunk store '{}' at {:?} - {}", name, base, err),
+            Ok(res) => if ! res  { nix::unistd::chown(&base, Some(uid), Some(gid))? },
+        }
+
+        if let Err(err) = create_dir(&chunk_dir, options.clone()) {
+            bail!("unable to create chunk store '{}' subdir {:?} - {}", name, chunk_dir, err);
+        }
+
+        // create lock file with correct owner/group
+        let lockfile_path = Self::lockfile_path(&base);
+        proxmox::tools::fs::replace_file(lockfile_path, b"", options.clone())?;
+
+        // create 64*1024 subdirs
+        let mut last_percentage = 0;
+
+        for i in 0..64*1024 {
+            let mut l1path = chunk_dir.clone();
+            l1path.push(format!("{:04x}", i));
+            if let Err(err) = create_dir(&l1path, options.clone()) {
+                bail!("unable to create chunk store '{}' subdir {:?} - {}", name, l1path, err);
+            }
+            let percentage = (i*100)/(64*1024);
+            if percentage != last_percentage {
+                if let Some(worker) = worker {
+                    task_log!(worker, "Chunkstore create: {}%", percentage)
+                }
+                last_percentage = percentage;
+            }
+        }
+
+        Self::open(name, base)
+    }
+
+    fn lockfile_path<P: Into<PathBuf>>(base: P) -> PathBuf {
+        let mut lockfile_path: PathBuf = base.into();
+
+        lockfile_path.push(".lock");
+
+        lockfile_path
+    }
+
+    pub fn open<P: Into<PathBuf>>(name: &str, base: P) -> Result<Self, Error> {
+
+        let base: PathBuf = base.into();
+
+        if !base.is_absolute() {
+            bail!("expected absolute path - got {:?}", base);
+        }
+
+        let chunk_dir = Self::chunk_dir(&base);
+
+        if let Err(err) = std::fs::metadata(&chunk_dir) {
+            bail!("unable to open chunk store '{}' at {:?} - {}", name, chunk_dir, err);
+        }
+
+        let lockfile_path = Self::lockfile_path(&base);
+
+        let locker = ProcessLocker::new(&lockfile_path)?;
+
+        Ok(ChunkStore {
+            name: name.to_owned(),
+            base,
+            chunk_dir,
+            locker,
+            mutex: Mutex::new(())
+        })
+    }
+
+    pub fn touch_chunk(&self, digest: &[u8; 32]) -> Result<(), Error> {
+        self.cond_touch_chunk(digest, true)?;
+        Ok(())
+    }
+
+    pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
+        let (chunk_path, _digest_str) = self.chunk_path(digest);
+        self.cond_touch_path(&chunk_path, fail_if_not_exist)
+    }
+
+    pub fn cond_touch_path(&self, path: &Path, fail_if_not_exist: bool) -> Result<bool, Error> {
+        const UTIME_NOW: i64 = (1 << 30) - 1;
+        const UTIME_OMIT: i64 = (1 << 30) - 2;
+
+        let times: [libc::timespec; 2] = [
+            libc::timespec { tv_sec: 0, tv_nsec: UTIME_NOW },
+            libc::timespec { tv_sec: 0, tv_nsec: UTIME_OMIT }
+        ];
+
+        use nix::NixPath;
+
+        let res = path.with_nix_path(|cstr| unsafe {
+            let tmp = libc::utimensat(-1, cstr.as_ptr(), &times[0], libc::AT_SYMLINK_NOFOLLOW);
+            nix::errno::Errno::result(tmp)
+        })?;
+
+        if let Err(err) = res {
+            if !fail_if_not_exist && err.as_errno() == Some(nix::errno::Errno::ENOENT) {
+                return Ok(false);
+            }
+
+            bail!("update atime failed for chunk/file {:?} - {}", path, err);
+        }
+
+        Ok(true)
+    }
+
+    pub fn get_chunk_iterator(
+        &self,
+    ) -> Result<
+        impl Iterator<Item = (Result<pbs_tools::fs::ReadDirEntry, Error>, usize, bool)> + std::iter::FusedIterator,
+        Error
+    > {
+        use nix::dir::Dir;
+        use nix::fcntl::OFlag;
+        use nix::sys::stat::Mode;
+
+        let base_handle = Dir::open(&self.chunk_dir, OFlag::O_RDONLY, Mode::empty())
+            .map_err(|err| {
+                format_err!(
+                    "unable to open store '{}' chunk dir {:?} - {}",
+                    self.name,
+                    self.chunk_dir,
+                    err,
+                )
+            })?;
+
+        let mut done = false;
+        let mut inner: Option<pbs_tools::fs::ReadDir> = None;
+        let mut at = 0;
+        let mut percentage = 0;
+        Ok(std::iter::from_fn(move || {
+            if done {
+                return None;
+            }
+
+            loop {
+                if let Some(ref mut inner) = inner {
+                    match inner.next() {
+                        Some(Ok(entry)) => {
+                            // skip files if they're not a hash
+                            let bytes = entry.file_name().to_bytes();
+                            if bytes.len() != 64 && bytes.len() != 64 + ".0.bad".len() {
+                                continue;
+                            }
+                            if !bytes.iter().take(64).all(u8::is_ascii_hexdigit) {
+                                continue;
+                            }
+
+                            let bad = bytes.ends_with(b".bad");
+                            return Some((Ok(entry), percentage, bad));
+                        }
+                        Some(Err(err)) => {
+                            // stop after first error
+                            done = true;
+                            // and pass the error through:
+                            return Some((Err(err), percentage, false));
+                        }
+                        None => (), // open next directory
+                    }
+                }
+
+                inner = None;
+
+                if at == 0x10000 {
+                    done = true;
+                    return None;
+                }
+
+                let subdir: &str = &format!("{:04x}", at);
+                percentage = (at * 100) / 0x10000;
+                at += 1;
+                match pbs_tools::fs::read_subdir(base_handle.as_raw_fd(), subdir) {
+                    Ok(dir) => {
+                        inner = Some(dir);
+                        // start reading:
+                        continue;
+                    }
+                    Err(ref err) if err.as_errno() == Some(nix::errno::Errno::ENOENT) => {
+                        // non-existing directories are okay, just keep going:
+                        continue;
+                    }
+                    Err(err) => {
+                        // other errors are fatal, so end our iteration
+                        done = true;
+                        // and pass the error through:
+                        return Some((Err(format_err!("unable to read subdir '{}' - {}", subdir, err)), percentage, false));
+                    }
+                }
+            }
+        }).fuse())
+    }
+
+    pub fn oldest_writer(&self) -> Option<i64> {
+        ProcessLocker::oldest_shared_lock(self.locker.clone())
+    }
+
+    pub fn sweep_unused_chunks<F: Fn() -> Result<(), Error>>(
+        &self,
+        oldest_writer: i64,
+        phase1_start_time: i64,
+        status: &mut GarbageCollectionStatus,
+        worker: &dyn TaskState,
+        fail_on_shutdown: F,
+    ) -> Result<(), Error> {
+        use nix::sys::stat::fstatat;
+        use nix::unistd::{unlinkat, UnlinkatFlags};
+
+        let mut min_atime = phase1_start_time - 3600*24; // at least 24h (see mount option relatime)
+
+        if oldest_writer < min_atime {
+            min_atime = oldest_writer;
+        }
+
+        min_atime -= 300; // add 5 mins gap for safety
+
+        let mut last_percentage = 0;
+        let mut chunk_count = 0;
+
+        for (entry, percentage, bad) in self.get_chunk_iterator()? {
+            if last_percentage != percentage {
+                last_percentage = percentage;
+                crate::task_log!(
+                    worker,
+                    "processed {}% ({} chunks)",
+                    percentage,
+                    chunk_count,
+                );
+            }
+
+            worker.check_abort()?;
+            fail_on_shutdown()?;
+
+            let (dirfd, entry) = match entry {
+                Ok(entry) => (entry.parent_fd(), entry),
+                Err(err) => bail!("chunk iterator on chunk store '{}' failed - {}", self.name, err),
+            };
+
+            let file_type = match entry.file_type() {
+                Some(file_type) => file_type,
+                None => bail!("unsupported file system type on chunk store '{}'", self.name),
+            };
+            if file_type != nix::dir::Type::File {
+                continue;
+            }
+
+            chunk_count += 1;
+
+            let filename = entry.file_name();
+
+            let lock = self.mutex.lock();
+
+            if let Ok(stat) = fstatat(dirfd, filename, nix::fcntl::AtFlags::AT_SYMLINK_NOFOLLOW) {
+                if stat.st_atime < min_atime {
+                    //let age = now - stat.st_atime;
+                    //println!("UNLINK {}  {:?}", age/(3600*24), filename);
+                    if let Err(err) = unlinkat(Some(dirfd), filename, UnlinkatFlags::NoRemoveDir) {
+                        if bad {
+                            status.still_bad += 1;
+                        }
+                        bail!(
+                            "unlinking chunk {:?} failed on store '{}' - {}",
+                            filename,
+                            self.name,
+                            err,
+                        );
+                    }
+                    if bad {
+                        status.removed_bad += 1;
+                    } else {
+                        status.removed_chunks += 1;
+                    }
+                    status.removed_bytes += stat.st_size as u64;
+                } else if stat.st_atime < oldest_writer {
+                    if bad {
+                        status.still_bad += 1;
+                    } else {
+                        status.pending_chunks += 1;
+                    }
+                    status.pending_bytes += stat.st_size as u64;
+                } else {
+                    if !bad {
+                        status.disk_chunks += 1;
+                    }
+                    status.disk_bytes += stat.st_size as u64;
+                }
+            }
+            drop(lock);
+        }
+
+        Ok(())
+    }
+
+    pub fn insert_chunk(
+        &self,
+        chunk: &DataBlob,
+        digest: &[u8; 32],
+    ) -> Result<(bool, u64), Error> {
+
+        //println!("DIGEST {}", proxmox::tools::digest_to_hex(digest));
+
+        let (chunk_path, digest_str) = self.chunk_path(digest);
+
+        let lock = self.mutex.lock();
+
+        if let Ok(metadata) = std::fs::metadata(&chunk_path) {
+            if metadata.is_file() {
+                self.touch_chunk(digest)?;
+                return Ok((true, metadata.len()));
+            } else {
+                bail!("Got unexpected file type on store '{}' for chunk {}", self.name, digest_str);
+            }
+        }
+
+        let mut tmp_path = chunk_path.clone();
+        tmp_path.set_extension("tmp");
+
+        let mut file = std::fs::File::create(&tmp_path)?;
+
+        let raw_data = chunk.raw_data();
+        let encoded_size = raw_data.len() as u64;
+
+        file.write_all(raw_data)?;
+
+        if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) {
+            if std::fs::remove_file(&tmp_path).is_err()  { /* ignore */ }
+            bail!(
+                "Atomic rename on store '{}' failed for chunk {} - {}",
+                self.name,
+                digest_str,
+                err,
+            );
+        }
+
+        drop(lock);
+
+        Ok((false, encoded_size))
+    }
+
+    pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
+        let mut chunk_path = self.chunk_dir.clone();
+        let prefix = digest_to_prefix(digest);
+        chunk_path.push(&prefix);
+        let digest_str = proxmox::tools::digest_to_hex(digest);
+        chunk_path.push(&digest_str);
+        (chunk_path, digest_str)
+    }
+
+    pub fn relative_path(&self, path: &Path) -> PathBuf {
+
+        let mut full_path = self.base.clone();
+        full_path.push(path);
+        full_path
+    }
+
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    pub fn base_path(&self) -> PathBuf {
+        self.base.clone()
+    }
+
+    pub fn try_shared_lock(&self) -> Result<process_locker::ProcessLockSharedGuard, Error> {
+        ProcessLocker::try_shared_lock(self.locker.clone())
+    }
+
+    pub fn try_exclusive_lock(&self) -> Result<process_locker::ProcessLockExclusiveGuard, Error> {
+        ProcessLocker::try_exclusive_lock(self.locker.clone())
+    }
+}
+
+
+#[test]
+fn test_chunk_store1() {
+
+    let mut path = std::fs::canonicalize(".").unwrap(); // we need absolute path
+    path.push(".testdir");
+
+    if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
+
+    let chunk_store = ChunkStore::open("test", &path);
+    assert!(chunk_store.is_err());
+
+    let user = nix::unistd::User::from_uid(nix::unistd::Uid::current()).unwrap().unwrap();
+    let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None).unwrap();
+
+    let (chunk, digest) = crate::data_blob::DataChunkBuilder::new(&[0u8, 1u8]).build().unwrap();
+
+    let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
+    assert!(!exists);
+
+    let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
+    assert!(exists);
+
+
+    let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None);
+    assert!(chunk_store.is_err());
+
+    if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
+}
index 25ec669c09cea5fa45f60fb027f1d0c62354a6fb..cae66905cf73a7cde73fddf4aff845357a142a60 100644 (file)
@@ -182,6 +182,7 @@ pub mod backup_info;
 pub mod catalog;
 pub mod checksum_reader;
 pub mod checksum_writer;
+pub mod chunk_store;
 pub mod chunker;
 pub mod crypt_config;
 pub mod crypt_reader;
@@ -198,6 +199,7 @@ pub mod task;
 pub use backup_info::{BackupDir, BackupGroup, BackupInfo};
 pub use checksum_reader::ChecksumReader;
 pub use checksum_writer::ChecksumWriter;
+pub use chunk_store::ChunkStore;
 pub use chunker::Chunker;
 pub use crypt_config::{CryptConfig, CryptMode, Fingerprint};
 pub use crypt_reader::CryptReader;
index 91a6bb1190ad14126672e4a5a146f7af9fd280e1..8cfd6fe87e27ccd0114bd1f7692caf35a81e0fc8 100644 (file)
@@ -19,3 +19,38 @@ impl<T: TaskState + ?Sized> TaskState for std::sync::Arc<T> {
         <T as TaskState>::log(&*self, level, message)
     }
 }
+
+#[macro_export]
+macro_rules! task_error {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Error, &format_args!($($fmt)+))
+    }};
+}
+
+#[macro_export]
+macro_rules! task_warn {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Warn, &format_args!($($fmt)+))
+    }};
+}
+
+#[macro_export]
+macro_rules! task_log {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Info, &format_args!($($fmt)+))
+    }};
+}
+
+#[macro_export]
+macro_rules! task_debug {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Debug, &format_args!($($fmt)+))
+    }};
+}
+
+#[macro_export]
+macro_rules! task_trace {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Trace, &format_args!($($fmt)+))
+    }};
+}
index 316d9f972703e9e4d105fa41c6c8ed9d864398a7..d083d0c49b9c815976506c2df40d3b5f75f7870b 100644 (file)
@@ -8,6 +8,8 @@ use proxmox::api::{api, Router, RpcEnvironment, Permission};
 use proxmox::api::section_config::SectionConfigData;
 use proxmox::api::schema::parse_property_string;
 
+use pbs_datastore::task::TaskState;
+
 use crate::api2::types::*;
 use crate::backup::*;
 use crate::config::cached_user_info::CachedUserInfo;
@@ -54,7 +56,7 @@ pub(crate) fn do_create_datastore(
     _lock: std::fs::File,
     mut config: SectionConfigData,
     datastore: DataStoreConfig,
-    worker: Option<&dyn crate::task::TaskState>,
+    worker: Option<&dyn TaskState>,
 ) -> Result<(), Error> {
     let path: PathBuf = datastore.path.clone().into();
 
index c3b541c76d35c9a14ead98cc3743a3af37f40235..8119482f6b02a7deb544b476cf5afdca1ed574c2 100644 (file)
@@ -15,9 +15,10 @@ use proxmox::{
     },
 };
 
+use pbs_datastore::{task_log, task_warn};
+use pbs_datastore::task::TaskState;
+
 use crate::{
-    task_log,
-    task_warn,
     config::{
         self,
         cached_user_info::CachedUserInfo,
@@ -55,7 +56,6 @@ use crate::{
         Userid,
     },
     server::WorkerTask,
-    task::TaskState,
     tape::{
         TAPE_STATUS_DIR,
         Inventory,
index 0e4a539f40f4c2edee61c004d6c16344115f8a20..5b698e344430e4bc7173e3e72c406e63458fe43a 100644 (file)
@@ -22,8 +22,9 @@ use proxmox::{
     },
 };
 
+use pbs_datastore::task_log;
+
 use crate::{
-    task_log,
     config::{
         self,
         cached_user_info::CachedUserInfo,
index 14e20ee4719aa47272704b15d2b47018bddbf023..68033c4ab142201ea2f367dd6bb50f37bc4d893b 100644 (file)
@@ -28,10 +28,10 @@ use proxmox::{
     },
 };
 
+use pbs_datastore::{task_log, task_warn};
+use pbs_datastore::task::TaskState;
+
 use crate::{
-    task_log,
-    task_warn,
-    task::TaskState,
     tools::ParallelHandler,
     api2::types::{
         DATASTORE_MAP_ARRAY_SCHEMA,
index 652d7bf430fb4a26edeb52fdeea34da801e043c8..6698f4b79a96ef790cf7b60981c7afc02c15775b 100644 (file)
@@ -683,59 +683,6 @@ pub struct BackupContent {
     pub size: Option<u64>,
 }
 
-#[api(
-    properties: {
-        "upid": {
-            optional: true,
-            schema: UPID_SCHEMA,
-        },
-    },
-)]
-#[derive(Clone, Serialize, Deserialize)]
-#[serde(rename_all="kebab-case")]
-/// Garbage collection status.
-pub struct GarbageCollectionStatus {
-    pub upid: Option<String>,
-    /// Number of processed index files.
-    pub index_file_count: usize,
-    /// Sum of bytes referred by index files.
-    pub index_data_bytes: u64,
-    /// Bytes used on disk.
-    pub disk_bytes: u64,
-    /// Chunks used on disk.
-    pub disk_chunks: usize,
-    /// Sum of removed bytes.
-    pub removed_bytes: u64,
-    /// Number of removed chunks.
-    pub removed_chunks: usize,
-    /// Sum of pending bytes (pending removal - kept for safety).
-    pub pending_bytes: u64,
-    /// Number of pending chunks (pending removal - kept for safety).
-    pub pending_chunks: usize,
-    /// Number of chunks marked as .bad by verify that have been removed by GC.
-    pub removed_bad: usize,
-    /// Number of chunks still marked as .bad after garbage collection.
-    pub still_bad: usize,
-}
-
-impl Default for GarbageCollectionStatus {
-    fn default() -> Self {
-        GarbageCollectionStatus {
-            upid: None,
-            index_file_count: 0,
-            index_data_bytes: 0,
-            disk_bytes: 0,
-            disk_chunks: 0,
-            removed_bytes: 0,
-            removed_chunks: 0,
-            pending_bytes: 0,
-            pending_chunks: 0,
-            removed_bad: 0,
-            still_bad: 0,
-        }
-    }
-}
-
 #[api()]
 #[derive(Default, Serialize, Deserialize)]
 /// Storage space usage information.
diff --git a/src/backup/chunk_store.rs b/src/backup/chunk_store.rs
deleted file mode 100644 (file)
index 1ae85d6..0000000
+++ /dev/null
@@ -1,482 +0,0 @@
-use anyhow::{bail, format_err, Error};
-
-use std::path::{Path, PathBuf};
-use std::io::Write;
-use std::sync::{Arc, Mutex};
-use std::os::unix::io::AsRawFd;
-
-use proxmox::tools::fs::{CreateOptions, create_path, create_dir};
-
-use crate::task_log;
-use crate::tools;
-use crate::api2::types::GarbageCollectionStatus;
-
-use super::DataBlob;
-use crate::task::TaskState;
-
-/// File system based chunk store
-pub struct ChunkStore {
-    name: String, // used for error reporting
-    pub (crate) base: PathBuf,
-    chunk_dir: PathBuf,
-    mutex: Mutex<()>,
-    locker: Arc<Mutex<tools::ProcessLocker>>,
-}
-
-// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
-
-pub fn verify_chunk_size(size: usize) -> Result<(), Error> {
-
-    static SIZES: [usize; 7] = [64*1024, 128*1024, 256*1024, 512*1024, 1024*1024, 2048*1024, 4096*1024];
-
-    if !SIZES.contains(&size) {
-        bail!("Got unsupported chunk size '{}'", size);
-    }
-    Ok(())
-}
-
-fn digest_to_prefix(digest: &[u8]) -> PathBuf {
-
-    let mut buf = Vec::<u8>::with_capacity(2+1+2+1);
-
-    const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";
-
-    buf.push(HEX_CHARS[(digest[0] as usize) >> 4]);
-    buf.push(HEX_CHARS[(digest[0] as usize) &0xf]);
-    buf.push(HEX_CHARS[(digest[1] as usize) >> 4]);
-    buf.push(HEX_CHARS[(digest[1] as usize) & 0xf]);
-    buf.push(b'/');
-
-    let path = unsafe { String::from_utf8_unchecked(buf)};
-
-    path.into()
-}
-
-impl ChunkStore {
-
-    fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf {
-
-        let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref());
-        chunk_dir.push(".chunks");
-
-        chunk_dir
-    }
-
-    pub fn create<P>(name: &str, path: P, uid: nix::unistd::Uid, gid: nix::unistd::Gid, worker: Option<&dyn TaskState>) -> Result<Self, Error>
-    where
-        P: Into<PathBuf>,
-    {
-
-        let base: PathBuf = path.into();
-
-        if !base.is_absolute() {
-            bail!("expected absolute path - got {:?}", base);
-        }
-
-        let chunk_dir = Self::chunk_dir(&base);
-
-        let options = CreateOptions::new()
-            .owner(uid)
-            .group(gid);
-
-        let default_options = CreateOptions::new();
-
-        match create_path(&base, Some(default_options), Some(options.clone())) {
-            Err(err) => bail!("unable to create chunk store '{}' at {:?} - {}", name, base, err),
-            Ok(res) => if ! res  { nix::unistd::chown(&base, Some(uid), Some(gid))? },
-        }
-
-        if let Err(err) = create_dir(&chunk_dir, options.clone()) {
-            bail!("unable to create chunk store '{}' subdir {:?} - {}", name, chunk_dir, err);
-        }
-
-        // create lock file with correct owner/group
-        let lockfile_path = Self::lockfile_path(&base);
-        proxmox::tools::fs::replace_file(lockfile_path, b"", options.clone())?;
-
-        // create 64*1024 subdirs
-        let mut last_percentage = 0;
-
-        for i in 0..64*1024 {
-            let mut l1path = chunk_dir.clone();
-            l1path.push(format!("{:04x}", i));
-            if let Err(err) = create_dir(&l1path, options.clone()) {
-                bail!("unable to create chunk store '{}' subdir {:?} - {}", name, l1path, err);
-            }
-            let percentage = (i*100)/(64*1024);
-            if percentage != last_percentage {
-                if let Some(worker) = worker {
-                    task_log!(worker, "Chunkstore create: {}%", percentage)
-                }
-                last_percentage = percentage;
-            }
-        }
-
-        Self::open(name, base)
-    }
-
-    fn lockfile_path<P: Into<PathBuf>>(base: P) -> PathBuf {
-        let mut lockfile_path: PathBuf = base.into();
-
-        lockfile_path.push(".lock");
-
-        lockfile_path
-    }
-
-    pub fn open<P: Into<PathBuf>>(name: &str, base: P) -> Result<Self, Error> {
-
-        let base: PathBuf = base.into();
-
-        if !base.is_absolute() {
-            bail!("expected absolute path - got {:?}", base);
-        }
-
-        let chunk_dir = Self::chunk_dir(&base);
-
-        if let Err(err) = std::fs::metadata(&chunk_dir) {
-            bail!("unable to open chunk store '{}' at {:?} - {}", name, chunk_dir, err);
-        }
-
-        let lockfile_path = Self::lockfile_path(&base);
-
-        let locker = tools::ProcessLocker::new(&lockfile_path)?;
-
-        Ok(ChunkStore {
-            name: name.to_owned(),
-            base,
-            chunk_dir,
-            locker,
-            mutex: Mutex::new(())
-        })
-    }
-
-    pub fn touch_chunk(&self, digest: &[u8; 32]) -> Result<(), Error> {
-        self.cond_touch_chunk(digest, true)?;
-        Ok(())
-    }
-
-    pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
-        let (chunk_path, _digest_str) = self.chunk_path(digest);
-        self.cond_touch_path(&chunk_path, fail_if_not_exist)
-    }
-
-    pub fn cond_touch_path(&self, path: &Path, fail_if_not_exist: bool) -> Result<bool, Error> {
-        const UTIME_NOW: i64 = (1 << 30) - 1;
-        const UTIME_OMIT: i64 = (1 << 30) - 2;
-
-        let times: [libc::timespec; 2] = [
-            libc::timespec { tv_sec: 0, tv_nsec: UTIME_NOW },
-            libc::timespec { tv_sec: 0, tv_nsec: UTIME_OMIT }
-        ];
-
-        use nix::NixPath;
-
-        let res = path.with_nix_path(|cstr| unsafe {
-            let tmp = libc::utimensat(-1, cstr.as_ptr(), &times[0], libc::AT_SYMLINK_NOFOLLOW);
-            nix::errno::Errno::result(tmp)
-        })?;
-
-        if let Err(err) = res {
-            if !fail_if_not_exist && err.as_errno() == Some(nix::errno::Errno::ENOENT) {
-                return Ok(false);
-            }
-
-            bail!("update atime failed for chunk/file {:?} - {}", path, err);
-        }
-
-        Ok(true)
-    }
-
-    pub fn get_chunk_iterator(
-        &self,
-    ) -> Result<
-        impl Iterator<Item = (Result<pbs_tools::fs::ReadDirEntry, Error>, usize, bool)> + std::iter::FusedIterator,
-        Error
-    > {
-        use nix::dir::Dir;
-        use nix::fcntl::OFlag;
-        use nix::sys::stat::Mode;
-
-        let base_handle = Dir::open(&self.chunk_dir, OFlag::O_RDONLY, Mode::empty())
-            .map_err(|err| {
-                format_err!(
-                    "unable to open store '{}' chunk dir {:?} - {}",
-                    self.name,
-                    self.chunk_dir,
-                    err,
-                )
-            })?;
-
-        let mut done = false;
-        let mut inner: Option<pbs_tools::fs::ReadDir> = None;
-        let mut at = 0;
-        let mut percentage = 0;
-        Ok(std::iter::from_fn(move || {
-            if done {
-                return None;
-            }
-
-            loop {
-                if let Some(ref mut inner) = inner {
-                    match inner.next() {
-                        Some(Ok(entry)) => {
-                            // skip files if they're not a hash
-                            let bytes = entry.file_name().to_bytes();
-                            if bytes.len() != 64 && bytes.len() != 64 + ".0.bad".len() {
-                                continue;
-                            }
-                            if !bytes.iter().take(64).all(u8::is_ascii_hexdigit) {
-                                continue;
-                            }
-
-                            let bad = bytes.ends_with(b".bad");
-                            return Some((Ok(entry), percentage, bad));
-                        }
-                        Some(Err(err)) => {
-                            // stop after first error
-                            done = true;
-                            // and pass the error through:
-                            return Some((Err(err), percentage, false));
-                        }
-                        None => (), // open next directory
-                    }
-                }
-
-                inner = None;
-
-                if at == 0x10000 {
-                    done = true;
-                    return None;
-                }
-
-                let subdir: &str = &format!("{:04x}", at);
-                percentage = (at * 100) / 0x10000;
-                at += 1;
-                match pbs_tools::fs::read_subdir(base_handle.as_raw_fd(), subdir) {
-                    Ok(dir) => {
-                        inner = Some(dir);
-                        // start reading:
-                        continue;
-                    }
-                    Err(ref err) if err.as_errno() == Some(nix::errno::Errno::ENOENT) => {
-                        // non-existing directories are okay, just keep going:
-                        continue;
-                    }
-                    Err(err) => {
-                        // other errors are fatal, so end our iteration
-                        done = true;
-                        // and pass the error through:
-                        return Some((Err(format_err!("unable to read subdir '{}' - {}", subdir, err)), percentage, false));
-                    }
-                }
-            }
-        }).fuse())
-    }
-
-    pub fn oldest_writer(&self) -> Option<i64> {
-        tools::ProcessLocker::oldest_shared_lock(self.locker.clone())
-    }
-
-    pub fn sweep_unused_chunks(
-        &self,
-        oldest_writer: i64,
-        phase1_start_time: i64,
-        status: &mut GarbageCollectionStatus,
-        worker: &dyn TaskState,
-    ) -> Result<(), Error> {
-        use nix::sys::stat::fstatat;
-        use nix::unistd::{unlinkat, UnlinkatFlags};
-
-        let mut min_atime = phase1_start_time - 3600*24; // at least 24h (see mount option relatime)
-
-        if oldest_writer < min_atime {
-            min_atime = oldest_writer;
-        }
-
-        min_atime -= 300; // add 5 mins gap for safety
-
-        let mut last_percentage = 0;
-        let mut chunk_count = 0;
-
-        for (entry, percentage, bad) in self.get_chunk_iterator()? {
-            if last_percentage != percentage {
-                last_percentage = percentage;
-                crate::task_log!(
-                    worker,
-                    "processed {}% ({} chunks)",
-                    percentage,
-                    chunk_count,
-                );
-            }
-
-            worker.check_abort()?;
-            tools::fail_on_shutdown()?;
-
-            let (dirfd, entry) = match entry {
-                Ok(entry) => (entry.parent_fd(), entry),
-                Err(err) => bail!("chunk iterator on chunk store '{}' failed - {}", self.name, err),
-            };
-
-            let file_type = match entry.file_type() {
-                Some(file_type) => file_type,
-                None => bail!("unsupported file system type on chunk store '{}'", self.name),
-            };
-            if file_type != nix::dir::Type::File {
-                continue;
-            }
-
-            chunk_count += 1;
-
-            let filename = entry.file_name();
-
-            let lock = self.mutex.lock();
-
-            if let Ok(stat) = fstatat(dirfd, filename, nix::fcntl::AtFlags::AT_SYMLINK_NOFOLLOW) {
-                if stat.st_atime < min_atime {
-                    //let age = now - stat.st_atime;
-                    //println!("UNLINK {}  {:?}", age/(3600*24), filename);
-                    if let Err(err) = unlinkat(Some(dirfd), filename, UnlinkatFlags::NoRemoveDir) {
-                        if bad {
-                            status.still_bad += 1;
-                        }
-                        bail!(
-                            "unlinking chunk {:?} failed on store '{}' - {}",
-                            filename,
-                            self.name,
-                            err,
-                        );
-                    }
-                    if bad {
-                        status.removed_bad += 1;
-                    } else {
-                        status.removed_chunks += 1;
-                    }
-                    status.removed_bytes += stat.st_size as u64;
-                } else if stat.st_atime < oldest_writer {
-                    if bad {
-                        status.still_bad += 1;
-                    } else {
-                        status.pending_chunks += 1;
-                    }
-                    status.pending_bytes += stat.st_size as u64;
-                } else {
-                    if !bad {
-                        status.disk_chunks += 1;
-                    }
-                    status.disk_bytes += stat.st_size as u64;
-                }
-            }
-            drop(lock);
-        }
-
-        Ok(())
-    }
-
-    pub fn insert_chunk(
-        &self,
-        chunk: &DataBlob,
-        digest: &[u8; 32],
-    ) -> Result<(bool, u64), Error> {
-
-        //println!("DIGEST {}", proxmox::tools::digest_to_hex(digest));
-
-        let (chunk_path, digest_str) = self.chunk_path(digest);
-
-        let lock = self.mutex.lock();
-
-        if let Ok(metadata) = std::fs::metadata(&chunk_path) {
-            if metadata.is_file() {
-                self.touch_chunk(digest)?;
-                return Ok((true, metadata.len()));
-            } else {
-                bail!("Got unexpected file type on store '{}' for chunk {}", self.name, digest_str);
-            }
-        }
-
-        let mut tmp_path = chunk_path.clone();
-        tmp_path.set_extension("tmp");
-
-        let mut file = std::fs::File::create(&tmp_path)?;
-
-        let raw_data = chunk.raw_data();
-        let encoded_size = raw_data.len() as u64;
-
-        file.write_all(raw_data)?;
-
-        if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) {
-            if std::fs::remove_file(&tmp_path).is_err()  { /* ignore */ }
-            bail!(
-                "Atomic rename on store '{}' failed for chunk {} - {}",
-                self.name,
-                digest_str,
-                err,
-            );
-        }
-
-        drop(lock);
-
-        Ok((false, encoded_size))
-    }
-
-    pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
-        let mut chunk_path = self.chunk_dir.clone();
-        let prefix = digest_to_prefix(digest);
-        chunk_path.push(&prefix);
-        let digest_str = proxmox::tools::digest_to_hex(digest);
-        chunk_path.push(&digest_str);
-        (chunk_path, digest_str)
-    }
-
-    pub fn relative_path(&self, path: &Path) -> PathBuf {
-
-        let mut full_path = self.base.clone();
-        full_path.push(path);
-        full_path
-    }
-
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-
-    pub fn base_path(&self) -> PathBuf {
-        self.base.clone()
-    }
-
-    pub fn try_shared_lock(&self) -> Result<tools::ProcessLockSharedGuard, Error> {
-        tools::ProcessLocker::try_shared_lock(self.locker.clone())
-    }
-
-    pub fn try_exclusive_lock(&self) -> Result<tools::ProcessLockExclusiveGuard, Error> {
-        tools::ProcessLocker::try_exclusive_lock(self.locker.clone())
-    }
-}
-
-
-#[test]
-fn test_chunk_store1() {
-
-    let mut path = std::fs::canonicalize(".").unwrap(); // we need absolute path
-    path.push(".testdir");
-
-    if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
-
-    let chunk_store = ChunkStore::open("test", &path);
-    assert!(chunk_store.is_err());
-
-    let user = nix::unistd::User::from_uid(nix::unistd::Uid::current()).unwrap().unwrap();
-    let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None).unwrap();
-
-    let (chunk, digest) = super::DataChunkBuilder::new(&[0u8, 1u8]).build().unwrap();
-
-    let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
-    assert!(!exists);
-
-    let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
-    assert!(exists);
-
-
-    let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None);
-    assert!(chunk_store.is_err());
-
-    if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
-}
index 55458de36af7533c12546bdc9d2b54c75e70dd2b..412e9f88756ecde4ea4e32a0c17da766d4bf9207 100644 (file)
@@ -12,6 +12,8 @@ use lazy_static::lazy_static;
 
 use proxmox::tools::fs::{replace_file, file_read_optional_string, CreateOptions, open_file_locked};
 
+use pbs_datastore::{task_log, task_warn};
+use pbs_datastore::task::TaskState;
 use pbs_tools::format::HumanByte;
 use pbs_tools::fs::{lock_dir_noblock, DirLockGuard};
 
@@ -23,7 +25,6 @@ use super::manifest::{MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME, CLIENT_LOG_BLOB_NA
 use super::index::*;
 use super::{DataBlob, ArchiveType, archive_type};
 use crate::config::datastore::{self, DataStoreConfig};
-use crate::task::TaskState;
 use crate::tools;
 use crate::api2::types::{Authid, GarbageCollectionStatus};
 use crate::server::UPID;
@@ -55,7 +56,7 @@ impl DataStore {
 
         if let Some(datastore) = map.get(name) {
             // Compare Config - if changed, create new Datastore object!
-            if datastore.chunk_store.base == path &&
+            if datastore.chunk_store.base() == path &&
                 datastore.verify_new == config.verify_new.unwrap_or(false)
             {
                 return Ok(datastore.clone());
@@ -487,7 +488,7 @@ impl DataStore {
             tools::fail_on_shutdown()?;
             let digest = index.index_digest(pos).unwrap();
             if !self.chunk_store.cond_touch_chunk(digest, false)? {
-                crate::task_warn!(
+                task_warn!(
                     worker,
                     "warning: unable to access non-existent chunk {}, required by {:?}",
                     proxmox::tools::digest_to_hex(digest),
@@ -558,7 +559,7 @@ impl DataStore {
 
             let percentage = (i + 1) * 100 / image_count;
             if percentage > last_percentage {
-                crate::task_log!(
+                task_log!(
                     worker,
                     "marked {}% ({} of {} index files)",
                     percentage,
@@ -570,7 +571,7 @@ impl DataStore {
         }
 
         if strange_paths_count > 0 {
-            crate::task_log!(
+            task_log!(
                 worker,
                 "found (and marked) {} index files outside of expected directory scheme",
                 strange_paths_count,
@@ -604,26 +605,27 @@ impl DataStore {
             let mut gc_status = GarbageCollectionStatus::default();
             gc_status.upid = Some(upid.to_string());
 
-            crate::task_log!(worker, "Start GC phase1 (mark used chunks)");
+            task_log!(worker, "Start GC phase1 (mark used chunks)");
 
             self.mark_used_chunks(&mut gc_status, worker)?;
 
-            crate::task_log!(worker, "Start GC phase2 (sweep unused chunks)");
+            task_log!(worker, "Start GC phase2 (sweep unused chunks)");
             self.chunk_store.sweep_unused_chunks(
                 oldest_writer,
                 phase1_start_time,
                 &mut gc_status,
                 worker,
+                crate::tools::fail_on_shutdown,
             )?;
 
-            crate::task_log!(
+            task_log!(
                 worker,
                 "Removed garbage: {}",
                 HumanByte::from(gc_status.removed_bytes),
             );
-            crate::task_log!(worker, "Removed chunks: {}", gc_status.removed_chunks);
+            task_log!(worker, "Removed chunks: {}", gc_status.removed_chunks);
             if gc_status.pending_bytes > 0 {
-                crate::task_log!(
+                task_log!(
                     worker,
                     "Pending removals: {} (in {} chunks)",
                     HumanByte::from(gc_status.pending_bytes),
@@ -631,14 +633,14 @@ impl DataStore {
                 );
             }
             if gc_status.removed_bad > 0 {
-                crate::task_log!(worker, "Removed bad chunks: {}", gc_status.removed_bad);
+                task_log!(worker, "Removed bad chunks: {}", gc_status.removed_bad);
             }
 
             if gc_status.still_bad > 0 {
-                crate::task_log!(worker, "Leftover bad chunks: {}", gc_status.still_bad);
+                task_log!(worker, "Leftover bad chunks: {}", gc_status.still_bad);
             }
 
-            crate::task_log!(
+            task_log!(
                 worker,
                 "Original data usage: {}",
                 HumanByte::from(gc_status.index_data_bytes),
@@ -646,7 +648,7 @@ impl DataStore {
 
             if gc_status.index_data_bytes > 0 {
                 let comp_per = (gc_status.disk_bytes as f64 * 100.)/gc_status.index_data_bytes as f64;
-                crate::task_log!(
+                task_log!(
                     worker,
                     "On-Disk usage: {} ({:.2}%)",
                     HumanByte::from(gc_status.disk_bytes),
@@ -654,7 +656,7 @@ impl DataStore {
                 );
             }
 
-            crate::task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks);
+            task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks);
 
             let deduplication_factor = if gc_status.disk_bytes > 0 {
                 (gc_status.index_data_bytes as f64)/(gc_status.disk_bytes as f64)
@@ -662,11 +664,11 @@ impl DataStore {
                 1.0
             };
 
-            crate::task_log!(worker, "Deduplication factor: {:.2}", deduplication_factor);
+            task_log!(worker, "Deduplication factor: {:.2}", deduplication_factor);
 
             if gc_status.disk_chunks > 0 {
                 let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64);
-                crate::task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk));
+                task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk));
             }
 
             if let Ok(serialized) = serde_json::to_string(&gc_status) {
index c0acc2467d0db3f85d00ae2fa7784c8c397e3803..34d1c5ac49ea6c2beb6fa4337a8048cd4d45ab9e 100644 (file)
@@ -186,6 +186,8 @@ pub use pbs_datastore::checksum_reader;
 pub use pbs_datastore::checksum_reader::*;
 pub use pbs_datastore::checksum_writer;
 pub use pbs_datastore::checksum_writer::*;
+pub use pbs_datastore::chunk_store;
+pub use pbs_datastore::chunk_store::*;
 pub use pbs_datastore::chunker;
 pub use pbs_datastore::chunker::*;
 pub use pbs_datastore::crypt_config;
@@ -218,9 +220,6 @@ pub use chunk_stat::*;
 mod read_chunk;
 pub use read_chunk::*;
 
-mod chunk_store;
-pub use chunk_store::*;
-
 mod fixed_index;
 pub use fixed_index::*;
 
index 59aa25d01ab0286d206026a5b2457d114e43075e..57b1acf6fd05030143639c0f8724a43584232118 100644 (file)
@@ -6,6 +6,8 @@ use std::time::Instant;
 
 use anyhow::{bail, format_err, Error};
 
+use pbs_datastore::task_log;
+use pbs_datastore::task::TaskState;
 use pbs_tools::fs::lock_dir_noblock_shared;
 
 use crate::{
@@ -25,8 +27,6 @@ use crate::{
         archive_type,
     },
     server::UPID,
-    task::TaskState,
-    task_log,
     tools::ParallelHandler,
 };
 
index 19f91961a728bcdfcfc714a0a5f9d2b0324803c6..8db43f90a3316a9294a16c291471360fc7d085df 100644 (file)
@@ -1,7 +1,5 @@
 //! Sync datastore from remote server
 
-use anyhow::{bail, format_err, Error};
-use serde_json::json;
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
 use std::io::{Seek, SeekFrom};
@@ -9,15 +7,20 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
+use anyhow::{bail, format_err, Error};
+use serde_json::json;
+
+use proxmox::api::error::{HttpError, StatusCode};
+
+use pbs_datastore::task_log;
+
 use crate::{
     api2::types::*,
     backup::*,
     client::*,
     server::WorkerTask,
-    task_log,
     tools::{compute_file_csum, ParallelHandler},
 };
-use proxmox::api::error::{HttpError, StatusCode};
 
 // fixme: implement filters
 // fixme: delete vanished groups
index 0af303a4745844efec43b9162bb3bbe1d4dbef7c..4815c4145957d3d0f814911befddcd63b44c1f19 100644 (file)
@@ -3,8 +3,6 @@
 //! The [backup](backup/index.html) module contains some detailed information
 //! on the inner workings of the backup server regarding data storage.
 
-pub mod task;
-
 #[macro_use]
 pub mod tools;
 
index ac56d16751531d23135ec74c28cd19ec79fb9139..248068ea04057eaed3ef08f084104175d4a6a315 100644 (file)
@@ -2,13 +2,13 @@ use anyhow::Error;
 
 use proxmox::try_block;
 
+use pbs_datastore::{task_log, task_warn};
+
 use crate::{
     api2::types::*,
     backup::{compute_prune_info, BackupInfo, DataStore, PruneOptions},
     server::jobstate::Job,
     server::WorkerTask,
-    task_log,
-    task_warn,
 };
 
 pub fn do_prune_job(
index 878fade540c2a4df3481e12146bf82e696f9c40a..ee9a4532ce4910a7469f9fe327b15a1347d8bcac 100644 (file)
@@ -1,5 +1,7 @@
 use anyhow::{format_err, Error};
 
+use pbs_datastore::task_log;
+
 use crate::{
     server::WorkerTask,
     api2::types::*,
@@ -10,7 +12,6 @@ use crate::{
         verify_filter,
         verify_all_backups,
     },
-    task_log,
 };
 
 /// Runs a verification job.
index f60556ef7bd042678f56d0d0a53e5230cca59c3e..135784466df2a929ffcd6aada893d809e04fd29a 100644 (file)
@@ -789,7 +789,7 @@ impl WorkerTask {
     }
 }
 
-impl crate::task::TaskState for WorkerTask {
+impl pbs_datastore::task::TaskState for WorkerTask {
     fn check_abort(&self) -> Result<(), Error> {
         self.fail_on_abort()
     }
index 7c218a465ff47f66ddcee989190d39a4c3f246be..8010d576b28b30e3600f589c2061137364988d27 100644 (file)
@@ -26,9 +26,10 @@ use proxmox::{
     api::section_config::SectionConfigData,
 };
 
+use pbs_datastore::task_log;
+use pbs_datastore::task::TaskState;
+
 use crate::{
-    task_log,
-    task::TaskState,
     backup::{
         Fingerprint,
         KeyConfig,
index a788cbeb8309c6f3d4c595e7248b55b33f1f5141..6f887c60851a61a2d3d8c06dd1b82582152c38c9 100644 (file)
@@ -13,8 +13,9 @@ use anyhow::{bail, Error};
 
 use proxmox::tools::Uuid;
 
+use pbs_datastore::task_log;
+
 use crate::{
-    task_log,
     backup::{
         DataStore,
     },
diff --git a/src/task.rs b/src/task.rs
deleted file mode 100644 (file)
index 69498e2..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-pub use pbs_datastore::task::TaskState;
-
-#[macro_export]
-macro_rules! task_error {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Error, &format_args!($($fmt)+))
-    }};
-}
-
-#[macro_export]
-macro_rules! task_warn {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Warn, &format_args!($($fmt)+))
-    }};
-}
-
-#[macro_export]
-macro_rules! task_log {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Info, &format_args!($($fmt)+))
-    }};
-}
-
-#[macro_export]
-macro_rules! task_debug {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Debug, &format_args!($($fmt)+))
-    }};
-}
-
-#[macro_export]
-macro_rules! task_trace {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Trace, &format_args!($($fmt)+))
-    }};
-}