]> git.proxmox.com Git - pve-cluster.git/blob - data/src/status.c
5fe1eb9e94d1862f018a7f032a638a795b498151
[pve-cluster.git] / data / src / status.c
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "status"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdio.h>
28 #include <stdint.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <glib.h>
32 #include <sys/syslog.h>
33 #include <rrd.h>
34 #include <rrd_client.h>
35 #include <time.h>
36
37 #include "cfs-utils.h"
38 #include "status.h"
39 #include "logger.h"
40
41 #define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"
42
/* Message types sent over the kvstore dfsm channel. */
typedef enum {
	/* key/value update, built by kvstore_send_update_message() */
	KVSTORE_MESSAGE_UPDATE = 1,
	/* NOTE(review): not referenced in the visible part of this file */
	KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
	/* cluster log entry, sent by cfs_cluster_log() */
	KVSTORE_MESSAGE_LOG = 3,
} kvstore_message_t;
48
/* Incremented by vmlist_hash_insert_vm() for every entry it creates; the
 * new value becomes that entry's version. */
static uint32_t vminfo_version_counter;

/* One entry of a VM list hash table (see vmlist_hash_new()). */
typedef struct {
	/* VM id; the hash table key points at this field */
	uint32_t vmid;
	/* g_strdup()ed node name, released by vminfo_free() */
	char *nodename;
	/* one of the VMTYPE_* constants */
	int vmtype;
	/* value of vminfo_version_counter when the entry was created */
	uint32_t version;
} vminfo_t;
57
/* One key/value status entry (see kventry_hash_set()). */
typedef struct {
	/* g_strdup()ed key; also used as the hash table key */
	char *key;
	/* private copy of the value, 'len' bytes long */
	gpointer data;
	size_t len;
	/* bumped each time an existing entry is overwritten */
	uint32_t version;
} kventry_t;
64
/* Most recent RRD data line received for one key (see rrdentry_hash_set()). */
typedef struct {
	/* g_strdup()ed key; also used as the hash table key */
	char *key;
	/* private copy of the NUL-terminated data line, 'len' bytes long */
	gpointer data;
	size_t len;
	/* time(NULL) of the last update, stored in 32 bits */
	uint32_t time;
} rrdentry_t;
71
/* Change counter for one tracked memdb path. */
typedef struct {
	/* tracked path as listed in memdb_change_array */
	char *path;
	/* bumped by record_memdb_change() and record_memdb_reload() */
	uint32_t version;
} memdb_change_t;
76
/*
 * Paths whose change counters are reported by cfs_create_version_msg().
 * cfs_status_init() indexes these entries by path in
 * cfs_status.memdb_changes; all versions start at 0.
 */
static memdb_change_t memdb_change_array[] = {
	{ .path = "corosync.conf" },
	{ .path = "corosync.conf.new" },
	{ .path = "storage.cfg" },
	{ .path = "user.cfg" },
	{ .path = "domains.cfg" },
	{ .path = "priv/shadow.cfg" },
	{ .path = "datacenter.cfg" },
	{ .path = "vzdump.cron" },
	{ .path = "ha/crm_commands" },
	{ .path = "ha/manager_status" },
	{ .path = "ha/resources.cfg" },
	{ .path = "ha/groups.cfg" },
	{ .path = "ha/fence.cfg" },
	{ .path = "status.cfg" },
	{ .path = "replication.cfg" },
};
94
/* Taken by most functions in this file around accesses to cfs_status. */
static GMutex mutex;

/* Process-wide status state; the single instance is cfs_status below. */
typedef struct {
	/* time(NULL) at cfs_status_init() */
	time_t start_time;

	/* reported as "quorate" by cfs_create_memberlist_msg() */
	uint32_t quorate;

	/* current cluster info, replaced by cfs_status_set_clinfo(); may be NULL */
	cfs_clinfo_t *clinfo;
	/* bumped when clinfo is replaced or cleaned up, and when the local
	 * node IP changes (nodeip_hash_set()) */
	uint32_t clinfo_version;

	/* vmid -> vminfo_t, see vmlist_hash_new() */
	GHashTable *vmlist;
	/* bumped on every registration, deletion or replacement of the list */
	uint32_t vmlist_version;

	/* dfsm handle used to send kvstore messages; may be NULL */
	dfsm_t *kvstore;
	/* key -> kventry_t for the local node */
	GHashTable *kvhash;
	/* key -> rrdentry_t */
	GHashTable *rrdhash;
	/* node name -> IP string, both owned by the table */
	GHashTable *iphash;

	/* path -> entry of memdb_change_array */
	GHashTable *memdb_changes;

	clusterlog_t *clusterlog;
} cfs_status_t;

static cfs_status_t cfs_status;
119
/* One cluster node; created by cfs_clnode_new(), freed by cfs_clnode_destroy(). */
struct cfs_clnode {
	/* g_strdup()ed node name; key in cfs_clinfo.nodes_byname */
	char *name;
	/* node id; key in cfs_clinfo.nodes_byid */
	uint32_t nodeid;
	uint32_t votes;
	gboolean online;
	/* key -> kventry_t received for this node; may be NULL */
	GHashTable *kvhash;
};
127
/* Cluster description; created by cfs_clinfo_new(). */
struct cfs_clinfo {
	/* g_strdup()ed cluster name */
	char *cluster_name;
	/* reported as the cluster "version" in the member list */
	uint32_t cman_version;

