use Data::Dumper;
use JSON;
use Time::HiRes qw(gettimeofday tv_interval);
+use POSIX qw(strftime);
use PVE::INotify;
use PVE::ProcFSTools;
# regression tests should overwrite this
sub get_log_time {
- return time();
+ return strftime("%F %H:%M:%S", localtime);
+}
+
+# Find common base replication snapshot, available on local and remote side.
+# Note: this also removes stale replication snapshots
+#
+# Parameters:
+#   $ssh_info        - connection info, passed on to remote_prepare_local_job()
+#   $jobid           - replication job ID, used to build the snapshot name
+#   $vmid            - guest ID, passed on to remote_prepare_local_job()
+#   $storecfg        - storage configuration, passed on to prepare()
+#   $volumes         - array ref of volume IDs to examine
+#   $storeid_list    - passed on to remote_prepare_local_job()
+#   $last_sync       - time of the last sync, used to build the snapshot name
+#   $parent_snapname - optional guest snapshot name, used as fallback base
+#   $logfunc         - logging callback, passed on to both prepare calls
+#
+# Returns the list ($base_snapshots, $last_snapshots, $last_sync_snapname):
+#   $base_snapshots     - hash ref mapping volid => snapshot name present on
+#                         both sides; volumes without one have no entry
+#   $last_snapshots     - the value returned by prepare() for the local side
+#   $last_sync_snapname - the replication snapshot name for $last_sync
+sub find_common_replication_snapshot {
+    my (
+        $ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list,
+        $last_sync, $parent_snapname, $logfunc,
+    ) = @_;
+
+    my $last_sync_snapname =
+        PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
+
+    # test if we have a replication_ snapshot from last sync
+    # and remove all other/stale replication snapshots
+
+    my $last_snapshots = prepare(
+        $storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);
+
+    # prepare remote side
+    my $remote_snapshots = remote_prepare_local_job(
+        $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync,
+        $parent_snapname, 0, $logfunc);
+
+    my $base_snapshots = {};
+
+    foreach my $volid (@$volumes) {
+        if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
+            # prefer the snapshot of the last sync, fall back to the parent
+            # snapshot; a snapshot only counts if both sides have it
+            if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
+                $remote_snapshots->{$volid}->{$last_sync_snapname}) {
+                $base_snapshots->{$volid} = $last_sync_snapname;
+            } elsif (defined($parent_snapname) &&
+                ($last_snapshots->{$volid}->{$parent_snapname} &&
+                 $remote_snapshots->{$volid}->{$parent_snapname})) {
+                $base_snapshots->{$volid} = $parent_snapname;
+            }
+        }
+    }
+
+    return ($base_snapshots, $last_snapshots, $last_sync_snapname);
}
sub remote_prepare_local_job {
$last_sync //= 0;
- my ($prefix, $snapname) =
- PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
+ my ($prefix, $snapname);
+
+ if (defined($jobid)) {
+ ($prefix, $snapname) = PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
+ } else {
+ $prefix = '__replicate_';
+ }
my $last_snapshots = {};
my $cleaned_replicated_volumes = {};
foreach my $volid (@$volids) {
my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid);
foreach my $snap (@$list) {
- if ($snap eq $snapname || (defined($parent_snapname) && ($snap eq $parent_snapname))) {
+ if ((defined($snapname) && ($snap eq $snapname)) ||
+ (defined($parent_snapname) && ($snap eq $parent_snapname))) {
$last_snapshots->{$volid}->{$snap} = 1;
} elsif ($snap =~ m/^\Q$prefix\E/) {
$logfunc->("delete stale replication snapshot '$snap' on $volid");
my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
- # fixme: handle $rate, $insecure ??
+ # Only compute a limit when $rate is set; otherwise pass undef.
+ # A ternary is used instead of 'my ... if ...', because perlsyn documents
+ # the behaviour of a 'my' declaration with a statement modifier as undefined.
+ my $ratelimit_bps = $rate ? int(1000000*$rate) : undef;
PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
- $base_snapshot, $sync_snapname);
+ $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure);
}
my $conf = $guest_class->load_config($vmid);
my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
- my $volumes = $guest_class->get_replicatable_volumes($storecfg, $conf);
+ my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job}));
my $sorted_volids = [ sort keys %$volumes ];
PVE::ReplicationConfig::delete_job($jobid); # update config
$logfunc->("job removed");
- return;
+ return undef;
}
my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);
- my $last_sync_snapname =
- PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
- my $sync_snapname =
- PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);
-
my $parent_snapname = $conf->{parent};
- # test if we have a replication_ snapshot from last sync
- # and remove all other/stale replication snapshots
-
- my $last_snapshots = prepare(
- $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc);
-
- # prepare remote side
- my $remote_snapshots = remote_prepare_local_job(
- $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc);
+ my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot(
+ $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, $logfunc);
my $storeid_hash = {};
foreach my $volid (@$sorted_volids) {
}
# make snapshot of all volumes
+ my $sync_snapname =
+ PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);
+
my $replicate_snapshots = {};
eval {
foreach my $volid (@$sorted_volids) {
foreach my $volid (@$sorted_volids) {
my $base_snapname;
- if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
- if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
- $remote_snapshots->{$volid}->{$last_sync_snapname}) {
- $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
- $base_snapname = $last_sync_snapname;
- } elsif (defined($parent_snapname) &&
- ($last_snapshots->{$volid}->{$parent_snapname} &&
- $remote_snapshots->{$volid}->{$parent_snapname})) {
- $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)");
- $base_snapname = $parent_snapname;
- }
+ if (defined($base_snapname = $base_snapshots->{$volid})) {
+ $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
+ } else {
+ $logfunc->("full sync '$volid' ($sync_snapname)");
}
- $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
}
};
remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
die $err if $err;
+
+ return $volumes;
}
my $run_replication_nolock = sub {
- my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;
+ my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;
my $jobid = $jobcfg->{id};
+ my $volumes;
+
# we normaly write errors into the state file,
# but we also catch unexpected errors and log them to syslog
# (for examply when there are problems writing the state file)
eval {
my $state = PVE::ReplicationState::read_job_state($jobcfg);
- my $t0 = [gettimeofday];
-
- $state->{pid} = $$;
- $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
- $state->{last_node} = PVE::INotify::nodename();
- $state->{last_try} = $start_time;
- $state->{last_iteration} = $iteration;
- $state->{storeid_list} //= [];
+ PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);
- PVE::ReplicationState::write_job_state($jobcfg, $state);
+ my $t0 = [gettimeofday];
mkdir $PVE::ReplicationState::replicate_logdir;
my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
my $ctime = get_log_time();
print $logfd "$ctime $jobid: $msg\n";
- $logfunc->("$ctime $jobid: $msg") if $logfunc;
+ if ($logfunc) {
+ if ($verbose) {
+ $logfunc->("$ctime $jobid: $msg");
+ } else {
+ $logfunc->($msg);
+ }
+ }
};
$logfunc_wrapper->("start replication job");
eval {
- replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
+ $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
};
my $err = $@;
- $state->{duration} = tv_interval($t0);
- delete $state->{pid};
- delete $state->{ptime};
-
if ($err) {
chomp $err;
- $state->{fail_count}++;
- $state->{error} = "$err";
- PVE::ReplicationState::write_job_state($jobcfg, $state);
$logfunc_wrapper->("end replication job with error: $err");
} else {
$logfunc_wrapper->("end replication job");
- $state->{last_sync} = $start_time;
- $state->{fail_count} = 0;
- delete $state->{error};
- PVE::ReplicationState::write_job_state($jobcfg, $state);
}
+ PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);
+
close($logfd);
};
if (my $err = $@) {
warn "$jobid: got unexpected replication job error - $err";
}
+
+ return $volumes;
};
sub run_replication {
- my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;
+ my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr, $verbose) = @_;
+
+ my $volumes;
eval {
my $timeout = 2; # do not wait too long - we repeat periodically anyways
- PVE::GuestHelpers::guest_migration_lock(
+ $volumes = PVE::GuestHelpers::guest_migration_lock(
$jobcfg->{guest}, $timeout, $run_replication_nolock,
- $guest_class, $jobcfg, $iteration, $start_time, $logfunc);
+ $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);
};
if (my $err = $@) {
return undef if $noerr;
die $err;
}
+ return $volumes;
}
1;