]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
add check_dataset_exists function
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program identity and the fixed filesystem locations this script works with.
my $PROGNAME    = "pve-zsync";
my $CONFIG_PATH = "/var/lib/${PROGNAME}";       # holds the state file and the lock files
my $STATE       = "${CONFIG_PATH}/sync_state";  # JSON encoded state of all jobs
my $CRONJOBS    = "/etc/cron.d/$PROGNAME";      # cron file containing the scheduled jobs
my $PATH        = "/usr/sbin";
my $PVE_DIR     = "/etc/pve/local";
my $QEMU_CONF   = "${PVE_DIR}/qemu-server";
my $LXC_CONF    = "${PVE_DIR}/lxc";
my $PROG_PATH   = "$PATH/${PROGNAME}";
my $INTERVAL    = 15;                           # used as "*/$INTERVAL" in the minute field of new cron entries
my $DEBUG;

BEGIN {
    # The default has to be set here and not on the declaration above: an
    # initializer on the 'my' line would run after this BEGIN block and
    # overwrite whatever was decided here.
    my $debug_default = 0; # change default here. not above on declaration!
    $DEBUG = $debug_default || $ENV{ZSYNC_DEBUG};

    # Data::Dumper is only loaded (and Dumper imported) when debugging is on.
    if ($DEBUG) {
        require Data::Dumper;
        Data::Dumper->import();
    }
}
33
# Building blocks for matching addresses. They are kept as plain strings so
# that they can be interpolated into the larger patterns below.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE    = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16   = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32  = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# One alternative per possible position of the "::" abbreviation.
my $IPV6RE = "(?:" . join("|",
    "(?:(?:(?:$IPV6H16:){6})$IPV6LS32)",
    "(?:(?:::(?:$IPV6H16:){5})$IPV6LS32)",
    "(?:(?:(?:$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::)$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::)$IPV6H16)",
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::))",
) . ")";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets

# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# capture 1: host (brackets included), capture 2: VMID or pool with optional path
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;

# Matches the key of a guest config line that describes a disk or mount point.
my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
# Identity of this process run. It is stored in a job's state while the job
# is waiting or syncing, so that an entry left behind by a process that no
# longer exists can be recognized (see instance_exists).
my $INSTANCE_ID = get_instance_id($$);

my $command = $ARGV[0];

# Everything except the purely informational commands needs the external tools.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin($_) for qw(cstream zfs ssh scp);
}

