]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
docs: use standard long-opt double --arg
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program name; the state directory and the cron file are derived from it.
my $PROGNAME = "pve-zsync";
# Directory that holds the state file and the lock files.
my $CONFIG_PATH = "/var/lib/" . $PROGNAME;
# JSON document with the per-job sync state.
my $STATE = $CONFIG_PATH . "/sync_state";
# cron.d file in which the job definitions are stored.
my $CRONJOBS = "/etc/cron.d/" . $PROGNAME;
my $PATH = "/usr/sbin";
my $PVE_DIR = "/etc/pve/local";
# Directories searched for <vmid>.conf files.
my $QEMU_CONF = $PVE_DIR . "/qemu-server";
my $LXC_CONF = $PVE_DIR . "/lxc";
my $PROG_PATH = $PATH . "/" . $PROGNAME;
# Minute step used in the default cron schedule of newly created jobs.
my $INTERVAL = 15;
my $DEBUG;

BEGIN {
    # This block runs at compile time, so the default has to be set here:
    # an initializer on the declaration above would run later and
    # overwrite whatever was decided in this block.
    my $default = 0; # change default here. not above on declaration!
    $DEBUG = $default || $ENV{ZSYNC_DEBUG};

    # Data::Dumper is only loaded when debugging output is requested.
    if ($DEBUG) {
        require Data::Dumper;
        Data::Dumper->import();
    }
}
33
# Building blocks for matching IPv4 and IPv6 addresses. They are kept as
# plain strings so they can be interpolated into the larger patterns below.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# One alternative per position at which the "::" abbreviation may appear
# (the first alternative is the fully written-out form).
my @IPV6_FORMS = (
    "(?:(?:(?:$IPV6H16:){6})$IPV6LS32)",
    "(?:(?:::(?:$IPV6H16:){5})$IPV6LS32)",
    "(?:(?:(?:$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::)$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::)$IPV6H16)",
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::))",
);
my $IPV6RE = "(?:" . join('|', @IPV6_FORMS) . ")";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets

# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Capture 1 is the host (brackets included), capture 2 the VMID or dataset.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;

# Guest config keys that describe a disk / mount point, including the ': '
# that separates the key from its value.
my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
# Identifier of this running process, stored in the job state while a sync
# is in flight (see get_instance_id for its format).
my $INSTANCE_ID = get_instance_id($$);

my $command = $ARGV[0];

# Every command except the purely informational ones needs the external
# tools, so fail early if one of them cannot be found in PATH.
my $needs_tools = defined($command) && $command ne 'help' && $command ne 'printpod';
if ($needs_tools) {
    for my $tool (qw(cstream zfs ssh scp)) {
        check_bin($tool);
    }
}

