#!/usr/bin/perl

use strict;
use warnings;

use Fcntl qw(:flock SEEK_END);
use Getopt::Long qw(GetOptionsFromArray);
use File::Path qw(make_path);
use JSON;
use IO::File;
use String::ShellQuote 'shell_quote';
use Text::ParseWords;

my $PROGNAME = "pve-zsync";
my $CONFIG_PATH = "/var/lib/${PROGNAME}";
my $STATE = "${CONFIG_PATH}/sync_state";
my $CRONJOBS = "/etc/cron.d/$PROGNAME";
my $PATH = "/usr/sbin";
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = "${PVE_DIR}/qemu-server";
my $LXC_CONF = "${PVE_DIR}/lxc";
my $PROG_PATH = "$PATH/${PROGNAME}";
my $INTERVAL = 15;
my $DEBUG;

BEGIN {
    $DEBUG = 0; # change default here. not above on declaration!
    $DEBUG ||= $ENV{ZSYNC_DEBUG};
    if ($DEBUG) {
        # Loaded at compile time so Dumper() is a known function when
        # run_cmd() below is compiled.
        require Data::Dumper;
        Data::Dumper->import();
    }
}

my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

my $IPV6RE = "(?:" .
    "(?:(?:" .                             "(?:$IPV6H16:){6})$IPV6LS32)|" .
    "(?:(?:" .                           "::(?:$IPV6H16:){5})$IPV6LS32)|" .
    "(?:(?:(?:" .              "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" .           ")$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" .            ")$IPV6H16)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" .                    ")))";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;

my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|tpmstate|mp)\d+)|rootfs): /;

my $INSTANCE_ID = get_instance_id($$);

my $command = $ARGV[0];

if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin('cstream');
    check_bin('zfs');
    check_bin('ssh');
    check_bin('scp');
}

# Turn catchable termination signals into a die() so that enclosing eval
# blocks (e.g. in locked()) can release locks and record the error state.
# SIGKILL cannot be caught, so no handler is installed for it.
$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{INT} = sub {
    die "Signaled, aborting sync: $!\n";
};

# Return the full path of the first executable named $bin found in $ENV{PATH};
# die if it cannot be found.
sub check_bin {
    my ($bin) = @_;

    foreach my $p (split (/:/, $ENV{PATH})) {
        my $fn = "$p/$bin";
        if (-x $fn) {
            return $fn;
        }
    }

    die "unable to find command '$bin'\n";
}

# Read $filename. Returns the first line as a string if $one_line_only is
# true, otherwise an array ref holding all lines (newlines kept).
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r")
        or die "Could not open file ${filename}: $!\n";

    my $text = $one_line_only ? <$fh> : [ <$fh> ];

    close($fh);

    return $text;
}

# Shorten $path to at most $maxlen characters for tabular display, replacing
# elided parts with "..".
sub cut_target_width {
    my ($path, $maxlen) = @_;
    $path =~ s@/+@/@g;

    return $path if length($path) <= $maxlen;

    return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;

    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;

    if (length($tail)+3 == $maxlen) {
        return "../$tail";
    } elsif (length($tail)+2 >= $maxlen) {
        return '..'.substr($tail, -$maxlen+2)
    }

    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $both = length($head) + length($tail);
    my $remaining = $maxlen-$both-4; # -4 for "/../"

    if ($remaining < 0) {
        return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
    }

    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return "$head/" . $path . "/$tail";
}

# Run $code while holding an exclusive flock on $lock_fn and return its
# (scalar) result. The lock is released before any error from $code is
# re-thrown.
sub locked {
    my ($lock_fn, $code) = @_;

    my $lock_fh = IO::File->new($lock_fn, 'w')
        or die "Couldn't open lock file '$lock_fn' - $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    close($lock_fh);

    die "$err" if $err;

    return $res;
}

sub get_status {
    my ($source, $name, $status) = @_;

    if ($status->{$source->{all}}->{$name}->{status}) {
        return $status;
    }

    return undef;
}

# Return 1 if `zfs list` succeeds for $dataset (locally, or via ssh when $ip
# is set), 0 otherwise.
sub check_dataset_exists {
    my ($dataset, $ip, $user) = @_;

    my $cmd = [];

    if ($ip) {
        push @$cmd, 'ssh', "$user\@$ip", '--';
    }
    push @$cmd, 'zfs', 'list', '-H', '--', $dataset;
    eval {
        run_cmd($cmd);
    };

    if ($@) {
        return 0;
    }
    return 1;
}

