]>
Commit | Line | Data |
---|---|---|
9dcc64b7 | 1 | use std::ffi::OsStr; |
d5b9d1f4 TL |
2 | use std::fs::File; |
3 | use std::io::{BufReader, Write}; | |
336e8f3e | 4 | use std::os::unix::io::AsRawFd; |
d5b9d1f4 | 5 | use std::path::PathBuf; |
ed6a7f52 | 6 | use std::str::FromStr; |
d5b9d1f4 | 7 | use std::sync::Arc; |
9dcc64b7 | 8 | |
ed6a7f52 | 9 | use anyhow::{bail, format_err, Error}; |
9dcc64b7 | 10 | use crossbeam_channel::Receiver; |
d5b9d1f4 | 11 | use nix::fcntl::OFlag; |
9dcc64b7 | 12 | |
a092ef9c | 13 | use proxmox_sys::fs::atomic_open_or_create_file; |
9dcc64b7 DM |
14 | |
15 | const RRD_JOURNAL_NAME: &str = "rrd.journal"; | |
16 | ||
9dcc64b7 | 17 | use crate::cache::CacheConfig; |
d5b9d1f4 | 18 | use crate::rrd::DST; |
9dcc64b7 DM |
19 | |
20 | // shared state behind RwLock | |
21 | pub struct JournalState { | |
22 | config: Arc<CacheConfig>, | |
23 | journal: File, | |
24 | pub last_journal_flush: f64, | |
25 | pub journal_applied: bool, | |
26 | pub apply_thread_result: Option<Receiver<Result<(), String>>>, | |
27 | } | |
28 | ||
29 | pub struct JournalEntry { | |
30 | pub time: f64, | |
31 | pub value: f64, | |
32 | pub dst: DST, | |
33 | pub rel_path: String, | |
34 | } | |
35 | ||
ed6a7f52 DM |
36 | impl FromStr for JournalEntry { |
37 | type Err = Error; | |
38 | ||
d5b9d1f4 TL |
39 | fn from_str(line: &str) -> Result<Self, Self::Err> { |
40 | let line = line.trim(); | |
ed6a7f52 DM |
41 | |
42 | let parts: Vec<&str> = line.splitn(4, ':').collect(); | |
43 | if parts.len() != 4 { | |
f9e8ebfd | 44 | bail!("wrong number of components"); |
ed6a7f52 DM |
45 | } |
46 | ||
d5b9d1f4 TL |
47 | let time: f64 = parts[0] |
48 | .parse() | |
ed6a7f52 | 49 | .map_err(|_| format_err!("unable to parse time"))?; |
d5b9d1f4 TL |
50 | let value: f64 = parts[1] |
51 | .parse() | |
ed6a7f52 | 52 | .map_err(|_| format_err!("unable to parse value"))?; |
d5b9d1f4 TL |
53 | let dst: u8 = parts[2] |
54 | .parse() | |
ed6a7f52 DM |
55 | .map_err(|_| format_err!("unable to parse data source type"))?; |
56 | ||
57 | let dst = match dst { | |
58 | 0 => DST::Gauge, | |
59 | 1 => DST::Derive, | |
60 | _ => bail!("got strange value for data source type '{}'", dst), | |
61 | }; | |
62 | ||
63 | let rel_path = parts[3].to_string(); | |
64 | ||
d5b9d1f4 TL |
65 | Ok(JournalEntry { |
66 | time, | |
67 | value, | |
68 | dst, | |
69 | rel_path, | |
70 | }) | |
71 | } | |
ed6a7f52 DM |
72 | } |
73 | ||
a74384f7 DM |
74 | pub struct JournalFileInfo { |
75 | pub time: u64, | |
76 | pub name: String, | |
77 | pub path: PathBuf, | |
78 | } | |
79 | ||
9dcc64b7 | 80 | impl JournalState { |
9dcc64b7 DM |
81 | pub(crate) fn new(config: Arc<CacheConfig>) -> Result<Self, Error> { |
82 | let journal = JournalState::open_journal_writer(&config)?; | |
83 | Ok(Self { | |
84 | config, | |
85 | journal, | |
86 | last_journal_flush: 0.0, | |
87 | journal_applied: false, | |
88 | apply_thread_result: None, | |
89 | }) | |
90 | } | |
91 | ||
336e8f3e DM |
92 | pub fn sync_journal(&self) -> Result<(), Error> { |
93 | nix::unistd::fdatasync(self.journal.as_raw_fd())?; | |
94 | Ok(()) | |
95 | } | |
96 | ||
9dcc64b7 DM |
97 | pub fn append_journal_entry( |
98 | &mut self, | |
99 | time: f64, | |
100 | value: f64, | |
101 | dst: DST, | |
102 | rel_path: &str, | |
103 | ) -> Result<(), Error> { | |
d5b9d1f4 | 104 | let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path); |
9dcc64b7 DM |
105 | self.journal.write_all(journal_entry.as_bytes())?; |
106 | Ok(()) | |
107 | } | |
108 | ||
109 | pub fn open_journal_reader(&self) -> Result<BufReader<File>, Error> { | |
9dcc64b7 DM |
110 | // fixme : dup self.journal instead?? |
111 | let mut journal_path = self.config.basedir.clone(); | |
112 | journal_path.push(RRD_JOURNAL_NAME); | |
113 | ||
d5b9d1f4 | 114 | let flags = OFlag::O_CLOEXEC | OFlag::O_RDONLY; |
9dcc64b7 DM |
115 | let journal = atomic_open_or_create_file( |
116 | &journal_path, | |
117 | flags, | |
118 | &[], | |
119 | self.config.file_options.clone(), | |
75ca726c | 120 | false, |
9dcc64b7 DM |
121 | )?; |
122 | Ok(BufReader::new(journal)) | |
123 | } | |
124 | ||
125 | fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> { | |
126 | let mut journal_path = config.basedir.clone(); | |
127 | journal_path.push(RRD_JOURNAL_NAME); | |
128 | ||
d5b9d1f4 | 129 | let flags = OFlag::O_CLOEXEC | OFlag::O_WRONLY | OFlag::O_APPEND; |
9dcc64b7 DM |
130 | let journal = atomic_open_or_create_file( |
131 | &journal_path, | |
132 | flags, | |
133 | &[], | |
134 | config.file_options.clone(), | |
75ca726c | 135 | false, |
9dcc64b7 DM |
136 | )?; |
137 | Ok(journal) | |
138 | } | |
139 | ||
140 | pub fn rotate_journal(&mut self) -> Result<(), Error> { | |
141 | let mut journal_path = self.config.basedir.clone(); | |
142 | journal_path.push(RRD_JOURNAL_NAME); | |
143 | ||
144 | let mut new_name = journal_path.clone(); | |
145 | let now = proxmox_time::epoch_i64(); | |
146 | new_name.set_extension(format!("journal-{:08x}", now)); | |
41271202 | 147 | std::fs::rename(journal_path, &new_name)?; |
9dcc64b7 DM |
148 | |
149 | self.journal = Self::open_journal_writer(&self.config)?; | |
41271202 DM |
150 | |
151 | // make sure the old journal data landed on the disk | |
152 | super::fsync_file_and_parent(&new_name)?; | |
153 | ||
9dcc64b7 DM |
154 | Ok(()) |
155 | } | |
156 | ||
157 | pub fn remove_old_journals(&self) -> Result<(), Error> { | |
9dcc64b7 DM |
158 | let journal_list = self.list_old_journals()?; |
159 | ||
a74384f7 DM |
160 | for entry in journal_list { |
161 | std::fs::remove_file(entry.path)?; | |
9dcc64b7 DM |
162 | } |
163 | ||
164 | Ok(()) | |
165 | } | |
166 | ||
a74384f7 | 167 | pub fn list_old_journals(&self) -> Result<Vec<JournalFileInfo>, Error> { |
9dcc64b7 DM |
168 | let mut list = Vec::new(); |
169 | for entry in std::fs::read_dir(&self.config.basedir)? { | |
170 | let entry = entry?; | |
171 | let path = entry.path(); | |
e23f3ec7 | 172 | |
d5b9d1f4 TL |
173 | if !path.is_file() { |
174 | continue; | |
175 | } | |
e23f3ec7 DM |
176 | |
177 | match path.file_stem() { | |
178 | None => continue, | |
179 | Some(stem) if stem != OsStr::new("rrd") => continue, | |
180 | Some(_) => (), | |
181 | } | |
182 | ||
183 | if let Some(extension) = path.extension() { | |
184 | if let Some(extension) = extension.to_str() { | |
185 | if let Some(rest) = extension.strip_prefix("journal-") { | |
186 | if let Ok(time) = u64::from_str_radix(rest, 16) { | |
187 | list.push(JournalFileInfo { | |
188 | time, | |
189 | name: format!("rrd.{}", extension), | |
190 | path: path.to_owned(), | |
191 | }); | |
9dcc64b7 DM |
192 | } |
193 | } | |
194 | } | |
195 | } | |
196 | } | |
a74384f7 | 197 | list.sort_unstable_by_key(|entry| entry.time); |
9dcc64b7 DM |
198 | Ok(list) |
199 | } | |
200 | } |