]> git.proxmox.com Git - pve-zsync.git/blobdiff - pve-zsync
bump version to 2.3.1
[pve-zsync.git] / pve-zsync
index dd3dee4db27c31dbd5a0c89520e4ecf8d0b7f350..de5d46f529f6a6f897d56349961fda11d60c74d3 100755 (executable)
--- a/pve-zsync
+++ b/pve-zsync
@@ -2,14 +2,14 @@
 
 use strict;
 use warnings;
-use Data::Dumper qw(Dumper);
+
 use Fcntl qw(:flock SEEK_END);
 use Getopt::Long qw(GetOptionsFromArray);
-use File::Copy qw(move);
 use File::Path qw(make_path);
 use JSON;
 use IO::File;
 use String::ShellQuote 'shell_quote';
+use Text::ParseWords;
 
 my $PROGNAME = "pve-zsync";
 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
@@ -19,10 +19,18 @@ my $PATH = "/usr/sbin";
 my $PVE_DIR = "/etc/pve/local";
 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
 my $LXC_CONF = "${PVE_DIR}/lxc";
-my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
 my $PROG_PATH = "$PATH/${PROGNAME}";
 my $INTERVAL = 15;
-my $DEBUG = 0;
+my $DEBUG;
+
+BEGIN {
+    $DEBUG = 0; # change default here. not above on declaration!
+    $DEBUG ||= $ENV{ZSYNC_DEBUG};
+    if ($DEBUG) {
+       require Data::Dumper;
+       Data::Dumper->import();
+    }
+}
 
 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
@@ -46,15 +54,22 @@ my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])";       # ipv6 must always be in brac
 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
 
-check_bin ('cstream');
-check_bin ('zfs');
-check_bin ('ssh');
-check_bin ('scp');
+my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|tpmstate|mp)\d+)|rootfs): /;
 
