]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
fix #1669: add prepend-storage-id flag
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program name; the state directory and the cron file path below are derived from it.
13 my $PROGNAME = "pve-zsync";
# Directory that holds the state file and the lock files used by locked() callers.
14 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
# JSON state file read by read_state() and rewritten by update_state().
15 my $STATE = "${CONFIG_PATH}/sync_state";
# Cron file that stores one line per sync job (see update_cron/format_job).
16 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
# NOTE(review): $PATH and $PROG_PATH are not referenced in the visible part of the file - confirm usage.
17 my $PATH = "/usr/sbin";
# Node-local configuration directories probed by vm_exists() and copied by send_config().
18 my $PVE_DIR = "/etc/pve/local";
19 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
20 my $LXC_CONF = "${PVE_DIR}/lxc";
21 my $PROG_PATH = "$PATH/${PROGNAME}";
# Minute step used by format_job() for the schedule of newly created cron entries.
22 my $INTERVAL = 15;
# Debug switch; assigned in the BEGIN block below so it is set at compile time.
23 my $DEBUG;
24
# Runs at compile time: enables debugging when ZSYNC_DEBUG is set in the
# environment and only then loads Data::Dumper (used by run_cmd).
25 BEGIN {
26 $DEBUG = 0; # change default here. not above on declaration!
27 $DEBUG ||= $ENV{ZSYNC_DEBUG};
28 if ($DEBUG) {
29 require Data::Dumper;
30 Data::Dumper->import();
31 }
32 }
33
# Regex building blocks, kept as plain strings so they can be interpolated
# into the larger patterns below. All groups here are non-capturing.
# One IPv4 octet (0-255) and a dotted-quad IPv4 address.
34 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
35 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
# One 16-bit hex group of an IPv6 address, and the trailing 32 bits, which
# may be written either as two hex groups or as an embedded IPv4 address.
36 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
37 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
38
# IPv6 address: one alternative per possible position of the "::" shorthand.
39 my $IPV6RE = "(?:" .
40 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
41 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
42 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
43 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
44 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
45 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
46 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
47 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
48 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
49
50 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
51 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
52 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
53 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Capture groups: $1 = host (optional), $2 = VMID or pool with path, $3 = the "/path" suffix.
54 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
# Matches the key of a guest config line that describes a disk or mount
# point, including the trailing ": " separator (used by parse_disks).
56 my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
# Identifier of this running process (pid:starttime:boot_id); sync() stores
# it in the job state so that instance_exists() can detect stale entries.
58 my $INSTANCE_ID = get_instance_id($$);
59
# First command line argument selects the sub-command.
60 my $command = $ARGV[0];
61
# Every command except 'help' and 'printpod' requires these external tools;
# check_bin() dies if one of them cannot be found in PATH.
62 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
63 check_bin ('cstream');
64 check_bin ('zfs');
65 check_bin ('ssh');
66 check_bin ('scp');
67 }
68
# Turn the listed signals into a die() so that the eval blocks in sync() can
# record the failure in the job state.
# NOTE(review): SIGKILL normally cannot be handled by a process - confirm the KILL entry has any effect.
69 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
70 die "Signaled, aborting sync: $!\n";
71 };
72
# Locate an executable in the directories listed in $ENV{PATH}.
#
# Parameters:
#   $bin - bare command name, e.g. 'zfs'
# Returns the full path of the first regular, executable file found.
# Dies with "unable to find command '<bin>'" if no PATH entry provides it.
sub check_bin {
    my ($bin) = @_;

    # An unset PATH is treated as empty instead of triggering an
    # "uninitialized value" warning inside split().
    foreach my $p (split(/:/, $ENV{PATH} // '')) {
        my $fn = "$p/$bin";
        # -x alone is also true for searchable directories, so require a
        # regular file; "_" reuses the stat result of the preceding -f.
        return $fn if -f $fn && -x _;
    }

    die "unable to find command '$bin'\n";
}
85
# Read a text file.
#
# Parameters:
#   $filename      - path of the file to read
#   $one_line_only - if true, return just the first line as a string;
#                    otherwise return an array reference with all lines
# Line endings are kept. Dies if the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
        $text = <$fh>;
    } else {
        $text = [ <$fh> ];
    }

    close($fh);

    return $text;
}
98
# Shorten a path-like string for the fixed-width columns printed by list()
# and status().
#
# Parameters:
#   $path   - string to shorten; runs of '/' are collapsed to one first
#   $maxlen - width to shorten to
# Returns $path unchanged when it already fits, otherwise a version in which
# part of the string is replaced by '..'.
99 sub cut_target_width {
100 my ($path, $maxlen) = @_;
101 $path =~ s@/+@/@g;
102
103 return $path if length($path) <= $maxlen;
104
# No slash at all: keep only the last ($maxlen - 2) characters after '..'.
105 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
106
# Split off the last path component (with an optional trailing slash).
# NOTE(review): if this substitution does not match, $1 is not updated and
# $tail takes whatever an earlier match left behind - confirm inputs.
107 $path =~ s@/([^/]+/?)$@@;
108 my $tail = $1;
109
# The tail alone (nearly) fills the width: show only the tail, or its end.
110 if (length($tail)+3 == $maxlen) {
111 return "../$tail";
112 } elsif (length($tail)+2 >= $maxlen) {
113 return '..'.substr($tail, -$maxlen+2)
114 }
115
# Remove the first "/component" found in what is left and use it as head.
# NOTE(review): as above, $1 keeps its previous value when nothing matches.
116 $path =~ s@(/[^/]+)(?:/|$)@@;
117 my $head = $1;
118 my $both = length($head) + length($tail);
119 my $remaining = $maxlen-$both-4; # -4 for "/../"
120
# Head and tail together are already too long: truncate the head.
121 if ($remaining < 0) {
122 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
123 }
124
# Four-argument substr: replaces a stretch of the middle part with '..' in place.
125 substr($path, ($remaining/2), (length($path)-$remaining), '..');
126 return "$head/" . $path . "/$tail";
127 }
128
# Run a code reference while holding an exclusive flock() on a lock file.
#
# Parameters:
#   $lock_fn - path of the lock file (created or truncated if necessary)
#   $code    - code reference to execute while the lock is held
# Returns the (scalar) result of $code.
# Dies if the lock file cannot be opened or locked. An error raised by $code
# is re-thrown after the lock has been released and the handle closed.
sub locked {
    my ($lock_fn, $code) = @_;

    # Explicit mode argument instead of a "> file" string, and fail with a
    # clear message instead of passing an undefined handle to flock().
    my $lock_fh = IO::File->new($lock_fn, 'w')
        or die "Could not open lock file $lock_fn: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    # Close before re-throwing so the handle is released on the error path too.
    close($lock_fh);
    die "$err" if $err;

    return $res;
}
144
# Return the status structure if it has a true 'status' entry for the given
# source and job name, otherwise undef.
#
# Parameters:
#   $source - parsed target; its 'all' field is the first-level key
#   $name   - job name (second-level key)
#   $status - nested hash reference: {source}{name}{status}
sub get_status {
    my ($source, $name, $status) = @_;

    my $key = $source->{all};
    return undef unless $status->{$key}->{$name}->{status};

    return $status;
}
154
# Check whether a ZFS dataset exists, locally or on a remote host.
#
# Parameters:
#   $dataset - dataset name passed to 'zfs list'
#   $ip      - remote host; if false the command runs locally
#   $user    - ssh user for the remote case
# Returns 1 if 'zfs list' succeeds, 0 if run_cmd() reports an error.
sub check_dataset_exists {
    my ($dataset, $ip, $user) = @_;

    my @remote = $ip ? ('ssh', "$user\@$ip", '--') : ();
    my $cmd = [@remote, 'zfs', 'list', '-H', '--', $dataset];

    eval { run_cmd($cmd); };

    return $@ ? 0 : 1;
}
173
# Create a ZFS file system, locally or on a remote host.
#
# Parameters:
#   $file_system - name of the dataset to create
#   $ip          - remote host; if false the command runs locally
#   $user        - ssh user for the remote case
# Dies (via run_cmd) if 'zfs create' fails.
sub create_file_system {
    my ($file_system, $ip, $user) = @_;

    my @remote = $ip ? ('ssh', "$user\@$ip", '--') : ();

    run_cmd([@remote, 'zfs', 'create', $file_system]);
}
186
# Parse a source/destination specification of the form
# [IP:]<VMID> or [IP:]<ZFSPool>[/Path] into a hash reference.
#
# Resulting keys:
#   all       - everything after the optional "host:" prefix
#   ip        - host part with surrounding brackets removed (only if given)
#   pool      - first path component, unless it is purely numeric
#   vmid      - first path component if it is purely numeric
#   last_part - last path component (only if there is more than one)
#   path      - components between pool and last_part; with a host set one
#               additional trailing component is dropped
# Dies with a usage hint if the text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;

    # Copy the captures right away, later matches overwrite $1/$2.
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };
    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $all);

    my $pool = shift(@parts);
    $target->{pool} = $pool;
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
        delete $target->{pool};
    }

    if (@parts != 0) {
        $target->{last_part} = pop(@parts);
        pop(@parts) if $target->{ip};
        $target->{path} = join('/', @parts) if @parts > 0;
    }

    return $target;
}
222
# Load the job configuration from the cron file.
#
# Returns the structure built by encode_cron(), or undef when the cron file
# did not exist yet; in that case an empty file is created.
# Dies if the cron file can neither be read nor created.
sub read_cron {

    if (-e $CRONJOBS) {
        my $lines = read_file($CRONJOBS, 0);
        return encode_cron(@{$lines});
    }

    # first use: initialise an empty cron file
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
237
# Parse command line style arguments into a parameter hash.
#
# Parameters:
#   @arg - argument words; words that are not options are ignored
# Returns a hash reference that always contains every known option key
# (dashes replaced by underscores); options not given are undef except for
# name, maxsnap, method, source_user and dest_user, which get defaults.
# Dies with "can't parse options" if Getopt::Long reports an error.
sub parse_argv {
    my (@arg) = @_;

    # Getopt::Long specifications; the hash key is the option name with
    # '-' replaced by '_'.
    my @specs = qw(
        dest=s source=s verbose limit=i maxsnap=i name=s skip method=s
        source-user=s dest-user=s prepend-storage-id properties
        dest-config-path=s
    );

    my $param = {};
    my @bindings;
    foreach my $spec (@specs) {
        my $key = $spec;
        $key =~ s/=.*$//;
        $key =~ tr/-/_/;
        $param->{$key} = undef;
        push @bindings, $spec, \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @bindings);

    die "can't parse options\n" if $ret == 0;

    my %default_for = (
        name => "default",
        maxsnap => 1,
        method => "ssh",
        source_user => "root",
        dest_user => "root",
    );
    foreach my $key (keys %default_for) {
        $param->{$key} //= $default_for{$key};
    }

    return $param;
}
284
# Merge the persisted state of a job into the job hash.
#
# Copies state, lsync, vm_type and instance_id as well as the consecutive
# snap0, snap1, ... entries (up to the first false one) from the state file
# entry selected by the job's source and name.
# Returns the same job hash reference.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    foreach my $field (qw(state lsync vm_type instance_id)) {
        $job->{$field} = $saved->{$field};
    }

    my $idx = 0;
    while ($saved->{"snap$idx"}) {
        $job->{"snap$idx"} = $saved->{"snap$idx"};
        $idx++;
    }

    return $job;
}
302
# Turn the lines of the cron file into a job configuration structure.
#
# Each line is split on single whitespace characters and handed to
# parse_argv(); lines that yield both a source and a dest become
# $cfg->{<source>}->{<name>} = <remaining parameters>.
# Processing stops at the first line that evaluates to false.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    foreach my $line (@text) {
        last if !$line;

        my $param = parse_argv(split('\s', $line));

        next if !($param->{source} && $param->{dest});

        my ($source, $name) = delete @{$param}{qw(source name)};
        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