# Create a ZFS dataset, locally or via ssh when $ip is set.
sub create_file_system {
    my ($file_system, $ip, $user) = @_;

    my $cmd = [];

    if ($ip) {
        push @$cmd, 'ssh', "$user\@$ip", '--';
    }
    push @$cmd, 'zfs', 'create', $file_system;

    run_cmd($cmd);
}

# Parse a "[host:]VMID" or "[host:]pool[/path]" target into a hash with keys
# all, ip, pool or vmid, path and last_part. Dies on invalid input.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
    my $target = {};

    if ($text !~ $TARGETRE) {
        die "$errstr\n";
    }
    $target->{all} = $2;
    $target->{ip} = $1 if $1;
    my @parts = split('/', $2);

    # strip the brackets around (IPv6) addresses
    $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};

    my $pool = $target->{pool} = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
        delete $target->{pool};
    }

    return $target if (@parts == 0);
    $target->{last_part} = pop(@parts);

    if ($target->{ip}) {
        pop(@parts);
    }
    if (@parts > 0) {
        $target->{path} = join('/', @parts);
    }

    return $target;
}

# Parse the cron file into { source => { name => params } }. Creates an empty
# cron file and returns undef on first use.
sub read_cron {
    # This is for the first use to init the file.
    if (!-e $CRONJOBS) {
        my $new_fh = IO::File->new($CRONJOBS, 'w');
        die "Could not create $CRONJOBS: $!\n" if !$new_fh;
        close($new_fh);
        return undef;
    }

    my $text = read_file($CRONJOBS, 0);

    return parse_cron(@{$text});
}

# Parse command-line style options (from @ARGV or a cron line) into a hash,
# filling in defaults for name, maxsnap, method and the ssh users.
sub parse_argv {
    my (@arg) = @_;

    my $param = {
        dest => undef,
        source => undef,
        verbose => undef,
        limit => undef,
        maxsnap => undef,
        dest_maxsnap => undef,
        name => undef,
        skip => undef,
        method => undef,
        source_user => undef,
        dest_user => undef,
        prepend_storage_id => undef,
        properties => undef,
        dest_config_path => undef,
    };

    my ($ret) = GetOptionsFromArray(
        \@arg,
        'dest=s' => \$param->{dest},
        'source=s' => \$param->{source},
        'verbose' => \$param->{verbose},
        'limit=i' => \$param->{limit},
        'maxsnap=i' => \$param->{maxsnap},
        'dest-maxsnap=i' => \$param->{dest_maxsnap},
        'name=s' => \$param->{name},
        'skip' => \$param->{skip},
        'method=s' => \$param->{method},
        'source-user=s' => \$param->{source_user},
        'dest-user=s' => \$param->{dest_user},
        'prepend-storage-id' => \$param->{prepend_storage_id},
        'properties' => \$param->{properties},
        'dest-config-path=s' => \$param->{dest_config_path},
    );

    die "can't parse options\n" if $ret == 0;

    $param->{name} //= "default";
    $param->{maxsnap} //= 1;
    $param->{method} //= "ssh";
    $param->{source_user} //= "root";
    $param->{dest_user} //= "root";

    return $param;
}

# Copy the persisted state (state, lsync, vm_type, instance_id, snapN) of a
# job into the job hash.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    $job->{state} = $state->{state};
    $job->{lsync} = $state->{lsync};
    $job->{vm_type} = $state->{vm_type};
    $job->{instance_id} = $state->{instance_id};

    for (my $i = 0; $state->{"snap$i"}; $i++) {
        $job->{"snap$i"} = $state->{"snap$i"};
    }

    return $job;
}

# Turn cron file lines into { source => { name => params } }; lines without
# both --source and --dest (headers, blank lines) are ignored.
sub parse_cron {
    my (@text) = @_;

    my $cfg = {};

    for my $line (@text) {
        my @arg = Text::ParseWords::shellwords($line);
        my $param = parse_argv(@arg);

        if ($param->{source} && $param->{dest}) {
            my $source = delete $param->{source};
            my $name = delete $param->{name};

            $cfg->{$source}->{$name} = $param;
        }
    }

    return $cfg;
}

