]> git.proxmox.com Git - pve-guest-common.git/blame - PVE/ReplicationState.pm
Add lock to pervent lost update.
[pve-guest-common.git] / PVE / ReplicationState.pm
CommitLineData
3f7cacff
DM
1package PVE::ReplicationState;
2
3use warnings;
4use strict;
5use JSON;
6
c292c8e9 7use PVE::INotify;
c17dcb3e 8use PVE::ProcFSTools;
3f7cacff 9use PVE::Tools;
c292c8e9
DM
10use PVE::CalendarEvent;
11use PVE::Cluster;
90c07bf7 12use PVE::GuestHelpers;
3f7cacff
DM
13use PVE::ReplicationConfig;
14
3f7cacff
DM
# Paths of the replication state file and of the lock file that is taken
# around every read-modify-write cycle on it.
# Note: regression tests can overwrite $state_path for testing
our $state_path = '/var/lib/pve-manager/pve-replication-state.json';
our $state_lock = '/var/lib/pve-manager/pve-replication-state.lck';

# Directory used by job_logfile_name() to build per-job log file paths.
our $replicate_logdir = '/var/log/pve/replicate';
19
# Return the path of the log file for replication job $jobid, located
# below $replicate_logdir.
# regression tests should overwrite this
sub job_logfile_name {
    my ($jobid) = @_;

    return $replicate_logdir . '/' . $jobid;
}
3f7cacff
DM
26
# Note: We use PVE::Tools::file_set_contents to write the state file atomically,
# so read_state() always returns a consistent copy (even when not locked).
29
# Load the complete replication state from $state_path.
#
# Returns a hash reference decoded from the JSON state file. A missing or
# empty file yields an empty hash. Dies if the file content does not look
# like a JSON object.
sub read_state {

    if (!-e $state_path) {
        return {};
    }

    my $content = PVE::Tools::file_get_contents($state_path);
    if ($content eq '') {
        return {};
    }

    # capturing through the regex untaints the file content
    my ($json) = $content =~ m/^({.*})$/;
    die "invalid json data in '$state_path'\n" if !defined($json);

    return decode_json($json);
}
45
# Pick the state of a single job out of an already loaded state object.
#
# $stateobj - state hash as returned by read_state()
# $jobcfg   - job configuration; 'type' and 'guest' are read here
#
# Returns the state hash stored under ($vmid, $tid), or a fresh hash if
# there is none. Missing counters/timestamps are initialized to 0.
sub extract_job_state {
    my ($stateobj, $jobcfg) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
    my $tid = $plugin->get_unique_target_id($jobcfg);
    my $vmid = $jobcfg->{guest};

    my $state = $stateobj->{$vmid}->{$tid} || {};

    # last_try:  last sync start time
    # last_sync: last successful sync start time
    for my $key (qw(last_iteration last_try last_sync fail_count)) {
        $state->{$key} //= 0;
    }

    return $state;
}
64
210a5f79
DM
# Return all job states of guest $vmid, with the entry that was keyed by
# the target id of $old_target moved to the target id of $new_target.
#
# The per-guest hash inside $stateobj is modified in place and returned.
# An empty hash is returned when $stateobj has no entry for $vmid.
sub extract_vmid_tranfer_state {
    my ($stateobj, $vmid, $old_target, $new_target) = @_;

    my $from_tid = PVE::ReplicationConfig::Cluster->get_unique_target_id({ target => $old_target });
    my $to_tid = PVE::ReplicationConfig::Cluster->get_unique_target_id({ target => $new_target });

    my $guest_states = $stateobj->{$vmid};
    return {} if !defined($guest_states);

    if (defined($guest_states->{$from_tid})) {
        $guest_states->{$to_tid} = delete($guest_states->{$from_tid});
    }

    return $guest_states;
}
78
3f7cacff
DM
# Read the state file and return the state of the job described by
# $jobcfg (see extract_job_state for the defaults that get applied).
sub read_job_state {
    my ($jobcfg) = @_;

    return extract_job_state(read_state(), $jobcfg);
}
85
# update state for a single job
# pass $state = undef to delete the job state completely
#
# The state file is re-read, changed and written back while holding the
# lock on $state_lock, which in turn is taken inside the guest migration
# lock of the job's guest. Dies if the locked update reported an error.
sub write_job_state {
    my ($jobcfg, $state) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});

    my $vmid = $jobcfg->{guest};
    my $tid = $plugin->get_unique_target_id($jobcfg);

    # make sure we have guest_migration_lock during update
    PVE::GuestHelpers::guest_migration_lock($vmid, undef, sub {
        PVE::Tools::lock_file($state_lock, 10, sub {
            my $stateobj = read_state();

            # Note: tuple ($vmid, $tid) is unique
            if (!defined($state)) {
                delete $stateobj->{$vmid}->{$tid};
                # drop the guest entry once it holds no job state anymore
                if (!keys %{$stateobj->{$vmid}}) {
                    delete $stateobj->{$vmid};
                }
            } else {
                $stateobj->{$vmid}->{$tid} = $state;
            }

            PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
        });
        die $@ if $@;
    });
}
117
55222f37
DM
# update all job states related to a specific $vmid
#
# $vmid_state replaces the whole per-guest entry in the state file. The
# update runs under the $state_lock file lock, nested inside the guest
# migration lock of $vmid. Dies if the locked update reported an error.
sub write_vmid_job_states {
    my ($vmid_state, $vmid) = @_;

    # make sure we have guest_migration_lock during update
    PVE::GuestHelpers::guest_migration_lock($vmid, undef, sub {
        PVE::Tools::lock_file($state_lock, 10, sub {
            my $stateobj = read_state();
            $stateobj->{$vmid} = $vmid_state;
            PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
        });
        die $@ if $@;
    });
}
136
c17dcb3e
DM
# Mark a job as running and persist that via write_job_state().
#
# Stores the current process id together with its start time as reported
# by PVE::ProcFSTools::read_proc_starttime, the local node name, the start
# time of this try and the iteration number in $state. 'storeid_list' is
# initialized to an empty list if it is not set yet.
sub record_job_start {
    my ($jobcfg, $state, $start_time, $iteration) = @_;

    my $pid = $$;

    $state->{pid} = $pid;
    $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($pid);
    $state->{last_node} = PVE::INotify::nodename();
    @{$state}{qw(last_try last_iteration)} = ($start_time, $iteration);
    $state->{storeid_list} //= [];

    write_job_state($jobcfg, $state);
}
149
14849765
WL
# Remove every job state stored for guest $vmid from the state file.
#
# The read-modify-write cycle runs under the $state_lock file lock.
# Dies if the locked update failed, so that a lock timeout or write error
# does not silently leave stale state behind. This matches the error
# handling of the other state writers in this module, which all check $@
# after PVE::Tools::lock_file.
sub delete_guest_states {
    my ($vmid) = @_;

    my $code = sub {
        my $stateobj = read_state();
        delete $stateobj->{$vmid};
        PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
    };

    PVE::Tools::lock_file($state_lock, 10, $code);
    die $@ if $@;
}
161
c17dcb3e
DM
# Record the outcome of a finished job run and persist it.
#
# $duration is always stored and the pid/ptime markers of the running job
# are removed. On error ($err is true) the fail counter is increased and
# the chomped error text is stored. On success the state is deleted when
# the job config has 'remove_job' set; otherwise last_sync is set to
# $start_time, the fail counter is reset and any old error is cleared.
sub record_job_end {
    my ($jobcfg, $state, $start_time, $duration, $err) = @_;

    $state->{duration} = $duration;
    delete @{$state}{qw(pid ptime)};

    # what finally gets written; undef deletes the job state
    my $new_state = $state;

    if ($err) {
        chomp $err;
        $state->{fail_count}++;
        $state->{error} = "$err";
    } elsif ($jobcfg->{remove_job}) {
        $new_state = undef;
    } else {
        $state->{last_sync} = $start_time;
        $state->{fail_count} = 0;
        delete $state->{error};
    }

    write_job_state($jobcfg, $new_state);
}
185
52dcecfc
DM
# Build the snapshot name used for job $jobid and sync time $last_sync.
#
# In list context returns ($prefix, $snapname), otherwise only $snapname,
# where $prefix is "__replicate_${jobid}_" and $snapname is the prefix
# followed by "${last_sync}__".
sub replication_snapshot_name {
    my ($jobid, $last_sync) = @_;

    my $prefix = '__replicate_' . $jobid . '_';
    my $snapname = $prefix . $last_sync . '__';

    return $snapname if !wantarray;
    return ($prefix, $snapname);
}
194
44972014
DM
# Drop all entries from the state file that no longer belong to a
# configured replication job of an existing guest.
#
# The set of ($vmid, $tid) tuples still in use is computed from the
# replication config and the cluster VM list; the state file is then
# rewritten under the $state_lock file lock with only those tuples kept.
# Dies if the locked update reported an error.
sub purge_old_states {

    my $local_node = PVE::INotify::nodename();

    my $cfg = PVE::ReplicationConfig->new();
    PVE::Cluster::cfs_update(1); # fail if we cannot query the vm list
    my $vms = PVE::Cluster::get_vmlist();

    my $used_tids = {};

    for my $jobid (sort keys %{$cfg->{ids}}) {
        my $jobcfg = $cfg->{ids}->{$jobid};
        my $vmid = $jobcfg->{guest};
        my $tid = PVE::ReplicationConfig->lookup($jobcfg->{type})->get_unique_target_id($jobcfg);

        # jobs of guests that no longer exist do not keep their state alive
        next if !defined($vms->{ids}->{$vmid}); # && $vms->{ids}->{$vmid}->{node} eq $local_node;

        $used_tids->{$vmid}->{$tid} = 1;
    }

    my $purge_state = sub {
        my $stateobj = read_state();
        my $kept = {};

        for my $vmid (keys %$stateobj) {
            my $guest_states = $stateobj->{$vmid};
            for my $tid (grep { $used_tids->{$vmid}->{$_} } keys %{$guest_states}) {
                $kept->{$vmid}->{$tid} = $guest_states->{$tid};
            }
        }

        PVE::Tools::file_set_contents($state_path, encode_json($kept));
    };

    PVE::Tools::lock_file($state_lock, 10, $purge_state);
    die $@ if $@;
}
229
# Collect all replication jobs whose guest lives on the local node.
#
# $get_disabled - if true, jobs with 'disable' set are included as well
#
# Returns a hash { $jobid => $jobcfg }. Each $jobcfg is extended with
# 'state' (see extract_job_state), 'id', 'vmtype' and 'next_sync'. A
# next_sync of 0 is treated by get_next_job() as "not scheduled". The
# whole scan runs inside PVE::ReplicationConfig::lock because it may
# rewrite the replication config (swapped or corrected source node).
sub job_status {
    my ($get_disabled) = @_;

    my $local_node = PVE::INotify::nodename();

    my $jobs = {};

    my $stateobj = read_state();

    my $cfg = PVE::ReplicationConfig->new();

    my $vms = PVE::Cluster::get_vmlist();

    my $func = sub {
        for my $jobid (sort keys %{$cfg->{ids}}) {
            my $jobcfg = $cfg->{ids}->{$jobid};
            my $vmid = $jobcfg->{guest};

            die "internal error - not implemented" if $jobcfg->{type} ne 'local';

            # skip non existing vms
            my $guest = $vms->{ids}->{$vmid};
            next if !$guest;

            # only consider guest on local node
            next if $guest->{node} ne $local_node;

            my $target = $jobcfg->{target};
            if (!$jobcfg->{remove_job}) {
                if ($target eq $local_node) {
                    my $source = $jobcfg->{source};

                    # never sync to local node
                    next if !defined($source) || $source eq $target;

                    # vm was stolen (swapped source target)
                    $jobcfg = PVE::ReplicationConfig::swap_source_target_nolock($jobid);
                    $cfg->{ids}->{$jobid} = $jobcfg;
                }

                next if !$get_disabled && $jobcfg->{disable};
            }

            my $state = extract_job_state($stateobj, $jobcfg);
            $jobcfg->{state} = $state;
            $jobcfg->{id} = $jobid;
            $jobcfg->{vmtype} = $guest->{type};

            my $next_sync = 0;
            my $fail_count = $state->{fail_count};

            if ($jobcfg->{remove_job}) {
                $next_sync = 1; # lowest possible value
                # todo: consider fail_count? How many retries?
            } elsif ($fail_count) {
                # NOTE(review): $target was read before a possible swap above
                # and is not refreshed - confirm this is the node that should
                # be checked here.
                my $members = PVE::Cluster::get_members();
                if ($members->{$target} && $members->{$target}->{online}) {
                    # retry after 5 or 10 minutes for the first two
                    # failures, every 30 minutes after that
                    my $retry_minutes = $fail_count < 3 ? 5*$fail_count : 30;
                    $next_sync = $state->{last_try} + 60*$retry_minutes;
                }
            } else {
                my $schedule = $jobcfg->{schedule} || '*/15';
                my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
                $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
            }

            $jobcfg->{next_sync} = $next_sync;

            # record the local node as source and persist that in the config
            if (!defined($jobcfg->{source}) || $jobcfg->{source} ne $local_node) {
                $jobcfg->{source} = $cfg->{ids}->{$jobid}->{source} = $local_node;
                PVE::ReplicationConfig::write($cfg);
            }

            $jobs->{$jobid} = $jobcfg;
        }
    };

    PVE::ReplicationConfig::lock($func);

    return $jobs;
}
311
# Select the next job that is due to run.
#
# $iteration  - current scheduler iteration; jobs whose last_iteration is
#               already >= this value are skipped
# $start_time - time to compare against each job's next_sync
#
# Jobs are considered ordered by last_iteration, then next_sync, then
# guest id. Returns the first job config with a non-zero next_sync that
# is not later than $start_time, or undef if there is none.
sub get_next_job {
    my ($iteration, $start_time) = @_;

    my $jobs = job_status();

    my @ordered = sort {
        $jobs->{$a}->{state}->{last_iteration} <=> $jobs->{$b}->{state}->{last_iteration}
            || $jobs->{$a}->{next_sync} <=> $jobs->{$b}->{next_sync}
            || $jobs->{$a}->{guest} <=> $jobs->{$b}->{guest}
    } keys %$jobs;

    for my $jobid (@ordered) {
        my $jobcfg = $jobs->{$jobid};

        next if $jobcfg->{state}->{last_iteration} >= $iteration;
        next if !$jobcfg->{next_sync};

        return $jobcfg if $start_time >= $jobcfg->{next_sync};
    }

    return undef;
}
339
b90dc712
WB
# Make an existing job eligible for an immediate run by resetting its
# 'last_try' timestamp to 0.
#
# Nothing is written when the state file holds no entry for the job. The
# update runs under the $state_lock file lock, nested inside the guest
# migration lock of the job's guest. Dies if the locked update reported
# an error.
sub schedule_job_now {
    my ($jobcfg) = @_;

    my $reset_last_try = sub {
        my $stateobj = read_state();
        my $vmid = $jobcfg->{guest};
        my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
        my $tid = $plugin->get_unique_target_id($jobcfg);

        # do not modify anything if there is no state
        return if !defined($stateobj->{$vmid}->{$tid});

        my $state = read_job_state($jobcfg);
        $state->{last_try} = 0;
        write_job_state($jobcfg, $state);
    };

    my $locked_update = sub {
        PVE::Tools::lock_file($state_lock, 10, $reset_last_try);
        die $@ if $@;
    };

    PVE::GuestHelpers::guest_migration_lock($jobcfg->{guest}, undef, $locked_update);
}
359
3f7cacff 3601;