]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
followup: pass properties without value
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use JSON;
11 use IO::File;
12 use String::ShellQuote 'shell_quote';
13
# Program identity and the on-disk locations used by every command.
14 my $PROGNAME = "pve-zsync";
15 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
# JSON file holding the per-job state (see read_state / update_state).
16 my $STATE = "${CONFIG_PATH}/sync_state";
# Cron file that stores one line per sync job (see read_cron / update_cron).
17 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
18 my $PATH = "/usr/sbin";
19 my $PVE_DIR = "/etc/pve/local";
# Directories that are searched for "<vmid>.conf" to tell qemu and lxc guests apart.
20 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
21 my $LXC_CONF = "${PVE_DIR}/lxc";
# File that sync() locks exclusively while it runs.
22 my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
23 my $PROG_PATH = "$PATH/${PROGNAME}";
# Minute interval used for the schedule of newly written cron lines (format_job).
24 my $INTERVAL = 15;
# When true, run_cmd() prints every command and its output.
25 my $DEBUG = 0;
26
# Regex fragments, kept as plain strings so they can be interpolated into the
# larger patterns below: one IPv4 octet, a dotted IPv4 address, one 16-bit
# IPv6 group and the last 32 bits of an IPv6 address (either form).
27 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
28 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
29 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
30 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
31
# IPv6 address: one alternative per possible position of the "::" shorthand,
# plus the fully written-out form.
32 my $IPV6RE = "(?:" .
33 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
34 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
35 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
36 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
37 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
38 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
39 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
40 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
41 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
42
43 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
44 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
45 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
46 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Capture 1 is the optional host, capture 2 the VMID or dataset (parse_target).
47 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
48
# The first command line argument selects the sub-command to run.
my $command = $ARGV[0];

# Every command except 'help' and 'printpod' needs the external tools below,
# so fail early when one of them cannot be found.
if (defined($command) && !grep { $command eq $_ } qw(help printpod)) {
    for my $tool (qw(cstream zfs ssh scp)) {
        check_bin($tool);
    }
}

