]> git.proxmox.com Git - pve-guest-common.git/blobdiff - PVE/Replication.pm
replicaiton: log rate and transport type
[pve-guest-common.git] / PVE / Replication.pm
index 1762f2df4fc4d190da8803a46c6f6025f760c0d4..b0c6db3beb967dd32beefb4a3b8674363e7f6d35 100644 (file)
@@ -5,21 +5,24 @@ use strict;
 use Data::Dumper;
 use JSON;
 use Time::HiRes qw(gettimeofday tv_interval);
+use POSIX qw(strftime);
 
 use PVE::INotify;
 use PVE::ProcFSTools;
 use PVE::Tools;
 use PVE::Cluster;
+use PVE::DataCenterConfig;
 use PVE::Storage;
 use PVE::GuestHelpers;
 use PVE::ReplicationConfig;
 use PVE::ReplicationState;
+use PVE::SSHInfo;
 
 
 # regression tests should overwrite this
 sub get_log_time {
 
-    return time();
+    return strftime("%F %H:%M:%S", localtime);
 }
 
 # Find common base replication snapshot, available on local and remote side.
@@ -53,6 +56,21 @@ sub find_common_replication_snapshot {
                     ($last_snapshots->{$volid}->{$parent_snapname} &&
                      $remote_snapshots->{$volid}->{$parent_snapname})) {
                $base_snapshots->{$volid} = $parent_snapname;
+           } elsif ($last_sync == 0) {
+               my @desc_sorted_snap =
+                   map { $_->[1] } sort { $b->[0] <=> $a->[0] }
+                   map { [ ($_ =~ /__replicate_\Q$jobid\E_(\d+)_/)[0] || 0, $_ ] }
+                   keys %{$remote_snapshots->{$volid}};
+
+               foreach my $remote_snap (@desc_sorted_snap) {
+                   if (defined($last_snapshots->{$volid}->{$remote_snap})) {
+                       $base_snapshots->{$volid} = $remote_snap;
+                       last;
+                   }
+               }
+               die "No common base to restore the job state\n".
+                   "please delete jobid: $jobid and create the job again\n"
+                   if !defined($base_snapshots->{$volid});
            }
        }
     }
@@ -63,7 +81,7 @@ sub find_common_replication_snapshot {
 sub remote_prepare_local_job {
     my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;
 
-    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
+    my $ssh_cmd = PVE::SSHInfo::ssh_info_to_command($ssh_info);
     my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
     push @$cmd, '--scan', join(',', @$storeid_list) if scalar(@$storeid_list);
     push @$cmd, @$volumes if scalar(@$volumes);
@@ -97,7 +115,7 @@ sub remote_prepare_local_job {
 sub remote_finalize_local_job {
     my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;
 
-    my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
+    my $ssh_cmd = PVE::SSHInfo::ssh_info_to_command($ssh_info);
     my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
               @$volumes, '--last_sync', $last_sync];
 
@@ -134,9 +152,27 @@ sub prepare {
                (defined($parent_snapname) && ($snap eq $parent_snapname))) {
                $last_snapshots->{$volid}->{$snap} = 1;
            } elsif ($snap =~ m/^\Q$prefix\E/) {
-               $logfunc->("delete stale replication snapshot '$snap' on $volid");
-               PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
-               $cleaned_replicated_volumes->{$volid} = 1;
+               if ($last_sync != 0) {
+                   $logfunc->("delete stale replication snapshot '$snap' on $volid");
+                   eval {
+                       PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
+                       $cleaned_replicated_volumes->{$volid} = 1;
+                   };
+
+                   # If deleting the snapshot fails, we can not be sure if it was due to an error or a timeout.
+                   # The likelihood that the delete has worked out is high at a timeout.
+                   # If it really fails, it will try to remove on the next run.
+                   if (my $err = $@) {
+                       # warn is for syslog/journal.
+                       warn $err;
+
+                       # logfunc will written in replication log.
+                       $logfunc->("delete stale replication snapshot error: $err");
+                   }           
+               # Last_sync=0 and a replication snapshot only occur, if the VM was stolen
+               } else {
+                   $last_snapshots->{$volid}->{$snap} = 1;
+               }
            }
        }
     }
@@ -145,13 +181,14 @@ sub prepare {
 }
 
 sub replicate_volume {
-    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure) = @_;
+    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname, $rate, $insecure, $logfunc) = @_;
 
     my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
 
