]> git.proxmox.com Git - proxmox.git/commitdiff
proxmox-rrd: avoild blocking readers while applying the journal
authorDietmar Maurer <dietmar@proxmox.com>
Fri, 15 Oct 2021 07:22:07 +0000 (09:22 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Fri, 15 Oct 2021 07:22:07 +0000 (09:22 +0200)
By using and extra RwLock<RRDMap> on the rrd data.

proxmox-rrd/src/cache.rs

index 4c7f05f06a2fb0ce7d54b2fa32c843717887b900..54fbe3785c4b46d2c7a41a5a58414bc0c8684dd5 100644 (file)
@@ -24,14 +24,88 @@ pub struct RRDCache {
     apply_interval: f64,
     basedir: PathBuf,
     file_options: CreateOptions,
-    dir_options: CreateOptions,
     state: RwLock<RRDCacheState>,
-    load_rrd_cb: fn(cache: &RRDCache, path: &Path, rel_path: &str, dst: DST) -> RRD,
+    rrd_map: RwLock<RRDMap>,
+}
+
+struct RRDMap {
+    basedir: PathBuf,
+    file_options: CreateOptions,
+    dir_options: CreateOptions,
+    map: HashMap<String, RRD>,
+    load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
+}
+
+impl RRDMap {
+
+    fn update(
+        &mut self,
+        rel_path: &str,
+        time: f64,
+        value: f64,
+        dst: DST,
+        new_only: bool,
+    ) -> Result<(), Error> {
+        if let Some(rrd) = self.map.get_mut(rel_path) {
+            if !new_only || time > rrd.last_update() {
+                rrd.update(time, value);
+            }
+        } else {
+            let mut path = self.basedir.clone();
+            path.push(rel_path);
+            create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
+
+            let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst);
+
+            if !new_only || time > rrd.last_update() {
+                rrd.update(time, value);
+            }
+            self.map.insert(rel_path.to_string(), rrd);
+        }
+        Ok(())
+    }
+
+    fn flush_rrd_files(&self) -> Result<usize, Error> {
+        let mut rrd_file_count = 0;
+
+        let mut errors = 0;
+        for (rel_path, rrd) in self.map.iter() {
+            rrd_file_count += 1;
+
+            let mut path = self.basedir.clone();
+            path.push(&rel_path);
+
+            if let Err(err) = rrd.save(&path, self.file_options.clone()) {
+                errors += 1;
+                log::error!("unable to save {:?}: {}", path, err);
+            }
+        }
+
+        if errors != 0 {
+            bail!("errors during rrd flush - unable to commit rrd journal");
+        }
+
+        Ok(rrd_file_count)
+    }
+
+    fn extract_cached_data(
+        &self,
+        base: &str,
+        name: &str,
+        cf: CF,
+        resolution: u64,
+        start: Option<u64>,
+        end: Option<u64>,
+    ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
+        match self.map.get(&format!("{}/{}", base, name)) {
+            Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
+            None => Ok(None),
+        }
+    }
 }
 
 // shared state behind RwLock
 struct RRDCacheState {
-    rrd_map: HashMap<String, RRD>,
     journal: File,
     last_journal_flush: f64,
     journal_applied: bool,
@@ -65,7 +139,7 @@ impl RRDCache {
         file_options: Option<CreateOptions>,
         dir_options: Option<CreateOptions>,
         apply_interval: f64,
-        load_rrd_cb: fn(cache: &RRDCache, path: &Path, rel_path: &str, dst: DST) -> RRD,
+        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
     ) -> Result<Self, Error> {
         let basedir = basedir.as_ref().to_owned();
 
@@ -83,19 +157,25 @@ impl RRDCache {
 
         let state = RRDCacheState {
             journal,
-            rrd_map: HashMap::new(),
             last_journal_flush: 0.0,
             journal_applied: false,
         };
 
+        let rrd_map = RRDMap {
+            basedir: basedir.clone(),
+            file_options: file_options.clone(),
+            dir_options: dir_options,
+            map: HashMap::new(),
+            load_rrd_cb,
+        };
+
         Ok(Self {
             basedir,
             file_options,
-            dir_options,
             apply_interval,
-            load_rrd_cb,
             state: RwLock::new(state),
-        })
+            rrd_map: RwLock::new(rrd_map),
+       })
     }
 
     /// Create a new RRD as used by the proxmox backup server
