"pbs-fuse-loop",
"pbs-runtime",
"proxmox-rest-server",
+ "proxmox-rrd",
"proxmox-systemd",
"pbs-tape",
"pbs-tools",
pbs-datastore = { path = "pbs-datastore" }
pbs-runtime = { path = "pbs-runtime" }
proxmox-rest-server = { path = "proxmox-rest-server" }
+proxmox-rrd = { path = "proxmox-rrd" }
proxmox-systemd = { path = "proxmox-systemd" }
pbs-tools = { path = "pbs-tools" }
pbs-tape = { path = "pbs-tape" }
pbs-fuse-loop \
pbs-runtime \
proxmox-rest-server \
+ proxmox-rrd \
proxmox-systemd \
pbs-tape \
pbs-tools \
pub extra_info: Option<String>,
}
-#[api()]
-#[derive(Copy, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "UPPERCASE")]
-pub enum RRDMode {
- /// Maximum
- Max,
- /// Average
- Average,
-}
-
-
-#[api()]
-#[repr(u64)]
-#[derive(Copy, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "lowercase")]
-pub enum RRDTimeFrameResolution {
- /// 1 min => last 70 minutes
- Hour = 60,
- /// 30 min => last 35 hours
- Day = 60*30,
- /// 3 hours => about 8 days
- Week = 60*180,
- /// 12 hours => last 35 days
- Month = 60*720,
- /// 1 week => last 490 days
- Year = 60*10080,
-}
#[api()]
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
--- /dev/null
+[package]
+name = "proxmox-rrd"
+version = "0.1.0"
+authors = ["Dietmar Maurer <dietmar@proxmox.com>"]
+edition = "2018"
+description = "Simple RRD database implementation."
+
+[dependencies]
+anyhow = "1.0"
+bitflags = "1.2.1"
+serde = { version = "1.0", features = [] }
+
+proxmox = { version = "0.13.5", features = ["api-macro"] }
--- /dev/null
+use std::path::{Path, PathBuf};
+use std::collections::HashMap;
+use std::sync::{RwLock};
+
+use anyhow::{format_err, Error};
+
+use proxmox::tools::fs::{create_path, CreateOptions};
+
+use crate::{RRDMode, RRDTimeFrameResolution};
+
+use super::*;
+
+/// RRD cache - keep RRD data in RAM, but write updates to disk
+///
+/// This cache is designed to run as single instance (no concurrent
+/// access from other processes).
+pub struct RRDCache {
+ basedir: PathBuf,
+ file_options: CreateOptions,
+ dir_options: CreateOptions,
+ cache: RwLock<HashMap<String, RRD>>,
+}
+
+impl RRDCache {
+
+ /// Creates a new instance
+ pub fn new<P: AsRef<Path>>(
+ basedir: P,
+ file_options: Option<CreateOptions>,
+ dir_options: Option<CreateOptions>,
+ ) -> Self {
+ let basedir = basedir.as_ref().to_owned();
+ Self {
+ basedir,
+ file_options: file_options.unwrap_or_else(|| CreateOptions::new()),
+ dir_options: dir_options.unwrap_or_else(|| CreateOptions::new()),
+ cache: RwLock::new(HashMap::new()),
+ }
+ }
+}
+
+impl RRDCache {
+
+ /// Create rrdd stat dir with correct permission
+ pub fn create_rrdb_dir(&self) -> Result<(), Error> {
+
+ create_path(&self.basedir, Some(self.dir_options.clone()), Some(self.file_options.clone()))
+ .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+
+ Ok(())
+ }
+
+ /// Update data in RAM and write file back to disk (if `save` is set)
+ pub fn update_value(
+ &self,
+ rel_path: &str,
+ value: f64,
+ dst: DST,
+ save: bool,
+ ) -> Result<(), Error> {
+
+ let mut path = self.basedir.clone();
+ path.push(rel_path);
+
+ std::fs::create_dir_all(path.parent().unwrap())?; // fixme??
+
+ let mut map = self.cache.write().unwrap();
+ let now = proxmox::tools::time::epoch_f64();
+
+ if let Some(rrd) = map.get_mut(rel_path) {
+ rrd.update(now, value);
+ if save { rrd.save(&path, self.file_options.clone())?; }
+ } else {
+ let mut rrd = match RRD::load(&path) {
+ Ok(rrd) => rrd,
+ Err(err) => {
+ if err.kind() != std::io::ErrorKind::NotFound {
+ eprintln!("overwriting RRD file {:?}, because of load error: {}", path, err);
+ }
+ RRD::new(dst)
+ },
+ };
+ rrd.update(now, value);
+ if save {
+ rrd.save(&path, self.file_options.clone())?;
+ }
+ map.insert(rel_path.into(), rrd);
+ }
+
+ Ok(())
+ }
+
+ /// Extract data from cached RRD
+ pub fn extract_cached_data(
+ &self,
+ base: &str,
+ name: &str,
+ now: f64,
+ timeframe: RRDTimeFrameResolution,
+ mode: RRDMode,
+ ) -> Option<(u64, u64, Vec<Option<f64>>)> {
+
+ let map = self.cache.read().unwrap();
+
+ match map.get(&format!("{}/{}", base, name)) {
+ Some(rrd) => Some(rrd.extract_data(now, timeframe, mode)),
+ None => None,
+ }
+ }
+
+}
--- /dev/null
+mod rrd;
+pub use rrd::*;
+
+mod cache;
+pub use cache::*;
+
+use serde::{Deserialize, Serialize};
+use proxmox::api::api;
+
+#[api()]
+#[derive(Copy, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "UPPERCASE")]
+/// RRD consolidation mode
+pub enum RRDMode {
+ /// Maximum
+ Max,
+ /// Average
+ Average,
+}
+
+#[api()]
+#[repr(u64)]
+#[derive(Copy, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+/// RRD time frame resolution
+pub enum RRDTimeFrameResolution {
+ /// 1 min => last 70 minutes
+ Hour = 60,
+ /// 30 min => last 35 hours
+ Day = 60*30,
+ /// 3 hours => about 8 days
+ Week = 60*180,
+ /// 12 hours => last 35 days
+ Month = 60*720,
+ /// 1 week => last 490 days
+ Year = 60*10080,
+}
--- /dev/null
+use std::io::Read;
+use std::path::Path;
+
+use anyhow::Error;
+
+use proxmox::tools::{fs::replace_file, fs::CreateOptions};
+
+use crate::{RRDMode, RRDTimeFrameResolution};
+
+/// The number of data entries per RRA
+pub const RRD_DATA_ENTRIES: usize = 70;
+
+/// Proxmox RRD file magic number
+// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8];
+pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186];
+
+use bitflags::bitflags;
+
+bitflags!{
+ struct RRAFlags: u64 {
+ // Data Source Types
+ const DST_GAUGE = 1;
+ const DST_DERIVE = 2;
+ const DST_COUNTER = 4;
+ const DST_MASK = 255; // first 8 bits
+
+ // Consolidation Functions
+ const CF_AVERAGE = 1 << 8;
+ const CF_MAX = 2 << 8;
+ const CF_MASK = 255 << 8;
+ }
+}
+
+/// RRD data source tyoe
+pub enum DST {
+ Gauge,
+ Derive,
+}
+
+#[repr(C)]
+struct RRA {
+ flags: RRAFlags,
+ resolution: u64,
+ last_update: f64,
+ last_count: u64,
+ counter_value: f64, // used for derive/counters
+ data: [f64; RRD_DATA_ENTRIES],
+}
+
+impl RRA {
+ fn new(flags: RRAFlags, resolution: u64) -> Self {
+ Self {
+ flags, resolution,
+ last_update: 0.0,
+ last_count: 0,
+ counter_value: f64::NAN,
+ data: [f64::NAN; RRD_DATA_ENTRIES],
+ }
+ }
+
+ fn delete_old(&mut self, time: f64) {
+ let epoch = time as u64;
+ let last_update = self.last_update as u64;
+ let reso = self.resolution;
+
+ let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
+ let min_time = (min_time/reso + 1)*reso;
+ let mut t = last_update.saturating_sub((RRD_DATA_ENTRIES as u64)*reso);
+ let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+ for _ in 0..RRD_DATA_ENTRIES {
+ t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+ if t < min_time {
+ self.data[index] = f64::NAN;
+ } else {
+ break;
+ }
+ }
+ }
+
+ fn compute_new_value(&mut self, time: f64, value: f64) {
+ let epoch = time as u64;
+ let last_update = self.last_update as u64;
+ let reso = self.resolution;
+
+ let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+ let last_index = ((last_update/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+
+ if (epoch - (last_update as u64)) > reso || index != last_index {
+ self.last_count = 0;
+ }
+
+ let last_value = self.data[index];
+ if last_value.is_nan() {
+ self.last_count = 0;
+ }
+
+ let new_count = if self.last_count < u64::MAX {
+ self.last_count + 1
+ } else {
+ u64::MAX // should never happen
+ };
+
+ if self.last_count == 0 {
+ self.data[index] = value;
+ self.last_count = 1;
+ } else {
+ let new_value = if self.flags.contains(RRAFlags::CF_MAX) {
+ if last_value > value { last_value } else { value }
+ } else if self.flags.contains(RRAFlags::CF_AVERAGE) {
+ (last_value*(self.last_count as f64))/(new_count as f64)
+ + value/(new_count as f64)
+ } else {
+ eprintln!("rrdb update failed - unknown CF");
+ return;
+ };
+ self.data[index] = new_value;
+ self.last_count = new_count;
+ }
+ self.last_update = time;
+ }
+
+ fn update(&mut self, time: f64, mut value: f64) {
+
+ if time <= self.last_update {
+ eprintln!("rrdb update failed - time in past ({} < {})", time, self.last_update);
+ }
+
+ if value.is_nan() {
+ eprintln!("rrdb update failed - new value is NAN");
+ return;
+ }
+
+ // derive counter value
+ if self.flags.intersects(RRAFlags::DST_DERIVE | RRAFlags::DST_COUNTER) {
+ let time_diff = time - self.last_update;
+ let is_counter = self.flags.contains(RRAFlags::DST_COUNTER);
+
+ let diff = if self.counter_value.is_nan() {
+ 0.0
+ } else if is_counter && value < 0.0 {
+ eprintln!("rrdb update failed - got negative value for counter");
+ return;
+ } else if is_counter && value < self.counter_value {
+ // Note: We do not try automatic overflow corrections
+ self.counter_value = value;
+ eprintln!("rrdb update failed - conter overflow/reset detected");
+ return;
+ } else {
+ value - self.counter_value
+ };
+ self.counter_value = value;
+ value = diff/time_diff;
+ }
+
+ self.delete_old(time);
+ self.compute_new_value(time, value);
+ }
+}
+
+#[repr(C)]
+// Note: Avoid alignment problems by using 8byte types only
+pub struct RRD {
+ magic: [u8; 8],
+ hour_avg: RRA,
+ hour_max: RRA,
+ day_avg: RRA,
+ day_max: RRA,
+ week_avg: RRA,
+ week_max: RRA,
+ month_avg: RRA,
+ month_max: RRA,
+ year_avg: RRA,
+ year_max: RRA,
+}
+
+impl RRD {
+
+ pub fn new(dst: DST) -> Self {
+ let flags = match dst {
+ DST::Gauge => RRAFlags::DST_GAUGE,
+ DST::Derive => RRAFlags::DST_DERIVE,
+ };
+
+ Self {
+ magic: PROXMOX_RRD_MAGIC_1_0,
+ hour_avg: RRA::new(
+ flags | RRAFlags::CF_AVERAGE,
+ RRDTimeFrameResolution::Hour as u64,
+ ),
+ hour_max: RRA::new(
+ flags | RRAFlags::CF_MAX,
+ RRDTimeFrameResolution::Hour as u64,
+ ),
+ day_avg: RRA::new(
+ flags | RRAFlags::CF_AVERAGE,
+ RRDTimeFrameResolution::Day as u64,
+ ),
+ day_max: RRA::new(
+ flags | RRAFlags::CF_MAX,
+ RRDTimeFrameResolution::Day as u64,
+ ),
+ week_avg: RRA::new(
+ flags | RRAFlags::CF_AVERAGE,
+ RRDTimeFrameResolution::Week as u64,
+ ),
+ week_max: RRA::new(
+ flags | RRAFlags::CF_MAX,
+ RRDTimeFrameResolution::Week as u64,
+ ),
+ month_avg: RRA::new(
+ flags | RRAFlags::CF_AVERAGE,
+ RRDTimeFrameResolution::Month as u64,
+ ),
+ month_max: RRA::new(
+ flags | RRAFlags::CF_MAX,
+ RRDTimeFrameResolution::Month as u64,
+ ),
+ year_avg: RRA::new(
+ flags | RRAFlags::CF_AVERAGE,
+ RRDTimeFrameResolution::Year as u64,
+ ),
+ year_max: RRA::new(
+ flags | RRAFlags::CF_MAX,
+ RRDTimeFrameResolution::Year as u64,
+ ),
+ }
+ }
+
+ pub fn extract_data(
+ &self,
+ time: f64,
+ timeframe: RRDTimeFrameResolution,
+ mode: RRDMode,
+ ) -> (u64, u64, Vec<Option<f64>>) {
+
+ let epoch = time as u64;
+ let reso = timeframe as u64;
+
+ let end = reso*(epoch/reso + 1);
+ let start = end - reso*(RRD_DATA_ENTRIES as u64);
+
+ let mut list = Vec::new();
+
+ let raa = match (mode, timeframe) {
+ (RRDMode::Average, RRDTimeFrameResolution::Hour) => &self.hour_avg,
+ (RRDMode::Max, RRDTimeFrameResolution::Hour) => &self.hour_max,
+ (RRDMode::Average, RRDTimeFrameResolution::Day) => &self.day_avg,
+ (RRDMode::Max, RRDTimeFrameResolution::Day) => &self.day_max,
+ (RRDMode::Average, RRDTimeFrameResolution::Week) => &self.week_avg,
+ (RRDMode::Max, RRDTimeFrameResolution::Week) => &self.week_max,
+ (RRDMode::Average, RRDTimeFrameResolution::Month) => &self.month_avg,
+ (RRDMode::Max, RRDTimeFrameResolution::Month) => &self.month_max,
+ (RRDMode::Average, RRDTimeFrameResolution::Year) => &self.year_avg,
+ (RRDMode::Max, RRDTimeFrameResolution::Year) => &self.year_max,
+ };
+
+ let rrd_end = reso*((raa.last_update as u64)/reso);
+ let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64);
+
+ let mut t = start;
+ let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+ for _ in 0..RRD_DATA_ENTRIES {
+ if t < rrd_start || t > rrd_end {
+ list.push(None);
+ } else {
+ let value = raa.data[index];
+ if value.is_nan() {
+ list.push(None);
+ } else {
+ list.push(Some(value));
+ }
+ }
+ t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+ }
+
+ (start, reso, list)
+ }
+
+ pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
+ let expected_len = std::mem::size_of::<RRD>();
+ if raw.len() != expected_len {
+ let msg = format!("wrong data size ({} != {})", raw.len(), expected_len);
+ return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+ }
+
+ let mut rrd: RRD = unsafe { std::mem::zeroed() };
+ unsafe {
+ let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
+ raw.read_exact(rrd_slice)?;
+ }
+
+ if rrd.magic != PROXMOX_RRD_MAGIC_1_0 {
+ let msg = "wrong magic number".to_string();
+ return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+ }
+
+ Ok(rrd)
+ }
+
+ pub fn load(path: &Path) -> Result<Self, std::io::Error> {
+ let raw = std::fs::read(path)?;
+ Self::from_raw(&raw)
+ }
+
+ pub fn save(&self, filename: &Path, options: CreateOptions) -> Result<(), Error> {
+ let rrd_slice = unsafe {
+ std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::<RRD>())
+ };
+ replace_file(filename, rrd_slice, options)
+ }
+
+
+ pub fn update(&mut self, time: f64, value: f64) {
+ self.hour_avg.update(time, value);
+ self.hour_max.update(time, value);
+
+ self.day_avg.update(time, value);
+ self.day_max.update(time, value);
+
+ self.week_avg.update(time, value);
+ self.week_max.update(time, value);
+
+ self.month_avg.update(time, value);
+ self.month_max.update(time, value);
+
+ self.year_avg.update(time, value);
+ self.year_max.update(time, value);
+ }
+}
use pxar::accessor::aio::Accessor;
use pxar::EntryKind;
+use proxmox_rrd::{RRDMode, RRDTimeFrameResolution};
use pbs_api_types::{ Authid, BackupContent, Counts, CryptMode,
DataStoreListItem, GarbageCollectionStatus, GroupListItem,
SnapshotListItem, SnapshotVerifyState, PruneOptions,
- DataStoreStatus, RRDMode, RRDTimeFrameResolution,
+ DataStoreStatus,
BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA,
BACKUP_TYPE_SCHEMA, DATASTORE_SCHEMA,
IGNORE_VERIFIED_BACKUPS_SCHEMA, UPID_SCHEMA,
use proxmox::api::{api, Permission, Router};
-use pbs_api_types::{RRDMode, RRDTimeFrameResolution, NODE_SCHEMA, PRIV_SYS_AUDIT};
+use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_AUDIT};
+use proxmox_rrd::{RRDMode, RRDTimeFrameResolution, RRD_DATA_ENTRIES};
-use crate::rrd::{extract_cached_data, RRD_DATA_ENTRIES};
+use crate::RRD_CACHE;
pub fn create_value_from_rrd(
basedir: &str,
let now = proxmox::tools::time::epoch_f64();
for name in list {
- let (start, reso, list) = match extract_cached_data(basedir, name, now, timeframe, cf) {
+ let (start, reso, list) = match RRD_CACHE.extract_cached_data(basedir, name, now, timeframe, cf) {
Some(result) => result,
None => continue,
};
};
use pbs_api_types::{
- DATASTORE_SCHEMA, RRDMode, RRDTimeFrameResolution, Authid,
- PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
+ Authid, DATASTORE_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
};
+use proxmox_rrd::{RRDMode, RRDTimeFrameResolution};
+
use pbs_datastore::DataStore;
use pbs_config::CachedUserInfo;
use crate::tools::statistics::{linear_regression};
+use crate::RRD_CACHE;
#[api(
returns: {
let rrd_dir = format!("datastore/{}", store);
let now = proxmox::tools::time::epoch_f64();
- let get_rrd = |what: &str| crate::rrd::extract_cached_data(
+ let get_rrd = |what: &str| RRD_CACHE.extract_cached_data(
&rrd_dir,
what,
now,
use proxmox_backup::server::auth::check_pbs_auth;
use proxmox_backup::auth_helpers::*;
+use proxmox_backup::RRD_CACHE;
use proxmox_backup::config;
fn main() {
proxmox_backup::server::create_run_dir()?;
- proxmox_backup::rrd::create_rrdb_dir()?;
+ RRD_CACHE.create_rrdb_dir()?;
+
proxmox_backup::server::jobstate::create_jobstate_dir()?;
proxmox_backup::tape::create_tape_status_dir()?;
proxmox_backup::tape::create_drive_state_dir()?;
use pbs_tools::task_log;
use pbs_datastore::DataStore;
+use proxmox_rrd::DST;
+
use proxmox_rest_server::{
rotate_task_log_archive, extract_cookie , AuthError, ApiConfig, RestServer, RestEnvironment,
ServerAdapter, WorkerTask,
};
use proxmox_backup::{
+ RRD_CACHE,
server::{
auth::check_pbs_auth,
jobstate::{
}
fn rrd_update_gauge(name: &str, value: f64, save: bool) {
- use proxmox_backup::rrd;
- if let Err(err) = rrd::update_value(name, value, rrd::DST::Gauge, save) {
+ if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge, save) {
eprintln!("rrd::update_value '{}' failed - {}", name, err);
}
}
fn rrd_update_derive(name: &str, value: f64, save: bool) {
- use proxmox_backup::rrd;
- if let Err(err) = rrd::update_value(name, value, rrd::DST::Derive, save) {
+ if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive, save) {
eprintln!("rrd::update_value '{}' failed - {}", name, err);
}
}
use std::path::PathBuf;
+use proxmox::tools::fs::CreateOptions;
+
use pbs_buildcfg::configdir;
use pbs_tools::cert::CertInfo;
+use proxmox_rrd::RRDCache;
#[macro_use]
pub mod tools;
pub mod auth;
-pub mod rrd;
-
pub mod tape;
pub mod acme;
pub fn cert_info() -> Result<CertInfo, anyhow::Error> {
CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem")))
}
+
+lazy_static::lazy_static!{
+ /// Proxmox Backup Server RRD cache instance
+ pub static ref RRD_CACHE: RRDCache = {
+ let backup_user = pbs_config::backup_user().unwrap();
+ let file_options = CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+
+ let dir_options = CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+
+ RRDCache::new(
+ "/var/lib/proxmox-backup/rrdb",
+ Some(file_options),
+ Some(dir_options),
+ )
+ };
+}
+++ /dev/null
-use std::path::PathBuf;
-use std::collections::HashMap;
-use std::sync::{RwLock};
-
-use anyhow::{format_err, Error};
-use lazy_static::lazy_static;
-
-use proxmox::tools::fs::{create_path, CreateOptions};
-
-use pbs_api_types::{RRDMode, RRDTimeFrameResolution};
-
-use super::*;
-
-const PBS_RRD_BASEDIR: &str = "/var/lib/proxmox-backup/rrdb";
-
-lazy_static!{
- static ref RRD_CACHE: RwLock<HashMap<String, RRD>> = {
- RwLock::new(HashMap::new())
- };
-}
-
-/// Create rrdd stat dir with correct permission
-pub fn create_rrdb_dir() -> Result<(), Error> {
-
- let backup_user = pbs_config::backup_user()?;
- let opts = CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid);
-
- create_path(PBS_RRD_BASEDIR, None, Some(opts))
- .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
-
- Ok(())
-}
-
-pub fn update_value(rel_path: &str, value: f64, dst: DST, save: bool) -> Result<(), Error> {
-
- let mut path = PathBuf::from(PBS_RRD_BASEDIR);
- path.push(rel_path);
-
- std::fs::create_dir_all(path.parent().unwrap())?;
-
- let mut map = RRD_CACHE.write().unwrap();
- let now = proxmox::tools::time::epoch_f64();
-
- if let Some(rrd) = map.get_mut(rel_path) {
- rrd.update(now, value);
- if save { rrd.save(&path)?; }
- } else {
- let mut rrd = match RRD::load(&path) {
- Ok(rrd) => rrd,
- Err(err) => {
- if err.kind() != std::io::ErrorKind::NotFound {
- eprintln!("overwriting RRD file {:?}, because of load error: {}", path, err);
- }
- RRD::new(dst)
- },
- };
- rrd.update(now, value);
- if save { rrd.save(&path)?; }
- map.insert(rel_path.into(), rrd);
- }
-
- Ok(())
-}
-
-pub fn extract_cached_data(
- base: &str,
- name: &str,
- now: f64,
- timeframe: RRDTimeFrameResolution,
- mode: RRDMode,
-) -> Option<(u64, u64, Vec<Option<f64>>)> {
-
- let map = RRD_CACHE.read().unwrap();
-
- match map.get(&format!("{}/{}", base, name)) {
- Some(rrd) => Some(rrd.extract_data(now, timeframe, mode)),
- None => None,
- }
-}
+++ /dev/null
-#[allow(clippy::module_inception)]
-mod rrd;
-pub use rrd::*;
-mod cache;
-pub use cache::*;
+++ /dev/null
-use std::io::Read;
-use std::path::Path;
-
-use anyhow::Error;
-
-use pbs_api_types::{RRDMode, RRDTimeFrameResolution};
-
-pub const RRD_DATA_ENTRIES: usize = 70;
-
-// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8];
-pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186];
-
-use bitflags::bitflags;
-
-bitflags!{
- pub struct RRAFlags: u64 {
- // Data Source Types
- const DST_GAUGE = 1;
- const DST_DERIVE = 2;
- const DST_COUNTER = 4;
- const DST_MASK = 255; // first 8 bits
-
- // Consolidation Functions
- const CF_AVERAGE = 1 << 8;
- const CF_MAX = 2 << 8;
- const CF_MASK = 255 << 8;
- }
-}
-
-pub enum DST {
- Gauge,
- Derive,
-}
-
-#[repr(C)]
-struct RRA {
- flags: RRAFlags,
- resolution: u64,
- last_update: f64,
- last_count: u64,
- counter_value: f64, // used for derive/counters
- data: [f64; RRD_DATA_ENTRIES],
-}
-
-impl RRA {
- fn new(flags: RRAFlags, resolution: u64) -> Self {
- Self {
- flags, resolution,
- last_update: 0.0,
- last_count: 0,
- counter_value: f64::NAN,
- data: [f64::NAN; RRD_DATA_ENTRIES],
- }
- }
-
- fn delete_old(&mut self, time: f64) {
- let epoch = time as u64;
- let last_update = self.last_update as u64;
- let reso = self.resolution;
-
- let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
- let min_time = (min_time/reso + 1)*reso;
- let mut t = last_update.saturating_sub((RRD_DATA_ENTRIES as u64)*reso);
- let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
- for _ in 0..RRD_DATA_ENTRIES {
- t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
- if t < min_time {
- self.data[index] = f64::NAN;
- } else {
- break;
- }
- }
- }
-
- fn compute_new_value(&mut self, time: f64, value: f64) {
- let epoch = time as u64;
- let last_update = self.last_update as u64;
- let reso = self.resolution;
-
- let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
- let last_index = ((last_update/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
-
- if (epoch - (last_update as u64)) > reso || index != last_index {
- self.last_count = 0;
- }
-
- let last_value = self.data[index];
- if last_value.is_nan() {
- self.last_count = 0;
- }
-
- let new_count = if self.last_count < u64::MAX {
- self.last_count + 1
- } else {
- u64::MAX // should never happen
- };
-
- if self.last_count == 0 {
- self.data[index] = value;
- self.last_count = 1;
- } else {
- let new_value = if self.flags.contains(RRAFlags::CF_MAX) {
- if last_value > value { last_value } else { value }
- } else if self.flags.contains(RRAFlags::CF_AVERAGE) {
- (last_value*(self.last_count as f64))/(new_count as f64)
- + value/(new_count as f64)
- } else {
- eprintln!("rrdb update failed - unknown CF");
- return;
- };
- self.data[index] = new_value;
- self.last_count = new_count;
- }
- self.last_update = time;
- }
-
- fn update(&mut self, time: f64, mut value: f64) {
-
- if time <= self.last_update {
- eprintln!("rrdb update failed - time in past ({} < {})", time, self.last_update);
- }
-
- if value.is_nan() {
- eprintln!("rrdb update failed - new value is NAN");
- return;
- }
-
- // derive counter value
- if self.flags.intersects(RRAFlags::DST_DERIVE | RRAFlags::DST_COUNTER) {
- let time_diff = time - self.last_update;
- let is_counter = self.flags.contains(RRAFlags::DST_COUNTER);
-
- let diff = if self.counter_value.is_nan() {
- 0.0
- } else if is_counter && value < 0.0 {
- eprintln!("rrdb update failed - got negative value for counter");
- return;
- } else if is_counter && value < self.counter_value {
- // Note: We do not try automatic overflow corrections
- self.counter_value = value;
- eprintln!("rrdb update failed - conter overflow/reset detected");
- return;
- } else {
- value - self.counter_value
- };
- self.counter_value = value;
- value = diff/time_diff;
- }
-
- self.delete_old(time);
- self.compute_new_value(time, value);
- }
-}
-
-#[repr(C)]
-// Note: Avoid alignment problems by using 8byte types only
-pub struct RRD {
- magic: [u8; 8],
- hour_avg: RRA,
- hour_max: RRA,
- day_avg: RRA,
- day_max: RRA,
- week_avg: RRA,
- week_max: RRA,
- month_avg: RRA,
- month_max: RRA,
- year_avg: RRA,
- year_max: RRA,
-}
-
-impl RRD {
-
- pub fn new(dst: DST) -> Self {
- let flags = match dst {
- DST::Gauge => RRAFlags::DST_GAUGE,
- DST::Derive => RRAFlags::DST_DERIVE,
- };
-
- Self {
- magic: PROXMOX_RRD_MAGIC_1_0,
- hour_avg: RRA::new(
- flags | RRAFlags::CF_AVERAGE,
- RRDTimeFrameResolution::Hour as u64,
- ),
- hour_max: RRA::new(
- flags | RRAFlags::CF_MAX,
- RRDTimeFrameResolution::Hour as u64,
- ),
- day_avg: RRA::new(
- flags | RRAFlags::CF_AVERAGE,
- RRDTimeFrameResolution::Day as u64,
- ),
- day_max: RRA::new(
- flags | RRAFlags::CF_MAX,
- RRDTimeFrameResolution::Day as u64,
- ),
- week_avg: RRA::new(
- flags | RRAFlags::CF_AVERAGE,
- RRDTimeFrameResolution::Week as u64,
- ),
- week_max: RRA::new(
- flags | RRAFlags::CF_MAX,
- RRDTimeFrameResolution::Week as u64,
- ),
- month_avg: RRA::new(
- flags | RRAFlags::CF_AVERAGE,
- RRDTimeFrameResolution::Month as u64,
- ),
- month_max: RRA::new(
- flags | RRAFlags::CF_MAX,
- RRDTimeFrameResolution::Month as u64,
- ),
- year_avg: RRA::new(
- flags | RRAFlags::CF_AVERAGE,
- RRDTimeFrameResolution::Year as u64,
- ),
- year_max: RRA::new(
- flags | RRAFlags::CF_MAX,
- RRDTimeFrameResolution::Year as u64,
- ),
- }
- }
-
- pub fn extract_data(
- &self,
- time: f64,
- timeframe: RRDTimeFrameResolution,
- mode: RRDMode,
- ) -> (u64, u64, Vec<Option<f64>>) {
- let epoch = time as u64;
- let reso = timeframe as u64;
-
- let end = reso*(epoch/reso + 1);
- let start = end - reso*(RRD_DATA_ENTRIES as u64);
-
- let mut list = Vec::new();
-
- let raa = match (mode, timeframe) {
- (RRDMode::Average, RRDTimeFrameResolution::Hour) => &self.hour_avg,
- (RRDMode::Max, RRDTimeFrameResolution::Hour) => &self.hour_max,
- (RRDMode::Average, RRDTimeFrameResolution::Day) => &self.day_avg,
- (RRDMode::Max, RRDTimeFrameResolution::Day) => &self.day_max,
- (RRDMode::Average, RRDTimeFrameResolution::Week) => &self.week_avg,
- (RRDMode::Max, RRDTimeFrameResolution::Week) => &self.week_max,
- (RRDMode::Average, RRDTimeFrameResolution::Month) => &self.month_avg,
- (RRDMode::Max, RRDTimeFrameResolution::Month) => &self.month_max,
- (RRDMode::Average, RRDTimeFrameResolution::Year) => &self.year_avg,
- (RRDMode::Max, RRDTimeFrameResolution::Year) => &self.year_max,
- };
-
- let rrd_end = reso*((raa.last_update as u64)/reso);
- let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64);
-
- let mut t = start;
- let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
- for _ in 0..RRD_DATA_ENTRIES {
- if t < rrd_start || t > rrd_end {
- list.push(None);
- } else {
- let value = raa.data[index];
- if value.is_nan() {
- list.push(None);
- } else {
- list.push(Some(value));
- }
- }
- t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
- }
-
- (start, reso, list)
- }
-
- pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
- let expected_len = std::mem::size_of::<RRD>();
- if raw.len() != expected_len {
- let msg = format!("wrong data size ({} != {})", raw.len(), expected_len);
- return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
- }
-
- let mut rrd: RRD = unsafe { std::mem::zeroed() };
- unsafe {
- let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
- raw.read_exact(rrd_slice)?;
- }
-
- if rrd.magic != PROXMOX_RRD_MAGIC_1_0 {
- let msg = "wrong magic number".to_string();
- return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
- }
-
- Ok(rrd)
- }
-
- pub fn load(path: &Path) -> Result<Self, std::io::Error> {
- let raw = std::fs::read(path)?;
- Self::from_raw(&raw)
- }
-
- pub fn save(&self, filename: &Path) -> Result<(), Error> {
- use proxmox::tools::{fs::replace_file, fs::CreateOptions};
-
- let rrd_slice = unsafe {
- std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::<RRD>())
- };
-
- let backup_user = pbs_config::backup_user()?;
- let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
- // set the correct owner/group/permissions while saving file
- // owner(rw) = backup, group(r)= backup
- let options = CreateOptions::new()
- .perm(mode)
- .owner(backup_user.uid)
- .group(backup_user.gid);
-
- replace_file(filename, rrd_slice, options)?;
-
- Ok(())
- }
-
-
- pub fn update(&mut self, time: f64, value: f64) {
- self.hour_avg.update(time, value);
- self.hour_max.update(time, value);
-
- self.day_avg.update(time, value);
- self.day_max.update(time, value);
-
- self.week_avg.update(time, value);
- self.week_max.update(time, value);
-
- self.month_avg.update(time, value);
- self.month_max.update(time, value);
-
- self.year_avg.update(time, value);
- self.year_max.update(time, value);
- }
-}