]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
introduce and use read_file helper
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program name and the fixed filesystem locations derived from it:
# state file, cron file, Proxmox VE guest config directories.
13 my $PROGNAME = "pve-zsync";
14 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
15 my $STATE = "${CONFIG_PATH}/sync_state";
16 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
17 my $PATH = "/usr/sbin";
18 my $PVE_DIR = "/etc/pve/local";
19 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
20 my $LXC_CONF = "${PVE_DIR}/lxc";
21 my $PROG_PATH = "$PATH/${PROGNAME}";
# Minute interval used for the cron schedule of newly created jobs (see format_job).
22 my $INTERVAL = 15;
23 my $DEBUG;
24
# Debug output is enabled at compile time, either by editing the default below
# or by setting the ZSYNC_DEBUG environment variable; Data::Dumper is only
# loaded when debugging is on.
25 BEGIN {
26 $DEBUG = 0; # change default here. not above on declaration!
27 $DEBUG ||= $ENV{ZSYNC_DEBUG};
28 if ($DEBUG) {
29 require Data::Dumper;
30 Data::Dumper->import();
31 }
32 }
33
# Building blocks (as regex source strings) for matching IPv4/IPv6 addresses.
34 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
35 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
36 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
37 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
38
# One alternative per possible position of the '::' abbreviation.
39 my $IPV6RE = "(?:" .
40 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
41 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
42 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
43 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
44 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
45 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
46 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
47 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
48 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
49
50 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
51 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
52 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
53 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Capture 1 is the optional host, capture 2 the VMID or dataset (used by parse_target).
54 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
# Matches the key prefix of a guest config line that describes a disk or mount point.
56 my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
58 my $command = $ARGV[0];
59
# Every command except 'help' and 'printpod' needs the external tools below;
# check_bin dies if one of them cannot be found in $ENV{PATH}.
60 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
61 check_bin ('cstream');
62 check_bin ('zfs');
63 check_bin ('ssh');
64 check_bin ('scp');
65 }
66
# Turn termination signals into a die so that the eval-based cleanup in the
# sync code gets a chance to run.
# NOTE(review): SIGKILL cannot be caught, so the KILL entry presumably has no effect - confirm.
67 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
68 die "Signaled, aborting sync: $!\n";
69 };
70
# Look up an executable in the directories listed in $ENV{PATH}.
#
# $bin - bare command name, e.g. 'zfs'
#
# Returns the full path of the first entry that is executable (in PATH order).
# Dies with "unable to find command '<bin>'" if no directory contains one.
sub check_bin {
    my ($bin) = @_;

    my @found = grep { -x $_ } map { "$_/$bin" } split(/:/, $ENV{PATH});
    return $found[0] if @found;

    die "unable to find command '$bin'\n";
}
83
# Read a file, either just its first line or all of it.
#
# $filename      - path of the file to read
# $one_line_only - true: return the first line as a string (undef if the file is empty)
#                  false: return an array reference holding every line
#
# Lines keep their trailing newline. Dies if the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
        $text = <$fh>;
    } else {
        my @lines = <$fh>;
        $text = \@lines;
    }

    close($fh);

    return $text;
}
96
# Shorten a path-like string for column display.
#
# $path   - string to shorten (runs of '/' are first collapsed to a single '/')
# $maxlen - target width
#
# Returns $path unchanged when it already fits. Otherwise parts of it are
# replaced by '..', preferring to keep the last path component.
97 sub cut_target_width {
98 my ($path, $maxlen) = @_;
99 $path =~ s@/+@/@g;
100
101 return $path if length($path) <= $maxlen;
102
# No '/' at all: keep only the last ($maxlen - 2) characters behind a '..' marker.
103 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
104
# Split off the last component (with an optional trailing '/') as the tail.
105 $path =~ s@/([^/]+/?)$@@;
106 my $tail = $1;
107
# The tail alone (nearly) fills the width: show only the tail, cut from the left if needed.
108 if (length($tail)+3 == $maxlen) {
109 return "../$tail";
110 } elsif (length($tail)+2 >= $maxlen) {
111 return '..'.substr($tail, -$maxlen+2)
112 }
113
# Remove the first component that starts with '/' and use it as the head.
# NOTE(review): if this substitution does not match, $1 still holds the value
# from the previous match (the tail) - confirm whether that is intended.
114 $path =~ s@(/[^/]+)(?:/|$)@@;
115 my $head = $1;
116 my $both = length($head) + length($tail);
117 my $remaining = $maxlen-$both-4; # -4 for "/../"
118
119 if ($remaining < 0) {
120 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
121 }
122
# 4-argument substr: replace the middle of what is left of $path with '..'.
123 substr($path, ($remaining/2), (length($path)-$remaining), '..');
124 return "$head/" . $path . "/$tail";
125 }
126
# Run a code reference while holding an exclusive flock on a lock file.
#
# $lock_fn - path of the lock file (created or truncated)
# $code    - code reference to execute; called without arguments in scalar context
#
# Returns whatever $code returned. Dies if the lock file cannot be opened or
# locked. An exception thrown by $code is re-thrown after the lock has been
# released and the handle closed.
sub locked {
    my ($lock_fn, $code) = @_;

    # Explicit mode argument instead of a "> file" string; fail with a clear
    # message rather than calling flock() on an undefined handle.
    my $lock_fh = IO::File->new($lock_fn, "w")
        or die "Could not open lock file ${lock_fn}: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
142
# Return the status structure if it holds a true 'status' entry for the job.
#
# $source - hash reference; its 'all' field is the first-level key
# $name   - job name, the second-level key
# $status - nested hash reference {source}{name}{status}
#
# Returns $status itself when the entry is true, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    my $entry = $status->{$source->{all}}->{$name}->{status};

    return $entry ? $status : undef;
}
152
# Check whether a ZFS dataset exists by running 'zfs list' on it.
#
# $target - parsed target; 'all' is the dataset, 'ip' (optional) the remote host
# $user   - ssh user for a remote target
#
# Returns 1 if the command succeeds, 0 if it fails.
sub check_pool_exists {
    my ($target, $user) = @_;

    # run through ssh when the target lives on another host
    my @remote = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $found = eval {
        run_cmd([@remote, 'zfs', 'list', '-H', '--', $target->{all}]);
        1;
    };

    return $found ? 1 : 0;
}
171
# Split a target specification into its parts.
#
# $text - '[host:]<VMID>' or '[host:]<pool>[/path]'
#
# Returns a hash reference with:
#   all       - everything after the optional 'host:'
#   ip        - host part with surrounding brackets removed (only if given)
#   vmid      - set instead of 'pool' when the first component is all digits
#   pool      - first path component otherwise
#   last_part - last path component (only if there is more than one component)
#   path      - components in between, joined by '/'; one additional trailing
#               component is dropped when a host was given
# Dies with a usage hint when $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };
    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $all);
    my $first = shift(@parts);
    die "$errstr\n" if !$first;

    if ($first =~ m/^\d+$/) {
        $target->{vmid} = $first;
    } else {
        $target->{pool} = $first;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);
    pop(@parts) if $target->{ip};
    $target->{path} = join('/', @parts) if @parts > 0;

    return $target;
}
207
# Load the job configuration from the cron file.
#
# Returns the hash reference built by encode_cron from the file's lines.
# When the cron file does not exist yet it is created empty and undef is
# returned; dies if it cannot be created.
sub read_cron {

    if (-e $CRONJOBS) {
        my $lines = read_file($CRONJOBS, 0);
        return encode_cron(@{$lines});
    }

    # first use: initialize an empty cron file
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
222
# Parse command line style arguments into a parameter hash.
#
# @arg - argument list (non-option words such as the command name are ignored)
#
# Returns a hash reference containing every known key (undef when the option
# was not given), with defaults applied for name, maxsnap, method,
# source_user and dest_user. Dies with "can't parse options" when
# GetOptionsFromArray reports failure.
sub parse_argv {
    my (@arg) = @_;

    # option specification followed by the key it is stored under
    my @known = (
        'dest=s'             => 'dest',
        'source=s'           => 'source',
        'verbose'            => 'verbose',
        'limit=i'            => 'limit',
        'maxsnap=i'          => 'maxsnap',
        'name=s'             => 'name',
        'skip'               => 'skip',
        'method=s'           => 'method',
        'source-user=s'      => 'source_user',
        'dest-user=s'        => 'dest_user',
        'properties'         => 'properties',
        'dest-config-path=s' => 'dest_config_path',
    );

    my $param = {};
    my @spec;
    for (my $i = 0; $i < @known; $i += 2) {
        my ($option, $key) = @known[$i, $i + 1];
        $param->{$key} = undef;
        push @spec, $option, \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @spec);

    die "can't parse options\n" if $ret == 0;

    my %default_for = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    for my $key (keys %default_for) {
        $param->{$key} //= $default_for{$key};
    }

    return $param;
}
267
# Copy the stored runtime state of a job into the job hash.
#
# $job - job hash reference with 'source' and 'name' set
#
# Copies state, lsync and vm_type, plus the consecutive snap0, snap1, ...
# entries up to the first missing or false one, from the state file.
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    for my $field (qw(state lsync vm_type)) {
        $job->{$field} = $state->{$field};
    }

    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
284
# Build the job configuration from the lines of the cron file.
#
# @text - lines of the cron file
#
# Each line is split on whitespace and run through parse_argv; lines that
# yield both a source and a dest become $cfg->{source}->{name} = parameters.
# Processing stops at the first line that is false in boolean context.
# Returns the configuration hash reference.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    while (@text) {
        my $line = shift(@text);
        last if !$line;

        my $param = parse_argv(split('\s', $line));

        # lines without both options are not job definitions
        next if !($param->{source} && $param->{dest});

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
305
# Convert parsed command line parameters into a job description.
#
# $param - hash reference as returned by parse_argv
#
# Returns a job hash reference. The method is 'local' when neither source nor
# destination names a host and 'ssh' otherwise. Dies (via parse_target) when
# source or dest is not a valid target.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # Explicit conditional instead of 'my $dest = ... if ...': declaring a
    # lexical under a statement modifier has undefined behaviour (perlsyn).
    my $dest = $param->{dest} ? parse_target($param->{dest}) : undef;

    my $has_remote = ($dest && $dest->{ip}) || $source->{ip};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = $has_remote ? "ssh" : "local";
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
328
# Load the job states from the state file.
#
# Returns the decoded JSON structure from the first line of the file. When
# the state file does not exist yet, the config directory and a file
# containing '{}' are created and undef is returned.
sub read_state {

    if (-e $STATE) {
        my $json = read_file($STATE, 1);
        return decode_json($json);
    }

    # first use: initialize an empty state file
    make_path($CONFIG_PATH);
    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
343
# Store (or remove) the state of one job in the state file.
#
# $job - job hash reference; 'source' and 'name' identify the entry. A 'state'
#        of "del" removes the entry; any other value stores state, lsync,
#        vm_type and the consecutive snapN fields.
#
# The new content is written to "<state file>.new" and renamed over the state
# file. Returns the complete states hash reference. Dies when the temporary
# file cannot be opened, written, closed or renamed.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file simply means "no states yet"
    my $text = eval { read_file($STATE, 1); };

    my $states = {};
    my $state = {};
    if ($text) {
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}} // {};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Open the temporary file only once the new content is known, so a
    # decode error does not leave an empty .new file behind.
    my $out_fh = IO::File->new("$STATE.new", "w");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    # Check every step: a silently truncated state file would lose all job states.
    die "can't write to $STATE.new\n" if !print($out_fh $text);
    close($out_fh) or die "can't write to $STATE.new: $!\n";

    die "can't move $STATE.new: $!\n" if !rename "$STATE.new", $STATE;

    return $states;
}
382
# Add, replace or remove the cron entry of one job.
#
# $job - job hash reference; 'source' and 'name' identify the cron line.
#        A 'state' of "del" removes the line; otherwise the line is rewritten
#        by format_job, or appended when the job has no line yet.
#
# A SHELL/PATH header is prepended when none is found within the first three
# lines. The result is written to "<cron file>.new" and renamed over the cron
# file. Dies when the cron file cannot be read or the new file cannot be
# written or moved.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    # Source and name are literal text: quote them so that '.', '[' or ']'
    # (hosts, bracketed IPv6 addresses) are not interpreted as regex syntax.
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    foreach my $line (@{$current}) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }

    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    # buffered write errors only show up on close
    close($new_fh) or die "can't write to $CRONJOBS.new: $!\n";

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
}
425
# Render a job as one line for the cron file.
#
# $job  - job hash reference (source, dest, name, maxsnap, method, users, ...)
# $line - optional existing cron line of the job; its five schedule fields
#         are kept. Without it the default "*/$INTERVAL * * * *" is used.
#
# A job in state "stopped" is written commented out with a leading '#'.
# Returns the line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    my $schedule = "*/$INTERVAL * * * *";
    if ($line) {
        # Only trust the capture when the pattern really matched; otherwise
        # $1 would be undefined or left over from an unrelated match.
        if ($line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
            $schedule = $1;
        } else {
            warn "could not parse schedule of existing cron entry, using default: $line\n";
        }
    }
    $text .= $schedule;

    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
