]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
Revert "fix: check for incremental sync snapshot."
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program identity and the filesystem locations this script works with.
my $PROGNAME = "pve-zsync";
my $CONFIG_PATH = "/var/lib/$PROGNAME";         # state, lock and config copies
my $STATE = "$CONFIG_PATH/sync_state";          # JSON file with per-job state
my $CRONJOBS = "/etc/cron.d/$PROGNAME";         # cron file holding the job definitions
my $PATH = "/usr/sbin";
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = "$PVE_DIR/qemu-server";         # <vmid>.conf files of VMs
my $LXC_CONF = "$PVE_DIR/lxc";                  # <vmid>.conf files of containers
my $PROG_PATH = "$PATH/$PROGNAME";
my $INTERVAL = 15;                              # minutes, used for new cron entries
my $DEBUG;

# Decide on debugging at compile time so that Dumper is already imported
# when the rest of the file is compiled.
BEGIN {
    $DEBUG = 0; # change default here. not above on declaration!
    $DEBUG ||= $ENV{ZSYNC_DEBUG};
    if ($DEBUG) {
        require Data::Dumper;
        Data::Dumper->import();
    }
}
33
# Building blocks for IPv4 / IPv6 address matching.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# One alternative per possible position of the "::" abbreviation.
my $IPV6RE = "(?:" . join("|",
    "(?:(?:(?:$IPV6H16:){6})$IPV6LS32)",
    "(?:(?:::(?:$IPV6H16:){5})$IPV6LS32)",
    "(?:(?:(?:$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::)$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::)$IPV6H16)",
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::))",
) . ")";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets

# Targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional.
# Capture 1 is the host (brackets included), capture 2 the VMID or dataset.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;

# Config keys of guest config lines that describe a disk or mount point.
my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
my $command = $ARGV[0];

# Every command except 'help' and 'printpod' requires the external tools
# below; check_bin() dies if one of them is not found in $ENV{PATH}.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin($_) for qw(cstream zfs ssh scp);
}