# Turn the listed signals into an exception so that eval blocks around a
# running sync get a chance to clean up.
# NOTE(review): KILL is in the list as well; SIGKILL normally cannot be
# caught, so that entry presumably has no effect - confirm.
for my $signal (qw(INT KILL HUP PIPE QUIT TERM)) {
    $SIG{$signal} = sub {
        die "Signal aborting sync\n";
    };
}
62
# Locate an executable in the directories listed in $ENV{PATH}.
#
# $bin - bare command name to look for
#
# Returns the first "<dir>/$bin" that is executable. Dies with
# "unable to find command '$bin'" when no PATH entry provides it.
sub check_bin {
    my ($bin) = @_;

    my @candidates = map { "$_/$bin" } split(/:/, $ENV{PATH});

    for my $candidate (@candidates) {
        return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}
75
# Shorten a '/'-separated path so that it fits into a column of $maxlen
# characters, replacing the removed part with '..'.
#
# $path   - path to shorten; runs of slashes are collapsed first
# $maxlen - maximum width of the result
#
# Returns the (possibly shortened) path. The last component is kept whenever
# possible; the first "/component" is kept as long as there is room for it.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s{/+}{/}g;

    if (length($path) <= $maxlen) {
        return $path;
    }

    # a single component: keep only its end
    if ($path !~ m{/}) {
        return '..' . substr($path, 2 - $maxlen);
    }

    # split off the last component (an optional trailing slash stays with it)
    $path =~ s{/([^/]+/?)$}{};
    my $tail = $1;

    if (length($tail) + 3 == $maxlen) {
        return "../$tail";
    }
    if (length($tail) + 2 >= $maxlen) {
        return '..' . substr($tail, 2 - $maxlen);
    }

    # split off the first "/component" that is followed by '/' or the end
    $path =~ s{(/[^/]+)(?:/|$)}{};
    my $head = $1;

    # room left for the middle part; 4 accounts for "/../"
    my $room = $maxlen - (length($head) + length($tail)) - 4;

    if ($room < 0) {
        # not even the whole head fits; 3 accounts for "../"
        return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail";
    }

    # squeeze the middle part down to $room characters around a '..'
    substr($path, ($room / 2), (length($path) - $room), '..');
    return "$head/$path/$tail";
}
105
# Take an exclusive flock() on the given file handle.
#
# $fh - open file handle
#
# Returns the true result of flock(); dies when the lock cannot be taken.
sub lock {
    my ($fh) = @_;

    my $locked = flock($fh, LOCK_EX);
    die "Can't lock config - $!\n" if !$locked;

    return $locked;
}
110
# Release a flock() held on the given file handle.
#
# $fh - open file handle
#
# Returns the true result of flock(); dies when unlocking fails.
sub unlock {
    my ($fh) = @_;

    my $released = flock($fh, LOCK_UN);
    die "Can't unlock config- $!\n" if !$released;

    return $released;
}
115
# Return the status table if it has a true 'status' entry for the given job.
#
# $source - parsed target; its {all} value is the first-level key
# $name   - job name, the second-level key
# $status - nested hash: {<source>}->{<name>}->{status}
#
# Returns $status itself when the entry is true, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    return $status->{$source->{all}}->{$name}->{status} ? $status : undef;
}
125
# Check whether the dataset named by a target exists.
#
# $target - parsed target; {all} is the dataset, {ip} (optional) the host
# $user   - user for the ssh connection when the target is remote
#
# Runs "zfs list -H -- <dataset>", through ssh when the target has an {ip}.
# Returns 1 if the command succeeded, 0 if it failed.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @cmd = ('zfs', 'list', '-H', '--', $target->{all});
    unshift @cmd, 'ssh', "$user\@$target->{ip}", '--' if $target->{ip};

    eval { run_cmd(\@cmd); };

    return $@ ? 0 : 1;
}
144
# Split a "[host:]<vmid>" or "[host:]<pool>[/path]" string into its parts.
#
# $text - target as given on the command line
#
# Returns a hash reference with:
#   all       - everything after the optional "host:"
#   ip        - the host, without surrounding brackets (only if one was given)
#   vmid      - first component, when it consists of digits only
#   pool      - first component otherwise
#   last_part - last component, when there is more than one
#   path      - the components left between pool and last_part, if any
# Dies with a usage hint when $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };

    if ($host) {
        # hosts may be written in brackets; store them without
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my ($first, @rest) = split('/', $all);
    die "$errstr\n" if !$first;

    if ($first =~ m/^\d+$/) {
        $target->{vmid} = $first;
    } else {
        $target->{pool} = $first;
    }

    return $target if !@rest;

    $target->{last_part} = pop(@rest);

    # NOTE(review): for targets with a host one more trailing component is
    # dropped before {path} is built - intent not visible here, confirm.
    pop(@rest) if $target->{ip};

    $target->{path} = join('/', @rest) if @rest;

    return $target;
}
180
# Read the cron file and return the jobs found in it.
#
# Returns the structure built by encode_cron() from the file's lines. On
# first use, when the cron file does not exist yet, an empty file is created
# and undef is returned. Dies if the file cannot be created or opened.
sub read_cron {

    if (-e $CRONJOBS) {
        my $fh = IO::File->new("< $CRONJOBS");
        die "Could not open file $CRONJOBS: $!\n" if !$fh;

        my @lines = <$fh>;
        close($fh);

        return encode_cron(@lines);
    }

    # first use: initialise the file
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
200
# Parse command line style arguments into a parameter hash.
#
# @arg - list of arguments (words that are not options are ignored)
#
# Returns a hash reference with the keys dest, source, verbose, limit,
# maxsnap, name, skip, method, source_user, dest_user and properties.
# Options that were not given stay undef, except for these defaults:
# name "default", maxsnap 1, method "ssh", source_user and dest_user "root".
# Dies with "can't parse options" when Getopt::Long reports an error.
sub parse_argv {
    my (@arg) = @_;

    # parameter key => Getopt::Long option specification
    my %spec_for = (
        dest        => 'dest=s',
        source      => 'source=s',
        verbose     => 'verbose',
        limit       => 'limit=i',
        maxsnap     => 'maxsnap=i',
        name        => 'name=s',
        skip        => 'skip',
        method      => 'method=s',
        source_user => 'source-user=s',
        dest_user   => 'dest-user=s',
        properties  => 'properties',
    );

    my $param = { map { ($_ => undef) } keys %spec_for };

    my ($ret) = GetOptionsFromArray(
        \@arg,
        map { ($spec_for{$_} => \$param->{$_}) } keys %spec_for,
    );

    die "can't parse options\n" if $ret == 0;

    my %default_for = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    for my $key (keys %default_for) {
        $param->{$key} //= $default_for{$key};
    }

    return $param;
}
243
# Copy the stored state of a job into the job hash.
#
# $job - job hash; {source} and {name} select the state entry
#
# Sets {state}, {lsync} and {vm_type} from the state file and copies the
# consecutive "snap0", "snap1", ... entries until the first false one.
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    for my $key (qw(state lsync vm_type)) {
        $job->{$key} = $state->{$key};
    }

    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
260
# Turn the lines of the cron file into a job table.
#
# @text - lines of the cron file
#
# Every line is split on whitespace and parsed with parse_argv(); lines
# that carry both a source and a destination become a job.
# Returns a hash reference {<source>}->{<name>} = <remaining parameters>.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    for my $line (@text) {
        # a false entry ends the processing
        last if !$line;

        my $param = parse_argv(split('\s', $line));

        next if !($param->{source} && $param->{dest});

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
281
# Build a job hash from parsed command line parameters.
#
# $param - hash reference as returned by parse_argv()
#
# Returns a hash reference with name, dest (only if given), method, limit,
# maxsnap (only if true), source, source_user, dest_user and properties.
# The method is "local" when neither source nor destination has a host and
# "ssh" otherwise; $param->{method} is not consulted here.
# Dies (via parse_target) when source or destination is not a valid target.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});

    # Declare first, assign conditionally: the behaviour of "my ... if COND"
    # is undefined in Perl.
    my $dest;
    $dest = parse_target($param->{dest}) if $param->{dest};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    # normalise to a boolean
    $job->{properties} = !!$param->{properties};

    return $job;
}
303
# Load the state of all jobs from the state file.
#
# Returns the decoded JSON structure read from the first line of $STATE.
# When the file does not exist yet, the config directory and a state file
# containing "{}" are created and undef is returned.
# Dies if the file cannot be created or opened.
sub read_state {

    if (-e $STATE) {
        my $fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$fh;

        # the state is stored as a single line of JSON
        my $json = <$fh>;
        my $states = decode_json($json);

        close($fh);

        return $states;
    }

    make_path($CONFIG_PATH);

    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print {$new_fh} "{}";
    close($new_fh);

    return undef;
}
325
# Store (or remove) the state of one job in the state file.
#
# $job - job hash; {source} and {name} select the entry, {state} decides
#        what happens: "del" removes the entry, anything else stores
#        {state}, {lsync}, {vm_type} and the consecutive "snapN" values.
#
# The current file is read under a lock (failures to open it are ignored),
# the new content is written to "$STATE.new" and moved over the state file.
# Returns the complete updated state structure.
# Dies if "$STATE.new" cannot be opened.
sub update_state {
    my ($job) = @_;

    my ($source, $name) = ($job->{source}, $job->{name});

    my $old_json;
    my $in_fh;

    # best effort: carry on with an empty state when the file is unreadable
    eval {
        $in_fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$in_fh;
        lock($in_fh);
        $old_json = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = $old_json ? decode_json($old_json) : {};

    if ($job->{state} eq "del") {
        delete $states->{$source}->{$name};
        # drop the source as well once its last job is gone
        delete $states->{$source} if !keys %{$states->{$source}};
    } else {
        my $state = $states->{$source}->{$name} // {};

        for my $key (qw(state lsync vm_type)) {
            $state->{$key} = $job->{$key};
        }

        my $i = 0;
        while ($job->{"snap$i"}) {
            $state->{"snap$i"} = $job->{"snap$i"};
            $i++;
        }

        $states->{$source}->{$name} = $state;
    }

    print {$out_fh} encode_json($states);
    close($out_fh);

    move("$STATE.new", $STATE);

    # closing the old handle also gives up the lock; it may never have been opened
    eval { close($in_fh); };

    return $states;
}
375
# Rewrite the cron file so that it reflects the given job.
#
# $job - job hash; {source}, {name} and {state} are used here, the rest is
#        read by format_job()
#
# The line belonging to the job (recognised by its "source" and "name"
# arguments) is replaced by a freshly formatted one, or dropped when the
# job state is "del". If no line matches, the job is appended. A SHELL/PATH
# header is prepended unless one is found within the first three lines.
# The result is written to "$CRONJOBS.new" and moved over the cron file
# while a lock on the old file is held.
#
# Dies if the cron file cannot be opened, written or replaced.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    # Source and name are matched literally: a source may contain regex
    # metacharacters (dots in addresses, brackets around IPv6 hosts) which
    # must not be interpreted as a pattern.
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @test = <$fh>;

    while (my $line = shift(@test)) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            # deleted jobs simply vanish from the file
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
    # closing the old handle gives up the lock taken above
    close ($fh);
}
423
# Format one job as a line for the cron file.
#
# $job  - job hash; uses state, source, dest, name, maxsnap, limit, method,
#         verbose, source_user, dest_user and properties
# $line - optional existing cron line of the job; its five schedule fields
#         are kept
#
# Returns the cron line including the trailing newline. Jobs in state
# "stopped" are written commented out. Without $line, or when no schedule
# can be extracted from it, the default "*/$INTERVAL * * * *" is used.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    # Only trust $1 after a successful match; a failed match would leave a
    # stale or undefined capture behind and produce a broken cron line.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= "\n";

    return $text;
}
450
# Build the table printed by the 'list' command.
#
# Returns a string with a header line followed by one line per job from the
# cron file: source, name, state, time of the last sync, guest type
# ("undef" when unknown) and the transfer method. Source and name are
# shortened to the column width of 25 characters.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys %{$cfg}) {
        foreach my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};

            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
