]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
Check whether job has been disabled while waiting/syncing
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
# Program identity and the on-disk locations derived from it.
my $PROGNAME = "pve-zsync";
my $CONFIG_PATH = "/var/lib/" . $PROGNAME;      # state, locks and synced configs
my $STATE = $CONFIG_PATH . "/sync_state";       # JSON file holding per-job state
my $CRONJOBS = "/etc/cron.d/" . $PROGNAME;      # one cron line per configured job
my $PATH = "/usr/sbin";
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = $PVE_DIR . "/qemu-server";
my $LXC_CONF = $PVE_DIR . "/lxc";
my $PROG_PATH = $PATH . "/" . $PROGNAME;
my $INTERVAL = 15;                              # minutes, used for newly created cron lines
my $DEBUG;

# Decide at compile time whether debugging is on, so Dumper is only
# loaded (and imported) when it is actually going to be used.
BEGIN {
    my $default = 0; # change default here. not above on declaration!
    $DEBUG = $default || $ENV{ZSYNC_DEBUG};
    if ($DEBUG) {
        require Data::Dumper;
        Data::Dumper->import();
    }
}
33
# Building blocks for matching addresses (kept as plain strings so they can
# be interpolated into the larger patterns below).
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# One alternative per possible position of the '::' abbreviation.
my $IPV6RE = "(?:" . join('|',
    "(?:(?:(?:$IPV6H16:){6})$IPV6LS32)",
    "(?:(?:::(?:$IPV6H16:){5})$IPV6LS32)",
    "(?:(?:(?:$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::)$IPV6LS32)",
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::)$IPV6H16)",
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::))",
) . ")";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# capture 1: host (still bracketed if given so), capture 2: VMID or dataset
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
my $command = $ARGV[0];

# Everything except the purely informational commands shells out to these
# tools, so fail early if one of them is missing.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin($_) for qw(cstream zfs ssh scp);
}

