//! * Arbitrary number of RRAs (dynamically changeable)
use std::path::Path;
+use std::io::{Read, Write};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use anyhow::{bail, format_err, Error};
-
use serde::{Serialize, Deserialize};
-use proxmox::tools::fs::{replace_file, CreateOptions};
+use proxmox_sys::fs::{make_tmp_file, CreateOptions};
use proxmox_schema::api;
use crate::rrd_v1;
impl DataSource {
+ /// Create a new Instance
pub fn new(dst: DST) -> Self {
Self {
dst,
impl RRA {
+ /// Creates a new instance
pub fn new(cf: CF, resolution: u64, points: usize) -> Self {
Self {
cf,
}
}
+ /// Data slot end time
pub fn slot_end_time(&self, time: u64) -> u64 {
self.resolution * (time / self.resolution + 1)
}
+ /// Data slot start time
pub fn slot_start_time(&self, time: u64) -> u64 {
self.resolution * (time / self.resolution)
}
+ /// Data slot index
pub fn slot(&self, time: u64) -> usize {
((time / self.resolution) as usize) % self.data.len()
}
- // directly overwrite data slots
- // the caller need to set last_update value on the DataSource manually.
+ /// Directly overwrite data slots.
+ ///
+ /// The caller need to set `last_update` value on the [DataSource] manually.
pub fn insert_data(
&mut self,
start: u64,
let num_entries = self.data.len() as u64;
let min_time = epoch.saturating_sub(num_entries*reso);
- let min_time = (min_time/reso + 1)*reso;
- let mut t = last_update.saturating_sub(num_entries*reso);
+ let min_time = self.slot_end_time(min_time);
+ let mut t = last_update.saturating_sub(num_entries*reso);
let mut index = self.slot(t);
for _ in 0..num_entries {
self.last_count = 0;
}
- let new_count = if self.last_count < u64::MAX {
- self.last_count + 1
- } else {
- u64::MAX // should never happen
- };
+ let new_count = self.last_count.saturating_add(1);
if self.last_count == 0 {
self.data[index] = value;
}
}
+ /// Extract data
+ ///
+ /// Extract data from `start` to `end`. The RRA itself does not
+ /// store the `last_update` time, so you need to pass this a
+ /// parameter (see [DataSource]).
pub fn extract_data(
&self,
start: u64,
impl RRD {
+ /// Creates a new Instance
pub fn new(dst: DST, rra_list: Vec<RRA>) -> RRD {
let source = DataSource::new(dst);
}
let rrd = if raw[0..8] == rrd_v1::PROXMOX_RRD_MAGIC_1_0 {
- let v1 = rrd_v1::RRDv1::from_raw(&raw)?;
+ let v1 = rrd_v1::RRDv1::from_raw(raw)?;
v1.to_rrd_v2()
.map_err(|err| format_err!("unable to convert from old V1 format - {}", err))?
} else if raw[0..8] == PROXMOX_RRD_MAGIC_2_0 {
}
/// Load data from a file
- pub fn load(path: &Path) -> Result<Self, std::io::Error> {
- let raw = std::fs::read(path)?;
+ ///
+ /// Setting `avoid_page_cache` uses
+ /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
+ /// the linux page cache.
+ pub fn load(path: &Path, avoid_page_cache: bool) -> Result<Self, std::io::Error> {
+
+ let mut file = std::fs::File::open(path)?;
+ let buffer_size = file.metadata().map(|m| m.len() as usize + 1).unwrap_or(0);
+ let mut raw = Vec::with_capacity(buffer_size);
+ file.read_to_end(&mut raw)?;
+
+ if avoid_page_cache {
+ nix::fcntl::posix_fadvise(
+ file.as_raw_fd(),
+ 0,
+ buffer_size as i64,
+ nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
+ ).map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
+ }
match Self::from_raw(&raw) {
Ok(rrd) => Ok(rrd),
}
/// Store data into a file (atomic replace file)
- pub fn save(&self, filename: &Path, options: CreateOptions) -> Result<(), Error> {
- let mut data: Vec<u8> = Vec::new();
- data.extend(&PROXMOX_RRD_MAGIC_2_0);
- serde_cbor::to_writer(&mut data, self)?;
- replace_file(filename, &data, options)
+ ///
+ /// Setting `avoid_page_cache` uses
+ /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
+ /// the linux page cache.
+ pub fn save(
+ &self,
+ path: &Path,
+ options: CreateOptions,
+ avoid_page_cache: bool,
+ ) -> Result<(), Error> {
+
+ let (fd, tmp_path) = make_tmp_file(&path, options)?;
+ let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
+
+ let mut try_block = || -> Result<(), Error> {
+ let mut data: Vec<u8> = Vec::new();
+ data.extend(&PROXMOX_RRD_MAGIC_2_0);
+ serde_cbor::to_writer(&mut data, self)?;
+ file.write_all(&data)?;
+
+ if avoid_page_cache {
+ nix::fcntl::posix_fadvise(
+ file.as_raw_fd(),
+ 0,
+ data.len() as i64,
+ nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
+ )?;
+ }
+
+ Ok(())
+ };
+
+ match try_block() {
+ Ok(()) => (),
+ error => {
+ let _ = nix::unistd::unlink(&tmp_path);
+ return error;
+ }
+ }
+
+ if let Err(err) = std::fs::rename(&tmp_path, &path) {
+ let _ = nix::unistd::unlink(&tmp_path);
+ bail!("Atomic rename failed - {}", err);
+ }
+
+ Ok(())
}
+ /// Returns the last update time.
pub fn last_update(&self) -> f64 {
self.source.last_update
}
match rra {
Some(rra) => {
let end = end.unwrap_or_else(|| proxmox_time::epoch_f64() as u64);
- let start = start.unwrap_or(end - 10*rra.resolution);
+ let start = start.unwrap_or(end.saturating_sub(10*rra.resolution));
Ok(rra.extract_data(start, end, self.source.last_update))
}
None => bail!("unable to find RRA suitable ({:?}:{})", cf, resolution),