]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
fix another case where source-user was used to connect to dest
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use JSON;
11 use IO::File;
12 use String::ShellQuote 'shell_quote';
13
# Program identity and the fixed filesystem locations derived from it.
my $PROGNAME    = 'pve-zsync';
my $CONFIG_PATH = '/var/lib/' . $PROGNAME;
my $STATE       = $CONFIG_PATH . '/sync_state';    # JSON document with the state of every job
my $CRONJOBS    = '/etc/cron.d/' . $PROGNAME;      # cron file that doubles as the job list
my $PATH        = '/usr/sbin';
my $PVE_DIR     = '/etc/pve/local';
my $QEMU_CONF   = $PVE_DIR . '/qemu-server';
my $LXC_CONF    = $PVE_DIR . '/lxc';
my $LOCKFILE    = $CONFIG_PATH . '/' . $PROGNAME . '.lock';
my $PROG_PATH   = $PATH . '/' . $PROGNAME;
my $INTERVAL    = 15;    # minute step used for newly written cron entries
my $DEBUG       = 0;     # when true, run_cmd dumps each command and its output
26
# Building blocks for address matching; all of them are plain strings that
# get interpolated into the final, compiled $TARGETRE.
my $IPV4OCTET = '(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])';
my $IPV4RE    = "(?:(?:${IPV4OCTET}\\.){3}${IPV4OCTET})";
my $IPV6H16   = '(?:[0-9a-fA-F]{1,4})';
my $IPV6LS32  = "(?:(?:${IPV4RE}|${IPV6H16}:${IPV6H16}))";

# One alternative per position at which the "::" abbreviation may appear.
my $IPV6RE = '(?:' . join('|',
    "(?:(?:(?:${IPV6H16}:){6})${IPV6LS32})",
    "(?:(?:::(?:${IPV6H16}:){5})${IPV6LS32})",
    "(?:(?:(?:${IPV6H16})?::(?:${IPV6H16}:){4})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,1}${IPV6H16})?::(?:${IPV6H16}:){3})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,2}${IPV6H16})?::(?:${IPV6H16}:){2})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,3}${IPV6H16})?::(?:${IPV6H16}:){1})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,4}${IPV6H16})?::)${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,5}${IPV6H16})?::)${IPV6H16})",
    "(?:(?:(?:(?:${IPV6H16}:){0,6}${IPV6H16})?::))",
) . ')';

# hostname or ipv4 address
my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|${IPV4RE})";
# these may be in brackets, too
my $HOSTv4RE1 = "(?:${HOSTv4RE0}|\\[${HOSTv4RE0}\\])";
# ipv6 must always be in brackets
my $HOSTRE = "(?:${HOSTv4RE1}|\\[${IPV6RE}\\])";

# Targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional.
# Capture 1 is the host (brackets included), capture 2 the VMID or dataset.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
48
# The first command line word selects the sub-command.
my $command = $ARGV[0];

# 'help' and 'printpod' only print text; every other command needs the
# external tools, so fail early when one of them is not installed.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin($_) for qw(cstream zfs ssh scp);
}
57
# Turn the usual termination signals into a die() so that a running sync can
# unwind through its eval blocks and record the failure.
# SIGKILL is deliberately not listed: it cannot be caught, so installing a
# handler for it has no effect.
$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{INT} =
    sub {
        die "Signal aborting sync\n";
    };