# Convert termination signals into an exception so that eval blocks around
# a running sync get a chance to react.
# NOTE(review): SIGKILL cannot be trapped, so the KILL entry presumably has
# no effect - kept to leave behaviour unchanged.
my $abort_on_signal = sub {
    die "Signaled, aborting sync: $!\n";
};
for my $signal (qw(INT KILL HUP PIPE QUIT TERM)) {
    $SIG{$signal} = $abort_on_signal;
}
72
# Look up an executable in the directories listed in $ENV{PATH}.
#
# $bin - name of the command to search for
#
# Returns the path of the first executable match, in PATH order.
# Dies if no directory in PATH contains an executable of that name.
sub check_bin {
    my ($bin) = @_;

    my ($found) = grep { -x $_ } map { "$_/$bin" } split(/:/, $ENV{PATH});
    return $found if defined($found);

    die "unable to find command '$bin'\n";
}
85
# Read a file either completely or only up to its first line.
#
# $filename      - path of the file to read
# $one_line_only - true: return the first line as a string (undef for an
#                  empty file); false: return an array ref of all lines
#
# Line endings are kept. Dies if the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
        $text = <$fh>;
    } else {
        $text = [ <$fh> ];
    }

    close($fh);

    return $text;
}
98
# Shorten a dataset path for tabular output.
#
# $path   - path to shorten; runs of '/' are collapsed first
# $maxlen - width the result has to fit into
#
# Paths that already fit are returned unchanged (apart from the collapsed
# slashes). Otherwise the last component is kept as far as possible and the
# parts that had to be dropped are marked with '..'.
sub cut_target_width {
    my ($path, $maxlen) = @_;
    $path =~ s@/+@/@g;

    if (length($path) <= $maxlen) {
        return $path;
    }

    # a single component can only be cut from the left
    if ($path !~ m@/@) {
        return '..' . substr($path, -$maxlen+2);
    }

    # split off the last component
    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;
    my $tail_len = length($tail);

    if ($tail_len + 3 == $maxlen) {
        return "../$tail";
    }
    if ($tail_len + 2 >= $maxlen) {
        return '..' . substr($tail, -$maxlen+2);
    }

    # split off the first '/component' found in what is left
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $remaining = $maxlen - (length($head) + $tail_len) - 4; # -4 for "/../"

    if ($remaining < 0) {
        # -3 for "../"
        return substr($head, 0, $maxlen - $tail_len - 3) . "../$tail";
    }

    # squeeze the middle part down to the room that is left
    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return "$head/" . $path . "/$tail";
}
128
# Run a code reference while holding an exclusive flock() on a lock file.
#
# $lock_fn - path of the lock file; it is created (and truncated) if needed
# $code    - code reference to execute while the lock is held
#
# Returns the scalar result of $code. An exception thrown by $code is
# re-thrown (stringified) after the lock has been released.
# Dies if the lock file cannot be opened or the lock cannot be acquired.
sub locked {
    my ($lock_fn, $code) = @_;

    # Check the open explicitly: without it a missing directory or a
    # permission problem only surfaces as an obscure failure inside flock().
    my $lock_fh = IO::File->new($lock_fn, 'w')
        or die "Could not open lock file $lock_fn: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
144
# Return the status structure if it has a true 'status' entry for a job.
#
# $source - parsed target; only its 'all' key is used
# $name   - job name
# $status - hash ref of the form {<source all>}{<name>}{status}
#
# Returns $status itself when the entry is true, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    return $status->{$source->{all}}->{$name}->{status} ? $status : undef;
}
154
# Check whether a ZFS dataset exists, locally or on a remote host.
#
# $dataset - dataset name handed to 'zfs list'
# $ip      - host to query via ssh; false for the local host
# $user    - ssh user, only used together with $ip
#
# Returns 1 if 'zfs list' succeeded, 0 if the command failed for any reason.
sub check_dataset_exists {
    my ($dataset, $ip, $user) = @_;

    my @remote = $ip ? ('ssh', "$user\@$ip", '--') : ();

    my $exists = eval {
        run_cmd([@remote, 'zfs', 'list', '-H', '--', $dataset]);
        1;
    };

    return $exists ? 1 : 0;
}
173
# Create a ZFS file system, locally or on a remote host.
#
# $file_system - name of the dataset to create
# $ip          - host to run the command on via ssh; false for local
# $user        - ssh user, only used together with $ip
#
# Returns the output of the command; dies (via run_cmd) if it fails.
sub create_file_system {
    my ($file_system, $ip, $user) = @_;

    my @cmd = ('zfs', 'create', $file_system);
    unshift @cmd, 'ssh', "$user\@$ip", '--' if $ip;

    run_cmd(\@cmd);
}
186
# Split a target specification into its components.
#
# $text - '<VMID>' or '[host:]<pool>[/path]'
#
# Returns a hash ref with these keys:
#   all       - the part after the optional 'host:'
#   ip        - the host, with surrounding brackets removed (only if given)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - first path component
#   last_part - last path component (only if there is more than one)
#   path      - components left between pool and last_part, if any
# Dies if $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };

    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my ($first, @rest) = split('/', $all);
    die "$errstr\n" if !$first;

    if ($first =~ m/^\d+$/) {
        $target->{vmid} = $first;
    } else {
        $target->{pool} = $first;
    }

    return $target if !@rest;

    $target->{last_part} = pop(@rest);

    # for remote targets one more component is dropped from the path
    pop(@rest) if $target->{ip};

    $target->{path} = join('/', @rest) if @rest;

    return $target;
}
222
# Load the job definitions from the cron file.
#
# Returns the structure built by encode_cron. On first use, when the cron
# file does not exist yet, an empty file is created and undef is returned.
# Dies if the file cannot be created or read.
sub read_cron {

    if (-e $CRONJOBS) {
        my $lines = read_file($CRONJOBS, 0);
        return encode_cron(@{$lines});
    }

    # first use: initialize the file
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
237
# Parse command line style arguments into a parameter hash.
#
# @arg - argument list; elements that are not options are ignored
#
# Returns a hash ref that contains a key for every known option (undef when
# the option was not given), with defaults applied for name, maxsnap,
# method, source_user and dest_user.
# Dies with "can't parse options" if Getopt::Long reports an error.
sub parse_argv {
    my (@arg) = @_;

    # Getopt::Long specification => key in the returned hash
    my @options = (
        ['dest=s'             => 'dest'],
        ['source=s'           => 'source'],
        ['verbose'            => 'verbose'],
        ['limit=i'            => 'limit'],
        ['maxsnap=i'          => 'maxsnap'],
        ['name=s'             => 'name'],
        ['skip'               => 'skip'],
        ['method=s'           => 'method'],
        ['source-user=s'      => 'source_user'],
        ['dest-user=s'        => 'dest_user'],
        ['prepend-storage-id' => 'prepend_storage_id'],
        ['properties'         => 'properties'],
        ['dest-config-path=s' => 'dest_config_path'],
    );

    my $param = {};
    my @getopt_args;
    for my $option (@options) {
        my ($spec, $key) = @$option;
        $param->{$key} = undef;
        push @getopt_args, $spec, \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @getopt_args);

    die "can't parse options\n" if $ret == 0;

    my %default = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    for my $key (keys %default) {
        $param->{$key} //= $default{$key};
    }

    return $param;
}
284
# Copy the stored state of a job into the job hash.
#
# $job - job hash; 'source' and 'name' select the state entry
#
# Copies state, lsync, vm_type and instance_id as well as the consecutive
# snap0, snap1, ... entries (stopping at the first false one).
# Returns the same, modified $job.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    for my $key (qw(state lsync vm_type instance_id)) {
        $job->{$key} = $state->{$key};
    }

    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
