]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
Improve read-modify-write enclosures
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program name; the config directory and the cron file name are derived from it.
my $PROGNAME = "pve-zsync";
# Directory that holds the state file and the lock files.
my $CONFIG_PATH = "/var/lib/" . $PROGNAME;
# JSON file with the per-job sync state.
my $STATE = $CONFIG_PATH . "/sync_state";
# Cron file that schedules the sync jobs.
my $CRONJOBS = "/etc/cron.d/" . $PROGNAME;
my $PATH = "/usr/sbin";
# Node-local guest configuration directories.
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = $PVE_DIR . "/qemu-server";
my $LXC_CONF = $PVE_DIR . "/lxc";
my $PROG_PATH = $PATH . "/" . $PROGNAME;
# Minute interval used for newly created cron entries.
my $INTERVAL = 15;
my $DEBUG;

BEGIN {
    # Change the default here, not on the declaration above: an initializer
    # on the declaration would run after this block and overwrite the value.
    my $default = 0;
    $DEBUG = $default || $ENV{ZSYNC_DEBUG};

    # Data::Dumper is only needed for the debug output.
    if ($DEBUG) {
        require Data::Dumper;
        Data::Dumper->import();
    }
}
33
# Building blocks for IPv4/IPv6 address matching.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32 = "(?:(?:$IPV4RE|${IPV6H16}:${IPV6H16}))";

# One alternative per possible position of the '::' abbreviation.
my @IPV6_FORMS = (
    "(?:(?:(?:${IPV6H16}:){6})$IPV6LS32)",
    "(?:(?:::(?:${IPV6H16}:){5})$IPV6LS32)",
    "(?:(?:(?:$IPV6H16)?::(?:${IPV6H16}:){4})$IPV6LS32)",
    "(?:(?:(?:(?:${IPV6H16}:){0,1}$IPV6H16)?::(?:${IPV6H16}:){3})$IPV6LS32)",
    "(?:(?:(?:(?:${IPV6H16}:){0,2}$IPV6H16)?::(?:${IPV6H16}:){2})$IPV6LS32)",
    "(?:(?:(?:(?:${IPV6H16}:){0,3}$IPV6H16)?::(?:${IPV6H16}:){1})$IPV6LS32)",
    "(?:(?:(?:(?:${IPV6H16}:){0,4}$IPV6H16)?::)$IPV6LS32)",
    "(?:(?:(?:(?:${IPV6H16}:){0,5}$IPV6H16)?::)$IPV6H16)",
    "(?:(?:(?:(?:${IPV6H16}:){0,6}$IPV6H16)?::))",
);
my $IPV6RE = "(?:" . join("|", @IPV6_FORMS) . ")";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
# A target is either a VMID or 'host:zpool/path', where 'host:' is optional.
# Capture 1 is the host (brackets included), capture 2 the VMID or dataset.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
# The sub-command is always the first command line argument.
my $command = $ARGV[0];

# Every command except the purely informational ones needs these external
# tools, so fail early if one of them is missing from PATH.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin($_) for qw(cstream zfs ssh scp);
}