62
# Locate an executable called $bin in the directories listed in $ENV{PATH}.
# Returns the full path of the first executable match, dies if there is none.
sub check_bin {
    my ($bin) = @_;

    for my $dir (split(/:/, $ENV{PATH})) {
        my $candidate = "$dir/$bin";
        return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}
75
# Shorten $path to at most $maxlen characters for tabular output.
# Runs of '/' are collapsed first. Paths that are still too long get parts
# replaced by '..', keeping the last path component whenever possible.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s@/+@/@g;
    return $path if length($path) <= $maxlen;

    # No directory structure at all: keep only the end of the name.
    if ($path !~ m@/@) {
        return '..' . substr($path, 2 - $maxlen);
    }

    # Split off the last component (an optional trailing slash stays on it).
    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;
    my $tail_len = length($tail);

    return "../$tail" if $tail_len + 3 == $maxlen;
    return '..' . substr($tail, 2 - $maxlen) if $tail_len + 2 >= $maxlen;

    # Remove the first '/component' found in what is left.
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $remaining = $maxlen - (length($head) + $tail_len) - 4; # -4 for "/../"

    if ($remaining < 0) {
        my $keep = $maxlen - $tail_len - 3; # -3 for "../"
        return substr($head, 0, $keep) . "../$tail";
    }

    # Cut the middle out of the rest so the pieces add up to $maxlen.
    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return join('/', $head, $path, $tail);
}
105
# Take an exclusive flock on the given file handle; dies when that fails.
sub lock {
    my ($fh) = @_;
    flock($fh, LOCK_EX) or die "Can't lock config - $!\n";
}
110
# Release the flock held on the given file handle; dies when that fails.
sub unlock {
    my ($fh) = @_;
    flock($fh, LOCK_UN) or die "Can't unlock config- $!\n";
}
115
# Returns the whole $status structure when it holds a true 'status' value for
# the job $name of $source->{all}, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    return $status->{$source->{all}}->{$name}->{status} ? $status : undef;
}
125
# Check whether the dataset $target->{all} is known to 'zfs list'.
# When the target has an ip, the command is run there via ssh as $user.
# Returns 1 if the command succeeded, 0 if it failed.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'list', '-H', '--', $target->{all}];

    eval { run_cmd($cmd) };

    return $@ ? 0 : 1;
}
144
# Split a target string ([IP:]<VMID> or [IP:]<ZFSPool>[/Path]) into a hash
# reference. Possible keys: all, ip (brackets removed), pool or vmid,
# last_part and path. Dies when the text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };
    if ($host) {
        # hosts may be given in brackets, store them without
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $all);
    my $pool = shift(@parts);
    $target->{pool} = $pool;
    die "$errstr\n" if !$pool;

    # a purely numeric first component is a VMID, not a pool
    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
        delete $target->{pool};
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);
    pop(@parts) if $target->{ip};
    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