302
# Turn the lines of the cron file into a job configuration structure.
#
# @text - lines of the cron file
#
# Every line is split on whitespace and run through parse_argv; lines that
# carry both a source and a dest are stored as $cfg->{<source>}{<name>},
# with 'source' and 'name' removed from the stored parameter hash.
# Processing stops at the first line that evaluates to false.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    while (my $line = shift(@text)) {
        my $param = parse_argv(split('\s', $line));

        next if !($param->{source} && $param->{dest});

        my ($source, $name) = delete @{$param}{qw(source name)};
        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
323
# Build a job hash from parsed command line parameters.
#
# $param - parameter hash as returned by parse_argv
#
# The method is 'local' when neither source nor destination names a host
# and 'ssh' otherwise. dest, maxsnap and dest_config_path are only set when
# the parameter is true. Dies (via parse_target) on an invalid source or
# destination.
sub param_to_job {
    my ($param) = @_;

    my $source = parse_target($param->{source});
    my $dest;
    $dest = parse_target($param->{dest}) if $param->{dest};

    my $is_local = !$dest->{ip} && !$source->{ip};

    my $job = {
        name               => $param->{name} || "default",
        method             => $is_local ? "local" : "ssh",
        limit              => $param->{limit},
        source             => $param->{source},
        source_user        => $param->{source_user},
        dest_user          => $param->{dest_user},
        prepend_storage_id => !!$param->{prepend_storage_id},
        properties         => !!$param->{properties},
    };

    for my $optional (qw(dest maxsnap dest_config_path)) {
        $job->{$optional} = $param->{$optional} if $param->{$optional};
    }

    return $job;
}
348
# Load the sync state of all jobs from the state file.
#
# Returns the decoded JSON structure. On first use, when the state file
# does not exist yet, the config directory and a state file containing '{}'
# are created and undef is returned.
# Dies if the file cannot be created, read or decoded.
sub read_state {

    if (-e $STATE) {
        # the whole JSON document is expected on the first line
        my $text = read_file($STATE, 1);
        return decode_json($text);
    }

    make_path $CONFIG_PATH;

    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
363
# Store (or remove) the state of one job in the state file.
#
# $job - job hash; 'source' and 'name' select the entry. With a state of
#        "del" the entry is removed, otherwise state, lsync, instance_id,
#        vm_type and the consecutive snap0, snap1, ... values are stored.
#
# The new content is written to "$STATE.new" and then renamed over the
# state file, so readers never see a partially written file.
# Returns the complete, updated state structure.
# Dies if the existing state cannot be decoded or the new state cannot be
# written, closed or moved into place.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file is treated like an empty state
    my $text = eval { read_file($STATE, 1); };

    my $states = {};
    my $state = {};
    if ($text) {
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}} // {};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        # drop the source entry as well once its last job is gone
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    # Only touch the temporary file after the old state has been decoded,
    # and check every step: an unnoticed write error would lose the state.
    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    print $out_fh encode_json($states) or die "can't write to $STATE.new: $!\n";
    close($out_fh) or die "can't close $STATE.new: $!\n";

    rename "$STATE.new", $STATE or die "can't move $STATE.new: $!\n";

    return $states;
}
403
# Add, update or remove the cron entry of a job.
#
# $job - job hash; the entry is identified by 'source' and 'name'. With a
#        state of "del" the matching line is removed, otherwise it is
#        rewritten by format_job (keeping its schedule) or appended if no
#        line matches.
#
# The SHELL/PATH header is prepended if it is not found in the first three
# lines. The result is written to "$CRONJOBS.new" and renamed into place.
# Dies if the cron file cannot be read or the new file cannot be written.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    # Source and name are literal text, not patterns: without quoting, the
    # dots of an IP address match any character and the brackets of an IPv6
    # source turn into a character class that never matches the job line.
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    foreach my $line (@{$current}) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }

    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    print $new_fh $text or die "can't write to $CRONJOBS.new: $!\n";
    # buffered write errors only show up when the handle is closed
    close($new_fh) or die "can't close $CRONJOBS.new: $!\n";

    rename "${CRONJOBS}.new", $CRONJOBS or die "can't move $CRONJOBS.new: $!\n";
}
446
# Render a job as one line for the cron file.
#
# $job  - job hash; a state of "stopped" produces a commented-out line
# $line - optional existing cron line of the job; its five schedule fields
#         are kept. Without it, or if no schedule can be extracted from it,
#         the default "*/$INTERVAL * * * *" schedule is used.
#
# Returns the line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    # Only use the capture when the match succeeded; after a failed match
    # $1 would be undefined or left over from an unrelated earlier match.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }

    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --prepend-storage-id" if $job->{prepend_storage_id};
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
475
# Build the table printed for the job list.
#
# Returns a string with a header line followed by one line per configured
# job, sorted by source and name. State, last sync and type come from the
# state file, the connection method from the cron file.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    for my $source (sort keys %{$cfg}) {
        for my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};

            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
