]> git.proxmox.com Git - pve-cluster.git/blob - data/src/status.c
bfeca6bf4149f516d892eb1af631b2d209763e33
[pve-cluster.git] / data / src / status.c
1 /*
2 Copyright (C) 2010 - 2020 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "status"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdio.h>
28 #include <stdint.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <glib.h>
32 #include <sys/syslog.h>
33 #include <rrd.h>
34 #include <rrd_client.h>
35 #include <time.h>
36 #include <ctype.h>
37
38 #include "cfs-utils.h"
39 #include "status.h"
40 #include "memdb.h"
41 #include "logger.h"
42
43 #define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
44
/*
 * Message type identifiers used with the kvstore state machine
 * (cfs_status.kvstore). Only KVSTORE_MESSAGE_LOG is sent from the code
 * visible here (see cfs_cluster_log()).
 */
typedef enum {
	KVSTORE_MESSAGE_UPDATE = 1,
	KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
	KVSTORE_MESSAGE_LOG = 3, /* payload is a clog_entry_t */
} kvstore_message_t;
50
/* Pre-incremented and stamped into vminfo_t.version by vmlist_hash_insert_vm(). */
static uint32_t vminfo_version_counter;
52
/* One guest entry of a VM list; the hash table key points at the vmid member. */
typedef struct {
	uint32_t vmid;
	char *nodename; /* owned copy (g_strdup), released by vminfo_free() */
	int vmtype; /* one of the VMTYPE_* values */
	uint32_t version; /* value of vminfo_version_counter at insertion time */
} vminfo_t;
59
/* Key/value store entry; key and data are owned and released by kventry_free(). */
typedef struct {
	char *key;
	gpointer data;
	size_t len; /* number of bytes stored in data */
	uint32_t version; /* incremented each time an existing entry is overwritten */
} kventry_t;
66
/* Cached RRD data entry; key and data are owned and released by rrdentry_free(). */
typedef struct {
	char *key;
	gpointer data;
	size_t len;
	uint32_t time; /* compared against the current time in rrd_entry_is_old() */
} rrdentry_t;
73
/* Change counter for a single tracked memdb path. */
typedef struct {
	char *path;
	uint32_t version; /* bumped by record_memdb_change() and record_memdb_reload() */
} memdb_change_t;
78
/*
 * Paths whose modifications are tracked with a version counter. The entries
 * are indexed by path in cfs_status.memdb_changes (see cfs_status_init()) and
 * their versions are reported by cfs_create_version_msg().
 */
static memdb_change_t memdb_change_array[] = {
	{ .path = "corosync.conf" },
	{ .path = "corosync.conf.new" },
	{ .path = "storage.cfg" },
	{ .path = "user.cfg" },
	{ .path = "domains.cfg" },
	{ .path = "priv/shadow.cfg" },
	{ .path = "priv/acme/plugins.cfg" },
	{ .path = "priv/tfa.cfg" },
	{ .path = "priv/token.cfg" },
	{ .path = "datacenter.cfg" },
	{ .path = "vzdump.cron" },
	{ .path = "ha/crm_commands" },
	{ .path = "ha/manager_status" },
	{ .path = "ha/resources.cfg" },
	{ .path = "ha/groups.cfg" },
	{ .path = "ha/fence.cfg" },
	{ .path = "status.cfg" },
	{ .path = "replication.cfg" },
	{ .path = "ceph.conf" },
	{ .path = "sdn/vnets.cfg" },
	{ .path = "sdn/zones.cfg" },
	{ .path = "sdn/controllers.cfg" },
	{ .path = "sdn/subnets.cfg" },
	{ .path = "sdn/ipams.cfg" },
	{ .path = "sdn/.version" },
	{ .path = "virtual-guest/cpu-models.conf" },
};
107
/* Taken by the functions in this file around their accesses to cfs_status. */
static GMutex mutex;
109
/* Aggregated cluster status kept by this module (single instance: cfs_status). */
typedef struct {
	time_t start_time; /* set once in cfs_status_init() */

	uint32_t quorate;

	cfs_clinfo_t *clinfo; /* current cluster info, replaced by cfs_status_set_clinfo() */
	uint32_t clinfo_version; /* bumped whenever clinfo is replaced or cleaned up */

	GHashTable *vmlist; /* vmid -> vminfo_t */
	uint32_t vmlist_version; /* bumped on every vmlist modification */

	dfsm_t *kvstore;
	GHashTable *kvhash; /* key -> kventry_t, reported under the local node name */
	GHashTable *rrdhash; /* key -> rrdentry_t */
	GHashTable *iphash; /* node name -> IP string, both g_free()d by the table */

	GHashTable *memdb_changes; /* path -> entry of memdb_change_array */

	clusterlog_t *clusterlog;
} cfs_status_t;
130
/* The module-wide status instance; populated by cfs_status_init(). */
static cfs_status_t cfs_status;
132
/* A single cluster node as stored in a cfs_clinfo. */
struct cfs_clnode {
	char *name; /* owned copy, freed in cfs_clnode_destroy() */
	uint32_t nodeid;
	uint32_t votes;
	gboolean online;
	GHashTable *kvhash; /* optional per-node kv entries; carried over in cfs_status_set_clinfo() */
};
140
/* Cluster description with two indexes over the same set of nodes. */
struct cfs_clinfo {
	char *cluster_name; /* owned copy */
	uint32_t cman_version;

