]> git.proxmox.com Git - pve-cluster.git/blob - data/src/status.c
use simple files for LXC configuration
[pve-cluster.git] / data / src / status.c
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "status"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdio.h>
28 #include <stdint.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <glib.h>
32 #include <sys/syslog.h>
33 #include <rrd.h>
34 #include <rrd_client.h>
35 #include <time.h>
36
37 #include "cfs-utils.h"
38 #include "status.h"
39 #include "logger.h"
40
41 #define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
42
/*
 * Message type identifiers handed to dfsm_send_message() for the kvstore.
 * The numeric values are spelled out explicitly.
 */
typedef enum {
	KVSTORE_MESSAGE_UPDATE          = 1,
	KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
	KVSTORE_MESSAGE_LOG             = 3,
} kvstore_message_t;
48
/* Incremented for every entry created by vmlist_hash_insert_vm(). */
static uint32_t vminfo_version_counter;

/* One guest as stored in a VM list hash table. */
typedef struct {
	uint32_t vmid;		/* guest ID; its address serves as the hash key */
	char *nodename;		/* private copy, released by vminfo_free() */
	int vmtype;		/* one of the VMTYPE_* constants */
	uint32_t version;	/* vminfo_version_counter value at creation */
} vminfo_t;
57
58 typedef struct {
59 char *key;
60 gpointer data;
61 size_t len;
62 uint32_t version;
63 } kventry_t;
64
65 typedef struct {
66 char *key;
67 gpointer data;
68 size_t len;
69 uint32_t time;
70 } rrdentry_t;
71
/* Change counter for one tracked path. */
typedef struct {
	char *path;		/* path as looked up by record_memdb_change() */
	uint32_t version;	/* number of recorded changes/reloads */
} memdb_change_t;

/*
 * Paths whose change counters are tracked. The .version members are not
 * listed and therefore start out as zero.
 */