-$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =
-    sub {
-       die "Signal aborting sync\n";
-    };
+my $INSTANCE_ID = get_instance_id($$);
+
+my $command = $ARGV[0];
+
+if (defined($command) && $command ne 'help' && $command ne 'printpod') {
+    check_bin ('cstream');
+    check_bin ('zfs');
+    check_bin ('ssh');
+    check_bin ('scp');
+}
+
+$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =  sub {
+    die "Signaled, aborting sync: $!\n";
+};
 
 sub check_bin {
     my ($bin)  = @_;
@@ -69,6 +84,19 @@ sub check_bin {
     die "unable to find command '$bin'\n";
 }
 
+sub read_file {
+    my ($filename, $one_line_only) = @_;
+
+    my $fh = IO::File->new($filename, "r")
+       or die "Could not open file ${filename}: $!\n";
+
+    my $text = $one_line_only ? <$fh> : [ <$fh> ];
+
+    close($fh);
+
+    return $text;
+}
+
 sub cut_target_width {
     my ($path, $maxlen) = @_;
     $path =~ s@/+@/@g;
@@ -99,14 +127,20 @@ sub cut_target_width {
     return "$head/" . $path . "/$tail";
 }
 
-sub lock {
-    my ($fh) = @_;
-    flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
-}
+sub locked {
+    my ($lock_fn, $code) = @_;
+
+    my $lock_fh = IO::File->new("> $lock_fn");
+
+    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
+    my $res = eval { $code->() };
+    my $err = $@;
 
-sub unlock {
-    my ($fh) = @_;
-    flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
+    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
+    die "$err" if $err;
+
+    close($lock_fh);
+    return $res;
 }
 
 sub get_status {
@@ -119,15 +153,15 @@ sub get_status {
     return undef;
 }
 
-sub check_pool_exists {
-    my ($target) = @_;
+sub check_dataset_exists {
+    my ($dataset, $ip, $user) = @_;
 
     my $cmd = [];
 
-    if ($target->{ip}) {
-       push @$cmd, 'ssh', "root\@$target->{ip}", '--';
+    if ($ip) {
+       push @$cmd, 'ssh', "$user\@$ip", '--';
     }
-    push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
+    push @$cmd, 'zfs', 'list', '-H', '--', $dataset;
     eval {
        run_cmd($cmd);
     };
@@ -138,6 +172,19 @@ sub check_pool_exists {
     return 1;
 }
 
+sub create_file_system {
+    my ($file_system, $ip, $user) = @_;
+
+    my $cmd = [];
+
+    if ($ip) {
+       push @$cmd, 'ssh', "$user\@$ip", '--';
+    }
+    push @$cmd, 'zfs', 'create', $file_system;
+
+    run_cmd($cmd);
+}
+
 sub parse_target {
     my ($text) = @_;
 
@@ -184,46 +231,58 @@ sub read_cron {
        return undef;
     }
 
-    my $fh = IO::File->new("< $CRONJOBS");
-    die "Could not open file $CRONJOBS: $!\n" if !$fh;
-
-    my @text = <$fh>;
+    my $text = read_file($CRONJOBS, 0);
 
-    close($fh);
-
-    return encode_cron(@text);
+    return parse_cron(@{$text});
 }
 
 sub parse_argv {
     my (@arg) = @_;
 
-    my $param = {};
-    $param->{dest} = undef;
-    $param->{source} = undef;
-    $param->{verbose} = undef;
-    $param->{limit} = undef;
-    $param->{maxsnap} = undef;
-    $param->{name} = undef;
-    $param->{skip} = undef;
-    $param->{method} = undef;
-
-    my ($ret, $ar) = GetOptionsFromArray(\@arg,
-                                        'dest=s' => \$param->{dest},
-                                        'source=s' => \$param->{source},
-                                        'verbose' => \$param->{verbose},
-                                        'limit=i' => \$param->{limit},
-                                        'maxsnap=i' => \$param->{maxsnap},
-                                        'name=s' => \$param->{name},
-                                        'skip' => \$param->{skip},
-                                        'method=s' => \$param->{method});
-
-    if ($ret == 0) {
-       die "can't parse options\n";
-    }
+    my $param = {
+       dest => undef,
+       source => undef,
+       verbose => undef,
+       limit => undef,
+       maxsnap => undef,
+       dest_maxsnap => undef,
+       name => undef,
+       skip => undef,
+       method => undef,
+       source_user => undef,
+       dest_user => undef,
+       prepend_storage_id => undef,
+       compressed => undef,
+       properties => undef,
+       dest_config_path => undef,
+    };
 
-    $param->{name} = "default" if !$param->{name};
-    $param->{maxsnap} = 1 if !$param->{maxsnap};
-    $param->{method} = "ssh" if !$param->{method};
+    my ($ret) = GetOptionsFromArray(
+       \@arg,
+       'dest=s' => \$param->{dest},
+       'source=s' => \$param->{source},
+       'verbose' => \$param->{verbose},
+       'limit=i' => \$param->{limit},
+       'maxsnap=i' => \$param->{maxsnap},
+       'dest-maxsnap=i' => \$param->{dest_maxsnap},
+       'name=s' => \$param->{name},
+       'skip' => \$param->{skip},
+       'method=s' => \$param->{method},
+       'source-user=s' => \$param->{source_user},
+       'dest-user=s' => \$param->{dest_user},
+       'prepend-storage-id' => \$param->{prepend_storage_id},
+       'compressed' => \$param->{compressed},
+       'properties' => \$param->{properties},
+       'dest-config-path=s' => \$param->{dest_config_path},
+    );
+
+    die "can't parse options\n" if $ret == 0;
+
+    $param->{name} //= "default";
+    $param->{maxsnap} //= 1;
+    $param->{method} //= "ssh";
+    $param->{source_user} //= "root";
+    $param->{dest_user} //= "root";
 
     return $param;
 }
@@ -237,6 +296,7 @@ sub add_state_to_job {
     $job->{state} = $state->{state};
     $job->{lsync} = $state->{lsync};
     $job->{vm_type} = $state->{vm_type};
+    $job->{instance_id} = $state->{instance_id};
 
     for (my $i = 0; $state->{"snap$i"}; $i++) {
        $job->{"snap$i"} = $state->{"snap$i"};
@@ -245,23 +305,20 @@ sub add_state_to_job {
     return $job;
 }
 
-sub encode_cron {
+sub parse_cron {
     my (@text) = @_;
 
     my $cfg = {};
 
     while (my $line = shift(@text)) {
-
-       my @arg = split('\s', $line);
+       my @arg = Text::ParseWords::shellwords($line);
        my $param = parse_argv(@arg);
 
        if ($param->{source} && $param->{dest}) {
-           $cfg->{$param->{source}}->{$param->{name}}->{dest} = $param->{dest};
-           $cfg->{$param->{source}}->{$param->{name}}->{verbose} = $param->{verbose};
-           $cfg->{$param->{source}}->{$param->{name}}->{limit} = $param->{limit};
-           $cfg->{$param->{source}}->{$param->{name}}->{maxsnap} = $param->{maxsnap};
-           $cfg->{$param->{source}}->{$param->{name}}->{skip} = $param->{skip};
-           $cfg->{$param->{source}}->{$param->{name}}->{method} = $param->{method};
+           my $source = delete $param->{source};
+           my $name = delete $param->{name};
+
+           $cfg->{$source}->{$name} = $param;
        }
     }
 
@@ -274,15 +331,23 @@ sub param_to_job {
     my $job = {};
 
     my $source = parse_target($param->{source});
-    my $dest = parse_target($param->{dest}) if $param->{dest};
+    my $dest;
+    $dest = parse_target($param->{dest}) if $param->{dest};
 
     $job->{name} = !$param->{name} ? "default" : $param->{name};
     $job->{dest} = $param->{dest} if $param->{dest};
     $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
     $job->{method} = "ssh" if !$job->{method};
     $job->{limit} = $param->{limit};
-    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
+    $job->{maxsnap} = $param->{maxsnap};
+    $job->{dest_maxsnap} = $param->{dest_maxsnap};
     $job->{source} = $param->{source};
+    $job->{source_user} = $param->{source_user};
+    $job->{dest_user} = $param->{dest_user};
+    $job->{prepend_storage_id} = !!$param->{prepend_storage_id};
+    $job->{compressed} = !!$param->{compressed};
+    $job->{properties} = !!$param->{properties};
+    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};
 
     return $job;
 }
@@ -298,29 +363,14 @@ sub read_state {
        return undef;
     }
 
-    my $fh = IO::File->new("< $STATE");
-    die "Could not open file $STATE: $!\n" if !$fh;
-
-    my $text = <$fh>;
-    my $states = decode_json($text);
-
-    close($fh);
-
-    return $states;
+    my $text = read_file($STATE, 1);
+    return decode_json($text);
 }
 
 sub update_state {
     my ($job) = @_;
-    my $text;
-    my $in_fh;
 
-    eval {
-
-       $in_fh = IO::File->new("< $STATE");
-       die "Could not open file $STATE: $!\n" if !$in_fh;
-       lock($in_fh);
-       $text = <$in_fh>;
-    };
+    my $text = eval { read_file($STATE, 1); };
 
     my $out_fh = IO::File->new("> $STATE.new");
     die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
@@ -335,6 +385,7 @@ sub update_state {
     if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
+       $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};
 
        for (my $i = 0; $job->{"snap$i"} ; $i++) {
@@ -351,10 +402,7 @@ sub update_state {
     print $out_fh $text;
 
     close($out_fh);
-    move("$STATE.new", $STATE);
-    eval {
-       close($in_fh);
-    };
+    rename "$STATE.new", $STATE;
 
     return $states;
 }
@@ -369,13 +417,9 @@ sub update_cron {
     my $header = "SHELL=/bin/sh\n";
     $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
 
-    my $fh = IO::File->new("< $CRONJOBS");
-    die "Could not open file $CRONJOBS: $!\n" if !$fh;
-    lock($fh);
-
-    my @test = <$fh>;
+    my $current = read_file($CRONJOBS, 0);
 
-    while (my $line = shift(@test)) {
+    foreach my $line (@{$current}) {
        chomp($line);
        if ($line =~ m/source $job->{source} .*name $job->{name} /) {
            $updated = 1;
@@ -400,11 +444,10 @@ sub update_cron {
     my $new_fh = IO::File->new("> ${CRONJOBS}.new");
     die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
 
-    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
+    print $new_fh $text or die "can't write to $CRONJOBS.new: $!\n";
     close ($new_fh);
 
-    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
-    close ($fh);
+    rename "${CRONJOBS}.new", $CRONJOBS or die "can't move $CRONJOBS.new: $!\n";
 }
 
 sub format_job {
@@ -415,7 +458,7 @@ sub format_job {
        $text = "#";
     }
     if ($line) {
-       $line =~ /^#*(.+) root/;
+       $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/;
        $text .= $1;
     } else {
        $text .= "*/$INTERVAL * * * *";
@@ -423,9 +466,16 @@ sub format_job {
     $text .= " root";
     $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
     $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
+    $text .= " --dest-maxsnap $job->{dest_maxsnap}" if defined($job->{dest_maxsnap});
     $text .= " --limit $job->{limit}" if $job->{limit};
     $text .= " --method $job->{method}";
     $text .= " --verbose" if $job->{verbose};
+    $text .= " --source-user $job->{source_user}";
+    $text .= " --dest-user $job->{dest_user}";
+    $text .= " --prepend-storage-id" if $job->{prepend_storage_id};
+    $text .= " --compressed" if $job->{compressed};
+    $text .= " --properties" if $job->{properties};
+    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
     $text .= "\n";
 
     return $text;
@@ -453,21 +503,20 @@ sub list {
 }
 
 sub vm_exists {
-    my ($target) = @_;
-
-    my @cmd = ('ssh', "root\@$target->{ip}", '--') if $target->{ip};
-
-    my $res = undef;
+    my ($target, $user) = @_;
 
     return undef if !defined($target->{vmid});
 
-    eval { $res = run_cmd([@cmd, 'ls',  "$QEMU_CONF/$target->{vmid}.conf"]) };
+    my $conf_fn = "$target->{vmid}.conf";
 
-    return "qemu" if $res;
-
-    eval { $res = run_cmd([@cmd, 'ls',  "$LXC_CONF/$target->{vmid}.conf"]) };
-
-    return "lxc" if $res;
+    if ($target->{ip}) {
+       my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
+       return "qemu" if eval { run_cmd([@cmd, "$QEMU_CONF/$conf_fn"]) };
+       return "lxc" if  eval { run_cmd([@cmd, "$LXC_CONF/$conf_fn"]) };
+    } else {
+       return "qemu" if -f "$QEMU_CONF/$conf_fn";
+       return "lxc" if -f "$LXC_CONF/$conf_fn";
+    }
 
     return undef;
 }
@@ -475,48 +524,52 @@ sub vm_exists {
 sub init {
     my ($param) = @_;
 
-    my $cfg = read_cron();
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       my $cfg = read_cron();
 
-    my $job = param_to_job($param);
+       my $job = param_to_job($param);
 
-    $job->{state} = "ok";
-    $job->{lsync} = 0;
+       $job->{state} = "ok";
+       $job->{lsync} = 0;
 
-    my $source = parse_target($param->{source});
-    my $dest = parse_target($param->{dest});
+       my $source = parse_target($param->{source});
+       my $dest = parse_target($param->{dest});
 
-    if (my $ip =  $dest->{ip}) {
-       run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
-    }
+       if (my $ip =  $dest->{ip}) {
+           run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
+       }
 
-    if (my $ip =  $source->{ip}) {
-       run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
-    }
+       if (my $ip =  $source->{ip}) {
+           run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
+       }
 
-    die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest);
+       die "Pool $dest->{all} does not exist\n"
+           if !check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user});
 
-    if (!defined($source->{vmid})) {
-       die "Pool $source->{all} does not exists\n" if !check_pool_exists($source);
-    }
+       if (!defined($source->{vmid})) {
+           die "Pool $source->{all} does not exist\n"
+               if !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user});
+       }
 
-    my $vm_type = vm_exists($source);
-    $job->{vm_type} = $vm_type;
-    $source->{vm_type} = $vm_type;
+       my $vm_type = vm_exists($source, $param->{source_user});
+       $job->{vm_type} = $vm_type;
+       $source->{vm_type} = $vm_type;
 
-    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
+       die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
 
-    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
+       die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
 
-    #check if vm has zfs disks if not die;
-    get_disks($source) if $source->{vmid};
+       #check if vm has zfs disks if not die;
+       get_disks($source, $param->{source_user}) if $source->{vmid};
 
-    update_cron($job);
-    update_state($job);
+       update_cron($job);
+       update_state($job);
+    }); #cron and state lock
 
-    eval {
-       sync($param) if !$param->{skip};
-    };
-    if(my $err = $@) {
+    return if $param->{skip};
+
+    eval { sync($param) };
+    if (my $err = $@) {
        destroy_job($param);
        print $err;
     }
@@ -541,122 +594,219 @@ sub get_job {
 sub destroy_job {
     my ($param) = @_;
 
-    my $job = get_job($param);
-    $job->{state} = "del";
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       my $job = get_job($param);
+       $job->{state} = "del";
+
+       update_cron($job);
+       update_state($job);
+    });
+}
+
+sub get_instance_id {
+    my ($pid) = @_;
+
+    my $stat = read_file("/proc/$pid/stat", 1)
+       or die "unable to read process stats\n";
+    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
+       or die "unable to read boot ID\n";
+
+    my $stats = [ split(/\s+/, $stat) ];
+    my $starttime = $stats->[21];
+    chomp($boot_id);
+
+    return "${pid}:${starttime}:${boot_id}";
+}
+
+sub instance_exists {
+    my ($instance_id) = @_;
+
+    if (defined($instance_id) && $instance_id =~ m/^([1-9][0-9]*):/) {
+       my $pid = $1;
+       my $actual_id = eval { get_instance_id($pid); };
+       return defined($actual_id) && $actual_id eq $instance_id;
+    }
 
-    update_cron($job);
-    update_state($job);
+    return 0;
 }
 
 sub sync {
     my ($param) = @_;
 
-    my $lock_fh = IO::File->new("> $LOCKFILE");
-    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
-    lock($lock_fh);
-
-    my $date = get_date();
     my $job;
-    eval {
-       $job = get_job($param);
-    };
 
-    if ($job && $job->{state} eq "syncing") {
-       die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
-    }
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       eval { $job = get_job($param) };
 
-    my $dest = parse_target($param->{dest});
-    my $source = parse_target($param->{source});
+       if ($job) {
+           my $state = $job->{state} // 'ok';
+           $state = 'ok' if !instance_exists($job->{instance_id});
+
+           if ($state eq "syncing" || $state eq "waiting") {
+               die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
+           }
+
+           $job->{state} = "waiting";
+           $job->{instance_id} = $INSTANCE_ID;
 
-    my $sync_path = sub {
-       my ($source, $dest, $job, $param, $date) = @_;
+           update_state($job);
+       }
+    });
 
-       ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name});
+    locked("$CONFIG_PATH/sync.lock", sub {
 
-       snapshot_add($source, $dest, $param->{name}, $date);
+       my $date = get_date();
 
-       send_image($source, $dest, $param);
+       my $dest;
+       my $source;
+       my $vm_type;
 
-       snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}) if ($source->{destroy} && $source->{old_snap});
+       locked("$CONFIG_PATH/cron_and_state.lock", sub {
+           #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
+           eval { $job = get_job($param); };
 
-    };
+           if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
+               die "Job --source $param->{source} --name $param->{name} has been disabled\n";
+           }
 
-    my $vm_type = vm_exists($source);
-    $source->{vm_type} = $vm_type;
+           $dest = parse_target($param->{dest});
+           $source = parse_target($param->{source});
 
-    if ($job) {
-       $job->{state} = "syncing";
-       $job->{vm_type} = $vm_type if !$job->{vm_type};
-       update_state($job);
-    }
+           $vm_type = vm_exists($source, $param->{source_user});
+           $source->{vm_type} = $vm_type;
 
-    eval{
-       if ($source->{vmid}) {
-           die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
-           my $disks = get_disks($source);
-
-           foreach my $disk (sort keys %{$disks}) {
-               $source->{all} = $disks->{$disk}->{all};
-               $source->{pool} = $disks->{$disk}->{pool};
-               $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
-               $source->{last_part} = $disks->{$disk}->{last_part};
-               &$sync_path($source, $dest, $job, $param, $date);
+           if ($job) {
+               $job->{state} = "syncing";
+               $job->{vm_type} = $vm_type if !$job->{vm_type};
+               update_state($job);
+           }
+       }); #cron and state lock
+
+       my $sync_path = sub {
+           my ($source, $dest, $job, $param, $date) = @_;
+
+           my $dest_dataset = target_dataset($source, $dest);
+
+           ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get(
+               $dest_dataset,
+               $param->{dest_maxsnap} // $param->{maxsnap},
+               $param->{name},
+               $dest->{ip},
+               $param->{dest_user},
+           );
+
+           ($source->{old_snap}) = snapshot_get(
+               $source->{all},
+               $param->{maxsnap},
+               $param->{name},
+               $source->{ip},
+               $param->{source_user},
+           );
+
+           prepare_prepended_target($source, $dest, $param->{dest_user}) if defined($dest->{prepend});
+
+           snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
+
+           send_image($source, $dest, $param);
+
+           for my $old_snap (@{$source->{old_snap}}) {
+               snapshot_destroy($source->{all}, $old_snap, $source->{ip}, $param->{source_user});
            }
-           if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
-               send_config($source, $dest,'ssh');
+
+           for my $old_snap (@{$dest->{old_snap}}) {
+               snapshot_destroy($dest_dataset, $old_snap, $dest->{ip}, $param->{dest_user});
+           }
+       };
+
+       eval{
+           if ($source->{vmid}) {
+               die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
+               die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
+               my $disks = get_disks($source, $param->{source_user});
+
+               foreach my $disk (sort keys %{$disks}) {
+                   $source->{all} = $disks->{$disk}->{all};
+                   $source->{pool} = $disks->{$disk}->{pool};
+                   $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
+                   $source->{last_part} = $disks->{$disk}->{last_part};
+
+                   $dest->{prepend} = $disks->{$disk}->{storage_id}
+                       if $param->{prepend_storage_id};
+
+                   &$sync_path($source, $dest, $job, $param, $date);
+               }
+               if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
+                   send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
+               } else {
+                   send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
+               }
            } else {
-               send_config($source, $dest,'local');
+               &$sync_path($source, $dest, $job, $param, $date);
            }
-       } else {
-           &$sync_path($source, $dest, $job, $param, $date);
-       }
-    };
-    if(my $err = $@) {
-       if ($job) {
-           $job->{state} = "error";
-           update_state($job);
-           unlock($lock_fh);
-           close($lock_fh);
+       };
+       if (my $err = $@) {
+           locked("$CONFIG_PATH/cron_and_state.lock", sub {
+               eval { $job = get_job($param); };
+               if ($job) {
+                   $job->{state} = "error";
+                   delete $job->{instance_id};
+                   update_state($job);
+               }
+           });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
+           die "$err\n";
        }
-       die "$err\n";
-    }
-
-    if ($job) {
-       $job->{state} = "ok";
-       $job->{lsync} = $date;
-       update_state($job);
-    }
 
-    unlock($lock_fh);
-    close($lock_fh);
+       locked("$CONFIG_PATH/cron_and_state.lock", sub {
+           eval { $job = get_job($param); };
+           if ($job) {
+               if (defined($job->{state}) && $job->{state} eq "stopped") {
+                   $job->{state} = "stopped";
+               } else {
+                   $job->{state} = "ok";
+               }
+               $job->{lsync} = $date;
+               delete $job->{instance_id};
+               update_state($job);
+           }
+       });
+    }); #sync lock
 }
 
 sub snapshot_get{
-    my ($source, $dest, $max_snap, $name) = @_;
+    my ($dataset, $max_snap, $name, $ip, $user) = @_;
 
     my $cmd = [];
-    push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
+    push @$cmd, 'ssh', "$user\@$ip", '--', if $ip;
     push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
-    push @$cmd, $source->{all};
+    push @$cmd, $dataset;
+
+    my $raw;
+    eval {$raw = run_cmd($cmd)};
+    if (my $erro =$@) { #this means the volume doesn't exist on dest yet
+       return undef;
+    }
 
-    my $raw = run_cmd($cmd);
     my $index = 0;
     my $line = "";
     my $last_snap = undef;
-    my $old_snap;
+    my $old_snap = [];
 
     while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
        $line = $1;
+       if ($line =~ m/@(.*)$/) {
+           $last_snap = $1 if (!$last_snap);
+       }
        if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
+           # interpreted as infinity
+           last if $max_snap <= 0;
 
-           $last_snap = $1 if (!$last_snap);
-           $old_snap = $1;
+           my $snap = $1;
            $index++;
-           if ($index == $max_snap) {
-               $source->{destroy} = 1;
-               last;
-           };
+
+           if ($index >= $max_snap) {
+               push @{$old_snap}, $snap;
+           }
        }
     }
 
@@ -666,7 +816,7 @@ sub snapshot_get{
 }
 
 sub snapshot_add {
-    my ($source, $dest, $name, $date) = @_;
+    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;
 
     my $snap_name = "rep_$name\_".$date;
 
@@ -675,59 +825,23 @@ sub snapshot_add {
     my $path = "$source->{all}\@$snap_name";
 
     my $cmd = [];
-    push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
+    push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
     push @$cmd, 'zfs', 'snapshot', $path;
     eval{
        run_cmd($cmd);
     };
 
     if (my $err = $@) {
-       snapshot_destroy($source, $dest, 'ssh', $snap_name);
+       snapshot_destroy($source->{all}, $snap_name, $source->{ip}, $source_user);
        die "$err\n";
     }
 }
 
-sub write_cron {
-    my ($cfg) = @_;
-
-    my $text = "SHELL=/bin/sh\n";
-    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
-
-    my $fh = IO::File->new("> $CRONJOBS");
-    die "Could not open file: $!\n" if !$fh;
-
-    foreach my $source (sort keys%{$cfg}) {
-       foreach my $sync_name (sort keys%{$cfg->{$source}}) {
-           next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
-           $text .= "$PROG_PATH sync";
-           $text .= " -source  ";
-           if ($cfg->{$source}->{$sync_name}->{vmid}) {
-               $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
-               $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
-           } else {
-               $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
-               $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
-               $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
-           }
-           $text .= " -dest  ";
-           $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
-           $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
-           $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
-           $text .= " -name $sync_name ";
-           $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
-           $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
-           $text .= "\n";
-       }
-    }
-    die "Can't write to cron\n" if (!print($fh $text));
-    close($fh);
-}
-
 sub get_disks {
-    my ($target) = @_;
+    my ($target, $user) = @_;
 
     my $cmd = [];
-    push @$cmd, 'ssh', "root\@$target->{ip}", '--', if $target->{ip};
+    push @$cmd, 'ssh', "$user\@$target->{ip}", '--', if $target->{ip};
 
     if ($target->{vm_type} eq 'qemu') {
        push @$cmd, 'qm', 'config', $target->{vmid};
@@ -739,7 +853,7 @@ sub get_disks {
 
     my $res = run_cmd($cmd);
 
-    my $disks = parse_disks($res, $target->{ip}, $target->{vm_type});
+    my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
 
     return $disks;
 }
@@ -762,7 +876,7 @@ sub run_cmd {
 }
 
 sub parse_disks {
-    my ($text, $ip, $vm_type) = @_;
+    my ($text, $ip, $vm_type, $user) = @_;
 
     my $disks;
 
@@ -771,7 +885,7 @@ sub parse_disks {
        my $line = $1;
 
        next if $line =~ /media=cdrom/;
-       next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
+       next if $line !~ m/$DISK_KEY_RE/;
 
        #QEMU if backup is not set include in  sync
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
@@ -781,11 +895,11 @@ sub parse_disks {
 
        my $disk = undef;
        my $stor = undef;
-       if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
+       if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);
 
            foreach my $opt (@parameter) {
-               if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
+               if ($opt =~ m/^(?:file=|volume=)?([^:]+):([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
@@ -798,12 +912,14 @@ sub parse_disks {
        }
 
        my $cmd = [];
-       push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
-       push @$cmd, 'pvesm', 'path', "$stor$disk";
+       push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
+       push @$cmd, 'pvesm', 'path', "$stor:$disk";
        my $path = run_cmd($cmd);
 
-       die "Get no path from pvesm path $stor$disk\n" if !$path;
+       die "Get no path from pvesm path $stor:$disk\n" if !$path;
+
+       $disks->{$num}->{storage_id} = $stor;
+
        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
 
            my @array = split('/', $1);
@@ -833,23 +949,53 @@ sub parse_disks {
            $num++;
 
        } else {
-           die "ERROR: in path\n";
+           die "unexpected path '$path'\n";
        }
     }
 
-    die "Vm include no disk on zfs.\n" if !$disks->{0};
+    die "Guest does not include any ZFS volumes (or all are excluded by the backup flag).\n"
+       if !$disks->{0};
     return $disks;
 }
 
+# how the corresponding dataset is named on the target
+sub target_dataset {
+    my ($source, $dest) = @_;
+
+    my $target = "$dest->{all}";
+    $target .= "/$dest->{prepend}" if defined($dest->{prepend});
+    $target .= "/$source->{last_part}" if $source->{last_part};
+    $target =~ s!/+!/!g;
+
+    return $target;
+}
+
+# create the parent dataset for the actual target
+sub prepare_prepended_target {
+    my ($source, $dest, $dest_user) = @_;
+
+    die "internal error - not a prepended target\n" if !defined($dest->{prepend});
+
+    # The parent dataset shouldn't be the actual target.
+    die "internal error - no last_part for source\n" if !$source->{last_part};
+
+    my $target = "$dest->{all}/$dest->{prepend}";
+    $target =~ s!/+!/!g;
+
+    return if check_dataset_exists($target, $dest->{ip}, $dest_user);
+
+    create_file_system($target, $dest->{ip}, $dest_user);
+}
+
 sub snapshot_destroy {
-    my ($source, $dest, $method, $snap) = @_;
+    my ($dataset, $snap, $ip, $user) = @_;
 
     my @zfscmd = ('zfs', 'destroy');
-    my $snapshot = "$source->{all}\@$snap";
+    my $snapshot = "$dataset\@$snap";
 
     eval {
-       if($source->{ip} && $method eq 'ssh'){
-           run_cmd(['ssh', "root\@$source->{ip}", '--', @zfscmd, $snapshot]);
+       if ($ip) {
+           run_cmd(['ssh', "$user\@$ip", '--', @zfscmd, $snapshot]);
        } else {
            run_cmd([@zfscmd, $snapshot]);
        }
@@ -857,46 +1003,27 @@ sub snapshot_destroy {
     if (my $erro = $@) {
        warn "WARN: $erro";
     }
-    if ($dest) {
-       my @ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
-
-       my $path = "$dest->{all}";
-       $path .= "/$source->{last_part}" if $source->{last_part};
-
-       eval {
-           run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
-       };
-       if (my $erro = $@) {
-           warn "WARN: $erro";
-       }
-    }
 }
 
+# check if snapshot for incremental sync exist on source side
 sub snapshot_exist {
-    my ($source , $dest, $method) = @_;
+    my ($source , $dest, $method, $source_user) = @_;
 
     my $cmd = [];
-    push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
+    push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};
     push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
 
-    my $path = $dest->{all};
-    $path .= "/$source->{last_part}" if $source->{last_part};
-    $path .= "\@$source->{old_snap}";
+    my $path = $source->{all};
+    $path .= "\@$dest->{last_snap}";
 
     push @$cmd, $path;
 
-
-    my $text = "";
-    eval {$text =run_cmd($cmd);};
+    eval {run_cmd($cmd)};
     if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
     }
-
-    while ($text && $text =~ s/^(.*?)(\n|$)//) {
-       my $line =$1;
-       return 1 if $line =~ m/^.*$source->{old_snap}$/;
-    }
+    return 1;
 }
 
 sub send_image {
@@ -904,12 +1031,15 @@ sub send_image {
 
     my $cmd = [];
 
-    push @$cmd, 'ssh', '-o', 'BatchMode=yes', "root\@$source->{ip}", '--' if $source->{ip};
+    push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
     push @$cmd, 'zfs', 'send';
+    push @$cmd, '-L', if $param->{compressed}; # no effect if dataset never had large recordsize
+    push @$cmd, '-c', if $param->{compressed};
+    push @$cmd, '-p', if $param->{properties};
     push @$cmd, '-v' if $param->{verbose};
 
-    if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
-       push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
+    if($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
+       push @$cmd, '-i', "$source->{all}\@$dest->{last_snap}";
     }
     push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
 
@@ -917,12 +1047,10 @@ sub send_image {
        my $bwl = $param->{limit}*1024;
        push @$cmd, \'|', 'cstream', '-t', $bwl;
     }
-    my $target = "$dest->{all}";
-    $target .= "/$source->{last_part}" if $source->{last_part};
-    $target =~ s!/+!/!g;
+    my $target = target_dataset($source, $dest);
 
     push @$cmd, \'|';
-    push @$cmd, 'ssh', '-o', 'BatchMode=yes', "root\@$dest->{ip}", '--' if $dest->{ip};
+    push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
     push @$cmd, 'zfs', 'recv', '-F', '--';
     push @$cmd, "$target";
 
@@ -931,38 +1059,39 @@ sub send_image {
     };
 
     if (my $erro = $@) {
-       snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
+       snapshot_destroy($source->{all}, $source->{new_snap}, $source->{ip}, $param->{source_user});
        die $erro;
     };
 }
 
 
 sub send_config{
-    my ($source, $dest, $method) = @_;
+    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;
 
     my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
     my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
 
-    my $config_dir = $dest->{last_part} ?  "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
+    my $config_dir = $dest_config_path // $CONFIG_PATH;
+    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};
 
     $dest_target_new = $config_dir.'/'.$dest_target_new;
 
     if ($method eq 'ssh'){
        if ($dest->{ip} && $source->{ip}) {
-           run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
-           run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
+           run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
+           run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($dest->{ip}) {
-           run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
-           run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
+           run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
+           run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($source->{ip}) {
            run_cmd(['mkdir', '-p', '--', $config_dir]);
-           run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", $dest_target_new]);
+           run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]);
        }
 
-       if ($source->{destroy}){
-           my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
+       for my $old_snap (@{$dest->{old_snap}}) {
+           my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.${old_snap}";
            if($dest->{ip}){
-               run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
+               run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
            } else {
                run_cmd(['rm', '-f', '--', $dest_target_old]);
            }
@@ -1001,95 +1130,133 @@ sub status {
 sub enable_job {
     my ($param) = @_;
 
-    my $job = get_job($param);
-    $job->{state} = "ok";
-    update_state($job);
-    update_cron($job);
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       my $job = get_job($param);
+       $job->{state} = "ok";
+       update_state($job);
+       update_cron($job);
+    });
 }
 
 sub disable_job {
     my ($param) = @_;
 
-    my $job = get_job($param);
-    $job->{state} = "stopped";
-    update_state($job);
-    update_cron($job);
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+       my $job = get_job($param);
+       $job->{state} = "stopped";
+       update_state($job);
+       update_cron($job);
+    });
 }
 
-my $command = $ARGV[0];
-
 my $cmd_help = {
     destroy => qq{
-$PROGNAME destroy -source <string> [OPTIONS]
-
-        remove a sync Job from the scheduler
-
-        -name      string
+$PROGNAME destroy --source <string> [OPTIONS]
 
-               name of the sync job, if not set it is default
+    Remove a sync Job from the scheduler
 
-        -source    string
+       --name      string
+               The name of the sync job, if not set 'default' is used.
 
-                the source can be an  <VMID> or [IP:]<ZFSPool>[/Path]
+       --source    string
+               The source can be an  <VMID> or [IP:]<ZFSPool>[/Path]
     },
     create => qq{
-$PROGNAME create -dest <string> -source <string> [OPTIONS]
+$PROGNAME create --dest <string> --source <string> [OPTIONS]
 
-        Create a sync Job
+    Create a new sync-job
 
-        -dest      string
+       --dest      string
+               The destination target is like [IP]:<Pool>[/Path]
 
-               the destination target is like [IP]:<Pool>[/Path]
+       --dest-user string
+               The name of the user on the destination target, root by default
 
-        -limit     integer
+       --limit     integer
+               Maximal sync speed in kBytes/s, default is unlimited
 
-               max sync speed in kBytes/s, default unlimited
+       --maxsnap   integer
+               The number of snapshots to keep until older ones are erased.
+               The default is 1, use 0 for unlimited.
 
-        -maxsnap   string
+       --dest-maxsnap   integer
+               Override maxsnap for the destination dataset.
 
-               how much snapshots will be kept before get erased, default 1
+       --name      string
+               The name of the sync job, if not set it is default
 
-        -name      string
+       --prepend-storage-id
+               If specified, prepend the storage ID to the destination's path(s).
 
-               name of the sync job, if not set it is default
+       --skip
+               If specified, skip the first sync.
+
+       --source    string
+               The source can be an <VMID> or [IP:]<ZFSPool>[/Path]
 
-        -skip      boolean
+       --source-user    string
+               The (ssh) user-name on the source target, root by default
 
-               if this flag is set it will skip the first sync
+       --compressed
+               If specified, send data without decompressing first. If features lz4_compress,
+               zstd_compress or large_blocks are in use by the source, they need to be enabled on
+               the target as well.
 
-        -source    string
+       --properties
+               If specified, include the dataset's properties in the stream.
 
-               the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
+       --dest-config-path    string
+               Specifies a custom config path on the destination target.
+               The default is /var/lib/pve-zsync
     },
     sync => qq{
-$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
+$PROGNAME sync --dest <string> --source <string> [OPTIONS]\n
 
-       will sync one time
+    Trigger one sync.
 
-        -dest      string
+       --dest      string
+               The destination target is like [IP:]<Pool>[/Path]
 
-               the destination target is like [IP:]<Pool>[/Path]
+       --dest-user string
+               The (ssh) user-name on the destination target, root by default
 
-       -limit     integer
+       --limit     integer
+               The maximal sync speed in kBytes/s, default is unlimited
 
-               max sync speed in kBytes/s, default unlimited
+       --maxsnap   integer
+               The number of snapshots to keep until older ones are erased.
+               The default is 1, use 0 for unlimited.
 
-        -maxsnap   integer
+       --dest-maxsnap   integer
+               Override maxsnap for the destination dataset.
 
-               how much snapshots will be kept before get erased, default 1
+       --name      string
+               The name of the sync job, if not set it is 'default'.
+               It is only necessary if scheduler allready contains this source.
 
-        -name      string
+       --prepend-storage-id
+               If specified, prepend the storage ID to the destination's path(s).
 
-               name of the sync job, if not set it is default.
-               It is only necessary if scheduler allready contains this source.
+       --source    string
+               The source can either be an <VMID> or [IP:]<ZFSPool>[/Path]
 
-        -source    string
+       --source-user    string
+               The name of the user on the source target, root by default
 
-               the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
+       --verbose
+               If specified, print out the sync progress.
 
-       -verbose   boolean
+       --compressed
+               If specified, send data without decompressing first. If features lz4_compress,
+               zstd_compress or large_blocks are in use by the source, they need to be enabled on
+               the target as well.
 
-               print out the sync progress.
+       --properties
+               If specified, include the dataset's properties in the stream.
+
+       --dest-config-path    string
+               Specifies a custom config path on the destination target.
+               The default is /var/lib/pve-zsync
     },
     list => qq{
 $PROGNAME list
@@ -1104,43 +1271,37 @@ $PROGNAME status
     help => qq{
 $PROGNAME help <cmd> [OPTIONS]
 
-       Get help about specified command.
-
-        <cmd>      string
+    Get help about specified command.
 
-               Command name
-
-       -verbose   boolean
+       <cmd>      string
+               Command name to get help about.
 
+       --verbose
                Verbose output format.
     },
     enable => qq{
-$PROGNAME enable -source <string> [OPTIONS]
-
-        enable a syncjob and reset error
+$PROGNAME enable --source <string> [OPTIONS]
 
-        -name      string
+    Enable a sync-job and reset all job-errors, if any.
 
+       --name      string
                name of the sync job, if not set it is default
 
-        -source    string
-
-                the source can be an  <VMID> or [IP:]<ZFSPool>[/Path]
+        --source    string
+               the source can be an  <VMID> or [IP:]<ZFSPool>[/Path]
     },
     disable => qq{
-$PROGNAME disable -source <string> [OPTIONS]
+$PROGNAME disable --source <string> [OPTIONS]
 
-        pause a sync job
+    Disables (pauses) a sync-job
 
-        -name      string
+       --name      string
+               name of the sync-job, if not set it is default
 
-               name of the sync job, if not set it is default
-
-        -source    string
-
-                the source can be an  <VMID> or [IP:]<ZFSPool>[/Path] 
+       --source    string
+               the source can be an  <VMID> or [IP:]<ZFSPool>[/Path]
     },
-    printpod => 'internal command',
+    printpod => "$PROGNAME printpod\n\n\tinternal command",
 
 };
 
@@ -1190,7 +1351,7 @@ if ($command eq 'destroy') {
     my $help_command = $ARGV[1];
 
     if ($help_command && $cmd_help->{$help_command}) {
-       die "$cmd_help->{$command}\n";
+       die "$cmd_help->{$help_command}\n";
 
     }
     if ($param->{verbose}) {
@@ -1223,13 +1384,13 @@ sub usage {
     print("ERROR:\tno command specified\n") if !$help;
     print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
     print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
-    print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
-    print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
-    print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
-    print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
+    print("\t$PROGNAME create --dest <string> --source <string> [OPTIONS]\n");
+    print("\t$PROGNAME destroy --source <string> [OPTIONS]\n");
+    print("\t$PROGNAME disable --source <string> [OPTIONS]\n");
+    print("\t$PROGNAME enable --source <string> [OPTIONS]\n");
     print("\t$PROGNAME list\n");
     print("\t$PROGNAME status\n");
-    print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
+    print("\t$PROGNAME sync --dest <string> --source <string> [OPTIONS]\n");
 }
 
 sub check_target {
@@ -1240,57 +1401,62 @@ sub check_target {
 sub print_pod {
 
     my $synopsis = join("\n", sort values %$cmd_help);
+    my $commands = join(", ", sort keys %$cmd_help);
 
     print <<EOF;
 =head1 NAME
 
-pve-zsync - PVE ZFS Replication Manager
+pve-zsync - PVE ZFS Storage Sync Tool
 
 =head1 SYNOPSIS
 
 pve-zsync <COMMAND> [ARGS] [OPTIONS]
 
-$synopsis
+Where <COMMAND> can be one of: $commands
 
 =head1 DESCRIPTION
 
-This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
-This tool also has the capability to add jobs to cron so the sync will be automatically done.
-The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
-To config cron see man crontab.
+The pve-zsync tool can help you to sync your VMs or directories stored on ZFS
+between multiple servers.
+
+pve-zsync is able to automatically configure CRON jobs, so that a periodic sync
+will be automatically triggered.
+The default sync interval is 15 min, if you want to change this value you can
+do this in F</etc/cron.d/pve-zsync>. If you need help to configure CRON tabs, see
+man crontab.
 
-=head2 PVE ZFS Storage sync Tool
+=head1 COMMANDS AND OPTIONS
 
-This Tool can get remote pool on other PVE or send Pool to others ZFS machines
+$synopsis
 
 =head1 EXAMPLES
 
-add sync job from local VM to remote ZFS Server
-pve-zsync create -source=100 -dest=192.168.1.2:zfspool
+Adds a job for syncing the local VM 100 to a remote server's ZFS pool named "tank":
+    pve-zsync create --source=100 -dest=192.168.1.2:tank
 
 =head1 IMPORTANT FILES
 
-Cron jobs and config are stored at                      /etc/cron.d/pve-zsync
+Cron jobs and config are stored in F</etc/cron.d/pve-zsync>
 
-The VM config get copied on the destination machine to  /var/lib/pve-zsync/
+The VM configuration itself gets copied to the destination machines
+F</var/lib/pve-zsync/> path.
 
 =head1 COPYRIGHT AND DISCLAIMER
 
-Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
+Copyright (C) 2007-2021 Proxmox Server Solutions GmbH
 
-This program is free software: you can redistribute it and/or modify it
-under the terms of the GNU Affero General Public License as published
-by the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
+This program is free software: you can redistribute it and/or modify it under
+the terms of the GNU Affero General Public License as published by the Free
+Software Foundation, either version 3 of the License, or (at your option) any
+later version.
 
-This program is distributed in the hope that it will be useful, but
-WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-Affero General Public License for more details.
+This program is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+details.
 
-You should have received a copy of the GNU Affero General Public
-License along with this program.  If not, see
-<http://www.gnu.org/licenses/>.
+You should have received a copy of the GNU Affero General Public License along
+with this program. If not, see <http://www.gnu.org/licenses/>.
 
 EOF
 }