-    my $ratelimit_bps = int(1000000*$rate) if $rate;
+    my $ratelimit_bps = $rate ? int(1000000 * $rate) : undef;
+
     PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
-                                 $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure);
+                                 $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure, 1, $logfunc);
 }
 
 
@@ -179,7 +216,6 @@ sub replicate {
        if $start_time <= $last_sync;
 
     my $vmid = $jobcfg->{guest};
-    my $vmtype = $jobcfg->{vmtype};
 
     my $conf = $guest_class->load_config($vmid);
     my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
@@ -189,7 +225,9 @@ sub replicate {
 
     $running //= 0;  # to avoid undef warnings from logfunc
 
-    $logfunc->("guest => $vmid, type => $vmtype, running => $running");
+    my $guest_name = $guest_class->guest_type() . ' ' . $vmid;
+
+    $logfunc->("guest => $guest_name, running => $running");
     $logfunc->("volumes => " . join(',', @$sorted_volids));
 
     if (my $remove_job = $jobcfg->{remove_job}) {
@@ -198,20 +236,24 @@ sub replicate {
 
        if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
            # remove all remote volumes
-           my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
-           remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);
+           my @store_list = map { (PVE::Storage::parse_volume_id($_))[0] } @$sorted_volids;
+
+           my %hash = map { $_ => 1 } @store_list;
+
+           my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target});
+           remote_prepare_local_job($ssh_info, $jobid, $vmid, [], [ keys %hash ], 1, undef, 1, $logfunc);
 
        }
        # remove all local replication snapshots (lastsync => 0)
-       prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);
+       prepare($storecfg, $sorted_volids, $jobid, 1, undef, $logfunc);
 
        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");
 
-       return;
+       return undef;
     }
 
-    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);
+    my $ssh_info = PVE::SSHInfo::get_ssh_info($jobcfg->{target}, $migration_network);
 
     my $parent_snapname = $conf->{parent};
 
@@ -245,8 +287,9 @@ sub replicate {
     };
     my $err = $@;
 
-    # unfreeze immediately
+    # thaw immediately
     if ($freezefs) {
+       $logfunc->("thaw guest filesystem");
        $guest_class->__snapshot_freeze($vmid, 1);
     }
 
@@ -269,6 +312,9 @@ sub replicate {
        my $rate = $jobcfg->{rate};
        my $insecure = $migration_type eq 'insecure';
 
+       $logfunc->("using $migration_type transmission, rate limit: "
+           . ($rate ? "$rate MByte/s" : "none"));
+
        foreach my $volid (@$sorted_volids) {
            my $base_snapname;
 
@@ -278,12 +324,11 @@ sub replicate {
                $logfunc->("full sync '$volid' ($sync_snapname)");
            }
 
-           replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
+           replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure, $logfunc);
        }
     };
-    $err = $@;
 