# Turn termination signals into an exception so that the surrounding
# eval/locked blocks can record the failure and release their locks.
# SIGKILL is intentionally not listed: it can never be trapped.
foreach my $signame (qw(TERM QUIT PIPE HUP INT)) {
    $SIG{$signame} = sub {
        my ($received) = @_;
        # report the signal itself; $! holds an unrelated errno at this point
        die "Signaled, aborting sync: SIG$received\n";
    };
}
68
# Search $ENV{PATH} for an executable called $bin.
#
# Returns the full path of the first match and dies with
# "unable to find command '$bin'" if there is none.
sub check_bin {
    my ($bin) = @_;

    # an unset PATH simply means "nothing found" - do not warn about it
    foreach my $dir (split(/:/, $ENV{PATH} // '')) {
        my $fn = "$dir/$bin";
        # -x alone is also true for searchable directories, so require a
        # regular file first ('_' reuses the stat result of the -f test)
        return $fn if -f $fn && -x _;
    }

    die "unable to find command '$bin'\n";
}
81
# Shorten $path for tabular output so that it is at most $maxlen characters
# wide. Runs of '/' are collapsed first; if the result is still too long,
# parts are replaced by '..', preferring to keep the last path component.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s@/+@/@g;
    return $path if length($path) <= $maxlen;

    # negative offset selecting what still fits after a leading '..'
    my $keep = -$maxlen + 2;

    # a single long component: keep its end
    return '..' . substr($path, $keep) if $path !~ m@/@;

    # split off the last component
    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;
    my $tail_len = length($tail);

    return "../$tail" if $tail_len + 3 == $maxlen;
    return '..' . substr($tail, $keep) if $tail_len + 2 >= $maxlen;

    # split off the first '/component'
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $remaining = $maxlen - (length($head) + $tail_len) - 4; # -4 for "/../"

    if ($remaining < 0) {
        # head and tail alone are too long already: truncate the head
        return substr($head, 0, $maxlen - $tail_len - 3) . "../$tail"; # -3 for "../"
    }

    # squeeze what is left of the middle down to $remaining + '..'
    my $cut_from = $remaining / 2;
    my $cut_len = length($path) - $remaining;
    substr($path, $cut_from, $cut_len, '..');

    return "$head/$path/$tail";
}
111
# Run $code while holding an exclusive flock on $lock_fn.
#
# The lock file is created (or truncated) if necessary. Returns whatever
# $code returns (evaluated in scalar context). If $code dies, the lock is
# released and the handle closed before the error is re-thrown.
sub locked {
    my ($lock_fn, $code) = @_;

    my $lock_fh = IO::File->new("> $lock_fn");
    # without this check a failed open only showed up as a confusing flock error
    die "Couldn't open lock file '$lock_fn' - $!\n" if !$lock_fh;

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    close($lock_fh);

    die "$err" if $err;

    return $res;
}
127
# Return the whole $status structure if it holds a true 'status' value for
# the job identified by $source->{all} and $name, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    return $status->{$source->{all}}->{$name}->{status} ? $status : undef;
}
137
# Check whether the dataset $target->{all} exists by running 'zfs list' on
# it - via ssh as $user when the target has an ip, locally otherwise.
# Returns 1 if the command succeeded, 0 if it failed.
sub check_pool_exists {
    my ($target, $user) = @_;

    my @zfs_list = ('zfs', 'list', '-H', '--', $target->{all});
    unshift @zfs_list, 'ssh', "$user\@$target->{ip}", '--' if $target->{ip};

    eval { run_cmd(\@zfs_list) };

    return $@ ? 0 : 1;
}
156
# Split a '[host:]<VMID>' or '[host:]<pool>[/path]' string into a hash with
# the keys
#   all       - everything after the optional 'host:'
#   ip        - host part without surrounding brackets (only if given)
#   vmid      - set instead of 'pool' when the first component is numeric
#   pool      - first path component
#   last_part - last path component (only if there is more than one)
#   path      - remaining components in between, joined by '/'
# Dies with a usage hint if $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $spec) = ($1, $2);

    my $target = { all => $spec };
    if ($host) {
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $spec);

    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # for targets with a host one more component is dropped from the path
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}
192
# Read the cron file and return the job configuration parsed from it by
# encode_cron(). On first use the (empty) cron file is created and undef is
# returned. Dies if the file cannot be created or opened.
sub read_cron {

    if (-e $CRONJOBS) {
        my $fh = IO::File->new("< $CRONJOBS");
        die "Could not open file $CRONJOBS: $!\n" if !$fh;

        my @text = <$fh>;
        close($fh);

        return encode_cron(@text);
    }

    # first use: just initialise the file
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}
212
# Parse command line style arguments into a parameter hash.
#
# Every known parameter is present as a key (undef if not given), with
# defaults applied for name, maxsnap, method, source_user and dest_user.
# Dies with "can't parse options" if Getopt::Long rejects the input.
sub parse_argv {
    my (@arg) = @_;

    # parameter key => Getopt::Long type suffix; the option name is the key
    # with '_' replaced by '-'
    my @known = (
        [dest => '=s'],
        [source => '=s'],
        [verbose => ''],
        [limit => '=i'],
        [maxsnap => '=i'],
        [name => '=s'],
        [skip => ''],
        [method => '=s'],
        [source_user => '=s'],
        [dest_user => '=s'],
        [properties => ''],
        [dest_config_path => '=s'],
    );

    my $param = { map { $_->[0] => undef } @known };

    my @spec;
    foreach my $entry (@known) {
        my ($key, $type) = @$entry;
        (my $opt = $key) =~ tr/_/-/;
        push @spec, "$opt$type" => \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @spec);

    die "can't parse options\n" if $ret == 0;

    my %default = (
        name => "default",
        maxsnap => 1,
        method => "ssh",
        source_user => "root",
        dest_user => "root",
    );
    $param->{$_} //= $default{$_} for keys %default;

    return $param;
}
257
# Copy the persisted state of a job (state, lsync, vm_type and the
# consecutive snap0, snap1, ... entries) from the state file into $job.
# Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    $job->{$_} = $state->{$_} for qw(state lsync vm_type);

    # snapshots are numbered without gaps; stop at the first missing one
    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}
