2 use std
::path
::{Path, PathBuf}
;
3 use std
::collections
::HashMap
;
6 use std
::io
::{BufRead, BufReader}
;
7 use std
::os
::unix
::io
::AsRawFd
;
9 use anyhow
::{format_err, bail, Error}
;
10 use nix
::fcntl
::OFlag
;
12 use proxmox
::tools
::fs
::{atomic_open_or_create_file, create_path, CreateOptions}
;
14 use proxmox_rrd_api_types
::{RRDMode, RRDTimeFrameResolution}
;
16 use crate::{DST, rrd::RRD}
;
/// File name of the journal file. It is pushed onto the cache base directory
/// to build the journal path, both when the journal is opened for appending
/// and when it is re-opened for replay.
18 const RRD_JOURNAL_NAME
: &str = "rrd.journal";
20 /// RRD cache - keep RRD data in RAM, but write updates to disk
22 /// This cache is designed to run as a single instance (no concurrent
23 /// access from other processes).
27 file_options
: CreateOptions
,
28 dir_options
: CreateOptions
,
29 state
: RwLock
<RRDCacheState
>,
32 // shared state behind RwLock
33 struct RRDCacheState
{
// Loaded RRDs, keyed by the path string relative to the cache base directory
// (the same `rel_path` value that is written into journal entries).
34 rrd_map
: HashMap
<String
, RRD
>,
// Value of `proxmox_time::epoch_f64()` taken when the journal was last
// applied; initialised to 0.0 when the cache is created.
36 last_journal_flush
: f64,
48 /// Creates a new instance
///
/// Makes sure the base directory exists and opens (or creates) the journal
/// file `RRD_JOURNAL_NAME` inside it for appending. The cache starts with an
/// empty RRD map.
///
/// `file_options` and `dir_options` fall back to `CreateOptions::new()` when
/// they are `None`.
///
/// # Errors
///
/// Returns an error if the base directory cannot be created or the journal
/// file cannot be opened.
49 pub fn new
<P
: AsRef
<Path
>>(
51 file_options
: Option
<CreateOptions
>,
52 dir_options
: Option
<CreateOptions
>,
54 ) -> Result
<Self, Error
> {
55 let basedir
= basedir
.as_ref().to_owned();
// fall back to default create options when the caller passed None
57 let file_options
= file_options
.unwrap_or_else(|| CreateOptions
::new());
58 let dir_options
= dir_options
.unwrap_or_else(|| CreateOptions
::new());
// the same directory options are passed for both option arguments of create_path
60 create_path(&basedir
, Some(dir_options
.clone()), Some(dir_options
.clone()))
61 .map_err(|err
: Error
| format_err
!("unable to create rrdb stat dir - {}", err
))?
;
// journal path is <basedir>/<RRD_JOURNAL_NAME>
63 let mut journal_path
= basedir
.clone();
64 journal_path
.push(RRD_JOURNAL_NAME
);
// write-only, append-mode handle with close-on-exec set
66 let flags
= OFlag
::O_CLOEXEC
|OFlag
::O_WRONLY
|OFlag
::O_APPEND
;
67 let journal
= atomic_open_or_create_file(&journal_path
, flags
, &[], file_options
.clone())?
;
// initial state: no RRDs loaded yet, last flush time 0.0
69 let state
= RRDCacheState
{
71 rrd_map
: HashMap
::new(),
72 last_journal_flush
: 0.0,
80 state
: RwLock
::new(state
),
/// Parses one journal line into a `JournalEntry`.
///
/// The line is trimmed and split into at most four components, in the order
/// time, value, data source type and relative path (the order in which
/// journal entries are formatted when they are appended). Time and value are
/// parsed as `f64`, the data source type as `u8`; the fourth component is
/// taken verbatim as the relative path.
///
/// # Errors
///
/// Returns an error if one of the three numeric components fails to parse or
/// if the data source type code is not matched by any arm of the `match`.
84 fn parse_journal_line(line
: &str) -> Result
<JournalEntry
, Error
> {
86 let line
= line
.trim();
// splitn(4, ..) stops after four pieces, so any further separators stay
// inside the last piece (the relative path)
88 let parts
: Vec
<&str> = line
.splitn(4, '
:'
).collect();
// NOTE(review): the message below misspells number as numper; it is a
// runtime string and is left unchanged here
90 bail
!("wrong numper of components");
93 let time
: f64 = parts
[0].parse()
94 .map_err(|_
| format_err
!("unable to parse time"))?
;
95 let value
: f64 = parts
[1].parse()
96 .map_err(|_
| format_err
!("unable to parse value"))?
;
97 let dst
: u8 = parts
[2].parse()
98 .map_err(|_
| format_err
!("unable to parse data source type"))?
;
// translate the numeric code; codes without a dedicated arm are rejected
100 let dst
= match dst
{
103 _
=> bail
!("got strange value for data source type '{}'", dst
),
106 let rel_path
= parts
[3].to_string();
108 Ok(JournalEntry { time, value, dst, rel_path }
)
/// Appends one entry to the journal handle stored in `state`.
///
/// The entry is a single newline-terminated text line with the components
/// time, value, data source type (written as its `u8` value) and relative
/// path, joined by `:`.
///
/// # Errors
///
/// Propagates the error if writing to the journal fails.
111 fn append_journal_entry(
112 state
: &mut RRDCacheState
,
117 ) -> Result
<(), Error
> {
118 let journal_entry
= format
!("{}:{}:{}:{}\n", time
, value
, dst
as u8, rel_path
);
// write_all either writes the whole line or returns an error
119 state
.journal
.write_all(journal_entry
.as_bytes())?
;
/// Applies the journal while holding the write lock on the shared state.
///
/// Acquires the lock and delegates to `apply_journal_locked`, returning its
/// result.
///
/// # Panics
///
/// Panics if acquiring the write lock returns an error (the result is
/// unwrapped).
123 pub fn apply_journal(&self) -> Result
<(), Error
> {
124 let mut state
= self.state
.write().unwrap(); // block writers
125 self.apply_journal_locked(&mut state
)
/// Replays the journal into the cached RRDs, saves the cached RRDs to disk
/// and commits (truncates) the journal.
///
/// The caller passes the already locked state as `state`.
///
/// Steps visible here:
/// 1. Record the current time as `last_journal_flush`.
/// 2. Re-open the journal file read-only and read it line by line; lines
///    that fail to parse are logged and skipped.
/// 3. For each entry, update the matching RRD (loading it from disk and
///    inserting it into the map if it is not cached yet) when the entry time
///    is greater than the remembered last update of that RRD.
/// 4. Save every RRD in the map; save failures are logged.
/// 5. Truncate the journal to length 0, or log that the journal could not be
///    committed.
///
/// # Errors
///
/// Returns an error if the journal cannot be opened or read, if a parent
/// directory cannot be created, or if truncating the journal fails.
///
/// # Panics
///
/// Panics if an RRD path has no parent (`path.parent().unwrap()`).
128 fn apply_journal_locked(&self, state
: &mut RRDCacheState
) -> Result
<(), Error
> {
130 log
::info
!("applying rrd journal");
// the flush time is recorded before the replay starts
132 state
.last_journal_flush
= proxmox_time
::epoch_f64();
134 let mut journal_path
= self.basedir
.clone();
135 journal_path
.push(RRD_JOURNAL_NAME
);
// separate read-only handle for the replay; state.journal is not used for reading
137 let flags
= OFlag
::O_CLOEXEC
|OFlag
::O_RDONLY
;
138 let journal
= atomic_open_or_create_file(&journal_path
, flags
, &[], self.file_options
.clone())?
;
139 let mut journal
= BufReader
::new(journal
);
// remembers, per rel_path, the value rrd.last_update() had at the first
// lookup during this replay; later lookups for the same path hit the map
// NOTE(review): presumably so that entries are compared against the state
// from before this replay rather than against updates applied by it - confirm
141 let mut last_update_map
= HashMap
::new();
143 let mut get_last_update
= |rel_path
: &str, rrd
: &RRD
| {
144 if let Some(time
) = last_update_map
.get(rel_path
) {
147 let last_update
= rrd
.last_update();
148 last_update_map
.insert(rel_path
.to_string(), last_update
);
// a fresh buffer is used for each line that is read
155 let mut line
= String
::new();
156 let len
= journal
.read_line(&mut line
)?
;
// read_line returning 0 bytes means end of file
157 if len
== 0 { break; }
159 let entry
= match Self::parse_journal_line(&line
) {
162 log
::warn
!("unable to parse rrd journal line {} (skip) - {}", linenr
, err
);
163 continue; // skip unparsable lines
// RRD already cached: apply the entry only if it is newer than the remembered last update
167 if let Some(rrd
) = state
.rrd_map
.get_mut(&entry
.rel_path
) {
168 if entry
.time
> get_last_update(&entry
.rel_path
, &rrd
) {
169 rrd
.update(entry
.time
, entry
.value
);
// RRD not cached yet: its file lives at <basedir>/<rel_path>
172 let mut path
= self.basedir
.clone();
173 path
.push(&entry
.rel_path
);
// make sure the parent directory exists before loading or saving
174 create_path(path
.parent().unwrap(), Some(self.dir_options
.clone()), Some(self.dir_options
.clone()))?
;
176 let mut rrd
= match RRD
::load(&path
) {
// a missing file is not worth a warning; any other load error is logged
179 if err
.kind() != std
::io
::ErrorKind
::NotFound
{
180 log
::warn
!("overwriting RRD file {:?}, because of load error: {}", path
, err
);
185 if entry
.time
> get_last_update(&entry
.rel_path
, &rrd
) {
186 rrd
.update(entry
.time
, entry
.value
);
// keep the RRD in the cache for subsequent entries and lookups
188 state
.rrd_map
.insert(entry
.rel_path
.clone(), rrd
);
// write every cached RRD back to <basedir>/<rel_path>
195 for (rel_path
, rrd
) in state
.rrd_map
.iter() {
196 let mut path
= self.basedir
.clone();
197 path
.push(&rel_path
);
198 if let Err(err
) = rrd
.save(&path
, self.file_options
.clone()) {
200 log
::error
!("unable to save {:?}: {}", path
, err
);
204 // if everything went ok, commit the journal
// committing means truncating the journal to length 0 through the file
// descriptor of state.journal
207 nix
::unistd
::ftruncate(state
.journal
.as_raw_fd(), 0)
208 .map_err(|err
| format_err
!("unable to truncate journal - {}", err
))?
;
209 log
::info
!("rrd journal successfully committed");
// NOTE(review): the condition selecting between commit and this error log
// is not visible here - presumably a flag set when a save fails; confirm
211 log
::error
!("errors during rrd flush - unable to commit rrd journal");
217 /// Update data in RAM and write file back to disk (if `save` is set)
223 ) -> Result
<(), Error
> {
225 let mut state
= self.state
.write().unwrap(); // block other writers
227 let now
= proxmox_time
::epoch_f64();
229 if (now
- state
.last_journal_flush
) > self.apply_interval
{
230 if let Err(err
) = self.apply_journal_locked(&mut state
) {
231 log
::error
!("apply journal failed: {}", err
);
235 Self::append_journal_entry(&mut state
, now
, value
, dst
, rel_path
)?
;
237 if let Some(rrd
) = state
.rrd_map
.get_mut(rel_path
) {
238 rrd
.update(now
, value
);
240 let mut path
= self.basedir
.clone();
242 create_path(path
.parent().unwrap(), Some(self.dir_options
.clone()), Some(self.dir_options
.clone()))?
;
243 let mut rrd
= match RRD
::load(&path
) {
246 if err
.kind() != std
::io
::ErrorKind
::NotFound
{
247 log
::warn
!("overwriting RRD file {:?}, because of load error: {}", path
, err
);
252 rrd
.update(now
, value
);
253 state
.rrd_map
.insert(rel_path
.into(), rrd
);
259 /// Extract data from cached RRD
260 pub fn extract_cached_data(
265 timeframe
: RRDTimeFrameResolution
,
267 ) -> Option
<(u64, u64, Vec
<Option
<f64>>)> {
269 let state
= self.state
.read().unwrap();
271 match state
.rrd_map
.get(&format
!("{}/{}", base
, name
)) {
272 Some(rrd
) => Some(rrd
.extract_data(now
, timeframe
, mode
)),