471
# Find out whether a guest exists and of which type it is.
#
# $target - parsed target; needs {vmid}, optionally {ip}
# $user   - user for the ssh connection when the target is remote
#
# Looks for "<vmid>.conf" in the qemu and then the lxc config directory,
# with "/bin/ls" through ssh for remote targets and a file test locally.
# Returns "qemu" or "lxc"; undef if the target has no vmid or no config
# file was found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";
    my %conf_path = (
        qemu => "$QEMU_CONF/$conf_fn",
        lxc  => "$LXC_CONF/$conf_fn",
    );

    for my $type (qw(qemu lxc)) {
        if ($target->{ip}) {
            my $listed = eval {
                run_cmd(['ssh', "$user\@$target->{ip}", '--', '/bin/ls', $conf_path{$type}]);
            };
            return $type if $listed;
        } else {
            return $type if -f $conf_path{$type};
        }
    }

    return undef;
}
490
# Create a new sync job (the 'create' command).
#
# $param - hash reference as returned by parse_argv()
#
# Copies the local ssh key to the destination and source hosts (if any),
# verifies that the destination dataset and the source (dataset or guest)
# exist and that no job with the same source and name is configured, then
# registers the job in the cron and state files. Unless {skip} is set, a
# first sync is run; if that fails the job is removed again and the error
# is printed.
sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);
    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    # destination first, then source
    my @peers = (
        [ $dest,   $param->{dest_user} ],
        [ $source, $param->{source_user} ],
    );
    for my $peer (@peers) {
        my ($target, $user) = @$peer;
        if (my $ip = $target->{ip}) {
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }
    }

    if (!check_pool_exists($dest, $param->{dest_user})) {
        die "Pool $dest->{all} does not exists\n";
    }

    if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
        die "Pool $source->{all} does not exists\n";
    }

    my $vm_type = vm_exists($source, $param->{source_user});
    $job->{vm_type} = $vm_type;
    $source->{vm_type} = $vm_type;

    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    # dies if the guest has no disk on zfs
    get_disks($source, $param->{source_user}) if $source->{vmid};

    update_cron($job);
    update_state($job);

    eval {
        sync($param) if !$param->{skip};
    };
    if (my $err = $@) {
        # the initial sync failed: do not leave a half-working job behind
        destroy_job($param);
        print $err;
    }
}
540
# Look up a configured job and enrich it with its stored state.
#
# $param - parameter hash; {source} and {name} identify the job
#
# Returns the job hash from the cron file with {name}, {source} and the
# state fields added by add_state_to_job().
# Dies if no such job is configured.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = @{$param}{qw(source name)};

    my $cfg = read_cron();
    my $job = $cfg->{$source}->{$name};

    if (!$job) {
        die "Job with source $source and name $name does not exist\n" ;
    }

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
556
# Remove a job from the cron file and from the state file.
#
# $param - parameter hash; {source} and {name} identify the job
#
# Dies (via get_job) if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $job = get_job($param);

    # "del" tells both writers to drop the entry instead of updating it
    $job->{state} = "del";

    update_cron($job);
    update_state($job);
}
566
# Run one synchronisation (the 'sync' command, also used by 'create').
#
# $param - hash reference as returned by parse_argv()
#
# Holds an exclusive lock on $LOCKFILE while running. For a guest source
# every disk returned by get_disks() is synced and the guest config is
# copied afterwards; otherwise the source dataset itself is synced.
# If the job is configured, its state is set to "syncing" during the run
# and to "ok" (with the sync date) or "error" afterwards.
# Dies if the job is already syncing or if any step of the sync fails.
sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE");
    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
    lock($lock_fh);

    my $date = get_date();

    # a sync may be run for a source that is not a configured job
    my $job = eval { get_job($param) };

    if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
        die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
    }

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    # syncs one dataset: look up old snapshots, take a new one, send it and
    # remove the oldest snapshot once the limit is reached
    my $sync_dataset = sub {
        my ($source, $dest, $job, $param, $date) = @_;

        my ($old_snap, $last_snap) =
            snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
        $source->{old_snap} = $old_snap;
        $source->{last_snap} = $last_snap;

        snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

        send_image($source, $dest, $param);

        if ($source->{destroy} && $source->{old_snap}) {
            snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user});
        }
    };

    my $vm_type = vm_exists($source, $param->{source_user});
    $source->{vm_type} = $vm_type;

    if ($job) {
        $job->{state} = "syncing";
        $job->{vm_type} = $vm_type if !$job->{vm_type};
        update_state($job);
    }

    eval {
        if ($source->{vmid}) {
            die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
            die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
            my $disks = get_disks($source, $param->{source_user});

            foreach my $key (sort keys %{$disks}) {
                my $disk = $disks->{$key};
                # point the source at the current disk
                $source->{all} = $disk->{all};
                $source->{pool} = $disk->{pool};
                $source->{path} = $disk->{path} if $disk->{path};
                $source->{last_part} = $disk->{last_part};
                $sync_dataset->($source, $dest, $job, $param, $date);
            }

            my $config_method =
                ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) ? 'ssh' : 'local';
            send_config($source, $dest, $config_method, $param->{source_user}, $param->{dest_user});
        } else {
            $sync_dataset->($source, $dest, $job, $param, $date);
        }
    };
    if (my $err = $@) {
        if ($job) {
            $job->{state} = "error";
            update_state($job);
            unlock($lock_fh);
            close($lock_fh);
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
        }
        die "$err\n";
    }

    if ($job) {
        $job->{state} = "ok";
        $job->{lsync} = $date;
        update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}