453
# Produce the table printed by the 'list' command.
#
# Returns a string with a header row and one row per configured job (sorted
# by source, then name) showing source, name, state, last sync time, guest
# type and transport method.
sub list {

    my $cfg = read_cron();

    my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    for my $source (sort keys %{$cfg}) {
        for my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name} // {};
            my $vm_type = defined($state->{vm_type}) ? $state->{vm_type} : "undef";

            $list .= sprintf("%-25s", cut_target_width($source, 25))
                . sprintf("%-25s", cut_target_width($name, 25))
                . sprintf("%-10s", $state->{state})
                . sprintf("%-20s", $state->{lsync})
                . sprintf("%-6s", $vm_type)
                . sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
        }
    }

    return $list;
}
474
# Determine whether a guest exists and of which type it is.
#
# $target - parsed target; 'vmid' is required, 'ip' selects a remote check
# $user   - ssh user for the remote check
#
# Returns "qemu" or "lxc" depending on which config file is found (qemu is
# checked first), or undef when the target has no vmid or no config exists.
# Remote targets are probed with '/bin/ls' over ssh, local ones with -f.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    my $config_present;
    if ($target->{ip}) {
        my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
        $config_present = sub { eval { run_cmd([@cmd, $_[0]]) } };
    } else {
        $config_present = sub { -f $_[0] };
    }

    for my $candidate (["qemu", "$QEMU_CONF/$conf_fn"], ["lxc", "$LXC_CONF/$conf_fn"]) {
        my ($type, $file) = @{$candidate};
        return $type if $config_present->($file);
    }

    return undef;
}
493
# Create a new sync job ('create' command).
#
# $param - hash reference as returned by parse_argv
#
# Under the cron/state lock: distributes the ssh key to remote hosts with
# ssh-copy-id, verifies that the destination pool and the source pool or
# guest exist and that the job is not configured yet, then writes the cron
# entry and the initial state. Unless 'skip' is set, a first sync is run
# afterwards; if that sync fails the job is removed again and the error is
# printed.
sub init {
    my ($param) = @_;

    my $register_job = sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);

        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # install the public key on every remote side, destination first
        for my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
            my ($target, $user) = @{$peer};
            next if !$target->{ip};
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$target->{ip}"]);
        }

        die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});

        if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
            die "Pool $source->{all} does not exists\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        # dies when the guest has no disk on zfs
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $register_job);

    return if $param->{skip};

    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
