]> git.proxmox.com Git - proxmox.git/blob - proxmox-rrd/src/rrd.rs
2fab9df33ed618d5dad1d7c20583b1765b918328
[proxmox.git] / proxmox-rrd / src / rrd.rs
1 //! # Proxmox RRD format version 2
2 //!
3 //! The new format uses
4 //! [CBOR](https://datatracker.ietf.org/doc/html/rfc8949) as storage
5 //! format. This way we can use the serde serialization framework,
6 //! which makes our code more flexible, much nicer and type safe.
7 //!
8 //! ## Features
9 //!
10 //! * Well defined data format [CBOR](https://datatracker.ietf.org/doc/html/rfc8949)
11 //! * Platform independent (big endian f64, hopefully a standard format?)
12 //! * Arbitrary number of RRAs (dynamically changeable)
13
14 use std::path::Path;
15 use std::io::{Read, Write};
16 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
17
18 use anyhow::{bail, format_err, Error};
19 use serde::{Serialize, Deserialize};
20
21 use proxmox::tools::fs::{make_tmp_file, CreateOptions};
22 use proxmox_schema::api;
23
24 use crate::rrd_v1;
25
/// Proxmox RRD v2 file magic number
///
/// These 8 bytes are written at the very start of every v2 file, directly
/// in front of the CBOR payload, and are compared against the first 8
/// bytes of a file when loading to recognise the v2 format.
// openssl::sha::sha256(b"Proxmox Round Robin Database file v2.0")[0..8];
pub const PROXMOX_RRD_MAGIC_2_0: [u8; 8] = [224, 200, 228, 27, 239, 112, 122, 159];
29
30 #[api()]
31 #[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
32 #[serde(rename_all = "kebab-case")]
33 /// RRD data source type
34 pub enum DST {
35 /// Gauge values are stored unmodified.
36 Gauge,
37 /// Stores the difference to the previous value.
38 Derive,
39 /// Stores the difference to the previous value (like Derive), but
40 /// detect counter overflow (and ignores that value)
41 Counter,
42 }
43
44 #[api()]
45 #[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
46 #[serde(rename_all = "kebab-case")]
47 /// Consolidation function
48 pub enum CF {
49 /// Average
50 Average,
51 /// Maximum
52 Maximum,
53 /// Minimum
54 Minimum,
55 /// Use the last value
56 Last,
57 }
58
59 #[derive(Serialize, Deserialize)]
60 /// Data source specification
61 pub struct DataSource {
62 /// Data source type
63 pub dst: DST,
64 /// Last update time (epoch)
65 pub last_update: f64,
66 /// Stores the last value, used to compute differential value for
67 /// derive/counters
68 pub last_value: f64,
69 }
70
71 impl DataSource {
72
73 /// Create a new Instance
74 pub fn new(dst: DST) -> Self {
75 Self {
76 dst,
77 last_update: 0.0,
78 last_value: f64::NAN,
79 }
80 }
81
82 fn compute_new_value(&mut self, time: f64, mut value: f64) -> Result<f64, Error> {
83 if time < 0.0 {
84 bail!("got negative time");
85 }
86 if time <= self.last_update {
87 bail!("time in past ({} < {})", time, self.last_update);
88 }
89
90 if value.is_nan() {
91 bail!("new value is NAN");
92 }
93
94 // derive counter value
95 let is_counter = self.dst == DST::Counter;
96
97 if is_counter || self.dst == DST::Derive {
98 let time_diff = time - self.last_update;
99
100 let diff = if self.last_value.is_nan() {
101 0.0
102 } else if is_counter && value < 0.0 {
103 bail!("got negative value for counter");
104 } else if is_counter && value < self.last_value {
105 // Note: We do not try automatic overflow corrections, but
106 // we update last_value anyways, so that we can compute the diff
107 // next time.
108 self.last_value = value;
109 bail!("conter overflow/reset detected");
110 } else {
111 value - self.last_value
112 };
113 self.last_value = value;
114 value = diff/time_diff;
115 } else {
116 self.last_value = value;
117 }
118
119 Ok(value)
120 }
121
122
123 }
124
125 #[derive(Serialize, Deserialize)]
126 /// Round Robin Archive
127 pub struct RRA {
128 /// Number of seconds spaned by a single data entry.
129 pub resolution: u64,
130 /// Consolitation function.
131 pub cf: CF,
132 /// Count values computed inside this update interval.
133 pub last_count: u64,
134 /// The actual data entries.
135 pub data: Vec<f64>,
136 }
137
138 impl RRA {
139
140 /// Creates a new instance
141 pub fn new(cf: CF, resolution: u64, points: usize) -> Self {
142 Self {
143 cf,
144 resolution,
145 last_count: 0,
146 data: vec![f64::NAN; points],
147 }
148 }
149
150 /// Data slot end time
151 pub fn slot_end_time(&self, time: u64) -> u64 {
152 self.resolution * (time / self.resolution + 1)
153 }
154
155 /// Data slot start time
156 pub fn slot_start_time(&self, time: u64) -> u64 {
157 self.resolution * (time / self.resolution)
158 }
159
160 /// Data slot index
161 pub fn slot(&self, time: u64) -> usize {
162 ((time / self.resolution) as usize) % self.data.len()
163 }
164
165 /// Directly overwrite data slots.
166 ///
167 /// The caller need to set `last_update` value on the [DataSource] manually.
168 pub fn insert_data(
169 &mut self,
170 start: u64,
171 resolution: u64,
172 data: Vec<Option<f64>>,
173 ) -> Result<(), Error> {
174 if resolution != self.resolution {
175 bail!("inser_data failed: got wrong resolution");
176 }
177
178 let mut index = self.slot(start);
179
180 for i in 0..data.len() {
181 if let Some(v) = data[i] {
182 self.data[index] = v;
183 }
184 index += 1; if index >= self.data.len() { index = 0; }
185 }
186 Ok(())
187 }
188
189 fn delete_old_slots(&mut self, time: f64, last_update: f64) {
190 let epoch = time as u64;
191 let last_update = last_update as u64;
192 let reso = self.resolution;
193 let num_entries = self.data.len() as u64;
194
195 let min_time = epoch.saturating_sub(num_entries*reso);
196 let min_time = self.slot_end_time(min_time);
197
198 let mut t = last_update.saturating_sub(num_entries*reso);
199 let mut index = self.slot(t);
200
201 for _ in 0..num_entries {
202 t += reso;
203 index += 1; if index >= self.data.len() { index = 0; }
204 if t < min_time {
205 self.data[index] = f64::NAN;
206 } else {
207 break;
208 }
209 }
210 }
211
212 fn compute_new_value(&mut self, time: f64, last_update: f64, value: f64) {
213 let epoch = time as u64;
214 let last_update = last_update as u64;
215 let reso = self.resolution;
216
217 let index = self.slot(epoch);
218 let last_index = self.slot(last_update);
219
220 if (epoch - last_update) > reso || index != last_index {
221 self.last_count = 0;
222 }
223
224 let last_value = self.data[index];
225 if last_value.is_nan() {
226 self.last_count = 0;
227 }
228
229 let new_count = self.last_count.saturating_add(1);
230
231 if self.last_count == 0 {
232 self.data[index] = value;
233 self.last_count = 1;
234 } else {
235 let new_value = match self.cf {
236 CF::Maximum => if last_value > value { last_value } else { value },
237 CF::Minimum => if last_value < value { last_value } else { value },
238 CF::Last => value,
239 CF::Average => {
240 (last_value*(self.last_count as f64))/(new_count as f64)
241 + value/(new_count as f64)
242 }
243 };
244 self.data[index] = new_value;
245 self.last_count = new_count;
246 }
247 }
248
249 /// Extract data
250 ///
251 /// Extract data from `start` to `end`. The RRA itself does not
252 /// store the `last_update` time, so you need to pass this a
253 /// parameter (see [DataSource]).
254 pub fn extract_data(
255 &self,
256 start: u64,
257 end: u64,
258 last_update: f64,
259 ) -> (u64, u64, Vec<Option<f64>>) {
260 let last_update = last_update as u64;
261 let reso = self.resolution;
262 let num_entries = self.data.len() as u64;
263
264 let mut list = Vec::new();
265
266 let rrd_end = self.slot_end_time(last_update);
267 let rrd_start = rrd_end.saturating_sub(reso*num_entries);
268
269 let mut t = start;
270 let mut index = self.slot(t);
271 for _ in 0..num_entries {
272 if t > end { break; };
273 if t < rrd_start || t >= rrd_end {
274 list.push(None);
275 } else {
276 let value = self.data[index];
277 if value.is_nan() {
278 list.push(None);
279 } else {
280 list.push(Some(value));
281 }
282 }
283 t += reso;
284 index += 1; if index >= self.data.len() { index = 0; }
285 }
286
287 (start, reso, list)
288 }
289 }
290
#[derive(Serialize, Deserialize)]
/// Round Robin Database
///
/// Combines a single data source with the list of archives that are fed
/// by it. This is the structure that gets serialized (as CBOR) when
/// storing the database into a file.
pub struct RRD {
    /// The data source definition
    pub source: DataSource,
    /// List of round robin archives
    pub rra_list: Vec<RRA>,
}
299
300 impl RRD {
301
302 /// Creates a new Instance
303 pub fn new(dst: DST, rra_list: Vec<RRA>) -> RRD {
304
305 let source = DataSource::new(dst);
306
307 RRD {
308 source,
309 rra_list,
310 }
311
312 }
313
314 fn from_raw(raw: &[u8]) -> Result<Self, Error> {
315 if raw.len() < 8 {
316 bail!("not an rrd file - file is too small ({})", raw.len());
317 }
318
319 let rrd = if raw[0..8] == rrd_v1::PROXMOX_RRD_MAGIC_1_0 {
320 let v1 = rrd_v1::RRDv1::from_raw(&raw)?;
321 v1.to_rrd_v2()
322 .map_err(|err| format_err!("unable to convert from old V1 format - {}", err))?
323 } else if raw[0..8] == PROXMOX_RRD_MAGIC_2_0 {
324 serde_cbor::from_slice(&raw[8..])
325 .map_err(|err| format_err!("unable to decode RRD file - {}", err))?
326 } else {
327 bail!("not an rrd file - unknown magic number");
328 };
329
330 if rrd.source.last_update < 0.0 {
331 bail!("rrd file has negative last_update time");
332 }
333
334 Ok(rrd)
335 }
336
337 /// Load data from a file
338 ///
339 /// Setting `avoid_page_cache` uses
340 /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
341 /// the linux page cache.
342 pub fn load(path: &Path, avoid_page_cache: bool) -> Result<Self, std::io::Error> {
343
344 let mut file = std::fs::File::open(path)?;
345 let buffer_size = file.metadata().map(|m| m.len() as usize + 1).unwrap_or(0);
346 let mut raw = Vec::with_capacity(buffer_size);
347 file.read_to_end(&mut raw)?;
348
349 if avoid_page_cache {
350 nix::fcntl::posix_fadvise(
351 file.as_raw_fd(),
352 0,
353 buffer_size as i64,
354 nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
355 ).map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
356 }
357
358 match Self::from_raw(&raw) {
359 Ok(rrd) => Ok(rrd),
360 Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string())),
361 }
362 }
363
364 /// Store data into a file (atomic replace file)
365 ///
366 /// Setting `avoid_page_cache` uses
367 /// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
368 /// the linux page cache.
369 pub fn save(
370 &self,
371 path: &Path,
372 options: CreateOptions,
373 avoid_page_cache: bool,
374 ) -> Result<(), Error> {
375
376 let (fd, tmp_path) = make_tmp_file(&path, options)?;
377 let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
378
379 let mut try_block = || -> Result<(), Error> {
380 let mut data: Vec<u8> = Vec::new();
381 data.extend(&PROXMOX_RRD_MAGIC_2_0);
382 serde_cbor::to_writer(&mut data, self)?;
383 file.write_all(&data)?;
384
385 if avoid_page_cache {
386 nix::fcntl::posix_fadvise(
387 file.as_raw_fd(),
388 0,
389 data.len() as i64,
390 nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
391 )?;
392 }
393
394 Ok(())
395 };
396
397 match try_block() {
398 Ok(()) => (),
399 error => {
400 let _ = nix::unistd::unlink(&tmp_path);
401 return error;
402 }
403 }
404
405 if let Err(err) = std::fs::rename(&tmp_path, &path) {
406 let _ = nix::unistd::unlink(&tmp_path);
407 bail!("Atomic rename failed - {}", err);
408 }
409
410 Ok(())
411 }
412
413 /// Returns the last update time.
414 pub fn last_update(&self) -> f64 {
415 self.source.last_update
416 }
417
418 /// Update the value (in memory)
419 ///
420 /// Note: This does not call [Self::save].
421 pub fn update(&mut self, time: f64, value: f64) {
422
423 let value = match self.source.compute_new_value(time, value) {
424 Ok(value) => value,
425 Err(err) => {
426 log::error!("rrd update failed: {}", err);
427 return;
428 }
429 };
430
431 let last_update = self.source.last_update;
432 self.source.last_update = time;
433
434 for rra in self.rra_list.iter_mut() {
435 rra.delete_old_slots(time, last_update);
436 rra.compute_new_value(time, last_update, value);
437 }
438 }
439
440 /// Extract data from the archive
441 ///
442 /// This selects the RRA with specified [CF] and (minimum)
443 /// resolution, and extract data from `start` to `end`.
444 ///
445 /// `start`: Start time. If not sepecified, we simply extract 10 data points.
446 /// `end`: End time. Default is to use the current time.
447 pub fn extract_data(
448 &self,
449 cf: CF,
450 resolution: u64,
451 start: Option<u64>,
452 end: Option<u64>,
453 ) -> Result<(u64, u64, Vec<Option<f64>>), Error> {
454
455 let mut rra: Option<&RRA> = None;
456 for item in self.rra_list.iter() {
457 if item.cf != cf { continue; }
458 if item.resolution > resolution { continue; }
459
460 if let Some(current) = rra {
461 if item.resolution > current.resolution {
462 rra = Some(item);
463 }
464 } else {
465 rra = Some(item);
466 }
467 }
468
469 match rra {
470 Some(rra) => {
471 let end = end.unwrap_or_else(|| proxmox_time::epoch_f64() as u64);
472 let start = start.unwrap_or(end.saturating_sub(10*rra.resolution));
473 Ok(rra.extract_data(start, end, self.source.last_update))
474 }
475 None => bail!("unable to find RRA suitable ({:?}:{})", cf, resolution),
476 }
477 }
478
479 }
480
481
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an RRD backed by a single RRA with five 60-second slots.
    fn single_rra_rrd(dst: DST, cf: CF) -> RRD {
        RRD::new(dst, vec![RRA::new(cf, 60, 5)])
    }

    /// Feeds one sample per step, 30 seconds apart (sample time is
    /// `step * 30`), using `step * factor` as sample value.
    fn feed(rrd: &mut RRD, steps: std::ops::Range<u32>, factor: u32) {
        for step in steps {
            rrd.update(f64::from(step) * 30.0, f64::from(step * factor));
        }
    }

    #[test]
    fn basic_rra_maximum_gauge_test() -> Result<(), Error> {
        let mut rrd = single_rra_rrd(DST::Gauge, CF::Maximum);
        feed(&mut rrd, 2..10, 1);

        let (start, reso, data) = rrd.extract_data(CF::Maximum, 60, Some(0), Some(5 * 60))?;
        assert_eq!(start, 0);
        assert_eq!(reso, 60);
        assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);

        Ok(())
    }

    #[test]
    fn basic_rra_minimum_gauge_test() -> Result<(), Error> {
        let mut rrd = single_rra_rrd(DST::Gauge, CF::Minimum);
        feed(&mut rrd, 2..10, 1);

        let (start, reso, data) = rrd.extract_data(CF::Minimum, 60, Some(0), Some(5 * 60))?;
        assert_eq!(start, 0);
        assert_eq!(reso, 60);
        assert_eq!(data, [None, Some(2.0), Some(4.0), Some(6.0), Some(8.0)]);

        Ok(())
    }

    #[test]
    fn basic_rra_last_gauge_test() -> Result<(), Error> {
        let mut rrd = single_rra_rrd(DST::Gauge, CF::Last);
        feed(&mut rrd, 2..10, 1);

        assert!(rrd.extract_data(CF::Average, 60, Some(0), Some(5*60)).is_err(), "CF::Average should not exist");

        // the requested range is longer than the archive, the result is
        // still limited to the number of slots
        let (start, reso, data) = rrd.extract_data(CF::Last, 60, Some(0), Some(20 * 60))?;
        assert_eq!(start, 0);
        assert_eq!(reso, 60);
        assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);

        Ok(())
    }

    #[test]
    fn basic_rra_average_derive_test() -> Result<(), Error> {
        let mut rrd = single_rra_rrd(DST::Derive, CF::Average);
        feed(&mut rrd, 2..10, 60);

        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
        assert_eq!(start, 60);
        assert_eq!(reso, 60);
        assert_eq!(data, [Some(1.0), Some(2.0), Some(2.0), Some(2.0), None]);

        Ok(())
    }

    #[test]
    fn basic_rra_average_gauge_test() -> Result<(), Error> {
        let mut rrd = single_rra_rrd(DST::Gauge, CF::Average);
        feed(&mut rrd, 2..10, 1);

        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
        assert_eq!(start, 60);
        assert_eq!(reso, 60);
        assert_eq!(data, [Some(2.5), Some(4.5), Some(6.5), Some(8.5), None]);

        feed(&mut rrd, 10..14, 1);

        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
        assert_eq!(start, 60);
        assert_eq!(reso, 60);
        assert_eq!(data, [None, Some(4.5), Some(6.5), Some(8.5), Some(10.5)]);

        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(3 * 60), Some(8 * 60))?;
        assert_eq!(start, 3 * 60);
        assert_eq!(reso, 60);
        assert_eq!(data, [Some(6.5), Some(8.5), Some(10.5), Some(12.5), None]);

        // add much newer value (should delete all previous/outdated value)
        feed(&mut rrd, 100..101, 1);
        println!("TEST {:?}", serde_json::to_string_pretty(&rrd));

        let (start, reso, data) =
            rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(100 * 30 + 5 * 60))?;
        assert_eq!(start, 100 * 30);
        assert_eq!(reso, 60);
        assert_eq!(data, [Some(100.0), None, None, None, None]);

        // extract with end time smaller than start time
        let (start, reso, data) = rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(60))?;
        assert_eq!(start, 100 * 30);
        assert_eq!(reso, 60);
        assert_eq!(data, []);

        Ok(())
    }
}