# Build a job hash from parsed parameters. The method is "local" when neither
# source nor dest has a host part, otherwise "ssh".
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    my $dest;
    $dest = parse_target($param->{dest}) if $param->{dest};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap};
    $job->{dest_maxsnap} = $param->{dest_maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{prepend_storage_id} = !!$param->{prepend_storage_id};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}

# Decode the JSON state file. Creates it (containing "{}") and returns undef
# on first use.
sub read_state {
    if (!-e $STATE) {
        make_path $CONFIG_PATH;
        my $new_fh = IO::File->new($STATE, 'w');
        die "Could not create $STATE: $!\n" if !$new_fh;
        print $new_fh "{}";
        close($new_fh);
        return undef;
    }

    my $text = read_file($STATE, 1);
    return decode_json($text);
}

# Write the job's state into the state file (or remove it when its state is
# "del"). Written to "$STATE.new" first and renamed into place.
sub update_state {
    my ($job) = @_;

    my $text = eval { read_file($STATE, 1); };

    my $out_fh = IO::File->new("$STATE.new", 'w');
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text) {
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);
    print $out_fh $text or die "can't write to ${STATE}.new: $!\n";
    close($out_fh) or die "can't write to ${STATE}.new: $!\n";
    rename("$STATE.new", $STATE) or die "can't move ${STATE}.new: $!\n";

    return $states;
}

# Rewrite the cron file with the job's line replaced, appended, or removed
# (state "del"), adding the SHELL/PATH header if it is missing.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    foreach my $line (@{$current}) {
        chomp($line);
        # \Q..\E: source/name are literal strings, not regex fragments
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("${CRONJOBS}.new", 'w');
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    print $new_fh $text or die "can't write to $CRONJOBS.new: $!\n";
    close($new_fh) or die "can't write to $CRONJOBS.new: $!\n";

    rename("${CRONJOBS}.new", $CRONJOBS) or die "can't move $CRONJOBS.new: $!\n";
}

# Render a job as a cron line. Stopped jobs are commented out. When $line (the
# job's current cron line) is given, its schedule is kept; otherwise, or when
# no schedule can be parsed from it, the default interval is used.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --dest-maxsnap $job->{dest_maxsnap}" if defined($job->{dest_maxsnap});
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --prepend-storage-id" if $job->{prepend_storage_id};
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}

# Tabular listing of all configured jobs with their state.
sub list {

    my $cfg = read_cron();

    my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            $list .= sprintf("%-25s", cut_target_width($source, 25));
            $list .= sprintf("%-25s", cut_target_width($name, 25));
            $list .= sprintf("%-10s", $state->{state} // '');
            $list .= sprintf("%-20s", $state->{lsync} // '');
            $list .= sprintf("%-6s", defined($state->{vm_type}) ? $state->{vm_type} : "undef");
            $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
        }
    }

    return $list;
}

# Return "qemu" or "lxc" depending on which guest config file exists for the
# target's VMID (checked via ssh for remote targets), undef otherwise.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    if ($target->{ip}) {
        my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
        return "qemu" if eval { run_cmd([@cmd, "$QEMU_CONF/$conf_fn"]) };
        return "lxc" if eval { run_cmd([@cmd, "$LXC_CONF/$conf_fn"]) };
    } else {
        return "qemu" if -f "$QEMU_CONF/$conf_fn";
        return "lxc" if -f "$LXC_CONF/$conf_fn";
    }

    return undef;
}

# "create": validate the targets, register the job in cron and state, then
# run the first sync unless --skip was given. If that first sync fails, the
# job is removed again.
sub init {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);

        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        if (my $ip = $dest->{ip}) {
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
        }

        if (my $ip = $source->{ip}) {
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
        }

        die "Pool $dest->{all} does not exist\n"
            if !check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user});

        if (!defined($source->{vmid})) {
            die "Pool $source->{all} does not exist\n"
                if !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user});
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        # check if vm has zfs disks, if not die
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    }); # cron and state lock

    return if $param->{skip};

    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}

# Look up a configured job (cron entry merged with its persisted state); dies
# if no job with this source and name exists.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    if (!$cfg->{$param->{source}}->{$param->{name}}) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }
    my $job = $cfg->{$param->{source}}->{$param->{name}};
    $job->{name} = $param->{name};
    $job->{source} = $param->{source};
    $job = add_state_to_job($job);

    return $job;
}