323
# Build a job hash from parsed command line parameters.
#
# Parameters:
#   $param - hash reference as returned by parse_argv()
# Returns a new hash reference with name, dest, method, limit, maxsnap,
# source, source_user, dest_user, prepend_storage_id, properties and
# dest_config_path. The method is "local" when neither source nor
# destination names a host, otherwise "ssh".
# Dies (via parse_target) if source or dest is not a valid target.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # Plain conditional expression instead of "my $dest = ... if ...":
    # perlsyn documents the behaviour of a conditionally executed my() as
    # undefined.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    # "!!" normalises the flags to plain booleans.
    $job->{prepend_storage_id} = !!$param->{prepend_storage_id};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
347
# Load the persisted job states.
#
# Returns the decoded JSON structure stored on the first line of the state
# file. If the state file does not exist yet, the config directory and a
# state file containing "{}" are created and undef is returned.
sub read_state {

    if (-e $STATE) {
        my $json = read_file($STATE, 1);
        return decode_json($json);
    }

    # first use: create the directory and an empty state file
    make_path($CONFIG_PATH);
    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
362
# Persist the state of one job in the state file.
#
# Parameters:
#   $job - job hash; 'source' and 'name' select the entry. If its state is
#          "del" the entry is removed, otherwise state, lsync, instance_id,
#          vm_type and the consecutive snapN values are stored.
# The new content is written to "<state file>.new" and then renamed over
# the state file.
# Returns the complete updated states structure.
# Dies if the temporary file cannot be opened, written, closed or renamed.
sub update_state {
    my ($job) = @_;

    # Best effort: a missing or unreadable state file means "start empty".
    my $text = eval { read_file($STATE, 1); };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        # drop the source entry as well once its last job is gone
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Check every step, as update_cron() does, so that a failed write
    # (e.g. a full disk) never replaces the state file with a truncated one.
    die "can't write to $STATE.new\n" if !print($out_fh $text);
    close($out_fh) or die "can't close $STATE.new: $!\n";

    die "can't move $STATE.new: $!\n" if !rename "$STATE.new", $STATE;

    return $states;
}
402
# Add, replace or remove the cron line of a job.
#
# Parameters:
#   $job - job hash; 'source' and 'name' identify the line. With state
#          "del" the matching line is dropped, otherwise it is rewritten by
#          format_job(); if no line matches, a new one is appended.
# A SHELL/PATH header is prepended when none is found within the first
# three lines. The result is written to "<cron file>.new" and renamed over
# the cron file.
# Dies if the cron file cannot be read or the new file cannot be written.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    foreach my $line (@{$current}) {
        chomp($line);
        # \Q...\E: source and name are literal text. Without quoting, the
        # dots of an IP address match any character and the brackets of an
        # IPv6 host are interpreted as a character class.
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    # buffered write errors only show up on close
    close($new_fh) or die "can't close $CRONJOBS.new: $!\n";

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
}
445
# Render the cron line for a job.
#
# Parameters:
#   $job  - job hash (source, dest, name, maxsnap, method, ... and state)
#   $line - optional existing cron line; its five schedule fields are kept
# Returns the line including the trailing newline. A job in state
# "stopped" is commented out with a leading '#'. Without an existing line
# the schedule defaults to every $INTERVAL minutes.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }
    # Only use $1 when the schedule was actually recognised; after a failed
    # match it would be undefined or left over from an earlier match.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        warn "could not parse schedule of existing cron entry, using default\n" if $line;
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --prepend-storage-id" if $job->{prepend_storage_id};
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
474
# Build the table shown by the list command.
#
# Returns a string with a header line and one line per configured job:
# source and name (shortened to 25 characters), state, last sync time,
# guest type ("undef" if unknown) and the sync method.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            my $job_state = $states->{$source}->{$name};

            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $job_state->{state},
                $job_state->{lsync},
                $job_state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
