]> git.proxmox.com Git - pve-manager.git/blame - PVE/Jobs.pm
update shipped appliance info index
[pve-manager.git] / PVE / Jobs.pm
CommitLineData
76c6ee8a
DC
1package PVE::Jobs;
2
3use strict;
4use warnings;
5use JSON;
6
6f2e57c0
TL
7use PVE::Cluster qw(cfs_lock_file cfs_read_file cfs_register_file);
8use PVE::Job::Registry;
76c6ee8a 9use PVE::Jobs::VZDump;
23d64125 10use PVE::Jobs::RealmSync;
76c6ee8a
DC
11use PVE::Tools;
12
# Register every known job plugin, then initialize the plugin registry.
$_->register() for qw(PVE::Jobs::VZDump PVE::Jobs::RealmSync);
PVE::Job::Registry->init();

# Hook 'jobs.cfg' into the cluster file layer; parsing and writing are both
# delegated to the plugin registry.
cfs_register_file(
    'jobs.cfg',
    sub { return PVE::Job::Registry->parse_config(@_) },
    sub { return PVE::Job::Registry->write_config(@_) },
);
76c6ee8a
DC
22
# Directory holding one JSON state file per job.
my $state_dir = '/var/lib/pve-manager/jobs';
# Directory holding one lock file per job.
my $lock_dir = '/var/lock/pve-manager';

# Builds the path of the state file for the given job id and plugin type.
my $get_state_file = sub {
    my ($jobid, $type) = @_;
    return join('/', $state_dir, "${type}-${jobid}.json");
};

# State used for a job that has no state file yet, or an empty one.
my $default_state = {
    state => 'created',
    time => 0,
};

