]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
improve signal handler, print error
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program identity and the files/directories the tool works with.
my $PROGNAME = "pve-zsync";
my $CONFIG_PATH = "/var/lib/${PROGNAME}";
my $STATE = "${CONFIG_PATH}/sync_state";
my $CRONJOBS = "/etc/cron.d/$PROGNAME";
my $PATH = "/usr/sbin";
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = "${PVE_DIR}/qemu-server";
my $LXC_CONF = "${PVE_DIR}/lxc";
my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
my $PROG_PATH = "$PATH/${PROGNAME}";

# Minute step used in the cron schedule of newly created jobs.
my $INTERVAL = 15;
# Set to a true value to make run_cmd dump commands and their output.
my $DEBUG = 0;

# Building blocks for matching IPv4 addresses.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";

# Building blocks for matching IPv6 addresses: a 16 bit hex group and the
# trailing 32 bits, which may be written as an embedded IPv4 address.
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# One alternative per possible position of the "::" abbreviation.
my $IPV6RE = "(?:" . join("|",
    "(?:(?:(?:$IPV6H16:){6})$IPV6LS32)",
    "(?:(?:::(?:$IPV6H16:){5})$IPV6LS32)",
    "(?:(?:(?:$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::)$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::)$IPV6H16)",
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::))",
) . ")";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
47
# The first command line word selects the sub-command to run.
my $command = $ARGV[0];

# Everything except the purely informational commands needs the external
# tools, so fail early when one of them is missing.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin($_) for qw(cstream zfs ssh scp);
}

