]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
vm_exists: directly check if we look for local guest
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use JSON;
11 use IO::File;
12 use String::ShellQuote 'shell_quote';
13
14 my $PROGNAME = "pve-zsync";
15 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
16 my $STATE = "${CONFIG_PATH}/sync_state";
17 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
18 my $PATH = "/usr/sbin";
19 my $PVE_DIR = "/etc/pve/local";
20 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
21 my $LXC_CONF = "${PVE_DIR}/lxc";
22 my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
23 my $PROG_PATH = "$PATH/${PROGNAME}";
24 my $INTERVAL = 15;
25 my $DEBUG = 0;
26
27 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
28 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
29 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
30 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
31
32 my $IPV6RE = "(?:" .
33 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
34 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
35 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
36 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
37 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
38 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
39 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
40 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
41 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
42
43 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
44 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
45 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
46 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
47 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
48
49 my $command = $ARGV[0];
50
51 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
52 check_bin ('cstream');
53 check_bin ('zfs');
54 check_bin ('ssh');
55 check_bin ('scp');
56 }
57
58 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =
59 sub {
60 die "Signal aborting sync\n";
61 };
62
63 sub check_bin {
64 my ($bin) = @_;
65
66 foreach my $p (split (/:/, $ENV{PATH})) {
67 my $fn = "$p/$bin";
68 if (-x $fn) {
69 return $fn;
70 }
71 }
72
73 die "unable to find command '$bin'\n";
74 }
75
76 sub cut_target_width {
77 my ($path, $maxlen) = @_;
78 $path =~ s@/+@/@g;
79
80 return $path if length($path) <= $maxlen;
81
82 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
83
84 $path =~ s@/([^/]+/?)$@@;
85 my $tail = $1;
86
87 if (length($tail)+3 == $maxlen) {
88 return "../$tail";
89 } elsif (length($tail)+2 >= $maxlen) {
90 return '..'.substr($tail, -$maxlen+2)
91 }
92
93 $path =~ s@(/[^/]+)(?:/|$)@@;
94 my $head = $1;
95 my $both = length($head) + length($tail);
96 my $remaining = $maxlen-$both-4; # -4 for "/../"
97
98 if ($remaining < 0) {
99 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
100 }
101
102 substr($path, ($remaining/2), (length($path)-$remaining), '..');
103 return "$head/" . $path . "/$tail";
104 }
105
106 sub lock {
107 my ($fh) = @_;
108 flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
109 }
110
111 sub unlock {
112 my ($fh) = @_;
113 flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
114 }
115
116 sub get_status {
117 my ($source, $name, $status) = @_;
118
119 if ($status->{$source->{all}}->{$name}->{status}) {
120 return $status;
121 }
122
123 return undef;
124 }
125
126 sub check_pool_exists {
127 my ($target, $user) = @_;
128
129 my $cmd = [];
130
131 if ($target->{ip}) {
132 push @$cmd, 'ssh', "$user\@$target->{ip}", '--';
133 }
134 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
135 eval {
136 run_cmd($cmd);
137 };
138
139 if ($@) {
140 return 0;
141 }
142 return 1;
143 }
144
145 sub parse_target {
146 my ($text) = @_;
147
148 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
149 my $target = {};
150
151 if ($text !~ $TARGETRE) {
152 die "$errstr\n";
153 }
154 $target->{all} = $2;
155 $target->{ip} = $1 if $1;
156 my @parts = split('/', $2);
157
158 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
159
160 my $pool = $target->{pool} = shift(@parts);
161 die "$errstr\n" if !$pool;
162
163 if ($pool =~ m/^\d+$/) {
164 $target->{vmid} = $pool;
165 delete $target->{pool};
166 }
167
168 return $target if (@parts == 0);
169 $target->{last_part} = pop(@parts);
170
171 if ($target->{ip}) {
172 pop(@parts);
173 }
174 if (@parts > 0) {
175 $target->{path} = join('/', @parts);
176 }
177
178 return $target;
179 }
180
181 sub read_cron {
182
183 #This is for the first use to init file;
184 if (!-e $CRONJOBS) {
185 my $new_fh = IO::File->new("> $CRONJOBS");
186 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
187 close($new_fh);
188 return undef;
189 }
190
191 my $fh = IO::File->new("< $CRONJOBS");
192 die "Could not open file $CRONJOBS: $!\n" if !$fh;
193
194 my @text = <$fh>;
195
196 close($fh);
197
198 return encode_cron(@text);
199 }
200
201 sub parse_argv {
202 my (@arg) = @_;
203
204 my $param = {
205 dest => undef,
206 source => undef,
207 verbose => undef,
208 limit => undef,
209 maxsnap => undef,
210 name => undef,
211 skip => undef,
212 method => undef,
213 source_user => undef,
214 dest_user => undef,
215 };
216
217 my ($ret) = GetOptionsFromArray(
218 \@arg,
219 'dest=s' => \$param->{dest},
220 'source=s' => \$param->{source},
221 'verbose' => \$param->{verbose},
222 'limit=i' => \$param->{limit},
223 'maxsnap=i' => \$param->{maxsnap},
224 'name=s' => \$param->{name},
225 'skip' => \$param->{skip},
226 'method=s' => \$param->{method},
227 'source-user=s' => \$param->{source_user},
228 'dest-user=s' => \$param->{dest_user}
229 );
230
231 die "can't parse options\n" if $ret == 0;
232
233 $param->{name} //= "default";
234 $param->{maxsnap} //= 1;
235 $param->{method} //= "ssh";
236 $param->{source_user} //= "root";
237 $param->{dest_user} //= "root";
238
239 return $param;
240 }
241
242 sub add_state_to_job {
243 my ($job) = @_;
244
245 my $states = read_state();
246 my $state = $states->{$job->{source}}->{$job->{name}};
247
248 $job->{state} = $state->{state};
249 $job->{lsync} = $state->{lsync};
250 $job->{vm_type} = $state->{vm_type};
251
252 for (my $i = 0; $state->{"snap$i"}; $i++) {
253 $job->{"snap$i"} = $state->{"snap$i"};
254 }
255
256 return $job;
257 }
258
259 sub encode_cron {
260 my (@text) = @_;
261
262 my $cfg = {};
263
264 while (my $line = shift(@text)) {
265
266 my @arg = split('\s', $line);
267 my $param = parse_argv(@arg);
268
269 if ($param->{source} && $param->{dest}) {
270 my $source = delete $param->{source};
271 my $name = delete $param->{name};
272
273 $cfg->{$source}->{$name} = $param;
274 }
275 }
276
277 return $cfg;
278 }
279
280 sub param_to_job {
281 my ($param) = @_;
282
283 my $job = {};
284
285 my $source = parse_target($param->{source});
286 my $dest = parse_target($param->{dest}) if $param->{dest};
287
288 $job->{name} = !$param->{name} ? "default" : $param->{name};
289 $job->{dest} = $param->{dest} if $param->{dest};
290 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
291 $job->{method} = "ssh" if !$job->{method};
292 $job->{limit} = $param->{limit};
293 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
294 $job->{source} = $param->{source};
295 $job->{source_user} = $param->{source_user};
296 $job->{dest_user} = $param->{dest_user};
297
298 return $job;
299 }
300
301 sub read_state {
302
303 if (!-e $STATE) {
304 make_path $CONFIG_PATH;
305 my $new_fh = IO::File->new("> $STATE");
306 die "Could not create $STATE: $!\n" if !$new_fh;
307 print $new_fh "{}";
308 close($new_fh);
309 return undef;
310 }
311
312 my $fh = IO::File->new("< $STATE");
313 die "Could not open file $STATE: $!\n" if !$fh;
314
315 my $text = <$fh>;
316 my $states = decode_json($text);
317
318 close($fh);
319
320 return $states;
321 }
322
323 sub update_state {
324 my ($job) = @_;
325 my $text;
326 my $in_fh;
327
328 eval {
329
330 $in_fh = IO::File->new("< $STATE");
331 die "Could not open file $STATE: $!\n" if !$in_fh;
332 lock($in_fh);
333 $text = <$in_fh>;
334 };
335
336 my $out_fh = IO::File->new("> $STATE.new");
337 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
338
339 my $states = {};
340 my $state = {};
341 if ($text){
342 $states = decode_json($text);
343 $state = $states->{$job->{source}}->{$job->{name}};
344 }
345
346 if ($job->{state} ne "del") {
347 $state->{state} = $job->{state};
348 $state->{lsync} = $job->{lsync};
349 $state->{vm_type} = $job->{vm_type};
350
351 for (my $i = 0; $job->{"snap$i"} ; $i++) {
352 $state->{"snap$i"} = $job->{"snap$i"};
353 }
354 $states->{$job->{source}}->{$job->{name}} = $state;
355 } else {
356
357 delete $states->{$job->{source}}->{$job->{name}};
358 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
359 }
360
361 $text = encode_json($states);
362 print $out_fh $text;
363
364 close($out_fh);
365 move("$STATE.new", $STATE);
366 eval {
367 close($in_fh);
368 };
369
370 return $states;
371 }
372
373 sub update_cron {
374 my ($job) = @_;
375
376 my $updated;
377 my $has_header;
378 my $line_no = 0;
379 my $text = "";
380 my $header = "SHELL=/bin/sh\n";
381 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
382
383 my $fh = IO::File->new("< $CRONJOBS");
384 die "Could not open file $CRONJOBS: $!\n" if !$fh;
385 lock($fh);
386
387 my @test = <$fh>;
388
389 while (my $line = shift(@test)) {
390 chomp($line);
391 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
392 $updated = 1;
393 next if $job->{state} eq "del";
394 $text .= format_job($job, $line);
395 } else {
396 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
397 $has_header = 1;
398 }
399 $text .= "$line\n";
400 }
401 $line_no++;
402 }
403
404 if (!$has_header) {
405 $text = "$header$text";
406 }
407
408 if (!$updated) {
409 $text .= format_job($job);
410 }
411 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
412 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
413
414 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
415 close ($new_fh);
416
417 die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
418 close ($fh);
419 }
420
421 sub format_job {
422 my ($job, $line) = @_;
423 my $text = "";
424
425 if ($job->{state} eq "stopped") {
426 $text = "#";
427 }
428 if ($line) {
429 $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/;
430 $text .= $1;
431 } else {
432 $text .= "*/$INTERVAL * * * *";
433 }
434 $text .= " root";
435 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
436 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
437 $text .= " --limit $job->{limit}" if $job->{limit};
438 $text .= " --method $job->{method}";
439 $text .= " --verbose" if $job->{verbose};
440 $text .= " --source-user $job->{source_user}";
441 $text .= " --dest-user $job->{dest_user}";
442 $text .= "\n";
443
444 return $text;
445 }
446
447 sub list {
448
449 my $cfg = read_cron();
450
451 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
452
453 my $states = read_state();
454 foreach my $source (sort keys%{$cfg}) {
455 foreach my $name (sort keys%{$cfg->{$source}}) {
456 $list .= sprintf("%-25s", cut_target_width($source, 25));
457 $list .= sprintf("%-25s", cut_target_width($name, 25));
458 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
459 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
460 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
461 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
462 }
463 }
464
465 return $list;
466 }
467
468 sub vm_exists {
469 my ($target, $user) = @_;
470
471 return undef if !defined($target->{vmid});
472
473 my $conf_fn = "$target->{vmid}.conf";
474
475 if ($target->{ip}) {
476 my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
477 return "qemu" if eval { run_cmd([@cmd, "$QEMU_CONF/$conf_fn"]) };
478 return "lxc" if eval { run_cmd([@cmd, "$LXC_CONF/$conf_fn"]) };
479 } else {
480 return "qemu" if -f "$QEMU_CONF/$conf_fn";
481 return "lxc" if -f "$LXC_CONF/$conf_fn";
482 }
483
484 return undef;
485 }
486
487 sub init {
488 my ($param) = @_;
489
490 my $cfg = read_cron();
491
492 my $job = param_to_job($param);
493
494 $job->{state} = "ok";
495 $job->{lsync} = 0;
496
497 my $source = parse_target($param->{source});
498 my $dest = parse_target($param->{dest});
499
500 if (my $ip = $dest->{ip}) {
501 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
502 }
503
504 if (my $ip = $source->{ip}) {
505 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
506 }
507
508 die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
509
510 if (!defined($source->{vmid})) {
511 die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
512 }
513
514 my $vm_type = vm_exists($source, $param->{source_user});
515 $job->{vm_type} = $vm_type;
516 $source->{vm_type} = $vm_type;
517
518 die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
519
520 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
521
522 #check if vm has zfs disks if not die;
523 get_disks($source, $param->{source_user}) if $source->{vmid};
524
525 update_cron($job);
526 update_state($job);
527
528 eval {
529 sync($param) if !$param->{skip};
530 };
531 if(my $err = $@) {
532 destroy_job($param);
533 print $err;
534 }
535 }
536
537 sub get_job {
538 my ($param) = @_;
539
540 my $cfg = read_cron();
541
542 if (!$cfg->{$param->{source}}->{$param->{name}}) {
543 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
544 }
545 my $job = $cfg->{$param->{source}}->{$param->{name}};
546 $job->{name} = $param->{name};
547 $job->{source} = $param->{source};
548 $job = add_state_to_job($job);
549
550 return $job;
551 }
552
553 sub destroy_job {
554 my ($param) = @_;
555
556 my $job = get_job($param);
557 $job->{state} = "del";
558
559 update_cron($job);
560 update_state($job);
561 }
562
563 sub sync {
564 my ($param) = @_;
565
566 my $lock_fh = IO::File->new("> $LOCKFILE");
567 die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
568 lock($lock_fh);
569
570 my $date = get_date();
571 my $job;
572 eval {
573 $job = get_job($param);
574 };
575
576 if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
577 die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
578 }
579
580 my $dest = parse_target($param->{dest});
581 my $source = parse_target($param->{source});
582
583 my $sync_path = sub {
584 my ($source, $dest, $job, $param, $date) = @_;
585
586 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
587
588 snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
589
590 send_image($source, $dest, $param);
591
592 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
593
594 };
595
596 my $vm_type = vm_exists($source, $param->{source_user});
597 $source->{vm_type} = $vm_type;
598
599 if ($job) {
600 $job->{state} = "syncing";
601 $job->{vm_type} = $vm_type if !$job->{vm_type};
602 update_state($job);
603 }
604
605 eval{
606 if ($source->{vmid}) {
607 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
608 die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
609 my $disks = get_disks($source, $param->{source_user});
610
611 foreach my $disk (sort keys %{$disks}) {
612 $source->{all} = $disks->{$disk}->{all};
613 $source->{pool} = $disks->{$disk}->{pool};
614 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
615 $source->{last_part} = $disks->{$disk}->{last_part};
616 &$sync_path($source, $dest, $job, $param, $date);
617 }
618 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
619 send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user});
620 } else {
621 send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user});
622 }
623 } else {
624 &$sync_path($source, $dest, $job, $param, $date);
625 }
626 };
627 if(my $err = $@) {
628 if ($job) {
629 $job->{state} = "error";
630 update_state($job);
631 unlock($lock_fh);
632 close($lock_fh);
633 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
634 }
635 die "$err\n";
636 }
637
638 if ($job) {
639 $job->{state} = "ok";
640 $job->{lsync} = $date;
641 update_state($job);
642 }
643
644 unlock($lock_fh);
645 close($lock_fh);
646 }
647
648 sub snapshot_get{
649 my ($source, $dest, $max_snap, $name, $source_user) = @_;
650
651 my $cmd = [];
652 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
653 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
654 push @$cmd, $source->{all};
655
656 my $raw = run_cmd($cmd);
657 my $index = 0;
658 my $line = "";
659 my $last_snap = undef;
660 my $old_snap;
661
662 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
663 $line = $1;
664 if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
665
666 $last_snap = $1 if (!$last_snap);
667 $old_snap = $1;
668 $index++;
669 if ($index == $max_snap) {
670 $source->{destroy} = 1;
671 last;
672 };
673 }
674 }
675
676 return ($old_snap, $last_snap) if $last_snap;
677
678 return undef;
679 }
680
681 sub snapshot_add {
682 my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;
683
684 my $snap_name = "rep_$name\_".$date;
685
686 $source->{new_snap} = $snap_name;
687
688 my $path = "$source->{all}\@$snap_name";
689
690 my $cmd = [];
691 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
692 push @$cmd, 'zfs', 'snapshot', $path;
693 eval{
694 run_cmd($cmd);
695 };
696
697 if (my $err = $@) {
698 snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
699 die "$err\n";
700 }
701 }
702
703 sub write_cron {
704 my ($cfg) = @_;
705
706 my $text = "SHELL=/bin/sh\n";
707 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
708
709 my $fh = IO::File->new("> $CRONJOBS");
710 die "Could not open file: $!\n" if !$fh;
711
712 foreach my $source (sort keys%{$cfg}) {
713 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
714 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
715 $text .= "$PROG_PATH sync";
716 $text .= " -source ";
717 if ($cfg->{$source}->{$sync_name}->{vmid}) {
718 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
719 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
720 } else {
721 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
722 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
723 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
724 }
725 $text .= " -dest ";
726 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
727 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
728 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
729 $text .= " -name $sync_name ";
730 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
731 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
732 $text .= "\n";
733 }
734 }
735 die "Can't write to cron\n" if (!print($fh $text));
736 close($fh);
737 }
738
739 sub get_disks {
740 my ($target, $user) = @_;
741
742 my $cmd = [];
743 push @$cmd, 'ssh', "$user\@$target->{ip}", '--', if $target->{ip};
744
745 if ($target->{vm_type} eq 'qemu') {
746 push @$cmd, 'qm', 'config', $target->{vmid};
747 } elsif ($target->{vm_type} eq 'lxc') {
748 push @$cmd, 'pct', 'config', $target->{vmid};
749 } else {
750 die "VM Type unknown\n";
751 }
752
753 my $res = run_cmd($cmd);
754
755 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
756
757 return $disks;
758 }
759
760 sub run_cmd {
761 my ($cmd) = @_;
762 print "Start CMD\n" if $DEBUG;
763 print Dumper $cmd if $DEBUG;
764 if (ref($cmd) eq 'ARRAY') {
765 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
766 }
767 my $output = `$cmd 2>&1`;
768
769 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
770
771 chomp($output);
772 print Dumper $output if $DEBUG;
773 print "END CMD\n" if $DEBUG;
774 return $output;
775 }
776
777 sub parse_disks {
778 my ($text, $ip, $vm_type, $user) = @_;
779
780 my $disks;
781
782 my $num = 0;
783 while ($text && $text =~ s/^(.*?)(\n|$)//) {
784 my $line = $1;
785
786 next if $line =~ /media=cdrom/;
787 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
788
789 #QEMU if backup is not set include in sync
790 next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
791
792 #LXC if backup is not set do no in sync
793 next if $vm_type eq 'lxc' && ($line =~ m/^mp\d:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);
794
795 my $disk = undef;
796 my $stor = undef;
797 if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
798 my @parameter = split(/,/,$1);
799
800 foreach my $opt (@parameter) {
801 if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
802 $disk = $2;
803 $stor = $1;
804 last;
805 }
806 }
807 }
808 if (!defined($disk) || !defined($stor)) {
809 print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
810 next;
811 }
812
813 my $cmd = [];
814 push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
815 push @$cmd, 'pvesm', 'path', "$stor$disk";
816 my $path = run_cmd($cmd);
817
818 die "Get no path from pvesm path $stor$disk\n" if !$path;
819
820 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
821
822 my @array = split('/', $1);
823 $disks->{$num}->{pool} = shift(@array);
824 $disks->{$num}->{all} = $disks->{$num}->{pool};
825 if (0 < @array) {
826 $disks->{$num}->{path} = join('/', @array);
827 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
828 }
829 $disks->{$num}->{last_part} = $disk;
830 $disks->{$num}->{all} .= "\/$disk";
831
832 $num++;
833 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
834
835 $disks->{$num}->{pool} = $1;
836 $disks->{$num}->{all} = $disks->{$num}->{pool};
837
838 if ($2) {
839 $disks->{$num}->{path} = $3;
840 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
841 }
842
843 $disks->{$num}->{last_part} = $disk;
844 $disks->{$num}->{all} .= "\/$disk";
845
846 $num++;
847
848 } else {
849 die "ERROR: in path\n";
850 }
851 }
852
853 die "Vm include no disk on zfs.\n" if !$disks->{0};
854 return $disks;
855 }
856
857 sub snapshot_destroy {
858 my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;
859
860 my @zfscmd = ('zfs', 'destroy');
861 my $snapshot = "$source->{all}\@$snap";
862
863 eval {
864 if($source->{ip} && $method eq 'ssh'){
865 run_cmd(['ssh', "$source_user\@$source->{ip}", '--', @zfscmd, $snapshot]);
866 } else {
867 run_cmd([@zfscmd, $snapshot]);
868 }
869 };
870 if (my $erro = $@) {
871 warn "WARN: $erro";
872 }
873 if ($dest) {
874 my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
875
876 my $path = "$dest->{all}";
877 $path .= "/$source->{last_part}" if $source->{last_part};
878
879 eval {
880 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
881 };
882 if (my $erro = $@) {
883 warn "WARN: $erro";
884 }
885 }
886 }
887
888 sub snapshot_exist {
889 my ($source , $dest, $method, $dest_user) = @_;
890
891 my $cmd = [];
892 push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
893 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
894
895 my $path = $dest->{all};
896 $path .= "/$source->{last_part}" if $source->{last_part};
897 $path .= "\@$source->{old_snap}";
898
899 push @$cmd, $path;
900
901
902 my $text = "";
903 eval {$text =run_cmd($cmd);};
904 if (my $erro =$@) {
905 warn "WARN: $erro";
906 return undef;
907 }
908
909 while ($text && $text =~ s/^(.*?)(\n|$)//) {
910 my $line =$1;
911 return 1 if $line =~ m/^.*$source->{old_snap}$/;
912 }
913 }
914
915 sub send_image {
916 my ($source, $dest, $param) = @_;
917
918 my $cmd = [];
919
920 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
921 push @$cmd, 'zfs', 'send';
922 push @$cmd, '-v' if $param->{verbose};
923
924 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) {
925 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
926 }
927 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
928
929 if ($param->{limit}){
930 my $bwl = $param->{limit}*1024;
931 push @$cmd, \'|', 'cstream', '-t', $bwl;
932 }
933 my $target = "$dest->{all}";
934 $target .= "/$source->{last_part}" if $source->{last_part};
935 $target =~ s!/+!/!g;
936
937 push @$cmd, \'|';
938 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
939 push @$cmd, 'zfs', 'recv', '-F', '--';
940 push @$cmd, "$target";
941
942 eval {
943 run_cmd($cmd)
944 };
945
946 if (my $erro = $@) {
947 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
948 die $erro;
949 };
950 }
951
952
953 sub send_config{
954 my ($source, $dest, $method, $source_user, $dest_user) = @_;
955
956 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
957 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
958
959 my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
960
961 $dest_target_new = $config_dir.'/'.$dest_target_new;
962
963 if ($method eq 'ssh'){
964 if ($dest->{ip} && $source->{ip}) {
965 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
966 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
967 } elsif ($dest->{ip}) {
968 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
969 run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
970 } elsif ($source->{ip}) {
971 run_cmd(['mkdir', '-p', '--', $config_dir]);
972 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]);
973 }
974
975 if ($source->{destroy}){
976 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
977 if($dest->{ip}){
978 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
979 } else {
980 run_cmd(['rm', '-f', '--', $dest_target_old]);
981 }
982 }
983 } elsif ($method eq 'local') {
984 run_cmd(['mkdir', '-p', '--', $config_dir]);
985 run_cmd(['cp', $source_target, $dest_target_new]);
986 }
987 }
988
989 sub get_date {
990 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
991 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
992
993 return $datestamp;
994 }
995
996 sub status {
997 my $cfg = read_cron();
998
999 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
1000
1001 my $states = read_state();
1002
1003 foreach my $source (sort keys%{$cfg}) {
1004 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
1005 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
1006 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
1007 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
1008 }
1009 }
1010
1011 return $status_list;
1012 }
1013
1014 sub enable_job {
1015 my ($param) = @_;
1016
1017 my $job = get_job($param);
1018 $job->{state} = "ok";
1019 update_state($job);
1020 update_cron($job);
1021 }
1022
1023 sub disable_job {
1024 my ($param) = @_;
1025
1026 my $job = get_job($param);
1027 $job->{state} = "stopped";
1028 update_state($job);
1029 update_cron($job);
1030 }
1031
1032 my $cmd_help = {
1033 destroy => qq{
1034 $PROGNAME destroy -source <string> [OPTIONS]
1035
1036 remove a sync Job from the scheduler
1037
1038 -name string
1039
1040 name of the sync job, if not set it is default
1041
1042 -source string
1043
1044 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1045 },
1046 create => qq{
1047 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1048
1049 Create a sync Job
1050
1051 -dest string
1052
1053 the destination target is like [IP]:<Pool>[/Path]
1054
1055 -dest-user string
1056
1057 name of the user on the destination target, root by default
1058
1059 -limit integer
1060
1061 max sync speed in kBytes/s, default unlimited
1062
1063 -maxsnap string
1064
1065 how much snapshots will be kept before get erased, default 1
1066
1067 -name string
1068
1069 name of the sync job, if not set it is default
1070
1071 -skip boolean
1072
1073 if this flag is set it will skip the first sync
1074
1075 -source string
1076
1077 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1078
1079 -source-user string
1080
1081 name of the user on the source target, root by default
1082 },
1083 sync => qq{
1084 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1085
1086 will sync one time
1087
1088 -dest string
1089
1090 the destination target is like [IP:]<Pool>[/Path]
1091
1092 -dest-user string
1093
1094 name of the user on the destination target, root by default
1095
1096 -limit integer
1097
1098 max sync speed in kBytes/s, default unlimited
1099
1100 -maxsnap integer
1101
1102 how much snapshots will be kept before get erased, default 1
1103
1104 -name string
1105
1106 name of the sync job, if not set it is default.
1107 It is only necessary if scheduler allready contains this source.
1108
1109 -source string
1110
1111 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1112
1113 -source-user string
1114
1115 name of the user on the source target, root by default
1116
1117 -verbose boolean
1118
1119 print out the sync progress.
1120 },
1121 list => qq{
1122 $PROGNAME list
1123
1124 Get a List of all scheduled Sync Jobs
1125 },
1126 status => qq{
1127 $PROGNAME status
1128
1129 Get the status of all scheduled Sync Jobs
1130 },
1131 help => qq{
1132 $PROGNAME help <cmd> [OPTIONS]
1133
1134 Get help about specified command.
1135
1136 <cmd> string
1137
1138 Command name
1139
1140 -verbose boolean
1141
1142 Verbose output format.
1143 },
1144 enable => qq{
1145 $PROGNAME enable -source <string> [OPTIONS]
1146
1147 enable a syncjob and reset error
1148
1149 -name string
1150
1151 name of the sync job, if not set it is default
1152
1153 -source string
1154
1155 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1156 },
1157 disable => qq{
1158 $PROGNAME disable -source <string> [OPTIONS]
1159
1160 pause a sync job
1161
1162 -name string
1163
1164 name of the sync job, if not set it is default
1165
1166 -source string
1167
1168 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1169 },
1170 printpod => 'internal command',
1171
1172 };
1173
1174 if (!$command) {
1175 usage(); die "\n";
1176 } elsif (!$cmd_help->{$command}) {
1177 print "ERROR: unknown command '$command'";
1178 usage(1); die "\n";
1179 }
1180
1181 my @arg = @ARGV;
1182 my $param = parse_argv(@arg);
1183
1184 sub check_params {
1185 for (@_) {
1186 die "$cmd_help->{$command}\n" if !$param->{$_};
1187 }
1188 }
1189
1190 if ($command eq 'destroy') {
1191 check_params(qw(source));
1192
1193 check_target($param->{source});
1194 destroy_job($param);
1195
1196 } elsif ($command eq 'sync') {
1197 check_params(qw(source dest));
1198
1199 check_target($param->{source});
1200 check_target($param->{dest});
1201 sync($param);
1202
1203 } elsif ($command eq 'create') {
1204 check_params(qw(source dest));
1205
1206 check_target($param->{source});
1207 check_target($param->{dest});
1208 init($param);
1209
1210 } elsif ($command eq 'status') {
1211 print status();
1212
1213 } elsif ($command eq 'list') {
1214 print list();
1215
1216 } elsif ($command eq 'help') {
1217 my $help_command = $ARGV[1];
1218
1219 if ($help_command && $cmd_help->{$help_command}) {
1220 die "$cmd_help->{$help_command}\n";
1221
1222 }
1223 if ($param->{verbose}) {
1224 exec("man $PROGNAME");
1225
1226 } else {
1227 usage(1);
1228
1229 }
1230
1231 } elsif ($command eq 'enable') {
1232 check_params(qw(source));
1233
1234 check_target($param->{source});
1235 enable_job($param);
1236
1237 } elsif ($command eq 'disable') {
1238 check_params(qw(source));
1239
1240 check_target($param->{source});
1241 disable_job($param);
1242
1243 } elsif ($command eq 'printpod') {
1244 print_pod();
1245 }
1246
1247 sub usage {
1248 my ($help) = @_;
1249
1250 print("ERROR:\tno command specified\n") if !$help;
1251 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1252 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1253 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1254 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1255 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1256 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1257 print("\t$PROGNAME list\n");
1258 print("\t$PROGNAME status\n");
1259 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1260 }
1261
1262 sub check_target {
1263 my ($target) = @_;
1264 parse_target($target);
1265 }
1266
1267 sub print_pod {
1268
1269 my $synopsis = join("\n", sort values %$cmd_help);
1270
1271 print <<EOF;
1272 =head1 NAME
1273
1274 pve-zsync - PVE ZFS Replication Manager
1275
1276 =head1 SYNOPSIS
1277
1278 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1279
1280 $synopsis
1281
1282 =head1 DESCRIPTION
1283
1284 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1285 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1286 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1287 To config cron see man crontab.
1288
1289 =head2 PVE ZFS Storage sync Tool
1290
1291 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1292
1293 =head1 EXAMPLES
1294
1295 add sync job from local VM to remote ZFS Server
1296 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1297
1298 =head1 IMPORTANT FILES
1299
1300 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1301
1302 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1303
1304 =head1 COPYRIGHT AND DISCLAIMER
1305
1306 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1307
1308 This program is free software: you can redistribute it and/or modify it
1309 under the terms of the GNU Affero General Public License as published
1310 by the Free Software Foundation, either version 3 of the License, or
1311 (at your option) any later version.
1312
1313 This program is distributed in the hope that it will be useful, but
1314 WITHOUT ANY WARRANTY; without even the implied warranty of
1315 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1316 Affero General Public License for more details.
1317
1318 You should have received a copy of the GNU Affero General Public
1319 License along with this program. If not, see
1320 <http://www.gnu.org/licenses/>.
1321
1322 EOF
1323 }