]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
parse disks: also include storage ID information
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program name and the filesystem locations derived from it: the state
# file, the cron file holding the scheduled jobs, and the directories that
# contain the qemu and lxc guest configuration files.
13 my $PROGNAME = "pve-zsync";
14 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
15 my $STATE = "${CONFIG_PATH}/sync_state";
16 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
17 my $PATH = "/usr/sbin";
18 my $PVE_DIR = "/etc/pve/local";
19 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
20 my $LXC_CONF = "${PVE_DIR}/lxc";
21 my $PROG_PATH = "$PATH/${PROGNAME}";
# Minute step used by format_job for the schedule of newly created cron entries.
22 my $INTERVAL = 15;
23 my $DEBUG;
24
# Debug output can be switched on through the ZSYNC_DEBUG environment
# variable; Data::Dumper is only loaded (at compile time) when it is enabled.
25 BEGIN {
26 $DEBUG = 0; # change default here. not above on declaration!
27 $DEBUG ||= $ENV{ZSYNC_DEBUG};
28 if ($DEBUG) {
29 require Data::Dumper;
30 Data::Dumper->import();
31 }
32 }
33
# Regex building blocks (plain strings, interpolated further below) for
# IPv4 octets/addresses and IPv6 16-bit groups / trailing 32 bits.
34 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
35 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
36 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
37 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
38
# IPv6 address pattern: one alternative per possible position of the "::"
# abbreviation, plus the fully written-out form.
39 my $IPV6RE = "(?:" .
40 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
41 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
42 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
43 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
44 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
45 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
46 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
47 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
48 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
49
50 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
51 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
52 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
53 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Capture 1 is the optional host, capture 2 the VMID or dataset (see parse_target).
54 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
# Matches the key of a guest config line that describes a disk or mount point.
56 my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
# Identifier of this running process (pid, start time, boot id); stored in
# the job state by sync() and compared again by instance_exists().
58 my $INSTANCE_ID = get_instance_id($$);
59
60 my $command = $ARGV[0];
61
# Every command except 'help' and 'printpod' needs the external tools
# below; check_bin dies when one of them is not found in $ENV{PATH}.
62 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
63 check_bin ('cstream');
64 check_bin ('zfs');
65 check_bin ('ssh');
66 check_bin ('scp');
67 }
68
# Turn the listed signals into a die so that enclosing eval blocks (e.g. in
# sync) get a chance to run their error handling.
# NOTE(review): SIGKILL cannot be caught, so the KILL entry has no effect.
69 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
70 die "Signaled, aborting sync: $!\n";
71 };
72
# Look up an executable in the directories listed in $ENV{PATH}.
#
# $bin - bare command name (no directory part)
#
# Returns the full path of the first PATH entry that contains an
# executable file of that name; dies if there is none.
sub check_bin {
    my ($bin) = @_;

    my ($found) = grep { -x $_ } map { "$_/$bin" } split(/:/, $ENV{PATH});
    return $found if defined($found);

    die "unable to find command '$bin'\n";
}
85
# Read a file from disk.
#
# $filename      - path of the file to read
# $one_line_only - when true, return only the first line as a string;
#                  otherwise return an array reference with all lines
#
# Lines keep their trailing newline. Dies if the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
        $text = <$fh>;
    } else {
        $text = [ <$fh> ];
    }

    close($fh);

    return $text;
}
98
# Shorten a slash separated path for tabular output.
#
# $path   - the path to shorten (runs of '/' are collapsed first)
# $maxlen - maximum width of the result
#
# Paths that already fit are returned unchanged (after collapsing slashes).
# Otherwise parts are replaced by '..', giving preference to keeping the
# last path component readable.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s{/+}{/}g;

    if (length($path) <= $maxlen) {
        return $path;
    }

    # a single component: keep its end
    if ($path !~ m{/}) {
        return '..' . substr($path, 2 - $maxlen);
    }

    # split off the last component (with an optional trailing slash)
    $path =~ s{/([^/]+/?)$}{};
    my $last = $1;
    my $last_len = length($last);

    return "../$last" if $last_len + 3 == $maxlen;
    return '..' . substr($last, 2 - $maxlen) if $last_len + 2 >= $maxlen;

    # split off the first '/component' found in the remainder
    $path =~ s{(/[^/]+)(?:/|$)}{};
    my $first = $1;

    my $room = $maxlen - (length($first) + $last_len) - 4; # -4 for "/../"

    if ($room < 0) {
        # -3 for "../"
        return substr($first, 0, $maxlen - $last_len - 3) . "../$last";
    }

    # squeeze the middle part down to what is left of the width
    substr($path, ($room/2), (length($path) - $room), '..');

    return "$first/$path/$last";
}
128
# Run a code reference while holding an exclusive flock on a lock file.
#
# $lock_fn - path of the lock file (created/truncated if necessary)
# $code    - code reference to execute under the lock
#
# Returns whatever $code returns (evaluated in scalar context). An
# exception thrown by $code is re-thrown after the lock has been released
# and the lock file handle has been closed. Dies with a descriptive
# message when the lock file cannot be opened or locked.
sub locked {
    my ($lock_fn, $code) = @_;

    # Explicit mode argument: the file name is never parsed for mode
    # characters, and a failed open is reported right here instead of
    # surfacing as an obscure flock() error.
    my $lock_fh = IO::File->new($lock_fn, 'w')
        or die "Could not open lock file $lock_fn: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
144
# Return $status when it holds a true 'status' entry for the job $name
# below the key $source->{all}; return undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    my $known = $status->{$source->{all}}->{$name}->{status};

    return $known ? $status : undef;
}
154
# Check whether a ZFS dataset exists by running 'zfs list' on it.
#
# $dataset - dataset name
# $ip      - optional host; when set the command is run there through ssh
# $user    - ssh user, only used together with $ip
#
# Returns 1 if the command succeeded, 0 if it failed.
sub check_dataset_exists {
    my ($dataset, $ip, $user) = @_;

    my @remote = $ip ? ('ssh', "$user\@$ip", '--') : ();

    eval { run_cmd([@remote, 'zfs', 'list', '-H', '--', $dataset]) };

    return $@ ? 0 : 1;
}
173
# Create a ZFS file system with 'zfs create'.
#
# $file_system - name of the dataset to create
# $ip          - optional host; when set the command is run there through ssh
# $user        - ssh user, only used together with $ip
#
# Dies (through run_cmd) when the command fails.
sub create_file_system {
    my ($file_system, $ip, $user) = @_;

    my @remote = $ip ? ('ssh', "$user\@$ip", '--') : ();

    run_cmd([@remote, 'zfs', 'create', $file_system]);
}
186
# Split a target specification into its components.
#
# $text - '[IP:]<VMID>' or '[IP:]<ZFSPool>[/Path]'
#
# Returns a hash reference with the keys
#   all       - everything after the optional host part
#   ip        - host part, with surrounding brackets removed (only if given)
#   vmid      - set when the first component consists of digits only
#   pool      - first component otherwise
#   last_part - last path component (only if there is more than one)
#   path      - components between pool and last_part (only if any remain)
# Dies when $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    $text =~ $TARGETRE or die "$errstr\n";
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };

    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $all);

    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # NOTE(review): for targets with a host part one more component is
    # dropped before 'path' is built - confirm that this is intended.
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
222
# Read the cron file and return the job configuration parsed by
# encode_cron. If the cron file does not exist yet, an empty one is
# created and undef is returned.
sub read_cron {
    if (-e $CRONJOBS) {
        my $lines = read_file($CRONJOBS, 0);
        return encode_cron(@{$lines});
    }

    # first use: initialize the file
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
237
# Parse command line style arguments into a parameter hash.
#
# @arg - list of arguments (non-option words are ignored)
#
# Returns a hash reference that always contains every known key; options
# that were not given are undef, except for the defaults applied at the
# end (name, maxsnap, method, source_user, dest_user). Dies when the
# options cannot be parsed.
sub parse_argv {
    my (@arg) = @_;

    # option specification => key in the returned hash
    my @spec = (
        [ 'dest=s'             => 'dest' ],
        [ 'source=s'           => 'source' ],
        [ 'verbose'            => 'verbose' ],
        [ 'limit=i'            => 'limit' ],
        [ 'maxsnap=i'          => 'maxsnap' ],
        [ 'name=s'             => 'name' ],
        [ 'skip'               => 'skip' ],
        [ 'method=s'           => 'method' ],
        [ 'source-user=s'      => 'source_user' ],
        [ 'dest-user=s'        => 'dest_user' ],
        [ 'properties'         => 'properties' ],
        [ 'dest-config-path=s' => 'dest_config_path' ],
    );

    my $param = {};
    $param->{$_->[1]} = undef for @spec;

    my @getopt_args;
    for my $entry (@spec) {
        my ($option, $key) = @{$entry};
        push @getopt_args, $option, \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @getopt_args);

    die "can't parse options\n" if $ret == 0;

    my %defaults = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    for my $key (keys %defaults) {
        $param->{$key} //= $defaults{$key};
    }

    return $param;
}
282
# Copy the persisted state of a job into the job hash.
#
# $job - job hash; 'source' and 'name' select the entry of the state file
#
# Sets state, lsync, vm_type and instance_id and copies the consecutive
# 'snapN' entries (starting at snap0, stopping at the first false one).
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    for my $key (qw(state lsync vm_type instance_id)) {
        $job->{$key} = $state->{$key};
    }

    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
300
# Turn the lines of the cron file into a job configuration.
#
# @text - lines of the cron file
#
# Every line is split on whitespace and run through parse_argv; lines
# providing both a source and a dest become $cfg->{$source}->{$name}.
# Processing stops at the first false line. Returns the $cfg hash ref.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    for my $line (@text) {
        last if !$line;

        my $param = parse_argv(split('\s', $line));

        next if !($param->{source} && $param->{dest});

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
321
# Build a job hash from the parsed command line parameters.
#
# $param - parameter hash as returned by parse_argv
#
# The method is "local" when neither source nor dest name a host and
# "ssh" otherwise. Returns the new job hash reference; dies (through
# parse_target) when source or dest are not valid targets.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # Do not combine 'my' with a statement modifier: perlsyn documents the
    # behaviour of 'my $x = ... if COND' as undefined.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
344
# Read the state file and return its decoded JSON content.
#
# If the state file does not exist yet, the config directory and a state
# file containing an empty JSON object are created and undef is returned.
sub read_state {
    if (-e $STATE) {
        my $text = read_file($STATE, 1);
        return decode_json($text);
    }

    make_path $CONFIG_PATH;

    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
359
# Persist the state of a job in the state file.
#
# $job - job hash; 'source' and 'name' select the entry. With the state
#        "del" the entry is removed (together with the source key when it
#        was its last job); otherwise state, lsync, instance_id, vm_type
#        and the consecutive 'snapN' values are stored.
#
# The new content is written to "$STATE.new" and then renamed over the
# state file. Dies when the temporary file cannot be opened, written,
# closed or renamed, so that a failed update never goes unnoticed.
# Returns the complete states structure that was written.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file simply means "no states yet"
    my $text = eval { read_file($STATE, 1); };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # same error handling as update_cron: never replace the state file
    # with a partially written one
    die "can't write to $STATE.new\n" if !print($out_fh $text);
    die "can't close $STATE.new: $!\n" if !close($out_fh);

    die "can't move $STATE.new: $!\n" if !rename "$STATE.new", $STATE;

    return $states;
}
399
# Add, replace or remove the cron entry of a job.
#
# $job - job hash; the entry is identified by 'source' and 'name'. With
#        the state "del" a matching line is dropped, otherwise it is
#        regenerated by format_job (keeping its schedule). A job without a
#        matching line is appended.
#
# A SHELL/PATH header is prepended unless one is found within the first
# three lines. The result is written to "$CRONJOBS.new" and renamed over
# the cron file. Dies on any I/O error.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    foreach my $line (@{$current}) {
        chomp($line);
        # \Q..\E: source and name are literal text, not patterns. Without
        # quoting, '.' in addresses matches any character and the brackets
        # of an IPv6 source are taken as a character class.
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    # buffered write errors only show up on close
    die "can't close $CRONJOBS.new: $!\n" if !close($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
}
442
# Render the cron line of a job.
#
# $job  - job hash (source, dest, name, maxsnap, method, users, ...)
# $line - optional existing cron line of the job; its five schedule
#         fields are kept. Without $line, or when no schedule can be
#         extracted from it, the default "*/$INTERVAL * * * *" is used.
#
# A job in state "stopped" is written as a comment (leading '#').
# Returns the line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }
    # Only use $1 when the match really succeeded; a line without a
    # recognizable schedule would otherwise yield an entry without one.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
470
# Build the table printed by the 'list' command: one row per configured
# job with source, name, state, last sync time, guest type and method.
# Returns the table as a single string.
sub list {
    my $cfg = read_cron();

    my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();

    for my $source (sort keys %{$cfg}) {
        for my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};

            $list .= sprintf("%-25s", cut_target_width($source, 25));
            $list .= sprintf("%-25s", cut_target_width($name, 25));
            $list .= sprintf("%-10s", $state->{state});
            $list .= sprintf("%-20s", $state->{lsync});
            $list .= sprintf("%-6s", $state->{vm_type} // "undef");
            $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
        }
    }

    return $list;
}
491
# Determine the type of the guest a target refers to.
#
# $target - parsed target; needs 'vmid', optionally 'ip'
# $user   - ssh user for remote targets
#
# Looks for "<vmid>.conf" in the qemu and then the lxc config directory,
# either locally or by running /bin/ls through ssh. Returns "qemu", "lxc"
# or undef (also when the target has no vmid).
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";
    my @candidates = (
        [ "qemu", "$QEMU_CONF/$conf_fn" ],
        [ "lxc", "$LXC_CONF/$conf_fn" ],
    );

    my @remote_ls = $target->{ip}
        ? ('ssh', "$user\@$target->{ip}", '--', '/bin/ls')
        : ();

    for my $candidate (@candidates) {
        my ($type, $file) = @{$candidate};

        my $present;
        if ($target->{ip}) {
            $present = eval { run_cmd([@remote_ls, $file]) };
        } else {
            $present = -f $file;
        }

        return $type if $present;
    }

    return undef;
}
510
# Create a new sync job and, unless 'skip' is set, run the first sync.
#
# $param - parameter hash as returned by parse_argv
#
# Under the cron/state lock: copies the ssh key to remote hosts, verifies
# that the destination (and a dataset source) exists, that a guest source
# exists and has usable disks, and that no job of that source/name is
# configured yet; then writes the cron entry and the state. If the first
# sync fails the job is destroyed again and the error is printed.
sub init {
    my ($param) = @_;

    my $register_job = sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);
        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # destination first, then source
        my @peers = (
            [ $dest->{ip}, $param->{dest_user} ],
            [ $source->{ip}, $param->{source_user} ],
        );
        for my $peer (@peers) {
            my ($ip, $user) = @{$peer};
            next if !$ip;
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        if (!check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user})) {
            die "Pool $dest->{all} does not exist\n";
        }

        if (!defined($source->{vmid})
            && !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user})) {
            die "Pool $source->{all} does not exist\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        if ($source->{vmid} && !$vm_type) {
            die "VM $source->{vmid} doesn't exist\n";
        }

        if ($cfg->{$job->{source}}->{$job->{name}}) {
            die "Config already exists\n";
        }

        # get_disks dies if the guest has no disk on zfs
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $register_job);

    return if $param->{skip};

    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
