]> git.proxmox.com Git - proxmox-backup.git/blobdiff - proxmox-rrd/src/rrd.rs
tree-wide: fix needless borrows
[proxmox-backup.git] / proxmox-rrd / src / rrd.rs
index c31572a4d8dc58ca7b6974aaa2d2808d9092e1e6..4b48d0cfecebc307bb2e4e1d445dee3b5a00fd5f 100644 (file)
-//! # Round Robin Database file format
+//! # Proxmox RRD format version 2
+//!
+//! The new format uses
+//! [CBOR](https://datatracker.ietf.org/doc/html/rfc8949) as storage
+//! format. This way we can use the serde serialization framework,
+//! which make our code more flexible, much nicer and type safe.
+//!
+//! ## Features
+//!
+//! * Well defined data format [CBOR](https://datatracker.ietf.org/doc/html/rfc8949)
+//! * Plattform independent (big endian f64, hopefully a standard format?)
+//! * Arbitrary number of RRAs (dynamically changeable)
 
-use std::io::Read;
 use std::path::Path;
+use std::io::{Read, Write};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
+
+use anyhow::{bail, format_err, Error};
+use serde::{Serialize, Deserialize};
+
+use proxmox_sys::fs::{make_tmp_file, CreateOptions};
+use proxmox_schema::api;
+
+use crate::rrd_v1;
+
+/// Proxmox RRD v2 file magic number
+// openssl::sha::sha256(b"Proxmox Round Robin Database file v2.0")[0..8];
+pub const PROXMOX_RRD_MAGIC_2_0: [u8; 8] = [224, 200, 228, 27, 239, 112, 122, 159];
+
+#[api()]
+#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
+#[serde(rename_all = "kebab-case")]
+/// RRD data source type
+pub enum DST {
+    /// Gauge values are stored unmodified.
+    Gauge,
+    /// Stores the difference to the previous value.
+    Derive,
+    /// Stores the difference to the previous value (like Derive), but
+    /// detect counter overflow (and ignores that value)
+    Counter,
+}
+
+#[api()]
+#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
+#[serde(rename_all = "kebab-case")]
+/// Consolidation function
+pub enum CF {
+    /// Average
+    Average,
+    /// Maximum
+    Maximum,
+    /// Minimum
+    Minimum,
+    /// Use the last value
+    Last,
+}
 
-use anyhow::Error;
-use bitflags::bitflags;
+#[derive(Serialize, Deserialize)]
+/// Data source specification
+pub struct DataSource {
+    /// Data source type
+    pub dst: DST,
+    /// Last update time (epoch)
+    pub last_update: f64,
+    /// Stores the last value, used to compute differential value for
+    /// derive/counters
+    pub last_value: f64,
+}
 
-use proxmox::tools::{fs::replace_file, fs::CreateOptions};
+impl DataSource {
+
+    /// Create a new Instance
+    pub fn new(dst: DST) -> Self {
+        Self {
+            dst,
+            last_update: 0.0,
+            last_value: f64::NAN,
+        }
+    }
 
-use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution};
+    fn compute_new_value(&mut self, time: f64, mut value: f64) -> Result<f64, Error> {
+        if time < 0.0 {
+            bail!("got negative time");
+        }
+        if time <= self.last_update {
+            bail!("time in past ({} < {})", time, self.last_update);
+        }
 
-/// The number of data entries per RRA
-pub const RRD_DATA_ENTRIES: usize = 70;
+        if value.is_nan() {
+            bail!("new value is NAN");
+        }
 
-/// Proxmox RRD file magic number
-// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8];
-pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] =  [206, 46, 26, 212, 172, 158, 5, 186];
+        // derive counter value
+        let is_counter = self.dst == DST::Counter;
 