# Turn termination signals into an exception, so that the regular die/eval
# error handling runs. SIGKILL is deliberately not listed: it cannot be
# caught, so installing a handler for it has no effect.
my $abort_on_signal = sub {
    die "Signaled, aborting sync: $!\n";
};
$SIG{$_} = $abort_on_signal for qw(TERM QUIT PIPE HUP INT);
72
# Look up an executable in the directories listed in $ENV{PATH}.
#
# Returns the full path of the first executable candidate.
# Dies if no directory contains an executable with that name.
sub check_bin {
    my ($bin) = @_;

    my @candidates = map { "$_/$bin" } split(/:/, $ENV{PATH});

    for my $candidate (@candidates) {
        return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}
85
# Read a file.
#
# With a true $one_line_only only the first line is returned (as a string,
# line ending included); otherwise a reference to an array of all lines.
# Dies if the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
        $text = <$fh>;
    } else {
        $text = [ <$fh> ];
    }

    close($fh);

    return $text;
}
98
# Shorten a path-like string for column output so that it is at most $maxlen
# characters long, marking the cut with '..'.
#
# Runs of slashes are collapsed first. Strings that already fit are returned
# unchanged. Otherwise the last component (tail) is kept as far as possible,
# then the first slash-prefixed component (head), and the part in between is
# shortened.
sub cut_target_width {
    my ($path, $maxlen) = @_;
    $path =~ s@/+@/@g;

    return $path if length($path) <= $maxlen;

    # a plain name without any slash: keep its right-hand end
    return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;

    # Split off the last component (an optional trailing slash stays with it).
    # The capture is only used when the substitution really matched; a failed
    # match would leave $1 with a value from an earlier match.
    my $tail;
    if ($path =~ s@/([^/]+/?)$@@) {
        $tail = $1;
    } else {
        # nothing after the only slash (e.g. "name/"): treat like a plain name
        return '..'.substr($path, -$maxlen+2);
    }

    if (length($tail)+3 == $maxlen) {
        return "../$tail";
    } elsif (length($tail)+2 >= $maxlen) {
        return '..'.substr($tail, -$maxlen+2)
    }

    # Split off the first slash-prefixed component.
    my $head;
    if ($path =~ s@(/[^/]+)(?:/|$)@@) {
        $head = $1;
    } else {
        # only a single component in front of the tail: shorten that one
        return substr($path, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
    }

    my $both = length($head) + length($tail);
    my $remaining = $maxlen-$both-4; # -4 for "/../"

    if ($remaining < 0) {
        return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
    }

    # replace the middle of what is left by '..' so that the result fits
    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return "$head/" . $path . "/$tail";
}
128
# Run $code while holding an exclusive flock on the file $lock_fn.
#
# The lock file is created (or truncated) if necessary. Returns the (scalar)
# result of $code. An error raised by $code is re-thrown after the lock has
# been released. Dies if the lock file cannot be opened or locked.
sub locked {
    my ($lock_fn, $code) = @_;

    # Explicit mode argument: the file name is never parsed as part of a mode
    # string, and a failed open is reported instead of surfacing inside flock.
    my $lock_fh = IO::File->new($lock_fn, 'w')
        or die "Couldn't open lock file '$lock_fn' - $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
144
# Return $status if it contains a true 'status' value for the job identified
# by $source->{all} and $name, otherwise undef.
sub get_status {
    my ($source, $name, $status) = @_;

    return undef if !$status->{$source->{all}}->{$name}->{status};

    return $status;
}
154
# Check whether a ZFS dataset exists by running 'zfs list' for it, via ssh as
# $user on $ip when $ip is given, locally otherwise.
#
# Returns 1 if the command succeeded, 0 if it failed.
sub check_dataset_exists {
    my ($dataset, $ip, $user) = @_;

    my @remote = $ip ? ('ssh', "$user\@$ip", '--') : ();
    my $cmd = [ @remote, 'zfs', 'list', '-H', '--', $dataset ];

    my $succeeded = eval { run_cmd($cmd); 1 };

    return $succeeded ? 1 : 0;
}
173
# Parse a target specification ([IP:]<VMID> or [IP:]<ZFSPool>[/Path]) into a
# hash reference with these keys:
#   all       - everything after the optional host part
#   ip        - the host part, surrounding brackets removed (only if given)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - the first path component
#   last_part - the last path component (only if there is more than one)
#   path      - what remains between them, joined by '/' (only if non-empty)
# Dies with a usage hint if the text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $spec) = ($1, $2);

    my $target = { all => $spec };

    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $spec);

    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if !@parts;
    $target->{last_part} = pop(@parts);

    # for targets with a host part one more component is dropped
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
209
# Read the cron file and return the job configuration parsed by encode_cron.
# If the cron file does not exist yet it is created empty and undef is
# returned.
sub read_cron {

    if (-e $CRONJOBS) {
        my $lines = read_file($CRONJOBS, 0);
        return encode_cron(@{$lines});
    }

    #This is for the first use to init file;
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
224
# Parse command line style arguments into a parameter hash.
#
# Recognized options are collected under their name with '-' replaced by '_'
# (e.g. --dest-user => dest_user); options that were not given stay undef,
# except for name, maxsnap, method, source_user and dest_user which get
# defaults. Dies if the options cannot be parsed.
sub parse_argv {
    my (@arg) = @_;

    # parameter key => Getopt::Long type suffix ('' is a plain flag)
    my @known_options = (
        [ dest => '=s' ],
        [ source => '=s' ],
        [ verbose => '' ],
        [ limit => '=i' ],
        [ maxsnap => '=i' ],
        [ name => '=s' ],
        [ skip => '' ],
        [ method => '=s' ],
        [ source_user => '=s' ],
        [ dest_user => '=s' ],
        [ properties => '' ],
        [ dest_config_path => '=s' ],
    );

    my $param = {};
    my @getopt_spec;
    for my $option (@known_options) {
        my ($key, $type) = @$option;
        $param->{$key} = undef;

        (my $cli_name = $key) =~ tr/_/-/;
        push @getopt_spec, "$cli_name$type" => \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @getopt_spec);

    die "can't parse options\n" if $ret == 0;

    my @defaults = (
        [ name => "default" ],
        [ maxsnap => 1 ],
        [ method => "ssh" ],
        [ source_user => "root" ],
        [ dest_user => "root" ],
    );
    for my $default (@defaults) {
        my ($key, $value) = @$default;
        $param->{$key} //= $value;
    }

    return $param;
}
269
# Copy the saved state of a job (state, lsync, vm_type, instance_id and the
# consecutive snap0, snap1, ... entries) from the state file into $job.
# Returns $job.
sub add_state_to_job {
    my ($job) = @_;

    my $all_states = read_state();
    my $saved = $all_states->{$job->{source}}->{$job->{name}};

    $job->{$_} = $saved->{$_} for qw(state lsync vm_type instance_id);

    # snapshot entries are numbered without gaps; stop at the first missing one
    my $i = 0;
    while ($saved->{"snap$i"}) {
        $job->{"snap$i"} = $saved->{"snap$i"};
        $i++;
    }

    return $job;
}
287
# Turn the lines of the cron file into a job configuration of the form
# $cfg->{<source>}->{<name>} = { remaining parameters }.
# Lines that do not yield both a source and a destination are ignored.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    foreach my $line (@text) {
        # a false line (empty string or "0") ends the processing
        last if !$line;

        my $param = parse_argv(split('\s', $line));

        next if !($param->{source} && $param->{dest});

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
308
# Build a job hash from parsed command line parameters.
#
# The method is "local" when neither source nor destination has a host part,
# "ssh" otherwise. 'dest', 'maxsnap' and 'dest_config_path' are only set when
# the corresponding parameter has a true value. Dies (via parse_target) on an
# invalid source or destination.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # Plain conditional expression instead of "my ... if ...": perlsyn
    # documents the behaviour of a 'my' with a statement modifier as undefined.
    my $dest = $param->{dest} ? parse_target($param->{dest}) : undef;

    my $is_local = !($dest && $dest->{ip}) && !$source->{ip};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = $is_local ? "local" : "ssh";
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
331
# Read and decode the JSON state file.
# If the state file does not exist yet, the config directory and a state file
# containing an empty JSON object are created and undef is returned.
sub read_state {

    if (-e $STATE) {
        my $text = read_file($STATE, 1);
        return decode_json($text);
    }

    make_path $CONFIG_PATH;

    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
346
# Store the state of $job in the state file, or remove it when the job's
# state is "del".
#
# The new content is written to "<state file>.new" first and then renamed
# over the state file. Returns the complete state structure that was written.
# Dies if the new file cannot be opened, written or moved into place.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file simply means "no states yet"
    my $text = eval { read_file($STATE, 1); };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        # drop the source entry as well once its last job is gone
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Check every step, like update_cron does: a state that silently failed
    # to reach the disk would otherwise go unnoticed.
    die "can't write to $STATE.new\n" if !print($out_fh $text);
    die "can't write to $STATE.new: $!\n" if !close($out_fh);

    die "can't move $STATE.new: $!\n" if !rename "$STATE.new", $STATE;

    return $states;
}
386
# Write $job into the cron file.
#
# The line belonging to the job (identified by source and name) is replaced
# by a freshly formatted one, or removed if the job's state is "del". If no
# such line exists the job is appended. A SHELL/PATH header is prepended when
# none is found within the first three lines. The result is written to
# "<cron file>.new" and renamed over the cron file.
# Dies if the new file cannot be opened, written or moved into place.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    # Source and name are matched literally. Without quoting, characters such
    # as '.' or the brackets of an IPv6 source ("[fe80::1]:tank") would be
    # treated as regex syntax and the job's line might never be found.
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    my $current = read_file($CRONJOBS, 0);

    foreach my $line (@{$current}) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
}
429
# Format the cron line for $job.
#
# If $line (the job's current cron line) is given, its five schedule fields
# are kept; otherwise the job is scheduled every $INTERVAL minutes. A job in
# state "stopped" is written as a comment. Returns the line including the
# trailing newline.
sub format_job {
    my ($job, $line) = @_;

    my $prefix = $job->{state} eq "stopped" ? "#" : "";

    my $schedule;
    if ($line) {
        # schedule fields of the existing entry, a leading comment sign is skipped
        $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/;
        $schedule = $1;
    } else {
        $schedule = "*/$INTERVAL * * * *";
    }

    my @args = (
        "--source $job->{source}",
        "--dest $job->{dest}",
        "--name $job->{name}",
        "--maxsnap $job->{maxsnap}",
    );
    push @args, "--limit $job->{limit}" if $job->{limit};
    push @args, "--method $job->{method}";
    push @args, "--verbose" if $job->{verbose};
    push @args, "--source-user $job->{source_user}";
    push @args, "--dest-user $job->{dest_user}";
    push @args, "--properties" if $job->{properties};
    push @args, "--dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};

    my $text = $prefix . $schedule;
    $text .= " root";
    $text .= " $PROGNAME sync " . join(' ', @args);
    $text .= "\n";

    return $text;
}
457
# Return a table (as one string) of all jobs found in the cron file with
# their state, last sync time, guest type and connection method. Rows are
# sorted by source and name.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys %{$cfg}) {
        foreach my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};

            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
