]> git.proxmox.com Git - proxmox-backup.git/blob - src/config/jobstate.rs
45672cea4a00616686b5ece57014f566928cc727
[proxmox-backup.git] / src / config / jobstate.rs
1 //! Generic JobState handling
2 //!
3 //! A 'Job' can have 3 states
4 //! - Created, when a schedule was created but never executed
5 //! - Started, when a job is running right now
6 //! - Finished, when a job was running in the past
7 //!
8 //! and is identified by 2 values: jobtype and jobname (e.g. 'syncjob' and 'myfirstsyncjob')
9 //!
10 //! This module Provides 2 helper structs to handle those coniditons
11 //! 'Job' which handles locking and writing to a file
12 //! 'JobState' which is the actual state
13 //!
14 //! an example usage would be
15 //! ```no_run
16 //! # use anyhow::{bail, Error};
17 //! # use proxmox_backup::server::TaskState;
18 //! # use proxmox_backup::config::jobstate::*;
19 //! # fn some_code() -> TaskState { TaskState::OK }
20 //! # fn code() -> Result<(), Error> {
21 //! // locks the correct file under /var/lib
22 //! // or fails if someone else holds the lock
23 //! let mut job = match Job::new("jobtype", "jobname") {
24 //! Ok(job) => job,
25 //! Err(err) => bail!("could not lock jobstate"),
26 //! };
27 //!
28 //! // job holds the lock
29 //! match job.load() {
30 //! Ok(()) => {},
31 //! Err(err) => bail!("could not load state {}", err),
32 //! }
33 //!
34 //! // now the job is loaded;
35 //! job.start("someupid")?;
36 //! // do something
37 //! let task_state = some_code();
38 //! job.finish(task_state)?;
39 //!
40 //! // release the lock
41 //! drop(job);
42 //! # Ok(())
43 //! # }
44 //!
45 //! ```
46 use std::fs::File;
47 use std::path::{Path, PathBuf};
48 use std::time::Duration;
49
50 use serde::{Serialize, Deserialize};
51 use anyhow::{bail, Error, format_err};
52 use proxmox::tools::fs::{file_read_optional_string, replace_file, create_path, CreateOptions, open_file_locked};
53
54 use crate::tools::epoch_now_u64;
55 use crate::server::{TaskState, UPID, worker_is_active_local, upid_read_status};
56
57 #[serde(rename_all="kebab-case")]
58 #[derive(Serialize,Deserialize)]
59 /// Represents the State of a specific Job
60 pub enum JobState {
61 /// A job was created at 'time', but never started/finished
62 Created { time: i64 },
63 /// The Job was last started in 'upid',
64 Started { upid: String },
65 /// The Job was last started in 'upid', which finished with 'state' at 'endtime'
66 Finished { upid: String, endtime: i64, state: TaskState }
67 }
68
69 /// Represents a Job and holds the correct lock
70 pub struct Job {
71 jobtype: String,
72 jobname: String,
73 /// The State of the job
74 pub state: JobState,
75 _lock: File,
76 }
77
78 const JOB_STATE_BASEDIR: &str = "/var/lib/proxmox-backup/jobstates";
79
80 /// Create jobstate stat dir with correct permission
81 pub fn create_jobstate_dir() -> Result<(), Error> {
82 let backup_user = crate::backup::backup_user()?;
83 let opts = CreateOptions::new()
84 .owner(backup_user.uid)
85 .group(backup_user.gid);
86
87 create_path(JOB_STATE_BASEDIR, None, Some(opts))
88 .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
89
90 Ok(())
91 }
92
93 fn get_path(jobtype: &str, jobname: &str) -> PathBuf {
94 let mut path = PathBuf::from(JOB_STATE_BASEDIR);
95 path.push(format!("{}-{}.json", jobtype, jobname));
96 path
97 }
98
99 fn get_lock<P>(path: P) -> Result<File, Error>
100 where
101 P: AsRef<Path>
102 {
103 let mut path = path.as_ref().to_path_buf();
104 path.set_extension("lck");
105 open_file_locked(path, Duration::new(10, 0))
106 }
107
108 /// Removes the statefile of a job, this is useful if we delete a job
109 pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
110 let path = get_path(jobtype, jobname);
111 let _lock = get_lock(&path)?;
112 std::fs::remove_file(&path).map_err(|err|
113 format_err!("cannot remove statefile for {} - {}: {}", jobtype, jobname, err)
114 )
115 }
116
117 /// Returns the last run time of a job by reading the statefile
118 /// Note that this is not locked
119 pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
120 match JobState::load(jobtype, jobname)? {
121 JobState::Created { time } => Ok(time),
122 JobState::Started { upid } | JobState::Finished { upid, .. } => {
123 let upid: UPID = upid.parse().map_err(|err|
124 format_err!("could not parse upid from state: {}", err)
125 )?;
126 Ok(upid.starttime)
127 }
128 }
129 }
130
131 impl JobState {
132 /// Loads and deserializes the jobstate from type and name.
133 /// When the loaded state indicates a started UPID,
134 /// we go and check if it has already stopped, and
135 /// returning the correct state.
136 ///
137 /// This does not update the state in the file.
138 pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
139 if let Some(state) = file_read_optional_string(get_path(jobtype, jobname))? {
140 match serde_json::from_str(&state)? {
141 JobState::Started { upid } => {
142 let parsed: UPID = upid.parse()
143 .map_err(|err| format_err!("error parsing upid: {}", err))?;
144
145 if !worker_is_active_local(&parsed) {
146 let (endtime, state) = upid_read_status(&parsed)
147 .map_err(|err| format_err!("error reading upid log status: {}", err))?;
148
149 Ok(JobState::Finished {
150 upid,
151 endtime,
152 state
153 })
154 } else {
155 Ok(JobState::Started { upid })
156 }
157 }
158 other => Ok(other),
159 }
160 } else {
161 Ok(JobState::Created {
162 time: epoch_now_u64()? as i64
163 })
164 }
165 }
166 }
167
168 impl Job {
169 /// Creates a new instance of a job with the correct lock held
170 /// (will be hold until the job is dropped again).
171 ///
172 /// This does not load the state from the file, to do that,
173 /// 'load' must be called
174 pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
175 let path = get_path(jobtype, jobname);
176
177 let _lock = get_lock(&path)?;
178
179 Ok(Self{
180 jobtype: jobtype.to_string(),
181 jobname: jobname.to_string(),
182 state: JobState::Created {
183 time: epoch_now_u64()? as i64
184 },
185 _lock,
186 })
187 }
188
189 /// Loads the state from the statefile if it exists.
190 /// If not, it gets created. Updates 'Started' State to 'Finished'
191 /// if we detect the UPID already stopped
192 pub fn load(&mut self) -> Result<(), Error> {
193 self.state = JobState::load(&self.jobtype, &self.jobname)?;
194
195 if let Err(err) = self.write_state() {
196 bail!("could not write statefile: {}", err);
197 }
198
199 Ok(())
200 }
201
202 /// Start the job and update the statefile accordingly
203 /// Fails if the job was already started
204 pub fn start(&mut self, upid: &str) -> Result<(), Error> {
205 match self.state {
206 JobState::Started { .. } => {
207 bail!("cannot start job that is started!");
208 }
209 _ => {}
210 }
211
212 self.state = JobState::Started{
213 upid: upid.to_string(),
214 };
215
216 self.write_state()
217 }
218
219 /// Finish the job and update the statefile accordingly with the given taskstate
220 /// Fails if the job was not yet started
221 pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
222 let upid = match &self.state {
223 JobState::Created { .. } => bail!("cannot finish when not started"),
224 JobState::Started { upid } => upid,
225 JobState::Finished { upid, .. } => upid,
226 }.to_string();
227
228 let endtime: i64 = epoch_now_u64()? as i64;
229
230 self.state = JobState::Finished {
231 upid,
232 endtime,
233 state,
234 };
235
236 self.write_state()
237 }
238
239 fn write_state(&mut self) -> Result<(), Error> {
240 let serialized = serde_json::to_string(&self.state)?;
241 let path = get_path(&self.jobtype, &self.jobname);
242
243 let backup_user = crate::backup::backup_user()?;
244 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
245 // set the correct owner/group/permissions while saving file
246 // owner(rw) = backup, group(r)= backup
247 let options = CreateOptions::new()
248 .perm(mode)
249 .owner(backup_user.uid)
250 .group(backup_user.gid);
251
252 replace_file(
253 path,
254 serialized.as_bytes(),
255 options,
256 )
257 }
258 }