]> git.proxmox.com Git - pve-cluster.git/blob - data/src/dfsm.c
add/use dfsm_is_initialized to avoid unnecessary error logs at startup
[pve-cluster.git] / data / src / dfsm.c
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21
22 /* NOTE: we try to keep the CPG handle as long as possible, because
23 * calling cpg_initialize/cpg_finalize multiple times from the
24 * same process confuses corosync.
25 * Note: CS_ERR_LIBRARY is returned when corosync died
26 */
27
28 #ifdef HAVE_CONFIG_H
29 #include <config.h>
30 #endif /* HAVE_CONFIG_H */
31
32 #include <sys/types.h>
33 #include <unistd.h>
34 #include <string.h>
35 #include <stdlib.h>
36
37 #include <corosync/corotypes.h>
38 #include <corosync/cpg.h>
39 #include <glib.h>
40
41 #include "cfs-utils.h"
42 #include "dfsm.h"
43
/* CPG callback table of this module. The deliver and confchg callbacks
 * compare dfsm->cpg_callbacks against the address of this object to check
 * that the context pointer returned by cpg_context_get() is a dfsm_t.
 * NOTE(review): the initializer is not part of this declaration; it is
 * presumably provided further down in the file.
 */
static cpg_callbacks_t cpg_callbacks;
45
/* Operating mode of a dfsm instance.
 *
 * Values below DFSM_ERROR_MODE_START are regular modes, everything at or
 * above it indicates an abnormal/error condition.
 */
typedef enum {
	/* regular modes */
	DFSM_MODE_START         = 0,   /* start cluster connection */
	DFSM_MODE_START_SYNC    = 1,   /* data synchronisation is starting */
	DFSM_MODE_SYNCED        = 2,   /* all data is up to date */
	DFSM_MODE_UPDATE        = 3,   /* waiting for updates from the leader */

	/* abnormal/error modes (>= 128) */
	DFSM_ERROR_MODE_START   = 128, /* first value treated as error mode */
	DFSM_MODE_LEAVE         = 253, /* leaving the CPG group */
	DFSM_MODE_VERSION_ERROR = 254, /* peer speaks a newer protocol */
	DFSM_MODE_ERROR         = 255, /* serious internal error */
} dfsm_mode_t;
58
/* Message types carried in dfsm_message_header_t.type.
 *
 * DFSM_MESSAGE_NORMAL is an application message; all other values are
 * state (synchronisation/verification) messages.
 */
typedef enum {
	DFSM_MESSAGE_NORMAL = 0,
	DFSM_MESSAGE_SYNC_START = 1,
	DFSM_MESSAGE_STATE = 2,
	DFSM_MESSAGE_UPDATE = 3,
	DFSM_MESSAGE_UPDATE_COMPLETE = 4,
	DFSM_MESSAGE_VERIFY_REQUEST = 5,
	DFSM_MESSAGE_VERIFY = 6,
} dfsm_message_t;

/* True if 'mt' is one of the state message types
 * (DFSM_MESSAGE_SYNC_START .. DFSM_MESSAGE_VERIFY).
 *
 * The argument is parenthesized so that expression arguments (e.g. a
 * conditional expression) are evaluated as a unit. Note that 'mt' is
 * still evaluated twice, so do not pass expressions with side effects.
 */
#define DFSM_VALID_STATE_MESSAGE(mt) \
	((mt) >= DFSM_MESSAGE_SYNC_START && (mt) <= DFSM_MESSAGE_VERIFY)
70
/* Common header at the start of every dfsm message.
 * The senders in this file fill it as described per field.
 */
typedef struct {
	uint16_t type;             /* a dfsm_message_t value */
	uint16_t subtype;          /* caller's message type for normal messages, 0 for state messages */
	uint32_t protocol_version; /* protocol version of the sender */
	uint32_t time;             /* time(NULL) at the sender when the message was built */
	uint32_t reserved;         /* always set to 0 by the senders */
} dfsm_message_header_t;
78
/* Identifies one synchronisation round. Epochs are compared bytewise
 * (memcmp) when state messages are received.
 */
typedef struct {
	uint32_t epoch;  /* per process counter (not globally unique) */
	uint32_t time;   /* creation time of the epoch */
	uint32_t nodeid; /* node id of the creating process */
	uint32_t pid;    /* pid of the creating process */
} dfsm_sync_epoch_t;
85
86 typedef struct {
87 dfsm_message_header_t base;
88 dfsm_sync_epoch_t epoch;
89 } dfsm_message_state_header_t;
90
91 typedef struct {
92 dfsm_message_header_t base;
93 uint64_t count;
94 } dfsm_message_normal_header_t;
95
/* A received message that is kept in a queue until it can be delivered. */
typedef struct {
	uint32_t nodeid;    /* sender node id */
	uint32_t pid;       /* sender pid */
	uint64_t msg_count; /* message counter (sort key of the message queue) */
	void *msg;          /* private copy of the message, released with g_free() */
	int msg_len;        /* length of msg in bytes; fixme: unsigned? */
} dfsm_queued_message_t;
103
104 struct dfsm {
105 const char *log_domain;
106 cpg_callbacks_t *cpg_callbacks;
107 dfsm_callbacks_t *dfsm_callbacks;
108 cpg_handle_t cpg_handle;
109 struct cpg_name cpg_group_name;
110 uint32_t nodeid;
111 uint32_t pid;
112 int we_are_member;
113
114 guint32 protocol_version;
115 gpointer data;
116
117 gboolean joined;
118
119 /* mode is protected with mode_mutex */
120 GMutex mode_mutex;
121 dfsm_mode_t mode;
122
123 GHashTable *members; /* contains dfsm_node_info_t pointers */
124 dfsm_sync_info_t *sync_info;
125 uint32_t local_epoch_counter;
126 dfsm_sync_epoch_t sync_epoch;
127 uint32_t lowest_nodeid;
128 GSequence *msg_queue;
129 GList *sync_queue;
130
131 /* synchrounous message transmission, protected with sync_mutex */
132 GMutex sync_mutex;
133 GCond sync_cond;
134 GHashTable *results;
135 uint64_t msgcount;
136 uint64_t msgcount_rcvd;
137
138 /* state verification */
139 guchar csum[32];
140 dfsm_sync_epoch_t csum_epoch;
141 uint64_t csum_id;
142 uint64_t csum_counter;
143 };
144
/* Forward declarations: both functions are called from
 * dfsm_cpg_deliver_callback(), which is defined before them.
 */