478
# Determine the type of the guest $target->{vmid} by looking for its config
# file, via ssh as $user when the target has a host part, locally otherwise.
#
# Returns "qemu" or "lxc"; undef if the target has no vmid or no config file
# was found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    # checked in this order, the first hit wins
    my @candidates = (
        [ "qemu", "$QEMU_CONF/$conf_fn" ],
        [ "lxc", "$LXC_CONF/$conf_fn" ],
    );

    my @remote_ls = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--', '/bin/ls') : ();

    foreach my $candidate (@candidates) {
        my ($type, $conf_path) = @$candidate;

        my $found;
        if ($target->{ip}) {
            $found = eval { run_cmd([@remote_ls, $conf_path]) };
        } else {
            $found = -f $conf_path;
        }

        return $type if $found;
    }

    return undef;
}
497
# Create a new sync job from $param: copy the ssh key to remote hosts, verify
# that source and destination exist, register the job in cron and state file
# and, unless 'skip' is set, run a first sync. If that first sync fails the
# job is removed again and the error is printed.
sub init {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);

        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # install the public key on the remote side(s), destination first
        foreach my $peer ([ $dest, $param->{dest_user} ], [ $source, $param->{source_user} ]) {
            my ($target, $user) = @$peer;
            my $ip = $target->{ip};
            next if !$ip;

            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        if (!check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user})) {
            die "Pool $dest->{all} does not exist\n";
        }

        # a VMID source is not a dataset, it is checked via vm_exists below
        if (!defined($source->{vmid})
            && !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user})
        ) {
            die "Pool $source->{all} does not exist\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        #check if vm has zfs disks if not die;
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    }); #cron and state lock

    return if $param->{skip};

    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