# Turn termination signals into an exception so that the eval blocks around
# a running sync get a chance to record the error state.
# SIGKILL is not listed: it cannot be trapped, so installing a handler for it
# has no effect.
my $abort_on_signal = sub {
    die "Signaled, aborting sync: $!\n";
};
$SIG{$_} = $abort_on_signal for qw(TERM QUIT PIPE HUP INT);
68
# Look up an executable in the directories listed in $ENV{PATH}.
#
# $bin - name of the command
#
# Returns the full path of the first executable match; dies if there is none.
sub check_bin {
    my ($bin) = @_;

    my @candidates = map { "$_/$bin" } split(/:/, $ENV{PATH});
    for my $candidate (@candidates) {
        return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}
81
# Shorten a path-like string for column output.
#
# $path   - the string to shorten; runs of '/' are collapsed first
# $maxlen - maximum width of the result
#
# Strings that already fit are returned unchanged (apart from the slash
# collapsing). Otherwise '..' marks the part that was cut out, and the last
# path component is kept as far as possible.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s@/+@/@g;
    if (length($path) <= $maxlen) {
        return $path;
    }

    # A single component: keep only its end.
    if ($path !~ m@/@) {
        return '..' . substr($path, -$maxlen+2);
    }

    # Split off the last component.
    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;

    return "../$tail" if length($tail)+3 == $maxlen;
    return '..' . substr($tail, -$maxlen+2) if length($tail)+2 >= $maxlen;

    # Split off the first '/component'; $path keeps whatever is left.
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $remaining = $maxlen - (length($head) + length($tail)) - 4; # -4 for "/../"

    if ($remaining < 0) {
        my $head_width = $maxlen - length($tail) - 3; # -3 for "../"
        return substr($head, 0, $head_width) . "../$tail";
    }

    # Replace the middle of what is left by '..' so that the total fits.
    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return "$head/" . $path . "/$tail";
}
111
# Run a code reference while holding an exclusive flock on a lock file.
#
# $lock_fn - path of the lock file (created or truncated)
# $code    - code reference to run under the lock, called without arguments
#
# Returns the (scalar) result of $code. Dies if the lock file cannot be
# opened or locked. An exception thrown by $code is re-thrown as a string,
# after the lock has been released and the handle closed.
sub locked {
    my ($lock_fn, $code) = @_;

    # Explicit mode argument, so the file name is never parsed for a mode.
    my $lock_fh = IO::File->new($lock_fn, '>')
        or die "Could not open lock file $lock_fn: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
127
# Return the whole $status structure if it holds a true 'status' entry for
# the given source and job name, undef otherwise.
#
# $source - target hash; its 'all' field is the first-level key
# $name   - job name, the second-level key
# $status - nested hash: {source}{name}{status}
sub get_status {
    my ($source, $name, $status) = @_;

    my $entry_status = $status->{$source->{all}}->{$name}->{status};

    return $entry_status ? $status : undef;
}
137
# Check whether the dataset named by $target->{all} exists, by running
# 'zfs list' for it (through ssh as $user when the target has an ip).
#
# Returns 1 if the command succeeded, 0 if it failed.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $listed = eval {
        run_cmd([@ssh, 'zfs', 'list', '-H', '--', $target->{all}]);
        1;
    };

    return $listed ? 1 : 0;
}
156
# Parse a target of the form [IP:]<VMID> or [IP:]<ZFSPool>[/Path].
#
# Returns a hash reference with:
#   all       - everything after the optional host part
#   ip        - host part without surrounding brackets (only if given)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - first path component
#   last_part - last path component (only if there is more than one)
#   path      - components in between, joined by '/' (only if any remain)
# Dies if the text does not match the target syntax.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    # Save the captures before any further match can overwrite them.
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };
    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $all);
    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # For targets with a host part one more component is dropped.
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
192
# Read the cron file and return its jobs as parsed by encode_cron().
#
# On first use the cron file does not exist yet; it is then created empty
# and undef is returned.
sub read_cron {

    if (-e $CRONJOBS) {
        my $fh = IO::File->new($CRONJOBS, '<');
        die "Could not open file $CRONJOBS: $!\n" if !$fh;

        my @lines = <$fh>;
        close($fh);

        return encode_cron(@lines);
    }

    # First use: initialize the file.
    my $new_fh = IO::File->new($CRONJOBS, '>');
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
212
# Parse command line style arguments into a parameter hash.
#
# @arg - argument list; non-option words are ignored
#
# Returns a hash reference with one key per known option. Options that were
# not given are undef, except name, maxsnap, method, source_user and
# dest_user, which get defaults. Dies if option parsing fails.
sub parse_argv {
    my (@arg) = @_;

    # [ Getopt::Long specification, key in the result hash ]
    my @options = (
        ['dest=s',             'dest'],
        ['source=s',           'source'],
        ['verbose',            'verbose'],
        ['limit=i',            'limit'],
        ['maxsnap=i',          'maxsnap'],
        ['name=s',             'name'],
        ['skip',               'skip'],
        ['method=s',           'method'],
        ['source-user=s',      'source_user'],
        ['dest-user=s',        'dest_user'],
        ['properties',         'properties'],
        ['dest-config-path=s', 'dest_config_path'],
    );

    my $param = { map { ($_->[1], undef) } @options };

    my ($ret) = GetOptionsFromArray(
        \@arg,
        map { ($_->[0], \$param->{$_->[1]}) } @options
    );

    die "can't parse options\n" if !$ret;

    my %defaults = (
        name        => "default",
        maxsnap     => 1,
        method      => "ssh",
        source_user => "root",
        dest_user   => "root",
    );
    foreach my $key (keys %defaults) {
        $param->{$key} //= $defaults{$key};
    }

    return $param;
}
257
# Copy the stored state of a job (state, lsync, vm_type and the consecutive
# snapN entries) from the state file into the job hash.
#
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    foreach my $key (qw(state lsync vm_type)) {
        $job->{$key} = $state->{$key};
    }

    # snap0, snap1, ... up to the first missing or false entry
    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
274
# Turn the lines of the cron file into a job configuration.
#
# @text - lines of the cron file
#
# Returns a hash reference {source}{name} => parameter hash (as produced by
# parse_argv, with 'source' and 'name' removed). Lines that lack a source or
# a destination are ignored.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    foreach my $line (@text) {
        # Processing stops at the first false line.
        last if !$line;

        my $param = parse_argv(split('\s', $line));
        next if !($param->{source} && $param->{dest});

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
295
# Build a job hash from parsed command line parameters.
#
# $param - parameter hash as returned by parse_argv
#
# Returns a hash reference with name, dest, method, limit, maxsnap, source,
# source_user, dest_user, properties and dest_config_path. The method is
# "local" when neither source nor destination has a host part, "ssh"
# otherwise. Dies (via parse_target) on an invalid source or destination.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # Do not use "my $dest = ... if COND": the behaviour of a 'my' with a
    # statement modifier is undefined. An empty hash stands for "no dest".
    my $dest = $param->{dest} ? parse_target($param->{dest}) : {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
318
# Read the job states from the state file.
#
# Returns the decoded JSON structure of the first line of the file. If the
# file does not exist yet, it is created with an empty JSON object and undef
# is returned.
sub read_state {

    if (-e $STATE) {
        my $fh = IO::File->new($STATE, '<');
        die "Could not open file $STATE: $!\n" if !$fh;

        # The whole state is stored on a single line.
        my $json = <$fh>;
        my $states = decode_json($json);

        close($fh);

        return $states;
    }

    make_path $CONFIG_PATH;
    my $new_fh = IO::File->new($STATE, '>');
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
340
# Store (or, for state "del", remove) the state of one job in the state file.
#
# $job - job hash; state, lsync, vm_type and the snapN entries are stored
#        under {source}{name}
#
# The new content is written to "<state file>.new" and then renamed over the
# state file. Returns the complete, updated state structure. Dies if the
# current content cannot be decoded or the new file cannot be written, closed
# or renamed.
sub update_state {
    my ($job) = @_;

    # A missing or unreadable state file simply means "no states yet".
    my $text;
    if (my $in_fh = IO::File->new("< $STATE")) {
        $text = <$in_fh>;
        close($in_fh);
    }

    # Decode before touching the output file, so that a corrupt state file
    # does not leave an empty temporary file behind.
    my $states = {};
    my $state = {};
    if ($text) {
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"}; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {
        delete $states->{$job->{source}}->{$job->{name}};
        # Drop the source entry as well once its last job is gone.
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    # Check every step, so that a failed write can never replace the state
    # file with truncated content.
    die "can't write to $STATE.new\n" if !print($out_fh encode_json($states));
    die "can't close $STATE.new: $!\n" if !close($out_fh);
    die "can't move $STATE.new: $!\n" if !rename("$STATE.new", $STATE);

    return $states;
}
389
# Add, update or (for state "del") remove the cron entry of a job.
#
# $job - job hash; source and name identify the entry
#
# The existing line of the job is replaced by a freshly formatted one that
# keeps its schedule; an unknown job is appended. The SHELL/PATH header is
# prepended if the first lines of the file do not contain one. The result is
# written to "<cron file>.new" and renamed over the cron file. Dies if
# reading, writing or renaming fails.
sub update_cron {
    my ($job) = @_;

    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    my @lines = <$fh>;
    close($fh);

    # Source and name are literal text, not patterns: a source such as
    # "[::1]:tank/data" or "192.168.1.1:tank" contains regex metacharacters.
    my $job_re = qr/source \Q$job->{source}\E .*name \Q$job->{name}\E /;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";

    foreach my $line (@lines) {
        chomp($line);
        if ($line =~ $job_re) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/)) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    $text = "$header$text" if !$has_header;
    $text .= format_job($job) if !$updated;

    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
}
436
# Format the cron line for a job.
#
# $job  - job hash
# $line - optional existing cron line of the job; its five schedule fields
#         are kept. Without it the default interval is used.
#
# Returns the line including the trailing newline. A job in state "stopped"
# is written as a comment.
sub format_job {
    my ($job, $line) = @_;

    my $text = $job->{state} eq "stopped" ? "#" : "";

    if ($line) {
        # Five schedule fields in front of the user name.
        $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/;
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }

    my @words = (
        " root",
        " $PROGNAME sync --source $job->{source} --dest $job->{dest}",
        " --name $job->{name} --maxsnap $job->{maxsnap}",
    );
    push @words, " --limit $job->{limit}" if $job->{limit};
    push @words, " --method $job->{method}";
    push @words, " --verbose" if $job->{verbose};
    push @words, " --source-user $job->{source_user}";
    push @words, " --dest-user $job->{dest_user}";
    push @words, " --properties" if $job->{properties};
    push @words, " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};

    return $text . join('', @words) . "\n";
}
464
# Build the table shown by the 'list' command: one row per job in the cron
# file, with its state, last sync, guest type and connection method.
#
# Returns the table as one string.
sub list {

    my $cfg = read_cron();

    my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys %{$cfg}) {
        foreach my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};

            $list .= sprintf(
                "%-25s%-25s%-10s%-20s%-6s%-5s\n",
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
485
# Determine the type of the guest a target refers to.
#
# $target - target hash; needs 'vmid', may have 'ip'
# $user   - ssh user, used when the target has an ip
#
# Returns "qemu" or "lxc", depending on the directory that holds the guest's
# config file, or undef if the target has no vmid or no config was found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    my @candidates = (
        ["qemu", "$QEMU_CONF/$conf_fn"],
        ["lxc", "$LXC_CONF/$conf_fn"],
    );

    foreach my $candidate (@candidates) {
        my ($type, $conf) = @$candidate;

        # Remote: 'ls' fails if the file is missing. Local: plain file test.
        my $found = $target->{ip}
            ? eval { run_cmd(['ssh', "$user\@$target->{ip}", '--', '/bin/ls', $conf]) }
            : -f $conf;

        return $type if $found;
    }

    return undef;
}
504
# Create a new sync job: validate source and destination, add the job to the
# cron file and the state file, then run a first sync unless 'skip' is set.
#
# $param - parameter hash as returned by parse_argv
#
# Dies if the destination or source pool is missing, the guest does not
# exist, or a job with the same source and name already exists. If the first
# sync fails, the job is removed again and the error is printed.
sub init {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);
        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # Install the local public key on remote peers, destination first.
        foreach my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
            my ($target, $user) = @$peer;
            my $ip = $target->{ip};
            next if !$ip;
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        if (!check_pool_exists($dest, $param->{dest_user})) {
            die "Pool $dest->{all} does not exists\n";
        }

        if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
            die "Pool $source->{all} does not exists\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        # Dies if the guest has no disk on zfs.
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    }); #cron and state lock

    my $err;
    if (!$param->{skip}) {
        eval { sync($param); };
        $err = $@;
    }

    if ($err) {
        destroy_job($param);
        print $err;
    }
}
556
# Look up a job in the cron file and complete it with its stored state.
#
# $param - needs 'source' and 'name'
#
# Returns the job hash; dies if the cron file has no such job.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = ($param->{source}, $param->{name});

    my $cfg = read_cron();
    my $job = $cfg->{$source}->{$name};

    die "Job with source $param->{source} and name $param->{name} does not exist\n" if !$job;

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}
572
# Remove a job from the cron file and from the state file.
#
# $param - needs 'source' and 'name'; dies (via get_job) if no such job exists
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
584
# Run one sync from source to destination.
#
# $param - parameter hash as returned by parse_argv
#
# The whole run holds the sync lock. If a job for the source/name exists in
# the cron file, its state is set to "syncing" for the duration and to "ok"
# (with the new last-sync date) or "error" afterwards. For a guest, each of
# its zfs disks is synced and then its config file is copied. On failure a
# notice is printed and the error is re-thrown.
sub sync {
    my ($param) = @_;

    my $state_lock = "$CONFIG_PATH/cron_and_state.lock";

    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();
        my ($job, $dest, $source, $vm_type);

        locked($state_lock, sub {
            # A sync may also run without a scheduled job.
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
                die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            if ($job) {
                $job->{state} = "syncing";
                $job->{vm_type} = $vm_type if !$job->{vm_type};
                update_state($job);
            }
        }); #cron and state lock

        # Store the outcome for the job, if there is one. $job keeps its
        # previous value when re-reading it fails.
        my $record_result = sub {
            my ($state, $lsync) = @_;

            locked($state_lock, sub {
                eval { $job = get_job($param); };
                if ($job) {
                    $job->{state} = $state;
                    $job->{lsync} = $lsync if defined($lsync);
                    update_state($job);
                }
            });
        };

        # Sync the single dataset currently described by $source.
        my $sync_path = sub {
            my ($source, $dest, $job, $param, $date) = @_;

            ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});

            snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

            send_image($source, $dest, $param);

            if ($source->{destroy} && $source->{old_snap}) {
                snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user});
            }
        };

        eval {
            if ($source->{vmid}) {
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                foreach my $key (sort keys %{$disks}) {
                    my $disk = $disks->{$key};
                    $source->{all} = $disk->{all};
                    $source->{pool} = $disk->{pool};
                    $source->{path} = $disk->{path} if $disk->{path};
                    $source->{last_part} = $disk->{last_part};
                    $sync_path->($source, $dest, $job, $param, $date);
                }

                my $remote = $param->{method} eq "ssh" && ($source->{ip} || $dest->{ip});
                my $config_method = $remote ? 'ssh' : 'local';
                send_config($source, $dest, $config_method, $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
            } else {
                $sync_path->($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            $record_result->("error");
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        $record_result->("ok", $date);
    }); #sync lock
}
674
# Find the replication snapshots of a job on the source.
#
# The snapshots of $source->{all} are listed newest first; only names ending
# in rep_<name>_<date> count. Once $max_snap of them were seen,
# $source->{destroy} is set and the search stops.
#
# Returns ($old_snap, $last_snap): the last matching snapshot that was looked
# at and the newest one. Returns undef if there is no matching snapshot.
sub snapshot_get {
    my ($source, $dest, $max_snap, $name, $source_user) = @_;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $raw = run_cmd([
        @ssh,
        'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation',
        $source->{all},
    ]);

    my ($last_snap, $old_snap);
    my $seen = 0;

    foreach my $line (split(/\n/, $raw // '')) {
        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;
        my $snap = $1;

        $last_snap = $snap if !$last_snap;
        $old_snap = $snap;
        $seen++;

        if ($seen == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
707
# Create the snapshot rep_<name>_<date> on the source dataset and remember
# its name in $source->{new_snap}.
#
# If creating it fails, the snapshot is destroyed again (on source and
# destination) and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_$name\_".$date;
    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval {
        run_cmd([@ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name"]);
    };
    my $err = $@;

    if ($err) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
729
# Write the cron file from a configuration structure.
#
# $cfg - hash {source}{name} => entry; only entries with status 'ok' are
#        written. An entry provides vmid or source_pool/source_path, the
#        optional source_ip, dest_ip, dest_path, limit and maxsnap, and
#        dest_pool.
#
# Dies if the cron file cannot be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys %{$cfg}) {
        foreach my $sync_name (sort keys %{$cfg->{$source}}) {
            my $entry = $cfg->{$source}->{$sync_name};
            next if $entry->{status} ne 'ok';

            my $line = "$PROG_PATH sync";

            $line .= " -source ";
            $line .= "$entry->{source_ip}:" if $entry->{source_ip};
            if ($entry->{vmid}) {
                $line .= "$entry->{vmid} ";
            } else {
                $line .= "$entry->{source_pool}";
                $line .= "$entry->{source_path}" if $entry->{source_path};
            }

            $line .= " -dest ";
            $line .= "$entry->{dest_ip}:" if $entry->{dest_ip};
            $line .= "$entry->{dest_pool}";
            $line .= "$entry->{dest_path}" if $entry->{dest_path};

            $line .= " -name $sync_name ";
            $line .= " -limit $entry->{limit}" if $entry->{limit};
            $line .= " -maxsnap $entry->{maxsnap}" if $entry->{maxsnap};

            $text .= "$line\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
765
# Get the zfs disks of a guest.
#
# $target - target hash with vmid and vm_type ('qemu' or 'lxc'), optional ip
# $user   - ssh user, used when the target has an ip
#
# Runs 'qm config' or 'pct config' for the guest and returns what parse_disks
# makes of the output. Dies on an unknown guest type.
sub get_disks {
    my ($target, $user) = @_;

    my %config_tool = (
        qemu => 'qm',
        lxc  => 'pct',
    );
    my $tool = $config_tool{$target->{vm_type}};
    die "VM Type unknown\n" if !$tool;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();
    my $res = run_cmd([@ssh, $tool, 'config', $target->{vmid}]);

    return parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
}
786
# Run a command through the shell and return its output.
#
# $cmd - a command string, or an array reference of words. Plain words are
#        shell-quoted; scalar references (e.g. \'|') are inserted verbatim.
#
# Returns the combined stdout/stderr output without the trailing newline.
# Dies with the command and its output if the exit status is not zero.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    if (0 != $?) {
        die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output";
    }

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
803
# Extract the zfs disks from the output of 'qm config' / 'pct config'.
#
# $text    - config output
# $ip      - host of the guest, or undef for a local guest
# $vm_type - 'qemu' or 'lxc'
# $user    - ssh user, used when $ip is set
#
# Returns a hash reference {0, 1, ...} => { pool, path (optional),
# last_part, all }, one entry per disk, in the order of the config lines.
# Skipped are cdrom drives, qemu disks with backup disabled and lxc mount
# points without backup enabled. Dies if 'pvesm path' gives no usable path or
# if no disk is left.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

        # QEMU: a disk is included unless backup is explicitly disabled.
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: a mount point is only included if backup is explicitly
        # enabled. The index may have more than one digit (mp10, mp11, ...).
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if ($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
            # The first "storage:volume" option names the disk.
            foreach my $opt (split(/,/, $1)) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/) {
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        my $entry = {};
        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            # /dev/zvol/<pool>[/<path>]/<disk>
            my @parts = split('/', $1);
            $entry->{pool} = shift(@parts);
            $entry->{all} = $entry->{pool};
            if (0 < @parts) {
                $entry->{path} = join('/', @parts);
                $entry->{all} .= "\/$entry->{path}";
            }
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            # Keep the captures before anything else can overwrite them.
            my ($pool, $has_sub, $sub_path) = ($1, $2, $3);

            $entry->{pool} = $pool;
            $entry->{all} = $entry->{pool};
            if ($has_sub) {
                $entry->{path} = $sub_path;
                $entry->{all} .= "\/$entry->{path}";
            }
        } else {
            die "ERROR: in path\n";
        }

        $entry->{last_part} = $disk;
        $entry->{all} .= "\/$disk";

        $disks->{$num} = $entry;
        $num++;
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
883
# Destroy a snapshot on the source and, if $dest is given, on the destination.
#
# $source  - source target; the snapshot is "<all>@<snap>"
# $dest    - destination target or undef; there the snapshot belongs to
#            "<dest all>[/<source last_part>]"
# $method  - the source is only reached through ssh if this is 'ssh'
# $snap    - snapshot name
#
# Failures are only reported as warnings.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval {
        run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    return if !$dest;

    my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

    my $path = "$dest->{all}";
    $path .= "/$source->{last_part}" if $source->{last_part};

    eval {
        run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]);
    };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }
}
914
# Check whether the snapshot $source->{old_snap} exists on the destination,
# below "<dest all>[/<source last_part>]".
#
# Returns 1 if it is listed, 0 if not, and undef (after a warning) if the
# listing itself fails.
sub snapshot_exist {
    my ($source , $dest, $method, $dest_user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};
    $path .= "\@$source->{old_snap}";

    push @$cmd, $path;

    my $text = "";
    eval { $text = run_cmd($cmd); };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    # The snapshot name contains the free-form job name, so it must be
    # matched literally and not as a pattern.
    my $snap_re = qr/^.*\Q$source->{old_snap}\E$/;

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;
        return 1 if $line =~ $snap_re;
    }

    return 0;
}
941
# Transfer the new snapshot from source to destination with
# 'zfs send | [cstream |] zfs recv'.
#
# The stream is incremental from $source->{last_snap} if that snapshot also
# exists on the destination. $param->{limit} (multiplied by 1024) is passed
# to 'cstream -t'. If the transfer fails, the new snapshot is destroyed on
# the source and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    # Sending side
    my @send;
    push @send, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    my $incremental = $source->{last_snap}
        && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user});
    push @send, '-i', "$source->{all}\@$source->{last_snap}" if $incremental;
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # Optional bandwidth limit
    my @limit;
    if ($param->{limit}) {
        my $bwl = $param->{limit}*1024;
        @limit = (\'|', 'cstream', '-t', $bwl);
    }

    # Receiving side
    my $target = "$dest->{all}";
    $target .= "/$source->{last_part}" if $source->{last_part};
    $target =~ s!/+!/!g;

    my @recv;
    push @recv, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval {
        run_cmd([@send, @limit, \'|', @recv]);
    };

    if (my $erro = $@) {
        snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
        die $erro;
    }
}
979
980
# Copy the guest's config file to the destination, next to the synced data.
#
# The copy is named "<vmid>.conf.<vm_type>.<new_snap>" and is put into
# $dest_config_path (default: the local config path), extended by the
# destination's last path component if it has one.
#
# $method - 'ssh': copy with scp from/to the side(s) that have an ip, and
#           remove the copy that belongs to the old snapshot if
#           $source->{destroy} is set; 'local': plain mkdir and cp.
sub send_config {
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh') {
        # Commands for the destination run through ssh if it is remote.
        my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

        if ($dest->{ip} || $source->{ip}) {
            run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);

            my $from = $source->{ip}
                ? "$source_user\@[$source->{ip}]:$source_target"
                : $source_target;
            my $to = $dest->{ip}
                ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
                : $dest_target_new;
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}) {
            my $dest_target_old = "${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
            run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1017
# Return the current local time as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    my ($sec, $min, $hour, $mday, $mon, $year) = (localtime(time))[0 .. 5];

    # localtime counts months from 0 and years from 1900.
    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}
