use PVE::RPCEnvironment;
use PVE::ProcFSTools;
use PVE::ReplicationConfig;
+use PVE::ReplicationState;
use PVE::Replication;
use PVE::QemuConfig;
use PVE::QemuServer;
my $code = sub {
my $start_time = $now // time();
- while (my $jobcfg = PVE::Replication::get_next_job($iteration, $start_time)) {
+ while (my $jobcfg = PVE::ReplicationState::get_next_job($iteration, $start_time)) {
my $guest_class = $lookup_guest_class->($jobcfg->{vmtype});
PVE::Replication::run_replication($guest_class, $jobcfg, $iteration, $start_time, $logfunc, 1);
$start_time = $now // time();
my $rpcenv = PVE::RPCEnvironment::get();
my $authuser = $rpcenv->get_user();
- my $jobs = PVE::Replication::job_status();
+ my $jobs = PVE::ReplicationState::job_status();
my $res = [];
foreach my $id (sort keys %$jobs) {
my $rpcenv = PVE::RPCEnvironment::get();
my $authuser = $rpcenv->get_user();
- my $jobs = PVE::Replication::job_status();
+ my $jobs = PVE::ReplicationState::job_status();
my $jobid = $param->{id};
my $jobcfg = $jobs->{$jobid};
use PVE::INotify;
use PVE::ProcFSTools;
use PVE::Tools;
-use PVE::CalendarEvent;
use PVE::Cluster;
-use PVE::AbstractConfig;
use PVE::Storage;
use PVE::GuestHelpers;
use PVE::ReplicationConfig;
use PVE::ReplicationState;
-our $replicate_logdir = "/var/log/pve/replicate";
-
-# regression tests should overwrite this
-sub job_logfile_name {
- my ($jobid) = @_;
-
- return "${replicate_logdir}/$jobid";
-}
# regression tests should overwrite this
sub get_log_time {
return time();
}
-sub job_status {
-
- my $local_node = PVE::INotify::nodename();
-
- my $jobs = {};
-
- my $stateobj = PVE::ReplicationState::read_state();
-
- my $cfg = PVE::ReplicationConfig->new();
-
- my $vms = PVE::Cluster::get_vmlist();
-
- foreach my $jobid (sort keys %{$cfg->{ids}}) {
- my $jobcfg = $cfg->{ids}->{$jobid};
- my $vmid = $jobcfg->{guest};
-
- die "internal error - not implemented" if $jobcfg->{type} ne 'local';
-
- # skip non existing vms
- next if !$vms->{ids}->{$vmid};
-
- # only consider guest on local node
- next if $vms->{ids}->{$vmid}->{node} ne $local_node;
-
- if (!$jobcfg->{remove_job}) {
- # never sync to local node
- next if $jobcfg->{target} eq $local_node;
-
- next if $jobcfg->{disable};
- }
-
- my $state = PVE::ReplicationState::extract_job_state($stateobj, $jobcfg);
- $jobcfg->{state} = $state;
- $jobcfg->{id} = $jobid;
- $jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
-
- my $next_sync = 0;
-
- if ($jobcfg->{remove_job}) {
- $next_sync = 1; # lowest possible value
- # todo: consider fail_count? How many retries?
- } else {
- if (my $fail_count = $state->{fail_count}) {
- if ($fail_count < 3) {
- $next_sync = $state->{last_try} + 5*60*$fail_count;
- }
- } else {
- my $schedule = $jobcfg->{schedule} || '*/15';
- my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
- $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
- }
- }
-
- $jobcfg->{next_sync} = $next_sync;
-
- $jobs->{$jobid} = $jobcfg;
- }
-
- return $jobs;
-}
-
-sub get_next_job {
- my ($iteration, $start_time) = @_;
-
- my $jobs = job_status();
-
- my $sort_func = sub {
- my $joba = $jobs->{$a};
- my $jobb = $jobs->{$b};
- my $sa = $joba->{state};
- my $sb = $jobb->{state};
- my $res = $sa->{last_iteration} cmp $sb->{last_iteration};
- return $res if $res != 0;
- $res = $joba->{next_sync} <=> $jobb->{next_sync};
- return $res if $res != 0;
- return $joba->{guest} <=> $jobb->{guest};
- };
-
- foreach my $jobid (sort $sort_func keys %$jobs) {
- my $jobcfg = $jobs->{$jobid};
- next if $jobcfg->{state}->{last_iteration} >= $iteration;
- if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
- return $jobcfg;
- }
- }
-
- return undef;
-}
-
sub remote_prepare_local_job {
my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;
$base_snapshot, $sync_snapname);
}
-sub delete_job {
- my ($jobid) = @_;
-
- my $code = sub {
- my $cfg = PVE::ReplicationConfig->new();
- delete $cfg->{ids}->{$jobid};
- $cfg->write();
- };
-
- PVE::ReplicationConfig::lock($code);
-}
sub replicate {
my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;
# remove all local replication snapshots (lastsync => 0)
prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);
- delete_job($jobid); # update config
+ PVE::ReplicationConfig::delete_job($jobid); # update config
$logfunc->("job removed");
return;
PVE::ReplicationState::write_job_state($jobcfg, $state);
- mkdir $replicate_logdir;
- my $logfile = job_logfile_name($jobid);
+ mkdir $PVE::ReplicationState::replicate_logdir;
+ my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
open(my $logfd, '>', $logfile) ||
die "unable to open replication log '$logfile' - $!\n";
our $mocked_replication_jobs = {};
-my $pve_replicationconfig = Test::MockModule->new('PVE::ReplicationConfig');
+my $pve_replication_config_module = Test::MockModule->new('PVE::ReplicationConfig');
+my $pve_replication_state_module = Test::MockModule->new('PVE::ReplicationState');
our $mocked_vm_configs = {};
my $pve_lxc_config_module = Test::MockModule->new('PVE::LXC::Config');
-my $mocked_replication_config = sub {
+my $mocked_replication_config_new = sub {
my $res = clone($mocked_replication_jobs);
};
sub setup {
- $pve_replication_module->mock(job_logfile_name => $mocked_job_logfile_name);
+ $pve_replication_state_module->mock(job_logfile_name => $mocked_job_logfile_name);
$pve_replication_module->mock(get_log_time => $mocked_get_log_time);
$pve_storage_module->mock(config => sub { return $mocked_storage_config; });
$pve_storage_module->mock(volume_snapshot => $mocked_volume_snapshot);
$pve_storage_module->mock(volume_snapshot_delete => $mocked_volume_snapshot_delete);
- $pve_replicationconfig->mock(new => $mocked_replication_config);
+ $pve_replication_config_module->mock(new => $mocked_replication_config_new);
$pve_qemuserver_module->mock(check_running => sub { return 0; });
$pve_qemuconfig_module->mock(load_config => $mocked_qemu_load_conf);
};
if (!$status) {
- $status = PVE::Replication::job_status();
+ $status = PVE::ReplicationState::job_status();
foreach my $jobid (sort keys %$status) {
my $jobcfg = $status->{$jobid};
$logmsg->("$ctime $jobid: new job next_sync => $jobcfg->{next_sync}");
PVE::API2::Replication::run_jobs($ctime, $logmsg);
- my $new = PVE::Replication::job_status();
+ my $new = PVE::ReplicationState::job_status();
# detect removed jobs
foreach my $jobid (sort keys %$status) {