]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
bump version to 2.3.1
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program identity; every path below is derived from it.
my $PROGNAME = "pve-zsync";

# Directory holding the state file and the lock file.
my $CONFIG_PATH = "/var/lib/$PROGNAME";
my $STATE = "$CONFIG_PATH/sync_state";

# Cron file in which the sync jobs are stored.
my $CRONJOBS = "/etc/cron.d/$PROGNAME";

my $PATH = "/usr/sbin";

# Guest configuration directories.
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = "$PVE_DIR/qemu-server";
my $LXC_CONF = "$PVE_DIR/lxc";

my $LOCKFILE = "$CONFIG_PATH/$PROGNAME.lock";
my $PROG_PATH = "$PATH/$PROGNAME";

# Minute step used for the schedule of newly written cron entries.
my $INTERVAL = 15;
my $DEBUG;

BEGIN {
    $DEBUG = 0; # change default here. not above on declaration!
    # the ZSYNC_DEBUG environment variable can switch debugging on
    $DEBUG = $ENV{ZSYNC_DEBUG} if !$DEBUG;
    if ($DEBUG) {
        # Dumper is only loaded when debugging is enabled
        require Data::Dumper;
        Data::Dumper->import();
    }
}
34
# Building blocks for IPv4 / IPv6 address patterns (plain strings that are
# interpolated into the patterns further down).
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# One alternative per possible position of the "::" abbreviation.
my @ipv6_alternatives = (
    "(?:(?:(?:$IPV6H16:){6})$IPV6LS32)",
    "(?:(?:::(?:$IPV6H16:){5})$IPV6LS32)",
    "(?:(?:(?:$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::)$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::)$IPV6H16)",
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::))",
);
my $IPV6RE = "(?:" . join('|', @ipv6_alternatives) . ")";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# capture 1: host (optional), capture 2: VMID or pool[/path]
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
56
# The first command line word selects the sub-command (undef if none given).
my $command = $ARGV[0];

# Every command except 'help' and 'printpod' requires the external tools,
# so their absence is reported before anything else happens.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    foreach my $tool (qw(cstream zfs ssh scp)) {
        check_bin ($tool);
    }
}

