]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
improve reliability and recover changed snapshots on source
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use Switch;
11 use JSON;
12 use IO::File;
13 use String::ShellQuote 'shell_quote';
14
# --- program identity and well-known file locations ---------------------
my $PROGNAME    = "pve-zsync";
my $CONFIG_PATH = "/var/lib/$PROGNAME/";
my $STATE       = $CONFIG_PATH . "sync_state";
my $CRONJOBS    = "/etc/cron.d/$PROGNAME";
my $PATH        = "/usr/sbin/";
my $QEMU_CONF   = "/etc/pve/local/qemu-server/";
my $LOCKFILE    = $CONFIG_PATH . $PROGNAME . ".lock";
my $PROG_PATH   = $PATH . $PROGNAME;
my $INTERVAL    = 15;    # used as "*/$INTERVAL" in the minute field of new cron lines
my $DEBUG       = 0;     # set to a true value to trace every external command

# --- address patterns (kept as strings so they can be nested) -----------
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE    = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16   = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32  = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# One alternative per position the "::" abbreviation may take.
my $IPV6RE = "(?:" . join('|',
    "(?:(?:(?:$IPV6H16:){6})$IPV6LS32)",
    "(?:(?:::(?:$IPV6H16:){5})$IPV6LS32)",
    "(?:(?:(?:$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::)$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::)$IPV6H16)",
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::))",
) . ")";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets

