]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
Fix #1245: Fix snapshot parser.
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use JSON;
11 use IO::File;
12 use String::ShellQuote 'shell_quote';
13
# Program identity and the file system locations derived from it.
my $PROGNAME = 'pve-zsync';
my $CONFIG_PATH = '/var/lib/' . $PROGNAME;     # job state and copied guest configs
my $STATE = $CONFIG_PATH . '/sync_state';      # JSON file holding per-job state
my $CRONJOBS = '/etc/cron.d/' . $PROGNAME;     # cron file that schedules the jobs
my $PATH = '/usr/sbin';
my $PVE_DIR = '/etc/pve/local';
my $QEMU_CONF = $PVE_DIR . '/qemu-server';     # <vmid>.conf files of QEMU guests
my $LXC_CONF = $PVE_DIR . '/lxc';              # <vmid>.conf files of LXC guests
my $LOCKFILE = join('/', $CONFIG_PATH, $PROGNAME . '.lock');
my $PROG_PATH = join('/', $PATH, $PROGNAME);
my $INTERVAL = 15;    # minute step used for newly written cron entries
my $DEBUG = 0;        # when true, run_cmd() dumps each command and its output
26
# Building blocks for recognising IPv4 and IPv6 literals. They are plain
# strings so that they can be interpolated into the larger patterns below.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:" . "(?:$IPV4OCTET\\.){3}" . $IPV4OCTET . ")";
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32 = "(?:" . "(?:$IPV4RE|$IPV6H16:$IPV6H16)" . ")";

# An IPv6 address: one alternative per number of 16-bit groups that follow
# the "::" abbreviation (the first alternative has no "::" at all).
my $IPV6RE = do {
    my @forms = (
        "(?:(?:(?:$IPV6H16:){6})$IPV6LS32)",
        "(?:(?:::(?:$IPV6H16:){5})$IPV6LS32)",
        "(?:(?:(?:$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)",
        "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)",
        "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)",
        "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)",
        "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::)$IPV6LS32)",
        "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::)$IPV6H16)",
        "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::))",
    );
    "(?:" . join("|", @forms) . ")";
};

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
# A target is either a VMID or 'host:zpool/path', where 'host:' is optional.
# Capture 1 is the host (brackets included), capture 2 the VMID or dataset.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
48
# Refuse to start unless every external tool the script shells out to can be
# found on PATH.
check_bin($_) for qw(cstream zfs ssh scp);

