]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
add target_dataset function
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program name; the paths below and the generated cron command line are derived from it.
13 my $PROGNAME = "pve-zsync";
# Directory holding the sync state file and the lock files; also the default
# directory for copied guest configs (see send_config).
14 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
# JSON file with the per-job state (read_state / update_state).
15 my $STATE = "${CONFIG_PATH}/sync_state";
# cron.d file that stores the scheduled jobs (read_cron / update_cron).
16 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
17 my $PATH = "/usr/sbin";
# Locations of the guest configuration files checked by vm_exists and copied by send_config.
18 my $PVE_DIR = "/etc/pve/local";
19 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
20 my $LXC_CONF = "${PVE_DIR}/lxc";
21 my $PROG_PATH = "$PATH/${PROGNAME}";
# Default schedule for new cron entries: every $INTERVAL minutes (see format_job).
22 my $INTERVAL = 15;
23 my $DEBUG;
24
# Runs at compile time so that Dumper is imported before the subs using it are compiled.
# Debugging can be switched on through the ZSYNC_DEBUG environment variable.
25 BEGIN {
26 $DEBUG = 0; # change default here. not above on declaration!
27 $DEBUG ||= $ENV{ZSYNC_DEBUG};
28 if ($DEBUG) {
29 require Data::Dumper;
30 Data::Dumper->import();
31 }
32 }
33
# Building blocks (plain strings, interpolated into larger patterns) for IP addresses.
# One IPv4 octet: 0-255.
34 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
# Dotted-quad IPv4 address.
35 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
# One IPv6 group of 1-4 hex digits.
36 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
# Last 32 bits of an IPv6 address: either an embedded IPv4 address or two hex groups.
37 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
38
# IPv6 address; one alternative per possible position of the '::' abbreviation.
39 my $IPV6RE = "(?:" .
40 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
41 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
42 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
43 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
44 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
45 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
46 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
47 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
48 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
49
50 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
51 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
52 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
53 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Capture 1 is the optional host, capture 2 the VMID or dataset (used by parse_target).
54 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
# Matches the key of a guest config line that describes a disk or mount point
# (including the ': ' separator); used by parse_disks.
56 my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
# Identity of this process; recorded in a job's state while that job is
# waiting or syncing (see sync and instance_exists).
my $INSTANCE_ID = get_instance_id($$);

# The first command line argument selects the sub-command.
my $command = $ARGV[0];

# Everything except 'help' and 'printpod' needs the external tools, so fail
# early when one of them cannot be found in PATH.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    foreach my $required_bin (qw(cstream zfs ssh scp)) {
        check_bin($required_bin);
    }
}

