1 //! # Proxmox RRD format version 2
3 //! The new format uses
4 //! [CBOR](https://datatracker.ietf.org/doc/html/rfc8949) as storage
5 //! format. This way we can use the serde serialization framework,
6 //! which makes our code more flexible, much nicer and type safe.
10 //! * Well defined data format [CBOR](https://datatracker.ietf.org/doc/html/rfc8949)
11 //! * Platform independent (big endian f64, hopefully a standard format?)
12 //! * Arbitrary number of RRAs (dynamically changeable)
15 use std
::io
::{Read, Write}
;
16 use std
::os
::unix
::io
::{AsRawFd, FromRawFd, IntoRawFd}
;
18 use anyhow
::{bail, format_err, Error}
;
19 use serde
::{Serialize, Deserialize}
;
21 use proxmox
::tools
::fs
::{make_tmp_file, CreateOptions}
;
22 use proxmox_schema
::api
;
26 /// Proxmox RRD v2 file magic number
///
/// These 8 bytes are compared against the first 8 bytes of a raw file
/// to recognize the v2 format, and they are written in front of the
/// CBOR-encoded data when the database is stored.
27 // openssl::sha::sha256(b"Proxmox Round Robin Database file v2.0")[0..8];
28 pub const PROXMOX_RRD_MAGIC_2_0
: [u8; 8] = [224, 200, 228, 27, 239, 112, 122, 159];
31 #[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
32 #[serde(rename_all = "kebab-case")]
33 /// RRD data source type
35 /// Gauge values are stored unmodified.
37 /// Stores the difference to the previous value.
39 /// Stores the difference to the previous value (like Derive), but
40 /// detects counter overflow (and ignores that value)
45 #[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
46 #[serde(rename_all = "kebab-case")]
47 /// Consolidation function
55 /// Use the last value
59 #[derive(Serialize, Deserialize)]
60 /// Data source specification
61 pub struct DataSource
{
64 /// Last update time (epoch)
66 /// Stores the last value, used to compute differential value for
73 /// Create a new Instance
74 pub fn new(dst
: DST
) -> Self {
82 fn compute_new_value(&mut self, time
: f64, mut value
: f64) -> Result
<f64, Error
> {
84 bail
!("got negative time");
86 if time
<= self.last_update
{
87 bail
!("time in past ({} < {})", time
, self.last_update
);
91 bail
!("new value is NAN");
94 // derive counter value
95 let is_counter
= self.dst
== DST
::Counter
;
97 if is_counter
|| self.dst
== DST
::Derive
{
98 let time_diff
= time
- self.last_update
;
100 let diff
= if self.last_value
.is_nan() {
102 } else if is_counter
&& value
< 0.0 {
103 bail
!("got negative value for counter");
104 } else if is_counter
&& value
< self.last_value
{
105 // Note: We do not try automatic overflow corrections, but
106 // we update last_value anyways, so that we can compute the diff
108 self.last_value
= value
;
109 bail
!("conter overflow/reset detected");
111 value
- self.last_value
113 self.last_value
= value
;
114 value
= diff
/time_diff
;
116 self.last_value
= value
;
125 #[derive(Serialize, Deserialize)]
126 /// Round Robin Archive
128 /// Number of seconds spanned by a single data entry.
130 /// Consolidation function.
132 /// Count values computed inside this update interval.
134 /// The actual data entries.
140 /// Creates a new instance
141 pub fn new(cf
: CF
, resolution
: u64, points
: usize) -> Self {
146 data
: vec
![f64::NAN
; points
],
150 /// Data slot end time
///
/// Returns `resolution * (time / resolution + 1)` using integer
/// division, i.e. the smallest multiple of `resolution` that is
/// strictly greater than `time`.
///
/// Panics if `resolution` is zero (integer division by zero).
151 pub fn slot_end_time(&self, time
: u64) -> u64 {
152 self.resolution
* (time
/ self.resolution
+ 1)
155 /// Data slot start time
///
/// Returns `resolution * (time / resolution)` using integer division,
/// i.e. `time` rounded down to a multiple of `resolution`.
///
/// Panics if `resolution` is zero (integer division by zero).
156 pub fn slot_start_time(&self, time
: u64) -> u64 {
157 self.resolution
* (time
/ self.resolution
)
/// Index into `data` for the slot containing `time`
///
/// The slot number `time / resolution` is wrapped modulo `data.len()`,
/// so consecutive slots cycle through the entries of `data`.
///
/// Panics if `resolution` is zero or `data` is empty (division or
/// remainder by zero).
161 pub fn slot(&self, time
: u64) -> usize {
162 ((time
/ self.resolution
) as usize) % self.data
.len()
165 /// Directly overwrite data slots.
167 /// The caller needs to set the `last_update` value on the [DataSource] manually.
172 data
: Vec
<Option
<f64>>,
173 ) -> Result
<(), Error
> {
174 if resolution
!= self.resolution
{
175 bail
!("inser_data failed: got wrong resolution");
178 let mut index
= self.slot(start
);
180 for i
in 0..data
.len() {
181 if let Some(v
) = data
[i
] {
182 self.data
[index
] = v
;
184 index
+= 1; if index
>= self.data
.len() { index = 0; }
189 fn delete_old_slots(&mut self, time
: f64, last_update
: f64) {
190 let epoch
= time
as u64;
191 let last_update
= last_update
as u64;
192 let reso
= self.resolution
;
193 let num_entries
= self.data
.len() as u64;
195 let min_time
= epoch
.saturating_sub(num_entries
*reso
);
196 let min_time
= self.slot_end_time(min_time
);
198 let mut t
= last_update
.saturating_sub(num_entries
*reso
);
199 let mut index
= self.slot(t
);
201 for _
in 0..num_entries
{
203 index
+= 1; if index
>= self.data
.len() { index = 0; }
205 self.data
[index
] = f64::NAN
;
212 fn compute_new_value(&mut self, time
: f64, last_update
: f64, value
: f64) {
213 let epoch
= time
as u64;
214 let last_update
= last_update
as u64;
215 let reso
= self.resolution
;
217 let index
= self.slot(epoch
);
218 let last_index
= self.slot(last_update
);
220 if (epoch
- last_update
) > reso
|| index
!= last_index
{
224 let last_value
= self.data
[index
];
225 if last_value
.is_nan() {
229 let new_count
= self.last_count
.saturating_add(1);
231 if self.last_count
== 0 {
232 self.data
[index
] = value
;
235 let new_value
= match self.cf
{
236 CF
::Maximum
=> if last_value
> value { last_value }
else { value }
,
237 CF
::Minimum
=> if last_value
< value { last_value }
else { value }
,
240 (last_value
*(self.last_count
as f64))/(new_count
as f64)
241 + value
/(new_count
as f64)
244 self.data
[index
] = new_value
;
245 self.last_count
= new_count
;
251 /// Extract data from `start` to `end`. The RRA itself does not
252 /// store the `last_update` time, so you need to pass this as a
253 /// parameter (see [DataSource]).
259 ) -> (u64, u64, Vec
<Option
<f64>>) {
260 let last_update
= last_update
as u64;
261 let reso
= self.resolution
;
262 let num_entries
= self.data
.len() as u64;
264 let mut list
= Vec
::new();
266 let rrd_end
= self.slot_end_time(last_update
);
267 let rrd_start
= rrd_end
.saturating_sub(reso
*num_entries
);
270 let mut index
= self.slot(t
);
271 for _
in 0..num_entries
{
272 if t
> end { break; }
;
273 if t
< rrd_start
|| t
>= rrd_end
{
276 let value
= self.data
[index
];
280 list
.push(Some(value
));
284 index
+= 1; if index
>= self.data
.len() { index = 0; }
291 #[derive(Serialize, Deserialize)]
292 /// Round Robin Database
294 /// The data source definition
295 pub source
: DataSource
,
296 /// List of round robin archives
297 pub rra_list
: Vec
<RRA
>,
302 /// Creates a new Instance
303 pub fn new(dst
: DST
, rra_list
: Vec
<RRA
>) -> RRD
{
305 let source
= DataSource
::new(dst
);
314 fn from_raw(raw
: &[u8]) -> Result
<Self, Error
> {
316 bail
!("not an rrd file - file is too small ({})", raw
.len());
319 let rrd
= if raw
[0..8] == rrd_v1
::PROXMOX_RRD_MAGIC_1_0
{
320 let v1
= rrd_v1
::RRDv1
::from_raw(&raw
)?
;
322 .map_err(|err
| format_err
!("unable to convert from old V1 format - {}", err
))?
323 } else if raw
[0..8] == PROXMOX_RRD_MAGIC_2_0
{
324 serde_cbor
::from_slice(&raw
[8..])
325 .map_err(|err
| format_err
!("unable to decode RRD file - {}", err
))?
327 bail
!("not an rrd file - unknown magic number");
330 if rrd
.source
.last_update
< 0.0 {
331 bail
!("rrd file has negative last_update time");
337 /// Load data from a file
339 /// Setting `avoid_page_cache` uses
340 /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
341 /// the linux page cache.
342 pub fn load(path
: &Path
, avoid_page_cache
: bool
) -> Result
<Self, std
::io
::Error
> {
344 let mut file
= std
::fs
::File
::open(path
)?
;
345 let buffer_size
= file
.metadata().map(|m
| m
.len() as usize + 1).unwrap_or(0);
346 let mut raw
= Vec
::with_capacity(buffer_size
);
347 file
.read_to_end(&mut raw
)?
;
349 if avoid_page_cache
{
350 nix
::fcntl
::posix_fadvise(
354 nix
::fcntl
::PosixFadviseAdvice
::POSIX_FADV_DONTNEED
,
355 ).map_err(|err
| std
::io
::Error
::new(std
::io
::ErrorKind
::Other
, err
.to_string()))?
;
358 match Self::from_raw(&raw
) {
360 Err(err
) => Err(std
::io
::Error
::new(std
::io
::ErrorKind
::Other
, err
.to_string())),
364 /// Store data into a file (atomic replace file)
366 /// Setting `avoid_page_cache` uses
367 /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
368 /// the linux page cache.
372 options
: CreateOptions
,
373 avoid_page_cache
: bool
,
374 ) -> Result
<(), Error
> {
376 let (fd
, tmp_path
) = make_tmp_file(&path
, options
)?
;
377 let mut file
= unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) }
;
379 let mut try_block
= || -> Result
<(), Error
> {
380 let mut data
: Vec
<u8> = Vec
::new();
381 data
.extend(&PROXMOX_RRD_MAGIC_2_0
);
382 serde_cbor
::to_writer(&mut data
, self)?
;
383 file
.write_all(&data
)?
;
385 if avoid_page_cache
{
386 nix
::fcntl
::posix_fadvise(
390 nix
::fcntl
::PosixFadviseAdvice
::POSIX_FADV_DONTNEED
,
400 let _
= nix
::unistd
::unlink(&tmp_path
);
405 if let Err(err
) = std
::fs
::rename(&tmp_path
, &path
) {
406 let _
= nix
::unistd
::unlink(&tmp_path
);
407 bail
!("Atomic rename failed - {}", err
);
413 /// Returns the last update time.
///
/// Reads the `last_update` field of the data source (`self.source`);
/// [Self::update] stores the time of the update there.
414 pub fn last_update(&self) -> f64 {
415 self.source
.last_update
418 /// Update the value (in memory)
420 /// Note: This does not call [Self::save].
421 pub fn update(&mut self, time
: f64, value
: f64) {
423 let value
= match self.source
.compute_new_value(time
, value
) {
426 log
::error
!("rrd update failed: {}", err
);
431 let last_update
= self.source
.last_update
;
432 self.source
.last_update
= time
;
434 for rra
in self.rra_list
.iter_mut() {
435 rra
.delete_old_slots(time
, last_update
);
436 rra
.compute_new_value(time
, last_update
, value
);
440 /// Extract data from the archive
442 /// This selects the RRA with specified [CF] and (minimum)
443 /// resolution, and extract data from `start` to `end`.
445 /// `start`: Start time. If not specified, we simply extract 10 data points.
446 /// `end`: End time. Default is to use the current time.
453 ) -> Result
<(u64, u64, Vec
<Option
<f64>>), Error
> {
455 let mut rra
: Option
<&RRA
> = None
;
456 for item
in self.rra_list
.iter() {
457 if item
.cf
!= cf { continue; }
458 if item
.resolution
> resolution { continue; }
460 if let Some(current
) = rra
{
461 if item
.resolution
> current
.resolution
{
471 let end
= end
.unwrap_or_else(|| proxmox_time
::epoch_f64() as u64);
472 let start
= start
.unwrap_or(end
.saturating_sub(10*rra
.resolution
));
473 Ok(rra
.extract_data(start
, end
, self.source
.last_update
))
475 None
=> bail
!("unable to find RRA suitable ({:?}:{})", cf
, resolution
),
487 fn basic_rra_maximum_gauge_test() -> Result
<(), Error
> {
488 let rra
= RRA
::new(CF
::Maximum
, 60, 5);
489 let mut rrd
= RRD
::new(DST
::Gauge
, vec
![rra
]);
492 rrd
.update((i
as f64)*30.0, i
as f64);
495 let (start
, reso
, data
) = rrd
.extract_data(CF
::Maximum
, 60, Some(0), Some(5*60))?
;
496 assert_eq
!(start
, 0);
497 assert_eq
!(reso
, 60);
498 assert_eq
!(data
, [None
, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
504 fn basic_rra_minimum_gauge_test() -> Result
<(), Error
> {
505 let rra
= RRA
::new(CF
::Minimum
, 60, 5);
506 let mut rrd
= RRD
::new(DST
::Gauge
, vec
![rra
]);
509 rrd
.update((i
as f64)*30.0, i
as f64);
512 let (start
, reso
, data
) = rrd
.extract_data(CF
::Minimum
, 60, Some(0), Some(5*60))?
;
513 assert_eq
!(start
, 0);
514 assert_eq
!(reso
, 60);
515 assert_eq
!(data
, [None
, Some(2.0), Some(4.0), Some(6.0), Some(8.0)]);
521 fn basic_rra_last_gauge_test() -> Result
<(), Error
> {
522 let rra
= RRA
::new(CF
::Last
, 60, 5);
523 let mut rrd
= RRD
::new(DST
::Gauge
, vec
![rra
]);
526 rrd
.update((i
as f64)*30.0, i
as f64);
529 assert
!(rrd
.extract_data(CF
::Average
, 60, Some(0), Some(5*60)).is_err(), "CF::Average should not exist");
531 let (start
, reso
, data
) = rrd
.extract_data(CF
::Last
, 60, Some(0), Some(20*60))?
;
532 assert_eq
!(start
, 0);
533 assert_eq
!(reso
, 60);
534 assert_eq
!(data
, [None
, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
540 fn basic_rra_average_derive_test() -> Result
<(), Error
> {
541 let rra
= RRA
::new(CF
::Average
, 60, 5);
542 let mut rrd
= RRD
::new(DST
::Derive
, vec
![rra
]);
545 rrd
.update((i
as f64)*30.0, (i
*60) as f64);
548 let (start
, reso
, data
) = rrd
.extract_data(CF
::Average
, 60, Some(60), Some(5*60))?
;
549 assert_eq
!(start
, 60);
550 assert_eq
!(reso
, 60);
551 assert_eq
!(data
, [Some(1.0), Some(2.0), Some(2.0), Some(2.0), None
]);
557 fn basic_rra_average_gauge_test() -> Result
<(), Error
> {
558 let rra
= RRA
::new(CF
::Average
, 60, 5);
559 let mut rrd
= RRD
::new(DST
::Gauge
, vec
![rra
]);
562 rrd
.update((i
as f64)*30.0, i
as f64);
565 let (start
, reso
, data
) = rrd
.extract_data(CF
::Average
, 60, Some(60), Some(5*60))?
;
566 assert_eq
!(start
, 60);
567 assert_eq
!(reso
, 60);
568 assert_eq
!(data
, [Some(2.5), Some(4.5), Some(6.5), Some(8.5), None
]);
571 rrd
.update((i
as f64)*30.0, i
as f64);
574 let (start
, reso
, data
) = rrd
.extract_data(CF
::Average
, 60, Some(60), Some(5*60))?
;
575 assert_eq
!(start
, 60);
576 assert_eq
!(reso
, 60);
577 assert_eq
!(data
, [None
, Some(4.5), Some(6.5), Some(8.5), Some(10.5)]);
579 let (start
, reso
, data
) = rrd
.extract_data(CF
::Average
, 60, Some(3*60), Some(8*60))?
;
580 assert_eq
!(start
, 3*60);
581 assert_eq
!(reso
, 60);
582 assert_eq
!(data
, [Some(6.5), Some(8.5), Some(10.5), Some(12.5), None
]);
584 // add a much newer value (should delete all previous/outdated values)
585 let i
= 100; rrd
.update((i
as f64)*30.0, i
as f64);
586 println
!("TEST {:?}", serde_json
::to_string_pretty(&rrd
));
588 let (start
, reso
, data
) = rrd
.extract_data(CF
::Average
, 60, Some(100*30), Some(100*30 + 5*60))?
;
589 assert_eq
!(start
, 100*30);
590 assert_eq
!(reso
, 60);
591 assert_eq
!(data
, [Some(100.0), None
, None
, None
, None
]);
593 // extract with end time smaller than start time
594 let (start
, reso
, data
) = rrd
.extract_data(CF
::Average
, 60, Some(100*30), Some(60))?
;
595 assert_eq
!(start
, 100*30);
596 assert_eq
!(reso
, 60);
597 assert_eq
!(data
, []);