496
# Determine whether a guest exists and of which type it is.
#
# $target - parsed target; 'vmid' selects the guest, 'ip' an optional
#           remote host
# $user   - ssh user for remote targets
#
# Looks for <vmid>.conf in the qemu directory first and in the lxc
# directory second - locally with a file test, remotely by running
# /bin/ls over ssh.
# Returns "qemu", "lxc", or undef if the target has no vmid or no config
# file was found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    for my $candidate (['qemu', $QEMU_CONF], ['lxc', $LXC_CONF]) {
        my ($type, $dir) = @$candidate;

        if ($target->{ip}) {
            my $listed = eval {
                run_cmd(['ssh', "$user\@$target->{ip}", '--', '/bin/ls', "$dir/$conf_fn"]);
            };
            return $type if $listed;
        } else {
            return $type if -f "$dir/$conf_fn";
        }
    }

    return undef;
}
515
# Create a new sync job and, unless 'skip' is set, run the first sync.
#
# $param - parameter hash as returned by parse_argv
#
# While holding the cron/state lock: copies the ssh key to remote hosts,
# verifies that the destination (and a dataset source) exists, checks that
# a VM source exists and has usable disks, refuses duplicate jobs, and
# finally writes the cron entry and the state.
# If the first sync fails, the job is destroyed again and the error is
# printed.
sub init {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);
        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # install the public key on every remote side, destination first
        for my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
            my ($target, $user) = @$peer;
            my $ip = $target->{ip} or next;
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        if (!check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user})) {
            die "Pool $dest->{all} does not exist\n";
        }

        if (!defined($source->{vmid})
            && !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user})) {
            die "Pool $source->{all} does not exist\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        # check if vm has zfs disks if not die;
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    }); #cron and state lock

    return if $param->{skip};

    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
