git.proxmox.com Git - proxmox-backup.git/blobdiff - proxmox-rrd/src/cache.rs
rrd_cache: use `proxmox-rrd` from `proxmox` workspace
[proxmox-backup.git] / proxmox-rrd / src / cache.rs
index 4c7f05f06a2fb0ce7d54b2fa32c843717887b900..254010f3547b199d1ae2fa5cf6b8932ad2da54a1 100644 (file)
@@ -1,51 +1,44 @@
+use std::collections::BTreeSet;
 use std::fs::File;
-use std::path::{Path, PathBuf};
-use std::collections::HashMap;
-use std::sync::RwLock;
-use std::io::Write;
 use std::io::{BufRead, BufReader};
 use std::os::unix::io::AsRawFd;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, RwLock};
+use std::thread::spawn;
 use std::time::SystemTime;
 
-use anyhow::{format_err, bail, Error};
-use nix::fcntl::OFlag;
+use anyhow::{bail, format_err, Error};
+use crossbeam_channel::{bounded, TryRecvError};
 
-use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};
+use proxmox_sys::fs::{create_path, CreateOptions};
 
-use crate::rrd::{DST, CF, RRD, RRA};
+use crate::rrd::{CF, DST, RRA, RRD};
+use crate::Entry;
 
-const RRD_JOURNAL_NAME: &str = "rrd.journal";
+mod journal;
+use journal::*;
+
+mod rrd_map;
+use rrd_map::*;
 
 /// RRD cache - keep RRD data in RAM, but write updates to disk
 ///
 /// This cache is designed to run as single instance (no concurrent
 /// access from other processes).
 pub struct RRDCache {
+    config: Arc<CacheConfig>,
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+}
+
+pub(crate) struct CacheConfig {
     apply_interval: f64,
     basedir: PathBuf,
     file_options: CreateOptions,
     dir_options: CreateOptions,
-    state: RwLock<RRDCacheState>,
-    load_rrd_cb: fn(cache: &RRDCache, path: &Path, rel_path: &str, dst: DST) -> RRD,
-}
-
-// shared state behind RwLock
-struct RRDCacheState {
-    rrd_map: HashMap<String, RRD>,
-    journal: File,
-    last_journal_flush: f64,
-    journal_applied: bool,
-}
-
-struct JournalEntry {
-    time: f64,
-    value: f64,
-    dst: DST,
-    rel_path: String,
 }
 
 impl RRDCache {
-
     /// Creates a new instance
     ///
     /// `basedir`: All files are stored relative to this path.
@@ -65,36 +58,34 @@ impl RRDCache {
         file_options: Option<CreateOptions>,
         dir_options: Option<CreateOptions>,
         apply_interval: f64,
-        load_rrd_cb: fn(cache: &RRDCache, path: &Path, rel_path: &str, dst: DST) -> RRD,
+        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
     ) -> Result<Self, Error> {
         let basedir = basedir.as_ref().to_owned();
 
-        let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
-        let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
-
-        create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
-            .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
-
-        let mut journal_path = basedir.clone();
-        journal_path.push(RRD_JOURNAL_NAME);
+        let file_options = file_options.unwrap_or_else(CreateOptions::new);
+        let dir_options = dir_options.unwrap_or_else(CreateOptions::new);
 
-        let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
-        let journal = atomic_open_or_create_file(&journal_path, flags,  &[], file_options.clone())?;
+        create_path(
+            &basedir,
+            Some(dir_options.clone()),
+            Some(dir_options.clone()),
+        )
+        .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
 
-        let state = RRDCacheState {
-            journal,
-            rrd_map: HashMap::new(),
-            last_journal_flush: 0.0,
-            journal_applied: false,
-        };
-
-        Ok(Self {
+        let config = Arc::new(CacheConfig {
             basedir,
             file_options,
             dir_options,
             apply_interval,
-            load_rrd_cb,
-            state: RwLock::new(state),
+        });
+
+        let state = JournalState::new(Arc::clone(&config))?;
+        let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
+
+        Ok(Self {
+            config: Arc::clone(&config),
+            state: Arc::new(RwLock::new(state)),
+            rrd_map: Arc::new(RwLock::new(rrd_map)),
         })
     }
 
@@ -111,249 +102,347 @@ impl RRDCache {
     /// * cf=average,r=7*86400,n=570 => 10years
     /// * cf=maximum,r=7*86400,n=570 => 10year
     ///
-    /// The resultion data file size is about 80KB.
+    /// The resulting data file size is about 80KB.
     pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD {
-
-        let mut rra_list = Vec::new();
-
-        // 1min * 1440 => 1day
-        rra_list.push(RRA::new(CF::Average, 60, 1440));
-        rra_list.push(RRA::new(CF::Maximum, 60, 1440));
-
-        // 30min * 1440 => 30days = 1month
-        rra_list.push(RRA::new(CF::Average, 30*60, 1440));
-        rra_list.push(RRA::new(CF::Maximum, 30*60, 1440));
-
-        // 6h * 1440 => 360days = 1year
-        rra_list.push(RRA::new(CF::Average, 6*3600, 1440));
-        rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440));
-
-        // 1week * 570 => 10years
-        rra_list.push(RRA::new(CF::Average, 7*86400, 570));
-        rra_list.push(RRA::new(CF::Maximum, 7*86400, 570));
+        let rra_list = vec![
+            // 1 min * 1440 => 1 day
+            RRA::new(CF::Average, 60, 1440),
+            RRA::new(CF::Maximum, 60, 1440),
+            // 30 min * 1440 => 30 days ~ 1 month
+            RRA::new(CF::Average, 30 * 60, 1440),
+            RRA::new(CF::Maximum, 30 * 60, 1440),
+            // 6 h * 1440 => 360 days ~ 1 year
+            RRA::new(CF::Average, 6 * 3600, 1440),
+            RRA::new(CF::Maximum, 6 * 3600, 1440),
+            // 1 week * 570 => 10 years
+            RRA::new(CF::Average, 7 * 86400, 570),
+            RRA::new(CF::Maximum, 7 * 86400, 570),
+        ];
 
         RRD::new(dst, rra_list)
     }
 
-    fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
+    /// Sync the journal data to disk (using `fdatasync` syscall)
+    pub fn sync_journal(&self) -> Result<(), Error> {
+        self.state.read().unwrap().sync_journal()
+    }
+
+    /// Apply and commit the journal. Should be used at server startup.
+    pub fn apply_journal(&self) -> Result<bool, Error> {
+        let config = Arc::clone(&self.config);
+        let state = Arc::clone(&self.state);
+        let rrd_map = Arc::clone(&self.rrd_map);
+
+        let mut state_guard = self.state.write().unwrap();
+        let journal_applied = state_guard.journal_applied;
+
+        if let Some(ref recv) = state_guard.apply_thread_result {
+            match recv.try_recv() {
+                Ok(Ok(())) => {
+                    // finished without errors, OK
+                    state_guard.apply_thread_result = None;
+                }
+                Ok(Err(err)) => {
+                    // finished with errors, log them
+                    log::error!("{}", err);
+                    state_guard.apply_thread_result = None;
+                }
+                Err(TryRecvError::Empty) => {
+                    // still running
+                    return Ok(journal_applied);
+                }
+                Err(TryRecvError::Disconnected) => {
+                    // crashed, start again
+                    log::error!("apply journal thread crashed - try again");
+                    state_guard.apply_thread_result = None;
+                }
+            }
+        }
 
-        let line = line.trim();
+        let now = proxmox_time::epoch_f64();
+        let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
 
-        let parts: Vec<&str> = line.splitn(4, ':').collect();
-        if parts.len() != 4 {
-            bail!("wrong numper of components");
+        if journal_applied && !wants_commit {
+            return Ok(journal_applied);
         }
 
-        let time: f64 = parts[0].parse()
-            .map_err(|_| format_err!("unable to parse time"))?;
-        let value: f64 = parts[1].parse()
-            .map_err(|_| format_err!("unable to parse value"))?;
-        let dst: u8 = parts[2].parse()
-            .map_err(|_| format_err!("unable to parse data source type"))?;
-
-        let dst = match dst {
-            0 => DST::Gauge,
-            1 => DST::Derive,
-            _ => bail!("got strange value for data source type '{}'", dst),
-        };
+        state_guard.last_journal_flush = proxmox_time::epoch_f64();
+
+        let (sender, receiver) = bounded(1);
+        state_guard.apply_thread_result = Some(receiver);
 
-        let rel_path = parts[3].to_string();
+        spawn(move || {
+            let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
+                .map_err(|err| err.to_string());
+            sender.send(result).unwrap();
+        });
 
-        Ok(JournalEntry { time, value, dst, rel_path })
+        Ok(journal_applied)
     }
 
-    fn append_journal_entry(
-        state: &mut RRDCacheState,
+    /// Update data in RAM and write file back to disk (journal)
+    pub fn update_value(
+        &self,
+        rel_path: &str,
         time: f64,
         value: f64,
         dst: DST,
-        rel_path: &str,
     ) -> Result<(), Error> {
-        let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
-        state.journal.write_all(journal_entry.as_bytes())?;
+        let journal_applied = self.apply_journal()?;
+
+        self.state
+            .write()
+            .unwrap()
+            .append_journal_entry(time, value, dst, rel_path)?;
+
+        if journal_applied {
+            self.rrd_map
+                .write()
+                .unwrap()
+                .update(rel_path, time, value, dst, false)?;
+        }
+
         Ok(())
     }
 
-    /// Apply and commit the journal. Should be used at server startup.
-    pub fn apply_journal(&self) -> Result<(), Error> {
-        let mut state = self.state.write().unwrap(); // block writers
-        self.apply_and_commit_journal_locked(&mut state)
+    /// Extract data from cached RRD
+    ///
+    /// `start`: Start time. If not specified, we simply extract 10 data points.
+    ///
+    /// `end`: End time. Default is to use the current time.
+    pub fn extract_cached_data(
+        &self,
+        base: &str,
+        name: &str,
+        cf: CF,
+        resolution: u64,
+        start: Option<u64>,
+        end: Option<u64>,
+    ) -> Result<Option<Entry>, Error> {
+        self.rrd_map
+            .read()
+            .unwrap()
+            .extract_cached_data(base, name, cf, resolution, start, end)
     }
+}
 
-    fn apply_and_commit_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> {
-
-        state.last_journal_flush = proxmox_time::epoch_f64();
-
-        if !state.journal_applied {
-            let start_time = SystemTime::now();
-            log::debug!("applying rrd journal");
-
-            match self.apply_journal_locked(state) {
-                Ok(entries) => {
-                    let elapsed = start_time.elapsed()?.as_secs_f64();
-                    log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
-                }
-                Err(err) => bail!("apply rrd journal failed - {}", err),
-            }
-        }
-
+fn apply_and_commit_journal_thread(
+    config: Arc<CacheConfig>,
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+    commit_only: bool,
+) -> Result<(), Error> {
+    if commit_only {
+        state.write().unwrap().rotate_journal()?; // start new journal, keep old one
+    } else {
         let start_time = SystemTime::now();
-        log::debug!("commit rrd journal");
-
-        match self.commit_journal_locked(state) {
-            Ok(rrd_file_count) => {
-                let elapsed = start_time.elapsed()?.as_secs_f64();
-                log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
-                           rrd_file_count, elapsed);
+        log::debug!("applying rrd journal");
+
+        match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
+            Ok(entries) => {
+                let elapsed = start_time.elapsed().unwrap().as_secs_f64();
+                log::info!(
+                    "applied rrd journal ({} entries in {:.3} seconds)",
+                    entries,
+                    elapsed
+                );
             }
-            Err(err) => bail!("rrd journal commit failed: {}", err),
+            Err(err) => bail!("apply rrd journal failed - {}", err),
         }
-
-        Ok(())
     }
 
-    fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<usize, Error> {
-
-        let mut journal_path = self.basedir.clone();
-        journal_path.push(RRD_JOURNAL_NAME);
+    let start_time = SystemTime::now();
+    log::debug!("commit rrd journal");
+
+    match commit_journal_impl(config, state, rrd_map) {
+        Ok(rrd_file_count) => {
+            let elapsed = start_time.elapsed().unwrap().as_secs_f64();
+            log::info!(
+                "rrd journal successfully committed ({} files in {:.3} seconds)",
+                rrd_file_count,
+                elapsed
+            );
+        }
+        Err(err) => bail!("rrd journal commit failed: {}", err),
+    }
+    Ok(())
+}
 
-        let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
-        let journal = atomic_open_or_create_file(&journal_path, flags,  &[], self.file_options.clone())?;
-        let mut journal = BufReader::new(journal);
+fn apply_journal_lines(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+    journal_name: &str, // used for logging
+    reader: &mut BufReader<File>,
+    lock_read_line: bool,
+) -> Result<usize, Error> {
+    let mut linenr = 0;
+
+    loop {
+        linenr += 1;
+        let mut line = String::new();
+        let len = if lock_read_line {
+            let _lock = state.read().unwrap(); // make sure we read entire lines
+            reader.read_line(&mut line)?
+        } else {
+            reader.read_line(&mut line)?
+        };
 
-        let mut last_update_map = HashMap::new();
+        if len == 0 {
+            break;
+        }
 
-        let mut get_last_update = |rel_path: &str, rrd: &RRD| {
-            if let Some(time) = last_update_map.get(rel_path) {
-                return *time;
+        let entry: JournalEntry = match line.parse() {
+            Ok(entry) => entry,
+            Err(err) => {
+                log::warn!(
+                    "unable to parse rrd journal '{}' line {} (skip) - {}",
+                    journal_name,
+                    linenr,
+                    err,
+                );
+                continue; // skip unparsable lines
             }
-            let last_update =  rrd.last_update();
-            last_update_map.insert(rel_path.to_string(), last_update);
-            last_update
         };
 
-        let mut linenr = 0;
-        loop {
-            linenr += 1;
-            let mut line = String::new();
-            let len = journal.read_line(&mut line)?;
-            if len == 0 { break; }
-
-            let entry = match Self::parse_journal_line(&line) {
-                Ok(entry) => entry,
-                Err(err) => {
-                    log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
-                    continue; // skip unparsable lines
-                }
-            };
+        rrd_map.write().unwrap().update(
+            &entry.rel_path,
+            entry.time,
+            entry.value,
+            entry.dst,
+            true,
+        )?;
+    }
+    Ok(linenr)
+}
 
-            if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
-                if entry.time > get_last_update(&entry.rel_path, &rrd) {
-                    rrd.update(entry.time, entry.value);
-                }
-            } else {
-                let mut path = self.basedir.clone();
-                path.push(&entry.rel_path);
-                create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
+fn apply_journal_impl(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+    let mut lines = 0;
+
+    // Apply old journals first
+    let journal_list = state.read().unwrap().list_old_journals()?;
+
+    for entry in journal_list {
+        log::info!("apply old journal log {}", entry.name);
+        let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
+        let mut reader = BufReader::new(file);
+        lines += apply_journal_lines(
+            Arc::clone(&state),
+            Arc::clone(&rrd_map),
+            &entry.name,
+            &mut reader,
+            false,
+        )?;
+    }
 
-                let mut rrd = (self.load_rrd_cb)(&self, &path, &entry.rel_path, entry.dst);
+    let mut journal = state.read().unwrap().open_journal_reader()?;
 
-                if entry.time > get_last_update(&entry.rel_path, &rrd) {
-                    rrd.update(entry.time, entry.value);
-                }
-                state.rrd_map.insert(entry.rel_path.clone(), rrd);
-            }
-        }
+    lines += apply_journal_lines(
+        Arc::clone(&state),
+        Arc::clone(&rrd_map),
+        "rrd.journal",
+        &mut journal,
+        true,
+    )?;
+
+    {
+        let mut state_guard = state.write().unwrap(); // block other writers
 
+        lines += apply_journal_lines(
+            Arc::clone(&state),
+            Arc::clone(&rrd_map),
+            "rrd.journal",
+            &mut journal,
+            false,
+        )?;
+
+        state_guard.rotate_journal()?; // start new journal, keep old one
 
         // We need to apply the journal only once, because further updates
         // are always directly applied.
-        state.journal_applied = true;
-
-        Ok(linenr)
+        state_guard.journal_applied = true;
     }
 
-    fn commit_journal_locked(&self, state: &mut RRDCacheState) -> Result<usize, Error> {
-
-        // save all RRDs
-        let mut rrd_file_count = 0;
-
-        let mut errors = 0;
-        for (rel_path, rrd) in state.rrd_map.iter() {
-            rrd_file_count += 1;
-            let mut path = self.basedir.clone();
-            path.push(&rel_path);
-            if let Err(err) = rrd.save(&path, self.file_options.clone()) {
-                errors += 1;
-                log::error!("unable to save {:?}: {}", path, err);
-            }
-        }
-
-       if errors != 0 {
-            bail!("errors during rrd flush - unable to commit rrd journal");
-        }
-
-        // if everything went ok, commit the journal
+    Ok(lines)
+}
 
-        nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
-            .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
+fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
+    let file = std::fs::File::open(path)?;
+    nix::unistd::fsync(file.as_raw_fd())?;
+    Ok(())
+}
 
-        Ok(rrd_file_count)
+pub(crate) fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
+    let file = std::fs::File::open(path)?;
+    nix::unistd::fsync(file.as_raw_fd())?;
+    if let Some(parent) = path.parent() {
+        fsync_file_or_dir(parent)?;
     }
+    Ok(())
+}
 
-    /// Update data in RAM and write file back to disk (journal)
-    pub fn update_value(
-        &self,
-        rel_path: &str,
-        time: f64,
-        value: f64,
-        dst: DST,
-    ) -> Result<(), Error> {
-
-        let mut state = self.state.write().unwrap(); // block other writers
+fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
+    let mut path = basedir.to_owned();
+    let rel_path = Path::new(rel_path);
+    if let Some(parent) = rel_path.parent() {
+        path.push(parent);
+    }
+    path
+}
 
-        if !state.journal_applied || (time - state.last_journal_flush) > self.apply_interval {
-            self.apply_and_commit_journal_locked(&mut state)?;
+fn commit_journal_impl(
+    config: Arc<CacheConfig>,
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+    let files = rrd_map.read().unwrap().file_list();
+
+    let mut rrd_file_count = 0;
+    let mut errors = 0;
+
+    let mut dir_set = BTreeSet::new();
+
+    log::info!("write rrd data back to disk");
+
+    // save all RRDs - we only need a read lock here
+    // Note: no fsync here (we do it afterwards)
+    for rel_path in files.iter() {
+        let parent_dir = rrd_parent_dir(&config.basedir, rel_path);
+        dir_set.insert(parent_dir);
+        rrd_file_count += 1;
+        if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(rel_path) {
+            errors += 1;
+            log::error!("unable to save rrd {}: {}", rel_path, err);
         }
+    }
 
-        Self::append_journal_entry(&mut state, time, value, dst, rel_path)?;
-
-        if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
-            rrd.update(time, value);
-        } else {
-            let mut path = self.basedir.clone();
-            path.push(rel_path);
-            create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
+    if errors != 0 {
+        bail!("errors during rrd flush - unable to commit rrd journal");
+    }
 
-            let mut rrd = (self.load_rrd_cb)(&self, &path, rel_path, dst);
+    // Important: We fsync files after writing all data! This increases
+    // the likelihood that files are already synced, so this is
+    // much faster (although we need to re-open the files).
 
-            rrd.update(time, value);
-            state.rrd_map.insert(rel_path.into(), rrd);
-        }
+    log::info!("starting rrd data sync");
 
-        Ok(())
+    for rel_path in files.iter() {
+        let mut path = config.basedir.clone();
+        path.push(rel_path);
+        fsync_file_or_dir(&path)
+            .map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
     }
 
-    /// Extract data from cached RRD
-    ///
-    /// `start`: Start time. If not sepecified, we simply extract 10 data points.
-    ///
-    /// `end`: End time. Default is to use the current time.
-    pub fn extract_cached_data(
-        &self,
-        base: &str,
-        name: &str,
-        cf: CF,
-        resolution: u64,
-        start: Option<u64>,
-        end: Option<u64>,
-    ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
+    // also fsync directories
+    for dir_path in dir_set {
+        fsync_file_or_dir(&dir_path)
+            .map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
+    }
 
-        let state = self.state.read().unwrap();
+    // if everything went ok, remove the old journal files
+    state.write().unwrap().remove_old_journals()?;
 
-        match state.rrd_map.get(&format!("{}/{}", base, name)) {
-            Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
-            None => Ok(None),
-        }
-    }
+    Ok(rrd_file_count)
 }