# Turn termination signals into an exception so that running code can clean
# up through its normal error paths. SIGKILL is deliberately not listed: it
# cannot be trapped, so a handler for it would never run.
@SIG{qw(TERM QUIT PIPE HUP INT)} = (sub {
    die "Signaled, aborting sync: $!\n";
}) x 5;
60
# Locate a command in the directories listed in $ENV{PATH}.
#
# $bin - bare name of the command to look for
#
# Returns the full path of the first regular, executable file found.
# Dies with "unable to find command '$bin'" when no directory contains one.
sub check_bin {
    my ($bin) = @_;

    # An unset PATH is treated like an empty one instead of warning.
    foreach my $dir (split(/:/, $ENV{PATH} // '')) {
        my $fn = "$dir/$bin";
        # -x alone is also true for searchable directories, so insist on a
        # regular file (symlinks are followed).
        return $fn if -f $fn && -x _;
    }

    die "unable to find command '$bin'\n";
}
73
# Shorten a dataset path so it fits into a column of $maxlen characters.
#
# Repeated slashes are collapsed first. Paths that already fit are returned
# as they are; otherwise parts are replaced by '..', keeping the last path
# component (and, when there is room, the first one) readable.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s@/+@/@g;

    return $path if length($path) <= $maxlen;

    # A single component: keep only its end.
    return '..' . substr($path, 2 - $maxlen) if $path !~ m@/@;

    # Split off the last component.
    $path =~ s@/([^/]+/?)$@@;
    my $last = $1;
    my $last_len = length($last);

    return "../$last" if $last_len + 3 == $maxlen;
    return '..' . substr($last, 2 - $maxlen) if $last_len + 2 >= $maxlen;

    # Split off the first '/component' as well.
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $first = $1;
    my $room = $maxlen - (length($first) + $last_len) - 4; # -4 for "/../"

    if ($room < 0) {
        # Not even the full first component fits: truncate it.
        return substr($first, 0, $maxlen - $last_len - 3) . "../$last"; # -3 for "../"
    }

    # Collapse the middle of what is left down to the available room.
    substr($path, ($room/2), (length($path) - $room), '..');
    return "$first/" . $path . "/$last";
}
103
# Take an exclusive flock() on the given file handle, waiting until it is
# granted. Dies with "Can't lock config" if flock() fails.
sub lock {
    my ($handle) = @_;
    flock($handle, LOCK_EX) or die "Can't lock config - $!\n";
}
108
# Release a flock() held on the given file handle.
# Dies with "Can't unlock config" if flock() fails.
sub unlock {
    my ($handle) = @_;
    flock($handle, LOCK_UN) or die "Can't unlock config- $!\n";
}
113
# Return the whole $status structure when it holds a true 'status' value for
# the job $name of the source $source->{all}; otherwise return undef.
sub get_status {
    my ($source, $name, $status) = @_;

    my $found = $status->{$source->{all}}->{$name}->{status};

    return $found ? $status : undef;
}
123
# Check whether the dataset $target->{all} can be listed with 'zfs list'.
#
# $target - parsed target; when it has an {ip} the check runs there via ssh
# $user   - ssh user for the remote case
#
# Returns 1 if the command succeeds and 0 if it fails for any reason.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @cmd = ('zfs', 'list', '-H', '--', $target->{all});
    unshift @cmd, 'ssh', "$user\@$target->{ip}", '--' if $target->{ip};

    eval {
        run_cmd(\@cmd);
    };

    return $@ ? 0 : 1;
}
142
# Split a target specification into its parts.
#
# $text - '[host:]<VMID>' or '[host:]<pool>[/path]', validated with $TARGETRE
#
# Returns a hash reference with:
#   all       - everything after the optional 'host:' prefix
#   ip        - the host without enclosing brackets (only if one was given)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - first path component
#   last_part - last path component (only if there is more than one)
#   path      - the components left in between (only if any remain)
# Dies if $text does not look like a target.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $location) = ($1, $2);

    my $target = { all => $location };

    if ($host) {
        # hosts may be written in brackets, store them without
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $location);

    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # for targets with a host one more component is dropped from the path
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
178
# Read the cron file $CRONJOBS and return the jobs found in it, as parsed by
# encode_cron. If the file does not exist yet it is created empty and undef
# is returned. Dies if the file cannot be created or opened.
sub read_cron {

    # first use: only initialise the file
    unless (-e $CRONJOBS) {
        my $new_fh = IO::File->new("> $CRONJOBS")
            or die "Could not create $CRONJOBS: $!\n";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $CRONJOBS")
        or die "Could not open file $CRONJOBS: $!\n";

    my @lines = $fh->getlines();

    close($fh);

    return encode_cron(@lines);
}
198
# Parse command line style arguments into a parameter hash.
#
# @arg - words as found on the command line or in a cron line; words that
#        are not options are ignored
#
# Returns a hash reference with the keys dest, source, verbose, limit,
# maxsnap, name, skip, method, source_user, dest_user and properties.
# Options that were not given are undef, except for the defaults applied at
# the end. Dies with "can't parse options" if Getopt::Long reports an error.
sub parse_argv {
    my (@arg) = @_;

    # Getopt::Long specification paired with the key it fills in the result
    my @options = (
        ['dest=s', 'dest'],
        ['source=s', 'source'],
        ['verbose', 'verbose'],
        ['limit=i', 'limit'],
        ['maxsnap=i', 'maxsnap'],
        ['name=s', 'name'],
        ['skip', 'skip'],
        ['method=s', 'method'],
        ['source-user=s', 'source_user'],
        ['dest-user=s', 'dest_user'],
        ['properties', 'properties'],
    );

    my $param = {};
    $param->{$_->[1]} = undef for @options;

    my ($ret) = GetOptionsFromArray(
        \@arg,
        map { ($_->[0], \$param->{$_->[1]}) } @options,
    );

    die "can't parse options\n" if $ret == 0;

    my %default_for = (
        name => "default",
        maxsnap => 1,
        method => "ssh",
        source_user => "root",
        dest_user => "root",
    );
    $param->{$_} //= $default_for{$_} for keys %default_for;

    return $param;
}
241
# Copy the saved state of a job into the job hash.
#
# $job - job hash with at least {source} and {name}
#
# The entry for this source/name from read_state provides 'state', 'lsync',
# 'vm_type' and the consecutive 'snap0', 'snap1', ... values (copied up to
# the first one that is missing or false). Returns the same $job.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    foreach my $key (qw(state lsync vm_type)) {
        $job->{$key} = $saved->{$key};
    }

    my $n = 0;
    while ($saved->{"snap$n"}) {
        $job->{"snap$n"} = $saved->{"snap$n"};
        $n++;
    }

    return $job;
}
258
# Build the job configuration from the lines of the cron file.
#
# @text - lines of the cron file
#
# Every line is split on whitespace and handed to parse_argv; lines that
# yield both a source and a dest become a job. Returns a hash reference of
# the form { <source> => { <name> => <remaining parameters> } }.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    foreach my $line (@text) {
        # a false line ends the processing
        last if !$line;

        my $param = parse_argv(split('\s', $line));

        next if !($param->{source} && $param->{dest});

        my ($source, $name) = delete @{$param}{qw(source name)};
        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