180
# Read the cron file and return its jobs as produced by encode_cron().
# If the file does not exist yet, an empty one is created and undef returned.
sub read_cron {

    # This is for the first use, to init the file.
    unless (-e $CRONJOBS) {
        my $new_fh = IO::File->new("> $CRONJOBS")
            or die "Could not create $CRONJOBS: $!\n";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $CRONJOBS")
        or die "Could not open file $CRONJOBS: $!\n";
    my @lines = <$fh>;
    close($fh);

    return encode_cron(@lines);
}
200
# Parse command line style words into a parameter hash.
# Every known option gets a key (undef when not given); name, maxsnap,
# method, source_user and dest_user fall back to defaults.
# Dies with "can't parse options" when Getopt::Long rejects the input.
sub parse_argv {
    my (@arg) = @_;

    # parameter key => Getopt::Long option specification
    my %spec_for = (
        dest        => 'dest=s',
        source      => 'source=s',
        verbose     => 'verbose',
        limit       => 'limit=i',
        maxsnap     => 'maxsnap=i',
        name        => 'name=s',
        skip        => 'skip',
        method      => 'method=s',
        source_user => 'source-user=s',
        dest_user   => 'dest-user=s',
    );

    my $param = {};
    $param->{$_} = undef for keys %spec_for;

    my @options = map { ($spec_for{$_} => \$param->{$_}) } keys %spec_for;
    my ($ret) = GetOptionsFromArray(\@arg, @options);

    die "can't parse options\n" if $ret == 0;

    my %default_for = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    $param->{$_} //= $default_for{$_} for keys %default_for;

    return $param;
}
241
# Copy the stored state (state, lsync, vm_type and the consecutive snapN
# entries) of the job identified by source and name into $job.
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    $job->{$_} = $state->{$_} for qw(state lsync vm_type);

    # snap0, snap1, ... are copied until the first missing/false entry
    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
258
# Turn the lines of the cron file into $cfg->{source}->{name} = parameters.
# Each line is split on whitespace and run through parse_argv(); lines that
# do not carry both a source and a dest are ignored.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    while (my $line = shift(@text)) {
        my $param = parse_argv(split('\s', $line));

        next if !($param->{source} && $param->{dest});

        # source and name become the lookup keys, the rest is the job config
        my ($source, $name) = delete @{$param}{qw(source name)};
        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
279
# Build a job hash from parsed command line parameters.
# The method becomes "local" when neither source nor dest names a remote
# host and "ssh" otherwise. 'dest' and 'maxsnap' are only set when given.
# Dies (via parse_target) on an invalid source or dest.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # Do not write "my $dest = ... if COND": a conditionally executed 'my'
    # has undefined behavior. An empty hash stands in for a missing dest.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};

    return $job;
}
300
# Load the job states from $STATE and return the decoded JSON structure.
# If the file does not exist yet, it is created holding "{}" (together with
# $CONFIG_PATH) and undef is returned.
sub read_state {

    unless (-e $STATE) {
        make_path $CONFIG_PATH;
        my $new_fh = IO::File->new("> $STATE")
            or die "Could not create $STATE: $!\n";
        print $new_fh "{}";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $STATE")
        or die "Could not open file $STATE: $!\n";

    # only the first line is read and decoded
    my $json = <$fh>;
    my $states = decode_json($json);

    close($fh);

    return $states;
}
322
# Store the state of $job (state, lsync, vm_type, snapN) in the state file,
# or remove the job from it when $job->{state} is "del".
# The new content is written to "$STATE.new" and then moved over $STATE.
# Returns the complete, updated states structure.
# Dies when the new file cannot be opened, written, closed or moved into
# place, so that a failed update never goes unnoticed.
sub update_state {
    my ($job) = @_;
    my $text;
    my $in_fh;

    # Reading the current state is best effort: if the file cannot be opened
    # or locked we simply start from an empty state.
    eval {

        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        $text = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}} // {};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        # drop the source entry as well once its last job is gone
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);
    die "can't write to $STATE.new\n" if !print($out_fh $text);

    # buffered write errors only show up on close
    close($out_fh) or die "can't close $STATE.new: $!\n";
    die "can't move $STATE.new: $!\n" if !move("$STATE.new", $STATE);

    # the input handle (and with it the lock) only exists if the open worked
    close($in_fh) if $in_fh;

    return $states;
}
372
# Rewrite the cron file for $job.
# The line belonging to the job (same source and name) is replaced by a
# freshly formatted one, or dropped when $job->{state} is "del". A job that
# has no line yet is appended. The SHELL/PATH header is added if missing.
# The result is written to "$CRONJOBS.new" and moved over $CRONJOBS; dies
# when the file cannot be read, written or moved.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @test = <$fh>;

    while (my $line = shift(@test)) {
        chomp($line);
        # \Q...\E: source and name are literal text, not patterns. Sources
        # contain '.' and may contain '[' and ']' (bracketed IPv6 hosts).
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            # a PATH or SHELL assignment within the first lines counts as header
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
    # closing the original handle last keeps the lock until the move is done
    close ($fh);
}
420
# Format $job as one line for the cron file (including the newline).
# A job in state "stopped" is written commented out with a leading '#'.
# When the job's existing cron $line is passed in, its five schedule fields
# are kept; without a line, or when no schedule can be found in it, the
# default of every $INTERVAL minutes is used.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }
    # Only use the capture if the match succeeded: after a failed match $1
    # would still hold whatever an earlier match left behind.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= "\n";

    return $text;
}
446
# Build the table printed by the 'list' command: one row per job from the
# cron file, combined with the state, last sync and VM type stored in the
# state file. Returns the table as a single string.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    for my $source (sort keys %{$cfg}) {
        for my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                defined($state->{vm_type}) ? $state->{vm_type} : "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
