]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
whitespace fix
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program name; the config directory, cron file and program path are derived from it.
my $PROGNAME = "pve-zsync";
# Directory holding the sync state file; the lock files are created here as well.
my $CONFIG_PATH = "/var/lib/${PROGNAME}";
# JSON file with the per-job runtime state (see read_state/update_state).
my $STATE = "${CONFIG_PATH}/sync_state";
# Cron file that holds one line per scheduled sync job.
my $CRONJOBS = "/etc/cron.d/$PROGNAME";
my $PATH = "/usr/sbin";
# Node-local Proxmox VE config directory and the guest config directories below it.
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = "${PVE_DIR}/qemu-server";
my $LXC_CONF = "${PVE_DIR}/lxc";
my $PROG_PATH = "$PATH/${PROGNAME}";
# Minute step used for the schedule of newly created cron entries ("*/INTERVAL * * * *").
my $INTERVAL = 15;
# Debug switch; its value is assigned in the BEGIN block below.
my $DEBUG;

# Runs at compile time so that Data::Dumper is only loaded (and Dumper imported)
# when debugging is enabled, either by the default below or via ZSYNC_DEBUG.
BEGIN {
    $DEBUG = 0; # change default here. not above on declaration!
    $DEBUG ||= $ENV{ZSYNC_DEBUG};
    if ($DEBUG) {
        require Data::Dumper;
        Data::Dumper->import();
    }
}
33
# Building blocks for address matching. They are plain strings (not qr//) so
# that they can be interpolated into the larger patterns below.
# One IPv4 octet: 0-255.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
# Dotted-quad IPv4 address.
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
# One IPv6 group: 1 to 4 hex digits.
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
# Last 32 bits of an IPv6 address: either an embedded IPv4 address or two hex groups.
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# IPv6 address. Each alternative covers one possible position of the "::"
# abbreviation (the first alternative is the full form without "::").
my $IPV6RE = "(?:" .
    "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
    "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
    "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Captures: $1 = host (optional), $2 = VMID or pool[/path], $3 = the '/path' part.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;

# Matches the key of a guest config line that describes a disk or mount point
# (e.g. "scsi0: " or "rootfs: "), including the ": " separator.
my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
# Identity of this process (pid, start time and boot id). It is written into
# the job state so that a later run can tell whether the process that claimed
# a job is still alive.
my $INSTANCE_ID = get_instance_id($$);

my $command = $ARGV[0];

# Every command except the purely informational ones needs these external tools.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin('cstream');
    check_bin('zfs');
    check_bin('ssh');
    check_bin('scp');
}

