apply_interval: f64,
basedir: PathBuf,
file_options: CreateOptions,
- dir_options: CreateOptions,
state: RwLock<RRDCacheState>,
- load_rrd_cb: fn(cache: &RRDCache, path: &Path, rel_path: &str, dst: DST) -> RRD,
+ rrd_map: RwLock<RRDMap>,
+}
+
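+/// All loaded RRD files, keyed by their path relative to `basedir`,
+/// together with the options needed to load and save them.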
+struct RRDMap {
+ basedir: PathBuf,
+ file_options: CreateOptions,
+ dir_options: CreateOptions,
+ map: HashMap<String, RRD>,
+ load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
+}
+
+impl RRDMap {
+
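+ /// Update the RRD for `rel_path`, loading or creating the file on
+ /// first access. With `new_only` set (used for journal replay),
+ /// entries not newer than the RRD's last update are skipped, so
+ /// replaying the journal cannot overwrite fresher in-memory data.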
+ fn update(
+ &mut self,
+ rel_path: &str,
+ time: f64,
+ value: f64,
+ dst: DST,
+ new_only: bool,
+ ) -> Result<(), Error> {
+ if let Some(rrd) = self.map.get_mut(rel_path) {
+ if !new_only || time > rrd.last_update() {
+ rrd.update(time, value);
+ }
+ } else {
+ let mut path = self.basedir.clone();
+ path.push(rel_path);
+ create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
+
+ let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst);
+
+ if !new_only || time > rrd.last_update() {
+ rrd.update(time, value);
+ }
+ self.map.insert(rel_path.to_string(), rrd);
+ }
+ Ok(())
+ }
+
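+ /// Save all cached RRDs to disk. Save errors are logged and counted
+ /// instead of aborting the loop, so a single broken file does not
+ /// block the rest; if any save failed, an error is returned and the
+ /// journal must not be committed.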
+ fn flush_rrd_files(&self) -> Result<usize, Error> {
+ let mut rrd_file_count = 0;
+
+ let mut errors = 0;
+ for (rel_path, rrd) in self.map.iter() {
+ rrd_file_count += 1;
+
+ let mut path = self.basedir.clone();
+ path.push(&rel_path);
+
+ if let Err(err) = rrd.save(&path, self.file_options.clone()) {
+ errors += 1;
+ log::error!("unable to save {:?}: {}", path, err);
+ }
+ }
+
+ if errors != 0 {
+ bail!("errors during rrd flush - unable to commit rrd journal");
+ }
+
+ Ok(rrd_file_count)
+ }
+
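+ /// Extract cached data from the RRD stored under "{base}/{name}",
+ /// returning `None` if that RRD is not cached.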
+ fn extract_cached_data(
+ &self,
+ base: &str,
+ name: &str,
+ cf: CF,
+ resolution: u64,
+ start: Option<u64>,
+ end: Option<u64>,
+ ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
+ match self.map.get(&format!("{}/{}", base, name)) {
+ Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
+ None => Ok(None),
+ }
+ }
}
// shared state behind RwLock
struct RRDCacheState {
- rrd_map: HashMap<String, RRD>,
journal: File,
last_journal_flush: f64,
journal_applied: bool,
file_options: Option<CreateOptions>,
dir_options: Option<CreateOptions>,
apply_interval: f64,
- load_rrd_cb: fn(cache: &RRDCache, path: &Path, rel_path: &str, dst: DST) -> RRD,
+ load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
) -> Result<Self, Error> {
let basedir = basedir.as_ref().to_owned();
let state = RRDCacheState {
journal,
- rrd_map: HashMap::new(),
last_journal_flush: 0.0,
journal_applied: false,
};
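+ // Keep the RRD map behind its own lock: updates take a write lock
+ // on the map only, while flushing and data extraction get by with
+ // a read lock, independent of the journal state lock.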
+ let rrd_map = RRDMap {
+ basedir: basedir.clone(),
+ file_options: file_options.clone(),
+ dir_options,
+ map: HashMap::new(),
+ load_rrd_cb,
+ };
+
Ok(Self {
basedir,
file_options,
- dir_options,
apply_interval,
- load_rrd_cb,
state: RwLock::new(state),
- })
+ rrd_map: RwLock::new(rrd_map),
+ })
}
/// Create a new RRD as used by the proxmox backup server
let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?;
let mut journal = BufReader::new(journal);
- let mut last_update_map = HashMap::new();
-
- let mut get_last_update = |rel_path: &str, rrd: &RRD| {
- if let Some(time) = last_update_map.get(rel_path) {
- return *time;
- }
- let last_update = rrd.last_update();
- last_update_map.insert(rel_path.to_string(), last_update);
- last_update
- };
-
+ // fixme: apply journal entries in blocks to avoid too many calls to self.rrd_map.write()?
let mut linenr = 0;
loop {
linenr += 1;
}
};
- if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
- if entry.time > get_last_update(&entry.rel_path, &rrd) {
- rrd.update(entry.time, entry.value);
- }
- } else {
- let mut path = self.basedir.clone();
- path.push(&entry.rel_path);
- create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
-
- let mut rrd = (self.load_rrd_cb)(&self, &path, &entry.rel_path, entry.dst);
-
- if entry.time > get_last_update(&entry.rel_path, &rrd) {
- rrd.update(entry.time, entry.value);
- }
- state.rrd_map.insert(entry.rel_path.clone(), rrd);
- }
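+ // new_only=true: skip journal entries that are not newer than the
+ // data already loaded from disk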
+ self.rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
}
-
// We need to apply the journal only once, because further updates
// are always directly applied.
state.journal_applied = true;
fn commit_journal_locked(&self, state: &mut RRDCacheState) -> Result<usize, Error> {
- // save all RRDs
- let mut rrd_file_count = 0;
-
- let mut errors = 0;
- for (rel_path, rrd) in state.rrd_map.iter() {
- rrd_file_count += 1;
- let mut path = self.basedir.clone();
- path.push(&rel_path);
- if let Err(err) = rrd.save(&path, self.file_options.clone()) {
- errors += 1;
- log::error!("unable to save {:?}: {}", path, err);
- }
- }
-
- if errors != 0 {
- bail!("errors during rrd flush - unable to commit rrd journal");
- }
+ // save all RRDs - we only need a read lock here
+ let rrd_file_count = self.rrd_map.read().unwrap().flush_rrd_files()?;
// if everything went ok, commit the journal
Self::append_journal_entry(&mut state, time, value, dst, rel_path)?;
- if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
- rrd.update(time, value);
- } else {
- let mut path = self.basedir.clone();
- path.push(rel_path);
- create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
-
- let mut rrd = (self.load_rrd_cb)(&self, &path, rel_path, dst);
-
- rrd.update(time, value);
- state.rrd_map.insert(rel_path.into(), rrd);
- }
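+ // new_only=false: direct updates are always applied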
+ self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
Ok(())
}
start: Option<u64>,
end: Option<u64>,
) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
-
- let state = self.state.read().unwrap();
-
- match state.rrd_map.get(&format!("{}/{}", base, name)) {
- Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
- None => Ok(None),
- }
+ self.rrd_map.read().unwrap()
+ .extract_cached_data(base, name, cf, resolution, start, end)
}
}