564
# Look up a configured job and enrich it with its persisted state.
#
# $param - needs 'source' and 'name'
#
# Returns the job hash from the cron configuration with name, source and
# the state fields added. Dies if no such job is configured.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = $cfg->{$param->{source}}->{$param->{name}};
    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n";
    }

    $job->{name} = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}
580
# Remove a job: under the cron/state lock its cron entry and its state
# entry are deleted. Dies (through get_job) if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
592
# Build an identifier for the process with the given pid.
#
# The identifier is "<pid>:<field>:<boot id>", where <field> is the 22nd
# whitespace separated field of /proc/<pid>/stat and the boot id is read
# from /proc/sys/kernel/random/boot_id. Dies when either file cannot be
# read.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1);
    die "unable to read process stats\n" if !$stat;

    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1);
    die "unable to read boot ID\n" if !$boot_id;

    my @fields = split(/\s+/, $stat);
    my $starttime = $fields[21];
    chomp($boot_id);

    return "${pid}:${starttime}:${boot_id}";
}
607
# Check whether the process described by an instance id is still running.
#
# The pid is taken from the start of $instance_id and the id is computed
# again for that pid; the instance exists when both ids are equal.
# Returns a false value for undefined or malformed ids and when the id
# of the pid cannot be determined.
sub instance_exists {
    my ($instance_id) = @_;

    return 0 if !defined($instance_id);

    my ($pid) = $instance_id =~ m/^([1-9][0-9]*):/;
    return 0 if !defined($pid);

    my $actual_id = eval { get_instance_id($pid); };

    return defined($actual_id) && $actual_id eq $instance_id;
}
619
# Run one synchronization of a source to a destination.
#
# $param - parameter hash as returned by parse_argv
#
# Steps:
#  1. under the cron/state lock, mark a configured job as "waiting"
#     (refusing to start when another live instance already waits/syncs);
#  2. take the sync lock, so only one sync runs at a time;
#  3. under the cron/state lock, re-read the job, refuse disabled jobs and
#     mark the job as "syncing";
#  4. snapshot and send every disk of a guest (plus its config), or the
#     single dataset;
#  5. record the result ("error", or "ok" plus the sync time) in the state.
# The job related steps are skipped when no job is configured for the
# given source/name (one-off sync). Dies when the sync fails.
sub sync {
    my ($param) = @_;

    my $state_lock = "$CONFIG_PATH/cron_and_state.lock";
    my $job;

    locked($state_lock, sub {
        eval { $job = get_job($param) };
        return if !$job;

        # a state left behind by an instance that is gone counts as 'ok'
        my $state = instance_exists($job->{instance_id}) ? ($job->{state} // 'ok') : 'ok';

        if ($state eq "syncing" || $state eq "waiting") {
            die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
        }

        $job->{state} = "waiting";
        $job->{instance_id} = $INSTANCE_ID;

        update_state($job);
    });

    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my ($dest, $source, $vm_type);

        locked($state_lock, sub {
            # job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            return if !$job;

            $job->{state} = "syncing";
            $job->{vm_type} = $vm_type if !$job->{vm_type};
            update_state($job);
        }); #cron and state lock

        # snapshot one dataset, send it and drop the oldest snapshot if due
        my $sync_path = sub {
            my ($src, $dst, $cur_job, $opts, $stamp) = @_;

            ($dst->{old_snap}, $dst->{last_snap}) =
                snapshot_get($src, $dst, $opts->{maxsnap}, $opts->{name}, $opts->{dest_user});

            snapshot_add($src, $dst, $opts->{name}, $stamp, $opts->{source_user}, $opts->{dest_user});

            send_image($src, $dst, $opts);

            if ($src->{destroy} && $dst->{old_snap}) {
                snapshot_destroy($src, $dst, $opts->{method}, $dst->{old_snap}, $opts->{source_user}, $opts->{dest_user});
            }
        };

        eval {
            if ($source->{vmid}) {
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");

                my $disks = get_disks($source, $param->{source_user});

                foreach my $disk (sort keys %{$disks}) {
                    my $info = $disks->{$disk};

                    $source->{all} = $info->{all};
                    $source->{pool} = $info->{pool};
                    $source->{path} = $info->{path} if $info->{path};
                    $source->{last_part} = $info->{last_part};

                    $sync_path->($source, $dest, $job, $param, $date);
                }

                my $remote = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
                my $config_method = $remote ? 'ssh' : 'local';

                send_config($source, $dest, $config_method, $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
            } else {
                $sync_path->($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            locked($state_lock, sub {
                eval { $job = get_job($param); };
                return if !$job;

                $job->{state} = "error";
                delete $job->{instance_id};
                update_state($job);
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        locked($state_lock, sub {
            eval { $job = get_job($param); };
            return if !$job;

            # a job that was disabled in the meantime stays disabled
            my $stopped = defined($job->{state}) && $job->{state} eq "stopped";
            $job->{state} = $stopped ? "stopped" : "ok";

            $job->{lsync} = $date;
            delete $job->{instance_id};
            update_state($job);
        });
    }); #sync lock
}
735
# Inspect the snapshots that exist for the target dataset on the
# destination.
#
# $source, $dest - parsed targets
# $max_snap      - number of replication snapshots to keep
# $name          - job name (part of the replication snapshot names)
# $dest_user     - ssh user for a remote destination
#
# Snapshots are listed newest first. Returns ($old_snap, $last_snap):
# $last_snap is the name of the first snapshot listed, $old_snap the
# $max_snap-th replication snapshot of this job (or the last one seen if
# there are fewer). When $max_snap replication snapshots were found,
# $source->{destroy} is set. Returns undef if listing fails (the dataset
# does not exist on the destination yet) or no snapshot was found.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my @remote = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

    my $cmd = [
        @remote,
        'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation',
        target_dataset($source, $dest),
    ];

    my $raw = eval { run_cmd($cmd) };
    if ($@) { #this means the volume doesn't exist on dest yet
        return undef;
    }

    my $found = 0;
    my $last_snap = undef;
    my $old_snap;

    my @lines = $raw ? split(/\n/, $raw) : ();

    for my $line (@lines) {
        if (!$last_snap && $line =~ m/@(.*)$/) {
            $last_snap = $1;
        }

        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $old_snap = $1;
        $found++;

        if ($found == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
776
# Create the replication snapshot "rep_<name>_<date>" on the source
# dataset and remember its name in $source->{new_snap}.
#
# If creating the snapshot fails, snapshot_destroy is called for that
# snapshot name and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_${date}";
    $source->{new_snap} = $snap_name;

    my @remote = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval {
        run_cmd([@remote, 'zfs', 'snapshot', "$source->{all}\@$snap_name"]);
    };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
798
# Fetch the configuration of a guest ('qm config' or 'pct config',
# through ssh for remote targets) and return its disks as parsed by
# parse_disks. Dies for unknown guest types.
sub get_disks {
    my ($target, $user) = @_;

    my @remote = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $type = $target->{vm_type};
    my $tool = $type eq 'qemu' ? 'qm'
             : $type eq 'lxc'  ? 'pct'
             :                   undef;

    die "VM Type unknown\n" if !defined($tool);

    my $res = run_cmd([@remote, $tool, 'config', $target->{vmid}]);

    return parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
}
819
# Run a command through the shell and return its output.
#
# $cmd - command string, or an array reference of words. Plain words are
#        shell quoted; scalar references are inserted verbatim (used for
#        shell operators such as '|').
#
# stderr is merged into stdout. Returns the output without its trailing
# newline; dies with the command line and the output when the exit
# status is not zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper($cmd);
    }

    my $cmdline = ref($cmd) eq 'ARRAY'
        ? join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd)
        : $cmd;

    my $output = `$cmdline 2>&1`;

    die "COMMAND:\n\t$cmdline\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper($output);
        print "END CMD\n";
    }

    return $output;
}
836
# Extract the ZFS backed disks from the output of 'qm config' or
# 'pct config'.
#
# $text    - configuration text, one "key: value" entry per line
# $ip      - optional host on which 'pvesm path' is run through ssh
# $vm_type - 'qemu' or 'lxc'
# $user    - ssh user, only used together with $ip
#
# Skipped are cdrom drives, lines that are no disk/mount point entries,
# qemu disks with backup explicitly disabled and lxc mount points (mpN)
# without backup explicitly enabled.
#
# Returns a hash reference indexed 0..n-1; every entry holds storage_id,
# pool, all (complete dataset name), last_part (volume name) and
# optionally path. Dies if a volume path has an unexpected format or if
# no disk was found at all.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        # QEMU: a disk is synced unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: a mount point is only synced if backup is explicitly
        # enabled. \d+ so that mp10 and above are covered as well, in
        # line with $DISK_KEY_RE.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form [file=|volume=]<storage>:<volume>
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+):([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor:$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor:$disk\n" if !$path;

        $disks->{$num}->{storage_id} = $stor;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
918
# Name of the dataset on the destination that corresponds to a source:
# the destination's 'all' value, followed by the source's last path
# component if it has one, with runs of '/' collapsed to a single one.
sub target_dataset {
    my ($source, $dest) = @_;

    my @pieces = ("$dest->{all}");
    push @pieces, $source->{last_part} if $source->{last_part};

    (my $target = join('/', @pieces)) =~ s!/+!/!g;

    return $target;
}
929
# Destroy a snapshot on the source and, if $dest is given, the
# corresponding snapshot on the destination.
#
# $source, $dest          - parsed targets ($dest may be undef)
# $method                 - the source is only reached through ssh when
#                           this is 'ssh' and the source has an ip
# $snap                   - snapshot name (without dataset)
# $source_user/$dest_user - ssh users
#
# Failures are reported as warnings only.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');
    my $snapshot = "$source->{all}\@$snap";

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval { run_cmd([@source_ssh, @zfscmd, $snapshot]) };
    warn "WARN: $@" if $@;

    return if !$dest;

    my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    my $path = target_dataset($source, $dest);

    eval { run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]) };
    warn "WARN: $@" if $@;
}
959
# Check whether the snapshot needed for an incremental sync, i.e. the
# destination's last snapshot, exists on the source side.
# Returns 1 if it can be listed; warns and returns undef otherwise.
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @remote = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $snapshot = "$source->{all}\@$dest->{last_snap}";

    eval {
        run_cmd([@remote, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name', $snapshot]);
    };
    if (my $err = $@) {
        warn "WARN: $err";
        return undef;
    }

    return 1;
}
980
# Transfer the new snapshot of the source to the destination using
# 'zfs send | [cstream |] zfs recv'.
#
# $source - parsed source; 'new_snap' names the snapshot to send
# $dest   - parsed destination; if 'last_snap' is set and that snapshot
#           still exists on the source, an incremental stream is sent
# $param  - options: source_user, dest_user, properties (-p), verbose
#           (-v), limit (cstream limit, multiplied by 1024), method
#
# If the transfer fails the new snapshot is destroyed on the source and
# the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my $pipe = \'|';

    # sending side
    my @send;
    if ($source->{ip}) {
        push @send, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--';
    }
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
        push @send, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit
    my @limit;
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        @limit = ($pipe, 'cstream', '-t', $bwl);
    }

    # receiving side
    my $target = target_dataset($source, $dest);

    my @recv = ($pipe);
    if ($dest->{ip}) {
        push @recv, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--';
    }
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd([@send, @limit, @recv])
    };

    if (my $err = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $err;
    }
}
1016
1017
# Copy the guest configuration file next to the synced data.
#
# $source           - parsed source (vmid, vm_type, new_snap, ip, destroy)
# $dest             - parsed destination (ip, last_part, old_snap)
# $method           - 'ssh' (scp, target directory created on the
#                     destination host) or 'local' (cp)
# $source_user      - ssh user on the source host
# $dest_user        - ssh user on the destination host
# $dest_config_path - optional base directory, default $CONFIG_PATH
#
# The copy is named "<vmid>.conf.<vm_type>.<new_snap>". With method 'ssh'
# and $source->{destroy} set, the copy belonging to $dest->{old_snap} is
# removed.
# NOTE(review): old copies are only removed in the 'ssh' branch - confirm
# that this is intended for local syncs.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $dest_target_new = "$config_dir/$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh') {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            my $from = $source->{ip}
                ? "$source_user\@[$source->{ip}]:$source_target"
                : $source_target;
            my $to = $dest->{ip}
                ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
                : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}) {
            my $dest_target_old = "${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$dest->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1054
# Current local time formatted as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    my @now = localtime(time);

    # localtime: [0] sec, [1] min, [2] hour, [3] mday, [4] mon (0-based),
    # [5] year (since 1900)
    return sprintf(
        "%04d-%02d-%02d_%02d:%02d:%02d",
        $now[5] + 1900, $now[4] + 1, $now[3], $now[2], $now[1], $now[0],
    );
}
1061
# Build the table printed by the 'status' command: one row per configured
# job with source, name and the state recorded in the state file.
# Returns the table as a single string.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    for my $source (sort keys %{$cfg}) {
        for my $sync_name (sort keys %{$cfg->{$source}}) {
            $status_list .= sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1079
# Enable a job: under the cron/state lock its state is set to "ok" and
# its cron entry is rewritten. Dies (through get_job) if the job does
# not exist.
sub enable_job {
    my ($param) = @_;

    my $enable = sub {
        my $job = get_job($param);
        $job->{state} = "ok";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $enable);
}
1090
# Disable a job: under the cron/state lock its state is set to "stopped"
# and its cron entry is rewritten (format_job comments out stopped jobs).
# Dies (through get_job) if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $disable = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $disable);
}
1101
# Usage text of every supported command, keyed by command name. The table
# is also used right below to reject unknown commands: a command without
# an entry here is reported as an error.
1102 my $cmd_help = {
1103 destroy => qq{
1104 $PROGNAME destroy -source <string> [OPTIONS]
1105
1106 remove a sync Job from the scheduler
1107
1108 -name string
1109
1110 name of the sync job, if not set it is default
1111
1112 -source string
1113
1114 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1115 },
1116 create => qq{
1117 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1118
1119 Create a sync Job
1120
1121 -dest string
1122
1123 the destination target is like [IP]:<Pool>[/Path]
1124
1125 -dest-user string
1126
1127 name of the user on the destination target, root by default
1128
1129 -limit integer
1130
1131 max sync speed in kBytes/s, default unlimited
1132
1133 -maxsnap integer
1134
1135 how much snapshots will be kept before get erased, default 1
1136
1137 -name string
1138
1139 name of the sync job, if not set it is default
1140
1141 -skip
1142
1143 If specified, skip the first sync.
1144
1145 -source string
1146
1147 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1148
1149 -source-user string
1150
1151 name of the user on the source target, root by default
1152
1153 -properties
1154
1155 If specified, include the dataset's properties in the stream.
1156
1157 -dest-config-path string
1158
1159 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1160 },
1161 sync => qq{
1162 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1163
1164 will sync one time
1165
1166 -dest string
1167
1168 the destination target is like [IP:]<Pool>[/Path]
1169
1170 -dest-user string
1171
1172 name of the user on the destination target, root by default
1173
1174 -limit integer
1175
1176 max sync speed in kBytes/s, default unlimited
1177
1178 -maxsnap integer
1179
1180 how much snapshots will be kept before get erased, default 1
1181
1182 -name string
1183
1184 name of the sync job, if not set it is default.
1185 It is only necessary if scheduler allready contains this source.
1186
1187 -source string
1188
1189 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1190
1191 -source-user string
1192
1193 name of the user on the source target, root by default
1194
1195 -verbose
1196
1197 If specified, print out the sync progress.
1198
1199 -properties
1200
1201 If specified, include the dataset's properties in the stream.
1202
1203 -dest-config-path string
1204
1205 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1206 },
1207 list => qq{
1208 $PROGNAME list
1209
1210 Get a List of all scheduled Sync Jobs
1211 },
1212 status => qq{
1213 $PROGNAME status
1214
1215 Get the status of all scheduled Sync Jobs
1216 },
1217 help => qq{
1218 $PROGNAME help <cmd> [OPTIONS]
1219
1220 Get help about specified command.
1221
1222 <cmd> string
1223
1224 Command name
1225
1226 -verbose
1227
1228 Verbose output format.
1229 },
1230 enable => qq{
1231 $PROGNAME enable -source <string> [OPTIONS]
1232
1233 enable a syncjob and reset error
1234
1235 -name string
1236
1237 name of the sync job, if not set it is default
1238
1239 -source string
1240
1241 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1242 },
1243 disable => qq{
1244 $PROGNAME disable -source <string> [OPTIONS]
1245
1246 pause a sync job
1247
1248 -name string
1249
1250 name of the sync job, if not set it is default
1251
1252 -source string
1253
1254 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1255 },
1256 printpod => 'internal command',
1257
1258 };
1259
# Refuse to continue without a known command.
#
# With no command at all, usage() prints its own "no command specified"
# error. For an unknown command the error is printed here and usage(1) is
# used so that only the usage summary follows. Both cases end with a bare
# die "\n", i.e. a non-zero exit without an additional message.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # The trailing newline keeps the usage summary from being appended to
    # the same line as the error message.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Parse the command line into the parameter hash used by the rest of the