# Turn the listed signals into an exception so that the eval blocks around a
# running sync get a chance to record the failure.
foreach my $signal (qw(INT KILL HUP PIPE QUIT TERM)) {
    $SIG{$signal} = sub {
        die "Signaled, aborting sync: $!\n";
    };
}
72
# Look up an executable in the directories listed in $ENV{PATH}.
#
# Parameters:
#   $bin - name of the command to find
# Returns the full path of the first match.
# Dies with "unable to find command '<bin>'" when no directory contains it.
sub check_bin {
    my ($bin) = @_;

    # an unset PATH simply means "nothing found" instead of an undef warning
    foreach my $dir (split(/:/, $ENV{PATH} // '')) {
        my $fn = "$dir/$bin";
        # -x alone is also true for (searchable) directories, so require a
        # regular file; '_' reuses the stat result of the -f test
        return $fn if -f $fn && -x _;
    }

    die "unable to find command '$bin'\n";
}
85
# Read a file.
#
# Parameters:
#   $filename      - file to read
#   $one_line_only - when true, return just the first line as a string;
#                    otherwise return an array reference with all lines
# Line endings are kept. Dies when the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
        $text = <$fh>;
    } else {
        $text = [ <$fh> ];
    }

    close($fh);

    return $text;
}
98
# Shorten a path-like string to at most $maxlen characters for table output.
#
# Runs of '/' are collapsed first. A string that still does not fit is
# abbreviated with '..', keeping the last path component visible where
# possible.
#
# NOTE(review): $tail and $head are taken from $1 directly after a
# substitution. When such a substitution does not match, $1 is not set by it -
# confirm whether inputs like a trailing-slash-only path or a single leading
# component can reach this code.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s@/+@/@g;

    return $path if length($path) <= $maxlen;

    # no directory structure at all: keep the end of the string
    return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;

    # split off the last component
    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;

    return "../$tail" if length($tail)+3 == $maxlen;
    return '..'.substr($tail, -$maxlen+2) if length($tail)+2 >= $maxlen;

    # split off a leading '/component'
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;

    # room left for the middle part; 4 characters are needed for "/../"
    my $remaining = $maxlen - (length($head) + length($tail)) - 4;

    if ($remaining < 0) {
        # not even room for a middle part: truncate the head; 3 is for "../"
        my $head_len = $maxlen - length($tail) - 3;
        return substr($head, 0, $head_len) . "../$tail";
    }

    # cut the surplus out of the middle part and mark the gap with '..'
    my $cut_from = $remaining/2;
    my $cut_len = length($path) - $remaining;
    substr($path, $cut_from, $cut_len, '..');

    return "$head/" . $path . "/$tail";
}
128
# Run a piece of code while holding an exclusive lock on a lock file.
#
# Parameters:
#   $lock_fn - path of the lock file (created if missing)
#   $code    - code reference to run while the lock is held
# Returns the (scalar) result of $code.
# Dies when the lock file cannot be opened or locked; an exception thrown by
# $code is re-thrown after the lock has been released.
sub locked {
    my ($lock_fn, $code) = @_;

    # without this check a failed open only shows up as an obscure flock error
    my $lock_fh = IO::File->new($lock_fn, "w")
        or die "Could not open lock file $lock_fn: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    # close on the error path as well, not only on success
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
144
# Return $status if it has a true 'status' entry for the given source/name
# pair (looked up as $status->{$source->{all}}->{$name}), undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    my $has_status = $status->{$source->{all}}->{$name}->{status};

    return $has_status ? $status : undef;
}
154
# Check whether the dataset $target->{all} exists by running 'zfs list' on
# it, via ssh as $user when the target has an ip.
# Returns 1 when the command succeeds, 0 when it fails.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @cmd = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    push @cmd, 'zfs', 'list', '-H', '--', $target->{all};

    eval { run_cmd(\@cmd); };

    return $@ ? 0 : 1;
}
173
# Split a target specification into its parts.
#
# Parameters:
#   $text - '[host:]<VMID>' or '[host:]<pool>[/path]'
# Returns a hash reference with:
#   all       - everything after the optional 'host:'
#   ip        - the host part, surrounding brackets removed (only if given)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - first component of the dataset
#   last_part - last component, when there is more than one
#   path      - the components in between, when there are any left
# Dies when $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    $text =~ $TARGETRE or die "$errstr\n";
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };
    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $all);

    my $first = shift(@parts);
    die "$errstr\n" if !$first;

    if ($first =~ m/^\d+$/) {
        $target->{vmid} = $first;
    } else {
        $target->{pool} = $first;
    }

    return $target if (@parts == 0);

    $target->{last_part} = pop(@parts);

    # NOTE(review): for remote targets one more component is dropped before
    # 'path' is built - confirm this is intended.
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts > 0;

    return $target;
}
209
# Load the job configuration from the cron file.
# On first use the (empty) cron file is created and undef is returned;
# otherwise the result of encode_cron for the file's lines is returned.
sub read_cron {

    if (-e $CRONJOBS) {
        my $lines = read_file($CRONJOBS, 0);
        return encode_cron(@{$lines});
    }

    # first use: initialise the file
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
224
# Parse command line style arguments into a parameter hash.
#
# Parameters:
#   @arg - argument list (non-option words are ignored)
# Returns a hash reference with one key per known option; options that were
# not given are undef, except for the defaults applied at the end.
# Dies with "can't parse options" when Getopt::Long reports an error.
sub parse_argv {
    my (@arg) = @_;

    # [ Getopt::Long specification, key in the result hash ]
    my @options = (
        [ 'dest=s',             'dest' ],
        [ 'source=s',           'source' ],
        [ 'verbose',            'verbose' ],
        [ 'limit=i',            'limit' ],
        [ 'maxsnap=i',          'maxsnap' ],
        [ 'name=s',             'name' ],
        [ 'skip',               'skip' ],
        [ 'method=s',           'method' ],
        [ 'source-user=s',      'source_user' ],
        [ 'dest-user=s',        'dest_user' ],
        [ 'properties',         'properties' ],
        [ 'dest-config-path=s', 'dest_config_path' ],
    );

    my $param = {};
    my @getopt_args;
    foreach my $option (@options) {
        my ($spec, $key) = @{$option};
        $param->{$key} = undef;
        push @getopt_args, $spec, \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @getopt_args);

    die "can't parse options\n" if $ret == 0;

    my %defaults = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    foreach my $key (keys %defaults) {
        $param->{$key} //= $defaults{$key};
    }

    return $param;
}
269
# Copy the stored state of a job (looked up by source and name in the state
# file) into the job hash: state, lsync, vm_type, instance_id and the
# consecutive snap0, snap1, ... entries.
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    foreach my $key (qw(state lsync vm_type instance_id)) {
        $job->{$key} = $state->{$key};
    }

    # copy snapN entries until the first missing (or false) one
    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
287
# Build the job configuration from the lines of the cron file.
#
# Every line is split on whitespace and parsed with parse_argv; lines that
# carry both a source and a dest become $cfg->{<source>}->{<name>} (with
# 'source' and 'name' removed from the stored parameters).
# Returns the configuration hash reference.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    while (my $line = shift(@text)) {
        my $param = parse_argv(split('\s', $line));

        # header lines and the like have no source/dest
        next if !($param->{source} && $param->{dest});

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
308
# Convert parsed command line parameters into a job hash as used by
# update_cron and update_state.
#
# Parameters:
#   $param - hash reference as returned by parse_argv
# Returns the job hash reference. Dies when source or dest is not a valid
# target (see parse_target).
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # 'my $x = ... if COND' has undefined behaviour in Perl, so always
    # initialise the variable explicitly
    my $dest = $param->{dest} ? parse_target($param->{dest}) : undef;
    my $dest_ip = $dest ? $dest->{ip} : undef;

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    # with no remote side at all the sync is local, otherwise ssh is used
    $job->{method} = "local" if !$dest_ip && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
331
# Load the job states from the state file.
#
# On first use the configuration directory and a state file containing '{}'
# are created. Returns a hash reference ($states->{<source>}->{<name>});
# an empty hash reference when there is no state yet.
# Dies when the state file cannot be created, read or decoded.
sub read_state {

    if (!-e $STATE) {
        make_path $CONFIG_PATH;
        my $new_fh = IO::File->new("> $STATE");
        die "Could not create $STATE: $!\n" if !$new_fh;
        print $new_fh "{}";
        close($new_fh);
        # matches what was just written to the file
        return {};
    }

    my $text = read_file($STATE, 1);

    # an empty file means "no state", the same way update_state treats it
    return {} if !defined($text) || $text !~ m/\S/;

    return decode_json($text);
}
346
# Store (or, for state "del", remove) the state of a job in the state file.
#
# The new content is written to "$STATE.new" and then renamed over the state
# file. Returns the complete states hash reference that was written.
# Dies when the existing state cannot be decoded or the new file cannot be
# written, closed or moved into place.
sub update_state {
    my ($job) = @_;

    # a missing/unreadable state file is treated as "no state yet"
    my $text = eval { read_file($STATE, 1); };

    my $states = {};
    my $state = {};
    if ($text) {
        # decode before touching any file, so a broken state file does not
        # leave a stale "$STATE.new" behind
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        # drop the source entry as well once its last job is gone
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    # same error handling as update_cron: never replace the state file with
    # an incompletely written one
    die "can't write to $STATE.new\n" if !print($out_fh encode_json($states));
    die "can't close $STATE.new: $!\n" if !close($out_fh);

    die "can't move $STATE.new: $!\n" if !rename "$STATE.new", $STATE;

    return $states;
}
386
# Add, replace or (for state "del") remove the cron entry of a job.
#
# The cron file is rewritten to "$CRONJOBS.new" and renamed into place. The
# SHELL/PATH header is added when none is found within the first three lines.
# Dies when the cron file cannot be read or the new file cannot be written,
# closed or moved into place.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    foreach my $line (@{$current}) {
        chomp($line);
        # source and name are literal strings: quote them, otherwise the '.'
        # of an IP matches any character and a bracketed IPv6 address is
        # read as a character class
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    # no existing entry was replaced: append a new one
    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    die "can't close $CRONJOBS.new: $!\n" if !close($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
}
429
# Build the cron line for a job.
#
# Parameters:
#   $job  - job hash reference
#   $line - existing cron line of the job (optional); its five schedule
#           fields are kept
# Without $line, or when no schedule can be extracted from it, the default
# schedule (every $INTERVAL minutes) is used. Stopped jobs are commented out.
# Returns the cron line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }
    # only use $1 after a successful match; otherwise it would be undefined
    # or left over from an unrelated earlier match
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
457
# Render a table of all jobs found in the cron file together with their
# stored state, last sync time, guest type and connection method.
# Returns the table as one string.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name} // {};

            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
478
# Determine the type of the guest named by $target->{vmid}.
#
# Looks for "<vmid>.conf" in the qemu-server directory first, then in the lxc
# directory; for a target with an ip this is done with '/bin/ls' over ssh as
# $user, otherwise on the local file system.
# Returns "qemu", "lxc" or undef (also undef when the target has no vmid).
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    foreach my $candidate ([ "qemu", $QEMU_CONF ], [ "lxc", $LXC_CONF ]) {
        my ($type, $conf_dir) = @{$candidate};

        if ($target->{ip}) {
            my $cmd = ['ssh', "$user\@$target->{ip}", '--', '/bin/ls', "$conf_dir/$conf_fn"];
            return $type if eval { run_cmd($cmd) };
        } else {
            return $type if -f "$conf_dir/$conf_fn";
        }
    }

    return undef;
}
497
# Create a new sync job: validate source and destination, add the job to the
# cron file and the state file, and (unless --skip was given) run a first
# sync. When that first sync fails the job is removed again and the error is
# printed.
sub init {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);
        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # install our ssh key on the remote sides, destination first
        foreach my $peer ([ $dest, $param->{dest_user} ], [ $source, $param->{source_user} ]) {
            my ($target, $user) = @{$peer};
            my $ip = $target->{ip} or next;
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        if (!check_pool_exists($dest, $param->{dest_user})) {
            die "Pool $dest->{all} does not exists\n";
        }

        # a VMID as source is checked through vm_exists below instead
        if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
            die "Pool $source->{all} does not exists\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        # get_disks dies when the guest has no disks on zfs
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    }); #cron and state lock

    return if $param->{skip};

    eval { sync($param) };
    if (my $sync_err = $@) {
        destroy_job($param);
        print $sync_err;
    }
}
549
# Look up the job identified by $param->{source} and $param->{name} in the
# cron file and enrich it with its stored state.
# Returns the job hash reference; dies when no such job is configured.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = ($param->{source}, $param->{name});

    my $cfg = read_cron();
    my $job = $cfg->{$source}->{$name};

    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