1024
# Build the table shown by the 'status' command: source, name and stored
# state of every job in the cron file.
#
# Returns the table as one string.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys %{$cfg}) {
        foreach my $sync_name (sort keys %{$cfg->{$source}}) {
            my @columns = (
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
            );
            $status_list .= sprintf("%-25s%-25s", @columns);
            $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
        }
    }

    return $status_list;
}
1042
# Enable a job: set its state to "ok" and rewrite its cron line accordingly.
#
# $param - needs 'source' and 'name'; dies (via get_job) if no such job exists
sub enable_job {
    my ($param) = @_;

    my $enable = sub {
        my $job = get_job($param);
        $job->{state} = "ok";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $enable);
}
1053
# Disable a job: set its state to "stopped" and rewrite its cron line, which
# format_job then writes as a comment.
#
# $param - needs 'source' and 'name'; dies (via get_job) if no such job exists
sub disable_job {
    my ($param) = @_;

    my $disable = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $disable);
}
1064
# Help text per command. The keys also define the set of valid commands.
# The text lines are part of the strings, so they carry no code indentation.
# Corrected against the option parser: the create help now shows the
# destination as [IP:]<Pool>[/Path] and -maxsnap as an integer (it is parsed
# with 'maxsnap=i').
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

remove a sync Job from the scheduler

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