# Turn the listed signals into a Perl exception so that surrounding eval
# blocks get a chance to run their error handling.
my $abort_on_signal = sub {
    die "Signal aborting sync\n";
};
$SIG{$_} = $abort_on_signal for qw(INT KILL HUP PIPE QUIT TERM);
58
# Locate an executable named $bin in the directories listed in $ENV{PATH}.
# Returns the full path of the first executable match; dies if none exists.
sub check_bin {
    my ($bin) = @_;

    my @candidates = map { "$_/$bin" } split(/:/, $ENV{PATH});
    my ($found) = grep { -x $_ } @candidates;

    return $found if defined($found);

    die "unable to find command '$bin'\n";
}
71
# Shorten a '/'-separated target name for tabular output.
#   $target - the name to shorten
#   $max    - the column width aimed for
# Names that already fit are returned unchanged. Otherwise the last path
# component is kept as far as possible and the elided part is marked "..".
sub cut_target_width {
    my ($target, $max) = @_;

    return $target if length($target) <= $max;

    my @parts = split('/', $target);
    my $head = $parts[0];
    my $tail = $parts[-1];
    my $tail_len = length($tail);

    # The last component alone is too long: keep only its end.
    if ($tail_len > $max) {
        return "../" . substr($tail, $tail_len - $max + 3, $tail_len);
    }

    # First and last component together are too long: truncate the first.
    my $used = $tail_len;
    $used += length($head) if @parts > 1;
    if ($used > $max) {
        return substr($head, 0, $max - 4 - $tail_len) . "/../" . $tail;
    }

    # Otherwise keep the first component plus a slice of the middle.
    my $middle_len = $max - $used;
    $middle_len = 1 if $middle_len <= 0;

    return $head . substr($target, length($head), $middle_len) . "../" . $tail;
}
89
# Take an exclusive flock() on the given file handle; dies if that fails.
sub lock {
    my ($fh) = @_;

    my $locked = flock($fh, LOCK_EX);
    die "Can't lock config - $!\n" if !$locked;

    return $locked;
}
94
# Release the flock() held on the given file handle; dies if that fails.
sub unlock {
    my ($fh) = @_;

    my $released = flock($fh, LOCK_UN);
    die "Can't unlock config- $!\n" if !$released;

    return $released;
}
99
# Return the whole $status structure if it records a true {status} value for
# the job identified by $source->{all} and $name, otherwise undef.
sub get_status {
    my ($source, $name, $status) = @_;

    my $known = $status->{$source->{all}}->{$name}->{status};

    return $known ? $status : undef;
}
109
# Check whether the dataset $target->{all} exists by running "zfs list" on
# it, over ssh when the target has an {ip}. Returns 1 when the command
# succeeds and 0 when it fails.
sub check_pool_exists {
    my ($target) = @_;

    my @cmd = ('zfs', 'list', '-H', '--', $target->{all});
    unshift(@cmd, 'ssh', "root\@$target->{ip}", '--') if $target->{ip};

    my $ok = eval { run_cmd(\@cmd); 1 };

    return $ok ? 1 : 0;
}
128
# Split a target specification "[host:]<vmid>" or "[host:]<pool>[/path]"
# into a hash reference with these keys (set only where applicable):
#   all       - everything after the optional "host:" prefix
#   ip        - the host, with surrounding brackets removed
#   pool      - first path component, unless it is purely numeric
#   vmid      - first path component when it is purely numeric
#   last_part - last path component, if there is more than one
#   path      - what remains between those after the removals below
# Dies with a usage hint when $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;

    # Save the captures at once; the substitution below would clobber them.
    my ($host, $spec) = ($1, $2);

    my $target = { all => $spec };
    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $spec);
    my $pool = shift(@parts);
    $target->{pool} = $pool;
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
        delete $target->{pool};
    }

    if (@parts) {
        $target->{last_part} = pop(@parts);

        # For remote targets one more trailing component is dropped.
        pop(@parts) if $target->{ip};

        $target->{path} = join('/', @parts) if @parts > 0;
    }

    return $target;
}
164
# Read the cron file and return the job configuration parsed from it by
# encode_cron(). On first use, when the file does not exist yet, an empty
# file is created and undef is returned.
sub read_cron {

    if (-e $CRONJOBS) {
        my $fh = IO::File->new("< $CRONJOBS");
        die "Could not open file $CRONJOBS: $!\n" if !$fh;

        my @text = <$fh>;
        close($fh);

        return encode_cron(@text);
    }

    # First use: initialise the file.
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
184
# Parse command line style arguments into a parameter hash reference.
# Recognised options: dest, source, name, method (strings), limit, maxsnap
# (integers) and the flags verbose and skip. Every key is present in the
# result; options that were not given are undef, except for the defaults
# name => "default", maxsnap => 1 and method => "ssh".
# Dies if Getopt::Long rejects the arguments.
sub parse_argv {
    my (@arg) = @_;

    my @spec = (
        [dest    => 'dest=s'],
        [source  => 'source=s'],
        [verbose => 'verbose'],
        [limit   => 'limit=i'],
        [maxsnap => 'maxsnap=i'],
        [name    => 'name=s'],
        [skip    => 'skip'],
        [method  => 'method=s'],
    );

    my $param = {};
    my @getopt_args;
    for my $entry (@spec) {
        my ($key, $decl) = @{$entry};
        $param->{$key} = undef;
        push(@getopt_args, $decl, \$param->{$key});
    }

    my $ok = GetOptionsFromArray(\@arg, @getopt_args);
    die "can't parse options\n" if !$ok;

    $param->{name} ||= "default";
    $param->{maxsnap} ||= 1;
    $param->{method} ||= "ssh";

    return $param;
}
218
# Copy the persisted state of a job (state, lsync, vm_type and the
# consecutive snap0, snap1, ... entries) from the state file into $job.
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    $job->{$_} = $state->{$_} for qw(state lsync vm_type);

    # Snapshot entries are numbered from 0; stop at the first missing one.
    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
235
# Build the job configuration from the lines of the cron file.
# Each line is split on whitespace and handed to parse_argv(); lines that
# yield both a source and a dest become
#   $cfg->{<source>}->{<name>} = { dest, verbose, limit, maxsnap, skip, method }
# Processing stops at the first line that is false in boolean context.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    for my $line (@text) {
        last if !$line;

        my $param = parse_argv(split('\s', $line));
        next if !($param->{source} && $param->{dest});

        my $entry = {};
        $entry->{$_} = $param->{$_} for qw(dest verbose limit maxsnap skip method);
        $cfg->{$param->{source}}->{$param->{name}} = $entry;
    }

    return $cfg;
}
258
# Derive a job description from parsed command line parameters.
# The returned hash reference contains name (default "default"), source,
# limit, method ("local" when neither source nor dest names a host, "ssh"
# otherwise) and, when given, dest and maxsnap.
# Dies (inside parse_target) if source or dest are not valid targets.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});

    # Declared unconditionally: "my $x = ... if COND" has undefined
    # behaviour in Perl. An empty hash stands in for a missing destination.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};

    return $job;
}
277
# Load the job states from the JSON state file and return the decoded
# structure. On first use, when the file does not exist yet, the config
# directory and a state file containing "{}" are created and undef is
# returned. Only the first line of the file is read.
sub read_state {

    if (-e $STATE) {
        my $fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$fh;

        my $text = <$fh>;
        my $states = decode_json($text);

        close($fh);

        return $states;
    }

    # First use: create the directory and an empty state file.
    make_path($CONFIG_PATH);
    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
299
# Persist the state of $job in the JSON state file.
# A job whose {state} is "del" is removed (together with its source entry
# once that is empty); otherwise state, lsync, vm_type and the consecutive
# snap0, snap1, ... entries are stored. The new content is written to
# "<state file>.new" and then moved over the old file.
# Returns the complete states structure that was written.
# Dies if the new file cannot be created, written, closed or moved.
sub update_state {
    my ($job) = @_;
    my $text;
    my $in_fh;

    # Reading the previous state is best effort: if the file cannot be
    # opened or locked we simply start from an empty state.
    eval {
        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        $text = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text) {
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"}; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Fail loudly instead of silently keeping a stale or truncated state.
    die "can't write to $STATE.new\n" if !print {$out_fh} $text;
    die "can't close $STATE.new: $!\n" if !close($out_fh);
    die "can't move $STATE.new: $!\n" if !move("$STATE.new", $STATE);

    # Closing the input handle also drops the lock taken above; it may
    # never have been opened, hence the eval.
    eval {
        close($in_fh);
    };

    return $states;
}
349
# Rewrite the cron file so that it reflects $job.
# The line belonging to the job (matched by its source and name) is
# replaced by a freshly formatted one, or dropped when {state} is "del".
# If no line matched, a new entry is appended. A SHELL/PATH header is
# prepended when none is found within the first three lines.
# The new content goes to "<cron file>.new", which is then moved in place.
# Dies if the cron file cannot be opened or the new file cannot be written.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @lines = <$fh>;

    while (my $line = shift(@lines)) {
        chomp($line);

        # \Q...\E: source and name are user supplied and typically contain
        # regex metacharacters such as '.', which must match literally.
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");

    # Closing the handle releases the lock taken on the old file.
    close ($fh);
}
397
# Render the cron line for $job.
#   $job  - job hash (state, source, dest, name, maxsnap, limit, method,
#           verbose)
#   $line - optional existing cron line; its schedule (the text before
#           " root", without leading '#') is reused. Without it the default
#           "*/$INTERVAL * * * *" schedule is used.
# A job in state "stopped" is written commented out with a leading '#'.
sub format_job {
    my ($job, $line) = @_;

    my $schedule;
    if ($line) {
        $line =~ /^#*(.+) root/;
        $schedule = $1;
    } else {
        $schedule = "*/$INTERVAL * * * *";
    }

    my $marker = $job->{state} eq "stopped" ? "#" : "";

    my @words = (
        $marker . $schedule,
        "root",
        $PROGNAME, "sync",
        "--source", $job->{source},
        "--dest", $job->{dest},
        "--name", $job->{name},
        "--maxsnap", $job->{maxsnap},
    );
    push(@words, "--limit", $job->{limit}) if $job->{limit};
    push(@words, "--method", $job->{method});
    push(@words, "--verbose") if $job->{verbose};

    return join(" ", @words) . "\n";
}
421
# Build the table printed by the "list" command: one row per job found in
# the cron file, with its state, last sync time and guest type taken from
# the state file and the connection method taken from the cron entry.
# Returns the table as a single string.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    for my $source (sort keys %{$cfg}) {
        for my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            my $vm_type = defined($state->{vm_type}) ? $state->{vm_type} : "undef";

            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $vm_type,
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
442
# Determine what kind of guest $target->{vmid} is by looking for its
# config file, over ssh when the target has an {ip}.
# Returns "qemu" or "lxc", or undef if the target has no vmid or neither
# config file can be listed.
sub vm_exists {
    my ($target) = @_;

    return undef if !defined($target->{vmid});

    # Declared unconditionally: "my @x = ... if COND" has undefined
    # behaviour in Perl.
    my @cmd = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();

    my $res = undef;

    # "ls" fails (and run_cmd dies) when the file does not exist.
    eval { $res = run_cmd([@cmd, 'ls', "$QEMU_CONF/$target->{vmid}.conf"]) };

    return "qemu" if $res;

    eval { $res = run_cmd([@cmd, 'ls', "$LXC_CONF/$target->{vmid}.conf"]) };

    return "lxc" if $res;

    return undef;
}
462
# Implementation of the "create" command: validate source and destination,
# register the job in the cron and state files and, unless {skip} is set,
# run a first sync. If that first sync fails the job is removed again and
# the error is printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);
    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    # Install root's public key on every remote side, destination first.
    for my $remote ($dest, $source) {
        my $ip = $remote->{ip};
        next if !$ip;
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
    }

    die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest);

    # A VMID source is not a dataset itself, so only check plain datasets.
    if (!defined($source->{vmid}) && !check_pool_exists($source)) {
        die "Pool $source->{all} does not exists\n";
    }

    my $vm_type = vm_exists($source);
    $job->{vm_type} = $vm_type;
    $source->{vm_type} = $vm_type;

    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    # Dies if the guest has no disk on ZFS.
    get_disks($source, 1) if $source->{vmid};

    update_cron($job);
    update_state($job);

    eval {
        sync($param) if !$param->{skip};
    };
    my $err = $@;
    if ($err) {
        destroy_job($param);
        print $err;
    }
}
512
# Look up the job identified by $param->{source} and $param->{name} in the
# cron file and return it, completed with its name, source and persisted
# state. Dies if no such job is configured.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = ($param->{source}, $param->{name});

    my $cfg = read_cron();
    my $job = $cfg->{$source}->{$name};

    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
