7 use PVE
::Cluster
qw(cfs_read_file cfs_lock_file);
12 PVE
::Jobs
::VZDump-
>register();
13 PVE
::Jobs
::Plugin-
>init();
15 my $state_dir = "/var/lib/pve-manager/jobs";
16 my $lock_dir = "/var/lock/pve-manager";
18 my $get_state_file = sub {
19 my ($jobid, $type) = @_;
20 return "$state_dir/$type-$jobid.json";
28 my $saved_config_props = [qw(enabled schedule)];
30 # saves some properties of the jobcfg into the jobstate so we can track
31 # them on different nodes (where the update was not done)
32 # and update the last runtime when they change
33 sub detect_changed_runtime_props
{
34 my ($jobid, $type, $cfg) = @_;
36 lock_job_state
($jobid, $type, sub {
37 my $old_state = read_job_state
($jobid, $type) // $default_state;
40 for my $prop (@$saved_config_props) {
41 my $old_prop = $old_state->{config
}->{$prop} // '';
42 my $new_prop = $cfg->{$prop} // '';
43 next if "$old_prop" eq "$new_prop";
45 if (defined($cfg->{$prop})) {
46 $old_state->{config
}->{$prop} = $cfg->{$prop};
48 delete $old_state->{config
}->{$prop};
55 $old_state->{updated
} = time();
57 my $path = $get_state_file->($jobid, $type);
58 PVE
::Tools
::file_set_contents
($path, encode_json
($old_state));
62 # lockless, since we use file_get_contents, which is atomic
64 my ($jobid, $type) = @_;
65 my $path = $get_state_file->($jobid, $type);
68 my $raw = PVE
::Tools
::file_get_contents
($path);
70 return $default_state if $raw eq '';
73 if ($raw =~ m/^(\{.*\})$/) {
74 return decode_json
($1);
77 die "invalid json data in '$path'\n";
81 my ($jobid, $type, $sub) = @_;
83 my $filename = "$lock_dir/$type-$jobid.lck";
85 my $res = PVE
::Tools
::lock_file
($filename, 10, $sub);
91 my $get_job_task_status = sub {
94 if (!defined($state->{upid
})) {
98 my ($task, $filename) = PVE
::Tools
::upid_decode
($state->{upid
}, 1);
99 die "unable to parse worker upid - $state->{upid}\n" if !$task;
100 die "no such task\n" if ! -f
$filename;
102 my $pstart = PVE
::ProcFSTools
::read_proc_starttime
($task->{pid
});
103 if ($pstart && $pstart == $task->{pstart
}) {
104 return; # still running
107 return PVE
::Tools
::upid_read_status
($state->{upid
});
110 # checks if the job is already finished if it was started before and
111 # updates the statefile accordingly
112 sub update_job_stopped
{
113 my ($jobid, $type) = @_;
115 # first check unlocked to save time,
116 my $state = read_job_state
($jobid, $type);
117 return if !defined($state) || $state->{state} ne 'started'; # removed or not started
119 if (defined($get_job_task_status->($state))) {
120 lock_job_state
($jobid, $type, sub {
121 my $state = read_job_state
($jobid, $type);
122 return if !defined($state) || $state->{state} ne 'started'; # removed or not started
126 msg
=> $get_job_task_status->($state) // 'internal error',
127 upid
=> $state->{upid
},
128 config
=> $state->{config
},
131 if ($state->{updated
}) { # save updated time stamp
132 $new_state->{updated
} = $state->{updated
};
135 my $path = $get_state_file->($jobid, $type);
136 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
141 # must be called when the job is first created
143 my ($jobid, $type, $cfg) = @_;
145 lock_job_state
($jobid, $type, sub {
146 my $state = read_job_state
($jobid, $type) // $default_state;
148 if ($state->{state} ne 'created') {
149 die "job state already exists\n";
152 $state->{time} = time();
153 for my $prop (@$saved_config_props) {
154 if (defined($cfg->{$prop})) {
155 $state->{config
}->{$prop} = $cfg->{$prop};
159 my $path = $get_state_file->($jobid, $type);
160 PVE
::Tools
::file_set_contents
($path, encode_json
($state));
164 # to be called when the job is removed
166 my ($jobid, $type) = @_;
167 my $path = $get_state_file->($jobid, $type);
171 # checks if the job can be started and sets the state to 'starting'
172 # returns 1 if the job can be started, 0 otherwise
174 my ($jobid, $type) = @_;
176 # first check unlocked to save time
177 my $state = read_job_state
($jobid, $type);
178 return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
180 lock_job_state
($jobid, $type, sub {
181 my $state = read_job_state
($jobid, $type);
182 return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
187 config
=> $state->{config
},
190 my $path = $get_state_file->($jobid, $type);
191 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
197 my ($jobid, $type, $upid, $msg) = @_;
199 lock_job_state
($jobid, $type, sub {
200 my $state = read_job_state
($jobid, $type);
201 return if !defined($state); # job was removed, do not update
202 die "unexpected state '$state->{state}'\n" if $state->{state} ne 'starting';
217 $new_state->{config
} = $state->{config
};
219 my $path = $get_state_file->($jobid, $type);
220 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
224 # will be called when the job schedule is updated
225 sub update_last_runtime
{
226 my ($jobid, $type) = @_;
227 lock_job_state
($jobid, $type, sub {
228 my $old_state = read_job_state
($jobid, $type) // $default_state;
230 $old_state->{updated
} = time();
232 my $path = $get_state_file->($jobid, $type);
233 PVE
::Tools
::file_set_contents
($path, encode_json
($old_state));
237 sub get_last_runtime
{
238 my ($jobid, $type) = @_;
240 my $state = read_job_state
($jobid, $type) // $default_state;
242 return $state->{updated
} if defined($state->{updated
});
244 if (my $upid = $state->{upid
}) {
245 my ($task) = PVE
::Tools
::upid_decode
($upid, 1);
246 die "unable to parse worker upid\n" if !$task;
247 return $task->{starttime
};
250 return $state->{time} // 0;
254 my ($first_run) = @_;
256 synchronize_job_states_with_config
();
258 my $jobs_cfg = cfs_read_file
('jobs.cfg');
259 my $nodename = PVE
::INotify
::nodename
();
261 foreach my $id (sort keys %{$jobs_cfg->{ids
}}) {
262 my $cfg = $jobs_cfg->{ids
}->{$id};
263 my $type = $cfg->{type
};
264 my $schedule = delete $cfg->{schedule
};
266 # only schedule local jobs
267 next if defined($cfg->{node
}) && $cfg->{node
} ne $nodename;
269 eval { update_job_stopped
($id, $type) };
271 warn "could not update job state, skipping - $err\n";
275 # update last runtime on the first run when 'repeat-missed' is 0, so that a missed job
276 # will not start immediately after boot
277 update_last_runtime
($id, $type) if $first_run && !$cfg->{'repeat-missed'};
279 next if defined($cfg->{enabled
}) && !$cfg->{enabled
}; # only schedule actually enabled jobs
281 my $last_run = get_last_runtime
($id, $type);
282 my $calspec = PVE
::CalendarEvent
::parse_calendar_event
($schedule);
283 my $next_sync = PVE
::CalendarEvent
::compute_next_event
($calspec, $last_run);
285 next if !defined($next_sync) || time() < $next_sync; # not yet its (next) turn
287 my $plugin = PVE
::Jobs
::Plugin-
>lookup($type);
288 if (starting_job
($id, $type)) {
289 my $upid = eval { $plugin->run($cfg) };
292 started_job
($id, $type, undef, $err);
293 } elsif ($upid eq 'OK') { # some jobs return OK immediately
294 started_job
($id, $type, undef, 'OK');
296 started_job
($id, $type, $upid);
302 # creates and removes statefiles for job configs
303 sub synchronize_job_states_with_config
{
304 cfs_lock_file
('jobs.cfg', undef, sub {
305 my $data = cfs_read_file
('jobs.cfg');
307 for my $id (keys $data->{ids
}->%*) {
308 my $job = $data->{ids
}->{$id};
309 my $type = $job->{type
};
311 my $path = $get_state_file->($id, $type);
313 detect_changed_runtime_props
($id, $type, $job);
315 create_job
($id, $type, $job);
319 PVE
::Tools
::dir_glob_foreach
($state_dir, '(.*?)-(.*).json', sub {
320 my ($path, $type, $id) = @_;
322 if (!defined($data->{ids
}->{$id})) {
323 remove_job
($id, $type);