]> git.proxmox.com Git - pve-zsync.git/blob - pve-zsync
remove unused function write_cron
[pve-zsync.git] / pve-zsync
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use Fcntl qw(:flock SEEK_END);
7 use Getopt::Long qw(GetOptionsFromArray);
8 use File::Path qw(make_path);
9 use JSON;
10 use IO::File;
11 use String::ShellQuote 'shell_quote';
12
13 my $PROGNAME = "pve-zsync";
14 my $CONFIG_PATH = "/var/lib/${PROGNAME}";
15 my $STATE = "${CONFIG_PATH}/sync_state";
16 my $CRONJOBS = "/etc/cron.d/$PROGNAME";
17 my $PATH = "/usr/sbin";
18 my $PVE_DIR = "/etc/pve/local";
19 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
20 my $LXC_CONF = "${PVE_DIR}/lxc";
21 my $PROG_PATH = "$PATH/${PROGNAME}";
22 my $INTERVAL = 15;
23 my $DEBUG;
24
25 BEGIN {
26 $DEBUG = 0; # change default here. not above on declaration!
27 $DEBUG ||= $ENV{ZSYNC_DEBUG};
28 if ($DEBUG) {
29 require Data::Dumper;
30 Data::Dumper->import();
31 }
32 }
33
34 my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
35 my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
36 my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
37 my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";
38
39 my $IPV6RE = "(?:" .
40 "(?:(?:" . "(?:$IPV6H16:){6})$IPV6LS32)|" .
41 "(?:(?:" . "::(?:$IPV6H16:){5})$IPV6LS32)|" .
42 "(?:(?:(?:" . "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
43 "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
44 "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
45 "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
46 "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" . ")$IPV6LS32)|" .
47 "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" . ")$IPV6H16)|" .
48 "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" . ")))";
49
50 my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
51 my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
52 my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
53 # targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
54 my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;
55
56 my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;
57
58 my $command = $ARGV[0];
59
60 if (defined($command) && $command ne 'help' && $command ne 'printpod') {
61 check_bin ('cstream');
62 check_bin ('zfs');
63 check_bin ('ssh');
64 check_bin ('scp');
65 }
66
67 $SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
68 die "Signaled, aborting sync: $!\n";
69 };
70
71 sub check_bin {
72 my ($bin) = @_;
73
74 foreach my $p (split (/:/, $ENV{PATH})) {
75 my $fn = "$p/$bin";
76 if (-x $fn) {
77 return $fn;
78 }
79 }
80
81 die "unable to find command '$bin'\n";
82 }
83
84 sub cut_target_width {
85 my ($path, $maxlen) = @_;
86 $path =~ s@/+@/@g;
87
88 return $path if length($path) <= $maxlen;
89
90 return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;
91
92 $path =~ s@/([^/]+/?)$@@;
93 my $tail = $1;
94
95 if (length($tail)+3 == $maxlen) {
96 return "../$tail";
97 } elsif (length($tail)+2 >= $maxlen) {
98 return '..'.substr($tail, -$maxlen+2)
99 }
100
101 $path =~ s@(/[^/]+)(?:/|$)@@;
102 my $head = $1;
103 my $both = length($head) + length($tail);
104 my $remaining = $maxlen-$both-4; # -4 for "/../"
105
106 if ($remaining < 0) {
107 return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
108 }
109
110 substr($path, ($remaining/2), (length($path)-$remaining), '..');
111 return "$head/" . $path . "/$tail";
112 }
113
114 sub locked {
115 my ($lock_fn, $code) = @_;
116
117 my $lock_fh = IO::File->new("> $lock_fn");
118
119 flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
120 my $res = eval { $code->() };
121 my $err = $@;
122
123 flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
124 die "$err" if $err;
125
126 close($lock_fh);
127 return $res;
128 }
129
130 sub get_status {
131 my ($source, $name, $status) = @_;
132
133 if ($status->{$source->{all}}->{$name}->{status}) {
134 return $status;
135 }
136
137 return undef;
138 }
139
140 sub check_pool_exists {
141 my ($target, $user) = @_;
142
143 my $cmd = [];
144
145 if ($target->{ip}) {
146 push @$cmd, 'ssh', "$user\@$target->{ip}", '--';
147 }
148 push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
149 eval {
150 run_cmd($cmd);
151 };
152
153 if ($@) {
154 return 0;
155 }
156 return 1;
157 }
158
159 sub parse_target {
160 my ($text) = @_;
161
162 my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
163 my $target = {};
164
165 if ($text !~ $TARGETRE) {
166 die "$errstr\n";
167 }
168 $target->{all} = $2;
169 $target->{ip} = $1 if $1;
170 my @parts = split('/', $2);
171
172 $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};
173
174 my $pool = $target->{pool} = shift(@parts);
175 die "$errstr\n" if !$pool;
176
177 if ($pool =~ m/^\d+$/) {
178 $target->{vmid} = $pool;
179 delete $target->{pool};
180 }
181
182 return $target if (@parts == 0);
183 $target->{last_part} = pop(@parts);
184
185 if ($target->{ip}) {
186 pop(@parts);
187 }
188 if (@parts > 0) {
189 $target->{path} = join('/', @parts);
190 }
191
192 return $target;
193 }
194
195 sub read_cron {
196
197 #This is for the first use to init file;
198 if (!-e $CRONJOBS) {
199 my $new_fh = IO::File->new("> $CRONJOBS");
200 die "Could not create $CRONJOBS: $!\n" if !$new_fh;
201 close($new_fh);
202 return undef;
203 }
204
205 my $fh = IO::File->new("< $CRONJOBS");
206 die "Could not open file $CRONJOBS: $!\n" if !$fh;
207
208 my @text = <$fh>;
209
210 close($fh);
211
212 return encode_cron(@text);
213 }
214
215 sub parse_argv {
216 my (@arg) = @_;
217
218 my $param = {
219 dest => undef,
220 source => undef,
221 verbose => undef,
222 limit => undef,
223 maxsnap => undef,
224 name => undef,
225 skip => undef,
226 method => undef,
227 source_user => undef,
228 dest_user => undef,
229 properties => undef,
230 dest_config_path => undef,
231 };
232
233 my ($ret) = GetOptionsFromArray(
234 \@arg,
235 'dest=s' => \$param->{dest},
236 'source=s' => \$param->{source},
237 'verbose' => \$param->{verbose},
238 'limit=i' => \$param->{limit},
239 'maxsnap=i' => \$param->{maxsnap},
240 'name=s' => \$param->{name},
241 'skip' => \$param->{skip},
242 'method=s' => \$param->{method},
243 'source-user=s' => \$param->{source_user},
244 'dest-user=s' => \$param->{dest_user},
245 'properties' => \$param->{properties},
246 'dest-config-path=s' => \$param->{dest_config_path},
247 );
248
249 die "can't parse options\n" if $ret == 0;
250
251 $param->{name} //= "default";
252 $param->{maxsnap} //= 1;
253 $param->{method} //= "ssh";
254 $param->{source_user} //= "root";
255 $param->{dest_user} //= "root";
256
257 return $param;
258 }
259
260 sub add_state_to_job {
261 my ($job) = @_;
262
263 my $states = read_state();
264 my $state = $states->{$job->{source}}->{$job->{name}};
265
266 $job->{state} = $state->{state};
267 $job->{lsync} = $state->{lsync};
268 $job->{vm_type} = $state->{vm_type};
269
270 for (my $i = 0; $state->{"snap$i"}; $i++) {
271 $job->{"snap$i"} = $state->{"snap$i"};
272 }
273
274 return $job;
275 }
276
277 sub encode_cron {
278 my (@text) = @_;
279
280 my $cfg = {};
281
282 while (my $line = shift(@text)) {
283
284 my @arg = split('\s', $line);
285 my $param = parse_argv(@arg);
286
287 if ($param->{source} && $param->{dest}) {
288 my $source = delete $param->{source};
289 my $name = delete $param->{name};
290
291 $cfg->{$source}->{$name} = $param;
292 }
293 }
294
295 return $cfg;
296 }
297
298 sub param_to_job {
299 my ($param) = @_;
300
301 my $job = {};
302
303 my $source = parse_target($param->{source});
304 my $dest = parse_target($param->{dest}) if $param->{dest};
305
306 $job->{name} = !$param->{name} ? "default" : $param->{name};
307 $job->{dest} = $param->{dest} if $param->{dest};
308 $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
309 $job->{method} = "ssh" if !$job->{method};
310 $job->{limit} = $param->{limit};
311 $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
312 $job->{source} = $param->{source};
313 $job->{source_user} = $param->{source_user};
314 $job->{dest_user} = $param->{dest_user};
315 $job->{properties} = !!$param->{properties};
316 $job->{dest_config_path} = $param->{dest_config_path} if $param->{dest_config_path};
317
318 return $job;
319 }
320
321 sub read_state {
322
323 if (!-e $STATE) {
324 make_path $CONFIG_PATH;
325 my $new_fh = IO::File->new("> $STATE");
326 die "Could not create $STATE: $!\n" if !$new_fh;
327 print $new_fh "{}";
328 close($new_fh);
329 return undef;
330 }
331
332 my $fh = IO::File->new("< $STATE");
333 die "Could not open file $STATE: $!\n" if !$fh;
334
335 my $text = <$fh>;
336 my $states = decode_json($text);
337
338 close($fh);
339
340 return $states;
341 }
342
343 sub update_state {
344 my ($job) = @_;
345 my $text;
346 my $in_fh;
347
348 eval {
349
350 $in_fh = IO::File->new("< $STATE");
351 die "Could not open file $STATE: $!\n" if !$in_fh;
352 $text = <$in_fh>;
353 };
354
355 my $out_fh = IO::File->new("> $STATE.new");
356 die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
357
358 my $states = {};
359 my $state = {};
360 if ($text){
361 $states = decode_json($text);
362 $state = $states->{$job->{source}}->{$job->{name}};
363 }
364
365 if ($job->{state} ne "del") {
366 $state->{state} = $job->{state};
367 $state->{lsync} = $job->{lsync};
368 $state->{vm_type} = $job->{vm_type};
369
370 for (my $i = 0; $job->{"snap$i"} ; $i++) {
371 $state->{"snap$i"} = $job->{"snap$i"};
372 }
373 $states->{$job->{source}}->{$job->{name}} = $state;
374 } else {
375
376 delete $states->{$job->{source}}->{$job->{name}};
377 delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
378 }
379
380 $text = encode_json($states);
381 print $out_fh $text;
382
383 close($out_fh);
384 rename "$STATE.new", $STATE;
385 eval {
386 close($in_fh);
387 };
388
389 return $states;
390 }
391
392 sub update_cron {
393 my ($job) = @_;
394
395 my $updated;
396 my $has_header;
397 my $line_no = 0;
398 my $text = "";
399 my $header = "SHELL=/bin/sh\n";
400 $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";
401
402 my $fh = IO::File->new("< $CRONJOBS");
403 die "Could not open file $CRONJOBS: $!\n" if !$fh;
404
405 my @test = <$fh>;
406
407 while (my $line = shift(@test)) {
408 chomp($line);
409 if ($line =~ m/source $job->{source} .*name $job->{name} /) {
410 $updated = 1;
411 next if $job->{state} eq "del";
412 $text .= format_job($job, $line);
413 } else {
414 if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
415 $has_header = 1;
416 }
417 $text .= "$line\n";
418 }
419 $line_no++;
420 }
421
422 if (!$has_header) {
423 $text = "$header$text";
424 }
425
426 if (!$updated) {
427 $text .= format_job($job);
428 }
429 my $new_fh = IO::File->new("> ${CRONJOBS}.new");
430 die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;
431
432 die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
433 close ($new_fh);
434
435 die "can't move $CRONJOBS.new: $!\n" if !rename "${CRONJOBS}.new", $CRONJOBS;
436 close ($fh);
437 }
438
439 sub format_job {
440 my ($job, $line) = @_;
441 my $text = "";
442
443 if ($job->{state} eq "stopped") {
444 $text = "#";
445 }
446 if ($line) {
447 $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/;
448 $text .= $1;
449 } else {
450 $text .= "*/$INTERVAL * * * *";
451 }
452 $text .= " root";
453 $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
454 $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
455 $text .= " --limit $job->{limit}" if $job->{limit};
456 $text .= " --method $job->{method}";
457 $text .= " --verbose" if $job->{verbose};
458 $text .= " --source-user $job->{source_user}";
459 $text .= " --dest-user $job->{dest_user}";
460 $text .= " --properties" if $job->{properties};
461 $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
462 $text .= "\n";
463
464 return $text;
465 }
466
467 sub list {
468
469 my $cfg = read_cron();
470
471 my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");
472
473 my $states = read_state();
474 foreach my $source (sort keys%{$cfg}) {
475 foreach my $name (sort keys%{$cfg->{$source}}) {
476 $list .= sprintf("%-25s", cut_target_width($source, 25));
477 $list .= sprintf("%-25s", cut_target_width($name, 25));
478 $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
479 $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
480 $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
481 $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
482 }
483 }
484
485 return $list;
486 }
487
488 sub vm_exists {
489 my ($target, $user) = @_;
490
491 return undef if !defined($target->{vmid});
492
493 my $conf_fn = "$target->{vmid}.conf";
494
495 if ($target->{ip}) {
496 my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls');
497 return "qemu" if eval { run_cmd([@cmd, "$QEMU_CONF/$conf_fn"]) };
498 return "lxc" if eval { run_cmd([@cmd, "$LXC_CONF/$conf_fn"]) };
499 } else {
500 return "qemu" if -f "$QEMU_CONF/$conf_fn";
501 return "lxc" if -f "$LXC_CONF/$conf_fn";
502 }
503
504 return undef;
505 }
506
507 sub init {
508 my ($param) = @_;
509
510 locked("$CONFIG_PATH/cron_and_state.lock", sub {
511 my $cfg = read_cron();
512
513 my $job = param_to_job($param);
514
515 $job->{state} = "ok";
516 $job->{lsync} = 0;
517
518 my $source = parse_target($param->{source});
519 my $dest = parse_target($param->{dest});
520
521 if (my $ip = $dest->{ip}) {
522 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
523 }
524
525 if (my $ip = $source->{ip}) {
526 run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
527 }
528
529 die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
530
531 if (!defined($source->{vmid})) {
532 die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
533 }
534
535 my $vm_type = vm_exists($source, $param->{source_user});
536 $job->{vm_type} = $vm_type;
537 $source->{vm_type} = $vm_type;
538
539 die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
540
541 die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
542
543 #check if vm has zfs disks if not die;
544 get_disks($source, $param->{source_user}) if $source->{vmid};
545
546 update_cron($job);
547 update_state($job);
548 }); #cron and state lock
549
550 return if $param->{skip};
551
552 eval { sync($param) };
553 if (my $err = $@) {
554 destroy_job($param);
555 print $err;
556 }
557 }
558
559 sub get_job {
560 my ($param) = @_;
561
562 my $cfg = read_cron();
563
564 if (!$cfg->{$param->{source}}->{$param->{name}}) {
565 die "Job with source $param->{source} and name $param->{name} does not exist\n" ;
566 }
567 my $job = $cfg->{$param->{source}}->{$param->{name}};
568 $job->{name} = $param->{name};
569 $job->{source} = $param->{source};
570 $job = add_state_to_job($job);
571
572 return $job;
573 }
574
575 sub destroy_job {
576 my ($param) = @_;
577
578 locked("$CONFIG_PATH/cron_and_state.lock", sub {
579 my $job = get_job($param);
580 $job->{state} = "del";
581
582 update_cron($job);
583 update_state($job);
584 });
585 }
586
587 sub sync {
588 my ($param) = @_;
589
590 my $job;
591
592 locked("$CONFIG_PATH/cron_and_state.lock", sub {
593 eval { $job = get_job($param) };
594
595 if ($job) {
596 if (defined($job->{state}) && ($job->{state} eq "syncing" || $job->{state} eq "waiting")) {
597 die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
598 }
599
600 $job->{state} = "waiting";
601 update_state($job);
602 }
603 });
604
605 locked("$CONFIG_PATH/sync.lock", sub {
606
607 my $date = get_date();
608
609 my $dest;
610 my $source;
611 my $vm_type;
612
613 locked("$CONFIG_PATH/cron_and_state.lock", sub {
614 #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
615 eval { $job = get_job($param); };
616
617 if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
618 die "Job --source $param->{source} --name $param->{name} has been disabled\n";
619 }
620
621 $dest = parse_target($param->{dest});
622 $source = parse_target($param->{source});
623
624 $vm_type = vm_exists($source, $param->{source_user});
625 $source->{vm_type} = $vm_type;
626
627 if ($job) {
628 $job->{state} = "syncing";
629 $job->{vm_type} = $vm_type if !$job->{vm_type};
630 update_state($job);
631 }
632 }); #cron and state lock
633
634 my $sync_path = sub {
635 my ($source, $dest, $job, $param, $date) = @_;
636
637 ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});
638
639 snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
640
641 send_image($source, $dest, $param);
642
643 snapshot_destroy($source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $dest->{old_snap});
644
645 };
646
647 eval{
648 if ($source->{vmid}) {
649 die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
650 die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
651 my $disks = get_disks($source, $param->{source_user});
652
653 foreach my $disk (sort keys %{$disks}) {
654 $source->{all} = $disks->{$disk}->{all};
655 $source->{pool} = $disks->{$disk}->{pool};
656 $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
657 $source->{last_part} = $disks->{$disk}->{last_part};
658 &$sync_path($source, $dest, $job, $param, $date);
659 }
660 if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
661 send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
662 } else {
663 send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
664 }
665 } else {
666 &$sync_path($source, $dest, $job, $param, $date);
667 }
668 };
669 if (my $err = $@) {
670 locked("$CONFIG_PATH/cron_and_state.lock", sub {
671 eval { $job = get_job($param); };
672 if ($job) {
673 $job->{state} = "error";
674 update_state($job);
675 }
676 });
677 print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
678 die "$err\n";
679 }
680
681 locked("$CONFIG_PATH/cron_and_state.lock", sub {
682 eval { $job = get_job($param); };
683 if ($job) {
684 if (defined($job->{state}) && $job->{state} eq "stopped") {
685 $job->{state} = "stopped";
686 } else {
687 $job->{state} = "ok";
688 }
689 $job->{lsync} = $date;
690 update_state($job);
691 }
692 });
693 }); #sync lock
694 }
695
696 sub snapshot_get{
697 my ($source, $dest, $max_snap, $name, $dest_user) = @_;
698
699 my $cmd = [];
700 push @$cmd, 'ssh', "$dest_user\@$dest->{ip}", '--', if $dest->{ip};
701 push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
702
703 my $path = $dest->{all};
704 $path .= "/$source->{last_part}" if $source->{last_part};
705 push @$cmd, $path;
706
707 my $raw;
708 eval {$raw = run_cmd($cmd)};
709 if (my $erro =$@) { #this means the volume doesn't exist on dest yet
710 return undef;
711 }
712
713 my $index = 0;
714 my $line = "";
715 my $last_snap = undef;
716 my $old_snap;
717
718 while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
719 $line = $1;
720 if ($line =~ m/@(.*)$/) {
721 $last_snap = $1 if (!$last_snap);
722 }
723 if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {
724 $old_snap = $1;
725 $index++;
726 if ($index == $max_snap) {
727 $source->{destroy} = 1;
728 last;
729 };
730 }
731 }
732
733 return ($old_snap, $last_snap) if $last_snap;
734
735 return undef;
736 }
737
738 sub snapshot_add {
739 my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;
740
741 my $snap_name = "rep_$name\_".$date;
742
743 $source->{new_snap} = $snap_name;
744
745 my $path = "$source->{all}\@$snap_name";
746
747 my $cmd = [];
748 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--', if $source->{ip};
749 push @$cmd, 'zfs', 'snapshot', $path;
750 eval{
751 run_cmd($cmd);
752 };
753
754 if (my $err = $@) {
755 snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
756 die "$err\n";
757 }
758 }
759
760 sub get_disks {
761 my ($target, $user) = @_;
762
763 my $cmd = [];
764 push @$cmd, 'ssh', "$user\@$target->{ip}", '--', if $target->{ip};
765
766 if ($target->{vm_type} eq 'qemu') {
767 push @$cmd, 'qm', 'config', $target->{vmid};
768 } elsif ($target->{vm_type} eq 'lxc') {
769 push @$cmd, 'pct', 'config', $target->{vmid};
770 } else {
771 die "VM Type unknown\n";
772 }
773
774 my $res = run_cmd($cmd);
775
776 my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user);
777
778 return $disks;
779 }
780
781 sub run_cmd {
782 my ($cmd) = @_;
783 print "Start CMD\n" if $DEBUG;
784 print Dumper $cmd if $DEBUG;
785 if (ref($cmd) eq 'ARRAY') {
786 $cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
787 }
788 my $output = `$cmd 2>&1`;
789
790 die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;
791
792 chomp($output);
793 print Dumper $output if $DEBUG;
794 print "END CMD\n" if $DEBUG;
795 return $output;
796 }
797
798 sub parse_disks {
799 my ($text, $ip, $vm_type, $user) = @_;
800
801 my $disks;
802
803 my $num = 0;
804 while ($text && $text =~ s/^(.*?)(\n|$)//) {
805 my $line = $1;
806
807 next if $line =~ /media=cdrom/;
808 next if $line !~ m/$DISK_KEY_RE/;
809
810 #QEMU if backup is not set include in sync
811 next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);
812
813 #LXC if backup is not set do no in sync
814 next if $vm_type eq 'lxc' && ($line =~ m/^mp\d:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);
815
816 my $disk = undef;
817 my $stor = undef;
818 if($line =~ m/$DISK_KEY_RE(.*)$/) {
819 my @parameter = split(/,/,$1);
820
821 foreach my $opt (@parameter) {
822 if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
823 $disk = $2;
824 $stor = $1;
825 last;
826 }
827 }
828 }
829 if (!defined($disk) || !defined($stor)) {
830 print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
831 next;
832 }
833
834 my $cmd = [];
835 push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
836 push @$cmd, 'pvesm', 'path', "$stor$disk";
837 my $path = run_cmd($cmd);
838
839 die "Get no path from pvesm path $stor$disk\n" if !$path;
840
841 if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {
842
843 my @array = split('/', $1);
844 $disks->{$num}->{pool} = shift(@array);
845 $disks->{$num}->{all} = $disks->{$num}->{pool};
846 if (0 < @array) {
847 $disks->{$num}->{path} = join('/', @array);
848 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
849 }
850 $disks->{$num}->{last_part} = $disk;
851 $disks->{$num}->{all} .= "\/$disk";
852
853 $num++;
854 } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {
855
856 $disks->{$num}->{pool} = $1;
857 $disks->{$num}->{all} = $disks->{$num}->{pool};
858
859 if ($2) {
860 $disks->{$num}->{path} = $3;
861 $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
862 }
863
864 $disks->{$num}->{last_part} = $disk;
865 $disks->{$num}->{all} .= "\/$disk";
866
867 $num++;
868
869 } else {
870 die "ERROR: in path\n";
871 }
872 }
873
874 die "Vm include no disk on zfs.\n" if !$disks->{0};
875 return $disks;
876 }
877
878 sub snapshot_destroy {
879 my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;
880
881 my @zfscmd = ('zfs', 'destroy');
882 my $snapshot = "$source->{all}\@$snap";
883
884 eval {
885 if($source->{ip} && $method eq 'ssh'){
886 run_cmd(['ssh', "$source_user\@$source->{ip}", '--', @zfscmd, $snapshot]);
887 } else {
888 run_cmd([@zfscmd, $snapshot]);
889 }
890 };
891 if (my $erro = $@) {
892 warn "WARN: $erro";
893 }
894 if ($dest) {
895 my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
896
897 my $path = "$dest->{all}";
898 $path .= "/$source->{last_part}" if $source->{last_part};
899
900 eval {
901 run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
902 };
903 if (my $erro = $@) {
904 warn "WARN: $erro";
905 }
906 }
907 }
908
909 # check if snapshot for incremental sync exist on source side
910 sub snapshot_exist {
911 my ($source , $dest, $method, $source_user) = @_;
912
913 my $cmd = [];
914 push @$cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};
915 push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
916
917 my $path = $source->{all};
918 $path .= "\@$dest->{last_snap}";
919
920 push @$cmd, $path;
921
922 eval {run_cmd($cmd)};
923 if (my $erro =$@) {
924 warn "WARN: $erro";
925 return undef;
926 }
927 return 1;
928 }
929
930 sub send_image {
931 my ($source, $dest, $param) = @_;
932
933 my $cmd = [];
934
935 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
936 push @$cmd, 'zfs', 'send';
937 push @$cmd, '-p', if $param->{properties};
938 push @$cmd, '-v' if $param->{verbose};
939
940 if($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
941 push @$cmd, '-i', "$source->{all}\@$dest->{last_snap}";
942 }
943 push @$cmd, '--', "$source->{all}\@$source->{new_snap}";
944
945 if ($param->{limit}){
946 my $bwl = $param->{limit}*1024;
947 push @$cmd, \'|', 'cstream', '-t', $bwl;
948 }
949 my $target = "$dest->{all}";
950 $target .= "/$source->{last_part}" if $source->{last_part};
951 $target =~ s!/+!/!g;
952
953 push @$cmd, \'|';
954 push @$cmd, 'ssh', '-o', 'BatchMode=yes', "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
955 push @$cmd, 'zfs', 'recv', '-F', '--';
956 push @$cmd, "$target";
957
958 eval {
959 run_cmd($cmd)
960 };
961
962 if (my $erro = $@) {
963 snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
964 die $erro;
965 };
966 }
967
968
969 sub send_config{
970 my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;
971
972 my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
973 my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";
974
975 my $config_dir = $dest_config_path // $CONFIG_PATH;
976 $config_dir .= "/$dest->{last_part}" if $dest->{last_part};
977
978 $dest_target_new = $config_dir.'/'.$dest_target_new;
979
980 if ($method eq 'ssh'){
981 if ($dest->{ip} && $source->{ip}) {
982 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
983 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
984 } elsif ($dest->{ip}) {
985 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
986 run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
987 } elsif ($source->{ip}) {
988 run_cmd(['mkdir', '-p', '--', $config_dir]);
989 run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]);
990 }
991
992 if ($source->{destroy}){
993 my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$dest->{old_snap}";
994 if($dest->{ip}){
995 run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
996 } else {
997 run_cmd(['rm', '-f', '--', $dest_target_old]);
998 }
999 }
1000 } elsif ($method eq 'local') {
1001 run_cmd(['mkdir', '-p', '--', $config_dir]);
1002 run_cmd(['cp', $source_target, $dest_target_new]);
1003 }
1004 }
1005
1006 sub get_date {
1007 my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
1008 my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
1009
1010 return $datestamp;
1011 }
1012
1013 sub status {
1014 my $cfg = read_cron();
1015
1016 my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");
1017
1018 my $states = read_state();
1019
1020 foreach my $source (sort keys%{$cfg}) {
1021 foreach my $sync_name (sort keys%{$cfg->{$source}}) {
1022 $status_list .= sprintf("%-25s", cut_target_width($source, 25));
1023 $status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
1024 $status_list .= "$states->{$source}->{$sync_name}->{state}\n";
1025 }
1026 }
1027
1028 return $status_list;
1029 }
1030
1031 sub enable_job {
1032 my ($param) = @_;
1033
1034 locked("$CONFIG_PATH/cron_and_state.lock", sub {
1035 my $job = get_job($param);
1036 $job->{state} = "ok";
1037 update_state($job);
1038 update_cron($job);
1039 });
1040 }
1041
1042 sub disable_job {
1043 my ($param) = @_;
1044
1045 locked("$CONFIG_PATH/cron_and_state.lock", sub {
1046 my $job = get_job($param);
1047 $job->{state} = "stopped";
1048 update_state($job);
1049 update_cron($job);
1050 });
1051 }
1052
1053 my $cmd_help = {
1054 destroy => qq{
1055 $PROGNAME destroy -source <string> [OPTIONS]
1056
1057 remove a sync Job from the scheduler
1058
1059 -name string
1060
1061 name of the sync job, if not set it is default
1062
1063 -source string
1064
1065 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1066 },
1067 create => qq{
1068 $PROGNAME create -dest <string> -source <string> [OPTIONS]
1069
1070 Create a sync Job
1071
1072 -dest string
1073
1074 the destination target is like [IP]:<Pool>[/Path]
1075
1076 -dest-user string
1077
1078 name of the user on the destination target, root by default
1079
1080 -limit integer
1081
1082 max sync speed in kBytes/s, default unlimited
1083
1084 -maxsnap string
1085
1086 how much snapshots will be kept before get erased, default 1
1087
1088 -name string
1089
1090 name of the sync job, if not set it is default
1091
1092 -skip boolean
1093
1094 if this flag is set it will skip the first sync
1095
1096 -source string
1097
1098 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1099
1100 -source-user string
1101
1102 name of the user on the source target, root by default
1103
1104 -properties boolean
1105
1106 Include the dataset's properties in the stream.
1107
1108 -dest-config-path string
1109
1110 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1111 },
1112 sync => qq{
1113 $PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
1114
1115 will sync one time
1116
1117 -dest string
1118
1119 the destination target is like [IP:]<Pool>[/Path]
1120
1121 -dest-user string
1122
1123 name of the user on the destination target, root by default
1124
1125 -limit integer
1126
1127 max sync speed in kBytes/s, default unlimited
1128
1129 -maxsnap integer
1130
1131 how much snapshots will be kept before get erased, default 1
1132
1133 -name string
1134
1135 name of the sync job, if not set it is default.
1136 It is only necessary if scheduler allready contains this source.
1137
1138 -source string
1139
1140 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1141
1142 -source-user string
1143
1144 name of the user on the source target, root by default
1145
1146 -verbose boolean
1147
1148 print out the sync progress.
1149
1150 -properties boolean
1151
1152 Include the dataset's properties in the stream.
1153
1154 -dest-config-path string
1155
1156 specify a custom config path on the destination target. default is /var/lib/pve-zsync
1157 },
1158 list => qq{
1159 $PROGNAME list
1160
1161 Get a List of all scheduled Sync Jobs
1162 },
1163 status => qq{
1164 $PROGNAME status
1165
1166 Get the status of all scheduled Sync Jobs
1167 },
1168 help => qq{
1169 $PROGNAME help <cmd> [OPTIONS]
1170
1171 Get help about specified command.
1172
1173 <cmd> string
1174
1175 Command name
1176
1177 -verbose boolean
1178
1179 Verbose output format.
1180 },
1181 enable => qq{
1182 $PROGNAME enable -source <string> [OPTIONS]
1183
1184 enable a syncjob and reset error
1185
1186 -name string
1187
1188 name of the sync job, if not set it is default
1189
1190 -source string
1191
1192 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1193 },
1194 disable => qq{
1195 $PROGNAME disable -source <string> [OPTIONS]
1196
1197 pause a sync job
1198
1199 -name string
1200
1201 name of the sync job, if not set it is default
1202
1203 -source string
1204
1205 the source can be an <VMID> or [IP:]<ZFSPool>[/Path]
1206 },
1207 printpod => 'internal command',
1208
1209 };
1210
1211 if (!$command) {
1212 usage(); die "\n";
1213 } elsif (!$cmd_help->{$command}) {
1214 print "ERROR: unknown command '$command'";
1215 usage(1); die "\n";
1216 }
1217
1218 my @arg = @ARGV;
1219 my $param = parse_argv(@arg);
1220
1221 sub check_params {
1222 for (@_) {
1223 die "$cmd_help->{$command}\n" if !$param->{$_};
1224 }
1225 }
1226
1227 if ($command eq 'destroy') {
1228 check_params(qw(source));
1229
1230 check_target($param->{source});
1231 destroy_job($param);
1232
1233 } elsif ($command eq 'sync') {
1234 check_params(qw(source dest));
1235
1236 check_target($param->{source});
1237 check_target($param->{dest});
1238 sync($param);
1239
1240 } elsif ($command eq 'create') {
1241 check_params(qw(source dest));
1242
1243 check_target($param->{source});
1244 check_target($param->{dest});
1245 init($param);
1246
1247 } elsif ($command eq 'status') {
1248 print status();
1249
1250 } elsif ($command eq 'list') {
1251 print list();
1252
1253 } elsif ($command eq 'help') {
1254 my $help_command = $ARGV[1];
1255
1256 if ($help_command && $cmd_help->{$help_command}) {
1257 die "$cmd_help->{$help_command}\n";
1258
1259 }
1260 if ($param->{verbose}) {
1261 exec("man $PROGNAME");
1262
1263 } else {
1264 usage(1);
1265
1266 }
1267
1268 } elsif ($command eq 'enable') {
1269 check_params(qw(source));
1270
1271 check_target($param->{source});
1272 enable_job($param);
1273
1274 } elsif ($command eq 'disable') {
1275 check_params(qw(source));
1276
1277 check_target($param->{source});
1278 disable_job($param);
1279
1280 } elsif ($command eq 'printpod') {
1281 print_pod();
1282 }
1283
1284 sub usage {
1285 my ($help) = @_;
1286
1287 print("ERROR:\tno command specified\n") if !$help;
1288 print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
1289 print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
1290 print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
1291 print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
1292 print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
1293 print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
1294 print("\t$PROGNAME list\n");
1295 print("\t$PROGNAME status\n");
1296 print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
1297 }
1298
1299 sub check_target {
1300 my ($target) = @_;
1301 parse_target($target);
1302 }
1303
1304 sub print_pod {
1305
1306 my $synopsis = join("\n", sort values %$cmd_help);
1307
1308 print <<EOF;
1309 =head1 NAME
1310
1311 pve-zsync - PVE ZFS Replication Manager
1312
1313 =head1 SYNOPSIS
1314
1315 pve-zsync <COMMAND> [ARGS] [OPTIONS]
1316
1317 $synopsis
1318
1319 =head1 DESCRIPTION
1320
1321 This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
1322 This tool also has the capability to add jobs to cron so the sync will be automatically done.
1323 The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
1324 To config cron see man crontab.
1325
1326 =head2 PVE ZFS Storage sync Tool
1327
1328 This Tool can get remote pool on other PVE or send Pool to others ZFS machines
1329
1330 =head1 EXAMPLES
1331
1332 add sync job from local VM to remote ZFS Server
1333 pve-zsync create -source=100 -dest=192.168.1.2:zfspool
1334
1335 =head1 IMPORTANT FILES
1336
1337 Cron jobs and config are stored at /etc/cron.d/pve-zsync
1338
1339 The VM config get copied on the destination machine to /var/lib/pve-zsync/
1340
1341 =head1 COPYRIGHT AND DISCLAIMER
1342
1343 Copyright (C) 2007-2015 Proxmox Server Solutions GmbH
1344
1345 This program is free software: you can redistribute it and/or modify it
1346 under the terms of the GNU Affero General Public License as published
1347 by the Free Software Foundation, either version 3 of the License, or
1348 (at your option) any later version.
1349
1350 This program is distributed in the hope that it will be useful, but
1351 WITHOUT ANY WARRANTY; without even the implied warranty of
1352 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1353 Affero General Public License for more details.
1354
1355 You should have received a copy of the GNU Affero General Public
1356 License along with this program. If not, see
1357 <http://www.gnu.org/licenses/>.
1358
1359 EOF
1360 }