@@ -221,17 +301,7 @@ impl RRDCache {
         let journal = atomic_open_or_create_file(&journal_path, flags,  &[], self.file_options.clone())?;
         let mut journal = BufReader::new(journal);
 
-        let mut last_update_map = HashMap::new();
-
-        let mut get_last_update = |rel_path: &str, rrd: &RRD| {
-            if let Some(time) = last_update_map.get(rel_path) {
-                return *time;
-            }
-            let last_update =  rrd.last_update();
-            last_update_map.insert(rel_path.to_string(), last_update);
-            last_update
-        };
-
+        // fixme: apply blocked to avoid too many calls to self.rrd_map.write() ??
         let mut linenr = 0;
         loop {
             linenr += 1;
@@ -247,25 +317,9 @@ impl RRDCache {
                 }
             };
 
-            if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
-                if entry.time > get_last_update(&entry.rel_path, &rrd) {
-                    rrd.update(entry.time, entry.value);
-                }
-            } else {
-                let mut path = self.basedir.clone();
-                path.push(&entry.rel_path);
-                create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
-
-                let mut rrd = (self.load_rrd_cb)(&self, &path, &entry.rel_path, entry.dst);
-
-                if entry.time > get_last_update(&entry.rel_path, &rrd) {
-                    rrd.update(entry.time, entry.value);
-                }
-                state.rrd_map.insert(entry.rel_path.clone(), rrd);
-            }
+            self.rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
         }
 
-
         // We need to apply the journal only once, because further updates
         // are always directly applied.
         state.journal_applied = true;
@@ -275,23 +329,8 @@ impl RRDCache {
 
     fn commit_journal_locked(&self, state: &mut RRDCacheState) -> Result<usize, Error> {
 
-        // save all RRDs
-        let mut rrd_file_count = 0;
-
-        let mut errors = 0;
-        for (rel_path, rrd) in state.rrd_map.iter() {
-            rrd_file_count += 1;
-            let mut path = self.basedir.clone();
-            path.push(&rel_path);
-            if let Err(err) = rrd.save(&path, self.file_options.clone()) {
-                errors += 1;
-                log::error!("unable to save {:?}: {}", path, err);
-            }
-        }
-
-       if errors != 0 {
-            bail!("errors during rrd flush - unable to commit rrd journal");
-        }
+        // save all RRDs - we only need a read lock here
+        let rrd_file_count = self.rrd_map.read().unwrap().flush_rrd_files()?;
 
         // if everything went ok, commit the journal
 
@@ -318,18 +357,7 @@ impl RRDCache {
 
         Self::append_journal_entry(&mut state, time, value, dst, rel_path)?;
 
-        if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
-            rrd.update(time, value);
-        } else {
-            let mut path = self.basedir.clone();
-            path.push(rel_path);
-            create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
-
-            let mut rrd = (self.load_rrd_cb)(&self, &path, rel_path, dst);
-
-            rrd.update(time, value);
-            state.rrd_map.insert(rel_path.into(), rrd);
-        }
+        self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
 
         Ok(())
     }
@@ -348,12 +376,7 @@ impl RRDCache {
         start: Option<u64>,
         end: Option<u64>,
     ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
-
-        let state = self.state.read().unwrap();
-
-        match state.rrd_map.get(&format!("{}/{}", base, name)) {
-            Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
-            None => Ok(None),
-        }
+        self.rrd_map.read().unwrap()
+            .extract_cached_data(base, name, cf, resolution, start, end)
     }
 }