]> git.proxmox.com Git - pve-cluster.git/blob - data/src/dfsm.c
imported from svn 'pve-cluster/trunk'
[pve-cluster.git] / data / src / dfsm.c
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21
22 /* NOTE: we try to keep the CPG handle as long as possible, because
23 * calling cpg_initialize/cpg_finalize multiple times from the
24 * same process confuses corosync.
25 * Note: CS_ERR_LIBRARY is returned when corosync died
26 */
27
28 #ifdef HAVE_CONFIG_H
29 #include <config.h>
30 #endif /* HAVE_CONFIG_H */
31
32 #include <sys/types.h>
33 #include <unistd.h>
34 #include <string.h>
35 #include <stdlib.h>
36
37 #include <corosync/corotypes.h>
38 #include <corosync/cpg.h>
39 #include <glib.h>
40
41 #include "cfs-utils.h"
42 #include "dfsm.h"
43
44 static cpg_callbacks_t cpg_callbacks;
45
46 typedef enum {
47 DFSM_MODE_START = 0,
48 DFSM_MODE_START_SYNC = 1,
49 DFSM_MODE_SYNCED = 2,
50 DFSM_MODE_UPDATE = 3,
51
52 /* values >= 128 indicates abnormal/error conditions */
53 DFSM_ERROR_MODE_START = 128,
54 DFSM_MODE_LEAVE = 253,
55 DFSM_MODE_VERSION_ERROR = 254,
56 DFSM_MODE_ERROR = 255,
57 } dfsm_mode_t;
58
/* Message types stored in dfsm_message_header_t.type. NORMAL carries
 * application payload; all others belong to state synchronisation and
 * verification and are sent with a dfsm_message_state_header_t. */
typedef enum {
	DFSM_MESSAGE_NORMAL = 0,
	DFSM_MESSAGE_SYNC_START = 1,
	DFSM_MESSAGE_STATE = 2,
	DFSM_MESSAGE_UPDATE = 3,
	DFSM_MESSAGE_UPDATE_COMPLETE = 4,
	DFSM_MESSAGE_VERIFY_REQUEST = 5,
	DFSM_MESSAGE_VERIFY = 6,
} dfsm_message_t;

/* True if mt is a state-related message type (SYNC_START .. VERIFY).
 * The argument is parenthesized so that expressions containing
 * lower-precedence operators (e.g. `a | b`, `c ? x : y`) expand correctly. */
#define DFSM_VALID_STATE_MESSAGE(mt) \
	((mt) >= DFSM_MESSAGE_SYNC_START && (mt) <= DFSM_MESSAGE_VERIFY)
70
/* Common prefix of every message this module multicasts. The header
 * structs below are passed to CPG verbatim as the first iovec, so their
 * field order and sizes form the wire format. */
typedef struct {
	uint16_t type;              /* a dfsm_message_t value */
	uint16_t subtype;           /* application message type (NORMAL) */
	uint32_t protocol_version;  /* checked against dfsm->protocol_version */
	uint32_t time;              /* sender's time(NULL) */
	uint32_t reserved;          /* always sent as 0 */
} dfsm_message_header_t;

/* Identifies one synchronisation round. */
typedef struct {
	uint32_t epoch;   /* per-process counter, not globally unique */
	uint32_t time;
	uint32_t nodeid;
	uint32_t pid;
} dfsm_sync_epoch_t;

/* Header of state messages (SYNC_START, STATE, UPDATE, ...). */
typedef struct {
	dfsm_message_header_t base;
	dfsm_sync_epoch_t epoch;
} dfsm_message_state_header_t;

/* Header of DFSM_MESSAGE_NORMAL messages; count is the sender's sequence
 * number for this message. */
typedef struct {
	dfsm_message_header_t base;
	uint64_t count;
} dfsm_message_normal_header_t;

