]> git.proxmox.com Git - pve-cluster.git/blame - data/src/dfsm.c
pmxcfs: update copyright in license header
[pve-cluster.git] / data / src / dfsm.c
CommitLineData
fe000966 1/*
84c98315 2 Copyright (C) 2010 - 2020 Proxmox Server Solutions GmbH
fe000966
DM
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19*/
20
21
22/* NOTE: we try to keep the CPG handle as long as possible, because
23 * calling cpg_initialize/cpg_finalize multiple times from the
24 * same process confuses corosync.
25 * Note: CS_ERR_LIBRARY is returned when corosync died
26 */
27
28#ifdef HAVE_CONFIG_H
29#include <config.h>
30#endif /* HAVE_CONFIG_H */
31
32#include <sys/types.h>
e5a5a3ea 33#include <inttypes.h>
fe000966
DM
34#include <unistd.h>
35#include <string.h>
36#include <stdlib.h>
37
38#include <corosync/corotypes.h>
39#include <corosync/cpg.h>
40#include <glib.h>
41
42#include "cfs-utils.h"
43#include "dfsm.h"
44
45static cpg_callbacks_t cpg_callbacks;
46
/* Operating mode of a dfsm instance (see dfsm_set_mode() for the log
 * message emitted on entering each mode).
 */
typedef enum {
	DFSM_MODE_START         = 0,	/* start cluster connection */
	DFSM_MODE_START_SYNC    = 1,	/* starting data synchronisation */
	DFSM_MODE_SYNCED        = 2,	/* all data is up to date */
	DFSM_MODE_UPDATE        = 3,	/* waiting for updates from leader */

	/* values >= 128 indicates abnormal/error conditions */
	DFSM_ERROR_MODE_START   = 128,	/* first value of the error range */
	DFSM_MODE_LEAVE         = 253,	/* leaving CPG group */
	DFSM_MODE_VERSION_ERROR = 254,	/* detected newer protocol */
	DFSM_MODE_ERROR         = 255,	/* serious internal error */
} dfsm_mode_t;
59
/* Message type stored in dfsm_message_header_t.type.
 * DFSM_MESSAGE_NORMAL carries application payload; every other value is
 * a state related message that is prefixed with a sync epoch.
 */
typedef enum {
	DFSM_MESSAGE_NORMAL = 0,
	DFSM_MESSAGE_SYNC_START = 1,
	DFSM_MESSAGE_STATE = 2,
	DFSM_MESSAGE_UPDATE = 3,
	DFSM_MESSAGE_UPDATE_COMPLETE = 4,
	DFSM_MESSAGE_VERIFY_REQUEST = 5,
	DFSM_MESSAGE_VERIFY = 6,
} dfsm_message_t;

/* True when mt is one of the state message types (SYNC_START..VERIFY).
 * The argument is parenthesized so that expression arguments (for
 * example a conditional expression) are evaluated as a unit.
 * Note: mt is evaluated twice, so avoid arguments with side effects.
 */
#define DFSM_VALID_STATE_MESSAGE(mt) \
	((mt) >= DFSM_MESSAGE_SYNC_START && (mt) <= DFSM_MESSAGE_VERIFY)