# script. parse_argv() gets a copy so that @ARGV itself stays untouched
# (the 'help' command later reads $ARGV[1]).
my @arg = @ARGV;
my $param = parse_argv(@arg);
1269
# Ensure that every named parameter was given on the command line.
#
# Arguments: list of parameter names (keys of the file-level $param hash).
# Dies with the help text of the current $command as soon as one of them
# is missing or has a false value.
sub check_params {
    my @required = @_;

    for my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1275
# Run the selected command.
#
# Commands that share the same validation steps are looked up in a table;
# the remaining commands are handled individually. An unknown command can
# not reach this point because it was rejected above.
{
    # commands that only need a valid -source
    my %source_only = (
        destroy => \&destroy_job,
        enable  => \&enable_job,
        disable => \&disable_job,
    );

    # commands that need a valid -source and a valid -dest
    my %source_and_dest = (
        sync   => \&sync,
        create => \&init,
    );

    if (my $handler = $source_only{$command}) {
        check_params(qw(source));

        check_target($param->{source});
        $handler->($param);

    } elsif ($handler = $source_and_dest{$command}) {
        check_params(qw(source dest));

        check_target($param->{source});
        check_target($param->{dest});
        $handler->($param);

    } elsif ($command eq 'status') {
        print status();

    } elsif ($command eq 'list') {
        print list();

    } elsif ($command eq 'help') {
        my $topic = $ARGV[1];

        # help for one specific command: emit its help text via die
        die "$cmd_help->{$topic}\n" if $topic && $cmd_help->{$topic};

        if ($param->{verbose}) {
            # verbose help is the manual page; exec replaces this process
            exec("man $PROGNAME");

        } else {
            usage(1);

        }

    } elsif ($command eq 'printpod') {
        print_pod();
    }
}
1332
# Print the usage summary on STDOUT.
#
# $help - when false, the summary is preceded by an error line stating that
#         no command was specified; pass a true value to print only the
#         summary.
sub usage {
    my ($help) = @_;

    my @lines = (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    unshift(@lines, "ERROR:\tno command specified\n") if !$help;

    print(@lines);
}
1347
# Validate a source or destination target string by running it through
# parse_target(); the result of parse_target() is passed back to the caller.
# NOTE(review): presumably parse_target() dies on a malformed target, which
# is what makes this a check - confirm against its definition.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1352
# Print the POD source of the pve-zsync manual page on STDOUT.
#
# The SYNOPSIS section is assembled from the per-command help texts stored
# in $cmd_help, sorted by text. The 'printpod' entry is skipped: its value
# is only a placeholder ('internal command') and would otherwise show up
# as a stray line at the end of the synopsis.
sub print_pod {

    my @help_texts = map { $cmd_help->{$_} }
        grep { $_ ne 'printpod' } keys %$cmd_help;
    my $synopsis = join("\n", sort @help_texts);

    # The heredoc is emitted verbatim; POD directives have to start in the
    # first column, so nothing in it may be indented.
    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2021 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}