]> git.proxmox.com Git - pve-manager.git/blob - PVE/Jobs.pm
ceph pools: allow to create erasure code pools
[pve-manager.git] / PVE / Jobs.pm
1 package PVE::Jobs;
2
3 use strict;
4 use warnings;
5 use JSON;
6
7 use PVE::Cluster qw(cfs_read_file cfs_lock_file);
8 use PVE::Jobs::Plugin;
9 use PVE::Jobs::VZDump;
10 use PVE::Tools;
11
# Register the job type plugin(s) first, then initialize the plugin base
# class (registration must precede init(), as in the original order).
PVE::Jobs::VZDump->register();
PVE::Jobs::Plugin->init();

# $state_dir holds the per-job JSON state files, $lock_dir the per-job
# lock files used by lock_job_state().
my ($state_dir, $lock_dir) = ('/var/lib/pve-manager/jobs', '/var/lock/pve-manager');
17
# Path of the JSON state file of job $jobid of type $type:
# "<state dir>/<type>-<jobid>.json".
my $get_state_file = sub {
    my ($jobid, $type) = @_;
    my $basename = "${type}-${jobid}.json";
    return "${state_dir}/${basename}";
};
22
# Template state for a job without (or with an empty) state file: state
# 'created' and a recorded time of 0.
# NOTE: this is a single shared hash reference - copy it (e.g.
# { %$default_state }) before modifying anything derived from it.
my $default_state = {
    time  => 0,
    state => 'created',
};
27
# Read and decode the state file of a job.
#
# Lockless: writers replace the file with PVE::Tools::file_set_contents,
# which presumably writes a temporary file and renames it into place, so a
# reader sees either the old or the new content - TODO confirm.
#
# Returns undef if the state file does not exist (job removed or never
# created), a fresh copy of the default state if the file is empty, and the
# decoded hash reference otherwise. Dies if the content is not a JSON object.
sub read_job_state {
    my ($jobid, $type) = @_;
    my $path = $get_state_file->($jobid, $type);
    return if ! -e $path;

    my $raw = PVE::Tools::file_get_contents($path);

    # Hand out a copy: callers modify the returned hash, and returning the
    # shared $default_state would leak those changes to every later caller.
    return { %$default_state } if $raw eq '';

    # untaint $raw; /s so that a JSON object spanning several lines is
    # accepted as well (decode_json does the actual validation)
    if ($raw =~ m/^(\{.*\})$/s) {
        return decode_json($1);
    }

    die "invalid json data in '$path'\n";
}
45
# Run $sub while holding the per-job lock file "<lock dir>/<type>-<jobid>.lck"
# (timeout: 10 seconds) and return whatever $sub returned.
# Errors reported by PVE::Tools::lock_file in $@ are re-thrown.
sub lock_job_state {
    my ($jobid, $type, $sub) = @_;

    my $lockfile = join('/', $lock_dir, "${type}-${jobid}.lck");
    my $result = PVE::Tools::lock_file($lockfile, 10, $sub);
    if (my $err = $@) {
        die $err;
    }

    return $result;
}
56
# Final status of the worker task recorded in a job state.
#
# Returns undef if the state has no 'upid' (task never started) or if the
# task's process is still running; otherwise returns the status read by
# PVE::Tools::upid_read_status. Dies if the UPID cannot be decoded or the
# task file returned by upid_decode does not exist.
my $get_job_task_status = sub {
    my ($state) = @_;

    my $upid = $state->{upid};
    return if !defined($upid); # not started

    my ($task, $filename) = PVE::Tools::upid_decode($upid, 1);
    die "unable to parse worker upid - $state->{upid}\n" if !$task;
    die "no such task\n" if ! -f $filename;

    # Running means: a process with the recorded PID exists and its start time
    # equals the one stored in the UPID (presumably to detect PID reuse).
    my $pstart = PVE::ProcFSTools::read_proc_starttime($task->{pid});
    my $still_running = $pstart && $pstart == $task->{pstart};
    return if $still_running;

    return PVE::Tools::upid_read_status($upid);
};
75
# If the job is in state 'started' and its worker task has finished, rewrite
# the state file to state 'stopped', with the task status as 'msg', the UPID,
# and the previous 'updated' timestamp (if any) preserved.
sub update_job_stopped {
    my ($jobid, $type) = @_;

    # Cheap unlocked pre-check; everything is re-checked under the lock.
    my $unlocked_state = read_job_state($jobid, $type);
    return if !defined($unlocked_state); # removed
    return if $unlocked_state->{state} ne 'started'; # not started
    return if !defined($get_job_task_status->($unlocked_state)); # still running or no upid

    lock_job_state($jobid, $type, sub {
        my $state = read_job_state($jobid, $type);
        return if !defined($state) || $state->{state} ne 'started'; # removed or not started

        my $status = $get_job_task_status->($state);
        my $new_state = {
            state => 'stopped',
            msg => $status // 'internal error',
            upid => $state->{upid},
        };

        # keep the time stamp of the last schedule update
        $new_state->{updated} = $state->{updated} if $state->{updated};

        PVE::Tools::file_set_contents(
            $get_state_file->($jobid, $type),
            encode_json($new_state),
        );
    });
}
105
# Must be called when the job is first created: writes the initial state
# ('created', with 'time' set to the current time). Dies with
# "job state already exists" if a state file in any other state exists.
sub create_job {
    my ($jobid, $type) = @_;

    lock_job_state($jobid, $type, sub {
        my $current = read_job_state($jobid, $type) // $default_state;

        # Work on a copy: $current may be the shared $default_state hash, and
        # setting 'time' on it would change the defaults for every caller.
        my $state = { %$current };

        if ($state->{state} ne 'created') {
            die "job state already exists\n";
        }

        $state->{time} = time();

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($state));
    });
}
123
# To be called when the job is removed: deletes the job's state file.
# Returns unlink's result (number of files removed); failure is not checked.
sub remove_job {
    my ($jobid, $type) = @_;
    unlink($get_state_file->($jobid, $type));
}
130
# Checks whether the job can be started and, if so, sets its state to
# 'starting' (with 'time' set to the current time).
# Returns 1 if the job can be started, 0 otherwise (state file removed or the
# job is already in state 'started').
sub starting_job {
    my ($jobid, $type) = @_;

    # first check unlocked to save time
    my $state = read_job_state($jobid, $type);
    return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started

    # The answer must come from the locked re-check, since the state may have
    # changed after the unlocked read. A 'return' inside the closure only
    # leaves the closure, so its result is propagated via lock_job_state().
    return lock_job_state($jobid, $type, sub {
        my $state = read_job_state($jobid, $type);
        return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started

        my $new_state = {
            state => 'starting',
            time => time(),
        };

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($new_state));

        return 1;
    });
}
154
# Record the outcome of starting a job that is in state 'starting':
# - with $msg defined, the job is marked 'stopped' right away, with $msg and
#   the current time;
# - otherwise it is marked 'started' with the worker task's $upid.
# Does nothing if the state file was removed meanwhile; dies if the job is
# not in state 'starting'.
sub started_job {
    my ($jobid, $type, $upid, $msg) = @_;

    lock_job_state($jobid, $type, sub {
        my $current = read_job_state($jobid, $type);
        return if !defined($current); # job was removed, do not update

        my $cur_state = $current->{state};
        die "unexpected state '$cur_state'\n" if $cur_state ne 'starting';

        my $new_state = defined($msg)
            ? { state => 'stopped', msg => $msg, time => time() }
            : { state => 'started', upid => $upid };

        PVE::Tools::file_set_contents(
            $get_state_file->($jobid, $type),
            encode_json($new_state),
        );
    });
}
181
# Will be called when the job schedule is updated: stores the current time as
# 'updated' in the job's state. If no state file exists, the state is created
# from the defaults. get_last_runtime() prefers 'updated' as its reference time.
sub updated_job_schedule {
    my ($jobid, $type) = @_;
    lock_job_state($jobid, $type, sub {
        my $old_state = read_job_state($jobid, $type) // $default_state;

        # Write a modified copy: $old_state may be the shared $default_state
        # hash, and setting 'updated' on it would leak into every later user
        # of the defaults.
        my $new_state = { %$old_state, updated => time() };

        my $path = $get_state_file->($jobid, $type);
        PVE::Tools::file_set_contents($path, encode_json($new_state));
    });
}
194
# Reference time from which run_jobs computes the job's next run. In order of
# preference:
# - the time of the last schedule update ('updated');
# - else the start time encoded in the task UPID;
# - else the recorded 'time', or 0.
# Dies if a UPID is present but cannot be decoded.
sub get_last_runtime {
    my ($jobid, $type) = @_;

    my $state = read_job_state($jobid, $type) // $default_state;

    my $updated = $state->{updated};
    return $updated if defined($updated);

    my $upid = $state->{upid};
    if ($upid) {
        my ($task) = PVE::Tools::upid_decode($upid, 1);
        die "unable to parse worker upid\n" if !$task;
        return $task->{starttime};
    }

    return $state->{time} // 0;
}
210
# Run every job from jobs.cfg that is due on this node.
#
# First synchronizes the state files with the config. Then, for each job in
# sorted id order:
# - skip jobs bound to another node;
# - refresh the state of a finished job (warn and skip on error);
# - skip disabled jobs;
# - if the calendar schedule says a run is due since the last runtime and
#   starting_job() agrees, run it via its plugin and record the outcome.
sub run_jobs {
    synchronize_job_states_with_config();

    my $jobs_cfg = cfs_read_file('jobs.cfg');
    my $nodename = PVE::INotify::nodename();

    for my $id (sort keys $jobs_cfg->{ids}->%*) {
        my $cfg = $jobs_cfg->{ids}->{$id};
        my $type = $cfg->{type};
        # removed from $cfg, so the plugin does not get the schedule
        my $schedule = delete $cfg->{schedule};

        # only schedule local jobs
        my $node = $cfg->{node};
        next if defined($node) && $node ne $nodename;

        eval { update_job_stopped($id, $type) };
        if (my $err = $@) {
            warn "could not update job state, skipping - $err\n";
            next;
        }

        # only schedule actually enabled jobs
        my $enabled = $cfg->{enabled};
        next if defined($enabled) && !$enabled;

        my $last_run = get_last_runtime($id, $type);
        my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
        my $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $last_run);
        next if !defined($next_sync) || time() < $next_sync; # not yet its (next) turn

        my $plugin = PVE::Jobs::Plugin->lookup($type);
        next if !starting_job($id, $type);

        my $upid = eval { $plugin->run($cfg) };
        if (my $err = $@) {
            warn $err;
            started_job($id, $type, undef, $err);
        } elsif ($upid eq 'OK') { # some jobs return OK immediately
            started_job($id, $type, undef, 'OK');
        } else {
            started_job($id, $type, $upid);
        }
    }
}
253
# Creates state files for configured jobs that lack one and removes state
# files that do not belong to any configured job. Runs under the jobs.cfg
# cluster lock; errors from the locked section are re-thrown.
sub synchronize_job_states_with_config {
    cfs_lock_file('jobs.cfg', undef, sub {
        my $data = cfs_read_file('jobs.cfg');

        # state file base names ("<type>-<id>") of all configured jobs
        my %configured;
        for my $id (keys $data->{ids}->%*) {
            my $job = $data->{ids}->{$id};
            my $type = $job->{type};
            $configured{"$type-$id"} = 1;

            my $jobstate = read_job_state($id, $type);
            create_job($id, $type) if !defined($jobstate);
        }

        # Compare the full "<type>-<id>" name instead of only the id. This also
        # removes the state file of a job whose id was reused with another type.
        # It is also correct regardless of where the pattern splits type and id
        # (a type containing '-' would be split at its first '-').
        # remove_job() rebuilds exactly this file name from ($id, $type).
        # The '.' before "json" is escaped so it matches only a literal dot.
        PVE::Tools::dir_glob_foreach($state_dir, '(.*?)-(.*)\.json', sub {
            my ($path, $type, $id) = @_;

            remove_job($id, $type) if !$configured{"$type-$id"};
        });
    });
    die $@ if $@;
}
276
# Create the state and lock directories (parent directories are not
# created). mkdir's result is not checked, so an already existing
# directory is not an error; the result of the last mkdir is returned.
sub setup_dirs {
    mkdir($state_dir);
    return mkdir($lock_dir);
}

1;