]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
bump version to 1.6-13
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use JSON;
11 use IO::File;
12 use String::ShellQuote 'shell_quote';
13
14 my $PROGNAME = "pve-zsync";
15 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
16 my $STATE = "${CONFIG_PATH}/sync_state";
17 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
18 my $PATH = "/usr/sbin";
19 my $PVE_DIR = "/etc/pve/local";
20 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
21 my $LXC_CONF = "${PVE_DIR}/lxc";
22 my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
23 my $PROG_PATH = "$PATH/${PROGNAME}";
24 my $INTERVAL = 15;
25 my $DEBUG = 0;
26
27 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
28 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
29 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
30 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
31
32 my $IPV6RE = "(?:" .
33 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
34 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
35 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
36 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
37 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
38 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
39 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
40 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
41 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
42
43 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
44 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
45 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
46 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
47 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
48
49 check_bin ('cstream');
50 check_bin ('zfs');
51 check_bin ('ssh');
52 check_bin ('scp');
53
54 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =
55 sub {
56 die "Signal aborting sync\n";
57 };
58
59 sub check_bin {
60 my ($bin) = @_;
61
62 foreach my $p (split (/:/, $ENV{PATH})) {
63 my $fn = "$p/$bin";
64 if (-x $fn) {
65 return $fn;
66 }
67 }
68
69 die "unable to find command '$bin'\n";
70 }
71
72 sub cut_target_width {
73 my ($path, $maxlen) = @_;
74 $path =~ s@/+@/@g;
75
76 return $path if length($path) <= $maxlen;
77
78 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
79
80 $path =~ s@/([^/]+/?)$@@;
81 my $tail = $1;
82
83 if (length($tail)+3 == $maxlen) {
84 return "../$tail";
85 } elsif (length($tail)+2 >= $maxlen) {
86 return '..'.substr($tail, -$maxlen+2)
87 }
88
89 $path =~ s@(/[^/]+)(?:/|$)@@;
90 my $head = $1;
91 my $both = length($head) + length($tail);
92 my $remaining = $maxlen-$both-4; # -4 for "/../"
93
94 if ($remaining < 0) {
95 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
96 }
97
98 substr($path, ($remaining/2), (length($path)-$remaining), '..');
99 return "$head/" . $path . "/$tail";
100 }
101
102 sub lock {
103 my ($fh) = @_;
104 flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
105 }
106
107 sub unlock {
108 my ($fh) = @_;
109 flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
110 }
111
112 sub get_status {
113 my ($source, $name, $status) = @_;
114
115 if ($status->{$source->{all}}->{$name}->{status}) {
116 return $status;
117 }
118
119 return undef;
120 }
121
122 sub check_pool_exists {
123 my ($target) = @_;
124
125 my $cmd = [];
126
127 if ($target->{ip}) {
128 push @$cmd, 'ssh', "root\@$target->{ip}", '--';
129 }
130 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
131 eval {
132 run_cmd($cmd);
133 };
134
135 if ($@) {
136 return 0;
137 }
138 return 1;
139 }
140
141 sub parse_target {
142 my ($text) = @_;
143
144 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
145 my $target = {};
146
147 if ($text !~ $TARGETRE) {
148 die "$errstr\n";
149 }
150 $target->{all} = $2;
151 $target->{ip} = $1 if $1;
152 my @parts = split('/', $2);
153
154 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
155
156 my $pool = $target->{pool} = shift(@parts);
157 die "$errstr\n" if !$pool;
158
159 if ($pool =~ m/^\d+$/) {
160 $target->{vmid} = $pool;
161 delete $target->{pool};
162 }
163
164 return $target if (@parts == 0);
165 $target->{last_part} = pop(@parts);
166
167 if ($target->{ip}) {
168 pop(@parts);
169 }
170 if (@parts > 0) {
171 $target->{path} = join('/', @parts);
172 }
173
174 return $target;
175 }
176
177 sub read_cron {
178
179 #This is for the first use to init file;
180 if (!-e $CRONJOBS) {
181 my $new_fh = IO::File->new("> $CRONJOBS");
182 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
183 close($new_fh);
184 return undef;
185 }
186
187 my $fh = IO::File->new("< $CRONJOBS");
188 die "Could not open file $CRONJOBS: $!\n" if !$fh;
189
190 my @text = <$fh>;
191
192 close($fh);
193
194 return encode_cron(@text);
195 }
196
197 sub parse_argv {
198 my (@arg) = @_;
199
200 my $param = {};
201 $param->{dest} = undef;
202 $param->{source} = undef;
203 $param->{verbose} = undef;
204 $param->{limit} = undef;
205 $param->{maxsnap} = undef;
206 $param->{name} = undef;
207 $param->{skip} = undef;
208 $param->{method} = undef;
209
210 my ($ret, $ar) = GetOptionsFromArray(\@arg,
211 'dest=s' => \$param->{dest},
212 'source=s' => \$param->{source},
213 'verbose' => \$param->{verbose},
214 'limit=i' => \$param->{limit},
215 'maxsnap=i' => \$param->{maxsnap},
216 'name=s' => \$param->{name},
217 'skip' => \$param->{skip},
218 'method=s' => \$param->{method});
219
220 if ($ret == 0) {
221 die "can't parse options\n";
222 }
223
224 $param->{name} = "default" if !$param->{name};
225 $param->{maxsnap} = 1 if !$param->{maxsnap};
226 $param->{method} = "ssh" if !$param->{method};
227
228 return $param;
229 }
230
231 sub add_state_to_job {
232 my ($job) = @_;
233
234 my $states = read_state();
235 my $state = $states->{$job->{source}}->{$job->{name}};
236
237 $job->{state} = $state->{state};
238 $job->{lsync} = $state->{lsync};
239 $job->{vm_type} = $state->{vm_type};
240
241 for (my $i = 0; $state->{"snap$i"}; $i++) {
242 $job->{"snap$i"} = $state->{"snap$i"};
243 }
244
245 return $job;
246 }
247
248 sub encode_cron {
249 my (@text) = @_;
250
251 my $cfg = {};
252
253 while (my $line = shift(@text)) {
254
255 my @arg = split('\s', $line);
256 my $param = parse_argv(@arg);
257
258 if ($param->{source} && $param->{dest}) {
259 $cfg->{$param->{source}}->{$param->{name}}->{dest} = $param->{dest};
260 $cfg->{$param->{source}}->{$param->{name}}->{verbose} = $param->{verbose};
261 $cfg->{$param->{source}}->{$param->{name}}->{limit} = $param->{limit};
262 $cfg->{$param->{source}}->{$param->{name}}->{maxsnap} = $param->{maxsnap};
263 $cfg->{$param->{source}}->{$param->{name}}->{skip} = $param->{skip};
264 $cfg->{$param->{source}}->{$param->{name}}->{method} = $param->{method};
265 }
266 }
267
268 return $cfg;
269 }
270
271 sub param_to_job {
272 my ($param) = @_;
273
274 my $job = {};
275
276 my $source = parse_target($param->{source});
277 my $dest = parse_target($param->{dest}) if $param->{dest};
278
279 $job->{name} = !$param->{name} ? "default" : $param->{name};
280 $job->{dest} = $param->{dest} if $param->{dest};
281 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
282 $job->{method} = "ssh" if !$job->{method};
283 $job->{limit} = $param->{limit};
284 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
285 $job->{source} = $param->{source};
286
287 return $job;
288 }
289
290 sub read_state {
291
292 if (!-e $STATE) {
293 make_path $CONFIG_PATH;
294 my $new_fh = IO::File->new("> $STATE");
295 die "Could not create $STATE: $!\n" if !$new_fh;
296 print $new_fh "{}";
297 close($new_fh);
298 return undef;
299 }
300
301 my $fh = IO::File->new("< $STATE");
302 die "Could not open file $STATE: $!\n" if !$fh;
303
304 my $text = <$fh>;
305 my $states = decode_json($text);
306
307 close($fh);
308
309 return $states;
310 }
311
312 sub update_state {
313 my ($job) = @_;
314 my $text;
315 my $in_fh;
316
317 eval {
318
319 $in_fh = IO::File->new("< $STATE");
320 die "Could not open file $STATE: $!\n" if !$in_fh;
321 lock($in_fh);
322 $text = <$in_fh>;
323 };
324
325 my $out_fh = IO::File->new("> $STATE.new");
326 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
327
328 my $states = {};
329 my $state = {};
330 if ($text){
331 $states = decode_json($text);
332 $state = $states->{$job->{source}}->{$job->{name}};
333 }
334
335 if ($job->{state} ne "del") {
336 $state->{state} = $job->{state};
337 $state->{lsync} = $job->{lsync};
338 $state->{vm_type} = $job->{vm_type};
339
340 for (my $i = 0; $job->{"snap$i"} ; $i++) {
341 $state->{"snap$i"} = $job->{"snap$i"};
342 }
343 $states->{$job->{source}}->{$job->{name}} = $state;
344 } else {
345
346 delete $states->{$job->{source}}->{$job->{name}};
347 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
348 }
349
350 $text = encode_json($states);
351 print $out_fh $text;
352
353 close($out_fh);
354 move("$STATE.new", $STATE);
355 eval {
356 close($in_fh);
357 };
358
359 return $states;
360 }
361
362 sub update_cron {
363 my ($job) = @_;
364
365 my $updated;
366 my $has_header;
367 my $line_no = 0;
368 my $text = "";
369 my $header = "SHELL=/bin/sh\n";
370 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
371
372 my $fh = IO::File->new("< $CRONJOBS");
373 die "Could not open file $CRONJOBS: $!\n" if !$fh;
374 lock($fh);
375
376 my @test = <$fh>;
377
378 while (my $line = shift(@test)) {
379 chomp($line);
380 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
381 $updated = 1;
382 next if $job->{state} eq "del";
383 $text .= format_job($job, $line);
384 } else {
385 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
386 $has_header = 1;
387 }
388 $text .= "$line\n";
389 }
390 $line_no++;
391 }
392
393 if (!$has_header) {
394 $text = "$header$text";
395 }
396
397 if (!$updated) {
398 $text .= format_job($job);
399 }
400 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
401 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
402
403 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
404 close ($new_fh);
405
406 die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
407 close ($fh);
408 }
409
410 sub format_job {
411 my ($job, $line) = @_;
412 my $text = "";
413
414 if ($job->{state} eq "stopped") {
415 $text = "#";
416 }
417 if ($line) {
418 $line =~ /^#*(.+) root/;
419 $text .= $1;
420 } else {
421 $text .= "*/$INTERVAL * * * *";
422 }
423 $text .= " root";
424 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
425 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
426 $text .= " --limit $job->{limit}" if $job->{limit};
427 $text .= " --method $job->{method}";
428 $text .= " --verbose" if $job->{verbose};
429 $text .= "\n";
430
431 return $text;
432 }
433
434 sub list {
435
436 my $cfg = read_cron();
437
438 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
439
440 my $states = read_state();
441 foreach my $source (sort keys%{$cfg}) {
442 foreach my $name (sort keys%{$cfg->{$source}}) {
443 $list .= sprintf("%-25s", cut_target_width($source, 25));
444 $list .= sprintf("%-25s", cut_target_width($name, 25));
445 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
446 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
447 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
448 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
449 }
450 }
451
452 return $list;
453 }
454
455 sub vm_exists {
456 my ($target) = @_;
457
458 my @cmd = ('ssh', "root\@$target->{ip}", '--') if $target->{ip};
459
460 my $res = undef;
461
462 return undef if !defined($target->{vmid});
463
464 eval { $res = run_cmd([@cmd, 'ls', "$QEMU_CONF/$target->{vmid}.conf"]) };
465
466 return "qemu" if $res;
467
468 eval { $res = run_cmd([@cmd, 'ls', "$LXC_CONF/$target->{vmid}.conf"]) };
469
470 return "lxc" if $res;
471
472 return undef;
473 }
474
475 sub init {
476 my ($param) = @_;
477
478 my $cfg = read_cron();
479
480 my $job = param_to_job($param);
481
482 $job->{state} = "ok";
483 $job->{lsync} = 0;
484
485 my $source = parse_target($param->{source});
486 my $dest = parse_target($param->{dest});
487
488 if (my $ip = $dest->{ip}) {
489 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
490 }
491
492 if (my $ip = $source->{ip}) {
493 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
494 }
495
496 die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest);
497
498 if (!defined($source->{vmid})) {
499 die "Pool $source->{all} does not exists\n" if !check_pool_exists($source);
500 }
501
502 my $vm_type = vm_exists($source);
503 $job->{vm_type} = $vm_type;
504 $source->{vm_type} = $vm_type;
505
506 die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
507
508 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
509
510 #check if vm has zfs disks if not die;
511 get_disks($source, 1) if $source->{vmid};
512
513 update_cron($job);
514 update_state($job);
515
516 eval {
517 sync($param) if !$param->{skip};
518 };
519 if(my $err = $@) {
520 destroy_job($param);
521 print $err;
522 }
523 }
524
525 sub get_job {
526 my ($param) = @_;
527
528 my $cfg = read_cron();
529
530 if (!$cfg->{$param->{source}}->{$param->{name}}) {
531 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
532 }
533 my $job = $cfg->{$param->{source}}->{$param->{name}};
534 $job->{name} = $param->{name};
535 $job->{source} = $param->{source};
536 $job = add_state_to_job($job);
537
538 return $job;
539 }
540
541 sub destroy_job {
542 my ($param) = @_;
543
544 my $job = get_job($param);
545 $job->{state} = "del";
546
547 update_cron($job);
548 update_state($job);
549 }
550
551 sub sync {
552 my ($param) = @_;
553
554 my $lock_fh = IO::File->new("> $LOCKFILE");
555 die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
556 lock($lock_fh);
557
558 my $date = get_date();
559 my $job;
560 eval {
561 $job = get_job($param);
562 };
563
564 if ($job && $job->{state} eq "syncing") {
565 die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
566 }
567
568 my $dest = parse_target($param->{dest});
569 my $source = parse_target($param->{source});
570
571 my $sync_path = sub {
572 my ($source, $dest, $job, $param, $date) = @_;
573
574 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name});
575
576 snapshot_add($source, $dest, $param->{name}, $date);
577
578 send_image($source, $dest, $param);
579
580 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}) if ($source->{destroy} && $source->{old_snap});
581
582 };
583
584 my $vm_type = vm_exists($source);
585 $source->{vm_type} = $vm_type;
586
587 if ($job) {
588 $job->{state} = "syncing";
589 $job->{vm_type} = $vm_type if !$job->{vm_type};
590 update_state($job);
591 }
592
593 eval{
594 if ($source->{vmid}) {
595 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
596 my $disks = get_disks($source);
597
598 foreach my $disk (sort keys %{$disks}) {
599 $source->{all} = $disks->{$disk}->{all};
600 $source->{pool} = $disks->{$disk}->{pool};
601 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
602 $source->{last_part} = $disks->{$disk}->{last_part};
603 &$sync_path($source, $dest, $job, $param, $date);
604 }
605 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
606 send_config($source, $dest,'ssh');
607 } else {
608 send_config($source, $dest,'local');
609 }
610 } else {
611 &$sync_path($source, $dest, $job, $param, $date);
612 }
613 };
614 if(my $err = $@) {
615 if ($job) {
616 $job->{state} = "error";
617 update_state($job);
618 unlock($lock_fh);
619 close($lock_fh);
620 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
621 }
622 die "$err\n";
623 }
624
625 if ($job) {
626 $job->{state} = "ok";
627 $job->{lsync} = $date;
628 update_state($job);
629 }
630
631 unlock($lock_fh);
632 close($lock_fh);
633 }
634
635 sub snapshot_get{
636 my ($source, $dest, $max_snap, $name) = @_;
637
638 my $cmd = [];
639 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
640 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
641 push @$cmd, $source->{all};
642
643 my $raw = run_cmd($cmd);
644 my $index = 0;
645 my $line = "";
646 my $last_snap = undef;
647 my $old_snap;
648
649 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
650 $line = $1;
651 if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
652
653 $last_snap = $1 if (!$last_snap);
654 $old_snap = $1;
655 $index++;
656 if ($index == $max_snap) {
657 $source->{destroy} = 1;
658 last;
659 };
660 }
661 }
662
663 return ($old_snap, $last_snap) if $last_snap;
664
665 return undef;
666 }
667
668 sub snapshot_add {
669 my ($source, $dest, $name, $date) = @_;
670
671 my $snap_name = "rep_$name\_".$date;
672
673 $source->{new_snap} = $snap_name;
674
675 my $path = "$source->{all}\@$snap_name";
676
677 my $cmd = [];
678 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
679 push @$cmd, 'zfs', 'snapshot', $path;
680 eval{
681 run_cmd($cmd);
682 };
683
684 if (my $err = $@) {
685 snapshot_destroy($source, $dest, 'ssh', $snap_name);
686 die "$err\n";
687 }
688 }
689
690 sub write_cron {
691 my ($cfg) = @_;
692
693 my $text = "SHELL=/bin/sh\n";
694 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
695
696 my $fh = IO::File->new("> $CRONJOBS");
697 die "Could not open file: $!\n" if !$fh;
698
699 foreach my $source (sort keys%{$cfg}) {
700 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
701 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
702 $text .= "$PROG_PATH sync";
703 $text .= " -source ";
704 if ($cfg->{$source}->{$sync_name}->{vmid}) {
705 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
706 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
707 } else {
708 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
709 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
710 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
711 }
712 $text .= " -dest ";
713 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
714 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
715 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
716 $text .= " -name $sync_name ";
717 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
718 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
719 $text .= "\n";
720 }
721 }
722 die "Can't write to cron\n" if (!print($fh $text));
723 close($fh);
724 }
725
726 sub get_disks {
727 my ($target, $get_err) = @_;
728
729 my $cmd = [];
730 push @$cmd, 'ssh', "root\@$target->{ip}", '--', if $target->{ip};
731
732 if ($target->{vm_type} eq 'qemu') {
733 push @$cmd, 'qm', 'config', $target->{vmid};
734 } elsif ($target->{vm_type} eq 'lxc') {
735 push @$cmd, 'pct', 'config', $target->{vmid};
736 } else {
737 die "VM Type unknown\n";
738 }
739
740 my $res = run_cmd($cmd);
741
742 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $get_err);
743
744 return $disks;
745 }
746
747 sub run_cmd {
748 my ($cmd) = @_;
749 print "Start CMD\n" if $DEBUG;
750 print Dumper $cmd if $DEBUG;
751 if (ref($cmd) eq 'ARRAY') {
752 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
753 }
754 my $output = `$cmd 2>&1`;
755
756 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
757
758 chomp($output);
759 print Dumper $output if $DEBUG;
760 print "END CMD\n" if $DEBUG;
761 return $output;
762 }
763
764 sub parse_disks {
765 my ($text, $ip, $vm_type, $get_err) = @_;
766
767 my $disks;
768
769 my $num = 0;
770 while ($text && $text =~ s/^(.*?)(\n|$)//) {
771 my $line = $1;
772 my $error = $vm_type eq 'qemu' ? 1 : 0 ;
773
774 next if $line =~ /cdrom|none/;
775 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
776
777 #QEMU if backup is not set include in sync
778 next if $vm_type eq 'qemu && ($line =~ m/backup=(?i:0|no|off|false)/)';
779
780 #LXC if backup is not set do no in sync
781 $error = ($line =~ m/backup=(?i:1|yes|on|true)/) if $vm_type eq 'lxc';
782
783 my $disk = undef;
784 my $stor = undef;
785 if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
786 my @parameter = split(/,/,$1);
787
788 foreach my $opt (@parameter) {
789 if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
790 $disk = $2;
791 $stor = $1;
792 last;
793 }
794 }
795
796 } else {
797 print "Disk: \"$line\" will not include in pve-sync\n" if $get_err || $error;
798 next;
799 }
800
801 my $cmd = [];
802 push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
803 push @$cmd, 'pvesm', 'path', "$stor$disk";
804 my $path = run_cmd($cmd);
805
806 die "Get no path from pvesm path $stor$disk\n" if !$path;
807
808 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
809
810 my @array = split('/', $1);
811 $disks->{$num}->{pool} = shift(@array);
812 $disks->{$num}->{all} = $disks->{$num}->{pool};
813 if (0 < @array) {
814 $disks->{$num}->{path} = join('/', @array);
815 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
816 }
817 $disks->{$num}->{last_part} = $disk;
818 $disks->{$num}->{all} .= "\/$disk";
819
820 $num++;
821 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
822
823 $disks->{$num}->{pool} = $1;
824 $disks->{$num}->{all} = $disks->{$num}->{pool};
825
826 if ($2) {
827 $disks->{$num}->{path} = $3;
828 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
829 }
830
831 $disks->{$num}->{last_part} = $disk;
832 $disks->{$num}->{all} .= "\/$disk";
833
834 $num++;
835
836 } else {
837 die "ERROR: in path\n";
838 }
839 }
840
841 die "Vm include no disk on zfs.\n" if !$disks->{0};
842 return $disks;
843 }
844
845 sub snapshot_destroy {
846 my ($source, $dest, $method, $snap) = @_;
847
848 my @zfscmd = ('zfs', 'destroy');
849 my $snapshot = "$source->{all}\@$snap";
850
851 eval {
852 if($source->{ip} && $method eq 'ssh'){
853 run_cmd(['ssh', "root\@$source->{ip}", '--', @zfscmd, $snapshot]);
854 } else {
855 run_cmd([@zfscmd, $snapshot]);
856 }
857 };
858 if (my $erro = $@) {
859 warn "WARN: $erro";
860 }
861 if ($dest) {
862 my @ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
863
864 my $path = "$dest->{all}\/$source->{last_part}";
865
866 eval {
867 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
868 };
869 if (my $erro = $@) {
870 warn "WARN: $erro";
871 }
872 }
873 }
874
875 sub snapshot_exist {
876 my ($source , $dest, $method) = @_;
877
878 my $cmd = [];
879 push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
880 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
881 push @$cmd, "$dest->{all}/$source->{last_part}\@$source->{old_snap}";
882
883 my $text = "";
884 eval {$text =run_cmd($cmd);};
885 if (my $erro =$@) {
886 warn "WARN: $erro";
887 return undef;
888 }
889
890 while ($text && $text =~ s/^(.*?)(\n|$)//) {
891 my $line =$1;
892 return 1 if $line =~ m/^.*$source->{old_snap}$/;
893 }
894 }
895
896 sub send_image {
897 my ($source, $dest, $param) = @_;
898
899 my $cmd = [];
900
901 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "root\@$source->{ip}", '--' if $source->{ip};
902 push @$cmd, 'zfs', 'send';
903 push @$cmd, '-v' if $param->{verbose};
904
905 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
906 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
907 }
908 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
909
910 if ($param->{limit}){
911 my $bwl = $param->{limit}*1024;
912 push @$cmd, \'|', 'cstream', '-t', $bwl;
913 }
914 my $target = "$dest->{all}/$source->{last_part}";
915 $target =~ s!/+!/!g;
916
917 push @$cmd, \'|';
918 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "root\@$dest->{ip}", '--' if $dest->{ip};
919 push @$cmd, 'zfs', 'recv', '-F', '--';
920 push @$cmd, "$target";
921
922 eval {
923 run_cmd($cmd)
924 };
925
926 if (my $erro = $@) {
927 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
928 die $erro;
929 };
930 }
931
932
933 sub send_config{
934 my ($source, $dest, $method) = @_;
935
936 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
937 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
938
939 my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
940
941 $dest_target_new = $config_dir.'/'.$dest_target_new;
942
943 if ($method eq 'ssh'){
944 if ($dest->{ip} && $source->{ip}) {
945 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
946 run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
947 } elsif ($dest->{ip}) {
948 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
949 run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
950 } elsif ($source->{ip}) {
951 run_cmd(['mkdir', '-p', '--', $config_dir]);
952 run_cmd(['scp', '--', "root\@$source->{ip}:$source_target", $dest_target_new]);
953 }
954
955 if ($source->{destroy}){
956 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
957 if($dest->{ip}){
958 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
959 } else {
960 run_cmd(['rm', '-f', '--', $dest_target_old]);
961 }
962 }
963 } elsif ($method eq 'local') {
964 run_cmd(['mkdir', '-p', '--', $config_dir]);
965 run_cmd(['cp', $source_target, $dest_target_new]);
966 }
967 }
968
969 sub get_date {
970 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
971 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
972
973 return $datestamp;
974 }
975
976 sub status {
977 my $cfg = read_cron();
978
979 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
980
981 my $states = read_state();
982
983 foreach my $source (sort keys%{$cfg}) {
984 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
985 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
986 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
987 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
988 }
989 }
990
991 return $status_list;
992 }
993
994 sub enable_job {
995 my ($param) = @_;
996
997 my $job = get_job($param);
998 $job->{state} = "ok";
999 update_state($job);
1000 update_cron($job);
1001 }
1002
1003 sub disable_job {
1004 my ($param) = @_;
1005
1006 my $job = get_job($param);
1007 $job->{state} = "stopped";
1008 update_state($job);
1009 update_cron($job);
1010 }
1011
1012 my $command = $ARGV[0];
1013
1014 my $commands = {'destroy' => 1,
1015 'create' => 1,
1016 'sync' => 1,
1017 'list' => 1,
1018 'status' => 1,
1019 'help' => 1,
1020 'enable' => 1,
1021 'disable' => 1};
1022
1023 if (!$command || !$commands->{$command}) {
1024 usage();
1025 die "\n";
1026 }
1027
1028 my $help_sync = "$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1029 \twill sync one time\n
1030 \t-dest\tstring\n
1031 \t\tthe destination target is like [IP:]<Pool>[/Path]\n
1032 \t-limit\tinteger\n
1033 \t\tmax sync speed in kBytes/s, default unlimited\n
1034 \t-maxsnap\tinteger\n
1035 \t\thow much snapshots will be kept before get erased, default 1/n
1036 \t-name\tstring\n
1037 \t\tname of the sync job, if not set it is default.
1038 \tIt is only necessary if scheduler allready contains this source.\n
1039 \t-source\tstring\n
1040 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1041
1042 my $help_create = "$PROGNAME create -dest <string> -source <string> [OPTIONS]/n
1043 \tCreate a sync Job\n
1044 \t-dest\tstring\n
1045 \t\tthe destination target is like [IP]:<Pool>[/Path]\n
1046 \t-limit\tinteger\n
1047 \t\tmax sync speed in kBytes/s, default unlimited\n
1048 \t-maxsnap\tstring\n
1049 \t\thow much snapshots will be kept before get erased, default 1\n
1050 \t-name\tstring\n
1051 \t\tname of the sync job, if not set it is default\n
1052 \t-skip\tboolean\n
1053 \t\tif this flag is set it will skip the first sync\n
1054 \t-source\tstring\n
1055 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1056
1057 my $help_destroy = "$PROGNAME destroy -source <string> [OPTIONS]\n
1058 \tremove a sync Job from the scheduler\n
1059 \t-name\tstring\n
1060 \t\tname of the sync job, if not set it is default\n
1061 \t-source\tstring\n
1062 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1063
1064 my $help_help = "$PROGNAME help <cmd> [OPTIONS]\n
1065 \tGet help about specified command.\n
1066 \t<cmd>\tstring\n
1067 \t\tCommand name\n
1068 \t-verbose\tboolean\n
1069 \t\tVerbose output format.\n";
1070
1071 my $help_list = "$PROGNAME list\n
1072 \tGet a List of all scheduled Sync Jobs\n";
1073
1074 my $help_status = "$PROGNAME status\n
1075 \tGet the status of all scheduled Sync Jobs\n";
1076
1077 my $help_enable = "$PROGNAME enable -source <string> [OPTIONS]\n
1078 \tenable a syncjob and reset error\n
1079 \t-name\tstring\n
1080 \t\tname of the sync job, if not set it is default\n
1081 \t-source\tstring\n
1082 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1083
1084 my $help_disable = "$PROGNAME disable -source <string> [OPTIONS]\n
1085 \tpause a syncjob\n
1086 \t-name\tstring\n
1087 \t\tname of the sync job, if not set it is default\n
1088 \t-source\tstring\n
1089 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1090
1091 sub help {
1092 my ($command) = @_;
1093
1094 if ($command eq 'help') {
1095 die "$help_help\n";
1096
1097 } elsif ($command eq 'sync') {
1098 die "$help_sync\n";
1099
1100 } elsif ($command eq 'destroy') {
1101 die "$help_destroy\n";
1102
1103 } elsif ($command eq 'create') {
1104 die "$help_create\n";
1105
1106 } elsif ($command eq 'list') {
1107 die "$help_list\n";
1108
1109 } elsif ($command eq 'status') {
1110 die "$help_status\n";
1111
1112 } elsif ($command eq 'enable') {
1113 die "$help_enable\n";
1114
1115 } elsif ($command eq 'disable') {
1116 die "$help_disable\n";
1117
1118 }
1119
1120 }
1121
1122 my @arg = @ARGV;
1123 my $param = parse_argv(@arg);
1124
1125 if ($command eq 'destroy') {
1126 die "$help_destroy\n" if !$param->{source};
1127
1128 check_target($param->{source});
1129 destroy_job($param);
1130
1131 } elsif ($command eq 'sync') {
1132 die "$help_sync\n" if !$param->{source} || !$param->{dest};
1133
1134 check_target($param->{source});
1135 check_target($param->{dest});
1136 sync($param);
1137
1138 } elsif ($command eq 'create') {
1139 die "$help_create\n" if !$param->{source} || !$param->{dest};
1140
1141 check_target($param->{source});
1142 check_target($param->{dest});
1143 init($param);
1144
1145 } elsif ($command eq 'status') {
1146 print status();
1147
1148 } elsif ($command eq 'list') {
1149 print list();
1150
1151 } elsif ($command eq 'help') {
1152 my $help_command = $ARGV[1];
1153
1154 if ($help_command && $commands->{$help_command}) {
1155 print help($help_command);
1156
1157 }
1158 if ($param->{verbose} == 1){
1159 exec("man $PROGNAME");
1160
1161 } else {
1162 usage(1);
1163
1164 }
1165
1166 } elsif ($command eq 'enable') {
1167 die "$help_enable\n" if !$param->{source};
1168
1169 check_target($param->{source});
1170 enable_job($param);
1171
1172 } elsif ($command eq 'disable') {
1173 die "$help_disable\n" if !$param->{source};
1174
1175 check_target($param->{source});
1176 disable_job($param);
1177
1178 }
1179
1180 sub usage {
1181 my ($help) = @_;
1182
1183 print("ERROR:\tno command specified\n") if !$help;
1184 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1185 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1186 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1187 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1188 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1189 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1190 print("\t$PROGNAME list\n");
1191 print("\t$PROGNAME status\n");
1192 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1193 }
1194
1195 sub check_target {
1196 my ($target) = @_;
1197 parse_target($target);
1198 }
1199
1200 __END__
1201
1202 =head1 NAME
1203
1204 pve-zsync - PVE ZFS Replication Manager
1205
1206 =head1 SYNOPSIS
1207
1208 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1209
1210 pve-zsync help <cmd> [OPTIONS]
1211
1212 Get help about specified command.
1213
1214 <cmd> string
1215
1216 Command name
1217
1218 -verbose boolean
1219
1220 Verbose output format.
1221
1222 pve-zsync create -dest <string> -source <string> [OPTIONS]
1223
1224 Create a sync Job
1225
1226 -dest string
1227
1228 the destination target is like [IP]:<Pool>[/Path]
1229
1230 -limit integer
1231
1232 max sync speed in kBytes/s, default unlimited
1233
1234 -maxsnap string
1235
1236 how much snapshots will be kept before get erased, default 1
1237
1238 -name string
1239
1240 name of the sync job, if not set it is default
1241
1242 -skip boolean
1243
1244 if this flag is set it will skip the first sync
1245
1246 -source string
1247
1248 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1249
1250 pve-zsync destroy -source <string> [OPTIONS]
1251
1252 remove a sync Job from the scheduler
1253
1254 -name string
1255
1256 name of the sync job, if not set it is default
1257
1258 -source string
1259
1260 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1261
1262 pve-zsync disable -source <string> [OPTIONS]
1263
1264 pause a sync job
1265
1266 -name string
1267
1268 name of the sync job, if not set it is default
1269
1270 -source string
1271
1272 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1273
1274 pve-zsync enable -source <string> [OPTIONS]
1275
1276 enable a syncjob and reset error
1277
1278 -name string
1279
1280 name of the sync job, if not set it is default
1281
1282 -source string
1283
1284 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1285 pve-zsync list
1286
1287 Get a List of all scheduled Sync Jobs
1288
1289 pve-zsync status
1290
1291 Get the status of all scheduled Sync Jobs
1292
1293 pve-zsync sync -dest <string> -source <string> [OPTIONS]
1294
1295 will sync one time
1296
1297 -dest string
1298
1299 the destination target is like [IP:]<Pool>[/Path]
1300
1301 -limit integer
1302
1303 max sync speed in kBytes/s, default unlimited
1304
1305 -maxsnap integer
1306
1307 how much snapshots will be kept before get erased, default 1
1308
1309 -name string
1310
1311 name of the sync job, if not set it is default.
1312 It is only necessary if scheduler allready contains this source.
1313
1314 -source string
1315
1316 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1317
1318 =head1 DESCRIPTION
1319
1320 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1321 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1322 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1323 To config cron see man crontab.
1324
1325 =head2 PVE ZFS Storage sync Tool
1326
1327 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1328
1329 =head1 EXAMPLES
1330
1331 add sync job from local VM to remote ZFS Server
1332 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1333
1334 =head1 IMPORTANT FILES
1335
1336 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1337
1338 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1339
1340 =head1 COPYRIGHT AND DISCLAIMER
1341
1342 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1343
1344 This program is free software: you can redistribute it and/or modify it
1345 under the terms of the GNU Affero General Public License as published
1346 by the Free Software Foundation, either version 3 of the License, or
1347 (at your option) any later version.
1348
1349 This program is distributed in the hope that it will be useful, but
1350 WITHOUT ANY WARRANTY; without even the implied warranty of
1351 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1352 Affero General Public License for more details.
1353
1354 You should have received a copy of the GNU Affero General Public
1355 License along with this program. If not, see
1356 <http://www.gnu.org/licenses/>.