X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=pve-zsync;h=881b9c85cbfb98c6cd8a0e0341a4482ba4ad07c5;hb=f06c336b2a3fd5f97a95cc48e9fe6b62d46e7e8f;hp=3d9780c3dd5890570bea9f192b3512ac80f209ed;hpb=7d93705fdf059659c5498f1999f930f7bb7b506b;p=pve-zsync.git diff --git a/pve-zsync b/pve-zsync old mode 100644 new mode 100755 index 3d9780c..881b9c8 --- a/pve-zsync +++ b/pve-zsync @@ -1,611 +1,779 @@ #!/usr/bin/perl -my $PROGNAME = "pve-zsync"; -my $CONFIG_PATH = '/var/lib/'.$PROGNAME.'/'; -my $CONFIG = "$PROGNAME.cfg"; -my $CRONJOBS = '/etc/cron.d/'.$PROGNAME; -my $VMCONFIG = '/var/lib/'.$PROGNAME.'/'; -my $PATH = "/usr/sbin/"; -my $QEMU_CONF = '/etc/pve/local/qemu-server/'; -my $DEBUG = 0; - use strict; use warnings; -use Data::Dumper qw(Dumper); + use Fcntl qw(:flock SEEK_END); -use Getopt::Long; -use Switch; +use Getopt::Long qw(GetOptionsFromArray); +use File::Path qw(make_path); +use JSON; +use IO::File; +use String::ShellQuote 'shell_quote'; + +my $PROGNAME = "pve-zsync"; +my $CONFIG_PATH = "/var/lib/${PROGNAME}"; +my $STATE = "${CONFIG_PATH}/sync_state"; +my $CRONJOBS = "/etc/cron.d/$PROGNAME"; +my $PATH = "/usr/sbin"; +my $PVE_DIR = "/etc/pve/local"; +my $QEMU_CONF = "${PVE_DIR}/qemu-server"; +my $LXC_CONF = "${PVE_DIR}/lxc"; +my $PROG_PATH = "$PATH/${PROGNAME}"; +my $INTERVAL = 15; +my $DEBUG; + +BEGIN { + $DEBUG = 0; # change default here. not above on declaration! + $DEBUG ||= $ENV{ZSYNC_DEBUG}; + if ($DEBUG) { + require Data::Dumper; + Data::Dumper->import(); + } +} + +my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])"; +my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)"; +my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})"; +my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))"; + +my $IPV6RE = "(?:" . + "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" . + "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" . + "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" . + "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" . + "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" . + "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" . + "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" . + "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" . + "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))"; + +my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address +my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too +my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets +# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional +my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!; + +my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /; + +my $command = $ARGV[0]; + +if (defined($command) && $command ne 'help' && $command ne 'printpod') { + check_bin ('cstream'); + check_bin ('zfs'); + check_bin ('ssh'); + check_bin ('scp'); +} -check_bin ('cstream'); -check_bin ('zfs'); -check_bin ('ssh'); -check_bin ('scp'); +$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub { + die "Signaled, aborting sync: $!\n"; +}; sub check_bin { my ($bin) = @_; foreach my $p (split (/:/, $ENV{PATH})) { - my $fn = "$p/$bin"; - if (-x $fn) { - return $fn; + my $fn = "$p/$bin"; + if (-x $fn) { + return $fn; } } - warn "unable to find command '$bin'\n"; + die "unable to find command '$bin'\n"; } -sub cut_to_width { - my ($text, $max) = @_; +sub cut_target_width { + my ($path, $maxlen) = @_; + $path =~ s@/+@/@g; - return $text if (length($text) <= $max); - my @spl = split('/', $text); + return $path if length($path) <= $maxlen; - my $count = length($spl[@spl-1]); - return "..\/".substr($spl[@spl-1],($count-$max)+3 ,$count) if $count > $max; + return '..'.substr($path, -$maxlen+2) if $path !~ m@/@; - $count += length($spl[0]) if @spl > 1; - return substr($spl[0], 0, $max-4-length($spl[@spl-1]))."\/..\/".$spl[@spl-1] if $count > $max; + $path =~ s@/([^/]+/?)$@@; + my $tail = $1; - my $rest = 1 ; - $rest = $max-$count if ($max-$count > 0); + if (length($tail)+3 == $maxlen) { + return "../$tail"; + } elsif (length($tail)+2 >= $maxlen) { + return '..'.substr($tail, -$maxlen+2) + } - return "$spl[0]".substr($text, length($spl[0]), $rest)."..\/".$spl[@spl-1]; -} + $path =~ s@(/[^/]+)(?:/|$)@@; + my $head = $1; + my $both = length($head) + length($tail); + my $remaining = $maxlen-$both-4; # -4 for "/../" -sub lock { - my ($fh) = @_; - flock($fh, LOCK_EX) or die "Cannot lock config - $!\n"; + if ($remaining < 0) { + return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../" + } - seek($fh, 0, SEEK_END) or die "Cannot seek - $!\n"; + substr($path, ($remaining/2), (length($path)-$remaining), '..'); + return "$head/" . $path . "/$tail"; } -sub unlock { - my ($fh) = @_; - flock($fh, LOCK_UN) or die "Cannot unlock config- $!\n"; +sub locked { + my ($lock_fn, $code) = @_; + + my $lock_fh = IO::File->new("> $lock_fn"); + + flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n"; + my $res = eval { $code->() }; + my $err = $@; + + flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n"; + die "$err" if $err; + + close($lock_fh); + return $res; } -sub check_config { - my ($source, $name, $cfg) = @_; +sub get_status { + my ($source, $name, $status) = @_; - if ($source->{vmid} && $cfg->{$source->{vmid}}->{$name}->{locked}){ - return "active" if $cfg->{$source->{vmid}}->{$name}->{locked} eq 'yes'; - return "exist" if $cfg->{$source->{vmid}}->{$name}->{locked} eq 'no'; - } elsif ($cfg->{$source->{abs_path}}->{$name}->{locked}) { - return "active" if $cfg->{$source->{abs_path}}->{$name}->{locked} eq 'yes'; - return "exist" if $cfg->{$source->{abs_path}}->{$name}->{locked} eq 'no'; + if ($status->{$source->{all}}->{$name}->{status}) { + return $status; } return undef; } +sub check_pool_exists { + my ($target, $user) = @_; -sub check_pool_exsits { - my ($pool, $ip) = @_; + my $cmd = []; - my $cmd = ''; - $cmd = "ssh root\@$ip " if $ip; - $cmd .= "zfs list $pool -H"; + if ($target->{ip}) { + push @$cmd, 'ssh', "$user\@$target->{ip}", '--'; + } + push @$cmd, 'zfs', 'list', '-H', '--', $target->{all}; eval { run_cmd($cmd); }; - if($@){ - return 1; + if ($@) { + return 0; } - return undef; + return 1; } -sub write_to_config { - my ($cfg) = @_; +sub parse_target { + my ($text) = @_; - open(my $fh, ">", "$CONFIG_PATH$CONFIG") - or die "cannot open >$CONFIG_PATH$CONFIG: $!\n"; + my $errstr = "$text : is not a valid input! Use [IP:] or [IP:][/Path]"; + my $target = {}; - my $text = decode_config($cfg); + if ($text !~ $TARGETRE) { + die "$errstr\n"; + } + $target->{all} = $2; + $target->{ip} = $1 if $1; + my @parts = split('/', $2); - print($fh $text); + $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip}; - close($fh); -} + my $pool = $target->{pool} = shift(@parts); + die "$errstr\n" if !$pool; -sub read_from_config { + if ($pool =~ m/^\d+$/) { + $target->{vmid} = $pool; + delete $target->{pool}; + } - unless(-e "$CONFIG_PATH$CONFIG") { - return undef; + return $target if (@parts == 0); + $target->{last_part} = pop(@parts); + + if ($target->{ip}) { + pop(@parts); + } + if (@parts > 0) { + $target->{path} = join('/', @parts); } - open(my $fh, "<", "$CONFIG_PATH$CONFIG") - or die "cannot open > $CONFIG_PATH$CONFIG: $!\n"; + return $target; +} - $/ = undef; +sub read_cron { - my $text = <$fh>; + #This is for the first use to init file; + if (!-e $CRONJOBS) { + my $new_fh = IO::File->new("> $CRONJOBS"); + die "Could not create $CRONJOBS: $!\n" if !$new_fh; + close($new_fh); + return undef; + } - unlock($fh); + my $fh = IO::File->new("< $CRONJOBS"); + die "Could not open file $CRONJOBS: $!\n" if !$fh; - close($fh); + my @text = <$fh>; - my $cfg = encode_config($text); + close($fh); - return $cfg; + return encode_cron(@text); } -sub decode_config { - my ($cfg) = @_; - my $raw = ''; - foreach my $source (sort keys%{$cfg}){ - foreach my $sync_name (sort keys%{$cfg->{$source}}){ - $raw .= "$source: $sync_name\n"; - foreach my $parameter (sort keys%{$cfg->{$source}->{$sync_name}}){ - $raw .= "\t$parameter: $cfg->{$source}->{$sync_name}->{$parameter}\n"; - } - } - } - return $raw; +sub parse_argv { + my (@arg) = @_; + + my $param = { + dest => undef, + source => undef, + verbose => undef, + limit => undef, + maxsnap => undef, + name => undef, + skip => undef, + method => undef, + source_user => undef, + dest_user => undef, + properties => undef, + dest_config_path => undef, + }; + + my ($ret) = GetOptionsFromArray( + \@arg, + 'dest=s' => \$param->{dest}, + 'source=s' => \$param->{source}, + 'verbose' => \$param->{verbose}, + 'limit=i' => \$param->{limit}, + 'maxsnap=i' => \$param->{maxsnap}, + 'name=s' => \$param->{name}, + 'skip' => \$param->{skip}, + 'method=s' => \$param->{method}, + 'source-user=s' => \$param->{source_user}, + 'dest-user=s' => \$param->{dest_user}, + 'properties' => \$param->{properties}, + 'dest-config-path=s' => \$param->{dest_config_path}, + ); + + die "can't parse options\n" if $ret == 0; + + $param->{name} //= "default"; + $param->{maxsnap} //= 1; + $param->{method} //= "ssh"; + $param->{source_user} //= "root"; + $param->{dest_user} //= "root"; + + return $param; } -sub encode_config { - my ($raw) = @_; - my $cfg = {}; - my $source; - my $check = 0; - my $sync_name; +sub add_state_to_job { + my ($job) = @_; - while ($raw && $raw =~ s/^(.*?)(\n|$)//) { - my $line = $1; + my $states = read_state(); + my $state = $states->{$job->{source}}->{$job->{name}}; - next if $line =~ m/^\#/; - next if $line =~ m/^\s*$/; - - if ($line =~ m/^(\t| )(\w+): (.+)/){ - my $par = $2; - my $value = $3; - - if ($par eq 'source_pool') { - $cfg->{$source}->{$sync_name}->{$par} = $value; - die "error in Config: SourcePool value doubled\n" if ($check & 1); - $check += 1; - } elsif ($par eq 'source_ip') { - $cfg->{$source}->{$sync_name}->{$par} = $value; - die "error in Config: SourceIP value doubled\n" if ($check & 2); - $check += 2; - } elsif ($par eq 'locked') { - $cfg->{$source}->{$sync_name}->{$par} = $value; - die "error in Config: Locked value doubled\n" if ($check & 4); - $check += 4; - } elsif ($par eq 'method') { - $cfg -> {$source}->{$sync_name}->{$par} = $value; - die "error in Config: Method value doubled\n" if ($check & 8); - $check += 8; - } elsif ($par eq 'interval') { - $cfg -> {$source}->{$sync_name}->{$par} = $value; - die "error in Config: Iterval value doubled\n" if ($check & 16); - $check += 16; - } elsif ($par eq 'limit') { - $cfg -> {$source}->{$sync_name}->{$par} = $value; - die "error in Config: Limit value doubled\n" if ($check & 32); - $check += 32; - } elsif ($par eq 'dest_pool') { - $cfg -> {$source}->{$sync_name}->{$par} = $value; - die "error in Config: DestPool value doubled\n" if ($check & 64); - $check += 64; - } elsif ($par eq 'dest_ip') { - $cfg -> {$source}->{$sync_name}->{$par} = $value; - die "error in Config: DestIp value doubled\n" if ($check & 128); - $check += 128; - } elsif ($par eq 'dest_path') { - $cfg -> {$source}->{$sync_name}->{$par} = $value; - die "error in Config: DestPath value doubled\n" if ($check & 256); - $check += 256; - } elsif ($par eq 'source_path') { - $cfg -> {$source}->{$sync_name}->{$par} = $value; - die "error in Config: SourcePath value doubled\n" if ($check & 512); - $check += 512; - } elsif ($par eq 'vmid') { - $cfg -> {$source}->{$sync_name}->{$par} = $value; - die "error in Config: Vmid value doubled\n" if ($check & 1024); - $check += 1024; - } elsif ($par =~ 'lsync') { - $cfg->{$source}->{$sync_name}->{$par} = $value; - die "error in Config: lsync value doubled\n" if ($check & 2048); - $check += 2048; - } elsif ($par =~ 'maxsnap') { - $cfg->{$source}->{$sync_name}->{$par} = $value; - die "error in Config: maxsnap value doubled\n" if ($check & 4096); - $check += 4096; - } else { - die "error in Config\n"; - } - } elsif ($line =~ m/^((\d+.\d+.\d+.\d+):)?([\w\-\_\/]+): (.+){0,1}/){ - $source = $3; - $sync_name = $4 ? $4 : 'default' ; - $cfg->{$source}->{$sync_name} = undef; - $cfg->{$source}->{$sync_name}->{source_ip} = $2 if $2; - $check = 0; - } + $job->{state} = $state->{state}; + $job->{lsync} = $state->{lsync}; + $job->{vm_type} = $state->{vm_type}; + + for (my $i = 0; $state->{"snap$i"}; $i++) { + $job->{"snap$i"} = $state->{"snap$i"}; } - return $cfg; + + return $job; } -sub parse_target { - my ($text) = @_; +sub encode_cron { + my (@text) = @_; - if ($text =~ m/^((\d+.\d+.\d+.\d+):)?((\w+)\/?)([\w\/\-\_]*)?$/) { + my $cfg = {}; - die "Input not valid\n" if !$3; - my $tmp = $3; - my $target = {}; + while (my $line = shift(@text)) { - if ($2) { - $target->{ip} = $2 ; - } + my @arg = split('\s', $line); + my $param = parse_argv(@arg); - if ($tmp =~ m/^(\d\d\d+)$/){ - $target->{vmid} = $tmp; - } else { - $target->{pool} = $4; - my $abs_path = $4; - if ($5) { - $target->{path} = "\/$5"; - $abs_path .= "\/$5"; - } - $target->{abs_path} = $abs_path; - } + if ($param->{source} && $param->{dest}) { + my $source = delete $param->{source}; + my $name = delete $param->{name}; - return $target; + $cfg->{$source}->{$name} = $param; + } } - die "Input not valid\n"; + + return $cfg; } -sub list { +sub param_to_job { + my ($param) = @_; - my $cfg = read_from_config("$CONFIG_PATH$CONFIG"); - - my $list = sprintf("%-25s%-15s%-7s%-20s%-10s%-5s\n" , "SOURCE", "NAME", "ACTIVE", "LAST SYNC", "INTERVAL", "TYPE"); - - foreach my $source (sort keys%{$cfg}){ - foreach my $sync_name (sort keys%{$cfg->{$source}}){ - my $source_name = $source; - $source_name = $cfg->{$source}->{$sync_name}->{source_ip}.":".$source if $cfg->{$source}->{$sync_name}->{source_ip}; - $list .= sprintf("%-25s%-15s", cut_to_width($source_name,25), cut_to_width($sync_name,15)); - $list .= sprintf("%-7s",$cfg->{$source}->{$sync_name}->{locked}); - $list .= sprintf("%-20s",$cfg->{$source}->{$sync_name}->{lsync}); - $list .= sprintf("%-10s",$cfg->{$source}->{$sync_name}->{interval}); - $list .= sprintf("%-5s\n",$cfg->{$source}->{$sync_name}->{method}); - } - } + my $job = {}; - return $list; + my $source = parse_target($param->{source}); + my $dest = parse_target($param->{dest}) if $param->{dest}; + + $job->{name} = !$param->{name} ? "default" : $param->{name}; + $job->{dest} = $param->{dest} if $param->{dest}; + $job->{method} = "local" if !$dest->{ip} && !$source->{ip}; + $job->{method} = "ssh" if !$job->{method}; + $job->{limit} = $param->{limit}; + $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap}; + $job->{source} = $param->{source}; + $job->{source_user} = $param->{source_user}; + $job->{dest_user} = $param->{dest_user}; + $job->{properties} = !!$param->{properties}; + $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path}; + + return $job; } -sub vm_exists { - my ($target) = @_; +sub read_state { - my $cmd = ""; - $cmd = "ssh root\@$target->{ip} " if ($target->{ip}); - $cmd .= "qm status $target->{vmid}"; + if (!-e $STATE) { + make_path $CONFIG_PATH; + my $new_fh = IO::File->new("> $STATE"); + die "Could not create $STATE: $!\n" if !$new_fh; + print $new_fh "{}"; + close($new_fh); + return undef; + } - my $res = run_cmd($cmd); + my $fh = IO::File->new("< $STATE"); + die "Could not open file $STATE: $!\n" if !$fh; - return 1 if ($res =~ m/^status.*$/); - return undef; + my $text = <$fh>; + my $states = decode_json($text); + + close($fh); + + return $states; } -sub init { - my ($param) = @_; +sub update_state { + my ($job) = @_; + my $text; + my $in_fh; - my $cfg = read_from_config; + eval { - my $vm = {}; + $in_fh = IO::File->new("< $STATE"); + die "Could not open file $STATE: $!\n" if !$in_fh; + $text = <$in_fh>; + }; - my $name = $param->{name} ? $param->{name} : "default"; - my $interval = $param->{interval} ? $param->{interval} : 15; + my $out_fh = IO::File->new("> $STATE.new"); + die "Could not open file ${STATE}.new: $!\n" if !$out_fh; - my $source = parse_target($param->{source}); - my $dest = parse_target($param->{dest}); + my $states = {}; + my $state = {}; + if ($text){ + $states = decode_json($text); + $state = $states->{$job->{source}}->{$job->{name}}; + } - $vm->{$name}->{dest_pool} = $dest->{pool}; - $vm->{$name}->{dest_ip} = $dest->{ip} if $dest->{ip}; - $vm->{$name}->{dest_path} = $dest->{path} if $dest->{path}; + if ($job->{state} ne "del") { + $state->{state} = $job->{state}; + $state->{lsync} = $job->{lsync}; + $state->{vm_type} = $job->{vm_type}; - $param->{method} = "local" if !$dest->{ip} && !$source->{ip}; - $vm->{$name}->{locked} = "no"; - $vm->{$name}->{interval} = $interval; - $vm->{$name}->{method} = $param->{method} ? $param->{method} : "ssh"; - $vm->{$name}->{limit} = $param->{limit} if $param->{limit}; - $vm->{$name}->{maxsnap} = $param->{maxsnap} if $param->{maxsnap}; + for (my $i = 0; $job->{"snap$i"} ; $i++) { + $state->{"snap$i"} = $job->{"snap$i"}; + } + $states->{$job->{source}}->{$job->{name}} = $state; + } else { - if ( my $ip = $vm->{$name}->{dest_ip} ) { - run_cmd("ssh-copy-id -i /root/.ssh/id_rsa.pub root\@$ip"); + delete $states->{$job->{source}}->{$job->{name}}; + delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}}; } - if ( my $ip = $source->{ip} ) { - run_cmd("ssh-copy-id -i /root/.ssh/id_rsa.pub root\@$ip"); - } + $text = encode_json($states); + print $out_fh $text; - die "Pool $dest->{abs_path} does not exists\n" if check_pool_exsits($dest->{abs_path}, $dest->{ip}); + close($out_fh); + rename "$STATE.new", $STATE; + eval { + close($in_fh); + }; - my $check = check_pool_exsits($source->{abs_path}, $source->{ip}) if !$source->{vmid} && $source->{abs_path}; + return $states; +} - die "Pool $source->{abs_path} does not exists\n" if undef($check); +sub update_cron { + my ($job) = @_; - my $add_job = sub { - my ($vm, $name) = @_; - my $source = ""; + my $updated; + my $has_header; + my $line_no = 0; + my $text = ""; + my $header = "SHELL=/bin/sh\n"; + $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n"; + + my $fh = IO::File->new("< $CRONJOBS"); + die "Could not open file $CRONJOBS: $!\n" if !$fh; + + my @test = <$fh>; - if ($vm->{$name}->{vmid}) { - $source = "$vm->{$name}->{source_ip}:" if $vm->{$name}->{source_ip}; - $source .= $vm->{$name}->{vmid}; + while (my $line = shift(@test)) { + chomp($line); + if ($line =~ m/source $job->{source} .*name $job->{name} /) { + $updated = 1; + next if $job->{state} eq "del"; + $text .= format_job($job, $line); } else { - $source = $vm->{$name}->{source_pool}; - $source .= $vm->{$name}->{source_path} if $vm->{$name}->{source_path}; + if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) { + $has_header = 1; + } + $text .= "$line\n"; } - die "Config already exists\n" if $cfg->{$source}->{$name}; + $line_no++; + } - cron_add($vm); + if (!$has_header) { + $text = "$header$text"; + } - $cfg->{$source}->{$name} = $vm->{$name}; + if (!$updated) { + $text .= format_job($job); + } + my $new_fh = IO::File->new("> ${CRONJOBS}.new"); + die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh; - write_to_config($cfg); - }; + die "can't write to $CRONJOBS.new\n" if !print($new_fh $text); + close ($new_fh); - if ($source->{vmid}) { - die "VM $source->{vmid} doesn't exist\n" if !vm_exists($source); - my $disks = get_disks($source); - $vm->{$name}->{vmid} = $source->{vmid}; - $vm->{$name}->{lsync} = 0; - $vm->{$name}->{source_ip} = $source->{ip} if $source->{ip}; + die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS; + close ($fh); +} - &$add_job($vm, $name); +sub format_job { + my ($job, $line) = @_; + my $text = ""; + if ($job->{state} eq "stopped") { + $text = "#"; + } + if ($line) { + $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/; + $text .= $1; } else { - $vm->{$name}->{source_pool} = $source->{pool}; - $vm->{$name}->{source_ip} = $source->{ip} if $source->{ip}; - $vm->{$name}->{source_path} = $source->{path} if $source->{path}; - $vm->{$name}->{lsync} = 0; + $text .= "*/$INTERVAL * * * *"; + } + $text .= " root"; + $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}"; + $text .= " --name $job->{name} --maxsnap $job->{maxsnap}"; + $text .= " --limit $job->{limit}" if $job->{limit}; + $text .= " --method $job->{method}"; + $text .= " --verbose" if $job->{verbose}; + $text .= " --source-user $job->{source_user}"; + $text .= " --dest-user $job->{dest_user}"; + $text .= " --properties" if $job->{properties}; + $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path}; + $text .= "\n"; + + return $text; +} + +sub list { + + my $cfg = read_cron(); + + my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON"); - &$add_job($vm, $name); + my $states = read_state(); + foreach my $source (sort keys%{$cfg}) { + foreach my $name (sort keys%{$cfg->{$source}}) { + $list .= sprintf("%-25s", cut_target_width($source, 25)); + $list .= sprintf("%-25s", cut_target_width($name, 25)); + $list .= sprintf("%-10s", $states->{$source}->{$name}->{state}); + $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync}); + $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef"); + $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method}); + } } - eval {sync($param) if !$param->{skip};}; - if(my $err = $@) { - destroy($param); - print $err; + return $list; +} + +sub vm_exists { + my ($target, $user) = @_; + + return undef if !defined($target->{vmid}); + + my $conf_fn = "$target->{vmid}.conf"; + + if ($target->{ip}) { + my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls'); + return "qemu" if eval { run_cmd([@cmd, "$QEMU_CONF/$conf_fn"]) }; + return "lxc" if eval { run_cmd([@cmd, "$LXC_CONF/$conf_fn"]) }; + } else { + return "qemu" if -f "$QEMU_CONF/$conf_fn"; + return "lxc" if -f "$LXC_CONF/$conf_fn"; } + + return undef; } -sub destroy { +sub init { my ($param) = @_; - my $cfg = read_from_config("$CONFIG_PATH$CONFIG"); - my $name = $param->{name} ? $param->{name} : "default"; + locked("$CONFIG_PATH/cron_and_state.lock", sub { + my $cfg = read_cron(); - my $source = parse_target($param->{source}); + my $job = param_to_job($param); - my $delete_cron = sub { - my ($path, $name, $cfg) = @_; + $job->{state} = "ok"; + $job->{lsync} = 0; - die "Source does not exist!\n" unless $cfg->{$path} ; + my $source = parse_target($param->{source}); + my $dest = parse_target($param->{dest}); - die "Sync Name does not exist!\n" unless $cfg->{$path}->{$name}; + if (my $ip = $dest->{ip}) { + run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]); + } - my $source .= $cfg->{$path}->{$name}->{source_ip} ? "$cfg->{$path}->{$name}->{source_ip}:" : ''; + if (my $ip = $source->{ip}) { + run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]); + } - $source .= $cfg->{$path}->{$name}->{source_pool}; - $source .= $cfg->{$path}->{$name}->{source_path} ? $cfg->{$path}->{$name}->{source_path} :''; + die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user}); - my $dest = $cfg->{$path}->{$name}->{dest_ip} ? $cfg->{$path}->{$name}->{dest_ip} :""; - $dest .= $cfg->{$path}->{$name}->{dest_pool}; - $dest .= $cfg->{$path}->{$name}->{dest_path} ? $cfg->{$path}->{$name}->{dest_path} :''; + if (!defined($source->{vmid})) { + die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user}); + } - delete $cfg->{$path}->{$name}; + my $vm_type = vm_exists($source, $param->{source_user}); + $job->{vm_type} = $vm_type; + $source->{vm_type} = $vm_type; - delete $cfg->{$path} if keys%{$cfg->{$path}} == 0; + die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type; - write_to_config($cfg); + die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}}; - cron_del($source, $dest, $name); - }; + #check if vm has zfs disks if not die; + get_disks($source, $param->{source_user}) if $source->{vmid}; + update_cron($job); + update_state($job); + }); #cron and state lock - if ($source->{vmid}) { - my $path = $source->{vmid}; + return if $param->{skip}; - &$delete_cron($path, $name, $cfg) + eval { sync($param) }; + if (my $err = $@) { + destroy_job($param); + print $err; + } +} - } else { +sub get_job { + my ($param) = @_; - my $path = $source->{pool}; - $path .= $source->{path} if $source->{path}; + my $cfg = read_cron(); - &$delete_cron($path, $name, $cfg); + if (!$cfg->{$param->{source}}->{$param->{name}}) { + die "Job with source $param->{source} and name $param->{name} does not exist\n" ; } + my $job = $cfg->{$param->{source}}->{$param->{name}}; + $job->{name} = $param->{name}; + $job->{source} = $param->{source}; + $job = add_state_to_job($job); + + return $job; } -sub sync { +sub destroy_job { my ($param) = @_; - my $cfg = read_from_config("$CONFIG_PATH$CONFIG"); + locked("$CONFIG_PATH/cron_and_state.lock", sub { + my $job = get_job($param); + $job->{state} = "del"; - my $name = $param->{name} ? $param->{name} : "default"; - my $max_snap = $param->{maxsnap} ? $param->{maxsnap} : 1; - my $method = $param->{method} ? $param->{method} : "ssh"; + update_cron($job); + update_state($job); + }); +} - my $dest = parse_target($param->{dest}); - my $source = parse_target($param->{source}); +sub sync { + my ($param) = @_; - my $sync_path = sub { - my ($source, $name, $cfg, $max_snap, $dest, $method) = @_; + my $job; - ($source->{old_snap},$source->{last_snap}) = snapshot_get($source, $dest, $max_snap, $name); + locked("$CONFIG_PATH/cron_and_state.lock", sub { + eval { $job = get_job($param) }; - my $job_status = check_config($source, $name, $cfg) if $cfg; - die "VM syncing at the moment!\n" if ($job_status && $job_status eq "active"); + if ($job) { + if (defined($job->{state}) && ($job->{state} eq "syncing" || $job->{state} eq "waiting")) { + die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n"; + } - if ($job_status && $job_status eq "exist") { - my $conf_name = $source->{abs_path}; - $conf_name = $source->{vmid} if $source->{vmid}; - $cfg->{$conf_name}->{$name}->{locked} = "yes"; - write_to_config($cfg); + $job->{state} = "waiting"; + update_state($job); } + }); - my $date = snapshot_add($source, $dest, $name); + locked("$CONFIG_PATH/sync.lock", sub { - send_image($source, $dest, $method, $param->{verbose}, $param->{limit}); + my $date = get_date(); - snapshot_destroy($source, $dest, $method, $source->{old_snap}) if ($source->{destroy} && $source->{old_snap}); + my $dest; + my $source; + my $vm_type; + locked("$CONFIG_PATH/cron_and_state.lock", sub { + #job might've changed while we waited for the sync lock, but we can be sure it's not syncing + eval { $job = get_job($param); }; - if ($job_status && $job_status eq "exist") { - my $conf_name = $source->{abs_path}; - $conf_name = $source->{vmid} if $source->{vmid}; - $cfg->{$conf_name}->{$name}->{locked} = "no"; - $cfg->{$conf_name}->{$name}->{lsync} = $date; - write_to_config($cfg); - } - }; + if ($job && defined($job->{state}) && $job->{state} eq "stopped") { + die "Job --source $param->{source} --name $param->{name} has been disabled\n"; + } - $param->{method} = "ssh" if !$param->{method}; + $dest = parse_target($param->{dest}); + $source = parse_target($param->{source}); - if ($source->{vmid}) { - die "VM $source->{vmid} doesn't exist\n" if !vm_exists($source); - my $disks = get_disks($source); + $vm_type = vm_exists($source, $param->{source_user}); + $source->{vm_type} = $vm_type; - foreach my $disk (sort keys %{$disks}) { - $source->{abs_path} = $disks->{$disk}->{pool}; - $source->{abs_path} .= "\/$disks->{$disk}->{path}" if $disks->{$disk}->{path}; + if ($job) { + $job->{state} = "syncing"; + $job->{vm_type} = $vm_type if !$job->{vm_type}; + update_state($job); + } + }); #cron and state lock - $source->{pool} = $disks->{$disk}->{pool}; - $source->{path} = "\/$disks->{$disk}->{path}"; + my $sync_path = sub { + my ($source, $dest, $job, $param, $date) = @_; - &$sync_path($source, $name, $cfg, $max_snap, $dest, $method); - } - if ($method eq "ssh") { - send_config($source, $dest,'ssh'); + ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user}); + + snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user}); + + send_image($source, $dest, $param); + + snapshot_destroy($source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $dest->{old_snap}); + + }; + + eval{ + if ($source->{vmid}) { + die "VM $source->{vmid} doesn't exist\n" if !$vm_type; + die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root"); + my $disks = get_disks($source, $param->{source_user}); + + foreach my $disk (sort keys %{$disks}) { + $source->{all} = $disks->{$disk}->{all}; + $source->{pool} = $disks->{$disk}->{pool}; + $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path}; + $source->{last_part} = $disks->{$disk}->{last_part}; + &$sync_path($source, $dest, $job, $param, $date); + } + if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) { + send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path}); + } else { + send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path}); + } + } else { + &$sync_path($source, $dest, $job, $param, $date); + } + }; + if (my $err = $@) { + locked("$CONFIG_PATH/cron_and_state.lock", sub { + eval { $job = get_job($param); }; + if ($job) { + $job->{state} = "error"; + update_state($job); + } + }); + print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n"; + die "$err\n"; } - } else { - &$sync_path($source, $name, $cfg, $max_snap, $dest, $method); - } + + locked("$CONFIG_PATH/cron_and_state.lock", sub { + eval { $job = get_job($param); }; + if ($job) { + if (defined($job->{state}) && $job->{state} eq "stopped") { + $job->{state} = "stopped"; + } else { + $job->{state} = "ok"; + } + $job->{lsync} = $date; + update_state($job); + } + }); + }); #sync lock } sub snapshot_get{ - my ($source, $dest, $max_snap, $name) = @_; + my ($source, $dest, $max_snap, $name, $dest_user) = @_; - my $cmd = "zfs list -r -t snapshot -Ho name, -S creation "; + my $cmd = []; + push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--', if $dest->{ip}; + push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation'; - $cmd .= $source->{abs_path}; - $cmd = "ssh root\@$source->{ip} ".$cmd if $source->{ip}; + my $path = $dest->{all}; + $path .= "/$source->{last_part}" if $source->{last_part}; + push @$cmd, $path; - my $raw = run_cmd($cmd); - my $index = 1; + my $raw; + eval {$raw = run_cmd($cmd)}; + if (my $erro =$@) { #this means the volume doesn't exist on dest yet + return undef; + } + + my $index = 0; my $line = ""; my $last_snap = undef; + my $old_snap; while ($raw && $raw =~ s/^(.*?)(\n|$)//) { $line = $1; - $last_snap = $line if $index == 1; - if ($index == $max_snap) { - $source->{destroy} = 1; - last; - }; - $index++; + if ($line =~ m/@(.*)$/) { + $last_snap = $1 if (!$last_snap); + } + if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) { + $old_snap = $1; + $index++; + if ($index == $max_snap) { + $source->{destroy} = 1; + last; + }; + } } - $line =~ m/^(.+)\@(rep_$name\_.+)(\n|$)/; - return ($2, $last_snap) if $2; + return ($old_snap, $last_snap) if $last_snap; return undef; } sub snapshot_add { - my ($source, $dest, $name) = @_; - - my $date = get_date(); + my ($source, $dest, $name, $date, $source_user, $dest_user) = @_; my $snap_name = "rep_$name\_".$date; $source->{new_snap} = $snap_name; - my $path = $source->{abs_path}."\@".$snap_name; - - my $cmd = "zfs snapshot $path"; - $cmd = "ssh root\@$source->{ip} ".$cmd if $source->{ip}; + my $path = "$source->{all}\@$snap_name"; + my $cmd = []; + push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip}; + push @$cmd, 'zfs', 'snapshot', $path; eval{ run_cmd($cmd); }; - if (my $err = $@){ - snapshot_destroy($source, $dest, 'ssh', $snap_name); + if (my $err = $@) { + snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user); die "$err\n"; } - return $date; -} - -sub cron_add { - my ($vm) = @_; - - open(my $fh, '>>', "$CRONJOBS") - or die "Could not open file: $!\n"; - - foreach my $name (keys%{$vm}){ - my $text = "*/$vm->{$name}->{interval} * * * * root "; - $text .= "$PATH$PROGNAME sync"; - $text .= " -source "; - if ($vm->{$name}->{vmid}) { - $text .= "$vm->{$name}->{source_ip}:" if $vm->{$name}->{source_ip}; - $text .= "$vm->{$name}->{vmid} "; - } else { - $text .= "$vm->{$name}->{source_ip}:" if $vm->{$name}->{source_ip}; - $text .= "$vm->{$name}->{source_pool}"; - $text .= "$vm->{$name}->{source_path}" if $vm->{$name}->{source_path}; - } - $text .= " -dest "; - $text .= "$vm->{$name}->{dest_ip}:" if $vm->{$name}->{dest_ip}; - $text .= "$vm->{$name}->{dest_pool}"; - $text .= "$vm->{$name}->{dest_path}" if $vm->{$name}->{dest_path}; - $text .= " -name $name "; - $text .= " -limit $vm->{$name}->{limit}" if $vm->{$name}->{limit}; - $text .= " -maxsnap $vm->{$name}->{maxsnap}" if $vm->{$name}->{maxsnap}; - $text .= "\n"; - print($fh $text); - } - close($fh); } -sub cron_del { - my ($source, $dest, $name) = @_; - - open(my $fh, '<', "$CRONJOBS") - or die "Could not open file: $!\n"; +sub get_disks { + my ($target, $user) = @_; - $/ = undef; + my $cmd = []; + push @$cmd, 'ssh', "$user\@$target->{ip}", '--', if $target->{ip}; - my $text = <$fh>; - my $buffer = ""; - close($fh); - while ($text && $text =~ s/^(.*?)(\n|$)//) { - my $line = $1.$2; - if ($line !~ m/^.*root $PATH$PROGNAME sync -source $source.*-dest $dest.*-name $name.*$/){ - $buffer .= $line; - } + if ($target->{vm_type} eq 'qemu') { + push @$cmd, 'qm', 'config', $target->{vmid}; + } elsif ($target->{vm_type} eq 'lxc') { + push @$cmd, 'pct', 'config', $target->{vmid}; + } else { + die "VM Type unknown\n"; } - open($fh, '>', "$CRONJOBS") - or die "Could not open file: $!\n"; - print($fh $buffer); - close($fh); -} - -sub get_disks { - my ($target) = @_; - - my $cmd = ""; - $cmd = "ssh root\@$target->{ip} " if $target->{ip}; - $cmd .= "qm config $target->{vmid}"; my $res = run_cmd($cmd); - my $disks = parse_disks($res, $target->{ip}); + my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user); return $disks; } @@ -614,9 +782,12 @@ sub run_cmd { my ($cmd) = @_; print "Start CMD\n" if $DEBUG; print Dumper $cmd if $DEBUG; + if (ref($cmd) eq 'ARRAY') { + $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd); + } my $output = `$cmd 2>&1`; - die $output if 0 != $?; + die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?; chomp($output); print Dumper $output if $DEBUG; @@ -625,81 +796,109 @@ sub run_cmd { } sub parse_disks { - my ($text, $ip) = @_; + my ($text, $ip, $vm_type, $user) = @_; my $disks; my $num = 0; - my $cmd = ""; - $cmd .= "ssh root\@$ip " if $ip; - $cmd .= "pvesm zfsscan"; - my $zfs_pools = run_cmd($cmd); while ($text && $text =~ s/^(.*?)(\n|$)//) { my $line = $1; + + next if $line =~ /media=cdrom/; + next if $line !~ m/$DISK_KEY_RE/; + + #QEMU if backup is not set include in sync + next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/); + + #LXC if backup is not set do no in sync + next if $vm_type eq 'lxc' && ($line =~ m/^mp\d:/) && ($line !~ m/backup=(?i:1|yes|on|true)/); + my $disk = undef; my $stor = undef; - my $is_disk = $line =~ m/^(virtio|ide|scsi|sata){1}\d+: /; - if($line =~ m/^(virtio\d+: )(.+:)([A-Za-z0-9\-]+),(.*)$/) { - $disk = $3; - $stor = $2; - } elsif($line =~ m/^(ide\d+: )(.+:)([A-Za-z0-9\-]+),(.*)$/) { - $disk = $3; - $stor = $2; - } elsif($line =~ m/^(scsi\d+: )(.+:)([A-Za-z0-9\-]+),(.*)$/) { - $disk = $3; - $stor = $2; - } elsif($line =~ m/^(sata\d+: )(.+:)([A-Za-z0-9\-]+),(.*)$/) { - $disk = $3; - $stor = $2; + if($line =~ m/$DISK_KEY_RE(.*)$/) { + my @parameter = split(/,/,$1); + + foreach my $opt (@parameter) { + if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){ + $disk = $2; + $stor = $1; + last; + } + } + } + if (!defined($disk) || !defined($stor)) { + print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n"; + next; } - die "disk is not on ZFS Storage\n" if $is_disk && !$disk && $line !~ m/cdrom/; - - if($disk && $line !~ m/none/ && $line !~ m/cdrom/ ) { - my $cmd = ""; - $cmd .= "ssh root\@$ip " if $ip; - $cmd .= "pvesm path $stor$disk"; - my $path = run_cmd($cmd); + my $cmd = []; + push @$cmd, 'ssh', "$user\@$ip", '--' if $ip; + push @$cmd, 'pvesm', 'path', "$stor$disk"; + my $path = run_cmd($cmd); + + die "Get no path from pvesm path $stor$disk\n" if !$path; + + if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) { + + my @array = split('/', $1); + $disks->{$num}->{pool} = shift(@array); + $disks->{$num}->{all} = $disks->{$num}->{pool}; + if (0 < @array) { + $disks->{$num}->{path} = join('/', @array); + $disks->{$num}->{all} .= "\/$disks->{$num}->{path}"; + } + $disks->{$num}->{last_part} = $disk; + $disks->{$num}->{all} .= "\/$disk"; - if ($path =~ m/^\/dev\/zvol\/(\w+).*(\/$disk)$/){ + $num++; + } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) { - $disks->{$num}->{pool} = $1; - $disks->{$num}->{path} = $disk; - $num++; + $disks->{$num}->{pool} = $1; + $disks->{$num}->{all} = $disks->{$num}->{pool}; - } else { - die "ERROR: in path\n"; + if ($2) { + $disks->{$num}->{path} = $3; + $disks->{$num}->{all} .= "\/$disks->{$num}->{path}"; } + + $disks->{$num}->{last_part} = $disk; + $disks->{$num}->{all} .= "\/$disk"; + + $num++; + + } else { + die "ERROR: in path\n"; } } + + die "Vm include no disk on zfs.\n" if !$disks->{0}; return $disks; } sub snapshot_destroy { - my ($source, $dest, $method, $snap) = @_; + my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_; - my $zfscmd = "zfs destroy "; - my $name = "$source->{path}\@$snap"; + my @zfscmd = ('zfs', 'destroy'); + my $snapshot = "$source->{all}\@$snap"; eval { if($source->{ip} && $method eq 'ssh'){ - run_cmd("ssh root\@$source->{ip} $zfscmd $source->{pool}$name"); + run_cmd(['ssh', "$source_user\@$source->{ip}", '--', @zfscmd, $snapshot]); } else { - run_cmd("$zfscmd $source->{pool}$name"); + run_cmd([@zfscmd, $snapshot]); } }; if (my $erro = $@) { warn "WARN: $erro"; } - if ($dest){ - my $ssh = $dest->{ip} ? "ssh root\@$dest->{ip}" : ""; + if ($dest) { + my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : (); - my $path = ""; - $path ="$dest->{path}" if $dest->{path}; + my $path = "$dest->{all}"; + $path .= "/$source->{last_part}" if $source->{last_part}; - my @dir = split(/\//, $source->{path}); eval { - run_cmd("$ssh $zfscmd $dest->{pool}$path\/$dir[@dir-1]\@$snap "); + run_cmd([@ssh, @zfscmd, "$path\@$snap"]); }; if (my $erro = $@) { warn "WARN: $erro"; @@ -707,339 +906,267 @@ sub snapshot_destroy { } } +# check if snapshot for incremental sync exist on source side sub snapshot_exist { - my ($source ,$dest, $method) = @_; + my ($source , $dest, $method, $source_user) = @_; - my $cmd = ""; - $cmd = "ssh root\@$dest->{ip} " if $dest->{ip}; - $cmd .= "zfs list -rt snapshot -Ho name $dest->{pool}"; - $cmd .= "$dest->{path}" if $dest->{path}; - my @dir = split(/\//, $source->{path}); - $cmd .= "\/$dir[@dir-1]\@$source->{old_snap}"; + my $cmd = []; + push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip}; + push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name'; - my $text = ""; - eval {$text =run_cmd($cmd);}; - if (my $erro = $@) { + my $path = $source->{all}; + $path .= "\@$dest->{last_snap}"; + + push @$cmd, $path; + + eval {run_cmd($cmd)}; + if (my $erro =$@) { warn "WARN: $erro"; return undef; } - - while ($text && $text =~ s/^(.*?)(\n|$)//) { - my $line = $1; - return 1 if $line =~ m/^.*$source->{old_snap}$/; - } + return 1; } sub send_image { - my ($source, $dest, $method, $verbose, $limit) = @_; + my ($source, $dest, $param) = @_; - my $cmd = ""; + my $cmd = []; - $cmd .= "ssh root\@$source->{ip} " if $source->{ip}; - $cmd .= "zfs send "; - $cmd .= "-v " if $verbose; + push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip}; + push @$cmd, 'zfs', 'send'; + push @$cmd, '-p', if $param->{properties}; + push @$cmd, '-v' if $param->{verbose}; - if($source->{last_snap} && snapshot_exist($source ,$dest, $method)) { - $cmd .= "-i $source->{abs_path}\@$source->{old_snap} $source->{abs_path}\@$source->{new_snap} "; - } else { - $cmd .= "$source->{abs_path}\@$source->{new_snap} "; + if($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) { + push @$cmd, '-i', "$source->{all}\@$dest->{last_snap}"; } + push @$cmd, '--', "$source->{all}\@$source->{new_snap}"; - if ($limit){ - my $bwl = $limit*1024; - $cmd .= "| cstream -t $bwl"; + if ($param->{limit}){ + my $bwl = $param->{limit}*1024; + push @$cmd, \'|', 'cstream', '-t', $bwl; } - $cmd .= "| "; - $cmd .= "ssh root\@$dest->{ip} " if $dest->{ip}; - $cmd .= "zfs recv $dest->{pool}"; - $cmd .= "$dest->{path}" if $dest->{path}; + my $target = "$dest->{all}"; + $target .= "/$source->{last_part}" if $source->{last_part}; + $target =~ s!/+!/!g; - my @dir = split(/\//,$source->{path}); - $cmd .= "\/$dir[@dir-1]\@$source->{new_snap}"; + push @$cmd, \'|'; + push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip}; + push @$cmd, 'zfs', 'recv', '-F', '--'; + push @$cmd, "$target"; eval { run_cmd($cmd) }; if (my $erro = $@) { - snapshot_destroy($source, undef, $method, $source->{new_snap}); + snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user}); die $erro; }; } sub send_config{ - my ($source, $dest, $method) = @_; + my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_; + + my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf"; + my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}"; + + my $config_dir = $dest_config_path // $CONFIG_PATH; + $config_dir .= "/$dest->{last_part}" if $dest->{last_part}; + + $dest_target_new = $config_dir.'/'.$dest_target_new; if ($method eq 'ssh'){ if ($dest->{ip} && $source->{ip}) { - run_cmd("ssh root\@$dest->{ip} mkdir $VMCONFIG -p"); - run_cmd("scp root\@$source->{ip}:$QEMU_CONF$source->{vmid}.conf root\@$dest->{ip}:$VMCONFIG$source->{vmid}.conf.$source->{new_snap}"); + run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]); + run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", "$dest_user\@[$dest->{ip}]:$dest_target_new"]); } elsif ($dest->{ip}) { - run_cmd("ssh root\@$dest->{ip} mkdir $VMCONFIG -p"); - run_cmd("scp $QEMU_CONF$source->{vmid}.conf root\@$dest->{ip}:$VMCONFIG$source->{vmid}.conf.$source->{new_snap}"); + run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]); + run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]); } elsif ($source->{ip}) { - run_cmd("mkdir $VMCONFIG -p"); - run_cmd("scp root\@$source->{ip}:$QEMU_CONF$source->{vmid}.conf $VMCONFIG$source->{vmid}.conf.$source->{new_snap}"); + run_cmd(['mkdir', '-p', '--', $config_dir]); + run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]); } if ($source->{destroy}){ + my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$dest->{old_snap}"; if($dest->{ip}){ - run_cmd("ssh root\@$dest->{ip} rm -f $VMCONFIG$source->{vmid}.conf.$source->{old_snap}"); + run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]); } else { - run_cmd("rm -f $VMCONFIG$source->{vmid}.conf.$source->{old_snap}"); + run_cmd(['rm', '-f', '--', $dest_target_old]); } } + } elsif ($method eq 'local') { + run_cmd(['mkdir', '-p', '--', $config_dir]); + run_cmd(['cp', $source_target, $dest_target_new]); } } sub get_date { - my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time); - my $datestamp = sprintf ( "%04d-%02d-%02d_%02d:%02d:%02d",$year+1900,$mon+1,$mday,$hour,$min,$sec); + my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time); + my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec); return $datestamp; } sub status { - my $cfg = read_from_config("$CONFIG_PATH$CONFIG"); - - my $status_list = sprintf("%-25s%-15s%-10s\n","SOURCE","NAME","STATUS"); - - foreach my $source (sort keys%{$cfg}){ - foreach my $sync_name (sort keys%{$cfg->{$source}}){ - my $status; + my $cfg = read_cron(); - my $source_name = $source; + my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS"); - $source_name = $cfg->{$source}->{$sync_name}->{source_ip}.":".$source if $cfg->{$source}->{$sync_name}->{source_ip}; + my $states = read_state(); - if ($cfg->{$source}->{$sync_name}->{locked} eq 'no'){ - $status = sprintf("%-10s","OK"); - } elsif ($cfg->{$source}->{$sync_name}->{locked} eq 'yes' && - $cfg->{$source}->{$sync_name}->{failure}) { - $status = sprintf("%-10s","sync error"); - } else { - $status = sprintf("%-10s","syncing"); - } - - $status_list .= sprintf("%-25s%-15s", cut_to_width($source_name,25), cut_to_width($sync_name,15)); - $status_list .= "$status\n"; + foreach my $source (sort keys%{$cfg}) { + foreach my $sync_name (sort keys%{$cfg->{$source}}) { + $status_list .= sprintf("%-25s", cut_target_width($source, 25)); + $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25)); + $status_list .= "$states->{$source}->{$sync_name}->{state}\n"; } } return $status_list; } +sub enable_job { + my ($param) = @_; -my $command = $ARGV[0]; + locked("$CONFIG_PATH/cron_and_state.lock", sub { + my $job = get_job($param); + $job->{state} = "ok"; + update_state($job); + update_cron($job); + }); +} -my $commands = {'destroy' => 1, - 'create' => 1, - 'sync' => 1, - 'list' => 1, - 'status' => 1, - 'help' => 1}; +sub disable_job { + my ($param) = @_; -if (!$command || !$commands->{$command}) { - usage(); - die "\n"; + locked("$CONFIG_PATH/cron_and_state.lock", sub { + my $job = get_job($param); + $job->{state} = "stopped"; + update_state($job); + update_cron($job); + }); } -my $dest = ''; -my $source = ''; -my $verbose = ''; -my $interval = ''; -my $limit = ''; -my $maxsnap = ''; -my $name = ''; -my $skip = ''; - -my $help_sync = "zfs-zsync sync -dest -source [OPTIONS]\n -\twill sync one time\n -\t-dest\tstring\n -\t\tthe destination target is like [IP:][/Path]\n -\t-limit\tinteger\n -\t\tmax sync speed in kBytes/s, default unlimited\n -\t-maxsnap\tinteger\n -\t\thow much snapshots will be kept before get erased, default 1/n -\t-name\tstring\n -\t\tname of the sync job, if not set it is default. -\tIt is only necessary if scheduler allready contains this source.\n -\t-source\tstring\n -\t\tthe source can be an or [IP:][/Path]\n"; - -my $help_create = "zfs-zsync create -dest -source [OPTIONS]/n -\tCreate a sync Job\n -\t-dest\tstringn\n -\t\tthe destination target is like [IP]:[/Path]\n -\t-interval\tinteger\n -\t\tthe interval in min in witch the zfs will sync, -\t\tdefault is 15 min\n -\t-limit\tinteger\n -\t\tmax sync speed, default unlimited\n -\t-maxsnap\tstring\n -\t\thow much snapshots will be kept before get erased, default 1\n -\t-name\tstring\n -\t\tname of the sync job, if not set it is default\n -\t-skip\tboolean\n -\t\tif this flag is set it will skip the first sync\n -\t-source\tstring\n -\t\tthe source can be an or [IP:][/Path]\n"; - -my $help_destroy = "zfs-zsync destroy -source [OPTIONS]\n -\tremove a sync Job from the scheduler\n -\t-name\tstring\n -\t\tname of the sync job, if not set it is default\n -\t-source\tstring\n -\t\tthe source can be an or [IP:][/Path]\n"; - -my $help_help = "zfs-zsync help [OPTIONS]\n -\tGet help about specified command.\n -\t\tstring\n -\t\tCommand name\n -\t-verbose\tboolean\n -\t\tVerbose output format.\n"; - -my $help_list = "zfs-zsync list\n -\tGet a List of all scheduled Sync Jobs\n"; - -my $help_status = "zfs-zsync status\n -\tGet the status of all scheduled Sync Jobs\n"; - -sub help{ - my ($command) = @_; - - switch($command){ - case 'help' - { - die "$help_help\n"; - } - case 'sync' - { - die "$help_sync\n"; - } - case 'destroy' - { - die "$help_destroy\n"; - } - case 'create' - { - die "$help_create\n"; - } - case 'list' - { - die "$help_list\n"; - } - case 'status' - { - die "$help_status\n"; - } - } +my $cmd_help = { + destroy => qq{ +$PROGNAME destroy -source [OPTIONS] -} + remove a sync Job from the scheduler -my $err = GetOptions ('dest=s' => \$dest, - 'source=s' => \$source, - 'verbose' => \$verbose, - 'interval=i' => \$interval, - 'limit=i' => \$limit, - 'maxsnap=i' => \$maxsnap, - 'name=s' => \$name, - 'skip' => \$skip); - -if ($err == 0) { - die "can't parse options\n"; -} + -name string -my $param; -$param->{dest} = $dest; -$param->{source} = $source; -$param->{verbose} = $verbose; -$param->{interval} = $interval; -$param->{limit} = $limit; -$param->{maxsnap} = $maxsnap; -$param->{name} = $name; -$param->{skip} = $skip; - -switch($command){ - case "destroy" - { - die "$help_destroy\n" if !$source; - check_target($source); - destroy($param); - } - case "sync" - { - die "$help_sync\n" if !$source || !$dest; - check_target($source); - check_target($dest); - sync($param); - } - case "create" - { - die "$help_create\n" if !$source || !$dest; - check_target($source); - check_target($dest); - init($param); - } - case "status" - { - print status(); - } - case "list" - { - print list(); - } - case "help" - { - my $help_command = $ARGV[1]; - if ($help_command && $commands->{$help_command}) { - print help($help_command); - } - if ($verbose){ - exec("man $PROGNAME"); - } else { - usage(1); - } - } -} + name of the sync job, if not set it is default -sub usage{ - my ($help) = @_; + -source string - print("ERROR:\tno command specified\n") if !$help; - print("USAGE:\t$PROGNAME [ARGS] [OPTIONS]\n"); - print("\tpve-zsync help [] [OPTIONS]\n\n"); - print("\tpve-zsync create -dest -source [OPTIONS]\n"); - print("\tpve-zsync destroy -source [OPTIONS]\n"); - print("\tpve-zsync list\n"); - print("\tpve-zsync status\n"); - print("\tpve-zsync sync -dest -source [OPTIONS]\n"); -} + the source can be an or [IP:][/Path] + }, + create => qq{ +$PROGNAME create -dest -source [OPTIONS] -sub check_target{ - my ($target) = @_; + Create a sync Job - chomp($target); + -dest string - if($target !~ m/(\d+.\d+.\d+.\d+:)?([\w\-\_\/]+)(\/.+)?/){ - print("ERROR:\t$target is not valid.\n\tUse [IP:][/Path]!\n"); - return 1; - } - return undef; -} + the destination target is like [IP]:[/Path] -__END__ + -dest-user string -=head1 NAME + name of the user on the destination target, root by default -pve-zsync - PVE ZFS Replication Manager + -limit integer -=head1 SYNOPSIS + max sync speed in kBytes/s, default unlimited + + -maxsnap string + + how much snapshots will be kept before get erased, default 1 + + -name string + + name of the sync job, if not set it is default + + -skip boolean + + if this flag is set it will skip the first sync + + -source string + + the source can be an or [IP:][/Path] + + -source-user string + + name of the user on the source target, root by default + + -properties boolean + + Include the dataset's properties in the stream. + + -dest-config-path string + + specify a custom config path on the destination target. default is /var/lib/pve-zsync + }, + sync => qq{ +$PROGNAME sync -dest -source [OPTIONS]\n + + will sync one time + + -dest string + + the destination target is like [IP:][/Path] + + -dest-user string + + name of the user on the destination target, root by default + + -limit integer + + max sync speed in kBytes/s, default unlimited -zfs-zsync [ARGS] [OPTIONS] + -maxsnap integer -zfs-zsync help [OPTIONS] + how much snapshots will be kept before get erased, default 1 + + -name string + + name of the sync job, if not set it is default. + It is only necessary if scheduler allready contains this source. + + -source string + + the source can be an or [IP:][/Path] + + -source-user string + + name of the user on the source target, root by default + + -verbose boolean + + print out the sync progress. + + -properties boolean + + Include the dataset's properties in the stream. + + -dest-config-path string + + specify a custom config path on the destination target. default is /var/lib/pve-zsync + }, + list => qq{ +$PROGNAME list + + Get a List of all scheduled Sync Jobs + }, + status => qq{ +$PROGNAME status + + Get the status of all scheduled Sync Jobs + }, + help => qq{ +$PROGNAME help [OPTIONS] Get help about specified command. @@ -1050,88 +1177,151 @@ zfs-zsync help [OPTIONS] -verbose boolean Verbose output format. + }, + enable => qq{ +$PROGNAME enable -source [OPTIONS] + + enable a syncjob and reset error -zfs-zsync create -dest -source [OPTIONS] + -name string - Create a sync Job + name of the sync job, if not set it is default - -dest string + -source string - the destination target is like [IP]:[/Path] + the source can be an or [IP:][/Path] + }, + disable => qq{ +$PROGNAME disable -source [OPTIONS] - -interval integer + pause a sync job - the interval in min in witch the zfs will sync, default is 15 min + -name string - -limit integer + name of the sync job, if not set it is default - max sync speed, default unlimited + -source string - -maxsnap string + the source can be an or [IP:][/Path] + }, + printpod => 'internal command', - how much snapshots will be kept before get erased, default 1 +}; - -name string +if (!$command) { + usage(); die "\n"; +} elsif (!$cmd_help->{$command}) { + print "ERROR: unknown command '$command'"; + usage(1); die "\n"; +} - name of the sync job, if not set it is default +my @arg = @ARGV; +my $param = parse_argv(@arg); - -skip boolean +sub check_params { + for (@_) { + die "$cmd_help->{$command}\n" if !$param->{$_}; + } +} - if this flag is set it will skip the first sync +if ($command eq 'destroy') { + check_params(qw(source)); - -source string + check_target($param->{source}); + destroy_job($param); - the source can be an or [IP:][/Path] +} elsif ($command eq 'sync') { + check_params(qw(source dest)); -zfs-zsync destroy -source [OPTIONS] + check_target($param->{source}); + check_target($param->{dest}); + sync($param); - remove a sync Job from the scheduler +} elsif ($command eq 'create') { + check_params(qw(source dest)); - -name string + check_target($param->{source}); + check_target($param->{dest}); + init($param); - name of the sync job, if not set it is default +} elsif ($command eq 'status') { + print status(); - -source string +} elsif ($command eq 'list') { + print list(); - the source can be an or [IP:][/Path] +} elsif ($command eq 'help') { + my $help_command = $ARGV[1]; -zfs-zsync list + if ($help_command && $cmd_help->{$help_command}) { + die "$cmd_help->{$help_command}\n"; - Get a List of all scheduled Sync Jobs + } + if ($param->{verbose}) { + exec("man $PROGNAME"); -zfs-zsync status + } else { + usage(1); - Get the status of all scheduled Sync Jobs + } -zfs-zsync sync -dest -source [OPTIONS] +} elsif ($command eq 'enable') { + check_params(qw(source)); - will sync one time + check_target($param->{source}); + enable_job($param); - -dest string +} elsif ($command eq 'disable') { + check_params(qw(source)); - the destination target is like [IP:][/Path] + check_target($param->{source}); + disable_job($param); - -limit integer +} elsif ($command eq 'printpod') { + print_pod(); +} - max sync speed in kBytes/s, default unlimited +sub usage { + my ($help) = @_; - -maxsnap integer + print("ERROR:\tno command specified\n") if !$help; + print("USAGE:\t$PROGNAME [ARGS] [OPTIONS]\n"); + print("\t$PROGNAME help [] [OPTIONS]\n\n"); + print("\t$PROGNAME create -dest -source [OPTIONS]\n"); + print("\t$PROGNAME destroy -source [OPTIONS]\n"); + print("\t$PROGNAME disable -source [OPTIONS]\n"); + print("\t$PROGNAME enable -source [OPTIONS]\n"); + print("\t$PROGNAME list\n"); + print("\t$PROGNAME status\n"); + print("\t$PROGNAME sync -dest -source [OPTIONS]\n"); +} - how much snapshots will be kept before get erased, default 1 +sub check_target { + my ($target) = @_; + parse_target($target); +} - -name string +sub print_pod { - name of the sync job, if not set it is default. - It is only necessary if scheduler allready contains this source. + my $synopsis = join("\n", sort values %$cmd_help); - -source string + print < or [IP:][/Path] +pve-zsync - PVE ZFS Replication Manager + +=head1 SYNOPSIS + +pve-zsync [ARGS] [OPTIONS] + +$synopsis =head1 DESCRIPTION This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers. This tool also has the capability to add jobs to cron so the sync will be automatically done. +The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync. +To config cron see man crontab. =head2 PVE ZFS Storage sync Tool @@ -1140,13 +1330,15 @@ This Tool can get remote pool on other PVE or send Pool to others ZFS machines =head1 EXAMPLES add sync job from local VM to remote ZFS Server -zfs-zsync -source=100 -dest=192.168.1.2:zfspool +pve-zsync create -source=100 -dest=192.168.1.2:zfspool =head1 IMPORTANT FILES -Where the cron jobs are stored /etc/cron.d/pve-zsync -Where the VM config get copied on the destination machine /var/pve-zsync -Where the config is stored /var/pve-zsync +Cron jobs and config are stored at /etc/cron.d/pve-zsync + +The VM config get copied on the destination machine to /var/lib/pve-zsync/ + +=head1 COPYRIGHT AND DISCLAIMER Copyright (C) 2007-2015 Proxmox Server Solutions GmbH @@ -1163,3 +1355,6 @@ Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . + +EOF +}