]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
extend send_config with method local
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use Switch;
11 use JSON;
12 use IO::File;
13 use String::ShellQuote 'shell_quote';
14
# Program identity and the fixed file system locations used by this script.
my $PROGNAME    = "pve-zsync";
my $CONFIG_PATH = "/var/lib/" . $PROGNAME . "/";
my $STATE       = $CONFIG_PATH . "sync_state";
my $CRONJOBS    = "/etc/cron.d/" . $PROGNAME;
my $PATH        = "/usr/sbin/";
my $PVE_DIR     = "/etc/pve/local/";
my $QEMU_CONF   = $PVE_DIR . "qemu-server/";
my $LXC_CONF    = $PVE_DIR . "lxc/";
my $LOCKFILE    = $CONFIG_PATH . $PROGNAME . ".lock";
my $PROG_PATH   = $PATH . $PROGNAME;

# Minute step used for the schedule of newly created cron entries.
my $INTERVAL = 15;

# When true, run_cmd() dumps every command and its output.
my $DEBUG = 0;
27
# Building blocks for validating the host part of a target.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE    = "(?:(?:${IPV4OCTET}\\.){3}${IPV4OCTET})";
my $IPV6H16   = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32  = "(?:(?:${IPV4RE}|${IPV6H16}:${IPV6H16}))";

# One alternative per position at which the '::' abbreviation may appear.
my @ipv6_forms = (
    "(?:(?:(?:${IPV6H16}:){6})${IPV6LS32})",
    "(?:(?:::(?:${IPV6H16}:){5})${IPV6LS32})",
    "(?:(?:(?:${IPV6H16})?::(?:${IPV6H16}:){4})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,1}${IPV6H16})?::(?:${IPV6H16}:){3})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,2}${IPV6H16})?::(?:${IPV6H16}:){2})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,3}${IPV6H16})?::(?:${IPV6H16}:){1})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,4}${IPV6H16})?::)${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,5}${IPV6H16})?::)${IPV6H16})",
    "(?:(?:(?:(?:${IPV6H16}:){0,6}${IPV6H16})?::))",
);
my $IPV6RE = "(?:" . join("|", @ipv6_forms) . ")";

