]> git.proxmox.com Git - pve-cluster.git/blob - data/src/status.c
628015af92dde8f7b3766a9e3febd7626d096d3f
[pve-cluster.git] / data / src / status.c
1 /*
2 Copyright (C) 2010 - 2020 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "status"
22
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /* HAVE_CONFIG_H */

#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <glib.h>
#include <sys/stat.h>
#include <sys/syslog.h>
#include <rrd.h>
#include <rrd_client.h>
#include <time.h>
#include <ctype.h>

#include "cfs-utils.h"
#include "status.h"
#include "memdb.h"
#include "logger.h"
42
/* CPG group name for the kvstore (presumably used when the kvstore dfsm is
 * set up; that code is not part of this view). */
#define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"

/* Message types sent over the kvstore dfsm. KVSTORE_MESSAGE_LOG carries one
 * clog_entry_t (see cfs_cluster_log()); the UPDATE messages are handled
 * outside this view. */
typedef enum {
	KVSTORE_MESSAGE_UPDATE = 1,
	KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
	KVSTORE_MESSAGE_LOG = 3,
} kvstore_message_t;

/* Source for vminfo_t.version; pre-incremented in vmlist_hash_insert_vm(). */
static uint32_t vminfo_version_counter;

/* One VM list entry; the vmlist hash uses &vmid as key. */
typedef struct {
	uint32_t vmid;
	char *nodename;   /* owned copy, released by vminfo_free() */
	int vmtype;       /* one of the VMTYPE_* constants */
	uint32_t version; /* vminfo_version_counter value at insertion */
} vminfo_t;

/* One key/value store entry (see kventry_hash_set()). */
typedef struct {
	char *key;        /* owned; also used as the hash table key */
	gpointer data;    /* owned copy of the value */
	size_t len;       /* size of data in bytes */
	uint32_t version; /* bumped each time an existing key is overwritten */
} kventry_t;

/* One cached RRD update line (see cfs_rrd_dump()). */
typedef struct {
	char *key;
	gpointer data;
	size_t len;
	uint32_t time;    /* compared against time() by rrd_entry_is_old() */
} rrdentry_t;

/* Change counter for one tracked memdb file. */
typedef struct {
	char *path;
	uint32_t version; /* bumped by record_memdb_change()/record_memdb_reload() */
} memdb_change_t;
78
/* Files whose change counters are tracked. cfs_status_init() indexes them by
 * exact path for record_memdb_change(), and cfs_create_version_msg() reports
 * them in this order. */
static memdb_change_t memdb_change_array[] = {
	{ .path = "corosync.conf" },
	{ .path = "corosync.conf.new" },
	{ .path = "storage.cfg" },
	{ .path = "user.cfg" },
	{ .path = "domains.cfg" },
	{ .path = "priv/shadow.cfg" },
	{ .path = "priv/acme/plugins.cfg" },
	{ .path = "priv/tfa.cfg" },
	{ .path = "priv/token.cfg" },
	{ .path = "priv/ipam.db" },
	{ .path = "datacenter.cfg" },
	{ .path = "vzdump.cron" },
	{ .path = "ha/crm_commands" },
	{ .path = "ha/manager_status" },
	{ .path = "ha/resources.cfg" },
	{ .path = "ha/groups.cfg" },
	{ .path = "ha/fence.cfg" },
	{ .path = "status.cfg" },
	{ .path = "replication.cfg" },
	{ .path = "ceph.conf" },
	{ .path = "sdn/vnets.cfg" },
	{ .path = "sdn/zones.cfg" },
	{ .path = "sdn/controllers.cfg" },
	{ .path = "sdn/subnets.cfg" },
	{ .path = "sdn/ipams.cfg" },
	{ .path = "sdn/.version" },
	{ .path = "virtual-guest/cpu-models.conf" },
};
108
/* Taken by most functions in this file that read or modify cfs_status. */
static GMutex mutex;

/* Global cluster status state of this process. */
typedef struct {
	time_t start_time;            /* set by cfs_status_init() */

	uint32_t quorate;             /* reported as "quorate" in the member list */

	cfs_clinfo_t *clinfo;         /* current cluster configuration (owned) */
	uint32_t clinfo_version;      /* bumped on every clinfo change/cleanup */

	GHashTable *vmlist;           /* vmid -> vminfo_t */
	uint32_t vmlist_version;      /* bumped on every vmlist modification */

	dfsm_t *kvstore;              /* used to broadcast cluster log entries */
	GHashTable *kvhash;           /* local node's kventry_t table */
	GHashTable *rrdhash;          /* key -> rrdentry_t */
	GHashTable *iphash;           /* node name -> IP string (keys/values owned) */

	GHashTable *memdb_changes;    /* path -> entry of memdb_change_array */

	clusterlog_t *clusterlog;
} cfs_status_t;

static cfs_status_t cfs_status;

/* One cluster member. Owned by cfs_clinfo.nodes_byid. */
struct cfs_clnode {
	char *name;
	uint32_t nodeid;
	uint32_t votes;
	gboolean online;       /* carried over by cfs_status_set_clinfo() */
	GHashTable *kvhash;    /* per-node kventry_t table, may be NULL */
};

/* Cluster configuration snapshot. */
struct cfs_clinfo {
	char *cluster_name;
	uint32_t cman_version;

