]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
Refactor locking
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
13 my $PROGNAME = "pve-zsync";
14 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
15 my $STATE = "${CONFIG_PATH}/sync_state";
16 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
17 my $PATH = "/usr/sbin";
18 my $PVE_DIR = "/etc/pve/local";
19 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
20 my $LXC_CONF = "${PVE_DIR}/lxc";
21 my $PROG_PATH = "$PATH/${PROGNAME}";
22 my $INTERVAL = 15;
23 my $DEBUG;
24
25 BEGIN {
26 $DEBUG = 0; # change default here. not above on declaration!
27 $DEBUG ||= $ENV{ZSYNC_DEBUG};
28 if ($DEBUG) {
29 require Data::Dumper;
30 Data::Dumper->import();
31 }
32 }
33
34 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
35 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
36 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
37 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
38
39 my $IPV6RE = "(?:" .
40 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
41 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
42 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
43 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
44 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
45 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
46 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
47 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
48 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
49
50 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
51 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
52 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
53 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
54 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
56 my $command = $ARGV[0];
57
58 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
59 check_bin ('cstream');
60 check_bin ('zfs');
61 check_bin ('ssh');
62 check_bin ('scp');
63 }
64
65 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
66 die "Signaled, aborting sync: $!\n";
67 };
68
69 sub check_bin {
70 my ($bin) = @_;
71
72 foreach my $p (split (/:/, $ENV{PATH})) {
73 my $fn = "$p/$bin";
74 if (-x $fn) {
75 return $fn;
76 }
77 }
78
79 die "unable to find command '$bin'\n";
80 }
81
82 sub cut_target_width {
83 my ($path, $maxlen) = @_;
84 $path =~ s@/+@/@g;
85
86 return $path if length($path) <= $maxlen;
87
88 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
89
90 $path =~ s@/([^/]+/?)$@@;
91 my $tail = $1;
92
93 if (length($tail)+3 == $maxlen) {
94 return "../$tail";
95 } elsif (length($tail)+2 >= $maxlen) {
96 return '..'.substr($tail, -$maxlen+2)
97 }
98
99 $path =~ s@(/[^/]+)(?:/|$)@@;
100 my $head = $1;
101 my $both = length($head) + length($tail);
102 my $remaining = $maxlen-$both-4; # -4 for "/../"
103
104 if ($remaining < 0) {
105 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
106 }
107
108 substr($path, ($remaining/2), (length($path)-$remaining), '..');
109 return "$head/" . $path . "/$tail";
110 }
111
112 sub locked {
113 my ($lock_fn, $code) = @_;
114
115 my $lock_fh = IO::File->new("> $lock_fn");
116
117 flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
118 my $res = eval { $code->() };
119 my $err = $@;
120
121 flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
122 die "$err" if $err;
123
124 close($lock_fh);
125 return $res;
126 }
127
128 sub get_status {
129 my ($source, $name, $status) = @_;
130
131 if ($status->{$source->{all}}->{$name}->{status}) {
132 return $status;
133 }
134
135 return undef;
136 }
137
138 sub check_pool_exists {
139 my ($target, $user) = @_;
140
141 my $cmd = [];
142
143 if ($target->{ip}) {
144 push @$cmd, 'ssh', "$user\@$target->{ip}", '--';
145 }
146 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
147 eval {
148 run_cmd($cmd);
149 };
150
151 if ($@) {
152 return 0;
153 }
154 return 1;
155 }
156
157 sub parse_target {
158 my ($text) = @_;
159
160 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
161 my $target = {};
162
163 if ($text !~ $TARGETRE) {
164 die "$errstr\n";
165 }
166 $target->{all} = $2;
167 $target->{ip} = $1 if $1;
168 my @parts = split('/', $2);
169
170 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
171
172 my $pool = $target->{pool} = shift(@parts);
173 die "$errstr\n" if !$pool;
174
175 if ($pool =~ m/^\d+$/) {
176 $target->{vmid} = $pool;
177 delete $target->{pool};
178 }
179
180 return $target if (@parts == 0);
181 $target->{last_part} = pop(@parts);
182
183 if ($target->{ip}) {
184 pop(@parts);
185 }
186 if (@parts > 0) {
187 $target->{path} = join('/', @parts);
188 }
189
190 return $target;
191 }
192
193 sub read_cron {
194
195 #This is for the first use to init file;
196 if (!-e $CRONJOBS) {
197 my $new_fh = IO::File->new("> $CRONJOBS");
198 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
199 close($new_fh);
200 return undef;
201 }
202
203 my $fh = IO::File->new("< $CRONJOBS");
204 die "Could not open file $CRONJOBS: $!\n" if !$fh;
205
206 my @text = <$fh>;
207
208 close($fh);
209
210 return encode_cron(@text);
211 }
212
213 sub parse_argv {
214 my (@arg) = @_;
215
216 my $param = {
217 dest => undef,
218 source => undef,
219 verbose => undef,
220 limit => undef,
221 maxsnap => undef,
222 name => undef,
223 skip => undef,
224 method => undef,
225 source_user => undef,
226 dest_user => undef,
227 properties => undef,
228 dest_config_path => undef,
229 };
230
231 my ($ret) = GetOptionsFromArray(
232 \@arg,
233 'dest=s' => \$param->{dest},
234 'source=s' => \$param->{source},
235 'verbose' => \$param->{verbose},
236 'limit=i' => \$param->{limit},
237 'maxsnap=i' => \$param->{maxsnap},
238 'name=s' => \$param->{name},
239 'skip' => \$param->{skip},
240 'method=s' => \$param->{method},
241 'source-user=s' => \$param->{source_user},
242 'dest-user=s' => \$param->{dest_user},
243 'properties' => \$param->{properties},
244 'dest-config-path=s' => \$param->{dest_config_path},
245 );
246
247 die "can't parse options\n" if $ret == 0;
248
249 $param->{name} //= "default";
250 $param->{maxsnap} //= 1;
251 $param->{method} //= "ssh";
252 $param->{source_user} //= "root";
253 $param->{dest_user} //= "root";
254
255 return $param;
256 }
257
258 sub add_state_to_job {
259 my ($job) = @_;
260
261 my $states = read_state();
262 my $state = $states->{$job->{source}}->{$job->{name}};
263
264 $job->{state} = $state->{state};
265 $job->{lsync} = $state->{lsync};
266 $job->{vm_type} = $state->{vm_type};
267
268 for (my $i = 0; $state->{"snap$i"}; $i++) {
269 $job->{"snap$i"} = $state->{"snap$i"};
270 }
271
272 return $job;
273 }
274
275 sub encode_cron {
276 my (@text) = @_;
277
278 my $cfg = {};
279
280 while (my $line = shift(@text)) {
281
282 my @arg = split('\s', $line);
283 my $param = parse_argv(@arg);
284
285 if ($param->{source} && $param->{dest}) {
286 my $source = delete $param->{source};
287 my $name = delete $param->{name};
288
289 $cfg->{$source}->{$name} = $param;
290 }
291 }
292
293 return $cfg;
294 }
295
296 sub param_to_job {
297 my ($param) = @_;
298
299 my $job = {};
300
301 my $source = parse_target($param->{source});
302 my $dest = parse_target($param->{dest}) if $param->{dest};
303
304 $job->{name} = !$param->{name} ? "default" : $param->{name};
305 $job->{dest} = $param->{dest} if $param->{dest};
306 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
307 $job->{method} = "ssh" if !$job->{method};
308 $job->{limit} = $param->{limit};
309 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
310 $job->{source} = $param->{source};
311 $job->{source_user} = $param->{source_user};
312 $job->{dest_user} = $param->{dest_user};
313 $job->{properties} = !!$param->{properties};
314 $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};
315
316 return $job;
317 }
318
319 sub read_state {
320
321 if (!-e $STATE) {
322 make_path $CONFIG_PATH;
323 my $new_fh = IO::File->new("> $STATE");
324 die "Could not create $STATE: $!\n" if !$new_fh;
325 print $new_fh "{}";
326 close($new_fh);
327 return undef;
328 }
329
330 my $fh = IO::File->new("< $STATE");
331 die "Could not open file $STATE: $!\n" if !$fh;
332
333 my $text = <$fh>;
334 my $states = decode_json($text);
335
336 close($fh);
337
338 return $states;
339 }
340
341 sub update_state {
342 my ($job) = @_;
343 my $text;
344 my $in_fh;
345
346 eval {
347
348 $in_fh = IO::File->new("< $STATE");
349 die "Could not open file $STATE: $!\n" if !$in_fh;
350 $text = <$in_fh>;
351 };
352
353 my $out_fh = IO::File->new("> $STATE.new");
354 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
355
356 my $states = {};
357 my $state = {};
358 if ($text){
359 $states = decode_json($text);
360 $state = $states->{$job->{source}}->{$job->{name}};
361 }
362
363 if ($job->{state} ne "del") {
364 $state->{state} = $job->{state};
365 $state->{lsync} = $job->{lsync};
366 $state->{vm_type} = $job->{vm_type};
367
368 for (my $i = 0; $job->{"snap$i"} ; $i++) {
369 $state->{"snap$i"} = $job->{"snap$i"};
370 }
371 $states->{$job->{source}}->{$job->{name}} = $state;
372 } else {
373
374 delete $states->{$job->{source}}->{$job->{name}};
375 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
376 }
377
378 $text = encode_json($states);
379 print $out_fh $text;
380
381 close($out_fh);
382 rename "$STATE.new", $STATE;
383 eval {
384 close($in_fh);
385 };
386
387 return $states;
388 }
389
390 sub update_cron {
391 my ($job) = @_;
392
393 my $updated;
394 my $has_header;
395 my $line_no = 0;
396 my $text = "";
397 my $header = "SHELL=/bin/sh\n";
398 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
399
400 my $fh = IO::File->new("< $CRONJOBS");
401 die "Could not open file $CRONJOBS: $!\n" if !$fh;
402
403 my @test = <$fh>;
404
405 while (my $line = shift(@test)) {
406 chomp($line);
407 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
408 $updated = 1;
409 next if $job->{state} eq "del";
410 $text .= format_job($job, $line);
411 } else {
412 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
413 $has_header = 1;
414 }
415 $text .= "$line\n";
416 }
417 $line_no++;
418 }
419
420 if (!$has_header) {
421 $text = "$header$text";
422 }
423
424 if (!$updated) {
425 $text .= format_job($job);
426 }
427 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
428 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
429
430 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
431 close ($new_fh);
432
433 die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
434 close ($fh);
435 }
436
437 sub format_job {
438 my ($job, $line) = @_;
439 my $text = "";
440
441 if ($job->{state} eq "stopped") {
442 $text = "#";
443 }
444 if ($line) {
445 $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/;
446 $text .= $1;
447 } else {
448 $text .= "*/$INTERVAL * * * *";
449 }
450 $text .= " root";
451 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
452 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
453 $text .= " --limit $job->{limit}" if $job->{limit};
454 $text .= " --method $job->{method}";
455 $text .= " --verbose" if $job->{verbose};
456 $text .= " --source-user $job->{source_user}";
457 $text .= " --dest-user $job->{dest_user}";
458 $text .= " --properties" if $job->{properties};
459 $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
460 $text .= "\n";
461
462 return $text;
463 }
464
465 sub list {
466
467 my $cfg = read_cron();
468
469 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
470
471 my $states = read_state();
472 foreach my $source (sort keys%{$cfg}) {
473 foreach my $name (sort keys%{$cfg->{$source}}) {
474 $list .= sprintf("%-25s", cut_target_width($source, 25));
475 $list .= sprintf("%-25s", cut_target_width($name, 25));
476 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
477 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
478 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
479 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
480 }
481 }
482
483 return $list;
484 }
485
486 sub vm_exists {
487 my ($target, $user) = @_;
488
489 return undef if !defined($target->{vmid});
490
491 my $conf_fn = "$target->{vmid}.conf";
492
493 if ($target->{ip}) {
494 my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
495 return "qemu" if eval { run_cmd([@cmd, "$QEMU_CONF/$conf_fn"]) };
496 return "lxc" if eval { run_cmd([@cmd, "$LXC_CONF/$conf_fn"]) };
497 } else {
498 return "qemu" if -f "$QEMU_CONF/$conf_fn";
499 return "lxc" if -f "$LXC_CONF/$conf_fn";
500 }
501
502 return undef;
503 }
504
505 sub init {
506 my ($param) = @_;
507
508 locked("$CONFIG_PATH/cron_and_state.lock", sub {
509 my $cfg = read_cron();
510
511 my $job = param_to_job($param);
512
513 $job->{state} = "ok";
514 $job->{lsync} = 0;
515
516 my $source = parse_target($param->{source});
517 my $dest = parse_target($param->{dest});
518
519 if (my $ip = $dest->{ip}) {
520 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
521 }
522
523 if (my $ip = $source->{ip}) {
524 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
525 }
526
527 die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
528
529 if (!defined($source->{vmid})) {
530 die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
531 }
532
533 my $vm_type = vm_exists($source, $param->{source_user});
534 $job->{vm_type} = $vm_type;
535 $source->{vm_type} = $vm_type;
536
537 die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
538
539 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
540
541 #check if vm has zfs disks if not die;
542 get_disks($source, $param->{source_user}) if $source->{vmid};
543
544 update_cron($job);
545 update_state($job);
546 }); #cron and state lock
547
548 eval {
549 sync($param) if !$param->{skip};
550 };
551 if(my $err = $@) {
552 destroy_job($param);
553 print $err;
554 }
555 }
556
557 sub get_job {
558 my ($param) = @_;
559
560 my $cfg = read_cron();
561
562 if (!$cfg->{$param->{source}}->{$param->{name}}) {
563 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
564 }
565 my $job = $cfg->{$param->{source}}->{$param->{name}};
566 $job->{name} = $param->{name};
567 $job->{source} = $param->{source};
568 $job = add_state_to_job($job);
569
570 return $job;
571 }
572
573 sub destroy_job {
574 my ($param) = @_;
575
576 locked("$CONFIG_PATH/cron_and_state.lock", sub {
577 my $job = get_job($param);
578 $job->{state} = "del";
579
580 update_cron($job);
581 update_state($job);
582 });
583 }
584
585 sub sync {
586 my ($param) = @_;
587
588 locked("$CONFIG_PATH/sync.lock", sub {
589
590 my $date = get_date();
591 my $job;
592 eval {
593 $job = get_job($param);
594 };
595
596 if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
597 die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
598 }
599
600 my $dest = parse_target($param->{dest});
601 my $source = parse_target($param->{source});
602
603 my $sync_path = sub {
604 my ($source, $dest, $job, $param, $date) = @_;
605
606 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
607
608 snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
609
610 send_image($source, $dest, $param);
611
612 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
613
614 };
615
616 my $vm_type = vm_exists($source, $param->{source_user});
617 $source->{vm_type} = $vm_type;
618
619 if ($job) {
620 $job->{state} = "syncing";
621 $job->{vm_type} = $vm_type if !$job->{vm_type};
622 locked("$CONFIG_PATH/cron_and_state.lock", sub { update_state($job); });
623 }
624
625 eval{
626 if ($source->{vmid}) {
627 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
628 die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
629 my $disks = get_disks($source, $param->{source_user});
630
631 foreach my $disk (sort keys %{$disks}) {
632 $source->{all} = $disks->{$disk}->{all};
633 $source->{pool} = $disks->{$disk}->{pool};
634 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
635 $source->{last_part} = $disks->{$disk}->{last_part};
636 &$sync_path($source, $dest, $job, $param, $date);
637 }
638 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
639 send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
640 } else {
641 send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
642 }
643 } else {
644 &$sync_path($source, $dest, $job, $param, $date);
645 }
646 };
647 if (my $err = $@) {
648 if ($job) {
649 $job->{state} = "error";
650 locked("$CONFIG_PATH/cron_and_state.lock", sub { update_state($job); });
651 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
652 }
653 die "$err\n";
654 }
655
656 if ($job) {
657 $job->{state} = "ok";
658 $job->{lsync} = $date;
659 locked("$CONFIG_PATH/cron_and_state.lock", sub { update_state($job); });
660 }
661 }); #sync lock
662 }
663
664 sub snapshot_get{
665 my ($source, $dest, $max_snap, $name, $source_user) = @_;
666
667 my $cmd = [];
668 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
669 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
670 push @$cmd, $source->{all};
671
672 my $raw = run_cmd($cmd);
673 my $index = 0;
674 my $line = "";
675 my $last_snap = undef;
676 my $old_snap;
677
678 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
679 $line = $1;
680 if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
681
682 $last_snap = $1 if (!$last_snap);
683 $old_snap = $1;
684 $index++;
685 if ($index == $max_snap) {
686 $source->{destroy} = 1;
687 last;
688 };
689 }
690 }
691
692 return ($old_snap, $last_snap) if $last_snap;
693
694 return undef;
695 }
696
697 sub snapshot_add {
698 my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;
699
700 my $snap_name = "rep_$name\_".$date;
701
702 $source->{new_snap} = $snap_name;
703
704 my $path = "$source->{all}\@$snap_name";
705
706 my $cmd = [];
707 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
708 push @$cmd, 'zfs', 'snapshot', $path;
709 eval{
710 run_cmd($cmd);
711 };
712
713 if (my $err = $@) {
714 snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
715 die "$err\n";
716 }
717 }
718
719 sub write_cron {
720 my ($cfg) = @_;
721
722 my $text = "SHELL=/bin/sh\n";
723 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
724
725 my $fh = IO::File->new("> $CRONJOBS");
726 die "Could not open file: $!\n" if !$fh;
727
728 foreach my $source (sort keys%{$cfg}) {
729 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
730 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
731 $text .= "$PROG_PATH sync";
732 $text .= " -source ";
733 if ($cfg->{$source}->{$sync_name}->{vmid}) {
734 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
735 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
736 } else {
737 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
738 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
739 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
740 }
741 $text .= " -dest ";
742 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
743 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
744 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
745 $text .= " -name $sync_name ";
746 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
747 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
748 $text .= "\n";
749 }
750 }
751 die "Can't write to cron\n" if (!print($fh $text));
752 close($fh);
753 }
754
755 sub get_disks {
756 my ($target, $user) = @_;
757
758 my $cmd = [];
759 push @$cmd, 'ssh', "$user\@$target->{ip}", '--', if $target->{ip};
760
761 if ($target->{vm_type} eq 'qemu') {
762 push @$cmd, 'qm', 'config', $target->{vmid};
763 } elsif ($target->{vm_type} eq 'lxc') {
764 push @$cmd, 'pct', 'config', $target->{vmid};
765 } else {
766 die "VM Type unknown\n";
767 }
768
769 my $res = run_cmd($cmd);
770
771 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
772
773 return $disks;
774 }
775
776 sub run_cmd {
777 my ($cmd) = @_;
778 print "Start CMD\n" if $DEBUG;
779 print Dumper $cmd if $DEBUG;
780 if (ref($cmd) eq 'ARRAY') {
781 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
782 }
783 my $output = `$cmd 2>&1`;
784
785 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
786
787 chomp($output);
788 print Dumper $output if $DEBUG;
789 print "END CMD\n" if $DEBUG;
790 return $output;
791 }
792
793 sub parse_disks {
794 my ($text, $ip, $vm_type, $user) = @_;
795
796 my $disks;
797
798 my $num = 0;
799 while ($text && $text =~ s/^(.*?)(\n|$)//) {
800 my $line = $1;
801
802 next if $line =~ /media=cdrom/;
803 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
804
805 #QEMU if backup is not set include in sync
806 next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
807
808 #LXC if backup is not set do no in sync
809 next if $vm_type eq 'lxc' && ($line =~ m/^mp\d:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);
810
811 my $disk = undef;
812 my $stor = undef;
813 if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
814 my @parameter = split(/,/,$1);
815
816 foreach my $opt (@parameter) {
817 if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
818 $disk = $2;
819 $stor = $1;
820 last;
821 }
822 }
823 }
824 if (!defined($disk) || !defined($stor)) {
825 print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
826 next;
827 }
828
829 my $cmd = [];
830 push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
831 push @$cmd, 'pvesm', 'path', "$stor$disk";
832 my $path = run_cmd($cmd);
833
834 die "Get no path from pvesm path $stor$disk\n" if !$path;
835
836 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
837
838 my @array = split('/', $1);
839 $disks->{$num}->{pool} = shift(@array);
840 $disks->{$num}->{all} = $disks->{$num}->{pool};
841 if (0 < @array) {
842 $disks->{$num}->{path} = join('/', @array);
843 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
844 }
845 $disks->{$num}->{last_part} = $disk;
846 $disks->{$num}->{all} .= "\/$disk";
847
848 $num++;
849 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
850
851 $disks->{$num}->{pool} = $1;
852 $disks->{$num}->{all} = $disks->{$num}->{pool};
853
854 if ($2) {
855 $disks->{$num}->{path} = $3;
856 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
857 }
858
859 $disks->{$num}->{last_part} = $disk;
860 $disks->{$num}->{all} .= "\/$disk";
861
862 $num++;
863
864 } else {
865 die "ERROR: in path\n";
866 }
867 }
868
869 die "Vm include no disk on zfs.\n" if !$disks->{0};
870 return $disks;
871 }
872
873 sub snapshot_destroy {
874 my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;
875
876 my @zfscmd = ('zfs', 'destroy');
877 my $snapshot = "$source->{all}\@$snap";
878
879 eval {
880 if($source->{ip} && $method eq 'ssh'){
881 run_cmd(['ssh', "$source_user\@$source->{ip}", '--', @zfscmd, $snapshot]);
882 } else {
883 run_cmd([@zfscmd, $snapshot]);
884 }
885 };
886 if (my $erro = $@) {
887 warn "WARN: $erro";
888 }
889 if ($dest) {
890 my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
891
892 my $path = "$dest->{all}";
893 $path .= "/$source->{last_part}" if $source->{last_part};
894
895 eval {
896 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
897 };
898 if (my $erro = $@) {
899 warn "WARN: $erro";
900 }
901 }
902 }
903
904 sub snapshot_exist {
905 my ($source , $dest, $method, $dest_user) = @_;
906
907 my $cmd = [];
908 push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
909 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
910
911 my $path = $dest->{all};
912 $path .= "/$source->{last_part}" if $source->{last_part};
913 $path .= "\@$source->{old_snap}";
914
915 push @$cmd, $path;
916
917
918 my $text = "";
919 eval {$text =run_cmd($cmd);};
920 if (my $erro =$@) {
921 warn "WARN: $erro";
922 return undef;
923 }
924
925 while ($text && $text =~ s/^(.*?)(\n|$)//) {
926 my $line =$1;
927 return 1 if $line =~ m/^.*$source->{old_snap}$/;
928 }
929 }
930
931 sub send_image {
932 my ($source, $dest, $param) = @_;
933
934 my $cmd = [];
935
936 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
937 push @$cmd, 'zfs', 'send';
938 push @$cmd, '-p', if $param->{properties};
939 push @$cmd, '-v' if $param->{verbose};
940
941 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) {
942 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
943 }
944 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
945
946 if ($param->{limit}){
947 my $bwl = $param->{limit}*1024;
948 push @$cmd, \'|', 'cstream', '-t', $bwl;
949 }
950 my $target = "$dest->{all}";
951 $target .= "/$source->{last_part}" if $source->{last_part};
952 $target =~ s!/+!/!g;
953
954 push @$cmd, \'|';
955 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
956 push @$cmd, 'zfs', 'recv', '-F', '--';
957 push @$cmd, "$target";
958
959 eval {
960 run_cmd($cmd)
961 };
962
963 if (my $erro = $@) {
964 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
965 die $erro;
966 };
967 }
968
969
970 sub send_config{
971 my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;
972
973 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
974 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
975
976 my $config_dir = $dest_config_path // $CONFIG_PATH;
977 $config_dir .= "/$dest->{last_part}" if $dest->{last_part};
978
979 $dest_target_new = $config_dir.'/'.$dest_target_new;
980
981 if ($method eq 'ssh'){
982 if ($dest->{ip} && $source->{ip}) {
983 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
984 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
985 } elsif ($dest->{ip}) {
986 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
987 run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
988 } elsif ($source->{ip}) {
989 run_cmd(['mkdir', '-p', '--', $config_dir]);
990 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]);
991 }
992
993 if ($source->{destroy}){
994 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
995 if($dest->{ip}){
996 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
997 } else {
998 run_cmd(['rm', '-f', '--', $dest_target_old]);
999 }
1000 }
1001 } elsif ($method eq 'local') {
1002 run_cmd(['mkdir', '-p', '--', $config_dir]);
1003 run_cmd(['cp', $source_target, $dest_target_new]);
1004 }
1005 }
1006
1007 sub get_date {
1008 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
1009 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
1010
1011 return $datestamp;
1012 }
1013
1014 sub status {
1015 my $cfg = read_cron();
1016
1017 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
1018
1019 my $states = read_state();
1020
1021 foreach my $source (sort keys%{$cfg}) {
1022 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
1023 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
1024 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
1025 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
1026 }
1027 }
1028
1029 return $status_list;
1030 }
1031
1032 sub enable_job {
1033 my ($param) = @_;
1034
1035 locked("$CONFIG_PATH/cron_and_state.lock", sub {
1036 my $job = get_job($param);
1037 $job->{state} = "ok";
1038 update_state($job);
1039 update_cron($job);
1040 });
1041 }
1042
1043 sub disable_job {
1044 my ($param) = @_;
1045
1046 locked("$CONFIG_PATH/cron_and_state.lock", sub {
1047 my $job = get_job($param);
1048 $job->{state} = "stopped";
1049 update_state($job);
1050 update_cron($job);
1051 });
1052 }
1053
1054 my $cmd_help = {
1055 destroy => qq{
1056 $PROGNAME destroy -source <string> [OPTIONS]
1057
1058 remove a sync Job from the scheduler
1059
1060 -name string
1061
1062 name of the sync job, if not set it is default
1063
1064 -source string
1065
1066 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1067 },
1068 create => qq{
1069 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1070
1071 Create a sync Job
1072
1073 -dest string
1074
1075 the destination target is like [IP]:<Pool>[/Path]
1076
1077 -dest-user string
1078
1079 name of the user on the destination target, root by default
1080
1081 -limit integer
1082
1083 max sync speed in kBytes/s, default unlimited
1084
1085 -maxsnap string
1086
1087 how much snapshots will be kept before get erased, default 1
1088
1089 -name string
1090
1091 name of the sync job, if not set it is default
1092
1093 -skip boolean
1094
1095 if this flag is set it will skip the first sync
1096
1097 -source string
1098
1099 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1100
1101 -source-user string
1102
1103 name of the user on the source target, root by default
1104
1105 -properties boolean
1106
1107 Include the dataset's properties in the stream.
1108
1109 -dest-config-path string
1110
1111 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1112 },
1113 sync => qq{
1114 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1115
1116 will sync one time
1117
1118 -dest string
1119
1120 the destination target is like [IP:]<Pool>[/Path]
1121
1122 -dest-user string
1123
1124 name of the user on the destination target, root by default
1125
1126 -limit integer
1127
1128 max sync speed in kBytes/s, default unlimited
1129
1130 -maxsnap integer
1131
1132 how much snapshots will be kept before get erased, default 1
1133
1134 -name string
1135
1136 name of the sync job, if not set it is default.
1137 It is only necessary if scheduler allready contains this source.
1138
1139 -source string
1140
1141 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1142
1143 -source-user string
1144
1145 name of the user on the source target, root by default
1146
1147 -verbose boolean
1148
1149 print out the sync progress.
1150
1151 -properties boolean
1152
1153 Include the dataset's properties in the stream.
1154
1155 -dest-config-path string
1156
1157 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1158 },
1159 list => qq{
1160 $PROGNAME list
1161
1162 Get a List of all scheduled Sync Jobs
1163 },
1164 status => qq{
1165 $PROGNAME status
1166
1167 Get the status of all scheduled Sync Jobs
1168 },
1169 help => qq{
1170 $PROGNAME help <cmd> [OPTIONS]
1171
1172 Get help about specified command.
1173
1174 <cmd> string
1175
1176 Command name
1177
1178 -verbose boolean
1179
1180 Verbose output format.
1181 },
1182 enable => qq{
1183 $PROGNAME enable -source <string> [OPTIONS]
1184
1185 enable a syncjob and reset error
1186
1187 -name string
1188
1189 name of the sync job, if not set it is default
1190
1191 -source string
1192
1193 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1194 },
1195 disable => qq{
1196 $PROGNAME disable -source <string> [OPTIONS]
1197
1198 pause a sync job
1199
1200 -name string
1201
1202 name of the sync job, if not set it is default
1203
1204 -source string
1205
1206 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1207 },
1208 printpod => 'internal command',
1209
1210 };
1211
1212 if (!$command) {
1213 usage(); die "\n";
1214 } elsif (!$cmd_help->{$command}) {
1215 print "ERROR: unknown command '$command'";
1216 usage(1); die "\n";
1217 }
1218
1219 my @arg = @ARGV;
1220 my $param = parse_argv(@arg);
1221
1222 sub check_params {
1223 for (@_) {
1224 die "$cmd_help->{$command}\n" if !$param->{$_};
1225 }
1226 }
1227
1228 if ($command eq 'destroy') {
1229 check_params(qw(source));
1230
1231 check_target($param->{source});
1232 destroy_job($param);
1233
1234 } elsif ($command eq 'sync') {
1235 check_params(qw(source dest));
1236
1237 check_target($param->{source});
1238 check_target($param->{dest});
1239 sync($param);
1240
1241 } elsif ($command eq 'create') {
1242 check_params(qw(source dest));
1243
1244 check_target($param->{source});
1245 check_target($param->{dest});
1246 init($param);
1247
1248 } elsif ($command eq 'status') {
1249 print status();
1250
1251 } elsif ($command eq 'list') {
1252 print list();
1253
1254 } elsif ($command eq 'help') {
1255 my $help_command = $ARGV[1];
1256
1257 if ($help_command && $cmd_help->{$help_command}) {
1258 die "$cmd_help->{$help_command}\n";
1259
1260 }
1261 if ($param->{verbose}) {
1262 exec("man $PROGNAME");
1263
1264 } else {
1265 usage(1);
1266
1267 }
1268
1269 } elsif ($command eq 'enable') {
1270 check_params(qw(source));
1271
1272 check_target($param->{source});
1273 enable_job($param);
1274
1275 } elsif ($command eq 'disable') {
1276 check_params(qw(source));
1277
1278 check_target($param->{source});
1279 disable_job($param);
1280
1281 } elsif ($command eq 'printpod') {
1282 print_pod();
1283 }
1284
1285 sub usage {
1286 my ($help) = @_;
1287
1288 print("ERROR:\tno command specified\n") if !$help;
1289 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1290 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1291 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1292 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1293 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1294 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1295 print("\t$PROGNAME list\n");
1296 print("\t$PROGNAME status\n");
1297 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1298 }
1299
1300 sub check_target {
1301 my ($target) = @_;
1302 parse_target($target);
1303 }
1304
1305 sub print_pod {
1306
1307 my $synopsis = join("\n", sort values %$cmd_help);
1308
1309 print <<EOF;
1310 =head1 NAME
1311
1312 pve-zsync - PVE ZFS Replication Manager
1313
1314 =head1 SYNOPSIS
1315
1316 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1317
1318 $synopsis
1319
1320 =head1 DESCRIPTION
1321
1322 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1323 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1324 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1325 To config cron see man crontab.
1326
1327 =head2 PVE ZFS Storage sync Tool
1328
1329 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1330
1331 =head1 EXAMPLES
1332
1333 add sync job from local VM to remote ZFS Server
1334 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1335
1336 =head1 IMPORTANT FILES
1337
1338 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1339
1340 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1341
1342 =head1 COPYRIGHT AND DISCLAIMER
1343
1344 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1345
1346 This program is free software: you can redistribute it and/or modify it
1347 under the terms of the GNU Affero General Public License as published
1348 by the Free Software Foundation, either version 3 of the License, or
1349 (at your option) any later version.
1350
1351 This program is distributed in the hope that it will be useful, but
1352 WITHOUT ANY WARRANTY; without even the implied warranty of
1353 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1354 Affero General Public License for more details.
1355
1356 You should have received a copy of the GNU Affero General Public
1357 License along with this program. If not, see
1358 <http://www.gnu.org/licenses/>.
1359
1360 EOF
1361 }