# Turn the listed signals into an exception so that eval blocks around a
# running sync get a chance to react.
foreach my $signal (qw(INT KILL HUP PIPE QUIT TERM)) {
    $SIG{$signal} = sub {
        die "Signaled, aborting sync: $!\n";
    };
}
69
# Locate an external command in the directories listed in $ENV{PATH}.
#
# $bin - bare command name (e.g. 'zfs')
#
# Returns the full path of the first executable regular file found.
# Dies with "unable to find command '<bin>'" if there is none.
sub check_bin {
    my ($bin) = @_;

    # an unset PATH is treated like an empty one instead of warning
    foreach my $dir (split (/:/, $ENV{PATH} // '')) {
        my $fn = "$dir/$bin";
        # -f keeps a directory that happens to carry the command's name
        # (directories usually have the x bit set) from being accepted
        return $fn if -f $fn && -x _;
    }

    die "unable to find command '$bin'\n";
}
82
# Shorten a path-like string for display in a fixed-width column.
#
# $path   - string to shorten (runs of '/' are collapsed first)
# $maxlen - column width
#
# Strings that fit are returned unchanged (apart from the slash collapsing);
# otherwise parts are replaced by '..', preferring to keep the last component.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s@/+@/@g;

    if (length($path) <= $maxlen) {
        return $path;
    }

    # no slash at all: keep the rightmost characters
    if ($path !~ m@/@) {
        return '..' . substr($path, 2 - $maxlen);
    }

    # strip the last component (with an optional trailing slash) off the path
    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;
    my $tail_len = length($tail);

    if ($tail_len + 3 == $maxlen) {
        return "../$tail";
    }
    if ($tail_len + 2 >= $maxlen) {
        return '..' . substr($tail, 2 - $maxlen);
    }

    # strip the first "/component" off what is left
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $remaining = $maxlen - (length($head) + $tail_len) - 4; # -4 for "/../"

    if ($remaining < 0) {
        return substr($head, 0, $maxlen - $tail_len - 3) . "../$tail"; # -3 for "../"
    }

    # replace the middle of the remaining path by '..'
    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return join('/', $head, $path, $tail);
}
112
# Take an exclusive flock on the given file handle; dies if flock fails.
sub lock {
    my ($fh) = @_;
    my $locked = flock($fh, LOCK_EX);
    die "Can't lock config - $!\n" if !$locked;
    return $locked;
}
117
# Release the flock held on the given file handle; dies if flock fails.
sub unlock {
    my ($fh) = @_;
    my $released = flock($fh, LOCK_UN);
    die "Can't unlock config- $!\n" if !$released;
    return $released;
}
122
# Return the whole $status structure if it holds a true 'status' entry for
# the job identified by $source->{all} and $name, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    my $job_status = $status->{$source->{all}}->{$name}->{status};

    return $job_status ? $status : undef;
}
132
# Check whether the dataset $target->{all} is known to 'zfs list'.
#
# $target - parsed target; the command is run via ssh when {ip} is set
# $user   - user name for the ssh connection
#
# Returns 1 if the command succeeds, 0 if it fails.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @via_ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    my $cmd = [@via_ssh, 'zfs', 'list', '-H', '--', $target->{all}];

    eval { run_cmd($cmd) };

    return $@ ? 0 : 1;
}
151
# Split a target string ([IP:]<VMID> or [IP:]<ZFSPool>[/Path]) into its parts.
#
# Returns a hash ref with:
#   all       - everything after the optional host part
#   ip        - host part with surrounding brackets removed (only if given)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - first path component otherwise
#   last_part - last path component (only if there is more than one)
#   path      - components left between pool and last_part, if any
# Dies if the string does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };
    if ($host) {
        # IPv6 addresses (and optionally other hosts) arrive in brackets
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $all);
    my $pool = shift(@parts);
    $target->{pool} = $pool;
    die "$errstr\n" if !$pool;

    # a purely numeric first component is a VMID, not a pool
    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
        delete $target->{pool};
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # for remote targets one more trailing component is dropped
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts > 0;

    return $target;
}
187
# Read the cron file and return the job configuration parsed by encode_cron().
# On first use the (empty) file is created and undef is returned.
sub read_cron {

    # first use: initialise the file
    unless (-e $CRONJOBS) {
        my $new_fh = IO::File->new("> $CRONJOBS")
            or die "Could not create $CRONJOBS: $!\n";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $CRONJOBS")
        or die "Could not open file $CRONJOBS: $!\n";
    my @lines = <$fh>;
    close($fh);

    return encode_cron(@lines);
}
207
# Parse command line style arguments into a parameter hash.
#
# @arg - argument words; words that are not options are ignored
#
# Returns a hash ref with one key per known option (undef if not given);
# name, maxsnap, method, source_user and dest_user get defaults.
# Dies with "can't parse options" if option parsing reports an error.
sub parse_argv {
    my (@arg) = @_;

    # [ Getopt::Long specification, key in the result hash ]
    my @options = (
        ['dest=s',             'dest'],
        ['source=s',           'source'],
        ['verbose',            'verbose'],
        ['limit=i',            'limit'],
        ['maxsnap=i',          'maxsnap'],
        ['name=s',             'name'],
        ['skip',               'skip'],
        ['method=s',           'method'],
        ['source-user=s',      'source_user'],
        ['dest-user=s',        'dest_user'],
        ['properties',         'properties'],
        ['dest-config-path=s', 'dest_config_path'],
    );

    my $param = {};
    $param->{$_->[1]} = undef for @options;

    my @getopt_spec;
    foreach my $opt (@options) {
        my ($spec, $key) = @$opt;
        push @getopt_spec, $spec, \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @getopt_spec);

    die "can't parse options\n" unless $ret;

    my %default = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    foreach my $key (keys %default) {
        $param->{$key} //= $default{$key};
    }

    return $param;
}
252
# Copy the persisted state (state, lsync, vm_type and the consecutive
# snap0, snap1, ... entries) of a job from the state file into $job.
# The job is looked up by $job->{source} and $job->{name}.
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    foreach my $key (qw(state lsync vm_type)) {
        $job->{$key} = $saved->{$key};
    }

    # snapshot entries are numbered without gaps; stop at the first missing one
    my $i = 0;
    while ($saved->{"snap$i"}) {
        $job->{"snap$i"} = $saved->{"snap$i"};
        $i++;
    }

    return $job;
}
269
# Turn the lines of the cron file into a job configuration.
#
# @text - lines of the cron file
#
# Returns a hash ref $cfg->{<source>}->{<name>} = parameters (as produced by
# parse_argv, minus 'source' and 'name'). Lines that do not yield both a
# source and a dest are ignored.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    foreach my $line (@text) {
        # a false line (empty string or "0") ends the parsing
        last if !$line;

        my $param = parse_argv(split('\s', $line));
        next unless $param->{source} && $param->{dest};

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
290
# Build a job description from parsed command line parameters.
#
# $param - hash ref as returned by parse_argv()
#
# Returns a hash ref with name, dest, method, limit, maxsnap, source,
# source_user, dest_user, properties and dest_config_path.
# 'method' is "local" when neither source nor dest has a host part,
# otherwise "ssh". Dies (via parse_target) on an invalid source or dest.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});

    # Declared separately from the conditional assignment: the behaviour of
    # "my $x = ... if COND" is undefined in Perl.
    my $dest;
    $dest = parse_target($param->{dest}) if $param->{dest};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
