]> git.proxmox.com Git - pve-guest-common.git/blob - PVE/ReplicationState.pm
PVE::ReplicationState - new helpers record_job_start/record_job_end
[pve-guest-common.git] / PVE / ReplicationState.pm
1 package PVE::ReplicationState;
2
3 use warnings;
4 use strict;
5 use JSON;
6
7 use PVE::INotify;
8 use PVE::ProcFSTools;
9 use PVE::Tools;
10 use PVE::CalendarEvent;
11 use PVE::Cluster;
12 use PVE::GuestHelpers;
13 use PVE::ReplicationConfig;
14
# Location of the replication state file and of the lock file guarding
# updates to it. Regression tests may overwrite these package variables
# to point at scratch locations.
our $state_path = '/var/lib/pve-manager/pve-replication-state.json';
our $state_lock = '/var/lib/pve-manager/pve-replication-state.lck';

# Directory holding the per-job replication log files.
our $replicate_logdir = '/var/log/pve/replicate';
19
# Return the log file path for replication job $jobid, located inside
# $replicate_logdir. Regression tests should overwrite this sub.
sub job_logfile_name {
    my ($jobid) = @_;

    return join('/', $replicate_logdir, $jobid);
}
26
# Note: We use PVE::Tools::file_set_contents to write the state file
# atomically, so read_state() always returns a consistent copy (even when
# not locked).
29
# Read and decode the replication state file ($state_path).
#
# Returns a hash ref. The hash is empty when the file does not exist or is
# empty. Dies with "invalid json data ..." if the content does not look like
# a JSON object. decode_json dies if the object fails to parse.
sub read_state {

    return {} if ! -e $state_path;

    my $raw = PVE::Tools::file_get_contents($state_path);

    return {} if $raw eq '';

    # Untaint $raw by capturing the JSON object. The /s modifier lets '.'
    # match newlines, so a multi-line (e.g. pretty-printed) state file is
    # accepted as well. Trailing whitespace before the end of the string is
    # ignored.
    if ($raw =~ m/\A(\{.*\})\s*\z/s) {
        return decode_json($1);
    }

    die "invalid json data in '$state_path'\n";
}
45
# Return the state entry of job $jobcfg from the state object $stateobj
# (as returned by read_state()), with default values filled in for
# last_iteration, last_try, last_sync and fail_count.
#
# If $stateobj already holds an entry for the job, that same hash ref is
# returned, so the defaults are also written into $stateobj. Otherwise a
# fresh hash ref is returned, and $stateobj is left untouched. In
# particular, no empty per-guest entry is autovivified.
sub extract_job_state {
    my ($stateobj, $jobcfg) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});

    my $vmid = $jobcfg->{guest};
    my $tid = $plugin->get_unique_target_id($jobcfg);

    # Look the guest up first instead of using $stateobj->{$vmid}->{$tid}.
    # The nested lookup would autovivify an empty $stateobj->{$vmid}, which
    # could end up in the state file if the caller writes $stateobj back.
    my $vmid_state = $stateobj->{$vmid};
    my $state = $vmid_state ? $vmid_state->{$tid} : undef;

    $state = {} if !$state;

    $state->{last_iteration} //= 0;
    $state->{last_try} //= 0; # last sync start time
    $state->{last_sync} //= 0; # last successful sync start time
    $state->{fail_count} //= 0;

    return $state;
}
64
# Read the state file and return the state entry of job $jobcfg, with
# default values filled in (see extract_job_state).
sub read_job_state {
    my ($jobcfg) = @_;

    return extract_job_state(read_state(), $jobcfg);
}
71
# Read the state file, let $modify_fn change the decoded state object in
# place, and write the result back via PVE::Tools::file_set_contents.
#
# The read-modify-write runs under PVE::Tools::lock_file on $state_lock
# (timeout argument 10). That in turn runs inside
# PVE::GuestHelpers::guest_migration_lock for $vmid, so the guest migration
# lock is held during the update. Returns whatever guest_migration_lock
# returns.
sub _locked_state_update {
    my ($vmid, $modify_fn) = @_;

    my $update = sub {
        my $stateobj = read_state();
        $modify_fn->($stateobj);
        PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
    };

    my $code = sub {
        PVE::Tools::lock_file($state_lock, 10, $update);
        # lock_file reports failures through $@; re-raise them
        die $@ if $@;
    };

    # make sure we have guest_migration_lock during update
    return PVE::GuestHelpers::guest_migration_lock($vmid, undef, $code);
}

# update state for a single job
#
# Stores $state as the entry of job $jobcfg (keyed by guest id and the
# plugin's unique target id) in the state file.
sub write_job_state {
    my ($jobcfg, $state) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});

    my $vmid = $jobcfg->{guest};
    my $tid = $plugin->get_unique_target_id($jobcfg);

    return _locked_state_update($vmid, sub {
        my ($stateobj) = @_;
        # Note: tuple ($vmid, $tid) is unique
        $stateobj->{$vmid}->{$tid} = $state;
    });
}