71
/* Common header found at the start of every dfsm message. */
typedef struct {
	uint16_t type;			/* a dfsm_message_t value */
	uint16_t subtype;		/* application message type (normal messages), else 0 */
	uint32_t protocol_version;	/* sender's protocol version */
	uint32_t time;			/* time(NULL) at the sender, truncated to 32 bit */
	uint32_t reserved;		/* always sent as 0 */
} dfsm_message_header_t;
79
/* Identifies one synchronisation round; compared bytewise with memcmp(). */
typedef struct {
	uint32_t epoch;		/* per process counter (not globally unique) */
	uint32_t time;		/* time(NULL) when the epoch was created */
	uint32_t nodeid;	/* node id of the process that created the epoch */
	uint32_t pid;		/* pid of the process that created the epoch */
} dfsm_sync_epoch_t;
86
87typedef struct {
88 dfsm_message_header_t base;
89 dfsm_sync_epoch_t epoch;
90} dfsm_message_state_header_t;
91
92typedef struct {
93 dfsm_message_header_t base;
94 uint64_t count;
95} dfsm_message_normal_header_t;
96
/* A received message that was queued instead of being delivered. */
typedef struct {
	uint32_t nodeid;	/* sender node id */
	uint32_t pid;		/* sender pid */
	uint64_t msg_count;	/* counter from the normal message header */
	void *msg;		/* heap copy of the raw message, freed with g_free() */
	int msg_len;		// fixme: unsigned?
} dfsm_queued_message_t;
104
105struct dfsm {
6392e29a 106 const char *log_domain;
fe000966
DM
107 cpg_callbacks_t *cpg_callbacks;
108 dfsm_callbacks_t *dfsm_callbacks;
109 cpg_handle_t cpg_handle;
9dd86620 110 GMutex cpg_mutex;
fe000966
DM
111 struct cpg_name cpg_group_name;
112 uint32_t nodeid;
113 uint32_t pid;
114 int we_are_member;
115
116 guint32 protocol_version;
117 gpointer data;
118
119 gboolean joined;
120
121 /* mode is protected with mode_mutex */
89fde9ac 122 GMutex mode_mutex;
fe000966
DM
123 dfsm_mode_t mode;
124
125 GHashTable *members; /* contains dfsm_node_info_t pointers */
126 dfsm_sync_info_t *sync_info;
127 uint32_t local_epoch_counter;
128 dfsm_sync_epoch_t sync_epoch;
129 uint32_t lowest_nodeid;
130 GSequence *msg_queue;
131 GList *sync_queue;
132
133 /* synchrounous message transmission, protected with sync_mutex */
89fde9ac
DM
134 GMutex sync_mutex;
135 GCond sync_cond;
fe000966
DM
136 GHashTable *results;
137 uint64_t msgcount;
138 uint64_t msgcount_rcvd;
139
140 /* state verification */
141 guchar csum[32];
142 dfsm_sync_epoch_t csum_epoch;
143 uint64_t csum_id;
144 uint64_t csum_counter;
145};
146
147static gboolean dfsm_deliver_queue(dfsm_t *dfsm);
148static gboolean dfsm_deliver_sync_queue(dfsm_t *dfsm);
149
150gboolean
151dfsm_nodeid_is_local(
152 dfsm_t *dfsm,
153 uint32_t nodeid,
154 uint32_t pid)
155{
156 g_return_val_if_fail(dfsm != NULL, FALSE);
157
158 return (nodeid == dfsm->nodeid && pid == dfsm->pid);
159}
160
161
162static void
163dfsm_send_sync_message_abort(dfsm_t *dfsm)
164{
165 g_return_if_fail(dfsm != NULL);
fe000966 166
89fde9ac 167 g_mutex_lock (&dfsm->sync_mutex);
fe000966 168 dfsm->msgcount_rcvd = dfsm->msgcount;
89fde9ac
DM
169 g_cond_broadcast (&dfsm->sync_cond);
170 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
171}
172
173static void
174dfsm_record_local_result(
175 dfsm_t *dfsm,
176 uint64_t msg_count,
177 int msg_result,
178 gboolean processed)
179{
180 g_return_if_fail(dfsm != NULL);
fe000966
DM
181 g_return_if_fail(dfsm->results != NULL);
182
89fde9ac 183 g_mutex_lock (&dfsm->sync_mutex);
fe000966
DM
184 dfsm_result_t *rp = (dfsm_result_t *)g_hash_table_lookup(dfsm->results, &msg_count);
185 if (rp) {
186 rp->result = msg_result;
187 rp->processed = processed;
188 }
189 dfsm->msgcount_rcvd = msg_count;
89fde9ac
DM
190 g_cond_broadcast (&dfsm->sync_cond);
191 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
192}
193
89fde9ac 194static cs_error_t
fe000966
DM
195dfsm_send_message_full(
196 dfsm_t *dfsm,
197 struct iovec *iov,
198 unsigned int len,
199 int retry)
200{
201 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
202 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
203
204 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
89fde9ac 205 cs_error_t result;
fe000966
DM
206 int retries = 0;
207loop:
9dd86620 208 g_mutex_lock (&dfsm->cpg_mutex);
fe000966 209 result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
9dd86620 210 g_mutex_unlock (&dfsm->cpg_mutex);
89fde9ac 211 if (retry && result == CS_ERR_TRY_AGAIN) {
fe000966
DM
212 nanosleep(&tvreq, NULL);
213 ++retries;
214 if ((retries % 10) == 0)
215 cfs_dom_message(dfsm->log_domain, "cpg_send_message retry %d", retries);
216 if (retries < 100)
217 goto loop;
218 }
219
220 if (retries)
221 cfs_dom_message(dfsm->log_domain, "cpg_send_message retried %d times", retries);
222
223 if (result != CS_OK &&
89fde9ac 224 (!retry || result != CS_ERR_TRY_AGAIN))
fe000966
DM
225 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
226
227 return result;
228}
229
89fde9ac 230static cs_error_t
fe000966
DM
231dfsm_send_state_message_full(
232 dfsm_t *dfsm,
233 uint16_t type,
234 struct iovec *iov,
235 unsigned int len)
236{
237 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
238 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type), CS_ERR_INVALID_PARAM);
239 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
240
241 dfsm_message_state_header_t header;
242 header.base.type = type;
243 header.base.subtype = 0;
244 header.base.protocol_version = dfsm->protocol_version;
245 header.base.time = time(NULL);
246 header.base.reserved = 0;
247
248 header.epoch = dfsm->sync_epoch;
249
250 struct iovec real_iov[len + 1];
251
252 real_iov[0].iov_base = (char *)&header;
253 real_iov[0].iov_len = sizeof(header);
254
255 for (int i = 0; i < len; i++)
256 real_iov[i + 1] = iov[i];
257
258 return dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
259}
260
89fde9ac 261cs_error_t
fe000966
DM
262dfsm_send_update(
263 dfsm_t *dfsm,
264 struct iovec *iov,
265 unsigned int len)
266{
267 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE, iov, len);
268}
269
89fde9ac 270cs_error_t
fe000966
DM
271dfsm_send_update_complete(dfsm_t *dfsm)
272{
273 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE_COMPLETE, NULL, 0);
274}
275
276
89fde9ac 277cs_error_t
fe000966
DM
278dfsm_send_message(
279 dfsm_t *dfsm,
280 uint16_t msgtype,
281 struct iovec *iov,
282 int len)
283{
284 return dfsm_send_message_sync(dfsm, msgtype, iov, len, NULL);
285}
286
89fde9ac 287cs_error_t
fe000966
DM
288dfsm_send_message_sync(
289 dfsm_t *dfsm,
290 uint16_t msgtype,
291 struct iovec *iov,
292 int len,
293 dfsm_result_t *rp)
294{
295 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
fe000966
DM
296 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
297
89fde9ac 298 g_mutex_lock (&dfsm->sync_mutex);
fe000966
DM
299 /* note: hold lock until message is sent - to guarantee ordering */
300 uint64_t msgcount = ++dfsm->msgcount;
301 if (rp) {
302 rp->msgcount = msgcount;
303 rp->processed = 0;
304 g_hash_table_replace(dfsm->results, &rp->msgcount, rp);
305 }
306
307 dfsm_message_normal_header_t header;
308 header.base.type = DFSM_MESSAGE_NORMAL;
309 header.base.subtype = msgtype;
310 header.base.protocol_version = dfsm->protocol_version;
311 header.base.time = time(NULL);
312 header.base.reserved = 0;
313 header.count = msgcount;
314
315 struct iovec real_iov[len + 1];
316
317 real_iov[0].iov_base = (char *)&header;
318 real_iov[0].iov_len = sizeof(header);
319
320 for (int i = 0; i < len; i++)
321 real_iov[i + 1] = iov[i];
322
89fde9ac 323 cs_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
fe000966 324
89fde9ac 325 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
326
327 if (result != CS_OK) {
328 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
329
330 if (rp) {
89fde9ac 331 g_mutex_lock (&dfsm->sync_mutex);
fe000966 332 g_hash_table_remove(dfsm->results, &rp->msgcount);
89fde9ac 333 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
334 }
335 return result;
336 }
337
338 if (rp) {
89fde9ac 339 g_mutex_lock (&dfsm->sync_mutex);
fe000966
DM
340
341 while (dfsm->msgcount_rcvd < msgcount)
89fde9ac 342 g_cond_wait (&dfsm->sync_cond, &dfsm->sync_mutex);
fe000966
DM
343
344
345 g_hash_table_remove(dfsm->results, &rp->msgcount);
346
89fde9ac 347 g_mutex_unlock (&dfsm->sync_mutex);
fe000966
DM
348
349 return rp->processed ? CS_OK : CS_ERR_FAILED_OPERATION;
350 }
351
352 return CS_OK;
353}
354
355static gboolean
356dfsm_send_checksum(dfsm_t *dfsm)
357{
358 g_return_val_if_fail(dfsm != NULL, FALSE);
359
360 int len = 2;
361 struct iovec iov[len];
362
363 iov[0].iov_base = (char *)&dfsm->csum_id;
364 iov[0].iov_len = sizeof(dfsm->csum_id);
365 iov[1].iov_base = dfsm->csum;
366 iov[1].iov_len = sizeof(dfsm->csum);
367
368 gboolean res = (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY, iov, len) == CS_OK);
369
370 return res;
371}
372
373static void
374dfsm_free_queue_entry(gpointer data)
375{
376 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)data;
377 g_free (qm->msg);
378 g_free (qm);
379}
380
381static void
382dfsm_free_message_queue(dfsm_t *dfsm)
383{
384 g_return_if_fail(dfsm != NULL);
385 g_return_if_fail(dfsm->msg_queue != NULL);
386
387 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
388 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
389 while (iter != end) {
390 GSequenceIter *cur = iter;
391 iter = g_sequence_iter_next(iter);
392 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
393 g_sequence_get(cur);
394 dfsm_free_queue_entry(qm);
395 g_sequence_remove(cur);
396 }
397}
398
399static void
400dfsm_free_sync_queue(dfsm_t *dfsm)
401{
402 g_return_if_fail(dfsm != NULL);
403
404 GList *iter = dfsm->sync_queue;
405 while (iter) {
406 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
407 iter = g_list_next(iter);
408 dfsm_free_queue_entry(qm);
409 }
410
411 g_list_free(dfsm->sync_queue);
412 dfsm->sync_queue = NULL;
413}
414
415static gint
416message_queue_sort_fn(
417 gconstpointer a,
418 gconstpointer b,
419 gpointer user_data)
420{
421 return ((dfsm_queued_message_t *)a)->msg_count -
422 ((dfsm_queued_message_t *)b)->msg_count;
423}
424
425static dfsm_node_info_t *
426dfsm_node_info_lookup(
427 dfsm_t *dfsm,
428 uint32_t nodeid,
429 uint32_t pid)
430{
431 g_return_val_if_fail(dfsm != NULL, NULL);
432 g_return_val_if_fail(dfsm->members != NULL, NULL);
433
434 dfsm_node_info_t info = { .nodeid = nodeid, .pid = pid };
435
436 return (dfsm_node_info_t *)g_hash_table_lookup(dfsm->members, &info);
437}
438
439static dfsm_queued_message_t *
440dfsm_queue_add_message(
441 dfsm_t *dfsm,
442 uint32_t nodeid,
443 uint32_t pid,
444 uint64_t msg_count,
445 const void *msg,
446 size_t msg_len)
447{
448 g_return_val_if_fail(dfsm != NULL, NULL);
449 g_return_val_if_fail(msg != NULL, NULL);
450 g_return_val_if_fail(msg_len != 0, NULL);
451
452 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, nodeid, pid);
453 if (!ni) {
454 cfs_dom_critical(dfsm->log_domain, "dfsm_node_info_lookup failed");
455 return NULL;
456 }
457
458 dfsm_queued_message_t *qm = g_new0(dfsm_queued_message_t, 1);
459 g_return_val_if_fail(qm != NULL, NULL);
460
461 qm->nodeid = nodeid;
462 qm->pid = pid;
463 qm->msg = g_memdup (msg, msg_len);
464 qm->msg_len = msg_len;
465 qm->msg_count = msg_count;
466
467 if (dfsm->mode == DFSM_MODE_UPDATE && ni->synced) {
468 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
469 } else {
470 /* NOTE: we only need to sort the queue because we resend all
471 * queued messages sometimes.
472 */
473 g_sequence_insert_sorted(dfsm->msg_queue, qm, message_queue_sort_fn, NULL);
474 }
475
476 return qm;
477}
478
479static guint
480dfsm_sync_info_hash(gconstpointer key)
481{
482 dfsm_node_info_t *info = (dfsm_node_info_t *)key;
483
484 return g_int_hash(&info->nodeid) + g_int_hash(&info->pid);
485}
486
487static gboolean
488dfsm_sync_info_equal(
489 gconstpointer v1,
490 gconstpointer v2)
491{
492 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
493 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
494
495 if (info1->nodeid == info2->nodeid &&
496 info1->pid == info2->pid)
497 return TRUE;
498
499 return FALSE;
500}
501
502static int
503dfsm_sync_info_compare(
504 gconstpointer v1,
505 gconstpointer v2)
506{
507 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
508 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
509
510 if (info1->nodeid != info2->nodeid)
511 return info1->nodeid - info2->nodeid;
512
513 return info1->pid - info2->pid;
514}
515
516static void
517dfsm_set_mode(
518 dfsm_t *dfsm,
519 dfsm_mode_t new_mode)
520{
521 g_return_if_fail(dfsm != NULL);
522
523 cfs_debug("dfsm_set_mode - set mode to %d", new_mode);
524
525 int changed = 0;
89fde9ac 526 g_mutex_lock (&dfsm->mode_mutex);
fe000966
DM
527 if (dfsm->mode != new_mode) {
528 if (new_mode < DFSM_ERROR_MODE_START ||
529 (dfsm->mode < DFSM_ERROR_MODE_START || new_mode >= dfsm->mode)) {
530 dfsm->mode = new_mode;
531 changed = 1;
532 }
533 }
89fde9ac 534 g_mutex_unlock (&dfsm->mode_mutex);
fe000966
DM
535
536 if (!changed)
537 return;
538
539 if (new_mode == DFSM_MODE_START) {
540 cfs_dom_message(dfsm->log_domain, "start cluster connection");
541 } else if (new_mode == DFSM_MODE_START_SYNC) {
542 cfs_dom_message(dfsm->log_domain, "starting data syncronisation");
543 } else if (new_mode == DFSM_MODE_SYNCED) {
544 cfs_dom_message(dfsm->log_domain, "all data is up to date");
545 if (dfsm->dfsm_callbacks->dfsm_synced_fn)
546 dfsm->dfsm_callbacks->dfsm_synced_fn(dfsm);
547 } else if (new_mode == DFSM_MODE_UPDATE) {
548 cfs_dom_message(dfsm->log_domain, "waiting for updates from leader");
549 } else if (new_mode == DFSM_MODE_LEAVE) {
550 cfs_dom_critical(dfsm->log_domain, "leaving CPG group");
551 } else if (new_mode == DFSM_MODE_ERROR) {
552 cfs_dom_critical(dfsm->log_domain, "serious internal error - stop cluster connection");
553 } else if (new_mode == DFSM_MODE_VERSION_ERROR) {
554 cfs_dom_critical(dfsm->log_domain, "detected newer protocol - please update this node");
555 }
556}
557
558static dfsm_mode_t
559dfsm_get_mode(dfsm_t *dfsm)
560{
561 g_return_val_if_fail(dfsm != NULL, DFSM_MODE_ERROR);
562
89fde9ac 563 g_mutex_lock (&dfsm->mode_mutex);
fe000966 564 dfsm_mode_t mode = dfsm->mode;
89fde9ac 565 g_mutex_unlock (&dfsm->mode_mutex);
fe000966
DM
566
567 return mode;
568}
569
570gboolean
571dfsm_restartable(dfsm_t *dfsm)
572{
573 dfsm_mode_t mode = dfsm_get_mode(dfsm);
574
575 return !(mode == DFSM_MODE_ERROR ||
576 mode == DFSM_MODE_VERSION_ERROR);
577}
578
579void
580dfsm_set_errormode(dfsm_t *dfsm)
581{
582 dfsm_set_mode(dfsm, DFSM_MODE_ERROR);
583}
584
585static void
586dfsm_release_sync_resources(
587 dfsm_t *dfsm,
588 const struct cpg_address *member_list,
589 size_t member_list_entries)
590{
591 g_return_if_fail(dfsm != NULL);
592 g_return_if_fail(dfsm->members != NULL);
593 g_return_if_fail(!member_list_entries || member_list != NULL);
594
595 cfs_debug("enter dfsm_release_sync_resources");
596
597 if (dfsm->sync_info) {
598
599 if (dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn) {
600 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
601 dfsm->sync_info->data = NULL;
602 }
603
604 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
605 if (dfsm->sync_info->nodes[i].state) {
606 g_free(dfsm->sync_info->nodes[i].state);
607 dfsm->sync_info->nodes[i].state = NULL;
608 dfsm->sync_info->nodes[i].state_len = 0;
609 }
610 }
611 }
612
613 if (member_list) {
614
615 g_hash_table_remove_all(dfsm->members);
616
617 if (dfsm->sync_info)
618 g_free(dfsm->sync_info);
619
620 int size = sizeof(dfsm_sync_info_t) +
621 member_list_entries*sizeof(dfsm_sync_info_t);
622 dfsm_sync_info_t *sync_info = dfsm->sync_info = g_malloc0(size);
623 sync_info->node_count = member_list_entries;
624
625 for (int i = 0; i < member_list_entries; i++) {
626 sync_info->nodes[i].nodeid = member_list[i].nodeid;
627 sync_info->nodes[i].pid = member_list[i].pid;
628 }
629
630 qsort(sync_info->nodes, member_list_entries, sizeof(dfsm_node_info_t),
631 dfsm_sync_info_compare);
632
633 for (int i = 0; i < member_list_entries; i++) {
634 dfsm_node_info_t *info = &sync_info->nodes[i];
635 g_hash_table_insert(dfsm->members, info, info);
636 if (info->nodeid == dfsm->nodeid && info->pid == dfsm->pid)
637 sync_info->local = info;
638 }
639 }
640}
641
642static void
643dfsm_cpg_deliver_callback(
644 cpg_handle_t handle,
645 const struct cpg_name *group_name,
646 uint32_t nodeid,
647 uint32_t pid,
648 void *msg,
649 size_t msg_len)
650{
651 cs_error_t result;
652
653 dfsm_t *dfsm = NULL;
654 result = cpg_context_get(handle, (gpointer *)&dfsm);
655 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
84e1bd35 656 cfs_critical("cpg_context_get error: %d (%p)", result, (void *) dfsm);
fe000966
DM
657 return; /* we have no valid dfsm pointer, so we can just ignore this */
658 }
659 dfsm_mode_t mode = dfsm_get_mode(dfsm);
660
661 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
662
663 if (mode >= DFSM_ERROR_MODE_START) {
664 cfs_dom_debug(dfsm->log_domain, "error mode - ignoring message");
665 goto leave;
666 }
667
668 if (!dfsm->sync_info) {
669 cfs_dom_critical(dfsm->log_domain, "no dfsm_sync_info - internal error");
670 goto leave;
671 }
672
673 if (msg_len < sizeof(dfsm_message_header_t)) {
e5a5a3ea 674 cfs_dom_critical(dfsm->log_domain, "received short message (%zd bytes)", msg_len);
fe000966
DM
675 goto leave;
676 }
677
678 dfsm_message_header_t *base_header = (dfsm_message_header_t *)msg;
679
680 if (base_header->protocol_version > dfsm->protocol_version) {
681 cfs_dom_critical(dfsm->log_domain, "received message with protocol version %d",
682 base_header->protocol_version);
683 dfsm_set_mode(dfsm, DFSM_MODE_VERSION_ERROR);
684 return;
685 } else if (base_header->protocol_version < dfsm->protocol_version) {
686 cfs_dom_message(dfsm->log_domain, "ignore message with wrong protocol version %d",
687 base_header->protocol_version);
688 return;
689 }
690
691 if (base_header->type == DFSM_MESSAGE_NORMAL) {
692
693 dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
694
695 if (msg_len < sizeof(dfsm_message_normal_header_t)) {
e5a5a3ea 696 cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %zd bytes)",
fe000966
DM
697 base_header->type, base_header->subtype, msg_len);
698 goto leave;
699 }
700
701 if (mode != DFSM_MODE_SYNCED) {
e5a5a3ea 702 cfs_dom_debug(dfsm->log_domain, "queue message %" PRIu64 " (subtype = %d, length = %zd)",
fe000966
DM
703 header->count, base_header->subtype, msg_len);
704
705 if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
706 goto leave;
707 } else {
708
709 int msg_res = -1;
710 int res = dfsm->dfsm_callbacks->dfsm_deliver_fn(
711 dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype,
975f9b0c 712 base_header->time, (uint8_t *)msg + sizeof(dfsm_message_normal_header_t),
fe000966
DM
713 msg_len - sizeof(dfsm_message_normal_header_t));
714
715 if (nodeid == dfsm->nodeid && pid == dfsm->pid)
716 dfsm_record_local_result(dfsm, header->count, msg_res, res);
717
718 if (res < 0)
719 goto leave;
720 }
721
722 return;
723 }
724
725 /* state related messages
726 * we needs right epoch - else we simply discard the message
727 */
728
729 dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
730
731 if (msg_len < sizeof(dfsm_message_state_header_t)) {
e5a5a3ea 732 cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %zd bytes)",
fe000966
DM
733 base_header->type, base_header->subtype, msg_len);
734 goto leave;
735 }
736
737 if (base_header->type != DFSM_MESSAGE_SYNC_START &&
738 (memcmp(&header->epoch, &dfsm->sync_epoch, sizeof(dfsm_sync_epoch_t)) != 0)) {
739 cfs_dom_debug(dfsm->log_domain, "ignore message (msg_type == %d) with "
740 "wrong epoch (epoch %d/%d/%08X)", base_header->type,
741 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
742 return;
743 }
744
975f9b0c 745 msg = (uint8_t *) msg + sizeof(dfsm_message_state_header_t);
fe000966
DM
746 msg_len -= sizeof(dfsm_message_state_header_t);
747
748 if (mode == DFSM_MODE_SYNCED) {
749 if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
750
751 for (int i = 0; i < dfsm->sync_info->node_count; i++)
752 dfsm->sync_info->nodes[i].synced = 1;
753
754 if (!dfsm_deliver_queue(dfsm))
755 goto leave;
756
757 return;
758
759 } else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
760
761 if (msg_len != sizeof(dfsm->csum_counter)) {
e5a5a3ea 762 cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%zd bytes) form node %d/%d", msg_len, nodeid, pid);
fe000966
DM
763 goto leave;
764 }
765
766 uint64_t csum_id = *((uint64_t *)msg);
975f9b0c 767 msg = (uint8_t *) msg + 8; msg_len -= 8;
fe000966 768
e5a5a3ea 769 cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016" PRIX64, nodeid, csum_id);
fe000966
DM
770
771 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
772 if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
773 dfsm, dfsm->data, dfsm->csum, sizeof(dfsm->csum))) {
774 cfs_dom_critical(dfsm->log_domain, "unable to compute data checksum");
775 goto leave;
776 }
777
778 dfsm->csum_epoch = header->epoch;
779 dfsm->csum_id = csum_id;
780
781 if (nodeid == dfsm->nodeid && pid == dfsm->pid) {
782 if (!dfsm_send_checksum(dfsm))
783 goto leave;
784 }
785 }
786
787 return;
788
789 } else if (base_header->type == DFSM_MESSAGE_VERIFY) {
790
791 cfs_dom_debug(dfsm->log_domain, "received verify message");
792
793 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
794
795 if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
e5a5a3ea 796 cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%zd bytes)", msg_len);
fe000966
DM
797 goto leave;
798 }
799
800 uint64_t csum_id = *((uint64_t *)msg);
975f9b0c 801 msg = (uint8_t *) msg + 8; msg_len -= 8;
fe000966
DM
802
803 if (dfsm->csum_id == csum_id &&
804 (memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
805 if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
e5a5a3ea 806 cfs_dom_critical(dfsm->log_domain, "wrong checksum %016" PRIX64 " != %016" PRIX64 " - restarting",
fe000966
DM
807 *(uint64_t *)msg, *(uint64_t *)dfsm->csum);
808 goto leave;
809 } else {
810 cfs_dom_message(dfsm->log_domain, "data verification successful");
811 }
812 } else {
813 cfs_dom_message(dfsm->log_domain, "skip verification - no checksum saved");
814 }
815 }
816
817 return;
818
819 } else {
820 /* ignore (we already got all required updates, or we are leader) */
821 cfs_dom_debug(dfsm->log_domain, "ignore state sync message %d",
822 base_header->type);
823 return;
824 }
825
826 } else if (mode == DFSM_MODE_START_SYNC) {
827
828 if (base_header->type == DFSM_MESSAGE_SYNC_START) {
829
830 if (nodeid != dfsm->lowest_nodeid) {
831 cfs_dom_critical(dfsm->log_domain, "ignore sync request from wrong member %d/%d",
832 nodeid, pid);
833 }
834
835 cfs_dom_message(dfsm->log_domain, "received sync request (epoch %d/%d/%08X)",
836 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
837
838 dfsm->sync_epoch = header->epoch;
839
840 dfsm_release_sync_resources(dfsm, NULL, 0);
841
842 unsigned int state_len = 0;
843 gpointer state = NULL;
844
845 state = dfsm->dfsm_callbacks->dfsm_get_state_fn(dfsm, dfsm->data, &state_len);
846
847 if (!(state && state_len)) {
848 cfs_dom_critical(dfsm->log_domain, "dfsm_get_state_fn failed");
849 goto leave;
850 }
851
852 struct iovec iov[1];
853 iov[0].iov_base = state;
854 iov[0].iov_len = state_len;
855
856 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_STATE, iov, 1);
857
858 if (state)
859 g_free(state);
860
861 if (result != CS_OK)
862 goto leave;
863
864 return;
865
866 } else if (base_header->type == DFSM_MESSAGE_STATE) {
867
868 dfsm_node_info_t *ni;
869
870 if (!(ni = dfsm_node_info_lookup(dfsm, nodeid, pid))) {
871 cfs_dom_critical(dfsm->log_domain, "received state for non-member %d/%d", nodeid, pid);
872 goto leave;
873 }
874
875 if (ni->state) {
876 cfs_dom_critical(dfsm->log_domain, "received duplicate state for member %d/%d", nodeid, pid);
877 goto leave;
878 }
879
880 ni->state = g_memdup(msg, msg_len);
881 ni->state_len = msg_len;
882
883 int received_all = 1;
884 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
885 if (!dfsm->sync_info->nodes[i].state) {
886 received_all = 0;
887 break;
888 }
889 }
890
891 if (received_all) {
892 cfs_dom_message(dfsm->log_domain, "received all states");
893
894 int res = dfsm->dfsm_callbacks->dfsm_process_state_update_fn(dfsm, dfsm->data, dfsm->sync_info);
895 if (res < 0)
896 goto leave;
897
898 if (dfsm->sync_info->local->synced) {
899 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
900 dfsm_release_sync_resources(dfsm, NULL, 0);
901
902 if (!dfsm_deliver_queue(dfsm))
903 goto leave;
904
905 } else {
906 dfsm_set_mode(dfsm, DFSM_MODE_UPDATE);
907
908 if (!dfsm_deliver_queue(dfsm))
909 goto leave;
910 }
911
912 }
913
914 return;
915 }
916
917 } else if (mode == DFSM_MODE_UPDATE) {
918
919 if (base_header->type == DFSM_MESSAGE_UPDATE) {
920
921 int res = dfsm->dfsm_callbacks->dfsm_process_update_fn(
922 dfsm, dfsm->data, dfsm->sync_info, nodeid, pid, msg, msg_len);
923
924 if (res < 0)
925 goto leave;
926
927 return;
928
929 } else if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
930
931
932 int res = dfsm->dfsm_callbacks->dfsm_commit_fn(dfsm, dfsm->data, dfsm->sync_info);
933
934 if (res < 0)
935 goto leave;
936
937 for (int i = 0; i < dfsm->sync_info->node_count; i++)
938 dfsm->sync_info->nodes[i].synced = 1;
939
940 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
941
942 if (!dfsm_deliver_sync_queue(dfsm))
943 goto leave;
944
945 if (!dfsm_deliver_queue(dfsm))
946 goto leave;
947
948 dfsm_release_sync_resources(dfsm, NULL, 0);
949
950 return;
951 }
952
953 } else {
954 cfs_dom_critical(dfsm->log_domain, "internal error - unknown mode %d", mode);
955 goto leave;
956 }
957
958 if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST ||
959 base_header->type == DFSM_MESSAGE_VERIFY) {
960
961 cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
962
963 } else {
e5a5a3ea 964 cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %zd bytes)",
fe000966
DM
965 base_header->type, msg_len);
966 goto leave;
967 }
968
969leave:
970 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
971 dfsm_release_sync_resources(dfsm, NULL, 0);
972 return;
973}
974
975static gboolean
976dfsm_resend_queue(dfsm_t *dfsm)
977{
978 g_return_val_if_fail(dfsm != NULL, FALSE);
979 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
980
981 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
982 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
983 gboolean res = TRUE;
984
985 while (iter != end) {
986 GSequenceIter *cur = iter;
987 iter = g_sequence_iter_next(iter);
988
989 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
990 g_sequence_get(cur);
991
992 if (qm->nodeid == dfsm->nodeid && qm->pid == dfsm->pid) {
89fde9ac 993 cs_error_t result;
fe000966
DM
994 struct iovec iov[1];
995 iov[0].iov_base = qm->msg;
996 iov[0].iov_len = qm->msg_len;
997
998 if ((result = dfsm_send_message_full(dfsm, iov, 1, 1)) != CS_OK) {
999 res = FALSE;
1000 break;
1001 }
1002 }
1003 }
1004
1005 dfsm_free_message_queue(dfsm);
1006
1007 return res;
1008}
1009
1010static gboolean
1011dfsm_deliver_sync_queue(dfsm_t *dfsm)
1012{
1013 g_return_val_if_fail(dfsm != NULL, FALSE);
1014
1015 if (!dfsm->sync_queue)
1016 return TRUE;
1017
1018 gboolean res = TRUE;
1019
1020 // fixme: cfs_debug
1021 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__,
1022 g_list_length(dfsm->sync_queue));
1023
1024 GList *iter = dfsm->sync_queue;
1025 while (iter) {
1026 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
1027 iter = g_list_next(iter);
1028
1029 if (res && dfsm->mode == DFSM_MODE_SYNCED) {
1030 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1031 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1032 } else {
1033 res = FALSE;
1034 }
1035
1036 dfsm_free_queue_entry(qm);
1037 }
1038 g_list_free(dfsm->sync_queue);
1039 dfsm->sync_queue = NULL;
1040
1041 return res;
1042}
1043
1044static gboolean
1045dfsm_deliver_queue(dfsm_t *dfsm)
1046{
1047 g_return_val_if_fail(dfsm != NULL, FALSE);
1048 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
1049
1050 int qlen = g_sequence_get_length(dfsm->msg_queue);
1051 if (!qlen)
1052 return TRUE;
1053
1054 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
1055 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
1056 gboolean res = TRUE;
1057
1058 // fixme: cfs_debug
1059 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, qlen);
1060
1061 while (iter != end) {
1062 GSequenceIter *cur = iter;
1063 iter = g_sequence_iter_next(iter);
1064
1065 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
1066 g_sequence_get(cur);
1067
1068 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, qm->nodeid, qm->pid);
1069 if (!ni) {
d3eeade6 1070 cfs_dom_message(dfsm->log_domain, "remove message from non-member %d/%d",
fe000966
DM
1071 qm->nodeid, qm->pid);
1072 dfsm_free_queue_entry(qm);
1073 g_sequence_remove(cur);
1074 continue;
1075 }
1076
1077 if (dfsm->mode == DFSM_MODE_SYNCED) {
1078 if (ni->synced) {
1079 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1080 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1081 dfsm_free_queue_entry(qm);
1082 g_sequence_remove(cur);
1083 }
1084 } else if (dfsm->mode == DFSM_MODE_UPDATE) {
1085 if (ni->synced) {
1086 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
1087 g_sequence_remove(cur);
1088 }
1089 } else {
1090 res = FALSE;
1091 break;
1092 }
1093 }
1094
1095 return res;
1096}
1097
1098static void
1099dfsm_cpg_confchg_callback(
1100 cpg_handle_t handle,
1101 const struct cpg_name *group_name,
1102 const struct cpg_address *member_list,
1103 size_t member_list_entries,
1104 const struct cpg_address *left_list,
1105 size_t left_list_entries,
1106 const struct cpg_address *joined_list,
1107 size_t joined_list_entries)
1108{
1109 cs_error_t result;
1110
1111 dfsm_t *dfsm = NULL;
1112 result = cpg_context_get(handle, (gpointer *)&dfsm);
1113 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
84e1bd35 1114 cfs_critical("cpg_context_get error: %d (%p)", result, (void *) dfsm);
fe000966
DM
1115 return; /* we have no valid dfsm pointer, so we can just ignore this */
1116 }
1117
1118 dfsm->we_are_member = 0;
1119
1120 /* create new epoch */
1121 dfsm->local_epoch_counter++;
1122 dfsm->sync_epoch.epoch = dfsm->local_epoch_counter;
1123 dfsm->sync_epoch.nodeid = dfsm->nodeid;
1124 dfsm->sync_epoch.pid = dfsm->pid;
1125 dfsm->sync_epoch.time = time(NULL);
1126
1127 /* invalidate saved checksum */
1128 dfsm->csum_id = dfsm->csum_counter;
1129 memset(&dfsm->csum_epoch, 0, sizeof(dfsm->csum_epoch));
1130
1131 dfsm_free_sync_queue(dfsm);
1132
1133 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1134
1135 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
1136
1137 if (mode >= DFSM_ERROR_MODE_START) {
1138 cfs_dom_debug(dfsm->log_domain, "already left group - ignore message");
1139 return;
1140 }
1141
1142 int lowest_nodeid = 0;
e26a1b5f 1143 GString *member_ids = g_string_new(NULL);
fe000966
DM
1144 for (int i = 0; i < member_list_entries; i++) {
1145
e26a1b5f 1146 g_string_append_printf(member_ids, i ? ", %d/%d" : "%d/%d",
fe000966
DM
1147 member_list[i].nodeid, member_list[i].pid);
1148
1149 if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid)
1150 lowest_nodeid = member_list[i].nodeid;
1151
1152 if (member_list[i].nodeid == dfsm->nodeid &&
1153 member_list[i].pid == dfsm->pid)
1154 dfsm->we_are_member = 1;
1155 }
1156
1157
1158 if ((dfsm->we_are_member || mode != DFSM_MODE_START))
e26a1b5f 1159 cfs_dom_message(dfsm->log_domain, "members: %s", member_ids->str);
fe000966 1160
e26a1b5f 1161 g_string_free(member_ids, 1);
fe000966
DM
1162
1163 dfsm->lowest_nodeid = lowest_nodeid;
1164
1165 /* NOTE: one node can be in left and joined list at the same time,
1166 so it is better to query member list. Also JOIN/LEAVE list are
1167 different on different nodes!
1168 */
1169
1170 dfsm_release_sync_resources(dfsm, member_list, member_list_entries);
1171
1172 if (!dfsm->we_are_member) {
1173 if (mode == DFSM_MODE_START) {
1174 cfs_dom_debug(dfsm->log_domain, "ignore leave message");
1175 return;
1176 }
1177 cfs_dom_message(dfsm->log_domain, "we (%d/%d) left the process group",
1178 dfsm->nodeid, dfsm->pid);
1179 goto leave;
1180 }
1181
1182 if (member_list_entries > 1) {
1183
1184 int qlen = g_sequence_get_length(dfsm->msg_queue);
1185 if (joined_list_entries && qlen) {
1186 /* we need to make sure that all members have the same queue. */
1187 cfs_dom_message(dfsm->log_domain, "queue not emtpy - resening %d messages", qlen);
1188 if (!dfsm_resend_queue(dfsm)) {
1189 cfs_dom_critical(dfsm->log_domain, "dfsm_resend_queue failed");
1190 goto leave;
1191 }
1192 }
1193
1194 dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC);
1195 if (lowest_nodeid == dfsm->nodeid) {
38fde8cc 1196 if (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0) != CS_OK) {
fe000966
DM
1197 cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message");
1198 goto leave;
1199 }
1200 }
1201 } else {
1202 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
1203 dfsm->sync_info->local->synced = 1;
1204 if (!dfsm_deliver_queue(dfsm))
1205 goto leave;
1206 }
1207
1208 if (dfsm->dfsm_callbacks->dfsm_confchg_fn)
1209 dfsm->dfsm_callbacks->dfsm_confchg_fn(dfsm, dfsm->data, member_list, member_list_entries);
1210
1211 return;
1212leave:
1213 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
1214 return;
1215}
1216
1217static cpg_callbacks_t cpg_callbacks = {
1218 .cpg_deliver_fn = dfsm_cpg_deliver_callback,
1219 .cpg_confchg_fn = dfsm_cpg_confchg_callback,
1220};
1221
1222dfsm_t *
1223dfsm_new(
1224 gpointer data,
1225 const char *group_name,
1226 const char *log_domain,
1227 guint32 protocol_version,
1228 dfsm_callbacks_t *callbacks)
1229{
1230 g_return_val_if_fail(sizeof(dfsm_message_header_t) == 16, NULL);
1231 g_return_val_if_fail(sizeof(dfsm_message_state_header_t) == 32, NULL);
1232 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t) == 24, NULL);
1233
1234 g_return_val_if_fail(callbacks != NULL, NULL);
1235 g_return_val_if_fail(callbacks->dfsm_deliver_fn != NULL, NULL);
1236
1237 g_return_val_if_fail(callbacks->dfsm_get_state_fn != NULL, NULL);
1238 g_return_val_if_fail(callbacks->dfsm_process_state_update_fn != NULL, NULL);
1239 g_return_val_if_fail(callbacks->dfsm_process_update_fn != NULL, NULL);
1240 g_return_val_if_fail(callbacks->dfsm_commit_fn != NULL, NULL);
1241
1242 dfsm_t *dfsm;
1243
1244 if ((dfsm = g_new0(dfsm_t, 1)) == NULL)
1245 return NULL;
1246
89fde9ac
DM
1247 g_mutex_init(&dfsm->sync_mutex);
1248
1249 g_cond_init(&dfsm->sync_cond);
fe000966
DM
1250
1251 if (!(dfsm->results = g_hash_table_new(g_int64_hash, g_int64_equal)))
1252 goto err;
1253
1254 if (!(dfsm->msg_queue = g_sequence_new(NULL)))
1255 goto err;
9dd86620
FG
1256
1257 g_mutex_init(&dfsm->cpg_mutex);
1258
6392e29a 1259 dfsm->log_domain = log_domain;
fe000966
DM
1260 dfsm->data = data;
1261 dfsm->mode = DFSM_MODE_START;
1262 dfsm->protocol_version = protocol_version;
1263 strcpy (dfsm->cpg_group_name.value, group_name);
1264 dfsm->cpg_group_name.length = strlen (group_name) + 1;
1265
1266 dfsm->cpg_callbacks = &cpg_callbacks;
1267 dfsm->dfsm_callbacks = callbacks;
1268
1269 dfsm->members = g_hash_table_new(dfsm_sync_info_hash, dfsm_sync_info_equal);
1270 if (!dfsm->members)
1271 goto err;
1272
89fde9ac 1273 g_mutex_init(&dfsm->mode_mutex);
fe000966
DM
1274
1275 return dfsm;
1276
1277err:
1278 dfsm_destroy(dfsm);
1279 return NULL;
1280}
1281
af2e9dd4
DM
1282gboolean
1283dfsm_is_initialized(dfsm_t *dfsm)
1284{
1285 g_return_val_if_fail(dfsm != NULL, FALSE);
1286
1287 return (dfsm->cpg_handle != 0) ? TRUE : FALSE;
1288}
1289
fe000966
DM
1290gboolean
1291dfsm_lowest_nodeid(dfsm_t *dfsm)
1292{
1293 g_return_val_if_fail(dfsm != NULL, FALSE);
1294
1295 if (dfsm->lowest_nodeid && (dfsm->lowest_nodeid == dfsm->nodeid))
1296 return TRUE;
1297
1298 return FALSE;
1299}
1300
1301cs_error_t
1302dfsm_verify_request(dfsm_t *dfsm)
1303{
1304 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1305
1306 /* only do when we have lowest nodeid */
1307 if (!dfsm->lowest_nodeid || (dfsm->lowest_nodeid != dfsm->nodeid))
1308 return CS_OK;
1309
1310 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1311 if (mode != DFSM_MODE_SYNCED)
1312 return CS_OK;
1313
1314 int len = 1;
1315 struct iovec iov[len];
1316
1317 if (dfsm->csum_counter != dfsm->csum_id) {
e5a5a3ea 1318 g_message("delay verify request %016" PRIX64, dfsm->csum_counter + 1);
fe000966
DM
1319 return CS_OK;
1320 };
1321
1322 dfsm->csum_counter++;
1323 iov[0].iov_base = (char *)&dfsm->csum_counter;
1324 iov[0].iov_len = sizeof(dfsm->csum_counter);
1325
e5a5a3ea 1326 cfs_debug("send verify request %016" PRIX64, dfsm->csum_counter);
fe000966
DM
1327
1328 cs_error_t result;
1329 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len);
1330
1331 if (result != CS_OK)
1332 cfs_dom_critical(dfsm->log_domain, "failed to send VERIFY_REQUEST message");
1333
1334 return result;
1335}
1336
1337
1338cs_error_t
1339dfsm_dispatch(
1340 dfsm_t *dfsm,
1341 cs_dispatch_flags_t dispatch_types)
1342{
1343 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1344 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_INVALID_PARAM);
1345
1346 cs_error_t result;
1347
1348 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1349 int retries = 0;
1350loop:
1351 result = cpg_dispatch(dfsm->cpg_handle, dispatch_types);
89fde9ac 1352 if (result == CS_ERR_TRY_AGAIN) {
fe000966
DM
1353 nanosleep(&tvreq, NULL);
1354 ++retries;
1355 if ((retries % 10) == 0)
1356 cfs_dom_message(dfsm->log_domain, "cpg_dispatch retry %d", retries);
1357 goto loop;
1358 }
1359
1360 if (!(result == CS_OK || result == CS_ERR_TRY_AGAIN)) {
1361 cfs_dom_critical(dfsm->log_domain, "cpg_dispatch failed: %d", result);
1362 }
1363
1364 return result;
1365}
1366
1367
1368cs_error_t
1369dfsm_initialize(dfsm_t *dfsm, int *fd)
1370{
1371 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1372 g_return_val_if_fail(fd != NULL, CS_ERR_INVALID_PARAM);
1373
1374 /* remove old messages */
1375 dfsm_free_message_queue(dfsm);
1376 dfsm_send_sync_message_abort(dfsm);
1377
1378 dfsm->joined = FALSE;
1379 dfsm->we_are_member = 0;
1380
1381 dfsm_set_mode(dfsm, DFSM_MODE_START);
1382
1383 cs_error_t result;
1384
1385 if (dfsm->cpg_handle == 0) {
1386 if ((result = cpg_initialize(&dfsm->cpg_handle, dfsm->cpg_callbacks)) != CS_OK) {
1387 cfs_dom_critical(dfsm->log_domain, "cpg_initialize failed: %d", result);
751a3b88 1388 goto err_no_finalize;
fe000966
DM
1389 }
1390
1391 if ((result = cpg_local_get(dfsm->cpg_handle, &dfsm->nodeid)) != CS_OK) {
1392 cfs_dom_critical(dfsm->log_domain, "cpg_local_get failed: %d", result);
751a3b88 1393 goto err_finalize;
fe000966
DM
1394 }
1395
1396 dfsm->pid = getpid();
1397
1398 result = cpg_context_set(dfsm->cpg_handle, dfsm);
1399 if (result != CS_OK) {
1400 cfs_dom_critical(dfsm->log_domain, "cpg_context_set failed: %d", result);
751a3b88 1401 goto err_finalize;
fe000966
DM
1402 }
1403 }
1404
1405 result = cpg_fd_get(dfsm->cpg_handle, fd);
1406 if (result != CS_OK) {
1407 cfs_dom_critical(dfsm->log_domain, "cpg_fd_get failed: %d", result);
751a3b88 1408 goto err_finalize;
fe000966
DM
1409 }
1410
1411 return CS_OK;
1412
751a3b88
TL
1413 err_finalize:
1414 cpg_finalize(dfsm->cpg_handle);
1415 err_no_finalize:
fe000966
DM
1416 dfsm->cpg_handle = 0;
1417 return result;
1418}
1419
1420cs_error_t
1421dfsm_join(dfsm_t *dfsm)
1422{
1423 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1424 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_LIBRARY);
1425 g_return_val_if_fail(dfsm->joined == 0, CS_ERR_EXIST);
1426
1427 cs_error_t result;
1428
1429 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1430 int retries = 0;
1431loop:
9dd86620 1432 g_mutex_lock (&dfsm->cpg_mutex);
fe000966 1433 result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name);
9dd86620 1434 g_mutex_unlock (&dfsm->cpg_mutex);
89fde9ac 1435 if (result == CS_ERR_TRY_AGAIN) {
fe000966
DM
1436 nanosleep(&tvreq, NULL);
1437 ++retries;
1438 if ((retries % 10) == 0)
1439 cfs_dom_message(dfsm->log_domain, "cpg_join retry %d", retries);
1440 goto loop;
1441 }
1442
1443 if (result != CS_OK) {
1444 cfs_dom_critical(dfsm->log_domain, "cpg_join failed: %d", result);
1445 return result;
1446 }
1447
1448 dfsm->joined = TRUE;
1449 return TRUE;
1450}
1451
1452cs_error_t
1453dfsm_leave (dfsm_t *dfsm)
1454{
1455 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1456 g_return_val_if_fail(dfsm->joined, CS_ERR_NOT_EXIST);
1457
1458 cs_error_t result;
1459
1460 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1461 int retries = 0;
1462loop:
9dd86620 1463 g_mutex_lock (&dfsm->cpg_mutex);
fe000966 1464 result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name);
9dd86620 1465 g_mutex_unlock (&dfsm->cpg_mutex);
89fde9ac 1466 if (result == CS_ERR_TRY_AGAIN) {
fe000966
DM
1467 nanosleep(&tvreq, NULL);
1468 ++retries;
1469 if ((retries % 10) == 0)
1470 cfs_dom_message(dfsm->log_domain, "cpg_leave retry %d", retries);
1471 goto loop;
1472 }
1473
1474 if (result != CS_OK) {
1475 cfs_dom_critical(dfsm->log_domain, "cpg_leave failed: %d", result);
1476 return result;
1477 }
1478
1479 dfsm->joined = FALSE;
1480
1481 return TRUE;
1482}
1483
1484gboolean
1485dfsm_finalize(dfsm_t *dfsm)
1486{
1487 g_return_val_if_fail(dfsm != NULL, FALSE);
1488
1489 dfsm_send_sync_message_abort(dfsm);
1490
1491 if (dfsm->joined)
1492 dfsm_leave(dfsm);
1493
1494 if (dfsm->cpg_handle) {
1495 cpg_finalize(dfsm->cpg_handle);
1496 dfsm->cpg_handle = 0;
1497 dfsm->joined = FALSE;
1498 dfsm->we_are_member = 0;
1499 }
1500
1501 return TRUE;
1502}
1503
1504void
1505dfsm_destroy(dfsm_t *dfsm)
1506{
1507 g_return_if_fail(dfsm != NULL);
1508
1509 dfsm_finalize(dfsm);
1510
1511 if (dfsm->sync_info && dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn)
1512 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
1513
1514 dfsm_free_sync_queue(dfsm);
1515
89fde9ac 1516 g_mutex_clear (&dfsm->mode_mutex);
fe000966 1517
89fde9ac 1518 g_mutex_clear (&dfsm->sync_mutex);
fe000966 1519
89fde9ac 1520 g_cond_clear (&dfsm->sync_cond);
9dd86620
FG
1521
1522 g_mutex_clear (&dfsm->cpg_mutex);
fe000966
DM
1523
1524 if (dfsm->results)
1525 g_hash_table_destroy(dfsm->results);
1526
1527 if (dfsm->msg_queue) {
1528 dfsm_free_message_queue(dfsm);
1529 g_sequence_free(dfsm->msg_queue);
1530 }
1531
1532 if (dfsm->sync_info)
1533 g_free(dfsm->sync_info);
1534
1535 if (dfsm->cpg_handle)
1536 cpg_finalize(dfsm->cpg_handle);
1537
1538 if (dfsm->members)
1539 g_hash_table_destroy(dfsm->members);
1540
fe000966
DM
1541 g_free(dfsm);
1542}
1543
1544typedef struct {
1545 dfsm_t *dfsm;
1546} service_dfsm_private_t;
1547
1548static gboolean
1549service_dfsm_finalize(
1550 cfs_service_t *service,
1551 gpointer context)
1552{
1553 g_return_val_if_fail(service != NULL, FALSE);
1554 g_return_val_if_fail(context != NULL, FALSE);
1555
1556 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1557 dfsm_t *dfsm = private->dfsm;
1558
1559 g_return_val_if_fail(dfsm != NULL, FALSE);
1560
1561 return dfsm_finalize(dfsm);
1562}
1563
1564static int
1565service_dfsm_initialize(
1566 cfs_service_t *service,
1567 gpointer context)
1568{
1569 g_return_val_if_fail(service != NULL, -1);
1570 g_return_val_if_fail(context != NULL, -1);
1571
1572 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1573 dfsm_t *dfsm = private->dfsm;
1574
1575 g_return_val_if_fail(dfsm != NULL, -1);
1576
1577 /* serious internal error - don't try to recover */
1578 if (!dfsm_restartable(dfsm))
1579 return -1;
1580
1581 int fd = -1;
1582
1583 cs_error_t result;
1584 if ((result = dfsm_initialize(dfsm, &fd)) != CS_OK)
1585 return -1;
1586
1587 result = dfsm_join(dfsm);
1588 if (result != CS_OK) {
1589 /* we can't dispatch if not joined, so we need to finalize */
1590 dfsm_finalize(dfsm);
1591 return -1;
1592 }
1593
1594 return fd;
1595}
1596
1597static gboolean
1598service_dfsm_dispatch(
1599 cfs_service_t *service,
1600 gpointer context)
1601{
1602 g_return_val_if_fail(service != NULL, FALSE);
1603 g_return_val_if_fail(context != NULL, FALSE);
1604
1605 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1606 dfsm_t *dfsm = private->dfsm;
1607
1608 g_return_val_if_fail(dfsm != NULL, FALSE);
1609 g_return_val_if_fail(dfsm->cpg_handle != 0, FALSE);
1610
1611 cs_error_t result;
1612
89fde9ac 1613 result = dfsm_dispatch(dfsm, CS_DISPATCH_ONE);
fe000966
DM
1614 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1615 goto finalize;
1616 if (result != CS_OK)
1617 goto fail;
1618
1619 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1620 if (mode >= DFSM_ERROR_MODE_START) {
1621 if (dfsm->joined) {
1622 result = dfsm_leave(dfsm);
1623 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1624 goto finalize;
1625 if (result != CS_OK)
1626 goto finalize;
1627 } else {
1628 if (!dfsm->we_are_member)
1629 return FALSE;
1630 }
1631 }
1632
1633 return TRUE;
1634
751a3b88
TL
1635finalize:
1636 dfsm_finalize(dfsm);
fe000966
DM
1637fail:
1638 cfs_service_set_restartable(service, dfsm_restartable(dfsm));
1639 return FALSE;
fe000966
DM
1640}
1641
1642static void
1643service_dfsm_timer(
1644 cfs_service_t *service,
1645 gpointer context)
1646{
1647 g_return_if_fail(service != NULL);
1648 g_return_if_fail(context != NULL);
1649
1650 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1651 dfsm_t *dfsm = private->dfsm;
1652
1653 g_return_if_fail(dfsm != NULL);
1654
1655 dfsm_verify_request(dfsm);
1656}
1657
1658static cfs_service_callbacks_t cfs_dfsm_callbacks = {
1659 .cfs_service_initialize_fn = service_dfsm_initialize,
1660 .cfs_service_finalize_fn = service_dfsm_finalize,
1661 .cfs_service_dispatch_fn = service_dfsm_dispatch,
1662 .cfs_service_timer_fn = service_dfsm_timer,
1663};
1664
1665cfs_service_t *
1666service_dfsm_new(dfsm_t *dfsm)
1667{
1668 cfs_service_t *service;
1669
1670 g_return_val_if_fail(dfsm != NULL, NULL);
1671
1672 service_dfsm_private_t *private = g_new0(service_dfsm_private_t, 1);
1673 if (!private)
1674 return NULL;
1675
1676 private->dfsm = dfsm;
1677
1678 service = cfs_service_new(&cfs_dfsm_callbacks, dfsm->log_domain, private);
1679
1680 return service;
1681}
1682
1683void
1684service_dfsm_destroy(cfs_service_t *service)
1685{
1686 g_return_if_fail(service != NULL);
1687
1688 service_dfsm_private_t *private =
1689 (service_dfsm_private_t *)cfs_service_get_context(service);
1690
1691 g_free(private);
1692 g_free(service);
1693}
1694
1695
1696
1697