# Turn termination signals into an exception so that surrounding eval blocks
# get a chance to run. SIGKILL is intentionally not listed: it cannot be
# trapped, so installing a handler for it has no effect.
for my $signame (qw(TERM QUIT PIPE HUP INT)) {
    $SIG{$signame} = sub {
        # Perl passes the signal name as first argument; $! would only hold
        # the errno of some unrelated earlier system call.
        my ($sig) = @_;
        die "Signaled, aborting sync: received SIG$sig\n";
    };
}
70
# Look up an executable in the directories listed in $ENV{PATH}.
#
# Returns the path of the first executable candidate "<dir>/<bin>";
# dies with "unable to find command '<bin>'" if there is none.
sub check_bin {
    my ($bin) = @_;

    my @hits = grep { -x $_ } map { "$_/$bin" } split(/:/, $ENV{PATH});
    return $hits[0] if @hits;

    die "unable to find command '$bin'\n";
}
83
# Shorten a path-like string for table output.
#
# Runs of '/' are collapsed first. A string that already fits into $maxlen
# characters is returned as is; otherwise parts are replaced by '..',
# preferring to keep the last path component and then the first component
# found by the second substitution below.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s{/+}{/}g;
    return $path if length($path) <= $maxlen;

    # no directory structure at all: keep the end of the string
    return '..' . substr($path, 2 - $maxlen) if $path !~ m{/};

    # split off the last component (an optional trailing slash stays with it)
    $path =~ s{/([^/]+/?)$}{};
    my $tail = $1;
    my $tail_len = length($tail);

    return "../$tail" if $tail_len + 3 == $maxlen;
    return '..' . substr($tail, 2 - $maxlen) if $tail_len + 2 >= $maxlen;

    # split off a leading component including its '/' prefix
    $path =~ s{(/[^/]+)(?:/|$)}{};
    my $head = $1;
    my $room = $maxlen - (length($head) + $tail_len) - 4; # -4 for "/../"

    if ($room < 0) {
        my $keep = $maxlen - $tail_len - 3; # -3 for "../"
        return substr($head, 0, $keep) . "../$tail";
    }

    # squeeze what is left of the middle down to the available room
    substr($path, ($room/2), (length($path)-$room), '..');
    return join('/', $head, $path, $tail);
}
113
# Run $code while holding an exclusive flock() on the file $lock_fn.
#
# The lock file is opened for writing (and created if necessary). The call
# blocks until the lock is available. The lock is released and the handle
# closed again in every case, then
#   - an exception thrown by $code is re-thrown, or
#   - the (scalar) return value of $code is returned.
#
# Dies if the lock file cannot be opened or the lock cannot be acquired.
sub locked {
    my ($lock_fn, $code) = @_;

    # Check the open explicitly; otherwise a failure would only show up as
    # an obscure error from flock() on an undefined handle.
    my $lock_fh = IO::File->new($lock_fn, '>')
        or die "Couldn't open lock file '$lock_fn' - $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
129
# Return $status if it holds a true 'status' value for the job identified
# by $source->{all} and $name, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    my $job_status = $status->{$source->{all}}->{$name}->{status};

    return $job_status ? $status : undef;
}
139
# Check whether the dataset $target->{all} can be listed with 'zfs list',
# via ssh as $user when $target->{ip} is set.
#
# Returns 1 if the command succeeds and 0 if it fails.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'list', '-H', '--', $target->{all}];

    return eval { run_cmd($cmd); 1 } ? 1 : 0;
}
158
# Split a target specification ([IP:]<VMID> or [IP:]<ZFSPool>[/Path]) into
# a hash reference with the keys
#   all       - everything after the optional host part
#   ip        - host part with surrounding brackets removed (only if given)
#   vmid      - first component, if it consists of digits only
#   pool      - first component otherwise
#   last_part - last path component (only if there is more than one component)
#   path      - components remaining in between, joined by '/'
#
# Dies if $text does not match $TARGETRE or has no usable first component.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $spec) = ($1, $2);

    my $target = { all => $spec };
    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $spec);
    my $first = shift(@parts);
    die "$errstr\n" if !$first;

    if ($first =~ m/^\d+$/) {
        $target->{vmid} = $first;
    } else {
        $target->{pool} = $first;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # with a host part one more trailing component is dropped
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
194
# Read the cron file and return the job configuration as produced by
# encode_cron().
#
# On first use the cron file does not exist yet; it is then created empty
# and undef is returned. Dies if the file cannot be created or opened.
sub read_cron {

    unless (-e $CRONJOBS) {
        my $new_fh = IO::File->new("> $CRONJOBS")
            or die "Could not create $CRONJOBS: $!\n";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $CRONJOBS")
        or die "Could not open file $CRONJOBS: $!\n";
    my @lines = $fh->getlines();
    close($fh);

    return encode_cron(@lines);
}
214
# Parse command line style arguments into a parameter hash.
#
# Every known option has a key in the returned hash reference (undef when
# the option was not given). name, maxsnap, method, source_user and
# dest_user get defaults. Dies with "can't parse options" when
# GetOptionsFromArray reports a failure.
sub parse_argv {
    my (@arg) = @_;

    # [ Getopt::Long specification, key in the parameter hash ]
    my @options = (
        ['dest=s', 'dest'],
        ['source=s', 'source'],
        ['verbose', 'verbose'],
        ['limit=i', 'limit'],
        ['maxsnap=i', 'maxsnap'],
        ['name=s', 'name'],
        ['skip', 'skip'],
        ['method=s', 'method'],
        ['source-user=s', 'source_user'],
        ['dest-user=s', 'dest_user'],
        ['properties', 'properties'],
        ['dest-config-path=s', 'dest_config_path'],
    );

    my $param = { map { ($_->[1] => undef) } @options };

    my ($ret) = GetOptionsFromArray(
        \@arg,
        map { ($_->[0] => \$param->{$_->[1]}) } @options,
    );

    die "can't parse options\n" if $ret == 0;

    my %default_for = (
        name => "default",
        maxsnap => 1,
        method => "ssh",
        source_user => "root",
        dest_user => "root",
    );
    $param->{$_} //= $default_for{$_} for keys %default_for;

    return $param;
}
259
# Copy the stored state (state, lsync, vm_type and the consecutive
# snap0, snap1, ... entries) of the job identified by $job->{source} and
# $job->{name} from the state file into $job. Returns $job.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    $job->{$_} = $state->{$_} for qw(state lsync vm_type);

    # snapshot entries are numbered without gaps; stop at the first missing one
    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
276
# Turn the lines of the cron file into a configuration hash of the form
# $cfg->{<source>}->{<name>} = <remaining parameters>.
#
# Each line is split on whitespace and handed to parse_argv(); only lines
# yielding both a source and a dest are kept. Processing stops at the first
# line that evaluates to false.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    for my $line (@text) {
        last if !$line;

        my $param = parse_argv(split('\s', $line));
        next unless $param->{source} && $param->{dest};

        my ($source, $name) = delete @{$param}{qw(source name)};
        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
297
# Build a job hash from the parsed command line parameters.
#
# The method is "local" when neither source nor destination has a host
# part, "ssh" otherwise. Dies (via parse_target) on an invalid source or
# destination specification.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});

    # Declare first, assign conditionally: the behaviour of
    # "my $x = ... if COND" is documented as undefined.
    my $dest;
    $dest = parse_target($param->{dest}) if $param->{dest};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
320
# Load the job states from the state file.
#
# If the file does not exist yet, the config directory and a state file
# containing "{}" are created and undef is returned. Otherwise the first
# line of the file is decoded as JSON and returned.
# Dies if the file cannot be created or opened.
sub read_state {

    unless (-e $STATE) {
        make_path $CONFIG_PATH;
        my $new_fh = IO::File->new("> $STATE")
            or die "Could not create $STATE: $!\n";
        $new_fh->print("{}");
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $STATE")
        or die "Could not open file $STATE: $!\n";

    my $states = decode_json($fh->getline());
    close($fh);

    return $states;
}
342
# Store (or, for state "del", remove) the state of $job in the state file.
#
# The current file content is read on a best-effort basis (an unreadable
# file counts as empty), the entry for $job->{source} / $job->{name} is
# updated and the result is written to "<state file>.new", which then
# replaces the state file via rename.
#
# Returns the complete states hash. Dies if the new file cannot be
# written, closed or moved into place, so a failed update is not silently
# lost.
sub update_state {
    my ($job) = @_;

    my $text;
    if (my $in_fh = IO::File->new("< $STATE")) {
        $text = <$in_fh>;
        close($in_fh);
    }

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        # drop the source level as well once its last job is gone
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    print {$out_fh} encode_json($states)
        or die "can't write to $STATE.new: $!\n";
    # buffered write errors only show up on close
    close($out_fh) or die "can't close $STATE.new: $!\n";

    rename("$STATE.new", $STATE) or die "can't move $STATE.new: $!\n";

    return $states;
}
391
# Rewrite the cron file for $job.
#
# The line belonging to the job (identified by its "source" and "name"
# arguments) is replaced by a freshly formatted one, or dropped when the
# job state is "del". All other lines are kept. A SHELL/PATH header is
# prepended unless one of the first three lines already starts with PATH
# or SHELL, and a new line is appended if no existing line matched.
#
# The result is written to "<cron file>.new" and renamed into place.
# Dies if reading, writing or renaming fails.
sub update_cron {
    my ($job) = @_;

    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    my @lines = <$fh>;
    close($fh);

    # Quote source and name: they are plain strings, and characters such as
    # '.' or '+' in them must not act as regex metacharacters.
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";

    foreach my $line (@lines) {
        last if !$line;
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    $text = "$header$text" if !$has_header;
    $text .= format_job($job) if !$updated;

    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    rename("${CRONJOBS}.new", $CRONJOBS) or die "can't move $CRONJOBS.new: $!\n";
}
438
# Format the cron line for $job.
#
# The line is commented out with '#' when the job state is "stopped".
# If an existing cron $line is passed and its five schedule fields can be
# extracted, they are reused; otherwise the default schedule of every
# $INTERVAL minutes is used.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    # Only use the capture after a successful match. Appending $1 blindly
    # would produce a line without schedule if $line has an unexpected format.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
466
# Return a table (as one string) with one row per job found in the cron
# file: source, name, state, last sync, guest type and connection method.
# Source and name are shortened to the column width of 25 characters.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
487
# Determine the guest type of $target->{vmid} by looking for its config
# file, first below $QEMU_CONF, then below $LXC_CONF. With $target->{ip}
# set the check is done with /bin/ls via ssh as $user, otherwise locally.
#
# Returns "qemu", "lxc", or undef (also when the target has no vmid).
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";
    my @candidates = (
        ["qemu", "$QEMU_CONF/$conf_fn"],
        ["lxc", "$LXC_CONF/$conf_fn"],
    );

    for my $candidate (@candidates) {
        my ($type, $file) = @$candidate;
        my $found = $target->{ip}
            ? eval { run_cmd(['ssh', "$user\@$target->{ip}", '--', '/bin/ls', $file]) }
            : -f $file;
        return $type if $found;
    }

    return undef;
}
506
# Create a new sync job from $param.
#
# Under the cron-and-state lock: copies the ssh key to remote hosts,
# verifies that the destination (and a non-VM source) dataset and a source
# VM exist, refuses to overwrite an existing job and then registers the job
# in the cron and state files. Unless $param->{skip} is set a first sync is
# run afterwards; if that fails the job is removed again and the error is
# printed.
sub init {
    my ($param) = @_;

    my $register_job = sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);
        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # install our public key on remote hosts, destination first
        for my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
            my ($target, $user) = @$peer;
            my $ip = $target->{ip} or next;
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        die "Pool $dest->{all} does not exists\n"
            if !check_pool_exists($dest, $param->{dest_user});

        die "Pool $source->{all} does not exists\n"
            if !defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user});

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        #check if vm has zfs disks if not die;
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $register_job); #cron and state lock

    return if $param->{skip};

    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