528
# Remove a job: mark it as "del" and let the cron file and then the state
# file drop it. Dies (inside get_job) if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $doomed = get_job($param);
    $doomed->{state} = "del";

    update_cron($doomed);
    update_state($doomed);
}
538
# Implementation of the "sync" command: replicate the source (a dataset, or
# every ZFS disk of a guest plus the guest config) to the destination.
# Runs are serialised through an exclusive lock on $LOCKFILE. If a job is
# configured for this source/name its state is tracked as "syncing", then
# "ok" (with the sync time) or "error".
# Dies on any failure, and also if the job is already marked as syncing.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE");
    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
    lock($lock_fh);

    my $date = get_date();

    # Not finding a configured job is fine; $job then simply stays undef.
    my $job = eval { get_job($param) };

    if ($job && $job->{state} eq "syncing") {
        die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
    }

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # Replicates the one dataset currently described by $source: find the
    # existing snapshots, take a new one, send it, then prune the old one.
    my $sync_path = sub {
        my ($old_snap, $last_snap) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name});
        $source->{old_snap} = $old_snap;
        $source->{last_snap} = $last_snap;

        snapshot_add($source, $dest, $param->{name}, $date);

        send_image($source, $dest, $param);

        if ($source->{destroy} && $source->{old_snap}) {
            snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap});
        }
    };

    my $vm_type = vm_exists($source);
    $source->{vm_type} = $vm_type;

    if ($job) {
        $job->{state} = "syncing";
        $job->{vm_type} = $vm_type if !$job->{vm_type};
        update_state($job);
    }

    eval {
        if ($source->{vmid}) {
            die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
            my $disks = get_disks($source);

            # Point $source at each disk in turn and replicate it.
            for my $key (sort keys %{$disks}) {
                my $disk = $disks->{$key};
                $source->{all} = $disk->{all};
                $source->{pool} = $disk->{pool};
                $source->{path} = $disk->{path} if $disk->{path};
                $source->{last_part} = $disk->{last_part};
                $sync_path->();
            }

            my $via_ssh = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
            send_config($source, $dest, $via_ssh ? 'ssh' : 'local');
        } else {
            $sync_path->();
        }
    };
    my $err = $@;
    if ($err) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