651
# Find the replication snapshots of a job on the source dataset.
#
# $source      - parsed source; {all} is the dataset, {ip} the optional host
# $dest        - unused here
# $max_snap    - number of snapshots to keep
# $name        - job name, part of the snapshot name "rep_<name>_<date>"
# $source_user - user for the ssh connection when the source is remote
#
# Lists the snapshots sorted by creation time (newest first). The first
# match is the last snapshot; the $max_snap-th match is the old one and
# additionally sets $source->{destroy}.
# Returns ($old_snap, $last_snap), or undef if no snapshot matched.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $source_user) = @_;

    my @cmd = ('zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation', $source->{all});
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};

    my $raw = run_cmd(\@cmd);

    my $found = 0;
    my $last_snap = undef;
    my $old_snap;

    for my $line ($raw ? split(/\n/, $raw) : ()) {
        if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
            my $snap = $1;

            $last_snap ||= $snap;
            $old_snap = $snap;
            $found++;

            if ($found == $max_snap) {
                # the limit is reached: the oldest kept snapshot has to go
                $source->{destroy} = 1;
                last;
            }
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
684
# Take a new replication snapshot of the source dataset.
#
# $source      - parsed source; {new_snap} is set to the snapshot name
# $dest        - parsed destination, only used for cleaning up
# $name, $date - make up the snapshot name "rep_<name>_<date>"
# $source_user - user for the ssh connection when the source is remote
# $dest_user   - user for the destination, only used for cleaning up
#
# Dies if "zfs snapshot" fails, after trying to remove the snapshot again
# with snapshot_destroy().
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_${date}";
    $source->{new_snap} = $snap_name;

    my @cmd = ('zfs', 'snapshot', "$source->{all}\@$snap_name");
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};

    eval { run_cmd(\@cmd); };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
