7 use PVE
::Cluster
qw(cfs_lock_file cfs_read_file cfs_register_file);
8 use PVE
::Job
::Registry
;
10 use PVE
::Jobs
::RealmSync
;
13 PVE
::Jobs
::VZDump-
>register();
14 PVE
::Jobs
::RealmSync-
>register();
15 PVE
::Job
::Registry-
>init();
19 sub { PVE
::Job
::Registry-
>parse_config(@_); },
20 sub { PVE
::Job
::Registry-
>write_config(@_); },
23 my $state_dir = "/var/lib/pve-manager/jobs";
24 my $lock_dir = "/var/lock/pve-manager";
26 my $get_state_file = sub {
27 my ($jobid, $type) = @_;
28 return "$state_dir/$type-$jobid.json";
36 my $saved_config_props = [qw(enabled schedule)];
38 # saves some properties of the jobcfg into the jobstate so we can track
39 # them on different nodes (where the update was not done)
40 # and update the last runtime when they change
41 sub detect_changed_runtime_props
{
42 my ($jobid, $type, $cfg) = @_;
44 lock_job_state
($jobid, $type, sub {
45 my $old_state = read_job_state
($jobid, $type) // $default_state;
48 for my $prop (@$saved_config_props) {
49 my $old_prop = $old_state->{config
}->{$prop} // '';
50 my $new_prop = $cfg->{$prop} // '';
51 next if "$old_prop" eq "$new_prop";
53 if (defined($cfg->{$prop})) {
54 $old_state->{config
}->{$prop} = $cfg->{$prop};
56 delete $old_state->{config
}->{$prop};
63 $old_state->{updated
} = time();
65 my $path = $get_state_file->($jobid, $type);
66 PVE
::Tools
::file_set_contents
($path, encode_json
($old_state));
70 # lockless, since we use file_get_contents, which is atomic
72 my ($jobid, $type) = @_;
73 my $path = $get_state_file->($jobid, $type);
76 my $raw = PVE
::Tools
::file_get_contents
($path);
78 return $default_state if $raw eq '';
81 if ($raw =~ m/^(\{.*\})$/) {
82 return decode_json
($1);
85 die "invalid json data in '$path'\n";
89 my ($jobid, $type, $sub) = @_;
91 my $filename = "$lock_dir/$type-$jobid.lck";
93 my $res = PVE
::Tools
::lock_file
($filename, 10, $sub);
99 my $get_job_task_status = sub {
102 if (!defined($state->{upid
})) {
103 return; # not started
106 my ($task, $filename) = PVE
::Tools
::upid_decode
($state->{upid
}, 1);
107 die "unable to parse worker upid - $state->{upid}\n" if !$task;
108 die "no such task\n" if ! -f
$filename;
110 my $pstart = PVE
::ProcFSTools
::read_proc_starttime
($task->{pid
});
111 if ($pstart && $pstart == $task->{pstart
}) {
112 return; # still running
115 return PVE
::Tools
::upid_read_status
($state->{upid
});
118 # checks if the job is already finished if it was started before and
119 # updates the statefile accordingly
120 sub update_job_stopped
{
121 my ($jobid, $type) = @_;
123 # first check unlocked to save time,
124 my $state = read_job_state
($jobid, $type);
125 return if !defined($state) || $state->{state} ne 'started'; # removed or not started
127 if (defined($get_job_task_status->($state))) {
128 lock_job_state
($jobid, $type, sub {
129 my $state = read_job_state
($jobid, $type);
130 return if !defined($state) || $state->{state} ne 'started'; # removed or not started
134 msg
=> $get_job_task_status->($state) // 'internal error',
135 upid
=> $state->{upid
},
136 config
=> $state->{config
},
139 if ($state->{updated
}) { # save updated time stamp
140 $new_state->{updated
} = $state->{updated
};
143 my $path = $get_state_file->($jobid, $type);
144 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
149 # must be called when the job is first created
151 my ($jobid, $type, $cfg) = @_;
153 lock_job_state
($jobid, $type, sub {
154 my $state = read_job_state
($jobid, $type) // $default_state;
156 if ($state->{state} ne 'created') {
157 die "job state already exists\n";
160 $state->{time} = time();
161 for my $prop (@$saved_config_props) {
162 if (defined($cfg->{$prop})) {
163 $state->{config
}->{$prop} = $cfg->{$prop};
167 my $path = $get_state_file->($jobid, $type);
168 PVE
::Tools
::file_set_contents
($path, encode_json
($state));
172 # to be called when the job is removed
174 my ($jobid, $type) = @_;
175 my $path = $get_state_file->($jobid, $type);
179 # checks if the job can be started and sets the state to 'starting'
180 # returns 1 if the job can be started, 0 otherwise
182 my ($jobid, $type) = @_;
184 # first check unlocked to save time
185 my $state = read_job_state
($jobid, $type);
186 return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
188 lock_job_state
($jobid, $type, sub {
189 my $state = read_job_state
($jobid, $type);
190 return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
195 config
=> $state->{config
},
198 my $path = $get_state_file->($jobid, $type);
199 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
205 my ($jobid, $type, $upid, $msg) = @_;
207 lock_job_state
($jobid, $type, sub {
208 my $state = read_job_state
($jobid, $type);
209 return if !defined($state); # job was removed, do not update
210 die "unexpected state '$state->{state}'\n" if $state->{state} ne 'starting';
225 $new_state->{config
} = $state->{config
};
227 my $path = $get_state_file->($jobid, $type);
228 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
232 # will be called when the job schedule is updated
233 sub update_last_runtime
{
234 my ($jobid, $type) = @_;
235 lock_job_state
($jobid, $type, sub {
236 my $old_state = read_job_state
($jobid, $type) // $default_state;
238 $old_state->{updated
} = time();
240 my $path = $get_state_file->($jobid, $type);
241 PVE
::Tools
::file_set_contents
($path, encode_json
($old_state));
245 sub get_last_runtime
{
246 my ($jobid, $type) = @_;
248 my $state = read_job_state
($jobid, $type) // $default_state;
250 return $state->{updated
} if defined($state->{updated
});
252 if (my $upid = $state->{upid
}) {
253 my ($task) = PVE
::Tools
::upid_decode
($upid, 1);
254 die "unable to parse worker upid\n" if !$task;
255 return $task->{starttime
};
258 return $state->{time} // 0;
262 my ($first_run) = @_;
264 synchronize_job_states_with_config
();
266 my $jobs_cfg = cfs_read_file
('jobs.cfg');
267 my $nodename = PVE
::INotify
::nodename
();
269 foreach my $id (sort keys %{$jobs_cfg->{ids
}}) {
270 my $cfg = $jobs_cfg->{ids
}->{$id};
271 my $type = $cfg->{type
};
272 my $schedule = delete $cfg->{schedule
};
274 # only schedule local jobs
275 next if defined($cfg->{node
}) && $cfg->{node
} ne $nodename;
277 eval { update_job_stopped
($id, $type) };
279 warn "could not update job state, skipping - $err\n";
283 # update last runtime on the first run when 'repeat-missed' is 0, so that a missed job
284 # will not start immediately after boot
285 update_last_runtime
($id, $type) if $first_run && !$cfg->{'repeat-missed'};
287 next if defined($cfg->{enabled
}) && !$cfg->{enabled
}; # only schedule actually enabled jobs
289 my $last_run = get_last_runtime
($id, $type);
290 my $calspec = PVE
::CalendarEvent
::parse_calendar_event
($schedule);
291 my $next_sync = PVE
::CalendarEvent
::compute_next_event
($calspec, $last_run);
293 next if !defined($next_sync) || time() < $next_sync; # not yet its (next) turn
295 my $plugin = PVE
::Job
::Registry-
>lookup($type);
296 if (starting_job
($id, $type)) {
297 PVE
::Cluster
::cfs_update
();
299 my $upid = eval { $plugin->run($cfg, $id, $schedule) };
302 started_job
($id, $type, undef, $err);
303 } elsif ($upid eq 'OK') { # some jobs return OK immediately
304 started_job
($id, $type, undef, 'OK');
306 started_job
($id, $type, $upid);
312 # creates and removes statefiles for job configs
313 sub synchronize_job_states_with_config
{
314 cfs_lock_file
('jobs.cfg', undef, sub {
315 my $data = cfs_read_file
('jobs.cfg');
317 for my $id (keys $data->{ids
}->%*) {
318 my $job = $data->{ids
}->{$id};
319 my $type = $job->{type
};
321 my $path = $get_state_file->($id, $type);
323 detect_changed_runtime_props
($id, $type, $job);
325 create_job
($id, $type, $job);
329 my $valid_types = PVE
::Job
::Registry-
>lookup_types();
330 my $type_regex = join("|", $valid_types->@*);
332 PVE
::Tools
::dir_glob_foreach
($state_dir, "(${type_regex})-(.*).json", sub {
333 my ($path, $type, $id) = @_;
335 if (!defined($data->{ids
}->{$id})) {
336 remove_job
($id, $type);