# Remove a job from both the cron file and the state file.
sub destroy_job {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $job = get_job($param);
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    });
}

# Build a "pid:starttime:boot_id" string that identifies a process instance,
# so a recycled PID is not mistaken for a still-running sync.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1)
        or die "unable to read process stats\n";
    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
        or die "unable to read boot ID\n";

    # The comm field (field 2) is wrapped in parentheses and may itself contain
    # whitespace or ')', so drop everything up to the last ')' first. The
    # remaining list starts at field 3, which puts starttime (field 22) at
    # index 19.
    $stat =~ s/^.*\)\s+//s;
    my $stats = [ split(/\s+/, $stat) ];
    my $starttime = $stats->[19];
    chomp($boot_id);

    return "${pid}:${starttime}:${boot_id}";
}

# True if the process described by $instance_id still exists with the same
# start time and boot ID.
sub instance_exists {
    my ($instance_id) = @_;

    if (defined($instance_id) && $instance_id =~ m/^([1-9][0-9]*):/) {
        my $pid = $1;
        my $actual_id = eval { get_instance_id($pid); };
        return defined($actual_id) && $actual_id eq $instance_id;
    }

    return 0;
}

# Run one sync of source to dest. Sync runs are serialized through sync.lock;
# the job state moves waiting -> syncing -> ok/error (or stays "stopped").
sub sync {
    my ($param) = @_;

    my $job;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        eval { $job = get_job($param) };

        if ($job) {
            my $state = $job->{state} // 'ok';
            $state = 'ok' if !instance_exists($job->{instance_id});

            if ($state eq "syncing" || $state eq "waiting") {
                die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
            }

            $job->{state} = "waiting";
            $job->{instance_id} = $INSTANCE_ID;

            update_state($job);
        }
    });

    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my $dest;
        my $source;
        my $vm_type;

        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            # job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            if ($job) {
                $job->{state} = "syncing";
                $job->{vm_type} = $vm_type if !$job->{vm_type};
                update_state($job);
            }
        }); # cron and state lock

        # Sync a single dataset: snapshot, send, then prune old snapshots on
        # both sides.
        my $sync_path = sub {
            my ($source, $dest, $job, $param, $date) = @_;

            my $dest_dataset = target_dataset($source, $dest);

            ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get(
                $dest_dataset,
                $param->{dest_maxsnap} // $param->{maxsnap},
                $param->{name},
                $dest->{ip},
                $param->{dest_user},
            );

            ($source->{old_snap}) = snapshot_get(
                $source->{all},
                $param->{maxsnap},
                $param->{name},
                $source->{ip},
                $param->{source_user},
            );

            prepare_prepended_target($source, $dest, $param->{dest_user}) if defined($dest->{prepend});

            snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

            send_image($source, $dest, $param);

            for my $old_snap (@{$source->{old_snap}}) {
                snapshot_destroy($source->{all}, $old_snap, $source->{ip}, $param->{source_user});
            }

            for my $old_snap (@{$dest->{old_snap}}) {
                snapshot_destroy($dest_dataset, $old_snap, $dest->{ip}, $param->{dest_user});
            }
        };

        eval {
            if ($source->{vmid}) {
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                foreach my $disk (sort keys %{$disks}) {
                    $source->{all} = $disks->{$disk}->{all};
                    $source->{pool} = $disks->{$disk}->{pool};
                    $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
                    $source->{last_part} = $disks->{$disk}->{last_part};

                    $dest->{prepend} = $disks->{$disk}->{storage_id}
                        if $param->{prepend_storage_id};

                    $sync_path->($source, $dest, $job, $param, $date);
                }
                if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
                    send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
                } else {
                    send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
                }
            } else {
                $sync_path->($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            locked("$CONFIG_PATH/cron_and_state.lock", sub {
                eval { $job = get_job($param); };
                if ($job) {
                    $job->{state} = "error";
                    delete $job->{instance_id};
                    update_state($job);
                }
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            eval { $job = get_job($param); };
            if ($job) {
                if (defined($job->{state}) && $job->{state} eq "stopped") {
                    $job->{state} = "stopped";
                } else {
                    $job->{state} = "ok";
                }
                $job->{lsync} = $date;
                delete $job->{instance_id};
                update_state($job);
            }
        });
    }); # sync lock
}

