]> git.proxmox.com Git - pve-cluster.git/blame - data/src/status.c
imported from svn 'pve-cluster/trunk'
[pve-cluster.git] / data / src / status.c
CommitLineData
fe000966
DM
1/*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19*/
20
21#define G_LOG_DOMAIN "status"
22
23#ifdef HAVE_CONFIG_H
24#include <config.h>
25#endif /* HAVE_CONFIG_H */
26
27#include <stdio.h>
28#include <stdint.h>
29#include <string.h>
30#include <errno.h>
31#include <glib.h>
32#include <sys/syslog.h>
33#include <rrd.h>
34#include <rrd_client.h>
35#include <time.h>
36
37#include "cfs-utils.h"
38#include "status.h"
39#include "logger.h"
40
41#define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
42
43typedef enum {
44 KVSTORE_MESSAGE_UPDATE = 1,
45 KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
46 KVSTORE_MESSAGE_LOG = 3,
47} kvstore_message_t;
48
49static uint32_t vminfo_version_counter;
50
51typedef struct {
52 uint32_t vmid;
53 char *nodename;
54 int vmtype;
55 uint32_t version;
56} vminfo_t;
57
58typedef struct {
59 char *key;
60 gpointer data;
61 size_t len;
62 uint32_t version;
63} kventry_t;
64
65typedef struct {
66 char *key;
67 gpointer data;
68 size_t len;
69 uint32_t time;
70} rrdentry_t;
71
72typedef struct {
73 char *path;
74 uint32_t version;
75} memdb_change_t;
76
77static memdb_change_t memdb_change_array[] = {
78 { .path = "cluster.conf" },
79 { .path = "storage.cfg" },
80 { .path = "user.cfg" },
81 { .path = "domains.cfg" },
82 { .path = "priv/shadow.cfg" },
83 { .path = "datacenter.cfg" },
84};
85
86static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
87
88typedef struct {
89 time_t start_time;
90
91 uint32_t quorate;
92
93 cfs_clinfo_t *clinfo;
94 uint32_t clinfo_version;
95
96 GHashTable *vmlist;
97 uint32_t vmlist_version;
98
99 dfsm_t *kvstore;
100 GHashTable *kvhash;
101 GHashTable *rrdhash;
102 GHashTable *iphash;
103
104 GHashTable *memdb_changes;
105
106 clusterlog_t *clusterlog;
107} cfs_status_t;
108
109static cfs_status_t cfs_status;
110
111struct cfs_clnode {
112 char *name;
113 uint32_t nodeid;
114 uint32_t votes;
115 gboolean online;
116 GHashTable *kvhash;
117};
118
119struct cfs_clinfo {
120 char *cluster_name;
121 uint32_t cman_version;
122
123 GHashTable *nodes_byid;
124 GHashTable *nodes_byname;
125};
126
127static guint
128g_int32_hash (gconstpointer v)
129{
130 return *(const uint32_t *) v;
131}
132
133static gboolean
134g_int32_equal (gconstpointer v1,
135 gconstpointer v2)
136{
137 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
138}
139
140static void vminfo_free(vminfo_t *vminfo)
141{
142 g_return_if_fail(vminfo != NULL);
143
144 if (vminfo->nodename)
145 g_free(vminfo->nodename);
146
147
148 g_free(vminfo);
149}
150
151void cfs_clnode_destroy(
152 cfs_clnode_t *clnode)
153{
154 g_return_if_fail(clnode != NULL);
155
156 if (clnode->kvhash)
157 g_hash_table_destroy(clnode->kvhash);
158
159 if (clnode->name)
160 g_free(clnode->name);
161
162 g_free(clnode);
163}
164
165cfs_clnode_t *cfs_clnode_new(
166 const char *name,
167 uint32_t nodeid,
168 uint32_t votes)
169{
170 g_return_val_if_fail(name != NULL, NULL);
171
172 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
173 if (!clnode)
174 return NULL;
175
176 clnode->name = g_strdup(name);
177 clnode->nodeid = nodeid;
178 clnode->votes = votes;
179
180 return clnode;
181}
182
183gboolean cfs_clinfo_destroy(
184 cfs_clinfo_t *clinfo)
185{
186 g_return_val_if_fail(clinfo != NULL, FALSE);
187
188 if (clinfo->cluster_name)
189 g_free(clinfo->cluster_name);
190
191 if (clinfo->nodes_byname)
192 g_hash_table_destroy(clinfo->nodes_byname);
193
194 if (clinfo->nodes_byid)
195 g_hash_table_destroy(clinfo->nodes_byid);
196
197 g_free(clinfo);
198
199 return TRUE;
200}
201
202cfs_clinfo_t *cfs_clinfo_new(
203 const char *cluster_name,
204 uint32_t cman_version)
205{
206 g_return_val_if_fail(cluster_name != NULL, NULL);
207
208 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
209 if (!clinfo)
210 return NULL;
211
212 clinfo->cluster_name = g_strdup(cluster_name);
213 clinfo->cman_version = cman_version;
214
215 if (!(clinfo->nodes_byid = g_hash_table_new_full(
216 g_int32_hash, g_int32_equal, NULL,
217 (GDestroyNotify)cfs_clnode_destroy)))
218 goto fail;
219
220 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
221 goto fail;
222
223 return clinfo;
224
225fail:
226 cfs_clinfo_destroy(clinfo);
227
228 return NULL;
229}
230
231gboolean cfs_clinfo_add_node(
232 cfs_clinfo_t *clinfo,
233 cfs_clnode_t *clnode)
234{
235 g_return_val_if_fail(clinfo != NULL, FALSE);
236 g_return_val_if_fail(clnode != NULL, FALSE);
237
238 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
239 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
240
241 return TRUE;
242}
243
244int
245cfs_create_memberlist_msg(
246 GString *str)
247{
248 g_return_val_if_fail(str != NULL, -EINVAL);
249
250 g_static_mutex_lock(&mutex);
251
252 g_string_append_printf(str,"{\n");
253
254 guint nodecount = 0;
255
256 cfs_clinfo_t *clinfo = cfs_status.clinfo;
257
258 if (clinfo && clinfo->nodes_byid)
259 nodecount = g_hash_table_size(clinfo->nodes_byid);
260
261 if (nodecount) {
262 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
263 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
264
265 g_string_append_printf(str, "\"cluster\": { ");
266 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
267 "\"nodes\": %d, \"quorate\": %d ",
268 clinfo->cluster_name, clinfo->cman_version,
269 nodecount, cfs_status.quorate);
270
271 g_string_append_printf(str,"},\n");
272 g_string_append_printf(str,"\"nodelist\": {\n");
273
274 GHashTable *ht = clinfo->nodes_byid;
275 GHashTableIter iter;
276 gpointer key, value;
277
278 g_hash_table_iter_init (&iter, ht);
279
280 int i = 0;
281 while (g_hash_table_iter_next (&iter, &key, &value)) {
282 cfs_clnode_t *node = (cfs_clnode_t *)value;
283 if (i) g_string_append_printf(str, ",\n");
284 i++;
285
286 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
287 node->name, node->nodeid, node->online);
288
289
290 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
291 if (ip) {
292 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
293 }
294
295 g_string_append_printf(str, "}");
296
297 }
298 g_string_append_printf(str,"\n }\n");
299 } else {
300 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
301 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
302 }
303
304 g_string_append_printf(str,"}\n");
305
306 g_static_mutex_unlock (&mutex);
307
308 return 0;
309}
310
311static void
312kventry_free(kventry_t *entry)
313{
314 g_return_if_fail(entry != NULL);
315
316 g_free(entry->key);
317 g_free(entry->data);
318 g_free(entry);
319}
320
321static GHashTable *
322kventry_hash_new(void)
323{
324 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
325 (GDestroyNotify)kventry_free);
326}
327
328static void
329rrdentry_free(rrdentry_t *entry)
330{
331 g_return_if_fail(entry != NULL);
332
333 g_free(entry->key);
334 g_free(entry->data);
335 g_free(entry);
336}
337
338static GHashTable *
339rrdentry_hash_new(void)
340{
341 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
342 (GDestroyNotify)rrdentry_free);
343}
344
345void
346cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
347{
348 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
349}
350
351void
352cfs_cluster_log(clog_entry_t *entry)
353{
354 g_return_if_fail(entry != NULL);
355
356 clusterlog_insert(cfs_status.clusterlog, entry);
357
358 if (cfs_status.kvstore) {
359 struct iovec iov[1];
360 iov[0].iov_base = (char *)entry;
361 iov[0].iov_len = clog_entry_size(entry);
362
363 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
364 }
365}
366
367void cfs_status_init(void)
368{
369 g_static_mutex_lock (&mutex);
370
371 cfs_status.start_time = time(NULL);
372
373 cfs_status.vmlist = vmlist_hash_new();
374
375 cfs_status.kvhash = kventry_hash_new();
376
377 cfs_status.rrdhash = rrdentry_hash_new();
378
379 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
380
381 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
382
383 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
384 g_hash_table_replace(cfs_status.memdb_changes,
385 memdb_change_array[i].path,
386 &memdb_change_array[i]);
387 }
388
389 cfs_status.clusterlog = clusterlog_new();
390
391 // fixme:
392 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
393 LOG_INFO, "starting cluster log");
394
395 g_static_mutex_unlock (&mutex);
396}
397
398void cfs_status_cleanup(void)
399{
400 g_static_mutex_lock (&mutex);
401
402 cfs_status.clinfo_version++;
403
404 if (cfs_status.clinfo) {
405 cfs_clinfo_destroy(cfs_status.clinfo);
406 cfs_status.clinfo = NULL;
407 }
408
409 if (cfs_status.vmlist) {
410 g_hash_table_destroy(cfs_status.vmlist);
411 cfs_status.vmlist = NULL;
412 }
413
414 if (cfs_status.kvhash) {
415 g_hash_table_destroy(cfs_status.kvhash);
416 cfs_status.kvhash = NULL;
417 }
418
419 if (cfs_status.rrdhash) {
420 g_hash_table_destroy(cfs_status.rrdhash);
421 cfs_status.rrdhash = NULL;
422 }
423
424 if (cfs_status.iphash) {
425 g_hash_table_destroy(cfs_status.iphash);
426 cfs_status.iphash = NULL;
427 }
428
429 if (cfs_status.clusterlog)
430 clusterlog_destroy(cfs_status.clusterlog);
431
432 g_static_mutex_unlock (&mutex);
433}
434
435void cfs_status_set_clinfo(
436 cfs_clinfo_t *clinfo)
437{
438 g_return_if_fail(clinfo != NULL);
439
440 g_static_mutex_lock (&mutex);
441
442 cfs_status.clinfo_version++;
443
444 cfs_clinfo_t *old = cfs_status.clinfo;
445
446 cfs_status.clinfo = clinfo;
447
448 cfs_message("update cluster info (cluster name %s, version = %d)",
449 clinfo->cluster_name, clinfo->cman_version);
450
451
452 if (old && old->nodes_byid && clinfo->nodes_byid) {
453 /* copy kvstore */
454 GHashTable *ht = clinfo->nodes_byid;
455 GHashTableIter iter;
456 gpointer key, value;
457
458 g_hash_table_iter_init (&iter, ht);
459
460 while (g_hash_table_iter_next (&iter, &key, &value)) {
461 cfs_clnode_t *node = (cfs_clnode_t *)value;
462 cfs_clnode_t *oldnode;
463 if ((oldnode = g_hash_table_lookup(old->nodes_byid, &key))) {
464 node->online = oldnode->online;
465 node->kvhash = oldnode->kvhash;
466 oldnode->kvhash = NULL;
467 }
468 }
469
470 }
471
472 if (old)
473 cfs_clinfo_destroy(old);
474
475
476 g_static_mutex_unlock (&mutex);
477}
478
479static void
480dump_kvstore_versions(
481 GString *str,
482 GHashTable *kvhash,
483 const char *nodename)
484{
485 g_return_if_fail(kvhash != NULL);
486 g_return_if_fail(str != NULL);
487 g_return_if_fail(nodename != NULL);
488
489 GHashTable *ht = kvhash;
490 GHashTableIter iter;
491 gpointer key, value;
492
493 g_string_append_printf(str, "\"%s\": {\n", nodename);
494
495 g_hash_table_iter_init (&iter, ht);
496
497 int i = 0;
498 while (g_hash_table_iter_next (&iter, &key, &value)) {
499 kventry_t *entry = (kventry_t *)value;
500 if (i) g_string_append_printf(str, ",\n");
501 i++;
502 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
503 }
504
505 g_string_append_printf(str, "}\n");
506}
507
508int
509cfs_create_version_msg(GString *str)
510{
511 g_return_val_if_fail(str != NULL, -EINVAL);
512
513 g_static_mutex_lock (&mutex);
514
515 g_string_append_printf(str,"{\n");
516
517 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
518
519 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
520
521 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
522
523 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
524 g_string_append_printf(str, "\"%s\": %u,\n",
525 memdb_change_array[i].path,
526 memdb_change_array[i].version);
527 }
528
529 g_string_append_printf(str, "\"kvstore\": {\n");
530
531 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
532
533 cfs_clinfo_t *clinfo = cfs_status.clinfo;
534
535 if (clinfo && clinfo->nodes_byid) {
536 GHashTable *ht = clinfo->nodes_byid;
537 GHashTableIter iter;
538 gpointer key, value;
539
540 g_hash_table_iter_init (&iter, ht);
541
542 while (g_hash_table_iter_next (&iter, &key, &value)) {
543 cfs_clnode_t *node = (cfs_clnode_t *)value;
544 if (!node->kvhash)
545 continue;
546 g_string_append_printf(str, ",\n");
547 dump_kvstore_versions(str, node->kvhash, node->name);
548 }
549 }
550
551 g_string_append_printf(str,"}\n");
552
553 g_string_append_printf(str,"}\n");
554
555 g_static_mutex_unlock (&mutex);
556
557 return 0;
558}
559
560GHashTable *
561vmlist_hash_new(void)
562{
563 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
564 (GDestroyNotify)vminfo_free);
565}
566
567gboolean
568vmlist_hash_insert_vm(
569 GHashTable *vmlist,
570 int vmtype,
571 guint32 vmid,
572 const char *nodename,
573 gboolean replace)
574{
575 g_return_val_if_fail(vmlist != NULL, FALSE);
576 g_return_val_if_fail(nodename != NULL, FALSE);
577 g_return_val_if_fail(vmid != 0, FALSE);
578 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ, FALSE);
579
580 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
581 cfs_critical("detected duplicate VMID %d", vmid);
582 return FALSE;
583 }
584
585 vminfo_t *vminfo = g_new0(vminfo_t, 1);
586
587 vminfo->vmid = vmid;
588 vminfo->vmtype = vmtype;
589 vminfo->nodename = g_strdup(nodename);
590
591 vminfo->version = ++vminfo_version_counter;
592
593 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
594
595 return TRUE;
596}
597
598void
599vmlist_register_vm(
600 int vmtype,
601 guint32 vmid,
602 const char *nodename)
603{
604 g_return_if_fail(cfs_status.vmlist != NULL);
605 g_return_if_fail(nodename != NULL);
606 g_return_if_fail(vmid != 0);
607 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ);
608
609 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
610
611 g_static_mutex_lock (&mutex);
612
613 cfs_status.vmlist_version++;
614
615 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
616
617 g_static_mutex_unlock (&mutex);
618}
619
620gboolean
621vmlist_different_vm_exists(
622 int vmtype,
623 guint32 vmid,
624 const char *nodename)
625{
626 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
627 g_return_val_if_fail(vmid != 0, FALSE);
628
629 gboolean res = FALSE;
630
631 g_static_mutex_lock (&mutex);
632
633 vminfo_t *vminfo;
634 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
635 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
636 res = TRUE;
637 }
638 g_static_mutex_unlock (&mutex);
639
640 return res;
641}
642
643gboolean
644vmlist_vm_exists(
645 guint32 vmid)
646{
647 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
648 g_return_val_if_fail(vmid != 0, FALSE);
649
650 g_static_mutex_lock (&mutex);
651
652 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
653
654 g_static_mutex_unlock (&mutex);
655
656 return res != NULL;
657}
658
659void
660vmlist_delete_vm(
661 guint32 vmid)
662{
663 g_return_if_fail(cfs_status.vmlist != NULL);
664 g_return_if_fail(vmid != 0);
665
666 g_static_mutex_lock (&mutex);
667
668 cfs_status.vmlist_version++;
669
670 g_hash_table_remove(cfs_status.vmlist, &vmid);
671
672 g_static_mutex_unlock (&mutex);
673}
674
675void cfs_status_set_vmlist(
676 GHashTable *vmlist)
677{
678 g_return_if_fail(vmlist != NULL);
679
680 g_static_mutex_lock (&mutex);
681
682 cfs_status.vmlist_version++;
683
684 if (cfs_status.vmlist)
685 g_hash_table_destroy(cfs_status.vmlist);
686
687 cfs_status.vmlist = vmlist;
688
689 g_static_mutex_unlock (&mutex);
690}
691
692int
693cfs_create_vmlist_msg(GString *str)
694{
695 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
696 g_return_val_if_fail(str != NULL, -EINVAL);
697
698 g_static_mutex_lock (&mutex);
699
700 g_string_append_printf(str,"{\n");
701
702 GHashTable *ht = cfs_status.vmlist;
703
704 guint count = g_hash_table_size(ht);
705
706 if (!count) {
707 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
708 } else {
709 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
710
711 g_string_append_printf(str,"\"ids\": {\n");
712
713 GHashTableIter iter;
714 gpointer key, value;
715
716 g_hash_table_iter_init (&iter, ht);
717
718 int first = 1;
719 while (g_hash_table_iter_next (&iter, &key, &value)) {
720 vminfo_t *vminfo = (vminfo_t *)value;
721 char *type;
722 if (vminfo->vmtype == VMTYPE_QEMU) {
723 type = "qemu";
724 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
725 type = "openvz";
726 } else {
727 type = "unknown";
728 }
729
730 if (!first)
731 g_string_append_printf(str, ",\n");
732 first = 0;
733
734 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
735 vminfo->vmid, vminfo->nodename, type, vminfo->version);
736 }
737
738 g_string_append_printf(str,"}\n");
739 }
740 g_string_append_printf(str,"\n}\n");
741
742 g_static_mutex_unlock (&mutex);
743
744 return 0;
745}
746
747void
748record_memdb_change(const char *path)
749{
750 g_return_if_fail(cfs_status.memdb_changes != 0);
751
752 memdb_change_t *ce;
753
754 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
755 ce->version++;
756 }
757}
758
759void
760record_memdb_reload(void)
761{
762 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
763 memdb_change_array[i].version++;
764 }
765}
766
767static gboolean
768kventry_hash_set(
769 GHashTable *kvhash,
770 const char *key,
771 gconstpointer data,
772 size_t len)
773{
774 g_return_val_if_fail(kvhash != NULL, FALSE);
775 g_return_val_if_fail(key != NULL, FALSE);
776 g_return_val_if_fail(data != NULL, FALSE);
777
778 kventry_t *entry;
779 if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
780 g_free(entry->data);
781 entry->data = g_memdup(data, len);
782 entry->len = len;
783 entry->version++;
784 } else {
785 kventry_t *entry = g_new0(kventry_t, 1);
786
787 entry->key = g_strdup(key);
788 entry->data = g_memdup(data, len);
789 entry->len = len;
790
791 g_hash_table_replace(kvhash, entry->key, entry);
792 }
793
794 return TRUE;
795}
796
797static const char *rrd_def_node[] = {
798 "DS:loadavg:GAUGE:120:0:U",
799 "DS:maxcpu:GAUGE:120:0:U",
800 "DS:cpu:GAUGE:120:0:U",
801 "DS:iowait:GAUGE:120:0:U",
802 "DS:memtotal:GAUGE:120:0:U",
803 "DS:memused:GAUGE:120:0:U",
804 "DS:swaptotal:GAUGE:120:0:U",
805 "DS:swapused:GAUGE:120:0:U",
806 "DS:roottotal:GAUGE:120:0:U",
807 "DS:rootused:GAUGE:120:0:U",
808 "DS:netin:COUNTER:120:0:U",
809 "DS:netout:COUNTER:120:0:U",
810
811 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
812 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
813 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
814 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
815 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
816
817 "RRA:MAX:0.5:1:70", // 1 min max - one hour
818 "RRA:MAX:0.5:30:70", // 30 min max - one day
819 "RRA:MAX:0.5:180:70", // 3 hour max - one week
820 "RRA:MAX:0.5:720:70", // 12 hour max - one month
821 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
822 NULL,
823};
824
825static const char *rrd_def_vm[] = {
826 "DS:maxcpu:GAUGE:120:0:U",
827 "DS:cpu:GAUGE:120:0:U",
828 "DS:maxmem:GAUGE:120:0:U",
829 "DS:mem:GAUGE:120:0:U",
830 "DS:maxdisk:GAUGE:120:0:U",
831 "DS:disk:GAUGE:120:0:U",
832 "DS:netin:COUNTER:120:0:U",
833 "DS:netout:COUNTER:120:0:U",
834 "DS:diskread:COUNTER:120:0:U",
835 "DS:diskwrite:COUNTER:120:0:U",
836
837 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
838 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
839 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
840 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
841 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
842
843 "RRA:MAX:0.5:1:70", // 1 min max - one hour
844 "RRA:MAX:0.5:30:70", // 30 min max - one day
845 "RRA:MAX:0.5:180:70", // 3 hour max - one week
846 "RRA:MAX:0.5:720:70", // 12 hour max - one month
847 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
848 NULL,
849};
850
851static const char *rrd_def_storage[] = {
852 "DS:total:GAUGE:120:0:U",
853 "DS:used:GAUGE:120:0:U",
854
855 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
856 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
857 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
858 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
859 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
860
861 "RRA:MAX:0.5:1:70", // 1 min max - one hour
862 "RRA:MAX:0.5:30:70", // 30 min max - one day
863 "RRA:MAX:0.5:180:70", // 3 hour max - one week
864 "RRA:MAX:0.5:720:70", // 12 hour max - one month
865 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
866 NULL,
867};
868
869#define RRDDIR "/var/lib/rrdcached/db"
870
871static void
872create_rrd_file(
873 const char *filename,
874 int argcount,
875 const char *rrddef[])
876{
877 /* start at day boundary */
878 time_t ctime;
879 time(&ctime);
880 struct tm *ltm = localtime(&ctime);
881 ltm->tm_sec = 0;
882 ltm->tm_min = 0;
883 ltm->tm_hour = 0;
884
885 rrd_clear_error();
886 if (rrd_create_r(filename, 60, timelocal(ltm), argcount, rrddef)) {
887 cfs_message("RRD create error %s: %s", filename, rrd_get_error());
888 }
889}
890
891static inline const char *
892rrd_skip_data(
893 const char *data,
894 int count)
895{
896 int found = 0;
897 while (*data && found < count) {
898 if (*data++ == ':')
899 found++;
900 }
901 return data;
902}
903
904static void
905update_rrd_data(
906 const char *key,
907 gconstpointer data,
908 size_t len)
909{
910 g_return_if_fail(key != NULL);
911 g_return_if_fail(data != NULL);
912 g_return_if_fail(len > 0);
913 g_return_if_fail(len < 4096);
914
915 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
916
917 int use_daemon = 1;
918 if (rrdc_connect(rrdcsock) != 0)
919 use_daemon = 0;
920
921 char *filename = g_strdup_printf(RRDDIR "/%s", key);
922
923 int skip = 0;
924
925 if (strncmp(key, "pve2-node/", 10) == 0) {
926 const char *node = key + 10;
927
928 skip = 1;
929
930 if (strchr(node, '/') != NULL)
931 goto keyerror;
932
933 if (strlen(node) < 1)
934 goto keyerror;
935
936 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
937
938 mkdir(RRDDIR "/pve2-node", 0755);
939 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
940 create_rrd_file(filename, argcount, rrd_def_node);
941 }
942
943 } else if (strncmp(key, "pve2-vm/", 8) == 0) {
944 const char *vmid = key + 8;
945
946 skip = 2;
947
948 if (strchr(vmid, '/') != NULL)
949 goto keyerror;
950
951 if (strlen(vmid) < 1)
952 goto keyerror;
953
954 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
955
956 mkdir(RRDDIR "/pve2-vm", 0755);
957 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
958 create_rrd_file(filename, argcount, rrd_def_vm);
959 }
960
961 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
962 const char *node = key + 13;
963
964 const char *storage = node;
965 while (*storage && *storage != '/')
966 storage++;
967
968 if (*storage != '/' || ((storage - node) < 1))
969 goto keyerror;
970
971 storage++;
972
973 if (strchr(storage, '/') != NULL)
974 goto keyerror;
975
976 if (strlen(storage) < 1)
977 goto keyerror;
978
979 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
980
981 mkdir(RRDDIR "/pve2-storage", 0755);
982
983 char *dir = g_path_get_dirname(filename);
984 mkdir(dir, 0755);
985 g_free(dir);
986
987 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
988 create_rrd_file(filename, argcount, rrd_def_storage);
989 }
990
991 } else {
992 goto keyerror;
993 }
994
995 const char *dp = skip ? rrd_skip_data(data, skip) : data;
996
997 const char *update_args[] = { dp, NULL };
998
999 if (use_daemon) {
1000 int status;
1001 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1002 cfs_message("RRDC update error %s: %d", filename, status);
1003 rrdc_disconnect();
1004 rrd_clear_error();
1005 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1006 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1007 }
1008 }
1009
1010 } else {
1011 rrd_clear_error();
1012 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1013 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1014 }
1015 }
1016
1017ret:
1018 g_free(filename);
1019 return;
1020
1021keyerror:
1022 cfs_critical("RRD update error: unknown/wrong key %s", key);
1023 goto ret;
1024}
1025
1026static gboolean
1027rrd_entry_is_old(
1028 gpointer key,
1029 gpointer value,
1030 gpointer user_data)
1031{
1032 rrdentry_t *entry = (rrdentry_t *)value;
1033 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1034
1035 int diff = ctime - entry->time;
1036
1037 /* remove everything older than 5 minutes */
1038 int expire = 60*5;
1039
1040 return (diff > expire) ? TRUE : FALSE;
1041}
1042
1043static char *rrd_dump_buf = NULL;
1044static time_t rrd_dump_last = 0;
1045
1046void
1047cfs_rrd_dump(GString *str)
1048{
1049 time_t ctime;
1050 time(&ctime);
1051
1052 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1053 g_string_assign(str, rrd_dump_buf);
1054 return;
1055 }
1056
1057 /* remove old data */
1058 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1059 GUINT_TO_POINTER(ctime));
1060
1061 g_string_set_size(str, 0);
1062
1063 GHashTableIter iter;
1064 gpointer key, value;
1065
1066 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1067
1068 while (g_hash_table_iter_next (&iter, &key, &value)) {
1069 rrdentry_t *entry = (rrdentry_t *)value;
1070 g_string_append(str, key);
1071 g_string_append(str, ":");
1072 g_string_append(str, entry->data);
1073 g_string_append(str, "\n");
1074 }
1075
1076 g_string_append_c(str, 0); // never return undef
1077
1078 rrd_dump_last = ctime;
1079 if (rrd_dump_buf)
1080 g_free(rrd_dump_buf);
1081 rrd_dump_buf = g_strdup(str->str);
1082}
1083
1084static gboolean
1085nodeip_hash_set(
1086 GHashTable *iphash,
1087 const char *nodename,
1088 const char *ip,
1089 size_t len)
1090{
1091 g_return_val_if_fail(iphash != NULL, FALSE);
1092 g_return_val_if_fail(nodename != NULL, FALSE);
1093 g_return_val_if_fail(ip != NULL, FALSE);
1094 g_return_val_if_fail(len > 0, FALSE);
1095 g_return_val_if_fail(len < 256, FALSE);
1096 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1097
1098 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1099
1100 if (!oldip || (strcmp(oldip, ip) != 0)) {
1101 cfs_status.clinfo_version++;
1102 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1103 }
1104
1105 return TRUE;
1106}
1107
1108static gboolean
1109rrdentry_hash_set(
1110 GHashTable *rrdhash,
1111 const char *key,
1112 const char *data,
1113 size_t len)
1114{
1115 g_return_val_if_fail(rrdhash != NULL, FALSE);
1116 g_return_val_if_fail(key != NULL, FALSE);
1117 g_return_val_if_fail(data != NULL, FALSE);
1118 g_return_val_if_fail(len > 0, FALSE);
1119 g_return_val_if_fail(len < 4096, FALSE);
1120 g_return_val_if_fail(data[len-1] == 0, FALSE);
1121
1122 rrdentry_t *entry;
1123 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1124 g_free(entry->data);
1125 entry->data = g_memdup(data, len);
1126 entry->len = len;
1127 entry->time = time(NULL);
1128 } else {
1129 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1130
1131 entry->key = g_strdup(key);
1132 entry->data = g_memdup(data, len);
1133 entry->len = len;
1134 entry->time = time(NULL);
1135
1136 g_hash_table_replace(rrdhash, entry->key, entry);
1137 }
1138
1139 update_rrd_data(key, data, len);
1140
1141 return TRUE;
1142}
1143
1144static int
1145kvstore_send_update_message(
1146 dfsm_t *dfsm,
1147 const char *key,
1148 gpointer data,
1149 guint32 len)
1150{
1151
1152 struct iovec iov[2];
1153
1154 char name[256];
1155 g_strlcpy(name, key, sizeof(name));
1156
1157 iov[0].iov_base = &name;
1158 iov[0].iov_len = sizeof(name);
1159
1160 iov[1].iov_base = (char *)data;
1161 iov[1].iov_len = len;
1162
1163 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1164 return 0;
1165
1166 return -EACCES;
1167}
1168
1169static clog_entry_t *
1170kvstore_parse_log_message(
1171 const void *msg,
1172 size_t msg_len)
1173{
1174 g_return_val_if_fail(msg != NULL, NULL);
1175
1176 if (msg_len < sizeof(clog_entry_t)) {
1177 cfs_critical("received short log message (%lu < %lu)", msg_len, sizeof(clog_entry_t));
1178 return NULL;
1179 }
1180
1181 clog_entry_t *entry = (clog_entry_t *)msg;
1182
1183 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1184 entry->ident_len + entry->tag_len + entry->msg_len;
1185
1186 if (msg_len != size) {
1187 cfs_critical("received log message with wrong size (%lu != %u)", msg_len, size);
1188 return NULL;
1189 }
1190
1191 msg = entry->data;
1192
1193 if (*((char *)msg + entry->node_len - 1)) {
1194 cfs_critical("unterminated string in log message");
1195 return NULL;
1196 }
1197 msg += entry->node_len;
1198
1199 if (*((char *)msg + entry->ident_len - 1)) {
1200 cfs_critical("unterminated string in log message");
1201 return NULL;
1202 }
1203 msg += entry->ident_len;
1204
1205 if (*((char *)msg + entry->tag_len - 1)) {
1206 cfs_critical("unterminated string in log message");
1207 return NULL;
1208 }
1209 msg += entry->tag_len;
1210
1211 if (*((char *)msg + entry->msg_len - 1)) {
1212 cfs_critical("unterminated string in log message");
1213 return NULL;
1214 }
1215
1216 return entry;
1217}
1218
1219static gboolean
1220kvstore_parse_update_message(
1221 const void *msg,
1222 size_t msg_len,
1223 const char **key,
1224 gconstpointer *data,
1225 guint32 *len)
1226{
1227 g_return_val_if_fail(msg != NULL, FALSE);
1228 g_return_val_if_fail(key != NULL, FALSE);
1229 g_return_val_if_fail(data != NULL, FALSE);
1230 g_return_val_if_fail(len != NULL, FALSE);
1231
1232 if (msg_len < 256) {
1233 cfs_critical("received short kvstore message (%lu < 256)", msg_len);
1234 return FALSE;
1235 }
1236
1237 /* test if key is null terminated */
1238 int i = 0;
1239 for (i = 0; i < 256; i++)
1240 if (((char *)msg)[i] == 0)
1241 break;
1242
1243 if (i == 256)
1244 return FALSE;
1245
1246
1247 *len = msg_len - 256;
1248 *key = msg;
1249 *data = msg + 256;
1250
1251 return TRUE;
1252}
1253
1254int
1255cfs_create_status_msg(
1256 GString *str,
1257 const char *nodename,
1258 const char *key)
1259{
1260 g_return_val_if_fail(str != NULL, -EINVAL);
1261 g_return_val_if_fail(key != NULL, -EINVAL);
1262
1263 int res = -ENOENT;
1264
1265 GHashTable *kvhash = NULL;
1266
1267 g_static_mutex_lock (&mutex);
1268
1269 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1270 kvhash = cfs_status.kvhash;
1271 } else {
1272 cfs_clnode_t *clnode;
1273 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1274 kvhash = clnode->kvhash;
1275 }
1276
1277 kventry_t *entry;
1278 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1279 g_string_append_len(str, entry->data, entry->len);
1280 res = 0;
1281 }
1282
1283 g_static_mutex_unlock (&mutex);
1284
1285 return res;
1286}
1287
1288int
1289cfs_status_set(
1290 const char *key,
1291 gpointer data,
1292 size_t len)
1293{
1294 g_return_val_if_fail(key != NULL, FALSE);
1295 g_return_val_if_fail(data != NULL, FALSE);
1296 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1297
1298 if (len > CFS_MAX_STATUS_SIZE)
1299 return -EFBIG;
1300
1301 g_static_mutex_lock (&mutex);
1302
1303 gboolean res;
1304
1305 if (strncmp(key, "rrd/", 4) == 0) {
1306 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1307 } else if (!strcmp(key, "nodeip")) {
1308 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1309 } else {
1310 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1311 }
1312 g_static_mutex_unlock (&mutex);
1313
1314 if (cfs_status.kvstore)
1315 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1316
1317 return res ? 0 : -ENOMEM;
1318}
1319
1320gboolean
1321cfs_kvstore_node_set(
1322 uint32_t nodeid,
1323 const char *key,
1324 gconstpointer data,
1325 size_t len)
1326{
1327 g_return_val_if_fail(nodeid != 0, FALSE);
1328 g_return_val_if_fail(key != NULL, FALSE);
1329 g_return_val_if_fail(data != NULL, FALSE);
1330
1331 g_static_mutex_lock (&mutex);
1332
1333 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1334 goto ret; /* ignore */
1335
1336 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1337 if (!clnode)
1338 goto ret; /* ignore */
1339
1340 cfs_debug("got node %d status update %s", nodeid, key);
1341
1342 if (strncmp(key, "rrd/", 4) == 0) {
1343 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1344 } else if (!strcmp(key, "nodeip")) {
1345 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1346 } else {
1347 if (!clnode->kvhash) {
1348 if (!(clnode->kvhash = kventry_hash_new())) {
1349 goto ret; /*ignore */
1350 }
1351 }
1352
1353 kventry_hash_set(clnode->kvhash, key, data, len);
1354
1355 }
1356ret:
1357 g_static_mutex_unlock (&mutex);
1358
1359 return TRUE;
1360}
1361
1362static gboolean
1363cfs_kvstore_sync(void)
1364{
1365 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1366 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1367
1368 gboolean res = TRUE;
1369
1370 g_static_mutex_lock (&mutex);
1371
1372 GHashTable *ht = cfs_status.kvhash;
1373 GHashTableIter iter;
1374 gpointer key, value;
1375
1376 g_hash_table_iter_init (&iter, ht);
1377
1378 while (g_hash_table_iter_next (&iter, &key, &value)) {
1379 kventry_t *entry = (kventry_t *)value;
1380 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1381 }
1382
1383 g_static_mutex_unlock (&mutex);
1384
1385 return res;
1386}
1387
1388static int
1389dfsm_deliver(
1390 dfsm_t *dfsm,
1391 gpointer data,
1392 int *res_ptr,
1393 uint32_t nodeid,
1394 uint32_t pid,
1395 uint16_t msg_type,
1396 uint32_t msg_time,
1397 const void *msg,
1398 size_t msg_len)
1399{
1400 g_return_val_if_fail(dfsm != NULL, -1);
1401 g_return_val_if_fail(msg != NULL, -1);
1402 g_return_val_if_fail(res_ptr != NULL, -1);
1403
1404 /* ignore message for ourself */
1405 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1406 goto ret;
1407
1408 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1409 const char *key;
1410 gconstpointer data;
1411 guint32 len;
1412 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1413 cfs_kvstore_node_set(nodeid, key, data, len);
1414 } else {
1415 cfs_critical("cant parse update message");
1416 }
1417 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1418 cfs_message("received log"); // fixme: remove
1419 const clog_entry_t *entry;
1420 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1421 clusterlog_insert(cfs_status.clusterlog, entry);
1422 } else {
1423 cfs_critical("cant parse log message");
1424 }
1425 } else {
1426 cfs_critical("received unknown message type %d\n", msg_type);
1427 goto fail;
1428 }
1429
1430ret:
1431 *res_ptr = 0;
1432 return 1;
1433
1434fail:
1435 *res_ptr = -EACCES;
1436 return 1;
1437}
1438
1439static void
1440dfsm_confchg(
1441 dfsm_t *dfsm,
1442 gpointer data,
1443 const struct cpg_address *member_list,
1444 size_t member_list_entries)
1445{
1446 g_return_if_fail(dfsm != NULL);
1447 g_return_if_fail(member_list != NULL);
1448
1449 cfs_debug("enter %s", __func__);
1450
1451 g_static_mutex_lock (&mutex);
1452
1453 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1454
1455 if (clinfo && clinfo->nodes_byid) {
1456
1457 GHashTable *ht = clinfo->nodes_byid;
1458 GHashTableIter iter;
1459 gpointer key, value;
1460
1461 g_hash_table_iter_init (&iter, ht);
1462
1463 while (g_hash_table_iter_next (&iter, &key, &value)) {
1464 cfs_clnode_t *node = (cfs_clnode_t *)value;
1465 node->online = FALSE;
1466 }
1467
1468 for (int i = 0; i < member_list_entries; i++) {
1469 cfs_clnode_t *node;
1470 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1471 node->online = TRUE;
1472 }
1473 }
1474
1475 cfs_status.clinfo_version++;
1476 }
1477
1478 g_static_mutex_unlock (&mutex);
1479}
1480
1481static gpointer
1482dfsm_get_state(
1483 dfsm_t *dfsm,
1484 gpointer data,
1485 unsigned int *res_len)
1486{
1487 g_return_val_if_fail(dfsm != NULL, NULL);
1488
1489 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1490
1491 return msg;
1492}
1493
1494static int
1495dfsm_process_update(
1496 dfsm_t *dfsm,
1497 gpointer data,
1498 dfsm_sync_info_t *syncinfo,
1499 uint32_t nodeid,
1500 uint32_t pid,
1501 const void *msg,
1502 size_t msg_len)
1503{
1504 cfs_critical("%s: received unexpected update message", __func__);
1505
1506 return -1;
1507}
1508
1509static int
1510dfsm_process_state_update(
1511 dfsm_t *dfsm,
1512 gpointer data,
1513 dfsm_sync_info_t *syncinfo)
1514{
1515 g_return_val_if_fail(dfsm != NULL, -1);
1516 g_return_val_if_fail(syncinfo != NULL, -1);
1517
1518 clog_base_t *clog[syncinfo->node_count];
1519
1520 int local_index = -1;
1521 for (int i = 0; i < syncinfo->node_count; i++) {
1522 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1523 ni->synced = 1;
1524
1525 if (syncinfo->local == ni)
1526 local_index = i;
1527
1528 clog_base_t *base = (clog_base_t *)ni->state;
1529 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1530 clog[i] = ni->state;
1531 } else {
1532 cfs_critical("received log with wrong size %u", ni->state_len);
1533 clog[i] = NULL;
1534 }
1535 }
1536
1537 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1538 cfs_critical("unable to merge log files");
1539 }
1540
1541 cfs_kvstore_sync();
1542
1543 return 1;
1544}
1545
1546static int
1547dfsm_commit(
1548 dfsm_t *dfsm,
1549 gpointer data,
1550 dfsm_sync_info_t *syncinfo)
1551{
1552 g_return_val_if_fail(dfsm != NULL, -1);
1553 g_return_val_if_fail(syncinfo != NULL, -1);
1554
1555 return 1;
1556}
1557
1558static void
1559dfsm_synced(dfsm_t *dfsm)
1560{
1561 g_return_if_fail(dfsm != NULL);
1562
1563 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1564 if (!ip)
1565 ip = cfs.ip;
1566
1567 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1568}
1569
1570static int
1571dfsm_cleanup(
1572 dfsm_t *dfsm,
1573 gpointer data,
1574 dfsm_sync_info_t *syncinfo)
1575{
1576 return 1;
1577}
1578
1579static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1580 .dfsm_deliver_fn = dfsm_deliver,
1581 .dfsm_confchg_fn = dfsm_confchg,
1582
1583 .dfsm_get_state_fn = dfsm_get_state,
1584 .dfsm_process_state_update_fn = dfsm_process_state_update,
1585 .dfsm_process_update_fn = dfsm_process_update,
1586 .dfsm_commit_fn = dfsm_commit,
1587 .dfsm_cleanup_fn = dfsm_cleanup,
1588 .dfsm_synced_fn = dfsm_synced,
1589};
1590
1591dfsm_t *
1592cfs_status_dfsm_new(void)
1593{
1594 g_static_mutex_lock (&mutex);
1595
1596 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1597 0, &kvstore_dfsm_callbacks);
1598 g_static_mutex_unlock (&mutex);
1599
1600 return cfs_status.kvstore;
1601}
1602
1603gboolean
1604cfs_is_quorate(void)
1605{
1606 g_static_mutex_lock (&mutex);
1607 gboolean res = cfs_status.quorate;
1608 g_static_mutex_unlock (&mutex);
1609
1610 return res;
1611}
1612
1613void
1614cfs_set_quorate(
1615 uint32_t quorate,
1616 gboolean quiet)
1617{
1618 g_static_mutex_lock (&mutex);
1619
1620 uint32_t prev_quorate = cfs_status.quorate;
1621 cfs_status.quorate = quorate;
1622
1623 if (!prev_quorate && cfs_status.quorate) {
1624 if (!quiet)
1625 cfs_message("node has quorum");
1626 }
1627
1628 if (prev_quorate && !cfs_status.quorate) {
1629 if (!quiet)
1630 cfs_message("node lost quorum");
1631 }
1632
1633 g_static_mutex_unlock (&mutex);
1634}
1635