279
# Turn command line parameters into a job description.
#
# $param - parameter hash as returned by parse_argv
#
# Returns a hash reference with name, dest (if given), method, limit,
# maxsnap (if set), source, source_user, dest_user and properties.
# The method is 'local' when neither source nor dest names a host, 'ssh'
# otherwise. Dies (via parse_target) on an invalid source or dest.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # "my $x = ... if COND" has undefined behaviour (see perlsyn), so the
    # optional destination is resolved with a plain conditional expression.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    # normalise to a plain boolean
    $job->{properties} = !!$param->{properties};

    return $job;
}
301
# Load the saved job states from $STATE.
#
# If the state file does not exist yet, $CONFIG_PATH and a file containing
# an empty JSON object are created and undef is returned. Otherwise the
# decoded JSON content is returned.
# Dies if the file cannot be created or opened, or if it is not valid JSON.
sub read_state {

    if (!-e $STATE) {
        make_path $CONFIG_PATH;
        my $new_fh = IO::File->new("> $STATE");
        die "Could not create $STATE: $!\n" if !$new_fh;
        print $new_fh "{}";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $STATE");
    die "Could not open file $STATE: $!\n" if !$fh;

    # Read the whole file, not just its first line, so that a state file
    # containing line breaks can still be decoded.
    my $text = do { local $/; <$fh> };

    close($fh);

    return decode_json($text);
}
323
# Store or remove the state of one job in the state file $STATE.
#
# $job - job hash; {source} and {name} select the entry. With a {state} of
#        "del" the entry is removed, otherwise state, lsync, vm_type and
#        the consecutive snap0, snap1, ... values are saved.
#
# The new content is written to "$STATE.new" and then renamed over $STATE.
# Returns the complete state structure that was written.
# Dies if the new file cannot be opened, written, closed or renamed.
sub update_state {
    my ($job) = @_;
    my $text;
    my $in_fh;

    # Best effort: when the current state cannot be read we start from an
    # empty one.
    eval {

        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        # whole file, so that content with line breaks is not truncated
        $text = do { local $/; <$in_fh> };
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        # drop the source as well once its last job is gone
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Do not replace the old state with a file that was not written
    # completely: check every step like update_cron does.
    die "can't write to $STATE.new\n" if !print($out_fh $text);
    die "can't close $STATE.new: $!\n" if !close($out_fh);
    die "can't move $STATE.new: $!\n" if !rename "$STATE.new", $STATE;

    # closing the input handle also releases the lock taken above
    close($in_fh) if $in_fh;

    return $states;
}
373
# Add, replace or remove the line of one job in the cron file $CRONJOBS.
#
# $job - job hash; the line containing its {source} and {name} is replaced
#        by a freshly formatted one, or dropped when {state} is "del".
#        A job without a line yet gets a new line appended.
#
# A SHELL/PATH header is put in front if none is found in the first three
# lines. The result is written to "$CRONJOBS.new" and renamed over
# $CRONJOBS while a lock on the old file is held.
# Dies if a file cannot be opened, written or renamed.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @lines = <$fh>;

    while (my $line = shift(@lines)) {
        chomp($line);
        # Source and name are literal text, not patterns: without quoting
        # a '.' would match any character and the brackets of an IPv6
        # source would be read as a character class.
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
    close ($fh);
}
421
# Format the cron line for a job.
#
# $job  - job hash (source, dest, name, maxsnap, method, source_user,
#         dest_user and optionally limit, verbose, properties, state)
# $line - optional existing cron line of the job; its five schedule fields
#         are kept. Without it, or if no schedule can be found in it, the
#         default "every $INTERVAL minutes" schedule is used.
#
# A job in state "stopped" gets a leading '#', i.e. is commented out.
# Returns the line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }
    # Only use the capture when the match really succeeded; otherwise $1
    # would be undefined or left over from an earlier match.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= "\n";

    return $text;
}
448
# Build the table printed by the 'list' command: one header line followed by
# one line per configured job with its source, name, state, time of the
# last sync, guest type and connection method. Returns the table as string.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                defined($state->{vm_type}) ? $state->{vm_type} : "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
469
# Find out whether the guest named by $target->{vmid} has a configuration
# file, and of which kind.
#
# $target - parsed target; with an {ip} the check is done there via ssh
# $user   - ssh user for the remote case
#
# Returns "qemu" or "lxc" depending on the directory the file
# "<vmid>.conf" is found in (qemu is checked first), or undef if the target
# has no vmid or no such file is found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    foreach my $candidate (["qemu", $QEMU_CONF], ["lxc", $LXC_CONF]) {
        my ($type, $dir) = @$candidate;

        if ($target->{ip}) {
            my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
            return $type if eval { run_cmd([@cmd, "$dir/$conf_fn"]) };
        } else {
            return $type if -f "$dir/$conf_fn";
        }
    }

    return undef;
}
488
# Create a new sync job ('create' command).
#
# $param - parameter hash as returned by parse_argv
#
# Copies the ssh key to the remote sides, checks that the destination (and,
# for non-VM sources, the source) dataset exists and that a VM source
# exists and has disks, then registers the job in the cron and state files.
# Unless {skip} is set a first sync is run; if that fails the job is
# removed again and the error is printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);

    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    # install the key on every remote side, destination first
    foreach my $remote ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
        my ($target, $user) = @$remote;
        my $ip = $target->{ip} or next;
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
    }

    if (!check_pool_exists($dest, $param->{dest_user})) {
        die "Pool $dest->{all} does not exists\n";
    }

    if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
        die "Pool $source->{all} does not exists\n";
    }

    my $vm_type = vm_exists($source, $param->{source_user});
    $job->{vm_type} = $vm_type;
    $source->{vm_type} = $vm_type;

    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    #check if vm has zfs disks if not die;
    get_disks($source, $param->{source_user}) if $source->{vmid};

    update_cron($job);
    update_state($job);

    eval {
        sync($param) unless $param->{skip};
    };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