565
# Remove a job from the cron file and from the state file.
# Dies (through get_job) when the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
577
# Build an identifier for a running process from its pid, its start time
# (field 22 of /proc/<pid>/stat) and the kernel's boot id.
#
# Parameters:
#   $pid - process id
# Returns "<pid>:<starttime>:<boot_id>".
# Dies when the process stats or the boot id cannot be read.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1)
        or die "unable to read process stats\n";
    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
        or die "unable to read boot ID\n";

    my $starttime;
    if ($stat =~ m/^\d+\s+\(.*\)\s+(.*)$/s) {
        # The command name (field 2) is enclosed in parentheses and may
        # contain spaces, which would shift all following fields. Only split
        # what comes after the last ')': that part starts with field 3, so
        # field 22 is at index 19.
        $starttime = (split(/\s+/, $1))[19];
    } else {
        # unexpected format: fall back to plain splitting
        $starttime = (split(/\s+/, $stat))[21];
    }
    chomp($boot_id);

    return "${pid}:${starttime}:${boot_id}";
}
592
# Check whether the process described by an instance id (see
# get_instance_id) is still running: the id computed for its pid now must be
# identical to the given one.
# Returns a true value if so; false for undefined or malformed ids.
sub instance_exists {
    my ($instance_id) = @_;

    return 0 if !defined($instance_id);
    return 0 if $instance_id !~ m/^([1-9][0-9]*):/;

    my $pid = $1;
    # dies when the process is gone; treated as "does not exist"
    my $actual_id = eval { get_instance_id($pid); };

    return defined($actual_id) && $actual_id eq $instance_id;
}
604
# Run one sync of $param->{source} to $param->{dest}.
#
# If a matching job is configured, its state is tracked in the state file:
# "waiting" while queued for the sync lock, "syncing" while running, then
# "ok" (or still "stopped") with the sync date, or "error" on failure.
# Syncs without a configured job run without state tracking.
# Dies when the job is already waiting/syncing in a live process, when it is
# disabled, or when the sync itself fails.
sub sync {
    my ($param) = @_;

    my $state_lock = "$CONFIG_PATH/cron_and_state.lock";
    my $job;

    # register as waiting, unless a live instance already handles this job
    locked($state_lock, sub {
        eval { $job = get_job($param) };

        if ($job) {
            my $state = $job->{state} // 'ok';
            # a recorded state only counts while the recording process lives
            $state = 'ok' if !instance_exists($job->{instance_id});

            if ($state eq "syncing" || $state eq "waiting") {
                die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
            }

            $job->{state} = "waiting";
            $job->{instance_id} = $INSTANCE_ID;

            update_state($job);
        }
    });

    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my ($dest, $source, $vm_type);

        locked($state_lock, sub {
            #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            if ($job) {
                $job->{state} = "syncing";
                $job->{vm_type} = $vm_type if !$job->{vm_type};
                update_state($job);
            }
        }); #cron and state lock

        # syncs a single dataset: find previous snapshots on the destination,
        # take a new snapshot, send it and drop the oldest one if required
        my $sync_path = sub {
            my ($source, $dest, $job, $param, $date) = @_;

            ($dest->{old_snap}, $dest->{last_snap}) =
                snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});

            snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

            send_image($source, $dest, $param);

            if ($source->{destroy} && $dest->{old_snap}) {
                snapshot_destroy(
                    $source, $dest, $param->{method}, $dest->{old_snap},
                    $param->{source_user}, $param->{dest_user},
                );
            }
        };

        eval {
            if ($source->{vmid}) {
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                # sync every disk of the guest, reusing $source as the
                # description of the dataset currently being synced
                foreach my $disk (sort keys %{$disks}) {
                    my $info = $disks->{$disk};

                    $source->{all} = $info->{all};
                    $source->{pool} = $info->{pool};
                    $source->{path} = $info->{path} if $info->{path};
                    $source->{last_part} = $info->{last_part};
                    $sync_path->($source, $dest, $job, $param, $date);
                }

                # the guest config travels by ssh only if a remote side exists
                my $remote = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
                my $config_method = $remote ? 'ssh' : 'local';
                send_config(
                    $source, $dest, $config_method,
                    $param->{source_user}, $param->{dest_user}, $param->{dest_config_path},
                );
            } else {
                $sync_path->($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            locked($state_lock, sub {
                eval { $job = get_job($param); };
                if ($job) {
                    $job->{state} = "error";
                    delete $job->{instance_id};
                    update_state($job);
                }
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        locked($state_lock, sub {
            eval { $job = get_job($param); };
            if ($job) {
                # a job disabled in the meantime stays disabled
                my $stopped = defined($job->{state}) && $job->{state} eq "stopped";
                $job->{state} = $stopped ? "stopped" : "ok";

                $job->{lsync} = $date;
                delete $job->{instance_id};
                update_state($job);
            }
        });
    }); #sync lock
}
720
# Inspect the snapshots of the target dataset on the destination.
#
# Snapshots are listed sorted by '-S creation'. Returns ($old_snap,
# $last_snap): $last_snap is the name of the first snapshot listed,
# $old_snap the last "rep_<name>_<date>" snapshot seen before the scan
# stopped. When $max_snap such snapshots exist, $source->{destroy} is set.
# Returns undef when the listing fails or contains no snapshot.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my @cmd = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    push @cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
    push @cmd, target_dataset($source, $dest);

    my $raw = eval { run_cmd(\@cmd) };
    if ($@) { #this means the volume doesn't exist on dest yet
        return undef;
    }

    my $found = 0;
    my $last_snap = undef;
    my $old_snap;

    # consume the output line by line
    while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        if ($line =~ m/@(.*)$/) {
            $last_snap = $1 if (!$last_snap);
        }

        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $old_snap = $1;
        $found++;
        if ($found == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
761
# Create the snapshot "rep_<name>_<date>" of $source->{all} (over ssh when
# the source has an ip) and remember its name in $source->{new_snap}.
# On failure snapshot_destroy is called for that snapshot name and the error
# is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_$name\_".$date;
    $source->{new_snap} = $snap_name;

    my @cmd = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    push @cmd, 'zfs', 'snapshot', "$source->{all}\@$snap_name";

    eval { run_cmd(\@cmd); };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
783
# Fetch the configuration of the guest $target->{vmid} with 'qm config' or
# 'pct config' (over ssh as $user when the target has an ip) and return the
# disks found in it, as produced by parse_disks.
# Dies when the guest type is neither 'qemu' nor 'lxc'.
sub get_disks {
    my ($target, $user) = @_;

    my $config_tool;
    if ($target->{vm_type} eq 'qemu') {
        $config_tool = 'qm';
    } elsif ($target->{vm_type} eq 'lxc') {
        $config_tool = 'pct';
    } else {
        die "VM Type unknown\n";
    }

    my @cmd = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    push @cmd, $config_tool, 'config', $target->{vmid};

    my $config = run_cmd(\@cmd);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $user);
}
804
# Run a command through the shell and return its output.
#
# Parameters:
#   $cmd - either a command line string, or an array reference whose plain
#          elements are shell-quoted while scalar references (e.g. \'|') are
#          inserted verbatim
# stderr is merged into the output. Returns the output without the trailing
# newline; dies with the command line and its output on a non-zero status.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    my $cmdline = $cmd;
    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmdline = join(' ', @words);
    }

    my $output = `$cmdline 2>&1`;

    die "COMMAND:\n\t$cmdline\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
821
# Extract the zfs backed disks from a guest configuration.
#
# Parameters:
#   $text    - output of 'qm config' / 'pct config'
#   $ip      - host to run 'pvesm path' on via ssh (undef for local)
#   $vm_type - 'qemu' or 'lxc'
#   $user    - ssh user
# Returns a hash reference keyed 0, 1, ... whose values carry 'pool', 'path'
# (optional), 'last_part' (the volume name) and 'all' (the full dataset).
# Dies when a volume path cannot be mapped to a dataset or when no disk on
# zfs is found.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        #QEMU if backup is not set include in sync
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        #LXC if backup is not set do no in sync
        # \d+ instead of \d: mount point ids are not limited to one digit
        # (mp10 and above were never matched by this rule before)
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form '[file=|volume=]<storage>:<volume>'
            # names the volume
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        # ask the storage layer where the volume lives
        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
901
# Name of the dataset on the destination that corresponds to the source:
# the destination dataset, extended by the last component of the source (if
# it has one), with repeated slashes collapsed.
sub target_dataset {
    my ($source, $dest) = @_;

    my @components = ("$dest->{all}");
    push @components, $source->{last_part} if $source->{last_part};

    my $target = join('/', @components);
    $target =~ s!/+!/!g;

    return $target;
}
912
# Destroy the snapshot $snap of the source dataset and, when $dest is given,
# of the corresponding dataset on the destination as well.
# Failures are only reported as warnings.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfs_destroy = ('zfs', 'destroy');
    my $source_snapshot = "$source->{all}\@$snap";

    # source side: remote only for ssh syncs with a source ip
    eval {
        my @via_ssh = ($source->{ip} && $method eq 'ssh')
            ? ('ssh', "$source_user\@$source->{ip}", '--')
            : ();
        run_cmd([@via_ssh, @zfs_destroy, $source_snapshot]);
    };
    if (my $source_err = $@) {
        warn "WARN: $source_err";
    }

    return if !$dest;

    # destination side
    my @via_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    my $dest_dataset = target_dataset($source, $dest);

    eval {
        run_cmd([@via_ssh, @zfs_destroy, "$dest_dataset\@$snap"]);
    };
    if (my $dest_err = $@) {
        warn "WARN: $dest_err";
    }
}
942
# Check whether the snapshot needed for an incremental sync
# ($dest->{last_snap}) exists on the source side.
# Returns 1 if 'zfs list' finds it; otherwise warns and returns undef.
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @cmd = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    push @cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
    push @cmd, $source->{all} . "\@$dest->{last_snap}";

    eval { run_cmd(\@cmd) };
    if (my $list_err = $@) {
        warn "WARN: $list_err";
        return undef;
    }

    return 1;
}
963
# Transfer the new snapshot ($source->{new_snap}) to the destination with a
# 'zfs send | [cstream |] zfs recv' pipeline; either end runs over ssh when
# it has an ip.
#
# The stream is incremental when $dest->{last_snap} is set and that snapshot
# still exists on the source. $param->{limit} is multiplied by 1024 and
# passed to 'cstream -t'.
# On failure the new source snapshot is destroyed and the error re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my $pipe = \'|'; # scalar references are passed to the shell unquoted

    # sending side
    my @cmd = $source->{ip}
        ? ('ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--')
        : ();
    push @cmd, 'zfs', 'send';
    push @cmd, '-p' if $param->{properties};
    push @cmd, '-v' if $param->{verbose};

    if ($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
        push @cmd, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @cmd, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        push @cmd, $pipe, 'cstream', '-t', $bwl;
    }

    my $target = target_dataset($source, $dest);

    # receiving side
    push @cmd, $pipe;
    if ($dest->{ip}) {
        push @cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--';
    }
    push @cmd, 'zfs', 'recv', '-F', '--', "$target";

    eval { run_cmd(\@cmd) };

    if (my $send_err = $@) {
        snapshot_destroy(
            $source, undef, $param->{method}, $source->{new_snap},
            $param->{source_user}, $param->{dest_user},
        );
        die $send_err;
    }
}
999
1000
# Copy the guest configuration file to the destination as
# "<config dir>/<vmid>.conf.<vm_type>.<new_snap>".
#
# The config dir is $dest_config_path (default $CONFIG_PATH), extended by the
# destination's last path component if there is one. With method 'ssh' the
# file is copied with scp and, when $source->{destroy} is set, the config
# belonging to $dest->{old_snap} is removed; with method 'local' it is copied
# with cp.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";
    my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    $dest_target_new = $config_dir.'/'.$dest_target_new;

    if ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
        return;
    }

    return if $method ne 'ssh';

    # commands for the destination run over ssh only if it is remote
    my @on_dest = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

    if ($dest->{ip} || $source->{ip}) {
        my $scp_from = $source->{ip}
            ? "$source_user\@[$source->{ip}]:$source_target"
            : $source_target;
        my $scp_to = $dest->{ip}
            ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
            : $dest_target_new;

        run_cmd([@on_dest, 'mkdir', '-p', '--', $config_dir]);
        run_cmd(['scp', '--', $scp_from, $scp_to]);
    }

    if ($source->{destroy}) {
        my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$dest->{old_snap}";
        run_cmd([@on_dest, 'rm', '-f', '--', $dest_target_old]);
    }
}
1037
# Current local time formatted as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    my ($sec, $min, $hour, $mday, $mon, $year) = (localtime(time))[0 .. 5];

    # localtime counts months from 0 and years from 1900
    return sprintf(
        "%04d-%02d-%02d_%02d:%02d:%02d",
        $year+1900, $mon+1, $mday, $hour, $min, $sec,
    );
}
1044
# Render a table with the stored state of every job found in the cron file.
# Returns the table as one string.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        my $jobs = $cfg->{$source};

        foreach my $sync_name (sort keys%{$jobs}) {
            my $state = $states->{$source}->{$sync_name} // {};

            $status_list .= sprintf(
                "%-25s%-25s",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= "$state->{state}\n";
        }
    }

    return $status_list;
}
1062
# Enable a job: set its state to "ok" in the state file and rewrite its
# cron entry. Dies (through get_job) when the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $enable = sub {
        my $job = get_job($param);
        $job->{state} = "ok";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $enable);
}
1073
# Disable a job: set its state to "stopped" in the state file and rewrite
# its cron entry (format_job comments out stopped jobs).
# Dies (through get_job) when the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $disable = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $disable);
}
1084
# Help text per sub-command, keyed by command name. The keys also define the
# set of known commands: a command without an entry here is rejected as
# unknown, and check_params dies with the command's text when a required
# parameter is missing.
1085 my $cmd_help = {
1086 destroy => qq{
1087 $PROGNAME destroy -source <string> [OPTIONS]
1088
1089 remove a sync Job from the scheduler
1090
1091 -name string
1092
1093 name of the sync job, if not set it is default
1094
1095 -source string
1096
1097 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1098 },
1099 create => qq{
1100 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1101
1102 Create a sync Job
1103
1104 -dest string
1105
1106 the destination target is like [IP]:<Pool>[/Path]
1107
1108 -dest-user string
1109
1110 name of the user on the destination target, root by default
1111
1112 -limit integer
1113
1114 max sync speed in kBytes/s, default unlimited
1115
1116 -maxsnap integer
1117
1118 how much snapshots will be kept before get erased, default 1
1119
1120 -name string
1121
1122 name of the sync job, if not set it is default
1123
1124 -skip
1125
1126 If specified, skip the first sync.
1127
1128 -source string
1129
1130 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1131
1132 -source-user string
1133
1134 name of the user on the source target, root by default
1135
1136 -properties
1137
1138 If specified, include the dataset's properties in the stream.
1139
1140 -dest-config-path string
1141
1142 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1143 },
1144 sync => qq{
1145 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1146
1147 will sync one time
1148
1149 -dest string
1150
1151 the destination target is like [IP:]<Pool>[/Path]
1152
1153 -dest-user string
1154
1155 name of the user on the destination target, root by default
1156
1157 -limit integer
1158
1159 max sync speed in kBytes/s, default unlimited
1160
1161 -maxsnap integer
1162
1163 how much snapshots will be kept before get erased, default 1
1164
1165 -name string
1166
1167 name of the sync job, if not set it is default.
1168 It is only necessary if scheduler allready contains this source.
1169
1170 -source string
1171
1172 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1173
1174 -source-user string
1175
1176 name of the user on the source target, root by default
1177
1178 -verbose
1179
1180 If specified, print out the sync progress.
1181
1182 -properties
1183
1184 If specified, include the dataset's properties in the stream.
1185
1186 -dest-config-path string
1187
1188 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1189 },
1190 list => qq{
1191 $PROGNAME list
1192
1193 Get a List of all scheduled Sync Jobs
1194 },
1195 status => qq{
1196 $PROGNAME status
1197
1198 Get the status of all scheduled Sync Jobs
1199 },
1200 help => qq{
1201 $PROGNAME help <cmd> [OPTIONS]
1202
1203 Get help about specified command.
1204
1205 <cmd> string
1206
1207 Command name
1208
1209 -verbose
1210
1211 Verbose output format.
1212 },
1213 enable => qq{
1214 $PROGNAME enable -source <string> [OPTIONS]
1215
1216 enable a syncjob and reset error
1217
1218 -name string
1219
1220 name of the sync job, if not set it is default
1221
1222 -source string
1223
1224 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1225 },
1226 disable => qq{
1227 $PROGNAME disable -source <string> [OPTIONS]
1228
1229 pause a sync job
1230
1231 -name string
1232
1233 name of the sync job, if not set it is default
1234
1235 -source string
1236
1237 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1238 },
1239 printpod => 'internal command',
1240
1241 };
1242
# Refuse to continue without a known command.  Both failure paths print the
# usage overview and terminate through die with an empty message.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # Terminate the error line; usage() starts printing right afterwards and
    # would otherwise be appended to the same line.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Hand a copy of the command line to parse_argv; the resulting hash reference