# A target is either a VMID or 'host:zpool/path', the 'host:' part being
# optional.  Capture 1 is the host (brackets included), capture 2 the
# VMID or dataset.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
47
# Refuse to start when one of the external tools this script shells out
# to cannot be found on PATH.
check_bin($_) for qw(cstream zfs ssh scp);
52
# Locate an executable in the directories listed in $ENV{PATH}.
#
# Parameters:
#   $bin - bare command name to look for (e.g. 'zfs')
# Returns the full path of the first match.
# Dies with "unable to find command '<bin>'" when no PATH entry contains
# an executable regular file of that name.
sub check_bin {
    my ($bin) = @_;

    # An unset PATH simply means "nothing found" instead of a warning.
    my $search_path = defined($ENV{PATH}) ? $ENV{PATH} : '';

    foreach my $dir (split(/:/, $search_path)) {
        my $fn = "$dir/$bin";
        # -x alone is also true for directories, so require a plain file.
        return $fn if -f $fn && -x _;
    }

    die "unable to find command '$bin'\n";
}
65
# Shorten a '/'-separated target so it fits into a table column.
#
# Parameters:
#   $target - dataset / source string to display
#   $max    - maximum number of characters allowed
# Returns $target unchanged when it already fits, otherwise an abbreviated
# form that is never longer than $max:
#   first<start of middle>../last   when first and last component fit
#   <start of first>/../last        when only the last component fits
#   ../<end of last>                when even the last component is too long
sub cut_target_width {
    my ($target, $max) = @_;

    return $target if length($target) <= $max;

    # No room for a '../' marker at all: plain cut.
    return substr($target, 0, $max > 0 ? $max : 0) if $max <= 3;

    my @parts = split('/', $target);
    # A string made of slashes only splits into nothing.
    return substr($target, 0, $max) if !@parts;

    my $first     = $parts[0];
    my $last      = $parts[-1];
    my $first_len = length($first);
    my $last_len  = length($last);

    # The last component plus the '../' marker fills the column (or more):
    # keep the tail of the last component.
    if ($last_len >= $max - 3) {
        return "../" . substr($last, $last_len - ($max - 3));
    }

    # Room left for the middle once first, last and the 3-character '../'
    # marker are accounted for.
    my $room = $max - $first_len - $last_len - 3;
    if ($room >= 1) {
        return $first . substr($target, $first_len, $room) . "../" . $last;
    }

    # Otherwise shorten the first component; '/../' takes 4 characters.
    my $keep = $max - 4 - $last_len;
    return substr($first, 0, $keep) . "/../" . $last;
}
83
# Take an exclusive flock() on an already opened filehandle.
# Dies with "Can't lock config - <errno text>" when flock fails.
sub lock {
    my ($handle) = @_;
    flock($handle, LOCK_EX)
        or die "Can't lock config - $!\n";
}
88
# Release a flock() previously taken on the given filehandle.
# Dies with "Can't unlock config- <errno text>" when flock fails.
sub unlock {
    my ($handle) = @_;
    flock($handle, LOCK_UN)
        or die "Can't unlock config- $!\n";
}
93
# Return the whole $status structure when it records a true 'status' value
# for the given source ($source->{all}) and job name, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    return $status->{$source->{all}}->{$name}->{status} ? $status : undef;
}
103
# Probe a dataset with 'zfs list', through ssh when $target->{ip} is set.
#
# NOTE: despite the name, the return value reports the FAILURE of that
# probe: 1 when the command failed (dataset not found / not reachable),
# undef when it succeeded.  Callers use it as "die ... if check_pool_exists".
sub check_pool_exists {
    my ($target) = @_;

    my @via_ssh  = $target->{ip} ? ('ssh', "root\@$target->{ip}", '--') : ();
    my @zfs_list = ('zfs', 'list', '-H', '--', $target->{all});

    eval { run_cmd([@via_ssh, @zfs_list]) };

    return $@ ? 1 : undef;
}
119
# Split a target specification ([IP:]<VMID> or [IP:]<ZFSPool>[/Path]) into
# its components.
#
# Returns a hash reference with:
#   all       - everything after the optional 'host:' prefix
#   ip        - host part without surrounding brackets (only if given)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - first path component
#   last_part - last path component (only if there is more than one)
#   path      - components between pool and last_part (only if any)
# Dies with a usage hint when the text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    my ($host, $all) = ($text =~ $TARGETRE);
    die "$errstr\n" if !defined($all);

    my $target = { all => $all };

    if ($host) {
        # an address may be written in brackets; store it without them
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $all);
    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    if (@parts) {
        $target->{last_part} = pop(@parts);

        # NOTE(review): for remote targets one further component is
        # dropped before 'path' is built -- confirm this is intended.
        pop(@parts) if $target->{ip};

        $target->{path} = join('/', @parts) if @parts;
    }

    return $target;
}
155
# Load the job definitions from the cron file.
#
# On first use the (empty) cron file is created and undef is returned.
# Otherwise all lines are handed to encode_cron() and its result -- a hash
# keyed by source and job name -- is returned.
sub read_cron {

    # first use: there is nothing to parse yet, just create the file
    unless (-e $CRONJOBS) {
        my $created = IO::File->new("> $CRONJOBS")
            or die "Could not create $CRONJOBS: $!\n";
        close($created);
        return undef;
    }

    my $in = IO::File->new("< $CRONJOBS")
        or die "Could not open file $CRONJOBS: $!\n";
    my @lines = <$in>;
    close($in);

    return encode_cron(@lines);
}
175
# Parse command line style arguments into a parameter hash.
#
# Parameters: the argument list (non-option words are ignored).
# Returns a hash reference with the keys dest, source, verbose, limit,
# maxsnap, name, skip and method; options that were not given are undef,
# except name ("default"), maxsnap (1) and method ("ssh").
# Dies with "can't parse options" when Getopt::Long rejects the input.
sub parse_argv {
    my (@arg) = @_;

    my @spec = qw(dest=s source=s verbose limit=i maxsnap=i name=s skip method=s);

    my $param = {};
    my %linkage;
    foreach my $opt (@spec) {
        (my $key = $opt) =~ s/=.*$//;
        $param->{$key} = undef;
        $linkage{$opt} = \$param->{$key};
    }

    my $parsed_ok = GetOptionsFromArray(\@arg, %linkage);
    die "can't parse options\n" if !$parsed_ok;

    my %default = (name => "default", maxsnap => 1, method => "ssh");
    foreach my $key (keys %default) {
        $param->{$key} = $default{$key} if !$param->{$key};
    }

    return $param;
}
209
# Merge the persisted state of a job into the job hash.
#
# Copies 'state', 'lsync' and the consecutive 'snap0', 'snap1', ... entries
# stored for $job->{source} / $job->{name} in the state file.
# Returns the same (modified) job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    @{$job}{qw(state lsync)} = ($saved->{state}, $saved->{lsync});

    # snapshot entries are numbered without gaps; stop at the first hole
    my $i = 0;
    while ($saved->{"snap$i"}) {
        $job->{"snap$i"} = $saved->{"snap$i"};
        $i++;
    }

    return $job;
}
225
# Turn the lines of the cron file into the job configuration.
#
# Every line is split on whitespace and run through parse_argv(); lines
# that yield both a source and a destination become an entry
# $cfg->{<source>}->{<name>} holding dest, verbose, limit, maxsnap, skip
# and method.  Returns the $cfg hash reference.
sub encode_cron {
    my (@text) = @_;

    my @fields = qw(dest verbose limit maxsnap skip method);
    my $cfg = {};

    foreach my $line (@text) {
        last if !$line;

        my $param = parse_argv(split('\s', $line));
        next if !($param->{source} && $param->{dest});

        my %entry;
        @entry{@fields} = @{$param}{@fields};
        $cfg->{$param->{source}}->{$param->{name}} = \%entry;
    }

    return $cfg;
}
248
# Build a job description from parsed command line parameters.
#
# Returns a hash reference with name, source, limit, method and -- when
# given -- dest and maxsnap.  The method is "local" when neither source
# nor destination names a host, "ssh" otherwise.
# Dies (via parse_target) when source or dest is not a valid target.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});

    # Declare first and assign conditionally: the behaviour of
    # "my $x = ... if ..." is undefined in Perl.
    my $dest;
    $dest = parse_target($param->{dest}) if $param->{dest};

    my $dest_is_remote = $dest && $dest->{ip};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest_is_remote && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};

    return $job;
}
267
# Load the persisted job states from the state file.
#
# On first use the config directory and a state file containing "{}" are
# created and undef is returned.  Otherwise the decoded JSON structure
# ($states->{<source>}->{<name>}) is returned; an empty file counts as
# "no states" and yields an empty hash.
# Dies when the file cannot be created or opened, or holds invalid JSON.
sub read_state {

    if (!-e $STATE) {
        make_path $CONFIG_PATH;
        my $new_fh = IO::File->new("> $STATE");
        die "Could not create $STATE: $!\n" if !$new_fh;
        print $new_fh "{}";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $STATE");
    die "Could not open file $STATE: $!\n" if !$fh;

    # Read the whole file, not just the first line, so that a state file
    # spanning several lines is still decoded.
    my $text = do { local $/; <$fh> };
    close($fh);

    # An empty file would make decode_json die.
    return {} if !defined($text) || $text !~ /\S/;

    return decode_json($text);
}
289
# Persist the state of one job in the state file.
#
# For $job->{state} "del" the entry (and an emptied source) is removed;
# otherwise state, lsync and the consecutive snapN entries of the job are
# stored.  The new content is written to "<state file>.new" and then moved
# over the state file while the old file is held under an exclusive lock.
#
# Returns the complete, updated states structure.
# Dies when the new file cannot be opened, written or moved into place.
sub update_state {
    my ($job) = @_;
    my $text;
    my $in_fh;

    # Best effort: a missing or unreadable state file means we start from
    # an empty set of states.
    eval {
        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        $text = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text) {
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}} || {};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};

        for (my $i = 0; $job->{"snap$i"}; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Never replace the state file with a partially written copy.
    die "can't write to $STATE.new\n" if !print($out_fh $text);
    die "can't write to $STATE.new: $!\n" if !close($out_fh);
    die "can't move $STATE.new: $!\n" if !move("$STATE.new", $STATE);

    # closing the old handle releases the lock taken above
    close($in_fh) if $in_fh;

    return $states;
}
338
# Add, rewrite or remove the cron line of one job.
#
# The cron file is read under an exclusive lock.  The line belonging to
# $job->{source} / $job->{name} is regenerated through format_job(), or
# dropped when $job->{state} is "del"; a job without a line gets a new one
# appended.  A SHELL/PATH header is prepended when the first lines do not
# contain one.  The result is written to "<cron file>.new" and moved over
# the cron file.
#
# Dies when the cron file cannot be opened or the new file cannot be
# written or moved.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @lines = <$fh>;

    while (my $line = shift(@lines)) {
        chomp($line);
        # \Q...\E: source and name are literal text, not patterns.  Without
        # it the '.' in an address or dataset name would match any
        # character and could select another job's line.
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
    close ($fh);
}
386
# Render the cron line for a job.
#
# Parameters:
#   $job  - job hash (state, source, dest, name, maxsnap, method and
#           optionally limit and verbose)
#   $line - existing cron line of the job, if any; its schedule is kept
# Returns the complete line including the trailing newline.  A job in state
# "stopped" is emitted commented out with a leading '#'.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    # Keep the schedule of an existing line.  The match is non-greedy so
    # that a later " root" (for example "--name root") is not mistaken for
    # the user column; without a match the default schedule is used.
    if ($line && $line =~ /^#*(.+?) root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    # carry the bandwidth limit over, otherwise scheduled runs are unlimited
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= "\n";

    return $text;
}
409
# Build the table printed by the 'list' command: one row per configured
# job with source, name, state, time of the last sync and transfer method.
# Returns the table as a single string.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-15s%-7s%-20s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            $list .= sprintf($row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 15),
                $states->{$source}->{$name}->{state},
                $states->{$source}->{$name}->{lsync},
                $cfg->{$source}->{$name}->{method});
        }
    }

    return $list;
}
429
# Ask 'qm status <vmid>' (through ssh when $target->{ip} is set) whether a
# VM is known.  Returns 1 when the output starts with "status", undef
# otherwise; a failing command makes run_cmd() die.
sub vm_exists {
    my ($target) = @_;

    my @cmd = ('qm', 'status', $target->{vmid});
    unshift @cmd, 'ssh', "root\@$target->{ip}", '--' if $target->{ip};

    my $res = run_cmd(\@cmd);

    return ($res =~ m/^status.*$/) ? 1 : undef;
}
442
# Implement the 'create' command: validate a new job, register it in the
# cron file and the state file and -- unless --skip was given -- run the
# first sync.
#
# Dies when the destination dataset, the source dataset or the source VM
# does not exist, or when a job with the same source and name is already
# configured.  When the first sync fails the job is removed again and the
# error is printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);

    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    if (my $ip = $dest->{ip}) {
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
    }

    if (my $ip = $source->{ip}) {
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
    }

    # check_pool_exists() returns true when the dataset is MISSING
    die "Pool $dest->{all} does not exists\n" if check_pool_exists($dest);

    if ($source->{vmid}) {
        # the VMID lives in the parsed source, not in the raw parameters
        die "VM $source->{vmid} doesn't exist\n" if !vm_exists($source);
    } else {
        # hand over the parsed target itself; check_pool_exists() reads
        # its 'ip' and 'all' entries
        die "Pool $source->{all} does not exists\n" if check_pool_exists($source);
    }

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    update_cron($job);
    update_state($job);

    eval {
        sync($param) if !$param->{skip};
    };
    if (my $err = $@) {
        # roll the registration back when the initial sync failed
        destroy_job($param);
        print $err;
    }
}
485
# Look up a configured job by $param->{source} and $param->{name}.
#
# Returns the job hash from the cron file, completed with its name, its
# source and the persisted state.  Dies when no such job is configured.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = $cfg->{$param->{source}}->{$param->{name}};
    die "Job with source $param->{source} and name $param->{name} does not exist\n"
        if !$job;

    @{$job}{qw(name source)} = ($param->{name}, $param->{source});

    return add_state_to_job($job);
}
501
# Implement the 'destroy' command: mark the job as deleted and let
# update_cron() / update_state() remove it from both files.
# Dies (via get_job) when the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $doomed = get_job($param);
    $doomed->{state} = "del";

    update_cron($doomed);
    update_state($doomed);
}
511
# Implement the 'sync' command: replicate one dataset, or all disks of a
# VM, to the destination.
#
# The whole run is serialised through an exclusive lock on $LOCKFILE.  For
# a configured job the state is set to "syncing" for the duration and to
# "ok" (with lsync updated) or "error" afterwards.  Each dataset goes
# through: find existing replication snapshots, create a new one, send it,
# and destroy the oldest one once maxsnap is reached.
#
# Dies when the job is already syncing or any step fails.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE")
        or die "Can't open Lock File: $LOCKFILE $!\n";
    lock($lock_fh);

    my $date = get_date();

    # a one-off sync without a configured job is fine: $job stays undef
    my $job = eval { get_job($param) };

    die "Job --source $param->{source} --name $param->{name} is syncing at the moment"
        if $job && $job->{state} eq "syncing";

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # replicate the dataset currently described by the source hash
    my $sync_path = sub {
        my ($src, $dst, $cur_job, $opts, $stamp) = @_;

        @{$src}{qw(old_snap last_snap)} = snapshot_get($src, $dst, $opts->{maxsnap}, $opts->{name});

        snapshot_add($src, $dst, $opts->{name}, $stamp);

        send_image($src, $dst, $opts);

        if ($src->{destroy} && $src->{old_snap}) {
            snapshot_destroy($src, $dst, $opts->{method}, $src->{old_snap});
        }
    };

    if ($job) {
        $job->{state} = "syncing";
        update_state($job);
    }

    eval {
        if ($source->{vmid}) {
            die "VM $source->{vmid} doesn't exist\n" if !vm_exists($source);
            my $disks = get_disks($source);

            # point the source hash at one disk after the other
            foreach my $disk (sort keys %{$disks}) {
                my $info = $disks->{$disk};
                $source->{all} = $info->{all};
                $source->{pool} = $info->{pool};
                $source->{path} = $info->{path} if $info->{path};
                $source->{last_part} = $info->{last_part};
                $sync_path->($source, $dest, $job, $param, $date);
            }
            send_config($source, $dest, 'ssh') if $param->{method} eq "ssh";
        } else {
            $sync_path->($source, $dest, $job, $param, $date);
        }
    };
    if (my $err = $@) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
