use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::RwLock;
use std::time::SystemTime;

use anyhow::{bail, format_err, Error};
use nix::fcntl::OFlag;

use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};

use crate::rrd::{CF, DST, RRA, RRD};
/// File name of the update journal, stored inside the RRD base directory.
const RRD_JOURNAL_NAME: &str = "rrd.journal";
19 /// RRD cache - keep RRD data in RAM, but write updates to disk
21 /// This cache is designed to run as single instance (no concurrent
22 /// access from other processes).
26 file_options
: CreateOptions
,
27 state
: RwLock
<JournalState
>,
28 rrd_map
: RwLock
<RRDMap
>,
33 file_options
: CreateOptions
,
34 dir_options
: CreateOptions
,
35 map
: HashMap
<String
, RRD
>,
36 load_rrd_cb
: fn(path
: &Path
, rel_path
: &str, dst
: DST
) -> RRD
,
48 ) -> Result
<(), Error
> {
49 if let Some(rrd
) = self.map
.get_mut(rel_path
) {
50 if !new_only
|| time
> rrd
.last_update() {
51 rrd
.update(time
, value
);
54 let mut path
= self.basedir
.clone();
56 create_path(path
.parent().unwrap(), Some(self.dir_options
.clone()), Some(self.dir_options
.clone()))?
;
58 let mut rrd
= (self.load_rrd_cb
)(&path
, rel_path
, dst
);
60 if !new_only
|| time
> rrd
.last_update() {
61 rrd
.update(time
, value
);
63 self.map
.insert(rel_path
.to_string(), rrd
);
68 fn flush_rrd_files(&self) -> Result
<usize, Error
> {
69 let mut rrd_file_count
= 0;
72 for (rel_path
, rrd
) in self.map
.iter() {
75 let mut path
= self.basedir
.clone();
78 if let Err(err
) = rrd
.save(&path
, self.file_options
.clone()) {
80 log
::error
!("unable to save {:?}: {}", path
, err
);
85 bail
!("errors during rrd flush - unable to commit rrd journal");
91 fn extract_cached_data(
99 ) -> Result
<Option
<(u64, u64, Vec
<Option
<f64>>)>, Error
> {
100 match self.map
.get(&format
!("{}/{}", base
, name
)) {
101 Some(rrd
) => Ok(Some(rrd
.extract_data(cf
, resolution
, start
, end
)?
)),
// shared state behind RwLock
struct JournalState {
    // NOTE(review): `journal` field reconstructed from usage
    // (`state.journal.write_all(..)`, `state.journal.as_raw_fd()`) - confirm.
    journal: File,
    // Epoch timestamp (seconds, f64) of the last journal commit.
    last_journal_flush: f64,
    // Set once the journal has been replayed; replay happens only once.
    journal_applied: bool,
}
114 struct JournalEntry
{
123 /// Creates a new instance
125 /// `basedir`: All files are stored relative to this path.
127 /// `file_options`: Files are created with this options.
129 /// `dir_options`: Directories are created with this options.
131 /// `apply_interval`: Commit journal after `apply_interval` seconds.
133 /// `load_rrd_cb`; The callback function is used to load RRD files,
134 /// and should return a newly generated RRD if the file does not
135 /// exists (or is unreadable). This may generate RRDs with
136 /// different configurations (dependent on `rel_path`).
137 pub fn new
<P
: AsRef
<Path
>>(
139 file_options
: Option
<CreateOptions
>,
140 dir_options
: Option
<CreateOptions
>,
142 load_rrd_cb
: fn(path
: &Path
, rel_path
: &str, dst
: DST
) -> RRD
,
143 ) -> Result
<Self, Error
> {
144 let basedir
= basedir
.as_ref().to_owned();
146 let file_options
= file_options
.unwrap_or_else(|| CreateOptions
::new());
147 let dir_options
= dir_options
.unwrap_or_else(|| CreateOptions
::new());
149 create_path(&basedir
, Some(dir_options
.clone()), Some(dir_options
.clone()))
150 .map_err(|err
: Error
| format_err
!("unable to create rrdb stat dir - {}", err
))?
;
152 let mut journal_path
= basedir
.clone();
153 journal_path
.push(RRD_JOURNAL_NAME
);
155 let flags
= OFlag
::O_CLOEXEC
|OFlag
::O_WRONLY
|OFlag
::O_APPEND
;
156 let journal
= atomic_open_or_create_file(&journal_path
, flags
, &[], file_options
.clone())?
;
158 let state
= JournalState
{
160 last_journal_flush
: 0.0,
161 journal_applied
: false,
164 let rrd_map
= RRDMap
{
165 basedir
: basedir
.clone(),
166 file_options
: file_options
.clone(),
167 dir_options
: dir_options
,
176 state
: RwLock
::new(state
),
177 rrd_map
: RwLock
::new(rrd_map
),
181 /// Create a new RRD as used by the proxmox backup server
183 /// It contains the following RRAs:
185 /// * cf=average,r=60,n=1440 => 1day
186 /// * cf=maximum,r=60,n=1440 => 1day
187 /// * cf=average,r=30*60,n=1440 => 1month
188 /// * cf=maximum,r=30*60,n=1440 => 1month
189 /// * cf=average,r=6*3600,n=1440 => 1year
190 /// * cf=maximum,r=6*3600,n=1440 => 1year
191 /// * cf=average,r=7*86400,n=570 => 10years
192 /// * cf=maximum,r=7*86400,n=570 => 10year
194 /// The resultion data file size is about 80KB.
195 pub fn create_proxmox_backup_default_rrd(dst
: DST
) -> RRD
{
197 let mut rra_list
= Vec
::new();
199 // 1min * 1440 => 1day
200 rra_list
.push(RRA
::new(CF
::Average
, 60, 1440));
201 rra_list
.push(RRA
::new(CF
::Maximum
, 60, 1440));
203 // 30min * 1440 => 30days = 1month
204 rra_list
.push(RRA
::new(CF
::Average
, 30*60, 1440));
205 rra_list
.push(RRA
::new(CF
::Maximum
, 30*60, 1440));
207 // 6h * 1440 => 360days = 1year
208 rra_list
.push(RRA
::new(CF
::Average
, 6*3600, 1440));
209 rra_list
.push(RRA
::new(CF
::Maximum
, 6*3600, 1440));
211 // 1week * 570 => 10years
212 rra_list
.push(RRA
::new(CF
::Average
, 7*86400, 570));
213 rra_list
.push(RRA
::new(CF
::Maximum
, 7*86400, 570));
215 RRD
::new(dst
, rra_list
)
218 fn parse_journal_line(line
: &str) -> Result
<JournalEntry
, Error
> {
220 let line
= line
.trim();
222 let parts
: Vec
<&str> = line
.splitn(4, '
:'
).collect();
223 if parts
.len() != 4 {
224 bail
!("wrong numper of components");
227 let time
: f64 = parts
[0].parse()
228 .map_err(|_
| format_err
!("unable to parse time"))?
;
229 let value
: f64 = parts
[1].parse()
230 .map_err(|_
| format_err
!("unable to parse value"))?
;
231 let dst
: u8 = parts
[2].parse()
232 .map_err(|_
| format_err
!("unable to parse data source type"))?
;
234 let dst
= match dst
{
237 _
=> bail
!("got strange value for data source type '{}'", dst
),
240 let rel_path
= parts
[3].to_string();
242 Ok(JournalEntry { time, value, dst, rel_path }
)
245 fn append_journal_entry(
246 state
: &mut JournalState
,
251 ) -> Result
<(), Error
> {
252 let journal_entry
= format
!("{}:{}:{}:{}\n", time
, value
, dst
as u8, rel_path
);
253 state
.journal
.write_all(journal_entry
.as_bytes())?
;
257 /// Apply and commit the journal. Should be used at server startup.
258 pub fn apply_journal(&self) -> Result
<(), Error
> {
259 let mut state
= self.state
.write().unwrap(); // block writers
260 self.apply_and_commit_journal_locked(&mut state
)
263 fn apply_and_commit_journal_locked(&self, state
: &mut JournalState
) -> Result
<(), Error
> {
265 state
.last_journal_flush
= proxmox_time
::epoch_f64();
267 if !state
.journal_applied
{
268 let start_time
= SystemTime
::now();
269 log
::debug
!("applying rrd journal");
271 match self.apply_journal_locked(state
) {
273 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
274 log
::info
!("applied rrd journal ({} entries in {:.3} seconds)", entries
, elapsed
);
276 Err(err
) => bail
!("apply rrd journal failed - {}", err
),
280 let start_time
= SystemTime
::now();
281 log
::debug
!("commit rrd journal");
283 match self.commit_journal_locked(state
) {
284 Ok(rrd_file_count
) => {
285 let elapsed
= start_time
.elapsed()?
.as_secs_f64();
286 log
::info
!("rrd journal successfully committed ({} files in {:.3} seconds)",
287 rrd_file_count
, elapsed
);
289 Err(err
) => bail
!("rrd journal commit failed: {}", err
),
295 fn apply_journal_locked(&self, state
: &mut JournalState
) -> Result
<usize, Error
> {
297 let mut journal_path
= self.basedir
.clone();
298 journal_path
.push(RRD_JOURNAL_NAME
);
300 let flags
= OFlag
::O_CLOEXEC
|OFlag
::O_RDONLY
;
301 let journal
= atomic_open_or_create_file(&journal_path
, flags
, &[], self.file_options
.clone())?
;
302 let mut journal
= BufReader
::new(journal
);
304 // fixme: apply blocked to avoid too many calls to self.rrd_map.write() ??
308 let mut line
= String
::new();
309 let len
= journal
.read_line(&mut line
)?
;
310 if len
== 0 { break; }
312 let entry
= match Self::parse_journal_line(&line
) {
315 log
::warn
!("unable to parse rrd journal line {} (skip) - {}", linenr
, err
);
316 continue; // skip unparsable lines
320 self.rrd_map
.write().unwrap().update(&entry
.rel_path
, entry
.time
, entry
.value
, entry
.dst
, true)?
;
323 // We need to apply the journal only once, because further updates
324 // are always directly applied.
325 state
.journal_applied
= true;
330 fn commit_journal_locked(&self, state
: &mut JournalState
) -> Result
<usize, Error
> {
332 // save all RRDs - we only need a read lock here
333 let rrd_file_count
= self.rrd_map
.read().unwrap().flush_rrd_files()?
;
335 // if everything went ok, commit the journal
337 nix
::unistd
::ftruncate(state
.journal
.as_raw_fd(), 0)
338 .map_err(|err
| format_err
!("unable to truncate journal - {}", err
))?
;
343 /// Update data in RAM and write file back to disk (journal)
350 ) -> Result
<(), Error
> {
352 let mut state
= self.state
.write().unwrap(); // block other writers
354 if !state
.journal_applied
|| (time
- state
.last_journal_flush
) > self.apply_interval
{
355 self.apply_and_commit_journal_locked(&mut state
)?
;
358 Self::append_journal_entry(&mut state
, time
, value
, dst
, rel_path
)?
;
360 self.rrd_map
.write().unwrap().update(rel_path
, time
, value
, dst
, false)?
;
365 /// Extract data from cached RRD
367 /// `start`: Start time. If not sepecified, we simply extract 10 data points.
369 /// `end`: End time. Default is to use the current time.
370 pub fn extract_cached_data(
378 ) -> Result
<Option
<(u64, u64, Vec
<Option
<f64>>)>, Error
> {
379 self.rrd_map
.read().unwrap()
380 .extract_cached_data(base
, name
, cf
, resolution
, start
, end
)