X-Git-Url: https://git.proxmox.com/?p=pve-zsync.git;a=blobdiff_plain;f=pve-zsync;h=425ffa2a5220ec001956e9567b59e9fbfb9a9b01;hp=5b1f12c57e6c4f0be6c29d6da85c148295ed4982;hb=HEAD;hpb=cd9247d50b84082500634147d58e0244642e5a7d diff --git a/pve-zsync b/pve-zsync index 5b1f12c..de5d46f 100755 --- a/pve-zsync +++ b/pve-zsync @@ -2,14 +2,14 @@ use strict; use warnings; -use Data::Dumper qw(Dumper); + use Fcntl qw(:flock SEEK_END); use Getopt::Long qw(GetOptionsFromArray); -use File::Copy qw(move); use File::Path qw(make_path); use JSON; use IO::File; use String::ShellQuote 'shell_quote'; +use Text::ParseWords; my $PROGNAME = "pve-zsync"; my $CONFIG_PATH = "/var/lib/${PROGNAME}"; @@ -19,10 +19,18 @@ my $PATH = "/usr/sbin"; my $PVE_DIR = "/etc/pve/local"; my $QEMU_CONF = "${PVE_DIR}/qemu-server"; my $LXC_CONF = "${PVE_DIR}/lxc"; -my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock"; my $PROG_PATH = "$PATH/${PROGNAME}"; my $INTERVAL = 15; -my $DEBUG = 0; +my $DEBUG; + +BEGIN { + $DEBUG = 0; # change default here. not above on declaration! 
+ $DEBUG ||= $ENV{ZSYNC_DEBUG}; + if ($DEBUG) { + require Data::Dumper; + Data::Dumper->import(); + } +} my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])"; my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)"; @@ -46,6 +54,10 @@ my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brac # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!; +my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|tpmstate|mp)\d+)|rootfs): /; + +my $INSTANCE_ID = get_instance_id($$); + my $command = $ARGV[0]; if (defined($command) && $command ne 'help' && $command ne 'printpod') { @@ -55,10 +67,9 @@ if (defined($command) && $command ne 'help' && $command ne 'printpod') { check_bin ('scp'); } -$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = - sub { - die "Signal aborting sync\n"; - }; +$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub { + die "Signaled, aborting sync: $!\n"; +}; sub check_bin { my ($bin) = @_; @@ -73,6 +84,19 @@ sub check_bin { die "unable to find command '$bin'\n"; } +sub read_file { + my ($filename, $one_line_only) = @_; + + my $fh = IO::File->new($filename, "r") + or die "Could not open file ${filename}: $!\n"; + + my $text = $one_line_only ? <$fh> : [ <$fh> ]; + + close($fh); + + return $text; +} + sub cut_target_width { my ($path, $maxlen) = @_; $path =~ s@/+@/@g; @@ -103,14 +127,20 @@ sub cut_target_width { return "$head/" . $path . 
"/$tail"; } -sub lock { - my ($fh) = @_; - flock($fh, LOCK_EX) || die "Can't lock config - $!\n"; -} +sub locked { + my ($lock_fn, $code) = @_; + + my $lock_fh = IO::File->new("> $lock_fn"); + + flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n"; + my $res = eval { $code->() }; + my $err = $@; + + flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n"; + die "$err" if $err; -sub unlock { - my ($fh) = @_; - flock($fh, LOCK_UN) || die "Can't unlock config- $!\n"; + close($lock_fh); + return $res; } sub get_status { @@ -123,15 +153,15 @@ sub get_status { return undef; } -sub check_pool_exists { - my ($target, $user) = @_; +sub check_dataset_exists { + my ($dataset, $ip, $user) = @_; my $cmd = []; - if ($target->{ip}) { - push @$cmd, 'ssh', "$user\@$target->{ip}", '--'; + if ($ip) { + push @$cmd, 'ssh', "$user\@$ip", '--'; } - push @$cmd, 'zfs', 'list', '-H', '--', $target->{all}; + push @$cmd, 'zfs', 'list', '-H', '--', $dataset; eval { run_cmd($cmd); }; @@ -142,6 +172,19 @@ sub check_pool_exists { return 1; } +sub create_file_system { + my ($file_system, $ip, $user) = @_; + + my $cmd = []; + + if ($ip) { + push @$cmd, 'ssh', "$user\@$ip", '--'; + } + push @$cmd, 'zfs', 'create', $file_system; + + run_cmd($cmd); +} + sub parse_target { my ($text) = @_; @@ -188,14 +231,9 @@ sub read_cron { return undef; } - my $fh = IO::File->new("< $CRONJOBS"); - die "Could not open file $CRONJOBS: $!\n" if !$fh; - - my @text = <$fh>; - - close($fh); + my $text = read_file($CRONJOBS, 0); - return encode_cron(@text); + return parse_cron(@{$text}); } sub parse_argv { @@ -207,12 +245,16 @@ sub parse_argv { verbose => undef, limit => undef, maxsnap => undef, + dest_maxsnap => undef, name => undef, skip => undef, method => undef, source_user => undef, dest_user => undef, + prepend_storage_id => undef, + compressed => undef, properties => undef, + dest_config_path => undef, }; my ($ret) = GetOptionsFromArray( @@ -222,12 +264,16 @@ sub parse_argv { 'verbose' => 
\$param->{verbose}, 'limit=i' => \$param->{limit}, 'maxsnap=i' => \$param->{maxsnap}, + 'dest-maxsnap=i' => \$param->{dest_maxsnap}, 'name=s' => \$param->{name}, 'skip' => \$param->{skip}, 'method=s' => \$param->{method}, 'source-user=s' => \$param->{source_user}, 'dest-user=s' => \$param->{dest_user}, + 'prepend-storage-id' => \$param->{prepend_storage_id}, + 'compressed' => \$param->{compressed}, 'properties' => \$param->{properties}, + 'dest-config-path=s' => \$param->{dest_config_path}, ); die "can't parse options\n" if $ret == 0; @@ -250,6 +296,7 @@ sub add_state_to_job { $job->{state} = $state->{state}; $job->{lsync} = $state->{lsync}; $job->{vm_type} = $state->{vm_type}; + $job->{instance_id} = $state->{instance_id}; for (my $i = 0; $state->{"snap$i"}; $i++) { $job->{"snap$i"} = $state->{"snap$i"}; @@ -258,14 +305,13 @@ sub add_state_to_job { return $job; } -sub encode_cron { +sub parse_cron { my (@text) = @_; my $cfg = {}; while (my $line = shift(@text)) { - - my @arg = split('\s', $line); + my @arg = Text::ParseWords::shellwords($line); my $param = parse_argv(@arg); if ($param->{source} && $param->{dest}) { @@ -285,18 +331,23 @@ sub param_to_job { my $job = {}; my $source = parse_target($param->{source}); - my $dest = parse_target($param->{dest}) if $param->{dest}; + my $dest; + $dest = parse_target($param->{dest}) if $param->{dest}; $job->{name} = !$param->{name} ? 
"default" : $param->{name}; $job->{dest} = $param->{dest} if $param->{dest}; $job->{method} = "local" if !$dest->{ip} && !$source->{ip}; $job->{method} = "ssh" if !$job->{method}; $job->{limit} = $param->{limit}; - $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap}; + $job->{maxsnap} = $param->{maxsnap}; + $job->{dest_maxsnap} = $param->{dest_maxsnap}; $job->{source} = $param->{source}; $job->{source_user} = $param->{source_user}; $job->{dest_user} = $param->{dest_user}; + $job->{prepend_storage_id} = !!$param->{prepend_storage_id}; + $job->{compressed} = !!$param->{compressed}; $job->{properties} = !!$param->{properties}; + $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path}; return $job; } @@ -312,29 +363,14 @@ sub read_state { return undef; } - my $fh = IO::File->new("< $STATE"); - die "Could not open file $STATE: $!\n" if !$fh; - - my $text = <$fh>; - my $states = decode_json($text); - - close($fh); - - return $states; + my $text = read_file($STATE, 1); + return decode_json($text); } sub update_state { my ($job) = @_; - my $text; - my $in_fh; - eval { - - $in_fh = IO::File->new("< $STATE"); - die "Could not open file $STATE: $!\n" if !$in_fh; - lock($in_fh); - $text = <$in_fh>; - }; + my $text = eval { read_file($STATE, 1); }; my $out_fh = IO::File->new("> $STATE.new"); die "Could not open file ${STATE}.new: $!\n" if !$out_fh; @@ -349,6 +385,7 @@ sub update_state { if ($job->{state} ne "del") { $state->{state} = $job->{state}; $state->{lsync} = $job->{lsync}; + $state->{instance_id} = $job->{instance_id}; $state->{vm_type} = $job->{vm_type}; for (my $i = 0; $job->{"snap$i"} ; $i++) { @@ -365,10 +402,7 @@ sub update_state { print $out_fh $text; close($out_fh); - move("$STATE.new", $STATE); - eval { - close($in_fh); - }; + rename "$STATE.new", $STATE; return $states; } @@ -383,13 +417,9 @@ sub update_cron { my $header = "SHELL=/bin/sh\n"; $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n"; - my 
$fh = IO::File->new("< $CRONJOBS"); - die "Could not open file $CRONJOBS: $!\n" if !$fh; - lock($fh); - - my @test = <$fh>; + my $current = read_file($CRONJOBS, 0); - while (my $line = shift(@test)) { + foreach my $line (@{$current}) { chomp($line); if ($line =~ m/source $job->{source} .*name $job->{name} /) { $updated = 1; @@ -414,11 +444,10 @@ sub update_cron { my $new_fh = IO::File->new("> ${CRONJOBS}.new"); die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh; - die "can't write to $CRONJOBS.new\n" if !print($new_fh $text); + print $new_fh $text or die "can't write to $CRONJOBS.new: $!\n"; close ($new_fh); - die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS"); - close ($fh); + rename "${CRONJOBS}.new", $CRONJOBS or die "can't move $CRONJOBS.new: $!\n"; } sub format_job { @@ -437,12 +466,16 @@ sub format_job { $text .= " root"; $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}"; $text .= " --name $job->{name} --maxsnap $job->{maxsnap}"; + $text .= " --dest-maxsnap $job->{dest_maxsnap}" if defined($job->{dest_maxsnap}); $text .= " --limit $job->{limit}" if $job->{limit}; $text .= " --method $job->{method}"; $text .= " --verbose" if $job->{verbose}; $text .= " --source-user $job->{source_user}"; $text .= " --dest-user $job->{dest_user}"; + $text .= " --prepend-storage-id" if $job->{prepend_storage_id}; + $text .= " --compressed" if $job->{compressed}; $text .= " --properties" if $job->{properties}; + $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path}; $text .= "\n"; return $text; @@ -491,48 +524,52 @@ sub vm_exists { sub init { my ($param) = @_; - my $cfg = read_cron(); + locked("$CONFIG_PATH/cron_and_state.lock", sub { + my $cfg = read_cron(); - my $job = param_to_job($param); + my $job = param_to_job($param); - $job->{state} = "ok"; - $job->{lsync} = 0; + $job->{state} = "ok"; + $job->{lsync} = 0; - my $source = parse_target($param->{source}); - my $dest = 
parse_target($param->{dest}); + my $source = parse_target($param->{source}); + my $dest = parse_target($param->{dest}); - if (my $ip = $dest->{ip}) { - run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]); - } + if (my $ip = $dest->{ip}) { + run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]); + } - if (my $ip = $source->{ip}) { - run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]); - } + if (my $ip = $source->{ip}) { + run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]); + } - die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user}); + die "Pool $dest->{all} does not exist\n" + if !check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user}); - if (!defined($source->{vmid})) { - die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user}); - } + if (!defined($source->{vmid})) { + die "Pool $source->{all} does not exist\n" + if !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user}); + } - my $vm_type = vm_exists($source, $param->{source_user}); - $job->{vm_type} = $vm_type; - $source->{vm_type} = $vm_type; + my $vm_type = vm_exists($source, $param->{source_user}); + $job->{vm_type} = $vm_type; + $source->{vm_type} = $vm_type; - die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type; + die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type; - die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}}; + die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}}; - #check if vm has zfs disks if not die; - get_disks($source, $param->{source_user}) if $source->{vmid}; + #check if vm has zfs disks if not die; + get_disks($source, $param->{source_user}) if $source->{vmid}; - update_cron($job); - update_state($job); + update_cron($job); + update_state($job); + }); #cron and state 
lock - eval { - sync($param) if !$param->{skip}; - }; - if(my $err = $@) { + return if $param->{skip}; + + eval { sync($param) }; + if (my $err = $@) { destroy_job($param); print $err; } @@ -557,123 +594,219 @@ sub get_job { sub destroy_job { my ($param) = @_; - my $job = get_job($param); - $job->{state} = "del"; + locked("$CONFIG_PATH/cron_and_state.lock", sub { + my $job = get_job($param); + $job->{state} = "del"; - update_cron($job); - update_state($job); + update_cron($job); + update_state($job); + }); +} + +sub get_instance_id { + my ($pid) = @_; + + my $stat = read_file("/proc/$pid/stat", 1) + or die "unable to read process stats\n"; + my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1) + or die "unable to read boot ID\n"; + + my $stats = [ split(/\s+/, $stat) ]; + my $starttime = $stats->[21]; + chomp($boot_id); + + return "${pid}:${starttime}:${boot_id}"; +} + +sub instance_exists { + my ($instance_id) = @_; + + if (defined($instance_id) && $instance_id =~ m/^([1-9][0-9]*):/) { + my $pid = $1; + my $actual_id = eval { get_instance_id($pid); }; + return defined($actual_id) && $actual_id eq $instance_id; + } + + return 0; } sub sync { my ($param) = @_; - my $lock_fh = IO::File->new("> $LOCKFILE"); - die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh; - lock($lock_fh); - - my $date = get_date(); my $job; - eval { - $job = get_job($param); - }; - if ($job && defined($job->{state}) && $job->{state} eq "syncing") { - die "Job --source $param->{source} --name $param->{name} is syncing at the moment"; - } + locked("$CONFIG_PATH/cron_and_state.lock", sub { + eval { $job = get_job($param) }; - my $dest = parse_target($param->{dest}); - my $source = parse_target($param->{source}); + if ($job) { + my $state = $job->{state} // 'ok'; + $state = 'ok' if !instance_exists($job->{instance_id}); - my $sync_path = sub { - my ($source, $dest, $job, $param, $date) = @_; + if ($state eq "syncing" || $state eq "waiting") { + die "Job --source $param->{source} --name 
$param->{name} is already scheduled to sync\n"; + } - ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user}); + $job->{state} = "waiting"; + $job->{instance_id} = $INSTANCE_ID; - snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user}); + update_state($job); + } + }); - send_image($source, $dest, $param); + locked("$CONFIG_PATH/sync.lock", sub { - snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap}); + my $date = get_date(); - }; + my $dest; + my $source; + my $vm_type; - my $vm_type = vm_exists($source, $param->{source_user}); - $source->{vm_type} = $vm_type; + locked("$CONFIG_PATH/cron_and_state.lock", sub { + #job might've changed while we waited for the sync lock, but we can be sure it's not syncing + eval { $job = get_job($param); }; - if ($job) { - $job->{state} = "syncing"; - $job->{vm_type} = $vm_type if !$job->{vm_type}; - update_state($job); - } + if ($job && defined($job->{state}) && $job->{state} eq "stopped") { + die "Job --source $param->{source} --name $param->{name} has been disabled\n"; + } - eval{ - if ($source->{vmid}) { - die "VM $source->{vmid} doesn't exist\n" if !$vm_type; - die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root"); - my $disks = get_disks($source, $param->{source_user}); - - foreach my $disk (sort keys %{$disks}) { - $source->{all} = $disks->{$disk}->{all}; - $source->{pool} = $disks->{$disk}->{pool}; - $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path}; - $source->{last_part} = $disks->{$disk}->{last_part}; - &$sync_path($source, $dest, $job, $param, $date); + $dest = parse_target($param->{dest}); + $source = parse_target($param->{source}); + + $vm_type = vm_exists($source, $param->{source_user}); + $source->{vm_type} = $vm_type; + + if ($job) { + 
$job->{state} = "syncing"; + $job->{vm_type} = $vm_type if !$job->{vm_type}; + update_state($job); } - if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) { - send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}); + }); #cron and state lock + + my $sync_path = sub { + my ($source, $dest, $job, $param, $date) = @_; + + my $dest_dataset = target_dataset($source, $dest); + + ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get( + $dest_dataset, + $param->{dest_maxsnap} // $param->{maxsnap}, + $param->{name}, + $dest->{ip}, + $param->{dest_user}, + ); + + ($source->{old_snap}) = snapshot_get( + $source->{all}, + $param->{maxsnap}, + $param->{name}, + $source->{ip}, + $param->{source_user}, + ); + + prepare_prepended_target($source, $dest, $param->{dest_user}) if defined($dest->{prepend}); + + snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user}); + + send_image($source, $dest, $param); + + for my $old_snap (@{$source->{old_snap}}) { + snapshot_destroy($source->{all}, $old_snap, $source->{ip}, $param->{source_user}); + } + + for my $old_snap (@{$dest->{old_snap}}) { + snapshot_destroy($dest_dataset, $old_snap, $dest->{ip}, $param->{dest_user}); + } + }; + + eval{ + if ($source->{vmid}) { + die "VM $source->{vmid} doesn't exist\n" if !$vm_type; + die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root"); + my $disks = get_disks($source, $param->{source_user}); + + foreach my $disk (sort keys %{$disks}) { + $source->{all} = $disks->{$disk}->{all}; + $source->{pool} = $disks->{$disk}->{pool}; + $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path}; + $source->{last_part} = $disks->{$disk}->{last_part}; + + $dest->{prepend} = $disks->{$disk}->{storage_id} + if $param->{prepend_storage_id}; + + &$sync_path($source, $dest, $job, $param, $date); + } + if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) { + send_config($source, $dest,'ssh', 
$param->{source_user}, $param->{dest_user}, $param->{dest_config_path}); + } else { + send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path}); + } } else { - send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}); + &$sync_path($source, $dest, $job, $param, $date); } - } else { - &$sync_path($source, $dest, $job, $param, $date); - } - }; - if(my $err = $@) { - if ($job) { - $job->{state} = "error"; - update_state($job); - unlock($lock_fh); - close($lock_fh); + }; + if (my $err = $@) { + locked("$CONFIG_PATH/cron_and_state.lock", sub { + eval { $job = get_job($param); }; + if ($job) { + $job->{state} = "error"; + delete $job->{instance_id}; + update_state($job); + } + }); print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n"; + die "$err\n"; } - die "$err\n"; - } - - if ($job) { - $job->{state} = "ok"; - $job->{lsync} = $date; - update_state($job); - } - unlock($lock_fh); - close($lock_fh); + locked("$CONFIG_PATH/cron_and_state.lock", sub { + eval { $job = get_job($param); }; + if ($job) { + if (defined($job->{state}) && $job->{state} eq "stopped") { + $job->{state} = "stopped"; + } else { + $job->{state} = "ok"; + } + $job->{lsync} = $date; + delete $job->{instance_id}; + update_state($job); + } + }); + }); #sync lock } sub snapshot_get{ - my ($source, $dest, $max_snap, $name, $source_user) = @_; + my ($dataset, $max_snap, $name, $ip, $user) = @_; my $cmd = []; - push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip}; + push @$cmd, 'ssh', "$user\@$ip", '--', if $ip; push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation'; - push @$cmd, $source->{all}; + push @$cmd, $dataset; + + my $raw; + eval {$raw = run_cmd($cmd)}; + if (my $erro =$@) { #this means the volume doesn't exist on dest yet + return undef; + } - my $raw = run_cmd($cmd); my $index = 0; my $line = ""; my $last_snap = undef; - my $old_snap; + my 
$old_snap = []; while ($raw && $raw =~ s/^(.*?)(\n|$)//) { $line = $1; + if ($line =~ m/@(.*)$/) { + $last_snap = $1 if (!$last_snap); + } if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) { + # interpreted as infinity + last if $max_snap <= 0; - $last_snap = $1 if (!$last_snap); - $old_snap = $1; + my $snap = $1; $index++; - if ($index == $max_snap) { - $source->{destroy} = 1; - last; - }; + + if ($index >= $max_snap) { + push @{$old_snap}, $snap; + } } } @@ -699,47 +832,11 @@ sub snapshot_add { }; if (my $err = $@) { - snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user); + snapshot_destroy($source->{all}, $snap_name, $source->{ip}, $source_user); die "$err\n"; } } -sub write_cron { - my ($cfg) = @_; - - my $text = "SHELL=/bin/sh\n"; - $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n"; - - my $fh = IO::File->new("> $CRONJOBS"); - die "Could not open file: $!\n" if !$fh; - - foreach my $source (sort keys%{$cfg}) { - foreach my $sync_name (sort keys%{$cfg->{$source}}) { - next if $cfg->{$source}->{$sync_name}->{status} ne 'ok'; - $text .= "$PROG_PATH sync"; - $text .= " -source "; - if ($cfg->{$source}->{$sync_name}->{vmid}) { - $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip}; - $text .= "$cfg->{$source}->{$sync_name}->{vmid} "; - } else { - $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip}; - $text .= "$cfg->{$source}->{$sync_name}->{source_pool}"; - $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path}; - } - $text .= " -dest "; - $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip}; - $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}"; - $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path}; - $text .= " -name $sync_name "; - $text .= " -limit 
$cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit}; - $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap}; - $text .= "\n"; - } - } - die "Can't write to cron\n" if (!print($fh $text)); - close($fh); -} - sub get_disks { my ($target, $user) = @_; @@ -788,7 +885,7 @@ sub parse_disks { my $line = $1; next if $line =~ /media=cdrom/; - next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /; + next if $line !~ m/$DISK_KEY_RE/; #QEMU if backup is not set include in sync next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/); @@ -798,11 +895,11 @@ sub parse_disks { my $disk = undef; my $stor = undef; - if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) { + if($line =~ m/$DISK_KEY_RE(.*)$/) { my @parameter = split(/,/,$1); foreach my $opt (@parameter) { - if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){ + if ($opt =~ m/^(?:file=|volume=)?([^:]+):([A-Za-z0-9\-]+)$/){ $disk = $2; $stor = $1; last; @@ -816,11 +913,13 @@ sub parse_disks { my $cmd = []; push @$cmd, 'ssh', "$user\@$ip", '--' if $ip; - push @$cmd, 'pvesm', 'path', "$stor$disk"; + push @$cmd, 'pvesm', 'path', "$stor:$disk"; my $path = run_cmd($cmd); - die "Get no path from pvesm path $stor$disk\n" if !$path; - + die "Get no path from pvesm path $stor:$disk\n" if !$path; + + $disks->{$num}->{storage_id} = $stor; + if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) { my @array = split('/', $1); @@ -850,23 +949,53 @@ sub parse_disks { $num++; } else { - die "ERROR: in path\n"; + die "unexpected path '$path'\n"; } } - die "Vm include no disk on zfs.\n" if !$disks->{0}; + die "Guest does not include any ZFS volumes (or all are excluded by the backup flag).\n" + if !$disks->{0}; return $disks; } +# how the corresponding dataset is named on the target +sub target_dataset { + my ($source, $dest) = @_; + + my $target = "$dest->{all}"; + $target .= 
"/$dest->{prepend}" if defined($dest->{prepend}); + $target .= "/$source->{last_part}" if $source->{last_part}; + $target =~ s!/+!/!g; + + return $target; +} + +# create the parent dataset for the actual target +sub prepare_prepended_target { + my ($source, $dest, $dest_user) = @_; + + die "internal error - not a prepended target\n" if !defined($dest->{prepend}); + + # The parent dataset shouldn't be the actual target. + die "internal error - no last_part for source\n" if !$source->{last_part}; + + my $target = "$dest->{all}/$dest->{prepend}"; + $target =~ s!/+!/!g; + + return if check_dataset_exists($target, $dest->{ip}, $dest_user); + + create_file_system($target, $dest->{ip}, $dest_user); +} + sub snapshot_destroy { - my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_; + my ($dataset, $snap, $ip, $user) = @_; my @zfscmd = ('zfs', 'destroy'); - my $snapshot = "$source->{all}\@$snap"; + my $snapshot = "$dataset\@$snap"; eval { - if($source->{ip} && $method eq 'ssh'){ - run_cmd(['ssh', "$source_user\@$source->{ip}", '--', @zfscmd, $snapshot]); + if ($ip) { + run_cmd(['ssh', "$user\@$ip", '--', @zfscmd, $snapshot]); } else { run_cmd([@zfscmd, $snapshot]); } @@ -874,46 +1003,27 @@ sub snapshot_destroy { if (my $erro = $@) { warn "WARN: $erro"; } - if ($dest) { - my @ssh = $dest->{ip} ? 
('ssh', "$dest_user\@$dest->{ip}", '--') : (); - - my $path = "$dest->{all}"; - $path .= "/$source->{last_part}" if $source->{last_part}; - - eval { - run_cmd([@ssh, @zfscmd, "$path\@$snap"]); - }; - if (my $erro = $@) { - warn "WARN: $erro"; - } - } } +# check if snapshot for incremental sync exist on source side sub snapshot_exist { - my ($source , $dest, $method, $dest_user) = @_; + my ($source , $dest, $method, $source_user) = @_; my $cmd = []; - push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip}; + push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip}; push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name'; - my $path = $dest->{all}; - $path .= "/$source->{last_part}" if $source->{last_part}; - $path .= "\@$source->{old_snap}"; + my $path = $source->{all}; + $path .= "\@$dest->{last_snap}"; push @$cmd, $path; - - my $text = ""; - eval {$text =run_cmd($cmd);}; + eval {run_cmd($cmd)}; if (my $erro =$@) { warn "WARN: $erro"; return undef; } - - while ($text && $text =~ s/^(.*?)(\n|$)//) { - my $line =$1; - return 1 if $line =~ m/^.*$source->{old_snap}$/; - } + return 1; } sub send_image { @@ -923,11 +1033,13 @@ sub send_image { push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip}; push @$cmd, 'zfs', 'send'; + push @$cmd, '-L', if $param->{compressed}; # no effect if dataset never had large recordsize + push @$cmd, '-c', if $param->{compressed}; push @$cmd, '-p', if $param->{properties}; push @$cmd, '-v' if $param->{verbose}; - if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) { - push @$cmd, '-i', "$source->{all}\@$source->{last_snap}"; + if($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) { + push @$cmd, '-i', "$source->{all}\@$dest->{last_snap}"; } push @$cmd, '--', "$source->{all}\@$source->{new_snap}"; @@ -935,9 +1047,7 @@ sub send_image { my $bwl = $param->{limit}*1024; push 
@$cmd, \'|', 'cstream', '-t', $bwl; } - my $target = "$dest->{all}"; - $target .= "/$source->{last_part}" if $source->{last_part}; - $target =~ s!/+!/!g; + my $target = target_dataset($source, $dest); push @$cmd, \'|'; push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip}; @@ -949,19 +1059,20 @@ sub send_image { }; if (my $erro = $@) { - snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user}); + snapshot_destroy($source->{all}, $source->{new_snap}, $source->{ip}, $param->{source_user}); die $erro; }; } sub send_config{ - my ($source, $dest, $method, $source_user, $dest_user) = @_; + my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_; my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf"; my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}"; - my $config_dir = $dest->{last_part} ? 
"${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH; + my $config_dir = $dest_config_path // $CONFIG_PATH; + $config_dir .= "/$dest->{last_part}" if $dest->{last_part}; $dest_target_new = $config_dir.'/'.$dest_target_new; @@ -977,8 +1088,8 @@ sub send_config{ run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]); } - if ($source->{destroy}){ - my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}"; + for my $old_snap (@{$dest->{old_snap}}) { + my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.${old_snap}"; if($dest->{ip}){ run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]); } else { @@ -1019,117 +1130,133 @@ sub status { sub enable_job { my ($param) = @_; - my $job = get_job($param); - $job->{state} = "ok"; - update_state($job); - update_cron($job); + locked("$CONFIG_PATH/cron_and_state.lock", sub { + my $job = get_job($param); + $job->{state} = "ok"; + update_state($job); + update_cron($job); + }); } sub disable_job { my ($param) = @_; - my $job = get_job($param); - $job->{state} = "stopped"; - update_state($job); - update_cron($job); + locked("$CONFIG_PATH/cron_and_state.lock", sub { + my $job = get_job($param); + $job->{state} = "stopped"; + update_state($job); + update_cron($job); + }); } my $cmd_help = { destroy => qq{ -$PROGNAME destroy -source [OPTIONS] - - remove a sync Job from the scheduler +$PROGNAME destroy --source [OPTIONS] - -name string - - name of the sync job, if not set it is default + Remove a sync Job from the scheduler - -source string + --name string + The name of the sync job, if not set 'default' is used. 
- the source can be an or [IP:][/Path] + --source string + The source can be an or [IP:][/Path] }, create => qq{ -$PROGNAME create -dest -source [OPTIONS] +$PROGNAME create --dest --source [OPTIONS] - Create a sync Job + Create a new sync-job - -dest string + --dest string + The destination target is like [IP]:[/Path] - the destination target is like [IP]:[/Path] + --dest-user string + The name of the user on the destination target, root by default - -dest-user string + --limit integer + Maximal sync speed in kBytes/s, default is unlimited - name of the user on the destination target, root by default + --maxsnap integer + The number of snapshots to keep until older ones are erased. + The default is 1, use 0 for unlimited. - -limit integer + --dest-maxsnap integer + Override maxsnap for the destination dataset. - max sync speed in kBytes/s, default unlimited + --name string + The name of the sync job, if not set it is default - -maxsnap string + --prepend-storage-id + If specified, prepend the storage ID to the destination's path(s). - how much snapshots will be kept before get erased, default 1 - - -name string - - name of the sync job, if not set it is default + --skip + If specified, skip the first sync. - -skip boolean + --source string + The source can be an or [IP:][/Path] - if this flag is set it will skip the first sync + --source-user string + The (ssh) user-name on the source target, root by default - -source string + --compressed + If specified, send data without decompressing first. If features lz4_compress, + zstd_compress or large_blocks are in use by the source, they need to be enabled on + the target as well. - the source can be an or [IP:][/Path] + --properties + If specified, include the dataset's properties in the stream. - -source-user string - - name of the user on the source target, root by default - - -properties boolean - - Include the dataset's properties in the stream. 
+ --dest-config-path string + Specifies a custom config path on the destination target. + The default is /var/lib/pve-zsync }, sync => qq{ -$PROGNAME sync -dest -source [OPTIONS]\n - - will sync one time - - -dest string - - the destination target is like [IP:][/Path] +$PROGNAME sync --dest --source [OPTIONS]\n - -dest-user string + Trigger one sync. - name of the user on the destination target, root by default + --dest string + The destination target is like [IP:][/Path] - -limit integer + --dest-user string + The (ssh) user-name on the destination target, root by default - max sync speed in kBytes/s, default unlimited + --limit integer + The maximal sync speed in kBytes/s, default is unlimited - -maxsnap integer + --maxsnap integer + The number of snapshots to keep until older ones are erased. + The default is 1, use 0 for unlimited. - how much snapshots will be kept before get erased, default 1 + --dest-maxsnap integer + Override maxsnap for the destination dataset. - -name string - - name of the sync job, if not set it is default. + --name string + The name of the sync job, if not set it is 'default'. It is only necessary if scheduler allready contains this source. - -source string - - the source can be an or [IP:][/Path] + --prepend-storage-id + If specified, prepend the storage ID to the destination's path(s). - -source-user string + --source string + The source can either be an or [IP:][/Path] - name of the user on the source target, root by default + --source-user string + The name of the user on the source target, root by default - -verbose boolean + --verbose + If specified, print out the sync progress. - print out the sync progress. + --compressed + If specified, send data without decompressing first. If features lz4_compress, + zstd_compress or large_blocks are in use by the source, they need to be enabled on + the target as well. - -properties boolean + --properties + If specified, include the dataset's properties in the stream. 
- Include the dataset's properties in the stream. + --dest-config-path string + Specifies a custom config path on the destination target. + The default is /var/lib/pve-zsync }, list => qq{ $PROGNAME list @@ -1144,43 +1271,37 @@ $PROGNAME status help => qq{ $PROGNAME help <cmd> [OPTIONS] - Get help about specified command. - - <cmd> string + Get help about specified command. - Command name - - -verbose boolean + <cmd> string + Command name to get help about. + --verbose Verbose output format. }, enable => qq{ -$PROGNAME enable -source <string> [OPTIONS] - - enable a syncjob and reset error +$PROGNAME enable --source <string> [OPTIONS] - -name string + Enable a sync-job and reset all job-errors, if any. + --name string name of the sync job, if not set it is default - -source string - - the source can be an <VMID> or [IP:]<ZFSPool>[/Path] + --source string + the source can be an <VMID> or [IP:]<ZFSPool>[/Path] }, disable => qq{ -$PROGNAME disable -source <string> [OPTIONS] +$PROGNAME disable --source <string> [OPTIONS] - pause a sync job + Disables (pauses) a sync-job - -name string + --name string + name of the sync-job, if not set it is default - name of the sync job, if not set it is default - - -source string - - the source can be an <VMID> or [IP:]<ZFSPool>[/Path] + --source string + the source can be an <VMID> or [IP:]<ZFSPool>[/Path] }, - printpod => 'internal command', + printpod => "$PROGNAME printpod\n\n\tinternal command", }; @@ -1263,13 +1384,13 @@ sub usage { print("ERROR:\tno command specified\n") if !$help; print("USAGE:\t$PROGNAME <cmd> [ARGS] [OPTIONS]\n"); print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n"); - print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n"); - print("\t$PROGNAME destroy -source <string> [OPTIONS]\n"); - print("\t$PROGNAME disable -source <string> [OPTIONS]\n"); - print("\t$PROGNAME enable -source <string> [OPTIONS]\n"); + print("\t$PROGNAME create --dest <string> --source <string> [OPTIONS]\n"); + print("\t$PROGNAME destroy --source <string> [OPTIONS]\n"); + print("\t$PROGNAME disable --source <string> [OPTIONS]\n"); + print("\t$PROGNAME enable --source <string> [OPTIONS]\n"); print("\t$PROGNAME list\n"); print("\t$PROGNAME
status\n"); - print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n"); + print("\t$PROGNAME sync --dest <string> --source <string> [OPTIONS]\n"); } sub check_target { @@ -1280,57 +1401,62 @@ sub check_target { sub print_pod { my $synopsis = join("\n", sort values %$cmd_help); + my $commands = join(", ", sort keys %$cmd_help); print < [ARGS] [OPTIONS] -$synopsis +Where can be one of: $commands =head1 DESCRIPTION -This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers. -This tool also has the capability to add jobs to cron so the sync will be automatically done. -The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync. -To config cron see man crontab. +The pve-zsync tool can help you to sync your VMs or directories stored on ZFS +between multiple servers. + +pve-zsync is able to automatically configure CRON jobs, so that a periodic sync +will be automatically triggered. +The default sync interval is 15 min, if you want to change this value you can +do this in F</etc/cron.d/pve-zsync>. If you need help to configure CRON tabs, see +man crontab. -=head2 PVE ZFS Storage sync Tool +=head1 COMMANDS AND OPTIONS -This Tool can get remote pool on other PVE or send Pool to others ZFS machines +$synopsis =head1 EXAMPLES -add sync job from local VM to remote ZFS Server -pve-zsync create -source=100 -dest=192.168.1.2:zfspool +Adds a job for syncing the local VM 100 to a remote server's ZFS pool named "tank": + pve-zsync create --source=100 --dest=192.168.1.2:tank =head1 IMPORTANT FILES -Cron jobs and config are stored at /etc/cron.d/pve-zsync +Cron jobs and config are stored in F</etc/cron.d/pve-zsync> -The VM config get copied on the destination machine to /var/lib/pve-zsync/ +The VM configuration itself gets copied to the destination machines +F</var/lib/pve-zsync/> path.
=head1 COPYRIGHT AND DISCLAIMER -Copyright (C) 2007-2015 Proxmox Server Solutions GmbH +Copyright (C) 2007-2021 Proxmox Server Solutions GmbH -This program is free software: you can redistribute it and/or modify it -under the terms of the GNU Affero General Public License as published -by the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. +This program is free software: you can redistribute it and/or modify it under +the terms of the GNU Affero General Public License as published by the Free +Software Foundation, either version 3 of the License, or (at your option) any +later version. -This program is distributed in the hope that it will be useful, but -WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -Affero General Public License for more details. +This program is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU Affero General Public License for more +details. -You should have received a copy of the GNU Affero General Public -License along with this program. If not, see -<http://www.gnu.org/licenses/>. +You should have received a copy of the GNU Affero General Public License along +with this program. If not, see <http://www.gnu.org/licenses/>. EOF }