589
# Find the replication snapshots of one job on the source dataset.
#
# Parameters:
#   $source   - parsed source target; 'destroy' is set to 1 on it when
#               $max_snap snapshots of this job already exist
#   $dest     - parsed destination target (not used here)
#   $max_snap - number of snapshots to keep
#   $name     - job name
# Returns ($old_snap, $last_snap): walking from the newest snapshot
# backwards, $last_snap is the newest and $old_snap the oldest one seen
# (at most $max_snap are looked at).  Returns undef when the job has no
# snapshot yet.  Dies (via run_cmd) when listing the snapshots fails.
sub snapshot_get {
    my ($source, $dest, $max_snap, $name) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
    push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
    push @$cmd, $source->{all};

    my $raw = run_cmd($cmd);
    my $index = 0;
    my $last_snap;
    my $old_snap;

    # Only accept snapshots named exactly rep_<name>_<timestamp>, the form
    # snapshot_add() creates.  The name is taken literally, and the
    # timestamp keeps job "a" from picking up (and later destroying) the
    # snapshots of a job called "ab".
    my $snap_re = qr/(rep_\Q$name\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

    while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
        my $line = $1;
        if ($line =~ $snap_re) {
            $last_snap = $1 if (!$last_snap);
            $old_snap = $1;
            $index++;
            if ($index == $max_snap) {
                $source->{destroy} = 1;
                last;
            }
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
621
# Create the snapshot "rep_<name>_<date>" on the source dataset and record
# its name in $source->{new_snap}.  When the snapshot command fails the
# snapshot is cleaned up through snapshot_destroy() and the error is
# rethrown.
sub snapshot_add {
    my ($source, $dest, $name, $date) = @_;

    my $snap_name = "rep_${name}_$date";
    $source->{new_snap} = $snap_name;

    my @cmd = ('zfs', 'snapshot', "$source->{all}\@$snap_name");
    unshift @cmd, 'ssh', "root\@$source->{ip}", '--' if $source->{ip};

    eval { run_cmd(\@cmd) };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name);
        die "$err\n";
    }
}
643
# Write a complete cron file from a configuration hash.
#
# Only entries whose 'status' is 'ok' are written.  Each one becomes a
# "<program> sync -source ... -dest ... -name ..." line, with -limit and
# -maxsnap appended when set.  Dies when the cron file cannot be opened or
# written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n"
        . "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name};
            next if $entry->{status} ne 'ok';

            $text .= "$PROG_PATH sync -source ";
            $text .= "$entry->{source_ip}:" if $entry->{source_ip};
            if ($entry->{vmid}) {
                $text .= "$entry->{vmid} ";
            } else {
                $text .= "$entry->{source_pool}";
                $text .= "$entry->{source_path}" if $entry->{source_path};
            }

            $text .= " -dest ";
            $text .= "$entry->{dest_ip}:" if $entry->{dest_ip};
            $text .= "$entry->{dest_pool}";
            $text .= "$entry->{dest_path}" if $entry->{dest_path};

            $text .= " -name $sync_name ";
            $text .= " -limit $entry->{limit}" if $entry->{limit};
            $text .= " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};
            $text .= "\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
679
# Fetch the configuration of a VM with 'qm config <vmid>' (through ssh when
# $target->{ip} is set) and return the disks parse_disks() finds in it.
sub get_disks {
    my ($target) = @_;

    my @cmd = ('qm', 'config', $target->{vmid});
    unshift @cmd, 'ssh', "root\@$target->{ip}", '--' if $target->{ip};

    my $vm_config = run_cmd(\@cmd);

    return parse_disks($vm_config, $target->{ip});
}
693
# Run an external command through the shell and return its output.
#
# Parameters:
#   $cmd - either a ready-made command string, or an array reference whose
#          plain elements are shell-quoted and whose scalar-reference
#          elements (e.g. \'|') are inserted verbatim
# Returns the combined stdout/stderr output with one trailing newline
# removed.  Dies with the command and its output when the exit status is
# not zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper($cmd);
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words;
        foreach my $arg (@$cmd) {
            push @words, ref($arg) ? $$arg : shell_quote($arg);
        }
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper($output);
        print "END CMD\n";
    }

    return $output;
}
710
# Extract the ZFS backed disks from the output of 'qm config'.
#
# Parameters:
#   $text - output of 'qm config <vmid>'
#   $ip   - host the VM lives on; 'pvesm path' is run there through ssh
# Returns a hash reference keyed 0, 1, 2, ... whose values hold pool, path
# (if any), last_part (the volume name) and all (the full dataset), or
# undef when the VM has no disk lines.  Lines mentioning cdrom or none are
# skipped.
# Dies when a disk line has an unexpected form or its device path is not
# below /dev/zvol/.
sub parse_disks {
    my ($text, $ip) = @_;

    my $disks;
    my $num = 0;

    foreach my $line ($text ? split(/\n/, $text) : ()) {

        next if $line =~ /cdrom|none/;
        next if $line !~ m/^(?:virtio|ide|scsi|sata)\d+: /;

        my ($stor, $disk) = $line =~ m/^(?:virtio|ide|scsi|sata)\d+: (.+:)([A-Za-z0-9\-]+),(.*)$/
            or die "disk is not on ZFS Storage\n";

        # resolve the volume id ("<storage>:<volume>") to a device path
        my @cmd = ('pvesm', 'path', "$stor$disk");
        unshift @cmd, 'ssh', "root\@$ip", '--' if $ip;
        my $path = run_cmd(\@cmd);

        my ($dataset) = $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/
            or die "ERROR: in path\n";

        my @array = split('/', $dataset);

        my %entry = (pool => shift(@array));
        $entry{all} = $entry{pool};
        if (@array) {
            $entry{path} = join('/', @array);
            $entry{all} .= "\/$entry{path}";
        }
        $entry{last_part} = $disk;
        $entry{all} .= "\/$disk";

        $disks->{$num} = \%entry;
        $num++;
    }

    return $disks;
}
758
# Destroy a snapshot on the source and, when $dest is given, the matching
# snapshot on the destination.
#
# Parameters:
#   $source - parsed source target (all, ip, last_part)
#   $dest   - parsed destination target, or undef to leave it untouched
#   $method - the source side is reached through ssh only for 'ssh'
#   $snap   - snapshot name without the dataset
# Failures are reported as warnings only; this sub does not die on them.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap) = @_;

    my @destroy = ('zfs', 'destroy');
    my $snapshot = "$source->{all}\@$snap";

    my @src_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "root\@$source->{ip}", '--')
        : ();

    eval { run_cmd([@src_ssh, @destroy, $snapshot]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dst_ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();
        my $path = "$dest->{all}\/$source->{last_part}";

        eval { run_cmd([@dst_ssh, @destroy, "$path\@$snap"]) };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
788
# Check whether the snapshot $source->{old_snap} of the replicated dataset
# is present on the destination.
#
# Returns 1 when 'zfs list' reports it, a false value otherwise.  A failing
# listing is turned into a warning and undef.  $method is accepted but not
# used.
sub snapshot_exist {
    my ($source ,$dest, $method) = @_;

    my $wanted = "$dest->{all}/$source->{last_part}\@$source->{old_snap}";

    my @cmd = ('zfs', 'list', '-rt', 'snapshot', '-Ho', 'name', $wanted);
    unshift @cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};

    my $text = eval { run_cmd(\@cmd) };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    # consume the output line by line
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;
        return 1 if $line =~ m/^.*$source->{old_snap}$/;
    }

    return undef;
}
809
# Transfer the new snapshot to the destination with
# "zfs send | [cstream |] zfs recv".
#
# The send is incremental (-i <last_snap>) when the source has an earlier
# replication snapshot and snapshot_exist() finds it on the destination.
# $param->{limit} (kBytes/s) inserts a cstream stage; either end is wrapped
# in ssh when its target has an ip.  On failure the new source snapshot is
# destroyed again and the error is rethrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    # sending side
    my @send = ('zfs', 'send');
    unshift @send, 'ssh', "root\@$source->{ip}", '--' if $source->{ip};
    push @send, '-v' if $param->{verbose};
    if ($source->{last_snap} && snapshot_exist($source ,$dest, $param->{method})) {
        push @send, '-i', "$source->{all}\@$source->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit; cstream -t is given limit * 1024
    my @throttle = $param->{limit}
        ? (\'|', 'cstream', '-t', $param->{limit}*1024)
        : ();

    # receiving side
    my $target = "$dest->{all}/$source->{last_part}";
    $target =~ s!/+!/!g;

    my @recv = ('zfs', 'recv', '-F', '--', "$target");
    unshift @recv, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};

    # a \'|' reference is passed to the shell unquoted by run_cmd()
    eval { run_cmd([@send, @throttle, \'|', @recv]) };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
        die $erro;
    }
}
845
846
# Copy the VM configuration next to the replicated disks.
#
# The file "$QEMU_CONF<vmid>.conf" is copied with scp to
# "$CONFIG_PATH<vmid>.conf.<new_snap>" on the destination (remote to
# remote, local to remote or remote to local, depending on which side has
# an ip).  When $source->{destroy} is set, the copy belonging to
# $source->{old_snap} is removed.  Nothing happens unless $method is 'ssh'.
# Dies (via run_cmd) when a command fails.
sub send_config {
    my ($source, $dest, $method) = @_;

    my $source_target = "$QEMU_CONF$source->{vmid}.conf";
    my $dest_target_new = "$CONFIG_PATH$source->{vmid}.conf.$source->{new_snap}";

    if ($method eq 'ssh') {
        if ($dest->{ip} && $source->{ip}) {
            run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $CONFIG_PATH]);
            run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($dest->{ip}) {
            run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $CONFIG_PATH]);
            run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($source->{ip}) {
            run_cmd(['mkdir', '-p', '--', $CONFIG_PATH]);
            # Brackets as in the other branches: scp would otherwise split
            # an IPv6 address at its first colon.
            run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", $dest_target_new]);
        }

        if ($source->{destroy}) {
            my $dest_target_old = "$CONFIG_PATH$source->{vmid}.conf.$source->{old_snap}";
            if ($dest->{ip}) {
                run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
            } else {
                run_cmd(['rm', '-f', '--', $dest_target_old]);
            }
        }
    }
}
875
# Return the current local time as "YYYY-MM-DD_hh:mm:ss"; this stamp is
# used as the suffix of replication snapshot names.
sub get_date {
    my ($sec, $min, $hour, $mday, $mon, $year) = (localtime(time))[0 .. 5];

    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d",
        $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}