Create a sync Job

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default

-skip boolean

if this flag is set it will skip the first sync

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-properties boolean

Include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n

will sync one time

-dest string

the destination target is like [IP:]<Pool>[/Path]

-dest-user string

name of the user on the destination target, root by default

-limit integer

max sync speed in kBytes/s, default unlimited

-maxsnap integer

how much snapshots will be kept before get erased, default 1

-name string

name of the sync job, if not set it is default.
It is only necessary if scheduler already contains this source.

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

-source-user string

name of the user on the source target, root by default

-verbose boolean

print out the sync progress.

-properties boolean

Include the dataset's properties in the stream.

-dest-config-path string

specify a custom config path on the destination target. default is /var/lib/pve-zsync
},
    list => qq{
$PROGNAME list

Get a List of all scheduled Sync Jobs
},
    status => qq{
$PROGNAME status

Get the status of all scheduled Sync Jobs
},
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

Get help about specified command.

<cmd> string

Command name

-verbose boolean

Verbose output format.
},
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

enable a syncjob and reset error

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

pause a sync job

-name string

name of the sync job, if not set it is default

-source string

the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
},
    printpod => 'internal command',

};
1222
# Refuse to continue without a command, or with one that has no entry in
# the help table. usage() without an argument also prints the
# "no command specified" error; usage(1) prints the overview only.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # Terminate the message with a newline so that the usage overview
    # printed next starts on a line of its own.
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Parse the command line options into the parameter hash used by all
# commands below.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1232
# Make sure every parameter named in the argument list has a true value in
# $param. On the first one that does not, die with the help text of the
# command currently being executed.
sub check_params {
    my @required = @_;

    foreach my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}
