]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
fix #1860 added ability to specify source and destination user
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5 use Data::Dumper qw(Dumper);
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Copy qw(move);
9 use File::Path qw(make_path);
10 use JSON;
11 use IO::File;
12 use String::ShellQuote 'shell_quote';
13
14 my $PROGNAME = "pve-zsync";
15 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
16 my $STATE = "${CONFIG_PATH}/sync_state";
17 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
18 my $PATH = "/usr/sbin";
19 my $PVE_DIR = "/etc/pve/local";
20 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
21 my $LXC_CONF = "${PVE_DIR}/lxc";
22 my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
23 my $PROG_PATH = "$PATH/${PROGNAME}";
24 my $INTERVAL = 15;
25 my $DEBUG = 0;
26
27 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
28 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
29 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
30 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
31
32 my $IPV6RE = "(?:" .
33 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
34 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
35 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
36 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
37 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
38 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
39 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
40 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
41 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
42
43 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
44 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
45 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
46 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
47 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
48
49 my $command = $ARGV[0];
50
51 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
52 check_bin ('cstream');
53 check_bin ('zfs');
54 check_bin ('ssh');
55 check_bin ('scp');
56 }
57
58 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =
59 sub {
60 die "Signal aborting sync\n";
61 };
62
63 sub check_bin {
64 my ($bin) = @_;
65
66 foreach my $p (split (/:/, $ENV{PATH})) {
67 my $fn = "$p/$bin";
68 if (-x $fn) {
69 return $fn;
70 }
71 }
72
73 die "unable to find command '$bin'\n";
74 }
75
76 sub cut_target_width {
77 my ($path, $maxlen) = @_;
78 $path =~ s@/+@/@g;
79
80 return $path if length($path) <= $maxlen;
81
82 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
83
84 $path =~ s@/([^/]+/?)$@@;
85 my $tail = $1;
86
87 if (length($tail)+3 == $maxlen) {
88 return "../$tail";
89 } elsif (length($tail)+2 >= $maxlen) {
90 return '..'.substr($tail, -$maxlen+2)
91 }
92
93 $path =~ s@(/[^/]+)(?:/|$)@@;
94 my $head = $1;
95 my $both = length($head) + length($tail);
96 my $remaining = $maxlen-$both-4; # -4 for "/../"
97
98 if ($remaining < 0) {
99 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
100 }
101
102 substr($path, ($remaining/2), (length($path)-$remaining), '..');
103 return "$head/" . $path . "/$tail";
104 }
105
106 sub lock {
107 my ($fh) = @_;
108 flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
109 }
110
111 sub unlock {
112 my ($fh) = @_;
113 flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
114 }
115
116 sub get_status {
117 my ($source, $name, $status) = @_;
118
119 if ($status->{$source->{all}}->{$name}->{status}) {
120 return $status;
121 }
122
123 return undef;
124 }
125
126 sub check_pool_exists {
127 my ($target, $user) = @_;
128
129 my $cmd = [];
130
131 if ($target->{ip}) {
132 push @$cmd, 'ssh', "$user\@$target->{ip}", '--';
133 }
134 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
135 eval {
136 run_cmd($cmd);
137 };
138
139 if ($@) {
140 return 0;
141 }
142 return 1;
143 }
144
145 sub parse_target {
146 my ($text) = @_;
147
148 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
149 my $target = {};
150
151 if ($text !~ $TARGETRE) {
152 die "$errstr\n";
153 }
154 $target->{all} = $2;
155 $target->{ip} = $1 if $1;
156 my @parts = split('/', $2);
157
158 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
159
160 my $pool = $target->{pool} = shift(@parts);
161 die "$errstr\n" if !$pool;
162
163 if ($pool =~ m/^\d+$/) {
164 $target->{vmid} = $pool;
165 delete $target->{pool};
166 }
167
168 return $target if (@parts == 0);
169 $target->{last_part} = pop(@parts);
170
171 if ($target->{ip}) {
172 pop(@parts);
173 }
174 if (@parts > 0) {
175 $target->{path} = join('/', @parts);
176 }
177
178 return $target;
179 }
180
181 sub read_cron {
182
183 #This is for the first use to init file;
184 if (!-e $CRONJOBS) {
185 my $new_fh = IO::File->new("> $CRONJOBS");
186 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
187 close($new_fh);
188 return undef;
189 }
190
191 my $fh = IO::File->new("< $CRONJOBS");
192 die "Could not open file $CRONJOBS: $!\n" if !$fh;
193
194 my @text = <$fh>;
195
196 close($fh);
197
198 return encode_cron(@text);
199 }
200
201 sub parse_argv {
202 my (@arg) = @_;
203
204 my $param = {};
205 $param->{dest} = undef;
206 $param->{source} = undef;
207 $param->{verbose} = undef;
208 $param->{limit} = undef;
209 $param->{maxsnap} = undef;
210 $param->{name} = undef;
211 $param->{skip} = undef;
212 $param->{method} = undef;
213 $param->{source_user} = undef;
214 $param->{dest_user} = undef;
215
216 my ($ret, $ar) = GetOptionsFromArray(\@arg,
217 'dest=s' => \$param->{dest},
218 'source=s' => \$param->{source},
219 'verbose' => \$param->{verbose},
220 'limit=i' => \$param->{limit},
221 'maxsnap=i' => \$param->{maxsnap},
222 'name=s' => \$param->{name},
223 'skip' => \$param->{skip},
224 'method=s' => \$param->{method},
225 'source-user=s' => \$param->{source_user},
226 'dest-user=s' => \$param->{dest_user});
227
228 if ($ret == 0) {
229 die "can't parse options\n";
230 }
231
232 $param->{name} = "default" if !$param->{name};
233 $param->{maxsnap} = 1 if !$param->{maxsnap};
234 $param->{method} = "ssh" if !$param->{method};
235 $param->{source_user} = "root" if(!$param->{source_user});
236 $param->{dest_user} = "root" if(!$param->{dest_user});
237
238 return $param;
239 }
240
241 sub add_state_to_job {
242 my ($job) = @_;
243
244 my $states = read_state();
245 my $state = $states->{$job->{source}}->{$job->{name}};
246
247 $job->{state} = $state->{state};
248 $job->{lsync} = $state->{lsync};
249 $job->{vm_type} = $state->{vm_type};
250
251 for (my $i = 0; $state->{"snap$i"}; $i++) {
252 $job->{"snap$i"} = $state->{"snap$i"};
253 }
254
255 return $job;
256 }
257
258 sub encode_cron {
259 my (@text) = @_;
260
261 my $cfg = {};
262
263 while (my $line = shift(@text)) {
264
265 my @arg = split('\s', $line);
266 my $param = parse_argv(@arg);
267
268 if ($param->{source} && $param->{dest}) {
269 $cfg->{$param->{source}}->{$param->{name}}->{dest} = $param->{dest};
270 $cfg->{$param->{source}}->{$param->{name}}->{verbose} = $param->{verbose};
271 $cfg->{$param->{source}}->{$param->{name}}->{limit} = $param->{limit};
272 $cfg->{$param->{source}}->{$param->{name}}->{maxsnap} = $param->{maxsnap};
273 $cfg->{$param->{source}}->{$param->{name}}->{skip} = $param->{skip};
274 $cfg->{$param->{source}}->{$param->{name}}->{method} = $param->{method};
275 $cfg->{$param->{source}}->{$param->{name}}->{source_user} = $param->{source_user};
276 $cfg->{$param->{source}}->{$param->{name}}->{dest_user} = $param->{dest_user};
277 }
278 }
279
280 return $cfg;
281 }
282
283 sub param_to_job {
284 my ($param) = @_;
285
286 my $job = {};
287
288 my $source = parse_target($param->{source});
289 my $dest = parse_target($param->{dest}) if $param->{dest};
290
291 $job->{name} = !$param->{name} ? "default" : $param->{name};
292 $job->{dest} = $param->{dest} if $param->{dest};
293 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
294 $job->{method} = "ssh" if !$job->{method};
295 $job->{limit} = $param->{limit};
296 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
297 $job->{source} = $param->{source};
298 $job->{source_user} = $param->{source_user};
299 $job->{dest_user} = $param->{dest_user};
300
301 return $job;
302 }
303
304 sub read_state {
305
306 if (!-e $STATE) {
307 make_path $CONFIG_PATH;
308 my $new_fh = IO::File->new("> $STATE");
309 die "Could not create $STATE: $!\n" if !$new_fh;
310 print $new_fh "{}";
311 close($new_fh);
312 return undef;
313 }
314
315 my $fh = IO::File->new("< $STATE");
316 die "Could not open file $STATE: $!\n" if !$fh;
317
318 my $text = <$fh>;
319 my $states = decode_json($text);
320
321 close($fh);
322
323 return $states;
324 }
325
326 sub update_state {
327 my ($job) = @_;
328 my $text;
329 my $in_fh;
330
331 eval {
332
333 $in_fh = IO::File->new("< $STATE");
334 die "Could not open file $STATE: $!\n" if !$in_fh;
335 lock($in_fh);
336 $text = <$in_fh>;
337 };
338
339 my $out_fh = IO::File->new("> $STATE.new");
340 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
341
342 my $states = {};
343 my $state = {};
344 if ($text){
345 $states = decode_json($text);
346 $state = $states->{$job->{source}}->{$job->{name}};
347 }
348
349 if ($job->{state} ne "del") {
350 $state->{state} = $job->{state};
351 $state->{lsync} = $job->{lsync};
352 $state->{vm_type} = $job->{vm_type};
353
354 for (my $i = 0; $job->{"snap$i"} ; $i++) {
355 $state->{"snap$i"} = $job->{"snap$i"};
356 }
357 $states->{$job->{source}}->{$job->{name}} = $state;
358 } else {
359
360 delete $states->{$job->{source}}->{$job->{name}};
361 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
362 }
363
364 $text = encode_json($states);
365 print $out_fh $text;
366
367 close($out_fh);
368 move("$STATE.new", $STATE);
369 eval {
370 close($in_fh);
371 };
372
373 return $states;
374 }
375
376 sub update_cron {
377 my ($job) = @_;
378
379 my $updated;
380 my $has_header;
381 my $line_no = 0;
382 my $text = "";
383 my $header = "SHELL=/bin/sh\n";
384 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
385
386 my $fh = IO::File->new("< $CRONJOBS");
387 die "Could not open file $CRONJOBS: $!\n" if !$fh;
388 lock($fh);
389
390 my @test = <$fh>;
391
392 while (my $line = shift(@test)) {
393 chomp($line);
394 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
395 $updated = 1;
396 next if $job->{state} eq "del";
397 $text .= format_job($job, $line);
398 } else {
399 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
400 $has_header = 1;
401 }
402 $text .= "$line\n";
403 }
404 $line_no++;
405 }
406
407 if (!$has_header) {
408 $text = "$header$text";
409 }
410
411 if (!$updated) {
412 $text .= format_job($job);
413 }
414 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
415 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
416
417 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
418 close ($new_fh);
419
420 die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
421 close ($fh);
422 }
423
424 sub format_job {
425 my ($job, $line) = @_;
426 my $text = "";
427
428 if ($job->{state} eq "stopped") {
429 $text = "#";
430 }
431 if ($line) {
432 $line =~ /^#*(.+) root/;
433 $text .= $1;
434 } else {
435 $text .= "*/$INTERVAL * * * *";
436 }
437 $text .= " root";
438 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
439 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
440 $text .= " --limit $job->{limit}" if $job->{limit};
441 $text .= " --method $job->{method}";
442 $text .= " --verbose" if $job->{verbose};
443 $text .= " --source-user $job->{source_user}";
444 $text .= " --dest-user $job->{dest_user}";
445 $text .= "\n";
446
447 return $text;
448 }
449
450 sub list {
451
452 my $cfg = read_cron();
453
454 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
455
456 my $states = read_state();
457 foreach my $source (sort keys%{$cfg}) {
458 foreach my $name (sort keys%{$cfg->{$source}}) {
459 $list .= sprintf("%-25s", cut_target_width($source, 25));
460 $list .= sprintf("%-25s", cut_target_width($name, 25));
461 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
462 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
463 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
464 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
465 }
466 }
467
468 return $list;
469 }
470
471 sub vm_exists {
472 my ($target, $user) = @_;
473
474 my @cmd = ('ssh', "$user\@$target->{ip}", '--') if $target->{ip};
475
476 my $res = undef;
477
478 return undef if !defined($target->{vmid});
479
480 eval { $res = run_cmd([@cmd, 'ls', "$QEMU_CONF/$target->{vmid}.conf"]) };
481
482 return "qemu" if $res;
483
484 eval { $res = run_cmd([@cmd, 'ls', "$LXC_CONF/$target->{vmid}.conf"]) };
485
486 return "lxc" if $res;
487
488 return undef;
489 }
490
491 sub init {
492 my ($param) = @_;
493
494 my $cfg = read_cron();
495
496 my $job = param_to_job($param);
497
498 $job->{state} = "ok";
499 $job->{lsync} = 0;
500
501 my $source = parse_target($param->{source});
502 my $dest = parse_target($param->{dest});
503
504 if (my $ip = $dest->{ip}) {
505 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
506 }
507
508 if (my $ip = $source->{ip}) {
509 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
510 }
511
512 die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
513
514 if (!defined($source->{vmid})) {
515 die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
516 }
517
518 my $vm_type = vm_exists($source, $param->{source_user});
519 $job->{vm_type} = $vm_type;
520 $source->{vm_type} = $vm_type;
521
522 die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
523
524 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
525
526 #check if vm has zfs disks if not die;
527 get_disks($source, $param->{source_user}) if $source->{vmid};
528
529 update_cron($job);
530 update_state($job);
531
532 eval {
533 sync($param) if !$param->{skip};
534 };
535 if(my $err = $@) {
536 destroy_job($param);
537 print $err;
538 }
539 }
540
541 sub get_job {
542 my ($param) = @_;
543
544 my $cfg = read_cron();
545
546 if (!$cfg->{$param->{source}}->{$param->{name}}) {
547 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
548 }
549 my $job = $cfg->{$param->{source}}->{$param->{name}};
550 $job->{name} = $param->{name};
551 $job->{source} = $param->{source};
552 $job = add_state_to_job($job);
553
554 return $job;
555 }
556
557 sub destroy_job {
558 my ($param) = @_;
559
560 my $job = get_job($param);
561 $job->{state} = "del";
562
563 update_cron($job);
564 update_state($job);
565 }
566
567 sub sync {
568 my ($param) = @_;
569
570 my $lock_fh = IO::File->new("> $LOCKFILE");
571 die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
572 lock($lock_fh);
573
574 my $date = get_date();
575 my $job;
576 eval {
577 $job = get_job($param);
578 };
579
580 if ($job && $job->{state} eq "syncing") {
581 die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
582 }
583
584 my $dest = parse_target($param->{dest});
585 my $source = parse_target($param->{source});
586
587 my $sync_path = sub {
588 my ($source, $dest, $job, $param, $date) = @_;
589
590 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
591
592 snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
593
594 send_image($source, $dest, $param);
595
596 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
597
598 };
599
600 my $vm_type = vm_exists($source, $param->{source_user});
601 $source->{vm_type} = $vm_type;
602
603 if ($job) {
604 $job->{state} = "syncing";
605 $job->{vm_type} = $vm_type if !$job->{vm_type};
606 update_state($job);
607 }
608
609 eval{
610 if ($source->{vmid}) {
611 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
612 die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
613 my $disks = get_disks($source, $param->{source_user});
614
615 foreach my $disk (sort keys %{$disks}) {
616 $source->{all} = $disks->{$disk}->{all};
617 $source->{pool} = $disks->{$disk}->{pool};
618 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
619 $source->{last_part} = $disks->{$disk}->{last_part};
620 &$sync_path($source, $dest, $job, $param, $date);
621 }
622 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
623 send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user});
624 } else {
625 send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user});
626 }
627 } else {
628 &$sync_path($source, $dest, $job, $param, $date);
629 }
630 };
631 if(my $err = $@) {
632 if ($job) {
633 $job->{state} = "error";
634 update_state($job);
635 unlock($lock_fh);
636 close($lock_fh);
637 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
638 }
639 die "$err\n";
640 }
641
642 if ($job) {
643 $job->{state} = "ok";
644 $job->{lsync} = $date;
645 update_state($job);
646 }
647
648 unlock($lock_fh);
649 close($lock_fh);
650 }
651
652 sub snapshot_get{
653 my ($source, $dest, $max_snap, $name, $source_user) = @_;
654
655 my $cmd = [];
656 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
657 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
658 push @$cmd, $source->{all};
659
660 my $raw = run_cmd($cmd);
661 my $index = 0;
662 my $line = "";
663 my $last_snap = undef;
664 my $old_snap;
665
666 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
667 $line = $1;
668 if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
669
670 $last_snap = $1 if (!$last_snap);
671 $old_snap = $1;
672 $index++;
673 if ($index == $max_snap) {
674 $source->{destroy} = 1;
675 last;
676 };
677 }
678 }
679
680 return ($old_snap, $last_snap) if $last_snap;
681
682 return undef;
683 }
684
685 sub snapshot_add {
686 my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;
687
688 my $snap_name = "rep_$name\_".$date;
689
690 $source->{new_snap} = $snap_name;
691
692 my $path = "$source->{all}\@$snap_name";
693
694 my $cmd = [];
695 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
696 push @$cmd, 'zfs', 'snapshot', $path;
697 eval{
698 run_cmd($cmd);
699 };
700
701 if (my $err = $@) {
702 snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
703 die "$err\n";
704 }
705 }
706
707 sub write_cron {
708 my ($cfg) = @_;
709
710 my $text = "SHELL=/bin/sh\n";
711 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
712
713 my $fh = IO::File->new("> $CRONJOBS");
714 die "Could not open file: $!\n" if !$fh;
715
716 foreach my $source (sort keys%{$cfg}) {
717 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
718 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
719 $text .= "$PROG_PATH sync";
720 $text .= " -source ";
721 if ($cfg->{$source}->{$sync_name}->{vmid}) {
722 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
723 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
724 } else {
725 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
726 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
727 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
728 }
729 $text .= " -dest ";
730 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
731 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
732 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
733 $text .= " -name $sync_name ";
734 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
735 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
736 $text .= "\n";
737 }
738 }
739 die "Can't write to cron\n" if (!print($fh $text));
740 close($fh);
741 }
742
743 sub get_disks {
744 my ($target, $user) = @_;
745
746 my $cmd = [];
747 push @$cmd, 'ssh', "$user\@$target->{ip}", '--', if $target->{ip};
748
749 if ($target->{vm_type} eq 'qemu') {
750 push @$cmd, 'qm', 'config', $target->{vmid};
751 } elsif ($target->{vm_type} eq 'lxc') {
752 push @$cmd, 'pct', 'config', $target->{vmid};
753 } else {
754 die "VM Type unknown\n";
755 }
756
757 my $res = run_cmd($cmd);
758
759 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
760
761 return $disks;
762 }
763
764 sub run_cmd {
765 my ($cmd) = @_;
766 print "Start CMD\n" if $DEBUG;
767 print Dumper $cmd if $DEBUG;
768 if (ref($cmd) eq 'ARRAY') {
769 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
770 }
771 my $output = `$cmd 2>&1`;
772
773 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
774
775 chomp($output);
776 print Dumper $output if $DEBUG;
777 print "END CMD\n" if $DEBUG;
778 return $output;
779 }
780
781 sub parse_disks {
782 my ($text, $ip, $vm_type, $user) = @_;
783
784 my $disks;
785
786 my $num = 0;
787 while ($text && $text =~ s/^(.*?)(\n|$)//) {
788 my $line = $1;
789
790 next if $line =~ /media=cdrom/;
791 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
792
793 #QEMU if backup is not set include in sync
794 next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
795
796 #LXC if backup is not set do no in sync
797 next if $vm_type eq 'lxc' && ($line =~ m/^mp\d:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);
798
799 my $disk = undef;
800 my $stor = undef;
801 if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
802 my @parameter = split(/,/,$1);
803
804 foreach my $opt (@parameter) {
805 if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
806 $disk = $2;
807 $stor = $1;
808 last;
809 }
810 }
811 }
812 if (!defined($disk) || !defined($stor)) {
813 print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
814 next;
815 }
816
817 my $cmd = [];
818 push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
819 push @$cmd, 'pvesm', 'path', "$stor$disk";
820 my $path = run_cmd($cmd);
821
822 die "Get no path from pvesm path $stor$disk\n" if !$path;
823
824 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
825
826 my @array = split('/', $1);
827 $disks->{$num}->{pool} = shift(@array);
828 $disks->{$num}->{all} = $disks->{$num}->{pool};
829 if (0 < @array) {
830 $disks->{$num}->{path} = join('/', @array);
831 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
832 }
833 $disks->{$num}->{last_part} = $disk;
834 $disks->{$num}->{all} .= "\/$disk";
835
836 $num++;
837 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
838
839 $disks->{$num}->{pool} = $1;
840 $disks->{$num}->{all} = $disks->{$num}->{pool};
841
842 if ($2) {
843 $disks->{$num}->{path} = $3;
844 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
845 }
846
847 $disks->{$num}->{last_part} = $disk;
848 $disks->{$num}->{all} .= "\/$disk";
849
850 $num++;
851
852 } else {
853 die "ERROR: in path\n";
854 }
855 }
856
857 die "Vm include no disk on zfs.\n" if !$disks->{0};
858 return $disks;
859 }
860
861 sub snapshot_destroy {
862 my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;
863
864 my @zfscmd = ('zfs', 'destroy');
865 my $snapshot = "$source->{all}\@$snap";
866
867 eval {
868 if($source->{ip} && $method eq 'ssh'){
869 run_cmd(['ssh', "$source_user\@$source->{ip}", '--', @zfscmd, $snapshot]);
870 } else {
871 run_cmd([@zfscmd, $snapshot]);
872 }
873 };
874 if (my $erro = $@) {
875 warn "WARN: $erro";
876 }
877 if ($dest) {
878 my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
879
880 my $path = "$dest->{all}";
881 $path .= "/$source->{last_part}" if $source->{last_part};
882
883 eval {
884 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
885 };
886 if (my $erro = $@) {
887 warn "WARN: $erro";
888 }
889 }
890 }
891
892 sub snapshot_exist {
893 my ($source , $dest, $method, $source_user) = @_;
894
895 my $cmd = [];
896 push @$cmd, 'ssh', "$source_user\@$dest->{ip}", '--' if $dest->{ip};
897 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
898
899 my $path = $dest->{all};
900 $path .= "/$source->{last_part}" if $source->{last_part};
901 $path .= "\@$source->{old_snap}";
902
903 push @$cmd, $path;
904
905
906 my $text = "";
907 eval {$text =run_cmd($cmd);};
908 if (my $erro =$@) {
909 warn "WARN: $erro";
910 return undef;
911 }
912
913 while ($text && $text =~ s/^(.*?)(\n|$)//) {
914 my $line =$1;
915 return 1 if $line =~ m/^.*$source->{old_snap}$/;
916 }
917 }
918
919 sub send_image {
920 my ($source, $dest, $param) = @_;
921
922 my $cmd = [];
923
924 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
925 push @$cmd, 'zfs', 'send';
926 push @$cmd, '-v' if $param->{verbose};
927
928 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
929 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
930 }
931 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
932
933 if ($param->{limit}){
934 my $bwl = $param->{limit}*1024;
935 push @$cmd, \'|', 'cstream', '-t', $bwl;
936 }
937 my $target = "$dest->{all}";
938 $target .= "/$source->{last_part}" if $source->{last_part};
939 $target =~ s!/+!/!g;
940
941 push @$cmd, \'|';
942 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
943 push @$cmd, 'zfs', 'recv', '-F', '--';
944 push @$cmd, "$target";
945
946 eval {
947 run_cmd($cmd)
948 };
949
950 if (my $erro = $@) {
951 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
952 die $erro;
953 };
954 }
955
956
957 sub send_config{
958 my ($source, $dest, $method, $source_user, $dest_user) = @_;
959
960 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
961 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
962
963 my $config_dir = $dest->{last_part} ? "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;
964
965 $dest_target_new = $config_dir.'/'.$dest_target_new;
966
967 if ($method eq 'ssh'){
968 if ($dest->{ip} && $source->{ip}) {
969 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
970 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
971 } elsif ($dest->{ip}) {
972 run_cmd(['ssh', "$source_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
973 run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
974 } elsif ($source->{ip}) {
975 run_cmd(['mkdir', '-p', '--', $config_dir]);
976 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]);
977 }
978
979 if ($source->{destroy}){
980 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
981 if($dest->{ip}){
982 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
983 } else {
984 run_cmd(['rm', '-f', '--', $dest_target_old]);
985 }
986 }
987 } elsif ($method eq 'local') {
988 run_cmd(['mkdir', '-p', '--', $config_dir]);
989 run_cmd(['cp', $source_target, $dest_target_new]);
990 }
991 }
992
993 sub get_date {
994 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
995 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
996
997 return $datestamp;
998 }
999
1000 sub status {
1001 my $cfg = read_cron();
1002
1003 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
1004
1005 my $states = read_state();
1006
1007 foreach my $source (sort keys%{$cfg}) {
1008 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
1009 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
1010 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
1011 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
1012 }
1013 }
1014
1015 return $status_list;
1016 }
1017
1018 sub enable_job {
1019 my ($param) = @_;
1020
1021 my $job = get_job($param);
1022 $job->{state} = "ok";
1023 update_state($job);
1024 update_cron($job);
1025 }
1026
1027 sub disable_job {
1028 my ($param) = @_;
1029
1030 my $job = get_job($param);
1031 $job->{state} = "stopped";
1032 update_state($job);
1033 update_cron($job);
1034 }
1035
1036 my $cmd_help = {
1037 destroy => qq{
1038 $PROGNAME destroy -source <string> [OPTIONS]
1039
1040 remove a sync Job from the scheduler
1041
1042 -name string
1043
1044 name of the sync job, if not set it is default
1045
1046 -source string
1047
1048 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1049 },
1050 create => qq{
1051 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1052
1053 Create a sync Job
1054
1055 -dest string
1056
1057 the destination target is like [IP]:<Pool>[/Path]
1058
1059 -dest-user string
1060
1061 name of the user on the destination target, root by default
1062
1063 -limit integer
1064
1065 max sync speed in kBytes/s, default unlimited
1066
1067 -maxsnap string
1068
1069 how much snapshots will be kept before get erased, default 1
1070
1071 -name string
1072
1073 name of the sync job, if not set it is default
1074
1075 -skip boolean
1076
1077 if this flag is set it will skip the first sync
1078
1079 -source string
1080
1081 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1082
1083 -source-user string
1084
1085 name of the user on the source target, root by default
1086 },
1087 sync => qq{
1088 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1089
1090 will sync one time
1091
1092 -dest string
1093
1094 the destination target is like [IP:]<Pool>[/Path]
1095
1096 -dest-user string
1097
1098 name of the user on the destination target, root by default
1099
1100 -limit integer
1101
1102 max sync speed in kBytes/s, default unlimited
1103
1104 -maxsnap integer
1105
1106 how much snapshots will be kept before get erased, default 1
1107
1108 -name string
1109
1110 name of the sync job, if not set it is default.
1111 It is only necessary if scheduler allready contains this source.
1112
1113 -source string
1114
1115 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1116
1117 -source-user string
1118
1119 name of the user on the source target, root by default
1120
1121 -verbose boolean
1122
1123 print out the sync progress.
1124 },
1125 list => qq{
1126 $PROGNAME list
1127
1128 Get a List of all scheduled Sync Jobs
1129 },
1130 status => qq{
1131 $PROGNAME status
1132
1133 Get the status of all scheduled Sync Jobs
1134 },
1135 help => qq{
1136 $PROGNAME help <cmd> [OPTIONS]
1137
1138 Get help about specified command.
1139
1140 <cmd> string
1141
1142 Command name
1143
1144 -verbose boolean
1145
1146 Verbose output format.
1147 },
1148 enable => qq{
1149 $PROGNAME enable -source <string> [OPTIONS]
1150
1151 enable a syncjob and reset error
1152
1153 -name string
1154
1155 name of the sync job, if not set it is default
1156
1157 -source string
1158
1159 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1160 },
1161 disable => qq{
1162 $PROGNAME disable -source <string> [OPTIONS]
1163
1164 pause a sync job
1165
1166 -name string
1167
1168 name of the sync job, if not set it is default
1169
1170 -source string
1171
1172 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1173 },
1174 printpod => 'internal command',
1175
1176 };
1177
1178 if (!$command) {
1179 usage(); die "\n";
1180 } elsif (!$cmd_help->{$command}) {
1181 print "ERROR: unknown command '$command'";
1182 usage(1); die "\n";
1183 }
1184
1185 my @arg = @ARGV;
1186 my $param = parse_argv(@arg);
1187
1188 sub check_params {
1189 for (@_) {
1190 die "$cmd_help->{$command}\n" if !$param->{$_};
1191 }
1192 }
1193
1194 if ($command eq 'destroy') {
1195 check_params(qw(source));
1196
1197 check_target($param->{source});
1198 destroy_job($param);
1199
1200 } elsif ($command eq 'sync') {
1201 check_params(qw(source dest));
1202
1203 check_target($param->{source});
1204 check_target($param->{dest});
1205 sync($param);
1206
1207 } elsif ($command eq 'create') {
1208 check_params(qw(source dest));
1209
1210 check_target($param->{source});
1211 check_target($param->{dest});
1212 init($param);
1213
1214 } elsif ($command eq 'status') {
1215 print status();
1216
1217 } elsif ($command eq 'list') {
1218 print list();
1219
1220 } elsif ($command eq 'help') {
1221 my $help_command = $ARGV[1];
1222
1223 if ($help_command && $cmd_help->{$help_command}) {
1224 die "$cmd_help->{$help_command}\n";
1225
1226 }
1227 if ($param->{verbose}) {
1228 exec("man $PROGNAME");
1229
1230 } else {
1231 usage(1);
1232
1233 }
1234
1235 } elsif ($command eq 'enable') {
1236 check_params(qw(source));
1237
1238 check_target($param->{source});
1239 enable_job($param);
1240
1241 } elsif ($command eq 'disable') {
1242 check_params(qw(source));
1243
1244 check_target($param->{source});
1245 disable_job($param);
1246
1247 } elsif ($command eq 'printpod') {
1248 print_pod();
1249 }
1250
1251 sub usage {
1252 my ($help) = @_;
1253
1254 print("ERROR:\tno command specified\n") if !$help;
1255 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1256 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1257 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1258 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1259 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1260 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1261 print("\t$PROGNAME list\n");
1262 print("\t$PROGNAME status\n");
1263 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1264 }
1265
1266 sub check_target {
1267 my ($target) = @_;
1268 parse_target($target);
1269 }
1270
1271 sub print_pod {
1272
1273 my $synopsis = join("\n", sort values %$cmd_help);
1274
1275 print <<EOF;
1276 =head1 NAME
1277
1278 pve-zsync - PVE ZFS Replication Manager
1279
1280 =head1 SYNOPSIS
1281
1282 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1283
1284 $synopsis
1285
1286 =head1 DESCRIPTION
1287
1288 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1289 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1290 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1291 To config cron see man crontab.
1292
1293 =head2 PVE ZFS Storage sync Tool
1294
1295 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1296
1297 =head1 EXAMPLES
1298
1299 add sync job from local VM to remote ZFS Server
1300 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1301
1302 =head1 IMPORTANT FILES
1303
1304 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1305
1306 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1307
1308 =head1 COPYRIGHT AND DISCLAIMER
1309
1310 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1311
1312 This program is free software: you can redistribute it and/or modify it
1313 under the terms of the GNU Affero General Public License as published
1314 by the Free Software Foundation, either version 3 of the License, or
1315 (at your option) any later version.
1316
1317 This program is distributed in the hope that it will be useful, but
1318 WITHOUT ANY WARRANTY; without even the implied warranty of
1319 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1320 Affero General Public License for more details.
1321
1322 You should have received a copy of the GNU Affero General Public
1323 License along with this program. If not, see
1324 <http://www.gnu.org/licenses/>.
1325
1326 EOF
1327 }