]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
pve-zsync: Flip Source and Dest in functions so jobs can share Dest
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program-wide settings: program name, on-disk locations and defaults.
13 my $PROGNAME = "pve-zsync";
# Directory holding the state file and the lock files used by locked().
14 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
# JSON file with the per-job state (read by read_state, rewritten by update_state).
15 my $STATE = "${CONFIG_PATH}/sync_state";
# cron.d file that carries one line per sync job (see read_cron/update_cron).
16 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
17 my $PATH = "/usr/sbin";
# Directories searched for <vmid>.conf to tell qemu guests from lxc guests (see vm_exists).
18 my $PVE_DIR = "/etc/pve/local";
19 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
20 my $LXC_CONF = "${PVE_DIR}/lxc";
21 my $PROG_PATH = "$PATH/${PROGNAME}";
# Default schedule for new jobs: used as "*/$INTERVAL" in the cron minute field (see format_job).
22 my $INTERVAL = 15;
23 my $DEBUG;
24
# Debug output is enabled either by changing the default below or through the
# ZSYNC_DEBUG environment variable; Data::Dumper is only loaded when it is on.
25 BEGIN {
26 $DEBUG = 0; # change default here. not above on declaration!
27 $DEBUG ||= $ENV{ZSYNC_DEBUG};
28 if ($DEBUG) {
29 require Data::Dumper;
30 Data::Dumper->import();
31 }
32 }
33
# Building blocks for address matching. They are plain strings (not qr//) so
# that they can be interpolated into the larger patterns below.
# One decimal IPv4 octet (0-255).
34 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
# Dotted-quad IPv4 address.
35 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
# One IPv6 group of 1-4 hex digits.
36 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
# Last 32 bits of an IPv6 address: an embedded IPv4 address or two hex groups.
37 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
38
# IPv6 address; one alternative per possible position of the "::" abbreviation.
39 my $IPV6RE = "(?:" .
40 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
41 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
42 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
43 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
44 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
45 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
46 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
47 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
48 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
49
50 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
51 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
52 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
53 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Captures: $1 = host (optional), $2 = VMID or pool[/path], $3 = the "/path" part if any.
54 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
# Matches the key of a guest config line that describes a disk / mount point
# (e.g. "scsi0: " or "rootfs: "); used by parse_disks.
56 my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
# First command line argument selects the sub-command.
58 my $command = $ARGV[0];
59
# Every command except 'help' and 'printpod' needs the external tools below;
# check_bin dies if one of them cannot be found in $ENV{PATH}.
60 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
61 check_bin ('cstream');
62 check_bin ('zfs');
63 check_bin ('ssh');
64 check_bin ('scp');
65 }
66
# Turn termination signals into a die() so that surrounding eval blocks
# (e.g. in sync) get a chance to run their error handling.
# NOTE(review): SIGKILL normally cannot be trapped, so the KILL entry presumably
# has no effect - confirm. $! is appended although no system call failed here.
67 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
68 die "Signaled, aborting sync: $!\n";
69 };
70
# Look up an executable in the directories listed in $ENV{PATH}.
#
#   $bin - name of the command to look for
#
# Returns the path of the first executable match ("<dir>/<bin>");
# dies with "unable to find command '<bin>'" when there is none.
sub check_bin {
    my ($bin) = @_;

    for my $dir (split(/:/, $ENV{PATH})) {
        my $candidate = "$dir/$bin";
        return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}
83
# Shorten a target path so that it fits into a table column.
#
#   $path   - path-like string; runs of '/' are collapsed first
#   $maxlen - maximum number of characters to return
#
# Strings that already fit are returned unchanged (apart from the slash
# collapsing). Longer ones are abbreviated with '..', keeping the last
# path component whenever possible.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s!/+!/!g;

    if (length($path) <= $maxlen) {
        return $path;
    }

    # a single component: keep only its end
    if ($path !~ m!/!) {
        return '..' . substr($path, 2 - $maxlen);
    }

    # split off the last component (a trailing slash stays attached to it)
    $path =~ s!/([^/]+/?)$!!;
    my $tail = $1;

    return "../$tail" if length($tail) + 3 == $maxlen;
    return '..' . substr($tail, 2 - $maxlen) if length($tail) + 2 >= $maxlen;

    # take out the first component that starts with a slash
    $path =~ s!(/[^/]+)(?:/|$)!!;
    my $head = $1;
    my $remaining = $maxlen - (length($head) + length($tail)) - 4; # -4 for "/../"

    if ($remaining < 0) {
        return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
    }

    # squeeze what is left of the middle down to the remaining width
    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return join('/', $head, $path, $tail);
}
113
# Run a piece of code while holding an exclusive lock on a lock file.
#
#   $lock_fn - path of the lock file (created/truncated if necessary)
#   $code    - code reference to execute while the lock is held
#
# Returns whatever $code returns (evaluated in scalar context).
# Dies if the lock file cannot be opened or locked; an exception raised by
# $code is re-thrown after the lock has been released and the file closed.
sub locked {
    my ($lock_fn, $code) = @_;

    # fail with a clear message instead of calling flock() on an undefined handle
    my $lock_fh = IO::File->new($lock_fn, '>')
        or die "Couldn't open lock file '$lock_fn' - $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    # close on both the success and the error path
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
129
# Check whether a status entry is recorded for a job.
#
#   $source - parsed target (hash ref); its {all} value is the first-level key
#   $name   - job name, the second-level key
#   $status - nested hash ref: {<source>}{<name>}{status}
#
# Returns $status itself if the entry has a true {status} value, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    my $entry = $status->{$source->{all}}->{$name}->{status};

    return $entry ? $status : undef;
}
139
# Check whether the dataset named by $target->{all} exists.
#
#   $target - parsed target (hash ref); if {ip} is set the check runs via ssh
#   $user   - ssh user for the remote case
#
# Runs "zfs list -H -- <dataset>" and returns 1 if the command succeeded,
# 0 if it failed.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    eval { run_cmd([@ssh, 'zfs', 'list', '-H', '--', $target->{all}]) };

    return $@ ? 0 : 1;
}
158
# Split a target specification ([host:]<VMID> or [host:]<pool>[/path]) into
# its parts.
#
#   $text - the specification as given on the command line / in the cron file
#
# Returns a hash ref with {all} (everything after the optional host), {ip}
# (host without surrounding brackets, if given), either {vmid} or {pool},
# and - when there is more than one path component - {last_part} and
# possibly {path}.
# Dies with a usage hint if $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    my ($host, $spec) = ($text =~ $TARGETRE);
    die "$errstr\n" if !defined($spec);

    my $target = { all => $spec };

    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $spec);
    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    # a purely numeric first component is a VMID, anything else a pool name
    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # for remote targets one more component is dropped before {path} is built
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
194
# Read the cron file and return the jobs it contains.
#
# On first use (the file does not exist yet) an empty cron file is created
# and undef is returned. Otherwise the lines of the file are handed to
# encode_cron and its result is returned.
# Dies if the file cannot be created or opened.
sub read_cron {

    #This is for the first use to init file;
    if (!-e $CRONJOBS) {
        my $new_fh = IO::File->new("> $CRONJOBS")
            or die "Could not create $CRONJOBS: $!\n";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $CRONJOBS")
        or die "Could not open file $CRONJOBS: $!\n";

    my @lines = <$fh>;
    close($fh);

    return encode_cron(@lines);
}
214
# Parse command line style arguments into a parameter hash.
#
#   @arg - argument words; words that are not options are ignored
#
# Returns a hash ref with the keys dest, source, verbose, limit, maxsnap,
# name, skip, method, source_user, dest_user, properties and
# dest_config_path. Options that were not given are undef, except for
# name ("default"), maxsnap (1), method ("ssh") and the two users ("root").
# Dies with "can't parse options" if Getopt::Long reports an error.
sub parse_argv {
    my (@arg) = @_;

    # Getopt::Long option spec => key in the result hash
    my %key_for = (
        'dest=s'             => 'dest',
        'source=s'           => 'source',
        'verbose'            => 'verbose',
        'limit=i'            => 'limit',
        'maxsnap=i'          => 'maxsnap',
        'name=s'             => 'name',
        'skip'               => 'skip',
        'method=s'           => 'method',
        'source-user=s'      => 'source_user',
        'dest-user=s'        => 'dest_user',
        'properties'         => 'properties',
        'dest-config-path=s' => 'dest_config_path',
    );

    my $param = {};
    $param->{$_} = undef for values %key_for;

    my @spec;
    for my $opt (keys %key_for) {
        push @spec, $opt, \$param->{$key_for{$opt}};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @spec);

    die "can't parse options\n" if $ret == 0;

    my %default = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    for my $key (keys %default) {
        $param->{$key} //= $default{$key};
    }

    return $param;
}
259
# Merge the persisted state of a job into the job hash.
#
#   $job - job hash ref; {source} and {name} select the state entry
#
# Copies state, lsync and vm_type plus the consecutive snap0, snap1, ...
# entries (up to the first missing/false one) from the state file into $job.
# Returns $job.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    $job->{state} = $state->{state};
    $job->{lsync} = $state->{lsync};
    $job->{vm_type} = $state->{vm_type};

    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
276
# Turn the lines of the cron file into a job configuration.
#
#   @text - lines of the cron file
#
# Every line is split on whitespace and run through parse_argv; lines that
# yield both a source and a dest become a job.
# Returns a hash ref of the form {<source>}{<name>} = <remaining parameters>.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    while (my $line = shift(@text)) {
        my $param = parse_argv(split('\s', $line));

        # header lines and the like carry no source/dest
        next if !($param->{source} && $param->{dest});

        my ($source, $name) = delete @{$param}{qw(source name)};
        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
297
# Build a job hash from parsed command line parameters.
#
#   $param - hash ref as returned by parse_argv
#
# Returns a hash ref with name, dest (if given), method, limit, maxsnap
# (if set), source, source_user, dest_user, properties and
# dest_config_path (if set). The method is "local" when neither source nor
# dest names a host and "ssh" otherwise.
# Dies (via parse_target) if source or dest is not a valid target.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # "my $x = ... if COND" has undefined behaviour (see perlsyn), so the
    # optional destination is parsed with a plain conditional expression
    my $dest = $param->{dest} ? parse_target($param->{dest}) : undef;
    my $dest_ip = $dest ? $dest->{ip} : undef;

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest_ip && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
320
# Load the job states from the state file.
#
# On first use (the file does not exist yet) the config directory and a
# state file containing "{}" are created and undef is returned. Otherwise
# the first line of the file is decoded as JSON and returned.
# Dies if the file cannot be created or opened.
sub read_state {

    if (!-e $STATE) {
        make_path $CONFIG_PATH;
        my $new_fh = IO::File->new("> $STATE")
            or die "Could not create $STATE: $!\n";
        print $new_fh "{}";
        close($new_fh);
        return undef;
    }

    my $fh = IO::File->new("< $STATE")
        or die "Could not open file $STATE: $!\n";

    my $json = <$fh>;
    my $states = decode_json($json);

    close($fh);

    return $states;
}
342
# Persist the state of one job in the state file.
#
#   $job - job hash ref; {source} and {name} select the entry. A {state} of
#          "del" removes the entry (and the source level once it is empty),
#          any other state stores state, lsync, vm_type and the consecutive
#          snap0, snap1, ... values of the job.
#
# The new content is written to "<state file>.new" and then renamed over
# the state file. Returns the complete states hash ref that was written.
# Dies if the temporary file cannot be opened, written, closed or renamed;
# previously such failures were ignored and could leave a truncated state
# file behind.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file is treated like an empty one
    my $text;
    if (my $in_fh = IO::File->new("< $STATE")) {
        $text = <$in_fh>;
        close($in_fh);
    }

    my $states = {};
    my $state = {};
    if ($text){
        # decode before touching the .new file so a broken state file does
        # not leave an empty temporary file behind
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}} // {};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    # buffered write errors may only show up at close(), so check both
    print {$out_fh} encode_json($states)
        or die "Could not write to ${STATE}.new: $!\n";
    close($out_fh)
        or die "Could not close ${STATE}.new: $!\n";

    rename("$STATE.new", $STATE)
        or die "Could not move ${STATE}.new: $!\n";

    return $states;
}
391
# Add, update or remove the cron line of a job.
#
#   $job - job hash ref; {source} and {name} identify the line. With a
#          {state} of "del" the line is dropped, otherwise it is rewritten
#          by format_job (or appended if the job has no line yet).
#
# A SHELL/PATH header is prepended if none is found within the first three
# lines. The result is written to "<cron file>.new" and renamed over the
# cron file.
# Dies if the cron file cannot be read or the new file cannot be written
# or moved into place.
sub update_cron {
    my ($job) = @_;

    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    my @lines = <$fh>;
    close($fh);

    # source and name are matched literally: sources such as "10.0.0.1:tank"
    # or "[::1]:tank" contain regex metacharacters
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";

    # iterate over all lines; a "while (shift)" loop would stop at the first
    # line that evaluates to false
    foreach my $line (@lines) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }

    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
}
438
# Render the cron line for a job.
#
#   $job  - job hash ref (state, source, dest, name, maxsnap, limit, method,
#           verbose, source_user, dest_user, properties, dest_config_path)
#   $line - optional existing cron line of the job; its five schedule fields
#           are kept
#
# Returns the complete line including the trailing newline. A job in state
# "stopped" is written as a comment ("#" prefix). Without $line - or when
# the schedule cannot be extracted from it - the default schedule
# "*/$INTERVAL * * * *" is used.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    # only use the capture if the match succeeded; otherwise $1 would be
    # stale or undefined and the resulting line would have no valid schedule
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
466
# Build the table printed by the "list" command.
#
# Returns a string with a header line followed by one line per job from the
# cron file, showing source, name, state, last sync, guest type ("undef" if
# not recorded) and connection method. Jobs are sorted by source and name.
sub list {

    my $cfg = read_cron();

    my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
        foreach my $name (sort keys%{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};
            my $vm_type = defined($state->{vm_type}) ? $state->{vm_type} : "undef";

            $list .= sprintf(
                "%-25s%-25s%-10s%-20s%-6s%-5s\n",
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $vm_type,
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
487
# Determine whether a guest exists and of which type it is.
#
#   $target - parsed target (hash ref) with {vmid} and optionally {ip}
#   $user   - ssh user for the remote case
#
# Looks for "<vmid>.conf" first in the qemu and then in the lxc config
# directory - locally with a file test, remotely by running /bin/ls via ssh.
# Returns "qemu" or "lxc", or undef if the target has no vmid or no config
# file was found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";
    my @candidates = (
        ["qemu", "$QEMU_CONF/$conf_fn"],
        ["lxc", "$LXC_CONF/$conf_fn"],
    );

    foreach my $candidate (@candidates) {
        my ($type, $file) = @$candidate;

        if ($target->{ip}) {
            return $type if eval { run_cmd(['ssh', "$user\@$target->{ip}", '--', '/bin/ls', $file]) };
        } else {
            return $type if -f $file;
        }
    }

    return undef;
}
506
# Create a new sync job ("create" command).
#
#   $param - hash ref as returned by parse_argv
#
# Under the cron/state lock: copies the ssh key to remote source and dest,
# verifies that the destination pool, the source pool or guest and (for
# guests) its disks exist, refuses duplicates and then records the job in
# the cron file and the state file. Unless {skip} is set a first sync is
# run afterwards; if that sync fails the job is removed again and the error
# is printed.
sub init {
    my ($param) = @_;

    my $register_job = sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);

        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # install our public key on remote hosts, destination first
        foreach my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
            my ($target, $user) = @$peer;
            my $ip = $target->{ip} or next;
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});

        if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
            die "Pool $source->{all} does not exists\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        #check if vm has zfs disks if not die;
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $register_job); #cron and state lock

    return if $param->{skip};

    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
