]> git.proxmox.com Git - pve-cluster.git/blob - data/src/status.c
actually add priv/tfa.cfg to observed files
[pve-cluster.git] / data / src / status.c
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "status"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdio.h>
28 #include <stdint.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <glib.h>
32 #include <sys/syslog.h>
33 #include <rrd.h>
34 #include <rrd_client.h>
35 #include <time.h>
36
37 #include "cfs-utils.h"
38 #include "status.h"
39 #include "logger.h"
40
41 #define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
42
43 typedef enum {
44 KVSTORE_MESSAGE_UPDATE = 1,
45 KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
46 KVSTORE_MESSAGE_LOG = 3,
47 } kvstore_message_t;
48
49 static uint32_t vminfo_version_counter;
50
51 typedef struct {
52 uint32_t vmid;
53 char *nodename;
54 int vmtype;
55 uint32_t version;
56 } vminfo_t;
57
58 typedef struct {
59 char *key;
60 gpointer data;
61 size_t len;
62 uint32_t version;
63 } kventry_t;
64
65 typedef struct {
66 char *key;
67 gpointer data;
68 size_t len;
69 uint32_t time;
70 } rrdentry_t;
71
72 typedef struct {
73 char *path;
74 uint32_t version;
75 } memdb_change_t;
76
77 static memdb_change_t memdb_change_array[] = {
78 { .path = "corosync.conf" },
79 { .path = "corosync.conf.new" },
80 { .path = "storage.cfg" },
81 { .path = "user.cfg" },
82 { .path = "domains.cfg" },
83 { .path = "priv/shadow.cfg" },
84 { .path = "priv/tfa.cfg" },
85 { .path = "datacenter.cfg" },
86 { .path = "vzdump.cron" },
87 { .path = "ha/crm_commands" },
88 { .path = "ha/manager_status" },
89 { .path = "ha/resources.cfg" },
90 { .path = "ha/groups.cfg" },
91 { .path = "ha/fence.cfg" },
92 { .path = "status.cfg" },
93 { .path = "replication.cfg" },
94 { .path = "ceph.conf" },
95 };
96
97 static GMutex mutex;
98
99 typedef struct {
100 time_t start_time;
101
102 uint32_t quorate;
103
104 cfs_clinfo_t *clinfo;
105 uint32_t clinfo_version;
106
107 GHashTable *vmlist;
108 uint32_t vmlist_version;
109
110 dfsm_t *kvstore;
111 GHashTable *kvhash;
112 GHashTable *rrdhash;
113 GHashTable *iphash;
114
115 GHashTable *memdb_changes;
116
117 clusterlog_t *clusterlog;
118 } cfs_status_t;
119
120 static cfs_status_t cfs_status;
121
122 struct cfs_clnode {
123 char *name;
124 uint32_t nodeid;
125 uint32_t votes;
126 gboolean online;
127 GHashTable *kvhash;
128 };
129
130 struct cfs_clinfo {
131 char *cluster_name;
132 uint32_t cman_version;
133
134 GHashTable *nodes_byid;
135 GHashTable *nodes_byname;
136 };
137
138 static guint
139 g_int32_hash (gconstpointer v)
140 {
141 return *(const uint32_t *) v;
142 }
143
144 static gboolean
145 g_int32_equal (gconstpointer v1,
146 gconstpointer v2)
147 {
148 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
149 }
150
151 static void vminfo_free(vminfo_t *vminfo)
152 {
153 g_return_if_fail(vminfo != NULL);
154
155 if (vminfo->nodename)
156 g_free(vminfo->nodename);
157
158
159 g_free(vminfo);
160 }
161
162 void cfs_clnode_destroy(
163 cfs_clnode_t *clnode)
164 {
165 g_return_if_fail(clnode != NULL);
166
167 if (clnode->kvhash)
168 g_hash_table_destroy(clnode->kvhash);
169
170 if (clnode->name)
171 g_free(clnode->name);
172
173 g_free(clnode);
174 }
175
176 cfs_clnode_t *cfs_clnode_new(
177 const char *name,
178 uint32_t nodeid,
179 uint32_t votes)
180 {
181 g_return_val_if_fail(name != NULL, NULL);
182
183 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
184 if (!clnode)
185 return NULL;
186
187 clnode->name = g_strdup(name);
188 clnode->nodeid = nodeid;
189 clnode->votes = votes;
190
191 return clnode;
192 }
193
194 gboolean cfs_clinfo_destroy(
195 cfs_clinfo_t *clinfo)
196 {
197 g_return_val_if_fail(clinfo != NULL, FALSE);
198
199 if (clinfo->cluster_name)
200 g_free(clinfo->cluster_name);
201
202 if (clinfo->nodes_byname)
203 g_hash_table_destroy(clinfo->nodes_byname);
204
205 if (clinfo->nodes_byid)
206 g_hash_table_destroy(clinfo->nodes_byid);
207
208 g_free(clinfo);
209
210 return TRUE;
211 }
212
213 cfs_clinfo_t *cfs_clinfo_new(
214 const char *cluster_name,
215 uint32_t cman_version)
216 {
217 g_return_val_if_fail(cluster_name != NULL, NULL);
218
219 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
220 if (!clinfo)
221 return NULL;
222
223 clinfo->cluster_name = g_strdup(cluster_name);
224 clinfo->cman_version = cman_version;
225
226 if (!(clinfo->nodes_byid = g_hash_table_new_full(
227 g_int32_hash, g_int32_equal, NULL,
228 (GDestroyNotify)cfs_clnode_destroy)))
229 goto fail;
230
231 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
232 goto fail;
233
234 return clinfo;
235
236 fail:
237 cfs_clinfo_destroy(clinfo);
238
239 return NULL;
240 }
241
242 gboolean cfs_clinfo_add_node(
243 cfs_clinfo_t *clinfo,
244 cfs_clnode_t *clnode)
245 {
246 g_return_val_if_fail(clinfo != NULL, FALSE);
247 g_return_val_if_fail(clnode != NULL, FALSE);
248
249 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
250 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
251
252 return TRUE;
253 }
254
255 int
256 cfs_create_memberlist_msg(
257 GString *str)
258 {
259 g_return_val_if_fail(str != NULL, -EINVAL);
260
261 g_mutex_lock (&mutex);
262
263 g_string_append_printf(str,"{\n");
264
265 guint nodecount = 0;
266
267 cfs_clinfo_t *clinfo = cfs_status.clinfo;
268
269 if (clinfo && clinfo->nodes_byid)
270 nodecount = g_hash_table_size(clinfo->nodes_byid);
271
272 if (nodecount) {
273 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
274 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
275
276 g_string_append_printf(str, "\"cluster\": { ");
277 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
278 "\"nodes\": %d, \"quorate\": %d ",
279 clinfo->cluster_name, clinfo->cman_version,
280 nodecount, cfs_status.quorate);
281
282 g_string_append_printf(str,"},\n");
283 g_string_append_printf(str,"\"nodelist\": {\n");
284
285 GHashTable *ht = clinfo->nodes_byid;
286 GHashTableIter iter;
287 gpointer key, value;
288
289 g_hash_table_iter_init (&iter, ht);
290
291 int i = 0;
292 while (g_hash_table_iter_next (&iter, &key, &value)) {
293 cfs_clnode_t *node = (cfs_clnode_t *)value;
294 if (i) g_string_append_printf(str, ",\n");
295 i++;
296
297 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
298 node->name, node->nodeid, node->online);
299
300
301 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
302 if (ip) {
303 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
304 }
305
306 g_string_append_printf(str, "}");
307
308 }
309 g_string_append_printf(str,"\n }\n");
310 } else {
311 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
312 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
313 }
314
315 g_string_append_printf(str,"}\n");
316
317 g_mutex_unlock (&mutex);
318
319 return 0;
320 }
321
322 static void
323 kventry_free(kventry_t *entry)
324 {
325 g_return_if_fail(entry != NULL);
326
327 g_free(entry->key);
328 g_free(entry->data);
329 g_free(entry);
330 }
331
332 static GHashTable *
333 kventry_hash_new(void)
334 {
335 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
336 (GDestroyNotify)kventry_free);
337 }
338
339 static void
340 rrdentry_free(rrdentry_t *entry)
341 {
342 g_return_if_fail(entry != NULL);
343
344 g_free(entry->key);
345 g_free(entry->data);
346 g_free(entry);
347 }
348
349 static GHashTable *
350 rrdentry_hash_new(void)
351 {
352 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
353 (GDestroyNotify)rrdentry_free);
354 }
355
356 void
357 cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
358 {
359 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
360 }
361
362 void
363 cfs_cluster_log(clog_entry_t *entry)
364 {
365 g_return_if_fail(entry != NULL);
366
367 clusterlog_insert(cfs_status.clusterlog, entry);
368
369 if (cfs_status.kvstore) {
370 struct iovec iov[1];
371 iov[0].iov_base = (char *)entry;
372 iov[0].iov_len = clog_entry_size(entry);
373
374 if (dfsm_is_initialized(cfs_status.kvstore))
375 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
376 }
377 }
378
379 void cfs_status_init(void)
380 {
381 g_mutex_lock (&mutex);
382
383 cfs_status.start_time = time(NULL);
384
385 cfs_status.vmlist = vmlist_hash_new();
386
387 cfs_status.kvhash = kventry_hash_new();
388
389 cfs_status.rrdhash = rrdentry_hash_new();
390
391 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
392
393 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
394
395 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
396 g_hash_table_replace(cfs_status.memdb_changes,
397 memdb_change_array[i].path,
398 &memdb_change_array[i]);
399 }
400
401 cfs_status.clusterlog = clusterlog_new();
402
403 // fixme:
404 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
405 LOG_INFO, "starting cluster log");
406
407 g_mutex_unlock (&mutex);
408 }
409
410 void cfs_status_cleanup(void)
411 {
412 g_mutex_lock (&mutex);
413
414 cfs_status.clinfo_version++;
415
416 if (cfs_status.clinfo) {
417 cfs_clinfo_destroy(cfs_status.clinfo);
418 cfs_status.clinfo = NULL;
419 }
420
421 if (cfs_status.vmlist) {
422 g_hash_table_destroy(cfs_status.vmlist);
423 cfs_status.vmlist = NULL;
424 }
425
426 if (cfs_status.kvhash) {
427 g_hash_table_destroy(cfs_status.kvhash);
428 cfs_status.kvhash = NULL;
429 }
430
431 if (cfs_status.rrdhash) {
432 g_hash_table_destroy(cfs_status.rrdhash);
433 cfs_status.rrdhash = NULL;
434 }
435
436 if (cfs_status.iphash) {
437 g_hash_table_destroy(cfs_status.iphash);
438 cfs_status.iphash = NULL;
439 }
440
441 if (cfs_status.clusterlog)
442 clusterlog_destroy(cfs_status.clusterlog);
443
444 g_mutex_unlock (&mutex);
445 }
446
447 void cfs_status_set_clinfo(
448 cfs_clinfo_t *clinfo)
449 {
450 g_return_if_fail(clinfo != NULL);
451
452 g_mutex_lock (&mutex);
453
454 cfs_status.clinfo_version++;
455
456 cfs_clinfo_t *old = cfs_status.clinfo;
457
458 cfs_status.clinfo = clinfo;
459
460 cfs_message("update cluster info (cluster name %s, version = %d)",
461 clinfo->cluster_name, clinfo->cman_version);
462
463
464 if (old && old->nodes_byid && clinfo->nodes_byid) {
465 /* copy kvstore */
466 GHashTable *ht = clinfo->nodes_byid;
467 GHashTableIter iter;
468 gpointer key, value;
469
470 g_hash_table_iter_init (&iter, ht);
471
472 while (g_hash_table_iter_next (&iter, &key, &value)) {
473 cfs_clnode_t *node = (cfs_clnode_t *)value;
474 cfs_clnode_t *oldnode;
475 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
476 node->online = oldnode->online;
477 node->kvhash = oldnode->kvhash;
478 oldnode->kvhash = NULL;
479 }
480 }
481
482 }
483
484 if (old)
485 cfs_clinfo_destroy(old);
486
487
488 g_mutex_unlock (&mutex);
489 }
490
491 static void
492 dump_kvstore_versions(
493 GString *str,
494 GHashTable *kvhash,
495 const char *nodename)
496 {
497 g_return_if_fail(kvhash != NULL);
498 g_return_if_fail(str != NULL);
499 g_return_if_fail(nodename != NULL);
500
501 GHashTable *ht = kvhash;
502 GHashTableIter iter;
503 gpointer key, value;
504
505 g_string_append_printf(str, "\"%s\": {\n", nodename);
506
507 g_hash_table_iter_init (&iter, ht);
508
509 int i = 0;
510 while (g_hash_table_iter_next (&iter, &key, &value)) {
511 kventry_t *entry = (kventry_t *)value;
512 if (i) g_string_append_printf(str, ",\n");
513 i++;
514 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
515 }
516
517 g_string_append_printf(str, "}\n");
518 }
519
520 int
521 cfs_create_version_msg(GString *str)
522 {
523 g_return_val_if_fail(str != NULL, -EINVAL);
524
525 g_mutex_lock (&mutex);
526
527 g_string_append_printf(str,"{\n");
528
529 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
530
531 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
532
533 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
534
535 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
536 g_string_append_printf(str, "\"%s\": %u,\n",
537 memdb_change_array[i].path,
538 memdb_change_array[i].version);
539 }
540
541 g_string_append_printf(str, "\"kvstore\": {\n");
542
543 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
544
545 cfs_clinfo_t *clinfo = cfs_status.clinfo;
546
547 if (clinfo && clinfo->nodes_byid) {
548 GHashTable *ht = clinfo->nodes_byid;
549 GHashTableIter iter;
550 gpointer key, value;
551
552 g_hash_table_iter_init (&iter, ht);
553
554 while (g_hash_table_iter_next (&iter, &key, &value)) {
555 cfs_clnode_t *node = (cfs_clnode_t *)value;
556 if (!node->kvhash)
557 continue;
558 g_string_append_printf(str, ",\n");
559 dump_kvstore_versions(str, node->kvhash, node->name);
560 }
561 }
562
563 g_string_append_printf(str,"}\n");
564
565 g_string_append_printf(str,"}\n");
566
567 g_mutex_unlock (&mutex);
568
569 return 0;
570 }
571
572 GHashTable *
573 vmlist_hash_new(void)
574 {
575 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
576 (GDestroyNotify)vminfo_free);
577 }
578
579 gboolean
580 vmlist_hash_insert_vm(
581 GHashTable *vmlist,
582 int vmtype,
583 guint32 vmid,
584 const char *nodename,
585 gboolean replace)
586 {
587 g_return_val_if_fail(vmlist != NULL, FALSE);
588 g_return_val_if_fail(nodename != NULL, FALSE);
589 g_return_val_if_fail(vmid != 0, FALSE);
590 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
591 vmtype == VMTYPE_LXC, FALSE);
592
593 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
594 cfs_critical("detected duplicate VMID %d", vmid);
595 return FALSE;
596 }
597
598 vminfo_t *vminfo = g_new0(vminfo_t, 1);
599
600 vminfo->vmid = vmid;
601 vminfo->vmtype = vmtype;
602 vminfo->nodename = g_strdup(nodename);
603
604 vminfo->version = ++vminfo_version_counter;
605
606 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
607
608 return TRUE;
609 }
610
611 void
612 vmlist_register_vm(
613 int vmtype,
614 guint32 vmid,
615 const char *nodename)
616 {
617 g_return_if_fail(cfs_status.vmlist != NULL);
618 g_return_if_fail(nodename != NULL);
619 g_return_if_fail(vmid != 0);
620 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
621 vmtype == VMTYPE_LXC);
622
623 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
624
625 g_mutex_lock (&mutex);
626
627 cfs_status.vmlist_version++;
628
629 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
630
631 g_mutex_unlock (&mutex);
632 }
633
634 gboolean
635 vmlist_different_vm_exists(
636 int vmtype,
637 guint32 vmid,
638 const char *nodename)
639 {
640 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
641 g_return_val_if_fail(vmid != 0, FALSE);
642
643 gboolean res = FALSE;
644
645 g_mutex_lock (&mutex);
646
647 vminfo_t *vminfo;
648 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
649 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
650 res = TRUE;
651 }
652 g_mutex_unlock (&mutex);
653
654 return res;
655 }
656
657 gboolean
658 vmlist_vm_exists(
659 guint32 vmid)
660 {
661 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
662 g_return_val_if_fail(vmid != 0, FALSE);
663
664 g_mutex_lock (&mutex);
665
666 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
667
668 g_mutex_unlock (&mutex);
669
670 return res != NULL;
671 }
672
673 void
674 vmlist_delete_vm(
675 guint32 vmid)
676 {
677 g_return_if_fail(cfs_status.vmlist != NULL);
678 g_return_if_fail(vmid != 0);
679
680 g_mutex_lock (&mutex);
681
682 cfs_status.vmlist_version++;
683
684 g_hash_table_remove(cfs_status.vmlist, &vmid);
685
686 g_mutex_unlock (&mutex);
687 }
688
689 void cfs_status_set_vmlist(
690 GHashTable *vmlist)
691 {
692 g_return_if_fail(vmlist != NULL);
693
694 g_mutex_lock (&mutex);
695
696 cfs_status.vmlist_version++;
697
698 if (cfs_status.vmlist)
699 g_hash_table_destroy(cfs_status.vmlist);
700
701 cfs_status.vmlist = vmlist;
702
703 g_mutex_unlock (&mutex);
704 }
705
706 int
707 cfs_create_vmlist_msg(GString *str)
708 {
709 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
710 g_return_val_if_fail(str != NULL, -EINVAL);
711
712 g_mutex_lock (&mutex);
713
714 g_string_append_printf(str,"{\n");
715
716 GHashTable *ht = cfs_status.vmlist;
717
718 guint count = g_hash_table_size(ht);
719
720 if (!count) {
721 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
722 } else {
723 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
724
725 g_string_append_printf(str,"\"ids\": {\n");
726
727 GHashTableIter iter;
728 gpointer key, value;
729
730 g_hash_table_iter_init (&iter, ht);
731
732 int first = 1;
733 while (g_hash_table_iter_next (&iter, &key, &value)) {
734 vminfo_t *vminfo = (vminfo_t *)value;
735 char *type;
736 if (vminfo->vmtype == VMTYPE_QEMU) {
737 type = "qemu";
738 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
739 type = "openvz";
740 } else if (vminfo->vmtype == VMTYPE_LXC) {
741 type = "lxc";
742 } else {
743 type = "unknown";
744 }
745
746 if (!first)
747 g_string_append_printf(str, ",\n");
748 first = 0;
749
750 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
751 vminfo->vmid, vminfo->nodename, type, vminfo->version);
752 }
753
754 g_string_append_printf(str,"}\n");
755 }
756 g_string_append_printf(str,"\n}\n");
757
758 g_mutex_unlock (&mutex);
759
760 return 0;
761 }
762
763 void
764 record_memdb_change(const char *path)
765 {
766 g_return_if_fail(cfs_status.memdb_changes != 0);
767
768 memdb_change_t *ce;
769
770 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
771 ce->version++;
772 }
773 }
774
775 void
776 record_memdb_reload(void)
777 {
778 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
779 memdb_change_array[i].version++;
780 }
781 }
782
783 static gboolean
784 kventry_hash_set(
785 GHashTable *kvhash,
786 const char *key,
787 gconstpointer data,
788 size_t len)
789 {
790 g_return_val_if_fail(kvhash != NULL, FALSE);
791 g_return_val_if_fail(key != NULL, FALSE);
792 g_return_val_if_fail(data != NULL, FALSE);
793
794 kventry_t *entry;
795 if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
796 g_free(entry->data);
797 entry->data = g_memdup(data, len);
798 entry->len = len;
799 entry->version++;
800 } else {
801 kventry_t *entry = g_new0(kventry_t, 1);
802
803 entry->key = g_strdup(key);
804 entry->data = g_memdup(data, len);
805 entry->len = len;
806
807 g_hash_table_replace(kvhash, entry->key, entry);
808 }
809
810 return TRUE;
811 }
812
813 static const char *rrd_def_node[] = {
814 "DS:loadavg:GAUGE:120:0:U",
815 "DS:maxcpu:GAUGE:120:0:U",
816 "DS:cpu:GAUGE:120:0:U",
817 "DS:iowait:GAUGE:120:0:U",
818 "DS:memtotal:GAUGE:120:0:U",
819 "DS:memused:GAUGE:120:0:U",
820 "DS:swaptotal:GAUGE:120:0:U",
821 "DS:swapused:GAUGE:120:0:U",
822 "DS:roottotal:GAUGE:120:0:U",
823 "DS:rootused:GAUGE:120:0:U",
824 "DS:netin:DERIVE:120:0:U",
825 "DS:netout:DERIVE:120:0:U",
826
827 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
828 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
829 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
830 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
831 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
832
833 "RRA:MAX:0.5:1:70", // 1 min max - one hour
834 "RRA:MAX:0.5:30:70", // 30 min max - one day
835 "RRA:MAX:0.5:180:70", // 3 hour max - one week
836 "RRA:MAX:0.5:720:70", // 12 hour max - one month
837 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
838 NULL,
839 };
840
841 static const char *rrd_def_vm[] = {
842 "DS:maxcpu:GAUGE:120:0:U",
843 "DS:cpu:GAUGE:120:0:U",
844 "DS:maxmem:GAUGE:120:0:U",
845 "DS:mem:GAUGE:120:0:U",
846 "DS:maxdisk:GAUGE:120:0:U",
847 "DS:disk:GAUGE:120:0:U",
848 "DS:netin:DERIVE:120:0:U",
849 "DS:netout:DERIVE:120:0:U",
850 "DS:diskread:DERIVE:120:0:U",
851 "DS:diskwrite:DERIVE:120:0:U",
852
853 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
854 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
855 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
856 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
857 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
858
859 "RRA:MAX:0.5:1:70", // 1 min max - one hour
860 "RRA:MAX:0.5:30:70", // 30 min max - one day
861 "RRA:MAX:0.5:180:70", // 3 hour max - one week
862 "RRA:MAX:0.5:720:70", // 12 hour max - one month
863 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
864 NULL,
865 };
866
867 static const char *rrd_def_storage[] = {
868 "DS:total:GAUGE:120:0:U",
869 "DS:used:GAUGE:120:0:U",
870
871 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
872 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
873 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
874 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
875 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
876
877 "RRA:MAX:0.5:1:70", // 1 min max - one hour
878 "RRA:MAX:0.5:30:70", // 30 min max - one day
879 "RRA:MAX:0.5:180:70", // 3 hour max - one week
880 "RRA:MAX:0.5:720:70", // 12 hour max - one month
881 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
882 NULL,
883 };
884
885 #define RRDDIR "/var/lib/rrdcached/db"
886
887 static void
888 create_rrd_file(
889 const char *filename,
890 int argcount,
891 const char *rrddef[])
892 {
893 /* start at day boundary */
894 time_t ctime;
895 time(&ctime);
896 struct tm *ltm = localtime(&ctime);
897 ltm->tm_sec = 0;
898 ltm->tm_min = 0;
899 ltm->tm_hour = 0;
900
901 rrd_clear_error();
902 if (rrd_create_r(filename, 60, timelocal(ltm), argcount, rrddef)) {
903 cfs_message("RRD create error %s: %s", filename, rrd_get_error());
904 }
905 }
906
907 static inline const char *
908 rrd_skip_data(
909 const char *data,
910 int count)
911 {
912 int found = 0;
913 while (*data && found < count) {
914 if (*data++ == ':')
915 found++;
916 }
917 return data;
918 }
919
920 static void
921 update_rrd_data(
922 const char *key,
923 gconstpointer data,
924 size_t len)
925 {
926 g_return_if_fail(key != NULL);
927 g_return_if_fail(data != NULL);
928 g_return_if_fail(len > 0);
929 g_return_if_fail(len < 4096);
930
931 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
932
933 int use_daemon = 1;
934 if (rrdc_connect(rrdcsock) != 0)
935 use_daemon = 0;
936
937 char *filename = NULL;
938
939 int skip = 0;
940
941 if (strncmp(key, "pve2-node/", 10) == 0) {
942 const char *node = key + 10;
943
944 skip = 2;
945
946 if (strchr(node, '/') != NULL)
947 goto keyerror;
948
949 if (strlen(node) < 1)
950 goto keyerror;
951
952 filename = g_strdup_printf(RRDDIR "/%s", key);
953
954 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
955
956 mkdir(RRDDIR "/pve2-node", 0755);
957 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
958 create_rrd_file(filename, argcount, rrd_def_node);
959 }
960
961 } else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
962 (strncmp(key, "pve2.3-vm/", 10) == 0)) {
963 const char *vmid;
964
965 if (strncmp(key, "pve2-vm/", 8) == 0) {
966 vmid = key + 8;
967 skip = 2;
968 } else {
969 vmid = key + 10;
970 skip = 4;
971 }
972
973 if (strchr(vmid, '/') != NULL)
974 goto keyerror;
975
976 if (strlen(vmid) < 1)
977 goto keyerror;
978
979 filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);
980
981 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
982
983 mkdir(RRDDIR "/pve2-vm", 0755);
984 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
985 create_rrd_file(filename, argcount, rrd_def_vm);
986 }
987
988 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
989 const char *node = key + 13;
990
991 const char *storage = node;
992 while (*storage && *storage != '/')
993 storage++;
994
995 if (*storage != '/' || ((storage - node) < 1))
996 goto keyerror;
997
998 storage++;
999
1000 if (strchr(storage, '/') != NULL)
1001 goto keyerror;
1002
1003 if (strlen(storage) < 1)
1004 goto keyerror;
1005
1006 filename = g_strdup_printf(RRDDIR "/%s", key);
1007
1008 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1009
1010 mkdir(RRDDIR "/pve2-storage", 0755);
1011
1012 char *dir = g_path_get_dirname(filename);
1013 mkdir(dir, 0755);
1014 g_free(dir);
1015
1016 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
1017 create_rrd_file(filename, argcount, rrd_def_storage);
1018 }
1019
1020 } else {
1021 goto keyerror;
1022 }
1023
1024 const char *dp = skip ? rrd_skip_data(data, skip) : data;
1025
1026 const char *update_args[] = { dp, NULL };
1027
1028 if (use_daemon) {
1029 int status;
1030 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1031 cfs_message("RRDC update error %s: %d", filename, status);
1032 rrdc_disconnect();
1033 rrd_clear_error();
1034 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1035 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1036 }
1037 }
1038
1039 } else {
1040 rrd_clear_error();
1041 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1042 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1043 }
1044 }
1045
1046 ret:
1047 if (filename)
1048 g_free(filename);
1049
1050 return;
1051
1052 keyerror:
1053 cfs_critical("RRD update error: unknown/wrong key %s", key);
1054 goto ret;
1055 }
1056
1057 static gboolean
1058 rrd_entry_is_old(
1059 gpointer key,
1060 gpointer value,
1061 gpointer user_data)
1062 {
1063 rrdentry_t *entry = (rrdentry_t *)value;
1064 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1065
1066 int diff = ctime - entry->time;
1067
1068 /* remove everything older than 5 minutes */
1069 int expire = 60*5;
1070
1071 return (diff > expire) ? TRUE : FALSE;
1072 }
1073
1074 static char *rrd_dump_buf = NULL;
1075 static time_t rrd_dump_last = 0;
1076
1077 void
1078 cfs_rrd_dump(GString *str)
1079 {
1080 time_t ctime;
1081
1082 g_mutex_lock (&mutex);
1083
1084 time(&ctime);
1085 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1086 g_string_assign(str, rrd_dump_buf);
1087 g_mutex_unlock (&mutex);
1088 return;
1089 }
1090
1091 /* remove old data */
1092 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1093 GUINT_TO_POINTER(ctime));
1094
1095 g_string_set_size(str, 0);
1096
1097 GHashTableIter iter;
1098 gpointer key, value;
1099
1100 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1101
1102 while (g_hash_table_iter_next (&iter, &key, &value)) {
1103 rrdentry_t *entry = (rrdentry_t *)value;
1104 g_string_append(str, key);
1105 g_string_append(str, ":");
1106 g_string_append(str, entry->data);
1107 g_string_append(str, "\n");
1108 }
1109
1110 g_string_append_c(str, 0); // never return undef
1111
1112 rrd_dump_last = ctime;
1113 if (rrd_dump_buf)
1114 g_free(rrd_dump_buf);
1115 rrd_dump_buf = g_strdup(str->str);
1116
1117 g_mutex_unlock (&mutex);
1118 }
1119
1120 static gboolean
1121 nodeip_hash_set(
1122 GHashTable *iphash,
1123 const char *nodename,
1124 const char *ip,
1125 size_t len)
1126 {
1127 g_return_val_if_fail(iphash != NULL, FALSE);
1128 g_return_val_if_fail(nodename != NULL, FALSE);
1129 g_return_val_if_fail(ip != NULL, FALSE);
1130 g_return_val_if_fail(len > 0, FALSE);
1131 g_return_val_if_fail(len < 256, FALSE);
1132 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1133
1134 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1135
1136 if (!oldip || (strcmp(oldip, ip) != 0)) {
1137 cfs_status.clinfo_version++;
1138 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1139 }
1140
1141 return TRUE;
1142 }
1143
1144 static gboolean
1145 rrdentry_hash_set(
1146 GHashTable *rrdhash,
1147 const char *key,
1148 const char *data,
1149 size_t len)
1150 {
1151 g_return_val_if_fail(rrdhash != NULL, FALSE);
1152 g_return_val_if_fail(key != NULL, FALSE);
1153 g_return_val_if_fail(data != NULL, FALSE);
1154 g_return_val_if_fail(len > 0, FALSE);
1155 g_return_val_if_fail(len < 4096, FALSE);
1156 g_return_val_if_fail(data[len-1] == 0, FALSE);
1157
1158 rrdentry_t *entry;
1159 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1160 g_free(entry->data);
1161 entry->data = g_memdup(data, len);
1162 entry->len = len;
1163 entry->time = time(NULL);
1164 } else {
1165 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1166
1167 entry->key = g_strdup(key);
1168 entry->data = g_memdup(data, len);
1169 entry->len = len;
1170 entry->time = time(NULL);
1171
1172 g_hash_table_replace(rrdhash, entry->key, entry);
1173 }
1174
1175 update_rrd_data(key, data, len);
1176
1177 return TRUE;
1178 }
1179
1180 static int
1181 kvstore_send_update_message(
1182 dfsm_t *dfsm,
1183 const char *key,
1184 gpointer data,
1185 guint32 len)
1186 {
1187 if (!dfsm_is_initialized(dfsm))
1188 return -EACCES;
1189
1190 struct iovec iov[2];
1191
1192 char name[256];
1193 g_strlcpy(name, key, sizeof(name));
1194
1195 iov[0].iov_base = &name;
1196 iov[0].iov_len = sizeof(name);
1197
1198 iov[1].iov_base = (char *)data;
1199 iov[1].iov_len = len;
1200
1201 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1202 return 0;
1203
1204 return -EACCES;
1205 }
1206
1207 static clog_entry_t *
1208 kvstore_parse_log_message(
1209 const void *msg,
1210 size_t msg_len)
1211 {
1212 g_return_val_if_fail(msg != NULL, NULL);
1213
1214 if (msg_len < sizeof(clog_entry_t)) {
1215 cfs_critical("received short log message (%zu < %zu)", msg_len, sizeof(clog_entry_t));
1216 return NULL;
1217 }
1218
1219 clog_entry_t *entry = (clog_entry_t *)msg;
1220
1221 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1222 entry->ident_len + entry->tag_len + entry->msg_len;
1223
1224 if (msg_len != size) {
1225 cfs_critical("received log message with wrong size (%zu != %u)", msg_len, size);
1226 return NULL;
1227 }
1228
1229 msg = entry->data;
1230
1231 if (*((char *)msg + entry->node_len - 1)) {
1232 cfs_critical("unterminated string in log message");
1233 return NULL;
1234 }
1235 msg += entry->node_len;
1236
1237 if (*((char *)msg + entry->ident_len - 1)) {
1238 cfs_critical("unterminated string in log message");
1239 return NULL;
1240 }
1241 msg += entry->ident_len;
1242
1243 if (*((char *)msg + entry->tag_len - 1)) {
1244 cfs_critical("unterminated string in log message");
1245 return NULL;
1246 }
1247 msg += entry->tag_len;
1248
1249 if (*((char *)msg + entry->msg_len - 1)) {
1250 cfs_critical("unterminated string in log message");
1251 return NULL;
1252 }
1253
1254 return entry;
1255 }
1256
1257 static gboolean
1258 kvstore_parse_update_message(
1259 const void *msg,
1260 size_t msg_len,
1261 const char **key,
1262 gconstpointer *data,
1263 guint32 *len)
1264 {
1265 g_return_val_if_fail(msg != NULL, FALSE);
1266 g_return_val_if_fail(key != NULL, FALSE);
1267 g_return_val_if_fail(data != NULL, FALSE);
1268 g_return_val_if_fail(len != NULL, FALSE);
1269
1270 if (msg_len < 256) {
1271 cfs_critical("received short kvstore message (%zu < 256)", msg_len);
1272 return FALSE;
1273 }
1274
1275 /* test if key is null terminated */
1276 int i = 0;
1277 for (i = 0; i < 256; i++)
1278 if (((char *)msg)[i] == 0)
1279 break;
1280
1281 if (i == 256)
1282 return FALSE;
1283
1284
1285 *len = msg_len - 256;
1286 *key = msg;
1287 *data = msg + 256;
1288
1289 return TRUE;
1290 }
1291
1292 int
1293 cfs_create_status_msg(
1294 GString *str,
1295 const char *nodename,
1296 const char *key)
1297 {
1298 g_return_val_if_fail(str != NULL, -EINVAL);
1299 g_return_val_if_fail(key != NULL, -EINVAL);
1300
1301 int res = -ENOENT;
1302
1303 GHashTable *kvhash = NULL;
1304
1305 g_mutex_lock (&mutex);
1306
1307 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1308 kvhash = cfs_status.kvhash;
1309 } else if (cfs_status.clinfo && cfs_status.clinfo->nodes_byname) {
1310 cfs_clnode_t *clnode;
1311 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1312 kvhash = clnode->kvhash;
1313 }
1314
1315 kventry_t *entry;
1316 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1317 g_string_append_len(str, entry->data, entry->len);
1318 res = 0;
1319 }
1320
1321 g_mutex_unlock (&mutex);
1322
1323 return res;
1324 }
1325
1326 int
1327 cfs_status_set(
1328 const char *key,
1329 gpointer data,
1330 size_t len)
1331 {
1332 g_return_val_if_fail(key != NULL, FALSE);
1333 g_return_val_if_fail(data != NULL, FALSE);
1334 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1335
1336 if (len > CFS_MAX_STATUS_SIZE)
1337 return -EFBIG;
1338
1339 g_mutex_lock (&mutex);
1340
1341 gboolean res;
1342
1343 if (strncmp(key, "rrd/", 4) == 0) {
1344 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1345 } else if (!strcmp(key, "nodeip")) {
1346 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1347 } else {
1348 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1349 }
1350 g_mutex_unlock (&mutex);
1351
1352 if (cfs_status.kvstore)
1353 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1354
1355 return res ? 0 : -ENOMEM;
1356 }
1357
1358 gboolean
1359 cfs_kvstore_node_set(
1360 uint32_t nodeid,
1361 const char *key,
1362 gconstpointer data,
1363 size_t len)
1364 {
1365 g_return_val_if_fail(nodeid != 0, FALSE);
1366 g_return_val_if_fail(key != NULL, FALSE);
1367 g_return_val_if_fail(data != NULL, FALSE);
1368
1369 g_mutex_lock (&mutex);
1370
1371 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1372 goto ret; /* ignore */
1373
1374 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1375 if (!clnode)
1376 goto ret; /* ignore */
1377
1378 cfs_debug("got node %d status update %s", nodeid, key);
1379
1380 if (strncmp(key, "rrd/", 4) == 0) {
1381 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1382 } else if (!strcmp(key, "nodeip")) {
1383 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1384 } else {
1385 if (!clnode->kvhash) {
1386 if (!(clnode->kvhash = kventry_hash_new())) {
1387 goto ret; /*ignore */
1388 }
1389 }
1390
1391 kventry_hash_set(clnode->kvhash, key, data, len);
1392
1393 }
1394 ret:
1395 g_mutex_unlock (&mutex);
1396
1397 return TRUE;
1398 }
1399
1400 static gboolean
1401 cfs_kvstore_sync(void)
1402 {
1403 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1404 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1405
1406 gboolean res = TRUE;
1407
1408 g_mutex_lock (&mutex);
1409
1410 GHashTable *ht = cfs_status.kvhash;
1411 GHashTableIter iter;
1412 gpointer key, value;
1413
1414 g_hash_table_iter_init (&iter, ht);
1415
1416 while (g_hash_table_iter_next (&iter, &key, &value)) {
1417 kventry_t *entry = (kventry_t *)value;
1418 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1419 }
1420
1421 g_mutex_unlock (&mutex);
1422
1423 return res;
1424 }
1425
1426 static int
1427 dfsm_deliver(
1428 dfsm_t *dfsm,
1429 gpointer data,
1430 int *res_ptr,
1431 uint32_t nodeid,
1432 uint32_t pid,
1433 uint16_t msg_type,
1434 uint32_t msg_time,
1435 const void *msg,
1436 size_t msg_len)
1437 {
1438 g_return_val_if_fail(dfsm != NULL, -1);
1439 g_return_val_if_fail(msg != NULL, -1);
1440 g_return_val_if_fail(res_ptr != NULL, -1);
1441
1442 /* ignore message for ourself */
1443 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1444 goto ret;
1445
1446 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1447 const char *key;
1448 gconstpointer data;
1449 guint32 len;
1450 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1451 cfs_kvstore_node_set(nodeid, key, data, len);
1452 } else {
1453 cfs_critical("cant parse update message");
1454 }
1455 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1456 cfs_message("received log"); // fixme: remove
1457 const clog_entry_t *entry;
1458 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1459 clusterlog_insert(cfs_status.clusterlog, entry);
1460 } else {
1461 cfs_critical("cant parse log message");
1462 }
1463 } else {
1464 cfs_critical("received unknown message type %d\n", msg_type);
1465 goto fail;
1466 }
1467
1468 ret:
1469 *res_ptr = 0;
1470 return 1;
1471
1472 fail:
1473 *res_ptr = -EACCES;
1474 return 1;
1475 }
1476
1477 static void
1478 dfsm_confchg(
1479 dfsm_t *dfsm,
1480 gpointer data,
1481 const struct cpg_address *member_list,
1482 size_t member_list_entries)
1483 {
1484 g_return_if_fail(dfsm != NULL);
1485 g_return_if_fail(member_list != NULL);
1486
1487 cfs_debug("enter %s", __func__);
1488
1489 g_mutex_lock (&mutex);
1490
1491 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1492
1493 if (clinfo && clinfo->nodes_byid) {
1494
1495 GHashTable *ht = clinfo->nodes_byid;
1496 GHashTableIter iter;
1497 gpointer key, value;
1498
1499 g_hash_table_iter_init (&iter, ht);
1500
1501 while (g_hash_table_iter_next (&iter, &key, &value)) {
1502 cfs_clnode_t *node = (cfs_clnode_t *)value;
1503 node->online = FALSE;
1504 }
1505
1506 for (int i = 0; i < member_list_entries; i++) {
1507 cfs_clnode_t *node;
1508 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1509 node->online = TRUE;
1510 }
1511 }
1512
1513 cfs_status.clinfo_version++;
1514 }
1515
1516 g_mutex_unlock (&mutex);
1517 }
1518
1519 static gpointer
1520 dfsm_get_state(
1521 dfsm_t *dfsm,
1522 gpointer data,
1523 unsigned int *res_len)
1524 {
1525 g_return_val_if_fail(dfsm != NULL, NULL);
1526
1527 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1528
1529 return msg;
1530 }
1531
1532 static int
1533 dfsm_process_update(
1534 dfsm_t *dfsm,
1535 gpointer data,
1536 dfsm_sync_info_t *syncinfo,
1537 uint32_t nodeid,
1538 uint32_t pid,
1539 const void *msg,
1540 size_t msg_len)
1541 {
1542 cfs_critical("%s: received unexpected update message", __func__);
1543
1544 return -1;
1545 }
1546
1547 static int
1548 dfsm_process_state_update(
1549 dfsm_t *dfsm,
1550 gpointer data,
1551 dfsm_sync_info_t *syncinfo)
1552 {
1553 g_return_val_if_fail(dfsm != NULL, -1);
1554 g_return_val_if_fail(syncinfo != NULL, -1);
1555
1556 clog_base_t *clog[syncinfo->node_count];
1557
1558 int local_index = -1;
1559 for (int i = 0; i < syncinfo->node_count; i++) {
1560 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1561 ni->synced = 1;
1562
1563 if (syncinfo->local == ni)
1564 local_index = i;
1565
1566 clog_base_t *base = (clog_base_t *)ni->state;
1567 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1568 clog[i] = ni->state;
1569 } else {
1570 cfs_critical("received log with wrong size %u", ni->state_len);
1571 clog[i] = NULL;
1572 }
1573 }
1574
1575 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1576 cfs_critical("unable to merge log files");
1577 }
1578
1579 cfs_kvstore_sync();
1580
1581 return 1;
1582 }
1583
1584 static int
1585 dfsm_commit(
1586 dfsm_t *dfsm,
1587 gpointer data,
1588 dfsm_sync_info_t *syncinfo)
1589 {
1590 g_return_val_if_fail(dfsm != NULL, -1);
1591 g_return_val_if_fail(syncinfo != NULL, -1);
1592
1593 return 1;
1594 }
1595
1596 static void
1597 dfsm_synced(dfsm_t *dfsm)
1598 {
1599 g_return_if_fail(dfsm != NULL);
1600
1601 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1602 if (!ip)
1603 ip = cfs.ip;
1604
1605 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1606 }
1607
1608 static int
1609 dfsm_cleanup(
1610 dfsm_t *dfsm,
1611 gpointer data,
1612 dfsm_sync_info_t *syncinfo)
1613 {
1614 return 1;
1615 }
1616
1617 static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1618 .dfsm_deliver_fn = dfsm_deliver,
1619 .dfsm_confchg_fn = dfsm_confchg,
1620
1621 .dfsm_get_state_fn = dfsm_get_state,
1622 .dfsm_process_state_update_fn = dfsm_process_state_update,
1623 .dfsm_process_update_fn = dfsm_process_update,
1624 .dfsm_commit_fn = dfsm_commit,
1625 .dfsm_cleanup_fn = dfsm_cleanup,
1626 .dfsm_synced_fn = dfsm_synced,
1627 };
1628
1629 dfsm_t *
1630 cfs_status_dfsm_new(void)
1631 {
1632 g_mutex_lock (&mutex);
1633
1634 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1635 0, &kvstore_dfsm_callbacks);
1636 g_mutex_unlock (&mutex);
1637
1638 return cfs_status.kvstore;
1639 }
1640
1641 gboolean
1642 cfs_is_quorate(void)
1643 {
1644 g_mutex_lock (&mutex);
1645 gboolean res = cfs_status.quorate;
1646 g_mutex_unlock (&mutex);
1647
1648 return res;
1649 }
1650
1651 void
1652 cfs_set_quorate(
1653 uint32_t quorate,
1654 gboolean quiet)
1655 {
1656 g_mutex_lock (&mutex);
1657
1658 uint32_t prev_quorate = cfs_status.quorate;
1659 cfs_status.quorate = quorate;
1660
1661 if (!prev_quorate && cfs_status.quorate) {
1662 if (!quiet)
1663 cfs_message("node has quorum");
1664 }
1665
1666 if (prev_quorate && !cfs_status.quorate) {
1667 if (!quiet)
1668 cfs_message("node lost quorum");
1669 }
1670
1671 g_mutex_unlock (&mutex);
1672 }
1673