7 use PVE
::Cluster
qw(cfs_read_file cfs_lock_file);
12 PVE
::Jobs
::VZDump-
>register();
13 PVE
::Jobs
::Plugin-
>init();
15 my $state_dir = "/var/lib/pve-manager/jobs";
16 my $lock_dir = "/var/lock/pve-manager";
18 my $get_state_file = sub {
19 my ($jobid, $type) = @_;
20 return "$state_dir/$type-$jobid.json";
28 # lockless, since we use file_get_contents, which is atomic
30 my ($jobid, $type) = @_;
31 my $path = $get_state_file->($jobid, $type);
34 my $raw = PVE
::Tools
::file_get_contents
($path);
36 return $default_state if $raw eq '';
39 if ($raw =~ m/^(\{.*\})$/) {
40 return decode_json
($1);
43 die "invalid json data in '$path'\n";
47 my ($jobid, $type, $sub) = @_;
49 my $filename = "$lock_dir/$type-$jobid.lck";
51 my $res = PVE
::Tools
::lock_file
($filename, 10, $sub);
57 my $get_job_task_status = sub {
60 if (!defined($state->{upid
})) {
64 my ($task, $filename) = PVE
::Tools
::upid_decode
($state->{upid
}, 1);
65 die "unable to parse worker upid - $state->{upid}\n" if !$task;
66 die "no such task\n" if ! -f
$filename;
68 my $pstart = PVE
::ProcFSTools
::read_proc_starttime
($task->{pid
});
69 if ($pstart && $pstart == $task->{pstart
}) {
70 return; # still running
73 return PVE
::Tools
::upid_read_status
($state->{upid
});
76 # checks if the job is already finished if it was started before and
77 # updates the statefile accordingly
78 sub update_job_stopped
{
79 my ($jobid, $type) = @_;
81 # first check unlocked to save time,
82 my $state = read_job_state
($jobid, $type);
83 return if !defined($state) || $state->{state} ne 'started'; # removed or not started
85 if (defined($get_job_task_status->($state))) {
86 lock_job_state
($jobid, $type, sub {
87 my $state = read_job_state
($jobid, $type);
88 return if !defined($state) || $state->{state} ne 'started'; # removed or not started
92 msg
=> $get_job_task_status->($state) // 'internal error',
93 upid
=> $state->{upid
},
96 if ($state->{updated
}) { # save updated time stamp
97 $new_state->{updated
} = $state->{updated
};
100 my $path = $get_state_file->($jobid, $type);
101 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
106 # must be called when the job is first created
108 my ($jobid, $type) = @_;
110 lock_job_state
($jobid, $type, sub {
111 my $state = read_job_state
($jobid, $type) // $default_state;
113 if ($state->{state} ne 'created') {
114 die "job state already exists\n";
117 $state->{time} = time();
119 my $path = $get_state_file->($jobid, $type);
120 PVE
::Tools
::file_set_contents
($path, encode_json
($state));
124 # to be called when the job is removed
126 my ($jobid, $type) = @_;
127 my $path = $get_state_file->($jobid, $type);
131 # checks if the job can be started and sets the state to 'starting'
132 # returns 1 if the job can be started, 0 otherwise
134 my ($jobid, $type) = @_;
136 # first check unlocked to save time
137 my $state = read_job_state
($jobid, $type);
138 return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
140 lock_job_state
($jobid, $type, sub {
141 my $state = read_job_state
($jobid, $type);
142 return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
149 my $path = $get_state_file->($jobid, $type);
150 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
156 my ($jobid, $type, $upid, $msg) = @_;
158 lock_job_state
($jobid, $type, sub {
159 my $state = read_job_state
($jobid, $type);
160 return if !defined($state); # job was removed, do not update
161 die "unexpected state '$state->{state}'\n" if $state->{state} ne 'starting';
177 my $path = $get_state_file->($jobid, $type);
178 PVE
::Tools
::file_set_contents
($path, encode_json
($new_state));
182 # will be called when the job schedule is updated
183 sub updated_job_schedule
{
184 my ($jobid, $type) = @_;
185 lock_job_state
($jobid, $type, sub {
186 my $old_state = read_job_state
($jobid, $type) // $default_state;
188 $old_state->{updated
} = time();
190 my $path = $get_state_file->($jobid, $type);
191 PVE
::Tools
::file_set_contents
($path, encode_json
($old_state));
195 sub get_last_runtime
{
196 my ($jobid, $type) = @_;
198 my $state = read_job_state
($jobid, $type) // $default_state;
200 return $state->{updated
} if defined($state->{updated
});
202 if (my $upid = $state->{upid
}) {
203 my ($task) = PVE
::Tools
::upid_decode
($upid, 1);
204 die "unable to parse worker upid\n" if !$task;
205 return $task->{starttime
};
208 return $state->{time} // 0;
212 synchronize_job_states_with_config
();
214 my $jobs_cfg = cfs_read_file
('jobs.cfg');
215 my $nodename = PVE
::INotify
::nodename
();
217 foreach my $id (sort keys %{$jobs_cfg->{ids
}}) {
218 my $cfg = $jobs_cfg->{ids
}->{$id};
219 my $type = $cfg->{type
};
220 my $schedule = delete $cfg->{schedule
};
222 # only schedule local jobs
223 next if defined($cfg->{node
}) && $cfg->{node
} ne $nodename;
225 eval { update_job_stopped
($id, $type) };
227 warn "could not update job state, skipping - $err\n";
231 next if defined($cfg->{enabled
}) && !$cfg->{enabled
}; # only schedule actually enabled jobs
233 my $last_run = get_last_runtime
($id, $type);
234 my $calspec = PVE
::CalendarEvent
::parse_calendar_event
($schedule);
235 my $next_sync = PVE
::CalendarEvent
::compute_next_event
($calspec, $last_run);
237 next if !defined($next_sync) || time() < $next_sync; # not yet its (next) turn
239 my $plugin = PVE
::Jobs
::Plugin-
>lookup($type);
240 if (starting_job
($id, $type)) {
241 my $upid = eval { $plugin->run($cfg) };
244 started_job
($id, $type, undef, $err);
245 } elsif ($upid eq 'OK') { # some jobs return OK immediately
246 started_job
($id, $type, undef, 'OK');
248 started_job
($id, $type, $upid);
254 # creates and removes statefiles for job configs
255 sub synchronize_job_states_with_config
{
256 cfs_lock_file
('jobs.cfg', undef, sub {
257 my $data = cfs_read_file
('jobs.cfg');
259 for my $id (keys $data->{ids
}->%*) {
260 my $job = $data->{ids
}->{$id};
261 my $type = $job->{type
};
262 my $jobstate = read_job_state
($id, $type);
263 create_job
($id, $type) if !defined($jobstate);
266 PVE
::Tools
::dir_glob_foreach
($state_dir, '(.*?)-(.*).json', sub {
267 my ($path, $type, $id) = @_;
269 if (!defined($data->{ids
}->{$id})) {
270 remove_job
($id, $type);