# Turn termination signals into an exception so that the eval blocks around a
# running sync can record the error state. SIGKILL is intentionally not listed:
# it cannot be caught, so installing a handler for it has no effect.
$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{INT} = sub {
    die "Signaled, aborting sync: $!\n";
};
72
# Look up an executable named $bin in the directories listed in $ENV{PATH}.
# Returns the full path of the first executable match; dies if there is none.
sub check_bin {
    my ($bin) = @_;

    my @candidates = map { "$_/$bin" } split(/:/, $ENV{PATH});
    my @executable = grep { -x $_ } @candidates;

    die "unable to find command '$bin'\n" if !@executable;

    # PATH order decides which one wins
    return $executable[0];
}
85
# Read $filename.
# With a true $one_line_only only the first line is returned (as a string),
# otherwise a reference to an array with all lines is returned.
# Dies if the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
        $text = <$fh>;
    } else {
        my @lines = <$fh>;
        $text = \@lines;
    }

    close($fh);

    return $text;
}
98
# Shorten $path for column output so that it is at most about $maxlen
# characters wide, replacing the cut-out part with '..'.
# Repeated slashes are collapsed first; a path that already fits is returned
# as is.
sub cut_target_width {
    my ($path, $maxlen) = @_;
    $path =~ s@/+@/@g;

    return $path if length($path) <= $maxlen;

    # no slash at all: keep only the end of the name
    return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;

    # split off the last component (with an optional trailing slash)
    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;

    if (length($tail)+3 == $maxlen) {
        return "../$tail";
    } elsif (length($tail)+2 >= $maxlen) {
        # the last component alone is already too long: keep only its end
        return '..'.substr($tail, -$maxlen+2)
    }

    # split off the first "/component" of what is left
    # NOTE(review): the pattern needs a slash in front of the component; if it
    # does not match, $1 still holds the capture of the previous substitution.
    # Confirm the expected inputs.
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $both = length($head) + length($tail);
    my $remaining = $maxlen-$both-4; # -4 for "/../"

    if ($remaining < 0) {
        # head and tail do not both fit: truncate the head
        return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
    }

    # replace a stretch of the middle part with '..' (4-argument substr edits $path in place)
    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return "$head/" . $path . "/$tail";
}
128
# Run $code while holding an exclusive flock on the file $lock_fn.
#
# The lock file is created (or truncated) if necessary. The call blocks until
# the lock is available. The lock is released and the handle closed before
# returning, also when $code dies; in that case the error is re-thrown.
#
# Returns whatever $code returns (evaluated in scalar context).
# Dies if the lock file cannot be opened or locked.
sub locked {
    my ($lock_fn, $code) = @_;

    # fail with a clear message instead of calling flock() on an undefined handle
    my $lock_fh = IO::File->new($lock_fn, 'w')
        or die "Could not open lock file $lock_fn: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    # close on both the success and the error path
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
144
# Return $status if it holds a true 'status' entry for the job $name of
# $source (looked up via $source->{all}), undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    my $job_status = $status->{$source->{all}}->{$name}->{status};

    return $job_status ? $status : undef;
}
154
# Check whether the dataset $target->{all} exists by running 'zfs list' for it,
# via ssh as $user when the target has an ip. Returns 1 if the command
# succeeds and 0 if it fails.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'list', '-H', '--', $target->{all}];

    eval { run_cmd($cmd) };

    return $@ ? 0 : 1;
}
173
# Parse a target given as [IP:]<VMID> or [IP:]<ZFSPool>[/Path] into a hash
# reference with the keys
#   all       - everything after the optional host part
#   ip        - host part, surrounding brackets removed (only if a host is given)
#   vmid      - set if the first path component is purely numeric
#   pool      - first path component otherwise
#   last_part - last path component (only if there is more than one component)
#   path      - components between the first and the last one, if any
# Dies if $text does not match the target syntax.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    # the second capture always takes part in a successful match
    my ($host, $spec) = $text =~ $TARGETRE;
    die "$errstr\n" if !defined($spec);

    my $target = { all => $spec };

    if ($host) {
        $target->{ip} = $host;
        $target->{ip} =~ s/^\[(.*)\]$/$1/;
    }

    my @parts = split('/', $spec);

    my $first = shift(@parts);
    die "$errstr\n" if !$first;

    if ($first =~ m/^\d+$/) {
        $target->{vmid} = $first;
    } else {
        $target->{pool} = $first;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # NOTE(review): for targets with a host one more component is dropped
    # here; the reason is not visible in this file.
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
209
# Read the cron file and return the job configuration parsed by encode_cron().
# If the cron file does not exist yet, an empty one is created and undef is
# returned.
sub read_cron {

    if (-e $CRONJOBS) {
        my $lines = read_file($CRONJOBS, 0);

        return encode_cron(@{$lines});
    }

    # first use: initialize the file
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
224
# Parse command line style arguments into a parameter hash reference.
# Every known option has a key in the result (undef when not given), except
# that name, maxsnap, method, source_user and dest_user get default values.
# Dies if Getopt::Long reports a parse error.
sub parse_argv {
    my (@arg) = @_;

    my %opt = map { $_ => undef } qw(
        dest source verbose limit maxsnap name skip method
        source_user dest_user properties dest_config_path
    );

    my ($ret) = GetOptionsFromArray(
        \@arg,
        'dest=s'             => \$opt{dest},
        'source=s'           => \$opt{source},
        'verbose'            => \$opt{verbose},
        'limit=i'            => \$opt{limit},
        'maxsnap=i'          => \$opt{maxsnap},
        'name=s'             => \$opt{name},
        'skip'               => \$opt{skip},
        'method=s'           => \$opt{method},
        'source-user=s'      => \$opt{source_user},
        'dest-user=s'        => \$opt{dest_user},
        'properties'         => \$opt{properties},
        'dest-config-path=s' => \$opt{dest_config_path},
    );

    die "can't parse options\n" if $ret == 0;

    my %default_for = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    foreach my $key (keys %default_for) {
        $opt{$key} //= $default_for{$key};
    }

    return \%opt;
}
269
# Copy the saved runtime state of a job (state, lsync, vm_type, instance_id
# and the consecutive snapN entries) from the state file into $job.
# Returns $job.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    foreach my $key (qw(state lsync vm_type instance_id)) {
        $job->{$key} = $saved->{$key};
    }

    # snap0, snap1, ... up to the first missing (or false) entry
    my $idx = 0;
    while ($saved->{"snap$idx"}) {
        $job->{"snap$idx"} = $saved->{"snap$idx"};
        $idx++;
    }

    return $job;
}
287
# Turn the lines of the cron file into a configuration hash of the form
# $cfg->{<source>}->{<name>} = <remaining parameters>.
# Each line is split on whitespace and parsed with parse_argv(); lines
# without both a source and a dest option are ignored. Processing stops at
# the first line that is false in boolean context.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    while (@text) {
        my $line = shift(@text);
        last if !$line;

        my $param = parse_argv(split('\s', $line));

        next if !($param->{source} && $param->{dest});

        my ($source, $name) = delete @{$param}{qw(source name)};
        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
