]> git.proxmox.com Git - proxmox-backup.git/blob - src/server/jobstate.rs
update to proxmox-sys 0.2 crate
[proxmox-backup.git] / src / server / jobstate.rs
1 //! Generic JobState handling
2 //!
3 //! A 'Job' can have 3 states
4 //! - Created, when a schedule was created but never executed
5 //! - Started, when a job is running right now
6 //! - Finished, when a job was running in the past
7 //!
8 //! and is identified by 2 values: jobtype and jobname (e.g. 'syncjob' and 'myfirstsyncjob')
9 //!
10 //! This module provides 2 helper structs to handle those conditions:
11 //! 'Job' which handles locking and writing to a file
12 //! 'JobState' which is the actual state
13 //!
14 //! an example usage would be
15 //! ```no_run
16 //! # use anyhow::{bail, Error};
17 //! # use proxmox_rest_server::TaskState;
18 //! # use proxmox_backup::server::jobstate::*;
19 //! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
20 //! # fn code() -> Result<(), Error> {
21 //! // locks the correct file under /var/lib
22 //! // or fails if someone else holds the lock
23 //! let mut job = match Job::new("jobtype", "jobname") {
24 //! Ok(job) => job,
25 //! Err(err) => bail!("could not lock jobstate"),
26 //! };
27 //!
28 //! // job holds the lock, we can start it
29 //! job.start("someupid")?;
30 //! // do something
31 //! let task_state = some_code();
32 //! job.finish(task_state)?;
33 //!
34 //! // release the lock
35 //! drop(job);
36 //! # Ok(())
37 //! # }
38 //!
39 //! ```
40 use std::path::{Path, PathBuf};
41
42 use anyhow::{bail, format_err, Error};
43 use serde::{Deserialize, Serialize};
44
45 use proxmox_sys::fs::{
46 create_path, file_read_optional_string, replace_file, CreateOptions,
47 };
48
49 use proxmox_time::{compute_next_event, parse_calendar_event};
50
51 use pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M;
52 use pbs_config::{open_backup_lockfile, BackupLockGuard};
53 use pbs_api_types::{UPID, JobScheduleStatus};
54
55 use proxmox_rest_server::{upid_read_status, worker_is_active_local, TaskState};
56
57 #[derive(Serialize, Deserialize)]
58 #[serde(rename_all = "kebab-case")]
59 /// Represents the State of a specific Job
60 pub enum JobState {
61 /// A job was created at 'time', but never started/finished
62 Created { time: i64 },
63 /// The Job was last started in 'upid',
64 Started { upid: String },
65 /// The Job was last started in 'upid', which finished with 'state', and was last updated at 'updated'
66 Finished {
67 upid: String,
68 state: TaskState,
69 updated: Option<i64>,
70 },
71 }
72
73 /// Represents a Job and holds the correct lock
74 pub struct Job {
75 jobtype: String,
76 jobname: String,
77 /// The State of the job
78 pub state: JobState,
79 _lock: BackupLockGuard,
80 }
81
82 const JOB_STATE_BASEDIR: &str = concat!(PROXMOX_BACKUP_STATE_DIR_M!(), "/jobstates");
83
84 /// Create jobstate stat dir with correct permission
85 pub fn create_jobstate_dir() -> Result<(), Error> {
86 let backup_user = pbs_config::backup_user()?;
87
88 let opts = CreateOptions::new()
89 .owner(backup_user.uid)
90 .group(backup_user.gid);
91
92 create_path(JOB_STATE_BASEDIR, Some(opts.clone()), Some(opts))
93 .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
94
95 Ok(())
96 }
97
98 fn get_path(jobtype: &str, jobname: &str) -> PathBuf {
99 let mut path = PathBuf::from(JOB_STATE_BASEDIR);
100 path.push(format!("{}-{}.json", jobtype, jobname));
101 path
102 }
103
104 fn get_lock<P>(path: P) -> Result<BackupLockGuard, Error>
105 where
106 P: AsRef<Path>,
107 {
108 let mut path = path.as_ref().to_path_buf();
109 path.set_extension("lck");
110 open_backup_lockfile(&path, None, true)
111 }
112
113 /// Removes the statefile of a job, this is useful if we delete a job
114 pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
115 let mut path = get_path(jobtype, jobname);
116 let _lock = get_lock(&path)?;
117 std::fs::remove_file(&path).map_err(|err| {
118 format_err!(
119 "cannot remove statefile for {} - {}: {}",
120 jobtype,
121 jobname,
122 err
123 )
124 })?;
125 path.set_extension("lck");
126 // ignore errors
127 let _ = std::fs::remove_file(&path).map_err(|err| {
128 format_err!(
129 "cannot remove lockfile for {} - {}: {}",
130 jobtype,
131 jobname,
132 err
133 )
134 });
135 Ok(())
136 }
137
138 /// Creates the statefile with the state 'Created'
139 /// overwrites if it exists already
140 pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
141 let mut job = Job::new(jobtype, jobname)?;
142 job.write_state()
143 }
144
145 /// Tries to update the state file with the current time
146 /// if the job is currently running, does nothing.
147 /// Intended for use when the schedule changes.
148 pub fn update_job_last_run_time(jobtype: &str, jobname: &str) -> Result<(), Error> {
149 let mut job = match Job::new(jobtype, jobname) {
150 Ok(job) => job,
151 Err(_) => return Ok(()), // was locked (running), so do not update
152 };
153 let time = proxmox_time::epoch_i64();
154
155 job.state = match JobState::load(jobtype, jobname)? {
156 JobState::Created { .. } => JobState::Created { time },
157 JobState::Started { .. } => return Ok(()), // currently running (without lock?)
158 JobState::Finished {
159 upid,
160 state,
161 updated: _,
162 } => JobState::Finished {
163 upid,
164 state,
165 updated: Some(time),
166 },
167 };
168 job.write_state()
169 }
170
171 /// Returns the last run time of a job by reading the statefile
172 /// Note that this is not locked
173 pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
174 match JobState::load(jobtype, jobname)? {
175 JobState::Created { time } => Ok(time),
176 JobState::Finished {
177 updated: Some(time),
178 ..
179 } => Ok(time),
180 JobState::Started { upid }
181 | JobState::Finished {
182 upid,
183 state: _,
184 updated: None,
185 } => {
186 let upid: UPID = upid
187 .parse()
188 .map_err(|err| format_err!("could not parse upid from state: {}", err))?;
189 Ok(upid.starttime)
190 }
191 }
192 }
193
194 impl JobState {
195 /// Loads and deserializes the jobstate from type and name.
196 /// When the loaded state indicates a started UPID,
197 /// we go and check if it has already stopped, and
198 /// returning the correct state.
199 ///
200 /// This does not update the state in the file.
201 pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
202 if let Some(state) = file_read_optional_string(get_path(jobtype, jobname))? {
203 match serde_json::from_str(&state)? {
204 JobState::Started { upid } => {
205 let parsed: UPID = upid
206 .parse()
207 .map_err(|err| format_err!("error parsing upid: {}", err))?;
208
209 if !worker_is_active_local(&parsed) {
210 let state = upid_read_status(&parsed)
211 .map_err(|err| format_err!("error reading upid log status: {}", err))?;
212
213 Ok(JobState::Finished {
214 upid,
215 state,
216 updated: None,
217 })
218 } else {
219 Ok(JobState::Started { upid })
220 }
221 }
222 other => Ok(other),
223 }
224 } else {
225 Ok(JobState::Created {
226 time: proxmox_time::epoch_i64() - 30,
227 })
228 }
229 }
230 }
231
232 impl Job {
233 /// Creates a new instance of a job with the correct lock held
234 /// (will be hold until the job is dropped again).
235 ///
236 /// This does not load the state from the file, to do that,
237 /// 'load' must be called
238 pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
239 let path = get_path(jobtype, jobname);
240
241 let _lock = get_lock(&path)?;
242
243 Ok(Self {
244 jobtype: jobtype.to_string(),
245 jobname: jobname.to_string(),
246 state: JobState::Created {
247 time: proxmox_time::epoch_i64(),
248 },
249 _lock,
250 })
251 }
252
253 /// Start the job and update the statefile accordingly
254 /// Fails if the job was already started
255 pub fn start(&mut self, upid: &str) -> Result<(), Error> {
256 if let JobState::Started { .. } = self.state {
257 bail!("cannot start job that is started!");
258 }
259
260 self.state = JobState::Started {
261 upid: upid.to_string(),
262 };
263
264 self.write_state()
265 }
266
267 /// Finish the job and update the statefile accordingly with the given taskstate
268 /// Fails if the job was not yet started
269 pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
270 let upid = match &self.state {
271 JobState::Created { .. } => bail!("cannot finish when not started"),
272 JobState::Started { upid } => upid,
273 JobState::Finished { upid, .. } => upid,
274 }
275 .to_string();
276
277 self.state = JobState::Finished {
278 upid,
279 state,
280 updated: None,
281 };
282
283 self.write_state()
284 }
285
286 pub fn jobtype(&self) -> &str {
287 &self.jobtype
288 }
289
290 pub fn jobname(&self) -> &str {
291 &self.jobname
292 }
293
294 fn write_state(&mut self) -> Result<(), Error> {
295 let serialized = serde_json::to_string(&self.state)?;
296 let path = get_path(&self.jobtype, &self.jobname);
297
298 let backup_user = pbs_config::backup_user()?;
299 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
300 // set the correct owner/group/permissions while saving file
301 // owner(rw) = backup, group(r)= backup
302 let options = CreateOptions::new()
303 .perm(mode)
304 .owner(backup_user.uid)
305 .group(backup_user.gid);
306
307 replace_file(path, serialized.as_bytes(), options, false)
308 }
309 }
310
311 pub fn compute_schedule_status(
312 job_state: &JobState,
313 schedule: Option<&str>,
314 ) -> Result<JobScheduleStatus, Error> {
315 let (upid, endtime, state, last) = match job_state {
316 JobState::Created { time } => (None, None, None, *time),
317 JobState::Started { upid } => {
318 let parsed_upid: UPID = upid.parse()?;
319 (Some(upid), None, None, parsed_upid.starttime)
320 }
321 JobState::Finished {
322 upid,
323 state,
324 updated,
325 } => {
326 let last = updated.unwrap_or_else(|| state.endtime());
327 (
328 Some(upid),
329 Some(state.endtime()),
330 Some(state.to_string()),
331 last,
332 )
333 }
334 };
335
336 let mut status = JobScheduleStatus::default();
337 status.last_run_upid = upid.map(String::from);
338 status.last_run_state = state;
339 status.last_run_endtime = endtime;
340
341 if let Some(schedule) = schedule {
342 if let Ok(event) = parse_calendar_event(&schedule) {
343 // ignore errors
344 status.next_run = compute_next_event(&event, last, false).unwrap_or(None);
345 }
346 }
347
348 Ok(status)
349 }