# is what the command handlers below receive.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1252
# Ensure that every option named in the argument list has a true value in
# $param.  On the first option that does not, die with the help text of the
# current command.
sub check_params {
    my @required = @_;

    foreach my $key (@required) {
        next if $param->{$key};
        die "$cmd_help->{$command}\n";
    }
}
1258
# Dispatch on the requested command.  Commands that operate on a job first
# verify their required options with check_params() and pass every target
# through check_target() before the handler is called.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    my $help_command = $ARGV[1];

    # The help text of a single known command is emitted through die.
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # List form never involves a shell.  exec only returns when the
        # program could not be started, so report that instead of silently
        # falling through to a successful exit.
        exec('man', $PROGNAME)
            or die "ERROR: unable to execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1315
# Print the usage overview.
#
# $help - when false, the overview is preceded by an error line stating that
#         no command was specified; pass a true value to print only the
#         overview.
sub usage {
    my ($help) = @_;

    # argument synopsis of every sub-command, in the order it is listed
    my @subcommands = (
        'create -dest <string> -source <string> [OPTIONS]',
        'destroy -source <string> [OPTIONS]',
        'disable -source <string> [OPTIONS]',
        'enable -source <string> [OPTIONS]',
        'list',
        'status',
        'sync -dest <string> -source <string> [OPTIONS]',
    );

    my $text = '';
    $text .= "ERROR:\tno command specified\n" unless $help;
    $text .= "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n";
    $text .= "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n";
    $text .= "\t$PROGNAME $_\n" for @subcommands;

    print $text;
}
1330
# Validate a target specification by running it through parse_target and
# passing its result on to the caller.
# NOTE(review): presumably parse_target dies on a malformed target - confirm.
sub check_target {
    my $spec = $_[0];

    return parse_target($spec);
}
1335
# Print the POD source of the manual page.  The SYNOPSIS section is built
# from all help texts in $cmd_help, sorted as plain strings and separated by
# newlines.
sub print_pod {
    my @help_texts = sort values %{$cmd_help};
    my $command_synopsis = join("\n", @help_texts);

    my $pod = <<"EOF";
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$command_synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2021 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF

    print $pod;
}