]> git.proxmox.com Git - pve-cluster.git/blame - data/src/status.c
pmxcfs: vminfo_type_to_string: fixup indendation and parenthesis
[pve-cluster.git] / data / src / status.c
CommitLineData
fe000966
DM
1/*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19*/
20
21#define G_LOG_DOMAIN "status"
22
23#ifdef HAVE_CONFIG_H
24#include <config.h>
25#endif /* HAVE_CONFIG_H */
26
27#include <stdio.h>
28#include <stdint.h>
29#include <string.h>
30#include <errno.h>
31#include <glib.h>
32#include <sys/syslog.h>
33#include <rrd.h>
34#include <rrd_client.h>
35#include <time.h>
36
37#include "cfs-utils.h"
38#include "status.h"
39#include "logger.h"
40
41#define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
42
43typedef enum {
44 KVSTORE_MESSAGE_UPDATE = 1,
45 KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
46 KVSTORE_MESSAGE_LOG = 3,
47} kvstore_message_t;
48
49static uint32_t vminfo_version_counter;
50
51typedef struct {
52 uint32_t vmid;
53 char *nodename;
54 int vmtype;
55 uint32_t version;
56} vminfo_t;
57
58typedef struct {
59 char *key;
60 gpointer data;
61 size_t len;
62 uint32_t version;
63} kventry_t;
64
65typedef struct {
66 char *key;
67 gpointer data;
68 size_t len;
69 uint32_t time;
70} rrdentry_t;
71
72typedef struct {
73 char *path;
74 uint32_t version;
75} memdb_change_t;
76
77static memdb_change_t memdb_change_array[] = {
2113d031
DM
78 { .path = "corosync.conf" },
79 { .path = "corosync.conf.new" },
fe000966
DM
80 { .path = "storage.cfg" },
81 { .path = "user.cfg" },
82 { .path = "domains.cfg" },
83 { .path = "priv/shadow.cfg" },
8a9456dc 84 { .path = "priv/tfa.cfg" },
fe000966 85 { .path = "datacenter.cfg" },
e1735a61 86 { .path = "vzdump.cron" },
5a5417e6
DM
87 { .path = "ha/crm_commands" },
88 { .path = "ha/manager_status" },
89 { .path = "ha/resources.cfg" },
90 { .path = "ha/groups.cfg" },
e9af3eb7 91 { .path = "ha/fence.cfg" },
9d4f69ff 92 { .path = "status.cfg" },
f6de131a 93 { .path = "replication.cfg" },
22e2ed76 94 { .path = "ceph.conf" },
fe000966
DM
95};
96
89fde9ac 97static GMutex mutex;
fe000966
DM
98
99typedef struct {
100 time_t start_time;
101
102 uint32_t quorate;
103
104 cfs_clinfo_t *clinfo;
105 uint32_t clinfo_version;
106
107 GHashTable *vmlist;
108 uint32_t vmlist_version;
109
110 dfsm_t *kvstore;
111 GHashTable *kvhash;
112 GHashTable *rrdhash;
113 GHashTable *iphash;
114
115 GHashTable *memdb_changes;
116
117 clusterlog_t *clusterlog;
118} cfs_status_t;
119
120static cfs_status_t cfs_status;
121
122struct cfs_clnode {
123 char *name;
124 uint32_t nodeid;
125 uint32_t votes;
126 gboolean online;
127 GHashTable *kvhash;
128};
129
130struct cfs_clinfo {
131 char *cluster_name;
132 uint32_t cman_version;
133
134 GHashTable *nodes_byid;
135 GHashTable *nodes_byname;
136};
137
138static guint
139g_int32_hash (gconstpointer v)
140{
141 return *(const uint32_t *) v;
142}
143
144static gboolean
145g_int32_equal (gconstpointer v1,
146 gconstpointer v2)
147{
148 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
149}
150
151static void vminfo_free(vminfo_t *vminfo)
152{
153 g_return_if_fail(vminfo != NULL);
154
155 if (vminfo->nodename)
156 g_free(vminfo->nodename);
157
158
159 g_free(vminfo);
160}
161
99abbd31
TL
162static const char *vminfo_type_to_string(vminfo_t *vminfo)
163{
164 if (vminfo->vmtype == VMTYPE_QEMU) {
165 return "qemu";
166 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
167 return "openvz";
168 } else if (vminfo->vmtype == VMTYPE_LXC) {
169 return "lxc";
170 } else {
171 return "unknown";
172 }
c26ca647
TL
173}
174
fe000966
DM
175void cfs_clnode_destroy(
176 cfs_clnode_t *clnode)
177{
178 g_return_if_fail(clnode != NULL);
179
180 if (clnode->kvhash)
181 g_hash_table_destroy(clnode->kvhash);
182
183 if (clnode->name)
184 g_free(clnode->name);
185
186 g_free(clnode);
187}
188
189cfs_clnode_t *cfs_clnode_new(
190 const char *name,
191 uint32_t nodeid,
192 uint32_t votes)
193{
194 g_return_val_if_fail(name != NULL, NULL);
195
196 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
197 if (!clnode)
198 return NULL;
199
200 clnode->name = g_strdup(name);
201 clnode->nodeid = nodeid;
202 clnode->votes = votes;
203
204 return clnode;
205}
206
207gboolean cfs_clinfo_destroy(
208 cfs_clinfo_t *clinfo)
209{
210 g_return_val_if_fail(clinfo != NULL, FALSE);
211
212 if (clinfo->cluster_name)
213 g_free(clinfo->cluster_name);
214
215 if (clinfo->nodes_byname)
216 g_hash_table_destroy(clinfo->nodes_byname);
217
218 if (clinfo->nodes_byid)
219 g_hash_table_destroy(clinfo->nodes_byid);
220
221 g_free(clinfo);
222
223 return TRUE;
224}
225
226cfs_clinfo_t *cfs_clinfo_new(
227 const char *cluster_name,
228 uint32_t cman_version)
229{
230 g_return_val_if_fail(cluster_name != NULL, NULL);
231
232 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
233 if (!clinfo)
234 return NULL;
235
236 clinfo->cluster_name = g_strdup(cluster_name);
237 clinfo->cman_version = cman_version;
238
239 if (!(clinfo->nodes_byid = g_hash_table_new_full(
240 g_int32_hash, g_int32_equal, NULL,
241 (GDestroyNotify)cfs_clnode_destroy)))
242 goto fail;
243
244 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
245 goto fail;
246
247 return clinfo;
248
249fail:
250 cfs_clinfo_destroy(clinfo);
251
252 return NULL;
253}
254
255gboolean cfs_clinfo_add_node(
256 cfs_clinfo_t *clinfo,
257 cfs_clnode_t *clnode)
258{
259 g_return_val_if_fail(clinfo != NULL, FALSE);
260 g_return_val_if_fail(clnode != NULL, FALSE);
261
262 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
263 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
264
265 return TRUE;
266}
267
268int
269cfs_create_memberlist_msg(
270 GString *str)
271{
272 g_return_val_if_fail(str != NULL, -EINVAL);
273
89fde9ac 274 g_mutex_lock (&mutex);
fe000966
DM
275
276 g_string_append_printf(str,"{\n");
277
278 guint nodecount = 0;
279
280 cfs_clinfo_t *clinfo = cfs_status.clinfo;
281
282 if (clinfo && clinfo->nodes_byid)
283 nodecount = g_hash_table_size(clinfo->nodes_byid);
284
285 if (nodecount) {
286 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
287 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
288
289 g_string_append_printf(str, "\"cluster\": { ");
290 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
291 "\"nodes\": %d, \"quorate\": %d ",
292 clinfo->cluster_name, clinfo->cman_version,
293 nodecount, cfs_status.quorate);
294
295 g_string_append_printf(str,"},\n");
296 g_string_append_printf(str,"\"nodelist\": {\n");
297
298 GHashTable *ht = clinfo->nodes_byid;
299 GHashTableIter iter;
300 gpointer key, value;
301
302 g_hash_table_iter_init (&iter, ht);
303
304 int i = 0;
305 while (g_hash_table_iter_next (&iter, &key, &value)) {
306 cfs_clnode_t *node = (cfs_clnode_t *)value;
307 if (i) g_string_append_printf(str, ",\n");
308 i++;
309
310 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
311 node->name, node->nodeid, node->online);
312
313
314 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
315 if (ip) {
316 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
317 }
318
319 g_string_append_printf(str, "}");
320
321 }
322 g_string_append_printf(str,"\n }\n");
323 } else {
324 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
325 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
326 }
327
328 g_string_append_printf(str,"}\n");
329
89fde9ac 330 g_mutex_unlock (&mutex);
fe000966
DM
331
332 return 0;
333}
334
335static void
336kventry_free(kventry_t *entry)
337{
338 g_return_if_fail(entry != NULL);
339
340 g_free(entry->key);
341 g_free(entry->data);
342 g_free(entry);
343}
344
345static GHashTable *
346kventry_hash_new(void)
347{
348 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
349 (GDestroyNotify)kventry_free);
350}
351
352static void
353rrdentry_free(rrdentry_t *entry)
354{
355 g_return_if_fail(entry != NULL);
356
357 g_free(entry->key);
358 g_free(entry->data);
359 g_free(entry);
360}
361
362static GHashTable *
363rrdentry_hash_new(void)
364{
365 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
366 (GDestroyNotify)rrdentry_free);
367}
368
369void
370cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
371{
372 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
373}
374
375void
376cfs_cluster_log(clog_entry_t *entry)
377{
378 g_return_if_fail(entry != NULL);
379
380 clusterlog_insert(cfs_status.clusterlog, entry);
381
382 if (cfs_status.kvstore) {
383 struct iovec iov[1];
384 iov[0].iov_base = (char *)entry;
385 iov[0].iov_len = clog_entry_size(entry);
386
af2e9dd4
DM
387 if (dfsm_is_initialized(cfs_status.kvstore))
388 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
fe000966
DM
389 }
390}
391
392void cfs_status_init(void)
393{
89fde9ac 394 g_mutex_lock (&mutex);
fe000966
DM
395
396 cfs_status.start_time = time(NULL);
397
398 cfs_status.vmlist = vmlist_hash_new();
399
400 cfs_status.kvhash = kventry_hash_new();
401
402 cfs_status.rrdhash = rrdentry_hash_new();
403
404 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
405
406 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
407
408 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
409 g_hash_table_replace(cfs_status.memdb_changes,
410 memdb_change_array[i].path,
411 &memdb_change_array[i]);
412 }
413
414 cfs_status.clusterlog = clusterlog_new();
415
416 // fixme:
417 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
418 LOG_INFO, "starting cluster log");
419
89fde9ac 420 g_mutex_unlock (&mutex);
fe000966
DM
421}
422
423void cfs_status_cleanup(void)
424{
89fde9ac 425 g_mutex_lock (&mutex);
fe000966
DM
426
427 cfs_status.clinfo_version++;
428
429 if (cfs_status.clinfo) {
430 cfs_clinfo_destroy(cfs_status.clinfo);
431 cfs_status.clinfo = NULL;
432 }
433
434 if (cfs_status.vmlist) {
435 g_hash_table_destroy(cfs_status.vmlist);
436 cfs_status.vmlist = NULL;
437 }
438
439 if (cfs_status.kvhash) {
440 g_hash_table_destroy(cfs_status.kvhash);
441 cfs_status.kvhash = NULL;
442 }
443
444 if (cfs_status.rrdhash) {
445 g_hash_table_destroy(cfs_status.rrdhash);
446 cfs_status.rrdhash = NULL;
447 }
448
449 if (cfs_status.iphash) {
450 g_hash_table_destroy(cfs_status.iphash);
451 cfs_status.iphash = NULL;
452 }
453
454 if (cfs_status.clusterlog)
455 clusterlog_destroy(cfs_status.clusterlog);
456
89fde9ac 457 g_mutex_unlock (&mutex);
fe000966
DM
458}
459
460void cfs_status_set_clinfo(
461 cfs_clinfo_t *clinfo)
462{
463 g_return_if_fail(clinfo != NULL);
464
89fde9ac 465 g_mutex_lock (&mutex);
fe000966
DM
466
467 cfs_status.clinfo_version++;
468
469 cfs_clinfo_t *old = cfs_status.clinfo;
470
471 cfs_status.clinfo = clinfo;
472
473 cfs_message("update cluster info (cluster name %s, version = %d)",
474 clinfo->cluster_name, clinfo->cman_version);
475
476
477 if (old && old->nodes_byid && clinfo->nodes_byid) {
478 /* copy kvstore */
479 GHashTable *ht = clinfo->nodes_byid;
480 GHashTableIter iter;
481 gpointer key, value;
482
483 g_hash_table_iter_init (&iter, ht);
484
485 while (g_hash_table_iter_next (&iter, &key, &value)) {
486 cfs_clnode_t *node = (cfs_clnode_t *)value;
487 cfs_clnode_t *oldnode;
469d3acb 488 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
fe000966
DM
489 node->online = oldnode->online;
490 node->kvhash = oldnode->kvhash;
491 oldnode->kvhash = NULL;
492 }
493 }
494
495 }
496
497 if (old)
498 cfs_clinfo_destroy(old);
499
500
89fde9ac 501 g_mutex_unlock (&mutex);
fe000966
DM
502}
503
504static void
505dump_kvstore_versions(
506 GString *str,
507 GHashTable *kvhash,
508 const char *nodename)
509{
510 g_return_if_fail(kvhash != NULL);
511 g_return_if_fail(str != NULL);
512 g_return_if_fail(nodename != NULL);
513
514 GHashTable *ht = kvhash;
515 GHashTableIter iter;
516 gpointer key, value;
517
518 g_string_append_printf(str, "\"%s\": {\n", nodename);
519
520 g_hash_table_iter_init (&iter, ht);
521
522 int i = 0;
523 while (g_hash_table_iter_next (&iter, &key, &value)) {
524 kventry_t *entry = (kventry_t *)value;
525 if (i) g_string_append_printf(str, ",\n");
526 i++;
527 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
528 }
529
530 g_string_append_printf(str, "}\n");
531}
532
533int
534cfs_create_version_msg(GString *str)
535{
536 g_return_val_if_fail(str != NULL, -EINVAL);
537
89fde9ac 538 g_mutex_lock (&mutex);
fe000966
DM
539
540 g_string_append_printf(str,"{\n");
541
542 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
543
544 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
545
546 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
547
548 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
549 g_string_append_printf(str, "\"%s\": %u,\n",
550 memdb_change_array[i].path,
551 memdb_change_array[i].version);
552 }
553
554 g_string_append_printf(str, "\"kvstore\": {\n");
555
556 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
557
558 cfs_clinfo_t *clinfo = cfs_status.clinfo;
559
560 if (clinfo && clinfo->nodes_byid) {
561 GHashTable *ht = clinfo->nodes_byid;
562 GHashTableIter iter;
563 gpointer key, value;
564
565 g_hash_table_iter_init (&iter, ht);
566
567 while (g_hash_table_iter_next (&iter, &key, &value)) {
568 cfs_clnode_t *node = (cfs_clnode_t *)value;
569 if (!node->kvhash)
570 continue;
571 g_string_append_printf(str, ",\n");
572 dump_kvstore_versions(str, node->kvhash, node->name);
573 }
574 }
575
576 g_string_append_printf(str,"}\n");
577
578 g_string_append_printf(str,"}\n");
579
89fde9ac 580 g_mutex_unlock (&mutex);
fe000966
DM
581
582 return 0;
583}
584
585GHashTable *
586vmlist_hash_new(void)
587{
588 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
589 (GDestroyNotify)vminfo_free);
590}
591
592gboolean
593vmlist_hash_insert_vm(
594 GHashTable *vmlist,
595 int vmtype,
596 guint32 vmid,
597 const char *nodename,
598 gboolean replace)
599{
600 g_return_val_if_fail(vmlist != NULL, FALSE);
601 g_return_val_if_fail(nodename != NULL, FALSE);
602 g_return_val_if_fail(vmid != 0, FALSE);
7f66b436
DM
603 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
604 vmtype == VMTYPE_LXC, FALSE);
fe000966
DM
605
606 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
607 cfs_critical("detected duplicate VMID %d", vmid);
608 return FALSE;
609 }
610
611 vminfo_t *vminfo = g_new0(vminfo_t, 1);
612
613 vminfo->vmid = vmid;
614 vminfo->vmtype = vmtype;
615 vminfo->nodename = g_strdup(nodename);
616
617 vminfo->version = ++vminfo_version_counter;
618
619 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
620
621 return TRUE;
622}
623
624void
625vmlist_register_vm(
626 int vmtype,
627 guint32 vmid,
628 const char *nodename)
629{
630 g_return_if_fail(cfs_status.vmlist != NULL);
631 g_return_if_fail(nodename != NULL);
632 g_return_if_fail(vmid != 0);
7f66b436
DM
633 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
634 vmtype == VMTYPE_LXC);
fe000966
DM
635
636 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
637
89fde9ac 638 g_mutex_lock (&mutex);
fe000966
DM
639
640 cfs_status.vmlist_version++;
641
642 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
643
89fde9ac 644 g_mutex_unlock (&mutex);
fe000966
DM
645}
646
647gboolean
648vmlist_different_vm_exists(
649 int vmtype,
650 guint32 vmid,
651 const char *nodename)
652{
653 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
654 g_return_val_if_fail(vmid != 0, FALSE);
655
656 gboolean res = FALSE;
657
89fde9ac 658 g_mutex_lock (&mutex);
fe000966
DM
659
660 vminfo_t *vminfo;
661 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
662 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
663 res = TRUE;
664 }
89fde9ac 665 g_mutex_unlock (&mutex);
fe000966
DM
666
667 return res;
668}
669
670gboolean
671vmlist_vm_exists(
672 guint32 vmid)
673{
674 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
675 g_return_val_if_fail(vmid != 0, FALSE);
676
89fde9ac 677 g_mutex_lock (&mutex);
fe000966
DM
678
679 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
680
89fde9ac 681 g_mutex_unlock (&mutex);
fe000966
DM
682
683 return res != NULL;
684}
685
686void
687vmlist_delete_vm(
688 guint32 vmid)
689{
690 g_return_if_fail(cfs_status.vmlist != NULL);
691 g_return_if_fail(vmid != 0);
692
89fde9ac 693 g_mutex_lock (&mutex);
fe000966
DM
694
695 cfs_status.vmlist_version++;
696
697 g_hash_table_remove(cfs_status.vmlist, &vmid);
698
89fde9ac 699 g_mutex_unlock (&mutex);
fe000966
DM
700}
701
702void cfs_status_set_vmlist(
703 GHashTable *vmlist)
704{
705 g_return_if_fail(vmlist != NULL);
706
89fde9ac 707 g_mutex_lock (&mutex);
fe000966
DM
708
709 cfs_status.vmlist_version++;
710
711 if (cfs_status.vmlist)
712 g_hash_table_destroy(cfs_status.vmlist);
713
714 cfs_status.vmlist = vmlist;
715
89fde9ac 716 g_mutex_unlock (&mutex);
fe000966
DM
717}
718
719int
720cfs_create_vmlist_msg(GString *str)
721{
722 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
723 g_return_val_if_fail(str != NULL, -EINVAL);
724
89fde9ac 725 g_mutex_lock (&mutex);
fe000966
DM
726
727 g_string_append_printf(str,"{\n");
728
729 GHashTable *ht = cfs_status.vmlist;
730
731 guint count = g_hash_table_size(ht);
732
733 if (!count) {
734 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
735 } else {
736 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
737
738 g_string_append_printf(str,"\"ids\": {\n");
739
740 GHashTableIter iter;
741 gpointer key, value;
742
743 g_hash_table_iter_init (&iter, ht);
744
745 int first = 1;
746 while (g_hash_table_iter_next (&iter, &key, &value)) {
747 vminfo_t *vminfo = (vminfo_t *)value;
c26ca647 748 const char *type = vminfo_type_to_string(vminfo);
fe000966
DM
749
750 if (!first)
751 g_string_append_printf(str, ",\n");
752 first = 0;
753
754 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
755 vminfo->vmid, vminfo->nodename, type, vminfo->version);
756 }
757
758 g_string_append_printf(str,"}\n");
759 }
760 g_string_append_printf(str,"\n}\n");
761
89fde9ac 762 g_mutex_unlock (&mutex);
fe000966
DM
763
764 return 0;
765}
766
767void
768record_memdb_change(const char *path)
769{
770 g_return_if_fail(cfs_status.memdb_changes != 0);
771
772 memdb_change_t *ce;
773
774 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
775 ce->version++;
776 }
777}
778
779void
780record_memdb_reload(void)
781{
782 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
783 memdb_change_array[i].version++;
784 }
785}
786
787static gboolean
788kventry_hash_set(
789 GHashTable *kvhash,
790 const char *key,
791 gconstpointer data,
792 size_t len)
793{
794 g_return_val_if_fail(kvhash != NULL, FALSE);
795 g_return_val_if_fail(key != NULL, FALSE);
796 g_return_val_if_fail(data != NULL, FALSE);
797
798 kventry_t *entry;
71cc17bc
DC
799 if (!len) {
800 g_hash_table_remove(kvhash, key);
801 } else if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
fe000966
DM
802 g_free(entry->data);
803 entry->data = g_memdup(data, len);
804 entry->len = len;
805 entry->version++;
806 } else {
807 kventry_t *entry = g_new0(kventry_t, 1);
808
809 entry->key = g_strdup(key);
810 entry->data = g_memdup(data, len);
811 entry->len = len;
812
813 g_hash_table_replace(kvhash, entry->key, entry);
814 }
815
816 return TRUE;
817}
818
819static const char *rrd_def_node[] = {
820 "DS:loadavg:GAUGE:120:0:U",
821 "DS:maxcpu:GAUGE:120:0:U",
822 "DS:cpu:GAUGE:120:0:U",
823 "DS:iowait:GAUGE:120:0:U",
824 "DS:memtotal:GAUGE:120:0:U",
825 "DS:memused:GAUGE:120:0:U",
826 "DS:swaptotal:GAUGE:120:0:U",
827 "DS:swapused:GAUGE:120:0:U",
828 "DS:roottotal:GAUGE:120:0:U",
829 "DS:rootused:GAUGE:120:0:U",
764296f1
DM
830 "DS:netin:DERIVE:120:0:U",
831 "DS:netout:DERIVE:120:0:U",
fe000966
DM
832
833 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
834 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
835 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
836 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
837 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
838
839 "RRA:MAX:0.5:1:70", // 1 min max - one hour
840 "RRA:MAX:0.5:30:70", // 30 min max - one day
841 "RRA:MAX:0.5:180:70", // 3 hour max - one week
842 "RRA:MAX:0.5:720:70", // 12 hour max - one month
843 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
844 NULL,
845};
846
847static const char *rrd_def_vm[] = {
848 "DS:maxcpu:GAUGE:120:0:U",
849 "DS:cpu:GAUGE:120:0:U",
850 "DS:maxmem:GAUGE:120:0:U",
851 "DS:mem:GAUGE:120:0:U",
852 "DS:maxdisk:GAUGE:120:0:U",
853 "DS:disk:GAUGE:120:0:U",
764296f1
DM
854 "DS:netin:DERIVE:120:0:U",
855 "DS:netout:DERIVE:120:0:U",
856 "DS:diskread:DERIVE:120:0:U",
857 "DS:diskwrite:DERIVE:120:0:U",
fe000966
DM
858
859 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
860 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
861 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
862 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
863 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
864
865 "RRA:MAX:0.5:1:70", // 1 min max - one hour
866 "RRA:MAX:0.5:30:70", // 30 min max - one day
867 "RRA:MAX:0.5:180:70", // 3 hour max - one week
868 "RRA:MAX:0.5:720:70", // 12 hour max - one month
869 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
870 NULL,
871};
872
873static const char *rrd_def_storage[] = {
874 "DS:total:GAUGE:120:0:U",
875 "DS:used:GAUGE:120:0:U",
876
877 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
878 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
879 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
880 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
881 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
882
883 "RRA:MAX:0.5:1:70", // 1 min max - one hour
884 "RRA:MAX:0.5:30:70", // 30 min max - one day
885 "RRA:MAX:0.5:180:70", // 3 hour max - one week
886 "RRA:MAX:0.5:720:70", // 12 hour max - one month
887 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
888 NULL,
889};
890
891#define RRDDIR "/var/lib/rrdcached/db"
892
893static void
894create_rrd_file(
895 const char *filename,
896 int argcount,
897 const char *rrddef[])
898{
899 /* start at day boundary */
900 time_t ctime;
901 time(&ctime);
902 struct tm *ltm = localtime(&ctime);
903 ltm->tm_sec = 0;
904 ltm->tm_min = 0;
905 ltm->tm_hour = 0;
906
907 rrd_clear_error();
908 if (rrd_create_r(filename, 60, timelocal(ltm), argcount, rrddef)) {
909 cfs_message("RRD create error %s: %s", filename, rrd_get_error());
910 }
911}
912
913static inline const char *
914rrd_skip_data(
915 const char *data,
916 int count)
917{
918 int found = 0;
919 while (*data && found < count) {
920 if (*data++ == ':')
921 found++;
922 }
923 return data;
924}
925
926static void
927update_rrd_data(
928 const char *key,
929 gconstpointer data,
930 size_t len)
931{
932 g_return_if_fail(key != NULL);
933 g_return_if_fail(data != NULL);
934 g_return_if_fail(len > 0);
935 g_return_if_fail(len < 4096);
936
937 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
938
939 int use_daemon = 1;
940 if (rrdc_connect(rrdcsock) != 0)
941 use_daemon = 0;
942
ba9dcfc1 943 char *filename = NULL;
fe000966
DM
944
945 int skip = 0;
946
947 if (strncmp(key, "pve2-node/", 10) == 0) {
948 const char *node = key + 10;
949
f9c865a8 950 skip = 2;
fe000966
DM
951
952 if (strchr(node, '/') != NULL)
953 goto keyerror;
954
955 if (strlen(node) < 1)
956 goto keyerror;
957
ba9dcfc1
DM
958 filename = g_strdup_printf(RRDDIR "/%s", key);
959
fe000966
DM
960 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
961
962 mkdir(RRDDIR "/pve2-node", 0755);
963 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
964 create_rrd_file(filename, argcount, rrd_def_node);
965 }
966
ba9dcfc1
DM
967 } else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
968 (strncmp(key, "pve2.3-vm/", 10) == 0)) {
969 const char *vmid;
fe000966 970
ba9dcfc1
DM
971 if (strncmp(key, "pve2-vm/", 8) == 0) {
972 vmid = key + 8;
973 skip = 2;
974 } else {
975 vmid = key + 10;
94e4cba3 976 skip = 4;
ba9dcfc1 977 }
fe000966
DM
978
979 if (strchr(vmid, '/') != NULL)
980 goto keyerror;
981
982 if (strlen(vmid) < 1)
983 goto keyerror;
984
ba9dcfc1
DM
985 filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);
986
fe000966
DM
987 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
988
989 mkdir(RRDDIR "/pve2-vm", 0755);
990 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
991 create_rrd_file(filename, argcount, rrd_def_vm);
992 }
993
994 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
995 const char *node = key + 13;
996
997 const char *storage = node;
998 while (*storage && *storage != '/')
999 storage++;
1000
1001 if (*storage != '/' || ((storage - node) < 1))
1002 goto keyerror;
1003
1004 storage++;
1005
1006 if (strchr(storage, '/') != NULL)
1007 goto keyerror;
1008
1009 if (strlen(storage) < 1)
1010 goto keyerror;
1011
ba9dcfc1
DM
1012 filename = g_strdup_printf(RRDDIR "/%s", key);
1013
fe000966
DM
1014 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1015
1016 mkdir(RRDDIR "/pve2-storage", 0755);
1017
1018 char *dir = g_path_get_dirname(filename);
1019 mkdir(dir, 0755);
1020 g_free(dir);
1021
1022 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
1023 create_rrd_file(filename, argcount, rrd_def_storage);
1024 }
1025
1026 } else {
1027 goto keyerror;
1028 }
1029
1030 const char *dp = skip ? rrd_skip_data(data, skip) : data;
1031
1032 const char *update_args[] = { dp, NULL };
1033
1034 if (use_daemon) {
1035 int status;
1036 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1037 cfs_message("RRDC update error %s: %d", filename, status);
1038 rrdc_disconnect();
1039 rrd_clear_error();
1040 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1041 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1042 }
1043 }
1044
1045 } else {
1046 rrd_clear_error();
1047 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1048 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1049 }
1050 }
1051
1052ret:
ba9dcfc1
DM
1053 if (filename)
1054 g_free(filename);
1055
fe000966
DM
1056 return;
1057
1058keyerror:
1059 cfs_critical("RRD update error: unknown/wrong key %s", key);
1060 goto ret;
1061}
1062
1063static gboolean
1064rrd_entry_is_old(
1065 gpointer key,
1066 gpointer value,
1067 gpointer user_data)
1068{
1069 rrdentry_t *entry = (rrdentry_t *)value;
1070 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1071
1072 int diff = ctime - entry->time;
1073
1074 /* remove everything older than 5 minutes */
1075 int expire = 60*5;
1076
1077 return (diff > expire) ? TRUE : FALSE;
1078}
1079
1080static char *rrd_dump_buf = NULL;
1081static time_t rrd_dump_last = 0;
1082
1083void
1084cfs_rrd_dump(GString *str)
1085{
1086 time_t ctime;
fe000966 1087
663896cf
TL
1088 g_mutex_lock (&mutex);
1089
1090 time(&ctime);
fe000966
DM
1091 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1092 g_string_assign(str, rrd_dump_buf);
663896cf 1093 g_mutex_unlock (&mutex);
fe000966
DM
1094 return;
1095 }
1096
1097 /* remove old data */
1098 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1099 GUINT_TO_POINTER(ctime));
1100
1101 g_string_set_size(str, 0);
1102
1103 GHashTableIter iter;
1104 gpointer key, value;
1105
1106 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1107
1108 while (g_hash_table_iter_next (&iter, &key, &value)) {
1109 rrdentry_t *entry = (rrdentry_t *)value;
1110 g_string_append(str, key);
1111 g_string_append(str, ":");
1112 g_string_append(str, entry->data);
1113 g_string_append(str, "\n");
1114 }
1115
1116 g_string_append_c(str, 0); // never return undef
1117
1118 rrd_dump_last = ctime;
1119 if (rrd_dump_buf)
1120 g_free(rrd_dump_buf);
1121 rrd_dump_buf = g_strdup(str->str);
663896cf
TL
1122
1123 g_mutex_unlock (&mutex);
fe000966
DM
1124}
1125
1126static gboolean
1127nodeip_hash_set(
1128 GHashTable *iphash,
1129 const char *nodename,
1130 const char *ip,
1131 size_t len)
1132{
1133 g_return_val_if_fail(iphash != NULL, FALSE);
1134 g_return_val_if_fail(nodename != NULL, FALSE);
1135 g_return_val_if_fail(ip != NULL, FALSE);
1136 g_return_val_if_fail(len > 0, FALSE);
1137 g_return_val_if_fail(len < 256, FALSE);
1138 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1139
1140 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1141
1142 if (!oldip || (strcmp(oldip, ip) != 0)) {
1143 cfs_status.clinfo_version++;
1144 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1145 }
1146
1147 return TRUE;
1148}
1149
1150static gboolean
1151rrdentry_hash_set(
1152 GHashTable *rrdhash,
1153 const char *key,
1154 const char *data,
1155 size_t len)
1156{
1157 g_return_val_if_fail(rrdhash != NULL, FALSE);
1158 g_return_val_if_fail(key != NULL, FALSE);
1159 g_return_val_if_fail(data != NULL, FALSE);
1160 g_return_val_if_fail(len > 0, FALSE);
1161 g_return_val_if_fail(len < 4096, FALSE);
1162 g_return_val_if_fail(data[len-1] == 0, FALSE);
1163
1164 rrdentry_t *entry;
1165 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1166 g_free(entry->data);
1167 entry->data = g_memdup(data, len);
1168 entry->len = len;
1169 entry->time = time(NULL);
1170 } else {
1171 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1172
1173 entry->key = g_strdup(key);
1174 entry->data = g_memdup(data, len);
1175 entry->len = len;
1176 entry->time = time(NULL);
1177
1178 g_hash_table_replace(rrdhash, entry->key, entry);
1179 }
1180
1181 update_rrd_data(key, data, len);
1182
1183 return TRUE;
1184}
1185
1186static int
1187kvstore_send_update_message(
1188 dfsm_t *dfsm,
1189 const char *key,
1190 gpointer data,
1191 guint32 len)
1192{
af2e9dd4
DM
1193 if (!dfsm_is_initialized(dfsm))
1194 return -EACCES;
fe000966
DM
1195
1196 struct iovec iov[2];
1197
1198 char name[256];
1199 g_strlcpy(name, key, sizeof(name));
1200
1201 iov[0].iov_base = &name;
1202 iov[0].iov_len = sizeof(name);
1203
1204 iov[1].iov_base = (char *)data;
1205 iov[1].iov_len = len;
1206
1207 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1208 return 0;
1209
1210 return -EACCES;
1211}
1212
1213static clog_entry_t *
1214kvstore_parse_log_message(
1215 const void *msg,
1216 size_t msg_len)
1217{
1218 g_return_val_if_fail(msg != NULL, NULL);
1219
1220 if (msg_len < sizeof(clog_entry_t)) {
e5a5a3ea 1221 cfs_critical("received short log message (%zu < %zu)", msg_len, sizeof(clog_entry_t));
fe000966
DM
1222 return NULL;
1223 }
1224
1225 clog_entry_t *entry = (clog_entry_t *)msg;
1226
1227 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1228 entry->ident_len + entry->tag_len + entry->msg_len;
1229
1230 if (msg_len != size) {
e5a5a3ea 1231 cfs_critical("received log message with wrong size (%zu != %u)", msg_len, size);
fe000966
DM
1232 return NULL;
1233 }
1234
1235 msg = entry->data;
1236
1237 if (*((char *)msg + entry->node_len - 1)) {
1238 cfs_critical("unterminated string in log message");
1239 return NULL;
1240 }
1241 msg += entry->node_len;
1242
1243 if (*((char *)msg + entry->ident_len - 1)) {
1244 cfs_critical("unterminated string in log message");
1245 return NULL;
1246 }
1247 msg += entry->ident_len;
1248
1249 if (*((char *)msg + entry->tag_len - 1)) {
1250 cfs_critical("unterminated string in log message");
1251 return NULL;
1252 }
1253 msg += entry->tag_len;
1254
1255 if (*((char *)msg + entry->msg_len - 1)) {
1256 cfs_critical("unterminated string in log message");
1257 return NULL;
1258 }
1259
1260 return entry;
1261}
1262
1263static gboolean
1264kvstore_parse_update_message(
1265 const void *msg,
1266 size_t msg_len,
1267 const char **key,
1268 gconstpointer *data,
1269 guint32 *len)
1270{
1271 g_return_val_if_fail(msg != NULL, FALSE);
1272 g_return_val_if_fail(key != NULL, FALSE);
1273 g_return_val_if_fail(data != NULL, FALSE);
1274 g_return_val_if_fail(len != NULL, FALSE);
1275
1276 if (msg_len < 256) {
e5a5a3ea 1277 cfs_critical("received short kvstore message (%zu < 256)", msg_len);
fe000966
DM
1278 return FALSE;
1279 }
1280
1281 /* test if key is null terminated */
1282 int i = 0;
1283 for (i = 0; i < 256; i++)
1284 if (((char *)msg)[i] == 0)
1285 break;
1286
1287 if (i == 256)
1288 return FALSE;
1289
1290
1291 *len = msg_len - 256;
1292 *key = msg;
1293 *data = msg + 256;
1294
1295 return TRUE;
1296}
1297
1298int
1299cfs_create_status_msg(
1300 GString *str,
1301 const char *nodename,
1302 const char *key)
1303{
1304 g_return_val_if_fail(str != NULL, -EINVAL);
1305 g_return_val_if_fail(key != NULL, -EINVAL);
1306
1307 int res = -ENOENT;
1308
1309 GHashTable *kvhash = NULL;
1310
89fde9ac 1311 g_mutex_lock (&mutex);
fe000966
DM
1312
1313 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1314 kvhash = cfs_status.kvhash;
043bbd8f 1315 } else if (cfs_status.clinfo && cfs_status.clinfo->nodes_byname) {
fe000966
DM
1316 cfs_clnode_t *clnode;
1317 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1318 kvhash = clnode->kvhash;
1319 }
1320
1321 kventry_t *entry;
1322 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1323 g_string_append_len(str, entry->data, entry->len);
1324 res = 0;
1325 }
1326
89fde9ac 1327 g_mutex_unlock (&mutex);
fe000966
DM
1328
1329 return res;
1330}
1331
1332int
1333cfs_status_set(
1334 const char *key,
1335 gpointer data,
1336 size_t len)
1337{
1338 g_return_val_if_fail(key != NULL, FALSE);
1339 g_return_val_if_fail(data != NULL, FALSE);
1340 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1341
1342 if (len > CFS_MAX_STATUS_SIZE)
1343 return -EFBIG;
1344
89fde9ac 1345 g_mutex_lock (&mutex);
fe000966
DM
1346
1347 gboolean res;
1348
1349 if (strncmp(key, "rrd/", 4) == 0) {
1350 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1351 } else if (!strcmp(key, "nodeip")) {
1352 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1353 } else {
1354 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1355 }
89fde9ac 1356 g_mutex_unlock (&mutex);
fe000966
DM
1357
1358 if (cfs_status.kvstore)
1359 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1360
1361 return res ? 0 : -ENOMEM;
1362}
1363
1364gboolean
1365cfs_kvstore_node_set(
1366 uint32_t nodeid,
1367 const char *key,
1368 gconstpointer data,
1369 size_t len)
1370{
1371 g_return_val_if_fail(nodeid != 0, FALSE);
1372 g_return_val_if_fail(key != NULL, FALSE);
1373 g_return_val_if_fail(data != NULL, FALSE);
1374
89fde9ac 1375 g_mutex_lock (&mutex);
fe000966
DM
1376
1377 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1378 goto ret; /* ignore */
1379
1380 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1381 if (!clnode)
1382 goto ret; /* ignore */
1383
1384 cfs_debug("got node %d status update %s", nodeid, key);
1385
1386 if (strncmp(key, "rrd/", 4) == 0) {
1387 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1388 } else if (!strcmp(key, "nodeip")) {
1389 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1390 } else {
1391 if (!clnode->kvhash) {
1392 if (!(clnode->kvhash = kventry_hash_new())) {
1393 goto ret; /*ignore */
1394 }
1395 }
1396
1397 kventry_hash_set(clnode->kvhash, key, data, len);
1398
1399 }
1400ret:
89fde9ac 1401 g_mutex_unlock (&mutex);
fe000966
DM
1402
1403 return TRUE;
1404}
1405
1406static gboolean
1407cfs_kvstore_sync(void)
1408{
1409 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1410 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1411
1412 gboolean res = TRUE;
1413
89fde9ac 1414 g_mutex_lock (&mutex);
fe000966
DM
1415
1416 GHashTable *ht = cfs_status.kvhash;
1417 GHashTableIter iter;
1418 gpointer key, value;
1419
1420 g_hash_table_iter_init (&iter, ht);
1421
1422 while (g_hash_table_iter_next (&iter, &key, &value)) {
1423 kventry_t *entry = (kventry_t *)value;
1424 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1425 }
1426
89fde9ac 1427 g_mutex_unlock (&mutex);
fe000966
DM
1428
1429 return res;
1430}
1431
1432static int
1433dfsm_deliver(
1434 dfsm_t *dfsm,
1435 gpointer data,
1436 int *res_ptr,
1437 uint32_t nodeid,
1438 uint32_t pid,
1439 uint16_t msg_type,
1440 uint32_t msg_time,
1441 const void *msg,
1442 size_t msg_len)
1443{
1444 g_return_val_if_fail(dfsm != NULL, -1);
1445 g_return_val_if_fail(msg != NULL, -1);
1446 g_return_val_if_fail(res_ptr != NULL, -1);
1447
1448 /* ignore message for ourself */
1449 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1450 goto ret;
1451
1452 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1453 const char *key;
1454 gconstpointer data;
1455 guint32 len;
1456 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1457 cfs_kvstore_node_set(nodeid, key, data, len);
1458 } else {
1459 cfs_critical("cant parse update message");
1460 }
1461 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1462 cfs_message("received log"); // fixme: remove
1463 const clog_entry_t *entry;
1464 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1465 clusterlog_insert(cfs_status.clusterlog, entry);
1466 } else {
1467 cfs_critical("cant parse log message");
1468 }
1469 } else {
1470 cfs_critical("received unknown message type %d\n", msg_type);
1471 goto fail;
1472 }
1473
1474ret:
1475 *res_ptr = 0;
1476 return 1;
1477
1478fail:
1479 *res_ptr = -EACCES;
1480 return 1;
1481}
1482
1483static void
1484dfsm_confchg(
1485 dfsm_t *dfsm,
1486 gpointer data,
1487 const struct cpg_address *member_list,
1488 size_t member_list_entries)
1489{
1490 g_return_if_fail(dfsm != NULL);
1491 g_return_if_fail(member_list != NULL);
1492
1493 cfs_debug("enter %s", __func__);
1494
89fde9ac 1495 g_mutex_lock (&mutex);
fe000966
DM
1496
1497 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1498
1499 if (clinfo && clinfo->nodes_byid) {
1500
1501 GHashTable *ht = clinfo->nodes_byid;
1502 GHashTableIter iter;
1503 gpointer key, value;
1504
1505 g_hash_table_iter_init (&iter, ht);
1506
1507 while (g_hash_table_iter_next (&iter, &key, &value)) {
1508 cfs_clnode_t *node = (cfs_clnode_t *)value;
1509 node->online = FALSE;
1510 }
1511
1512 for (int i = 0; i < member_list_entries; i++) {
1513 cfs_clnode_t *node;
1514 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1515 node->online = TRUE;
1516 }
1517 }
1518
1519 cfs_status.clinfo_version++;
1520 }
1521
89fde9ac 1522 g_mutex_unlock (&mutex);
fe000966
DM
1523}
1524
1525static gpointer
1526dfsm_get_state(
1527 dfsm_t *dfsm,
1528 gpointer data,
1529 unsigned int *res_len)
1530{
1531 g_return_val_if_fail(dfsm != NULL, NULL);
1532
1533 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1534
1535 return msg;
1536}
1537
1538static int
1539dfsm_process_update(
1540 dfsm_t *dfsm,
1541 gpointer data,
1542 dfsm_sync_info_t *syncinfo,
1543 uint32_t nodeid,
1544 uint32_t pid,
1545 const void *msg,
1546 size_t msg_len)
1547{
1548 cfs_critical("%s: received unexpected update message", __func__);
1549
1550 return -1;
1551}
1552
1553static int
1554dfsm_process_state_update(
1555 dfsm_t *dfsm,
1556 gpointer data,
1557 dfsm_sync_info_t *syncinfo)
1558{
1559 g_return_val_if_fail(dfsm != NULL, -1);
1560 g_return_val_if_fail(syncinfo != NULL, -1);
1561
1562 clog_base_t *clog[syncinfo->node_count];
1563
1564 int local_index = -1;
1565 for (int i = 0; i < syncinfo->node_count; i++) {
1566 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1567 ni->synced = 1;
1568
1569 if (syncinfo->local == ni)
1570 local_index = i;
1571
1572 clog_base_t *base = (clog_base_t *)ni->state;
1573 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1574 clog[i] = ni->state;
1575 } else {
1576 cfs_critical("received log with wrong size %u", ni->state_len);
1577 clog[i] = NULL;
1578 }
1579 }
1580
1581 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1582 cfs_critical("unable to merge log files");
1583 }
1584
1585 cfs_kvstore_sync();
1586
1587 return 1;
1588}
1589
1590static int
1591dfsm_commit(
1592 dfsm_t *dfsm,
1593 gpointer data,
1594 dfsm_sync_info_t *syncinfo)
1595{
1596 g_return_val_if_fail(dfsm != NULL, -1);
1597 g_return_val_if_fail(syncinfo != NULL, -1);
1598
1599 return 1;
1600}
1601
1602static void
1603dfsm_synced(dfsm_t *dfsm)
1604{
1605 g_return_if_fail(dfsm != NULL);
1606
1607 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1608 if (!ip)
1609 ip = cfs.ip;
1610
1611 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1612}
1613
1614static int
1615dfsm_cleanup(
1616 dfsm_t *dfsm,
1617 gpointer data,
1618 dfsm_sync_info_t *syncinfo)
1619{
1620 return 1;
1621}
1622
1623static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1624 .dfsm_deliver_fn = dfsm_deliver,
1625 .dfsm_confchg_fn = dfsm_confchg,
1626
1627 .dfsm_get_state_fn = dfsm_get_state,
1628 .dfsm_process_state_update_fn = dfsm_process_state_update,
1629 .dfsm_process_update_fn = dfsm_process_update,
1630 .dfsm_commit_fn = dfsm_commit,
1631 .dfsm_cleanup_fn = dfsm_cleanup,
1632 .dfsm_synced_fn = dfsm_synced,
1633};
1634
1635dfsm_t *
1636cfs_status_dfsm_new(void)
1637{
89fde9ac 1638 g_mutex_lock (&mutex);
fe000966
DM
1639
1640 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1641 0, &kvstore_dfsm_callbacks);
89fde9ac 1642 g_mutex_unlock (&mutex);
fe000966
DM
1643
1644 return cfs_status.kvstore;
1645}
1646
1647gboolean
1648cfs_is_quorate(void)
1649{
89fde9ac 1650 g_mutex_lock (&mutex);
fe000966 1651 gboolean res = cfs_status.quorate;
89fde9ac 1652 g_mutex_unlock (&mutex);
fe000966
DM
1653
1654 return res;
1655}
1656
1657void
1658cfs_set_quorate(
1659 uint32_t quorate,
1660 gboolean quiet)
1661{
89fde9ac 1662 g_mutex_lock (&mutex);
fe000966
DM
1663
1664 uint32_t prev_quorate = cfs_status.quorate;
1665 cfs_status.quorate = quorate;
1666
1667 if (!prev_quorate && cfs_status.quorate) {
1668 if (!quiet)
1669 cfs_message("node has quorum");
1670 }
1671
1672 if (prev_quorate && !cfs_status.quorate) {
1673 if (!quiet)
1674 cfs_message("node lost quorum");
1675 }
1676
89fde9ac 1677 g_mutex_unlock (&mutex);
fe000966 1678}