]> git.proxmox.com Git - pve-cluster.git/blob - data/src/status.c
082053340869ce3d7fd4a799453b8abf465d9636
[pve-cluster.git] / data / src / status.c
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "status"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdio.h>
28 #include <stdint.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <glib.h>
32 #include <sys/syslog.h>
33 #include <rrd.h>
34 #include <rrd_client.h>
35 #include <time.h>
36 #include <ctype.h>
37
38 #include "cfs-utils.h"
39 #include "status.h"
40 #include "memdb.h"
41 #include "logger.h"
42
/* Group name for the kvstore. It is not referenced in this part of the file;
 * presumably the corosync CPG group joined by the kvstore dfsm - confirm at
 * the place where the dfsm is created. */
#define KVSTORE_CPG_GROUP_NAME "pve_kvstore_v1"

/* Message type identifiers for the kvstore dfsm. */
typedef enum {
	KVSTORE_MESSAGE_UPDATE = 1,
	KVSTORE_MESSAGE_UPDATE_COMPLETE = 2,
	KVSTORE_MESSAGE_LOG = 3, /* sent by cfs_cluster_log() with a clog_entry_t payload */
} kvstore_message_t;
50
/* Pre-incremented by vmlist_hash_insert_vm() for every vminfo_t it creates;
 * the resulting value is stored as that entry's version. */
static uint32_t vminfo_version_counter;
52
/* One guest known to the VM list. */
typedef struct {
	uint32_t vmid;     /* guest ID; its address is used as the vmlist hash key */
	char *nodename;    /* owned copy (g_strdup), released by vminfo_free() */
	int vmtype;        /* one of VMTYPE_QEMU, VMTYPE_OPENVZ, VMTYPE_LXC */
	uint32_t version;  /* value of vminfo_version_counter when the entry was created */
} vminfo_t;
59
/* One key/value store entry; key and data are owned and freed by kventry_free(). */
typedef struct {
	char *key;         /* owned copy of the key, also used as the hash table key */
	gpointer data;     /* owned copy of the value bytes */
	size_t len;        /* number of bytes in data */
	uint32_t version;  /* incremented by kventry_hash_set() on every overwrite */
} kventry_t;
66
/* One cached RRD data record; key and data are freed by rrdentry_free(). */
typedef struct {
	char *key;
	gpointer data;  /* appended as a C string by cfs_rrd_dump() */
	size_t len;
	uint32_t time;  /* compared against the current time in rrd_entry_is_old() */
} rrdentry_t;
73
/* Change counter for one tracked memdb path. */
typedef struct {
	char *path;        /* path string, used as key in cfs_status.memdb_changes */
	uint32_t version;  /* bumped by record_memdb_change() and record_memdb_reload() */
} memdb_change_t;
78
/* Paths whose change counters are tracked. cfs_status_init() indexes these
 * entries by path in cfs_status.memdb_changes and cfs_create_version_msg()
 * reports every path together with its version. All versions start at 0. */
static memdb_change_t memdb_change_array[] = {
	{ .path = "corosync.conf" },
	{ .path = "corosync.conf.new" },
	{ .path = "storage.cfg" },
	{ .path = "user.cfg" },
	{ .path = "domains.cfg" },
	{ .path = "priv/shadow.cfg" },
	{ .path = "priv/acme/plugins.cfg" },
	{ .path = "priv/tfa.cfg" },
	{ .path = "priv/token.cfg" },
	{ .path = "datacenter.cfg" },
	{ .path = "vzdump.cron" },
	{ .path = "ha/crm_commands" },
	{ .path = "ha/manager_status" },
	{ .path = "ha/resources.cfg" },
	{ .path = "ha/groups.cfg" },
	{ .path = "ha/fence.cfg" },
	{ .path = "status.cfg" },
	{ .path = "replication.cfg" },
	{ .path = "ceph.conf" },
	{ .path = "sdn/vnets.cfg" },
	{ .path = "sdn/vnets.cfg.new" },
	{ .path = "sdn/zones.cfg" },
	{ .path = "sdn/zones.cfg.new" },
	{ .path = "sdn/controllers.cfg" },
	{ .path = "sdn/controllers.cfg.new" },
	{ .path = "virtual-guest/cpu-models.conf" },
};
107
/* Lock taken by most functions in this file around accesses to cfs_status. */
static GMutex mutex;

/* Aggregated status state of this module (single instance: cfs_status). */
typedef struct {
	time_t start_time;        /* set in cfs_status_init(), reported as "starttime" */

	uint32_t quorate;         /* reported in the member list message */

	cfs_clinfo_t *clinfo;     /* current cluster info, replaced by cfs_status_set_clinfo() */
	uint32_t clinfo_version;  /* bumped whenever clinfo is replaced or cleaned up */

	GHashTable *vmlist;       /* vmid -> vminfo_t, see vmlist_hash_new() */
	uint32_t vmlist_version;  /* bumped on every VM list modification */

	dfsm_t *kvstore;          /* used by cfs_cluster_log() to send log entries */
	GHashTable *kvhash;       /* key -> kventry_t, dumped under cfs.nodename */
	GHashTable *rrdhash;      /* key -> rrdentry_t, dumped by cfs_rrd_dump() */
	GHashTable *iphash;       /* node name -> IP string, both owned by the table */

	GHashTable *memdb_changes; /* path -> entry of memdb_change_array */

	clusterlog_t *clusterlog;
} cfs_status_t;

static cfs_status_t cfs_status;
132
/* One cluster node as stored inside a cfs_clinfo. */
struct cfs_clnode {
	char *name;          /* owned copy, freed by cfs_clnode_destroy() */
	uint32_t nodeid;     /* its address is the key in cfs_clinfo.nodes_byid */
	uint32_t votes;
	gboolean online;     /* carried over from the old node in cfs_status_set_clinfo() */
	GHashTable *kvhash;  /* per-node kvstore, may be NULL; destroyed with the node */
};
140
/* Cluster description: name, version and the node tables. */
struct cfs_clinfo {
	char *cluster_name;        /* owned copy */
	uint32_t cman_version;     /* reported as the cluster "version" */

