]> git.proxmox.com Git - proxmox-backup.git/blame - proxmox-rrd/src/cache.rs
proxmox-rrd: log all errors from apply_and_commit_journal_thread (but only once)
[proxmox-backup.git] / proxmox-rrd / src / cache.rs
CommitLineData
1d44f175 1use std::fs::File;
09340f28 2use std::path::{Path, PathBuf};
0ca41155 3use std::sync::{Arc, RwLock};
1d44f175 4use std::io::{BufRead, BufReader};
658357c5 5use std::time::SystemTime;
0ca41155 6use std::thread::spawn;
f8430423 7use crossbeam_channel::{bounded, TryRecvError};
1d44f175 8use anyhow::{format_err, bail, Error};
09340f28 9
f8430423 10use proxmox::tools::fs::{create_path, CreateOptions};
09340f28 11
1198f8d4 12use crate::rrd::{DST, CF, RRD, RRA};
09340f28 13
f8430423
DM
14mod journal;
15use journal::*;
1d44f175 16
45700e2e
DM
17mod rrd_map;
18use rrd_map::*;
19
09340f28
DM
20/// RRD cache - keep RRD data in RAM, but write updates to disk
21///
22/// This cache is designed to run as single instance (no concurrent
23/// access from other processes).
24pub struct RRDCache {
0ca41155
DM
25 config: Arc<CacheConfig>,
26 state: Arc<RwLock<JournalState>>,
27 rrd_map: Arc<RwLock<RRDMap>>,
28}
29
f8430423 30pub(crate) struct CacheConfig {
1d44f175 31 apply_interval: f64,
09340f28
DM
32 basedir: PathBuf,
33 file_options: CreateOptions,
0ca41155 34 dir_options: CreateOptions,
fce7cd0d
DM
35}
36
0ca41155 37
09340f28
DM
38impl RRDCache {
39
40 /// Creates a new instance
2e3f94e1
DM
41 ///
42 /// `basedir`: All files are stored relative to this path.
43 ///
44 /// `file_options`: Files are created with this options.
45 ///
46 /// `dir_options`: Directories are created with this options.
47 ///
48 /// `apply_interval`: Commit journal after `apply_interval` seconds.
49 ///
a9017805 50 /// `load_rrd_cb`; The callback function is used to load RRD files,
2e3f94e1
DM
51 /// and should return a newly generated RRD if the file does not
52 /// exists (or is unreadable). This may generate RRDs with
53 /// different configurations (dependent on `rel_path`).
09340f28
DM
54 pub fn new<P: AsRef<Path>>(
55 basedir: P,
56 file_options: Option<CreateOptions>,
57 dir_options: Option<CreateOptions>,
1d44f175 58 apply_interval: f64,
fce7cd0d 59 load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
1d44f175 60 ) -> Result<Self, Error> {
09340f28 61 let basedir = basedir.as_ref().to_owned();
1d44f175
DM
62
63 let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
64 let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
65
66 create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
67 .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
68
0ca41155 69 let config = Arc::new(CacheConfig {
fce7cd0d
DM
70 basedir: basedir.clone(),
71 file_options: file_options.clone(),
72 dir_options: dir_options,
0ca41155
DM
73 apply_interval,
74 });
75
76 let state = JournalState::new(Arc::clone(&config))?;
77 let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
fce7cd0d 78
1d44f175 79 Ok(Self {
0ca41155
DM
80 config: Arc::clone(&config),
81 state: Arc::new(RwLock::new(state)),
82 rrd_map: Arc::new(RwLock::new(rrd_map)),
83 })
1d44f175
DM
84 }
85
2e3f94e1
DM
86 /// Create a new RRD as used by the proxmox backup server
87 ///
88 /// It contains the following RRAs:
89 ///
90 /// * cf=average,r=60,n=1440 => 1day
91 /// * cf=maximum,r=60,n=1440 => 1day
92 /// * cf=average,r=30*60,n=1440 => 1month
93 /// * cf=maximum,r=30*60,n=1440 => 1month
94 /// * cf=average,r=6*3600,n=1440 => 1year
95 /// * cf=maximum,r=6*3600,n=1440 => 1year
96 /// * cf=average,r=7*86400,n=570 => 10years
97 /// * cf=maximum,r=7*86400,n=570 => 10year
98 ///
99 /// The resultion data file size is about 80KB.
100 pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD {
1198f8d4
DM
101
102 let mut rra_list = Vec::new();
103
104 // 1min * 1440 => 1day
105 rra_list.push(RRA::new(CF::Average, 60, 1440));
106 rra_list.push(RRA::new(CF::Maximum, 60, 1440));
107
108 // 30min * 1440 => 30days = 1month
109 rra_list.push(RRA::new(CF::Average, 30*60, 1440));
110 rra_list.push(RRA::new(CF::Maximum, 30*60, 1440));
111
112 // 6h * 1440 => 360days = 1year
113 rra_list.push(RRA::new(CF::Average, 6*3600, 1440));
114 rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440));
115
116 // 1week * 570 => 10years
117 rra_list.push(RRA::new(CF::Average, 7*86400, 570));
118 rra_list.push(RRA::new(CF::Maximum, 7*86400, 570));
119
120 RRD::new(dst, rra_list)
121 }
122
1d44f175
DM
123 fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
124
125 let line = line.trim();
126
127 let parts: Vec<&str> = line.splitn(4, ':').collect();
128 if parts.len() != 4 {
129 bail!("wrong numper of components");
09340f28 130 }
1d44f175
DM
131
132 let time: f64 = parts[0].parse()
133 .map_err(|_| format_err!("unable to parse time"))?;
134 let value: f64 = parts[1].parse()
135 .map_err(|_| format_err!("unable to parse value"))?;
136 let dst: u8 = parts[2].parse()
137 .map_err(|_| format_err!("unable to parse data source type"))?;
138
139 let dst = match dst {
140 0 => DST::Gauge,
141 1 => DST::Derive,
142 _ => bail!("got strange value for data source type '{}'", dst),
143 };
144
145 let rel_path = parts[3].to_string();
146
147 Ok(JournalEntry { time, value, dst, rel_path })
148 }
149
658357c5 150 /// Apply and commit the journal. Should be used at server startup.
0ca41155
DM
151 pub fn apply_journal(&self) -> Result<bool, Error> {
152 let state = Arc::clone(&self.state);
153 let rrd_map = Arc::clone(&self.rrd_map);
09340f28 154
0ca41155
DM
155 let mut state_guard = self.state.write().unwrap();
156 let journal_applied = state_guard.journal_applied;
1d44f175 157
0ca41155
DM
158 if let Some(ref recv) = state_guard.apply_thread_result {
159 match recv.try_recv() {
160 Ok(Ok(())) => {
161 // finished without errors, OK
c17fbbbc 162 state_guard.apply_thread_result = None;
0ca41155
DM
163 }
164 Ok(Err(err)) => {
165 // finished with errors, log them
166 log::error!("{}", err);
c17fbbbc 167 state_guard.apply_thread_result = None;
0ca41155
DM
168 }
169 Err(TryRecvError::Empty) => {
170 // still running
171 return Ok(journal_applied);
172 }
173 Err(TryRecvError::Disconnected) => {
174 // crashed, start again
175 log::error!("apply journal thread crashed - try again");
c17fbbbc 176 state_guard.apply_thread_result = None;
658357c5 177 }
658357c5 178 }
658357c5
DM
179 }
180
c17fbbbc
DM
181 let now = proxmox_time::epoch_f64();
182 let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
183
184 if journal_applied && !wants_commit { return Ok(journal_applied); }
185
0ca41155 186 state_guard.last_journal_flush = proxmox_time::epoch_f64();
1d44f175 187
0ca41155
DM
188 let (sender, receiver) = bounded(1);
189 state_guard.apply_thread_result = Some(receiver);
1d44f175 190
0ca41155
DM
191 spawn(move || {
192 let result = apply_and_commit_journal_thread(state, rrd_map, journal_applied)
193 .map_err(|err| err.to_string());
194 sender.send(result).unwrap();
195 });
658357c5 196
0ca41155 197 Ok(journal_applied)
658357c5
DM
198 }
199
09340f28 200
400f0814 201 /// Update data in RAM and write file back to disk (journal)
09340f28
DM
202 pub fn update_value(
203 &self,
204 rel_path: &str,
58f70bcc 205 time: f64,
09340f28
DM
206 value: f64,
207 dst: DST,
09340f28
DM
208 ) -> Result<(), Error> {
209
0ca41155 210 let journal_applied = self.apply_journal()?;
1d44f175 211
f8430423
DM
212 self.state.write().unwrap()
213 .append_journal_entry(time, value, dst, rel_path)?;
1d44f175 214
0ca41155
DM
215 if journal_applied {
216 self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
217 }
09340f28
DM
218
219 Ok(())
220 }
221
222 /// Extract data from cached RRD
eb37d4ec
DM
223 ///
224 /// `start`: Start time. If not sepecified, we simply extract 10 data points.
a9017805 225 ///
eb37d4ec 226 /// `end`: End time. Default is to use the current time.
09340f28
DM
227 pub fn extract_cached_data(
228 &self,
229 base: &str,
230 name: &str,
eb37d4ec
DM
231 cf: CF,
232 resolution: u64,
233 start: Option<u64>,
234 end: Option<u64>,
1198f8d4 235 ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
fce7cd0d
DM
236 self.rrd_map.read().unwrap()
237 .extract_cached_data(base, name, cf, resolution, start, end)
09340f28 238 }
09340f28 239}
0ca41155
DM
240
241
242fn apply_and_commit_journal_thread(
243 state: Arc<RwLock<JournalState>>,
244 rrd_map: Arc<RwLock<RRDMap>>,
245 commit_only: bool,
246) -> Result<(), Error> {
247
248 if commit_only {
249 state.write().unwrap().rotate_journal()?; // start new journal, keep old one
250 } else {
251 let start_time = SystemTime::now();
252 log::debug!("applying rrd journal");
253
254 match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
255 Ok(entries) => {
256 let elapsed = start_time.elapsed().unwrap().as_secs_f64();
257 log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
258 }
259 Err(err) => bail!("apply rrd journal failed - {}", err),
260 }
261 }
262
263 let start_time = SystemTime::now();
264 log::debug!("commit rrd journal");
265
266 match commit_journal_impl(state, rrd_map) {
267 Ok(rrd_file_count) => {
268 let elapsed = start_time.elapsed().unwrap().as_secs_f64();
269 log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
270 rrd_file_count, elapsed);
271 }
272 Err(err) => bail!("rrd journal commit failed: {}", err),
273 }
274 Ok(())
275}
276
277fn apply_journal_lines(
278 state: Arc<RwLock<JournalState>>,
279 rrd_map: Arc<RwLock<RRDMap>>,
280 journal_name: &str, // used for logging
281 reader: &mut BufReader<File>,
282 lock_read_line: bool,
283) -> Result<usize, Error> {
284
285 let mut linenr = 0;
286
287 loop {
288 linenr += 1;
289 let mut line = String::new();
290 let len = if lock_read_line {
291 let _lock = state.read().unwrap(); // make sure we read entire lines
292 reader.read_line(&mut line)?
293 } else {
294 reader.read_line(&mut line)?
295 };
296
297 if len == 0 { break; }
298
299 let entry = match RRDCache::parse_journal_line(&line) {
300 Ok(entry) => entry,
301 Err(err) => {
302 log::warn!(
303 "unable to parse rrd journal '{}' line {} (skip) - {}",
304 journal_name, linenr, err,
305 );
306 continue; // skip unparsable lines
307 }
308 };
309
310 rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
311 }
312 Ok(linenr)
313}
314
315fn apply_journal_impl(
316 state: Arc<RwLock<JournalState>>,
317 rrd_map: Arc<RwLock<RRDMap>>,
318) -> Result<usize, Error> {
319
320 let mut lines = 0;
321
322 // Apply old journals first
323 let journal_list = state.read().unwrap().list_old_journals()?;
324
2b05008a
DM
325 for entry in journal_list {
326 log::info!("apply old journal log {}", entry.name);
327 let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
0ca41155
DM
328 let mut reader = BufReader::new(file);
329 lines += apply_journal_lines(
330 Arc::clone(&state),
331 Arc::clone(&rrd_map),
2b05008a 332 &entry.name,
0ca41155
DM
333 &mut reader,
334 false,
335 )?;
336 }
337
338 let mut journal = state.read().unwrap().open_journal_reader()?;
339
340 lines += apply_journal_lines(
341 Arc::clone(&state),
342 Arc::clone(&rrd_map),
343 "rrd.journal",
344 &mut journal,
345 true,
346 )?;
347
348 {
349 let mut state_guard = state.write().unwrap(); // block other writers
350
351 lines += apply_journal_lines(
352 Arc::clone(&state),
353 Arc::clone(&rrd_map),
354 "rrd.journal",
355 &mut journal,
356 false,
357 )?;
358
359 state_guard.rotate_journal()?; // start new journal, keep old one
360
361 // We need to apply the journal only once, because further updates
362 // are always directly applied.
363 state_guard.journal_applied = true;
364 }
365
366
367 Ok(lines)
368}
369
370fn commit_journal_impl(
371 state: Arc<RwLock<JournalState>>,
372 rrd_map: Arc<RwLock<RRDMap>>,
373) -> Result<usize, Error> {
374
375 // save all RRDs - we only need a read lock here
376 let rrd_file_count = rrd_map.read().unwrap().flush_rrd_files()?;
377
378 // if everything went ok, remove the old journal files
379 state.write().unwrap().remove_old_journals()?;
380
381 Ok(rrd_file_count)
382}