]> git.proxmox.com Git - pve-zsync.git/blobdiff - pve-zsync
remove unused function write_cron
[pve-zsync.git] / pve-zsync
index b9a6995efdf76ecc556a0f9a2ef5836798054a9b..881b9c85cbfb98c6cd8a0e0341a4482ba4ad07c5 100755 (executable)
--- a/pve-zsync
+++ b/pve-zsync
@@ -2,10 +2,9 @@
 
 use strict;
 use warnings;
-use Data::Dumper qw(Dumper);
+
 use Fcntl qw(:flock SEEK_END);
 use Getopt::Long qw(GetOptionsFromArray);
-use File::Copy qw(move);
 use File::Path qw(make_path);
 use JSON;
 use IO::File;
@@ -19,10 +18,18 @@ my $PATH = "/usr/sbin";
 my $PVE_DIR = "/etc/pve/local";
 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
 my $LXC_CONF = "${PVE_DIR}/lxc";
-my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
 my $PROG_PATH = "$PATH/${PROGNAME}";
 my $INTERVAL = 15;
-my $DEBUG = 0;
+my $DEBUG;
+
+BEGIN {
+    $DEBUG = 0; # change default here. not above on declaration!
+    $DEBUG ||= $ENV{ZSYNC_DEBUG};
+    if ($DEBUG) {
+       require Data::Dumper;
+       Data::Dumper->import();
+    }
+}
 
 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
@@ -46,6 +53,8 @@ my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])";       # ipv6 must always be in brac
 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
 
+my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
+
 my $command = $ARGV[0];
 
 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
@@ -55,10 +64,9 @@ if (defined($command) && $command ne 'help' && $command ne 'printpod') {
     check_bin ('scp');
 }
 