	GHashTable *nodes_byid; /* nodeid -> cfs_clnode_t, owns the nodes (value destructor) */
	GHashTable *nodes_byname; /* name -> cfs_clnode_t, no destructors */
};
148
149 static guint
150 g_int32_hash (gconstpointer v)
151 {
152 return *(const uint32_t *) v;
153 }
154
155 static gboolean
156 g_int32_equal (gconstpointer v1,
157 gconstpointer v2)
158 {
159 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
160 }
161
162 static void vminfo_free(vminfo_t *vminfo)
163 {
164 g_return_if_fail(vminfo != NULL);
165
166 if (vminfo->nodename)
167 g_free(vminfo->nodename);
168
169
170 g_free(vminfo);
171 }
172
173 static const char *vminfo_type_to_string(vminfo_t *vminfo)
174 {
175 if (vminfo->vmtype == VMTYPE_QEMU) {
176 return "qemu";
177 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
178 // FIXME: remove openvz stuff for 7.x
179 return "openvz";
180 } else if (vminfo->vmtype == VMTYPE_LXC) {
181 return "lxc";
182 } else {
183 return "unknown";
184 }
185 }
186
187 static const char *vminfo_type_to_path_type(vminfo_t *vminfo)
188 {
189 if (vminfo->vmtype == VMTYPE_QEMU) {
190 return "qemu-server"; // special case..
191 } else {
192 return vminfo_type_to_string(vminfo);
193 }
194 }
195
196 int vminfo_to_path(vminfo_t *vminfo, GString *path)
197 {
198 g_return_val_if_fail(vminfo != NULL, -1);
199 g_return_val_if_fail(path != NULL, -1);
200
201 if (!vminfo->nodename)
202 return 0;
203
204 const char *type = vminfo_type_to_path_type(vminfo);
205 g_string_printf(path, "/nodes/%s/%s/%u.conf", vminfo->nodename, type, vminfo->vmid);
206
207 return 1;
208 }
209
210 void cfs_clnode_destroy(
211 cfs_clnode_t *clnode)
212 {
213 g_return_if_fail(clnode != NULL);
214
215 if (clnode->kvhash)
216 g_hash_table_destroy(clnode->kvhash);
217
218 if (clnode->name)
219 g_free(clnode->name);
220
221 g_free(clnode);
222 }
223
224 cfs_clnode_t *cfs_clnode_new(
225 const char *name,
226 uint32_t nodeid,
227 uint32_t votes)
228 {
229 g_return_val_if_fail(name != NULL, NULL);
230
231 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
232 if (!clnode)
233 return NULL;
234
235 clnode->name = g_strdup(name);
236 clnode->nodeid = nodeid;
237 clnode->votes = votes;
238
239 return clnode;
240 }
241
242 gboolean cfs_clinfo_destroy(
243 cfs_clinfo_t *clinfo)
244 {
245 g_return_val_if_fail(clinfo != NULL, FALSE);
246
247 if (clinfo->cluster_name)
248 g_free(clinfo->cluster_name);
249
250 if (clinfo->nodes_byname)
251 g_hash_table_destroy(clinfo->nodes_byname);
252
253 if (clinfo->nodes_byid)
254 g_hash_table_destroy(clinfo->nodes_byid);
255
256 g_free(clinfo);
257
258 return TRUE;
259 }
260
261 cfs_clinfo_t *cfs_clinfo_new(
262 const char *cluster_name,
263 uint32_t cman_version)
264 {
265 g_return_val_if_fail(cluster_name != NULL, NULL);
266
267 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
268 if (!clinfo)
269 return NULL;
270
271 clinfo->cluster_name = g_strdup(cluster_name);
272 clinfo->cman_version = cman_version;
273
274 if (!(clinfo->nodes_byid = g_hash_table_new_full(
275 g_int32_hash, g_int32_equal, NULL,
276 (GDestroyNotify)cfs_clnode_destroy)))
277 goto fail;
278
279 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
280 goto fail;
281
282 return clinfo;
283
284 fail:
285 cfs_clinfo_destroy(clinfo);
286
287 return NULL;
288 }
289
290 gboolean cfs_clinfo_add_node(
291 cfs_clinfo_t *clinfo,
292 cfs_clnode_t *clnode)
293 {
294 g_return_val_if_fail(clinfo != NULL, FALSE);
295 g_return_val_if_fail(clnode != NULL, FALSE);
296
297 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
298 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
299
300 return TRUE;
301 }
302
303 int
304 cfs_create_memberlist_msg(
305 GString *str)
306 {
307 g_return_val_if_fail(str != NULL, -EINVAL);
308
309 g_mutex_lock (&mutex);
310
311 g_string_append_printf(str,"{\n");
312
313 guint nodecount = 0;
314
315 cfs_clinfo_t *clinfo = cfs_status.clinfo;
316
317 if (clinfo && clinfo->nodes_byid)
318 nodecount = g_hash_table_size(clinfo->nodes_byid);
319
320 if (nodecount) {
321 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
322 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
323
324 g_string_append_printf(str, "\"cluster\": { ");
325 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
326 "\"nodes\": %d, \"quorate\": %d ",
327 clinfo->cluster_name, clinfo->cman_version,
328 nodecount, cfs_status.quorate);
329
330 g_string_append_printf(str,"},\n");
331 g_string_append_printf(str,"\"nodelist\": {\n");
332
333 GHashTable *ht = clinfo->nodes_byid;
334 GHashTableIter iter;
335 gpointer key, value;
336
337 g_hash_table_iter_init (&iter, ht);
338
339 int i = 0;
340 while (g_hash_table_iter_next (&iter, &key, &value)) {
341 cfs_clnode_t *node = (cfs_clnode_t *)value;
342 if (i) g_string_append_printf(str, ",\n");
343 i++;
344
345 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
346 node->name, node->nodeid, node->online);
347
348
349 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
350 if (ip) {
351 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
352 }
353
354 g_string_append_printf(str, "}");
355
356 }
357 g_string_append_printf(str,"\n }\n");
358 } else {
359 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
360 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
361 }
362
363 g_string_append_printf(str,"}\n");
364
365 g_mutex_unlock (&mutex);
366
367 return 0;
368 }
369
370 static void
371 kventry_free(kventry_t *entry)
372 {
373 g_return_if_fail(entry != NULL);
374
375 g_free(entry->key);
376 g_free(entry->data);
377 g_free(entry);
378 }
379
380 static GHashTable *
381 kventry_hash_new(void)
382 {
383 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
384 (GDestroyNotify)kventry_free);
385 }
386
387 static void
388 rrdentry_free(rrdentry_t *entry)
389 {
390 g_return_if_fail(entry != NULL);
391
392 g_free(entry->key);
393 g_free(entry->data);
394 g_free(entry);
395 }
396
397 static GHashTable *
398 rrdentry_hash_new(void)
399 {
400 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
401 (GDestroyNotify)rrdentry_free);
402 }
403
404 void
405 cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
406 {
407 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
408 }
409
410 void
411 cfs_cluster_log(clog_entry_t *entry)
412 {
413 g_return_if_fail(entry != NULL);
414
415 clusterlog_insert(cfs_status.clusterlog, entry);
416
417 if (cfs_status.kvstore) {
418 struct iovec iov[1];
419 iov[0].iov_base = (char *)entry;
420 iov[0].iov_len = clog_entry_size(entry);
421
422 if (dfsm_is_initialized(cfs_status.kvstore))
423 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
424 }
425 }
426
427 void cfs_status_init(void)
428 {
429 g_mutex_lock (&mutex);
430
431 cfs_status.start_time = time(NULL);
432
433 cfs_status.vmlist = vmlist_hash_new();
434
435 cfs_status.kvhash = kventry_hash_new();
436
437 cfs_status.rrdhash = rrdentry_hash_new();
438
439 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
440
441 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
442
443 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
444 g_hash_table_replace(cfs_status.memdb_changes,
445 memdb_change_array[i].path,
446 &memdb_change_array[i]);
447 }
448
449 cfs_status.clusterlog = clusterlog_new();
450
451 // fixme:
452 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
453 LOG_INFO, "starting cluster log");
454
455 g_mutex_unlock (&mutex);
456 }
457
458 void cfs_status_cleanup(void)
459 {
460 g_mutex_lock (&mutex);
461
462 cfs_status.clinfo_version++;
463
464 if (cfs_status.clinfo) {
465 cfs_clinfo_destroy(cfs_status.clinfo);
466 cfs_status.clinfo = NULL;
467 }
468
469 if (cfs_status.vmlist) {
470 g_hash_table_destroy(cfs_status.vmlist);
471 cfs_status.vmlist = NULL;
472 }
473
474 if (cfs_status.kvhash) {
475 g_hash_table_destroy(cfs_status.kvhash);
476 cfs_status.kvhash = NULL;
477 }
478
479 if (cfs_status.rrdhash) {
480 g_hash_table_destroy(cfs_status.rrdhash);
481 cfs_status.rrdhash = NULL;
482 }
483
484 if (cfs_status.iphash) {
485 g_hash_table_destroy(cfs_status.iphash);
486 cfs_status.iphash = NULL;
487 }
488
489 if (cfs_status.clusterlog)
490 clusterlog_destroy(cfs_status.clusterlog);
491
492 g_mutex_unlock (&mutex);
493 }
494
495 void cfs_status_set_clinfo(
496 cfs_clinfo_t *clinfo)
497 {
498 g_return_if_fail(clinfo != NULL);
499
500 g_mutex_lock (&mutex);
501
502 cfs_status.clinfo_version++;
503
504 cfs_clinfo_t *old = cfs_status.clinfo;
505
506 cfs_status.clinfo = clinfo;
507
508 cfs_message("update cluster info (cluster name %s, version = %d)",
509 clinfo->cluster_name, clinfo->cman_version);
510
511
512 if (old && old->nodes_byid && clinfo->nodes_byid) {
513 /* copy kvstore */
514 GHashTable *ht = clinfo->nodes_byid;
515 GHashTableIter iter;
516 gpointer key, value;
517
518 g_hash_table_iter_init (&iter, ht);
519
520 while (g_hash_table_iter_next (&iter, &key, &value)) {
521 cfs_clnode_t *node = (cfs_clnode_t *)value;
522 cfs_clnode_t *oldnode;
523 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
524 node->online = oldnode->online;
525 node->kvhash = oldnode->kvhash;
526 oldnode->kvhash = NULL;
527 }
528 }
529
530 }
531
532 if (old)
533 cfs_clinfo_destroy(old);
534
535
536 g_mutex_unlock (&mutex);
537 }
538
539 static void
540 dump_kvstore_versions(
541 GString *str,
542 GHashTable *kvhash,
543 const char *nodename)
544 {
545 g_return_if_fail(kvhash != NULL);
546 g_return_if_fail(str != NULL);
547 g_return_if_fail(nodename != NULL);
548
549 GHashTable *ht = kvhash;
550 GHashTableIter iter;
551 gpointer key, value;
552
553 g_string_append_printf(str, "\"%s\": {\n", nodename);
554
555 g_hash_table_iter_init (&iter, ht);
556
557 int i = 0;
558 while (g_hash_table_iter_next (&iter, &key, &value)) {
559 kventry_t *entry = (kventry_t *)value;
560 if (i) g_string_append_printf(str, ",\n");
561 i++;
562 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
563 }
564
565 g_string_append_printf(str, "}\n");
566 }
567
568 int
569 cfs_create_version_msg(GString *str)
570 {
571 g_return_val_if_fail(str != NULL, -EINVAL);
572
573 g_mutex_lock (&mutex);
574
575 g_string_append_printf(str,"{\n");
576
577 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
578
579 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
580
581 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
582
583 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
584 g_string_append_printf(str, "\"%s\": %u,\n",
585 memdb_change_array[i].path,
586 memdb_change_array[i].version);
587 }
588
589 g_string_append_printf(str, "\"kvstore\": {\n");
590
591 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
592
593 cfs_clinfo_t *clinfo = cfs_status.clinfo;
594
595 if (clinfo && clinfo->nodes_byid) {
596 GHashTable *ht = clinfo->nodes_byid;
597 GHashTableIter iter;
598 gpointer key, value;
599
600 g_hash_table_iter_init (&iter, ht);
601
602 while (g_hash_table_iter_next (&iter, &key, &value)) {
603 cfs_clnode_t *node = (cfs_clnode_t *)value;
604 if (!node->kvhash)
605 continue;
606 g_string_append_printf(str, ",\n");
607 dump_kvstore_versions(str, node->kvhash, node->name);
608 }
609 }
610
611 g_string_append_printf(str,"}\n");
612
613 g_string_append_printf(str,"}\n");
614
615 g_mutex_unlock (&mutex);
616
617 return 0;
618 }
619
620 GHashTable *
621 vmlist_hash_new(void)
622 {
623 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
624 (GDestroyNotify)vminfo_free);
625 }
626
627 gboolean
628 vmlist_hash_insert_vm(
629 GHashTable *vmlist,
630 int vmtype,
631 guint32 vmid,
632 const char *nodename,
633 gboolean replace)
634 {
635 g_return_val_if_fail(vmlist != NULL, FALSE);
636 g_return_val_if_fail(nodename != NULL, FALSE);
637 g_return_val_if_fail(vmid != 0, FALSE);
638 // FIXME: remove openvz stuff for 7.x
639 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
640 vmtype == VMTYPE_LXC, FALSE);
641
642 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
643 cfs_critical("detected duplicate VMID %d", vmid);
644 return FALSE;
645 }
646
647 vminfo_t *vminfo = g_new0(vminfo_t, 1);
648
649 vminfo->vmid = vmid;
650 vminfo->vmtype = vmtype;
651 vminfo->nodename = g_strdup(nodename);
652
653 vminfo->version = ++vminfo_version_counter;
654
655 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
656
657 return TRUE;
658 }
659
660 void
661 vmlist_register_vm(
662 int vmtype,
663 guint32 vmid,
664 const char *nodename)
665 {
666 g_return_if_fail(cfs_status.vmlist != NULL);
667 g_return_if_fail(nodename != NULL);
668 g_return_if_fail(vmid != 0);
669 // FIXME: remove openvz stuff for 7.x
670 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
671 vmtype == VMTYPE_LXC);
672
673 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
674
675 g_mutex_lock (&mutex);
676
677 cfs_status.vmlist_version++;
678
679 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
680
681 g_mutex_unlock (&mutex);
682 }
683
684 gboolean
685 vmlist_different_vm_exists(
686 int vmtype,
687 guint32 vmid,
688 const char *nodename)
689 {
690 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
691 g_return_val_if_fail(vmid != 0, FALSE);
692
693 gboolean res = FALSE;
694
695 g_mutex_lock (&mutex);
696
697 vminfo_t *vminfo;
698 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
699 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
700 res = TRUE;
701 }
702 g_mutex_unlock (&mutex);
703
704 return res;
705 }
706
707 gboolean
708 vmlist_vm_exists(
709 guint32 vmid)
710 {
711 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
712 g_return_val_if_fail(vmid != 0, FALSE);
713
714 g_mutex_lock (&mutex);
715
716 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
717
718 g_mutex_unlock (&mutex);
719
720 return res != NULL;
721 }
722
723 void
724 vmlist_delete_vm(
725 guint32 vmid)
726 {
727 g_return_if_fail(cfs_status.vmlist != NULL);
728 g_return_if_fail(vmid != 0);
729
730 g_mutex_lock (&mutex);
731
732 cfs_status.vmlist_version++;
733
734 g_hash_table_remove(cfs_status.vmlist, &vmid);
735
736 g_mutex_unlock (&mutex);
737 }
738
739 void cfs_status_set_vmlist(
740 GHashTable *vmlist)
741 {
742 g_return_if_fail(vmlist != NULL);
743
744 g_mutex_lock (&mutex);
745
746 cfs_status.vmlist_version++;
747
748 if (cfs_status.vmlist)
749 g_hash_table_destroy(cfs_status.vmlist);
750
751 cfs_status.vmlist = vmlist;
752
753 g_mutex_unlock (&mutex);
754 }
755
756 int
757 cfs_create_vmlist_msg(GString *str)
758 {
759 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
760 g_return_val_if_fail(str != NULL, -EINVAL);
761
762 g_mutex_lock (&mutex);
763
764 g_string_append_printf(str,"{\n");
765
766 GHashTable *ht = cfs_status.vmlist;
767
768 guint count = g_hash_table_size(ht);
769
770 if (!count) {
771 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
772 } else {
773 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
774
775 g_string_append_printf(str,"\"ids\": {\n");
776
777 GHashTableIter iter;
778 gpointer key, value;
779
780 g_hash_table_iter_init (&iter, ht);
781
782 int first = 1;
783 while (g_hash_table_iter_next (&iter, &key, &value)) {
784 vminfo_t *vminfo = (vminfo_t *)value;
785 const char *type = vminfo_type_to_string(vminfo);
786
787 if (!first)
788 g_string_append_printf(str, ",\n");
789 first = 0;
790
791 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
792 vminfo->vmid, vminfo->nodename, type, vminfo->version);
793 }
794
795 g_string_append_printf(str,"}\n");
796 }
797 g_string_append_printf(str,"\n}\n");
798
799 g_mutex_unlock (&mutex);
800
801 return 0;
802 }
803
// Checks the conf for a line starting with '$prop:' and returns the value
// that follows, without leading and trailing whitespace.
//
// We only deal with the format restrictions imposed by our perl VM config
// parser (main reference is PVE::QemuServer::parse_vm_config); this allows the
// lookup to be very fast and still relatively simple. The main restriction
// used to our advantage is the property match regex
// ($line =~ m/^([a-z][a-z_]*\d*):\s*(.+?)\s*$/) from parse_vm_config.
//
// Only the current configuration is examined, i.e., *no* snapshots and *no*
// pending changes: the search stops at the first line starting with '['.
//
// NOTE: the buffer is modified in place - the newline of every visited line
// and the byte after the returned value are overwritten with '\0'. The
// returned pointer points into @conf. Only lines terminated by '\n' are
// considered. Returns NULL if the property was not found, has an empty value
// or the arguments are unusable.
static char *
_get_property_value(char *conf, int conf_size, const char *prop, int prop_len)
{
	// a non-positive size would turn into a huge size_t in memchr() below
	if (conf == NULL || prop == NULL || conf_size <= 0 || prop_len <= 0) {
		return NULL;
	}

	const char *const conf_end = conf + conf_size;
	char *line = conf;
	size_t remaining_size;

	char *next_newline = memchr(conf, '\n', (size_t)conf_size);
	if (next_newline == NULL) {
		return NULL; // valid property lines end with \n, but none in the config
	}
	*next_newline = '\0';

	while (line != NULL) {
		if (!line[0]) goto next;

		// snapshot or pending section start, but nothing found yet -> not found
		if (line[0] == '[') return NULL;
		// properties start with /^[a-z]/, so continue early if not
		if (line[0] < 'a' || line[0] > 'z') goto next;

		int line_len = (int)strlen(line);
		if (line_len <= prop_len + 1) goto next;

		if (line[prop_len] == ':' && memcmp(line, prop, (size_t)prop_len) == 0) { // found
			char *v_start = &line[prop_len + 1];

			// drop initial value whitespaces here already; the cast is required
			// because passing a negative char to isspace() is undefined
			while (*v_start && isspace((unsigned char)*v_start)) v_start++;

			if (!*v_start) return NULL;

			char *v_end = &line[line_len - 1];
			while (v_end > v_start && isspace((unsigned char)*v_end)) v_end--;
			v_end[1] = '\0';

			return v_start;
		}
next:
		line = next_newline + 1;
		remaining_size = (size_t)(conf_end - line);
		// what is left is too short to hold "$prop:" plus a value
		if (remaining_size <= (size_t)prop_len) {
			return NULL;
		}
		next_newline = memchr(line, '\n', remaining_size);
		if (next_newline == NULL) {
			return NULL; // valid property lines end with \n, but none in the config
		}
		*next_newline = '\0';
	}

	return NULL; // not found
}
866
867 static void
868 _g_str_append_kv_jsonescaped(GString *str, const char *k, const char *v)
869 {
870 g_string_append_printf(str, "\"%s\": \"", k);
871
872 for (; *v; v++) {
873 if (*v == '\\' || *v == '"') {
874 g_string_append_c(str, '\\');
875 }
876 g_string_append_c(str, *v);
877 }
878
879 g_string_append_c(str, '"');
880 }
881
882 int
883 cfs_create_guest_conf_property_msg(GString *str, memdb_t *memdb, const char *prop, uint32_t vmid)
884 {
885 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
886 g_return_val_if_fail(str != NULL, -EINVAL);
887
888 int prop_len = strlen(prop);
889 int res = 0;
890 GString *path = NULL;
891
892 // Prelock &memdb->mutex in order to enable the usage of memdb_read_nolock
893 // to prevent Deadlocks as in #2553
894 g_mutex_lock (&memdb->mutex);
895 g_mutex_lock (&mutex);
896
897 g_string_printf(str,"{\n");
898
899 GHashTable *ht = cfs_status.vmlist;
900 gpointer tmp = NULL;
901 if (!g_hash_table_size(ht)) {
902 goto ret;
903 }
904
905 if ((path = g_string_sized_new(256)) == NULL) {
906 res = -ENOMEM;
907 goto ret;
908 }
909
910 if (vmid >= 100) {
911 vminfo_t *vminfo = (vminfo_t *) g_hash_table_lookup(cfs_status.vmlist, &vmid);
912 if (vminfo == NULL) goto enoent;
913
914 if (!vminfo_to_path(vminfo, path)) goto err;
915
916 // use memdb_read_nolock because lock is handled here
917 int size = memdb_read_nolock(memdb, path->str, &tmp);
918 if (tmp == NULL) goto err;
919 if (size <= prop_len) goto ret;
920
921 char *val = _get_property_value(tmp, size, prop, prop_len);
922 if (val == NULL) goto ret;
923
924 g_string_append_printf(str, "\"%u\":{", vmid);
925 _g_str_append_kv_jsonescaped(str, prop, val);
926 g_string_append_c(str, '}');
927
928 } else {
929 GHashTableIter iter;
930 g_hash_table_iter_init (&iter, ht);
931
932 gpointer key, value;
933 int first = 1;
934 while (g_hash_table_iter_next (&iter, &key, &value)) {
935 vminfo_t *vminfo = (vminfo_t *)value;
936
937 if (!vminfo_to_path(vminfo, path)) goto err;
938
939 g_free(tmp); // no-op if already null
940 tmp = NULL;
941 // use memdb_read_nolock because lock is handled here
942 int size = memdb_read_nolock(memdb, path->str, &tmp);
943 if (tmp == NULL || size <= prop_len) continue;
944
945 char *val = _get_property_value(tmp, size, prop, prop_len);
946 if (val == NULL) continue;
947
948 if (!first) g_string_append_printf(str, ",\n");
949 else first = 0;
950
951 g_string_append_printf(str, "\"%u\":{", vminfo->vmid);
952 _g_str_append_kv_jsonescaped(str, prop, val);
953 g_string_append_c(str, '}');
954 }
955 }
956 ret:
957 g_free(tmp);
958 if (path != NULL) {
959 g_string_free(path, TRUE);
960 }
961 g_string_append_printf(str,"\n}\n");
962 g_mutex_unlock (&mutex);
963 g_mutex_unlock (&memdb->mutex);
964 return res;
965 err:
966 res = -EIO;
967 goto ret;
968 enoent:
969 res = -ENOENT;
970 goto ret;
971 }
972
973 void
974 record_memdb_change(const char *path)
975 {
976 g_return_if_fail(cfs_status.memdb_changes != 0);
977
978 memdb_change_t *ce;
979
980 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
981 ce->version++;
982 }
983 }
984
985 void
986 record_memdb_reload(void)
987 {
988 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
989 memdb_change_array[i].version++;
990 }
991 }
992
993 static gboolean
994 kventry_hash_set(
995 GHashTable *kvhash,
996 const char *key,
997 gconstpointer data,
998 size_t len)
999 {
1000 g_return_val_if_fail(kvhash != NULL, FALSE);
1001 g_return_val_if_fail(key != NULL, FALSE);
1002 g_return_val_if_fail(data != NULL, FALSE);
1003
1004 kventry_t *entry;
1005 if (!len) {
1006 g_hash_table_remove(kvhash, key);
1007 } else if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1008 g_free(entry->data);
1009 entry->data = g_memdup(data, len);
1010 entry->len = len;
1011 entry->version++;
1012 } else {
1013 kventry_t *entry = g_new0(kventry_t, 1);
1014
1015 entry->key = g_strdup(key);
1016 entry->data = g_memdup(data, len);
1017 entry->len = len;
1018
1019 g_hash_table_replace(kvhash, entry->key, entry);
1020 }
1021
1022 return TRUE;
1023 }
1024
/*
 * RRD definition (data sources and archives) used when creating a node RRD
 * file, see update_rrd_data(). NULL terminated; the terminator is not
 * counted when the argument count is computed.
 */
