]>
Commit | Line | Data |
---|---|---|
892821fd DM |
1 | package PVE::Replication; |
2 | ||
3 | use warnings; | |
4 | use strict; | |
5 | use Data::Dumper; | |
6 | use JSON; | |
7 | use Time::HiRes qw(gettimeofday tv_interval); | |
8 | ||
9 | use PVE::INotify; | |
483f89dd | 10 | use PVE::ProcFSTools; |
892821fd DM |
11 | use PVE::Tools; |
12 | use PVE::Cluster; | |
892821fd | 13 | use PVE::Storage; |
1a9dc09e | 14 | use PVE::GuestHelpers; |
892821fd | 15 | use PVE::ReplicationConfig; |
d255af01 | 16 | use PVE::ReplicationState; |
892821fd | 17 | |
f842e812 DM |
18 | |
# Returns the current epoch time. Regression tests are expected to
# override this sub.
sub get_log_time {
    my $now = time();

    return $now;
}
892821fd | 24 | |
# Runs 'pvesr prepare-local-job' for $jobid through the ssh command
# built from $ssh_info and returns the data decoded from its JSON output.
#
# $volumes and $storeid_list are array references. A non-empty
# $storeid_list is passed as one comma separated '--scan' value, the
# volumes are appended as plain arguments. '--parent_snapname' and
# '--force' are only added when the corresponding argument is true.
# Each line the command writes to stderr is forwarded to $logfunc.
# $vmid is accepted but not used here.
#
# Every stdout line is decoded as JSON; the last decoded value wins.
# Dies if no result was obtained.
sub remote_prepare_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    # assemble the remote command line piece by piece
    my @remote_call = ('pvesr', 'prepare-local-job', $jobid);
    if (scalar(@$storeid_list)) {
        push @remote_call, '--scan', join(',', @$storeid_list);
    }
    if (scalar(@$volumes)) {
        push @remote_call, @$volumes;
    }
    push @remote_call, '--last_sync', $last_sync;
    if ($parent_snapname) {
        push @remote_call, '--parent_snapname', $parent_snapname;
    }
    if ($force) {
        push @remote_call, '--force';
    }

    my $cmd = [@$ssh_cmd, '--', @remote_call];

    my $remote_snapshots;

    my $on_stdout = sub {
        my ($line) = @_;
        $remote_snapshots = JSON::decode_json($line);
    };

    my $on_stderr = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_prepare_local_job) $line");
    };

    PVE::Tools::run_command($cmd, outfunc => $on_stdout, errfunc => $on_stderr);

    if (!defined($remote_snapshots)) {
        die "prepare remote node failed - no result\n";
    }

    return $remote_snapshots;
}
58 | ||
# Runs 'pvesr finalize-local-job' for $jobid through the ssh command
# built from $ssh_info, passing the volumes (array reference $volumes)
# followed by '--last_sync' $last_sync.
#
# Each line written to stdout or stderr by the command is forwarded to
# $logfunc. $vmid is accepted but not used here. The result of
# run_command is passed through as the (implicit) return value.
sub remote_finalize_local_job {
    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;

    my $forward_line = sub {
        my ($line) = @_;
        chomp $line;
        $logfunc->("(remote_finalize_local_job) $line");
    };

    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);

    my @remote_call = ('pvesr', 'finalize-local-job', $jobid);
    push @remote_call, @$volumes;
    push @remote_call, '--last_sync', $last_sync;

    my $cmd = [@$ssh_cmd, '--', @remote_call];

    PVE::Tools::run_command($cmd, outfunc => $forward_line, errfunc => $forward_line);
}
74 | ||
91ee6a2f DM |
# Collects the local snapshots that belong to the sync at $last_sync
# (defaults to 0) and deletes every other replication snapshot of job
# $jobid.
#
# For each volume in the array reference $volids the snapshot list is
# read from the storage layer. A snapshot is recorded when it is the
# replication snapshot for $last_sync or equals $parent_snapname (if
# defined). Any other snapshot starting with the job's replication
# prefix is reported through $logfunc and deleted.
#
# Returns a hash reference { volid => { snapname => 1 } } of the recorded
# snapshots. In list context a second hash reference { volid => 1 } is
# returned as well, listing the volumes from which snapshots were deleted.
sub prepare {
    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;

    $last_sync = 0 if !defined($last_sync);

    my ($prefix, $snapname) =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);

    my $last_snapshots = {};
    my $cleaned_replicated_volumes = {};

    for my $volid (@$volids) {
        my $existing = PVE::Storage::volume_snapshot_list($storecfg, $volid);

        for my $snap (@$existing) {
            my $is_parent = defined($parent_snapname) && ($snap eq $parent_snapname);

            if ($snap eq $snapname || $is_parent) {
                $last_snapshots->{$volid}->{$snap} = 1;
                next;
            }

            # snapshots not created by this job are left alone
            next if $snap !~ m/^\Q$prefix\E/;

            $logfunc->("delete stale replication snapshot '$snap' on $volid");
            PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
            $cleaned_replicated_volumes->{$volid} = 1;
        }
    }

    return ($last_snapshots, $cleaned_replicated_volumes) if wantarray;
    return $last_snapshots;
}
f70997ea | 102 | |
# Transfers the single volume $volid to the node described by $ssh_info
# via the storage layer, keeping storage id and volume name unchanged.
#
# $base_snapshot and $sync_snapname are handed to storage_migrate as is.
# $rate and $insecure are accepted but currently ignored. The result of
# storage_migrate is passed through as the (implicit) return value.
sub replicate_volume {
    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;

    # the target uses the same storage id and volume name as the source
    my ($target_storeid, $target_volname) = PVE::Storage::parse_volume_id($volid);

    # fixme: handle $rate, $insecure ??
    PVE::Storage::storage_migrate(
        $storecfg,
        $volid,
        $ssh_info,
        $target_storeid,
        $target_volname,
        $base_snapshot,
        $sync_snapname,
    );
}
112 | ||
f9d38c54 | 113 | |
# Runs one replication pass for the job described by $jobcfg.
#
# Parameters:
#   $guest_class - class used to load the guest configuration, to query
#                  and toggle the filesystem freeze and to list the
#                  replicatable volumes
#   $jobcfg      - job configuration hash; id, type, guest, vmtype,
#                  target, rate and remove_job are read here
#   $state       - job state hash; last_sync and storeid_list are read,
#                  storeid_list is replaced with the storages in use
#   $start_time  - time stamp of this run, used to name the new snapshot
#   $logfunc     - code reference called with one message per call
#
# If the job is marked with remove_job, the replication snapshots are
# removed (remote volumes too in 'full' mode when the target is not the
# local node), the job is deleted from the configuration and the sub
# returns.
#
# Otherwise the local and the remote side are prepared, a new snapshot
# is created on every volume (with the guest filesystem frozen when
# required) and each volume is sent to the target - incrementally if
# both sides share the last sync snapshot or the parent snapshot, else
# as full sync. Finally the previous local snapshots are deleted and the
# remote side is finalized.
#
# Returns nothing. Dies on error; snapshots created by this run are
# removed (best effort) before the error is re-thrown.
sub replicate {
    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;

    my $local_node = PVE::INotify::nodename();

    die "not implemented - internal error" if $jobcfg->{type} ne 'local';

    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');

    my $migration_network;
    my $migration_type = 'secure';
    if (my $mc = $dc_conf->{migration}) {
        $migration_network = $mc->{network};
        $migration_type = $mc->{type} if defined($mc->{type});
    }

    my $jobid = $jobcfg->{id};
    my $storecfg = PVE::Storage::config();
    my $last_sync = $state->{last_sync};

    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
        if $start_time <= $last_sync;

    my $vmid = $jobcfg->{guest};
    my $vmtype = $jobcfg->{vmtype};

    my $conf = $guest_class->load_config($vmid);
    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $conf);

    my $sorted_volids = [ sort keys %$volumes ];

    $running //= 0; # to avoid undef warnings from logfunc

    $logfunc->("guest => $vmid, type => $vmtype, running => $running");
    $logfunc->("volumes => " . join(',', @$sorted_volids));

    if (my $remove_job = $jobcfg->{remove_job}) {

        $logfunc->("start job removal - mode '${remove_job}'");

        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
            my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
            remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);

        }
        # remove all local replication snapshots (lastsync => 0)
        prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);

        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");

        return;
    }

    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);

    my $last_sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
    my $sync_snapname =
        PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);

    my $parent_snapname = $conf->{parent};

    # test if we have a replication_ snapshot from last sync
    # and remove all other/stale replication snapshots

    my $last_snapshots = prepare(
        $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc);

    # prepare remote side
    my $remote_snapshots = remote_prepare_local_job(
        $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc);

    # remember the storages used by this run in the job state
    my $storeid_hash = {};
    foreach my $volid (@$sorted_volids) {
        my ($storeid) = PVE::Storage::parse_volume_id($volid);
        $storeid_hash->{$storeid} = 1;
    }
    $state->{storeid_list} = [ sort keys %$storeid_hash ];

    # freeze filesystem for data consistency
    if ($freezefs) {
        $logfunc->("freeze guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 0);
    }

    # make snapshot of all volumes
    my $replicate_snapshots = {};
    eval {
        foreach my $volid (@$sorted_volids) {
            $logfunc->("create snapshot '${sync_snapname}' on $volid");
            PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
            $replicate_snapshots->{$volid} = 1;
        }
    };
    my $err = $@;

    # unfreeze immediately
    if ($freezefs) {
        $guest_class->__snapshot_freeze($vmid, 1);
    }

    # best effort removal of snapshot $snapname on all volumes listed as
    # keys of $volid_hash - failures are only warned about
    my $cleanup_local_snapshots = sub {
        my ($volid_hash, $snapname) = @_;
        foreach my $volid (sort keys %$volid_hash) {
            $logfunc->("delete previous replication snapshot '$snapname' on $volid");
            eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
            warn $@ if $@;
        }
    };

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        die $err;
    }

    eval {

        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';

        foreach my $volid (@$sorted_volids) {
            my $base_snapname;

            # an incremental sync needs a snapshot known to both sides
            if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
                if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
                    $remote_snapshots->{$volid}->{$last_sync_snapname}) {
                    $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
                    $base_snapname = $last_sync_snapname;
                } elsif (defined($parent_snapname) &&
                         ($last_snapshots->{$volid}->{$parent_snapname} &&
                          $remote_snapshots->{$volid}->{$parent_snapname})) {
                    $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)");
                    $base_snapname = $parent_snapname;
                }
            }

            $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
    };
    $err = $@;

    if ($err) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
        die $err;
    }

    # remove old snapshots because they are no longer needed
    # NOTE(review): prepare() also records volumes that only carry the
    # parent snapshot, so this may try to delete a last sync snapshot
    # that does not exist on such a volume (the failure is only warned
    # about) - confirm whether that is intended.
    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);

    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);

    # every error path above has already died, so there is nothing left
    # to re-throw here
    return;
}
273 | ||
# Runs the replication job $jobcfg once, without taking any lock.
#
# The job state is read, marked with pid/start time of this process, the
# local node name, $start_time and $iteration, and written back. A fresh
# job log file is opened and every log message is written to it (and to
# $logfunc, if given) prefixed with the log time and the job id. After
# replicate() has finished, duration and success/failure are recorded
# in the job state.
my $run_replication_nolock = sub {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;

    my $jobid = $jobcfg->{id};

    # Errors are normally written into the state file, but unexpected
    # errors (for example problems writing the state file) are caught
    # here as well and reported with warn.
    eval {
        my $state = PVE::ReplicationState::read_job_state($jobcfg);

        my $t0 = [gettimeofday];

        my $pid = $$;
        $state->{pid} = $pid;
        $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($pid);
        $state->{last_node} = PVE::INotify::nodename();
        $state->{last_try} = $start_time;
        $state->{last_iteration} = $iteration;
        $state->{storeid_list} //= [];

        PVE::ReplicationState::write_job_state($jobcfg, $state);

        mkdir $PVE::ReplicationState::replicate_logdir;
        my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
        open(my $logfd, '>', $logfile)
            or die "unable to open replication log '$logfile' - $!\n";

        # writes to the job log and additionally to the caller's logger
        my $logfunc_wrapper = sub {
            my ($msg) = @_;

            my $ctime = get_log_time();
            my $entry = "$ctime $jobid: $msg";
            print {$logfd} "$entry\n";
            $logfunc->($entry) if $logfunc;
        };

        $logfunc_wrapper->("start replication job");

        eval {
            replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
        };
        my $err = $@;

        $state->{duration} = tv_interval($t0);
        delete @{$state}{qw(pid ptime)};

        if (!$err) {
            $logfunc_wrapper->("end replication job");
            $state->{last_sync} = $start_time;
            $state->{fail_count} = 0;
            delete $state->{error};
            PVE::ReplicationState::write_job_state($jobcfg, $state);
        } else {
            chomp $err;
            $state->{fail_count}++;
            $state->{error} = "$err";
            PVE::ReplicationState::write_job_state($jobcfg, $state);
            $logfunc_wrapper->("end replication job with error: $err");
        }

        close($logfd);
    };
    if (my $unexpected = $@) {
        warn "$jobid: got unexpected replication job error - $unexpected";
    }
};
340 | ||
# Runs the replication job $jobcfg while holding the guest migration
# lock of the job's guest.
#
# The lock is requested with a timeout of 2 seconds. On failure the
# error is re-thrown, unless $noerr is set - then undef is returned.
sub run_replication {
    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;

    eval {
        # do not wait too long - we repeat periodically anyways
        my $lock_timeout = 2;

        PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest},
            $lock_timeout,
            $run_replication_nolock,
            $guest_class,
            $jobcfg,
            $iteration,
            $start_time,
            $logfunc,
        );
    };
    if (my $failure = $@) {
        return undef if $noerr;
        die $failure;
    }
}
355 | ||
356 | 1; |