// git.proxmox.com Git - proxmox-backup.git/blob - proxmox-rrd/src/cache.rs
// proxmox-rrd: remove dependency to proxmox-rrd-api-types
// [proxmox-backup.git] / proxmox-rrd / src / cache.rs
1 use std::fs::File;
2 use std::path::{Path, PathBuf};
3 use std::collections::HashMap;
4 use std::sync::RwLock;
5 use std::io::Write;
6 use std::io::{BufRead, BufReader};
7 use std::os::unix::io::AsRawFd;
8
9 use anyhow::{format_err, bail, Error};
10 use nix::fcntl::OFlag;
11
12 use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};
13
14 use crate::rrd::{DST, CF, RRD, RRA};
15
16 const RRD_JOURNAL_NAME: &str = "rrd.journal";
17
18 /// RRD cache - keep RRD data in RAM, but write updates to disk
19 ///
20 /// This cache is designed to run as single instance (no concurrent
21 /// access from other processes).
22 pub struct RRDCache {
23 apply_interval: f64,
24 basedir: PathBuf,
25 file_options: CreateOptions,
26 dir_options: CreateOptions,
27 state: RwLock<RRDCacheState>,
28 }
29
30 // shared state behind RwLock
31 struct RRDCacheState {
32 rrd_map: HashMap<String, RRD>,
33 journal: File,
34 last_journal_flush: f64,
35 }
36
37 struct JournalEntry {
38 time: f64,
39 value: f64,
40 dst: DST,
41 rel_path: String,
42 }
43
44 impl RRDCache {
45
46 /// Creates a new instance
47 pub fn new<P: AsRef<Path>>(
48 basedir: P,
49 file_options: Option<CreateOptions>,
50 dir_options: Option<CreateOptions>,
51 apply_interval: f64,
52 ) -> Result<Self, Error> {
53 let basedir = basedir.as_ref().to_owned();
54
55 let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
56 let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
57
58 create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
59 .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
60
61 let mut journal_path = basedir.clone();
62 journal_path.push(RRD_JOURNAL_NAME);
63
64 let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
65 let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?;
66
67 let state = RRDCacheState {
68 journal,
69 rrd_map: HashMap::new(),
70 last_journal_flush: 0.0,
71 };
72
73 Ok(Self {
74 basedir,
75 file_options,
76 dir_options,
77 apply_interval,
78 state: RwLock::new(state),
79 })
80 }
81
82 fn create_default_rrd(dst: DST) -> RRD {
83
84 let mut rra_list = Vec::new();
85
86 // 1min * 1440 => 1day
87 rra_list.push(RRA::new(CF::Average, 60, 1440));
88 rra_list.push(RRA::new(CF::Maximum, 60, 1440));
89
90 // 30min * 1440 => 30days = 1month
91 rra_list.push(RRA::new(CF::Average, 30*60, 1440));
92 rra_list.push(RRA::new(CF::Maximum, 30*60, 1440));
93
94 // 6h * 1440 => 360days = 1year
95 rra_list.push(RRA::new(CF::Average, 6*3600, 1440));
96 rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440));
97
98 // 1week * 570 => 10years
99 rra_list.push(RRA::new(CF::Average, 7*86400, 570));
100 rra_list.push(RRA::new(CF::Maximum, 7*86400, 570));
101
102 RRD::new(dst, rra_list)
103 }
104
105 fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
106
107 let line = line.trim();
108
109 let parts: Vec<&str> = line.splitn(4, ':').collect();
110 if parts.len() != 4 {
111 bail!("wrong numper of components");
112 }
113
114 let time: f64 = parts[0].parse()
115 .map_err(|_| format_err!("unable to parse time"))?;
116 let value: f64 = parts[1].parse()
117 .map_err(|_| format_err!("unable to parse value"))?;
118 let dst: u8 = parts[2].parse()
119 .map_err(|_| format_err!("unable to parse data source type"))?;
120
121 let dst = match dst {
122 0 => DST::Gauge,
123 1 => DST::Derive,
124 _ => bail!("got strange value for data source type '{}'", dst),
125 };
126
127 let rel_path = parts[3].to_string();
128
129 Ok(JournalEntry { time, value, dst, rel_path })
130 }
131
132 fn append_journal_entry(
133 state: &mut RRDCacheState,
134 time: f64,
135 value: f64,
136 dst: DST,
137 rel_path: &str,
138 ) -> Result<(), Error> {
139 let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
140 state.journal.write_all(journal_entry.as_bytes())?;
141 Ok(())
142 }
143
144 pub fn apply_journal(&self) -> Result<(), Error> {
145 let mut state = self.state.write().unwrap(); // block writers
146 self.apply_journal_locked(&mut state)
147 }
148
149 fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> {
150
151 log::info!("applying rrd journal");
152
153 state.last_journal_flush = proxmox_time::epoch_f64();
154
155 let mut journal_path = self.basedir.clone();
156 journal_path.push(RRD_JOURNAL_NAME);
157
158 let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
159 let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?;
160 let mut journal = BufReader::new(journal);
161
162 let mut last_update_map = HashMap::new();
163
164 let mut get_last_update = |rel_path: &str, rrd: &RRD| {
165 if let Some(time) = last_update_map.get(rel_path) {
166 return *time;
167 }
168 let last_update = rrd.last_update();
169 last_update_map.insert(rel_path.to_string(), last_update);
170 last_update
171 };
172
173 let mut linenr = 0;
174 loop {
175 linenr += 1;
176 let mut line = String::new();
177 let len = journal.read_line(&mut line)?;
178 if len == 0 { break; }
179
180 let entry = match Self::parse_journal_line(&line) {
181 Ok(entry) => entry,
182 Err(err) => {
183 log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
184 continue; // skip unparsable lines
185 }
186 };
187
188 if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
189 if entry.time > get_last_update(&entry.rel_path, &rrd) {
190 rrd.update(entry.time, entry.value);
191 }
192 } else {
193 let mut path = self.basedir.clone();
194 path.push(&entry.rel_path);
195 create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
196
197 let mut rrd = match RRD::load(&path) {
198 Ok(rrd) => rrd,
199 Err(err) => {
200 if err.kind() != std::io::ErrorKind::NotFound {
201 log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
202 }
203 Self::create_default_rrd(entry.dst)
204 },
205 };
206 if entry.time > get_last_update(&entry.rel_path, &rrd) {
207 rrd.update(entry.time, entry.value);
208 }
209 state.rrd_map.insert(entry.rel_path.clone(), rrd);
210 }
211 }
212
213 // save all RRDs
214
215 let mut errors = 0;
216 for (rel_path, rrd) in state.rrd_map.iter() {
217 let mut path = self.basedir.clone();
218 path.push(&rel_path);
219 if let Err(err) = rrd.save(&path, self.file_options.clone()) {
220 errors += 1;
221 log::error!("unable to save {:?}: {}", path, err);
222 }
223 }
224
225 // if everything went ok, commit the journal
226
227 if errors == 0 {
228 nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
229 .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
230 log::info!("rrd journal successfully committed");
231 } else {
232 log::error!("errors during rrd flush - unable to commit rrd journal");
233 }
234
235 Ok(())
236 }
237
238 /// Update data in RAM and write file back to disk (if `save` is set)
239 pub fn update_value(
240 &self,
241 rel_path: &str,
242 value: f64,
243 dst: DST,
244 ) -> Result<(), Error> {
245
246 let mut state = self.state.write().unwrap(); // block other writers
247
248 let now = proxmox_time::epoch_f64();
249
250 if (now - state.last_journal_flush) > self.apply_interval {
251 if let Err(err) = self.apply_journal_locked(&mut state) {
252 log::error!("apply journal failed: {}", err);
253 }
254 }
255
256 Self::append_journal_entry(&mut state, now, value, dst, rel_path)?;
257
258 if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
259 rrd.update(now, value);
260 } else {
261 let mut path = self.basedir.clone();
262 path.push(rel_path);
263 create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
264 let mut rrd = match RRD::load(&path) {
265 Ok(rrd) => rrd,
266 Err(err) => {
267 if err.kind() != std::io::ErrorKind::NotFound {
268 log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
269 }
270 Self::create_default_rrd(dst)
271 },
272 };
273 rrd.update(now, value);
274 state.rrd_map.insert(rel_path.into(), rrd);
275 }
276
277 Ok(())
278 }
279
280 /// Extract data from cached RRD
281 ///
282 /// `start`: Start time. If not sepecified, we simply extract 10 data points.
283 /// `end`: End time. Default is to use the current time.
284 pub fn extract_cached_data(
285 &self,
286 base: &str,
287 name: &str,
288 cf: CF,
289 resolution: u64,
290 start: Option<u64>,
291 end: Option<u64>,
292 ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
293
294 let state = self.state.read().unwrap();
295
296 match state.rrd_map.get(&format!("{}/{}", base, name)) {
297 Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
298 None => Ok(None),
299 }
300 }
301 }