use std::io::{BufRead, BufReader};
use std::time::SystemTime;
use std::thread::spawn;
+use std::os::unix::io::AsRawFd;
+use std::collections::BTreeSet;
+
use crossbeam_channel::{bounded, TryRecvError};
use anyhow::{format_err, bail, Error};
-use proxmox::tools::fs::{create_path, CreateOptions};
+use proxmox_sys::fs::{create_path, CreateOptions};
use crate::rrd::{DST, CF, RRD, RRA};
RRD::new(dst, rra_list)
}
- fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
-
- let line = line.trim();
-
- let parts: Vec<&str> = line.splitn(4, ':').collect();
- if parts.len() != 4 {
- bail!("wrong numper of components");
- }
-
- let time: f64 = parts[0].parse()
- .map_err(|_| format_err!("unable to parse time"))?;
- let value: f64 = parts[1].parse()
- .map_err(|_| format_err!("unable to parse value"))?;
- let dst: u8 = parts[2].parse()
- .map_err(|_| format_err!("unable to parse data source type"))?;
-
- let dst = match dst {
- 0 => DST::Gauge,
- 1 => DST::Derive,
- _ => bail!("got strange value for data source type '{}'", dst),
- };
-
- let rel_path = parts[3].to_string();
-
- Ok(JournalEntry { time, value, dst, rel_path })
- }
-
+ /// Sync the journal data to disk (using the `fdatasync` syscall)
pub fn sync_journal(&self) -> Result<(), Error> {
self.state.read().unwrap().sync_journal()
}
/// Apply and commit the journal. Should be used at server startup.
pub fn apply_journal(&self) -> Result<bool, Error> {
+ let config = Arc::clone(&self.config);
let state = Arc::clone(&self.state);
let rrd_map = Arc::clone(&self.rrd_map);
state_guard.apply_thread_result = Some(receiver);
spawn(move || {
- let result = apply_and_commit_journal_thread(state, rrd_map, journal_applied)
+ let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
.map_err(|err| err.to_string());
sender.send(result).unwrap();
});
fn apply_and_commit_journal_thread(
+ config: Arc<CacheConfig>,
state: Arc<RwLock<JournalState>>,
rrd_map: Arc<RwLock<RRDMap>>,
commit_only: bool,
let start_time = SystemTime::now();
log::debug!("commit rrd journal");
- match commit_journal_impl(state, rrd_map) {
+ match commit_journal_impl(config, state, rrd_map) {
Ok(rrd_file_count) => {
let elapsed = start_time.elapsed().unwrap().as_secs_f64();
log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
if len == 0 { break; }
- let entry = match RRDCache::parse_journal_line(&line) {
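+ // `JournalEntry` now implements `FromStr` (the parsing logic from the
+ // removed `parse_journal_line` helper presumably lives there), so
+ // journal lines can be parsed directly via `str::parse`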
+ let entry: JournalEntry = match line.parse() {
Ok(entry) => entry,
Err(err) => {
log::warn!(
Ok(lines)
}
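+
+/// Open `path` and `fsync` its file descriptor. This works for both
+/// regular files and directories.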
+fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
+ let file = std::fs::File::open(path)?;
+ nix::unistd::fsync(file.as_raw_fd())?;
+ Ok(())
+}
+
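+/// `fsync` the file itself, then `fsync` its parent directory. Syncing
+/// the parent persists the directory entry, which matters for newly
+/// created files.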
+pub(crate) fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
+ let file = std::fs::File::open(path)?;
+ nix::unistd::fsync(file.as_raw_fd())?;
+ if let Some(parent) = path.parent() {
+ fsync_file_or_dir(parent)?;
+ }
+ Ok(())
+}
+
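+/// Return the parent directory of `rel_path`, resolved below `basedir`
+/// (e.g., with a hypothetical basedir `/var/lib/rrd` and rel_path
+/// `host/cpu`, this yields `/var/lib/rrd/host`).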
+fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
+ let mut path = basedir.to_owned();
+ let rel_path = Path::new(rel_path);
+ if let Some(parent) = rel_path.parent() {
+ path.push(parent);
+ }
+ path
+}
+
fn commit_journal_impl(
+ config: Arc<CacheConfig>,
state: Arc<RwLock<JournalState>>,
rrd_map: Arc<RwLock<RRDMap>>,
) -> Result<usize, Error> {
let mut rrd_file_count = 0;
let mut errors = 0;
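+ // remember the parent directory of every flushed RRD file, so the
+ // directories themselves can be fsynced after the data sync below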
+ let mut dir_set = BTreeSet::new();
+
+ log::info!("write rrd data back to disk");
+
// save all RRDs - we only need a read lock here
+ // Note: no fsync here (we do it afterwards)
for rel_path in files.iter() {
+ let parent_dir = rrd_parent_dir(&config.basedir, &rel_path);
+ dir_set.insert(parent_dir);
rrd_file_count += 1;
if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(&rel_path) {
errors += 1;
log::error!("unable to save rrd file {} - {}", rel_path, err);
}
}
- state.read().unwrap().syncfs()?;
-
if errors != 0 {
bail!("errors during rrd flush - unable to commit rrd journal");
}
+ // Important: We fsync the files only after writing all the data!
+ // This increases the likelihood that the files are already synced,
+ // so the fsync calls are much faster (although we need to re-open
+ // the files).
+
+ log::info!("starting rrd data sync");
+
+ for rel_path in files.iter() {
+ let mut path = config.basedir.clone();
+ path.push(&rel_path);
+ fsync_file_or_dir(&path)
+ .map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
+ }
+
+ // also fsync directories
+ for dir_path in dir_set {
+ fsync_file_or_dir(&dir_path)
+ .map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
+ }
+
// if everything went ok, remove the old journal files
state.write().unwrap().remove_old_journals()?;