274
# Turn the lines of the cron file into a configuration hash of the form
# $cfg->{<source>}->{<name>} = { remaining parameters }.
# Lines that do not carry both a --source and a --dest are ignored.
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    foreach my $line (@text) {
        # parsing ends at the first false (empty or '0') line
        last if !$line;

        my $param = parse_argv(split('\s', $line));
        next if !($param->{source} && $param->{dest});

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}
295
# Build a job hash from parsed command line parameters.
#
# 'method' is derived from the targets: "local" if neither source nor
# destination has a host part, "ssh" otherwise. dest, maxsnap and
# dest_config_path are only set when given; properties is normalised to a
# boolean. Dies (via parse_target) on an invalid source or destination.
sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    # no 'my $x = ... if COND' here: the value of such a variable is
    # undefined behaviour when the condition is false
    my $dest = $param->{dest} ? parse_target($param->{dest}) : {};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};
    $job->{source_user} = $param->{source_user};
    $job->{dest_user} = $param->{dest_user};
    $job->{properties} = !!$param->{properties};
    $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};

    return $job;
}
318
# Load the job states from the JSON state file (only its first line is
# read). On first use the config directory and a state file containing '{}'
# are created and undef is returned. Dies if the file cannot be created or
# opened.
sub read_state {

    if (-e $STATE) {
        my $fh = IO::File->new("< $STATE");
        die "Could not open file $STATE: $!\n" if !$fh;

        my $text = <$fh>;
        close($fh);

        return decode_json($text);
    }

    # first use: initialise directory and file
    make_path $CONFIG_PATH;
    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}
340
# Persist the state of $job in the state file.
#
# For $job->{state} eq "del" the job's entry (and its source entry, once
# empty) is removed; otherwise state, lsync, vm_type and the consecutive
# snapN entries are stored. The file is written to "$STATE.new" and then
# renamed over the old one. Returns the complete states hash.
# Dies if the new file cannot be opened, written, closed or renamed.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file just means "no state yet"
    my $text;
    if (my $in_fh = IO::File->new("< $STATE")) {
        $text = <$in_fh>;
        close($in_fh);
    }

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
        $states = decode_json($text);
        $state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"} ; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{$job->{source}}->{$job->{name}} = $state;
    } else {

        delete $states->{$job->{source}}->{$job->{name}};
        delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    # like update_cron: never replace the old file with one that was not
    # written completely (buffered write errors only show up on close)
    print {$out_fh} encode_json($states)
        or die "can't write to ${STATE}.new: $!\n";
    close($out_fh)
        or die "can't write to ${STATE}.new: $!\n";

    rename("$STATE.new", $STATE)
        or die "can't move ${STATE}.new: $!\n";

    return $states;
}
389
# Rewrite the cron file for $job.
#
# The line belonging to the job (matched by its --source and --name) is
# replaced by a freshly formatted one, or dropped if $job->{state} is "del".
# If no such line exists, a new one is appended. A SHELL/PATH header is
# prepended if none is found in the first three lines. The result is written
# to "$CRONJOBS.new" and renamed over the old file.
# Dies if the cron file cannot be read or the new one cannot be written.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;

    my @lines = <$fh>;
    close($fh);

    # Source and name must be matched literally: sources like
    # '[fe80::1]:100' or '10.0.0.1:tank' contain regex metacharacters and
    # would otherwise match the wrong line or none at all.
    my $source_re = quotemeta($job->{source});
    my $name_re = quotemeta($job->{name});

    while (my $line = shift(@lines)) {
        chomp($line);
        if ($line =~ m/source $source_re .*name $name_re /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
}
436
# Format the cron line for $job.
#
# If $line (the job's existing cron line) is given, its schedule is kept;
# otherwise - or if no schedule can be recognised in it - the default of
# every $INTERVAL minutes is used. Stopped jobs are written commented out.
# Returns the line including the trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }

    # Only use the capture if the match succeeded. Besides the classic five
    # time fields, cron's '@hourly'-style nicknames are accepted too, so that
    # hand-edited schedules survive a rewrite.
    my $schedule = "*/$INTERVAL * * * *";
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+|\@\S+)\s+root/) {
        $schedule = $1;
    }
    $text .= $schedule;

    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}
464
# Return a table (as one string) of all jobs found in the cron file with
# their state, time of last sync, guest type and connection method.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys %{$cfg}) {
        foreach my $name (sort keys %{$cfg->{$source}}) {
            my $state = $states->{$source}->{$name};

            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}
