]> git.proxmox.com Git - pve-storage.git/commitdiff
This patch will include storage asynchronous replication.
authorWolfgang Link <w.link@proxmox.com>
Mon, 24 Apr 2017 15:15:31 +0000 (17:15 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Fri, 28 Apr 2017 08:05:27 +0000 (10:05 +0200)
It is possible to synchronise a volume to an other node in a defined interval.
So if a node fail there will be an copy of the volumes from a VM
on an other node.
With this copy it is possible to start the VM on this node.

Makefile
PVE/API2/Makefile
PVE/API2/StorageReplication.pm [new file with mode: 0644]
PVE/CLI/Makefile
PVE/CLI/pvesr.pm [new file with mode: 0644]
PVE/Makefile
PVE/ReplicationTools.pm [new file with mode: 0644]
pvesr [new file with mode: 0644]

index 594749a5eca9b751b0a36fb75e0908a112c9602a..0d80ce53bf426e8908d79b60b245cb8271196d8f 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -33,15 +33,23 @@ pvesm.bash-completion:
        perl -I. -T -e "use PVE::CLI::pvesm; PVE::CLI::pvesm->generate_bash_completions();" >$@.tmp
        mv $@.tmp $@
 
+pvesr.bash-completion:
+       perl -I. -T -e "use PVE::CLI::pvesr; PVE::CLI::pvesr->generate_bash_completions();" >$@.tmp
+       mv $@.tmp $@
+
 .PHONY: install
-install: pvesm.1 pvesm.bash-completion
+install: pvesm.1 pvesm.bash-completion pvesr.bash-completion
        install -d ${DESTDIR}${SBINDIR}
        install -m 0755 pvesm ${DESTDIR}${SBINDIR}
+       install -m 0755 pvesr ${DESTDIR}${SBINDIR}
        make -C PVE install
+       install -d ${DESTDIR}/var/lib/pve-replica
        install -d ${DESTDIR}/usr/share/man/man1
        install -m 0644 pvesm.1 ${DESTDIR}/usr/share/man/man1/
        gzip -9 -n ${DESTDIR}/usr/share/man/man1/pvesm.1
        install -m 0644 -D pvesm.bash-completion ${DESTDIR}${BASHCOMPLDIR}/pvesm
+       install -m 0644 -D pvesr.bash-completion ${DESTDIR}${BASHCOMPLDIR}/pverepm
+
 
 .PHONY: deb
 deb: ${DEB}
@@ -65,7 +73,7 @@ ${DEB}:
 .PHONY: clean
 clean:
        make cleanup-docgen
-       rm -rf debian *.deb ${PACKAGE}-*.tar.gz dist *.1 *.tmp pvesm.bash-completion
+       rm -rf debian *.deb ${PACKAGE}-*.tar.gz dist *.1 *.tmp pvesm.bash-completion pvesr.bash-completion
        find . -name '*~' -exec rm {} ';'
 
 .PHONY: distclean
index 7b7226e25592bb5d1f7bdb8183a9660681d6e6e2..621221d1fa7b1b1500fb5175f1e6e4fe5cee596d 100644 (file)
@@ -3,4 +3,5 @@
 .PHONY: install
 install:
        install -D -m 0644 Disks.pm ${DESTDIR}${PERLDIR}/PVE/API2/Disks.pm
+       install -D -m 0644 StorageReplication.pm ${DESTDIR}${PERLDIR}/PVE/API2/StorageReplication.pm
        make -C Storage install
diff --git a/PVE/API2/StorageReplication.pm b/PVE/API2/StorageReplication.pm
new file mode 100644 (file)
index 0000000..d229197
--- /dev/null
@@ -0,0 +1,49 @@
+package PVE::API2::StorageReplication;
+
+use warnings;
+use strict;
+
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::ReplicationTools;
+
+use PVE::RESTHandler;
+
+use base qw(PVE::RESTHandler);
+
+__PACKAGE__->register_method ({
+    name => 'list',
+    path => 'list',
+    method => 'GET',
+    description => "List of all replication jobs.",
+    permissions => {
+       user => 'all',
+    },
+    protected => 1,
+    proxyto => 'node',
+    parameters => {
+       additionalProperties => 0,
+       properties => {
+           node => get_standard_option('pve-node'),
+           nodes => get_standard_option('pve-node-list' ,
+                                        {description => "Notes where the jobs is located.",
+                                         optional => 1}),
+       },
+    },
+    returns => { type => 'object' },
+    code => sub {
+       my ($param) = @_;
+
+       if ($param->{nodes}) {
+           foreach my $node (PVE::Tools::split_list($param->{nodes})) {
+               die "Node: $node does not exists.\n" if
+                   !PVE::Cluster::check_node_exists($node);
+           }
+       }
+
+       my $nodes = $param->{nodes} ?
+           $param->{nodes} : $param->{node};
+
+       return PVE::ReplicationTools::get_all_jobs($nodes);
+}});
+
+1;
index 6c6e2582723405c8413231ccea1640e354d1a59d..3d6f96d11942b8bcacd4732bf3f210d29a3268b7 100644 (file)
@@ -1,4 +1,4 @@
-SOURCES=pvesm.pm
+SOURCES=pvesm.pm pvesr.pm
 
 .PHONY: install
 install: ${SOURCES}
diff --git a/PVE/CLI/pvesr.pm b/PVE/CLI/pvesr.pm
new file mode 100644 (file)
index 0000000..6f3712c
--- /dev/null
@@ -0,0 +1,220 @@
+package PVE::CLI::pvesr;
+
+use strict;
+use warnings;
+
+use PVE::API2::StorageReplication;
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::INotify;
+use PVE::RPCEnvironment;
+use PVE::Tools qw(extract_param);
+use PVE::SafeSyslog;
+use PVE::CLIHandler;
+use POSIX qw(strftime);
+
+use base qw(PVE::CLIHandler);
+
+my $nodename = PVE::INotify::nodename();
+
+sub setup_environment {
+    PVE::RPCEnvironment->setup_default_cli_env();
+}
+
+my $print_list = sub {
+    my ($conf, $json) = @_;
+
+    if (defined($json)) {
+       print JSON::encode_json($conf);
+    } else {
+       printf("%-10s%-20s%-20s%-5s%-10s%-5s\n",
+              "VMID", "DEST", "LAST SYNC","IVAL", "STATE", "FAIL");
+
+       foreach my $vmid (sort keys %$conf) {
+           my $job = $conf->{$vmid};
+           my $timestr = strftime("%Y-%m-%d_%H:%M:%S", localtime($job->{lastsync}));
+
+           printf("%-9s ", $vmid);
+           printf("%-19s ", $job->{tnode});
+           printf("%-19s ", $timestr);
+           printf("%-4s ", $job->{interval});
+           printf("%-9s ", $job->{state});
+           printf("%-9s\n", $job->{fail});
+       }
+    }
+
+};
+
+sub set_list {
+    my ($list, $synctime, $vmid) = @_;
+
+    if (defined($list->{$synctime})) {
+       $list = set_list($list,$synctime+1, $vmid);
+    } else {
+       $list->{$synctime} = $vmid;
+    }
+    return $list;
+}
+
+my $get_replica_list = sub {
+
+    my $jobs = PVE::ReplicationTools::read_state();
+    my $list = {};
+
+    foreach my $vmid (keys %$jobs) {
+       my $job = $jobs->{$vmid};
+       my $lastsync = $job->{lastsync};
+
+       # interval in min
+       my $interval = $job->{interval};
+       my $now = time();
+       my $fail = $job->{fail};
+
+       my $synctime = $lastsync + $interval * 60;
+
+       if ($now >= $synctime && $job->{state} eq 'ok') {
+           $list = set_list($list, $synctime, $vmid);
+       } elsif ($job->{state} eq 'sync') {
+
+           my $synctime += $interval * ($job->{fail}+1);
+           $list = set_list($list, $synctime, $vmid)
+               if ($now >= $synctime);
+
+       }
+    }
+
+    return $list;
+};
+
+my $replicate_vms  = sub {
+    my ($list) = @_;
+
+    my @sorted_times = reverse sort keys %$list;
+
+    foreach my $synctime (@sorted_times) {
+       eval {
+           PVE::ReplicationTools::sync_guest($list->{$synctime});
+       };
+       if (my $err = $@) {
+           syslog ('err', $err );
+       }
+    }
+};
+
+__PACKAGE__->register_method ({
+    name => 'run',
+    path => 'run',
+    method => 'POST',
+    description => "This method will run by the systemd-timer and sync all jobs",
+    permissions => {
+       description => {
+           check => ['perm', '/', [ 'Sys.Console' ]],
+       },
+    },
+    protected => 1,
+    parameters => {
+       additionalProperties => 0,
+       properties => {
+       },
+    },
+    returns => { type => 'null' },
+    code => sub {
+
+       my $list = &$get_replica_list();
+       &$replicate_vms($list);
+
+       return undef;
+    }});
+
+__PACKAGE__->register_method ({
+    name => 'destroyjob',
+    path => 'destroyjob',
+    method => 'DELETE',
+    description => "Destroy an async replication job",
+    permissions => {
+       description => {
+           check => ['perm', '/storage', ['Datastore.Allocate']],
+       },
+    },
+    protected => 1,
+    parameters => {
+       additionalProperties => 0,
+       properties => {
+           vmid => {
+               type => 'string', format => 'pve-vmid',
+               description => "The VMID of the guest.",
+               completion => \&PVE::Cluster::complete_local_vmid,
+           },
+       },
+    },
+    returns => { type => 'null' },
+    code => sub {
+       my ($param) = @_;
+
+       my $vmid = extract_param($param, 'vmid');
+
+       PVE::ReplicationTools::destroy_replica($vmid);
+
+    }});
+
+__PACKAGE__->register_method ({
+    name => 'list',
+    path => 'list',
+    method => 'GET',
+    description => "List of all replication jobs.",
+    permissions => {
+       user => 'all'
+    },
+    protected => 1,
+    parameters => {
+       additionalProperties => 0,
+       properties => {
+           vmid => {
+               type => 'string', format => 'pve-vmid',
+               description => "The VMID of the guest.",
+               completion => \&PVE::Cluster::complete_local_vmid,
+           },
+       },
+    },
+    protected => 1,
+    proxyto => 'node',
+    parameters => {
+       additionalProperties => 0,
+       properties => {
+           node => get_standard_option('pve-node'),
+           nodes => get_standard_option('pve-node-list' ,
+                                        {description => "Notes where the jobs is located.",
+                                         optional => 1}),
+           json => {
+               optional => 1,
+               type => 'boolean',
+               description => "Output in JSON format.",
+           },
+       },
+    },
+    returns => { type => 'string' },
+    code => sub {
+       my ($param) = @_;
+
+       if ($param->{nodes}) {
+           foreach my $node (PVE::Tools::split_list($param->{nodes})) {
+               die "Node: $node does not exists.\n" if
+                   !PVE::Cluster::check_node_exists($node);
+           }
+       }
+
+       my $nodes = $param->{nodes} ?
+           $param->{nodes} : $param->{node};
+
+       my $list = PVE::ReplicationTools::get_all_jobs($nodes);
+
+       &$print_list($list, $param->{json});
+}});
+
+
+our $cmddef = {
+    list => [  __PACKAGE__ , 'list' , [],  {node => $nodename}],
+    run => [ __PACKAGE__ , 'run'],
+    destroyjob => [ __PACKAGE__ , 'destroyjob', ['vmid']],
+};
+
+1;
index ae2bd35efce19e0d1dc1a0e8b73ad00679502b64..c4075d14013cda7d0e4bd52d07f2e268473d85b7 100644 (file)
@@ -3,6 +3,7 @@
 .PHONY: install
 install:
        install -D -m 0644 Storage.pm ${DESTDIR}${PERLDIR}/PVE/Storage.pm
+       install -D -m 0644 ReplicationTools.pm ${DESTDIR}${PERLDIR}/PVE/ReplicationTools.pm
        install -D -m 0644 Diskmanage.pm ${DESTDIR}${PERLDIR}/PVE/Diskmanage.pm
        make -C Storage install
        make -C API2 install
diff --git a/PVE/ReplicationTools.pm b/PVE/ReplicationTools.pm
new file mode 100644 (file)
index 0000000..bd85552
--- /dev/null
@@ -0,0 +1,578 @@
+package PVE::ReplicationTools;
+
+use warnings;
+use strict;
+
+use PVE::Tools qw(run_command);
+use PVE::Cluster;
+use PVE::QemuConfig;
+use PVE::LXC::Config;
+use PVE::LXC;
+use PVE::Storage;
+use Time::Local;
+use JSON;
+use Data::Dumper qw(Dumper);
+
+my $STATE_DIR = '/var/lib/pve-replica';
+my $STATE_FILE = "/pve-replica.state";
+my $STATE_PATH = $STATE_DIR.$STATE_FILE;
+
+PVE::Cluster::cfs_update;
+my $local_node = PVE::INotify::nodename();
+
+my $cluster_nodes;
+
+my $get_guestconfig = sub {
+    my ($vmid) = @_;
+
+    my $vms = PVE::Cluster::get_vmlist();
+
+    my $type = $vms->{ids}->{$vmid}->{type};
+
+    my $guestconf;
+    my $running;
+
+    if ($type =~ m/^qemu$/) {
+       $guestconf = PVE::QemuConfig->load_config($vmid);
+       $running = PVE::QemuServer::check_running($vmid);
+    } elsif ($type =~ m/^lxc$/) {
+       $guestconf = PVE::LXC::Config->load_config($vmid);
+       $running = PVE::LXC::check_running($vmid);
+    }
+
+    return ($guestconf, $type, $running);
+};
+
+sub write_state {
+    my ($state) = @_;
+
+    mkdir $STATE_DIR;
+
+    PVE::Tools::file_set_contents($STATE_PATH, JSON::encode_json($state));
+}
+
+sub read_state {
+
+    return {} if !(-e $STATE_PATH);
+
+    my $raw = PVE::Tools::file_get_contents($STATE_PATH);
+
+    return {} if $raw eq '';
+    return JSON::decode_json($raw);
+}
+
+sub get_node_ip {
+    my ($nodename) = @_;
+
+    my $remoteip = PVE::Cluster::remote_node_ip($nodename, 1);
+
+    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');
+    if (my $network = $dc_conf->{storage_replication_network}) {
+
+       my $cmd = ['ssh', '-o', 'Batchmode=yes', "root\@$remoteip", '--'
+                  ,'pvecm', 'mtunnel', '--get_migration_ip',
+                  '--migration_network', $network];
+
+       PVE::Tools::run_command($cmd, outfunc => sub {
+           my $line = shift;
+
+           if ($line =~ m/^ip: '($PVE::Tools::IPRE)'$/) {
+               $remoteip = $1;
+           }
+       });
+    }
+    return $remoteip;
+}
+
+sub get_all_jobs {
+    my ($nodes) = @_;
+
+    my @nodelist = PVE::Tools::split_list($nodes);
+
+    my $vms = PVE::Cluster::get_vmlist();
+    my $state = read_state();
+    my $jobs = {};
+
+    my $outfunc = sub {
+       my $line = shift;
+
+       my $remote_jobs = JSON::decode_json($line);
+       foreach my $vmid (keys %$remote_jobs) {
+           $jobs->{$vmid} = $remote_jobs->{$vmid};
+       }
+    };
+
+    foreach my $node (@nodelist) {
+       if ($local_node ne $node) {
+
+           my $ip = get_node_ip($node);
+           $ip = [$ip] if Net::IP::ip_is_ipv6($ip);
+
+           my @cmd = ('ssh', '-o', 'Batchmode=yes', "root\@$ip", '--',
+                      'pvesr', 'list', '--json');
+
+           run_command([@cmd], outfunc=>$outfunc)
+
+       } else {
+
+           foreach my $vmid (keys %{$vms->{ids}}) {
+
+               next if !($vms->{ids}->{$vmid}->{node} eq $local_node);
+               next if !defined($state->{$vmid});
+               my $vm_state = $state->{$vmid};
+               my $job = {};
+
+               $job->{limit}    = $vm_state->{limit};
+               $job->{interval} = $vm_state->{interval};
+               $job->{tnode}    = $vm_state->{tnode};
+               $job->{lastsync} = $vm_state->{lastsync};
+               $job->{state}    = $vm_state->{state};
+               $job->{fail}     = $vm_state->{fail};
+
+               $jobs->{$vmid}   = $job;
+           }
+
+       }
+    }
+
+    return ($jobs);
+}
+
+sub sync_guest {
+    my ($vmid, $param) = @_;
+
+    my $jobs = read_state();
+    $jobs->{$vmid}->{state} = 'sync';
+    write_state($jobs);
+
+    my ($guest_conf, $vm_type, $running) = &$get_guestconfig($vmid);
+    my $qga = 0;
+
+    my $job = $jobs->{$vmid};
+    my $tnode = $job->{tnode};
+
+    if ($vm_type eq "qemu" && defined($guest_conf->{agent}) ) {
+       $qga = PVE::QemuServer::qga_check_running($vmid)
+           if $running;
+    }
+
+    # will not die if a disk is not syncable
+    my $disks = get_syncable_guestdisks($guest_conf, $vm_type);
+
+    # check if all nodes have the storage availible
+    my $storage_config = PVE::Storage::config();
+    foreach my $volid (keys  %$disks) {
+       my ($storeid) = PVE::Storage::parse_volume_id($volid);
+
+       my $store = $storage_config->{ids}->{$storeid};
+       die "Storage not availible on node: $tnode\n"
+           if $store->{nodes}  && !$store->{nodes}->{$tnode};
+       die "Storage not availible on node: $local_node\n"
+           if $store->{nodes} && !$store->{nodes}->{$local_node};
+
+    }
+
+    my $limit = $param->{limit};
+    $limit = $guest_conf->{replica_rate_limit}
+       if (!defined($limit));
+
+    my $snap_time = time();
+
+    die "Invalid synctime format: $job->{lastsync}."
+       if $job->{lastsync} !~ m/^(\d+)$/;
+
+    my $lastsync = $1;
+    my $incremental_snap = $lastsync ? "replica_$lastsync" : undef;
+
+    # freeze filesystem for data consistency
+    if ($qga) {
+       print "Freeze guest filesystem\n";
+
+       eval {
+           PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
+       };
+    }
+
+    my $snapname = "replica_$snap_time";
+
+    my $disks_status = { snapname => $snapname };
+
+    my $sync_job = sub {
+
+       # make snapshot of all volumes
+       foreach my $volid (keys %$disks) {
+
+           eval {
+               PVE::Storage::volume_snapshot($storage_config, $volid, $snapname);
+           };
+
+           if (my $err = $@) {
+               if ($qga) {
+                   print "Unfreeze guest filesystem\n";
+                   eval {
+                       PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw");
+                   };
+                   warn $@ if $@;
+               }
+               cleanup_snapshot($disks_status, $snapname, $storage_config, $running);
+               $jobs->{$vmid}->{state} = 'error';
+               write_state($jobs);
+
+               die $err;
+           }
+
+           $disks_status->{$volid}->{snapshot} = 1;
+       }
+
+       if ($qga) {
+           print "Unfreeze guest filesystem\n";
+           eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
+           warn $@ if $@;
+       }
+
+       my $ip = get_node_ip($tnode);
+
+       foreach my $volid (keys %$disks) {
+
+           eval {
+               PVE::Storage::volume_send($storage_config, $volid, $snapname,
+                                         $ip, $incremental_snap,
+                                         $param->{verbose}, $limit);
+               $job->{fail} = 0;
+           };
+
+           if (my $err = $@) {
+               cleanup_snapshot($disks_status, $snapname, $storage_config, $running, $ip);
+               $job->{fail}++;
+               $job->{state} = 'error' if $job->{fail} > 3;
+
+               $jobs->{$vmid} = $job;
+               write_state($jobs);
+               die "$err";
+           }
+
+           $disks_status->{$volid}->{synced} = 1;
+       }
+
+       # delet old snapshot if exists
+       cleanup_snapshot($disks_status, $snapname, $storage_config, $running, $ip, $lastsync) if
+           $job->{lastsync} ne '0';
+
+       $job->{lastsync} = $snap_time;
+       $job->{state} = "ok";
+       $jobs->{$vmid} = $job;
+       write_state($jobs);
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 60, 0 , $sync_job);
+    die $@ if $@;
+
+    return $snap_time;
+}
+
+sub get_snapshots {
+    my ($vol, $prefix, $nodes) = @_;
+
+    my $plugin = $vol->{plugin};
+    return $plugin->get_snapshots($vol, $prefix, $nodes);
+}
+
+sub send_image {
+    my ($vol, $param, $ip, $all_snaps_in_delta, $alter_path) = @_;
+
+    my $plugin = $vol->{plugin};
+    $plugin->send_image($vol, $param, $ip, $all_snaps_in_delta, $alter_path);
+}
+
+sub job_enable {
+    my ($vmid, $no_sync, $target) = @_;
+
+    my $update_state = sub {
+       my ($state) = @_;
+
+       my $jobs = read_state();
+       my $job = $jobs->{$vmid};
+       my ($config) = &$get_guestconfig($vmid);
+       my $param = {};
+
+       $job->{interval} = $config->{replica_interval} || 15;
+
+       $job->{tnode} = $target || $config->{replica_target};
+       die "Replica Target must be set\n" if !defined($job->{tnode});
+
+       die "Target and source Node can't be the same\n"
+           if $job->{tnode} eq $local_node;
+
+       $job->{fail} = 0;
+       if (!defined($job->{lastsync})) {
+
+           if ( my $lastsync = get_lastsync($vmid)) {
+               $job->{lastsync} = $lastsync;
+           } else {
+               $job->{lastsync} = 0;
+           }
+       }
+
+       $param->{verbose} = 1;
+
+       $job->{state} = 'ok';
+       $jobs->{$vmid} = $job;
+       write_state($jobs);
+
+       eval{
+           sync_guest($vmid, $param) if !defined($no_sync);
+       };
+       if (my $err = $@) {
+           $jobs->{$vmid}->{state} = 'error';
+           write_state($jobs);
+           die $err;
+       }
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
+    die $@ if $@;
+}
+
+sub job_disable {
+    my ($vmid) = @_;
+
+    my $update_state = sub {
+
+       my $jobs = read_state();
+
+       if (defined($jobs->{$vmid})) {
+           $jobs->{$vmid}->{state} = 'off';
+           write_state($jobs);
+       } else {
+           print "No replica service for $vmid\n";
+       }
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
+    die $@ if $@;
+}
+
+sub job_remove {
+    my ($vmid) = @_;
+
+    my $update_state = sub {
+
+       my $jobs = read_state();
+
+       if (defined($jobs->{$vmid})) {
+           delete($jobs->{$vmid});
+           write_state($jobs);
+       } else {
+           print "No replica service for $vmid\n";
+       }
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
+    die $@ if $@;
+}
+
+sub get_syncable_guestdisks {
+    my ($config, $vm_type, $running, $noerr) = @_;
+
+    my $syncable_disks = {};
+
+    my $cfg = PVE::Storage::config();
+
+    my $warnings = 0;
+    my $func = sub {
+       my ($id, $volume) = @_;
+
+       my $volname;
+       if ($vm_type eq 'qemu') {
+           $volname = $volume->{file};
+       } else {
+           $volname = $volume->{volume};
+       }
+
+       if( PVE::Storage::volume_has_feature($cfg, 'replicate', $volname , undef, $running)) {
+           $syncable_disks->{$volname} = 1;
+       } else {
+           warn "Can't sync Volume: $volname\n"
+               if !$noerr &&
+                  (!defined($volume->{replica}) || $volume->{replica});
+           $warnings = 1;
+       }
+    };
+
+    if ($vm_type eq 'qemu') {
+       PVE::QemuServer::foreach_drive($config, $func);
+    } elsif ($vm_type eq 'lxc') {
+       PVE::LXC::Config->foreach_mountpoint($config, $func);
+    } else {
+       die "Unknown VM Type: $vm_type";
+    }
+
+    return wantarray ? ($warnings, $syncable_disks) : $syncable_disks;
+}
+
+sub destroy_all_snapshots {
+    my ($vmid, $regex, $node) = @_;
+
+    my $ip = defined($node) ? get_node_ip($node) : undef;
+
+    my ($guest_conf, $vm_type, $running) = &$get_guestconfig($vmid);
+
+    my $disks = get_syncable_guestdisks($guest_conf, $vm_type);
+    my $cfg = PVE::Storage::config();
+
+    my $snapshots = {};
+    foreach my $volid (keys %$disks) {
+       $snapshots->{$volid} =
+           PVE::Storage::volume_snapshot_list($cfg, $volid, $regex, $node, $ip);
+    }
+
+    foreach my $volid (keys %$snapshots) {
+
+       if (defined($regex)) {
+           foreach my $snap (@{$snapshots->{$volid}}) {
+               if ($ip) {
+                   PVE::Storage::volume_snapshot_delete_remote($cfg, $volid, $snap, $ip);
+               } else {
+                   PVE::Storage::volume_snapshot_delete($cfg, $volid, $snap, $running);
+               }
+           }
+       } else {
+           if ($ip) {
+
+               my $cmd = ['ssh', '-o', 'Batchmode=yes', "root\@$ip", '--'
+                  ,'pvesm', 'free', $volid];
+               PVE::Tools::run_command($cmd);
+           } else {
+               PVE::Storage::vdisk_free($cfg, $volid);
+           }
+       }
+    }
+
+}
+
+sub cleanup_snapshot {
+    my ($disks, $snapname, $cfg, $running, $ip, $lastsync_snap) = @_;
+
+    if ($lastsync_snap) {
+       $snapname = "replica_$lastsync_snap";
+    }
+
+    foreach my $volid (keys %$disks) {
+       next if $volid eq "snapname";
+
+       if (defined($lastsync_snap) || $disks->{$volid}->{synced}) {
+           PVE::Storage::volume_snapshot_delete_remote($cfg, $volid, $snapname, $ip);
+       }
+
+       if (defined($lastsync_snap) || $disks->{$volid}->{snapshot}) {
+           PVE::Storage::volume_snapshot_delete($cfg, $volid, $snapname, $running);
+       }
+    }
+}
+
+sub destroy_replica {
+    my ($vmid) = @_;
+
+    my $code = sub {
+
+       my $jobs = read_state();
+
+       return if !defined($jobs->{$vmid});
+
+       my ($guest_conf, $vm_type) = &$get_guestconfig($vmid);
+
+       destroy_all_snapshots($vmid, 'replica');
+       destroy_all_snapshots($vmid, undef, $guest_conf->{replica_target});
+
+       delete($jobs->{$vmid});
+
+       delete($guest_conf->{replica_rate_limit});
+       delete($guest_conf->{replica_rate_interval});
+       delete($guest_conf->{replica_target});
+       delete($guest_conf->{replica});
+
+       if ($vm_type eq 'qemu') {
+           PVE::QemuConfig->write_config($vmid, $guest_conf);
+       } else {
+           PVE::LXC::Config->write_config($vmid, $guest_conf);
+       }
+       write_state($jobs);
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 30, 0 , $code);
+    die $@ if $@;
+}
+
+sub get_lastsync {
+    my ($vmid) = @_;
+
+    my ($conf, $vm_type) = &$get_guestconfig($vmid);
+
+    my $sync_vol = get_syncable_guestdisks($conf, $vm_type);
+    my $cfg = PVE::Storage::config();
+
+    my $time;
+    foreach my $volid (keys %$sync_vol) {
+       my $list =
+           PVE::Storage::volume_snapshot_list($cfg, $volid, 'replica', $local_node);
+
+       if (my $tmp_snap = shift @$list) {
+           $tmp_snap =~ m/^replica_(\d+)$/;
+           die "snapshots are not coherent\n"
+               if defined($time) && !($time eq $1);
+           $time = $1;
+       }
+    }
+
+    return $time;
+}
+
+sub get_last_replica_snap {
+    my ($volid) = @_;
+
+    my $cfg = PVE::Storage::config();
+    my $list = PVE::Storage::volume_snapshot_list($cfg, $volid, 'replica_', $local_node);
+
+    return shift @$list;
+}
+
+sub check_guest_volumes_syncable {
+    my ($conf, $vm_type) = @_;
+
+    my ($warnings, $disks) = get_syncable_guestdisks($conf, $vm_type, 1);
+
+    return undef if $warnings || !%$disks;
+
+    return 1;
+}
+
+sub update_conf {
+    my ($vmid, $key, $value) = @_;
+
+    if ($key eq 'replica_target') {
+           destroy_replica($vmid);
+           job_enable($vmid, undef, $value);
+           return;
+    }
+
+    my $update = sub {
+       my $jobs = read_state();
+
+       return if !defined($jobs->{$vmid});
+
+       if ($key eq 'replica_interval') {
+           $jobs->{$vmid}->{interval} = $value || 15;
+       } elsif ($key eq 'replica_rate_limit'){
+               $jobs->{$vmid}->{limit} = $value ||
+                   delet $jobs->{$vmid}->{limit};
+       }  else {
+           die "Config parameter: $key not known";
+       }
+
+       write_state($jobs);
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 60, 0 , $update);
+}
+
+1;
diff --git a/pvesr b/pvesr
new file mode 100644 (file)
index 0000000..ffcf84c
--- /dev/null
+++ b/pvesr
@@ -0,0 +1,8 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use PVE::CLI::pvesr;
+
+PVE::CLI::pvesr->run_cli_handler();