]> git.proxmox.com Git - pve-cluster.git/blame - data/src/status.c
fix bug 204: use DERIVE instead of COUNTER
[pve-cluster.git] / data / src / status.c
CommitLineData
fe000966
DM
1/*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19*/
20
21#define G_LOG_DOMAIN "status"
22
23#ifdef HAVE_CONFIG_H
24#include <config.h>
25#endif /* HAVE_CONFIG_H */
26
27#include <stdio.h>
28#include <stdint.h>
29#include <string.h>
30#include <errno.h>
31#include <glib.h>
32#include <sys/syslog.h>
33#include <rrd.h>
34#include <rrd_client.h>
35#include <time.h>
36
37#include "cfs-utils.h"
38#include "status.h"
39#include "logger.h"
40
41#define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
42
43typedef enum {
44 KVSTORE_MESSAGE_UPDATE = 1,
45 KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
46 KVSTORE_MESSAGE_LOG = 3,
47} kvstore_message_t;
48
49static uint32_t vminfo_version_counter;
50
51typedef struct {
52 uint32_t vmid;
53 char *nodename;
54 int vmtype;
55 uint32_t version;
56} vminfo_t;
57
58typedef struct {
59 char *key;
60 gpointer data;
61 size_t len;
62 uint32_t version;
63} kventry_t;
64
65typedef struct {
66 char *key;
67 gpointer data;
68 size_t len;
69 uint32_t time;
70} rrdentry_t;
71
72typedef struct {
73 char *path;
74 uint32_t version;
75} memdb_change_t;
76
77static memdb_change_t memdb_change_array[] = {
78 { .path = "cluster.conf" },
ec48ec22 79 { .path = "cluster.conf.new" },
fe000966
DM
80 { .path = "storage.cfg" },
81 { .path = "user.cfg" },
82 { .path = "domains.cfg" },
83 { .path = "priv/shadow.cfg" },
84 { .path = "datacenter.cfg" },
e1735a61 85 { .path = "vzdump.cron" },
fe000966
DM
86};
87
88static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
89
90typedef struct {
91 time_t start_time;
92
93 uint32_t quorate;
94
95 cfs_clinfo_t *clinfo;
96 uint32_t clinfo_version;
97
98 GHashTable *vmlist;
99 uint32_t vmlist_version;
100
101 dfsm_t *kvstore;
102 GHashTable *kvhash;
103 GHashTable *rrdhash;
104 GHashTable *iphash;
105
106 GHashTable *memdb_changes;
107
108 clusterlog_t *clusterlog;
109} cfs_status_t;
110
111static cfs_status_t cfs_status;
112
113struct cfs_clnode {
114 char *name;
115 uint32_t nodeid;
116 uint32_t votes;
117 gboolean online;
118 GHashTable *kvhash;
119};
120
121struct cfs_clinfo {
122 char *cluster_name;
123 uint32_t cman_version;
124
125 GHashTable *nodes_byid;
126 GHashTable *nodes_byname;
127};
128
129static guint
130g_int32_hash (gconstpointer v)
131{
132 return *(const uint32_t *) v;
133}
134
135static gboolean
136g_int32_equal (gconstpointer v1,
137 gconstpointer v2)
138{
139 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
140}
141
142static void vminfo_free(vminfo_t *vminfo)
143{
144 g_return_if_fail(vminfo != NULL);
145
146 if (vminfo->nodename)
147 g_free(vminfo->nodename);
148
149
150 g_free(vminfo);
151}
152
153void cfs_clnode_destroy(
154 cfs_clnode_t *clnode)
155{
156 g_return_if_fail(clnode != NULL);
157
158 if (clnode->kvhash)
159 g_hash_table_destroy(clnode->kvhash);
160
161 if (clnode->name)
162 g_free(clnode->name);
163
164 g_free(clnode);
165}
166
167cfs_clnode_t *cfs_clnode_new(
168 const char *name,
169 uint32_t nodeid,
170 uint32_t votes)
171{
172 g_return_val_if_fail(name != NULL, NULL);
173
174 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
175 if (!clnode)
176 return NULL;
177
178 clnode->name = g_strdup(name);
179 clnode->nodeid = nodeid;
180 clnode->votes = votes;
181
182 return clnode;
183}
184
185gboolean cfs_clinfo_destroy(
186 cfs_clinfo_t *clinfo)
187{
188 g_return_val_if_fail(clinfo != NULL, FALSE);
189
190 if (clinfo->cluster_name)
191 g_free(clinfo->cluster_name);
192
193 if (clinfo->nodes_byname)
194 g_hash_table_destroy(clinfo->nodes_byname);
195
196 if (clinfo->nodes_byid)
197 g_hash_table_destroy(clinfo->nodes_byid);
198
199 g_free(clinfo);
200
201 return TRUE;
202}
203
204cfs_clinfo_t *cfs_clinfo_new(
205 const char *cluster_name,
206 uint32_t cman_version)
207{
208 g_return_val_if_fail(cluster_name != NULL, NULL);
209
210 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
211 if (!clinfo)
212 return NULL;
213
214 clinfo->cluster_name = g_strdup(cluster_name);
215 clinfo->cman_version = cman_version;
216
217 if (!(clinfo->nodes_byid = g_hash_table_new_full(
218 g_int32_hash, g_int32_equal, NULL,
219 (GDestroyNotify)cfs_clnode_destroy)))
220 goto fail;
221
222 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
223 goto fail;
224
225 return clinfo;
226
227fail:
228 cfs_clinfo_destroy(clinfo);
229
230 return NULL;
231}
232
233gboolean cfs_clinfo_add_node(
234 cfs_clinfo_t *clinfo,
235 cfs_clnode_t *clnode)
236{
237 g_return_val_if_fail(clinfo != NULL, FALSE);
238 g_return_val_if_fail(clnode != NULL, FALSE);
239
240 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
241 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
242
243 return TRUE;
244}
245
246int
247cfs_create_memberlist_msg(
248 GString *str)
249{
250 g_return_val_if_fail(str != NULL, -EINVAL);
251
252 g_static_mutex_lock(&mutex);
253
254 g_string_append_printf(str,"{\n");
255
256 guint nodecount = 0;
257
258 cfs_clinfo_t *clinfo = cfs_status.clinfo;
259
260 if (clinfo && clinfo->nodes_byid)
261 nodecount = g_hash_table_size(clinfo->nodes_byid);
262
263 if (nodecount) {
264 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
265 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
266
267 g_string_append_printf(str, "\"cluster\": { ");
268 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
269 "\"nodes\": %d, \"quorate\": %d ",
270 clinfo->cluster_name, clinfo->cman_version,
271 nodecount, cfs_status.quorate);
272
273 g_string_append_printf(str,"},\n");
274 g_string_append_printf(str,"\"nodelist\": {\n");
275
276 GHashTable *ht = clinfo->nodes_byid;
277 GHashTableIter iter;
278 gpointer key, value;
279
280 g_hash_table_iter_init (&iter, ht);
281
282 int i = 0;
283 while (g_hash_table_iter_next (&iter, &key, &value)) {
284 cfs_clnode_t *node = (cfs_clnode_t *)value;
285 if (i) g_string_append_printf(str, ",\n");
286 i++;
287
288 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
289 node->name, node->nodeid, node->online);
290
291
292 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
293 if (ip) {
294 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
295 }
296
297 g_string_append_printf(str, "}");
298
299 }
300 g_string_append_printf(str,"\n }\n");
301 } else {
302 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
303 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
304 }
305
306 g_string_append_printf(str,"}\n");
307
308 g_static_mutex_unlock (&mutex);
309
310 return 0;
311}
312
313static void
314kventry_free(kventry_t *entry)
315{
316 g_return_if_fail(entry != NULL);
317
318 g_free(entry->key);
319 g_free(entry->data);
320 g_free(entry);
321}
322
323static GHashTable *
324kventry_hash_new(void)
325{
326 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
327 (GDestroyNotify)kventry_free);
328}
329
330static void
331rrdentry_free(rrdentry_t *entry)
332{
333 g_return_if_fail(entry != NULL);
334
335 g_free(entry->key);
336 g_free(entry->data);
337 g_free(entry);
338}
339
340static GHashTable *
341rrdentry_hash_new(void)
342{
343 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
344 (GDestroyNotify)rrdentry_free);
345}
346
347void
348cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
349{
350 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
351}
352
353void
354cfs_cluster_log(clog_entry_t *entry)
355{
356 g_return_if_fail(entry != NULL);
357
358 clusterlog_insert(cfs_status.clusterlog, entry);
359
360 if (cfs_status.kvstore) {
361 struct iovec iov[1];
362 iov[0].iov_base = (char *)entry;
363 iov[0].iov_len = clog_entry_size(entry);
364
365 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
366 }
367}
368
369void cfs_status_init(void)
370{
371 g_static_mutex_lock (&mutex);
372
373 cfs_status.start_time = time(NULL);
374
375 cfs_status.vmlist = vmlist_hash_new();
376
377 cfs_status.kvhash = kventry_hash_new();
378
379 cfs_status.rrdhash = rrdentry_hash_new();
380
381 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
382
383 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
384
385 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
386 g_hash_table_replace(cfs_status.memdb_changes,
387 memdb_change_array[i].path,
388 &memdb_change_array[i]);
389 }
390
391 cfs_status.clusterlog = clusterlog_new();
392
393 // fixme:
394 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
395 LOG_INFO, "starting cluster log");
396
397 g_static_mutex_unlock (&mutex);
398}
399
400void cfs_status_cleanup(void)
401{
402 g_static_mutex_lock (&mutex);
403
404 cfs_status.clinfo_version++;
405
406 if (cfs_status.clinfo) {
407 cfs_clinfo_destroy(cfs_status.clinfo);
408 cfs_status.clinfo = NULL;
409 }
410
411 if (cfs_status.vmlist) {
412 g_hash_table_destroy(cfs_status.vmlist);
413 cfs_status.vmlist = NULL;
414 }
415
416 if (cfs_status.kvhash) {
417 g_hash_table_destroy(cfs_status.kvhash);
418 cfs_status.kvhash = NULL;
419 }
420
421 if (cfs_status.rrdhash) {
422 g_hash_table_destroy(cfs_status.rrdhash);
423 cfs_status.rrdhash = NULL;
424 }
425
426 if (cfs_status.iphash) {
427 g_hash_table_destroy(cfs_status.iphash);
428 cfs_status.iphash = NULL;
429 }
430
431 if (cfs_status.clusterlog)
432 clusterlog_destroy(cfs_status.clusterlog);
433
434 g_static_mutex_unlock (&mutex);
435}
436
437void cfs_status_set_clinfo(
438 cfs_clinfo_t *clinfo)
439{
440 g_return_if_fail(clinfo != NULL);
441
442 g_static_mutex_lock (&mutex);
443
444 cfs_status.clinfo_version++;
445
446 cfs_clinfo_t *old = cfs_status.clinfo;
447
448 cfs_status.clinfo = clinfo;
449
450 cfs_message("update cluster info (cluster name %s, version = %d)",
451 clinfo->cluster_name, clinfo->cman_version);
452
453
454 if (old && old->nodes_byid && clinfo->nodes_byid) {
455 /* copy kvstore */
456 GHashTable *ht = clinfo->nodes_byid;
457 GHashTableIter iter;
458 gpointer key, value;
459
460 g_hash_table_iter_init (&iter, ht);
461
462 while (g_hash_table_iter_next (&iter, &key, &value)) {
463 cfs_clnode_t *node = (cfs_clnode_t *)value;
464 cfs_clnode_t *oldnode;
469d3acb 465 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
fe000966
DM
466 node->online = oldnode->online;
467 node->kvhash = oldnode->kvhash;
468 oldnode->kvhash = NULL;
469 }
470 }
471
472 }
473
474 if (old)
475 cfs_clinfo_destroy(old);
476
477
478 g_static_mutex_unlock (&mutex);
479}
480
481static void
482dump_kvstore_versions(
483 GString *str,
484 GHashTable *kvhash,
485 const char *nodename)
486{
487 g_return_if_fail(kvhash != NULL);
488 g_return_if_fail(str != NULL);
489 g_return_if_fail(nodename != NULL);
490
491 GHashTable *ht = kvhash;
492 GHashTableIter iter;
493 gpointer key, value;
494
495 g_string_append_printf(str, "\"%s\": {\n", nodename);
496
497 g_hash_table_iter_init (&iter, ht);
498
499 int i = 0;
500 while (g_hash_table_iter_next (&iter, &key, &value)) {
501 kventry_t *entry = (kventry_t *)value;
502 if (i) g_string_append_printf(str, ",\n");
503 i++;
504 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
505 }
506
507 g_string_append_printf(str, "}\n");
508}
509
510int
511cfs_create_version_msg(GString *str)
512{
513 g_return_val_if_fail(str != NULL, -EINVAL);
514
515 g_static_mutex_lock (&mutex);
516
517 g_string_append_printf(str,"{\n");
518
519 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
520
521 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
522
523 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
524
525 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
526 g_string_append_printf(str, "\"%s\": %u,\n",
527 memdb_change_array[i].path,
528 memdb_change_array[i].version);
529 }
530
531 g_string_append_printf(str, "\"kvstore\": {\n");
532
533 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
534
535 cfs_clinfo_t *clinfo = cfs_status.clinfo;
536
537 if (clinfo && clinfo->nodes_byid) {
538 GHashTable *ht = clinfo->nodes_byid;
539 GHashTableIter iter;
540 gpointer key, value;
541
542 g_hash_table_iter_init (&iter, ht);
543
544 while (g_hash_table_iter_next (&iter, &key, &value)) {
545 cfs_clnode_t *node = (cfs_clnode_t *)value;
546 if (!node->kvhash)
547 continue;
548 g_string_append_printf(str, ",\n");
549 dump_kvstore_versions(str, node->kvhash, node->name);
550 }
551 }
552
553 g_string_append_printf(str,"}\n");
554
555 g_string_append_printf(str,"}\n");
556
557 g_static_mutex_unlock (&mutex);
558
559 return 0;
560}
561
562GHashTable *
563vmlist_hash_new(void)
564{
565 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
566 (GDestroyNotify)vminfo_free);
567}
568
569gboolean
570vmlist_hash_insert_vm(
571 GHashTable *vmlist,
572 int vmtype,
573 guint32 vmid,
574 const char *nodename,
575 gboolean replace)
576{
577 g_return_val_if_fail(vmlist != NULL, FALSE);
578 g_return_val_if_fail(nodename != NULL, FALSE);
579 g_return_val_if_fail(vmid != 0, FALSE);
580 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ, FALSE);
581
582 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
583 cfs_critical("detected duplicate VMID %d", vmid);
584 return FALSE;
585 }
586
587 vminfo_t *vminfo = g_new0(vminfo_t, 1);
588
589 vminfo->vmid = vmid;
590 vminfo->vmtype = vmtype;
591 vminfo->nodename = g_strdup(nodename);
592
593 vminfo->version = ++vminfo_version_counter;
594
595 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
596
597 return TRUE;
598}
599
600void
601vmlist_register_vm(
602 int vmtype,
603 guint32 vmid,
604 const char *nodename)
605{
606 g_return_if_fail(cfs_status.vmlist != NULL);
607 g_return_if_fail(nodename != NULL);
608 g_return_if_fail(vmid != 0);
609 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ);
610
611 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
612
613 g_static_mutex_lock (&mutex);
614
615 cfs_status.vmlist_version++;
616
617 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
618
619 g_static_mutex_unlock (&mutex);
620}
621
622gboolean
623vmlist_different_vm_exists(
624 int vmtype,
625 guint32 vmid,
626 const char *nodename)
627{
628 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
629 g_return_val_if_fail(vmid != 0, FALSE);
630
631 gboolean res = FALSE;
632
633 g_static_mutex_lock (&mutex);
634
635 vminfo_t *vminfo;
636 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
637 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
638 res = TRUE;
639 }
640 g_static_mutex_unlock (&mutex);
641
642 return res;
643}
644
645gboolean
646vmlist_vm_exists(
647 guint32 vmid)
648{
649 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
650 g_return_val_if_fail(vmid != 0, FALSE);
651
652 g_static_mutex_lock (&mutex);
653
654 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
655
656 g_static_mutex_unlock (&mutex);
657
658 return res != NULL;
659}
660
661void
662vmlist_delete_vm(
663 guint32 vmid)
664{
665 g_return_if_fail(cfs_status.vmlist != NULL);
666 g_return_if_fail(vmid != 0);
667
668 g_static_mutex_lock (&mutex);
669
670 cfs_status.vmlist_version++;
671
672 g_hash_table_remove(cfs_status.vmlist, &vmid);
673
674 g_static_mutex_unlock (&mutex);
675}
676
677void cfs_status_set_vmlist(
678 GHashTable *vmlist)
679{
680 g_return_if_fail(vmlist != NULL);
681
682 g_static_mutex_lock (&mutex);
683
684 cfs_status.vmlist_version++;
685
686 if (cfs_status.vmlist)
687 g_hash_table_destroy(cfs_status.vmlist);
688
689 cfs_status.vmlist = vmlist;
690
691 g_static_mutex_unlock (&mutex);
692}
693
694int
695cfs_create_vmlist_msg(GString *str)
696{
697 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
698 g_return_val_if_fail(str != NULL, -EINVAL);
699
700 g_static_mutex_lock (&mutex);
701
702 g_string_append_printf(str,"{\n");
703
704 GHashTable *ht = cfs_status.vmlist;
705
706 guint count = g_hash_table_size(ht);
707
708 if (!count) {
709 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
710 } else {
711 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
712
713 g_string_append_printf(str,"\"ids\": {\n");
714
715 GHashTableIter iter;
716 gpointer key, value;
717
718 g_hash_table_iter_init (&iter, ht);
719
720 int first = 1;
721 while (g_hash_table_iter_next (&iter, &key, &value)) {
722 vminfo_t *vminfo = (vminfo_t *)value;
723 char *type;
724 if (vminfo->vmtype == VMTYPE_QEMU) {
725 type = "qemu";
726 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
727 type = "openvz";
728 } else {
729 type = "unknown";
730 }
731
732 if (!first)
733 g_string_append_printf(str, ",\n");
734 first = 0;
735
736 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
737 vminfo->vmid, vminfo->nodename, type, vminfo->version);
738 }
739
740 g_string_append_printf(str,"}\n");
741 }
742 g_string_append_printf(str,"\n}\n");
743
744 g_static_mutex_unlock (&mutex);
745
746 return 0;
747}
748
749void
750record_memdb_change(const char *path)
751{
752 g_return_if_fail(cfs_status.memdb_changes != 0);
753
754 memdb_change_t *ce;
755
756 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
757 ce->version++;
758 }
759}
760
761void
762record_memdb_reload(void)
763{
764 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
765 memdb_change_array[i].version++;
766 }
767}
768
769static gboolean
770kventry_hash_set(
771 GHashTable *kvhash,
772 const char *key,
773 gconstpointer data,
774 size_t len)
775{
776 g_return_val_if_fail(kvhash != NULL, FALSE);
777 g_return_val_if_fail(key != NULL, FALSE);
778 g_return_val_if_fail(data != NULL, FALSE);
779
780 kventry_t *entry;
781 if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
782 g_free(entry->data);
783 entry->data = g_memdup(data, len);
784 entry->len = len;
785 entry->version++;
786 } else {
787 kventry_t *entry = g_new0(kventry_t, 1);
788
789 entry->key = g_strdup(key);
790 entry->data = g_memdup(data, len);
791 entry->len = len;
792
793 g_hash_table_replace(kvhash, entry->key, entry);
794 }
795
796 return TRUE;
797}
798
799static const char *rrd_def_node[] = {
800 "DS:loadavg:GAUGE:120:0:U",
801 "DS:maxcpu:GAUGE:120:0:U",
802 "DS:cpu:GAUGE:120:0:U",
803 "DS:iowait:GAUGE:120:0:U",
804 "DS:memtotal:GAUGE:120:0:U",
805 "DS:memused:GAUGE:120:0:U",
806 "DS:swaptotal:GAUGE:120:0:U",
807 "DS:swapused:GAUGE:120:0:U",
808 "DS:roottotal:GAUGE:120:0:U",
809 "DS:rootused:GAUGE:120:0:U",
764296f1
DM
810 "DS:netin:DERIVE:120:0:U",
811 "DS:netout:DERIVE:120:0:U",
fe000966
DM
812
813 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
814 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
815 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
816 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
817 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
818
819 "RRA:MAX:0.5:1:70", // 1 min max - one hour
820 "RRA:MAX:0.5:30:70", // 30 min max - one day
821 "RRA:MAX:0.5:180:70", // 3 hour max - one week
822 "RRA:MAX:0.5:720:70", // 12 hour max - one month
823 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
824 NULL,
825};
826
827static const char *rrd_def_vm[] = {
828 "DS:maxcpu:GAUGE:120:0:U",
829 "DS:cpu:GAUGE:120:0:U",
830 "DS:maxmem:GAUGE:120:0:U",
831 "DS:mem:GAUGE:120:0:U",
832 "DS:maxdisk:GAUGE:120:0:U",
833 "DS:disk:GAUGE:120:0:U",
764296f1
DM
834 "DS:netin:DERIVE:120:0:U",
835 "DS:netout:DERIVE:120:0:U",
836 "DS:diskread:DERIVE:120:0:U",
837 "DS:diskwrite:DERIVE:120:0:U",
fe000966
DM
838
839 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
840 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
841 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
842 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
843 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
844
845 "RRA:MAX:0.5:1:70", // 1 min max - one hour
846 "RRA:MAX:0.5:30:70", // 30 min max - one day
847 "RRA:MAX:0.5:180:70", // 3 hour max - one week
848 "RRA:MAX:0.5:720:70", // 12 hour max - one month
849 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
850 NULL,
851};
852
853static const char *rrd_def_storage[] = {
854 "DS:total:GAUGE:120:0:U",
855 "DS:used:GAUGE:120:0:U",
856
857 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
858 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
859 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
860 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
861 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
862
863 "RRA:MAX:0.5:1:70", // 1 min max - one hour
864 "RRA:MAX:0.5:30:70", // 30 min max - one day
865 "RRA:MAX:0.5:180:70", // 3 hour max - one week
866 "RRA:MAX:0.5:720:70", // 12 hour max - one month
867 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
868 NULL,
869};
870
871#define RRDDIR "/var/lib/rrdcached/db"
872
873static void
874create_rrd_file(
875 const char *filename,
876 int argcount,
877 const char *rrddef[])
878{
879 /* start at day boundary */
880 time_t ctime;
881 time(&ctime);
882 struct tm *ltm = localtime(&ctime);
883 ltm->tm_sec = 0;
884 ltm->tm_min = 0;
885 ltm->tm_hour = 0;
886
887 rrd_clear_error();
888 if (rrd_create_r(filename, 60, timelocal(ltm), argcount, rrddef)) {
889 cfs_message("RRD create error %s: %s", filename, rrd_get_error());
890 }
891}
892
893static inline const char *
894rrd_skip_data(
895 const char *data,
896 int count)
897{
898 int found = 0;
899 while (*data && found < count) {
900 if (*data++ == ':')
901 found++;
902 }
903 return data;
904}
905
906static void
907update_rrd_data(
908 const char *key,
909 gconstpointer data,
910 size_t len)
911{
912 g_return_if_fail(key != NULL);
913 g_return_if_fail(data != NULL);
914 g_return_if_fail(len > 0);
915 g_return_if_fail(len < 4096);
916
917 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
918
919 int use_daemon = 1;
920 if (rrdc_connect(rrdcsock) != 0)
921 use_daemon = 0;
922
ba9dcfc1 923 char *filename = NULL;
fe000966
DM
924
925 int skip = 0;
926
927 if (strncmp(key, "pve2-node/", 10) == 0) {
928 const char *node = key + 10;
929
f9c865a8 930 skip = 2;
fe000966
DM
931
932 if (strchr(node, '/') != NULL)
933 goto keyerror;
934
935 if (strlen(node) < 1)
936 goto keyerror;
937
ba9dcfc1
DM
938 filename = g_strdup_printf(RRDDIR "/%s", key);
939
fe000966
DM
940 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
941
942 mkdir(RRDDIR "/pve2-node", 0755);
943 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
944 create_rrd_file(filename, argcount, rrd_def_node);
945 }
946
ba9dcfc1
DM
947 } else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
948 (strncmp(key, "pve2.3-vm/", 10) == 0)) {
949 const char *vmid;
fe000966 950
ba9dcfc1
DM
951 if (strncmp(key, "pve2-vm/", 8) == 0) {
952 vmid = key + 8;
953 skip = 2;
954 } else {
955 vmid = key + 10;
94e4cba3 956 skip = 4;
ba9dcfc1 957 }
fe000966
DM
958
959 if (strchr(vmid, '/') != NULL)
960 goto keyerror;
961
962 if (strlen(vmid) < 1)
963 goto keyerror;
964
ba9dcfc1
DM
965 filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);
966
fe000966
DM
967 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
968
969 mkdir(RRDDIR "/pve2-vm", 0755);
970 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
971 create_rrd_file(filename, argcount, rrd_def_vm);
972 }
973
974 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
975 const char *node = key + 13;
976
977 const char *storage = node;
978 while (*storage && *storage != '/')
979 storage++;
980
981 if (*storage != '/' || ((storage - node) < 1))
982 goto keyerror;
983
984 storage++;
985
986 if (strchr(storage, '/') != NULL)
987 goto keyerror;
988
989 if (strlen(storage) < 1)
990 goto keyerror;
991
ba9dcfc1
DM
992 filename = g_strdup_printf(RRDDIR "/%s", key);
993
fe000966
DM
994 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
995
996 mkdir(RRDDIR "/pve2-storage", 0755);
997
998 char *dir = g_path_get_dirname(filename);
999 mkdir(dir, 0755);
1000 g_free(dir);
1001
1002 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
1003 create_rrd_file(filename, argcount, rrd_def_storage);
1004 }
1005
1006 } else {
1007 goto keyerror;
1008 }
1009
1010 const char *dp = skip ? rrd_skip_data(data, skip) : data;
1011
1012 const char *update_args[] = { dp, NULL };
1013
1014 if (use_daemon) {
1015 int status;
1016 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1017 cfs_message("RRDC update error %s: %d", filename, status);
1018 rrdc_disconnect();
1019 rrd_clear_error();
1020 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1021 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1022 }
1023 }
1024
1025 } else {
1026 rrd_clear_error();
1027 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1028 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1029 }
1030 }
1031
1032ret:
ba9dcfc1
DM
1033 if (filename)
1034 g_free(filename);
1035
fe000966
DM
1036 return;
1037
1038keyerror:
1039 cfs_critical("RRD update error: unknown/wrong key %s", key);
1040 goto ret;
1041}
1042
1043static gboolean
1044rrd_entry_is_old(
1045 gpointer key,
1046 gpointer value,
1047 gpointer user_data)
1048{
1049 rrdentry_t *entry = (rrdentry_t *)value;
1050 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1051
1052 int diff = ctime - entry->time;
1053
1054 /* remove everything older than 5 minutes */
1055 int expire = 60*5;
1056
1057 return (diff > expire) ? TRUE : FALSE;
1058}
1059
1060static char *rrd_dump_buf = NULL;
1061static time_t rrd_dump_last = 0;
1062
1063void
1064cfs_rrd_dump(GString *str)
1065{
1066 time_t ctime;
1067 time(&ctime);
1068
1069 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1070 g_string_assign(str, rrd_dump_buf);
1071 return;
1072 }
1073
1074 /* remove old data */
1075 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1076 GUINT_TO_POINTER(ctime));
1077
1078 g_string_set_size(str, 0);
1079
1080 GHashTableIter iter;
1081 gpointer key, value;
1082
1083 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1084
1085 while (g_hash_table_iter_next (&iter, &key, &value)) {
1086 rrdentry_t *entry = (rrdentry_t *)value;
1087 g_string_append(str, key);
1088 g_string_append(str, ":");
1089 g_string_append(str, entry->data);
1090 g_string_append(str, "\n");
1091 }
1092
1093 g_string_append_c(str, 0); // never return undef
1094
1095 rrd_dump_last = ctime;
1096 if (rrd_dump_buf)
1097 g_free(rrd_dump_buf);
1098 rrd_dump_buf = g_strdup(str->str);
1099}
1100
1101static gboolean
1102nodeip_hash_set(
1103 GHashTable *iphash,
1104 const char *nodename,
1105 const char *ip,
1106 size_t len)
1107{
1108 g_return_val_if_fail(iphash != NULL, FALSE);
1109 g_return_val_if_fail(nodename != NULL, FALSE);
1110 g_return_val_if_fail(ip != NULL, FALSE);
1111 g_return_val_if_fail(len > 0, FALSE);
1112 g_return_val_if_fail(len < 256, FALSE);
1113 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1114
1115 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1116
1117 if (!oldip || (strcmp(oldip, ip) != 0)) {
1118 cfs_status.clinfo_version++;
1119 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1120 }
1121
1122 return TRUE;
1123}
1124
1125static gboolean
1126rrdentry_hash_set(
1127 GHashTable *rrdhash,
1128 const char *key,
1129 const char *data,
1130 size_t len)
1131{
1132 g_return_val_if_fail(rrdhash != NULL, FALSE);
1133 g_return_val_if_fail(key != NULL, FALSE);
1134 g_return_val_if_fail(data != NULL, FALSE);
1135 g_return_val_if_fail(len > 0, FALSE);
1136 g_return_val_if_fail(len < 4096, FALSE);
1137 g_return_val_if_fail(data[len-1] == 0, FALSE);
1138
1139 rrdentry_t *entry;
1140 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1141 g_free(entry->data);
1142 entry->data = g_memdup(data, len);
1143 entry->len = len;
1144 entry->time = time(NULL);
1145 } else {
1146 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1147
1148 entry->key = g_strdup(key);
1149 entry->data = g_memdup(data, len);
1150 entry->len = len;
1151 entry->time = time(NULL);
1152
1153 g_hash_table_replace(rrdhash, entry->key, entry);
1154 }
1155
1156 update_rrd_data(key, data, len);
1157
1158 return TRUE;
1159}
1160
1161static int
1162kvstore_send_update_message(
1163 dfsm_t *dfsm,
1164 const char *key,
1165 gpointer data,
1166 guint32 len)
1167{
1168
1169 struct iovec iov[2];
1170
1171 char name[256];
1172 g_strlcpy(name, key, sizeof(name));
1173
1174 iov[0].iov_base = &name;
1175 iov[0].iov_len = sizeof(name);
1176
1177 iov[1].iov_base = (char *)data;
1178 iov[1].iov_len = len;
1179
1180 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1181 return 0;
1182
1183 return -EACCES;
1184}
1185
1186static clog_entry_t *
1187kvstore_parse_log_message(
1188 const void *msg,
1189 size_t msg_len)
1190{
1191 g_return_val_if_fail(msg != NULL, NULL);
1192
1193 if (msg_len < sizeof(clog_entry_t)) {
1194 cfs_critical("received short log message (%lu < %lu)", msg_len, sizeof(clog_entry_t));
1195 return NULL;
1196 }
1197
1198 clog_entry_t *entry = (clog_entry_t *)msg;
1199
1200 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1201 entry->ident_len + entry->tag_len + entry->msg_len;
1202
1203 if (msg_len != size) {
1204 cfs_critical("received log message with wrong size (%lu != %u)", msg_len, size);
1205 return NULL;
1206 }
1207
1208 msg = entry->data;
1209
1210 if (*((char *)msg + entry->node_len - 1)) {
1211 cfs_critical("unterminated string in log message");
1212 return NULL;
1213 }
1214 msg += entry->node_len;
1215
1216 if (*((char *)msg + entry->ident_len - 1)) {
1217 cfs_critical("unterminated string in log message");
1218 return NULL;
1219 }
1220 msg += entry->ident_len;
1221
1222 if (*((char *)msg + entry->tag_len - 1)) {
1223 cfs_critical("unterminated string in log message");
1224 return NULL;
1225 }
1226 msg += entry->tag_len;
1227
1228 if (*((char *)msg + entry->msg_len - 1)) {
1229 cfs_critical("unterminated string in log message");
1230 return NULL;
1231 }
1232
1233 return entry;
1234}
1235
1236static gboolean
1237kvstore_parse_update_message(
1238 const void *msg,
1239 size_t msg_len,
1240 const char **key,
1241 gconstpointer *data,
1242 guint32 *len)
1243{
1244 g_return_val_if_fail(msg != NULL, FALSE);
1245 g_return_val_if_fail(key != NULL, FALSE);
1246 g_return_val_if_fail(data != NULL, FALSE);
1247 g_return_val_if_fail(len != NULL, FALSE);
1248
1249 if (msg_len < 256) {
1250 cfs_critical("received short kvstore message (%lu < 256)", msg_len);
1251 return FALSE;
1252 }
1253
1254 /* test if key is null terminated */
1255 int i = 0;
1256 for (i = 0; i < 256; i++)
1257 if (((char *)msg)[i] == 0)
1258 break;
1259
1260 if (i == 256)
1261 return FALSE;
1262
1263
1264 *len = msg_len - 256;
1265 *key = msg;
1266 *data = msg + 256;
1267
1268 return TRUE;
1269}
1270
1271int
1272cfs_create_status_msg(
1273 GString *str,
1274 const char *nodename,
1275 const char *key)
1276{
1277 g_return_val_if_fail(str != NULL, -EINVAL);
1278 g_return_val_if_fail(key != NULL, -EINVAL);
1279
1280 int res = -ENOENT;
1281
1282 GHashTable *kvhash = NULL;
1283
1284 g_static_mutex_lock (&mutex);
1285
1286 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1287 kvhash = cfs_status.kvhash;
1288 } else {
1289 cfs_clnode_t *clnode;
1290 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1291 kvhash = clnode->kvhash;
1292 }
1293
1294 kventry_t *entry;
1295 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1296 g_string_append_len(str, entry->data, entry->len);
1297 res = 0;
1298 }
1299
1300 g_static_mutex_unlock (&mutex);
1301
1302 return res;
1303}
1304
1305int
1306cfs_status_set(
1307 const char *key,
1308 gpointer data,
1309 size_t len)
1310{
1311 g_return_val_if_fail(key != NULL, FALSE);
1312 g_return_val_if_fail(data != NULL, FALSE);
1313 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1314
1315 if (len > CFS_MAX_STATUS_SIZE)
1316 return -EFBIG;
1317
1318 g_static_mutex_lock (&mutex);
1319
1320 gboolean res;
1321
1322 if (strncmp(key, "rrd/", 4) == 0) {
1323 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1324 } else if (!strcmp(key, "nodeip")) {
1325 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1326 } else {
1327 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1328 }
1329 g_static_mutex_unlock (&mutex);
1330
1331 if (cfs_status.kvstore)
1332 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1333
1334 return res ? 0 : -ENOMEM;
1335}
1336
1337gboolean
1338cfs_kvstore_node_set(
1339 uint32_t nodeid,
1340 const char *key,
1341 gconstpointer data,
1342 size_t len)
1343{
1344 g_return_val_if_fail(nodeid != 0, FALSE);
1345 g_return_val_if_fail(key != NULL, FALSE);
1346 g_return_val_if_fail(data != NULL, FALSE);
1347
1348 g_static_mutex_lock (&mutex);
1349
1350 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1351 goto ret; /* ignore */
1352
1353 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1354 if (!clnode)
1355 goto ret; /* ignore */
1356
1357 cfs_debug("got node %d status update %s", nodeid, key);
1358
1359 if (strncmp(key, "rrd/", 4) == 0) {
1360 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1361 } else if (!strcmp(key, "nodeip")) {
1362 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1363 } else {
1364 if (!clnode->kvhash) {
1365 if (!(clnode->kvhash = kventry_hash_new())) {
1366 goto ret; /*ignore */
1367 }
1368 }
1369
1370 kventry_hash_set(clnode->kvhash, key, data, len);
1371
1372 }
1373ret:
1374 g_static_mutex_unlock (&mutex);
1375
1376 return TRUE;
1377}
1378
1379static gboolean
1380cfs_kvstore_sync(void)
1381{
1382 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1383 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1384
1385 gboolean res = TRUE;
1386
1387 g_static_mutex_lock (&mutex);
1388
1389 GHashTable *ht = cfs_status.kvhash;
1390 GHashTableIter iter;
1391 gpointer key, value;
1392
1393 g_hash_table_iter_init (&iter, ht);
1394
1395 while (g_hash_table_iter_next (&iter, &key, &value)) {
1396 kventry_t *entry = (kventry_t *)value;
1397 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1398 }
1399
1400 g_static_mutex_unlock (&mutex);
1401
1402 return res;
1403}
1404
1405static int
1406dfsm_deliver(
1407 dfsm_t *dfsm,
1408 gpointer data,
1409 int *res_ptr,
1410 uint32_t nodeid,
1411 uint32_t pid,
1412 uint16_t msg_type,
1413 uint32_t msg_time,
1414 const void *msg,
1415 size_t msg_len)
1416{
1417 g_return_val_if_fail(dfsm != NULL, -1);
1418 g_return_val_if_fail(msg != NULL, -1);
1419 g_return_val_if_fail(res_ptr != NULL, -1);
1420
1421 /* ignore message for ourself */
1422 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1423 goto ret;
1424
1425 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1426 const char *key;
1427 gconstpointer data;
1428 guint32 len;
1429 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1430 cfs_kvstore_node_set(nodeid, key, data, len);
1431 } else {
1432 cfs_critical("cant parse update message");
1433 }
1434 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1435 cfs_message("received log"); // fixme: remove
1436 const clog_entry_t *entry;
1437 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1438 clusterlog_insert(cfs_status.clusterlog, entry);
1439 } else {
1440 cfs_critical("cant parse log message");
1441 }
1442 } else {
1443 cfs_critical("received unknown message type %d\n", msg_type);
1444 goto fail;
1445 }
1446
1447ret:
1448 *res_ptr = 0;
1449 return 1;
1450
1451fail:
1452 *res_ptr = -EACCES;
1453 return 1;
1454}
1455
1456static void
1457dfsm_confchg(
1458 dfsm_t *dfsm,
1459 gpointer data,
1460 const struct cpg_address *member_list,
1461 size_t member_list_entries)
1462{
1463 g_return_if_fail(dfsm != NULL);
1464 g_return_if_fail(member_list != NULL);
1465
1466 cfs_debug("enter %s", __func__);
1467
1468 g_static_mutex_lock (&mutex);
1469
1470 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1471
1472 if (clinfo && clinfo->nodes_byid) {
1473
1474 GHashTable *ht = clinfo->nodes_byid;
1475 GHashTableIter iter;
1476 gpointer key, value;
1477
1478 g_hash_table_iter_init (&iter, ht);
1479
1480 while (g_hash_table_iter_next (&iter, &key, &value)) {
1481 cfs_clnode_t *node = (cfs_clnode_t *)value;
1482 node->online = FALSE;
1483 }
1484
1485 for (int i = 0; i < member_list_entries; i++) {
1486 cfs_clnode_t *node;
1487 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1488 node->online = TRUE;
1489 }
1490 }
1491
1492 cfs_status.clinfo_version++;
1493 }
1494
1495 g_static_mutex_unlock (&mutex);
1496}
1497
1498static gpointer
1499dfsm_get_state(
1500 dfsm_t *dfsm,
1501 gpointer data,
1502 unsigned int *res_len)
1503{
1504 g_return_val_if_fail(dfsm != NULL, NULL);
1505
1506 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1507
1508 return msg;
1509}
1510
1511static int
1512dfsm_process_update(
1513 dfsm_t *dfsm,
1514 gpointer data,
1515 dfsm_sync_info_t *syncinfo,
1516 uint32_t nodeid,
1517 uint32_t pid,
1518 const void *msg,
1519 size_t msg_len)
1520{
1521 cfs_critical("%s: received unexpected update message", __func__);
1522
1523 return -1;
1524}
1525
1526static int
1527dfsm_process_state_update(
1528 dfsm_t *dfsm,
1529 gpointer data,
1530 dfsm_sync_info_t *syncinfo)
1531{
1532 g_return_val_if_fail(dfsm != NULL, -1);
1533 g_return_val_if_fail(syncinfo != NULL, -1);
1534
1535 clog_base_t *clog[syncinfo->node_count];
1536
1537 int local_index = -1;
1538 for (int i = 0; i < syncinfo->node_count; i++) {
1539 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1540 ni->synced = 1;
1541
1542 if (syncinfo->local == ni)
1543 local_index = i;
1544
1545 clog_base_t *base = (clog_base_t *)ni->state;
1546 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1547 clog[i] = ni->state;
1548 } else {
1549 cfs_critical("received log with wrong size %u", ni->state_len);
1550 clog[i] = NULL;
1551 }
1552 }
1553
1554 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1555 cfs_critical("unable to merge log files");
1556 }
1557
1558 cfs_kvstore_sync();
1559
1560 return 1;
1561}
1562
1563static int
1564dfsm_commit(
1565 dfsm_t *dfsm,
1566 gpointer data,
1567 dfsm_sync_info_t *syncinfo)
1568{
1569 g_return_val_if_fail(dfsm != NULL, -1);
1570 g_return_val_if_fail(syncinfo != NULL, -1);
1571
1572 return 1;
1573}
1574
1575static void
1576dfsm_synced(dfsm_t *dfsm)
1577{
1578 g_return_if_fail(dfsm != NULL);
1579
1580 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1581 if (!ip)
1582 ip = cfs.ip;
1583
1584 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1585}
1586
1587static int
1588dfsm_cleanup(
1589 dfsm_t *dfsm,
1590 gpointer data,
1591 dfsm_sync_info_t *syncinfo)
1592{
1593 return 1;
1594}
1595
1596static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1597 .dfsm_deliver_fn = dfsm_deliver,
1598 .dfsm_confchg_fn = dfsm_confchg,
1599
1600 .dfsm_get_state_fn = dfsm_get_state,
1601 .dfsm_process_state_update_fn = dfsm_process_state_update,
1602 .dfsm_process_update_fn = dfsm_process_update,
1603 .dfsm_commit_fn = dfsm_commit,
1604 .dfsm_cleanup_fn = dfsm_cleanup,
1605 .dfsm_synced_fn = dfsm_synced,
1606};
1607
1608dfsm_t *
1609cfs_status_dfsm_new(void)
1610{
1611 g_static_mutex_lock (&mutex);
1612
1613 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1614 0, &kvstore_dfsm_callbacks);
1615 g_static_mutex_unlock (&mutex);
1616
1617 return cfs_status.kvstore;
1618}
1619
1620gboolean
1621cfs_is_quorate(void)
1622{
1623 g_static_mutex_lock (&mutex);
1624 gboolean res = cfs_status.quorate;
1625 g_static_mutex_unlock (&mutex);
1626
1627 return res;
1628}
1629
1630void
1631cfs_set_quorate(
1632 uint32_t quorate,
1633 gboolean quiet)
1634{
1635 g_static_mutex_lock (&mutex);
1636
1637 uint32_t prev_quorate = cfs_status.quorate;
1638 cfs_status.quorate = quorate;
1639
1640 if (!prev_quorate && cfs_status.quorate) {
1641 if (!quiet)
1642 cfs_message("node has quorum");
1643 }
1644
1645 if (prev_quorate && !cfs_status.quorate) {
1646 if (!quiet)
1647 cfs_message("node lost quorum");
1648 }
1649
1650 g_static_mutex_unlock (&mutex);
1651}
1652