495
# Determine whether a guest exists and of which type it is.
#
# Parameters:
#   $target - parsed target; needs 'vmid', optionally 'ip'
#   $user   - ssh user used when the target is remote
# Returns "qemu" or "lxc" depending on where "<vmid>.conf" is found (qemu is
# probed first), or undef if the target has no vmid or no config exists.
# Remote targets are probed with /bin/ls over ssh.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    my @candidates = (
        [ "qemu", "$QEMU_CONF/$conf_fn" ],
        [ "lxc", "$LXC_CONF/$conf_fn" ],
    );

    foreach my $candidate (@candidates) {
        my ($type, $conf_path) = @$candidate;

        if ($target->{ip}) {
            my $cmd = ['ssh', "$user\@$target->{ip}", '--', '/bin/ls', $conf_path];
            return $type if eval { run_cmd($cmd) };
        } else {
            return $type if -f $conf_path;
        }
    }

    return undef;
}
514
# Create a new sync job and, unless 'skip' is set, run the first sync.
#
# Under the cron/state lock this copies the ssh key to remote hosts,
# verifies that the destination (and a non-guest source) dataset exists,
# that a guest source exists and has usable disks, and that no job with
# the same source and name is configured yet; then the cron entry and the
# state are written. If the first sync fails, the job is destroyed again
# and the error is printed.
sub init {
    my ($param) = @_;

    my $register_job = sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);

        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # distribute the ssh key: destination first, then source
        my @peers = (
            [ $dest->{ip}, $param->{dest_user} ],
            [ $source->{ip}, $param->{source_user} ],
        );
        foreach my $peer (@peers) {
            my ($ip, $user) = @$peer;
            next if !$ip;
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        if (!check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user})) {
            die "Pool $dest->{all} does not exist\n";
        }

        if (!defined($source->{vmid})
            && !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user})) {
            die "Pool $source->{all} does not exist\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        if ($source->{vmid} && !$vm_type) {
            die "VM $source->{vmid} doesn't exist\n";
        }

        if ($cfg->{$job->{source}}->{$job->{name}}) {
            die "Config already exists\n";
        }

        # dies if the guest has no disk on zfs
        if ($source->{vmid}) {
            get_disks($source, $param->{source_user});
        }

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $register_job);

    return if $param->{skip};

    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
