]> git.proxmox.com Git - pve-manager.git/commitdiff
PVE::Replication - use new PVE::ReplicationState class
authorDietmar Maurer <dietmar@proxmox.com>
Fri, 2 Jun 2017 08:07:33 +0000 (10:07 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Fri, 2 Jun 2017 10:28:43 +0000 (12:28 +0200)
PVE/Replication.pm
bin/test/ReplicationTestEnv.pm

index 55fda1f0e3831069f799023438ddb6544881465a..5e25e6f7b999010a8e0ea0a6d1679041cded575c 100644 (file)
@@ -19,66 +19,17 @@ use PVE::LXC;
 use PVE::Storage;
 use PVE::GuestHelpers;
 use PVE::ReplicationConfig;
+use PVE::ReplicationState;
 
-# Note: regression tests can overwrite $state_path for testing
-our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
 our $pvesr_lock_path = "/var/lock/pvesr.lck";
 
-my $update_job_state = sub {
-    my ($stateobj, $jobcfg, $state) = @_;
-
-    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
-
-    my $vmid = $jobcfg->{guest};
-    my $tid = $plugin->get_unique_target_id($jobcfg);
-
-    # Note: tuple ($vmid, $tid) is unique
-    $stateobj->{$vmid}->{$tid} = $state;
-
-    PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
-};
-
-my $get_job_state = sub {
-    my ($stateobj, $jobcfg) = @_;
-
-    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
-
-    my $vmid = $jobcfg->{guest};
-    my $tid = $plugin->get_unique_target_id($jobcfg);
-    my $state = $stateobj->{$vmid}->{$tid};
-
-    $state = {} if !$state;
-
-    $state->{last_iteration} //= 0;
-    $state->{last_try} //= 0; # last sync start time
-    $state->{last_sync} //= 0; # last successful sync start time
-    $state->{fail_count} //= 0;
-
-    return $state;
-};
-
-my $read_state = sub {
-
-    return {} if ! -e $state_path;
-
-    my $raw = PVE::Tools::file_get_contents($state_path);
-
-    return {} if $raw eq '';
-
-    # untaint $raw
-    $raw =~ m/^({.*})$/;
-
-    return decode_json($1);
-};
-
 sub job_status {
-    my ($stateobj) = @_;
 
     my $local_node = PVE::INotify::nodename();
 
     my $jobs = {};
 
-    $stateobj = $read_state->() if !$stateobj;
+    my $stateobj = PVE::ReplicationState::read_state();
 
     my $cfg = PVE::ReplicationConfig->new();
 
@@ -103,7 +54,7 @@ sub job_status {
            next if $jobcfg->{disable};
        }
 
-       my $state = $get_job_state->($stateobj, $jobcfg);
+       my $state = PVE::ReplicationState::extract_job_state($stateobj, $jobcfg);
        $jobcfg->{state} = $state;
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
@@ -134,9 +85,9 @@ sub job_status {
 }
 
 my $get_next_job = sub {
-    my ($stateobj, $iteration, $start_time) = @_;
+    my ($iteration, $start_time) = @_;
 
-    my $jobs = job_status($stateobj);
+    my $jobs = job_status();
 
     my $sort_func = sub {
        my $joba = $jobs->{$a};
@@ -404,59 +355,71 @@ sub replicate {
     die $err if $err;
 }
 
-my $run_replication = sub {
-    my ($stateobj, $jobcfg, $iteration, $start_time, $logfunc) = @_;
-
-    my $state = $get_job_state->($stateobj, $jobcfg);
+my $run_replication_nolock = sub {
+    my ($jobcfg, $iteration, $start_time, $logfunc) = @_;
 
-    my $t0 = [gettimeofday];
+    # we normaly write errors into the state file,
+    # but we also catch unexpected errors and log them to syslog
+    # (for examply when there are problems writing the state file)
+    eval {
+       my $state = PVE::ReplicationState::read_job_state($jobcfg);
 
-    # cleanup stale pid/ptime state
-    foreach my $vmid (keys %$stateobj) {
-       foreach my $tid (keys %{$stateobj->{$vmid}}) {
-           my $state = $stateobj->{$vmid}->{$tid};
-           delete $state->{pid};
-           delete $state->{ptime};
-       }
-    }
+       my $t0 = [gettimeofday];
 
-    $state->{pid} = $$;
-    $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
-    $state->{last_try} = $start_time;
-    $state->{last_iteration} = $iteration;
+       $state->{pid} = $$;
+       $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
+       $state->{last_try} = $start_time;
+       $state->{last_iteration} = $iteration;
 
-    $update_job_state->($stateobj, $jobcfg,  $state);
+       PVE::ReplicationState::write_job_state($jobcfg, $state);
 
-    $logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;
+       $logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;
 
-    eval {
-       my $timeout = 2; # do not wait too long - we repeat periodically anyways
-       PVE::GuestHelpers::guest_migration_lock(
-           $jobcfg->{guest}, $timeout, \&replicate,
-           $jobcfg, $state->{last_sync}, $start_time, $logfunc);
-    };
-    my $err = $@;
+       eval {
+           replicate($jobcfg, $state->{last_sync}, $start_time, $logfunc);
+       };
+       my $err = $@;
 
-    $state->{duration} = tv_interval($t0);
-    delete $state->{pid};
-    delete $state->{ptime};
+       $state->{duration} = tv_interval($t0);
+       delete $state->{pid};
+       delete $state->{ptime};
 
-    if ($err) {
-       $state->{fail_count}++;
-       $state->{error} = "$err";
-       $update_job_state->($stateobj, $jobcfg,  $state);
-       if ($logfunc) {
+       if ($err) {
            chomp $err;
-           $logfunc->($start_time, "$jobcfg->{id}: end replication job with error: $err");
+           $state->{fail_count}++;
+           $state->{error} = "$err";
+           PVE::ReplicationState::write_job_state($jobcfg,  $state);
+           my $msg = "$jobcfg->{id}: end replication job with error: $err";
+           if ($logfunc) {
+               $logfunc->($start_time, $msg);
+           } else {
+               warn "$msg\n";
+           }
        } else {
-           warn $err;
+           $logfunc->($start_time, "$jobcfg->{id}: end replication job") if $logfunc;
+           $state->{last_sync} = $start_time;
+           $state->{fail_count} = 0;
+           delete $state->{error};
+           PVE::ReplicationState::write_job_state($jobcfg,  $state);
        }
-    } else {
-       $logfunc->($start_time, "$jobcfg->{id}: end replication job") if $logfunc;
-       $state->{last_sync} = $start_time;
-       $state->{fail_count} = 0;
-       delete $state->{error};
-       $update_job_state->($stateobj, $jobcfg,  $state);
+    };
+    if (my $err = $@) {
+       warn "$jobcfg->{id}: got unexpected error - $err";
+    }
+};
+
+my $run_replication = sub {
+    my ($jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;
+
+    eval {
+       my $timeout = 2; # do not wait too long - we repeat periodically anyways
+       PVE::GuestHelpers::guest_migration_lock(
+           $jobcfg->{guest}, $timeout, $run_replication_nolock,
+           $jobcfg, $iteration, $start_time, $logfunc);
+    };
+    if (my $err = $@) {
+       return undef if $noerr;
+       die $err;
     }
 };
 
@@ -468,8 +431,6 @@ sub run_single_job {
     my $code = sub {
        $now //= time();
 
-       my $stateobj = $read_state->();
-
        my $cfg = PVE::ReplicationConfig->new();
 
        my $jobcfg = $cfg->{ids}->{$jobid};
@@ -489,11 +450,10 @@ sub run_single_job {
 
        die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;
 
-       $jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
        $jobcfg->{id} = $jobid;
        $jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
 
-       $run_replication->($stateobj, $jobcfg, $now, $now, $logfunc);
+       $run_replication->($jobcfg, $now, $now, $logfunc);
     };
 
     my $res = PVE::Tools::lock_file($pvesr_lock_path, 60, $code);
@@ -506,11 +466,10 @@ sub run_jobs {
     my $iteration = $now // time();
 
     my $code = sub {
-       my $stateobj = $read_state->();
        my $start_time = $now // time();
 
-       while (my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time)) {
-           $run_replication->($stateobj, $jobcfg, $iteration, $start_time, $logfunc);
+       while (my $jobcfg = $get_next_job->($iteration, $start_time)) {
+           $run_replication->($jobcfg, $iteration, $start_time, $logfunc, 1);
            $start_time = $now // time();
        }
     };
index 79d826a1623356022f1f2a8663c3468756dbaba8..a5605c3b95fc22e5e393b91b35da15a09cfc3086 100755 (executable)
@@ -13,6 +13,8 @@ use Data::Dumper;
 use PVE::INotify;
 use PVE::Cluster;
 use PVE::Storage;
+use PVE::ReplicationConfig;
+use PVE::ReplicationState;
 use PVE::Replication;
 use PVE::QemuConfig;
 use PVE::LXC::Config;
@@ -59,27 +61,10 @@ my $mocked_ssh_info_to_command = sub {
 my $statefile = ".mocked_repl_state";
 
 unlink $statefile;
-$PVE::Replication::state_path = $statefile;
+$PVE::ReplicationState::state_path = $statefile;
+$PVE::ReplicationState::state_lock = ".mocked_repl_state_lock";
 $PVE::Replication::pvesr_lock_path = ".mocked_pvesr_lock";
 
-my $mocked_write_state = sub {
-    my ($state) = @_;
-
-    PVE::Tools::file_set_contents($statefile, encode_json($state));
-};
-
-my $mocked_read_state = sub {
-
-    return {} if ! -e $statefile;
-
-    my $raw = PVE::Tools::file_get_contents($statefile);
-
-    return {} if $raw eq '';
-
-    return decode_json($raw);
-};
-
-
 my $pve_cluster_module = Test::MockModule->new('PVE::Cluster');
 
 my $pve_inotify_module = Test::MockModule->new('PVE::INotify');