]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
avoid odd post-if style for die
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program name; the state directory, cron file and program path below are
# all derived from it.
my $PROGNAME = "pve-zsync";
# Directory holding the state file and the lock files passed to locked().
my $CONFIG_PATH = "/var/lib/${PROGNAME}";
# JSON file with the per-job state (see read_state() / update_state()).
my $STATE = "${CONFIG_PATH}/sync_state";
# cron.d file with one line per sync job (see read_cron() / update_cron()).
my $CRONJOBS = "/etc/cron.d/$PROGNAME";
my $PATH = "/usr/sbin";
# Guest configuration directories; vm_exists() looks for "<vmid>.conf" in
# $QEMU_CONF and $LXC_CONF to tell the guest type.
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = "${PVE_DIR}/qemu-server";
my $LXC_CONF = "${PVE_DIR}/lxc";
my $PROG_PATH = "$PATH/${PROGNAME}";
# Minute step of the cron schedule written for new jobs ("*/$INTERVAL * * * *").
my $INTERVAL = 15;
# Debug switch; initialised in the BEGIN block below.
my $DEBUG;

# Runs at compile time: enable debugging when the ZSYNC_DEBUG environment
# variable is true and, only in that case, load Data::Dumper so that the
# Dumper() calls in run_cmd() are available.
BEGIN {
    $DEBUG = 0; # change default here. not above on declaration!
    $DEBUG ||= $ENV{ZSYNC_DEBUG};
    if ($DEBUG) {
        require Data::Dumper;
        Data::Dumper->import();
    }
}
33
# Building blocks for address matching. These are plain strings (not qr//)
# that get interpolated into the larger patterns below.
# One IPv4 octet, 0-255.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
# Dotted-quad IPv4 address.
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
# One 16-bit IPv6 group: 1-4 hex digits.
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
# Last 32 bits of an IPv6 address: an embedded IPv4 address or two hex groups.
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# IPv6 address: one alternative per possible position of the "::" shorthand
# (the first alternative is the full form without "::").
my $IPV6RE = "(?:" .
"(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
"(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
"(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
"(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
"(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
"(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
"(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
"(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
"(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Captures: $1 = host (optional), $2 = VMID or pool[/path], $3 = "/path" part.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;

# Guest config keys that describe a disk or mount point; the key must be
# followed by ": " (used by parse_disks()).
my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
# Identifier of this running process (pid, start time, boot ID); stored in
# the job state by sync() so that stale "waiting"/"syncing" states left by a
# dead process can be recognised.
my $INSTANCE_ID = get_instance_id($$);

my $command = $ARGV[0];

# Every command except 'help' and 'printpod' requires the external tools to
# be present in $ENV{PATH}; check_bin() dies when one is missing.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin('cstream');
    check_bin('zfs');
    check_bin('ssh');
    check_bin('scp');
}