	/* nodeid -> cfs_clnode_t; this table owns the nodes */
	GHashTable *nodes_byid;
	/* name -> cfs_clnode_t; secondary index without destroy callbacks */
	GHashTable *nodes_byname;
};
135
136 static guint
137 g_int32_hash (gconstpointer v)
138 {
139 return *(const uint32_t *) v;
140 }
141
142 static gboolean
143 g_int32_equal (gconstpointer v1,
144 gconstpointer v2)
145 {
146 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
147 }
148
149 static void vminfo_free(vminfo_t *vminfo)
150 {
151 g_return_if_fail(vminfo != NULL);
152
153 if (vminfo->nodename)
154 g_free(vminfo->nodename);
155
156
157 g_free(vminfo);
158 }
159
160 void cfs_clnode_destroy(
161 cfs_clnode_t *clnode)
162 {
163 g_return_if_fail(clnode != NULL);
164
165 if (clnode->kvhash)
166 g_hash_table_destroy(clnode->kvhash);
167
168 if (clnode->name)
169 g_free(clnode->name);
170
171 g_free(clnode);
172 }
173
174 cfs_clnode_t *cfs_clnode_new(
175 const char *name,
176 uint32_t nodeid,
177 uint32_t votes)
178 {
179 g_return_val_if_fail(name != NULL, NULL);
180
181 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
182 if (!clnode)
183 return NULL;
184
185 clnode->name = g_strdup(name);
186 clnode->nodeid = nodeid;
187 clnode->votes = votes;
188
189 return clnode;
190 }
191
192 gboolean cfs_clinfo_destroy(
193 cfs_clinfo_t *clinfo)
194 {
195 g_return_val_if_fail(clinfo != NULL, FALSE);
196
197 if (clinfo->cluster_name)
198 g_free(clinfo->cluster_name);
199
200 if (clinfo->nodes_byname)
201 g_hash_table_destroy(clinfo->nodes_byname);
202
203 if (clinfo->nodes_byid)
204 g_hash_table_destroy(clinfo->nodes_byid);
205
206 g_free(clinfo);
207
208 return TRUE;
209 }
210
211 cfs_clinfo_t *cfs_clinfo_new(
212 const char *cluster_name,
213 uint32_t cman_version)
214 {
215 g_return_val_if_fail(cluster_name != NULL, NULL);
216
217 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
218 if (!clinfo)
219 return NULL;
220
221 clinfo->cluster_name = g_strdup(cluster_name);
222 clinfo->cman_version = cman_version;
223
224 if (!(clinfo->nodes_byid = g_hash_table_new_full(
225 g_int32_hash, g_int32_equal, NULL,
226 (GDestroyNotify)cfs_clnode_destroy)))
227 goto fail;
228
229 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
230 goto fail;
231
232 return clinfo;
233
234 fail:
235 cfs_clinfo_destroy(clinfo);
236
237 return NULL;
238 }
239
240 gboolean cfs_clinfo_add_node(
241 cfs_clinfo_t *clinfo,
242 cfs_clnode_t *clnode)
243 {
244 g_return_val_if_fail(clinfo != NULL, FALSE);
245 g_return_val_if_fail(clnode != NULL, FALSE);
246
247 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
248 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
249
250 return TRUE;
251 }
252
253 int
254 cfs_create_memberlist_msg(
255 GString *str)
256 {
257 g_return_val_if_fail(str != NULL, -EINVAL);
258
259 g_mutex_lock (&mutex);
260
261 g_string_append_printf(str,"{\n");
262
263 guint nodecount = 0;
264
265 cfs_clinfo_t *clinfo = cfs_status.clinfo;
266
267 if (clinfo && clinfo->nodes_byid)
268 nodecount = g_hash_table_size(clinfo->nodes_byid);
269
270 if (nodecount) {
271 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
272 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
273
274 g_string_append_printf(str, "\"cluster\": { ");
275 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
276 "\"nodes\": %d, \"quorate\": %d ",
277 clinfo->cluster_name, clinfo->cman_version,
278 nodecount, cfs_status.quorate);
279
280 g_string_append_printf(str,"},\n");
281 g_string_append_printf(str,"\"nodelist\": {\n");
282
283 GHashTable *ht = clinfo->nodes_byid;
284 GHashTableIter iter;
285 gpointer key, value;
286
287 g_hash_table_iter_init (&iter, ht);
288
289 int i = 0;
290 while (g_hash_table_iter_next (&iter, &key, &value)) {
291 cfs_clnode_t *node = (cfs_clnode_t *)value;
292 if (i) g_string_append_printf(str, ",\n");
293 i++;
294
295 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
296 node->name, node->nodeid, node->online);
297
298
299 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
300 if (ip) {
301 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
302 }
303
304 g_string_append_printf(str, "}");
305
306 }
307 g_string_append_printf(str,"\n }\n");
308 } else {
309 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
310 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
311 }
312
313 g_string_append_printf(str,"}\n");
314
315 g_mutex_unlock (&mutex);
316
317 return 0;
318 }
319
320 static void
321 kventry_free(kventry_t *entry)
322 {
323 g_return_if_fail(entry != NULL);
324
325 g_free(entry->key);
326 g_free(entry->data);
327 g_free(entry);
328 }
329
330 static GHashTable *
331 kventry_hash_new(void)
332 {
333 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
334 (GDestroyNotify)kventry_free);
335 }
336
337 static void
338 rrdentry_free(rrdentry_t *entry)
339 {
340 g_return_if_fail(entry != NULL);
341
342 g_free(entry->key);
343 g_free(entry->data);
344 g_free(entry);
345 }
346
347 static GHashTable *
348 rrdentry_hash_new(void)
349 {
350 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
351 (GDestroyNotify)rrdentry_free);
352 }
353
/* Forward to clusterlog_dump() on the global cluster log; all arguments
 * are passed through unchanged. */