551
# Look up the job given by $param->{source} and $param->{name} in the cron
# file and return it with name, source and its saved state filled in.
# Dies if no such job exists.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = @{$param}{qw(source name)};

    my $cfg = read_cron();

    my $job = $cfg->{$source}->{$name};
    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
567
# Remove the job given by $param from the cron file and the state file.
# Dies (via get_job) if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del"; # tells update_cron/update_state to drop the entry

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
579
# Build an identifier "<pid>:<starttime>:<boot id>" for the process $pid from
# /proc/<pid>/stat and the kernel's boot id.
# Dies if one of the files cannot be read.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1)
        or die "unable to read process stats\n";
    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
        or die "unable to read boot ID\n";

    # The second field of the stat line is the command name in parentheses
    # and may itself contain spaces or parentheses, so the fields are only
    # counted after the last ')'. The remainder starts with field 3 (state),
    # which puts the start time (field 22) at index 19.
    my $starttime;
    if ($stat =~ m/^.*\)\s+(.*)$/s) {
        my @fields = split(/\s+/, $1);
        $starttime = $fields[19];
    } else {
        # unexpected format: plain split, start time is the 22nd field
        my $stats = [ split(/\s+/, $stat) ];
        $starttime = $stats->[21];
    }
    chomp($boot_id);

    return "${pid}:${starttime}:${boot_id}";
}
594
# Check whether the process that produced $instance_id is still the one
# running under that PID: the id is recomputed for the PID it starts with
# and compared. Returns false for an undefined or malformed id.
sub instance_exists {
    my ($instance_id) = @_;

    return 0 if !defined($instance_id);
    return 0 if $instance_id !~ m/^([1-9][0-9]*):/;

    my $pid = $1;
    # failing to read the process information just means "no such instance"
    my $actual_id = eval { get_instance_id($pid); };

    return defined($actual_id) && $actual_id eq $instance_id;
}
606
# Run one synchronization for the source/destination given in $param.
#
# If a scheduled job exists for source and name, its state is tracked in the
# state file ("waiting" -> "syncing" -> "ok", or "error"); without such a job
# the sync runs untracked. All syncs are serialized by the sync lock.
# Dies if the job is already waiting/syncing, has been disabled, or if the
# sync itself fails.
sub sync {
    my ($param) = @_;

    my $job;

    # Step 1: mark the job as waiting, unless a live instance already has it.
    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        eval { $job = get_job($param) };
        return if !$job;

        my $state = $job->{state} // 'ok';
        # a state recorded by an instance that no longer exists does not count
        $state = 'ok' if !instance_exists($job->{instance_id});

        if ($state eq "syncing" || $state eq "waiting") {
            die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
        }

        $job->{state} = "waiting";
        $job->{instance_id} = $INSTANCE_ID;

        update_state($job);
    });

    # Step 2: do the actual work while holding the sync lock.
    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my ($dest, $source, $vm_type);

        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            if ($job) {
                $job->{state} = "syncing";
                $job->{vm_type} = $vm_type if !$job->{vm_type};
                update_state($job);
            }
        }); #cron and state lock

        # Syncs one dataset: look up the snapshots on the destination, take
        # a new snapshot, send it, and remove the oldest one if required.
        my $sync_path = sub {
            my ($source, $dest, $job, $param, $date) = @_;

            ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get(
                $source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});

            snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

            send_image($source, $dest, $param);

            if ($source->{destroy} && $dest->{old_snap}) {
                snapshot_destroy(
                    $source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user});
            }
        };

        eval{
            if ($source->{vmid}) {
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                # sync every disk of the guest, then its configuration
                foreach my $disk (sort keys %{$disks}) {
                    my $info = $disks->{$disk};

                    $source->{all} = $info->{all};
                    $source->{pool} = $info->{pool};
                    $source->{path} = $info->{path} if $info->{path};
                    $source->{last_part} = $info->{last_part};
                    $sync_path->($source, $dest, $job, $param, $date);
                }

                my $uses_ssh = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
                my $config_method = $uses_ssh ? 'ssh' : 'local';
                send_config(
                    $source, $dest, $config_method,
                    $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
            } else {
                $sync_path->($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            # record the failure, then pass the error on
            locked("$CONFIG_PATH/cron_and_state.lock", sub {
                eval { $job = get_job($param); };
                return if !$job;

                $job->{state} = "error";
                delete $job->{instance_id};
                update_state($job);
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        # Step 3: record the successful run.
        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            eval { $job = get_job($param); };
            return if !$job;

            # a job that was disabled in the meantime stays disabled
            my $is_stopped = defined($job->{state}) && $job->{state} eq "stopped";
            $job->{state} = $is_stopped ? "stopped" : "ok";

            $job->{lsync} = $date;
            delete $job->{instance_id};
            update_state($job);
        });
    }); #sync lock
}
722
# List the snapshots of the target dataset on the destination (sorted by
# creation time, descending) and return ($old_snap, $last_snap):
#   $last_snap - name of the first snapshot in the listing
#   $old_snap  - the $max_snap'th "rep_<name>_<date>" snapshot in the listing,
#                or the last such snapshot seen if there are fewer
# $source->{destroy} is set when $max_snap replication snapshots were found.
# Returns undef if the listing fails or contains no snapshot.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my @remote = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    my $cmd = [ @remote, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation' ];
    push @$cmd, target_dataset($source, $dest);

    my $raw = eval { run_cmd($cmd) };
    return undef if $@; #this means the volume doesn't exist on dest yet

    my $rep_count = 0;
    my ($old_snap, $last_snap);

    foreach my $line (split(/\n/, $raw)) {
        if (!$last_snap && $line =~ m/@(.*)$/) {
            $last_snap = $1;
        }

        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $old_snap = $1;
        $rep_count++;
        if ($rep_count == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
763
# Create the snapshot "rep_<name>_<date>" of the source dataset (via ssh when
# the source has a host part) and remember its name in $source->{new_snap}.
# If creating it fails, snapshot_destroy is called for that name and the
# error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_$name\_".$date;
    $source->{new_snap} = $snap_name;

    my @remote = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $cmd = [ @remote, 'zfs', 'snapshot', "$source->{all}\@$snap_name" ];

    eval { run_cmd($cmd); };
    my $err = $@;
    return if !$err;

    snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
    die "$err\n";
}
785
# Fetch the configuration of the guest $target->{vmid} with 'qm config' or
# 'pct config' (via ssh as $user when the target has a host part) and return
# its ZFS disks as parsed by parse_disks.
# Dies if the guest type is neither 'qemu' nor 'lxc'.
sub get_disks {
    my ($target, $user) = @_;

    my $vm_type = $target->{vm_type};

    my $config_tool;
    if ($vm_type eq 'qemu') {
        $config_tool = 'qm';
    } elsif ($vm_type eq 'lxc') {
        $config_tool = 'pct';
    } else {
        die "VM Type unknown\n";
    }

    my @remote = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    my $config_text = run_cmd([ @remote, $config_tool, 'config', $target->{vmid} ]);

    return parse_disks($config_text, $target->{ip}, $vm_type, $user);
}
806
# Run a command through the shell and return its output (stderr included)
# without the trailing newline.
#
# $cmd is either a ready-made command string or an array reference. Array
# elements are shell-quoted, except for scalar references, which are inserted
# verbatim (used for shell syntax such as '|').
# Dies with the command and its output if the exit status is not zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
823
# Extract the ZFS backed disks from the text of a guest configuration.
#
# $text is the output of 'qm config' / 'pct config', $vm_type is 'qemu' or
# 'lxc'; 'pvesm path' is run (via ssh as $user on $ip if $ip is set) to
# resolve each volume. Returns a hash reference indexed 0, 1, ... whose
# entries have the keys pool, path (optional), last_part and all.
# Skipped are: cdrom drives, qemu disks with backup disabled, lxc mount
# points without backup enabled and volumes that don't look like
# "<storage>:<name>".
# Dies if a volume path is not in the expected format or no disk was found.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        #QEMU if backup is not set include in sync
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        #LXC if backup is not set do no in sync
        # (\d+ and not \d: mount point indexes can have more than one digit,
        # just like $DISK_KEY_RE accepts them)
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form [file=|volume=]<storage>:<name> is the volume
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            # /dev/zvol/<pool>[/<path>]/<disk>
            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
903
# how the corresponding dataset is named on the target:
# the destination ($dest->{all}), followed by the last component of the
# source if there is one; repeated slashes are collapsed.
sub target_dataset {
    my ($source, $dest) = @_;

    my @components = ("$dest->{all}");
    push @components, $source->{last_part} if $source->{last_part};

    my $target = join('/', @components);
    $target =~ s!/+!/!g;

    return $target;
}
914
# Destroy the snapshot $snap of the source dataset and, if $dest is given,
# of the corresponding dataset on the destination as well.
# Failures are only reported as warnings.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    # source side; ssh is only used for a remote source with method 'ssh'
    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval { run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]); };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    return if !$dest;

    # destination side
    my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    my $dest_dataset = target_dataset($source, $dest);

    eval { run_cmd([@dest_ssh, @zfscmd, "$dest_dataset\@$snap"]); };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }
}
944
# check if snapshot for incremental sync exist on source side
# (the snapshot in question is $dest->{last_snap}); returns 1 if it can be
# listed, otherwise warns and returns undef
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @remote = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $snapshot = "$source->{all}\@$dest->{last_snap}";

    my $cmd = [ @remote, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name', $snapshot ];

    eval { run_cmd($cmd) };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    return 1;
}
965
# Transfer the new snapshot ($source->{new_snap}) to the destination with
# 'zfs send | zfs recv', each side running via ssh when it has a host part.
#
# The stream is incremental when the destination's last snapshot also exists
# on the source, and is piped through cstream when a limit is set. If the
# transfer fails, the new snapshot is destroyed on the source and the error
# is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my $pipe = \'|'; # scalar references are passed to the shell unquoted
    my @ssh = ('ssh', '-o', 'BatchMode=yes');

    # sending side
    my @send;
    push @send, @ssh, "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
        push @send, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit
    my @throttle;
    if ($param->{limit}){
        my $bwl = $param->{limit}*1024;
        @throttle = ($pipe, 'cstream', '-t', $bwl);
    }

    # receiving side
    my $target = target_dataset($source, $dest);

    my @receive = ($pipe);
    push @receive, @ssh, "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @receive, 'zfs', 'recv', '-F', '--', "$target";

    eval { run_cmd([ @send, @throttle, @receive ]) };

    if (my $erro = $@) {
        snapshot_destroy(
            $source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
1001
1002
# Copy the guest's configuration file to the destination as
# "<config dir>/<vmid>.conf.<vm type>.<new snapshot>".
#
# The config dir is $dest_config_path (default: $CONFIG_PATH), extended by
# the destination's last path component if it has one. With method 'ssh' the
# file is copied with scp, and the copy belonging to the snapshot that was
# removed ($dest->{old_snap}) is deleted when $source->{destroy} is set. With
# method 'local' the file is copied with cp.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $guest_conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$guest_conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $conf_prefix = "$config_dir/$source->{vmid}.conf.$source->{vm_type}";
    my $dest_target_new = "$conf_prefix.$source->{new_snap}";

    if ($method eq 'ssh'){
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        # nothing is copied here if neither side is remote
        if ($dest->{ip} || $source->{ip}) {
            my $from = $source->{ip} ? "$source_user\@[$source->{ip}]:$source_target" : $source_target;
            my $to = $dest->{ip} ? "$dest_user\@[$dest->{ip}]:$dest_target_new" : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}){
            my $dest_target_old = "$conf_prefix.$dest->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1039
# Return the current local time as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    my @now = localtime(time);
    my ($sec, $min, $hour, $mday, $mon, $year) = @now[0 .. 5];

    # localtime counts years from 1900 and months from 0
    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}
1046
# Return a table (as one string) with the saved state of every job found in
# the cron file, sorted by source and name.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        my $source_column = sprintf("%-25s", cut_target_width($source, 25));

        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $name_column = sprintf("%-25s", cut_target_width($sync_name, 25));

            $status_list .= $source_column . $name_column;
            $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1064
# Set the state of the job given by $param to "ok" and rewrite its cron
# entry, which re-activates a job that was commented out.
# Dies (via get_job) if the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $enable = sub {
        my $job = get_job($param);
        $job->{state} = "ok";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $enable);
}
1075
# Set the state of the job given by $param to "stopped" and rewrite its cron
# entry, which is written as a comment for a stopped job.
# Dies (via get_job) if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $disable = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $disable);
}
1086
# Help text for every known command, keyed by command name. The keys double
# as the list of valid commands (see the check right below this table).
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

        remove a sync Job from the scheduler

        -name string

                name of the sync job, if not set it is default

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

        Create a sync Job

        -dest string

                the destination target is like [IP:]<Pool>[/Path]

        -dest-user string

                name of the user on the destination target, root by default

        -limit integer

                max sync speed in kBytes/s, default unlimited

        -maxsnap integer

                how much snapshots will be kept before get erased, default 1

        -name string

                name of the sync job, if not set it is default

        -skip

                If specified, skip the first sync.

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

        -source-user string

                name of the user on the source target, root by default

        -properties

                If specified, include the dataset's properties in the stream.

        -dest-config-path string

                specify a custom config path on the destination target. default is /var/lib/pve-zsync
    },
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n

        will sync one time

        -dest string

                the destination target is like [IP:]<Pool>[/Path]

        -dest-user string

                name of the user on the destination target, root by default

        -limit integer

                max sync speed in kBytes/s, default unlimited

        -maxsnap integer

                how much snapshots will be kept before get erased, default 1

        -name string

                name of the sync job, if not set it is default.
                It is only necessary if scheduler already contains this source.

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

        -source-user string

                name of the user on the source target, root by default

        -verbose

                If specified, print out the sync progress.

        -properties

                If specified, include the dataset's properties in the stream.

        -dest-config-path string

                specify a custom config path on the destination target. default is /var/lib/pve-zsync
    },
    list => qq{
$PROGNAME list

        Get a List of all scheduled Sync Jobs
    },
    status => qq{
$PROGNAME status

        Get the status of all scheduled Sync Jobs
    },
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

        Get help about specified command.

        <cmd> string

                Command name

        -verbose

                Verbose output format.
    },
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

        enable a syncjob and reset error

        -name string

                name of the sync job, if not set it is default

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

        pause a sync job

        -name string

                name of the sync job, if not set it is default

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    printpod => 'internal command',

};
1244
# Reject a missing or unknown command before any option handling happens.
# A command counts as known when it has an entry in $cmd_help. Both error
# paths print the usage overview and then terminate via die with a bare
# newline, i.e. with a non-zero exit status and no extra message.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # The trailing newline keeps the USAGE overview printed by usage(1)
    # from being appended to the same line as the error message.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Parse the command line into the option hash used by all commands below.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1254
