]> git.proxmox.com Git - proxmox-backup.git/commitdiff
add simple rrd implementation
authorDietmar Maurer <dietmar@proxmox.com>
Sat, 23 May 2020 07:29:33 +0000 (09:29 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Sat, 23 May 2020 08:42:48 +0000 (10:42 +0200)
src/lib.rs
src/rrd/cache.rs [new file with mode: 0644]
src/rrd/mod.rs [new file with mode: 0644]
src/rrd/rrd.rs [new file with mode: 0644]

index c08444641d5fa3c1c504c3c27ea0a1eb733bc6b1..4125e32e1361e42d04e3c7bb22a5347aef17ab56 100644 (file)
@@ -21,3 +21,5 @@ pub mod client;
 pub mod auth_helpers;
 
 pub mod auth;
+
+pub mod rrd;
diff --git a/src/rrd/cache.rs b/src/rrd/cache.rs
new file mode 100644 (file)
index 0000000..9f8ef9f
--- /dev/null
@@ -0,0 +1,82 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+use std::path::PathBuf;
+use std::collections::HashMap;
+use std::sync::{RwLock};
+
+use anyhow::{format_err, Error};
+use lazy_static::lazy_static;
+use serde_json::Value;
+
+use proxmox::tools::fs::{create_path, CreateOptions};
+
+use super::*;
+
+const PBS_RRD_BASEDIR: &str = "/var/lib/proxmox-backup/rrdb";
+
+lazy_static!{
+    static ref RRD_CACHE: RwLock<HashMap<String, RRD>> = {
+        RwLock::new(HashMap::new())
+    };
+}
+
+/// Create rrdd stat dir with correct permission
+pub fn create_rrdb_dir() -> Result<(), Error> {
+
+    let backup_user = crate::backup::backup_user()?;
+    let opts = CreateOptions::new()
+        .owner(backup_user.uid)
+        .group(backup_user.gid);
+
+    create_path(PBS_RRD_BASEDIR, None, Some(opts))
+        .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+
+    Ok(())
+}
+
+fn now() -> Result<u64, Error> {
+    let epoch = SystemTime::now().duration_since(UNIX_EPOCH)?;
+    Ok(epoch.as_secs())
+}
+
+pub fn update_value(rel_path: &str, value: f64) -> Result<(), Error> {
+
+    let mut path = PathBuf::from(PBS_RRD_BASEDIR);
+    path.push(rel_path);
+
+    std::fs::create_dir_all(path.parent().unwrap())?;
+
+    let mut map = RRD_CACHE.write().unwrap();
+    let now = now()?;
+    
+    if let Some(rrd) = map.get_mut(rel_path) {
+        rrd.update(now, value);
+        rrd.save(&path)?;
+    } else {
+        let mut rrd = match RRD::load(&path) {
+            Ok(rrd) => rrd,
+            Err(_) => RRD::new(),
+        };
+        rrd.update(now, value);
+        rrd.save(&path)?;
+        map.insert(rel_path.into(), rrd);
+    }
+   
+    Ok(())
+}
+
+pub fn extract_data(
+    rel_path: &str,
+    timeframe: RRDTimeFrameResolution,
+    mode: RRDMode,
+) -> Result<Value, Error> {
+
+    let now = now()?;
+
+    let map = RRD_CACHE.read().unwrap();
+    
+    if let Some(rrd) = map.get(rel_path) {
+        Ok(rrd.extract_data(now, timeframe, mode))
+    } else {
+        Ok(RRD::new().extract_data(now, timeframe, mode))
+    }
+}
diff --git a/src/rrd/mod.rs b/src/rrd/mod.rs
new file mode 100644 (file)
index 0000000..c09efeb
--- /dev/null
@@ -0,0 +1,4 @@
+mod rrd;
+pub use rrd::*;
+mod cache;
+pub use cache::*;
diff --git a/src/rrd/rrd.rs b/src/rrd/rrd.rs
new file mode 100644 (file)
index 0000000..4fcf64f
--- /dev/null
@@ -0,0 +1,224 @@
+use std::io::Read;
+use std::path::Path;
+
+use anyhow::{bail, Error};
+use serde_json::{json, Value};
+
+const RRD_DATA_ENTRIES: usize = 70;
+
+#[derive(Copy, Clone)]
+pub enum RRDMode {
+    Max,
+    Average,
+}
+
+#[repr(u64)]
+#[derive(Copy, Clone)]
+pub enum RRDTimeFrameResolution {
+    Hour = 60,       // 1 min => last 70 minutes
+    Day = 60*30,     // 30 min => last 35 hours
+    Week = 60*180,   // 3 hours => about 8 days
+    Month = 60*720,  // 12 hours => last 35 days
+    Year = 60*10080, // 1 week => last 490 days
+}
+
+#[repr(C)]
+#[derive(Default, Copy, Clone)]
+struct RRDEntry {
+    max: f64,
+    average: f64,
+    count: u64,
+}
+
+#[repr(C)]
+// Note: Avoid alignment problems by using 8byte types only
+pub struct RRD {
+    last_update: u64,
+    hour: [RRDEntry; RRD_DATA_ENTRIES],
+    day: [RRDEntry; RRD_DATA_ENTRIES],
+    week: [RRDEntry; RRD_DATA_ENTRIES],
+    month: [RRDEntry; RRD_DATA_ENTRIES],
+    year: [RRDEntry; RRD_DATA_ENTRIES],
+}
+
+impl RRD {
+
+    pub fn new() -> Self {
+        Self {
+            last_update: 0,
+            hour: [RRDEntry::default(); RRD_DATA_ENTRIES],
+            day: [RRDEntry::default(); RRD_DATA_ENTRIES],
+            week: [RRDEntry::default(); RRD_DATA_ENTRIES],
+            month: [RRDEntry::default(); RRD_DATA_ENTRIES],
+            year: [RRDEntry::default(); RRD_DATA_ENTRIES],
+        }
+    }
+
+    pub fn extract_data(
+        &self,
+        epoch: u64,
+        timeframe: RRDTimeFrameResolution,
+        mode: RRDMode,
+    ) -> Value {
+
+        let reso = timeframe as u64;
+
+        let end = reso*(epoch/reso);
+        let start = end - reso*(RRD_DATA_ENTRIES as u64);
+
+        let rrd_end = reso*(self.last_update/reso);
+        let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64);
+
+        let mut list = Vec::new();
+
+        let data = match timeframe {
+            RRDTimeFrameResolution::Hour => &self.hour,
+            RRDTimeFrameResolution::Day => &self.day,
+            RRDTimeFrameResolution::Week => &self.week,
+            RRDTimeFrameResolution::Month => &self.month,
+            RRDTimeFrameResolution::Year => &self.year,
+        };
+        
+        let mut t = start;
+        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        for _ in 0..RRD_DATA_ENTRIES {
+            if t < rrd_start || t > rrd_end {
+                list.push(json!({ "time": t }));
+            } else {
+                let entry = data[index];
+                if entry.count == 0 {
+                    list.push(json!({ "time": t }));
+                } else {
+                    let value = match mode {
+                        RRDMode::Max => entry.max,
+                        RRDMode::Average => entry.average,
+                    };
+                    list.push(json!({ "time": t, "value": value }));
+                }
+            }
+            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+        }
+
+        list.into()
+    }
+
+    pub fn from_raw(mut raw: &[u8]) -> Result<Self, Error> {
+        let expected_len = std::mem::size_of::<RRD>();
+        if raw.len() != expected_len {
+            bail!("RRD::from_raw failed - wrong data size ({} != {})", raw.len(), expected_len);
+        }
+        
+        let mut rrd: RRD = unsafe { std::mem::zeroed() };
+        unsafe {
+            let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
+            raw.read_exact(rrd_slice)?;
+        }
+
+        Ok(rrd)
+    }
+
+    pub fn load(filename: &Path) -> Result<Self, Error> {
+        let raw = proxmox::tools::fs::file_get_contents(filename)?;
+        Self::from_raw(&raw)
+    }
+    
+    pub fn save(&self, filename: &Path) -> Result<(), Error> {
+        use proxmox::tools::{fs::replace_file, fs::CreateOptions};
+
+        let rrd_slice = unsafe {
+            std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::<RRD>())
+        };
+
+        let backup_user = crate::backup::backup_user()?;
+        let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
+        // set the correct owner/group/permissions while saving file
+        // owner(rw) = backup, group(r)= backup
+        let options = CreateOptions::new()
+            .perm(mode)
+            .owner(backup_user.uid)
+            .group(backup_user.gid);
+
+        replace_file(filename, rrd_slice, options)?;
+        
+        Ok(())
+    }
+    
+    fn compute_new_value(
+        data: &[RRDEntry; RRD_DATA_ENTRIES],
+        index: usize,
+        value: f64,
+    ) -> RRDEntry {        
+        let RRDEntry { max, average, count } = data[index];
+        let new_count = count + 1; // fixme: check overflow?
+        if count == 0 {
+            RRDEntry { max: value, average: value,  count: 1 }
+        } else {
+            let new_max = if max > value { max } else { value };
+            let new_average = (average*(count as f64) + value)/(new_count as f64);
+            RRDEntry { max: new_max, average: new_average, count: new_count }
+        }
+    }
+    
+    pub fn update(&mut self, epoch: u64, value: f64) {
+        // fixme: check time progress (epoch  last)
+        let last = self.last_update;
+
+        let reso = RRDTimeFrameResolution::Hour as u64;
+
+        let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
+        let mut t = last;
+        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        for _ in 0..RRD_DATA_ENTRIES {
+            if t < min_time { self.hour[index] = RRDEntry::default(); }
+            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+        }
+        let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        self.hour[index] = Self::compute_new_value(&self.hour, index, value);
+
+        let reso = RRDTimeFrameResolution::Day as u64; 
+        let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
+        let mut t = last;
+        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        for _ in 0..RRD_DATA_ENTRIES {
+            if t < min_time { self.day[index] = RRDEntry::default(); }
+            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+        }
+        let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        self.day[index] = Self::compute_new_value(&self.day, index, value);
+        let reso = RRDTimeFrameResolution::Week as u64; 
+        let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
+        let mut t = last;
+        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        for _ in 0..RRD_DATA_ENTRIES {
+            if t < min_time { self.week[index] = RRDEntry::default(); }
+            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+        }
+        let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        self.week[index] = Self::compute_new_value(&self.week, index, value);
+
+        let reso = RRDTimeFrameResolution::Month as u64; 
+        let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
+        let mut t = last;
+        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        for _ in 0..RRD_DATA_ENTRIES {
+            if t < min_time { self.month[index] = RRDEntry::default(); }
+            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+        }
+        let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        self.month[index] = Self::compute_new_value(&self.month, index, value);
+        
+        let reso = RRDTimeFrameResolution::Year as u64; 
+        let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
+        let mut t = last;
+        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        for _ in 0..RRD_DATA_ENTRIES {
+            if t < min_time { self.year[index] = RRDEntry::default(); }
+            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+        }
+        let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        self.year[index] = Self::compute_new_value(&self.year, index, value);
+
+        self.last_update = epoch;
+    }
+}