-use crate::DST;
+        if is_counter || self.dst == DST::Derive {
+            let time_diff = time - self.last_update;
 
-bitflags!{
-    /// Flags to specify the data soure type and consolidation function
-    pub struct RRAFlags: u64 {
-        // Data Source Types
-        const DST_GAUGE  = 1;
-        const DST_DERIVE = 2;
-        const DST_COUNTER = 4;
-        const DST_MASK   = 255; // first 8 bits
+            let diff = if self.last_value.is_nan() {
+                0.0
+            } else if is_counter && value < 0.0 {
+                bail!("got negative value for counter");
+            } else if is_counter && value < self.last_value {
+                // Note: We do not try automatic overflow corrections, but
+                // we update last_value anyways, so that we can compute the diff
+                // next time.
+                self.last_value = value;
+                bail!("conter overflow/reset detected");
+            } else {
+                value - self.last_value
+            };
+            self.last_value = value;
+            value = diff/time_diff;
+        } else {
+            self.last_value = value;
+        }
 
-        // Consolidation Functions
-        const CF_AVERAGE = 1 << 8;
-        const CF_MAX     = 2 << 8;
-        const CF_MASK    = 255 << 8;
+        Ok(value)
     }
+
+
 }
 
-/// Round Robin Archive with [RRD_DATA_ENTRIES] data slots.
-///
-/// This data structure is used inside [RRD] and directly written to the
-/// RRD files.
-#[repr(C)]
+#[derive(Serialize, Deserialize)]
+/// Round Robin Archive
 pub struct RRA {
-    /// Defined the data soure type and consolidation function
-    pub flags: RRAFlags,
-    /// Resulution (seconds) from [RRDTimeFrameResolution]
+    /// Number of seconds spaned by a single data entry.
     pub resolution: u64,
-    /// Last update time (epoch)
-    pub last_update: f64,
-    /// Count values computed inside this update interval
+    /// Consolitation function.
+    pub cf: CF,
+    /// Count values computed inside this update interval.
     pub last_count: u64,
-    /// Stores the last value, used to compute differential value for derive/counters
-    pub counter_value: f64,
-    /// Data slots
-    pub data: [f64; RRD_DATA_ENTRIES],
+    /// The actual data entries.
+    pub data: Vec<f64>,
 }
 
 impl RRA {
-    fn new(flags: RRAFlags, resolution: u64) -> Self {
+
+    /// Creates a new instance
+    pub fn new(cf: CF, resolution: u64, points: usize) -> Self {
         Self {
-            flags, resolution,
-            last_update: 0.0,
+            cf,
+            resolution,
             last_count: 0,
-            counter_value: f64::NAN,
-            data: [f64::NAN; RRD_DATA_ENTRIES],
+            data: vec![f64::NAN; points],
+        }
+    }
+
+    /// Data slot end time
+    pub fn slot_end_time(&self, time: u64) -> u64 {
+        self.resolution * (time / self.resolution + 1)
+    }
+
+    /// Data slot start time
+    pub fn slot_start_time(&self, time: u64) -> u64 {
+        self.resolution * (time / self.resolution)
+    }
+
+    /// Data slot index
+    pub fn slot(&self, time: u64) -> usize {
+        ((time / self.resolution) as usize) % self.data.len()
+    }
+
+    /// Directly overwrite data slots.
+    ///
+    /// The caller need to set `last_update` value on the [DataSource] manually.
+    pub fn insert_data(
+        &mut self,
+        start: u64,
+        resolution: u64,
+        data: Vec<Option<f64>>,
+    ) -> Result<(), Error> {
+        if resolution != self.resolution {
+            bail!("inser_data failed: got wrong resolution");
+        }
+
+        let mut index = self.slot(start);
+
+        for i in 0..data.len() {
+            if let Some(v) = data[i] {
+                self.data[index] = v;
+            }
+            index += 1; if index >= self.data.len() { index = 0; }
         }
+        Ok(())
     }
 
-    fn delete_old(&mut self, time: f64) {
+    fn delete_old_slots(&mut self, time: f64, last_update: f64) {
         let epoch = time as u64;
-        let last_update = self.last_update as u64;
+        let last_update = last_update as u64;
         let reso = self.resolution;
+        let num_entries = self.data.len() as u64;
+
+        let min_time = epoch.saturating_sub(num_entries*reso);
+        let min_time = self.slot_end_time(min_time);
 
-        let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
-        let min_time = (min_time/reso + 1)*reso;
-        let mut t = last_update.saturating_sub((RRD_DATA_ENTRIES as u64)*reso);
-        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
-        for _ in 0..RRD_DATA_ENTRIES {
-            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+        let mut t = last_update.saturating_sub(num_entries*reso);
+        let mut index = self.slot(t);
+
+        for _ in 0..num_entries {
+            t += reso;
+            index += 1; if index >= self.data.len() { index = 0; }
             if t < min_time {
                 self.data[index] = f64::NAN;
             } else {
@@ -85,15 +209,15 @@ impl RRA {
         }
     }
 
-    fn compute_new_value(&mut self, time: f64, value: f64) {
+    fn compute_new_value(&mut self, time: f64, last_update: f64, value: f64) {
         let epoch = time as u64;
-        let last_update = self.last_update as u64;
+        let last_update = last_update as u64;
         let reso = self.resolution;
 
-        let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
-        let last_index = ((last_update/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        let index = self.slot(epoch);
+        let last_index = self.slot(last_update);
 
-        if (epoch - (last_update as u64)) > reso || index != last_index {
+        if (epoch - last_update) > reso || index != last_index {
             self.last_count = 0;
         }
 
@@ -102,267 +226,376 @@ impl RRA {
             self.last_count = 0;
         }
 
-        let new_count = if self.last_count < u64::MAX {
-            self.last_count + 1
-        } else {
-            u64::MAX // should never happen
-        };
+        let new_count = self.last_count.saturating_add(1);
 
         if self.last_count == 0 {
             self.data[index] = value;
             self.last_count = 1;
         } else {
-            let new_value = if self.flags.contains(RRAFlags::CF_MAX) {
-                if last_value > value { last_value } else { value }
-            } else if self.flags.contains(RRAFlags::CF_AVERAGE) {
-                (last_value*(self.last_count as f64))/(new_count as f64)
-                    + value/(new_count as f64)
-            } else {
-                log::error!("rrdb update failed - unknown CF");
-                return;
+            let new_value = match self.cf {
+                CF::Maximum => if last_value > value { last_value } else { value },
+                CF::Minimum => if last_value < value { last_value } else { value },
+                CF::Last => value,
+                CF::Average => {
+                    (last_value*(self.last_count as f64))/(new_count as f64)
+                        + value/(new_count as f64)
+                }
             };
             self.data[index] = new_value;
             self.last_count = new_count;
         }
-        self.last_update = time;
     }
 
-    fn update(&mut self, time: f64, mut value: f64, log_time_in_past: &mut bool) {
-
-        if time <= self.last_update {
-            if *log_time_in_past {
-                log::warn!("rrdb update failed - time in past ({} < {})", time, self.last_update);
-                *log_time_in_past = false; // avoid logging this multiple times inside a RRD
-            }
-            return;
-        }
+    /// Extract data
+    ///
+    /// Extract data from `start` to `end`. The RRA itself does not
+    /// store the `last_update` time, so you need to pass this a
+    /// parameter (see [DataSource]).
+    pub fn extract_data(
+        &self,
+        start: u64,
+        end: u64,
+        last_update: f64,
+    ) -> (u64, u64, Vec<Option<f64>>) {
+        let last_update = last_update as u64;
+        let reso = self.resolution;
+        let num_entries = self.data.len() as u64;
 
-        if value.is_nan() {
-            log::warn!("rrdb update failed - new value is NAN");
-            return;
-        }
+        let mut list = Vec::new();
 
-        // derive counter value
-        if self.flags.intersects(RRAFlags::DST_DERIVE | RRAFlags::DST_COUNTER) {
-            let time_diff = time - self.last_update;
-            let is_counter = self.flags.contains(RRAFlags::DST_COUNTER);
+        let rrd_end = self.slot_end_time(last_update);
+        let rrd_start = rrd_end.saturating_sub(reso*num_entries);
 
-            let diff = if self.counter_value.is_nan() {
-                0.0
-            } else if is_counter && value < 0.0 {
-                log::warn!("rrdb update failed - got negative value for counter");
-                return;
-            } else if is_counter && value < self.counter_value {
-                // Note: We do not try automatic overflow corrections
-                self.counter_value = value;
-                log::warn!("rrdb update failed - conter overflow/reset detected");
-                return;
+        let mut t = start;
+        let mut index = self.slot(t);
+        for _ in 0..num_entries {
+            if t > end { break; };
+            if t < rrd_start || t >= rrd_end {
+                list.push(None);
             } else {
-                value - self.counter_value
-            };
-            self.counter_value = value;
-            value = diff/time_diff;
+                let value = self.data[index];
+                if value.is_nan() {
+                    list.push(None);
+                } else {
+                    list.push(Some(value));
+                }
+            }
+            t += reso;
+            index += 1; if index >= self.data.len() { index = 0; }
         }
 
-        self.delete_old(time);
-        self.compute_new_value(time, value);
+        (start, reso, list)
     }
 }
 
-/// Round Robin Database file format with fixed number of [RRA]s
-#[repr(C)]
-// Note: Avoid alignment problems by using 8byte types only
+#[derive(Serialize, Deserialize)]
+/// Round Robin Database
 pub struct RRD {
-    /// The magic number to identify the file type
-    pub magic: [u8; 8],
-    /// Hourly data (average values)
-    pub hour_avg: RRA,
-    /// Hourly data (maximum values)
-    pub hour_max: RRA,
-    /// Dayly data (average values)
-    pub day_avg: RRA,
-    /// Dayly data (maximum values)
-    pub day_max: RRA,
-    /// Weekly data (average values)
-    pub week_avg: RRA,
-    /// Weekly data (maximum values)
-    pub week_max: RRA,
-    /// Monthly data (average values)
-    pub month_avg: RRA,
-    /// Monthly data (maximum values)
-    pub month_max: RRA,
-    /// Yearly data (average values)
-    pub year_avg: RRA,
-    /// Yearly data (maximum values)
-    pub year_max: RRA,
+    /// The data source definition
+    pub source: DataSource,
+    /// List of round robin archives
+    pub rra_list: Vec<RRA>,
 }
 
 impl RRD {
 
-    /// Create a new empty instance
-    pub fn new(dst: DST) -> Self {
-        let flags = match dst {
-            DST::Gauge => RRAFlags::DST_GAUGE,
-            DST::Derive => RRAFlags::DST_DERIVE,
+    /// Creates a new Instance
+    pub fn new(dst: DST, rra_list: Vec<RRA>) -> RRD {
+
+        let source = DataSource::new(dst);
+
+        RRD {
+            source,
+            rra_list,
+        }
+
+    }
+
+    fn from_raw(raw: &[u8]) -> Result<Self, Error> {
+        if raw.len() < 8 {
+            bail!("not an rrd file - file is too small ({})", raw.len());
+        }
+
+        let rrd = if raw[0..8] == rrd_v1::PROXMOX_RRD_MAGIC_1_0 {
+            let v1 = rrd_v1::RRDv1::from_raw(raw)?;
+            v1.to_rrd_v2()
+                .map_err(|err| format_err!("unable to convert from old V1 format - {}", err))?
+        } else if raw[0..8] == PROXMOX_RRD_MAGIC_2_0 {
+            serde_cbor::from_slice(&raw[8..])
+                .map_err(|err| format_err!("unable to decode RRD file - {}", err))?
+        } else {
+            bail!("not an rrd file - unknown magic number");
         };
 
-        Self {
-            magic: PROXMOX_RRD_MAGIC_1_0,
-            hour_avg: RRA::new(
-                flags | RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Hour as u64,
-            ),
-            hour_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Hour as u64,
-            ),
-            day_avg: RRA::new(
-                flags |  RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Day as u64,
-            ),
-            day_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Day as u64,
-            ),
-            week_avg: RRA::new(
-                flags |  RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Week as u64,
-            ),
-            week_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Week as u64,
-            ),
-            month_avg: RRA::new(
-                flags |  RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Month as u64,
-            ),
-            month_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Month as u64,
-            ),
-            year_avg: RRA::new(
-                flags |  RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Year as u64,
-            ),
-            year_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Year as u64,
-            ),
+        if rrd.source.last_update < 0.0 {
+            bail!("rrd file has negative last_update time");
         }
+
+        Ok(rrd)
     }
 
-    /// Extract data from the archive
-    pub fn extract_data(
+    /// Load data from a file
+    ///
+    /// Setting `avoid_page_cache` uses
+    /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
+    /// the linux page cache.
+    pub fn load(path: &Path, avoid_page_cache: bool) -> Result<Self, std::io::Error> {
+
+        let mut file = std::fs::File::open(path)?;
+        let buffer_size = file.metadata().map(|m| m.len() as usize + 1).unwrap_or(0);
+        let mut raw = Vec::with_capacity(buffer_size);
+        file.read_to_end(&mut raw)?;
+
+        if avoid_page_cache {
+            nix::fcntl::posix_fadvise(
+                file.as_raw_fd(),
+                0,
+                buffer_size as i64,
+                nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
+            ).map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
+        }
+
+        match Self::from_raw(&raw) {
+            Ok(rrd) => Ok(rrd),
+            Err(err) =>  Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string())),
+        }
+    }
+
+    /// Store data into a file (atomic replace file)
+    ///
+    /// Setting `avoid_page_cache` uses
+    /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
+    /// the linux page cache.
+   pub fn save(
         &self,
-        time: f64,
-        timeframe: RRDTimeFrameResolution,
-        mode: RRDMode,
-    ) -> (u64, u64, Vec<Option<f64>>) {
+        path: &Path,
+        options: CreateOptions,
+        avoid_page_cache: bool,
+    ) -> Result<(), Error> {
+
+        let (fd, tmp_path) = make_tmp_file(&path, options)?;
+        let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
+
+        let mut try_block = || -> Result<(), Error> {
+            let mut data: Vec<u8> = Vec::new();
+            data.extend(&PROXMOX_RRD_MAGIC_2_0);
+            serde_cbor::to_writer(&mut data, self)?;
+            file.write_all(&data)?;
+
+            if avoid_page_cache {
+                nix::fcntl::posix_fadvise(
+                    file.as_raw_fd(),
+                    0,
+                    data.len() as i64,
+                    nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
+                )?;
+            }
 
-        let epoch = time as u64;
-        let reso = timeframe as u64;
+            Ok(())
+        };
 
-        let end = reso*(epoch/reso + 1);
-        let start = end - reso*(RRD_DATA_ENTRIES as u64);
+        match try_block() {
+            Ok(()) => (),
+            error => {
+                let _ = nix::unistd::unlink(&tmp_path);
+                return error;
+            }
+        }
 
-        let mut list = Vec::new();
+        if let Err(err) = std::fs::rename(&tmp_path, &path) {
+            let _ = nix::unistd::unlink(&tmp_path);
+            bail!("Atomic rename failed - {}", err);
+        }
+
+        Ok(())
+    }
+
+    /// Returns the last update time.
+    pub fn last_update(&self) -> f64 {
+        self.source.last_update
+    }
+
+    /// Update the value (in memory)
+    ///
+    /// Note: This does not call [Self::save].
+    pub fn update(&mut self, time: f64, value: f64) {
 
-        let raa = match (mode, timeframe) {
-            (RRDMode::Average, RRDTimeFrameResolution::Hour) => &self.hour_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Hour) => &self.hour_max,
-            (RRDMode::Average, RRDTimeFrameResolution::Day) => &self.day_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Day) => &self.day_max,
-            (RRDMode::Average, RRDTimeFrameResolution::Week) => &self.week_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Week) => &self.week_max,
-            (RRDMode::Average, RRDTimeFrameResolution::Month) => &self.month_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Month) => &self.month_max,
-            (RRDMode::Average, RRDTimeFrameResolution::Year) => &self.year_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Year) => &self.year_max,
+        let value = match self.source.compute_new_value(time, value) {
+            Ok(value) => value,
+            Err(err) => {
+                log::error!("rrd update failed: {}", err);
+                return;
+            }
         };
 
-        let rrd_end = reso*((raa.last_update as u64)/reso);
-        let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64);
+        let last_update = self.source.last_update;
+        self.source.last_update = time;
 
-        let mut t = start;
-        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
-        for _ in 0..RRD_DATA_ENTRIES {
-            if t < rrd_start || t > rrd_end {
-                list.push(None);
-            } else {
-                let value = raa.data[index];
-                if value.is_nan() {
-                    list.push(None);
-                } else {
-                    list.push(Some(value));
+        for rra in self.rra_list.iter_mut() {
+            rra.delete_old_slots(time, last_update);
+            rra.compute_new_value(time, last_update, value);
+        }
+    }
+
+    /// Extract data from the archive
+    ///
+    /// This selects the RRA with specified [CF] and (minimum)
+    /// resolution, and extract data from `start` to `end`.
+    ///
+    /// `start`: Start time. If not sepecified, we simply extract 10 data points.
+    /// `end`: End time. Default is to use the current time.
+    pub fn extract_data(
+        &self,
+        cf: CF,
+        resolution: u64,
+        start: Option<u64>,
+        end: Option<u64>,
+    ) -> Result<(u64, u64, Vec<Option<f64>>), Error> {
+
+        let mut rra: Option<&RRA> = None;
+        for item in self.rra_list.iter() {
+            if item.cf != cf { continue; }
+            if item.resolution > resolution { continue; }
+
+            if let Some(current) = rra {
+                if item.resolution > current.resolution {
+                    rra = Some(item);
                 }
+            } else {
+                rra = Some(item);
             }
-            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
         }
 
-        (start, reso, list)
+        match rra {
+            Some(rra) => {
+                let end = end.unwrap_or_else(|| proxmox_time::epoch_f64() as u64);
+                let start = start.unwrap_or(end.saturating_sub(10*rra.resolution));
+                Ok(rra.extract_data(start, end, self.source.last_update))
+            }
+            None => bail!("unable to find RRA suitable ({:?}:{})", cf, resolution),
+        }
     }
 
-    /// Create instance from raw data, testing data len and magic number
-    pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
-        let expected_len = std::mem::size_of::<RRD>();
-        if raw.len() != expected_len {
-            let msg = format!("wrong data size ({} != {})", raw.len(), expected_len);
-            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
-        }
+}
+
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn basic_rra_maximum_gauge_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Maximum, 60, 5);
+        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
 
-        let mut rrd: RRD = unsafe { std::mem::zeroed() };
-        unsafe {
-            let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
-            raw.read_exact(rrd_slice)?;
+        for i in 2..10 {
+            rrd.update((i as f64)*30.0, i as f64);
         }
 
-        if rrd.magic != PROXMOX_RRD_MAGIC_1_0 {
-            let msg = "wrong magic number".to_string();
-            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+        let (start, reso, data) = rrd.extract_data(CF::Maximum, 60, Some(0), Some(5*60))?;
+        assert_eq!(start, 0);
+        assert_eq!(reso, 60);
+        assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn basic_rra_minimum_gauge_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Minimum, 60, 5);
+        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
+
+        for i in 2..10 {
+            rrd.update((i as f64)*30.0, i as f64);
         }
 
-        Ok(rrd)
+        let (start, reso, data) = rrd.extract_data(CF::Minimum, 60, Some(0), Some(5*60))?;
+        assert_eq!(start, 0);
+        assert_eq!(reso, 60);
+        assert_eq!(data, [None, Some(2.0), Some(4.0), Some(6.0), Some(8.0)]);
+
+        Ok(())
     }
 
-    /// Load data from a file
-    pub fn load(path: &Path) -> Result<Self, std::io::Error> {
-        let raw = std::fs::read(path)?;
-        Self::from_raw(&raw)
+    #[test]
+    fn basic_rra_last_gauge_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Last, 60, 5);
+        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
+
+        for i in 2..10 {
+            rrd.update((i as f64)*30.0, i as f64);
+        }
+
+        assert!(rrd.extract_data(CF::Average, 60, Some(0), Some(5*60)).is_err(), "CF::Average should not exist");
+
+        let (start, reso, data) = rrd.extract_data(CF::Last, 60, Some(0), Some(20*60))?;
+        assert_eq!(start, 0);
+        assert_eq!(reso, 60);
+        assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
+
+        Ok(())
     }
 
-    /// Store data into a file (atomic replace file)
-    pub fn save(&self, filename: &Path, options: CreateOptions) -> Result<(), Error> {
-        let rrd_slice = unsafe {
-            std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::<RRD>())
-        };
-        replace_file(filename, rrd_slice, options)
+    #[test]
+    fn basic_rra_average_derive_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Average, 60, 5);
+        let mut rrd = RRD::new(DST::Derive, vec![rra]);
+
+        for i in 2..10 {
+            rrd.update((i as f64)*30.0, (i*60) as f64);
+        }
+
+        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5*60))?;
+        assert_eq!(start, 60);
+        assert_eq!(reso, 60);
+        assert_eq!(data, [Some(1.0), Some(2.0), Some(2.0), Some(2.0), None]);
+
+        Ok(())
     }
 
-    /// Update the value (in memory)
-    ///
-    /// Note: This does not call [Self::save].
-    pub fn update(&mut self, time: f64, value: f64) {
+    #[test]
+    fn basic_rra_average_gauge_test() -> Result<(), Error> {
+        let rra = RRA::new(CF::Average, 60, 5);
+        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
 
-        if value.is_nan() {
-            log::warn!("rrdb update failed - new value is NAN");
-            return;
+        for i in 2..10 {
+            rrd.update((i as f64)*30.0, i as f64);
+        }
+
+        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5*60))?;
+        assert_eq!(start, 60);
+        assert_eq!(reso, 60);
+        assert_eq!(data, [Some(2.5), Some(4.5), Some(6.5), Some(8.5), None]);
+
+        for i in 10..14 {
+            rrd.update((i as f64)*30.0, i as f64);
         }
 
-        let mut log_time_in_past = true;
+        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5*60))?;
+        assert_eq!(start, 60);
+        assert_eq!(reso, 60);
+        assert_eq!(data, [None, Some(4.5), Some(6.5), Some(8.5), Some(10.5)]);
 
