X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=PVE%2FReplication.pm;h=2e28e537c49e42d261491980d1b08ec1782a8851;hb=1b82f171171fcf8423fc222b023643947793746e;hp=ed8d07b3538bafd7d49fdcd0c733b3d45928f360;hpb=c475e16d11ab9cae30749c2f80d7fb4c38e7054e;p=pve-guest-common.git diff --git a/PVE/Replication.pm b/PVE/Replication.pm index ed8d07b..2e28e53 100644 --- a/PVE/Replication.pm +++ b/PVE/Replication.pm @@ -5,6 +5,7 @@ use strict; use Data::Dumper; use JSON; use Time::HiRes qw(gettimeofday tv_interval); +use POSIX qw(strftime); use PVE::INotify; use PVE::ProcFSTools; @@ -19,7 +20,45 @@ use PVE::ReplicationState; # regression tests should overwrite this sub get_log_time { - return time(); + return strftime("%F %H:%M:%S", localtime); +} + +# Find common base replication snapshot, available on local and remote side. +# Note: this also removes stale replication snapshots +sub find_common_replication_snapshot { + my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $parent_snapname, $logfunc) = @_; + + my $last_sync_snapname = + PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync); + + # test if we have a replication_ snapshot from last sync + # and remove all other/stale replication snapshots + + my $last_snapshots = prepare( + $storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc); + + # prepare remote side + my $remote_snapshots = remote_prepare_local_job( + $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc); + + my $base_snapshots = {}; + + foreach my $volid (@$volumes) { + my $base_snapname; + + if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) { + if ($last_snapshots->{$volid}->{$last_sync_snapname} && + $remote_snapshots->{$volid}->{$last_sync_snapname}) { + $base_snapshots->{$volid} = $last_sync_snapname; + } elsif (defined($parent_snapname) && + ($last_snapshots->{$volid}->{$parent_snapname} && + $remote_snapshots->{$volid}->{$parent_snapname})) { + $base_snapshots->{$volid} = $parent_snapname; + } + } + } + + return ($base_snapshots, $last_snapshots, $last_sync_snapname); } sub remote_prepare_local_job { @@ -170,27 +209,15 @@ sub replicate { PVE::ReplicationConfig::delete_job($jobid); # update config $logfunc->("job removed"); - return; + return undef; } my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network); - my $last_sync_snapname = - PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync); - my $sync_snapname = - PVE::ReplicationState::replication_snapshot_name($jobid, $start_time); - my $parent_snapname = $conf->{parent}; - # test if we have a replication_ snapshot from last sync - # and remove all other/stale replication snapshots - - my $last_snapshots = prepare( - $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc); - - # prepare remote side - my $remote_snapshots = remote_prepare_local_job( - $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc); + my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot( + $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, $logfunc); my $storeid_hash = {}; foreach my $volid (@$sorted_volids) { @@ -206,6 +233,9 @@ sub replicate { } # make snapshot of all volumes + my $sync_snapname = + PVE::ReplicationState::replication_snapshot_name($jobid, $start_time); + my $replicate_snapshots = {}; eval { foreach my $volid (@$sorted_volids) { @@ -216,8 +246,9 @@ sub replicate { }; my $err = $@; - # unfreeze immediately + # thaw immediately if ($freezefs) { + $logfunc->("thaw guest filesystem"); $guest_class->__snapshot_freeze($vmid, 1); } @@ -243,20 +274,12 @@ sub replicate { foreach my $volid (@$sorted_volids) { my $base_snapname; - if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) { - if ($last_snapshots->{$volid}->{$last_sync_snapname} && - $remote_snapshots->{$volid}->{$last_sync_snapname}) { - $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)"); - $base_snapname = $last_sync_snapname; - } elsif (defined($parent_snapname) && - ($last_snapshots->{$volid}->{$parent_snapname} && - $remote_snapshots->{$volid}->{$parent_snapname})) { - $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)"); - $base_snapname = $parent_snapname; - } + if (defined($base_snapname = $base_snapshots->{$volid})) { + $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)"); + } else { + $logfunc->("full sync '$volid' ($sync_snapname)"); } - $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname); replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure); } }; @@ -275,29 +298,26 @@ sub replicate { remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc); die $err if $err; + + return $volumes; } my $run_replication_nolock = sub { - my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_; + my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr, $verbose) = @_; my $jobid = $jobcfg->{id}; + my $volumes; + # we normaly write errors into the state file, # but we also catch unexpected errors and log them to syslog # (for examply when there are problems writing the state file) eval { my $state = PVE::ReplicationState::read_job_state($jobcfg); - my $t0 = [gettimeofday]; - - $state->{pid} = $$; - $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid}); - $state->{last_node} = PVE::INotify::nodename(); - $state->{last_try} = $start_time; - $state->{last_iteration} = $iteration; - $state->{storeid_list} //= []; + PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration); - PVE::ReplicationState::write_job_state($jobcfg, $state); + my $t0 = [gettimeofday]; mkdir $PVE::ReplicationState::replicate_logdir; my $logfile = PVE::ReplicationState::job_logfile_name($jobid); @@ -309,54 +329,63 @@ my $run_replication_nolock = sub { my $ctime = get_log_time(); print $logfd "$ctime $jobid: $msg\n"; - $logfunc->("$ctime $jobid: $msg") if $logfunc; + if ($logfunc) { + if ($verbose) { + $logfunc->("$ctime $jobid: $msg"); + } else { + $logfunc->($msg); + } + } }; $logfunc_wrapper->("start replication job"); eval { - replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper); + $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper); }; my $err = $@; - $state->{duration} = tv_interval($t0); - delete $state->{pid}; - delete $state->{ptime}; - if ($err) { - chomp $err; - $state->{fail_count}++; - $state->{error} = "$err"; - PVE::ReplicationState::write_job_state($jobcfg, $state); - $logfunc_wrapper->("end replication job with error: $err"); + my $msg = "end replication job with error: $err"; + chomp $msg; + $logfunc_wrapper->($msg); } else { $logfunc_wrapper->("end replication job"); - $state->{last_sync} = $start_time; - $state->{fail_count} = 0; - delete $state->{error}; - PVE::ReplicationState::write_job_state($jobcfg, $state); } + PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err); + close($logfd); + + die $err if $err && !$noerr; }; if (my $err = $@) { - warn "$jobid: got unexpected replication job error - $err"; + if ($noerr) { + warn "$jobid: got unexpected replication job error - $err"; + } else { + die $err; + } } + + return $volumes; }; sub run_replication { - my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_; + my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr, $verbose) = @_; + + my $volumes; eval { my $timeout = 2; # do not wait too long - we repeat periodically anyways - PVE::GuestHelpers::guest_migration_lock( + $volumes = PVE::GuestHelpers::guest_migration_lock( $jobcfg->{guest}, $timeout, $run_replication_nolock, - $guest_class, $jobcfg, $iteration, $start_time, $logfunc); + $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr, $verbose); }; if (my $err = $@) { return undef if $noerr; die $err; } + return $volumes; } 1;