# Ensure that every option named in the argument list was supplied.
#
# Arguments: a list of option names (keys of $param).
# Dies with the help text of the current command as soon as one of the
# named options is missing or has a false value.
sub check_params {
    my @required = @_;

    foreach my $key (@required) {
        next if $param->{$key};
        die "$cmd_help->{$command}\n";
    }
}
1260
# Dispatch the validated command. Commands that operate on a job first make
# sure their mandatory options are present (check_params) and that the given
# targets can be parsed (check_target) before doing any work.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    my $help_command = $ARGV[1];

    # 'help <cmd>' for a known command: emit its help text via die and stop.
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # List form avoids going through a shell. exec only returns when
        # the program could not be started, so report that instead of
        # silently falling through and exiting successfully.
        exec('man', $PROGNAME)
            or die "unable to execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1317
# Print the short usage overview listing all public commands.
#
# Arguments:
#   $help - when false, the overview is preceded by a "no command
#           specified" error line; when true, only the overview is printed.
sub usage {
    my ($help) = @_;

    my @lines;
    push @lines, "ERROR:\tno command specified\n" unless $help;
    push @lines, (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    foreach my $line (@lines) {
        print($line);
    }
}
1332
# Run a target specification through parse_target() and hand back whatever
# it returns.
#
# Arguments:
#   $target - the target string as given on the command line.
# NOTE(review): the callers in this file ignore the return value, so this
# presumably relies on parse_target() dying on invalid input - confirm.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1337
# Print the POD source of the manual page.
#
# The SYNOPSIS section is assembled from the per-command help texts in
# $cmd_help, sorted as strings. The 'printpod' entry only holds the
# placeholder 'internal command' and is therefore left out, so that the
# placeholder does not show up in the generated documentation.
sub print_pod {

    my @help_texts =
        map { $cmd_help->{$_} }
        grep { $_ ne 'printpod' }
        keys %$cmd_help;

    my $synopsis = join("\n", sort @help_texts);

    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2021 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}