]> git.proxmox.com Git - proxmox-backup.git/blob - proxmox-rrd/src/cache.rs
tree-wide: drop redundant clones
[proxmox-backup.git] / proxmox-rrd / src / cache.rs
1 use std::fs::File;
2 use std::path::{Path, PathBuf};
3 use std::sync::{Arc, RwLock};
4 use std::io::{BufRead, BufReader};
5 use std::time::SystemTime;
6 use std::thread::spawn;
7 use std::os::unix::io::AsRawFd;
8 use std::collections::BTreeSet;
9
10 use crossbeam_channel::{bounded, TryRecvError};
11 use anyhow::{format_err, bail, Error};
12
13 use proxmox_sys::fs::{create_path, CreateOptions};
14
15 use crate::rrd::{DST, CF, RRD, RRA};
16
17 mod journal;
18 use journal::*;
19
20 mod rrd_map;
21 use rrd_map::*;
22
23 /// RRD cache - keep RRD data in RAM, but write updates to disk
24 ///
25 /// This cache is designed to run as single instance (no concurrent
26 /// access from other processes).
27 pub struct RRDCache {
28 config: Arc<CacheConfig>,
29 state: Arc<RwLock<JournalState>>,
30 rrd_map: Arc<RwLock<RRDMap>>,
31 }
32
33 pub(crate) struct CacheConfig {
34 apply_interval: f64,
35 basedir: PathBuf,
36 file_options: CreateOptions,
37 dir_options: CreateOptions,
38 }
39
40
41 impl RRDCache {
42
43 /// Creates a new instance
44 ///
45 /// `basedir`: All files are stored relative to this path.
46 ///
47 /// `file_options`: Files are created with this options.
48 ///
49 /// `dir_options`: Directories are created with this options.
50 ///
51 /// `apply_interval`: Commit journal after `apply_interval` seconds.
52 ///
53 /// `load_rrd_cb`; The callback function is used to load RRD files,
54 /// and should return a newly generated RRD if the file does not
55 /// exists (or is unreadable). This may generate RRDs with
56 /// different configurations (dependent on `rel_path`).
57 pub fn new<P: AsRef<Path>>(
58 basedir: P,
59 file_options: Option<CreateOptions>,
60 dir_options: Option<CreateOptions>,
61 apply_interval: f64,
62 load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
63 ) -> Result<Self, Error> {
64 let basedir = basedir.as_ref().to_owned();
65
66 let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
67 let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
68
69 create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
70 .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
71
72 let config = Arc::new(CacheConfig {
73 basedir: basedir,
74 file_options: file_options,
75 dir_options: dir_options,
76 apply_interval,
77 });
78
79 let state = JournalState::new(Arc::clone(&config))?;
80 let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
81
82 Ok(Self {
83 config: Arc::clone(&config),
84 state: Arc::new(RwLock::new(state)),
85 rrd_map: Arc::new(RwLock::new(rrd_map)),
86 })
87 }
88
89 /// Create a new RRD as used by the proxmox backup server
90 ///
91 /// It contains the following RRAs:
92 ///
93 /// * cf=average,r=60,n=1440 => 1day
94 /// * cf=maximum,r=60,n=1440 => 1day
95 /// * cf=average,r=30*60,n=1440 => 1month
96 /// * cf=maximum,r=30*60,n=1440 => 1month
97 /// * cf=average,r=6*3600,n=1440 => 1year
98 /// * cf=maximum,r=6*3600,n=1440 => 1year
99 /// * cf=average,r=7*86400,n=570 => 10years
100 /// * cf=maximum,r=7*86400,n=570 => 10year
101 ///
102 /// The resultion data file size is about 80KB.
103 pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD {
104
105 let mut rra_list = Vec::new();
106
107 // 1min * 1440 => 1day
108 rra_list.push(RRA::new(CF::Average, 60, 1440));
109 rra_list.push(RRA::new(CF::Maximum, 60, 1440));
110
111 // 30min * 1440 => 30days = 1month
112 rra_list.push(RRA::new(CF::Average, 30*60, 1440));
113 rra_list.push(RRA::new(CF::Maximum, 30*60, 1440));
114
115 // 6h * 1440 => 360days = 1year
116 rra_list.push(RRA::new(CF::Average, 6*3600, 1440));
117 rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440));
118
119 // 1week * 570 => 10years
120 rra_list.push(RRA::new(CF::Average, 7*86400, 570));
121 rra_list.push(RRA::new(CF::Maximum, 7*86400, 570));
122
123 RRD::new(dst, rra_list)
124 }
125
126 /// Sync the journal data to disk (using `fdatasync` syscall)
127 pub fn sync_journal(&self) -> Result<(), Error> {
128 self.state.read().unwrap().sync_journal()
129 }
130
131 /// Apply and commit the journal. Should be used at server startup.
132 pub fn apply_journal(&self) -> Result<bool, Error> {
133 let config = Arc::clone(&self.config);
134 let state = Arc::clone(&self.state);
135 let rrd_map = Arc::clone(&self.rrd_map);
136
137
138 let mut state_guard = self.state.write().unwrap();
139 let journal_applied = state_guard.journal_applied;
140
141 if let Some(ref recv) = state_guard.apply_thread_result {
142 match recv.try_recv() {
143 Ok(Ok(())) => {
144 // finished without errors, OK
145 state_guard.apply_thread_result = None;
146 }
147 Ok(Err(err)) => {
148 // finished with errors, log them
149 log::error!("{}", err);
150 state_guard.apply_thread_result = None;
151 }
152 Err(TryRecvError::Empty) => {
153 // still running
154 return Ok(journal_applied);
155 }
156 Err(TryRecvError::Disconnected) => {
157 // crashed, start again
158 log::error!("apply journal thread crashed - try again");
159 state_guard.apply_thread_result = None;
160 }
161 }
162 }
163
164 let now = proxmox_time::epoch_f64();
165 let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
166
167 if journal_applied && !wants_commit { return Ok(journal_applied); }
168
169 state_guard.last_journal_flush = proxmox_time::epoch_f64();
170
171 let (sender, receiver) = bounded(1);
172 state_guard.apply_thread_result = Some(receiver);
173
174 spawn(move || {
175 let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
176 .map_err(|err| err.to_string());
177 sender.send(result).unwrap();
178 });
179
180 Ok(journal_applied)
181 }
182
183
184 /// Update data in RAM and write file back to disk (journal)
185 pub fn update_value(
186 &self,
187 rel_path: &str,
188 time: f64,
189 value: f64,
190 dst: DST,
191 ) -> Result<(), Error> {
192
193 let journal_applied = self.apply_journal()?;
194
195 self.state.write().unwrap()
196 .append_journal_entry(time, value, dst, rel_path)?;
197
198 if journal_applied {
199 self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
200 }
201
202 Ok(())
203 }
204
205 /// Extract data from cached RRD
206 ///
207 /// `start`: Start time. If not sepecified, we simply extract 10 data points.
208 ///
209 /// `end`: End time. Default is to use the current time.
210 pub fn extract_cached_data(
211 &self,
212 base: &str,
213 name: &str,
214 cf: CF,
215 resolution: u64,
216 start: Option<u64>,
217 end: Option<u64>,
218 ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
219 self.rrd_map.read().unwrap()
220 .extract_cached_data(base, name, cf, resolution, start, end)
221 }
222 }
223
224
225 fn apply_and_commit_journal_thread(
226 config: Arc<CacheConfig>,
227 state: Arc<RwLock<JournalState>>,
228 rrd_map: Arc<RwLock<RRDMap>>,
229 commit_only: bool,
230 ) -> Result<(), Error> {
231
232 if commit_only {
233 state.write().unwrap().rotate_journal()?; // start new journal, keep old one
234 } else {
235 let start_time = SystemTime::now();
236 log::debug!("applying rrd journal");
237
238 match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
239 Ok(entries) => {
240 let elapsed = start_time.elapsed().unwrap().as_secs_f64();
241 log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
242 }
243 Err(err) => bail!("apply rrd journal failed - {}", err),
244 }
245 }
246
247 let start_time = SystemTime::now();
248 log::debug!("commit rrd journal");
249
250 match commit_journal_impl(config, state, rrd_map) {
251 Ok(rrd_file_count) => {
252 let elapsed = start_time.elapsed().unwrap().as_secs_f64();
253 log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
254 rrd_file_count, elapsed);
255 }
256 Err(err) => bail!("rrd journal commit failed: {}", err),
257 }
258 Ok(())
259 }
260
261 fn apply_journal_lines(
262 state: Arc<RwLock<JournalState>>,
263 rrd_map: Arc<RwLock<RRDMap>>,
264 journal_name: &str, // used for logging
265 reader: &mut BufReader<File>,
266 lock_read_line: bool,
267 ) -> Result<usize, Error> {
268
269 let mut linenr = 0;
270
271 loop {
272 linenr += 1;
273 let mut line = String::new();
274 let len = if lock_read_line {
275 let _lock = state.read().unwrap(); // make sure we read entire lines
276 reader.read_line(&mut line)?
277 } else {
278 reader.read_line(&mut line)?
279 };
280
281 if len == 0 { break; }
282
283 let entry: JournalEntry = match line.parse() {
284 Ok(entry) => entry,
285 Err(err) => {
286 log::warn!(
287 "unable to parse rrd journal '{}' line {} (skip) - {}",
288 journal_name, linenr, err,
289 );
290 continue; // skip unparsable lines
291 }
292 };
293
294 rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
295 }
296 Ok(linenr)
297 }
298
299 fn apply_journal_impl(
300 state: Arc<RwLock<JournalState>>,
301 rrd_map: Arc<RwLock<RRDMap>>,
302 ) -> Result<usize, Error> {
303
304 let mut lines = 0;
305
306 // Apply old journals first
307 let journal_list = state.read().unwrap().list_old_journals()?;
308
309 for entry in journal_list {
310 log::info!("apply old journal log {}", entry.name);
311 let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
312 let mut reader = BufReader::new(file);
313 lines += apply_journal_lines(
314 Arc::clone(&state),
315 Arc::clone(&rrd_map),
316 &entry.name,
317 &mut reader,
318 false,
319 )?;
320 }
321
322 let mut journal = state.read().unwrap().open_journal_reader()?;
323
324 lines += apply_journal_lines(
325 Arc::clone(&state),
326 Arc::clone(&rrd_map),
327 "rrd.journal",
328 &mut journal,
329 true,
330 )?;
331
332 {
333 let mut state_guard = state.write().unwrap(); // block other writers
334
335 lines += apply_journal_lines(
336 Arc::clone(&state),
337 Arc::clone(&rrd_map),
338 "rrd.journal",
339 &mut journal,
340 false,
341 )?;
342
343 state_guard.rotate_journal()?; // start new journal, keep old one
344
345 // We need to apply the journal only once, because further updates
346 // are always directly applied.
347 state_guard.journal_applied = true;
348 }
349
350
351 Ok(lines)
352 }
353
354 fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
355 let file = std::fs::File::open(path)?;
356 nix::unistd::fsync(file.as_raw_fd())?;
357 Ok(())
358 }
359
360 pub(crate)fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
361 let file = std::fs::File::open(path)?;
362 nix::unistd::fsync(file.as_raw_fd())?;
363 if let Some(parent) = path.parent() {
364 fsync_file_or_dir(parent)?;
365 }
366 Ok(())
367 }
368
/// Returns the directory below `basedir` that contains the file `rel_path`.
///
/// If `rel_path` has no parent component, `basedir` itself is returned.
fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
    match Path::new(rel_path).parent() {
        Some(parent) => basedir.join(parent),
        None => basedir.to_path_buf(),
    }
}
377
378 fn commit_journal_impl(
379 config: Arc<CacheConfig>,
380 state: Arc<RwLock<JournalState>>,
381 rrd_map: Arc<RwLock<RRDMap>>,
382 ) -> Result<usize, Error> {
383
384 let files = rrd_map.read().unwrap().file_list();
385
386 let mut rrd_file_count = 0;
387 let mut errors = 0;
388
389 let mut dir_set = BTreeSet::new();
390
391 log::info!("write rrd data back to disk");
392
393 // save all RRDs - we only need a read lock here
394 // Note: no fsync here (we do it afterwards)
395 for rel_path in files.iter() {
396 let parent_dir = rrd_parent_dir(&config.basedir, rel_path);
397 dir_set.insert(parent_dir);
398 rrd_file_count += 1;
399 if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(rel_path) {
400 errors += 1;
401 log::error!("unable to save rrd {}: {}", rel_path, err);
402 }
403 }
404
405 if errors != 0 {
406 bail!("errors during rrd flush - unable to commit rrd journal");
407 }
408
409 // Important: We fsync files after writing all data! This increase
410 // the likelihood that files are already synced, so this is
411 // much faster (although we need to re-open the files).
412
413 log::info!("starting rrd data sync");
414
415 for rel_path in files.iter() {
416 let mut path = config.basedir.clone();
417 path.push(&rel_path);
418 fsync_file_or_dir(&path)
419 .map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
420 }
421
422 // also fsync directories
423 for dir_path in dir_set {
424 fsync_file_or_dir(&dir_path)
425 .map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
426 }
427
428 // if everything went ok, remove the old journal files
429 state.write().unwrap().remove_old_journals()?;
430
431 Ok(rrd_file_count)
432 }