1 package PVE
::ReplicationState
;
10 use PVE
::CalendarEvent
;
12 use PVE
::GuestHelpers
;
13 use PVE
::ReplicationConfig
;
# Package-level locations shared by the state helpers in this file. They are
# declared with `our`, so code outside this package can reach them.
15 # Note: regression tests can overwrite $state_path for testing
# JSON state file: loaded by read_state() and rewritten through
# PVE::Tools::file_set_contents().
16 our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
# Lock file passed to PVE::Tools::lock_file() around every rewrite of the
# state file.
17 our $state_lock = "/var/lib/pve-manager/pve-replication-state.lck";
# Directory prefix that job_logfile_name() puts in front of the job id.
18 our $replicate_logdir = "/var/log/pve/replicate";
# Build the path of the log file that belongs to a replication job.
#
# regression tests should overwrite this
#
# Parameters:
#   $jobid - job identifier, used verbatim as the file name
# Returns: "$replicate_logdir/$jobid"
sub job_logfile_name {
    # Take the job id from the argument list; the path below interpolated
    # $jobid without it ever being declared or assigned.
    my ($jobid) = @_;

    return "${replicate_logdir}/$jobid";
}
# Note: We use PVE::Tools::file_set_contents to write state file atomically,
# so read_state() always returns a consistent copy (even when not locked).

