]> git.proxmox.com Git - pve-guest-common.git/commitdiff
add PVE/Replication.pm - moved from pve-manager
authorDietmar Maurer <dietmar@proxmox.com>
Mon, 12 Jun 2017 05:03:22 +0000 (07:03 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Mon, 12 Jun 2017 05:03:22 +0000 (07:03 +0200)
Makefile
PVE/Replication.pm [new file with mode: 0644]

index 143a6de1fe0e83d97c27ae2f37ae23c228ad1db2..2f7cb7ee1bf8daf09ab748ed9d0875e509665802 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -26,6 +26,7 @@ install: PVE
        install -m 0644 PVE/AbstractMigrate.pm ${PERL5DIR}/PVE/
        install -m 0644 PVE/ReplicationConfig.pm ${PERL5DIR}/PVE/
        install -m 0644 PVE/ReplicationState.pm ${PERL5DIR}/PVE/
+       install -m 0644 PVE/Replication.pm ${PERL5DIR}/PVE/
        install -d ${PERL5DIR}/PVE/VZDump
        install -m 0644 PVE/VZDump/Plugin.pm ${PERL5DIR}/PVE/VZDump/
 
diff --git a/PVE/Replication.pm b/PVE/Replication.pm
new file mode 100644 (file)
index 0000000..f978267
--- /dev/null
@@ -0,0 +1,356 @@
+package PVE::Replication;
+
+use warnings;
+use strict;
+use Data::Dumper;
+use JSON;
+use Time::HiRes qw(gettimeofday tv_interval);
+
+use PVE::INotify;
+use PVE::ProcFSTools;
+use PVE::Tools;
+use PVE::Cluster;
+use PVE::Storage;
+use PVE::GuestHelpers;
+use PVE::ReplicationConfig;
+use PVE::ReplicationState;
+
+
+# regression tests should overwrite this
+sub get_log_time {
+
+    return time();
+}
+
+sub remote_prepare_local_job {
+    my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;
+
+    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
+    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
+    push @$cmd, '--scan', join(',', @$storeid_list) if scalar(@$storeid_list);
+    push @$cmd, @$volumes if scalar(@$volumes);
+
+    push @$cmd, '--last_sync', $last_sync;
+    push @$cmd, '--parent_snapname', $parent_snapname
+       if $parent_snapname;
+    push @$cmd, '--force' if $force;
+
+    my $remote_snapshots;
+
+    my $parser = sub {
+       my $line = shift;
+       $remote_snapshots = JSON::decode_json($line);
+    };
+
+    my $logger = sub {
+       my $line = shift;
+       chomp $line;
+       $logfunc->("(remote_prepare_local_job) $line");
+    };
+
+    PVE::Tools::run_command($cmd, outfunc => $parser, errfunc => $logger);
+
+    die "prepare remote node failed - no result\n"
+       if !defined($remote_snapshots);
+
+    return $remote_snapshots;
+}
+
+sub remote_finalize_local_job {
+    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;
+
+    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
+    my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
+              @$volumes, '--last_sync', $last_sync];
+
+    my $logger = sub {
+       my $line = shift;
+       chomp $line;
+       $logfunc->("(remote_finalize_local_job) $line");
+    };
+
+    PVE::Tools::run_command($cmd, outfunc => $logger, errfunc => $logger);
+}
+
+# finds local replication snapshots from $last_sync
+# and removes all replication snapshots with other time stamps
+sub prepare {
+    my ($storecfg, $volids, $jobid, $last_sync, $parent_snapname, $logfunc) = @_;
+
+    $last_sync //= 0;
+
+    my ($prefix, $snapname) =
+       PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
+
+    my $last_snapshots = {};
+    my $cleaned_replicated_volumes = {};
+    foreach my $volid (@$volids) {
+       my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid);
+       foreach my $snap (@$list) {
+           if ($snap eq $snapname || (defined($parent_snapname) && ($snap eq $parent_snapname))) {
+               $last_snapshots->{$volid}->{$snap} = 1;
+           } elsif ($snap =~ m/^\Q$prefix\E/) {
+               $logfunc->("delete stale replication snapshot '$snap' on $volid");
+               PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
+               $cleaned_replicated_volumes->{$volid} = 1;
+           }
+       }
+    }
+
+    return wantarray ? ($last_snapshots, $cleaned_replicated_volumes) : $last_snapshots;
+}
+
+sub replicate_volume {
+    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;
+
+    my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
+
+    # fixme: handle $rate, $insecure ??
+    PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
+                                 $base_snapshot, $sync_snapname);
+}
+
+
+sub replicate {
+    my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;
+
+    my $local_node = PVE::INotify::nodename();
+
+    die "not implemented - internal error" if $jobcfg->{type} ne 'local';
+
+    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');
+
+    my $migration_network;
+    my $migration_type = 'secure';
+    if (my $mc = $dc_conf->{migration}) {
+       $migration_network = $mc->{network};
+       $migration_type = $mc->{type} if defined($mc->{type});
+    }
+
+    my $jobid = $jobcfg->{id};
+    my $storecfg = PVE::Storage::config();
+    my $last_sync = $state->{last_sync};
+
+    die "start time before last sync ($start_time <= $last_sync) - abort sync\n"
+       if $start_time <= $last_sync;
+
+    my $vmid = $jobcfg->{guest};
+    my $vmtype = $jobcfg->{vmtype};
+
+    my $conf = $guest_class->load_config($vmid);
+    my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
+    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $conf);
+
+    my $sorted_volids = [ sort keys %$volumes ];
+
+    $running //= 0;  # to avoid undef warnings from logfunc
+
+    $logfunc->("guest => $vmid, type => $vmtype, running => $running");
+    $logfunc->("volumes => " . join(',', @$sorted_volids));
+
+    if (my $remove_job = $jobcfg->{remove_job}) {
+
+       $logfunc->("start job removal - mode '${remove_job}'");
+
+       if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
+           # remove all remote volumes
+           my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
+           remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);
+
+       }
+       # remove all local replication snapshots (lastsync => 0)
+       prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);
+
+       PVE::ReplicationConfig::delete_job($jobid); # update config
+       $logfunc->("job removed");
+
+       return;
+    }
+
+    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);
+
+    my $last_sync_snapname =
+       PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
+    my $sync_snapname =
+       PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);
+
+    my $parent_snapname = $conf->{parent};
+
+    # test if we have a replication_ snapshot from last sync
+    # and remove all other/stale replication snapshots
+
+    my $last_snapshots = prepare(
+       $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc);
+
+    # prepare remote side
+    my $remote_snapshots = remote_prepare_local_job(
+       $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc);
+
+    my $storeid_hash = {};
+    foreach my $volid (@$sorted_volids) {
+       my ($storeid) = PVE::Storage::parse_volume_id($volid);
+       $storeid_hash->{$storeid} = 1;
+    }
+    $state->{storeid_list} = [ sort keys %$storeid_hash ];
+
+    # freeze filesystem for data consistency
+    if ($freezefs) {
+       $logfunc->("freeze guest filesystem");
+       $guest_class->__snapshot_freeze($vmid, 0);
+    }
+
+    # make snapshot of all volumes
+    my $replicate_snapshots = {};
+    eval {
+       foreach my $volid (@$sorted_volids) {
+           $logfunc->("create snapshot '${sync_snapname}' on $volid");
+           PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
+           $replicate_snapshots->{$volid} = 1;
+       }
+    };
+    my $err = $@;
+
+    # unfreeze immediately
+    if ($freezefs) {
+       $guest_class->__snapshot_freeze($vmid, 1);
+    }
+
+    my $cleanup_local_snapshots = sub {
+       my ($volid_hash, $snapname) = @_;
+       foreach my $volid (sort keys %$volid_hash) {
+           $logfunc->("delete previous replication snapshot '$snapname' on $volid");
+           eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname); };
+           warn $@ if $@;
+       }
+    };
+
+    if ($err) {
+       $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
+       die $err;
+    }
+
+    eval {
+
+       my $rate = $jobcfg->{rate};
+       my $insecure = $migration_type eq 'insecure';
+
+       foreach my $volid (@$sorted_volids) {
+           my $base_snapname;
+
+           if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
+               if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
+                   $remote_snapshots->{$volid}->{$last_sync_snapname}) {
+                   $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
+                   $base_snapname = $last_sync_snapname;
+               } elsif (defined($parent_snapname) &&
+                        ($last_snapshots->{$volid}->{$parent_snapname} &&
+                         $remote_snapshots->{$volid}->{$parent_snapname})) {
+                   $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)");
+                   $base_snapname = $parent_snapname;
+               }
+           }
+
+           $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
+           replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
+       }
+    };
+    $err = $@;
+
+    if ($err) {
+       $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
+       # we do not cleanup the remote side here - this is done in
+       # next run of prepare_local_job
+       die $err;
+    }
+
+    # remove old snapshots because they are no longer needed
+    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);
+
+    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
+
+    die $err if $err;
+}
+
+my $run_replication_nolock = sub {
+    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;
+
+    my $jobid = $jobcfg->{id};
+
+    # we normaly write errors into the state file,
+    # but we also catch unexpected errors and log them to syslog
+    # (for examply when there are problems writing the state file)
+    eval {
+       my $state = PVE::ReplicationState::read_job_state($jobcfg);
+
+       my $t0 = [gettimeofday];
+
+       $state->{pid} = $$;
+       $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
+       $state->{last_node} = PVE::INotify::nodename();
+       $state->{last_try} = $start_time;
+       $state->{last_iteration} = $iteration;
+       $state->{storeid_list} //= [];
+
+       PVE::ReplicationState::write_job_state($jobcfg, $state);
+
+       mkdir $PVE::ReplicationState::replicate_logdir;
+       my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
+       open(my $logfd, '>', $logfile) ||
+           die "unable to open replication log '$logfile' - $!\n";
+
+       my $logfunc_wrapper = sub {
+           my ($msg) = @_;
+
+           my $ctime = get_log_time();
+           print $logfd "$ctime $jobid: $msg\n";
+           $logfunc->("$ctime $jobid: $msg") if $logfunc;
+       };
+
+       $logfunc_wrapper->("start replication job");
+
+       eval {
+           replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
+       };
+       my $err = $@;
+
+       $state->{duration} = tv_interval($t0);
+       delete $state->{pid};
+       delete $state->{ptime};
+
+       if ($err) {
+           chomp $err;
+           $state->{fail_count}++;
+           $state->{error} = "$err";
+           PVE::ReplicationState::write_job_state($jobcfg,  $state);
+           $logfunc_wrapper->("end replication job with error: $err");
+       } else {
+           $logfunc_wrapper->("end replication job");
+           $state->{last_sync} = $start_time;
+           $state->{fail_count} = 0;
+           delete $state->{error};
+           PVE::ReplicationState::write_job_state($jobcfg,  $state);
+       }
+
+       close($logfd);
+    };
+    if (my $err = $@) {
+       warn "$jobid: got unexpected replication job error - $err";
+    }
+};
+
+sub run_replication {
+    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;
+
+    eval {
+       my $timeout = 2; # do not wait too long - we repeat periodically anyways
+       PVE::GuestHelpers::guest_migration_lock(
+           $jobcfg->{guest}, $timeout, $run_replication_nolock,
+           $guest_class, $jobcfg, $iteration, $start_time, $logfunc);
+    };
+    if (my $err = $@) {
+       return undef if $noerr;
+       die $err;
+    }
+}
+
+1;