]> git.proxmox.com Git - pve-cluster.git/blame - data/src/dfsm.c
pvecm: pass correct nodename to finish_join
[pve-cluster.git] / data / src / dfsm.c
CommitLineData
fe000966
DM
/*
  Copyright (C) 2010 Proxmox Server Solutions GmbH

  This program is free software: you can redistribute it and/or modify
  it under the terms of the GNU Affero General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU Affero General Public License for more details.

  You should have received a copy of the GNU Affero General Public License
  along with this program. If not, see <http://www.gnu.org/licenses/>.

  Author: Dietmar Maurer <dietmar@proxmox.com>

*/
20
21
/* NOTE: we try to keep the CPG handle as long as possible, because
 * calling cpg_initialize/cpg_finalize multiple times from the
 * same process confuses corosync.
 * NOTE: CS_ERR_LIBRARY is returned when corosync has died.
 */
27
28#ifdef HAVE_CONFIG_H
29#include <config.h>
30#endif /* HAVE_CONFIG_H */
31
32#include <sys/types.h>
e5a5a3ea 33#include <inttypes.h>
fe000966
DM
34#include <unistd.h>
35#include <string.h>
36#include <stdlib.h>
37
38#include <corosync/corotypes.h>
39#include <corosync/cpg.h>
40#include <glib.h>
41
42#include "cfs-utils.h"
43#include "dfsm.h"
44
45static cpg_callbacks_t cpg_callbacks;
46
/* Operating modes of the state machine. */
typedef enum {
	DFSM_MODE_START = 0,             /* start cluster connection */
	DFSM_MODE_START_SYNC = 1,        /* starting data synchronisation */
	DFSM_MODE_SYNCED = 2,            /* all data is up to date */
	DFSM_MODE_UPDATE = 3,            /* waiting for updates from leader */

	/* values >= 128 indicate abnormal/error conditions */
	DFSM_ERROR_MODE_START = 128,     /* first value of the error range */
	DFSM_MODE_LEAVE = 253,           /* leaving the CPG group */
	DFSM_MODE_VERSION_ERROR = 254,   /* a newer protocol version was detected */
	DFSM_MODE_ERROR = 255,           /* serious internal error */
} dfsm_mode_t;
59
/* Message types carried in dfsm_message_header_t.type.
 * DFSM_MESSAGE_NORMAL is an application message; all other values are
 * state messages (see DFSM_VALID_STATE_MESSAGE).
 */
typedef enum {
	DFSM_MESSAGE_NORMAL = 0,
	DFSM_MESSAGE_SYNC_START = 1,
	DFSM_MESSAGE_STATE = 2,
	DFSM_MESSAGE_UPDATE = 3,
	DFSM_MESSAGE_UPDATE_COMPLETE = 4,
	DFSM_MESSAGE_VERIFY_REQUEST = 5,
	DFSM_MESSAGE_VERIFY = 6,
} dfsm_message_t;
69
/* True when mt is one of the state message types (SYNC_START .. VERIFY).
 * The argument is parenthesized so that expressions containing low-precedence
 * operators are classified correctly. NOTE: mt is evaluated twice.
 */
#define DFSM_VALID_STATE_MESSAGE(mt) \
	((mt) >= DFSM_MESSAGE_SYNC_START && (mt) <= DFSM_MESSAGE_VERIFY)
