proxmox-rrd: use a journal to reduce amount of bytes written
[proxmox-backup.git] / proxmox-rrd / src / cache.rs
use std::fs::File;
use std::path::{Path, PathBuf};
use std::collections::HashMap;
use std::sync::RwLock;
use std::io::Write;
use std::io::{BufRead, BufReader};
use std::os::unix::io::AsRawFd;

use anyhow::{format_err, bail, Error};
use nix::fcntl::OFlag;

use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};

use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution};

use crate::{DST, rrd::RRD};

const RRD_JOURNAL_NAME: &str = "rrd.journal";

/// RRD cache - keep RRD data in RAM, but write updates to disk
///
/// This cache is designed to run as a single instance (no concurrent
/// access from other processes).
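///
/// # Example
///
/// A minimal usage sketch (not from the original file). It assumes `RRDCache`
/// and `DST` are reachable from the crate root and uses an illustrative base
/// directory and relative path:
///
/// ```ignore
/// let cache = RRDCache::new("/tmp/rrdb-test", None, None, 30.0)?;
/// // Record a gauge value; the update is appended to the journal and kept in RAM.
/// cache.update_value("host/cpu", 0.25, DST::Gauge)?;
/// // Flush the journal and write the RRD files back to disk.
/// cache.apply_journal()?;
/// ```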
pub struct RRDCache {
    apply_interval: f64,
    basedir: PathBuf,
    file_options: CreateOptions,
    dir_options: CreateOptions,
    state: RwLock<RRDCacheState>,
}

// shared state behind RwLock
struct RRDCacheState {
    rrd_map: HashMap<String, RRD>,
    journal: File,
    last_journal_flush: f64,
}

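/// A single update parsed from the journal file; the on-disk format is
/// `time:value:dst:rel_path` (see `append_journal_entry`).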
struct JournalEntry {
    time: f64,
    value: f64,
    dst: DST,
    rel_path: String,
}

impl RRDCache {

    /// Creates a new instance
    pub fn new<P: AsRef<Path>>(
        basedir: P,
        file_options: Option<CreateOptions>,
        dir_options: Option<CreateOptions>,
        apply_interval: f64,
    ) -> Result<Self, Error> {
        let basedir = basedir.as_ref().to_owned();

        let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
        let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());

        create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
            .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;

        let mut journal_path = basedir.clone();
        journal_path.push(RRD_JOURNAL_NAME);

        let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
        let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?;

        let state = RRDCacheState {
            journal,
            rrd_map: HashMap::new(),
            last_journal_flush: 0.0,
        };

        Ok(Self {
            basedir,
            file_options,
            dir_options,
            apply_interval,
            state: RwLock::new(state),
        })
    }

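    /// Parse one journal line in the `time:value:dst:rel_path` format,
    /// e.g. `1632150000:0.25:0:host/cpu` (illustrative values).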
    fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {

        let line = line.trim();

        let parts: Vec<&str> = line.splitn(4, ':').collect();
        if parts.len() != 4 {
            bail!("wrong number of components");
        }

        let time: f64 = parts[0].parse()
            .map_err(|_| format_err!("unable to parse time"))?;
        let value: f64 = parts[1].parse()
            .map_err(|_| format_err!("unable to parse value"))?;
        let dst: u8 = parts[2].parse()
            .map_err(|_| format_err!("unable to parse data source type"))?;

        let dst = match dst {
            0 => DST::Gauge,
            1 => DST::Derive,
            _ => bail!("got strange value for data source type '{}'", dst),
        };

        let rel_path = parts[3].to_string();

        Ok(JournalEntry { time, value, dst, rel_path })
    }

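    /// Append an update to the journal in the same `time:value:dst:rel_path`
    /// format that `parse_journal_line` reads back.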
    fn append_journal_entry(
        state: &mut RRDCacheState,
        time: f64,
        value: f64,
        dst: DST,
        rel_path: &str,
    ) -> Result<(), Error> {
        let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
        state.journal.write_all(journal_entry.as_bytes())?;
        Ok(())
    }

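    /// Apply all pending journal entries to the in-memory RRDs and, on
    /// success, write the RRD files back to disk and truncate the journal.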
    pub fn apply_journal(&self) -> Result<(), Error> {
        let mut state = self.state.write().unwrap(); // block writers
        self.apply_journal_locked(&mut state)
    }

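    /// Replay the journal into `state` and flush the RRDs; callers must pass
    /// the already locked cache state.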
    fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> {

        log::info!("applying rrd journal");

        state.last_journal_flush = proxmox_time::epoch_f64();

        let mut journal_path = self.basedir.clone();
        journal_path.push(RRD_JOURNAL_NAME);

        let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
        let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?;
        let mut journal = BufReader::new(journal);

        let mut last_update_map = HashMap::new();

        let mut get_last_update = |rel_path: &str, rrd: &RRD| {
            if let Some(time) = last_update_map.get(rel_path) {
                return *time;
            }
            let last_update = rrd.last_update();
            last_update_map.insert(rel_path.to_string(), last_update);
            last_update
        };

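        // Replay the journal line by line, skipping entries that are not
        // newer than the target RRD's last update.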
        let mut linenr = 0;
        loop {
            linenr += 1;
            let mut line = String::new();
            let len = journal.read_line(&mut line)?;
            if len == 0 { break; }

            let entry = match Self::parse_journal_line(&line) {
                Ok(entry) => entry,
                Err(err) => {
                    log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
                    continue; // skip unparsable lines
                }
            };

            if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
                if entry.time > get_last_update(&entry.rel_path, &rrd) {
                    rrd.update(entry.time, entry.value);
                }
            } else {
                let mut path = self.basedir.clone();
                path.push(&entry.rel_path);
                create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;

                let mut rrd = match RRD::load(&path) {
                    Ok(rrd) => rrd,
                    Err(err) => {
                        if err.kind() != std::io::ErrorKind::NotFound {
                            log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
                        }
                        RRD::new(entry.dst)
                    },
                };
                if entry.time > get_last_update(&entry.rel_path, &rrd) {
                    rrd.update(entry.time, entry.value);
                }
                state.rrd_map.insert(entry.rel_path.clone(), rrd);
            }
        }

        // save all RRDs

        let mut errors = 0;
        for (rel_path, rrd) in state.rrd_map.iter() {
            let mut path = self.basedir.clone();
            path.push(&rel_path);
            if let Err(err) = rrd.save(&path, self.file_options.clone()) {
                errors += 1;
                log::error!("unable to save {:?}: {}", path, err);
            }
        }

        // if everything went ok, commit the journal

        if errors == 0 {
            nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
                .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
            log::info!("rrd journal successfully committed");
        } else {
            log::error!("errors during rrd flush - unable to commit rrd journal");
        }

        Ok(())
    }

    /// Update data in RAM and append the change to the on-disk journal
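    ///
    /// The journal is applied (and the RRD files are written back to disk)
    /// once `apply_interval` seconds have passed since the last flush.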
    pub fn update_value(
        &self,
        rel_path: &str,
        value: f64,
        dst: DST,
    ) -> Result<(), Error> {

        let mut state = self.state.write().unwrap(); // block other writers

        let now = proxmox_time::epoch_f64();

        if (now - state.last_journal_flush) > self.apply_interval {
            if let Err(err) = self.apply_journal_locked(&mut state) {
                log::error!("apply journal failed: {}", err);
            }
        }

        Self::append_journal_entry(&mut state, now, value, dst, rel_path)?;

        if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
            rrd.update(now, value);
        } else {
            let mut path = self.basedir.clone();
            path.push(rel_path);
            create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
            let mut rrd = match RRD::load(&path) {
                Ok(rrd) => rrd,
                Err(err) => {
                    if err.kind() != std::io::ErrorKind::NotFound {
                        log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
                    }
                    RRD::new(dst)
                },
            };
            rrd.update(now, value);
            state.rrd_map.insert(rel_path.into(), rrd);
        }

        Ok(())
    }

    /// Extract data from cached RRD
    pub fn extract_cached_data(
        &self,
        base: &str,
        name: &str,
        now: f64,
        timeframe: RRDTimeFrameResolution,
        mode: RRDMode,
    ) -> Option<(u64, u64, Vec<Option<f64>>)> {

        let state = self.state.read().unwrap();

        state.rrd_map.get(&format!("{}/{}", base, name))
            .map(|rrd| rrd.extract_data(now, timeframe, mode))
    }
}
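
// A small test sketch (not part of the original file) exercising the journal
// line format used by `append_journal_entry` / `parse_journal_line`; the
// relative path and values are illustrative.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_journal_line_roundtrip() {
        // "<time>:<value>:<dst>:<rel_path>"
        let entry = RRDCache::parse_journal_line("1632150000:0.25:0:host/cpu").unwrap();
        assert_eq!(entry.time, 1632150000.0);
        assert_eq!(entry.value, 0.25);
        assert!(matches!(entry.dst, DST::Gauge));
        assert_eq!(entry.rel_path, "host/cpu");

        // unknown data source types are rejected
        assert!(RRDCache::parse_journal_line("0:0:7:host/cpu").is_err());
    }
}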