]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
Fix: You can now use the pool as replication source.
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use JSON;
11 use IO::File;
12 use String::ShellQuote 'shell_quote';
13
# Program identity and the fixed locations every other path is derived from.
my $PROGNAME = "pve-zsync";
my $PATH = "/usr/sbin";
my $PVE_DIR = "/etc/pve/local";

# Persistent job data: the JSON state file and the lock file live here.
my $CONFIG_PATH = "/var/lib/" . $PROGNAME;
my $STATE = $CONFIG_PATH . "/sync_state";
my $LOCKFILE = $CONFIG_PATH . "/" . $PROGNAME . ".lock";

# Cron file that holds one line per scheduled sync job.
my $CRONJOBS = "/etc/cron.d/" . $PROGNAME;

# Guest configuration directories (one per guest type).
my $QEMU_CONF = $PVE_DIR . "/qemu-server";
my $LXC_CONF = $PVE_DIR . "/lxc";

my $PROG_PATH = $PATH . "/" . $PROGNAME;

# Minutes between two runs of a newly created cron job.
my $INTERVAL = 15;

# Set to a true value to dump every executed command and its output.
my $DEBUG = 0;
26
# Regular-expression building blocks used to validate sync targets.
# They are plain strings (note the doubled backslashes) that get interpolated
# into larger patterns; only $TARGETRE at the end is a compiled qr// object.
#
# $IPV4OCTET - one decimal octet, 0..255
# $IPV4RE    - four octets separated by dots
# $IPV6H16   - one group of 1 to 4 hex digits
# $IPV6LS32  - the trailing 32 bits of an IPv6 address: either an embedded
#              IPv4 address or two hex groups separated by ':'
27 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
28 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
29 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
30 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
31
# $IPV6RE is an alternation with one branch per possible position of the
# '::' abbreviation (none, leading, or after 1..7 groups).
32 my $IPV6RE = "(?:" .
33 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
34 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
35 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
36 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
37 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
38 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
39 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
40 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
41 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
42
43 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
44 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
45 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
46 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Capture groups: $1 = host (optional, brackets included when given),
# $2 = the VMID or the complete 'pool[/path]' string, $3 = the '/path' suffix.
47 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
48
# Fail early if one of the external tools used by the sync pipeline is missing.
check_bin('cstream');
check_bin('zfs');
check_bin('ssh');
check_bin('scp');