485
# Find out what kind of guest $target->{vmid} is by looking for its config
# file - with '/bin/ls' over ssh (as $user) if the target has an ip, with a
# local file test otherwise.
# Returns "qemu", "lxc", or undef if there is no vmid or no config file.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    # checked in this order
    my @candidates = (
        ["qemu", "$QEMU_CONF/$conf_fn"],
        ["lxc", "$LXC_CONF/$conf_fn"],
    );

    my @remote_ls = $target->{ip}
        ? ('ssh', "$user\@$target->{ip}", '--', '/bin/ls')
        : ();

    foreach my $candidate (@candidates) {
        my ($type, $conf) = @$candidate;

        my $found = $target->{ip}
            ? eval { run_cmd([@remote_ls, $conf]) }
            : -f $conf;

        return $type if $found;
    }

    return undef;
}
504
# Create a new sync job from $param: distribute the ssh key to remote
# targets, verify that source and destination exist, register the job in
# the cron and state files, and (unless --skip was given) run a first sync.
# If that first sync fails the job is removed again and the error printed.
sub init {
    my ($param) = @_;

    my $register_job = sub {
        my $cfg = read_cron();

        my $job = param_to_job($param);

        $job->{state} = "ok";
        $job->{lsync} = 0;

        my $source = parse_target($param->{source});
        my $dest = parse_target($param->{dest});

        # destination first, then source
        foreach my $peer ([$dest, $param->{dest_user}], [$source, $param->{source_user}]) {
            my ($target, $user) = @$peer;
            my $ip = $target->{ip} or next;
            run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$user\@$ip"]);
        }

        if (!check_pool_exists($dest, $param->{dest_user})) {
            die "Pool $dest->{all} does not exists\n";
        }

        if (!defined($source->{vmid}) && !check_pool_exists($source, $param->{source_user})) {
            die "Pool $source->{all} does not exists\n";
        }

        my $vm_type = vm_exists($source, $param->{source_user});
        $job->{vm_type} = $vm_type;
        $source->{vm_type} = $vm_type;

        die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

        die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

        #check if vm has zfs disks if not die;
        get_disks($source, $param->{source_user}) if $source->{vmid};

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $register_job); #cron and state lock

    eval {
        sync($param) if !$param->{skip};
    };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}