545
# Look up a configured job and enrich it with its stored state.
#
# $param - hash reference with 'source' and 'name'
#
# Returns the job hash reference from the cron configuration with name,
# source and the state fields filled in. Dies when no such job is configured.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = $cfg->{$param->{source}}->{$param->{name}}
        or die "Job with source $param->{source} and name $param->{name} does not exist\n";

    $job->{name} = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}
561
# Remove a job from the cron file and the state file ('destroy' command).
#
# $param - hash reference with 'source' and 'name' of the job
#
# Runs under the cron/state lock; dies (via get_job) when the job is unknown.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del";    # marker understood by update_cron/update_state

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
573
# Run one synchronization ('sync' command, also used for the first sync of 'create').
#
# $param - hash reference as returned by parse_argv (source, dest, name, ...)
#
# If a job for source/name is configured, its state is tracked in the state
# file: "waiting" while queued for the sync lock, "syncing" while running,
# then "ok" (or unchanged "stopped") on success or "error" on failure.
# Dies when such a job is already waiting/syncing or has been stopped, and
# re-throws any error raised during the transfer.
574 sub sync {
575 my ($param) = @_;
576
577 my $job;
578
# Phase 1: under the cron/state lock, mark an existing job as queued.
# A missing job is not an error here (get_job's die is swallowed).
579 locked("$CONFIG_PATH/cron_and_state.lock", sub {
580 eval { $job = get_job($param) };
581
582 if ($job) {
583 if (defined($job->{state}) && ($job->{state} eq "syncing" || $job->{state} eq "waiting")) {
584 die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
585 }
586
587 $job->{state} = "waiting";
588 update_state($job);
589 }
590 });
591
# Phase 2: everything below runs while holding the sync lock.
592 locked("$CONFIG_PATH/sync.lock", sub {
593
# The timestamp is taken once and used for the snapshot name and for lsync.
594 my $date = get_date();
595
596 my $dest;
597 my $source;
598 my $vm_type;
599
600 locked("$CONFIG_PATH/cron_and_state.lock", sub {
601 #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
602 eval { $job = get_job($param); };
603
604 if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
605 die "Job --source $param->{source} --name $param->{name} has been disabled\n";
606 }
607
608 $dest = parse_target($param->{dest});
609 $source = parse_target($param->{source});
610
611 $vm_type = vm_exists($source, $param->{source_user});
612 $source->{vm_type} = $vm_type;
613
614 if ($job) {
615 $job->{state} = "syncing";
616 $job->{vm_type} = $vm_type if !$job->{vm_type};
617 update_state($job);
618 }
619 }); #cron and state lock
620
# Sync a single dataset: look up existing snapshots on the destination, take
# a new snapshot, send it, and destroy the oldest snapshot when snapshot_get
# flagged that the maxsnap limit has been reached.
621 my $sync_path = sub {
622 my ($source, $dest, $job, $param, $date) = @_;
623
624 ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});
625
626 snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
627
628 send_image($source, $dest, $param);
629
630 snapshot_destroy($source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $dest->{old_snap});
631
632 };
633
634 eval{
635 if ($source->{vmid}) {
636 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
637 die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
638 my $disks = get_disks($source, $param->{source_user});
639
# Guest sync: $source is re-pointed at each disk in turn, then the guest
# config file is copied to the destination.
640 foreach my $disk (sort keys %{$disks}) {
641 $source->{all} = $disks->{$disk}->{all};
642 $source->{pool} = $disks->{$disk}->{pool};
643 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
644 $source->{last_part} = $disks->{$disk}->{last_part};
645 &$sync_path($source, $dest, $job, $param, $date);
646 }
647 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
648 send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
649 } else {
650 send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
651 }
652 } else {
653 &$sync_path($source, $dest, $job, $param, $date);
654 }
655 };
# Failure: record "error" for a configured job, then re-throw.
656 if (my $err = $@) {
657 locked("$CONFIG_PATH/cron_and_state.lock", sub {
658 eval { $job = get_job($param); };
659 if ($job) {
660 $job->{state} = "error";
661 update_state($job);
662 }
663 });
664 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
665 die "$err\n";
666 }
667
# Success: re-read the job so that a "stopped" set during the sync is kept.
668 locked("$CONFIG_PATH/cron_and_state.lock", sub {
669 eval { $job = get_job($param); };
670 if ($job) {
671 if (defined($job->{state}) && $job->{state} eq "stopped") {
672 $job->{state} = "stopped";
673 } else {
674 $job->{state} = "ok";
675 }
676 $job->{lsync} = $date;
677 update_state($job);
678 }
679 });
680 }); #sync lock
681 }
682
# Inspect the snapshots that already exist on the destination dataset.
#
# $source    - parsed source; 'last_part' is appended to the destination path,
#              and 'destroy' is set to 1 when $max_snap job snapshots exist
# $dest      - parsed destination ('all', optional 'ip')
# $max_snap  - number of job snapshots to keep
# $name      - job name, part of the snapshot name rep_<name>_<timestamp>
# $dest_user - ssh user for a remote destination
#
# Snapshots are listed sorted by creation (zfs list -S creation). Returns
# ($old_snap, $last_snap): $last_snap is the name of the first snapshot
# listed, $old_snap the job snapshot that was examined last (the $max_snap-th
# one when that many exist). Returns undef when the listing fails or contains
# no snapshot.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my @cmd = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    push @cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};

    my $raw = eval { run_cmd([@cmd, $path]) };
    # a failing listing means the volume doesn't exist on dest yet
    return undef if $@;

    my ($old_snap, $last_snap);
    my $seen = 0;

    # consume the output line by line
    while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        if (!$last_snap && $line =~ m/@(.*)$/) {
            $last_snap = $1;
        }

        if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
            $old_snap = $1;
            $seen++;
            if ($seen == $max_snap) {
                $source->{destroy} = 1;
                last;
            }
        }
    }

    return $last_snap ? ($old_snap, $last_snap) : undef;
}
724
# Create the snapshot rep_<name>_<date> on the source dataset.
#
# $source - parsed source ('all', optional 'ip'); 'new_snap' is set to the
#           snapshot name
# $dest, $source_user, $dest_user - only used for cleanup and remote access
# $name, $date - job name and timestamp forming the snapshot name
#
# If creating the snapshot fails, snapshot_destroy is called for that name
# and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_" . $date;

    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval {
        run_cmd([@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"]);
    };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
746
# Fetch the configuration of a guest and extract its disks.
#
# $target - parsed target with 'vmid', 'vm_type' ("qemu" or "lxc") and an
#           optional 'ip' for remote guests
# $user   - ssh user for remote access
#
# Runs 'qm config' or 'pct config' and hands the output to parse_disks,
# whose result is returned. Dies with "VM Type unknown" for any other type.
sub get_disks {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $vm_type = $target->{vm_type};
    my $tool = $vm_type eq 'qemu' ? 'qm'
             : $vm_type eq 'lxc'  ? 'pct'
             :                      die "VM Type unknown\n";

    my $res = run_cmd([@ssh, $tool, 'config', $target->{vmid}]);

    return parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
}
767
# Execute a command through the shell and return its output.
#
# $cmd - a command string, or an array reference. Array elements are shell
#        quoted, except scalar references, which are inserted verbatim (used
#        for shell operators such as \'|').
#
# Returns the combined stdout/stderr with the trailing newline removed.
# Dies with the command and its output when the exit status is non-zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = qx{$cmd 2>&1};

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
784
# Extract the ZFS backed disks from a guest configuration.
#
# $text    - output of 'qm config' / 'pct config'
# $ip      - remote host the guest lives on (false for local)
# $vm_type - "qemu" or "lxc"
# $user    - ssh user for remote access
#
# Skipped lines: cdrom media, lines that are not disk keys, qemu disks with
# backup disabled, and lxc mount points (mpN) without backup enabled.
# For every remaining disk 'pvesm path' is queried to find the dataset.
#
# Returns a hash reference keyed 0, 1, 2, ... with pool, path (optional),
# last_part and all for each disk. Dies when a path has an unexpected format
# or no disk on ZFS is found.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        # QEMU: disks are included unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: mount points are only included when backup is explicitly
        # enabled. \d+ so that mp10 and above are covered, too.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if ($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/, $1);

            # the first option of the form [file=|volume=]<storage>:<volume>
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/) {
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            # /dev/zvol/<pool>[/<path>]/<disk>
            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
864
# Destroy a snapshot on the source and, when $dest is given, on the destination.
#
# $source  - parsed source ('all', optional 'ip', optional 'last_part')
# $dest    - parsed destination, or undef to leave the destination alone
# $method  - the source command only goes through ssh when this is 'ssh'
#            and the source has an ip
# $snap    - snapshot name (without the dataset)
# $source_user, $dest_user - ssh users
#
# Failures are reported as warnings and do not abort.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval {
        run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        my $path = "$dest->{all}";
        $path .= "/$source->{last_part}" if $source->{last_part};

        eval {
            run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]);
        };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