# List the snapshots of $dataset (newest first). Returns an array ref of this
# job's snapshots beyond the newest $max_snap (those to be pruned; none when
# $max_snap <= 0), plus the name of the newest snapshot overall. Returns undef
# if the dataset or its snapshots cannot be listed.
sub snapshot_get{
    my ($dataset, $max_snap, $name, $ip, $user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$user\@$ip", '--', if $ip;
    push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
    push @$cmd, $dataset;

    my $raw;
    eval {$raw = run_cmd($cmd)};
    if (my $erro =$@) { # this means the volume doesn't exist on dest yet
        return undef;
    }

    my $index = 0;
    my $line = "";
    my $last_snap = undef;
    my $old_snap = [];

    while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
        $line = $1;
        if ($line =~ m/@(.*)$/) {
            $last_snap = $1 if (!$last_snap);
        }
        if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
            # interpreted as infinity
            last if $max_snap <= 0;

            my $snap = $1;
            $index++;

            if ($index >= $max_snap) {
                push @{$old_snap}, $snap;
            }
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}

# Create snapshot "rep_<name>_<date>" of the source dataset and record it in
# $source->{new_snap}. On failure the snapshot is removed again before dying.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_$name\_".$date;

    $source->{new_snap} = $snap_name;

    my $path = "$source->{all}\@$snap_name";

    my $cmd = [];
    push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
    push @$cmd, 'zfs', 'snapshot', $path;
    eval{
        run_cmd($cmd);
    };

    if (my $err = $@) {
        snapshot_destroy($source->{all}, $snap_name, $source->{ip}, $source_user);
        die "$err\n";
    }
}

# Read the guest config (qm/pct config) and return its ZFS-backed disks as
# parsed by parse_disks().
sub get_disks {
    my ($target, $user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$user\@$target->{ip}", '--', if $target->{ip};

    if ($target->{vm_type} eq 'qemu') {
        push @$cmd, 'qm', 'config', $target->{vmid};
    } elsif ($target->{vm_type} eq 'lxc') {
        push @$cmd, 'pct', 'config', $target->{vmid};
    } else {
        die "VM Type unknown\n";
    }

    my $res = run_cmd($cmd);

    my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user);

    return $disks;
}

# Run a command through the shell and return its chomped combined
# stdout/stderr; dies on a non-zero exit status. An array ref is shell-quoted
# element-wise, except scalar refs (e.g. \'|'), which are inserted verbatim.
sub run_cmd {
    my ($cmd) = @_;
    print "Start CMD\n" if $DEBUG;
    print Dumper $cmd if $DEBUG;
    if (ref($cmd) eq 'ARRAY') {
        $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
    }
    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);
    print Dumper $output if $DEBUG;
    print "END CMD\n" if $DEBUG;
    return $output;
}

# Extract the syncable ZFS disks from guest config text. Returns
# { index => { storage_id, pool, path, last_part, all } }; dies if none found.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        # QEMU: disks are synced unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: mount points (mpN, any number of digits) are only synced if
        # backup is explicitly enabled
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+):([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor:$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor:$disk\n" if !$path;

        $disks->{$num}->{storage_id} = $stor;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}

# how the corresponding dataset is named on the target
sub target_dataset {
    my ($source, $dest) = @_;

    my $target = "$dest->{all}";
    $target .= "/$dest->{prepend}" if defined($dest->{prepend});
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    return $target;
}

# create the parent dataset for the actual target
sub prepare_prepended_target {
    my ($source, $dest, $dest_user) = @_;

    die "internal error - not a prepended target\n" if !defined($dest->{prepend});

    # The parent dataset shouldn't be the actual target.
    die "internal error - no last_part for source\n" if !$source->{last_part};

    my $target = "$dest->{all}/$dest->{prepend}";
    $target =~ s!/+!/!g;

    return if check_dataset_exists($target, $dest->{ip}, $dest_user);

    create_file_system($target, $dest->{ip}, $dest_user);
}

# Destroy "$dataset@$snap" (via ssh when $ip is set); failures only warn.
sub snapshot_destroy {
    my ($dataset, $snap, $ip, $user) = @_;

    my @zfscmd = ('zfs', 'destroy');
    my $snapshot = "$dataset\@$snap";

    eval {
        if ($ip) {
            run_cmd(['ssh', "$user\@$ip", '--', @zfscmd, $snapshot]);
        } else {
            run_cmd([@zfscmd, $snapshot]);
        }
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }
}

# check if snapshot for incremental sync exist on source side
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    my $path = $source->{all};
    $path .= "\@$dest->{last_snap}";

    push @$cmd, $path;

    eval {run_cmd($cmd)};
    if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
    }
    return 1;
}