# Load the complete replication state from $state_path.
#
# Returns: the hash reference decoded from the JSON state file, or an empty
#          hash reference when the file does not exist or is empty.
# Dies:    with "invalid json data in '<path>'" when the content does not
#          look like a JSON object without embedded newlines.
sub read_state {
    return {} if !-e $state_path;

    my $raw = PVE::Tools::file_get_contents($state_path);

    return {} if $raw eq '';

    # Accept only "{...}" on a single line. Passing the capture ($1) rather
    # than $raw to the decoder also untaints the data.
    if ($raw =~ m/^({.*})$/) {
        return decode_json($1);
    }

    die "invalid json data in '$state_path'\n";
}
# Pick the state of one replication job out of a full state object.
#
# Parameters:
#   $stateobj - state hash reference indexed by guest id, then by the
#               plugin's unique target id
#   $jobcfg   - job configuration; 'type' selects the plugin, 'guest' is the
#               guest id
# Returns: the job's state hash reference (a fresh hash if nothing is stored
#          for the job) with last_iteration, last_try, last_sync and
#          fail_count defaulted to 0 when they are undefined.
sub extract_job_state {
    my ($stateobj, $jobcfg) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});

    my $vmid = $jobcfg->{guest};
    my $tid = $plugin->get_unique_target_id($jobcfg);
    my $state = $stateobj->{$vmid}->{$tid};

    $state = {} if !$state;

    $state->{last_iteration} //= 0;
    $state->{last_try} //= 0; # last sync start time
    $state->{last_sync} //= 0; # last successful sync start time
    $state->{fail_count} //= 0;

    # Hand the hash back explicitly. Without this the sub yields the value of
    # the last assignment (a number), while callers dereference the result as
    # the job state.
    return $state;
}
# Re-key a guest's replication state from one target to another.
#
# Parameters:
#   $stateobj   - state hash reference indexed by guest id
#   $vmid       - guest id to look up in $stateobj
#   $old_target - target whose unique id currently keys the entry
#   $new_target - target whose unique id should key the entry afterwards
#
# NOTE(review): neither a return statement nor the closing braces of this sub
# are visible in this copy of the file - confirm the intended return value
# against the callers.
65 sub extract_vmid_tranfer_state
{
66 my ($stateobj, $vmid, $old_target, $new_target) = @_;
# Both ids come from the Cluster plugin class, called with a minimal config
# that carries only the 'target' key.
68 my $oldid = PVE
::ReplicationConfig
::Cluster-
>get_unique_target_id({ target
=> $old_target });
69 my $newid = PVE
::ReplicationConfig
::Cluster-
>get_unique_target_id({ target
=> $new_target });
# Only touch guests that have state at all; the entry is moved (deleted under
# the old id, stored under the new one) inside $stateobj itself.
71 if (defined(my $vmstate = $stateobj->{$vmid})) {
72 $vmstate->{$newid} = delete($vmstate->{$oldid}) if defined($vmstate->{$oldid});
# Fragment of a sub whose header is not visible in this copy of the file.
# It loads the full state with read_state() and returns the entry that
# extract_job_state() selects for $jobcfg.
# NOTE(review): $jobcfg is not declared in the visible lines - presumably it
# is this sub's parameter; confirm against the complete file.
82 my $stateobj = read_state
();
83 return extract_job_state
($stateobj, $jobcfg);
# update state for a single job
# pass $state = undef to delete the job state completely
#
# Parameters:
#   $jobcfg - job configuration; 'type' selects the plugin, 'guest' is the
#             guest id the state is filed under
#   $state  - new state hash reference, or undef to remove the entry
# Dies:    when taking $state_lock or rewriting the state file fails.
sub write_job_state {
    my ($jobcfg, $state) = @_;

    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});

    my $vmid = $jobcfg->{guest};
    my $tid = $plugin->get_unique_target_id($jobcfg);

    # Read-modify-write of the state file; runs under $state_lock via the
    # lock_file() call below.
    my $update = sub {
        my $stateobj = read_state();
        # Note: tuple ($vmid, $tid) is unique
        if (defined($state)) {
            $stateobj->{$vmid}->{$tid} = $state;
        } else {
            delete $stateobj->{$vmid}->{$tid};
            # drop the guest entry as well once its last target is gone
            delete $stateobj->{$vmid} if !%{$stateobj->{$vmid}};
        }
        PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
    };

    my $code = sub {
        PVE::Tools::lock_file($state_lock, 10, $update);
        # lock_file() reports a failure through $@ instead of dying, so
        # re-raise it here or the failed update would go unnoticed.
        die $@ if $@;
    };

    # make sure we have guest_migration_lock during update
    PVE::GuestHelpers::guest_migration_lock($vmid, undef, $code);
}
# update all job states related to a specific $vmid
#
# Parameters:
#   $vmid_state - value that replaces everything stored for the guest
#   $vmid       - guest id
# Dies:    when taking $state_lock or rewriting the state file fails.
sub write_vmid_job_states {
    my ($vmid_state, $vmid) = @_;

    # Read-modify-write of the state file; runs under $state_lock via the
    # lock_file() call below.
    my $update = sub {
        my $stateobj = read_state();
        $stateobj->{$vmid} = $vmid_state;
        PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
    };

    my $code = sub {
        PVE::Tools::lock_file($state_lock, 10, $update);
        # lock_file() reports a failure through $@ instead of dying, so
        # re-raise it here or the failed update would go unnoticed.
        die $@ if $@;
    };

    # make sure we have guest_migration_lock during update
    PVE::GuestHelpers::guest_migration_lock($vmid, undef, $code);
}
# Record the start of a replication run in the job's state and persist it.
#
# Parameters:
#   $jobcfg     - job configuration, passed through to write_job_state()
#   $state      - job state hash reference, updated in place
#   $start_time - stored as 'last_try'
#   $iteration  - stored as 'last_iteration'
137 sub record_job_start
{
138 my ($jobcfg, $state, $start_time, $iteration) = @_;
# NOTE(review): 'pid' is read here but never assigned in the visible lines of
# this sub - confirm where it is set before relying on 'ptime'.
141 $state->{ptime
} = PVE
::ProcFSTools
::read_proc_starttime
($state->{pid
});
# Remember which node started this run.
142 $state->{last_node
} = PVE
::INotify
::nodename
();
143 $state->{last_try
} = $start_time;
144 $state->{last_iteration
} = $iteration;
# Keep an existing storage list; only default to an empty one.
145 $state->{storeid_list
} //= [];
# Persist the updated state for this job.
147 write_job_state
($jobcfg, $state);
# Drop every stored job state of one guest.
#
# Parameters:
#   $vmid - guest id whose entry is removed from the state file
# Dies:    when taking $state_lock or rewriting the state file fails.
sub delete_guest_states {
    # Take the guest id from the argument list; the delete below used $vmid
    # without it ever being declared or assigned.
    my ($vmid) = @_;

    # Read-modify-write of the state file; runs under $state_lock.
    my $code = sub {
        my $stateobj = read_state();
        delete $stateobj->{$vmid};
        PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
    };

    PVE::Tools::lock_file($state_lock, 10, $code);
    # lock_file() reports a failure through $@ instead of dying, so re-raise
    # it here or the failed update would go unnoticed.
    die $@ if $@;
}
# Fragment of a sub whose header is not visible in this copy of the file. It
# records the outcome of a replication run in the job's state and persists
# it through write_job_state().
#
# Parameters (from the unpacking below):
#   $jobcfg     - job configuration
#   $state      - job state hash reference, updated in place
#   $start_time - stored as 'last_sync' by the statements further down
#   $duration   - stored as 'duration'
#   $err        - error value, stringified into 'error'
163 my ($jobcfg, $state, $start_time, $duration, $err) = @_;
165 $state->{duration
} = $duration;
# The run is over: forget the process bookkeeping stored in the state.
166 delete $state->{pid
};
167 delete $state->{ptime
};
# NOTE(review): the conditional around the following statements is not
# visible here. Presumably the next three run only when $err is set (failure
# path) and the remainder only on success - confirm against the complete file.
171 $state->{fail_count
}++;
172 $state->{error
} = "$err";
173 write_job_state
($jobcfg, $state);
# A job flagged 'remove_job' gets its state deleted (undef state).
175 if ($jobcfg->{remove_job
}) {
176 write_job_state
($jobcfg, undef);
# Otherwise store the start time as 'last_sync', reset the failure counter
# and clear any stored error.
178 $state->{last_sync
} = $start_time;
179 $state->{fail_count
} = 0;
180 delete $state->{error
};
181 write_job_state
($jobcfg, $state);
# Build the snapshot name used for a replication job.
#
# Parameters:
#   $jobid     - replication job id
#   $last_sync - value embedded in the snapshot name
# Returns: ($prefix, $snapname) in list context and only $snapname in scalar
#          context, where $prefix is "__replicate_${jobid}_" and $snapname
#          is "${prefix}${last_sync}__".
sub replication_snapshot_name {
    my ($jobid, $last_sync) = @_;

    my $prefix = "__replicate_${jobid}_";
    my $snapname = "${prefix}${last_sync}__";

    # wantarray reflects the calling context of this sub
    return wantarray ? ($prefix, $snapname) : $snapname;
}
# Remove state entries that no configured replication job refers to anymore.
#
# A (guest id, target id) entry survives only if some job in the replication
# configuration maps to it and the guest is present in the cluster VM list.
# Dies: when taking $state_lock or rewriting the state file fails.
sub purge_old_states {

    # only referenced by the disabled node check in the loop below
    my $local_node = PVE::INotify::nodename();

    my $cfg = PVE::ReplicationConfig->new();
    my $vms = PVE::Cluster::get_vmlist();

    # $used_tids->{$vmid}->{$tid} is set for every state entry worth keeping
    my $used_tids = {};

    foreach my $jobid (sort keys %{$cfg->{ids}}) {
        my $jobcfg = $cfg->{ids}->{$jobid};
        my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
        my $tid = $plugin->get_unique_target_id($jobcfg);
        my $vmid = $jobcfg->{guest};
        $used_tids->{$vmid}->{$tid} = 1
            if defined($vms->{ids}->{$vmid}); # && $vms->{ids}->{$vmid}->{node} eq $local_node;
    }

    # Rebuild the state from scratch with only the used entries; runs under
    # $state_lock.
    my $purge_state = sub {
        my $stateobj = read_state();
        my $next_stateobj = {};

        foreach my $vmid (keys %$stateobj) {
            foreach my $tid (keys %{$stateobj->{$vmid}}) {
                $next_stateobj->{$vmid}->{$tid} = $stateobj->{$vmid}->{$tid} if $used_tids->{$vmid}->{$tid};
            }
        }

        PVE::Tools::file_set_contents($state_path, encode_json($next_stateobj));
    };

    PVE::Tools::lock_file($state_lock, 10, $purge_state);
    # lock_file() reports a failure through $@ instead of dying, so re-raise
    # it here or the failed purge would go unnoticed.
    die $@ if $@;
}
# Fragment of a sub whose header is not visible in this copy of the file
# (presumably job_status(), which is called further down - confirm).
#
# It walks all configured replication jobs, keeps those whose guest lives on
# the local node, and stores each job config in %$jobs under its job id,
# augmented with the keys 'state', 'id', 'vmtype' and 'next_sync'.
#
# NOTE(review): the declarations of $jobs and $next_sync, the else branches
# and all closing braces are missing from the visible lines; the comments
# below describe only what can be read here.
231 my $local_node = PVE
::INotify
::nodename
();
# One snapshot of the state file is used for all jobs.
235 my $stateobj = read_state
();
237 my $cfg = PVE
::ReplicationConfig-
>new();
239 my $vms = PVE
::Cluster
::get_vmlist
();
241 foreach my $jobid (sort keys %{$cfg->{ids
}}) {
242 my $jobcfg = $cfg->{ids
}->{$jobid};
243 my $vmid = $jobcfg->{guest
};
# Only job type 'local' is handled; anything else aborts.
245 die "internal error - not implemented" if $jobcfg->{type
} ne 'local';
247 # skip non existing vms
248 next if !$vms->{ids
}->{$vmid};
250 # only consider guest on local node
251 next if $vms->{ids
}->{$vmid}->{node
} ne $local_node;
253 my $target = $jobcfg->{target
};
# The two skip rules below are written for jobs that are NOT flagged
# 'remove_job'.
254 if (!$jobcfg->{remove_job
}) {
255 # never sync to local node
256 next if $target eq $local_node;
258 next if $jobcfg->{disable
};
# Attach the stored state and some bookkeeping to the job config itself.
261 my $state = extract_job_state
($stateobj, $jobcfg);
262 $jobcfg->{state} = $state;
263 $jobcfg->{id
} = $jobid;
264 $jobcfg->{vmtype
} = $vms->{ids
}->{$vmid}->{type
};
# Compute $next_sync. Jobs flagged for removal get the smallest non-zero
# value.
268 if ($jobcfg->{remove_job
}) {
269 $next_sync = 1; # lowest possible value
270 # todo: consider fail_count? How many retries?
# Jobs with a non-zero fail_count are retried relative to last_try, and only
# while the target is a known cluster member that is online.
# NOTE(review): if the inner test sits in the true branch of this `if` (as
# the placement of $members suggests), its `!$fail_count` part can never be
# true - confirm.
272 if (my $fail_count = $state->{fail_count
}) {
273 my $members = PVE
::Cluster
::get_members
();
274 if (!$fail_count || ($members->{$target} && $members->{$target}->{online
})) {
# Delay is 60*5*fail_count for fewer than 3 failures, otherwise 60*30
# (presumably seconds, i.e. 5/10 minutes, then 30 minutes - confirm units).
275 $next_sync = $state->{last_try
} + 60*($fail_count < 3 ?
5*$fail_count : 30);
# Calendar-based scheduling: falls back to '*/15' when the job has no
# schedule; an undefined next event becomes 0.
278 my $schedule = $jobcfg->{schedule
} || '*/15';
279 my $calspec = PVE
::CalendarEvent
::parse_calendar_event
($schedule);
280 $next_sync = PVE
::CalendarEvent
::compute_next_event
($calspec, $state->{last_try
}) // 0;
284 $jobcfg->{next_sync
} = $next_sync;
286 $jobs->{$jobid} = $jobcfg;
293 my ($iteration, $start_time) = @_;
295 my $jobs = job_status
();
297 my $sort_func = sub {
298 my $joba = $jobs->{$a};
299 my $jobb = $jobs->{$b};
300 my $sa = $joba->{state};
301 my $sb = $jobb->{state};
302 my $res = $sa->{last_iteration
} cmp $sb->{last_iteration
};
303 return $res if $res != 0;
304 $res = $joba->{next_sync
} <=> $jobb->{next_sync
};
305 return $res if $res != 0;
306 return $joba->{guest
} <=> $jobb->{guest
};
309 foreach my $jobid (sort $sort_func keys %$jobs) {
310 my $jobcfg = $jobs->{$jobid};
311 next if $jobcfg->{state}->{last_iteration
} >= $iteration;
312 if ($jobcfg->{next_sync
} && ($start_time >= $jobcfg->{next_sync
})) {