313
# Read the job states from the state file.
#
# Returns the decoded JSON structure. If the state file does not exist yet,
# it is created (containing "{}") and undef is returned.
sub read_state {

    unless (-e $STATE) {
        make_path $CONFIG_PATH;
        my $new_fh = IO::File->new("> $STATE")
            or die "Could not create $STATE: $!\n";
        print $new_fh "{}";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $STATE")
        or die "Could not open file $STATE: $!\n";

    # only the first line of the file is read and decoded
    my $json = <$fh>;
    my $states = decode_json($json);

    close($fh);

    return $states;
}
335
# Store (or remove) the state of one job in the state file.
#
# $job - job hash; {source} and {name} identify the entry. If {state} is
#        "del" the entry is removed, otherwise state, lsync, vm_type and the
#        consecutive snap0, snap1, ... values are written.
#
# The new content is written to "$STATE.new" and then renamed over $STATE.
# Returns the complete states structure that was written.
# Dies if the new file cannot be opened, written, closed or renamed.
sub update_state {
    my ($job) = @_;
    my $text;
    my $in_fh;

    # best effort: a missing or unreadable state file is treated as empty
    eval {

        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        $text = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Never replace the state file with a partially written one: every step
    # is checked, in the same way update_cron checks its cron file update.
    die "can't write to $STATE.new\n" if !print($out_fh $text);
    die "can't close $STATE.new: $!\n" if !close($out_fh);
    die "can't move $STATE.new: $!\n" if !rename "$STATE.new", $STATE;

    # $in_fh is undef when the old file could not be opened
    eval {
        close($in_fh);
    };

    return $states;
}
385
# Add, replace or remove the cron file entry of one job.
#
# $job - job hash; {source} and {name} identify the entry. With {state}
#        "del" the matching line is dropped, otherwise it is rewritten by
#        format_job() (or appended if no line matches).
#
# A SHELL/PATH header is prepended if none is found in the first three
# lines. The new content is written to "$CRONJOBS.new" and renamed over
# $CRONJOBS. Dies if a file cannot be opened, written or renamed.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @lines = <$fh>;

    # Source and name are user supplied text: quote them so that characters
    # like '.' (IPv4 addresses) or '[' (bracketed IPv6 addresses) are matched
    # literally instead of being interpreted as regex syntax.
    my $quoted_source = quotemeta($job->{source});
    my $quoted_name = quotemeta($job->{name});
    my $job_re = qr/source $quoted_source .*name $quoted_name /;

    while (my $line = shift(@lines)) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
    close ($fh);
}
433
# Render one job as a line for the cron file.
#
# $job  - job hash (source, dest, name, maxsnap, method, source_user,
#         dest_user and the optional limit, verbose, properties,
#         dest_config_path, state)
# $line - optional existing cron line of the job; its five schedule fields
#         are kept
#
# A job in state "stopped" is written commented out. Without $line, or if
# no schedule can be extracted from it, "*/$INTERVAL * * * *" is used.
# Returns the line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    # Only trust $1 after a successful match; otherwise it would be undef or
    # left over from an earlier, unrelated match.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
461
# Build the table printed by the 'list' command: one row per job found in
# the cron file, combined with the values from the state file.
# Returns the table as a single string.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys %{$cfg}) {
        foreach my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
482
# Determine the guest type of $target->{vmid} by looking for its config file.
#
# $target - parsed target; with {ip} set the check runs '/bin/ls' via ssh
# $user   - user name for the ssh connection
#
# Returns "qemu" or "lxc", or undef if there is no vmid or no config file.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    # checked in this order: qemu first, then lxc
    my @candidates = (
        ["qemu", "$QEMU_CONF/$conf_fn"],
        ["lxc", "$LXC_CONF/$conf_fn"],
    );

    my @remote_ls = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--', '/bin/ls') : ();

    foreach my $candidate (@candidates) {
        my ($type, $file) = @$candidate;
        my $found = $target->{ip}
            ? eval { run_cmd([@remote_ls, $file]) }
            : -f $file;
        return $type if $found;
    }

    return undef;
}
501
# Implementation of the 'create' command: validate source and destination,
# register the job in the cron and state files and, unless --skip was given,
# run a first sync. If that first sync fails the job is removed again and
# the error is printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);
    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    # install root's public key on the remote sides: destination first, then source
    foreach my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
        my ($target, $user) = @$peer;
        my $ip = $target->{ip} or next;
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
    }

    unless (check_pool_exists($dest, $param->{dest_user})) {
        die "Pool $dest->{all} does not exists\n";
    }

    # a VMID source has no dataset of its own to check
    if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
        die "Pool $source->{all} does not exists\n";
    }

    my $vm_type = vm_exists($source, $param->{source_user});
    $job->{vm_type} = $vm_type;
    $source->{vm_type} = $vm_type;

    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    #check if vm has zfs disks if not die;
    if ($source->{vmid}) {
        get_disks($source, $param->{source_user});
    }

    update_cron($job);
    update_state($job);

    eval {
        sync($param) unless $param->{skip};
    };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
