use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::RwLock;

use anyhow::{bail, format_err, Error};
use nix::fcntl::OFlag;

use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};

use crate::rrd::{CF, DST, RRA, RRD};
/// File name of the update journal inside the RRD base directory.
const RRD_JOURNAL_NAME: &str = "rrd.journal";
18 /// RRD cache - keep RRD data in RAM, but write updates to disk
20 /// This cache is designed to run as single instance (no concurrent
21 /// access from other processes).
25 file_options
: CreateOptions
,
26 dir_options
: CreateOptions
,
27 state
: RwLock
<RRDCacheState
>,
30 // shared state behind RwLock
31 struct RRDCacheState
{
32 rrd_map
: HashMap
<String
, RRD
>,
34 last_journal_flush
: f64,
46 /// Creates a new instance
47 pub fn new
<P
: AsRef
<Path
>>(
49 file_options
: Option
<CreateOptions
>,
50 dir_options
: Option
<CreateOptions
>,
52 ) -> Result
<Self, Error
> {
53 let basedir
= basedir
.as_ref().to_owned();
55 let file_options
= file_options
.unwrap_or_else(|| CreateOptions
::new());
56 let dir_options
= dir_options
.unwrap_or_else(|| CreateOptions
::new());
58 create_path(&basedir
, Some(dir_options
.clone()), Some(dir_options
.clone()))
59 .map_err(|err
: Error
| format_err
!("unable to create rrdb stat dir - {}", err
))?
;
61 let mut journal_path
= basedir
.clone();
62 journal_path
.push(RRD_JOURNAL_NAME
);
64 let flags
= OFlag
::O_CLOEXEC
|OFlag
::O_WRONLY
|OFlag
::O_APPEND
;
65 let journal
= atomic_open_or_create_file(&journal_path
, flags
, &[], file_options
.clone())?
;
67 let state
= RRDCacheState
{
69 rrd_map
: HashMap
::new(),
70 last_journal_flush
: 0.0,
78 state
: RwLock
::new(state
),
82 fn create_default_rrd(dst
: DST
) -> RRD
{
84 let mut rra_list
= Vec
::new();
86 // 1min * 1440 => 1day
87 rra_list
.push(RRA
::new(CF
::Average
, 60, 1440));
88 rra_list
.push(RRA
::new(CF
::Maximum
, 60, 1440));
90 // 30min * 1440 => 30days = 1month
91 rra_list
.push(RRA
::new(CF
::Average
, 30*60, 1440));
92 rra_list
.push(RRA
::new(CF
::Maximum
, 30*60, 1440));
94 // 6h * 1440 => 360days = 1year
95 rra_list
.push(RRA
::new(CF
::Average
, 6*3600, 1440));
96 rra_list
.push(RRA
::new(CF
::Maximum
, 6*3600, 1440));
98 // 1week * 570 => 10years
99 rra_list
.push(RRA
::new(CF
::Average
, 7*86400, 570));
100 rra_list
.push(RRA
::new(CF
::Maximum
, 7*86400, 570));
102 RRD
::new(dst
, rra_list
)
105 fn parse_journal_line(line
: &str) -> Result
<JournalEntry
, Error
> {
107 let line
= line
.trim();
109 let parts
: Vec
<&str> = line
.splitn(4, '
:'
).collect();
110 if parts
.len() != 4 {
111 bail
!("wrong numper of components");
114 let time
: f64 = parts
[0].parse()
115 .map_err(|_
| format_err
!("unable to parse time"))?
;
116 let value
: f64 = parts
[1].parse()
117 .map_err(|_
| format_err
!("unable to parse value"))?
;
118 let dst
: u8 = parts
[2].parse()
119 .map_err(|_
| format_err
!("unable to parse data source type"))?
;
121 let dst
= match dst
{
124 _
=> bail
!("got strange value for data source type '{}'", dst
),
127 let rel_path
= parts
[3].to_string();
129 Ok(JournalEntry { time, value, dst, rel_path }
)
132 fn append_journal_entry(
133 state
: &mut RRDCacheState
,
138 ) -> Result
<(), Error
> {
139 let journal_entry
= format
!("{}:{}:{}:{}\n", time
, value
, dst
as u8, rel_path
);
140 state
.journal
.write_all(journal_entry
.as_bytes())?
;
144 pub fn apply_journal(&self) -> Result
<(), Error
> {
145 let mut state
= self.state
.write().unwrap(); // block writers
146 self.apply_journal_locked(&mut state
)
149 fn apply_journal_locked(&self, state
: &mut RRDCacheState
) -> Result
<(), Error
> {
151 log
::info
!("applying rrd journal");
153 state
.last_journal_flush
= proxmox_time
::epoch_f64();
155 let mut journal_path
= self.basedir
.clone();
156 journal_path
.push(RRD_JOURNAL_NAME
);
158 let flags
= OFlag
::O_CLOEXEC
|OFlag
::O_RDONLY
;
159 let journal
= atomic_open_or_create_file(&journal_path
, flags
, &[], self.file_options
.clone())?
;
160 let mut journal
= BufReader
::new(journal
);
162 let mut last_update_map
= HashMap
::new();
164 let mut get_last_update
= |rel_path
: &str, rrd
: &RRD
| {
165 if let Some(time
) = last_update_map
.get(rel_path
) {
168 let last_update
= rrd
.last_update();
169 last_update_map
.insert(rel_path
.to_string(), last_update
);
176 let mut line
= String
::new();
177 let len
= journal
.read_line(&mut line
)?
;
178 if len
== 0 { break; }
180 let entry
= match Self::parse_journal_line(&line
) {
183 log
::warn
!("unable to parse rrd journal line {} (skip) - {}", linenr
, err
);
184 continue; // skip unparsable lines
188 if let Some(rrd
) = state
.rrd_map
.get_mut(&entry
.rel_path
) {
189 if entry
.time
> get_last_update(&entry
.rel_path
, &rrd
) {
190 rrd
.update(entry
.time
, entry
.value
);
193 let mut path
= self.basedir
.clone();
194 path
.push(&entry
.rel_path
);
195 create_path(path
.parent().unwrap(), Some(self.dir_options
.clone()), Some(self.dir_options
.clone()))?
;
197 let mut rrd
= Self::load_rrd(&path
, entry
.dst
);
199 if entry
.time
> get_last_update(&entry
.rel_path
, &rrd
) {
200 rrd
.update(entry
.time
, entry
.value
);
202 state
.rrd_map
.insert(entry
.rel_path
.clone(), rrd
);
209 for (rel_path
, rrd
) in state
.rrd_map
.iter() {
210 let mut path
= self.basedir
.clone();
211 path
.push(&rel_path
);
212 if let Err(err
) = rrd
.save(&path
, self.file_options
.clone()) {
214 log
::error
!("unable to save {:?}: {}", path
, err
);
218 // if everything went ok, commit the journal
221 nix
::unistd
::ftruncate(state
.journal
.as_raw_fd(), 0)
222 .map_err(|err
| format_err
!("unable to truncate journal - {}", err
))?
;
223 log
::info
!("rrd journal successfully committed");
225 log
::error
!("errors during rrd flush - unable to commit rrd journal");
231 fn load_rrd(path
: &Path
, dst
: DST
) -> RRD
{
232 match RRD
::load(path
) {
235 if err
.kind() != std
::io
::ErrorKind
::NotFound
{
236 log
::warn
!("overwriting RRD file {:?}, because of load error: {}", path
, err
);
238 Self::create_default_rrd(dst
)
243 /// Update data in RAM and write file back to disk (journal)
250 ) -> Result
<(), Error
> {
252 let mut state
= self.state
.write().unwrap(); // block other writers
254 if (time
- state
.last_journal_flush
) > self.apply_interval
{
255 if let Err(err
) = self.apply_journal_locked(&mut state
) {
256 log
::error
!("apply journal failed: {}", err
);
260 Self::append_journal_entry(&mut state
, time
, value
, dst
, rel_path
)?
;
262 if let Some(rrd
) = state
.rrd_map
.get_mut(rel_path
) {
263 rrd
.update(time
, value
);
265 let mut path
= self.basedir
.clone();
267 create_path(path
.parent().unwrap(), Some(self.dir_options
.clone()), Some(self.dir_options
.clone()))?
;
269 let mut rrd
= Self::load_rrd(&path
, dst
);
271 rrd
.update(time
, value
);
272 state
.rrd_map
.insert(rel_path
.into(), rrd
);
278 /// Extract data from cached RRD
280 /// `start`: Start time. If not sepecified, we simply extract 10 data points.
281 /// `end`: End time. Default is to use the current time.
282 pub fn extract_cached_data(
290 ) -> Result
<Option
<(u64, u64, Vec
<Option
<f64>>)>, Error
> {
292 let state
= self.state
.read().unwrap();
294 match state
.rrd_map
.get(&format
!("{}/{}", base
, name
)) {
295 Some(rrd
) => Ok(Some(rrd
.extract_data(cf
, resolution
, start
, end
)?
)),