	GHashTable *nodes_byid;    /* nodeid -> node; owns the nodes (destroy notify) */
	GHashTable *nodes_byname;  /* node name -> node; no destroy functions */
};
148
149 static guint
150 g_int32_hash (gconstpointer v)
151 {
152 return *(const uint32_t *) v;
153 }
154
155 static gboolean
156 g_int32_equal (gconstpointer v1,
157 gconstpointer v2)
158 {
159 return *((const uint32_t*) v1) == *((const uint32_t*) v2);
160 }
161
162 static void vminfo_free(vminfo_t *vminfo)
163 {
164 g_return_if_fail(vminfo != NULL);
165
166 if (vminfo->nodename)
167 g_free(vminfo->nodename);
168
169
170 g_free(vminfo);
171 }
172
173 static const char *vminfo_type_to_string(vminfo_t *vminfo)
174 {
175 if (vminfo->vmtype == VMTYPE_QEMU) {
176 return "qemu";
177 } else if (vminfo->vmtype == VMTYPE_OPENVZ) {
178 return "openvz";
179 } else if (vminfo->vmtype == VMTYPE_LXC) {
180 return "lxc";
181 } else {
182 return "unknown";
183 }
184 }
185
186 static const char *vminfo_type_to_path_type(vminfo_t *vminfo)
187 {
188 if (vminfo->vmtype == VMTYPE_QEMU) {
189 return "qemu-server"; // special case..
190 } else {
191 return vminfo_type_to_string(vminfo);
192 }
193 }
194
195 int vminfo_to_path(vminfo_t *vminfo, GString *path)
196 {
197 g_return_val_if_fail(vminfo != NULL, -1);
198 g_return_val_if_fail(path != NULL, -1);
199
200 if (!vminfo->nodename)
201 return 0;
202
203 const char *type = vminfo_type_to_path_type(vminfo);
204 g_string_printf(path, "/nodes/%s/%s/%u.conf", vminfo->nodename, type, vminfo->vmid);
205
206 return 1;
207 }
208
209 void cfs_clnode_destroy(
210 cfs_clnode_t *clnode)
211 {
212 g_return_if_fail(clnode != NULL);
213
214 if (clnode->kvhash)
215 g_hash_table_destroy(clnode->kvhash);
216
217 if (clnode->name)
218 g_free(clnode->name);
219
220 g_free(clnode);
221 }
222
223 cfs_clnode_t *cfs_clnode_new(
224 const char *name,
225 uint32_t nodeid,
226 uint32_t votes)
227 {
228 g_return_val_if_fail(name != NULL, NULL);
229
230 cfs_clnode_t *clnode = g_new0(cfs_clnode_t, 1);
231 if (!clnode)
232 return NULL;
233
234 clnode->name = g_strdup(name);
235 clnode->nodeid = nodeid;
236 clnode->votes = votes;
237
238 return clnode;
239 }
240
241 gboolean cfs_clinfo_destroy(
242 cfs_clinfo_t *clinfo)
243 {
244 g_return_val_if_fail(clinfo != NULL, FALSE);
245
246 if (clinfo->cluster_name)
247 g_free(clinfo->cluster_name);
248
249 if (clinfo->nodes_byname)
250 g_hash_table_destroy(clinfo->nodes_byname);
251
252 if (clinfo->nodes_byid)
253 g_hash_table_destroy(clinfo->nodes_byid);
254
255 g_free(clinfo);
256
257 return TRUE;
258 }
259
260 cfs_clinfo_t *cfs_clinfo_new(
261 const char *cluster_name,
262 uint32_t cman_version)
263 {
264 g_return_val_if_fail(cluster_name != NULL, NULL);
265
266 cfs_clinfo_t *clinfo = g_new0(cfs_clinfo_t, 1);
267 if (!clinfo)
268 return NULL;
269
270 clinfo->cluster_name = g_strdup(cluster_name);
271 clinfo->cman_version = cman_version;
272
273 if (!(clinfo->nodes_byid = g_hash_table_new_full(
274 g_int32_hash, g_int32_equal, NULL,
275 (GDestroyNotify)cfs_clnode_destroy)))
276 goto fail;
277
278 if (!(clinfo->nodes_byname = g_hash_table_new(g_str_hash, g_str_equal)))
279 goto fail;
280
281 return clinfo;
282
283 fail:
284 cfs_clinfo_destroy(clinfo);
285
286 return NULL;
287 }
288
289 gboolean cfs_clinfo_add_node(
290 cfs_clinfo_t *clinfo,
291 cfs_clnode_t *clnode)
292 {
293 g_return_val_if_fail(clinfo != NULL, FALSE);
294 g_return_val_if_fail(clnode != NULL, FALSE);
295
296 g_hash_table_replace(clinfo->nodes_byid, &clnode->nodeid, clnode);
297 g_hash_table_replace(clinfo->nodes_byname, clnode->name, clnode);
298
299 return TRUE;
300 }
301
302 int
303 cfs_create_memberlist_msg(
304 GString *str)
305 {
306 g_return_val_if_fail(str != NULL, -EINVAL);
307
308 g_mutex_lock (&mutex);
309
310 g_string_append_printf(str,"{\n");
311
312 guint nodecount = 0;
313
314 cfs_clinfo_t *clinfo = cfs_status.clinfo;
315
316 if (clinfo && clinfo->nodes_byid)
317 nodecount = g_hash_table_size(clinfo->nodes_byid);
318
319 if (nodecount) {
320 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
321 g_string_append_printf(str, "\"version\": %u,\n", cfs_status.clinfo_version);
322
323 g_string_append_printf(str, "\"cluster\": { ");
324 g_string_append_printf(str, "\"name\": \"%s\", \"version\": %d, "
325 "\"nodes\": %d, \"quorate\": %d ",
326 clinfo->cluster_name, clinfo->cman_version,
327 nodecount, cfs_status.quorate);
328
329 g_string_append_printf(str,"},\n");
330 g_string_append_printf(str,"\"nodelist\": {\n");
331
332 GHashTable *ht = clinfo->nodes_byid;
333 GHashTableIter iter;
334 gpointer key, value;
335
336 g_hash_table_iter_init (&iter, ht);
337
338 int i = 0;
339 while (g_hash_table_iter_next (&iter, &key, &value)) {
340 cfs_clnode_t *node = (cfs_clnode_t *)value;
341 if (i) g_string_append_printf(str, ",\n");
342 i++;
343
344 g_string_append_printf(str, " \"%s\": { \"id\": %d, \"online\": %d",
345 node->name, node->nodeid, node->online);
346
347
348 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, node->name);
349 if (ip) {
350 g_string_append_printf(str, ", \"ip\": \"%s\"", ip);
351 }
352
353 g_string_append_printf(str, "}");
354
355 }
356 g_string_append_printf(str,"\n }\n");
357 } else {
358 g_string_append_printf(str, "\"nodename\": \"%s\",\n", cfs.nodename);
359 g_string_append_printf(str, "\"version\": %u\n", cfs_status.clinfo_version);
360 }
361
362 g_string_append_printf(str,"}\n");
363
364 g_mutex_unlock (&mutex);
365
366 return 0;
367 }
368
369 static void
370 kventry_free(kventry_t *entry)
371 {
372 g_return_if_fail(entry != NULL);
373
374 g_free(entry->key);
375 g_free(entry->data);
376 g_free(entry);
377 }
378
379 static GHashTable *
380 kventry_hash_new(void)
381 {
382 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
383 (GDestroyNotify)kventry_free);
384 }
385
386 static void
387 rrdentry_free(rrdentry_t *entry)
388 {
389 g_return_if_fail(entry != NULL);
390
391 g_free(entry->key);
392 g_free(entry->data);
393 g_free(entry);
394 }
395
396 static GHashTable *
397 rrdentry_hash_new(void)
398 {
399 return g_hash_table_new_full(g_str_hash, g_str_equal, NULL,
400 (GDestroyNotify)rrdentry_free);
401 }
402
403 void
404 cfs_cluster_log_dump(GString *str, const char *user, guint max_entries)
405 {
406 clusterlog_dump(cfs_status.clusterlog, str, user, max_entries);
407 }
408
409 void
410 cfs_cluster_log(clog_entry_t *entry)
411 {
412 g_return_if_fail(entry != NULL);
413
414 clusterlog_insert(cfs_status.clusterlog, entry);
415
416 if (cfs_status.kvstore) {
417 struct iovec iov[1];
418 iov[0].iov_base = (char *)entry;
419 iov[0].iov_len = clog_entry_size(entry);
420
421 if (dfsm_is_initialized(cfs_status.kvstore))
422 dfsm_send_message(cfs_status.kvstore, KVSTORE_MESSAGE_LOG, iov, 1);
423 }
424 }
425
426 void cfs_status_init(void)
427 {
428 g_mutex_lock (&mutex);
429
430 cfs_status.start_time = time(NULL);
431
432 cfs_status.vmlist = vmlist_hash_new();
433
434 cfs_status.kvhash = kventry_hash_new();
435
436 cfs_status.rrdhash = rrdentry_hash_new();
437
438 cfs_status.iphash = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
439
440 cfs_status.memdb_changes = g_hash_table_new(g_str_hash, g_str_equal);
441
442 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
443 g_hash_table_replace(cfs_status.memdb_changes,
444 memdb_change_array[i].path,
445 &memdb_change_array[i]);
446 }
447
448 cfs_status.clusterlog = clusterlog_new();
449
450 // fixme:
451 clusterlog_add(cfs_status.clusterlog, "root", "cluster", getpid(),
452 LOG_INFO, "starting cluster log");
453
454 g_mutex_unlock (&mutex);
455 }
456
457 void cfs_status_cleanup(void)
458 {
459 g_mutex_lock (&mutex);
460
461 cfs_status.clinfo_version++;
462
463 if (cfs_status.clinfo) {
464 cfs_clinfo_destroy(cfs_status.clinfo);
465 cfs_status.clinfo = NULL;
466 }
467
468 if (cfs_status.vmlist) {
469 g_hash_table_destroy(cfs_status.vmlist);
470 cfs_status.vmlist = NULL;
471 }
472
473 if (cfs_status.kvhash) {
474 g_hash_table_destroy(cfs_status.kvhash);
475 cfs_status.kvhash = NULL;
476 }
477
478 if (cfs_status.rrdhash) {
479 g_hash_table_destroy(cfs_status.rrdhash);
480 cfs_status.rrdhash = NULL;
481 }
482
483 if (cfs_status.iphash) {
484 g_hash_table_destroy(cfs_status.iphash);
485 cfs_status.iphash = NULL;
486 }
487
488 if (cfs_status.clusterlog)
489 clusterlog_destroy(cfs_status.clusterlog);
490
491 g_mutex_unlock (&mutex);
492 }
493
494 void cfs_status_set_clinfo(
495 cfs_clinfo_t *clinfo)
496 {
497 g_return_if_fail(clinfo != NULL);
498
499 g_mutex_lock (&mutex);
500
501 cfs_status.clinfo_version++;
502
503 cfs_clinfo_t *old = cfs_status.clinfo;
504
505 cfs_status.clinfo = clinfo;
506
507 cfs_message("update cluster info (cluster name %s, version = %d)",
508 clinfo->cluster_name, clinfo->cman_version);
509
510
511 if (old && old->nodes_byid && clinfo->nodes_byid) {
512 /* copy kvstore */
513 GHashTable *ht = clinfo->nodes_byid;
514 GHashTableIter iter;
515 gpointer key, value;
516
517 g_hash_table_iter_init (&iter, ht);
518
519 while (g_hash_table_iter_next (&iter, &key, &value)) {
520 cfs_clnode_t *node = (cfs_clnode_t *)value;
521 cfs_clnode_t *oldnode;
522 if ((oldnode = g_hash_table_lookup(old->nodes_byid, key))) {
523 node->online = oldnode->online;
524 node->kvhash = oldnode->kvhash;
525 oldnode->kvhash = NULL;
526 }
527 }
528
529 }
530
531 if (old)
532 cfs_clinfo_destroy(old);
533
534
535 g_mutex_unlock (&mutex);
536 }
537
538 static void
539 dump_kvstore_versions(
540 GString *str,
541 GHashTable *kvhash,
542 const char *nodename)
543 {
544 g_return_if_fail(kvhash != NULL);
545 g_return_if_fail(str != NULL);
546 g_return_if_fail(nodename != NULL);
547
548 GHashTable *ht = kvhash;
549 GHashTableIter iter;
550 gpointer key, value;
551
552 g_string_append_printf(str, "\"%s\": {\n", nodename);
553
554 g_hash_table_iter_init (&iter, ht);
555
556 int i = 0;
557 while (g_hash_table_iter_next (&iter, &key, &value)) {
558 kventry_t *entry = (kventry_t *)value;
559 if (i) g_string_append_printf(str, ",\n");
560 i++;
561 g_string_append_printf(str,"\"%s\": %u", entry->key, entry->version);
562 }
563
564 g_string_append_printf(str, "}\n");
565 }
566
567 int
568 cfs_create_version_msg(GString *str)
569 {
570 g_return_val_if_fail(str != NULL, -EINVAL);
571
572 g_mutex_lock (&mutex);
573
574 g_string_append_printf(str,"{\n");
575
576 g_string_append_printf(str, "\"starttime\": %lu,\n", (unsigned long)cfs_status.start_time);
577
578 g_string_append_printf(str, "\"clinfo\": %u,\n", cfs_status.clinfo_version);
579
580 g_string_append_printf(str, "\"vmlist\": %u,\n", cfs_status.vmlist_version);
581
582 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
583 g_string_append_printf(str, "\"%s\": %u,\n",
584 memdb_change_array[i].path,
585 memdb_change_array[i].version);
586 }
587
588 g_string_append_printf(str, "\"kvstore\": {\n");
589
590 dump_kvstore_versions(str, cfs_status.kvhash, cfs.nodename);
591
592 cfs_clinfo_t *clinfo = cfs_status.clinfo;
593
594 if (clinfo && clinfo->nodes_byid) {
595 GHashTable *ht = clinfo->nodes_byid;
596 GHashTableIter iter;
597 gpointer key, value;
598
599 g_hash_table_iter_init (&iter, ht);
600
601 while (g_hash_table_iter_next (&iter, &key, &value)) {
602 cfs_clnode_t *node = (cfs_clnode_t *)value;
603 if (!node->kvhash)
604 continue;
605 g_string_append_printf(str, ",\n");
606 dump_kvstore_versions(str, node->kvhash, node->name);
607 }
608 }
609
610 g_string_append_printf(str,"}\n");
611
612 g_string_append_printf(str,"}\n");
613
614 g_mutex_unlock (&mutex);
615
616 return 0;
617 }
618
619 GHashTable *
620 vmlist_hash_new(void)
621 {
622 return g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
623 (GDestroyNotify)vminfo_free);
624 }
625
626 gboolean
627 vmlist_hash_insert_vm(
628 GHashTable *vmlist,
629 int vmtype,
630 guint32 vmid,
631 const char *nodename,
632 gboolean replace)
633 {
634 g_return_val_if_fail(vmlist != NULL, FALSE);
635 g_return_val_if_fail(nodename != NULL, FALSE);
636 g_return_val_if_fail(vmid != 0, FALSE);
637 g_return_val_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
638 vmtype == VMTYPE_LXC, FALSE);
639
640 if (!replace && g_hash_table_lookup(vmlist, &vmid)) {
641 cfs_critical("detected duplicate VMID %d", vmid);
642 return FALSE;
643 }
644
645 vminfo_t *vminfo = g_new0(vminfo_t, 1);
646
647 vminfo->vmid = vmid;
648 vminfo->vmtype = vmtype;
649 vminfo->nodename = g_strdup(nodename);
650
651 vminfo->version = ++vminfo_version_counter;
652
653 g_hash_table_replace(vmlist, &vminfo->vmid, vminfo);
654
655 return TRUE;
656 }
657
658 void
659 vmlist_register_vm(
660 int vmtype,
661 guint32 vmid,
662 const char *nodename)
663 {
664 g_return_if_fail(cfs_status.vmlist != NULL);
665 g_return_if_fail(nodename != NULL);
666 g_return_if_fail(vmid != 0);
667 g_return_if_fail(vmtype == VMTYPE_QEMU || vmtype == VMTYPE_OPENVZ ||
668 vmtype == VMTYPE_LXC);
669
670 cfs_debug("vmlist_register_vm: %s/%u %d", nodename, vmid, vmtype);
671
672 g_mutex_lock (&mutex);
673
674 cfs_status.vmlist_version++;
675
676 vmlist_hash_insert_vm(cfs_status.vmlist, vmtype, vmid, nodename, TRUE);
677
678 g_mutex_unlock (&mutex);
679 }
680
681 gboolean
682 vmlist_different_vm_exists(
683 int vmtype,
684 guint32 vmid,
685 const char *nodename)
686 {
687 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
688 g_return_val_if_fail(vmid != 0, FALSE);
689
690 gboolean res = FALSE;
691
692 g_mutex_lock (&mutex);
693
694 vminfo_t *vminfo;
695 if ((vminfo = (vminfo_t *)g_hash_table_lookup(cfs_status.vmlist, &vmid))) {
696 if (!(vminfo->vmtype == vmtype && strcmp(vminfo->nodename, nodename) == 0))
697 res = TRUE;
698 }
699 g_mutex_unlock (&mutex);
700
701 return res;
702 }
703
704 gboolean
705 vmlist_vm_exists(
706 guint32 vmid)
707 {
708 g_return_val_if_fail(cfs_status.vmlist != NULL, FALSE);
709 g_return_val_if_fail(vmid != 0, FALSE);
710
711 g_mutex_lock (&mutex);
712
713 gpointer res = g_hash_table_lookup(cfs_status.vmlist, &vmid);
714
715 g_mutex_unlock (&mutex);
716
717 return res != NULL;
718 }
719
720 void
721 vmlist_delete_vm(
722 guint32 vmid)
723 {
724 g_return_if_fail(cfs_status.vmlist != NULL);
725 g_return_if_fail(vmid != 0);
726
727 g_mutex_lock (&mutex);
728
729 cfs_status.vmlist_version++;
730
731 g_hash_table_remove(cfs_status.vmlist, &vmid);
732
733 g_mutex_unlock (&mutex);
734 }
735
736 void cfs_status_set_vmlist(
737 GHashTable *vmlist)
738 {
739 g_return_if_fail(vmlist != NULL);
740
741 g_mutex_lock (&mutex);
742
743 cfs_status.vmlist_version++;
744
745 if (cfs_status.vmlist)
746 g_hash_table_destroy(cfs_status.vmlist);
747
748 cfs_status.vmlist = vmlist;
749
750 g_mutex_unlock (&mutex);
751 }
752
753 int
754 cfs_create_vmlist_msg(GString *str)
755 {
756 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
757 g_return_val_if_fail(str != NULL, -EINVAL);
758
759 g_mutex_lock (&mutex);
760
761 g_string_append_printf(str,"{\n");
762
763 GHashTable *ht = cfs_status.vmlist;
764
765 guint count = g_hash_table_size(ht);
766
767 if (!count) {
768 g_string_append_printf(str,"\"version\": %u\n", cfs_status.vmlist_version);
769 } else {
770 g_string_append_printf(str,"\"version\": %u,\n", cfs_status.vmlist_version);
771
772 g_string_append_printf(str,"\"ids\": {\n");
773
774 GHashTableIter iter;
775 gpointer key, value;
776
777 g_hash_table_iter_init (&iter, ht);
778
779 int first = 1;
780 while (g_hash_table_iter_next (&iter, &key, &value)) {
781 vminfo_t *vminfo = (vminfo_t *)value;
782 const char *type = vminfo_type_to_string(vminfo);
783
784 if (!first)
785 g_string_append_printf(str, ",\n");
786 first = 0;
787
788 g_string_append_printf(str,"\"%u\": { \"node\": \"%s\", \"type\": \"%s\", \"version\": %u }",
789 vminfo->vmid, vminfo->nodename, type, vminfo->version);
790 }
791
792 g_string_append_printf(str,"}\n");
793 }
794 g_string_append_printf(str,"\n}\n");
795
796 g_mutex_unlock (&mutex);
797
798 return 0;
799 }
800
// Checks the conf for a line starting with '$prop:' and returns the value
// following it, without leading and trailing whitespace. We only deal with
// the format restrictions imposed by our Perl VM config parser (the main
// reference is PVE::QemuServer::parse_vm_config), which allows this to be
// very fast and still relatively simple.
// The main restriction used to our advantage is the property match regex:
// ($line =~ m/^([a-z][a-z_]*\d*):\s*(.+?)\s*$/) from parse_vm_config
// Currently we only look at the current configuration in place, i.e., *no*
// snapshots and *no* pending changes: the search stops at the first line
// starting with '['.
//
// conf:      config buffer of conf_size bytes; it is MODIFIED in place (the
//            newline of every scanned line and the end of the returned value
//            are overwritten with '\0')
// prop:      property name to look for, prop_len bytes long
//
// Returns a pointer into 'conf' to the NUL-terminated value, or NULL if the
// property is not found, its value is empty, or the arguments are invalid.
// Only lines terminated by '\n' are considered.
static char *
_get_property_value(char *conf, int conf_size, const char *prop, int prop_len)
{
	if (conf == NULL || prop == NULL || conf_size <= 0 || prop_len <= 0) {
		return NULL;
	}

	const size_t key_len = (size_t)prop_len;
	const char *const conf_end = conf + conf_size;
	char *line = conf;

	for (;;) {
		size_t remaining = (size_t)(conf_end - line);

		// a matching line needs at least the key, the ':' and a newline
		if (remaining <= key_len) {
			return NULL;
		}

		char *eol = memchr(line, '\n', remaining);
		if (eol == NULL) {
			return NULL; // valid property lines end with \n, but none left
		}
		*eol = '\0';

		// snapshot or pending section start, but nothing found yet -> not found
		if (line[0] == '[') {
			return NULL;
		}

		// properties start with /^[a-z]/, everything else is skipped
		if (line[0] >= 'a' && line[0] <= 'z') {
			size_t line_len = strlen(line);

			if (line_len > key_len + 1 && line[key_len] == ':' &&
			    memcmp(line, prop, key_len) == 0) { // found
				char *v_start = line + key_len + 1;

				// drop leading whitespace; the cast keeps isspace() defined
				// for bytes >= 0x80 on platforms where char is signed
				while (*v_start && isspace((unsigned char)*v_start)) {
					v_start++;
				}

				if (!*v_start) {
					return NULL;
				}

				// drop trailing whitespace
				char *v_end = line + line_len - 1;
				while (v_end > v_start && isspace((unsigned char)*v_end)) {
					v_end--;
				}
				v_end[1] = '\0';

				return v_start;
			}
		}

		line = eol + 1;
	}
}
863
864 static void
865 _g_str_append_kv_jsonescaped(GString *str, const char *k, const char *v)
866 {
867 g_string_append_printf(str, "\"%s\": \"", k);
868
869 for (; *v; v++) {
870 if (*v == '\\' || *v == '"') {
871 g_string_append_c(str, '\\');
872 }
873 g_string_append_c(str, *v);
874 }
875
876 g_string_append_c(str, '"');
877 }
878
879 int
880 cfs_create_guest_conf_property_msg(GString *str, memdb_t *memdb, const char *prop, uint32_t vmid)
881 {
882 g_return_val_if_fail(cfs_status.vmlist != NULL, -EINVAL);
883 g_return_val_if_fail(str != NULL, -EINVAL);
884
885 int prop_len = strlen(prop);
886 int res = 0;
887 GString *path = NULL;
888
889 // Prelock &memdb->mutex in order to enable the usage of memdb_read_nolock
890 // to prevent Deadlocks as in #2553
891 g_mutex_lock (&memdb->mutex);
892 g_mutex_lock (&mutex);
893
894 g_string_printf(str,"{\n");
895
896 GHashTable *ht = cfs_status.vmlist;
897 gpointer tmp = NULL;
898 if (!g_hash_table_size(ht)) {
899 goto ret;
900 }
901
902 path = g_string_sized_new(256);
903 if (vmid >= 100) {
904 vminfo_t *vminfo = (vminfo_t *) g_hash_table_lookup(cfs_status.vmlist, &vmid);
905 if (vminfo == NULL) goto enoent;
906
907 if (!vminfo_to_path(vminfo, path)) goto err;
908
909 // use memdb_read_nolock because lock is handled here
910 int size = memdb_read_nolock(memdb, path->str, &tmp);
911 if (tmp == NULL) goto err;
912 if (size <= prop_len) goto ret;
913
914 char *val = _get_property_value(tmp, size, prop, prop_len);
915 if (val == NULL) goto ret;
916
917 g_string_append_printf(str, "\"%u\":{", vmid);
918 _g_str_append_kv_jsonescaped(str, prop, val);
919 g_string_append_c(str, '}');
920
921 } else {
922 GHashTableIter iter;
923 g_hash_table_iter_init (&iter, ht);
924
925 gpointer key, value;
926 int first = 1;
927 while (g_hash_table_iter_next (&iter, &key, &value)) {
928 vminfo_t *vminfo = (vminfo_t *)value;
929
930 if (!vminfo_to_path(vminfo, path)) goto err;
931
932 g_free(tmp); // no-op if already null
933 tmp = NULL;
934 // use memdb_read_nolock because lock is handled here
935 int size = memdb_read_nolock(memdb, path->str, &tmp);
936 if (tmp == NULL || size <= prop_len) continue;
937
938 char *val = _get_property_value(tmp, size, prop, prop_len);
939 if (val == NULL) continue;
940
941 if (!first) g_string_append_printf(str, ",\n");
942 else first = 0;
943
944 g_string_append_printf(str, "\"%u\":{", vminfo->vmid);
945 _g_str_append_kv_jsonescaped(str, prop, val);
946 g_string_append_c(str, '}');
947 }
948 }
949 ret:
950 g_free(tmp);
951 if (path != NULL) {
952 g_string_free(path, TRUE);
953 }
954 g_string_append_printf(str,"\n}\n");
955 g_mutex_unlock (&mutex);
956 g_mutex_unlock (&memdb->mutex);
957 return res;
958 err:
959 res = -EIO;
960 goto ret;
961 enoent:
962 res = -ENOENT;
963 goto ret;
964 }
965
966 void
967 record_memdb_change(const char *path)
968 {
969 g_return_if_fail(cfs_status.memdb_changes != 0);
970
971 memdb_change_t *ce;
972
973 if ((ce = (memdb_change_t *)g_hash_table_lookup(cfs_status.memdb_changes, path))) {
974 ce->version++;
975 }
976 }
977
978 void
979 record_memdb_reload(void)
980 {
981 for (int i = 0; i < G_N_ELEMENTS(memdb_change_array); i++) {
982 memdb_change_array[i].version++;
983 }
984 }
985
986 static gboolean
987 kventry_hash_set(
988 GHashTable *kvhash,
989 const char *key,
990 gconstpointer data,
991 size_t len)
992 {
993 g_return_val_if_fail(kvhash != NULL, FALSE);
994 g_return_val_if_fail(key != NULL, FALSE);
995 g_return_val_if_fail(data != NULL, FALSE);
996
997 kventry_t *entry;
998 if (!len) {
999 g_hash_table_remove(kvhash, key);
1000 } else if ((entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1001 g_free(entry->data);
1002 entry->data = g_memdup(data, len);
1003 entry->len = len;
1004 entry->version++;
1005 } else {
1006 kventry_t *entry = g_new0(kventry_t, 1);
1007
1008 entry->key = g_strdup(key);
1009 entry->data = g_memdup(data, len);
1010 entry->len = len;
1011
1012 g_hash_table_replace(kvhash, entry->key, entry);
1013 }
1014
1015 return TRUE;
1016 }
1017
/* RRD definition (data sources and archives) used when creating a file for
 * a "pve2-node/<node>" key; NULL-terminated, the terminator is not counted
 * in the argument count passed to create_rrd_file(). */
static const char *rrd_def_node[] = {
	"DS:loadavg:GAUGE:120:0:U",
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:iowait:GAUGE:120:0:U",
	"DS:memtotal:GAUGE:120:0:U",
	"DS:memused:GAUGE:120:0:U",
	"DS:swaptotal:GAUGE:120:0:U",
	"DS:swapused:GAUGE:120:0:U",
	"DS:roottotal:GAUGE:120:0:U",
	"DS:rootused:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};
1045
/* RRD definition used when creating a file for a "pve2-vm/<vmid>" or
 * "pve2.3-vm/<vmid>" key; NULL-terminated. */
static const char *rrd_def_vm[] = {
	"DS:maxcpu:GAUGE:120:0:U",
	"DS:cpu:GAUGE:120:0:U",
	"DS:maxmem:GAUGE:120:0:U",
	"DS:mem:GAUGE:120:0:U",
	"DS:maxdisk:GAUGE:120:0:U",
	"DS:disk:GAUGE:120:0:U",
	"DS:netin:DERIVE:120:0:U",
	"DS:netout:DERIVE:120:0:U",
	"DS:diskread:DERIVE:120:0:U",
	"DS:diskwrite:DERIVE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};
1071
/* RRD definition used when creating a file for a
 * "pve2-storage/<node>/<storage>" key; NULL-terminated. */
static const char *rrd_def_storage[] = {
	"DS:total:GAUGE:120:0:U",
	"DS:used:GAUGE:120:0:U",

	"RRA:AVERAGE:0.5:1:70", // 1 min avg - one hour
	"RRA:AVERAGE:0.5:30:70", // 30 min avg - one day
	"RRA:AVERAGE:0.5:180:70", // 3 hour avg - one week
	"RRA:AVERAGE:0.5:720:70", // 12 hour avg - one month
	"RRA:AVERAGE:0.5:10080:70", // 7 day avg - one year

	"RRA:MAX:0.5:1:70", // 1 min max - one hour
	"RRA:MAX:0.5:30:70", // 30 min max - one day
	"RRA:MAX:0.5:180:70", // 3 hour max - one week
	"RRA:MAX:0.5:720:70", // 12 hour max - one month
	"RRA:MAX:0.5:10080:70", // 7 day max - one year
	NULL,
};
1089
/* Base directory below which update_rrd_data() creates and updates RRD files. */
#define RRDDIR "/var/lib/rrdcached/db"
1091
/*
 * Create a new RRD file with a step of 60 seconds whose start time is the
 * beginning (00:00:00 local time) of the current day.
 *
 * filename: path of the file to create
 * argcount: number of definition strings in 'rrddef'
 * rrddef:   data source / archive definitions
 *
 * Errors are logged, not returned. localtime_r() is used instead of
 * localtime() so that the shared static buffer of localtime() is not
 * touched, and mktime() replaces the non-standard timelocal().
 */
static void
create_rrd_file(
	const char *filename,
	int argcount,
	const char *rrddef[])
{
	/* start at day boundary */
	time_t now = time(NULL);
	struct tm day_start;

	if (localtime_r(&now, &day_start) == NULL) {
		cfs_message("RRD create error %s: unable to determine local time", filename);
		return;
	}

	day_start.tm_sec = 0;
	day_start.tm_min = 0;
	day_start.tm_hour = 0;

	rrd_clear_error();
	if (rrd_create_r(filename, 60, mktime(&day_start), argcount, rrddef)) {
		cfs_message("RRD create error %s: %s", filename, rrd_get_error());
	}
}
1111
/*
 * Skip the first 'count' colon-terminated fields of 'data'.
 *
 * Returns a pointer just behind the 'count'-th ':' or, if the string has
 * fewer colons, a pointer to its terminating NUL.
 */
static inline const char *
rrd_skip_data(
	const char *data,
	int count)
{
	for (int seen = 0; seen < count && *data != '\0'; data++) {
		if (*data == ':')
			seen++;
	}

	return data;
}
1124
/*
 * Feed one data record into the RRD file that belongs to 'key', creating
 * the file (and its directories) first if it does not exist.
 *
 * key:  "pve2-node/<node>", "pve2-vm/<vmid>", "pve2.3-vm/<vmid>" or
 *       "pve2-storage/<node>/<storage>"; anything else is logged as a key
 *       error and ignored
 * data: record text; it is scanned as a C string, so it is presumably
 *       NUL-terminated by the caller - TODO confirm at the call site
 * len:  must be > 0 and < 4096; not used otherwise in this function
 *
 * The update is sent through rrdcached if the connection succeeds, with a
 * direct rrd_update_r() as fallback. Errors are only logged.
 */
static void
update_rrd_data(
	const char *key,
	gconstpointer data,
	size_t len)
{
	g_return_if_fail(key != NULL);
	g_return_if_fail(data != NULL);
	g_return_if_fail(len > 0);
	g_return_if_fail(len < 4096);

	static const char *rrdcsock = "unix:/var/run/rrdcached.sock";

	// fall back to direct file updates if the daemon is not reachable
	int use_daemon = 1;
	if (rrdc_connect(rrdcsock) != 0)
		use_daemon = 0;

	char *filename = NULL;

	// number of leading ':'-separated fields of 'data' that are not passed
	// on to RRD (see rrd_skip_data())
	int skip = 0;

	if (strncmp(key, "pve2-node/", 10) == 0) {
		const char *node = key + 10;

		skip = 2;

		// the node name must be a single, non-empty path component
		if (strchr(node, '/') != NULL)
			goto keyerror;

		if (strlen(node) < 1)
			goto keyerror;

		filename = g_strdup_printf(RRDDIR "/%s", key);

		if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {

			mkdir(RRDDIR "/pve2-node", 0755);
			// -1: the NULL terminator of the definition is not an argument
			int argcount = sizeof(rrd_def_node)/sizeof(void*) - 1;
			create_rrd_file(filename, argcount, rrd_def_node);
		}

	} else if ((strncmp(key, "pve2-vm/", 8) == 0) ||
		   (strncmp(key, "pve2.3-vm/", 10) == 0)) {
		const char *vmid;

		// both key variants end up in the same "pve2-vm" directory, they
		// only differ in the number of skipped leading fields
		if (strncmp(key, "pve2-vm/", 8) == 0) {
			vmid = key + 8;
			skip = 2;
		} else {
			vmid = key + 10;
			skip = 4;
		}

		if (strchr(vmid, '/') != NULL)
			goto keyerror;

		if (strlen(vmid) < 1)
			goto keyerror;

		filename = g_strdup_printf(RRDDIR "/%s/%s", "pve2-vm", vmid);

		if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {

			mkdir(RRDDIR "/pve2-vm", 0755);
			int argcount = sizeof(rrd_def_vm)/sizeof(void*) - 1;
			create_rrd_file(filename, argcount, rrd_def_vm);
		}

	} else if (strncmp(key, "pve2-storage/", 13) == 0) {
		const char *node = key + 13;

		// split "<node>/<storage>": advance to the separating '/'
		const char *storage = node;
		while (*storage && *storage != '/')
			storage++;

		// require the separator and a non-empty node part
		if (*storage != '/' || ((storage - node) < 1))
			goto keyerror;

		storage++;

		if (strchr(storage, '/') != NULL)
			goto keyerror;

		if (strlen(storage) < 1)
			goto keyerror;

		filename = g_strdup_printf(RRDDIR "/%s", key);

		if (!g_file_test(filename, G_FILE_TEST_EXISTS)) {

			mkdir(RRDDIR "/pve2-storage", 0755);

			// per-node sub directory
			char *dir = g_path_get_dirname(filename);
			mkdir(dir, 0755);
			g_free(dir);

			int argcount = sizeof(rrd_def_storage)/sizeof(void*) - 1;
			create_rrd_file(filename, argcount, rrd_def_storage);
		}

	} else {
		goto keyerror;
	}

	const char *dp = skip ? rrd_skip_data(data, skip) : data;

	const char *update_args[] = { dp, NULL };

	if (use_daemon) {
		int status;
		if ((status = rrdc_update(filename, 1, update_args)) != 0) {
			// daemon update failed: disconnect and retry directly on the file
			cfs_message("RRDC update error %s: %d", filename, status);
			rrdc_disconnect();
			rrd_clear_error();
			if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
				cfs_message("RRD update error %s: %s", filename, rrd_get_error());
			}
		}

	} else {
		rrd_clear_error();
		if (rrd_update_r(filename, NULL, 1, update_args) != 0) {
			cfs_message("RRD update error %s: %s", filename, rrd_get_error());
		}
	}

ret:
	// common exit, also reached from keyerror (filename may still be NULL)
	if (filename)
		g_free(filename);

	return;

keyerror:
	cfs_critical("RRD update error: unknown/wrong key %s", key);
	goto ret;
}
1261
1262 static gboolean
1263 rrd_entry_is_old(
1264 gpointer key,
1265 gpointer value,
1266 gpointer user_data)
1267 {
1268 rrdentry_t *entry = (rrdentry_t *)value;
1269 uint32_t ctime = GPOINTER_TO_UINT(user_data);
1270
1271 int diff = ctime - entry->time;
1272
1273 /* remove everything older than 5 minutes */
1274 int expire = 60*5;
1275
1276 return (diff > expire) ? TRUE : FALSE;
1277 }
1278
1279 static char *rrd_dump_buf = NULL;
1280 static time_t rrd_dump_last = 0;
1281
1282 void
1283 cfs_rrd_dump(GString *str)
1284 {
1285 time_t ctime;
1286
1287 g_mutex_lock (&mutex);
1288
1289 time(&ctime);
1290 if (rrd_dump_buf && (ctime - rrd_dump_last) < 2) {
1291 g_string_assign(str, rrd_dump_buf);
1292 g_mutex_unlock (&mutex);
1293 return;
1294 }
1295
1296 /* remove old data */
1297 g_hash_table_foreach_remove(cfs_status.rrdhash, rrd_entry_is_old,
1298 GUINT_TO_POINTER(ctime));
1299
1300 g_string_set_size(str, 0);
1301
1302 GHashTableIter iter;
1303 gpointer key, value;
1304
1305 g_hash_table_iter_init (&iter, cfs_status.rrdhash);
1306
1307 while (g_hash_table_iter_next (&iter, &key, &value)) {
1308 rrdentry_t *entry = (rrdentry_t *)value;
1309 g_string_append(str, key);
1310 g_string_append(str, ":");
1311 g_string_append(str, entry->data);
1312 g_string_append(str, "\n");
1313 }
1314
1315 g_string_append_c(str, 0); // never return undef
1316
1317 rrd_dump_last = ctime;
1318 if (rrd_dump_buf)
1319 g_free(rrd_dump_buf);
1320 rrd_dump_buf = g_strdup(str->str);
1321
1322 g_mutex_unlock (&mutex);
1323 }
1324
1325 static gboolean
1326 nodeip_hash_set(
1327 GHashTable *iphash,
1328 const char *nodename,
1329 const char *ip,
1330 size_t len)
1331 {
1332 g_return_val_if_fail(iphash != NULL, FALSE);
1333 g_return_val_if_fail(nodename != NULL, FALSE);
1334 g_return_val_if_fail(ip != NULL, FALSE);
1335 g_return_val_if_fail(len > 0, FALSE);
1336 g_return_val_if_fail(len < 256, FALSE);
1337 g_return_val_if_fail(ip[len-1] == 0, FALSE);
1338
1339 char *oldip = (char *)g_hash_table_lookup(iphash, nodename);
1340
1341 if (!oldip || (strcmp(oldip, ip) != 0)) {
1342 cfs_status.clinfo_version++;
1343 g_hash_table_replace(iphash, g_strdup(nodename), g_strdup(ip));
1344 }
1345
1346 return TRUE;
1347 }
1348
1349 static gboolean
1350 rrdentry_hash_set(
1351 GHashTable *rrdhash,
1352 const char *key,
1353 const char *data,
1354 size_t len)
1355 {
1356 g_return_val_if_fail(rrdhash != NULL, FALSE);
1357 g_return_val_if_fail(key != NULL, FALSE);
1358 g_return_val_if_fail(data != NULL, FALSE);
1359 g_return_val_if_fail(len > 0, FALSE);
1360 g_return_val_if_fail(len < 4096, FALSE);
1361 g_return_val_if_fail(data[len-1] == 0, FALSE);
1362
1363 rrdentry_t *entry;
1364 if ((entry = (rrdentry_t *)g_hash_table_lookup(rrdhash, key))) {
1365 g_free(entry->data);
1366 entry->data = g_memdup(data, len);
1367 entry->len = len;
1368 entry->time = time(NULL);
1369 } else {
1370 rrdentry_t *entry = g_new0(rrdentry_t, 1);
1371
1372 entry->key = g_strdup(key);
1373 entry->data = g_memdup(data, len);
1374 entry->len = len;
1375 entry->time = time(NULL);
1376
1377 g_hash_table_replace(rrdhash, entry->key, entry);
1378 }
1379
1380 update_rrd_data(key, data, len);
1381
1382 return TRUE;
1383 }
1384
1385 static int
1386 kvstore_send_update_message(
1387 dfsm_t *dfsm,
1388 const char *key,
1389 gpointer data,
1390 guint32 len)
1391 {
1392 if (!dfsm_is_initialized(dfsm))
1393 return -EACCES;
1394
1395 struct iovec iov[2];
1396
1397 char name[256];
1398 g_strlcpy(name, key, sizeof(name));
1399
1400 iov[0].iov_base = &name;
1401 iov[0].iov_len = sizeof(name);
1402
1403 iov[1].iov_base = (char *)data;
1404 iov[1].iov_len = len;
1405
1406 if (dfsm_send_message(dfsm, KVSTORE_MESSAGE_UPDATE, iov, 2) == CS_OK)
1407 return 0;
1408
1409 return -EACCES;
1410 }
1411
1412 static clog_entry_t *
1413 kvstore_parse_log_message(
1414 const void *msg,
1415 size_t msg_len)
1416 {
1417 g_return_val_if_fail(msg != NULL, NULL);
1418
1419 if (msg_len < sizeof(clog_entry_t)) {
1420 cfs_critical("received short log message (%zu < %zu)", msg_len, sizeof(clog_entry_t));
1421 return NULL;
1422 }
1423
1424 clog_entry_t *entry = (clog_entry_t *)msg;
1425
1426 uint32_t size = sizeof(clog_entry_t) + entry->node_len +
1427 entry->ident_len + entry->tag_len + entry->msg_len;
1428
1429 if (msg_len != size) {
1430 cfs_critical("received log message with wrong size (%zu != %u)", msg_len, size);
1431 return NULL;
1432 }
1433
1434 char *msgptr = entry->data;
1435
1436 if (*((char *)msgptr + entry->node_len - 1)) {
1437 cfs_critical("unterminated string in log message");
1438 return NULL;
1439 }
1440 msgptr += entry->node_len;
1441
1442 if (*((char *)msgptr + entry->ident_len - 1)) {
1443 cfs_critical("unterminated string in log message");
1444 return NULL;
1445 }
1446 msgptr += entry->ident_len;
1447
1448 if (*((char *)msgptr + entry->tag_len - 1)) {
1449 cfs_critical("unterminated string in log message");
1450 return NULL;
1451 }
1452 msgptr += entry->tag_len;
1453
1454 if (*((char *)msgptr + entry->msg_len - 1)) {
1455 cfs_critical("unterminated string in log message");
1456 return NULL;
1457 }
1458
1459 return entry;
1460 }
1461
1462 static gboolean
1463 kvstore_parse_update_message(
1464 const void *msg,
1465 size_t msg_len,
1466 const char **key,
1467 gconstpointer *data,
1468 guint32 *len)
1469 {
1470 g_return_val_if_fail(msg != NULL, FALSE);
1471 g_return_val_if_fail(key != NULL, FALSE);
1472 g_return_val_if_fail(data != NULL, FALSE);
1473 g_return_val_if_fail(len != NULL, FALSE);
1474
1475 if (msg_len < 256) {
1476 cfs_critical("received short kvstore message (%zu < 256)", msg_len);
1477 return FALSE;
1478 }
1479
1480 /* test if key is null terminated */
1481 int i = 0;
1482 for (i = 0; i < 256; i++)
1483 if (((char *)msg)[i] == 0)
1484 break;
1485
1486 if (i == 256)
1487 return FALSE;
1488
1489
1490 *len = msg_len - 256;
1491 *key = msg;
1492 *data = (char *) msg + 256;
1493
1494 return TRUE;
1495 }
1496
1497 int
1498 cfs_create_status_msg(
1499 GString *str,
1500 const char *nodename,
1501 const char *key)
1502 {
1503 g_return_val_if_fail(str != NULL, -EINVAL);
1504 g_return_val_if_fail(key != NULL, -EINVAL);
1505
1506 int res = -ENOENT;
1507
1508 GHashTable *kvhash = NULL;
1509
1510 g_mutex_lock (&mutex);
1511
1512 if (!nodename || !nodename[0] || !strcmp(nodename, cfs.nodename)) {
1513 kvhash = cfs_status.kvhash;
1514 } else if (cfs_status.clinfo && cfs_status.clinfo->nodes_byname) {
1515 cfs_clnode_t *clnode;
1516 if ((clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byname, nodename)))
1517 kvhash = clnode->kvhash;
1518 }
1519
1520 kventry_t *entry;
1521 if (kvhash && (entry = (kventry_t *)g_hash_table_lookup(kvhash, key))) {
1522 g_string_append_len(str, entry->data, entry->len);
1523 res = 0;
1524 }
1525
1526 g_mutex_unlock (&mutex);
1527
1528 return res;
1529 }
1530
1531 int
1532 cfs_status_set(
1533 const char *key,
1534 gpointer data,
1535 size_t len)
1536 {
1537 g_return_val_if_fail(key != NULL, FALSE);
1538 g_return_val_if_fail(data != NULL, FALSE);
1539 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1540
1541 if (len > CFS_MAX_STATUS_SIZE)
1542 return -EFBIG;
1543
1544 g_mutex_lock (&mutex);
1545
1546 gboolean res;
1547
1548 if (strncmp(key, "rrd/", 4) == 0) {
1549 res = rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1550 } else if (!strcmp(key, "nodeip")) {
1551 res = nodeip_hash_set(cfs_status.iphash, cfs.nodename, data, len);
1552 } else {
1553 res = kventry_hash_set(cfs_status.kvhash, key, data, len);
1554 }
1555 g_mutex_unlock (&mutex);
1556
1557 if (cfs_status.kvstore)
1558 kvstore_send_update_message(cfs_status.kvstore, key, data, len);
1559
1560 return res ? 0 : -ENOMEM;
1561 }
1562
1563 gboolean
1564 cfs_kvstore_node_set(
1565 uint32_t nodeid,
1566 const char *key,
1567 gconstpointer data,
1568 size_t len)
1569 {
1570 g_return_val_if_fail(nodeid != 0, FALSE);
1571 g_return_val_if_fail(key != NULL, FALSE);
1572 g_return_val_if_fail(data != NULL, FALSE);
1573
1574 g_mutex_lock (&mutex);
1575
1576 if (!cfs_status.clinfo || !cfs_status.clinfo->nodes_byid)
1577 goto ret; /* ignore */
1578
1579 cfs_clnode_t *clnode = g_hash_table_lookup(cfs_status.clinfo->nodes_byid, &nodeid);
1580 if (!clnode)
1581 goto ret; /* ignore */
1582
1583 cfs_debug("got node %d status update %s", nodeid, key);
1584
1585 if (strncmp(key, "rrd/", 4) == 0) {
1586 rrdentry_hash_set(cfs_status.rrdhash, key + 4, data, len);
1587 } else if (!strcmp(key, "nodeip")) {
1588 nodeip_hash_set(cfs_status.iphash, clnode->name, data, len);
1589 } else {
1590 if (!clnode->kvhash) {
1591 if (!(clnode->kvhash = kventry_hash_new())) {
1592 goto ret; /*ignore */
1593 }
1594 }
1595
1596 kventry_hash_set(clnode->kvhash, key, data, len);
1597
1598 }
1599 ret:
1600 g_mutex_unlock (&mutex);
1601
1602 return TRUE;
1603 }
1604
1605 static gboolean
1606 cfs_kvstore_sync(void)
1607 {
1608 g_return_val_if_fail(cfs_status.kvhash != NULL, FALSE);
1609 g_return_val_if_fail(cfs_status.kvstore != NULL, FALSE);
1610
1611 gboolean res = TRUE;
1612
1613 g_mutex_lock (&mutex);
1614
1615 GHashTable *ht = cfs_status.kvhash;
1616 GHashTableIter iter;
1617 gpointer key, value;
1618
1619 g_hash_table_iter_init (&iter, ht);
1620
1621 while (g_hash_table_iter_next (&iter, &key, &value)) {
1622 kventry_t *entry = (kventry_t *)value;
1623 kvstore_send_update_message(cfs_status.kvstore, entry->key, entry->data, entry->len);
1624 }
1625
1626 g_mutex_unlock (&mutex);
1627
1628 return res;
1629 }
1630
1631 static int
1632 dfsm_deliver(
1633 dfsm_t *dfsm,
1634 gpointer data,
1635 int *res_ptr,
1636 uint32_t nodeid,
1637 uint32_t pid,
1638 uint16_t msg_type,
1639 uint32_t msg_time,
1640 const void *msg,
1641 size_t msg_len)
1642 {
1643 g_return_val_if_fail(dfsm != NULL, -1);
1644 g_return_val_if_fail(msg != NULL, -1);
1645 g_return_val_if_fail(res_ptr != NULL, -1);
1646
1647 /* ignore message for ourself */
1648 if (dfsm_nodeid_is_local(dfsm, nodeid, pid))
1649 goto ret;
1650
1651 if (msg_type == KVSTORE_MESSAGE_UPDATE) {
1652 const char *key;
1653 gconstpointer data;
1654 guint32 len;
1655 if (kvstore_parse_update_message(msg, msg_len, &key, &data, &len)) {
1656 cfs_kvstore_node_set(nodeid, key, data, len);
1657 } else {
1658 cfs_critical("cant parse update message");
1659 }
1660 } else if (msg_type == KVSTORE_MESSAGE_LOG) {
1661 cfs_message("received log"); // fixme: remove
1662 const clog_entry_t *entry;
1663 if ((entry = kvstore_parse_log_message(msg, msg_len))) {
1664 clusterlog_insert(cfs_status.clusterlog, entry);
1665 } else {
1666 cfs_critical("cant parse log message");
1667 }
1668 } else {
1669 cfs_critical("received unknown message type %d\n", msg_type);
1670 goto fail;
1671 }
1672
1673 ret:
1674 *res_ptr = 0;
1675 return 1;
1676
1677 fail:
1678 *res_ptr = -EACCES;
1679 return 1;
1680 }
1681
1682 static void
1683 dfsm_confchg(
1684 dfsm_t *dfsm,
1685 gpointer data,
1686 const struct cpg_address *member_list,
1687 size_t member_list_entries)
1688 {
1689 g_return_if_fail(dfsm != NULL);
1690 g_return_if_fail(member_list != NULL);
1691
1692 cfs_debug("enter %s", __func__);
1693
1694 g_mutex_lock (&mutex);
1695
1696 cfs_clinfo_t *clinfo = cfs_status.clinfo;
1697
1698 if (clinfo && clinfo->nodes_byid) {
1699
1700 GHashTable *ht = clinfo->nodes_byid;
1701 GHashTableIter iter;
1702 gpointer key, value;
1703
1704 g_hash_table_iter_init (&iter, ht);
1705
1706 while (g_hash_table_iter_next (&iter, &key, &value)) {
1707 cfs_clnode_t *node = (cfs_clnode_t *)value;
1708 node->online = FALSE;
1709 }
1710
1711 for (int i = 0; i < member_list_entries; i++) {
1712 cfs_clnode_t *node;
1713 if ((node = g_hash_table_lookup(clinfo->nodes_byid, &member_list[i].nodeid))) {
1714 node->online = TRUE;
1715 }
1716 }
1717
1718 cfs_status.clinfo_version++;
1719 }
1720
1721 g_mutex_unlock (&mutex);
1722 }
1723
1724 static gpointer
1725 dfsm_get_state(
1726 dfsm_t *dfsm,
1727 gpointer data,
1728 unsigned int *res_len)
1729 {
1730 g_return_val_if_fail(dfsm != NULL, NULL);
1731
1732 gpointer msg = clusterlog_get_state(cfs_status.clusterlog, res_len);
1733
1734 return msg;
1735 }
1736
1737 static int
1738 dfsm_process_update(
1739 dfsm_t *dfsm,
1740 gpointer data,
1741 dfsm_sync_info_t *syncinfo,
1742 uint32_t nodeid,
1743 uint32_t pid,
1744 const void *msg,
1745 size_t msg_len)
1746 {
1747 cfs_critical("%s: received unexpected update message", __func__);
1748
1749 return -1;
1750 }
1751
1752 static int
1753 dfsm_process_state_update(
1754 dfsm_t *dfsm,
1755 gpointer data,
1756 dfsm_sync_info_t *syncinfo)
1757 {
1758 g_return_val_if_fail(dfsm != NULL, -1);
1759 g_return_val_if_fail(syncinfo != NULL, -1);
1760
1761 clog_base_t *clog[syncinfo->node_count];
1762
1763 int local_index = -1;
1764 for (int i = 0; i < syncinfo->node_count; i++) {
1765 dfsm_node_info_t *ni = &syncinfo->nodes[i];
1766 ni->synced = 1;
1767
1768 if (syncinfo->local == ni)
1769 local_index = i;
1770
1771 clog_base_t *base = (clog_base_t *)ni->state;
1772 if (ni->state_len > 8 && ni->state_len == clog_size(base)) {
1773 clog[i] = ni->state;
1774 } else {
1775 cfs_critical("received log with wrong size %u", ni->state_len);
1776 clog[i] = NULL;
1777 }
1778 }
1779
1780 if (!clusterlog_merge(cfs_status.clusterlog, clog, syncinfo->node_count, local_index)) {
1781 cfs_critical("unable to merge log files");
1782 }
1783
1784 cfs_kvstore_sync();
1785
1786 return 1;
1787 }
1788
1789 static int
1790 dfsm_commit(
1791 dfsm_t *dfsm,
1792 gpointer data,
1793 dfsm_sync_info_t *syncinfo)
1794 {
1795 g_return_val_if_fail(dfsm != NULL, -1);
1796 g_return_val_if_fail(syncinfo != NULL, -1);
1797
1798 return 1;
1799 }
1800
1801 static void
1802 dfsm_synced(dfsm_t *dfsm)
1803 {
1804 g_return_if_fail(dfsm != NULL);
1805
1806 char *ip = (char *)g_hash_table_lookup(cfs_status.iphash, cfs.nodename);
1807 if (!ip)
1808 ip = cfs.ip;
1809
1810 cfs_status_set("nodeip", ip, strlen(ip) + 1);
1811 }
1812
1813 static int
1814 dfsm_cleanup(
1815 dfsm_t *dfsm,
1816 gpointer data,
1817 dfsm_sync_info_t *syncinfo)
1818 {
1819 return 1;
1820 }
1821
1822 static dfsm_callbacks_t kvstore_dfsm_callbacks = {
1823 .dfsm_deliver_fn = dfsm_deliver,
1824 .dfsm_confchg_fn = dfsm_confchg,
1825
1826 .dfsm_get_state_fn = dfsm_get_state,
1827 .dfsm_process_state_update_fn = dfsm_process_state_update,
1828 .dfsm_process_update_fn = dfsm_process_update,
1829 .dfsm_commit_fn = dfsm_commit,
1830 .dfsm_cleanup_fn = dfsm_cleanup,
1831 .dfsm_synced_fn = dfsm_synced,
1832 };
1833
1834 dfsm_t *
1835 cfs_status_dfsm_new(void)
1836 {
1837 g_mutex_lock (&mutex);
1838
1839 cfs_status.kvstore = dfsm_new(NULL, KVSTORE_CPG_GROUP_NAME, G_LOG_DOMAIN,
1840 0, &kvstore_dfsm_callbacks);
1841 g_mutex_unlock (&mutex);
1842
1843 return cfs_status.kvstore;
1844 }
1845
1846 gboolean
1847 cfs_is_quorate(void)
1848 {
1849 g_mutex_lock (&mutex);
1850 gboolean res = cfs_status.quorate;
1851 g_mutex_unlock (&mutex);
1852
1853 return res;
1854 }
1855
1856 void
1857 cfs_set_quorate(
1858 uint32_t quorate,
1859 gboolean quiet)
1860 {
1861 g_mutex_lock (&mutex);
1862
1863 uint32_t prev_quorate = cfs_status.quorate;
1864 cfs_status.quorate = quorate;
1865
1866 if (!prev_quorate && cfs_status.quorate) {
1867 if (!quiet)
1868 cfs_message("node has quorum");
1869 }
1870
1871 if (prev_quorate && !cfs_status.quorate) {
1872 if (!quiet)
1873 cfs_message("node lost quorum");
1874 }
1875
1876 g_mutex_unlock (&mutex);
1877 }