]> git.proxmox.com Git - pve-cluster.git/blob - data/PVE/Cluster.pm
20571625104f04fb4751771efa2eec6263994602
[pve-cluster.git] / data / PVE / Cluster.pm
1 package PVE::Cluster;
2
3 use strict;
4 use warnings;
5
6 use Encode;
7 use File::stat qw();
8 use JSON;
9 use Net::SSLeay;
10 use POSIX qw(ENOENT);
11 use Socket;
12 use Storable qw(dclone);
13
14 use PVE::Certificate;
15 use PVE::INotify;
16 use PVE::IPCC;
17 use PVE::JSONSchema;
18 use PVE::Network;
19 use PVE::SafeSyslog;
20 use PVE::Tools qw(run_command);
21
22 use PVE::Cluster::IPCConst;
23
24 use base 'Exporter';
25
26 our @EXPORT_OK = qw(
27 cfs_read_file
28 cfs_write_file
29 cfs_register_file
30 cfs_lock_file);
31
32 # x509 certificate utils
33
34 my $basedir = "/etc/pve";
35 my $authdir = "$basedir/priv";
36 my $lockdir = "/etc/pve/priv/lock";
37
38 # cfs and corosync files
39 my $dbfile = "/var/lib/pve-cluster/config.db";
40 my $dbbackupdir = "/var/lib/pve-cluster/backup";
41
42 # this is just a readonly copy, the relevant one is in status.c from pmxcfs
43 # observed files are the one we can get directly through IPCC, they are cached
44 # using a computed version and only those can be used by the cfs_*_file methods
45 my $observed = {
46 'vzdump.cron' => 1,
47 'storage.cfg' => 1,
48 'datacenter.cfg' => 1,
49 'replication.cfg' => 1,
50 'corosync.conf' => 1,
51 'corosync.conf.new' => 1,
52 'user.cfg' => 1,
53 'domains.cfg' => 1,
54 'priv/shadow.cfg' => 1,
55 'priv/tfa.cfg' => 1,
56 '/qemu-server/' => 1,
57 '/openvz/' => 1,
58 '/lxc/' => 1,
59 'ha/crm_commands' => 1,
60 'ha/manager_status' => 1,
61 'ha/resources.cfg' => 1,
62 'ha/groups.cfg' => 1,
63 'ha/fence.cfg' => 1,
64 'status.cfg' => 1,
65 'ceph.conf' => 1,
66 'sdn.cfg' => 1,
67 'sdn.cfg.new' => 1,
68 };
69
70 sub base_dir {
71 return $basedir;
72 }
73
74 sub auth_dir {
75 return $authdir;
76 }
77
78 sub check_cfs_quorum {
79 my ($noerr) = @_;
80
81 # note: -w filename always return 1 for root, so wee need
82 # to use File::lstat here
83 my $st = File::stat::lstat("$basedir/local");
84 my $quorate = ($st && (($st->mode & 0200) != 0));
85
86 die "cluster not ready - no quorum?\n" if !$quorate && !$noerr;
87
88 return $quorate;
89 }
90
91 sub check_cfs_is_mounted {
92 my ($noerr) = @_;
93
94 my $res = -l "$basedir/local";
95
96 die "pve configuration filesystem not mounted\n"
97 if !$res && !$noerr;
98
99 return $res;
100 }
101
102 my $versions = {};
103 my $vmlist = {};
104 my $clinfo = {};
105
106 my $ipcc_send_rec = sub {
107 my ($msgid, $data) = @_;
108
109 my $res = PVE::IPCC::ipcc_send_rec($msgid, $data);
110
111 die "ipcc_send_rec[$msgid] failed: $!\n" if !defined($res) && ($! != 0);
112
113 return $res;
114 };
115
116 my $ipcc_send_rec_json = sub {
117 my ($msgid, $data) = @_;
118
119 my $res = PVE::IPCC::ipcc_send_rec($msgid, $data);
120
121 die "ipcc_send_rec[$msgid] failed: $!\n" if !defined($res) && ($! != 0);
122
123 return decode_json($res);
124 };
125
126 my $ipcc_get_config = sub {
127 my ($path) = @_;
128
129 my $bindata = pack "Z*", $path;
130 my $res = PVE::IPCC::ipcc_send_rec(CFS_IPC_GET_CONFIG, $bindata);
131 if (!defined($res)) {
132 if ($! != 0) {
133 return undef if $! == ENOENT;
134 die "$!\n";
135 }
136 return '';
137 }
138
139 return $res;
140 };
141
142 my $ipcc_get_status = sub {
143 my ($name, $nodename) = @_;
144
145 my $bindata = pack "Z[256]Z[256]", $name, ($nodename || "");
146 return PVE::IPCC::ipcc_send_rec(CFS_IPC_GET_STATUS, $bindata);
147 };
148
149 my $ipcc_remove_status = sub {
150 my ($name) = @_;
151 # we just omit the data payload, pmxcfs takes this as hint and removes this
152 # key from the status hashtable
153 my $bindata = pack "Z[256]", $name;
154 return &$ipcc_send_rec(CFS_IPC_SET_STATUS, $bindata);
155 };
156
157 my $ipcc_update_status = sub {
158 my ($name, $data) = @_;
159
160 my $raw = ref($data) ? encode_json($data) : $data;
161 # update status
162 my $bindata = pack "Z[256]Z*", $name, $raw;
163
164 return &$ipcc_send_rec(CFS_IPC_SET_STATUS, $bindata);
165 };
166
167 my $ipcc_log = sub {
168 my ($priority, $ident, $tag, $msg) = @_;
169
170 my $bindata = pack "CCCZ*Z*Z*", $priority, bytes::length($ident) + 1,
171 bytes::length($tag) + 1, $ident, $tag, $msg;
172
173 return &$ipcc_send_rec(CFS_IPC_LOG_CLUSTER_MSG, $bindata);
174 };
175
176 my $ipcc_get_cluster_log = sub {
177 my ($user, $max) = @_;
178
179 $max = 0 if !defined($max);
180
181 my $bindata = pack "VVVVZ*", $max, 0, 0, 0, ($user || "");
182 return &$ipcc_send_rec(CFS_IPC_GET_CLUSTER_LOG, $bindata);
183 };
184
185 my $ccache = {};
186
187 sub cfs_update {
188 my ($fail) = @_;
189 eval {
190 my $res = &$ipcc_send_rec_json(CFS_IPC_GET_FS_VERSION);
191 die "no starttime\n" if !$res->{starttime};
192
193 if (!$res->{starttime} || !$versions->{starttime} ||
194 $res->{starttime} != $versions->{starttime}) {
195 #print "detected changed starttime\n";
196 $vmlist = {};
197 $clinfo = {};
198 $ccache = {};
199 }
200
201 $versions = $res;
202 };
203 my $err = $@;
204 if ($err) {
205 $versions = {};
206 $vmlist = {};
207 $clinfo = {};
208 $ccache = {};
209 die $err if $fail;
210 warn $err;
211 }
212
213 eval {
214 if (!$clinfo->{version} || $clinfo->{version} != $versions->{clinfo}) {
215 #warn "detected new clinfo\n";
216 $clinfo = &$ipcc_send_rec_json(CFS_IPC_GET_CLUSTER_INFO);
217 }
218 };
219 $err = $@;
220 if ($err) {
221 $clinfo = {};
222 die $err if $fail;
223 warn $err;
224 }
225
226 eval {
227 if (!$vmlist->{version} || $vmlist->{version} != $versions->{vmlist}) {
228 #warn "detected new vmlist1\n";
229 $vmlist = &$ipcc_send_rec_json(CFS_IPC_GET_GUEST_LIST);
230 }
231 };
232 $err = $@;
233 if ($err) {
234 $vmlist = {};
235 die $err if $fail;
236 warn $err;
237 }
238 }
239
240 sub get_vmlist {
241 return $vmlist;
242 }
243
244 sub get_clinfo {
245 return $clinfo;
246 }
247
248 sub get_members {
249 return $clinfo->{nodelist};
250 }
251
252 sub get_nodelist {
253 my $nodelist = $clinfo->{nodelist};
254
255 my $nodename = PVE::INotify::nodename();
256
257 if (!$nodelist || !$nodelist->{$nodename}) {
258 return [ $nodename ];
259 }
260
261 return [ keys %$nodelist ];
262 }
263
264 # only stored in a in-memory hashtable inside pmxcfs, local data is gone after
265 # a restart (of pmxcfs or the node), peer data is still available then
266 # best used for status data, like running (ceph) services, package versions, ...
267 sub broadcast_node_kv {
268 my ($key, $data) = @_;
269
270 if (!defined($data)) {
271 eval {
272 $ipcc_remove_status->("kv/$key");
273 };
274 } else {
275 die "cannot send a reference\n" if ref($data);
276 my $size = length($data);
277 die "data for '$key' too big\n" if $size >= (32 * 1024); # limit from pmxfs
278
279 eval {
280 $ipcc_update_status->("kv/$key", $data);
281 };
282 }
283
284 warn $@ if $@;
285 }
286
287 # nodename is optional
288 sub get_node_kv {
289 my ($key, $nodename) = @_;
290
291 my $res = {};
292 my $get_node_data = sub {
293 my ($node) = @_;
294 my $raw = $ipcc_get_status->("kv/$key", $node);
295 $res->{$node} = unpack("Z*", $raw) if $raw;
296 };
297
298 if ($nodename) {
299 $get_node_data->($nodename);
300 } else {
301 my $nodelist = get_nodelist();
302
303 foreach my $node (@$nodelist) {
304 $get_node_data->($node);
305 }
306 }
307
308 return $res;
309 }
310
311 # property: a config property you want to get, e.g., this is perfect to get
312 # the 'lock' entry of a guest _fast_ (>100 faster than manual parsing here)
313 # vmid: optipnal, if a valid is passed we only check that one, else return all
314 # NOTE: does *not* searches snapshot and PENDING entries sections!
315 sub get_guest_config_property {
316 my ($property, $vmid) = @_;
317
318 die "property is required" if !defined($property);
319
320 my $bindata = pack "VZ*", $vmid // 0, $property;
321 my $res = $ipcc_send_rec_json->(CFS_IPC_GET_GUEST_CONFIG_PROPERTY, $bindata);
322
323 return $res;
324 }
325
326 # $data must be a chronological descending ordered array of tasks
327 sub broadcast_tasklist {
328 my ($data) = @_;
329
330 # the serialized list may not get bigger than 32kb (CFS_MAX_STATUS_SIZE
331 # from pmxcfs) - drop older items until we satisfy this constraint
332 my $size = length(encode_json($data));
333 while ($size >= (32 * 1024)) {
334 pop @$data;
335 $size = length(encode_json($data));
336 }
337
338 eval {
339 &$ipcc_update_status("tasklist", $data);
340 };
341
342 warn $@ if $@;
343 }
344
345 my $tasklistcache = {};
346
347 sub get_tasklist {
348 my ($nodename) = @_;
349
350 my $kvstore = $versions->{kvstore} || {};
351
352 my $nodelist = get_nodelist();
353
354 my $res = [];
355 foreach my $node (@$nodelist) {
356 next if $nodename && ($nodename ne $node);
357 eval {
358 my $ver = $kvstore->{$node}->{tasklist} if $kvstore->{$node};
359 my $cd = $tasklistcache->{$node};
360 if (!$cd || !$ver || !$cd->{version} ||
361 ($cd->{version} != $ver)) {
362 my $raw = &$ipcc_get_status("tasklist", $node) || '[]';
363 my $data = decode_json($raw);
364 push @$res, @$data;
365 $cd = $tasklistcache->{$node} = {
366 data => $data,
367 version => $ver,
368 };
369 } elsif ($cd && $cd->{data}) {
370 push @$res, @{$cd->{data}};
371 }
372 };
373 my $err = $@;
374 syslog('err', $err) if $err;
375 }
376
377 return $res;
378 }
379
380 sub broadcast_rrd {
381 my ($rrdid, $data) = @_;
382
383 eval {
384 &$ipcc_update_status("rrd/$rrdid", $data);
385 };
386 my $err = $@;
387
388 warn $err if $err;
389 }
390
391 my $last_rrd_dump = 0;
392 my $last_rrd_data = "";
393
394 sub rrd_dump {
395
396 my $ctime = time();
397
398 my $diff = $ctime - $last_rrd_dump;
399 if ($diff < 2) {
400 return $last_rrd_data;
401 }
402
403 my $raw;
404 eval {
405 $raw = &$ipcc_send_rec(CFS_IPC_GET_RRD_DUMP);
406 };
407 my $err = $@;
408
409 if ($err) {
410 warn $err;
411 return {};
412 }
413
414 my $res = {};
415
416 if ($raw) {
417 while ($raw =~ s/^(.*)\n//) {
418 my ($key, @ela) = split(/:/, $1);
419 next if !$key;
420 next if !(scalar(@ela) > 1);
421 $res->{$key} = [ map { $_ eq 'U' ? undef : $_ } @ela ];
422 }
423 }
424
425 $last_rrd_dump = $ctime;
426 $last_rrd_data = $res;
427
428 return $res;
429 }
430
431
432 # a fast way to read files (avoid fuse overhead)
433 sub get_config {
434 my ($path) = @_;
435
436 return &$ipcc_get_config($path);
437 }
438
439 sub get_cluster_log {
440 my ($user, $max) = @_;
441
442 return &$ipcc_get_cluster_log($user, $max);
443 }
444
445 my $file_info = {};
446
447 sub cfs_register_file {
448 my ($filename, $parser, $writer) = @_;
449
450 $observed->{$filename} || die "unknown file '$filename'";
451
452 die "file '$filename' already registered" if $file_info->{$filename};
453
454 $file_info->{$filename} = {
455 parser => $parser,
456 writer => $writer,
457 };
458 }
459
460 my $ccache_read = sub {
461 my ($filename, $parser, $version) = @_;
462
463 $ccache->{$filename} = {} if !$ccache->{$filename};
464
465 my $ci = $ccache->{$filename};
466
467 if (!$ci->{version} || !$version || $ci->{version} != $version) {
468 # we always call the parser, even when the file does not exists
469 # (in that case $data is undef)
470 my $data = get_config($filename);
471 $ci->{data} = &$parser("/etc/pve/$filename", $data);
472 $ci->{version} = $version;
473 }
474
475 my $res = ref($ci->{data}) ? dclone($ci->{data}) : $ci->{data};
476
477 return $res;
478 };
479
480 sub cfs_file_version {
481 my ($filename) = @_;
482
483 my $version;
484 my $infotag;
485 if ($filename =~ m!^nodes/[^/]+/(openvz|lxc|qemu-server)/(\d+)\.conf$!) {
486 my ($type, $vmid) = ($1, $2);
487 if ($vmlist && $vmlist->{ids} && $vmlist->{ids}->{$vmid}) {
488 $version = $vmlist->{ids}->{$vmid}->{version};
489 }
490 $infotag = "/$type/";
491 } else {
492 $infotag = $filename;
493 $version = $versions->{$filename};
494 }
495
496 my $info = $file_info->{$infotag} ||
497 die "unknown file type '$filename'\n";
498
499 return wantarray ? ($version, $info) : $version;
500 }
501
502 sub cfs_read_file {
503 my ($filename) = @_;
504
505 my ($version, $info) = cfs_file_version($filename);
506 my $parser = $info->{parser};
507
508 return &$ccache_read($filename, $parser, $version);
509 }
510
511 sub cfs_write_file {
512 my ($filename, $data) = @_;
513
514 my ($version, $info) = cfs_file_version($filename);
515
516 my $writer = $info->{writer} || die "no writer defined";
517
518 my $fsname = "/etc/pve/$filename";
519
520 my $raw = &$writer($fsname, $data);
521
522 if (my $ci = $ccache->{$filename}) {
523 $ci->{version} = undef;
524 }
525
526 PVE::Tools::file_set_contents($fsname, $raw);
527 }
528
529 my $cfs_lock = sub {
530 my ($lockid, $timeout, $code, @param) = @_;
531
532 my $prev_alarm = alarm(0); # suspend outer alarm early
533
534 my $res;
535 my $got_lock = 0;
536
537 # this timeout is for acquire the lock
538 $timeout = 10 if !$timeout;
539
540 my $filename = "$lockdir/$lockid";
541
542 eval {
543
544 mkdir $lockdir;
545
546 if (! -d $lockdir) {
547 die "pve cluster filesystem not online.\n";
548 }
549
550 my $timeout_err = sub { die "got lock request timeout\n"; };
551 local $SIG{ALRM} = $timeout_err;
552
553 while (1) {
554 alarm ($timeout);
555 $got_lock = mkdir($filename);
556 $timeout = alarm(0) - 1; # we'll sleep for 1s, see down below
557
558 last if $got_lock;
559
560 $timeout_err->() if $timeout <= 0;
561
562 print STDERR "trying to acquire cfs lock '$lockid' ...\n";
563 utime (0, 0, $filename); # cfs unlock request
564 sleep(1);
565 }
566
567 # fixed command timeout: cfs locks have a timeout of 120
568 # using 60 gives us another 60 seconds to abort the task
569 local $SIG{ALRM} = sub { die "got lock timeout - aborting command\n"; };
570 alarm(60);
571
572 cfs_update(); # make sure we read latest versions inside code()
573
574 $res = &$code(@param);
575
576 alarm(0);
577 };
578
579 my $err = $@;
580
581 $err = "no quorum!\n" if !$got_lock && !check_cfs_quorum(1);
582
583 rmdir $filename if $got_lock; # if we held the lock always unlock again
584
585 alarm($prev_alarm);
586
587 if ($err) {
588 $@ = "error with cfs lock '$lockid': $err";
589 return undef;
590 }
591
592 $@ = undef;
593
594 return $res;
595 };
596
597 sub cfs_lock_file {
598 my ($filename, $timeout, $code, @param) = @_;
599
600 my $info = $observed->{$filename} || die "unknown file '$filename'";
601
602 my $lockid = "file-$filename";
603 $lockid =~ s/[.\/]/_/g;
604
605 &$cfs_lock($lockid, $timeout, $code, @param);
606 }
607
608 sub cfs_lock_storage {
609 my ($storeid, $timeout, $code, @param) = @_;
610
611 my $lockid = "storage-$storeid";
612
613 &$cfs_lock($lockid, $timeout, $code, @param);
614 }
615
616 sub cfs_lock_domain {
617 my ($domainname, $timeout, $code, @param) = @_;
618
619 my $lockid = "domain-$domainname";
620
621 &$cfs_lock($lockid, $timeout, $code, @param);
622 }
623
624 sub cfs_lock_acme {
625 my ($account, $timeout, $code, @param) = @_;
626
627 my $lockid = "acme-$account";
628
629 &$cfs_lock($lockid, $timeout, $code, @param);
630 }
631
632 sub cfs_lock_authkey {
633 my ($timeout, $code, @param) = @_;
634
635 $cfs_lock->('authkey', $timeout, $code, @param);
636 }
637
638 my $log_levels = {
639 "emerg" => 0,
640 "alert" => 1,
641 "crit" => 2,
642 "critical" => 2,
643 "err" => 3,
644 "error" => 3,
645 "warn" => 4,
646 "warning" => 4,
647 "notice" => 5,
648 "info" => 6,
649 "debug" => 7,
650 };
651
652 sub log_msg {
653 my ($priority, $ident, $msg) = @_;
654
655 if (my $tmp = $log_levels->{$priority}) {
656 $priority = $tmp;
657 }
658
659 die "need numeric log priority" if $priority !~ /^\d+$/;
660
661 my $tag = PVE::SafeSyslog::tag();
662
663 $msg = "empty message" if !$msg;
664
665 $ident = "" if !$ident;
666 $ident = encode("ascii", $ident,
667 sub { sprintf "\\u%04x", shift });
668
669 my $ascii = encode("ascii", $msg, sub { sprintf "\\u%04x", shift });
670
671 if ($ident) {
672 syslog($priority, "<%s> %s", $ident, $ascii);
673 } else {
674 syslog($priority, "%s", $ascii);
675 }
676
677 eval { &$ipcc_log($priority, $ident, $tag, $ascii); };
678
679 syslog("err", "writing cluster log failed: $@") if $@;
680 }
681
682 sub check_vmid_unused {
683 my ($vmid, $noerr) = @_;
684
685 my $vmlist = get_vmlist();
686
687 my $d = $vmlist->{ids}->{$vmid};
688 return 1 if !defined($d);
689
690 return undef if $noerr;
691
692 my $vmtypestr = $d->{type} eq 'qemu' ? 'VM' : 'CT';
693 die "$vmtypestr $vmid already exists on node '$d->{node}'\n";
694 }
695
696 sub check_node_exists {
697 my ($nodename, $noerr) = @_;
698
699 my $nodelist = $clinfo->{nodelist};
700 return 1 if $nodelist && $nodelist->{$nodename};
701
702 return undef if $noerr;
703
704 die "no such cluster node '$nodename'\n";
705 }
706
707 # this is also used to get the IP of the local node
708 sub remote_node_ip {
709 my ($nodename, $noerr) = @_;
710
711 my $nodelist = $clinfo->{nodelist};
712 if ($nodelist && $nodelist->{$nodename}) {
713 if (my $ip = $nodelist->{$nodename}->{ip}) {
714 return $ip if !wantarray;
715 my $family = $nodelist->{$nodename}->{address_family};
716 if (!$family) {
717 $nodelist->{$nodename}->{address_family} =
718 $family =
719 PVE::Tools::get_host_address_family($ip);
720 }
721 return wantarray ? ($ip, $family) : $ip;
722 }
723 }
724
725 # fallback: try to get IP by other means
726 return PVE::Network::get_ip_from_hostname($nodename, $noerr);
727 }
728
729 sub get_node_fingerprint {
730 my ($node) = @_;
731
732 my $cert_path = "/etc/pve/nodes/$node/pve-ssl.pem";
733 my $custom_cert_path = "/etc/pve/nodes/$node/pveproxy-ssl.pem";
734
735 $cert_path = $custom_cert_path if -f $custom_cert_path;
736
737 return PVE::Certificate::get_certificate_fingerprint($cert_path);
738 }
739
740 # bash completion helpers
741
742 sub complete_next_vmid {
743
744 my $vmlist = get_vmlist() || {};
745 my $idlist = $vmlist->{ids} || {};
746
747 for (my $i = 100; $i < 10000; $i++) {
748 return [$i] if !defined($idlist->{$i});
749 }
750
751 return [];
752 }
753
754 sub complete_vmid {
755
756 my $vmlist = get_vmlist();
757 my $ids = $vmlist->{ids} || {};
758
759 return [ keys %$ids ];
760 }
761
762 sub complete_local_vmid {
763
764 my $vmlist = get_vmlist();
765 my $ids = $vmlist->{ids} || {};
766
767 my $nodename = PVE::INotify::nodename();
768
769 my $res = [];
770 foreach my $vmid (keys %$ids) {
771 my $d = $ids->{$vmid};
772 next if !$d->{node} || $d->{node} ne $nodename;
773 push @$res, $vmid;
774 }
775
776 return $res;
777 }
778
779 sub complete_migration_target {
780
781 my $res = [];
782
783 my $nodename = PVE::INotify::nodename();
784
785 my $nodelist = get_nodelist();
786 foreach my $node (@$nodelist) {
787 next if $node eq $nodename;
788 push @$res, $node;
789 }
790
791 return $res;
792 }
793
794
795 # NOTE: filesystem must be offline here, no DB changes allowed
796 sub cfs_backup_database {
797 mkdir $dbbackupdir;
798
799 my $ctime = time();
800 my $backup_fn = "$dbbackupdir/config-$ctime.sql.gz";
801
802 print "backup old database to '$backup_fn'\n";
803
804 my $cmd = [ ['sqlite3', $dbfile, '.dump'], ['gzip', '-', \ ">${backup_fn}"] ];
805 run_command($cmd, 'errmsg' => "cannot backup old database\n");
806
807 my $maxfiles = 10; # purge older backup
808 my $backups = [ sort { $b cmp $a } <$dbbackupdir/config-*.sql.gz> ];
809
810 if ((my $count = scalar(@$backups)) > $maxfiles) {
811 foreach my $f (@$backups[$maxfiles..$count-1]) {
812 next if $f !~ m/^(\S+)$/; # untaint
813 print "delete old backup '$1'\n";
814 unlink $1;
815 }
816 }
817
818 return $dbfile;
819 }
820
821 1;