static memdb_change_t memdb_change_array[] = {
	{ .path = "corosync.conf" },
	{ .path = "corosync.conf.new" },
	{ .path = "storage.cfg" },
	{ .path = "user.cfg" },
	{ .path = "domains.cfg" },
	{ .path = "priv/shadow.cfg" },
	{ .path = "datacenter.cfg" },
	{ .path = "vzdump.cron" },
	{ .path = "ha/crm_commands" },
	{ .path = "ha/manager_status" },
	{ .path = "ha/resources.cfg" },
	{ .path = "ha/groups.cfg" },
	{ .path = "status.cfg" },
};
92
93 static GMutex mutex;
94
95 typedef struct {
96 time_t start_time;
97
98 uint32_t quorate;
99
100 cfs_clinfo_t *clinfo;
101 uint32_t clinfo_version;
102
103 GHashTable *vmlist;
104 uint32_t vmlist_version;
105
106 dfsm_t *kvstore;
107 GHashTable *kvhash;
108 GHashTable *rrdhash;
109 GHashTable *iphash;
110
111 GHashTable *memdb_changes;
112
113 clusterlog_t *clusterlog;
114 } cfs_status_t;
115
116 static cfs_status_t cfs_status;
117
118 struct cfs_clnode {
119 char *name;
120 uint32_t nodeid;
121 uint32_t votes;
122 gboolean online;
123 GHashTable *kvhash;
124 };
125
126 struct cfs_clinfo {
127 char *cluster_name;
128 uint32_t cman_version;
129
130 GHashTable *nodes_byid;
131 GHashTable *nodes_byname;
132 };
133
134 static guint
135 g_int32_hash (gconstpointer v)
136 {
137 return *(const uint32_t *) v;
138 }
139
140 static gboolean
141 g_int32_equal (gconstpointer v1,
142 gconstpointer v2)
143 {
144 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
145 }
146
147 static void vminfo_free(vminfo_t *vminfo)
148 {
149 g_return_if_fail(vminfo != NULL);
150
151 if (vminfo->nodename)
152 g_free(vminfo->nodename);
153
154
155 g_free(vminfo);
156 }
157
158 void cfs_clnode_destroy(
159 cfs_clnode_t *clnode)
160 {
161 g_return_if_fail(clnode != NULL);
162
163 if (clnode->kvhash)
164 g_hash_table_destroy(clnode->kvhash);
165
166 if (clnode->name)
167 g_free(clnode->name);
168
169 g_free(clnode);
170 }
171
172 cfs_clnode_t *cfs_clnode_new(
173 const char *name,
174 uint32_t nodeid,
175 uint32_t votes)
176 {
177 g_return_val_if_fail(name != NULL, NULL);
178
179 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
180 if (!clnode)
181 return NULL;
182
183 clnode->name = g_strdup(name);
184 clnode->nodeid = nodeid;
185 clnode->votes = votes;
186
187 return clnode;
188 }
189
190 gboolean cfs_clinfo_destroy(
191 cfs_clinfo_t *clinfo)
192 {
193 g_return_val_if_fail(clinfo != NULL, FALSE);
194
195 if (clinfo->cluster_name)
196 g_free(clinfo->cluster_name);
197
198 if (clinfo->nodes_byname)
199 g_hash_table_destroy(clinfo->nodes_byname);
200
201 if (clinfo->nodes_byid)
202 g_hash_table_destroy(clinfo->nodes_byid);
203
204 g_free(clinfo);
205
206 return TRUE;
207 }
208
209 cfs_clinfo_t *cfs_clinfo_new(
210 const char *cluster_name,
211 uint32_t cman_version)
212 {
213 g_return_val_if_fail(cluster_name != NULL, NULL);
214
215 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
216 if (!clinfo)
217 return NULL;
218
219 clinfo->cluster_name = g_strdup(cluster_name);
220 clinfo->cman_version = cman_version;
221
222 if (!(clinfo->nodes_byid = g_hash_table_new_full(
223 g_int32_hash, g_int32_equal, NULL,
224 (GDestroyNotify)cfs_clnode_destroy)))
225 goto fail;
226
227 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
228 goto fail;
229
230 return clinfo;
231
232 fail:
233 cfs_clinfo_destroy(clinfo);
234
235 return NULL;
236 }
237
238 gboolean cfs_clinfo_add_node(
239 cfs_clinfo_t *clinfo,
240 cfs_clnode_t *clnode)
241 {
242 g_return_val_if_fail(clinfo != NULL, FALSE);
243 g_return_val_if_fail(clnode != NULL, FALSE);
244
245 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
246 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
247
248 return TRUE;
249 }
250
251 int
252 cfs_create_memberlist_msg(
253 GString *str)
254 {
255 g_return_val_if_fail(str != NULL, -EINVAL);
256
257 g_mutex_lock (&mutex);
258
259 g_string_append_printf(str,"{\n");
260
261 guint nodecount = 0;
262
263 cfs_clinfo_t *clinfo = cfs_status.clinfo;
264
265 if (clinfo && clinfo->nodes_byid)
266 nodecount = g_hash_table_size(clinfo->nodes_byid);
267
268 if (nodecount) {
269 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
270 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
271
272 g_string_append_printf(str, "\"cluster\": { ");
273 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
274 "\"nodes\": %d, \"quorate\": %d ",
275 clinfo->cluster_name, clinfo->cman_version,
276 nodecount, cfs_status.quorate);
277
278 g_string_append_printf(str,"},\n");
279 g_string_append_printf(str,"\"nodelist\": {\n");
280
281 GHashTable *ht = clinfo->nodes_byid;
282 GHashTableIter iter;
283 gpointer key, value;
284
285 g_hash_table_iter_init (&iter, ht);
286
287 int i = 0;
288 while (g_hash_table_iter_next (&iter, &key, &value)) {
289 cfs_clnode_t *node = (cfs_clnode_t *)value;
290 if (i) g_string_append_printf(str, ",\n");
291 i++;
292
293 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
294 node->name, node->nodeid, node->online);
295
296
297 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
298 if (ip) {
299 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
300 }
301
302 g_string_append_printf(str, "}");
303
304 }
305 g_string_append_printf(str,"\n }\n");
306 } else {
307 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
308 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
309 }
310
311 g_string_append_printf(str,"}\n");
312
313 g_mutex_unlock (&mutex);
314
315 return 0;
316 }
317
318 static void
319 kventry_free(kventry_t *entry)
320 {
321 g_return_if_fail(entry != NULL);
322
323 g_free(entry->key);
324 g_free(entry->data);
325 g_free(entry);
326 }
327
328 static GHashTable *
329 kventry_hash_new(void)
330 {
331 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
332 (GDestroyNotify)kventry_free);
333 }
334
335 static void
336 rrdentry_free(rrdentry_t *entry)
337 {
338 g_return_if_fail(entry != NULL);
339
340 g_free(entry->key);
341 g_free(entry->data);
342 g_free(entry);
343 }
344
345 static GHashTable *
346 rrdentry_hash_new(void)
347 {
348 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
349 (GDestroyNotify)rrdentry_free);
350 }
351
352 void
353 cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
354 {
355 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
356 }
357
358 void
359 cfs_cluster_log(clog_entry_t *entry)
360 {
361 g_return_if_fail(entry != NULL);
362
363 clusterlog_insert(cfs_status.clusterlog, entry);
364
365 if (cfs_status.kvstore) {
366 struct iovec iov[1];
367 iov[0].iov_base = (char *)entry;
368 iov[0].iov_len = clog_entry_size(entry);
369
370 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
371 }
372 }
373
374 void cfs_status_init(void)
375 {
376 g_mutex_lock (&mutex);
377
378 cfs_status.start_time = time(NULL);
379
380 cfs_status.vmlist = vmlist_hash_new();
381
382 cfs_status.kvhash = kventry_hash_new();
383
384 cfs_status.rrdhash = rrdentry_hash_new();
385
386 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
387
388 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
389
390 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
391 g_hash_table_replace(cfs_status.memdb_changes,
392 memdb_change_array[i].path,
393 &memdb_change_array[i]);
394 }
395
396 cfs_status.clusterlog = clusterlog_new();
397
398 // fixme:
399 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
400 LOG_INFO, "starting cluster log");
401
402 g_mutex_unlock (&mutex);
403 }
404
405 void cfs_status_cleanup(void)
406 {
407 g_mutex_lock (&mutex);
408
409 cfs_status.clinfo_version++;
410
411 if (cfs_status.clinfo) {
412 cfs_clinfo_destroy(cfs_status.clinfo);
413 cfs_status.clinfo = NULL;
414 }
415
416 if (cfs_status.vmlist) {
417 g_hash_table_destroy(cfs_status.vmlist);
418 cfs_status.vmlist = NULL;
419 }
420
421 if (cfs_status.kvhash) {
422 g_hash_table_destroy(cfs_status.kvhash);
423 cfs_status.kvhash = NULL;
424 }
425
426 if (cfs_status.rrdhash) {
427 g_hash_table_destroy(cfs_status.rrdhash);
428 cfs_status.rrdhash = NULL;
429 }
430
431 if (cfs_status.iphash) {
432 g_hash_table_destroy(cfs_status.iphash);
433 cfs_status.iphash = NULL;
434 }
435
436 if (cfs_status.clusterlog)
437 clusterlog_destroy(cfs_status.clusterlog);
438
439 g_mutex_unlock (&mutex);
440 }
441
442 void cfs_status_set_clinfo(
443 cfs_clinfo_t *clinfo)
444 {
445 g_return_if_fail(clinfo != NULL);
446
447 g_mutex_lock (&mutex);
448
449 cfs_status.clinfo_version++;
450
451 cfs_clinfo_t *old = cfs_status.clinfo;
452
453 cfs_status.clinfo = clinfo;
454
455 cfs_message("update cluster info (cluster name %s, version = %d)",
456 clinfo->cluster_name, clinfo->cman_version);
457
458
459 if (old && old->nodes_byid && clinfo->nodes_byid) {
460 /* copy kvstore */
461 GHashTable *ht = clinfo->nodes_byid;
462 GHashTableIter iter;
463 gpointer key, value;
464
465 g_hash_table_iter_init (&iter, ht);
466
467 while (g_hash_table_iter_next (&iter, &key, &value)) {
468 cfs_clnode_t *node = (cfs_clnode_t *)value;
469 cfs_clnode_t *oldnode;
470 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
471 node->online = oldnode->online;
472 node->kvhash = oldnode->kvhash;
473 oldnode->kvhash = NULL;
474 }
475 }
476
477 }
478
479 if (old)
480 cfs_clinfo_destroy(old);
481
482
483 g_mutex_unlock (&mutex);
484 }
485
486 static void
487 dump_kvstore_versions(
488 GString *str,
489 GHashTable *kvhash,
490 const char *nodename)
491 {
492 g_return_if_fail(kvhash != NULL);
493 g_return_if_fail(str != NULL);
494 g_return_if_fail(nodename != NULL);
495
496 GHashTable *ht = kvhash;
497 GHashTableIter iter;
498 gpointer key, value;
499
500 g_string_append_printf(str, "\"%s\": {\n", nodename);
501
502 g_hash_table_iter_init (&iter, ht);
503
504 int i = 0;
505 while (g_hash_table_iter_next (&iter, &key, &value)) {
506 kventry_t *entry = (kventry_t *)value;
507 if (i) g_string_append_printf(str, ",\n");
508 i++;
509 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
510 }
511
512 g_string_append_printf(str, "}\n");
513 }
514
515 int
516 cfs_create_version_msg(GString *str)
517 {
518 g_return_val_if_fail(str != NULL, -EINVAL);
519
520 g_mutex_lock (&mutex);
521
522 g_string_append_printf(str,"{\n");
523
524 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
525
526 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
527
528 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
529
530 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
531 g_string_append_printf(str, "\"%s\": %u,\n",
532 memdb_change_array[i].path,
533 memdb_change_array[i].version);
534 }
535
536 g_string_append_printf(str, "\"kvstore\": {\n");
537
538 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
539
540 cfs_clinfo_t *clinfo = cfs_status.clinfo;
541
542 if (clinfo && clinfo->nodes_byid) {
543 GHashTable *ht = clinfo->nodes_byid;
544 GHashTableIter iter;
545 gpointer key, value;
546
547 g_hash_table_iter_init (&iter, ht);
548
549 while (g_hash_table_iter_next (&iter, &key, &value)) {
550 cfs_clnode_t *node = (cfs_clnode_t *)value;
551 if (!node->kvhash)
552 continue;
553 g_string_append_printf(str, ",\n");
554 dump_kvstore_versions(str, node->kvhash, node->name);
555 }
556 }
557
558 g_string_append_printf(str,"}\n");
559
560 g_string_append_printf(str,"}\n");
561
562 g_mutex_unlock (&mutex);
563
564 return 0;
565 }
566
567 GHashTable *
568 vmlist_hash_new(void)
569 {
570 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
571 (GDestroyNotify)vminfo_free);
572 }
573
574 gboolean
575 vmlist_hash_insert_vm(
576 GHashTable *vmlist,
577 int vmtype,
578 guint32 vmid,
579 const char *nodename,
580 gboolean replace)
581 {
582 g_return_val_if_fail(vmlist != NULL, FALSE);
583 g_return_val_if_fail(nodename != NULL, FALSE);
584 g_return_val_if_fail(vmid != 0, FALSE);
585 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
586 vmtype == VMTYPE_LXC, FALSE);
587
588 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
589 cfs_critical("detected duplicate VMID %d", vmid);
590 return FALSE;
591 }
592
593 vminfo_t *vminfo = g_new0(vminfo_t, 1);
594
595 vminfo->vmid = vmid;
596 vminfo->vmtype = vmtype;
597 vminfo->nodename = g_strdup(nodename);
598
599 vminfo->version = ++vminfo_version_counter;
600
601 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
602
603 return TRUE;
604 }
605
606 void
607 vmlist_register_vm(
608 int vmtype,
609 guint32 vmid,
610 const char *nodename)
611 {
612 g_return_if_fail(cfs_status.vmlist != NULL);
613 g_return_if_fail(nodename != NULL);
614 g_return_if_fail(vmid != 0);
615 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
616 vmtype == VMTYPE_LXC);
617
618 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
619
620 g_mutex_lock (&mutex);
621
622 cfs_status.vmlist_version++;
623
624 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
625
626 g_mutex_unlock (&mutex);
627 }
628
629 gboolean
630 vmlist_different_vm_exists(
631 int vmtype,
632 guint32 vmid,
633 const char *nodename)
634 {
635 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
636 g_return_val_if_fail(vmid != 0, FALSE);
637
638 gboolean res = FALSE;
639
640 g_mutex_lock (&mutex);
641
642 vminfo_t *vminfo;
643 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
644 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
645 res = TRUE;
646 }
647 g_mutex_unlock (&mutex);
648
649 return res;
650 }
651
652 gboolean
653 vmlist_vm_exists(
654 guint32 vmid)
655 {
656 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
657 g_return_val_if_fail(vmid != 0, FALSE);
658
659 g_mutex_lock (&mutex);
660
661 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
662
663 g_mutex_unlock (&mutex);
664
665 return res != NULL;
666 }
667
668 void
669 vmlist_delete_vm(
670 guint32 vmid)
671 {
672 g_return_if_fail(cfs_status.vmlist != NULL);
673 g_return_if_fail(vmid != 0);
674
675 g_mutex_lock (&mutex);
676
677 cfs_status.vmlist_version++;
678
679 g_hash_table_remove(cfs_status.vmlist, &vmid);
680
681 g_mutex_unlock (&mutex);
682 }
683
684 void cfs_status_set_vmlist(
685 GHashTable *vmlist)
686 {
687 g_return_if_fail(vmlist != NULL);
688
689 g_mutex_lock (&mutex);
690
691 cfs_status.vmlist_version++;
692
693 if (cfs_status.vmlist)
694 g_hash_table_destroy(cfs_status.vmlist);
695
696 cfs_status.vmlist = vmlist;
697
698 g_mutex_unlock (&mutex);
699 }
700
701 int
702 cfs_create_vmlist_msg(GString *str)
703 {
704 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
705 g_return_val_if_fail(str != NULL, -EINVAL);
706
707 g_mutex_lock (&mutex);
708
709 g_string_append_printf(str,"{\n");
710
711 GHashTable *ht = cfs_status.vmlist;
712
713 guint count = g_hash_table_size(ht);
714
715 if (!count) {
716 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
717 } else {
718 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
719
720 g_string_append_printf(str,"\"ids\": {\n");
721
722 GHashTableIter iter;
723 gpointer key, value;
724
725 g_hash_table_iter_init (&iter, ht);
726
727 int first = 1;
728 while (g_hash_table_iter_next (&iter, &key, &value)) {
729 vminfo_t *vminfo = (vminfo_t *)value;
730 char *type;
731 if (vminfo->vmtype == VMTYPE_QEMU) {
732 type = "qemu";
733 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
734 type = "openvz";
735 } else if (vminfo->vmtype == VMTYPE_LXC) {
736 type = "lxc";
737 } else {
738 type = "unknown";
739 }
740
741 if (!first)
742 g_string_append_printf(str, ",\n");
743 first = 0;
744
745 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
746 vminfo->vmid, vminfo->nodename, type, vminfo->version);
747 }
748
749 g_string_append_printf(str,"}\n");
750 }
751 g_string_append_printf(str,"\n}\n");
752
753 g_mutex_unlock (&mutex);
754
755 return 0;
756 }
757
758 void
759 record_memdb_change(const char *path)
760 {
761 g_return_if_fail(cfs_status.memdb_changes != 0);
762
763 memdb_change_t *ce;
764
765 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
766 ce->version++;
767 }
768 }
769
770 void
771 record_memdb_reload(void)
772 {
773 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
774 memdb_change_array[i].version++;
775 }
776 }
777
778 static gboolean
779 kventry_hash_set(
780 GHashTable *kvhash,
781 const char *key,
782 gconstpointer data,
783 size_t len)
784 {
785 g_return_val_if_fail(kvhash != NULL, FALSE);
786 g_return_val_if_fail(key != NULL, FALSE);
787 g_return_val_if_fail(data != NULL, FALSE);
788
789 kventry_t *entry;
790 if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
791 g_free(entry->data);
792 entry->data = g_memdup(data, len);
793 entry->len = len;
794 entry->version++;
795 } else {
796 kventry_t *entry = g_new0(kventry_t, 1);
797
798 entry->key = g_strdup(key);
799 entry->data = g_memdup(data, len);
800 entry->len = len;
801
802 g_hash_table_replace(kvhash, entry->key, entry);
803 }
804
805 return TRUE;
806 }
807
/* NULL-terminated RRD definition used when creating node RRD files. */
static const char *rrd_def_node[] = {
	/* data sources */
	"DS:loadavg:GAUGE:120:0:U",
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:iowait:GAUGE:120:0:U",
	"DS:memtotal:GAUGE:120:0:U",
	"DS:memused:GAUGE:120:0:U",
	"DS:swaptotal:GAUGE:120:0:U",
	"DS:swapused:GAUGE:120:0:U",
	"DS:roottotal:GAUGE:120:0:U",
	"DS:rootused:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",

	/* averages */
	"RRA:AVERAGE:0.5:1:70",		/* 1 min avg - one hour */
	"RRA:AVERAGE:0.5:30:70",	/* 30 min avg - one day */
	"RRA:AVERAGE:0.5:180:70",	/* 3 hour avg - one week */
	"RRA:AVERAGE:0.5:720:70",	/* 12 hour avg - one month */
	"RRA:AVERAGE:0.5:10080:70",	/* 7 day avg - one year */

	/* maxima */
	"RRA:MAX:0.5:1:70",		/* 1 min max - one hour */
	"RRA:MAX:0.5:30:70",		/* 30 min max - one day */
	"RRA:MAX:0.5:180:70",		/* 3 hour max - one week */
	"RRA:MAX:0.5:720:70",		/* 12 hour max - one month */
	"RRA:MAX:0.5:10080:70",		/* 7 day max - one year */

	NULL,
};
835
/* NULL-terminated RRD definition used when creating guest RRD files. */
static const char *rrd_def_vm[] = {
	/* data sources */
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:maxmem:GAUGE:120:0:U",
	"DS:mem:GAUGE:120:0:U",
	"DS:maxdisk:GAUGE:120:0:U",
	"DS:disk:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",
	"DS:diskread:DERIVE:120:0:U",
	"DS:diskwrite:DERIVE:120:0:U",

	/* averages */
	"RRA:AVERAGE:0.5:1:70",		/* 1 min avg - one hour */
	"RRA:AVERAGE:0.5:30:70",	/* 30 min avg - one day */
	"RRA:AVERAGE:0.5:180:70",	/* 3 hour avg - one week */
	"RRA:AVERAGE:0.5:720:70",	/* 12 hour avg - one month */
	"RRA:AVERAGE:0.5:10080:70",	/* 7 day avg - one year */

	/* maxima */
	"RRA:MAX:0.5:1:70",		/* 1 min max - one hour */
	"RRA:MAX:0.5:30:70",		/* 30 min max - one day */
	"RRA:MAX:0.5:180:70",		/* 3 hour max - one week */
	"RRA:MAX:0.5:720:70",		/* 12 hour max - one month */
	"RRA:MAX:0.5:10080:70",		/* 7 day max - one year */

	NULL,
};
861
/* NULL-terminated RRD definition used when creating storage RRD files. */
static const char *rrd_def_storage[] = {
	/* data sources */
	"DS:total:GAUGE:120:0:U",
	"DS:used:GAUGE:120:0:U",

	/* averages */
	"RRA:AVERAGE:0.5:1:70",		/* 1 min avg - one hour */
	"RRA:AVERAGE:0.5:30:70",	/* 30 min avg - one day */
	"RRA:AVERAGE:0.5:180:70",	/* 3 hour avg - one week */
	"RRA:AVERAGE:0.5:720:70",	/* 12 hour avg - one month */
	"RRA:AVERAGE:0.5:10080:70",	/* 7 day avg - one year */

	/* maxima */
	"RRA:MAX:0.5:1:70",		/* 1 min max - one hour */
	"RRA:MAX:0.5:30:70",		/* 30 min max - one day */
	"RRA:MAX:0.5:180:70",		/* 3 hour max - one week */
	"RRA:MAX:0.5:720:70",		/* 12 hour max - one month */
	"RRA:MAX:0.5:10080:70",		/* 7 day max - one year */

	NULL,
};
879
880 #define RRDDIR "/var/lib/rrdcached/db"
881
/*
 * Create the RRD file @filename with a step of 60 and the @argcount
 * definition strings in @rrddef. The start time is today's local midnight.
 * A failure is only reported through cfs_message().
 */
