return $state;
}
+sub extract_vmid_tranfer_state {
+ my ($stateobj, $vmid, $old_target, $new_target) = @_;
+
+ my $oldid = PVE::ReplicationConfig::Cluster->get_unique_target_id({ target => $old_target });
+ my $newid = PVE::ReplicationConfig::Cluster->get_unique_target_id({ target => $new_target });
+
+ if (defined(my $vmstate = $stateobj->{$vmid})) {
+ $vmstate->{$newid} = delete($vmstate->{$oldid}) if defined($vmstate->{$oldid});
+ return $vmstate;
+ }
+
+ return {};
+}
+
sub read_job_state {
my ($jobcfg) = @_;
}
# update state for a single job
+# pass $state = undef to delete the job state completely
sub write_job_state {
my ($jobcfg, $state) = @_;
my $stateobj = read_state();
# Note: tuple ($vmid, $tid) is unique
- $stateobj->{$vmid}->{$tid} = $state;
-
+ if (defined($state)) {
+ $stateobj->{$vmid}->{$tid} = $state;
+ } else {
+ delete $stateobj->{$vmid}->{$tid};
+ delete $stateobj->{$vmid} if !%{$stateobj->{$vmid}};
+ }
PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
};
write_job_state($jobcfg, $state);
}
+sub delete_guest_states {
+ my ($vmid) = @_;
+
+ my $code = sub {
+ my $stateobj = read_state();
+ delete $stateobj->{$vmid};
+ PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
+ };
+
+ PVE::Tools::lock_file($state_lock, 10, $code);
+}
+
sub record_job_end {
my ($jobcfg, $state, $start_time, $duration, $err) = @_;
chomp $err;
$state->{fail_count}++;
$state->{error} = "$err";
+ write_job_state($jobcfg, $state);
} else {
- $state->{last_sync} = $start_time;
- $state->{fail_count} = 0;
- delete $state->{error};
+ if ($jobcfg->{remove_job}) {
+ write_job_state($jobcfg, undef);
+ } else {
+ $state->{last_sync} = $start_time;
+ $state->{fail_count} = 0;
+ delete $state->{error};
+ write_job_state($jobcfg, $state);
+ }
}
- write_job_state($jobcfg, $state);
}
sub replication_snapshot_name {
wantarray ? ($prefix, $snapname) : $snapname;
}
+sub purge_old_states {
+
+ my $local_node = PVE::INotify::nodename();
+
+ my $cfg = PVE::ReplicationConfig->new();
+ my $vms = PVE::Cluster::get_vmlist();
+
+ my $used_tids = {};
+
+ foreach my $jobid (sort keys %{$cfg->{ids}}) {
+ my $jobcfg = $cfg->{ids}->{$jobid};
+ my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
+ my $tid = $plugin->get_unique_target_id($jobcfg);
+ my $vmid = $jobcfg->{guest};
+ $used_tids->{$vmid}->{$tid} = 1
+ if defined($vms->{ids}->{$vmid}); # && $vms->{ids}->{$vmid}->{node} eq $local_node;
+ }
+
+ my $purge_state = sub {
+ my $stateobj = read_state();
+ my $next_stateobj = {};
+
+ foreach my $vmid (keys %$stateobj) {
+ foreach my $tid (keys %{$stateobj->{$vmid}}) {
+ $next_stateobj->{$vmid}->{$tid} = $stateobj->{$vmid}->{$tid} if $used_tids->{$vmid}->{$tid};
+ }
+ }
+ PVE::Tools::file_set_contents($state_path, encode_json($next_stateobj));
+ };
+
+ PVE::Tools::lock_file($state_lock, 10, $purge_state);
+ die $@ if $@;
+}
+
sub job_status {
+ my ($get_disabled) = @_;
my $local_node = PVE::INotify::nodename();
# only consider guest on local node
next if $vms->{ids}->{$vmid}->{node} ne $local_node;
+ my $target = $jobcfg->{target};
if (!$jobcfg->{remove_job}) {
# never sync to local node
- next if $jobcfg->{target} eq $local_node;
+ next if $target eq $local_node;
- next if $jobcfg->{disable};
+ next if !$get_disabled && $jobcfg->{disable};
}
my $state = extract_job_state($stateobj, $jobcfg);
# todo: consider fail_count? How many retries?
} else {
if (my $fail_count = $state->{fail_count}) {
- if ($fail_count < 3) {
- $next_sync = $state->{last_try} + 5*60*$fail_count;
+ my $members = PVE::Cluster::get_members();
+ if (!$fail_count || ($members->{$target} && $members->{$target}->{online})) {
+ $next_sync = $state->{last_try} + 60*($fail_count < 3 ? 5*$fail_count : 30);
}
} else {
my $schedule = $jobcfg->{schedule} || '*/15';
my $jobb = $jobs->{$b};
my $sa = $joba->{state};
my $sb = $jobb->{state};
- my $res = $sa->{last_iteration} cmp $sb->{last_iteration};
+ my $res = $sa->{last_iteration} <=> $sb->{last_iteration};
return $res if $res != 0;
$res = $joba->{next_sync} <=> $jobb->{next_sync};
return $res if $res != 0;
return undef;
}
+sub schedule_job_now {
+ my ($jobcfg) = @_;
+
+ PVE::GuestHelpers::guest_migration_lock($jobcfg->{guest}, undef, sub {
+ PVE::Tools::lock_file($state_lock, 10, sub {
+ my $stateobj = read_state();
+ my $vmid = $jobcfg->{guest};
+ my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
+ my $tid = $plugin->get_unique_target_id($jobcfg);
+ # no not modify anything if there is no state
+ return if !defined($stateobj->{$vmid}->{$tid});
+
+ my $state = read_job_state($jobcfg);
+ $state->{last_try} = 0;
+ write_job_state($jobcfg, $state);
+ });
+ die $@ if $@;
+ });
+}
+
1;