]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
replace File::Copy::move with built-in rename
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program identity and the filesystem locations derived from it.
my $PROGNAME    = "pve-zsync";
my $CONFIG_PATH = "/var/lib/" . $PROGNAME;          # state file, lock file and synced VM configs
my $STATE       = $CONFIG_PATH . "/sync_state";     # JSON state of all jobs
my $CRONJOBS    = "/etc/cron.d/" . $PROGNAME;       # cron file holding the scheduled jobs
my $PATH        = "/usr/sbin";
my $PVE_DIR     = "/etc/pve/local";
my $QEMU_CONF   = $PVE_DIR . "/qemu-server";        # <vmid>.conf files of QEMU guests
my $LXC_CONF    = $PVE_DIR . "/lxc";                # <vmid>.conf files of LXC guests
my $LOCKFILE    = $CONFIG_PATH . "/" . $PROGNAME . ".lock";
my $PROG_PATH   = $PATH . "/" . $PROGNAME;
my $INTERVAL    = 15;                               # minute step used for newly created cron entries
my $DEBUG       = 0;                                # when true, run_cmd dumps commands and their output
25
# Building blocks for recognising IPv4 and IPv6 literals. They are plain
# strings (not qr// objects) so they can be interpolated into larger patterns.
my $IPV4OCTET = '(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])';
my $IPV4RE    = '(?:(?:' . $IPV4OCTET . '\.){3}' . $IPV4OCTET . ')';
my $IPV6H16   = '(?:[0-9a-fA-F]{1,4})';
my $IPV6LS32  = '(?:(?:' . $IPV4RE . '|' . $IPV6H16 . ':' . $IPV6H16 . '))';

# One alternative per possible position of the "::" abbreviation.
my $IPV6RE = "(?:" . join("|",
    "(?:(?:(?:${IPV6H16}:){6})${IPV6LS32})",
    "(?:(?:::(?:${IPV6H16}:){5})${IPV6LS32})",
    "(?:(?:(?:${IPV6H16})?::(?:${IPV6H16}:){4})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,1}${IPV6H16})?::(?:${IPV6H16}:){3})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,2}${IPV6H16})?::(?:${IPV6H16}:){2})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,3}${IPV6H16})?::(?:${IPV6H16}:){1})${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,4}${IPV6H16})?::)${IPV6LS32})",
    "(?:(?:(?:(?:${IPV6H16}:){0,5}${IPV6H16})?::)${IPV6H16})",
    "(?:(?:(?:(?:${IPV6H16}:){0,6}${IPV6H16})?::))",
) . ")";

my $HOSTv4RE0 = '(?:[\w\.\-_]+|' . $IPV4RE . ')';                     # hostname or ipv4 address
my $HOSTv4RE1 = '(?:' . $HOSTv4RE0 . '|\[' . $HOSTv4RE0 . '\])';      # these may be in brackets, too
my $HOSTRE    = '(?:' . $HOSTv4RE1 . '|\[' . $IPV6RE . '\])';         # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
47
my $command = $ARGV[0];

# Every command except 'help' and 'printpod' shells out to these tools,
# so abort early when one of them cannot be found in PATH.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin($_) for qw(cstream zfs ssh scp);
}