# Turn termination signals into an exception so that the eval blocks around a
# running sync get the chance to record the error state and clean up.
# SIGKILL is deliberately not listed: it cannot be trapped, so installing a
# handler for it has no effect.
$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{INT} = sub {
    die "Signal aborting sync\n";
};
58
# Search the directories listed in $ENV{PATH} for an executable named $bin.
#
# Returns the full path of the first executable match.
# Dies with "unable to find command '<bin>'" when no directory contains one.
sub check_bin {
    my ($bin) = @_;

    for my $dir (split(/:/, $ENV{PATH})) {
        my $candidate = "$dir/$bin";
        return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}
71
# Shorten $path so that it fits into a column of $maxlen characters, marking
# the removed part with '..'.
#
# Repeated slashes are collapsed first. Strings that already fit are returned
# unchanged. Otherwise the last path component is preferred, then the first
# one, and whatever room is left is given to the middle of the path.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s!/+!/!g;

    if (length($path) <= $maxlen) {
        return $path;
    }

    # No directory structure at all: keep the end of the string.
    if ($path !~ m!/!) {
        return '..' . substr($path, -$maxlen + 2);
    }

    # Split off the last component (with an optional trailing slash).
    $path =~ s!/([^/]+/?)$!!;
    my $last = $1;
    my $last_len = length($last);

    if ($last_len + 3 == $maxlen) {
        return "../$last";
    }
    if ($last_len + 2 >= $maxlen) {
        return '..' . substr($last, -$maxlen + 2);
    }

    # Split off the first '/component' as well; $path keeps what is in between.
    $path =~ s!(/[^/]+)(?:/|$)!!;
    my $first = $1;
    my $room = $maxlen - (length($first) + $last_len) - 4; # -4 for "/../"

    if ($room < 0) {
        my $keep = $maxlen - $last_len - 3; # -3 for "../"
        return substr($first, 0, $keep) . "../$last";
    }

    # Replace the centre of the remaining middle part with '..'.
    substr($path, ($room / 2), (length($path) - $room), '..');
    return "$first/" . $path . "/$last";
}
101
# Take an exclusive flock on the given file handle, blocking until it is
# available. Dies with the system error text when the lock cannot be taken.
sub lock {
    my ($fh) = @_;

    my $locked = flock($fh, LOCK_EX);
    die "Can't lock config - $!\n" if !$locked;

    return $locked;
}
106
# Release a lock previously taken with flock on the given file handle.
# Dies with the system error text when the lock cannot be released.
sub unlock {
    my ($fh) = @_;

    my $released = flock($fh, LOCK_UN);
    die "Can't unlock config- $!\n" if !$released;

    return $released;
}
111
# Return the complete $status structure if it holds a true 'status' value for
# the job identified by $source->{all} and $name, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    my $job_status = $status->{$source->{all}}->{$name}->{status};

    return $job_status ? $status : undef;
}
121
# Check whether the dataset $target->{all} exists by running 'zfs list' on it,
# through ssh as root when the target carries an {ip}.
#
# Returns 1 when the command succeeds and 0 when it fails for any reason.
sub check_pool_exists {
    my ($target) = @_;

    my @ssh = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'list', '-H', '--', $target->{all}];

    eval { run_cmd($cmd); };

    return $@ ? 0 : 1;
}
140
# Parse a target given as '[host:]<VMID>' or '[host:]<pool>[/path]'.
#
# Returns a hash reference with these keys:
#   all       - everything after the optional 'host:' prefix
#   ip        - the host part with surrounding brackets removed (only if given)
#   vmid      - set instead of {pool} when the first component is numeric
#   pool      - the first path component
#   last_part - the last path component (only if there is more than the pool)
#   path      - the components between pool and last_part, joined with '/'
#
# Dies with a usage hint when $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    # Copy the captures right away so that later matches cannot clobber them.
    my ($host, $spec) = $text =~ $TARGETRE
        or die "$errstr\n";

    my $target = { all => $spec };

    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $spec);

    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if (@parts == 0);

    $target->{last_part} = pop(@parts);

    # The host is not part of @parts, so the same components make up the path
    # for local and remote targets. The previous code dropped one more
    # component whenever a host was given, which made {path} wrong for
    # remote targets.
    if (@parts > 0) {
        $target->{path} = join('/', @parts);
    }

    return $target;
}
176
# Load the job definitions from the cron file.
#
# When the cron file is missing it is created empty and undef is returned.
# Otherwise all lines are handed to encode_cron() and its result is returned.
sub read_cron {

    if (-e $CRONJOBS) {
        my $fh = IO::File->new("< $CRONJOBS");
        die "Could not open file $CRONJOBS: $!\n" if !$fh;

        my @lines = <$fh>;
        close($fh);

        return encode_cron(@lines);
    }

    # First use: create the file so later readers and writers can open it.
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
196
# Extract the known command line options from the given argument list.
#
# Returns a hash reference with the keys dest, source, verbose, limit,
# maxsnap, name, skip and method. Options that were not given stay undef,
# except for name ("default"), maxsnap (1) and method ("ssh").
# Dies when Getopt::Long reports a parse error.
sub parse_argv {
    my (@arg) = @_;

    my $param = {};
    my @getopt_args;

    # Option specs in Getopt::Long notation; the part before '=' is the key.
    for my $spec (qw(dest=s source=s verbose limit=i maxsnap=i name=s skip method=s)) {
        (my $key = $spec) =~ s/=.*$//;
        $param->{$key} = undef;
        push @getopt_args, $spec, \$param->{$key};
    }

    my $parsed = GetOptionsFromArray(\@arg, @getopt_args);
    die "can't parse options\n" if !$parsed;

    $param->{name} = "default" if !$param->{name};
    $param->{maxsnap} = 1 if !$param->{maxsnap};
    $param->{method} = "ssh" if !$param->{method};

    return $param;
}
230
# Copy the persisted state of a job (state, lsync, vm_type and the numbered
# snap0, snap1, ... entries) from the state file into $job.
#
# Returns the same $job reference that was passed in.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    for my $key (qw(state lsync vm_type)) {
        $job->{$key} = $saved->{$key};
    }

    # Snapshot entries are numbered without gaps; stop at the first missing one.
    my $i = 0;
    while ($saved->{"snap$i"}) {
        $job->{"snap$i"} = $saved->{"snap$i"};
        $i++;
    }

    return $job;
}
247
# Turn the lines of the cron file into a job configuration.
#
# Every line is split on whitespace and parsed like a command line. Lines
# that yield both a source and a destination become an entry
# $cfg->{<source>}->{<name>} holding dest, verbose, limit, maxsnap, skip and
# method. Returns the $cfg hash reference.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};
    my @job_keys = qw(dest verbose limit maxsnap skip method);

    while (my $line = shift(@text)) {

        my @words = split('\s', $line);
        my $param = parse_argv(@words);

        next if !($param->{source} && $param->{dest});

        my $entry = $cfg->{$param->{source}}->{$param->{name}} ||= {};
        @{$entry}{@job_keys} = @{$param}{@job_keys};
    }

    return $cfg;
}
270
# Build a job description from parsed command line parameters.
#
# The job gets name, source, limit, the optional dest and maxsnap, and a
# method: "local" when neither source nor destination names a host, "ssh"
# otherwise. Dies (through parse_target) when a target is malformed.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});

    # A plain conditional expression instead of 'my $dest = ... if ...':
    # the behaviour of a 'my' declaration with a statement modifier is
    # undefined in Perl.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : undef;
    my $dest_ip = $dest ? $dest->{ip} : undef;

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest_ip && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};

    return $job;
}
289
# Load the job states from the JSON state file.
#
# When the state file is missing, the config directory and a state file
# containing an empty JSON object are created and undef is returned.
# Otherwise the first line of the file is decoded and returned.
sub read_state {

    if (-e $STATE) {
        my $fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$fh;

        my $json = <$fh>;
        my $states = decode_json($json);

        close($fh);

        return $states;
    }

    # First use: initialise the state file.
    make_path $CONFIG_PATH;
    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
311
# Persist the state of $job in the JSON state file.
#
# A job whose state is "del" is removed from the file (together with its
# source entry once that becomes empty). For any other state the values
# state, lsync, vm_type and the numbered snapN entries are stored.
# The new content is written to "<state file>.new" and then moved over the
# old file. Returns the complete states structure that was written.
#
# Dies when the new file cannot be opened, written, closed or moved into
# place, so that a failed update no longer goes unnoticed.
sub update_state {
    my ($job) = @_;

    my $text;
    my $in_fh;

    # Best effort: when the current state file cannot be opened or locked we
    # continue with an empty state instead of failing.
    eval {
        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        $text = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text) {
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"}; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Buffered write errors may only show up at close, so check both.
    die "can't write to $STATE.new\n" if !print {$out_fh} $text;
    die "can't close $STATE.new: $!\n" if !close($out_fh);

    die "can't move $STATE.new: $!\n" if !move("$STATE.new", $STATE);

    # Closing the old handle releases the lock taken above (if any).
    close($in_fh) if $in_fh;

    return $states;
}
361
# Rewrite the cron file so that it reflects $job.
#
# The line belonging to the job (same source and name) is replaced by a
# freshly formatted one, or dropped when the job state is "del". A job
# without an existing line is appended. The SHELL/PATH header is added when
# none of the first three lines provides one. The result is written to
# "<cron file>.new" and then moved over the old file.
#
# Dies when the cron file cannot be opened or the new file cannot be written
# or moved into place.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @lines = <$fh>;

    # Source and name are literal text, not patterns: a source such as
    # '192.168.1.1:tank' or '[::1]:tank' contains regex metacharacters and
    # must be quoted to match only its own line.
    my $job_line_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    while (my $line = shift(@lines)) {
        chomp($line);
        if ($line =~ $job_line_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/)) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }

    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");

    # Closing the original handle releases the lock taken above.
    close($fh);
}
409
# Render the cron line for $job.
#
# When an existing cron $line is passed, its schedule (everything before
# " root", without leading '#') is reused; otherwise the job runs every
# $INTERVAL minutes. Jobs in state "stopped" are written commented out.
# Returns the line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;

    my $entry = ($job->{state} eq "stopped") ? "#" : "";

    if ($line) {
        $line =~ /^#*(.+) root/;
        $entry .= $1;
    } else {
        $entry .= "*/$INTERVAL * * * *";
    }

    $entry .= " root"
        . " $PROGNAME sync --source $job->{source} --dest $job->{dest}"
        . " --name $job->{name} --maxsnap $job->{maxsnap}";

    if ($job->{limit}) {
        $entry .= " --limit $job->{limit}";
    }

    $entry .= " --method $job->{method}";

    if ($job->{verbose}) {
        $entry .= " --verbose";
    }

    return $entry . "\n";
}
433
# Build the table printed by the 'list' command: one row per job from the
# cron file with its source, name, state, last sync time, guest type and
# connection method. Returns the table as a single string.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $table = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();

    for my $source (sort keys %{$cfg}) {
        for my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            my $type = defined($state->{vm_type}) ? $state->{vm_type} : "undef";

            $table .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $type,
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $table;
}
454
# Determine the type of the guest $target->{vmid} by looking for its config
# file, through ssh as root when the target carries an {ip}.
#
# Returns "qemu" or "lxc" depending on which config directory contains
# '<vmid>.conf', and undef when the target has no vmid or no config is found.
sub vm_exists {
    my ($target) = @_;

    return undef if !defined($target->{vmid});

    # Built with a conditional expression rather than 'my @cmd = ... if ...',
    # whose behaviour is undefined in Perl.
    my @ssh = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();

    for my $probe (['qemu', $QEMU_CONF], ['lxc', $LXC_CONF]) {
        my ($type, $conf_dir) = @$probe;

        # A failing 'ls' just means "not this type"; try the next one.
        my $res = eval { run_cmd([@ssh, 'ls', "$conf_dir/$target->{vmid}.conf"]) };

        return $type if $res;
    }

    return undef;
}
474
# Implement the 'create' command: validate the request, register the job in
# the cron and state files and, unless --skip was given, run a first sync.
#
# Dies when the destination or source pool is missing, the guest does not
# exist or a job with the same source and name is already configured.
# When the first sync fails the job is removed again and the error is
# printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);
    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    # Install our public key on every remote side, destination first.
    for my $ip ($dest->{ip}, $source->{ip}) {
        next if !$ip;
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
    }

    die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest);

    if (!defined($source->{vmid}) && !check_pool_exists($source)) {
        die "Pool $source->{all} does not exists\n";
    }

    my $vm_type = vm_exists($source);
    $job->{vm_type} = $vm_type;
    $source->{vm_type} = $vm_type;

    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    # get_disks dies when the guest has no disk on zfs.
    get_disks($source) if $source->{vmid};

    update_cron($job);
    update_state($job);

    eval {
        sync($param) if !$param->{skip};
    };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
