git.proxmox.com Git - proxmox.git/blame - proxmox-rrd/src/cache/journal.rs
rrd: fix a few typos
[proxmox.git] / proxmox-rrd / src / cache / journal.rs
CommitLineData
9dcc64b7 1use std::ffi::OsStr;
d5b9d1f4
TL
2use std::fs::File;
3use std::io::{BufReader, Write};
336e8f3e 4use std::os::unix::io::AsRawFd;
d5b9d1f4 5use std::path::PathBuf;
ed6a7f52 6use std::str::FromStr;
d5b9d1f4 7use std::sync::Arc;
9dcc64b7 8
ed6a7f52 9use anyhow::{bail, format_err, Error};
9dcc64b7 10use crossbeam_channel::Receiver;
d5b9d1f4 11use nix::fcntl::OFlag;
9dcc64b7 12
a092ef9c 13use proxmox_sys::fs::atomic_open_or_create_file;
9dcc64b7
DM
14
/// File name of the active journal; it is appended to the cache base directory.
const RRD_JOURNAL_NAME: &str = "rrd.journal";
16
9dcc64b7 17use crate::cache::CacheConfig;
d5b9d1f4 18use crate::rrd::DST;
9dcc64b7
DM
19
20// shared state behind RwLock
21pub struct JournalState {
22 config: Arc<CacheConfig>,
23 journal: File,
24 pub last_journal_flush: f64,
25 pub journal_applied: bool,
26 pub apply_thread_result: Option<Receiver<Result<(), String>>>,
27}
28
29pub struct JournalEntry {
30 pub time: f64,
31 pub value: f64,
32 pub dst: DST,
33 pub rel_path: String,
34}
35
ed6a7f52
DM
36impl FromStr for JournalEntry {
37 type Err = Error;
38
d5b9d1f4
TL
39 fn from_str(line: &str) -> Result<Self, Self::Err> {
40 let line = line.trim();
ed6a7f52
DM
41
42 let parts: Vec<&str> = line.splitn(4, ':').collect();
43 if parts.len() != 4 {
f9e8ebfd 44 bail!("wrong number of components");
ed6a7f52
DM
45 }
46
d5b9d1f4
TL
47 let time: f64 = parts[0]
48 .parse()
ed6a7f52 49 .map_err(|_| format_err!("unable to parse time"))?;
d5b9d1f4
TL
50 let value: f64 = parts[1]
51 .parse()
ed6a7f52 52 .map_err(|_| format_err!("unable to parse value"))?;
d5b9d1f4
TL
53 let dst: u8 = parts[2]
54 .parse()
ed6a7f52
DM
55 .map_err(|_| format_err!("unable to parse data source type"))?;
56
57 let dst = match dst {
58 0 => DST::Gauge,
59 1 => DST::Derive,
60 _ => bail!("got strange value for data source type '{}'", dst),
61 };
62
63 let rel_path = parts[3].to_string();
64
d5b9d1f4
TL
65 Ok(JournalEntry {
66 time,
67 value,
68 dst,
69 rel_path,
70 })
71 }
ed6a7f52
DM
72}
73
a74384f7
DM
/// Describes one rotated journal file (`rrd.journal-<hex>`) found on disk.
///
/// The type is plain data, so it derives the usual comparison, cloning and
/// debugging traits for callers that want to log or compare entries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JournalFileInfo {
    /// Value parsed from the hexadecimal suffix of the file extension.
    pub time: u64,
    /// File name in the form `rrd.<extension>`.
    pub name: String,
    /// Path of the file as returned by the directory listing.
    pub path: PathBuf,
}
79
9dcc64b7 80impl JournalState {
9dcc64b7
DM
81 pub(crate) fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
82 let journal = JournalState::open_journal_writer(&config)?;
83 Ok(Self {
84 config,
85 journal,
86 last_journal_flush: 0.0,
87 journal_applied: false,
88 apply_thread_result: None,
89 })
90 }
91
336e8f3e
DM
92 pub fn sync_journal(&self) -> Result<(), Error> {
93 nix::unistd::fdatasync(self.journal.as_raw_fd())?;
94 Ok(())
95 }
96
9dcc64b7
DM
97 pub fn append_journal_entry(
98 &mut self,
99 time: f64,
100 value: f64,
101 dst: DST,
102 rel_path: &str,
103 ) -> Result<(), Error> {
d5b9d1f4 104 let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
9dcc64b7
DM
105 self.journal.write_all(journal_entry.as_bytes())?;
106 Ok(())
107 }
108
109 pub fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
9dcc64b7
DM
110 // fixme : dup self.journal instead??
111 let mut journal_path = self.config.basedir.clone();
112 journal_path.push(RRD_JOURNAL_NAME);
113
d5b9d1f4 114 let flags = OFlag::O_CLOEXEC | OFlag::O_RDONLY;
9dcc64b7
DM
115 let journal = atomic_open_or_create_file(
116 &journal_path,
117 flags,
118 &[],
119 self.config.file_options.clone(),
75ca726c 120 false,
9dcc64b7
DM
121 )?;
122 Ok(BufReader::new(journal))
123 }
124
125 fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> {
126 let mut journal_path = config.basedir.clone();
127 journal_path.push(RRD_JOURNAL_NAME);
128
d5b9d1f4 129 let flags = OFlag::O_CLOEXEC | OFlag::O_WRONLY | OFlag::O_APPEND;
9dcc64b7
DM
130 let journal = atomic_open_or_create_file(
131 &journal_path,
132 flags,
133 &[],
134 config.file_options.clone(),
75ca726c 135 false,
9dcc64b7
DM
136 )?;
137 Ok(journal)
138 }
139
140 pub fn rotate_journal(&mut self) -> Result<(), Error> {
141 let mut journal_path = self.config.basedir.clone();
142 journal_path.push(RRD_JOURNAL_NAME);
143
144 let mut new_name = journal_path.clone();
145 let now = proxmox_time::epoch_i64();
146 new_name.set_extension(format!("journal-{:08x}", now));
41271202 147 std::fs::rename(journal_path, &new_name)?;
9dcc64b7
DM
148
149 self.journal = Self::open_journal_writer(&self.config)?;
41271202
DM
150
151 // make sure the old journal data landed on the disk
152 super::fsync_file_and_parent(&new_name)?;
153
9dcc64b7
DM
154 Ok(())
155 }
156
157 pub fn remove_old_journals(&self) -> Result<(), Error> {
9dcc64b7
DM
158 let journal_list = self.list_old_journals()?;
159
a74384f7
DM
160 for entry in journal_list {
161 std::fs::remove_file(entry.path)?;
9dcc64b7
DM
162 }
163
164 Ok(())
165 }
166
a74384f7 167 pub fn list_old_journals(&self) -> Result<Vec<JournalFileInfo>, Error> {
9dcc64b7
DM
168 let mut list = Vec::new();
169 for entry in std::fs::read_dir(&self.config.basedir)? {
170 let entry = entry?;
171 let path = entry.path();
e23f3ec7 172
d5b9d1f4
TL
173 if !path.is_file() {
174 continue;
175 }
e23f3ec7
DM
176
177 match path.file_stem() {
178 None => continue,
179 Some(stem) if stem != OsStr::new("rrd") => continue,
180 Some(_) => (),
181 }
182
183 if let Some(extension) = path.extension() {
184 if let Some(extension) = extension.to_str() {
185 if let Some(rest) = extension.strip_prefix("journal-") {
186 if let Ok(time) = u64::from_str_radix(rest, 16) {
187 list.push(JournalFileInfo {
188 time,
189 name: format!("rrd.{}", extension),
190 path: path.to_owned(),
191 });
9dcc64b7
DM
192 }
193 }
194 }
195 }
196 }
a74384f7 197 list.sort_unstable_by_key(|entry| entry.time);
9dcc64b7
DM
198 Ok(list)
199 }
200}