static gboolean dfsm_deliver_queue(dfsm_t *dfsm);
static gboolean dfsm_deliver_sync_queue(dfsm_t *dfsm);
147
148 gboolean
149 dfsm_nodeid_is_local(
150 dfsm_t *dfsm,
151 uint32_t nodeid,
152 uint32_t pid)
153 {
154 g_return_val_if_fail(dfsm != NULL, FALSE);
155
156 return (nodeid == dfsm->nodeid && pid == dfsm->pid);
157 }
158
159
160 static void
161 dfsm_send_sync_message_abort(dfsm_t *dfsm)
162 {
163 g_return_if_fail(dfsm != NULL);
164
165 g_mutex_lock (&dfsm->sync_mutex);
166 dfsm->msgcount_rcvd = dfsm->msgcount;
167 g_cond_broadcast (&dfsm->sync_cond);
168 g_mutex_unlock (&dfsm->sync_mutex);
169 }
170
171 static void
172 dfsm_record_local_result(
173 dfsm_t *dfsm,
174 uint64_t msg_count,
175 int msg_result,
176 gboolean processed)
177 {
178 g_return_if_fail(dfsm != NULL);
179 g_return_if_fail(dfsm->results != NULL);
180
181 g_mutex_lock (&dfsm->sync_mutex);
182 dfsm_result_t *rp = (dfsm_result_t *)g_hash_table_lookup(dfsm->results, &msg_count);
183 if (rp) {
184 rp->result = msg_result;
185 rp->processed = processed;
186 }
187 dfsm->msgcount_rcvd = msg_count;
188 g_cond_broadcast (&dfsm->sync_cond);
189 g_mutex_unlock (&dfsm->sync_mutex);
190 }
191
192 static cs_error_t
193 dfsm_send_message_full(
194 dfsm_t *dfsm,
195 struct iovec *iov,
196 unsigned int len,
197 int retry)
198 {
199 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
200 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
201
202 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
203 cs_error_t result;
204 int retries = 0;
205 loop:
206 result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
207 if (retry && result == CS_ERR_TRY_AGAIN) {
208 nanosleep(&tvreq, NULL);
209 ++retries;
210 if ((retries % 10) == 0)
211 cfs_dom_message(dfsm->log_domain, "cpg_send_message retry %d", retries);
212 if (retries < 100)
213 goto loop;
214 }
215
216 if (retries)
217 cfs_dom_message(dfsm->log_domain, "cpg_send_message retried %d times", retries);
218
219 if (result != CS_OK &&
220 (!retry || result != CS_ERR_TRY_AGAIN))
221 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
222
223 return result;
224 }
225
226 static cs_error_t
227 dfsm_send_state_message_full(
228 dfsm_t *dfsm,
229 uint16_t type,
230 struct iovec *iov,
231 unsigned int len)
232 {
233 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
234 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type), CS_ERR_INVALID_PARAM);
235 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
236
237 dfsm_message_state_header_t header;
238 header.base.type = type;
239 header.base.subtype = 0;
240 header.base.protocol_version = dfsm->protocol_version;
241 header.base.time = time(NULL);
242 header.base.reserved = 0;
243
244 header.epoch = dfsm->sync_epoch;
245
246 struct iovec real_iov[len + 1];
247
248 real_iov[0].iov_base = (char *)&header;
249 real_iov[0].iov_len = sizeof(header);
250
251 for (int i = 0; i < len; i++)
252 real_iov[i + 1] = iov[i];
253
254 return dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
255 }
256
257 cs_error_t
258 dfsm_send_update(
259 dfsm_t *dfsm,
260 struct iovec *iov,
261 unsigned int len)
262 {
263 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE, iov, len);
264 }
265
266 cs_error_t
267 dfsm_send_update_complete(dfsm_t *dfsm)
268 {
269 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE_COMPLETE, NULL, 0);
270 }
271
272
273 cs_error_t
274 dfsm_send_message(
275 dfsm_t *dfsm,
276 uint16_t msgtype,
277 struct iovec *iov,
278 int len)
279 {
280 return dfsm_send_message_sync(dfsm, msgtype, iov, len, NULL);
281 }
282
283 cs_error_t
284 dfsm_send_message_sync(
285 dfsm_t *dfsm,
286 uint16_t msgtype,
287 struct iovec *iov,
288 int len,
289 dfsm_result_t *rp)
290 {
291 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
292 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
293
294 g_mutex_lock (&dfsm->sync_mutex);
295 /* note: hold lock until message is sent - to guarantee ordering */
296 uint64_t msgcount = ++dfsm->msgcount;
297 if (rp) {
298 rp->msgcount = msgcount;
299 rp->processed = 0;
300 g_hash_table_replace(dfsm->results, &rp->msgcount, rp);
301 }
302
303 dfsm_message_normal_header_t header;
304 header.base.type = DFSM_MESSAGE_NORMAL;
305 header.base.subtype = msgtype;
306 header.base.protocol_version = dfsm->protocol_version;
307 header.base.time = time(NULL);
308 header.base.reserved = 0;
309 header.count = msgcount;
310
311 struct iovec real_iov[len + 1];
312
313 real_iov[0].iov_base = (char *)&header;
314 real_iov[0].iov_len = sizeof(header);
315
316 for (int i = 0; i < len; i++)
317 real_iov[i + 1] = iov[i];
318
319 cs_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
320
321 g_mutex_unlock (&dfsm->sync_mutex);
322
323 if (result != CS_OK) {
324 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
325
326 if (rp) {
327 g_mutex_lock (&dfsm->sync_mutex);
328 g_hash_table_remove(dfsm->results, &rp->msgcount);
329 g_mutex_unlock (&dfsm->sync_mutex);
330 }
331 return result;
332 }
333
334 if (rp) {
335 g_mutex_lock (&dfsm->sync_mutex);
336
337 while (dfsm->msgcount_rcvd < msgcount)
338 g_cond_wait (&dfsm->sync_cond, &dfsm->sync_mutex);
339
340
341 g_hash_table_remove(dfsm->results, &rp->msgcount);
342
343 g_mutex_unlock (&dfsm->sync_mutex);
344
345 return rp->processed ? CS_OK : CS_ERR_FAILED_OPERATION;
346 }
347
348 return CS_OK;
349 }
350
351 static gboolean
352 dfsm_send_checksum(dfsm_t *dfsm)
353 {
354 g_return_val_if_fail(dfsm != NULL, FALSE);
355
356 int len = 2;
357 struct iovec iov[len];
358
359 iov[0].iov_base = (char *)&dfsm->csum_id;
360 iov[0].iov_len = sizeof(dfsm->csum_id);
361 iov[1].iov_base = dfsm->csum;
362 iov[1].iov_len = sizeof(dfsm->csum);
363
364 gboolean res = (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY, iov, len) == CS_OK);
365
366 return res;
367 }
368
369 static void
370 dfsm_free_queue_entry(gpointer data)
371 {
372 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)data;
373 g_free (qm->msg);
374 g_free (qm);
375 }
376
377 static void
378 dfsm_free_message_queue(dfsm_t *dfsm)
379 {
380 g_return_if_fail(dfsm != NULL);
381 g_return_if_fail(dfsm->msg_queue != NULL);
382
383 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
384 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
385 while (iter != end) {
386 GSequenceIter *cur = iter;
387 iter = g_sequence_iter_next(iter);
388 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
389 g_sequence_get(cur);
390 dfsm_free_queue_entry(qm);
391 g_sequence_remove(cur);
392 }
393 }
394
395 static void
396 dfsm_free_sync_queue(dfsm_t *dfsm)
397 {
398 g_return_if_fail(dfsm != NULL);
399
400 GList *iter = dfsm->sync_queue;
401 while (iter) {
402 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
403 iter = g_list_next(iter);
404 dfsm_free_queue_entry(qm);
405 }
406
407 g_list_free(dfsm->sync_queue);
408 dfsm->sync_queue = NULL;
409 }
410
411 static gint
412 message_queue_sort_fn(
413 gconstpointer a,
414 gconstpointer b,
415 gpointer user_data)
416 {
417 return ((dfsm_queued_message_t *)a)->msg_count -
418 ((dfsm_queued_message_t *)b)->msg_count;
419 }
420
421 static dfsm_node_info_t *
422 dfsm_node_info_lookup(
423 dfsm_t *dfsm,
424 uint32_t nodeid,
425 uint32_t pid)
426 {
427 g_return_val_if_fail(dfsm != NULL, NULL);
428 g_return_val_if_fail(dfsm->members != NULL, NULL);
429
430 dfsm_node_info_t info = { .nodeid = nodeid, .pid = pid };
431
432 return (dfsm_node_info_t *)g_hash_table_lookup(dfsm->members, &info);
433 }
434
435 static dfsm_queued_message_t *
436 dfsm_queue_add_message(
437 dfsm_t *dfsm,
438 uint32_t nodeid,
439 uint32_t pid,
440 uint64_t msg_count,
441 const void *msg,
442 size_t msg_len)
443 {
444 g_return_val_if_fail(dfsm != NULL, NULL);
445 g_return_val_if_fail(msg != NULL, NULL);
446 g_return_val_if_fail(msg_len != 0, NULL);
447
448 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, nodeid, pid);
449 if (!ni) {
450 cfs_dom_critical(dfsm->log_domain, "dfsm_node_info_lookup failed");
451 return NULL;
452 }
453
454 dfsm_queued_message_t *qm = g_new0(dfsm_queued_message_t, 1);
455 g_return_val_if_fail(qm != NULL, NULL);
456
457 qm->nodeid = nodeid;
458 qm->pid = pid;
459 qm->msg = g_memdup (msg, msg_len);
460 qm->msg_len = msg_len;
461 qm->msg_count = msg_count;
462
463 if (dfsm->mode == DFSM_MODE_UPDATE && ni->synced) {
464 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
465 } else {
466 /* NOTE: we only need to sort the queue because we resend all
467 * queued messages sometimes.
468 */
469 g_sequence_insert_sorted(dfsm->msg_queue, qm, message_queue_sort_fn, NULL);
470 }
471
472 return qm;
473 }
474
475 static guint
476 dfsm_sync_info_hash(gconstpointer key)
477 {
478 dfsm_node_info_t *info = (dfsm_node_info_t *)key;
479
480 return g_int_hash(&info->nodeid) + g_int_hash(&info->pid);
481 }
482
483 static gboolean
484 dfsm_sync_info_equal(
485 gconstpointer v1,
486 gconstpointer v2)
487 {
488 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
489 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
490
491 if (info1->nodeid == info2->nodeid &&
492 info1->pid == info2->pid)
493 return TRUE;
494
495 return FALSE;
496 }
497
498 static int
499 dfsm_sync_info_compare(
500 gconstpointer v1,
501 gconstpointer v2)
502 {
503 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
504 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
505
506 if (info1->nodeid != info2->nodeid)
507 return info1->nodeid - info2->nodeid;
508
509 return info1->pid - info2->pid;
510 }
511
512 static void
513 dfsm_set_mode(
514 dfsm_t *dfsm,
515 dfsm_mode_t new_mode)
516 {
517 g_return_if_fail(dfsm != NULL);
518
519 cfs_debug("dfsm_set_mode - set mode to %d", new_mode);
520
521 int changed = 0;
522 g_mutex_lock (&dfsm->mode_mutex);
523 if (dfsm->mode != new_mode) {
524 if (new_mode < DFSM_ERROR_MODE_START ||
525 (dfsm->mode < DFSM_ERROR_MODE_START || new_mode >= dfsm->mode)) {
526 dfsm->mode = new_mode;
527 changed = 1;
528 }
529 }
530 g_mutex_unlock (&dfsm->mode_mutex);
531
532 if (!changed)
533 return;
534
535 if (new_mode == DFSM_MODE_START) {
536 cfs_dom_message(dfsm->log_domain, "start cluster connection");
537 } else if (new_mode == DFSM_MODE_START_SYNC) {
538 cfs_dom_message(dfsm->log_domain, "starting data syncronisation");
539 } else if (new_mode == DFSM_MODE_SYNCED) {
540 cfs_dom_message(dfsm->log_domain, "all data is up to date");
541 if (dfsm->dfsm_callbacks->dfsm_synced_fn)
542 dfsm->dfsm_callbacks->dfsm_synced_fn(dfsm);
543 } else if (new_mode == DFSM_MODE_UPDATE) {
544 cfs_dom_message(dfsm->log_domain, "waiting for updates from leader");
545 } else if (new_mode == DFSM_MODE_LEAVE) {
546 cfs_dom_critical(dfsm->log_domain, "leaving CPG group");
547 } else if (new_mode == DFSM_MODE_ERROR) {
548 cfs_dom_critical(dfsm->log_domain, "serious internal error - stop cluster connection");
549 } else if (new_mode == DFSM_MODE_VERSION_ERROR) {
550 cfs_dom_critical(dfsm->log_domain, "detected newer protocol - please update this node");
551 }
552 }
553
554 static dfsm_mode_t
555 dfsm_get_mode(dfsm_t *dfsm)
556 {
557 g_return_val_if_fail(dfsm != NULL, DFSM_MODE_ERROR);
558
559 g_mutex_lock (&dfsm->mode_mutex);
560 dfsm_mode_t mode = dfsm->mode;
561 g_mutex_unlock (&dfsm->mode_mutex);
562
563 return mode;
564 }
565
566 gboolean
567 dfsm_restartable(dfsm_t *dfsm)
568 {
569 dfsm_mode_t mode = dfsm_get_mode(dfsm);
570
571 return !(mode == DFSM_MODE_ERROR ||
572 mode == DFSM_MODE_VERSION_ERROR);
573 }
574
575 void
576 dfsm_set_errormode(dfsm_t *dfsm)
577 {
578 dfsm_set_mode(dfsm, DFSM_MODE_ERROR);
579 }
580
581 static void
582 dfsm_release_sync_resources(
583 dfsm_t *dfsm,
584 const struct cpg_address *member_list,
585 size_t member_list_entries)
586 {
587 g_return_if_fail(dfsm != NULL);
588 g_return_if_fail(dfsm->members != NULL);
589 g_return_if_fail(!member_list_entries || member_list != NULL);
590
591 cfs_debug("enter dfsm_release_sync_resources");
592
593 if (dfsm->sync_info) {
594
595 if (dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn) {
596 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
597 dfsm->sync_info->data = NULL;
598 }
599
600 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
601 if (dfsm->sync_info->nodes[i].state) {
602 g_free(dfsm->sync_info->nodes[i].state);
603 dfsm->sync_info->nodes[i].state = NULL;
604 dfsm->sync_info->nodes[i].state_len = 0;
605 }
606 }
607 }
608
609 if (member_list) {
610
611 g_hash_table_remove_all(dfsm->members);
612
613 if (dfsm->sync_info)
614 g_free(dfsm->sync_info);
615
616 int size = sizeof(dfsm_sync_info_t) +
617 member_list_entries*sizeof(dfsm_sync_info_t);
618 dfsm_sync_info_t *sync_info = dfsm->sync_info = g_malloc0(size);
619 sync_info->node_count = member_list_entries;
620
621 for (int i = 0; i < member_list_entries; i++) {
622 sync_info->nodes[i].nodeid = member_list[i].nodeid;
623 sync_info->nodes[i].pid = member_list[i].pid;
624 }
625
626 qsort(sync_info->nodes, member_list_entries, sizeof(dfsm_node_info_t),
627 dfsm_sync_info_compare);
628
629 for (int i = 0; i < member_list_entries; i++) {
630 dfsm_node_info_t *info = &sync_info->nodes[i];
631 g_hash_table_insert(dfsm->members, info, info);
632 if (info->nodeid == dfsm->nodeid && info->pid == dfsm->pid)
633 sync_info->local = info;
634 }
635 }
636 }
637
638 static void
639 dfsm_cpg_deliver_callback(
640 cpg_handle_t handle,
641 const struct cpg_name *group_name,
642 uint32_t nodeid,
643 uint32_t pid,
644 void *msg,
645 size_t msg_len)
646 {
647 cs_error_t result;
648
649 dfsm_t *dfsm = NULL;
650 result = cpg_context_get(handle, (gpointer *)&dfsm);
651 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
652 cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
653 return; /* we have no valid dfsm pointer, so we can just ignore this */
654 }
655 dfsm_mode_t mode = dfsm_get_mode(dfsm);
656
657 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
658
659 if (mode >= DFSM_ERROR_MODE_START) {
660 cfs_dom_debug(dfsm->log_domain, "error mode - ignoring message");
661 goto leave;
662 }
663
664 if (!dfsm->sync_info) {
665 cfs_dom_critical(dfsm->log_domain, "no dfsm_sync_info - internal error");
666 goto leave;
667 }
668
669 if (msg_len < sizeof(dfsm_message_header_t)) {
670 cfs_dom_critical(dfsm->log_domain, "received short message (%ld bytes)", msg_len);
671 goto leave;
672 }
673
674 dfsm_message_header_t *base_header = (dfsm_message_header_t *)msg;
675
676 if (base_header->protocol_version > dfsm->protocol_version) {
677 cfs_dom_critical(dfsm->log_domain, "received message with protocol version %d",
678 base_header->protocol_version);
679 dfsm_set_mode(dfsm, DFSM_MODE_VERSION_ERROR);
680 return;
681 } else if (base_header->protocol_version < dfsm->protocol_version) {
682 cfs_dom_message(dfsm->log_domain, "ignore message with wrong protocol version %d",
683 base_header->protocol_version);
684 return;
685 }
686
687 if (base_header->type == DFSM_MESSAGE_NORMAL) {
688
689 dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
690
691 if (msg_len < sizeof(dfsm_message_normal_header_t)) {
692 cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %ld bytes)",
693 base_header->type, base_header->subtype, msg_len);
694 goto leave;
695 }
696
697 if (mode != DFSM_MODE_SYNCED) {
698 cfs_dom_debug(dfsm->log_domain, "queue message %zu (subtype = %d, length = %ld)",
699 header->count, base_header->subtype, msg_len);
700
701 if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
702 goto leave;
703 } else {
704
705 int msg_res = -1;
706 int res = dfsm->dfsm_callbacks->dfsm_deliver_fn(
707 dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype,
708 base_header->time, msg + sizeof(dfsm_message_normal_header_t),
709 msg_len - sizeof(dfsm_message_normal_header_t));
710
711 if (nodeid == dfsm->nodeid && pid == dfsm->pid)
712 dfsm_record_local_result(dfsm, header->count, msg_res, res);
713
714 if (res < 0)
715 goto leave;
716 }
717
718 return;
719 }
720
721 /* state related messages
722 * we needs right epoch - else we simply discard the message
723 */
724
725 dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
726
727 if (msg_len < sizeof(dfsm_message_state_header_t)) {
728 cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %ld bytes)",
729 base_header->type, base_header->subtype, msg_len);
730 goto leave;
731 }
732
733 if (base_header->type != DFSM_MESSAGE_SYNC_START &&
734 (memcmp(&header->epoch, &dfsm->sync_epoch, sizeof(dfsm_sync_epoch_t)) != 0)) {
735 cfs_dom_debug(dfsm->log_domain, "ignore message (msg_type == %d) with "
736 "wrong epoch (epoch %d/%d/%08X)", base_header->type,
737 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
738 return;
739 }
740
741 msg += sizeof(dfsm_message_state_header_t);
742 msg_len -= sizeof(dfsm_message_state_header_t);
743
744 if (mode == DFSM_MODE_SYNCED) {
745 if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
746
747 for (int i = 0; i < dfsm->sync_info->node_count; i++)
748 dfsm->sync_info->nodes[i].synced = 1;
749
750 if (!dfsm_deliver_queue(dfsm))
751 goto leave;
752
753 return;
754
755 } else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
756
757 if (msg_len != sizeof(dfsm->csum_counter)) {
758 cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len, nodeid, pid);
759 goto leave;
760 }
761
762 uint64_t csum_id = *((uint64_t *)msg);
763 msg += 8; msg_len -= 8;
764
765 cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016zX", nodeid, csum_id);
766
767 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
768 if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
769 dfsm, dfsm->data, dfsm->csum, sizeof(dfsm->csum))) {
770 cfs_dom_critical(dfsm->log_domain, "unable to compute data checksum");
771 goto leave;
772 }
773
774 dfsm->csum_epoch = header->epoch;
775 dfsm->csum_id = csum_id;
776
777 if (nodeid == dfsm->nodeid && pid == dfsm->pid) {
778 if (!dfsm_send_checksum(dfsm))
779 goto leave;
780 }
781 }
782
783 return;
784
785 } else if (base_header->type == DFSM_MESSAGE_VERIFY) {
786
787 cfs_dom_debug(dfsm->log_domain, "received verify message");
788
789 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
790
791 if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
792 cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%ld bytes)", msg_len);
793 goto leave;
794 }
795
796 uint64_t csum_id = *((uint64_t *)msg);
797 msg += 8; msg_len -= 8;
798
799 if (dfsm->csum_id == csum_id &&
800 (memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
801 if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
802 cfs_dom_critical(dfsm->log_domain, "wrong checksum %016zX != %016zX - restarting",
803 *(uint64_t *)msg, *(uint64_t *)dfsm->csum);
804 goto leave;
805 } else {
806 cfs_dom_message(dfsm->log_domain, "data verification successful");
807 }
808 } else {
809 cfs_dom_message(dfsm->log_domain, "skip verification - no checksum saved");
810 }
811 }
812
813 return;
814
815 } else {
816 /* ignore (we already got all required updates, or we are leader) */
817 cfs_dom_debug(dfsm->log_domain, "ignore state sync message %d",
818 base_header->type);
819 return;
820 }
821
822 } else if (mode == DFSM_MODE_START_SYNC) {
823
824 if (base_header->type == DFSM_MESSAGE_SYNC_START) {
825
826 if (nodeid != dfsm->lowest_nodeid) {
827 cfs_dom_critical(dfsm->log_domain, "ignore sync request from wrong member %d/%d",
828 nodeid, pid);
829 }
830
831 cfs_dom_message(dfsm->log_domain, "received sync request (epoch %d/%d/%08X)",
832 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
833
834 dfsm->sync_epoch = header->epoch;
835
836 dfsm_release_sync_resources(dfsm, NULL, 0);
837
838 unsigned int state_len = 0;
839 gpointer state = NULL;
840
841 state = dfsm->dfsm_callbacks->dfsm_get_state_fn(dfsm, dfsm->data, &state_len);
842
843 if (!(state && state_len)) {
844 cfs_dom_critical(dfsm->log_domain, "dfsm_get_state_fn failed");
845 goto leave;
846 }
847
848 struct iovec iov[1];
849 iov[0].iov_base = state;
850 iov[0].iov_len = state_len;
851
852 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_STATE, iov, 1);
853
854 if (state)
855 g_free(state);
856
857 if (result != CS_OK)
858 goto leave;
859
860 return;
861
862 } else if (base_header->type == DFSM_MESSAGE_STATE) {
863
864 dfsm_node_info_t *ni;
865
866 if (!(ni = dfsm_node_info_lookup(dfsm, nodeid, pid))) {
867 cfs_dom_critical(dfsm->log_domain, "received state for non-member %d/%d", nodeid, pid);
868 goto leave;
869 }
870
871 if (ni->state) {
872 cfs_dom_critical(dfsm->log_domain, "received duplicate state for member %d/%d", nodeid, pid);
873 goto leave;
874 }
875
876 ni->state = g_memdup(msg, msg_len);
877 ni->state_len = msg_len;
878
879 int received_all = 1;
880 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
881 if (!dfsm->sync_info->nodes[i].state) {
882 received_all = 0;
883 break;
884 }
885 }
886
887 if (received_all) {
888 cfs_dom_message(dfsm->log_domain, "received all states");
889
890 int res = dfsm->dfsm_callbacks->dfsm_process_state_update_fn(dfsm, dfsm->data, dfsm->sync_info);
891 if (res < 0)
892 goto leave;
893
894 if (dfsm->sync_info->local->synced) {
895 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
896 dfsm_release_sync_resources(dfsm, NULL, 0);
897
898 if (!dfsm_deliver_queue(dfsm))
899 goto leave;
900
901 } else {
902 dfsm_set_mode(dfsm, DFSM_MODE_UPDATE);
903
904 if (!dfsm_deliver_queue(dfsm))
905 goto leave;
906 }
907
908 }
909
910 return;
911 }
912
913 } else if (mode == DFSM_MODE_UPDATE) {
914
915 if (base_header->type == DFSM_MESSAGE_UPDATE) {
916
917 int res = dfsm->dfsm_callbacks->dfsm_process_update_fn(
918 dfsm, dfsm->data, dfsm->sync_info, nodeid, pid, msg, msg_len);
919
920 if (res < 0)
921 goto leave;
922
923 return;
924
925 } else if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
926
927
928 int res = dfsm->dfsm_callbacks->dfsm_commit_fn(dfsm, dfsm->data, dfsm->sync_info);
929
930 if (res < 0)
931 goto leave;
932
933 for (int i = 0; i < dfsm->sync_info->node_count; i++)
934 dfsm->sync_info->nodes[i].synced = 1;
935
936 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
937
938 if (!dfsm_deliver_sync_queue(dfsm))
939 goto leave;
940
941 if (!dfsm_deliver_queue(dfsm))
942 goto leave;
943
944 dfsm_release_sync_resources(dfsm, NULL, 0);
945
946 return;
947 }
948
949 } else {
950 cfs_dom_critical(dfsm->log_domain, "internal error - unknown mode %d", mode);
951 goto leave;
952 }
953
954 if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST ||
955 base_header->type == DFSM_MESSAGE_VERIFY) {
956
957 cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
958
959 } else {
960 cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %ld bytes)",
961 base_header->type, msg_len);
962 goto leave;
963 }
964
965 leave:
966 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
967 dfsm_release_sync_resources(dfsm, NULL, 0);
968 return;
969 }
970
971 static gboolean
972 dfsm_resend_queue(dfsm_t *dfsm)
973 {
974 g_return_val_if_fail(dfsm != NULL, FALSE);
975 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
976
977 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
978 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
979 gboolean res = TRUE;
980
981 while (iter != end) {
982 GSequenceIter *cur = iter;
983 iter = g_sequence_iter_next(iter);
984
985 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
986 g_sequence_get(cur);
987
988 if (qm->nodeid == dfsm->nodeid && qm->pid == dfsm->pid) {
989 cs_error_t result;
990 struct iovec iov[1];
991 iov[0].iov_base = qm->msg;
992 iov[0].iov_len = qm->msg_len;
993
994 if ((result = dfsm_send_message_full(dfsm, iov, 1, 1)) != CS_OK) {
995 res = FALSE;
996 break;
997 }
998 }
999 }
1000
1001 dfsm_free_message_queue(dfsm);
1002
1003 return res;
1004 }
1005
1006 static gboolean
1007 dfsm_deliver_sync_queue(dfsm_t *dfsm)
1008 {
1009 g_return_val_if_fail(dfsm != NULL, FALSE);
1010
1011 if (!dfsm->sync_queue)
1012 return TRUE;
1013
1014 gboolean res = TRUE;
1015
1016 // fixme: cfs_debug
1017 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__,
1018 g_list_length(dfsm->sync_queue));
1019
1020 GList *iter = dfsm->sync_queue;
1021 while (iter) {
1022 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
1023 iter = g_list_next(iter);
1024
1025 if (res && dfsm->mode == DFSM_MODE_SYNCED) {
1026 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1027 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1028 } else {
1029 res = FALSE;
1030 }
1031
1032 dfsm_free_queue_entry(qm);
1033 }
1034 g_list_free(dfsm->sync_queue);
1035 dfsm->sync_queue = NULL;
1036
1037 return res;
1038 }
1039
1040 static gboolean
1041 dfsm_deliver_queue(dfsm_t *dfsm)
1042 {
1043 g_return_val_if_fail(dfsm != NULL, FALSE);
1044 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
1045
1046 int qlen = g_sequence_get_length(dfsm->msg_queue);
1047 if (!qlen)
1048 return TRUE;
1049
1050 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
1051 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
1052 gboolean res = TRUE;
1053
1054 // fixme: cfs_debug
1055 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, qlen);
1056
1057 while (iter != end) {
1058 GSequenceIter *cur = iter;
1059 iter = g_sequence_iter_next(iter);
1060
1061 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
1062 g_sequence_get(cur);
1063
1064 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, qm->nodeid, qm->pid);
1065 if (!ni) {
1066 cfs_dom_message(dfsm->log_domain, "remove message from non-member %d/%d",
1067 qm->nodeid, qm->pid);
1068 dfsm_free_queue_entry(qm);
1069 g_sequence_remove(cur);
1070 continue;
1071 }
1072
1073 if (dfsm->mode == DFSM_MODE_SYNCED) {
1074 if (ni->synced) {
1075 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1076 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1077 dfsm_free_queue_entry(qm);
1078 g_sequence_remove(cur);
1079 }
1080 } else if (dfsm->mode == DFSM_MODE_UPDATE) {
1081 if (ni->synced) {
1082 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
1083 g_sequence_remove(cur);
1084 }
1085 } else {
1086 res = FALSE;
1087 break;
1088 }
1089 }
1090
1091 return res;
1092 }
1093
1094 static void
1095 dfsm_cpg_confchg_callback(
1096 cpg_handle_t handle,
1097 const struct cpg_name *group_name,
1098 const struct cpg_address *member_list,
1099 size_t member_list_entries,
1100 const struct cpg_address *left_list,
1101 size_t left_list_entries,
1102 const struct cpg_address *joined_list,
1103 size_t joined_list_entries)
1104 {
1105 cs_error_t result;
1106
1107 dfsm_t *dfsm = NULL;
1108 result = cpg_context_get(handle, (gpointer *)&dfsm);
1109 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
1110 cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
1111 return; /* we have no valid dfsm pointer, so we can just ignore this */
1112 }
1113
1114 dfsm->we_are_member = 0;
1115
1116 /* create new epoch */
1117 dfsm->local_epoch_counter++;
1118 dfsm->sync_epoch.epoch = dfsm->local_epoch_counter;
1119 dfsm->sync_epoch.nodeid = dfsm->nodeid;
1120 dfsm->sync_epoch.pid = dfsm->pid;
1121 dfsm->sync_epoch.time = time(NULL);
1122
1123 /* invalidate saved checksum */
1124 dfsm->csum_id = dfsm->csum_counter;
1125 memset(&dfsm->csum_epoch, 0, sizeof(dfsm->csum_epoch));
1126
1127 dfsm_free_sync_queue(dfsm);
1128
1129 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1130
1131 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
1132
1133 if (mode >= DFSM_ERROR_MODE_START) {
1134 cfs_dom_debug(dfsm->log_domain, "already left group - ignore message");
1135 return;
1136 }
1137
1138 int lowest_nodeid = 0;
1139 GString *str = g_string_new("members: ");
1140 for (int i = 0; i < member_list_entries; i++) {
1141
1142 g_string_append_printf(str, i ? ", %d/%d" : "%d/%d",
1143 member_list[i].nodeid, member_list[i].pid);
1144
1145 if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid)
1146 lowest_nodeid = member_list[i].nodeid;
1147
1148 if (member_list[i].nodeid == dfsm->nodeid &&
1149 member_list[i].pid == dfsm->pid)
1150 dfsm->we_are_member = 1;
1151 }
1152
1153
1154 if ((dfsm->we_are_member || mode != DFSM_MODE_START))
1155 cfs_dom_message(dfsm->log_domain, str->str);
1156
1157 g_string_free(str, 1);
1158
1159 dfsm->lowest_nodeid = lowest_nodeid;
1160
1161 /* NOTE: one node can be in left and joined list at the same time,
1162 so it is better to query member list. Also JOIN/LEAVE list are
1163 different on different nodes!
1164 */
1165
1166 dfsm_release_sync_resources(dfsm, member_list, member_list_entries);
1167
1168 if (!dfsm->we_are_member) {
1169 if (mode == DFSM_MODE_START) {
1170 cfs_dom_debug(dfsm->log_domain, "ignore leave message");
1171 return;
1172 }
1173 cfs_dom_message(dfsm->log_domain, "we (%d/%d) left the process group",
1174 dfsm->nodeid, dfsm->pid);
1175 goto leave;
1176 }
1177
1178 if (member_list_entries > 1) {
1179
1180 int qlen = g_sequence_get_length(dfsm->msg_queue);
1181 if (joined_list_entries && qlen) {
1182 /* we need to make sure that all members have the same queue. */
1183 cfs_dom_message(dfsm->log_domain, "queue not emtpy - resening %d messages", qlen);
1184 if (!dfsm_resend_queue(dfsm)) {
1185 cfs_dom_critical(dfsm->log_domain, "dfsm_resend_queue failed");
1186 goto leave;
1187 }
1188 }
1189
1190 dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC);
1191 if (lowest_nodeid == dfsm->nodeid) {
1192 if (!dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0)) {
1193 cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message");
1194 goto leave;
1195 }
1196 }
1197 } else {
1198 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
1199 dfsm->sync_info->local->synced = 1;
1200 if (!dfsm_deliver_queue(dfsm))
1201 goto leave;
1202 }
1203
1204 if (dfsm->dfsm_callbacks->dfsm_confchg_fn)
1205 dfsm->dfsm_callbacks->dfsm_confchg_fn(dfsm, dfsm->data, member_list, member_list_entries);
1206
1207 return;
1208 leave:
1209 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
1210 return;
1211 }
1212
1213 static cpg_callbacks_t cpg_callbacks = {
1214 .cpg_deliver_fn = dfsm_cpg_deliver_callback,
1215 .cpg_confchg_fn = dfsm_cpg_confchg_callback,
1216 };
1217
1218 dfsm_t *
1219 dfsm_new(
1220 gpointer data,
1221 const char *group_name,
1222 const char *log_domain,
1223 guint32 protocol_version,
1224 dfsm_callbacks_t *callbacks)
1225 {
1226 g_return_val_if_fail(sizeof(dfsm_message_header_t) == 16, NULL);
1227 g_return_val_if_fail(sizeof(dfsm_message_state_header_t) == 32, NULL);
1228 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t) == 24, NULL);
1229
1230 g_return_val_if_fail(callbacks != NULL, NULL);
1231 g_return_val_if_fail(callbacks->dfsm_deliver_fn != NULL, NULL);
1232
1233 g_return_val_if_fail(callbacks->dfsm_get_state_fn != NULL, NULL);
1234 g_return_val_if_fail(callbacks->dfsm_process_state_update_fn != NULL, NULL);
1235 g_return_val_if_fail(callbacks->dfsm_process_update_fn != NULL, NULL);
1236 g_return_val_if_fail(callbacks->dfsm_commit_fn != NULL, NULL);
1237
1238 dfsm_t *dfsm;
1239
1240 if ((dfsm = g_new0(dfsm_t, 1)) == NULL)
1241 return NULL;
1242
1243 g_mutex_init(&dfsm->sync_mutex);
1244
1245 g_cond_init(&dfsm->sync_cond);
1246
1247 if (!(dfsm->results = g_hash_table_new(g_int64_hash, g_int64_equal)))
1248 goto err;
1249
1250 if (!(dfsm->msg_queue = g_sequence_new(NULL)))
1251 goto err;
1252
1253 dfsm->log_domain = log_domain;
1254 dfsm->data = data;
1255 dfsm->mode = DFSM_MODE_START;
1256 dfsm->protocol_version = protocol_version;
1257 strcpy (dfsm->cpg_group_name.value, group_name);
1258 dfsm->cpg_group_name.length = strlen (group_name) + 1;
1259
1260 dfsm->cpg_callbacks = &cpg_callbacks;
1261 dfsm->dfsm_callbacks = callbacks;
1262
1263 dfsm->members = g_hash_table_new(dfsm_sync_info_hash, dfsm_sync_info_equal);
1264 if (!dfsm->members)
1265 goto err;
1266
1267 g_mutex_init(&dfsm->mode_mutex);
1268
1269 return dfsm;
1270
1271 err:
1272 dfsm_destroy(dfsm);
1273 return NULL;
1274 }
1275
1276 gboolean
1277 dfsm_is_initialized(dfsm_t *dfsm)
1278 {
1279 g_return_val_if_fail(dfsm != NULL, FALSE);
1280
1281 return (dfsm->cpg_handle != 0) ? TRUE : FALSE;
1282 }
1283
1284 gboolean
1285 dfsm_lowest_nodeid(dfsm_t *dfsm)
1286 {
1287 g_return_val_if_fail(dfsm != NULL, FALSE);
1288
1289 if (dfsm->lowest_nodeid && (dfsm->lowest_nodeid == dfsm->nodeid))
1290 return TRUE;
1291
1292 return FALSE;
1293 }
1294
1295 cs_error_t
1296 dfsm_verify_request(dfsm_t *dfsm)
1297 {
1298 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1299
1300 /* only do when we have lowest nodeid */
1301 if (!dfsm->lowest_nodeid || (dfsm->lowest_nodeid != dfsm->nodeid))
1302 return CS_OK;
1303
1304 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1305 if (mode != DFSM_MODE_SYNCED)
1306 return CS_OK;
1307
1308 int len = 1;
1309 struct iovec iov[len];
1310
1311 if (dfsm->csum_counter != dfsm->csum_id) {
1312 g_message("delay verify request %016zX", dfsm->csum_counter + 1);
1313 return CS_OK;
1314 };
1315
1316 dfsm->csum_counter++;
1317 iov[0].iov_base = (char *)&dfsm->csum_counter;
1318 iov[0].iov_len = sizeof(dfsm->csum_counter);
1319
1320 cfs_debug("send verify request %016zX", dfsm->csum_counter);
1321
1322 cs_error_t result;
1323 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len);
1324
1325 if (result != CS_OK)
1326 cfs_dom_critical(dfsm->log_domain, "failed to send VERIFY_REQUEST message");
1327
1328 return result;
1329 }
1330
1331
1332 cs_error_t
1333 dfsm_dispatch(
1334 dfsm_t *dfsm,
1335 cs_dispatch_flags_t dispatch_types)
1336 {
1337 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1338 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_INVALID_PARAM);
1339
1340 cs_error_t result;
1341
1342 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1343 int retries = 0;
1344 loop:
1345 result = cpg_dispatch(dfsm->cpg_handle, dispatch_types);
1346 if (result == CS_ERR_TRY_AGAIN) {
1347 nanosleep(&tvreq, NULL);
1348 ++retries;
1349 if ((retries % 10) == 0)
1350 cfs_dom_message(dfsm->log_domain, "cpg_dispatch retry %d", retries);
1351 goto loop;
1352 }
1353
1354 if (!(result == CS_OK || result == CS_ERR_TRY_AGAIN)) {
1355 cfs_dom_critical(dfsm->log_domain, "cpg_dispatch failed: %d", result);
1356 }
1357
1358 return result;
1359 }
1360
1361
1362 cs_error_t
1363 dfsm_initialize(dfsm_t *dfsm, int *fd)
1364 {
1365 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1366 g_return_val_if_fail(fd != NULL, CS_ERR_INVALID_PARAM);
1367
1368 /* remove old messages */
1369 dfsm_free_message_queue(dfsm);
1370 dfsm_send_sync_message_abort(dfsm);
1371
1372 dfsm->joined = FALSE;
1373 dfsm->we_are_member = 0;
1374
1375 dfsm_set_mode(dfsm, DFSM_MODE_START);
1376
1377 cs_error_t result;
1378
1379 if (dfsm->cpg_handle == 0) {
1380 if ((result = cpg_initialize(&dfsm->cpg_handle, dfsm->cpg_callbacks)) != CS_OK) {
1381 cfs_dom_critical(dfsm->log_domain, "cpg_initialize failed: %d", result);
1382 goto err_no_finalize;
1383 }
1384
1385 if ((result = cpg_local_get(dfsm->cpg_handle, &dfsm->nodeid)) != CS_OK) {
1386 cfs_dom_critical(dfsm->log_domain, "cpg_local_get failed: %d", result);
1387 goto err_finalize;
1388 }
1389
1390 dfsm->pid = getpid();
1391
1392 result = cpg_context_set(dfsm->cpg_handle, dfsm);
1393 if (result != CS_OK) {
1394 cfs_dom_critical(dfsm->log_domain, "cpg_context_set failed: %d", result);
1395 goto err_finalize;
1396 }
1397 }
1398
1399 result = cpg_fd_get(dfsm->cpg_handle, fd);
1400 if (result != CS_OK) {
1401 cfs_dom_critical(dfsm->log_domain, "cpg_fd_get failed: %d", result);
1402 goto err_finalize;
1403 }
1404
1405 return CS_OK;
1406
1407 err_finalize:
1408 cpg_finalize(dfsm->cpg_handle);
1409 err_no_finalize:
1410 dfsm->cpg_handle = 0;
1411 return result;
1412 }
1413
1414 cs_error_t
1415 dfsm_join(dfsm_t *dfsm)
1416 {
1417 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1418 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_LIBRARY);
1419 g_return_val_if_fail(dfsm->joined == 0, CS_ERR_EXIST);
1420
1421 cs_error_t result;
1422
1423 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1424 int retries = 0;
1425 loop:
1426 result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name);
1427 if (result == CS_ERR_TRY_AGAIN) {
1428 nanosleep(&tvreq, NULL);
1429 ++retries;
1430 if ((retries % 10) == 0)
1431 cfs_dom_message(dfsm->log_domain, "cpg_join retry %d", retries);
1432 goto loop;
1433 }
1434
1435 if (result != CS_OK) {
1436 cfs_dom_critical(dfsm->log_domain, "cpg_join failed: %d", result);
1437 return result;
1438 }
1439
1440 dfsm->joined = TRUE;
1441 return TRUE;
1442 }
1443
1444 cs_error_t
1445 dfsm_leave (dfsm_t *dfsm)
1446 {
1447 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1448 g_return_val_if_fail(dfsm->joined, CS_ERR_NOT_EXIST);
1449
1450 cs_error_t result;
1451
1452 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1453 int retries = 0;
1454 loop:
1455 result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name);
1456 if (result == CS_ERR_TRY_AGAIN) {
1457 nanosleep(&tvreq, NULL);
1458 ++retries;
1459 if ((retries % 10) == 0)
1460 cfs_dom_message(dfsm->log_domain, "cpg_leave retry %d", retries);
1461 goto loop;
1462 }
1463
1464 if (result != CS_OK) {
1465 cfs_dom_critical(dfsm->log_domain, "cpg_leave failed: %d", result);
1466 return result;
1467 }
1468
1469 dfsm->joined = FALSE;
1470
1471 return TRUE;
1472 }
1473
1474 gboolean
1475 dfsm_finalize(dfsm_t *dfsm)
1476 {
1477 g_return_val_if_fail(dfsm != NULL, FALSE);
1478
1479 dfsm_send_sync_message_abort(dfsm);
1480
1481 if (dfsm->joined)
1482 dfsm_leave(dfsm);
1483
1484 if (dfsm->cpg_handle) {
1485 cpg_finalize(dfsm->cpg_handle);
1486 dfsm->cpg_handle = 0;
1487 dfsm->joined = FALSE;
1488 dfsm->we_are_member = 0;
1489 }
1490
1491 return TRUE;
1492 }
1493
1494 void
1495 dfsm_destroy(dfsm_t *dfsm)
1496 {
1497 g_return_if_fail(dfsm != NULL);
1498
1499 dfsm_finalize(dfsm);
1500
1501 if (dfsm->sync_info && dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn)
1502 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
1503
1504 dfsm_free_sync_queue(dfsm);
1505
1506 g_mutex_clear (&dfsm->mode_mutex);
1507
1508 g_mutex_clear (&dfsm->sync_mutex);
1509
1510 g_cond_clear (&dfsm->sync_cond);
1511
1512 if (dfsm->results)
1513 g_hash_table_destroy(dfsm->results);
1514
1515 if (dfsm->msg_queue) {
1516 dfsm_free_message_queue(dfsm);
1517 g_sequence_free(dfsm->msg_queue);
1518 }
1519
1520 if (dfsm->sync_info)
1521 g_free(dfsm->sync_info);
1522
1523 if (dfsm->cpg_handle)
1524 cpg_finalize(dfsm->cpg_handle);
1525
1526 if (dfsm->members)
1527 g_hash_table_destroy(dfsm->members);
1528
1529 g_free(dfsm);
1530 }
1531
1532 typedef struct {
1533 dfsm_t *dfsm;
1534 } service_dfsm_private_t;
1535
1536 static gboolean
1537 service_dfsm_finalize(
1538 cfs_service_t *service,
1539 gpointer context)
1540 {
1541 g_return_val_if_fail(service != NULL, FALSE);
1542 g_return_val_if_fail(context != NULL, FALSE);
1543
1544 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1545 dfsm_t *dfsm = private->dfsm;
1546
1547 g_return_val_if_fail(dfsm != NULL, FALSE);
1548
1549 return dfsm_finalize(dfsm);
1550 }
1551
1552 static int
1553 service_dfsm_initialize(
1554 cfs_service_t *service,
1555 gpointer context)
1556 {
1557 g_return_val_if_fail(service != NULL, -1);
1558 g_return_val_if_fail(context != NULL, -1);
1559
1560 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1561 dfsm_t *dfsm = private->dfsm;
1562
1563 g_return_val_if_fail(dfsm != NULL, -1);
1564
1565 /* serious internal error - don't try to recover */
1566 if (!dfsm_restartable(dfsm))
1567 return -1;
1568
1569 int fd = -1;
1570
1571 cs_error_t result;
1572 if ((result = dfsm_initialize(dfsm, &fd)) != CS_OK)
1573 return -1;
1574
1575 result = dfsm_join(dfsm);
1576 if (result != CS_OK) {
1577 /* we can't dispatch if not joined, so we need to finalize */
1578 dfsm_finalize(dfsm);
1579 return -1;
1580 }
1581
1582 return fd;
1583 }
1584
1585 static gboolean
1586 service_dfsm_dispatch(
1587 cfs_service_t *service,
1588 gpointer context)
1589 {
1590 g_return_val_if_fail(service != NULL, FALSE);
1591 g_return_val_if_fail(context != NULL, FALSE);
1592
1593 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1594 dfsm_t *dfsm = private->dfsm;
1595
1596 g_return_val_if_fail(dfsm != NULL, FALSE);
1597 g_return_val_if_fail(dfsm->cpg_handle != 0, FALSE);
1598
1599 cs_error_t result;
1600
1601 result = dfsm_dispatch(dfsm, CS_DISPATCH_ONE);
1602 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1603 goto finalize;
1604 if (result != CS_OK)
1605 goto fail;
1606
1607 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1608 if (mode >= DFSM_ERROR_MODE_START) {
1609 if (dfsm->joined) {
1610 result = dfsm_leave(dfsm);
1611 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1612 goto finalize;
1613 if (result != CS_OK)
1614 goto finalize;
1615 } else {
1616 if (!dfsm->we_are_member)
1617 return FALSE;
1618 }
1619 }
1620
1621 return TRUE;
1622
1623 finalize:
1624 dfsm_finalize(dfsm);
1625 fail:
1626 cfs_service_set_restartable(service, dfsm_restartable(dfsm));
1627 return FALSE;
1628 }
1629
1630 static void
1631 service_dfsm_timer(
1632 cfs_service_t *service,
1633 gpointer context)
1634 {
1635 g_return_if_fail(service != NULL);
1636 g_return_if_fail(context != NULL);
1637
1638 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1639 dfsm_t *dfsm = private->dfsm;
1640
1641 g_return_if_fail(dfsm != NULL);
1642
1643 dfsm_verify_request(dfsm);
1644 }
1645
1646 static cfs_service_callbacks_t cfs_dfsm_callbacks = {
1647 .cfs_service_initialize_fn = service_dfsm_initialize,
1648 .cfs_service_finalize_fn = service_dfsm_finalize,
1649 .cfs_service_dispatch_fn = service_dfsm_dispatch,
1650 .cfs_service_timer_fn = service_dfsm_timer,
1651 };
1652
1653 cfs_service_t *
1654 service_dfsm_new(dfsm_t *dfsm)
1655 {
1656 cfs_service_t *service;
1657
1658 g_return_val_if_fail(dfsm != NULL, NULL);
1659
1660 service_dfsm_private_t *private = g_new0(service_dfsm_private_t, 1);
1661 if (!private)
1662 return NULL;
1663
1664 private->dfsm = dfsm;
1665
1666 service = cfs_service_new(&cfs_dfsm_callbacks, dfsm->log_domain, private);
1667
1668 return service;
1669 }
1670
1671 void
1672 service_dfsm_destroy(cfs_service_t *service)
1673 {
1674 g_return_if_fail(service != NULL);
1675
1676 service_dfsm_private_t *private =
1677 (service_dfsm_private_t *)cfs_service_get_context(service);
1678
1679 g_free(private);
1680 g_free(service);
1681 }
1682
1683
1684
1685