]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
fix #887 add "--limit" option to cron
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use Switch;
11 use JSON;
12 use IO::File;
13 use String::ShellQuote 'shell_quote';
14
15 my $PROGNAME = "pve-zsync";
16 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
17 my $STATE = "${CONFIG_PATH}/sync_state";
18 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
19 my $PATH = "/usr/sbin";
20 my $PVE_DIR = "/etc/pve/local";
21 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
22 my $LXC_CONF = "${PVE_DIR}/lxc";
23 my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
24 my $PROG_PATH = "$PATH/${PROGNAME}";
25 my $INTERVAL = 15;
26 my $DEBUG = 0;
27
28 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
29 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
30 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
31 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
32
33 my $IPV6RE = "(?:" .
34 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
35 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
36 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
37 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
38 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
39 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
40 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
41 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
42 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
43
44 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
45 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
46 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
47 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
48 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
49
50 check_bin ('cstream');
51 check_bin ('zfs');
52 check_bin ('ssh');
53 check_bin ('scp');
54
55 sub check_bin {
56 my ($bin) = @_;
57
58 foreach my $p (split (/:/, $ENV{PATH})) {
59 my $fn = "$p/$bin";
60 if (-x $fn) {
61 return $fn;
62 }
63 }
64
65 die "unable to find command '$bin'\n";
66 }
67
68 sub cut_target_width {
69 my ($target, $max) = @_;
70
71 return $target if (length($target) <= $max);
72 my @spl = split('/', $target);
73
74 my $count = length($spl[@spl-1]);
75 return "..\/".substr($spl[@spl-1],($count-$max)+3 , $count) if $count > $max;
76
77 $count += length($spl[0]) if @spl > 1;
78 return substr($spl[0], 0, $max-4-length($spl[@spl-1]))."\/..\/".$spl[@spl-1] if $count > $max;
79
80 my $rest = 1;
81 $rest = $max-$count if ($max-$count > 0);
82
83 return "$spl[0]".substr($target, length($spl[0]), $rest)."..\/".$spl[@spl-1];
84 }
85
86 sub lock {
87 my ($fh) = @_;
88 flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
89 }
90
91 sub unlock {
92 my ($fh) = @_;
93 flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
94 }
95
96 sub get_status {
97 my ($source, $name, $status) = @_;
98
99 if ($status->{$source->{all}}->{$name}->{status}) {
100 return $status;
101 }
102
103 return undef;
104 }
105
106 sub check_pool_exists {
107 my ($target) = @_;
108
109 my $cmd = [];
110 push @$cmd, 'ssh', "root\@$target->{ip}", '--', if $target->{ip};
111 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
112 eval {
113 run_cmd($cmd);
114 };
115
116 if ($@) {
117 return 1;
118 }
119 return undef;
120 }
121
122 sub parse_target {
123 my ($text) = @_;
124
125 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
126 my $target = {};
127
128 if ($text !~ $TARGETRE) {
129 die "$errstr\n";
130 }
131 $target->{all} = $2;
132 $target->{ip} = $1 if $1;
133 my @parts = split('/', $2);
134
135 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
136
137 my $pool = $target->{pool} = shift(@parts);
138 die "$errstr\n" if !$pool;
139
140 if ($pool =~ m/^\d+$/) {
141 $target->{vmid} = $pool;
142 delete $target->{pool};
143 }
144
145 return $target if (@parts == 0);
146 $target->{last_part} = pop(@parts);
147
148 if ($target->{ip}) {
149 pop(@parts);
150 }
151 if (@parts > 0) {
152 $target->{path} = join('/', @parts);
153 }
154
155 return $target;
156 }
157
158 sub read_cron {
159
160 #This is for the first use to init file;
161 if (!-e $CRONJOBS) {
162 my $new_fh = IO::File->new("> $CRONJOBS");
163 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
164 close($new_fh);
165 return undef;
166 }
167
168 my $fh = IO::File->new("< $CRONJOBS");
169 die "Could not open file $CRONJOBS: $!\n" if !$fh;
170
171 my @text = <$fh>;
172
173 close($fh);
174
175 return encode_cron(@text);
176 }
177
178 sub parse_argv {
179 my (@arg) = @_;
180
181 my $param = {};
182 $param->{dest} = undef;
183 $param->{source} = undef;
184 $param->{verbose} = undef;
185 $param->{limit} = undef;
186 $param->{maxsnap} = undef;
187 $param->{name} = undef;
188 $param->{skip} = undef;
189 $param->{method} = undef;
190
191 my ($ret, $ar) = GetOptionsFromArray(\@arg,
192 'dest=s' => \$param->{dest},
193 'source=s' => \$param->{source},
194 'verbose' => \$param->{verbose},
195 'limit=i' => \$param->{limit},
196 'maxsnap=i' => \$param->{maxsnap},
197 'name=s' => \$param->{name},
198 'skip' => \$param->{skip},
199 'method=s' => \$param->{method});
200
201 if ($ret == 0) {
202 die "can't parse options\n";
203 }
204
205 $param->{name} = "default" if !$param->{name};
206 $param->{maxsnap} = 1 if !$param->{maxsnap};
207 $param->{method} = "ssh" if !$param->{method};
208
209 return $param;
210 }
211
212 sub add_state_to_job {
213 my ($job) = @_;
214
215 my $states = read_state();
216 my $state = $states->{$job->{source}}->{$job->{name}};
217
218 $job->{state} = $state->{state};
219 $job->{lsync} = $state->{lsync};
220 $job->{vm_type} = $state->{vm_type};
221
222 for (my $i = 0; $state->{"snap$i"}; $i++) {
223 $job->{"snap$i"} = $state->{"snap$i"};
224 }
225
226 return $job;
227 }
228
229 sub encode_cron {
230 my (@text) = @_;
231
232 my $cfg = {};
233
234 while (my $line = shift(@text)) {
235
236 my @arg = split('\s', $line);
237 my $param = parse_argv(@arg);
238
239 if ($param->{source} && $param->{dest}) {
240 $cfg->{$param->{source}}->{$param->{name}}->{dest} = $param->{dest};
241 $cfg->{$param->{source}}->{$param->{name}}->{verbose} = $param->{verbose};
242 $cfg->{$param->{source}}->{$param->{name}}->{limit} = $param->{limit};
243 $cfg->{$param->{source}}->{$param->{name}}->{maxsnap} = $param->{maxsnap};
244 $cfg->{$param->{source}}->{$param->{name}}->{skip} = $param->{skip};
245 $cfg->{$param->{source}}->{$param->{name}}->{method} = $param->{method};
246 }
247 }
248
249 return $cfg;
250 }
251
252 sub param_to_job {
253 my ($param) = @_;
254
255 my $job = {};
256
257 my $source = parse_target($param->{source});
258 my $dest = parse_target($param->{dest}) if $param->{dest};
259
260 $job->{name} = !$param->{name} ? "default" : $param->{name};
261 $job->{dest} = $param->{dest} if $param->{dest};
262 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
263 $job->{method} = "ssh" if !$job->{method};
264 $job->{limit} = $param->{limit};
265 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
266 $job->{source} = $param->{source};
267
268 return $job;
269 }
270
271 sub read_state {
272
273 if (!-e $STATE) {
274 make_path $CONFIG_PATH;
275 my $new_fh = IO::File->new("> $STATE");
276 die "Could not create $STATE: $!\n" if !$new_fh;
277 print $new_fh "{}";
278 close($new_fh);
279 return undef;
280 }
281
282 my $fh = IO::File->new("< $STATE");
283 die "Could not open file $STATE: $!\n" if !$fh;
284
285 my $text = <$fh>;
286 my $states = decode_json($text);
287
288 close($fh);
289
290 return $states;
291 }
292
293 sub update_state {
294 my ($job) = @_;
295 my $text;
296 my $in_fh;
297
298 eval {
299
300 $in_fh = IO::File->new("< $STATE");
301 die "Could not open file $STATE: $!\n" if !$in_fh;
302 lock($in_fh);
303 $text = <$in_fh>;
304 };
305
306 my $out_fh = IO::File->new("> $STATE.new");
307 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
308
309 my $states = {};
310 my $state = {};
311 if ($text){
312 $states = decode_json($text);
313 $state = $states->{$job->{source}}->{$job->{name}};
314 }
315
316 if ($job->{state} ne "del") {
317 $state->{state} = $job->{state};
318 $state->{lsync} = $job->{lsync};
319 $state->{vm_type} = $job->{vm_type};
320
321 for (my $i = 0; $job->{"snap$i"} ; $i++) {
322 $state->{"snap$i"} = $job->{"snap$i"};
323 }
324 $states->{$job->{source}}->{$job->{name}} = $state;
325 } else {
326
327 delete $states->{$job->{source}}->{$job->{name}};
328 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
329 }
330
331 $text = encode_json($states);
332 print $out_fh $text;
333
334 close($out_fh);
335 move("$STATE.new", $STATE);
336 eval {
337 close($in_fh);
338 };
339
340 return $states;
341 }
342
343 sub update_cron {
344 my ($job) = @_;
345
346 my $updated;
347 my $has_header;
348 my $line_no = 0;
349 my $text = "";
350 my $header = "SHELL=/bin/sh\n";
351 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
352
353 my $fh = IO::File->new("< $CRONJOBS");
354 die "Could not open file $CRONJOBS: $!\n" if !$fh;
355 lock($fh);
356
357 my @test = <$fh>;
358
359 while (my $line = shift(@test)) {
360 chomp($line);
361 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
362 $updated = 1;
363 next if $job->{state} eq "del";
364 $text .= format_job($job, $line);
365 } else {
366 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
367 $has_header = 1;
368 }
369 $text .= "$line\n";
370 }
371 $line_no++;
372 }
373
374 if (!$has_header) {
375 $text = "$header$text";
376 }
377
378 if (!$updated) {
379 $text .= format_job($job);
380 }
381 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
382 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
383
384 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
385 close ($new_fh);
386
387 die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
388 close ($fh);
389 }
390
391 sub format_job {
392 my ($job, $line) = @_;
393 my $text = "";
394
395 if ($job->{state} eq "stopped") {
396 $text = "#";
397 }
398 if ($line) {
399 $line =~ /^#*(.+) root/;
400 $text .= $1;
401 } else {
402 $text .= "*/$INTERVAL * * * *";
403 }
404 $text .= " root";
405 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
406 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
407 $text .= " --limit $job->{limit}" if $job->{limit};
408 $text .= " --method $job->{method}";
409 $text .= " --verbose" if $job->{verbose};
410 $text .= "\n";
411
412 return $text;
413 }
414
415 sub list {
416
417 my $cfg = read_cron();
418
419 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
420
421 my $states = read_state();
422 foreach my $source (sort keys%{$cfg}) {
423 foreach my $name (sort keys%{$cfg->{$source}}) {
424 $list .= sprintf("%-25s", cut_target_width($source, 25));
425 $list .= sprintf("%-25s", cut_target_width($name, 25));
426 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
427 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
428 $list .= sprintf("%-6s", $states->{$source}->{$name}->{vm_type});
429 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
430 }
431 }
432
433 return $list;
434 }
435
436 sub vm_exists {
437 my ($target) = @_;
438
439 my @cmd = ('ssh', "root\@$target->{ip}", '--') if $target->{ip};
440
441 my $res = undef;
442
443 eval { $res = run_cmd([@cmd, 'ls', "$QEMU_CONF/$target->{vmid}.conf"]) };
444
445 return "qemu" if $res;
446
447 eval { $res = run_cmd([@cmd, 'ls', "$LXC_CONF/$target->{vmid}.conf"]) };
448
449 return "lxc" if $res;
450
451 return undef;
452 }
453
454 sub init {
455 my ($param) = @_;
456
457 my $cfg = read_cron();
458
459 my $job = param_to_job($param);
460
461 $job->{state} = "ok";
462 $job->{lsync} = 0;
463
464 my $source = parse_target($param->{source});
465 my $dest = parse_target($param->{dest});
466
467 if (my $ip = $dest->{ip}) {
468 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
469 }
470
471 if (my $ip = $source->{ip}) {
472 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
473 }
474
475 die "Pool $dest->{all} does not exists\n" if check_pool_exists($dest);
476
477 my $check = check_pool_exists($source->{path}, $source->{ip}) if !$source->{vmid} && $source->{path};
478
479 die "Pool $source->{path} does not exists\n" if undef($check);
480
481 my $vm_type = vm_exists($source);
482 $job->{vm_type} = $vm_type;
483 $source->{vm_type} = $vm_type;
484
485 die "VM $source->{vmid} doesn't exist\n" if $param->{vmid} && !$vm_type;
486
487 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
488
489 #check if vm has zfs disks if not die;
490 get_disks($source, 1) if $source->{vmid};
491
492 update_cron($job);
493 update_state($job);
494
495 eval {
496 sync($param) if !$param->{skip};
497 };
498 if(my $err = $@) {
499 destroy_job($param);
500 print $err;
501 }
502 }
503
504 sub get_job {
505 my ($param) = @_;
506
507 my $cfg = read_cron();
508
509 if (!$cfg->{$param->{source}}->{$param->{name}}) {
510 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
511 }
512 my $job = $cfg->{$param->{source}}->{$param->{name}};
513 $job->{name} = $param->{name};
514 $job->{source} = $param->{source};
515 $job = add_state_to_job($job);
516
517 return $job;
518 }
519
520 sub destroy_job {
521 my ($param) = @_;
522
523 my $job = get_job($param);
524 $job->{state} = "del";
525
526 update_cron($job);
527 update_state($job);
528 }
529
530 sub sync {
531 my ($param) = @_;
532
533 my $lock_fh = IO::File->new("> $LOCKFILE");
534 die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
535 lock($lock_fh);
536
537 my $date = get_date();
538 my $job;
539 eval {
540 $job = get_job($param);
541 };
542
543 if ($job && $job->{state} eq "syncing") {
544 die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
545 }
546
547 my $dest = parse_target($param->{dest});
548 my $source = parse_target($param->{source});
549
550 my $sync_path = sub {
551 my ($source, $dest, $job, $param, $date) = @_;
552
553 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name});
554
555 snapshot_add($source, $dest, $param->{name}, $date);
556
557 send_image($source, $dest, $param);
558
559 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}) if ($source->{destroy} && $source->{old_snap});
560
561 };
562
563 my $vm_type = vm_exists($source);
564 $source->{vm_type} = $vm_type;
565
566 if ($job) {
567 $job->{state} = "syncing";
568 $job->{vm_type} = $vm_type if !$job->{vm_type};
569 update_state($job);
570 }
571
572 eval{
573 if ($source->{vmid}) {
574 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
575 my $disks = get_disks($source);
576
577 foreach my $disk (sort keys %{$disks}) {
578 $source->{all} = $disks->{$disk}->{all};
579 $source->{pool} = $disks->{$disk}->{pool};
580 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
581 $source->{last_part} = $disks->{$disk}->{last_part};
582 &$sync_path($source, $dest, $job, $param, $date);
583 }
584 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
585 send_config($source, $dest,'ssh');
586 } else {
587 send_config($source, $dest,'local');
588 }
589 } else {
590 &$sync_path($source, $dest, $job, $param, $date);
591 }
592 };
593 if(my $err = $@) {
594 if ($job) {
595 $job->{state} = "error";
596 update_state($job);
597 unlock($lock_fh);
598 close($lock_fh);
599 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
600 }
601 die "$err\n";
602 }
603
604 if ($job) {
605 $job->{state} = "ok";
606 $job->{lsync} = $date;
607 update_state($job);
608 }
609
610 unlock($lock_fh);
611 close($lock_fh);
612 }
613
614 sub snapshot_get{
615 my ($source, $dest, $max_snap, $name) = @_;
616
617 my $cmd = [];
618 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
619 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
620 push @$cmd, $source->{all};
621
622 my $raw = run_cmd($cmd);
623 my $index = 0;
624 my $line = "";
625 my $last_snap = undef;
626 my $old_snap;
627
628 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
629 $line = $1;
630 if ($line =~ m/(rep_$name.*)$/) {
631
632 $last_snap = $1 if (!$last_snap);
633 $old_snap = $1;
634 $index++;
635 if ($index == $max_snap) {
636 $source->{destroy} = 1;
637 last;
638 };
639 }
640 }
641
642 return ($old_snap, $last_snap) if $last_snap;
643
644 return undef;
645 }
646
647 sub snapshot_add {
648 my ($source, $dest, $name, $date) = @_;
649
650 my $snap_name = "rep_$name\_".$date;
651
652 $source->{new_snap} = $snap_name;
653
654 my $path = "$source->{all}\@$snap_name";
655
656 my $cmd = [];
657 push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
658 push @$cmd, 'zfs', 'snapshot', $path;
659 eval{
660 run_cmd($cmd);
661 };
662
663 if (my $err = $@) {
664 snapshot_destroy($source, $dest, 'ssh', $snap_name);
665 die "$err\n";
666 }
667 }
668
669 sub write_cron {
670 my ($cfg) = @_;
671
672 my $text = "SHELL=/bin/sh\n";
673 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
674
675 my $fh = IO::File->new("> $CRONJOBS");
676 die "Could not open file: $!\n" if !$fh;
677
678 foreach my $source (sort keys%{$cfg}) {
679 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
680 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
681 $text .= "$PROG_PATH sync";
682 $text .= " -source ";
683 if ($cfg->{$source}->{$sync_name}->{vmid}) {
684 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
685 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
686 } else {
687 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
688 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
689 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
690 }
691 $text .= " -dest ";
692 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
693 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
694 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
695 $text .= " -name $sync_name ";
696 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
697 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
698 $text .= "\n";
699 }
700 }
701 die "Can't write to cron\n" if (!print($fh $text));
702 close($fh);
703 }
704
705 sub get_disks {
706 my ($target, $get_err) = @_;
707
708 my $cmd = [];
709 push @$cmd, 'ssh', "root\@$target->{ip}", '--', if $target->{ip};
710
711 if ($target->{vm_type} eq 'qemu') {
712 push @$cmd, 'qm', 'config', $target->{vmid};
713 } elsif ($target->{vm_type} eq 'lxc') {
714 push @$cmd, 'pct', 'config', $target->{vmid};
715 } else {
716 die "VM Type unknown\n";
717 }
718
719 my $res = run_cmd($cmd);
720
721 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $get_err);
722
723 return $disks;
724 }
725
726 sub run_cmd {
727 my ($cmd) = @_;
728 print "Start CMD\n" if $DEBUG;
729 print Dumper $cmd if $DEBUG;
730 if (ref($cmd) eq 'ARRAY') {
731 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
732 }
733 my $output = `$cmd 2>&1`;
734
735 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
736
737 chomp($output);
738 print Dumper $output if $DEBUG;
739 print "END CMD\n" if $DEBUG;
740 return $output;
741 }
742
743 sub parse_disks {
744 my ($text, $ip, $vm_type, $get_err) = @_;
745
746 my $disks;
747
748 my $num = 0;
749 while ($text && $text =~ s/^(.*?)(\n|$)//) {
750 my $line = $1;
751 my $error = $vm_type eq 'qemu' ? 1 : 0 ;
752
753 next if $line =~ /cdrom|none/;
754 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
755
756 #QEMU if backup is not set include in sync
757 next if $vm_type eq 'qemu && ($line =~ m/backup=(?i:0|no|off|false)/)';
758
759 #LXC if backup is not set do no in sync
760 $error = ($line =~ m/backup=(?i:1|yes|on|true)/) if $vm_type eq 'lxc';
761
762 my $disk = undef;
763 my $stor = undef;
764 if($line =~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): ([^:]+:)([A-Za-z0-9\-]+),(.*)$/) {
765 $disk = $3;
766 $stor = $2;
767 } else {
768 print "Disk: \"$line\" will not include in pve-sync\n" if $get_err || $error;
769 next;
770 }
771
772 my $cmd = [];
773 push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
774 push @$cmd, 'pvesm', 'path', "$stor$disk";
775 my $path = run_cmd($cmd);
776
777 die "Get no path from pvesm path $stor$disk\n" if !$path;
778
779 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
780
781 my @array = split('/', $1);
782 $disks->{$num}->{pool} = shift(@array);
783 $disks->{$num}->{all} = $disks->{$num}->{pool};
784 if (0 < @array) {
785 $disks->{$num}->{path} = join('/', @array);
786 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
787 }
788 $disks->{$num}->{last_part} = $disk;
789 $disks->{$num}->{all} .= "\/$disk";
790
791 $num++;
792 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
793
794 $disks->{$num}->{pool} = $1;
795 $disks->{$num}->{all} = $disks->{$num}->{pool};
796
797 if ($2) {
798 $disks->{$num}->{path} = $3;
799 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
800 }
801
802 $disks->{$num}->{last_part} = $disk;
803 $disks->{$num}->{all} .= "\/$disk";
804
805 $num++;
806
807 } else {
808 die "ERROR: in path\n";
809 }
810 }
811
812 die "Vm include no disk on zfs.\n" if !$disks->{0};
813 return $disks;
814 }
815
816 sub snapshot_destroy {
817 my ($source, $dest, $method, $snap) = @_;
818
819 my @zfscmd = ('zfs', 'destroy');
820 my $snapshot = "$source->{all}\@$snap";
821
822 eval {
823 if($source->{ip} && $method eq 'ssh'){
824 run_cmd(['ssh', "root\@$source->{ip}", '--', @zfscmd, $snapshot]);
825 } else {
826 run_cmd([@zfscmd, $snapshot]);
827 }
828 };
829 if (my $erro = $@) {
830 warn "WARN: $erro";
831 }
832 if ($dest) {
833 my @ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
834
835 my $path = "$dest->{all}\/$source->{last_part}";
836
837 eval {
838 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
839 };
840 if (my $erro = $@) {
841 warn "WARN: $erro";
842 }
843 }
844 }
845
846 sub snapshot_exist {
847 my ($source , $dest, $method) = @_;
848
849 my $cmd = [];
850 push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
851 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
852 push @$cmd, "$dest->{all}/$source->{last_part}\@$source->{old_snap}";
853
854 my $text = "";
855 eval {$text =run_cmd($cmd);};
856 if (my $erro =$@) {
857 warn "WARN: $erro";
858 return undef;
859 }
860
861 while ($text && $text =~ s/^(.*?)(\n|$)//) {
862 my $line =$1;
863 return 1 if $line =~ m/^.*$source->{old_snap}$/;
864 }
865 }
866
867 sub send_image {
868 my ($source, $dest, $param) = @_;
869
870 my $cmd = [];
871
872 push @$cmd, 'ssh', "root\@$source->{ip}", '--' if $source->{ip};
873 push @$cmd, 'zfs', 'send';
874 push @$cmd, '-v' if $param->{verbose};
875
876 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
877 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
878 }
879 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
880
881 if ($param->{limit}){
882 my $bwl = $param->{limit}*1024;
883 push @$cmd, \'|', 'cstream', '-t', $bwl;
884 }
885 my $target = "$dest->{all}/$source->{last_part}";
886 $target =~ s!/+!/!g;
887
888 push @$cmd, \'|';
889 push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
890 push @$cmd, 'zfs', 'recv', '-F', '--';
891 push @$cmd, "$target";
892
893 eval {
894 run_cmd($cmd)
895 };
896
897 if (my $erro = $@) {
898 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
899 die $erro;
900 };
901 }
902
903
904 sub send_config{
905 my ($source, $dest, $method) = @_;
906
907 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
908 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
909
910 my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
911
912 $dest_target_new = $config_dir.'/'.$dest_target_new;
913
914 if ($method eq 'ssh'){
915 if ($dest->{ip} && $source->{ip}) {
916 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
917 run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
918 } elsif ($dest->{ip}) {
919 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
920 run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
921 } elsif ($source->{ip}) {
922 run_cmd(['mkdir', '-p', '--', $config_dir]);
923 run_cmd(['scp', '--', "root\@$source->{ip}:$source_target", $dest_target_new]);
924 }
925
926 if ($source->{destroy}){
927 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
928 if($dest->{ip}){
929 run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
930 } else {
931 run_cmd(['rm', '-f', '--', $dest_target_old]);
932 }
933 }
934 } elsif ($method eq 'local') {
935 run_cmd(['mkdir', '-p', '--', $config_dir]);
936 run_cmd(['cp', $source_target, $dest_target_new]);
937 }
938 }
939
940 sub get_date {
941 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
942 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
943
944 return $datestamp;
945 }
946
947 sub status {
948 my $cfg = read_cron();
949
950 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
951
952 my $states = read_state();
953
954 foreach my $source (sort keys%{$cfg}) {
955 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
956 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
957 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
958 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
959 }
960 }
961
962 return $status_list;
963 }
964
965 sub enable_job {
966 my ($param) = @_;
967
968 my $job = get_job($param);
969 $job->{state} = "ok";
970 update_state($job);
971 update_cron($job);
972 }
973
974 sub disable_job {
975 my ($param) = @_;
976
977 my $job = get_job($param);
978 $job->{state} = "stopped";
979 update_state($job);
980 update_cron($job);
981 }
982
983 my $command = $ARGV[0];
984
985 my $commands = {'destroy' => 1,
986 'create' => 1,
987 'sync' => 1,
988 'list' => 1,
989 'status' => 1,
990 'help' => 1,
991 'enable' => 1,
992 'disable' => 1};
993
994 if (!$command || !$commands->{$command}) {
995 usage();
996 die "\n";
997 }
998
999 my $help_sync = "$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1000 \twill sync one time\n
1001 \t-dest\tstring\n
1002 \t\tthe destination target is like [IP:]<Pool>[/Path]\n
1003 \t-limit\tinteger\n
1004 \t\tmax sync speed in kBytes/s, default unlimited\n
1005 \t-maxsnap\tinteger\n
1006 \t\thow much snapshots will be kept before get erased, default 1/n
1007 \t-name\tstring\n
1008 \t\tname of the sync job, if not set it is default.
1009 \tIt is only necessary if scheduler allready contains this source.\n
1010 \t-source\tstring\n
1011 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1012
1013 my $help_create = "$PROGNAME create -dest <string> -source <string> [OPTIONS]/n
1014 \tCreate a sync Job\n
1015 \t-dest\tstring\n
1016 \t\tthe destination target is like [IP]:<Pool>[/Path]\n
1017 \t-limit\tinteger\n
1018 \t\tmax sync speed in kBytes/s, default unlimited\n
1019 \t-maxsnap\tstring\n
1020 \t\thow much snapshots will be kept before get erased, default 1\n
1021 \t-name\tstring\n
1022 \t\tname of the sync job, if not set it is default\n
1023 \t-skip\tboolean\n
1024 \t\tif this flag is set it will skip the first sync\n
1025 \t-source\tstring\n
1026 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1027
1028 my $help_destroy = "$PROGNAME destroy -source <string> [OPTIONS]\n
1029 \tremove a sync Job from the scheduler\n
1030 \t-name\tstring\n
1031 \t\tname of the sync job, if not set it is default\n
1032 \t-source\tstring\n
1033 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1034
1035 my $help_help = "$PROGNAME help <cmd> [OPTIONS]\n
1036 \tGet help about specified command.\n
1037 \t<cmd>\tstring\n
1038 \t\tCommand name\n
1039 \t-verbose\tboolean\n
1040 \t\tVerbose output format.\n";
1041
1042 my $help_list = "$PROGNAME list\n
1043 \tGet a List of all scheduled Sync Jobs\n";
1044
1045 my $help_status = "$PROGNAME status\n
1046 \tGet the status of all scheduled Sync Jobs\n";
1047
1048 my $help_enable = "$PROGNAME enable -source <string> [OPTIONS]\n
1049 \tenable a syncjob and reset error\n
1050 \t-name\tstring\n
1051 \t\tname of the sync job, if not set it is default\n
1052 \t-source\tstring\n
1053 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1054
1055 my $help_disable = "$PROGNAME disable -source <string> [OPTIONS]\n
1056 \tpause a syncjob\n
1057 \t-name\tstring\n
1058 \t\tname of the sync job, if not set it is default\n
1059 \t-source\tstring\n
1060 \t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1061
1062 sub help {
1063 my ($command) = @_;
1064
1065 switch($command){
1066 case 'help'
1067 {
1068 die "$help_help\n";
1069 }
1070 case 'sync'
1071 {
1072 die "$help_sync\n";
1073 }
1074 case 'destroy'
1075 {
1076 die "$help_destroy\n";
1077 }
1078 case 'create'
1079 {
1080 die "$help_create\n";
1081 }
1082 case 'list'
1083 {
1084 die "$help_list\n";
1085 }
1086 case 'status'
1087 {
1088 die "$help_status\n";
1089 }
1090 case 'enable'
1091 {
1092 die "$help_enable\n";
1093 }
1094 case 'disable'
1095 {
1096 die "$help_enable\n";
1097 }
1098 }
1099
1100 }
1101
1102 my @arg = @ARGV;
1103 my $param = parse_argv(@arg);
1104
1105
1106 switch($command) {
1107 case "destroy"
1108 {
1109 die "$help_destroy\n" if !$param->{source};
1110 check_target($param->{source});
1111 destroy_job($param);
1112 }
1113 case "sync"
1114 {
1115 die "$help_sync\n" if !$param->{source} || !$param->{dest};
1116 check_target($param->{source});
1117 check_target($param->{dest});
1118 sync($param);
1119 }
1120 case "create"
1121 {
1122 die "$help_create\n" if !$param->{source} || !$param->{dest};
1123 check_target($param->{source});
1124 check_target($param->{dest});
1125 init($param);
1126 }
1127 case "status"
1128 {
1129 print status();
1130 }
1131 case "list"
1132 {
1133 print list();
1134 }
1135 case "help"
1136 {
1137 my $help_command = $ARGV[1];
1138 if ($help_command && $commands->{$help_command}) {
1139 print help($help_command);
1140 }
1141 if ($param->{verbose} == 1){
1142 exec("man $PROGNAME");
1143 } else {
1144 usage(1);
1145 }
1146 }
1147 case "enable"
1148 {
1149 die "$help_enable\n" if !$param->{source};
1150 check_target($param->{source});
1151 enable_job($param);
1152 }
1153 case "disable"
1154 {
1155 die "$help_disable\n" if !$param->{source};
1156 check_target($param->{source});
1157 disable_job($param);
1158 }
1159 }
1160
1161 sub usage {
1162 my ($help) = @_;
1163
1164 print("ERROR:\tno command specified\n") if !$help;
1165 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1166 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1167 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1168 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1169 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1170 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1171 print("\t$PROGNAME list\n");
1172 print("\t$PROGNAME status\n");
1173 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1174 }
1175
1176 sub check_target {
1177 my ($target) = @_;
1178 parse_target($target);
1179 }
1180
1181 __END__
1182
1183 =head1 NAME
1184
1185 pve-zsync - PVE ZFS Replication Manager
1186
1187 =head1 SYNOPSIS
1188
1189 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1190
1191 pve-zsync help <cmd> [OPTIONS]
1192
1193 Get help about specified command.
1194
1195 <cmd> string
1196
1197 Command name
1198
1199 -verbose boolean
1200
1201 Verbose output format.
1202
1203 pve-zsync create -dest <string> -source <string> [OPTIONS]
1204
1205 Create a sync Job
1206
1207 -dest string
1208
1209 the destination target is like [IP]:<Pool>[/Path]
1210
1211 -limit integer
1212
1213 max sync speed in kBytes/s, default unlimited
1214
1215 -maxsnap string
1216
1217 how much snapshots will be kept before get erased, default 1
1218
1219 -name string
1220
1221 name of the sync job, if not set it is default
1222
1223 -skip boolean
1224
1225 if this flag is set it will skip the first sync
1226
1227 -source string
1228
1229 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1230
1231 pve-zsync destroy -source <string> [OPTIONS]
1232
1233 remove a sync Job from the scheduler
1234
1235 -name string
1236
1237 name of the sync job, if not set it is default
1238
1239 -source string
1240
1241 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1242
1243 pve-zsync disable -source <string> [OPTIONS]
1244
1245 pause a sync job
1246
1247 -name string
1248
1249 name of the sync job, if not set it is default
1250
1251 -source string
1252
1253 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1254
1255 pve-zsync enable -source <string> [OPTIONS]
1256
1257 enable a syncjob and reset error
1258
1259 -name string
1260
1261 name of the sync job, if not set it is default
1262
1263 -source string
1264
1265 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1266 pve-zsync list
1267
1268 Get a List of all scheduled Sync Jobs
1269
1270 pve-zsync status
1271
1272 Get the status of all scheduled Sync Jobs
1273
1274 pve-zsync sync -dest <string> -source <string> [OPTIONS]
1275
1276 will sync one time
1277
1278 -dest string
1279
1280 the destination target is like [IP:]<Pool>[/Path]
1281
1282 -limit integer
1283
1284 max sync speed in kBytes/s, default unlimited
1285
1286 -maxsnap integer
1287
1288 how much snapshots will be kept before get erased, default 1
1289
1290 -name string
1291
1292 name of the sync job, if not set it is default.
1293 It is only necessary if scheduler allready contains this source.
1294
1295 -source string
1296
1297 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1298
1299 =head1 DESCRIPTION
1300
1301 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1302 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1303 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1304 To config cron see man crontab.
1305
1306 =head2 PVE ZFS Storage sync Tool
1307
1308 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1309
1310 =head1 EXAMPLES
1311
1312 add sync job from local VM to remote ZFS Server
1313 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1314
1315 =head1 IMPORTANT FILES
1316
1317 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1318
1319 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1320
1321 =head1 COPYRIGHT AND DISCLAIMER
1322
1323 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1324
1325 This program is free software: you can redistribute it and/or modify it
1326 under the terms of the GNU Affero General Public License as published
1327 by the Free Software Foundation, either version 3 of the License, or
1328 (at your option) any later version.
1329
1330 This program is distributed in the hope that it will be useful, but
1331 WITHOUT ANY WARRANTY; without even the implied warranty of
1332 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1333 Affero General Public License for more details.
1334
1335 You should have received a copy of the GNU Affero General Public
1336 License along with this program. If not, see
1337 <http://www.gnu.org/licenses/>.