# Pipe `zfs send` of the new snapshot (incremental when the destination's
# last snapshot also exists on the source) into `zfs recv -F` on the target,
# optionally rate-limited with cstream. Removes the new source snapshot on
# failure.
sub send_image {
    my ($source, $dest, $param) = @_;

    my $cmd = [];

    push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @$cmd, 'zfs', 'send';
    push @$cmd, '-p', if $param->{properties};
    push @$cmd, '-v' if $param->{verbose};

    if($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
        push @$cmd, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @$cmd, '--', "$source->{all}\@$source->{new_snap}";

    if ($param->{limit}){
        my $bwl = $param->{limit}*1024;
        push @$cmd, \'|', 'cstream', '-t', $bwl;
    }
    my $target = target_dataset($source, $dest);

    push @$cmd, \'|';
    push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'recv', '-F', '--';
    push @$cmd, "$target";

    eval {
        run_cmd($cmd)
    };

    if (my $erro = $@) {
        snapshot_destroy($source->{all}, $source->{new_snap}, $source->{ip}, $param->{source_user});
        die $erro;
    };
}

# Copy the guest config to "<config_dir>/<vmid>.conf.<type>.<new_snap>" on the
# destination ($method 'ssh' or 'local'), then remove the copies belonging to
# the destination snapshots that were pruned in this run.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
    my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    $dest_target_new = $config_dir.'/'.$dest_target_new;

    if ($method eq 'ssh'){
        if ($dest->{ip} && $source->{ip}) {
            run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($dest->{ip}) {
            run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($source->{ip}) {
            run_cmd(['mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', '--', $source_target, $dest_target_new]);
    } else {
        return;
    }

    # prune config copies of snapshots that were destroyed on the destination
    for my $old_snap (@{$dest->{old_snap}}) {
        my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.${old_snap}";
        if($dest->{ip}){
            run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
        } else {
            run_cmd(['rm', '-f', '--', $dest_target_old]);
        }
    }
}

# Local time formatted as "YYYY-MM-DD_hh:mm:ss" (used in snapshot names).
sub get_date {
    my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
    my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);

    return $datestamp;
}

# Tabular source/name/state listing of all configured jobs.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            $status_list .= sprintf("%-25s", cut_target_width($source, 25));
            $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
            $status_list .= ($states->{$source}->{$sync_name}->{state} // '') . "\n";
        }
    }

    return $status_list;
}

# Re-enable a job: reset its state to "ok" and uncomment its cron line.
sub enable_job {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $job = get_job($param);
        $job->{state} = "ok";
        update_state($job);
        update_cron($job);
    });
}

# Pause a job: set its state to "stopped" and comment out its cron line.
sub disable_job {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $job = get_job($param);
        $job->{state} = "stopped";
        update_state($job);
        update_cron($job);
    });
}