# update all job states related to a specific $vmid
#
# Replaces the whole per-guest entry of the state file with $vmid_state.
sub write_vmid_job_states {
    my ($vmid_state, $vmid) = @_;

    return _locked_state_update($vmid, sub {
        my ($stateobj) = @_;
        $stateobj->{$vmid} = $vmid_state;
    });
}
117
# Mark the start of a replication run in $state and persist it.
#
# Records the current process id and its start time (as returned by
# PVE::ProcFSTools::read_proc_starttime), this node's name, the run's
# $start_time (as last_try) and $iteration. Also ensures storeid_list
# exists as an array ref.
sub record_job_start {
    my ($jobcfg, $state, $start_time, $iteration) = @_;

    my $pid = $$;

    $state->{pid} = $pid;
    $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($pid);
    $state->{last_node} = PVE::INotify::nodename();
    $state->{last_try} = $start_time;
    $state->{last_iteration} = $iteration;
    $state->{storeid_list} = [] if !defined($state->{storeid_list});

    write_job_state($jobcfg, $state);
}
130
# Mark the end of a replication run in $state and persist it.
#
# Stores $duration and drops the pid/ptime markers. Then it updates the
# failure bookkeeping:
# - on success (false $err), last_sync becomes $start_time, fail_count is
#   reset to 0 and any previous error is removed;
# - on failure, fail_count is incremented and the chomped, stringified
#   error is stored in 'error'.
sub record_job_end {
    my ($jobcfg, $state, $start_time, $duration, $err) = @_;

    delete @{$state}{qw(pid ptime)};
    $state->{duration} = $duration;

    if (!$err) {
        $state->{last_sync} = $start_time;
        $state->{fail_count} = 0;
        delete $state->{error};
    } else {
        chomp $err;
        $state->{fail_count}++;
        $state->{error} = "$err";
    }

    write_job_state($jobcfg, $state);
}
149
# Build the snapshot name used by replication job $jobid for the sync that
# started at $last_sync.
#
# In list context it returns ($prefix, $snapname), where $prefix is the part
# shared by all snapshots of the job. In scalar context it returns only
# $snapname.
sub replication_snapshot_name {
    my ($jobid, $last_sync) = @_;

    my $prefix = "__replicate_${jobid}_";
    my $snapname = $prefix . $last_sync . '__';

    return wantarray ? ($prefix, $snapname) : $snapname;
}
158
# Compute the next sync time of a job from its config and state.
# - Jobs marked for removal get 1, the lowest non-zero value.
# - Failed jobs get last_try + 5*60*fail_count (presumably seconds, i.e.
#   5 minutes per failure) while fail_count < 3. From the third failure on,
#   0 is returned.
# - Otherwise the result is the next event of the job's schedule (default
#   '*/15') after last_try, or 0 if there is none.
# A value of 0 means the job is not scheduled. get_next_job skips jobs
# whose next_sync is false.
sub _compute_next_sync {
    my ($jobcfg, $state) = @_;

    # todo: consider fail_count? How many retries?
    return 1 if $jobcfg->{remove_job};

    my $fail_count = $state->{fail_count};
    if ($fail_count) {
        return $fail_count < 3 ? $state->{last_try} + 5*60*$fail_count : 0;
    }

    my $schedule = $jobcfg->{schedule} || '*/15';
    my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);

    return PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
}

# Collect the replication jobs relevant for this node.
#
# Returns a hash ref keyed by job id. Each value is the job's config hash,
# extended in place with:
#   state     - the job state (see extract_job_state)
#   id        - the job id
#   vmtype    - the guest type from the cluster VM list
#   next_sync - next scheduled sync time (0 = not scheduled)
# Jobs are skipped if their guest does not exist or resides on another
# node. Jobs not marked for removal are also skipped if they are disabled
# or target this node. Dies for job types other than 'local'.
sub job_status {

    my $local_node = PVE::INotify::nodename();

    my $jobs = {};

    my $stateobj = read_state();

    my $cfg = PVE::ReplicationConfig->new();

    my $vms = PVE::Cluster::get_vmlist();

    for my $jobid (sort keys %{$cfg->{ids}}) {
        my $jobcfg = $cfg->{ids}->{$jobid};
        my $vmid = $jobcfg->{guest};

        die "internal error - not implemented" if $jobcfg->{type} ne 'local';

        my $vminfo = $vms->{ids}->{$vmid};

        # skip non existing guests and guests that live on other nodes
        next if !$vminfo || $vminfo->{node} ne $local_node;

        # jobs being removed are handled even when disabled; all other jobs
        # must be enabled and must never sync to the local node
        next if !$jobcfg->{remove_job}
            && ($jobcfg->{target} eq $local_node || $jobcfg->{disable});

        my $state = extract_job_state($stateobj, $jobcfg);

        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $vminfo->{type};
        $jobcfg->{next_sync} = _compute_next_sync($jobcfg, $state);

        $jobs->{$jobid} = $jobcfg;
    }

    return $jobs;
}
219
# Pick the next replication job to run in iteration $iteration.
#
# The candidates from job_status() are sorted by:
#   1. last_iteration
#   2. next_sync
#   3. guest id
#   4. job id
# The first job that has not yet run in $iteration and whose next_sync is
# set and not after $start_time is returned. Returns undef if no job is due.
sub get_next_job {
    my ($iteration, $start_time) = @_;

    my $jobs = job_status();

    my $sort_func = sub {
        my $joba = $jobs->{$a};
        my $jobb = $jobs->{$b};
        my $sa = $joba->{state};
        my $sb = $jobb->{state};
        # iteration counters are numbers: compare them numerically, because
        # a string compare would order e.g. 10 before 9
        my $res = $sa->{last_iteration} <=> $sb->{last_iteration};
        return $res if $res != 0;
        $res = $joba->{next_sync} <=> $jobb->{next_sync};
        return $res if $res != 0;
        $res = $joba->{guest} <=> $jobb->{guest};
        return $res if $res != 0;
        # one guest can have several jobs; fall back to the job id so the
        # order is deterministic and independent of hash key order
        return $a cmp $b;
    };

    foreach my $jobid (sort $sort_func keys %$jobs) {
        my $jobcfg = $jobs->{$jobid};
        next if $jobcfg->{state}->{last_iteration} >= $iteration;
        if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
            return $jobcfg;
        }
    }

    return undef;
}
244
245 return undef;
246 }
247
248 1;