538
# Look up a configured job and complete it with its saved state.
#
# $param - parameter hash; {source} and {name} select the job
#
# Returns the job hash from the cron configuration with name, source and
# the values added by add_state_to_job. Dies if there is no such job.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = $cfg->{$param->{source}}->{$param->{name}};
    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }

    $job->{name} = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}
554
# Remove a job ('destroy' command): the job selected by $param is marked
# "del" and then dropped from the cron file and from the state file.
sub destroy_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "del";

    update_cron($job);
    return update_state($job);
}
564
# Run one synchronisation ('sync' command).
#
# $param - parameter hash as returned by parse_argv
#
# The whole run is serialised through a lock on $LOCKFILE. For a VM source
# every disk reported by get_disks is snapshotted and sent, followed by the
# guest configuration; any other source is synced as a single dataset.
# If the source/name belongs to a configured job, its state is set to
# "syncing" during the run and to "ok" (with the sync time) or "error"
# afterwards. Dies if the job is already syncing or if the sync fails.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE");
    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
    lock($lock_fh);

    my $date = get_date();

    # a one-off sync has no configured job, so a failed lookup is fine
    my $job = eval { get_job($param) };

    if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
        die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
    }

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # Syncs the dataset $source currently points at: look up the existing
    # snapshots, create a new one, send it and drop the oldest one if the
    # snapshot limit was reached.
    my $sync_current = sub {
        ($source->{old_snap}, $source->{last_snap}) =
            snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});

        snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

        send_image($source, $dest, $param);

        if ($source->{destroy} && $source->{old_snap}) {
            snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user});
        }
    };

    my $vm_type = vm_exists($source, $param->{source_user});
    $source->{vm_type} = $vm_type;

    if ($job) {
        $job->{state} = "syncing";
        $job->{vm_type} ||= $vm_type;
        update_state($job);
    }

    eval {
        if ($source->{vmid}) {
            die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
            die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
            my $disks = get_disks($source, $param->{source_user});

            foreach my $disk (sort keys %{$disks}) {
                my $info = $disks->{$disk};
                $source->{all} = $info->{all};
                $source->{pool} = $info->{pool};
                $source->{path} = $info->{path} if $info->{path};
                $source->{last_part} = $info->{last_part};
                $sync_current->();
            }

            my $remote = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
            send_config($source, $dest, ($remote ? 'ssh' : 'local'), $param->{source_user}, $param->{dest_user});
        } else {
            $sync_current->();
        }
    };
    if (my $err = $@) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
