7 use PVE
::Cluster
qw(cfs_lock_file cfs_read_file cfs_register_file);
8 use PVE
::Job
::Registry
;
12 PVE
::Jobs
::VZDump-
>register();
13 PVE
::Job
::Registry-
>init();
17 sub { PVE
::Job
::Registry-
>parse_config(@_); },
18 sub { PVE
::Job
::Registry-
>write_config(@_); },
21 my $state_dir = "/var/lib/pve-manager/jobs";
22 my $lock_dir = "/var/lock/pve-manager";
24 my $get_state_file = sub {
25 my ($jobid, $type) = @_;
26 return "$state_dir/$type-$jobid.json";
34 my $saved_config_props = [qw(enabled schedule)];
36 # saves some properties of the jobcfg into the jobstate so we can track
37 # them on different nodes (where the update was not done)
38 # and update the last runtime when they change
39 sub detect_changed_runtime_props
{
40 my ($jobid, $type, $cfg) = @_;
42 lock_job_state
($jobid, $type, sub {
43 my $old_state = read_job_state
($jobid, $type) // $default_state;
46 for my $prop (@$saved_config_props) {
47 my $old_prop = $old_state->{config
}->{$prop} // '';
48 my $new_prop = $cfg->{$prop} // '';
49 next if "$old_prop" eq "$new_prop";
51 if (defined($cfg->{$prop})) {
52 $old_state->{config
}->{$prop} = $cfg->{$prop};
54 delete $old_state->{config
}->{$prop};
61 $old_state->{updated
} = time();
63 my $path = $get_state_file->($jobid, $type);
64 PVE
::Tools
::file_set_contents
($path, encode_json
($old_state));
68 # lockless, since we use file_get_contents, which is atomic
70 my ($jobid, $type) = @_;
71 my $path = $get_state_file->($jobid, $type);
74 my $raw = PVE
::Tools
::file_get_contents
($path);
76 return $default_state if $raw eq '';
79 if ($raw =~ m/^(\{.*\})$/) {
80 return decode_json
($1);
83 die "invalid json data in '$path'\n";
87 my ($jobid, $type, $sub) = @_;
89 my $filename = "$lock_dir/$type-$jobid.lck";
91 my $res = PVE
::Tools
::lock_file
($filename, 10, $sub);
97 my $get_job_task_status = sub {
100 if (!defined($state->{upid
})) {
101 return; # not started
104 my ($task, $filename) = PVE
::Tools
::upid_decode
($state->{upid
}, 1);
105 die "unable to parse worker upid - $state->{upid}\n" if !$task;
106 die "no such task\n" if ! -f
$filename;
108 my $pstart = PVE
::ProcFSTools
::read_proc_starttime
($task->{pid
});
109 if ($pstart && $pstart == $task->{pstart
}) {
110 return; # still running
113 return PVE
::Tools
::upid_read_status
($state->{upid
});
116 # checks if the job is already finished if it was started before and
117 # updates the statefile accordingly
118 sub update_job_stopped
{
119 my ($jobid, $type) = @_;
121 # first check unlocked to save time,
122 my $state = read_job_state
($jobid, $type);
123 return if !defined($state) || $state->{state} ne 'started'; # removed or not started
125 if (defined($get_job_task_status->($state))) {
126 lock_job_state
($jobid, $type, sub {
127 my $state = read_job_state
($jobid, $type);
128 return if !defined($state) || $state->{state} ne 'started'; # removed or not started
132 msg
=> $get_job_task_status->($state) // 'internal error',
133 upid
=> $state->{upid
},
134 config
=> $state->{config
},
137 if ($state->{updated
}) { # save updated time stamp
138 $new_state->{updated
} = $state->{updated
};
141 my $path = $get_state_file->($jobid, $type);
142 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
147 # must be called when the job is first created
149 my ($jobid, $type, $cfg) = @_;
151 lock_job_state
($jobid, $type, sub {
152 my $state = read_job_state
($jobid, $type) // $default_state;
154 if ($state->{state} ne 'created') {
155 die "job state already exists\n";
158 $state->{time} = time();
159 for my $prop (@$saved_config_props) {
160 if (defined($cfg->{$prop})) {
161 $state->{config
}->{$prop} = $cfg->{$prop};
165 my $path = $get_state_file->($jobid, $type);
166 PVE
::Tools
::file_set_contents
($path, encode_json
($state));
170 # to be called when the job is removed
172 my ($jobid, $type) = @_;
173 my $path = $get_state_file->($jobid, $type);
177 # checks if the job can be started and sets the state to 'starting'
178 # returns 1 if the job can be started, 0 otherwise
180 my ($jobid, $type) = @_;
182 # first check unlocked to save time
183 my $state = read_job_state
($jobid, $type);
184 return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
186 lock_job_state
($jobid, $type, sub {
187 my $state = read_job_state
($jobid, $type);
188 return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
193 config
=> $state->{config
},
196 my $path = $get_state_file->($jobid, $type);
197 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
203 my ($jobid, $type, $upid, $msg) = @_;
205 lock_job_state
($jobid, $type, sub {
206 my $state = read_job_state
($jobid, $type);
207 return if !defined($state); # job was removed, do not update
208 die "unexpected state '$state->{state}'\n" if $state->{state} ne 'starting';
223 $new_state->{config
} = $state->{config
};
225 my $path = $get_state_file->($jobid, $type);
226 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
230 # will be called when the job schedule is updated
231 sub update_last_runtime
{
232 my ($jobid, $type) = @_;
233 lock_job_state
($jobid, $type, sub {
234 my $old_state = read_job_state
($jobid, $type) // $default_state;
236 $old_state->{updated
} = time();
238 my $path = $get_state_file->($jobid, $type);
239 PVE
::Tools
::file_set_contents
($path, encode_json
($old_state));
243 sub get_last_runtime
{
244 my ($jobid, $type) = @_;
246 my $state = read_job_state
($jobid, $type) // $default_state;
248 return $state->{updated
} if defined($state->{updated
});
250 if (my $upid = $state->{upid
}) {
251 my ($task) = PVE
::Tools
::upid_decode
($upid, 1);
252 die "unable to parse worker upid\n" if !$task;
253 return $task->{starttime
};
256 return $state->{time} // 0;
260 my ($first_run) = @_;
262 synchronize_job_states_with_config
();
264 my $jobs_cfg = cfs_read_file
('jobs.cfg');
265 my $nodename = PVE
::INotify
::nodename
();
267 foreach my $id (sort keys %{$jobs_cfg->{ids
}}) {
268 my $cfg = $jobs_cfg->{ids
}->{$id};
269 my $type = $cfg->{type
};
270 my $schedule = delete $cfg->{schedule
};
272 # only schedule local jobs
273 next if defined($cfg->{node
}) && $cfg->{node
} ne $nodename;
275 eval { update_job_stopped
($id, $type) };
277 warn "could not update job state, skipping - $err\n";
281 # update last runtime on the first run when 'repeat-missed' is 0, so that a missed job
282 # will not start immediately after boot
283 update_last_runtime
($id, $type) if $first_run && !$cfg->{'repeat-missed'};
285 next if defined($cfg->{enabled
}) && !$cfg->{enabled
}; # only schedule actually enabled jobs
287 my $last_run = get_last_runtime
($id, $type);
288 my $calspec = PVE
::CalendarEvent
::parse_calendar_event
($schedule);
289 my $next_sync = PVE
::CalendarEvent
::compute_next_event
($calspec, $last_run);
291 next if !defined($next_sync) || time() < $next_sync; # not yet its (next) turn
293 my $plugin = PVE
::Job
::Registry-
>lookup($type);
294 if (starting_job
($id, $type)) {
295 PVE
::Cluster
::cfs_update
();
297 my $upid = eval { $plugin->run($cfg, $id, $schedule) };
300 started_job
($id, $type, undef, $err);
301 } elsif ($upid eq 'OK') { # some jobs return OK immediately
302 started_job
($id, $type, undef, 'OK');
304 started_job
($id, $type, $upid);
310 # creates and removes statefiles for job configs
311 sub synchronize_job_states_with_config
{
312 cfs_lock_file
('jobs.cfg', undef, sub {
313 my $data = cfs_read_file
('jobs.cfg');
315 for my $id (keys $data->{ids
}->%*) {
316 my $job = $data->{ids
}->{$id};
317 my $type = $job->{type
};
319 my $path = $get_state_file->($id, $type);
321 detect_changed_runtime_props
($id, $type, $job);
323 create_job
($id, $type, $job);
327 PVE
::Tools
::dir_glob_foreach
($state_dir, '(.*?)-(.*).json', sub {
328 my ($path, $type, $id) = @_;
330 if (!defined($data->{ids
}->{$id})) {
331 remove_job
($id, $type);