568
# Look up a configured job and attach its persisted state.
#
# Parameters:
#   $param - hash reference with 'source' and 'name'
# Returns the job hash from the cron configuration, extended with name,
# source and the fields added by add_state_to_job().
# Dies if no such job is configured.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = @{$param}{qw(source name)};

    my $cfg = read_cron();
    my $job = $cfg->{$source}->{$name};

    die "Job with source $source and name $name does not exist\n" if !$job;

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
584
# Remove a job from the cron file and from the state file.
#
# Parameters:
#   $param - hash reference with 'source' and 'name' of the job
# Runs under the cron/state lock; dies if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove_job = sub {
        my $job = get_job($param);
        # "del" tells update_cron/update_state to drop the entry
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove_job);
}
596
# Build an identifier for a running process that stays unique across PID
# reuse and reboots.
#
# Parameters:
#   $pid - process ID to identify
# Returns "<pid>:<starttime>:<boot_id>", with starttime taken from
# /proc/<pid>/stat and boot_id from /proc/sys/kernel/random/boot_id.
# Dies if either file cannot be read or the stat line cannot be parsed.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1)
        or die "unable to read process stats\n";
    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
        or die "unable to read boot ID\n";

    # The second field is the command name in parentheses and may itself
    # contain spaces or parentheses. Splitting the whole line on whitespace
    # would then shift all later fields, so drop everything up to the last
    # ')' first.
    $stat =~ m/^.*\)\s+(.*)$/s
        or die "unable to parse process stats\n";
    my $stats = [ split(/\s+/, $1) ];

    # starttime is field 22 of the full line; with pid and command removed
    # it is found at index 19.
    my $starttime = $stats->[19];
    chomp($boot_id);

    return "${pid}:${starttime}:${boot_id}";
}
611
# Check whether the process described by an instance ID is still running.
#
# Parameters:
#   $instance_id - ID as produced by get_instance_id(), may be undef
# Returns true only if the ID starts with a PID and the ID currently
# computed for that PID is identical; 0 for undefined or malformed IDs.
sub instance_exists {
    my ($instance_id) = @_;

    return 0 if !defined($instance_id);
    return 0 if $instance_id !~ m/^([1-9][0-9]*):/;

    my $pid = $1;
    # get_instance_id() dies for PIDs that no longer exist
    my $actual_id = eval { get_instance_id($pid); };

    return defined($actual_id) && $actual_id eq $instance_id;
}
623
# Run one synchronisation for the source/destination given in $param.
#
# Parameters:
#   $param - hash reference as returned by parse_argv()
# If a matching job is configured, its state is tracked in the state file:
# "waiting" while queued, "syncing" while running, then "ok" or "error".
# Without a configured job the sync still runs, only no state is written.
# Dies if the job is already scheduled, has been disabled, or the sync fails.
624 sub sync {
625 my ($param) = @_;
626
627 my $job;
628
# Phase 1: register this process as waiting, unless another live instance
# already is waiting or syncing for the same job.
629 locked("$CONFIG_PATH/cron_and_state.lock", sub {
# get_job() dies for unknown jobs; $job then simply stays undefined
630 eval { $job = get_job($param) };
631
632 if ($job) {
633 my $state = $job->{state} // 'ok';
# a recorded state is ignored if the instance that wrote it no longer exists
634 $state = 'ok' if !instance_exists($job->{instance_id});
635
636 if ($state eq "syncing" || $state eq "waiting") {
637 die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
638 }
639
640 $job->{state} = "waiting";
641 $job->{instance_id} = $INSTANCE_ID;
642
643 update_state($job);
644 }
645 });
646
# Phase 2: the actual transfer, serialised by the sync lock.
647 locked("$CONFIG_PATH/sync.lock", sub {
648
# timestamp used in the snapshot name and stored as the last sync time
649 my $date = get_date();
650
651 my $dest;
652 my $source;
653 my $vm_type;
654
655 locked("$CONFIG_PATH/cron_and_state.lock", sub {
656 #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
657 eval { $job = get_job($param); };
658
659 if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
660 die "Job --source $param->{source} --name $param->{name} has been disabled\n";
661 }
662
663 $dest = parse_target($param->{dest});
664 $source = parse_target($param->{source});
665
666 $vm_type = vm_exists($source, $param->{source_user});
667 $source->{vm_type} = $vm_type;
668
669 if ($job) {
670 $job->{state} = "syncing";
671 $job->{vm_type} = $vm_type if !$job->{vm_type};
672 update_state($job);
673 }
674 }); #cron and state lock
675
# Syncs a single dataset: look up the existing snapshots on the destination,
# create a new snapshot, send it, and remove the oldest one when
# snapshot_get() flagged the source with 'destroy'.
676 my $sync_path = sub {
677 my ($source, $dest, $job, $param, $date) = @_;
678
679 ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});
680
681 prepare_prepended_target($source, $dest, $param->{dest_user}) if defined($dest->{prepend});
682
683 snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
684
685 send_image($source, $dest, $param);
686
687 snapshot_destroy($source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $dest->{old_snap});
688
689 };
690
691 eval{
# Guest source: sync every disk returned by get_disks(), then copy the
# guest configuration. Otherwise sync the single dataset.
692 if ($source->{vmid}) {
693 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
694 die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
695 my $disks = get_disks($source, $param->{source_user});
696
697 foreach my $disk (sort keys %{$disks}) {
# $source is reused for each disk; its dataset fields are overwritten
698 $source->{all} = $disks->{$disk}->{all};
699 $source->{pool} = $disks->{$disk}->{pool};
700 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
701 $source->{last_part} = $disks->{$disk}->{last_part};
702
703 $dest->{prepend} = $disks->{$disk}->{storage_id}
704 if $param->{prepend_storage_id};
705
706 &$sync_path($source, $dest, $job, $param, $date);
707 }
708 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
709 send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
710 } else {
711 send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
712 }
713 } else {
714 &$sync_path($source, $dest, $job, $param, $date);
715 }
716 };
# On failure: record state "error" for the job (if configured) and re-throw.
717 if (my $err = $@) {
718 locked("$CONFIG_PATH/cron_and_state.lock", sub {
719 eval { $job = get_job($param); };
720 if ($job) {
721 $job->{state} = "error";
722 delete $job->{instance_id};
723 update_state($job);
724 }
725 });
726 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
727 die "$err\n";
728 }
729
# On success: store the sync time; a job disabled in the meantime stays "stopped".
730 locked("$CONFIG_PATH/cron_and_state.lock", sub {
731 eval { $job = get_job($param); };
732 if ($job) {
733 if (defined($job->{state}) && $job->{state} eq "stopped") {
734 $job->{state} = "stopped";
735 } else {
736 $job->{state} = "ok";
737 }
738 $job->{lsync} = $date;
739 delete $job->{instance_id};
740 update_state($job);
741 }
742 });
743 }); #sync lock
744 }
745
# Inspect the snapshots that already exist for the target dataset on the
# destination.
#
# Parameters:
#   $source, $dest - parsed targets (the dataset name is derived from both)
#   $max_snap      - number of replication snapshots to keep
#   $name          - job name used in the snapshot names
#   $dest_user     - ssh user for a remote destination
# Returns ($old_snap, $last_snap): the last replication snapshot of this
# job seen before the scan stopped, and the first (most recently created)
# snapshot in the listing. Returns undef if the dataset cannot be listed or
# has no snapshot. Sets $source->{destroy} once $max_snap replication
# snapshots were counted.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    my @list = ('zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation');
    my $cmd = [@ssh, @list, target_dataset($source, $dest)];

    my $raw;
    eval { $raw = run_cmd($cmd) };
    # an error means the volume doesn't exist on dest yet
    return undef if $@;

    my $found = 0;
    my $last_snap = undef;
    my $old_snap;

    # consume the listing line by line
    while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        if (!$last_snap && $line =~ m/@(.*)$/) {
            $last_snap = $1;
        }

        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $old_snap = $1;
        $found++;
        if ($found == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
786
# Create the replication snapshot "rep_<name>_<date>" on the source.
#
# Parameters:
#   $source, $dest           - parsed targets
#   $name, $date             - job name and timestamp forming the snapshot name
#   $source_user, $dest_user - ssh users
# Stores the snapshot name in $source->{new_snap}. If the snapshot command
# fails, snapshot_destroy() is called for that name and the error is
# re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_${date}";
    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"];

    eval { run_cmd($cmd); };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
808
# Collect the ZFS disks of a guest.
#
# Parameters:
#   $target - parsed target with 'vmid', 'vm_type' and optionally 'ip'
#   $user   - ssh user for a remote target
# Runs 'qm config' (qemu) or 'pct config' (lxc) and returns the structure
# built by parse_disks(). Dies for an unknown guest type.
sub get_disks {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $tool;
    if ($target->{vm_type} eq 'qemu') {
        $tool = 'qm';
    } elsif ($target->{vm_type} eq 'lxc') {
        $tool = 'pct';
    } else {
        die "VM Type unknown\n";
    }

    my $config_text = run_cmd([@ssh, $tool, 'config', $target->{vmid}]);

    my $disks = parse_disks($config_text, $target->{ip}, $target->{vm_type}, $user);

    return $disks;
}
829
# Execute a shell command and return its output.
#
# Parameters:
#   $cmd - command string, or an array reference of words. Plain words are
#          shell-quoted; scalar references (e.g. \'|') are inserted verbatim.
# Returns the combined stdout/stderr output without the trailing newline.
# Dies with the command and its output if the exit status is non-zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper($cmd);
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words;
        foreach my $part (@$cmd) {
            push @words, ref($part) ? $$part : shell_quote($part);
        }
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper($output);
        print "END CMD\n";
    }

    return $output;
}
846
# Extract the ZFS-backed disks from the output of 'qm config'/'pct config'.
#
# Parameters:
#   $text    - configuration text
#   $ip      - host the guest lives on (false for local)
#   $vm_type - 'qemu' or 'lxc'
#   $user    - ssh user for the remote case
# Returns a hash reference keyed by a running number (0, 1, ...); each entry
# has storage_id, pool, optional path, last_part (the volume name) and all
# (the full dataset name).
# Dies if a volume path does not look like a ZFS path or no disk is found.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    # consume the configuration line by line
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        # QEMU: disks are included unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: mount points are only included if backup is explicitly
        # enabled. \d+ so that mp10 and above are covered as well, in line
        # with $DISK_KEY_RE.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form [file=|volume=]<storage>:<volume>
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+):([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        # ask the storage layer for the path of the volume
        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor:$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor:$disk\n" if !$path;

        $disks->{$num}->{storage_id} = $stor;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
928
# Compute the name of the dataset on the destination that corresponds to a
# source dataset: <dest all>[/<dest prepend>][/<source last_part>], with
# runs of '/' collapsed into a single one.
sub target_dataset {
    my ($source, $dest) = @_;

    my @components = ("$dest->{all}");
    push @components, $dest->{prepend} if defined($dest->{prepend});
    push @components, $source->{last_part} if $source->{last_part};

    my $target = join('/', @components);
    $target =~ s!/+!/!g;

    return $target;
}
940
# Make sure the parent dataset "<dest all>/<dest prepend>" exists on the
# destination, creating it if necessary.
#
# Parameters:
#   $source    - parsed source; must have a last_part
#   $dest      - parsed destination with 'prepend' set
#   $dest_user - ssh user for a remote destination
# Dies with an internal error if called without a prepended target.
sub prepare_prepended_target {
    my ($source, $dest, $dest_user) = @_;

    die "internal error - not a prepended target\n" if !defined($dest->{prepend});

    # The parent dataset shouldn't be the actual target.
    die "internal error - no last_part for source\n" if !$source->{last_part};

    my $parent = "$dest->{all}/$dest->{prepend}";
    $parent =~ s!/+!/!g;

    my $exists = check_dataset_exists($parent, $dest->{ip}, $dest_user);
    return if $exists;

    return create_file_system($parent, $dest->{ip}, $dest_user);
}
957
# Destroy a snapshot on the source and, if a destination is given, on the
# destination as well.
#
# Parameters:
#   $source, $dest           - parsed targets; $dest may be undef
#   $method                  - the source is only contacted over ssh when
#                              this is 'ssh' and the source has an ip
#   $snap                    - snapshot name (without the dataset)
#   $source_user, $dest_user - ssh users
# Failures are reported as warnings only.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');
    my $snapshot = "$source->{all}\@$snap";

    eval {
        my @src_ssh = ($source->{ip} && $method eq 'ssh')
            ? ('ssh', "$source_user\@$source->{ip}", '--')
            : ();
        run_cmd([@src_ssh, @zfscmd, $snapshot]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
        my $dest_snapshot = target_dataset($source, $dest) . "\@$snap";

        eval { run_cmd([@dest_ssh, @zfscmd, $dest_snapshot]); };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
987
# Check whether the snapshot needed for an incremental sync,
# "<source all>@<dest last_snap>", exists on the source side.
#
# Returns 1 if 'zfs list' finds it; otherwise warns and returns undef.
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $snapshot = "$source->{all}\@$dest->{last_snap}";

    eval { run_cmd([@ssh, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name', $snapshot]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    return 1;
}
1008
# Transfer the new snapshot from the source to the destination with
# "zfs send | [cstream |] zfs recv".
#
# Parameters:
#   $source - parsed source with 'new_snap' set
#   $dest   - parsed destination; 'last_snap' enables an incremental send
#             when that snapshot also exists on the source
#   $param  - options: source_user, dest_user, properties, verbose, limit,
#             method
# On failure the new snapshot is destroyed on the source and the error is
# re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @pipeline;

    # sending side
    if ($source->{ip}) {
        push @pipeline, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--';
    }
    push @pipeline, 'zfs', 'send';
    push @pipeline, '-p' if $param->{properties};
    push @pipeline, '-v' if $param->{verbose};

    my $incremental = $dest->{last_snap}
        && snapshot_exist($source , $dest, $param->{method}, $param->{source_user});
    push @pipeline, '-i', "$source->{all}\@$dest->{last_snap}" if $incremental;

    push @pipeline, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit; scalar references are passed to the shell unquoted
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        push @pipeline, \'|', 'cstream', '-t', $bwl;
    }

    my $target = target_dataset($source, $dest);

    # receiving side
    push @pipeline, \'|';
    if ($dest->{ip}) {
        push @pipeline, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--';
    }
    push @pipeline, 'zfs', 'recv', '-F', '--', "$target";

    eval { run_cmd(\@pipeline) };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    }
}
1044
1045
# Copy the guest configuration file to the destination as
# "<config dir>/<vmid>.conf.<vm_type>.<new_snap>".
#
# Parameters:
#   $source, $dest           - parsed targets (source with vmid, vm_type, new_snap)
#   $method                  - 'ssh' or 'local'
#   $source_user, $dest_user - ssh users
#   $dest_config_path        - base directory on the destination, defaults
#                              to $CONFIG_PATH; the destination's last_part
#                              is appended if set
# With method 'ssh' the copy belonging to $dest->{old_snap} is removed when
# $source->{destroy} is set.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";
    my $conf_prefix = "$source->{vmid}.conf.$source->{vm_type}";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $dest_target_new = "$config_dir/$conf_prefix.$source->{new_snap}";

    if ($method eq 'ssh'){
        # commands on the destination run over ssh only if it is remote
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            my $scp_from = $source->{ip}
                ? "$source_user\@[$source->{ip}]:$source_target"
                : $source_target;
            my $scp_to = $dest->{ip}
                ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
                : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $scp_from, $scp_to]);
        }

        if ($source->{destroy}){
            my $dest_target_old = "${config_dir}/$conf_prefix.$dest->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1082
# Return the current local time formatted as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # localtime() list: (sec, min, hour, mday, mon, year, ...)
    my @now = localtime(time);

    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", $now[5]+1900, $now[4]+1, @now[3, 2, 1, 0]);
}
1089
# Build the table shown by the status command.
#
# Returns a string with a header line and one line per configured job:
# source and name (shortened to 25 characters) followed by the job state.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $columns = sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= $columns . "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1107
# Re-enable a job: set its state to "ok" and rewrite its cron line.
#
# Parameters:
#   $param - hash reference with 'source' and 'name' of the job
# Runs under the cron/state lock; dies if the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $mark_enabled = sub {
        my $job = get_job($param);
        $job->{state} = "ok";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $mark_enabled);
}
1118
# Disable a job: set its state to "stopped" and rewrite its cron line
# (format_job() comments out the line of a stopped job).
#
# Parameters:
#   $param - hash reference with 'source' and 'name' of the job
# Runs under the cron/state lock; dies if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $mark_stopped = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $mark_stopped);
}
1129
# Help text for every command, keyed by command name.
#
# This hash is used in three ways:
#   * the command validation treats a command as known only if it has a
#     (true) entry here,
#   * check_params() and the 'help' command print the entry of a command,
#   * print_pod() joins the values to build the SYNOPSIS section.
#
# The text inside each qq{} is emitted verbatim, so its layout is part of the
# program's output.
my $cmd_help = {
destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

remove a sync Job from the scheduler

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

Create a sync Job

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default

-prepend-storage-id

If specified, prepend the storage ID to the destination's path(s).

-skip

If specified, skip the first sync.

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-properties

If specified, include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n

will sync one time

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default.
It is only necessary if scheduler already contains this source.

-prepend-storage-id

If specified, prepend the storage ID to the destination's path(s).

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-verbose

If specified, print out the sync progress.

-properties

If specified, include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
list => qq{
$PROGNAME list

Get a List of all scheduled Sync Jobs
},
status => qq{
$PROGNAME status

Get the status of all scheduled Sync Jobs
},
help => qq{
$PROGNAME help [<cmd>] [OPTIONS]

Get help about specified command.

<cmd> string

Command name

-verbose

Verbose output format.
},
enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

enable a syncjob and reset error

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

pause a sync job

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
# not a user-facing command; the entry only makes 'printpod' a known command
printpod => 'internal command',

};
1295
# Reject a missing or unknown command before looking at any options.
# A command counts as known when $cmd_help has an entry for it.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # End the message with a newline so the usage text printed next starts
    # on its own line instead of being glued to the error.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Parse the command line into the option hash used by the dispatcher below.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1305