649
# Find the replication snapshots of a job on the source dataset.
#
# $source      - parsed source; {all} is the dataset, {ip} an optional host
# $dest        - not used here
# $max_snap    - number of snapshots to keep
# $name        - job name; snapshots are named rep_<name>_<date>_<time>
# $source_user - ssh user for a remote source
#
# Snapshots are listed newest first. Returns ($old_snap, $last_snap): the
# newest snapshot and the last one looked at, which is the $max_snap-th
# newest when that many exist; in that case $source->{destroy} is set to 1.
# Returns undef if the job has no snapshot yet.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $source_user) = @_;

    my @cmd = ('zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation', $source->{all});
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};

    my $raw = run_cmd(\@cmd);

    my $seen = 0;
    my $last_snap = undef;
    my $old_snap;

    foreach my $line (split(/\n/, $raw // '')) {
        my ($snap) = $line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/
            or next;

        $last_snap ||= $snap;
        $old_snap = $snap;

        if (++$seen == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
682
# Create the snapshot for the current sync run on the source dataset.
#
# The snapshot is named "rep_<name>_<date>" and the name is remembered in
# $source->{new_snap}. If creating it fails, snapshot_destroy is called for
# that name and the error is raised again.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_$date";

    $source->{new_snap} = $snap_name;

    my @cmd = ('zfs', 'snapshot', "$source->{all}\@$snap_name");
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};

    eval {
        run_cmd(\@cmd);
    };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
704
# Write a complete cron file from a job configuration.
#
# $cfg - { <source> => { <name> => <job> } }; only jobs whose {status} is
#        'ok' are written, using their vmid or source_ip/source_pool/
#        source_path, dest_ip/dest_pool/dest_path, limit and maxsnap values.
#
# Overwrites $CRONJOBS; dies if it cannot be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $job = $cfg->{$source}->{$sync_name} //= {};

            next if $job->{status} ne 'ok';

            $text .= "$PROG_PATH sync";
            $text .= " -source ";
            $text .= "$job->{source_ip}:" if $job->{source_ip};
            if ($job->{vmid}) {
                $text .= "$job->{vmid} ";
            } else {
                $text .= "$job->{source_pool}";
                $text .= "$job->{source_path}" if $job->{source_path};
            }

            $text .= " -dest ";
            $text .= "$job->{dest_ip}:" if $job->{dest_ip};
            $text .= "$job->{dest_pool}";
            $text .= "$job->{dest_path}" if $job->{dest_path};

            $text .= " -name $sync_name ";
            $text .= " -limit $job->{limit}" if $job->{limit};
            $text .= " -maxsnap $job->{maxsnap}" if $job->{maxsnap};
            $text .= "\n";
        }
    }
    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