1238
# Dispatch to the implementation of the requested command. Commands that
# operate on a job first verify their required options via check_params()
# (which dies with the command's help text) and validate each target.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    my $help_command = $ARGV[1];

    # 'help <cmd>' for a known command emits only that command's help text;
    # die() is used for this, so nothing below runs in that case.
    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";
    }

    if ($param->{verbose}) {
        # The list form never involves a shell. exec() only returns when it
        # failed, so report the reason instead of exiting silently without
        # having shown anything.
        exec('man', $PROGNAME)
            or die "ERROR: unable to run 'man $PROGNAME': $!\n";
    } else {
        usage(1);
    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}
1295
# Print the command overview to STDOUT.
#
# $help - when false, the overview is preceded by an error line stating
#         that no command was specified; when true, only the overview
#         is printed.
sub usage {
    my ($help) = @_;

    my @lines;
    push @lines, "ERROR:\tno command specified\n" unless $help;
    push @lines, (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n",
    );

    print(join('', @lines));
}
1310
# Validate a source or destination target by handing it to parse_target().
# Whatever parse_target() returns (or throws) is passed on to the caller.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1315
# Print the manual page source in POD format to STDOUT.
#
# The SYNOPSIS section is generated from the per-command help texts in
# $cmd_help, sorted by their text.
sub print_pod {

    # The entry for 'printpod' itself is only the placeholder string
    # 'internal command'; keep that stray line out of the generated manual.
    my @documented = grep { $_ ne 'printpod' } keys %$cmd_help;
    my @help_texts = map { $cmd_help->{$_} } @documented;
    my $synopsis = join("\n", sort @help_texts);

    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}