622
# Find the replication snapshots "rep_<name>_<timestamp>" of the source
# dataset, listed newest first, considering at most $max_snap of them.
# Returns ($old_snap, $last_snap): the oldest snapshot considered and the
# newest one. When $max_snap snapshots were found, $source->{destroy} is
# set to 1. Returns undef when no matching snapshot exists.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name) = @_;

    my @cmd = ('zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation');
    unshift(@cmd, 'ssh', "root\@$source->{ip}", '--') if $source->{ip};
    push(@cmd, $source->{all});

    my $raw = run_cmd(\@cmd);

    # Matching snapshot names in listing order, i.e. newest first.
    my @found;
    for my $line ($raw ? split(/\n/, $raw) : ()) {
        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        push(@found, $1);
        if (@found == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return undef if !@found;

    return ($found[-1], $found[0]);
}
655
# Create the snapshot "rep_<name>_<date>" on the source dataset (over ssh
# when the source has an {ip}) and remember its name in
# $source->{new_snap}. If creation fails, snapshot_destroy() is called for
# that name and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date) = @_;

    my $snap_name = "rep_${name}_${date}";
    $source->{new_snap} = $snap_name;

    my @cmd = ('zfs', 'snapshot', "$source->{all}\@$snap_name");
    unshift(@cmd, 'ssh', "root\@$source->{ip}", '--') if $source->{ip};

    eval {
        run_cmd(\@cmd);
    };
    my $err = $@;
    if ($err) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name);
        die "$err\n";
    }
}
677
# Write a complete cron file from a configuration structure that uses the
# keys status, vmid, source_ip, source_pool, source_path, dest_ip,
# dest_pool, dest_path, limit and maxsnap. Only entries whose {status} is
# 'ok' are written.
# NOTE(review): no caller is visible in this part of the file; this looks
# like a leftover of an older config layout - confirm before relying on it.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    for my $source (sort keys %{$cfg}) {
        for my $sync_name (sort keys %{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name};
            next if $entry->{status} ne 'ok';

            $text .= "$PROG_PATH sync";

            $text .= " -source ";
            $text .= "$entry->{source_ip}:" if $entry->{source_ip};
            if ($entry->{vmid}) {
                $text .= "$entry->{vmid} ";
            } else {
                $text .= "$entry->{source_pool}";
                $text .= "$entry->{source_path}" if $entry->{source_path};
            }

            $text .= " -dest ";
            $text .= "$entry->{dest_ip}:" if $entry->{dest_ip};
            $text .= "$entry->{dest_pool}";
            $text .= "$entry->{dest_path}" if $entry->{dest_path};

            $text .= " -name $sync_name ";
            $text .= " -limit $entry->{limit}" if $entry->{limit};
            $text .= " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};
            $text .= "\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
713
# Fetch the guest configuration with "qm config" (qemu) or "pct config"
# (lxc), over ssh when the target has an {ip}, and return the disks that
# parse_disks() extracts from it. Dies if {vm_type} is neither of the two.
sub get_disks {
    my ($target, $get_err) = @_;

    my @cmd;
    @cmd = ('ssh', "root\@$target->{ip}", '--') if $target->{ip};

    my $vm_type = $target->{vm_type};
    my $tool = $vm_type eq 'qemu' ? 'qm'
             : $vm_type eq 'lxc'  ? 'pct'
             :                      undef;
    die "VM Type unknown\n" if !$tool;

    my $res = run_cmd([@cmd, $tool, 'config', $target->{vmid}]);

    return parse_disks($res, $target->{ip}, $vm_type, $get_err);
}
734
# Run a command through the shell and return its combined stdout/stderr
# with the trailing newline removed.
#   $cmd - a ready-made command string, or an array reference whose plain
#          elements are shell-quoted while scalar references (such as \'|')
#          are inserted verbatim.
# Dies with the command line and its output if the exit status is not 0.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words;
        for my $word (@{$cmd}) {
            push(@words, ref($word) ? ${$word} : shell_quote($word));
        }
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
751
# Extract the ZFS-backed disks from the output of "qm config"/"pct config".
#   $text    - the guest configuration text
#   $ip      - host to run "pvesm path" on via ssh, or false for local
#   $vm_type - 'qemu' or 'lxc'
#   $get_err - when true, report lines that are left out
# Returns a hash reference keyed 0, 1, ... whose values hold pool, path
# (optional), last_part (the volume name) and all (the full dataset name).
# Dies if "pvesm path" yields nothing, if the resolved path does not look
# like a ZFS volume, or if no disk was found at all.
sub parse_disks {
    my ($text, $ip, $vm_type, $get_err) = @_;

    my $disks;

    my $num = 0;
    # Consume $text one line at a time.
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;
        my $error = $vm_type eq 'qemu' ? 1 : 0 ;

        next if $line =~ /cdrom|none/;
        next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

        # QEMU: a disk is synced unless backup is explicitly switched off.
        # (The closing quote used to sit at the end of the whole expression,
        # which made the condition a constant-false string comparison.)
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: note whether backup is explicitly switched on; this only
        # affects the message printed in the else branch below.
        $error = ($line =~ m/backup=(?i:1|yes|on|true)/) if $vm_type eq 'lxc';

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
            my @parameter = split(/,/,$1);

            # The volume is the first option of the form "<storage>:<name>".
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }

        } else {
            print "Disk: \"$line\" will not include in pve-sync\n" if $get_err || $error;
            next;
        }

        # Ask the storage layer where the volume lives.
        my $cmd = [];
        push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
