]> git.proxmox.com Git - proxmox.git/commitdiff
proxmox-rrd: implement non blocking journal
authorDietmar Maurer <dietmar@proxmox.com>
Fri, 15 Oct 2021 10:26:33 +0000 (12:26 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Sat, 16 Oct 2021 10:45:03 +0000 (12:45 +0200)
Do not block while applying the journal.

proxmox-rrd/Cargo.toml
proxmox-rrd/src/cache.rs

index 31473962cc7d9443b5310407f4b43ecbe51d7b8f..900b8fef8fa822e10464835349ec162ad3157406 100644 (file)
@@ -11,6 +11,7 @@ proxmox-router = "1.1"
 [dependencies]
 anyhow = "1.0"
 bitflags = "1.2.1"
+crossbeam-channel = "0.5"
 log = "0.4"
 nix = "0.19.1"
 serde = { version = "1.0", features = ["derive"] }
index bf8486a0255cef2eaf1d05944a63128ad19ec57a..7366281a61487880d530e45dbd8a172654a19acd 100644 (file)
@@ -1,12 +1,13 @@
 use std::fs::File;
 use std::path::{Path, PathBuf};
 use std::collections::HashMap;
-use std::sync::RwLock;
+use std::sync::{Arc, RwLock};
 use std::io::Write;
 use std::io::{BufRead, BufReader};
-use std::os::unix::io::AsRawFd;
 use std::time::SystemTime;
-
+use std::ffi::OsStr;
+use std::thread::spawn;
+use crossbeam_channel::{bounded, Receiver, TryRecvError};
 use anyhow::{format_err, bail, Error};
 use nix::fcntl::OFlag;
 
@@ -21,23 +22,37 @@ const RRD_JOURNAL_NAME: &str = "rrd.journal";
 /// This cache is designed to run as single instance (no concurrent
 /// access from other processes).
 pub struct RRDCache {
+    config: Arc<CacheConfig>,
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+}
+
+struct CacheConfig {
     apply_interval: f64,
     basedir: PathBuf,
     file_options: CreateOptions,
-    state: RwLock<JournalState>,
-    rrd_map: RwLock<RRDMap>,
+    dir_options: CreateOptions,
 }
 
 struct RRDMap {
-    basedir: PathBuf,
-    file_options: CreateOptions,
-    dir_options: CreateOptions,
+    config: Arc<CacheConfig>,
     map: HashMap<String, RRD>,
     load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
 }
 
 impl RRDMap {
 
+    fn new(
+        config: Arc<CacheConfig>,
+        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
+    ) -> Self {
+        Self {
+            config,
+            map: HashMap::new(),
+            load_rrd_cb,
+        }
+    }
+
     fn update(
         &mut self,
         rel_path: &str,
@@ -51,9 +66,13 @@ impl RRDMap {
                 rrd.update(time, value);
             }
         } else {
-            let mut path = self.basedir.clone();
+            let mut path = self.config.basedir.clone();
             path.push(rel_path);
-            create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
+            create_path(
+                path.parent().unwrap(),
+                Some(self.config.dir_options.clone()),
+                Some(self.config.dir_options.clone()),
+            )?;
 
             let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst);
 
@@ -72,10 +91,10 @@ impl RRDMap {
         for (rel_path, rrd) in self.map.iter() {
             rrd_file_count += 1;
 
-            let mut path = self.basedir.clone();
+            let mut path = self.config.basedir.clone();
             path.push(&rel_path);
 
-            if let Err(err) = rrd.save(&path, self.file_options.clone()) {
+            if let Err(err) = rrd.save(&path, self.config.file_options.clone()) {
                 errors += 1;
                 log::error!("unable to save {:?}: {}", path, err);
             }
@@ -106,9 +125,11 @@ impl RRDMap {
 
 // shared state behind RwLock
 struct JournalState {
+    config: Arc<CacheConfig>,
     journal: File,
     last_journal_flush: f64,
     journal_applied: bool,
+    apply_thread_result: Option<Receiver<Result<(), String>>>,
 }
 
 struct JournalEntry {
@@ -118,6 +139,98 @@ struct JournalEntry {
     rel_path: String,
 }
 
+impl JournalState {
+
+    fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
+        let journal = JournalState::open_journal_writer(&config)?;
+        Ok(Self {
+            config,
+            journal,
+            last_journal_flush: 0.0,
+            journal_applied: false,
+            apply_thread_result: None,
+        })
+    }
+
+    fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
+
+        // fixme : dup self.journal instead??
+        let mut journal_path = self.config.basedir.clone();
+        journal_path.push(RRD_JOURNAL_NAME);
+
+        let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
+        let journal = atomic_open_or_create_file(
+            &journal_path,
+            flags,
+            &[],
+            self.config.file_options.clone(),
+        )?;
+        Ok(BufReader::new(journal))
+    }
+
+    fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> {
+        let mut journal_path = config.basedir.clone();
+        journal_path.push(RRD_JOURNAL_NAME);
+
+        let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
+        let journal = atomic_open_or_create_file(
+            &journal_path,
+            flags,
+            &[],
+            config.file_options.clone(),
+        )?;
+        Ok(journal)
+    }
+
+    fn rotate_journal(&mut self) -> Result<(), Error> {
+        let mut journal_path = self.config.basedir.clone();
+        journal_path.push(RRD_JOURNAL_NAME);
+
+        let mut new_name = journal_path.clone();
+        let now = proxmox_time::epoch_i64();
+        new_name.set_extension(format!("journal-{:08x}", now));
+        std::fs::rename(journal_path, new_name)?;
+
+        self.journal = Self::open_journal_writer(&self.config)?;
+        Ok(())
+    }
+
+    fn remove_old_journals(&self) -> Result<(), Error> {
+
+        let journal_list = self.list_old_journals()?;
+
+        for (_time, _filename, path) in journal_list {
+            std::fs::remove_file(path)?;
+        }
+
+        Ok(())
+    }
+
+    fn list_old_journals(&self) -> Result<Vec<(u64, String, PathBuf)>, Error> {
+        let mut list = Vec::new();
+        for entry in std::fs::read_dir(&self.config.basedir)? {
+            let entry = entry?;
+            let path = entry.path();
+            if path.is_file() {
+                if let Some(stem) = path.file_stem() {
+                    if stem != OsStr::new("rrd") { continue; }
+                    if let Some(extension) = path.extension() {
+                        if let Some(extension) = extension.to_str() {
+                            if let Some(rest) = extension.strip_prefix("journal-") {
+                                if let Ok(time) = u64::from_str_radix(rest, 16) {
+                                    list.push((time, format!("rrd.{}", extension), path.to_owned()));
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        list.sort_unstable_by_key(|t| t.0);
+        Ok(list)
+    }
+}
+
 impl RRDCache {
 
     /// Creates a new instance
@@ -149,33 +262,21 @@ impl RRDCache {
         create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
             .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
 
-        let mut journal_path = basedir.clone();
-        journal_path.push(RRD_JOURNAL_NAME);
-
-        let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
-        let journal = atomic_open_or_create_file(&journal_path, flags,  &[], file_options.clone())?;
-
-        let state = JournalState {
-            journal,
-            last_journal_flush: 0.0,
-            journal_applied: false,
-        };
-
-        let rrd_map = RRDMap {
+        let config = Arc::new(CacheConfig {
             basedir: basedir.clone(),
             file_options: file_options.clone(),
             dir_options: dir_options,
-            map: HashMap::new(),
-            load_rrd_cb,
-        };
+            apply_interval,
+        });
+
+        let state = JournalState::new(Arc::clone(&config))?;
+        let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
 
         Ok(Self {
-            basedir,
-            file_options,
-            apply_interval,
-            state: RwLock::new(state),
-            rrd_map: RwLock::new(rrd_map),
-       })
+            config: Arc::clone(&config),
+            state: Arc::new(RwLock::new(state)),
+            rrd_map: Arc::new(RwLock::new(rrd_map)),
+        })
     }
 
     /// Create a new RRD as used by the proxmox backup server
@@ -243,102 +344,64 @@ impl RRDCache {
     }
 
     fn append_journal_entry(
-        state: &mut JournalState,
+        &self,
         time: f64,
         value: f64,
         dst: DST,
         rel_path: &str,
     ) -> Result<(), Error> {
+        let mut state = self.state.write().unwrap(); // block other writers
         let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
         state.journal.write_all(journal_entry.as_bytes())?;
         Ok(())
     }
 
     /// Apply and commit the journal. Should be used at server startup.
-    pub fn apply_journal(&self) -> Result<(), Error> {
-        let mut state = self.state.write().unwrap(); // block writers
-        self.apply_and_commit_journal_locked(&mut state)
-    }
+    pub fn apply_journal(&self) -> Result<bool, Error> {
+        let state = Arc::clone(&self.state);
+        let rrd_map = Arc::clone(&self.rrd_map);
 
-    fn apply_and_commit_journal_locked(&self, state: &mut JournalState) -> Result<(), Error> {
+        let mut state_guard = self.state.write().unwrap();
+        let journal_applied = state_guard.journal_applied;
+        let now = proxmox_time::epoch_f64();
+        let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
 
-        state.last_journal_flush = proxmox_time::epoch_f64();
+        if journal_applied && !wants_commit { return Ok(journal_applied); }
 
-        if !state.journal_applied {
-            let start_time = SystemTime::now();
-            log::debug!("applying rrd journal");
-
-            match self.apply_journal_locked(state) {
-                Ok(entries) => {
-                    let elapsed = start_time.elapsed()?.as_secs_f64();
-                    log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
+        if let Some(ref recv) = state_guard.apply_thread_result {
+            match recv.try_recv() {
+                Ok(Ok(())) => {
+                    // finished without errors, OK
+                }
+                Ok(Err(err)) => {
+                    // finished with errors, log them
+                    log::error!("{}", err);
+                }
+                Err(TryRecvError::Empty) => {
+                    // still running
+                    return Ok(journal_applied);
+                }
+                Err(TryRecvError::Disconnected) => {
+                    // crashed, start again
+                    log::error!("apply journal thread crashed - try again");
                 }
-                Err(err) => bail!("apply rrd journal failed - {}", err),
-            }
-        }
-
-        let start_time = SystemTime::now();
-        log::debug!("commit rrd journal");
-
-        match self.commit_journal_locked(state) {
-            Ok(rrd_file_count) => {
-                let elapsed = start_time.elapsed()?.as_secs_f64();
-                log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
-                           rrd_file_count, elapsed);
             }
-            Err(err) => bail!("rrd journal commit failed: {}", err),
         }
 
-        Ok(())
-    }
-
-    fn apply_journal_locked(&self, state: &mut JournalState) -> Result<usize, Error> {
-
-        let mut journal_path = self.basedir.clone();
-        journal_path.push(RRD_JOURNAL_NAME);
+        state_guard.last_journal_flush = proxmox_time::epoch_f64();
 
-        let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
-        let journal = atomic_open_or_create_file(&journal_path, flags,  &[], self.file_options.clone())?;
-        let mut journal = BufReader::new(journal);
-
-        // fixme: apply blocked to avoid too many calls to self.rrd_map.write() ??
-        let mut linenr = 0;
-        loop {
-            linenr += 1;
-            let mut line = String::new();
-            let len = journal.read_line(&mut line)?;
-            if len == 0 { break; }
-
-            let entry = match Self::parse_journal_line(&line) {
-                Ok(entry) => entry,
-                Err(err) => {
-                    log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
-                    continue; // skip unparsable lines
-                }
-            };
+        let (sender, receiver) = bounded(1);
+        state_guard.apply_thread_result = Some(receiver);
 
-            self.rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
-        }
-
-        // We need to apply the journal only once, because further updates
-        // are always directly applied.
-        state.journal_applied = true;
+        spawn(move || {
+            let result = apply_and_commit_journal_thread(state, rrd_map, journal_applied)
+                .map_err(|err| err.to_string());
+            sender.send(result).unwrap();
+        });
 
-        Ok(linenr)
+        Ok(journal_applied)
     }
 
-    fn commit_journal_locked(&self, state: &mut JournalState) -> Result<usize, Error> {
-
-        // save all RRDs - we only need a read lock here
-        let rrd_file_count = self.rrd_map.read().unwrap().flush_rrd_files()?;
-
-        // if everything went ok, commit the journal
-
-        nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
-            .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
-
-        Ok(rrd_file_count)
-    }
 
     /// Update data in RAM and write file back to disk (journal)
     pub fn update_value(
@@ -349,15 +412,13 @@ impl RRDCache {
         dst: DST,
     ) -> Result<(), Error> {
 
-        let mut state = self.state.write().unwrap(); // block other writers
-
-        if !state.journal_applied || (time - state.last_journal_flush) > self.apply_interval {
-            self.apply_and_commit_journal_locked(&mut state)?;
-        }
+        let journal_applied = self.apply_journal()?;
 
-        Self::append_journal_entry(&mut state, time, value, dst, rel_path)?;
+        self.append_journal_entry(time, value, dst, rel_path)?;
 
-        self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
+        if journal_applied {
+            self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
+        }
 
         Ok(())
     }
@@ -380,3 +441,146 @@ impl RRDCache {
             .extract_cached_data(base, name, cf, resolution, start, end)
     }
 }
+
+
+fn apply_and_commit_journal_thread(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+    commit_only: bool,
+) -> Result<(), Error> {
+
+    if commit_only {
+        state.write().unwrap().rotate_journal()?; // start new journal, keep old one
+    } else {
+        let start_time = SystemTime::now();
+        log::debug!("applying rrd journal");
+
+        match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
+            Ok(entries) => {
+                let elapsed = start_time.elapsed().unwrap().as_secs_f64();
+                log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
+            }
+            Err(err) => bail!("apply rrd journal failed - {}", err),
+        }
+    }
+
+    let start_time = SystemTime::now();
+    log::debug!("commit rrd journal");
+
+    match commit_journal_impl(state, rrd_map) {
+        Ok(rrd_file_count) => {
+            let elapsed = start_time.elapsed().unwrap().as_secs_f64();
+            log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
+                       rrd_file_count, elapsed);
+        }
+        Err(err) => bail!("rrd journal commit failed: {}", err),
+    }
+    Ok(())
+}
+
+fn apply_journal_lines(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+    journal_name: &str, // used for logging
+    reader: &mut BufReader<File>,
+    lock_read_line: bool,
+) -> Result<usize, Error> {
+
+    let mut linenr = 0;
+
+    loop {
+        linenr += 1;
+        let mut line = String::new();
+        let len = if lock_read_line {
+            let _lock = state.read().unwrap(); // make sure we read entire lines
+            reader.read_line(&mut line)?
+        } else {
+            reader.read_line(&mut line)?
+        };
+
+        if len == 0 { break; }
+
+        let entry = match RRDCache::parse_journal_line(&line) {
+            Ok(entry) => entry,
+            Err(err) => {
+                log::warn!(
+                    "unable to parse rrd journal '{}' line {} (skip) - {}",
+                    journal_name, linenr, err,
+                );
+                continue; // skip unparsable lines
+            }
+        };
+
+        rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
+    }
+    Ok(linenr)
+}
+
+fn apply_journal_impl(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+
+    let mut lines = 0;
+
+    // Apply old journals first
+    let journal_list = state.read().unwrap().list_old_journals()?;
+
+    for (_time, filename, path) in journal_list {
+        log::info!("apply old journal log {}", filename);
+        let file = std::fs::OpenOptions::new().read(true).open(path)?;
+        let mut reader = BufReader::new(file);
+        lines += apply_journal_lines(
+            Arc::clone(&state),
+            Arc::clone(&rrd_map),
+            &filename,
+            &mut reader,
+            false,
+        )?;
+    }
+
+    let mut journal = state.read().unwrap().open_journal_reader()?;
+
+    lines += apply_journal_lines(
+        Arc::clone(&state),
+        Arc::clone(&rrd_map),
+        "rrd.journal",
+        &mut journal,
+        true,
+    )?;
+
+    {
+        let mut state_guard = state.write().unwrap(); // block other writers
+
+        lines += apply_journal_lines(
+            Arc::clone(&state),
+            Arc::clone(&rrd_map),
+            "rrd.journal",
+            &mut journal,
+            false,
+        )?;
+
+        state_guard.rotate_journal()?; // start new journal, keep old one
+
+        // We need to apply the journal only once, because further updates
+        // are always directly applied.
+        state_guard.journal_applied = true;
+    }
+
+
+    Ok(lines)
+}
+
+fn commit_journal_impl(
+    state: Arc<RwLock<JournalState>>,
+    rrd_map: Arc<RwLock<RRDMap>>,
+) -> Result<usize, Error> {
+
+    // save all RRDs - we only need a read lock here
+    let rrd_file_count = rrd_map.read().unwrap().flush_rrd_files()?;
+
+    // if everything went ok, remove the old journal files
+    state.write().unwrap().remove_old_journals()?;
+
+    Ok(rrd_file_count)
+}