558
# Look up the job identified by $param->{source} and $param->{name} in the
# cron file and return it with name, source and the stored state added.
# Dies if the cron file has no such job.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();
    my ($source, $name) = @{$param}{qw(source name)};

    my $job = $cfg->{$source}->{$name};
    die "Job with source $param->{source} and name $param->{name} does not exist\n" if !$job;

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
574
# Remove the job described by $param from the cron and the state file
# while holding the cron-and-state lock. Dies (via get_job) if the job
# does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
586
# Run one synchronisation for the source/destination given in $param.
#
# If a matching job exists in the cron file its state is tracked along the
# way: "waiting" while queued for the sync lock, "syncing" while running,
# and finally "ok" (or "error" on failure) together with the sync date.
# A sync without a registered job is performed as well, just without state
# tracking. Only one sync runs at a time (sync lock).
#
# Dies if the job is already waiting/syncing, has been stopped, or if
# snapshotting/sending fails.
sub sync {
    my ($param) = @_;

    my $state_lock = "$CONFIG_PATH/cron_and_state.lock";
    my $job;

    # Phase 1: mark the job as queued.
    locked($state_lock, sub {
        eval { $job = get_job($param) };
        return if !$job;

        my $state = $job->{state};
        if (defined($state) && ($state eq "syncing" || $state eq "waiting")) {
            die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
        }

        $job->{state} = "waiting";
        update_state($job);
    });

    # Phase 2: the actual work, serialised by the sync lock.
    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my ($dest, $source, $vm_type);

        locked($state_lock, sub {
            #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            if ($job) {
                $job->{state} = "syncing";
                $job->{vm_type} = $vm_type if !$job->{vm_type};
                update_state($job);
            }
        }); #cron and state lock

        # Sync a single dataset: look up the previous snapshots, take a new
        # one, send it and drop the oldest one if the limit was reached.
        my $sync_path = sub {
            my ($src, $dst, $unused_job, $opts, $stamp) = @_;

            ($src->{old_snap}, $src->{last_snap}) =
                snapshot_get($src, $dst, $opts->{maxsnap}, $opts->{name}, $opts->{source_user});

            snapshot_add($src, $dst, $opts->{name}, $stamp, $opts->{source_user}, $opts->{dest_user});

            send_image($src, $dst, $opts);

            snapshot_destroy($src, $dst, $opts->{method}, $src->{old_snap}, $opts->{source_user}, $opts->{dest_user})
                if ($src->{destroy} && $src->{old_snap});
        };

        eval{
            if ($source->{vmid}) {
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                # sync every disk of the guest, reusing $source as the descriptor
                foreach my $key (sort keys %{$disks}) {
                    my $disk = $disks->{$key};
                    $source->{all} = $disk->{all};
                    $source->{pool} = $disk->{pool};
                    $source->{path} = $disk->{path} if $disk->{path};
                    $source->{last_part} = $disk->{last_part};
                    $sync_path->($source, $dest, $job, $param, $date);
                }

                my $via = ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) ? 'ssh' : 'local';
                send_config($source, $dest, $via, $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
            } else {
                $sync_path->($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            locked($state_lock, sub {
                eval { $job = get_job($param); };
                if ($job) {
                    $job->{state} = "error";
                    update_state($job);
                }
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        # Phase 3: record the successful run; a job stopped meanwhile stays stopped.
        locked($state_lock, sub {
            eval { $job = get_job($param); };
            return if !$job;

            $job->{state} = "ok"
                unless defined($job->{state}) && $job->{state} eq "stopped";
            $job->{lsync} = $date;
            update_state($job);
        });
    }); #sync lock
}
695
# Find the replication snapshots "rep_<name>_<date>" of $source->{all}.
#
# 'zfs list' is asked for the snapshots sorted by creation, newest first
# (-S creation). Returns ($old_snap, $last_snap): $last_snap is the first
# matching snapshot of that list, $old_snap the $max_snap-th one, or the
# last matching one if there are fewer. $source->{destroy} is set when
# $max_snap snapshots were found. Returns undef if none matched.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $source_user) = @_;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation', $source->{all}];

    my $raw = run_cmd($cmd);

    my $last_snap = undef;
    my $old_snap;
    my $count = 0;

    for my $line ($raw ? split(/\n/, $raw) : ()) {
        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $last_snap = $1 if (!$last_snap);
        $old_snap = $1;

        if (++$count == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
728
# Create the snapshot "rep_<name>_<date>" of $source->{all} (via ssh when
# the source has an ip) and remember its name in $source->{new_snap}.
# If the command fails, snapshot_destroy() is called for that snapshot
# and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_${date}";
    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $cmd = [@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"];

    eval { run_cmd($cmd) };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
750
# Write a cron file from a configuration hash of the form
# $cfg->{<source>}->{<sync name>} = { status, vmid, source_ip, source_pool,
# source_path, dest_ip, dest_pool, dest_path, limit, maxsnap }.
# Only entries whose status is 'ok' are written.
# Dies if the cron file cannot be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name} //= {};
            next if $entry->{status} ne 'ok';

            my $line = "$PROG_PATH sync -source ";
            $line .= "$entry->{source_ip}:" if $entry->{source_ip};
            if ($entry->{vmid}) {
                $line .= "$entry->{vmid} ";
            } else {
                $line .= "$entry->{source_pool}";
                $line .= "$entry->{source_path}" if $entry->{source_path};
            }

            $line .= " -dest ";
            $line .= "$entry->{dest_ip}:" if $entry->{dest_ip};
            $line .= "$entry->{dest_pool}";
            $line .= "$entry->{dest_path}" if $entry->{dest_path};

            $line .= " -name $sync_name ";
            $line .= " -limit $entry->{limit}" if $entry->{limit};
            $line .= " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};

            $text .= "$line\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
786
# Fetch the guest configuration of $target->{vmid} with 'qm config' (qemu)
# or 'pct config' (lxc), via ssh as $user when $target->{ip} is set, and
# return the disks found in it by parse_disks().
# Dies with "VM Type unknown" for any other $target->{vm_type}.
sub get_disks {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $tool;
    if ($target->{vm_type} eq 'qemu') {
        $tool = 'qm';
    } elsif ($target->{vm_type} eq 'lxc') {
        $tool = 'pct';
    } else {
        die "VM Type unknown\n";
    }

    my $config = run_cmd([@ssh, $tool, 'config', $target->{vmid}]);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $user);
}
807
# Run a command through the shell and return its combined stdout/stderr
# output with the trailing newline removed.
#
# $cmd is either a ready-made command string or an array reference. In the
# array form plain elements are shell-quoted, while scalar references are
# inserted verbatim (used for shell syntax such as a pipe).
# Dies with the command and its output if the exit status is not zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = qx{$cmd 2>&1};

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
824
# Extract the ZFS backed disks from a guest configuration.
#
# $text is the output of 'qm config' / 'pct config', $ip the optional
# source host, $vm_type 'qemu' or 'lxc' and $user the ssh user. Every disk
# line is resolved with 'pvesm path' and split into its dataset parts.
#
# Returns a hash reference indexed 0, 1, ... whose entries have the keys
# pool, path (optional), last_part and all.
# Dies if a resolved path has an unexpected form or no disk was found.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        #QEMU if backup is not set include in sync
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        #LXC mount points are only synced if backup is explicitly enabled.
        #Match all mount point numbers (mpN with one or more digits), in line
        #with $DISK_KEY_RE, not just mp0..mp9.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            # the volume is the first option of the form <storage>:<volume name>
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        my $entry = {};
        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $entry->{pool} = shift(@array);
            $entry->{all} = $entry->{pool};
            if (0 < @array) {
                $entry->{path} = join('/', @array);
                $entry->{all} .= "\/$entry->{path}";
            }

        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $entry->{pool} = $1;
            $entry->{all} = $entry->{pool};

            if ($2) {
                $entry->{path} = $3;
                $entry->{all} .= "\/$entry->{path}";
            }

        } else {
            die "ERROR: in path\n";
        }

        $entry->{last_part} = $disk;
        $entry->{all} .= "\/$disk";

        $disks->{$num} = $entry;
        $num++;
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
904
# Destroy the snapshot $snap of $source->{all} and, if $dest is given, the
# corresponding snapshot on the destination (dataset $dest->{all}, extended
# by $source->{last_part} if set). ssh is used for the source when it has
# an ip and $method is 'ssh', and for the destination when it has an ip.
# Failures are only reported as warnings.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval { run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    return if !$dest;

    my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

    my $path = "$dest->{all}";
    $path .= "/$source->{last_part}" if $source->{last_part};

    eval { run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }
}
935
# Check whether the snapshot $source->{old_snap} exists on the destination
# dataset ($dest->{all}, extended by $source->{last_part} if set). The
# check is done with 'zfs list', via ssh as $dest_user when the
# destination has an ip. $method is accepted but not used.
#
# Returns 1 if the snapshot is listed, 0 if it is not, and undef (after a
# warning) if the command failed.
sub snapshot_exist {
    my ($source , $dest, $method, $dest_user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};
    $path .= "\@$source->{old_snap}";

    push @$cmd, $path;

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
    }

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line =$1;
        # the snapshot name is a literal string, not a pattern
        return 1 if $line =~ m/^.*\Q$source->{old_snap}\E$/;
    }

    return 0;
}
962
# Send the snapshot $source->{new_snap} of $source->{all} to the
# destination dataset ($dest->{all}, extended by $source->{last_part} if
# set) with a 'zfs send | zfs recv -F' pipeline.
#
# The stream is incremental (-i) when a last snapshot is known and
# snapshot_exist() finds it on the destination. With $param->{limit} the
# stream is passed through cstream with a limit of limit*1024. If the
# pipeline fails, the new snapshot is destroyed on the source and the
# error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my $pipe = \'|'; # scalar refs are passed to the shell unquoted by run_cmd

    my @send;
    push @send, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) {
        push @send, '-i', "$source->{all}\@$source->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    my @throttle;
    if ($param->{limit}){
        my $bwl = $param->{limit}*1024;
        @throttle = ($pipe, 'cstream', '-t', $bwl);
    }

    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    my @recv;
    push @recv, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    my $cmd = [@send, @throttle, $pipe, @recv];

    eval {
        run_cmd($cmd)
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    };
}
1000
1001
# Copy the guest configuration file of $source->{vmid} to the destination
# as "<vmid>.conf.<vm_type>.<new_snap>".
#
# The target directory is $dest_config_path (default $CONFIG_PATH),
# extended by $dest->{last_part} if set; it is created with 'mkdir -p'.
# With method 'ssh' the file is transferred with scp between the involved
# hosts and, if $source->{destroy} is set, the copy belonging to
# $source->{old_snap} is removed. With method 'local' the file is copied
# with cp.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh'){
        my $scp_source = $source->{ip}
            ? "$source_user\@[$source->{ip}]:$source_target"
            : $source_target;

        if ($dest->{ip}) {
            run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $scp_source, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($source->{ip}) {
            run_cmd(['mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $scp_source, $dest_target_new]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
            my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
            run_cmd([@ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1038
# Return the current local time formatted as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # localtime: 0 = sec, 1 = min, 2 = hour, 3 = mday, 4 = mon (0-based), 5 = year - 1900
    my @now = localtime(time);

    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", $now[5]+1900, $now[4]+1, @now[3, 2, 1, 0]);
}
1045
# Return a table (as one string) with the stored state of every job found
# in the cron file. Source and name are shortened to the column width of
# 25 characters.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $columns = sprintf("%-25s%-25s", cut_target_width($source, 25), cut_target_width($sync_name, 25));
            $status_list .= $columns . "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1063
# Set the state of the job described by $param to "ok" in the state file
# and rewrite its cron line accordingly, all under the cron-and-state
# lock. Dies (via get_job) if the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $enable = sub {
        my $job = get_job($param);
        $job->{state} = "ok";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $enable);
}
1074
# Set the state of the job described by $param to "stopped" in the state
# file and rewrite its cron line accordingly, all under the cron-and-state
# lock. Dies (via get_job) if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $disable = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $disable);
}
1085
# Help text for every command, keyed by command name.
#
# The keys also act as the list of known commands: a command without an
# entry here is rejected as unknown.  The values are printed when a
# required option is missing and by "help <cmd>", and print_pod() builds
# the man page SYNOPSIS from them.
#
# The texts are qq{} strings, so $PROGNAME is interpolated and every line
# between the braces is emitted verbatim.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

remove a sync Job from the scheduler

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

Create a sync Job

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how many snapshots will be kept before they get erased, default 1

-name string

name of the sync job, if not set it is default

-skip boolean

if this flag is set it will skip the first sync

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-properties boolean

Include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]

will sync one time

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how many snapshots will be kept before they get erased, default 1

-name string

name of the sync job, if not set it is default.
It is only necessary if scheduler already contains this source.

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-verbose boolean

print out the sync progress.

-properties boolean

Include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    list => qq{
$PROGNAME list

Get a List of all scheduled Sync Jobs
},
    status => qq{
$PROGNAME status

Get the status of all scheduled Sync Jobs
},
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

Get help about specified command.

<cmd> string

Command name

-verbose boolean

Verbose output format.
},
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