# Turn termination signals into a die() so that the eval blocks around the
# sync steps get a chance to run their error handling.
# SIGKILL is intentionally not listed: it cannot be caught, so a handler
# registered for it would never be invoked.
my $abort_on_signal = sub {
    die "Signal aborting sync\n";
};
$SIG{$_} = $abort_on_signal for qw(TERM QUIT PIPE HUP INT);
61
# Look up an executable in the directories listed in $ENV{PATH}.
#
# $bin - bare command name to look for
#
# Returns the first "<dir>/<bin>" that passes the -x test;
# dies with "unable to find command '<bin>'" when there is none.
sub check_bin {
    my ($bin) = @_;

    for my $dir (split(/:/, $ENV{PATH})) {
        my $candidate = "$dir/$bin";
        return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}
74
# Shorten a path-like string so that it fits into $maxlen characters,
# replacing the removed part with "..".
#
# $path   - string to shorten; runs of '/' are collapsed first
# $maxlen - maximum width of the result
#
# The last path component (tail) is preferred over everything else; if room
# is left, the component captured by the second substitution (head) is kept
# as well and the middle part is elided.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s@/+@/@g;
    return $path if length($path) <= $maxlen;

    # No directory structure at all: keep the end of the string.
    if ($path !~ m@/@) {
        return '..' . substr($path, 2 - $maxlen);
    }

    # Split off the last component (an optional trailing slash stays with it).
    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;
    my $tail_len = length($tail);

    return "../$tail" if $tail_len + 3 == $maxlen;
    return '..' . substr($tail, 2 - $maxlen) if $tail_len + 2 >= $maxlen;

    # Split off the first "/component" found in what is left.
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $remaining = $maxlen - (length($head) + $tail_len) - 4; # -4 for "/../"

    if ($remaining < 0) {
        # Not even room for the middle marker: truncate the head instead.
        return substr($head, 0, $maxlen - $tail_len - 3) . "../$tail"; # -3 for "../"
    }

    # Replace the centre of the middle part so exactly $remaining + 2 chars stay.
    substr($path, ($remaining/2), (length($path)-$remaining)) = '..';
    return join('/', $head, $path, $tail);
}
104
# Take an exclusive flock on the given file handle; dies if that fails.
sub lock {
    my ($handle) = @_;
    flock($handle, LOCK_EX) or die "Can't lock config - $!\n";
}
109
# Release the flock held on the given file handle; dies if that fails.
sub unlock {
    my ($handle) = @_;
    flock($handle, LOCK_UN) or die "Can't unlock config- $!\n";
}
114
# Return the whole $status structure if the job identified by
# $source->{all} and $name has a true 'status' entry, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    my $job_status = $status->{$source->{all}}->{$name}->{status};

    return $job_status ? $status : undef;
}
124
# Check whether the dataset $target->{all} can be listed with 'zfs list'.
#
# $target - parsed target (see parse_target); when it has an 'ip' the check
#           is executed on that host via ssh
# $user   - ssh user for the remote case
#
# Returns 1 when the command succeeded, 0 when it failed.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $listed = eval {
        run_cmd([@ssh, 'zfs', 'list', '-H', '--', $target->{all}]);
        1;
    };

    return $listed ? 1 : 0;
}
143
# Split a target string ("[IP:]<VMID>" or "[IP:]<ZFSPool>[/Path]") into its parts.
#
# Returns a hash reference with
#   all       - everything after the optional "host:" prefix
#   ip        - host part, surrounding brackets removed (only if present)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - first path component
#   last_part - last path component (only if there is more than one)
#   path      - remaining components in between, joined with '/'
#
# Dies when the text does not match $TARGETRE or has no first component.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $spec) = ($1, $2);

    my $target = { all => $spec };

    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $spec);

    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # For remote targets one more trailing component is dropped from the path.
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
179
# Read the cron file and return the jobs found in it (see encode_cron).
#
# On first use the cron file does not exist yet; it is then created empty
# and undef is returned.
sub read_cron {

    unless (-e $CRONJOBS) {
        my $new_fh = IO::File->new("> $CRONJOBS")
            or die "Could not create $CRONJOBS: $!\n";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $CRONJOBS")
        or die "Could not open file $CRONJOBS: $!\n";

    my @lines = <$fh>;
    close($fh);

    return encode_cron(@lines);
}
199
# Parse command line style arguments into a parameter hash.
#
# Returns a hash reference that always contains the keys dest, source,
# verbose, limit, maxsnap, name, skip, method, source_user, dest_user and
# properties (undef when not given). name, maxsnap, method, source_user and
# dest_user fall back to "default", 1, "ssh", "root" and "root".
#
# Dies with "can't parse options" when Getopt::Long reports an error.
sub parse_argv {
    my (@arg) = @_;

    my %opt = map { $_ => undef } qw(
        dest source verbose limit maxsnap name
        skip method source_user dest_user properties
    );

    my @spec = (
        'dest=s'        => \$opt{dest},
        'source=s'      => \$opt{source},
        'verbose'       => \$opt{verbose},
        'limit=i'       => \$opt{limit},
        'maxsnap=i'     => \$opt{maxsnap},
        'name=s'        => \$opt{name},
        'skip'          => \$opt{skip},
        'method=s'      => \$opt{method},
        'source-user=s' => \$opt{source_user},
        'dest-user=s'   => \$opt{dest_user},
        'properties'    => \$opt{properties},
    );

    my ($ret) = GetOptionsFromArray(\@arg, @spec);

    die "can't parse options\n" if $ret == 0;

    my %default = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    for my $key (keys %default) {
        $opt{$key} //= $default{$key};
    }

    return \%opt;
}
242
# Copy the persisted state of a job (state, lsync, vm_type and the
# consecutively numbered snap0, snap1, ... entries) from the state file
# into the given job hash and return that hash.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    for my $field (qw(state lsync vm_type)) {
        $job->{$field} = $saved->{$field};
    }

    # snapN entries are copied until the first missing/false one.
    my $i = 0;
    while ($saved->{"snap$i"}) {
        $job->{"snap$i"} = $saved->{"snap$i"};
        $i++;
    }

    return $job;
}
259
# Turn the lines of the cron file into a job configuration.
#
# Every line is split on whitespace and run through parse_argv; lines that
# yield both a source and a dest become an entry $cfg->{<source>}->{<name>}
# holding the remaining parameters.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    while (my $line = shift(@text)) {
        my $param = parse_argv(split('\s', $line));

        # Header lines and anything else without source/dest is not a job.
        next if !($param->{source} && $param->{dest});

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
280
# Build a job hash from parsed command line parameters.
#
# $param - hash reference as returned by parse_argv; 'source' is required,
#          'dest' is optional
#
# The job's method is "local" when neither source nor destination names a
# host and "ssh" otherwise. Dies (via parse_target) on an invalid source or
# destination.
sub param_to_job {
    my ($param) = @_;

    my $source = parse_target($param->{source});
    # Plain ternary instead of "my ... if ...": the behaviour of a conditional
    # 'my' declaration is undefined in Perl.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : undef;

    my $is_remote = $source->{ip} || ($dest && $dest->{ip});

    my $job = {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = $is_remote ? "ssh" : "local";
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};

    return $job;
}
302
# Load the job states from the state file.
#
# Returns the decoded JSON structure. When the state file does not exist
# yet, the config directory and a file containing "{}" are created and
# undef is returned.
sub read_state {

    unless (-e $STATE) {
        make_path($CONFIG_PATH);
        my $new_fh = IO::File->new("> $STATE")
            or die "Could not create $STATE: $!\n";
        print {$new_fh} "{}";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $STATE")
        or die "Could not open file $STATE: $!\n";

    # Only the first line is read and decoded.
    my $json = <$fh>;
    my $states = decode_json($json);

    close($fh);

    return $states;
}
324
# Persist the state of one job in the state file.
#
# $job - job hash; its state, lsync, vm_type and snapN entries are stored
#        under {<source>}{<name>}. A state of "del" removes the job (and the
#        source entry once it has no jobs left).
#
# The new content is written to "<state file>.new" and then renamed over the
# state file. Returns the complete, updated state structure.
# Dies when the new file cannot be opened, written, closed or renamed.
sub update_state {
    my ($job) = @_;
    my $text;
    my $in_fh;

    # Reading the old state is best effort: without a readable state file
    # we simply start from an empty structure.
    eval {
        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        $text = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text) {
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"}; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # A failed write must not go unnoticed, otherwise the job would keep a
    # stale state (for example "syncing") forever.
    die "can't write to $STATE.new\n" if !print {$out_fh} $text;
    die "can't close $STATE.new: $!\n" if !close($out_fh);
    die "can't move $STATE.new: $!\n" if !rename "$STATE.new", $STATE;

    # The input handle may never have been opened, hence the eval.
    eval {
        close($in_fh);
    };

    return $states;
}
374
# Add, replace or remove the cron entry of a job.
#
# $job - job hash; the line mentioning its source and name is rewritten via
#        format_job, or dropped when the job's state is "del". A job that
#        has no line yet is appended. The SHELL/PATH header is prepended
#        when none is found within the first three lines.
#
# The new content is written to "<cron file>.new" and renamed over the cron
# file. Dies when a file cannot be opened, written or renamed.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @lines = <$fh>;

    # Source and name are matched literally. Without \Q...\E a bracketed
    # IPv6 source would be read as a character class, and '.' or '+' in a
    # name as regex operators, so the job would not find its own line.
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    while (my $line = shift(@lines)) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
    close ($fh);
}
422
# Render the cron line for a job.
#
# $job  - job hash (source, dest, name, maxsnap, method, users, ...)
# $line - optional existing cron line; its five schedule fields are reused
#
# A job in state "stopped" is written as a comment ('#' prefix). Without an
# existing line, or when no schedule can be extracted from it, the default
# "every $INTERVAL minutes" schedule is used.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    # Only trust $1 after a successful match; an unmatched line would
    # otherwise insert a stale or undefined capture into the cron entry.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= "\n";

    return $text;
}
449
# Build the table printed by the 'list' command: one row per job found in
# the cron file, combined with its state from the state file.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $states->{$source}->{$name}->{state},
                $states->{$source}->{$name}->{lsync},
                $states->{$source}->{$name}->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