# Hostname or IPv4 address, optionally wrapped in brackets.
my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|${IPV4RE})";
my $HOSTv4RE1 = "(?:${HOSTv4RE0}|\\[${HOSTv4RE0}\\])";
# IPv6 addresses are only accepted inside brackets.
my $HOSTRE = "(?:${HOSTv4RE1}|\\[${IPV6RE}\\])";
# A target is either a VMID or 'host:zpool/path', the 'host:' part being optional.
# Capture 1 is the host (brackets included), capture 2 the VMID or dataset.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
49
# Abort at start-up when one of the required external tools is not in PATH.
foreach my $required_bin (qw(cstream zfs ssh scp)) {
    check_bin($required_bin);
}
54
# Look up an executable in the directories listed in $ENV{PATH}.
#
#   $bin  name of the command
#
# Returns the first "<dir>/<bin>" that is executable; dies when none is found.
sub check_bin {
    my ($bin) = @_;

    for my $dir (split(/:/, $ENV{PATH})) {
        my $candidate = "$dir/$bin";
        return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}
67
# Shorten a '/'-separated target so that it fits into a column of $max
# characters.
#
#   $target  string to display
#   $max     maximum width in characters
#
# Returns $target unchanged when it already fits. Otherwise the last path
# component is kept (only its tail when it is too long itself), omitted text
# is marked with '..', and the result is never wider than $max.
sub cut_target_width {
    my ($target, $max) = @_;

    return $target if length($target) <= $max;

    my @spl = split('/', $target);
    # A target consisting only of slashes yields no components at all.
    return substr($target, 0, $max) if !@spl;

    my $first = $spl[0];
    my $last = $spl[-1];
    my $last_len = length($last);

    # Only room for the "../" marker and (the tail of) the last component.
    if ($last_len + 3 >= $max) {
        my $skip = $last_len - $max + 3;
        $skip = $last_len if $skip > $last_len;
        return "../" . substr($last, $skip);
    }

    # Characters available for the first component in front of "/../".
    my $head_room = $max - $last_len - 4;
    if (length($first) > $head_room) {
        return substr($first, 0, $head_room) . "/../" . $last;
    }

    # First and last component fit completely; fill the remaining width with
    # the beginning of the middle part, leaving 3 characters for "../".
    my $rest = $max - length($first) - $last_len - 3;
    return $first . substr($target, length($first), $rest) . "../" . $last;
}
85
# Take an exclusive flock() on the given file handle; dies when that fails.
sub lock {
    my ($fh) = @_;

    my $locked = flock($fh, LOCK_EX);
    die "Can't lock config - $!\n" if !$locked;

    return $locked;
}
90
# Release the flock() held on the given file handle; dies when that fails.
sub unlock {
    my ($fh) = @_;

    my $released = flock($fh, LOCK_UN);
    die "Can't unlock config- $!\n" if !$released;

    return $released;
}
95
# Return the status hash when it holds a true 'status' entry for the given
# source and job name, undef otherwise.
#
#   $source  target hash; its 'all' entry is used as the first-level key
#   $name    job name, used as the second-level key
#   $status  nested hash { <source> => { <name> => { status => ... } } }
sub get_status {
    my ($source, $name, $status) = @_;

    my $job_status = $status->{$source->{all}}->{$name}->{status};

    return $job_status ? $status : undef;
}
105
# Run 'zfs list' for $target->{all}, through ssh when $target->{ip} is set.
#
# NOTE: despite the name, this returns 1 when the command FAILED (dataset
# missing or host unreachable) and undef when it succeeded.
sub check_pool_exists {
    my ($target) = @_;

    my @remote = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();
    my @zfs_list = ('zfs', 'list', '-H', '--', $target->{all});

    my $ok = eval { run_cmd([@remote, @zfs_list]); 1 };

    return $ok ? undef : 1;
}
121
# Split a target string '[IP:]<VMID>' or '[IP:]<ZFSPool>[/Path]' into a hash.
#
# Keys of the returned hash:
#   all        everything after the optional 'host:' prefix
#   ip         host part with surrounding brackets removed (only when given)
#   vmid       first component when it consists of digits only
#   pool       first component otherwise
#   last_part  last component (only when there is more than one)
#   path       components in between, joined by '/' (only when non-empty)
#
# Dies with a usage hint when the text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $dataset) = ($1, $2);

    my $target = { all => $dataset };

    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $dataset);
    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # NOTE(review): for remote targets one more component is dropped before
    # 'path' is built - confirm this is intended.
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts > 0;

    return $target;
}
157
# Read the cron file and return the jobs found in it (see encode_cron()).
# On first use the cron file is created empty and undef is returned.
sub read_cron {

    if (-e $CRONJOBS) {
        my $fh = IO::File->new("< $CRONJOBS");
        die "Could not open file $CRONJOBS: $!\n" if !$fh;

        my @text = <$fh>;
        close($fh);

        return encode_cron(@text);
    }

    # First use: create the file so that later readers find it.
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
177
# Parse command line style arguments into a parameter hash.
#
# Recognised options: dest, source, name, method (strings), limit, maxsnap
# (integers), verbose, skip (flags). Defaults applied afterwards:
# name "default", maxsnap 1, method "ssh".
#
# Dies with "can't parse options" when Getopt::Long reports an error.
sub parse_argv {
    my (@arg) = @_;

    my $param = { map { $_ => undef } qw(dest source verbose limit maxsnap name skip method) };

    my ($ret) = GetOptionsFromArray(
        \@arg,
        'dest=s'    => \$param->{dest},
        'source=s'  => \$param->{source},
        'verbose'   => \$param->{verbose},
        'limit=i'   => \$param->{limit},
        'maxsnap=i' => \$param->{maxsnap},
        'name=s'    => \$param->{name},
        'skip'      => \$param->{skip},
        'method=s'  => \$param->{method},
    );

    die "can't parse options\n" if $ret == 0;

    $param->{name}    ||= "default";
    $param->{maxsnap} ||= 1;
    $param->{method}  ||= "ssh";

    return $param;
}
211
# Copy the persisted state of a job (state, lsync, vm_type and the
# consecutive snap0, snap1, ... entries) from the state file into $job.
# Returns the same $job hash.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    foreach my $key (qw(state lsync vm_type)) {
        $job->{$key} = $state->{$key};
    }

    # snapN entries are copied until the first missing or false one.
    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
