]> git.proxmox.com Git - proxmox.git/blame - proxmox-rrd/src/cache.rs
rrd: spell out hard to understand abbreviations in public types
[proxmox.git] / proxmox-rrd / src / cache.rs
CommitLineData
d5b9d1f4 1use std::collections::BTreeSet;
03555549 2use std::fs::File;
d5b9d1f4
TL
3use std::io::{BufRead, BufReader};
4use std::os::unix::io::AsRawFd;
8d1a9d2e 5use std::path::{Path, PathBuf};
30b4800f 6use std::sync::{Arc, RwLock};
30b4800f 7use std::thread::spawn;
d5b9d1f4 8use std::time::SystemTime;
41271202 9
d5b9d1f4 10use anyhow::{bail, format_err, Error};
9dcc64b7 11use crossbeam_channel::{bounded, TryRecvError};
8d1a9d2e 12
a092ef9c 13use proxmox_sys::fs::{create_path, CreateOptions};
8d1a9d2e 14
2f942833 15use crate::rrd::{AggregationFn, Archive, DataSourceType, Database};
56b5c289 16use crate::Entry;
8d1a9d2e 17
9dcc64b7
DM
18mod journal;
19use journal::*;
03555549 20
4393b93a
DM
21mod rrd_map;
22use rrd_map::*;
23
8d1a9d2e
DM
24/// RRD cache - keep RRD data in RAM, but write updates to disk
25///
26/// This cache is designed to run as single instance (no concurrent
27/// access from other processes).
2f942833 28pub struct Cache {
30b4800f
DM
29 config: Arc<CacheConfig>,
30 state: Arc<RwLock<JournalState>>,
31 rrd_map: Arc<RwLock<RRDMap>>,
32}
33
9dcc64b7 34pub(crate) struct CacheConfig {
03555549 35 apply_interval: f64,
8d1a9d2e
DM
36 basedir: PathBuf,
37 file_options: CreateOptions,
30b4800f 38 dir_options: CreateOptions,
2be07c22
DM
39}
40
2f942833 41impl Cache {
8d1a9d2e 42 /// Creates a new instance
8619b21e
DM
43 ///
44 /// `basedir`: All files are stored relative to this path.
45 ///
46 /// `file_options`: Files are created with this options.
47 ///
48 /// `dir_options`: Directories are created with this options.
49 ///
50 /// `apply_interval`: Commit journal after `apply_interval` seconds.
51 ///
26bd6a4f 52 /// `load_rrd_cb`; The callback function is used to load RRD files,
8619b21e
DM
53 /// and should return a newly generated RRD if the file does not
54 /// exists (or is unreadable). This may generate RRDs with
55 /// different configurations (dependent on `rel_path`).
8d1a9d2e
DM
56 pub fn new<P: AsRef<Path>>(
57 basedir: P,
58 file_options: Option<CreateOptions>,
59 dir_options: Option<CreateOptions>,
03555549 60 apply_interval: f64,
2f942833 61 load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DataSourceType) -> Database,
03555549 62 ) -> Result<Self, Error> {
8d1a9d2e 63 let basedir = basedir.as_ref().to_owned();
03555549 64
d80d195c
FG
65 let file_options = file_options.unwrap_or_else(CreateOptions::new);
66 let dir_options = dir_options.unwrap_or_else(CreateOptions::new);
03555549 67
d5b9d1f4
TL
68 create_path(
69 &basedir,
70 Some(dir_options.clone()),
71 Some(dir_options.clone()),
72 )
73 .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
03555549 74
30b4800f 75 let config = Arc::new(CacheConfig {
c8e73a22
FG
76 basedir,
77 file_options,
78 dir_options,
30b4800f
DM
79 apply_interval,
80 });
81
82 let state = JournalState::new(Arc::clone(&config))?;
83 let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
2be07c22 84
03555549 85 Ok(Self {
30b4800f
DM
86 config: Arc::clone(&config),
87 state: Arc::new(RwLock::new(state)),
88 rrd_map: Arc::new(RwLock::new(rrd_map)),
89 })
03555549
DM
90 }
91
8619b21e
DM
92 /// Create a new RRD as used by the proxmox backup server
93 ///
94 /// It contains the following RRAs:
95 ///
96 /// * cf=average,r=60,n=1440 => 1day
97 /// * cf=maximum,r=60,n=1440 => 1day
98 /// * cf=average,r=30*60,n=1440 => 1month
99 /// * cf=maximum,r=30*60,n=1440 => 1month
100 /// * cf=average,r=6*3600,n=1440 => 1year
101 /// * cf=maximum,r=6*3600,n=1440 => 1year
102 /// * cf=average,r=7*86400,n=570 => 10years
103 /// * cf=maximum,r=7*86400,n=570 => 10year
104 ///
109902fb 105 /// The resulting data file size is about 80KB.
2f942833 106 pub fn create_proxmox_backup_default_rrd(dst: DataSourceType) -> Database {
6149c171
TL
107 let rra_list = vec![
108 // 1 min * 1440 => 1 day
2f942833
LW
109 Archive::new(AggregationFn::Average, 60, 1440),
110 Archive::new(AggregationFn::Maximum, 60, 1440),
6149c171 111 // 30 min * 1440 => 30 days ~ 1 month
2f942833
LW
112 Archive::new(AggregationFn::Average, 30 * 60, 1440),
113 Archive::new(AggregationFn::Maximum, 30 * 60, 1440),
6149c171 114 // 6 h * 1440 => 360 days ~ 1 year
2f942833
LW
115 Archive::new(AggregationFn::Average, 6 * 3600, 1440),
116 Archive::new(AggregationFn::Maximum, 6 * 3600, 1440),
6149c171 117 // 1 week * 570 => 10 years
2f942833
LW
118 Archive::new(AggregationFn::Average, 7 * 86400, 570),
119 Archive::new(AggregationFn::Maximum, 7 * 86400, 570),
6149c171 120 ];
bc68dee1 121
2f942833 122 Database::new(dst, rra_list)
bc68dee1
DM
123 }
124
86b50e18 125 /// Sync the journal data to disk (using `fdatasync` syscall)
336e8f3e
DM
126 pub fn sync_journal(&self) -> Result<(), Error> {
127 self.state.read().unwrap().sync_journal()
128 }
129
3275f1ac 130 /// Apply and commit the journal. Should be used at server startup.
30b4800f 131 pub fn apply_journal(&self) -> Result<bool, Error> {
41271202 132 let config = Arc::clone(&self.config);
30b4800f
DM
133 let state = Arc::clone(&self.state);
134 let rrd_map = Arc::clone(&self.rrd_map);
8d1a9d2e 135
30b4800f
DM
136 let mut state_guard = self.state.write().unwrap();
137 let journal_applied = state_guard.journal_applied;
03555549 138
30b4800f
DM
139 if let Some(ref recv) = state_guard.apply_thread_result {
140 match recv.try_recv() {
141 Ok(Ok(())) => {
142 // finished without errors, OK
cc0bb597 143 state_guard.apply_thread_result = None;
30b4800f
DM
144 }
145 Ok(Err(err)) => {
146 // finished with errors, log them
147 log::error!("{}", err);
cc0bb597 148 state_guard.apply_thread_result = None;
30b4800f
DM
149 }
150 Err(TryRecvError::Empty) => {
151 // still running
152 return Ok(journal_applied);
153 }
154 Err(TryRecvError::Disconnected) => {
155 // crashed, start again
156 log::error!("apply journal thread crashed - try again");
cc0bb597 157 state_guard.apply_thread_result = None;
3275f1ac 158 }
3275f1ac 159 }
3275f1ac
DM
160 }
161
cc0bb597
DM
162 let now = proxmox_time::epoch_f64();
163 let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
164
d5b9d1f4
TL
165 if journal_applied && !wants_commit {
166 return Ok(journal_applied);
167 }
cc0bb597 168
30b4800f 169 state_guard.last_journal_flush = proxmox_time::epoch_f64();
03555549 170
30b4800f
DM
171 let (sender, receiver) = bounded(1);
172 state_guard.apply_thread_result = Some(receiver);
03555549 173
30b4800f 174 spawn(move || {
41271202 175 let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
30b4800f
DM
176 .map_err(|err| err.to_string());
177 sender.send(result).unwrap();
178 });
3275f1ac 179
30b4800f 180 Ok(journal_applied)
3275f1ac
DM
181 }
182
2c72c6a7 183 /// Update data in RAM and write file back to disk (journal)
8d1a9d2e
DM
184 pub fn update_value(
185 &self,
186 rel_path: &str,
a1eede69 187 time: f64,
8d1a9d2e 188 value: f64,
2f942833 189 dst: DataSourceType,
8d1a9d2e 190 ) -> Result<(), Error> {
30b4800f 191 let journal_applied = self.apply_journal()?;
03555549 192
d5b9d1f4
TL
193 self.state
194 .write()
195 .unwrap()
9dcc64b7 196 .append_journal_entry(time, value, dst, rel_path)?;
03555549 197
30b4800f 198 if journal_applied {
d5b9d1f4
TL
199 self.rrd_map
200 .write()
201 .unwrap()
202 .update(rel_path, time, value, dst, false)?;
30b4800f 203 }
8d1a9d2e
DM
204
205 Ok(())
206 }
207
208 /// Extract data from cached RRD
cf097c5a 209 ///
109902fb 210 /// `start`: Start time. If not specified, we simply extract 10 data points.
26bd6a4f 211 ///
cf097c5a 212 /// `end`: End time. Default is to use the current time.
8d1a9d2e
DM
213 pub fn extract_cached_data(
214 &self,
215 base: &str,
216 name: &str,
2f942833 217 cf: AggregationFn,
cf097c5a
DM
218 resolution: u64,
219 start: Option<u64>,
220 end: Option<u64>,
56b5c289 221 ) -> Result<Option<Entry>, Error> {
d5b9d1f4
TL
222 self.rrd_map
223 .read()
224 .unwrap()
2be07c22 225 .extract_cached_data(base, name, cf, resolution, start, end)
8d1a9d2e 226 }
8d1a9d2e 227}
30b4800f 228
30b4800f 229fn apply_and_commit_journal_thread(
41271202 230 config: Arc<CacheConfig>,
30b4800f
DM
231 state: Arc<RwLock<JournalState>>,
232 rrd_map: Arc<RwLock<RRDMap>>,
233 commit_only: bool,
234) -> Result<(), Error> {
30b4800f
DM
235 if commit_only {
236 state.write().unwrap().rotate_journal()?; // start new journal, keep old one
237 } else {
238 let start_time = SystemTime::now();
239 log::debug!("applying rrd journal");
240
241 match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
242 Ok(entries) => {
243 let elapsed = start_time.elapsed().unwrap().as_secs_f64();
d5b9d1f4
TL
244 log::info!(
245 "applied rrd journal ({} entries in {:.3} seconds)",
246 entries,
247 elapsed
248 );
30b4800f
DM
249 }
250 Err(err) => bail!("apply rrd journal failed - {}", err),
251 }
252 }
253
254 let start_time = SystemTime::now();
255 log::debug!("commit rrd journal");
256
41271202 257 match commit_journal_impl(config, state, rrd_map) {
30b4800f
DM
258 Ok(rrd_file_count) => {
259 let elapsed = start_time.elapsed().unwrap().as_secs_f64();
d5b9d1f4
TL
260 log::info!(
261 "rrd journal successfully committed ({} files in {:.3} seconds)",
262 rrd_file_count,
263 elapsed
264 );
30b4800f
DM
265 }
266 Err(err) => bail!("rrd journal commit failed: {}", err),
267 }
268 Ok(())
269}
270
271fn apply_journal_lines(
272 state: Arc<RwLock<JournalState>>,
273 rrd_map: Arc<RwLock<RRDMap>>,
274 journal_name: &str, // used for logging
275 reader: &mut BufReader<File>,
276 lock_read_line: bool,
277) -> Result<usize, Error> {
30b4800f
DM
278 let mut linenr = 0;
279
280 loop {
281 linenr += 1;
282 let mut line = String::new();
283 let len = if lock_read_line {
284 let _lock = state.read().unwrap(); // make sure we read entire lines
285 reader.read_line(&mut line)?
286 } else {
287 reader.read_line(&mut line)?
288 };
289
d5b9d1f4
TL
290 if len == 0 {
291 break;
292 }
30b4800f 293
ed6a7f52 294 let entry: JournalEntry = match line.parse() {
30b4800f
DM
295 Ok(entry) => entry,
296 Err(err) => {
297 log::warn!(
298 "unable to parse rrd journal '{}' line {} (skip) - {}",
d5b9d1f4
TL
299 journal_name,
300 linenr,
301 err,
30b4800f 302 );
f9e8ebfd 303 continue; // skip unparseable lines
30b4800f
DM
304 }
305 };
306
d5b9d1f4
TL
307 rrd_map.write().unwrap().update(
308 &entry.rel_path,
309 entry.time,
310 entry.value,
311 entry.dst,
312 true,
313 )?;
30b4800f
DM
314 }
315 Ok(linenr)
316}
317
318fn apply_journal_impl(
319 state: Arc<RwLock<JournalState>>,
320 rrd_map: Arc<RwLock<RRDMap>>,
321) -> Result<usize, Error> {
30b4800f
DM
322 let mut lines = 0;
323
324 // Apply old journals first
325 let journal_list = state.read().unwrap().list_old_journals()?;
326
a74384f7
DM
327 for entry in journal_list {
328 log::info!("apply old journal log {}", entry.name);
329 let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
30b4800f
DM
330 let mut reader = BufReader::new(file);
331 lines += apply_journal_lines(
332 Arc::clone(&state),
333 Arc::clone(&rrd_map),
a74384f7 334 &entry.name,
30b4800f
DM
335 &mut reader,
336 false,
337 )?;
338 }
339
340 let mut journal = state.read().unwrap().open_journal_reader()?;
341
342 lines += apply_journal_lines(
343 Arc::clone(&state),
344 Arc::clone(&rrd_map),
345 "rrd.journal",
346 &mut journal,
347 true,
348 )?;
349
350 {
351 let mut state_guard = state.write().unwrap(); // block other writers
352
353 lines += apply_journal_lines(
354 Arc::clone(&state),
355 Arc::clone(&rrd_map),
356 "rrd.journal",
357 &mut journal,
358 false,
359 )?;
360
361 state_guard.rotate_journal()?; // start new journal, keep old one
362
363 // We need to apply the journal only once, because further updates
364 // are always directly applied.
365 state_guard.journal_applied = true;
366 }
367
30b4800f
DM
368 Ok(lines)
369}
370
41271202
DM
371fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
372 let file = std::fs::File::open(path)?;
373 nix::unistd::fsync(file.as_raw_fd())?;
374 Ok(())
375}
376
d5b9d1f4 377pub(crate) fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
41271202
DM
378 let file = std::fs::File::open(path)?;
379 nix::unistd::fsync(file.as_raw_fd())?;
380 if let Some(parent) = path.parent() {
381 fsync_file_or_dir(parent)?;
382 }
383 Ok(())
384}
385
/// Returns the directory below `basedir` that contains the RRD file
/// addressed by `rel_path`.
///
/// If `rel_path` has no parent component, this is `basedir` itself.
fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
    match Path::new(rel_path).parent() {
        Some(parent) => basedir.join(parent),
        None => basedir.to_owned(),
    }
}
394
30b4800f 395fn commit_journal_impl(
41271202 396 config: Arc<CacheConfig>,
30b4800f
DM
397 state: Arc<RwLock<JournalState>>,
398 rrd_map: Arc<RwLock<RRDMap>>,
399) -> Result<usize, Error> {
77c2e466
DM
400 let files = rrd_map.read().unwrap().file_list();
401
402 let mut rrd_file_count = 0;
403 let mut errors = 0;
404
41271202
DM
405 let mut dir_set = BTreeSet::new();
406
407 log::info!("write rrd data back to disk");
408
30b4800f 409 // save all RRDs - we only need a read lock here
41271202 410 // Note: no fsync here (we do it afterwards)
77c2e466 411 for rel_path in files.iter() {
5b193680 412 let parent_dir = rrd_parent_dir(&config.basedir, rel_path);
41271202 413 dir_set.insert(parent_dir);
77c2e466 414 rrd_file_count += 1;
5b193680 415 if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(rel_path) {
77c2e466
DM
416 errors += 1;
417 log::error!("unable to save rrd {}: {}", rel_path, err);
418 }
419 }
420
421 if errors != 0 {
422 bail!("errors during rrd flush - unable to commit rrd journal");
423 }
30b4800f 424
41271202
DM
425 // Important: We fsync files after writing all data! This increase
426 // the likelihood that files are already synced, so this is
427 // much faster (although we need to re-open the files).
428
429 log::info!("starting rrd data sync");
430
431 for rel_path in files.iter() {
432 let mut path = config.basedir.clone();
1aa6f0ea 433 path.push(rel_path);
41271202
DM
434 fsync_file_or_dir(&path)
435 .map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
436 }
437
438 // also fsync directories
439 for dir_path in dir_set {
440 fsync_file_or_dir(&dir_path)
441 .map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
442 }
443
30b4800f
DM
444 // if everything went ok, remove the old journal files
445 state.write().unwrap().remove_old_journals()?;
446
447 Ok(rrd_file_count)
448}