use std::fs::File;
use std::path::{Path, PathBuf};
use std::collections::HashMap;
-use std::sync::RwLock;
+use std::sync::{Arc, RwLock};
use std::io::Write;
use std::io::{BufRead, BufReader};
-use std::os::unix::io::AsRawFd;
use std::time::SystemTime;
-
+use std::ffi::OsStr;
+use std::thread::spawn;
+use crossbeam_channel::{bounded, Receiver, TryRecvError};
use anyhow::{format_err, bail, Error};
use nix::fcntl::OFlag;
/// This cache is designed to run as single instance (no concurrent
/// access from other processes).
pub struct RRDCache {
+ config: Arc<CacheConfig>,
+ state: Arc<RwLock<JournalState>>,
+ rrd_map: Arc<RwLock<RRDMap>>,
+}
+
+struct CacheConfig {
apply_interval: f64,
basedir: PathBuf,
file_options: CreateOptions,
- state: RwLock<JournalState>,
- rrd_map: RwLock<RRDMap>,
+ dir_options: CreateOptions,
}
struct RRDMap {
- basedir: PathBuf,
- file_options: CreateOptions,
- dir_options: CreateOptions,
+ config: Arc<CacheConfig>,
map: HashMap<String, RRD>,
load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
}
impl RRDMap {
+    /// Builds an `RRDMap` with an empty in-memory map.
+    ///
+    /// `config` is the shared cache configuration; `load_rrd_cb` is stored
+    /// unchanged in the field of the same name.
+    fn new(
+        config: Arc<CacheConfig>,
+        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
+    ) -> Self {
+        let map = HashMap::new();
+        RRDMap { config, map, load_rrd_cb }
+    }
+
fn update(
&mut self,
rel_path: &str,
rrd.update(time, value);
}
} else {
- let mut path = self.basedir.clone();
+ let mut path = self.config.basedir.clone();
path.push(rel_path);
- create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
+ create_path(
+ path.parent().unwrap(),
+ Some(self.config.dir_options.clone()),
+ Some(self.config.dir_options.clone()),
+ )?;
let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst);
for (rel_path, rrd) in self.map.iter() {
rrd_file_count += 1;
- let mut path = self.basedir.clone();
+ let mut path = self.config.basedir.clone();
path.push(&rel_path);
- if let Err(err) = rrd.save(&path, self.file_options.clone()) {
+ if let Err(err) = rrd.save(&path, self.config.file_options.clone()) {
errors += 1;
log::error!("unable to save {:?}: {}", path, err);
}
// shared state behind RwLock
struct JournalState {
+ config: Arc<CacheConfig>,
journal: File,
last_journal_flush: f64,
journal_applied: bool,
+ apply_thread_result: Option<Receiver<Result<(), String>>>,
}
struct JournalEntry {
rel_path: String,
}
+impl JournalState {
+    /// Opens the journal for writing and returns a fresh state: no flush
+    /// recorded yet, journal not applied, no apply thread pending.
+    fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
+        let journal = Self::open_journal_writer(&config)?;
+        let state = JournalState {
+            config,
+            journal,
+            last_journal_flush: 0.0,
+            journal_applied: false,
+            apply_thread_result: None,
+        };
+        Ok(state)
+    }
+
+    /// Path of the active journal file (`RRD_JOURNAL_NAME` below `basedir`).
+    fn journal_path(config: &CacheConfig) -> PathBuf {
+        config.basedir.join(RRD_JOURNAL_NAME)
+    }
+
+    /// Opens the active journal with `O_RDONLY` and wraps it in a `BufReader`.
+    fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
+        // fixme : dup self.journal instead??
+        let path = Self::journal_path(&self.config);
+        let journal = atomic_open_or_create_file(
+            &path,
+            OFlag::O_CLOEXEC | OFlag::O_RDONLY,
+            &[],
+            self.config.file_options.clone(),
+        )?;
+        Ok(BufReader::new(journal))
+    }
+
+    /// Opens the active journal with `O_WRONLY | O_APPEND`.
+    fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> {
+        let path = Self::journal_path(config);
+        let journal = atomic_open_or_create_file(
+            &path,
+            OFlag::O_CLOEXEC | OFlag::O_WRONLY | OFlag::O_APPEND,
+            &[],
+            config.file_options.clone(),
+        )?;
+        Ok(journal)
+    }
+
+    /// Renames the active journal so that its extension becomes
+    /// `journal-<hex epoch>` and then opens a new writer, which replaces
+    /// `self.journal`.
+    fn rotate_journal(&mut self) -> Result<(), Error> {
+        let current = Self::journal_path(&self.config);
+        let suffix = format!("journal-{:08x}", proxmox_time::epoch_i64());
+        let rotated = current.with_extension(suffix);
+        std::fs::rename(current, rotated)?;
+
+        self.journal = Self::open_journal_writer(&self.config)?;
+        Ok(())
+    }
+
+    /// Deletes every file reported by `list_old_journals`, stopping at the
+    /// first failure.
+    fn remove_old_journals(&self) -> Result<(), Error> {
+        for (_, _, old_journal) in self.list_old_journals()? {
+            std::fs::remove_file(old_journal)?;
+        }
+        Ok(())
+    }
+
+    /// Scans `basedir` for regular files named `rrd.journal-<hex>`.
+    ///
+    /// Returns `(time, file name, path)` tuples sorted by ascending `time`,
+    /// where `time` is the parsed hexadecimal suffix. Entries that do not
+    /// match the naming scheme are ignored.
+    fn list_old_journals(&self) -> Result<Vec<(u64, String, PathBuf)>, Error> {
+        let mut journals = Vec::new();
+        for dir_entry in std::fs::read_dir(&self.config.basedir)? {
+            let path = dir_entry?.path();
+            if !path.is_file() {
+                continue;
+            }
+            if let Some((time, filename)) = Self::parse_old_journal_name(&path) {
+                journals.push((time, filename, path));
+            }
+        }
+        journals.sort_unstable_by_key(|(time, _, _)| *time);
+        Ok(journals)
+    }
+
+    /// Extracts `(time, "rrd.<extension>")` from a path whose stem is `rrd`
+    /// and whose extension is `journal-<hex>`; returns `None` otherwise.
+    fn parse_old_journal_name(path: &Path) -> Option<(u64, String)> {
+        if path.file_stem()? != OsStr::new("rrd") {
+            return None;
+        }
+        let extension = path.extension()?.to_str()?;
+        let hex_time = extension.strip_prefix("journal-")?;
+        let time = u64::from_str_radix(hex_time, 16).ok()?;
+        Some((time, format!("rrd.{}", extension)))
+    }
+}
+
impl RRDCache {
/// Creates a new instance
create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
.map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
- let mut journal_path = basedir.clone();
- journal_path.push(RRD_JOURNAL_NAME);
-
- let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
- let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?;
-
- let state = JournalState {
- journal,
- last_journal_flush: 0.0,
- journal_applied: false,
- };
-
- let rrd_map = RRDMap {
+ let config = Arc::new(CacheConfig {
basedir: basedir.clone(),
file_options: file_options.clone(),
dir_options: dir_options,
- map: HashMap::new(),
- load_rrd_cb,
- };
+ apply_interval,
+ });
+
+ let state = JournalState::new(Arc::clone(&config))?;
+ let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
Ok(Self {
- basedir,
- file_options,
- apply_interval,
- state: RwLock::new(state),
- rrd_map: RwLock::new(rrd_map),
- })
+ config: Arc::clone(&config),
+ state: Arc::new(RwLock::new(state)),
+ rrd_map: Arc::new(RwLock::new(rrd_map)),
+ })
}
/// Create a new RRD as used by the proxmox backup server
}
fn append_journal_entry(
- state: &mut JournalState,
+ &self,
time: f64,
value: f64,
dst: DST,
rel_path: &str,
) -> Result<(), Error> {
+ let mut state = self.state.write().unwrap(); // block other writers
let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
state.journal.write_all(journal_entry.as_bytes())?;
Ok(())
}
/// Apply and commit the journal. Should be used at server startup.
- pub fn apply_journal(&self) -> Result<(), Error> {
- let mut state = self.state.write().unwrap(); // block writers
- self.apply_and_commit_journal_locked(&mut state)
- }
+ pub fn apply_journal(&self) -> Result<bool, Error> {
+ let state = Arc::clone(&self.state);
+ let rrd_map = Arc::clone(&self.rrd_map);
- fn apply_and_commit_journal_locked(&self, state: &mut JournalState) -> Result<(), Error> {
+ let mut state_guard = self.state.write().unwrap();
+ let journal_applied = state_guard.journal_applied;
+ let now = proxmox_time::epoch_f64();
+ let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
- state.last_journal_flush = proxmox_time::epoch_f64();
+ if journal_applied && !wants_commit { return Ok(journal_applied); }
- if !state.journal_applied {
- let start_time = SystemTime::now();
- log::debug!("applying rrd journal");
-
- match self.apply_journal_locked(state) {
- Ok(entries) => {
- let elapsed = start_time.elapsed()?.as_secs_f64();
- log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
+ if let Some(ref recv) = state_guard.apply_thread_result {
+ match recv.try_recv() {
+ Ok(Ok(())) => {
+ // finished without errors, OK
+ }
+ Ok(Err(err)) => {
+ // finished with errors, log them
+ log::error!("{}", err);
+ }
+ Err(TryRecvError::Empty) => {
+ // still running
+ return Ok(journal_applied);
+ }
+ Err(TryRecvError::Disconnected) => {
+ // crashed, start again
+ log::error!("apply journal thread crashed - try again");
}
- Err(err) => bail!("apply rrd journal failed - {}", err),
- }
- }
-
- let start_time = SystemTime::now();
- log::debug!("commit rrd journal");
-
- match self.commit_journal_locked(state) {
- Ok(rrd_file_count) => {
- let elapsed = start_time.elapsed()?.as_secs_f64();
- log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
- rrd_file_count, elapsed);
}
- Err(err) => bail!("rrd journal commit failed: {}", err),
}
- Ok(())
- }
-
- fn apply_journal_locked(&self, state: &mut JournalState) -> Result<usize, Error> {
-
- let mut journal_path = self.basedir.clone();
- journal_path.push(RRD_JOURNAL_NAME);
+ state_guard.last_journal_flush = proxmox_time::epoch_f64();
- let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
- let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?;
- let mut journal = BufReader::new(journal);
-
- // fixme: apply blocked to avoid too many calls to self.rrd_map.write() ??
- let mut linenr = 0;
- loop {
- linenr += 1;
- let mut line = String::new();
- let len = journal.read_line(&mut line)?;
- if len == 0 { break; }
-
- let entry = match Self::parse_journal_line(&line) {
- Ok(entry) => entry,
- Err(err) => {
- log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
- continue; // skip unparsable lines
- }
- };
+ let (sender, receiver) = bounded(1);
+ state_guard.apply_thread_result = Some(receiver);
- self.rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
- }
-
- // We need to apply the journal only once, because further updates
- // are always directly applied.
- state.journal_applied = true;
+ spawn(move || {
+ let result = apply_and_commit_journal_thread(state, rrd_map, journal_applied)
+ .map_err(|err| err.to_string());
+ sender.send(result).unwrap();
+ });
- Ok(linenr)
+ Ok(journal_applied)
}
- fn commit_journal_locked(&self, state: &mut JournalState) -> Result<usize, Error> {
-
- // save all RRDs - we only need a read lock here
- let rrd_file_count = self.rrd_map.read().unwrap().flush_rrd_files()?;
-
- // if everything went ok, commit the journal
-
- nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
- .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
-
- Ok(rrd_file_count)
- }
/// Update data in RAM and write file back to disk (journal)
pub fn update_value(
dst: DST,
) -> Result<(), Error> {
- let mut state = self.state.write().unwrap(); // block other writers
-
- if !state.journal_applied || (time - state.last_journal_flush) > self.apply_interval {
- self.apply_and_commit_journal_locked(&mut state)?;
- }
+ let journal_applied = self.apply_journal()?;
- Self::append_journal_entry(&mut state, time, value, dst, rel_path)?;
+ self.append_journal_entry(time, value, dst, rel_path)?;
- self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
+ if journal_applied {
+ self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
+ }
Ok(())
}
.extract_cached_data(base, name, cf, resolution, start, end)
}
}
+
+
+/// Body of the background apply/commit thread.
+///
+/// With `commit_only` set, the active journal is rotated right away (the old
+/// file is kept). Otherwise `apply_journal_impl` replays the journal files
+/// first. In both cases `commit_journal_impl` runs afterwards.
+///
+/// Returns an error prefixed with the failing phase if rotation, apply or
+/// commit fails.
+fn apply_and_commit_journal_thread(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+    commit_only: bool,
+) -> Result<(), Error> {
+    if commit_only {
+        state.write().unwrap().rotate_journal()?; // start new journal, keep old one
+    } else {
+        let start_time = SystemTime::now();
+        log::debug!("applying rrd journal");
+
+        let entries = apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map))
+            .map_err(|err| format_err!("apply rrd journal failed - {}", err))?;
+
+        // `SystemTime::elapsed` fails when the wall clock was set back. The
+        // value is only used for logging, so fall back to zero instead of
+        // panicking in this thread.
+        let elapsed = start_time.elapsed().unwrap_or_default().as_secs_f64();
+        log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
+    }
+
+    let start_time = SystemTime::now();
+    log::debug!("commit rrd journal");
+
+    let rrd_file_count = commit_journal_impl(state, rrd_map)
+        .map_err(|err| format_err!("rrd journal commit failed: {}", err))?;
+
+    // Same reasoning as above: never panic over a log-only timing value.
+    let elapsed = start_time.elapsed().unwrap_or_default().as_secs_f64();
+    log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
+               rrd_file_count, elapsed);
+
+    Ok(())
+}
+
+/// Replays journal lines from `reader` into `rrd_map` until end of file.
+///
+/// * `journal_name` is only used in the warning logged for unparsable lines.
+/// * `lock_read_line` - when set, a read lock on `state` is held while each
+///   line is read.
+///
+/// Lines that fail `RRDCache::parse_journal_line` are logged and skipped.
+/// Returns the number of lines read (skipped ones included); an empty input
+/// yields 0. Read errors and errors from `RRDMap::update` are propagated.
+fn apply_journal_lines(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+    journal_name: &str, // used for logging
+    reader: &mut BufReader<File>,
+    lock_read_line: bool,
+) -> Result<usize, Error> {
+    let mut linenr = 0;
+    // One buffer for all lines; `read_line` appends, so clear it every round.
+    let mut line = String::new();
+
+    loop {
+        line.clear();
+        let len = if lock_read_line {
+            let _lock = state.read().unwrap(); // make sure we read entire lines
+            reader.read_line(&mut line)?
+        } else {
+            reader.read_line(&mut line)?
+        };
+
+        if len == 0 {
+            break;
+        }
+
+        // Count only after the EOF check, so the total is not off by one.
+        linenr += 1;
+
+        let entry = match RRDCache::parse_journal_line(&line) {
+            Ok(entry) => entry,
+            Err(err) => {
+                log::warn!(
+                    "unable to parse rrd journal '{}' line {} (skip) - {}",
+                    journal_name, linenr, err,
+                );
+                continue; // skip unparsable lines
+            }
+        };
+
+        rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
+    }
+
+    Ok(linenr)
+}
+
+/// Replays all journal files into `rrd_map` and returns the summed line
+/// counts reported by `apply_journal_lines`.
+///
+/// Order of operations:
+/// 1. Rotated journals from `list_old_journals` are applied without holding
+///    the state lock while reading.
+/// 2. The active journal is applied with a read lock taken per line.
+/// 3. Under the state write lock, whatever was appended in the meantime is
+///    applied, the journal is rotated, and `journal_applied` is set.
+fn apply_journal_impl(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+    let mut total = 0;
+
+    // Apply old journals first
+    let old_journals = state.read().unwrap().list_old_journals()?;
+    for (_, filename, path) in old_journals {
+        log::info!("apply old journal log {}", filename);
+        let mut old_reader = BufReader::new(File::open(path)?);
+        total += apply_journal_lines(
+            Arc::clone(&state),
+            Arc::clone(&rrd_map),
+            &filename,
+            &mut old_reader,
+            false,
+        )?;
+    }
+
+    let mut active_reader = state.read().unwrap().open_journal_reader()?;
+
+    total += apply_journal_lines(
+        Arc::clone(&state),
+        Arc::clone(&rrd_map),
+        "rrd.journal",
+        &mut active_reader,
+        true,
+    )?;
+
+    let mut state_guard = state.write().unwrap(); // block other writers
+
+    // The write lock is already held here, so the per-line read lock must
+    // stay off.
+    total += apply_journal_lines(
+        Arc::clone(&state),
+        Arc::clone(&rrd_map),
+        "rrd.journal",
+        &mut active_reader,
+        false,
+    )?;
+
+    state_guard.rotate_journal()?; // start new journal, keep old one
+
+    // We need to apply the journal only once, because further updates
+    // are always directly applied.
+    state_guard.journal_applied = true;
+    drop(state_guard);
+
+    Ok(total)
+}
+
+/// Flushes all RRD files and, if that succeeds, removes the old journal
+/// files. Returns the value reported by `flush_rrd_files`.
+fn commit_journal_impl(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+    // save all RRDs - we only need a read lock here
+    let rrd_file_count = {
+        let map_guard = rrd_map.read().unwrap();
+        map_guard.flush_rrd_files()?
+    };
+
+    // if everything went ok, remove the old journal files
+    let state_guard = state.write().unwrap();
+    state_guard.remove_old_journals()?;
+
+    Ok(rrd_file_count)
+}