882
# Build the table printed by the 'status' command: one row per configured
# job with source, name and current state.
# Returns the table as a single string.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-15s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            $status_list .= sprintf("%-25s", cut_target_width($source, 25));
            # The NAME column is 15 characters wide, so shorten to 15 (as
            # list() does); 25 would push the status column out of line.
            $status_list .= sprintf("%-15s", cut_target_width($sync_name, 15));

            # a job without a persisted state prints an empty status
            my $state = $states->{$source}->{$sync_name}->{state};
            $status_list .= (defined($state) ? $state : '') . "\n";
        }
    }

    return $status_list;
}
900
# Implement the 'enable' command: set the job state back to "ok" (which
# also clears an error state) and rewrite its cron line accordingly.
# Dies (via get_job) when the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "ok";

    # the state file first, then the cron line
    update_state($job);
    update_cron($job);
}
909
# Implement the 'disable' command: set the job state to "stopped";
# format_job() then writes its cron line commented out.
# Dies (via get_job) when the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "stopped";

    # the state file first, then the cron line
    update_state($job);
    update_cron($job);
}
918
# The first command line word selects the sub-command.
my $command = $ARGV[0];

# every sub-command this program understands
my $commands = { map { $_ => 1 } qw(destroy create sync list status help enable disable) };

