]> git.proxmox.com Git - pve-cluster.git/blame - data/src/dfsm.c
add ability to update cfs locks, bump version to 3.0-17
[pve-cluster.git] / data / src / dfsm.c
CommitLineData
fe000966
DM
1/*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19*/
20
21
22/* NOTE: we try to keep the CPG handle as long as possible, because
23 * calling cpg_initialize/cpg_finalize multiple times from the
24 * same process confuses corosync.
25 * Note: CS_ERR_LIBRARY is returned when corosync died
26 */
27
28#ifdef HAVE_CONFIG_H
29#include <config.h>
30#endif /* HAVE_CONFIG_H */
31
32#include <sys/types.h>
33#include <unistd.h>
34#include <string.h>
35#include <stdlib.h>
36
37#include <corosync/corotypes.h>
38#include <corosync/cpg.h>
39#include <glib.h>
40
41#include "cfs-utils.h"
42#include "dfsm.h"
43
44static cpg_callbacks_t cpg_callbacks;
45
/* Operating mode of the distributed finite state machine. */
typedef enum {
	DFSM_MODE_START = 0,
	DFSM_MODE_START_SYNC = 1,
	DFSM_MODE_SYNCED = 2,
	DFSM_MODE_UPDATE = 3,

	/* values >= 128 indicate abnormal/error conditions */
	DFSM_ERROR_MODE_START = 128,
	DFSM_MODE_LEAVE = 253,
	DFSM_MODE_VERSION_ERROR = 254,
	DFSM_MODE_ERROR = 255,
} dfsm_mode_t;

/* Values of dfsm_message_header_t.type. */
typedef enum {
	DFSM_MESSAGE_NORMAL = 0,
	DFSM_MESSAGE_SYNC_START = 1,
	DFSM_MESSAGE_STATE = 2,
	DFSM_MESSAGE_UPDATE = 3,
	DFSM_MESSAGE_UPDATE_COMPLETE = 4,
	DFSM_MESSAGE_VERIFY_REQUEST = 5,
	DFSM_MESSAGE_VERIFY = 6,
} dfsm_message_t;

/*
 * True for message types of the state (sync/verify) protocol, i.e. every
 * known type except DFSM_MESSAGE_NORMAL.
 * The argument is parenthesized so expressions such as (x & mask) expand
 * correctly; note that it is evaluated twice.
 */
#define DFSM_VALID_STATE_MESSAGE(mt) \
	((mt) >= DFSM_MESSAGE_SYNC_START && (mt) <= DFSM_MESSAGE_VERIFY)
70
/*
 * On-wire message headers. These structs are handed verbatim to
 * cpg_mcast_joined() as the first iovec entry, so their layout is part of
 * the protocol - do not reorder or resize fields.
 */

/* Common header at the start of every dfsm message. */
typedef struct {
	uint16_t type;              /* dfsm_message_t */
	uint16_t subtype;           /* application type for NORMAL messages, 0 otherwise */
	uint32_t protocol_version;
	uint32_t time;              /* sender's time(NULL) */
	uint32_t reserved;          /* always sent as 0 */
} dfsm_message_header_t;

/* Identifies one synchronisation round. */
typedef struct {
	uint32_t epoch;             /* per process counter (not globally unique) */
	uint32_t time;
	uint32_t nodeid, pid;       /* process that created the epoch */
} dfsm_sync_epoch_t;

/* Header of state (sync/verify) messages. */
typedef struct {
	dfsm_message_header_t base;
	dfsm_sync_epoch_t epoch;    /* sync round the message belongs to */
} dfsm_message_state_header_t;

/* Header of DFSM_MESSAGE_NORMAL messages. */
typedef struct {
	dfsm_message_header_t base;
	uint64_t count;             /* sender's message counter */
} dfsm_message_normal_header_t;
95
/*
 * A received normal message whose delivery is postponed
 * (see dfsm_queue_add_message()).
 */
