]> git.proxmox.com Git - pve-cluster.git/blob - data/src/status.c
9b5d4c6a929c77ab030dd13d3ef0e6c2f164e49b
[pve-cluster.git] / data / src / status.c
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "status"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdio.h>
28 #include <stdint.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <glib.h>
32 #include <sys/syslog.h>
33 #include <rrd.h>
34 #include <rrd_client.h>
35 #include <time.h>
36
37 #include "cfs-utils.h"
38 #include "status.h"
39 #include "logger.h"
40
41 #define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
42
43 typedef enum {
44 KVSTORE_MESSAGE_UPDATE = 1,
45 KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
46 KVSTORE_MESSAGE_LOG = 3,
47 } kvstore_message_t;
48
49 static uint32_t vminfo_version_counter;
50
51 typedef struct {
52 uint32_t vmid;
53 char *nodename;
54 int vmtype;
55 uint32_t version;
56 } vminfo_t;
57
58 typedef struct {
59 char *key;
60 gpointer data;
61 size_t len;
62 uint32_t version;
63 } kventry_t;
64
65 typedef struct {
66 char *key;
67 gpointer data;
68 size_t len;
69 uint32_t time;
70 } rrdentry_t;
71
72 typedef struct {
73 char *path;
74 uint32_t version;
75 } memdb_change_t;
76
77 static memdb_change_t memdb_change_array[] = {
78 { .path = "corosync.conf" },
79 { .path = "corosync.conf.new" },
80 { .path = "storage.cfg" },
81 { .path = "user.cfg" },
82 { .path = "domains.cfg" },
83 { .path = "priv/shadow.cfg" },
84 { .path = "datacenter.cfg" },
85 { .path = "vzdump.cron" },
86 { .path = "ha/crm_commands" },
87 { .path = "ha/manager_status" },
88 { .path = "ha/resources.cfg" },
89 { .path = "ha/groups.cfg" },
90 { .path = "ha/fence.cfg" },
91 { .path = "status.cfg" },
92 { .path = "replication.cfg" },
93 { .path = "ceph.conf" },
94 };
95
96 static GMutex mutex;
97
98 typedef struct {
99 time_t start_time;
100
101 uint32_t quorate;
102
103 cfs_clinfo_t *clinfo;
104 uint32_t clinfo_version;
105
106 GHashTable *vmlist;
107 uint32_t vmlist_version;
108
109 dfsm_t *kvstore;
110 GHashTable *kvhash;
111 GHashTable *rrdhash;
112 GHashTable *iphash;
113
114 GHashTable *memdb_changes;
115
116 clusterlog_t *clusterlog;
117 } cfs_status_t;
118
119 static cfs_status_t cfs_status;
120
121 struct cfs_clnode {
122 char *name;
123 uint32_t nodeid;
124 uint32_t votes;
125 gboolean online;
126 GHashTable *kvhash;
127 };
128
129 struct cfs_clinfo {
130 char *cluster_name;
131 uint32_t cman_version;
132
133 GHashTable *nodes_byid;
134 GHashTable *nodes_byname;
135 };
136
137 static guint
138 g_int32_hash (gconstpointer v)
139 {
140 return *(const uint32_t *) v;
141 }
142
143 static gboolean
144 g_int32_equal (gconstpointer v1,
145 gconstpointer v2)
146 {
147 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
148 }
149
150 static void vminfo_free(vminfo_t *vminfo)
151 {
152 g_return_if_fail(vminfo != NULL);
153
154 if (vminfo->nodename)
155 g_free(vminfo->nodename);
156
157
158 g_free(vminfo);
159 }
160
161 void cfs_clnode_destroy(
162 cfs_clnode_t *clnode)
163 {
164 g_return_if_fail(clnode != NULL);
165
166 if (clnode->kvhash)
167 g_hash_table_destroy(clnode->kvhash);
168
169 if (clnode->name)
170 g_free(clnode->name);
171
172 g_free(clnode);
173 }
174
175 cfs_clnode_t *cfs_clnode_new(
176 const char *name,
177 uint32_t nodeid,
178 uint32_t votes)
179 {
180 g_return_val_if_fail(name != NULL, NULL);
181
182 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
183 if (!clnode)
184 return NULL;
185
186 clnode->name = g_strdup(name);
187 clnode->nodeid = nodeid;
188 clnode->votes = votes;
189
190 return clnode;
191 }
192
193 gboolean cfs_clinfo_destroy(
194 cfs_clinfo_t *clinfo)
195 {
196 g_return_val_if_fail(clinfo != NULL, FALSE);
197
198 if (clinfo->cluster_name)
199 g_free(clinfo->cluster_name);
200
201 if (clinfo->nodes_byname)
202 g_hash_table_destroy(clinfo->nodes_byname);
203
204 if (clinfo->nodes_byid)
205 g_hash_table_destroy(clinfo->nodes_byid);
206
207 g_free(clinfo);
208
209 return TRUE;
210 }
211
212 cfs_clinfo_t *cfs_clinfo_new(
213 const char *cluster_name,
214 uint32_t cman_version)
215 {
216 g_return_val_if_fail(cluster_name != NULL, NULL);
217
218 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
219 if (!clinfo)
220 return NULL;
221
222 clinfo->cluster_name = g_strdup(cluster_name);
223 clinfo->cman_version = cman_version;
224
225 if (!(clinfo->nodes_byid = g_hash_table_new_full(
226 g_int32_hash, g_int32_equal, NULL,
227 (GDestroyNotify)cfs_clnode_destroy)))
228 goto fail;
229
230 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
231 goto fail;
232
233 return clinfo;
234
235 fail:
236 cfs_clinfo_destroy(clinfo);
237
238 return NULL;
239 }
240
241 gboolean cfs_clinfo_add_node(
242 cfs_clinfo_t *clinfo,
243 cfs_clnode_t *clnode)
244 {
245 g_return_val_if_fail(clinfo != NULL, FALSE);
246 g_return_val_if_fail(clnode != NULL, FALSE);
247
248 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
249 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
250
251 return TRUE;
252 }
253
254 int
255 cfs_create_memberlist_msg(
256 GString *str)
257 {
258 g_return_val_if_fail(str != NULL, -EINVAL);
259
260 g_mutex_lock (&mutex);
261
262 g_string_append_printf(str,"{\n");
263
264 guint nodecount = 0;
265
266 cfs_clinfo_t *clinfo = cfs_status.clinfo;
267
268 if (clinfo && clinfo->nodes_byid)
269 nodecount = g_hash_table_size(clinfo->nodes_byid);
270
271 if (nodecount) {
272 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
273 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
274
275 g_string_append_printf(str, "\"cluster\": { ");
276 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
277 "\"nodes\": %d, \"quorate\": %d ",
278 clinfo->cluster_name, clinfo->cman_version,
279 nodecount, cfs_status.quorate);
280
281 g_string_append_printf(str,"},\n");
282 g_string_append_printf(str,"\"nodelist\": {\n");
283
284 GHashTable *ht = clinfo->nodes_byid;
285 GHashTableIter iter;
286 gpointer key, value;
287
288 g_hash_table_iter_init (&iter, ht);
289
290 int i = 0;
291 while (g_hash_table_iter_next (&iter, &key, &value)) {
292 cfs_clnode_t *node = (cfs_clnode_t *)value;
293 if (i) g_string_append_printf(str, ",\n");
294 i++;
295
296 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
297 node->name, node->nodeid, node->online);
298
299
300 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
301 if (ip) {
302 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
303 }
304
305 g_string_append_printf(str, "}");
306
307 }
308 g_string_append_printf(str,"\n }\n");
309 } else {
310 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
311 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
312 }
313
314 g_string_append_printf(str,"}\n");
315
316 g_mutex_unlock (&mutex);
317
318 return 0;
319 }
320
321 static void
322 kventry_free(kventry_t *entry)
323 {
324 g_return_if_fail(entry != NULL);
325
326 g_free(entry->key);
327 g_free(entry->data);
328 g_free(entry);
329 }
330
331 static GHashTable *
332 kventry_hash_new(void)
333 {
334 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
335 (GDestroyNotify)kventry_free);
336 }
337
338 static void
339 rrdentry_free(rrdentry_t *entry)
340 {
341 g_return_if_fail(entry != NULL);
342
343 g_free(entry->key);
344 g_free(entry->data);
345 g_free(entry);
346 }
347
348 static GHashTable *
349 rrdentry_hash_new(void)
350 {
351 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
352 (GDestroyNotify)rrdentry_free);
353 }
354
355 void
356 cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
357 {
358 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
359 }
360
361 void
362 cfs_cluster_log(clog_entry_t *entry)
363 {
364 g_return_if_fail(entry != NULL);
365
366 clusterlog_insert(cfs_status.clusterlog, entry);
367
368 if (cfs_status.kvstore) {
369 struct iovec iov[1];
370 iov[0].iov_base = (char *)entry;
371 iov[0].iov_len = clog_entry_size(entry);
372
373 if (dfsm_is_initialized(cfs_status.kvstore))
374 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
375 }
376 }
377
378 void cfs_status_init(void)
379 {
380 g_mutex_lock (&mutex);
381
382 cfs_status.start_time = time(NULL);
383
384 cfs_status.vmlist = vmlist_hash_new();
385
386 cfs_status.kvhash = kventry_hash_new();
387
388 cfs_status.rrdhash = rrdentry_hash_new();
389
390 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
391
392 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
393
394 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
395 g_hash_table_replace(cfs_status.memdb_changes,
396 memdb_change_array[i].path,
397 &memdb_change_array[i]);
398 }
399
400 cfs_status.clusterlog = clusterlog_new();
401
402 // fixme:
403 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
404 LOG_INFO, "starting cluster log");
405
406 g_mutex_unlock (&mutex);
407 }
408
409 void cfs_status_cleanup(void)
410 {
411 g_mutex_lock (&mutex);
412
413 cfs_status.clinfo_version++;
414
415 if (cfs_status.clinfo) {
416 cfs_clinfo_destroy(cfs_status.clinfo);
417 cfs_status.clinfo = NULL;
418 }
419
420 if (cfs_status.vmlist) {
421 g_hash_table_destroy(cfs_status.vmlist);
422 cfs_status.vmlist = NULL;
423 }
424
425 if (cfs_status.kvhash) {
426 g_hash_table_destroy(cfs_status.kvhash);
427 cfs_status.kvhash = NULL;
428 }
429
430 if (cfs_status.rrdhash) {
431 g_hash_table_destroy(cfs_status.rrdhash);
432 cfs_status.rrdhash = NULL;
433 }
434
435 if (cfs_status.iphash) {
436 g_hash_table_destroy(cfs_status.iphash);
437 cfs_status.iphash = NULL;
438 }
439
440 if (cfs_status.clusterlog)
441 clusterlog_destroy(cfs_status.clusterlog);
442
443 g_mutex_unlock (&mutex);
444 }
445
446 void cfs_status_set_clinfo(
447 cfs_clinfo_t *clinfo)
448 {
449 g_return_if_fail(clinfo != NULL);
450
451 g_mutex_lock (&mutex);
452
453 cfs_status.clinfo_version++;
454
455 cfs_clinfo_t *old = cfs_status.clinfo;
456
457 cfs_status.clinfo = clinfo;
458
459 cfs_message("update cluster info (cluster name %s, version = %d)",
460 clinfo->cluster_name, clinfo->cman_version);
461
462
463 if (old && old->nodes_byid && clinfo->nodes_byid) {
464 /* copy kvstore */
465 GHashTable *ht = clinfo->nodes_byid;
466 GHashTableIter iter;
467 gpointer key, value;
468
469 g_hash_table_iter_init (&iter, ht);
470
471 while (g_hash_table_iter_next (&iter, &key, &value)) {
472 cfs_clnode_t *node = (cfs_clnode_t *)value;
473 cfs_clnode_t *oldnode;
474 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
475 node->online = oldnode->online;
476 node->kvhash = oldnode->kvhash;
477 oldnode->kvhash = NULL;
478 }
479 }
480
481 }
482
483 if (old)
484 cfs_clinfo_destroy(old);
485
486
487 g_mutex_unlock (&mutex);
488 }
489
490 static void
491 dump_kvstore_versions(
492 GString *str,
493 GHashTable *kvhash,
494 const char *nodename)
495 {
496 g_return_if_fail(kvhash != NULL);
497 g_return_if_fail(str != NULL);
498 g_return_if_fail(nodename != NULL);
499
500 GHashTable *ht = kvhash;
501 GHashTableIter iter;
502 gpointer key, value;
503
504 g_string_append_printf(str, "\"%s\": {\n", nodename);
505
506 g_hash_table_iter_init (&iter, ht);
507
508 int i = 0;
509 while (g_hash_table_iter_next (&iter, &key, &value)) {
510 kventry_t *entry = (kventry_t *)value;
511 if (i) g_string_append_printf(str, ",\n");
512 i++;
513 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
514 }
515
516 g_string_append_printf(str, "}\n");
517 }
518
519 int
520 cfs_create_version_msg(GString *str)
521 {
522 g_return_val_if_fail(str != NULL, -EINVAL);
523
524 g_mutex_lock (&mutex);
525
526 g_string_append_printf(str,"{\n");
527
528 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
529
530 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
531
532 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
533
534 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
535 g_string_append_printf(str, "\"%s\": %u,\n",
536 memdb_change_array[i].path,
537 memdb_change_array[i].version);
538 }
539
540 g_string_append_printf(str, "\"kvstore\": {\n");
541
542 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
543
544 cfs_clinfo_t *clinfo = cfs_status.clinfo;
545
546 if (clinfo && clinfo->nodes_byid) {
547 GHashTable *ht = clinfo->nodes_byid;
548 GHashTableIter iter;
549 gpointer key, value;
550
551 g_hash_table_iter_init (&iter, ht);
552
553 while (g_hash_table_iter_next (&iter, &key, &value)) {
554 cfs_clnode_t *node = (cfs_clnode_t *)value;
555 if (!node->kvhash)
556 continue;
557 g_string_append_printf(str, ",\n");
558 dump_kvstore_versions(str, node->kvhash, node->name);
559 }
560 }
561
562 g_string_append_printf(str,"}\n");
563
564 g_string_append_printf(str,"}\n");
565
566 g_mutex_unlock (&mutex);
567
568 return 0;
569 }
570
571 GHashTable *
572 vmlist_hash_new(void)
573 {
574 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
575 (GDestroyNotify)vminfo_free);
576 }
577
578 gboolean
579 vmlist_hash_insert_vm(
580 GHashTable *vmlist,
581 int vmtype,
582 guint32 vmid,
583 const char *nodename,
584 gboolean replace)
585 {
586 g_return_val_if_fail(vmlist != NULL, FALSE);
587 g_return_val_if_fail(nodename != NULL, FALSE);
588 g_return_val_if_fail(vmid != 0, FALSE);
589 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
590 vmtype == VMTYPE_LXC, FALSE);
591
592 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
593 cfs_critical("detected duplicate VMID %d", vmid);
594 return FALSE;
595 }
596
597 vminfo_t *vminfo = g_new0(vminfo_t, 1);
598
599 vminfo->vmid = vmid;
600 vminfo->vmtype = vmtype;
601 vminfo->nodename = g_strdup(nodename);
602
603 vminfo->version = ++vminfo_version_counter;
604
605 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
606
607 return TRUE;
608 }
609
610 void
611 vmlist_register_vm(
612 int vmtype,
613 guint32 vmid,
614 const char *nodename)
615 {
616 g_return_if_fail(cfs_status.vmlist != NULL);
617 g_return_if_fail(nodename != NULL);
618 g_return_if_fail(vmid != 0);
619 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
620 vmtype == VMTYPE_LXC);
621
622 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
623
624 g_mutex_lock (&mutex);
625
626 cfs_status.vmlist_version++;
627
628 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
629
630 g_mutex_unlock (&mutex);
631 }
632
633 gboolean
634 vmlist_different_vm_exists(
635 int vmtype,
636 guint32 vmid,
637 const char *nodename)
638 {
639 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
640 g_return_val_if_fail(vmid != 0, FALSE);
641
642 gboolean res = FALSE;
643
644 g_mutex_lock (&mutex);
645
646 vminfo_t *vminfo;
647 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
648 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
649 res = TRUE;
650 }
651 g_mutex_unlock (&mutex);
652
653 return res;
654 }
655
656 gboolean
657 vmlist_vm_exists(
658 guint32 vmid)
659 {
660 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
661 g_return_val_if_fail(vmid != 0, FALSE);
662
663 g_mutex_lock (&mutex);
664
665 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
666
667 g_mutex_unlock (&mutex);
668
669 return res != NULL;
670 }
671
672 void
673 vmlist_delete_vm(
674 guint32 vmid)
675 {
676 g_return_if_fail(cfs_status.vmlist != NULL);
677 g_return_if_fail(vmid != 0);
678
679 g_mutex_lock (&mutex);
680
681 cfs_status.vmlist_version++;
682
683 g_hash_table_remove(cfs_status.vmlist, &vmid);
684
685 g_mutex_unlock (&mutex);
686 }
687
688 void cfs_status_set_vmlist(
689 GHashTable *vmlist)
690 {
691 g_return_if_fail(vmlist != NULL);
692
693 g_mutex_lock (&mutex);
694
695 cfs_status.vmlist_version++;
696
697 if (cfs_status.vmlist)
698 g_hash_table_destroy(cfs_status.vmlist);
699
700 cfs_status.vmlist = vmlist;
701
702 g_mutex_unlock (&mutex);
703 }
704
705 int
706 cfs_create_vmlist_msg(GString *str)
707 {
708 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
709 g_return_val_if_fail(str != NULL, -EINVAL);
710
711 g_mutex_lock (&mutex);
712
713 g_string_append_printf(str,"{\n");
714
715 GHashTable *ht = cfs_status.vmlist;
716
717 guint count = g_hash_table_size(ht);
718
719 if (!count) {
720 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
721 } else {
722 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
723
724 g_string_append_printf(str,"\"ids\": {\n");
725
726 GHashTableIter iter;
727 gpointer key, value;
728
729 g_hash_table_iter_init (&iter, ht);
730
731 int first = 1;
732 while (g_hash_table_iter_next (&iter, &key, &value)) {
733 vminfo_t *vminfo = (vminfo_t *)value;
734 char *type;
735 if (vminfo->vmtype == VMTYPE_QEMU) {
736 type = "qemu";
737 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
738 type = "openvz";
739 } else if (vminfo->vmtype == VMTYPE_LXC) {
740 type = "lxc";
741 } else {
742 type = "unknown";
743 }
744
745 if (!first)
746 g_string_append_printf(str, ",\n");
747 first = 0;
748
749 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
750 vminfo->vmid, vminfo->nodename, type, vminfo->version);
751 }
752
753 g_string_append_printf(str,"}\n");
754 }
755 g_string_append_printf(str,"\n}\n");
756
757 g_mutex_unlock (&mutex);
758
759 return 0;
760 }
761
762 void
763 record_memdb_change(const char *path)
764 {
765 g_return_if_fail(cfs_status.memdb_changes != 0);
766
767 memdb_change_t *ce;
768
769 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
770 ce->version++;
771 }
772 }
773
774 void
775 record_memdb_reload(void)
776 {
777 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
778 memdb_change_array[i].version++;
779 }
780 }
781
782 static gboolean
783 kventry_hash_set(
784 GHashTable *kvhash,
785 const char *key,
786 gconstpointer data,
787 size_t len)
788 {
789 g_return_val_if_fail(kvhash != NULL, FALSE);
790 g_return_val_if_fail(key != NULL, FALSE);
791 g_return_val_if_fail(data != NULL, FALSE);
792
793 kventry_t *entry;
794 if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
795 g_free(entry->data);
796 entry->data = g_memdup(data, len);
797 entry->len = len;
798 entry->version++;
799 } else {
800 kventry_t *entry = g_new0(kventry_t, 1);
801
802 entry->key = g_strdup(key);
803 entry->data = g_memdup(data, len);
804 entry->len = len;
805
806 g_hash_table_replace(kvhash, entry->key, entry);
807 }
808
809 return TRUE;
810 }
811
812 static const char *rrd_def_node[] = {
813 "DS:loadavg:GAUGE:120:0:U",
814 "DS:maxcpu:GAUGE:120:0:U",
815 "DS:cpu:GAUGE:120:0:U",
816 "DS:iowait:GAUGE:120:0:U",
817 "DS:memtotal:GAUGE:120:0:U",
818 "DS:memused:GAUGE:120:0:U",
819 "DS:swaptotal:GAUGE:120:0:U",
820 "DS:swapused:GAUGE:120:0:U",
821 "DS:roottotal:GAUGE:120:0:U",
822 "DS:rootused:GAUGE:120:0:U",
823 "DS:netin:DERIVE:120:0:U",
824 "DS:netout:DERIVE:120:0:U",
825
826 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
827 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
828 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
829 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
830 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
831
832 "RRA:MAX:0.5:1:70", // 1 min max - one hour
833 "RRA:MAX:0.5:30:70", // 30 min max - one day
834 "RRA:MAX:0.5:180:70", // 3 hour max - one week
835 "RRA:MAX:0.5:720:70", // 12 hour max - one month
836 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
837 NULL,
838 };
839
840 static const char *rrd_def_vm[] = {
841 "DS:maxcpu:GAUGE:120:0:U",
842 "DS:cpu:GAUGE:120:0:U",
843 "DS:maxmem:GAUGE:120:0:U",
844 "DS:mem:GAUGE:120:0:U",
845 "DS:maxdisk:GAUGE:120:0:U",
846 "DS:disk:GAUGE:120:0:U",
847 "DS:netin:DERIVE:120:0:U",
848 "DS:netout:DERIVE:120:0:U",
849 "DS:diskread:DERIVE:120:0:U",
850 "DS:diskwrite:DERIVE:120:0:U",
851
852 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
853 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
854 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
855 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
856 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
857
858 "RRA:MAX:0.5:1:70", // 1 min max - one hour
859 "RRA:MAX:0.5:30:70", // 30 min max - one day
860 "RRA:MAX:0.5:180:70", // 3 hour max - one week
861 "RRA:MAX:0.5:720:70", // 12 hour max - one month
862 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
863 NULL,
864 };
865
866 static const char *rrd_def_storage[] = {
867 "DS:total:GAUGE:120:0:U",
868 "DS:used:GAUGE:120:0:U",
869
870 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
871 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
872 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
873 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
874 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
875
876 "RRA:MAX:0.5:1:70", // 1 min max - one hour
877 "RRA:MAX:0.5:30:70", // 30 min max - one day
878 "RRA:MAX:0.5:180:70", // 3 hour max - one week
879 "RRA:MAX:0.5:720:70", // 12 hour max - one month
880 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
881 NULL,
882 };
883
884 #define RRDDIR "/var/lib/rrdcached/db"
885
886 static void
887 create_rrd_file(
888 const char *filename,
889 int argcount,
890 const char *rrddef[])
891 {
892 /* start at day boundary */
893 time_t ctime;
894 time(&ctime);
895 struct tm *ltm = localtime(&ctime);
896 ltm->tm_sec = 0;
897 ltm->tm_min = 0;
898 ltm->tm_hour = 0;
899
900 rrd_clear_error();
901 if (rrd_create_r(filename, 60, timelocal(ltm), argcount, rrddef)) {
902 cfs_message("RRD create error %s: %s", filename, rrd_get_error());
903 }
904 }
905
906 static inline const char *
907 rrd_skip_data(
908 const char *data,
909 int count)
910 {
911 int found = 0;
912 while (*data && found < count) {
913 if (*data++ == ':')
914 found++;
915 }
916 return data;
917 }
918
919 static void
920 update_rrd_data(
921 const char *key,
922 gconstpointer data,
923 size_t len)
924 {
925 g_return_if_fail(key != NULL);
926 g_return_if_fail(data != NULL);
927 g_return_if_fail(len > 0);
928 g_return_if_fail(len < 4096);
929
930 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
931
932 int use_daemon = 1;
933 if (rrdc_connect(rrdcsock) != 0)
934 use_daemon = 0;
935
936 char *filename = NULL;
937
938 int skip = 0;
939
940 if (strncmp(key, "pve2-node/", 10) == 0) {
941 const char *node = key + 10;
942
943 skip = 2;
944
945 if (strchr(node, '/') != NULL)
946 goto keyerror;
947
948 if (strlen(node) < 1)
949 goto keyerror;
950
951 filename = g_strdup_printf(RRDDIR "/%s", key);
952
953 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
954
955 mkdir(RRDDIR "/pve2-node", 0755);
956 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
957 create_rrd_file(filename, argcount, rrd_def_node);
958 }
959
960 } else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
961 (strncmp(key, "pve2.3-vm/", 10) == 0)) {
962 const char *vmid;
963
964 if (strncmp(key, "pve2-vm/", 8) == 0) {
965 vmid = key + 8;
966 skip = 2;
967 } else {
968 vmid = key + 10;
969 skip = 4;
970 }
971
972 if (strchr(vmid, '/') != NULL)
973 goto keyerror;
974
975 if (strlen(vmid) < 1)
976 goto keyerror;
977
978 filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);
979
980 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
981
982 mkdir(RRDDIR "/pve2-vm", 0755);
983 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
984 create_rrd_file(filename, argcount, rrd_def_vm);
985 }
986
987 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
988 const char *node = key + 13;
989
990 const char *storage = node;
991 while (*storage && *storage != '/')
992 storage++;
993
994 if (*storage != '/' || ((storage - node) < 1))
995 goto keyerror;
996
997 storage++;
998
999 if (strchr(storage, '/') != NULL)
1000 goto keyerror;
1001
1002 if (strlen(storage) < 1)
1003 goto keyerror;
1004
1005 filename = g_strdup_printf(RRDDIR "/%s", key);
1006
1007 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1008
1009 mkdir(RRDDIR "/pve2-storage", 0755);
1010
1011 char *dir = g_path_get_dirname(filename);
1012 mkdir(dir, 0755);
1013 g_free(dir);
1014
1015 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
1016 create_rrd_file(filename, argcount, rrd_def_storage);
1017 }
1018
1019 } else {
1020 goto keyerror;
1021 }
1022
1023 const char *dp = skip ? rrd_skip_data(data, skip) : data;
1024
1025 const char *update_args[] = { dp, NULL };
1026
1027 if (use_daemon) {
1028 int status;
1029 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1030 cfs_message("RRDC update error %s: %d", filename, status);
1031 rrdc_disconnect();
1032 rrd_clear_error();
1033 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1034 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1035 }
1036 }
1037
1038 } else {
1039 rrd_clear_error();
1040 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1041 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1042 }
1043 }
1044
1045 ret:
1046 if (filename)
1047 g_free(filename);
1048
1049 return;
1050
1051 keyerror:
1052 cfs_critical("RRD update error: unknown/wrong key %s", key);
1053 goto ret;
1054 }
1055
1056 static gboolean
1057 rrd_entry_is_old(
1058 gpointer key,
1059 gpointer value,
1060 gpointer user_data)
1061 {
1062 rrdentry_t *entry = (rrdentry_t *)value;
1063 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1064
1065 int diff = ctime - entry->time;
1066
1067 /* remove everything older than 5 minutes */
1068 int expire = 60*5;
1069
1070 return (diff > expire) ? TRUE : FALSE;
1071 }
1072
1073 static char *rrd_dump_buf = NULL;
1074 static time_t rrd_dump_last = 0;
1075
1076 void
1077 cfs_rrd_dump(GString *str)
1078 {
1079 time_t ctime;
1080
1081 g_mutex_lock (&mutex);
1082
1083 time(&ctime);
1084 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1085 g_string_assign(str, rrd_dump_buf);
1086 g_mutex_unlock (&mutex);
1087 return;
1088 }
1089
1090 /* remove old data */
1091 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1092 GUINT_TO_POINTER(ctime));
1093
1094 g_string_set_size(str, 0);
1095
1096 GHashTableIter iter;
1097 gpointer key, value;
1098
1099 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1100
1101 while (g_hash_table_iter_next (&iter, &key, &value)) {
1102 rrdentry_t *entry = (rrdentry_t *)value;
1103 g_string_append(str, key);
1104 g_string_append(str, ":");
1105 g_string_append(str, entry->data);
1106 g_string_append(str, "\n");
1107 }
1108
1109 g_string_append_c(str, 0); // never return undef
1110
1111 rrd_dump_last = ctime;
1112 if (rrd_dump_buf)
1113 g_free(rrd_dump_buf);
1114 rrd_dump_buf = g_strdup(str->str);
1115
1116 g_mutex_unlock (&mutex);
1117 }
1118
1119 static gboolean
1120 nodeip_hash_set(
1121 GHashTable *iphash,
1122 const char *nodename,
1123 const char *ip,
1124 size_t len)
1125 {
1126 g_return_val_if_fail(iphash != NULL, FALSE);
1127 g_return_val_if_fail(nodename != NULL, FALSE);
1128 g_return_val_if_fail(ip != NULL, FALSE);
1129 g_return_val_if_fail(len > 0, FALSE);
1130 g_return_val_if_fail(len < 256, FALSE);
1131 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1132
1133 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1134
1135 if (!oldip || (strcmp(oldip, ip) != 0)) {
1136 cfs_status.clinfo_version++;
1137 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1138 }
1139
1140 return TRUE;
1141 }
1142
1143 static gboolean
1144 rrdentry_hash_set(
1145 GHashTable *rrdhash,
1146 const char *key,
1147 const char *data,
1148 size_t len)
1149 {
1150 g_return_val_if_fail(rrdhash != NULL, FALSE);
1151 g_return_val_if_fail(key != NULL, FALSE);
1152 g_return_val_if_fail(data != NULL, FALSE);
1153 g_return_val_if_fail(len > 0, FALSE);
1154 g_return_val_if_fail(len < 4096, FALSE);
1155 g_return_val_if_fail(data[len-1] == 0, FALSE);
1156
1157 rrdentry_t *entry;
1158 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1159 g_free(entry->data);
1160 entry->data = g_memdup(data, len);
1161 entry->len = len;
1162 entry->time = time(NULL);
1163 } else {
1164 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1165
1166 entry->key = g_strdup(key);
1167 entry->data = g_memdup(data, len);
1168 entry->len = len;
1169 entry->time = time(NULL);
1170
1171 g_hash_table_replace(rrdhash, entry->key, entry);
1172 }
1173
1174 update_rrd_data(key, data, len);
1175
1176 return TRUE;
1177 }
1178
1179 static int
1180 kvstore_send_update_message(
1181 dfsm_t *dfsm,
1182 const char *key,
1183 gpointer data,
1184 guint32 len)
1185 {
1186 if (!dfsm_is_initialized(dfsm))
1187 return -EACCES;
1188
1189 struct iovec iov[2];
1190
1191 char name[256];
1192 g_strlcpy(name, key, sizeof(name));
1193
1194 iov[0].iov_base = &name;
1195 iov[0].iov_len = sizeof(name);
1196
1197 iov[1].iov_base = (char *)data;
1198 iov[1].iov_len = len;
1199
1200 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1201 return 0;
1202
1203 return -EACCES;
1204 }
1205
1206 static clog_entry_t *
1207 kvstore_parse_log_message(
1208 const void *msg,
1209 size_t msg_len)
1210 {
1211 g_return_val_if_fail(msg != NULL, NULL);
1212
1213 if (msg_len < sizeof(clog_entry_t)) {
1214 cfs_critical("received short log message (%zu < %zu)", msg_len, sizeof(clog_entry_t));
1215 return NULL;
1216 }
1217
1218 clog_entry_t *entry = (clog_entry_t *)msg;
1219
1220 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1221 entry->ident_len + entry->tag_len + entry->msg_len;
1222
1223 if (msg_len != size) {
1224 cfs_critical("received log message with wrong size (%zu != %u)", msg_len, size);
1225 return NULL;
1226 }
1227
1228 msg = entry->data;
1229
1230 if (*((char *)msg + entry->node_len - 1)) {
1231 cfs_critical("unterminated string in log message");
1232 return NULL;
1233 }
1234 msg += entry->node_len;
1235
1236 if (*((char *)msg + entry->ident_len - 1)) {
1237 cfs_critical("unterminated string in log message");
1238 return NULL;
1239 }
1240 msg += entry->ident_len;
1241
1242 if (*((char *)msg + entry->tag_len - 1)) {
1243 cfs_critical("unterminated string in log message");
1244 return NULL;
1245 }
1246 msg += entry->tag_len;
1247
1248 if (*((char *)msg + entry->msg_len - 1)) {
1249 cfs_critical("unterminated string in log message");
1250 return NULL;
1251 }
1252
1253 return entry;
1254 }
1255
1256 static gboolean
1257 kvstore_parse_update_message(
1258 const void *msg,
1259 size_t msg_len,
1260 const char **key,
1261 gconstpointer *data,
1262 guint32 *len)
1263 {
1264 g_return_val_if_fail(msg != NULL, FALSE);
1265 g_return_val_if_fail(key != NULL, FALSE);
1266 g_return_val_if_fail(data != NULL, FALSE);
1267 g_return_val_if_fail(len != NULL, FALSE);
1268
1269 if (msg_len < 256) {
1270 cfs_critical("received short kvstore message (%zu < 256)", msg_len);
1271 return FALSE;
1272 }
1273
1274 /* test if key is null terminated */
1275 int i = 0;
1276 for (i = 0; i < 256; i++)
1277 if (((char *)msg)[i] == 0)
1278 break;
1279
1280 if (i == 256)
1281 return FALSE;
1282
1283
1284 *len = msg_len - 256;
1285 *key = msg;
1286 *data = msg + 256;
1287
1288 return TRUE;
1289 }
1290
1291 int
1292 cfs_create_status_msg(
1293 GString *str,
1294 const char *nodename,
1295 const char *key)
1296 {
1297 g_return_val_if_fail(str != NULL, -EINVAL);
1298 g_return_val_if_fail(key != NULL, -EINVAL);
1299
1300 int res = -ENOENT;
1301
1302 GHashTable *kvhash = NULL;
1303
1304 g_mutex_lock (&mutex);
1305
1306 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1307 kvhash = cfs_status.kvhash;
1308 } else if (cfs_status.clinfo && cfs_status.clinfo->nodes_byname) {
1309 cfs_clnode_t *clnode;
1310 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1311 kvhash = clnode->kvhash;
1312 }
1313
1314 kventry_t *entry;
1315 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1316 g_string_append_len(str, entry->data, entry->len);
1317 res = 0;
1318 }
1319
1320 g_mutex_unlock (&mutex);
1321
1322 return res;
1323 }
1324
1325 int
1326 cfs_status_set(
1327 const char *key,
1328 gpointer data,
1329 size_t len)
1330 {
1331 g_return_val_if_fail(key != NULL, FALSE);
1332 g_return_val_if_fail(data != NULL, FALSE);
1333 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1334
1335 if (len > CFS_MAX_STATUS_SIZE)
1336 return -EFBIG;
1337
1338 g_mutex_lock (&mutex);
1339
1340 gboolean res;
1341
1342 if (strncmp(key, "rrd/", 4) == 0) {
1343 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1344 } else if (!strcmp(key, "nodeip")) {
1345 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1346 } else {
1347 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1348 }
1349 g_mutex_unlock (&mutex);
1350
1351 if (cfs_status.kvstore)
1352 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1353
1354 return res ? 0 : -ENOMEM;
1355 }
1356
1357 gboolean
1358 cfs_kvstore_node_set(
1359 uint32_t nodeid,
1360 const char *key,
1361 gconstpointer data,
1362 size_t len)
1363 {
1364 g_return_val_if_fail(nodeid != 0, FALSE);
1365 g_return_val_if_fail(key != NULL, FALSE);
1366 g_return_val_if_fail(data != NULL, FALSE);
1367
1368 g_mutex_lock (&mutex);
1369
1370 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1371 goto ret; /* ignore */
1372
1373 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1374 if (!clnode)
1375 goto ret; /* ignore */
1376
1377 cfs_debug("got node %d status update %s", nodeid, key);
1378
1379 if (strncmp(key, "rrd/", 4) == 0) {
1380 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1381 } else if (!strcmp(key, "nodeip")) {
1382 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1383 } else {
1384 if (!clnode->kvhash) {
1385 if (!(clnode->kvhash = kventry_hash_new())) {
1386 goto ret; /*ignore */
1387 }
1388 }
1389
1390 kventry_hash_set(clnode->kvhash, key, data, len);
1391
1392 }
1393 ret:
1394 g_mutex_unlock (&mutex);
1395
1396 return TRUE;
1397 }
1398
1399 static gboolean
1400 cfs_kvstore_sync(void)
1401 {
1402 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1403 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1404
1405 gboolean res = TRUE;
1406
1407 g_mutex_lock (&mutex);
1408
1409 GHashTable *ht = cfs_status.kvhash;
1410 GHashTableIter iter;
1411 gpointer key, value;
1412
1413 g_hash_table_iter_init (&iter, ht);
1414
1415 while (g_hash_table_iter_next (&iter, &key, &value)) {
1416 kventry_t *entry = (kventry_t *)value;
1417 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1418 }
1419
1420 g_mutex_unlock (&mutex);
1421
1422 return res;
1423 }
1424
1425 static int
1426 dfsm_deliver(
1427 dfsm_t *dfsm,
1428 gpointer data,
1429 int *res_ptr,
1430 uint32_t nodeid,
1431 uint32_t pid,
1432 uint16_t msg_type,
1433 uint32_t msg_time,
1434 const void *msg,
1435 size_t msg_len)
1436 {
1437 g_return_val_if_fail(dfsm != NULL, -1);
1438 g_return_val_if_fail(msg != NULL, -1);
1439 g_return_val_if_fail(res_ptr != NULL, -1);
1440
1441 /* ignore message for ourself */
1442 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1443 goto ret;
1444
1445 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1446 const char *key;
1447 gconstpointer data;
1448 guint32 len;
1449 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1450 cfs_kvstore_node_set(nodeid, key, data, len);
1451 } else {
1452 cfs_critical("cant parse update message");
1453 }
1454 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1455 cfs_message("received log"); // fixme: remove
1456 const clog_entry_t *entry;
1457 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1458 clusterlog_insert(cfs_status.clusterlog, entry);
1459 } else {
1460 cfs_critical("cant parse log message");
1461 }
1462 } else {
1463 cfs_critical("received unknown message type %d\n", msg_type);
1464 goto fail;
1465 }
1466
1467 ret:
1468 *res_ptr = 0;
1469 return 1;
1470
1471 fail:
1472 *res_ptr = -EACCES;
1473 return 1;
1474 }
1475
1476 static void
1477 dfsm_confchg(
1478 dfsm_t *dfsm,
1479 gpointer data,
1480 const struct cpg_address *member_list,
1481 size_t member_list_entries)
1482 {
1483 g_return_if_fail(dfsm != NULL);
1484 g_return_if_fail(member_list != NULL);
1485
1486 cfs_debug("enter %s", __func__);
1487
1488 g_mutex_lock (&mutex);
1489
1490 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1491
1492 if (clinfo && clinfo->nodes_byid) {
1493
1494 GHashTable *ht = clinfo->nodes_byid;
1495 GHashTableIter iter;
1496 gpointer key, value;
1497
1498 g_hash_table_iter_init (&iter, ht);
1499
1500 while (g_hash_table_iter_next (&iter, &key, &value)) {
1501 cfs_clnode_t *node = (cfs_clnode_t *)value;
1502 node->online = FALSE;
1503 }
1504
1505 for (int i = 0; i < member_list_entries; i++) {
1506 cfs_clnode_t *node;
1507 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1508 node->online = TRUE;
1509 }
1510 }
1511
1512 cfs_status.clinfo_version++;
1513 }
1514
1515 g_mutex_unlock (&mutex);
1516 }
1517
1518 static gpointer
1519 dfsm_get_state(
1520 dfsm_t *dfsm,
1521 gpointer data,
1522 unsigned int *res_len)
1523 {
1524 g_return_val_if_fail(dfsm != NULL, NULL);
1525
1526 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1527
1528 return msg;
1529 }
1530
1531 static int
1532 dfsm_process_update(
1533 dfsm_t *dfsm,
1534 gpointer data,
1535 dfsm_sync_info_t *syncinfo,
1536 uint32_t nodeid,
1537 uint32_t pid,
1538 const void *msg,
1539 size_t msg_len)
1540 {
1541 cfs_critical("%s: received unexpected update message", __func__);
1542
1543 return -1;
1544 }
1545
1546 static int
1547 dfsm_process_state_update(
1548 dfsm_t *dfsm,
1549 gpointer data,
1550 dfsm_sync_info_t *syncinfo)
1551 {
1552 g_return_val_if_fail(dfsm != NULL, -1);
1553 g_return_val_if_fail(syncinfo != NULL, -1);
1554
1555 clog_base_t *clog[syncinfo->node_count];
1556
1557 int local_index = -1;
1558 for (int i = 0; i < syncinfo->node_count; i++) {
1559 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1560 ni->synced = 1;
1561
1562 if (syncinfo->local == ni)
1563 local_index = i;
1564
1565 clog_base_t *base = (clog_base_t *)ni->state;
1566 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1567 clog[i] = ni->state;
1568 } else {
1569 cfs_critical("received log with wrong size %u", ni->state_len);
1570 clog[i] = NULL;
1571 }
1572 }
1573
1574 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1575 cfs_critical("unable to merge log files");
1576 }
1577
1578 cfs_kvstore_sync();
1579
1580 return 1;
1581 }
1582
1583 static int
1584 dfsm_commit(
1585 dfsm_t *dfsm,
1586 gpointer data,
1587 dfsm_sync_info_t *syncinfo)
1588 {
1589 g_return_val_if_fail(dfsm != NULL, -1);
1590 g_return_val_if_fail(syncinfo != NULL, -1);
1591
1592 return 1;
1593 }
1594
1595 static void
1596 dfsm_synced(dfsm_t *dfsm)
1597 {
1598 g_return_if_fail(dfsm != NULL);
1599
1600 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1601 if (!ip)
1602 ip = cfs.ip;
1603
1604 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1605 }
1606
1607 static int
1608 dfsm_cleanup(
1609 dfsm_t *dfsm,
1610 gpointer data,
1611 dfsm_sync_info_t *syncinfo)
1612 {
1613 return 1;
1614 }
1615
1616 static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1617 .dfsm_deliver_fn = dfsm_deliver,
1618 .dfsm_confchg_fn = dfsm_confchg,
1619
1620 .dfsm_get_state_fn = dfsm_get_state,
1621 .dfsm_process_state_update_fn = dfsm_process_state_update,
1622 .dfsm_process_update_fn = dfsm_process_update,
1623 .dfsm_commit_fn = dfsm_commit,
1624 .dfsm_cleanup_fn = dfsm_cleanup,
1625 .dfsm_synced_fn = dfsm_synced,
1626 };
1627
1628 dfsm_t *
1629 cfs_status_dfsm_new(void)
1630 {
1631 g_mutex_lock (&mutex);
1632
1633 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1634 0, &kvstore_dfsm_callbacks);
1635 g_mutex_unlock (&mutex);
1636
1637 return cfs_status.kvstore;
1638 }
1639
1640 gboolean
1641 cfs_is_quorate(void)
1642 {
1643 g_mutex_lock (&mutex);
1644 gboolean res = cfs_status.quorate;
1645 g_mutex_unlock (&mutex);
1646
1647 return res;
1648 }
1649
1650 void
1651 cfs_set_quorate(
1652 uint32_t quorate,
1653 gboolean quiet)
1654 {
1655 g_mutex_lock (&mutex);
1656
1657 uint32_t prev_quorate = cfs_status.quorate;
1658 cfs_status.quorate = quorate;
1659
1660 if (!prev_quorate && cfs_status.quorate) {
1661 if (!quiet)
1662 cfs_message("node has quorum");
1663 }
1664
1665 if (prev_quorate && !cfs_status.quorate) {
1666 if (!quiet)
1667 cfs_message("node lost quorum");
1668 }
1669
1670 g_mutex_unlock (&mutex);
1671 }
1672