556
# Look up the job given by $param->{source} and $param->{name} in the cron
# file and return it, completed with its name, source and persisted state.
# Dies if there is no such job.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = $cfg->{$param->{source}}->{$param->{name}};
    if (!$job) {
        die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
    }

    @{$job}{qw(name source)} = @{$param}{qw(name source)};

    return add_state_to_job($job);
}
572
# Remove the job described by $param from the cron file and the state file.
# Dies (via get_job) if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
        my $job = get_job($param);
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}
584
# Run one sync from $param->{source} to $param->{dest}.
#
# If a matching job is configured its state is tracked along the way:
# "waiting" until the global sync lock is obtained, "syncing" while data is
# transferred, then "ok" (a job disabled meanwhile stays "stopped") or
# "error". Syncs that are not configured as jobs work the same way, just
# without state tracking.
# Dies if the job is already waiting/syncing, has been disabled, or if any
# step of the transfer fails.
sub sync {
    my ($param) = @_;

    my $job;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
        eval { $job = get_job($param) };
        return if !$job;

        my $state = $job->{state};
        if (defined($state) && ($state eq "syncing" || $state eq "waiting")) {
            die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
        }

        $job->{state} = "waiting";
        update_state($job);
    });

    locked("$CONFIG_PATH/sync.lock", sub {

        my $date = get_date();

        my $dest;
        my $source;
        my $vm_type;

        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
            eval { $job = get_job($param); };

            if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                die "Job --source $param->{source} --name $param->{name} has been disabled\n";
            }

            $dest = parse_target($param->{dest});
            $source = parse_target($param->{source});

            $vm_type = vm_exists($source, $param->{source_user});
            $source->{vm_type} = $vm_type;

            return if !$job;

            $job->{state} = "syncing";
            $job->{vm_type} = $vm_type if !$job->{vm_type};
            update_state($job);
        }); #cron and state lock

        # sync the single dataset currently described by $source
        my $sync_path = sub {
            my ($source, $dest, $job, $param, $date) = @_;

            ($source->{old_snap}, $source->{last_snap}) = snapshot_get(
                $source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});

            snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

            send_image($source, $dest, $param);

            if ($source->{destroy} && $source->{old_snap}) {
                snapshot_destroy(
                    $source, $dest, $param->{method}, $source->{old_snap},
                    $param->{source_user}, $param->{dest_user});
            }
        };

        eval {
            if ($source->{vmid}) {
                die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
                my $disks = get_disks($source, $param->{source_user});

                # sync every disk of the guest, one after the other
                foreach my $disk (sort keys %{$disks}) {
                    my $info = $disks->{$disk};

                    $source->{all} = $info->{all};
                    $source->{pool} = $info->{pool};
                    $source->{path} = $info->{path} if $info->{path};
                    $source->{last_part} = $info->{last_part};
                    $sync_path->($source, $dest, $job, $param, $date);
                }

                my $config_method =
                    ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) ? 'ssh' : 'local';
                send_config(
                    $source, $dest, $config_method,
                    $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
            } else {
                $sync_path->($source, $dest, $job, $param, $date);
            }
        };
        if (my $err = $@) {
            locked("$CONFIG_PATH/cron_and_state.lock", sub {
                eval { $job = get_job($param); };
                return if !$job;

                $job->{state} = "error";
                update_state($job);
            });
            print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
            die "$err\n";
        }

        locked("$CONFIG_PATH/cron_and_state.lock", sub {
            eval { $job = get_job($param); };
            return if !$job;

            # a job that got disabled during the sync stays disabled
            my $stopped = defined($job->{state}) && $job->{state} eq "stopped";
            $job->{state} = "ok" if !$stopped;

            $job->{lsync} = $date;
            update_state($job);
        });
    }); #sync lock
}
693
# List the snapshots of $source->{all} (newest first) and pick out those
# created by the job called $name.
#
# Returns ($old_snap, $last_snap): $last_snap is the newest matching
# snapshot, $old_snap the $max_snap-th newest one (or the oldest, if there
# are fewer). Once $max_snap snapshots have been found $source->{destroy}
# is set to 1. Returns undef if no matching snapshot exists.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $source_user) = @_;

    my @cmd = ('zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation', $source->{all});
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};

    my $raw = run_cmd(\@cmd);

    my $found = 0;
    my $last_snap;
    my $old_snap;

    foreach my $line (split(/\n/, $raw)) {
        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

        $old_snap = $1;
        $last_snap = $old_snap if !$last_snap;

        $found++;
        if ($found == $max_snap) {
            $source->{destroy} = 1;
            last;
        }
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}
726
# Create the snapshot "rep_<name>_<date>" on $source->{all} and remember
# its name in $source->{new_snap}. If snapshotting fails, the snapshot is
# cleaned up via snapshot_destroy() and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_${date}";
    $source->{new_snap} = $snap_name;

    my @cmd = ('zfs', 'snapshot', "$source->{all}\@$snap_name");
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};

    eval { run_cmd(\@cmd) };

    if (my $err = $@) {
        snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
        die "$err\n";
    }
}
748
# Write a complete cron file from a configuration hash of the form
# $cfg->{<source>}->{<name>}, one line for every entry whose status is
# 'ok'. Dies if the cron file cannot be opened or written.
sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys %{$cfg}) {
        my $jobs = $cfg->{$source};

        foreach my $sync_name (sort keys %{$jobs}) {
            my $job = $jobs->{$sync_name};
            next if $job->{status} ne 'ok';

            # source: [ip:]vmid or [ip:]pool[path]
            my $src = $job->{source_ip} ? "$job->{source_ip}:" : "";
            if ($job->{vmid}) {
                $src .= "$job->{vmid} ";
            } else {
                $src .= "$job->{source_pool}";
                $src .= "$job->{source_path}" if $job->{source_path};
            }

            # destination: [ip:]pool[path]
            my $dst = $job->{dest_ip} ? "$job->{dest_ip}:" : "";
            $dst .= "$job->{dest_pool}";
            $dst .= "$job->{dest_path}" if $job->{dest_path};

            $text .= "$PROG_PATH sync -source $src -dest $dst -name $sync_name ";
            $text .= " -limit $job->{limit}" if $job->{limit};
            $text .= " -maxsnap $job->{maxsnap}" if $job->{maxsnap};
            $text .= "\n";
        }
    }

    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}