569
# Look up a configured job and enrich it with its stored state.
#
# $param - parameter hash; 'source' and 'name' select the job
#
# Returns the job hash from the cron configuration with 'name', 'source'
# and the state fields filled in.
# Dies if no such job is configured.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my ($source, $name) = ($param->{source}, $param->{name});
    my $job = $cfg->{$source}->{$name};

    die "Job with source $param->{source} and name $param->{name} does not exist\n" if !$job;

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
585
# Remove a job from the cron file and the state file.
#
# $param - parameter hash; 'source' and 'name' select the job
#
# Runs under the cron/state lock. Dies if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
597
# Build an identifier for a running process.
#
# $pid - process id
#
# Returns "<pid>:<starttime>:<boot_id>", where starttime is the field at
# index 21 of the whitespace-split /proc/<pid>/stat line and boot_id is
# read from /proc/sys/kernel/random/boot_id.
# Dies if one of the two files cannot be read or is empty.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1)
        or die "unable to read process stats\n";
    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
        or die "unable to read boot ID\n";
    chomp($boot_id);

    my $starttime = (split(/\s+/, $stat))[21];

    return "${pid}:${starttime}:${boot_id}";
}
612
# Check whether the process that an instance id refers to is still running.
#
# $instance_id - id as produced by get_instance_id, or undef
#
# Returns true if the id computed now for the embedded pid is equal to
# $instance_id; false if the id is undefined, does not start with a pid,
# or the current id cannot be determined or differs.
sub instance_exists {
    my ($instance_id) = @_;

    return 0 if !defined($instance_id);
    return 0 if $instance_id !~ m/^([1-9][0-9]*):/;

    my $pid = $1;
    my $actual_id = eval { get_instance_id($pid); };

    return defined($actual_id) && $actual_id eq $instance_id;
}
624
# Run one sync from source to destination.
#
# $param - parameter hash as returned by parse_argv
#
# Steps:
#   1. Under the cron/state lock, mark a configured job as "waiting"; dies
#      if another live instance is already waiting or syncing for it.
#   2. Under the sync lock (one sync at a time): re-read the job, refuse
#      stopped jobs, mark it "syncing", then snapshot and send either the
#      given dataset or every disk of the given VM plus its config.
#   3. Record "error" on failure (and re-throw), otherwise "ok" (or
#      "stopped" if the job was stopped meanwhile) together with the sync
#      date.
# Syncing without a configured job is possible; then no state is recorded.
sub sync {
    my ($param) = @_;

    my $state_lock = "$CONFIG_PATH/cron_and_state.lock";
    my $job;

    locked($state_lock, sub {
        eval { $job = get_job($param) };
        return if !$job;

        # a state left behind by an instance that no longer runs is ignored
        my $state = $job->{state} // 'ok';
        $state = 'ok' if !instance_exists($job->{instance_id});

        if ($state eq "syncing" || $state eq "waiting") {
            die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
        }

        $job->{state} = "waiting";
        $job->{instance_id} = $INSTANCE_ID;

        update_state($job);
    });

    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my ($dest, $source, $vm_type);

        locked($state_lock, sub {
            #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            if ($job) {
                $job->{state} = "syncing";
                $job->{vm_type} = $vm_type if !$job->{vm_type};
                update_state($job);
            }
        }); #cron and state lock

        # snapshot and transfer a single dataset
        my $sync_path = sub {
            my ($src, $dst, $unused_job, $opts, $stamp) = @_;

            ($dst->{old_snap}, $dst->{last_snap}) =
                snapshot_get($src, $dst, $opts->{maxsnap}, $opts->{name}, $opts->{dest_user});

            prepare_prepended_target($src, $dst, $opts->{dest_user}) if defined($dst->{prepend});

            snapshot_add($src, $dst, $opts->{name}, $stamp, $opts->{source_user}, $opts->{dest_user});

            send_image($src, $dst, $opts);

            if ($src->{destroy} && $dst->{old_snap}) {
                snapshot_destroy($src, $dst, $opts->{method}, $dst->{old_snap}, $opts->{source_user}, $opts->{dest_user});
            }
        };

        eval {
            if ($source->{vmid}) {
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                for my $disk (sort keys %{$disks}) {
                    my $info = $disks->{$disk};

                    $source->{all} = $info->{all};
                    $source->{pool} = $info->{pool};
                    $source->{path} = $info->{path} if $info->{path};
                    $source->{last_part} = $info->{last_part};

                    $dest->{prepend} = $info->{storage_id}
                        if $param->{prepend_storage_id};

                    $sync_path->($source, $dest, $job, $param, $date);
                }

                my $over_ssh = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
                my $config_method = $over_ssh ? 'ssh' : 'local';
                send_config($source, $dest, $config_method, $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
            } else {
                $sync_path->($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            locked($state_lock, sub {
                eval { $job = get_job($param); };
                if ($job) {
                    $job->{state} = "error";
                    delete $job->{instance_id};
                    update_state($job);
                }
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        locked($state_lock, sub {
            eval { $job = get_job($param); };
            if ($job) {
                # a job that was stopped during the sync stays stopped
                my $stopped = defined($job->{state}) && $job->{state} eq "stopped";
                $job->{state} = $stopped ? "stopped" : "ok";
                $job->{lsync} = $date;
                delete $job->{instance_id};
                update_state($job);
            }
        });
    }); #sync lock
}
746
# Inspect the snapshots that already exist on the destination dataset.
#
# $source    - parsed source; 'destroy' is set to 1 once $max_snap
#              replication snapshots of this job were found
# $dest      - parsed destination
# $max_snap  - number of replication snapshots to keep
# $name      - job name, part of the replication snapshot names
# $dest_user - ssh user for a remote destination
#
# Snapshots are listed newest first. Returns ($old_snap, $last_snap):
# $last_snap is the name of the newest snapshot, $old_snap the name of the
# last 'rep_<name>_<date>' snapshot seen before the scan stopped (at the
# $max_snap-th one, or at the end of the list).
# Returns undef if the listing fails or no snapshot exists.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my @cmd = ('zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation');
    unshift @cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @cmd, target_dataset($source, $dest);

    my $raw;
    eval {$raw = run_cmd(\@cmd)};
    if (my $erro =$@) { #this means the volume doesn't exist on dest yet
        return undef;
    }

    my $found = 0;
    my $last_snap = undef;
    my $old_snap;

    my @lines = $raw ? split(/\n/, $raw) : ();
    for my $line (@lines) {
        if ($line =~ m/@(.*)$/) {
            $last_snap = $1 if (!$last_snap);
        }

        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $old_snap = $1;
        $found++;
        if ($found == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
787
# Create the replication snapshot for this run on the source side.
#
# $source      - parsed source; 'new_snap' is set to the snapshot name
# $dest        - parsed destination (only used for cleanup)
# $name        - job name
# $date        - date stamp, as returned by get_date
# $source_user - ssh user for a remote source
# $dest_user   - ssh user for a remote destination
#
# The snapshot is called 'rep_<name>_<date>'. If creating it fails, the
# snapshot is destroyed again on both sides and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_${date}";
    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval {
        run_cmd([@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"]);
    };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
809
# Collect the ZFS-backed disks of a guest.
#
# $target - parsed target with 'vmid', 'vm_type' and optionally 'ip'
# $user   - ssh user for a remote target
#
# Reads the guest configuration with 'qm config' (qemu) or 'pct config'
# (lxc) and passes it to parse_disks.
# Returns the structure built by parse_disks.
# Dies on an unknown vm_type or if a command fails.
sub get_disks {
    my ($target, $user) = @_;

    my $tool;
    if ($target->{vm_type} eq 'qemu') {
        $tool = 'qm';
    } elsif ($target->{vm_type} eq 'lxc') {
        $tool = 'pct';
    } else {
        die "VM Type unknown\n";
    }

    my @cmd = ($tool, 'config', $target->{vmid});
    unshift @cmd, 'ssh', "$user\@$target->{ip}", '--' if $target->{ip};

    my $config = run_cmd(\@cmd);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $user);
}
830
# Run a command through the shell and return its output.
#
# $cmd - either a ready-made command string, or an array ref of words.
#        Plain words are shell-quoted; scalar references (such as \'|')
#        are inserted verbatim, which allows building pipelines.
#
# stderr is merged into stdout. Returns the output without its trailing
# newline. Dies with the command line and its output if the exit status is
# not zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words;
        for my $word (@$cmd) {
            push @words, ref($word) ? $$word : shell_quote($word);
        }
        $cmd = join(' ', @words);
    }

    my $output = qx{$cmd 2>&1};

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if $? != 0;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
847
# Extract the ZFS-backed disks from a guest configuration.
#
# $text    - output of 'qm config' / 'pct config'
# $ip      - host to run 'pvesm path' on via ssh; false for local
# $vm_type - 'qemu' or 'lxc'
# $user    - ssh user, only used together with $ip
#
# Skipped: lines that are no disk keys, cdrom media, qemu disks with backup
# explicitly disabled, and lxc mount points (mpN) without backup enabled.
# Returns a hash ref indexed 0, 1, ... whose entries carry storage_id,
# pool, path (optional), last_part and all (the full dataset name).
# Dies if 'pvesm path' yields nothing, the path has an unexpected format,
# or no disk is left.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        # QEMU: disks are included unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: mount points are only included if backup is explicitly
        # enabled. The index may have several digits (mp10, mp11, ...), so
        # match all of them - otherwise such mount points bypass this rule.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if ($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form [file=|volume=]<storage>:<volume>
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+):([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor:$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor:$disk\n" if !$path;

        $disks->{$num}->{storage_id} = $stor;

        # $disk is literal text inside the patterns below, hence \Q...\E
        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/\Q$disk\E)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/\Q$disk\E)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
929
# how the corresponding dataset is named on the target
#
# $source - parsed source; 'last_part' is appended when true
# $dest   - parsed destination; 'all' is the base, 'prepend' is inserted
#           after it when defined
#
# Returns the components joined with '/', with runs of '/' collapsed.
sub target_dataset {
    my ($source, $dest) = @_;

    my @parts = ("$dest->{all}");
    push @parts, $dest->{prepend} if defined($dest->{prepend});
    push @parts, $source->{last_part} if $source->{last_part};

    my $target = join('/', @parts);
    $target =~ s!/+!/!g;

    return $target;
}
941
# create the parent dataset for the actual target
#
# $source    - parsed source; must have a 'last_part'
# $dest      - parsed destination; must have 'prepend' defined
# $dest_user - ssh user for a remote destination
#
# Creates '<dest all>/<prepend>' on the destination unless it already
# exists. Dies on the internal errors checked below or if creating the
# dataset fails.
sub prepare_prepended_target {
    my ($source, $dest, $dest_user) = @_;

    if (!defined($dest->{prepend})) {
        die "internal error - not a prepended target\n";
    }

    # The parent dataset shouldn't be the actual target.
    if (!$source->{last_part}) {
        die "internal error - no last_part for source\n";
    }

    my $parent = join('/', $dest->{all}, $dest->{prepend});
    $parent =~ s!/+!/!g;

    if (check_dataset_exists($parent, $dest->{ip}, $dest_user)) {
        return;
    }

    create_file_system($parent, $dest->{ip}, $dest_user);
}
958
# Destroy a snapshot on the source and, if given, on the destination.
#
# $source      - parsed source
# $dest        - parsed destination, or a false value to skip that side
# $method      - ssh is only used for the source when this is 'ssh'
# $snap        - snapshot name (without the dataset)
# $source_user - ssh user for a remote source
# $dest_user   - ssh user for a remote destination
#
# Failures are reported as warnings only; the function never dies because
# a destroy command failed.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval {
        run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
        my $path = target_dataset($source, $dest);

        eval {
            run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]);
        };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
988
# check if snapshot for incremental sync exist on source side
#
# $source      - parsed source
# $dest        - parsed destination; 'last_snap' names the snapshot
# $method      - not used here
# $source_user - ssh user for a remote source
#
# Returns 1 if 'zfs list' finds '<source all>@<last_snap>'; otherwise the
# error is printed as a warning and undef is returned.
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @cmd = ('zfs', 'list', '-rt', 'snapshot', '-Ho', 'name');
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};
    push @cmd, "$source->{all}\@$dest->{last_snap}";

    eval { run_cmd(\@cmd) };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    return 1;
}
1009
# Transfer the new snapshot of a dataset to the destination.
#
# $source - parsed source; 'new_snap' names the snapshot to send
# $dest   - parsed destination; if 'last_snap' also exists on the source,
#           an incremental stream relative to it is sent
# $param  - parameter hash (source_user, dest_user, properties, verbose,
#           limit, method)
#
# Builds a 'zfs send | [cstream |] zfs recv' pipeline, with either end
# wrapped in ssh when it is remote. If the transfer fails, the new
# snapshot is destroyed on the source and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    # sending side
    my @send;
    push @send, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
        push @send, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit; the limit is multiplied by 1024 for cstream
    my @throttle;
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    # receiving side
    my $target = target_dataset($source, $dest);
    my @recv;
    push @recv, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd([@send, @throttle, \'|', @recv])
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
1045
1046
# Copy the guest configuration file next to the replicated data.
#
# $source           - parsed source with vmid, vm_type, new_snap
# $dest             - parsed destination
# $method           - 'ssh' or 'local'
# $source_user      - ssh user for a remote source
# $dest_user        - ssh user for a remote destination
# $dest_config_path - target directory; defaults to $CONFIG_PATH
#
# The copy is stored as '<vmid>.conf.<vm_type>.<new_snap>' inside the
# config directory (plus '/<dest last_part>' if there is one). With method
# 'ssh' the copy of the old snapshot is removed when $source->{destroy} is
# set.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
        return;
    }

    return if $method ne 'ssh';

    my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

    if ($dest->{ip} || $source->{ip}) {
        # remote ends are addressed as user@[host]:path
        my $from = $source->{ip}
            ? "$source_user\@[$source->{ip}]:$source_target"
            : $source_target;
        my $to = $dest->{ip}
            ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
            : $dest_target_new;

        run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
        run_cmd(['scp', '--', $from, $to]);
    }

    if ($source->{destroy}){
        my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$dest->{old_snap}";
        run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
    }
}
1083
# Return the current local time as 'YYYY-MM-DD_hh:mm:ss'.
sub get_date {
    my ($sec, $min, $hour, $mday, $mon, $year) = localtime(time);

    # localtime counts years from 1900 and months from 0
    return sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
}
1090
# Build the table printed for the job status overview.
#
# Returns a string with a header line followed by one line per configured
# job (sorted by source and name) showing the state from the state file.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    for my $source (sort keys %{$cfg}) {
        for my $sync_name (sort keys %{$cfg->{$source}}) {
            my $columns = sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= $columns . "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1108
# Re-enable a job: set its state to "ok" and rewrite its cron entry.
#
# $param - parameter hash; 'source' and 'name' select the job
#
# Runs under the cron/state lock. Dies if the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $enable = sub {
        my $job = get_job($param);
        $job->{state} = "ok";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $enable);
}
1119
# Disable a job: set its state to "stopped" and rewrite its cron entry
# (format_job writes stopped jobs as commented-out lines).
#
# $param - parameter hash; 'source' and 'name' select the job
#
# Runs under the cron/state lock. Dies if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $disable = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $disable);
}
1130
# Help text for every command, keyed by command name.
#
# Besides being printed as help, this table is used in three other ways:
#   - a command is only accepted if it has an entry here,
#   - check_params() dies with the entry of the current command when a
#     required option is missing,
#   - print_pod() joins all values into the SYNOPSIS section of the man page.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy --source <string> [OPTIONS]

	Remove a sync Job from the scheduler

	--name string
		The name of the sync job, if not set 'default' is used.

	--source string
		The source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    create => qq{
$PROGNAME create --dest <string> --source <string> [OPTIONS]

	Create a new sync-job

	--dest string
		The destination target is like [IP:]<Pool>[/Path]

	--dest-user string
		The name of the user on the destination target, root by default

	--limit integer
		Maximal sync speed in kBytes/s, default is unlimited

	--maxsnap integer
		How many snapshots will be kept before they get erased, default 1

	--name string
		The name of the sync job, if not set it is default

	--prepend-storage-id
		If specified, prepend the storage ID to the destination's path(s).

	--skip
		If specified, skip the first sync.

	--source string
		The source can be an <VMID> or [IP:]<ZFSPool>[/Path]

	--source-user string
		The (ssh) user-name on the source target, root by default

	--properties
		If specified, include the dataset's properties in the stream.

	--dest-config-path string
		Specifies a custom config path on the destination target.
		The default is /var/lib/pve-zsync
    },
    sync => qq{
$PROGNAME sync --dest <string> --source <string> [OPTIONS]

	Trigger one sync.

	--dest string
		The destination target is like [IP:]<Pool>[/Path]

	--dest-user string
		The (ssh) user-name on the destination target, root by default

	--limit integer
		The maximal sync speed in kBytes/s, default is unlimited

	--maxsnap integer
		Configure how many snapshots will be kept before they get erased, default 1

	--name string
		The name of the sync job, if not set it is 'default'.
		It is only necessary if scheduler already contains this source.

	--prepend-storage-id
		If specified, prepend the storage ID to the destination's path(s).

	--source string
		The source can either be an <VMID> or [IP:]<ZFSPool>[/Path]

	--source-user string
		The name of the user on the source target, root by default

	--verbose
		If specified, print out the sync progress.

	--properties
		If specified, include the dataset's properties in the stream.

	--dest-config-path string
		Specifies a custom config path on the destination target.
		The default is /var/lib/pve-zsync
    },
    list => qq{
$PROGNAME list

	Get a List of all scheduled Sync Jobs
    },
    status => qq{
$PROGNAME status

	Get the status of all scheduled Sync Jobs
    },
    help => qq{
$PROGNAME help [<cmd>] [OPTIONS]

	Get help about specified command.

	<cmd> string
		Command name to get help about.

	--verbose
		Verbose output format.
    },
    enable => qq{
$PROGNAME enable --source <string> [OPTIONS]

	Enable a sync-job and reset all job-errors, if any.

	--name string
		name of the sync job, if not set it is default

	--source string
		the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    disable => qq{
$PROGNAME disable --source <string> [OPTIONS]

	Disables (pauses) a sync-job

	--name string
		name of the sync-job, if not set it is default

	--source string
		the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    printpod => "$PROGNAME printpod\n\n\tinternal command",

};
1268
# Reject a missing or unknown command before any option parsing happens.
# A command counts as known when it has an entry in $cmd_help.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # Terminate the message so the usage text that follows starts on its own
    # line instead of being glued to the error.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Parse a copy of the command line so @ARGV itself stays available to the
# code below (the 'help' command reads $ARGV[1]).
my @arg = @ARGV;
my $param = parse_argv(@arg);
1278
# Ensure that every option named in the argument list has a true value in
# $param; otherwise die with the help text of the current command.
sub check_params {
    my @required = @_;

    foreach my $key (@required) {
        next if $param->{$key};
        die "$cmd_help->{$command}\n";
    }
}
1284
# Run the requested command. Commands that operate on a job first make sure
# their mandatory options are present (check_params) and pass each target
# through check_target before doing any work.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    # The word following 'help' on the command line, if any.
    my $help_command = $ARGV[1];

    # Help for one specific, known command is emitted through die().
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # exec() only returns on failure; report that instead of exiting
        # silently. The list form never involves a shell.
        exec('man', $PROGNAME) or die "unable to run 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1341
# Print the command overview to STDOUT.
#
# When $help is false, the overview is preceded by a "no command specified"
# error line; callers that only want the overview pass a true value.
sub usage {
    my ($help) = @_;

    my @lines;

    push(@lines, "ERROR:\tno command specified\n") unless $help;

    push(
        @lines,
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create --dest <string> --source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy --source <string> [OPTIONS]\n",
        "\t$PROGNAME disable --source <string> [OPTIONS]\n",
        "\t$PROGNAME enable --source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync --dest <string> --source <string> [OPTIONS]\n",
    );

    print(join('', @lines));
}
1356
# Hand $target to parse_target() and pass its result back to the caller.
# NOTE(review): presumably parse_target() dies on a malformed target, which
# is what makes this usable as a validation step - confirm.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1361
# Print the manual page source (POD) to STDOUT; this backs the internal
# 'printpod' command.
#
# The SYNOPSIS section is generated from the help texts in $cmd_help, joined
# in sorted order so the output is stable between runs.
sub print_pod {

    my $synopsis = join("\n", sort values %$cmd_help);

    # NOTE: the heredoc interpolates, so only $synopsis may appear as a
    # variable in the text below.
    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which are stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to other ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server

    pve-zsync create --source=100 --dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config gets copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2021 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}