/* A received message held back until it can be delivered. */
typedef struct {
	uint32_t nodeid;
	uint32_t pid;
	uint64_t msg_count;
	void *msg;          /* private copy of the full message */
	int msg_len;        /* FIXME: should this be unsigned? */
} dfsm_queued_message_t;
103
104 struct dfsm {
105 char *log_domain;
106 cpg_callbacks_t *cpg_callbacks;
107 dfsm_callbacks_t *dfsm_callbacks;
108 cpg_handle_t cpg_handle;
109 struct cpg_name cpg_group_name;
110 uint32_t nodeid;
111 uint32_t pid;
112 int we_are_member;
113
114 guint32 protocol_version;
115 gpointer data;
116
117 gboolean joined;
118
119 /* mode is protected with mode_mutex */
120 GMutex *mode_mutex;
121 dfsm_mode_t mode;
122
123 GHashTable *members; /* contains dfsm_node_info_t pointers */
124 dfsm_sync_info_t *sync_info;
125 uint32_t local_epoch_counter;
126 dfsm_sync_epoch_t sync_epoch;
127 uint32_t lowest_nodeid;
128 GSequence *msg_queue;
129 GList *sync_queue;
130
131 /* synchrounous message transmission, protected with sync_mutex */
132 GMutex *sync_mutex;
133 GCond *sync_cond;
134 GHashTable *results;
135 uint64_t msgcount;
136 uint64_t msgcount_rcvd;
137
138 /* state verification */
139 guchar csum[32];
140 dfsm_sync_epoch_t csum_epoch;
141 uint64_t csum_id;
142 uint64_t csum_counter;
143 };
144
145 static gboolean dfsm_deliver_queue(dfsm_t *dfsm);
146 static gboolean dfsm_deliver_sync_queue(dfsm_t *dfsm);
147
148 gboolean
149 dfsm_nodeid_is_local(
150 dfsm_t *dfsm,
151 uint32_t nodeid,
152 uint32_t pid)
153 {
154 g_return_val_if_fail(dfsm != NULL, FALSE);
155
156 return (nodeid == dfsm->nodeid && pid == dfsm->pid);
157 }
158
159
160 static void
161 dfsm_send_sync_message_abort(dfsm_t *dfsm)
162 {
163 g_return_if_fail(dfsm != NULL);
164 g_return_if_fail(dfsm->sync_mutex != NULL);
165 g_return_if_fail(dfsm->sync_cond != NULL);
166
167 g_mutex_lock (dfsm->sync_mutex);
168 dfsm->msgcount_rcvd = dfsm->msgcount;
169 g_cond_broadcast (dfsm->sync_cond);
170 g_mutex_unlock (dfsm->sync_mutex);
171 }
172
173 static void
174 dfsm_record_local_result(
175 dfsm_t *dfsm,
176 uint64_t msg_count,
177 int msg_result,
178 gboolean processed)
179 {
180 g_return_if_fail(dfsm != NULL);
181 g_return_if_fail(dfsm->sync_mutex != NULL);
182 g_return_if_fail(dfsm->sync_cond != NULL);
183 g_return_if_fail(dfsm->results != NULL);
184
185 g_mutex_lock (dfsm->sync_mutex);
186 dfsm_result_t *rp = (dfsm_result_t *)g_hash_table_lookup(dfsm->results, &msg_count);
187 if (rp) {
188 rp->result = msg_result;
189 rp->processed = processed;
190 }
191 dfsm->msgcount_rcvd = msg_count;
192 g_cond_broadcast (dfsm->sync_cond);
193 g_mutex_unlock (dfsm->sync_mutex);
194 }
195
196 static cpg_error_t
197 dfsm_send_message_full(
198 dfsm_t *dfsm,
199 struct iovec *iov,
200 unsigned int len,
201 int retry)
202 {
203 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
204 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
205
206 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
207 cpg_error_t result;
208 int retries = 0;
209 loop:
210 result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
211 if (retry && result == CPG_ERR_TRY_AGAIN) {
212 nanosleep(&tvreq, NULL);
213 ++retries;
214 if ((retries % 10) == 0)
215 cfs_dom_message(dfsm->log_domain, "cpg_send_message retry %d", retries);
216 if (retries < 100)
217 goto loop;
218 }
219
220 if (retries)
221 cfs_dom_message(dfsm->log_domain, "cpg_send_message retried %d times", retries);
222
223 if (result != CS_OK &&
224 (!retry || result != CPG_ERR_TRY_AGAIN))
225 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
226
227 return result;
228 }
229
230 static cpg_error_t
231 dfsm_send_state_message_full(
232 dfsm_t *dfsm,
233 uint16_t type,
234 struct iovec *iov,
235 unsigned int len)
236 {
237 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
238 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type), CS_ERR_INVALID_PARAM);
239 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
240
241 dfsm_message_state_header_t header;
242 header.base.type = type;
243 header.base.subtype = 0;
244 header.base.protocol_version = dfsm->protocol_version;
245 header.base.time = time(NULL);
246 header.base.reserved = 0;
247
248 header.epoch = dfsm->sync_epoch;
249
250 struct iovec real_iov[len + 1];
251
252 real_iov[0].iov_base = (char *)&header;
253 real_iov[0].iov_len = sizeof(header);
254
255 for (int i = 0; i < len; i++)
256 real_iov[i + 1] = iov[i];
257
258 return dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
259 }
260
261 cpg_error_t
262 dfsm_send_update(
263 dfsm_t *dfsm,
264 struct iovec *iov,
265 unsigned int len)
266 {
267 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE, iov, len);
268 }
269
270 cpg_error_t
271 dfsm_send_update_complete(dfsm_t *dfsm)
272 {
273 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE_COMPLETE, NULL, 0);
274 }
275
276
277 cpg_error_t
278 dfsm_send_message(
279 dfsm_t *dfsm,
280 uint16_t msgtype,
281 struct iovec *iov,
282 int len)
283 {
284 return dfsm_send_message_sync(dfsm, msgtype, iov, len, NULL);
285 }
286
287 cpg_error_t
288 dfsm_send_message_sync(
289 dfsm_t *dfsm,
290 uint16_t msgtype,
291 struct iovec *iov,
292 int len,
293 dfsm_result_t *rp)
294 {
295 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
296 g_return_val_if_fail(dfsm->sync_mutex != NULL, CS_ERR_INVALID_PARAM);
297 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
298
299 g_mutex_lock (dfsm->sync_mutex);
300 /* note: hold lock until message is sent - to guarantee ordering */
301 uint64_t msgcount = ++dfsm->msgcount;
302 if (rp) {
303 rp->msgcount = msgcount;
304 rp->processed = 0;
305 g_hash_table_replace(dfsm->results, &rp->msgcount, rp);
306 }
307
308 dfsm_message_normal_header_t header;
309 header.base.type = DFSM_MESSAGE_NORMAL;
310 header.base.subtype = msgtype;
311 header.base.protocol_version = dfsm->protocol_version;
312 header.base.time = time(NULL);
313 header.base.reserved = 0;
314 header.count = msgcount;
315
316 struct iovec real_iov[len + 1];
317
318 real_iov[0].iov_base = (char *)&header;
319 real_iov[0].iov_len = sizeof(header);
320
321 for (int i = 0; i < len; i++)
322 real_iov[i + 1] = iov[i];
323
324 cpg_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
325
326 g_mutex_unlock (dfsm->sync_mutex);
327
328 if (result != CS_OK) {
329 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
330
331 if (rp) {
332 g_mutex_lock (dfsm->sync_mutex);
333 g_hash_table_remove(dfsm->results, &rp->msgcount);
334 g_mutex_unlock (dfsm->sync_mutex);
335 }
336 return result;
337 }
338
339 if (rp) {
340 g_mutex_lock (dfsm->sync_mutex);
341
342 while (dfsm->msgcount_rcvd < msgcount)
343 g_cond_wait (dfsm->sync_cond, dfsm->sync_mutex);
344
345
346 g_hash_table_remove(dfsm->results, &rp->msgcount);
347
348 g_mutex_unlock (dfsm->sync_mutex);
349
350 return rp->processed ? CS_OK : CS_ERR_FAILED_OPERATION;
351 }
352
353 return CS_OK;
354 }
355
356 static gboolean
357 dfsm_send_checksum(dfsm_t *dfsm)
358 {
359 g_return_val_if_fail(dfsm != NULL, FALSE);
360
361 int len = 2;
362 struct iovec iov[len];
363
364 iov[0].iov_base = (char *)&dfsm->csum_id;
365 iov[0].iov_len = sizeof(dfsm->csum_id);
366 iov[1].iov_base = dfsm->csum;
367 iov[1].iov_len = sizeof(dfsm->csum);
368
369 gboolean res = (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY, iov, len) == CS_OK);
370
371 return res;
372 }
373
374 static void
375 dfsm_free_queue_entry(gpointer data)
376 {
377 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)data;
378 g_free (qm->msg);
379 g_free (qm);
380 }
381
382 static void
383 dfsm_free_message_queue(dfsm_t *dfsm)
384 {
385 g_return_if_fail(dfsm != NULL);
386 g_return_if_fail(dfsm->msg_queue != NULL);
387
388 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
389 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
390 while (iter != end) {
391 GSequenceIter *cur = iter;
392 iter = g_sequence_iter_next(iter);
393 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
394 g_sequence_get(cur);
395 dfsm_free_queue_entry(qm);
396 g_sequence_remove(cur);
397 }
398 }
399
400 static void
401 dfsm_free_sync_queue(dfsm_t *dfsm)
402 {
403 g_return_if_fail(dfsm != NULL);
404
405 GList *iter = dfsm->sync_queue;
406 while (iter) {
407 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
408 iter = g_list_next(iter);
409 dfsm_free_queue_entry(qm);
410 }
411
412 g_list_free(dfsm->sync_queue);
413 dfsm->sync_queue = NULL;
414 }
415
416 static gint
417 message_queue_sort_fn(
418 gconstpointer a,
419 gconstpointer b,
420 gpointer user_data)
421 {
422 return ((dfsm_queued_message_t *)a)->msg_count -
423 ((dfsm_queued_message_t *)b)->msg_count;
424 }
425
426 static dfsm_node_info_t *
427 dfsm_node_info_lookup(
428 dfsm_t *dfsm,
429 uint32_t nodeid,
430 uint32_t pid)
431 {
432 g_return_val_if_fail(dfsm != NULL, NULL);
433 g_return_val_if_fail(dfsm->members != NULL, NULL);
434
435 dfsm_node_info_t info = { .nodeid = nodeid, .pid = pid };
436
437 return (dfsm_node_info_t *)g_hash_table_lookup(dfsm->members, &info);
438 }
439
440 static dfsm_queued_message_t *
441 dfsm_queue_add_message(
442 dfsm_t *dfsm,
443 uint32_t nodeid,
444 uint32_t pid,
445 uint64_t msg_count,
446 const void *msg,
447 size_t msg_len)
448 {
449 g_return_val_if_fail(dfsm != NULL, NULL);
450 g_return_val_if_fail(msg != NULL, NULL);
451 g_return_val_if_fail(msg_len != 0, NULL);
452
453 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, nodeid, pid);
454 if (!ni) {
455 cfs_dom_critical(dfsm->log_domain, "dfsm_node_info_lookup failed");
456 return NULL;
457 }
458
459 dfsm_queued_message_t *qm = g_new0(dfsm_queued_message_t, 1);
460 g_return_val_if_fail(qm != NULL, NULL);
461
462 qm->nodeid = nodeid;
463 qm->pid = pid;
464 qm->msg = g_memdup (msg, msg_len);
465 qm->msg_len = msg_len;
466 qm->msg_count = msg_count;
467
468 if (dfsm->mode == DFSM_MODE_UPDATE && ni->synced) {
469 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
470 } else {
471 /* NOTE: we only need to sort the queue because we resend all
472 * queued messages sometimes.
473 */
474 g_sequence_insert_sorted(dfsm->msg_queue, qm, message_queue_sort_fn, NULL);
475 }
476
477 return qm;
478 }
479
480 static guint
481 dfsm_sync_info_hash(gconstpointer key)
482 {
483 dfsm_node_info_t *info = (dfsm_node_info_t *)key;
484
485 return g_int_hash(&info->nodeid) + g_int_hash(&info->pid);
486 }
487
488 static gboolean
489 dfsm_sync_info_equal(
490 gconstpointer v1,
491 gconstpointer v2)
492 {
493 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
494 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
495
496 if (info1->nodeid == info2->nodeid &&
497 info1->pid == info2->pid)
498 return TRUE;
499
500 return FALSE;
501 }
502
503 static int
504 dfsm_sync_info_compare(
505 gconstpointer v1,
506 gconstpointer v2)
507 {
508 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
509 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
510
511 if (info1->nodeid != info2->nodeid)
512 return info1->nodeid - info2->nodeid;
513
514 return info1->pid - info2->pid;
515 }
516
517 static void
518 dfsm_set_mode(
519 dfsm_t *dfsm,
520 dfsm_mode_t new_mode)
521 {
522 g_return_if_fail(dfsm != NULL);
523
524 cfs_debug("dfsm_set_mode - set mode to %d", new_mode);
525
526 int changed = 0;
527 g_mutex_lock (dfsm->mode_mutex);
528 if (dfsm->mode != new_mode) {
529 if (new_mode < DFSM_ERROR_MODE_START ||
530 (dfsm->mode < DFSM_ERROR_MODE_START || new_mode >= dfsm->mode)) {
531 dfsm->mode = new_mode;
532 changed = 1;
533 }
534 }
535 g_mutex_unlock (dfsm->mode_mutex);
536
537 if (!changed)
538 return;
539
540 if (new_mode == DFSM_MODE_START) {
541 cfs_dom_message(dfsm->log_domain, "start cluster connection");
542 } else if (new_mode == DFSM_MODE_START_SYNC) {
543 cfs_dom_message(dfsm->log_domain, "starting data syncronisation");
544 } else if (new_mode == DFSM_MODE_SYNCED) {
545 cfs_dom_message(dfsm->log_domain, "all data is up to date");
546 if (dfsm->dfsm_callbacks->dfsm_synced_fn)
547 dfsm->dfsm_callbacks->dfsm_synced_fn(dfsm);
548 } else if (new_mode == DFSM_MODE_UPDATE) {
549 cfs_dom_message(dfsm->log_domain, "waiting for updates from leader");
550 } else if (new_mode == DFSM_MODE_LEAVE) {
551 cfs_dom_critical(dfsm->log_domain, "leaving CPG group");
552 } else if (new_mode == DFSM_MODE_ERROR) {
553 cfs_dom_critical(dfsm->log_domain, "serious internal error - stop cluster connection");
554 } else if (new_mode == DFSM_MODE_VERSION_ERROR) {
555 cfs_dom_critical(dfsm->log_domain, "detected newer protocol - please update this node");
556 }
557 }
558
559 static dfsm_mode_t
560 dfsm_get_mode(dfsm_t *dfsm)
561 {
562 g_return_val_if_fail(dfsm != NULL, DFSM_MODE_ERROR);
563
564 g_mutex_lock (dfsm->mode_mutex);
565 dfsm_mode_t mode = dfsm->mode;
566 g_mutex_unlock (dfsm->mode_mutex);
567
568 return mode;
569 }
570
571 gboolean
572 dfsm_restartable(dfsm_t *dfsm)
573 {
574 dfsm_mode_t mode = dfsm_get_mode(dfsm);
575
576 return !(mode == DFSM_MODE_ERROR ||
577 mode == DFSM_MODE_VERSION_ERROR);
578 }
579
580 void
581 dfsm_set_errormode(dfsm_t *dfsm)
582 {
583 dfsm_set_mode(dfsm, DFSM_MODE_ERROR);
584 }
585
586 static void
587 dfsm_release_sync_resources(
588 dfsm_t *dfsm,
589 const struct cpg_address *member_list,
590 size_t member_list_entries)
591 {
592 g_return_if_fail(dfsm != NULL);
593 g_return_if_fail(dfsm->members != NULL);
594 g_return_if_fail(!member_list_entries || member_list != NULL);
595
596 cfs_debug("enter dfsm_release_sync_resources");
597
598 if (dfsm->sync_info) {
599
600 if (dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn) {
601 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
602 dfsm->sync_info->data = NULL;
603 }
604
605 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
606 if (dfsm->sync_info->nodes[i].state) {
607 g_free(dfsm->sync_info->nodes[i].state);
608 dfsm->sync_info->nodes[i].state = NULL;
609 dfsm->sync_info->nodes[i].state_len = 0;
610 }
611 }
612 }
613
614 if (member_list) {
615
616 g_hash_table_remove_all(dfsm->members);
617
618 if (dfsm->sync_info)
619 g_free(dfsm->sync_info);
620
621 int size = sizeof(dfsm_sync_info_t) +
622 member_list_entries*sizeof(dfsm_sync_info_t);
623 dfsm_sync_info_t *sync_info = dfsm->sync_info = g_malloc0(size);
624 sync_info->node_count = member_list_entries;
625
626 for (int i = 0; i < member_list_entries; i++) {
627 sync_info->nodes[i].nodeid = member_list[i].nodeid;
628 sync_info->nodes[i].pid = member_list[i].pid;
629 }
630
631 qsort(sync_info->nodes, member_list_entries, sizeof(dfsm_node_info_t),
632 dfsm_sync_info_compare);
633
634 for (int i = 0; i < member_list_entries; i++) {
635 dfsm_node_info_t *info = &sync_info->nodes[i];
636 g_hash_table_insert(dfsm->members, info, info);
637 if (info->nodeid == dfsm->nodeid && info->pid == dfsm->pid)
638 sync_info->local = info;
639 }
640 }
641 }
642
643 static void
644 dfsm_cpg_deliver_callback(
645 cpg_handle_t handle,
646 const struct cpg_name *group_name,
647 uint32_t nodeid,
648 uint32_t pid,
649 void *msg,
650 size_t msg_len)
651 {
652 cs_error_t result;
653
654 dfsm_t *dfsm = NULL;
655 result = cpg_context_get(handle, (gpointer *)&dfsm);
656 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
657 cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
658 return; /* we have no valid dfsm pointer, so we can just ignore this */
659 }
660 dfsm_mode_t mode = dfsm_get_mode(dfsm);
661
662 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
663
664 if (mode >= DFSM_ERROR_MODE_START) {
665 cfs_dom_debug(dfsm->log_domain, "error mode - ignoring message");
666 goto leave;
667 }
668
669 if (!dfsm->sync_info) {
670 cfs_dom_critical(dfsm->log_domain, "no dfsm_sync_info - internal error");
671 goto leave;
672 }
673
674 if (msg_len < sizeof(dfsm_message_header_t)) {
675 cfs_dom_critical(dfsm->log_domain, "received short message (%ld bytes)", msg_len);
676 goto leave;
677 }
678
679 dfsm_message_header_t *base_header = (dfsm_message_header_t *)msg;
680
681 if (base_header->protocol_version > dfsm->protocol_version) {
682 cfs_dom_critical(dfsm->log_domain, "received message with protocol version %d",
683 base_header->protocol_version);
684 dfsm_set_mode(dfsm, DFSM_MODE_VERSION_ERROR);
685 return;
686 } else if (base_header->protocol_version < dfsm->protocol_version) {
687 cfs_dom_message(dfsm->log_domain, "ignore message with wrong protocol version %d",
688 base_header->protocol_version);
689 return;
690 }
691
692 if (base_header->type == DFSM_MESSAGE_NORMAL) {
693
694 dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
695
696 if (msg_len < sizeof(dfsm_message_normal_header_t)) {
697 cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %ld bytes)",
698 base_header->type, base_header->subtype, msg_len);
699 goto leave;
700 }
701
702 if (mode != DFSM_MODE_SYNCED) {
703 cfs_dom_debug(dfsm->log_domain, "queue message %zu (subtype = %d, length = %ld)",
704 header->count, base_header->subtype, msg_len);
705
706 if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
707 goto leave;
708 } else {
709
710 int msg_res = -1;
711 int res = dfsm->dfsm_callbacks->dfsm_deliver_fn(
712 dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype,
713 base_header->time, msg + sizeof(dfsm_message_normal_header_t),
714 msg_len - sizeof(dfsm_message_normal_header_t));
715
716 if (nodeid == dfsm->nodeid && pid == dfsm->pid)
717 dfsm_record_local_result(dfsm, header->count, msg_res, res);
718
719 if (res < 0)
720 goto leave;
721 }
722
723 return;
724 }
725
726 /* state related messages
727 * we needs right epoch - else we simply discard the message
728 */
729
730 dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
731
732 if (msg_len < sizeof(dfsm_message_state_header_t)) {
733 cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %ld bytes)",
734 base_header->type, base_header->subtype, msg_len);
735 goto leave;
736 }
737
738 if (base_header->type != DFSM_MESSAGE_SYNC_START &&
739 (memcmp(&header->epoch, &dfsm->sync_epoch, sizeof(dfsm_sync_epoch_t)) != 0)) {
740 cfs_dom_debug(dfsm->log_domain, "ignore message (msg_type == %d) with "
741 "wrong epoch (epoch %d/%d/%08X)", base_header->type,
742 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
743 return;
744 }
745
746 msg += sizeof(dfsm_message_state_header_t);
747 msg_len -= sizeof(dfsm_message_state_header_t);
748
749 if (mode == DFSM_MODE_SYNCED) {
750 if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
751
752 for (int i = 0; i < dfsm->sync_info->node_count; i++)
753 dfsm->sync_info->nodes[i].synced = 1;
754
755 if (!dfsm_deliver_queue(dfsm))
756 goto leave;
757
758 return;
759
760 } else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
761
762 if (msg_len != sizeof(dfsm->csum_counter)) {
763 cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len, nodeid, pid);
764 goto leave;
765 }
766
767 uint64_t csum_id = *((uint64_t *)msg);
768 msg += 8; msg_len -= 8;
769
770 cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016zX", nodeid, csum_id);
771
772 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
773 if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
774 dfsm, dfsm->data, dfsm->csum, sizeof(dfsm->csum))) {
775 cfs_dom_critical(dfsm->log_domain, "unable to compute data checksum");
776 goto leave;
777 }
778
779 dfsm->csum_epoch = header->epoch;
780 dfsm->csum_id = csum_id;
781
782 if (nodeid == dfsm->nodeid && pid == dfsm->pid) {
783 if (!dfsm_send_checksum(dfsm))
784 goto leave;
785 }
786 }
787
788 return;
789
790 } else if (base_header->type == DFSM_MESSAGE_VERIFY) {
791
792 cfs_dom_debug(dfsm->log_domain, "received verify message");
793
794 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
795
796 if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
797 cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%ld bytes)", msg_len);
798 goto leave;
799 }
800
801 uint64_t csum_id = *((uint64_t *)msg);
802 msg += 8; msg_len -= 8;
803
804 if (dfsm->csum_id == csum_id &&
805 (memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
806 if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
807 cfs_dom_critical(dfsm->log_domain, "wrong checksum %016zX != %016zX - restarting",
808 *(uint64_t *)msg, *(uint64_t *)dfsm->csum);
809 goto leave;
810 } else {
811 cfs_dom_message(dfsm->log_domain, "data verification successful");
812 }
813 } else {
814 cfs_dom_message(dfsm->log_domain, "skip verification - no checksum saved");
815 }
816 }
817
818 return;
819
820 } else {
821 /* ignore (we already got all required updates, or we are leader) */
822 cfs_dom_debug(dfsm->log_domain, "ignore state sync message %d",
823 base_header->type);
824 return;
825 }
826
827 } else if (mode == DFSM_MODE_START_SYNC) {
828
829 if (base_header->type == DFSM_MESSAGE_SYNC_START) {
830
831 if (nodeid != dfsm->lowest_nodeid) {
832 cfs_dom_critical(dfsm->log_domain, "ignore sync request from wrong member %d/%d",
833 nodeid, pid);
834 }
835
836 cfs_dom_message(dfsm->log_domain, "received sync request (epoch %d/%d/%08X)",
837 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
838
839 dfsm->sync_epoch = header->epoch;
840
841 dfsm_release_sync_resources(dfsm, NULL, 0);
842
843 unsigned int state_len = 0;
844 gpointer state = NULL;
845
846 state = dfsm->dfsm_callbacks->dfsm_get_state_fn(dfsm, dfsm->data, &state_len);
847
848 if (!(state && state_len)) {
849 cfs_dom_critical(dfsm->log_domain, "dfsm_get_state_fn failed");
850 goto leave;
851 }
852
853 struct iovec iov[1];
854 iov[0].iov_base = state;
855 iov[0].iov_len = state_len;
856
857 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_STATE, iov, 1);
858
859 if (state)
860 g_free(state);
861
862 if (result != CS_OK)
863 goto leave;
864
865 return;
866
867 } else if (base_header->type == DFSM_MESSAGE_STATE) {
868
869 dfsm_node_info_t *ni;
870
871 if (!(ni = dfsm_node_info_lookup(dfsm, nodeid, pid))) {
872 cfs_dom_critical(dfsm->log_domain, "received state for non-member %d/%d", nodeid, pid);
873 goto leave;
874 }
875
876 if (ni->state) {
877 cfs_dom_critical(dfsm->log_domain, "received duplicate state for member %d/%d", nodeid, pid);
878 goto leave;
879 }
880
881 ni->state = g_memdup(msg, msg_len);
882 ni->state_len = msg_len;
883
884 int received_all = 1;
885 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
886 if (!dfsm->sync_info->nodes[i].state) {
887 received_all = 0;
888 break;
889 }
890 }
891
892 if (received_all) {
893 cfs_dom_message(dfsm->log_domain, "received all states");
894
895 int res = dfsm->dfsm_callbacks->dfsm_process_state_update_fn(dfsm, dfsm->data, dfsm->sync_info);
896 if (res < 0)
897 goto leave;
898
899 if (dfsm->sync_info->local->synced) {
900 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
901 dfsm_release_sync_resources(dfsm, NULL, 0);
902
903 if (!dfsm_deliver_queue(dfsm))
904 goto leave;
905
906 } else {
907 dfsm_set_mode(dfsm, DFSM_MODE_UPDATE);
908
909 if (!dfsm_deliver_queue(dfsm))
910 goto leave;
911 }
912
913 }
914
915 return;
916 }
917
918 } else if (mode == DFSM_MODE_UPDATE) {
919
920 if (base_header->type == DFSM_MESSAGE_UPDATE) {
921
922 int res = dfsm->dfsm_callbacks->dfsm_process_update_fn(
923 dfsm, dfsm->data, dfsm->sync_info, nodeid, pid, msg, msg_len);
924
925 if (res < 0)
926 goto leave;
927
928 return;
929
930 } else if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
931
932
933 int res = dfsm->dfsm_callbacks->dfsm_commit_fn(dfsm, dfsm->data, dfsm->sync_info);
934
935 if (res < 0)
936 goto leave;
937
938 for (int i = 0; i < dfsm->sync_info->node_count; i++)
939 dfsm->sync_info->nodes[i].synced = 1;
940
941 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
942
943 if (!dfsm_deliver_sync_queue(dfsm))
944 goto leave;
945
946 if (!dfsm_deliver_queue(dfsm))
947 goto leave;
948
949 dfsm_release_sync_resources(dfsm, NULL, 0);
950
951 return;
952 }
953
954 } else {
955 cfs_dom_critical(dfsm->log_domain, "internal error - unknown mode %d", mode);
956 goto leave;
957 }
958
959 if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST ||
960 base_header->type == DFSM_MESSAGE_VERIFY) {
961
962 cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
963
964 } else {
965 cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %ld bytes)",
966 base_header->type, msg_len);
967 goto leave;
968 }
969
970 leave:
971 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
972 dfsm_release_sync_resources(dfsm, NULL, 0);
973 return;
974 }
975
976 static gboolean
977 dfsm_resend_queue(dfsm_t *dfsm)
978 {
979 g_return_val_if_fail(dfsm != NULL, FALSE);
980 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
981
982 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
983 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
984 gboolean res = TRUE;
985
986 while (iter != end) {
987 GSequenceIter *cur = iter;
988 iter = g_sequence_iter_next(iter);
989
990 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
991 g_sequence_get(cur);
992
993 if (qm->nodeid == dfsm->nodeid && qm->pid == dfsm->pid) {
994 cpg_error_t result;
995 struct iovec iov[1];
996 iov[0].iov_base = qm->msg;
997 iov[0].iov_len = qm->msg_len;
998
999 if ((result = dfsm_send_message_full(dfsm, iov, 1, 1)) != CS_OK) {
1000 res = FALSE;
1001 break;
1002 }
1003 }
1004 }
1005
1006 dfsm_free_message_queue(dfsm);
1007
1008 return res;
1009 }
1010
1011 static gboolean
1012 dfsm_deliver_sync_queue(dfsm_t *dfsm)
1013 {
1014 g_return_val_if_fail(dfsm != NULL, FALSE);
1015
1016 if (!dfsm->sync_queue)
1017 return TRUE;
1018
1019 gboolean res = TRUE;
1020
1021 // fixme: cfs_debug
1022 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__,
1023 g_list_length(dfsm->sync_queue));
1024
1025 GList *iter = dfsm->sync_queue;
1026 while (iter) {
1027 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
1028 iter = g_list_next(iter);
1029
1030 if (res && dfsm->mode == DFSM_MODE_SYNCED) {
1031 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1032 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1033 } else {
1034 res = FALSE;
1035 }
1036
1037 dfsm_free_queue_entry(qm);
1038 }
1039 g_list_free(dfsm->sync_queue);
1040 dfsm->sync_queue = NULL;
1041
1042 return res;
1043 }
1044
1045 static gboolean
1046 dfsm_deliver_queue(dfsm_t *dfsm)
1047 {
1048 g_return_val_if_fail(dfsm != NULL, FALSE);
1049 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
1050
1051 int qlen = g_sequence_get_length(dfsm->msg_queue);
1052 if (!qlen)
1053 return TRUE;
1054
1055 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
1056 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
1057 gboolean res = TRUE;
1058
1059 // fixme: cfs_debug
1060 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, qlen);
1061
1062 while (iter != end) {
1063 GSequenceIter *cur = iter;
1064 iter = g_sequence_iter_next(iter);
1065
1066 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
1067 g_sequence_get(cur);
1068
1069 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, qm->nodeid, qm->pid);
1070 if (!ni) {
1071 cfs_dom_message(dfsm->log_domain, "remove mesage from non-member %d/%d",
1072 qm->nodeid, qm->pid);
1073 dfsm_free_queue_entry(qm);
1074 g_sequence_remove(cur);
1075 continue;
1076 }
1077
1078 if (dfsm->mode == DFSM_MODE_SYNCED) {
1079 if (ni->synced) {
1080 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1081 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1082 dfsm_free_queue_entry(qm);
1083 g_sequence_remove(cur);
1084 }
1085 } else if (dfsm->mode == DFSM_MODE_UPDATE) {
1086 if (ni->synced) {
1087 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
1088 g_sequence_remove(cur);
1089 }
1090 } else {
1091 res = FALSE;
1092 break;
1093 }
1094 }
1095
1096 return res;
1097 }
1098
1099 static void
1100 dfsm_cpg_confchg_callback(
1101 cpg_handle_t handle,
1102 const struct cpg_name *group_name,
1103 const struct cpg_address *member_list,
1104 size_t member_list_entries,
1105 const struct cpg_address *left_list,
1106 size_t left_list_entries,
1107 const struct cpg_address *joined_list,
1108 size_t joined_list_entries)
1109 {
1110 cs_error_t result;
1111
1112 dfsm_t *dfsm = NULL;
1113 result = cpg_context_get(handle, (gpointer *)&dfsm);
1114 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
1115 cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
1116 return; /* we have no valid dfsm pointer, so we can just ignore this */
1117 }
1118
1119 dfsm->we_are_member = 0;
1120
1121 /* create new epoch */
1122 dfsm->local_epoch_counter++;
1123 dfsm->sync_epoch.epoch = dfsm->local_epoch_counter;
1124 dfsm->sync_epoch.nodeid = dfsm->nodeid;
1125 dfsm->sync_epoch.pid = dfsm->pid;
1126 dfsm->sync_epoch.time = time(NULL);
1127
1128 /* invalidate saved checksum */
1129 dfsm->csum_id = dfsm->csum_counter;
1130 memset(&dfsm->csum_epoch, 0, sizeof(dfsm->csum_epoch));
1131
1132 dfsm_free_sync_queue(dfsm);
1133
1134 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1135
1136 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
1137
1138 if (mode >= DFSM_ERROR_MODE_START) {
1139 cfs_dom_debug(dfsm->log_domain, "already left group - ignore message");
1140 return;
1141 }
1142
1143 int lowest_nodeid = 0;
1144 GString *str = g_string_new("members: ");
1145 for (int i = 0; i < member_list_entries; i++) {
1146
1147 g_string_append_printf(str, i ? ", %d/%d" : "%d/%d",
1148 member_list[i].nodeid, member_list[i].pid);
1149
1150 if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid)
1151 lowest_nodeid = member_list[i].nodeid;
1152
1153 if (member_list[i].nodeid == dfsm->nodeid &&
1154 member_list[i].pid == dfsm->pid)
1155 dfsm->we_are_member = 1;
1156 }
1157
1158
1159 if ((dfsm->we_are_member || mode != DFSM_MODE_START))
1160 cfs_dom_message(dfsm->log_domain, str->str);
1161
1162 g_string_free(str, 1);
1163
1164 dfsm->lowest_nodeid = lowest_nodeid;
1165
1166 /* NOTE: one node can be in left and joined list at the same time,
1167 so it is better to query member list. Also JOIN/LEAVE list are
1168 different on different nodes!
1169 */
1170
1171 dfsm_release_sync_resources(dfsm, member_list, member_list_entries);
1172
1173 if (!dfsm->we_are_member) {
1174 if (mode == DFSM_MODE_START) {
1175 cfs_dom_debug(dfsm->log_domain, "ignore leave message");
1176 return;
1177 }
1178 cfs_dom_message(dfsm->log_domain, "we (%d/%d) left the process group",
1179 dfsm->nodeid, dfsm->pid);
1180 goto leave;
1181 }
1182
1183 if (member_list_entries > 1) {
1184
1185 int qlen = g_sequence_get_length(dfsm->msg_queue);
1186 if (joined_list_entries && qlen) {
1187 /* we need to make sure that all members have the same queue. */
1188 cfs_dom_message(dfsm->log_domain, "queue not emtpy - resening %d messages", qlen);
1189 if (!dfsm_resend_queue(dfsm)) {
1190 cfs_dom_critical(dfsm->log_domain, "dfsm_resend_queue failed");
1191 goto leave;
1192 }
1193 }
1194
1195 dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC);
1196 if (lowest_nodeid == dfsm->nodeid) {
1197 if (!dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0)) {
1198 cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message");
1199 goto leave;
1200 }
1201 }
1202 } else {
1203 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
1204 dfsm->sync_info->local->synced = 1;
1205 if (!dfsm_deliver_queue(dfsm))
1206 goto leave;
1207 }
1208
1209 if (dfsm->dfsm_callbacks->dfsm_confchg_fn)
1210 dfsm->dfsm_callbacks->dfsm_confchg_fn(dfsm, dfsm->data, member_list, member_list_entries);
1211
1212 return;
1213 leave:
1214 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
1215 return;
1216 }
1217
1218 static cpg_callbacks_t cpg_callbacks = {
1219 .cpg_deliver_fn = dfsm_cpg_deliver_callback,
1220 .cpg_confchg_fn = dfsm_cpg_confchg_callback,
1221 };
1222
1223 dfsm_t *
1224 dfsm_new(
1225 gpointer data,
1226 const char *group_name,
1227 const char *log_domain,
1228 guint32 protocol_version,
1229 dfsm_callbacks_t *callbacks)
1230 {
1231 g_return_val_if_fail(sizeof(dfsm_message_header_t) == 16, NULL);
1232 g_return_val_if_fail(sizeof(dfsm_message_state_header_t) == 32, NULL);
1233 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t) == 24, NULL);
1234
1235 g_return_val_if_fail(callbacks != NULL, NULL);
1236 g_return_val_if_fail(callbacks->dfsm_deliver_fn != NULL, NULL);
1237
1238 g_return_val_if_fail(callbacks->dfsm_get_state_fn != NULL, NULL);
1239 g_return_val_if_fail(callbacks->dfsm_process_state_update_fn != NULL, NULL);
1240 g_return_val_if_fail(callbacks->dfsm_process_update_fn != NULL, NULL);
1241 g_return_val_if_fail(callbacks->dfsm_commit_fn != NULL, NULL);
1242
1243 dfsm_t *dfsm;
1244
1245 if ((dfsm = g_new0(dfsm_t, 1)) == NULL)
1246 return NULL;
1247
1248 if (!(dfsm->sync_mutex = g_mutex_new()))
1249 goto err;
1250
1251 if (!(dfsm->sync_cond = g_cond_new()))
1252 goto err;
1253
1254 if (!(dfsm->results = g_hash_table_new(g_int64_hash, g_int64_equal)))
1255 goto err;
1256
1257 if (!(dfsm->msg_queue = g_sequence_new(NULL)))
1258 goto err;
1259
1260 dfsm->log_domain = g_strdup(log_domain);
1261 dfsm->data = data;
1262 dfsm->mode = DFSM_MODE_START;
1263 dfsm->protocol_version = protocol_version;
1264 strcpy (dfsm->cpg_group_name.value, group_name);
1265 dfsm->cpg_group_name.length = strlen (group_name) + 1;
1266
1267 dfsm->cpg_callbacks = &cpg_callbacks;
1268 dfsm->dfsm_callbacks = callbacks;
1269
1270 dfsm->members = g_hash_table_new(dfsm_sync_info_hash, dfsm_sync_info_equal);
1271 if (!dfsm->members)
1272 goto err;
1273
1274 if ((dfsm->mode_mutex = g_mutex_new()) == NULL)
1275 goto err;
1276
1277 return dfsm;
1278
1279 err:
1280 dfsm_destroy(dfsm);
1281 return NULL;
1282 }
1283
1284 gboolean
1285 dfsm_lowest_nodeid(dfsm_t *dfsm)
1286 {
1287 g_return_val_if_fail(dfsm != NULL, FALSE);
1288
1289 if (dfsm->lowest_nodeid && (dfsm->lowest_nodeid == dfsm->nodeid))
1290 return TRUE;
1291
1292 return FALSE;
1293 }
1294
1295 cs_error_t
1296 dfsm_verify_request(dfsm_t *dfsm)
1297 {
1298 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1299
1300 /* only do when we have lowest nodeid */
1301 if (!dfsm->lowest_nodeid || (dfsm->lowest_nodeid != dfsm->nodeid))
1302 return CS_OK;
1303
1304 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1305 if (mode != DFSM_MODE_SYNCED)
1306 return CS_OK;
1307
1308 int len = 1;
1309 struct iovec iov[len];
1310
1311 if (dfsm->csum_counter != dfsm->csum_id) {
1312 g_message("delay verify request %016zX", dfsm->csum_counter + 1);
1313 return CS_OK;
1314 };
1315
1316 dfsm->csum_counter++;
1317 iov[0].iov_base = (char *)&dfsm->csum_counter;
1318 iov[0].iov_len = sizeof(dfsm->csum_counter);
1319
1320 cfs_debug("send verify request %016zX", dfsm->csum_counter);
1321
1322 cs_error_t result;
1323 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len);
1324
1325 if (result != CS_OK)
1326 cfs_dom_critical(dfsm->log_domain, "failed to send VERIFY_REQUEST message");
1327
1328 return result;
1329 }
1330
1331
1332 cs_error_t
1333 dfsm_dispatch(
1334 dfsm_t *dfsm,
1335 cs_dispatch_flags_t dispatch_types)
1336 {
1337 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1338 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_INVALID_PARAM);
1339
1340 cs_error_t result;
1341
1342 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1343 int retries = 0;
1344 loop:
1345 result = cpg_dispatch(dfsm->cpg_handle, dispatch_types);
1346 if (result == CPG_ERR_TRY_AGAIN) {
1347 nanosleep(&tvreq, NULL);
1348 ++retries;
1349 if ((retries % 10) == 0)
1350 cfs_dom_message(dfsm->log_domain, "cpg_dispatch retry %d", retries);
1351 goto loop;
1352 }
1353
1354 if (!(result == CS_OK || result == CS_ERR_TRY_AGAIN)) {
1355 cfs_dom_critical(dfsm->log_domain, "cpg_dispatch failed: %d", result);
1356 }
1357
1358 return result;
1359 }
1360
1361
1362 cs_error_t
1363 dfsm_initialize(dfsm_t *dfsm, int *fd)
1364 {
1365 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1366 g_return_val_if_fail(fd != NULL, CS_ERR_INVALID_PARAM);
1367
1368 /* remove old messages */
1369 dfsm_free_message_queue(dfsm);
1370 dfsm_send_sync_message_abort(dfsm);
1371
1372 dfsm->joined = FALSE;
1373 dfsm->we_are_member = 0;
1374
1375 dfsm_set_mode(dfsm, DFSM_MODE_START);
1376
1377 cs_error_t result;
1378
1379 if (dfsm->cpg_handle == 0) {
1380 if ((result = cpg_initialize(&dfsm->cpg_handle, dfsm->cpg_callbacks)) != CS_OK) {
1381 cfs_dom_critical(dfsm->log_domain, "cpg_initialize failed: %d", result);
1382 dfsm->cpg_handle = 0;
1383 goto fail;
1384 }
1385
1386 if ((result = cpg_local_get(dfsm->cpg_handle, &dfsm->nodeid)) != CS_OK) {
1387 cfs_dom_critical(dfsm->log_domain, "cpg_local_get failed: %d", result);
1388 goto fail;
1389 }
1390
1391 dfsm->pid = getpid();
1392
1393 result = cpg_context_set(dfsm->cpg_handle, dfsm);
1394 if (result != CS_OK) {
1395 cfs_dom_critical(dfsm->log_domain, "cpg_context_set failed: %d", result);
1396 goto fail;
1397 }
1398 }
1399
1400 result = cpg_fd_get(dfsm->cpg_handle, fd);
1401 if (result != CS_OK) {
1402 cfs_dom_critical(dfsm->log_domain, "cpg_fd_get failed: %d", result);
1403 goto fail;
1404 }
1405
1406 return CS_OK;
1407
1408 fail:
1409 if (dfsm->cpg_handle)
1410 cpg_finalize(dfsm->cpg_handle);
1411 dfsm->cpg_handle = 0;
1412 return result;
1413 }
1414
1415 cs_error_t
1416 dfsm_join(dfsm_t *dfsm)
1417 {
1418 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1419 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_LIBRARY);
1420 g_return_val_if_fail(dfsm->joined == 0, CS_ERR_EXIST);
1421
1422 cs_error_t result;
1423
1424 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1425 int retries = 0;
1426 loop:
1427 result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name);
1428 if (result == CPG_ERR_TRY_AGAIN) {
1429 nanosleep(&tvreq, NULL);
1430 ++retries;
1431 if ((retries % 10) == 0)
1432 cfs_dom_message(dfsm->log_domain, "cpg_join retry %d", retries);
1433 goto loop;
1434 }
1435
1436 if (result != CS_OK) {
1437 cfs_dom_critical(dfsm->log_domain, "cpg_join failed: %d", result);
1438 return result;
1439 }
1440
1441 dfsm->joined = TRUE;
1442 return TRUE;
1443 }
1444
1445 cs_error_t
1446 dfsm_leave (dfsm_t *dfsm)
1447 {
1448 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1449 g_return_val_if_fail(dfsm->joined, CS_ERR_NOT_EXIST);
1450
1451 cs_error_t result;
1452
1453 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1454 int retries = 0;
1455 loop:
1456 result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name);
1457 if (result == CPG_ERR_TRY_AGAIN) {
1458 nanosleep(&tvreq, NULL);
1459 ++retries;
1460 if ((retries % 10) == 0)
1461 cfs_dom_message(dfsm->log_domain, "cpg_leave retry %d", retries);
1462 goto loop;
1463 }
1464
1465 if (result != CS_OK) {
1466 cfs_dom_critical(dfsm->log_domain, "cpg_leave failed: %d", result);
1467 return result;
1468 }
1469
1470 dfsm->joined = FALSE;
1471
1472 return TRUE;
1473 }
1474
1475 gboolean
1476 dfsm_finalize(dfsm_t *dfsm)
1477 {
1478 g_return_val_if_fail(dfsm != NULL, FALSE);
1479
1480 dfsm_send_sync_message_abort(dfsm);
1481
1482 if (dfsm->joined)
1483 dfsm_leave(dfsm);
1484
1485 if (dfsm->cpg_handle) {
1486 cpg_finalize(dfsm->cpg_handle);
1487 dfsm->cpg_handle = 0;
1488 dfsm->joined = FALSE;
1489 dfsm->we_are_member = 0;
1490 }
1491
1492 return TRUE;
1493 }
1494
1495 void
1496 dfsm_destroy(dfsm_t *dfsm)
1497 {
1498 g_return_if_fail(dfsm != NULL);
1499
1500 dfsm_finalize(dfsm);
1501
1502 if (dfsm->sync_info && dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn)
1503 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
1504
1505 dfsm_free_sync_queue(dfsm);
1506
1507 if (dfsm->mode_mutex)
1508 g_mutex_free (dfsm->mode_mutex);
1509
1510 if (dfsm->sync_mutex)
1511 g_mutex_free (dfsm->sync_mutex);
1512
1513 if (dfsm->sync_cond)
1514 g_cond_free (dfsm->sync_cond);
1515
1516 if (dfsm->results)
1517 g_hash_table_destroy(dfsm->results);
1518
1519 if (dfsm->msg_queue) {
1520 dfsm_free_message_queue(dfsm);
1521 g_sequence_free(dfsm->msg_queue);
1522 }
1523
1524 if (dfsm->sync_info)
1525 g_free(dfsm->sync_info);
1526
1527 if (dfsm->cpg_handle)
1528 cpg_finalize(dfsm->cpg_handle);
1529
1530 if (dfsm->members)
1531 g_hash_table_destroy(dfsm->members);
1532
1533 if (dfsm->log_domain)
1534 g_free(dfsm->log_domain);
1535
1536 g_free(dfsm);
1537 }
1538
1539 typedef struct {
1540 dfsm_t *dfsm;
1541 } service_dfsm_private_t;
1542
1543 static gboolean
1544 service_dfsm_finalize(
1545 cfs_service_t *service,
1546 gpointer context)
1547 {
1548 g_return_val_if_fail(service != NULL, FALSE);
1549 g_return_val_if_fail(context != NULL, FALSE);
1550
1551 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1552 dfsm_t *dfsm = private->dfsm;
1553
1554 g_return_val_if_fail(dfsm != NULL, FALSE);
1555
1556 return dfsm_finalize(dfsm);
1557 }
1558
1559 static int
1560 service_dfsm_initialize(
1561 cfs_service_t *service,
1562 gpointer context)
1563 {
1564 g_return_val_if_fail(service != NULL, -1);
1565 g_return_val_if_fail(context != NULL, -1);
1566
1567 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1568 dfsm_t *dfsm = private->dfsm;
1569
1570 g_return_val_if_fail(dfsm != NULL, -1);
1571
1572 /* serious internal error - don't try to recover */
1573 if (!dfsm_restartable(dfsm))
1574 return -1;
1575
1576 int fd = -1;
1577
1578 cs_error_t result;
1579 if ((result = dfsm_initialize(dfsm, &fd)) != CS_OK)
1580 return -1;
1581
1582 result = dfsm_join(dfsm);
1583 if (result != CS_OK) {
1584 /* we can't dispatch if not joined, so we need to finalize */
1585 dfsm_finalize(dfsm);
1586 return -1;
1587 }
1588
1589 return fd;
1590 }
1591
1592 static gboolean
1593 service_dfsm_dispatch(
1594 cfs_service_t *service,
1595 gpointer context)
1596 {
1597 g_return_val_if_fail(service != NULL, FALSE);
1598 g_return_val_if_fail(context != NULL, FALSE);
1599
1600 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1601 dfsm_t *dfsm = private->dfsm;
1602
1603 g_return_val_if_fail(dfsm != NULL, FALSE);
1604 g_return_val_if_fail(dfsm->cpg_handle != 0, FALSE);
1605
1606 cs_error_t result;
1607
1608 result = dfsm_dispatch(dfsm, CPG_DISPATCH_ONE);
1609 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1610 goto finalize;
1611 if (result != CS_OK)
1612 goto fail;
1613
1614 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1615 if (mode >= DFSM_ERROR_MODE_START) {
1616 if (dfsm->joined) {
1617 result = dfsm_leave(dfsm);
1618 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1619 goto finalize;
1620 if (result != CS_OK)
1621 goto finalize;
1622 } else {
1623 if (!dfsm->we_are_member)
1624 return FALSE;
1625 }
1626 }
1627
1628 return TRUE;
1629
1630 fail:
1631 cfs_service_set_restartable(service, dfsm_restartable(dfsm));
1632 return FALSE;
1633
1634 finalize:
1635 dfsm_finalize(dfsm);
1636 goto fail;
1637 }
1638
1639 static void
1640 service_dfsm_timer(
1641 cfs_service_t *service,
1642 gpointer context)
1643 {
1644 g_return_if_fail(service != NULL);
1645 g_return_if_fail(context != NULL);
1646
1647 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1648 dfsm_t *dfsm = private->dfsm;
1649
1650 g_return_if_fail(dfsm != NULL);
1651
1652 dfsm_verify_request(dfsm);
1653 }
1654
1655 static cfs_service_callbacks_t cfs_dfsm_callbacks = {
1656 .cfs_service_initialize_fn = service_dfsm_initialize,
1657 .cfs_service_finalize_fn = service_dfsm_finalize,
1658 .cfs_service_dispatch_fn = service_dfsm_dispatch,
1659 .cfs_service_timer_fn = service_dfsm_timer,
1660 };
1661
1662 cfs_service_t *
1663 service_dfsm_new(dfsm_t *dfsm)
1664 {
1665 cfs_service_t *service;
1666
1667 g_return_val_if_fail(dfsm != NULL, NULL);
1668
1669 service_dfsm_private_t *private = g_new0(service_dfsm_private_t, 1);
1670 if (!private)
1671 return NULL;
1672
1673 private->dfsm = dfsm;
1674
1675 service = cfs_service_new(&cfs_dfsm_callbacks, dfsm->log_domain, private);
1676
1677 return service;
1678 }
1679
1680 void
1681 service_dfsm_destroy(cfs_service_t *service)
1682 {
1683 g_return_if_fail(service != NULL);
1684
1685 service_dfsm_private_t *private =
1686 (service_dfsm_private_t *)cfs_service_get_context(service);
1687
1688 g_free(private);
1689 g_free(service);
1690 }
1691
1692
1693
1694