71
/* Header found at the start of every message sent through CPG. */
typedef struct {
	uint16_t type;              /* a dfsm_message_t value */
	uint16_t subtype;           /* application message type for NORMAL messages */
	uint32_t protocol_version;  /* sender's protocol version */
	uint32_t time;              /* sender's time(NULL), truncated to 32 bit */
	uint32_t reserved;          /* always sent as 0 */
} dfsm_message_header_t;
79
/* Identifies one synchronisation round; compared bytewise with memcmp(). */
typedef struct {
	uint32_t epoch;   /* per-process counter (not globally unique) */
	uint32_t time;    /* time(NULL) when the epoch was created */
	uint32_t nodeid;  /* node that created the epoch */
	uint32_t pid;     /* process that created the epoch */
} dfsm_sync_epoch_t;
86
87typedef struct {
88 dfsm_message_header_t base;
89 dfsm_sync_epoch_t epoch;
90} dfsm_message_state_header_t;
91
92typedef struct {
93 dfsm_message_header_t base;
94 uint64_t count;
95} dfsm_message_normal_header_t;
96
/* A received message that is kept until it can be delivered. */
typedef struct {
	uint32_t nodeid;     /* sending node */
	uint32_t pid;        /* sending process */
	uint64_t msg_count;  /* counter taken from the normal message header */
	void *msg;           /* private copy of the message, released with g_free() */
	int msg_len;         /* length of msg in bytes; fixme: unsigned? */
} dfsm_queued_message_t;
104
105struct dfsm {
6392e29a 106 const char *log_domain;
fe000966
DM
107 cpg_callbacks_t *cpg_callbacks;
108 dfsm_callbacks_t *dfsm_callbacks;
109 cpg_handle_t cpg_handle;
110 struct cpg_name cpg_group_name;
111 uint32_t nodeid;
112 uint32_t pid;
113 int we_are_member;
114
115 guint32 protocol_version;
116 gpointer data;
117
118 gboolean joined;
119
120 /* mode is protected with mode_mutex */
89fde9ac 121 GMutex mode_mutex;
fe000966
DM
122 dfsm_mode_t mode;
123
124 GHashTable *members; /* contains dfsm_node_info_t pointers */
125 dfsm_sync_info_t *sync_info;
126 uint32_t local_epoch_counter;
127 dfsm_sync_epoch_t sync_epoch;
128 uint32_t lowest_nodeid;
129 GSequence *msg_queue;
130 GList *sync_queue;
131
132 /* synchrounous message transmission, protected with sync_mutex */
89fde9ac
DM
133 GMutex sync_mutex;
134 GCond sync_cond;
fe000966
DM
135 GHashTable *results;
136 uint64_t msgcount;
137 uint64_t msgcount_rcvd;
138
139 /* state verification */
140 guchar csum[32];
141 dfsm_sync_epoch_t csum_epoch;
142 uint64_t csum_id;
143 uint64_t csum_counter;
144};
145
146static gboolean dfsm_deliver_queue(dfsm_t *dfsm);
147static gboolean dfsm_deliver_sync_queue(dfsm_t *dfsm);
148
149gboolean
150dfsm_nodeid_is_local(
151 dfsm_t *dfsm,
152 uint32_t nodeid,
153 uint32_t pid)
154{
155 g_return_val_if_fail(dfsm != NULL, FALSE);
156
157 return (nodeid == dfsm->nodeid && pid == dfsm->pid);
158}
159
160
161static void
162dfsm_send_sync_message_abort(dfsm_t *dfsm)
163{
164 g_return_if_fail(dfsm != NULL);
fe000966 165
89fde9ac 166 g_mutex_lock (&dfsm->sync_mutex);
fe000966 167 dfsm->msgcount_rcvd = dfsm->msgcount;
89fde9ac
DM
168 g_cond_broadcast (&dfsm->sync_cond);
169 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
170}
171
172static void
173dfsm_record_local_result(
174 dfsm_t *dfsm,
175 uint64_t msg_count,
176 int msg_result,
177 gboolean processed)
178{
179 g_return_if_fail(dfsm != NULL);
fe000966
DM
180 g_return_if_fail(dfsm->results != NULL);
181
89fde9ac 182 g_mutex_lock (&dfsm->sync_mutex);
fe000966
DM
183 dfsm_result_t *rp = (dfsm_result_t *)g_hash_table_lookup(dfsm->results, &msg_count);
184 if (rp) {
185 rp->result = msg_result;
186 rp->processed = processed;
187 }
188 dfsm->msgcount_rcvd = msg_count;
89fde9ac
DM
189 g_cond_broadcast (&dfsm->sync_cond);
190 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
191}
192
89fde9ac 193static cs_error_t
fe000966
DM
194dfsm_send_message_full(
195 dfsm_t *dfsm,
196 struct iovec *iov,
197 unsigned int len,
198 int retry)
199{
200 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
201 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
202
203 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
89fde9ac 204 cs_error_t result;
fe000966
DM
205 int retries = 0;
206loop:
207 result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
89fde9ac 208 if (retry && result == CS_ERR_TRY_AGAIN) {
fe000966
DM
209 nanosleep(&tvreq, NULL);
210 ++retries;
211 if ((retries % 10) == 0)
212 cfs_dom_message(dfsm->log_domain, "cpg_send_message retry %d", retries);
213 if (retries < 100)
214 goto loop;
215 }
216
217 if (retries)
218 cfs_dom_message(dfsm->log_domain, "cpg_send_message retried %d times", retries);
219
220 if (result != CS_OK &&
89fde9ac 221 (!retry || result != CS_ERR_TRY_AGAIN))
fe000966
DM
222 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
223
224 return result;
225}
226
89fde9ac 227static cs_error_t
fe000966
DM
228dfsm_send_state_message_full(
229 dfsm_t *dfsm,
230 uint16_t type,
231 struct iovec *iov,
232 unsigned int len)
233{
234 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
235 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type), CS_ERR_INVALID_PARAM);
236 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
237
238 dfsm_message_state_header_t header;
239 header.base.type = type;
240 header.base.subtype = 0;
241 header.base.protocol_version = dfsm->protocol_version;
242 header.base.time = time(NULL);
243 header.base.reserved = 0;
244
245 header.epoch = dfsm->sync_epoch;
246
247 struct iovec real_iov[len + 1];
248
249 real_iov[0].iov_base = (char *)&header;
250 real_iov[0].iov_len = sizeof(header);
251
252 for (int i = 0; i < len; i++)
253 real_iov[i + 1] = iov[i];
254
255 return dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
256}
257
89fde9ac 258cs_error_t
fe000966
DM
259dfsm_send_update(
260 dfsm_t *dfsm,
261 struct iovec *iov,
262 unsigned int len)
263{
264 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE, iov, len);
265}
266
89fde9ac 267cs_error_t
fe000966
DM
268dfsm_send_update_complete(dfsm_t *dfsm)
269{
270 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE_COMPLETE, NULL, 0);
271}
272
273
89fde9ac 274cs_error_t
fe000966
DM
275dfsm_send_message(
276 dfsm_t *dfsm,
277 uint16_t msgtype,
278 struct iovec *iov,
279 int len)
280{
281 return dfsm_send_message_sync(dfsm, msgtype, iov, len, NULL);
282}
283
89fde9ac 284cs_error_t
fe000966
DM
285dfsm_send_message_sync(
286 dfsm_t *dfsm,
287 uint16_t msgtype,
288 struct iovec *iov,
289 int len,
290 dfsm_result_t *rp)
291{
292 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
fe000966
DM
293 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
294
89fde9ac 295 g_mutex_lock (&dfsm->sync_mutex);
fe000966
DM
296 /* note: hold lock until message is sent - to guarantee ordering */
297 uint64_t msgcount = ++dfsm->msgcount;
298 if (rp) {
299 rp->msgcount = msgcount;
300 rp->processed = 0;
301 g_hash_table_replace(dfsm->results, &rp->msgcount, rp);
302 }
303
304 dfsm_message_normal_header_t header;
305 header.base.type = DFSM_MESSAGE_NORMAL;
306 header.base.subtype = msgtype;
307 header.base.protocol_version = dfsm->protocol_version;
308 header.base.time = time(NULL);
309 header.base.reserved = 0;
310 header.count = msgcount;
311
312 struct iovec real_iov[len + 1];
313
314 real_iov[0].iov_base = (char *)&header;
315 real_iov[0].iov_len = sizeof(header);
316
317 for (int i = 0; i < len; i++)
318 real_iov[i + 1] = iov[i];
319
89fde9ac 320 cs_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
fe000966 321
89fde9ac 322 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
323
324 if (result != CS_OK) {
325 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
326
327 if (rp) {
89fde9ac 328 g_mutex_lock (&dfsm->sync_mutex);
fe000966 329 g_hash_table_remove(dfsm->results, &rp->msgcount);
89fde9ac 330 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
331 }
332 return result;
333 }
334
335 if (rp) {
89fde9ac 336 g_mutex_lock (&dfsm->sync_mutex);
fe000966
DM
337
338 while (dfsm->msgcount_rcvd < msgcount)
89fde9ac 339 g_cond_wait (&dfsm->sync_cond, &dfsm->sync_mutex);
fe000966
DM
340
341
342 g_hash_table_remove(dfsm->results, &rp->msgcount);
343
89fde9ac 344 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
345
346 return rp->processed ? CS_OK : CS_ERR_FAILED_OPERATION;
347 }
348
349 return CS_OK;
350}
351
352static gboolean
353dfsm_send_checksum(dfsm_t *dfsm)
354{
355 g_return_val_if_fail(dfsm != NULL, FALSE);
356
357 int len = 2;
358 struct iovec iov[len];
359
360 iov[0].iov_base = (char *)&dfsm->csum_id;
361 iov[0].iov_len = sizeof(dfsm->csum_id);
362 iov[1].iov_base = dfsm->csum;
363 iov[1].iov_len = sizeof(dfsm->csum);
364
365 gboolean res = (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY, iov, len) == CS_OK);
366
367 return res;
368}
369
370static void
371dfsm_free_queue_entry(gpointer data)
372{
373 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)data;
374 g_free (qm->msg);
375 g_free (qm);
376}
377
378static void
379dfsm_free_message_queue(dfsm_t *dfsm)
380{
381 g_return_if_fail(dfsm != NULL);
382 g_return_if_fail(dfsm->msg_queue != NULL);
383
384 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
385 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
386 while (iter != end) {
387 GSequenceIter *cur = iter;
388 iter = g_sequence_iter_next(iter);
389 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
390 g_sequence_get(cur);
391 dfsm_free_queue_entry(qm);
392 g_sequence_remove(cur);
393 }
394}
395
396static void
397dfsm_free_sync_queue(dfsm_t *dfsm)
398{
399 g_return_if_fail(dfsm != NULL);
400
401 GList *iter = dfsm->sync_queue;
402 while (iter) {
403 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
404 iter = g_list_next(iter);
405 dfsm_free_queue_entry(qm);
406 }
407
408 g_list_free(dfsm->sync_queue);
409 dfsm->sync_queue = NULL;
410}
411
412static gint
413message_queue_sort_fn(
414 gconstpointer a,
415 gconstpointer b,
416 gpointer user_data)
417{
418 return ((dfsm_queued_message_t *)a)->msg_count -
419 ((dfsm_queued_message_t *)b)->msg_count;
420}
421
422static dfsm_node_info_t *
423dfsm_node_info_lookup(
424 dfsm_t *dfsm,
425 uint32_t nodeid,
426 uint32_t pid)
427{
428 g_return_val_if_fail(dfsm != NULL, NULL);
429 g_return_val_if_fail(dfsm->members != NULL, NULL);
430
431 dfsm_node_info_t info = { .nodeid = nodeid, .pid = pid };
432
433 return (dfsm_node_info_t *)g_hash_table_lookup(dfsm->members, &info);
434}
435
436static dfsm_queued_message_t *
437dfsm_queue_add_message(
438 dfsm_t *dfsm,
439 uint32_t nodeid,
440 uint32_t pid,
441 uint64_t msg_count,
442 const void *msg,
443 size_t msg_len)
444{
445 g_return_val_if_fail(dfsm != NULL, NULL);
446 g_return_val_if_fail(msg != NULL, NULL);
447 g_return_val_if_fail(msg_len != 0, NULL);
448
449 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, nodeid, pid);
450 if (!ni) {
451 cfs_dom_critical(dfsm->log_domain, "dfsm_node_info_lookup failed");
452 return NULL;
453 }
454
455 dfsm_queued_message_t *qm = g_new0(dfsm_queued_message_t, 1);
456 g_return_val_if_fail(qm != NULL, NULL);
457
458 qm->nodeid = nodeid;
459 qm->pid = pid;
460 qm->msg = g_memdup (msg, msg_len);
461 qm->msg_len = msg_len;
462 qm->msg_count = msg_count;
463
464 if (dfsm->mode == DFSM_MODE_UPDATE && ni->synced) {
465 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
466 } else {
467 /* NOTE: we only need to sort the queue because we resend all
468 * queued messages sometimes.
469 */
470 g_sequence_insert_sorted(dfsm->msg_queue, qm, message_queue_sort_fn, NULL);
471 }
472
473 return qm;
474}
475
476static guint
477dfsm_sync_info_hash(gconstpointer key)
478{
479 dfsm_node_info_t *info = (dfsm_node_info_t *)key;
480
481 return g_int_hash(&info->nodeid) + g_int_hash(&info->pid);
482}
483
484static gboolean
485dfsm_sync_info_equal(
486 gconstpointer v1,
487 gconstpointer v2)
488{
489 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
490 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
491
492 if (info1->nodeid == info2->nodeid &&
493 info1->pid == info2->pid)
494 return TRUE;
495
496 return FALSE;
497}
498
499static int
500dfsm_sync_info_compare(
501 gconstpointer v1,
502 gconstpointer v2)
503{
504 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
505 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
506
507 if (info1->nodeid != info2->nodeid)
508 return info1->nodeid - info2->nodeid;
509
510 return info1->pid - info2->pid;
511}
512
513static void
514dfsm_set_mode(
515 dfsm_t *dfsm,
516 dfsm_mode_t new_mode)
517{
518 g_return_if_fail(dfsm != NULL);
519
520 cfs_debug("dfsm_set_mode - set mode to %d", new_mode);
521
522 int changed = 0;
89fde9ac 523 g_mutex_lock (&dfsm->mode_mutex);
fe000966
DM
524 if (dfsm->mode != new_mode) {
525 if (new_mode < DFSM_ERROR_MODE_START ||
526 (dfsm->mode < DFSM_ERROR_MODE_START || new_mode >= dfsm->mode)) {
527 dfsm->mode = new_mode;
528 changed = 1;
529 }
530 }
89fde9ac 531 g_mutex_unlock (&dfsm->mode_mutex);
fe000966
DM
532
533 if (!changed)
534 return;
535
536 if (new_mode == DFSM_MODE_START) {
537 cfs_dom_message(dfsm->log_domain, "start cluster connection");
538 } else if (new_mode == DFSM_MODE_START_SYNC) {
539 cfs_dom_message(dfsm->log_domain, "starting data syncronisation");
540 } else if (new_mode == DFSM_MODE_SYNCED) {
541 cfs_dom_message(dfsm->log_domain, "all data is up to date");
542 if (dfsm->dfsm_callbacks->dfsm_synced_fn)
543 dfsm->dfsm_callbacks->dfsm_synced_fn(dfsm);
544 } else if (new_mode == DFSM_MODE_UPDATE) {
545 cfs_dom_message(dfsm->log_domain, "waiting for updates from leader");
546 } else if (new_mode == DFSM_MODE_LEAVE) {
547 cfs_dom_critical(dfsm->log_domain, "leaving CPG group");
548 } else if (new_mode == DFSM_MODE_ERROR) {
549 cfs_dom_critical(dfsm->log_domain, "serious internal error - stop cluster connection");
550 } else if (new_mode == DFSM_MODE_VERSION_ERROR) {
551 cfs_dom_critical(dfsm->log_domain, "detected newer protocol - please update this node");
552 }
553}
554
555static dfsm_mode_t
556dfsm_get_mode(dfsm_t *dfsm)
557{
558 g_return_val_if_fail(dfsm != NULL, DFSM_MODE_ERROR);
559
89fde9ac 560 g_mutex_lock (&dfsm->mode_mutex);
fe000966 561 dfsm_mode_t mode = dfsm->mode;
89fde9ac 562 g_mutex_unlock (&dfsm->mode_mutex);
fe000966
DM
563
564 return mode;
565}
566
567gboolean
568dfsm_restartable(dfsm_t *dfsm)
569{
570 dfsm_mode_t mode = dfsm_get_mode(dfsm);
571
572 return !(mode == DFSM_MODE_ERROR ||
573 mode == DFSM_MODE_VERSION_ERROR);
574}
575
576void
577dfsm_set_errormode(dfsm_t *dfsm)
578{
579 dfsm_set_mode(dfsm, DFSM_MODE_ERROR);
580}
581
582static void
583dfsm_release_sync_resources(
584 dfsm_t *dfsm,
585 const struct cpg_address *member_list,
586 size_t member_list_entries)
587{
588 g_return_if_fail(dfsm != NULL);
589 g_return_if_fail(dfsm->members != NULL);
590 g_return_if_fail(!member_list_entries || member_list != NULL);
591
592 cfs_debug("enter dfsm_release_sync_resources");
593
594 if (dfsm->sync_info) {
595
596 if (dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn) {
597 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
598 dfsm->sync_info->data = NULL;
599 }
600
601 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
602 if (dfsm->sync_info->nodes[i].state) {
603 g_free(dfsm->sync_info->nodes[i].state);
604 dfsm->sync_info->nodes[i].state = NULL;
605 dfsm->sync_info->nodes[i].state_len = 0;
606 }
607 }
608 }
609
610 if (member_list) {
611
612 g_hash_table_remove_all(dfsm->members);
613
614 if (dfsm->sync_info)
615 g_free(dfsm->sync_info);
616
617 int size = sizeof(dfsm_sync_info_t) +
618 member_list_entries*sizeof(dfsm_sync_info_t);
619 dfsm_sync_info_t *sync_info = dfsm->sync_info = g_malloc0(size);
620 sync_info->node_count = member_list_entries;
621
622 for (int i = 0; i < member_list_entries; i++) {
623 sync_info->nodes[i].nodeid = member_list[i].nodeid;
624 sync_info->nodes[i].pid = member_list[i].pid;
625 }
626
627 qsort(sync_info->nodes, member_list_entries, sizeof(dfsm_node_info_t),
628 dfsm_sync_info_compare);
629
630 for (int i = 0; i < member_list_entries; i++) {
631 dfsm_node_info_t *info = &sync_info->nodes[i];
632 g_hash_table_insert(dfsm->members, info, info);
633 if (info->nodeid == dfsm->nodeid && info->pid == dfsm->pid)
634 sync_info->local = info;
635 }
636 }
637}
638
639static void
640dfsm_cpg_deliver_callback(
641 cpg_handle_t handle,
642 const struct cpg_name *group_name,
643 uint32_t nodeid,
644 uint32_t pid,
645 void *msg,
646 size_t msg_len)
647{
648 cs_error_t result;
649
650 dfsm_t *dfsm = NULL;
651 result = cpg_context_get(handle, (gpointer *)&dfsm);
652 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
84e1bd35 653 cfs_critical("cpg_context_get error: %d (%p)", result, (void *) dfsm);
fe000966
DM
654 return; /* we have no valid dfsm pointer, so we can just ignore this */
655 }
656 dfsm_mode_t mode = dfsm_get_mode(dfsm);
657
658 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
659
660 if (mode >= DFSM_ERROR_MODE_START) {
661 cfs_dom_debug(dfsm->log_domain, "error mode - ignoring message");
662 goto leave;
663 }
664
665 if (!dfsm->sync_info) {
666 cfs_dom_critical(dfsm->log_domain, "no dfsm_sync_info - internal error");
667 goto leave;
668 }
669
670 if (msg_len < sizeof(dfsm_message_header_t)) {
e5a5a3ea 671 cfs_dom_critical(dfsm->log_domain, "received short message (%zd bytes)", msg_len);
fe000966
DM
672 goto leave;
673 }
674
675 dfsm_message_header_t *base_header = (dfsm_message_header_t *)msg;
676
677 if (base_header->protocol_version > dfsm->protocol_version) {
678 cfs_dom_critical(dfsm->log_domain, "received message with protocol version %d",
679 base_header->protocol_version);
680 dfsm_set_mode(dfsm, DFSM_MODE_VERSION_ERROR);
681 return;
682 } else if (base_header->protocol_version < dfsm->protocol_version) {
683 cfs_dom_message(dfsm->log_domain, "ignore message with wrong protocol version %d",
684 base_header->protocol_version);
685 return;
686 }
687
688 if (base_header->type == DFSM_MESSAGE_NORMAL) {
689
690 dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
691
692 if (msg_len < sizeof(dfsm_message_normal_header_t)) {
e5a5a3ea 693 cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %zd bytes)",
fe000966
DM
694 base_header->type, base_header->subtype, msg_len);
695 goto leave;
696 }
697
698 if (mode != DFSM_MODE_SYNCED) {
e5a5a3ea 699 cfs_dom_debug(dfsm->log_domain, "queue message %" PRIu64 " (subtype = %d, length = %zd)",
fe000966
DM
700 header->count, base_header->subtype, msg_len);
701
702 if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
703 goto leave;
704 } else {
705
706 int msg_res = -1;
707 int res = dfsm->dfsm_callbacks->dfsm_deliver_fn(
708 dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype,
975f9b0c 709 base_header->time, (uint8_t *)msg + sizeof(dfsm_message_normal_header_t),
fe000966
DM
710 msg_len - sizeof(dfsm_message_normal_header_t));
711
712 if (nodeid == dfsm->nodeid && pid == dfsm->pid)
713 dfsm_record_local_result(dfsm, header->count, msg_res, res);
714
715 if (res < 0)
716 goto leave;
717 }
718
719 return;
720 }
721
722 /* state related messages
723 * we needs right epoch - else we simply discard the message
724 */
725
726 dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
727
728 if (msg_len < sizeof(dfsm_message_state_header_t)) {
e5a5a3ea 729 cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %zd bytes)",
fe000966
DM
730 base_header->type, base_header->subtype, msg_len);
731 goto leave;
732 }
733
734 if (base_header->type != DFSM_MESSAGE_SYNC_START &&
735 (memcmp(&header->epoch, &dfsm->sync_epoch, sizeof(dfsm_sync_epoch_t)) != 0)) {
736 cfs_dom_debug(dfsm->log_domain, "ignore message (msg_type == %d) with "
737 "wrong epoch (epoch %d/%d/%08X)", base_header->type,
738 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
739 return;
740 }
741
975f9b0c 742 msg = (uint8_t *) msg + sizeof(dfsm_message_state_header_t);
fe000966
DM
743 msg_len -= sizeof(dfsm_message_state_header_t);
744
745 if (mode == DFSM_MODE_SYNCED) {
746 if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
747
748 for (int i = 0; i < dfsm->sync_info->node_count; i++)
749 dfsm->sync_info->nodes[i].synced = 1;
750
751 if (!dfsm_deliver_queue(dfsm))
752 goto leave;
753
754 return;
755
756 } else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
757
758 if (msg_len != sizeof(dfsm->csum_counter)) {
e5a5a3ea 759 cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%zd bytes) form node %d/%d", msg_len, nodeid, pid);
fe000966
DM
760 goto leave;
761 }
762
763 uint64_t csum_id = *((uint64_t *)msg);
975f9b0c 764 msg = (uint8_t *) msg + 8; msg_len -= 8;
fe000966 765
e5a5a3ea 766 cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016" PRIX64, nodeid, csum_id);
fe000966
DM
767
768 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
769 if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
770 dfsm, dfsm->data, dfsm->csum, sizeof(dfsm->csum))) {
771 cfs_dom_critical(dfsm->log_domain, "unable to compute data checksum");
772 goto leave;
773 }
774
775 dfsm->csum_epoch = header->epoch;
776 dfsm->csum_id = csum_id;
777
778 if (nodeid == dfsm->nodeid && pid == dfsm->pid) {
779 if (!dfsm_send_checksum(dfsm))
780 goto leave;
781 }
782 }
783
784 return;
785
786 } else if (base_header->type == DFSM_MESSAGE_VERIFY) {
787
788 cfs_dom_debug(dfsm->log_domain, "received verify message");
789
790 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
791
792 if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
e5a5a3ea 793 cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%zd bytes)", msg_len);
fe000966
DM
794 goto leave;
795 }
796
797 uint64_t csum_id = *((uint64_t *)msg);
975f9b0c 798 msg = (uint8_t *) msg + 8; msg_len -= 8;
fe000966
DM
799
800 if (dfsm->csum_id == csum_id &&
801 (memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
802 if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
e5a5a3ea 803 cfs_dom_critical(dfsm->log_domain, "wrong checksum %016" PRIX64 " != %016" PRIX64 " - restarting",
fe000966
DM
804 *(uint64_t *)msg, *(uint64_t *)dfsm->csum);
805 goto leave;
806 } else {
807 cfs_dom_message(dfsm->log_domain, "data verification successful");
808 }
809 } else {
810 cfs_dom_message(dfsm->log_domain, "skip verification - no checksum saved");
811 }
812 }
813
814 return;
815
816 } else {
817 /* ignore (we already got all required updates, or we are leader) */
818 cfs_dom_debug(dfsm->log_domain, "ignore state sync message %d",
819 base_header->type);
820 return;
821 }
822
823 } else if (mode == DFSM_MODE_START_SYNC) {
824
825 if (base_header->type == DFSM_MESSAGE_SYNC_START) {
826
827 if (nodeid != dfsm->lowest_nodeid) {
828 cfs_dom_critical(dfsm->log_domain, "ignore sync request from wrong member %d/%d",
829 nodeid, pid);
830 }
831
832 cfs_dom_message(dfsm->log_domain, "received sync request (epoch %d/%d/%08X)",
833 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
834
835 dfsm->sync_epoch = header->epoch;
836
837 dfsm_release_sync_resources(dfsm, NULL, 0);
838
839 unsigned int state_len = 0;
840 gpointer state = NULL;
841
842 state = dfsm->dfsm_callbacks->dfsm_get_state_fn(dfsm, dfsm->data, &state_len);
843
844 if (!(state && state_len)) {
845 cfs_dom_critical(dfsm->log_domain, "dfsm_get_state_fn failed");
846 goto leave;
847 }
848
849 struct iovec iov[1];
850 iov[0].iov_base = state;
851 iov[0].iov_len = state_len;
852
853 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_STATE, iov, 1);
854
855 if (state)
856 g_free(state);
857
858 if (result != CS_OK)
859 goto leave;
860
861 return;
862
863 } else if (base_header->type == DFSM_MESSAGE_STATE) {
864
865 dfsm_node_info_t *ni;
866
867 if (!(ni = dfsm_node_info_lookup(dfsm, nodeid, pid))) {
868 cfs_dom_critical(dfsm->log_domain, "received state for non-member %d/%d", nodeid, pid);
869 goto leave;
870 }
871
872 if (ni->state) {
873 cfs_dom_critical(dfsm->log_domain, "received duplicate state for member %d/%d", nodeid, pid);
874 goto leave;
875 }
876
877 ni->state = g_memdup(msg, msg_len);
878 ni->state_len = msg_len;
879
880 int received_all = 1;
881 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
882 if (!dfsm->sync_info->nodes[i].state) {
883 received_all = 0;
884 break;
885 }
886 }
887
888 if (received_all) {
889 cfs_dom_message(dfsm->log_domain, "received all states");
890
891 int res = dfsm->dfsm_callbacks->dfsm_process_state_update_fn(dfsm, dfsm->data, dfsm->sync_info);
892 if (res < 0)
893 goto leave;
894
895 if (dfsm->sync_info->local->synced) {
896 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
897 dfsm_release_sync_resources(dfsm, NULL, 0);
898
899 if (!dfsm_deliver_queue(dfsm))
900 goto leave;
901
902 } else {
903 dfsm_set_mode(dfsm, DFSM_MODE_UPDATE);
904
905 if (!dfsm_deliver_queue(dfsm))
906 goto leave;
907 }
908
909 }
910
911 return;
912 }
913
914 } else if (mode == DFSM_MODE_UPDATE) {
915
916 if (base_header->type == DFSM_MESSAGE_UPDATE) {
917
918 int res = dfsm->dfsm_callbacks->dfsm_process_update_fn(
919 dfsm, dfsm->data, dfsm->sync_info, nodeid, pid, msg, msg_len);
920
921 if (res < 0)
922 goto leave;
923
924 return;
925
926 } else if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
927
928
929 int res = dfsm->dfsm_callbacks->dfsm_commit_fn(dfsm, dfsm->data, dfsm->sync_info);
930
931 if (res < 0)
932 goto leave;
933
934 for (int i = 0; i < dfsm->sync_info->node_count; i++)
935 dfsm->sync_info->nodes[i].synced = 1;
936
937 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
938
939 if (!dfsm_deliver_sync_queue(dfsm))
940 goto leave;
941
942 if (!dfsm_deliver_queue(dfsm))
943 goto leave;
944
945 dfsm_release_sync_resources(dfsm, NULL, 0);
946
947 return;
948 }
949
950 } else {
951 cfs_dom_critical(dfsm->log_domain, "internal error - unknown mode %d", mode);
952 goto leave;
953 }
954
955 if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST ||
956 base_header->type == DFSM_MESSAGE_VERIFY) {
957
958 cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
959
960 } else {
e5a5a3ea 961 cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %zd bytes)",
fe000966
DM
962 base_header->type, msg_len);
963 goto leave;
964 }
965
966leave:
967 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
968 dfsm_release_sync_resources(dfsm, NULL, 0);
969 return;
970}
971
972static gboolean
973dfsm_resend_queue(dfsm_t *dfsm)
974{
975 g_return_val_if_fail(dfsm != NULL, FALSE);
976 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
977
978 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
979 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
980 gboolean res = TRUE;
981
982 while (iter != end) {
983 GSequenceIter *cur = iter;
984 iter = g_sequence_iter_next(iter);
985
986 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
987 g_sequence_get(cur);
988
989 if (qm->nodeid == dfsm->nodeid && qm->pid == dfsm->pid) {
89fde9ac 990 cs_error_t result;
fe000966
DM
991 struct iovec iov[1];
992 iov[0].iov_base = qm->msg;
993 iov[0].iov_len = qm->msg_len;
994
995 if ((result = dfsm_send_message_full(dfsm, iov, 1, 1)) != CS_OK) {
996 res = FALSE;
997 break;
998 }
999 }
1000 }
1001
1002 dfsm_free_message_queue(dfsm);
1003
1004 return res;
1005}
1006
1007static gboolean
1008dfsm_deliver_sync_queue(dfsm_t *dfsm)
1009{
1010 g_return_val_if_fail(dfsm != NULL, FALSE);
1011
1012 if (!dfsm->sync_queue)
1013 return TRUE;
1014
1015 gboolean res = TRUE;
1016
1017 // fixme: cfs_debug
1018 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__,
1019 g_list_length(dfsm->sync_queue));
1020
1021 GList *iter = dfsm->sync_queue;
1022 while (iter) {
1023 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
1024 iter = g_list_next(iter);
1025
1026 if (res && dfsm->mode == DFSM_MODE_SYNCED) {
1027 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1028 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1029 } else {
1030 res = FALSE;
1031 }
1032
1033 dfsm_free_queue_entry(qm);
1034 }
1035 g_list_free(dfsm->sync_queue);
1036 dfsm->sync_queue = NULL;
1037
1038 return res;
1039}
1040
1041static gboolean
1042dfsm_deliver_queue(dfsm_t *dfsm)
1043{
1044 g_return_val_if_fail(dfsm != NULL, FALSE);
1045 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
1046
1047 int qlen = g_sequence_get_length(dfsm->msg_queue);
1048 if (!qlen)
1049 return TRUE;
1050
1051 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
1052 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
1053 gboolean res = TRUE;
1054
1055 // fixme: cfs_debug
1056 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, qlen);
1057
1058 while (iter != end) {
1059 GSequenceIter *cur = iter;
1060 iter = g_sequence_iter_next(iter);
1061
1062 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
1063 g_sequence_get(cur);
1064
1065 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, qm->nodeid, qm->pid);
1066 if (!ni) {
d3eeade6 1067 cfs_dom_message(dfsm->log_domain, "remove message from non-member %d/%d",
fe000966
DM
1068 qm->nodeid, qm->pid);
1069 dfsm_free_queue_entry(qm);
1070 g_sequence_remove(cur);
1071 continue;
1072 }
1073
1074 if (dfsm->mode == DFSM_MODE_SYNCED) {
1075 if (ni->synced) {
1076 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1077 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1078 dfsm_free_queue_entry(qm);
1079 g_sequence_remove(cur);
1080 }
1081 } else if (dfsm->mode == DFSM_MODE_UPDATE) {
1082 if (ni->synced) {
1083 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
1084 g_sequence_remove(cur);
1085 }
1086 } else {
1087 res = FALSE;
1088 break;
1089 }
1090 }
1091
1092 return res;
1093}
1094
1095static void
1096dfsm_cpg_confchg_callback(
1097 cpg_handle_t handle,
1098 const struct cpg_name *group_name,
1099 const struct cpg_address *member_list,
1100 size_t member_list_entries,
1101 const struct cpg_address *left_list,
1102 size_t left_list_entries,
1103 const struct cpg_address *joined_list,
1104 size_t joined_list_entries)
1105{
1106 cs_error_t result;
1107
1108 dfsm_t *dfsm = NULL;
1109 result = cpg_context_get(handle, (gpointer *)&dfsm);
1110 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
84e1bd35 1111 cfs_critical("cpg_context_get error: %d (%p)", result, (void *) dfsm);
fe000966
DM
1112 return; /* we have no valid dfsm pointer, so we can just ignore this */
1113 }
1114
1115 dfsm->we_are_member = 0;
1116
1117 /* create new epoch */
1118 dfsm->local_epoch_counter++;
1119 dfsm->sync_epoch.epoch = dfsm->local_epoch_counter;
1120 dfsm->sync_epoch.nodeid = dfsm->nodeid;
1121 dfsm->sync_epoch.pid = dfsm->pid;
1122 dfsm->sync_epoch.time = time(NULL);
1123
1124 /* invalidate saved checksum */
1125 dfsm->csum_id = dfsm->csum_counter;
1126 memset(&dfsm->csum_epoch, 0, sizeof(dfsm->csum_epoch));
1127
1128 dfsm_free_sync_queue(dfsm);
1129
1130 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1131
1132 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
1133
1134 if (mode >= DFSM_ERROR_MODE_START) {
1135 cfs_dom_debug(dfsm->log_domain, "already left group - ignore message");
1136 return;
1137 }
1138
1139 int lowest_nodeid = 0;
e26a1b5f 1140 GString *member_ids = g_string_new(NULL);
fe000966
DM
1141 for (int i = 0; i < member_list_entries; i++) {
1142
e26a1b5f 1143 g_string_append_printf(member_ids, i ? ", %d/%d" : "%d/%d",
fe000966
DM
1144 member_list[i].nodeid, member_list[i].pid);
1145
1146 if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid)
1147 lowest_nodeid = member_list[i].nodeid;
1148
1149 if (member_list[i].nodeid == dfsm->nodeid &&
1150 member_list[i].pid == dfsm->pid)
1151 dfsm->we_are_member = 1;
1152 }
1153
1154
1155 if ((dfsm->we_are_member || mode != DFSM_MODE_START))
e26a1b5f 1156 cfs_dom_message(dfsm->log_domain, "members: %s", member_ids->str);
fe000966 1157
e26a1b5f 1158 g_string_free(member_ids, 1);
fe000966
DM
1159
1160 dfsm->lowest_nodeid = lowest_nodeid;
1161
1162 /* NOTE: one node can be in left and joined list at the same time,
1163 so it is better to query member list. Also JOIN/LEAVE list are
1164 different on different nodes!
1165 */
1166
1167 dfsm_release_sync_resources(dfsm, member_list, member_list_entries);
1168
1169 if (!dfsm->we_are_member) {
1170 if (mode == DFSM_MODE_START) {
1171 cfs_dom_debug(dfsm->log_domain, "ignore leave message");
1172 return;
1173 }
1174 cfs_dom_message(dfsm->log_domain, "we (%d/%d) left the process group",
1175 dfsm->nodeid, dfsm->pid);
1176 goto leave;
1177 }
1178
1179 if (member_list_entries > 1) {
1180
1181 int qlen = g_sequence_get_length(dfsm->msg_queue);
1182 if (joined_list_entries && qlen) {
1183 /* we need to make sure that all members have the same queue. */
1184 cfs_dom_message(dfsm->log_domain, "queue not emtpy - resening %d messages", qlen);
1185 if (!dfsm_resend_queue(dfsm)) {
1186 cfs_dom_critical(dfsm->log_domain, "dfsm_resend_queue failed");
1187 goto leave;
1188 }
1189 }
1190
1191 dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC);
1192 if (lowest_nodeid == dfsm->nodeid) {
1193 if (!dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0)) {
1194 cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message");
1195 goto leave;
1196 }
1197 }
1198 } else {
1199 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
1200 dfsm->sync_info->local->synced = 1;
1201 if (!dfsm_deliver_queue(dfsm))
1202 goto leave;
1203 }
1204
1205 if (dfsm->dfsm_callbacks->dfsm_confchg_fn)
1206 dfsm->dfsm_callbacks->dfsm_confchg_fn(dfsm, dfsm->data, member_list, member_list_entries);
1207
1208 return;
1209leave:
1210 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
1211 return;
1212}
1213
1214static cpg_callbacks_t cpg_callbacks = {
1215 .cpg_deliver_fn = dfsm_cpg_deliver_callback,
1216 .cpg_confchg_fn = dfsm_cpg_confchg_callback,
1217};
1218
1219dfsm_t *
1220dfsm_new(
1221 gpointer data,
1222 const char *group_name,
1223 const char *log_domain,
1224 guint32 protocol_version,
1225 dfsm_callbacks_t *callbacks)
1226{
1227 g_return_val_if_fail(sizeof(dfsm_message_header_t) == 16, NULL);
1228 g_return_val_if_fail(sizeof(dfsm_message_state_header_t) == 32, NULL);
1229 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t) == 24, NULL);
1230
1231 g_return_val_if_fail(callbacks != NULL, NULL);
1232 g_return_val_if_fail(callbacks->dfsm_deliver_fn != NULL, NULL);
1233
1234 g_return_val_if_fail(callbacks->dfsm_get_state_fn != NULL, NULL);
1235 g_return_val_if_fail(callbacks->dfsm_process_state_update_fn != NULL, NULL);
1236 g_return_val_if_fail(callbacks->dfsm_process_update_fn != NULL, NULL);
1237 g_return_val_if_fail(callbacks->dfsm_commit_fn != NULL, NULL);
1238
1239 dfsm_t *dfsm;
1240
1241 if ((dfsm = g_new0(dfsm_t, 1)) == NULL)
1242 return NULL;
1243
89fde9ac
DM
1244 g_mutex_init(&dfsm->sync_mutex);
1245
1246 g_cond_init(&dfsm->sync_cond);
fe000966
DM
1247
1248 if (!(dfsm->results = g_hash_table_new(g_int64_hash, g_int64_equal)))
1249 goto err;
1250
1251 if (!(dfsm->msg_queue = g_sequence_new(NULL)))
1252 goto err;
1253
6392e29a 1254 dfsm->log_domain = log_domain;
fe000966
DM
1255 dfsm->data = data;
1256 dfsm->mode = DFSM_MODE_START;
1257 dfsm->protocol_version = protocol_version;
1258 strcpy (dfsm->cpg_group_name.value, group_name);
1259 dfsm->cpg_group_name.length = strlen (group_name) + 1;
1260
1261 dfsm->cpg_callbacks = &cpg_callbacks;
1262 dfsm->dfsm_callbacks = callbacks;
1263
1264 dfsm->members = g_hash_table_new(dfsm_sync_info_hash, dfsm_sync_info_equal);
1265 if (!dfsm->members)
1266 goto err;
1267
89fde9ac 1268 g_mutex_init(&dfsm->mode_mutex);
fe000966
DM
1269
1270 return dfsm;
1271
1272err:
1273 dfsm_destroy(dfsm);
1274 return NULL;
1275}
1276
af2e9dd4
DM
1277gboolean
1278dfsm_is_initialized(dfsm_t *dfsm)
1279{
1280 g_return_val_if_fail(dfsm != NULL, FALSE);
1281
1282 return (dfsm->cpg_handle != 0) ? TRUE : FALSE;
1283}
1284
fe000966
DM
1285gboolean
1286dfsm_lowest_nodeid(dfsm_t *dfsm)
1287{
1288 g_return_val_if_fail(dfsm != NULL, FALSE);
1289
1290 if (dfsm->lowest_nodeid && (dfsm->lowest_nodeid == dfsm->nodeid))
1291 return TRUE;
1292
1293 return FALSE;
1294}
1295
1296cs_error_t
1297dfsm_verify_request(dfsm_t *dfsm)
1298{
1299 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1300
1301 /* only do when we have lowest nodeid */
1302 if (!dfsm->lowest_nodeid || (dfsm->lowest_nodeid != dfsm->nodeid))
1303 return CS_OK;
1304
1305 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1306 if (mode != DFSM_MODE_SYNCED)
1307 return CS_OK;
1308
1309 int len = 1;
1310 struct iovec iov[len];
1311
1312 if (dfsm->csum_counter != dfsm->csum_id) {
e5a5a3ea 1313 g_message("delay verify request %016" PRIX64, dfsm->csum_counter + 1);
fe000966
DM
1314 return CS_OK;
1315 };
1316
1317 dfsm->csum_counter++;
1318 iov[0].iov_base = (char *)&dfsm->csum_counter;
1319 iov[0].iov_len = sizeof(dfsm->csum_counter);
1320
e5a5a3ea 1321 cfs_debug("send verify request %016" PRIX64, dfsm->csum_counter);
fe000966
DM
1322
1323 cs_error_t result;
1324 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len);
1325
1326 if (result != CS_OK)
1327 cfs_dom_critical(dfsm->log_domain, "failed to send VERIFY_REQUEST message");
1328
1329 return result;
1330}
1331
1332
1333cs_error_t
1334dfsm_dispatch(
1335 dfsm_t *dfsm,
1336 cs_dispatch_flags_t dispatch_types)
1337{
1338 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1339 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_INVALID_PARAM);
1340
1341 cs_error_t result;
1342
1343 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1344 int retries = 0;
1345loop:
1346 result = cpg_dispatch(dfsm->cpg_handle, dispatch_types);
89fde9ac 1347 if (result == CS_ERR_TRY_AGAIN) {
fe000966
DM
1348 nanosleep(&tvreq, NULL);
1349 ++retries;
1350 if ((retries % 10) == 0)
1351 cfs_dom_message(dfsm->log_domain, "cpg_dispatch retry %d", retries);
1352 goto loop;
1353 }
1354
1355 if (!(result == CS_OK || result == CS_ERR_TRY_AGAIN)) {
1356 cfs_dom_critical(dfsm->log_domain, "cpg_dispatch failed: %d", result);
1357 }
1358
1359 return result;
1360}
1361
1362
1363cs_error_t
1364dfsm_initialize(dfsm_t *dfsm, int *fd)
1365{
1366 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1367 g_return_val_if_fail(fd != NULL, CS_ERR_INVALID_PARAM);
1368
1369 /* remove old messages */
1370 dfsm_free_message_queue(dfsm);
1371 dfsm_send_sync_message_abort(dfsm);
1372
1373 dfsm->joined = FALSE;
1374 dfsm->we_are_member = 0;
1375
1376 dfsm_set_mode(dfsm, DFSM_MODE_START);
1377
1378 cs_error_t result;
1379
1380 if (dfsm->cpg_handle == 0) {
1381 if ((result = cpg_initialize(&dfsm->cpg_handle, dfsm->cpg_callbacks)) != CS_OK) {
1382 cfs_dom_critical(dfsm->log_domain, "cpg_initialize failed: %d", result);
751a3b88 1383 goto err_no_finalize;
fe000966
DM
1384 }
1385
1386 if ((result = cpg_local_get(dfsm->cpg_handle, &dfsm->nodeid)) != CS_OK) {
1387 cfs_dom_critical(dfsm->log_domain, "cpg_local_get failed: %d", result);
751a3b88 1388 goto err_finalize;
fe000966
DM
1389 }
1390
1391 dfsm->pid = getpid();
1392
1393 result = cpg_context_set(dfsm->cpg_handle, dfsm);
1394 if (result != CS_OK) {
1395 cfs_dom_critical(dfsm->log_domain, "cpg_context_set failed: %d", result);
751a3b88 1396 goto err_finalize;
fe000966
DM
1397 }
1398 }
1399
1400 result = cpg_fd_get(dfsm->cpg_handle, fd);
1401 if (result != CS_OK) {
1402 cfs_dom_critical(dfsm->log_domain, "cpg_fd_get failed: %d", result);
751a3b88 1403 goto err_finalize;
fe000966
DM
1404 }
1405
1406 return CS_OK;
1407
751a3b88
TL
1408 err_finalize:
1409 cpg_finalize(dfsm->cpg_handle);
1410 err_no_finalize:
fe000966
DM
1411 dfsm->cpg_handle = 0;
1412 return result;
1413}
1414
1415cs_error_t
1416dfsm_join(dfsm_t *dfsm)
1417{
1418 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1419 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_LIBRARY);
1420 g_return_val_if_fail(dfsm->joined == 0, CS_ERR_EXIST);
1421
1422 cs_error_t result;
1423
1424 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1425 int retries = 0;
1426loop:
1427 result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name);
89fde9ac 1428 if (result == CS_ERR_TRY_AGAIN) {
fe000966
DM
1429 nanosleep(&tvreq, NULL);
1430 ++retries;
1431 if ((retries % 10) == 0)
1432 cfs_dom_message(dfsm->log_domain, "cpg_join retry %d", retries);
1433 goto loop;
1434 }
1435
1436 if (result != CS_OK) {
1437 cfs_dom_critical(dfsm->log_domain, "cpg_join failed: %d", result);
1438 return result;
1439 }
1440
1441 dfsm->joined = TRUE;
1442 return TRUE;
1443}
1444
1445cs_error_t
1446dfsm_leave (dfsm_t *dfsm)
1447{
1448 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1449 g_return_val_if_fail(dfsm->joined, CS_ERR_NOT_EXIST);
1450
1451 cs_error_t result;
1452
1453 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1454 int retries = 0;
1455loop:
1456 result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name);
89fde9ac 1457 if (result == CS_ERR_TRY_AGAIN) {
fe000966
DM
1458 nanosleep(&tvreq, NULL);
1459 ++retries;
1460 if ((retries % 10) == 0)
1461 cfs_dom_message(dfsm->log_domain, "cpg_leave retry %d", retries);
1462 goto loop;
1463 }
1464
1465 if (result != CS_OK) {
1466 cfs_dom_critical(dfsm->log_domain, "cpg_leave failed: %d", result);
1467 return result;
1468 }
1469
1470 dfsm->joined = FALSE;
1471
1472 return TRUE;
1473}
1474
1475gboolean
1476dfsm_finalize(dfsm_t *dfsm)
1477{
1478 g_return_val_if_fail(dfsm != NULL, FALSE);
1479
1480 dfsm_send_sync_message_abort(dfsm);
1481
1482 if (dfsm->joined)
1483 dfsm_leave(dfsm);
1484
1485 if (dfsm->cpg_handle) {
1486 cpg_finalize(dfsm->cpg_handle);
1487 dfsm->cpg_handle = 0;
1488 dfsm->joined = FALSE;
1489 dfsm->we_are_member = 0;
1490 }
1491
1492 return TRUE;
1493}
1494
1495void
1496dfsm_destroy(dfsm_t *dfsm)
1497{
1498 g_return_if_fail(dfsm != NULL);
1499
1500 dfsm_finalize(dfsm);
1501
1502 if (dfsm->sync_info && dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn)
1503 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
1504
1505 dfsm_free_sync_queue(dfsm);
1506
89fde9ac 1507 g_mutex_clear (&dfsm->mode_mutex);
fe000966 1508
89fde9ac 1509 g_mutex_clear (&dfsm->sync_mutex);
fe000966 1510
89fde9ac 1511 g_cond_clear (&dfsm->sync_cond);
fe000966
DM
1512
1513 if (dfsm->results)
1514 g_hash_table_destroy(dfsm->results);
1515
1516 if (dfsm->msg_queue) {
1517 dfsm_free_message_queue(dfsm);
1518 g_sequence_free(dfsm->msg_queue);
1519 }
1520
1521 if (dfsm->sync_info)
1522 g_free(dfsm->sync_info);
1523
1524 if (dfsm->cpg_handle)
1525 cpg_finalize(dfsm->cpg_handle);
1526
1527 if (dfsm->members)
1528 g_hash_table_destroy(dfsm->members);
1529
fe000966
DM
1530 g_free(dfsm);
1531}
1532
1533typedef struct {
1534 dfsm_t *dfsm;
1535} service_dfsm_private_t;
1536
1537static gboolean
1538service_dfsm_finalize(
1539 cfs_service_t *service,
1540 gpointer context)
1541{
1542 g_return_val_if_fail(service != NULL, FALSE);
1543 g_return_val_if_fail(context != NULL, FALSE);
1544
1545 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1546 dfsm_t *dfsm = private->dfsm;
1547
1548 g_return_val_if_fail(dfsm != NULL, FALSE);
1549
1550 return dfsm_finalize(dfsm);
1551}
1552
1553static int
1554service_dfsm_initialize(
1555 cfs_service_t *service,
1556 gpointer context)
1557{
1558 g_return_val_if_fail(service != NULL, -1);
1559 g_return_val_if_fail(context != NULL, -1);
1560
1561 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1562 dfsm_t *dfsm = private->dfsm;
1563
1564 g_return_val_if_fail(dfsm != NULL, -1);
1565
1566 /* serious internal error - don't try to recover */
1567 if (!dfsm_restartable(dfsm))
1568 return -1;
1569
1570 int fd = -1;
1571
1572 cs_error_t result;
1573 if ((result = dfsm_initialize(dfsm, &fd)) != CS_OK)
1574 return -1;
1575
1576 result = dfsm_join(dfsm);
1577 if (result != CS_OK) {
1578 /* we can't dispatch if not joined, so we need to finalize */
1579 dfsm_finalize(dfsm);
1580 return -1;
1581 }
1582
1583 return fd;
1584}
1585
1586static gboolean
1587service_dfsm_dispatch(
1588 cfs_service_t *service,
1589 gpointer context)
1590{
1591 g_return_val_if_fail(service != NULL, FALSE);
1592 g_return_val_if_fail(context != NULL, FALSE);
1593
1594 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1595 dfsm_t *dfsm = private->dfsm;
1596
1597 g_return_val_if_fail(dfsm != NULL, FALSE);
1598 g_return_val_if_fail(dfsm->cpg_handle != 0, FALSE);
1599
1600 cs_error_t result;
1601
89fde9ac 1602 result = dfsm_dispatch(dfsm, CS_DISPATCH_ONE);
fe000966
DM
1603 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1604 goto finalize;
1605 if (result != CS_OK)
1606 goto fail;
1607
1608 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1609 if (mode >= DFSM_ERROR_MODE_START) {
1610 if (dfsm->joined) {
1611 result = dfsm_leave(dfsm);
1612 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1613 goto finalize;
1614 if (result != CS_OK)
1615 goto finalize;
1616 } else {
1617 if (!dfsm->we_are_member)
1618 return FALSE;
1619 }
1620 }
1621
1622 return TRUE;
1623
751a3b88
TL
1624finalize:
1625 dfsm_finalize(dfsm);
fe000966
DM
1626fail:
1627 cfs_service_set_restartable(service, dfsm_restartable(dfsm));
1628 return FALSE;
fe000966
DM
1629}
1630
1631static void
1632service_dfsm_timer(
1633 cfs_service_t *service,
1634 gpointer context)
1635{
1636 g_return_if_fail(service != NULL);
1637 g_return_if_fail(context != NULL);
1638
1639 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1640 dfsm_t *dfsm = private->dfsm;
1641
1642 g_return_if_fail(dfsm != NULL);
1643
1644 dfsm_verify_request(dfsm);
1645}
1646
1647static cfs_service_callbacks_t cfs_dfsm_callbacks = {
1648 .cfs_service_initialize_fn = service_dfsm_initialize,
1649 .cfs_service_finalize_fn = service_dfsm_finalize,
1650 .cfs_service_dispatch_fn = service_dfsm_dispatch,
1651 .cfs_service_timer_fn = service_dfsm_timer,
1652};
1653
1654cfs_service_t *
1655service_dfsm_new(dfsm_t *dfsm)
1656{
1657 cfs_service_t *service;
1658
1659 g_return_val_if_fail(dfsm != NULL, NULL);
1660
1661 service_dfsm_private_t *private = g_new0(service_dfsm_private_t, 1);
1662 if (!private)
1663 return NULL;
1664
1665 private->dfsm = dfsm;
1666
1667 service = cfs_service_new(&cfs_dfsm_callbacks, dfsm->log_domain, private);
1668
1669 return service;
1670}
1671
1672void
1673service_dfsm_destroy(cfs_service_t *service)
1674{
1675 g_return_if_fail(service != NULL);
1676
1677 service_dfsm_private_t *private =
1678 (service_dfsm_private_t *)cfs_service_get_context(service);
1679
1680 g_free(private);
1681 g_free(service);
1682}
1683
1684
1685
1686