]> git.proxmox.com Git - pve-cluster.git/blob - data/src/dcdb.c
pmxcfs: update copyright in license header
[pve-cluster.git] / data / src / dcdb.c
1 /*
2 Copyright (C) 2010 - 2020 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21 #define G_LOG_DOMAIN "dcdb"
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif /* HAVE_CONFIG_H */
26
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <inttypes.h>
30 #include <string.h>
31 #include <unistd.h>
32 #include <glib.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <arpa/inet.h>
36 #include <sys/epoll.h>
37 #include <dirent.h>
38 #include <errno.h>
39
40 #include "cfs-utils.h"
41 #include "loop.h"
42 #include "dcdb.h"
43 #include "status.h"
44
45 typedef struct {
46 memdb_index_t *master;
47 memdb_index_t *idx;
48 GList *updates;
49 } dcdb_sync_info_t;
50
51 void
52 dcdb_send_unlock(
53 dfsm_t *dfsm,
54 const char *path,
55 const guchar csum[32],
56 gboolean request)
57 {
58 g_return_if_fail(dfsm != NULL);
59 g_return_if_fail(path != NULL);
60 g_return_if_fail(csum != NULL);
61
62 struct iovec iov[2];
63
64 iov[0].iov_base = (char *)csum;
65 iov[0].iov_len = 32;
66
67 iov[1].iov_base = (char *)path;
68 iov[1].iov_len = strlen(path) + 1;
69
70 if (!cfs_is_quorate())
71 return;
72
73 dcdb_message_t msg_type = request ?
74 DCDB_MESSAGE_CFS_UNLOCK_REQUEST : DCDB_MESSAGE_CFS_UNLOCK;
75
76 dfsm_send_message_sync(dfsm, msg_type, iov, 2, NULL);
77 }
78
79 static gboolean
80 dcdb_parse_unlock_request(
81 const void *msg,
82 size_t msg_len,
83 const char **path,
84 const guchar **csum)
85
86 {
87 g_return_val_if_fail(msg != NULL, FALSE);
88 g_return_val_if_fail(path != NULL, FALSE);
89 g_return_val_if_fail(csum != NULL, FALSE);
90
91 if (msg_len < 33) {
92 cfs_critical("received short unlock message (%zu < 33)", msg_len);
93 return FALSE;
94 }
95
96 char *msg_str = (char *) msg;
97
98 *csum = (guchar *) msg_str;
99 msg_str += 32; msg_len -= 32;
100
101 *path = msg_str;
102 if ((*path)[msg_len - 1] != 0) {
103 cfs_critical("received mailformed unlock message - 'path' not terminated");
104 *path = NULL;
105 return FALSE;
106 }
107
108 return TRUE;
109 }
110
111 int
112 dcdb_send_fuse_message(
113 dfsm_t *dfsm,
114 dcdb_message_t msg_type,
115 const char *path,
116 const char *to,
117 const char *buf,
118 guint32 size,
119 guint32 offset,
120 guint32 flags)
121 {
122 struct iovec iov[8];
123
124 iov[0].iov_base = (char *)&size;
125 iov[0].iov_len = sizeof(size);
126
127 iov[1].iov_base = (char *)&offset;
128 iov[1].iov_len = sizeof(offset);
129
130 guint32 pathlen = path ? strlen(path) + 1 : 0;
131 iov[2].iov_base = (char *)&pathlen;
132 iov[2].iov_len = sizeof(pathlen);
133
134 guint32 tolen = to ? strlen(to) + 1 : 0;
135 iov[3].iov_base = (char *)&tolen;
136 iov[3].iov_len = sizeof(tolen);
137
138 iov[4].iov_base = (char *)&flags;
139 iov[4].iov_len = sizeof(flags);
140
141 iov[5].iov_base = (char *)path;
142 iov[5].iov_len = pathlen;
143
144 iov[6].iov_base = (char *)to;
145 iov[6].iov_len = tolen;
146
147 iov[7].iov_base = (char *)buf;
148 iov[7].iov_len = size;
149
150 dfsm_result_t rc;
151 memset(&rc, 0, sizeof(rc));
152 rc.result = -EBUSY;
153
154 if (!cfs_is_quorate())
155 return -EACCES;
156
157 if (dfsm_send_message_sync(dfsm, msg_type, iov, 8, &rc))
158 return rc.result;
159
160 return -EACCES;
161 }
162
163 static gboolean
164 dcdb_parse_fuse_message(
165 const void *msg,
166 size_t msg_len,
167 const char **path,
168 const char **to,
169 const char **buf,
170 guint32 *size,
171 guint32 *offset,
172 guint32 *flags)
173
174 {
175 g_return_val_if_fail(msg != NULL, FALSE);
176 g_return_val_if_fail(path != NULL, FALSE);
177 g_return_val_if_fail(to != NULL, FALSE);
178 g_return_val_if_fail(buf != NULL, FALSE);
179 g_return_val_if_fail(size != NULL, FALSE);
180 g_return_val_if_fail(offset != NULL, FALSE);
181 g_return_val_if_fail(flags != NULL, FALSE);
182
183 if (msg_len < 20) {
184 cfs_critical("received short fuse message (%zu < 20)", msg_len);
185 return FALSE;
186 }
187
188 const uint8_t *msg_ptr = msg;
189
190 *size = *((guint32 *)msg_ptr);
191 msg_ptr += 4; msg_len -= 4;
192
193 *offset = *((guint32 *)msg_ptr);
194 msg_ptr += 4; msg_len -= 4;
195
196 guint32 pathlen = *((guint32 *)msg_ptr);
197 msg_ptr += 4; msg_len -= 4;
198
199 guint32 tolen = *((guint32 *)msg_ptr);
200 msg_ptr += 4; msg_len -= 4;
201
202 *flags = *((guint32 *)msg_ptr);
203 msg_ptr += 4; msg_len -= 4;
204
205 if (msg_len != ((*size) + pathlen + tolen)) {
206 cfs_critical("received mailformed fuse message");
207 return FALSE;
208 }
209
210 *path = (char *)msg_ptr;
211 msg_ptr += pathlen; msg_len -= pathlen;
212
213 if (pathlen) {
214 if ((*path)[pathlen - 1] != 0) {
215 cfs_critical("received mailformed fuse message - 'path' not terminated");
216 *path = NULL;
217 return FALSE;
218 }
219 } else {
220 *path = NULL;
221 }
222
223 *to = (char *)msg_ptr;
224 msg_ptr += tolen; msg_len -= tolen;
225
226 if (tolen) {
227 if ((*to)[tolen - 1] != 0) {
228 cfs_critical("received mailformed fuse message - 'to' not terminated");
229 *to = NULL;
230 return FALSE;
231 }
232 } else {
233 *to = NULL;
234 }
235
236 *buf = (*size) ? (const char*)msg_ptr : NULL;
237
238 return TRUE;
239 }
240
241 static gboolean
242 dcdb_send_update_inode(
243 dfsm_t *dfsm,
244 memdb_tree_entry_t *te)
245 {
246 g_return_val_if_fail(dfsm != NULL, FALSE);
247 g_return_val_if_fail(te != NULL, FALSE);
248
249 int len;
250 struct iovec iov[20];
251
252 uint32_t namelen = strlen(te->name) + 1;
253
254 iov[0].iov_base = (char *)&te->parent;
255 iov[0].iov_len = sizeof(te->parent);
256 iov[1].iov_base = (char *)&te->inode;
257 iov[1].iov_len = sizeof(te->inode);
258 iov[2].iov_base = (char *)&te->version;
259 iov[2].iov_len = sizeof(te->version);
260 iov[3].iov_base = (char *)&te->writer;
261 iov[3].iov_len = sizeof(te->writer);
262 iov[4].iov_base = (char *)&te->mtime;
263 iov[4].iov_len = sizeof(te->mtime);
264 iov[5].iov_base = (char *)&te->size;
265 iov[5].iov_len = sizeof(te->size);
266 iov[6].iov_base = (char *)&namelen;
267 iov[6].iov_len = sizeof(namelen);
268 iov[7].iov_base = (char *)&te->type;
269 iov[7].iov_len = sizeof(te->type);
270 iov[8].iov_base = (char *)te->name;
271 iov[8].iov_len = namelen;
272
273 len = 9;
274 if (te->type == DT_REG && te->size) {
275 iov[9].iov_base = (char *)te->data.value;
276 iov[9].iov_len = te->size;
277 len++;
278 }
279
280 if (dfsm_send_update(dfsm, iov, len) != CS_OK)
281 return FALSE;
282
283 return TRUE;
284 }
285
286 memdb_tree_entry_t *
287 dcdb_parse_update_inode(
288 const void *msg,
289 size_t msg_len)
290 {
291 if (msg_len < 40) {
292 cfs_critical("received short message (msg_len < 40)");
293 return NULL;
294 }
295
296 char *msg_ptr = (char *) msg;
297
298 guint64 parent = *((guint64 *)msg_ptr);
299 msg_ptr += 8; msg_len -= 8;
300 guint64 inode = *((guint64 *)msg_ptr);
301 msg_ptr += 8; msg_len -= 8;
302 guint64 version = *((guint64 *)msg_ptr);
303 msg_ptr += 8; msg_len -= 8;
304
305 guint32 writer = *((guint32 *)msg_ptr);
306 msg_ptr += 4; msg_len -= 4;
307 guint32 mtime = *((guint32 *)msg_ptr);
308 msg_ptr += 4; msg_len -= 4;
309 guint32 size = *((guint32 *)msg_ptr);
310 msg_ptr += 4; msg_len -= 4;
311 guint32 namelen = *((guint32 *)msg_ptr);
312 msg_ptr += 4; msg_len -= 4;
313
314 char type = *((char *)msg_ptr);
315 msg_ptr += 1; msg_len -= 1;
316
317 if (!(type == DT_REG || type == DT_DIR)) {
318 cfs_critical("received mailformed message (unknown inode type %d)", type);
319 return NULL;
320 }
321
322 if (msg_len != (size + namelen)) {
323 cfs_critical("received mailformed message (msg_len != (size + namelen))");
324 return NULL;
325 }
326
327 char *name = msg_ptr;
328 msg_ptr += namelen; msg_len -= namelen;
329
330 const void *data = msg_ptr;
331
332 if (name[namelen - 1] != 0) {
333 cfs_critical("received mailformed message (name[namelen-1] != 0)");
334 return NULL;
335 }
336
337 memdb_tree_entry_t *te = memdb_tree_entry_new(name);
338 if (!te)
339 return NULL;
340
341 te->parent = parent;
342 te->version = version;
343 te->inode = inode;
344 te->writer = writer;
345 te->mtime = mtime;
346 te->size = size;
347 te->type = type;
348
349 if (te->type == DT_REG && te->size) {
350 te->data.value = g_memdup(data, te->size);
351 if (!te->data.value) {
352 memdb_tree_entry_free(te);
353 return NULL;
354 }
355 }
356
357 return te;
358 }
359
360 void
361 dcdb_sync_corosync_conf(
362 memdb_t *memdb,
363 gboolean notify_corosync)
364 {
365 g_return_if_fail(memdb != NULL);
366
367 int len;
368 gpointer data = NULL;
369
370 len = memdb_read(memdb, "corosync.conf", &data);
371 if (len <= 0)
372 return;
373
374 guint64 new_version = cluster_config_version(data, len);
375 if (!new_version) {
376 cfs_critical("unable to parse cluster config_version");
377 return;
378 }
379
380 char *old_data = NULL;
381 gsize old_length = 0;
382 guint64 old_version = 0;
383
384 GError *err = NULL;
385 if (!g_file_get_contents(HOST_CLUSTER_CONF_FN, &old_data, &old_length, &err)) {
386 if (!g_error_matches(err, G_FILE_ERROR, G_FILE_ERROR_NOENT)) {
387 cfs_critical("unable to read cluster config file '%s' - %s",
388 HOST_CLUSTER_CONF_FN, err->message);
389 }
390 g_error_free (err);
391 } else {
392 if (old_length)
393 old_version = cluster_config_version(old_data, old_length);
394 }
395
396 /* test if something changed - return if no changes */
397 if (data && old_data && (old_length == len) &&
398 !memcmp(data, old_data, len))
399 goto ret;
400
401 if (new_version < old_version) {
402 cfs_critical("local corosync.conf is newer");
403 goto ret;
404 }
405
406 if (!atomic_write_file(HOST_CLUSTER_CONF_FN, data, len, 0644, 0))
407 goto ret;
408
409 cfs_message("wrote new corosync config '%s' (version = %" G_GUINT64_FORMAT ")",
410 HOST_CLUSTER_CONF_FN, new_version);
411
412 if (notify_corosync && old_version) {
413 /* tell corosync that there is a new config file */
414 cfs_debug ("run corosync-cfgtool -R");
415 int status = system("corosync-cfgtool -R >/dev/null 2>&1");
416 if (WIFEXITED(status) && WEXITSTATUS(status) != 0) {
417 cfs_critical("corosync-cfgtool -R failed with exit code %d\n", WEXITSTATUS(status));
418 }
419 cfs_debug ("end corosync-cfgtool -R");
420 }
421
422 ret:
423
424 if (data)
425 g_free(data);
426
427 if (old_data)
428 g_free(old_data);
429 }
430
431 static gpointer
432 dcdb_get_state(
433 dfsm_t *dfsm,
434 gpointer data,
435 unsigned int *res_len)
436 {
437 g_return_val_if_fail(dfsm != NULL, FALSE);
438 g_return_val_if_fail(data != NULL, FALSE);
439
440 memdb_t *memdb = (memdb_t *)data;
441
442 g_return_val_if_fail(memdb->root != NULL, FALSE);
443
444 cfs_debug("enter %s %016" PRIX64 " %08X", __func__, (uint64_t) memdb->root->version, memdb->root->mtime);
445
446 g_mutex_lock (&memdb->mutex);
447 memdb_index_t *idx = memdb_encode_index(memdb->index, memdb->root);
448 g_mutex_unlock (&memdb->mutex);
449
450 if (idx) {
451 *res_len = idx->bytes;
452 }
453
454 return idx;
455 }
456
457 static int
458 dcdb_select_leader(
459 int node_count,
460 memdb_index_t *idx[])
461 {
462 g_return_val_if_fail(idx != NULL, -1);
463
464 cfs_debug("enter %s", __func__);
465
466 int leader = -1;
467
468 /* try select most actual data - compare 'version' an 'time of last write'
469 * NOTE: syncinfo members are sorted
470 */
471 for (int i = 0; i < node_count; i++) {
472 if (leader < 0) {
473 leader = i;
474 } else {
475 memdb_index_t *leaderidx = idx[leader];
476
477 if (idx[i]->version == leaderidx->version &&
478 idx[i]->mtime > leaderidx->mtime) {
479 leader = i;
480 } else if (idx[i]->version > leaderidx->version) {
481 leader = i;
482 }
483 }
484 }
485
486 cfs_debug ("leave %s (%d)", __func__, leader);
487
488 return leader;
489 }
490
491 static gboolean
492 dcdb_create_and_send_updates(
493 dfsm_t *dfsm,
494 memdb_t *memdb,
495 memdb_index_t *master,
496 int node_count,
497 memdb_index_t *idx[])
498 {
499 g_return_val_if_fail(dfsm != NULL, FALSE);
500 g_return_val_if_fail(memdb != NULL, FALSE);
501 g_return_val_if_fail(master != NULL, FALSE);
502
503 cfs_debug("enter %s", __func__);
504
505 gboolean res = FALSE;
506
507 GHashTable *updates = g_hash_table_new(g_int64_hash, g_int64_equal);
508 if (!updates)
509 goto ret;
510
511 g_mutex_lock (&memdb->mutex);
512
513 for (int n = 0; n < node_count; n++) {
514 memdb_index_t *slave = idx[n];
515
516 if (slave == master)
517 continue;
518
519 int j = 0;
520
521 for (int i = 0; i < master->size; i++) {
522 guint64 inode = master->entries[i].inode;
523 while (j < slave->size && slave->entries[j].inode < inode)
524 j++;
525
526 if (memcmp(&slave->entries[j], &master->entries[i],
527 sizeof(memdb_index_extry_t)) == 0) {
528 continue;
529 }
530
531 if (g_hash_table_lookup(updates, &inode))
532 continue;
533
534 cfs_debug("found different inode %d %016" PRIX64, i, (uint64_t) inode);
535
536 memdb_tree_entry_t *te, *cpy;
537
538 if (!(te = g_hash_table_lookup(memdb->index, &inode))) {
539 cfs_critical("can get inode data for inode %016" PRIX64, (uint64_t) inode);
540 goto ret;
541 }
542
543 cpy = memdb_tree_entry_copy(te, 1);
544 g_hash_table_replace(updates, &cpy->inode, cpy);
545 }
546 }
547
548 g_mutex_unlock (&memdb->mutex);
549
550 /* send updates */
551
552 GHashTableIter iter;
553 gpointer key, value;
554 int count = 0;
555
556 cfs_message("start sending inode updates");
557
558 g_hash_table_iter_init (&iter, updates);
559 while (g_hash_table_iter_next (&iter, &key, &value)) {
560 memdb_tree_entry_t *te = (memdb_tree_entry_t *)value;
561 count++;
562
563 if (!dcdb_send_update_inode(dfsm, te)) {
564 /* tolerate error here */
565 cfs_critical("sending update inode failed %016" PRIX64, (uint64_t) te->inode);
566 } else {
567 cfs_debug("sent update inode %016" PRIX64, (uint64_t) te->inode);
568 }
569
570 memdb_tree_entry_free(te);
571 }
572
573 cfs_message("sent all (%d) updates", count);
574
575 if (dfsm_send_update_complete(dfsm) != CS_OK) {
576 cfs_critical("failed to send UPDATE_COMPLETE message");
577 goto ret;
578 }
579
580 res = TRUE;
581
582 ret:
583 if (updates)
584 g_hash_table_destroy(updates);
585
586 cfs_debug("leave %s (%d)", __func__, res);
587
588 return res;
589 }
590
591 static int
592 dcdb_process_state_update(
593 dfsm_t *dfsm,
594 gpointer data,
595 dfsm_sync_info_t *syncinfo)
596 {
597 g_return_val_if_fail(dfsm != NULL, -1);
598 g_return_val_if_fail(data != NULL, -1);
599 g_return_val_if_fail(syncinfo != NULL, -1);
600
601 memdb_t *memdb = (memdb_t *)data;
602
603 cfs_debug("enter %s", __func__);
604
605 dcdb_sync_info_t *localsi = g_new0(dcdb_sync_info_t, 1);
606 if (!localsi)
607 return -1;
608
609 syncinfo->data = localsi;
610
611 memdb_index_t *idx[syncinfo->node_count];
612
613 for (int i = 0; i < syncinfo->node_count; i++) {
614 dfsm_node_info_t *ni = &syncinfo->nodes[i];
615
616 if (ni->state_len < sizeof(memdb_index_t)) {
617 cfs_critical("received short memdb index (len < sizeof(memdb_index_t))");
618 return -1;
619 }
620
621 idx[i] = (memdb_index_t *)ni->state;
622
623 if (ni->state_len != idx[i]->bytes) {
624 cfs_critical("received mailformed memdb index (len != idx->bytes)");
625 return -1;
626 }
627 }
628
629 /* select leader - set mode */
630 int leader = dcdb_select_leader(syncinfo->node_count, idx);
631 if (leader < 0) {
632 cfs_critical("unable to select leader failed");
633 return -1;
634 }
635
636 cfs_message("leader is %d/%d", syncinfo->nodes[leader].nodeid, syncinfo->nodes[leader].pid);
637
638 memdb_index_t *leaderidx = idx[leader];
639 localsi->master = leaderidx;
640
641 GString *synced_member_ids = g_string_new(NULL);
642 g_string_append_printf(synced_member_ids, "%d/%d", syncinfo->nodes[leader].nodeid, syncinfo->nodes[leader].pid);
643
644 for (int i = 0; i < syncinfo->node_count; i++) {
645 dfsm_node_info_t *ni = &syncinfo->nodes[i];
646 if (i == leader) {
647 ni->synced = 1;
648 } else {
649 if (leaderidx->bytes == idx[i]->bytes &&
650 memcmp(leaderidx, idx[i], leaderidx->bytes) == 0) {
651 ni->synced = 1;
652 g_string_append_printf(synced_member_ids, ", %d/%d", ni->nodeid, ni->pid);
653 }
654 }
655 if (dfsm_nodeid_is_local(dfsm, ni->nodeid, ni->pid))
656 localsi->idx = idx[i];
657 }
658 cfs_message("synced members: %s", synced_member_ids->str);
659 g_string_free(synced_member_ids, 1);
660
661 /* send update */
662 if (dfsm_nodeid_is_local(dfsm, syncinfo->nodes[leader].nodeid, syncinfo->nodes[leader].pid)) {
663 if (!dcdb_create_and_send_updates(dfsm, memdb, leaderidx, syncinfo->node_count, idx))
664 return -1;
665 }
666
667 return 0;
668 }
669
670 static int
671 dcdb_process_update(
672 dfsm_t *dfsm,
673 gpointer data,
674 dfsm_sync_info_t *syncinfo,
675 uint32_t nodeid,
676 uint32_t pid,
677 const void *msg,
678 size_t msg_len)
679 {
680 g_return_val_if_fail(dfsm != NULL, -1);
681 g_return_val_if_fail(data != NULL, -1);
682 g_return_val_if_fail(msg != NULL, -1);
683 g_return_val_if_fail(syncinfo != NULL, -1);
684 g_return_val_if_fail(syncinfo->data != NULL, -1);
685
686 cfs_debug("enter %s", __func__);
687
688 memdb_tree_entry_t *te;
689
690 if (!(te = dcdb_parse_update_inode(msg, msg_len)))
691 return -1;
692
693 cfs_debug("received inode update %016" PRIX64 " from node %d",
694 (uint64_t) te->inode, nodeid);
695
696 dcdb_sync_info_t *localsi = (dcdb_sync_info_t *)syncinfo->data;
697
698 localsi->updates = g_list_append(localsi->updates, te);
699
700 return 0;
701 }
702
703 static int
704 dcdb_commit(
705 dfsm_t *dfsm,
706 gpointer data,
707 dfsm_sync_info_t *syncinfo)
708 {
709 g_return_val_if_fail(dfsm != NULL, -1);
710 g_return_val_if_fail(data != NULL, -1);
711 g_return_val_if_fail(syncinfo != NULL, -1);
712 g_return_val_if_fail(syncinfo->data != NULL, -1);
713
714 memdb_t *memdb = (memdb_t *)data;
715
716 cfs_debug("enter %s", __func__);
717
718 dcdb_sync_info_t *localsi = (dcdb_sync_info_t *)syncinfo->data;
719
720 guint count = g_list_length(localsi->updates);
721
722 cfs_message("update complete - trying to commit (got %u inode updates)", count);
723
724 if (!bdb_backend_commit_update(memdb, localsi->master, localsi->idx, localsi->updates))
725 return -1;
726
727 dcdb_sync_corosync_conf(memdb, FALSE);
728
729 return 0;
730 }
731
732 static int
733 dcdb_cleanup(
734 dfsm_t *dfsm,
735 gpointer data,
736 dfsm_sync_info_t *syncinfo)
737 {
738 g_return_val_if_fail(dfsm != NULL, -1);
739 g_return_val_if_fail(data != NULL, -1);
740 g_return_val_if_fail(syncinfo != NULL, -1);
741 g_return_val_if_fail(syncinfo->data != NULL, -1);
742
743 cfs_debug("enter %s", __func__);
744
745 dcdb_sync_info_t *localsi = (dcdb_sync_info_t *)syncinfo->data;
746
747 GList *iter = localsi->updates;
748 while (iter) {
749 memdb_tree_entry_t *te = (memdb_tree_entry_t *)iter->data;
750 memdb_tree_entry_free(te);
751 iter = g_list_next(iter);
752 }
753 g_list_free(localsi->updates);
754
755 g_free(localsi);
756
757 return 0;
758 }
759
760 gboolean
761 dcdb_checksum(
762 dfsm_t *dfsm,
763 gpointer data,
764 unsigned char *csum,
765 size_t csum_len)
766 {
767 g_return_val_if_fail(dfsm != NULL, FALSE);
768 g_return_val_if_fail(csum != NULL, FALSE);
769
770 memdb_t *memdb = (memdb_t *)data;
771
772 g_return_val_if_fail(memdb != NULL, FALSE);
773
774 cfs_debug("enter %s %016" PRIX64 " %08X", __func__, memdb->root->version, memdb->root->mtime);
775
776 g_mutex_lock (&memdb->mutex);
777 gboolean res = memdb_compute_checksum(memdb->index, memdb->root, csum, csum_len);
778 g_mutex_unlock (&memdb->mutex);
779
780 cfs_debug("leave %s %016" PRIX64 " (%d)", __func__, *( (uint64_t *) csum), res);
781
782 return res;
783 }
784
785 static int
786 dcdb_deliver(
787 dfsm_t *dfsm,
788 gpointer data,
789 int *res_ptr,
790 uint32_t nodeid,
791 uint32_t pid,
792 uint16_t msg_type,
793 uint32_t msg_time,
794 const void *msg,
795 size_t msg_len)
796 {
797 g_return_val_if_fail(dfsm != NULL, -1);
798 g_return_val_if_fail(msg != NULL, -1);
799
800 memdb_t *memdb = (memdb_t *)data;
801
802 g_return_val_if_fail(memdb != NULL, -1);
803 g_return_val_if_fail(res_ptr != NULL, -1);
804
805 int res = 1;
806
807 int msg_result = -ENOTSUP;
808
809 if (!DCDB_VALID_MESSAGE_TYPE(msg_type))
810 goto unknown;
811
812 cfs_debug("process message %u (length = %zd)", msg_type, msg_len);
813
814 if (!cfs_is_quorate()) {
815 cfs_critical("received write while not quorate - trigger resync");
816 msg_result = -EACCES;
817 goto leave;
818 }
819
820 const char *path, *to, *buf;
821 guint32 size, offset, flags;
822 const guchar *csum;
823
824 if (msg_type == DCDB_MESSAGE_CFS_UNLOCK_REQUEST ||
825 msg_type == DCDB_MESSAGE_CFS_UNLOCK) {
826 msg_result = 0; /* ignored anyways */
827
828 if (!dcdb_parse_unlock_request(msg, msg_len, &path, &csum))
829 goto leave;
830
831 guchar cur_csum[32];
832 memdb_tree_entry_t *te = memdb_getattr(memdb, path);
833
834 if (te && te->type == DT_DIR &&
835 path_is_lockdir(path) && memdb_tree_entry_csum(te, cur_csum) &&
836 (memcmp(csum, cur_csum, 32) == 0)) {
837
838 if (msg_type == DCDB_MESSAGE_CFS_UNLOCK) {
839
840 cfs_debug("got valid unlock message");
841
842 msg_result = memdb_delete(memdb, path, nodeid, msg_time);
843
844 } else if (dfsm_lowest_nodeid(dfsm)) {
845
846 cfs_debug("got valid unlock request message");
847
848 if (memdb_lock_expired(memdb, path, csum)) {
849 cfs_debug("sending unlock message");
850 dcdb_send_unlock(dfsm, path, csum, FALSE);
851 }
852 }
853 }
854 memdb_tree_entry_free(te);
855
856 } else if (msg_type == DCDB_MESSAGE_CFS_WRITE) {
857
858 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
859 &size, &offset, &flags))
860 goto leave;
861
862 msg_result = memdb_write(memdb, path, nodeid, msg_time,
863 buf, size, offset, flags);
864
865 if ((msg_result >= 0) && !strcmp(path, "corosync.conf"))
866 dcdb_sync_corosync_conf(memdb, dfsm_nodeid_is_local(dfsm, nodeid, pid));
867
868 } else if (msg_type == DCDB_MESSAGE_CFS_CREATE) {
869
870 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
871 &size, &offset, &flags))
872 goto leave;
873
874 msg_result = memdb_create(memdb, path, nodeid, msg_time);
875
876 if ((msg_result >= 0) && !strcmp(path, "corosync.conf"))
877 dcdb_sync_corosync_conf(memdb, dfsm_nodeid_is_local(dfsm, nodeid, pid));
878
879 } else if (msg_type == DCDB_MESSAGE_CFS_MKDIR) {
880
881 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
882 &size, &offset, &flags))
883 goto leave;
884
885 msg_result = memdb_mkdir(memdb, path, nodeid, msg_time);
886
887 } else if (msg_type == DCDB_MESSAGE_CFS_DELETE) {
888
889 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
890 &size, &offset, &flags))
891 goto leave;
892
893 msg_result = memdb_delete(memdb, path, nodeid, msg_time);
894
895 } else if (msg_type == DCDB_MESSAGE_CFS_RENAME) {
896
897 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
898 &size, &offset, &flags))
899 goto leave;
900
901 msg_result = memdb_rename(memdb, path, to, nodeid, msg_time);
902
903 if ((msg_result >= 0) && !strcmp(to, "corosync.conf"))
904 dcdb_sync_corosync_conf(memdb, dfsm_nodeid_is_local(dfsm, nodeid, pid));
905
906 } else if (msg_type == DCDB_MESSAGE_CFS_MTIME) {
907
908 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
909 &size, &offset, &flags))
910 goto leave;
911
912 /* Note: mtime is sent via offset field */
913 msg_result = memdb_mtime(memdb, path, nodeid, offset);
914
915 } else {
916 goto unknown;
917 }
918
919 *res_ptr = msg_result;
920 ret:
921 if (memdb->errors) {
922 dfsm_set_errormode(dfsm);
923 res = -1;
924 }
925
926 cfs_debug("leave %s (%d)", __func__, res);
927
928 return res;
929
930 unknown:
931 cfs_critical("received unknown message type (msg_type == %u)", msg_type);
932 leave:
933 res = -1;
934 goto ret;
935
936 }
937
938 static dfsm_callbacks_t dcdb_dfsm_callbacks = {
939 .dfsm_deliver_fn = dcdb_deliver,
940 .dfsm_get_state_fn = dcdb_get_state,
941 .dfsm_process_state_update_fn = dcdb_process_state_update,
942 .dfsm_process_update_fn = dcdb_process_update,
943 .dfsm_commit_fn = dcdb_commit,
944 .dfsm_cleanup_fn = dcdb_cleanup,
945 .dfsm_checksum_fn = dcdb_checksum,
946 };
947
948 dfsm_t *dcdb_new(memdb_t *memdb)
949 {
950 g_return_val_if_fail(memdb != NULL, NULL);
951
952 return dfsm_new(memdb, DCDB_CPG_GROUP_NAME, G_LOG_DOMAIN,
953 DCDB_PROTOCOL_VERSION, &dcdb_dfsm_callbacks);
954 }