# Turn termination signals into an exception so that the eval blocks in
# sync() can record the error state. SIGKILL is intentionally not listed:
# it can be neither trapped nor ignored, so installing a handler for it has
# no effect.
$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{INT} = sub {
    die "Signaled, aborting sync: $!\n";
};
72
# Locate an executable by searching the directories listed in $ENV{PATH}.
#   $bin - command name to look for
# Returns the full path of the first executable match; dies if none exists.
sub check_bin {
    my ($bin) = @_;

    my ($found) = grep { -x $_ } map { "$_/$bin" } split(/:/, $ENV{PATH});
    return $found if defined($found);

    die "unable to find command '$bin'\n";
}
85
# Read a text file.
#   $filename      - path of the file to read
#   $one_line_only - true: return only the first line (as a string);
#                    false: return an array reference holding all lines
# Lines keep their trailing newline. Dies if the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
        $text = <$fh>;
    } else {
        $text = [ <$fh> ];
    }

    close($fh);

    return $text;
}
98
# Shorten a path-like string for tabular output (used by list() and status()).
#   $path   - string to shorten; runs of '/' are collapsed first
#   $maxlen - width to fit into
# Strings that already fit are returned unchanged (apart from the slash
# collapsing); otherwise parts are replaced by '..'.
sub cut_target_width {
    my ($path, $maxlen) = @_;
    $path =~ s{/+}{/}g;

    return $path if length($path) <= $maxlen;

    # no directory separator at all: keep only the end of the string
    if ($path !~ m{/}) {
        return '..' . substr($path, -$maxlen + 2);
    }

    # split off the last path component
    $path =~ s{/([^/]+/?)$}{};
    my $tail = $1;

    return "../$tail" if length($tail) + 3 == $maxlen;
    return '..' . substr($tail, -$maxlen + 2) if length($tail) + 2 >= $maxlen;

    # split off the first "/component"
    $path =~ s{(/[^/]+)(?:/|$)}{};
    my $head = $1;
    my $remaining = $maxlen - (length($head) + length($tail)) - 4; # -4 for "/../"

    if ($remaining < 0) {
        # no room for a middle part: truncate the head instead
        return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
    }

    # replace the centre of what is left with '..'
    substr($path, ($remaining / 2), (length($path) - $remaining), '..');
    return "$head/" . $path . "/$tail";
}
128
# Run a code reference while holding an exclusive flock() on a lock file.
#   $lock_fn - path of the lock file (created/truncated if necessary)
#   $code    - code reference to execute; called without arguments
# Returns the (scalar) result of $code. Dies if the lock file cannot be
# opened or locked; an exception thrown by $code is re-thrown after the lock
# has been released.
sub locked {
    my ($lock_fn, $code) = @_;

    # Fail with a clear message instead of passing an undefined handle to flock().
    my $lock_fh = IO::File->new($lock_fn, 'w')
        or die "Could not open lock file $lock_fn: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    # Close on both the success and the error path before propagating.
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
144
# Return $status if it holds a true 'status' entry for the given job.
#   $source - target hash; its 'all' entry is the first-level key
#   $name   - job name (second-level key)
#   $status - nested hash: {all}{name}{status}
# Returns $status itself, or undef when the entry is missing or false.
sub get_status {
    my ($source, $name, $status) = @_;

    return $status->{$source->{all}}->{$name}->{status} ? $status : undef;
}
154
# Check whether a ZFS dataset exists by running `zfs list` on it.
#   $dataset - dataset name
#   $ip      - optional host; when set the command is run there via ssh
#   $user    - ssh user (only used together with $ip)
# Returns 1 if the command succeeds, 0 if it fails.
sub check_dataset_exists {
    my ($dataset, $ip, $user) = @_;

    my @ssh = $ip ? ('ssh', "$user\@$ip", '--') : ();

    my $listed = eval {
        run_cmd([@ssh, 'zfs', 'list', '-H', '--', $dataset]);
        1;
    };

    return $listed ? 1 : 0;
}
173
# Create a ZFS file system with `zfs create`.
#   $file_system - dataset name to create
#   $ip          - optional host; when set the command is run there via ssh
#   $user        - ssh user (only used together with $ip)
# Returns the command output; dies (via run_cmd) if the command fails.
sub create_file_system {
    my ($file_system, $ip, $user) = @_;

    my @ssh = $ip ? ('ssh', "$user\@$ip", '--') : ();

    return run_cmd([@ssh, 'zfs', 'create', $file_system]);
}
186
# Split a target specification ("[host:]<VMID>" or "[host:]<pool>[/path]")
# into its components.
# Returns a hash reference with:
#   all       - everything after the optional "host:" prefix
#   ip        - host part, surrounding brackets removed (only if given)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - first path component
#   last_part - last path component (only if there is more than one)
#   path      - remaining components between pool and last_part, if any
# Dies if the text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;

    # save the captures before any other regex runs
    my ($host, $location) = ($1, $2);

    my $target = { all => $location };

    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $location);

    my $pool = shift(@parts);
    $target->{pool} = $pool;
    die "$errstr\n" if !$pool;

    # a purely numeric first component is a guest ID, not a pool
    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
        delete $target->{pool};
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # for remote targets one more component is dropped from the path
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
222
# Load the job configuration from the cron file.
# Returns the hash built by encode_cron(); on first use (cron file missing)
# an empty cron file is created and undef is returned.
sub read_cron {

    if (-e $CRONJOBS) {
        my $lines = read_file($CRONJOBS, 0);
        return encode_cron(@{$lines});
    }

    #This is for the first use to init file;
    my $new_fh = IO::File->new("> $CRONJOBS");
    if (!$new_fh) {
        die "Could not create $CRONJOBS: $!\n";
    }
    close($new_fh);

    return undef;
}
237
# Parse command line style arguments into a parameter hash.
#   @arg - argument list (non-option words are ignored)
# Returns a hash reference with one key per known option (dashes replaced by
# underscores). Unset options are undef, except for the defaults
# name=default, maxsnap=1, method=ssh, source_user=root and dest_user=root.
# Dies if Getopt::Long reports a parse error.
sub parse_argv {
    my (@arg) = @_;

    # Getopt::Long specifications; the hash key is derived from the name
    my @specs = qw(
        dest=s
        source=s
        verbose
        limit=i
        maxsnap=i
        name=s
        skip
        method=s
        source-user=s
        dest-user=s
        prepend-storage-id
        properties
        dest-config-path=s
    );

    my $param = {};
    my @getopt_args;
    for my $spec (@specs) {
        (my $key = $spec) =~ s/=.*$//;
        $key =~ tr/-/_/;
        $param->{$key} = undef;
        push @getopt_args, $spec, \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @getopt_args);

    die "can't parse options\n" if $ret == 0;

    my %default = (
        name => "default",
        maxsnap => 1,
        method => "ssh",
        source_user => "root",
        dest_user => "root",
    );
    for my $key (keys %default) {
        $param->{$key} //= $default{$key};
    }

    return $param;
}
284
# Merge the persisted state of a job into the job hash.
#   $job - job hash; 'source' and 'name' select the state entry
# Copies state, lsync, vm_type and instance_id plus the consecutive
# "snap0", "snap1", ... entries (stopping at the first false one).
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    for my $key (qw(state lsync vm_type instance_id)) {
        $job->{$key} = $saved->{$key};
    }

    my $idx = 0;
    while ($saved->{"snap$idx"}) {
        $job->{"snap$idx"} = $saved->{"snap$idx"};
        $idx++;
    }

    return $job;
}
302
# Build the job configuration from the lines of the cron file.
#   @text - lines of the cron file
# Each line is split on whitespace and handed to parse_argv(); lines that
# yield both a source and a destination become a job.
# Returns a hash reference: {source}{name} => remaining job parameters.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    # Iterate over every line. A `while (my $line = shift ...)` loop would
    # stop at the first line that is false in boolean context ("" or "0")
    # and silently ignore all following jobs.
    foreach my $line (@text) {
        next if !defined($line);

        my @arg = split('\s', $line);
        my $param = parse_argv(@arg);

        if ($param->{source} && $param->{dest}) {
            my $source = delete $param->{source};
            my $name = delete $param->{name};

            $cfg->{$source}->{$name} = $param;
        }
    }

    return $cfg;
}
323
# Convert parsed command line parameters into a job hash.
#   $param - hash as returned by parse_argv()
# The method is "local" when neither source nor destination has a host part
# and "ssh" otherwise. dest, maxsnap and dest_config_path are only set when
# they are true in $param.
# Returns the new job hash reference; dies (via parse_target) on an invalid
# source or destination.
sub param_to_job {
    my ($param) = @_;

    my $source = parse_target($param->{source});
    my $dest = $param->{dest} ? parse_target($param->{dest}) : undef;

    my $is_local = !$dest->{ip} && !$source->{ip};

    my $job = {
        name => ($param->{name} ? $param->{name} : "default"),
        method => ($is_local ? "local" : "ssh"),
        limit => $param->{limit},
        source => $param->{source},
        source_user => $param->{source_user},
        dest_user => $param->{dest_user},
        prepend_storage_id => !!$param->{prepend_storage_id},
        properties => !!$param->{properties},
    };

    # optional entries are only present when a value was given
    for my $key (qw(dest maxsnap dest_config_path)) {
        $job->{$key} = $param->{$key} if $param->{$key};
    }

    return $job;
}
348
# Load the persisted job states.
# Returns the decoded JSON structure from the first line of the state file.
# On first use (state file missing) the config directory and a state file
# containing "{}" are created and undef is returned.
sub read_state {

    if (-e $STATE) {
        my $json = read_file($STATE, 1);
        return decode_json($json);
    }

    make_path $CONFIG_PATH;
    my $new_fh = IO::File->new("> $STATE");
    if (!$new_fh) {
        die "Could not create $STATE: $!\n";
    }
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
363
# Persist the state of one job.
#   $job - job hash; 'source' and 'name' select the entry. If its state is
#          "del" the entry is removed, otherwise state, lsync, instance_id,
#          vm_type and the consecutive "snapN" entries are stored.
# The new content is written to "$STATE.new" and then renamed over $STATE.
# Returns the complete states hash. Dies if the new file cannot be opened,
# written, closed or renamed.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file simply means "no states yet"
    my $text = eval { read_file($STATE, 1); };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        # drop the source level as well once its last job is gone
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Check every step (as update_cron() does): renaming a partially written
    # file over the state file would destroy the state of all jobs.
    print $out_fh $text or die "can't write to $STATE.new: $!\n";
    close($out_fh) or die "can't close $STATE.new: $!\n";

    rename "$STATE.new", $STATE or die "can't move $STATE.new: $!\n";

    return $states;
}
403
# Rewrite the cron file for one job.
#   $job - job hash. The line belonging to this job (same source and name)
#          is replaced by a freshly formatted one, or removed when the job
#          state is "del". If no line matches, a new one is appended.
# A SHELL/PATH header is prepended when none is found in the first lines.
# The result is written to "${CRONJOBS}.new" and renamed over $CRONJOBS.
# Dies if the file cannot be read, written, closed or renamed.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    foreach my $line (@{$current}) {
        chomp($line);
        # \Q..\E: source and name are literal strings, not patterns. Without
        # quoting, '.' in an address matches any character and a bracketed
        # IPv6 source is parsed as a character class.
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    print $new_fh $text or die "can't write to $CRONJOBS.new: $!\n";
    # buffered write errors only show up on close
    close($new_fh) or die "can't close $CRONJOBS.new: $!\n";

    rename "${CRONJOBS}.new", $CRONJOBS or die "can't move $CRONJOBS.new: $!\n";
}
446
# Format the cron line for a job.
#   $job  - job hash
#   $line - optional existing cron line of this job; its five schedule
#           fields are kept
# The line is commented out ('#') when the job state is "stopped". Without
# an existing line, or when no schedule can be extracted from it, the
# default schedule "*/$INTERVAL * * * *" is used.
# Returns the line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }
    # Only use $1 after a successful match; on a failed match it would be
    # undefined or left over from an earlier, unrelated regex.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --prepend-storage-id" if $job->{prepend_storage_id};
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
475
# Build the table of all configured jobs.
# Columns: source, name, state, last sync, guest type and method, one row
# per job from the cron file; state information comes from the state file.
# Returns the table as a string.
sub list {

    my $cfg = read_cron();

    my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            # A job may have no state entry yet; fall back to empty values
            # instead of formatting undef (which raises warnings).
            my $state = $states->{$source}->{$name} // {};

            $list .= sprintf("%-25s", cut_target_width($source, 25));
            $list .= sprintf("%-25s", cut_target_width($name, 25));
            $list .= sprintf("%-10s", $state->{state} // '');
            $list .= sprintf("%-20s", $state->{lsync} // '');
            $list .= sprintf("%-6s", $state->{vm_type} // "undef");
            $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
        }
    }

    return $list;
}
496
# Determine the type of a guest by looking for its configuration file.
#   $target - target hash; needs 'vmid', optionally 'ip'
#   $user   - ssh user for remote targets
# With an 'ip' the files are probed with /bin/ls over ssh, otherwise with a
# local file test. Returns "qemu", "lxc", or undef if the target has no vmid
# or no config file was found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    # probe returning true if the given config file exists
    my $exists;
    if ($target->{ip}) {
        $exists = sub {
            my ($file) = @_;
            return eval { run_cmd(['ssh', "$user\@$target->{ip}", '--', '/bin/ls', $file]) };
        };
    } else {
        $exists = sub {
            my ($file) = @_;
            return -f $file;
        };
    }

    # qemu is checked first, then lxc
    for my $candidate (["qemu", "$QEMU_CONF/$conf_fn"], ["lxc", "$LXC_CONF/$conf_fn"]) {
        my ($type, $file) = @$candidate;
        return $type if $exists->($file);
    }

    return undef;
}
515
# Create a new sync job and, unless 'skip' is set, run the first sync.
#   $param - hash as returned by parse_argv()
# Under the cron/state lock: copies the ssh key to remote hosts, verifies
# that the destination (and a non-guest source) dataset exists, that a guest
# source exists and has ZFS disks, and that no job with the same source and
# name is configured; then writes the cron entry and the initial state.
# If the first sync fails the job is destroyed again and the error printed.
sub init {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);

        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # install our public key on every remote side, destination first
        for my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
            my ($target, $user) = @$peer;
            my $ip = $target->{ip};
            next if !$ip;
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        if (!check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user})) {
            die "Pool $dest->{all} does not exist\n";
        }

        if (!defined($source->{vmid})
            && !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user})) {
            die "Pool $source->{all} does not exist\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        if ($source->{vmid} && !$vm_type) {
            die "VM $source->{vmid} doesn't exist\n";
        }

        if ($cfg->{$job->{source}}->{$job->{name}}) {
            die "Config already exists\n";
        }

        #check if vm has zfs disks if not die;
        if ($source->{vmid}) {
            get_disks($source, $param->{source_user});
        }

        update_cron($job);
        update_state($job);
    }); #cron and state lock

    return if $param->{skip};

    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
