]> git.proxmox.com Git - pve-manager.git/blob - PVE/Jobs.pm
update shipped appliance info index
[pve-manager.git] / PVE / Jobs.pm
1 package PVE::Jobs;
2
3 use strict;
4 use warnings;
5 use JSON;
6
7 use PVE::Cluster qw(cfs_read_file cfs_lock_file);
8 use PVE::Jobs::Plugin;
9 use PVE::Jobs::VZDump;
10 use PVE::Tools;
11
# Register the job plugin type(s) first, then initialize the plugin base class.
PVE::Jobs::VZDump->register();
PVE::Jobs::Plugin->init();

# One JSON state file per job lives below $state_dir, the matching lock file
# below $lock_dir (both are created by setup_dirs()).
my $state_dir = '/var/lib/pve-manager/jobs';
my $lock_dir = '/var/lock/pve-manager';

# Maps ($jobid, $type) to the path of the job's state file.
my $get_state_file = sub {
    my ($jobid, $type) = @_;
    return "${state_dir}/${type}-${jobid}.json";
};

# State assumed for a job that has no (or an empty) state file yet.
my $default_state = { state => 'created', time => 0 };

# Job config properties that get mirrored into the job state.
my $saved_config_props = ['enabled', 'schedule'];
29
# Mirrors the tracked job config properties ($saved_config_props) into the job
# state, so that a config change done on another node is noticed on this one.
# If any tracked property differs from the copy saved in the state, the saved
# copy is refreshed, the state's 'updated' timestamp is set to the current time
# and the state file is rewritten. Nothing is written when nothing changed.
#
# Parameters:
#   $jobid, $type - identify the job (and thereby its state and lock file)
#   $cfg          - config hash of the job
#
# Errors raised while locking or reading the state are propagated to the caller.
sub detect_changed_runtime_props {
    my ($jobid, $type, $cfg) = @_;

    lock_job_state($jobid, $type, sub {
        # Always work on a private copy: the fallback (and an empty state file)
        # yields the shared $default_state template, and modifying that in
        # place would leak this job's config into every later default state.
        my $state = { %{ read_job_state($jobid, $type) // $default_state } };

        my $updated = 0;
        for my $prop (@$saved_config_props) {
            my $old_prop = $state->{config}->{$prop} // '';
            my $new_prop = $cfg->{$prop} // '';
            next if "$old_prop" eq "$new_prop";

            if (defined($cfg->{$prop})) {
                $state->{config}->{$prop} = $cfg->{$prop};
            } else {
                delete $state->{config}->{$prop};
            }

            $updated = 1;
        }

        return if !$updated;
        $state->{updated} = time();

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($state));
    });
}
61
# Reads and decodes the state file of the given job.
#
# No lock is taken here; the original author notes that reading through
# file_get_contents is atomic.
#
# Returns:
#   - nothing (undef in scalar context) if the state file does not exist
#   - a fresh copy of the default state if the state file is empty
#   - the decoded JSON object otherwise
#
# Dies with "invalid json data in '<path>'" if the content does not look like
# a single-line JSON object.
sub read_job_state {
    my ($jobid, $type) = @_;

    my $path = $get_state_file->($jobid, $type);
    return if ! -e $path;

    my $raw = PVE::Tools::file_get_contents($path);

    # Hand out a copy, never the shared template itself: callers modify the
    # returned hash in place before writing it back.
    return { %$default_state } if $raw eq '';

    # untaint $raw
    if ($raw =~ m/^(\{.*\})$/) {
        return decode_json($1);
    }

    die "invalid json data in '$path'\n";
}
79
# Runs $sub while holding the lock file of the given job and returns whatever
# PVE::Tools::lock_file returned for it. An error left in $@ by lock_file is
# re-thrown.
# NOTE(review): the literal 10 is presumably the lock timeout in seconds -
# confirm against PVE::Tools::lock_file.
sub lock_job_state {
    my ($jobid, $type, $sub) = @_;

    my $lockfile = "$lock_dir/$type-$jobid.lck";
    my $result = PVE::Tools::lock_file($lockfile, 10, $sub);

    if (my $err = $@) {
        die $err;
    }

    return $result;
}
90
# Determines the end status of the worker task referenced by $state->{upid}.
#
# Returns nothing if the state has no upid (job not started) or if a process
# with the task's pid and start time still exists (task still running);
# otherwise returns the result of PVE::Tools::upid_read_status for the upid.
#
# Dies if the upid cannot be decoded or the decoded task file does not exist.
my $get_job_task_status = sub {
    my ($state) = @_;

    my $upid = $state->{upid};
    return if !defined($upid); # not started

    my ($task, $filename) = PVE::Tools::upid_decode($upid, 1);
    die "unable to parse worker upid - $upid\n" if !$task;
    die "no such task\n" if ! -f $filename;

    # same pid and same process start time => the worker is still alive
    my $pstart = PVE::ProcFSTools::read_proc_starttime($task->{pid});
    my $still_running = $pstart && $pstart == $task->{pstart};
    return if $still_running;

    return PVE::Tools::upid_read_status($upid);
};
109
# If the job is recorded as 'started' but its worker task has finished in the
# meantime, rewrites the state file as 'stopped' with the task's end status as
# message. The upid, the saved config and (if set) the 'updated' timestamp are
# carried over into the new state.
sub update_job_stopped {
    my ($jobid, $type) = @_;

    # cheap pre-check without the lock; everything is re-checked under the lock
    my $unlocked = read_job_state($jobid, $type);
    return if !defined($unlocked) || $unlocked->{state} ne 'started'; # removed or not started

    if (defined($get_job_task_status->($unlocked))) {
        lock_job_state($jobid, $type, sub {
            my $current = read_job_state($jobid, $type);
            return if !defined($current); # removed
            return if $current->{state} ne 'started'; # not started

            my $msg = $get_job_task_status->($current) // 'internal error';

            my $stopped = {
                state => 'stopped',
                msg => $msg,
                upid => $current->{upid},
                config => $current->{config},
            };

            # keep the time stamp of the last config update
            $stopped->{updated} = $current->{updated} if $current->{updated};

            PVE::Tools::file_set_contents(
                $get_state_file->($jobid, $type),
                encode_json($stopped),
            );
        });
    }
}
140
# Must be called when the job is first created: writes the initial state file
# with state 'created', the current time and the tracked config properties
# ($saved_config_props) that are defined in $cfg.
#
# Parameters:
#   $jobid, $type - identify the job (and thereby its state and lock file)
#   $cfg          - config hash of the job
#
# Dies with "job state already exists" if a state other than 'created' is
# already stored for the job.
sub create_job {
    my ($jobid, $type, $cfg) = @_;

    lock_job_state($jobid, $type, sub {
        # Copy before modifying: without a state file the fallback is the
        # shared $default_state template, and writing 'time'/'config' into it
        # would make later jobs inherit this job's values.
        my $state = { %{ read_job_state($jobid, $type) // $default_state } };

        die "job state already exists\n" if $state->{state} ne 'created';

        $state->{time} = time();
        for my $prop (@$saved_config_props) {
            $state->{config}->{$prop} = $cfg->{$prop} if defined($cfg->{$prop});
        }

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($state));
    });
}
163
# To be called when the job is removed: deletes the job's state file.
# Returns the result of unlink; a missing file is not treated as an error.
sub remove_job {
    my ($jobid, $type) = @_;
    return unlink($get_state_file->($jobid, $type));
}
170
# Checks if the job can be started and, if so, sets its state to 'starting'
# (keeping the saved config and recording the current time).
#
# Returns 1 if the job can be started, 0 otherwise, i.e. when the job has no
# state (it was removed) or is already recorded as 'started'.
sub starting_job {
    my ($jobid, $type) = @_;

    # first check unlocked to save time
    my $state = read_job_state($jobid, $type);
    return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started

    # A 'return' inside the callback only leaves the callback, so the outcome
    # of the re-check under the lock is reported through this flag.
    my $can_start = 0;

    lock_job_state($jobid, $type, sub {
        my $state = read_job_state($jobid, $type);
        return if !defined($state) || $state->{state} eq 'started'; # removed or already started

        my $new_state = {
            state => 'starting',
            time => time(),
            config => $state->{config},
        };

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($new_state));

        $can_start = 1;
    });

    return $can_start;
}
195
# Records the outcome of a start attempt for a job in state 'starting'.
#
# With a defined $msg the job is stored as 'stopped' with that message and the
# current time; otherwise it is stored as 'started' with the given $upid. The
# saved config is carried over in both cases. Nothing is written if the job
# has no state anymore (it was removed).
#
# Dies with "unexpected state '<state>'" if the job is not in state 'starting'.
sub started_job {
    my ($jobid, $type, $upid, $msg) = @_;

    lock_job_state($jobid, $type, sub {
        my $current = read_job_state($jobid, $type);
        return if !defined($current); # job was removed, do not update
        die "unexpected state '$current->{state}'\n" if $current->{state} ne 'starting';

        my %next = (config => $current->{config});
        if (defined($msg)) {
            # finished (or failed) right away, there is no worker to track
            @next{qw(state msg time)} = ('stopped', $msg, time());
        } else {
            @next{qw(state upid)} = ('started', $upid);
        }

        PVE::Tools::file_set_contents(
            $get_state_file->($jobid, $type),
            encode_json(\%next),
        );
    });
}
223
# Will be called when the job schedule is updated: sets the 'updated' time
# stamp of the job state to the current time and rewrites the state file.
# get_last_runtime() prefers this time stamp over all other ones.
sub update_last_runtime {
    my ($jobid, $type) = @_;

    lock_job_state($jobid, $type, sub {
        # Copy before modifying: without a state file the fallback is the
        # shared $default_state template, and an 'updated' value stored there
        # would be reported for every job that falls back to the default.
        my $state = { %{ read_job_state($jobid, $type) // $default_state } };

        $state->{updated} = time();

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($state));
    });
}
236
# Returns the reference time used to compute the next scheduled run of a job.
# In order of preference:
#   1. the 'updated' time stamp of the state, if defined
#   2. the start time decoded from the state's upid, if there is one
#   3. the state's 'time' value, or 0 if that is not set
#
# Dies with "unable to parse worker upid" if the stored upid cannot be decoded.
sub get_last_runtime {
    my ($jobid, $type) = @_;

    my $state = read_job_state($jobid, $type) // $default_state;

    my $updated = $state->{updated};
    return $updated if defined($updated);

    my $upid = $state->{upid};
    if ($upid) {
        my ($task) = PVE::Tools::upid_decode($upid, 1);
        die "unable to parse worker upid\n" if !$task;
        return $task->{starttime};
    }

    return $state->{time} // 0;
}
252
# Scheduler entry point: synchronizes the state files with jobs.cfg and then
# starts every job of this node that is enabled and due.
#
# Parameters:
#   $first_run - if true, jobs without 'repeat-missed' get their last runtime
#                reset to now before the schedule is evaluated
sub run_jobs {
    my ($first_run) = @_;

    synchronize_job_states_with_config();

    my $jobs_cfg = cfs_read_file('jobs.cfg');
    my $nodename = PVE::INotify::nodename();

    for my $id (sort keys $jobs_cfg->{ids}->%*) {
        my $cfg = $jobs_cfg->{ids}->{$id};
        my $type = $cfg->{type};
        # the schedule is taken out of the config that is later passed to the plugin
        my $schedule = delete $cfg->{schedule};

        # only schedule local jobs
        my $node = $cfg->{node};
        next if defined($node) && $node ne $nodename;

        eval { update_job_stopped($id, $type) };
        if (my $err = $@) {
            warn "could not update job state, skipping - $err\n";
            next;
        }

        # on the first run, a job without 'repeat-missed' gets its last runtime
        # reset, so that a missed run is not made up for right after boot
        if ($first_run && !$cfg->{'repeat-missed'}) {
            update_last_runtime($id, $type);
        }

        # only schedule actually enabled jobs
        next if defined($cfg->{enabled}) && !$cfg->{enabled};

        my $last_run = get_last_runtime($id, $type);
        my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
        my $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $last_run);

        # not yet its (next) turn
        next if !defined($next_sync) || time() < $next_sync;

        my $plugin = PVE::Jobs::Plugin->lookup($type);
        next if !starting_job($id, $type);

        my $upid = eval { $plugin->run($cfg) };
        if (my $err = $@) {
            warn $err;
            started_job($id, $type, undef, $err);
        } elsif ($upid eq 'OK') {
            # some jobs return OK immediately
            started_job($id, $type, undef, 'OK');
        } else {
            started_job($id, $type, $upid);
        }
    }
}
301
# Creates and removes state files so that they match the job configs, all
# while holding the lock on jobs.cfg:
#   - a configured job with a state file gets its tracked properties checked
#     for changes (detect_changed_runtime_props)
#   - a configured job without a state file gets one created (create_job)
#   - a state file whose job id is not configured anymore gets removed
#
# An error left in $@ by cfs_lock_file is re-thrown.
sub synchronize_job_states_with_config {
    cfs_lock_file('jobs.cfg', undef, sub {
        my $data = cfs_read_file('jobs.cfg');

        for my $id (keys $data->{ids}->%*) {
            my $job = $data->{ids}->{$id};
            my $type = $job->{type};

            if (-e $get_state_file->($id, $type)) {
                detect_changed_runtime_props($id, $type, $job);
            } else {
                create_job($id, $type, $job);
            }
        }

        # State files are named "<type>-<id>.json". The dot has to be escaped,
        # an unescaped one would match any character in front of "json".
        PVE::Tools::dir_glob_foreach($state_dir, '(.*?)-(.*)\.json', sub {
            my ($path, $type, $id) = @_;

            remove_job($id, $type) if !defined($data->{ids}->{$id});
        });
    });
    die $@ if $@;
}
329
# Creates the state and the lock directory. Failures (for example because a
# directory already exists) are not checked; the result of the second mkdir is
# returned.
sub setup_dirs {
    mkdir($state_dir);
    return mkdir($lock_dir);
}
334
335 1;