]> git.proxmox.com Git - proxmox-backup.git/commitdiff
remove `proxmox-rrd` crate
authorLukas Wagner <l.wagner@proxmox.com>
Wed, 31 Jan 2024 15:24:36 +0000 (16:24 +0100)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Thu, 1 Feb 2024 12:56:28 +0000 (13:56 +0100)
The crate was split out and moved to the `proxmox` workspace.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
12 files changed:
Cargo.toml
proxmox-rrd/Cargo.toml [deleted file]
proxmox-rrd/examples/prrd.rs [deleted file]
proxmox-rrd/src/cache.rs [deleted file]
proxmox-rrd/src/cache/journal.rs [deleted file]
proxmox-rrd/src/cache/rrd_map.rs [deleted file]
proxmox-rrd/src/lib.rs [deleted file]
proxmox-rrd/src/rrd.rs [deleted file]
proxmox-rrd/src/rrd_v1.rs [deleted file]
proxmox-rrd/tests/file_format_test.rs [deleted file]
proxmox-rrd/tests/testdata/cpu.rrd_v1 [deleted file]
proxmox-rrd/tests/testdata/cpu.rrd_v2 [deleted file]

index 6255a6a0ec25b6e8ad5dc7ac6d10c08ded1821c1..9965dd47a2de5c727b791ce94d1a653b2372a354 100644 (file)
@@ -132,7 +132,6 @@ pin-project-lite = "0.2"
 regex = "1.5.5"
 rustyline = "9"
 serde = { version = "1.0", features = ["derive"] }
-serde_cbor = "0.11.1"
 serde_json = "1.0"
 serde_plain = "1"
 siphasher = "0.3"
diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml
deleted file mode 100644 (file)
index c0b7d70..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-[package]
-name = "proxmox-rrd"
-version = "0.1.0"
-authors.workspace = true
-edition.workspace = true
-license.workspace = true
-description = "Simple RRD database implementation."
-
-[dev-dependencies]
-proxmox-router = { workspace = true, features = ["cli", "server"] }
-
-[dependencies]
-anyhow.workspace = true
-bitflags.workspace = true
-crossbeam-channel.workspace = true
-libc.workspace = true
-log.workspace = true
-nix.workspace = true
-serde.workspace = true
-serde_cbor.workspace = true
-serde_json.workspace = true
-
-proxmox-schema = { workspace = true, features = [ "api-macro" ] }
-proxmox-sys.workspace = true
-proxmox-time.workspace = true
diff --git a/proxmox-rrd/examples/prrd.rs b/proxmox-rrd/examples/prrd.rs
deleted file mode 100644 (file)
index c7d2937..0000000
+++ /dev/null
@@ -1,390 +0,0 @@
-//! RRD toolkit - create/manage/update proxmox RRD (v2) file
-
-use std::path::PathBuf;
-
-use anyhow::{bail, Error};
-use serde::{Deserialize, Serialize};
-use serde_json::json;
-
-use proxmox_router::cli::{
-    complete_file_name, run_cli_command, CliCommand, CliCommandMap, CliEnvironment,
-};
-use proxmox_router::RpcEnvironment;
-use proxmox_schema::{api, ApiStringFormat, ApiType, IntegerSchema, Schema, StringSchema};
-
-use proxmox_sys::fs::CreateOptions;
-
-use proxmox_rrd::rrd::{CF, DST, RRA, RRD};
-
-pub const RRA_INDEX_SCHEMA: Schema = IntegerSchema::new("Index of the RRA.").minimum(0).schema();
-
-pub const RRA_CONFIG_STRING_SCHEMA: Schema = StringSchema::new("RRA configuration")
-    .format(&ApiStringFormat::PropertyString(&RRAConfig::API_SCHEMA))
-    .schema();
-
-#[api(
-    properties: {},
-    default_key: "cf",
-)]
-#[derive(Debug, Serialize, Deserialize)]
-/// RRA configuration
-pub struct RRAConfig {
-    /// Time resolution
-    pub r: u64,
-    pub cf: CF,
-    /// Number of data points
-    pub n: u64,
-}
-
-#[api(
-   input: {
-       properties: {
-          path: {
-              description: "The filename."
-          },
-       },
-   },
-)]
-/// Dump the RRD file in JSON format
-pub fn dump_rrd(path: String) -> Result<(), Error> {
-    let rrd = RRD::load(&PathBuf::from(path), false)?;
-    serde_json::to_writer_pretty(std::io::stdout(), &rrd)?;
-    println!();
-    Ok(())
-}
-
-#[api(
-   input: {
-       properties: {
-          path: {
-              description: "The filename."
-          },
-       },
-   },
-)]
-/// RRD file information
-pub fn rrd_info(path: String) -> Result<(), Error> {
-    let rrd = RRD::load(&PathBuf::from(path), false)?;
-
-    println!("DST: {:?}", rrd.source.dst);
-
-    for (i, rra) in rrd.rra_list.iter().enumerate() {
-        // use RRAConfig property string format
-        println!(
-            "RRA[{}]: {:?},r={},n={}",
-            i,
-            rra.cf,
-            rra.resolution,
-            rra.data.len()
-        );
-    }
-
-    Ok(())
-}
-
-#[api(
-   input: {
-       properties: {
-           path: {
-               description: "The filename."
-           },
-           time: {
-               description: "Update time.",
-               optional: true,
-           },
-           value: {
-               description: "Update value.",
-           },
-       },
-   },
-)]
-/// Update the RRD database
-pub fn update_rrd(path: String, time: Option<u64>, value: f64) -> Result<(), Error> {
-    let path = PathBuf::from(path);
-
-    let time = time
-        .map(|v| v as f64)
-        .unwrap_or_else(proxmox_time::epoch_f64);
-
-    let mut rrd = RRD::load(&path, false)?;
-    rrd.update(time, value);
-
-    rrd.save(&path, CreateOptions::new(), false)?;
-
-    Ok(())
-}
-
-#[api(
-   input: {
-       properties: {
-           path: {
-               description: "The filename."
-           },
-           cf: {
-               type: CF,
-           },
-           resolution: {
-               description: "Time resolution",
-           },
-           start: {
-               description: "Start time. If not specified, we simply extract 10 data points.",
-               optional: true,
-           },
-           end: {
-               description: "End time (Unix Epoch). Default is the last update time.",
-               optional: true,
-           },
-       },
-   },
-)]
-/// Fetch data from the RRD file
-pub fn fetch_rrd(
-    path: String,
-    cf: CF,
-    resolution: u64,
-    start: Option<u64>,
-    end: Option<u64>,
-) -> Result<(), Error> {
-    let rrd = RRD::load(&PathBuf::from(path), false)?;
-
-    let data = rrd.extract_data(cf, resolution, start, end)?;
-
-    println!("{}", serde_json::to_string_pretty(&data)?);
-
-    Ok(())
-}
-
-#[api(
-   input: {
-       properties: {
-           path: {
-               description: "The filename."
-           },
-           "rra-index": {
-               schema: RRA_INDEX_SCHEMA,
-           },
-       },
-   },
-)]
-/// Return the Unix timestamp of the first time slot inside the
-/// specified RRA (slot start time)
-pub fn first_update_time(path: String, rra_index: usize) -> Result<(), Error> {
-    let rrd = RRD::load(&PathBuf::from(path), false)?;
-
-    if rra_index >= rrd.rra_list.len() {
-        bail!("rra-index is out of range");
-    }
-    let rra = &rrd.rra_list[rra_index];
-    let duration = (rra.data.len() as u64) * rra.resolution;
-    let first = rra.slot_start_time((rrd.source.last_update as u64).saturating_sub(duration));
-
-    println!("{}", first);
-    Ok(())
-}
-
-#[api(
-   input: {
-       properties: {
-           path: {
-               description: "The filename."
-           },
-       },
-   },
-)]
-/// Return the Unix timestamp of the last update
-pub fn last_update_time(path: String) -> Result<(), Error> {
-    let rrd = RRD::load(&PathBuf::from(path), false)?;
-
-    println!("{}", rrd.source.last_update);
-    Ok(())
-}
-
-#[api(
-   input: {
-       properties: {
-           path: {
-               description: "The filename."
-           },
-       },
-   },
-)]
-/// Return the time and value from the last update
-pub fn last_update(path: String) -> Result<(), Error> {
-    let rrd = RRD::load(&PathBuf::from(path), false)?;
-
-    let result = json!({
-        "time": rrd.source.last_update,
-        "value": rrd.source.last_value,
-    });
-
-    println!("{}", serde_json::to_string_pretty(&result)?);
-
-    Ok(())
-}
-
-#[api(
-   input: {
-       properties: {
-           dst: {
-               type: DST,
-           },
-           path: {
-               description: "The filename to create."
-           },
-           rra: {
-               description: "Configuration of contained RRAs.",
-               type: Array,
-               items: {
-                   schema:  RRA_CONFIG_STRING_SCHEMA,
-               }
-           },
-       },
-   },
-)]
-/// Create a new RRD file
-pub fn create_rrd(dst: DST, path: String, rra: Vec<String>) -> Result<(), Error> {
-    let mut rra_list = Vec::new();
-
-    for item in rra.iter() {
-        let rra: RRAConfig =
-            serde_json::from_value(RRAConfig::API_SCHEMA.parse_property_string(item)?)?;
-        println!("GOT {:?}", rra);
-        rra_list.push(RRA::new(rra.cf, rra.r, rra.n as usize));
-    }
-
-    let path = PathBuf::from(path);
-
-    let rrd = RRD::new(dst, rra_list);
-
-    rrd.save(&path, CreateOptions::new(), false)?;
-
-    Ok(())
-}
-
-#[api(
-   input: {
-       properties: {
-           path: {
-               description: "The filename."
-           },
-           "rra-index": {
-               schema: RRA_INDEX_SCHEMA,
-           },
-           slots: {
-               description: "The number of slots you want to add or remove.",
-               type: i64,
-           },
-       },
-   },
-)]
-/// Resize. Change the number of data slots for the specified RRA.
-pub fn resize_rrd(path: String, rra_index: usize, slots: i64) -> Result<(), Error> {
-    let path = PathBuf::from(&path);
-
-    let mut rrd = RRD::load(&path, false)?;
-
-    if rra_index >= rrd.rra_list.len() {
-        bail!("rra-index is out of range");
-    }
-
-    let rra = &rrd.rra_list[rra_index];
-
-    let new_slots = (rra.data.len() as i64) + slots;
-
-    if new_slots < 1 {
-        bail!("number of new slots is too small ('{}' < 1)", new_slots);
-    }
-
-    if new_slots > 1024 * 1024 {
-        bail!("number of new slots is too big ('{}' > 1M)", new_slots);
-    }
-
-    let rra_end = rra.slot_end_time(rrd.source.last_update as u64);
-    let rra_start = rra_end - rra.resolution * (rra.data.len() as u64);
-    let (start, reso, data) = rra
-        .extract_data(rra_start, rra_end, rrd.source.last_update)
-        .into();
-
-    let mut new_rra = RRA::new(rra.cf, rra.resolution, new_slots as usize);
-    new_rra.last_count = rra.last_count;
-
-    new_rra.insert_data(start, reso, data)?;
-
-    rrd.rra_list[rra_index] = new_rra;
-
-    rrd.save(&path, CreateOptions::new(), false)?;
-
-    Ok(())
-}
-
-fn main() -> Result<(), Error> {
-    let uid = nix::unistd::Uid::current();
-
-    let username = match nix::unistd::User::from_uid(uid)? {
-        Some(user) => user.name,
-        None => bail!("unable to get user name"),
-    };
-
-    let cmd_def = CliCommandMap::new()
-        .insert(
-            "create",
-            CliCommand::new(&API_METHOD_CREATE_RRD)
-                .arg_param(&["path"])
-                .completion_cb("path", complete_file_name),
-        )
-        .insert(
-            "dump",
-            CliCommand::new(&API_METHOD_DUMP_RRD)
-                .arg_param(&["path"])
-                .completion_cb("path", complete_file_name),
-        )
-        .insert(
-            "fetch",
-            CliCommand::new(&API_METHOD_FETCH_RRD)
-                .arg_param(&["path"])
-                .completion_cb("path", complete_file_name),
-        )
-        .insert(
-            "first",
-            CliCommand::new(&API_METHOD_FIRST_UPDATE_TIME)
-                .arg_param(&["path"])
-                .completion_cb("path", complete_file_name),
-        )
-        .insert(
-            "info",
-            CliCommand::new(&API_METHOD_RRD_INFO)
-                .arg_param(&["path"])
-                .completion_cb("path", complete_file_name),
-        )
-        .insert(
-            "last",
-            CliCommand::new(&API_METHOD_LAST_UPDATE_TIME)
-                .arg_param(&["path"])
-                .completion_cb("path", complete_file_name),
-        )
-        .insert(
-            "lastupdate",
-            CliCommand::new(&API_METHOD_LAST_UPDATE)
-                .arg_param(&["path"])
-                .completion_cb("path", complete_file_name),
-        )
-        .insert(
-            "resize",
-            CliCommand::new(&API_METHOD_RESIZE_RRD)
-                .arg_param(&["path"])
-                .completion_cb("path", complete_file_name),
-        )
-        .insert(
-            "update",
-            CliCommand::new(&API_METHOD_UPDATE_RRD)
-                .arg_param(&["path"])
-                .completion_cb("path", complete_file_name),
-        );
-
-    let mut rpcenv = CliEnvironment::new();
-    rpcenv.set_auth_id(Some(format!("{}@pam", username)));
-
-    run_cli_command(cmd_def, rpcenv, None);
-
-    Ok(())
-}
diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs
deleted file mode 100644 (file)
index 254010f..0000000
+++ /dev/null
@@ -1,448 +0,0 @@
-use std::collections::BTreeSet;
-use std::fs::File;
-use std::io::{BufRead, BufReader};
-use std::os::unix::io::AsRawFd;
-use std::path::{Path, PathBuf};
-use std::sync::{Arc, RwLock};
-use std::thread::spawn;
-use std::time::SystemTime;
-
-use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, TryRecvError};
-
-use proxmox_sys::fs::{create_path, CreateOptions};
-
-use crate::rrd::{CF, DST, RRA, RRD};
-use crate::Entry;
-
-mod journal;
-use journal::*;
-
-mod rrd_map;
-use rrd_map::*;
-
-/// RRD cache - keep RRD data in RAM, but write updates to disk
-///
-/// This cache is designed to run as single instance (no concurrent
-/// access from other processes).
-pub struct RRDCache {
-    config: Arc<CacheConfig>,
-    state: Arc<RwLock<JournalState>>,
-    rrd_map: Arc<RwLock<RRDMap>>,
-}
-
-pub(crate) struct CacheConfig {
-    apply_interval: f64,
-    basedir: PathBuf,
-    file_options: CreateOptions,
-    dir_options: CreateOptions,
-}
-
-impl RRDCache {
-    /// Creates a new instance
-    ///
-    /// `basedir`: All files are stored relative to this path.
-    ///
-    /// `file_options`: Files are created with this options.
-    ///
-    /// `dir_options`: Directories are created with this options.
-    ///
-    /// `apply_interval`: Commit journal after `apply_interval` seconds.
-    ///
-    /// `load_rrd_cb`; The callback function is used to load RRD files,
-    /// and should return a newly generated RRD if the file does not
-    /// exists (or is unreadable). This may generate RRDs with
-    /// different configurations (dependent on `rel_path`).
-    pub fn new<P: AsRef<Path>>(
-        basedir: P,
-        file_options: Option<CreateOptions>,
-        dir_options: Option<CreateOptions>,
-        apply_interval: f64,
-        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
-    ) -> Result<Self, Error> {
-        let basedir = basedir.as_ref().to_owned();
-
-        let file_options = file_options.unwrap_or_else(CreateOptions::new);
-        let dir_options = dir_options.unwrap_or_else(CreateOptions::new);
-
-        create_path(
-            &basedir,
-            Some(dir_options.clone()),
-            Some(dir_options.clone()),
-        )
-        .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
-
-        let config = Arc::new(CacheConfig {
-            basedir,
-            file_options,
-            dir_options,
-            apply_interval,
-        });
-
-        let state = JournalState::new(Arc::clone(&config))?;
-        let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
-
-        Ok(Self {
-            config: Arc::clone(&config),
-            state: Arc::new(RwLock::new(state)),
-            rrd_map: Arc::new(RwLock::new(rrd_map)),
-        })
-    }
-
-    /// Create a new RRD as used by the proxmox backup server
-    ///
-    /// It contains the following RRAs:
-    ///
-    /// * cf=average,r=60,n=1440 => 1day
-    /// * cf=maximum,r=60,n=1440 => 1day
-    /// * cf=average,r=30*60,n=1440 => 1month
-    /// * cf=maximum,r=30*60,n=1440 => 1month
-    /// * cf=average,r=6*3600,n=1440 => 1year
-    /// * cf=maximum,r=6*3600,n=1440 => 1year
-    /// * cf=average,r=7*86400,n=570 => 10years
-    /// * cf=maximum,r=7*86400,n=570 => 10year
-    ///
-    /// The resulting data file size is about 80KB.
-    pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD {
-        let rra_list = vec![
-            // 1 min * 1440 => 1 day
-            RRA::new(CF::Average, 60, 1440),
-            RRA::new(CF::Maximum, 60, 1440),
-            // 30 min * 1440 => 30 days ~ 1 month
-            RRA::new(CF::Average, 30 * 60, 1440),
-            RRA::new(CF::Maximum, 30 * 60, 1440),
-            // 6 h * 1440 => 360 days ~ 1 year
-            RRA::new(CF::Average, 6 * 3600, 1440),
-            RRA::new(CF::Maximum, 6 * 3600, 1440),
-            // 1 week * 570 => 10 years
-            RRA::new(CF::Average, 7 * 86400, 570),
-            RRA::new(CF::Maximum, 7 * 86400, 570),
-        ];
-
-        RRD::new(dst, rra_list)
-    }
-
-    /// Sync the journal data to disk (using `fdatasync` syscall)
-    pub fn sync_journal(&self) -> Result<(), Error> {
-        self.state.read().unwrap().sync_journal()
-    }
-
-    /// Apply and commit the journal. Should be used at server startup.
-    pub fn apply_journal(&self) -> Result<bool, Error> {
-        let config = Arc::clone(&self.config);
-        let state = Arc::clone(&self.state);
-        let rrd_map = Arc::clone(&self.rrd_map);
-
-        let mut state_guard = self.state.write().unwrap();
-        let journal_applied = state_guard.journal_applied;
-
-        if let Some(ref recv) = state_guard.apply_thread_result {
-            match recv.try_recv() {
-                Ok(Ok(())) => {
-                    // finished without errors, OK
-                    state_guard.apply_thread_result = None;
-                }
-                Ok(Err(err)) => {
-                    // finished with errors, log them
-                    log::error!("{}", err);
-                    state_guard.apply_thread_result = None;
-                }
-                Err(TryRecvError::Empty) => {
-                    // still running
-                    return Ok(journal_applied);
-                }
-                Err(TryRecvError::Disconnected) => {
-                    // crashed, start again
-                    log::error!("apply journal thread crashed - try again");
-                    state_guard.apply_thread_result = None;
-                }
-            }
-        }
-
-        let now = proxmox_time::epoch_f64();
-        let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
-
-        if journal_applied && !wants_commit {
-            return Ok(journal_applied);
-        }
-
-        state_guard.last_journal_flush = proxmox_time::epoch_f64();
-
-        let (sender, receiver) = bounded(1);
-        state_guard.apply_thread_result = Some(receiver);
-
-        spawn(move || {
-            let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
-                .map_err(|err| err.to_string());
-            sender.send(result).unwrap();
-        });
-
-        Ok(journal_applied)
-    }
-
-    /// Update data in RAM and write file back to disk (journal)
-    pub fn update_value(
-        &self,
-        rel_path: &str,
-        time: f64,
-        value: f64,
-        dst: DST,
-    ) -> Result<(), Error> {
-        let journal_applied = self.apply_journal()?;
-
-        self.state
-            .write()
-            .unwrap()
-            .append_journal_entry(time, value, dst, rel_path)?;
-
-        if journal_applied {
-            self.rrd_map
-                .write()
-                .unwrap()
-                .update(rel_path, time, value, dst, false)?;
-        }
-
-        Ok(())
-    }
-
-    /// Extract data from cached RRD
-    ///
-    /// `start`: Start time. If not specified, we simply extract 10 data points.
-    ///
-    /// `end`: End time. Default is to use the current time.
-    pub fn extract_cached_data(
-        &self,
-        base: &str,
-        name: &str,
-        cf: CF,
-        resolution: u64,
-        start: Option<u64>,
-        end: Option<u64>,
-    ) -> Result<Option<Entry>, Error> {
-        self.rrd_map
-            .read()
-            .unwrap()
-            .extract_cached_data(base, name, cf, resolution, start, end)
-    }
-}
-
-fn apply_and_commit_journal_thread(
-    config: Arc<CacheConfig>,
-    state: Arc<RwLock<JournalState>>,
-    rrd_map: Arc<RwLock<RRDMap>>,
-    commit_only: bool,
-) -> Result<(), Error> {
-    if commit_only {
-        state.write().unwrap().rotate_journal()?; // start new journal, keep old one
-    } else {
-        let start_time = SystemTime::now();
-        log::debug!("applying rrd journal");
-
-        match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
-            Ok(entries) => {
-                let elapsed = start_time.elapsed().unwrap().as_secs_f64();
-                log::info!(
-                    "applied rrd journal ({} entries in {:.3} seconds)",
-                    entries,
-                    elapsed
-                );
-            }
-            Err(err) => bail!("apply rrd journal failed - {}", err),
-        }
-    }
-
-    let start_time = SystemTime::now();
-    log::debug!("commit rrd journal");
-
-    match commit_journal_impl(config, state, rrd_map) {
-        Ok(rrd_file_count) => {
-            let elapsed = start_time.elapsed().unwrap().as_secs_f64();
-            log::info!(
-                "rrd journal successfully committed ({} files in {:.3} seconds)",
-                rrd_file_count,
-                elapsed
-            );
-        }
-        Err(err) => bail!("rrd journal commit failed: {}", err),
-    }
-    Ok(())
-}
-
-fn apply_journal_lines(
-    state: Arc<RwLock<JournalState>>,
-    rrd_map: Arc<RwLock<RRDMap>>,
-    journal_name: &str, // used for logging
-    reader: &mut BufReader<File>,
-    lock_read_line: bool,
-) -> Result<usize, Error> {
-    let mut linenr = 0;
-
-    loop {
-        linenr += 1;
-        let mut line = String::new();
-        let len = if lock_read_line {
-            let _lock = state.read().unwrap(); // make sure we read entire lines
-            reader.read_line(&mut line)?
-        } else {
-            reader.read_line(&mut line)?
-        };
-
-        if len == 0 {
-            break;
-        }
-
-        let entry: JournalEntry = match line.parse() {
-            Ok(entry) => entry,
-            Err(err) => {
-                log::warn!(
-                    "unable to parse rrd journal '{}' line {} (skip) - {}",
-                    journal_name,
-                    linenr,
-                    err,
-                );
-                continue; // skip unparsable lines
-            }
-        };
-
-        rrd_map.write().unwrap().update(
-            &entry.rel_path,
-            entry.time,
-            entry.value,
-            entry.dst,
-            true,
-        )?;
-    }
-    Ok(linenr)
-}
-
-fn apply_journal_impl(
-    state: Arc<RwLock<JournalState>>,
-    rrd_map: Arc<RwLock<RRDMap>>,
-) -> Result<usize, Error> {
-    let mut lines = 0;
-
-    // Apply old journals first
-    let journal_list = state.read().unwrap().list_old_journals()?;
-
-    for entry in journal_list {
-        log::info!("apply old journal log {}", entry.name);
-        let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
-        let mut reader = BufReader::new(file);
-        lines += apply_journal_lines(
-            Arc::clone(&state),
-            Arc::clone(&rrd_map),
-            &entry.name,
-            &mut reader,
-            false,
-        )?;
-    }
-
-    let mut journal = state.read().unwrap().open_journal_reader()?;
-
-    lines += apply_journal_lines(
-        Arc::clone(&state),
-        Arc::clone(&rrd_map),
-        "rrd.journal",
-        &mut journal,
-        true,
-    )?;
-
-    {
-        let mut state_guard = state.write().unwrap(); // block other writers
-
-        lines += apply_journal_lines(
-            Arc::clone(&state),
-            Arc::clone(&rrd_map),
-            "rrd.journal",
-            &mut journal,
-            false,
-        )?;
-
-        state_guard.rotate_journal()?; // start new journal, keep old one
-
-        // We need to apply the journal only once, because further updates
-        // are always directly applied.
-        state_guard.journal_applied = true;
-    }
-
-    Ok(lines)
-}
-
-fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
-    let file = std::fs::File::open(path)?;
-    nix::unistd::fsync(file.as_raw_fd())?;
-    Ok(())
-}
-
-pub(crate) fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
-    let file = std::fs::File::open(path)?;
-    nix::unistd::fsync(file.as_raw_fd())?;
-    if let Some(parent) = path.parent() {
-        fsync_file_or_dir(parent)?;
-    }
-    Ok(())
-}
-
-fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
-    let mut path = basedir.to_owned();
-    let rel_path = Path::new(rel_path);
-    if let Some(parent) = rel_path.parent() {
-        path.push(parent);
-    }
-    path
-}
-
-fn commit_journal_impl(
-    config: Arc<CacheConfig>,
-    state: Arc<RwLock<JournalState>>,
-    rrd_map: Arc<RwLock<RRDMap>>,
-) -> Result<usize, Error> {
-    let files = rrd_map.read().unwrap().file_list();
-
-    let mut rrd_file_count = 0;
-    let mut errors = 0;
-
-    let mut dir_set = BTreeSet::new();
-
-    log::info!("write rrd data back to disk");
-
-    // save all RRDs - we only need a read lock here
-    // Note: no fsync here (we do it afterwards)
-    for rel_path in files.iter() {
-        let parent_dir = rrd_parent_dir(&config.basedir, rel_path);
-        dir_set.insert(parent_dir);
-        rrd_file_count += 1;
-        if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(rel_path) {
-            errors += 1;
-            log::error!("unable to save rrd {}: {}", rel_path, err);
-        }
-    }
-
-    if errors != 0 {
-        bail!("errors during rrd flush - unable to commit rrd journal");
-    }
-
-    // Important: We fsync files after writing all data! This increase
-    // the likelihood that files are already synced, so this is
-    // much faster (although we need to re-open the files).
-
-    log::info!("starting rrd data sync");
-
-    for rel_path in files.iter() {
-        let mut path = config.basedir.clone();
-        path.push(rel_path);
-        fsync_file_or_dir(&path)
-            .map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
-    }
-
-    // also fsync directories
-    for dir_path in dir_set {
-        fsync_file_or_dir(&dir_path)
-            .map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
-    }
-
-    // if everything went ok, remove the old journal files
-    state.write().unwrap().remove_old_journals()?;
-
-    Ok(rrd_file_count)
-}
diff --git a/proxmox-rrd/src/cache/journal.rs b/proxmox-rrd/src/cache/journal.rs
deleted file mode 100644 (file)
index 7c260e1..0000000
+++ /dev/null
@@ -1,200 +0,0 @@
-use std::ffi::OsStr;
-use std::fs::File;
-use std::io::{BufReader, Write};
-use std::os::unix::io::AsRawFd;
-use std::path::PathBuf;
-use std::str::FromStr;
-use std::sync::Arc;
-
-use anyhow::{bail, format_err, Error};
-use crossbeam_channel::Receiver;
-use nix::fcntl::OFlag;
-
-use proxmox_sys::fs::atomic_open_or_create_file;
-
-const RRD_JOURNAL_NAME: &str = "rrd.journal";
-
-use crate::cache::CacheConfig;
-use crate::rrd::DST;
-
-// shared state behind RwLock
-pub struct JournalState {
-    config: Arc<CacheConfig>,
-    journal: File,
-    pub last_journal_flush: f64,
-    pub journal_applied: bool,
-    pub apply_thread_result: Option<Receiver<Result<(), String>>>,
-}
-
-pub struct JournalEntry {
-    pub time: f64,
-    pub value: f64,
-    pub dst: DST,
-    pub rel_path: String,
-}
-
-impl FromStr for JournalEntry {
-    type Err = Error;
-
-    fn from_str(line: &str) -> Result<Self, Self::Err> {
-        let line = line.trim();
-
-        let parts: Vec<&str> = line.splitn(4, ':').collect();
-        if parts.len() != 4 {
-            bail!("wrong numper of components");
-        }
-
-        let time: f64 = parts[0]
-            .parse()
-            .map_err(|_| format_err!("unable to parse time"))?;
-        let value: f64 = parts[1]
-            .parse()
-            .map_err(|_| format_err!("unable to parse value"))?;
-        let dst: u8 = parts[2]
-            .parse()
-            .map_err(|_| format_err!("unable to parse data source type"))?;
-
-        let dst = match dst {
-            0 => DST::Gauge,
-            1 => DST::Derive,
-            _ => bail!("got strange value for data source type '{}'", dst),
-        };
-
-        let rel_path = parts[3].to_string();
-
-        Ok(JournalEntry {
-            time,
-            value,
-            dst,
-            rel_path,
-        })
-    }
-}
-
-pub struct JournalFileInfo {
-    pub time: u64,
-    pub name: String,
-    pub path: PathBuf,
-}
-
-impl JournalState {
-    pub(crate) fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
-        let journal = JournalState::open_journal_writer(&config)?;
-        Ok(Self {
-            config,
-            journal,
-            last_journal_flush: 0.0,
-            journal_applied: false,
-            apply_thread_result: None,
-        })
-    }
-
-    pub fn sync_journal(&self) -> Result<(), Error> {
-        nix::unistd::fdatasync(self.journal.as_raw_fd())?;
-        Ok(())
-    }
-
-    pub fn append_journal_entry(
-        &mut self,
-        time: f64,
-        value: f64,
-        dst: DST,
-        rel_path: &str,
-    ) -> Result<(), Error> {
-        let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
-        self.journal.write_all(journal_entry.as_bytes())?;
-        Ok(())
-    }
-
-    pub fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
-        // fixme : dup self.journal instead??
-        let mut journal_path = self.config.basedir.clone();
-        journal_path.push(RRD_JOURNAL_NAME);
-
-        let flags = OFlag::O_CLOEXEC | OFlag::O_RDONLY;
-        let journal = atomic_open_or_create_file(
-            &journal_path,
-            flags,
-            &[],
-            self.config.file_options.clone(),
-            false,
-        )?;
-        Ok(BufReader::new(journal))
-    }
-
-    fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> {
-        let mut journal_path = config.basedir.clone();
-        journal_path.push(RRD_JOURNAL_NAME);
-
-        let flags = OFlag::O_CLOEXEC | OFlag::O_WRONLY | OFlag::O_APPEND;
-        let journal = atomic_open_or_create_file(
-            &journal_path,
-            flags,
-            &[],
-            config.file_options.clone(),
-            false,
-        )?;
-        Ok(journal)
-    }
-
-    pub fn rotate_journal(&mut self) -> Result<(), Error> {
-        let mut journal_path = self.config.basedir.clone();
-        journal_path.push(RRD_JOURNAL_NAME);
-
-        let mut new_name = journal_path.clone();
-        let now = proxmox_time::epoch_i64();
-        new_name.set_extension(format!("journal-{:08x}", now));
-        std::fs::rename(journal_path, &new_name)?;
-
-        self.journal = Self::open_journal_writer(&self.config)?;
-
-        // make sure the old journal data landed on the disk
-        super::fsync_file_and_parent(&new_name)?;
-
-        Ok(())
-    }
-
-    pub fn remove_old_journals(&self) -> Result<(), Error> {
-        let journal_list = self.list_old_journals()?;
-
-        for entry in journal_list {
-            std::fs::remove_file(entry.path)?;
-        }
-
-        Ok(())
-    }
-
-    pub fn list_old_journals(&self) -> Result<Vec<JournalFileInfo>, Error> {
-        let mut list = Vec::new();
-        for entry in std::fs::read_dir(&self.config.basedir)? {
-            let entry = entry?;
-            let path = entry.path();
-
-            if !path.is_file() {
-                continue;
-            }
-
-            match path.file_stem() {
-                None => continue,
-                Some(stem) if stem != OsStr::new("rrd") => continue,
-                Some(_) => (),
-            }
-
-            if let Some(extension) = path.extension() {
-                if let Some(extension) = extension.to_str() {
-                    if let Some(rest) = extension.strip_prefix("journal-") {
-                        if let Ok(time) = u64::from_str_radix(rest, 16) {
-                            list.push(JournalFileInfo {
-                                time,
-                                name: format!("rrd.{}", extension),
-                                path: path.to_owned(),
-                            });
-                        }
-                    }
-                }
-            }
-        }
-        list.sort_unstable_by_key(|entry| entry.time);
-        Ok(list)
-    }
-}
diff --git a/proxmox-rrd/src/cache/rrd_map.rs b/proxmox-rrd/src/cache/rrd_map.rs
deleted file mode 100644 (file)
index f907d35..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-use std::collections::HashMap;
-use std::path::Path;
-use std::sync::Arc;
-
-use anyhow::{bail, Error};
-
-use proxmox_sys::fs::create_path;
-
-use crate::rrd::{CF, DST, RRD};
-
-use super::CacheConfig;
-use crate::Entry;
-
-pub struct RRDMap {
-    config: Arc<CacheConfig>,
-    map: HashMap<String, RRD>,
-    load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
-}
-
-impl RRDMap {
-    pub(crate) fn new(
-        config: Arc<CacheConfig>,
-        load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
-    ) -> Self {
-        Self {
-            config,
-            map: HashMap::new(),
-            load_rrd_cb,
-        }
-    }
-
-    pub fn update(
-        &mut self,
-        rel_path: &str,
-        time: f64,
-        value: f64,
-        dst: DST,
-        new_only: bool,
-    ) -> Result<(), Error> {
-        if let Some(rrd) = self.map.get_mut(rel_path) {
-            if !new_only || time > rrd.last_update() {
-                rrd.update(time, value);
-            }
-        } else {
-            let mut path = self.config.basedir.clone();
-            path.push(rel_path);
-            create_path(
-                path.parent().unwrap(),
-                Some(self.config.dir_options.clone()),
-                Some(self.config.dir_options.clone()),
-            )?;
-
-            let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst);
-
-            if !new_only || time > rrd.last_update() {
-                rrd.update(time, value);
-            }
-            self.map.insert(rel_path.to_string(), rrd);
-        }
-        Ok(())
-    }
-
-    pub fn file_list(&self) -> Vec<String> {
-        let mut list = Vec::new();
-
-        for rel_path in self.map.keys() {
-            list.push(rel_path.clone());
-        }
-
-        list
-    }
-
-    pub fn flush_rrd_file(&self, rel_path: &str) -> Result<(), Error> {
-        if let Some(rrd) = self.map.get(rel_path) {
-            let mut path = self.config.basedir.clone();
-            path.push(rel_path);
-            rrd.save(&path, self.config.file_options.clone(), true)
-        } else {
-            bail!("rrd file {} not loaded", rel_path);
-        }
-    }
-
-    pub fn extract_cached_data(
-        &self,
-        base: &str,
-        name: &str,
-        cf: CF,
-        resolution: u64,
-        start: Option<u64>,
-        end: Option<u64>,
-    ) -> Result<Option<Entry>, Error> {
-        match self.map.get(&format!("{}/{}", base, name)) {
-            Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
-            None => Ok(None),
-        }
-    }
-}
diff --git a/proxmox-rrd/src/lib.rs b/proxmox-rrd/src/lib.rs
deleted file mode 100644 (file)
index 80b3943..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-//! # Round Robin Database files
-//!
-//! ## Features
-//!
-//! * One file stores a single data source
-//! * Stores data for different time resolution
-//! * Simple cache implementation with journal support
-
-mod rrd_v1;
-
-pub mod rrd;
-#[doc(inline)]
-pub use rrd::Entry;
-
-mod cache;
-pub use cache::*;
diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs
deleted file mode 100644 (file)
index 0b8ac46..0000000
+++ /dev/null
@@ -1,694 +0,0 @@
-//! # Proxmox RRD format version 2
-//!
-//! The new format uses
-//! [CBOR](https://datatracker.ietf.org/doc/html/rfc8949) as storage
-//! format. This way we can use the serde serialization framework,
-//! which make our code more flexible, much nicer and type safe.
-//!
-//! ## Features
-//!
-//! * Well defined data format [CBOR](https://datatracker.ietf.org/doc/html/rfc8949)
-//! * Platform independent (big endian f64, hopefully a standard format?)
-//! * Arbitrary number of RRAs (dynamically changeable)
-
-use std::io::{Read, Write};
-use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
-use std::path::Path;
-
-use anyhow::{bail, format_err, Error};
-use serde::{Deserialize, Serialize};
-
-use proxmox_schema::api;
-use proxmox_sys::fs::{make_tmp_file, CreateOptions};
-
-use crate::rrd_v1;
-
-/// Proxmox RRD v2 file magic number
-// openssl::sha::sha256(b"Proxmox Round Robin Database file v2.0")[0..8];
-pub const PROXMOX_RRD_MAGIC_2_0: [u8; 8] = [224, 200, 228, 27, 239, 112, 122, 159];
-
-#[api()]
-#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
-#[serde(rename_all = "kebab-case")]
-/// RRD data source type
-pub enum DST {
-    /// Gauge values are stored unmodified.
-    Gauge,
-    /// Stores the difference to the previous value.
-    Derive,
-    /// Stores the difference to the previous value (like Derive), but
-    /// detect counter overflow (and ignores that value)
-    Counter,
-}
-
-#[api()]
-#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
-#[serde(rename_all = "kebab-case")]
-/// Consolidation function
-pub enum CF {
-    /// Average
-    Average,
-    /// Maximum
-    Maximum,
-    /// Minimum
-    Minimum,
-    /// Use the last value
-    Last,
-}
-
-#[derive(Serialize, Deserialize)]
-/// Data source specification
-pub struct DataSource {
-    /// Data source type
-    pub dst: DST,
-    /// Last update time (epoch)
-    pub last_update: f64,
-    /// Stores the last value, used to compute differential value for
-    /// derive/counters
-    pub last_value: f64,
-}
-
-/// An RRD entry.
-///
-/// Serializes as a tuple.
-#[derive(Clone, Debug, Deserialize, Serialize)]
-#[serde(
-    from = "(u64, u64, Vec<Option<f64>>)",
-    into = "(u64, u64, Vec<Option<f64>>)"
-)]
-pub struct Entry {
-    pub start: u64,
-    pub resolution: u64,
-    pub data: Vec<Option<f64>>,
-}
-
-impl Entry {
-    pub const fn new(start: u64, resolution: u64, data: Vec<Option<f64>>) -> Self {
-        Self {
-            start,
-            resolution,
-            data,
-        }
-    }
-
-    /// Get a data point at a specific index which also does bound checking and returns `None` for
-    /// out of bounds indices.
-    pub fn get(&self, idx: usize) -> Option<f64> {
-        self.data.get(idx).copied().flatten()
-    }
-}
-
-impl From<Entry> for (u64, u64, Vec<Option<f64>>) {
-    fn from(entry: Entry) -> (u64, u64, Vec<Option<f64>>) {
-        (entry.start, entry.resolution, entry.data)
-    }
-}
-
-impl From<(u64, u64, Vec<Option<f64>>)> for Entry {
-    fn from(data: (u64, u64, Vec<Option<f64>>)) -> Self {
-        Self::new(data.0, data.1, data.2)
-    }
-}
-
-impl DataSource {
-    /// Create a new Instance
-    pub fn new(dst: DST) -> Self {
-        Self {
-            dst,
-            last_update: 0.0,
-            last_value: f64::NAN,
-        }
-    }
-
-    fn compute_new_value(&mut self, time: f64, mut value: f64) -> Result<f64, Error> {
-        if time < 0.0 {
-            bail!("got negative time");
-        }
-        if time <= self.last_update {
-            bail!("time in past ({} < {})", time, self.last_update);
-        }
-
-        if value.is_nan() {
-            bail!("new value is NAN");
-        }
-
-        // derive counter value
-        let is_counter = self.dst == DST::Counter;
-
-        if is_counter || self.dst == DST::Derive {
-            let time_diff = time - self.last_update;
-
-            let diff = if self.last_value.is_nan() {
-                0.0
-            } else if is_counter && value < 0.0 {
-                bail!("got negative value for counter");
-            } else if is_counter && value < self.last_value {
-                // Note: We do not try automatic overflow corrections, but
-                // we update last_value anyways, so that we can compute the diff
-                // next time.
-                self.last_value = value;
-                bail!("counter overflow/reset detected");
-            } else {
-                value - self.last_value
-            };
-            self.last_value = value;
-            value = diff / time_diff;
-        } else {
-            self.last_value = value;
-        }
-
-        Ok(value)
-    }
-}
-
-#[derive(Serialize, Deserialize)]
-/// Round Robin Archive
-pub struct RRA {
-    /// Number of seconds spaned by a single data entry.
-    pub resolution: u64,
-    /// Consolitation function.
-    pub cf: CF,
-    /// Count values computed inside this update interval.
-    pub last_count: u64,
-    /// The actual data entries.
-    pub data: Vec<f64>,
-}
-
-impl RRA {
-    /// Creates a new instance
-    pub fn new(cf: CF, resolution: u64, points: usize) -> Self {
-        Self {
-            cf,
-            resolution,
-            last_count: 0,
-            data: vec![f64::NAN; points],
-        }
-    }
-
-    /// Data slot end time
-    pub fn slot_end_time(&self, time: u64) -> u64 {
-        self.resolution * (time / self.resolution + 1)
-    }
-
-    /// Data slot start time
-    pub fn slot_start_time(&self, time: u64) -> u64 {
-        self.resolution * (time / self.resolution)
-    }
-
-    /// Data slot index
-    pub fn slot(&self, time: u64) -> usize {
-        ((time / self.resolution) as usize) % self.data.len()
-    }
-
-    /// Directly overwrite data slots.
-    ///
-    /// The caller need to set `last_update` value on the [DataSource] manually.
-    pub fn insert_data(
-        &mut self,
-        start: u64,
-        resolution: u64,
-        data: Vec<Option<f64>>,
-    ) -> Result<(), Error> {
-        if resolution != self.resolution {
-            bail!("inser_data failed: got wrong resolution");
-        }
-
-        let mut index = self.slot(start);
-
-        for item in data {
-            if let Some(v) = item {
-                self.data[index] = v;
-            }
-            index += 1;
-            if index >= self.data.len() {
-                index = 0;
-            }
-        }
-        Ok(())
-    }
-
-    fn delete_old_slots(&mut self, time: f64, last_update: f64) {
-        let epoch = time as u64;
-        let last_update = last_update as u64;
-        let reso = self.resolution;
-        let num_entries = self.data.len() as u64;
-
-        let min_time = epoch.saturating_sub(num_entries * reso);
-        let min_time = self.slot_end_time(min_time);
-
-        let mut t = last_update.saturating_sub(num_entries * reso);
-        let mut index = self.slot(t);
-
-        for _ in 0..num_entries {
-            t += reso;
-            index += 1;
-            if index >= self.data.len() {
-                index = 0;
-            }
-            if t < min_time {
-                self.data[index] = f64::NAN;
-            } else {
-                break;
-            }
-        }
-    }
-
-    fn compute_new_value(&mut self, time: f64, last_update: f64, value: f64) {
-        let epoch = time as u64;
-        let last_update = last_update as u64;
-        let reso = self.resolution;
-
-        let index = self.slot(epoch);
-        let last_index = self.slot(last_update);
-
-        if (epoch - last_update) > reso || index != last_index {
-            self.last_count = 0;
-        }
-
-        let last_value = self.data[index];
-        if last_value.is_nan() {
-            self.last_count = 0;
-        }
-
-        let new_count = self.last_count.saturating_add(1);
-
-        if self.last_count == 0 {
-            self.data[index] = value;
-            self.last_count = 1;
-        } else {
-            let new_value = match self.cf {
-                CF::Maximum => {
-                    if last_value > value {
-                        last_value
-                    } else {
-                        value
-                    }
-                }
-                CF::Minimum => {
-                    if last_value < value {
-                        last_value
-                    } else {
-                        value
-                    }
-                }
-                CF::Last => value,
-                CF::Average => {
-                    (last_value * (self.last_count as f64)) / (new_count as f64)
-                        + value / (new_count as f64)
-                }
-            };
-            self.data[index] = new_value;
-            self.last_count = new_count;
-        }
-    }
-
-    /// Extract data
-    ///
-    /// Extract data from `start` to `end`. The RRA itself does not
-    /// store the `last_update` time, so you need to pass this a
-    /// parameter (see [DataSource]).
-    pub fn extract_data(&self, start: u64, end: u64, last_update: f64) -> Entry {
-        let last_update = last_update as u64;
-        let reso = self.resolution;
-        let num_entries = self.data.len() as u64;
-
-        let mut list = Vec::new();
-
-        let rrd_end = self.slot_end_time(last_update);
-        let rrd_start = rrd_end.saturating_sub(reso * num_entries);
-
-        let mut t = start;
-        let mut index = self.slot(t);
-        for _ in 0..num_entries {
-            if t > end {
-                break;
-            };
-            if t < rrd_start || t >= rrd_end {
-                list.push(None);
-            } else {
-                let value = self.data[index];
-                if value.is_nan() {
-                    list.push(None);
-                } else {
-                    list.push(Some(value));
-                }
-            }
-            t += reso;
-            index += 1;
-            if index >= self.data.len() {
-                index = 0;
-            }
-        }
-
-        Entry::new(start, reso, list)
-    }
-}
-
-#[derive(Serialize, Deserialize)]
-/// Round Robin Database
-pub struct RRD {
-    /// The data source definition
-    pub source: DataSource,
-    /// List of round robin archives
-    pub rra_list: Vec<RRA>,
-}
-
-impl RRD {
-    /// Creates a new Instance
-    pub fn new(dst: DST, rra_list: Vec<RRA>) -> RRD {
-        let source = DataSource::new(dst);
-
-        RRD { source, rra_list }
-    }
-
-    fn from_raw(raw: &[u8]) -> Result<Self, Error> {
-        if raw.len() < 8 {
-            bail!("not an rrd file - file is too small ({})", raw.len());
-        }
-
-        let rrd = if raw[0..8] == rrd_v1::PROXMOX_RRD_MAGIC_1_0 {
-            let v1 = rrd_v1::RRDv1::from_raw(raw)?;
-            v1.to_rrd_v2()
-                .map_err(|err| format_err!("unable to convert from old V1 format - {}", err))?
-        } else if raw[0..8] == PROXMOX_RRD_MAGIC_2_0 {
-            serde_cbor::from_slice(&raw[8..])
-                .map_err(|err| format_err!("unable to decode RRD file - {}", err))?
-        } else {
-            bail!("not an rrd file - unknown magic number");
-        };
-
-        if rrd.source.last_update < 0.0 {
-            bail!("rrd file has negative last_update time");
-        }
-
-        Ok(rrd)
-    }
-
-    /// Load data from a file
-    ///
-    /// Setting `avoid_page_cache` uses
-    /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
-    /// the linux page cache.
-    pub fn load(path: &Path, avoid_page_cache: bool) -> Result<Self, std::io::Error> {
-        let mut file = std::fs::File::open(path)?;
-        let buffer_size = file.metadata().map(|m| m.len() as usize + 1).unwrap_or(0);
-        let mut raw = Vec::with_capacity(buffer_size);
-        file.read_to_end(&mut raw)?;
-
-        if avoid_page_cache {
-            nix::fcntl::posix_fadvise(
-                file.as_raw_fd(),
-                0,
-                buffer_size as i64,
-                nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
-            )
-            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
-        }
-
-        match Self::from_raw(&raw) {
-            Ok(rrd) => Ok(rrd),
-            Err(err) => Err(std::io::Error::new(
-                std::io::ErrorKind::Other,
-                err.to_string(),
-            )),
-        }
-    }
-
-    /// Store data into a file (atomic replace file)
-    ///
-    /// Setting `avoid_page_cache` uses
-    /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
-    /// the linux page cache.
-    pub fn save(
-        &self,
-        path: &Path,
-        options: CreateOptions,
-        avoid_page_cache: bool,
-    ) -> Result<(), Error> {
-        let (fd, tmp_path) = make_tmp_file(path, options)?;
-        let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
-
-        let mut try_block = || -> Result<(), Error> {
-            let mut data: Vec<u8> = Vec::new();
-            data.extend(PROXMOX_RRD_MAGIC_2_0);
-            serde_cbor::to_writer(&mut data, self)?;
-            file.write_all(&data)?;
-
-            if avoid_page_cache {
-                nix::fcntl::posix_fadvise(
-                    file.as_raw_fd(),
-                    0,
-                    data.len() as i64,
-                    nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
-                )?;
-            }
-
-            Ok(())
-        };
-
-        match try_block() {
-            Ok(()) => (),
-            error => {
-                let _ = nix::unistd::unlink(&tmp_path);
-                return error;
-            }
-        }
-
-        if let Err(err) = std::fs::rename(&tmp_path, path) {
-            let _ = nix::unistd::unlink(&tmp_path);
-            bail!("Atomic rename failed - {}", err);
-        }
-
-        Ok(())
-    }
-
-    /// Returns the last update time.
-    pub fn last_update(&self) -> f64 {
-        self.source.last_update
-    }
-
-    /// Update the value (in memory)
-    ///
-    /// Note: This does not call [Self::save].
-    pub fn update(&mut self, time: f64, value: f64) {
-        let value = match self.source.compute_new_value(time, value) {
-            Ok(value) => value,
-            Err(err) => {
-                log::error!("rrd update failed: {}", err);
-                return;
-            }
-        };
-
-        let last_update = self.source.last_update;
-        self.source.last_update = time;
-
-        for rra in self.rra_list.iter_mut() {
-            rra.delete_old_slots(time, last_update);
-            rra.compute_new_value(time, last_update, value);
-        }
-    }
-
-    /// Extract data from the archive
-    ///
-    /// This selects the RRA with specified [CF] and (minimum)
-    /// resolution, and extract data from `start` to `end`.
-    ///
-    /// `start`: Start time. If not specified, we simply extract 10 data points.
-    /// `end`: End time. Default is to use the current time.
-    pub fn extract_data(
-        &self,
-        cf: CF,
-        resolution: u64,
-        start: Option<u64>,
-        end: Option<u64>,
-    ) -> Result<Entry, Error> {
-        let mut rra: Option<&RRA> = None;
-        for item in self.rra_list.iter() {
-            if item.cf != cf {
-                continue;
-            }
-            if item.resolution > resolution {
-                continue;
-            }
-
-            if let Some(current) = rra {
-                if item.resolution > current.resolution {
-                    rra = Some(item);
-                }
-            } else {
-                rra = Some(item);
-            }
-        }
-
-        match rra {
-            Some(rra) => {
-                let end = end.unwrap_or_else(|| proxmox_time::epoch_f64() as u64);
-                let start = start.unwrap_or_else(|| end.saturating_sub(10 * rra.resolution));
-                Ok(rra.extract_data(start, end, self.source.last_update))
-            }
-            None => bail!("unable to find RRA suitable ({:?}:{})", cf, resolution),
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn basic_rra_maximum_gauge_test() -> Result<(), Error> {
-        let rra = RRA::new(CF::Maximum, 60, 5);
-        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
-
-        for i in 2..10 {
-            rrd.update((i as f64) * 30.0, i as f64);
-        }
-
-        let Entry {
-            start,
-            resolution,
-            data,
-        } = rrd.extract_data(CF::Maximum, 60, Some(0), Some(5 * 60))?;
-        assert_eq!(start, 0);
-        assert_eq!(resolution, 60);
-        assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn basic_rra_minimum_gauge_test() -> Result<(), Error> {
-        let rra = RRA::new(CF::Minimum, 60, 5);
-        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
-
-        for i in 2..10 {
-            rrd.update((i as f64) * 30.0, i as f64);
-        }
-
-        let Entry {
-            start,
-            resolution,
-            data,
-        } = rrd.extract_data(CF::Minimum, 60, Some(0), Some(5 * 60))?;
-        assert_eq!(start, 0);
-        assert_eq!(resolution, 60);
-        assert_eq!(data, [None, Some(2.0), Some(4.0), Some(6.0), Some(8.0)]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn basic_rra_last_gauge_test() -> Result<(), Error> {
-        let rra = RRA::new(CF::Last, 60, 5);
-        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
-
-        for i in 2..10 {
-            rrd.update((i as f64) * 30.0, i as f64);
-        }
-
-        assert!(
-            rrd.extract_data(CF::Average, 60, Some(0), Some(5 * 60))
-                .is_err(),
-            "CF::Average should not exist"
-        );
-
-        let Entry {
-            start,
-            resolution,
-            data,
-        } = rrd.extract_data(CF::Last, 60, Some(0), Some(20 * 60))?;
-        assert_eq!(start, 0);
-        assert_eq!(resolution, 60);
-        assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn basic_rra_average_derive_test() -> Result<(), Error> {
-        let rra = RRA::new(CF::Average, 60, 5);
-        let mut rrd = RRD::new(DST::Derive, vec![rra]);
-
-        for i in 2..10 {
-            rrd.update((i as f64) * 30.0, (i * 60) as f64);
-        }
-
-        let Entry {
-            start,
-            resolution,
-            data,
-        } = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
-        assert_eq!(start, 60);
-        assert_eq!(resolution, 60);
-        assert_eq!(data, [Some(1.0), Some(2.0), Some(2.0), Some(2.0), None]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn basic_rra_average_gauge_test() -> Result<(), Error> {
-        let rra = RRA::new(CF::Average, 60, 5);
-        let mut rrd = RRD::new(DST::Gauge, vec![rra]);
-
-        for i in 2..10 {
-            rrd.update((i as f64) * 30.0, i as f64);
-        }
-
-        let Entry {
-            start,
-            resolution,
-            data,
-        } = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
-        assert_eq!(start, 60);
-        assert_eq!(resolution, 60);
-        assert_eq!(data, [Some(2.5), Some(4.5), Some(6.5), Some(8.5), None]);
-
-        for i in 10..14 {
-            rrd.update((i as f64) * 30.0, i as f64);
-        }
-
-        let Entry {
-            start,
-            resolution,
-            data,
-        } = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
-        assert_eq!(start, 60);
-        assert_eq!(resolution, 60);
-        assert_eq!(data, [None, Some(4.5), Some(6.5), Some(8.5), Some(10.5)]);
-
-        let Entry {
-            start,
-            resolution,
-            data,
-        } = rrd.extract_data(CF::Average, 60, Some(3 * 60), Some(8 * 60))?;
-        assert_eq!(start, 3 * 60);
-        assert_eq!(resolution, 60);
-        assert_eq!(data, [Some(6.5), Some(8.5), Some(10.5), Some(12.5), None]);
-
-        // add much newer value (should delete all previous/outdated value)
-        let i = 100;
-        rrd.update((i as f64) * 30.0, i as f64);
-        println!("TEST {:?}", serde_json::to_string_pretty(&rrd));
-
-        let Entry {
-            start,
-            resolution,
-            data,
-        } = rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(100 * 30 + 5 * 60))?;
-        assert_eq!(start, 100 * 30);
-        assert_eq!(resolution, 60);
-        assert_eq!(data, [Some(100.0), None, None, None, None]);
-
-        // extract with end time smaller than start time
-        let Entry {
-            start,
-            resolution,
-            data,
-        } = rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(60))?;
-        assert_eq!(start, 100 * 30);
-        assert_eq!(resolution, 60);
-        assert_eq!(data, []);
-
-        Ok(())
-    }
-}
diff --git a/proxmox-rrd/src/rrd_v1.rs b/proxmox-rrd/src/rrd_v1.rs
deleted file mode 100644 (file)
index 2f4a25f..0000000
+++ /dev/null
@@ -1,295 +0,0 @@
-use std::io::Read;
-
-use anyhow::Error;
-use bitflags::bitflags;
-
-/// The number of data entries per RRA
-pub const RRD_DATA_ENTRIES: usize = 70;
-
-/// Proxmox RRD file magic number
-// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8];
-pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186];
-
-use crate::rrd::{DataSource, CF, DST, RRA, RRD};
-
-bitflags! {
-    /// Flags to specify the data source type and consolidation function
-    pub struct RRAFlags: u64 {
-        // Data Source Types
-        const DST_GAUGE  = 1;
-        const DST_DERIVE = 2;
-        const DST_COUNTER = 4;
-        const DST_MASK   = 255; // first 8 bits
-
-        // Consolidation Functions
-        const CF_AVERAGE = 1 << 8;
-        const CF_MAX     = 2 << 8;
-        const CF_MASK    = 255 << 8;
-    }
-}
-
-/// Round Robin Archive with [RRD_DATA_ENTRIES] data slots.
-///
-/// This data structure is used inside [RRD] and directly written to the
-/// RRD files.
-#[repr(C)]
-pub struct RRAv1 {
-    /// Defined the data source type and consolidation function
-    pub flags: RRAFlags,
-    /// Resolution (seconds)
-    pub resolution: u64,
-    /// Last update time (epoch)
-    pub last_update: f64,
-    /// Count values computed inside this update interval
-    pub last_count: u64,
-    /// Stores the last value, used to compute differential value for derive/counters
-    pub counter_value: f64,
-    /// Data slots
-    pub data: [f64; RRD_DATA_ENTRIES],
-}
-
-impl RRAv1 {
-    fn extract_data(&self) -> (u64, u64, Vec<Option<f64>>) {
-        let reso = self.resolution;
-
-        let mut list = Vec::new();
-
-        let rra_end = reso * ((self.last_update as u64) / reso);
-        let rra_start = rra_end - reso * (RRD_DATA_ENTRIES as u64);
-
-        let mut t = rra_start;
-        let mut index = ((t / reso) % (RRD_DATA_ENTRIES as u64)) as usize;
-        for _ in 0..RRD_DATA_ENTRIES {
-            let value = self.data[index];
-            if value.is_nan() {
-                list.push(None);
-            } else {
-                list.push(Some(value));
-            }
-
-            t += reso;
-            index = (index + 1) % RRD_DATA_ENTRIES;
-        }
-
-        (rra_start, reso, list)
-    }
-}
-
-/// Round Robin Database file format with fixed number of [RRA]s
-#[repr(C)]
-// Note: Avoid alignment problems by using 8byte types only
-pub struct RRDv1 {
-    /// The magic number to identify the file type
-    pub magic: [u8; 8],
-    /// Hourly data (average values)
-    pub hour_avg: RRAv1,
-    /// Hourly data (maximum values)
-    pub hour_max: RRAv1,
-    /// Dayly data (average values)
-    pub day_avg: RRAv1,
-    /// Dayly data (maximum values)
-    pub day_max: RRAv1,
-    /// Weekly data (average values)
-    pub week_avg: RRAv1,
-    /// Weekly data (maximum values)
-    pub week_max: RRAv1,
-    /// Monthly data (average values)
-    pub month_avg: RRAv1,
-    /// Monthly data (maximum values)
-    pub month_max: RRAv1,
-    /// Yearly data (average values)
-    pub year_avg: RRAv1,
-    /// Yearly data (maximum values)
-    pub year_max: RRAv1,
-}
-
-impl RRDv1 {
-    pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
-        let expected_len = std::mem::size_of::<RRDv1>();
-
-        if raw.len() != expected_len {
-            let msg = format!("wrong data size ({} != {})", raw.len(), expected_len);
-            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
-        }
-
-        let mut rrd: RRDv1 = unsafe { std::mem::zeroed() };
-        unsafe {
-            let rrd_slice =
-                std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
-            raw.read_exact(rrd_slice)?;
-        }
-
-        if rrd.magic != PROXMOX_RRD_MAGIC_1_0 {
-            let msg = "wrong magic number".to_string();
-            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
-        }
-
-        Ok(rrd)
-    }
-
-    pub fn to_rrd_v2(&self) -> Result<RRD, Error> {
-        let mut rra_list = Vec::new();
-
-        // old format v1:
-        //
-        // hour      1 min,   70 points
-        // day      30 min,   70 points
-        // week      3 hours, 70 points
-        // month    12 hours, 70 points
-        // year      1 week,  70 points
-        //
-        // new default for RRD v2:
-        //
-        // day      1 min,      1440 points
-        // month   30 min,      1440 points
-        // year   365 min (6h), 1440 points
-        // decade   1 week,      570 points
-
-        // Linear extrapolation
-        fn extrapolate_data(
-            start: u64,
-            reso: u64,
-            factor: u64,
-            data: Vec<Option<f64>>,
-        ) -> (u64, u64, Vec<Option<f64>>) {
-            let mut new = Vec::new();
-
-            for i in 0..data.len() {
-                let mut next = i + 1;
-                if next >= data.len() {
-                    next = 0
-                };
-                let v = data[i];
-                let v1 = data[next];
-                match (v, v1) {
-                    (Some(v), Some(v1)) => {
-                        let diff = (v1 - v) / (factor as f64);
-                        for j in 0..factor {
-                            new.push(Some(v + diff * (j as f64)));
-                        }
-                    }
-                    (Some(v), None) => {
-                        new.push(Some(v));
-                        for _ in 0..factor - 1 {
-                            new.push(None);
-                        }
-                    }
-                    (None, Some(v1)) => {
-                        for _ in 0..factor - 1 {
-                            new.push(None);
-                        }
-                        new.push(Some(v1));
-                    }
-                    (None, None) => {
-                        for _ in 0..factor {
-                            new.push(None);
-                        }
-                    }
-                }
-            }
-
-            (start, reso / factor, new)
-        }
-
-        // Try to convert to new, higher capacity format
-
-        // compute daily average (merge old self.day_avg and self.hour_avg
-        let mut day_avg = RRA::new(CF::Average, 60, 1440);
-
-        let (start, reso, data) = self.day_avg.extract_data();
-        let (start, reso, data) = extrapolate_data(start, reso, 30, data);
-        day_avg.insert_data(start, reso, data)?;
-
-        let (start, reso, data) = self.hour_avg.extract_data();
-        day_avg.insert_data(start, reso, data)?;
-
-        // compute daily maximum (merge old self.day_max and self.hour_max
-        let mut day_max = RRA::new(CF::Maximum, 60, 1440);
-
-        let (start, reso, data) = self.day_max.extract_data();
-        let (start, reso, data) = extrapolate_data(start, reso, 30, data);
-        day_max.insert_data(start, reso, data)?;
-
-        let (start, reso, data) = self.hour_max.extract_data();
-        day_max.insert_data(start, reso, data)?;
-
-        // compute monthly average (merge old self.month_avg,
-        // self.week_avg and self.day_avg)
-        let mut month_avg = RRA::new(CF::Average, 30 * 60, 1440);
-
-        let (start, reso, data) = self.month_avg.extract_data();
-        let (start, reso, data) = extrapolate_data(start, reso, 24, data);
-        month_avg.insert_data(start, reso, data)?;
-
-        let (start, reso, data) = self.week_avg.extract_data();
-        let (start, reso, data) = extrapolate_data(start, reso, 6, data);
-        month_avg.insert_data(start, reso, data)?;
-
-        let (start, reso, data) = self.day_avg.extract_data();
-        month_avg.insert_data(start, reso, data)?;
-
-        // compute monthly maximum (merge old self.month_max,
-        // self.week_max and self.day_max)
-        let mut month_max = RRA::new(CF::Maximum, 30 * 60, 1440);
-
-        let (start, reso, data) = self.month_max.extract_data();
-        let (start, reso, data) = extrapolate_data(start, reso, 24, data);
-        month_max.insert_data(start, reso, data)?;
-
-        let (start, reso, data) = self.week_max.extract_data();
-        let (start, reso, data) = extrapolate_data(start, reso, 6, data);
-        month_max.insert_data(start, reso, data)?;
-
-        let (start, reso, data) = self.day_max.extract_data();
-        month_max.insert_data(start, reso, data)?;
-
-        // compute yearly average (merge old self.year_avg)
-        let mut year_avg = RRA::new(CF::Average, 6 * 3600, 1440);
-
-        let (start, reso, data) = self.year_avg.extract_data();
-        let (start, reso, data) = extrapolate_data(start, reso, 28, data);
-        year_avg.insert_data(start, reso, data)?;
-
-        // compute yearly maximum (merge old self.year_avg)
-        let mut year_max = RRA::new(CF::Maximum, 6 * 3600, 1440);
-
-        let (start, reso, data) = self.year_max.extract_data();
-        let (start, reso, data) = extrapolate_data(start, reso, 28, data);
-        year_max.insert_data(start, reso, data)?;
-
-        // compute decade average (merge old self.year_avg)
-        let mut decade_avg = RRA::new(CF::Average, 7 * 86400, 570);
-        let (start, reso, data) = self.year_avg.extract_data();
-        decade_avg.insert_data(start, reso, data)?;
-
-        // compute decade maximum (merge old self.year_max)
-        let mut decade_max = RRA::new(CF::Maximum, 7 * 86400, 570);
-        let (start, reso, data) = self.year_max.extract_data();
-        decade_max.insert_data(start, reso, data)?;
-
-        rra_list.push(day_avg);
-        rra_list.push(day_max);
-        rra_list.push(month_avg);
-        rra_list.push(month_max);
-        rra_list.push(year_avg);
-        rra_list.push(year_max);
-        rra_list.push(decade_avg);
-        rra_list.push(decade_max);
-
-        // use values from hour_avg for source (all RRAv1 must have the same config)
-        let dst = if self.hour_avg.flags.contains(RRAFlags::DST_COUNTER) {
-            DST::Counter
-        } else if self.hour_avg.flags.contains(RRAFlags::DST_DERIVE) {
-            DST::Derive
-        } else {
-            DST::Gauge
-        };
-
-        let source = DataSource {
-            dst,
-            last_value: f64::NAN,
-            last_update: self.hour_avg.last_update, // IMPORTANT!
-        };
-        Ok(RRD { source, rra_list })
-    }
-}
diff --git a/proxmox-rrd/tests/file_format_test.rs b/proxmox-rrd/tests/file_format_test.rs
deleted file mode 100644 (file)
index 372a407..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-use std::path::Path;
-use std::process::Command;
-
-use anyhow::{bail, Error};
-
-use proxmox_rrd::rrd::RRD;
-use proxmox_sys::fs::CreateOptions;
-
-fn compare_file(fn1: &str, fn2: &str) -> Result<(), Error> {
-    let status = Command::new("/usr/bin/cmp")
-        .arg(fn1)
-        .arg(fn2)
-        .status()
-        .expect("failed to execute process");
-
-    if !status.success() {
-        bail!("file compare failed");
-    }
-
-    Ok(())
-}
-
-const RRD_V1_FN: &str = "./tests/testdata/cpu.rrd_v1";
-const RRD_V2_FN: &str = "./tests/testdata/cpu.rrd_v2";
-
-// make sure we can load and convert RRD v1
-#[test]
-fn upgrade_from_rrd_v1() -> Result<(), Error> {
-    let rrd = RRD::load(Path::new(RRD_V1_FN), true)?;
-
-    const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.upgraded";
-    let new_path = Path::new(RRD_V2_NEW_FN);
-    rrd.save(new_path, CreateOptions::new(), true)?;
-
-    let result = compare_file(RRD_V2_FN, RRD_V2_NEW_FN);
-    let _ = std::fs::remove_file(RRD_V2_NEW_FN);
-    result?;
-
-    Ok(())
-}
-
-// make sure we can load and save RRD v2
-#[test]
-fn load_and_save_rrd_v2() -> Result<(), Error> {
-    let rrd = RRD::load(Path::new(RRD_V2_FN), true)?;
-
-    const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.saved";
-    let new_path = Path::new(RRD_V2_NEW_FN);
-    rrd.save(new_path, CreateOptions::new(), true)?;
-
-    let result = compare_file(RRD_V2_FN, RRD_V2_NEW_FN);
-    let _ = std::fs::remove_file(RRD_V2_NEW_FN);
-    result?;
-
-    Ok(())
-}
diff --git a/proxmox-rrd/tests/testdata/cpu.rrd_v1 b/proxmox-rrd/tests/testdata/cpu.rrd_v1
deleted file mode 100644 (file)
index 99d43d3..0000000
Binary files a/proxmox-rrd/tests/testdata/cpu.rrd_v1 and /dev/null differ
diff --git a/proxmox-rrd/tests/testdata/cpu.rrd_v2 b/proxmox-rrd/tests/testdata/cpu.rrd_v2
deleted file mode 100644 (file)
index 5e4dfc7..0000000
Binary files a/proxmox-rrd/tests/testdata/cpu.rrd_v2 and /dev/null differ