]>
Commit | Line | Data |
---|---|---|
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::thread::spawn;
use std::time::{Instant, SystemTime};

use anyhow::{bail, format_err, Error};
use crossbeam_channel::{bounded, TryRecvError};

use proxmox::tools::fs::{create_path, CreateOptions};

use crate::rrd::{CF, DST, RRA, RRD};
09340f28 | 13 | |
f8430423 DM |
14 | mod journal; |
15 | use journal::*; | |
1d44f175 | 16 | |
45700e2e DM |
17 | mod rrd_map; |
18 | use rrd_map::*; | |
19 | ||
09340f28 DM |
/// RRD cache - keep RRD data in RAM, but write updates to disk
///
/// This cache is designed to run as single instance (no concurrent
/// access from other processes).
pub struct RRDCache {
    /// Shared configuration (base directory, create options, apply interval).
    config: Arc<CacheConfig>,
    /// Journal state; also holds the result channel of the background
    /// apply thread (see `apply_journal`).
    state: Arc<RwLock<JournalState>>,
    /// In-memory map of loaded RRDs, addressed by relative path.
    rrd_map: Arc<RwLock<RRDMap>>,
}
29 | ||
/// Configuration shared between the cache, the journal state and the
/// RRD map (see [`RRDCache::new`] for the meaning of each field).
pub(crate) struct CacheConfig {
    /// Commit the journal after this many seconds.
    apply_interval: f64,
    /// All files are stored relative to this path.
    basedir: PathBuf,
    /// Files are created with these options.
    file_options: CreateOptions,
    /// Directories are created with these options.
    dir_options: CreateOptions,
}
36 | ||
0ca41155 | 37 | |
09340f28 DM |
38 | impl RRDCache { |
39 | ||
40 | /// Creates a new instance | |
2e3f94e1 DM |
41 | /// |
42 | /// `basedir`: All files are stored relative to this path. | |
43 | /// | |
44 | /// `file_options`: Files are created with this options. | |
45 | /// | |
46 | /// `dir_options`: Directories are created with this options. | |
47 | /// | |
48 | /// `apply_interval`: Commit journal after `apply_interval` seconds. | |
49 | /// | |
a9017805 | 50 | /// `load_rrd_cb`; The callback function is used to load RRD files, |
2e3f94e1 DM |
51 | /// and should return a newly generated RRD if the file does not |
52 | /// exists (or is unreadable). This may generate RRDs with | |
53 | /// different configurations (dependent on `rel_path`). | |
09340f28 DM |
54 | pub fn new<P: AsRef<Path>>( |
55 | basedir: P, | |
56 | file_options: Option<CreateOptions>, | |
57 | dir_options: Option<CreateOptions>, | |
1d44f175 | 58 | apply_interval: f64, |
fce7cd0d | 59 | load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD, |
1d44f175 | 60 | ) -> Result<Self, Error> { |
09340f28 | 61 | let basedir = basedir.as_ref().to_owned(); |
1d44f175 DM |
62 | |
63 | let file_options = file_options.unwrap_or_else(|| CreateOptions::new()); | |
64 | let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new()); | |
65 | ||
66 | create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone())) | |
67 | .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?; | |
68 | ||
0ca41155 | 69 | let config = Arc::new(CacheConfig { |
fce7cd0d DM |
70 | basedir: basedir.clone(), |
71 | file_options: file_options.clone(), | |
72 | dir_options: dir_options, | |
0ca41155 DM |
73 | apply_interval, |
74 | }); | |
75 | ||
76 | let state = JournalState::new(Arc::clone(&config))?; | |
77 | let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb); | |
fce7cd0d | 78 | |
1d44f175 | 79 | Ok(Self { |
0ca41155 DM |
80 | config: Arc::clone(&config), |
81 | state: Arc::new(RwLock::new(state)), | |
82 | rrd_map: Arc::new(RwLock::new(rrd_map)), | |
83 | }) | |
1d44f175 DM |
84 | } |
85 | ||
2e3f94e1 DM |
86 | /// Create a new RRD as used by the proxmox backup server |
87 | /// | |
88 | /// It contains the following RRAs: | |
89 | /// | |
90 | /// * cf=average,r=60,n=1440 => 1day | |
91 | /// * cf=maximum,r=60,n=1440 => 1day | |
92 | /// * cf=average,r=30*60,n=1440 => 1month | |
93 | /// * cf=maximum,r=30*60,n=1440 => 1month | |
94 | /// * cf=average,r=6*3600,n=1440 => 1year | |
95 | /// * cf=maximum,r=6*3600,n=1440 => 1year | |
96 | /// * cf=average,r=7*86400,n=570 => 10years | |
97 | /// * cf=maximum,r=7*86400,n=570 => 10year | |
98 | /// | |
99 | /// The resultion data file size is about 80KB. | |
100 | pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD { | |
1198f8d4 DM |
101 | |
102 | let mut rra_list = Vec::new(); | |
103 | ||
104 | // 1min * 1440 => 1day | |
105 | rra_list.push(RRA::new(CF::Average, 60, 1440)); | |
106 | rra_list.push(RRA::new(CF::Maximum, 60, 1440)); | |
107 | ||
108 | // 30min * 1440 => 30days = 1month | |
109 | rra_list.push(RRA::new(CF::Average, 30*60, 1440)); | |
110 | rra_list.push(RRA::new(CF::Maximum, 30*60, 1440)); | |
111 | ||
112 | // 6h * 1440 => 360days = 1year | |
113 | rra_list.push(RRA::new(CF::Average, 6*3600, 1440)); | |
114 | rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440)); | |
115 | ||
116 | // 1week * 570 => 10years | |
117 | rra_list.push(RRA::new(CF::Average, 7*86400, 570)); | |
118 | rra_list.push(RRA::new(CF::Maximum, 7*86400, 570)); | |
119 | ||
120 | RRD::new(dst, rra_list) | |
121 | } | |
122 | ||
1d44f175 DM |
123 | fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> { |
124 | ||
125 | let line = line.trim(); | |
126 | ||
127 | let parts: Vec<&str> = line.splitn(4, ':').collect(); | |
128 | if parts.len() != 4 { | |
129 | bail!("wrong numper of components"); | |
09340f28 | 130 | } |
1d44f175 DM |
131 | |
132 | let time: f64 = parts[0].parse() | |
133 | .map_err(|_| format_err!("unable to parse time"))?; | |
134 | let value: f64 = parts[1].parse() | |
135 | .map_err(|_| format_err!("unable to parse value"))?; | |
136 | let dst: u8 = parts[2].parse() | |
137 | .map_err(|_| format_err!("unable to parse data source type"))?; | |
138 | ||
139 | let dst = match dst { | |
140 | 0 => DST::Gauge, | |
141 | 1 => DST::Derive, | |
142 | _ => bail!("got strange value for data source type '{}'", dst), | |
143 | }; | |
144 | ||
145 | let rel_path = parts[3].to_string(); | |
146 | ||
147 | Ok(JournalEntry { time, value, dst, rel_path }) | |
148 | } | |
149 | ||
658357c5 | 150 | /// Apply and commit the journal. Should be used at server startup. |
0ca41155 DM |
151 | pub fn apply_journal(&self) -> Result<bool, Error> { |
152 | let state = Arc::clone(&self.state); | |
153 | let rrd_map = Arc::clone(&self.rrd_map); | |
09340f28 | 154 | |
0ca41155 DM |
155 | let mut state_guard = self.state.write().unwrap(); |
156 | let journal_applied = state_guard.journal_applied; | |
1d44f175 | 157 | |
0ca41155 DM |
158 | if let Some(ref recv) = state_guard.apply_thread_result { |
159 | match recv.try_recv() { | |
160 | Ok(Ok(())) => { | |
161 | // finished without errors, OK | |
c17fbbbc | 162 | state_guard.apply_thread_result = None; |
0ca41155 DM |
163 | } |
164 | Ok(Err(err)) => { | |
165 | // finished with errors, log them | |
166 | log::error!("{}", err); | |
c17fbbbc | 167 | state_guard.apply_thread_result = None; |
0ca41155 DM |
168 | } |
169 | Err(TryRecvError::Empty) => { | |
170 | // still running | |
171 | return Ok(journal_applied); | |
172 | } | |
173 | Err(TryRecvError::Disconnected) => { | |
174 | // crashed, start again | |
175 | log::error!("apply journal thread crashed - try again"); | |
c17fbbbc | 176 | state_guard.apply_thread_result = None; |
658357c5 | 177 | } |
658357c5 | 178 | } |
658357c5 DM |
179 | } |
180 | ||
c17fbbbc DM |
181 | let now = proxmox_time::epoch_f64(); |
182 | let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval; | |
183 | ||
184 | if journal_applied && !wants_commit { return Ok(journal_applied); } | |
185 | ||
0ca41155 | 186 | state_guard.last_journal_flush = proxmox_time::epoch_f64(); |
1d44f175 | 187 | |
0ca41155 DM |
188 | let (sender, receiver) = bounded(1); |
189 | state_guard.apply_thread_result = Some(receiver); | |
1d44f175 | 190 | |
0ca41155 DM |
191 | spawn(move || { |
192 | let result = apply_and_commit_journal_thread(state, rrd_map, journal_applied) | |
193 | .map_err(|err| err.to_string()); | |
194 | sender.send(result).unwrap(); | |
195 | }); | |
658357c5 | 196 | |
0ca41155 | 197 | Ok(journal_applied) |
658357c5 DM |
198 | } |
199 | ||
09340f28 | 200 | |
400f0814 | 201 | /// Update data in RAM and write file back to disk (journal) |
09340f28 DM |
202 | pub fn update_value( |
203 | &self, | |
204 | rel_path: &str, | |
58f70bcc | 205 | time: f64, |
09340f28 DM |
206 | value: f64, |
207 | dst: DST, | |
09340f28 DM |
208 | ) -> Result<(), Error> { |
209 | ||
0ca41155 | 210 | let journal_applied = self.apply_journal()?; |
1d44f175 | 211 | |
f8430423 DM |
212 | self.state.write().unwrap() |
213 | .append_journal_entry(time, value, dst, rel_path)?; | |
1d44f175 | 214 | |
0ca41155 DM |
215 | if journal_applied { |
216 | self.rrd_map.write().unwrap().update(rel_path, time, value, dst, false)?; | |
217 | } | |
09340f28 DM |
218 | |
219 | Ok(()) | |
220 | } | |
221 | ||
222 | /// Extract data from cached RRD | |
eb37d4ec DM |
223 | /// |
224 | /// `start`: Start time. If not sepecified, we simply extract 10 data points. | |
a9017805 | 225 | /// |
eb37d4ec | 226 | /// `end`: End time. Default is to use the current time. |
09340f28 DM |
227 | pub fn extract_cached_data( |
228 | &self, | |
229 | base: &str, | |
230 | name: &str, | |
eb37d4ec DM |
231 | cf: CF, |
232 | resolution: u64, | |
233 | start: Option<u64>, | |
234 | end: Option<u64>, | |
1198f8d4 | 235 | ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> { |
fce7cd0d DM |
236 | self.rrd_map.read().unwrap() |
237 | .extract_cached_data(base, name, cf, resolution, start, end) | |
09340f28 | 238 | } |
09340f28 | 239 | } |
0ca41155 DM |
240 | |
241 | ||
242 | fn apply_and_commit_journal_thread( | |
243 | state: Arc<RwLock<JournalState>>, | |
244 | rrd_map: Arc<RwLock<RRDMap>>, | |
245 | commit_only: bool, | |
246 | ) -> Result<(), Error> { | |
247 | ||
248 | if commit_only { | |
249 | state.write().unwrap().rotate_journal()?; // start new journal, keep old one | |
250 | } else { | |
251 | let start_time = SystemTime::now(); | |
252 | log::debug!("applying rrd journal"); | |
253 | ||
254 | match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) { | |
255 | Ok(entries) => { | |
256 | let elapsed = start_time.elapsed().unwrap().as_secs_f64(); | |
257 | log::info!("applied rrd journal ({} entries in {:.3} seconds)", entries, elapsed); | |
258 | } | |
259 | Err(err) => bail!("apply rrd journal failed - {}", err), | |
260 | } | |
261 | } | |
262 | ||
263 | let start_time = SystemTime::now(); | |
264 | log::debug!("commit rrd journal"); | |
265 | ||
266 | match commit_journal_impl(state, rrd_map) { | |
267 | Ok(rrd_file_count) => { | |
268 | let elapsed = start_time.elapsed().unwrap().as_secs_f64(); | |
269 | log::info!("rrd journal successfully committed ({} files in {:.3} seconds)", | |
270 | rrd_file_count, elapsed); | |
271 | } | |
272 | Err(err) => bail!("rrd journal commit failed: {}", err), | |
273 | } | |
274 | Ok(()) | |
275 | } | |
276 | ||
277 | fn apply_journal_lines( | |
278 | state: Arc<RwLock<JournalState>>, | |
279 | rrd_map: Arc<RwLock<RRDMap>>, | |
280 | journal_name: &str, // used for logging | |
281 | reader: &mut BufReader<File>, | |
282 | lock_read_line: bool, | |
283 | ) -> Result<usize, Error> { | |
284 | ||
285 | let mut linenr = 0; | |
286 | ||
287 | loop { | |
288 | linenr += 1; | |
289 | let mut line = String::new(); | |
290 | let len = if lock_read_line { | |
291 | let _lock = state.read().unwrap(); // make sure we read entire lines | |
292 | reader.read_line(&mut line)? | |
293 | } else { | |
294 | reader.read_line(&mut line)? | |
295 | }; | |
296 | ||
297 | if len == 0 { break; } | |
298 | ||
299 | let entry = match RRDCache::parse_journal_line(&line) { | |
300 | Ok(entry) => entry, | |
301 | Err(err) => { | |
302 | log::warn!( | |
303 | "unable to parse rrd journal '{}' line {} (skip) - {}", | |
304 | journal_name, linenr, err, | |
305 | ); | |
306 | continue; // skip unparsable lines | |
307 | } | |
308 | }; | |
309 | ||
310 | rrd_map.write().unwrap().update(&entry.rel_path, entry.time, entry.value, entry.dst, true)?; | |
311 | } | |
312 | Ok(linenr) | |
313 | } | |
314 | ||
/// Replay all journals (old rotated ones first, then the current one)
/// into the RRD map, then rotate the current journal.
///
/// Returns the total number of journal lines read. The current journal
/// is read in two phases: first with per-line locking while writers may
/// still append, then once more under an exclusive lock to catch the
/// tail, so that rotation happens with no unapplied entries left.
fn apply_journal_impl(
    state: Arc<RwLock<JournalState>>,
    rrd_map: Arc<RwLock<RRDMap>>,
) -> Result<usize, Error> {

    let mut lines = 0;

    // Apply old journals first
    let journal_list = state.read().unwrap().list_old_journals()?;

    for entry in journal_list {
        log::info!("apply old journal log {}", entry.name);
        let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
        let mut reader = BufReader::new(file);
        // Old journals are closed files - no writers, so no per-line lock.
        lines += apply_journal_lines(
            Arc::clone(&state),
            Arc::clone(&rrd_map),
            &entry.name,
            &mut reader,
            false,
        )?;
    }

    let mut journal = state.read().unwrap().open_journal_reader()?;

    // Phase 1: replay the bulk of the current journal while updates may
    // still be appended concurrently (lock each read to get whole lines).
    lines += apply_journal_lines(
        Arc::clone(&state),
        Arc::clone(&rrd_map),
        "rrd.journal",
        &mut journal,
        true,
    )?;

    {
        let mut state_guard = state.write().unwrap(); // block other writers

        // Phase 2: with writers blocked, drain whatever was appended
        // since phase 1 (no per-line lock needed now).
        lines += apply_journal_lines(
            Arc::clone(&state),
            Arc::clone(&rrd_map),
            "rrd.journal",
            &mut journal,
            false,
        )?;

        state_guard.rotate_journal()?; // start new journal, keep old one

        // We need to apply the journal only once, because further updates
        // are always directly applied.
        state_guard.journal_applied = true;
    }


    Ok(lines)
}
369 | ||
370 | fn commit_journal_impl( | |
371 | state: Arc<RwLock<JournalState>>, | |
372 | rrd_map: Arc<RwLock<RRDMap>>, | |
373 | ) -> Result<usize, Error> { | |
374 | ||
375 | // save all RRDs - we only need a read lock here | |
376 | let rrd_file_count = rrd_map.read().unwrap().flush_rrd_files()?; | |
377 | ||
378 | // if everything went ok, remove the old journal files | |
379 | state.write().unwrap().remove_old_journals()?; | |
380 | ||
381 | Ok(rrd_file_count) | |
382 | } |