228
# Turn the lines of the cron file into a job configuration hash
# { <source> => { <name> => { dest, verbose, limit, maxsnap, skip, method } } }.
# Every line is split on whitespace and run through parse_argv(); lines
# without both --source and --dest contribute nothing.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    foreach my $line (@text) {
        # Processing stops at the first false line.
        last if !$line;

        my $param = parse_argv(split('\s', $line));

        next if !($param->{source} && $param->{dest});

        $cfg->{$param->{source}}->{$param->{name}} = {
            dest    => $param->{dest},
            verbose => $param->{verbose},
            limit   => $param->{limit},
            maxsnap => $param->{maxsnap},
            skip    => $param->{skip},
            method  => $param->{method},
        };
    }

    return $cfg;
}
251
# Build a job hash from parsed command line parameters.
#
# Copies source, dest, limit and maxsnap, defaults the name to "default" and
# selects the method: "local" when neither source nor destination names a
# host, "ssh" otherwise.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # Deliberately not 'my $dest = ... if ...': a 'my' combined with a
    # statement modifier has undefined behaviour in Perl.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};

    return $job;
}
270
# Read the job states from the state file and return the decoded JSON.
# On first use the config directory and a state file containing "{}" are
# created and undef is returned.
sub read_state {

    if (-e $STATE) {
        my $fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$fh;

        # Only the first line of the file is read and decoded.
        my $text = <$fh>;
        my $states = decode_json($text);

        close($fh);

        return $states;
    }

    make_path $CONFIG_PATH;
    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
292
# Persist the state of one job in the state file.
#
# When $job->{state} is "del" the job's entry is removed (and the source's
# entry too once it is empty); otherwise state, lsync, vm_type and the snapN
# entries of $job are stored. The new content is written to "<state>.new"
# and then moved over the state file.
#
# Returns the complete states hash. Dies when the new file cannot be
# opened, written, closed or moved into place.
sub update_state {
    my ($job) = @_;
    my $text;
    my $in_fh;

    # Reading the current state is best effort: when the file cannot be
    # opened we start from an empty state.
    eval {
        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        $text = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Never replace the state file with a copy that was not written completely.
    die "can't write to $STATE.new\n" if !print($out_fh $text);
    die "can't close $STATE.new: $!\n" if !close($out_fh);
    die "can't move $STATE.new: $!\n" if !move("$STATE.new", $STATE);

    # Closing the input handle also drops the lock taken above.
    close($in_fh) if $in_fh;

    return $states;
}
342
# Rewrite the cron file for one job.
#
# The line belonging to the job (matched by its --source and --name values)
# is replaced by a freshly formatted one, or dropped when $job->{state} is
# "del". A job without an existing line is appended. A SHELL/PATH header is
# prepended when none is found within the first three lines. The result is
# written to "<cronfile>.new" and moved into place.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @lines = <$fh>;

    foreach my $raw (@lines) {
        my $line = $raw;
        chomp($line);
        # Source and name are quoted: they may contain regex meta characters
        # such as '.' or the brackets of an IPv6 host.
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
    close ($fh);
}
390
# Format the cron line of a job.
#
#   $job   job hash (source, dest, name, maxsnap, method, limit, verbose, state)
#   $line  existing cron line of the job (optional); its schedule is reused
#
# A job in state "stopped" is written commented out. Without an existing
# line the job runs every $INTERVAL minutes. Returns the line including the
# trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }
    # Reuse the schedule only when the existing line really has one;
    # otherwise fall back to the default instead of using a stale $1.
    if ($line && $line =~ /^#*(.+) root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    # Without this the bandwidth limit given at creation never reaches the
    # scheduled runs.
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= "\n";

    return $text;
}
413
# Build the table printed by the 'list' command: one row per job from the
# cron file with its state, last sync time and VM type from the state file
# and the connection method from the cron file.
sub list {

    my $cfg = read_cron();
    my $states = read_state();

    my $format = "%-25s%-10s%-7s%-20s%-5s%-5s\n";
    my @rows = (sprintf($format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON"));

    foreach my $source (sort keys %{$cfg}) {
        foreach my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            push @rows, sprintf(
                $format,
                cut_target_width($source, 25),
                cut_target_width($name, 10),
                $state->{state},
                $state->{lsync},
                $state->{vm_type},
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return join('', @rows);
}
434
# Find out whether $target->{vmid} is a guest on the (possibly remote) host.
#
# Looks for "<vmid>.conf" in the qemu-server and then the lxc configuration
# directory, through ssh when $target->{ip} is set.
#
# Returns "qemu", "lxc", or undef when no config file was found or the
# target has no VMID.
sub vm_exists {
    my ($target) = @_;

    # A plain dataset target has no config file to look for.
    return undef if !defined($target->{vmid});

    # Deliberately not 'my @cmd = ... if ...': a 'my' combined with a
    # statement modifier has undefined behaviour in Perl.
    my @cmd = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();

    my $res = undef;

    eval { $res = run_cmd([@cmd, 'ls', "$QEMU_CONF$target->{vmid}.conf"]) };

    return "qemu" if $res;

    eval { $res = run_cmd([@cmd, 'ls', "$LXC_CONF$target->{vmid}.conf"]) };

    return "lxc" if $res;

    return undef;
}
452
# Implement the 'create' command: register a new sync job.
#
# Copies the ssh key to remote hosts, verifies that the destination (and a
# dataset source) exists and that a VM source is known, adds the job to the
# cron and state files and, unless --skip was given, runs the first sync.
# When that first sync fails the job is removed again and the error printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);

    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    if (my $ip = $dest->{ip}) {
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
    }

    if (my $ip = $source->{ip}) {
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
    }

    # check_pool_exists() returns true when the dataset could NOT be listed.
    die "Pool $dest->{all} does not exists\n" if check_pool_exists($dest);

    # A source that is not a VMID must be an existing dataset. The check
    # needs the whole target hash (it reads 'all' and 'ip').
    if (!$source->{vmid}) {
        die "Pool $source->{all} does not exists\n" if check_pool_exists($source);
    }

    my $vm_type = vm_exists($source);
    $job->{vm_type} = $vm_type;

    # The VMID lives in the parsed source, not in the command line parameters.
    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    update_cron($job);
    update_state($job);

    eval {
        sync($param) if !$param->{skip};
    };
    if(my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
498
# Look up the job identified by $param->{source} and $param->{name} in the
# cron file and return it with name, source and its persisted state filled
# in. Dies when the cron file has no such job.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();
    my $job = $cfg->{$param->{source}}->{$param->{name}};

    die "Job with source $param->{source} and name $param->{name} does not exist\n"
        if !$job;

    $job->{name} = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}
514
# Implement the 'destroy' command: mark the job as "del" and let the cron
# and state writers remove it.
sub destroy_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "del";

    foreach my $writer (\&update_cron, \&update_state) {
        $writer->($job);
    }
}
524
# Implement the 'sync' command: replicate a dataset, or all disks of a VM
# plus its config file, to the destination.
#
# The whole run holds an exclusive lock on $LOCKFILE. When the source and
# name belong to a scheduled job, its state is set to "syncing" during the
# run and to "ok" (with the new last sync time) or "error" afterwards.
# Errors are re-thrown after the job state has been updated.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE");
    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
    lock($lock_fh);

    my $date = get_date();

    # A one-off sync without a scheduled job is fine; $job stays undef then.
    my $job = eval { get_job($param) };

    die "Job --source $param->{source} --name $param->{name} is syncing at the moment"
        if $job && $job->{state} eq "syncing";

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # Replicates the single dataset currently described by $source:
    # find old snapshots, take a new one, send it, rotate out the oldest.
    my $sync_path = sub {
        my ($source, $dest, $job, $param, $date) = @_;

        my @snaps = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name});
        ($source->{old_snap}, $source->{last_snap}) = @snaps;

        snapshot_add($source, $dest, $param->{name}, $date);

        send_image($source, $dest, $param);

        if ($source->{destroy} && $source->{old_snap}) {
            snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap});
        }
    };

    my $vm_type = vm_exists($source);
    $source->{vm_type} = $vm_type;

    if ($job) {
        $job->{state} = "syncing";
        $job->{vm_type} ||= $vm_type;
        update_state($job);
    }

    eval {
        if (!$source->{vmid}) {
            $sync_path->($source, $dest, $job, $param, $date);
        } else {
            die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
            my $disks = get_disks($source);

            foreach my $disk (sort keys %{$disks}) {
                my $info = $disks->{$disk};

                # Point $source at the dataset of this disk.
                $source->{all} = $info->{all};
                $source->{pool} = $info->{pool};
                $source->{path} = $info->{path} if $info->{path};
                $source->{last_part} = $info->{last_part};

                $sync_path->($source, $dest, $job, $param, $date);
            }

            my $over_ssh = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
            send_config($source, $dest, $over_ssh ? 'ssh' : 'local');
        }
    };
    if (my $err = $@) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
608
# Find the replication snapshots of job $name on the source dataset.
#
# Snapshots are listed newest first. $last_snap is the newest one of this
# job, $old_snap the oldest one looked at; the scan stops after $max_snap
# matches, and only then $source->{destroy} is set to tell the caller that
# $old_snap has to be rotated out.
#
# Returns ($old_snap, $last_snap), or undef when the job has no snapshot.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name) = @_;

    # $source is reused for every disk of a VM; do not inherit the rotation
    # flag of the previously processed dataset.
    delete $source->{destroy};

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
    push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
    push @$cmd, $source->{all};

    my $raw = run_cmd($cmd);
    my $index = 0;
    my $line = "";
    my $last_snap = undef;
    my $old_snap;

    while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
        $line = $1;
        # Match exactly 'rep_<name>_<timestamp>' as created by snapshot_add():
        # the name is quoted and must be followed by the date, so job "a"
        # does not pick up the snapshots of job "ab".
        if ($line =~ m/\@(rep_\Q$name\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
            $last_snap = $1 if (!$last_snap);
            $old_snap = $1;
            $index++;
            if ($index == $max_snap) {
                $source->{destroy} = 1;
                last;
            };
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
640
# Create the snapshot "rep_<name>_<date>" on the source dataset and remember
# its name in $source->{new_snap}. When creation fails, snapshot_destroy()
# is called for that name and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date) = @_;

    my $snap_name = "rep_$name\_".$date;
    $source->{new_snap} = $snap_name;

    my @remote = $source->{ip} ? ('ssh', "root\@$source->{ip}", '--') : ();
    my $cmd = [@remote, 'zfs', 'snapshot', "$source->{all}\@$snap_name"];

    eval { run_cmd($cmd); };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name);
        die "$err\n";
    }
}
662
# Write the cron file from a configuration hash
# { <source> => { <name> => { status, vmid, source_ip, source_pool, ... } } }.
# Only entries whose status is 'ok' are written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $job = $cfg->{$source}->{$sync_name};
            next if $job->{status} ne 'ok';

            my $line = "$PROG_PATH sync -source ";
            $line .= "$job->{source_ip}:" if $job->{source_ip};
            if ($job->{vmid}) {
                $line .= "$job->{vmid} ";
            } else {
                $line .= "$job->{source_pool}";
                $line .= "$job->{source_path}" if $job->{source_path};
            }

            $line .= " -dest ";
            $line .= "$job->{dest_ip}:" if $job->{dest_ip};
            $line .= "$job->{dest_pool}";
            $line .= "$job->{dest_path}" if $job->{dest_path};

            $line .= " -name $sync_name ";
            $line .= " -limit $job->{limit}" if $job->{limit};
            $line .= " -maxsnap $job->{maxsnap}" if $job->{maxsnap};

            $text .= "$line\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
