]> git.proxmox.com Git - pve-cluster.git/blame - data/src/status.c
pmxcfs: status: refactor out vminfo_type_to_string
[pve-cluster.git] / data / src / status.c
CommitLineData
fe000966
DM
1/*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19*/
20
21#define G_LOG_DOMAIN "status"
22
23#ifdef HAVE_CONFIG_H
24#include <config.h>
25#endif /* HAVE_CONFIG_H */
26
27#include <stdio.h>
28#include <stdint.h>
29#include <string.h>
30#include <errno.h>
31#include <glib.h>
32#include <sys/syslog.h>
33#include <rrd.h>
34#include <rrd_client.h>
35#include <time.h>
36
37#include "cfs-utils.h"
38#include "status.h"
39#include "logger.h"
40
41#define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
42
43typedef enum {
44 KVSTORE_MESSAGE_UPDATE = 1,
45 KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
46 KVSTORE_MESSAGE_LOG = 3,
47} kvstore_message_t;
48
49static uint32_t vminfo_version_counter;
50
51typedef struct {
52 uint32_t vmid;
53 char *nodename;
54 int vmtype;
55 uint32_t version;
56} vminfo_t;
57
58typedef struct {
59 char *key;
60 gpointer data;
61 size_t len;
62 uint32_t version;
63} kventry_t;
64
65typedef struct {
66 char *key;
67 gpointer data;
68 size_t len;
69 uint32_t time;
70} rrdentry_t;
71
72typedef struct {
73 char *path;
74 uint32_t version;
75} memdb_change_t;
76
77static memdb_change_t memdb_change_array[] = {
2113d031
DM
78 { .path = "corosync.conf" },
79 { .path = "corosync.conf.new" },
fe000966
DM
80 { .path = "storage.cfg" },
81 { .path = "user.cfg" },
82 { .path = "domains.cfg" },
83 { .path = "priv/shadow.cfg" },
8a9456dc 84 { .path = "priv/tfa.cfg" },
fe000966 85 { .path = "datacenter.cfg" },
e1735a61 86 { .path = "vzdump.cron" },
5a5417e6
DM
87 { .path = "ha/crm_commands" },
88 { .path = "ha/manager_status" },
89 { .path = "ha/resources.cfg" },
90 { .path = "ha/groups.cfg" },
e9af3eb7 91 { .path = "ha/fence.cfg" },
9d4f69ff 92 { .path = "status.cfg" },
f6de131a 93 { .path = "replication.cfg" },
22e2ed76 94 { .path = "ceph.conf" },
fe000966
DM
95};
96
89fde9ac 97static GMutex mutex;
fe000966
DM
98
99typedef struct {
100 time_t start_time;
101
102 uint32_t quorate;
103
104 cfs_clinfo_t *clinfo;
105 uint32_t clinfo_version;
106
107 GHashTable *vmlist;
108 uint32_t vmlist_version;
109
110 dfsm_t *kvstore;
111 GHashTable *kvhash;
112 GHashTable *rrdhash;
113 GHashTable *iphash;
114
115 GHashTable *memdb_changes;
116
117 clusterlog_t *clusterlog;
118} cfs_status_t;
119
120static cfs_status_t cfs_status;
121
122struct cfs_clnode {
123 char *name;
124 uint32_t nodeid;
125 uint32_t votes;
126 gboolean online;
127 GHashTable *kvhash;
128};
129
130struct cfs_clinfo {
131 char *cluster_name;
132 uint32_t cman_version;
133
134 GHashTable *nodes_byid;
135 GHashTable *nodes_byname;
136};
137
138static guint
139g_int32_hash (gconstpointer v)
140{
141 return *(const uint32_t *) v;
142}
143
144static gboolean
145g_int32_equal (gconstpointer v1,
146 gconstpointer v2)
147{
148 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
149}
150
151static void vminfo_free(vminfo_t *vminfo)
152{
153 g_return_if_fail(vminfo != NULL);
154
155 if (vminfo->nodename)
156 g_free(vminfo->nodename);
157
158
159 g_free(vminfo);
160}
161
c26ca647
TL
162static const char *vminfo_type_to_string(vminfo_t *vminfo) {
163 if (vminfo->vmtype == VMTYPE_QEMU) {
164 return "qemu";
165 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
166 return "openvz";
167 } else if (vminfo->vmtype == VMTYPE_LXC) {
168 return "lxc";
169 } else {
170 return "unknown";
171 }
172}
173
fe000966
DM
174void cfs_clnode_destroy(
175 cfs_clnode_t *clnode)
176{
177 g_return_if_fail(clnode != NULL);
178
179 if (clnode->kvhash)
180 g_hash_table_destroy(clnode->kvhash);
181
182 if (clnode->name)
183 g_free(clnode->name);
184
185 g_free(clnode);
186}
187
188cfs_clnode_t *cfs_clnode_new(
189 const char *name,
190 uint32_t nodeid,
191 uint32_t votes)
192{
193 g_return_val_if_fail(name != NULL, NULL);
194
195 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
196 if (!clnode)
197 return NULL;
198
199 clnode->name = g_strdup(name);
200 clnode->nodeid = nodeid;
201 clnode->votes = votes;
202
203 return clnode;
204}
205
206gboolean cfs_clinfo_destroy(
207 cfs_clinfo_t *clinfo)
208{
209 g_return_val_if_fail(clinfo != NULL, FALSE);
210
211 if (clinfo->cluster_name)
212 g_free(clinfo->cluster_name);
213
214 if (clinfo->nodes_byname)
215 g_hash_table_destroy(clinfo->nodes_byname);
216
217 if (clinfo->nodes_byid)
218 g_hash_table_destroy(clinfo->nodes_byid);
219
220 g_free(clinfo);
221
222 return TRUE;
223}
224
225cfs_clinfo_t *cfs_clinfo_new(
226 const char *cluster_name,
227 uint32_t cman_version)
228{
229 g_return_val_if_fail(cluster_name != NULL, NULL);
230
231 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
232 if (!clinfo)
233 return NULL;
234
235 clinfo->cluster_name = g_strdup(cluster_name);
236 clinfo->cman_version = cman_version;
237
238 if (!(clinfo->nodes_byid = g_hash_table_new_full(
239 g_int32_hash, g_int32_equal, NULL,
240 (GDestroyNotify)cfs_clnode_destroy)))
241 goto fail;
242
243 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
244 goto fail;
245
246 return clinfo;
247
248fail:
249 cfs_clinfo_destroy(clinfo);
250
251 return NULL;
252}
253
254gboolean cfs_clinfo_add_node(
255 cfs_clinfo_t *clinfo,
256 cfs_clnode_t *clnode)
257{
258 g_return_val_if_fail(clinfo != NULL, FALSE);
259 g_return_val_if_fail(clnode != NULL, FALSE);
260
261 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
262 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
263
264 return TRUE;
265}
266
267int
268cfs_create_memberlist_msg(
269 GString *str)
270{
271 g_return_val_if_fail(str != NULL, -EINVAL);
272
89fde9ac 273 g_mutex_lock (&mutex);
fe000966
DM
274
275 g_string_append_printf(str,"{\n");
276
277 guint nodecount = 0;
278
279 cfs_clinfo_t *clinfo = cfs_status.clinfo;
280
281 if (clinfo && clinfo->nodes_byid)
282 nodecount = g_hash_table_size(clinfo->nodes_byid);
283
284 if (nodecount) {
285 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
286 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
287
288 g_string_append_printf(str, "\"cluster\": { ");
289 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
290 "\"nodes\": %d, \"quorate\": %d ",
291 clinfo->cluster_name, clinfo->cman_version,
292 nodecount, cfs_status.quorate);
293
294 g_string_append_printf(str,"},\n");
295 g_string_append_printf(str,"\"nodelist\": {\n");
296
297 GHashTable *ht = clinfo->nodes_byid;
298 GHashTableIter iter;
299 gpointer key, value;
300
301 g_hash_table_iter_init (&iter, ht);
302
303 int i = 0;
304 while (g_hash_table_iter_next (&iter, &key, &value)) {
305 cfs_clnode_t *node = (cfs_clnode_t *)value;
306 if (i) g_string_append_printf(str, ",\n");
307 i++;
308
309 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
310 node->name, node->nodeid, node->online);
311
312
313 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
314 if (ip) {
315 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
316 }
317
318 g_string_append_printf(str, "}");
319
320 }
321 g_string_append_printf(str,"\n }\n");
322 } else {
323 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
324 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
325 }
326
327 g_string_append_printf(str,"}\n");
328
89fde9ac 329 g_mutex_unlock (&mutex);
fe000966
DM
330
331 return 0;
332}
333
334static void
335kventry_free(kventry_t *entry)
336{
337 g_return_if_fail(entry != NULL);
338
339 g_free(entry->key);
340 g_free(entry->data);
341 g_free(entry);
342}
343
344static GHashTable *
345kventry_hash_new(void)
346{
347 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
348 (GDestroyNotify)kventry_free);
349}
350
351static void
352rrdentry_free(rrdentry_t *entry)
353{
354 g_return_if_fail(entry != NULL);
355
356 g_free(entry->key);
357 g_free(entry->data);
358 g_free(entry);
359}
360
361static GHashTable *
362rrdentry_hash_new(void)
363{
364 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
365 (GDestroyNotify)rrdentry_free);
366}
367
368void
369cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
370{
371 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
372}
373
374void
375cfs_cluster_log(clog_entry_t *entry)
376{
377 g_return_if_fail(entry != NULL);
378
379 clusterlog_insert(cfs_status.clusterlog, entry);
380
381 if (cfs_status.kvstore) {
382 struct iovec iov[1];
383 iov[0].iov_base = (char *)entry;
384 iov[0].iov_len = clog_entry_size(entry);
385
af2e9dd4
DM
386 if (dfsm_is_initialized(cfs_status.kvstore))
387 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
fe000966
DM
388 }
389}
390
391void cfs_status_init(void)
392{
89fde9ac 393 g_mutex_lock (&mutex);
fe000966
DM
394
395 cfs_status.start_time = time(NULL);
396
397 cfs_status.vmlist = vmlist_hash_new();
398
399 cfs_status.kvhash = kventry_hash_new();
400
401 cfs_status.rrdhash = rrdentry_hash_new();
402
403 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
404
405 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
406
407 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
408 g_hash_table_replace(cfs_status.memdb_changes,
409 memdb_change_array[i].path,
410 &memdb_change_array[i]);
411 }
412
413 cfs_status.clusterlog = clusterlog_new();
414
415 // fixme:
416 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
417 LOG_INFO, "starting cluster log");
418
89fde9ac 419 g_mutex_unlock (&mutex);
fe000966
DM
420}
421
422void cfs_status_cleanup(void)
423{
89fde9ac 424 g_mutex_lock (&mutex);
fe000966
DM
425
426 cfs_status.clinfo_version++;
427
428 if (cfs_status.clinfo) {
429 cfs_clinfo_destroy(cfs_status.clinfo);
430 cfs_status.clinfo = NULL;
431 }
432
433 if (cfs_status.vmlist) {
434 g_hash_table_destroy(cfs_status.vmlist);
435 cfs_status.vmlist = NULL;
436 }
437
438 if (cfs_status.kvhash) {
439 g_hash_table_destroy(cfs_status.kvhash);
440 cfs_status.kvhash = NULL;
441 }
442
443 if (cfs_status.rrdhash) {
444 g_hash_table_destroy(cfs_status.rrdhash);
445 cfs_status.rrdhash = NULL;
446 }
447
448 if (cfs_status.iphash) {
449 g_hash_table_destroy(cfs_status.iphash);
450 cfs_status.iphash = NULL;
451 }
452
453 if (cfs_status.clusterlog)
454 clusterlog_destroy(cfs_status.clusterlog);
455
89fde9ac 456 g_mutex_unlock (&mutex);
fe000966
DM
457}
458
459void cfs_status_set_clinfo(
460 cfs_clinfo_t *clinfo)
461{
462 g_return_if_fail(clinfo != NULL);
463
89fde9ac 464 g_mutex_lock (&mutex);
fe000966
DM
465
466 cfs_status.clinfo_version++;
467
468 cfs_clinfo_t *old = cfs_status.clinfo;
469
470 cfs_status.clinfo = clinfo;
471
472 cfs_message("update cluster info (cluster name %s, version = %d)",
473 clinfo->cluster_name, clinfo->cman_version);
474
475
476 if (old && old->nodes_byid && clinfo->nodes_byid) {
477 /* copy kvstore */
478 GHashTable *ht = clinfo->nodes_byid;
479 GHashTableIter iter;
480 gpointer key, value;
481
482 g_hash_table_iter_init (&iter, ht);
483
484 while (g_hash_table_iter_next (&iter, &key, &value)) {
485 cfs_clnode_t *node = (cfs_clnode_t *)value;
486 cfs_clnode_t *oldnode;
469d3acb 487 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
fe000966
DM
488 node->online = oldnode->online;
489 node->kvhash = oldnode->kvhash;
490 oldnode->kvhash = NULL;
491 }
492 }
493
494 }
495
496 if (old)
497 cfs_clinfo_destroy(old);
498
499
89fde9ac 500 g_mutex_unlock (&mutex);
fe000966
DM
501}
502
503static void
504dump_kvstore_versions(
505 GString *str,
506 GHashTable *kvhash,
507 const char *nodename)
508{
509 g_return_if_fail(kvhash != NULL);
510 g_return_if_fail(str != NULL);
511 g_return_if_fail(nodename != NULL);
512
513 GHashTable *ht = kvhash;
514 GHashTableIter iter;
515 gpointer key, value;
516
517 g_string_append_printf(str, "\"%s\": {\n", nodename);
518
519 g_hash_table_iter_init (&iter, ht);
520
521 int i = 0;
522 while (g_hash_table_iter_next (&iter, &key, &value)) {
523 kventry_t *entry = (kventry_t *)value;
524 if (i) g_string_append_printf(str, ",\n");
525 i++;
526 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
527 }
528
529 g_string_append_printf(str, "}\n");
530}
531
532int
533cfs_create_version_msg(GString *str)
534{
535 g_return_val_if_fail(str != NULL, -EINVAL);
536
89fde9ac 537 g_mutex_lock (&mutex);
fe000966
DM
538
539 g_string_append_printf(str,"{\n");
540
541 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
542
543 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
544
545 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
546
547 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
548 g_string_append_printf(str, "\"%s\": %u,\n",
549 memdb_change_array[i].path,
550 memdb_change_array[i].version);
551 }
552
553 g_string_append_printf(str, "\"kvstore\": {\n");
554
555 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
556
557 cfs_clinfo_t *clinfo = cfs_status.clinfo;
558
559 if (clinfo && clinfo->nodes_byid) {
560 GHashTable *ht = clinfo->nodes_byid;
561 GHashTableIter iter;
562 gpointer key, value;
563
564 g_hash_table_iter_init (&iter, ht);
565
566 while (g_hash_table_iter_next (&iter, &key, &value)) {
567 cfs_clnode_t *node = (cfs_clnode_t *)value;
568 if (!node->kvhash)
569 continue;
570 g_string_append_printf(str, ",\n");
571 dump_kvstore_versions(str, node->kvhash, node->name);
572 }
573 }
574
575 g_string_append_printf(str,"}\n");
576
577 g_string_append_printf(str,"}\n");
578
89fde9ac 579 g_mutex_unlock (&mutex);
fe000966
DM
580
581 return 0;
582}
583
584GHashTable *
585vmlist_hash_new(void)
586{
587 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
588 (GDestroyNotify)vminfo_free);
589}
590
591gboolean
592vmlist_hash_insert_vm(
593 GHashTable *vmlist,
594 int vmtype,
595 guint32 vmid,
596 const char *nodename,
597 gboolean replace)
598{
599 g_return_val_if_fail(vmlist != NULL, FALSE);
600 g_return_val_if_fail(nodename != NULL, FALSE);
601 g_return_val_if_fail(vmid != 0, FALSE);
7f66b436
DM
602 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
603 vmtype == VMTYPE_LXC, FALSE);
fe000966
DM
604
605 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
606 cfs_critical("detected duplicate VMID %d", vmid);
607 return FALSE;
608 }
609
610 vminfo_t *vminfo = g_new0(vminfo_t, 1);
611
612 vminfo->vmid = vmid;
613 vminfo->vmtype = vmtype;
614 vminfo->nodename = g_strdup(nodename);
615
616 vminfo->version = ++vminfo_version_counter;
617
618 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
619
620 return TRUE;
621}
622
623void
624vmlist_register_vm(
625 int vmtype,
626 guint32 vmid,
627 const char *nodename)
628{
629 g_return_if_fail(cfs_status.vmlist != NULL);
630 g_return_if_fail(nodename != NULL);
631 g_return_if_fail(vmid != 0);
7f66b436
DM
632 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
633 vmtype == VMTYPE_LXC);
fe000966
DM
634
635 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
636
89fde9ac 637 g_mutex_lock (&mutex);
fe000966
DM
638
639 cfs_status.vmlist_version++;
640
641 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
642
89fde9ac 643 g_mutex_unlock (&mutex);
fe000966
DM
644}
645
646gboolean
647vmlist_different_vm_exists(
648 int vmtype,
649 guint32 vmid,
650 const char *nodename)
651{
652 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
653 g_return_val_if_fail(vmid != 0, FALSE);
654
655 gboolean res = FALSE;
656
89fde9ac 657 g_mutex_lock (&mutex);
fe000966
DM
658
659 vminfo_t *vminfo;
660 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
661 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
662 res = TRUE;
663 }
89fde9ac 664 g_mutex_unlock (&mutex);
fe000966
DM
665
666 return res;
667}
668
669gboolean
670vmlist_vm_exists(
671 guint32 vmid)
672{
673 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
674 g_return_val_if_fail(vmid != 0, FALSE);
675
89fde9ac 676 g_mutex_lock (&mutex);
fe000966
DM
677
678 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
679
89fde9ac 680 g_mutex_unlock (&mutex);
fe000966
DM
681
682 return res != NULL;
683}
684
685void
686vmlist_delete_vm(
687 guint32 vmid)
688{
689 g_return_if_fail(cfs_status.vmlist != NULL);
690 g_return_if_fail(vmid != 0);
691
89fde9ac 692 g_mutex_lock (&mutex);
fe000966
DM
693
694 cfs_status.vmlist_version++;
695
696 g_hash_table_remove(cfs_status.vmlist, &vmid);
697
89fde9ac 698 g_mutex_unlock (&mutex);
fe000966
DM
699}
700
701void cfs_status_set_vmlist(
702 GHashTable *vmlist)
703{
704 g_return_if_fail(vmlist != NULL);
705
89fde9ac 706 g_mutex_lock (&mutex);
fe000966
DM
707
708 cfs_status.vmlist_version++;
709
710 if (cfs_status.vmlist)
711 g_hash_table_destroy(cfs_status.vmlist);
712
713 cfs_status.vmlist = vmlist;
714
89fde9ac 715 g_mutex_unlock (&mutex);
fe000966
DM
716}
717
718int
719cfs_create_vmlist_msg(GString *str)
720{
721 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
722 g_return_val_if_fail(str != NULL, -EINVAL);
723
89fde9ac 724 g_mutex_lock (&mutex);
fe000966
DM
725
726 g_string_append_printf(str,"{\n");
727
728 GHashTable *ht = cfs_status.vmlist;
729
730 guint count = g_hash_table_size(ht);
731
732 if (!count) {
733 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
734 } else {
735 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
736
737 g_string_append_printf(str,"\"ids\": {\n");
738
739 GHashTableIter iter;
740 gpointer key, value;
741
742 g_hash_table_iter_init (&iter, ht);
743
744 int first = 1;
745 while (g_hash_table_iter_next (&iter, &key, &value)) {
746 vminfo_t *vminfo = (vminfo_t *)value;
c26ca647 747 const char *type = vminfo_type_to_string(vminfo);
fe000966
DM
748
749 if (!first)
750 g_string_append_printf(str, ",\n");
751 first = 0;
752
753 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
754 vminfo->vmid, vminfo->nodename, type, vminfo->version);
755 }
756
757 g_string_append_printf(str,"}\n");
758 }
759 g_string_append_printf(str,"\n}\n");
760
89fde9ac 761 g_mutex_unlock (&mutex);
fe000966
DM
762
763 return 0;
764}
765
766void
767record_memdb_change(const char *path)
768{
769 g_return_if_fail(cfs_status.memdb_changes != 0);
770
771 memdb_change_t *ce;
772
773 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
774 ce->version++;
775 }
776}
777
778void
779record_memdb_reload(void)
780{
781 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
782 memdb_change_array[i].version++;
783 }
784}
785
786static gboolean
787kventry_hash_set(
788 GHashTable *kvhash,
789 const char *key,
790 gconstpointer data,
791 size_t len)
792{
793 g_return_val_if_fail(kvhash != NULL, FALSE);
794 g_return_val_if_fail(key != NULL, FALSE);
795 g_return_val_if_fail(data != NULL, FALSE);
796
797 kventry_t *entry;
71cc17bc
DC
798 if (!len) {
799 g_hash_table_remove(kvhash, key);
800 } else if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
fe000966
DM
801 g_free(entry->data);
802 entry->data = g_memdup(data, len);
803 entry->len = len;
804 entry->version++;
805 } else {
806 kventry_t *entry = g_new0(kventry_t, 1);
807
808 entry->key = g_strdup(key);
809 entry->data = g_memdup(data, len);
810 entry->len = len;
811
812 g_hash_table_replace(kvhash, entry->key, entry);
813 }
814
815 return TRUE;
816}
817
818static const char *rrd_def_node[] = {
819 "DS:loadavg:GAUGE:120:0:U",
820 "DS:maxcpu:GAUGE:120:0:U",
821 "DS:cpu:GAUGE:120:0:U",
822 "DS:iowait:GAUGE:120:0:U",
823 "DS:memtotal:GAUGE:120:0:U",
824 "DS:memused:GAUGE:120:0:U",
825 "DS:swaptotal:GAUGE:120:0:U",
826 "DS:swapused:GAUGE:120:0:U",
827 "DS:roottotal:GAUGE:120:0:U",
828 "DS:rootused:GAUGE:120:0:U",
764296f1
DM
829 "DS:netin:DERIVE:120:0:U",
830 "DS:netout:DERIVE:120:0:U",
fe000966
DM
831
832 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
833 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
834 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
835 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
836 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
837
838 "RRA:MAX:0.5:1:70", // 1 min max - one hour
839 "RRA:MAX:0.5:30:70", // 30 min max - one day
840 "RRA:MAX:0.5:180:70", // 3 hour max - one week
841 "RRA:MAX:0.5:720:70", // 12 hour max - one month
842 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
843 NULL,
844};
845
846static const char *rrd_def_vm[] = {
847 "DS:maxcpu:GAUGE:120:0:U",
848 "DS:cpu:GAUGE:120:0:U",
849 "DS:maxmem:GAUGE:120:0:U",
850 "DS:mem:GAUGE:120:0:U",
851 "DS:maxdisk:GAUGE:120:0:U",
852 "DS:disk:GAUGE:120:0:U",
764296f1
DM
853 "DS:netin:DERIVE:120:0:U",
854 "DS:netout:DERIVE:120:0:U",
855 "DS:diskread:DERIVE:120:0:U",
856 "DS:diskwrite:DERIVE:120:0:U",
fe000966
DM
857
858 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
859 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
860 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
861 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
862 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
863
864 "RRA:MAX:0.5:1:70", // 1 min max - one hour
865 "RRA:MAX:0.5:30:70", // 30 min max - one day
866 "RRA:MAX:0.5:180:70", // 3 hour max - one week
867 "RRA:MAX:0.5:720:70", // 12 hour max - one month
868 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
869 NULL,
870};
871
872static const char *rrd_def_storage[] = {
873 "DS:total:GAUGE:120:0:U",
874 "DS:used:GAUGE:120:0:U",
875
876 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
877 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
878 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
879 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
880 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
881
882 "RRA:MAX:0.5:1:70", // 1 min max - one hour
883 "RRA:MAX:0.5:30:70", // 30 min max - one day
884 "RRA:MAX:0.5:180:70", // 3 hour max - one week
885 "RRA:MAX:0.5:720:70", // 12 hour max - one month
886 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
887 NULL,
888};
889
890#define RRDDIR "/var/lib/rrdcached/db"
891
892static void
893create_rrd_file(
894 const char *filename,
895 int argcount,
896 const char *rrddef[])
897{
898 /* start at day boundary */
899 time_t ctime;
900 time(&ctime);
901 struct tm *ltm = localtime(&ctime);
902 ltm->tm_sec = 0;
903 ltm->tm_min = 0;
904 ltm->tm_hour = 0;
905
906 rrd_clear_error();
907 if (rrd_create_r(filename, 60, timelocal(ltm), argcount, rrddef)) {
908 cfs_message("RRD create error %s: %s", filename, rrd_get_error());
909 }
910}
911
912static inline const char *
913rrd_skip_data(
914 const char *data,
915 int count)
916{
917 int found = 0;
918 while (*data && found < count) {
919 if (*data++ == ':')
920 found++;
921 }
922 return data;
923}
924
925static void
926update_rrd_data(
927 const char *key,
928 gconstpointer data,
929 size_t len)
930{
931 g_return_if_fail(key != NULL);
932 g_return_if_fail(data != NULL);
933 g_return_if_fail(len > 0);
934 g_return_if_fail(len < 4096);
935
936 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
937
938 int use_daemon = 1;
939 if (rrdc_connect(rrdcsock) != 0)
940 use_daemon = 0;
941
ba9dcfc1 942 char *filename = NULL;
fe000966
DM
943
944 int skip = 0;
945
946 if (strncmp(key, "pve2-node/", 10) == 0) {
947 const char *node = key + 10;
948
f9c865a8 949 skip = 2;
fe000966
DM
950
951 if (strchr(node, '/') != NULL)
952 goto keyerror;
953
954 if (strlen(node) < 1)
955 goto keyerror;
956
ba9dcfc1
DM
957 filename = g_strdup_printf(RRDDIR "/%s", key);
958
fe000966
DM
959 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
960
961 mkdir(RRDDIR "/pve2-node", 0755);
962 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
963 create_rrd_file(filename, argcount, rrd_def_node);
964 }
965
ba9dcfc1
DM
966 } else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
967 (strncmp(key, "pve2.3-vm/", 10) == 0)) {
968 const char *vmid;
fe000966 969
ba9dcfc1
DM
970 if (strncmp(key, "pve2-vm/", 8) == 0) {
971 vmid = key + 8;
972 skip = 2;
973 } else {
974 vmid = key + 10;
94e4cba3 975 skip = 4;
ba9dcfc1 976 }
fe000966
DM
977
978 if (strchr(vmid, '/') != NULL)
979 goto keyerror;
980
981 if (strlen(vmid) < 1)
982 goto keyerror;
983
ba9dcfc1
DM
984 filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);
985
fe000966
DM
986 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
987
988 mkdir(RRDDIR "/pve2-vm", 0755);
989 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
990 create_rrd_file(filename, argcount, rrd_def_vm);
991 }
992
993 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
994 const char *node = key + 13;
995
996 const char *storage = node;
997 while (*storage && *storage != '/')
998 storage++;
999
1000 if (*storage != '/' || ((storage - node) < 1))
1001 goto keyerror;
1002
1003 storage++;
1004
1005 if (strchr(storage, '/') != NULL)
1006 goto keyerror;
1007
1008 if (strlen(storage) < 1)
1009 goto keyerror;
1010
ba9dcfc1
DM
1011 filename = g_strdup_printf(RRDDIR "/%s", key);
1012
fe000966
DM
1013 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1014
1015 mkdir(RRDDIR "/pve2-storage", 0755);
1016
1017 char *dir = g_path_get_dirname(filename);
1018 mkdir(dir, 0755);
1019 g_free(dir);
1020
1021 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
1022 create_rrd_file(filename, argcount, rrd_def_storage);
1023 }
1024
1025 } else {
1026 goto keyerror;
1027 }
1028
1029 const char *dp = skip ? rrd_skip_data(data, skip) : data;
1030
1031 const char *update_args[] = { dp, NULL };
1032
1033 if (use_daemon) {
1034 int status;
1035 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1036 cfs_message("RRDC update error %s: %d", filename, status);
1037 rrdc_disconnect();
1038 rrd_clear_error();
1039 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1040 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1041 }
1042 }
1043
1044 } else {
1045 rrd_clear_error();
1046 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1047 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1048 }
1049 }
1050
1051ret:
ba9dcfc1
DM
1052 if (filename)
1053 g_free(filename);
1054
fe000966
DM
1055 return;
1056
1057keyerror:
1058 cfs_critical("RRD update error: unknown/wrong key %s", key);
1059 goto ret;
1060}
1061
1062static gboolean
1063rrd_entry_is_old(
1064 gpointer key,
1065 gpointer value,
1066 gpointer user_data)
1067{
1068 rrdentry_t *entry = (rrdentry_t *)value;
1069 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1070
1071 int diff = ctime - entry->time;
1072
1073 /* remove everything older than 5 minutes */
1074 int expire = 60*5;
1075
1076 return (diff > expire) ? TRUE : FALSE;
1077}
1078
1079static char *rrd_dump_buf = NULL;
1080static time_t rrd_dump_last = 0;
1081
1082void
1083cfs_rrd_dump(GString *str)
1084{
1085 time_t ctime;
fe000966 1086
663896cf
TL
1087 g_mutex_lock (&mutex);
1088
1089 time(&ctime);
fe000966
DM
1090 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1091 g_string_assign(str, rrd_dump_buf);
663896cf 1092 g_mutex_unlock (&mutex);
fe000966
DM
1093 return;
1094 }
1095
1096 /* remove old data */
1097 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1098 GUINT_TO_POINTER(ctime));
1099
1100 g_string_set_size(str, 0);
1101
1102 GHashTableIter iter;
1103 gpointer key, value;
1104
1105 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1106
1107 while (g_hash_table_iter_next (&iter, &key, &value)) {
1108 rrdentry_t *entry = (rrdentry_t *)value;
1109 g_string_append(str, key);
1110 g_string_append(str, ":");
1111 g_string_append(str, entry->data);
1112 g_string_append(str, "\n");
1113 }
1114
1115 g_string_append_c(str, 0); // never return undef
1116
1117 rrd_dump_last = ctime;
1118 if (rrd_dump_buf)
1119 g_free(rrd_dump_buf);
1120 rrd_dump_buf = g_strdup(str->str);
663896cf
TL
1121
1122 g_mutex_unlock (&mutex);
fe000966
DM
1123}
1124
1125static gboolean
1126nodeip_hash_set(
1127 GHashTable *iphash,
1128 const char *nodename,
1129 const char *ip,
1130 size_t len)
1131{
1132 g_return_val_if_fail(iphash != NULL, FALSE);
1133 g_return_val_if_fail(nodename != NULL, FALSE);
1134 g_return_val_if_fail(ip != NULL, FALSE);
1135 g_return_val_if_fail(len > 0, FALSE);
1136 g_return_val_if_fail(len < 256, FALSE);
1137 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1138
1139 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1140
1141 if (!oldip || (strcmp(oldip, ip) != 0)) {
1142 cfs_status.clinfo_version++;
1143 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1144 }
1145
1146 return TRUE;
1147}
1148
1149static gboolean
1150rrdentry_hash_set(
1151 GHashTable *rrdhash,
1152 const char *key,
1153 const char *data,
1154 size_t len)
1155{
1156 g_return_val_if_fail(rrdhash != NULL, FALSE);
1157 g_return_val_if_fail(key != NULL, FALSE);
1158 g_return_val_if_fail(data != NULL, FALSE);
1159 g_return_val_if_fail(len > 0, FALSE);
1160 g_return_val_if_fail(len < 4096, FALSE);
1161 g_return_val_if_fail(data[len-1] == 0, FALSE);
1162
1163 rrdentry_t *entry;
1164 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1165 g_free(entry->data);
1166 entry->data = g_memdup(data, len);
1167 entry->len = len;
1168 entry->time = time(NULL);
1169 } else {
1170 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1171
1172 entry->key = g_strdup(key);
1173 entry->data = g_memdup(data, len);
1174 entry->len = len;
1175 entry->time = time(NULL);
1176
1177 g_hash_table_replace(rrdhash, entry->key, entry);
1178 }
1179
1180 update_rrd_data(key, data, len);
1181
1182 return TRUE;
1183}
1184
1185static int
1186kvstore_send_update_message(
1187 dfsm_t *dfsm,
1188 const char *key,
1189 gpointer data,
1190 guint32 len)
1191{
af2e9dd4
DM
1192 if (!dfsm_is_initialized(dfsm))
1193 return -EACCES;
fe000966
DM
1194
1195 struct iovec iov[2];
1196
1197 char name[256];
1198 g_strlcpy(name, key, sizeof(name));
1199
1200 iov[0].iov_base = &name;
1201 iov[0].iov_len = sizeof(name);
1202
1203 iov[1].iov_base = (char *)data;
1204 iov[1].iov_len = len;
1205
1206 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1207 return 0;
1208
1209 return -EACCES;
1210}
1211
1212static clog_entry_t *
1213kvstore_parse_log_message(
1214 const void *msg,
1215 size_t msg_len)
1216{
1217 g_return_val_if_fail(msg != NULL, NULL);
1218
1219 if (msg_len < sizeof(clog_entry_t)) {
e5a5a3ea 1220 cfs_critical("received short log message (%zu < %zu)", msg_len, sizeof(clog_entry_t));
fe000966
DM
1221 return NULL;
1222 }
1223
1224 clog_entry_t *entry = (clog_entry_t *)msg;
1225
1226 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1227 entry->ident_len + entry->tag_len + entry->msg_len;
1228
1229 if (msg_len != size) {
e5a5a3ea 1230 cfs_critical("received log message with wrong size (%zu != %u)", msg_len, size);
fe000966
DM
1231 return NULL;
1232 }
1233
1234 msg = entry->data;
1235
1236 if (*((char *)msg + entry->node_len - 1)) {
1237 cfs_critical("unterminated string in log message");
1238 return NULL;
1239 }
1240 msg += entry->node_len;
1241
1242 if (*((char *)msg + entry->ident_len - 1)) {
1243 cfs_critical("unterminated string in log message");
1244 return NULL;
1245 }
1246 msg += entry->ident_len;
1247
1248 if (*((char *)msg + entry->tag_len - 1)) {
1249 cfs_critical("unterminated string in log message");
1250 return NULL;
1251 }
1252 msg += entry->tag_len;
1253
1254 if (*((char *)msg + entry->msg_len - 1)) {
1255 cfs_critical("unterminated string in log message");
1256 return NULL;
1257 }
1258
1259 return entry;
1260}
1261
1262static gboolean
1263kvstore_parse_update_message(
1264 const void *msg,
1265 size_t msg_len,
1266 const char **key,
1267 gconstpointer *data,
1268 guint32 *len)
1269{
1270 g_return_val_if_fail(msg != NULL, FALSE);
1271 g_return_val_if_fail(key != NULL, FALSE);
1272 g_return_val_if_fail(data != NULL, FALSE);
1273 g_return_val_if_fail(len != NULL, FALSE);
1274
1275 if (msg_len < 256) {
e5a5a3ea 1276 cfs_critical("received short kvstore message (%zu < 256)", msg_len);
fe000966
DM
1277 return FALSE;
1278 }
1279
1280 /* test if key is null terminated */
1281 int i = 0;
1282 for (i = 0; i < 256; i++)
1283 if (((char *)msg)[i] == 0)
1284 break;
1285
1286 if (i == 256)
1287 return FALSE;
1288
1289
1290 *len = msg_len - 256;
1291 *key = msg;
1292 *data = msg + 256;
1293
1294 return TRUE;
1295}
1296
1297int
1298cfs_create_status_msg(
1299 GString *str,
1300 const char *nodename,
1301 const char *key)
1302{
1303 g_return_val_if_fail(str != NULL, -EINVAL);
1304 g_return_val_if_fail(key != NULL, -EINVAL);
1305
1306 int res = -ENOENT;
1307
1308 GHashTable *kvhash = NULL;
1309
89fde9ac 1310 g_mutex_lock (&mutex);
fe000966
DM
1311
1312 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1313 kvhash = cfs_status.kvhash;
043bbd8f 1314 } else if (cfs_status.clinfo && cfs_status.clinfo->nodes_byname) {
fe000966
DM
1315 cfs_clnode_t *clnode;
1316 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1317 kvhash = clnode->kvhash;
1318 }
1319
1320 kventry_t *entry;
1321 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1322 g_string_append_len(str, entry->data, entry->len);
1323 res = 0;
1324 }
1325
89fde9ac 1326 g_mutex_unlock (&mutex);
fe000966
DM
1327
1328 return res;
1329}
1330
1331int
1332cfs_status_set(
1333 const char *key,
1334 gpointer data,
1335 size_t len)
1336{
1337 g_return_val_if_fail(key != NULL, FALSE);
1338 g_return_val_if_fail(data != NULL, FALSE);
1339 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1340
1341 if (len > CFS_MAX_STATUS_SIZE)
1342 return -EFBIG;
1343
89fde9ac 1344 g_mutex_lock (&mutex);
fe000966
DM
1345
1346 gboolean res;
1347
1348 if (strncmp(key, "rrd/", 4) == 0) {
1349 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1350 } else if (!strcmp(key, "nodeip")) {
1351 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1352 } else {
1353 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1354 }
89fde9ac 1355 g_mutex_unlock (&mutex);
fe000966
DM
1356
1357 if (cfs_status.kvstore)
1358 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1359
1360 return res ? 0 : -ENOMEM;
1361}
1362
1363gboolean
1364cfs_kvstore_node_set(
1365 uint32_t nodeid,
1366 const char *key,
1367 gconstpointer data,
1368 size_t len)
1369{
1370 g_return_val_if_fail(nodeid != 0, FALSE);
1371 g_return_val_if_fail(key != NULL, FALSE);
1372 g_return_val_if_fail(data != NULL, FALSE);
1373
89fde9ac 1374 g_mutex_lock (&mutex);
fe000966
DM
1375
1376 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1377 goto ret; /* ignore */
1378
1379 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1380 if (!clnode)
1381 goto ret; /* ignore */
1382
1383 cfs_debug("got node %d status update %s", nodeid, key);
1384
1385 if (strncmp(key, "rrd/", 4) == 0) {
1386 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1387 } else if (!strcmp(key, "nodeip")) {
1388 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1389 } else {
1390 if (!clnode->kvhash) {
1391 if (!(clnode->kvhash = kventry_hash_new())) {
1392 goto ret; /*ignore */
1393 }
1394 }
1395
1396 kventry_hash_set(clnode->kvhash, key, data, len);
1397
1398 }
1399ret:
89fde9ac 1400 g_mutex_unlock (&mutex);
fe000966
DM
1401
1402 return TRUE;
1403}
1404
1405static gboolean
1406cfs_kvstore_sync(void)
1407{
1408 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1409 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1410
1411 gboolean res = TRUE;
1412
89fde9ac 1413 g_mutex_lock (&mutex);
fe000966
DM
1414
1415 GHashTable *ht = cfs_status.kvhash;
1416 GHashTableIter iter;
1417 gpointer key, value;
1418
1419 g_hash_table_iter_init (&iter, ht);
1420
1421 while (g_hash_table_iter_next (&iter, &key, &value)) {
1422 kventry_t *entry = (kventry_t *)value;
1423 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1424 }
1425
89fde9ac 1426 g_mutex_unlock (&mutex);
fe000966
DM
1427
1428 return res;
1429}
1430
1431static int
1432dfsm_deliver(
1433 dfsm_t *dfsm,
1434 gpointer data,
1435 int *res_ptr,
1436 uint32_t nodeid,
1437 uint32_t pid,
1438 uint16_t msg_type,
1439 uint32_t msg_time,
1440 const void *msg,
1441 size_t msg_len)
1442{
1443 g_return_val_if_fail(dfsm != NULL, -1);
1444 g_return_val_if_fail(msg != NULL, -1);
1445 g_return_val_if_fail(res_ptr != NULL, -1);
1446
1447 /* ignore message for ourself */
1448 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1449 goto ret;
1450
1451 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1452 const char *key;
1453 gconstpointer data;
1454 guint32 len;
1455 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1456 cfs_kvstore_node_set(nodeid, key, data, len);
1457 } else {
1458 cfs_critical("cant parse update message");
1459 }
1460 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1461 cfs_message("received log"); // fixme: remove
1462 const clog_entry_t *entry;
1463 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1464 clusterlog_insert(cfs_status.clusterlog, entry);
1465 } else {
1466 cfs_critical("cant parse log message");
1467 }
1468 } else {
1469 cfs_critical("received unknown message type %d\n", msg_type);
1470 goto fail;
1471 }
1472
1473ret:
1474 *res_ptr = 0;
1475 return 1;
1476
1477fail:
1478 *res_ptr = -EACCES;
1479 return 1;
1480}
1481
1482static void
1483dfsm_confchg(
1484 dfsm_t *dfsm,
1485 gpointer data,
1486 const struct cpg_address *member_list,
1487 size_t member_list_entries)
1488{
1489 g_return_if_fail(dfsm != NULL);
1490 g_return_if_fail(member_list != NULL);
1491
1492 cfs_debug("enter %s", __func__);
1493
89fde9ac 1494 g_mutex_lock (&mutex);
fe000966
DM
1495
1496 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1497
1498 if (clinfo && clinfo->nodes_byid) {
1499
1500 GHashTable *ht = clinfo->nodes_byid;
1501 GHashTableIter iter;
1502 gpointer key, value;
1503
1504 g_hash_table_iter_init (&iter, ht);
1505
1506 while (g_hash_table_iter_next (&iter, &key, &value)) {
1507 cfs_clnode_t *node = (cfs_clnode_t *)value;
1508 node->online = FALSE;
1509 }
1510
1511 for (int i = 0; i < member_list_entries; i++) {
1512 cfs_clnode_t *node;
1513 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1514 node->online = TRUE;
1515 }
1516 }
1517
1518 cfs_status.clinfo_version++;
1519 }
1520
89fde9ac 1521 g_mutex_unlock (&mutex);
fe000966
DM
1522}
1523
1524static gpointer
1525dfsm_get_state(
1526 dfsm_t *dfsm,
1527 gpointer data,
1528 unsigned int *res_len)
1529{
1530 g_return_val_if_fail(dfsm != NULL, NULL);
1531
1532 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1533
1534 return msg;
1535}
1536
1537static int
1538dfsm_process_update(
1539 dfsm_t *dfsm,
1540 gpointer data,
1541 dfsm_sync_info_t *syncinfo,
1542 uint32_t nodeid,
1543 uint32_t pid,
1544 const void *msg,
1545 size_t msg_len)
1546{
1547 cfs_critical("%s: received unexpected update message", __func__);
1548
1549 return -1;
1550}
1551
1552static int
1553dfsm_process_state_update(
1554 dfsm_t *dfsm,
1555 gpointer data,
1556 dfsm_sync_info_t *syncinfo)
1557{
1558 g_return_val_if_fail(dfsm != NULL, -1);
1559 g_return_val_if_fail(syncinfo != NULL, -1);
1560
1561 clog_base_t *clog[syncinfo->node_count];
1562
1563 int local_index = -1;
1564 for (int i = 0; i < syncinfo->node_count; i++) {
1565 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1566 ni->synced = 1;
1567
1568 if (syncinfo->local == ni)
1569 local_index = i;
1570
1571 clog_base_t *base = (clog_base_t *)ni->state;
1572 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1573 clog[i] = ni->state;
1574 } else {
1575 cfs_critical("received log with wrong size %u", ni->state_len);
1576 clog[i] = NULL;
1577 }
1578 }
1579
1580 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1581 cfs_critical("unable to merge log files");
1582 }
1583
1584 cfs_kvstore_sync();
1585
1586 return 1;
1587}
1588
1589static int
1590dfsm_commit(
1591 dfsm_t *dfsm,
1592 gpointer data,
1593 dfsm_sync_info_t *syncinfo)
1594{
1595 g_return_val_if_fail(dfsm != NULL, -1);
1596 g_return_val_if_fail(syncinfo != NULL, -1);
1597
1598 return 1;
1599}
1600
1601static void
1602dfsm_synced(dfsm_t *dfsm)
1603{
1604 g_return_if_fail(dfsm != NULL);
1605
1606 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1607 if (!ip)
1608 ip = cfs.ip;
1609
1610 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1611}
1612
1613static int
1614dfsm_cleanup(
1615 dfsm_t *dfsm,
1616 gpointer data,
1617 dfsm_sync_info_t *syncinfo)
1618{
1619 return 1;
1620}
1621
1622static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1623 .dfsm_deliver_fn = dfsm_deliver,
1624 .dfsm_confchg_fn = dfsm_confchg,
1625
1626 .dfsm_get_state_fn = dfsm_get_state,
1627 .dfsm_process_state_update_fn = dfsm_process_state_update,
1628 .dfsm_process_update_fn = dfsm_process_update,
1629 .dfsm_commit_fn = dfsm_commit,
1630 .dfsm_cleanup_fn = dfsm_cleanup,
1631 .dfsm_synced_fn = dfsm_synced,
1632};
1633
1634dfsm_t *
1635cfs_status_dfsm_new(void)
1636{
89fde9ac 1637 g_mutex_lock (&mutex);
fe000966
DM
1638
1639 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1640 0, &kvstore_dfsm_callbacks);
89fde9ac 1641 g_mutex_unlock (&mutex);
fe000966
DM
1642
1643 return cfs_status.kvstore;
1644}
1645
1646gboolean
1647cfs_is_quorate(void)
1648{
89fde9ac 1649 g_mutex_lock (&mutex);
fe000966 1650 gboolean res = cfs_status.quorate;
89fde9ac 1651 g_mutex_unlock (&mutex);
fe000966
DM
1652
1653 return res;
1654}
1655
1656void
1657cfs_set_quorate(
1658 uint32_t quorate,
1659 gboolean quiet)
1660{
89fde9ac 1661 g_mutex_lock (&mutex);
fe000966
DM
1662
1663 uint32_t prev_quorate = cfs_status.quorate;
1664 cfs_status.quorate = quorate;
1665
1666 if (!prev_quorate && cfs_status.quorate) {
1667 if (!quiet)
1668 cfs_message("node has quorum");
1669 }
1670
1671 if (prev_quorate && !cfs_status.quorate) {
1672 if (!quiet)
1673 cfs_message("node lost quorum");
1674 }
1675
89fde9ac 1676 g_mutex_unlock (&mutex);
fe000966 1677}