]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
Add ssh BatchMode.
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use JSON;
11 use IO::File;
12 use String::ShellQuote 'shell_quote';
13
# Program name and the fixed filesystem locations derived from it.
my $PROGNAME    = 'pve-zsync';
my $CONFIG_PATH = "/var/lib/$PROGNAME";
my $STATE       = "$CONFIG_PATH/sync_state";      # JSON file with per-job state
my $CRONJOBS    = "/etc/cron.d/$PROGNAME";        # cron file holding the jobs
my $PATH        = '/usr/sbin';
my $PVE_DIR     = '/etc/pve/local';
my $QEMU_CONF   = "$PVE_DIR/qemu-server";
my $LXC_CONF    = "$PVE_DIR/lxc";
my $LOCKFILE    = "$CONFIG_PATH/$PROGNAME.lock";
my $PROG_PATH   = "$PATH/$PROGNAME";
my $INTERVAL    = 15;    # used as "*/$INTERVAL" in the minute field of new cron lines
my $DEBUG       = 0;     # when true, run_cmd dumps each command and its output
26
# Building blocks for recognising IPv4 and IPv6 literals.
my $IPV4OCTET = '(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])';
my $IPV4RE    = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16   = '(?:[0-9a-fA-F]{1,4})';
my $IPV6LS32  = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# One alternative per allowed position of the "::" abbreviation.
my $IPV6RE = '(?:' . join('|',
    "(?:(?:(?:$IPV6H16:){6})$IPV6LS32)",
    "(?:(?:::(?:$IPV6H16:){5})$IPV6LS32)",
    "(?:(?:(?:$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::)$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::)$IPV6H16)",
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::))",
) . ')';

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)";            # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])";      # these may be in brackets, too
my $HOSTRE    = "(?:$HOSTv4RE1|\\[$IPV6RE\\])";         # ipv6 must always be in brackets

# Targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional.
# Capture 1 is the host (brackets included), capture 2 the VMID or dataset.
my $TARGETRE = qr/^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(\/.+)?)$/;
48
# Abort early when one of the external tools this script shells out to is missing.
check_bin($_) for qw(cstream zfs ssh scp);
53
# Look for an executable called $bin in the directories listed in $ENV{PATH}.
# Returns the full path of the first match; dies when there is none.
sub check_bin {
    my ($bin) = @_;

    my ($found) = grep { -x } map { "$_/$bin" } split(/:/, $ENV{PATH});
    return $found if defined $found;

    die "unable to find command '$bin'\n";
}
66
# Shorten a '/'-separated target string for tabular output.
# Strings no longer than $max are returned unchanged; longer ones are
# abbreviated with ".." while keeping (part of) the last path component.
sub cut_target_width {
    my ($target, $max) = @_;

    return $target if length($target) <= $max;

    my @parts    = split('/', $target);
    my $head     = $parts[0];
    my $tail     = $parts[-1];
    my $tail_len = length($tail);

    # The last component alone is too long: keep only its end.
    if ($tail_len > $max) {
        return '../' . substr($tail, $tail_len - $max + 3, $tail_len);
    }

    # First plus last component do not fit: truncate the first one.
    my $used = $tail_len;
    $used += length($head) if @parts > 1;
    if ($used > $max) {
        return substr($head, 0, $max - 4 - $tail_len) . '/../' . $tail;
    }

    # Otherwise keep the first component and as much of the middle as the
    # remaining width allows (at least one character).
    my $room = $max - $used;
    $room = 1 if $room <= 0;

    return $head . substr($target, length($head), $room) . '../' . $tail;
}
84
# Take an exclusive flock on the given file handle; dies if that fails.
sub lock {
    my $handle = shift;
    die "Can't lock config - $!\n" unless flock($handle, LOCK_EX);
}
89
# Release the flock held on the given file handle; dies if that fails.
sub unlock {
    my $handle = shift;
    die "Can't unlock config- $!\n" unless flock($handle, LOCK_UN);
}
94
# Return the whole $status structure when it holds a true 'status' value for
# the given source ($source->{all}) and job name, otherwise undef.
sub get_status {
    my ($source, $name, $status) = @_;

    return $status->{$source->{all}}->{$name}->{status} ? $status : undef;
}
104
# Check whether the dataset $target->{all} is known to 'zfs list', run over
# ssh as root when $target->{ip} is set. Returns 1 on success, 0 when the
# command fails.
sub check_pool_exists {
    my ($target) = @_;

    my @remote = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();

    my $ok = eval {
        run_cmd([@remote, 'zfs', 'list', '-H', '--', $target->{all}]);
        1;
    };

    return $ok ? 1 : 0;
}
123
# Split a target string ([IP:]<VMID> or [IP:]<ZFSPool>[/Path]) into a hash ref.
# Keys set: all (everything after the optional host), ip (host without
# surrounding brackets), pool or vmid (first path component; vmid when it is
# purely numeric), last_part (last component) and path (components between).
# Dies with a usage hint when the text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $spec) = ($1, $2);

    my $target = { all => $spec };
    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my ($pool, @rest) = split('/', $spec);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if !@rest;

    $target->{last_part} = pop(@rest);

    # NOTE(review): for targets with a host one more component is dropped
    # before 'path' is built - confirm this is intended.
    pop(@rest) if $target->{ip};

    $target->{path} = join('/', @rest) if @rest;

    return $target;
}
159
# Read the cron file and return the job configuration parsed by encode_cron.
# On first use the file does not exist yet: it is created empty and undef is
# returned.
sub read_cron {

    unless (-e $CRONJOBS) {
        my $created = IO::File->new("> $CRONJOBS")
            or die "Could not create $CRONJOBS: $!\n";
        close($created);
        return undef;
    }

    my $in = IO::File->new("< $CRONJOBS")
        or die "Could not open file $CRONJOBS: $!\n";
    my @lines = <$in>;
    close($in);

    return encode_cron(@lines);
}
179
# Parse command line style arguments into a hash ref with the keys dest,
# source, verbose, limit, maxsnap, name, skip and method. Options that were
# not given stay undef, except name ("default"), maxsnap (1) and method
# ("ssh"). Dies when Getopt::Long reports a parse error.
sub parse_argv {
    my (@arg) = @_;

    # [ key in the result hash, Getopt::Long specification ]
    my @spec = (
        [dest    => 'dest=s'],
        [source  => 'source=s'],
        [verbose => 'verbose'],
        [limit   => 'limit=i'],
        [maxsnap => 'maxsnap=i'],
        [name    => 'name=s'],
        [skip    => 'skip'],
        [method  => 'method=s'],
    );

    my $param = { map { ($_->[0] => undef) } @spec };

    my $ret = GetOptionsFromArray(\@arg,
        map { ($_->[1] => \$param->{$_->[0]}) } @spec);

    die "can't parse options\n" if $ret == 0;

    $param->{name}    ||= "default";
    $param->{maxsnap} ||= 1;
    $param->{method}  ||= "ssh";

    return $param;
}
213
# Copy the saved state of a job (state, lsync, vm_type and the consecutive
# snap0, snap1, ... entries) from the state file into $job and return $job.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    $job->{$_} = $saved->{$_} for qw(state lsync vm_type);

    my $n = 0;
    while ($saved->{"snap$n"}) {
        $job->{"snap$n"} = $saved->{"snap$n"};
        $n++;
    }

    return $job;
}
230
# Turn the lines of the cron file into a nested hash
# { source => { name => { dest, verbose, limit, maxsnap, skip, method } } }.
# Lines that yield no source or no dest are ignored.
sub encode_cron {
    my (@text) = @_;

    my @fields = qw(dest verbose limit maxsnap skip method);
    my $cfg = {};

    while (my $line = shift(@text)) {
        my $param = parse_argv(split('\s', $line));
        next unless $param->{source} && $param->{dest};

        my %entry;
        @entry{@fields} = @{$param}{@fields};
        $cfg->{$param->{source}}->{$param->{name}} = \%entry;
    }

    return $cfg;
}
253
# Build a job hash ref from parsed command line parameters.
# The job gets name (default "default"), dest, method ("local" when neither
# source nor dest has a host, otherwise "ssh"), limit, maxsnap and source.
# parse_target dies when source or dest is not a valid target.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});

    # Do not use "my $dest = ... if COND": perlsyn documents the behaviour of
    # a 'my' with a statement modifier as undefined.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};

    return $job;
}
272
# Read the JSON state file and return the decoded structure.
# When the file does not exist yet, the config directory and a file
# containing "{}" are created and undef is returned.
sub read_state {

    unless (-e $STATE) {
        make_path($CONFIG_PATH);
        my $created = IO::File->new("> $STATE")
            or die "Could not create $STATE: $!\n";
        print $created "{}";
        close($created);
        return undef;
    }

    my $in = IO::File->new("< $STATE")
        or die "Could not open file $STATE: $!\n";

    # Only the first line is read from the file.
    my $json = <$in>;
    my $states = decode_json($json);

    close($in);

    return $states;
}
294
# Store (or, for state "del", remove) the state of $job in the state file.
# The new content is written to "$STATE.new" and then moved over $STATE while
# a lock is held on the old file. Returns the complete state structure.
sub update_state {
    my ($job) = @_;

    my ($in_fh, $text);

    # A missing or unreadable state file is tolerated: we then start empty.
    eval {
        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        $text = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new")
        or die "Could not open file ${STATE}.new: $!\n";

    my ($source, $name) = @{$job}{qw(source name)};

    my $states = {};
    my $state  = {};
    if ($text) {
        $states = decode_json($text);
        $state  = $states->{$source}->{$name};
    }

    if ($job->{state} eq "del") {
        delete $states->{$source}->{$name};
        # drop the source entry as well once its last job is gone
        delete $states->{$source} if !keys %{$states->{$source}};
    } else {
        $state->{$_} = $job->{$_} for qw(state lsync vm_type);

        for (my $i = 0; $job->{"snap$i"}; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$source}->{$name} = $state;
    }

    $text = encode_json($states);
    print $out_fh $text;

    close($out_fh);
    move("$STATE.new", $STATE);

    # closing the old handle releases the lock taken above
    eval { close($in_fh); };

    return $states;
}
344
# Rewrite the cron file so that it reflects $job.
# The line belonging to the job (same source and name) is replaced by a
# freshly formatted one, or dropped when the job state is "del"; when no such
# line exists a new one is appended. A SHELL/PATH header is prepended when
# none is found in the first three lines. The result is written to
# "$CRONJOBS.new" and moved over $CRONJOBS while a lock is held on the old
# file. Dies when a file cannot be opened, written or moved.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @lines = <$fh>;

    # Source and name must be matched literally: a source such as
    # "[::1]:tank" or "10.0.0.1:tank" contains regex metacharacters and would
    # otherwise fail to match (or wrongly match) its own cron line.
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    while (my $line = shift(@lines)) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
    close ($fh);
}
392
# Format one cron line for $job.
# When $line (the job's existing cron line) is given, its schedule - the text
# before the first " root" - is kept; otherwise, or when no schedule can be
# extracted, the default "*/$INTERVAL * * * *" is used. Jobs in state
# "stopped" are written commented out with a leading "#".
sub format_job {
    my ($job, $line) = @_;

    my $text = $job->{state} eq "stopped" ? "#" : "";

    my $schedule = "*/$INTERVAL * * * *";

    # Non-greedy on purpose: a greedy match would run up to the LAST " root"
    # in the line and swallow the whole command when the job is named "root".
    if ($line && $line =~ /^#*(.+?) root/) {
        $schedule = $1;
    }

    $text .= $schedule;
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= "\n";

    return $text;
}
416
# Return a table (one line per configured job) with source, name, state,
# last sync time, VM type and connection method.
sub list {

    my $cfg = read_cron();

    my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            my $type = defined($state->{vm_type}) ? $state->{vm_type} : "undef";

            $list .= sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n",
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $type,
                $cfg->{$source}->{$name}->{method});
        }
    }

    return $list;
}
437
# Determine the guest type of $target->{vmid} by looking for its config file
# (via 'ls', over ssh as root when $target->{ip} is set).
# Returns "qemu" or "lxc", or undef when the target has no vmid or no config
# file is found.
sub vm_exists {
    my ($target) = @_;

    return undef if !defined($target->{vmid});

    # Do not use "my @cmd = (...) if COND": perlsyn documents the behaviour of
    # a 'my' with a statement modifier as undefined.
    my @cmd = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();

    foreach my $probe (['qemu', $QEMU_CONF], ['lxc', $LXC_CONF]) {
        my ($type, $conf_dir) = @$probe;

        # a failing 'ls' makes run_cmd die; that simply means "not this type"
        my $res = undef;
        eval { $res = run_cmd([@cmd, 'ls', "$conf_dir/$target->{vmid}.conf"]) };

        return $type if $res;
    }

    return undef;
}
457
# Create a new sync job from $param: install the ssh key on remote hosts,
# verify that pools / the VM exist, register the job in the cron and state
# files and - unless 'skip' is set - run a first sync. When that first sync
# fails the job is removed again and the error is printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);
    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest   = parse_target($param->{dest});

    # destination first, then source
    foreach my $host ($dest->{ip}, $source->{ip}) {
        next if !$host;
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$host"]);
    }

    die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest);

    if (!defined($source->{vmid}) && !check_pool_exists($source)) {
        die "Pool $source->{all} does not exists\n";
    }

    my $vm_type = vm_exists($source);
    $job->{vm_type}    = $vm_type;
    $source->{vm_type} = $vm_type;

    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    # get_disks dies when the VM has no disk on zfs
    get_disks($source, 1) if $source->{vmid};

    update_cron($job);
    update_state($job);

    eval {
        sync($param) if !$param->{skip};
    };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