static const char *rrd_def_node[] = {
	"DS:loadavg:GAUGE:120:0:U",
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:iowait:GAUGE:120:0:U",
	"DS:memtotal:GAUGE:120:0:U",
	"DS:memused:GAUGE:120:0:U",
	"DS:swaptotal:GAUGE:120:0:U",
	"DS:swapused:GAUGE:120:0:U",
	"DS:roottotal:GAUGE:120:0:U",
	"DS:rootused:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};
1052
/*
 * RRD definition (data sources and archives) used when creating a guest RRD
 * file, see update_rrd_data(). NULL terminated; the terminator is not
 * counted when the argument count is computed.
 */
static const char *rrd_def_vm[] = {
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:maxmem:GAUGE:120:0:U",
	"DS:mem:GAUGE:120:0:U",
	"DS:maxdisk:GAUGE:120:0:U",
	"DS:disk:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",
	"DS:diskread:DERIVE:120:0:U",
	"DS:diskwrite:DERIVE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};
1078
/*
 * RRD definition (data sources and archives) used when creating a storage
 * RRD file, see update_rrd_data(). NULL terminated; the terminator is not
 * counted when the argument count is computed.
 */
static const char *rrd_def_storage[] = {
	"DS:total:GAUGE:120:0:U",
	"DS:used:GAUGE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};
1096
1097 #define RRDDIR "/var/lib/rrdcached/db"
1098
/*
 * Create the RRD file @filename with a step of 60 seconds from the
 * definition @rrddef (@argcount entries). The start time is set to the
 * beginning (00:00:00 local time) of the current day.
 *
 * Failures are logged only; the function has no return value.
 *
 * localtime_r() is used instead of localtime() so that the broken-down time
 * is not kept in a static buffer shared with other callers, and mktime()
 * replaces the non-standard timelocal().
 */
static void
create_rrd_file(
	const char *filename,
	int argcount,
	const char *rrddef[])
{
	/* start at day boundary */
	time_t now = time(NULL);
	struct tm ltm;

	if (localtime_r(&now, &ltm) == NULL) {
		cfs_message("RRD create error %s: %s", filename, "unable to get local time");
		return;
	}

	ltm.tm_sec = 0;
	ltm.tm_min = 0;
	ltm.tm_hour = 0;

	rrd_clear_error();
	if (rrd_create_r(filename, 60, mktime(&ltm), argcount, rrddef)) {
		cfs_message("RRD create error %s: %s", filename, rrd_get_error());
	}
}
1118
/*
 * Skip the first @count ':'-terminated fields of @data.
 *
 * Returns a pointer to the character following the @count-th ':' or to the
 * terminating NUL if the string holds fewer separators. A @count <= 0
 * returns @data unchanged.
 */
static inline const char *
rrd_skip_data(
	const char *data,
	int count)
{
	for (int seen = 0; seen < count && *data != '\0'; data++) {
		if (*data == ':')
			seen++;
	}

	return data;
}
1131
/*
 * Feed one update into the RRD file that belongs to @key, creating the file
 * (and its directory) first if it does not exist yet.
 *
 * @key:  "pve2-node/<node>", "pve2-vm/<vmid>", "pve2.3-vm/<vmid>" or
 *        "pve2-storage/<node>/<storage>"; anything else is rejected
 * @data: ':' separated update string; a key-type dependent number of
 *        leading fields is skipped before it is passed to rrd
 *        NOTE(review): @data is used as a NUL-terminated string here -
 *        confirm that callers guarantee the termination
 * @len:  length of @data, must be in 1..4095
 *
 * The update is sent through the rrdcached socket if a connection can be
 * established; otherwise, or if that update fails, the file is updated
 * directly. Errors are logged only.
 */
static void
update_rrd_data(
	const char *key,
	gconstpointer data,
	size_t len)
{
	g_return_if_fail(key != NULL);
	g_return_if_fail(data != NULL);
	g_return_if_fail(len > 0);
	g_return_if_fail(len < 4096);

	static const char *rrdcsock = "unix:/var/run/rrdcached.sock";

	/* fall back to direct file access when the daemon is not reachable */
	int use_daemon = 1;
	if (rrdc_connect(rrdcsock) != 0)
		use_daemon = 0;

	char *filename = NULL;

	/* number of leading ':' separated fields of data that are not passed on */
	int skip = 0;

	if (strncmp(key, "pve2-node/", 10) == 0) {
		const char *node = key + 10;

		skip = 2;

		/* the node name must be a single, non-empty path component */
		if (strchr(node, '/') != NULL)
			goto keyerror;

		if (strlen(node) < 1)
			goto keyerror;

		filename = g_strdup_printf(RRDDIR "/%s", key);

		if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {

			mkdir(RRDDIR "/pve2-node", 0755);
			/* do not count the terminating NULL entry */
			int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
			create_rrd_file(filename, argcount, rrd_def_node);
		}

	} else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
		   (strncmp(key, "pve2.3-vm/", 10) == 0)) {
		const char *vmid;

		/* both key formats share the pve2-vm directory but skip a different number of fields */
		if (strncmp(key, "pve2-vm/", 8) == 0) {
			vmid = key + 8;
			skip = 2;
		} else {
			vmid = key + 10;
			skip = 4;
		}

		if (strchr(vmid, '/') != NULL)
			goto keyerror;

		if (strlen(vmid) < 1)
			goto keyerror;

		filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);

		if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {

			mkdir(RRDDIR "/pve2-vm", 0755);
			int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
			create_rrd_file(filename, argcount, rrd_def_vm);
		}

	} else if (strncmp(key, "pve2-storage/", 13) == 0) {
		const char *node = key + 13;

		/* split "<node>/<storage>": advance to the separator */
		const char *storage = node;
		while (*storage && *storage != '/')
			storage++;

		/* separator missing or empty node name */
		if (*storage != '/' || ((storage - node) < 1))
			goto keyerror;

		storage++;

		if (strchr(storage, '/') != NULL)
			goto keyerror;

		if (strlen(storage) < 1)
			goto keyerror;

		filename = g_strdup_printf(RRDDIR "/%s", key);

		if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {

			mkdir(RRDDIR "/pve2-storage", 0755);

			/* per-node sub directory */
			char *dir = g_path_get_dirname(filename);
			mkdir(dir, 0755);
			g_free(dir);

			int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
			create_rrd_file(filename, argcount, rrd_def_storage);
		}

	} else {
		goto keyerror;
	}

	const char *dp = skip ? rrd_skip_data(data, skip) : data;

	const char *update_args[] = { dp, NULL };

	if (use_daemon) {
		int status;
		if ((status = rrdc_update(filename, 1, update_args)) != 0) {
			cfs_message("RRDC update error %s: %d", filename, status);
			/* daemon update failed: drop the connection and write the file directly */
			rrdc_disconnect();
			rrd_clear_error();
			if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
				cfs_message("RRD update error %s: %s", filename, rrd_get_error());
			}
		}

	} else {
		rrd_clear_error();
		if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
			cfs_message("RRD update error %s: %s", filename, rrd_get_error());
		}
	}