740
# Get the disks of a guest that can be synced.
#
# $target - parsed target with {vmid}, {vm_type} and an optional {ip}
# $user   - ssh user for a remote target
#
# Reads the guest configuration with 'qm config' (qemu) or 'pct config'
# (lxc) and returns what parse_disks makes of it.
# Dies with "VM Type unknown" for any other vm_type.
sub get_disks {
    my ($target, $user) = @_;

    my @cmd;
    push @cmd, 'ssh', "$user\@$target->{ip}", '--' if $target->{ip};

    if ($target->{vm_type} eq 'qemu') {
        push @cmd, 'qm';
    } elsif ($target->{vm_type} eq 'lxc') {
        push @cmd, 'pct';
    } else {
        die "VM Type unknown\n";
    }
    push @cmd, 'config', $target->{vmid};

    my $config = run_cmd(\@cmd);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $user);
}
761
# Run a command through the shell and return its output.
#
# $cmd - command line as string, or an array reference of words. Plain
#        words are shell-quoted; words given as scalar reference (like \'|')
#        are inserted unquoted.
#
# stderr is merged into the output. Returns the output without its trailing
# newline; dies with the command and its output if the exit status is not 0.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
778
# Extract the ZFS backed disks from a guest configuration.
#
# $text    - output of 'qm config' / 'pct config'
# $ip      - host the guest lives on, false for the local host
# $vm_type - 'qemu' or 'lxc'
# $user    - ssh user for a remote host
#
# Considers the virtio/ide/scsi/sata/mp<N> and rootfs lines. CD-ROM drives
# are skipped, qemu disks are skipped when their backup flag is switched
# off, and lxc mount points are only taken when their backup flag is
# switched on. The path of every remaining volume is resolved with
# 'pvesm path'.
#
# Returns { 0 => {...}, 1 => {...}, ... } where every entry has pool, all,
# last_part and, for nested datasets, path.
# Dies if a path cannot be resolved or does not have the expected form, or
# if no disk is left at the end.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    foreach my $line (split(/\n/, $text // '')) {

        next if $line =~ /media=cdrom/;
        next if $line !~ m/^(?:(?:virtio|ide|scsi|sata|mp)\d+|rootfs): (.*)$/;
        # save the capture before the next match replaces it
        my $options = $1;

        #QEMU if backup is not set include in sync
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        #LXC mount points are only synced if backup is set; \d+ so that
        #this also holds for mp10 and above
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        foreach my $opt (split(/,/, $options)) {
            # the volume is the first option of the form <storage>:<name>
            if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                $disk = $2;
                $stor = $1;
                last;
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        my $entry = {};
        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            # everything between /dev/zvol/ and the disk name: pool[/path]
            my @array = split('/', $1);
            $entry->{pool} = shift(@array);
            $entry->{all} = $entry->{pool};
            if (0 < @array) {
                $entry->{path} = join('/', @array);
                $entry->{all} .= "\/$entry->{path}";
            }
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            my ($pool, $has_path, $sub_path) = ($1, $2, $3);
            $entry->{pool} = $pool;
            $entry->{all} = $pool;

            if ($has_path) {
                $entry->{path} = $sub_path;
                $entry->{all} .= "\/$entry->{path}";
            }
        } else {
            die "ERROR: in path\n";
        }

        $entry->{last_part} = $disk;
        $entry->{all} .= "\/$disk";

        $disks->{$num} = $entry;
        $num++;
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
858
# Destroy a snapshot on the source and, if a destination is given, the
# matching snapshot there.
#
# $source  - parsed source; the snapshot "$source->{all}@$snap" is removed,
#            via ssh if the source has an {ip} and $method is 'ssh'
# $dest    - parsed destination or a false value to leave it alone
# $method  - 'ssh' or 'local'
# $snap    - snapshot name
# $source_user, $dest_user - ssh users
#
# Failures are only reported as warnings.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval {
        run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        # on the destination the dataset lives below the target path
        my $path = "$dest->{all}";
        $path .= "/$source->{last_part}" if $source->{last_part};

        eval {
            run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]);
        };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