895
# Check whether the snapshot needed for an incremental sync still exists on
# the source side.
#
# $source      - parsed source ('all', optional 'ip')
# $dest        - parsed destination; 'last_snap' is the snapshot looked for
# $method      - unused
# $source_user - ssh user for a remote source
#
# Returns 1 when 'zfs list' finds the snapshot; otherwise warns and returns undef.
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $snapshot = $source->{all} . "\@$dest->{last_snap}";

    my $listed = eval {
        run_cmd([@ssh, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name', $snapshot]);
        1;
    };
    return 1 if $listed;

    warn "WARN: $@";
    return undef;
}
916
# Transfer the new snapshot from source to destination with zfs send/recv.
#
# $source - parsed source ('all', 'new_snap', optional 'ip' and 'last_part')
# $dest   - parsed destination ('all', optional 'ip' and 'last_snap')
# $param  - parameters: source_user, dest_user, properties, verbose, limit, method
#
# An incremental stream is sent when the destination's last snapshot also
# exists on the source. With 'limit' set the stream is piped through cstream.
# On failure the new source snapshot is destroyed and the error re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @ssh = ('ssh', '-o', 'BatchMode=yes');

    # sending side
    my @send = $source->{ip} ? (@ssh, "$param->{source_user}\@$source->{ip}", '--') : ();
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
        push @send, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit; scalar references are passed to the shell unquoted
    my @throttle = $param->{limit} ? (\'|', 'cstream', '-t', $param->{limit}*1024) : ();

    # receiving side
    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    my @recv = (\'|');
    push @recv, @ssh, "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd([@send, @throttle, @recv])
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
954
955
# Copy the guest configuration file to the destination.
#
# $source  - parsed source with 'vmid', 'vm_type', 'new_snap', optional 'ip'
#            and 'destroy'
# $dest    - parsed destination with optional 'ip', 'last_part', 'old_snap'
# $method  - 'ssh' or 'local'
# $source_user, $dest_user - ssh users
# $dest_config_path - directory for the copy; defaults to $CONFIG_PATH
#
# The copy is named <vmid>.conf.<vm_type>.<new_snap> and placed in the config
# directory (plus the destination's last path component, if any). With
# method 'ssh' the config belonging to the old snapshot is removed when the
# source is flagged 'destroy'.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh'){
        # commands touching the destination run remotely when it has an ip
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            my $from = $source->{ip} ? "$source_user\@[$source->{ip}]:$source_target" : $source_target;
            my $to = $dest->{ip} ? "$dest_user\@[$dest->{ip}]:$dest_target_new" : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$dest->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
992
# Return the current local time as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # localtime fields 5..0 are year, month, day, hour, minute, second
    my @stamp = (localtime(time))[5, 4, 3, 2, 1, 0];
    $stamp[0] += 1900;    # years are counted from 1900
    $stamp[1] += 1;       # months are counted from 0

    return sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", @stamp);
}
999
# Produce the table printed by the 'status' command.
#
# Returns a string with a header row and one row per configured job (sorted
# by source, then name) showing source, name and the stored state.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    for my $source (sort keys %{$cfg}) {
        for my $sync_name (sort keys %{$cfg->{$source}}) {
            $status_list .= sprintf("%-25s", cut_target_width($source, 25))
                . sprintf("%-25s", cut_target_width($sync_name, 25))
                . "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1017
# Enable a job again ('enable' command): set its state to "ok" and rewrite
# its cron entry.
#
# $param - hash reference with 'source' and 'name' of the job
#
# Runs under the cron/state lock; dies (via get_job) when the job is unknown.
sub enable_job {
    my ($param) = @_;

    my $activate = sub {
        my $job = get_job($param);
        $job->{state} = "ok";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $activate);
}
1028
# Pause a job ('disable' command): set its state to "stopped" and rewrite its
# cron entry, which format_job then writes commented out.
#
# $param - hash reference with 'source' and 'name' of the job
#
# Runs under the cron/state lock; dies (via get_job) when the job is unknown.
sub disable_job {
    my ($param) = @_;

    my $pause = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $pause);
}
1039
# Help text for every command, keyed by command name. The keys double as the
# list of known commands: a command without an entry here is rejected as
# unknown before dispatch, and check_params prints the entry of the current
# command when a required option is missing.
1040 my $cmd_help = {
1041 destroy => qq{
1042 $PROGNAME destroy -source <string> [OPTIONS]
1043
1044 remove a sync Job from the scheduler
1045
1046 -name string
1047
1048 name of the sync job, if not set it is default
1049
1050 -source string
1051
1052 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1053 },
1054 create => qq{
1055 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1056
1057 Create a sync Job
1058
1059 -dest string
1060
1061 the destination target is like [IP]:<Pool>[/Path]
1062
1063 -dest-user string
1064
1065 name of the user on the destination target, root by default
1066
1067 -limit integer
1068
1069 max sync speed in kBytes/s, default unlimited
1070
1071 -maxsnap string
1072
1073 how much snapshots will be kept before get erased, default 1
1074
1075 -name string
1076
1077 name of the sync job, if not set it is default
1078
1079 -skip boolean
1080
1081 if this flag is set it will skip the first sync
1082
1083 -source string
1084
1085 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1086
1087 -source-user string
1088
1089 name of the user on the source target, root by default
1090
1091 -properties boolean
1092
1093 Include the dataset's properties in the stream.
1094
1095 -dest-config-path string
1096
1097 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1098 },
1099 sync => qq{
1100 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1101
1102 will sync one time
1103
1104 -dest string
1105
1106 the destination target is like [IP:]<Pool>[/Path]
1107
1108 -dest-user string
1109
1110 name of the user on the destination target, root by default
1111
1112 -limit integer
1113
1114 max sync speed in kBytes/s, default unlimited
1115
1116 -maxsnap integer
1117
1118 how much snapshots will be kept before get erased, default 1
1119
1120 -name string
1121
1122 name of the sync job, if not set it is default.
1123 It is only necessary if scheduler allready contains this source.
1124
1125 -source string
1126
1127 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1128
1129 -source-user string
1130
1131 name of the user on the source target, root by default
1132
1133 -verbose boolean
1134
1135 print out the sync progress.
1136
1137 -properties boolean
1138
1139 Include the dataset's properties in the stream.
1140
1141 -dest-config-path string
1142
1143 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1144 },
1145 list => qq{
1146 $PROGNAME list
1147
1148 Get a List of all scheduled Sync Jobs
1149 },
1150 status => qq{
1151 $PROGNAME status
1152
1153 Get the status of all scheduled Sync Jobs
1154 },
1155 help => qq{
1156 $PROGNAME help <cmd> [OPTIONS]
1157
1158 Get help about specified command.
1159
1160 <cmd> string
1161
1162 Command name
1163
1164 -verbose boolean
1165
1166 Verbose output format.
1167 },
1168 enable => qq{
1169 $PROGNAME enable -source <string> [OPTIONS]
1170
1171 enable a syncjob and reset error
1172
1173 -name string
1174
1175 name of the sync job, if not set it is default
1176
1177 -source string
1178
1179 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1180 },
1181 disable => qq{
1182 $PROGNAME disable -source <string> [OPTIONS]
1183
1184 pause a sync job
1185
1186 -name string
1187
1188 name of the sync job, if not set it is default
1189
1190 -source string
1191
1192 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1193 },
# No user-facing help for this one; the entry only makes the command known.
1194 printpod => 'internal command',
1195
1196 };
1197
# Validate the requested sub-command before any options are parsed.
# A missing command, or one that has no entry in $cmd_help, prints the usage
# summary and then terminates through die "\n".
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # End the message with a newline so the "USAGE:" line printed by usage()
    # starts on its own line instead of being glued to the error text.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Options are parsed from a copy of @ARGV; the 'help' branch of the command