706
# Write a complete cron file from a job table.
#
# $cfg - hash reference {<source>}->{<name>} = <job>; the jobs use the keys
#        status, vmid, source_ip, source_pool, source_path, dest_ip,
#        dest_pool, dest_path, limit and maxsnap
#
# Only jobs whose status is 'ok' are written.
# Dies if the cron file cannot be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys %{$cfg}) {
        foreach my $sync_name (sort keys %{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name};
            next if $entry->{status} ne 'ok';

            # source: optional host, then either the vmid or pool and path
            my $from = $entry->{source_ip} ? "$entry->{source_ip}:" : "";
            if ($entry->{vmid}) {
                $from .= "$entry->{vmid} ";
            } else {
                $from .= "$entry->{source_pool}";
                $from .= "$entry->{source_path}" if $entry->{source_path};
            }

            # destination: optional host, pool and optional path
            my $to = $entry->{dest_ip} ? "$entry->{dest_ip}:" : "";
            $to .= "$entry->{dest_pool}";
            $to .= "$entry->{dest_path}" if $entry->{dest_path};

            $text .= "$PROG_PATH sync -source $from -dest $to -name $sync_name ";
            $text .= " -limit $entry->{limit}" if $entry->{limit};
            $text .= " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};
            $text .= "\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
742
# Collect the zfs disks of a guest.
#
# $target - parsed target; needs {vmid} and {vm_type}, optionally {ip}
# $user   - user for the ssh connection when the target is remote
#
# Reads the guest config with "qm config" (qemu) or "pct config" (lxc) and
# hands it to parse_disks().
# Returns the disk table built by parse_disks().
# Dies with "VM Type unknown" for any other {vm_type}.
sub get_disks {
    my ($target, $user) = @_;

    my %config_tool = (
        qemu => 'qm',
        lxc  => 'pct',
    );

    my $tool = $config_tool{$target->{vm_type}};
    die "VM Type unknown\n" if !$tool;

    my @cmd = ($tool, 'config', $target->{vmid});
    unshift @cmd, 'ssh', "$user\@$target->{ip}", '--' if $target->{ip};

    my $config = run_cmd(\@cmd);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $user);
}
763
# Run a command through the shell and return its output.
#
# $cmd - either a ready-made command string, or an array reference whose
#        plain elements are shell-quoted and whose scalar-reference
#        elements (e.g. \'|') are inserted verbatim
#
# Returns the combined stdout and stderr without the trailing newline.
# Dies with the command and its output if the exit status is not 0.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper($cmd);
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper($output);
        print "END CMD\n";
    }

    return $output;
}
780
# Extract the zfs backed disks from a guest configuration.
#
# $text    - output of "qm config" / "pct config"
# $ip      - host of the guest, false for a local guest
# $vm_type - 'qemu' or 'lxc'
# $user    - user for the ssh connection when the guest is remote
#
# Considers the virtio/ide/scsi/sata/mp/rootfs lines, skips cdrom drives,
# qemu disks with backup disabled and lxc mount points without backup
# enabled, and resolves every remaining volume with "pvesm path".
# Returns a hash reference {<n>} = { pool, path (optional), last_part, all }
# numbered from 0.
# Dies if a volume has no path, if a path does not look like a zfs dataset,
# or if no disk is left at all.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    # consume the config line by line
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

        # QEMU: disks are synced unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: mount points are only synced when backup is explicitly
        # enabled. \d+ so that multi-digit mount points (mp10, ...) are
        # subject to this rule as well.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form "<storage>:<volume>" names the disk
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            # "/dev/zvol/<pool>[/<path>]/<disk>"
            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