467
# Find out which kind of guest $target->{vmid} is by looking for its config
# file, via ssh as $user when the target has an ip.
# Returns "qemu" or "lxc", or undef when the target has no vmid or no config
# file was found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    # Do not write "my @cmd = ... if COND": a conditionally executed 'my'
    # has undefined behavior. Always initialize, to an empty list if local.
    my @cmd = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $res = undef;

    # run_cmd dies when 'ls' fails; that simply means "no such config"
    eval { $res = run_cmd([@cmd, 'ls', "$QEMU_CONF/$target->{vmid}.conf"]) };

    return "qemu" if $res;

    eval { $res = run_cmd([@cmd, 'ls', "$LXC_CONF/$target->{vmid}.conf"]) };

    return "lxc" if $res;

    return undef;
}
487
# Implementation of the 'create' command: validate source and dest, register
# the job in the cron and state files and, unless 'skip' is set, run a first
# sync. If that sync fails the job is removed again and the error printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);
    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    # install root's public key on the remote sides, dest first
    for my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
        my ($target, $user) = @$peer;
        my $ip = $target->{ip};
        next if !$ip;
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
    }

    if (!check_pool_exists($dest, $param->{dest_user})) {
        die "Pool $dest->{all} does not exists\n";
    }

    if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
        die "Pool $source->{all} does not exists\n";
    }

    my $vm_type = vm_exists($source, $param->{source_user});
    $job->{vm_type} = $vm_type;
    $source->{vm_type} = $vm_type;

    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    # check if vm has zfs disks, if not die
    if ($source->{vmid}) {
        get_disks($source, $param->{source_user});
    }

    update_cron($job);
    update_state($job);

    eval {
        sync($param) if !$param->{skip};
    };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
537
# Look up the job given by $param->{source} and $param->{name} in the cron
# file and return it with name, source and stored state filled in.
# Dies when no such job exists.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = @{$param}{qw(source name)};

    my $cfg = read_cron();

    my $job = $cfg->{$source}->{$name};
    die "Job with source $source and name $name does not exist\n" if !$job;

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
553
# Remove the job described by $param from the cron file and the state file.
# Dies (via get_job) when the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $job = get_job($param);

    # the pseudo state "del" tells both updaters to drop the entry
    $job->{state} = "del";

    update_cron($job);
    update_state($job);
}
563
# Run one sync from $param->{source} to $param->{dest} while holding the
# global lock file. For a VMID source every ZFS disk of the guest is sent and
# the guest config copied afterwards; any other source is sent as a single
# dataset. If the sync belongs to a registered job, its state is tracked
# ("syncing", then "ok" or "error") in the state file.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE");
    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
    lock($lock_fh);

    my $date = get_date();

    # A sync does not have to belong to a registered job, so a failing
    # lookup just leaves $job undefined.
    my $job = eval { get_job($param) };

    if ($job && $job->{state} eq "syncing") {
        die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
    }

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # Replicate one dataset: look up existing snapshots, create a new one,
    # send it and finally remove the oldest one if the limit was reached.
    my $sync_path = sub {
        my ($source, $dest, $job, $param, $date) = @_;

        my ($old_snap, $last_snap) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
        $source->{old_snap} = $old_snap;
        $source->{last_snap} = $last_snap;

        snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

        send_image($source, $dest, $param);

        if ($source->{destroy} && $source->{old_snap}) {
            snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user});
        }
    };

    my $vm_type = vm_exists($source, $param->{source_user});
    $source->{vm_type} = $vm_type;

    if ($job) {
        $job->{state} = "syncing";
        $job->{vm_type} = $vm_type if !$job->{vm_type};
        update_state($job);
    }

    eval {
        if ($source->{vmid}) {
            die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
            die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
            my $disks = get_disks($source, $param->{source_user});

            for my $disk (sort keys %{$disks}) {
                my $info = $disks->{$disk};
                $source->{all} = $info->{all};
                $source->{pool} = $info->{pool};
                $source->{path} = $info->{path} if $info->{path};
                $source->{last_part} = $info->{last_part};
                $sync_path->($source, $dest, $job, $param, $date);
            }

            # the config only travels over ssh when a remote side is involved
            my $remote = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
            my $config_method = $remote ? 'ssh' : 'local';
            send_config($source, $dest, $config_method, $param->{source_user}, $param->{dest_user});
        } else {
            $sync_path->($source, $dest, $job, $param, $date);
        }
    };
    if (my $err = $@) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