308
# Build a job hash reference from the parsed command line parameters.
#
# The method is "local" when neither source nor destination has a host part
# and "ssh" otherwise. dest, maxsnap and dest_config_path are only set when
# given. Dies (in parse_target) if source or dest is not a valid target.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # do not combine 'my' with a statement modifier: per perlsyn the behaviour
    # of "my $x = ... if COND" is undefined
    my $dest = $param->{dest} ? parse_target($param->{dest}) : undef;

    my $is_remote = $source->{ip} || ($dest && $dest->{ip});

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = $is_remote ? "ssh" : "local";
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
331
# Read the state file and return the decoded JSON structure.
# If the state file does not exist yet, it is created (together with the
# config directory) with an empty JSON object and undef is returned.
sub read_state {

    if (-e $STATE) {
        # only the first line of the file is read and decoded
        my $json = read_file($STATE, 1);
        return decode_json($json);
    }

    make_path $CONFIG_PATH;
    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
346
# Store the runtime state of $job in the state file, or remove it when
# $job->{state} is "del".
#
# The new content is written to "$STATE.new" and then renamed over the state
# file. Returns the complete, updated state structure.
# Dies if the existing state cannot be decoded or the new file cannot be
# opened, written, closed or renamed.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file is treated like an empty one
    my $text = eval { read_file($STATE, 1); };

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if (($job->{state} // '') ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        # drop the source entry as well once its last job is gone
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # open the temporary file only after the old state was decoded, so a
    # broken state file does not leave an empty .new file behind
    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    # a silently failed write or rename would lose the state change
    die "can't write to $STATE.new\n" if !print($out_fh $text);
    die "can't close $STATE.new: $!\n" if !close($out_fh);
    die "can't move $STATE.new: $!\n" if !rename "$STATE.new", $STATE;

    return $states;
}
386
# Rewrite the cron file for $job.
#
# The line belonging to the job (same source and name) is replaced, or
# dropped when $job->{state} is "del"; if there is no such line, a new one is
# appended. A SHELL/PATH header is prepended unless one is found within the
# first three lines. The result is written to "$CRONJOBS.new" and renamed
# over the cron file. Dies if reading or writing fails.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    # source and name are literal text, not patterns: without quoting, a '.'
    # in an address would match any character and the brackets of an IPv6
    # host would be read as a character class
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    foreach my $line (@{$current}) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    # buffered write errors only show up on close
    die "can't close $CRONJOBS.new: $!\n" if !close($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
}
429
# Format the cron line for $job.
#
# If $line (the job's existing cron line) is given, its five schedule fields
# are kept; otherwise, or if no schedule can be extracted from $line, the
# default schedule (every $INTERVAL minutes) is used. A stopped job is
# written as a comment. Returns the line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    # only trust $1 after a successful match; a failed match would otherwise
    # leave a stale or undefined capture in the schedule position
    my $schedule = "*/$INTERVAL * * * *";
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $schedule = $1;
    }
    $text .= $schedule;

    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
457
# Return a table (as one string) of all jobs found in the cron file with
# their source, name, state, last sync time, guest type and connection method.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";

    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};

            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
