]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
fix typo
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use Switch;
11 use JSON;
12 use IO::File;
13 use String::ShellQuote 'shell_quote';
14
15 my $PROGNAME = "pve-zsync";
16 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
17 my $STATE = "${CONFIG_PATH}/sync_state";
18 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
19 my $PATH = "/usr/sbin";
20 my $PVE_DIR = "/etc/pve/local";
21 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
22 my $LXC_CONF = "${PVE_DIR}/lxc";
23 my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
24 my $PROG_PATH = "$PATH/${PROGNAME}";
25 my $INTERVAL = 15;
26 my $DEBUG = 0;
27
28 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
29 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
30 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
31 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
32
33 my $IPV6RE = "(?:" .
34 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
35 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
36 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
37 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
38 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
39 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
40 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
41 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
42 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
43
44 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
45 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
46 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
47 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
48 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
49
50 check_bin ('cstream');
51 check_bin ('zfs');
52 check_bin ('ssh');
53 check_bin ('scp');
54
55 sub check_bin {
56 my ($bin) = @_;
57
58 foreach my $p (split (/:/, $ENV{PATH})) {
59 my $fn = "$p/$bin";
60 if (-x $fn) {
61 return $fn;
62 }
63 }
64
65 die "unable to find command '$bin'\n";
66 }
67
68 sub cut_target_width {
69 my ($target, $max) = @_;
70
71 return $target if (length($target) <= $max);
72 my @spl = split('/', $target);
73
74 my $count = length($spl[@spl-1]);
75 return "..\/".substr($spl[@spl-1],($count-$max)+3 , $count) if $count > $max;
76
77 $count += length($spl[0]) if @spl > 1;
78 return substr($spl[0], 0, $max-4-length($spl[@spl-1]))."\/..\/".$spl[@spl-1] if $count > $max;
79
80 my $rest = 1;
81 $rest = $max-$count if ($max-$count > 0);
82
83 return "$spl[0]".substr($target, length($spl[0]), $rest)."..\/".$spl[@spl-1];
84 }
85
86 sub lock {
87 my ($fh) = @_;
88 flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
89 }
90
91 sub unlock {
92 my ($fh) = @_;
93 flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
94 }
95
96 sub get_status {
97 my ($source, $name, $status) = @_;
98
99 if ($status->{$source->{all}}->{$name}->{status}) {
100 return $status;
101 }
102
103 return undef;
104 }
105
106 sub check_pool_exists {
107 my ($target) = @_;
108
109 my $cmd = [];
110
111 if ($target->{ip}) {
112 push @$cmd, 'ssh', "root\@$target->{ip}", '--';
113 }
114 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
115 eval {
116 run_cmd($cmd);
117 };
118
119 if ($@) {
120 return 0;
121 }
122 return 1;
123 }
124
125 sub parse_target {
126 my ($text) = @_;
127
128 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
129 my $target = {};
130
131 if ($text !~ $TARGETRE) {
132 die "$errstr\n";
133 }
134 $target->{all} = $2;
135 $target->{ip} = $1 if $1;
136 my @parts = split('/', $2);
137
138 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
139
140 my $pool = $target->{pool} = shift(@parts);
141 die "$errstr\n" if !$pool;
142
143 if ($pool =~ m/^\d+$/) {
144 $target->{vmid} = $pool;
145 delete $target->{pool};
146 }
147
148 return $target if (@parts == 0);
149 $target->{last_part} = pop(@parts);
150
151 if ($target->{ip}) {
152 pop(@parts);
153 }
154 if (@parts > 0) {
155 $target->{path} = join('/', @parts);
156 }
157
158 return $target;
159 }
160
161 sub read_cron {
162
163 #This is for the first use to init file;
164 if (!-e $CRONJOBS) {
165 my $new_fh = IO::File->new("> $CRONJOBS");
166 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
167 close($new_fh);
168 return undef;
169 }
170
171 my $fh = IO::File->new("< $CRONJOBS");
172 die "Could not open file $CRONJOBS: $!\n" if !$fh;
173
174 my @text = <$fh>;
175
176 close($fh);
177
178 return encode_cron(@text);
179 }
180
181 sub parse_argv {
182 my (@arg) = @_;
183
184 my $param = {};
185 $param->{dest} = undef;
186 $param->{source} = undef;
187 $param->{verbose} = undef;
188 $param->{limit} = undef;
189 $param->{maxsnap} = undef;
190 $param->{name} = undef;
191 $param->{skip} = undef;
192 $param->{method} = undef;
193
194 my ($ret, $ar) = GetOptionsFromArray(\@arg,
195 'dest=s' => \$param->{dest},
196 'source=s' => \$param->{source},
197 'verbose' => \$param->{verbose},
198 'limit=i' => \$param->{limit},
199 'maxsnap=i' => \$param->{maxsnap},
200 'name=s' => \$param->{name},
201 'skip' => \$param->{skip},
202 'method=s' => \$param->{method});
203
204 if ($ret == 0) {
205 die "can't parse options\n";
206 }
207
208 $param->{name} = "default" if !$param->{name};
209 $param->{maxsnap} = 1 if !$param->{maxsnap};
210 $param->{method} = "ssh" if !$param->{method};
211
212 return $param;
213 }
214
215 sub add_state_to_job {
216 my ($job) = @_;
217
218 my $states = read_state();
219 my $state = $states->{$job->{source}}->{$job->{name}};
220
221 $job->{state} = $state->{state};
222 $job->{lsync} = $state->{lsync};
223 $job->{vm_type} = $state->{vm_type};
224
225 for (my $i = 0; $state->{"snap$i"}; $i++) {
226 $job->{"snap$i"} = $state->{"snap$i"};
227 }
228
229 return $job;
230 }
231
232 sub encode_cron {
233 my (@text) = @_;
234
235 my $cfg = {};
236
237 while (my $line = shift(@text)) {
238
239 my @arg = split('\s', $line);
240 my $param = parse_argv(@arg);
241
242 if ($param->{source} && $param->{dest}) {
243 $cfg->{$param->{source}}->{$param->{name}}->{dest} = $param->{dest};
244 $cfg->{$param->{source}}->{$param->{name}}->{verbose} = $param->{verbose};
245 $cfg->{$param->{source}}->{$param->{name}}->{limit} = $param->{limit};
246 $cfg->{$param->{source}}->{$param->{name}}->{maxsnap} = $param->{maxsnap};
247 $cfg->{$param->{source}}->{$param->{name}}->{skip} = $param->{skip};
248 $cfg->{$param->{source}}->{$param->{name}}->{method} = $param->{method};
249 }
250 }
251
252 return $cfg;
253 }
254
255 sub param_to_job {
256 my ($param) = @_;
257
258 my $job = {};
259
260 my $source = parse_target($param->{source});
261 my $dest = parse_target($param->{dest}) if $param->{dest};
262
263 $job->{name} = !$param->{name} ? "default" : $param->{name};
264 $job->{dest} = $param->{dest} if $param->{dest};
265 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
266 $job->{method} = "ssh" if !$job->{method};
267 $job->{limit} = $param->{limit};
268 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
269 $job->{source} = $param->{source};
270
271 return $job;
272 }
273
274 sub read_state {
275
276 if (!-e $STATE) {
277 make_path $CONFIG_PATH;
278 my $new_fh = IO::File->new("> $STATE");
279 die "Could not create $STATE: $!\n" if !$new_fh;
280 print $new_fh "{}";
281 close($new_fh);
282 return undef;
283 }
284
285 my $fh = IO::File->new("< $STATE");
286 die "Could not open file $STATE: $!\n" if !$fh;
287
288 my $text = <$fh>;
289 my $states = decode_json($text);
290
291 close($fh);
292
293 return $states;
294 }
295
296 sub update_state {
297 my ($job) = @_;
298 my $text;
299 my $in_fh;
300
301 eval {
302
303 $in_fh = IO::File->new("< $STATE");
304 die "Could not open file $STATE: $!\n" if !$in_fh;
305 lock($in_fh);
306 $text = <$in_fh>;
307 };
308
309 my $out_fh = IO::File->new("> $STATE.new");
310 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
311
312 my $states = {};
313 my $state = {};
314 if ($text){
315 $states = decode_json($text);
316 $state = $states->{$job->{source}}->{$job->{name}};
317 }
318
319 if ($job->{state} ne "del") {
320 $state->{state} = $job->{state};
321 $state->{lsync} = $job->{lsync};
322 $state->{vm_type} = $job->{vm_type};
323
324 for (my $i = 0; $job->{"snap$i"} ; $i++) {
325 $state->{"snap$i"} = $job->{"snap$i"};
326 }
327 $states->{$job->{source}}->{$job->{name}} = $state;
328 } else {
329
330 delete $states->{$job->{source}}->{$job->{name}};
331 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
332 }
333
334 $text = encode_json($states);
335 print $out_fh $text;
336
337 close($out_fh);
338 move("$STATE.new", $STATE);
339 eval {
340 close($in_fh);
341 };
342
343 return $states;
344 }
345
346 sub update_cron {
347 my ($job) = @_;
348
349 my $updated;
350 my $has_header;
351 my $line_no = 0;
352 my $text = "";
353 my $header = "SHELL=/bin/sh\n";
354 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
355
356 my $fh = IO::File->new("< $CRONJOBS");
357 die "Could not open file $CRONJOBS: $!\n" if !$fh;
358 lock($fh);
359
360 my @test = <$fh>;
361
362 while (my $line = shift(@test)) {
363 chomp($line);
364 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
365 $updated = 1;
366 next if $job->{state} eq "del";
367 $text .= format_job($job, $line);
368 } else {
369 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
370 $has_header = 1;
371 }
372 $text .= "$line\n";
373 }
374 $line_no++;
375 }
376
377 if (!$has_header) {
378 $text = "$header$text";
379 }
380
381 if (!$updated) {
382 $text .= format_job($job);
383 }
384 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
385 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
386
387 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
388 close ($new_fh);
389
390 die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
391 close ($fh);
392 }
393
394 sub format_job {
395 my ($job, $line) = @_;
396 my $text = "";
397
398 if ($job->{state} eq "stopped") {
399 $text = "#";
400 }
401 if ($line) {
402 $line =~ /^#*(.+) root/;
403 $text .= $1;
404 } else {
405 $text .= "*/$INTERVAL * * * *";
406 }
407 $text .= " root";
408 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
409 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
410 $text .= " --limit $job->{limit}" if $job->{limit};
411 $text .= " --method $job->{method}";
412 $text .= " --verbose" if $job->{verbose};
413 $text .= "\n";
414
415 return $text;
416 }
417
418 sub list {
419
420 my $cfg = read_cron();
421
422 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
423
424 my $states = read_state();
425 foreach my $source (sort keys%{$cfg}) {
426 foreach my $name (sort keys%{$cfg->{$source}}) {
427 $list .= sprintf("%-25s", cut_target_width($source, 25));
428 $list .= sprintf("%-25s", cut_target_width($name, 25));
429 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
430 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
431 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
432 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
433 }
434 }
435
436 return $list;
437 }
438
439 sub vm_exists {
440 my ($target) = @_;
441
442 my @cmd = ('ssh', "root\@$target->{ip}", '--') if $target->{ip};
443
444 my $res = undef;
445
446 return undef if !defined($target->{vmid});
447
448 eval { $res = run_cmd([@cmd, 'ls', "$QEMU_CONF/$target->{vmid}.conf"]) };
449
450 return "qemu" if $res;
451
452 eval { $res = run_cmd([@cmd, 'ls', "$LXC_CONF/$target->{vmid}.conf"]) };
453
454 return "lxc" if $res;
455
456 return undef;
457 }
458
459 sub init {
460 my ($param) = @_;
461
462 my $cfg = read_cron();
463
464 my $job = param_to_job($param);
465
466 $job->{state} = "ok";
467 $job->{lsync} = 0;
468
469 my $source = parse_target($param->{source});
470 my $dest = parse_target($param->{dest});
471
472 if (my $ip = $dest->{ip}) {
473 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
474 }
475
476 if (my $ip = $source->{ip}) {
477 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
478 }
479
480 die "Pool $dest->{all} does not exists\n" if check_pool_exists($dest);
481
482 my $check = check_pool_exists($source->{path}, $source->{ip}) if !$source->{vmid} && $source->{path};
483
484 die "Pool $source->{path} does not exists\n" if undef($check);
485
486 my $vm_type = vm_exists($source);
487 $job->{vm_type} = $vm_type;
488 $source->{vm_type} = $vm_type;
489
490 die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
491
492 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
493
494 #check if vm has zfs disks if not die;
495 get_disks($source, 1) if $source->{vmid};
496
497 update_cron($job);
498 update_state($job);
499
500 eval {
501 sync($param) if !$param->{skip};
502 };
503 if(my $err = $@) {
504 destroy_job($param);
505 print $err;
506 }
507 }
508
509 sub get_job {
510 my ($param) = @_;
511
512 my $cfg = read_cron();
513
514 if (!$cfg->{$param->{source}}->{$param->{name}}) {
515 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
516 }
517 my $job = $cfg->{$param->{source}}->{$param->{name}};
518 $job->{name} = $param->{name};
519 $job->{source} = $param->{source};
520 $job = add_state_to_job($job);
521
522 return $job;
523 }
524
525 sub destroy_job {
526 my ($param) = @_;
527
528 my $job = get_job($param);
529 $job->{state} = "del";
530
531 update_cron($job);
532 update_state($job);
533 }
534
535 sub sync {
536 my ($param) = @_;
537
538 my $lock_fh = IO::File->new("> $LOCKFILE");
539 die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
540 lock($lock_fh);
541
542 my $date = get_date();
543 my $job;
544 eval {
545 $job = get_job($param);
546 };
547
548 if ($job && $job->{state} eq "syncing") {
549 die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
550 }
551
552 my $dest = parse_target($param->{dest});
553 my $source = parse_target($param->{source});
554
555 my $sync_path = sub {
556 my ($source, $dest, $job, $param, $date) = @_;
557
558 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name});
559
560 snapshot_add($source, $dest, $param->{name}, $date);
561
562 send_image($source, $dest, $param);
563
564 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}) if ($source->{destroy} && $source->{old_snap});
565
566 };
567
568 my $vm_type = vm_exists($source);
569 $source->{vm_type} = $vm_type;
570
571 if ($job) {
572 $job->{state} = "syncing";
573 $job->{vm_type} = $vm_type if !$job->{vm_type};
574 update_state($job);
575 }
576
577 eval{
578 if ($source->{vmid}) {
579 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
580 my $disks = get_disks($source);
581
582 foreach my $disk (sort keys %{$disks}) {
583 $source->{all} = $disks->{$disk}->{all};
584 $source->{pool} = $disks->{$disk}->{pool};
585 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
586 $source->{last_part} = $disks->{$disk}->{last_part};
587 &$sync_path($source, $dest, $job, $param, $date);
588 }
589 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
590 send_config($source, $dest,'ssh');
591 } else {
592 send_config($source, $dest,'local');
593 }
594 } else {
595 &$sync_path($source, $dest, $job, $param, $date);
596 }
597 };
598 if(my $err = $@) {
599 if ($job) {
600 $job->{state} = "error";
601 update_state($job);
602 unlock($lock_fh);
603 close($lock_fh);
604 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
605 }
606 die "$err\n";
607 }
608
609 if ($job) {
610 $job->{state} = "ok";
611 $job->{lsync} = $date;
612 update_state($job);
613 }
614
615 unlock($lock_fh);
616 close($lock_fh);
617 }
618
619 sub snapshot_get{
620 my ($source, $dest, $max_snap, $name) = @_;
621
622 my $cmd = [];
623 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
624 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
625 push @$cmd, $source->{all};
626
627 my $raw = run_cmd($cmd);
628 my $index = 0;
629 my $line = "";
630 my $last_snap = undef;
631 my $old_snap;
632
633 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
634 $line = $1;
635 if ($line =~ m/(rep_$name.*)$/) {
636
637 $last_snap = $1 if (!$last_snap);
638 $old_snap = $1;
639 $index++;
640 if ($index == $max_snap) {
641 $source->{destroy} = 1;
642 last;
643 };
644 }
645 }
646
647 return ($old_snap, $last_snap) if $last_snap;
648
649 return undef;
650 }
651
652 sub snapshot_add {
653 my ($source, $dest, $name, $date) = @_;
654
655 my $snap_name = "rep_$name\_".$date;
656
657 $source->{new_snap} = $snap_name;
658
659 my $path = "$source->{all}\@$snap_name";
660
661 my $cmd = [];
662 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
663 push @$cmd, 'zfs', 'snapshot', $path;
664 eval{
665 run_cmd($cmd);
666 };
667
668 if (my $err = $@) {
669 snapshot_destroy($source, $dest, 'ssh', $snap_name);
670 die "$err\n";
671 }
672 }
673
674 sub write_cron {
675 my ($cfg) = @_;
676
677 my $text = "SHELL=/bin/sh\n";
678 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
679
680 my $fh = IO::File->new("> $CRONJOBS");
681 die "Could not open file: $!\n" if !$fh;
682
683 foreach my $source (sort keys%{$cfg}) {
684 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
685 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
686 $text .= "$PROG_PATH sync";
687 $text .= " -source ";
688 if ($cfg->{$source}->{$sync_name}->{vmid}) {
689 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
690 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
691 } else {
692 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
693 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
694 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
695 }
696 $text .= " -dest ";
697 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
698 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
699 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
700 $text .= " -name $sync_name ";
701 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
702 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
703 $text .= "\n";
704 }
705 }
706 die "Can't write to cron\n" if (!print($fh $text));
707 close($fh);
708 }
709
710 sub get_disks {
711 my ($target, $get_err) = @_;
712
713 my $cmd = [];
714 push @$cmd, 'ssh', "root\@$target->{ip}", '--', if $target->{ip};
715
716 if ($target->{vm_type} eq 'qemu') {
717 push @$cmd, 'qm', 'config', $target->{vmid};
718 } elsif ($target->{vm_type} eq 'lxc') {
719 push @$cmd, 'pct', 'config', $target->{vmid};
720 } else {
721 die "VM Type unknown\n";
722 }
723
724 my $res = run_cmd($cmd);
725
726 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $get_err);
727
728 return $disks;
729 }
730
731 sub run_cmd {
732 my ($cmd) = @_;
733 print "Start CMD\n" if $DEBUG;
734 print Dumper $cmd if $DEBUG;
735 if (ref($cmd) eq 'ARRAY') {
736 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
737 }
738 my $output = `$cmd 2>&1`;
739
740 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
741
742 chomp($output);
743 print Dumper $output if $DEBUG;
744 print "END CMD\n" if $DEBUG;
745 return $output;
746 }
747
748 sub parse_disks {
749 my ($text, $ip, $vm_type, $get_err) = @_;
750
751 my $disks;
752
753 my $num = 0;
754 while ($text && $text =~ s/^(.*?)(\n|$)//) {
755 my $line = $1;
756 my $error = $vm_type eq 'qemu' ? 1 : 0 ;
757
758 next if $line =~ /cdrom|none/;
759 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
760
761 #QEMU if backup is not set include in sync
762 next if $vm_type eq 'qemu && ($line =~ m/backup=(?i:0|no|off|false)/)';
763
764 #LXC if backup is not set do no in sync
765 $error = ($line =~ m/backup=(?i:1|yes|on|true)/) if $vm_type eq 'lxc';
766
767 my $disk = undef;
768 my $stor = undef;
769 if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
770 my @parameter = split(/,/,$1);
771
772 foreach my $opt (@parameter) {
773 if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
774 $disk = $2;
775 $stor = $1;
776 last;
777 }
778 }
779
780 } else {
781 print "Disk: \"$line\" will not include in pve-sync\n" if $get_err || $error;
782 next;
783 }
784
785 my $cmd = [];
786 push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
787 push @$cmd, 'pvesm', 'path', "$stor$disk";
788 my $path = run_cmd($cmd);
789
790 die "Get no path from pvesm path $stor$disk\n" if !$path;
791
792 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
793
794 my @array = split('/', $1);
795 $disks->{$num}->{pool} = shift(@array);
796 $disks->{$num}->{all} = $disks->{$num}->{pool};
797 if (0 < @array) {
798 $disks->{$num}->{path} = join('/', @array);
799 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
800 }
801 $disks->{$num}->{last_part} = $disk;
802 $disks->{$num}->{all} .= "\/$disk";
803
804 $num++;
805 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
806
807 $disks->{$num}->{pool} = $1;
808 $disks->{$num}->{all} = $disks->{$num}->{pool};
809
810 if ($2) {
811 $disks->{$num}->{path} = $3;
812 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
813 }
814
815 $disks->{$num}->{last_part} = $disk;
816 $disks->{$num}->{all} .= "\/$disk";
817
818 $num++;
819
820 } else {
821 die "ERROR: in path\n";
822 }
823 }
824
825 die "Vm include no disk on zfs.\n" if !$disks->{0};
826 return $disks;
827 }
828
829 sub snapshot_destroy {
830 my ($source, $dest, $method, $snap) = @_;
831
832 my @zfscmd = ('zfs', 'destroy');
833 my $snapshot = "$source->{all}\@$snap";
834
835 eval {
836 if($source->{ip} && $method eq 'ssh'){
837 run_cmd(['ssh', "root\@$source->{ip}", '--', @zfscmd, $snapshot]);
838 } else {
839 run_cmd([@zfscmd, $snapshot]);
840 }
841 };
842 if (my $erro = $@) {
843 warn "WARN: $erro";
844 }
845 if ($dest) {
846 my @ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
847
848 my $path = "$dest->{all}\/$source->{last_part}";
849
850 eval {
851 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
852 };
853 if (my $erro = $@) {
854 warn "WARN: $erro";
855 }
856 }
857 }
858
859 sub snapshot_exist {
860 my ($source , $dest, $method) = @_;
861
862 my $cmd = [];
863 push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
864 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
865 push @$cmd, "$dest->{all}/$source->{last_part}\@$source->{old_snap}";
866
867 my $text = "";
868 eval {$text =run_cmd($cmd);};
869 if (my $erro =$@) {
870 warn "WARN: $erro";
871 return undef;
872 }
873
874 while ($text && $text =~ s/^(.*?)(\n|$)//) {
875 my $line =$1;
876 return 1 if $line =~ m/^.*$source->{old_snap}$/;
877 }
878 }
879
880 sub send_image {
881 my ($source, $dest, $param) = @_;
882
883 my $cmd = [];
884
885 push @$cmd, 'ssh', "root\@$source->{ip}", '--' if $source->{ip};
886 push @$cmd, 'zfs', 'send';
887 push @$cmd, '-v' if $param->{verbose};
888
889 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
890 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
891 }
892 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
893
894 if ($param->{limit}){
895 my $bwl = $param->{limit}*1024;
896 push @$cmd, \'|', 'cstream', '-t', $bwl;
897 }
898 my $target = "$dest->{all}/$source->{last_part}";
899 $target =~ s!/+!/!g;
900
901 push @$cmd, \'|';
902 push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
903 push @$cmd, 'zfs', 'recv', '-F', '--';
904 push @$cmd, "$target";
905
906 eval {
907 run_cmd($cmd)
908 };
909
910 if (my $erro = $@) {
911 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
912 die $erro;
913 };
914 }
915
916
917 sub send_config{
918 my ($source, $dest, $method) = @_;
919
920 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
921 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
922
923 my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
924
925 $dest_target_new = $config_dir.'/'.$dest_target_new;
926
927 if ($method eq 'ssh'){
928 if ($dest->{ip} && $source->{ip}) {
929 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
930 run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
931 } elsif ($dest->{ip}) {
932 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
933 run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
934 } elsif ($source->{ip}) {
935 run_cmd(['mkdir', '-p', '--', $config_dir]);
936 run_cmd(['scp', '--', "root\@$source->{ip}:$source_target", $dest_target_new]);
937 }
938
939 if ($source->{destroy}){
940 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
941 if($dest->{ip}){
942 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
943 } else {
944 run_cmd(['rm', '-f', '--', $dest_target_old]);
945 }
946 }
947 } elsif ($method eq 'local') {
948 run_cmd(['mkdir', '-p', '--', $config_dir]);
949 run_cmd(['cp', $source_target, $dest_target_new]);
950 }
951 }
952
953 sub get_date {
954 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
955 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
956
957 return $datestamp;
958 }
959
960 sub status {
961 my $cfg = read_cron();
962
963 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
964
965 my $states = read_state();
966
967 foreach my $source (sort keys%{$cfg}) {
968 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
969 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
970 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
971 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
972 }
973 }
974
975 return $status_list;
976 }
977
978 sub enable_job {
979 my ($param) = @_;
980
981 my $job = get_job($param);
982 $job->{state} = "ok";
983 update_state($job);
984 update_cron($job);
985 }
986
987 sub disable_job {
988 my ($param) = @_;
989
990 my $job = get_job($param);
991 $job->{state} = "stopped";
992 update_state($job);
993 update_cron($job);
994 }
995
996 my $command = $ARGV[0];
997
998 my $commands = {'destroy' => 1,
999 'create' => 1,
1000 'sync' => 1,
1001 'list' => 1,
1002 'status' => 1,
1003 'help' => 1,
1004 'enable' => 1,
1005 'disable' => 1};
1006
1007 if (!$command || !$commands->{$command}) {
1008 usage();
1009 die "\n";
1010 }
1011
1012 my $help_sync = "$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1013 \twill sync one time\n
1014 \t-dest\tstring\n
1015 \t\tthe destination target is like [IP:]<Pool>[/Path]\n
1016 \t-limit\tinteger\n
1017 \t\tmax sync speed in kBytes/s, default unlimited\n
1018 \t-maxsnap\tinteger\n
1019 \t\thow much snapshots will be kept before get erased, default 1/n
1020 \t-name\tstring\n
1021 \t\tname of the sync job, if not set it is default.
1022 \tIt is only necessary if scheduler allready contains this source.\n
1023 \t-source\tstring\n
1024 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1025
1026 my $help_create = "$PROGNAME create -dest <string> -source <string> [OPTIONS]/n
1027 \tCreate a sync Job\n
1028 \t-dest\tstring\n
1029 \t\tthe destination target is like [IP]:<Pool>[/Path]\n
1030 \t-limit\tinteger\n
1031 \t\tmax sync speed in kBytes/s, default unlimited\n
1032 \t-maxsnap\tstring\n
1033 \t\thow much snapshots will be kept before get erased, default 1\n
1034 \t-name\tstring\n
1035 \t\tname of the sync job, if not set it is default\n
1036 \t-skip\tboolean\n
1037 \t\tif this flag is set it will skip the first sync\n
1038 \t-source\tstring\n
1039 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1040
1041 my $help_destroy = "$PROGNAME destroy -source <string> [OPTIONS]\n
1042 \tremove a sync Job from the scheduler\n
1043 \t-name\tstring\n
1044 \t\tname of the sync job, if not set it is default\n
1045 \t-source\tstring\n
1046 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1047
1048 my $help_help = "$PROGNAME help <cmd> [OPTIONS]\n
1049 \tGet help about specified command.\n
1050 \t<cmd>\tstring\n
1051 \t\tCommand name\n
1052 \t-verbose\tboolean\n
1053 \t\tVerbose output format.\n";
1054
1055 my $help_list = "$PROGNAME list\n
1056 \tGet a List of all scheduled Sync Jobs\n";
1057
1058 my $help_status = "$PROGNAME status\n
1059 \tGet the status of all scheduled Sync Jobs\n";
1060
1061 my $help_enable = "$PROGNAME enable -source <string> [OPTIONS]\n
1062 \tenable a syncjob and reset error\n
1063 \t-name\tstring\n
1064 \t\tname of the sync job, if not set it is default\n
1065 \t-source\tstring\n
1066 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1067
1068 my $help_disable = "$PROGNAME disable -source <string> [OPTIONS]\n
1069 \tpause a syncjob\n
1070 \t-name\tstring\n
1071 \t\tname of the sync job, if not set it is default\n
1072 \t-source\tstring\n
1073 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1074
1075 sub help {
1076 my ($command) = @_;
1077
1078 switch($command){
1079 case 'help'
1080 {
1081 die "$help_help\n";
1082 }
1083 case 'sync'
1084 {
1085 die "$help_sync\n";
1086 }
1087 case 'destroy'
1088 {
1089 die "$help_destroy\n";
1090 }
1091 case 'create'
1092 {
1093 die "$help_create\n";
1094 }
1095 case 'list'
1096 {
1097 die "$help_list\n";
1098 }
1099 case 'status'
1100 {
1101 die "$help_status\n";
1102 }
1103 case 'enable'
1104 {
1105 die "$help_enable\n";
1106 }
1107 case 'disable'
1108 {
1109 die "$help_enable\n";
1110 }
1111 }
1112
1113 }
1114
1115 my @arg = @ARGV;
1116 my $param = parse_argv(@arg);
1117
1118
1119 switch($command) {
1120 case "destroy"
1121 {
1122 die "$help_destroy\n" if !$param->{source};
1123 check_target($param->{source});
1124 destroy_job($param);
1125 }
1126 case "sync"
1127 {
1128 die "$help_sync\n" if !$param->{source} || !$param->{dest};
1129 check_target($param->{source});
1130 check_target($param->{dest});
1131 sync($param);
1132 }
1133 case "create"
1134 {
1135 die "$help_create\n" if !$param->{source} || !$param->{dest};
1136 check_target($param->{source});
1137 check_target($param->{dest});
1138 init($param);
1139 }
1140 case "status"
1141 {
1142 print status();
1143 }
1144 case "list"
1145 {
1146 print list();
1147 }
1148 case "help"
1149 {
1150 my $help_command = $ARGV[1];
1151 if ($help_command && $commands->{$help_command}) {
1152 print help($help_command);
1153 }
1154 if ($param->{verbose} == 1){
1155 exec("man $PROGNAME");
1156 } else {
1157 usage(1);
1158 }
1159 }
1160 case "enable"
1161 {
1162 die "$help_enable\n" if !$param->{source};
1163 check_target($param->{source});
1164 enable_job($param);
1165 }
1166 case "disable"
1167 {
1168 die "$help_disable\n" if !$param->{source};
1169 check_target($param->{source});
1170 disable_job($param);
1171 }
1172 }
1173
1174 sub usage {
1175 my ($help) = @_;
1176
1177 print("ERROR:\tno command specified\n") if !$help;
1178 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1179 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1180 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1181 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1182 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1183 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1184 print("\t$PROGNAME list\n");
1185 print("\t$PROGNAME status\n");
1186 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1187 }
1188
1189 sub check_target {
1190 my ($target) = @_;
1191 parse_target($target);
1192 }
1193
1194 __END__
1195
1196 =head1 NAME
1197
1198 pve-zsync - PVE ZFS Replication Manager
1199
1200 =head1 SYNOPSIS
1201
1202 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1203
1204 pve-zsync help <cmd> [OPTIONS]
1205
1206 Get help about specified command.
1207
1208 <cmd> string
1209
1210 Command name
1211
1212 -verbose boolean
1213
1214 Verbose output format.
1215
1216 pve-zsync create -dest <string> -source <string> [OPTIONS]
1217
1218 Create a sync Job
1219
1220 -dest string
1221
1222 the destination target is like [IP]:<Pool>[/Path]
1223
1224 -limit integer
1225
1226 max sync speed in kBytes/s, default unlimited
1227
1228 -maxsnap string
1229
1230 how much snapshots will be kept before get erased, default 1
1231
1232 -name string
1233
1234 name of the sync job, if not set it is default
1235
1236 -skip boolean
1237
1238 if this flag is set it will skip the first sync
1239
1240 -source string
1241
1242 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1243
1244 pve-zsync destroy -source <string> [OPTIONS]
1245
1246 remove a sync Job from the scheduler
1247
1248 -name string
1249
1250 name of the sync job, if not set it is default
1251
1252 -source string
1253
1254 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1255
1256 pve-zsync disable -source <string> [OPTIONS]
1257
1258 pause a sync job
1259
1260 -name string
1261
1262 name of the sync job, if not set it is default
1263
1264 -source string
1265
1266 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1267
1268 pve-zsync enable -source <string> [OPTIONS]
1269
1270 enable a syncjob and reset error
1271
1272 -name string
1273
1274 name of the sync job, if not set it is default
1275
1276 -source string
1277
1278 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1279 pve-zsync list
1280
1281 Get a List of all scheduled Sync Jobs
1282
1283 pve-zsync status
1284
1285 Get the status of all scheduled Sync Jobs
1286
1287 pve-zsync sync -dest <string> -source <string> [OPTIONS]
1288
1289 will sync one time
1290
1291 -dest string
1292
1293 the destination target is like [IP:]<Pool>[/Path]
1294
1295 -limit integer
1296
1297 max sync speed in kBytes/s, default unlimited
1298
1299 -maxsnap integer
1300
1301 how much snapshots will be kept before get erased, default 1
1302
1303 -name string
1304
1305 name of the sync job, if not set it is default.
1306 It is only necessary if scheduler allready contains this source.
1307
1308 -source string
1309
1310 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1311
1312 =head1 DESCRIPTION
1313
1314 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1315 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1316 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1317 To config cron see man crontab.
1318
1319 =head2 PVE ZFS Storage sync Tool
1320
1321 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1322
1323 =head1 EXAMPLES
1324
1325 add sync job from local VM to remote ZFS Server
1326 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1327
1328 =head1 IMPORTANT FILES
1329
1330 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1331
1332 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1333
1334 =head1 COPYRIGHT AND DISCLAIMER
1335
1336 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1337
1338 This program is free software: you can redistribute it and/or modify it
1339 under the terms of the GNU Affero General Public License as published
1340 by the Free Software Foundation, either version 3 of the License, or
1341 (at your option) any later version.
1342
1343 This program is distributed in the hope that it will be useful, but
1344 WITHOUT ANY WARRANTY; without even the implied warranty of
1345 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1346 Affero General Public License for more details.
1347
1348 You should have received a copy of the GNU Affero General Public
1349 License along with this program. If not, see
1350 <http://www.gnu.org/licenses/>.