+use std::collections::BTreeSet;
use std::fs::File;
-use std::path::{Path, PathBuf};
-use std::collections::HashMap;
-use std::sync::RwLock;
-use std::io::Write;
use std::io::{BufRead, BufReader};
use std::os::unix::io::AsRawFd;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, RwLock};
+use std::thread::spawn;
+use std::time::SystemTime;
-use anyhow::{format_err, bail, Error};
-use nix::fcntl::OFlag;
+use anyhow::{bail, format_err, Error};
+use crossbeam_channel::{bounded, TryRecvError};
-use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};
+use proxmox_sys::fs::{create_path, CreateOptions};
-use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution};
+use crate::rrd::{CF, DST, RRA, RRD};
+use crate::Entry;
-use crate::rrd::{DST, CF, RRD, RRA};
+mod journal;
+use journal::*;
-const RRD_JOURNAL_NAME: &str = "rrd.journal";
+mod rrd_map;
+use rrd_map::*;
/// RRD cache - keep RRD data in RAM, but write updates to disk
///
/// This cache is designed to run as single instance (no concurrent
/// access from other processes).
pub struct RRDCache {
+ config: Arc<CacheConfig>,
+ state: Arc<RwLock<JournalState>>,
+ rrd_map: Arc<RwLock<RRDMap>>,
+}
+
+pub(crate) struct CacheConfig {
apply_interval: f64,
basedir: PathBuf,
file_options: CreateOptions,
dir_options: CreateOptions,
- state: RwLock<RRDCacheState>,
-}
-
-// shared state behind RwLock
-struct RRDCacheState {
- rrd_map: HashMap<String, RRD>,
- journal: File,
- last_journal_flush: f64,
-}
-
-struct JournalEntry {
- time: f64,
- value: f64,
- dst: DST,
- rel_path: String,
}
impl RRDCache {
-
/// Creates a new instance
+ ///
+ /// `basedir`: All files are stored relative to this path.
+ ///
+ /// `file_options`: Files are created with this options.
+ ///
+ /// `dir_options`: Directories are created with this options.
+ ///
+ /// `apply_interval`: Commit journal after `apply_interval` seconds.
+ ///
+ /// `load_rrd_cb`; The callback function is used to load RRD files,
+ /// and should return a newly generated RRD if the file does not
+ /// exists (or is unreadable). This may generate RRDs with
+ /// different configurations (dependent on `rel_path`).
pub fn new<P: AsRef<Path>>(
basedir: P,
file_options: Option<CreateOptions>,
dir_options: Option<CreateOptions>,
apply_interval: f64,
+ load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
) -> Result<Self, Error> {
let basedir = basedir.as_ref().to_owned();
- let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
- let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
-
- create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
- .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+ let file_options = file_options.unwrap_or_else(CreateOptions::new);
+ let dir_options = dir_options.unwrap_or_else(CreateOptions::new);
- let mut journal_path = basedir.clone();
- journal_path.push(RRD_JOURNAL_NAME);
-
- let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
- let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?;
-
- let state = RRDCacheState {
- journal,
- rrd_map: HashMap::new(),
- last_journal_flush: 0.0,
- };
+ create_path(
+ &basedir,
+ Some(dir_options.clone()),
+ Some(dir_options.clone()),
+ )
+ .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
- Ok(Self {
+ let config = Arc::new(CacheConfig {
basedir,
file_options,
dir_options,
apply_interval,
- state: RwLock::new(state),
- })
- }
+ });
- fn create_default_rrd(dst: DST) -> RRD {
+ let state = JournalState::new(Arc::clone(&config))?;
+ let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
- let mut rra_list = Vec::new();
+ Ok(Self {
+ config: Arc::clone(&config),
+ state: Arc::new(RwLock::new(state)),
+ rrd_map: Arc::new(RwLock::new(rrd_map)),
+ })
+ }
- // 1min * 1440 => 1day
- rra_list.push(RRA::new(CF::Average, 60, 1440));
- rra_list.push(RRA::new(CF::Maximum, 60, 1440));
+ /// Create a new RRD as used by the proxmox backup server
+ ///
+ /// It contains the following RRAs:
+ ///
+ /// * cf=average,r=60,n=1440 => 1day
+ /// * cf=maximum,r=60,n=1440 => 1day
+ /// * cf=average,r=30*60,n=1440 => 1month
+ /// * cf=maximum,r=30*60,n=1440 => 1month
+ /// * cf=average,r=6*3600,n=1440 => 1year
+ /// * cf=maximum,r=6*3600,n=1440 => 1year
+ /// * cf=average,r=7*86400,n=570 => 10years
+ /// * cf=maximum,r=7*86400,n=570 => 10year
+ ///
+ /// The resulting data file size is about 80KB.
+ pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD {
+ let rra_list = vec![
+ // 1 min * 1440 => 1 day
+ RRA::new(CF::Average, 60, 1440),
+ RRA::new(CF::Maximum, 60, 1440),
+ // 30 min * 1440 => 30 days ~ 1 month
+ RRA::new(CF::Average, 30 * 60, 1440),
+ RRA::new(CF::Maximum, 30 * 60, 1440),
+ // 6 h * 1440 => 360 days ~ 1 year
+ RRA::new(CF::Average, 6 * 3600, 1440),
+ RRA::new(CF::Maximum, 6 * 3600, 1440),
+ // 1 week * 570 => 10 years
+ RRA::new(CF::Average, 7 * 86400, 570),
+ RRA::new(CF::Maximum, 7 * 86400, 570),
+ ];
- // 30min * 1440 => 30days = 1month
- rra_list.push(RRA::new(CF::Average, 30*60, 1440));
- rra_list.push(RRA::new(CF::Maximum, 30*60, 1440));
+ RRD::new(dst, rra_list)
+ }
- // 6h * 1440 => 360days = 1year
- rra_list.push(RRA::new(CF::Average, 6*3600, 1440));
- rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440));
+ /// Sync the journal data to disk (using `fdatasync` syscall)
+ pub fn sync_journal(&self) -> Result<(), Error> {
+ self.state.read().unwrap().sync_journal()
+ }
- // 1week * 570 => 10years
- rra_list.push(RRA::new(CF::Average, 7*86400, 570));
- rra_list.push(RRA::new(CF::Maximum, 7*86400, 570));
+ /// Apply and commit the journal. Should be used at server startup.
+ pub fn apply_journal(&self) -> Result<bool, Error> {
+ let config = Arc::clone(&self.config);
+ let state = Arc::clone(&self.state);
+ let rrd_map = Arc::clone(&self.rrd_map);
- RRD::new(dst, rra_list)
- }
+ let mut state_guard = self.state.write().unwrap();
+ let journal_applied = state_guard.journal_applied;
- fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
+ if let Some(ref recv) = state_guard.apply_thread_result {
+ match recv.try_recv() {
+ Ok(Ok(())) => {
+ // finished without errors, OK
+ state_guard.apply_thread_result = None;
+ }
+ Ok(Err(err)) => {
+ // finished with errors, log them
+ log::error!("{}", err);
+ state_guard.apply_thread_result = None;
+ }
+ Err(TryRecvError::Empty) => {
+ // still running
+ return Ok(journal_applied);
+ }
+ Err(TryRecvError::Disconnected) => {
+ // crashed, start again
+ log::error!("apply journal thread crashed - try again");
+ state_guard.apply_thread_result = None;
+ }
+ }
+ }
- let line = line.trim();
+ let now = proxmox_time::epoch_f64();
+ let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
- let parts: Vec<&str> = line.splitn(4, ':').collect();
- if parts.len() != 4 {
- bail!("wrong numper of components");
+ if journal_applied && !wants_commit {
+ return Ok(journal_applied);
}
- let time: f64 = parts[0].parse()
- .map_err(|_| format_err!("unable to parse time"))?;
- let value: f64 = parts[1].parse()
- .map_err(|_| format_err!("unable to parse value"))?;
- let dst: u8 = parts[2].parse()
- .map_err(|_| format_err!("unable to parse data source type"))?;
-
- let dst = match dst {
- 0 => DST::Gauge,
- 1 => DST::Derive,
- _ => bail!("got strange value for data source type '{}'", dst),
- };
+ state_guard.last_journal_flush = proxmox_time::epoch_f64();
- let rel_path = parts[3].to_string();
+ let (sender, receiver) = bounded(1);
+ state_guard.apply_thread_result = Some(receiver);
- Ok(JournalEntry { time, value, dst, rel_path })
+ spawn(move || {
+ let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
+ .map_err(|err| err.to_string());
+ sender.send(result).unwrap();
+ });
+
+ Ok(journal_applied)
}
- fn append_journal_entry(
- state: &mut RRDCacheState,
+ /// Update data in RAM and write file back to disk (journal)
+ pub fn update_value(
+ &self,
+ rel_path: &str,
time: f64,
value: f64,
dst: DST,
- rel_path: &str,
) -> Result<(), Error> {
- let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
- state.journal.write_all(journal_entry.as_bytes())?;
+ let journal_applied = self.apply_journal()?;
+
+ self.state
+ .write()
+ .unwrap()
+ .append_journal_entry(time, value, dst, rel_path)?;
+
+ if journal_applied {
+ self.rrd_map
+ .write()
+ .unwrap()
+ .update(rel_path, time, value, dst, false)?;
+ }
+
Ok(())
}
- pub fn apply_journal(&self) -> Result<(), Error> {
- let mut state = self.state.write().unwrap(); // block writers
- self.apply_journal_locked(&mut state)
+ /// Extract data from cached RRD
+ ///
+ /// `start`: Start time. If not specified, we simply extract 10 data points.
+ ///
+ /// `end`: End time. Default is to use the current time.
+ pub fn extract_cached_data(
+ &self,
+ base: &str,
+ name: &str,
+ cf: CF,
+ resolution: u64,
+ start: Option<u64>,
+ end: Option<u64>,
+ ) -> Result<Option<Entry>, Error> {
+ self.rrd_map
+ .read()
+ .unwrap()
+ .extract_cached_data(base, name, cf, resolution, start, end)
}
+}
- fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> {
-
- log::info!("applying rrd journal");
-
- state.last_journal_flush = proxmox_time::epoch_f64();
+fn apply_and_commit_journal_thread(
+ config: Arc<CacheConfig>,
+ state: Arc<RwLock<JournalState>>,
+ rrd_map: Arc<RwLock<RRDMap>>,
+ commit_only: bool,
+) -> Result<(), Error> {
+ if commit_only {
+ state.write().unwrap().rotate_journal()?; // start new journal, keep old one
+ } else {
+ let start_time = SystemTime::now();
+ log::debug!("applying rrd journal");
+
+ match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
+ Ok(entries) => {
+ let elapsed = start_time.elapsed().unwrap().as_secs_f64();
+ log::info!(
+ "applied rrd journal ({} entries in {:.3} seconds)",
+ entries,
+ elapsed
+ );
+ }
+ Err(err) => bail!("apply rrd journal failed - {}", err),
+ }
+ }
- let mut journal_path = self.basedir.clone();
- journal_path.push(RRD_JOURNAL_NAME);
+ let start_time = SystemTime::now();
+ log::debug!("commit rrd journal");
+
+ match commit_journal_impl(config, state, rrd_map) {
+ Ok(rrd_file_count) => {
+ let elapsed = start_time.elapsed().unwrap().as_secs_f64();
+ log::info!(
+ "rrd journal successfully committed ({} files in {:.3} seconds)",
+ rrd_file_count,
+ elapsed
+ );
+ }
+ Err(err) => bail!("rrd journal commit failed: {}", err),
+ }
+ Ok(())
+}
- let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
- let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?;
- let mut journal = BufReader::new(journal);
+fn apply_journal_lines(
+ state: Arc<RwLock<JournalState>>,
+ rrd_map: Arc<RwLock<RRDMap>>,
+ journal_name: &str, // used for logging
+ reader: &mut BufReader<File>,
+ lock_read_line: bool,
+) -> Result<usize, Error> {
+ let mut linenr = 0;
+
+ loop {
+ linenr += 1;
+ let mut line = String::new();
+ let len = if lock_read_line {
+ let _lock = state.read().unwrap(); // make sure we read entire lines
+ reader.read_line(&mut line)?
+ } else {
+ reader.read_line(&mut line)?
+ };
- let mut last_update_map = HashMap::new();
+ if len == 0 {
+ break;
+ }
- let mut get_last_update = |rel_path: &str, rrd: &RRD| {
- if let Some(time) = last_update_map.get(rel_path) {
- return *time;
+ let entry: JournalEntry = match line.parse() {
+ Ok(entry) => entry,
+ Err(err) => {
+ log::warn!(
+ "unable to parse rrd journal '{}' line {} (skip) - {}",
+ journal_name,
+ linenr,
+ err,
+ );
+ continue; // skip unparsable lines
}
- let last_update = rrd.last_update();
- last_update_map.insert(rel_path.to_string(), last_update);
- last_update
};
- let mut linenr = 0;
- loop {
- linenr += 1;
- let mut line = String::new();
- let len = journal.read_line(&mut line)?;
- if len == 0 { break; }
-
- let entry = match Self::parse_journal_line(&line) {
- Ok(entry) => entry,
- Err(err) => {
- log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
- continue; // skip unparsable lines
- }
- };
+ rrd_map.write().unwrap().update(
+ &entry.rel_path,
+ entry.time,
+ entry.value,
+ entry.dst,
+ true,
+ )?;
+ }
+ Ok(linenr)
+}
- if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
- if entry.time > get_last_update(&entry.rel_path, &rrd) {
- rrd.update(entry.time, entry.value);
- }
- } else {
- let mut path = self.basedir.clone();
- path.push(&entry.rel_path);
- create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
-
- let mut rrd = match RRD::load(&path) {
- Ok(rrd) => rrd,
- Err(err) => {
- if err.kind() != std::io::ErrorKind::NotFound {
- log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
- }
- Self::create_default_rrd(entry.dst)
- },
- };
- if entry.time > get_last_update(&entry.rel_path, &rrd) {
- rrd.update(entry.time, entry.value);
- }
- state.rrd_map.insert(entry.rel_path.clone(), rrd);
- }
- }
+fn apply_journal_impl(
+ state: Arc<RwLock<JournalState>>,
+ rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+ let mut lines = 0;
+
+ // Apply old journals first
+ let journal_list = state.read().unwrap().list_old_journals()?;
+
+ for entry in journal_list {
+ log::info!("apply old journal log {}", entry.name);
+ let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
+ let mut reader = BufReader::new(file);
+ lines += apply_journal_lines(
+ Arc::clone(&state),
+ Arc::clone(&rrd_map),
+ &entry.name,
+ &mut reader,
+ false,
+ )?;
+ }
- // save all RRDs
+ let mut journal = state.read().unwrap().open_journal_reader()?;
- let mut errors = 0;
- for (rel_path, rrd) in state.rrd_map.iter() {
- let mut path = self.basedir.clone();
- path.push(&rel_path);
- if let Err(err) = rrd.save(&path, self.file_options.clone()) {
- errors += 1;
- log::error!("unable to save {:?}: {}", path, err);
- }
- }
+ lines += apply_journal_lines(
+ Arc::clone(&state),
+ Arc::clone(&rrd_map),
+ "rrd.journal",
+ &mut journal,
+ true,
+ )?;
- // if everything went ok, commit the journal
+ {
+ let mut state_guard = state.write().unwrap(); // block other writers
- if errors == 0 {
- nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
- .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
- log::info!("rrd journal successfully committed");
- } else {
- log::error!("errors during rrd flush - unable to commit rrd journal");
- }
+ lines += apply_journal_lines(
+ Arc::clone(&state),
+ Arc::clone(&rrd_map),
+ "rrd.journal",
+ &mut journal,
+ false,
+ )?;
- Ok(())
- }
+ state_guard.rotate_journal()?; // start new journal, keep old one
- /// Update data in RAM and write file back to disk (if `save` is set)
- pub fn update_value(
- &self,
- rel_path: &str,
- value: f64,
- dst: DST,
- ) -> Result<(), Error> {
+ // We need to apply the journal only once, because further updates
+ // are always directly applied.
+ state_guard.journal_applied = true;
+ }
- let mut state = self.state.write().unwrap(); // block other writers
+ Ok(lines)
+}
- let now = proxmox_time::epoch_f64();
+fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
+ let file = std::fs::File::open(path)?;
+ nix::unistd::fsync(file.as_raw_fd())?;
+ Ok(())
+}
- if (now - state.last_journal_flush) > self.apply_interval {
- if let Err(err) = self.apply_journal_locked(&mut state) {
- log::error!("apply journal failed: {}", err);
- }
- }
+pub(crate) fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
+ let file = std::fs::File::open(path)?;
+ nix::unistd::fsync(file.as_raw_fd())?;
+ if let Some(parent) = path.parent() {
+ fsync_file_or_dir(parent)?;
+ }
+ Ok(())
+}
- Self::append_journal_entry(&mut state, now, value, dst, rel_path)?;
+fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
+ let mut path = basedir.to_owned();
+ let rel_path = Path::new(rel_path);
+ if let Some(parent) = rel_path.parent() {
+ path.push(parent);
+ }
+ path
+}
- if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
- rrd.update(now, value);
- } else {
- let mut path = self.basedir.clone();
- path.push(rel_path);
- create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
- let mut rrd = match RRD::load(&path) {
- Ok(rrd) => rrd,
- Err(err) => {
- if err.kind() != std::io::ErrorKind::NotFound {
- log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
- }
- Self::create_default_rrd(dst)
- },
- };
- rrd.update(now, value);
- state.rrd_map.insert(rel_path.into(), rrd);
+fn commit_journal_impl(
+ config: Arc<CacheConfig>,
+ state: Arc<RwLock<JournalState>>,
+ rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+ let files = rrd_map.read().unwrap().file_list();
+
+ let mut rrd_file_count = 0;
+ let mut errors = 0;
+
+ let mut dir_set = BTreeSet::new();
+
+ log::info!("write rrd data back to disk");
+
+ // save all RRDs - we only need a read lock here
+ // Note: no fsync here (we do it afterwards)
+ for rel_path in files.iter() {
+ let parent_dir = rrd_parent_dir(&config.basedir, rel_path);
+ dir_set.insert(parent_dir);
+ rrd_file_count += 1;
+ if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(rel_path) {
+ errors += 1;
+ log::error!("unable to save rrd {}: {}", rel_path, err);
}
+ }
- Ok(())
+ if errors != 0 {
+ bail!("errors during rrd flush - unable to commit rrd journal");
}
- /// Extract data from cached RRD
- pub fn extract_cached_data(
- &self,
- base: &str,
- name: &str,
- now: f64,
- timeframe: RRDTimeFrameResolution,
- mode: RRDMode,
- ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
+ // Important: We fsync files after writing all data! This increase
+ // the likelihood that files are already synced, so this is
+ // much faster (although we need to re-open the files).
- let state = self.state.read().unwrap();
+ log::info!("starting rrd data sync");
- let cf = match mode {
- RRDMode::Max => CF::Maximum,
- RRDMode::Average => CF::Average,
- };
+ for rel_path in files.iter() {
+ let mut path = config.basedir.clone();
+ path.push(rel_path);
+ fsync_file_or_dir(&path)
+ .map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
+ }
- let now = now as u64;
+ // also fsync directories
+ for dir_path in dir_set {
+ fsync_file_or_dir(&dir_path)
+ .map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
+ }
- let (start, resolution) = match timeframe {
- RRDTimeFrameResolution::Hour => (now - 3600, 60),
- RRDTimeFrameResolution::Day => (now - 3600*24, 60),
- RRDTimeFrameResolution::Week => (now - 3600*24*7, 30*60),
- RRDTimeFrameResolution::Month => (now - 3600*24*30, 30*60),
- RRDTimeFrameResolution::Year => (now - 3600*24*365, 6*60*60),
- RRDTimeFrameResolution::Decade => (now - 10*3600*24*366, 7*86400),
- };
+ // if everything went ok, remove the old journal files
+ state.write().unwrap().remove_old_journals()?;
- match state.rrd_map.get(&format!("{}/{}", base, name)) {
- Some(rrd) => Ok(Some(rrd.extract_data(start, now, cf, resolution)?)),
- None => Ok(None),
- }
- }
+ Ok(rrd_file_count)
}