784
# Fetch the configuration of the guest $target->{vmid} ('qm config' for
# qemu, 'pct config' for lxc; over ssh as $user if the target has an ip)
# and return the disks parse_disks() finds in it.
# Dies with "VM Type unknown" for any other $target->{vm_type}.
sub get_disks {
    my ($target, $user) = @_;

    my $tool;
    if ($target->{vm_type} eq 'qemu') {
        $tool = 'qm';
    } elsif ($target->{vm_type} eq 'lxc') {
        $tool = 'pct';
    } else {
        die "VM Type unknown\n";
    }

    my @cmd = ($tool, 'config', $target->{vmid});
    unshift @cmd, 'ssh', "$user\@$target->{ip}", '--' if $target->{ip};

    my $config = run_cmd(\@cmd);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $user);
}
805
# Run a command through the shell and return its combined stdout/stderr
# output without the trailing newline.
#
# $cmd is either a ready-made command string or an array reference. Array
# elements are shell-quoted, except for scalar references (e.g. \'|'),
# which are inserted verbatim so that pipelines can be built.
# Dies with the command and its output if the exit status is not 0.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = qx{$cmd 2>&1};

    if ($? != 0) {
        die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output";
    }

    chomp($output);

    if ($DEBUG) {
        print Dumper $output;
        print "END CMD\n";
    }

    return $output;
}
822
# Extract the ZFS backed disks from a guest configuration.
#
# $text is the output of 'qm config'/'pct config', $vm_type is 'qemu' or
# 'lxc'. Every volume is resolved with 'pvesm path' (over ssh as $user if
# $ip is set). Returns a hash reference, keyed 0, 1, 2, ..., whose values
# have the keys pool, path (optional), last_part and all.
# Dies if a volume cannot be resolved, if a resolved path does not look
# like a ZFS volume, or if no disk is left at the end.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

        # QEMU: disks are synced unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: mount points are only synced if backup is explicitly enabled.
        # \d+ so that this also applies to mp10 and above.
        next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
            my @parameter = split(/,/,$1);

            # the volume is the first option of the form '<storage>:<name>'
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor$disk\n" if !$path;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            # /dev/zvol/<pool>[/<path>]/<disk>
            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "ERROR: in path\n";
        }
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}
902
# Destroy the snapshot $snap of $source->{all} (over ssh only if the source
# has an ip and $method is 'ssh') and, if $dest is given, also the
# corresponding snapshot on the destination. Failures only produce a
# warning.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @zfscmd = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
        ? ('ssh', "$source_user\@$source->{ip}", '--')
        : ();

    eval { run_cmd([@source_ssh, @zfscmd, "$source->{all}\@$snap"]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }

    return if !$dest;

    my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

    my $path = "$dest->{all}";
    $path .= "/$source->{last_part}" if $source->{last_part};

    eval { run_cmd([@dest_ssh, @zfscmd, "$path\@$snap"]) };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }
}
933
# Check whether the snapshot $source->{old_snap} exists on the destination
# dataset ($dest->{all}, extended by $source->{last_part} if set).
# Returns 1 if it does and undef otherwise; a failing 'zfs list' produces a
# warning and counts as "does not exist".
sub snapshot_exist {
    my ($source , $dest, $method, $dest_user) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';

    my $path = $dest->{all};
    $path .= "/$source->{last_part}" if $source->{last_part};
    $path .= "\@$source->{old_snap}";

    push @$cmd, $path;

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro =$@) {
        warn "WARN: $erro";
        return undef;
    }

    # The snapshot name contains the user-chosen job name, so it has to be
    # matched literally, not as a regular expression.
    my $snap_re = quotemeta($source->{old_snap});

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line =$1;
        return 1 if $line =~ m/^.*$snap_re$/;
    }

    return undef;
}
960
# Transfer the snapshot $source->{new_snap} to the destination with a
# 'zfs send | [cstream |] zfs recv' pipeline, wrapping either end in ssh if
# it has an ip. The stream is incremental (based on $source->{last_snap})
# if snapshot_exist() reports the old snapshot on the destination.
# If the transfer fails the new snapshot is destroyed on the source again
# and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @pipeline;

    # sending side
    if ($source->{ip}) {
        push @pipeline, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--';
    }
    push @pipeline, 'zfs', 'send';
    push @pipeline, '-p' if $param->{properties};
    push @pipeline, '-v' if $param->{verbose};

    my $incremental = $source->{last_snap}
        && snapshot_exist($source , $dest, $param->{method}, $param->{dest_user});
    push @pipeline, '-i', "$source->{all}\@$source->{last_snap}" if $incremental;

    push @pipeline, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit
    if ($param->{limit}) {
        push @pipeline, \'|', 'cstream', '-t', $param->{limit} * 1024;
    }

    # receiving side
    my $target = $source->{last_part} ? "$dest->{all}/$source->{last_part}" : "$dest->{all}";
    $target =~ s!/+!/!g;

    push @pipeline, \'|';
    if ($dest->{ip}) {
        push @pipeline, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--';
    }
    push @pipeline, 'zfs', 'recv', '-F', '--', "$target";

    eval { run_cmd(\@pipeline) };

    if (my $erro = $@) {
        snapshot_destroy(
            $source, undef, $param->{method}, $source->{new_snap},
            $param->{source_user}, $param->{dest_user});
        die $erro;
    }
}
998
999
# Copy the guest's configuration file next to the synced data, named
# "<vmid>.conf.<vm_type>.<new_snap>", into $dest_config_path (default
# $CONFIG_PATH), extended by $dest->{last_part} if set.
# With $method 'ssh' the copy is done with scp and, if $source->{destroy}
# is set, the copy belonging to the old snapshot is removed; with $method
# 'local' a plain cp is used.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_name = "$source->{vmid}.conf";
    my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$conf_name": "$LXC_CONF/$conf_name";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $dest_target_new = $config_dir.'/'."$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    if ($method eq 'ssh'){
        if ($dest->{ip} || $source->{ip}) {
            # the directory is always created where the copy ends up
            my @mkdir_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
            run_cmd([@mkdir_ssh, 'mkdir', '-p', '--', $config_dir]);

            my $from = $source->{ip} ? "$source_user\@[$source->{ip}]:$source_target" : $source_target;
            my $to = $dest->{ip} ? "$dest_user\@[$dest->{ip}]:$dest_target_new" : $dest_target_new;
            run_cmd(['scp', '--', $from, $to]);
        }

        if ($source->{destroy}){
            my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";

            my @rm_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
            run_cmd([@rm_ssh, 'rm', '-f', '--', $dest_target_old]);
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        run_cmd(['cp', $source_target, $dest_target_new]);
    }
}
1036
# Return the current local time as "YYYY-MM-DD_hh:mm:ss", the format used
# in snapshot names.
sub get_date {
    my ($sec, $min, $hour, $mday, $mon, $year) = (localtime(time))[0 .. 5];

    return sprintf("%04d-%02d-%02d_%02d:%02d:%02d", $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}
1043
# Return a table (as one string) with the state of every job found in the
# cron file.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys %{$cfg}) {
        foreach my $sync_name (sort keys %{$cfg->{$source}}) {
            $status_list .= sprintf(
                "%-25s%-25s%s\n",
                cut_target_width($source, 25),
                cut_target_width($sync_name, 25),
                $states->{$source}->{$sync_name}->{state},
            );
        }
    }

    return $status_list;
}
1061
# Set the job described by $param back to "ok" in the state file and
# rewrite its cron line accordingly. Dies (via get_job) if the job does not
# exist.
sub enable_job {
    my ($param) = @_;

    my $enable = sub {
        my $job = get_job($param);
        $job->{state} = "ok";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $enable);
}
1072
# Mark the job described by $param as "stopped" in the state file and
# rewrite its cron line accordingly. Dies (via get_job) if the job does not
# exist.
sub disable_job {
    my ($param) = @_;

    my $disable = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";

        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $disable);
}
1083
# Help text for every sub-command, keyed by command name.
#
# The hash has three jobs in this file:
#   * its keys define which commands are accepted at all (a command without
#     a true entry here is rejected as unknown),
#   * the entry of a command is printed when a required option is missing
#     or when "help <cmd>" is requested,
#   * the entries are joined to build the SYNOPSIS of the generated POD.
#
# Each text starts with the synopsis line at column 0, followed by indented
# option descriptions.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy -source <string> [OPTIONS]

        remove a sync Job from the scheduler

        -name string

                name of the sync job, if not set it is default

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    create => qq{
$PROGNAME create -dest <string> -source <string> [OPTIONS]

        Create a sync Job

        -dest string

                the destination target is like [IP:]<Pool>[/Path]

        -dest-user string

                name of the user on the destination target, root by default

        -limit integer

                max sync speed in kBytes/s, default unlimited

        -maxsnap integer

                how much snapshots will be kept before get erased, default 1

        -name string

                name of the sync job, if not set it is default

        -skip boolean

                if this flag is set it will skip the first sync

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

        -source-user string

                name of the user on the source target, root by default

        -properties boolean

                Include the dataset's properties in the stream.

        -dest-config-path string

                specify a custom config path on the destination target. default is /var/lib/pve-zsync
    },
    sync => qq{
$PROGNAME sync -dest <string> -source <string> [OPTIONS]

        will sync one time

        -dest string

                the destination target is like [IP:]<Pool>[/Path]

        -dest-user string

                name of the user on the destination target, root by default

        -limit integer

                max sync speed in kBytes/s, default unlimited

        -maxsnap integer

                how much snapshots will be kept before get erased, default 1

        -name string

                name of the sync job, if not set it is default.
                It is only necessary if scheduler already contains this source.

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

        -source-user string

                name of the user on the source target, root by default

        -verbose boolean

                print out the sync progress.

        -properties boolean

                Include the dataset's properties in the stream.

        -dest-config-path string

                specify a custom config path on the destination target. default is /var/lib/pve-zsync
    },
    list => qq{
$PROGNAME list

        Get a List of all scheduled Sync Jobs
    },
    status => qq{
$PROGNAME status

        Get the status of all scheduled Sync Jobs
    },
    help => qq{
$PROGNAME help [<cmd>] [OPTIONS]

        Get help about specified command.

        <cmd> string

                Command name

        -verbose boolean

                Verbose output format.
    },
    enable => qq{
$PROGNAME enable -source <string> [OPTIONS]

        enable a syncjob and reset error

        -name string

                name of the sync job, if not set it is default

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    disable => qq{
$PROGNAME disable -source <string> [OPTIONS]

        pause a sync job

        -name string

                name of the sync job, if not set it is default

        -source string

                the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
    },
    # not a user-facing command; the true placeholder value only makes
    # 'printpod' pass the known-command check
    printpod => 'internal command',

};
1241
# Validate the requested command before looking at any options.
#   * no command at all: print the usage including its "no command
#     specified" banner and abort,
#   * a command without an entry in $cmd_help: name the offending command,
#     print the plain usage (usage(1) suppresses the banner) and abort.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # terminate the message with a newline, otherwise the "USAGE:" line
    # printed next ends up glued onto the error message
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# Option parsing works on a copy of the command line; the 'help' command
# further down still reads $ARGV[1] directly.
my @arg = @ARGV;
my $param = parse_argv(@arg);
1251
# Ensure that every option named in the argument list was given.
#
# @_ - names of the options the current command requires.
#
# Dies with the help text of the current command as soon as one of the
# named options has no true value in the parsed parameters.
sub check_params {
    foreach my $required (@_) {
        next if $param->{$required};
        die "$cmd_help->{$command}\n";
    }
}
1257
# Command dispatch table: one handler per accepted command. Handlers that
# operate on a job first verify their required options and targets, then
# hand the parsed parameters to the worker sub.
my %command_handler = (
    destroy => sub {
        check_params(qw(source));

        check_target($param->{source});
        destroy_job($param);
    },
    sync => sub {
        check_params(qw(source dest));

        check_target($param->{source});
        check_target($param->{dest});
        sync($param);
    },
    create => sub {
        check_params(qw(source dest));

        check_target($param->{source});
        check_target($param->{dest});
        init($param);
    },
    status => sub {
        print status();
    },
    list => sub {
        print list();
    },
    help => sub {
        my $topic = $ARGV[1];

        # help for a known command is emitted through die
        die "$cmd_help->{$topic}\n" if $topic && $cmd_help->{$topic};

        if (!$param->{verbose}) {
            usage(1);
            return;
        }

        # verbose help: replace this process with the manual page viewer
        exec("man $PROGNAME");
    },
    enable => sub {
        check_params(qw(source));

        check_target($param->{source});
        enable_job($param);
    },
    disable => sub {
        check_params(qw(source));

        check_target($param->{source});
        disable_job($param);
    },
    printpod => sub {
        print_pod();
    },
);

