2 use std
::path
::PathBuf
;
4 use std
::io
::{Write, BufReader}
;
6 use std
::os
::unix
::io
::AsRawFd
;
9 use anyhow
::{bail, format_err, Error}
;
10 use nix
::fcntl
::OFlag
;
11 use crossbeam_channel
::Receiver
;
13 use proxmox
::tools
::fs
::atomic_open_or_create_file
;
// File name of the journal; the journal helpers in this file push it onto
// the configured `basedir` to build the journal path.
15 const RRD_JOURNAL_NAME
: &str = "rrd.journal";
18 use crate::cache
::CacheConfig
;
// Journal bookkeeping kept together with the cache configuration.
20 // shared state behind RwLock
21 pub struct JournalState
{
// Shared handle to the cache configuration; `basedir` and `file_options`
// are read from it when journal files are opened.
22 config
: Arc
<CacheConfig
>,
// Initialized to 0.0 by `new`.
// NOTE(review): presumably the time of the last journal flush - confirm the unit with callers.
24 pub last_journal_flush
: f64,
// Initialized to `false` by `new`.
25 pub journal_applied
: bool
,
// Optional channel receiver yielding a `Result<(), String>`; `None` after `new`.
// NOTE(review): presumably set while a background apply thread is running - confirm.
26 pub apply_thread_result
: Option
<Receiver
<Result
<(), String
>>>,
// One journal record. The `FromStr` implementation builds it from the
// fields `time`, `value`, `dst` and `rel_path`.
29 pub struct JournalEntry
{
// Parses one journal line of the form `time:value:dst:rel_path`.
//
// The line is trimmed and split on ':' into at most four parts, so the
// trailing `rel_path` part may itself contain ':' characters. `time` and
// `value` are parsed as `f64`, the data source type as `u8`. Every parse
// failure is mapped to a short, field-specific error message.
36 impl FromStr
for JournalEntry
{
39 fn from_str(line
: &str) -> Result
<Self, Self::Err
> {
// Ignore surrounding whitespace (including the trailing newline).
41 let line
= line
.trim();
// At most 4 parts: everything after the third ':' stays in the last part.
43 let parts
: Vec
<&str> = line
.splitn(4, '
:'
).collect();
// NOTE(review): the guard around this bail is not visible in this excerpt;
// presumably it checks the number of parts - confirm.
45 bail
!("wrong number of components");
48 let time
: f64 = parts
[0].parse()
49 .map_err(|_
| format_err
!("unable to parse time"))?
;
50 let value
: f64 = parts
[1].parse()
51 .map_err(|_
| format_err
!("unable to parse value"))?
;
// The data source type is stored as a small integer.
52 let dst
: u8 = parts
[2].parse()
53 .map_err(|_
| format_err
!("unable to parse data source type"))?
;
// Fallback arm: any value not handled by the (not visible) arms above is rejected.
58 _
=> bail
!("got strange value for data source type '{}'", dst
),
61 let rel_path
= parts
[3].to_string();
63 Ok(JournalEntry { time, value, dst, rel_path }
)
// Describes one journal file found by `list_old_journals`, which fills in
// `name` and `path` and sorts its result by `time`.
67 pub struct JournalFileInfo
{
// Creates the journal state for `config`.
//
// Opens the journal for writing via `open_journal_writer` and returns its
// error if that fails. The bookkeeping fields start out as
// `last_journal_flush = 0.0`, `journal_applied = false` and
// `apply_thread_result = None`.
75 pub(crate) fn new(config
: Arc
<CacheConfig
>) -> Result
<Self, Error
> {
76 let journal
= JournalState
::open_journal_writer(&config
)?
;
80 last_journal_flush
: 0.0,
81 journal_applied
: false,
82 apply_thread_result
: None
,
// Calls `fdatasync` on the journal's file descriptor and propagates any error.
86 pub fn sync_journal(&self) -> Result
<(), Error
> {
87 nix
::unistd
::fdatasync(self.journal
.as_raw_fd())?
;
// Appends one record to the journal.
//
// The record is written as a single text line `time:value:dst:rel_path`
// terminated by '\n', with `dst` converted to its `u8` value. A write error
// is returned to the caller. (The parameter list is not visible in this excerpt.)
91 pub fn append_journal_entry(
97 ) -> Result
<(), Error
> {
// Same field order that `JournalEntry::from_str` parses.
98 let journal_entry
= format
!(
99 "{}:{}:{}:{}\n", time
, value
, dst
as u8, rel_path
);
100 self.journal
.write_all(journal_entry
.as_bytes())?
;
// Opens the journal file (`basedir` + `RRD_JOURNAL_NAME`) read-only and
// wraps it in a `BufReader`.
//
// The file is opened through `atomic_open_or_create_file` with
// `O_CLOEXEC | O_RDONLY` and the configured `file_options`; the remaining
// arguments of that call are not visible in this excerpt.
104 pub fn open_journal_reader(&self) -> Result
<BufReader
<File
>, Error
> {
106 // FIXME: consider dup()ing self.journal instead of reopening the file by path?
107 let mut journal_path
= self.config
.basedir
.clone();
108 journal_path
.push(RRD_JOURNAL_NAME
);
110 let flags
= OFlag
::O_CLOEXEC
|OFlag
::O_RDONLY
;
111 let journal
= atomic_open_or_create_file(
115 self.config
.file_options
.clone(),
118 Ok(BufReader
::new(journal
))
// Opens the journal file (`basedir` + `RRD_JOURNAL_NAME`) for appending.
//
// Uses `atomic_open_or_create_file` with `O_CLOEXEC | O_WRONLY | O_APPEND`
// and the configured `file_options`; the remaining arguments of that call
// are not visible in this excerpt.
121 fn open_journal_writer(config
: &CacheConfig
) -> Result
<File
, Error
> {
122 let mut journal_path
= config
.basedir
.clone();
123 journal_path
.push(RRD_JOURNAL_NAME
);
// O_APPEND: every write goes to the current end of the file.
125 let flags
= OFlag
::O_CLOEXEC
|OFlag
::O_WRONLY
|OFlag
::O_APPEND
;
126 let journal
= atomic_open_or_create_file(
130 config
.file_options
.clone(),
// Rotates the journal.
//
// The current journal file is renamed so that its extension becomes
// `journal-<epoch in hex, at least 8 digits>`, a new writer is opened at the
// original path and stored in `self.journal`, and finally the renamed file
// and its parent directory are synced. Any failing step returns its error.
136 pub fn rotate_journal(&mut self) -> Result
<(), Error
> {
137 let mut journal_path
= self.config
.basedir
.clone();
138 journal_path
.push(RRD_JOURNAL_NAME
);
// Build the rotated name: the "journal" extension is replaced by "journal-<hex time>".
140 let mut new_name
= journal_path
.clone();
141 let now
= proxmox_time
::epoch_i64();
142 new_name
.set_extension(format
!("journal-{:08x}", now
));
143 std
::fs
::rename(journal_path
, &new_name
)?
;
// Reopen at the original path so later appends go to a fresh journal file.
145 self.journal
= Self::open_journal_writer(&self.config
)?
;
147 // make sure the old journal data landed on the disk
148 super::fsync_file_and_parent(&new_name
)?
;
// Deletes every file reported by `list_old_journals`.
//
// Stops at, and returns, the first error from listing or from removing a file.
153 pub fn remove_old_journals(&self) -> Result
<(), Error
> {
155 let journal_list
= self.list_old_journals()?
;
157 for entry
in journal_list
{
158 std
::fs
::remove_file(entry
.path
)?
;
// Collects the rotated journal files found in `basedir`.
//
// Only regular files are considered. An entry is pushed when the file
// extension is valid UTF-8, starts with "journal-" and the remainder parses
// as a hexadecimal `u64`. The resulting list is sorted by `time`.
164 pub fn list_old_journals(&self) -> Result
<Vec
<JournalFileInfo
>, Error
> {
165 let mut list
= Vec
::new();
166 for entry
in std
::fs
::read_dir(&self.config
.basedir
)?
{
168 let path
= entry
.path();
// Skip directories and anything else that is not a regular file.
170 if !path
.is_file() { continue; }
172 match path
.file_stem() {
// Files whose stem is not "rrd" are skipped (other arms are not visible in this excerpt).
174 Some(stem
) if stem
!= OsStr
::new("rrd") => continue,
178 if let Some(extension
) = path
.extension() {
179 if let Some(extension
) = extension
.to_str() {
180 if let Some(rest
) = extension
.strip_prefix("journal-") {
// The suffix after "journal-" is read as a hexadecimal number.
181 if let Ok(time
) = u64::from_str_radix(rest
, 16) {
182 list
.push(JournalFileInfo
{
184 name
: format
!("rrd.{}", extension
),
185 path
: path
.to_owned(),
// Order the entries by their parsed time value (ascending).
192 list
.sort_unstable_by_key(|entry
| entry
.time
);