]> git.proxmox.com Git - pve-manager.git/blob - PVE/Jobs.pm
jobs: move base registry to pve-common & split vzdump base out to guest-common
[pve-manager.git] / PVE / Jobs.pm
1 package PVE::Jobs;
2
3 use strict;
4 use warnings;
5 use JSON;
6
7 use PVE::Cluster qw(cfs_lock_file cfs_read_file cfs_register_file);
8 use PVE::Job::Registry;
9 use PVE::Jobs::VZDump;
10 use PVE::Tools;
11
# Register the job plugins known to this module, then initialize the
# registry. Registration has to happen before init().
PVE::Jobs::VZDump->register();
PVE::Job::Registry->init();

# Hook jobs.cfg into the cluster file system, delegating parsing and
# writing to the job registry.
my $parse_jobs_cfg = sub { return PVE::Job::Registry->parse_config(@_); };
my $write_jobs_cfg = sub { return PVE::Job::Registry->write_config(@_); };
cfs_register_file('jobs.cfg', $parse_jobs_cfg, $write_jobs_cfg);
20
# Directory holding one JSON state file per job.
my $state_dir = "/var/lib/pve-manager/jobs";
# Directory holding the per-job lock files (see lock_job_state).
my $lock_dir = "/var/lock/pve-manager";

# Builds the path of a job's state file from its id and type,
# e.g. ('backup-1', 'vzdump') -> "$state_dir/vzdump-backup-1.json".
my $get_state_file = sub {
    my ($jobid, $type) = @_;
    my $basename = "${type}-${jobid}.json";
    return "$state_dir/$basename";
};

# State assumed for a job whose state file is missing or empty.
# NOTE: this is a single shared hash reference - code that wants to modify a
# state derived from it has to work on a copy.
my $default_state = {
    time => 0,
    state => 'created',
};

