use PVE::ProcFSTools;
use PVE::Tools;
use PVE::Cluster;
+use PVE::DataCenterConfig;
use PVE::Storage;
use PVE::GuestHelpers;
use PVE::ReplicationConfig;
use PVE::ReplicationState;
+use PVE::SSHInfo;
# regression tests should overwrite this
($last_snapshots->{$volid}->{$parent_snapname} &&
$remote_snapshots->{$volid}->{$parent_snapname})) {
$base_snapshots->{$volid} = $parent_snapname;
+ } elsif ($last_sync == 0) {
+ my @desc_sorted_snap =
+ map { $_->[1] } sort { $b->[0] <=> $a->[0] }
+ map { [ ($_ =~ /__replicate_\Q$jobid\E_(\d+)_/)[0] || 0, $_ ] }
+ keys %{$remote_snapshots->{$volid}};
+
+ foreach my $remote_snap (@desc_sorted_snap) {
+ if (defined($last_snapshots->{$volid}->{$remote_snap})) {
+ $base_snapshots->{$volid} = $remote_snap;
+ last;
+ }
+ }
+ die "No common base to restore the job state\n".
+ "please delete jobid: $jobid and create the job again\n"
+ if !defined($base_snapshots->{$volid});
}
}
}
sub remote_prepare_local_job {
my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;
- my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
+ my $ssh_cmd = PVE::SSHInfo::ssh_info_to_command($ssh_info);
my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
push @$cmd, '--scan', join(',', @$storeid_list) if scalar(@$storeid_list);
push @$cmd, @$volumes if scalar(@$volumes);
sub remote_finalize_local_job {
my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;
- my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
+ my $ssh_cmd = PVE::SSHInfo::ssh_info_to_command($ssh_info);
my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
@$volumes, '--last_sync', $last_sync];
(defined($parent_snapname) && ($snap eq $parent_snapname))) {
$last_snapshots->{$volid}->{$snap} = 1;
} elsif ($snap =~ m/^\Q$prefix\E/) {
- $logfunc->("delete stale replication snapshot '$snap' on $volid");
- PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
- $cleaned_replicated_volumes->{$volid} = 1;
+ if ($last_sync != 0) {
+ $logfunc->("delete stale replication snapshot '$snap' on $volid");
+ eval {
+ PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
+ $cleaned_replicated_volumes->{$volid} = 1;
+ };
+
+ # If deleting the snapshot fails, we cannot be sure whether it was due to an error or a timeout.
+ # After a timeout, it is likely that the delete has actually succeeded.
+ # If it really failed, the removal will be retried on the next run.
+ if (my $err = $@) {
+ # warn is for syslog/journal.
+ warn $err;
+
+ # logfunc output is written to the replication log.
+ $logfunc->("delete stale replication snapshot error: $err");
+ }
+ # last_sync == 0 together with an existing replication snapshot only occurs if the VM was stolen
+ } else {
+ $last_snapshots->{$volid}->{$snap} = 1;
+ }
}
}
}
}
sub replicate_volume {
- my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;
+ my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;
my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
- my $ratelimit_bps = int(1000000*$rate) if $rate;
+ my $ratelimit_bps = $rate ? int(1000000 * $rate) : undef;
+
PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
- $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure);
+ $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure, 1, $logfunc);
}
if $start_time <= $last_sync;
my $vmid = $jobcfg->{guest};
- my $vmtype = $jobcfg->{vmtype};
my $conf = $guest_class->load_config($vmid);
my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
$running //= 0; # to avoid undef warnings from logfunc
- $logfunc->("guest => $vmid, type => $vmtype, running => $running");
+ my $guest_name = $guest_class->guest_type() . ' ' . $vmid;
+
+ $logfunc->("guest => $guest_name, running => $running");
$logfunc->("volumes => " . join(',', @$sorted_volids));
if (my $remove_job = $jobcfg->{remove_job}) {
if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
# remove all remote volumes
- my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
- remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);
+ my @store_list = map { (PVE::Storage::parse_volume_id($_))[0] } @$sorted_volids;
+
+ my %hash = map { $_ => 1 } @store_list;
+
+ my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target});
+ remote_prepare_local_job($ssh_info, $jobid, $vmid, [], [ keys %hash ], 1, undef, 1, $logfunc);
}
# remove all local replication snapshots (lastsync => 0)
- prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);
+ prepare($storecfg, $sorted_volids, $jobid, 1, undef, $logfunc);
PVE::ReplicationConfig::delete_job($jobid); # update config
$logfunc->("job removed");
return undef;
}
- my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);
+ my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target}, $migration_network);
my $parent_snapname = $conf->{parent};
};
my $err = $@;
- # unfreeze immediately
+ # thaw immediately
if ($freezefs) {
+ $logfunc->("thaw guest filesystem");
$guest_class->__snapshot_freeze($vmid, 1);
}
my $rate = $jobcfg->{rate};
my $insecure = $migration_type eq 'insecure';
+ $logfunc->("using $migration_type transmission, rate limit: "
+ . ($rate ? "$rate MByte/s" : "none"));
+
foreach my $volid (@$sorted_volids) {
my $base_snapname;
$logfunc->("full sync '$volid' ($sync_snapname)");
}
- replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
+ replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
}
};
- $err = $@;
- if ($err) {
+ if ($err = $@) {
$cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
# we do not cleanup the remote side here - this is done in
# next run of prepare_local_job
# remove old snapshots because they are no longer needed
$cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);
- remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
+ eval {
+ remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
+ };
- die $err if $err;
+ # old snapshots will be removed by the next run of prepare_local_job.
+ if ($err = $@) {
+ # warn is for syslog/journal.
+ warn $err;
+
+ # logfunc output is written to the replication log.
+ $logfunc->("delete stale replication snapshot error: $err");
+ }
return $volumes;
}
my $volumes;
- # we normaly write errors into the state file,
+ # we normally write errors into the state file,
# but we also catch unexpected errors and log them to syslog
# (for examply when there are problems writing the state file)
- eval {
- my $state = PVE::ReplicationState::read_job_state($jobcfg);
- PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);
+ my $state = PVE::ReplicationState::read_job_state($jobcfg);
- my $t0 = [gettimeofday];
+ PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);
- mkdir $PVE::ReplicationState::replicate_logdir;
- my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
- open(my $logfd, '>', $logfile) ||
- die "unable to open replication log '$logfile' - $!\n";
-
- my $logfunc_wrapper = sub {
- my ($msg) = @_;
-
- my $ctime = get_log_time();
- print $logfd "$ctime $jobid: $msg\n";
- if ($logfunc) {
- if ($verbose) {
- $logfunc->("$ctime $jobid: $msg");
- } else {
- $logfunc->($msg);
- }
- }
- };
+ my $t0 = [gettimeofday];
- $logfunc_wrapper->("start replication job");
+ mkdir $PVE::ReplicationState::replicate_logdir;
+ my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
+ open(my $logfd, '>', $logfile) ||
+ die "unable to open replication log '$logfile' - $!\n";
- eval {
- $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
- };
- my $err = $@;
+ my $logfunc_wrapper = sub {
+ my ($msg) = @_;
- if ($err) {
- chomp $err;
- $logfunc_wrapper->("end replication job with error: $err");
- } else {
- $logfunc_wrapper->("end replication job");
+ my $ctime = get_log_time();
+ print $logfd "$ctime $jobid: $msg\n";
+ if ($logfunc) {
+ if ($verbose) {
+ $logfunc->("$ctime $jobid: $msg");
+ } else {
+ $logfunc->($msg);
+ }
}
+ };
- PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);
+ $logfunc_wrapper->("start replication job");
- close($logfd);
+ eval {
+ $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
};
- if (my $err = $@) {
- warn "$jobid: got unexpected replication job error - $err";
+ my $err = $@;
+
+ if ($err) {
+ my $msg = "end replication job with error: $err";
+ chomp $msg;
+ $logfunc_wrapper->($msg);
+ } else {
+ $logfunc_wrapper->("end replication job");
}
+ PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);
+
+ close($logfd);
+
+ die $err if $err;
+
return $volumes;
};
sub run_replication {
- my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr, $verbose) = @_;
+ my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;
my $volumes;
- eval {
- my $timeout = 2; # do not wait too long - we repeat periodically anyways
- $volumes = PVE::GuestHelpers::guest_migration_lock(
- $jobcfg->{guest}, $timeout, $run_replication_nolock,
- $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);
- };
- if (my $err = $@) {
- return undef if $noerr;
- die $err;
- }
+ my $timeout = 2; # do not wait too long - we repeat periodically anyways
+ $volumes = PVE::GuestHelpers::guest_migration_lock(
+ $jobcfg->{guest}, $timeout, $run_replication_nolock,
+ $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);
+
return $volumes;
}