1 package PVE
::ReplicationState
;
9 use PVE
::CalendarEvent
;
11 use PVE
::GuestHelpers
;
12 use PVE
::ReplicationConfig
;
# Locations of the replication state file, the lock file used while updating
# it, and the directory holding per-job log files.
# Note: regression tests can overwrite $state_path for testing.
our $state_path       = '/var/lib/pve-manager/pve-replication-state.json';
our $state_lock       = '/var/lib/pve-manager/pve-replication-state.lck';
our $replicate_logdir = '/var/log/pve/replicate';
19 # regression tests should overwrite this
# Return the log file path for replication job $jobid: the job id appended
# to $replicate_logdir.
20 sub job_logfile_name
{
23 return "${replicate_logdir}/$jobid";
26 # Note: We use PVE::Tools::file_set_contents to write the state file atomically,
27 # so read_state() always returns a consistent copy (even when not locked).
# Load the replication state from $state_path and return it as a hash ref
# (presumably the body of read_state()).
# Returns {} when the file does not exist or is empty; dies with
# "invalid json data" when the content is not a single { ... } object.
# No state file yet: treat as empty state.
31 return {} if ! -e
$state_path;
33 my $raw = PVE
::Tools
::file_get_contents
($state_path);
35 return {} if $raw eq '';
# Only content wrapped in { ... } is accepted, and only the captured $1 is
# decoded (presumably to untaint $raw).
# NOTE(review): without /s, '.' does not match newlines, so this assumes the
# file holds single-line JSON, as produced by the encode_json writers — confirm.
38 if ($raw =~ m/^({.*})$/) {
39 return decode_json
($1);
# Any other content is rejected.
42 die "invalid json data in '$state_path'\n";
# Look up the state entry of a single replication job in a state object
# (such as the one read_state() returns) and fill in defaults for missing
# fields. Callers use the result as the job's state hash.
#
# $stateobj: state hash ref, indexed as ->{$vmid}->{$tid} (guest id, then the
#            plugin's unique target id)
# $jobcfg:   job config hash ref; its 'type' and 'guest' keys are used
#
# When an entry already exists, the defaults are written into that same hash,
# so $stateobj is modified in place.
45 sub extract_job_state
{
46 my ($stateobj, $jobcfg) = @_;
# The job type's plugin computes the target id used as the second-level key.
48 my $plugin = PVE
::ReplicationConfig-
>lookup($jobcfg->{type
});
50 my $vmid = $jobcfg->{guest
};
51 my $tid = $plugin->get_unique_target_id($jobcfg);
52 my $state = $stateobj->{$vmid}->{$tid};
# No entry yet: start from a fresh hash (it is not stored back into $stateobj).
54 $state = {} if !$state;
# Default every missing field to 0.
56 $state->{last_iteration
} //= 0;
57 $state->{last_try
} //= 0; # last sync start time
58 $state->{last_sync
} //= 0; # last successful sync start time
59 $state->{fail_count
} //= 0;
# Read the state file and return the (defaulted) state entry for job $jobcfg.
67 my $stateobj = read_state
();
68 return extract_job_state
($stateobj, $jobcfg);
71 # update state for a single job
# Stores $state as the entry of the job described by $jobcfg, keyed by the
# job's guest id and the plugin's unique target id.
# NOTE(review): presumably $update is the closure doing the read-modify-write
# below and $code the closure calling lock_file — confirm.
73 my ($jobcfg, $state) = @_;
75 my $plugin = PVE
::ReplicationConfig-
>lookup($jobcfg->{type
});
77 my $vmid = $jobcfg->{guest
};
78 my $tid = $plugin->get_unique_target_id($jobcfg);
# Re-read the current state so entries of other jobs are preserved.
82 my $stateobj = read_state
();
83 # Note: tuple ($vmid, $tid) is unique
84 $stateobj->{$vmid}->{$tid} = $state;
# Rewrite the whole state file; file_set_contents is relied on to replace it
# atomically, so unlocked readers still see a consistent copy.
86 PVE
::Tools
::file_set_contents
($state_path, encode_json
($stateobj));
# Run $update while holding $state_lock (10 is lock_file's timeout argument,
# presumably in seconds — confirm).
90 PVE
::Tools
::lock_file
($state_lock, 10, $update);
94 # make sure we have guest_migration_lock during update
95 PVE
::GuestHelpers
::guest_migration_lock
($vmid, undef, $code);
98 # update all job states related to a specific $vmid
# $vmid_state replaces the whole state entry of guest $vmid (presumably a
# hash ref keyed by target id, matching the ->{$vmid}->{$tid} layout).
# NOTE(review): presumably $update is the closure doing the read-modify-write
# below and $code the closure calling lock_file — confirm.
99 sub write_vmid_job_states
{
100 my ($vmid_state, $vmid) = @_;
# Re-read the current state so other guests' entries are preserved.
103 my $stateobj = read_state
();
104 $stateobj->{$vmid} = $vmid_state;
# Rewrite the whole state file; file_set_contents is relied on to replace it
# atomically.
105 PVE
::Tools
::file_set_contents
($state_path, encode_json
($stateobj));
# Run $update while holding $state_lock.
109 PVE
::Tools
::lock_file
($state_lock, 10, $update);
113 # make sure we have guest_migration_lock during update
114 PVE
::GuestHelpers
::guest_migration_lock
($vmid, undef, $code);
# Compose the replication snapshot name for job $jobid and timestamp
# $last_sync, of the form "__replicate_<jobid>_<last_sync>__".
#
# List context:   returns ($prefix, $snapname), where $prefix is the shared
#                 "__replicate_<jobid>_" part.
# Scalar context: returns only $snapname.
sub replication_snapshot_name {
    my ($jobid, $last_sync) = @_;

    my $prefix = sprintf('__replicate_%s_', $jobid);
    my $snapname = $prefix . $last_sync . '__';

    return wantarray ? ($prefix, $snapname) : $snapname;
}
# Build $jobs (job id => job config) for every replication job that should be
# considered on this node. Each config is annotated with its state, id, guest
# type and next_sync time. Presumably this is the body of job_status(); the
# declarations of $jobs and $next_sync are not visible in this view.
128 my $local_node = PVE
::INotify
::nodename
();
132 my $stateobj = read_state
();
134 my $cfg = PVE
::ReplicationConfig-
>new();
136 my $vms = PVE
::Cluster
::get_vmlist
();
# Walk the jobs in sorted job-id order.
138 foreach my $jobid (sort keys %{$cfg->{ids
}}) {
139 my $jobcfg = $cfg->{ids
}->{$jobid};
140 my $vmid = $jobcfg->{guest
};
# Only 'local' job types are handled here.
142 die "internal error - not implemented" if $jobcfg->{type
} ne 'local';
144 # skip non existing vms
145 next if !$vms->{ids
}->{$vmid};
147 # only consider guest on local node
148 next if $vms->{ids
}->{$vmid}->{node
} ne $local_node;
# The target/disable filters only apply to jobs not marked remove_job, so
# removal jobs are kept.
150 if (!$jobcfg->{remove_job
}) {
151 # never sync to local node
152 next if $jobcfg->{target
} eq $local_node;
154 next if $jobcfg->{disable
};
# Annotate the job config in place (it is the hash stored inside $cfg).
157 my $state = extract_job_state
($stateobj, $jobcfg);
158 $jobcfg->{state} = $state;
159 $jobcfg->{id
} = $jobid;
160 $jobcfg->{vmtype
} = $vms->{ids
}->{$vmid}->{type
};
# next_sync: 1 for jobs marked remove_job; last_try + 5*60*fail_count
# (presumably seconds) for jobs with fewer than 3 failures; otherwise the
# next event of the job's schedule (default '*/15') computed from last_try,
# or 0 when there is none.
# NOTE(review): the else branches are not fully visible here; which case
# applies at 3 or more failures cannot be confirmed from this view.
164 if ($jobcfg->{remove_job
}) {
165 $next_sync = 1; # lowest possible value
166 # todo: consider fail_count? How many retries?
168 if (my $fail_count = $state->{fail_count
}) {
169 if ($fail_count < 3) {
170 $next_sync = $state->{last_try
} + 5*60*$fail_count;
173 my $schedule = $jobcfg->{schedule
} || '*/15';
174 my $calspec = PVE
::CalendarEvent
::parse_calendar_event
($schedule);
175 $next_sync = PVE
::CalendarEvent
::compute_next_event
($calspec, $state->{last_try
}) // 0;
# Record the computed time and collect the job.
179 $jobcfg->{next_sync
} = $next_sync;
181 $jobs->{$jobid} = $jobcfg;
# Presumably selects the next job to run for iteration $iteration at time
# $start_time (the body of get_next_job(); it continues past this view).
188 my ($iteration, $start_time) = @_;
190 my $jobs = job_status
();
# Order candidates ascending by last_iteration, then next_sync, then guest id.
# NOTE(review): last_iteration is compared with 'cmp' (string order, so e.g.
# "10" sorts before "9"), while elsewhere it is used as a number (//= 0,
# '>= $iteration'). '<=>' is likely intended — confirm.
192 my $sort_func = sub {
193 my $joba = $jobs->{$a};
194 my $jobb = $jobs->{$b};
195 my $sa = $joba->{state};
196 my $sb = $jobb->{state};
197 my $res = $sa->{last_iteration
} cmp $sb->{last_iteration
};
198 return $res if $res != 0;
199 $res = $joba->{next_sync
} <=> $jobb->{next_sync
};
200 return $res if $res != 0;
201 return $joba->{guest
} <=> $jobb->{guest
};
204 foreach my $jobid (sort $sort_func keys %$jobs) {
205 my $jobcfg = $jobs->{$jobid};
# Skip jobs already handled in this (or a later) iteration.
206 next if $jobcfg->{state}->{last_iteration
} >= $iteration;
# Only a nonzero next_sync that is due at $start_time qualifies.
207 if ($jobcfg->{next_sync
} && ($start_time >= $jobcfg->{next_sync
})) {