524
# Look up the job identified by $param->{source} and $param->{name} in the
# cron file and enrich it with its name, source and persisted state.
#
# Returns the job hash reference; dies when no such job is configured.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = $cfg->{$param->{source}}->{$param->{name}};
    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }

    $job->{name} = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}
540
# Remove a job from the scheduler: mark it as "del" and let the cron and
# state writers drop its entries. Dies (through get_job) when the job does
# not exist.
sub destroy_job {
    my ($param) = @_;

    my $doomed = get_job($param);
    $doomed->{state} = "del";

    update_cron($doomed);
    update_state($doomed);
}
550
# Run one synchronisation from $param->{source} to $param->{dest}.
#
# The whole run is serialised through an exclusive lock on the lock file.
# For a VMID source every zfs disk of the guest is sent and afterwards the
# guest config is copied; for a dataset source only that dataset is sent.
# When a matching job is configured its state is tracked in the state file
# ("syncing" while running, then "ok" with the sync time, or "error").
#
# Dies when the job is already syncing or when any step of the sync fails.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE");
    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
    lock($lock_fh);

    my $date = get_date();

    # get_job dies for an unknown job; a sync without a configured job is
    # allowed, it is just not tracked in the state file.
    my $job;
    eval {
        $job = get_job($param);
    };

    if ($job && $job->{state} eq "syncing") {
        die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
    }

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # Snapshot one dataset, send it and expire the oldest snapshot if needed.
    my $sync_path = sub {
        my ($src, $dst, $job, $opts, $stamp) = @_;

        ($src->{old_snap}, $src->{last_snap}) = snapshot_get($src, $dst, $opts->{maxsnap}, $opts->{name});

        snapshot_add($src, $dst, $opts->{name}, $stamp);

        send_image($src, $dst, $opts);

        if ($src->{destroy} && $src->{old_snap}) {
            snapshot_destroy($src, $dst, $opts->{method}, $src->{old_snap});
        }
    };

    my $vm_type = vm_exists($source);
    $source->{vm_type} = $vm_type;

    if ($job) {
        $job->{state} = "syncing";
        $job->{vm_type} = $vm_type if !$job->{vm_type};
        update_state($job);
    }

    eval {
        if ($source->{vmid}) {
            die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
            my $disks = get_disks($source);

            for my $disk (sort keys %{$disks}) {
                my $info = $disks->{$disk};

                $source->{all} = $info->{all};
                $source->{pool} = $info->{pool};
                $source->{path} = $info->{path} if $info->{path};
                $source->{last_part} = $info->{last_part};

                $sync_path->($source, $dest, $job, $param, $date);
            }

            my $over_ssh = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
            send_config($source, $dest, $over_ssh ? 'ssh' : 'local');
        } else {
            $sync_path->($source, $dest, $job, $param, $date);
        }
    };
    if (my $err = $@) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