-    if ($err) {
+    if ($err = $@) {
        $cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
        # we do not cleanup the remote side here - this is done in
        # next run of prepare_local_job
@@ -293,91 +338,93 @@ sub replicate {
     # remove old snapshots because they are no longer needed
     $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);
 
-    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
+    eval {
+       remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
+    };
 
-    die $err if $err;
+    # old snapshots will removed by next run from prepare_local_job.
+    if ($err = $@) {
+       # warn is for syslog/journal.
+       warn $err;
+
+       # logfunc will written in replication log.
+       $logfunc->("delete stale replication snapshot error: $err");
+    }
+
+    return $volumes;
 }
 
 my $run_replication_nolock = sub {
-    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;
+    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;
 
     my $jobid = $jobcfg->{id};
 
-    # we normaly write errors into the state file,
+    my $volumes;
+
+    # we normally write errors into the state file,
     # but we also catch unexpected errors and log them to syslog
     # (for examply when there are problems writing the state file)
-    eval {
-       my $state = PVE::ReplicationState::read_job_state($jobcfg);
-
-       my $t0 = [gettimeofday];
-
-       $state->{pid} = $$;
-       $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
-       $state->{last_node} = PVE::INotify::nodename();
-       $state->{last_try} = $start_time;
-       $state->{last_iteration} = $iteration;
-       $state->{storeid_list} //= [];
-
-       PVE::ReplicationState::write_job_state($jobcfg, $state);
-
-       mkdir $PVE::ReplicationState::replicate_logdir;
-       my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
-       open(my $logfd, '>', $logfile) ||
-           die "unable to open replication log '$logfile' - $!\n";
-
-       my $logfunc_wrapper = sub {
-           my ($msg) = @_;
-
-           my $ctime = get_log_time();
-           print $logfd "$ctime $jobid: $msg\n";
-           $logfunc->("$ctime $jobid: $msg") if $logfunc;
-       };
-
-       $logfunc_wrapper->("start replication job");
-
-       eval {
-           replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
-       };
-       my $err = $@;
-
-       $state->{duration} = tv_interval($t0);
-       delete $state->{pid};
-       delete $state->{ptime};
-
-       if ($err) {
-           chomp $err;
-           $state->{fail_count}++;
-           $state->{error} = "$err";
-           PVE::ReplicationState::write_job_state($jobcfg,  $state);
-           $logfunc_wrapper->("end replication job with error: $err");
-       } else {
-           $logfunc_wrapper->("end replication job");
-           $state->{last_sync} = $start_time;
-           $state->{fail_count} = 0;
-           delete $state->{error};
-           PVE::ReplicationState::write_job_state($jobcfg,  $state);
+
+    my $state = PVE::ReplicationState::read_job_state($jobcfg);
+
+    PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);
+
+    my $t0 = [gettimeofday];
+
+    mkdir $PVE::ReplicationState::replicate_logdir;
+    my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
+    open(my $logfd, '>', $logfile) ||
+       die "unable to open replication log '$logfile' - $!\n";
+
+    my $logfunc_wrapper = sub {
+       my ($msg) = @_;
+
+       my $ctime = get_log_time();
+       print $logfd "$ctime $jobid: $msg\n";
+       if ($logfunc) {
+           if ($verbose) {
+               $logfunc->("$ctime $jobid: $msg");
+           } else {
+               $logfunc->($msg);
+           }
        }
+    };
 
-       close($logfd);
+    $logfunc_wrapper->("start replication job");
+
+    eval {
+       $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
     };
-    if (my $err = $@) {
-       warn "$jobid: got unexpected replication job error - $err";
+    my $err = $@;
+
+    if ($err) {
+       my $msg = "end replication job with error: $err";
+       chomp $msg;
+       $logfunc_wrapper->($msg);
+    } else {
+       $logfunc_wrapper->("end replication job");
     }
+
+    PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);
+
+    close($logfd);
+
+    die $err if $err;
+
+    return $volumes;
 };
 
 sub run_replication {
-    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;
+    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;
 
-    eval {
-       my $timeout = 2; # do not wait too long - we repeat periodically anyways
-       PVE::GuestHelpers::guest_migration_lock(
-           $jobcfg->{guest}, $timeout, $run_replication_nolock,
-           $guest_class, $jobcfg, $iteration, $start_time, $logfunc);
-    };
-    if (my $err = $@) {
-       return undef if $noerr;
-       die $err;
-    }
+    my $volumes;
+
+    my $timeout = 2; # do not wait too long - we repeat periodically anyways
+    $volumes = PVE::GuestHelpers::guest_migration_lock(
+       $jobcfg->{guest}, $timeout, $run_replication_nolock,
+       $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);
+
+    return $volumes;
 }
 
 1;