470
# Determine the guest type of $target->{vmid}.
#
# Looks for "<vmid>.conf" first in the QEMU and then in the LXC config
# directory - with /bin/ls over ssh when the target has an 'ip', with a
# local file test otherwise.
#
# Returns "qemu", "lxc", or undef when the target has no vmid or no config
# file was found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    my @candidates = (
        ["qemu", "$QEMU_CONF/$conf_fn"],
        ["lxc", "$LXC_CONF/$conf_fn"],
    );

    for my $candidate (@candidates) {
        my ($type, $file) = @$candidate;

        my $found = $target->{ip}
            ? eval { run_cmd(['ssh', "$user\@$target->{ip}", '--', '/bin/ls', $file]) }
            : -f $file;

        return $type if $found;
    }

    return undef;
}
489
# Implementation of the 'create' command: validate the request, register
# the job in cron and state file and, unless 'skip' is set, run a first sync.
#
# Dies when the destination or source pool cannot be listed, the source VM
# does not exist or a job with the same source and name is already
# configured. If the first sync fails, the job is destroyed again and the
# error is printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);
    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    # Install the local root key on every remote side (destination first).
    for my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
        my ($target, $user) = @$peer;
        my $ip = $target->{ip} or next;
        run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
    }

    check_pool_exists($dest, $param->{dest_user})
        or die "Pool $dest->{all} does not exists\n";

    if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
        die "Pool $source->{all} does not exists\n";
    }

    my $vm_type = vm_exists($source, $param->{source_user});
    $job->{vm_type} = $vm_type;
    $source->{vm_type} = $vm_type;

    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    # get_disks dies when the VM has no disk on zfs
    get_disks($source, $param->{source_user}) if $source->{vmid};

    update_cron($job);
    update_state($job);

    eval {
        sync($param) if !$param->{skip};
    };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