698
# Fetch the guest configuration with 'qm config' (qemu) or 'pct config'
# (lxc), through ssh when $target->{ip} is set, and return the disks found
# in it (see parse_disks()). Dies for any other vm_type.
sub get_disks {
    my ($target) = @_;

    my %config_tool = (qemu => 'qm', lxc => 'pct');

    my @remote = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();

    my $tool = $config_tool{$target->{vm_type}};
    die "VM Type unknown\n" if !$tool;

    my $res = run_cmd([@remote, $tool, 'config', $target->{vmid}]);

    return parse_disks($res, $target->{ip}, $target->{vm_type});
}
719
# Run a command through the shell and return its output (stdout and stderr
# combined) without the trailing newline.
#
#   $cmd  command string, or array ref of words; plain words are shell
#         quoted, scalar refs (e.g. \'|') are inserted verbatim
#
# Dies with the command and its output when the exit status is not 0.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper($cmd);
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper($output);
        print "END CMD\n";
    }

    return $output;
}
736
# Extract the ZFS datasets of all disks from a guest configuration.
#
#   $text     output of 'qm config' / 'pct config'
#   $ip       host the guest lives on (optional, commands then go through ssh)
#   $vm_type  'qemu' or 'lxc'
#
# Lines mentioning cdrom or none and lines that are not disk entries are
# skipped. Each disk volume is resolved with 'pvesm path'.
#
# Returns { <index> => { pool, path (optional), last_part, all } }.
# Dies when a disk line cannot be parsed or its path has an unexpected form.
sub parse_disks {
    my ($text, $ip, $vm_type) = @_;

    my $disks;
    my $num = 0;

    my @remote = $ip ? ('ssh', "root\@$ip", '--') : ();

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /cdrom|none/;
        next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

        die "disk is not on ZFS Storage\n"
            if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.+:)([A-Za-z0-9\-]+),(.*)$/;
        my ($stor, $disk) = ($2, $3);

        my $path = run_cmd([@remote, 'pvesm', 'path', "$stor$disk"]);

        my %entry;
        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
            # Device path of a zvol: /dev/zvol/<pool>[/<path>]/<disk>
            my @array = split('/', $1);
            $entry{pool} = shift(@array);
            $entry{all} = $entry{pool};
            if (0 < @array) {
                $entry{path} = join('/', @array);
                $entry{all} .= "/$entry{path}";
            }
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)\/(\w+.*)(\/$disk)$/) {
            my ($pool, $sub_path) = ($1, $2);
            $entry{pool} = $pool;
            $entry{all} = $entry{pool};
            if ($sub_path) {
                $entry{path} = $sub_path;
                $entry{all} .= "/$entry{path}";
            }
        } else {
            die "ERROR: in path\n";
        }

        $entry{last_part} = $disk;
        $entry{all} .= "/$disk";

        $disks->{$num} = \%entry;
        $num++;
    }

    return $disks;
}
798
# Destroy snapshot $snap on the source dataset and, when $dest is given, on
# the replicated dataset "<dest>/<last_part>" as well. Failures are only
# reported as warnings.
#
#   $method  the source side is reached through ssh only when this is 'ssh'
#            and $source->{ip} is set
sub snapshot_destroy {
    my ($source, $dest, $method, $snap) = @_;

    my @zfscmd = ('zfs', 'destroy');
    my $snapshot = "$source->{all}\@$snap";

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "root\@$source->{ip}", '--')
        : ();

    eval { run_cmd([@source_ssh, @zfscmd, $snapshot]); };
    warn "WARN: $@" if $@;

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
        my $path = "$dest->{all}\/$source->{last_part}";

        eval { run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]); };
        warn "WARN: $@" if $@;
    }
}
828
# Check whether the snapshot $source->{old_snap} exists on the destination
# dataset "<dest>/<last_part>".
#
# Returns 1 when 'zfs list' reports it, undef otherwise. A failing
# 'zfs list' is reported as a warning and counts as "does not exist".
sub snapshot_exist {
    my ($source ,$dest, $method) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
    push @$cmd, "$dest->{all}/$source->{last_part}\@$source->{old_snap}";

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    foreach my $line (split(/\n/, $text)) {
        # The snapshot name is data, not a pattern.
        return 1 if $line =~ m/\Q$source->{old_snap}\E$/;
    }

    return undef;
}
849
# Send the new snapshot of the source dataset to "<dest>/<last_part>" with
# 'zfs send | zfs recv -F', each side through ssh when it has an ip.
#
# The stream is incremental from $source->{last_snap} when snapshot_exist()
# succeeds, and throttled with cstream when $param->{limit} is set. On
# failure the new snapshot is destroyed on the source and the error is
# re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    # Sending side.
    my @send = ('zfs', 'send');
    push @send, '-v' if $param->{verbose};
    if ($source->{last_snap} && snapshot_exist($source ,$dest, $param->{method})) {
        push @send, '-i', "$source->{all}\@$source->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";
    unshift @send, 'ssh', "root\@$source->{ip}", '--' if $source->{ip};

    # Optional throttle; scalar refs are passed to the shell unquoted.
    my @throttle;
    if ($param->{limit}){
        my $bwl = $param->{limit}*1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    # Receiving side.
    my $target = "$dest->{all}/$source->{last_part}";
    $target =~ s!/+!/!g;

    my @recv = ('zfs', 'recv', '-F', '--', "$target");
    unshift @recv, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};

    eval {
        run_cmd([@send, @throttle, \'|', @recv])
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
        die $erro;
    };
}
885
886
# Copy the guest's config file to "<config dir>/<vmid>.conf.<new_snap>" on
# the destination and remove the copy belonging to the rotated-out snapshot.
#
#   $method  'ssh'   - copy with scp between the hosts named in source/dest
#            'local' - copy with cp on this host
#            anything else does nothing
sub send_config{
    my ($source, $dest, $method) = @_;

    my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF$source->{vmid}.conf": "$LXC_CONF$source->{vmid}.conf";
    my $dest_target_new ="$CONFIG_PATH$source->{vmid}.conf.$source->{new_snap}";

    # True when the copy was written on the remote destination host.
    my $remote_dest = 0;

    if ($method eq 'ssh'){
        if ($dest->{ip} && $source->{ip}) {
            run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $CONFIG_PATH]);
            run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($dest->{ip}) {
            run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $CONFIG_PATH]);
            run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($source->{ip}) {
            run_cmd(['mkdir', '-p', '--', $CONFIG_PATH]);
            # Brackets as in the other branches, so that IPv6 hosts work too.
            run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", $dest_target_new]);
        }
        $remote_dest = 1 if $dest->{ip};
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $CONFIG_PATH]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    } else {
        return;
    }

    # Drop the config copy of the rotated-out snapshot, wherever the copies
    # are kept, so they do not pile up in local mode either.
    if ($source->{destroy}){
        my $dest_target_old ="$CONFIG_PATH$source->{vmid}.conf.$source->{old_snap}";
        if ($remote_dest) {
            run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
        } else {
            run_cmd(['rm', '-f', '--', $dest_target_old]);
        }
    }
}
918
# Return the current local time as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # localtime fields: 0 sec, 1 min, 2 hour, 3 mday, 4 mon (0-based),
    # 5 year (offset from 1900).
    my @now = localtime(time);

    return sprintf(
        "%04d-%02d-%02d_%02d:%02d:%02d",
        $now[5] + 1900, $now[4] + 1, @now[3, 2, 1, 0],
    );
}
925
# Build the table printed by the 'status' command: source, name and state
# of every job in the cron file. Jobs without a stored state get an empty
# status column.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-15s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            $status_list .= sprintf("%-25s", cut_target_width($source, 25));
            # The name column is 15 characters wide, so cut to that width.
            $status_list .= sprintf("%-15s", cut_target_width($sync_name, 15));

            my $state = $states->{$source}->{$sync_name}->{state};
            $status_list .= (defined($state) ? $state : '') . "\n";
        }
    }

    return $status_list;
}
943
# Implement the 'enable' command: set the job's state to "ok" and rewrite
# its state and cron entries.
sub enable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "ok";

    foreach my $writer (\&update_state, \&update_cron) {
        $writer->($job);
    }
}
952
# Implement the 'disable' command: set the job's state to "stopped" and
# rewrite its state and cron entries (format_job() then comments the cron
# line out).
sub disable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "stopped";

    foreach my $writer (\&update_state, \&update_cron) {
        $writer->($job);
    }
}
961
# The first command line argument selects the sub command.
my $command = $ARGV[0];

