use strict;
use warnings;
-use Data::Dumper qw(Dumper);
+
use Fcntl qw(:flock SEEK_END);
use Getopt::Long qw(GetOptionsFromArray);
-use File::Copy qw(move);
use File::Path qw(make_path);
use JSON;
use IO::File;
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = "${PVE_DIR}/qemu-server";
my $LXC_CONF = "${PVE_DIR}/lxc";
-my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
my $PROG_PATH = "$PATH/${PROGNAME}";
my $INTERVAL = 15;
-my $DEBUG = 0;
+my $DEBUG;
+
+BEGIN {
+ $DEBUG = 0; # change default here. not above on declaration!
+ $DEBUG ||= $ENV{ZSYNC_DEBUG};
+ if ($DEBUG) {
+ require Data::Dumper;
+ Data::Dumper->import();
+ }
+}
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
+my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
+
my $command = $ARGV[0];
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
check_bin ('scp');
}
-$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =
- sub {
- die "Signal aborting sync\n";
- };
+$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
+ die "Signaled, aborting sync: $!\n";
+};
sub check_bin {
my ($bin) = @_;
return "$head/" . $path . "/$tail";
}
-sub lock {
- my ($fh) = @_;
- flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
-}
+sub locked {
+ my ($lock_fn, $code) = @_;
+
+ my $lock_fh = IO::File->new("> $lock_fn");
+
+ flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
+ my $res = eval { $code->() };
+ my $err = $@;
+
+ flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
+ die "$err" if $err;
-sub unlock {
- my ($fh) = @_;
- flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
+ close($lock_fh);
+ return $res;
}
sub get_status {
source_user => undef,
dest_user => undef,
properties => undef,
+ dest_config_path => undef,
};
my ($ret) = GetOptionsFromArray(
'source-user=s' => \$param->{source_user},
'dest-user=s' => \$param->{dest_user},
'properties' => \$param->{properties},
+ 'dest-config-path=s' => \$param->{dest_config_path},
);
die "can't parse options\n" if $ret == 0;
$job->{source_user} = $param->{source_user};
$job->{dest_user} = $param->{dest_user};
$job->{properties} = !!$param->{properties};
+ $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};
return $job;
}
$in_fh = IO::File->new("< $STATE");
die "Could not open file $STATE: $!\n" if !$in_fh;
- lock($in_fh);
$text = <$in_fh>;
};
print $out_fh $text;
close($out_fh);
- move("$STATE.new", $STATE);
+ rename "$STATE.new", $STATE;
eval {
close($in_fh);
};
my $fh = IO::File->new("< $CRONJOBS");
die "Could not open file $CRONJOBS: $!\n" if !$fh;
- lock($fh);
my @test = <$fh>;
die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
close ($new_fh);
- die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
+ die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
close ($fh);
}
$text .= " --source-user $job->{source_user}";
$text .= " --dest-user $job->{dest_user}";
$text .= " --properties" if $job->{properties};
+ $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
$text .= "\n";
return $text;
sub init {
my ($param) = @_;
- my $cfg = read_cron();
+ locked("$CONFIG_PATH/cron_and_state.lock", sub {
+ my $cfg = read_cron();
- my $job = param_to_job($param);
+ my $job = param_to_job($param);
- $job->{state} = "ok";
- $job->{lsync} = 0;
+ $job->{state} = "ok";
+ $job->{lsync} = 0;
- my $source = parse_target($param->{source});
- my $dest = parse_target($param->{dest});
+ my $source = parse_target($param->{source});
+ my $dest = parse_target($param->{dest});
- if (my $ip = $dest->{ip}) {
- run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
- }
+ if (my $ip = $dest->{ip}) {
+ run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
+ }
- if (my $ip = $source->{ip}) {
- run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
- }
+ if (my $ip = $source->{ip}) {
+ run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
+ }
- die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
+ die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
- if (!defined($source->{vmid})) {
- die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
- }
+ if (!defined($source->{vmid})) {
+ die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
+ }
- my $vm_type = vm_exists($source, $param->{source_user});
- $job->{vm_type} = $vm_type;
- $source->{vm_type} = $vm_type;
+ my $vm_type = vm_exists($source, $param->{source_user});
+ $job->{vm_type} = $vm_type;
+ $source->{vm_type} = $vm_type;
- die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
+ die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
- die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
+ die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
- #check if vm has zfs disks if not die;
- get_disks($source, $param->{source_user}) if $source->{vmid};
+ #check if vm has zfs disks if not die;
+ get_disks($source, $param->{source_user}) if $source->{vmid};
- update_cron($job);
- update_state($job);
+ update_cron($job);
+ update_state($job);
+ }); #cron and state lock
- eval {
- sync($param) if !$param->{skip};
- };
- if(my $err = $@) {
+ return if $param->{skip};
+
+ eval { sync($param) };
+ if (my $err = $@) {
destroy_job($param);
print $err;
}
sub destroy_job {
my ($param) = @_;
- my $job = get_job($param);
- $job->{state} = "del";
+ locked("$CONFIG_PATH/cron_and_state.lock", sub {
+ my $job = get_job($param);
+ $job->{state} = "del";
- update_cron($job);
- update_state($job);
+ update_cron($job);
+ update_state($job);
+ });
}
sub sync {
my ($param) = @_;
- my $lock_fh = IO::File->new("> $LOCKFILE");
- die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
- lock($lock_fh);
-
- my $date = get_date();
my $job;
- eval {
- $job = get_job($param);
- };
- if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
- die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
- }
+ locked("$CONFIG_PATH/cron_and_state.lock", sub {
+ eval { $job = get_job($param) };
- my $dest = parse_target($param->{dest});
- my $source = parse_target($param->{source});
+ if ($job) {
+ if (defined($job->{state}) && ($job->{state} eq "syncing" || $job->{state} eq "waiting")) {
+ die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
+ }
- my $sync_path = sub {
- my ($source, $dest, $job, $param, $date) = @_;
+ $job->{state} = "waiting";
+ update_state($job);
+ }
+ });
- ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
+ locked("$CONFIG_PATH/sync.lock", sub {
- snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
+ my $date = get_date();
- send_image($source, $dest, $param);
+ my $dest;
+ my $source;
+ my $vm_type;
- snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
+ locked("$CONFIG_PATH/cron_and_state.lock", sub {
+ #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
+ eval { $job = get_job($param); };
- };
+ if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
+ die "Job --source $param->{source} --name $param->{name} has been disabled\n";
+ }
- my $vm_type = vm_exists($source, $param->{source_user});
- $source->{vm_type} = $vm_type;
+ $dest = parse_target($param->{dest});
+ $source = parse_target($param->{source});
- if ($job) {
- $job->{state} = "syncing";
- $job->{vm_type} = $vm_type if !$job->{vm_type};
- update_state($job);
- }
+ $vm_type = vm_exists($source, $param->{source_user});
+ $source->{vm_type} = $vm_type;
- eval{
- if ($source->{vmid}) {
- die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
- die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
- my $disks = get_disks($source, $param->{source_user});
-
- foreach my $disk (sort keys %{$disks}) {
- $source->{all} = $disks->{$disk}->{all};
- $source->{pool} = $disks->{$disk}->{pool};
- $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
- $source->{last_part} = $disks->{$disk}->{last_part};
- &$sync_path($source, $dest, $job, $param, $date);
+ if ($job) {
+ $job->{state} = "syncing";
+ $job->{vm_type} = $vm_type if !$job->{vm_type};
+ update_state($job);
}
- if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
- send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user});
+ }); #cron and state lock
+
+ my $sync_path = sub {
+ my ($source, $dest, $job, $param, $date) = @_;
+
+ ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
+
+ snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
+
+ send_image($source, $dest, $param);
+
+ snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
+
+ };
+
+ eval{
+ if ($source->{vmid}) {
+ die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
+ die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
+ my $disks = get_disks($source, $param->{source_user});
+
+ foreach my $disk (sort keys %{$disks}) {
+ $source->{all} = $disks->{$disk}->{all};
+ $source->{pool} = $disks->{$disk}->{pool};
+ $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
+ $source->{last_part} = $disks->{$disk}->{last_part};
+ &$sync_path($source, $dest, $job, $param, $date);
+ }
+ if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
+ send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
+ } else {
+ send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
+ }
} else {
- send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user});
+ &$sync_path($source, $dest, $job, $param, $date);
}
- } else {
- &$sync_path($source, $dest, $job, $param, $date);
- }
- };
- if(my $err = $@) {
- if ($job) {
- $job->{state} = "error";
- update_state($job);
- unlock($lock_fh);
- close($lock_fh);
+ };
+ if (my $err = $@) {
+ locked("$CONFIG_PATH/cron_and_state.lock", sub {
+ eval { $job = get_job($param); };
+ if ($job) {
+ $job->{state} = "error";
+ update_state($job);
+ }
+ });
print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
+ die "$err\n";
}
- die "$err\n";
- }
- if ($job) {
- $job->{state} = "ok";
- $job->{lsync} = $date;
- update_state($job);
- }
-
- unlock($lock_fh);
- close($lock_fh);
+ locked("$CONFIG_PATH/cron_and_state.lock", sub {
+ eval { $job = get_job($param); };
+ if ($job) {
+ if (defined($job->{state}) && $job->{state} eq "stopped") {
+ $job->{state} = "stopped";
+ } else {
+ $job->{state} = "ok";
+ }
+ $job->{lsync} = $date;
+ update_state($job);
+ }
+ });
+ }); #sync lock
}
sub snapshot_get{
my $line = $1;
next if $line =~ /media=cdrom/;
- next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
+ next if $line !~ m/$DISK_KEY_RE/;
#QEMU if backup is not set include in sync
next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
my $disk = undef;
my $stor = undef;
- if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
+ if($line =~ m/$DISK_KEY_RE(.*)$/) {
my @parameter = split(/,/,$1);
foreach my $opt (@parameter) {
sub send_config{
- my ($source, $dest, $method, $source_user, $dest_user) = @_;
+ my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;
my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
- my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
+ my $config_dir = $dest_config_path // $CONFIG_PATH;
+ $config_dir .= "/$dest->{last_part}" if $dest->{last_part};
$dest_target_new = $config_dir.'/'.$dest_target_new;
sub enable_job {
my ($param) = @_;
- my $job = get_job($param);
- $job->{state} = "ok";
- update_state($job);
- update_cron($job);
+ locked("$CONFIG_PATH/cron_and_state.lock", sub {
+ my $job = get_job($param);
+ $job->{state} = "ok";
+ update_state($job);
+ update_cron($job);
+ });
}
sub disable_job {
my ($param) = @_;
- my $job = get_job($param);
- $job->{state} = "stopped";
- update_state($job);
- update_cron($job);
+ locked("$CONFIG_PATH/cron_and_state.lock", sub {
+ my $job = get_job($param);
+ $job->{state} = "stopped";
+ update_state($job);
+ update_cron($job);
+ });
}
my $cmd_help = {
-properties boolean
Include the dataset's properties in the stream.
+
+ -dest-config-path string
+
+ specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
-properties boolean
Include the dataset's properties in the stream.
+
+ -dest-config-path string
+
+ specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
list => qq{
$PROGNAME list