889
# Check whether the snapshot $source->{old_snap} exists on the destination.
#
# $source    - parsed source with {old_snap} and optionally {last_part}
# $dest      - parsed destination; {all} is the dataset, {ip} optional host
# $method    - not used here
# $dest_user - ssh user for a remote destination
#
# Returns 1 if 'zfs list' reports the snapshot, 0 if it does not, and undef
# (after a warning) if the listing fails.
sub snapshot_exist {
    my ($source , $dest, $method, $dest_user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};
    $path .= "\@$source->{old_snap}";

    push @$cmd, $path;

    my $text = "";
    eval {$text = run_cmd($cmd);};
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    foreach my $line (split(/\n/, $text // '')) {
        # The snapshot name contains the user supplied job name, so it has
        # to be matched as literal text and not as a pattern.
        return 1 if $line =~ m/^.*\Q$source->{old_snap}\E$/;
    }

    return 0;
}
916
# Send the new snapshot of the source dataset to the destination.
#
# $source - parsed source with {all}, {new_snap} and optionally {ip},
#           {last_snap}, {last_part}
# $dest   - parsed destination with {all} and optionally {ip}
# $param  - parameter hash (source_user, dest_user, properties, verbose,
#           limit, method)
#
# Builds a 'zfs send | [cstream |] zfs recv' pipeline, with ssh in front of
# the remote ends. The send is incremental from {last_snap} if
# snapshot_exist finds a common snapshot on the destination. On failure the
# new snapshot is destroyed on the source again and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @cmd;

    push @cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @cmd, 'zfs', 'send';
    push @cmd, '-p' if $param->{properties};
    push @cmd, '-v' if $param->{verbose};

    my $incremental = $source->{last_snap}
        && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user});
    push @cmd, '-i', "$source->{all}\@$source->{last_snap}" if $incremental;

    push @cmd, '--', "$source->{all}\@$source->{new_snap}";

    if ($param->{limit}) {
        # the limit is given in kBytes/s
        push @cmd, \'|', 'cstream', '-t', $param->{limit}*1024;
    }

    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    push @cmd, \'|';
    push @cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @cmd, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd(\@cmd)
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
954
955
# Copy the configuration file of the synced guest to the destination.
#
# $source - parsed source with {vmid}, {vm_type}, {new_snap} and optionally
#           {ip}, {destroy}, {old_snap}
# $dest   - parsed destination with optional {ip} and {last_part}
# $method - 'ssh' or 'local'
# $source_user, $dest_user - ssh users
#
# The copy is stored as "<vmid>.conf.<vm_type>.<new_snap>" below
# $CONFIG_PATH (in a sub directory named like $dest->{last_part} if there is
# one). With method 'ssh' the copy that belongs to {old_snap} is removed
# when $source->{destroy} is set.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;

    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh'){
        if ($dest->{ip} || $source->{ip}) {
            # the directory is created where the copy ends up
            my @mkdir = ('mkdir', '-p', '--', $config_dir);
            unshift @mkdir, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
            run_cmd(\@mkdir);

            my $from = $source->{ip}
                ? "$source_user\@[$source->{ip}]:$source_target"
                : $source_target;
            my $to = $dest->{ip}
                ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
                : $dest_target_new;
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
            my @rm = ('rm', '-f', '--', $dest_target_old);
            unshift @rm, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
            run_cmd(\@rm);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
991
# Return the current local time formatted as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # seconds, minutes, hours, day of month, month (0 based), year - 1900
    my @now = (localtime(time))[0 .. 5];

    return sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $now[5]+1900, $now[4]+1, @now[3, 2, 1, 0]);
}
998
# Build the table printed by the 'status' command: a header line followed by
# one line per configured job with its source, name and saved state.
# Returns the table as string.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            $status_list .= sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1016
# Enable a job ('enable' command): the job selected by $param gets the
# state "ok", which is saved to the state file and the cron file.
sub enable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "ok";

    update_state($job);
    return update_cron($job);
}
1025
# Pause a job ('disable' command): the job selected by $param gets the
# state "stopped", which is saved to the state file and the cron file.
sub disable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "stopped";

    update_state($job);
    return update_cron($job);
}
1034
# Help text per command. The keys are also the list of valid commands.
# Options are described the way parse_argv accepts them (-maxsnap and
# -limit take integers, the host part of a target is written '[IP:]').
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

        remove a sync Job from the scheduler

        -name      string

                name of the sync job, if not set it is default

        -source    string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

        Create a sync Job

        -dest      string

                the destination target is like [IP:]<Pool>[/Path]

        -dest-user string

                name of the user on the destination target, root by default

        -limit     integer

                max sync speed in kBytes/s, default unlimited

        -maxsnap   integer

                how many snapshots will be kept before the oldest get erased, default 1

        -name      string

                name of the sync job, if not set it is default

        -skip      boolean

                if this flag is set it will skip the first sync

        -source    string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

        -source-user    string

                name of the user on the source target, root by default

        -properties     boolean

                Include the dataset's properties in the stream.
    },
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n

        will sync one time

        -dest      string

                the destination target is like [IP:]<Pool>[/Path]

        -dest-user string

                name of the user on the destination target, root by default

        -limit     integer

                max sync speed in kBytes/s, default unlimited

        -maxsnap   integer

                how many snapshots will be kept before the oldest get erased, default 1

        -name      string

                name of the sync job, if not set it is default.
                It is only necessary if scheduler already contains this source.

        -source    string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

        -source-user    string

                name of the user on the source target, root by default

        -verbose   boolean

                print out the sync progress.

        -properties     boolean

                Include the dataset's properties in the stream.
    },
    list => qq{
$PROGNAME list

        Get a List of all scheduled Sync Jobs
    },
    status => qq{
$PROGNAME status

        Get the status of all scheduled Sync Jobs
    },
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

        Get help about specified command.

        <cmd>      string

                Command name

        -verbose   boolean

                Verbose output format.
    },
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

        enable a syncjob and reset error

        -name      string

                name of the sync job, if not set it is default

        -source    string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

        pause a sync job

        -name      string

                name of the sync job, if not set it is default

        -source    string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    printpod => 'internal command',

};
1184
# Validate the requested sub-command before doing anything else.
# - no command at all: usage() prints its "no command specified" error
#   followed by the usage summary
# - unknown command (no entry in $cmd_help): report it, then print the
#   usage summary without the "no command specified" line
# Both cases terminate via die with an empty message.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # The trailing newline keeps the following "USAGE:" text on its own line.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Options are parsed from a copy of @ARGV; the 'help' branch further down
# still reads $ARGV[1] directly.
# NOTE(review): presumably parse_argv consumes its argument list - confirm.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1194
# Ensure every named option holds a true value in the parsed parameters.
#
# Arguments: list of option names (keys of $param) that must be set.
# Dies with the help text of the current $command at the first name whose
# value in $param is false (unset, empty or 0).
sub check_params {
    my @required = @_;

    foreach my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1200
# Dispatch on the validated sub-command.
# Commands that operate on a job first make sure their mandatory options are
# present (check_params) and that each target string can be parsed
# (check_target) before the actual worker is called.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    # The word following 'help' on the command line, if any.
    my $help_command = $ARGV[1];

    # Help for one specific, known command: emit its help text via die.
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # exec only returns when the program could not be started. Report
        # that explicitly instead of silently falling through and exiting
        # with status 0. The list form bypasses the shell.
        no warnings 'exec'; # the die below already reports the failure
        exec('man', $PROGNAME)
            or die "ERROR: unable to execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    # Internal command: writes the POD source of the manual page to STDOUT.
    print_pod();
}
1257
# Print the short usage summary to STDOUT.
#
# Arguments:
#   $help - when false, the summary is preceded by a "no command specified"
#           error line; pass a true value to print the summary only.
sub usage {
    my ($help) = @_;

    # One synopsis per command, in the order they are listed.
    my @synopsis = (
        'create -dest <string> -source <string> [OPTIONS]',
        'destroy -source <string> [OPTIONS]',
        'disable -source <string> [OPTIONS]',
        'enable -source <string> [OPTIONS]',
        'list',
        'status',
        'sync -dest <string> -source <string> [OPTIONS]',
    );

    print "ERROR:\tno command specified\n" unless $help;
    print "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n";
    print "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n";
    print map("\t$PROGNAME $_\n", @synopsis);
}
1272
# Validate a source/destination target string by handing it to parse_target.
#
# Arguments:
#   $target - the target string as given on the command line.
# Returns whatever parse_target returns for it.
# NOTE(review): presumably parse_target dies on a malformed target - confirm.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1277
# Print the POD source of the manual page to STDOUT.
#
# The SYNOPSIS section is assembled from the per-command help texts stored in
# $cmd_help, sorted as strings. The entry for the internal 'printpod' command
# is only a placeholder ('internal command') and is left out so that it does
# not show up in the generated documentation.
sub print_pod {

    my @documented = grep { $_ ne 'printpod' } keys %$cmd_help;
    my @help_texts = @{$cmd_help}{@documented};
    my $synopsis = join("\n", sort @help_texts);

    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}