634
# Find the replication snapshots of job $name on the source dataset.
#
# Snapshots are listed newest first. Returns the list ($old_snap, $last_snap)
# where $last_snap is the newest 'rep_<name>_<date>' snapshot and $old_snap
# the $max_snap-th newest one (or the oldest found when there are fewer).
# $source->{destroy} is set once $max_snap snapshots have been counted.
# Returns undef when no replication snapshot exists.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name) = @_;

    my @ssh = $source->{ip} ? ('ssh', "root\@$source->{ip}", '--') : ();
    my $cmd = [
        @ssh,
        'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation',
        $source->{all},
    ];

    my $raw = run_cmd($cmd);

    my $found = 0;
    my $last_snap = undef;
    my $old_snap;

    my @lines = $raw ? split(/\n/, $raw) : ();

    for my $line (@lines) {
        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;
        my $snap = $1;

        $last_snap = $snap if (!$last_snap);
        $old_snap = $snap;
        $found++;

        if ($found == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
667
# Create the snapshot 'rep_<name>_<date>' on the source dataset and remember
# its name in $source->{new_snap}.
#
# When 'zfs snapshot' fails, snapshot_destroy is called for the new snapshot
# name and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date) = @_;

    my $snap_name = "rep_${name}_" . $date;
    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "root\@$source->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"];

    eval {
        run_cmd($cmd);
    };
    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name);
        die "$err\n";
    }
}
689
# Write a complete cron file from a configuration structure of the form
# $cfg->{<source>}->{<name>}. Only entries whose {status} is 'ok' produce a
# line. Dies when the cron file cannot be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    for my $source (sort keys %{$cfg}) {
        for my $sync_name (sort keys %{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name};

            next if $entry->{status} ne 'ok';

            $text .= "$PROG_PATH sync";

            # Source: optional host prefix, then either the VMID or pool+path.
            $text .= " -source ";
            $text .= "$entry->{source_ip}:" if $entry->{source_ip};
            if ($entry->{vmid}) {
                $text .= "$entry->{vmid} ";
            } else {
                $text .= "$entry->{source_pool}";
                $text .= "$entry->{source_path}" if $entry->{source_path};
            }

            # Destination: optional host prefix, then pool+path.
            $text .= " -dest ";
            $text .= "$entry->{dest_ip}:" if $entry->{dest_ip};
            $text .= "$entry->{dest_pool}";
            $text .= "$entry->{dest_path}" if $entry->{dest_path};

            $text .= " -name $sync_name ";
            $text .= " -limit $entry->{limit}" if $entry->{limit};
            $text .= " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};
            $text .= "\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
725
# Fetch the configuration of guest $target->{vmid} with 'qm config' (qemu)
# or 'pct config' (lxc), through ssh as root when the target carries an {ip},
# and return the disks parse_disks() extracts from it.
#
# Dies with "VM Type unknown" for any other $target->{vm_type}.
sub get_disks {
    my ($target) = @_;

    my %config_tool = (qemu => 'qm', lxc => 'pct');

    my @ssh = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();

    my $tool;
    if ($target->{vm_type} eq 'qemu') {
        $tool = $config_tool{qemu};
    } elsif ($target->{vm_type} eq 'lxc') {
        $tool = $config_tool{lxc};
    } else {
        die "VM Type unknown\n";
    }

    my $config = run_cmd([@ssh, $tool, 'config', $target->{vmid}]);

    return parse_disks($config, $target->{ip}, $target->{vm_type});
}
746
# Run a command through the shell and return its combined stdout/stderr
# output without the trailing newline.
#
# $cmd is either a ready-made command string or an array reference. In the
# array form every plain element is shell-quoted, while scalar references
# (such as \'|') are inserted verbatim so that pipelines can be built.
# Dies with the command line and its output when the exit status is non-zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
763
# Extract the zfs-backed disks from the output of 'qm config'/'pct config'.
#
# $text is the config text, $ip the optional host the guest lives on and
# $vm_type either 'qemu' or 'lxc'. Each accepted disk is resolved with
# 'pvesm path' and stored under a running number as a hash with the keys
# pool, path (optional), last_part (the volume name) and all (the full
# dataset name).
#
# Skipped are cdrom drives, qemu disks with backup switched off, lxc mount
# points without backup switched on, and lines without a storage:volume
# entry. Dies when a resolved path has an unexpected layout or when no disk
# on zfs is found at all. Returns the hash reference of disks.
sub parse_disks {
    my ($text, $ip, $vm_type) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

        # QEMU: disks are synced unless backup is explicitly switched off.
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: mount points are only synced when backup is explicitly switched
        # on. The index may have several digits (mp10, mp11, ...), matching
        # the '\d+' accepted by the line filter above.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if ($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
            my @parameter = split(/,/, $1);

            # The first option of the form 'storage:volume' names the disk.
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/) {
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            # Everything between '/dev/zvol/' and the volume name is
            # 'pool[/path]'.
            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
843
# Destroy snapshot $snap on the source dataset and, when $dest is given, on
# the corresponding destination dataset as well.
#
# The source side is reached over ssh only when the source has an {ip} and
# $method is 'ssh'. Failures on either side are reported as warnings and do
# not abort.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my $use_ssh = $source->{ip} && $method eq 'ssh';
    my @source_ssh = $use_ssh ? ('ssh', "root\@$source->{ip}", '--') : ();

    eval {
        run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();

        # On the destination the dataset lives below the destination path.
        my $path = "$dest->{all}";
        if ($source->{last_part}) {
            $path .= "/$source->{last_part}";
        }

        eval {
            run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]);
        };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
874
# Check whether the snapshot $source->{old_snap} exists on the destination
# dataset (the destination path plus the source's last path component).
#
# Returns 1 when 'zfs list' reports the snapshot and undef otherwise. A
# failing 'zfs list' is reported as a warning and treated as "does not
# exist".
sub snapshot_exist {
    my ($source , $dest, $method) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};
    $path .= "\@$source->{old_snap}";

    push @$cmd, $path;

    my $text = "";
    eval { $text = run_cmd($cmd); };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        # The snapshot name contains the user-chosen job name, so it has to
        # be matched literally and not as a pattern.
        return 1 if $line =~ m/^.*\Q$source->{old_snap}\E$/;
    }

    # Explicit result for "not found" instead of whatever the loop left over.
    return undef;
}
901
# Transfer the snapshot $source->{new_snap} to the destination as one shell
# pipeline: 'zfs send' [| 'cstream' rate limit] | 'zfs recv -F'.
#
# The send is incremental against $source->{last_snap} when that snapshot
# also exists on the destination. $param->{limit} is multiplied by 1024
# before it is handed to cstream. Either end of the pipeline runs over ssh
# when the respective side has an {ip}.
#
# On failure the new snapshot is destroyed on the source and the error is
# re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @source_ssh = $source->{ip}
        ? ('ssh', '-o', 'BatchMode=yes', "root\@$source->{ip}", '--')
        : ();
    my @dest_ssh = $dest->{ip}
        ? ('ssh', '-o', 'BatchMode=yes', "root\@$dest->{ip}", '--')
        : ();

    # Sending side.
    my @pipeline = (@source_ssh, 'zfs', 'send');
    push @pipeline, '-v' if $param->{verbose};

    if ($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
        push @pipeline, '-i', "$source->{all}\@$source->{last_snap}";
    }
    push @pipeline, '--', "$source->{all}\@$source->{new_snap}";

    # Optional bandwidth limit; scalar references are passed to the shell
    # unquoted by run_cmd and act as pipe symbols here.
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        push @pipeline, \'|', 'cstream', '-t', $bwl;
    }

    # Receiving side.
    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    push @pipeline, \'|', @dest_ssh, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd(\@pipeline)
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
        die $erro;
    };
}
938
939
# Copy the guest configuration file next to the replicated data.
#
# The copy is stored as '<vmid>.conf.<vm_type>.<new_snap>' in the config
# directory (extended by the destination's last path component if it has
# one). With method 'ssh' the file is transferred with scp between whichever
# sides are remote, and the copy belonging to $source->{old_snap} is removed
# when $source->{destroy} is set. With method 'local' the file is copied
# with cp.
sub send_config{
    my ($source, $dest, $method) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;

    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh') {
        my @dest_ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            my $scp_from = $source->{ip} ? "root\@[$source->{ip}]:$source_target" : $source_target;
            my $scp_to = $dest->{ip} ? "root\@[$dest->{ip}]:$dest_target_new" : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $scp_from, $scp_to]);
        }

        if ($source->{destroy}) {
            my $dest_target_old = "${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
975
# Return the current local time as 'YYYY-MM-DD_hh:mm:ss', the format used in
# snapshot names and for the last sync time.
sub get_date {
    my @now = localtime(time);

    # localtime fields: 0 sec, 1 min, 2 hour, 3 day of month,
    # 4 month (0-based), 5 years since 1900.
    return sprintf(
        "%04d-%02d-%02d_%02d:%02d:%02d",
        $now[5] + 1900, $now[4] + 1, $now[3],
        $now[2], $now[1], $now[0],
    );
}
982
# Build the table printed by the 'status' command: one row per job from the
# cron file with its source, name and the state recorded in the state file.
# Returns the table as a single string.
sub status {
    my $cfg = read_cron();

    my $table = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    for my $source (sort keys %{$cfg}) {
        for my $sync_name (sort keys %{$cfg->{$source}}) {
            $table .= sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $table .= "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $table;
}
1000
# Re-activate a job: set its state back to "ok" (which also clears an error
# state) and rewrite its cron line. Dies (through get_job) when the job does
# not exist.
sub enable_job {
    my ($param) = @_;

    my $active = get_job($param);
    $active->{state} = "ok";

    update_state($active);
    update_cron($active);
}
1009
# Pause a job: set its state to "stopped" and rewrite its cron line, which
# format_job then emits commented out. Dies (through get_job) when the job
# does not exist.
sub disable_job {
    my ($param) = @_;

    my $paused = get_job($param);
    $paused->{state} = "stopped";

    update_state($paused);
    update_cron($paused);
}
1018
# The first command line argument selects the sub-command.
my $command = $ARGV[0];

# Lookup table of all sub-commands the program understands.
my $commands = {
    map { $_ => 1 } qw(destroy create sync list status help enable disable)
};

# Without a known sub-command there is nothing to do: show the usage and exit
# with an error.
unless ($command && $commands->{$command}) {
    usage();
    die "\n";
}
1034
# Help text of the 'sync' command. It is printed by help() for 'sync' and
# when 'sync' is called without both a source and a destination.
# NOTE(review): the heredoc interpolates, so the literal "\n" at the end of
# the first line adds an extra blank line to the output - confirm that this
# is intended.
1035 my $help_sync = <<EOF;
1036 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1037
1038 will sync one time
1039
1040 -dest string
1041
1042 the destination target is like [IP:]<Pool>[/Path]
1043
1044 -limit integer
1045
1046 max sync speed in kBytes/s, default unlimited
1047
1048 -maxsnap integer
1049
1050 how much snapshots will be kept before get erased, default 1
1051
1052 -name string
1053
1054 name of the sync job, if not set it is default.
1055 It is only necessary if scheduler allready contains this source.
1056
1057 -source string
1058
1059 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1060
1061 -verbose boolean
1062
1063 print out the sync progress.
1064 EOF
1065
# Help text of the 'create' command. It is printed by help() for 'create'
# and when 'create' is called without both a source and a destination.
# NOTE(review): the -dest description reads "[IP]:<Pool>" while the sync help
# reads "[IP:]<Pool>", and -maxsnap is listed as "string" although the option
# is parsed as an integer ('maxsnap=i') - both look like typos in the text.
1066 my $help_create = <<EOF;
1067 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1068
1069 Create a sync Job
1070
1071 -dest string
1072
1073 the destination target is like [IP]:<Pool>[/Path]
1074
1075 -limit integer
1076
1077 max sync speed in kBytes/s, default unlimited
1078
1079 -maxsnap string
1080
1081 how much snapshots will be kept before get erased, default 1
1082
1083 -name string
1084
1085 name of the sync job, if not set it is default
1086
1087 -skip boolean
1088
1089 if this flag is set it will skip the first sync
1090
1091 -source string
1092
1093 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1094 EOF
1095
# Help text of the 'destroy' command. It is printed by help() for 'destroy'
# and when 'destroy' is called without a source.
1096 my $help_destroy = <<EOF;
1097 $PROGNAME destroy -source <string> [OPTIONS]
1098
1099 remove a sync Job from the scheduler
1100
1101 -name string
1102
1103 name of the sync job, if not set it is default
1104
1105 -source string
1106
1107 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1108 EOF
1109
# Help text of the 'help' command itself; printed by help() for 'help'.
1110 my $help_help = <<EOF;
1111 $PROGNAME help <cmd> [OPTIONS]
1112
1113 Get help about specified command.
1114
1115 <cmd> string
1116
1117 Command name
1118
1119 -verbose boolean
1120
1121 Verbose output format.
1122 EOF
1123
# Help text of the 'list' command; printed by help() for 'list'.
1124 my $help_list = <<EOF;
1125 $PROGNAME list
1126
1127 Get a List of all scheduled Sync Jobs
1128 EOF
1129
# Help text of the 'status' command; printed by help() for 'status'.
1130 my $help_status = <<EOF;
1131 $PROGNAME status
1132
1133 Get the status of all scheduled Sync Jobs
1134 EOF
1135
# Help text of the 'enable' command. It is printed by help() for 'enable'
# and when 'enable' is called without a source.
1136 my $help_enable = <<EOF;
1137 $PROGNAME enable -source <string> [OPTIONS]
1138
1139 enable a syncjob and reset error
1140
1141 -name string
1142
1143 name of the sync job, if not set it is default
1144
1145 -source string
1146
1147 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1148 EOF
1149
# Help text of the 'disable' command. It is printed by help() for 'disable'
# and when 'disable' is called without a source.
1150 my $help_disable = <<EOF;
1151 $PROGNAME disable -source <string> [OPTIONS]
1152
1153 pause a sync job
1154
1155 -name string
1156
1157 name of the sync job, if not set it is default
1158
1159 -source string
1160
1161 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1162 EOF
1163
# Show the help text of the given sub-command.
#
# The text is delivered by dying with it, so for a known command this
# function never returns and the program ends after printing the text.
sub help {
    my ($command) = @_;

    my %help_text_for = (
        'help' => \$help_help,
        'sync' => \$help_sync,
        'destroy' => \$help_destroy,
        'create' => \$help_create,
        'list' => \$help_list,
        'status' => \$help_status,
        'enable' => \$help_enable,
        'disable' => \$help_disable,
    );

    my $text_ref = $help_text_for{$command};

    die "${$text_ref}\n" if $text_ref;
}
1194
my @arg = @ARGV;
my $param = parse_argv(@arg);

# One handler per sub-command. Commands working on a job first make sure
# that the required targets were given (printing the command's help text
# otherwise) and that they are well-formed.
my %handler_for = (
    'destroy' => sub {
        die "$help_destroy\n" if !$param->{source};

        check_target($param->{source});
        destroy_job($param);
    },
    'sync' => sub {
        die "$help_sync\n" if !$param->{source} || !$param->{dest};

        check_target($param->{source});
        check_target($param->{dest});
        sync($param);
    },
    'create' => sub {
        die "$help_create\n" if !$param->{source} || !$param->{dest};

        check_target($param->{source});
        check_target($param->{dest});
        init($param);
    },
    'status' => sub {
        print status();
    },
    'list' => sub {
        print list();
    },
    'help' => sub {
        my $help_command = $ARGV[1];

        # help() dies with the text of a known command, so the code below
        # only runs when no valid command name was given.
        if ($help_command && $commands->{$help_command}) {
            print help($help_command);
        }

        if ($param->{verbose}) {
            exec("man $PROGNAME");
        } else {
            usage(1);
        }
    },
    'enable' => sub {
        die "$help_enable\n" if !$param->{source};

        check_target($param->{source});
        enable_job($param);
    },
    'disable' => sub {
        die "$help_disable\n" if !$param->{source};

        check_target($param->{source});
        disable_job($param);
    },
);

my $handler = $handler_for{$command};
$handler->() if $handler;
1252
# Print the command overview to STDOUT.
#
# $help - when false, the overview is preceded by an
#         "ERROR: no command specified" line; when true only the
#         overview itself is printed.
sub usage {
    my ($help) = @_;

    my @output;
    push @output, "ERROR:\tno command specified\n" unless $help;

    push @output,
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n";

    print(join('', @output));
}
1267
# Validate a source/destination argument by handing it to parse_target() and
# passing that call's result back to the caller.
# NOTE(review): presumably parse_target() dies on a malformed target - confirm.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1272
1273 __END__
1274
1275 =head1 NAME
1276
1277 pve-zsync - PVE ZFS Replication Manager
1278
1279 =head1 SYNOPSIS
1280
1281 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1282
1283 pve-zsync help <cmd> [OPTIONS]
1284
1285 Get help about specified command.
1286
1287 <cmd> string
1288
1289 Command name
1290
1291 -verbose boolean
1292
1293 Verbose output format.
1294
1295 pve-zsync create -dest <string> -source <string> [OPTIONS]
1296
1297 Create a sync Job
1298
1299 -dest string
1300
1301 the destination target is like [IP:]<Pool>[/Path]
1302
1303 -limit integer
1304
1305 max sync speed in kBytes/s, default unlimited
1306
1307 -maxsnap integer
1308
1309 how many snapshots will be kept before the oldest get erased, default 1
1310
1311 -name string
1312
1313 name of the sync job, if not set it is default
1314
1315 -skip boolean
1316
1317 if this flag is set it will skip the first sync
1318
1319 -source string
1320
1321 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1322
1323 pve-zsync destroy -source <string> [OPTIONS]
1324
1325 remove a sync Job from the scheduler
1326
1327 -name string
1328
1329 name of the sync job, if not set it is default
1330
1331 -source string
1332
1333 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1334
1335 pve-zsync disable -source <string> [OPTIONS]
1336
1337 pause a sync job
1338
1339 -name string
1340
1341 name of the sync job, if not set it is default
1342
1343 -source string
1344
1345 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1346
1347 pve-zsync enable -source <string> [OPTIONS]
1348
1349 enable a syncjob and reset error
1350
1351 -name string
1352
1353 name of the sync job, if not set it is default
1354
1355 -source string
1356
1357 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1358 pve-zsync list
1359
1360 Get a List of all scheduled Sync Jobs
1361
1362 pve-zsync status
1363
1364 Get the status of all scheduled Sync Jobs
1365
1366 pve-zsync sync -dest <string> -source <string> [OPTIONS]
1367
1368 will sync one time
1369
1370 -dest string
1371
1372 the destination target is like [IP:]<Pool>[/Path]
1373
1374 -limit integer
1375
1376 max sync speed in kBytes/s, default unlimited
1377
1378 -maxsnap integer
1379
1380 how many snapshots will be kept before the oldest get erased, default 1
1381
1382 -name string
1383
1384 name of the sync job, if not set it is default.
1385 It is only necessary if the scheduler already contains this source.
1386
1387 -source string
1388
1389 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1390
1391 -verbose boolean
1392
1393 print out the sync progress.
1394
1395 =head1 DESCRIPTION
1396
1397 This tool helps you to sync your VM or directory which is stored on ZFS between 2 servers.
1398 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1399 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1400 To configure cron see man crontab.
1401
1402 =head2 PVE ZFS Storage sync Tool
1403
1404 This tool can pull a remote pool from another PVE host or send a pool to other ZFS machines.
1405
1406 =head1 EXAMPLES
1407
1408 add sync job from local VM to remote ZFS Server
1409 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1410
1411 =head1 IMPORTANT FILES
1412
1413 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1414
1415 The VM config gets copied to /var/lib/pve-zsync/ on the destination machine.
1416
1417 =head1 COPYRIGHT AND DISCLAIMER
1418
1419 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1420
1421 This program is free software: you can redistribute it and/or modify it
1422 under the terms of the GNU Affero General Public License as published
1423 by the Free Software Foundation, either version 3 of the License, or
1424 (at your option) any later version.
1425
1426 This program is distributed in the hope that it will be useful, but
1427 WITHOUT ANY WARRANTY; without even the implied warranty of
1428 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1429 Affero General Public License for more details.
1430
1431 You should have received a copy of the GNU Affero General Public
1432 License along with this program. If not, see
1433 <http://www.gnu.org/licenses/>.