]> git.proxmox.com Git - proxmox-backup.git/blob - src/server/jobstate.rs
96dd21aacbc64ba199fc869ffeacf1ea553b0f7d
[proxmox-backup.git] / src / server / jobstate.rs
1 //! Generic JobState handling
2 //!
3 //! A 'Job' can have 3 states
4 //! - Created, when a schedule was created but never executed
5 //! - Started, when a job is running right now
6 //! - Finished, when a job was running in the past
7 //!
8 //! and is identified by 2 values: jobtype and jobname (e.g. 'syncjob' and 'myfirstsyncjob')
9 //!
//! This module provides 2 helper structs to handle those conditions:
11 //! 'Job' which handles locking and writing to a file
12 //! 'JobState' which is the actual state
13 //!
14 //! an example usage would be
15 //! ```no_run
16 //! # use anyhow::{bail, Error};
17 //! # use proxmox_backup::server::TaskState;
18 //! # use proxmox_backup::server::jobstate::*;
19 //! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
20 //! # fn code() -> Result<(), Error> {
21 //! // locks the correct file under /var/lib
22 //! // or fails if someone else holds the lock
23 //! let mut job = match Job::new("jobtype", "jobname") {
24 //! Ok(job) => job,
25 //! Err(err) => bail!("could not lock jobstate"),
26 //! };
27 //!
28 //! // job holds the lock, we can start it
29 //! job.start("someupid")?;
30 //! // do something
31 //! let task_state = some_code();
32 //! job.finish(task_state)?;
33 //!
34 //! // release the lock
35 //! drop(job);
36 //! # Ok(())
37 //! # }
38 //!
39 //! ```
40 use std::fs::File;
41 use std::path::{Path, PathBuf};
42 use std::time::Duration;
43
44 use anyhow::{bail, format_err, Error};
45 use proxmox::tools::fs::{
46 create_path, file_read_optional_string, open_file_locked, replace_file, CreateOptions,
47 };
48 use serde::{Deserialize, Serialize};
49
50 use crate::{
51 tools::systemd::time::{
52 parse_calendar_event,
53 compute_next_event,
54 },
55 api2::types::JobScheduleStatus,
56 server::{
57 UPID,
58 TaskState,
59 upid_read_status,
60 worker_is_active_local,
61 },
62 };
63
64 #[derive(Serialize, Deserialize)]
65 #[serde(rename_all = "kebab-case")]
66 /// Represents the State of a specific Job
67 pub enum JobState {
68 /// A job was created at 'time', but never started/finished
69 Created { time: i64 },
70 /// The Job was last started in 'upid',
71 Started { upid: String },
72 /// The Job was last started in 'upid', which finished with 'state', and was last updated at 'updated'
73 Finished {
74 upid: String,
75 state: TaskState,
76 updated: Option<i64>,
77 },
78 }
79
80 /// Represents a Job and holds the correct lock
81 pub struct Job {
82 jobtype: String,
83 jobname: String,
84 /// The State of the job
85 pub state: JobState,
86 _lock: File,
87 }
88
/// Directory that holds the per-job state (`.json`) and lock (`.lck`) files.
const JOB_STATE_BASEDIR: &str = "/var/lib/proxmox-backup/jobstates";
90
91 /// Create jobstate stat dir with correct permission
92 pub fn create_jobstate_dir() -> Result<(), Error> {
93 let backup_user = crate::backup::backup_user()?;
94 let opts = CreateOptions::new()
95 .owner(backup_user.uid)
96 .group(backup_user.gid);
97
98 create_path(JOB_STATE_BASEDIR, None, Some(opts))
99 .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
100
101 Ok(())
102 }
103
104 fn get_path(jobtype: &str, jobname: &str) -> PathBuf {
105 let mut path = PathBuf::from(JOB_STATE_BASEDIR);
106 path.push(format!("{}-{}.json", jobtype, jobname));
107 path
108 }
109
110 fn get_lock<P>(path: P) -> Result<File, Error>
111 where
112 P: AsRef<Path>,
113 {
114 let mut path = path.as_ref().to_path_buf();
115 path.set_extension("lck");
116 let lock = open_file_locked(&path, Duration::new(10, 0), true)?;
117 let backup_user = crate::backup::backup_user()?;
118 nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
119 Ok(lock)
120 }
121
122 /// Removes the statefile of a job, this is useful if we delete a job
123 pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
124 let mut path = get_path(jobtype, jobname);
125 let _lock = get_lock(&path)?;
126 std::fs::remove_file(&path).map_err(|err| {
127 format_err!(
128 "cannot remove statefile for {} - {}: {}",
129 jobtype,
130 jobname,
131 err
132 )
133 })?;
134 path.set_extension("lck");
135 // ignore errors
136 let _ = std::fs::remove_file(&path).map_err(|err| {
137 format_err!(
138 "cannot remove lockfile for {} - {}: {}",
139 jobtype,
140 jobname,
141 err
142 )
143 });
144 Ok(())
145 }
146
147 /// Creates the statefile with the state 'Created'
148 /// overwrites if it exists already
149 pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
150 let mut job = Job::new(jobtype, jobname)?;
151 job.write_state()
152 }
153
154 /// Tries to update the state file with the current time
155 /// if the job is currently running, does nothing.
156 /// Intended for use when the schedule changes.
157 pub fn update_job_last_run_time(jobtype: &str, jobname: &str) -> Result<(), Error> {
158 let mut job = match Job::new(jobtype, jobname) {
159 Ok(job) => job,
160 Err(_) => return Ok(()), // was locked (running), so do not update
161 };
162 let time = proxmox::tools::time::epoch_i64();
163
164 job.state = match JobState::load(jobtype, jobname)? {
165 JobState::Created { .. } => JobState::Created { time },
166 JobState::Started { .. } => return Ok(()), // currently running (without lock?)
167 JobState::Finished {
168 upid,
169 state,
170 updated: _,
171 } => JobState::Finished {
172 upid,
173 state,
174 updated: Some(time),
175 },
176 };
177 job.write_state()
178 }
179
180 /// Returns the last run time of a job by reading the statefile
181 /// Note that this is not locked
182 pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
183 match JobState::load(jobtype, jobname)? {
184 JobState::Created { time } => Ok(time),
185 JobState::Finished {
186 updated: Some(time),
187 ..
188 } => Ok(time),
189 JobState::Started { upid }
190 | JobState::Finished {
191 upid,
192 state: _,
193 updated: None,
194 } => {
195 let upid: UPID = upid
196 .parse()
197 .map_err(|err| format_err!("could not parse upid from state: {}", err))?;
198 Ok(upid.starttime)
199 }
200 }
201 }
202
203 impl JobState {
204 /// Loads and deserializes the jobstate from type and name.
205 /// When the loaded state indicates a started UPID,
206 /// we go and check if it has already stopped, and
207 /// returning the correct state.
208 ///
209 /// This does not update the state in the file.
210 pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
211 if let Some(state) = file_read_optional_string(get_path(jobtype, jobname))? {
212 match serde_json::from_str(&state)? {
213 JobState::Started { upid } => {
214 let parsed: UPID = upid
215 .parse()
216 .map_err(|err| format_err!("error parsing upid: {}", err))?;
217
218 if !worker_is_active_local(&parsed) {
219 let state = upid_read_status(&parsed)
220 .map_err(|err| format_err!("error reading upid log status: {}", err))?;
221
222 Ok(JobState::Finished {
223 upid,
224 state,
225 updated: None,
226 })
227 } else {
228 Ok(JobState::Started { upid })
229 }
230 }
231 other => Ok(other),
232 }
233 } else {
234 Ok(JobState::Created {
235 time: proxmox::tools::time::epoch_i64() - 30,
236 })
237 }
238 }
239 }
240
241 impl Job {
242 /// Creates a new instance of a job with the correct lock held
243 /// (will be hold until the job is dropped again).
244 ///
245 /// This does not load the state from the file, to do that,
246 /// 'load' must be called
247 pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
248 let path = get_path(jobtype, jobname);
249
250 let _lock = get_lock(&path)?;
251
252 Ok(Self {
253 jobtype: jobtype.to_string(),
254 jobname: jobname.to_string(),
255 state: JobState::Created {
256 time: proxmox::tools::time::epoch_i64(),
257 },
258 _lock,
259 })
260 }
261
262 /// Start the job and update the statefile accordingly
263 /// Fails if the job was already started
264 pub fn start(&mut self, upid: &str) -> Result<(), Error> {
265 if let JobState::Started { .. } = self.state {
266 bail!("cannot start job that is started!");
267 }
268
269 self.state = JobState::Started {
270 upid: upid.to_string(),
271 };
272
273 self.write_state()
274 }
275
276 /// Finish the job and update the statefile accordingly with the given taskstate
277 /// Fails if the job was not yet started
278 pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
279 let upid = match &self.state {
280 JobState::Created { .. } => bail!("cannot finish when not started"),
281 JobState::Started { upid } => upid,
282 JobState::Finished { upid, .. } => upid,
283 }
284 .to_string();
285
286 self.state = JobState::Finished {
287 upid,
288 state,
289 updated: None,
290 };
291
292 self.write_state()
293 }
294
295 pub fn jobtype(&self) -> &str {
296 &self.jobtype
297 }
298
299 pub fn jobname(&self) -> &str {
300 &self.jobname
301 }
302
303 fn write_state(&mut self) -> Result<(), Error> {
304 let serialized = serde_json::to_string(&self.state)?;
305 let path = get_path(&self.jobtype, &self.jobname);
306
307 let backup_user = crate::backup::backup_user()?;
308 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
309 // set the correct owner/group/permissions while saving file
310 // owner(rw) = backup, group(r)= backup
311 let options = CreateOptions::new()
312 .perm(mode)
313 .owner(backup_user.uid)
314 .group(backup_user.gid);
315
316 replace_file(path, serialized.as_bytes(), options)
317 }
318 }
319
320 pub fn compute_schedule_status(
321 job_state: &JobState,
322 schedule: Option<&str>,
323 ) -> Result<JobScheduleStatus, Error> {
324 let (upid, endtime, state, last) = match job_state {
325 JobState::Created { time } => (None, None, None, *time),
326 JobState::Started { upid } => {
327 let parsed_upid: UPID = upid.parse()?;
328 (Some(upid), None, None, parsed_upid.starttime)
329 }
330 JobState::Finished {
331 upid,
332 state,
333 updated,
334 } => {
335 let last = updated.unwrap_or_else(|| state.endtime());
336 (
337 Some(upid),
338 Some(state.endtime()),
339 Some(state.to_string()),
340 last,
341 )
342 }
343 };
344
345 let mut status = JobScheduleStatus::default();
346 status.last_run_upid = upid.map(String::from);
347 status.last_run_state = state;
348 status.last_run_endtime = endtime;
349
350 if let Some(schedule) = schedule {
351 if let Ok(event) = parse_calendar_event(&schedule) {
352 // ignore errors
353 status.next_run = compute_next_event(&event, last, false).unwrap_or(None);
354 }
355 }
356
357 Ok(status)
358 }