if (my $handler = $command_handler{$command}) {
    $handler->();
}
1314
# Print the command overview to STDOUT.
#
# $help - when false, the overview is preceded by a "no command specified"
#         error line; pass a true value to print the overview only.
sub usage {
    my ($help) = @_;

    my @output;

    push @output, "ERROR:\tno command specified\n" unless $help;

    push @output,
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy -source <string> [OPTIONS]\n",
        "\t$PROGNAME disable -source <string> [OPTIONS]\n",
        "\t$PROGNAME enable -source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n";

    print(join('', @output));
}
1329
# Check a source/destination specification by running it through
# parse_target(); the result of that call is passed back unchanged.
# NOTE(review): presumably parse_target() dies on a malformed target, which
# is what makes this a check - confirm against its definition.
sub check_target {
    my $target = shift;

    return parse_target($target);
}
1334
# Print the POD source of the manual page to STDOUT.
#
# The SYNOPSIS section is assembled from the help texts in $cmd_help,
# sorted as strings. The 'printpod' entry is left out: it is only an
# internal placeholder ('internal command') and would otherwise show up as
# stray text in the generated manual page.
sub print_pod {

    my @documented = grep { $_ ne 'printpod' } keys %$cmd_help;
    my $synopsis = join("\n", sort map { $cmd_help->{$_} } @documented);

    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

$synopsis

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.

EOF
}