507
# Look up the job identified by $param->{source} and $param->{name} in the
# cron file and return it with name, source and saved state filled in.
# Dies when no such job is configured.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();
    my $job = $cfg->{$param->{source}}->{$param->{name}};

    die "Job with source $param->{source} and name $param->{name} does not exist\n"
        if !$job;

    $job->{name}   = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}
523
# Remove the job described by $param from the cron file and the state file.
sub destroy_job {
    my ($param) = @_;

    my $job = get_job($param);

    # state "del" tells update_cron/update_state to delete the entry
    $job->{state} = "del";

    update_cron($job);
    return update_state($job);
}
533
# Run one synchronisation of $param->{source} to $param->{dest}.
# The whole run happens under an exclusive lock on $LOCKFILE. For a VMID
# source every zfs disk of the guest is synced and the guest config is
# copied; otherwise the single dataset is synced. When a matching job exists
# its state is set to "syncing" during the run and to "ok" / "error"
# afterwards. Dies on errors and when the job is already syncing.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE")
        or die "Can't open Lock File: $LOCKFILE $!\n";
    lock($lock_fh);

    my $date = get_date();

    # a sync may also be run for a source that has no configured job
    my $job;
    eval { $job = get_job($param); };

    die "Job --source $param->{source} --name $param->{name} is syncing at the moment"
        if $job && $job->{state} eq "syncing";

    my $dest   = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # snapshot, send and clean up one dataset
    my $sync_path = sub {
        my ($src, $dst, $cur_job, $opts, $stamp) = @_;

        ($src->{old_snap}, $src->{last_snap}) = snapshot_get($src, $dst, $opts->{maxsnap}, $opts->{name});

        snapshot_add($src, $dst, $opts->{name}, $stamp);

        send_image($src, $dst, $opts);

        if ($src->{destroy} && $src->{old_snap}) {
            snapshot_destroy($src, $dst, $opts->{method}, $src->{old_snap});
        }
    };

    my $vm_type = vm_exists($source);
    $source->{vm_type} = $vm_type;

    if ($job) {
        $job->{state} = "syncing";
        $job->{vm_type} = $vm_type if !$job->{vm_type};
        update_state($job);
    }

    eval {
        if ($source->{vmid}) {
            die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
            my $disks = get_disks($source);

            foreach my $key (sort keys %{$disks}) {
                my $disk = $disks->{$key};

                $source->{all}  = $disk->{all};
                $source->{pool} = $disk->{pool};
                $source->{path} = $disk->{path} if $disk->{path};
                $source->{last_part} = $disk->{last_part};
                $sync_path->($source, $dest, $job, $param, $date);
            }

            my $via = ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip}))
                ? 'ssh'
                : 'local';
            send_config($source, $dest, $via);
        } else {
            $sync_path->($source, $dest, $job, $param, $date);
        }
    };
    if (my $err = $@) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
