git.proxmox.com Git - pve-guest-common.git/blobdiff - PVE/Replication.pm
change replica log timestamp to a human readable format
[pve-guest-common.git] / PVE / Replication.pm
index f978267e1f6ad7d9f7b3763a8c17b8013b510c4b..5a7274e79519a726642e9cef7c5483469de52cd5 100644 (file)
@@ -5,6 +5,7 @@ use strict;
 use Data::Dumper;
 use JSON;
 use Time::HiRes qw(gettimeofday tv_interval);
+use POSIX qw(strftime);
 
 use PVE::INotify;
 use PVE::ProcFSTools;
@@ -19,7 +20,45 @@ use PVE::ReplicationState;
 # regression tests should overwrite this
 sub get_log_time {
 
-    return time();
+    return strftime("%F %H:%M:%S", localtime);
+}
+
+# Find common base replication snapshot, available on local and remote side.
+# Note: this also removes stale replication snapshots
+sub find_common_replication_snapshot {
+    my ($ssh_info, $jobid, $vmid, $storecfg, $volumes, $storeid_list, $last_sync, $parent_snapname, $logfunc) = @_;
+
+    my $last_sync_snapname =
+       PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
+
+    # test if we have a replication_ snapshot from last sync
+    # and remove all other/stale replication snapshots
+
+    my $last_snapshots = prepare(
+       $storecfg, $volumes, $jobid, $last_sync, $parent_snapname, $logfunc);
+
+    # prepare remote side
+    my $remote_snapshots = remote_prepare_local_job(
+       $ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, 0, $logfunc);
+
+    my $base_snapshots = {};
+
+    foreach my $volid (@$volumes) {
+       my $base_snapname;
+
+       if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
+           if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
+               $remote_snapshots->{$volid}->{$last_sync_snapname}) {
+               $base_snapshots->{$volid} = $last_sync_snapname;
+           } elsif (defined($parent_snapname) &&
+                    ($last_snapshots->{$volid}->{$parent_snapname} &&
+                     $remote_snapshots->{$volid}->{$parent_snapname})) {
+               $base_snapshots->{$volid} = $parent_snapname;
+           }
+       }
+    }
+
+    return ($base_snapshots, $last_snapshots, $last_sync_snapname);
 }
 
 sub remote_prepare_local_job {
@@ -79,15 +118,21 @@ sub prepare {
 
     $last_sync //= 0;
 
-    my ($prefix, $snapname) =
-       PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
+    my ($prefix, $snapname);
+
+    if (defined($jobid)) {
+       ($prefix, $snapname) = PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
+    } else {
+       $prefix = '__replicate_';
+    }
 
     my $last_snapshots = {};
     my $cleaned_replicated_volumes = {};
     foreach my $volid (@$volids) {
        my $list = PVE::Storage::volume_snapshot_list($storecfg, $volid);
        foreach my $snap (@$list) {
-           if ($snap eq $snapname || (defined($parent_snapname) && ($snap eq $parent_snapname))) {
+           if ((defined($snapname) && ($snap eq $snapname)) ||
+               (defined($parent_snapname) && ($snap eq $parent_snapname))) {
                $last_snapshots->{$volid}->{$snap} = 1;
            } elsif ($snap =~ m/^\Q$prefix\E/) {
                $logfunc->("delete stale replication snapshot '$snap' on $volid");
@@ -105,9 +150,9 @@ sub replicate_volume {
 
     my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
 
-    # fixme: handle $rate, $insecure ??
+    my $ratelimit_bps = int(1000000*$rate) if $rate;
     PVE::Storage::storage_migrate($storecfg, $volid, $ssh_info, $storeid, $volname,
-                                 $base_snapshot, $sync_snapname);
+                                 $base_snapshot, $sync_snapname, $ratelimit_bps, $insecure);
 }
 
 
@@ -139,7 +184,7 @@ sub replicate {
 
     my $conf = $guest_class->load_config($vmid);
     my ($running, $freezefs) = $guest_class->__snapshot_check_freeze_needed($vmid, $conf, 0);
-    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $conf);
+    my $volumes = $guest_class->get_replicatable_volumes($storecfg, $vmid, $conf, defined($jobcfg->{remove_job}));
 
     my $sorted_volids = [ sort keys %$volumes ];
 
@@ -164,27 +209,15 @@ sub replicate {
        PVE::ReplicationConfig::delete_job($jobid); # update config
        $logfunc->("job removed");
 
-       return;
+       return undef;
     }
 
     my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);
 
-    my $last_sync_snapname =
-       PVE::ReplicationState::replication_snapshot_name($jobid, $last_sync);
-    my $sync_snapname =
-       PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);
-
     my $parent_snapname = $conf->{parent};
 
-    # test if we have a replication_ snapshot from last sync
-    # and remove all other/stale replication snapshots
-
-    my $last_snapshots = prepare(
-       $storecfg, $sorted_volids, $jobid, $last_sync, $parent_snapname, $logfunc);
-
-    # prepare remote side
-    my $remote_snapshots = remote_prepare_local_job(
-       $ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc);
+    my ($base_snapshots, $last_snapshots, $last_sync_snapname) = find_common_replication_snapshot(
+       $ssh_info, $jobid, $vmid, $storecfg, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, $logfunc);
 
     my $storeid_hash = {};
     foreach my $volid (@$sorted_volids) {
@@ -200,6 +233,9 @@ sub replicate {
     }
 
     # make snapshot of all volumes
+    my $sync_snapname =
+       PVE::ReplicationState::replication_snapshot_name($jobid, $start_time);
+
     my $replicate_snapshots = {};
     eval {
        foreach my $volid (@$sorted_volids) {
@@ -237,20 +273,12 @@ sub replicate {
        foreach my $volid (@$sorted_volids) {
            my $base_snapname;
 
-           if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
-               if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
-                   $remote_snapshots->{$volid}->{$last_sync_snapname}) {
-                   $logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
-                   $base_snapname = $last_sync_snapname;
-               } elsif (defined($parent_snapname) &&
-                        ($last_snapshots->{$volid}->{$parent_snapname} &&
-                         $remote_snapshots->{$volid}->{$parent_snapname})) {
-                   $logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)");
-                   $base_snapname = $parent_snapname;
-               }
+           if (defined($base_snapname = $base_snapshots->{$volid})) {
+               $logfunc->("incremental sync '$volid' ($base_snapname => $sync_snapname)");
+           } else {
+               $logfunc->("full sync '$volid' ($sync_snapname)");
            }
 
-           $logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
            replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
        }
     };
@@ -269,29 +297,26 @@ sub replicate {
     remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
 
     die $err if $err;
+
+    return $volumes;
 }
 
 my $run_replication_nolock = sub {
-    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc) = @_;
+    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose) = @_;
 
     my $jobid = $jobcfg->{id};
 
+    my $volumes;
+
     # we normally write errors into the state file,
     # but we also catch unexpected errors and log them to syslog
     # (for example when there are problems writing the state file)
     eval {
        my $state = PVE::ReplicationState::read_job_state($jobcfg);
 
-       my $t0 = [gettimeofday];
-
-       $state->{pid} = $$;
-       $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
-       $state->{last_node} = PVE::INotify::nodename();
-       $state->{last_try} = $start_time;
-       $state->{last_iteration} = $iteration;
-       $state->{storeid_list} //= [];
+       PVE::ReplicationState::record_job_start($jobcfg, $state, $start_time, $iteration);
 
-       PVE::ReplicationState::write_job_state($jobcfg, $state);
+       my $t0 = [gettimeofday];
 
        mkdir $PVE::ReplicationState::replicate_logdir;
        my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
@@ -303,54 +328,56 @@ my $run_replication_nolock = sub {
 
            my $ctime = get_log_time();
            print $logfd "$ctime $jobid: $msg\n";
-           $logfunc->("$ctime $jobid: $msg") if $logfunc;
+           if ($logfunc) {
+               if ($verbose) {
+                   $logfunc->("$ctime $jobid: $msg");
+               } else {
+                   $logfunc->($msg);
+               }
+           }
        };
 
        $logfunc_wrapper->("start replication job");
 
        eval {
-           replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
+           $volumes = replicate($guest_class, $jobcfg, $state, $start_time, $logfunc_wrapper);
        };
        my $err = $@;
 
-       $state->{duration} = tv_interval($t0);
-       delete $state->{pid};
-       delete $state->{ptime};
-
        if ($err) {
            chomp $err;
-           $state->{fail_count}++;
-           $state->{error} = "$err";
-           PVE::ReplicationState::write_job_state($jobcfg,  $state);
            $logfunc_wrapper->("end replication job with error: $err");
        } else {
            $logfunc_wrapper->("end replication job");
-           $state->{last_sync} = $start_time;
-           $state->{fail_count} = 0;
-           delete $state->{error};
-           PVE::ReplicationState::write_job_state($jobcfg,  $state);
        }
 
+       PVE::ReplicationState::record_job_end($jobcfg, $state, $start_time, tv_interval($t0), $err);
+
        close($logfd);
     };
     if (my $err = $@) {
        warn "$jobid: got unexpected replication job error - $err";
     }
+
+    return $volumes;
 };
 
 sub run_replication {
-    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;
+    my ($guest_class, $jobcfg, $iteration, $start_time, $logfunc, $noerr, $verbose) = @_;
+
+    my $volumes;
 
     eval {
        my $timeout = 2; # do not wait too long - we repeat periodically anyways
-       PVE::GuestHelpers::guest_migration_lock(
+       $volumes = PVE::GuestHelpers::guest_migration_lock(
            $jobcfg->{guest}, $timeout, $run_replication_nolock,
-           $guest_class, $jobcfg, $iteration, $start_time, $logfunc);
+           $guest_class, $jobcfg, $iteration, $start_time, $logfunc, $verbose);
     };
     if (my $err = $@) {
        return undef if $noerr;
        die $err;
     }
+    return $volumes;
 }
 
 1;