# Set of known sub commands.
my $commands = { map { $_ => 1 } qw(destroy create sync list status help enable disable) };

# Without a known command print the usage and exit with an error.
unless ($command && $commands->{$command}) {
    usage();
    die "\n";
}
977
# Help texts of the sub commands. Each text line ends with an escaped "\n"
# followed by a real line break, which yields a blank line in the output.

my $help_sync = "$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
\twill sync one time\n
\t-dest\tstring\n
\t\tthe destination target is like [IP:]<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tinteger\n
\t\thow much snapshots will be kept before get erased, default 1\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default.
\tIt is only necessary if scheduler allready contains this source.\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_create = "$PROGNAME create -dest <string> -source <string> [OPTIONS]\n
\tCreate a sync Job\n
\t-dest\tstring\n
\t\tthe destination target is like [IP:]<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tinteger\n
\t\thow much snapshots will be kept before get erased, default 1\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-skip\tboolean\n
\t\tif this flag is set it will skip the first sync\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_destroy = "$PROGNAME destroy -source <string> [OPTIONS]\n
\tremove a sync Job from the scheduler\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_help = "$PROGNAME help <cmd> [OPTIONS]\n
\tGet help about specified command.\n
\t<cmd>\tstring\n
\t\tCommand name\n
\t-verbose\tboolean\n
\t\tVerbose output format.\n";