569
# Look up a configured job and attach its persisted state.
#   $param - hash with 'source' and 'name'
# Returns the job hash (cron parameters plus name, source and state data);
# dies if no such job is configured.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = $cfg->{$param->{source}}->{$param->{name}};
    die "Job with source $param->{source} and name $param->{name} does not exist\n"
        if !$job;

    # source and name were removed from the entry by encode_cron()
    $job->{name} = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}
585
# Remove a job from the cron file and from the state file.
#   $param - hash with 'source' and 'name'
# Runs under the cron/state lock; dies if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
597
# Build an identifier for a process: "<pid>:<starttime>:<boot_id>".
#   $pid - process ID
# starttime is field 22 of /proc/<pid>/stat, boot_id comes from
# /proc/sys/kernel/random/boot_id.
# Dies if either file cannot be read.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1)
        or die "unable to read process stats\n";
    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
        or die "unable to read boot ID\n";

    # The second field is the command name in parentheses and may itself
    # contain spaces or parentheses. Strip everything up to the last ')' so
    # that splitting on whitespace yields stable field positions.
    my $skipped = ($stat =~ s/^.*\)\s+//) ? 2 : 0;

    my $stats = [ split(/\s+/, $stat) ];
    # field 22 (index 21) of the full line, minus the fields removed above
    my $starttime = $stats->[21 - $skipped];
    chomp($boot_id);

    return "${pid}:${starttime}:${boot_id}";
}
612
# Check whether the process that produced an instance ID is still running.
#   $instance_id - ID as built by get_instance_id(), may be undef
# The ID is recomputed for the pid it starts with; the instance exists only
# if the result is identical. Returns a true value if so, false otherwise.
sub instance_exists {
    my ($instance_id) = @_;

    return 0 if !defined($instance_id);
    return 0 if $instance_id !~ m/^([1-9][0-9]*):/;

    my $pid = $1;
    # get_instance_id() dies if the process is gone
    my $actual_id = eval { get_instance_id($pid); };

    return defined($actual_id) && $actual_id eq $instance_id;
}
624
# Run one synchronisation.
#   $param - hash as returned by parse_argv() (source, dest, name, maxsnap,
#            method, users, ...)
# If a job for source/name is configured, its state is tracked in the state
# file ("waiting" -> "syncing" -> "ok"/"error"); without a configured job
# the sync runs without state handling. Dies if the job is already
# waiting/syncing in a live process, if it is stopped, or if the sync fails.
sub sync {
    my ($param) = @_;

    my $job;

    # Step 1: register as "waiting" under the cron/state lock.
    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        # get_job() dies for unknown jobs; $job then stays undefined
        eval { $job = get_job($param) };

        if ($job) {
            my $state = $job->{state} // 'ok';
            # a state recorded by a process that no longer exists is stale
            $state = 'ok' if !instance_exists($job->{instance_id});

            if ($state eq "syncing" || $state eq "waiting") {
                die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
            }

            $job->{state} = "waiting";
            $job->{instance_id} = $INSTANCE_ID;

            update_state($job);
        }
    });

    # Step 2: the actual sync, performed while holding the sync lock.
    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my $dest;
        my $source;
        my $vm_type;

        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            if ($job) {
                $job->{state} = "syncing";
                $job->{vm_type} = $vm_type if !$job->{vm_type};
                update_state($job);
            }
        }); #cron and state lock

        # Sync a single dataset: look up the existing snapshots on the
        # destination, create a new snapshot on the source, send it, and
        # destroy the oldest snapshot once snapshot_get() flagged
        # $source->{destroy}.
        my $sync_path = sub {
            my ($source, $dest, $job, $param, $date) = @_;

            ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});

            prepare_prepended_target($source, $dest, $param->{dest_user}) if defined($dest->{prepend});

            snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

            send_image($source, $dest, $param);

            snapshot_destroy($source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $dest->{old_snap});

        };

        eval{
            if ($source->{vmid}) {
                # guest source: sync every disk, then copy the guest config
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                foreach my $disk (sort keys %{$disks}) {
                    # point $source at the dataset of the current disk
                    $source->{all} = $disks->{$disk}->{all};
                    $source->{pool} = $disks->{$disk}->{pool};
                    $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
                    $source->{last_part} = $disks->{$disk}->{last_part};

                    $dest->{prepend} = $disks->{$disk}->{storage_id}
                        if $param->{prepend_storage_id};

                    &$sync_path($source, $dest, $job, $param, $date);
                }
                if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
                    send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
                } else {
                    send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
                }
            } else {
                # plain dataset source
                &$sync_path($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            # record the failure in the job state, then re-throw
            locked("$CONFIG_PATH/cron_and_state.lock", sub {
                eval { $job = get_job($param); };
                if ($job) {
                    $job->{state} = "error";
                    delete $job->{instance_id};
                    update_state($job);
                }
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        # success: store the sync date; a job stopped in the meantime stays stopped
        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            eval { $job = get_job($param); };
            if ($job) {
                if (defined($job->{state}) && $job->{state} eq "stopped") {
                    $job->{state} = "stopped";
                } else {
                    $job->{state} = "ok";
                }
                $job->{lsync} = $date;
                delete $job->{instance_id};
                update_state($job);
            }
        });
    }); #sync lock
}
746
# Inspect the snapshots of the target dataset on the destination.
#   $source, $dest - target hashes (see target_dataset())
#   $max_snap      - number of job snapshots to keep
#   $name          - job name (snapshots are called rep_<name>_<date>)
#   $dest_user     - ssh user for a remote destination
# Snapshots are listed newest first. Returns ($old_snap, $last_snap):
# $last_snap is the newest snapshot of any name, $old_snap the last job
# snapshot seen while counting up to $max_snap. When $max_snap job
# snapshots exist, $source->{destroy} is set. Returns undef if listing
# fails or no snapshot is found.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my @cmd;
    push @cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
    push @cmd, target_dataset($source, $dest);

    my $raw = eval { run_cmd(\@cmd) };
    if (my $erro = $@) { #this means the volume doesn't exist on dest yet
        return undef;
    }

    my $count = 0;
    my $last_snap;
    my $old_snap;

    for my $line (split(/\n/, $raw)) {
        if (!$last_snap && $line =~ m/@(.*)$/) {
            $last_snap = $1;
        }

        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $old_snap = $1;
        $count++;
        if ($count == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
787
# Create the snapshot "rep_<name>_<date>" on the source dataset.
#   $source, $dest           - target hashes
#   $name, $date             - job name and time stamp for the snapshot name
#   $source_user, $dest_user - ssh users
# Stores the snapshot name in $source->{new_snap}. If the snapshot cannot be
# created, snapshot_destroy() is called for it and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_${date}";

    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval {
        run_cmd([@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"]);
    };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
809
# Fetch the guest configuration and extract its ZFS disks.
#   $target - target hash with 'vmid', 'vm_type' and optionally 'ip'
#   $user   - ssh user for remote targets
# Runs `qm config` (qemu) or `pct config` (lxc) and passes the output to
# parse_disks(). Returns the disks hash; dies for an unknown guest type.
sub get_disks {
    my ($target, $user) = @_;

    my $vm_type = $target->{vm_type};

    my $tool;
    if ($vm_type eq 'qemu') {
        $tool = 'qm';
    } elsif ($vm_type eq 'lxc') {
        $tool = 'pct';
    } else {
        die "VM Type unknown\n";
    }

    my @cmd;
    push @cmd, 'ssh', "$user\@$target->{ip}", '--' if $target->{ip};
    push @cmd, $tool, 'config', $target->{vmid};

    my $config = run_cmd(\@cmd);

    return parse_disks($config, $target->{ip}, $vm_type, $user);
}
830
# Run a command through the shell and return its output.
#   $cmd - command string, or array reference of words. Plain words are
#          shell-quoted; scalar references (e.g. \'|') are inserted
#          verbatim, which allows building pipelines.
# stderr is merged into stdout. Returns the output without the trailing
# newline; dies with command and output if the exit status is non-zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words;
        for my $part (@$cmd) {
            push @words, ref($part) ? $$part : shell_quote($part);
        }
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    if (0 != $?) {
        die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output";
    }

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
847
# Extract the ZFS backed disks from a guest configuration.
#   $text    - output of `qm config` / `pct config`
#   $ip      - optional host; `pvesm path` is then run there via ssh
#   $vm_type - 'qemu' or 'lxc'
#   $user    - ssh user for remote hosts
# Returns a hash reference indexed 0..n-1, each entry holding storage_id,
# pool, optional path, last_part and all (the full dataset name).
# Dies if a disk path cannot be interpreted or no disk is found.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    # consume the text line by line
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        #QEMU: a disk is synced unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        #LXC: a mount point is only synced if backup is explicitly enabled.
        # \d+ (as in $DISK_KEY_RE) so that mp10 and above are filtered too.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form "<storage>:<volume>" names the disk
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+):([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        # resolve the volume to its path
        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor:$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor:$disk\n" if !$path;

        $disks->{$num}->{storage_id} = $stor;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
929
# how the corresponding dataset is named on the target
#   $source - target hash; 'last_part' is appended when true
#   $dest   - target hash; 'all' is the base, 'prepend' (if defined) is
#             inserted after it
# Returns the dataset name with repeated slashes collapsed.
sub target_dataset {
    my ($source, $dest) = @_;

    my @parts = ("$dest->{all}");
    push @parts, $dest->{prepend} if defined($dest->{prepend});
    push @parts, $source->{last_part} if $source->{last_part};

    (my $target = join('/', @parts)) =~ s!/+!/!g;

    return $target;
}
941
# create the parent dataset for the actual target
#   $source    - target hash; must have a 'last_part'
#   $dest      - target hash; must have a defined 'prepend'
#   $dest_user - ssh user for a remote destination
# Creates "<dest all>/<prepend>" on the destination unless it already exists.
sub prepare_prepended_target {
    my ($source, $dest, $dest_user) = @_;

    if (!defined($dest->{prepend})) {
        die "internal error - not a prepended target\n";
    }

    # The parent dataset shouldn't be the actual target.
    if (!$source->{last_part}) {
        die "internal error - no last_part for source\n";
    }

    (my $target = "$dest->{all}/$dest->{prepend}") =~ s!/+!/!g;

    return if check_dataset_exists($target, $dest->{ip}, $dest_user);

    create_file_system($target, $dest->{ip}, $dest_user);
}
958
# Destroy a snapshot on the source and, if $dest is given, on the target.
#   $source      - source target hash ('all', optional 'ip')
#   $dest        - destination target hash, or undef for source only
#   $method      - ssh is only used for the source when this is 'ssh'
#   $snap        - snapshot name (without dataset)
#   $source_user, $dest_user - ssh users
# Failures are reported with warn and do not abort.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');
    my $snapshot = "$source->{all}\@$snap";

    my $remote_source = $source->{ip} && $method eq 'ssh';
    my @source_ssh = $remote_source ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval {
        run_cmd([@source_ssh, @zfscmd, $snapshot]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh;
        @dest_ssh = ('ssh', "$dest_user\@$dest->{ip}", '--') if $dest->{ip};

        my $path = target_dataset($source, $dest);

        eval {
            run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]);
        };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
988
# check if snapshot for incremental sync exist on source side
#   $source      - source target hash ('all', optional 'ip')
#   $dest        - destination target hash; 'last_snap' is the snapshot name
#   $method      - unused here
#   $source_user - ssh user for a remote source
# Returns 1 if `zfs list` finds the snapshot; otherwise warns and returns undef.
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @cmd = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    push @cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
    push @cmd, "$source->{all}\@$dest->{last_snap}";

    eval { run_cmd(\@cmd) };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    return 1;
}
1009
# Transfer the new snapshot from the source to the target dataset.
#   $source - source target hash ('all', 'new_snap', optional 'ip')
#   $dest   - destination target hash ('last_snap', optional 'ip')
#   $param  - job parameters (users, properties, verbose, limit, method)
# Builds the pipeline `zfs send ... | [cstream -t <limit*1024> |] zfs recv -F`,
# with ssh on either side where an 'ip' is set. The send is incremental when
# the destination's last snapshot also exists on the source. On failure the
# new snapshot is destroyed on the source and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @ssh = ('ssh', '-o', 'BatchMode=yes');

    # sending side
    my @send;
    push @send, @ssh, "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
        push @send, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit
    my @throttle;
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    # receiving side
    my $target = target_dataset($source, $dest);
    my @recv;
    push @recv, @ssh, "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    my $cmd = [@send, @throttle, \'|', @recv];

    eval {
        run_cmd($cmd)
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
1045
1046
# Copy the guest configuration file next to the synced data.
#   $source, $dest           - target hashes
#   $method                  - 'ssh' or 'local'
#   $source_user, $dest_user - ssh users
#   $dest_config_path        - optional base directory (default $CONFIG_PATH)
# The copy is named "<vmid>.conf.<vm_type>.<new_snap>" and stored in the
# base directory, extended by the destination's last_part if set. With
# method 'ssh' the config belonging to the old snapshot is removed when
# $source->{destroy} is set.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh'){
        # commands concerning the destination run there if it is remote
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            my $from = $source->{ip} ? "$source_user\@[$source->{ip}]:$source_target" : $source_target;
            my $to = $dest->{ip} ? "$dest_user\@[$dest->{ip}]:$dest_target_new" : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$dest->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1083
# Return the current local time as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # localtime: (sec, min, hour, mday, mon, year, ...)
    my @now = localtime(time);

    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", $now[5] + 1900, $now[4] + 1, @now[3, 2, 1, 0]);
}
1090
# Build the status table of all configured jobs.
# Columns: source, name and the state stored in the state file, one row per
# job from the cron file. Returns the table as a string.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            # A job may have no state entry yet; print an empty status
            # instead of interpolating undef (which raises a warning).
            my $state = $states->{$source}->{$sync_name}->{state} // '';

            $status_list .= sprintf("%-25s", cut_target_width($source, 25));
            $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
            $status_list .= "$state\n";
        }
    }

    return $status_list;
}
1108
# Re-enable a job: set its state to "ok" and rewrite its cron line.
#   $param - hash with 'source' and 'name'
# Runs under the cron/state lock; dies if the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $enable = sub {
        my $job = get_job($param);
        $job->{state} = "ok";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $enable);
}
1119
# Disable a job: set its state to "stopped" and rewrite its cron line
# (format_job() comments out the line of a stopped job).
#   $param - hash with 'source' and 'name'
# Runs under the cron/state lock; dies if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $disable = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $disable);
}
1130
# Help text for every known command, keyed by command name.
#
# Besides being printed as help, the table is used to validate the command
# given on the command line (a command is only accepted if it has a true
# entry here) and its values are joined into the SYNOPSIS section emitted by
# print_pod(). The text is part of the program output, so the strings below
# are deliberately not indented.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