558
# Look up a job in the cron file and attach its persisted state.
#
#   $param - hash ref with {source} and {name}
#
# Returns the job hash ref (cron parameters plus name, source and the state
# fields added by add_state_to_job).
# Dies if the cron file has no job for this source and name.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();
    my $job = $cfg->{$param->{source}}->{$param->{name}};

    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }

    $job->{name} = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}
574
# Remove a job from the cron file and the state file ("destroy" command).
#
#   $param - hash ref with {source} and {name}
#
# Runs under the cron/state lock; dies (via get_job) if the job is unknown.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
586
# Run one synchronisation from source to dest ("sync" command).
#
#   $param - hash ref as returned by parse_argv
#
# If the source/name pair is a known job, its state is tracked in the state
# file: "waiting" while queued for the sync lock, "syncing" while running,
# then "ok" (or unchanged "stopped") together with the sync date, or "error"
# on failure. Syncs are serialised through the sync lock.
# For a guest source every disk is synced and the guest config is copied;
# for a dataset source just that dataset is synced.
# Dies if the job is already waiting/syncing, has been disabled, or if the
# transfer fails.
sub sync {
    my ($param) = @_;

    my $state_lock = "$CONFIG_PATH/cron_and_state.lock";
    my $job;

    locked($state_lock, sub {
        eval { $job = get_job($param) };
        return if !$job;

        if (defined($job->{state}) && ($job->{state} eq "syncing" || $job->{state} eq "waiting")) {
            die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
        }

        $job->{state} = "waiting";
        update_state($job);
    });

    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my ($dest, $source, $vm_type);

        locked($state_lock, sub {
            #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            return if !$job;

            $job->{state} = "syncing";
            $job->{vm_type} = $vm_type if !$job->{vm_type};
            update_state($job);
        }); #cron and state lock

        # sync a single dataset: find old snapshots, take a new one, send it
        # and drop the oldest one once the snapshot limit has been reached
        my $sync_path = sub {
            my ($source, $dest, $job, $param, $date) = @_;

            ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});

            snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

            send_image($source, $dest, $param);

            if ($source->{destroy} && $dest->{old_snap}) {
                snapshot_destroy($source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user});
            }
        };

        eval {
            if ($source->{vmid}) {
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                foreach my $key (sort keys %{$disks}) {
                    my $disk = $disks->{$key};

                    $source->{all} = $disk->{all};
                    $source->{pool} = $disk->{pool};
                    $source->{path} = $disk->{path} if $disk->{path};
                    $source->{last_part} = $disk->{last_part};
                    $sync_path->($source, $dest, $job, $param, $date);
                }

                my $remote = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
                if ($remote) {
                    send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
                } else {
                    send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
                }
            } else {
                $sync_path->($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            locked($state_lock, sub {
                eval { $job = get_job($param); };
                return if !$job;

                $job->{state} = "error";
                update_state($job);
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        locked($state_lock, sub {
            eval { $job = get_job($param); };
            return if !$job;

            # a job that was disabled in the meantime stays "stopped"
            my $stopped = defined($job->{state}) && $job->{state} eq "stopped";
            $job->{state} = $stopped ? "stopped" : "ok";
            $job->{lsync} = $date;
            update_state($job);
        });
    }); #sync lock
}
695
# Inspect the snapshots that already exist on the destination.
#
#   $source    - parsed source; {last_part} is appended to the dest path and
#                {destroy} is set to 1 once $max_snap job snapshots were found
#   $dest      - parsed destination ({all}, optional {ip})
#   $max_snap  - number of snapshots to keep for this job
#   $name      - job name (part of the snapshot name "rep_<name>_<date>")
#   $dest_user - ssh user for a remote destination
#
# Snapshots are listed newest first. Returns ($old_snap, $last_snap) where
# $last_snap is the newest snapshot of the dataset and $old_snap the last
# job snapshot seen before the scan stopped; returns undef if the dataset
# cannot be listed or has no snapshot.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};

    my @cmd = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    push @cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation', $path;

    my $raw;
    eval { $raw = run_cmd(\@cmd) };
    return undef if $@; #this means the volume doesn't exist on dest yet

    my $index = 0;
    my $last_snap = undef;
    my $old_snap;

    foreach my $line ($raw ? split(/\n/, $raw) : ()) {
        if (!$last_snap && $line =~ m/@(.*)$/) {
            $last_snap = $1;
        }

        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $old_snap = $1;
        $index++;
        if ($index == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
737
# Take a new replication snapshot of the source dataset.
#
#   $source      - parsed source; {new_snap} is set to the snapshot name
#   $dest        - parsed destination (only used for cleanup)
#   $name, $date - combined into the snapshot name "rep_<name>_<date>"
#   $source_user, $dest_user - ssh users
#
# If creating the snapshot fails, snapshot_destroy is called for that name
# and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_$name\_".$date;
    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval { run_cmd([@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"]) };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
759
# Write a complete cron file from a configuration hash.
#
#   $cfg - hash ref {<source>}{<name>} = entry with status, vmid, source_ip,
#          source_pool, source_path, dest_ip, dest_pool, dest_path, limit
#          and maxsnap
#
# Only entries whose status is 'ok' are written.
# Dies if the cron file cannot be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name} //= {};
            next if $entry->{status} ne 'ok';

            my @line = ("$PROG_PATH sync", " -source ");

            # the optional host prefix is the same for guests and datasets
            push @line, "$entry->{source_ip}:" if $entry->{source_ip};
            if ($entry->{vmid}) {
                push @line, "$entry->{vmid} ";
            } else {
                push @line, "$entry->{source_pool}";
                push @line, "$entry->{source_path}" if $entry->{source_path};
            }

            push @line, " -dest ";
            push @line, "$entry->{dest_ip}:" if $entry->{dest_ip};
            push @line, "$entry->{dest_pool}";
            push @line, "$entry->{dest_path}" if $entry->{dest_path};

            push @line, " -name $sync_name ";
            push @line, " -limit $entry->{limit}" if $entry->{limit};
            push @line, " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};

            $text .= join('', @line, "\n");
        }
    }
    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
795
# Fetch the disks of a guest.
#
#   $target - parsed target with {vmid}, {vm_type} and optionally {ip}
#   $user   - ssh user for the remote case
#
# Runs "qm config" (qemu) or "pct config" (lxc) for the guest and returns
# the result of parse_disks for its output.
# Dies with "VM Type unknown" for any other vm_type.
sub get_disks {
    my ($target, $user) = @_;

    my @cmd = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    if ($target->{vm_type} eq 'qemu') {
        push @cmd, 'qm';
    } elsif ($target->{vm_type} eq 'lxc') {
        push @cmd, 'pct';
    } else {
        die "VM Type unknown\n";
    }
    push @cmd, 'config', $target->{vmid};

    my $res = run_cmd(\@cmd);

    return parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
}
816
# Run a shell command and return its output.
#
#   $cmd - command string, or array ref of words. Plain words are shell
#          quoted; scalar references are inserted verbatim (used for '|').
#
# Returns the combined stdout/stderr output without the trailing newline.
# Dies with the command and its output if the exit status is not zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
833
# Extract the ZFS backed disks from a guest configuration.
#
#   $text    - output of "qm config" / "pct config"
#   $ip      - host the guest lives on (undef/empty for local)
#   $vm_type - 'qemu' or 'lxc'
#   $user    - ssh user for the remote case
#
# Skipped are cdrom drives, qemu disks with backup disabled and lxc mount
# points (mpN) that do not have backup enabled. For every remaining disk the
# path reported by "pvesm path" is split into pool, optional path and disk
# name.
# Returns a hash ref {0 => {pool, all, last_part[, path]}, 1 => ...}.
# Dies if a path cannot be resolved or has an unexpected format, or if no
# disk is left.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        #QEMU if backup is not set include in sync
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        #LXC if backup is not set do no in sync
        # \d+ so that mount points with more than one digit (mp10, ...) are
        # filtered as well, matching the mp\d+ accepted by $DISK_KEY_RE
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/,$1);

            # the first option of the form [file=|volume=]<storage>:<volume>
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
913
# Destroy a snapshot on the source and, if $dest is given, on the destination.
#
#   $source  - parsed source ({all}, optional {ip} and {last_part})
#   $dest    - parsed destination, or undef to leave the destination alone
#   $method  - the source side only goes through ssh when this is 'ssh'
#   $snap    - snapshot name (without dataset and '@')
#   $source_user, $dest_user - ssh users
#
# Failures are reported with warn() and do not abort.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval { run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    if ($dest) {
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        my $path = "$dest->{all}";
        $path .= "/$source->{last_part}" if $source->{last_part};

        eval { run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]) };
        if (my $erro = $@) {
            warn "WARN: $erro";
        }
    }
}
944
# check if snapshot for incremental sync exist on source side
#
#   $source      - parsed source ({all}, optional {ip})
#   $dest        - parsed destination; {last_snap} names the snapshot
#   $method      - not used here
#   $source_user - ssh user for a remote source
#
# Returns 1 if "zfs list" finds <source>@<last_snap>, otherwise warns and
# returns undef.
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $snapshot = "$source->{all}\@$dest->{last_snap}";

    eval { run_cmd([@ssh, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name', $snapshot]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    return 1;
}
965
# Transfer the new snapshot from source to destination.
#
#   $source - parsed source ({all}, {new_snap}, optional {ip}, {last_part})
#   $dest   - parsed destination ({all}, optional {ip}, {last_snap})
#   $param  - job parameters (source_user, dest_user, properties, verbose,
#             limit, method)
#
# Builds a "zfs send | [cstream |] zfs recv" pipeline, with ssh on whichever
# side is remote. The stream is incremental if the destination's last
# snapshot also exists on the source. {limit} is multiplied by 1024 and
# passed to "cstream -t".
# On failure the new snapshot is destroyed on the source and the error is
# re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @send;
    push @send, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
        push @send, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    my @throttle;
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    my @recv = (\'|');
    push @recv, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval { run_cmd([@send, @throttle, @recv]) };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    }
}
1003
1004
# Copy the guest configuration file next to the replicated data.
#
#   $source  - parsed source ({vmid}, {vm_type}, {new_snap}, optional {ip},
#              {destroy})
#   $dest    - parsed destination (optional {ip}, {last_part}, {old_snap})
#   $method  - 'ssh' or 'local'
#   $source_user, $dest_user - ssh users
#   $dest_config_path - target directory; defaults to $CONFIG_PATH
#
# The copy is stored as "<vmid>.conf.<vm_type>.<new_snap>" in the config
# directory (plus "/<last_part>" of the destination if set). With method
# 'ssh' the file is copied with scp and, if {destroy} is set on the source,
# the copy belonging to the old snapshot is removed.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
    my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    $dest_target_new = $config_dir.'/'.$dest_target_new;

    if ($method eq 'ssh'){
        # commands for the destination side run through ssh if it is remote
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            my $from = $source->{ip} ? "$source_user\@[$source->{ip}]:$source_target" : $source_target;
            my $to = $dest->{ip} ? "$dest_user\@[$dest->{ip}]:$dest_target_new" : $dest_target_new;

            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$dest->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1041
# Return the current local time as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # localtime fields: 0 sec, 1 min, 2 hour, 3 mday, 4 mon (0-based), 5 year (minus 1900)
    my @t = localtime(time);

    return sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $t[5]+1900, $t[4]+1, $t[3], $t[2], $t[1], $t[0]);
}
1048
# Build the table printed by the "status" command.
#
# Returns a string with a header line followed by one line per job from the
# cron file, showing source, name and the state recorded in the state file.
# Jobs are sorted by source and name.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys%{$cfg}) {
        foreach my $sync_name (sort keys%{$cfg->{$source}}) {
            my $columns = sprintf("%-25s", cut_target_width($source, 25))
                . sprintf("%-25s", cut_target_width($sync_name, 25));
            $status_list .= $columns . "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1066
# Re-enable a job ("enable" command): set its state to "ok" and rewrite its
# cron line.
#
#   $param - hash ref with {source} and {name}
#
# Runs under the cron/state lock; dies (via get_job) if the job is unknown.
sub enable_job {
    my ($param) = @_;

    my $enable = sub {
        my $job = get_job($param);
        $job->{state} = "ok";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $enable);
}
1077
# Pause a job ("disable" command): set its state to "stopped" and rewrite
# its cron line (format_job writes stopped jobs as comments).
#
#   $param - hash ref with {source} and {name}
#
# Runs under the cron/state lock; dies (via get_job) if the job is unknown.
sub disable_job {
    my ($param) = @_;

    my $disable = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $disable);
}
1088
# Per-command help texts, keyed by command name.
#
# A command is only accepted if it has an entry in this table. The entry is
# printed when a required option is missing or when "help <cmd>" is requested,
# and print_pod() joins the entries into the SYNOPSIS of the generated POD.
#
# The texts are emitted verbatim, so the closing braces sit at column 0 to
# keep trailing whitespace out of the strings.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

