]> git.proxmox.com Git - proxmox-backup.git/blob - proxmox-rrd/src/cache/journal.rs
use new fsync parameter to replace_file and atomic_open_or_create
[proxmox-backup.git] / proxmox-rrd / src / cache / journal.rs
1 use std::fs::File;
2 use std::path::PathBuf;
3 use std::sync::Arc;
4 use std::io::{Write, BufReader};
5 use std::ffi::OsStr;
6 use std::os::unix::io::AsRawFd;
7 use std::str::FromStr;
8
9 use anyhow::{bail, format_err, Error};
10 use nix::fcntl::OFlag;
11 use crossbeam_channel::Receiver;
12
13 use proxmox::tools::fs::atomic_open_or_create_file;
14
15 const RRD_JOURNAL_NAME: &str = "rrd.journal";
16
17 use crate::rrd::DST;
18 use crate::cache::CacheConfig;
19
20 // shared state behind RwLock
21 pub struct JournalState {
22 config: Arc<CacheConfig>,
23 journal: File,
24 pub last_journal_flush: f64,
25 pub journal_applied: bool,
26 pub apply_thread_result: Option<Receiver<Result<(), String>>>,
27 }
28
29 pub struct JournalEntry {
30 pub time: f64,
31 pub value: f64,
32 pub dst: DST,
33 pub rel_path: String,
34 }
35
36 impl FromStr for JournalEntry {
37 type Err = Error;
38
39 fn from_str(line: &str) -> Result<Self, Self::Err> {
40
41 let line = line.trim();
42
43 let parts: Vec<&str> = line.splitn(4, ':').collect();
44 if parts.len() != 4 {
45 bail!("wrong numper of components");
46 }
47
48 let time: f64 = parts[0].parse()
49 .map_err(|_| format_err!("unable to parse time"))?;
50 let value: f64 = parts[1].parse()
51 .map_err(|_| format_err!("unable to parse value"))?;
52 let dst: u8 = parts[2].parse()
53 .map_err(|_| format_err!("unable to parse data source type"))?;
54
55 let dst = match dst {
56 0 => DST::Gauge,
57 1 => DST::Derive,
58 _ => bail!("got strange value for data source type '{}'", dst),
59 };
60
61 let rel_path = parts[3].to_string();
62
63 Ok(JournalEntry { time, value, dst, rel_path })
64 }
65 }
66
67 pub struct JournalFileInfo {
68 pub time: u64,
69 pub name: String,
70 pub path: PathBuf,
71 }
72
73 impl JournalState {
74
75 pub(crate) fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
76 let journal = JournalState::open_journal_writer(&config)?;
77 Ok(Self {
78 config,
79 journal,
80 last_journal_flush: 0.0,
81 journal_applied: false,
82 apply_thread_result: None,
83 })
84 }
85
86 pub fn sync_journal(&self) -> Result<(), Error> {
87 nix::unistd::fdatasync(self.journal.as_raw_fd())?;
88 Ok(())
89 }
90
91 pub fn append_journal_entry(
92 &mut self,
93 time: f64,
94 value: f64,
95 dst: DST,
96 rel_path: &str,
97 ) -> Result<(), Error> {
98 let journal_entry = format!(
99 "{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
100 self.journal.write_all(journal_entry.as_bytes())?;
101 Ok(())
102 }
103
104 pub fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
105
106 // fixme : dup self.journal instead??
107 let mut journal_path = self.config.basedir.clone();
108 journal_path.push(RRD_JOURNAL_NAME);
109
110 let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
111 let journal = atomic_open_or_create_file(
112 &journal_path,
113 flags,
114 &[],
115 self.config.file_options.clone(),
116 false,
117 )?;
118 Ok(BufReader::new(journal))
119 }
120
121 fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> {
122 let mut journal_path = config.basedir.clone();
123 journal_path.push(RRD_JOURNAL_NAME);
124
125 let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
126 let journal = atomic_open_or_create_file(
127 &journal_path,
128 flags,
129 &[],
130 config.file_options.clone(),
131 false,
132 )?;
133 Ok(journal)
134 }
135
136 pub fn rotate_journal(&mut self) -> Result<(), Error> {
137 let mut journal_path = self.config.basedir.clone();
138 journal_path.push(RRD_JOURNAL_NAME);
139
140 let mut new_name = journal_path.clone();
141 let now = proxmox_time::epoch_i64();
142 new_name.set_extension(format!("journal-{:08x}", now));
143 std::fs::rename(journal_path, &new_name)?;
144
145 self.journal = Self::open_journal_writer(&self.config)?;
146
147 // make sure the old journal data landed on the disk
148 super::fsync_file_and_parent(&new_name)?;
149
150 Ok(())
151 }
152
153 pub fn remove_old_journals(&self) -> Result<(), Error> {
154
155 let journal_list = self.list_old_journals()?;
156
157 for entry in journal_list {
158 std::fs::remove_file(entry.path)?;
159 }
160
161 Ok(())
162 }
163
164 pub fn list_old_journals(&self) -> Result<Vec<JournalFileInfo>, Error> {
165 let mut list = Vec::new();
166 for entry in std::fs::read_dir(&self.config.basedir)? {
167 let entry = entry?;
168 let path = entry.path();
169
170 if !path.is_file() { continue; }
171
172 match path.file_stem() {
173 None => continue,
174 Some(stem) if stem != OsStr::new("rrd") => continue,
175 Some(_) => (),
176 }
177
178 if let Some(extension) = path.extension() {
179 if let Some(extension) = extension.to_str() {
180 if let Some(rest) = extension.strip_prefix("journal-") {
181 if let Ok(time) = u64::from_str_radix(rest, 16) {
182 list.push(JournalFileInfo {
183 time,
184 name: format!("rrd.{}", extension),
185 path: path.to_owned(),
186 });
187 }
188 }
189 }
190 }
191 }
192 list.sort_unstable_by_key(|entry| entry.time);
193 Ok(list)
194 }
195 }