]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
don't check binaries for help and printpod commands
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use JSON;
11 use IO::File;
12 use String::ShellQuote 'shell_quote';
13
14 my $PROGNAME = "pve-zsync";
15 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
16 my $STATE = "${CONFIG_PATH}/sync_state";
17 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
18 my $PATH = "/usr/sbin";
19 my $PVE_DIR = "/etc/pve/local";
20 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
21 my $LXC_CONF = "${PVE_DIR}/lxc";
22 my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
23 my $PROG_PATH = "$PATH/${PROGNAME}";
24 my $INTERVAL = 15;
25 my $DEBUG = 0;
26
27 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
28 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
29 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
30 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
31
32 my $IPV6RE = "(?:" .
33 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
34 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
35 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
36 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
37 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
38 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
39 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
40 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
41 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
42
43 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
44 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
45 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
46 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
47 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
48
49 my $command = $ARGV[0];
50
51 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
52 check_bin ('cstream');
53 check_bin ('zfs');
54 check_bin ('ssh');
55 check_bin ('scp');
56 }
57
58 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =
59 sub {
60 die "Signal aborting sync\n";
61 };
62
63 sub check_bin {
64 my ($bin) = @_;
65
66 foreach my $p (split (/:/, $ENV{PATH})) {
67 my $fn = "$p/$bin";
68 if (-x $fn) {
69 return $fn;
70 }
71 }
72
73 die "unable to find command '$bin'\n";
74 }
75
76 sub cut_target_width {
77 my ($path, $maxlen) = @_;
78 $path =~ s@/+@/@g;
79
80 return $path if length($path) <= $maxlen;
81
82 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
83
84 $path =~ s@/([^/]+/?)$@@;
85 my $tail = $1;
86
87 if (length($tail)+3 == $maxlen) {
88 return "../$tail";
89 } elsif (length($tail)+2 >= $maxlen) {
90 return '..'.substr($tail, -$maxlen+2)
91 }
92
93 $path =~ s@(/[^/]+)(?:/|$)@@;
94 my $head = $1;
95 my $both = length($head) + length($tail);
96 my $remaining = $maxlen-$both-4; # -4 for "/../"
97
98 if ($remaining < 0) {
99 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
100 }
101
102 substr($path, ($remaining/2), (length($path)-$remaining), '..');
103 return "$head/" . $path . "/$tail";
104 }
105
106 sub lock {
107 my ($fh) = @_;
108 flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
109 }
110
111 sub unlock {
112 my ($fh) = @_;
113 flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
114 }
115
116 sub get_status {
117 my ($source, $name, $status) = @_;
118
119 if ($status->{$source->{all}}->{$name}->{status}) {
120 return $status;
121 }
122
123 return undef;
124 }
125
126 sub check_pool_exists {
127 my ($target) = @_;
128
129 my $cmd = [];
130
131 if ($target->{ip}) {
132 push @$cmd, 'ssh', "root\@$target->{ip}", '--';
133 }
134 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
135 eval {
136 run_cmd($cmd);
137 };
138
139 if ($@) {
140 return 0;
141 }
142 return 1;
143 }
144
145 sub parse_target {
146 my ($text) = @_;
147
148 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
149 my $target = {};
150
151 if ($text !~ $TARGETRE) {
152 die "$errstr\n";
153 }
154 $target->{all} = $2;
155 $target->{ip} = $1 if $1;
156 my @parts = split('/', $2);
157
158 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
159
160 my $pool = $target->{pool} = shift(@parts);
161 die "$errstr\n" if !$pool;
162
163 if ($pool =~ m/^\d+$/) {
164 $target->{vmid} = $pool;
165 delete $target->{pool};
166 }
167
168 return $target if (@parts == 0);
169 $target->{last_part} = pop(@parts);
170
171 if ($target->{ip}) {
172 pop(@parts);
173 }
174 if (@parts > 0) {
175 $target->{path} = join('/', @parts);
176 }
177
178 return $target;
179 }
180
181 sub read_cron {
182
183 #This is for the first use to init file;
184 if (!-e $CRONJOBS) {
185 my $new_fh = IO::File->new("> $CRONJOBS");
186 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
187 close($new_fh);
188 return undef;
189 }
190
191 my $fh = IO::File->new("< $CRONJOBS");
192 die "Could not open file $CRONJOBS: $!\n" if !$fh;
193
194 my @text = <$fh>;
195
196 close($fh);
197
198 return encode_cron(@text);
199 }
200
201 sub parse_argv {
202 my (@arg) = @_;
203
204 my $param = {};
205 $param->{dest} = undef;
206 $param->{source} = undef;
207 $param->{verbose} = undef;
208 $param->{limit} = undef;
209 $param->{maxsnap} = undef;
210 $param->{name} = undef;
211 $param->{skip} = undef;
212 $param->{method} = undef;
213
214 my ($ret, $ar) = GetOptionsFromArray(\@arg,
215 'dest=s' => \$param->{dest},
216 'source=s' => \$param->{source},
217 'verbose' => \$param->{verbose},
218 'limit=i' => \$param->{limit},
219 'maxsnap=i' => \$param->{maxsnap},
220 'name=s' => \$param->{name},
221 'skip' => \$param->{skip},
222 'method=s' => \$param->{method});
223
224 if ($ret == 0) {
225 die "can't parse options\n";
226 }
227
228 $param->{name} = "default" if !$param->{name};
229 $param->{maxsnap} = 1 if !$param->{maxsnap};
230 $param->{method} = "ssh" if !$param->{method};
231
232 return $param;
233 }
234
235 sub add_state_to_job {
236 my ($job) = @_;
237
238 my $states = read_state();
239 my $state = $states->{$job->{source}}->{$job->{name}};
240
241 $job->{state} = $state->{state};
242 $job->{lsync} = $state->{lsync};
243 $job->{vm_type} = $state->{vm_type};
244
245 for (my $i = 0; $state->{"snap$i"}; $i++) {
246 $job->{"snap$i"} = $state->{"snap$i"};
247 }
248
249 return $job;
250 }
251
252 sub encode_cron {
253 my (@text) = @_;
254
255 my $cfg = {};
256
257 while (my $line = shift(@text)) {
258
259 my @arg = split('\s', $line);
260 my $param = parse_argv(@arg);
261
262 if ($param->{source} && $param->{dest}) {
263 $cfg->{$param->{source}}->{$param->{name}}->{dest} = $param->{dest};
264 $cfg->{$param->{source}}->{$param->{name}}->{verbose} = $param->{verbose};
265 $cfg->{$param->{source}}->{$param->{name}}->{limit} = $param->{limit};
266 $cfg->{$param->{source}}->{$param->{name}}->{maxsnap} = $param->{maxsnap};
267 $cfg->{$param->{source}}->{$param->{name}}->{skip} = $param->{skip};
268 $cfg->{$param->{source}}->{$param->{name}}->{method} = $param->{method};
269 }
270 }
271
272 return $cfg;
273 }
274
275 sub param_to_job {
276 my ($param) = @_;
277
278 my $job = {};
279
280 my $source = parse_target($param->{source});
281 my $dest = parse_target($param->{dest}) if $param->{dest};
282
283 $job->{name} = !$param->{name} ? "default" : $param->{name};
284 $job->{dest} = $param->{dest} if $param->{dest};
285 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
286 $job->{method} = "ssh" if !$job->{method};
287 $job->{limit} = $param->{limit};
288 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
289 $job->{source} = $param->{source};
290
291 return $job;
292 }
293
294 sub read_state {
295
296 if (!-e $STATE) {
297 make_path $CONFIG_PATH;
298 my $new_fh = IO::File->new("> $STATE");
299 die "Could not create $STATE: $!\n" if !$new_fh;
300 print $new_fh "{}";
301 close($new_fh);
302 return undef;
303 }
304
305 my $fh = IO::File->new("< $STATE");
306 die "Could not open file $STATE: $!\n" if !$fh;
307
308 my $text = <$fh>;
309 my $states = decode_json($text);
310
311 close($fh);
312
313 return $states;
314 }
315
316 sub update_state {
317 my ($job) = @_;
318 my $text;
319 my $in_fh;
320
321 eval {
322
323 $in_fh = IO::File->new("< $STATE");
324 die "Could not open file $STATE: $!\n" if !$in_fh;
325 lock($in_fh);
326 $text = <$in_fh>;
327 };
328
329 my $out_fh = IO::File->new("> $STATE.new");
330 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
331
332 my $states = {};
333 my $state = {};
334 if ($text){
335 $states = decode_json($text);
336 $state = $states->{$job->{source}}->{$job->{name}};
337 }
338
339 if ($job->{state} ne "del") {
340 $state->{state} = $job->{state};
341 $state->{lsync} = $job->{lsync};
342 $state->{vm_type} = $job->{vm_type};
343
344 for (my $i = 0; $job->{"snap$i"} ; $i++) {
345 $state->{"snap$i"} = $job->{"snap$i"};
346 }
347 $states->{$job->{source}}->{$job->{name}} = $state;
348 } else {
349
350 delete $states->{$job->{source}}->{$job->{name}};
351 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
352 }
353
354 $text = encode_json($states);
355 print $out_fh $text;
356
357 close($out_fh);
358 move("$STATE.new", $STATE);
359 eval {
360 close($in_fh);
361 };
362
363 return $states;
364 }
365
366 sub update_cron {
367 my ($job) = @_;
368
369 my $updated;
370 my $has_header;
371 my $line_no = 0;
372 my $text = "";
373 my $header = "SHELL=/bin/sh\n";
374 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
375
376 my $fh = IO::File->new("< $CRONJOBS");
377 die "Could not open file $CRONJOBS: $!\n" if !$fh;
378 lock($fh);
379
380 my @test = <$fh>;
381
382 while (my $line = shift(@test)) {
383 chomp($line);
384 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
385 $updated = 1;
386 next if $job->{state} eq "del";
387 $text .= format_job($job, $line);
388 } else {
389 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
390 $has_header = 1;
391 }
392 $text .= "$line\n";
393 }
394 $line_no++;
395 }
396
397 if (!$has_header) {
398 $text = "$header$text";
399 }
400
401 if (!$updated) {
402 $text .= format_job($job);
403 }
404 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
405 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
406
407 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
408 close ($new_fh);
409
410 die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
411 close ($fh);
412 }
413
414 sub format_job {
415 my ($job, $line) = @_;
416 my $text = "";
417
418 if ($job->{state} eq "stopped") {
419 $text = "#";
420 }
421 if ($line) {
422 $line =~ /^#*(.+) root/;
423 $text .= $1;
424 } else {
425 $text .= "*/$INTERVAL * * * *";
426 }
427 $text .= " root";
428 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
429 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
430 $text .= " --limit $job->{limit}" if $job->{limit};
431 $text .= " --method $job->{method}";
432 $text .= " --verbose" if $job->{verbose};
433 $text .= "\n";
434
435 return $text;
436 }
437
438 sub list {
439
440 my $cfg = read_cron();
441
442 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
443
444 my $states = read_state();
445 foreach my $source (sort keys%{$cfg}) {
446 foreach my $name (sort keys%{$cfg->{$source}}) {
447 $list .= sprintf("%-25s", cut_target_width($source, 25));
448 $list .= sprintf("%-25s", cut_target_width($name, 25));
449 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
450 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
451 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
452 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
453 }
454 }
455
456 return $list;
457 }
458
459 sub vm_exists {
460 my ($target) = @_;
461
462 my @cmd = ('ssh', "root\@$target->{ip}", '--') if $target->{ip};
463
464 my $res = undef;
465
466 return undef if !defined($target->{vmid});
467
468 eval { $res = run_cmd([@cmd, 'ls', "$QEMU_CONF/$target->{vmid}.conf"]) };
469
470 return "qemu" if $res;
471
472 eval { $res = run_cmd([@cmd, 'ls', "$LXC_CONF/$target->{vmid}.conf"]) };
473
474 return "lxc" if $res;
475
476 return undef;
477 }
478
479 sub init {
480 my ($param) = @_;
481
482 my $cfg = read_cron();
483
484 my $job = param_to_job($param);
485
486 $job->{state} = "ok";
487 $job->{lsync} = 0;
488
489 my $source = parse_target($param->{source});
490 my $dest = parse_target($param->{dest});
491
492 if (my $ip = $dest->{ip}) {
493 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
494 }
495
496 if (my $ip = $source->{ip}) {
497 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
498 }
499
500 die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest);
501
502 if (!defined($source->{vmid})) {
503 die "Pool $source->{all} does not exists\n" if !check_pool_exists($source);
504 }
505
506 my $vm_type = vm_exists($source);
507 $job->{vm_type} = $vm_type;
508 $source->{vm_type} = $vm_type;
509
510 die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
511
512 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
513
514 #check if vm has zfs disks if not die;
515 get_disks($source) if $source->{vmid};
516
517 update_cron($job);
518 update_state($job);
519
520 eval {
521 sync($param) if !$param->{skip};
522 };
523 if(my $err = $@) {
524 destroy_job($param);
525 print $err;
526 }
527 }
528
529 sub get_job {
530 my ($param) = @_;
531
532 my $cfg = read_cron();
533
534 if (!$cfg->{$param->{source}}->{$param->{name}}) {
535 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
536 }
537 my $job = $cfg->{$param->{source}}->{$param->{name}};
538 $job->{name} = $param->{name};
539 $job->{source} = $param->{source};
540 $job = add_state_to_job($job);
541
542 return $job;
543 }
544
545 sub destroy_job {
546 my ($param) = @_;
547
548 my $job = get_job($param);
549 $job->{state} = "del";
550
551 update_cron($job);
552 update_state($job);
553 }
554
555 sub sync {
556 my ($param) = @_;
557
558 my $lock_fh = IO::File->new("> $LOCKFILE");
559 die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
560 lock($lock_fh);
561
562 my $date = get_date();
563 my $job;
564 eval {
565 $job = get_job($param);
566 };
567
568 if ($job && $job->{state} eq "syncing") {
569 die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
570 }
571
572 my $dest = parse_target($param->{dest});
573 my $source = parse_target($param->{source});
574
575 my $sync_path = sub {
576 my ($source, $dest, $job, $param, $date) = @_;
577
578 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name});
579
580 snapshot_add($source, $dest, $param->{name}, $date);
581
582 send_image($source, $dest, $param);
583
584 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}) if ($source->{destroy} && $source->{old_snap});
585
586 };
587
588 my $vm_type = vm_exists($source);
589 $source->{vm_type} = $vm_type;
590
591 if ($job) {
592 $job->{state} = "syncing";
593 $job->{vm_type} = $vm_type if !$job->{vm_type};
594 update_state($job);
595 }
596
597 eval{
598 if ($source->{vmid}) {
599 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
600 my $disks = get_disks($source);
601
602 foreach my $disk (sort keys %{$disks}) {
603 $source->{all} = $disks->{$disk}->{all};
604 $source->{pool} = $disks->{$disk}->{pool};
605 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
606 $source->{last_part} = $disks->{$disk}->{last_part};
607 &$sync_path($source, $dest, $job, $param, $date);
608 }
609 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
610 send_config($source, $dest,'ssh');
611 } else {
612 send_config($source, $dest,'local');
613 }
614 } else {
615 &$sync_path($source, $dest, $job, $param, $date);
616 }
617 };
618 if(my $err = $@) {
619 if ($job) {
620 $job->{state} = "error";
621 update_state($job);
622 unlock($lock_fh);
623 close($lock_fh);
624 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
625 }
626 die "$err\n";
627 }
628
629 if ($job) {
630 $job->{state} = "ok";
631 $job->{lsync} = $date;
632 update_state($job);
633 }
634
635 unlock($lock_fh);
636 close($lock_fh);
637 }
638
639 sub snapshot_get{
640 my ($source, $dest, $max_snap, $name) = @_;
641
642 my $cmd = [];
643 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
644 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
645 push @$cmd, $source->{all};
646
647 my $raw = run_cmd($cmd);
648 my $index = 0;
649 my $line = "";
650 my $last_snap = undef;
651 my $old_snap;
652
653 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
654 $line = $1;
655 if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
656
657 $last_snap = $1 if (!$last_snap);
658 $old_snap = $1;
659 $index++;
660 if ($index == $max_snap) {
661 $source->{destroy} = 1;
662 last;
663 };
664 }
665 }
666
667 return ($old_snap, $last_snap) if $last_snap;
668
669 return undef;
670 }
671
672 sub snapshot_add {
673 my ($source, $dest, $name, $date) = @_;
674
675 my $snap_name = "rep_$name\_".$date;
676
677 $source->{new_snap} = $snap_name;
678
679 my $path = "$source->{all}\@$snap_name";
680
681 my $cmd = [];
682 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
683 push @$cmd, 'zfs', 'snapshot', $path;
684 eval{
685 run_cmd($cmd);
686 };
687
688 if (my $err = $@) {
689 snapshot_destroy($source, $dest, 'ssh', $snap_name);
690 die "$err\n";
691 }
692 }
693
694 sub write_cron {
695 my ($cfg) = @_;
696
697 my $text = "SHELL=/bin/sh\n";
698 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
699
700 my $fh = IO::File->new("> $CRONJOBS");
701 die "Could not open file: $!\n" if !$fh;
702
703 foreach my $source (sort keys%{$cfg}) {
704 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
705 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
706 $text .= "$PROG_PATH sync";
707 $text .= " -source ";
708 if ($cfg->{$source}->{$sync_name}->{vmid}) {
709 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
710 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
711 } else {
712 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
713 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
714 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
715 }
716 $text .= " -dest ";
717 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
718 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
719 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
720 $text .= " -name $sync_name ";
721 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
722 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
723 $text .= "\n";
724 }
725 }
726 die "Can't write to cron\n" if (!print($fh $text));
727 close($fh);
728 }
729
730 sub get_disks {
731 my ($target) = @_;
732
733 my $cmd = [];
734 push @$cmd, 'ssh', "root\@$target->{ip}", '--', if $target->{ip};
735
736 if ($target->{vm_type} eq 'qemu') {
737 push @$cmd, 'qm', 'config', $target->{vmid};
738 } elsif ($target->{vm_type} eq 'lxc') {
739 push @$cmd, 'pct', 'config', $target->{vmid};
740 } else {
741 die "VM Type unknown\n";
742 }
743
744 my $res = run_cmd($cmd);
745
746 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type});
747
748 return $disks;
749 }
750
751 sub run_cmd {
752 my ($cmd) = @_;
753 print "Start CMD\n" if $DEBUG;
754 print Dumper $cmd if $DEBUG;
755 if (ref($cmd) eq 'ARRAY') {
756 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
757 }
758 my $output = `$cmd 2>&1`;
759
760 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
761
762 chomp($output);
763 print Dumper $output if $DEBUG;
764 print "END CMD\n" if $DEBUG;
765 return $output;
766 }
767
768 sub parse_disks {
769 my ($text, $ip, $vm_type) = @_;
770
771 my $disks;
772
773 my $num = 0;
774 while ($text && $text =~ s/^(.*?)(\n|$)//) {
775 my $line = $1;
776
777 next if $line =~ /media=cdrom/;
778 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
779
780 #QEMU if backup is not set include in sync
781 next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
782
783 #LXC if backup is not set do no in sync
784 next if $vm_type eq 'lxc' && ($line =~ m/^mp\d:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);
785
786 my $disk = undef;
787 my $stor = undef;
788 if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
789 my @parameter = split(/,/,$1);
790
791 foreach my $opt (@parameter) {
792 if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
793 $disk = $2;
794 $stor = $1;
795 last;
796 }
797 }
798 }
799 if (!defined($disk) || !defined($stor)) {
800 print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
801 next;
802 }
803
804 my $cmd = [];
805 push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
806 push @$cmd, 'pvesm', 'path', "$stor$disk";
807 my $path = run_cmd($cmd);
808
809 die "Get no path from pvesm path $stor$disk\n" if !$path;
810
811 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
812
813 my @array = split('/', $1);
814 $disks->{$num}->{pool} = shift(@array);
815 $disks->{$num}->{all} = $disks->{$num}->{pool};
816 if (0 < @array) {
817 $disks->{$num}->{path} = join('/', @array);
818 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
819 }
820 $disks->{$num}->{last_part} = $disk;
821 $disks->{$num}->{all} .= "\/$disk";
822
823 $num++;
824 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
825
826 $disks->{$num}->{pool} = $1;
827 $disks->{$num}->{all} = $disks->{$num}->{pool};
828
829 if ($2) {
830 $disks->{$num}->{path} = $3;
831 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
832 }
833
834 $disks->{$num}->{last_part} = $disk;
835 $disks->{$num}->{all} .= "\/$disk";
836
837 $num++;
838
839 } else {
840 die "ERROR: in path\n";
841 }
842 }
843
844 die "Vm include no disk on zfs.\n" if !$disks->{0};
845 return $disks;
846 }
847
848 sub snapshot_destroy {
849 my ($source, $dest, $method, $snap) = @_;
850
851 my @zfscmd = ('zfs', 'destroy');
852 my $snapshot = "$source->{all}\@$snap";
853
854 eval {
855 if($source->{ip} && $method eq 'ssh'){
856 run_cmd(['ssh', "root\@$source->{ip}", '--', @zfscmd, $snapshot]);
857 } else {
858 run_cmd([@zfscmd, $snapshot]);
859 }
860 };
861 if (my $erro = $@) {
862 warn "WARN: $erro";
863 }
864 if ($dest) {
865 my @ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
866
867 my $path = "$dest->{all}";
868 $path .= "/$source->{last_part}" if $source->{last_part};
869
870 eval {
871 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
872 };
873 if (my $erro = $@) {
874 warn "WARN: $erro";
875 }
876 }
877 }
878
879 sub snapshot_exist {
880 my ($source , $dest, $method) = @_;
881
882 my $cmd = [];
883 push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
884 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
885
886 my $path = $dest->{all};
887 $path .= "/$source->{last_part}" if $source->{last_part};
888 $path .= "\@$source->{old_snap}";
889
890 push @$cmd, $path;
891
892
893 my $text = "";
894 eval {$text =run_cmd($cmd);};
895 if (my $erro =$@) {
896 warn "WARN: $erro";
897 return undef;
898 }
899
900 while ($text && $text =~ s/^(.*?)(\n|$)//) {
901 my $line =$1;
902 return 1 if $line =~ m/^.*$source->{old_snap}$/;
903 }
904 }
905
906 sub send_image {
907 my ($source, $dest, $param) = @_;
908
909 my $cmd = [];
910
911 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "root\@$source->{ip}", '--' if $source->{ip};
912 push @$cmd, 'zfs', 'send';
913 push @$cmd, '-v' if $param->{verbose};
914
915 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
916 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
917 }
918 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
919
920 if ($param->{limit}){
921 my $bwl = $param->{limit}*1024;
922 push @$cmd, \'|', 'cstream', '-t', $bwl;
923 }
924 my $target = "$dest->{all}";
925 $target .= "/$source->{last_part}" if $source->{last_part};
926 $target =~ s!/+!/!g;
927
928 push @$cmd, \'|';
929 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "root\@$dest->{ip}", '--' if $dest->{ip};
930 push @$cmd, 'zfs', 'recv', '-F', '--';
931 push @$cmd, "$target";
932
933 eval {
934 run_cmd($cmd)
935 };
936
937 if (my $erro = $@) {
938 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
939 die $erro;
940 };
941 }
942
943
944 sub send_config{
945 my ($source, $dest, $method) = @_;
946
947 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
948 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
949
950 my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
951
952 $dest_target_new = $config_dir.'/'.$dest_target_new;
953
954 if ($method eq 'ssh'){
955 if ($dest->{ip} && $source->{ip}) {
956 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
957 run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
958 } elsif ($dest->{ip}) {
959 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
960 run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
961 } elsif ($source->{ip}) {
962 run_cmd(['mkdir', '-p', '--', $config_dir]);
963 run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", $dest_target_new]);
964 }
965
966 if ($source->{destroy}){
967 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
968 if($dest->{ip}){
969 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
970 } else {
971 run_cmd(['rm', '-f', '--', $dest_target_old]);
972 }
973 }
974 } elsif ($method eq 'local') {
975 run_cmd(['mkdir', '-p', '--', $config_dir]);
976 run_cmd(['cp', $source_target, $dest_target_new]);
977 }
978 }
979
980 sub get_date {
981 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
982 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
983
984 return $datestamp;
985 }
986
987 sub status {
988 my $cfg = read_cron();
989
990 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
991
992 my $states = read_state();
993
994 foreach my $source (sort keys%{$cfg}) {
995 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
996 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
997 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
998 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
999 }
1000 }
1001
1002 return $status_list;
1003 }
1004
1005 sub enable_job {
1006 my ($param) = @_;
1007
1008 my $job = get_job($param);
1009 $job->{state} = "ok";
1010 update_state($job);
1011 update_cron($job);
1012 }
1013
1014 sub disable_job {
1015 my ($param) = @_;
1016
1017 my $job = get_job($param);
1018 $job->{state} = "stopped";
1019 update_state($job);
1020 update_cron($job);
1021 }
1022
1023 my $cmd_help = {
1024 destroy => qq{
1025 $PROGNAME destroy -source <string> [OPTIONS]
1026
1027 remove a sync Job from the scheduler
1028
1029 -name string
1030
1031 name of the sync job, if not set it is default
1032
1033 -source string
1034
1035 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1036 },
1037 create => qq{
1038 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1039
1040 Create a sync Job
1041
1042 -dest string
1043
1044 the destination target is like [IP]:<Pool>[/Path]
1045
1046 -limit integer
1047
1048 max sync speed in kBytes/s, default unlimited
1049
1050 -maxsnap string
1051
1052 how much snapshots will be kept before get erased, default 1
1053
1054 -name string
1055
1056 name of the sync job, if not set it is default
1057
1058 -skip boolean
1059
1060 if this flag is set it will skip the first sync
1061
1062 -source string
1063
1064 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1065 },
1066 sync => qq{
1067 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1068
1069 will sync one time
1070
1071 -dest string
1072
1073 the destination target is like [IP:]<Pool>[/Path]
1074
1075 -limit integer
1076
1077 max sync speed in kBytes/s, default unlimited
1078
1079 -maxsnap integer
1080
1081 how much snapshots will be kept before get erased, default 1
1082
1083 -name string
1084
1085 name of the sync job, if not set it is default.
1086 It is only necessary if scheduler allready contains this source.
1087
1088 -source string
1089
1090 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1091
1092 -verbose boolean
1093
1094 print out the sync progress.
1095 },
1096 list => qq{
1097 $PROGNAME list
1098
1099 Get a List of all scheduled Sync Jobs
1100 },
1101 status => qq{
1102 $PROGNAME status
1103
1104 Get the status of all scheduled Sync Jobs
1105 },
1106 help => qq{
1107 $PROGNAME help <cmd> [OPTIONS]
1108
1109 Get help about specified command.
1110
1111 <cmd> string
1112
1113 Command name
1114
1115 -verbose boolean
1116
1117 Verbose output format.
1118 },
1119 enable => qq{
1120 $PROGNAME enable -source <string> [OPTIONS]
1121
1122 enable a syncjob and reset error
1123
1124 -name string
1125
1126 name of the sync job, if not set it is default
1127
1128 -source string
1129
1130 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1131 },
1132 disable => qq{
1133 $PROGNAME disable -source <string> [OPTIONS]
1134
1135 pause a sync job
1136
1137 -name string
1138
1139 name of the sync job, if not set it is default
1140
1141 -source string
1142
1143 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1144 },
1145 printpod => 'internal command',
1146
1147 };
1148
1149 if (!$command) {
1150 usage(); die "\n";
1151 } elsif (!$cmd_help->{$command}) {
1152 print "ERROR: unknown command '$command'";
1153 usage(1); die "\n";
1154 }
1155
1156 my @arg = @ARGV;
1157 my $param = parse_argv(@arg);
1158
1159 sub check_params {
1160 for (@_) {
1161 die "$cmd_help->{$command}\n" if !$param->{$_};
1162 }
1163 }
1164
1165 if ($command eq 'destroy') {
1166 check_params(qw(source));
1167
1168 check_target($param->{source});
1169 destroy_job($param);
1170
1171 } elsif ($command eq 'sync') {
1172 check_params(qw(source dest));
1173
1174 check_target($param->{source});
1175 check_target($param->{dest});
1176 sync($param);
1177
1178 } elsif ($command eq 'create') {
1179 check_params(qw(source dest));
1180
1181 check_target($param->{source});
1182 check_target($param->{dest});
1183 init($param);
1184
1185 } elsif ($command eq 'status') {
1186 print status();
1187
1188 } elsif ($command eq 'list') {
1189 print list();
1190
1191 } elsif ($command eq 'help') {
1192 my $help_command = $ARGV[1];
1193
1194 if ($help_command && $cmd_help->{$help_command}) {
1195 die "$cmd_help->{$command}\n";
1196
1197 }
1198 if ($param->{verbose}) {
1199 exec("man $PROGNAME");
1200
1201 } else {
1202 usage(1);
1203
1204 }
1205
1206 } elsif ($command eq 'enable') {
1207 check_params(qw(source));
1208
1209 check_target($param->{source});
1210 enable_job($param);
1211
1212 } elsif ($command eq 'disable') {
1213 check_params(qw(source));
1214
1215 check_target($param->{source});
1216 disable_job($param);
1217
1218 } elsif ($command eq 'printpod') {
1219 print_pod();
1220 }
1221
1222 sub usage {
1223 my ($help) = @_;
1224
1225 print("ERROR:\tno command specified\n") if !$help;
1226 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1227 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1228 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1229 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1230 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1231 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1232 print("\t$PROGNAME list\n");
1233 print("\t$PROGNAME status\n");
1234 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1235 }
1236
1237 sub check_target {
1238 my ($target) = @_;
1239 parse_target($target);
1240 }
1241
1242 sub print_pod {
1243
1244 my $synopsis = join("\n", sort values %$cmd_help);
1245
1246 print <<EOF;
1247 =head1 NAME
1248
1249 pve-zsync - PVE ZFS Replication Manager
1250
1251 =head1 SYNOPSIS
1252
1253 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1254
1255 $synopsis
1256
1257 =head1 DESCRIPTION
1258
1259 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1260 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1261 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1262 To config cron see man crontab.
1263
1264 =head2 PVE ZFS Storage sync Tool
1265
1266 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1267
1268 =head1 EXAMPLES
1269
1270 add sync job from local VM to remote ZFS Server
1271 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1272
1273 =head1 IMPORTANT FILES
1274
1275 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1276
1277 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1278
1279 =head1 COPYRIGHT AND DISCLAIMER
1280
1281 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1282
1283 This program is free software: you can redistribute it and/or modify it
1284 under the terms of the GNU Affero General Public License as published
1285 by the Free Software Foundation, either version 3 of the License, or
1286 (at your option) any later version.
1287
1288 This program is distributed in the hope that it will be useful, but
1289 WITHOUT ANY WARRANTY; without even the implied warranty of
1290 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1291 Affero General Public License for more details.
1292
1293 You should have received a copy of the GNU Affero General Public
1294 License along with this program. If not, see
1295 <http://www.gnu.org/licenses/>.
1296
1297 EOF
1298 }