]> git.proxmox.com Git - proxmox-backup.git/blob - src/server/jobstate.rs
server/jobstate: add 'updatd' to Finish variant
[proxmox-backup.git] / src / server / jobstate.rs
1 //! Generic JobState handling
2 //!
3 //! A 'Job' can have 3 states
4 //! - Created, when a schedule was created but never executed
5 //! - Started, when a job is running right now
6 //! - Finished, when a job was running in the past
7 //!
8 //! and is identified by 2 values: jobtype and jobname (e.g. 'syncjob' and 'myfirstsyncjob')
9 //!
10 //! This module provides 2 helper structs to handle those conditions
11 //! 'Job' which handles locking and writing to a file
12 //! 'JobState' which is the actual state
13 //!
14 //! an example usage would be
15 //! ```no_run
16 //! # use anyhow::{bail, Error};
17 //! # use proxmox_backup::server::TaskState;
18 //! # use proxmox_backup::server::jobstate::*;
19 //! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
20 //! # fn code() -> Result<(), Error> {
21 //! // locks the correct file under /var/lib
22 //! // or fails if someone else holds the lock
23 //! let mut job = match Job::new("jobtype", "jobname") {
24 //! Ok(job) => job,
25 //! Err(err) => bail!("could not lock jobstate"),
26 //! };
27 //!
28 //! // job holds the lock, we can start it
29 //! job.start("someupid")?;
30 //! // do something
31 //! let task_state = some_code();
32 //! job.finish(task_state)?;
33 //!
34 //! // release the lock
35 //! drop(job);
36 //! # Ok(())
37 //! # }
38 //!
39 //! ```
40 use std::fs::File;
41 use std::path::{Path, PathBuf};
42 use std::time::Duration;
43
44 use anyhow::{bail, format_err, Error};
45 use proxmox::tools::fs::{
46 create_path, file_read_optional_string, open_file_locked, replace_file, CreateOptions,
47 };
48 use serde::{Deserialize, Serialize};
49
50 use crate::{
51 tools::systemd::time::{
52 parse_calendar_event,
53 compute_next_event,
54 },
55 api2::types::JobScheduleStatus,
56 server::{
57 UPID,
58 TaskState,
59 upid_read_status,
60 worker_is_active_local,
61 },
62 };
63
64 #[serde(rename_all = "kebab-case")]
65 #[derive(Serialize, Deserialize)]
66 /// Represents the State of a specific Job
67 pub enum JobState {
68 /// A job was created at 'time', but never started/finished
69 Created { time: i64 },
70 /// The Job was last started in 'upid',
71 Started { upid: String },
72 /// The Job was last started in 'upid', which finished with 'state', and was last updated at 'updated'
73 Finished {
74 upid: String,
75 state: TaskState,
76 updated: Option<i64>,
77 },
78 }
79
80 /// Represents a Job and holds the correct lock
81 pub struct Job {
82 jobtype: String,
83 jobname: String,
84 /// The State of the job
85 pub state: JobState,
86 _lock: File,
87 }
88
/// Directory below which all job state files and their lock files are kept.
const JOB_STATE_BASEDIR: &str = "/var/lib/proxmox-backup/jobstates";
90
91 /// Create jobstate stat dir with correct permission
92 pub fn create_jobstate_dir() -> Result<(), Error> {
93 let backup_user = crate::backup::backup_user()?;
94 let opts = CreateOptions::new()
95 .owner(backup_user.uid)
96 .group(backup_user.gid);
97
98 create_path(JOB_STATE_BASEDIR, None, Some(opts))
99 .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
100
101 Ok(())
102 }
103
104 fn get_path(jobtype: &str, jobname: &str) -> PathBuf {
105 let mut path = PathBuf::from(JOB_STATE_BASEDIR);
106 path.push(format!("{}-{}.json", jobtype, jobname));
107 path
108 }
109
110 fn get_lock<P>(path: P) -> Result<File, Error>
111 where
112 P: AsRef<Path>,
113 {
114 let mut path = path.as_ref().to_path_buf();
115 path.set_extension("lck");
116 let lock = open_file_locked(&path, Duration::new(10, 0), true)?;
117 let backup_user = crate::backup::backup_user()?;
118 nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
119 Ok(lock)
120 }
121
122 /// Removes the statefile of a job, this is useful if we delete a job
123 pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
124 let mut path = get_path(jobtype, jobname);
125 let _lock = get_lock(&path)?;
126 std::fs::remove_file(&path).map_err(|err| {
127 format_err!(
128 "cannot remove statefile for {} - {}: {}",
129 jobtype,
130 jobname,
131 err
132 )
133 })?;
134 path.set_extension("lck");
135 // ignore errors
136 let _ = std::fs::remove_file(&path).map_err(|err| {
137 format_err!(
138 "cannot remove lockfile for {} - {}: {}",
139 jobtype,
140 jobname,
141 err
142 )
143 });
144 Ok(())
145 }
146
147 /// Creates the statefile with the state 'Created'
148 /// overwrites if it exists already
149 pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
150 let mut job = Job::new(jobtype, jobname)?;
151 job.write_state()
152 }
153
154 /// Tries to update the state file with the current time
155 /// if the job is currently running, does nothing,
156 pub fn try_update_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
157 let mut job = match Job::new(jobtype, jobname) {
158 Ok(job) => job,
159 Err(_) => return Ok(()), // was locked (running), so do not update
160 };
161 let time = proxmox::tools::time::epoch_i64();
162
163 job.state = match JobState::load(jobtype, jobname)? {
164 JobState::Created { .. } => JobState::Created { time },
165 JobState::Started { .. } => return Ok(()), // currently running (without lock?)
166 JobState::Finished {
167 upid,
168 state,
169 updated: _,
170 } => JobState::Finished {
171 upid,
172 state,
173 updated: Some(time),
174 },
175 };
176 job.write_state()
177 }
178
179 /// Returns the last run time of a job by reading the statefile
180 /// Note that this is not locked
181 pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
182 match JobState::load(jobtype, jobname)? {
183 JobState::Created { time } => Ok(time),
184 JobState::Finished {
185 updated: Some(time),
186 ..
187 } => Ok(time),
188 JobState::Started { upid }
189 | JobState::Finished {
190 upid,
191 state: _,
192 updated: None,
193 } => {
194 let upid: UPID = upid
195 .parse()
196 .map_err(|err| format_err!("could not parse upid from state: {}", err))?;
197 Ok(upid.starttime)
198 }
199 }
200 }
201
202 impl JobState {
203 /// Loads and deserializes the jobstate from type and name.
204 /// When the loaded state indicates a started UPID,
205 /// we go and check if it has already stopped, and
206 /// returning the correct state.
207 ///
208 /// This does not update the state in the file.
209 pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
210 if let Some(state) = file_read_optional_string(get_path(jobtype, jobname))? {
211 match serde_json::from_str(&state)? {
212 JobState::Started { upid } => {
213 let parsed: UPID = upid
214 .parse()
215 .map_err(|err| format_err!("error parsing upid: {}", err))?;
216
217 if !worker_is_active_local(&parsed) {
218 let state = upid_read_status(&parsed)
219 .map_err(|err| format_err!("error reading upid log status: {}", err))?;
220
221 Ok(JobState::Finished {
222 upid,
223 state,
224 updated: None,
225 })
226 } else {
227 Ok(JobState::Started { upid })
228 }
229 }
230 other => Ok(other),
231 }
232 } else {
233 Ok(JobState::Created {
234 time: proxmox::tools::time::epoch_i64() - 30,
235 })
236 }
237 }
238 }
239
240 impl Job {
241 /// Creates a new instance of a job with the correct lock held
242 /// (will be hold until the job is dropped again).
243 ///
244 /// This does not load the state from the file, to do that,
245 /// 'load' must be called
246 pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
247 let path = get_path(jobtype, jobname);
248
249 let _lock = get_lock(&path)?;
250
251 Ok(Self {
252 jobtype: jobtype.to_string(),
253 jobname: jobname.to_string(),
254 state: JobState::Created {
255 time: proxmox::tools::time::epoch_i64(),
256 },
257 _lock,
258 })
259 }
260
261 /// Start the job and update the statefile accordingly
262 /// Fails if the job was already started
263 pub fn start(&mut self, upid: &str) -> Result<(), Error> {
264 if let JobState::Started { .. } = self.state {
265 bail!("cannot start job that is started!");
266 }
267
268 self.state = JobState::Started {
269 upid: upid.to_string(),
270 };
271
272 self.write_state()
273 }
274
275 /// Finish the job and update the statefile accordingly with the given taskstate
276 /// Fails if the job was not yet started
277 pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
278 let upid = match &self.state {
279 JobState::Created { .. } => bail!("cannot finish when not started"),
280 JobState::Started { upid } => upid,
281 JobState::Finished { upid, .. } => upid,
282 }
283 .to_string();
284
285 self.state = JobState::Finished {
286 upid,
287 state,
288 updated: None,
289 };
290
291 self.write_state()
292 }
293
294 pub fn jobtype(&self) -> &str {
295 &self.jobtype
296 }
297
298 pub fn jobname(&self) -> &str {
299 &self.jobname
300 }
301
302 fn write_state(&mut self) -> Result<(), Error> {
303 let serialized = serde_json::to_string(&self.state)?;
304 let path = get_path(&self.jobtype, &self.jobname);
305
306 let backup_user = crate::backup::backup_user()?;
307 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
308 // set the correct owner/group/permissions while saving file
309 // owner(rw) = backup, group(r)= backup
310 let options = CreateOptions::new()
311 .perm(mode)
312 .owner(backup_user.uid)
313 .group(backup_user.gid);
314
315 replace_file(path, serialized.as_bytes(), options)
316 }
317 }
318
319 pub fn compute_schedule_status(
320 job_state: &JobState,
321 schedule: Option<&str>,
322 ) -> Result<JobScheduleStatus, Error> {
323 let (upid, endtime, state, last) = match job_state {
324 JobState::Created { time } => (None, None, None, *time),
325 JobState::Started { upid } => {
326 let parsed_upid: UPID = upid.parse()?;
327 (Some(upid), None, None, parsed_upid.starttime)
328 }
329 JobState::Finished {
330 upid,
331 state,
332 updated,
333 } => {
334 let last = updated.unwrap_or_else(|| state.endtime());
335 (
336 Some(upid),
337 Some(state.endtime()),
338 Some(state.to_string()),
339 last,
340 )
341 }
342 };
343
344 let mut status = JobScheduleStatus::default();
345 status.last_run_upid = upid.map(String::from);
346 status.last_run_state = state;
347 status.last_run_endtime = endtime;
348
349 if let Some(schedule) = schedule {
350 if let Ok(event) = parse_calendar_event(&schedule) {
351 // ignore errors
352 status.next_run = compute_next_event(&event, last, false).unwrap_or(None);
353 }
354 }
355
356 Ok(status)
357 }