860
# Remove a snapshot on the source and, if a destination is given, on the
# destination as well.
#
# $source      - parsed source; {all} is the dataset, {ip} the optional host
# $dest        - parsed destination, or a false value to skip that side
# $method      - the source side only goes through ssh for method 'ssh'
# $snap        - snapshot name (the part after '@')
# $source_user - user for the ssh connection to the source
# $dest_user   - user for the ssh connection to the destination
#
# Failures are reported with warn() only; this sub does not die on them.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval {
        run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        # on the destination the dataset lives below the target dataset
        my $dataset = "$dest->{all}";
        $dataset .= "/$source->{last_part}" if $source->{last_part};

        eval {
            run_cmd([@dest_ssh, @zfscmd, "$dataset\@$snap"]);
        };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
891
# Check whether the source's old snapshot exists on the destination.
#
# $source    - parsed source; uses {old_snap} and {last_part}
# $dest      - parsed destination; {all} is the dataset, {ip} the optional host
# $method    - unused here
# $dest_user - user for the ssh connection when the destination is remote
#
# Returns 1 if "zfs list" on the destination shows the snapshot, undef
# otherwise. A failing "zfs list" is reported with warn() and counts as
# "does not exist".
sub snapshot_exist {
    my ($source , $dest, $method, $dest_user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};
    $path .= "\@$source->{old_snap}";

    push @$cmd, $path;

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
    }

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line =$1;
        # match the snapshot name literally, it is not a pattern
        return 1 if $line =~ m/^.*\Q$source->{old_snap}\E$/;
    }

    # be explicit instead of falling off the end of the loop
    return undef;
}
918
# Transfer the new snapshot from the source to the destination.
#
# $source - parsed source; uses {all}, {ip}, {last_snap}, {new_snap} and
#           {last_part}
# $dest   - parsed destination; uses {all} and {ip}
# $param  - parameters; uses source_user, dest_user, properties, verbose,
#           limit and method
#
# Builds the pipeline "zfs send [| cstream] | zfs recv", with ssh in front
# of either side that is remote. The send is incremental when the last
# snapshot also exists on the destination. {limit} is multiplied by 1024
# before it is passed to cstream.
# Dies if the pipeline fails, after removing the new snapshot on the source.
sub send_image {
    my ($source, $dest, $param) = @_;

    # sending side
    my @send = ('zfs', 'send');
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) {
        push @send, '-i', "$source->{all}\@$source->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    if ($source->{ip}) {
        unshift @send, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--';
    }

    # optional bandwidth limit; scalar references are passed to the shell unquoted
    my @throttle;
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    # receiving side
    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    my @recv = (\'|');
    if ($dest->{ip}) {
        push @recv, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--';
    }
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd([@send, @throttle, @recv])
    };

    if (my $erro = $@) {
        # passing undef as destination: only the source snapshot is removed
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
956
957
# Copy the guest configuration next to the replicated data.
#
# $source      - parsed source; uses vm_type, vmid, new_snap, old_snap,
#                destroy and ip
# $dest        - parsed destination; uses last_part and ip
# $method      - 'ssh' or 'local'
# $source_user - user for connections to the source host
# $dest_user   - user for connections to the destination host
#
# The config is stored as "<vmid>.conf.<vm_type>.<new_snap>" below
# $CONFIG_PATH (in a sub directory named after the destination's last
# part, if it has one). With method 'ssh' the copy of the old snapshot is
# removed when $source->{destroy} is set.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;

    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh'){
        # nothing is copied when neither side is remote
        if ($dest->{ip} || $source->{ip}) {
            my @mkdir = ('mkdir', '-p', '--', $config_dir);
            unshift @mkdir, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
            run_cmd(\@mkdir);

            my $from = $source->{ip}
                ? "$source_user\@[$source->{ip}]:$source_target"
                : $source_target;
            my $to = $dest->{ip}
                ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
                : $dest_target_new;
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";

            my @rm = ('rm', '-f', '--', $dest_target_old);
            unshift @rm, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
            run_cmd(\@rm);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
993
# Return the current local time as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # year, month, day, hour, minute, second - in output order
    my @fields = (localtime(time))[5, 4, 3, 2, 1, 0];

    $fields[0] += 1900;    # localtime counts years from 1900
    $fields[1] += 1;       # and months from 0

    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", @fields);
}
1000
# Build the table printed by the 'status' command.
#
# Returns a string with a header line followed by one line per job from the
# cron file: source and name (shortened to 25 characters each) and the
# state stored in the state file.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys %{$cfg}) {
        foreach my $sync_name (sort keys %{$cfg->{$source}}) {
            my $columns = sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= $columns . "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1018
# Enable a job again (the 'enable' command).
#
# $param - parameter hash; {source} and {name} identify the job
#
# Sets the job state to "ok" and writes it to the state file and the cron
# file. Dies (via get_job) if the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $job = get_job($param);

    # "ok" also clears a previous error state
    $job->{state} = "ok";

    update_state($job);
    update_cron($job);
}
1027
# Pause a job (the 'disable' command).
#
# $param - parameter hash; {source} and {name} identify the job
#
# Sets the job state to "stopped" and writes it to the state file and the
# cron file, where format_job() comments out stopped jobs.
# Dies (via get_job) if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $job = get_job($param);

    $job->{state} = "stopped";

    update_state($job);
    update_cron($job);
}
1036
# Help text for every command, keyed by command name. The table doubles as
# the list of known commands: the start-up check rejects any command that
# has no true entry here, and check_params() prints the entry of the
# current command when a required option is missing.
1037 my $cmd_help = {
1038 destroy => qq{
1039 $PROGNAME destroy -source <string> [OPTIONS]
1040
1041 remove a sync Job from the scheduler
1042
1043 -name string
1044
1045 name of the sync job, if not set it is default
1046
1047 -source string
1048
1049 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1050 },
1051 create => qq{
1052 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1053
1054 Create a sync Job
1055
1056 -dest string
1057
1058 the destination target is like [IP]:<Pool>[/Path]
1059
1060 -dest-user string
1061
1062 name of the user on the destination target, root by default
1063
1064 -limit integer
1065
1066 max sync speed in kBytes/s, default unlimited
1067
1068 -maxsnap string
1069
1070 how much snapshots will be kept before get erased, default 1
1071
1072 -name string
1073
1074 name of the sync job, if not set it is default
1075
1076 -skip boolean
1077
1078 if this flag is set it will skip the first sync
1079
1080 -source string
1081
1082 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1083
1084 -source-user string
1085
1086 name of the user on the source target, root by default
1087
1088 -properties boolean
1089
1090 Include the dataset's properties in the stream.
1091 },
1092 sync => qq{
1093 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1094
1095 will sync one time
1096
1097 -dest string
1098
1099 the destination target is like [IP:]<Pool>[/Path]
1100
1101 -dest-user string
1102
1103 name of the user on the destination target, root by default
1104
1105 -limit integer
1106
1107 max sync speed in kBytes/s, default unlimited
1108
1109 -maxsnap integer
1110
1111 how much snapshots will be kept before get erased, default 1
1112
1113 -name string
1114
1115 name of the sync job, if not set it is default.
1116 It is only necessary if scheduler allready contains this source.
1117
1118 -source string
1119
1120 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1121
1122 -source-user string
1123
1124 name of the user on the source target, root by default
1125
1126 -verbose boolean
1127
1128 print out the sync progress.
1129
1130 -properties boolean
1131
1132 Include the dataset's properties in the stream.
1133 },
1134 list => qq{
1135 $PROGNAME list
1136
1137 Get a List of all scheduled Sync Jobs
1138 },
1139 status => qq{
1140 $PROGNAME status
1141
1142 Get the status of all scheduled Sync Jobs
1143 },
1144 help => qq{
1145 $PROGNAME help <cmd> [OPTIONS]
1146
1147 Get help about specified command.
1148
1149 <cmd> string
1150
1151 Command name
1152
1153 -verbose boolean
1154
1155 Verbose output format.
1156 },
1157 enable => qq{
1158 $PROGNAME enable -source <string> [OPTIONS]
1159
1160 enable a syncjob and reset error
1161
1162 -name string
1163
1164 name of the sync job, if not set it is default
1165
1166 -source string
1167
1168 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1169 },
1170 disable => qq{
1171 $PROGNAME disable -source <string> [OPTIONS]
1172
1173 pause a sync job
1174
1175 -name string
1176
1177 name of the sync job, if not set it is default
1178
1179 -source string
1180
1181 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1182 },
# Placeholder text only; having an entry makes 'printpod' a known command.
1183 printpod => 'internal command',
1184
1185 };
1186
# Validate the requested command before the options are parsed: a missing
# command prints the usage together with the "no command" error, an unknown
# command reports the offending name followed by the usage overview.
# Both cases terminate the program through die.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # Terminate the message with a newline so the usage overview printed
    # next starts on its own line instead of being glued to the error.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Parse the command line into the parameter hash used by the dispatch below.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1196
