]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
Detect other running instances of the same job
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
13 my $PROGNAME = "pve-zsync";
14 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
15 my $STATE = "${CONFIG_PATH}/sync_state";
16 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
17 my $PATH = "/usr/sbin";
18 my $PVE_DIR = "/etc/pve/local";
19 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
20 my $LXC_CONF = "${PVE_DIR}/lxc";
21 my $PROG_PATH = "$PATH/${PROGNAME}";
22 my $INTERVAL = 15;
23 my $DEBUG;
24
25 BEGIN {
26 $DEBUG = 0; # change default here. not above on declaration!
27 $DEBUG ||= $ENV{ZSYNC_DEBUG};
28 if ($DEBUG) {
29 require Data::Dumper;
30 Data::Dumper->import();
31 }
32 }
33
34 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
35 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
36 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
37 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
38
39 my $IPV6RE = "(?:" .
40 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
41 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
42 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
43 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
44 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
45 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
46 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
47 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
48 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
49
50 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
51 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
52 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
53 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
54 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
56 my $command = $ARGV[0];
57
58 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
59 check_bin ('cstream');
60 check_bin ('zfs');
61 check_bin ('ssh');
62 check_bin ('scp');
63 }
64
65 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
66 die "Signaled, aborting sync: $!\n";
67 };
68
69 sub check_bin {
70 my ($bin) = @_;
71
72 foreach my $p (split (/:/, $ENV{PATH})) {
73 my $fn = "$p/$bin";
74 if (-x $fn) {
75 return $fn;
76 }
77 }
78
79 die "unable to find command '$bin'\n";
80 }
81
82 sub cut_target_width {
83 my ($path, $maxlen) = @_;
84 $path =~ s@/+@/@g;
85
86 return $path if length($path) <= $maxlen;
87
88 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
89
90 $path =~ s@/([^/]+/?)$@@;
91 my $tail = $1;
92
93 if (length($tail)+3 == $maxlen) {
94 return "../$tail";
95 } elsif (length($tail)+2 >= $maxlen) {
96 return '..'.substr($tail, -$maxlen+2)
97 }
98
99 $path =~ s@(/[^/]+)(?:/|$)@@;
100 my $head = $1;
101 my $both = length($head) + length($tail);
102 my $remaining = $maxlen-$both-4; # -4 for "/../"
103
104 if ($remaining < 0) {
105 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
106 }
107
108 substr($path, ($remaining/2), (length($path)-$remaining), '..');
109 return "$head/" . $path . "/$tail";
110 }
111
112 sub locked {
113 my ($lock_fn, $code) = @_;
114
115 my $lock_fh = IO::File->new("> $lock_fn");
116
117 flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
118 my $res = eval { $code->() };
119 my $err = $@;
120
121 flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
122 die "$err" if $err;
123
124 close($lock_fh);
125 return $res;
126 }
127
128 sub get_status {
129 my ($source, $name, $status) = @_;
130
131 if ($status->{$source->{all}}->{$name}->{status}) {
132 return $status;
133 }
134
135 return undef;
136 }
137
138 sub check_pool_exists {
139 my ($target, $user) = @_;
140
141 my $cmd = [];
142
143 if ($target->{ip}) {
144 push @$cmd, 'ssh', "$user\@$target->{ip}", '--';
145 }
146 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
147 eval {
148 run_cmd($cmd);
149 };
150
151 if ($@) {
152 return 0;
153 }
154 return 1;
155 }
156
157 sub parse_target {
158 my ($text) = @_;
159
160 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
161 my $target = {};
162
163 if ($text !~ $TARGETRE) {
164 die "$errstr\n";
165 }
166 $target->{all} = $2;
167 $target->{ip} = $1 if $1;
168 my @parts = split('/', $2);
169
170 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
171
172 my $pool = $target->{pool} = shift(@parts);
173 die "$errstr\n" if !$pool;
174
175 if ($pool =~ m/^\d+$/) {
176 $target->{vmid} = $pool;
177 delete $target->{pool};
178 }
179
180 return $target if (@parts == 0);
181 $target->{last_part} = pop(@parts);
182
183 if ($target->{ip}) {
184 pop(@parts);
185 }
186 if (@parts > 0) {
187 $target->{path} = join('/', @parts);
188 }
189
190 return $target;
191 }
192
193 sub read_cron {
194
195 #This is for the first use to init file;
196 if (!-e $CRONJOBS) {
197 my $new_fh = IO::File->new("> $CRONJOBS");
198 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
199 close($new_fh);
200 return undef;
201 }
202
203 my $fh = IO::File->new("< $CRONJOBS");
204 die "Could not open file $CRONJOBS: $!\n" if !$fh;
205
206 my @text = <$fh>;
207
208 close($fh);
209
210 return encode_cron(@text);
211 }
212
213 sub parse_argv {
214 my (@arg) = @_;
215
216 my $param = {
217 dest => undef,
218 source => undef,
219 verbose => undef,
220 limit => undef,
221 maxsnap => undef,
222 name => undef,
223 skip => undef,
224 method => undef,
225 source_user => undef,
226 dest_user => undef,
227 properties => undef,
228 dest_config_path => undef,
229 };
230
231 my ($ret) = GetOptionsFromArray(
232 \@arg,
233 'dest=s' => \$param->{dest},
234 'source=s' => \$param->{source},
235 'verbose' => \$param->{verbose},
236 'limit=i' => \$param->{limit},
237 'maxsnap=i' => \$param->{maxsnap},
238 'name=s' => \$param->{name},
239 'skip' => \$param->{skip},
240 'method=s' => \$param->{method},
241 'source-user=s' => \$param->{source_user},
242 'dest-user=s' => \$param->{dest_user},
243 'properties' => \$param->{properties},
244 'dest-config-path=s' => \$param->{dest_config_path},
245 );
246
247 die "can't parse options\n" if $ret == 0;
248
249 $param->{name} //= "default";
250 $param->{maxsnap} //= 1;
251 $param->{method} //= "ssh";
252 $param->{source_user} //= "root";
253 $param->{dest_user} //= "root";
254
255 return $param;
256 }
257
258 sub add_state_to_job {
259 my ($job) = @_;
260
261 my $states = read_state();
262 my $state = $states->{$job->{source}}->{$job->{name}};
263
264 $job->{state} = $state->{state};
265 $job->{lsync} = $state->{lsync};
266 $job->{vm_type} = $state->{vm_type};
267
268 for (my $i = 0; $state->{"snap$i"}; $i++) {
269 $job->{"snap$i"} = $state->{"snap$i"};
270 }
271
272 return $job;
273 }
274
275 sub encode_cron {
276 my (@text) = @_;
277
278 my $cfg = {};
279
280 while (my $line = shift(@text)) {
281
282 my @arg = split('\s', $line);
283 my $param = parse_argv(@arg);
284
285 if ($param->{source} && $param->{dest}) {
286 my $source = delete $param->{source};
287 my $name = delete $param->{name};
288
289 $cfg->{$source}->{$name} = $param;
290 }
291 }
292
293 return $cfg;
294 }
295
296 sub param_to_job {
297 my ($param) = @_;
298
299 my $job = {};
300
301 my $source = parse_target($param->{source});
302 my $dest = parse_target($param->{dest}) if $param->{dest};
303
304 $job->{name} = !$param->{name} ? "default" : $param->{name};
305 $job->{dest} = $param->{dest} if $param->{dest};
306 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
307 $job->{method} = "ssh" if !$job->{method};
308 $job->{limit} = $param->{limit};
309 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
310 $job->{source} = $param->{source};
311 $job->{source_user} = $param->{source_user};
312 $job->{dest_user} = $param->{dest_user};
313 $job->{properties} = !!$param->{properties};
314 $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};
315
316 return $job;
317 }
318
319 sub read_state {
320
321 if (!-e $STATE) {
322 make_path $CONFIG_PATH;
323 my $new_fh = IO::File->new("> $STATE");
324 die "Could not create $STATE: $!\n" if !$new_fh;
325 print $new_fh "{}";
326 close($new_fh);
327 return undef;
328 }
329
330 my $fh = IO::File->new("< $STATE");
331 die "Could not open file $STATE: $!\n" if !$fh;
332
333 my $text = <$fh>;
334 my $states = decode_json($text);
335
336 close($fh);
337
338 return $states;
339 }
340
341 sub update_state {
342 my ($job) = @_;
343 my $text;
344 my $in_fh;
345
346 eval {
347
348 $in_fh = IO::File->new("< $STATE");
349 die "Could not open file $STATE: $!\n" if !$in_fh;
350 $text = <$in_fh>;
351 };
352
353 my $out_fh = IO::File->new("> $STATE.new");
354 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
355
356 my $states = {};
357 my $state = {};
358 if ($text){
359 $states = decode_json($text);
360 $state = $states->{$job->{source}}->{$job->{name}};
361 }
362
363 if ($job->{state} ne "del") {
364 $state->{state} = $job->{state};
365 $state->{lsync} = $job->{lsync};
366 $state->{vm_type} = $job->{vm_type};
367
368 for (my $i = 0; $job->{"snap$i"} ; $i++) {
369 $state->{"snap$i"} = $job->{"snap$i"};
370 }
371 $states->{$job->{source}}->{$job->{name}} = $state;
372 } else {
373
374 delete $states->{$job->{source}}->{$job->{name}};
375 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
376 }
377
378 $text = encode_json($states);
379 print $out_fh $text;
380
381 close($out_fh);
382 rename "$STATE.new", $STATE;
383 eval {
384 close($in_fh);
385 };
386
387 return $states;
388 }
389
390 sub update_cron {
391 my ($job) = @_;
392
393 my $updated;
394 my $has_header;
395 my $line_no = 0;
396 my $text = "";
397 my $header = "SHELL=/bin/sh\n";
398 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
399
400 my $fh = IO::File->new("< $CRONJOBS");
401 die "Could not open file $CRONJOBS: $!\n" if !$fh;
402
403 my @test = <$fh>;
404
405 while (my $line = shift(@test)) {
406 chomp($line);
407 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
408 $updated = 1;
409 next if $job->{state} eq "del";
410 $text .= format_job($job, $line);
411 } else {
412 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
413 $has_header = 1;
414 }
415 $text .= "$line\n";
416 }
417 $line_no++;
418 }
419
420 if (!$has_header) {
421 $text = "$header$text";
422 }
423
424 if (!$updated) {
425 $text .= format_job($job);
426 }
427 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
428 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
429
430 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
431 close ($new_fh);
432
433 die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
434 close ($fh);
435 }
436
437 sub format_job {
438 my ($job, $line) = @_;
439 my $text = "";
440
441 if ($job->{state} eq "stopped") {
442 $text = "#";
443 }
444 if ($line) {
445 $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/;
446 $text .= $1;
447 } else {
448 $text .= "*/$INTERVAL * * * *";
449 }
450 $text .= " root";
451 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
452 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
453 $text .= " --limit $job->{limit}" if $job->{limit};
454 $text .= " --method $job->{method}";
455 $text .= " --verbose" if $job->{verbose};
456 $text .= " --source-user $job->{source_user}";
457 $text .= " --dest-user $job->{dest_user}";
458 $text .= " --properties" if $job->{properties};
459 $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
460 $text .= "\n";
461
462 return $text;
463 }
464
465 sub list {
466
467 my $cfg = read_cron();
468
469 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
470
471 my $states = read_state();
472 foreach my $source (sort keys%{$cfg}) {
473 foreach my $name (sort keys%{$cfg->{$source}}) {
474 $list .= sprintf("%-25s", cut_target_width($source, 25));
475 $list .= sprintf("%-25s", cut_target_width($name, 25));
476 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
477 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
478 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
479 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
480 }
481 }
482
483 return $list;
484 }
485
486 sub vm_exists {
487 my ($target, $user) = @_;
488
489 return undef if !defined($target->{vmid});
490
491 my $conf_fn = "$target->{vmid}.conf";
492
493 if ($target->{ip}) {
494 my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
495 return "qemu" if eval { run_cmd([@cmd, "$QEMU_CONF/$conf_fn"]) };
496 return "lxc" if eval { run_cmd([@cmd, "$LXC_CONF/$conf_fn"]) };
497 } else {
498 return "qemu" if -f "$QEMU_CONF/$conf_fn";
499 return "lxc" if -f "$LXC_CONF/$conf_fn";
500 }
501
502 return undef;
503 }
504
505 sub init {
506 my ($param) = @_;
507
508 locked("$CONFIG_PATH/cron_and_state.lock", sub {
509 my $cfg = read_cron();
510
511 my $job = param_to_job($param);
512
513 $job->{state} = "ok";
514 $job->{lsync} = 0;
515
516 my $source = parse_target($param->{source});
517 my $dest = parse_target($param->{dest});
518
519 if (my $ip = $dest->{ip}) {
520 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
521 }
522
523 if (my $ip = $source->{ip}) {
524 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
525 }
526
527 die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
528
529 if (!defined($source->{vmid})) {
530 die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
531 }
532
533 my $vm_type = vm_exists($source, $param->{source_user});
534 $job->{vm_type} = $vm_type;
535 $source->{vm_type} = $vm_type;
536
537 die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
538
539 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
540
541 #check if vm has zfs disks if not die;
542 get_disks($source, $param->{source_user}) if $source->{vmid};
543
544 update_cron($job);
545 update_state($job);
546 }); #cron and state lock
547
548 eval {
549 sync($param) if !$param->{skip};
550 };
551 if(my $err = $@) {
552 destroy_job($param);
553 print $err;
554 }
555 }
556
557 sub get_job {
558 my ($param) = @_;
559
560 my $cfg = read_cron();
561
562 if (!$cfg->{$param->{source}}->{$param->{name}}) {
563 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
564 }
565 my $job = $cfg->{$param->{source}}->{$param->{name}};
566 $job->{name} = $param->{name};
567 $job->{source} = $param->{source};
568 $job = add_state_to_job($job);
569
570 return $job;
571 }
572
573 sub destroy_job {
574 my ($param) = @_;
575
576 locked("$CONFIG_PATH/cron_and_state.lock", sub {
577 my $job = get_job($param);
578 $job->{state} = "del";
579
580 update_cron($job);
581 update_state($job);
582 });
583 }
584
585 sub sync {
586 my ($param) = @_;
587
588 my $job;
589
590 locked("$CONFIG_PATH/cron_and_state.lock", sub {
591 eval { $job = get_job($param) };
592
593 if ($job) {
594 if (defined($job->{state}) && ($job->{state} eq "syncing" || $job->{state} eq "waiting")) {
595 die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
596 }
597
598 $job->{state} = "waiting";
599 update_state($job);
600 }
601 });
602
603 locked("$CONFIG_PATH/sync.lock", sub {
604
605 my $date = get_date();
606
607 my $dest;
608 my $source;
609 my $vm_type;
610
611 locked("$CONFIG_PATH/cron_and_state.lock", sub {
612 #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
613 eval { $job = get_job($param); };
614
615 $dest = parse_target($param->{dest});
616 $source = parse_target($param->{source});
617
618 $vm_type = vm_exists($source, $param->{source_user});
619 $source->{vm_type} = $vm_type;
620
621 if ($job) {
622 $job->{state} = "syncing";
623 $job->{vm_type} = $vm_type if !$job->{vm_type};
624 update_state($job);
625 }
626 }); #cron and state lock
627
628 my $sync_path = sub {
629 my ($source, $dest, $job, $param, $date) = @_;
630
631 ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
632
633 snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
634
635 send_image($source, $dest, $param);
636
637 snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
638
639 };
640
641 eval{
642 if ($source->{vmid}) {
643 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
644 die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
645 my $disks = get_disks($source, $param->{source_user});
646
647 foreach my $disk (sort keys %{$disks}) {
648 $source->{all} = $disks->{$disk}->{all};
649 $source->{pool} = $disks->{$disk}->{pool};
650 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
651 $source->{last_part} = $disks->{$disk}->{last_part};
652 &$sync_path($source, $dest, $job, $param, $date);
653 }
654 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
655 send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
656 } else {
657 send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
658 }
659 } else {
660 &$sync_path($source, $dest, $job, $param, $date);
661 }
662 };
663 if (my $err = $@) {
664 locked("$CONFIG_PATH/cron_and_state.lock", sub {
665 eval { $job = get_job($param); };
666 if ($job) {
667 $job->{state} = "error";
668 update_state($job);
669 }
670 });
671 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
672 die "$err\n";
673 }
674
675 locked("$CONFIG_PATH/cron_and_state.lock", sub {
676 eval { $job = get_job($param); };
677 if ($job) {
678 $job->{state} = "ok";
679 $job->{lsync} = $date;
680 update_state($job);
681 }
682 });
683 }); #sync lock
684 }
685
686 sub snapshot_get{
687 my ($source, $dest, $max_snap, $name, $source_user) = @_;
688
689 my $cmd = [];
690 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
691 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
692 push @$cmd, $source->{all};
693
694 my $raw = run_cmd($cmd);
695 my $index = 0;
696 my $line = "";
697 my $last_snap = undef;
698 my $old_snap;
699
700 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
701 $line = $1;
702 if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
703
704 $last_snap = $1 if (!$last_snap);
705 $old_snap = $1;
706 $index++;
707 if ($index == $max_snap) {
708 $source->{destroy} = 1;
709 last;
710 };
711 }
712 }
713
714 return ($old_snap, $last_snap) if $last_snap;
715
716 return undef;
717 }
718
719 sub snapshot_add {
720 my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;
721
722 my $snap_name = "rep_$name\_".$date;
723
724 $source->{new_snap} = $snap_name;
725
726 my $path = "$source->{all}\@$snap_name";
727
728 my $cmd = [];
729 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
730 push @$cmd, 'zfs', 'snapshot', $path;
731 eval{
732 run_cmd($cmd);
733 };
734
735 if (my $err = $@) {
736 snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
737 die "$err\n";
738 }
739 }
740
741 sub write_cron {
742 my ($cfg) = @_;
743
744 my $text = "SHELL=/bin/sh\n";
745 $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";
746
747 my $fh = IO::File->new("> $CRONJOBS");
748 die "Could not open file: $!\n" if !$fh;
749
750 foreach my $source (sort keys%{$cfg}) {
751 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
752 next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
753 $text .= "$PROG_PATH sync";
754 $text .= " -source ";
755 if ($cfg->{$source}->{$sync_name}->{vmid}) {
756 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
757 $text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
758 } else {
759 $text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
760 $text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
761 $text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
762 }
763 $text .= " -dest ";
764 $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
765 $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
766 $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
767 $text .= " -name $sync_name ";
768 $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
769 $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
770 $text .= "\n";
771 }
772 }
773 die "Can't write to cron\n" if (!print($fh $text));
774 close($fh);
775 }
776
777 sub get_disks {
778 my ($target, $user) = @_;
779
780 my $cmd = [];
781 push @$cmd, 'ssh', "$user\@$target->{ip}", '--', if $target->{ip};
782
783 if ($target->{vm_type} eq 'qemu') {
784 push @$cmd, 'qm', 'config', $target->{vmid};
785 } elsif ($target->{vm_type} eq 'lxc') {
786 push @$cmd, 'pct', 'config', $target->{vmid};
787 } else {
788 die "VM Type unknown\n";
789 }
790
791 my $res = run_cmd($cmd);
792
793 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
794
795 return $disks;
796 }
797
798 sub run_cmd {
799 my ($cmd) = @_;
800 print "Start CMD\n" if $DEBUG;
801 print Dumper $cmd if $DEBUG;
802 if (ref($cmd) eq 'ARRAY') {
803 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
804 }
805 my $output = `$cmd 2>&1`;
806
807 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
808
809 chomp($output);
810 print Dumper $output if $DEBUG;
811 print "END CMD\n" if $DEBUG;
812 return $output;
813 }
814
815 sub parse_disks {
816 my ($text, $ip, $vm_type, $user) = @_;
817
818 my $disks;
819
820 my $num = 0;
821 while ($text && $text =~ s/^(.*?)(\n|$)//) {
822 my $line = $1;
823
824 next if $line =~ /media=cdrom/;
825 next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;
826
827 #QEMU if backup is not set include in sync
828 next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
829
830 #LXC if backup is not set do no in sync
831 next if $vm_type eq 'lxc' && ($line =~ m/^mp\d:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);
832
833 my $disk = undef;
834 my $stor = undef;
835 if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
836 my @parameter = split(/,/,$1);
837
838 foreach my $opt (@parameter) {
839 if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
840 $disk = $2;
841 $stor = $1;
842 last;
843 }
844 }
845 }
846 if (!defined($disk) || !defined($stor)) {
847 print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
848 next;
849 }
850
851 my $cmd = [];
852 push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
853 push @$cmd, 'pvesm', 'path', "$stor$disk";
854 my $path = run_cmd($cmd);
855
856 die "Get no path from pvesm path $stor$disk\n" if !$path;
857
858 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
859
860 my @array = split('/', $1);
861 $disks->{$num}->{pool} = shift(@array);
862 $disks->{$num}->{all} = $disks->{$num}->{pool};
863 if (0 < @array) {
864 $disks->{$num}->{path} = join('/', @array);
865 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
866 }
867 $disks->{$num}->{last_part} = $disk;
868 $disks->{$num}->{all} .= "\/$disk";
869
870 $num++;
871 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
872
873 $disks->{$num}->{pool} = $1;
874 $disks->{$num}->{all} = $disks->{$num}->{pool};
875
876 if ($2) {
877 $disks->{$num}->{path} = $3;
878 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
879 }
880
881 $disks->{$num}->{last_part} = $disk;
882 $disks->{$num}->{all} .= "\/$disk";
883
884 $num++;
885
886 } else {
887 die "ERROR: in path\n";
888 }
889 }
890
891 die "Vm include no disk on zfs.\n" if !$disks->{0};
892 return $disks;
893 }
894
895 sub snapshot_destroy {
896 my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;
897
898 my @zfscmd = ('zfs', 'destroy');
899 my $snapshot = "$source->{all}\@$snap";
900
901 eval {
902 if($source->{ip} && $method eq 'ssh'){
903 run_cmd(['ssh', "$source_user\@$source->{ip}", '--', @zfscmd, $snapshot]);
904 } else {
905 run_cmd([@zfscmd, $snapshot]);
906 }
907 };
908 if (my $erro = $@) {
909 warn "WARN: $erro";
910 }
911 if ($dest) {
912 my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
913
914 my $path = "$dest->{all}";
915 $path .= "/$source->{last_part}" if $source->{last_part};
916
917 eval {
918 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
919 };
920 if (my $erro = $@) {
921 warn "WARN: $erro";
922 }
923 }
924 }
925
926 sub snapshot_exist {
927 my ($source , $dest, $method, $dest_user) = @_;
928
929 my $cmd = [];
930 push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
931 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
932
933 my $path = $dest->{all};
934 $path .= "/$source->{last_part}" if $source->{last_part};
935 $path .= "\@$source->{old_snap}";
936
937 push @$cmd, $path;
938
939
940 my $text = "";
941 eval {$text =run_cmd($cmd);};
942 if (my $erro =$@) {
943 warn "WARN: $erro";
944 return undef;
945 }
946
947 while ($text && $text =~ s/^(.*?)(\n|$)//) {
948 my $line =$1;
949 return 1 if $line =~ m/^.*$source->{old_snap}$/;
950 }
951 }
952
953 sub send_image {
954 my ($source, $dest, $param) = @_;
955
956 my $cmd = [];
957
958 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
959 push @$cmd, 'zfs', 'send';
960 push @$cmd, '-p', if $param->{properties};
961 push @$cmd, '-v' if $param->{verbose};
962
963 if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user})) {
964 push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
965 }
966 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
967
968 if ($param->{limit}){
969 my $bwl = $param->{limit}*1024;
970 push @$cmd, \'|', 'cstream', '-t', $bwl;
971 }
972 my $target = "$dest->{all}";
973 $target .= "/$source->{last_part}" if $source->{last_part};
974 $target =~ s!/+!/!g;
975
976 push @$cmd, \'|';
977 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
978 push @$cmd, 'zfs', 'recv', '-F', '--';
979 push @$cmd, "$target";
980
981 eval {
982 run_cmd($cmd)
983 };
984
985 if (my $erro = $@) {
986 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
987 die $erro;
988 };
989 }
990
991
992 sub send_config{
993 my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;
994
995 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
996 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
997
998 my $config_dir = $dest_config_path // $CONFIG_PATH;
999 $config_dir .= "/$dest->{last_part}" if $dest->{last_part};
1000
1001 $dest_target_new = $config_dir.'/'.$dest_target_new;
1002
1003 if ($method eq 'ssh'){
1004 if ($dest->{ip} && $source->{ip}) {
1005 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
1006 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
1007 } elsif ($dest->{ip}) {
1008 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
1009 run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
1010 } elsif ($source->{ip}) {
1011 run_cmd(['mkdir', '-p', '--', $config_dir]);
1012 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]);
1013 }
1014
1015 if ($source->{destroy}){
1016 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
1017 if($dest->{ip}){
1018 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
1019 } else {
1020 run_cmd(['rm', '-f', '--', $dest_target_old]);
1021 }
1022 }
1023 } elsif ($method eq 'local') {
1024 run_cmd(['mkdir', '-p', '--', $config_dir]);
1025 run_cmd(['cp', $source_target, $dest_target_new]);
1026 }
1027 }
1028
1029 sub get_date {
1030 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
1031 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
1032
1033 return $datestamp;
1034 }
1035
1036 sub status {
1037 my $cfg = read_cron();
1038
1039 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
1040
1041 my $states = read_state();
1042
1043 foreach my $source (sort keys%{$cfg}) {
1044 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
1045 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
1046 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
1047 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
1048 }
1049 }
1050
1051 return $status_list;
1052 }
1053
1054 sub enable_job {
1055 my ($param) = @_;
1056
1057 locked("$CONFIG_PATH/cron_and_state.lock", sub {
1058 my $job = get_job($param);
1059 $job->{state} = "ok";
1060 update_state($job);
1061 update_cron($job);
1062 });
1063 }
1064
1065 sub disable_job {
1066 my ($param) = @_;
1067
1068 locked("$CONFIG_PATH/cron_and_state.lock", sub {
1069 my $job = get_job($param);
1070 $job->{state} = "stopped";
1071 update_state($job);
1072 update_cron($job);
1073 });
1074 }
1075
1076 my $cmd_help = {
1077 destroy => qq{
1078 $PROGNAME destroy -source <string> [OPTIONS]
1079
1080 remove a sync Job from the scheduler
1081
1082 -name string
1083
1084 name of the sync job, if not set it is default
1085
1086 -source string
1087
1088 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1089 },
1090 create => qq{
1091 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1092
1093 Create a sync Job
1094
1095 -dest string
1096
1097 the destination target is like [IP]:<Pool>[/Path]
1098
1099 -dest-user string
1100
1101 name of the user on the destination target, root by default
1102
1103 -limit integer
1104
1105 max sync speed in kBytes/s, default unlimited
1106
1107 -maxsnap string
1108
1109 how much snapshots will be kept before get erased, default 1
1110
1111 -name string
1112
1113 name of the sync job, if not set it is default
1114
1115 -skip boolean
1116
1117 if this flag is set it will skip the first sync
1118
1119 -source string
1120
1121 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1122
1123 -source-user string
1124
1125 name of the user on the source target, root by default
1126
1127 -properties boolean
1128
1129 Include the dataset's properties in the stream.
1130
1131 -dest-config-path string
1132
1133 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1134 },
1135 sync => qq{
1136 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1137
1138 will sync one time
1139
1140 -dest string
1141
1142 the destination target is like [IP:]<Pool>[/Path]
1143
1144 -dest-user string
1145
1146 name of the user on the destination target, root by default
1147
1148 -limit integer
1149
1150 max sync speed in kBytes/s, default unlimited
1151
1152 -maxsnap integer
1153
1154 how much snapshots will be kept before get erased, default 1
1155
1156 -name string
1157
1158 name of the sync job, if not set it is default.
1159 It is only necessary if scheduler allready contains this source.
1160
1161 -source string
1162
1163 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1164
1165 -source-user string
1166
1167 name of the user on the source target, root by default
1168
1169 -verbose boolean
1170
1171 print out the sync progress.
1172
1173 -properties boolean
1174
1175 Include the dataset's properties in the stream.
1176
1177 -dest-config-path string
1178
1179 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1180 },
1181 list => qq{
1182 $PROGNAME list
1183
1184 Get a List of all scheduled Sync Jobs
1185 },
1186 status => qq{
1187 $PROGNAME status
1188
1189 Get the status of all scheduled Sync Jobs
1190 },
1191 help => qq{
1192 $PROGNAME help <cmd> [OPTIONS]
1193
1194 Get help about specified command.
1195
1196 <cmd> string
1197
1198 Command name
1199
1200 -verbose boolean
1201
1202 Verbose output format.
1203 },
1204 enable => qq{
1205 $PROGNAME enable -source <string> [OPTIONS]
1206
1207 enable a syncjob and reset error
1208
1209 -name string
1210
1211 name of the sync job, if not set it is default
1212
1213 -source string
1214
1215 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1216 },
1217 disable => qq{
1218 $PROGNAME disable -source <string> [OPTIONS]
1219
1220 pause a sync job
1221
1222 -name string
1223
1224 name of the sync job, if not set it is default
1225
1226 -source string
1227
1228 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1229 },
1230 printpod => 'internal command',
1231
1232 };
1233
1234 if (!$command) {
1235 usage(); die "\n";
1236 } elsif (!$cmd_help->{$command}) {
1237 print "ERROR: unknown command '$command'";
1238 usage(1); die "\n";
1239 }
1240
1241 my @arg = @ARGV;
1242 my $param = parse_argv(@arg);
1243
1244 sub check_params {
1245 for (@_) {
1246 die "$cmd_help->{$command}\n" if !$param->{$_};
1247 }
1248 }
1249
1250 if ($command eq 'destroy') {
1251 check_params(qw(source));
1252
1253 check_target($param->{source});
1254 destroy_job($param);
1255
1256 } elsif ($command eq 'sync') {
1257 check_params(qw(source dest));
1258
1259 check_target($param->{source});
1260 check_target($param->{dest});
1261 sync($param);
1262
1263 } elsif ($command eq 'create') {
1264 check_params(qw(source dest));
1265
1266 check_target($param->{source});
1267 check_target($param->{dest});
1268 init($param);
1269
1270 } elsif ($command eq 'status') {
1271 print status();
1272
1273 } elsif ($command eq 'list') {
1274 print list();
1275
1276 } elsif ($command eq 'help') {
1277 my $help_command = $ARGV[1];
1278
1279 if ($help_command && $cmd_help->{$help_command}) {
1280 die "$cmd_help->{$help_command}\n";
1281
1282 }
1283 if ($param->{verbose}) {
1284 exec("man $PROGNAME");
1285
1286 } else {
1287 usage(1);
1288
1289 }
1290
1291 } elsif ($command eq 'enable') {
1292 check_params(qw(source));
1293
1294 check_target($param->{source});
1295 enable_job($param);
1296
1297 } elsif ($command eq 'disable') {
1298 check_params(qw(source));
1299
1300 check_target($param->{source});
1301 disable_job($param);
1302
1303 } elsif ($command eq 'printpod') {
1304 print_pod();
1305 }
1306
1307 sub usage {
1308 my ($help) = @_;
1309
1310 print("ERROR:\tno command specified\n") if !$help;
1311 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1312 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1313 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1314 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1315 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1316 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1317 print("\t$PROGNAME list\n");
1318 print("\t$PROGNAME status\n");
1319 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1320 }
1321
1322 sub check_target {
1323 my ($target) = @_;
1324 parse_target($target);
1325 }
1326
1327 sub print_pod {
1328
1329 my $synopsis = join("\n", sort values %$cmd_help);
1330
1331 print <<EOF;
1332 =head1 NAME
1333
1334 pve-zsync - PVE ZFS Replication Manager
1335
1336 =head1 SYNOPSIS
1337
1338 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1339
1340 $synopsis
1341
1342 =head1 DESCRIPTION
1343
1344 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1345 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1346 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1347 To config cron see man crontab.
1348
1349 =head2 PVE ZFS Storage sync Tool
1350
1351 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1352
1353 =head1 EXAMPLES
1354
1355 add sync job from local VM to remote ZFS Server
1356 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1357
1358 =head1 IMPORTANT FILES
1359
1360 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1361
1362 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1363
1364 =head1 COPYRIGHT AND DISCLAIMER
1365
1366 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1367
1368 This program is free software: you can redistribute it and/or modify it
1369 under the terms of the GNU Affero General Public License as published
1370 by the Free Software Foundation, either version 3 of the License, or
1371 (at your option) any later version.
1372
1373 This program is distributed in the hope that it will be useful, but
1374 WITHOUT ANY WARRANTY; without even the implied warranty of
1375 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1376 Affero General Public License for more details.
1377
1378 You should have received a copy of the GNU Affero General Public
1379 License along with this program. If not, see
1380 <http://www.gnu.org/licenses/>.
1381
1382 EOF
1383 }