remove a sync Job from the scheduler

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

Create a sync Job

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default

-skip boolean

if this flag is set it will skip the first sync

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-properties boolean

Include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]

will sync one time

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default.
It is only necessary if scheduler already contains this source.

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-verbose boolean

print out the sync progress.

-properties boolean

Include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    list => qq{
$PROGNAME list

Get a List of all scheduled Sync Jobs
},
    status => qq{
$PROGNAME status

Get the status of all scheduled Sync Jobs
},
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

Get help about specified command.

<cmd> string

Command name

-verbose boolean

Verbose output format.
},
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

enable a syncjob and reset error

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

pause a sync job

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    # Placeholder so that 'printpod' passes the known-command check.
    printpod => 'internal command',

};
1246
# Reject a missing or unknown command before the options are parsed.
# Both failure paths print the usage overview and exit via die.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # Terminate the message so the usage text starts on its own line.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Options are parsed from a copy of @ARGV; $ARGV[1] is still read directly
# further down for "help <cmd>".
my @arg = @ARGV;
my $param = parse_argv(@arg);
1256
# Abort with the help text of the current command unless every option named
# in the argument list has a true value in $param.
sub check_params {
    my @required = @_;

    foreach my $key (@required) {
        next if $param->{$key};
        die "$cmd_help->{$command}\n";
    }
}
1262
# Dispatch on the command name. Commands that operate on a job first make
# sure their required options are present and that the targets parse.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    my $help_command = $ARGV[1];

    # "help <cmd>": show the help text of that command and stop.
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # exec only returns on failure; report that instead of silently
        # falling through. The list form avoids going through a shell.
        exec('man', $PROGNAME)
            or die "unable to execute 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1319
# Print the command overview to STDOUT.
#
# $help - when false, an "ERROR: no command specified" line is printed first;
#         pass a true value to show only the overview.
sub usage {
    my ($help) = @_;

    my @lines;
    push @lines, "ERROR:\tno command specified\n" unless $help;

    push @lines,
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n";

    print join('', @lines);
}
1334
# Run a target specification through parse_target() and hand back whatever
# it returns.
#
# $target - the source or destination string given on the command line.
sub check_target {
    my ($spec) = @_;

    return parse_target($spec);
}
1339
# Print the POD source of the manual page to STDOUT.
#
# The SYNOPSIS section is assembled from the per-command help texts in
# $cmd_help, sorted by text. The 'printpod' entry is only a placeholder
# ('internal command') for this internal command, so it is left out of the
# generated documentation.
sub print_pod {

    my @public_help =
        map { $cmd_help->{$_} }
        grep { $_ ne 'printpod' }
        keys %$cmd_help;

    my $synopsis = join("\n", sort @public_help);

    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}