]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
fix #910: Correctly handle undef
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use Switch;
11 use JSON;
12 use IO::File;
13 use String::ShellQuote 'shell_quote';
14
15 my $PROGNAME = "pve-zsync";
16 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
17 my $STATE = "${CONFIG_PATH}/sync_state";
18 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
19 my $PATH = "/usr/sbin";
20 my $PVE_DIR = "/etc/pve/local";
21 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
22 my $LXC_CONF = "${PVE_DIR}/lxc";
23 my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
24 my $PROG_PATH = "$PATH/${PROGNAME}";
25 my $INTERVAL = 15;
26 my $DEBUG = 0;
27
28 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
29 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
30 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
31 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
32
33 my $IPV6RE = "(?:" .
34 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
35 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
36 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
37 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
38 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
39 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
40 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
41 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
42 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
43
44 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
45 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
46 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
47 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
48 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
49
50 check_bin ('cstream');
51 check_bin ('zfs');
52 check_bin ('ssh');
53 check_bin ('scp');
54
55 sub check_bin {
56 my ($bin) = @_;
57
58 foreach my $p (split (/:/, $ENV{PATH})) {
59 my $fn = "$p/$bin";
60 if (-x $fn) {
61 return $fn;
62 }
63 }
64
65 die "unable to find command '$bin'\n";
66 }
67
68 sub cut_target_width {
69 my ($target, $max) = @_;
70
71 return $target if (length($target) <= $max);
72 my @spl = split('/', $target);
73
74 my $count = length($spl[@spl-1]);
75 return "..\/".substr($spl[@spl-1],($count-$max)+3 , $count) if $count > $max;
76
77 $count += length($spl[0]) if @spl > 1;
78 return substr($spl[0], 0, $max-4-length($spl[@spl-1]))."\/..\/".$spl[@spl-1] if $count > $max;
79
80 my $rest = 1;
81 $rest = $max-$count if ($max-$count > 0);
82
83 return "$spl[0]".substr($target, length($spl[0]), $rest)."..\/".$spl[@spl-1];
84 }
85
86 sub lock {
87 my ($fh) = @_;
88 flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
89 }
90
91 sub unlock {
92 my ($fh) = @_;
93 flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
94 }
95
96 sub get_status {
97 my ($source, $name, $status) = @_;
98
99 if ($status->{$source->{all}}->{$name}->{status}) {
100 return $status;
101 }
102
103 return undef;
104 }
105
106 sub check_pool_exists {
107 my ($target) = @_;
108
109 my $cmd = [];
110 push @$cmd, 'ssh', "root\@$target->{ip}", '--', if $target->{ip};
111 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
112 eval {
113 run_cmd($cmd);
114 };
115
116 if ($@) {
117 return 1;
118 }
119 return undef;
120 }
121
122 sub parse_target {
123 my ($text) = @_;
124
125 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
126 my $target = {};
127
128 if ($text !~ $TARGETRE) {
129 die "$errstr\n";
130 }
131 $target->{all} = $2;
132 $target->{ip} = $1 if $1;
133 my @parts = split('/', $2);
134
135 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
136
137 my $pool = $target->{pool} = shift(@parts);
138 die "$errstr\n" if !$pool;
139
140 if ($pool =~ m/^\d+$/) {
141 $target->{vmid} = $pool;
142 delete $target->{pool};
143 }
144
145 return $target if (@parts == 0);
146 $target->{last_part} = pop(@parts);
147
148 if ($target->{ip}) {
149 pop(@parts);
150 }
151 if (@parts > 0) {
152 $target->{path} = join('/', @parts);
153 }
154
155 return $target;
156 }
157
158 sub read_cron {
159
160 #This is for the first use to init file;
161 if (!-e $CRONJOBS) {
162 my $new_fh = IO::File->new("> $CRONJOBS");
163 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
164 close($new_fh);
165 return undef;
166 }
167
168 my $fh = IO::File->new("< $CRONJOBS");
169 die "Could not open file $CRONJOBS: $!\n" if !$fh;
170
171 my @text = <$fh>;
172
173 close($fh);
174
175 return encode_cron(@text);
176 }
177
178 sub parse_argv {
179 my (@arg) = @_;
180
181 my $param = {};
182 $param->{dest} = undef;
183 $param->{source} = undef;
184 $param->{verbose} = undef;
185 $param->{limit} = undef;
186 $param->{maxsnap} = undef;
187 $param->{name} = undef;
188 $param->{skip} = undef;
189 $param->{method} = undef;
190
191 my ($ret, $ar) = GetOptionsFromArray(\@arg,
192 'dest=s' => \$param->{dest},
193 'source=s' => \$param->{source},
194 'verbose' => \$param->{verbose},
195 'limit=i' => \$param->{limit},
196 'maxsnap=i' => \$param->{maxsnap},
197 'name=s' => \$param->{name},
198 'skip' => \$param->{skip},
199 'method=s' => \$param->{method});
200
201 if ($ret == 0) {
202 die "can't parse options\n";
203 }
204
205 $param->{name} = "default" if !$param->{name};
206 $param->{maxsnap} = 1 if !$param->{maxsnap};
207 $param->{method} = "ssh" if !$param->{method};
208
209 return $param;
210 }
211
212 sub add_state_to_job {
213 my ($job) = @_;
214
215 my $states = read_state();
216 my $state = $states->{$job->{source}}->{$job->{name}};
217
218 $job->{state} = $state->{state};
219 $job->{lsync} = $state->{lsync};
220 $job->{vm_type} = $state->{vm_type};
221
222 for (my $i = 0; $state->{"snap$i"}; $i++) {
223 $job->{"snap$i"} = $state->{"snap$i"};
224 }
225
226 return $job;
227 }
228
229 sub encode_cron {
230 my (@text) = @_;
231
232 my $cfg = {};
233
234 while (my $line = shift(@text)) {
235
236 my @arg = split('\s', $line);
237 my $param = parse_argv(@arg);
238
239 if ($param->{source} && $param->{dest}) {
240 $cfg->{$param->{source}}->{$param->{name}}->{dest} = $param->{dest};
241 $cfg->{$param->{source}}->{$param->{name}}->{verbose} = $param->{verbose};
242 $cfg->{$param->{source}}->{$param->{name}}->{limit} = $param->{limit};
243 $cfg->{$param->{source}}->{$param->{name}}->{maxsnap} = $param->{maxsnap};
244 $cfg->{$param->{source}}->{$param->{name}}->{skip} = $param->{skip};
245 $cfg->{$param->{source}}->{$param->{name}}->{method} = $param->{method};
246 }
247 }
248
249 return $cfg;
250 }
251
252 sub param_to_job {
253 my ($param) = @_;
254
255 my $job = {};
256
257 my $source = parse_target($param->{source});
258 my $dest = parse_target($param->{dest}) if $param->{dest};
259
260 $job->{name} = !$param->{name} ? "default" : $param->{name};
261 $job->{dest} = $param->{dest} if $param->{dest};
262 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
263 $job->{method} = "ssh" if !$job->{method};
264 $job->{limit} = $param->{limit};
265 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
266 $job->{source} = $param->{source};
267
268 return $job;
269 }
270
271 sub read_state {
272
273 if (!-e $STATE) {
274 make_path $CONFIG_PATH;
275 my $new_fh = IO::File->new("> $STATE");
276 die "Could not create $STATE: $!\n" if !$new_fh;
277 print $new_fh "{}";
278 close($new_fh);
279 return undef;
280 }
281
282 my $fh = IO::File->new("< $STATE");
283 die "Could not open file $STATE: $!\n" if !$fh;
284
285 my $text = <$fh>;
286 my $states = decode_json($text);
287
288 close($fh);
289
290 return $states;
291 }
292
293 sub update_state {
294 my ($job) = @_;
295 my $text;
296 my $in_fh;
297
298 eval {
299
300 $in_fh = IO::File->new("< $STATE");
301 die "Could not open file $STATE: $!\n" if !$in_fh;
302 lock($in_fh);
303 $text = <$in_fh>;
304 };
305
306 my $out_fh = IO::File->new("> $STATE.new");
307 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
308
309 my $states = {};
310 my $state = {};
311 if ($text){
312 $states = decode_json($text);
313 $state = $states->{$job->{source}}->{$job->{name}};
314 }
315
316 if ($job->{state} ne "del") {
317 $state->{state} = $job->{state};
318 $state->{lsync} = $job->{lsync};
319 $state->{vm_type} = $job->{vm_type};
320
321 for (my $i = 0; $job->{"snap$i"} ; $i++) {
322 $state->{"snap$i"} = $job->{"snap$i"};
323 }
324 $states->{$job->{source}}->{$job->{name}} = $state;
325 } else {
326
327 delete $states->{$job->{source}}->{$job->{name}};
328 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
329 }
330
331 $text = encode_json($states);
332 print $out_fh $text;
333
334 close($out_fh);
335 move("$STATE.new", $STATE);
336 eval {
337 close($in_fh);
338 };
339
340 return $states;
341 }
342
343 sub update_cron {
344 my ($job) = @_;
345
346 my $updated;
347 my $has_header;
348 my $line_no = 0;
349 my $text = "";
350 my $header = "SHELL=/bin/sh\n";
351 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
352
353 my $fh = IO::File->new("< $CRONJOBS");
354 die "Could not open file $CRONJOBS: $!\n" if !$fh;
355 lock($fh);
356
357 my @test = <$fh>;
358
359 while (my $line = shift(@test)) {
360 chomp($line);
361 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
362 $updated = 1;
363 next if $job->{state} eq "del";
364 $text .= format_job($job, $line);
365 } else {
366 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
367 $has_header = 1;
368 }
369 $text .= "$line\n";
370 }
371 $line_no++;
372 }
373
374 if (!$has_header) {
375 $text = "$header$text";
376 }
377
378 if (!$updated) {
379 $text .= format_job($job);
380 }
381 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
382 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
383
384 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
385 close ($new_fh);
386
387 die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
388 close ($fh);
389 }
390
391 sub format_job {
392 my ($job, $line) = @_;
393 my $text = "";
394
395 if ($job->{state} eq "stopped") {
396 $text = "#";
397 }
398 if ($line) {
399 $line =~ /^#*(.+) root/;
400 $text .= $1;
401 } else {
402 $text .= "*/$INTERVAL * * * *";
403 }
404 $text .= " root";
405 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
406 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
407 $text .= " --limit $job->{limit}" if $job->{limit};
408 $text .= " --method $job->{method}";
409 $text .= " --verbose" if $job->{verbose};
410 $text .= "\n";
411
412 return $text;
413 }
414
415 sub list {
416
417 my $cfg = read_cron();
418
419 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
420
421 my $states = read_state();
422 foreach my $source (sort keys%{$cfg}) {
423 foreach my $name (sort keys%{$cfg->{$source}}) {
424 $list .= sprintf("%-25s", cut_target_width($source, 25));
425 $list .= sprintf("%-25s", cut_target_width($name, 25));
426 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
427 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
428 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
429 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
430 }
431 }
432
433 return $list;
434 }
435
436 sub vm_exists {
437 my ($target) = @_;
438
439 my @cmd = ('ssh', "root\@$target->{ip}", '--') if $target->{ip};
440
441 my $res = undef;
442
443 return undef if !defined($target->{vmid});
444
445 eval { $res = run_cmd([@cmd, 'ls', "$QEMU_CONF/$target->{vmid}.conf"]) };
446
447 return "qemu" if $res;
448
449 eval { $res = run_cmd([@cmd, 'ls', "$LXC_CONF/$target->{vmid}.conf"]) };
450
451 return "lxc" if $res;
452
453 return undef;
454 }
455
456 sub init {
457 my ($param) = @_;
458
459 my $cfg = read_cron();
460
461 my $job = param_to_job($param);
462
463 $job->{state} = "ok";
464 $job->{lsync} = 0;
465
466 my $source = parse_target($param->{source});
467 my $dest = parse_target($param->{dest});
468
469 if (my $ip = $dest->{ip}) {
470 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
471 }
472
473 if (my $ip = $source->{ip}) {
474 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
475 }
476
477 die "Pool $dest->{all} does not exists\n" if check_pool_exists($dest);
478
479 my $check = check_pool_exists($source->{path}, $source->{ip}) if !$source->{vmid} && $source->{path};
480
481 die "Pool $source->{path} does not exists\n" if undef($check);
482
483 my $vm_type = vm_exists($source);
484 $job->{vm_type} = $vm_type;
485 $source->{vm_type} = $vm_type;
486
487 die "VM $source->{vmid} doesn't exist\n" if $param->{vmid} && !$vm_type;
488
489 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
490
491 #check if vm has zfs disks if not die;
492 get_disks($source, 1) if $source->{vmid};
493
494 update_cron($job);
495 update_state($job);
496
497 eval {
498 sync($param) if !$param->{skip};
499 };
500 if(my $err = $@) {
501 destroy_job($param);
502 print $err;
503 }
504 }
505
506 sub get_job {
507 my ($param) = @_;
508
509 my $cfg = read_cron();
510
511 if (!$cfg->{$param->{source}}->{$param->{name}}) {
512 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
513 }
514 my $job = $cfg->{$param->{source}}->{$param->{name}};
515 $job->{name} = $param->{name};
516 $job->{source} = $param->{source};
517 $job = add_state_to_job($job);
518
519 return $job;
520 }
521
522 sub destroy_job {
523 my ($param) = @_;
524
525 my $job = get_job($param);
526 $job->{state} = "del";
527
528 update_cron($job);
529 update_state($job);
530 }
531
532 sub sync {
533 my ($param) = @_;
534
535 my $lock_fh = IO::File->new("> $LOCKFILE");
536 die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
537 lock($lock_fh);
538
539 my $date = get_date();
540 my $job;
541 eval {
542 $job = get_job($param);
543 };
544
545 if ($job && $job->{state} eq "syncing") {
546 die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
547 }
548
549 my $dest = parse_target($param->{dest});
550 my $source = parse_target($param->{source});
551
552 my $sync_path = sub {
553 my ($source, $dest, $job, $param, $date) = @_;
554
555 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name});
556
557 snapshot_add($source, $dest, $param->{name}, $date);
558
559 send_image($source, $dest, $param);
560
561 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}) if ($source->{destroy} && $source->{old_snap});
562
563 };
564
565 my $vm_type = vm_exists($source);
566 $source->{vm_type} = $vm_type;
567
568 if ($job) {
569 $job->{state} = "syncing";
570 $job->{vm_type} = $vm_type if !$job->{vm_type};
571 update_state($job);
572 }
573
574 eval{
575 if ($source->{vmid}) {
576 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
577 my $disks = get_disks($source);
578
579 foreach my $disk (sort keys %{$disks}) {
580 $source->{all} = $disks->{$disk}->{all};
581 $source->{pool} = $disks->{$disk}->{pool};
582 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
583 $source->{last_part} = $disks->{$disk}->{last_part};
584 &$sync_path($source, $dest, $job, $param, $date);
585 }
586 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
587 send_config($source, $dest,'ssh');
588 } else {
589 send_config($source, $dest,'local');
590 }
591 } else {
592 &$sync_path($source, $dest, $job, $param, $date);
593 }
594 };
595 if(my $err = $@) {
596 if ($job) {
597 $job->{state} = "error";
598 update_state($job);
599 unlock($lock_fh);
600 close($lock_fh);
601 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
602 }
603 die "$err\n";
604 }
605
606 if ($job) {
607 $job->{state} = "ok";
608 $job->{lsync} = $date;
609 update_state($job);
610 }
611
612 unlock($lock_fh);
613 close($lock_fh);
614 }
615
616 sub snapshot_get{
617 my ($source, $dest, $max_snap, $name) = @_;
618
619 my $cmd = [];
620 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
621 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
622 push @$cmd, $source->{all};
623
624 my $raw = run_cmd($cmd);
625 my $index = 0;
626 my $line = "";
627 my $last_snap = undef;
628 my $old_snap;
629
630 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
631 $line = $1;
632 if ($line =~ m/(rep_$name.*)$/) {
633
634 $last_snap = $1 if (!$last_snap);
635 $old_snap = $1;
636 $index++;
637 if ($index == $max_snap) {
638 $source->{destroy} = 1;
639 last;
640 };
641 }
642 }
643
644 return ($old_snap, $last_snap) if $last_snap;
645
646 return undef;
647 }
648
649 sub snapshot_add {
650 my ($source, $dest, $name, $date) = @_;
651
652 my $snap_name = "rep_$name\_".$date;
653
654 $source->{new_snap} = $snap_name;
655
656 my $path = "$source->{all}\@$snap_name";
657
658 my $cmd = [];
659 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
660 push @$cmd, 'zfs', 'snapshot', $path;
661 eval{
662 run_cmd($cmd);
663 };
664
665 if (my $err = $@) {
666 snapshot_destroy($source, $dest, 'ssh', $snap_name);
667 die "$err\n";
668 }
669 }
670
671 sub write_cron {
672 my ($cfg) = @_;
673
674 my $text = "SHELL=/bin/sh\n";
675 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
676
677 my $fh = IO::File->new("> $CRONJOBS");
678 die "Could not open file: $!\n" if !$fh;
679
680 foreach my $source (sort keys%{$cfg}) {
681 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
682 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
683 $text .= "$PROG_PATH sync";
684 $text .= " -source ";
685 if ($cfg->{$source}->{$sync_name}->{vmid}) {
686 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
687 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
688 } else {
689 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
690 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
691 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
692 }
693 $text .= " -dest ";
694 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
695 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
696 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
697 $text .= " -name $sync_name ";
698 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
699 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
700 $text .= "\n";
701 }
702 }
703 die "Can't write to cron\n" if (!print($fh $text));
704 close($fh);
705 }
706
707 sub get_disks {
708 my ($target, $get_err) = @_;
709
710 my $cmd = [];
711 push @$cmd, 'ssh', "root\@$target->{ip}", '--', if $target->{ip};
712
713 if ($target->{vm_type} eq 'qemu') {
714 push @$cmd, 'qm', 'config', $target->{vmid};
715 } elsif ($target->{vm_type} eq 'lxc') {
716 push @$cmd, 'pct', 'config', $target->{vmid};
717 } else {
718 die "VM Type unknown\n";
719 }
720
721 my $res = run_cmd($cmd);
722
723 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $get_err);
724
725 return $disks;
726 }
727
728 sub run_cmd {
729 my ($cmd) = @_;
730 print "Start CMD\n" if $DEBUG;
731 print Dumper $cmd if $DEBUG;
732 if (ref($cmd) eq 'ARRAY') {
733 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
734 }
735 my $output = `$cmd 2>&1`;
736
737 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
738
739 chomp($output);
740 print Dumper $output if $DEBUG;
741 print "END CMD\n" if $DEBUG;
742 return $output;
743 }
744
745 sub parse_disks {
746 my ($text, $ip, $vm_type, $get_err) = @_;
747
748 my $disks;
749
750 my $num = 0;
751 while ($text && $text =~ s/^(.*?)(\n|$)//) {
752 my $line = $1;
753 my $error = $vm_type eq 'qemu' ? 1 : 0 ;
754
755 next if $line =~ /cdrom|none/;
756 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
757
758 #QEMU if backup is not set include in sync
759 next if $vm_type eq 'qemu && ($line =~ m/backup=(?i:0|no|off|false)/)';
760
761 #LXC if backup is not set do no in sync
762 $error = ($line =~ m/backup=(?i:1|yes|on|true)/) if $vm_type eq 'lxc';
763
764 my $disk = undef;
765 my $stor = undef;
766 if($line =~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): ([^:]+:)([A-Za-z0-9\-]+),(.*)$/) {
767 $disk = $3;
768 $stor = $2;
769 } else {
770 print "Disk: \"$line\" will not include in pve-sync\n" if $get_err || $error;
771 next;
772 }
773
774 my $cmd = [];
775 push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
776 push @$cmd, 'pvesm', 'path', "$stor$disk";
777 my $path = run_cmd($cmd);
778
779 die "Get no path from pvesm path $stor$disk\n" if !$path;
780
781 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
782
783 my @array = split('/', $1);
784 $disks->{$num}->{pool} = shift(@array);
785 $disks->{$num}->{all} = $disks->{$num}->{pool};
786 if (0 < @array) {
787 $disks->{$num}->{path} = join('/', @array);
788 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
789 }
790 $disks->{$num}->{last_part} = $disk;
791 $disks->{$num}->{all} .= "\/$disk";
792
793 $num++;
794 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
795
796 $disks->{$num}->{pool} = $1;
797 $disks->{$num}->{all} = $disks->{$num}->{pool};
798
799 if ($2) {
800 $disks->{$num}->{path} = $3;
801 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
802 }
803
804 $disks->{$num}->{last_part} = $disk;
805 $disks->{$num}->{all} .= "\/$disk";
806
807 $num++;
808
809 } else {
810 die "ERROR: in path\n";
811 }
812 }
813
814 die "Vm include no disk on zfs.\n" if !$disks->{0};
815 return $disks;
816 }
817
818 sub snapshot_destroy {
819 my ($source, $dest, $method, $snap) = @_;
820
821 my @zfscmd = ('zfs', 'destroy');
822 my $snapshot = "$source->{all}\@$snap";
823
824 eval {
825 if($source->{ip} && $method eq 'ssh'){
826 run_cmd(['ssh', "root\@$source->{ip}", '--', @zfscmd, $snapshot]);
827 } else {
828 run_cmd([@zfscmd, $snapshot]);
829 }
830 };
831 if (my $erro = $@) {
832 warn "WARN: $erro";
833 }
834 if ($dest) {
835 my @ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
836
837 my $path = "$dest->{all}\/$source->{last_part}";
838
839 eval {
840 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
841 };
842 if (my $erro = $@) {
843 warn "WARN: $erro";
844 }
845 }
846 }
847
848 sub snapshot_exist {
849 my ($source , $dest, $method) = @_;
850
851 my $cmd = [];
852 push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
853 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
854 push @$cmd, "$dest->{all}/$source->{last_part}\@$source->{old_snap}";
855
856 my $text = "";
857 eval {$text =run_cmd($cmd);};
858 if (my $erro =$@) {
859 warn "WARN: $erro";
860 return undef;
861 }
862
863 while ($text && $text =~ s/^(.*?)(\n|$)//) {
864 my $line =$1;
865 return 1 if $line =~ m/^.*$source->{old_snap}$/;
866 }
867 }
868
869 sub send_image {
870 my ($source, $dest, $param) = @_;
871
872 my $cmd = [];
873
874 push @$cmd, 'ssh', "root\@$source->{ip}", '--' if $source->{ip};
875 push @$cmd, 'zfs', 'send';
876 push @$cmd, '-v' if $param->{verbose};
877
878 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
879 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
880 }
881 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
882
883 if ($param->{limit}){
884 my $bwl = $param->{limit}*1024;
885 push @$cmd, \'|', 'cstream', '-t', $bwl;
886 }
887 my $target = "$dest->{all}/$source->{last_part}";
888 $target =~ s!/+!/!g;
889
890 push @$cmd, \'|';
891 push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
892 push @$cmd, 'zfs', 'recv', '-F', '--';
893 push @$cmd, "$target";
894
895 eval {
896 run_cmd($cmd)
897 };
898
899 if (my $erro = $@) {
900 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
901 die $erro;
902 };
903 }
904
905
906 sub send_config{
907 my ($source, $dest, $method) = @_;
908
909 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
910 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
911
912 my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
913
914 $dest_target_new = $config_dir.'/'.$dest_target_new;
915
916 if ($method eq 'ssh'){
917 if ($dest->{ip} && $source->{ip}) {
918 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
919 run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
920 } elsif ($dest->{ip}) {
921 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
922 run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
923 } elsif ($source->{ip}) {
924 run_cmd(['mkdir', '-p', '--', $config_dir]);
925 run_cmd(['scp', '--', "root\@$source->{ip}:$source_target", $dest_target_new]);
926 }
927
928 if ($source->{destroy}){
929 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
930 if($dest->{ip}){
931 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
932 } else {
933 run_cmd(['rm', '-f', '--', $dest_target_old]);
934 }
935 }
936 } elsif ($method eq 'local') {
937 run_cmd(['mkdir', '-p', '--', $config_dir]);
938 run_cmd(['cp', $source_target, $dest_target_new]);
939 }
940 }
941
942 sub get_date {
943 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
944 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
945
946 return $datestamp;
947 }
948
949 sub status {
950 my $cfg = read_cron();
951
952 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
953
954 my $states = read_state();
955
956 foreach my $source (sort keys%{$cfg}) {
957 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
958 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
959 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
960 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
961 }
962 }
963
964 return $status_list;
965 }
966
967 sub enable_job {
968 my ($param) = @_;
969
970 my $job = get_job($param);
971 $job->{state} = "ok";
972 update_state($job);
973 update_cron($job);
974 }
975
976 sub disable_job {
977 my ($param) = @_;
978
979 my $job = get_job($param);
980 $job->{state} = "stopped";
981 update_state($job);
982 update_cron($job);
983 }
984
985 my $command = $ARGV[0];
986
987 my $commands = {'destroy' => 1,
988 'create' => 1,
989 'sync' => 1,
990 'list' => 1,
991 'status' => 1,
992 'help' => 1,
993 'enable' => 1,
994 'disable' => 1};
995
996 if (!$command || !$commands->{$command}) {
997 usage();
998 die "\n";
999 }
1000
1001 my $help_sync = "$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1002 \twill sync one time\n
1003 \t-dest\tstring\n
1004 \t\tthe destination target is like [IP:]<Pool>[/Path]\n
1005 \t-limit\tinteger\n
1006 \t\tmax sync speed in kBytes/s, default unlimited\n
1007 \t-maxsnap\tinteger\n
1008 \t\thow much snapshots will be kept before get erased, default 1/n
1009 \t-name\tstring\n
1010 \t\tname of the sync job, if not set it is default.
1011 \tIt is only necessary if scheduler allready contains this source.\n
1012 \t-source\tstring\n
1013 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1014
1015 my $help_create = "$PROGNAME create -dest <string> -source <string> [OPTIONS]/n
1016 \tCreate a sync Job\n
1017 \t-dest\tstring\n
1018 \t\tthe destination target is like [IP]:<Pool>[/Path]\n
1019 \t-limit\tinteger\n
1020 \t\tmax sync speed in kBytes/s, default unlimited\n
1021 \t-maxsnap\tstring\n
1022 \t\thow much snapshots will be kept before get erased, default 1\n
1023 \t-name\tstring\n
1024 \t\tname of the sync job, if not set it is default\n
1025 \t-skip\tboolean\n
1026 \t\tif this flag is set it will skip the first sync\n
1027 \t-source\tstring\n
1028 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1029
1030 my $help_destroy = "$PROGNAME destroy -source <string> [OPTIONS]\n
1031 \tremove a sync Job from the scheduler\n
1032 \t-name\tstring\n
1033 \t\tname of the sync job, if not set it is default\n
1034 \t-source\tstring\n
1035 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1036
1037 my $help_help = "$PROGNAME help <cmd> [OPTIONS]\n
1038 \tGet help about specified command.\n
1039 \t<cmd>\tstring\n
1040 \t\tCommand name\n
1041 \t-verbose\tboolean\n
1042 \t\tVerbose output format.\n";
1043
1044 my $help_list = "$PROGNAME list\n
1045 \tGet a List of all scheduled Sync Jobs\n";
1046
1047 my $help_status = "$PROGNAME status\n
1048 \tGet the status of all scheduled Sync Jobs\n";
1049
1050 my $help_enable = "$PROGNAME enable -source <string> [OPTIONS]\n
1051 \tenable a syncjob and reset error\n
1052 \t-name\tstring\n
1053 \t\tname of the sync job, if not set it is default\n
1054 \t-source\tstring\n
1055 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1056
1057 my $help_disable = "$PROGNAME disable -source <string> [OPTIONS]\n
1058 \tpause a syncjob\n
1059 \t-name\tstring\n
1060 \t\tname of the sync job, if not set it is default\n
1061 \t-source\tstring\n
1062 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1063
1064 sub help {
1065 my ($command) = @_;
1066
1067 switch($command){
1068 case 'help'
1069 {
1070 die "$help_help\n";
1071 }
1072 case 'sync'
1073 {
1074 die "$help_sync\n";
1075 }
1076 case 'destroy'
1077 {
1078 die "$help_destroy\n";
1079 }
1080 case 'create'
1081 {
1082 die "$help_create\n";
1083 }
1084 case 'list'
1085 {
1086 die "$help_list\n";
1087 }
1088 case 'status'
1089 {
1090 die "$help_status\n";
1091 }
1092 case 'enable'
1093 {
1094 die "$help_enable\n";
1095 }
1096 case 'disable'
1097 {
1098 die "$help_enable\n";
1099 }
1100 }
1101
1102 }
1103
1104 my @arg = @ARGV;
1105 my $param = parse_argv(@arg);
1106
1107
1108 switch($command) {
1109 case "destroy"
1110 {
1111 die "$help_destroy\n" if !$param->{source};
1112 check_target($param->{source});
1113 destroy_job($param);
1114 }
1115 case "sync"
1116 {
1117 die "$help_sync\n" if !$param->{source} || !$param->{dest};
1118 check_target($param->{source});
1119 check_target($param->{dest});
1120 sync($param);
1121 }
1122 case "create"
1123 {
1124 die "$help_create\n" if !$param->{source} || !$param->{dest};
1125 check_target($param->{source});
1126 check_target($param->{dest});
1127 init($param);
1128 }
1129 case "status"
1130 {
1131 print status();
1132 }
1133 case "list"
1134 {
1135 print list();
1136 }
1137 case "help"
1138 {
1139 my $help_command = $ARGV[1];
1140 if ($help_command && $commands->{$help_command}) {
1141 print help($help_command);
1142 }
1143 if ($param->{verbose} == 1){
1144 exec("man $PROGNAME");
1145 } else {
1146 usage(1);
1147 }
1148 }
1149 case "enable"
1150 {
1151 die "$help_enable\n" if !$param->{source};
1152 check_target($param->{source});
1153 enable_job($param);
1154 }
1155 case "disable"
1156 {
1157 die "$help_disable\n" if !$param->{source};
1158 check_target($param->{source});
1159 disable_job($param);
1160 }
1161 }
1162
1163 sub usage {
1164 my ($help) = @_;
1165
1166 print("ERROR:\tno command specified\n") if !$help;
1167 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1168 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1169 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1170 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1171 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1172 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1173 print("\t$PROGNAME list\n");
1174 print("\t$PROGNAME status\n");
1175 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1176 }
1177
1178 sub check_target {
1179 my ($target) = @_;
1180 parse_target($target);
1181 }
1182
1183 __END__
1184
1185 =head1 NAME
1186
1187 pve-zsync - PVE ZFS Replication Manager
1188
1189 =head1 SYNOPSIS
1190
1191 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1192
1193 pve-zsync help <cmd> [OPTIONS]
1194
1195 Get help about specified command.
1196
1197 <cmd> string
1198
1199 Command name
1200
1201 -verbose boolean
1202
1203 Verbose output format.
1204
1205 pve-zsync create -dest <string> -source <string> [OPTIONS]
1206
1207 Create a sync Job
1208
1209 -dest string
1210
1211 the destination target is like [IP]:<Pool>[/Path]
1212
1213 -limit integer
1214
1215 max sync speed in kBytes/s, default unlimited
1216
1217 -maxsnap string
1218
1219 how much snapshots will be kept before get erased, default 1
1220
1221 -name string
1222
1223 name of the sync job, if not set it is default
1224
1225 -skip boolean
1226
1227 if this flag is set it will skip the first sync
1228
1229 -source string
1230
1231 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1232
1233 pve-zsync destroy -source <string> [OPTIONS]
1234
1235 remove a sync Job from the scheduler
1236
1237 -name string
1238
1239 name of the sync job, if not set it is default
1240
1241 -source string
1242
1243 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1244
1245 pve-zsync disable -source <string> [OPTIONS]
1246
1247 pause a sync job
1248
1249 -name string
1250
1251 name of the sync job, if not set it is default
1252
1253 -source string
1254
1255 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1256
1257 pve-zsync enable -source <string> [OPTIONS]
1258
1259 enable a syncjob and reset error
1260
1261 -name string
1262
1263 name of the sync job, if not set it is default
1264
1265 -source string
1266
1267 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1268 pve-zsync list
1269
1270 Get a List of all scheduled Sync Jobs
1271
1272 pve-zsync status
1273
1274 Get the status of all scheduled Sync Jobs
1275
1276 pve-zsync sync -dest <string> -source <string> [OPTIONS]
1277
1278 will sync one time
1279
1280 -dest string
1281
1282 the destination target is like [IP:]<Pool>[/Path]
1283
1284 -limit integer
1285
1286 max sync speed in kBytes/s, default unlimited
1287
1288 -maxsnap integer
1289
1290 how much snapshots will be kept before get erased, default 1
1291
1292 -name string
1293
1294 name of the sync job, if not set it is default.
1295 It is only necessary if scheduler allready contains this source.
1296
1297 -source string
1298
1299 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1300
1301 =head1 DESCRIPTION
1302
1303 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1304 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1305 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1306 To config cron see man crontab.
1307
1308 =head2 PVE ZFS Storage sync Tool
1309
1310 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1311
1312 =head1 EXAMPLES
1313
1314 add sync job from local VM to remote ZFS Server
1315 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1316
1317 =head1 IMPORTANT FILES
1318
1319 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1320
1321 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1322
1323 =head1 COPYRIGHT AND DISCLAIMER
1324
1325 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1326
1327 This program is free software: you can redistribute it and/or modify it
1328 under the terms of the GNU Affero General Public License as published
1329 by the Free Software Foundation, either version 3 of the License, or
1330 (at your option) any later version.
1331
1332 This program is distributed in the hope that it will be useful, but
1333 WITHOUT ANY WARRANTY; without even the implied warranty of
1334 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1335 Affero General Public License for more details.
1336
1337 You should have received a copy of the GNU Affero General Public
1338 License along with this program. If not, see
1339 <http://www.gnu.org/licenses/>.