	GHashTable *nodes_byid;   /* &nodeid -> cfs_clnode_t, owns the nodes */
	GHashTable *nodes_byname; /* name -> cfs_clnode_t, borrowed index */
};
149
150 static guint
151 g_int32_hash (gconstpointer v)
152 {
153 return *(const uint32_t *) v;
154 }
155
156 static gboolean
157 g_int32_equal (gconstpointer v1,
158 gconstpointer v2)
159 {
160 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
161 }
162
163 static void vminfo_free(vminfo_t *vminfo)
164 {
165 g_return_if_fail(vminfo != NULL);
166
167 if (vminfo->nodename)
168 g_free(vminfo->nodename);
169
170
171 g_free(vminfo);
172 }
173
174 static const char *vminfo_type_to_string(vminfo_t *vminfo)
175 {
176 if (vminfo->vmtype == VMTYPE_QEMU) {
177 return "qemu";
178 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
179 // FIXME: remove openvz stuff for 7.x
180 return "openvz";
181 } else if (vminfo->vmtype == VMTYPE_LXC) {
182 return "lxc";
183 } else {
184 return "unknown";
185 }
186 }
187
188 static const char *vminfo_type_to_path_type(vminfo_t *vminfo)
189 {
190 if (vminfo->vmtype == VMTYPE_QEMU) {
191 return "qemu-server"; // special case..
192 } else {
193 return vminfo_type_to_string(vminfo);
194 }
195 }
196
197 int vminfo_to_path(vminfo_t *vminfo, GString *path)
198 {
199 g_return_val_if_fail(vminfo != NULL, -1);
200 g_return_val_if_fail(path != NULL, -1);
201
202 if (!vminfo->nodename)
203 return 0;
204
205 const char *type = vminfo_type_to_path_type(vminfo);
206 g_string_printf(path, "/nodes/%s/%s/%u.conf", vminfo->nodename, type, vminfo->vmid);
207
208 return 1;
209 }
210
211 void cfs_clnode_destroy(
212 cfs_clnode_t *clnode)
213 {
214 g_return_if_fail(clnode != NULL);
215
216 if (clnode->kvhash)
217 g_hash_table_destroy(clnode->kvhash);
218
219 if (clnode->name)
220 g_free(clnode->name);
221
222 g_free(clnode);
223 }
224
225 cfs_clnode_t *cfs_clnode_new(
226 const char *name,
227 uint32_t nodeid,
228 uint32_t votes)
229 {
230 g_return_val_if_fail(name != NULL, NULL);
231
232 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
233 if (!clnode)
234 return NULL;
235
236 clnode->name = g_strdup(name);
237 clnode->nodeid = nodeid;
238 clnode->votes = votes;
239
240 return clnode;
241 }
242
243 gboolean cfs_clinfo_destroy(
244 cfs_clinfo_t *clinfo)
245 {
246 g_return_val_if_fail(clinfo != NULL, FALSE);
247
248 if (clinfo->cluster_name)
249 g_free(clinfo->cluster_name);
250
251 if (clinfo->nodes_byname)
252 g_hash_table_destroy(clinfo->nodes_byname);
253
254 if (clinfo->nodes_byid)
255 g_hash_table_destroy(clinfo->nodes_byid);
256
257 g_free(clinfo);
258
259 return TRUE;
260 }
261
262 cfs_clinfo_t *cfs_clinfo_new(
263 const char *cluster_name,
264 uint32_t cman_version)
265 {
266 g_return_val_if_fail(cluster_name != NULL, NULL);
267
268 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
269 if (!clinfo)
270 return NULL;
271
272 clinfo->cluster_name = g_strdup(cluster_name);
273 clinfo->cman_version = cman_version;
274
275 if (!(clinfo->nodes_byid = g_hash_table_new_full(
276 g_int32_hash, g_int32_equal, NULL,
277 (GDestroyNotify)cfs_clnode_destroy)))
278 goto fail;
279
280 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
281 goto fail;
282
283 return clinfo;
284
285 fail:
286 cfs_clinfo_destroy(clinfo);
287
288 return NULL;
289 }
290
291 gboolean cfs_clinfo_add_node(
292 cfs_clinfo_t *clinfo,
293 cfs_clnode_t *clnode)
294 {
295 g_return_val_if_fail(clinfo != NULL, FALSE);
296 g_return_val_if_fail(clnode != NULL, FALSE);
297
298 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
299 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
300
301 return TRUE;
302 }
303
304 int
305 cfs_create_memberlist_msg(
306 GString *str)
307 {
308 g_return_val_if_fail(str != NULL, -EINVAL);
309
310 g_mutex_lock (&mutex);
311
312 g_string_append_printf(str,"{\n");
313
314 guint nodecount = 0;
315
316 cfs_clinfo_t *clinfo = cfs_status.clinfo;
317
318 if (clinfo && clinfo->nodes_byid)
319 nodecount = g_hash_table_size(clinfo->nodes_byid);
320
321 if (nodecount) {
322 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
323 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
324
325 g_string_append_printf(str, "\"cluster\": { ");
326 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
327 "\"nodes\": %d, \"quorate\": %d ",
328 clinfo->cluster_name, clinfo->cman_version,
329 nodecount, cfs_status.quorate);
330
331 g_string_append_printf(str,"},\n");
332 g_string_append_printf(str,"\"nodelist\": {\n");
333
334 GHashTable *ht = clinfo->nodes_byid;
335 GHashTableIter iter;
336 gpointer key, value;
337
338 g_hash_table_iter_init (&iter, ht);
339
340 int i = 0;
341 while (g_hash_table_iter_next (&iter, &key, &value)) {
342 cfs_clnode_t *node = (cfs_clnode_t *)value;
343 if (i) g_string_append_printf(str, ",\n");
344 i++;
345
346 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
347 node->name, node->nodeid, node->online);
348
349
350 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
351 if (ip) {
352 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
353 }
354
355 g_string_append_printf(str, "}");
356
357 }
358 g_string_append_printf(str,"\n }\n");
359 } else {
360 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
361 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
362 }
363
364 g_string_append_printf(str,"}\n");
365
366 g_mutex_unlock (&mutex);
367
368 return 0;
369 }
370
371 static void
372 kventry_free(kventry_t *entry)
373 {
374 g_return_if_fail(entry != NULL);
375
376 g_free(entry->key);
377 g_free(entry->data);
378 g_free(entry);
379 }
380
381 static GHashTable *
382 kventry_hash_new(void)
383 {
384 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
385 (GDestroyNotify)kventry_free);
386 }
387
388 static void
389 rrdentry_free(rrdentry_t *entry)
390 {
391 g_return_if_fail(entry != NULL);
392
393 g_free(entry->key);
394 g_free(entry->data);
395 g_free(entry);
396 }
397
398 static GHashTable *
399 rrdentry_hash_new(void)
400 {
401 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
402 (GDestroyNotify)rrdentry_free);
403 }
404
405 void
406 cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
407 {
408 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
409 }
410
411 void
412 cfs_cluster_log(clog_entry_t *entry)
413 {
414 g_return_if_fail(entry != NULL);
415
416 clusterlog_insert(cfs_status.clusterlog, entry);
417
418 if (cfs_status.kvstore) {
419 struct iovec iov[1];
420 iov[0].iov_base = (char *)entry;
421 iov[0].iov_len = clog_entry_size(entry);
422
423 if (dfsm_is_initialized(cfs_status.kvstore))
424 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
425 }
426 }
427
428 void cfs_status_init(void)
429 {
430 g_mutex_lock (&mutex);
431
432 cfs_status.start_time = time(NULL);
433
434 cfs_status.vmlist = vmlist_hash_new();
435
436 cfs_status.kvhash = kventry_hash_new();
437
438 cfs_status.rrdhash = rrdentry_hash_new();
439
440 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
441
442 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
443
444 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
445 g_hash_table_replace(cfs_status.memdb_changes,
446 memdb_change_array[i].path,
447 &memdb_change_array[i]);
448 }
449
450 cfs_status.clusterlog = clusterlog_new();
451
452 // fixme:
453 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
454 LOG_INFO, "starting cluster log");
455
456 g_mutex_unlock (&mutex);
457 }
458
459 void cfs_status_cleanup(void)
460 {
461 g_mutex_lock (&mutex);
462
463 cfs_status.clinfo_version++;
464
465 if (cfs_status.clinfo) {
466 cfs_clinfo_destroy(cfs_status.clinfo);
467 cfs_status.clinfo = NULL;
468 }
469
470 if (cfs_status.vmlist) {
471 g_hash_table_destroy(cfs_status.vmlist);
472 cfs_status.vmlist = NULL;
473 }
474
475 if (cfs_status.kvhash) {
476 g_hash_table_destroy(cfs_status.kvhash);
477 cfs_status.kvhash = NULL;
478 }
479
480 if (cfs_status.rrdhash) {
481 g_hash_table_destroy(cfs_status.rrdhash);
482 cfs_status.rrdhash = NULL;
483 }
484
485 if (cfs_status.iphash) {
486 g_hash_table_destroy(cfs_status.iphash);
487 cfs_status.iphash = NULL;
488 }
489
490 if (cfs_status.clusterlog)
491 clusterlog_destroy(cfs_status.clusterlog);
492
493 g_mutex_unlock (&mutex);
494 }
495
496 void cfs_status_set_clinfo(
497 cfs_clinfo_t *clinfo)
498 {
499 g_return_if_fail(clinfo != NULL);
500
501 g_mutex_lock (&mutex);
502
503 cfs_status.clinfo_version++;
504
505 cfs_clinfo_t *old = cfs_status.clinfo;
506
507 cfs_status.clinfo = clinfo;
508
509 cfs_message("update cluster info (cluster name %s, version = %d)",
510 clinfo->cluster_name, clinfo->cman_version);
511
512
513 if (old && old->nodes_byid && clinfo->nodes_byid) {
514 /* copy kvstore */
515 GHashTable *ht = clinfo->nodes_byid;
516 GHashTableIter iter;
517 gpointer key, value;
518
519 g_hash_table_iter_init (&iter, ht);
520
521 while (g_hash_table_iter_next (&iter, &key, &value)) {
522 cfs_clnode_t *node = (cfs_clnode_t *)value;
523 cfs_clnode_t *oldnode;
524 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
525 node->online = oldnode->online;
526 node->kvhash = oldnode->kvhash;
527 oldnode->kvhash = NULL;
528 }
529 }
530
531 }
532
533 if (old)
534 cfs_clinfo_destroy(old);
535
536
537 g_mutex_unlock (&mutex);
538 }
539
540 static void
541 dump_kvstore_versions(
542 GString *str,
543 GHashTable *kvhash,
544 const char *nodename)
545 {
546 g_return_if_fail(kvhash != NULL);
547 g_return_if_fail(str != NULL);
548 g_return_if_fail(nodename != NULL);
549
550 GHashTable *ht = kvhash;
551 GHashTableIter iter;
552 gpointer key, value;
553
554 g_string_append_printf(str, "\"%s\": {\n", nodename);
555
556 g_hash_table_iter_init (&iter, ht);
557
558 int i = 0;
559 while (g_hash_table_iter_next (&iter, &key, &value)) {
560 kventry_t *entry = (kventry_t *)value;
561 if (i) g_string_append_printf(str, ",\n");
562 i++;
563 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
564 }
565
566 g_string_append_printf(str, "}\n");
567 }
568
569 int
570 cfs_create_version_msg(GString *str)
571 {
572 g_return_val_if_fail(str != NULL, -EINVAL);
573
574 g_mutex_lock (&mutex);
575
576 g_string_append_printf(str,"{\n");
577
578 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
579
580 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
581
582 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
583
584 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
585 g_string_append_printf(str, "\"%s\": %u,\n",
586 memdb_change_array[i].path,
587 memdb_change_array[i].version);
588 }
589
590 g_string_append_printf(str, "\"kvstore\": {\n");
591
592 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
593
594 cfs_clinfo_t *clinfo = cfs_status.clinfo;
595
596 if (clinfo && clinfo->nodes_byid) {
597 GHashTable *ht = clinfo->nodes_byid;
598 GHashTableIter iter;
599 gpointer key, value;
600
601 g_hash_table_iter_init (&iter, ht);
602
603 while (g_hash_table_iter_next (&iter, &key, &value)) {
604 cfs_clnode_t *node = (cfs_clnode_t *)value;
605 if (!node->kvhash)
606 continue;
607 g_string_append_printf(str, ",\n");
608 dump_kvstore_versions(str, node->kvhash, node->name);
609 }
610 }
611
612 g_string_append_printf(str,"}\n");
613
614 g_string_append_printf(str,"}\n");
615
616 g_mutex_unlock (&mutex);
617
618 return 0;
619 }
620
621 GHashTable *
622 vmlist_hash_new(void)
623 {
624 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
625 (GDestroyNotify)vminfo_free);
626 }
627
628 gboolean
629 vmlist_hash_insert_vm(
630 GHashTable *vmlist,
631 int vmtype,
632 guint32 vmid,
633 const char *nodename,
634 gboolean replace)
635 {
636 g_return_val_if_fail(vmlist != NULL, FALSE);
637 g_return_val_if_fail(nodename != NULL, FALSE);
638 g_return_val_if_fail(vmid != 0, FALSE);
639 // FIXME: remove openvz stuff for 7.x
640 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
641 vmtype == VMTYPE_LXC, FALSE);
642
643 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
644 cfs_critical("detected duplicate VMID %d", vmid);
645 return FALSE;
646 }
647
648 vminfo_t *vminfo = g_new0(vminfo_t, 1);
649
650 vminfo->vmid = vmid;
651 vminfo->vmtype = vmtype;
652 vminfo->nodename = g_strdup(nodename);
653
654 vminfo->version = ++vminfo_version_counter;
655
656 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
657
658 return TRUE;
659 }
660
661 void
662 vmlist_register_vm(
663 int vmtype,
664 guint32 vmid,
665 const char *nodename)
666 {
667 g_return_if_fail(cfs_status.vmlist != NULL);
668 g_return_if_fail(nodename != NULL);
669 g_return_if_fail(vmid != 0);
670 // FIXME: remove openvz stuff for 7.x
671 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
672 vmtype == VMTYPE_LXC);
673
674 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
675
676 g_mutex_lock (&mutex);
677
678 cfs_status.vmlist_version++;
679
680 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
681
682 g_mutex_unlock (&mutex);
683 }
684
685 gboolean
686 vmlist_different_vm_exists(
687 int vmtype,
688 guint32 vmid,
689 const char *nodename)
690 {
691 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
692 g_return_val_if_fail(vmid != 0, FALSE);
693
694 gboolean res = FALSE;
695
696 g_mutex_lock (&mutex);
697
698 vminfo_t *vminfo;
699 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
700 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
701 res = TRUE;
702 }
703 g_mutex_unlock (&mutex);
704
705 return res;
706 }
707
708 gboolean
709 vmlist_vm_exists(
710 guint32 vmid)
711 {
712 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
713 g_return_val_if_fail(vmid != 0, FALSE);
714
715 g_mutex_lock (&mutex);
716
717 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
718
719 g_mutex_unlock (&mutex);
720
721 return res != NULL;
722 }
723
724 void
725 vmlist_delete_vm(
726 guint32 vmid)
727 {
728 g_return_if_fail(cfs_status.vmlist != NULL);
729 g_return_if_fail(vmid != 0);
730
731 g_mutex_lock (&mutex);
732
733 cfs_status.vmlist_version++;
734
735 g_hash_table_remove(cfs_status.vmlist, &vmid);
736
737 g_mutex_unlock (&mutex);
738 }
739
740 void cfs_status_set_vmlist(
741 GHashTable *vmlist)
742 {
743 g_return_if_fail(vmlist != NULL);
744
745 g_mutex_lock (&mutex);
746
747 cfs_status.vmlist_version++;
748
749 if (cfs_status.vmlist)
750 g_hash_table_destroy(cfs_status.vmlist);
751
752 cfs_status.vmlist = vmlist;
753
754 g_mutex_unlock (&mutex);
755 }
756
757 int
758 cfs_create_vmlist_msg(GString *str)
759 {
760 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
761 g_return_val_if_fail(str != NULL, -EINVAL);
762
763 g_mutex_lock (&mutex);
764
765 g_string_append_printf(str,"{\n");
766
767 GHashTable *ht = cfs_status.vmlist;
768
769 guint count = g_hash_table_size(ht);
770
771 if (!count) {
772 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
773 } else {
774 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
775
776 g_string_append_printf(str,"\"ids\": {\n");
777
778 GHashTableIter iter;
779 gpointer key, value;
780
781 g_hash_table_iter_init (&iter, ht);
782
783 int first = 1;
784 while (g_hash_table_iter_next (&iter, &key, &value)) {
785 vminfo_t *vminfo = (vminfo_t *)value;
786 const char *type = vminfo_type_to_string(vminfo);
787
788 if (!first)
789 g_string_append_printf(str, ",\n");
790 first = 0;
791
792 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
793 vminfo->vmid, vminfo->nodename, type, vminfo->version);
794 }
795
796 g_string_append_printf(str,"}\n");
797 }
798 g_string_append_printf(str,"\n}\n");
799
800 g_mutex_unlock (&mutex);
801
802 return 0;
803 }
804
// Checks the config for a line starting with '$prop:' and returns the value
// after it, with leading and trailing whitespace removed. We only deal with
// the format restrictions imposed by our Perl VM config parser (main
// reference: PVE::QemuServer::parse_vm_config), which allows this to be very
// fast and still relatively simple.
// The main restriction used to our advantage is that properties match the regex:
//   ($line =~ m/^([a-z][a-z_]*\d*):\s*(.+?)\s*$/) from parse_vm_config
// Currently we only look at the current configuration in place, i.e., *no*
// snapshots and *no* pending changes.
//
// NOTE: conf is modified in place (line ends and the value end are replaced
// by '\0'), and the returned pointer points into conf. Only lines terminated
// by '\n' are considered. Returns NULL if the property is not found or empty.
static char *
_get_property_value(char *conf, int conf_size, const char *prop, int prop_len)
{
	const char *const conf_end = conf + conf_size;
	char *line = conf;
	size_t remaining_size;

	char *next_newline = memchr(conf, '\n', conf_size);
	if (next_newline == NULL) {
		return NULL; // valid property lines end with \n, but none in the config
	}
	*next_newline = '\0';

	while (line != NULL) {
		if (!line[0]) goto next;

		// snapshot or pending section start, but nothing found yet -> not found
		if (line[0] == '[') return NULL;
		// properties start with /^[a-z]/, so continue early if not
		if (line[0] < 'a' || line[0] > 'z') goto next;

		int line_len = strlen(line);
		if (line_len <= prop_len + 1) goto next;

		if (line[prop_len] == ':' && memcmp(line, prop, prop_len) == 0) { // found
			char *v_start = &line[prop_len + 1];

			// drop initial value whitespaces here already; the cast keeps
			// isspace() defined for non-ASCII (negative char) bytes
			while (*v_start && isspace((unsigned char)*v_start)) v_start++;

			if (!*v_start) return NULL;

			char *v_end = &line[line_len - 1];
			while (v_end > v_start && isspace((unsigned char)*v_end)) v_end--;
			v_end[1] = '\0';

			return v_start;
		}
next:
		line = next_newline + 1;
		remaining_size = conf_end - line;
		if (remaining_size <= (size_t)prop_len) {
			return NULL;
		}
		next_newline = memchr(line, '\n', remaining_size);
		if (next_newline == NULL) {
			return NULL; // valid property lines end with \n, but none in the config
		}
		*next_newline = '\0';
	}

	return NULL; // not found
}
867
868 static void
869 _g_str_append_kv_jsonescaped(GString *str, const char *k, const char *v)
870 {
871 g_string_append_printf(str, "\"%s\": \"", k);
872
873 for (; *v; v++) {
874 if (*v == '\\' || *v == '"') {
875 g_string_append_c(str, '\\');
876 }
877 g_string_append_c(str, *v);
878 }
879
880 g_string_append_c(str, '"');
881 }
882
883 int
884 cfs_create_guest_conf_property_msg(GString *str, memdb_t *memdb, const char *prop, uint32_t vmid)
885 {
886 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
887 g_return_val_if_fail(str != NULL, -EINVAL);
888
889 int prop_len = strlen(prop);
890 int res = 0;
891 GString *path = NULL;
892
893 // Prelock &memdb->mutex in order to enable the usage of memdb_read_nolock
894 // to prevent Deadlocks as in #2553
895 g_mutex_lock (&memdb->mutex);
896 g_mutex_lock (&mutex);
897
898 g_string_printf(str,"{\n");
899
900 GHashTable *ht = cfs_status.vmlist;
901 gpointer tmp = NULL;
902 if (!g_hash_table_size(ht)) {
903 goto ret;
904 }
905
906 if ((path = g_string_sized_new(256)) == NULL) {
907 res = -ENOMEM;
908 goto ret;
909 }
910
911 if (vmid >= 100) {
912 vminfo_t *vminfo = (vminfo_t *) g_hash_table_lookup(cfs_status.vmlist, &vmid);
913 if (vminfo == NULL) goto enoent;
914
915 if (!vminfo_to_path(vminfo, path)) goto err;
916
917 // use memdb_read_nolock because lock is handled here
918 int size = memdb_read_nolock(memdb, path->str, &tmp);
919 if (tmp == NULL) goto err;
920 if (size <= prop_len) goto ret;
921
922 char *val = _get_property_value(tmp, size, prop, prop_len);
923 if (val == NULL) goto ret;
924
925 g_string_append_printf(str, "\"%u\":{", vmid);
926 _g_str_append_kv_jsonescaped(str, prop, val);
927 g_string_append_c(str, '}');
928
929 } else {
930 GHashTableIter iter;
931 g_hash_table_iter_init (&iter, ht);
932
933 gpointer key, value;
934 int first = 1;
935 while (g_hash_table_iter_next (&iter, &key, &value)) {
936 vminfo_t *vminfo = (vminfo_t *)value;
937
938 if (!vminfo_to_path(vminfo, path)) goto err;
939
940 g_free(tmp); // no-op if already null
941 tmp = NULL;
942 // use memdb_read_nolock because lock is handled here
943 int size = memdb_read_nolock(memdb, path->str, &tmp);
944 if (tmp == NULL || size <= prop_len) continue;
945
946 char *val = _get_property_value(tmp, size, prop, prop_len);
947 if (val == NULL) continue;
948
949 if (!first) g_string_append_printf(str, ",\n");
950 else first = 0;
951
952 g_string_append_printf(str, "\"%u\":{", vminfo->vmid);
953 _g_str_append_kv_jsonescaped(str, prop, val);
954 g_string_append_c(str, '}');
955 }
956 }
957 ret:
958 g_free(tmp);
959 if (path != NULL) {
960 g_string_free(path, TRUE);
961 }
962 g_string_append_printf(str,"\n}\n");
963 g_mutex_unlock (&mutex);
964 g_mutex_unlock (&memdb->mutex);
965 return res;
966 err:
967 res = -EIO;
968 goto ret;
969 enoent:
970 res = -ENOENT;
971 goto ret;
972 }
973
974 void
975 record_memdb_change(const char *path)
976 {
977 g_return_if_fail(cfs_status.memdb_changes != 0);
978
979 memdb_change_t *ce;
980
981 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
982 ce->version++;
983 }
984 }
985
986 void
987 record_memdb_reload(void)
988 {
989 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
990 memdb_change_array[i].version++;
991 }
992 }
993
994 static gboolean
995 kventry_hash_set(
996 GHashTable *kvhash,
997 const char *key,
998 gconstpointer data,
999 size_t len)
1000 {
1001 g_return_val_if_fail(kvhash != NULL, FALSE);
1002 g_return_val_if_fail(key != NULL, FALSE);
1003 g_return_val_if_fail(data != NULL, FALSE);
1004
1005 kventry_t *entry;
1006 if (!len) {
1007 g_hash_table_remove(kvhash, key);
1008 } else if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1009 g_free(entry->data);
1010 entry->data = g_memdup(data, len);
1011 entry->len = len;
1012 entry->version++;
1013 } else {
1014 kventry_t *entry = g_new0(kventry_t, 1);
1015
1016 entry->key = g_strdup(key);
1017 entry->data = g_memdup(data, len);
1018 entry->len = len;
1019
1020 g_hash_table_replace(kvhash, entry->key, entry);
1021 }
1022
1023 return TRUE;
1024 }
1025
/* RRD layouts used by update_rrd_data() when creating a file (through
 * create_rrd_file(), which uses a 60 second step). Each list is
 * NULL-terminated; the argument count passed on is the element count - 1. */