539
# Look up the job given by $param->{source} and $param->{name} in the cron
# file and return it enriched with its persisted state.
# Dies when no such job is configured.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = $cfg->{$param->{source}}->{$param->{name}};
    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }

    $job->{name} = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}
555
# Remove a job from the cron file and the state file.
sub destroy_job {
    my ($param) = @_;

    my $job = get_job($param);

    # "del" tells update_cron/update_state to drop the entry.
    $job->{state} = "del";

    update_cron($job);
    update_state($job);
}
565
# Run one synchronisation from $param->{source} to $param->{dest}.
#
# The whole run is serialised through an exclusive lock on $LOCKFILE. If
# the source/name pair is a configured job, its state is set to "syncing"
# during the run and to "ok" (with the new last-sync date) or "error"
# afterwards. For a VMID source every disk reported by get_disks is synced
# and the guest config is copied as well.
#
# Dies when the job is already syncing or any step fails.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE");
    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
    lock($lock_fh);

    my $date = get_date();

    # A sync may also be run for a source that is not a configured job.
    my $job;
    eval {
        $job = get_job($param);
    };

    if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
        die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
    }

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # Sync a single dataset: look up old snapshots, create the new one,
    # transfer it and remove the oldest snapshot once maxsnap is reached.
    my $sync_path = sub {
        my ($source, $dest, $job, $param, $date) = @_;

        ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});

        snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

        send_image($source, $dest, $param);

        if ($source->{destroy} && $source->{old_snap}) {
            snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user});
        }
    };

    my $vm_type = vm_exists($source, $param->{source_user});
    $source->{vm_type} = $vm_type;

    if ($job) {
        $job->{state} = "syncing";
        $job->{vm_type} = $vm_type if !$job->{vm_type};
        update_state($job);
    }

    eval {
        if ($source->{vmid}) {
            die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
            die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
            my $disks = get_disks($source, $param->{source_user});

            foreach my $disk (sort keys %{$disks}) {
                my $info = $disks->{$disk};
                $source->{all} = $info->{all};
                $source->{pool} = $info->{pool};
                $source->{path} = $info->{path} if $info->{path};
                $source->{last_part} = $info->{last_part};
                $sync_path->($source, $dest, $job, $param, $date);
            }

            my $uses_ssh = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
            send_config($source, $dest, ($uses_ssh ? 'ssh' : 'local'), $param->{source_user}, $param->{dest_user});
        } else {
            $sync_path->($source, $dest, $job, $param, $date);
        }
    };
    if (my $err = $@) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
