]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
allow to set DEBUG over environment and load Data::Dumper conditionally
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
13 my $PROGNAME = "pve-zsync";
14 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
15 my $STATE = "${CONFIG_PATH}/sync_state";
16 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
17 my $PATH = "/usr/sbin";
18 my $PVE_DIR = "/etc/pve/local";
19 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
20 my $LXC_CONF = "${PVE_DIR}/lxc";
21 my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
22 my $PROG_PATH = "$PATH/${PROGNAME}";
23 my $INTERVAL = 15;
24 my $DEBUG;
25
26 BEGIN {
27 $DEBUG = 0; # change default here. not above on declaration!
28 $DEBUG ||= $ENV{ZSYNC_DEBUG};
29 if ($DEBUG) {
30 require Data::Dumper;
31 Data::Dumper->import();
32 }
33 }
34
35 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
36 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
37 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
38 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
39
40 my $IPV6RE = "(?:" .
41 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
42 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
43 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
44 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
45 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
46 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
47 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
48 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
49 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
50
51 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
52 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
53 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
54 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
55 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
56
57 my $command = $ARGV[0];
58
59 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
60 check_bin ('cstream');
61 check_bin ('zfs');
62 check_bin ('ssh');
63 check_bin ('scp');
64 }
65
66 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
67 die "Signaled, aborting sync: $!\n";
68 };
69
70 sub check_bin {
71 my ($bin) = @_;
72
73 foreach my $p (split (/:/, $ENV{PATH})) {
74 my $fn = "$p/$bin";
75 if (-x $fn) {
76 return $fn;
77 }
78 }
79
80 die "unable to find command '$bin'\n";
81 }
82
83 sub cut_target_width {
84 my ($path, $maxlen) = @_;
85 $path =~ s@/+@/@g;
86
87 return $path if length($path) <= $maxlen;
88
89 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
90
91 $path =~ s@/([^/]+/?)$@@;
92 my $tail = $1;
93
94 if (length($tail)+3 == $maxlen) {
95 return "../$tail";
96 } elsif (length($tail)+2 >= $maxlen) {
97 return '..'.substr($tail, -$maxlen+2)
98 }
99
100 $path =~ s@(/[^/]+)(?:/|$)@@;
101 my $head = $1;
102 my $both = length($head) + length($tail);
103 my $remaining = $maxlen-$both-4; # -4 for "/../"
104
105 if ($remaining < 0) {
106 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
107 }
108
109 substr($path, ($remaining/2), (length($path)-$remaining), '..');
110 return "$head/" . $path . "/$tail";
111 }
112
113 sub lock {
114 my ($fh) = @_;
115 flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
116 }
117
118 sub unlock {
119 my ($fh) = @_;
120 flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
121 }
122
123 sub get_status {
124 my ($source, $name, $status) = @_;
125
126 if ($status->{$source->{all}}->{$name}->{status}) {
127 return $status;
128 }
129
130 return undef;
131 }
132
133 sub check_pool_exists {
134 my ($target, $user) = @_;
135
136 my $cmd = [];
137
138 if ($target->{ip}) {
139 push @$cmd, 'ssh', "$user\@$target->{ip}", '--';
140 }
141 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
142 eval {
143 run_cmd($cmd);
144 };
145
146 if ($@) {
147 return 0;
148 }
149 return 1;
150 }
151
152 sub parse_target {
153 my ($text) = @_;
154
155 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
156 my $target = {};
157
158 if ($text !~ $TARGETRE) {
159 die "$errstr\n";
160 }
161 $target->{all} = $2;
162 $target->{ip} = $1 if $1;
163 my @parts = split('/', $2);
164
165 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
166
167 my $pool = $target->{pool} = shift(@parts);
168 die "$errstr\n" if !$pool;
169
170 if ($pool =~ m/^\d+$/) {
171 $target->{vmid} = $pool;
172 delete $target->{pool};
173 }
174
175 return $target if (@parts == 0);
176 $target->{last_part} = pop(@parts);
177
178 if ($target->{ip}) {
179 pop(@parts);
180 }
181 if (@parts > 0) {
182 $target->{path} = join('/', @parts);
183 }
184
185 return $target;
186 }
187
188 sub read_cron {
189
190 #This is for the first use to init file;
191 if (!-e $CRONJOBS) {
192 my $new_fh = IO::File->new("> $CRONJOBS");
193 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
194 close($new_fh);
195 return undef;
196 }
197
198 my $fh = IO::File->new("< $CRONJOBS");
199 die "Could not open file $CRONJOBS: $!\n" if !$fh;
200
201 my @text = <$fh>;
202
203 close($fh);
204
205 return encode_cron(@text);
206 }
207
208 sub parse_argv {
209 my (@arg) = @_;
210
211 my $param = {
212 dest => undef,
213 source => undef,
214 verbose => undef,
215 limit => undef,
216 maxsnap => undef,
217 name => undef,
218 skip => undef,
219 method => undef,
220 source_user => undef,
221 dest_user => undef,
222 properties => undef,
223 };
224
225 my ($ret) = GetOptionsFromArray(
226 \@arg,
227 'dest=s' => \$param->{dest},
228 'source=s' => \$param->{source},
229 'verbose' => \$param->{verbose},
230 'limit=i' => \$param->{limit},
231 'maxsnap=i' => \$param->{maxsnap},
232 'name=s' => \$param->{name},
233 'skip' => \$param->{skip},
234 'method=s' => \$param->{method},
235 'source-user=s' => \$param->{source_user},
236 'dest-user=s' => \$param->{dest_user},
237 'properties' => \$param->{properties},
238 );
239
240 die "can't parse options\n" if $ret == 0;
241
242 $param->{name} //= "default";
243 $param->{maxsnap} //= 1;
244 $param->{method} //= "ssh";
245 $param->{source_user} //= "root";
246 $param->{dest_user} //= "root";
247
248 return $param;
249 }
250
251 sub add_state_to_job {
252 my ($job) = @_;
253
254 my $states = read_state();
255 my $state = $states->{$job->{source}}->{$job->{name}};
256
257 $job->{state} = $state->{state};
258 $job->{lsync} = $state->{lsync};
259 $job->{vm_type} = $state->{vm_type};
260
261 for (my $i = 0; $state->{"snap$i"}; $i++) {
262 $job->{"snap$i"} = $state->{"snap$i"};
263 }
264
265 return $job;
266 }
267
268 sub encode_cron {
269 my (@text) = @_;
270
271 my $cfg = {};
272
273 while (my $line = shift(@text)) {
274
275 my @arg = split('\s', $line);
276 my $param = parse_argv(@arg);
277
278 if ($param->{source} && $param->{dest}) {
279 my $source = delete $param->{source};
280 my $name = delete $param->{name};
281
282 $cfg->{$source}->{$name} = $param;
283 }
284 }
285
286 return $cfg;
287 }
288
289 sub param_to_job {
290 my ($param) = @_;
291
292 my $job = {};
293
294 my $source = parse_target($param->{source});
295 my $dest = parse_target($param->{dest}) if $param->{dest};
296
297 $job->{name} = !$param->{name} ? "default" : $param->{name};
298 $job->{dest} = $param->{dest} if $param->{dest};
299 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
300 $job->{method} = "ssh" if !$job->{method};
301 $job->{limit} = $param->{limit};
302 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
303 $job->{source} = $param->{source};
304 $job->{source_user} = $param->{source_user};
305 $job->{dest_user} = $param->{dest_user};
306 $job->{properties} = !!$param->{properties};
307
308 return $job;
309 }
310
311 sub read_state {
312
313 if (!-e $STATE) {
314 make_path $CONFIG_PATH;
315 my $new_fh = IO::File->new("> $STATE");
316 die "Could not create $STATE: $!\n" if !$new_fh;
317 print $new_fh "{}";
318 close($new_fh);
319 return undef;
320 }
321
322 my $fh = IO::File->new("< $STATE");
323 die "Could not open file $STATE: $!\n" if !$fh;
324
325 my $text = <$fh>;
326 my $states = decode_json($text);
327
328 close($fh);
329
330 return $states;
331 }
332
333 sub update_state {
334 my ($job) = @_;
335 my $text;
336 my $in_fh;
337
338 eval {
339
340 $in_fh = IO::File->new("< $STATE");
341 die "Could not open file $STATE: $!\n" if !$in_fh;
342 lock($in_fh);
343 $text = <$in_fh>;
344 };
345
346 my $out_fh = IO::File->new("> $STATE.new");
347 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
348
349 my $states = {};
350 my $state = {};
351 if ($text){
352 $states = decode_json($text);
353 $state = $states->{$job->{source}}->{$job->{name}};
354 }
355
356 if ($job->{state} ne "del") {
357 $state->{state} = $job->{state};
358 $state->{lsync} = $job->{lsync};
359 $state->{vm_type} = $job->{vm_type};
360
361 for (my $i = 0; $job->{"snap$i"} ; $i++) {
362 $state->{"snap$i"} = $job->{"snap$i"};
363 }
364 $states->{$job->{source}}->{$job->{name}} = $state;
365 } else {
366
367 delete $states->{$job->{source}}->{$job->{name}};
368 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
369 }
370
371 $text = encode_json($states);
372 print $out_fh $text;
373
374 close($out_fh);
375 rename "$STATE.new", $STATE;
376 eval {
377 close($in_fh);
378 };
379
380 return $states;
381 }
382
383 sub update_cron {
384 my ($job) = @_;
385
386 my $updated;
387 my $has_header;
388 my $line_no = 0;
389 my $text = "";
390 my $header = "SHELL=/bin/sh\n";
391 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
392
393 my $fh = IO::File->new("< $CRONJOBS");
394 die "Could not open file $CRONJOBS: $!\n" if !$fh;
395 lock($fh);
396
397 my @test = <$fh>;
398
399 while (my $line = shift(@test)) {
400 chomp($line);
401 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
402 $updated = 1;
403 next if $job->{state} eq "del";
404 $text .= format_job($job, $line);
405 } else {
406 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
407 $has_header = 1;
408 }
409 $text .= "$line\n";
410 }
411 $line_no++;
412 }
413
414 if (!$has_header) {
415 $text = "$header$text";
416 }
417
418 if (!$updated) {
419 $text .= format_job($job);
420 }
421 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
422 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
423
424 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
425 close ($new_fh);
426
427 die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
428 close ($fh);
429 }
430
431 sub format_job {
432 my ($job, $line) = @_;
433 my $text = "";
434
435 if ($job->{state} eq "stopped") {
436 $text = "#";
437 }
438 if ($line) {
439 $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/;
440 $text .= $1;
441 } else {
442 $text .= "*/$INTERVAL * * * *";
443 }
444 $text .= " root";
445 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
446 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
447 $text .= " --limit $job->{limit}" if $job->{limit};
448 $text .= " --method $job->{method}";
449 $text .= " --verbose" if $job->{verbose};
450 $text .= " --source-user $job->{source_user}";
451 $text .= " --dest-user $job->{dest_user}";
452 $text .= " --properties" if $job->{properties};
453 $text .= "\n";
454
455 return $text;
456 }
457
458 sub list {
459
460 my $cfg = read_cron();
461
462 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
463
464 my $states = read_state();
465 foreach my $source (sort keys%{$cfg}) {
466 foreach my $name (sort keys%{$cfg->{$source}}) {
467 $list .= sprintf("%-25s", cut_target_width($source, 25));
468 $list .= sprintf("%-25s", cut_target_width($name, 25));
469 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
470 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
471 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
472 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
473 }
474 }
475
476 return $list;
477 }
478
479 sub vm_exists {
480 my ($target, $user) = @_;
481
482 return undef if !defined($target->{vmid});
483
484 my $conf_fn = "$target->{vmid}.conf";
485
486 if ($target->{ip}) {
487 my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
488 return "qemu" if eval { run_cmd([@cmd, "$QEMU_CONF/$conf_fn"]) };
489 return "lxc" if eval { run_cmd([@cmd, "$LXC_CONF/$conf_fn"]) };
490 } else {
491 return "qemu" if -f "$QEMU_CONF/$conf_fn";
492 return "lxc" if -f "$LXC_CONF/$conf_fn";
493 }
494
495 return undef;
496 }
497
498 sub init {
499 my ($param) = @_;
500
501 my $cfg = read_cron();
502
503 my $job = param_to_job($param);
504
505 $job->{state} = "ok";
506 $job->{lsync} = 0;
507
508 my $source = parse_target($param->{source});
509 my $dest = parse_target($param->{dest});
510
511 if (my $ip = $dest->{ip}) {
512 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
513 }
514
515 if (my $ip = $source->{ip}) {
516 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
517 }
518
519 die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
520
521 if (!defined($source->{vmid})) {
522 die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
523 }
524
525 my $vm_type = vm_exists($source, $param->{source_user});
526 $job->{vm_type} = $vm_type;
527 $source->{vm_type} = $vm_type;
528
529 die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
530
531 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
532
533 #check if vm has zfs disks if not die;
534 get_disks($source, $param->{source_user}) if $source->{vmid};
535
536 update_cron($job);
537 update_state($job);
538
539 eval {
540 sync($param) if !$param->{skip};
541 };
542 if(my $err = $@) {
543 destroy_job($param);
544 print $err;
545 }
546 }
547
548 sub get_job {
549 my ($param) = @_;
550
551 my $cfg = read_cron();
552
553 if (!$cfg->{$param->{source}}->{$param->{name}}) {
554 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
555 }
556 my $job = $cfg->{$param->{source}}->{$param->{name}};
557 $job->{name} = $param->{name};
558 $job->{source} = $param->{source};
559 $job = add_state_to_job($job);
560
561 return $job;
562 }
563
564 sub destroy_job {
565 my ($param) = @_;
566
567 my $job = get_job($param);
568 $job->{state} = "del";
569
570 update_cron($job);
571 update_state($job);
572 }
573
574 sub sync {
575 my ($param) = @_;
576
577 my $lock_fh = IO::File->new("> $LOCKFILE");
578 die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
579 lock($lock_fh);
580
581 my $date = get_date();
582 my $job;
583 eval {
584 $job = get_job($param);
585 };
586
587 if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
588 die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
589 }
590
591 my $dest = parse_target($param->{dest});
592 my $source = parse_target($param->{source});
593
594 my $sync_path = sub {
595 my ($source, $dest, $job, $param, $date) = @_;
596
597 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
598
599 snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
600
601 send_image($source, $dest, $param);
602
603 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
604
605 };
606
607 my $vm_type = vm_exists($source, $param->{source_user});
608 $source->{vm_type} = $vm_type;
609
610 if ($job) {
611 $job->{state} = "syncing";
612 $job->{vm_type} = $vm_type if !$job->{vm_type};
613 update_state($job);
614 }
615
616 eval{
617 if ($source->{vmid}) {
618 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
619 die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
620 my $disks = get_disks($source, $param->{source_user});
621
622 foreach my $disk (sort keys %{$disks}) {
623 $source->{all} = $disks->{$disk}->{all};
624 $source->{pool} = $disks->{$disk}->{pool};
625 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
626 $source->{last_part} = $disks->{$disk}->{last_part};
627 &$sync_path($source, $dest, $job, $param, $date);
628 }
629 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
630 send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user});
631 } else {
632 send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user});
633 }
634 } else {
635 &$sync_path($source, $dest, $job, $param, $date);
636 }
637 };
638 if(my $err = $@) {
639 if ($job) {
640 $job->{state} = "error";
641 update_state($job);
642 unlock($lock_fh);
643 close($lock_fh);
644 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
645 }
646 die "$err\n";
647 }
648
649 if ($job) {
650 $job->{state} = "ok";
651 $job->{lsync} = $date;
652 update_state($job);
653 }
654
655 unlock($lock_fh);
656 close($lock_fh);
657 }
658
659 sub snapshot_get{
660 my ($source, $dest, $max_snap, $name, $source_user) = @_;
661
662 my $cmd = [];
663 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
664 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
665 push @$cmd, $source->{all};
666
667 my $raw = run_cmd($cmd);
668 my $index = 0;
669 my $line = "";
670 my $last_snap = undef;
671 my $old_snap;
672
673 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
674 $line = $1;
675 if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
676
677 $last_snap = $1 if (!$last_snap);
678 $old_snap = $1;
679 $index++;
680 if ($index == $max_snap) {
681 $source->{destroy} = 1;
682 last;
683 };
684 }
685 }
686
687 return ($old_snap, $last_snap) if $last_snap;
688
689 return undef;
690 }
691
692 sub snapshot_add {
693 my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;
694
695 my $snap_name = "rep_$name\_".$date;
696
697 $source->{new_snap} = $snap_name;
698
699 my $path = "$source->{all}\@$snap_name";
700
701 my $cmd = [];
702 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
703 push @$cmd, 'zfs', 'snapshot', $path;
704 eval{
705 run_cmd($cmd);
706 };
707
708 if (my $err = $@) {
709 snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
710 die "$err\n";
711 }
712 }
713
714 sub write_cron {
715 my ($cfg) = @_;
716
717 my $text = "SHELL=/bin/sh\n";
718 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
719
720 my $fh = IO::File->new("> $CRONJOBS");
721 die "Could not open file: $!\n" if !$fh;
722
723 foreach my $source (sort keys%{$cfg}) {
724 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
725 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
726 $text .= "$PROG_PATH sync";
727 $text .= " -source ";
728 if ($cfg->{$source}->{$sync_name}->{vmid}) {
729 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
730 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
731 } else {
732 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
733 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
734 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
735 }
736 $text .= " -dest ";
737 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
738 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
739 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
740 $text .= " -name $sync_name ";
741 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
742 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
743 $text .= "\n";
744 }
745 }
746 die "Can't write to cron\n" if (!print($fh $text));
747 close($fh);
748 }
749
750 sub get_disks {
751 my ($target, $user) = @_;
752
753 my $cmd = [];
754 push @$cmd, 'ssh', "$user\@$target->{ip}", '--', if $target->{ip};
755
756 if ($target->{vm_type} eq 'qemu') {
757 push @$cmd, 'qm', 'config', $target->{vmid};
758 } elsif ($target->{vm_type} eq 'lxc') {
759 push @$cmd, 'pct', 'config', $target->{vmid};
760 } else {
761 die "VM Type unknown\n";
762 }
763
764 my $res = run_cmd($cmd);
765
766 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
767
768 return $disks;
769 }
770
771 sub run_cmd {
772 my ($cmd) = @_;
773 print "Start CMD\n" if $DEBUG;
774 print Dumper $cmd if $DEBUG;
775 if (ref($cmd) eq 'ARRAY') {
776 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
777 }
778 my $output = `$cmd 2>&1`;
779
780 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
781
782 chomp($output);
783 print Dumper $output if $DEBUG;
784 print "END CMD\n" if $DEBUG;
785 return $output;
786 }
787
788 sub parse_disks {
789 my ($text, $ip, $vm_type, $user) = @_;
790
791 my $disks;
792
793 my $num = 0;
794 while ($text && $text =~ s/^(.*?)(\n|$)//) {
795 my $line = $1;
796
797 next if $line =~ /media=cdrom/;
798 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
799
800 #QEMU if backup is not set include in sync
801 next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
802
803 #LXC if backup is not set do no in sync
804 next if $vm_type eq 'lxc' && ($line =~ m/^mp\d:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);
805
806 my $disk = undef;
807 my $stor = undef;
808 if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
809 my @parameter = split(/,/,$1);
810
811 foreach my $opt (@parameter) {
812 if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
813 $disk = $2;
814 $stor = $1;
815 last;
816 }
817 }
818 }
819 if (!defined($disk) || !defined($stor)) {
820 print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
821 next;
822 }
823
824 my $cmd = [];
825 push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
826 push @$cmd, 'pvesm', 'path', "$stor$disk";
827 my $path = run_cmd($cmd);
828
829 die "Get no path from pvesm path $stor$disk\n" if !$path;
830
831 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
832
833 my @array = split('/', $1);
834 $disks->{$num}->{pool} = shift(@array);
835 $disks->{$num}->{all} = $disks->{$num}->{pool};
836 if (0 < @array) {
837 $disks->{$num}->{path} = join('/', @array);
838 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
839 }
840 $disks->{$num}->{last_part} = $disk;
841 $disks->{$num}->{all} .= "\/$disk";
842
843 $num++;
844 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
845
846 $disks->{$num}->{pool} = $1;
847 $disks->{$num}->{all} = $disks->{$num}->{pool};
848
849 if ($2) {
850 $disks->{$num}->{path} = $3;
851 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
852 }
853
854 $disks->{$num}->{last_part} = $disk;
855 $disks->{$num}->{all} .= "\/$disk";
856
857 $num++;
858
859 } else {
860 die "ERROR: in path\n";
861 }
862 }
863
864 die "Vm include no disk on zfs.\n" if !$disks->{0};
865 return $disks;
866 }
867
868 sub snapshot_destroy {
869 my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;
870
871 my @zfscmd = ('zfs', 'destroy');
872 my $snapshot = "$source->{all}\@$snap";
873
874 eval {
875 if($source->{ip} && $method eq 'ssh'){
876 run_cmd(['ssh', "$source_user\@$source->{ip}", '--', @zfscmd, $snapshot]);
877 } else {
878 run_cmd([@zfscmd, $snapshot]);
879 }
880 };
881 if (my $erro = $@) {
882 warn "WARN: $erro";
883 }
884 if ($dest) {
885 my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
886
887 my $path = "$dest->{all}";
888 $path .= "/$source->{last_part}" if $source->{last_part};
889
890 eval {
891 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
892 };
893 if (my $erro = $@) {
894 warn "WARN: $erro";
895 }
896 }
897 }
898
899 sub snapshot_exist {
900 my ($source , $dest, $method, $dest_user) = @_;
901
902 my $cmd = [];
903 push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
904 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
905
906 my $path = $dest->{all};
907 $path .= "/$source->{last_part}" if $source->{last_part};
908 $path .= "\@$source->{old_snap}";
909
910 push @$cmd, $path;
911
912
913 my $text = "";
914 eval {$text =run_cmd($cmd);};
915 if (my $erro =$@) {
916 warn "WARN: $erro";
917 return undef;
918 }
919
920 while ($text && $text =~ s/^(.*?)(\n|$)//) {
921 my $line =$1;
922 return 1 if $line =~ m/^.*$source->{old_snap}$/;
923 }
924 }
925
926 sub send_image {
927 my ($source, $dest, $param) = @_;
928
929 my $cmd = [];
930
931 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
932 push @$cmd, 'zfs', 'send';
933 push @$cmd, '-p', if $param->{properties};
934 push @$cmd, '-v' if $param->{verbose};
935
936 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) {
937 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
938 }
939 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
940
941 if ($param->{limit}){
942 my $bwl = $param->{limit}*1024;
943 push @$cmd, \'|', 'cstream', '-t', $bwl;
944 }
945 my $target = "$dest->{all}";
946 $target .= "/$source->{last_part}" if $source->{last_part};
947 $target =~ s!/+!/!g;
948
949 push @$cmd, \'|';
950 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
951 push @$cmd, 'zfs', 'recv', '-F', '--';
952 push @$cmd, "$target";
953
954 eval {
955 run_cmd($cmd)
956 };
957
958 if (my $erro = $@) {
959 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
960 die $erro;
961 };
962 }
963
964
965 sub send_config{
966 my ($source, $dest, $method, $source_user, $dest_user) = @_;
967
968 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
969 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
970
971 my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
972
973 $dest_target_new = $config_dir.'/'.$dest_target_new;
974
975 if ($method eq 'ssh'){
976 if ($dest->{ip} && $source->{ip}) {
977 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
978 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
979 } elsif ($dest->{ip}) {
980 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
981 run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
982 } elsif ($source->{ip}) {
983 run_cmd(['mkdir', '-p', '--', $config_dir]);
984 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]);
985 }
986
987 if ($source->{destroy}){
988 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
989 if($dest->{ip}){
990 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
991 } else {
992 run_cmd(['rm', '-f', '--', $dest_target_old]);
993 }
994 }
995 } elsif ($method eq 'local') {
996 run_cmd(['mkdir', '-p', '--', $config_dir]);
997 run_cmd(['cp', $source_target, $dest_target_new]);
998 }
999 }
1000
1001 sub get_date {
1002 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
1003 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
1004
1005 return $datestamp;
1006 }
1007
1008 sub status {
1009 my $cfg = read_cron();
1010
1011 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
1012
1013 my $states = read_state();
1014
1015 foreach my $source (sort keys%{$cfg}) {
1016 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
1017 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
1018 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
1019 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
1020 }
1021 }
1022
1023 return $status_list;
1024 }
1025
1026 sub enable_job {
1027 my ($param) = @_;
1028
1029 my $job = get_job($param);
1030 $job->{state} = "ok";
1031 update_state($job);
1032 update_cron($job);
1033 }
1034
1035 sub disable_job {
1036 my ($param) = @_;
1037
1038 my $job = get_job($param);
1039 $job->{state} = "stopped";
1040 update_state($job);
1041 update_cron($job);
1042 }
1043
1044 my $cmd_help = {
1045 destroy => qq{
1046 $PROGNAME destroy -source <string> [OPTIONS]
1047
1048 remove a sync Job from the scheduler
1049
1050 -name string
1051
1052 name of the sync job, if not set it is default
1053
1054 -source string
1055
1056 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1057 },
1058 create => qq{
1059 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1060
1061 Create a sync Job
1062
1063 -dest string
1064
1065 the destination target is like [IP]:<Pool>[/Path]
1066
1067 -dest-user string
1068
1069 name of the user on the destination target, root by default
1070
1071 -limit integer
1072
1073 max sync speed in kBytes/s, default unlimited
1074
1075 -maxsnap string
1076
1077 how much snapshots will be kept before get erased, default 1
1078
1079 -name string
1080
1081 name of the sync job, if not set it is default
1082
1083 -skip boolean
1084
1085 if this flag is set it will skip the first sync
1086
1087 -source string
1088
1089 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1090
1091 -source-user string
1092
1093 name of the user on the source target, root by default
1094
1095 -properties boolean
1096
1097 Include the dataset's properties in the stream.
1098 },
1099 sync => qq{
1100 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1101
1102 will sync one time
1103
1104 -dest string
1105
1106 the destination target is like [IP:]<Pool>[/Path]
1107
1108 -dest-user string
1109
1110 name of the user on the destination target, root by default
1111
1112 -limit integer
1113
1114 max sync speed in kBytes/s, default unlimited
1115
1116 -maxsnap integer
1117
1118 how much snapshots will be kept before get erased, default 1
1119
1120 -name string
1121
1122 name of the sync job, if not set it is default.
1123 It is only necessary if scheduler allready contains this source.
1124
1125 -source string
1126
1127 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1128
1129 -source-user string
1130
1131 name of the user on the source target, root by default
1132
1133 -verbose boolean
1134
1135 print out the sync progress.
1136
1137 -properties boolean
1138
1139 Include the dataset's properties in the stream.
1140 },
1141 list => qq{
1142 $PROGNAME list
1143
1144 Get a List of all scheduled Sync Jobs
1145 },
1146 status => qq{
1147 $PROGNAME status
1148
1149 Get the status of all scheduled Sync Jobs
1150 },
1151 help => qq{
1152 $PROGNAME help <cmd> [OPTIONS]
1153
1154 Get help about specified command.
1155
1156 <cmd> string
1157
1158 Command name
1159
1160 -verbose boolean
1161
1162 Verbose output format.
1163 },
1164 enable => qq{
1165 $PROGNAME enable -source <string> [OPTIONS]
1166
1167 enable a syncjob and reset error
1168
1169 -name string
1170
1171 name of the sync job, if not set it is default
1172
1173 -source string
1174
1175 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1176 },
1177 disable => qq{
1178 $PROGNAME disable -source <string> [OPTIONS]
1179
1180 pause a sync job
1181
1182 -name string
1183
1184 name of the sync job, if not set it is default
1185
1186 -source string
1187
1188 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1189 },
1190 printpod => 'internal command',
1191
1192 };
1193
1194 if (!$command) {
1195 usage(); die "\n";
1196 } elsif (!$cmd_help->{$command}) {
1197 print "ERROR: unknown command '$command'";
1198 usage(1); die "\n";
1199 }
1200
1201 my @arg = @ARGV;
1202 my $param = parse_argv(@arg);
1203
1204 sub check_params {
1205 for (@_) {
1206 die "$cmd_help->{$command}\n" if !$param->{$_};
1207 }
1208 }
1209
1210 if ($command eq 'destroy') {
1211 check_params(qw(source));
1212
1213 check_target($param->{source});
1214 destroy_job($param);
1215
1216 } elsif ($command eq 'sync') {
1217 check_params(qw(source dest));
1218
1219 check_target($param->{source});
1220 check_target($param->{dest});
1221 sync($param);
1222
1223 } elsif ($command eq 'create') {
1224 check_params(qw(source dest));
1225
1226 check_target($param->{source});
1227 check_target($param->{dest});
1228 init($param);
1229
1230 } elsif ($command eq 'status') {
1231 print status();
1232
1233 } elsif ($command eq 'list') {
1234 print list();
1235
1236 } elsif ($command eq 'help') {
1237 my $help_command = $ARGV[1];
1238
1239 if ($help_command && $cmd_help->{$help_command}) {
1240 die "$cmd_help->{$help_command}\n";
1241
1242 }
1243 if ($param->{verbose}) {
1244 exec("man $PROGNAME");
1245
1246 } else {
1247 usage(1);
1248
1249 }
1250
1251 } elsif ($command eq 'enable') {
1252 check_params(qw(source));
1253
1254 check_target($param->{source});
1255 enable_job($param);
1256
1257 } elsif ($command eq 'disable') {
1258 check_params(qw(source));
1259
1260 check_target($param->{source});
1261 disable_job($param);
1262
1263 } elsif ($command eq 'printpod') {
1264 print_pod();
1265 }
1266
1267 sub usage {
1268 my ($help) = @_;
1269
1270 print("ERROR:\tno command specified\n") if !$help;
1271 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1272 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1273 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1274 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1275 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1276 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1277 print("\t$PROGNAME list\n");
1278 print("\t$PROGNAME status\n");
1279 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1280 }
1281
1282 sub check_target {
1283 my ($target) = @_;
1284 parse_target($target);
1285 }
1286
1287 sub print_pod {
1288
1289 my $synopsis = join("\n", sort values %$cmd_help);
1290
1291 print <<EOF;
1292 =head1 NAME
1293
1294 pve-zsync - PVE ZFS Replication Manager
1295
1296 =head1 SYNOPSIS
1297
1298 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1299
1300 $synopsis
1301
1302 =head1 DESCRIPTION
1303
1304 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1305 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1306 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1307 To config cron see man crontab.
1308
1309 =head2 PVE ZFS Storage sync Tool
1310
1311 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1312
1313 =head1 EXAMPLES
1314
1315 add sync job from local VM to remote ZFS Server
1316 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1317
1318 =head1 IMPORTANT FILES
1319
1320 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1321
1322 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1323
1324 =head1 COPYRIGHT AND DISCLAIMER
1325
1326 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1327
1328 This program is free software: you can redistribute it and/or modify it
1329 under the terms of the GNU Affero General Public License as published
1330 by the Free Software Foundation, either version 3 of the License, or
1331 (at your option) any later version.
1332
1333 This program is distributed in the hope that it will be useful, but
1334 WITHOUT ANY WARRANTY; without even the implied warranty of
1335 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1336 Affero General Public License for more details.
1337
1338 You should have received a copy of the GNU Affero General Public
1339 License along with this program. If not, see
1340 <http://www.gnu.org/licenses/>.
1341
1342 EOF
1343 }