]> git.proxmox.com Git - proxmox-backup.git/blob - proxmox-rrd/src/cache.rs
proxmox-rrd: rename RRDCacheState to JournalState
[proxmox-backup.git] / proxmox-rrd / src / cache.rs
1 use std::fs::File;
2 use std::path::{Path, PathBuf};
3 use std::collections::HashMap;
4 use std::sync::RwLock;
5 use std::io::Write;
6 use std::io::{BufRead, BufReader};
7 use std::os::unix::io::AsRawFd;
8 use std::time::SystemTime;
9
10 use anyhow::{format_err, bail, Error};
11 use nix::fcntl::OFlag;
12
13 use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};
14
15 use crate::rrd::{DST, CF, RRD, RRA};
16
17 const RRD_JOURNAL_NAME: &str = "rrd.journal";
18
/// RRD cache - keep RRD data in RAM, but write updates to disk
///
/// This cache is designed to run as single instance (no concurrent
/// access from other processes).
pub struct RRDCache {
    /// Commit the journal at most every `apply_interval` seconds.
    apply_interval: f64,
    /// All RRD files and the journal are stored below this directory.
    basedir: PathBuf,
    /// Ownership/permissions used when creating the journal and RRD files.
    file_options: CreateOptions,
    /// Journal file handle plus commit bookkeeping; writers lock this first.
    state: RwLock<JournalState>,
    /// In-RAM RRD data, keyed by path relative to `basedir`.
    rrd_map: RwLock<RRDMap>,
}
30
/// In-memory collection of RRDs, keyed by path relative to `basedir`.
struct RRDMap {
    /// Map keys are joined onto this directory to get on-disk paths.
    basedir: PathBuf,
    /// Options used when saving RRD files.
    file_options: CreateOptions,
    /// Options used when creating parent directories for RRD files.
    dir_options: CreateOptions,
    /// rel_path => in-RAM RRD data.
    map: HashMap<String, RRD>,
    /// Loads an existing RRD file, or builds a fresh RRD if the file
    /// is missing/unreadable (may differ depending on `rel_path`).
    load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
}
38
39 impl RRDMap {
40
41 fn update(
42 &mut self,
43 rel_path: &str,
44 time: f64,
45 value: f64,
46 dst: DST,
47 new_only: bool,
48 ) -> Result<(), Error> {
49 if let Some(rrd) = self.map.get_mut(rel_path) {
50 if !new_only || time > rrd.last_update() {
51 rrd.update(time, value);
52 }
53 } else {
54 let mut path = self.basedir.clone();
55 path.push(rel_path);
56 create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
57
58 let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst);
59
60 if !new_only || time > rrd.last_update() {
61 rrd.update(time, value);
62 }
63 self.map.insert(rel_path.to_string(), rrd);
64 }
65 Ok(())
66 }
67
68 fn flush_rrd_files(&self) -> Result<usize, Error> {
69 let mut rrd_file_count = 0;
70
71 let mut errors = 0;
72 for (rel_path, rrd) in self.map.iter() {
73 rrd_file_count += 1;
74
75 let mut path = self.basedir.clone();
76 path.push(&rel_path);
77
78 if let Err(err) = rrd.save(&path, self.file_options.clone()) {
79 errors += 1;
80 log::error!("unable to save {:?}: {}", path, err);
81 }
82 }
83
84 if errors != 0 {
85 bail!("errors during rrd flush - unable to commit rrd journal");
86 }
87
88 Ok(rrd_file_count)
89 }
90
91 fn extract_cached_data(
92 &self,
93 base: &str,
94 name: &str,
95 cf: CF,
96 resolution: u64,
97 start: Option<u64>,
98 end: Option<u64>,
99 ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
100 match self.map.get(&format!("{}/{}", base, name)) {
101 Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
102 None => Ok(None),
103 }
104 }
105 }
106
/// Journal bookkeeping - shared state behind RwLock
struct JournalState {
    /// Append-only journal file handle (opened O_WRONLY|O_APPEND).
    journal: File,
    /// Epoch (f64 seconds) of the last journal apply/commit attempt.
    last_journal_flush: f64,
    /// Set once the on-disk journal has been replayed into the RRD map.
    journal_applied: bool,
}
113
/// One parsed journal line (`time:value:dst:rel_path`).
struct JournalEntry {
    /// Sample timestamp (epoch, f64 seconds).
    time: f64,
    /// Sample value.
    value: f64,
    /// Data source type (encoded as u8 in the journal).
    dst: DST,
    /// RRD file path relative to the cache base directory.
    rel_path: String,
}
120
121 impl RRDCache {
122
123 /// Creates a new instance
124 ///
125 /// `basedir`: All files are stored relative to this path.
126 ///
127 /// `file_options`: Files are created with this options.
128 ///
129 /// `dir_options`: Directories are created with this options.
130 ///
131 /// `apply_interval`: Commit journal after `apply_interval` seconds.
132 ///
133 /// `load_rrd_cb`; The callback function is used to load RRD files,
134 /// and should return a newly generated RRD if the file does not
135 /// exists (or is unreadable). This may generate RRDs with
136 /// different configurations (dependent on `rel_path`).
137 pub fn new<P: AsRef<Path>>(
138 basedir: P,
139 file_options: Option<CreateOptions>,
140 dir_options: Option<CreateOptions>,
141 apply_interval: f64,
142 load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
143 ) -> Result<Self, Error> {
144 let basedir = basedir.as_ref().to_owned();
145
146 let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
147 let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
148
149 create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
150 .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
151
152 let mut journal_path = basedir.clone();
153 journal_path.push(RRD_JOURNAL_NAME);
154
155 let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
156 let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?;
157
158 let state = JournalState {
159 journal,
160 last_journal_flush: 0.0,
161 journal_applied: false,
162 };
163
164 let rrd_map = RRDMap {
165 basedir: basedir.clone(),
166 file_options: file_options.clone(),
167 dir_options: dir_options,
168 map: HashMap::new(),
169 load_rrd_cb,
170 };
171
172 Ok(Self {
173 basedir,
174 file_options,
175 apply_interval,
176 state: RwLock::new(state),
177 rrd_map: RwLock::new(rrd_map),
178 })
179 }
180
181 /// Create a new RRD as used by the proxmox backup server
182 ///
183 /// It contains the following RRAs:
184 ///
185 /// * cf=average,r=60,n=1440 => 1day
186 /// * cf=maximum,r=60,n=1440 => 1day
187 /// * cf=average,r=30*60,n=1440 => 1month
188 /// * cf=maximum,r=30*60,n=1440 => 1month
189 /// * cf=average,r=6*3600,n=1440 => 1year
190 /// * cf=maximum,r=6*3600,n=1440 => 1year
191 /// * cf=average,r=7*86400,n=570 => 10years
192 /// * cf=maximum,r=7*86400,n=570 => 10year
193 ///
194 /// The resultion data file size is about 80KB.
195 pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD {
196
197 let mut rra_list = Vec::new();
198
199 // 1min * 1440 => 1day
200 rra_list.push(RRA::new(CF::Average, 60, 1440));
201 rra_list.push(RRA::new(CF::Maximum, 60, 1440));
202
203 // 30min * 1440 => 30days = 1month
204 rra_list.push(RRA::new(CF::Average, 30*60, 1440));
205 rra_list.push(RRA::new(CF::Maximum, 30*60, 1440));
206
207 // 6h * 1440 => 360days = 1year
208 rra_list.push(RRA::new(CF::Average, 6*3600, 1440));
209 rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440));
210
211 // 1week * 570 => 10years
212 rra_list.push(RRA::new(CF::Average, 7*86400, 570));
213 rra_list.push(RRA::new(CF::Maximum, 7*86400, 570));
214
215 RRD::new(dst, rra_list)
216 }
217
218 fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
219
220 let line = line.trim();
221
222 let parts: Vec<&str> = line.splitn(4, ':').collect();
223 if parts.len() != 4 {
224 bail!("wrong numper of components");
225 }
226
227 let time: f64 = parts[0].parse()
228 .map_err(|_| format_err!("unable to parse time"))?;
229 let value: f64 = parts[1].parse()
230 .map_err(|_| format_err!("unable to parse value"))?;
231 let dst: u8 = parts[2].parse()
232 .map_err(|_| format_err!("unable to parse data source type"))?;
233
234 let dst = match dst {
235 0 => DST::Gauge,
236 1 => DST::Derive,
237 _ => bail!("got strange value for data source type '{}'", dst),
238 };
239
240 let rel_path = parts[3].to_string();
241
242 Ok(JournalEntry { time, value, dst, rel_path })
243 }
244
245 fn append_journal_entry(
246 state: &mut JournalState,
247 time: f64,
248 value: f64,
249 dst: DST,
250 rel_path: &str,
251 ) -> Result<(), Error> {
252 let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
253 state.journal.write_all(journal_entry.as_bytes())?;
254 Ok(())
255 }
256
257 /// Apply and commit the journal. Should be used at server startup.
258 pub fn apply_journal(&self) -> Result<(), Error> {
259 let mut state = self.state.write().unwrap(); // block writers
260 self.apply_and_commit_journal_locked(&mut state)
261 }
262
263 fn apply_and_commit_journal_locked(&self, state: &mut JournalState) -> Result<(), Error> {
264
265 state.last_journal_flush = proxmox_time::epoch_f64();
266
267 if !state.journal_applied {
268 let start_time = SystemTime::now();
269 log::debug!("applying rrd journal");
270
271 match self.apply_journal_locked(state) {
272 Ok(entries) => {
273 let elapsed = start_time.elapsed()?.as_secs_f64();
274 log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed);
275 }
276 Err(err) => bail!("apply rrd journal failed - {}", err),
277 }
278 }
279
280 let start_time = SystemTime::now();
281 log::debug!("commit rrd journal");
282
283 match self.commit_journal_locked(state) {
284 Ok(rrd_file_count) => {
285 let elapsed = start_time.elapsed()?.as_secs_f64();
286 log::info!("rrd journal successfully committed ({} files in {:.3} seconds)",
287 rrd_file_count, elapsed);
288 }
289 Err(err) => bail!("rrd journal commit failed: {}", err),
290 }
291
292 Ok(())
293 }
294
295 fn apply_journal_locked(&self, state: &mut JournalState) -> Result<usize, Error> {
296
297 let mut journal_path = self.basedir.clone();
298 journal_path.push(RRD_JOURNAL_NAME);
299
300 let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
301 let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?;
302 let mut journal = BufReader::new(journal);
303
304 // fixme: apply blocked to avoid too many calls to self.rrd_map.write() ??
305 let mut linenr = 0;
306 loop {
307 linenr += 1;
308 let mut line = String::new();
309 let len = journal.read_line(&mut line)?;
310 if len == 0 { break; }
311
312 let entry = match Self::parse_journal_line(&line) {
313 Ok(entry) => entry,
314 Err(err) => {
315 log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
316 continue; // skip unparsable lines
317 }
318 };
319
320 self.rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?;
321 }
322
323 // We need to apply the journal only once, because further updates
324 // are always directly applied.
325 state.journal_applied = true;
326
327 Ok(linenr)
328 }
329
330 fn commit_journal_locked(&self, state: &mut JournalState) -> Result<usize, Error> {
331
332 // save all RRDs - we only need a read lock here
333 let rrd_file_count = self.rrd_map.read().unwrap().flush_rrd_files()?;
334
335 // if everything went ok, commit the journal
336
337 nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
338 .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
339
340 Ok(rrd_file_count)
341 }
342
343 /// Update data in RAM and write file back to disk (journal)
344 pub fn update_value(
345 &self,
346 rel_path: &str,
347 time: f64,
348 value: f64,
349 dst: DST,
350 ) -> Result<(), Error> {
351
352 let mut state = self.state.write().unwrap(); // block other writers
353
354 if !state.journal_applied || (time - state.last_journal_flush) > self.apply_interval {
355 self.apply_and_commit_journal_locked(&mut state)?;
356 }
357
358 Self::append_journal_entry(&mut state, time, value, dst, rel_path)?;
359
360 self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?;
361
362 Ok(())
363 }
364
365 /// Extract data from cached RRD
366 ///
367 /// `start`: Start time. If not sepecified, we simply extract 10 data points.
368 ///
369 /// `end`: End time. Default is to use the current time.
370 pub fn extract_cached_data(
371 &self,
372 base: &str,
373 name: &str,
374 cf: CF,
375 resolution: u64,
376 start: Option<u64>,
377 end: Option<u64>,
378 ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
379 self.rrd_map.read().unwrap()
380 .extract_cached_data(base, name, cf, resolution, start, end)
381 }
382 }