832
# Destroy snapshot $snap of the source dataset and, when $dest is given,
# the same snapshot of "<dest>/<source last_part>" on the destination.
# The source side goes over ssh only if it has an {ip} and $method is
# 'ssh'; the destination side whenever it has an {ip}. Failures are
# reported as warnings and never abort the caller.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap) = @_;

    my @zfs_destroy = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "root\@$source->{ip}", '--')
        : ();

    eval {
        run_cmd([@source_ssh, @zfs_destroy, "$source->{all}\@$snap"]);
    };
    warn "WARN: $@" if $@;

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
        my $dest_dataset = "$dest->{all}\/$source->{last_part}";

        eval {
            run_cmd([@dest_ssh, @zfs_destroy, "$dest_dataset\@$snap"]);
        };
        warn "WARN: $@" if $@;
    }
}
862
# Check whether the snapshot $source->{old_snap} exists for the dataset
# "<dest>/<source last_part>" on the destination (over ssh when the
# destination has an {ip}).
# Returns 1 if it is listed, undef otherwise. A failing "zfs list" is
# reported as a warning and treated as "does not exist".
sub snapshot_exist {
    my ($source , $dest, $method) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
    push @$cmd, "$dest->{all}/$source->{last_part}\@$source->{old_snap}";

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
    }

    # Consume the listing one line at a time.
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line =$1;

        # \Q...\E: the snapshot name contains the user supplied job name
        # and must be matched literally, not as a regex.
        return 1 if $line =~ m/^.*\Q$source->{old_snap}\E$/;
    }

    return undef;
}
883
# Transfer $source->{new_snap} to "<dest>/<source last_part>" with a
# "zfs send | zfs recv -F" pipeline. The send is incremental from
# $source->{last_snap} when that snapshot also exists on the destination.
# $param->{limit} (multiplied by 1024) is passed to "cstream -t" to
# throttle the stream; $param->{verbose} adds -v to zfs send.
# On failure the new source snapshot is destroyed and the error re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    # Sending side.
    my @send;
    push(@send, 'ssh', '-o', 'BatchMode=yes', "root\@$source->{ip}", '--') if $source->{ip};
    push(@send, 'zfs', 'send');
    push(@send, '-v') if $param->{verbose};
    if ($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
        push(@send, '-i', "$source->{all}\@$source->{last_snap}");
    }
    push(@send, '--', "$source->{all}\@$source->{new_snap}");

    # Optional bandwidth limiter between the two sides.
    my @throttle;
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    # Receiving side; collapse repeated slashes in the dataset name.
    my $target = "$dest->{all}/$source->{last_part}";
    $target =~ s!/+!/!g;

    my @recv = (\'|');
    push(@recv, 'ssh', '-o', 'BatchMode=yes', "root\@$dest->{ip}", '--') if $dest->{ip};
    push(@recv, 'zfs', 'recv', '-F', '--', "$target");

    eval {
        run_cmd([@send, @throttle, @recv]);
    };
    my $erro = $@;
    if ($erro) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
        die $erro;
    }
}
919
920
# Copy the guest configuration file next to the replicated data as
# "<vmid>.conf.<vm_type>.<new_snap>" inside $CONFIG_PATH (or inside
# "$CONFIG_PATH/<dest last_part>" when the destination has a last_part).
#   $method - 'ssh' to copy with scp between the hosts involved,
#             'local' to copy with cp on this host
# With 'ssh' the config belonging to the pruned old snapshot is removed
# when $source->{destroy} is set.
sub send_config{
    my ($source, $dest, $method) = @_;

    my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
    my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;

    $dest_target_new = $config_dir.'/'.$dest_target_new;

    if ($method eq 'ssh'){
        # Hosts are always written as [host] for scp so that IPv6
        # addresses, which contain ':', are not mistaken for the path
        # separator.
        if ($dest->{ip} && $source->{ip}) {
            run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($dest->{ip}) {
            run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($source->{ip}) {
            run_cmd(['mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", $dest_target_new]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
            if($dest->{ip}){
                run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
            } else {
                run_cmd(['rm', '-f', '--', $dest_target_old]);
            }
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
956
# Return the current local time formatted as "YYYY-MM-DD_hh:mm:ss", the
# timestamp format used in replication snapshot names.
sub get_date {
    # localtime() yields (sec, min, hour, mday, mon, year, ...).
    my @now = localtime(time);

    return sprintf(
        "%04d-%02d-%02d_%02d:%02d:%02d",
        $now[5] + 1900, $now[4] + 1, $now[3],
        $now[2], $now[1], $now[0],
    );
}
963
# Build the table printed by the "status" command: one row per job found
# in the cron file with its state taken from the state file.
# Returns the table as a single string.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    for my $source (sort keys %{$cfg}) {
        for my $sync_name (sort keys %{$cfg->{$source}}) {
            my $columns = sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= $columns . "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
981
# Re-enable a job: set its state to "ok" (which also clears an error
# state), then update the state file and the cron entry.
# Dies (inside get_job) if the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $revived = get_job($param);
    $revived->{state} = "ok";

    update_state($revived);
    update_cron($revived);
}
990
# Pause a job: set its state to "stopped", then update the state file and
# the cron entry (format_job writes stopped jobs commented out).
# Dies (inside get_job) if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $paused = get_job($param);
    $paused->{state} = "stopped";

    update_state($paused);
    update_cron($paused);
}
999
# The sub-command is always the first command line argument.
my $command = $ARGV[0];

# Set of recognised sub-commands; the value is only a truth marker.
my $commands = {
    map { $_ => 1 } qw(destroy create sync list status help enable disable)
};

# A missing or unknown sub-command prints the usage overview and aborts.
unless ($command && $commands->{$command}) {
    usage();
    die "\n";
}
1015
# Per-command help texts. Each line ends in an explicit \n escape followed
# by a literal line break, so the printed text is double spaced.
my $help_sync = "$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
\twill sync one time\n
\t-dest\tstring\n
\t\tthe destination target is like [IP:]<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tinteger\n
\t\thow much snapshots will be kept before get erased, default 1\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default.
\tIt is only necessary if scheduler allready contains this source.\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_create = "$PROGNAME create -dest <string> -source <string> [OPTIONS]\n
\tCreate a sync Job\n
\t-dest\tstring\n
\t\tthe destination target is like [IP:]<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tstring\n
\t\thow much snapshots will be kept before get erased, default 1\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-skip\tboolean\n
\t\tif this flag is set it will skip the first sync\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_destroy = "$PROGNAME destroy -source <string> [OPTIONS]\n
\tremove a sync Job from the scheduler\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_help = "$PROGNAME help <cmd> [OPTIONS]\n
\tGet help about specified command.\n
\t<cmd>\tstring\n
\t\tCommand name\n
\t-verbose\tboolean\n
\t\tVerbose output format.\n";

my $help_list = "$PROGNAME list\n
\tGet a List of all scheduled Sync Jobs\n";

my $help_status = "$PROGNAME status\n
\tGet the status of all scheduled Sync Jobs\n";

my $help_enable = "$PROGNAME enable -source <string> [OPTIONS]\n
\tenable a syncjob and reset error\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_disable = "$PROGNAME disable -source <string> [OPTIONS]\n
\tpause a syncjob\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1078
# Show the help text of one sub-command by dying with it, so the text goes
# to STDERR and the program ends. Returns an empty string for a command
# without a help text.
sub help {
    my ($command) = @_;

    my %text_for = (
        help    => \$help_help,
        sync    => \$help_sync,
        destroy => \$help_destroy,
        create  => \$help_create,
        list    => \$help_list,
        status  => \$help_status,
        enable  => \$help_enable,
        disable => \$help_disable,
    );

    my $text = $text_for{$command};
    die "${$text}\n" if $text;

    return '';
}
1109
# Main program: parse the options and dispatch on the sub-command, which
# was validated against $commands earlier. Commands that need a source
# (and a destination) die with their help text when it is missing, and
# validate the targets before doing any work.
my @arg = @ARGV;
my $param = parse_argv(@arg);

if ($command eq 'destroy') {
    die "$help_destroy\n" if !$param->{source};

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    die "$help_sync\n" if !$param->{source} || !$param->{dest};

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    die "$help_create\n" if !$param->{source} || !$param->{dest};

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    my $help_command = $ARGV[1];

    # help() dies with the text of a known command and so ends here.
    if ($help_command && $commands->{$help_command}) {
        print help($help_command);

    }

    # verbose is undef unless the flag was given, so test for truth rather
    # than comparing numerically (which warns about an undefined value).
    if ($param->{verbose}) {
        exec("man $PROGNAME");

    } else {
        usage(1);

    }

} elsif ($command eq 'enable') {
    die "$help_enable\n" if !$param->{source};

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    die "$help_disable\n" if !$param->{source};

    check_target($param->{source});
    disable_job($param);

}
1167
# Print the command overview to STDOUT.
#
# Parameters:
#   $help - when false, the overview is preceded by an error line saying
#           that no command was specified; when true, only the overview
#           is printed.
#
# Returns the result of the print call.
sub usage {
    my ($help) = @_;

    my $text = $help ? '' : "ERROR:\tno command specified\n";

    $text .= "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n"
        . "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n"
        . "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n"
        . "\t$PROGNAME destroy -source <string> [OPTIONS]\n"
        . "\t$PROGNAME disable -source <string> [OPTIONS]\n"
        . "\t$PROGNAME enable -source <string> [OPTIONS]\n"
        . "\t$PROGNAME list\n"
        . "\t$PROGNAME status\n"
        . "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n";

    return print($text);
}
1182
# Validate a source/destination specification by handing it to
# parse_target(); whatever parse_target() returns (or raises) is passed
# straight through to the caller.
#
# Parameters:
#   $target - the target string as given on the command line
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1187
1188 __END__
1189
1190 =head1 NAME
1191
1192 pve-zsync - PVE ZFS Replication Manager
1193
1194 =head1 SYNOPSIS
1195
1196 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1197
1198 pve-zsync help [<cmd>] [OPTIONS]
1199
1200 Get help about specified command.
1201
1202 <cmd> string
1203
1204 Command name
1205
1206 -verbose boolean
1207
1208 Verbose output format.
1209
1210 pve-zsync create -dest <string> -source <string> [OPTIONS]
1211
1212 Create a sync Job
1213
1214 -dest string
1215
1216 the destination target is like [IP:]<Pool>[/Path]
1217
1218 -limit integer
1219
1220 max sync speed in kBytes/s, default unlimited
1221
1222 -maxsnap integer
1223
1224 how many snapshots will be kept before the oldest ones get erased, default 1
1225
1226 -name string
1227
1228 name of the sync job, if not set it is default
1229
1230 -skip boolean
1231
1232 if this flag is set it will skip the first sync
1233
1234 -source string
1235
1236 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1237
1238 pve-zsync destroy -source <string> [OPTIONS]
1239
1240 remove a sync Job from the scheduler
1241
1242 -name string
1243
1244 name of the sync job, if not set it is default
1245
1246 -source string
1247
1248 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1249
1250 pve-zsync disable -source <string> [OPTIONS]
1251
1252 pause a sync job
1253
1254 -name string
1255
1256 name of the sync job, if not set it is default
1257
1258 -source string
1259
1260 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1261
1262 pve-zsync enable -source <string> [OPTIONS]
1263
1264 enable a sync job and reset its error state
1265
1266 -name string
1267
1268 name of the sync job, if not set it is default
1269
1270 -source string
1271
1272 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

1273 pve-zsync list
1274
1275 Get a List of all scheduled Sync Jobs
1276
1277 pve-zsync status
1278
1279 Get the status of all scheduled Sync Jobs
1280
1281 pve-zsync sync -dest <string> -source <string> [OPTIONS]
1282
1283 will sync one time
1284
1285 -dest string
1286
1287 the destination target is like [IP:]<Pool>[/Path]
1288
1289 -limit integer
1290
1291 max sync speed in kBytes/s, default unlimited
1292
1293 -maxsnap integer
1294
1295 how many snapshots will be kept before the oldest ones get erased, default 1
1296
1297 -name string
1298
1299 name of the sync job, if not set it is default.
1300 It is only necessary if the scheduler already contains this source.
1301
1302 -source string
1303
1304 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1305
1306 =head1 DESCRIPTION
1307
1308 This tool helps you to sync your VMs or directories which are stored on ZFS between two servers.
1309 This tool can also add jobs to cron so that the sync is done automatically.
1310 The default sync interval is 15 minutes; if you want to change this value, you can do so in /etc/cron.d/pve-zsync.
1311 To configure cron, see man crontab.
1312
1313 =head2 PVE ZFS Storage sync Tool
1314
1315 This tool can fetch a remote pool from another PVE host or send a pool to other ZFS machines.
1316
1317 =head1 EXAMPLES
1318
1319 Add a sync job from a local VM to a remote ZFS server:
1320 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1321
1322 =head1 IMPORTANT FILES
1323
1324 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1325
1326 The VM config gets copied to /var/lib/pve-zsync/ on the destination machine.
1327
1328 =head1 COPYRIGHT AND DISCLAIMER
1329
1330 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1331
1332 This program is free software: you can redistribute it and/or modify it
1333 under the terms of the GNU Affero General Public License as published
1334 by the Free Software Foundation, either version 3 of the License, or
1335 (at your option) any later version.
1336
1337 This program is distributed in the hope that it will be useful, but
1338 WITHOUT ANY WARRANTY; without even the implied warranty of
1339 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1340 Affero General Public License for more details.
1341
1342 You should have received a copy of the GNU Affero General Public
1343 License along with this program. If not, see
1344 <http://www.gnu.org/licenses/>.