X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=PVE%2FReplication.pm;h=5a7274e79519a726642e9cef7c5483469de52cd5;hb=93c3695b0593397e8c8948e70ab79c91c6cf2793;hp=f978267e1f6ad7d9f7b3763a8c17b8013b510c4b;hpb=a6538c1ec5e77af5a5ae4b16eb1306d98d4ced58;p=pve-guest-common.git diff --git a/PVE/Replication.pm b/PVE/Replication.pm index f978267..5a7274e 100644 --- a/PVE/Replication.pm +++ b/PVE/Replication.pm @@ -5,6 +5,7 @@ use strict; use Data::Dumper; use JSON; use Time::HiRes qw(gettimeofday tv_interval); +use POSIX qw(strftime); use PVE::INotify; use PVE::ProcFSTools; @@ -19,7 +20,45 @@ use PVE::ReplicationState; # regression tests should overwrite this sub get_log_time { - return time(); + return strftime("%F %H:%M:%S", localtime); +} + +# Find common base replication snapshot, available on local and remote side. +# Note: this also removes stale replication snapshots +sub find_common_replication_snapshot { + my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $parent_snapname, $logfunc) = @_; + + my $last_sync_snapname = + PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync); + + # test if we have a replication_ snapshot from last sync + # and remove all other/stale replication snapshots + + my $last_snapshots = prepare( + $storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc); + + # prepare remote side + my $remote_snapshots = remote_prepare_local_job( + $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc); + + my $base_snapshots = {}; + + foreach my $volid (@$volumes) { + my $base_snapname; + + if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) { + if ($last_snapshots->{$volid}->{$last_sync_snapname} && + $remote_snapshots->{$volid}->{$last_sync_snapname}) { + $base_snapshots->{$volid} = $last_sync_snapname; + } elsif (defined($parent_snapname) && + ($last_snapshots->{$volid}->{$parent_snapname} && + $remote_snapshots->{$volid}->{$parent_snapname})) { + $base_snapshots->{$volid} = $parent_snapname; + } + } + } + + return ($base_snapshots, $last_snapshots, $last_sync_snapname); } sub remote_prepare_local_job { @@ -79,15 +118,21 @@ sub prepare { $last_sync //= 0; - my ($prefix, $snapname) = - PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync); + my ($prefix, $snapname); + + if (defined($jobid)) { + ($prefix, $snapname) = PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync); + } else { + $prefix = '__replicate_'; + } my $last_snapshots = {}; my $cleaned_replicated_volumes = {}; foreach my $volid (@$volids) { my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid); foreach my $snap (@$list) { - if ($snap eq $snapname || (defined($parent_snapname) && ($snap eq $parent_snapname))) { + if ((defined($snapname) && ($snap eq $snapname)) || + (defined($parent_snapname) && ($snap eq $parent_snapname))) { $last_snapshots->{$volid}->{$snap} = 1; } elsif ($snap =~ m/^\Q$prefix\E/) { $logfunc->("delete stale replication snapshot '$snap' on $volid"); @@ -105,9 +150,9 @@ sub replicate_volume { my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid); - # fixme: handle $rate, $insecure ?? + my $ratelimit_bps = int(1000000*$rate) if $rate; PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname, - $base_snapshot, $sync_snapname); + $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure); } @@ -139,7 +184,7 @@ sub replicate { my $conf = $guest_class->load_config($vmid); my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0); - my $volumes = $guest_class->get_replicatable_volumes($storecfg, $conf); + my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job})); my $sorted_volids = [ sort keys %$volumes ]; @@ -164,27 +209,15 @@ sub replicate { PVE::ReplicationConfig::delete_job($jobid); # update config $logfunc->("job removed"); - return; + return undef; } my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network); - my $last_sync_snapname = - PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync); - my $sync_snapname = - PVE::ReplicationState::replication_snapshot_name($jobid, $start_time); - my $parent_snapname = $conf->{parent}; - # test if we have a replication_ snapshot from last sync - # and remove all other/stale replication snapshots - - my $last_snapshots = prepare( - $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc); - - # prepare remote side - my $remote_snapshots = remote_prepare_local_job( - $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc); + my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot( + $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, $logfunc); my $storeid_hash = {}; foreach my $volid (@$sorted_volids) { @@ -200,6 +233,9 @@ sub replicate { } # make snapshot of all volumes + my $sync_snapname = + PVE::ReplicationState::replication_snapshot_name($jobid, $start_time); + my $replicate_snapshots = {}; eval { foreach my $volid (@$sorted_volids) { @@ -237,20 +273,12 @@ sub replicate { foreach my $volid (@$sorted_volids) { my $base_snapname; - if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) { - if ($last_snapshots->{$volid}->{$last_sync_snapname} && - $remote_snapshots->{$volid}->{$last_sync_snapname}) { - $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)"); - $base_snapname = $last_sync_snapname; - } elsif (defined($parent_snapname) && - ($last_snapshots->{$volid}->{$parent_snapname} && - $remote_snapshots->{$volid}->{$parent_snapname})) { - $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)"); - $base_snapname = $parent_snapname; - } + if (defined($base_snapname = $base_snapshots->{$volid})) { + $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)"); + } else { + $logfunc->("full sync '$volid' ($sync_snapname)"); } - $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname); replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure); } }; @@ -269,29 +297,26 @@ sub replicate { remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc); die $err if $err; + + return $volumes; } my $run_replication_nolock = sub { - my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_; + my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_; my $jobid = $jobcfg->{id}; + my $volumes; + # we normaly write errors into the state file, # but we also catch unexpected errors and log them to syslog # (for examply when there are problems writing the state file) eval { my $state = PVE::ReplicationState::read_job_state($jobcfg); - my $t0 = [gettimeofday]; - - $state->{pid} = $$; - $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid}); - $state->{last_node} = PVE::INotify::nodename(); - $state->{last_try} = $start_time; - $state->{last_iteration} = $iteration; - $state->{storeid_list} //= []; + PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration); - PVE::ReplicationState::write_job_state($jobcfg, $state); + my $t0 = [gettimeofday]; mkdir $PVE::ReplicationState::replicate_logdir; my $logfile = PVE::ReplicationState::job_logfile_name($jobid); @@ -303,54 +328,56 @@ my $run_replication_nolock = sub { my $ctime = get_log_time(); print $logfd "$ctime $jobid: $msg\n"; - $logfunc->("$ctime $jobid: $msg") if $logfunc; + if ($logfunc) { + if ($verbose) { + $logfunc->("$ctime $jobid: $msg"); + } else { + $logfunc->($msg); + } + } }; $logfunc_wrapper->("start replication job"); eval { - replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper); + $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper); }; my $err = $@; - $state->{duration} = tv_interval($t0); - delete $state->{pid}; - delete $state->{ptime}; - if ($err) { chomp $err; - $state->{fail_count}++; - $state->{error} = "$err"; - PVE::ReplicationState::write_job_state($jobcfg, $state); $logfunc_wrapper->("end replication job with error: $err"); } else { $logfunc_wrapper->("end replication job"); - $state->{last_sync} = $start_time; - $state->{fail_count} = 0; - delete $state->{error}; - PVE::ReplicationState::write_job_state($jobcfg, $state); } + PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err); + close($logfd); }; if (my $err = $@) { warn "$jobid: got unexpected replication job error - $err"; } + + return $volumes; }; sub run_replication { - my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_; + my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr, $verbose) = @_; + + my $volumes; eval { my $timeout = 2; # do not wait too long - we repeat periodically anyways - PVE::GuestHelpers::guest_migration_lock( + $volumes = PVE::GuestHelpers::guest_migration_lock( $jobcfg->{guest}, $timeout, $run_replication_nolock, - $guest_class, $jobcfg, $iteration, $start_time, $logfunc); + $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose); }; if (my $err = $@) { return undef if $noerr; die $err; } + return $volumes; } 1;