648
# List the snapshots below $source->{all} (newest first, via ssh as
# $source_user if the source has an ip) and pick the ones created for the
# job $name, i.e. named rep_<name>_<date>.
# Returns ($old_snap, $last_snap): the newest matching snapshot and the
# $max_snap-th newest one (or the oldest, if there are fewer). When the
# $max_snap-th one was reached, $source->{destroy} is set.
# Returns undef when there is no matching snapshot.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $source_user) = @_;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $cmd = [
        @ssh,
        'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation',
        $source->{all},
    ];

    my $raw = run_cmd($cmd);

    my $last_snap = undef;
    my $old_snap;
    my $seen = 0;

    for my $line (defined($raw) ? split(/\n/, $raw) : ()) {
        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $last_snap = $1 if (!$last_snap);
        $old_snap = $1;

        if (++$seen == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
681
# Create the snapshot rep_<name>_<date> of $source->{all} (via ssh as
# $source_user if the source has an ip) and remember its name in
# $source->{new_snap}. If creating it fails, snapshot_destroy() is called
# for that name and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_$name\_".$date;
    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"];

    eval { run_cmd($cmd) };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
703
# Write the cron file from a $cfg->{source}->{name} structure: a SHELL/PATH
# header followed by one "sync" line for every entry whose status is 'ok'.
# Dies when the file cannot be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    for my $source (sort keys %{$cfg}) {
        for my $sync_name (sort keys %{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name};
            next if $entry->{status} ne 'ok';

            $text .= "$PROG_PATH sync";

            # source: [ip:]vmid or [ip:]pool[path]
            $text .= " -source ";
            $text .= "$entry->{source_ip}:" if $entry->{source_ip};
            if ($entry->{vmid}) {
                $text .= "$entry->{vmid} ";
            } else {
                $text .= "$entry->{source_pool}";
                $text .= "$entry->{source_path}" if $entry->{source_path};
            }

            # dest: [ip:]pool[path]
            $text .= " -dest ";
            $text .= "$entry->{dest_ip}:" if $entry->{dest_ip};
            $text .= "$entry->{dest_pool}";
            $text .= "$entry->{dest_path}" if $entry->{dest_path};

            $text .= " -name $sync_name ";
            $text .= " -limit $entry->{limit}" if $entry->{limit};
            $text .= " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};
            $text .= "\n";
        }
    }
    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
739
# Fetch the config of the guest $target->{vmid} with 'qm config' (qemu) or
# 'pct config' (lxc), via ssh as $user if the target has an ip, and return
# the disks found in it by parse_disks(). Dies on any other vm_type.
sub get_disks {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $tool;
    if ($target->{vm_type} eq 'qemu') {
        $tool = 'qm';
    } elsif ($target->{vm_type} eq 'lxc') {
        $tool = 'pct';
    } else {
        die "VM Type unknown\n";
    }

    my $config = run_cmd([@ssh, $tool, 'config', $target->{vmid}]);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $user);
}
760
# Run a command through the shell and return its output (stdout and stderr
# combined, trailing newline removed). Dies with the command line and its
# output when the exit status is not zero.
# $cmd is either a ready-made command string or an array reference whose
# plain elements get shell-quoted; elements given as scalar references
# (e.g. \'|') are inserted unquoted.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = qx{$cmd 2>&1};

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
777
# Extract the ZFS backed disks from a guest config.
# $text is the output of 'qm config' / 'pct config', $ip the host it came
# from (false for the local host), $vm_type 'qemu' or 'lxc' and $user the
# ssh user for that host.
# Returns a hash reference indexed 0, 1, ... whose entries hold pool, path
# (optional), last_part (the disk name) and all (the full dataset name).
# Dies when a disk path cannot be resolved or understood, or when no disk
# is left over.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

        #QEMU: a disk is included in the sync unless backup is switched off
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        #LXC: a mount point is only included in the sync if backup is switched on.
        #\d+ so that mount points with more than one digit (mp10, ...) are
        #filtered as well, in line with the check above.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form [file=|volume=]<storage>:<volume> names the disk
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        # ask the storage layer where the volume lives
        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
