]> git.proxmox.com Git - proxmox-backup.git/blobdiff - proxmox-rrd/src/cache.rs
update to proxmox-sys 0.2 crate
[proxmox-backup.git] / proxmox-rrd / src / cache.rs
index 547f66038b2a20844a59fa1a0368809213225b14..b786f14f8a95e8c95cc87e21f8999d72dd06d72c 100644 (file)
@@ -4,10 +4,13 @@ use std::sync::{Arc, RwLock};
 use std::io::{BufRead, BufReader};
 use std::time::SystemTime;
 use std::thread::spawn;
+use std::os::unix::io::AsRawFd;
+use std::collections::BTreeSet;
+
 use crossbeam_channel::{bounded, TryRecvError};
 use anyhow::{format_err, bail, Error};
 
-use proxmox::tools::fs::{create_path, CreateOptions};
+use proxmox_sys::fs::{create_path, CreateOptions};
 
 use crate::rrd::{DST, CF, RRD, RRA};
 
@@ -120,39 +123,14 @@ impl RRDCache {
         RRD::new(dst, rra_list)
     }
 
-    fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
-
-        let line = line.trim();
-
-        let parts: Vec<&str> = line.splitn(4, ':').collect();
-        if parts.len() != 4 {
-            bail!("wrong numper of components");
-        }
-
-        let time: f64 = parts[0].parse()
-            .map_err(|_| format_err!("unable to parse time"))?;
-        let value: f64 = parts[1].parse()
-            .map_err(|_| format_err!("unable to parse value"))?;
-        let dst: u8 = parts[2].parse()
-            .map_err(|_| format_err!("unable to parse data source type"))?;
-
-        let dst = match dst {
-            0 => DST::Gauge,
-            1 => DST::Derive,
-            _ => bail!("got strange value for data source type '{}'", dst),
-        };
-
-        let rel_path = parts[3].to_string();
-
-        Ok(JournalEntry { time, value, dst, rel_path })
-    }
-
+    /// Sync the journal data to disk (using `fdatasync` syscall)
     pub fn sync_journal(&self) -> Result<(), Error> {
         self.state.read().unwrap().sync_journal()
     }
 
     /// Apply and commit the journal. Should be used at server startup.
     pub fn apply_journal(&self) -> Result<bool, Error> {
+        let config = Arc::clone(&self.config);
         let state = Arc::clone(&self.state);
         let rrd_map = Arc::clone(&self.rrd_map);
 
@@ -194,7 +172,7 @@ impl RRDCache {
         state_guard.apply_thread_result = Some(receiver);
 
         spawn(move || {
-            let result = apply_and_commit_journal_thread(state, rrd_map, journal_applied)
+            let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
                 .map_err(|err| err.to_string());
             sender.send(result).unwrap();
         });
@@ -245,6 +223,7 @@ impl RRDCache {
 
 
 fn apply_and_commit_journal_thread(
+    config: Arc<CacheConfig>,
     state: Arc<RwLock<JournalState>>,
     rrd_map: Arc<RwLock<RRDMap>>,
     commit_only: bool,
@@ -268,7 +247,7 @@ fn apply_and_commit_journal_thread(
     let start_time = SystemTime::now();
     log::debug!("commit rrd journal");
 
-    match commit_journal_impl(state, rrd_map) {
+    match commit_journal_impl(config, state, rrd_map) {
         Ok(rrd_file_count) => {
             let elapsed = start_time.elapsed().unwrap().as_secs_f64();
             log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
@@ -301,7 +280,7 @@ fn apply_journal_lines(
 
         if len == 0 { break; }
 
-        let entry = match RRDCache::parse_journal_line(&line) {
+        let entry: JournalEntry = match line.parse() {
             Ok(entry) => entry,
             Err(err) => {
                 log::warn!(
@@ -372,7 +351,32 @@ fn apply_journal_impl(
     Ok(lines)
 }
 
+fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
+    let file = std::fs::File::open(path)?;
+    nix::unistd::fsync(file.as_raw_fd())?;
+    Ok(())
+}
+
+pub(crate)fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
+    let file = std::fs::File::open(path)?;
+    nix::unistd::fsync(file.as_raw_fd())?;
+    if let Some(parent) = path.parent() {
+        fsync_file_or_dir(parent)?;
+    }
+    Ok(())
+}
+
+fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
+    let mut path = basedir.to_owned();
+    let rel_path = Path::new(rel_path);
+    if let Some(parent) = rel_path.parent() {
+        path.push(parent);
+    }
+    path
+}
+
 fn commit_journal_impl(
+    config: Arc<CacheConfig>,
     state: Arc<RwLock<JournalState>>,
     rrd_map: Arc<RwLock<RRDMap>>,
 ) -> Result<usize, Error> {
@@ -382,8 +386,15 @@ fn commit_journal_impl(
     let mut rrd_file_count = 0;
     let mut errors = 0;
 
+    let mut dir_set = BTreeSet::new();
+
+    log::info!("write rrd data back to disk");
+
     // save all RRDs - we only need a read lock here
+    // Note: no fsync here (we do it afterwards)
     for rel_path in files.iter() {
+        let parent_dir = rrd_parent_dir(&config.basedir, &rel_path);
+        dir_set.insert(parent_dir);
         rrd_file_count += 1;
         if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(&rel_path) {
             errors += 1;
@@ -391,12 +402,29 @@ fn commit_journal_impl(
         }
     }
 
-    state.read().unwrap().syncfs()?;
-
     if errors != 0 {
         bail!("errors during rrd flush - unable to commit rrd journal");
     }
 
+    // Important: We fsync files after writing all data! This increase
+    // the likelihood that files are already synced, so this is
+    // much faster (although we need to re-open the files).
+
+    log::info!("starting rrd data sync");
+
+    for rel_path in files.iter() {
+        let mut path = config.basedir.clone();
+        path.push(&rel_path);
+        fsync_file_or_dir(&path)
+            .map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
+    }
+
+    // also fsync directories
+    for dir_path in dir_set {
+        fsync_file_or_dir(&dir_path)
+            .map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
+    }
+
     // if everything went ok, remove the old journal files
     state.write().unwrap().remove_old_journals()?;