ret:
	if (filename)
		g_free(filename);

	return;

keyerror:
	cfs_critical("RRD update error: unknown/wrong key %s", key);
	goto ret;
}
1268
1269 static gboolean
1270 rrd_entry_is_old(
1271 gpointer key,
1272 gpointer value,
1273 gpointer user_data)
1274 {
1275 rrdentry_t *entry = (rrdentry_t *)value;
1276 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1277
1278 int diff = ctime - entry->time;
1279
1280 /* remove everything older than 5 minutes */
1281 int expire = 60*5;
1282
1283 return (diff > expire) ? TRUE : FALSE;
1284 }
1285
1286 static char *rrd_dump_buf = NULL;
1287 static time_t rrd_dump_last = 0;
1288
1289 void
1290 cfs_rrd_dump(GString *str)
1291 {
1292 time_t ctime;
1293
1294 g_mutex_lock (&mutex);
1295
1296 time(&ctime);
1297 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1298 g_string_assign(str, rrd_dump_buf);
1299 g_mutex_unlock (&mutex);
1300 return;
1301 }
1302
1303 /* remove old data */
1304 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1305 GUINT_TO_POINTER(ctime));
1306
1307 g_string_set_size(str, 0);
1308
1309 GHashTableIter iter;
1310 gpointer key, value;
1311
1312 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1313
1314 while (g_hash_table_iter_next (&iter, &key, &value)) {
1315 rrdentry_t *entry = (rrdentry_t *)value;
1316 g_string_append(str, key);
1317 g_string_append(str, ":");
1318 g_string_append(str, entry->data);
1319 g_string_append(str, "\n");
1320 }
1321
1322 g_string_append_c(str, 0); // never return undef
1323
1324 rrd_dump_last = ctime;
1325 if (rrd_dump_buf)
1326 g_free(rrd_dump_buf);
1327 rrd_dump_buf = g_strdup(str->str);
1328
1329 g_mutex_unlock (&mutex);
1330 }
1331
1332 static gboolean
1333 nodeip_hash_set(
1334 GHashTable *iphash,
1335 const char *nodename,
1336 const char *ip,
1337 size_t len)
1338 {
1339 g_return_val_if_fail(iphash != NULL, FALSE);
1340 g_return_val_if_fail(nodename != NULL, FALSE);
1341 g_return_val_if_fail(ip != NULL, FALSE);
1342 g_return_val_if_fail(len > 0, FALSE);
1343 g_return_val_if_fail(len < 256, FALSE);
1344 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1345
1346 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1347
1348 if (!oldip || (strcmp(oldip, ip) != 0)) {
1349 cfs_status.clinfo_version++;
1350 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1351 }
1352
1353 return TRUE;
1354 }
1355
1356 static gboolean
1357 rrdentry_hash_set(
1358 GHashTable *rrdhash,
1359 const char *key,
1360 const char *data,
1361 size_t len)
1362 {
1363 g_return_val_if_fail(rrdhash != NULL, FALSE);
1364 g_return_val_if_fail(key != NULL, FALSE);
1365 g_return_val_if_fail(data != NULL, FALSE);
1366 g_return_val_if_fail(len > 0, FALSE);
1367 g_return_val_if_fail(len < 4096, FALSE);
1368 g_return_val_if_fail(data[len-1] == 0, FALSE);
1369
1370 rrdentry_t *entry;
1371 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1372 g_free(entry->data);
1373 entry->data = g_memdup(data, len);
1374 entry->len = len;
1375 entry->time = time(NULL);
1376 } else {
1377 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1378
1379 entry->key = g_strdup(key);
1380 entry->data = g_memdup(data, len);
1381 entry->len = len;
1382 entry->time = time(NULL);
1383
1384 g_hash_table_replace(rrdhash, entry->key, entry);
1385 }
1386
1387 update_rrd_data(key, data, len);
1388
1389 return TRUE;
1390 }
1391
1392 static int
1393 kvstore_send_update_message(
1394 dfsm_t *dfsm,
1395 const char *key,
1396 gpointer data,
1397 guint32 len)
1398 {
1399 if (!dfsm_is_initialized(dfsm))
1400 return -EACCES;
1401
1402 struct iovec iov[2];
1403
1404 char name[256];
1405 g_strlcpy(name, key, sizeof(name));
1406
1407 iov[0].iov_base = &name;
1408 iov[0].iov_len = sizeof(name);
1409
1410 iov[1].iov_base = (char *)data;
1411 iov[1].iov_len = len;
1412
1413 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1414 return 0;
1415
1416 return -EACCES;
1417 }
1418
1419 static clog_entry_t *
1420 kvstore_parse_log_message(
1421 const void *msg,
1422 size_t msg_len)
1423 {
1424 g_return_val_if_fail(msg != NULL, NULL);
1425
1426 if (msg_len < sizeof(clog_entry_t)) {
1427 cfs_critical("received short log message (%zu < %zu)", msg_len, sizeof(clog_entry_t));
1428 return NULL;
1429 }
1430
1431 clog_entry_t *entry = (clog_entry_t *)msg;
1432
1433 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1434 entry->ident_len + entry->tag_len + entry->msg_len;
1435
1436 if (msg_len != size) {
1437 cfs_critical("received log message with wrong size (%zu != %u)", msg_len, size);
1438 return NULL;
1439 }
1440
1441 char *msgptr = entry->data;
1442
1443 if (*((char *)msgptr + entry->node_len - 1)) {
1444 cfs_critical("unterminated string in log message");
1445 return NULL;
1446 }
1447 msgptr += entry->node_len;
1448
1449 if (*((char *)msgptr + entry->ident_len - 1)) {
1450 cfs_critical("unterminated string in log message");
1451 return NULL;
1452 }
1453 msgptr += entry->ident_len;
1454
1455 if (*((char *)msgptr + entry->tag_len - 1)) {
1456 cfs_critical("unterminated string in log message");
1457 return NULL;
1458 }
1459 msgptr += entry->tag_len;
1460
1461 if (*((char *)msgptr + entry->msg_len - 1)) {
1462 cfs_critical("unterminated string in log message");
1463 return NULL;
1464 }
1465
1466 return entry;
1467 }
1468
1469 static gboolean
1470 kvstore_parse_update_message(
1471 const void *msg,
1472 size_t msg_len,
1473 const char **key,
1474 gconstpointer *data,
1475 guint32 *len)
1476 {
1477 g_return_val_if_fail(msg != NULL, FALSE);
1478 g_return_val_if_fail(key != NULL, FALSE);
1479 g_return_val_if_fail(data != NULL, FALSE);
1480 g_return_val_if_fail(len != NULL, FALSE);
1481
1482 if (msg_len < 256) {
1483 cfs_critical("received short kvstore message (%zu < 256)", msg_len);
1484 return FALSE;
1485 }
1486
1487 /* test if key is null terminated */
1488 int i = 0;
1489 for (i = 0; i < 256; i++)
1490 if (((char *)msg)[i] == 0)
1491 break;
1492
1493 if (i == 256)
1494 return FALSE;
1495
1496
1497 *len = msg_len - 256;
1498 *key = msg;
1499 *data = (char *) msg + 256;
1500
1501 return TRUE;
1502 }
1503
1504 int
1505 cfs_create_status_msg(
1506 GString *str,
1507 const char *nodename,
1508 const char *key)
1509 {
1510 g_return_val_if_fail(str != NULL, -EINVAL);
1511 g_return_val_if_fail(key != NULL, -EINVAL);
1512
1513 int res = -ENOENT;
1514
1515 GHashTable *kvhash = NULL;
1516
1517 g_mutex_lock (&mutex);
1518
1519 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1520 kvhash = cfs_status.kvhash;
1521 } else if (cfs_status.clinfo && cfs_status.clinfo->nodes_byname) {
1522 cfs_clnode_t *clnode;
1523 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1524 kvhash = clnode->kvhash;
1525 }
1526
1527 kventry_t *entry;
1528 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1529 g_string_append_len(str, entry->data, entry->len);
1530 res = 0;
1531 }
1532
1533 g_mutex_unlock (&mutex);
1534
1535 return res;
1536 }
1537
1538 int
1539 cfs_status_set(
1540 const char *key,
1541 gpointer data,
1542 size_t len)
1543 {
1544 g_return_val_if_fail(key != NULL, FALSE);
1545 g_return_val_if_fail(data != NULL, FALSE);
1546 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1547
1548 if (len > CFS_MAX_STATUS_SIZE)
1549 return -EFBIG;
1550
1551 g_mutex_lock (&mutex);
1552
1553 gboolean res;
1554
1555 if (strncmp(key, "rrd/", 4) == 0) {
1556 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1557 } else if (!strcmp(key, "nodeip")) {
1558 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1559 } else {
1560 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1561 }
1562 g_mutex_unlock (&mutex);
1563
1564 if (cfs_status.kvstore)
1565 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1566
1567 return res ? 0 : -ENOMEM;
1568 }
1569
1570 gboolean
1571 cfs_kvstore_node_set(
1572 uint32_t nodeid,
1573 const char *key,
1574 gconstpointer data,
1575 size_t len)
1576 {
1577 g_return_val_if_fail(nodeid != 0, FALSE);
1578 g_return_val_if_fail(key != NULL, FALSE);
1579 g_return_val_if_fail(data != NULL, FALSE);
1580
1581 g_mutex_lock (&mutex);
1582
1583 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1584 goto ret; /* ignore */
1585
1586 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1587 if (!clnode)
1588 goto ret; /* ignore */
1589
1590 cfs_debug("got node %d status update %s", nodeid, key);
1591
1592 if (strncmp(key, "rrd/", 4) == 0) {
1593 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1594 } else if (!strcmp(key, "nodeip")) {
1595 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1596 } else {
1597 if (!clnode->kvhash) {
1598 if (!(clnode->kvhash = kventry_hash_new())) {
1599 goto ret; /*ignore */
1600 }
1601 }
1602
1603 kventry_hash_set(clnode->kvhash, key, data, len);
1604
1605 }
1606 ret:
1607 g_mutex_unlock (&mutex);
1608
1609 return TRUE;
1610 }
1611
1612 static gboolean
1613 cfs_kvstore_sync(void)
1614 {
1615 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1616 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1617
1618 gboolean res = TRUE;
1619
1620 g_mutex_lock (&mutex);
1621
1622 GHashTable *ht = cfs_status.kvhash;
1623 GHashTableIter iter;
1624 gpointer key, value;
1625
1626 g_hash_table_iter_init (&iter, ht);
1627
1628 while (g_hash_table_iter_next (&iter, &key, &value)) {
1629 kventry_t *entry = (kventry_t *)value;
1630 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1631 }
1632
1633 g_mutex_unlock (&mutex);
1634
1635 return res;
1636 }
1637
1638 static int
1639 dfsm_deliver(
1640 dfsm_t *dfsm,
1641 gpointer data,
1642 int *res_ptr,
1643 uint32_t nodeid,
1644 uint32_t pid,
1645 uint16_t msg_type,
1646 uint32_t msg_time,
1647 const void *msg,
1648 size_t msg_len)
1649 {
1650 g_return_val_if_fail(dfsm != NULL, -1);
1651 g_return_val_if_fail(msg != NULL, -1);
1652 g_return_val_if_fail(res_ptr != NULL, -1);
1653
1654 /* ignore message for ourself */
1655 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1656 goto ret;
1657
1658 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1659 const char *key;
1660 gconstpointer data;
1661 guint32 len;
1662 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1663 cfs_kvstore_node_set(nodeid, key, data, len);
1664 } else {
1665 cfs_critical("cant parse update message");
1666 }
1667 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1668 cfs_message("received log"); // fixme: remove
1669 const clog_entry_t *entry;
1670 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1671 clusterlog_insert(cfs_status.clusterlog, entry);
1672 } else {
1673 cfs_critical("cant parse log message");
1674 }
1675 } else {
1676 cfs_critical("received unknown message type %d\n", msg_type);
1677 goto fail;
1678 }
1679
1680 ret:
1681 *res_ptr = 0;
1682 return 1;
1683
1684 fail:
1685 *res_ptr = -EACCES;
1686 return 1;
1687 }
1688
1689 static void
1690 dfsm_confchg(
1691 dfsm_t *dfsm,
1692 gpointer data,
1693 const struct cpg_address *member_list,
1694 size_t member_list_entries)
1695 {
1696 g_return_if_fail(dfsm != NULL);
1697 g_return_if_fail(member_list != NULL);
1698
1699 cfs_debug("enter %s", __func__);
1700
1701 g_mutex_lock (&mutex);
1702
1703 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1704
1705 if (clinfo && clinfo->nodes_byid) {
1706
1707 GHashTable *ht = clinfo->nodes_byid;
1708 GHashTableIter iter;
1709 gpointer key, value;
1710
1711 g_hash_table_iter_init (&iter, ht);
1712
1713 while (g_hash_table_iter_next (&iter, &key, &value)) {
1714 cfs_clnode_t *node = (cfs_clnode_t *)value;
1715 node->online = FALSE;
1716 }
1717
1718 for (int i = 0; i < member_list_entries; i++) {
1719 cfs_clnode_t *node;
1720 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1721 node->online = TRUE;
1722 }
1723 }
1724
1725 cfs_status.clinfo_version++;
1726 }
1727
1728 g_mutex_unlock (&mutex);
1729 }
1730
1731 static gpointer
1732 dfsm_get_state(
1733 dfsm_t *dfsm,
1734 gpointer data,
1735 unsigned int *res_len)
1736 {
1737 g_return_val_if_fail(dfsm != NULL, NULL);
1738
1739 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1740
1741 return msg;
1742 }
1743
1744 static int
1745 dfsm_process_update(
1746 dfsm_t *dfsm,
1747 gpointer data,
1748 dfsm_sync_info_t *syncinfo,
1749 uint32_t nodeid,
1750 uint32_t pid,
1751 const void *msg,
1752 size_t msg_len)
1753 {
1754 cfs_critical("%s: received unexpected update message", __func__);
1755
1756 return -1;
1757 }
1758
1759 static int
1760 dfsm_process_state_update(
1761 dfsm_t *dfsm,
1762 gpointer data,
1763 dfsm_sync_info_t *syncinfo)
1764 {
1765 g_return_val_if_fail(dfsm != NULL, -1);
1766 g_return_val_if_fail(syncinfo != NULL, -1);
1767
1768 clog_base_t *clog[syncinfo->node_count];
1769
1770 int local_index = -1;
1771 for (int i = 0; i < syncinfo->node_count; i++) {
1772 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1773 ni->synced = 1;
1774
1775 if (syncinfo->local == ni)
1776 local_index = i;
1777
1778 clog_base_t *base = (clog_base_t *)ni->state;
1779 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1780 clog[i] = ni->state;
1781 } else {
1782 cfs_critical("received log with wrong size %u", ni->state_len);
1783 clog[i] = NULL;
1784 }
1785 }
1786
1787 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1788 cfs_critical("unable to merge log files");
1789 }
1790
1791 cfs_kvstore_sync();
1792
1793 return 1;
1794 }
1795
1796 static int
1797 dfsm_commit(
1798 dfsm_t *dfsm,
1799 gpointer data,
1800 dfsm_sync_info_t *syncinfo)
1801 {
1802 g_return_val_if_fail(dfsm != NULL, -1);
1803 g_return_val_if_fail(syncinfo != NULL, -1);
1804
1805 return 1;
1806 }
1807
1808 static void
1809 dfsm_synced(dfsm_t *dfsm)
1810 {
1811 g_return_if_fail(dfsm != NULL);
1812
1813 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1814 if (!ip)
1815 ip = cfs.ip;
1816
1817 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1818 }
1819
1820 static int
1821 dfsm_cleanup(
1822 dfsm_t *dfsm,
1823 gpointer data,
1824 dfsm_sync_info_t *syncinfo)
1825 {
1826 return 1;
1827 }
1828
1829 static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1830 .dfsm_deliver_fn = dfsm_deliver,
1831 .dfsm_confchg_fn = dfsm_confchg,
1832
1833 .dfsm_get_state_fn = dfsm_get_state,
1834 .dfsm_process_state_update_fn = dfsm_process_state_update,
1835 .dfsm_process_update_fn = dfsm_process_update,
1836 .dfsm_commit_fn = dfsm_commit,
1837 .dfsm_cleanup_fn = dfsm_cleanup,
1838 .dfsm_synced_fn = dfsm_synced,
1839 };
1840
1841 dfsm_t *
1842 cfs_status_dfsm_new(void)
1843 {
1844 g_mutex_lock (&mutex);
1845
1846 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1847 0, &kvstore_dfsm_callbacks);
1848 g_mutex_unlock (&mutex);
1849
1850 return cfs_status.kvstore;
1851 }
1852
1853 gboolean
1854 cfs_is_quorate(void)
1855 {
1856 g_mutex_lock (&mutex);
1857 gboolean res = cfs_status.quorate;
1858 g_mutex_unlock (&mutex);
1859
1860 return res;
1861 }
1862
1863 void
1864 cfs_set_quorate(
1865 uint32_t quorate,
1866 gboolean quiet)
1867 {
1868 g_mutex_lock (&mutex);
1869
1870 uint32_t prev_quorate = cfs_status.quorate;
1871 cfs_status.quorate = quorate;
1872
1873 if (!prev_quorate && cfs_status.quorate) {
1874 if (!quiet)
1875 cfs_message("node has quorum");
1876 }
1877
1878 if (prev_quorate && !cfs_status.quorate) {
1879 if (!quiet)
1880 cfs_message("node lost quorum");
1881 }
1882
1883 g_mutex_unlock (&mutex);
1884 }