478
# Find out whether the guest $target->{vmid} has a config file and of which
# type it is. The qemu config directory is checked first, then the lxc one;
# for a target with an ip the check is done with /bin/ls over ssh as $user.
# Returns "qemu", "lxc", or undef (also when the target has no vmid).
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    my @candidates = (
        ["qemu", "$QEMU_CONF/$conf_fn"],
        ["lxc", "$LXC_CONF/$conf_fn"],
    );

    foreach my $candidate (@candidates) {
        my ($type, $conf_path) = @{$candidate};

        my $found;
        if ($target->{ip}) {
            $found = eval { run_cmd(['ssh', "$user\@$target->{ip}", '--', '/bin/ls', $conf_path]) };
        } else {
            $found = -f $conf_path;
        }

        return $type if $found;
    }

    return undef;
}
497
# Create a new sync job from $param: validate source and destination, add the
# job to the cron file and the state file and, unless $param->{skip} is set,
# run a first sync. If that first sync fails, the job is removed again and
# the error is printed.
sub init {
    my ($param) = @_;

    # validation and registration happen under the cron/state lock
    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);

        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # install root's public key on the remote side(s) so that the later
        # ssh calls can authenticate with it
        if (my $ip = $dest->{ip}) {
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
        }

        if (my $ip = $source->{ip}) {
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
        }

        die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});

        # a VMID source has no dataset of its own to check
        if (!defined($source->{vmid})) {
            die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        #check if vm has zfs disks if not die;
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    }); #cron and state lock

    return if $param->{skip};

    # initial sync; on failure the freshly created job is removed again
    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