# Dies with the help text of the current command unless every option named
# in the argument list has a true value in $param.
sub check_params {
    my @required = @_;

    foreach my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1311
# Dispatch the validated command to its implementation.
{
    # commands that only require -source
    my %source_only = (
        destroy => \&destroy_job,
        enable => \&enable_job,
        disable => \&disable_job,
    );

    # commands that require both -source and -dest
    my %source_and_dest = (
        sync => \&sync,
        create => \&init,
    );

    if (my $job_action = $source_only{$command}) {
        check_params('source');

        check_target($param->{source});
        $job_action->($param);

    } elsif (my $transfer_action = $source_and_dest{$command}) {
        check_params('source', 'dest');

        check_target($param->{source});
        check_target($param->{dest});
        $transfer_action->($param);

    } elsif ($command eq 'status') {
        print status();

    } elsif ($command eq 'list') {
        print list();

    } elsif ($command eq 'help') {
        my $topic = $ARGV[1];

        # a known topic prints its help text and ends the program here
        die "$cmd_help->{$topic}\n" if $topic && $cmd_help->{$topic};

        if ($param->{verbose}) {
            exec("man $PROGNAME");
        } else {
            usage(1);
        }

    } elsif ($command eq 'printpod') {
        print_pod();
    }
}
1368
# Prints the command overview to STDOUT.
#
# When $help is false, an "no command specified" error line is printed first.
sub usage {
    my ($help) = @_;

    my @lines;
    push @lines, "ERROR:\tno command specified\n" if !$help;

    push @lines, (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    print(@lines);
}
1383
# Validates a target string by handing it to parse_target(); whatever
# parse_target() returns (or throws) is passed through to the caller.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1388
# Prints the POD source of the manual page to STDOUT.
#
# The SYNOPSIS section is assembled from the help texts in $cmd_help. The
# entry for 'printpod' is only a placeholder string ('internal command') and
# not a help text, so it is left out of the synopsis.
sub print_pod {

    my @help_texts =
        map { $cmd_help->{$_} }
        grep { $_ ne 'printpod' }
        keys %$cmd_help;

    # sort by text so the output does not depend on hash key order
    my $synopsis = join("\n", sort @help_texts);

    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2021 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}