]> git.proxmox.com Git - pve-cluster.git/blob - data/src/status.c
fix #1559: pmxcfs: add missing lock when dumping .rrd
[pve-cluster.git] / data / src / status.c
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "status"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdio.h>
28 #include <stdint.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <glib.h>
32 #include <sys/syslog.h>
33 #include <rrd.h>
34 #include <rrd_client.h>
35 #include <time.h>
36
37 #include "cfs-utils.h"
38 #include "status.h"
39 #include "logger.h"
40
41 #define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
42
43 typedef enum {
44 KVSTORE_MESSAGE_UPDATE = 1,
45 KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
46 KVSTORE_MESSAGE_LOG = 3,
47 } kvstore_message_t;
48
49 static uint32_t vminfo_version_counter;
50
51 typedef struct {
52 uint32_t vmid;
53 char *nodename;
54 int vmtype;
55 uint32_t version;
56 } vminfo_t;
57
58 typedef struct {
59 char *key;
60 gpointer data;
61 size_t len;
62 uint32_t version;
63 } kventry_t;
64
65 typedef struct {
66 char *key;
67 gpointer data;
68 size_t len;
69 uint32_t time;
70 } rrdentry_t;
71
72 typedef struct {
73 char *path;
74 uint32_t version;
75 } memdb_change_t;
76
77 static memdb_change_t memdb_change_array[] = {
78 { .path = "corosync.conf" },
79 { .path = "corosync.conf.new" },
80 { .path = "storage.cfg" },
81 { .path = "user.cfg" },
82 { .path = "domains.cfg" },
83 { .path = "priv/shadow.cfg" },
84 { .path = "datacenter.cfg" },
85 { .path = "vzdump.cron" },
86 { .path = "ha/crm_commands" },
87 { .path = "ha/manager_status" },
88 { .path = "ha/resources.cfg" },
89 { .path = "ha/groups.cfg" },
90 { .path = "ha/fence.cfg" },
91 { .path = "status.cfg" },
92 { .path = "replication.cfg" },
93 };
94
95 static GMutex mutex;
96
97 typedef struct {
98 time_t start_time;
99
100 uint32_t quorate;
101
102 cfs_clinfo_t *clinfo;
103 uint32_t clinfo_version;
104
105 GHashTable *vmlist;
106 uint32_t vmlist_version;
107
108 dfsm_t *kvstore;
109 GHashTable *kvhash;
110 GHashTable *rrdhash;
111 GHashTable *iphash;
112
113 GHashTable *memdb_changes;
114
115 clusterlog_t *clusterlog;
116 } cfs_status_t;
117
118 static cfs_status_t cfs_status;
119
120 struct cfs_clnode {
121 char *name;
122 uint32_t nodeid;
123 uint32_t votes;
124 gboolean online;
125 GHashTable *kvhash;
126 };
127
128 struct cfs_clinfo {
129 char *cluster_name;
130 uint32_t cman_version;
131
132 GHashTable *nodes_byid;
133 GHashTable *nodes_byname;
134 };
135
136 static guint
137 g_int32_hash (gconstpointer v)
138 {
139 return *(const uint32_t *) v;
140 }
141
142 static gboolean
143 g_int32_equal (gconstpointer v1,
144 gconstpointer v2)
145 {
146 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
147 }
148
149 static void vminfo_free(vminfo_t *vminfo)
150 {
151 g_return_if_fail(vminfo != NULL);
152
153 if (vminfo->nodename)
154 g_free(vminfo->nodename);
155
156
157 g_free(vminfo);
158 }
159
160 void cfs_clnode_destroy(
161 cfs_clnode_t *clnode)
162 {
163 g_return_if_fail(clnode != NULL);
164
165 if (clnode->kvhash)
166 g_hash_table_destroy(clnode->kvhash);
167
168 if (clnode->name)
169 g_free(clnode->name);
170
171 g_free(clnode);
172 }
173
174 cfs_clnode_t *cfs_clnode_new(
175 const char *name,
176 uint32_t nodeid,
177 uint32_t votes)
178 {
179 g_return_val_if_fail(name != NULL, NULL);
180
181 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
182 if (!clnode)
183 return NULL;
184
185 clnode->name = g_strdup(name);
186 clnode->nodeid = nodeid;
187 clnode->votes = votes;
188
189 return clnode;
190 }
191
192 gboolean cfs_clinfo_destroy(
193 cfs_clinfo_t *clinfo)
194 {
195 g_return_val_if_fail(clinfo != NULL, FALSE);
196
197 if (clinfo->cluster_name)
198 g_free(clinfo->cluster_name);
199
200 if (clinfo->nodes_byname)
201 g_hash_table_destroy(clinfo->nodes_byname);
202
203 if (clinfo->nodes_byid)
204 g_hash_table_destroy(clinfo->nodes_byid);
205
206 g_free(clinfo);
207
208 return TRUE;
209 }
210
211 cfs_clinfo_t *cfs_clinfo_new(
212 const char *cluster_name,
213 uint32_t cman_version)
214 {
215 g_return_val_if_fail(cluster_name != NULL, NULL);
216
217 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
218 if (!clinfo)
219 return NULL;
220
221 clinfo->cluster_name = g_strdup(cluster_name);
222 clinfo->cman_version = cman_version;
223
224 if (!(clinfo->nodes_byid = g_hash_table_new_full(
225 g_int32_hash, g_int32_equal, NULL,
226 (GDestroyNotify)cfs_clnode_destroy)))
227 goto fail;
228
229 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
230 goto fail;
231
232 return clinfo;
233
234 fail:
235 cfs_clinfo_destroy(clinfo);
236
237 return NULL;
238 }
239
240 gboolean cfs_clinfo_add_node(
241 cfs_clinfo_t *clinfo,
242 cfs_clnode_t *clnode)
243 {
244 g_return_val_if_fail(clinfo != NULL, FALSE);
245 g_return_val_if_fail(clnode != NULL, FALSE);
246
247 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
248 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
249
250 return TRUE;
251 }
252
253 int
254 cfs_create_memberlist_msg(
255 GString *str)
256 {
257 g_return_val_if_fail(str != NULL, -EINVAL);
258
259 g_mutex_lock (&mutex);
260
261 g_string_append_printf(str,"{\n");
262
263 guint nodecount = 0;
264
265 cfs_clinfo_t *clinfo = cfs_status.clinfo;
266
267 if (clinfo && clinfo->nodes_byid)
268 nodecount = g_hash_table_size(clinfo->nodes_byid);
269
270 if (nodecount) {
271 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
272 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
273
274 g_string_append_printf(str, "\"cluster\": { ");
275 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
276 "\"nodes\": %d, \"quorate\": %d ",
277 clinfo->cluster_name, clinfo->cman_version,
278 nodecount, cfs_status.quorate);
279
280 g_string_append_printf(str,"},\n");
281 g_string_append_printf(str,"\"nodelist\": {\n");
282
283 GHashTable *ht = clinfo->nodes_byid;
284 GHashTableIter iter;
285 gpointer key, value;
286
287 g_hash_table_iter_init (&iter, ht);
288
289 int i = 0;
290 while (g_hash_table_iter_next (&iter, &key, &value)) {
291 cfs_clnode_t *node = (cfs_clnode_t *)value;
292 if (i) g_string_append_printf(str, ",\n");
293 i++;
294
295 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
296 node->name, node->nodeid, node->online);
297
298
299 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
300 if (ip) {
301 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
302 }
303
304 g_string_append_printf(str, "}");
305
306 }
307 g_string_append_printf(str,"\n }\n");
308 } else {
309 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
310 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
311 }
312
313 g_string_append_printf(str,"}\n");
314
315 g_mutex_unlock (&mutex);
316
317 return 0;
318 }
319
320 static void
321 kventry_free(kventry_t *entry)
322 {
323 g_return_if_fail(entry != NULL);
324
325 g_free(entry->key);
326 g_free(entry->data);
327 g_free(entry);
328 }
329
330 static GHashTable *
331 kventry_hash_new(void)
332 {
333 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
334 (GDestroyNotify)kventry_free);
335 }
336
337 static void
338 rrdentry_free(rrdentry_t *entry)
339 {
340 g_return_if_fail(entry != NULL);
341
342 g_free(entry->key);
343 g_free(entry->data);
344 g_free(entry);
345 }
346
347 static GHashTable *
348 rrdentry_hash_new(void)
349 {
350 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
351 (GDestroyNotify)rrdentry_free);
352 }
353
354 void
355 cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
356 {
357 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
358 }
359
360 void
361 cfs_cluster_log(clog_entry_t *entry)
362 {
363 g_return_if_fail(entry != NULL);
364
365 clusterlog_insert(cfs_status.clusterlog, entry);
366
367 if (cfs_status.kvstore) {
368 struct iovec iov[1];
369 iov[0].iov_base = (char *)entry;
370 iov[0].iov_len = clog_entry_size(entry);
371
372 if (dfsm_is_initialized(cfs_status.kvstore))
373 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
374 }
375 }
376
377 void cfs_status_init(void)
378 {
379 g_mutex_lock (&mutex);
380
381 cfs_status.start_time = time(NULL);
382
383 cfs_status.vmlist = vmlist_hash_new();
384
385 cfs_status.kvhash = kventry_hash_new();
386
387 cfs_status.rrdhash = rrdentry_hash_new();
388
389 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
390
391 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
392
393 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
394 g_hash_table_replace(cfs_status.memdb_changes,
395 memdb_change_array[i].path,
396 &memdb_change_array[i]);
397 }
398
399 cfs_status.clusterlog = clusterlog_new();
400
401 // fixme:
402 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
403 LOG_INFO, "starting cluster log");
404
405 g_mutex_unlock (&mutex);
406 }
407
408 void cfs_status_cleanup(void)
409 {
410 g_mutex_lock (&mutex);
411
412 cfs_status.clinfo_version++;
413
414 if (cfs_status.clinfo) {
415 cfs_clinfo_destroy(cfs_status.clinfo);
416 cfs_status.clinfo = NULL;
417 }
418
419 if (cfs_status.vmlist) {
420 g_hash_table_destroy(cfs_status.vmlist);
421 cfs_status.vmlist = NULL;
422 }
423
424 if (cfs_status.kvhash) {
425 g_hash_table_destroy(cfs_status.kvhash);
426 cfs_status.kvhash = NULL;
427 }
428
429 if (cfs_status.rrdhash) {
430 g_hash_table_destroy(cfs_status.rrdhash);
431 cfs_status.rrdhash = NULL;
432 }
433
434 if (cfs_status.iphash) {
435 g_hash_table_destroy(cfs_status.iphash);
436 cfs_status.iphash = NULL;
437 }
438
439 if (cfs_status.clusterlog)
440 clusterlog_destroy(cfs_status.clusterlog);
441
442 g_mutex_unlock (&mutex);
443 }
444
445 void cfs_status_set_clinfo(
446 cfs_clinfo_t *clinfo)
447 {
448 g_return_if_fail(clinfo != NULL);
449
450 g_mutex_lock (&mutex);
451
452 cfs_status.clinfo_version++;
453
454 cfs_clinfo_t *old = cfs_status.clinfo;
455
456 cfs_status.clinfo = clinfo;
457
458 cfs_message("update cluster info (cluster name %s, version = %d)",
459 clinfo->cluster_name, clinfo->cman_version);
460
461
462 if (old && old->nodes_byid && clinfo->nodes_byid) {
463 /* copy kvstore */
464 GHashTable *ht = clinfo->nodes_byid;
465 GHashTableIter iter;
466 gpointer key, value;
467
468 g_hash_table_iter_init (&iter, ht);
469
470 while (g_hash_table_iter_next (&iter, &key, &value)) {
471 cfs_clnode_t *node = (cfs_clnode_t *)value;
472 cfs_clnode_t *oldnode;
473 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
474 node->online = oldnode->online;
475 node->kvhash = oldnode->kvhash;
476 oldnode->kvhash = NULL;
477 }
478 }
479
480 }
481
482 if (old)
483 cfs_clinfo_destroy(old);
484
485
486 g_mutex_unlock (&mutex);
487 }
488
489 static void
490 dump_kvstore_versions(
491 GString *str,
492 GHashTable *kvhash,
493 const char *nodename)
494 {
495 g_return_if_fail(kvhash != NULL);
496 g_return_if_fail(str != NULL);
497 g_return_if_fail(nodename != NULL);
498
499 GHashTable *ht = kvhash;
500 GHashTableIter iter;
501 gpointer key, value;
502
503 g_string_append_printf(str, "\"%s\": {\n", nodename);
504
505 g_hash_table_iter_init (&iter, ht);
506
507 int i = 0;
508 while (g_hash_table_iter_next (&iter, &key, &value)) {
509 kventry_t *entry = (kventry_t *)value;
510 if (i) g_string_append_printf(str, ",\n");
511 i++;
512 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
513 }
514
515 g_string_append_printf(str, "}\n");
516 }
517
518 int
519 cfs_create_version_msg(GString *str)
520 {
521 g_return_val_if_fail(str != NULL, -EINVAL);
522
523 g_mutex_lock (&mutex);
524
525 g_string_append_printf(str,"{\n");
526
527 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
528
529 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
530
531 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
532
533 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
534 g_string_append_printf(str, "\"%s\": %u,\n",
535 memdb_change_array[i].path,
536 memdb_change_array[i].version);
537 }
538
539 g_string_append_printf(str, "\"kvstore\": {\n");
540
541 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
542
543 cfs_clinfo_t *clinfo = cfs_status.clinfo;
544
545 if (clinfo && clinfo->nodes_byid) {
546 GHashTable *ht = clinfo->nodes_byid;
547 GHashTableIter iter;
548 gpointer key, value;
549
550 g_hash_table_iter_init (&iter, ht);
551
552 while (g_hash_table_iter_next (&iter, &key, &value)) {
553 cfs_clnode_t *node = (cfs_clnode_t *)value;
554 if (!node->kvhash)
555 continue;
556 g_string_append_printf(str, ",\n");
557 dump_kvstore_versions(str, node->kvhash, node->name);
558 }
559 }
560
561 g_string_append_printf(str,"}\n");
562
563 g_string_append_printf(str,"}\n");
564
565 g_mutex_unlock (&mutex);
566
567 return 0;
568 }
569
570 GHashTable *
571 vmlist_hash_new(void)
572 {
573 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
574 (GDestroyNotify)vminfo_free);
575 }
576
577 gboolean
578 vmlist_hash_insert_vm(
579 GHashTable *vmlist,
580 int vmtype,
581 guint32 vmid,
582 const char *nodename,
583 gboolean replace)
584 {
585 g_return_val_if_fail(vmlist != NULL, FALSE);
586 g_return_val_if_fail(nodename != NULL, FALSE);
587 g_return_val_if_fail(vmid != 0, FALSE);
588 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
589 vmtype == VMTYPE_LXC, FALSE);
590
591 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
592 cfs_critical("detected duplicate VMID %d", vmid);
593 return FALSE;
594 }
595
596 vminfo_t *vminfo = g_new0(vminfo_t, 1);
597
598 vminfo->vmid = vmid;
599 vminfo->vmtype = vmtype;
600 vminfo->nodename = g_strdup(nodename);
601
602 vminfo->version = ++vminfo_version_counter;
603
604 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
605
606 return TRUE;
607 }
608
609 void
610 vmlist_register_vm(
611 int vmtype,
612 guint32 vmid,
613 const char *nodename)
614 {
615 g_return_if_fail(cfs_status.vmlist != NULL);
616 g_return_if_fail(nodename != NULL);
617 g_return_if_fail(vmid != 0);
618 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
619 vmtype == VMTYPE_LXC);
620
621 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
622
623 g_mutex_lock (&mutex);
624
625 cfs_status.vmlist_version++;
626
627 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
628
629 g_mutex_unlock (&mutex);
630 }
631
632 gboolean
633 vmlist_different_vm_exists(
634 int vmtype,
635 guint32 vmid,
636 const char *nodename)
637 {
638 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
639 g_return_val_if_fail(vmid != 0, FALSE);
640
641 gboolean res = FALSE;
642
643 g_mutex_lock (&mutex);
644
645 vminfo_t *vminfo;
646 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
647 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
648 res = TRUE;
649 }
650 g_mutex_unlock (&mutex);
651
652 return res;
653 }
654
655 gboolean
656 vmlist_vm_exists(
657 guint32 vmid)
658 {
659 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
660 g_return_val_if_fail(vmid != 0, FALSE);
661
662 g_mutex_lock (&mutex);
663
664 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
665
666 g_mutex_unlock (&mutex);
667
668 return res != NULL;
669 }
670
671 void
672 vmlist_delete_vm(
673 guint32 vmid)
674 {
675 g_return_if_fail(cfs_status.vmlist != NULL);
676 g_return_if_fail(vmid != 0);
677
678 g_mutex_lock (&mutex);
679
680 cfs_status.vmlist_version++;
681
682 g_hash_table_remove(cfs_status.vmlist, &vmid);
683
684 g_mutex_unlock (&mutex);
685 }
686
687 void cfs_status_set_vmlist(
688 GHashTable *vmlist)
689 {
690 g_return_if_fail(vmlist != NULL);
691
692 g_mutex_lock (&mutex);
693
694 cfs_status.vmlist_version++;
695
696 if (cfs_status.vmlist)
697 g_hash_table_destroy(cfs_status.vmlist);
698
699 cfs_status.vmlist = vmlist;
700
701 g_mutex_unlock (&mutex);
702 }
703
704 int
705 cfs_create_vmlist_msg(GString *str)
706 {
707 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
708 g_return_val_if_fail(str != NULL, -EINVAL);
709
710 g_mutex_lock (&mutex);
711
712 g_string_append_printf(str,"{\n");
713
714 GHashTable *ht = cfs_status.vmlist;
715
716 guint count = g_hash_table_size(ht);
717
718 if (!count) {
719 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
720 } else {
721 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
722
723 g_string_append_printf(str,"\"ids\": {\n");
724
725 GHashTableIter iter;
726 gpointer key, value;
727
728 g_hash_table_iter_init (&iter, ht);
729
730 int first = 1;
731 while (g_hash_table_iter_next (&iter, &key, &value)) {
732 vminfo_t *vminfo = (vminfo_t *)value;
733 char *type;
734 if (vminfo->vmtype == VMTYPE_QEMU) {
735 type = "qemu";
736 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
737 type = "openvz";
738 } else if (vminfo->vmtype == VMTYPE_LXC) {
739 type = "lxc";
740 } else {
741 type = "unknown";
742 }
743
744 if (!first)
745 g_string_append_printf(str, ",\n");
746 first = 0;
747
748 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
749 vminfo->vmid, vminfo->nodename, type, vminfo->version);
750 }
751
752 g_string_append_printf(str,"}\n");
753 }
754 g_string_append_printf(str,"\n}\n");
755
756 g_mutex_unlock (&mutex);
757
758 return 0;
759 }
760
761 void
762 record_memdb_change(const char *path)
763 {
764 g_return_if_fail(cfs_status.memdb_changes != 0);
765
766 memdb_change_t *ce;
767
768 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
769 ce->version++;
770 }
771 }
772
773 void
774 record_memdb_reload(void)
775 {
776 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
777 memdb_change_array[i].version++;
778 }
779 }
780
781 static gboolean
782 kventry_hash_set(
783 GHashTable *kvhash,
784 const char *key,
785 gconstpointer data,
786 size_t len)
787 {
788 g_return_val_if_fail(kvhash != NULL, FALSE);
789 g_return_val_if_fail(key != NULL, FALSE);
790 g_return_val_if_fail(data != NULL, FALSE);
791
792 kventry_t *entry;
793 if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
794 g_free(entry->data);
795 entry->data = g_memdup(data, len);
796 entry->len = len;
797 entry->version++;
798 } else {
799 kventry_t *entry = g_new0(kventry_t, 1);
800
801 entry->key = g_strdup(key);
802 entry->data = g_memdup(data, len);
803 entry->len = len;
804
805 g_hash_table_replace(kvhash, entry->key, entry);
806 }
807
808 return TRUE;
809 }
810
811 static const char *rrd_def_node[] = {
812 "DS:loadavg:GAUGE:120:0:U",
813 "DS:maxcpu:GAUGE:120:0:U",
814 "DS:cpu:GAUGE:120:0:U",
815 "DS:iowait:GAUGE:120:0:U",
816 "DS:memtotal:GAUGE:120:0:U",
817 "DS:memused:GAUGE:120:0:U",
818 "DS:swaptotal:GAUGE:120:0:U",
819 "DS:swapused:GAUGE:120:0:U",
820 "DS:roottotal:GAUGE:120:0:U",
821 "DS:rootused:GAUGE:120:0:U",
822 "DS:netin:DERIVE:120:0:U",
823 "DS:netout:DERIVE:120:0:U",
824
825 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
826 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
827 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
828 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
829 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
830
831 "RRA:MAX:0.5:1:70", // 1 min max - one hour
832 "RRA:MAX:0.5:30:70", // 30 min max - one day
833 "RRA:MAX:0.5:180:70", // 3 hour max - one week
834 "RRA:MAX:0.5:720:70", // 12 hour max - one month
835 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
836 NULL,
837 };
838
839 static const char *rrd_def_vm[] = {
840 "DS:maxcpu:GAUGE:120:0:U",
841 "DS:cpu:GAUGE:120:0:U",
842 "DS:maxmem:GAUGE:120:0:U",
843 "DS:mem:GAUGE:120:0:U",
844 "DS:maxdisk:GAUGE:120:0:U",
845 "DS:disk:GAUGE:120:0:U",
846 "DS:netin:DERIVE:120:0:U",
847 "DS:netout:DERIVE:120:0:U",
848 "DS:diskread:DERIVE:120:0:U",
849 "DS:diskwrite:DERIVE:120:0:U",
850
851 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
852 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
853 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
854 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
855 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
856
857 "RRA:MAX:0.5:1:70", // 1 min max - one hour
858 "RRA:MAX:0.5:30:70", // 30 min max - one day
859 "RRA:MAX:0.5:180:70", // 3 hour max - one week
860 "RRA:MAX:0.5:720:70", // 12 hour max - one month
861 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
862 NULL,
863 };
864
865 static const char *rrd_def_storage[] = {
866 "DS:total:GAUGE:120:0:U",
867 "DS:used:GAUGE:120:0:U",
868
869 "RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
870 "RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
871 "RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
872 "RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
873 "RRA:AVERAGE:0.5:10080:70", // 7 day avg - ony year
874
875 "RRA:MAX:0.5:1:70", // 1 min max - one hour
876 "RRA:MAX:0.5:30:70", // 30 min max - one day
877 "RRA:MAX:0.5:180:70", // 3 hour max - one week
878 "RRA:MAX:0.5:720:70", // 12 hour max - one month
879 "RRA:MAX:0.5:10080:70", // 7 day max - ony year
880 NULL,
881 };
882
883 #define RRDDIR "/var/lib/rrdcached/db"
884
885 static void
886 create_rrd_file(
887 const char *filename,
888 int argcount,
889 const char *rrddef[])
890 {
891 /* start at day boundary */
892 time_t ctime;
893 time(&ctime);
894 struct tm *ltm = localtime(&ctime);
895 ltm->tm_sec = 0;
896 ltm->tm_min = 0;
897 ltm->tm_hour = 0;
898
899 rrd_clear_error();
900 if (rrd_create_r(filename, 60, timelocal(ltm), argcount, rrddef)) {
901 cfs_message("RRD create error %s: %s", filename, rrd_get_error());
902 }
903 }
904
905 static inline const char *
906 rrd_skip_data(
907 const char *data,
908 int count)
909 {
910 int found = 0;
911 while (*data && found < count) {
912 if (*data++ == ':')
913 found++;
914 }
915 return data;
916 }
917
918 static void
919 update_rrd_data(
920 const char *key,
921 gconstpointer data,
922 size_t len)
923 {
924 g_return_if_fail(key != NULL);
925 g_return_if_fail(data != NULL);
926 g_return_if_fail(len > 0);
927 g_return_if_fail(len < 4096);
928
929 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
930
931 int use_daemon = 1;
932 if (rrdc_connect(rrdcsock) != 0)
933 use_daemon = 0;
934
935 char *filename = NULL;
936
937 int skip = 0;
938
939 if (strncmp(key, "pve2-node/", 10) == 0) {
940 const char *node = key + 10;
941
942 skip = 2;
943
944 if (strchr(node, '/') != NULL)
945 goto keyerror;
946
947 if (strlen(node) < 1)
948 goto keyerror;
949
950 filename = g_strdup_printf(RRDDIR "/%s", key);
951
952 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
953
954 mkdir(RRDDIR "/pve2-node", 0755);
955 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
956 create_rrd_file(filename, argcount, rrd_def_node);
957 }
958
959 } else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
960 (strncmp(key, "pve2.3-vm/", 10) == 0)) {
961 const char *vmid;
962
963 if (strncmp(key, "pve2-vm/", 8) == 0) {
964 vmid = key + 8;
965 skip = 2;
966 } else {
967 vmid = key + 10;
968 skip = 4;
969 }
970
971 if (strchr(vmid, '/') != NULL)
972 goto keyerror;
973
974 if (strlen(vmid) < 1)
975 goto keyerror;
976
977 filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);
978
979 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
980
981 mkdir(RRDDIR "/pve2-vm", 0755);
982 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
983 create_rrd_file(filename, argcount, rrd_def_vm);
984 }
985
986 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
987 const char *node = key + 13;
988
989 const char *storage = node;
990 while (*storage && *storage != '/')
991 storage++;
992
993 if (*storage != '/' || ((storage - node) < 1))
994 goto keyerror;
995
996 storage++;
997
998 if (strchr(storage, '/') != NULL)
999 goto keyerror;
1000
1001 if (strlen(storage) < 1)
1002 goto keyerror;
1003
1004 filename = g_strdup_printf(RRDDIR "/%s", key);
1005
1006 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1007
1008 mkdir(RRDDIR "/pve2-storage", 0755);
1009
1010 char *dir = g_path_get_dirname(filename);
1011 mkdir(dir, 0755);
1012 g_free(dir);
1013
1014 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
1015 create_rrd_file(filename, argcount, rrd_def_storage);
1016 }
1017
1018 } else {
1019 goto keyerror;
1020 }
1021
1022 const char *dp = skip ? rrd_skip_data(data, skip) : data;
1023
1024 const char *update_args[] = { dp, NULL };
1025
1026 if (use_daemon) {
1027 int status;
1028 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1029 cfs_message("RRDC update error %s: %d", filename, status);
1030 rrdc_disconnect();
1031 rrd_clear_error();
1032 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1033 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1034 }
1035 }
1036
1037 } else {
1038 rrd_clear_error();
1039 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1040 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1041 }
1042 }
1043
1044 ret:
1045 if (filename)
1046 g_free(filename);
1047
1048 return;
1049
1050 keyerror:
1051 cfs_critical("RRD update error: unknown/wrong key %s", key);
1052 goto ret;
1053 }
1054
1055 static gboolean
1056 rrd_entry_is_old(
1057 gpointer key,
1058 gpointer value,
1059 gpointer user_data)
1060 {
1061 rrdentry_t *entry = (rrdentry_t *)value;
1062 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1063
1064 int diff = ctime - entry->time;
1065
1066 /* remove everything older than 5 minutes */
1067 int expire = 60*5;
1068
1069 return (diff > expire) ? TRUE : FALSE;
1070 }
1071
1072 static char *rrd_dump_buf = NULL;
1073 static time_t rrd_dump_last = 0;
1074
1075 void
1076 cfs_rrd_dump(GString *str)
1077 {
1078 time_t ctime;
1079
1080 g_mutex_lock (&mutex);
1081
1082 time(&ctime);
1083 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1084 g_string_assign(str, rrd_dump_buf);
1085 g_mutex_unlock (&mutex);
1086 return;
1087 }
1088
1089 /* remove old data */
1090 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1091 GUINT_TO_POINTER(ctime));
1092
1093 g_string_set_size(str, 0);
1094
1095 GHashTableIter iter;
1096 gpointer key, value;
1097
1098 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1099
1100 while (g_hash_table_iter_next (&iter, &key, &value)) {
1101 rrdentry_t *entry = (rrdentry_t *)value;
1102 g_string_append(str, key);
1103 g_string_append(str, ":");
1104 g_string_append(str, entry->data);
1105 g_string_append(str, "\n");
1106 }
1107
1108 g_string_append_c(str, 0); // never return undef
1109
1110 rrd_dump_last = ctime;
1111 if (rrd_dump_buf)
1112 g_free(rrd_dump_buf);
1113 rrd_dump_buf = g_strdup(str->str);
1114
1115 g_mutex_unlock (&mutex);
1116 }
1117
1118 static gboolean
1119 nodeip_hash_set(
1120 GHashTable *iphash,
1121 const char *nodename,
1122 const char *ip,
1123 size_t len)
1124 {
1125 g_return_val_if_fail(iphash != NULL, FALSE);
1126 g_return_val_if_fail(nodename != NULL, FALSE);
1127 g_return_val_if_fail(ip != NULL, FALSE);
1128 g_return_val_if_fail(len > 0, FALSE);
1129 g_return_val_if_fail(len < 256, FALSE);
1130 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1131
1132 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1133
1134 if (!oldip || (strcmp(oldip, ip) != 0)) {
1135 cfs_status.clinfo_version++;
1136 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1137 }
1138
1139 return TRUE;
1140 }
1141
1142 static gboolean
1143 rrdentry_hash_set(
1144 GHashTable *rrdhash,
1145 const char *key,
1146 const char *data,
1147 size_t len)
1148 {
1149 g_return_val_if_fail(rrdhash != NULL, FALSE);
1150 g_return_val_if_fail(key != NULL, FALSE);
1151 g_return_val_if_fail(data != NULL, FALSE);
1152 g_return_val_if_fail(len > 0, FALSE);
1153 g_return_val_if_fail(len < 4096, FALSE);
1154 g_return_val_if_fail(data[len-1] == 0, FALSE);
1155
1156 rrdentry_t *entry;
1157 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1158 g_free(entry->data);
1159 entry->data = g_memdup(data, len);
1160 entry->len = len;
1161 entry->time = time(NULL);
1162 } else {
1163 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1164
1165 entry->key = g_strdup(key);
1166 entry->data = g_memdup(data, len);
1167 entry->len = len;
1168 entry->time = time(NULL);
1169
1170 g_hash_table_replace(rrdhash, entry->key, entry);
1171 }
1172
1173 update_rrd_data(key, data, len);
1174
1175 return TRUE;
1176 }
1177
1178 static int
1179 kvstore_send_update_message(
1180 dfsm_t *dfsm,
1181 const char *key,
1182 gpointer data,
1183 guint32 len)
1184 {
1185 if (!dfsm_is_initialized(dfsm))
1186 return -EACCES;
1187
1188 struct iovec iov[2];
1189
1190 char name[256];
1191 g_strlcpy(name, key, sizeof(name));
1192
1193 iov[0].iov_base = &name;
1194 iov[0].iov_len = sizeof(name);
1195
1196 iov[1].iov_base = (char *)data;
1197 iov[1].iov_len = len;
1198
1199 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1200 return 0;
1201
1202 return -EACCES;
1203 }
1204
1205 static clog_entry_t *
1206 kvstore_parse_log_message(
1207 const void *msg,
1208 size_t msg_len)
1209 {
1210 g_return_val_if_fail(msg != NULL, NULL);
1211
1212 if (msg_len < sizeof(clog_entry_t)) {
1213 cfs_critical("received short log message (%zu < %zu)", msg_len, sizeof(clog_entry_t));
1214 return NULL;
1215 }
1216
1217 clog_entry_t *entry = (clog_entry_t *)msg;
1218
1219 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1220 entry->ident_len + entry->tag_len + entry->msg_len;
1221
1222 if (msg_len != size) {
1223 cfs_critical("received log message with wrong size (%zu != %u)", msg_len, size);
1224 return NULL;
1225 }
1226
1227 msg = entry->data;
1228
1229 if (*((char *)msg + entry->node_len - 1)) {
1230 cfs_critical("unterminated string in log message");
1231 return NULL;
1232 }
1233 msg += entry->node_len;
1234
1235 if (*((char *)msg + entry->ident_len - 1)) {
1236 cfs_critical("unterminated string in log message");
1237 return NULL;
1238 }
1239 msg += entry->ident_len;
1240
1241 if (*((char *)msg + entry->tag_len - 1)) {
1242 cfs_critical("unterminated string in log message");
1243 return NULL;
1244 }
1245 msg += entry->tag_len;
1246
1247 if (*((char *)msg + entry->msg_len - 1)) {
1248 cfs_critical("unterminated string in log message");
1249 return NULL;
1250 }
1251
1252 return entry;
1253 }
1254
1255 static gboolean
1256 kvstore_parse_update_message(
1257 const void *msg,
1258 size_t msg_len,
1259 const char **key,
1260 gconstpointer *data,
1261 guint32 *len)
1262 {
1263 g_return_val_if_fail(msg != NULL, FALSE);
1264 g_return_val_if_fail(key != NULL, FALSE);
1265 g_return_val_if_fail(data != NULL, FALSE);
1266 g_return_val_if_fail(len != NULL, FALSE);
1267
1268 if (msg_len < 256) {
1269 cfs_critical("received short kvstore message (%zu < 256)", msg_len);
1270 return FALSE;
1271 }
1272
1273 /* test if key is null terminated */
1274 int i = 0;
1275 for (i = 0; i < 256; i++)
1276 if (((char *)msg)[i] == 0)
1277 break;
1278
1279 if (i == 256)
1280 return FALSE;
1281
1282
1283 *len = msg_len - 256;
1284 *key = msg;
1285 *data = msg + 256;
1286
1287 return TRUE;
1288 }
1289
1290 int
1291 cfs_create_status_msg(
1292 GString *str,
1293 const char *nodename,
1294 const char *key)
1295 {
1296 g_return_val_if_fail(str != NULL, -EINVAL);
1297 g_return_val_if_fail(key != NULL, -EINVAL);
1298
1299 int res = -ENOENT;
1300
1301 GHashTable *kvhash = NULL;
1302
1303 g_mutex_lock (&mutex);
1304
1305 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1306 kvhash = cfs_status.kvhash;
1307 } else if (cfs_status.clinfo && cfs_status.clinfo->nodes_byname) {
1308 cfs_clnode_t *clnode;
1309 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1310 kvhash = clnode->kvhash;
1311 }
1312
1313 kventry_t *entry;
1314 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1315 g_string_append_len(str, entry->data, entry->len);
1316 res = 0;
1317 }
1318
1319 g_mutex_unlock (&mutex);
1320
1321 return res;
1322 }
1323
1324 int
1325 cfs_status_set(
1326 const char *key,
1327 gpointer data,
1328 size_t len)
1329 {
1330 g_return_val_if_fail(key != NULL, FALSE);
1331 g_return_val_if_fail(data != NULL, FALSE);
1332 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1333
1334 if (len > CFS_MAX_STATUS_SIZE)
1335 return -EFBIG;
1336
1337 g_mutex_lock (&mutex);
1338
1339 gboolean res;
1340
1341 if (strncmp(key, "rrd/", 4) == 0) {
1342 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1343 } else if (!strcmp(key, "nodeip")) {
1344 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1345 } else {
1346 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1347 }
1348 g_mutex_unlock (&mutex);
1349
1350 if (cfs_status.kvstore)
1351 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1352
1353 return res ? 0 : -ENOMEM;
1354 }
1355
1356 gboolean
1357 cfs_kvstore_node_set(
1358 uint32_t nodeid,
1359 const char *key,
1360 gconstpointer data,
1361 size_t len)
1362 {
1363 g_return_val_if_fail(nodeid != 0, FALSE);
1364 g_return_val_if_fail(key != NULL, FALSE);
1365 g_return_val_if_fail(data != NULL, FALSE);
1366
1367 g_mutex_lock (&mutex);
1368
1369 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1370 goto ret; /* ignore */
1371
1372 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1373 if (!clnode)
1374 goto ret; /* ignore */
1375
1376 cfs_debug("got node %d status update %s", nodeid, key);
1377
1378 if (strncmp(key, "rrd/", 4) == 0) {
1379 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1380 } else if (!strcmp(key, "nodeip")) {
1381 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1382 } else {
1383 if (!clnode->kvhash) {
1384 if (!(clnode->kvhash = kventry_hash_new())) {
1385 goto ret; /*ignore */
1386 }
1387 }
1388
1389 kventry_hash_set(clnode->kvhash, key, data, len);
1390
1391 }
1392 ret:
1393 g_mutex_unlock (&mutex);
1394
1395 return TRUE;
1396 }
1397
1398 static gboolean
1399 cfs_kvstore_sync(void)
1400 {
1401 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1402 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1403
1404 gboolean res = TRUE;
1405
1406 g_mutex_lock (&mutex);
1407
1408 GHashTable *ht = cfs_status.kvhash;
1409 GHashTableIter iter;
1410 gpointer key, value;
1411
1412 g_hash_table_iter_init (&iter, ht);
1413
1414 while (g_hash_table_iter_next (&iter, &key, &value)) {
1415 kventry_t *entry = (kventry_t *)value;
1416 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1417 }
1418
1419 g_mutex_unlock (&mutex);
1420
1421 return res;
1422 }
1423
1424 static int
1425 dfsm_deliver(
1426 dfsm_t *dfsm,
1427 gpointer data,
1428 int *res_ptr,
1429 uint32_t nodeid,
1430 uint32_t pid,
1431 uint16_t msg_type,
1432 uint32_t msg_time,
1433 const void *msg,
1434 size_t msg_len)
1435 {
1436 g_return_val_if_fail(dfsm != NULL, -1);
1437 g_return_val_if_fail(msg != NULL, -1);
1438 g_return_val_if_fail(res_ptr != NULL, -1);
1439
1440 /* ignore message for ourself */
1441 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1442 goto ret;
1443
1444 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1445 const char *key;
1446 gconstpointer data;
1447 guint32 len;
1448 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1449 cfs_kvstore_node_set(nodeid, key, data, len);
1450 } else {
1451 cfs_critical("cant parse update message");
1452 }
1453 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1454 cfs_message("received log"); // fixme: remove
1455 const clog_entry_t *entry;
1456 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1457 clusterlog_insert(cfs_status.clusterlog, entry);
1458 } else {
1459 cfs_critical("cant parse log message");
1460 }
1461 } else {
1462 cfs_critical("received unknown message type %d\n", msg_type);
1463 goto fail;
1464 }
1465
1466 ret:
1467 *res_ptr = 0;
1468 return 1;
1469
1470 fail:
1471 *res_ptr = -EACCES;
1472 return 1;
1473 }
1474
1475 static void
1476 dfsm_confchg(
1477 dfsm_t *dfsm,
1478 gpointer data,
1479 const struct cpg_address *member_list,
1480 size_t member_list_entries)
1481 {
1482 g_return_if_fail(dfsm != NULL);
1483 g_return_if_fail(member_list != NULL);
1484
1485 cfs_debug("enter %s", __func__);
1486
1487 g_mutex_lock (&mutex);
1488
1489 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1490
1491 if (clinfo && clinfo->nodes_byid) {
1492
1493 GHashTable *ht = clinfo->nodes_byid;
1494 GHashTableIter iter;
1495 gpointer key, value;
1496
1497 g_hash_table_iter_init (&iter, ht);
1498
1499 while (g_hash_table_iter_next (&iter, &key, &value)) {
1500 cfs_clnode_t *node = (cfs_clnode_t *)value;
1501 node->online = FALSE;
1502 }
1503
1504 for (int i = 0; i < member_list_entries; i++) {
1505 cfs_clnode_t *node;
1506 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1507 node->online = TRUE;
1508 }
1509 }
1510
1511 cfs_status.clinfo_version++;
1512 }
1513
1514 g_mutex_unlock (&mutex);
1515 }
1516
1517 static gpointer
1518 dfsm_get_state(
1519 dfsm_t *dfsm,
1520 gpointer data,
1521 unsigned int *res_len)
1522 {
1523 g_return_val_if_fail(dfsm != NULL, NULL);
1524
1525 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1526
1527 return msg;
1528 }
1529
1530 static int
1531 dfsm_process_update(
1532 dfsm_t *dfsm,
1533 gpointer data,
1534 dfsm_sync_info_t *syncinfo,
1535 uint32_t nodeid,
1536 uint32_t pid,
1537 const void *msg,
1538 size_t msg_len)
1539 {
1540 cfs_critical("%s: received unexpected update message", __func__);
1541
1542 return -1;
1543 }
1544
1545 static int
1546 dfsm_process_state_update(
1547 dfsm_t *dfsm,
1548 gpointer data,
1549 dfsm_sync_info_t *syncinfo)
1550 {
1551 g_return_val_if_fail(dfsm != NULL, -1);
1552 g_return_val_if_fail(syncinfo != NULL, -1);
1553
1554 clog_base_t *clog[syncinfo->node_count];
1555
1556 int local_index = -1;
1557 for (int i = 0; i < syncinfo->node_count; i++) {
1558 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1559 ni->synced = 1;
1560
1561 if (syncinfo->local == ni)
1562 local_index = i;
1563
1564 clog_base_t *base = (clog_base_t *)ni->state;
1565 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1566 clog[i] = ni->state;
1567 } else {
1568 cfs_critical("received log with wrong size %u", ni->state_len);
1569 clog[i] = NULL;
1570 }
1571 }
1572
1573 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1574 cfs_critical("unable to merge log files");
1575 }
1576
1577 cfs_kvstore_sync();
1578
1579 return 1;
1580 }
1581
1582 static int
1583 dfsm_commit(
1584 dfsm_t *dfsm,
1585 gpointer data,
1586 dfsm_sync_info_t *syncinfo)
1587 {
1588 g_return_val_if_fail(dfsm != NULL, -1);
1589 g_return_val_if_fail(syncinfo != NULL, -1);
1590
1591 return 1;
1592 }
1593
1594 static void
1595 dfsm_synced(dfsm_t *dfsm)
1596 {
1597 g_return_if_fail(dfsm != NULL);
1598
1599 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1600 if (!ip)
1601 ip = cfs.ip;
1602
1603 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1604 }
1605
1606 static int
1607 dfsm_cleanup(
1608 dfsm_t *dfsm,
1609 gpointer data,
1610 dfsm_sync_info_t *syncinfo)
1611 {
1612 return 1;
1613 }
1614
1615 static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1616 .dfsm_deliver_fn = dfsm_deliver,
1617 .dfsm_confchg_fn = dfsm_confchg,
1618
1619 .dfsm_get_state_fn = dfsm_get_state,
1620 .dfsm_process_state_update_fn = dfsm_process_state_update,
1621 .dfsm_process_update_fn = dfsm_process_update,
1622 .dfsm_commit_fn = dfsm_commit,
1623 .dfsm_cleanup_fn = dfsm_cleanup,
1624 .dfsm_synced_fn = dfsm_synced,
1625 };
1626
1627 dfsm_t *
1628 cfs_status_dfsm_new(void)
1629 {
1630 g_mutex_lock (&mutex);
1631
1632 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1633 0, &kvstore_dfsm_callbacks);
1634 g_mutex_unlock (&mutex);
1635
1636 return cfs_status.kvstore;
1637 }
1638
1639 gboolean
1640 cfs_is_quorate(void)
1641 {
1642 g_mutex_lock (&mutex);
1643 gboolean res = cfs_status.quorate;
1644 g_mutex_unlock (&mutex);
1645
1646 return res;
1647 }
1648
1649 void
1650 cfs_set_quorate(
1651 uint32_t quorate,
1652 gboolean quiet)
1653 {
1654 g_mutex_lock (&mutex);
1655
1656 uint32_t prev_quorate = cfs_status.quorate;
1657 cfs_status.quorate = quorate;
1658
1659 if (!prev_quorate && cfs_status.quorate) {
1660 if (!quiet)
1661 cfs_message("node has quorum");
1662 }
1663
1664 if (prev_quorate && !cfs_status.quorate) {
1665 if (!quiet)
1666 cfs_message("node lost quorum");
1667 }
1668
1669 g_mutex_unlock (&mutex);
1670 }
1671