# dispatch still reads $ARGV[1] directly.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1207
# Verify that every option named in the argument list has a true value in
# the parsed parameters ($param). On the first one that does not, die with
# the help text of the current command.
sub check_params {
    my @required = @_;

    foreach my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1213
# Dispatch the validated sub-command.
# Commands that operate on a job first make sure their mandatory options are
# present (check_params) and that each target can be parsed (check_target)
# before the actual worker is called.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    # the word following 'help' on the command line, if any
    my $help_command = $ARGV[1];

    # a known command name: emit its help text via die and stop here
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # exec only returns when the program could not be started; report
        # that explicitly instead of silently falling through. The list form
        # never involves a shell. Perl's own exec warning is disabled here so
        # the failure is reported exactly once.
        no warnings 'exec';
        exec('man', $PROGNAME)
            or die "ERROR: unable to execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1270
# Print the short usage summary to STDOUT.
#
# $help - when false, the summary is preceded by an
#         "ERROR: no command specified" line.
sub usage {
    my ($help) = @_;

    my @summary = (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    if (!$help) {
        unshift(@summary, "ERROR:\tno command specified\n");
    }

    print(@summary);
}
1285
# Run a target string through parse_target and hand back whatever it returns.
# NOTE(review): presumably parse_target dies on a malformed target, which is
# what makes this a check - confirm against parse_target.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1290
# Print the POD source of the manual page to STDOUT.
# The SYNOPSIS section is assembled from all help texts stored in $cmd_help,
# sorted as strings and joined with a newline.
sub print_pod {

    my @help_texts = values %{$cmd_help};
    my $synopsis = join("\n", sort @help_texts);

    print(<<"EOF");
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}