-        self.hour_avg.update(time, value, &mut log_time_in_past);
-        self.hour_max.update(time, value, &mut log_time_in_past);
+        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(3*60), Some(8*60))?;
+        assert_eq!(start, 3*60);
+        assert_eq!(reso, 60);
+        assert_eq!(data, [Some(6.5), Some(8.5), Some(10.5), Some(12.5), None]);
 
-        self.day_avg.update(time, value, &mut log_time_in_past);
-        self.day_max.update(time, value, &mut log_time_in_past);
+        // add much newer vaule (should delete all previous/outdated value)
+        let i = 100; rrd.update((i as f64)*30.0, i as f64);
+        println!("TEST {:?}", serde_json::to_string_pretty(&rrd));
 
-        self.week_avg.update(time, value, &mut log_time_in_past);
-        self.week_max.update(time, value, &mut log_time_in_past);
+        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(100*30), Some(100*30 + 5*60))?;
+        assert_eq!(start, 100*30);
+        assert_eq!(reso, 60);
+        assert_eq!(data, [Some(100.0), None, None, None, None]);
 
-        self.month_avg.update(time, value, &mut log_time_in_past);
-        self.month_max.update(time, value, &mut log_time_in_past);
+        // extract with end time smaller than start time
+        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(100*30), Some(60))?;
+        assert_eq!(start, 100*30);
+        assert_eq!(reso, 60);
+        assert_eq!(data, []);
 
-        self.year_avg.update(time, value, &mut log_time_in_past);
-        self.year_max.update(time, value, &mut log_time_in_past);
+        Ok(())
     }
 }