650
# Find the replication snapshots ("rep_<name>_<date>") of $source->{all}.
#
# The snapshot list is requested sorted by creation, newest first. The
# first match is the last snapshot taken; matches are counted until
# $max_snap is reached, in which case $source->{destroy} is set and the
# snapshot at that position is reported as the old one.
#
# Returns ($old_snap, $last_snap), or undef when no snapshot matched.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $source_user) = @_;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $raw = run_cmd([
        @ssh,
        'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation',
        $source->{all},
    ]);

    my $matched = 0;
    my $last_snap = undef;
    my $old_snap;

    for my $line ($raw ? split(/\n/, $raw) : ()) {
        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $last_snap = $1 if (!$last_snap);
        $old_snap = $1;
        $matched++;

        if ($matched == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
683
# Create the snapshot "rep_<name>_<date>" on $source->{all} and remember
# its name in $source->{new_snap}.
#
# If creating the snapshot fails, snapshot_destroy is called for that name
# and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_$name\_".$date;
    $source->{new_snap} = $snap_name;

    my @cmd = ('zfs', 'snapshot', "$source->{all}\@$snap_name");
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};

    eval {
        run_cmd(\@cmd);
    };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
705
# Write a cron file from a configuration structure.
#
# $cfg - $cfg->{<source>}->{<name>} entries; only entries whose 'status' is
#        'ok' are written, each as one "$PROG_PATH sync ..." line.
#
# Dies when the cron file cannot be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name};
            next if $entry->{status} ne 'ok';

            $text .= "$PROG_PATH sync";
            $text .= " -source ";
            $text .= "$entry->{source_ip}:" if $entry->{source_ip};
            if ($entry->{vmid}) {
                $text .= "$entry->{vmid} ";
            } else {
                $text .= "$entry->{source_pool}";
                $text .= "$entry->{source_path}" if $entry->{source_path};
            }

            $text .= " -dest ";
            $text .= "$entry->{dest_ip}:" if $entry->{dest_ip};
            $text .= "$entry->{dest_pool}";
            $text .= "$entry->{dest_path}" if $entry->{dest_path};

            $text .= " -name $sync_name ";
            $text .= " -limit $entry->{limit}" if $entry->{limit};
            $text .= " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};
            $text .= "\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
741
# Fetch the guest configuration of $target->{vmid} ('qm config' for qemu,
# 'pct config' for lxc, over ssh when the target has an 'ip') and return
# the disks parse_disks extracts from it.
#
# Dies with "VM Type unknown" for any other $target->{vm_type}.
sub get_disks {
    my ($target, $user) = @_;

    my %config_tool = (qemu => 'qm', lxc => 'pct');
    my $tool = $config_tool{$target->{vm_type}}
        or die "VM Type unknown\n";

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    my $res = run_cmd([@ssh, $tool, 'config', $target->{vmid}]);

    return parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
}
762
# Execute a command through the shell and return its output.
#
# $cmd - either a ready-made command string, or an array reference of
#        words. Plain words are shell-quoted; scalar references (such as
#        \'|') are inserted verbatim so that pipelines can be built.
#
# stderr is merged into stdout. Returns the chomped output; dies with the
# command line and its output when the exit status is non-zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper($cmd);
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words;
        for my $word (@$cmd) {
            push @words, ref($word) ? $$word : shell_quote($word);
        }
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper($output);
        print "END CMD\n";
    }

    return $output;
}
779
# Extract the ZFS backed disks from the output of 'qm config'/'pct config'.
#
# $text    - guest configuration, one "key: value" entry per line
# $ip      - host the guest lives on (pvesm is run there via ssh), or undef
# $vm_type - 'qemu' or 'lxc'
# $user    - ssh user for the remote case
#
# Returns a hash reference indexed 0..n-1; every entry has 'pool', 'all',
# 'last_part' and, if the dataset is nested, 'path'.
# Dies when pvesm returns no path, a path has an unexpected form or no
# disk was found at all.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

        # QEMU: disks are synced unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: mount points are only synced when backup is explicitly enabled.
        # \d+ so that mount points with an index of 10 or more are filtered too.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
            my @parameter = split(/,/,$1);

            # The volume is the first option of the form "<storage>:<volume>".
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
859
# Destroy snapshot $snap on the source dataset and, when $dest is given,
# on the corresponding destination dataset as well.
#
# The source side is reached via ssh only if it has an 'ip' and $method is
# 'ssh'. Failures are reported with warn and never abort the caller.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval {
        run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]);
    };
    warn "WARN: $@" if $@;

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        my $path = "$dest->{all}";
        $path .= "/$source->{last_part}" if $source->{last_part};

        eval {
            run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]);
        };
        warn "WARN: $@" if $@;
    }
}
890
# Check whether the snapshot $source->{old_snap} exists on the destination
# dataset ($dest->{all}, extended by $source->{last_part} if set).
#
# Returns 1 when 'zfs list' reports the snapshot, undef otherwise. A failing
# 'zfs list' is reported with warn and treated as "not found".
sub snapshot_exist {
    my ($source , $dest, $method, $dest_user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};
    $path .= "\@$source->{old_snap}";

    push @$cmd, $path;

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
    }

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line =$1;
        # The snapshot name contains the user-chosen job name, so it has to
        # be matched literally rather than as a pattern.
        return 1 if $line =~ m/^.*\Q$source->{old_snap}\E$/;
    }

    return undef;
}
917
# Transfer the snapshot $source->{new_snap} to the destination with a
# "zfs send | [cstream |] zfs recv" pipeline.
#
# An incremental stream (-i) is sent when a last snapshot is known and
# still exists on the destination. $param->{limit} (multiplied by 1024) is
# passed to 'cstream -t' to throttle the transfer.
#
# If the pipeline fails, the new snapshot is destroyed on the source side
# and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    # Sending side.
    my @send;
    push @send, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) {
        push @send, '-i', "$source->{all}\@$source->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # Optional bandwidth limiter; \'|' is passed to the shell unquoted.
    my @throttle;
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    # Receiving side.
    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    my @recv = (\'|');
    push @recv, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd([@send, @throttle, @recv])
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
955
956
# Copy the guest configuration file next to the synced data.
#
# The copy is stored as "<vmid>.conf.<vm_type>.<new_snap>" below
# $CONFIG_PATH (in a sub directory named after $dest->{last_part} if set).
# With method 'ssh' the file is transferred with scp and, when
# $source->{destroy} is set, the copy belonging to the old snapshot is
# removed. With method 'local' the file is copied with cp.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh') {
        if ($dest->{ip} || $source->{ip}) {
            # The config directory is created where the copy will end up.
            my @mkdir = ('mkdir', '-p', '--', $config_dir);
            unshift @mkdir, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
            run_cmd(\@mkdir);

            my $scp_from = $source->{ip}
                ? "$source_user\@[$source->{ip}]:$source_target"
                : $source_target;
            my $scp_to = $dest->{ip}
                ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
                : $dest_target_new;
            run_cmd(['scp', '--', $scp_from, $scp_to]);
        }

        if ($source->{destroy}) {
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
            my @rm = ('rm', '-f', '--', $dest_target_old);
            unshift @rm, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
            run_cmd(\@rm);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
992
# Return the current local time formatted as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    my @now = localtime(time);
    my ($sec, $min, $hour, $mday, $mon, $year) = @now[0 .. 5];

    # localtime counts years from 1900 and months from 0.
    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}
999
# Build the table printed by the 'status' command: source, name and
# persisted state of every job found in the cron file.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    for my $source (sort keys%{$cfg}) {
        for my $sync_name (sort keys%{$cfg->{$source}}) {
            $status_list .= sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1017
# Set a job's state to "ok" in the state file and rewrite its cron line
# (format_job only comments out jobs in state "stopped").
sub enable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "ok";

    update_state($job);
    update_cron($job);
}
1026
# Set a job's state to "stopped" in the state file and rewrite its cron
# line, which format_job then prefixes with '#'.
sub disable_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "stopped";

    update_state($job);
    update_cron($job);
}
1035
# Help text per command. The keys double as the list of valid commands:
# anything without an entry here is rejected as unknown.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