my $help_list = "$PROGNAME list\n
\tGet a List of all scheduled Sync Jobs\n";

my $help_status = "$PROGNAME status\n
\tGet the status of all scheduled Sync Jobs\n";

my $help_enable = "$PROGNAME enable -source <string> [OPTIONS]\n
\tenable a syncjob and reset error\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_disable = "$PROGNAME disable -source <string> [OPTIONS]\n
\tpause a syncjob\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
1040
# Print the help text of a sub command by dying with it.
#
#   $command  name of the sub command
#
# Returns nothing when the command has no help text.
sub help {
    my ($command) = @_;

    my %help_for = (
        help    => $help_help,
        sync    => $help_sync,
        destroy => $help_destroy,
        create  => $help_create,
        list    => $help_list,
        status  => $help_status,
        enable  => $help_enable,
        # 'disable' has a help text of its own.
        disable => $help_disable,
    );

    die "$help_for{$command}\n" if defined($command) && exists($help_for{$command});

    return;
}
1080
# Parse the options and run the selected sub command. $command has already
# been checked against the set of known commands.
my @arg = @ARGV;
my $param = parse_argv(@arg);

if ($command eq "destroy") {
    die "$help_destroy\n" if !$param->{source};
    check_target($param->{source});
    destroy_job($param);
} elsif ($command eq "sync") {
    die "$help_sync\n" if !$param->{source} || !$param->{dest};
    check_target($param->{source});
    check_target($param->{dest});
    sync($param);
} elsif ($command eq "create") {
    die "$help_create\n" if !$param->{source} || !$param->{dest};
    check_target($param->{source});
    check_target($param->{dest});
    init($param);
} elsif ($command eq "status") {
    print status();
} elsif ($command eq "list") {
    print list();
} elsif ($command eq "help") {
    my $help_command = $ARGV[1];
    if ($help_command && $commands->{$help_command}) {
        print help($help_command);
    }
    # The flag is undef when not given, so test it for truth instead of
    # comparing it numerically.
    if ($param->{verbose}) {
        exec("man $PROGNAME");
    } else {
        usage(1);
    }
} elsif ($command eq "enable") {
    die "$help_enable\n" if !$param->{source};
    check_target($param->{source});
    enable_job($param);
} elsif ($command eq "disable") {
    die "$help_disable\n" if !$param->{source};
    check_target($param->{source});
    disable_job($param);
}
1139
# Print the command overview. Without a true $help argument the overview is
# preceded by an error line saying that no command was specified.
sub usage {
    my ($help) = @_;

    print("ERROR:\tno command specified\n") if !$help;

    my @overview = (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    foreach my $usage_line (@overview) {
        print($usage_line);
    }
}
1154
# Validate a target string; dies (inside parse_target()) when it is not a
# valid '[IP:]<VMID>' or '[IP:]<ZFSPool>[/Path]'.
sub check_target {
    my ($target) = @_;

    return parse_target($target);
}
1159
1160 __END__
1161
1162 =head1 NAME
1163
1164 pve-zsync - PVE ZFS Replication Manager
1165
1166 =head1 SYNOPSIS
1167
1168 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1169
1170 pve-zsync help <cmd> [OPTIONS]
1171
1172 Get help about specified command.
1173
1174 <cmd> string
1175
1176 Command name
1177
1178 -verbose boolean
1179
1180 Verbose output format.
1181
1182 pve-zsync create -dest <string> -source <string> [OPTIONS]
1183
1184 Create a sync Job
1185
1186 -dest string
1187
the destination target is like [IP:]<Pool>[/Path]
1189
1190 -limit integer
1191
1192 max sync speed in kBytes/s, default unlimited
1193
-maxsnap integer

how many snapshots are kept before the oldest one gets erased, default 1
1197
1198 -name string
1199
1200 name of the sync job, if not set it is default
1201
1202 -skip boolean
1203
1204 if this flag is set it will skip the first sync
1205
1206 -source string
1207
1208 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1209
1210 pve-zsync destroy -source <string> [OPTIONS]
1211
1212 remove a sync Job from the scheduler
1213
1214 -name string
1215
1216 name of the sync job, if not set it is default
1217
1218 -source string
1219
1220 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1221
1222 pve-zsync disable -source <string> [OPTIONS]
1223
1224 pause a sync job
1225
1226 -name string
1227
1228 name of the sync job, if not set it is default
1229
1230 -source string
1231
1232 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1233
1234 pve-zsync enable -source <string> [OPTIONS]
1235
1236 enable a syncjob and reset error
1237
1238 -name string
1239
1240 name of the sync job, if not set it is default
1241
1242 -source string
1243
the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

pve-zsync list
1246
1247 Get a List of all scheduled Sync Jobs
1248
1249 pve-zsync status
1250
1251 Get the status of all scheduled Sync Jobs
1252
1253 pve-zsync sync -dest <string> -source <string> [OPTIONS]
1254
1255 will sync one time
1256
1257 -dest string
1258
1259 the destination target is like [IP:]<Pool>[/Path]
1260
1261 -limit integer
1262
1263 max sync speed in kBytes/s, default unlimited
1264
1265 -maxsnap integer
1266
how many snapshots are kept before the oldest one gets erased, default 1
1268
1269 -name string
1270
name of the sync job, if not set it is default.
It is only necessary if the scheduler already contains this source.
1273
1274 -source string
1275
1276 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1277
1278 =head1 DESCRIPTION
1279
This tool helps you to sync your VMs or directories which are stored on ZFS between two servers.
It can also add jobs to cron so that the sync is done automatically.
The default sync interval is 15 minutes; if you want to change this value, you can do so in /etc/cron.d/pve-zsync.
To configure cron see man crontab.
1284
1285 =head2 PVE ZFS Storage sync Tool
1286
This tool can fetch a remote pool from another PVE host or send a pool to other ZFS machines.
1288
1289 =head1 EXAMPLES
1290
1291 add sync job from local VM to remote ZFS Server
1292 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1293
1294 =head1 IMPORTANT FILES
1295
1296 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1297
1298 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1299
1300 =head1 COPYRIGHT AND DISCLAIMER
1301
1302 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1303
1304 This program is free software: you can redistribute it and/or modify it
1305 under the terms of the GNU Affero General Public License as published
1306 by the Free Software Foundation, either version 3 of the License, or
1307 (at your option) any later version.
1308
1309 This program is distributed in the hope that it will be useful, but
1310 WITHOUT ANY WARRANTY; without even the implied warranty of
1311 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1312 Affero General Public License for more details.
1313
1314 You should have received a copy of the GNU Affero General Public
1315 License along with this program. If not, see
1316 <http://www.gnu.org/licenses/>.