]>
Commit | Line | Data |
---|---|---|
d5b9d1f4 | 1 | use std::collections::BTreeSet; |
03555549 | 2 | use std::fs::File; |
d5b9d1f4 TL |
3 | use std::io::{BufRead, BufReader}; |
4 | use std::os::unix::io::AsRawFd; | |
8d1a9d2e | 5 | use std::path::{Path, PathBuf}; |
30b4800f | 6 | use std::sync::{Arc, RwLock}; |
30b4800f | 7 | use std::thread::spawn; |
d5b9d1f4 | 8 | use std::time::SystemTime; |
41271202 | 9 | |
d5b9d1f4 | 10 | use anyhow::{bail, format_err, Error}; |
9dcc64b7 | 11 | use crossbeam_channel::{bounded, TryRecvError}; |
8d1a9d2e | 12 | |
a092ef9c | 13 | use proxmox_sys::fs::{create_path, CreateOptions}; |
8d1a9d2e | 14 | |
2f942833 | 15 | use crate::rrd::{AggregationFn, Archive, DataSourceType, Database}; |
56b5c289 | 16 | use crate::Entry; |
8d1a9d2e | 17 | |
9dcc64b7 DM |
18 | mod journal; |
19 | use journal::*; | |
03555549 | 20 | |
4393b93a DM |
21 | mod rrd_map; |
22 | use rrd_map::*; | |
23 | ||
/// RRD cache - keep RRD data in RAM, but write updates to disk
///
/// This cache is designed to run as single instance (no concurrent
/// access from other processes).
pub struct Cache {
    // Shared, immutable runtime configuration (base directory, create
    // options, journal apply interval).
    config: Arc<CacheConfig>,
    // Journal bookkeeping: last flush time, `journal_applied` flag and the
    // result channel of the background apply/commit thread (see
    // `apply_journal`). Guarded by an RwLock so readers can sync the
    // journal while a writer rotates it.
    state: Arc<RwLock<JournalState>>,
    // In-RAM map from relative file path to the loaded RRD database
    // (definition in the `rrd_map` module).
    rrd_map: Arc<RwLock<RRDMap>>,
}
33 | ||
/// Static configuration shared between the [`Cache`], the journal state
/// and the background apply/commit thread.
pub(crate) struct CacheConfig {
    // Commit the journal after this many seconds (checked in
    // `Cache::apply_journal`).
    apply_interval: f64,
    // All RRD files and journals are stored relative to this directory.
    basedir: PathBuf,
    // Options used when creating files (consumed by the journal/rrd_map
    // modules -- not used directly in this file).
    file_options: CreateOptions,
    // Options used when creating directories (also passed to `create_path`
    // in `Cache::new`).
    dir_options: CreateOptions,
}
40 | ||
2f942833 | 41 | impl Cache { |
8d1a9d2e | 42 | /// Creates a new instance |
8619b21e DM |
43 | /// |
44 | /// `basedir`: All files are stored relative to this path. | |
45 | /// | |
46 | /// `file_options`: Files are created with this options. | |
47 | /// | |
48 | /// `dir_options`: Directories are created with this options. | |
49 | /// | |
50 | /// `apply_interval`: Commit journal after `apply_interval` seconds. | |
51 | /// | |
26bd6a4f | 52 | /// `load_rrd_cb`; The callback function is used to load RRD files, |
8619b21e DM |
53 | /// and should return a newly generated RRD if the file does not |
54 | /// exists (or is unreadable). This may generate RRDs with | |
55 | /// different configurations (dependent on `rel_path`). | |
8d1a9d2e DM |
56 | pub fn new<P: AsRef<Path>>( |
57 | basedir: P, | |
58 | file_options: Option<CreateOptions>, | |
59 | dir_options: Option<CreateOptions>, | |
03555549 | 60 | apply_interval: f64, |
2f942833 | 61 | load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DataSourceType) -> Database, |
03555549 | 62 | ) -> Result<Self, Error> { |
8d1a9d2e | 63 | let basedir = basedir.as_ref().to_owned(); |
03555549 | 64 | |
d80d195c FG |
65 | let file_options = file_options.unwrap_or_else(CreateOptions::new); |
66 | let dir_options = dir_options.unwrap_or_else(CreateOptions::new); | |
03555549 | 67 | |
d5b9d1f4 TL |
68 | create_path( |
69 | &basedir, | |
70 | Some(dir_options.clone()), | |
71 | Some(dir_options.clone()), | |
72 | ) | |
73 | .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?; | |
03555549 | 74 | |
30b4800f | 75 | let config = Arc::new(CacheConfig { |
c8e73a22 FG |
76 | basedir, |
77 | file_options, | |
78 | dir_options, | |
30b4800f DM |
79 | apply_interval, |
80 | }); | |
81 | ||
82 | let state = JournalState::new(Arc::clone(&config))?; | |
83 | let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb); | |
2be07c22 | 84 | |
03555549 | 85 | Ok(Self { |
30b4800f DM |
86 | config: Arc::clone(&config), |
87 | state: Arc::new(RwLock::new(state)), | |
88 | rrd_map: Arc::new(RwLock::new(rrd_map)), | |
89 | }) | |
03555549 DM |
90 | } |
91 | ||
8619b21e DM |
92 | /// Create a new RRD as used by the proxmox backup server |
93 | /// | |
94 | /// It contains the following RRAs: | |
95 | /// | |
96 | /// * cf=average,r=60,n=1440 => 1day | |
97 | /// * cf=maximum,r=60,n=1440 => 1day | |
98 | /// * cf=average,r=30*60,n=1440 => 1month | |
99 | /// * cf=maximum,r=30*60,n=1440 => 1month | |
100 | /// * cf=average,r=6*3600,n=1440 => 1year | |
101 | /// * cf=maximum,r=6*3600,n=1440 => 1year | |
102 | /// * cf=average,r=7*86400,n=570 => 10years | |
103 | /// * cf=maximum,r=7*86400,n=570 => 10year | |
104 | /// | |
109902fb | 105 | /// The resulting data file size is about 80KB. |
2f942833 | 106 | pub fn create_proxmox_backup_default_rrd(dst: DataSourceType) -> Database { |
6149c171 TL |
107 | let rra_list = vec![ |
108 | // 1 min * 1440 => 1 day | |
2f942833 LW |
109 | Archive::new(AggregationFn::Average, 60, 1440), |
110 | Archive::new(AggregationFn::Maximum, 60, 1440), | |
6149c171 | 111 | // 30 min * 1440 => 30 days ~ 1 month |
2f942833 LW |
112 | Archive::new(AggregationFn::Average, 30 * 60, 1440), |
113 | Archive::new(AggregationFn::Maximum, 30 * 60, 1440), | |
6149c171 | 114 | // 6 h * 1440 => 360 days ~ 1 year |
2f942833 LW |
115 | Archive::new(AggregationFn::Average, 6 * 3600, 1440), |
116 | Archive::new(AggregationFn::Maximum, 6 * 3600, 1440), | |
6149c171 | 117 | // 1 week * 570 => 10 years |
2f942833 LW |
118 | Archive::new(AggregationFn::Average, 7 * 86400, 570), |
119 | Archive::new(AggregationFn::Maximum, 7 * 86400, 570), | |
6149c171 | 120 | ]; |
bc68dee1 | 121 | |
2f942833 | 122 | Database::new(dst, rra_list) |
bc68dee1 DM |
123 | } |
124 | ||
86b50e18 | 125 | /// Sync the journal data to disk (using `fdatasync` syscall) |
336e8f3e DM |
126 | pub fn sync_journal(&self) -> Result<(), Error> { |
127 | self.state.read().unwrap().sync_journal() | |
128 | } | |
129 | ||
3275f1ac | 130 | /// Apply and commit the journal. Should be used at server startup. |
30b4800f | 131 | pub fn apply_journal(&self) -> Result<bool, Error> { |
41271202 | 132 | let config = Arc::clone(&self.config); |
30b4800f DM |
133 | let state = Arc::clone(&self.state); |
134 | let rrd_map = Arc::clone(&self.rrd_map); | |
8d1a9d2e | 135 | |
30b4800f DM |
136 | let mut state_guard = self.state.write().unwrap(); |
137 | let journal_applied = state_guard.journal_applied; | |
03555549 | 138 | |
30b4800f DM |
139 | if let Some(ref recv) = state_guard.apply_thread_result { |
140 | match recv.try_recv() { | |
141 | Ok(Ok(())) => { | |
142 | // finished without errors, OK | |
cc0bb597 | 143 | state_guard.apply_thread_result = None; |
30b4800f DM |
144 | } |
145 | Ok(Err(err)) => { | |
146 | // finished with errors, log them | |
147 | log::error!("{}", err); | |
cc0bb597 | 148 | state_guard.apply_thread_result = None; |
30b4800f DM |
149 | } |
150 | Err(TryRecvError::Empty) => { | |
151 | // still running | |
152 | return Ok(journal_applied); | |
153 | } | |
154 | Err(TryRecvError::Disconnected) => { | |
155 | // crashed, start again | |
156 | log::error!("apply journal thread crashed - try again"); | |
cc0bb597 | 157 | state_guard.apply_thread_result = None; |
3275f1ac | 158 | } |
3275f1ac | 159 | } |
3275f1ac DM |
160 | } |
161 | ||
cc0bb597 DM |
162 | let now = proxmox_time::epoch_f64(); |
163 | let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval; | |
164 | ||
d5b9d1f4 TL |
165 | if journal_applied && !wants_commit { |
166 | return Ok(journal_applied); | |
167 | } | |
cc0bb597 | 168 | |
30b4800f | 169 | state_guard.last_journal_flush = proxmox_time::epoch_f64(); |
03555549 | 170 | |
30b4800f DM |
171 | let (sender, receiver) = bounded(1); |
172 | state_guard.apply_thread_result = Some(receiver); | |
03555549 | 173 | |
30b4800f | 174 | spawn(move || { |
41271202 | 175 | let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied) |
30b4800f DM |
176 | .map_err(|err| err.to_string()); |
177 | sender.send(result).unwrap(); | |
178 | }); | |
3275f1ac | 179 | |
30b4800f | 180 | Ok(journal_applied) |
3275f1ac DM |
181 | } |
182 | ||
2c72c6a7 | 183 | /// Update data in RAM and write file back to disk (journal) |
8d1a9d2e DM |
184 | pub fn update_value( |
185 | &self, | |
186 | rel_path: &str, | |
a1eede69 | 187 | time: f64, |
8d1a9d2e | 188 | value: f64, |
2f942833 | 189 | dst: DataSourceType, |
8d1a9d2e | 190 | ) -> Result<(), Error> { |
30b4800f | 191 | let journal_applied = self.apply_journal()?; |
03555549 | 192 | |
d5b9d1f4 TL |
193 | self.state |
194 | .write() | |
195 | .unwrap() | |
9dcc64b7 | 196 | .append_journal_entry(time, value, dst, rel_path)?; |
03555549 | 197 | |
30b4800f | 198 | if journal_applied { |
d5b9d1f4 TL |
199 | self.rrd_map |
200 | .write() | |
201 | .unwrap() | |
202 | .update(rel_path, time, value, dst, false)?; | |
30b4800f | 203 | } |
8d1a9d2e DM |
204 | |
205 | Ok(()) | |
206 | } | |
207 | ||
208 | /// Extract data from cached RRD | |
cf097c5a | 209 | /// |
109902fb | 210 | /// `start`: Start time. If not specified, we simply extract 10 data points. |
26bd6a4f | 211 | /// |
cf097c5a | 212 | /// `end`: End time. Default is to use the current time. |
8d1a9d2e DM |
213 | pub fn extract_cached_data( |
214 | &self, | |
215 | base: &str, | |
216 | name: &str, | |
2f942833 | 217 | cf: AggregationFn, |
cf097c5a DM |
218 | resolution: u64, |
219 | start: Option<u64>, | |
220 | end: Option<u64>, | |
56b5c289 | 221 | ) -> Result<Option<Entry>, Error> { |
d5b9d1f4 TL |
222 | self.rrd_map |
223 | .read() | |
224 | .unwrap() | |
2be07c22 | 225 | .extract_cached_data(base, name, cf, resolution, start, end) |
8d1a9d2e | 226 | } |
8d1a9d2e | 227 | } |
30b4800f | 228 | |
30b4800f | 229 | fn apply_and_commit_journal_thread( |
41271202 | 230 | config: Arc<CacheConfig>, |
30b4800f DM |
231 | state: Arc<RwLock<JournalState>>, |
232 | rrd_map: Arc<RwLock<RRDMap>>, | |
233 | commit_only: bool, | |
234 | ) -> Result<(), Error> { | |
30b4800f DM |
235 | if commit_only { |
236 | state.write().unwrap().rotate_journal()?; // start new journal, keep old one | |
237 | } else { | |
238 | let start_time = SystemTime::now(); | |
239 | log::debug!("applying rrd journal"); | |
240 | ||
241 | match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) { | |
242 | Ok(entries) => { | |
243 | let elapsed = start_time.elapsed().unwrap().as_secs_f64(); | |
d5b9d1f4 TL |
244 | log::info!( |
245 | "applied rrd journal ({} entries in {:.3} seconds)", | |
246 | entries, | |
247 | elapsed | |
248 | ); | |
30b4800f DM |
249 | } |
250 | Err(err) => bail!("apply rrd journal failed - {}", err), | |
251 | } | |
252 | } | |
253 | ||
254 | let start_time = SystemTime::now(); | |
255 | log::debug!("commit rrd journal"); | |
256 | ||
41271202 | 257 | match commit_journal_impl(config, state, rrd_map) { |
30b4800f DM |
258 | Ok(rrd_file_count) => { |
259 | let elapsed = start_time.elapsed().unwrap().as_secs_f64(); | |
d5b9d1f4 TL |
260 | log::info!( |
261 | "rrd journal successfully committed ({} files in {:.3} seconds)", | |
262 | rrd_file_count, | |
263 | elapsed | |
264 | ); | |
30b4800f DM |
265 | } |
266 | Err(err) => bail!("rrd journal commit failed: {}", err), | |
267 | } | |
268 | Ok(()) | |
269 | } | |
270 | ||
271 | fn apply_journal_lines( | |
272 | state: Arc<RwLock<JournalState>>, | |
273 | rrd_map: Arc<RwLock<RRDMap>>, | |
274 | journal_name: &str, // used for logging | |
275 | reader: &mut BufReader<File>, | |
276 | lock_read_line: bool, | |
277 | ) -> Result<usize, Error> { | |
30b4800f DM |
278 | let mut linenr = 0; |
279 | ||
280 | loop { | |
281 | linenr += 1; | |
282 | let mut line = String::new(); | |
283 | let len = if lock_read_line { | |
284 | let _lock = state.read().unwrap(); // make sure we read entire lines | |
285 | reader.read_line(&mut line)? | |
286 | } else { | |
287 | reader.read_line(&mut line)? | |
288 | }; | |
289 | ||
d5b9d1f4 TL |
290 | if len == 0 { |
291 | break; | |
292 | } | |
30b4800f | 293 | |
ed6a7f52 | 294 | let entry: JournalEntry = match line.parse() { |
30b4800f DM |
295 | Ok(entry) => entry, |
296 | Err(err) => { | |
297 | log::warn!( | |
298 | "unable to parse rrd journal '{}' line {} (skip) - {}", | |
d5b9d1f4 TL |
299 | journal_name, |
300 | linenr, | |
301 | err, | |
30b4800f | 302 | ); |
f9e8ebfd | 303 | continue; // skip unparseable lines |
30b4800f DM |
304 | } |
305 | }; | |
306 | ||
d5b9d1f4 TL |
307 | rrd_map.write().unwrap().update( |
308 | &entry.rel_path, | |
309 | entry.time, | |
310 | entry.value, | |
311 | entry.dst, | |
312 | true, | |
313 | )?; | |
30b4800f DM |
314 | } |
315 | Ok(linenr) | |
316 | } | |
317 | ||
/// Replay all pending journal data into the in-RAM RRD map.
///
/// Returns the total number of journal lines processed (for logging).
///
/// Locking strategy: old journals are read without extra locking (nobody
/// appends to them anymore). The current journal is first read with a
/// per-line read lock so concurrent writers may still append; then the
/// write lock is taken to drain any lines appended meanwhile, rotate the
/// journal and mark it applied - all atomically with respect to writers.
fn apply_journal_impl(
    state: Arc<RwLock<JournalState>>,
    rrd_map: Arc<RwLock<RRDMap>>,
) -> Result<usize, Error> {
    let mut lines = 0;

    // Apply old journals first
    let journal_list = state.read().unwrap().list_old_journals()?;

    for entry in journal_list {
        log::info!("apply old journal log {}", entry.name);
        let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
        let mut reader = BufReader::new(file);
        lines += apply_journal_lines(
            Arc::clone(&state),
            Arc::clone(&rrd_map),
            &entry.name,
            &mut reader,
            false,
        )?;
    }

    let mut journal = state.read().unwrap().open_journal_reader()?;

    // First pass over the live journal: take the read lock per line so
    // concurrent appenders are not blocked for the whole replay.
    lines += apply_journal_lines(
        Arc::clone(&state),
        Arc::clone(&rrd_map),
        "rrd.journal",
        &mut journal,
        true,
    )?;

    {
        let mut state_guard = state.write().unwrap(); // block other writers

        // Second pass: drain lines appended during the first pass.
        // `lock_read_line` must be false here - we already hold the write
        // lock, taking the read lock again would deadlock.
        lines += apply_journal_lines(
            Arc::clone(&state),
            Arc::clone(&rrd_map),
            "rrd.journal",
            &mut journal,
            false,
        )?;

        state_guard.rotate_journal()?; // start new journal, keep old one

        // We need to apply the journal only once, because further updates
        // are always directly applied.
        state_guard.journal_applied = true;
    }

    Ok(lines)
}
370 | ||
41271202 DM |
371 | fn fsync_file_or_dir(path: &Path) -> Result<(), Error> { |
372 | let file = std::fs::File::open(path)?; | |
373 | nix::unistd::fsync(file.as_raw_fd())?; | |
374 | Ok(()) | |
375 | } | |
376 | ||
d5b9d1f4 | 377 | pub(crate) fn fsync_file_and_parent(path: &Path) -> Result<(), Error> { |
41271202 DM |
378 | let file = std::fs::File::open(path)?; |
379 | nix::unistd::fsync(file.as_raw_fd())?; | |
380 | if let Some(parent) = path.parent() { | |
381 | fsync_file_or_dir(parent)?; | |
382 | } | |
383 | Ok(()) | |
384 | } | |
385 | ||
/// Compute the directory below `basedir` that holds the RRD file `rel_path`.
fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
    let mut dir = basedir.to_path_buf();
    if let Some(parent) = Path::new(rel_path).parent() {
        dir.push(parent);
    }
    dir
}
394 | ||
/// Write all cached RRDs back to disk, fsync files and their directories,
/// then remove the now-redundant old journal files.
///
/// Returns the number of RRD files written.
fn commit_journal_impl(
    config: Arc<CacheConfig>,
    state: Arc<RwLock<JournalState>>,
    rrd_map: Arc<RwLock<RRDMap>>,
) -> Result<usize, Error> {
    let files = rrd_map.read().unwrap().file_list();

    let mut rrd_file_count = 0;
    let mut errors = 0;

    // Collect parent directories so we can fsync each of them once below.
    let mut dir_set = BTreeSet::new();

    log::info!("write rrd data back to disk");

    // save all RRDs - we only need a read lock here
    // Note: no fsync here (we do it afterwards)
    for rel_path in files.iter() {
        let parent_dir = rrd_parent_dir(&config.basedir, rel_path);
        dir_set.insert(parent_dir);
        rrd_file_count += 1;
        // Read lock is re-acquired per file, so concurrent updates are
        // not blocked for the whole flush.
        if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(rel_path) {
            errors += 1;
            log::error!("unable to save rrd {}: {}", rel_path, err);
        }
    }

    // Keep the journals if anything failed - they still hold the data.
    if errors != 0 {
        bail!("errors during rrd flush - unable to commit rrd journal");
    }

    // Important: We fsync files after writing all data! This increase
    // the likelihood that files are already synced, so this is
    // much faster (although we need to re-open the files).

    log::info!("starting rrd data sync");

    for rel_path in files.iter() {
        let mut path = config.basedir.clone();
        path.push(rel_path);
        fsync_file_or_dir(&path)
            .map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
    }

    // also fsync directories
    for dir_path in dir_set {
        fsync_file_or_dir(&dir_path)
            .map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
    }

    // if everything went ok, remove the old journal files
    state.write().unwrap().remove_old_journals()?;

    Ok(rrd_file_count)
}