void
cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
{
	clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
}
359
360 void
361 cfs_cluster_log(clog_entry_t *entry)
362 {
363 g_return_if_fail(entry != NULL);
364
365 clusterlog_insert(cfs_status.clusterlog, entry);
366
367 if (cfs_status.kvstore) {
368 struct iovec iov[1];
369 iov[0].iov_base = (char *)entry;
370 iov[0].iov_len = clog_entry_size(entry);
371
372 if (dfsm_is_initialized(cfs_status.kvstore))
373 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
374 }
375 }
376
377 void cfs_status_init(void)
378 {
379 g_mutex_lock (&mutex);
380
381 cfs_status.start_time = time(NULL);
382
383 cfs_status.vmlist = vmlist_hash_new();
384
385 cfs_status.kvhash = kventry_hash_new();
386
387 cfs_status.rrdhash = rrdentry_hash_new();
388
389 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
390
391 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
392
393 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
394 g_hash_table_replace(cfs_status.memdb_changes,
395 memdb_change_array[i].path,
396 &memdb_change_array[i]);
397 }
398
399 cfs_status.clusterlog = clusterlog_new();
400
401 // fixme:
402 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
403 LOG_INFO, "starting cluster log");
404
405 g_mutex_unlock (&mutex);
406 }
407
408 void cfs_status_cleanup(void)
409 {
410 g_mutex_lock (&mutex);
411
412 cfs_status.clinfo_version++;
413
414 if (cfs_status.clinfo) {
415 cfs_clinfo_destroy(cfs_status.clinfo);
416 cfs_status.clinfo = NULL;
417 }
418
419 if (cfs_status.vmlist) {
420 g_hash_table_destroy(cfs_status.vmlist);
421 cfs_status.vmlist = NULL;
422 }
423
424 if (cfs_status.kvhash) {
425 g_hash_table_destroy(cfs_status.kvhash);
426 cfs_status.kvhash = NULL;
427 }
428
429 if (cfs_status.rrdhash) {
430 g_hash_table_destroy(cfs_status.rrdhash);
431 cfs_status.rrdhash = NULL;
432 }
433
434 if (cfs_status.iphash) {
435 g_hash_table_destroy(cfs_status.iphash);
436 cfs_status.iphash = NULL;
437 }
438
439 if (cfs_status.clusterlog)
440 clusterlog_destroy(cfs_status.clusterlog);
441
442 g_mutex_unlock (&mutex);
443 }
444
445 void cfs_status_set_clinfo(
446 cfs_clinfo_t *clinfo)
447 {
448 g_return_if_fail(clinfo != NULL);
449
450 g_mutex_lock (&mutex);
451
452 cfs_status.clinfo_version++;
453
454 cfs_clinfo_t *old = cfs_status.clinfo;
455
456 cfs_status.clinfo = clinfo;
457
458 cfs_message("update cluster info (cluster name %s, version = %d)",
459 clinfo->cluster_name, clinfo->cman_version);
460
461
462 if (old && old->nodes_byid && clinfo->nodes_byid) {
463 /* copy kvstore */
464 GHashTable *ht = clinfo->nodes_byid;
465 GHashTableIter iter;
466 gpointer key, value;
467
468 g_hash_table_iter_init (&iter, ht);
469
470 while (g_hash_table_iter_next (&iter, &key, &value)) {
471 cfs_clnode_t *node = (cfs_clnode_t *)value;
472 cfs_clnode_t *oldnode;
473 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
474 node->online = oldnode->online;
475 node->kvhash = oldnode->kvhash;
476 oldnode->kvhash = NULL;
477 }
478 }
479
480 }
481
482 if (old)
483 cfs_clinfo_destroy(old);
484
485
486 g_mutex_unlock (&mutex);
487 }
488
489 static void
490 dump_kvstore_versions(
491 GString *str,
492 GHashTable *kvhash,
493 const char *nodename)
494 {
495 g_return_if_fail(kvhash != NULL);
496 g_return_if_fail(str != NULL);
497 g_return_if_fail(nodename != NULL);
498
499 GHashTable *ht = kvhash;
500 GHashTableIter iter;
501 gpointer key, value;
502
503 g_string_append_printf(str, "\"%s\": {\n", nodename);
504
505 g_hash_table_iter_init (&iter, ht);
506
507 int i = 0;
508 while (g_hash_table_iter_next (&iter, &key, &value)) {
509 kventry_t *entry = (kventry_t *)value;
510 if (i) g_string_append_printf(str, ",\n");
511 i++;
512 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
513 }
514
515 g_string_append_printf(str, "}\n");
516 }
517
518 int
519 cfs_create_version_msg(GString *str)
520 {
521 g_return_val_if_fail(str != NULL, -EINVAL);
522
523 g_mutex_lock (&mutex);
524
525 g_string_append_printf(str,"{\n");
526
527 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
528
529 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
530
531 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
532
533 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
534 g_string_append_printf(str, "\"%s\": %u,\n",
535 memdb_change_array[i].path,
536 memdb_change_array[i].version);
537 }
538
539 g_string_append_printf(str, "\"kvstore\": {\n");
540
541 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
542
543 cfs_clinfo_t *clinfo = cfs_status.clinfo;
544
545 if (clinfo && clinfo->nodes_byid) {
546 GHashTable *ht = clinfo->nodes_byid;
547 GHashTableIter iter;
548 gpointer key, value;
549
550 g_hash_table_iter_init (&iter, ht);
551
552 while (g_hash_table_iter_next (&iter, &key, &value)) {
553 cfs_clnode_t *node = (cfs_clnode_t *)value;
554 if (!node->kvhash)
555 continue;
556 g_string_append_printf(str, ",\n");
557 dump_kvstore_versions(str, node->kvhash, node->name);
558 }
559 }
560
561 g_string_append_printf(str,"}\n");
562
563 g_string_append_printf(str,"}\n");
564
565 g_mutex_unlock (&mutex);
566
567 return 0;
568 }
569
570 GHashTable *
571 vmlist_hash_new(void)
572 {
573 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
574 (GDestroyNotify)vminfo_free);
575 }
576
577 gboolean
578 vmlist_hash_insert_vm(
579 GHashTable *vmlist,
580 int vmtype,
581 guint32 vmid,
582 const char *nodename,
583 gboolean replace)
584 {
585 g_return_val_if_fail(vmlist != NULL, FALSE);
586 g_return_val_if_fail(nodename != NULL, FALSE);
587 g_return_val_if_fail(vmid != 0, FALSE);
588 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
589 vmtype == VMTYPE_LXC, FALSE);
590
591 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
592 cfs_critical("detected duplicate VMID %d", vmid);
593 return FALSE;
594 }
595
596 vminfo_t *vminfo = g_new0(vminfo_t, 1);
597
598 vminfo->vmid = vmid;
599 vminfo->vmtype = vmtype;
600 vminfo->nodename = g_strdup(nodename);
601
602 vminfo->version = ++vminfo_version_counter;
603
604 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
605
606 return TRUE;
607 }
608
609 void
610 vmlist_register_vm(
611 int vmtype,
612 guint32 vmid,
613 const char *nodename)
614 {
615 g_return_if_fail(cfs_status.vmlist != NULL);
616 g_return_if_fail(nodename != NULL);
617 g_return_if_fail(vmid != 0);
618 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
619 vmtype == VMTYPE_LXC);
620
621 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
622
623 g_mutex_lock (&mutex);
624
625 cfs_status.vmlist_version++;
626
627 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
628
629 g_mutex_unlock (&mutex);
630 }
631
632 gboolean
633 vmlist_different_vm_exists(
634 int vmtype,
635 guint32 vmid,
636 const char *nodename)
637 {
638 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
639 g_return_val_if_fail(vmid != 0, FALSE);
640
641 gboolean res = FALSE;
642
643 g_mutex_lock (&mutex);
644
645 vminfo_t *vminfo;
646 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
647 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
648 res = TRUE;
649 }
650 g_mutex_unlock (&mutex);
651
652 return res;
653 }
654
655 gboolean
656 vmlist_vm_exists(
657 guint32 vmid)
658 {
659 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
660 g_return_val_if_fail(vmid != 0, FALSE);
661
662 g_mutex_lock (&mutex);
663
664 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
665
666 g_mutex_unlock (&mutex);
667
668 return res != NULL;
669 }
670
671 void
672 vmlist_delete_vm(
673 guint32 vmid)
674 {
675 g_return_if_fail(cfs_status.vmlist != NULL);
676 g_return_if_fail(vmid != 0);
677
678 g_mutex_lock (&mutex);
679
680 cfs_status.vmlist_version++;
681
682 g_hash_table_remove(cfs_status.vmlist, &vmid);
683
684 g_mutex_unlock (&mutex);
685 }
686
687 void cfs_status_set_vmlist(
688 GHashTable *vmlist)
689 {
690 g_return_if_fail(vmlist != NULL);
691
692 g_mutex_lock (&mutex);
693
694 cfs_status.vmlist_version++;
695
696 if (cfs_status.vmlist)
697 g_hash_table_destroy(cfs_status.vmlist);
698
699 cfs_status.vmlist = vmlist;
700
701 g_mutex_unlock (&mutex);
702 }
703
704 int
705 cfs_create_vmlist_msg(GString *str)
706 {
707 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
708 g_return_val_if_fail(str != NULL, -EINVAL);
709
710 g_mutex_lock (&mutex);
711
712 g_string_append_printf(str,"{\n");
713
714 GHashTable *ht = cfs_status.vmlist;
715
716 guint count = g_hash_table_size(ht);
717
718 if (!count) {
719 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
720 } else {
721 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
722
723 g_string_append_printf(str,"\"ids\": {\n");
724
725 GHashTableIter iter;
726 gpointer key, value;
727
728 g_hash_table_iter_init (&iter, ht);
729
730 int first = 1;
731 while (g_hash_table_iter_next (&iter, &key, &value)) {
732 vminfo_t *vminfo = (vminfo_t *)value;
733 char *type;
734 if (vminfo->vmtype == VMTYPE_QEMU) {
735 type = "qemu";
736 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
737 type = "openvz";
738 } else if (vminfo->vmtype == VMTYPE_LXC) {
739 type = "lxc";
740 } else {
741 type = "unknown";
742 }
743
744 if (!first)
745 g_string_append_printf(str, ",\n");
746 first = 0;
747
748 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
749 vminfo->vmid, vminfo->nodename, type, vminfo->version);
750 }
751
752 g_string_append_printf(str,"}\n");
753 }
754 g_string_append_printf(str,"\n}\n");
755
756 g_mutex_unlock (&mutex);
757
758 return 0;
759 }
760
761 void
762 record_memdb_change(const char *path)
763 {
764 g_return_if_fail(cfs_status.memdb_changes != 0);
765
766 memdb_change_t *ce;
767
768 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
769 ce->version++;
770 }
771 }
772
773 void
774 record_memdb_reload(void)
775 {
776 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
777 memdb_change_array[i].version++;
778 }
779 }
780
781 static gboolean
782 kventry_hash_set(
783 GHashTable *kvhash,
784 const char *key,
785 gconstpointer data,
786 size_t len)
787 {
788 g_return_val_if_fail(kvhash != NULL, FALSE);
789 g_return_val_if_fail(key != NULL, FALSE);
790 g_return_val_if_fail(data != NULL, FALSE);
791
792 kventry_t *entry;
793 if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
794 g_free(entry->data);
795 entry->data = g_memdup(data, len);
796 entry->len = len;
797 entry->version++;
798 } else {
799 kventry_t *entry = g_new0(kventry_t, 1);
800
801 entry->key = g_strdup(key);
802 entry->data = g_memdup(data, len);
803 entry->len = len;
804
805 g_hash_table_replace(kvhash, entry->key, entry);
806 }
807
808 return TRUE;
809 }
810
/*
 * RRD definition used by update_rrd_data() when it creates a file for a
 * "pve2-node/" key.  The array is NULL terminated; the terminator is not
 * counted in the argument count passed to create_rrd_file().
 */