-$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =
-    sub {
-       die "Signal aborting sync\n";
-    };
+$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =  sub {
+    die "Signaled, aborting sync: $!\n";
+};
 
 sub check_bin {
     my ($bin)  = @_;
@@ -103,14 +111,20 @@ sub cut_target_width {
     return "$head/" . $path . "/$tail";
 }
 
-sub lock {
-    my ($fh) = @_;
-    flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
-}
+sub locked {
+    my ($lock_fn, $code) = @_;
+
+    my $lock_fh = IO::File->new("> $lock_fn");
 
-sub unlock {
-    my ($fh) = @_;
-    flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
+    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
+    my $res = eval { $code->() };
+    my $err = $@;
+
+    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
+    die "$err" if $err;
+
+    close($lock_fh);
+    return $res;
 }
 
 sub get_status {
@@ -212,6 +226,8 @@ sub parse_argv {
        method => undef,
        source_user => undef,
        dest_user => undef,
+       properties => undef,
+       dest_config_path => undef,
     };
 
     my ($ret) = GetOptionsFromArray(
@@ -225,7 +241,9 @@ sub parse_argv {
        'skip' => \$param->{skip},
        'method=s' => \$param->{method},
        'source-user=s' => \$param->{source_user},
-       'dest-user=s' => \$param->{dest_user}
+       'dest-user=s' => \$param->{dest_user},
+       'properties' => \$param->{properties},
+       'dest-config-path=s' => \$param->{dest_config_path},
     );
 
     die "can't parse options\n" if $ret == 0;
@@ -294,6 +312,8 @@ sub param_to_job {
     $job->{source} = $param->{source};
     $job->{source_user} = $param->{source_user};
     $job->{dest_user} = $param->{dest_user};
+    $job->{properties} = !!$param->{properties};
+    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};
 
     return $job;
 }
@@ -329,7 +349,6 @@ sub update_state {
 
        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
-       lock($in_fh);
        $text = <$in_fh>;
     };
 
@@ -362,7 +381,7 @@ sub update_state {
     print $out_fh $text;
 
     close($out_fh);
-    move("$STATE.new", $STATE);
+    rename "$STATE.new", $STATE;
     eval {
        close($in_fh);
     };
@@ -382,7 +401,6 @@ sub update_cron {
 
     my $fh = IO::File->new("< $CRONJOBS");
     die "Could not open file $CRONJOBS: $!\n" if !$fh;
-    lock($fh);
 
     my @test = <$fh>;
 
@@ -414,7 +432,7 @@ sub update_cron {
     die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
     close ($new_fh);
 
-    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
+    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
     close ($fh);
 }
 
@@ -439,6 +457,8 @@ sub format_job {
     $text .= " --verbose" if $job->{verbose};
     $text .= " --source-user $job->{source_user}";
     $text .= " --dest-user $job->{dest_user}";
+    $text .= " --properties" if $job->{properties};
+    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
     $text .= "\n";
 
     return $text;
@@ -468,19 +488,18 @@ sub list {
 sub vm_exists {
     my ($target, $user) = @_;
 
-    my @cmd = ('ssh', "$user\@$target->{ip}", '--') if $target->{ip};
-
-    my $res = undef;
-
     return undef if !defined($target->{vmid});
 
-    eval { $res = run_cmd([@cmd, 'ls',  "$QEMU_CONF/$target->{vmid}.conf"]) };
-
-    return "qemu" if $res;
-
-    eval { $res = run_cmd([@cmd, 'ls',  "$LXC_CONF/$target->{vmid}.conf"]) };
+    my $conf_fn = "$target->{vmid}.conf";
 
-    return "lxc" if $res;
+    if ($target->{ip}) {
+       my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
+       return "qemu" if eval { run_cmd([@cmd, "$QEMU_CONF/$conf_fn"]) };
+       return "lxc" if  eval { run_cmd([@cmd, "$LXC_CONF/$conf_fn"]) };
+    } else {
+       return "qemu" if -f "$QEMU_CONF/$conf_fn";
+       return "lxc" if -f "$LXC_CONF/$conf_fn";
+    }
 
     return undef;
 }
@@ -488,48 +507,50 @@ sub vm_exists {
 sub init {
     my ($param) = @_;
 
-    my $cfg = read_cron();
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       my $cfg = read_cron();
 
-    my $job = param_to_job($param);
+       my $job = param_to_job($param);
 
-    $job->{state} = "ok";
-    $job->{lsync} = 0;
+       $job->{state} = "ok";
+       $job->{lsync} = 0;
 
-    my $source = parse_target($param->{source});
-    my $dest = parse_target($param->{dest});
+       my $source = parse_target($param->{source});
+       my $dest = parse_target($param->{dest});
 
-    if (my $ip =  $dest->{ip}) {
-       run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
-    }
+       if (my $ip =  $dest->{ip}) {
+           run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
+       }
 
-    if (my $ip =  $source->{ip}) {
-       run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
-    }
+       if (my $ip =  $source->{ip}) {
+           run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
+       }
 
-    die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
+       die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
 
-    if (!defined($source->{vmid})) {
-       die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
-    }
+       if (!defined($source->{vmid})) {
+           die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
+       }
 
-    my $vm_type = vm_exists($source, $param->{source_user});
-    $job->{vm_type} = $vm_type;
-    $source->{vm_type} = $vm_type;
+       my $vm_type = vm_exists($source, $param->{source_user});
+       $job->{vm_type} = $vm_type;
+       $source->{vm_type} = $vm_type;
 
-    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
+       die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
 
-    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
+       die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
 
-    #check if vm has zfs disks if not die;
-    get_disks($source, $param->{source_user}) if $source->{vmid};
+       #check if vm has zfs disks if not die;
+       get_disks($source, $param->{source_user}) if $source->{vmid};
 
-    update_cron($job);
-    update_state($job);
+       update_cron($job);
+       update_state($job);
+    }); #cron and state lock
 
-    eval {
-       sync($param) if !$param->{skip};
-    };
-    if(my $err = $@) {
+    return if $param->{skip};
+
+    eval { sync($param) };
+    if (my $err = $@) {
        destroy_job($param);
        print $err;
     }
@@ -554,107 +575,141 @@ sub get_job {
 sub destroy_job {
     my ($param) = @_;
 
-    my $job = get_job($param);
-    $job->{state} = "del";
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       my $job = get_job($param);
+       $job->{state} = "del";
 
-    update_cron($job);
-    update_state($job);
+       update_cron($job);
+       update_state($job);
+    });
 }
 
 sub sync {
     my ($param) = @_;
 
-    my $lock_fh = IO::File->new("> $LOCKFILE");
-    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
-    lock($lock_fh);
-
-    my $date = get_date();
     my $job;
-    eval {
-       $job = get_job($param);
-    };
 
-    if ($job && $job->{state} eq "syncing") {
-       die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
-    }
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       eval { $job = get_job($param) };
 
-    my $dest = parse_target($param->{dest});
-    my $source = parse_target($param->{source});
+       if ($job) {
+           if (defined($job->{state}) && ($job->{state} eq "syncing" || $job->{state} eq "waiting")) {
+               die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
+           }
 
-    my $sync_path = sub {
-       my ($source, $dest, $job, $param, $date) = @_;
+           $job->{state} = "waiting";
+           update_state($job);
+       }
+    });
 
-       ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
+    locked("$CONFIG_PATH/sync.lock", sub {
 
-       snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
+       my $date = get_date();
 
-       send_image($source, $dest, $param);
+       my $dest;
+       my $source;
+       my $vm_type;
 
-       snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
+       locked("$CONFIG_PATH/cron_and_state.lock", sub {
+           #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
+           eval { $job = get_job($param); };
 
-    };
+           if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
+               die "Job --source $param->{source} --name $param->{name} has been disabled\n";
+           }
 
-    my $vm_type = vm_exists($source, $param->{source_user});
-    $source->{vm_type} = $vm_type;
+           $dest = parse_target($param->{dest});
+           $source = parse_target($param->{source});
 
-    if ($job) {
-       $job->{state} = "syncing";
-       $job->{vm_type} = $vm_type if !$job->{vm_type};
-       update_state($job);
-    }
+           $vm_type = vm_exists($source, $param->{source_user});
+           $source->{vm_type} = $vm_type;
 
-    eval{
-       if ($source->{vmid}) {
-           die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
-           die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
-           my $disks = get_disks($source, $param->{source_user});
-
-           foreach my $disk (sort keys %{$disks}) {
-               $source->{all} = $disks->{$disk}->{all};
-               $source->{pool} = $disks->{$disk}->{pool};
-               $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
-               $source->{last_part} = $disks->{$disk}->{last_part};
-               &$sync_path($source, $dest, $job, $param, $date);
+           if ($job) {
+               $job->{state} = "syncing";
+               $job->{vm_type} = $vm_type if !$job->{vm_type};
+               update_state($job);
            }
-           if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
-               send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user});
+       }); #cron and state lock
+
+       my $sync_path = sub {
+           my ($source, $dest, $job, $param, $date) = @_;
+
+           ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});
+
+           snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
+
+           send_image($source, $dest, $param);
+
+           snapshot_destroy($source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $dest->{old_snap});
+
+       };
+
+       eval{
+           if ($source->{vmid}) {
+               die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
+               die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
+               my $disks = get_disks($source, $param->{source_user});
+
+               foreach my $disk (sort keys %{$disks}) {
+                   $source->{all} = $disks->{$disk}->{all};
+                   $source->{pool} = $disks->{$disk}->{pool};
+                   $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
+                   $source->{last_part} = $disks->{$disk}->{last_part};
+                   &$sync_path($source, $dest, $job, $param, $date);
+               }
+               if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
+                   send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
+               } else {
+                   send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
+               }
            } else {
-               send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user});
+               &$sync_path($source, $dest, $job, $param, $date);
            }
-       } else {
-           &$sync_path($source, $dest, $job, $param, $date);
-       }
-    };
-    if(my $err = $@) {
-       if ($job) {
-           $job->{state} = "error";
-           update_state($job);
-           unlock($lock_fh);
-           close($lock_fh);
+       };
+       if (my $err = $@) {
+           locked("$CONFIG_PATH/cron_and_state.lock", sub {
+               eval { $job = get_job($param); };
+               if ($job) {
+                   $job->{state} = "error";
+                   update_state($job);
+               }
+           });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
+           die "$err\n";
        }
-       die "$err\n";
-    }
 
-    if ($job) {
-       $job->{state} = "ok";
-       $job->{lsync} = $date;
-       update_state($job);
-    }
-
-    unlock($lock_fh);
-    close($lock_fh);
+       locked("$CONFIG_PATH/cron_and_state.lock", sub {
+           eval { $job = get_job($param); };
+           if ($job) {
+               if (defined($job->{state}) && $job->{state} eq "stopped") {
+                   $job->{state} = "stopped";
+               } else {
+                   $job->{state} = "ok";
+               }
+               $job->{lsync} = $date;
+               update_state($job);
+           }
+       });
+    }); #sync lock
 }
 
 sub snapshot_get{
-    my ($source, $dest, $max_snap, $name, $source_user) = @_;
+    my ($source, $dest, $max_snap, $name, $dest_user) = @_;
 
     my $cmd = [];
-    push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
+    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--', if $dest->{ip};
     push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
-    push @$cmd, $source->{all};
 
-    my $raw = run_cmd($cmd);
+    my $path = $dest->{all};
+    $path .= "/$source->{last_part}" if $source->{last_part};
+    push @$cmd, $path;
+
+    my $raw;
+    eval {$raw = run_cmd($cmd)};
+    if (my $erro =$@) { #this means the volume doesn't exist on dest yet
+       return undef;
+    }
+
     my $index = 0;
     my $line = "";
     my $last_snap = undef;
@@ -662,9 +717,10 @@ sub snapshot_get{
 
     while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
        $line = $1;
-       if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
-
+       if ($line =~ m/@(.*)$/) {
            $last_snap = $1 if (!$last_snap);
+       }
+       if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
            $old_snap = $1;
            $index++;
            if ($index == $max_snap) {
@@ -701,42 +757,6 @@ sub snapshot_add {
     }
 }
 
-sub write_cron {
-    my ($cfg) = @_;
-
-    my $text = "SHELL=/bin/sh\n";
-    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
-
-    my $fh = IO::File->new("> $CRONJOBS");
-    die "Could not open file: $!\n" if !$fh;
-
-    foreach my $source (sort keys%{$cfg}) {
-       foreach my $sync_name (sort keys%{$cfg->{$source}}) {
-           next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
-           $text .= "$PROG_PATH sync";
-           $text .= " -source  ";
-           if ($cfg->{$source}->{$sync_name}->{vmid}) {
-               $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
-               $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
-           } else {
-               $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
-               $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
-               $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
-           }
-           $text .= " -dest  ";
-           $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
-           $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
-           $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
-           $text .= " -name $sync_name ";
-           $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
-           $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
-           $text .= "\n";
-       }
-    }
-    die "Can't write to cron\n" if (!print($fh $text));
-    close($fh);
-}
-
 sub get_disks {
     my ($target, $user) = @_;
 
@@ -785,7 +805,7 @@ sub parse_disks {
        my $line = $1;
 
        next if $line =~ /media=cdrom/;
-       next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
+       next if $line !~ m/$DISK_KEY_RE/;
 
        #QEMU if backup is not set include in  sync
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
@@ -795,7 +815,7 @@ sub parse_disks {
 
        my $disk = undef;
        my $stor = undef;
-       if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
+       if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);
 
            foreach my $opt (@parameter) {
@@ -886,31 +906,25 @@ sub snapshot_destroy {
     }
 }
 
+# check if snapshot for incremental sync exist on source side
 sub snapshot_exist {
-    my ($source , $dest, $method, $dest_user) = @_;
+    my ($source , $dest, $method, $source_user) = @_;
 
     my $cmd = [];
-    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
+    push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};
     push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
 
-    my $path = $dest->{all};
-    $path .= "/$source->{last_part}" if $source->{last_part};
-    $path .= "\@$source->{old_snap}";
+    my $path = $source->{all};
+    $path .= "\@$dest->{last_snap}";
 
     push @$cmd, $path;
 
-
-    my $text = "";
-    eval {$text =run_cmd($cmd);};
+    eval {run_cmd($cmd)};
     if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
     }
-
-    while ($text && $text =~ s/^(.*?)(\n|$)//) {
-       my $line =$1;
-       return 1 if $line =~ m/^.*$source->{old_snap}$/;
-    }
+    return 1;
 }
 
 sub send_image {
@@ -920,10 +934,11 @@ sub send_image {
 
     push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
     push @$cmd, 'zfs', 'send';
+    push @$cmd, '-p', if $param->{properties};
     push @$cmd, '-v' if $param->{verbose};
 
-    if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) {
-       push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
+    if($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
+       push @$cmd, '-i', "$source->{all}\@$dest->{last_snap}";
     }
     push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
 
@@ -952,12 +967,13 @@ sub send_image {
 
 
 sub send_config{
-    my ($source, $dest, $method, $source_user, $dest_user) = @_;
+    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;
 
     my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
     my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
 
-    my $config_dir = $dest->{last_part} ?  "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
+    my $config_dir = $dest_config_path // $CONFIG_PATH;
+    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};
 
     $dest_target_new = $config_dir.'/'.$dest_target_new;
 
@@ -974,7 +990,7 @@ sub send_config{
        }
 
        if ($source->{destroy}){
-           my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
+           my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$dest->{old_snap}";
            if($dest->{ip}){
                run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
            } else {
@@ -1015,19 +1031,23 @@ sub status {
 sub enable_job {
     my ($param) = @_;
 
-    my $job = get_job($param);
-    $job->{state} = "ok";
-    update_state($job);
-    update_cron($job);
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       my $job = get_job($param);
+       $job->{state} = "ok";
+       update_state($job);
+       update_cron($job);
+    });
 }
 
 sub disable_job {
     my ($param) = @_;
 
-    my $job = get_job($param);
-    $job->{state} = "stopped";
-    update_state($job);
-    update_cron($job);
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       my $job = get_job($param);
+       $job->{state} = "stopped";
+       update_state($job);
+       update_cron($job);
+    });
 }
 
 my $cmd_help = {
@@ -1080,6 +1100,14 @@ $PROGNAME create -dest <string> -source <string> [OPTIONS]
         -source-user    string
 
                name of the user on the source target, root by default
+
+       -properties     boolean
+
+               Include the dataset's properties in the stream.
+
+       -dest-config-path    string
+
+               specify a custom config path on the destination target. default is /var/lib/pve-zsync
     },
     sync => qq{
 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
@@ -1118,6 +1146,14 @@ $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
        -verbose   boolean
 
                print out the sync progress.
+
+       -properties     boolean
+
+               Include the dataset's properties in the stream.
+
+       -dest-config-path    string
+
+               specify a custom config path on the destination target. default is /var/lib/pve-zsync
     },
     list => qq{
 $PROGNAME list