# missing or unknown command: show the usage and exit with an error
unless ($command && $commands->{$command}) {
    usage();
    die "\n";
}
934
# Help texts, one per sub-command.  Each line ends in an explicit "\n"
# followed by a real line break, so the printed text is double spaced.
my $help_sync = "$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
\twill sync one time\n
\t-dest\tstring\n
\t\tthe destination target is like [IP:]<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tinteger\n
\t\thow much snapshots will be kept before get erased, default 1\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default.
\t\tIt is only necessary if scheduler already contains this source.\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_create = "$PROGNAME create -dest <string> -source <string> [OPTIONS]\n
\tCreate a sync Job\n
\t-dest\tstring\n
\t\tthe destination target is like [IP:]<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tinteger\n
\t\thow much snapshots will be kept before get erased, default 1\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-skip\tboolean\n
\t\tif this flag is set it will skip the first sync\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_destroy = "$PROGNAME destroy -source <string> [OPTIONS]\n
\tremove a sync Job from the scheduler\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_help = "$PROGNAME help <cmd> [OPTIONS]\n
\tGet help about specified command.\n
\t<cmd>\tstring\n
\t\tCommand name\n
\t-verbose\tboolean\n
\t\tVerbose output format.\n";

my $help_list = "$PROGNAME list\n
\tGet a List of all scheduled Sync Jobs\n";