remove a sync Job from the scheduler

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

Create a sync Job

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default

-prepend-storage-id

If specified, prepend the storage ID to the destination's path(s).

-skip

If specified, skip the first sync.

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-properties

If specified, include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]

will sync one time

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default.
It is only necessary if scheduler already contains this source.

-prepend-storage-id

If specified, prepend the storage ID to the destination's path(s).

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-verbose

If specified, print out the sync progress.

-properties

If specified, include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    list => qq{
$PROGNAME list

Get a List of all scheduled Sync Jobs
},
    status => qq{
$PROGNAME status

Get the status of all scheduled Sync Jobs
},
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

Get help about specified command.

<cmd> string

Command name

-verbose

Verbose output format.
},
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

enable a syncjob and reset error

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

pause a sync job

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    # no real help text, but the entry must be true for the command to be
    # accepted by the command check
    printpod => 'internal command',
};
1296
# Refuse to continue without a known command. A command counts as known when
# it has a true entry in $cmd_help.
if (!$command) {
    # usage() without argument also prints the "no command specified" error
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # terminate the message so the usage text starts on a line of its own
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# parse a copy of the command line, leaving @ARGV itself as it is
my @arg = @ARGV;
my $param = parse_argv(@arg);
1306
# Ensure that every option named in the argument list was given on the command
# line. Dies with the help text of the current command as soon as one of them
# is missing or has a false value.
sub check_params {
    my @required = @_;

    foreach my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1312
# Dispatch the validated command. Commands that operate on a job first make
# sure their mandatory options are present and that the given targets can be
# parsed.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    # the word following 'help' on the command line, if any
    my $help_command = $ARGV[1];

    if ($help_command && $cmd_help->{$help_command}) {
        # the help text of a single command is emitted through die
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # exec only returns on failure; report that instead of silently
        # finishing without any output
        exec('man', $PROGNAME)
            or die "ERROR: could not execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1369
# Print the command overview to STDOUT.
#
# $help - when false, the overview is preceded by a "no command specified"
#         error line; pass a true value to print only the overview
sub usage {
    my ($help) = @_;

    my @output;
    push @output, "ERROR:\tno command specified\n" if !$help;
    push @output,
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n";

    return print(@output);
}
1384
# Check a source or destination target by handing it to parse_target(); the
# result of parse_target() is passed through to the caller.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1389
# Print the POD source of the manual page to STDOUT.
#
# The SYNOPSIS section is assembled from the help texts in $cmd_help, sorted
# by their text. The entry of the internal 'printpod' command only holds a
# placeholder string and is left out so it does not show up in the manual.
sub print_pod {

    my @documented = grep { $_ ne 'printpod' } keys %$cmd_help;
    my $synopsis = join("\n", sort map { $cmd_help->{$_} } @documented);

    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2021 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}