857
# Destroy the snapshot $snap of $source->{all} and, when $dest is given, the
# snapshot of the same name on the destination dataset. Failures only
# produce a warning.
# The source side goes through ssh only if the source has an ip and $method
# is 'ssh'; the destination side goes through ssh whenever it has an ip.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');
    my $snapshot = "$source->{all}\@$snap";

    eval {
        my @source_ssh = ($source->{ip} && $method eq 'ssh')
            ? ('ssh', "$source_user\@$source->{ip}", '--')
            : ();
        run_cmd([@source_ssh, @zfscmd, $snapshot]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        # on the destination the dataset sits below the target dataset
        my $path = "$dest->{all}";
        $path .= "/$source->{last_part}" if $source->{last_part};

        eval {
            run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]);
        };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
888
# Check whether the snapshot $source->{old_snap} exists on the destination
# dataset (via ssh as $dest_user if the destination has an ip).
# Returns 1 if 'zfs list' reports it. Returns undef if it does not or if the
# command fails; a failure is additionally reported as a warning.
sub snapshot_exist {
    my ($source , $dest, $method, $dest_user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    # on the destination the dataset sits below the target dataset
    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};
    $path .= "\@$source->{old_snap}";

    push @$cmd, $path;

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
    }

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line =$1;
        # \Q...\E: the snapshot name contains the job name and has to be
        # compared literally, not interpreted as a pattern
        return 1 if $line =~ m/^.*\Q$source->{old_snap}\E$/;
    }

    return undef;
}
915
# Send the snapshot $source->{new_snap} to the destination with a
# 'zfs send | [cstream |] zfs recv' pipeline; either end runs through ssh
# when it has an ip. The send is incremental from $source->{last_snap} when
# snapshot_exist() confirms the base on the destination.
# If the transfer fails, the new snapshot is destroyed on the source again
# and the error re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @pipeline;

    # sending side
    if ($source->{ip}) {
        push @pipeline, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--';
    }
    push @pipeline, 'zfs', 'send';
    push @pipeline, '-v' if $param->{verbose};

    my $incremental = $source->{last_snap}
        && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user});
    push @pipeline, '-i', "$source->{all}\@$source->{last_snap}" if $incremental;

    push @pipeline, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit; the scalar reference keeps the pipe unquoted
    if (my $limit = $param->{limit}) {
        push @pipeline, \'|', 'cstream', '-t', $limit*1024;
    }

    # receiving side
    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    push @pipeline, \'|';
    if ($dest->{ip}) {
        push @pipeline, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--';
    }
    push @pipeline, 'zfs', 'recv', '-F', '--', "$target";

    eval { run_cmd(\@pipeline) };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    }
}
952
953
# Copy the config file of the guest $source->{vmid} to the destination as
# <vmid>.conf.<vm_type>.<new_snap> inside $CONFIG_PATH (or a sub directory
# named after $dest->{last_part}).
# With $method 'ssh' the copy is done with scp for whichever side is remote,
# and the config copy belonging to $source->{old_snap} is removed when
# $source->{destroy} is set. With $method 'local' a plain cp is used.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh'){
        # commands on the destination run via ssh only if it is remote
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            my $from = $source->{ip}
                ? "$source_user\@[$source->{ip}]:$source_target"
                : $source_target;
            my $to = $dest->{ip}
                ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
                : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
989
# Current local time formatted as YYYY-MM-DD_hh:mm:ss.
sub get_date {
    my ($sec, $min, $hour, $mday, $mon, $year) = (localtime(time))[0 .. 5];

    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}