enable a syncjob and reset error

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

pause a sync job

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    # Placeholder so that 'printpod' passes the known-command check; it is
    # not a real help text.
    printpod => 'internal command',

};
1243
# Reject a missing or unknown command before any option parsing happens.
# Both cases print the usage overview and end the program via die with an
# empty message.
if (!$command) {
    # usage() without an argument also prints the "no command specified"
    # error line.
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # The trailing newline keeps the error on its own line instead of
    # running into the "USAGE:" line printed next.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# parse_argv() is handed a copy, so its arguments do not alias @ARGV.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1253
# Ensure that every named option was given on the command line.
#
# Takes a list of option names.  Dies with the help text of the current
# command as soon as one of them has a false value in $param.
sub check_params {
    my @required = @_;

    foreach my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1259
# Dispatch on the command name.  Commands that act on a job first check
# that their mandatory options are present and that each target parses,
# then call the sub implementing the command.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    # The word following 'help' on the command line, if any.
    my $help_command = $ARGV[1];

    # A known command name: emit its help text via die, which ends the
    # program here.
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # exec only returns when it failed to start the program.  Report
        # that instead of silently falling off the end of the script.
        exec('man', $PROGNAME)
            or die "unable to execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1316
# Print the overview of all commands.
#
# $help - when false, a "no command specified" error line is printed in
#         front of the overview.
sub usage {
    my ($help) = @_;

    my $text = $help ? '' : "ERROR:\tno command specified\n";

    $text .= join('',
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    print($text);
}
1331
# Run a target string through parse_target() and hand back its result.
#
# $target - the target string to check (callers pass the -source or -dest
#           option value).
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1336
# Print the POD source of the man page.
#
# The SYNOPSIS section is assembled from the help texts in $cmd_help,
# sorted as strings.  The 'printpod' entry is left out: its value is only
# a placeholder, not a help text, and would otherwise show up as a stray
# line in the generated page.
sub print_pod {

    my @documented = grep { $_ ne 'printpod' } keys %$cmd_help;
    my @help_texts = map { $cmd_help->{$_} } @documented;
    my $synopsis = join("\n", sort @help_texts);

    # Interpolating heredoc: only $synopsis is substituted, the rest is
    # printed as written.
    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}