1 package PVE
::Replication
;
7 use Time
::HiRes
qw(gettimeofday tv_interval);
8 use POSIX
qw(strftime);
14 use PVE
::DataCenterConfig
;
16 use PVE
::GuestHelpers
;
17 use PVE
::ReplicationConfig
;
18 use PVE
::ReplicationState
;
# Log timestamp helper (presumably get_log_time()): current local time formatted
# with strftime "%F %H:%M:%S", i.e. YYYY-MM-DD HH:MM:SS. Its result prefixes
# every line written by the replication log wrapper.
22 # regression tests should overwrite this (presumably to get reproducible timestamps)
25 return strftime
("%F %H:%M:%S", localtime);
28 # Find common base replication snapshot, available on local and remote side.
29 # Note: this also removes stale replication snapshots
# (locally through prepare(); the remote 'pvesr prepare-local-job' call
# presumably does the same on the target).
#
# For each volume that has snapshot info on both sides, the base snapshot is
# chosen in this order:
#   1. the replication snapshot of $last_sync,
#   2. the guest's parent snapshot ($guest_conf->{parent}),
#   3. the most recent common snapshot (by local timestamp) that is either a
#      snapshot from the guest config or a replication snapshot.
# A snapshot is "common" when it exists on both sides and, if the remote side
# reports per-snapshot hashes, both 'id' fields are equal.
#
# Returns ($base_snapshots, $local_snapshots, $last_sync_snapname), where
# $base_snapshots maps volid => name of the chosen base snapshot.
# Dies if volumes are left without a common base snapshot.
30 sub find_common_replication_snapshot
{
31 my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $guest_conf, $logfunc) = @_;
33 my $parent_snapname = $guest_conf->{parent
};
34 my $conf_snapshots = $guest_conf->{snapshots
};
36 my $last_sync_snapname =
37 PVE
::ReplicationState
::replication_snapshot_name
($jobid, $last_sync);
# local side: collect snapshot info and remove stale replication snapshots
40 prepare
($storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);
# target side: the same preparation, run remotely over SSH
43 my $remote_snapshots = remote_prepare_local_job
(
55 my $base_snapshots = {};
56 my @missing_snapshots = ();
58 foreach my $volid (@$volumes) {
59 my $local_info = $local_snapshots->{$volid};
60 my $remote_info = $remote_snapshots->{$volid};
62 if (defined($local_info) && defined($remote_info)) {
63 my $common_snapshot = sub {
66 return 0 if !$local_info->{$snap} || !$remote_info->{$snap};
68 # Check for ID if remote side supports it already.
69 return $local_info->{$snap}->{id
} eq $remote_info->{$snap}->{id
}
70 if ref($remote_info->{$snap}) eq 'HASH';
75 if ($common_snapshot->($last_sync_snapname)) {
76 $base_snapshots->{$volid} = $last_sync_snapname;
77 } elsif (defined($parent_snapname) && $common_snapshot->($parent_snapname)) {
78 $base_snapshots->{$volid} = $parent_snapname;
# fallback: newest common snapshot (by local timestamp) that belongs to the
# guest config or is a replication snapshot
80 my $most_recent = [0, undef];
81 for my $snapshot (keys $local_info->%*) {
82 next if !$common_snapshot->($snapshot);
83 next if !$conf_snapshots->{$snapshot} && !is_replication_snapshot
($snapshot);
85 my $timestamp = $local_info->{$snapshot}->{timestamp
};
87 $most_recent = [$timestamp, $snapshot] if $timestamp > $most_recent->[0];
90 if ($most_recent->[1]) {
91 $base_snapshots->{$volid} = $most_recent->[1];
95 push @missing_snapshots, $volid if !defined($base_snapshots->{$volid});
100 if (scalar(@missing_snapshots) > 0) {
101 # There exist volumes without common base snapshot on the remote side.
102 # Trying to (do a full) sync won't work, so die early with a clean error.
103 my $volume_string = join(',', @missing_snapshots);
104 die "No common base snapshot on volume(s) $volume_string\nPlease remove the problematic " .
105 "volume(s) from the replication target or delete and re-create the whole job $jobid\n";
108 return ($base_snapshots, $local_snapshots, $last_sync_snapname);
# Run 'pvesr prepare-local-job <jobid>' on the target node over SSH.
#
# The following are passed on to the remote command:
#   - --scan with the comma-joined @$storeid_list (only if non-empty),
#   - the volume IDs (only if any),
#   - --last_sync,
#   - --parent_snapname,
#   - --force (only if $force).
# The remote stdout is decoded as JSON by the output parser. Remote stderr lines
# are logged through $logfunc with a "(remote_prepare_local_job)" prefix.
#
# Returns the decoded remote snapshot info, which find_common_replication_snapshot
# looks up per volume ID. Dies if no result was received.
111 sub remote_prepare_local_job
{
112 my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;
114 my $ssh_cmd = PVE
::SSHInfo
::ssh_info_to_command
($ssh_info);
115 my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
116 push @$cmd, '--scan', join(',', @$storeid_list) if scalar(@$storeid_list);
117 push @$cmd, @$volumes if scalar(@$volumes);
119 push @$cmd, '--last_sync', $last_sync;
120 push @$cmd, '--parent_snapname', $parent_snapname
122 push @$cmd, '--force' if $force;
124 my $remote_snapshots;
128 $remote_snapshots = JSON
::decode_json
($line);
134 $logfunc->("(remote_prepare_local_job) $line");
137 PVE
::Tools
::run_command
($cmd, outfunc
=> $parser, errfunc
=> $logger);
139 die "prepare remote node failed - no result\n"
140 if !defined($remote_snapshots);
142 return $remote_snapshots;
# Run 'pvesr finalize-local-job <jobid> <volumes...> --last_sync <last_sync>' on
# the target node over SSH. Both stdout and stderr lines of the remote command
# are logged through $logfunc with a "(remote_finalize_local_job)" prefix.
145 sub remote_finalize_local_job
{
146 my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;
148 my $ssh_cmd = PVE
::SSHInfo
::ssh_info_to_command
($ssh_info);
149 my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
150 @$volumes, '--last_sync', $last_sync];
155 $logfunc->("(remote_finalize_local_job) $line");
158 PVE
::Tools
::run_command
($cmd, outfunc
=> $logger, errfunc
=> $logger);
161 # Finds all local snapshots and removes replication snapshots not matching $last_sync after checking
162 # that it is present. Use last_sync=0 (or undef) to prevent removal (useful if VM was stolen). Use
163 # last_sync=1 to remove all replication snapshots (limited to job if specified).
#
# Parameters:
#   - $storecfg: storage configuration.
#   - $volids: list of volume IDs.
#   - $jobid: optional. When given, the snapshot prefix (and the expected
#     last_sync snapshot name) comes from replication_snapshot_name($jobid, $last_sync).
#     Otherwise the generic '__replicate_' prefix is used and no expected name
#     exists, so removal is allowed for any last_sync other than 0.
#   - $last_sync: see above.
#   - $parent_snapname: never removed.
#   - $logfunc: logging callback.
#
# A snapshot is stale if it starts with the prefix and is neither the expected
# last_sync snapshot nor $parent_snapname. A failed delete is warned and logged,
# not fatal; the next run retries.
#
# Returns {volid => {snapname => info}} for the snapshots that were kept
# (presumably the ones not deleted; verify). In list context it additionally
# returns a hash of the volids on which a stale snapshot was deleted.
#
# NOTE(review): with last_sync undef, the numeric comparisons below operate on
# undef and emit "uninitialized" warnings if warnings are enabled.
165 my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;
169 my ($prefix, $snapname);
171 if (defined($jobid)) {
172 ($prefix, $snapname) = PVE
::ReplicationState
::replication_snapshot_name
($jobid, $last_sync);
174 $prefix = '__replicate_';
177 my $local_snapshots = {};
178 my $cleaned_replicated_volumes = {};
179 foreach my $volid (@$volids) {
180 $local_snapshots->{$volid} = {};
182 my $info = PVE
::Storage
::volume_snapshot_info
($storecfg, $volid);
184 my $removal_ok = !defined($snapname) || $info->{$snapname};
185 $removal_ok = 0 if $last_sync == 0; # last_sync=0 if the VM was stolen, don't remove!
186 $removal_ok = 1 if $last_sync == 1; # last_sync=1 is a special value used to remove all
188 # check if it's a replication snapshot with the same $prefix but not the $last_sync one
189 my $potentially_stale = sub {
192 return 0 if defined($snapname) && $snap eq $snapname;
193 return 0 if defined($parent_snapname) && $snap eq $parent_snapname;
194 return $snap =~ m/^\Q$prefix\E/;
197 $logfunc->("expected snapshot $snapname not present for $volid, not removing others")
198 if !$removal_ok && $last_sync > 1 && grep { $potentially_stale->($_) } keys $info->%*;
200 for my $snap (keys $info->%*) {
201 if ($potentially_stale->($snap) && $removal_ok) {
202 $logfunc->("delete stale replication snapshot '$snap' on $volid");
204 PVE
::Storage
::volume_snapshot_delete
($storecfg, $volid, $snap);
205 $cleaned_replicated_volumes->{$volid} = 1;
208 # If deleting the snapshot fails, we can not be sure if it was due to an error or a timeout.
209 # The likelihood that the delete has worked out is high at a timeout.
210 # If it really fails, it will try to remove on the next run.
212 # warn is for syslog/journal.
215 # logfunc output is written to the replication log.
216 $logfunc->("delete stale replication snapshot error: $err");
219 $local_snapshots->{$volid}->{$snap} = $info->{$snap};
224 return wantarray ?
($local_snapshots, $cleaned_replicated_volumes) : $local_snapshots;
# Transfer a single volume to the target node with PVE::Storage::storage_migrate.
# The target keeps the same storage ID and volume name. Snapshot $sync_snapname
# is sent with with_snapshots => 1.
# When $base_snapshot is set it is passed as the base; the caller logs that case
# as an incremental sync.
# $rate is in MByte/s (as shown in the caller's log message) and is converted to
# bytes/s. A false $rate means no rate limit. $insecure is passed through as-is.
227 sub replicate_volume
{
228 my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;
230 my ($storeid, $volname) = PVE
::Storage
::parse_volume_id
($volid);
232 my $ratelimit_bps = $rate ?
int(1000000 * $rate) : undef;
235 'target_volname' => $volname,
236 'base_snapshot' => $base_snapshot,
237 'snapshot' => $sync_snapname,
238 'ratelimit_bps' => $ratelimit_bps,
239 'insecure' => $insecure,
240 'with_snapshots' => 1,
243 PVE
::Storage
::storage_migrate
($storecfg, $volid, $ssh_info, $storeid, $opts, $logfunc);
# Replicate one 'local'-type job: all replicatable volumes of guest
# $jobcfg->{guest} are sent to node $jobcfg->{target}.
#
# With $jobcfg->{remove_job} set, the job is torn down instead:
#   - For mode 'full' with a target other than the local node, the target is
#     cleaned via remote_prepare_local_job(..., last_sync = 1, force = 1).
#   - Local replication snapshots are removed via prepare(..., last_sync = 1).
#   - The job is then deleted from the replication config.
#
# Otherwise the steps are:
#   1. Determine the common base snapshots.
#   2. Freeze the guest filesystem (presumably only when
#      __snapshot_check_freeze_needed requires it).
#   3. Create the new replication snapshot (replication_snapshot_name($jobid,
#      $start_time)) on every volume.
#   4. Thaw the guest filesystem.
#   5. Transfer each volume: incremental if a base snapshot exists, full otherwise.
#   6. Record the sync end in $state.
#   7. Delete the previous local replication snapshot.
#   8. Finalize the target via 'pvesr finalize-local-job'.
#
# Dies if the job type is not 'local' or if $start_time <= $state->{last_sync}.
248 my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;
250 my $local_node = PVE
::INotify
::nodename
();
252 die "not implemented - internal error" if $jobcfg->{type
} ne 'local';
254 my $dc_conf = PVE
::Cluster
::cfs_read_file
('datacenter.cfg');
256 my $migration_network;
257 my $migration_type = 'secure';
258 if (my $mc = $dc_conf->{migration
}) {
259 $migration_network = $mc->{network
};
260 $migration_type = $mc->{type
} if defined($mc->{type
});
263 my $jobid = $jobcfg->{id
};
264 my $storecfg = PVE
::Storage
::config
();
265 my $last_sync = $state->{last_sync
};
267 die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
268 if $start_time <= $last_sync;
270 my $vmid = $jobcfg->{guest
};
272 my $conf = $guest_class->load_config($vmid);
273 my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
274 my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job
}));
276 my $sorted_volids = [ sort keys %$volumes ];
278 $running //= 0; # to avoid undef warnings from logfunc
280 my $guest_name = $guest_class->guest_type() . ' ' . $vmid;
282 $logfunc->("guest => $guest_name, running => $running");
283 $logfunc->("volumes => " . join(',', @$sorted_volids));
285 # filter out left-over non-existing/removed storages - avoids error on target
286 $state->{storeid_list
} = [ grep { $storecfg->{ids
}->{$_} } $state->{storeid_list
}->@* ];
288 if (my $remove_job = $jobcfg->{remove_job
}) {
290 $logfunc->("start job removal - mode '${remove_job}'");
292 if ($remove_job eq 'full' && $jobcfg->{target
} ne $local_node) {
293 # remove all remote volumes
294 my @store_list = map { (PVE
::Storage
::parse_volume_id
($_))[0] } @$sorted_volids;
295 push @store_list, @{$state->{storeid_list
}};
297 my %hash = map { $_ => 1 } @store_list;
299 my $ssh_info = PVE
::SSHInfo
::get_ssh_info
($jobcfg->{target
});
300 remote_prepare_local_job
($ssh_info, $jobid, $vmid, [], [ keys %hash ], 1, undef, 1, $logfunc);
303 # remove all local replication snapshots (last_sync => 1 removes all, see prepare)
304 prepare
($storecfg, $sorted_volids, $jobid, 1, undef, $logfunc);
306 PVE
::ReplicationConfig
::delete_job
($jobid); # update config
307 $logfunc->("job removed");
312 my $ssh_info = PVE
::SSHInfo
::get_ssh_info
($jobcfg->{target
}, $migration_network);
314 my ($base_snapshots, $local_snapshots, $last_sync_snapname) = find_common_replication_snapshot
(
315 $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list
}, $last_sync, $conf, $logfunc);
# Record in the job state the storages used by this run's volumes.
# Later runs pass them to the target as --scan storages.
317 my $storeid_hash = {};
318 foreach my $volid (@$sorted_volids) {
319 my ($storeid) = PVE
::Storage
::parse_volume_id
($volid);
320 $storeid_hash->{$storeid} = 1;
322 $state->{storeid_list
} = [ sort keys %$storeid_hash ];
324 # freeze filesystem for data consistency
326 $logfunc->("freeze guest filesystem");
327 $guest_class->__snapshot_freeze($vmid, 0);
330 # make snapshot of all volumes
332 PVE
::ReplicationState
::replication_snapshot_name
($jobid, $start_time);
334 my $replicate_snapshots = {};
336 foreach my $volid (@$sorted_volids) {
337 $logfunc->("create snapshot '${sync_snapname}' on $volid");
338 PVE
::Storage
::volume_snapshot
($storecfg, $volid, $sync_snapname);
339 $replicate_snapshots->{$volid} = 1;
346 $logfunc->("thaw guest filesystem");
347 $guest_class->__snapshot_freeze($vmid, 1);
# Delete $snapname on every volume in %$volid_hash. Each delete runs in its own
# eval, so one failing volume does not stop the loop.
350 my $cleanup_local_snapshots = sub {
351 my ($volid_hash, $snapname) = @_;
352 foreach my $volid (sort keys %$volid_hash) {
353 $logfunc->("delete previous replication snapshot '$snapname' on $volid");
354 eval { PVE
::Storage
::volume_snapshot_delete
($storecfg, $volid, $snapname); };
360 $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
366 my $rate = $jobcfg->{rate
};
367 my $insecure = $migration_type eq 'insecure';
369 $logfunc->("using $migration_type transmission, rate limit: "
370 . ($rate ?
"$rate MByte/s" : "none"));
372 foreach my $volid (@$sorted_volids) {
375 if (defined($base_snapname = $base_snapshots->{$volid})) {
376 $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
378 $logfunc->("full sync '$volid' ($sync_snapname)");
381 replicate_volume
($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
386 $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
387 # we do not cleanup the remote side here - this is done in
388 # next run of prepare_local_job
392 # Ensure that new sync is recorded before removing old replication snapshots.
393 PVE
::ReplicationState
::record_sync_end
($jobcfg, $state, $start_time);
395 # remove old snapshots because they are no longer needed
396 $cleanup_local_snapshots->($local_snapshots, $last_sync_snapname);
399 remote_finalize_local_job
($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
402 # old snapshots will be removed by the next run of prepare_local_job.
404 # warn is for syslog/journal.
407 # logfunc output is written to the replication log.
408 $logfunc->("delete stale replication snapshot error: $err");
# Run one replication job without taking any lock itself. run_replication calls
# it through PVE::GuestHelpers::guest_migration_lock.
#
# Responsibilities:
#   - Read the job state and record the job start (with $iteration).
#   - Create the replication log directory and open the per-job log file for
#     writing ('>' truncates it).
#   - Record the job end, with the elapsed time and the error (if any).
#
# Every message is written to that log file as "<time> <jobid>: <msg>". It is
# also handed to $logfunc (presumably only when one was given).
414 my $run_replication_nolock = sub {
415 my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;
417 my $jobid = $jobcfg->{id
};
421 # we normally write errors into the state file,
422 # but we also catch unexpected errors and log them to syslog
423 # (for example when there are problems writing the state file)
425 my $state = PVE
::ReplicationState
::read_job_state
($jobcfg);
427 PVE
::ReplicationState
::record_job_start
($jobcfg, $state, $start_time, $iteration);
429 my $t0 = [gettimeofday
];
431 mkdir $PVE::ReplicationState
::replicate_logdir
;
432 my $logfile = PVE
::ReplicationState
::job_logfile_name
($jobid);
433 open(my $logfd, '>', $logfile) ||
434 die "unable to open replication log '$logfile' - $!\n";
436 my $logfunc_wrapper = sub {
439 my $ctime = get_log_time
();
440 print $logfd "$ctime $jobid: $msg\n";
443 $logfunc->("$ctime $jobid: $msg");
450 $logfunc_wrapper->("start replication job");
453 $volumes = replicate
($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
458 my $msg = "end replication job with error: $err";
460 $logfunc_wrapper->($msg);
462 $logfunc_wrapper->("end replication job");
465 PVE
::ReplicationState
::record_job_end
($jobcfg, $state, $start_time, tv_interval
($t0), $err);
# Public entry point. Runs the job through $run_replication_nolock while holding
# the guest migration lock of $jobcfg->{guest}, forwarding all arguments.
# The lock timeout is deliberately short (2, presumably seconds) because
# replication is retried periodically anyway.
474 sub run_replication
{
475 my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;
479 my $timeout = 2; # do not wait too long - we repeat periodically anyways
480 $volumes = PVE
::GuestHelpers
::guest_migration_lock
(
481 $jobcfg->{guest
}, $timeout, $run_replication_nolock,
482 $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);
# Return 1 if $snapshot_name starts with the replication prefix. The prefix is
# "__replicate_", or "__replicate_<jobid>" when $jobid is given. Otherwise return 0.
# NOTE(review): the job-specific pattern has no delimiter after $jobid, so jobid
# "100-1" would also match snapshots of job "100-10". Confirm against the name
# format produced by PVE::ReplicationState::replication_snapshot_name.
487 sub is_replication_snapshot
{
488 my ($snapshot_name, $jobid) = @_;
490 if (defined($jobid)) {
491 return $snapshot_name =~ m/^__replicate_\Q$jobid\E/ ?
1 : 0;
494 return $snapshot_name =~ m/^__replicate_/ ?
1 : 0;