996
# Build the table printed by the 'status' command: source, name and stored
# state of every job in the cron file. Returns it as a single string.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    for my $source (sort keys %{$cfg}) {
        for my $sync_name (sort keys %{$cfg->{$source}}) {
            $status_list .= sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1014
# Set the job described by $param back to state "ok" in the state file and
# rewrite its cron line accordingly. Dies (via get_job) for an unknown job.
sub enable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "ok";

    update_state($job);
    update_cron($job);
}
1023
# Set the job described by $param to state "stopped" in the state file and
# rewrite its cron line, which format_job() then writes commented out.
# Dies (via get_job) for an unknown job.
sub disable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "stopped";

    update_state($job);
    update_cron($job);
}
1032
# Help text per command. The keys also define which commands exist: the
# command given on the command line is checked against this hash, and
# print_pod() builds the SYNOPSIS from the values.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

remove a sync Job from the scheduler

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

Create a sync Job

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default

-skip boolean

if this flag is set it will skip the first sync

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default
},
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n

will sync one time

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default.
It is only necessary if scheduler already contains this source.

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-verbose boolean

print out the sync progress.
},
    list => qq{
$PROGNAME list

Get a List of all scheduled Sync Jobs
},
    status => qq{
$PROGNAME status

Get the status of all scheduled Sync Jobs
},
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

Get help about specified command.

<cmd> string

Command name

-verbose boolean

Verbose output format.
},
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

enable a syncjob and reset error

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

pause a sync job

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    printpod => 'internal command',

};
1174
# Refuse to continue without a command or with one that has no help entry.
if (!$command) {
    usage(); die "\n";
} elsif (!$cmd_help->{$command}) {
    # end the message with a newline so the usage text starts on its own line
    print "ERROR: unknown command '$command'\n";
    usage(1); die "\n";
}

# The options are parsed from the complete argument list; the command word
# itself is not an option and is left alone by the option parser.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1184
# Die with the help text of the current command unless every one of the
# named parameters was given (has a true value).
sub check_params {
    for my $required (@_) {
        die "$cmd_help->{$command}\n" if !$param->{$required};
    }
}
1190
# One handler per command: check the required parameters, validate the
# targets, then do the work.
my %handler_for = (
    destroy => sub {
        check_params(qw(source));

        check_target($param->{source});
        destroy_job($param);
    },
    sync => sub {
        check_params(qw(source dest));

        check_target($param->{source});
        check_target($param->{dest});
        sync($param);
    },
    create => sub {
        check_params(qw(source dest));

        check_target($param->{source});
        check_target($param->{dest});
        init($param);
    },
    status => sub {
        print status();
    },
    list => sub {
        print list();
    },
    help => sub {
        my $help_command = $ARGV[1];

        # help for a known command is emitted through die
        if ($help_command && $cmd_help->{$help_command}) {
            die "$cmd_help->{$help_command}\n";
        }

        if ($param->{verbose}) {
            exec("man $PROGNAME");
        } else {
            usage(1);
        }
    },
    enable => sub {
        check_params(qw(source));

        check_target($param->{source});
        enable_job($param);
    },
    disable => sub {
        check_params(qw(source));

        check_target($param->{source});
        disable_job($param);
    },
    printpod => sub {
        print_pod();
    },
);

if (my $handler = $handler_for{$command}) {
    $handler->();
}
1247
# Print the command overview. Without a true $help argument the overview is
# preceded by a "no command specified" error line.
sub usage {
    my ($help) = @_;

    my @lines;
    push @lines, "ERROR:\tno command specified\n" if !$help;
    push @lines,
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n";

    print(join('', @lines));
}
1262
# Validate a target string: parse_target() dies if it is malformed.
# The parsed target is handed back to the caller.
sub check_target {
    my ($target) = @_;

    return parse_target($target);
}
1267
# Print the manual page source in POD format. The SYNOPSIS section is made
# of all help texts from $cmd_help, sorted as strings.
sub print_pod {

    my @help_texts = sort values %$cmd_help;
    my $synopsis = join("\n", @help_texts);

    my $pod = <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF

    print $pod;
}