549
# Look up the job identified by $param->{source} and $param->{name} in the
# cron file and return it with name, source and the saved runtime state
# filled in. Dies if there is no such job.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = ($param->{source}, $param->{name});

    my $cfg = read_cron();
    my $job = $cfg->{$source}->{$name};

    die "Job with source $param->{source} and name $param->{name} does not exist\n" if !$job;

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
565
# Remove the job described by $param from the cron file and the state file,
# holding the cron/state lock while doing so. Dies if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove_job = sub {
        my $job = get_job($param);

        # "del" tells update_cron/update_state to drop the entry
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove_job);
}
577
# Build an identifier for the process $pid of the form
# "<pid>:<starttime>:<boot id>", read from /proc/<pid>/stat and
# /proc/sys/kernel/random/boot_id.
# Dies if either file cannot be read or the stat line cannot be parsed.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1)
        or die "unable to read process stats\n";
    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
        or die "unable to read boot ID\n";

    # The second field (the command name in parentheses) may itself contain
    # spaces and parentheses, so splitting the whole line on whitespace can
    # shift the fields. Cut everything up to the last ')' first.
    $stat =~ s/^.*\)\s+//s
        or die "unable to parse process stats\n";

    # the remaining fields start with field 3 (state); starttime is field 22
    my @fields = split(/\s+/, $stat);
    my $starttime = $fields[19];
    chomp($boot_id);

    return "${pid}:${starttime}:${boot_id}";
}
592
# Check whether the process that produced $instance_id is still running:
# true if the id starts with a pid and recomputing the id for that pid gives
# the same string, false otherwise.
sub instance_exists {
    my ($instance_id) = @_;

    return 0 if !defined($instance_id);
    return 0 if $instance_id !~ m/^([1-9][0-9]*):/;

    my $pid = $1;

    # get_instance_id dies if the process is gone; that counts as "not running"
    my $actual_id = eval { get_instance_id($pid); };

    return defined($actual_id) && $actual_id eq $instance_id;
}
604
# Run one sync for the source/destination given in $param.
#
# If a matching job exists in the cron file, its state is tracked in the
# state file: "waiting" while queued for the sync lock, "syncing" while
# running, then "ok" (or "stopped") on success and "error" on failure. A sync
# without a matching job runs without state tracking.
# Dies if the job is already scheduled or disabled, or if the sync fails.
sub sync {
    my ($param) = @_;

    my $job;

    # Phase 1: claim the job. Refuse if another live process already holds it.
    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        eval { $job = get_job($param) };

        if ($job) {
            my $state = $job->{state} // 'ok';
            # a state left behind by a process that no longer exists is ignored
            $state = 'ok' if !instance_exists($job->{instance_id});

            if ($state eq "syncing" || $state eq "waiting") {
                die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
            }

            $job->{state} = "waiting";
            $job->{instance_id} = $INSTANCE_ID;

            update_state($job);
        }
    });

    # Phase 2: the actual sync, serialized through the sync lock.
    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my $dest;
        my $source;
        my $vm_type;

        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            if ($job) {
                $job->{state} = "syncing";
                $job->{vm_type} = $vm_type if !$job->{vm_type};
                update_state($job);
            }
        }); #cron and state lock

        # Sync a single dataset: look up the existing snapshots on the
        # destination, take a new snapshot, send it, and remove the oldest
        # snapshot if snapshot_get flagged the source for cleanup.
        my $sync_path = sub {
            my ($source, $dest, $job, $param, $date) = @_;

            ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});

            snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

            send_image($source, $dest, $param);

            snapshot_destroy($source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $dest->{old_snap});

        };

        eval{
            if ($source->{vmid}) {
                # guest source: sync every disk of the guest, then its config
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                foreach my $disk (sort keys %{$disks}) {
                    # point $source at the current disk before syncing it
                    $source->{all} = $disks->{$disk}->{all};
                    $source->{pool} = $disks->{$disk}->{pool};
                    $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
                    $source->{last_part} = $disks->{$disk}->{last_part};
                    &$sync_path($source, $dest, $job, $param, $date);
                }
                if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
                    send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
                } else {
                    send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
                }
            } else {
                # plain dataset source
                &$sync_path($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            # record the failure in the job state, then re-throw
            locked("$CONFIG_PATH/cron_and_state.lock", sub {
                eval { $job = get_job($param); };
                if ($job) {
                    $job->{state} = "error";
                    delete $job->{instance_id};
                    update_state($job);
                }
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        # success: store the new state and the time of this sync
        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            eval { $job = get_job($param); };
            if ($job) {
                # a job that was disabled while syncing stays disabled
                if (defined($job->{state}) && $job->{state} eq "stopped") {
                    $job->{state} = "stopped";
                } else {
                    $job->{state} = "ok";
                }
                $job->{lsync} = $date;
                delete $job->{instance_id};
                update_state($job);
            }
        });
    }); #sync lock
}
720
# List the snapshots of the destination dataset, newest first, and return
# ($old_snap, $last_snap):
#   $last_snap - name of the newest snapshot (the part after '@')
#   $old_snap  - name of the last replication snapshot of job $name that was
#                looked at; the scan stops at the $max_snap-th one and then
#                sets $source->{destroy}
# Returns undef if the listing fails (the dataset does not exist on the
# destination yet) or if no snapshot was found.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};

    my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation', $path];

    my $raw = eval { run_cmd($cmd) };
    return undef if $@; # this means the volume doesn't exist on dest yet

    my $seen = 0;
    my $last_snap = undef;
    my $old_snap;

    foreach my $line (split(/\n/, $raw // '')) {
        if (!$last_snap && $line =~ m/@(.*)$/) {
            $last_snap = $1;
        }

        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $old_snap = $1;
        $seen++;
        if ($seen == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
762
# Create the replication snapshot "rep_<name>_<date>" of the source dataset
# (over ssh when the source has an ip) and remember its name in
# $source->{new_snap}. If creating it fails, snapshot_destroy is called for
# that name and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_${date}";
    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval { run_cmd([@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"]) };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
784
# Fetch the config of the guest $target->{vmid} with 'qm config' (qemu) or
# 'pct config' (lxc), over ssh as $user when the target has an ip, and return
# the disks found in it by parse_disks(). Dies for any other vm_type.
sub get_disks {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $config_tool;
    if ($target->{vm_type} eq 'qemu') {
        $config_tool = 'qm';
    } elsif ($target->{vm_type} eq 'lxc') {
        $config_tool = 'pct';
    } else {
        die "VM Type unknown\n";
    }

    my $config = run_cmd([@ssh, $config_tool, 'config', $target->{vmid}]);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $user);
}
805
# Run a command through the shell and return its combined stdout/stderr
# output without the trailing newline.
# $cmd is either a ready-made command string or an array reference; in an
# array, plain elements are shell-quoted while scalar references (such as
# \'|') are inserted verbatim.
# Dies with the command and its output if the exit status is not 0.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper($cmd);
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if $? != 0;

    chomp($output);

    if ($DEBUG) {
        print Dumper($output);
        print "END CMD\n";
    }

    return $output;
}
822
# Extract the ZFS-backed disks from the guest config text $text.
#
# Returns a hash reference keyed by a running number (0, 1, ...); each entry
# has the keys pool, path (optional), last_part (the volume name) and all
# (the complete dataset name). The dataset of a volume is determined with
# 'pvesm path', run over ssh as $user when $ip is set.
# Dies if 'pvesm path' gives nothing, if a path has an unexpected form, or if
# no disk was found at all.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    # consume $text line by line
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        # qemu: a disk is synced unless backup is explicitly switched off
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # lxc: a mount point is only synced if backup is explicitly switched on
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form [file=|volume=]<storage>:<volume> names the disk
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1; # includes the trailing ':'
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        # qemu volumes are expected as /dev/zvol/<pool>[/<path>]/<disk>
        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        # lxc volumes are expected as /<pool>[/<path>]/<disk>
        # NOTE(review): the first group is greedy (\w+.+), so it looks like it
        # can swallow the whole part before /<disk> and leave the optional
        # path group empty - confirm.
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
902
# Destroy the snapshot $snap of the source dataset and, if $dest is given,
# of the corresponding dataset on the destination. The source side is only
# reached over ssh if the source has an ip and $method is 'ssh'.
# Failures are reported as warnings and do not abort.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my $source_is_remote = $source->{ip} && $method eq 'ssh';
    my @source_ssh = $source_is_remote ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval { run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        my $dest_path = "$dest->{all}";
        $dest_path .= "/$source->{last_part}" if $source->{last_part};

        eval { run_cmd([@dest_ssh, @zfscmd, "$dest_path\@$snap"]) };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
933
# Check whether the snapshot needed for an incremental sync
# ($dest->{last_snap}) exists on the source side.
# Returns 1 if 'zfs list' finds it; otherwise warns and returns undef.
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    my $snapshot = $source->{all};
    $snapshot .= "\@$dest->{last_snap}";

    eval { run_cmd([@ssh, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name', $snapshot]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    return 1;
}
954
# Send the new snapshot ($source->{new_snap}) to the destination with a
# 'zfs send | [cstream |] zfs recv' pipeline. The send is incremental if the
# newest destination snapshot also exists on the source. $param->{limit}
# (times 1024) is passed to 'cstream -t' to throttle the stream.
# If the transfer fails, the new source snapshot is destroyed and the error
# is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    # sending side
    my @send = ('zfs', 'send');
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
        push @send, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    if ($source->{ip}) {
        unshift @send, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--';
    }

    # optional bandwidth limit; scalar references are passed to the shell unquoted
    my @throttle;
    if ($param->{limit}){
        my $bwl = $param->{limit}*1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    # receiving side
    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    my @recv = ('zfs', 'recv', '-F', '--', "$target");
    if ($dest->{ip}) {
        unshift @recv, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--';
    }

    eval {
        run_cmd([@send, @throttle, \'|', @recv])
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
992
993
# Copy the guest config file next to the synced data as
# "<vmid>.conf.<vm_type>.<new snapshot name>" in the config directory
# ($dest_config_path or $CONFIG_PATH, plus the last part of the destination).
# With method 'ssh' the copy is done with scp between the side(s) that have
# an ip and, if $source->{destroy} is set, the config copy belonging to
# $dest->{old_snap} is removed. With method 'local' the file is copied with cp.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $vmid = $source->{vmid};
    my $guest_conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$guest_conf_dir/$vmid.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    # common prefix of the new and the old config copy
    my $copy_prefix = "$config_dir/$vmid.conf.$source->{vm_type}";
    my $dest_target_new = "$copy_prefix.$source->{new_snap}";

    if ($method eq 'ssh'){
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            my $copy_from = $source->{ip}
                ? "$source_user\@[$source->{ip}]:$source_target"
                : $source_target;
            my $copy_to = $dest->{ip}
                ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
                : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $copy_from, $copy_to]);
        }

        if ($source->{destroy}){
            my $dest_target_old = "$copy_prefix.$dest->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1030
# Return the current local time as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # localtime list: 0 = sec, 1 = min, 2 = hour, 3 = mday, 4 = mon, 5 = year
    my @now = localtime(time);

    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", $now[5] + 1900, $now[4] + 1, @now[3, 2, 1, 0]);
}
1037
# Return a table (as one string) with source, name and saved state of every
# job found in the cron file.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $columns = sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= $columns;
            $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1055
# Set the state of the job described by $param to "ok" in the state file and
# rewrite its cron line accordingly, under the cron/state lock.
sub enable_job {
    my ($param) = @_;

    my $mark_enabled = sub {
        my $job = get_job($param);
        $job->{state} = "ok";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $mark_enabled);
}
1066
# Set the state of the job described by $param to "stopped" in the state file
# and rewrite its cron line (format_job writes stopped jobs as a comment),
# under the cron/state lock.
sub disable_job {
    my ($param) = @_;

    my $mark_stopped = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $mark_stopped);
}
1077
# Help text per command. The keys also serve as the list of known commands:
# a command without an entry here is rejected as unknown.
# Two factual errors in the 'create' text are corrected: the destination
# syntax (the colon belongs to the optional host part) and the type of
# -maxsnap (it is parsed as an integer).
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

remove a sync Job from the scheduler

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

Create a sync Job

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default

-skip boolean

if this flag is set it will skip the first sync

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-properties boolean

Include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n

will sync one time

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default.
It is only necessary if scheduler allready contains this source.

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-verbose boolean

print out the sync progress.

-properties boolean

Include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    list => qq{
$PROGNAME list

Get a List of all scheduled Sync Jobs
},
    status => qq{
$PROGNAME status

Get the status of all scheduled Sync Jobs
},
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

Get help about specified command.

<cmd> string

Command name

-verbose boolean

Verbose output format.
},
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

enable a syncjob and reset error

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

pause a sync job

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    printpod => 'internal command',

};
1235
# Validate the requested command before parse_argv() is called.
# Both failure paths print the usage summary and then abort via die "\n".
if (!$command) {
    # usage() without an argument also prints the "no command specified" line.
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # Terminate the message with a newline so that the usage text printed
    # next starts on its own line instead of being glued to the error.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Hand a copy of the command line arguments to parse_argv(), so @ARGV itself
# is not the array being passed.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1245
# Ensure that every named option is present in $param.
#
# Arguments: the list of keys to look up in $param.
# Dies with the help text of the current $command as soon as one of the
# keys holds a false value.
sub check_params {
    my @required = @_;

    foreach my $key (@required) {
        next if $param->{$key};
        die "$cmd_help->{$command}\n";
    }
}
1251
# Dispatch on the validated command. Commands that operate on a job first
# make sure their mandatory options are set (check_params) and that the
# given targets can be parsed (check_target) before doing any work.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    # The second command line word, if any, names the command to explain.
    my $help_command = $ARGV[1];

    if ($help_command && $cmd_help->{$help_command}) {
        # The help text of a single command is emitted through die.
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # List form avoids any shell involvement. exec only returns when it
        # failed, so report that instead of exiting silently.
        exec('man', $PROGNAME)
            or die "unable to execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1308
# Print the short usage summary to STDOUT.
#
# Arguments:
#   $help - when false, an "ERROR: no command specified" line is printed
#           in front of the summary; when true only the summary is shown.
sub usage {
    my ($help) = @_;

    my @summary = (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    unshift @summary, "ERROR:\tno command specified\n" if !$help;

    # One print call per line, exactly like the sequence of prints it replaces.
    foreach my $line (@summary) {
        print($line);
    }
}
1323
# Check a source/destination target by passing it to parse_target().
# Whatever parse_target() returns (or dies with) is passed on to the caller.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1328
# Print the POD source of the manual page to STDOUT.
# The SYNOPSIS section is made of all values of $cmd_help, sorted as
# strings and joined with newlines.
sub print_pod {

    my @help_texts = sort values %$cmd_help;
    my $synopsis = join("\n", @help_texts);

    my $pod = <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF

    print $pod;
}