typedef struct {
	uint32_t nodeid;            /* sender */
	uint32_t pid;
	uint64_t msg_count;         /* sender's counter from the normal header */
	void *msg;                  /* private copy of the complete message, header included */
	size_t msg_len;             /* byte length of msg; size_t like every producer/consumer */
} dfsm_queued_message_t;
103
104struct dfsm {
6392e29a 105 const char *log_domain;
fe000966
DM
106 cpg_callbacks_t *cpg_callbacks;
107 dfsm_callbacks_t *dfsm_callbacks;
108 cpg_handle_t cpg_handle;
109 struct cpg_name cpg_group_name;
110 uint32_t nodeid;
111 uint32_t pid;
112 int we_are_member;
113
114 guint32 protocol_version;
115 gpointer data;
116
117 gboolean joined;
118
119 /* mode is protected with mode_mutex */
120 GMutex *mode_mutex;
121 dfsm_mode_t mode;
122
123 GHashTable *members; /* contains dfsm_node_info_t pointers */
124 dfsm_sync_info_t *sync_info;
125 uint32_t local_epoch_counter;
126 dfsm_sync_epoch_t sync_epoch;
127 uint32_t lowest_nodeid;
128 GSequence *msg_queue;
129 GList *sync_queue;
130
131 /* synchrounous message transmission, protected with sync_mutex */
132 GMutex *sync_mutex;
133 GCond *sync_cond;
134 GHashTable *results;
135 uint64_t msgcount;
136 uint64_t msgcount_rcvd;
137
138 /* state verification */
139 guchar csum[32];
140 dfsm_sync_epoch_t csum_epoch;
141 uint64_t csum_id;
142 uint64_t csum_counter;
143};
144
145static gboolean dfsm_deliver_queue(dfsm_t *dfsm);
146static gboolean dfsm_deliver_sync_queue(dfsm_t *dfsm);
147
148gboolean
149dfsm_nodeid_is_local(
150 dfsm_t *dfsm,
151 uint32_t nodeid,
152 uint32_t pid)
153{
154 g_return_val_if_fail(dfsm != NULL, FALSE);
155
156 return (nodeid == dfsm->nodeid && pid == dfsm->pid);
157}
158
159
160static void
161dfsm_send_sync_message_abort(dfsm_t *dfsm)
162{
163 g_return_if_fail(dfsm != NULL);
164 g_return_if_fail(dfsm->sync_mutex != NULL);
165 g_return_if_fail(dfsm->sync_cond != NULL);
166
167 g_mutex_lock (dfsm->sync_mutex);
168 dfsm->msgcount_rcvd = dfsm->msgcount;
169 g_cond_broadcast (dfsm->sync_cond);
170 g_mutex_unlock (dfsm->sync_mutex);
171}
172
173static void
174dfsm_record_local_result(
175 dfsm_t *dfsm,
176 uint64_t msg_count,
177 int msg_result,
178 gboolean processed)
179{
180 g_return_if_fail(dfsm != NULL);
181 g_return_if_fail(dfsm->sync_mutex != NULL);
182 g_return_if_fail(dfsm->sync_cond != NULL);
183 g_return_if_fail(dfsm->results != NULL);
184
185 g_mutex_lock (dfsm->sync_mutex);
186 dfsm_result_t *rp = (dfsm_result_t *)g_hash_table_lookup(dfsm->results, &msg_count);
187 if (rp) {
188 rp->result = msg_result;
189 rp->processed = processed;
190 }
191 dfsm->msgcount_rcvd = msg_count;
192 g_cond_broadcast (dfsm->sync_cond);
193 g_mutex_unlock (dfsm->sync_mutex);
194}
195
196static cpg_error_t
197dfsm_send_message_full(
198 dfsm_t *dfsm,
199 struct iovec *iov,
200 unsigned int len,
201 int retry)
202{
203 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
204 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
205
206 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
207 cpg_error_t result;
208 int retries = 0;
209loop:
210 result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
211 if (retry && result == CPG_ERR_TRY_AGAIN) {
212 nanosleep(&tvreq, NULL);
213 ++retries;
214 if ((retries % 10) == 0)
215 cfs_dom_message(dfsm->log_domain, "cpg_send_message retry %d", retries);
216 if (retries < 100)
217 goto loop;
218 }
219
220 if (retries)
221 cfs_dom_message(dfsm->log_domain, "cpg_send_message retried %d times", retries);
222
223 if (result != CS_OK &&
224 (!retry || result != CPG_ERR_TRY_AGAIN))
225 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
226
227 return result;
228}
229
230static cpg_error_t
231dfsm_send_state_message_full(
232 dfsm_t *dfsm,
233 uint16_t type,
234 struct iovec *iov,
235 unsigned int len)
236{
237 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
238 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type), CS_ERR_INVALID_PARAM);
239 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
240
241 dfsm_message_state_header_t header;
242 header.base.type = type;
243 header.base.subtype = 0;
244 header.base.protocol_version = dfsm->protocol_version;
245 header.base.time = time(NULL);
246 header.base.reserved = 0;
247
248 header.epoch = dfsm->sync_epoch;
249
250 struct iovec real_iov[len + 1];
251
252 real_iov[0].iov_base = (char *)&header;
253 real_iov[0].iov_len = sizeof(header);
254
255 for (int i = 0; i < len; i++)
256 real_iov[i + 1] = iov[i];
257
258 return dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
259}
260
261cpg_error_t
262dfsm_send_update(
263 dfsm_t *dfsm,
264 struct iovec *iov,
265 unsigned int len)
266{
267 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE, iov, len);
268}
269
270cpg_error_t
271dfsm_send_update_complete(dfsm_t *dfsm)
272{
273 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE_COMPLETE, NULL, 0);
274}
275
276
277cpg_error_t
278dfsm_send_message(
279 dfsm_t *dfsm,
280 uint16_t msgtype,
281 struct iovec *iov,
282 int len)
283{
284 return dfsm_send_message_sync(dfsm, msgtype, iov, len, NULL);
285}
286
287cpg_error_t
288dfsm_send_message_sync(
289 dfsm_t *dfsm,
290 uint16_t msgtype,
291 struct iovec *iov,
292 int len,
293 dfsm_result_t *rp)
294{
295 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
296 g_return_val_if_fail(dfsm->sync_mutex != NULL, CS_ERR_INVALID_PARAM);
297 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
298
299 g_mutex_lock (dfsm->sync_mutex);
300 /* note: hold lock until message is sent - to guarantee ordering */
301 uint64_t msgcount = ++dfsm->msgcount;
302 if (rp) {
303 rp->msgcount = msgcount;
304 rp->processed = 0;
305 g_hash_table_replace(dfsm->results, &rp->msgcount, rp);
306 }
307
308 dfsm_message_normal_header_t header;
309 header.base.type = DFSM_MESSAGE_NORMAL;
310 header.base.subtype = msgtype;
311 header.base.protocol_version = dfsm->protocol_version;
312 header.base.time = time(NULL);
313 header.base.reserved = 0;
314 header.count = msgcount;
315
316 struct iovec real_iov[len + 1];
317
318 real_iov[0].iov_base = (char *)&header;
319 real_iov[0].iov_len = sizeof(header);
320
321 for (int i = 0; i < len; i++)
322 real_iov[i + 1] = iov[i];
323
324 cpg_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
325
326 g_mutex_unlock (dfsm->sync_mutex);
327
328 if (result != CS_OK) {
329 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
330
331 if (rp) {
332 g_mutex_lock (dfsm->sync_mutex);
333 g_hash_table_remove(dfsm->results, &rp->msgcount);
334 g_mutex_unlock (dfsm->sync_mutex);
335 }
336 return result;
337 }
338
339 if (rp) {
340 g_mutex_lock (dfsm->sync_mutex);
341
342 while (dfsm->msgcount_rcvd < msgcount)
343 g_cond_wait (dfsm->sync_cond, dfsm->sync_mutex);
344
345
346 g_hash_table_remove(dfsm->results, &rp->msgcount);
347
348 g_mutex_unlock (dfsm->sync_mutex);
349
350 return rp->processed ? CS_OK : CS_ERR_FAILED_OPERATION;
351 }
352
353 return CS_OK;
354}
355
356static gboolean
357dfsm_send_checksum(dfsm_t *dfsm)
358{
359 g_return_val_if_fail(dfsm != NULL, FALSE);
360
361 int len = 2;
362 struct iovec iov[len];
363
364 iov[0].iov_base = (char *)&dfsm->csum_id;
365 iov[0].iov_len = sizeof(dfsm->csum_id);
366 iov[1].iov_base = dfsm->csum;
367 iov[1].iov_len = sizeof(dfsm->csum);
368
369 gboolean res = (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY, iov, len) == CS_OK);
370
371 return res;
372}
373
374static void
375dfsm_free_queue_entry(gpointer data)
376{
377 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)data;
378 g_free (qm->msg);
379 g_free (qm);
380}
381
382static void
383dfsm_free_message_queue(dfsm_t *dfsm)
384{
385 g_return_if_fail(dfsm != NULL);
386 g_return_if_fail(dfsm->msg_queue != NULL);
387
388 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
389 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
390 while (iter != end) {
391 GSequenceIter *cur = iter;
392 iter = g_sequence_iter_next(iter);
393 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
394 g_sequence_get(cur);
395 dfsm_free_queue_entry(qm);
396 g_sequence_remove(cur);
397 }
398}
399
400static void
401dfsm_free_sync_queue(dfsm_t *dfsm)
402{
403 g_return_if_fail(dfsm != NULL);
404
405 GList *iter = dfsm->sync_queue;
406 while (iter) {
407 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
408 iter = g_list_next(iter);
409 dfsm_free_queue_entry(qm);
410 }
411
412 g_list_free(dfsm->sync_queue);
413 dfsm->sync_queue = NULL;
414}
415
416static gint
417message_queue_sort_fn(
418 gconstpointer a,
419 gconstpointer b,
420 gpointer user_data)
421{
422 return ((dfsm_queued_message_t *)a)->msg_count -
423 ((dfsm_queued_message_t *)b)->msg_count;
424}
425
426static dfsm_node_info_t *
427dfsm_node_info_lookup(
428 dfsm_t *dfsm,
429 uint32_t nodeid,
430 uint32_t pid)
431{
432 g_return_val_if_fail(dfsm != NULL, NULL);
433 g_return_val_if_fail(dfsm->members != NULL, NULL);
434
435 dfsm_node_info_t info = { .nodeid = nodeid, .pid = pid };
436
437 return (dfsm_node_info_t *)g_hash_table_lookup(dfsm->members, &info);
438}
439
440static dfsm_queued_message_t *
441dfsm_queue_add_message(
442 dfsm_t *dfsm,
443 uint32_t nodeid,
444 uint32_t pid,
445 uint64_t msg_count,
446 const void *msg,
447 size_t msg_len)
448{
449 g_return_val_if_fail(dfsm != NULL, NULL);
450 g_return_val_if_fail(msg != NULL, NULL);
451 g_return_val_if_fail(msg_len != 0, NULL);
452
453 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, nodeid, pid);
454 if (!ni) {
455 cfs_dom_critical(dfsm->log_domain, "dfsm_node_info_lookup failed");
456 return NULL;
457 }
458
459 dfsm_queued_message_t *qm = g_new0(dfsm_queued_message_t, 1);
460 g_return_val_if_fail(qm != NULL, NULL);
461
462 qm->nodeid = nodeid;
463 qm->pid = pid;
464 qm->msg = g_memdup (msg, msg_len);
465 qm->msg_len = msg_len;
466 qm->msg_count = msg_count;
467
468 if (dfsm->mode == DFSM_MODE_UPDATE && ni->synced) {
469 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
470 } else {
471 /* NOTE: we only need to sort the queue because we resend all
472 * queued messages sometimes.
473 */
474 g_sequence_insert_sorted(dfsm->msg_queue, qm, message_queue_sort_fn, NULL);
475 }
476
477 return qm;
478}
479
480static guint
481dfsm_sync_info_hash(gconstpointer key)
482{
483 dfsm_node_info_t *info = (dfsm_node_info_t *)key;
484
485 return g_int_hash(&info->nodeid) + g_int_hash(&info->pid);
486}
487
488static gboolean
489dfsm_sync_info_equal(
490 gconstpointer v1,
491 gconstpointer v2)
492{
493 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
494 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
495
496 if (info1->nodeid == info2->nodeid &&
497 info1->pid == info2->pid)
498 return TRUE;
499
500 return FALSE;
501}
502
503static int
504dfsm_sync_info_compare(
505 gconstpointer v1,
506 gconstpointer v2)
507{
508 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
509 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
510
511 if (info1->nodeid != info2->nodeid)
512 return info1->nodeid - info2->nodeid;
513
514 return info1->pid - info2->pid;
515}
516
517static void
518dfsm_set_mode(
519 dfsm_t *dfsm,
520 dfsm_mode_t new_mode)
521{
522 g_return_if_fail(dfsm != NULL);
523
524 cfs_debug("dfsm_set_mode - set mode to %d", new_mode);
525
526 int changed = 0;
527 g_mutex_lock (dfsm->mode_mutex);
528 if (dfsm->mode != new_mode) {
529 if (new_mode < DFSM_ERROR_MODE_START ||
530 (dfsm->mode < DFSM_ERROR_MODE_START || new_mode >= dfsm->mode)) {
531 dfsm->mode = new_mode;
532 changed = 1;
533 }
534 }
535 g_mutex_unlock (dfsm->mode_mutex);
536
537 if (!changed)
538 return;
539
540 if (new_mode == DFSM_MODE_START) {
541 cfs_dom_message(dfsm->log_domain, "start cluster connection");
542 } else if (new_mode == DFSM_MODE_START_SYNC) {
543 cfs_dom_message(dfsm->log_domain, "starting data syncronisation");
544 } else if (new_mode == DFSM_MODE_SYNCED) {
545 cfs_dom_message(dfsm->log_domain, "all data is up to date");
546 if (dfsm->dfsm_callbacks->dfsm_synced_fn)
547 dfsm->dfsm_callbacks->dfsm_synced_fn(dfsm);
548 } else if (new_mode == DFSM_MODE_UPDATE) {
549 cfs_dom_message(dfsm->log_domain, "waiting for updates from leader");
550 } else if (new_mode == DFSM_MODE_LEAVE) {
551 cfs_dom_critical(dfsm->log_domain, "leaving CPG group");
552 } else if (new_mode == DFSM_MODE_ERROR) {
553 cfs_dom_critical(dfsm->log_domain, "serious internal error - stop cluster connection");
554 } else if (new_mode == DFSM_MODE_VERSION_ERROR) {
555 cfs_dom_critical(dfsm->log_domain, "detected newer protocol - please update this node");
556 }
557}
558
559static dfsm_mode_t
560dfsm_get_mode(dfsm_t *dfsm)
561{
562 g_return_val_if_fail(dfsm != NULL, DFSM_MODE_ERROR);
563
564 g_mutex_lock (dfsm->mode_mutex);
565 dfsm_mode_t mode = dfsm->mode;
566 g_mutex_unlock (dfsm->mode_mutex);
567
568 return mode;
569}
570
571gboolean
572dfsm_restartable(dfsm_t *dfsm)
573{
574 dfsm_mode_t mode = dfsm_get_mode(dfsm);
575
576 return !(mode == DFSM_MODE_ERROR ||
577 mode == DFSM_MODE_VERSION_ERROR);
578}
579
580void
581dfsm_set_errormode(dfsm_t *dfsm)
582{
583 dfsm_set_mode(dfsm, DFSM_MODE_ERROR);
584}
585
586static void
587dfsm_release_sync_resources(
588 dfsm_t *dfsm,
589 const struct cpg_address *member_list,
590 size_t member_list_entries)
591{
592 g_return_if_fail(dfsm != NULL);
593 g_return_if_fail(dfsm->members != NULL);
594 g_return_if_fail(!member_list_entries || member_list != NULL);
595
596 cfs_debug("enter dfsm_release_sync_resources");
597
598 if (dfsm->sync_info) {
599
600 if (dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn) {
601 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
602 dfsm->sync_info->data = NULL;
603 }
604
605 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
606 if (dfsm->sync_info->nodes[i].state) {
607 g_free(dfsm->sync_info->nodes[i].state);
608 dfsm->sync_info->nodes[i].state = NULL;
609 dfsm->sync_info->nodes[i].state_len = 0;
610 }
611 }
612 }
613
614 if (member_list) {
615
616 g_hash_table_remove_all(dfsm->members);
617
618 if (dfsm->sync_info)
619 g_free(dfsm->sync_info);
620
621 int size = sizeof(dfsm_sync_info_t) +
622 member_list_entries*sizeof(dfsm_sync_info_t);
623 dfsm_sync_info_t *sync_info = dfsm->sync_info = g_malloc0(size);
624 sync_info->node_count = member_list_entries;
625
626 for (int i = 0; i < member_list_entries; i++) {
627 sync_info->nodes[i].nodeid = member_list[i].nodeid;
628 sync_info->nodes[i].pid = member_list[i].pid;
629 }
630
631 qsort(sync_info->nodes, member_list_entries, sizeof(dfsm_node_info_t),
632 dfsm_sync_info_compare);
633
634 for (int i = 0; i < member_list_entries; i++) {
635 dfsm_node_info_t *info = &sync_info->nodes[i];
636 g_hash_table_insert(dfsm->members, info, info);
637 if (info->nodeid == dfsm->nodeid && info->pid == dfsm->pid)
638 sync_info->local = info;
639 }
640 }
641}
642
643static void
644dfsm_cpg_deliver_callback(
645 cpg_handle_t handle,
646 const struct cpg_name *group_name,
647 uint32_t nodeid,
648 uint32_t pid,
649 void *msg,
650 size_t msg_len)
651{
652 cs_error_t result;
653
654 dfsm_t *dfsm = NULL;
655 result = cpg_context_get(handle, (gpointer *)&dfsm);
656 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
657 cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
658 return; /* we have no valid dfsm pointer, so we can just ignore this */
659 }
660 dfsm_mode_t mode = dfsm_get_mode(dfsm);
661
662 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
663
664 if (mode >= DFSM_ERROR_MODE_START) {
665 cfs_dom_debug(dfsm->log_domain, "error mode - ignoring message");
666 goto leave;
667 }
668
669 if (!dfsm->sync_info) {
670 cfs_dom_critical(dfsm->log_domain, "no dfsm_sync_info - internal error");
671 goto leave;
672 }
673
674 if (msg_len < sizeof(dfsm_message_header_t)) {
675 cfs_dom_critical(dfsm->log_domain, "received short message (%ld bytes)", msg_len);
676 goto leave;
677 }
678
679 dfsm_message_header_t *base_header = (dfsm_message_header_t *)msg;
680
681 if (base_header->protocol_version > dfsm->protocol_version) {
682 cfs_dom_critical(dfsm->log_domain, "received message with protocol version %d",
683 base_header->protocol_version);
684 dfsm_set_mode(dfsm, DFSM_MODE_VERSION_ERROR);
685 return;
686 } else if (base_header->protocol_version < dfsm->protocol_version) {
687 cfs_dom_message(dfsm->log_domain, "ignore message with wrong protocol version %d",
688 base_header->protocol_version);
689 return;
690 }
691
692 if (base_header->type == DFSM_MESSAGE_NORMAL) {
693
694 dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
695
696 if (msg_len < sizeof(dfsm_message_normal_header_t)) {
697 cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %ld bytes)",
698 base_header->type, base_header->subtype, msg_len);
699 goto leave;
700 }
701
702 if (mode != DFSM_MODE_SYNCED) {
703 cfs_dom_debug(dfsm->log_domain, "queue message %zu (subtype = %d, length = %ld)",
704 header->count, base_header->subtype, msg_len);
705
706 if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
707 goto leave;
708 } else {
709
710 int msg_res = -1;
711 int res = dfsm->dfsm_callbacks->dfsm_deliver_fn(
712 dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype,
713 base_header->time, msg + sizeof(dfsm_message_normal_header_t),
714 msg_len - sizeof(dfsm_message_normal_header_t));
715
716 if (nodeid == dfsm->nodeid && pid == dfsm->pid)
717 dfsm_record_local_result(dfsm, header->count, msg_res, res);
718
719 if (res < 0)
720 goto leave;
721 }
722
723 return;
724 }
725
726 /* state related messages
727 * we needs right epoch - else we simply discard the message
728 */
729
730 dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
731
732 if (msg_len < sizeof(dfsm_message_state_header_t)) {
733 cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %ld bytes)",
734 base_header->type, base_header->subtype, msg_len);
735 goto leave;
736 }
737
738 if (base_header->type != DFSM_MESSAGE_SYNC_START &&
739 (memcmp(&header->epoch, &dfsm->sync_epoch, sizeof(dfsm_sync_epoch_t)) != 0)) {
740 cfs_dom_debug(dfsm->log_domain, "ignore message (msg_type == %d) with "
741 "wrong epoch (epoch %d/%d/%08X)", base_header->type,
742 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
743 return;
744 }
745
746 msg += sizeof(dfsm_message_state_header_t);
747 msg_len -= sizeof(dfsm_message_state_header_t);
748
749 if (mode == DFSM_MODE_SYNCED) {
750 if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
751
752 for (int i = 0; i < dfsm->sync_info->node_count; i++)
753 dfsm->sync_info->nodes[i].synced = 1;
754
755 if (!dfsm_deliver_queue(dfsm))
756 goto leave;
757
758 return;
759
760 } else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
761
762 if (msg_len != sizeof(dfsm->csum_counter)) {
763 cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len, nodeid, pid);
764 goto leave;
765 }
766
767 uint64_t csum_id = *((uint64_t *)msg);
768 msg += 8; msg_len -= 8;
769
770 cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016zX", nodeid, csum_id);
771
772 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
773 if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
774 dfsm, dfsm->data, dfsm->csum, sizeof(dfsm->csum))) {
775 cfs_dom_critical(dfsm->log_domain, "unable to compute data checksum");
776 goto leave;
777 }
778
779 dfsm->csum_epoch = header->epoch;
780 dfsm->csum_id = csum_id;
781
782 if (nodeid == dfsm->nodeid && pid == dfsm->pid) {
783 if (!dfsm_send_checksum(dfsm))
784 goto leave;
785 }
786 }
787
788 return;
789
790 } else if (base_header->type == DFSM_MESSAGE_VERIFY) {
791
792 cfs_dom_debug(dfsm->log_domain, "received verify message");
793
794 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
795
796 if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
797 cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%ld bytes)", msg_len);
798 goto leave;
799 }
800
801 uint64_t csum_id = *((uint64_t *)msg);
802 msg += 8; msg_len -= 8;
803
804 if (dfsm->csum_id == csum_id &&
805 (memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
806 if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
807 cfs_dom_critical(dfsm->log_domain, "wrong checksum %016zX != %016zX - restarting",
808 *(uint64_t *)msg, *(uint64_t *)dfsm->csum);
809 goto leave;
810 } else {
811 cfs_dom_message(dfsm->log_domain, "data verification successful");
812 }
813 } else {
814 cfs_dom_message(dfsm->log_domain, "skip verification - no checksum saved");
815 }
816 }
817
818 return;
819
820 } else {
821 /* ignore (we already got all required updates, or we are leader) */
822 cfs_dom_debug(dfsm->log_domain, "ignore state sync message %d",
823 base_header->type);
824 return;
825 }
826
827 } else if (mode == DFSM_MODE_START_SYNC) {
828
829 if (base_header->type == DFSM_MESSAGE_SYNC_START) {
830
831 if (nodeid != dfsm->lowest_nodeid) {
832 cfs_dom_critical(dfsm->log_domain, "ignore sync request from wrong member %d/%d",
833 nodeid, pid);
834 }
835
836 cfs_dom_message(dfsm->log_domain, "received sync request (epoch %d/%d/%08X)",
837 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
838
839 dfsm->sync_epoch = header->epoch;
840
841 dfsm_release_sync_resources(dfsm, NULL, 0);
842
843 unsigned int state_len = 0;
844 gpointer state = NULL;
845
846 state = dfsm->dfsm_callbacks->dfsm_get_state_fn(dfsm, dfsm->data, &state_len);
847
848 if (!(state && state_len)) {
849 cfs_dom_critical(dfsm->log_domain, "dfsm_get_state_fn failed");
850 goto leave;
851 }
852
853 struct iovec iov[1];
854 iov[0].iov_base = state;
855 iov[0].iov_len = state_len;
856
857 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_STATE, iov, 1);
858
859 if (state)
860 g_free(state);
861
862 if (result != CS_OK)
863 goto leave;
864
865 return;
866
867 } else if (base_header->type == DFSM_MESSAGE_STATE) {
868
869 dfsm_node_info_t *ni;
870
871 if (!(ni = dfsm_node_info_lookup(dfsm, nodeid, pid))) {
872 cfs_dom_critical(dfsm->log_domain, "received state for non-member %d/%d", nodeid, pid);
873 goto leave;
874 }
875
876 if (ni->state) {
877 cfs_dom_critical(dfsm->log_domain, "received duplicate state for member %d/%d", nodeid, pid);
878 goto leave;
879 }
880
881 ni->state = g_memdup(msg, msg_len);
882 ni->state_len = msg_len;
883
884 int received_all = 1;
885 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
886 if (!dfsm->sync_info->nodes[i].state) {
887 received_all = 0;
888 break;
889 }
890 }
891
892 if (received_all) {
893 cfs_dom_message(dfsm->log_domain, "received all states");
894
895 int res = dfsm->dfsm_callbacks->dfsm_process_state_update_fn(dfsm, dfsm->data, dfsm->sync_info);
896 if (res < 0)
897 goto leave;
898
899 if (dfsm->sync_info->local->synced) {
900 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
901 dfsm_release_sync_resources(dfsm, NULL, 0);
902
903 if (!dfsm_deliver_queue(dfsm))
904 goto leave;
905
906 } else {
907 dfsm_set_mode(dfsm, DFSM_MODE_UPDATE);
908
909 if (!dfsm_deliver_queue(dfsm))
910 goto leave;
911 }
912
913 }
914
915 return;
916 }
917
918 } else if (mode == DFSM_MODE_UPDATE) {
919
920 if (base_header->type == DFSM_MESSAGE_UPDATE) {
921
922 int res = dfsm->dfsm_callbacks->dfsm_process_update_fn(
923 dfsm, dfsm->data, dfsm->sync_info, nodeid, pid, msg, msg_len);
924
925 if (res < 0)
926 goto leave;
927
928 return;
929
930 } else if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
931
932
933 int res = dfsm->dfsm_callbacks->dfsm_commit_fn(dfsm, dfsm->data, dfsm->sync_info);
934
935 if (res < 0)
936 goto leave;
937
938 for (int i = 0; i < dfsm->sync_info->node_count; i++)
939 dfsm->sync_info->nodes[i].synced = 1;
940
941 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
942
943 if (!dfsm_deliver_sync_queue(dfsm))
944 goto leave;
945
946 if (!dfsm_deliver_queue(dfsm))
947 goto leave;
948
949 dfsm_release_sync_resources(dfsm, NULL, 0);
950
951 return;
952 }
953
954 } else {
955 cfs_dom_critical(dfsm->log_domain, "internal error - unknown mode %d", mode);
956 goto leave;
957 }
958
959 if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST ||
960 base_header->type == DFSM_MESSAGE_VERIFY) {
961
962 cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
963
964 } else {
965 cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %ld bytes)",
966 base_header->type, msg_len);
967 goto leave;
968 }
969
970leave:
971 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
972 dfsm_release_sync_resources(dfsm, NULL, 0);
973 return;
974}
975
976static gboolean
977dfsm_resend_queue(dfsm_t *dfsm)
978{
979 g_return_val_if_fail(dfsm != NULL, FALSE);
980 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
981
982 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
983 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
984 gboolean res = TRUE;
985
986 while (iter != end) {
987 GSequenceIter *cur = iter;
988 iter = g_sequence_iter_next(iter);
989
990 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
991 g_sequence_get(cur);
992
993 if (qm->nodeid == dfsm->nodeid && qm->pid == dfsm->pid) {
994 cpg_error_t result;
995 struct iovec iov[1];
996 iov[0].iov_base = qm->msg;
997 iov[0].iov_len = qm->msg_len;
998
999 if ((result = dfsm_send_message_full(dfsm, iov, 1, 1)) != CS_OK) {
1000 res = FALSE;
1001 break;
1002 }
1003 }
1004 }
1005
1006 dfsm_free_message_queue(dfsm);
1007
1008 return res;
1009}
1010
1011static gboolean
1012dfsm_deliver_sync_queue(dfsm_t *dfsm)
1013{
1014 g_return_val_if_fail(dfsm != NULL, FALSE);
1015
1016 if (!dfsm->sync_queue)
1017 return TRUE;
1018
1019 gboolean res = TRUE;
1020
1021 // fixme: cfs_debug
1022 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__,
1023 g_list_length(dfsm->sync_queue));
1024
1025 GList *iter = dfsm->sync_queue;
1026 while (iter) {
1027 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
1028 iter = g_list_next(iter);
1029
1030 if (res && dfsm->mode == DFSM_MODE_SYNCED) {
1031 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1032 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1033 } else {
1034 res = FALSE;
1035 }
1036
1037 dfsm_free_queue_entry(qm);
1038 }
1039 g_list_free(dfsm->sync_queue);
1040 dfsm->sync_queue = NULL;
1041
1042 return res;
1043}
1044
1045static gboolean
1046dfsm_deliver_queue(dfsm_t *dfsm)
1047{
1048 g_return_val_if_fail(dfsm != NULL, FALSE);
1049 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
1050
1051 int qlen = g_sequence_get_length(dfsm->msg_queue);
1052 if (!qlen)
1053 return TRUE;
1054
1055 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
1056 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
1057 gboolean res = TRUE;
1058
1059 // fixme: cfs_debug
1060 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, qlen);
1061
1062 while (iter != end) {
1063 GSequenceIter *cur = iter;
1064 iter = g_sequence_iter_next(iter);
1065
1066 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
1067 g_sequence_get(cur);
1068
1069 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, qm->nodeid, qm->pid);
1070 if (!ni) {
d3eeade6 1071 cfs_dom_message(dfsm->log_domain, "remove message from non-member %d/%d",
fe000966
DM
1072 qm->nodeid, qm->pid);
1073 dfsm_free_queue_entry(qm);
1074 g_sequence_remove(cur);
1075 continue;
1076 }
1077
1078 if (dfsm->mode == DFSM_MODE_SYNCED) {
1079 if (ni->synced) {
1080 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1081 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1082 dfsm_free_queue_entry(qm);
1083 g_sequence_remove(cur);
1084 }
1085 } else if (dfsm->mode == DFSM_MODE_UPDATE) {
1086 if (ni->synced) {
1087 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
1088 g_sequence_remove(cur);
1089 }
1090 } else {
1091 res = FALSE;
1092 break;
1093 }
1094 }
1095
1096 return res;
1097}
1098
1099static void
1100dfsm_cpg_confchg_callback(
1101 cpg_handle_t handle,
1102 const struct cpg_name *group_name,
1103 const struct cpg_address *member_list,
1104 size_t member_list_entries,
1105 const struct cpg_address *left_list,
1106 size_t left_list_entries,
1107 const struct cpg_address *joined_list,
1108 size_t joined_list_entries)
1109{
1110 cs_error_t result;
1111
1112 dfsm_t *dfsm = NULL;
1113 result = cpg_context_get(handle, (gpointer *)&dfsm);
1114 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
1115 cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
1116 return; /* we have no valid dfsm pointer, so we can just ignore this */
1117 }
1118
1119 dfsm->we_are_member = 0;
1120
1121 /* create new epoch */
1122 dfsm->local_epoch_counter++;
1123 dfsm->sync_epoch.epoch = dfsm->local_epoch_counter;
1124 dfsm->sync_epoch.nodeid = dfsm->nodeid;
1125 dfsm->sync_epoch.pid = dfsm->pid;
1126 dfsm->sync_epoch.time = time(NULL);
1127
1128 /* invalidate saved checksum */
1129 dfsm->csum_id = dfsm->csum_counter;
1130 memset(&dfsm->csum_epoch, 0, sizeof(dfsm->csum_epoch));
1131
1132 dfsm_free_sync_queue(dfsm);
1133
1134 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1135
1136 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
1137
1138 if (mode >= DFSM_ERROR_MODE_START) {
1139 cfs_dom_debug(dfsm->log_domain, "already left group - ignore message");
1140 return;
1141 }
1142
1143 int lowest_nodeid = 0;
1144 GString *str = g_string_new("members: ");
1145 for (int i = 0; i < member_list_entries; i++) {
1146
1147 g_string_append_printf(str, i ? ", %d/%d" : "%d/%d",
1148 member_list[i].nodeid, member_list[i].pid);
1149
1150 if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid)
1151 lowest_nodeid = member_list[i].nodeid;
1152
1153 if (member_list[i].nodeid == dfsm->nodeid &&
1154 member_list[i].pid == dfsm->pid)
1155 dfsm->we_are_member = 1;
1156 }
1157
1158
1159 if ((dfsm->we_are_member || mode != DFSM_MODE_START))
1160 cfs_dom_message(dfsm->log_domain, str->str);
1161
1162 g_string_free(str, 1);
1163
1164 dfsm->lowest_nodeid = lowest_nodeid;
1165
1166 /* NOTE: one node can be in left and joined list at the same time,
1167 so it is better to query member list. Also JOIN/LEAVE list are
1168 different on different nodes!
1169 */
1170
1171 dfsm_release_sync_resources(dfsm, member_list, member_list_entries);
1172
1173 if (!dfsm->we_are_member) {
1174 if (mode == DFSM_MODE_START) {
1175 cfs_dom_debug(dfsm->log_domain, "ignore leave message");
1176 return;
1177 }
1178 cfs_dom_message(dfsm->log_domain, "we (%d/%d) left the process group",
1179 dfsm->nodeid, dfsm->pid);
1180 goto leave;
1181 }
1182
1183 if (member_list_entries > 1) {
1184
1185 int qlen = g_sequence_get_length(dfsm->msg_queue);
1186 if (joined_list_entries && qlen) {
1187 /* we need to make sure that all members have the same queue. */
1188 cfs_dom_message(dfsm->log_domain, "queue not emtpy - resening %d messages", qlen);
1189 if (!dfsm_resend_queue(dfsm)) {
1190 cfs_dom_critical(dfsm->log_domain, "dfsm_resend_queue failed");
1191 goto leave;
1192 }
1193 }
1194
1195 dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC);
1196 if (lowest_nodeid == dfsm->nodeid) {
1197 if (!dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0)) {
1198 cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message");
1199 goto leave;
1200 }
1201 }
1202 } else {
1203 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
1204 dfsm->sync_info->local->synced = 1;
1205 if (!dfsm_deliver_queue(dfsm))
1206 goto leave;
1207 }
1208
1209 if (dfsm->dfsm_callbacks->dfsm_confchg_fn)
1210 dfsm->dfsm_callbacks->dfsm_confchg_fn(dfsm, dfsm->data, member_list, member_list_entries);
1211
1212 return;
1213leave:
1214 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
1215 return;
1216}
1217
1218static cpg_callbacks_t cpg_callbacks = {
1219 .cpg_deliver_fn = dfsm_cpg_deliver_callback,
1220 .cpg_confchg_fn = dfsm_cpg_confchg_callback,
1221};
1222
1223dfsm_t *
1224dfsm_new(
1225 gpointer data,
1226 const char *group_name,
1227 const char *log_domain,
1228 guint32 protocol_version,
1229 dfsm_callbacks_t *callbacks)
1230{
1231 g_return_val_if_fail(sizeof(dfsm_message_header_t) == 16, NULL);
1232 g_return_val_if_fail(sizeof(dfsm_message_state_header_t) == 32, NULL);
1233 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t) == 24, NULL);
1234
1235 g_return_val_if_fail(callbacks != NULL, NULL);
1236 g_return_val_if_fail(callbacks->dfsm_deliver_fn != NULL, NULL);
1237
1238 g_return_val_if_fail(callbacks->dfsm_get_state_fn != NULL, NULL);
1239 g_return_val_if_fail(callbacks->dfsm_process_state_update_fn != NULL, NULL);
1240 g_return_val_if_fail(callbacks->dfsm_process_update_fn != NULL, NULL);
1241 g_return_val_if_fail(callbacks->dfsm_commit_fn != NULL, NULL);
1242
1243 dfsm_t *dfsm;
1244
1245 if ((dfsm = g_new0(dfsm_t, 1)) == NULL)
1246 return NULL;
1247
1248 if (!(dfsm->sync_mutex = g_mutex_new()))
1249 goto err;
1250
1251 if (!(dfsm->sync_cond = g_cond_new()))
1252 goto err;
1253
1254 if (!(dfsm->results = g_hash_table_new(g_int64_hash, g_int64_equal)))
1255 goto err;
1256
1257 if (!(dfsm->msg_queue = g_sequence_new(NULL)))
1258 goto err;
1259
6392e29a 1260 dfsm->log_domain = log_domain;
fe000966
DM
1261 dfsm->data = data;
1262 dfsm->mode = DFSM_MODE_START;
1263 dfsm->protocol_version = protocol_version;
1264 strcpy (dfsm->cpg_group_name.value, group_name);
1265 dfsm->cpg_group_name.length = strlen (group_name) + 1;
1266
1267 dfsm->cpg_callbacks = &cpg_callbacks;
1268 dfsm->dfsm_callbacks = callbacks;
1269
1270 dfsm->members = g_hash_table_new(dfsm_sync_info_hash, dfsm_sync_info_equal);
1271 if (!dfsm->members)
1272 goto err;
1273
1274 if ((dfsm->mode_mutex = g_mutex_new()) == NULL)
1275 goto err;
1276
1277 return dfsm;
1278
1279err:
1280 dfsm_destroy(dfsm);
1281 return NULL;
1282}
1283
1284gboolean
1285dfsm_lowest_nodeid(dfsm_t *dfsm)
1286{
1287 g_return_val_if_fail(dfsm != NULL, FALSE);
1288
1289 if (dfsm->lowest_nodeid && (dfsm->lowest_nodeid == dfsm->nodeid))
1290 return TRUE;
1291
1292 return FALSE;
1293}
1294
1295cs_error_t
1296dfsm_verify_request(dfsm_t *dfsm)
1297{
1298 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1299
1300 /* only do when we have lowest nodeid */
1301 if (!dfsm->lowest_nodeid || (dfsm->lowest_nodeid != dfsm->nodeid))
1302 return CS_OK;
1303
1304 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1305 if (mode != DFSM_MODE_SYNCED)
1306 return CS_OK;
1307
1308 int len = 1;
1309 struct iovec iov[len];
1310
1311 if (dfsm->csum_counter != dfsm->csum_id) {
1312 g_message("delay verify request %016zX", dfsm->csum_counter + 1);
1313 return CS_OK;
1314 };
1315
1316 dfsm->csum_counter++;
1317 iov[0].iov_base = (char *)&dfsm->csum_counter;
1318 iov[0].iov_len = sizeof(dfsm->csum_counter);
1319
1320 cfs_debug("send verify request %016zX", dfsm->csum_counter);
1321
1322 cs_error_t result;
1323 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len);
1324
1325 if (result != CS_OK)
1326 cfs_dom_critical(dfsm->log_domain, "failed to send VERIFY_REQUEST message");
1327
1328 return result;
1329}
1330
1331
1332cs_error_t
1333dfsm_dispatch(
1334 dfsm_t *dfsm,
1335 cs_dispatch_flags_t dispatch_types)
1336{
1337 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1338 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_INVALID_PARAM);
1339
1340 cs_error_t result;
1341
1342 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1343 int retries = 0;
1344loop:
1345 result = cpg_dispatch(dfsm->cpg_handle, dispatch_types);
1346 if (result == CPG_ERR_TRY_AGAIN) {
1347 nanosleep(&tvreq, NULL);
1348 ++retries;
1349 if ((retries % 10) == 0)
1350 cfs_dom_message(dfsm->log_domain, "cpg_dispatch retry %d", retries);
1351 goto loop;
1352 }
1353
1354 if (!(result == CS_OK || result == CS_ERR_TRY_AGAIN)) {
1355 cfs_dom_critical(dfsm->log_domain, "cpg_dispatch failed: %d", result);
1356 }
1357
1358 return result;
1359}
1360
1361
1362cs_error_t
1363dfsm_initialize(dfsm_t *dfsm, int *fd)
1364{
1365 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1366 g_return_val_if_fail(fd != NULL, CS_ERR_INVALID_PARAM);
1367
1368 /* remove old messages */
1369 dfsm_free_message_queue(dfsm);
1370 dfsm_send_sync_message_abort(dfsm);
1371
1372 dfsm->joined = FALSE;
1373 dfsm->we_are_member = 0;
1374
1375 dfsm_set_mode(dfsm, DFSM_MODE_START);
1376
1377 cs_error_t result;
1378
1379 if (dfsm->cpg_handle == 0) {
1380 if ((result = cpg_initialize(&dfsm->cpg_handle, dfsm->cpg_callbacks)) != CS_OK) {
1381 cfs_dom_critical(dfsm->log_domain, "cpg_initialize failed: %d", result);
1382 dfsm->cpg_handle = 0;
1383 goto fail;
1384 }
1385
1386 if ((result = cpg_local_get(dfsm->cpg_handle, &dfsm->nodeid)) != CS_OK) {
1387 cfs_dom_critical(dfsm->log_domain, "cpg_local_get failed: %d", result);
1388 goto fail;
1389 }
1390
1391 dfsm->pid = getpid();
1392
1393 result = cpg_context_set(dfsm->cpg_handle, dfsm);
1394 if (result != CS_OK) {
1395 cfs_dom_critical(dfsm->log_domain, "cpg_context_set failed: %d", result);
1396 goto fail;
1397 }
1398 }
1399
1400 result = cpg_fd_get(dfsm->cpg_handle, fd);
1401 if (result != CS_OK) {
1402 cfs_dom_critical(dfsm->log_domain, "cpg_fd_get failed: %d", result);
1403 goto fail;
1404 }
1405
1406 return CS_OK;
1407
1408fail:
1409 if (dfsm->cpg_handle)
1410 cpg_finalize(dfsm->cpg_handle);
1411 dfsm->cpg_handle = 0;
1412 return result;
1413}
1414
1415cs_error_t
1416dfsm_join(dfsm_t *dfsm)
1417{
1418 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1419 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_LIBRARY);
1420 g_return_val_if_fail(dfsm->joined == 0, CS_ERR_EXIST);
1421
1422 cs_error_t result;
1423
1424 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1425 int retries = 0;
1426loop:
1427 result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name);
1428 if (result == CPG_ERR_TRY_AGAIN) {
1429 nanosleep(&tvreq, NULL);
1430 ++retries;
1431 if ((retries % 10) == 0)
1432 cfs_dom_message(dfsm->log_domain, "cpg_join retry %d", retries);
1433 goto loop;
1434 }
1435
1436 if (result != CS_OK) {
1437 cfs_dom_critical(dfsm->log_domain, "cpg_join failed: %d", result);
1438 return result;
1439 }
1440
1441 dfsm->joined = TRUE;
1442 return TRUE;
1443}
1444
1445cs_error_t
1446dfsm_leave (dfsm_t *dfsm)
1447{
1448 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1449 g_return_val_if_fail(dfsm->joined, CS_ERR_NOT_EXIST);
1450
1451 cs_error_t result;
1452
1453 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1454 int retries = 0;
1455loop:
1456 result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name);
1457 if (result == CPG_ERR_TRY_AGAIN) {
1458 nanosleep(&tvreq, NULL);
1459 ++retries;
1460 if ((retries % 10) == 0)
1461 cfs_dom_message(dfsm->log_domain, "cpg_leave retry %d", retries);
1462 goto loop;
1463 }
1464
1465 if (result != CS_OK) {
1466 cfs_dom_critical(dfsm->log_domain, "cpg_leave failed: %d", result);
1467 return result;
1468 }
1469
1470 dfsm->joined = FALSE;
1471
1472 return TRUE;
1473}
1474
1475gboolean
1476dfsm_finalize(dfsm_t *dfsm)
1477{
1478 g_return_val_if_fail(dfsm != NULL, FALSE);
1479
1480 dfsm_send_sync_message_abort(dfsm);
1481
1482 if (dfsm->joined)
1483 dfsm_leave(dfsm);
1484
1485 if (dfsm->cpg_handle) {
1486 cpg_finalize(dfsm->cpg_handle);
1487 dfsm->cpg_handle = 0;
1488 dfsm->joined = FALSE;
1489 dfsm->we_are_member = 0;
1490 }
1491
1492 return TRUE;
1493}
1494
1495void
1496dfsm_destroy(dfsm_t *dfsm)
1497{
1498 g_return_if_fail(dfsm != NULL);
1499
1500 dfsm_finalize(dfsm);
1501
1502 if (dfsm->sync_info && dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn)
1503 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
1504
1505 dfsm_free_sync_queue(dfsm);
1506
1507 if (dfsm->mode_mutex)
1508 g_mutex_free (dfsm->mode_mutex);
1509
1510 if (dfsm->sync_mutex)
1511 g_mutex_free (dfsm->sync_mutex);
1512
1513 if (dfsm->sync_cond)
1514 g_cond_free (dfsm->sync_cond);
1515
1516 if (dfsm->results)
1517 g_hash_table_destroy(dfsm->results);
1518
1519 if (dfsm->msg_queue) {
1520 dfsm_free_message_queue(dfsm);
1521 g_sequence_free(dfsm->msg_queue);
1522 }
1523
1524 if (dfsm->sync_info)
1525 g_free(dfsm->sync_info);
1526
1527 if (dfsm->cpg_handle)
1528 cpg_finalize(dfsm->cpg_handle);
1529
1530 if (dfsm->members)
1531 g_hash_table_destroy(dfsm->members);
1532
fe000966
DM
1533 g_free(dfsm);
1534}
1535
1536typedef struct {
1537 dfsm_t *dfsm;
1538} service_dfsm_private_t;
1539
1540static gboolean
1541service_dfsm_finalize(
1542 cfs_service_t *service,
1543 gpointer context)
1544{
1545 g_return_val_if_fail(service != NULL, FALSE);
1546 g_return_val_if_fail(context != NULL, FALSE);
1547
1548 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1549 dfsm_t *dfsm = private->dfsm;
1550
1551 g_return_val_if_fail(dfsm != NULL, FALSE);
1552
1553 return dfsm_finalize(dfsm);
1554}
1555
1556static int
1557service_dfsm_initialize(
1558 cfs_service_t *service,
1559 gpointer context)
1560{
1561 g_return_val_if_fail(service != NULL, -1);
1562 g_return_val_if_fail(context != NULL, -1);
1563
1564 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1565 dfsm_t *dfsm = private->dfsm;
1566
1567 g_return_val_if_fail(dfsm != NULL, -1);
1568
1569 /* serious internal error - don't try to recover */
1570 if (!dfsm_restartable(dfsm))
1571 return -1;
1572
1573 int fd = -1;
1574
1575 cs_error_t result;
1576 if ((result = dfsm_initialize(dfsm, &fd)) != CS_OK)
1577 return -1;
1578
1579 result = dfsm_join(dfsm);
1580 if (result != CS_OK) {
1581 /* we can't dispatch if not joined, so we need to finalize */
1582 dfsm_finalize(dfsm);
1583 return -1;
1584 }
1585
1586 return fd;
1587}
1588
1589static gboolean
1590service_dfsm_dispatch(
1591 cfs_service_t *service,
1592 gpointer context)
1593{
1594 g_return_val_if_fail(service != NULL, FALSE);
1595 g_return_val_if_fail(context != NULL, FALSE);
1596
1597 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1598 dfsm_t *dfsm = private->dfsm;
1599
1600 g_return_val_if_fail(dfsm != NULL, FALSE);
1601 g_return_val_if_fail(dfsm->cpg_handle != 0, FALSE);
1602
1603 cs_error_t result;
1604
1605 result = dfsm_dispatch(dfsm, CPG_DISPATCH_ONE);
1606 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1607 goto finalize;
1608 if (result != CS_OK)
1609 goto fail;
1610
1611 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1612 if (mode >= DFSM_ERROR_MODE_START) {
1613 if (dfsm->joined) {
1614 result = dfsm_leave(dfsm);
1615 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1616 goto finalize;
1617 if (result != CS_OK)
1618 goto finalize;
1619 } else {
1620 if (!dfsm->we_are_member)
1621 return FALSE;
1622 }
1623 }
1624
1625 return TRUE;
1626
1627fail:
1628 cfs_service_set_restartable(service, dfsm_restartable(dfsm));
1629 return FALSE;
1630
1631finalize:
1632 dfsm_finalize(dfsm);
1633 goto fail;
1634}
1635
1636static void
1637service_dfsm_timer(
1638 cfs_service_t *service,
1639 gpointer context)
1640{
1641 g_return_if_fail(service != NULL);
1642 g_return_if_fail(context != NULL);
1643
1644 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1645 dfsm_t *dfsm = private->dfsm;
1646
1647 g_return_if_fail(dfsm != NULL);
1648
1649 dfsm_verify_request(dfsm);
1650}
1651
1652static cfs_service_callbacks_t cfs_dfsm_callbacks = {
1653 .cfs_service_initialize_fn = service_dfsm_initialize,
1654 .cfs_service_finalize_fn = service_dfsm_finalize,
1655 .cfs_service_dispatch_fn = service_dfsm_dispatch,
1656 .cfs_service_timer_fn = service_dfsm_timer,
1657};
1658
1659cfs_service_t *
1660service_dfsm_new(dfsm_t *dfsm)
1661{
1662 cfs_service_t *service;
1663
1664 g_return_val_if_fail(dfsm != NULL, NULL);
1665
1666 service_dfsm_private_t *private = g_new0(service_dfsm_private_t, 1);
1667 if (!private)
1668 return NULL;
1669
1670 private->dfsm = dfsm;
1671
1672 service = cfs_service_new(&cfs_dfsm_callbacks, dfsm->log_domain, private);
1673
1674 return service;
1675}
1676
1677void
1678service_dfsm_destroy(cfs_service_t *service)
1679{
1680 g_return_if_fail(service != NULL);
1681
1682 service_dfsm_private_t *private =
1683 (service_dfsm_private_t *)cfs_service_get_context(service);
1684
1685 g_free(private);
1686 g_free(service);
1687}
1688
1689
1690
1691