2 use std
::path
::{Path, PathBuf}
;
3 use std
::sync
::{Arc, RwLock}
;
4 use std
::io
::{BufRead, BufReader}
;
5 use std
::time
::SystemTime
;
6 use std
::thread
::spawn
;
7 use std
::os
::unix
::io
::AsRawFd
;
8 use std
::collections
::BTreeSet
;
10 use crossbeam_channel
::{bounded, TryRecvError}
;
11 use anyhow
::{format_err, bail, Error}
;
13 use proxmox_sys
::fs
::{create_path, CreateOptions}
;
15 use crate::rrd
::{DST, CF, RRD, RRA}
;
23 /// RRD cache - keep RRD data in RAM, but write updates to disk
25 /// This cache is designed to run as a single instance (no concurrent
26 /// access from other processes).
// Shared configuration (base directory, create options, apply interval);
// `new` hands the same `Arc` to `JournalState` and `RRDMap`.
28 config
: Arc
<CacheConfig
>,
// Journal state; `apply_journal` passes a clone of this `Arc` to
// `apply_and_commit_journal_thread`.
29 state
: Arc
<RwLock
<JournalState
>>,
// In-memory RRD data, addressed by relative path (`rel_path`).
30 rrd_map
: Arc
<RwLock
<RRDMap
>>,
/// Settings shared (via `Arc`) by the cache, its `JournalState` and its `RRDMap`.
33 pub(crate) struct CacheConfig
{
/// Options applied when the cache creates files.
36 file_options
: CreateOptions
,
/// Options applied when the cache creates directories (e.g. `basedir` in `new`).
37 dir_options
: CreateOptions
,
43 /// Creates a new instance
45 /// `basedir`: All files are stored relative to this path.
47 /// `file_options`: Files are created with these options.
49 /// `dir_options`: Directories are created with these options.
51 /// `apply_interval`: Commit journal after `apply_interval` seconds.
53 /// `load_rrd_cb`: The callback function is used to load RRD files,
54 /// and should return a newly generated RRD if the file does not
55 /// exist (or is unreadable). This may generate RRDs with
56 /// different configurations (dependent on `rel_path`).
///
/// Passing `None` for `file_options` / `dir_options` means `CreateOptions::new()`.
///
/// # Errors
///
/// Fails if `basedir` cannot be created or if `JournalState::new` fails.
57 pub fn new
<P
: AsRef
<Path
>>(
59 file_options
: Option
<CreateOptions
>,
60 dir_options
: Option
<CreateOptions
>,
62 load_rrd_cb
: fn(path
: &Path
, rel_path
: &str, dst
: DST
) -> RRD
,
63 ) -> Result
<Self, Error
> {
64 let basedir
= basedir
.as_ref().to_owned();
66 let file_options
= file_options
.unwrap_or_else(|| CreateOptions
::new());
67 let dir_options
= dir_options
.unwrap_or_else(|| CreateOptions
::new());
// Create the base directory via `create_path`, passing `dir_options`
// for both of its option arguments.
69 create_path(&basedir
, Some(dir_options
.clone()), Some(dir_options
.clone()))
70 .map_err(|err
: Error
| format_err
!("unable to create rrdb stat dir - {}", err
))?
;
72 let config
= Arc
::new(CacheConfig
{
74 file_options
: file_options
,
75 dir_options
: dir_options
,
// The same config `Arc` is shared with the journal state and the RRD map.
79 let state
= JournalState
::new(Arc
::clone(&config
))?
;
80 let rrd_map
= RRDMap
::new(Arc
::clone(&config
), load_rrd_cb
);
83 config
: Arc
::clone(&config
),
84 state
: Arc
::new(RwLock
::new(state
)),
85 rrd_map
: Arc
::new(RwLock
::new(rrd_map
)),
89 /// Create a new RRD as used by the proxmox backup server
91 /// It contains the following RRAs:
93 /// * cf=average,r=60,n=1440 => 1day
94 /// * cf=maximum,r=60,n=1440 => 1day
95 /// * cf=average,r=30*60,n=1440 => 1month
96 /// * cf=maximum,r=30*60,n=1440 => 1month
97 /// * cf=average,r=6*3600,n=1440 => 1year
98 /// * cf=maximum,r=6*3600,n=1440 => 1year
99 /// * cf=average,r=7*86400,n=570 => 10years
100 /// * cf=maximum,r=7*86400,n=570 => 10year
102 /// The resultion data file size is about 80KB.
103 pub fn create_proxmox_backup_default_rrd(dst
: DST
) -> RRD
{
105 let mut rra_list
= Vec
::new();
107 // 1min * 1440 => 1day
108 rra_list
.push(RRA
::new(CF
::Average
, 60, 1440));
109 rra_list
.push(RRA
::new(CF
::Maximum
, 60, 1440));
111 // 30min * 1440 => 30days = 1month
112 rra_list
.push(RRA
::new(CF
::Average
, 30*60, 1440));
113 rra_list
.push(RRA
::new(CF
::Maximum
, 30*60, 1440));
115 // 6h * 1440 => 360days = 1year
116 rra_list
.push(RRA
::new(CF
::Average
, 6*3600, 1440));
117 rra_list
.push(RRA
::new(CF
::Maximum
, 6*3600, 1440));
119 // 1week * 570 => 10years
120 rra_list
.push(RRA
::new(CF
::Average
, 7*86400, 570));
121 rra_list
.push(RRA
::new(CF
::Maximum
, 7*86400, 570));
123 RRD
::new(dst
, rra_list
)
126 /// Sync the journal data to disk (using `fdatasync` syscall)
127 pub fn sync_journal(&self) -> Result
<(), Error
> {
128 self.state
.read().unwrap().sync_journal()
131 /// Apply and commit the journal. Should be used at server startup.
///
/// If the result of a previously started apply/commit job is still
/// pending, this returns right away without starting another job.
/// Otherwise a new job is started when the journal has not been applied
/// yet, or when more than `apply_interval` seconds have passed since
/// `last_journal_flush`. The job reports its outcome through a `bounded(1)`
/// channel whose receiver is stored in `apply_thread_result` and polled on
/// the next call.
///
/// The early returns yield the `journal_applied` flag as read on entry.
132 pub fn apply_journal(&self) -> Result
<bool
, Error
> {
// Clones handed to `apply_and_commit_journal_thread` below.
133 let config
= Arc
::clone(&self.config
);
134 let state
= Arc
::clone(&self.state
);
135 let rrd_map
= Arc
::clone(&self.rrd_map
);
// Journal-state write lock: used to inspect/replace the pending job's
// result channel and to update `last_journal_flush`.
138 let mut state_guard
= self.state
.write().unwrap();
139 let journal_applied
= state_guard
.journal_applied
;
// Poll (non-blocking) the result of a previously started job, if any.
141 if let Some(ref recv
) = state_guard
.apply_thread_result
{
142 match recv
.try_recv() {
144 // finished without errors, OK
145 state_guard
.apply_thread_result
= None
;
148 // finished with errors, log them
149 log
::error
!("{}", err
);
150 state_guard
.apply_thread_result
= None
;
152 Err(TryRecvError
::Empty
) => {
// Previous job still running - do not start another one.
154 return Ok(journal_applied
);
156 Err(TryRecvError
::Disconnected
) => {
157 // crashed, start again
158 log
::error
!("apply journal thread crashed - try again");
159 state_guard
.apply_thread_result
= None
;
// A commit is due once more than `apply_interval` seconds (epoch time)
// have passed since the last flush.
164 let now
= proxmox_time
::epoch_f64();
165 let wants_commit
= (now
- state_guard
.last_journal_flush
) > self.config
.apply_interval
;
167 if journal_applied
&& !wants_commit { return Ok(journal_applied); }
169 state_guard
.last_journal_flush
= proxmox_time
::epoch_f64();
// Capacity 1 is enough: the job sends exactly one result.
171 let (sender
, receiver
) = bounded(1);
172 state_guard
.apply_thread_result
= Some(receiver
);
// NOTE(review): the code that launches the job (presumably `spawn`) is not
// visible in this excerpt. Errors are converted to `String` before being
// sent through the channel.
175 let result
= apply_and_commit_journal_thread(config
, state
, rrd_map
, journal_applied
)
176 .map_err(|err
| err
.to_string());
177 sender
.send(result
).unwrap();
184 /// Update data in RAM and write file back to disk (journal)
///
/// Calls `apply_journal` first, then appends the update to the journal
/// while holding the journal-state write lock, and then updates the
/// in-memory RRD (`RRDMap::update` with `false` as last argument) while
/// holding the RRD-map write lock.
///
/// NOTE(review): `journal_applied` is bound below, but the code using it is
/// not visible in this excerpt; presumably it guards the in-memory update -
/// confirm.
191 ) -> Result
<(), Error
> {
193 let journal_applied
= self.apply_journal()?
;
195 self.state
.write().unwrap()
196 .append_journal_entry(time
, value
, dst
, rel_path
)?
;
199 self.rrd_map
.write().unwrap().update(rel_path
, time
, value
, dst
, false)?
;
205 /// Extract data from cached RRD
207 /// `start`: Start time. If not specified, we simply extract 10 data points.
209 /// `end`: End time. Default is to use the current time.
///
/// Delegates to `RRDMap::extract_cached_data` while holding the RRD-map
/// read lock; the meaning of the returned tuple (and of `None`) is defined
/// there.
210 pub fn extract_cached_data(
218 ) -> Result
<Option
<(u64, u64, Vec
<Option
<f64>>)>, Error
> {
219 self.rrd_map
.read().unwrap()
220 .extract_cached_data(base
, name
, cf
, resolution
, start
, end
)
// Apply/commit job invoked from `RRDCache::apply_journal` (its result is
// sent back over a channel there). It rotates the journal, applies the
// journaled updates to the in-memory RRDs (`apply_journal_impl`), then
// writes the RRD files back to disk (`commit_journal_impl`), logging the
// duration of each phase.
//
// NOTE(review): the caller passes a fourth argument (`journal_applied`)
// that is not visible in this excerpt's parameter list, and the conditions
// around the phases are partly missing; presumably the apply phase is
// skipped when the journal was already applied - confirm.
225 fn apply_and_commit_journal_thread(
226 config
: Arc
<CacheConfig
>,
227 state
: Arc
<RwLock
<JournalState
>>,
228 rrd_map
: Arc
<RwLock
<RRDMap
>>,
230 ) -> Result
<(), Error
> {
233 state
.write().unwrap().rotate_journal()?
; // start new journal, keep old one
235 let start_time
= SystemTime
::now();
236 log
::debug
!("applying rrd journal");
238 match apply_journal_impl(Arc
::clone(&state
), Arc
::clone(&rrd_map
)) {
// NOTE(review): `SystemTime::elapsed` returns `Err` if the system clock
// moved backwards, so this `unwrap()` (and the one in the commit phase)
// can panic the job; a monotonic `std::time::Instant` would avoid that.
240 let elapsed
= start_time
.elapsed().unwrap().as_secs_f64();
241 log
::info
!("applied rrd journal ({} entries in {:.3} seconds)", entries
, elapsed
);
243 Err(err
) => bail
!("apply rrd journal failed - {}", err
),
247 let start_time
= SystemTime
::now();
248 log
::debug
!("commit rrd journal");
250 match commit_journal_impl(config
, state
, rrd_map
) {
251 Ok(rrd_file_count
) => {
252 let elapsed
= start_time
.elapsed().unwrap().as_secs_f64();
253 log
::info
!("rrd journal successfully committed ({} files in {:.3} seconds)",
254 rrd_file_count
, elapsed
);
256 Err(err
) => bail
!("rrd journal commit failed: {}", err
),
// Replay journal lines from `reader` into `rrd_map` until EOF.
//
// Each line is parsed as a `JournalEntry`; lines that fail to parse are
// logged (with `journal_name` and the line number) and skipped. Parsed
// entries are applied with `RRDMap::update(..., true)`.
//
// `lock_read_line`: when true, each `read_line` runs while holding the
// `state` read lock. Journal appends (see `RRDCache::update`) hold the
// `state` write lock, so a line can never be observed half-written.
// NOTE(review): must be false when the caller already holds the `state`
// lock itself - std's `RwLock` is not reentrant.
//
// Returns a `usize` count (presumably lines read/applied; the counting
// code is not visible in this excerpt).
261 fn apply_journal_lines(
262 state
: Arc
<RwLock
<JournalState
>>,
263 rrd_map
: Arc
<RwLock
<RRDMap
>>,
264 journal_name
: &str, // used for logging
265 reader
: &mut BufReader
<File
>,
266 lock_read_line
: bool
,
267 ) -> Result
<usize, Error
> {
273 let mut line
= String
::new();
274 let len
= if lock_read_line
{
275 let _lock
= state
.read().unwrap(); // make sure we read entire lines
276 reader
.read_line(&mut line
)?
278 reader
.read_line(&mut line
)?
// `read_line` returns 0 only at EOF.
281 if len
== 0 { break; }
283 let entry
: JournalEntry
= match line
.parse() {
287 "unable to parse rrd journal '{}' line {} (skip) - {}",
288 journal_name
, linenr
, err
,
290 continue; // skip unparsable lines
294 rrd_map
.write().unwrap().update(&entry
.rel_path
, entry
.time
, entry
.value
, entry
.dst
, true)?
;
// Replay all journal data into `rrd_map` and mark the journal as applied.
//
// Order of application:
// 1. old (rotated) journal files returned by `list_old_journals`,
// 2. the current journal, without holding the `state` write lock,
// 3. the remaining lines while holding the `state` write lock, so no new
//    entries can be appended concurrently; then the journal is rotated and
//    `journal_applied` is set.
//
// NOTE(review): several call arguments, the `lines` counter declaration and
// the final return are not visible in this excerpt.
299 fn apply_journal_impl(
300 state
: Arc
<RwLock
<JournalState
>>,
301 rrd_map
: Arc
<RwLock
<RRDMap
>>,
302 ) -> Result
<usize, Error
> {
306 // Apply old journals first
307 let journal_list
= state
.read().unwrap().list_old_journals()?
;
309 for entry
in journal_list
{
310 log
::info
!("apply old journal log {}", entry
.name
);
311 let file
= std
::fs
::OpenOptions
::new().read(true).open(&entry
.path
)?
;
312 let mut reader
= BufReader
::new(file
);
313 lines
+= apply_journal_lines(
315 Arc
::clone(&rrd_map
),
// Then the current journal (new entries may still be appended meanwhile).
322 let mut journal
= state
.read().unwrap().open_journal_reader()?
;
324 lines
+= apply_journal_lines(
326 Arc
::clone(&rrd_map
),
333 let mut state_guard
= state
.write().unwrap(); // block other writers
// NOTE(review): `state_guard` (write lock) is held during this call, so it
// must pass `lock_read_line = false`; with `true`, `apply_journal_lines`
// would try to read-lock `state` again, which std's `RwLock` does not
// support (it may deadlock or panic). The argument is not visible here.
335 lines
+= apply_journal_lines(
337 Arc
::clone(&rrd_map
),
343 state_guard
.rotate_journal()?
; // start new journal, keep old one
345 // We need to apply the journal only once, because further updates
346 // are always directly applied.
347 state_guard
.journal_applied
= true;
354 fn fsync_file_or_dir(path
: &Path
) -> Result
<(), Error
> {
355 let file
= std
::fs
::File
::open(path
)?
;
356 nix
::unistd
::fsync(file
.as_raw_fd())?
;
360 pub(crate)fn fsync_file_and_parent(path
: &Path
) -> Result
<(), Error
> {
361 let file
= std
::fs
::File
::open(path
)?
;
362 nix
::unistd
::fsync(file
.as_raw_fd())?
;
363 if let Some(parent
) = path
.parent() {
364 fsync_file_or_dir(parent
)?
;
/// Directory that holds the RRD file `rel_path`: `basedir` joined with the
/// parent component(s) of `rel_path` (just `basedir` if there is no parent).
fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
    match Path::new(rel_path).parent() {
        Some(parent) => basedir.join(parent),
        None => basedir.to_path_buf(),
    }
}
// Write all RRDs in `rrd_map` back to disk, make them durable, and then
// remove the old journal files.
//
// Steps: flush every RRD file (remembering each parent directory), bail
// out on flush errors, fsync every RRD file, fsync every collected parent
// directory, and finally call `JournalState::remove_old_journals`.
//
// NOTE(review): the `rrd_file_count` increments, the flush-error check that
// guards the `bail!`, and the final return are not visible in this excerpt;
// the returned `usize` is presumably `rrd_file_count`.
378 fn commit_journal_impl(
379 config
: Arc
<CacheConfig
>,
380 state
: Arc
<RwLock
<JournalState
>>,
381 rrd_map
: Arc
<RwLock
<RRDMap
>>,
382 ) -> Result
<usize, Error
> {
// Snapshot of the RRD file list; the read lock is released right after.
384 let files
= rrd_map
.read().unwrap().file_list();
386 let mut rrd_file_count
= 0;
// Parent directories of all RRD files; a set, so each one is fsynced once.
389 let mut dir_set
= BTreeSet
::new();
391 log
::info
!("write rrd data back to disk");
393 // save all RRDs - we only need a read lock here
394 // Note: no fsync here (we do it afterwards)
395 for rel_path
in files
.iter() {
396 let parent_dir
= rrd_parent_dir(&config
.basedir
, rel_path
);
397 dir_set
.insert(parent_dir
);
399 if let Err(err
) = rrd_map
.read().unwrap().flush_rrd_file(rel_path
) {
401 log
::error
!("unable to save rrd {}: {}", rel_path
, err
);
406 bail
!("errors during rrd flush - unable to commit rrd journal");
409 // Important: We fsync files after writing all data! This increases
410 // the likelihood that files are already synced, so this is
411 // much faster (although we need to re-open the files).
413 log
::info
!("starting rrd data sync");
415 for rel_path
in files
.iter() {
416 let mut path
= config
.basedir
.clone();
417 path
.push(&rel_path
);
418 fsync_file_or_dir(&path
)
419 .map_err(|err
| format_err
!("fsync rrd file {} failed - {}", rel_path
, err
))?
;
422 // also fsync directories
423 for dir_path
in dir_set
{
424 fsync_file_or_dir(&dir_path
)
425 .map_err(|err
| format_err
!("fsync rrd dir {:?} failed - {}", dir_path
, err
))?
;
428 // if everything went ok, remove the old journal files
429 state
.write().unwrap().remove_old_journals()?
;