my $help_status = "$PROGNAME status\n
\tGet the status of all scheduled Sync Jobs\n";

my $help_enable = "$PROGNAME enable -source <string> [OPTIONS]\n
\tenable a syncjob and reset error\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

my $help_disable = "$PROGNAME disable -source <string> [OPTIONS]\n
\tpause a syncjob\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";
997
# Show the help text of one sub-command.
#
# The text is delivered through die(), so for a known command this sub
# does not return.  For an unknown or missing command it returns nothing.
sub help {
    my ($command) = @_;

    my %help_for = (
        help    => $help_help,
        sync    => $help_sync,
        destroy => $help_destroy,
        create  => $help_create,
        list    => $help_list,
        status  => $help_status,
        enable  => $help_enable,
        disable => $help_disable,    # used to show the 'enable' text
    );

    if (defined($command) && exists $help_for{$command}) {
        die "$help_for{$command}\n";
    }

    return;
}
1037
# Parse the options and run the selected sub-command.  $command has
# already been checked against the list of known commands.
my @arg = @ARGV;
my $param = parse_argv(@arg);

if ($command eq "destroy") {
    die "$help_destroy\n" if !$param->{source};
    check_target($param->{source});
    destroy_job($param);
} elsif ($command eq "sync") {
    die "$help_sync\n" if !$param->{source} || !$param->{dest};
    check_target($param->{source});
    check_target($param->{dest});
    sync($param);
} elsif ($command eq "create") {
    die "$help_create\n" if !$param->{source} || !$param->{dest};
    check_target($param->{source});
    check_target($param->{dest});
    init($param);
} elsif ($command eq "status") {
    print status();
} elsif ($command eq "list") {
    print list();
} elsif ($command eq "help") {
    my $help_command = $ARGV[1];
    if ($help_command && $commands->{$help_command}) {
        # help() dies with the text of a known command
        print help($help_command);
    }
    # verbose is undef unless the flag was given, so test its truth
    # instead of comparing it numerically
    if ($param->{verbose}) {
        exec("man $PROGNAME");
    } else {
        usage(1);
    }
} elsif ($command eq "enable") {
    die "$help_enable\n" if !$param->{source};
    check_target($param->{source});
    enable_job($param);
} elsif ($command eq "disable") {
    die "$help_disable\n" if !$param->{source};
    check_target($param->{source});
    disable_job($param);
}
1096
# Print the command overview.  Unless $help is true the overview is
# preceded by an error line about the missing command.
sub usage {
    my ($help) = @_;

    print("ERROR:\tno command specified\n") if !$help;

    my @synopsis = (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    print(join('', @synopsis));
}
1111
# Validate a target specification; parse_target() dies on invalid input.
# Returns the parsed target.
sub check_target {
    my ($spec) = @_;

    return parse_target($spec);
}
1116
1117 __END__
1118
1119 =head1 NAME
1120
1121 pve-zsync - PVE ZFS Replication Manager
1122
1123 =head1 SYNOPSIS
1124
1125 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1126
1127 pve-zsync help <cmd> [OPTIONS]
1128
1129 Get help about specified command.
1130
1131 <cmd> string
1132
1133 Command name
1134
1135 -verbose boolean
1136
1137 Verbose output format.
1138
1139 pve-zsync create -dest <string> -source <string> [OPTIONS]
1140
1141 Create a sync Job
1142
1143 -dest string
1144
		the destination target is like [IP:]<Pool>[/Path]
1146
1147 -limit integer
1148
1149 max sync speed in kBytes/s, default unlimited
1150
	-maxsnap   integer
1152
1153 how much snapshots will be kept before get erased, default 1
1154
1155 -name string
1156
1157 name of the sync job, if not set it is default
1158
1159 -skip boolean
1160
1161 if this flag is set it will skip the first sync
1162
1163 -source string
1164
1165 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1166
1167 pve-zsync destroy -source <string> [OPTIONS]
1168
1169 remove a sync Job from the scheduler
1170
1171 -name string
1172
1173 name of the sync job, if not set it is default
1174
1175 -source string
1176
1177 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1178
1179 pve-zsync disable -source <string> [OPTIONS]
1180
1181 pause a sync job
1182
1183 -name string
1184
1185 name of the sync job, if not set it is default
1186
1187 -source string
1188
1189 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1190
1191 pve-zsync enable -source <string> [OPTIONS]
1192
1193 enable a syncjob and reset error
1194
1195 -name string
1196
1197 name of the sync job, if not set it is default
1198
1199 -source string
1200
		the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

pve-zsync list
1203
1204 Get a List of all scheduled Sync Jobs
1205
1206 pve-zsync status
1207
1208 Get the status of all scheduled Sync Jobs
1209
1210 pve-zsync sync -dest <string> -source <string> [OPTIONS]
1211
1212 will sync one time
1213
1214 -dest string
1215
1216 the destination target is like [IP:]<Pool>[/Path]
1217
1218 -limit integer
1219
1220 max sync speed in kBytes/s, default unlimited
1221
1222 -maxsnap integer
1223
1224 how much snapshots will be kept before get erased, default 1
1225
1226 -name string
1227
		name of the sync job, if not set it is default.
		It is only necessary if the scheduler already contains this source.
1230
1231 -source string
1232
1233 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1234
1235 =head1 DESCRIPTION
1236
This tool helps you to sync a VM or a directory which is stored on ZFS between two servers.
It can also add jobs to cron so that the sync is done automatically.
The default sync interval is 15 minutes; to change this value, edit /etc/cron.d/pve-zsync.
For the cron configuration syntax see man crontab.
1241
1242 =head2 PVE ZFS Storage sync Tool
1243
This tool can pull a remote pool from another PVE host or send a pool to other ZFS machines.
1245
1246 =head1 EXAMPLES
1247
1248 add sync job from local VM to remote ZFS Server
1249 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1250
1251 =head1 IMPORTANT FILES
1252
1253 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1254
The VM config is copied to /var/lib/pve-zsync/ on the destination machine
1256
1257 =head1 COPYRIGHT AND DISCLAIMER
1258
1259 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1260
1261 This program is free software: you can redistribute it and/or modify it
1262 under the terms of the GNU Affero General Public License as published
1263 by the Free Software Foundation, either version 3 of the License, or
1264 (at your option) any later version.
1265
1266 This program is distributed in the hope that it will be useful, but
1267 WITHOUT ANY WARRANTY; without even the implied warranty of
1268 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1269 Affero General Public License for more details.
1270
1271 You should have received a copy of the GNU Affero General Public
1272 License along with this program. If not, see
1273 <http://www.gnu.org/licenses/>.