617
# List the replication snapshots of job $name below $source->{all}, newest
# first ('zfs list -S creation', over ssh as root when $source->{ip} is set).
# Returns ($old_snap, $last_snap): $last_snap is the newest snapshot of the
# job, $old_snap the $max_snap-th newest (or the oldest one seen when there
# are fewer). $source->{destroy} is set to 1 once $max_snap snapshots were
# counted. Returns undef when the job has no snapshot yet.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
    push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
    push @$cmd, $source->{all};

    my $raw = run_cmd($cmd);
    my $index = 0;
    my $line = "";
    my $last_snap = undef;
    my $old_snap;

    # Match exactly the names snapshot_add creates ("rep_<name>_<timestamp>").
    # The job name is quoted and followed by the full timestamp so that job
    # "default" does not pick up snapshots of a job called "default2", and so
    # that regex metacharacters in a name are taken literally.
    my $snap_re = qr/\@(rep_\Q$name\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

    while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
        $line = $1;
        if ($line =~ $snap_re) {

            $last_snap = $1 if (!$last_snap);
            $old_snap = $1;
            $index++;
            if ($index == $max_snap) {
                $source->{destroy} = 1;
                last;
            };
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
650
# Create the snapshot "rep_<name>_<date>" on $source->{all} (over ssh as root
# when $source->{ip} is set) and remember its name in $source->{new_snap}.
# On failure snapshot_destroy is called for that name and the error is
# re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date) = @_;

    my $snap_name = "rep_${name}_$date";
    $source->{new_snap} = $snap_name;

    my @remote = $source->{ip} ? ('ssh', "root\@$source->{ip}", '--') : ();

    eval {
        run_cmd([@remote, 'zfs', 'snapshot', "$source->{all}\@$snap_name"]);
    };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name);
        die "$err\n";
    }
}
672
# Write a complete cron file from $cfg ({ source => { name => job } }).
# Only jobs whose 'status' is 'ok' are written; each gets one
# "$PROG_PATH sync -source ... -dest ... -name ..." line.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $job = $cfg->{$source}->{$sync_name};
            next if $job->{status} ne 'ok';

            my @line = ("$PROG_PATH sync", " -source ");

            push @line, "$job->{source_ip}:" if $job->{source_ip};
            if ($job->{vmid}) {
                push @line, "$job->{vmid} ";
            } else {
                push @line, "$job->{source_pool}";
                push @line, "$job->{source_path}" if $job->{source_path};
            }

            push @line, " -dest ";
            push @line, "$job->{dest_ip}:" if $job->{dest_ip};
            push @line, "$job->{dest_pool}";
            push @line, "$job->{dest_path}" if $job->{dest_path};

            push @line, " -name $sync_name ";
            push @line, " -limit $job->{limit}" if $job->{limit};
            push @line, " -maxsnap $job->{maxsnap}" if $job->{maxsnap};

            $text .= join('', @line, "\n");
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
708
# Fetch the guest configuration with 'qm config' (qemu) or 'pct config'
# (lxc) - over ssh as root when $target->{ip} is set - and return the zfs
# disks found in it by parse_disks. Dies for any other vm_type.
sub get_disks {
    my ($target, $get_err) = @_;

    my @remote = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();

    my @query;
    if ($target->{vm_type} eq 'qemu') {
        @query = ('qm', 'config', $target->{vmid});
    } elsif ($target->{vm_type} eq 'lxc') {
        @query = ('pct', 'config', $target->{vmid});
    } else {
        die "VM Type unknown\n";
    }

    my $config = run_cmd([@remote, @query]);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $get_err);
}
729
# Run a command through the shell and return its combined stdout/stderr with
# the trailing newline removed. $cmd is either a ready-made string or an
# array ref: plain elements are shell-quoted, scalar refs (e.g. \'|') are
# inserted verbatim. Dies with the command and its output when the exit
# status is non-zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd)
        if ref($cmd) eq 'ARRAY';

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if $? != 0;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
746
# Extract the zfs volumes from the text of a guest configuration.
#
# $text    - output of 'qm config' / 'pct config'
# $ip      - when set, 'pvesm path' is run over ssh as root on that host
# $vm_type - 'qemu' or 'lxc'
# $get_err - accepted for interface compatibility; currently unused
#
# Returns a hash ref { 0 => {...}, 1 => {...}, ... } where every entry has
# pool, all (full dataset name), last_part (volume name) and, when present,
# path. Dies when a volume path cannot be resolved or has an unexpected
# form, and when no zfs disk is found at all.
sub parse_disks {
    my ($text, $ip, $vm_type, $get_err) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /cdrom|none/;

        # only disk / mount point / rootfs lines are of interest
        next if $line !~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/;
        my $options = $1;

        # QEMU: disks are synced unless backup is explicitly switched off.
        # (The closing quote used to sit at the end of the whole expression,
        # which turned this into a string comparison that was never true.)
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # NOTE(review): lxc volumes are not filtered by their backup= flag
        # here; the previous code only used that flag for a message.

        my $disk = undef;
        my $stor = undef;
        foreach my $opt (split(/,/, $options)) {
            if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/) {
                $disk = $2;
                $stor = $1;
                last;
            }
        }

        # No "storage:volume" option found (e.g. a passed-through device):
        # skip the entry instead of calling 'pvesm path' with nothing.
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" will not include in pve-sync\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
827
# Destroy snapshot $snap of $source->{all} and - when $dest is given - the
# same snapshot of "$dest->{all}/$source->{last_part}". The source side is
# reached over ssh only when it has an ip and $method is 'ssh'; the
# destination side whenever it has an ip. Failures are only reported as
# warnings.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap) = @_;

    my @destroy = ('zfs', 'destroy');

    # run a command, turning a failure into a warning
    my $try = sub {
        my ($cmd) = @_;
        eval { run_cmd($cmd); };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    };

    my @src_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "root\@$source->{ip}", '--')
        : ();
    $try->([@src_ssh, @destroy, "$source->{all}\@$snap"]);

    if ($dest) {
        my @dst_ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
        my $path = "$dest->{all}\/$source->{last_part}";

        $try->([@dst_ssh, @destroy, "$path\@$snap"]);
    }
}
857
# Check whether the snapshot $source->{old_snap} exists for
# "$dest->{all}/$source->{last_part}" ('zfs list', over ssh as root when
# $dest->{ip} is set). Returns 1 when it is listed, undef otherwise; a
# failing 'zfs list' is reported as a warning and also yields undef.
sub snapshot_exist {
    my ($source , $dest, $method) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
    push @$cmd, "$dest->{all}/$source->{last_part}\@$source->{old_snap}";

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
    }

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line =$1;
        # \Q..\E: the snapshot name contains the user-chosen job name and
        # must be compared literally, not as a regex.
        return 1 if $line =~ m/^.*\Q$source->{old_snap}\E$/;
    }

    # explicit "not found" instead of falling off the end of the sub
    return undef;
}
878
# Transfer the snapshot $source->{new_snap} of $source->{all} to
# "$dest->{all}/$source->{last_part}" with 'zfs send | zfs recv -F'.
# Either end is wrapped in 'ssh -o BatchMode=yes root@<ip>' when it has an
# ip. The send is incremental from $source->{last_snap} when snapshot_exist
# reports success, and is piped through 'cstream -t' when
# $param->{limit} is set. On failure the new source snapshot is destroyed
# and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @ssh_opts = ('-o', 'BatchMode=yes');

    # sending side
    my @send;
    push @send, 'ssh', @ssh_opts, "root\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    push @send, '-v' if $param->{verbose};
    if ($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
        push @send, '-i', "$source->{all}\@$source->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit; the limit is multiplied by 1024 for cstream
    my @throttle = $param->{limit}
        ? (\'|', 'cstream', '-t', $param->{limit}*1024)
        : ();

    # receiving side
    my $target = "$dest->{all}/$source->{last_part}";
    $target =~ s!/+!/!g;

    my @recv;
    push @recv, 'ssh', @ssh_opts, "root\@$dest->{ip}", '--' if $dest->{ip};
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd([@send, @throttle, \'|', @recv])
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
        die $erro;
    };
}
914
915
# Copy the guest configuration file of $source->{vmid} to the destination as
# "<vmid>.conf.<vm_type>.<new_snap>" below $CONFIG_PATH (in a sub directory
# named after $dest->{last_part} when that is set).
# With $method 'ssh' the file is copied with scp and, when
# $source->{destroy} is set, the copy belonging to $source->{old_snap} is
# removed; with $method 'local' the file is copied with cp.
sub send_config{
    my ($source, $dest, $method) = @_;

    my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
    my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;

    $dest_target_new = $config_dir.'/'.$dest_target_new;

    if ($method eq 'ssh'){
        if ($dest->{ip} && $source->{ip}) {
            run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($dest->{ip}) {
            run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($source->{ip}) {
            run_cmd(['mkdir', '-p', '--', $config_dir]);
            # The host is put in brackets like in the branches above: without
            # them scp splits an IPv6 address at its first colon.
            run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", $dest_target_new]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
            if($dest->{ip}){
                run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
            } else {
                run_cmd(['rm', '-f', '--', $dest_target_old]);
            }
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
951
# Return the current local time formatted as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    my @now = localtime(time);

    # localtime yields (sec, min, hour, mday, mon, year, ...);
    # mon is zero-based and year counts from 1900
    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d",
        $now[5] + 1900, $now[4] + 1, @now[3, 2, 1, 0]);
}
958
# Return a table (one line per configured job) with source, name and the
# state recorded in the state file.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    for my $source (sort keys%{$cfg}) {
        for my $sync_name (sort keys%{$cfg->{$source}}) {
            $status_list .= sprintf("%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25));
            $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
976
# Set the job described by $param to state "ok" in the state file and the
# cron file.
sub enable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "ok";

    update_state($job);
    return update_cron($job);
}
985
# Set the job described by $param to state "stopped" in the state file and
# the cron file (format_job writes stopped jobs commented out).
sub disable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "stopped";

    update_state($job);
    return update_cron($job);
}
994
# The first command line argument selects the sub command.
my $command = $ARGV[0];

# set of known sub commands
my $commands = { map { ($_ => 1) } qw(destroy create sync list status help enable disable) };

unless ($command && $commands->{$command}) {
    usage();
    die "\n";
}
1010
# Help texts for the individual sub commands. Each string contains literal
# line breaks in addition to the "\n" escapes.

my $help_sync = "$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
\twill sync one time\n
\t-dest\tstring\n
\t\tthe destination target is like [IP:]<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tinteger\n
\t\thow much snapshots will be kept before get erased, default 1\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default.
\tIt is only necessary if scheduler allready contains this source.\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_create = "$PROGNAME create -dest <string> -source <string> [OPTIONS]\n
\tCreate a sync Job\n
\t-dest\tstring\n
\t\tthe destination target is like [IP:]<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tinteger\n
\t\thow much snapshots will be kept before get erased, default 1\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-skip\tboolean\n
\t\tif this flag is set it will skip the first sync\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_destroy = "$PROGNAME destroy -source <string> [OPTIONS]\n
\tremove a sync Job from the scheduler\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_help = "$PROGNAME help <cmd> [OPTIONS]\n
\tGet help about specified command.\n
\t<cmd>\tstring\n
\t\tCommand name\n
\t-verbose\tboolean\n
\t\tVerbose output format.\n";

my $help_list = "$PROGNAME list\n
\tGet a List of all scheduled Sync Jobs\n";

my $help_status = "$PROGNAME status\n
\tGet the status of all scheduled Sync Jobs\n";

my $help_enable = "$PROGNAME enable -source <string> [OPTIONS]\n
\tenable a syncjob and reset error\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_disable = "$PROGNAME disable -source <string> [OPTIONS]\n
\tpause a syncjob\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1073
# Die with the help text of the given sub command. Does nothing for a
# command without a help text.
sub help {
    my ($command) = @_;

    my %text_for = (
        help    => $help_help,
        sync    => $help_sync,
        destroy => $help_destroy,
        create  => $help_create,
        list    => $help_list,
        status  => $help_status,
        enable  => $help_enable,
        disable => $help_disable,
    );

    die "$text_for{$command}\n" if exists $text_for{$command};
}
1104
# Parse the options once, then dispatch on the sub command. Commands that
# need a source (and a destination) die with their help text when it is
# missing; check_target dies on malformed targets.
my @arg = @ARGV;
my $param = parse_argv(@arg);

if ($command eq 'destroy') {
    die "$help_destroy\n" if !$param->{source};

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    die "$help_sync\n" if !$param->{source} || !$param->{dest};

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    die "$help_create\n" if !$param->{source} || !$param->{dest};

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    my $help_command = $ARGV[1];

    # help() dies with the text of a known command
    if ($help_command && $commands->{$help_command}) {
        print help($help_command);

    }

    # 'verbose' is undef when the flag was not given, so test its truth
    # instead of comparing it numerically (which warns on undef).
    if ($param->{verbose}) {
        exec("man $PROGNAME");

    } else {
        usage(1);

    }

} elsif ($command eq 'enable') {
    die "$help_enable\n" if !$param->{source};

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    die "$help_disable\n" if !$param->{source};

    check_target($param->{source});
    disable_job($param);

}
1162
# Print the command overview. Unless $help is true, it is preceded by a
# "no command specified" error line.
sub usage {
    my ($help) = @_;

    my @out;
    push @out, "ERROR:\tno command specified\n" if !$help;
    push @out,
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n";

    print @out;
}
1177
# Check a source/destination target string by handing it to parse_target().
#
# $target - the target string as given on the command line.
#
# Returns whatever parse_target() returns.
# NOTE(review): presumably parse_target() dies on a malformed target, which
# is what makes this a check - confirm against its definition.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1182
1183 __END__
1184
1185 =head1 NAME
1186
1187 pve-zsync - PVE ZFS Replication Manager
1188
1189 =head1 SYNOPSIS
1190
1191 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1192
1193 pve-zsync help <cmd> [OPTIONS]
1194
1195 Get help about specified command.
1196
1197 <cmd> string
1198
1199 Command name
1200
1201 -verbose boolean
1202
1203 Verbose output format.
1204
1205 pve-zsync create -dest <string> -source <string> [OPTIONS]
1206
1207 Create a sync Job
1208
1209 -dest string
1210
the destination target is like [IP:]<Pool>[/Path]
1212
1213 -limit integer
1214
1215 max sync speed in kBytes/s, default unlimited
1216
1217 -maxsnap string
1218
how many snapshots are kept before older ones get erased, default 1
1220
1221 -name string
1222
1223 name of the sync job, if not set it is default
1224
1225 -skip boolean
1226
1227 if this flag is set it will skip the first sync
1228
1229 -source string
1230
1231 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1232
1233 pve-zsync destroy -source <string> [OPTIONS]
1234
1235 remove a sync Job from the scheduler
1236
1237 -name string
1238
1239 name of the sync job, if not set it is default
1240
1241 -source string
1242
1243 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1244
1245 pve-zsync disable -source <string> [OPTIONS]
1246
1247 pause a sync job
1248
1249 -name string
1250
1251 name of the sync job, if not set it is default
1252
1253 -source string
1254
1255 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1256
1257 pve-zsync enable -source <string> [OPTIONS]
1258
enable a sync job and reset its error state
1260
1261 -name string
1262
1263 name of the sync job, if not set it is default
1264
1265 -source string
1266
the source can be a <VMID> or [IP:]<ZFSPool>[/Path]

pve-zsync list
1269
1270 Get a List of all scheduled Sync Jobs
1271
1272 pve-zsync status
1273
1274 Get the status of all scheduled Sync Jobs
1275
1276 pve-zsync sync -dest <string> -source <string> [OPTIONS]
1277
1278 will sync one time
1279
1280 -dest string
1281
1282 the destination target is like [IP:]<Pool>[/Path]
1283
1284 -limit integer
1285
1286 max sync speed in kBytes/s, default unlimited
1287
1288 -maxsnap integer
1289
how many snapshots are kept before older ones get erased, default 1
1291
1292 -name string
1293
name of the sync job, if not set it is default.
It is only necessary if the scheduler already contains this source.
1296
1297 -source string
1298
1299 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1300
1301 =head1 DESCRIPTION
1302
This tool helps you to sync your VMs or directories stored on ZFS between two servers.
It can also add jobs to cron so that the sync is done automatically.
The default sync interval is 15 minutes; if you want to change this value, you can do so in /etc/cron.d/pve-zsync.
To configure cron, see man crontab.
1307
1308 =head2 PVE ZFS Storage sync Tool
1309
This tool can pull a pool from a remote PVE host or send a pool to other ZFS machines.
1311
1312 =head1 EXAMPLES
1313
1314 add sync job from local VM to remote ZFS Server
1315 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1316
1317 =head1 IMPORTANT FILES
1318
1319 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1320
The VM config gets copied to /var/lib/pve-zsync/ on the destination machine
1322
1323 =head1 COPYRIGHT AND DISCLAIMER
1324
1325 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1326
1327 This program is free software: you can redistribute it and/or modify it
1328 under the terms of the GNU Affero General Public License as published
1329 by the Free Software Foundation, either version 3 of the License, or
1330 (at your option) any later version.
1331
1332 This program is distributed in the hope that it will be useful, but
1333 WITHOUT ANY WARRANTY; without even the implied warranty of
1334 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1335 Affero General Public License for more details.
1336
1337 You should have received a copy of the GNU Affero General Public
1338 License along with this program. If not, see
1339 <http://www.gnu.org/licenses/>.