static void
create_rrd_file(
	const char *filename,
	int argcount,
	const char *rrddef[])
{
	/* start at day boundary */
	time_t now = time(NULL);

	/*
	 * NOTE(review): localtime() hands back a pointer to static storage;
	 * consider localtime_r() if other threads may call localtime() too.
	 */
	struct tm *day_start = localtime(&now);
	day_start->tm_hour = 0;
	day_start->tm_min = 0;
	day_start->tm_sec = 0;

	rrd_clear_error();

	int rc = rrd_create_r(filename, 60, timelocal(day_start), argcount, rrddef);
	if (rc) {
		cfs_message("RRD create error %s: %s", filename, rrd_get_error());
	}
}
901
/*
 * Skip the first @count ':'-terminated fields of @data.
 *
 * Returns a pointer just behind the @count-th ':' or, if the string holds
 * fewer separators, a pointer to its terminating NUL. A @count of zero or
 * less returns @data unchanged.
 */
static inline const char *
rrd_skip_data(
	const char *data,
	int count)
{
	const char *p = data;

	for (int seen = 0; *p != '\0' && seen < count; p++) {
		if (*p == ':')
			seen++;
	}

	return p;
}
914
915 static void
916 update_rrd_data(
917 const char *key,
918 gconstpointer data,
919 size_t len)
920 {
921 g_return_if_fail(key != NULL);
922 g_return_if_fail(data != NULL);
923 g_return_if_fail(len > 0);
924 g_return_if_fail(len < 4096);
925
926 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
927
928 int use_daemon = 1;
929 if (rrdc_connect(rrdcsock) != 0)
930 use_daemon = 0;
931
932 char *filename = NULL;
933
934 int skip = 0;
935
936 if (strncmp(key, "pve2-node/", 10) == 0) {
937 const char *node = key + 10;
938
939 skip = 2;
940
941 if (strchr(node, '/') != NULL)
942 goto keyerror;
943
944 if (strlen(node) < 1)
945 goto keyerror;
946
947 filename = g_strdup_printf(RRDDIR "/%s", key);
948
949 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
950
951 mkdir(RRDDIR "/pve2-node", 0755);
952 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
953 create_rrd_file(filename, argcount, rrd_def_node);
954 }
955
956 } else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
957 (strncmp(key, "pve2.3-vm/", 10) == 0)) {
958 const char *vmid;
959
960 if (strncmp(key, "pve2-vm/", 8) == 0) {
961 vmid = key + 8;
962 skip = 2;
963 } else {
964 vmid = key + 10;
965 skip = 4;
966 }
967
968 if (strchr(vmid, '/') != NULL)
969 goto keyerror;
970
971 if (strlen(vmid) < 1)
972 goto keyerror;
973
974 filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);
975
976 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
977
978 mkdir(RRDDIR "/pve2-vm", 0755);
979 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
980 create_rrd_file(filename, argcount, rrd_def_vm);
981 }
982
983 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
984 const char *node = key + 13;
985
986 const char *storage = node;
987 while (*storage && *storage != '/')
988 storage++;
989
990 if (*storage != '/' || ((storage - node) < 1))
991 goto keyerror;
992
993 storage++;
994
995 if (strchr(storage, '/') != NULL)
996 goto keyerror;
997
998 if (strlen(storage) < 1)
999 goto keyerror;
1000
1001 filename = g_strdup_printf(RRDDIR "/%s", key);
1002
1003 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1004
1005 mkdir(RRDDIR "/pve2-storage", 0755);
1006
1007 char *dir = g_path_get_dirname(filename);
1008 mkdir(dir, 0755);
1009 g_free(dir);
1010
1011 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
1012 create_rrd_file(filename, argcount, rrd_def_storage);
1013 }
1014
1015 } else {
1016 goto keyerror;
1017 }
1018
1019 const char *dp = skip ? rrd_skip_data(data, skip) : data;
1020
1021 const char *update_args[] = { dp, NULL };
1022
1023 if (use_daemon) {
1024 int status;
1025 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1026 cfs_message("RRDC update error %s: %d", filename, status);
1027 rrdc_disconnect();
1028 rrd_clear_error();
1029 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1030 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1031 }
1032 }
1033
1034 } else {
1035 rrd_clear_error();
1036 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1037 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1038 }
1039 }
1040
1041 ret:
1042 if (filename)
1043 g_free(filename);
1044
1045 return;
1046
1047 keyerror:
1048 cfs_critical("RRD update error: unknown/wrong key %s", key);
1049 goto ret;
1050 }
1051
1052 static gboolean
1053 rrd_entry_is_old(
1054 gpointer key,
1055 gpointer value,
1056 gpointer user_data)
1057 {
1058 rrdentry_t *entry = (rrdentry_t *)value;
1059 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1060
1061 int diff = ctime - entry->time;
1062
1063 /* remove everything older than 5 minutes */
1064 int expire = 60*5;
1065
1066 return (diff > expire) ? TRUE : FALSE;
1067 }
1068
1069 static char *rrd_dump_buf = NULL;
1070 static time_t rrd_dump_last = 0;
1071
1072 void
1073 cfs_rrd_dump(GString *str)
1074 {
1075 time_t ctime;
1076 time(&ctime);
1077
1078 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1079 g_string_assign(str, rrd_dump_buf);
1080 return;
1081 }
1082
1083 /* remove old data */
1084 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1085 GUINT_TO_POINTER(ctime));
1086
1087 g_string_set_size(str, 0);
1088
1089 GHashTableIter iter;
1090 gpointer key, value;
1091
1092 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1093
1094 while (g_hash_table_iter_next (&iter, &key, &value)) {
1095 rrdentry_t *entry = (rrdentry_t *)value;
1096 g_string_append(str, key);
1097 g_string_append(str, ":");
1098 g_string_append(str, entry->data);
1099 g_string_append(str, "\n");
1100 }
1101
1102 g_string_append_c(str, 0); // never return undef
1103
1104 rrd_dump_last = ctime;
1105 if (rrd_dump_buf)
1106 g_free(rrd_dump_buf);
1107 rrd_dump_buf = g_strdup(str->str);
1108 }
1109
1110 static gboolean
1111 nodeip_hash_set(
1112 GHashTable *iphash,
1113 const char *nodename,
1114 const char *ip,
1115 size_t len)
1116 {
1117 g_return_val_if_fail(iphash != NULL, FALSE);
1118 g_return_val_if_fail(nodename != NULL, FALSE);
1119 g_return_val_if_fail(ip != NULL, FALSE);
1120 g_return_val_if_fail(len > 0, FALSE);
1121 g_return_val_if_fail(len < 256, FALSE);
1122 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1123
1124 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1125
1126 if (!oldip || (strcmp(oldip, ip) != 0)) {
1127 cfs_status.clinfo_version++;
1128 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1129 }
1130
1131 return TRUE;
1132 }
1133
1134 static gboolean
1135 rrdentry_hash_set(
1136 GHashTable *rrdhash,
1137 const char *key,
1138 const char *data,
1139 size_t len)
1140 {
1141 g_return_val_if_fail(rrdhash != NULL, FALSE);
1142 g_return_val_if_fail(key != NULL, FALSE);
1143 g_return_val_if_fail(data != NULL, FALSE);
1144 g_return_val_if_fail(len > 0, FALSE);
1145 g_return_val_if_fail(len < 4096, FALSE);
1146 g_return_val_if_fail(data[len-1] == 0, FALSE);
1147
1148 rrdentry_t *entry;
1149 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1150 g_free(entry->data);
1151 entry->data = g_memdup(data, len);
1152 entry->len = len;
1153 entry->time = time(NULL);
1154 } else {
1155 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1156
1157 entry->key = g_strdup(key);
1158 entry->data = g_memdup(data, len);
1159 entry->len = len;
1160 entry->time = time(NULL);
1161
1162 g_hash_table_replace(rrdhash, entry->key, entry);
1163 }
1164
1165 update_rrd_data(key, data, len);
1166
1167 return TRUE;
1168 }
1169
1170 static int
1171 kvstore_send_update_message(
1172 dfsm_t *dfsm,
1173 const char *key,
1174 gpointer data,
1175 guint32 len)
1176 {
1177
1178 struct iovec iov[2];
1179
1180 char name[256];
1181 g_strlcpy(name, key, sizeof(name));
1182
1183 iov[0].iov_base = &name;
1184 iov[0].iov_len = sizeof(name);
1185
1186 iov[1].iov_base = (char *)data;
1187 iov[1].iov_len = len;
1188
1189 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1190 return 0;
1191
1192 return -EACCES;
1193 }
1194
1195 static clog_entry_t *
1196 kvstore_parse_log_message(
1197 const void *msg,
1198 size_t msg_len)
1199 {
1200 g_return_val_if_fail(msg != NULL, NULL);
1201
1202 if (msg_len < sizeof(clog_entry_t)) {
1203 cfs_critical("received short log message (%lu < %lu)", msg_len, sizeof(clog_entry_t));
1204 return NULL;
1205 }
1206
1207 clog_entry_t *entry = (clog_entry_t *)msg;
1208
1209 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1210 entry->ident_len + entry->tag_len + entry->msg_len;
1211
1212 if (msg_len != size) {
1213 cfs_critical("received log message with wrong size (%lu != %u)", msg_len, size);
1214 return NULL;
1215 }
1216
1217 msg = entry->data;
1218
1219 if (*((char *)msg + entry->node_len - 1)) {
1220 cfs_critical("unterminated string in log message");
1221 return NULL;
1222 }
1223 msg += entry->node_len;
1224
1225 if (*((char *)msg + entry->ident_len - 1)) {
1226 cfs_critical("unterminated string in log message");
1227 return NULL;
1228 }
1229 msg += entry->ident_len;
1230
1231 if (*((char *)msg + entry->tag_len - 1)) {
1232 cfs_critical("unterminated string in log message");
1233 return NULL;
1234 }
1235 msg += entry->tag_len;
1236
1237 if (*((char *)msg + entry->msg_len - 1)) {
1238 cfs_critical("unterminated string in log message");
1239 return NULL;
1240 }
1241
1242 return entry;
1243 }
1244
1245 static gboolean
1246 kvstore_parse_update_message(
1247 const void *msg,
1248 size_t msg_len,
1249 const char **key,
1250 gconstpointer *data,
1251 guint32 *len)
1252 {
1253 g_return_val_if_fail(msg != NULL, FALSE);
1254 g_return_val_if_fail(key != NULL, FALSE);
1255 g_return_val_if_fail(data != NULL, FALSE);
1256 g_return_val_if_fail(len != NULL, FALSE);
1257
1258 if (msg_len < 256) {
1259 cfs_critical("received short kvstore message (%lu < 256)", msg_len);
1260 return FALSE;
1261 }
1262
1263 /* test if key is null terminated */
1264 int i = 0;
1265 for (i = 0; i < 256; i++)
1266 if (((char *)msg)[i] == 0)
1267 break;
1268
1269 if (i == 256)
1270 return FALSE;
1271
1272
1273 *len = msg_len - 256;
1274 *key = msg;
1275 *data = msg + 256;
1276
1277 return TRUE;
1278 }
1279
1280 int
1281 cfs_create_status_msg(
1282 GString *str,
1283 const char *nodename,
1284 const char *key)
1285 {
1286 g_return_val_if_fail(str != NULL, -EINVAL);
1287 g_return_val_if_fail(key != NULL, -EINVAL);
1288
1289 int res = -ENOENT;
1290
1291 GHashTable *kvhash = NULL;
1292
1293 g_mutex_lock (&mutex);
1294
1295 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1296 kvhash = cfs_status.kvhash;
1297 } else {
1298 cfs_clnode_t *clnode;
1299 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1300 kvhash = clnode->kvhash;
1301 }
1302
1303 kventry_t *entry;
1304 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1305 g_string_append_len(str, entry->data, entry->len);
1306 res = 0;
1307 }
1308
1309 g_mutex_unlock (&mutex);
1310
1311 return res;
1312 }
1313
1314 int
1315 cfs_status_set(
1316 const char *key,
1317 gpointer data,
1318 size_t len)
1319 {
1320 g_return_val_if_fail(key != NULL, FALSE);
1321 g_return_val_if_fail(data != NULL, FALSE);
1322 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1323
1324 if (len > CFS_MAX_STATUS_SIZE)
1325 return -EFBIG;
1326
1327 g_mutex_lock (&mutex);
1328
1329 gboolean res;
1330
1331 if (strncmp(key, "rrd/", 4) == 0) {
1332 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1333 } else if (!strcmp(key, "nodeip")) {
1334 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1335 } else {
1336 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1337 }
1338 g_mutex_unlock (&mutex);
1339
1340 if (cfs_status.kvstore)
1341 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1342
1343 return res ? 0 : -ENOMEM;
1344 }
1345
1346 gboolean
1347 cfs_kvstore_node_set(
1348 uint32_t nodeid,
1349 const char *key,
1350 gconstpointer data,
1351 size_t len)
1352 {
1353 g_return_val_if_fail(nodeid != 0, FALSE);
1354 g_return_val_if_fail(key != NULL, FALSE);
1355 g_return_val_if_fail(data != NULL, FALSE);
1356
1357 g_mutex_lock (&mutex);
1358
1359 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1360 goto ret; /* ignore */
1361
1362 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1363 if (!clnode)
1364 goto ret; /* ignore */
1365
1366 cfs_debug("got node %d status update %s", nodeid, key);
1367
1368 if (strncmp(key, "rrd/", 4) == 0) {
1369 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1370 } else if (!strcmp(key, "nodeip")) {
1371 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1372 } else {
1373 if (!clnode->kvhash) {
1374 if (!(clnode->kvhash = kventry_hash_new())) {
1375 goto ret; /*ignore */
1376 }
1377 }
1378
1379 kventry_hash_set(clnode->kvhash, key, data, len);
1380
1381 }
1382 ret:
1383 g_mutex_unlock (&mutex);
1384
1385 return TRUE;
1386 }
1387
1388 static gboolean
1389 cfs_kvstore_sync(void)
1390 {
1391 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1392 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1393
1394 gboolean res = TRUE;
1395
1396 g_mutex_lock (&mutex);
1397
1398 GHashTable *ht = cfs_status.kvhash;
1399 GHashTableIter iter;
1400 gpointer key, value;
1401
1402 g_hash_table_iter_init (&iter, ht);
1403
1404 while (g_hash_table_iter_next (&iter, &key, &value)) {
1405 kventry_t *entry = (kventry_t *)value;
1406 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1407 }
1408
1409 g_mutex_unlock (&mutex);
1410
1411 return res;
1412 }
1413
1414 static int
1415 dfsm_deliver(
1416 dfsm_t *dfsm,
1417 gpointer data,
1418 int *res_ptr,
1419 uint32_t nodeid,
1420 uint32_t pid,
1421 uint16_t msg_type,
1422 uint32_t msg_time,
1423 const void *msg,
1424 size_t msg_len)
1425 {
1426 g_return_val_if_fail(dfsm != NULL, -1);
1427 g_return_val_if_fail(msg != NULL, -1);
1428 g_return_val_if_fail(res_ptr != NULL, -1);
1429
1430 /* ignore message for ourself */
1431 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1432 goto ret;
1433
1434 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1435 const char *key;
1436 gconstpointer data;
1437 guint32 len;
1438 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1439 cfs_kvstore_node_set(nodeid, key, data, len);
1440 } else {
1441 cfs_critical("cant parse update message");
1442 }
1443 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1444 cfs_message("received log"); // fixme: remove
1445 const clog_entry_t *entry;
1446 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1447 clusterlog_insert(cfs_status.clusterlog, entry);
1448 } else {
1449 cfs_critical("cant parse log message");
1450 }
1451 } else {
1452 cfs_critical("received unknown message type %d\n", msg_type);
1453 goto fail;
1454 }
1455
1456 ret:
1457 *res_ptr = 0;
1458 return 1;
1459
1460 fail:
1461 *res_ptr = -EACCES;
1462 return 1;
1463 }
1464
1465 static void
1466 dfsm_confchg(
1467 dfsm_t *dfsm,
1468 gpointer data,
1469 const struct cpg_address *member_list,
1470 size_t member_list_entries)
1471 {
1472 g_return_if_fail(dfsm != NULL);
1473 g_return_if_fail(member_list != NULL);
1474
1475 cfs_debug("enter %s", __func__);
1476
1477 g_mutex_lock (&mutex);
1478
1479 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1480
1481 if (clinfo && clinfo->nodes_byid) {
1482
1483 GHashTable *ht = clinfo->nodes_byid;
1484 GHashTableIter iter;
1485 gpointer key, value;
1486
1487 g_hash_table_iter_init (&iter, ht);
1488
1489 while (g_hash_table_iter_next (&iter, &key, &value)) {
1490 cfs_clnode_t *node = (cfs_clnode_t *)value;
1491 node->online = FALSE;
1492 }
1493
1494 for (int i = 0; i < member_list_entries; i++) {
1495 cfs_clnode_t *node;
1496 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1497 node->online = TRUE;
1498 }
1499 }
1500
1501 cfs_status.clinfo_version++;
1502 }
1503
1504 g_mutex_unlock (&mutex);
1505 }
1506
1507 static gpointer
1508 dfsm_get_state(
1509 dfsm_t *dfsm,
1510 gpointer data,
1511 unsigned int *res_len)
1512 {
1513 g_return_val_if_fail(dfsm != NULL, NULL);
1514
1515 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1516
1517 return msg;
1518 }
1519
1520 static int
1521 dfsm_process_update(
1522 dfsm_t *dfsm,
1523 gpointer data,
1524 dfsm_sync_info_t *syncinfo,
1525 uint32_t nodeid,
1526 uint32_t pid,
1527 const void *msg,
1528 size_t msg_len)
1529 {
1530 cfs_critical("%s: received unexpected update message", __func__);
1531
1532 return -1;
1533 }
1534
1535 static int
1536 dfsm_process_state_update(
1537 dfsm_t *dfsm,
1538 gpointer data,
1539 dfsm_sync_info_t *syncinfo)
1540 {
1541 g_return_val_if_fail(dfsm != NULL, -1);
1542 g_return_val_if_fail(syncinfo != NULL, -1);
1543
1544 clog_base_t *clog[syncinfo->node_count];
1545
1546 int local_index = -1;
1547 for (int i = 0; i < syncinfo->node_count; i++) {
1548 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1549 ni->synced = 1;
1550
1551 if (syncinfo->local == ni)
1552 local_index = i;
1553
1554 clog_base_t *base = (clog_base_t *)ni->state;
1555 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1556 clog[i] = ni->state;
1557 } else {
1558 cfs_critical("received log with wrong size %u", ni->state_len);
1559 clog[i] = NULL;
1560 }
1561 }
1562
1563 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1564 cfs_critical("unable to merge log files");
1565 }
1566
1567 cfs_kvstore_sync();
1568
1569 return 1;
1570 }
1571
1572 static int
1573 dfsm_commit(
1574 dfsm_t *dfsm,
1575 gpointer data,
1576 dfsm_sync_info_t *syncinfo)
1577 {
1578 g_return_val_if_fail(dfsm != NULL, -1);
1579 g_return_val_if_fail(syncinfo != NULL, -1);
1580
1581 return 1;
1582 }
1583
1584 static void
1585 dfsm_synced(dfsm_t *dfsm)
1586 {
1587 g_return_if_fail(dfsm != NULL);
1588
1589 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1590 if (!ip)
1591 ip = cfs.ip;
1592
1593 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1594 }
1595
1596 static int
1597 dfsm_cleanup(
1598 dfsm_t *dfsm,
1599 gpointer data,
1600 dfsm_sync_info_t *syncinfo)
1601 {
1602 return 1;
1603 }
1604
1605 static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1606 .dfsm_deliver_fn = dfsm_deliver,
1607 .dfsm_confchg_fn = dfsm_confchg,
1608
1609 .dfsm_get_state_fn = dfsm_get_state,
1610 .dfsm_process_state_update_fn = dfsm_process_state_update,
1611 .dfsm_process_update_fn = dfsm_process_update,
1612 .dfsm_commit_fn = dfsm_commit,
1613 .dfsm_cleanup_fn = dfsm_cleanup,
1614 .dfsm_synced_fn = dfsm_synced,
1615 };
1616
1617 dfsm_t *
1618 cfs_status_dfsm_new(void)
1619 {
1620 g_mutex_lock (&mutex);
1621
1622 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1623 0, &kvstore_dfsm_callbacks);
1624 g_mutex_unlock (&mutex);
1625
1626 return cfs_status.kvstore;
1627 }
1628
1629 gboolean
1630 cfs_is_quorate(void)
1631 {
1632 g_mutex_lock (&mutex);
1633 gboolean res = cfs_status.quorate;
1634 g_mutex_unlock (&mutex);
1635
1636 return res;
1637 }
1638
1639 void
1640 cfs_set_quorate(
1641 uint32_t quorate,
1642 gboolean quiet)
1643 {
1644 g_mutex_lock (&mutex);
1645
1646 uint32_t prev_quorate = cfs_status.quorate;
1647 cfs_status.quorate = quorate;
1648
1649 if (!prev_quorate && cfs_status.quorate) {
1650 if (!quiet)
1651 cfs_message("node has quorum");
1652 }
1653
1654 if (prev_quorate && !cfs_status.quorate) {
1655 if (!quiet)
1656 cfs_message("node lost quorum");
1657 }
1658
1659 g_mutex_unlock (&mutex);
1660 }
1661