static const char *rrd_def_node[] = {
	"DS:loadavg:GAUGE:120:0:U",
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:iowait:GAUGE:120:0:U",
	"DS:memtotal:GAUGE:120:0:U",
	"DS:memused:GAUGE:120:0:U",
	"DS:swaptotal:GAUGE:120:0:U",
	"DS:swapused:GAUGE:120:0:U",
	"DS:roottotal:GAUGE:120:0:U",
	"DS:rootused:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};
838
/*
 * RRD definition used by update_rrd_data() when it creates a file for a
 * "pve2-vm/" or "pve2.3-vm/" key.  The array is NULL terminated; the
 * terminator is not counted in the argument count passed to
 * create_rrd_file().
 */
static const char *rrd_def_vm[] = {
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:maxmem:GAUGE:120:0:U",
	"DS:mem:GAUGE:120:0:U",
	"DS:maxdisk:GAUGE:120:0:U",
	"DS:disk:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",
	"DS:diskread:DERIVE:120:0:U",
	"DS:diskwrite:DERIVE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};
864
/*
 * RRD definition used by update_rrd_data() when it creates a file for a
 * "pve2-storage/" key.  The array is NULL terminated; the terminator is
 * not counted in the argument count passed to create_rrd_file().
 */
static const char *rrd_def_storage[] = {
	"DS:total:GAUGE:120:0:U",
	"DS:used:GAUGE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};
882
/* Base directory under which update_rrd_data() creates and updates RRD files. */
#define RRDDIR "/var/lib/rrdcached/db"
884
/*
 * Create the RRD file 'filename' from the definition 'rrddef' (argcount
 * entries) with a 60 second step, starting at the beginning of the
 * current local day.
 *
 * localtime_r() is used instead of localtime() so that no static buffer
 * is shared between threads, and its result is checked before use.
 * mktime() replaces the obsolete timelocal().  Failures are logged only;
 * nothing is reported to the caller.
 */
static void
create_rrd_file(
	const char *filename,
	int argcount,
	const char *rrddef[])
{
	/* start at day boundary */
	time_t now = time(NULL);
	struct tm day_start;

	if (localtime_r(&now, &day_start) == NULL) {
		cfs_message("RRD create error %s: %s", filename, "unable to determine local time");
		return;
	}

	day_start.tm_sec = 0;
	day_start.tm_min = 0;
	day_start.tm_hour = 0;

	rrd_clear_error();
	if (rrd_create_r(filename, 60, mktime(&day_start), argcount, rrddef)) {
		cfs_message("RRD create error %s: %s", filename, rrd_get_error());
	}
}
904
/*
 * Skip the first 'count' colon separated fields of 'data'.
 *
 * Returns a pointer just past the count-th ':' or, when the string holds
 * fewer colons, a pointer to its terminating NUL.  With count <= 0 the
 * input pointer is returned unchanged.
 */
static inline const char *
rrd_skip_data(
	const char *data,
	int count)
{
	for (int seen = 0; seen < count && *data != '\0'; data++) {
		if (*data == ':')
			seen++;
	}

	return data;
}
917
918 static void
919 update_rrd_data(
920 const char *key,
921 gconstpointer data,
922 size_t len)
923 {
924 g_return_if_fail(key != NULL);
925 g_return_if_fail(data != NULL);
926 g_return_if_fail(len > 0);
927 g_return_if_fail(len < 4096);
928
929 static const char *rrdcsock = "unix:/var/run/rrdcached.sock";
930
931 int use_daemon = 1;
932 if (rrdc_connect(rrdcsock) != 0)
933 use_daemon = 0;
934
935 char *filename = NULL;
936
937 int skip = 0;
938
939 if (strncmp(key, "pve2-node/", 10) == 0) {
940 const char *node = key + 10;
941
942 skip = 2;
943
944 if (strchr(node, '/') != NULL)
945 goto keyerror;
946
947 if (strlen(node) < 1)
948 goto keyerror;
949
950 filename = g_strdup_printf(RRDDIR "/%s", key);
951
952 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
953
954 mkdir(RRDDIR "/pve2-node", 0755);
955 int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
956 create_rrd_file(filename, argcount, rrd_def_node);
957 }
958
959 } else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
960 (strncmp(key, "pve2.3-vm/", 10) == 0)) {
961 const char *vmid;
962
963 if (strncmp(key, "pve2-vm/", 8) == 0) {
964 vmid = key + 8;
965 skip = 2;
966 } else {
967 vmid = key + 10;
968 skip = 4;
969 }
970
971 if (strchr(vmid, '/') != NULL)
972 goto keyerror;
973
974 if (strlen(vmid) < 1)
975 goto keyerror;
976
977 filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);
978
979 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
980
981 mkdir(RRDDIR "/pve2-vm", 0755);
982 int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
983 create_rrd_file(filename, argcount, rrd_def_vm);
984 }
985
986 } else if (strncmp(key, "pve2-storage/", 13) == 0) {
987 const char *node = key + 13;
988
989 const char *storage = node;
990 while (*storage && *storage != '/')
991 storage++;
992
993 if (*storage != '/' || ((storage - node) < 1))
994 goto keyerror;
995
996 storage++;
997
998 if (strchr(storage, '/') != NULL)
999 goto keyerror;
1000
1001 if (strlen(storage) < 1)
1002 goto keyerror;
1003
1004 filename = g_strdup_printf(RRDDIR "/%s", key);
1005
1006 if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {
1007
1008 mkdir(RRDDIR "/pve2-storage", 0755);
1009
1010 char *dir = g_path_get_dirname(filename);
1011 mkdir(dir, 0755);
1012 g_free(dir);
1013
1014 int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
1015 create_rrd_file(filename, argcount, rrd_def_storage);
1016 }
1017
1018 } else {
1019 goto keyerror;
1020 }
1021
1022 const char *dp = skip ? rrd_skip_data(data, skip) : data;
1023
1024 const char *update_args[] = { dp, NULL };
1025
1026 if (use_daemon) {
1027 int status;
1028 if ((status = rrdc_update(filename, 1, update_args)) != 0) {
1029 cfs_message("RRDC update error %s: %d", filename, status);
1030 rrdc_disconnect();
1031 rrd_clear_error();
1032 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1033 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1034 }
1035 }
1036
1037 } else {
1038 rrd_clear_error();
1039 if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
1040 cfs_message("RRD update error %s: %s", filename, rrd_get_error());
1041 }
1042 }
1043
1044 ret:
1045 if (filename)
1046 g_free(filename);
1047
1048 return;
1049
1050 keyerror:
1051 cfs_critical("RRD update error: unknown/wrong key %s", key);
1052 goto ret;
1053 }
1054
1055 static gboolean
1056 rrd_entry_is_old(
1057 gpointer key,
1058 gpointer value,
1059 gpointer user_data)
1060 {
1061 rrdentry_t *entry = (rrdentry_t *)value;
1062 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1063
1064 int diff = ctime - entry->time;
1065
1066 /* remove everything older than 5 minutes */
1067 int expire = 60*5;
1068
1069 return (diff > expire) ? TRUE : FALSE;
1070 }
1071
1072 static char *rrd_dump_buf = NULL;
1073 static time_t rrd_dump_last = 0;
1074
1075 void
1076 cfs_rrd_dump(GString *str)
1077 {
1078 time_t ctime;
1079 time(&ctime);
1080
1081 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1082 g_string_assign(str, rrd_dump_buf);
1083 return;
1084 }
1085
1086 /* remove old data */
1087 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1088 GUINT_TO_POINTER(ctime));
1089
1090 g_string_set_size(str, 0);
1091
1092 GHashTableIter iter;
1093 gpointer key, value;
1094
1095 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1096
1097 while (g_hash_table_iter_next (&iter, &key, &value)) {
1098 rrdentry_t *entry = (rrdentry_t *)value;
1099 g_string_append(str, key);
1100 g_string_append(str, ":");
1101 g_string_append(str, entry->data);
1102 g_string_append(str, "\n");
1103 }
1104
1105 g_string_append_c(str, 0); // never return undef
1106
1107 rrd_dump_last = ctime;
1108 if (rrd_dump_buf)
1109 g_free(rrd_dump_buf);
1110 rrd_dump_buf = g_strdup(str->str);
1111 }
1112
1113 static gboolean
1114 nodeip_hash_set(
1115 GHashTable *iphash,
1116 const char *nodename,
1117 const char *ip,
1118 size_t len)
1119 {
1120 g_return_val_if_fail(iphash != NULL, FALSE);
1121 g_return_val_if_fail(nodename != NULL, FALSE);
1122 g_return_val_if_fail(ip != NULL, FALSE);
1123 g_return_val_if_fail(len > 0, FALSE);
1124 g_return_val_if_fail(len < 256, FALSE);
1125 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1126
1127 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1128
1129 if (!oldip || (strcmp(oldip, ip) != 0)) {
1130 cfs_status.clinfo_version++;
1131 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1132 }
1133
1134 return TRUE;
1135 }
1136
1137 static gboolean
1138 rrdentry_hash_set(
1139 GHashTable *rrdhash,
1140 const char *key,
1141 const char *data,
1142 size_t len)
1143 {
1144 g_return_val_if_fail(rrdhash != NULL, FALSE);
1145 g_return_val_if_fail(key != NULL, FALSE);
1146 g_return_val_if_fail(data != NULL, FALSE);
1147 g_return_val_if_fail(len > 0, FALSE);
1148 g_return_val_if_fail(len < 4096, FALSE);
1149 g_return_val_if_fail(data[len-1] == 0, FALSE);
1150
1151 rrdentry_t *entry;
1152 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1153 g_free(entry->data);
1154 entry->data = g_memdup(data, len);
1155 entry->len = len;
1156 entry->time = time(NULL);
1157 } else {
1158 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1159
1160 entry->key = g_strdup(key);
1161 entry->data = g_memdup(data, len);
1162 entry->len = len;
1163 entry->time = time(NULL);
1164
1165 g_hash_table_replace(rrdhash, entry->key, entry);
1166 }
1167
1168 update_rrd_data(key, data, len);
1169
1170 return TRUE;
1171 }
1172
1173 static int
1174 kvstore_send_update_message(
1175 dfsm_t *dfsm,
1176 const char *key,
1177 gpointer data,
1178 guint32 len)
1179 {
1180 if (!dfsm_is_initialized(dfsm))
1181 return -EACCES;
1182
1183 struct iovec iov[2];
1184
1185 char name[256];
1186 g_strlcpy(name, key, sizeof(name));
1187
1188 iov[0].iov_base = &name;
1189 iov[0].iov_len = sizeof(name);
1190
1191 iov[1].iov_base = (char *)data;
1192 iov[1].iov_len = len;
1193
1194 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1195 return 0;
1196
1197 return -EACCES;
1198 }
1199
1200 static clog_entry_t *
1201 kvstore_parse_log_message(
1202 const void *msg,
1203 size_t msg_len)
1204 {
1205 g_return_val_if_fail(msg != NULL, NULL);
1206
1207 if (msg_len < sizeof(clog_entry_t)) {
1208 cfs_critical("received short log message (%zu < %zu)", msg_len, sizeof(clog_entry_t));
1209 return NULL;
1210 }
1211
1212 clog_entry_t *entry = (clog_entry_t *)msg;
1213
1214 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1215 entry->ident_len + entry->tag_len + entry->msg_len;
1216
1217 if (msg_len != size) {
1218 cfs_critical("received log message with wrong size (%zu != %u)", msg_len, size);
1219 return NULL;
1220 }
1221
1222 msg = entry->data;
1223
1224 if (*((char *)msg + entry->node_len - 1)) {
1225 cfs_critical("unterminated string in log message");
1226 return NULL;
1227 }
1228 msg += entry->node_len;
1229
1230 if (*((char *)msg + entry->ident_len - 1)) {
1231 cfs_critical("unterminated string in log message");
1232 return NULL;
1233 }
1234 msg += entry->ident_len;
1235
1236 if (*((char *)msg + entry->tag_len - 1)) {
1237 cfs_critical("unterminated string in log message");
1238 return NULL;
1239 }
1240 msg += entry->tag_len;
1241
1242 if (*((char *)msg + entry->msg_len - 1)) {
1243 cfs_critical("unterminated string in log message");
1244 return NULL;
1245 }
1246
1247 return entry;
1248 }
1249
1250 static gboolean
1251 kvstore_parse_update_message(
1252 const void *msg,
1253 size_t msg_len,
1254 const char **key,
1255 gconstpointer *data,
1256 guint32 *len)
1257 {
1258 g_return_val_if_fail(msg != NULL, FALSE);
1259 g_return_val_if_fail(key != NULL, FALSE);
1260 g_return_val_if_fail(data != NULL, FALSE);
1261 g_return_val_if_fail(len != NULL, FALSE);
1262
1263 if (msg_len < 256) {
1264 cfs_critical("received short kvstore message (%zu < 256)", msg_len);
1265 return FALSE;
1266 }
1267
1268 /* test if key is null terminated */
1269 int i = 0;
1270 for (i = 0; i < 256; i++)
1271 if (((char *)msg)[i] == 0)
1272 break;
1273
1274 if (i == 256)
1275 return FALSE;
1276
1277
1278 *len = msg_len - 256;
1279 *key = msg;
1280 *data = msg + 256;
1281
1282 return TRUE;
1283 }
1284
1285 int
1286 cfs_create_status_msg(
1287 GString *str,
1288 const char *nodename,
1289 const char *key)
1290 {
1291 g_return_val_if_fail(str != NULL, -EINVAL);
1292 g_return_val_if_fail(key != NULL, -EINVAL);
1293
1294 int res = -ENOENT;
1295
1296 GHashTable *kvhash = NULL;
1297
1298 g_mutex_lock (&mutex);
1299
1300 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1301 kvhash = cfs_status.kvhash;
1302 } else {
1303 cfs_clnode_t *clnode;
1304 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1305 kvhash = clnode->kvhash;
1306 }
1307
1308 kventry_t *entry;
1309 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1310 g_string_append_len(str, entry->data, entry->len);
1311 res = 0;
1312 }
1313
1314 g_mutex_unlock (&mutex);
1315
1316 return res;
1317 }
1318
1319 int
1320 cfs_status_set(
1321 const char *key,
1322 gpointer data,
1323 size_t len)
1324 {
1325 g_return_val_if_fail(key != NULL, FALSE);
1326 g_return_val_if_fail(data != NULL, FALSE);
1327 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1328
1329 if (len > CFS_MAX_STATUS_SIZE)
1330 return -EFBIG;
1331
1332 g_mutex_lock (&mutex);
1333
1334 gboolean res;
1335
1336 if (strncmp(key, "rrd/", 4) == 0) {
1337 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1338 } else if (!strcmp(key, "nodeip")) {
1339 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1340 } else {
1341 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1342 }
1343 g_mutex_unlock (&mutex);
1344
1345 if (cfs_status.kvstore)
1346 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1347
1348 return res ? 0 : -ENOMEM;
1349 }
1350
1351 gboolean
1352 cfs_kvstore_node_set(
1353 uint32_t nodeid,
1354 const char *key,
1355 gconstpointer data,
1356 size_t len)
1357 {
1358 g_return_val_if_fail(nodeid != 0, FALSE);
1359 g_return_val_if_fail(key != NULL, FALSE);
1360 g_return_val_if_fail(data != NULL, FALSE);
1361
1362 g_mutex_lock (&mutex);
1363
1364 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1365 goto ret; /* ignore */
1366
1367 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1368 if (!clnode)
1369 goto ret; /* ignore */
1370
1371 cfs_debug("got node %d status update %s", nodeid, key);
1372
1373 if (strncmp(key, "rrd/", 4) == 0) {
1374 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1375 } else if (!strcmp(key, "nodeip")) {
1376 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1377 } else {
1378 if (!clnode->kvhash) {
1379 if (!(clnode->kvhash = kventry_hash_new())) {
1380 goto ret; /*ignore */
1381 }
1382 }
1383
1384 kventry_hash_set(clnode->kvhash, key, data, len);
1385
1386 }
1387 ret:
1388 g_mutex_unlock (&mutex);
1389
1390 return TRUE;
1391 }
1392
1393 static gboolean
1394 cfs_kvstore_sync(void)
1395 {
1396 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1397 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1398
1399 gboolean res = TRUE;
1400
1401 g_mutex_lock (&mutex);
1402
1403 GHashTable *ht = cfs_status.kvhash;
1404 GHashTableIter iter;
1405 gpointer key, value;
1406
1407 g_hash_table_iter_init (&iter, ht);
1408
1409 while (g_hash_table_iter_next (&iter, &key, &value)) {
1410 kventry_t *entry = (kventry_t *)value;
1411 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1412 }
1413
1414 g_mutex_unlock (&mutex);
1415
1416 return res;
1417 }
1418
1419 static int
1420 dfsm_deliver(
1421 dfsm_t *dfsm,
1422 gpointer data,
1423 int *res_ptr,
1424 uint32_t nodeid,
1425 uint32_t pid,
1426 uint16_t msg_type,
1427 uint32_t msg_time,
1428 const void *msg,
1429 size_t msg_len)
1430 {
1431 g_return_val_if_fail(dfsm != NULL, -1);
1432 g_return_val_if_fail(msg != NULL, -1);
1433 g_return_val_if_fail(res_ptr != NULL, -1);
1434
1435 /* ignore message for ourself */
1436 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1437 goto ret;
1438
1439 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1440 const char *key;
1441 gconstpointer data;
1442 guint32 len;
1443 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1444 cfs_kvstore_node_set(nodeid, key, data, len);
1445 } else {
1446 cfs_critical("cant parse update message");
1447 }
1448 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1449 cfs_message("received log"); // fixme: remove
1450 const clog_entry_t *entry;
1451 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1452 clusterlog_insert(cfs_status.clusterlog, entry);
1453 } else {
1454 cfs_critical("cant parse log message");
1455 }
1456 } else {
1457 cfs_critical("received unknown message type %d\n", msg_type);
1458 goto fail;
1459 }
1460
1461 ret:
1462 *res_ptr = 0;
1463 return 1;
1464
1465 fail:
1466 *res_ptr = -EACCES;
1467 return 1;
1468 }
1469
1470 static void
1471 dfsm_confchg(
1472 dfsm_t *dfsm,
1473 gpointer data,
1474 const struct cpg_address *member_list,
1475 size_t member_list_entries)
1476 {
1477 g_return_if_fail(dfsm != NULL);
1478 g_return_if_fail(member_list != NULL);
1479
1480 cfs_debug("enter %s", __func__);
1481
1482 g_mutex_lock (&mutex);
1483
1484 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1485
1486 if (clinfo && clinfo->nodes_byid) {
1487
1488 GHashTable *ht = clinfo->nodes_byid;
1489 GHashTableIter iter;
1490 gpointer key, value;
1491
1492 g_hash_table_iter_init (&iter, ht);
1493
1494 while (g_hash_table_iter_next (&iter, &key, &value)) {
1495 cfs_clnode_t *node = (cfs_clnode_t *)value;
1496 node->online = FALSE;
1497 }
1498
1499 for (int i = 0; i < member_list_entries; i++) {
1500 cfs_clnode_t *node;
1501 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1502 node->online = TRUE;
1503 }
1504 }
1505
1506 cfs_status.clinfo_version++;
1507 }
1508
1509 g_mutex_unlock (&mutex);
1510 }
1511
1512 static gpointer
1513 dfsm_get_state(
1514 dfsm_t *dfsm,
1515 gpointer data,
1516 unsigned int *res_len)
1517 {
1518 g_return_val_if_fail(dfsm != NULL, NULL);
1519
1520 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1521
1522 return msg;
1523 }
1524
1525 static int
1526 dfsm_process_update(
1527 dfsm_t *dfsm,
1528 gpointer data,
1529 dfsm_sync_info_t *syncinfo,
1530 uint32_t nodeid,
1531 uint32_t pid,
1532 const void *msg,
1533 size_t msg_len)
1534 {
1535 cfs_critical("%s: received unexpected update message", __func__);
1536
1537 return -1;
1538 }
1539
1540 static int
1541 dfsm_process_state_update(
1542 dfsm_t *dfsm,
1543 gpointer data,
1544 dfsm_sync_info_t *syncinfo)
1545 {
1546 g_return_val_if_fail(dfsm != NULL, -1);
1547 g_return_val_if_fail(syncinfo != NULL, -1);
1548
1549 clog_base_t *clog[syncinfo->node_count];
1550
1551 int local_index = -1;
1552 for (int i = 0; i < syncinfo->node_count; i++) {
1553 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1554 ni->synced = 1;
1555
1556 if (syncinfo->local == ni)
1557 local_index = i;
1558
1559 clog_base_t *base = (clog_base_t *)ni->state;
1560 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1561 clog[i] = ni->state;
1562 } else {
1563 cfs_critical("received log with wrong size %u", ni->state_len);
1564 clog[i] = NULL;
1565 }
1566 }
1567
1568 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1569 cfs_critical("unable to merge log files");
1570 }
1571
1572 cfs_kvstore_sync();
1573
1574 return 1;
1575 }
1576
1577 static int
1578 dfsm_commit(
1579 dfsm_t *dfsm,
1580 gpointer data,
1581 dfsm_sync_info_t *syncinfo)
1582 {
1583 g_return_val_if_fail(dfsm != NULL, -1);
1584 g_return_val_if_fail(syncinfo != NULL, -1);
1585
1586 return 1;
1587 }
1588
1589 static void
1590 dfsm_synced(dfsm_t *dfsm)
1591 {
1592 g_return_if_fail(dfsm != NULL);
1593
1594 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1595 if (!ip)
1596 ip = cfs.ip;
1597
1598 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1599 }
1600
1601 static int
1602 dfsm_cleanup(
1603 dfsm_t *dfsm,
1604 gpointer data,
1605 dfsm_sync_info_t *syncinfo)
1606 {
1607 return 1;
1608 }
1609
1610 static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1611 .dfsm_deliver_fn = dfsm_deliver,
1612 .dfsm_confchg_fn = dfsm_confchg,
1613
1614 .dfsm_get_state_fn = dfsm_get_state,
1615 .dfsm_process_state_update_fn = dfsm_process_state_update,
1616 .dfsm_process_update_fn = dfsm_process_update,
1617 .dfsm_commit_fn = dfsm_commit,
1618 .dfsm_cleanup_fn = dfsm_cleanup,
1619 .dfsm_synced_fn = dfsm_synced,
1620 };
1621
1622 dfsm_t *
1623 cfs_status_dfsm_new(void)
1624 {
1625 g_mutex_lock (&mutex);
1626
1627 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1628 0, &kvstore_dfsm_callbacks);
1629 g_mutex_unlock (&mutex);
1630
1631 return cfs_status.kvstore;
1632 }
1633
1634 gboolean
1635 cfs_is_quorate(void)
1636 {
1637 g_mutex_lock (&mutex);
1638 gboolean res = cfs_status.quorate;
1639 g_mutex_unlock (&mutex);
1640
1641 return res;
1642 }
1643
1644 void
1645 cfs_set_quorate(
1646 uint32_t quorate,
1647 gboolean quiet)
1648 {
1649 g_mutex_lock (&mutex);
1650
1651 uint32_t prev_quorate = cfs_status.quorate;
1652 cfs_status.quorate = quorate;
1653
1654 if (!prev_quorate && cfs_status.quorate) {
1655 if (!quiet)
1656 cfs_message("node has quorum");
1657 }
1658
1659 if (prev_quorate && !cfs_status.quorate) {
1660 if (!quiet)
1661 cfs_message("node lost quorum");
1662 }
1663
1664 g_mutex_unlock (&mutex);
1665 }
1666