]> git.proxmox.com Git - pve-cluster.git/blame - data/src/dcdb.c
add ability to update cfs locks, bump version to 3.0-17
[pve-cluster.git] / data / src / dcdb.c
CommitLineData
fe000966
DM
1/*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19*/
20
21#define G_LOG_DOMAIN "dcdb"
22
23#ifdef HAVE_CONFIG_H
24#include <config.h>
25#endif /* HAVE_CONFIG_H */
26
27#include <stdlib.h>
28#include <stdio.h>
29#include <string.h>
30#include <unistd.h>
31#include <glib.h>
32#include <sys/types.h>
33#include <sys/wait.h>
34#include <arpa/inet.h>
35#include <sys/epoll.h>
36#include <dirent.h>
37#include <errno.h>
38
39#include "cfs-utils.h"
40#include "loop.h"
41#include "dcdb.h"
42#include "status.h"
43
44typedef struct {
45 memdb_index_t *master;
46 memdb_index_t *idx;
47 GList *updates;
48} dcdb_sync_info_t;
49
50void
51dcdb_send_unlock(
52 dfsm_t *dfsm,
53 const char *path,
54 const guchar csum[32],
55 gboolean request)
56{
57 g_return_if_fail(dfsm != NULL);
58 g_return_if_fail(path != NULL);
59 g_return_if_fail(csum != NULL);
60
61 struct iovec iov[2];
62
63 iov[0].iov_base = (char *)csum;
64 iov[0].iov_len = 32;
65
66 iov[1].iov_base = (char *)path;
67 iov[1].iov_len = strlen(path) + 1;
68
69 if (!cfs_is_quorate())
70 return;
71
72 dcdb_message_t msg_type = request ?
73 DCDB_MESSAGE_CFS_UNLOCK_REQUEST : DCDB_MESSAGE_CFS_UNLOCK;
74
75 dfsm_send_message_sync(dfsm, msg_type, iov, 2, NULL);
76}
77
78static gboolean
79dcdb_parse_unlock_request(
80 const void *msg,
81 size_t msg_len,
82 const char **path,
83 const guchar **csum)
84
85{
86 g_return_val_if_fail(msg != NULL, FALSE);
87 g_return_val_if_fail(path != NULL, FALSE);
88 g_return_val_if_fail(csum != NULL, FALSE);
89
90 if (msg_len < 33) {
91 cfs_critical("received short unlock message (%lu < 33)", msg_len);
92 return FALSE;
93 }
94
95 *csum = msg;
96 msg += 32; msg_len -= 32;
97
98 *path = msg;
99 if ((*path)[msg_len - 1] != 0) {
100 cfs_critical("received mailformed unlock message - 'path' not terminated");
101 *path = NULL;
102 return FALSE;
103 }
104
105 return TRUE;
106}
107
108int
109dcdb_send_fuse_message(
110 dfsm_t *dfsm,
111 dcdb_message_t msg_type,
112 const char *path,
113 const char *to,
114 const char *buf,
115 guint32 size,
116 guint32 offset,
117 guint32 flags)
118{
119 struct iovec iov[8];
120
121 iov[0].iov_base = (char *)&size;
122 iov[0].iov_len = sizeof(size);
123
124 iov[1].iov_base = (char *)&offset;
125 iov[1].iov_len = sizeof(offset);
126
127 guint32 pathlen = path ? strlen(path) + 1 : 0;
128 iov[2].iov_base = (char *)&pathlen;
129 iov[2].iov_len = sizeof(pathlen);
130
131 guint32 tolen = to ? strlen(to) + 1 : 0;
132 iov[3].iov_base = (char *)&tolen;
133 iov[3].iov_len = sizeof(tolen);
134
135 iov[4].iov_base = (char *)&flags;
136 iov[4].iov_len = sizeof(flags);
137
138 iov[5].iov_base = (char *)path;
139 iov[5].iov_len = pathlen;
140
141 iov[6].iov_base = (char *)to;
142 iov[6].iov_len = tolen;
143
144 iov[7].iov_base = (char *)buf;
145 iov[7].iov_len = size;
146
147 dfsm_result_t rc;
148 memset(&rc, 0, sizeof(rc));
149 rc.result = -EBUSY;
150
151 if (!cfs_is_quorate())
152 return -EACCES;
153
154 if (dfsm_send_message_sync(dfsm, msg_type, iov, 8, &rc))
155 return rc.result;
156
157 return -EACCES;
158}
159
160static gboolean
161dcdb_parse_fuse_message(
162 const void *msg,
163 size_t msg_len,
164 const char **path,
165 const char **to,
166 const char **buf,
167 guint32 *size,
168 guint32 *offset,
169 guint32 *flags)
170
171{
172 g_return_val_if_fail(msg != NULL, FALSE);
173 g_return_val_if_fail(path != NULL, FALSE);
174 g_return_val_if_fail(to != NULL, FALSE);
175 g_return_val_if_fail(buf != NULL, FALSE);
176 g_return_val_if_fail(size != NULL, FALSE);
177 g_return_val_if_fail(offset != NULL, FALSE);
178 g_return_val_if_fail(flags != NULL, FALSE);
179
180 if (msg_len < 20) {
181 cfs_critical("received short fuse message (%lu < 20)", msg_len);
182 return FALSE;
183 }
184
185 *size = *((guint32 *)msg);
186 msg += 4; msg_len -= 4;
187
188 *offset = *((guint32 *)msg);
189 msg += 4; msg_len -= 4;
190
191 guint32 pathlen = *((guint32 *)msg);
192 msg += 4; msg_len -= 4;
193
194 guint32 tolen = *((guint32 *)msg);
195 msg += 4; msg_len -= 4;
196
197 *flags = *((guint32 *)msg);
198 msg += 4; msg_len -= 4;
199
200 if (msg_len != ((*size) + pathlen + tolen)) {
201 cfs_critical("received mailformed fuse message");
202 return FALSE;
203 }
204
205 *path = (char *)msg;
206 msg += pathlen; msg_len -= pathlen;
207
208 if (pathlen) {
209 if ((*path)[pathlen - 1] != 0) {
210 cfs_critical("received mailformed fuse message - 'path' not terminated");
211 *path = NULL;
212 return FALSE;
213 }
214 } else {
215 *path = NULL;
216 }
217
218 *to = (char *)msg;
219 msg += tolen; msg_len -= tolen;
220
221 if (tolen) {
222 if ((*to)[tolen - 1] != 0) {
223 cfs_critical("received mailformed fuse message - 'to' not terminated");
224 *to = NULL;
225 return FALSE;
226 }
227 } else {
228 *to = NULL;
229 }
230
231 *buf = (*size) ? msg : NULL;
232
233 return TRUE;
234}
235
236static gboolean
237dcdb_send_update_inode(
238 dfsm_t *dfsm,
239 memdb_tree_entry_t *te)
240{
241 g_return_val_if_fail(dfsm != NULL, FALSE);
242 g_return_val_if_fail(te != NULL, FALSE);
243
244 int len;
245 struct iovec iov[20];
246
247 uint32_t namelen = strlen(te->name) + 1;
248
249 iov[0].iov_base = (char *)&te->parent;
250 iov[0].iov_len = sizeof(te->parent);
251 iov[1].iov_base = (char *)&te->inode;
252 iov[1].iov_len = sizeof(te->inode);
253 iov[2].iov_base = (char *)&te->version;
254 iov[2].iov_len = sizeof(te->version);
255 iov[3].iov_base = (char *)&te->writer;
256 iov[3].iov_len = sizeof(te->writer);
257 iov[4].iov_base = (char *)&te->mtime;
258 iov[4].iov_len = sizeof(te->mtime);
259 iov[5].iov_base = (char *)&te->size;
260 iov[5].iov_len = sizeof(te->size);
261 iov[6].iov_base = (char *)&namelen;
262 iov[6].iov_len = sizeof(namelen);
263 iov[7].iov_base = (char *)&te->type;
264 iov[7].iov_len = sizeof(te->type);
265 iov[8].iov_base = (char *)te->name;
266 iov[8].iov_len = namelen;
267
268 len = 9;
269 if (te->type == DT_REG && te->size) {
270 iov[9].iov_base = (char *)te->data.value;
271 iov[9].iov_len = te->size;
272 len++;
273 }
274
275 if (dfsm_send_update(dfsm, iov, len) != CS_OK)
276 return FALSE;
277
278 return TRUE;
279}
280
281memdb_tree_entry_t *
282dcdb_parse_update_inode(
283 const void *msg,
284 size_t msg_len)
285{
286 if (msg_len < 40) {
287 cfs_critical("received short message (msg_len < 40)");
288 return NULL;
289 }
290
291 guint64 parent = *((guint64 *)msg);
292 msg += 8; msg_len -= 8;
293 guint64 inode = *((guint64 *)msg);
294 msg += 8; msg_len -= 8;
295 guint64 version = *((guint64 *)msg);
296 msg += 8; msg_len -= 8;
297
298 guint32 writer = *((guint32 *)msg);
299 msg += 4; msg_len -= 4;
300 guint32 mtime = *((guint32 *)msg);
301 msg += 4; msg_len -= 4;
302 guint32 size = *((guint32 *)msg);
303 msg += 4; msg_len -= 4;
304 guint32 namelen = *((guint32 *)msg);
305 msg += 4; msg_len -= 4;
306
307 char type = *((char *)msg);
308 msg += 1; msg_len -= 1;
309
310 if (!(type == DT_REG || type == DT_DIR)) {
311 cfs_critical("received mailformed message (unknown inode type %d)", type);
312 return NULL;
313 }
314
315 if (msg_len != (size + namelen)) {
316 cfs_critical("received mailformed message (msg_len != (size + namelen))");
317 return NULL;
318 }
319
320 char *name = (char *)msg;
321 msg += namelen; msg_len -= namelen;
322
323 const void *data = msg;
324
325 if (name[namelen - 1] != 0) {
326 cfs_critical("received mailformed message (name[namelen-1] != 0)");
327 return NULL;
328 }
329
330 memdb_tree_entry_t *te = memdb_tree_entry_new(name);
331 if (!te)
332 return NULL;
333
334 te->parent = parent;
335 te->version = version;
336 te->inode = inode;
337 te->writer = writer;
338 te->mtime = mtime;
339 te->size = size;
340 te->type = type;
341
342 if (te->type == DT_REG && te->size) {
343 te->data.value = g_memdup(data, te->size);
344 if (!te->data.value) {
345 memdb_tree_entry_free(te);
346 return NULL;
347 }
348 }
349
350 return te;
351}
352
353void
354dcdb_sync_cluster_conf(
355 memdb_t *memdb,
356 gboolean notify_cman)
357{
358 g_return_if_fail(memdb != NULL);
359
360 int len;
361 gpointer data = NULL;
362
363 len = memdb_read(memdb, "cluster.conf", &data);
364 if (len <= 0)
365 return;
366
367 guint64 new_version = cluster_config_version(data, len);
368 if (!new_version)
369 return;
370
371 char *old_data = NULL;
372 gsize old_length = 0;
373 guint64 old_version = 0;
374
375 GError *err = NULL;
376 if (!g_file_get_contents(HOST_CLUSTER_CONF_FN, &old_data, &old_length, &err)) {
f6efd6bf
DM
377 if (!g_error_matches(err, G_FILE_ERROR, G_FILE_ERROR_NOENT)) {
378 cfs_critical("unable to read cluster config file '%s' - %s",
379 HOST_CLUSTER_CONF_FN, err->message);
380 }
fe000966
DM
381 g_error_free (err);
382 } else {
383 if (old_length)
384 old_version = cluster_config_version(old_data, old_length);
385 }
386
387 /* test if something changed - return if no changes */
388 if (data && old_data && (old_length == len) &&
389 !memcmp(data, old_data, len))
390 goto ret;
391
392 if (new_version < old_version) {
393 cfs_critical("local cluster.conf is newer");
394 goto ret;
395 }
396
397 if (!atomic_write_file(HOST_CLUSTER_CONF_FN, data, len, 0640, 0))
398 goto ret;
399
400 cfs_message("wrote new cluster config '%s'", HOST_CLUSTER_CONF_FN);
401
f6efd6bf 402 if (notify_cman && old_version) {
fe000966
DM
403 /* tell cman that there is a new config file */
404 cfs_debug ("run cman_tool version");
405 int status = system("/usr/sbin/cman_tool version -r -S >/dev/null 2>&1");
406 if (WIFEXITED(status) && WEXITSTATUS(status) != 0) {
407 cfs_critical("cman_tool version failed with exit code %d\n", WEXITSTATUS(status));
408 }
409 cfs_debug ("end cman_tool version");
410 }
411
412ret:
413
414 if (data)
415 g_free(data);
416
417 if (old_data)
418 g_free(old_data);
419}
420
421static gpointer
422dcdb_get_state(
423 dfsm_t *dfsm,
424 gpointer data,
425 unsigned int *res_len)
426{
427 g_return_val_if_fail(dfsm != NULL, FALSE);
428 g_return_val_if_fail(data != NULL, FALSE);
429
430 memdb_t *memdb = (memdb_t *)data;
431
432 g_return_val_if_fail(memdb->root != NULL, FALSE);
433 g_return_val_if_fail(memdb->mutex != NULL, FALSE);
434
435 cfs_debug("enter %s %016zX %08X", __func__, memdb->root->version, memdb->root->mtime);
436
437 g_mutex_lock (memdb->mutex);
438 memdb_index_t *idx = memdb_encode_index(memdb->index, memdb->root);
439 g_mutex_unlock (memdb->mutex);
440
441 if (idx) {
442 *res_len = idx->bytes;
443 }
444
445 return idx;
446}
447
448static int
449dcdb_select_leader(
450 int node_count,
451 memdb_index_t *idx[])
452{
453 g_return_val_if_fail(idx != NULL, -1);
454
455 cfs_debug("enter %s", __func__);
456
457 int leader = -1;
458
459 /* try select most actual data - compare 'version' an 'time of last write'
460 * NOTE: syncinfo members are sorted
461 */
462 for (int i = 0; i < node_count; i++) {
463 if (leader < 0) {
464 leader = i;
465 } else {
466 memdb_index_t *leaderidx = idx[leader];
467
468 if (idx[i]->version == leaderidx->version &&
469 idx[i]->mtime > leaderidx->mtime) {
470 leader = i;
471 } else if (idx[i]->version > leaderidx->version) {
472 leader = i;
473 }
474 }
475 }
476
477 cfs_debug ("leave %s (%d)", __func__, leader);
478
479 return leader;
480}
481
482static gboolean
483dcdb_create_and_send_updates(
484 dfsm_t *dfsm,
485 memdb_t *memdb,
486 memdb_index_t *master,
487 int node_count,
488 memdb_index_t *idx[])
489{
490 g_return_val_if_fail(dfsm != NULL, FALSE);
491 g_return_val_if_fail(memdb != NULL, FALSE);
492 g_return_val_if_fail(memdb->mutex != NULL, FALSE);
493 g_return_val_if_fail(master != NULL, FALSE);
494
495 cfs_debug("enter %s", __func__);
496
497 gboolean res = FALSE;
498
499 GHashTable *updates = g_hash_table_new(g_int64_hash, g_int64_equal);
500 if (!updates)
501 goto ret;
502
503 g_mutex_lock (memdb->mutex);
504
505 for (int n = 0; n < node_count; n++) {
506 memdb_index_t *slave = idx[n];
507
508 if (slave == master)
509 continue;
510
511 int j = 0;
512
513 for (int i = 0; i < master->size; i++) {
514 guint64 inode = master->entries[i].inode;
515 while (j < slave->size && slave->entries[j].inode < inode)
516 j++;
517
518 if (memcmp(&slave->entries[j], &master->entries[i],
519 sizeof(memdb_index_extry_t)) == 0) {
520 continue;
521 }
522
523 if (g_hash_table_lookup(updates, &inode))
524 continue;
525
526 cfs_debug("found different inode %d %016zX", i, inode);
527
528 memdb_tree_entry_t *te, *cpy;
529
530 if (!(te = g_hash_table_lookup(memdb->index, &inode))) {
531 cfs_critical("can get inode data for inode %016zX", inode);
532 goto ret;
533 }
534
535 cpy = memdb_tree_entry_copy(te, 1);
536 g_hash_table_replace(updates, &cpy->inode, cpy);
537 }
538 }
539
540 g_mutex_unlock (memdb->mutex);
541
542 /* send updates */
543
544 GHashTableIter iter;
545 gpointer key, value;
546 int count = 0;
547
548 cfs_message("start sending inode updates");
549
550 g_hash_table_iter_init (&iter, updates);
551 while (g_hash_table_iter_next (&iter, &key, &value)) {
552 memdb_tree_entry_t *te = (memdb_tree_entry_t *)value;
553 count++;
554
555 if (!dcdb_send_update_inode(dfsm, te)) {
556 /* tolerate error here */
557 cfs_critical("sending update inode failed %016zX", te->inode);
558 } else {
559 cfs_debug("sent update inode %016zX", te->inode);
560 }
561
562 memdb_tree_entry_free(te);
563 }
564
565 cfs_message("sent all (%d) updates", count);
566
567 if (dfsm_send_update_complete(dfsm) != CS_OK) {
568 cfs_critical("failed to send UPDATE_COMPLETE message");
569 goto ret;
570 }
571
572 res = TRUE;
573
574 ret:
575 if (updates)
576 g_hash_table_destroy(updates);
577
578 cfs_debug("leave %s (%d)", __func__, res);
579
580 return res;
581}
582
583static int
584dcdb_process_state_update(
585 dfsm_t *dfsm,
586 gpointer data,
587 dfsm_sync_info_t *syncinfo)
588{
589 g_return_val_if_fail(dfsm != NULL, -1);
590 g_return_val_if_fail(data != NULL, -1);
591 g_return_val_if_fail(syncinfo != NULL, -1);
592
593 memdb_t *memdb = (memdb_t *)data;
594
595 cfs_debug("enter %s", __func__);
596
597 dcdb_sync_info_t *localsi = g_new0(dcdb_sync_info_t, 1);
598 if (!localsi)
599 return -1;
600
601 syncinfo->data = localsi;
602
603 memdb_index_t *idx[syncinfo->node_count];
604
605 for (int i = 0; i < syncinfo->node_count; i++) {
606 dfsm_node_info_t *ni = &syncinfo->nodes[i];
607
608 if (ni->state_len < sizeof(memdb_index_t)) {
609 cfs_critical("received short memdb index (len < sizeof(memdb_index_t))");
610 return -1;
611 }
612
613 idx[i] = (memdb_index_t *)ni->state;
614
615 if (ni->state_len != idx[i]->bytes) {
616 cfs_critical("received mailformed memdb index (len != idx->bytes)");
617 return -1;
618 }
619 }
620
621 /* select leader - set mode */
622 int leader = dcdb_select_leader(syncinfo->node_count, idx);
623 if (leader < 0) {
624 cfs_critical("unable to select leader failed");
625 return -1;
626 }
627
628 cfs_message("leader is %d/%d", syncinfo->nodes[leader].nodeid, syncinfo->nodes[leader].pid);
629
630 memdb_index_t *leaderidx = idx[leader];
631 localsi->master = leaderidx;
632
633 GString *str = g_string_new("synced members:");
634 g_string_append_printf(str, " %d/%d", syncinfo->nodes[leader].nodeid, syncinfo->nodes[leader].pid);
635
636 for (int i = 0; i < syncinfo->node_count; i++) {
637 dfsm_node_info_t *ni = &syncinfo->nodes[i];
638 if (i == leader) {
639 ni->synced = 1;
640 } else {
641 if (leaderidx->bytes == idx[i]->bytes &&
642 memcmp(leaderidx, idx[i], leaderidx->bytes) == 0) {
643 ni->synced = 1;
644 g_string_append_printf(str, ", %d/%d", ni->nodeid, ni->pid);
645 }
646 }
647 if (dfsm_nodeid_is_local(dfsm, ni->nodeid, ni->pid))
648 localsi->idx = idx[i];
649 }
650 cfs_message(str->str);
651 g_string_free(str, 1);
652
653 /* send update */
654 if (dfsm_nodeid_is_local(dfsm, syncinfo->nodes[leader].nodeid, syncinfo->nodes[leader].pid)) {
655 if (!dcdb_create_and_send_updates(dfsm, memdb, leaderidx, syncinfo->node_count, idx))
656 return -1;
657 }
658
659 return 0;
660}
661
662static int
663dcdb_process_update(
664 dfsm_t *dfsm,
665 gpointer data,
666 dfsm_sync_info_t *syncinfo,
667 uint32_t nodeid,
668 uint32_t pid,
669 const void *msg,
670 size_t msg_len)
671{
672 g_return_val_if_fail(dfsm != NULL, -1);
673 g_return_val_if_fail(data != NULL, -1);
674 g_return_val_if_fail(msg != NULL, -1);
675 g_return_val_if_fail(syncinfo != NULL, -1);
676 g_return_val_if_fail(syncinfo->data != NULL, -1);
677
678 cfs_debug("enter %s", __func__);
679
680 memdb_tree_entry_t *te;
681
682 if (!(te = dcdb_parse_update_inode(msg, msg_len)))
683 return -1;
684
685 cfs_debug("received inode update %016zX from node %d",
686 te->inode, nodeid);
687
688 dcdb_sync_info_t *localsi = (dcdb_sync_info_t *)syncinfo->data;
689
690 localsi->updates = g_list_append(localsi->updates, te);
691
692 return 0;
693}
694
695static int
696dcdb_commit(
697 dfsm_t *dfsm,
698 gpointer data,
699 dfsm_sync_info_t *syncinfo)
700{
701 g_return_val_if_fail(dfsm != NULL, -1);
702 g_return_val_if_fail(data != NULL, -1);
703 g_return_val_if_fail(syncinfo != NULL, -1);
704 g_return_val_if_fail(syncinfo->data != NULL, -1);
705
706 memdb_t *memdb = (memdb_t *)data;
707
708 cfs_debug("enter %s", __func__);
709
710 dcdb_sync_info_t *localsi = (dcdb_sync_info_t *)syncinfo->data;
711
712 guint count = g_list_length(localsi->updates);
713
714 cfs_message("update complete - trying to commit (got %u inode updates)", count);
715
716 if (!bdb_backend_commit_update(memdb, localsi->master, localsi->idx, localsi->updates))
717 return -1;
718
719 dcdb_sync_cluster_conf(memdb, FALSE);
720
721 return 0;
722}
723
724static int
725dcdb_cleanup(
726 dfsm_t *dfsm,
727 gpointer data,
728 dfsm_sync_info_t *syncinfo)
729{
730 g_return_val_if_fail(dfsm != NULL, -1);
731 g_return_val_if_fail(data != NULL, -1);
732 g_return_val_if_fail(syncinfo != NULL, -1);
733 g_return_val_if_fail(syncinfo->data != NULL, -1);
734
735 cfs_debug("enter %s", __func__);
736
737 dcdb_sync_info_t *localsi = (dcdb_sync_info_t *)syncinfo->data;
738
739 GList *iter = localsi->updates;
740 while (iter) {
741 memdb_tree_entry_t *te = (memdb_tree_entry_t *)iter->data;
742 memdb_tree_entry_free(te);
743 iter = g_list_next(iter);
744 }
745 g_list_free(localsi->updates);
746
747 g_free(localsi);
748
749 return 0;
750}
751
752gboolean
753dcdb_checksum(
754 dfsm_t *dfsm,
755 gpointer data,
756 unsigned char *csum,
757 size_t csum_len)
758{
759 g_return_val_if_fail(dfsm != NULL, FALSE);
760 g_return_val_if_fail(csum != NULL, FALSE);
761
762 memdb_t *memdb = (memdb_t *)data;
763
764 g_return_val_if_fail(memdb != NULL, FALSE);
765 g_return_val_if_fail(memdb->mutex != NULL, FALSE);
766
767 cfs_debug("enter %s %016zX %08X", __func__, memdb->root->version, memdb->root->mtime);
768
769 g_mutex_lock (memdb->mutex);
770 gboolean res = memdb_compute_checksum(memdb->index, memdb->root, csum, csum_len);
771 g_mutex_unlock (memdb->mutex);
772
773 cfs_debug("leave %s %016zX (%d)", __func__, *(uint64_t *)csum, res);
774
775 return res;
776}
777
778static int
779dcdb_deliver(
780 dfsm_t *dfsm,
781 gpointer data,
782 int *res_ptr,
783 uint32_t nodeid,
784 uint32_t pid,
785 uint16_t msg_type,
786 uint32_t msg_time,
787 const void *msg,
788 size_t msg_len)
789{
790 g_return_val_if_fail(dfsm != NULL, -1);
791 g_return_val_if_fail(msg != NULL, -1);
792
793 memdb_t *memdb = (memdb_t *)data;
794
795 g_return_val_if_fail(memdb != NULL, -1);
796 g_return_val_if_fail(res_ptr != NULL, -1);
797
798 int res = 1;
799
800 int msg_result = -ENOTSUP;
801
802 if (!DCDB_VALID_MESSAGE_TYPE(msg_type))
803 goto unknown;
804
805 cfs_debug("process message %d (length = %ld)", msg_type, msg_len);
806
807 if (!cfs_is_quorate()) {
808 cfs_critical("received write while not quorate - trigger resync");
809 msg_result = -EACCES;
810 goto leave;
811 }
812
813 const char *path, *to, *buf;
814 guint32 size, offset, flags;
815 const guchar *csum;
816
817 if (msg_type == DCDB_MESSAGE_CFS_UNLOCK_REQUEST ||
818 msg_type == DCDB_MESSAGE_CFS_UNLOCK) {
819 msg_result = 0; /* ignored anyways */
820
821 if (!dcdb_parse_unlock_request(msg, msg_len, &path, &csum))
822 goto leave;
823
824 guchar cur_csum[32];
825 memdb_tree_entry_t *te = memdb_getattr(memdb, path);
826
827 if (te && te->type == DT_DIR &&
828 path_is_lockdir(path) && memdb_tree_entry_csum(te, cur_csum) &&
829 (memcmp(csum, cur_csum, 32) == 0)) {
830
831 if (msg_type == DCDB_MESSAGE_CFS_UNLOCK) {
832
833 cfs_debug("got valid unlock message");
834
835 msg_result = memdb_delete(memdb, path, nodeid, msg_time);
836
837 } else if (dfsm_lowest_nodeid(dfsm)) {
838
839 cfs_debug("got valid unlock request message");
840
841 if (memdb_lock_expired(memdb, path, csum)) {
842 cfs_debug("sending unlock message");
843 dcdb_send_unlock(dfsm, path, csum, FALSE);
844 }
845 }
846 }
847
848 } else if (msg_type == DCDB_MESSAGE_CFS_WRITE) {
849
850 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
851 &size, &offset, &flags))
852 goto leave;
853
854 msg_result = memdb_write(memdb, path, nodeid, msg_time,
855 buf, size, offset, flags);
856
857 if ((msg_result >= 0) && !strcmp(path, "cluster.conf"))
858 dcdb_sync_cluster_conf(memdb, dfsm_nodeid_is_local(dfsm, nodeid, pid));
859
860 } else if (msg_type == DCDB_MESSAGE_CFS_CREATE) {
861
862 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
863 &size, &offset, &flags))
864 goto leave;
865
866 msg_result = memdb_create(memdb, path, nodeid, msg_time);
867
868 if ((msg_result >= 0) && !strcmp(path, "cluster.conf"))
869 dcdb_sync_cluster_conf(memdb, dfsm_nodeid_is_local(dfsm, nodeid, pid));
870
871 } else if (msg_type == DCDB_MESSAGE_CFS_MKDIR) {
872
873 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
874 &size, &offset, &flags))
875 goto leave;
876
877 msg_result = memdb_mkdir(memdb, path, nodeid, msg_time);
878
879 } else if (msg_type == DCDB_MESSAGE_CFS_DELETE) {
880
881 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
882 &size, &offset, &flags))
883 goto leave;
884
885 msg_result = memdb_delete(memdb, path, nodeid, msg_time);
886
887 } else if (msg_type == DCDB_MESSAGE_CFS_RENAME) {
888
889 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
890 &size, &offset, &flags))
891 goto leave;
892
893 msg_result = memdb_rename(memdb, path, to, nodeid, msg_time);
894
895 if ((msg_result >= 0) && !strcmp(to, "cluster.conf"))
896 dcdb_sync_cluster_conf(memdb, dfsm_nodeid_is_local(dfsm, nodeid, pid));
897
898 } else if (msg_type == DCDB_MESSAGE_CFS_MTIME) {
899
900 if (!dcdb_parse_fuse_message(msg, msg_len, &path, &to, &buf,
901 &size, &offset, &flags))
902 goto leave;
903
904 /* Note: mtime is sent via offset field */
905 msg_result = memdb_mtime(memdb, path, nodeid, offset);
906
907 } else {
908 goto unknown;
909 }
910
911 *res_ptr = msg_result;
912ret:
913 if (memdb->errors) {
914 dfsm_set_errormode(dfsm);
915 res = -1;
916 }
917
918 cfs_debug("leave %s (%d)", __func__, res);
919
920 return res;
921leave:
922 res = -1;
923 goto ret;
924
925unknown:
926 cfs_critical("received unknown message type (msg_type == %d)", msg_type);
927 goto leave;
928};
929
930static dfsm_callbacks_t dcdb_dfsm_callbacks = {
931 .dfsm_deliver_fn = dcdb_deliver,
932 .dfsm_get_state_fn = dcdb_get_state,
933 .dfsm_process_state_update_fn = dcdb_process_state_update,
934 .dfsm_process_update_fn = dcdb_process_update,
935 .dfsm_commit_fn = dcdb_commit,
936 .dfsm_cleanup_fn = dcdb_cleanup,
937 .dfsm_checksum_fn = dcdb_checksum,
938};
939
940dfsm_t *dcdb_new(memdb_t *memdb)
941{
942 g_return_val_if_fail(memdb != NULL, NULL);
943
944 return dfsm_new(memdb, DCDB_CPG_GROUP_NAME, G_LOG_DOMAIN,
945 DCDB_PROTOCOL_VERSION, &dcdb_dfsm_callbacks);
946}