my $cmd_help = {
    destroy => qq{
$PROGNAME destroy --source <string> [OPTIONS]

    Remove a sync Job from the scheduler

    --name      string
        The name of the sync job, if not set 'default' is used.

    --source    string
        The source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    create => qq{
$PROGNAME create --dest <string> --source <string> [OPTIONS]

    Create a new sync-job

    --dest      string
        The destination target is like [IP]:<Pool>[/Path]

    --dest-user string
        The name of the user on the destination target, root by default

    --limit     integer
        Maximal sync speed in kBytes/s, default is unlimited

    --maxsnap   integer
        The number of snapshots to keep until older ones are erased.
        The default is 1, use 0 for unlimited.

    --dest-maxsnap   integer
        Override maxsnap for the destination dataset.

    --name      string
        The name of the sync job, if not set it is default

    --prepend-storage-id
        If specified, prepend the storage ID to the destination's path(s).

    --skip
        If specified, skip the first sync.

    --source    string
        The source can be an <VMID> or [IP:]<ZFSPool>[/Path]

    --source-user    string
        The (ssh) user-name on the source target, root by default

    --properties
        If specified, include the dataset's properties in the stream.

    --dest-config-path    string
        Specifies a custom config path on the destination target.
        The default is /var/lib/pve-zsync
},
    sync => qq{
$PROGNAME sync --dest <string> --source <string> [OPTIONS]\n

    Trigger one sync.

    --dest      string
        The destination target is like [IP:]<Pool>[/Path]

    --dest-user string
        The (ssh) user-name on the destination target, root by default

    --limit     integer
        The maximal sync speed in kBytes/s, default is unlimited

    --maxsnap   integer
        The number of snapshots to keep until older ones are erased.
        The default is 1, use 0 for unlimited.

    --dest-maxsnap   integer
        Override maxsnap for the destination dataset.

    --name      string
        The name of the sync job, if not set it is 'default'.
        It is only necessary if scheduler already contains this source.

    --prepend-storage-id
        If specified, prepend the storage ID to the destination's path(s).

    --source    string
        The source can either be an <VMID> or [IP:]<ZFSPool>[/Path]

    --source-user    string
        The name of the user on the source target, root by default

    --verbose
        If specified, print out the sync progress.

    --properties
        If specified, include the dataset's properties in the stream.

    --dest-config-path    string
        Specifies a custom config path on the destination target.
        The default is /var/lib/pve-zsync
},
    list => qq{
$PROGNAME list

    Get a List of all scheduled Sync Jobs
},
    status => qq{
$PROGNAME status

    Get the status of all scheduled Sync Jobs
},
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

    Get help about specified command.

    <cmd>       string
        Command name to get help about.

    --verbose
        Verbose output format.
},
    enable => qq{
$PROGNAME enable --source <string> [OPTIONS]

    Enable a sync-job and reset all job-errors, if any.

    --name      string
        name of the sync job, if not set it is default

    --source    string
        the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    disable => qq{
$PROGNAME disable --source <string> [OPTIONS]

    Disables (pauses) a sync-job

    --name      string
        name of the sync-job, if not set it is default

    --source    string
        the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    printpod => "$PROGNAME printpod\n\n\tinternal command",

};

if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

my @arg = @ARGV;
my $param = parse_argv(@arg);

# Die with the current command's help text if a required option is missing.
sub check_params {
    for (@_) {
        die "$cmd_help->{$command}\n" if !$param->{$_};
    }
}

if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    my $help_command = $ARGV[1];

    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }
    if ($param->{verbose}) {
        exec("man $PROGNAME");
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}

# Print the command overview; prefixed with an error line unless $help is set.
sub usage {
    my ($help) = @_;

    print("ERROR:\tno command specified\n") if !$help;
    print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
    print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
    print("\t$PROGNAME create --dest <string> --source <string> [OPTIONS]\n");
    print("\t$PROGNAME destroy --source <string> [OPTIONS]\n");
    print("\t$PROGNAME disable --source <string> [OPTIONS]\n");
    print("\t$PROGNAME enable --source <string> [OPTIONS]\n");
    print("\t$PROGNAME list\n");
    print("\t$PROGNAME status\n");
    print("\t$PROGNAME sync --dest <string> --source <string> [OPTIONS]\n");
}

# Validate a target string; parse_target() dies if it is malformed.
sub check_target {
    my ($target) = @_;
    parse_target($target);
}

# Print the POD manual page, built from the per-command help texts.
sub print_pod {

    my $synopsis = join("\n", sort values %$cmd_help);
    my $commands = join(", ", sort keys %$cmd_help);

    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Storage Sync Tool

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

Where <COMMAND> can be one of: $commands

=head1 DESCRIPTION

The pve-zsync tool can help you to sync your VMs or directories stored on ZFS
between multiple servers.

pve-zsync is able to automatically configure CRON jobs, so that a periodic sync
will be automatically triggered.
The default sync interval is 15 min, if you want to change this value you can
do this in F</etc/cron.d/pve-zsync>. If you need help to configure CRON tabs, see
man crontab.

=head1 COMMANDS AND OPTIONS

$synopsis

=head1 EXAMPLES

Adds a job for syncing the local VM 100 to a remote server's ZFS pool named "tank":
    pve-zsync create --source=100 -dest=192.168.1.2:tank

=head1 IMPORTANT FILES

Cron jobs and config are stored in F</etc/cron.d/pve-zsync>

The VM configuration itself gets copied to the destination machines
F</var/lib/pve-zsync/> path.

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2021 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it under
the terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
details.

You should have received a copy of the GNU Affero General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.

EOF
}