/* Node statistics (keys "pve2-node/<node>"). */
static const char *rrd_def_node[] = {
	"DS:loadavg:GAUGE:120:0:U",
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:iowait:GAUGE:120:0:U",
	"DS:memtotal:GAUGE:120:0:U",
	"DS:memused:GAUGE:120:0:U",
	"DS:swaptotal:GAUGE:120:0:U",
	"DS:swapused:GAUGE:120:0:U",
	"DS:roottotal:GAUGE:120:0:U",
	"DS:rootused:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};

/* Guest statistics (keys "pve2-vm/<vmid>" and "pve2.3-vm/<vmid>"). */
static const char *rrd_def_vm[] = {
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:maxmem:GAUGE:120:0:U",
	"DS:mem:GAUGE:120:0:U",
	"DS:maxdisk:GAUGE:120:0:U",
	"DS:disk:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",
	"DS:diskread:DERIVE:120:0:U",
	"DS:diskwrite:DERIVE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};

/* Storage statistics (keys "pve2-storage/<node>/<storage>"). */
static const char *rrd_def_storage[] = {
	"DS:total:GAUGE:120:0:U",
	"DS:used:GAUGE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};

/* Base directory of all RRD files written by update_rrd_data(). */
#define RRDDIR "/var/lib/rrdcached/db"
1099
/*
 * Create the RRD file filename with a 60 second step and the given layout
 * (argcount entries of rrddef). The start time is today's local midnight.
 * Errors are logged, not returned.
 */
static void
create_rrd_file(
	const char *filename,
	int argcount,
	const char *rrddef[])
{
	time_t now = time(NULL);
	time_t start = now;
	struct tm ltm;

	/* start at day boundary. localtime_r() instead of localtime(): the latter
	 * returns a process-wide static buffer and may return NULL, which was
	 * dereferenced unchecked before. mktime() is the standard equivalent of
	 * the GNU timelocal() used previously. */
	if (localtime_r(&now, &ltm) != NULL) {
		ltm.tm_sec = 0;
		ltm.tm_min = 0;
		ltm.tm_hour = 0;
		start = mktime(&ltm);
	}

	rrd_clear_error();
	if (rrd_create_r(filename, 60, start, argcount, rrddef)) {
		cfs_message("RRD create error %s: %s", filename, rrd_get_error());
	}
}
1119
/* Return a pointer just past the count-th ':' in data, or to the terminating
 * NUL if data contains fewer than count colons. count <= 0 returns data. */
static inline const char *
rrd_skip_data(
	const char *data,
	int count)
{
	const char *p = data;

	for (int seen = 0; seen < count && *p != '\0'; p++) {
		if (*p == ':')
			seen++;
	}

	return p;
}
1132
1133 static void
1134 update_rrd_data(
1135 const char *key,
1136 gconstpointer data,
1137 size_t len)
1138 {
1139 g_return_if_fail(key != NULL);
1140 g_return_if_fail(data != NULL);
1141 g_return_if_fail(len > 0);
1142 g_return_if_fail(len < 4096);
1143
1144 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
1145
1146 int use_daemon = 1;
1147 if (rrdc_connect(rrdcsock) != 0)
1148 use_daemon = 0;
1149
1150 char *filename = NULL;
1151
1152 int skip = 0;
1153
1154 if (strncmp(key, "pve2-node/", 10) == 0) {
1155 const char *node = key + 10;
1156
1157 skip = 2;
1158
1159 if (strchr(node, '/') != NULL)
1160 goto keyerror;
1161
1162 if (strlen(node) < 1)
1163 goto keyerror;
1164
1165 filename = g_strdup_printf(RRDDIR "/%s", key);
1166
1167 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1168
1169 mkdir(RRDDIR "/pve2-node", 0755);
1170 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
1171 create_rrd_file(filename, argcount, rrd_def_node);
1172 }
1173
1174 } else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
1175 (strncmp(key, "pve2.3-vm/", 10) == 0)) {
1176 const char *vmid;
1177
1178 if (strncmp(key, "pve2-vm/", 8) == 0) {
1179 vmid = key + 8;
1180 skip = 2;
1181 } else {
1182 vmid = key + 10;
1183 skip = 4;
1184 }
1185
1186 if (strchr(vmid, '/') != NULL)
1187 goto keyerror;
1188
1189 if (strlen(vmid) < 1)
1190 goto keyerror;
1191
1192 filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);
1193
1194 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1195
1196 mkdir(RRDDIR "/pve2-vm", 0755);
1197 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
1198 create_rrd_file(filename, argcount, rrd_def_vm);
1199 }
1200
1201 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
1202 const char *node = key + 13;
1203
1204 const char *storage = node;
1205 while (*storage && *storage != '/')
1206 storage++;
1207
1208 if (*storage != '/' || ((storage - node) < 1))
1209 goto keyerror;
1210
1211 storage++;
1212
1213 if (strchr(storage, '/') != NULL)
1214 goto keyerror;
1215
1216 if (strlen(storage) < 1)
1217 goto keyerror;
1218
1219 filename = g_strdup_printf(RRDDIR "/%s", key);
1220
1221 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1222
1223 mkdir(RRDDIR "/pve2-storage", 0755);
1224
1225 char *dir = g_path_get_dirname(filename);
1226 mkdir(dir, 0755);
1227 g_free(dir);
1228
1229 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
1230 create_rrd_file(filename, argcount, rrd_def_storage);
1231 }
1232
1233 } else {
1234 goto keyerror;
1235 }
1236
1237 const char *dp = skip ? rrd_skip_data(data, skip) : data;
1238
1239 const char *update_args[] = { dp, NULL };
1240
1241 if (use_daemon) {
1242 int status;
1243 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1244 cfs_message("RRDC update error %s: %d", filename, status);
1245 rrdc_disconnect();
1246 rrd_clear_error();
1247 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1248 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1249 }
1250 }
1251
1252 } else {
1253 rrd_clear_error();
1254 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1255 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1256 }
1257 }
1258
1259 ret:
1260 if (filename)
1261 g_free(filename);
1262
1263 return;
1264
1265 keyerror:
1266 cfs_critical("RRD update error: unknown/wrong key %s", key);
1267 goto ret;
1268 }
1269
1270 static gboolean
1271 rrd_entry_is_old(
1272 gpointer key,
1273 gpointer value,
1274 gpointer user_data)
1275 {
1276 rrdentry_t *entry = (rrdentry_t *)value;
1277 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1278
1279 int diff = ctime - entry->time;
1280
1281 /* remove everything older than 5 minutes */
1282 int expire = 60*5;
1283
1284 return (diff > expire) ? TRUE : FALSE;
1285 }
1286
1287 static char *rrd_dump_buf = NULL;
1288 static time_t rrd_dump_last = 0;
1289
1290 void
1291 cfs_rrd_dump(GString *str)
1292 {
1293 time_t ctime;
1294
1295 g_mutex_lock (&mutex);
1296
1297 time(&ctime);
1298 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1299 g_string_assign(str, rrd_dump_buf);
1300 g_mutex_unlock (&mutex);
1301 return;
1302 }
1303
1304 /* remove old data */
1305 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1306 GUINT_TO_POINTER(ctime));
1307
1308 g_string_set_size(str, 0);
1309
1310 GHashTableIter iter;
1311 gpointer key, value;
1312
1313 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1314
1315 while (g_hash_table_iter_next (&iter, &key, &value)) {
1316 rrdentry_t *entry = (rrdentry_t *)value;
1317 g_string_append(str, key);
1318 g_string_append(str, ":");
1319 g_string_append(str, entry->data);
1320 g_string_append(str, "\n");
1321 }
1322
1323 g_string_append_c(str, 0); // never return undef
1324
1325 rrd_dump_last = ctime;
1326 if (rrd_dump_buf)
1327 g_free(rrd_dump_buf);
1328 rrd_dump_buf = g_strdup(str->str);
1329
1330 g_mutex_unlock (&mutex);
1331 }
1332
1333 static gboolean
1334 nodeip_hash_set(
1335 GHashTable *iphash,
1336 const char *nodename,
1337 const char *ip,
1338 size_t len)
1339 {
1340 g_return_val_if_fail(iphash != NULL, FALSE);
1341 g_return_val_if_fail(nodename != NULL, FALSE);
1342 g_return_val_if_fail(ip != NULL, FALSE);
1343 g_return_val_if_fail(len > 0, FALSE);
1344 g_return_val_if_fail(len < 256, FALSE);
1345 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1346
1347 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1348
1349 if (!oldip || (strcmp(oldip, ip) != 0)) {
1350 cfs_status.clinfo_version++;
1351 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1352 }
1353
1354 return TRUE;
1355 }
1356
1357 static gboolean
1358 rrdentry_hash_set(
1359 GHashTable *rrdhash,
1360 const char *key,
1361 const char *data,
1362 size_t len)
1363 {
1364 g_return_val_if_fail(rrdhash != NULL, FALSE);
1365 g_return_val_if_fail(key != NULL, FALSE);
1366 g_return_val_if_fail(data != NULL, FALSE);
1367 g_return_val_if_fail(len > 0, FALSE);
1368 g_return_val_if_fail(len < 4096, FALSE);
1369 g_return_val_if_fail(data[len-1] == 0, FALSE);
1370
1371 rrdentry_t *entry;
1372 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1373 g_free(entry->data);
1374 entry->data = g_memdup(data, len);
1375 entry->len = len;
1376 entry->time = time(NULL);
1377 } else {
1378 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1379
1380 entry->key = g_strdup(key);
1381 entry->data = g_memdup(data, len);
1382 entry->len = len;
1383 entry->time = time(NULL);
1384
1385 g_hash_table_replace(rrdhash, entry->key, entry);
1386 }
1387
1388 update_rrd_data(key, data, len);
1389
1390 return TRUE;
1391 }
1392
1393 static int
1394 kvstore_send_update_message(
1395 dfsm_t *dfsm,
1396 const char *key,
1397 gpointer data,
1398 guint32 len)
1399 {
1400 if (!dfsm_is_initialized(dfsm))
1401 return -EACCES;
1402
1403 struct iovec iov[2];
1404
1405 char name[256];
1406 g_strlcpy(name, key, sizeof(name));
1407
1408 iov[0].iov_base = &name;
1409 iov[0].iov_len = sizeof(name);
1410
1411 iov[1].iov_base = (char *)data;
1412 iov[1].iov_len = len;
1413
1414 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1415 return 0;
1416
1417 return -EACCES;
1418 }
1419
1420 static clog_entry_t *
1421 kvstore_parse_log_message(
1422 const void *msg,
1423 size_t msg_len)
1424 {
1425 g_return_val_if_fail(msg != NULL, NULL);
1426
1427 if (msg_len < sizeof(clog_entry_t)) {
1428 cfs_critical("received short log message (%zu < %zu)", msg_len, sizeof(clog_entry_t));
1429 return NULL;
1430 }
1431
1432 clog_entry_t *entry = (clog_entry_t *)msg;
1433
1434 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1435 entry->ident_len + entry->tag_len + entry->msg_len;
1436
1437 if (msg_len != size) {
1438 cfs_critical("received log message with wrong size (%zu != %u)", msg_len, size);
1439 return NULL;
1440 }
1441
1442 char *msgptr = entry->data;
1443
1444 if (*((char *)msgptr + entry->node_len - 1)) {
1445 cfs_critical("unterminated string in log message");
1446 return NULL;
1447 }
1448 msgptr += entry->node_len;
1449
1450 if (*((char *)msgptr + entry->ident_len - 1)) {
1451 cfs_critical("unterminated string in log message");
1452 return NULL;
1453 }
1454 msgptr += entry->ident_len;
1455
1456 if (*((char *)msgptr + entry->tag_len - 1)) {
1457 cfs_critical("unterminated string in log message");
1458 return NULL;
1459 }
1460 msgptr += entry->tag_len;
1461
1462 if (*((char *)msgptr + entry->msg_len - 1)) {
1463 cfs_critical("unterminated string in log message");
1464 return NULL;
1465 }
1466
1467 return entry;
1468 }
1469
1470 static gboolean
1471 kvstore_parse_update_message(
1472 const void *msg,
1473 size_t msg_len,
1474 const char **key,
1475 gconstpointer *data,
1476 guint32 *len)
1477 {
1478 g_return_val_if_fail(msg != NULL, FALSE);
1479 g_return_val_if_fail(key != NULL, FALSE);
1480 g_return_val_if_fail(data != NULL, FALSE);
1481 g_return_val_if_fail(len != NULL, FALSE);
1482
1483 if (msg_len < 256) {
1484 cfs_critical("received short kvstore message (%zu < 256)", msg_len);
1485 return FALSE;
1486 }
1487
1488 /* test if key is null terminated */
1489 int i = 0;
1490 for (i = 0; i < 256; i++)
1491 if (((char *)msg)[i] == 0)
1492 break;
1493
1494 if (i == 256)
1495 return FALSE;
1496
1497
1498 *len = msg_len - 256;
1499 *key = msg;
1500 *data = (char *) msg + 256;
1501
1502 return TRUE;
1503 }
1504
1505 int
1506 cfs_create_status_msg(
1507 GString *str,
1508 const char *nodename,
1509 const char *key)
1510 {
1511 g_return_val_if_fail(str != NULL, -EINVAL);
1512 g_return_val_if_fail(key != NULL, -EINVAL);
1513
1514 int res = -ENOENT;
1515
1516 GHashTable *kvhash = NULL;
1517
1518 g_mutex_lock (&mutex);
1519
1520 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1521 kvhash = cfs_status.kvhash;
1522 } else if (cfs_status.clinfo && cfs_status.clinfo->nodes_byname) {
1523 cfs_clnode_t *clnode;
1524 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1525 kvhash = clnode->kvhash;
1526 }
1527
1528 kventry_t *entry;
1529 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1530 g_string_append_len(str, entry->data, entry->len);
1531 res = 0;
1532 }
1533
1534 g_mutex_unlock (&mutex);
1535
1536 return res;
1537 }
1538
1539 int
1540 cfs_status_set(
1541 const char *key,
1542 gpointer data,
1543 size_t len)
1544 {
1545 g_return_val_if_fail(key != NULL, FALSE);
1546 g_return_val_if_fail(data != NULL, FALSE);
1547 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1548
1549 if (len > CFS_MAX_STATUS_SIZE)
1550 return -EFBIG;
1551
1552 g_mutex_lock (&mutex);
1553
1554 gboolean res;
1555
1556 if (strncmp(key, "rrd/", 4) == 0) {
1557 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1558 } else if (!strcmp(key, "nodeip")) {
1559 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1560 } else {
1561 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1562 }
1563 g_mutex_unlock (&mutex);
1564
1565 if (cfs_status.kvstore)
1566 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1567
1568 return res ? 0 : -ENOMEM;
1569 }
1570
1571 gboolean
1572 cfs_kvstore_node_set(
1573 uint32_t nodeid,
1574 const char *key,
1575 gconstpointer data,
1576 size_t len)
1577 {
1578 g_return_val_if_fail(nodeid != 0, FALSE);
1579 g_return_val_if_fail(key != NULL, FALSE);
1580 g_return_val_if_fail(data != NULL, FALSE);
1581
1582 g_mutex_lock (&mutex);
1583
1584 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1585 goto ret; /* ignore */
1586
1587 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1588 if (!clnode)
1589 goto ret; /* ignore */
1590
1591 cfs_debug("got node %d status update %s", nodeid, key);
1592
1593 if (strncmp(key, "rrd/", 4) == 0) {
1594 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1595 } else if (!strcmp(key, "nodeip")) {
1596 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1597 } else {
1598 if (!clnode->kvhash) {
1599 if (!(clnode->kvhash = kventry_hash_new())) {
1600 goto ret; /*ignore */
1601 }
1602 }
1603
1604 kventry_hash_set(clnode->kvhash, key, data, len);
1605
1606 }
1607 ret:
1608 g_mutex_unlock (&mutex);
1609
1610 return TRUE;
1611 }
1612
1613 static gboolean
1614 cfs_kvstore_sync(void)
1615 {
1616 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1617 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1618
1619 gboolean res = TRUE;
1620
1621 g_mutex_lock (&mutex);
1622
1623 GHashTable *ht = cfs_status.kvhash;
1624 GHashTableIter iter;
1625 gpointer key, value;
1626
1627 g_hash_table_iter_init (&iter, ht);
1628
1629 while (g_hash_table_iter_next (&iter, &key, &value)) {
1630 kventry_t *entry = (kventry_t *)value;
1631 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1632 }
1633
1634 g_mutex_unlock (&mutex);
1635
1636 return res;
1637 }
1638
1639 static int
1640 dfsm_deliver(
1641 dfsm_t *dfsm,
1642 gpointer data,
1643 int *res_ptr,
1644 uint32_t nodeid,
1645 uint32_t pid,
1646 uint16_t msg_type,
1647 uint32_t msg_time,
1648 const void *msg,
1649 size_t msg_len)
1650 {
1651 g_return_val_if_fail(dfsm != NULL, -1);
1652 g_return_val_if_fail(msg != NULL, -1);
1653 g_return_val_if_fail(res_ptr != NULL, -1);
1654
1655 /* ignore message for ourself */
1656 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1657 goto ret;
1658
1659 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1660 const char *key;
1661 gconstpointer data;
1662 guint32 len;
1663 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1664 cfs_kvstore_node_set(nodeid, key, data, len);
1665 } else {
1666 cfs_critical("cant parse update message");
1667 }
1668 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1669 cfs_message("received log"); // fixme: remove
1670 const clog_entry_t *entry;
1671 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1672 clusterlog_insert(cfs_status.clusterlog, entry);
1673 } else {
1674 cfs_critical("cant parse log message");
1675 }
1676 } else {
1677 cfs_critical("received unknown message type %d\n", msg_type);
1678 goto fail;
1679 }
1680
1681 ret:
1682 *res_ptr = 0;
1683 return 1;
1684
1685 fail:
1686 *res_ptr = -EACCES;
1687 return 1;
1688 }
1689
1690 static void
1691 dfsm_confchg(
1692 dfsm_t *dfsm,
1693 gpointer data,
1694 const struct cpg_address *member_list,
1695 size_t member_list_entries)
1696 {
1697 g_return_if_fail(dfsm != NULL);
1698 g_return_if_fail(member_list != NULL);
1699
1700 cfs_debug("enter %s", __func__);
1701
1702 g_mutex_lock (&mutex);
1703
1704 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1705
1706 if (clinfo && clinfo->nodes_byid) {
1707
1708 GHashTable *ht = clinfo->nodes_byid;
1709 GHashTableIter iter;
1710 gpointer key, value;
1711
1712 g_hash_table_iter_init (&iter, ht);
1713
1714 while (g_hash_table_iter_next (&iter, &key, &value)) {
1715 cfs_clnode_t *node = (cfs_clnode_t *)value;
1716 node->online = FALSE;
1717 }
1718
1719 for (int i = 0; i < member_list_entries; i++) {
1720 cfs_clnode_t *node;
1721 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1722 node->online = TRUE;
1723 }
1724 }
1725
1726 cfs_status.clinfo_version++;
1727 }
1728
1729 g_mutex_unlock (&mutex);
1730 }
1731
1732 static gpointer
1733 dfsm_get_state(
1734 dfsm_t *dfsm,
1735 gpointer data,
1736 unsigned int *res_len)
1737 {
1738 g_return_val_if_fail(dfsm != NULL, NULL);
1739
1740 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1741
1742 return msg;
1743 }
1744
1745 static int
1746 dfsm_process_update(
1747 dfsm_t *dfsm,
1748 gpointer data,
1749 dfsm_sync_info_t *syncinfo,
1750 uint32_t nodeid,
1751 uint32_t pid,
1752 const void *msg,
1753 size_t msg_len)
1754 {
1755 cfs_critical("%s: received unexpected update message", __func__);
1756
1757 return -1;
1758 }
1759
1760 static int
1761 dfsm_process_state_update(
1762 dfsm_t *dfsm,
1763 gpointer data,
1764 dfsm_sync_info_t *syncinfo)
1765 {
1766 g_return_val_if_fail(dfsm != NULL, -1);
1767 g_return_val_if_fail(syncinfo != NULL, -1);
1768
1769 clog_base_t *clog[syncinfo->node_count];
1770
1771 int local_index = -1;
1772 for (int i = 0; i < syncinfo->node_count; i++) {
1773 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1774 ni->synced = 1;
1775
1776 if (syncinfo->local == ni)
1777 local_index = i;
1778
1779 clog_base_t *base = (clog_base_t *)ni->state;
1780 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1781 clog[i] = ni->state;
1782 } else {
1783 cfs_critical("received log with wrong size %u", ni->state_len);
1784 clog[i] = NULL;
1785 }
1786 }
1787
1788 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1789 cfs_critical("unable to merge log files");
1790 }
1791
1792 cfs_kvstore_sync();
1793
1794 return 1;
1795 }
1796
1797 static int
1798 dfsm_commit(
1799 dfsm_t *dfsm,
1800 gpointer data,
1801 dfsm_sync_info_t *syncinfo)
1802 {
1803 g_return_val_if_fail(dfsm != NULL, -1);
1804 g_return_val_if_fail(syncinfo != NULL, -1);
1805
1806 return 1;
1807 }
1808
1809 static void
1810 dfsm_synced(dfsm_t *dfsm)
1811 {
1812 g_return_if_fail(dfsm != NULL);
1813
1814 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1815 if (!ip)
1816 ip = cfs.ip;
1817
1818 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1819 }
1820
1821 static int
1822 dfsm_cleanup(
1823 dfsm_t *dfsm,
1824 gpointer data,
1825 dfsm_sync_info_t *syncinfo)
1826 {
1827 return 1;
1828 }
1829
1830 static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1831 .dfsm_deliver_fn = dfsm_deliver,
1832 .dfsm_confchg_fn = dfsm_confchg,
1833
1834 .dfsm_get_state_fn = dfsm_get_state,
1835 .dfsm_process_state_update_fn = dfsm_process_state_update,
1836 .dfsm_process_update_fn = dfsm_process_update,
1837 .dfsm_commit_fn = dfsm_commit,
1838 .dfsm_cleanup_fn = dfsm_cleanup,
1839 .dfsm_synced_fn = dfsm_synced,
1840 };
1841
1842 dfsm_t *
1843 cfs_status_dfsm_new(void)
1844 {
1845 g_mutex_lock (&mutex);
1846
1847 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1848 0, &kvstore_dfsm_callbacks);
1849 g_mutex_unlock (&mutex);
1850
1851 return cfs_status.kvstore;
1852 }
1853
1854 gboolean
1855 cfs_is_quorate(void)
1856 {
1857 g_mutex_lock (&mutex);
1858 gboolean res = cfs_status.quorate;
1859 g_mutex_unlock (&mutex);
1860
1861 return res;
1862 }
1863
1864 void
1865 cfs_set_quorate(
1866 uint32_t quorate,
1867 gboolean quiet)
1868 {
1869 g_mutex_lock (&mutex);
1870
1871 uint32_t prev_quorate = cfs_status.quorate;
1872 cfs_status.quorate = quorate;
1873
1874 if (!prev_quorate && cfs_status.quorate) {
1875 if (!quiet)
1876 cfs_message("node has quorum");
1877 }
1878
1879 if (prev_quorate && !cfs_status.quorate) {
1880 if (!quiet)
1881 cfs_message("node lost quorum");
1882 }
1883
1884 g_mutex_unlock (&mutex);
1885 }