# Job config properties that are mirrored into the 'config' entry of the
# job state.
my $saved_config_props = ['enabled', 'schedule'];
37
# Mirrors the properties listed in $saved_config_props from the job config
# into the job state, so that config changes can be tracked on different
# nodes (where the update was not done). Whenever one of them changed, the
# state's 'updated' time stamp is set to now and the state file is rewritten.
#
# Parameters:
#   $jobid - id of the job
#   $type  - plugin type of the job
#   $cfg   - job config hash (only the saved properties are read)
#
# Dies if the job state lock cannot be acquired or the state cannot be
# read or written.
sub detect_changed_runtime_props {
    my ($jobid, $type, $cfg) = @_;

    lock_job_state($jobid, $type, sub {
        # Work on a private copy: the fallback below (and an empty state
        # file) can hand out the shared $default_state, which must never be
        # modified, or the changes would leak into every other job.
        my $state = { %{ read_job_state($jobid, $type) // $default_state } };
        $state->{config} = { %{ $state->{config} } } if $state->{config};

        my $updated = 0;
        for my $prop (@$saved_config_props) {
            my $old_prop = $state->{config}->{$prop} // '';
            my $new_prop = $cfg->{$prop} // '';
            next if "$old_prop" eq "$new_prop";

            if (defined($cfg->{$prop})) {
                $state->{config}->{$prop} = $cfg->{$prop};
            } else {
                delete $state->{config}->{$prop};
            }

            $updated = 1;
        }

        return if !$updated;
        $state->{updated} = time();

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($state));
    });
}
69
76c6ee8a
DC
# Reads and decodes the state file of a job without taking the job lock.
# NOTE(review): this presumably relies on writers replacing the state file
# atomically, so a reader never sees a partial file - confirm against
# PVE::Tools::file_set_contents.
#
# Returns nothing if the state file does not exist, a fresh copy of the
# default state if the file is empty, and the decoded JSON object otherwise.
# Dies if the file content does not look like a single JSON object.
sub read_job_state {
    my ($jobid, $type) = @_;
    my $path = $get_state_file->($jobid, $type);
    return if !-e $path;

    my $raw = PVE::Tools::file_get_contents($path);

    # Hand out a copy: callers modify the returned hash in place, which must
    # not alter the shared $default_state.
    return { %$default_state } if $raw eq '';

    # untaint $raw
    if ($raw =~ m/^(\{.*\})$/) {
        return decode_json($1);
    }

    die "invalid json data in '$path'\n";
}
87
# Runs $sub while holding the per-job lock file "$lock_dir/$type-$jobid.lck".
#
# 10 is passed as the timeout argument of PVE::Tools::lock_file (presumably
# seconds - confirm). Any error left in $@ by lock_file is re-thrown,
# otherwise the value returned by lock_file is passed on to the caller.
sub lock_job_state {
    my ($jobid, $type, $sub) = @_;

    my $lockfile = join('/', $lock_dir, "${type}-${jobid}.lck");
    my $timeout = 10;

    my $result = PVE::Tools::lock_file($lockfile, $timeout, $sub);
    if (my $err = $@) {
        die $err;
    }

    return $result;
}
98
# Looks up the status of the worker task referenced by a job state.
#
# Returns nothing if the state has no UPID yet or if the task's process is
# still running, otherwise the result of PVE::Tools::upid_read_status().
# Dies if the UPID cannot be decoded or the file reported by upid_decode
# does not exist.
my $get_job_task_status = sub {
    my ($state) = @_;

    my $upid = $state->{upid};
    return if !defined($upid); # not started

    my ($task, $task_file) = PVE::Tools::upid_decode($upid, 1);
    die "unable to parse worker upid - $state->{upid}\n" if !$task;
    die "no such task\n" if !-f $task_file;

    # a process with the task's PID and the same start time => still running
    my $pstart = PVE::ProcFSTools::read_proc_starttime($task->{pid});
    return if $pstart && $pstart == $task->{pstart};

    return PVE::Tools::upid_read_status($upid);
};
117
# For a job in state 'started': checks whether its worker task has finished
# in the meantime and, if so, rewrites the state file with state 'stopped'
# and the task status as message.
#
# Does nothing if the job has no state file, is not in state 'started', or
# its task has no status yet. Errors from the task status lookup, the lock
# or the file access propagate to the caller.
sub update_job_stopped {
    my ($jobid, $type) = @_;

    # cheap pre-check without the lock, repeated below while holding it
    my $unlocked_state = read_job_state($jobid, $type);
    return if !defined($unlocked_state); # removed
    return if $unlocked_state->{state} ne 'started'; # not started
    return if !defined($get_job_task_status->($unlocked_state)); # no status yet

    lock_job_state($jobid, $type, sub {
        my $state = read_job_state($jobid, $type);
        return if !defined($state) || $state->{state} ne 'started'; # removed or not started

        my $msg = $get_job_task_status->($state) // 'internal error';

        my %stopped = (
            state => 'stopped',
            msg => $msg,
            upid => $state->{upid},
            config => $state->{config},
        );
        # keep the time stamp of the last config update, if there is one
        $stopped{updated} = $state->{updated} if $state->{updated};

        PVE::Tools::file_set_contents($get_state_file->($jobid, $type), encode_json(\%stopped));
    });
}
148
# Creates the initial state file for a job; must be called when the job is
# first created.
#
# The state records the creation time and the defined properties from
# $saved_config_props found in $cfg.
#
# Parameters:
#   $jobid - id of the job
#   $type  - plugin type of the job
#   $cfg   - job config hash
#
# Dies with "job state already exists" if there already is a state that is
# not 'created', and on lock or file errors.
sub create_job {
    my ($jobid, $type, $cfg) = @_;

    lock_job_state($jobid, $type, sub {
        my $current = read_job_state($jobid, $type) // $default_state;

        if ($current->{state} ne 'created') {
            die "job state already exists\n";
        }

        # Build a private copy instead of writing into $current: it may be
        # the shared $default_state, and modifying that would leak this
        # job's time and config into every job created afterwards.
        my $state = { %$current, time => time() };
        $state->{config} = { %{ $state->{config} } } if $state->{config};

        for my $prop (@$saved_config_props) {
            if (defined($cfg->{$prop})) {
                $state->{config}->{$prop} = $cfg->{$prop};
            }
        }

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($state));
    });
}
171
# Deletes the state file of a job; to be called when the job is removed.
# Returns the result of unlink, failures are not reported.
sub remove_job {
    my ($jobid, $type) = @_;

    return unlink($get_state_file->($jobid, $type));
}
178
# Checks if the job can be started and, if so, sets its state to 'starting'
# (keeping the saved config properties).
#
# Returns 1 if the job can be started, 0 otherwise (no state file, i.e. the
# job was removed, or the job is already in state 'started').
# Dies on lock or file errors.
sub starting_job {
    my ($jobid, $type) = @_;

    # first check unlocked to save time
    my $state = read_job_state($jobid, $type);
    return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started

    # Set inside the locked section. A 'return' in the closure only leaves
    # the closure, so the outcome of the locked re-check is carried out
    # through this variable instead of being dropped.
    my $can_start = 0;

    lock_job_state($jobid, $type, sub {
        my $state = read_job_state($jobid, $type);
        return if !defined($state) || $state->{state} eq 'started'; # removed or already started

        my $new_state = {
            state => 'starting',
            time => time(),
            config => $state->{config},
        };

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($new_state));

        $can_start = 1;
    });

    return $can_start;
}
203
# Records the outcome of starting a job that is in state 'starting'.
#
# With a defined $msg the job is stored as 'stopped' with that message and
# the current time; otherwise it is stored as 'started' with the given
# $upid. The saved config properties are carried over in both cases.
#
# Does nothing if the state file is gone (job was removed). Dies if the job
# is in any state other than 'starting', and on lock or file errors.
sub started_job {
    my ($jobid, $type, $upid, $msg) = @_;

    lock_job_state($jobid, $type, sub {
        my $state = read_job_state($jobid, $type);
        return if !defined($state); # job was removed, do not update

        if ($state->{state} ne 'starting') {
            die "unexpected state '$state->{state}'\n";
        }

        my %next = defined($msg)
            ? (state => 'stopped', msg => $msg, time => time())
            : (state => 'started', upid => $upid);
        $next{config} = $state->{config};

        PVE::Tools::file_set_contents($get_state_file->($jobid, $type), encode_json(\%next));
    });
}
231
# Sets the 'updated' time stamp of the job state to the current time, which
# get_last_runtime() then reports as the job's last run time.
# Called when the job schedule is updated, and by run_jobs() on the first
# run for jobs without 'repeat-missed'.
#
# Dies on lock or file errors.
sub update_last_runtime {
    my ($jobid, $type) = @_;

    lock_job_state($jobid, $type, sub {
        # Copy before adding the time stamp: the fallback (and an empty state
        # file) can hand out the shared $default_state, and an 'updated'
        # entry stored there would be picked up for unrelated jobs.
        my $state = {
            %{ read_job_state($jobid, $type) // $default_state },
            updated => time(),
        };

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($state));
    });
}
244
# Returns the time to use as the job's last run when computing its next
# scheduled run. In order of preference:
#   1. the 'updated' time stamp of the state,
#   2. the start time decoded from the state's UPID,
#   3. the 'time' entry of the state, or 0.
# A job without a state file is treated like the default state.
#
# Dies if the state has an UPID that cannot be decoded.
sub get_last_runtime {
    my ($jobid, $type) = @_;

    my $state = read_job_state($jobid, $type) // $default_state;

    my $updated = $state->{updated};
    return $updated if defined($updated);

    my $upid = $state->{upid};
    if (!$upid) {
        return $state->{time} // 0;
    }

    my ($task) = PVE::Tools::upid_decode($upid, 1);
    die "unable to parse worker upid\n" if !$task;

    return $task->{starttime};
}
260
# Synchronizes the job states with 'jobs.cfg' and starts every enabled job
# of the local node whose next scheduled run is due.
#
# Parameters:
#   $first_run - if true, the last run time of jobs without 'repeat-missed'
#                is reset to now, so that a missed job does not start
#                immediately
#
# A failure to update a job's stopped state only skips that job (with a
# warning); errors while starting a job are stored in its state. Other
# errors (e.g. from the state synchronization or the schedule parsing)
# propagate to the caller.
sub run_jobs {
    my ($first_run) = @_;

    synchronize_job_states_with_config();

    my $jobs_cfg = cfs_read_file('jobs.cfg');
    my $nodename = PVE::INotify::nodename();

    foreach my $id (sort keys %{ $jobs_cfg->{ids} }) {
        my $cfg = $jobs_cfg->{ids}->{$id};
        my $type = $cfg->{type};
        # the schedule is passed to the plugin separately, not as part of $cfg
        my $schedule = delete $cfg->{schedule};

        # only schedule local jobs
        next if defined($cfg->{node}) && $cfg->{node} ne $nodename;

        eval { update_job_stopped($id, $type) };
        if (my $err = $@) {
            warn "could not update job state, skipping - $err\n";
            next;
        }

        # update last runtime on the first run when 'repeat-missed' is 0, so that a missed job
        # will not start immediately after boot
        update_last_runtime($id, $type) if $first_run && !$cfg->{'repeat-missed'};

        next if defined($cfg->{enabled}) && !$cfg->{enabled}; # only schedule actually enabled jobs

        my $last_run = get_last_runtime($id, $type);
        my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
        my $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $last_run);

        next if !defined($next_sync) || time() < $next_sync; # not yet its (next) turn

        my $plugin = PVE::Job::Registry->lookup($type);
        if (starting_job($id, $type)) {
            PVE::Cluster::cfs_update();

            my $upid = eval { $plugin->run($cfg, $id, $schedule) };
            if (my $err = $@) {
                warn $err;
                started_job($id, $type, undef, $err);
            } elsif (!defined($upid)) {
                # A job stored as 'started' without an UPID can never be
                # detected as finished and would block all further runs, so
                # record it as stopped with an error instead.
                started_job($id, $type, undef, "job plugin did not return a task UPID");
            } elsif ($upid eq 'OK') { # some jobs return OK immediately
                started_job($id, $type, undef, 'OK');
            } else {
                started_job($id, $type, $upid);
            }
        }
    }
}
311
# Brings the job state files in line with 'jobs.cfg' while holding the
# config lock:
#   - jobs that already have a state file get their saved config properties
#     refreshed,
#   - jobs without a state file get one created,
#   - state files of registered plugin types whose job id is no longer in
#     the config are removed.
#
# Dies if the config lock cannot be acquired or any step inside fails.
sub synchronize_job_states_with_config {
    cfs_lock_file('jobs.cfg', undef, sub {
        my $data = cfs_read_file('jobs.cfg');

        for my $id (keys $data->{ids}->%*) {
            my $job = $data->{ids}->{$id};
            my $type = $job->{type};

            my $path = $get_state_file->($id, $type);
            if (-e $path) {
                detect_changed_runtime_props($id, $type, $job);
            } else {
                create_job($id, $type, $job);
            }
        }

        # Type names are data, not patterns: quote them, and escape the dot
        # of the file extension so that only real '*.json' files match.
        my $valid_types = PVE::Job::Registry->lookup_types();
        my $type_regex = join('|', map { quotemeta($_) } $valid_types->@*);

        PVE::Tools::dir_glob_foreach($state_dir, "(${type_regex})-(.*)\\.json", sub {
            my ($path, $type, $id) = @_;

            if (!defined($data->{ids}->{$id})) {
                remove_job($id, $type);
            }
        });
    });
    die $@ if $@;
}
342
# Creates the state and the lock directory. Failures (e.g. a directory that
# already exists) are not reported; the result of the second mkdir is
# returned.
sub setup_dirs {
    mkdir($state_dir);
    return mkdir($lock_dir);
}
347
3481;