# Job config properties mirrored into the state file, so that changes done on
# another node can be detected (see detect_changed_runtime_props).
my $saved_config_props = [ 'enabled', 'schedule' ];
35
# Saves some properties of the job config (those in $saved_config_props) into
# the job state, so they can be tracked on nodes other than the one where the
# config was changed. If any of them differ from the recorded values, the
# recorded values are replaced and the state's 'updated' timestamp is set to
# the current time, which get_last_runtime() then reports as the last runtime.
# Nothing is written when all tracked properties are unchanged.
#
# $jobid, $type - identify the job and its state file
# $cfg          - the job's config section from jobs.cfg
sub detect_changed_runtime_props {
    my ($jobid, $type, $cfg) = @_;

    lock_job_state($jobid, $type, sub {
        # Work on a copy: read_job_state() may hand back the shared
        # $default_state, which must never be modified (even the read of
        # {config} below would autovivify into it).
        my $old_state = { %{ read_job_state($jobid, $type) // $default_state } };
        $old_state->{config} = { %{ $old_state->{config} } } if $old_state->{config};

        my $updated = 0;
        for my $prop (@$saved_config_props) {
            my $old_prop = $old_state->{config}->{$prop} // '';
            my $new_prop = $cfg->{$prop} // '';
            next if "$old_prop" eq "$new_prop";

            if (defined($cfg->{$prop})) {
                $old_state->{config}->{$prop} = $cfg->{$prop};
            } else {
                delete $old_state->{config}->{$prop};
            }

            $updated = 1;
        }

        return if !$updated;
        $old_state->{updated} = time();

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($old_state));
    });
}
67
# Reads and decodes the JSON state file of a job.
#
# Returns undef if no state file exists (e.g. the job was removed), a fresh
# copy of the default state if the file is empty, and the decoded hash
# otherwise. Dies if the content is not a single-line JSON object.
#
# Lockless, since we use file_get_contents, which is atomic.
sub read_job_state {
    my ($jobid, $type) = @_;
    my $path = $get_state_file->($jobid, $type);
    return if ! -e $path;

    my $raw = PVE::Tools::file_get_contents($path);

    # return a copy - callers modify the returned state before writing it back,
    # and must not alter the shared $default_state
    return { %$default_state } if $raw eq '';

    # untaint $raw
    if ($raw =~ m/^(\{.*\})$/) {
        return decode_json($1);
    }

    die "invalid json data in '$path'\n";
}
85
# Runs $sub while holding the per-job lock file "$lock_dir/$type-$jobid.lck"
# (PVE::Tools::lock_file is called with a timeout argument of 10, presumably
# seconds) and returns whatever lock_file returned.
# Any error left in $@ by lock_file is re-thrown.
sub lock_job_state {
    my ($jobid, $type, $sub) = @_;

    my $lockfile = "$lock_dir/$type-$jobid.lck";
    my $result = PVE::Tools::lock_file($lockfile, 10, $sub);

    if (my $err = $@) {
        die $err;
    }

    return $result;
}
96
# Determines the outcome of the task recorded in $state->{upid}.
#
# Returns undef if no task was recorded, or if a process with the task's pid
# and the same start time is still running; otherwise returns the status from
# PVE::Tools::upid_read_status(). Dies if the upid cannot be decoded or the
# file returned by upid_decode() does not exist.
my $get_job_task_status = sub {
    my ($state) = @_;

    my $upid = $state->{upid};
    return if !defined($upid); # not started

    my ($task, $taskfile) = PVE::Tools::upid_decode($upid, 1);
    die "unable to parse worker upid - $state->{upid}\n" if !$task;
    die "no such task\n" if !-f $taskfile;

    # same pid with the same start time means it is still the same process
    my $running_since = PVE::ProcFSTools::read_proc_starttime($task->{pid});
    return if $running_since && $running_since == $task->{pstart}; # still running

    return PVE::Tools::upid_read_status($upid);
};
115
# If the job was started and its task has finished, switches its state to
# 'stopped' and records the task status as 'msg' (or 'internal error' if no
# status could be determined), keeping upid, config and any 'updated'
# timestamp. Jobs without a state file or not in state 'started' are ignored.
sub update_job_stopped {
    my ($jobid, $type) = @_;

    my $is_started = sub {
        my ($st) = @_;
        return defined($st) && $st->{state} eq 'started';
    };

    # cheap unlocked pre-check first, repeated below while holding the lock
    my $state = read_job_state($jobid, $type);
    return if !$is_started->($state); # removed or not started
    return if !defined($get_job_task_status->($state)); # task still running

    lock_job_state($jobid, $type, sub {
        my $current = read_job_state($jobid, $type);
        return if !$is_started->($current); # removed or not started

        my $stopped = {
            state => 'stopped',
            msg => $get_job_task_status->($current) // 'internal error',
            upid => $current->{upid},
            config => $current->{config},
        };
        # keep the time stamp of the last config change, if there is one
        $stopped->{updated} = $current->{updated} if $current->{updated};

        PVE::Tools::file_set_contents(
            $get_state_file->($jobid, $type),
            encode_json($stopped),
        );
    });
}
146
# Creates the state file for a newly configured job; must be called when the
# job is first created.
#
# Records the creation time and those of the $saved_config_props that are set
# in $cfg. Dies with "job state already exists" if a state file exists whose
# state is not 'created'.
sub create_job {
    my ($jobid, $type, $cfg) = @_;

    lock_job_state($jobid, $type, sub {
        # Work on a copy: read_job_state() may hand back the shared
        # $default_state, which must never be modified.
        my $state = { %{ read_job_state($jobid, $type) // $default_state } };
        $state->{config} = { %{ $state->{config} } } if $state->{config};

        if ($state->{state} ne 'created') {
            die "job state already exists\n";
        }

        $state->{time} = time();
        for my $prop (@$saved_config_props) {
            if (defined($cfg->{$prop})) {
                $state->{config}->{$prop} = $cfg->{$prop};
            }
        }

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($state));
    });
}
169
# Deletes the state file of a job; to be called when the job is removed.
# Best effort: the result of unlink is not checked.
sub remove_job {
    my ($jobid, $type) = @_;
    unlink($get_state_file->($jobid, $type));
}
176
# Checks whether the job can be started and, if so, sets its state to
# 'starting' while holding the job lock.
#
# Returns 1 if the state was set to 'starting' and the caller should run the
# job, 0 if the job was removed or is already in state 'started'.
sub starting_job {
    my ($jobid, $type) = @_;

    # first check unlocked to save time
    my $state = read_job_state($jobid, $type);
    return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started

    # Re-check under the lock, as the state may have changed in the meantime.
    # The closure's result is the verdict, so it must be passed on to the caller.
    return lock_job_state($jobid, $type, sub {
        my $state = read_job_state($jobid, $type);
        return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started

        my $new_state = {
            state => 'starting',
            time => time(),
            config => $state->{config},
        };

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($new_state));

        return 1;
    });
}
201
# Records the result of launching a job that is in state 'starting'.
#
# If $msg is given (run_jobs passes the error or 'OK'), the state becomes
# 'stopped' with that message and the current time. Otherwise it becomes
# 'started' with the task's $upid. The saved config properties are carried
# over. Does nothing if the state file no longer exists; dies if the job is
# not in state 'starting'.
sub started_job {
    my ($jobid, $type, $upid, $msg) = @_;

    lock_job_state($jobid, $type, sub {
        my $old = read_job_state($jobid, $type);
        return if !defined($old); # job was removed, do not update

        my $current = $old->{state};
        die "unexpected state '$current'\n" if $current ne 'starting';

        my $new = defined($msg)
            ? { state => 'stopped', msg => $msg, time => time() }
            : { state => 'started', upid => $upid };
        $new->{config} = $old->{config};

        PVE::Tools::file_set_contents(
            $get_state_file->($jobid, $type),
            encode_json($new),
        );
    });
}
229
# Sets the job's 'updated' timestamp to the current time, which
# get_last_runtime() then reports as the last runtime. Called when the job
# schedule is updated, and by run_jobs() on its first run for jobs without
# 'repeat-missed'. If no state file exists, one is written based on the
# default state.
sub update_last_runtime {
    my ($jobid, $type) = @_;

    lock_job_state($jobid, $type, sub {
        # Work on a copy: read_job_state() may hand back the shared
        # $default_state, which must never be modified.
        my $state = { %{ read_job_state($jobid, $type) // $default_state } };

        $state->{updated} = time();

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($state));
    });
}
242
# Returns the timestamp that counts as the job's last run: the 'updated'
# timestamp if set, else the start time of the recorded task (decoded from
# its upid), else the state's 'time', or 0 if that is unset as well.
# Dies if a recorded upid cannot be decoded.
sub get_last_runtime {
    my ($jobid, $type) = @_;

    my $state = read_job_state($jobid, $type) // $default_state;

    my $updated = $state->{updated};
    return $updated if defined($updated);

    my $upid = $state->{upid};
    if ($upid) {
        my ($task) = PVE::Tools::upid_decode($upid, 1);
        die "unable to parse worker upid\n" if !$task;
        return $task->{starttime};
    }

    return $state->{time} // 0;
}
258
# Runs every job configured for this node whose next scheduled time has come.
#
# $first_run - true on the first invocation; jobs without 'repeat-missed' then
#              get their last runtime set to now, so runs missed while the
#              node was down do not start immediately after boot.
#
# First synchronizes the state files with jobs.cfg. Then, per job: refreshes
# the state of a finished job, skips disabled or not-yet-due jobs, and
# otherwise starts the job through its plugin, recording the outcome with
# started_job(). Errors while updating a job's state or computing its next
# run are warned about and only skip that job.
sub run_jobs {
    my ($first_run) = @_;

    synchronize_job_states_with_config();

    my $jobs_cfg = cfs_read_file('jobs.cfg');
    my $nodename = PVE::INotify::nodename();

    foreach my $id (sort keys %{$jobs_cfg->{ids}}) {
        my $cfg = $jobs_cfg->{ids}->{$id};
        my $type = $cfg->{type};
        my $schedule = delete $cfg->{schedule};

        # only schedule local jobs
        next if defined($cfg->{node}) && $cfg->{node} ne $nodename;

        eval { update_job_stopped($id, $type) };
        if (my $err = $@) {
            warn "could not update job state, skipping - $err\n";
            next;
        }

        # update last runtime on the first run when 'repeat-missed' is 0, so that a missed job
        # will not start immediately after boot
        update_last_runtime($id, $type) if $first_run && !$cfg->{'repeat-missed'};

        next if defined($cfg->{enabled}) && !$cfg->{enabled}; # only schedule actually enabled jobs

        # a broken state or schedule of one job must not keep the others from running
        my $next_sync = eval {
            my $last_run = get_last_runtime($id, $type);
            my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
            PVE::CalendarEvent::compute_next_event($calspec, $last_run);
        };
        if (my $err = $@) {
            warn "could not compute next run of job '$id', skipping - $err\n";
            next;
        }

        next if !defined($next_sync) || time() < $next_sync; # not yet its (next) turn

        my $plugin = PVE::Job::Registry->lookup($type);
        if (starting_job($id, $type)) {
            my $upid = eval { $plugin->run($cfg, $id, $schedule) };
            if (my $err = $@) {
                warn $err;
                started_job($id, $type, undef, $err);
            } elsif (!defined($upid) || $upid eq 'OK') {
                # Some jobs return OK immediately. Without a upid there is no
                # task to track, so record the job as finished; otherwise it
                # would stay 'started' forever and never run again.
                started_job($id, $type, undef, 'OK');
            } else {
                started_job($id, $type, $upid);
            }
        }
    }
}
307
# Creates and removes state files so they match the jobs in jobs.cfg.
#
# While holding the jobs.cfg cluster lock, each configured job either gets
# its state file created (create_job) or its saved config properties updated
# (detect_changed_runtime_props). Afterwards, every state file in $state_dir
# is removed unless a job with that id and type is configured. Errors from
# the locked section are re-thrown.
sub synchronize_job_states_with_config {
    cfs_lock_file('jobs.cfg', undef, sub {
        my $data = cfs_read_file('jobs.cfg');

        for my $id (keys $data->{ids}->%*) {
            my $job = $data->{ids}->{$id};
            my $type = $job->{type};

            my $path = $get_state_file->($id, $type);
            if (-e $path) {
                detect_changed_runtime_props($id, $type, $job);
            } else {
                create_job($id, $type, $job);
            }
        }

        # state files are named "$type-$jobid.json" (see $get_state_file);
        # the dot is escaped so only a literal ".json" suffix matches
        PVE::Tools::dir_glob_foreach($state_dir, '(.*?)-(.*)\.json', sub {
            my ($path, $type, $id) = @_;

            # also drop state left behind when a job id was reused with another type
            my $job = $data->{ids}->{$id};
            if (!defined($job) || ($job->{type} // '') ne $type) {
                remove_job($id, $type);
            }
        });
    });
    die $@ if $@;
}
335
# Creates the state and lock directories. Best effort: mkdir failures
# (e.g. the directory already exists) are not reported; the result of the
# last mkdir is returned.
sub setup_dirs {
    mkdir($state_dir);
    return mkdir($lock_dir);
}

1;