551
# Look up a job by $param->{source} and $param->{name} in the cron file and
# enrich it with its persisted state.
# Returns the job hash; dies if the job is not configured.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = ($param->{source}, $param->{name});

    my $cfg = read_cron();
    my $job = $cfg->{$source}->{$name};

    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
567
# Remove a job from the cron file and from the state file.
# Dies (via get_job) if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $doomed = get_job($param);

    # "del" tells both update routines to drop the entry
    $doomed->{state} = "del";

    update_cron($doomed);
    update_state($doomed);
}
577
# Run one synchronisation from $param->{source} to $param->{dest}.
#
# The whole run is serialised through an exclusive lock on $LOCKFILE. If the
# source/name pair is a configured job, its state is set to "syncing" while
# running and to "ok" (with the sync date) or "error" afterwards.
# For a VMID source every disk returned by get_disks() is synced and the
# guest config is copied afterwards. Dies with the error of the failed step.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE");
    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
    lock($lock_fh);

    my $date = get_date();

    # a one-off sync without configured job is fine, so errors are ignored here
    my $job = eval { get_job($param) };

    if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
        die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
    }

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # sync a single dataset: find old snapshots, take a new one, send it and
    # finally drop the oldest snapshot if the limit was reached
    my $sync_path = sub {
        my ($source, $dest, $job, $param, $date) = @_;

        ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});

        snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

        send_image($source, $dest, $param);

        if ($source->{destroy} && $source->{old_snap}) {
            snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user});
        }
    };

    my $vm_type = vm_exists($source, $param->{source_user});
    $source->{vm_type} = $vm_type;

    if ($job) {
        $job->{state} = "syncing";
        $job->{vm_type} = $vm_type if !$job->{vm_type};
        update_state($job);
    }

    eval {
        if ($source->{vmid}) {
            die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
            die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
            my $disks = get_disks($source, $param->{source_user});

            foreach my $key (sort keys %{$disks}) {
                my $disk = $disks->{$key};
                $source->{all} = $disk->{all};
                $source->{pool} = $disk->{pool};
                $source->{path} = $disk->{path} if $disk->{path};
                $source->{last_part} = $disk->{last_part};
                $sync_path->($source, $dest, $job, $param, $date);
            }

            my $via_ssh = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
            my $config_method = $via_ssh ? 'ssh' : 'local';
            send_config($source, $dest, $config_method, $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
        } else {
            $sync_path->($source, $dest, $job, $param, $date);
        }
    };
    if (my $err = $@) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
662
# Find the replication snapshots of a job on the source dataset.
#
# The snapshots are listed with 'zfs list ... -S creation' and only names
# ending in rep_<name>_<YYYY-MM-DD_hh:mm:ss> are considered.
#
# Returns ($old_snap, $last_snap): the first matching name in the listing
# and the last matching one that was looked at. Looking stops once
# $max_snap matches were seen; in that case $source->{destroy} is set.
# Returns undef if no snapshot matches.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $source_user) = @_;

    my @via_ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $cmd = [
        @via_ssh,
        'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation',
        $source->{all},
    ];

    my $raw = run_cmd($cmd);

    my $last_snap = undef;
    my $old_snap;
    my $matches = 0;

    foreach my $line (split(/\n/, $raw // '')) {
        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $last_snap = $1 if (!$last_snap);
        $old_snap = $1;

        if (++$matches == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
695
# Create the snapshot rep_<name>_<date> on the source dataset and remember
# its name in $source->{new_snap}.
# If the snapshot command fails, snapshot_destroy() is called for that name
# and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_$name\_".$date;
    $source->{new_snap} = $snap_name;

    my @via_ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $cmd = [@via_ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"];

    eval { run_cmd($cmd) };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
717
# Write a cron file from a configuration structure $cfg->{<source>}->{<name>}.
# Only entries whose {status} is 'ok' are written. Dies if the file cannot
# be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys %{$cfg}) {
        foreach my $sync_name (sort keys %{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name};
            next if $entry->{status} ne 'ok';

            $text .= "$PROG_PATH sync";

            # source: [ip:]vmid or [ip:]pool[path]
            $text .= " -source ";
            $text .= "$entry->{source_ip}:" if $entry->{source_ip};
            if ($entry->{vmid}) {
                $text .= "$entry->{vmid} ";
            } else {
                $text .= "$entry->{source_pool}";
                $text .= "$entry->{source_path}" if $entry->{source_path};
            }

            # destination: [ip:]pool[path]
            $text .= " -dest ";
            $text .= "$entry->{dest_ip}:" if $entry->{dest_ip};
            $text .= "$entry->{dest_pool}";
            $text .= "$entry->{dest_path}" if $entry->{dest_path};

            $text .= " -name $sync_name ";
            $text .= " -limit $entry->{limit}" if $entry->{limit};
            $text .= " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};
            $text .= "\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
753
# Fetch the guest configuration ('qm config' for qemu, 'pct config' for lxc)
# and return the disks found in it by parse_disks().
#
# $target - parsed target with {vmid} and {vm_type}; run via ssh if {ip} is set
# $user   - user name for the ssh connection
#
# Dies with "VM Type unknown" for any other vm_type.
sub get_disks {
    my ($target, $user) = @_;

    my %config_tool = (qemu => 'qm', lxc => 'pct');
    my $tool = $config_tool{$target->{vm_type}}
        or die "VM Type unknown\n";

    my @via_ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    my $config_text = run_cmd([@via_ssh, $tool, 'config', $target->{vmid}]);

    return parse_disks($config_text, $target->{ip}, $target->{vm_type}, $user);
}
774
# Run a command through the shell and return its output.
#
# $cmd - either a ready-made command string, or an array ref of words.
#        Plain words are shell-quoted; scalar references (e.g. \'|') are
#        inserted unquoted.
#
# stderr is merged into the output. Returns the output with the trailing
# newline removed; dies with command and output on a non-zero status.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    if ($? != 0) {
        die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output";
    }

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
791
# Extract the ZFS backed disks from a guest configuration.
#
# $text    - output of 'qm config' / 'pct config'
# $ip      - host to run 'pvesm path' on via ssh (local if false)
# $vm_type - 'qemu' or 'lxc'
# $user    - user name for the ssh connection
#
# Returns a hash ref indexed 0, 1, ... with pool, path (optional),
# last_part and all (pool[/path]/last_part) for every disk.
# Dies if a volume path cannot be resolved or has an unexpected form, or if
# no disk is left at the end.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

        # QEMU: a disk is synced unless backup is explicitly switched off
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: a mount point is only synced if backup is explicitly switched
        # on. \d+ (not \d) so that mp10 and above follow the same rule.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form [file=|volume=]<storage>:<name>
            # identifies the volume
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            # /dev/zvol/<pool>[/<path>]/<disk>
            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
871
# Destroy snapshot $snap on the source dataset and, if $dest is given, on
# the corresponding destination dataset as well.
# Failures are reported with warn() only; this sub does not die on them.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfs_destroy = ('zfs', 'destroy');

    # the source side is only reached via ssh when the method says so
    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval {
        run_cmd([@source_ssh, @zfs_destroy, "$source->{all}\@$snap"]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        # destination dataset is <dest>[/<last part of the source>]
        my $dest_dataset = "$dest->{all}";
        $dest_dataset .= "/$source->{last_part}" if $source->{last_part};

        eval {
            run_cmd([@dest_ssh, @zfs_destroy, "$dest_dataset\@$snap"]);
        };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
902
# Check whether the snapshot $source->{old_snap} exists on the destination
# dataset (<dest>[/<last part of the source>]).
#
# Returns 1 if 'zfs list' reports a name ending in that snapshot name,
# 0 if it does not, and undef (after a warning) if the command fails.
sub snapshot_exist {
    my ($source , $dest, $method, $dest_user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};
    $path .= "\@$source->{old_snap}";

    push @$cmd, $path;

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
    }

    # The snapshot name contains the user chosen job name, so it is quoted
    # to be matched literally rather than as a regular expression.
    my $quoted_snap = quotemeta($source->{old_snap});

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line =$1;
        return 1 if $line =~ m/^.*$quoted_snap$/;
    }

    # explicit false result instead of falling off the end of the loop
    return 0;
}
929
# Transfer the new snapshot of the source dataset to the destination with
# 'zfs send | [cstream |] zfs recv'.
#
# The stream is incremental (-i <last_snap>) if the previous snapshot still
# exists on the destination. $param->{limit} is multiplied by 1024 and handed
# to 'cstream -t'. If the transfer fails, the new snapshot is destroyed on
# the source and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @batch_ssh = ('ssh', '-o', 'BatchMode=yes');

    # sending side
    my @send;
    push @send, @batch_ssh, "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) {
        push @send, '-i', "$source->{all}\@$source->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limiter; the pipe symbol must reach the shell unquoted
    my @throttle;
    if ($param->{limit}){
        my $bwl = $param->{limit}*1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    # receiving side
    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    my @recv = (\'|');
    push @recv, @batch_ssh, "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd([@send, @throttle, @recv])
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
967
968
# Copy the guest configuration file next to the replicated data.
#
# The copy is named <vmid>.conf.<vm_type>.<new_snap> and placed in
# $dest_config_path (default $CONFIG_PATH), extended by the last part of the
# destination if there is one. With method 'ssh' the file is transferred with
# scp and, if $source->{destroy} is set, the copy belonging to the old
# snapshot is removed. With method 'local' the file is copied with cp.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $source_conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$source_conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $copy_prefix = "$config_dir/$source->{vmid}.conf.$source->{vm_type}";
    my $dest_target_new = "$copy_prefix.$source->{new_snap}";

    if ($method eq 'ssh'){
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            my $scp_from = $source->{ip} ? "$source_user\@[$source->{ip}]:$source_target" : $source_target;
            my $scp_to = $dest->{ip} ? "$dest_user\@[$dest->{ip}]:$dest_target_new" : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $scp_from, $scp_to]);
        }

        if ($source->{destroy}){
            my $dest_target_old = "$copy_prefix.$source->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1005
# Return the current local time formatted as YYYY-MM-DD_hh:mm:ss.
sub get_date {
    # localtime: 0 sec, 1 min, 2 hour, 3 mday, 4 mon (0-based), 5 year (-1900)
    my @now = localtime(time);

    return sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $now[5]+1900, $now[4]+1, @now[3, 2, 1, 0]);
}
1012
# Build the table printed by the 'status' command: source, name and the
# persisted state of every job found in the cron file.
# Returns the table as a single string.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys %{$cfg}) {
        foreach my $sync_name (sort keys %{$cfg->{$source}}) {
            my $columns = sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= $columns . "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1030
# Set a job's state to "ok" in the state file and rewrite its cron entry.
# Dies (via get_job) if the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $enabled = get_job($param);
    $enabled->{state} = "ok";

    update_state($enabled);
    update_cron($enabled);
}
1039
# Set a job's state to "stopped" in the state file and rewrite its cron
# entry (format_job writes stopped jobs commented out).
# Dies (via get_job) if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $paused = get_job($param);
    $paused->{state} = "stopped";

    update_state($paused);
    update_cron($paused);
}
1048
# Help text per command. The keys double as the list of valid commands: a
# command without an entry here is rejected as unknown.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

        remove a sync Job from the scheduler

        -name string

                name of the sync job, if not set it is default

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

        Create a sync Job

        -dest string

                the destination target is like [IP:]<Pool>[/Path]

        -dest-user string

                name of the user on the destination target, root by default

        -limit integer

                max sync speed in kBytes/s, default unlimited

        -maxsnap integer

                how much snapshots will be kept before get erased, default 1

        -name string

                name of the sync job, if not set it is default

        -skip boolean

                if this flag is set it will skip the first sync

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

        -source-user string

                name of the user on the source target, root by default

        -properties boolean

                Include the dataset's properties in the stream.

        -dest-config-path string

                specify a custom config path on the destination target. default is /var/lib/pve-zsync
    },
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]

        will sync one time

        -dest string

                the destination target is like [IP:]<Pool>[/Path]

        -dest-user string

                name of the user on the destination target, root by default

        -limit integer

                max sync speed in kBytes/s, default unlimited

        -maxsnap integer

                how much snapshots will be kept before get erased, default 1

        -name string

                name of the sync job, if not set it is default.
                It is only necessary if scheduler already contains this source.

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

        -source-user string

                name of the user on the source target, root by default

        -verbose boolean

                print out the sync progress.

        -properties boolean

                Include the dataset's properties in the stream.

        -dest-config-path string

                specify a custom config path on the destination target. default is /var/lib/pve-zsync
    },
    list => qq{
$PROGNAME list

        Get a List of all scheduled Sync Jobs
    },
    status => qq{
$PROGNAME status

        Get the status of all scheduled Sync Jobs
    },
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

        Get help about specified command.

        <cmd> string

                Command name

        -verbose boolean

                Verbose output format.
    },
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

        enable a syncjob and reset error

        -name string

                name of the sync job, if not set it is default

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

        pause a sync job

        -name string

                name of the sync job, if not set it is default

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    printpod => 'internal command',

};
1206
# Validate the requested command before any option parsing takes place.
# A missing command prints the usage text together with the "no command
# specified" banner; an unknown command is reported by name first.
# Both paths terminate the program through die.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # The trailing newline keeps this message from running straight into
    # the "USAGE:" line that usage() prints right afterwards.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# parse_argv is handed a copy of the command line (@arg), not @ARGV itself.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1216
