]> git.proxmox.com Git - pve-guest-common.git/blob - PVE/ReplicationState.pm
6a743a3e5ec358ff69e1bed4d7d96860d71c6f08
[pve-guest-common.git] / PVE / ReplicationState.pm
1 package PVE::ReplicationState;
2
3 use warnings;
4 use strict;
5 use JSON;
6
7 use PVE::INotify;
8 use PVE::ProcFSTools;
9 use PVE::Tools;
10 use PVE::CalendarEvent;
11 use PVE::Cluster;
12 use PVE::GuestHelpers;
13 use PVE::ReplicationConfig;
14
15 # Note: regression tests can overwrite $state_path for testing
16 our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
17 our $state_lock = "/var/lib/pve-manager/pve-replication-state.lck";
18 our $replicate_logdir = "/var/log/pve/replicate";
19
# Build the path of the log file used for a replication job.
#
# Takes the job id and returns "$replicate_logdir/$jobid" as a string.
# Note: regression tests should overwrite this sub.
sub job_logfile_name {
    my ($jobid) = @_;

    return join('/', $replicate_logdir, $jobid);
}
26
27 # Note: We use PVE::Tools::file_set_contents to write state file atomically,
28 # so read_state() always returns a consistent copy (even when not locked).
29
# Load the complete replication state from $state_path.
#
# Returns the decoded state as a hash reference. An empty hash reference is
# returned when the state file does not exist or is empty.
#
# Dies with "invalid json data in '<path>'" when the file content does not
# look like a JSON object. decode_json is called without an eval, so any
# error it raises propagates to the caller.
sub read_state {

    return {} if !-e $state_path;

    my $raw = PVE::Tools::file_get_contents($state_path);

    return {} if $raw eq '';

    # Untaint $raw by capturing the JSON object.
    # - the braces are escaped so they are always matched literally
    # - /s lets '.' match newlines, so a multi-line (e.g. pretty-printed)
    #   state file is accepted instead of being rejected as invalid
    # - whitespace around the object is tolerated
    if ($raw =~ m/^\s*(\{.*\})\s*$/s) {
        return decode_json($1);
    }

    die "invalid json data in '$state_path'\n";
}
45
# Pick the state of a single replication job out of a full state object.
#
# $stateobj - state hash, keyed by guest id and then by unique target id
# $jobcfg   - job configuration; 'type' selects the plugin used to compute
#             the unique target id, 'guest' is the first-level key
#
# Returns the job's state hash (a new empty one when nothing is stored yet)
# with last_iteration, last_try, last_sync and fail_count defaulted to 0.
sub extract_job_state {
    my ($stateobj, $jobcfg) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
    my $guest = $jobcfg->{guest};
    my $target_id = $plugin->get_unique_target_id($jobcfg);

    my $jobstate = $stateobj->{$guest}->{$target_id} || {};

    # Fill in defaults for values that were never recorded:
    #   last_try  - last sync start time
    #   last_sync - last successful sync start time
    for my $field (qw(last_iteration last_try last_sync fail_count)) {
        $jobstate->{$field} //= 0;
    }

    return $jobstate;
}
64
# Return the job states stored for $vmid, re-keyed from $old_target to
# $new_target.
#
# When the guest has a state entry under the unique target id of
# $old_target, that entry is moved (deleted and re-inserted) under the
# unique target id of $new_target inside $stateobj.
#
# Returns the guest's state hash, or an empty hash reference when $stateobj
# has no defined entry for $vmid.
sub extract_vmid_tranfer_state {
    my ($stateobj, $vmid, $old_target, $new_target) = @_;

    my $target_id = sub {
        my ($target) = @_;
        return PVE::ReplicationConfig::Cluster->get_unique_target_id({ target => $target });
    };

    my $oldid = $target_id->($old_target);
    my $newid = $target_id->($new_target);

    my $vmstate = $stateobj->{$vmid};
    return {} if !defined($vmstate);

    if (defined($vmstate->{$oldid})) {
        $vmstate->{$newid} = delete($vmstate->{$oldid});
    }

    return $vmstate;
}
78
# Read the state file and return the state of the job described by $jobcfg.
#
# This is read_state() followed by extract_job_state(); the returned hash
# has the defaults applied by extract_job_state().
sub read_job_state {
    my ($jobcfg) = @_;

    return extract_job_state(scalar(read_state()), $jobcfg);
}
85
# Update the stored state of a single job.
#
# $jobcfg - job configuration; 'type' selects the plugin used to compute the
#           unique target id, 'guest' is the guest id
# $state  - state hash stored for that (guest, target id) pair
#
# The state file is re-read, changed and rewritten from within
# PVE::Tools::lock_file() on $state_lock (called with the value 10 as second
# argument, presumably the timeout in seconds). Any error left in $@ by the
# locked update is re-thrown.
sub write_job_state {
    my ($jobcfg, $state) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
    my $guest = $jobcfg->{guest};
    my $target_id = $plugin->get_unique_target_id($jobcfg);

    my $locked_update = sub {
        PVE::Tools::lock_file(
            $state_lock,
            10,
            sub {
                my $all_states = read_state();

                # Note: tuple ($vmid, $tid) is unique
                $all_states->{$guest}->{$target_id} = $state;

                PVE::Tools::file_set_contents($state_path, encode_json($all_states));
            },
        );
        die $@ if $@;
    };

    # make sure we have guest_migration_lock during update
    PVE::GuestHelpers::guest_migration_lock($guest, undef, $locked_update);
}
112
# Replace all job states stored for one guest.
#
# $vmid_state - hash with the job states to store for the guest; it replaces
#               whatever the state file currently holds for $vmid
# $vmid       - guest id used as key in the state file
#
# The state file is re-read, changed and rewritten from within
# PVE::Tools::lock_file() on $state_lock (called with the value 10 as second
# argument, presumably the timeout in seconds). Any error left in $@ by the
# locked update is re-thrown.
sub write_vmid_job_states {
    my ($vmid_state, $vmid) = @_;

    my $locked_update = sub {
        PVE::Tools::lock_file(
            $state_lock,
            10,
            sub {
                my $all_states = read_state();
                $all_states->{$vmid} = $vmid_state;
                PVE::Tools::file_set_contents($state_path, encode_json($all_states));
            },
        );
        die $@ if $@;
    };

    # make sure we have guest_migration_lock during update
    PVE::GuestHelpers::guest_migration_lock($vmid, undef, $locked_update);
}
131
# Record in $state that a job run is starting and persist it.
#
# Stores the current process id and its start time (as reported by
# PVE::ProcFSTools::read_proc_starttime), the local node name, $start_time
# as last_try and $iteration as last_iteration. storeid_list is initialised
# to an empty list if it is not defined yet. The state is then written with
# write_job_state().
sub record_job_start {
    my ($jobcfg, $state, $start_time, $iteration) = @_;

    my $pid = $$;

    $state->{pid} = $pid;
    $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($pid);
    $state->{last_node} = PVE::INotify::nodename();

    @$state{qw(last_try last_iteration)} = ($start_time, $iteration);
    $state->{storeid_list} //= [];

    return write_job_state($jobcfg, $state);
}
144
# Record in $state that a job run has finished and persist it.
#
# Stores $duration and removes the pid/ptime markers of the running process.
# On success (false $err) last_sync is set to $start_time, fail_count is
# reset to 0 and any stored error is removed. On failure fail_count is
# incremented and the chomped error text is stored as 'error'. The state is
# then written with write_job_state().
sub record_job_end {
    my ($jobcfg, $state, $start_time, $duration, $err) = @_;

    $state->{duration} = $duration;
    delete @$state{qw(pid ptime)};

    if (!$err) {
        $state->{last_sync} = $start_time;
        $state->{fail_count} = 0;
        delete $state->{error};
    } else {
        chomp $err;
        $state->{fail_count}++;
        $state->{error} = "$err";
    }

    return write_job_state($jobcfg, $state);
}
163
# Build the snapshot name used for a replication job.
#
# The prefix is "__replicate_<jobid>_" and the full snapshot name is
# "<prefix><last_sync>__".
#
# Returns ($prefix, $snapname) in list context and just $snapname otherwise.
sub replication_snapshot_name {
    my ($jobid, $last_sync) = @_;

    my $prefix = "__replicate_${jobid}_";
    my $snapname = $prefix . $last_sync . "__";

    return $snapname if !wantarray;

    return ($prefix, $snapname);
}
172
# Collect all replication jobs that have to be handled on the local node.
#
# Jobs are skipped when their guest does not exist in the cluster VM list
# or is located on another node. Unless a job is marked with remove_job, it
# is also skipped when its target is the local node or when it is disabled.
#
# Each remaining job config gets these extra keys:
#   state     - job state from extract_job_state()
#   id        - the job id
#   vmtype    - guest type taken from the VM list
#   next_sync - time of the next run, or 0 if none is scheduled:
#                 * 1 for jobs marked with remove_job
#                 * last_try + 5 minutes * fail_count while fail_count < 3
#                 * 0 once fail_count reached 3
#                 * otherwise the next calendar event after last_try, using
#                   the job's schedule (default '*/15')
#
# Returns a hash reference mapping job id => job config.
# Dies for jobs whose type is not 'local'.
sub job_status {

    my $local_node = PVE::INotify::nodename();
    my $stateobj = read_state();
    my $cfg = PVE::ReplicationConfig->new();
    my $vms = PVE::Cluster::get_vmlist();

    my $jobs = {};

    foreach my $jobid (sort keys %{$cfg->{ids}}) {
        my $jobcfg = $cfg->{ids}->{$jobid};
        my $vmid = $jobcfg->{guest};

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        # skip non existing vms
        my $guest = $vms->{ids}->{$vmid};
        next if !$guest;

        # only consider guest on local node
        next if $guest->{node} ne $local_node;

        if (!$jobcfg->{remove_job}) {
            # never sync to local node
            next if $jobcfg->{target} eq $local_node;

            next if $jobcfg->{disable};
        }

        my $state = extract_job_state($stateobj, $jobcfg);
        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $guest->{type};

        my $fail_count = $state->{fail_count};
        my $next_sync = 0;

        if ($jobcfg->{remove_job}) {
            $next_sync = 1; # lowest possible value
            # todo: consider fail_count? How many retries?
        } elsif (!$fail_count) {
            my $schedule = $jobcfg->{schedule} || '*/15';
            my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
            $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
        } elsif ($fail_count < 3) {
            # retry with a delay that grows with each failure
            $next_sync = $state->{last_try} + 5 * 60 * $fail_count;
        }

        $jobcfg->{next_sync} = $next_sync;

        $jobs->{$jobid} = $jobcfg;
    }

    return $jobs;
}
233
# Select the next replication job that is due.
#
# $iteration  - number of the current scheduler iteration; jobs whose
#               state->{last_iteration} is already >= $iteration are skipped
# $start_time - reference time; a job is due when its next_sync is non-zero
#               and not later than $start_time
#
# Candidates come from job_status() and are ordered by last_iteration, then
# next_sync, then guest id (all ascending).
#
# Returns the job config hash of the first due job, or undef if none is due.
sub get_next_job {
    my ($iteration, $start_time) = @_;

    my $jobs = job_status();

    my $sort_func = sub {
        my ($joba, $jobb) = ($jobs->{$a}, $jobs->{$b});

        # All three keys are numbers, so compare them numerically. A string
        # comparison (cmp) of last_iteration would sort e.g. 10 before 9.
        return $joba->{state}->{last_iteration} <=> $jobb->{state}->{last_iteration}
            || $joba->{next_sync} <=> $jobb->{next_sync}
            || $joba->{guest} <=> $jobb->{guest};
    };

    foreach my $jobid (sort $sort_func keys %$jobs) {
        my $jobcfg = $jobs->{$jobid};

        # already handled in this iteration
        next if $jobcfg->{state}->{last_iteration} >= $iteration;

        # next_sync == 0 means the job is not scheduled at all
        next if !$jobcfg->{next_sync};

        return $jobcfg if $start_time >= $jobcfg->{next_sync};
    }

    # explicit undef kept so the return value stays the same in any context
    return undef;
}
261
262 1;