remove a sync Job from the scheduler

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

Create a sync Job

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default

-skip boolean

if this flag is set it will skip the first sync

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-properties boolean

Include the dataset's properties in the stream.
},
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]

will sync one time

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default.
It is only necessary if scheduler already contains this source.

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-verbose boolean

print out the sync progress.

-properties boolean

Include the dataset's properties in the stream.
},
    list => qq{
$PROGNAME list

Get a List of all scheduled Sync Jobs
},
    status => qq{
$PROGNAME status

Get the status of all scheduled Sync Jobs
},
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

Get help about specified command.

<cmd> string

Command name

-verbose boolean

Verbose output format.
},
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

enable a syncjob and reset error

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

pause a sync job

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    printpod => 'internal command',

};
1185
# Validate the requested command before any option handling:
#  - no command at all: print the usage overview including the
#    "no command specified" error line, then abort
#  - a command without an entry in $cmd_help: name it, print the plain usage
#    overview, then abort
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # end the message with a newline, otherwise the "USAGE:" line printed by
    # usage() is glued directly onto it
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# the option parser gets a copy of the command line arguments
my @arg = @ARGV;
my $param = parse_argv(@arg);
1195
# Make sure every option named in the argument list was given with a true
# value; otherwise die with the help text of the current command.
sub check_params {
    my @required = @_;

    foreach my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1201
# Dispatch the requested command. Commands that operate on a job first make
# sure their mandatory options are present (check_params dies with the
# command's help text otherwise) and pass every target through check_target
# before the actual work is started.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    # optional second argument: the command to show help for
    my $help_command = $ARGV[1];

    # a known command: emit its help text through die and stop here
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # List form, so no shell is involved. exec only returns when the
        # program could not be started; report that instead of ending
        # silently without having shown any help.
        exec('man', $PROGNAME)
            or die "ERROR: unable to execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    # internal command: print the POD source of the manual page
    print_pod();
}
1258
# Print the overview of all available commands.
#
# $help - when false, the overview is preceded by the
#         "no command specified" error line; pass a true value to get
#         only the overview itself.
sub usage {
    my ($help) = @_;

    my @lines = (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    # the error line always comes first
    unshift @lines, "ERROR:\tno command specified\n" if !$help;

    print(join('', @lines));
}
1273
# Validate a source or destination target by running it through
# parse_target(). Its result is handed back unchanged and any error it
# raises propagates to the caller.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1278
# Print the POD source of the manual page.
#
# The SYNOPSIS section is generated: all values of $cmd_help are sorted as
# strings, joined with newlines and interpolated into the heredoc below.
# Every other section is fixed text.
#
# NOTE(review): 'values %$cmd_help' also contains the 'internal command'
# placeholder stored for printpod, so that string presumably ends up in the
# generated SYNOPSIS as well - confirm whether this is intended.
1279 sub print_pod {
1280
1281 my $synopsis = join("\n", sort values %$cmd_help);
1282
1283 print <<EOF;
1284 =head1 NAME
1285
1286 pve-zsync - PVE ZFS Replication Manager
1287
1288 =head1 SYNOPSIS
1289
1290 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1291
1292 $synopsis
1293
1294 =head1 DESCRIPTION
1295
1296 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1297 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1298 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1299 To config cron see man crontab.
1300
1301 =head2 PVE ZFS Storage sync Tool
1302
1303 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1304
1305 =head1 EXAMPLES
1306
1307 add sync job from local VM to remote ZFS Server
1308 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1309
1310 =head1 IMPORTANT FILES
1311
1312 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1313
1314 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1315
1316 =head1 COPYRIGHT AND DISCLAIMER
1317
1318 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1319
1320 This program is free software: you can redistribute it and/or modify it
1321 under the terms of the GNU Affero General Public License as published
1322 by the Free Software Foundation, either version 3 of the License, or
1323 (at your option) any later version.
1324
1325 This program is distributed in the hope that it will be useful, but
1326 WITHOUT ANY WARRANTY; without even the implied warranty of
1327 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1328 Affero General Public License for more details.
1329
1330 You should have received a copy of the GNU Affero General Public
1331 License along with this program. If not, see
1332 <http://www.gnu.org/licenses/>.
1333
1334 EOF
1335 }