# Ensure every named parameter carries a true value in $param.
# Arguments: list of parameter names that the current command requires.
# Dies with the help text of the current command as soon as one of them
# is missing or false.
sub check_params {
    my @required = @_;

    foreach my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1222
# Command dispatch table: every known command maps to a handler which first
# checks its required parameters and targets and then runs the action.
my %command_handlers = (
    destroy => sub {
        check_params(qw(source));

        check_target($param->{source});
        destroy_job($param);
    },
    sync => sub {
        check_params(qw(source dest));

        check_target($param->{source});
        check_target($param->{dest});
        sync($param);
    },
    create => sub {
        check_params(qw(source dest));

        check_target($param->{source});
        check_target($param->{dest});
        init($param);
    },
    status => sub {
        print status();
    },
    list => sub {
        print list();
    },
    help => sub {
        # The word following 'help' on the command line, if any.
        my $help_command = $ARGV[1];

        # Help for one specific command is emitted through die.
        if ($help_command && $cmd_help->{$help_command}) {
            die "$cmd_help->{$help_command}\n";
        }

        # Without a known command: manual page when -verbose is set,
        # the short usage overview otherwise.
        if ($param->{verbose}) {
            exec("man $PROGNAME");
        } else {
            usage(1);
        }
    },
    enable => sub {
        check_params(qw(source));

        check_target($param->{source});
        enable_job($param);
    },
    disable => sub {
        check_params(qw(source));

        check_target($param->{source});
        disable_job($param);
    },
    printpod => sub {
        print_pod();
    },
);

# Commands without a handler fall through silently, just like an
# if/elsif chain without a final else.
my $command_handler = $command_handlers{$command};
if ($command_handler) {
    $command_handler->();
}
1279
# Print the short usage overview to STDOUT.
# Arguments: $help - when false, an "ERROR: no command specified" banner is
#                    printed before the overview; when true it is omitted.
sub usage {
    my ($help) = @_;

    my @lines = (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    # The banner only appears when usage() is shown because no command
    # was given at all.
    unshift(@lines, "ERROR:\tno command specified\n") if !$help;

    foreach my $line (@lines) {
        print($line);
    }
}
1294
# Run a source/destination target string through parse_target and hand its
# result back to the caller.
# NOTE(review): presumably parse_target dies on a malformed target, which
# would make this a pure validation step - confirm against parse_target.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1299
# Print the manual page of the program in POD format to STDOUT.
# The SYNOPSIS section is assembled from the per-command help texts stored
# in $cmd_help; everything else is static text.
sub print_pod {

    # The 'printpod' entry holds only a placeholder string for this internal
    # command and must not show up in the generated SYNOPSIS, so it is
    # filtered out before the help texts are sorted and joined.
    my @help_texts = map { $cmd_help->{$_} } grep { $_ ne 'printpod' } keys %$cmd_help;
    my $synopsis = join("\n", sort @help_texts);

    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}