# Make sure every parameter named in the argument list is set.
# Dies with the help text of the current command as soon as one of the
# given names has no true value in $param (so "0" and "" count as missing).
sub check_params {
    my @required = @_;

    foreach my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1202
# Dispatch the validated command to its implementation. Commands that
# operate on a job first verify that the mandatory parameters are present
# and that every given target passes check_target().
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    my $help_command = $ARGV[1];

    # Help for a known command: emit its help text and stop.
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # exec only returns when it failed to start the program; report
        # that instead of silently doing nothing. The list form also
        # keeps the shell out of the picture.
        exec('man', $PROGNAME)
            or die "unable to execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1259
# Print the command overview to STDOUT.
# When called without a true argument the overview is preceded by the
# "no command specified" error line; pass a true value to print the
# overview only.
sub usage {
    my ($help) = @_;

    my @lines;
    push @lines, "ERROR:\tno command specified\n" unless $help;
    push @lines,
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n";

    # Join explicitly so the output does not depend on $, being unset.
    print join('', @lines);
}
1274
# Check a source/destination target by handing it to parse_target().
# The result of parse_target() is passed through to the caller unchanged.
# NOTE(review): presumably parse_target() dies on a malformed target,
# which is what makes this a check - confirm against its definition.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1279
# Print the POD source of the manual page to STDOUT.
# The SYNOPSIS section is assembled from the per-command help texts in
# $cmd_help, sorted by their text.
sub print_pod {

    # 'printpod' is an internal command whose help entry is only a
    # placeholder string; keep it out of the generated manual page.
    my @sections = map { $cmd_help->{$_} }
        grep { $_ ne 'printpod' } keys %$cmd_help;

    my $synopsis = join("\n", sort @sections);

    # The example command is set as a verbatim paragraph (blank line plus
    # indentation); otherwise POD merges it into the preceding sentence.
    print <<"EOF";
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server

    pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}